diff --git a/iam.json b/iam.json index bc834e7..67e70e8 100644 --- a/iam.json +++ b/iam.json @@ -6,9 +6,12 @@ "Effect": "Allow", "Action": [ "ec2:DescribeInstances", - "ec2:DescribeSecurityGroups" + "ec2:DescribeSecurityGroups", + "ec2:DescribeRegions", + "ec2:DescribeVpcEndpoints", + "elasticloadbalancing:DescribeLoadBalancers" ], "Resource": "*" } ] -} \ No newline at end of file +} diff --git a/security_group_report/main.py b/security_group_report/main.py index d2c6ab1..f027f1d 100644 --- a/security_group_report/main.py +++ b/security_group_report/main.py @@ -17,15 +17,16 @@ import pandas as pd import datetime +# Initialize EC2 client ec2 = boto3.client("ec2") -# run for every region -regions = [region["RegionName"] for region in ec2.describe_regions()["Regions"]] -# specify regions -# regions = ['eu-west-1','us-west-1] -# Get rules from SG +# Get all available regions +regions = [region["RegionName"] for region in ec2.describe_regions()["Regions"]] +# Uncomment the following two lines to specify the regions you want to analyze +# regions = ['eu-west-1', 'us-west-1'] +# Function to get inbound and outbound rules for a security group def get_rules(sg, region): ec2r = boto3.resource("ec2", region) sgs = ec2r.security_groups.filter( @@ -33,37 +34,55 @@ def get_rules(sg, region): sg, ] ) - sgs = list(sgs.pages()) - if not sgs or not sgs[0]: + sgs = next(sgs.pages(), []) + if not sgs: return (None, None) - sg = sgs[0][0] + sg = sgs[0] sg_inbound = sg.ip_permissions sg_outbound = sg.ip_permissions_egress return sg_inbound, sg_outbound - -# Get available SGs -def get_sgs(instance): - sgs = instance.security_groups +# Function to get security groups for a resource +def get_sgs(resource, resource_type): + if resource_type == "instance": + sgs = resource.security_groups + elif resource_type == "load_balancer": + security_groups = resource.get("SecurityGroups") + if security_groups is not None: + sgs = [{"GroupId": sg} for sg in security_groups] + else: + sgs = [] + elif resource_type == "endpoint": + groups = resource.get("Groups", []) + sgs = [{"GroupId": sg["GroupId"]} for sg in groups] return sgs - -# Get instance name -def get_name(instance): - if instance.tags: - for tag in instance.tags: +# Function to get a resource name +def get_name(resource, resource_type): + if resource_type == "instance" and getattr(resource, "tags", None): + for tag in resource.tags: if tag["Key"] == "Name": return tag["Value"] - else: - return "None" + elif resource_type == "load_balancer": + return resource.get("LoadBalancerName", "None") + elif resource_type == "endpoint": + return resource.get("VpcEndpointId", "None") + return "None" +# Function to get security group name +def get_sg_name(sg_id, region): + ec2r = boto3.resource("ec2", region) + sg = ec2r.SecurityGroup(sg_id) + return sg.group_name +# Main function def main(): table = [] columns = [ + "Resource Type", "Region", - "Instance Name", - "Instance-ID", + "Resource Name", + "Resource-ID", "SG-Name", "SG-ID", "Direction", @@ -74,104 +93,82 @@ def main(): ] df = pd.DataFrame(table, columns=columns) print("Collecting Security Groups information from every region....") + for region in regions: ec2r = boto3.resource("ec2", region) - for instance in ec2r.instances.all(): - inst_id = instance.id # get instance id - sgs = get_sgs(instance) # gets sg from instance - inst_name = get_name(instance) # gets the instance name - for sg in sgs: - sg_id = sg["GroupId"] - sg_name = sg["GroupName"] - rules_inbound = get_rules(sg_id, region)[0] - rules_outbound = get_rules(sg_id, region)[1] - for rule in rules_inbound: - rule_destination = inst_id - from_cidr = [] - direction = "Inbound" - from_port_range = rule.get("FromPort", "any") - to_port_range = rule.get("ToPort", "any") - if from_port_range == to_port_range: - ports = from_port_range - else: - ports = str(from_port_range) + " - " + str(to_port_range) - if from_port_range == -1: - ports = "any" - protocol = rule["IpProtocol"] - if protocol == "-1": - protocol = "any" - for cidr in rule.get("IpRanges", []): - from_cidr.append(cidr["CidrIp"]) - for cidrv6 in rule.get("Ipv6Ranges", []): - from_cidr.append(cidrv6["CidrIpv6"]) - for source_sg in rule.get("UserIdGroupPairs", []): - from_cidr.append(source_sg["GroupId"]) - for source_sg in rule.get("PrefixListIds", []): - from_cidr.append(source_sg["PrefixListId"]) - if not from_cidr: - from_cidr.append("0.0.0.0/0") - - df = df.append( - { - "Region": region, - "Instance Name": inst_name, - "Instance-ID": inst_id, - "SG-Name": sg_name, - "SG-ID": sg_id, - "Direction": direction, - "Source": from_cidr, - "Destination": rule_destination, - "Protocol": protocol, - "Ports": ports, - }, - ignore_index=True, - ) - for rule in rules_outbound: - rule_source = inst_id - to_cidr = [] - direction = "Outbound" - protocol = rule["IpProtocol"] - from_port_range = rule.get("FromPort", "any") - to_port_range = rule.get("ToPort", "any") - if from_port_range == to_port_range: - ports = from_port_range - else: - ports = str(from_port_range) + " - " + str(to_port_range) - if from_port_range == -1: - ports = "any" - protocol = rule["IpProtocol"] - if protocol == "-1": - protocol = "any" - for cidr in rule.get("IpRanges", []): - to_cidr.append(cidr["CidrIp"]) - for cidrv6 in rule.get("Ipv6Ranges", []): - to_cidr.append(cidrv6["CidrIpv6"]) - for source_sg in rule.get("UserIdGroupPairs", []): - to_cidr.append(source_sg["GroupId"]) - for source_sg in rule.get("PrefixListIds", []): - to_cidr.append(source_sg["PrefixListId"]) - if not to_cidr: - to_cidr.append("0.0.0.0/0") - df = df.append( - { - "Region": region, - "Instance Name": inst_name, - "Instance-ID": inst_id, - "SG-Name": sg_name, - "SG-ID": sg_id, - "Direction": direction, - "Source": rule_source, - "Destination": to_cidr, - "Protocol": protocol, - "Ports": ports, - }, - ignore_index=True, - ) + elbv2 = boto3.client("elbv2", region) + instances = list(ec2r.instances.all()) + load_balancers = elbv2.describe_load_balancers().get("LoadBalancers", []) + endpoints = ec2r.meta.client.describe_vpc_endpoints().get("VpcEndpoints", []) + + resources = [ + {"type": "instance", "data": instances}, + {"type": "load_balancer", "data": load_balancers}, + {"type": "endpoint", "data": endpoints}, + ] + + for resource_type in resources: + for resource_data in resource_type["data"]: + if resource_type["type"] == "instance": + resource_id = resource_data.id + resource_name = get_name(resource_data, "instance") + elif resource_type["type"] == "load_balancer": + resource_id = resource_data.get("LoadBalancerArn") + resource_name = get_name(resource_data, "load_balancer") + elif resource_type["type"] == "endpoint": + resource_id = resource_data.get("VpcEndpointId") + resource_name = get_name(resource_data, "endpoint") + + sgs = get_sgs(resource_data, resource_type["type"]) + + for sg in sgs: + sg_id = sg["GroupId"] + sg_name = get_sg_name(sg_id, region) + rules_inbound, rules_outbound = get_rules(sg_id, region) + + # Process inbound rules and append to DataFrame + for rule in rules_inbound: + for ip_range in rule.get("IpRanges", []): + row = { + "Resource Type": resource_type["type"], + "Region": region, + "Resource Name": resource_name, + "Resource-ID": resource_id, + "SG-Name": sg_name, + "SG-ID": sg_id, + "Direction": "Inbound", + "Source": ip_range.get("CidrIp", ""), + "Destination": "", + "Protocol": rule.get("IpProtocol", ""), + "Ports": rule.get("FromPort", "N/A"), + } + row_df = pd.DataFrame([row], columns=columns) + df = pd.concat([df, row_df], ignore_index=True) + + # Process outbound rules and append to DataFrame + for rule in rules_outbound: + for ip_range in rule.get("IpRanges", []): + row = { + "Resource Type": resource_type["type"], + "Region": region, + "Resource Name": resource_name, + "Resource-ID": resource_id, + "SG-Name": sg_name, + "SG-ID": sg_id, + "Direction": "Outbound", + "Source": "", + "Destination": ip_range.get("CidrIp", ""), + "Protocol": rule.get("IpProtocol", ""), + "Ports": rule.get("FromPort", "N/A"), + } + row_df = pd.DataFrame([row], columns=columns) + df = pd.concat([df, row_df], ignore_index=True) + + # Save DataFrame to Excel file time = datetime.datetime.now().strftime("%H-%M-%S_%d-%m-%Y") file_name = "fw_policy-report-" + time + ".xlsx" print(file_name + " has been created") - return df.to_excel(file_name) - + df.to_excel(file_name) if __name__ == "__main__": - main() \ No newline at end of file + main()