Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ec2): improve EC2 Security Groups checks logic by checking if any instance is attached #3852

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.services.ec2.ec2_client import ec2_client
from prowler.providers.aws.services.ec2.lib.security_groups import check_security_group
from prowler.providers.aws.services.ec2.lib.security_groups import (
check_if_open_security_group_is_attached_to_instance,
check_security_group,
)
from prowler.providers.aws.services.vpc.vpc_client import vpc_client


Expand All @@ -9,6 +12,7 @@ def execute(self):
findings = []
check_ports = [22]
for security_group in ec2_client.security_groups:
sg_is_open = False
# Check if ignoring flag is set and if the VPC and the SG is in use
if ec2_client.provider.scan_unused_services or (
security_group.vpc_id in vpc_client.vpcs
Expand All @@ -30,8 +34,30 @@ def execute(self):
ingress_rule, "tcp", check_ports, any_address=True
):
report.status = "FAIL"
report.status_extended = f"Security group {security_group.name} ({security_group.id}) has SSH port 22 open to the Internet."
report.status_extended = f"Security group {security_group.name} ({security_group.id}) has SSH port 22 open to the Internet but it is not attached."
report.check_metadata.Severity = "medium"
sg_is_open = True
break
findings.append(report)
if sg_is_open:
instances_attached = (
check_if_open_security_group_is_attached_to_instance(
security_group=security_group,
vpc_client=vpc_client,
port="SSH",
)
)
if instances_attached:
for instance_attached in instances_attached:
report.status = "FAIL"
report.check_metadata.Severity = instance_attached[
"severity"
]
report.status_extended = instance_attached["details"]
report.resource_details = instance_attached["instance_id"]
findings.append(report)
else:
findings.append(report)
else:
findings.append(report)

return findings
40 changes: 40 additions & 0 deletions prowler/providers/aws/services/ec2/lib/security_groups.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,43 @@ def _is_cidr_public(cidr: str, any_address: bool = False) -> bool:
return True
if not any_address:
return ipaddress.ip_network(cidr).is_global


def check_if_open_security_group_is_attached_to_instance(
security_group: Any, vpc_client: Any, port: str
) -> list:
"""
Check if the security group is attached to any EC2 instance

Args:
security_group: AWS Security Group
vpc_client: VPC Client
port: Port to check
Returns:
list: List of reports for each EC2 instance attached to the security group
"""
reports = []
# Check if the security group is attached to any EC2 instance
for network_interface in security_group.network_interfaces:
instance_attached = network_interface.attachment.get("InstanceId")
if instance_attached:
report = {}
report["severity"] = "high"
# Check if the EC2 instance has a public IP
if not network_interface.association.get("PublicIp"):
report["details"] = (
f"EC2 Instance {instance_attached} has {port} exposed to 0.0.0.0/0 on private ip address {network_interface.private_ip}."
)
else:
report["details"] = (
f"EC2 Instance {instance_attached} has {port} exposed to 0.0.0.0/0 on public ip address {network_interface.association.get('PublicIp')}."
)
# Check if EC2 instance is in a public subnet
if vpc_client.vpc_subnets[network_interface.subnet_id].public:
report["details"] = (
f"EC2 Instance {instance_attached} has {port} exposed to 0.0.0.0/0 on public ip address {network_interface.association.get('PublicIp')} within public subnet {network_interface.subnet_id}."
)
report["severity"] = "critical"
report["instance_id"] = instance_attached
reports.append(report)
return reports
7 changes: 6 additions & 1 deletion prowler/providers/aws/services/vpc/vpc_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,12 @@ def __describe_vpc_subnets__(self, regional_client):
for route in route_tables_for_subnet.get("RouteTables")[
0
].get("Routes"):
if "GatewayId" in route and "igw" in route["GatewayId"]:
if (
"GatewayId" in route
and "igw" in route["GatewayId"]
and route["DestinationCidrBlock"] == "0.0.0.0/0"
):
# If the route table has a default route to an internet gateway, the subnet is public
public = True
if "NatGatewayId" in route:
nat_gateway = True
Expand Down