Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions iam.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@
"Effect": "Allow",
"Action": [
"ec2:DescribeInstances",
"ec2:DescribeSecurityGroups"
"ec2:DescribeSecurityGroups",
"ec2:DescribeRegions",
"ec2:DescribeVpcEndpoints",
"elasticloadbalancing:DescribeLoadBalancers"
],
"Resource": "*"
}
]
}
}
227 changes: 112 additions & 115 deletions security_group_report/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,53 +17,72 @@
import pandas as pd
import datetime

# Initialize EC2 client
ec2 = boto3.client("ec2")
# run for every region
regions = [region["RegionName"] for region in ec2.describe_regions()["Regions"]]
# specify regions
# regions = ['eu-west-1','us-west-1]

# Get rules from SG
# Get all available regions
regions = [region["RegionName"] for region in ec2.describe_regions()["Regions"]]

# Uncomment the following two lines to specify the regions you want to analyze
# regions = ['eu-west-1', 'us-west-1']

# Function to get inbound and outbound rules for a security group
def get_rules(sg, region):
ec2r = boto3.resource("ec2", region)
sgs = ec2r.security_groups.filter(
GroupIds=[
sg,
]
)
sgs = list(sgs.pages())
if not sgs or not sgs[0]:
sgs = next(sgs.pages(), [])
if not sgs:
return (None, None)
sg = sgs[0][0]
sg = sgs[0]
sg_inbound = sg.ip_permissions
sg_outbound = sg.ip_permissions_egress
return sg_inbound, sg_outbound


# Get available SGs
def get_sgs(instance):
sgs = instance.security_groups
# Function to get security groups for a resource
def get_sgs(resource, resource_type):
if resource_type == "instance":
sgs = resource.security_groups
elif resource_type == "load_balancer":
security_groups = resource.get("SecurityGroups")
if security_groups is not None:
sgs = [{"GroupId": sg} for sg in security_groups]
else:
sgs = []
elif resource_type == "endpoint":
groups = resource.get("Groups", [])
sgs = [{"GroupId": sg["GroupId"]} for sg in groups]
return sgs


# Get instance name
def get_name(instance):
if instance.tags:
for tag in instance.tags:
# Function to get a resource name
def get_name(resource, resource_type):
if resource_type == "instance" and getattr(resource, "tags", None):
for tag in resource.tags:
if tag["Key"] == "Name":
return tag["Value"]
else:
return "None"
elif resource_type == "load_balancer":
return resource.get("LoadBalancerName", "None")
elif resource_type == "endpoint":
return resource.get("VpcEndpointId", "None")
return "None"

# Function to get security group name
def get_sg_name(sg_id, region):
ec2r = boto3.resource("ec2", region)
sg = ec2r.SecurityGroup(sg_id)
return sg.group_name

# Main function
def main():
table = []
columns = [
"Resource Type",
"Region",
"Instance Name",
"Instance-ID",
"Resource Name",
"Resource-ID",
"SG-Name",
"SG-ID",
"Direction",
Expand All @@ -74,104 +93,82 @@ def main():
]
df = pd.DataFrame(table, columns=columns)
print("Collecting Security Groups information from every region....")

for region in regions:
ec2r = boto3.resource("ec2", region)
for instance in ec2r.instances.all():
inst_id = instance.id # get instance id
sgs = get_sgs(instance) # gets sg from instance
inst_name = get_name(instance) # gets the instance name
for sg in sgs:
sg_id = sg["GroupId"]
sg_name = sg["GroupName"]
rules_inbound = get_rules(sg_id, region)[0]
rules_outbound = get_rules(sg_id, region)[1]
for rule in rules_inbound:
rule_destination = inst_id
from_cidr = []
direction = "Inbound"
from_port_range = rule.get("FromPort", "any")
to_port_range = rule.get("ToPort", "any")
if from_port_range == to_port_range:
ports = from_port_range
else:
ports = str(from_port_range) + " - " + str(to_port_range)
if from_port_range == -1:
ports = "any"
protocol = rule["IpProtocol"]
if protocol == "-1":
protocol = "any"
for cidr in rule.get("IpRanges", []):
from_cidr.append(cidr["CidrIp"])
for cidrv6 in rule.get("Ipv6Ranges", []):
from_cidr.append(cidrv6["CidrIpv6"])
for source_sg in rule.get("UserIdGroupPairs", []):
from_cidr.append(source_sg["GroupId"])
for source_sg in rule.get("PrefixListIds", []):
from_cidr.append(source_sg["PrefixListId"])
if not from_cidr:
from_cidr.append("0.0.0.0/0")

df = df.append(
{
"Region": region,
"Instance Name": inst_name,
"Instance-ID": inst_id,
"SG-Name": sg_name,
"SG-ID": sg_id,
"Direction": direction,
"Source": from_cidr,
"Destination": rule_destination,
"Protocol": protocol,
"Ports": ports,
},
ignore_index=True,
)
for rule in rules_outbound:
rule_source = inst_id
to_cidr = []
direction = "Outbound"
protocol = rule["IpProtocol"]
from_port_range = rule.get("FromPort", "any")
to_port_range = rule.get("ToPort", "any")
if from_port_range == to_port_range:
ports = from_port_range
else:
ports = str(from_port_range) + " - " + str(to_port_range)
if from_port_range == -1:
ports = "any"
protocol = rule["IpProtocol"]
if protocol == "-1":
protocol = "any"
for cidr in rule.get("IpRanges", []):
to_cidr.append(cidr["CidrIp"])
for cidrv6 in rule.get("Ipv6Ranges", []):
to_cidr.append(cidrv6["CidrIpv6"])
for source_sg in rule.get("UserIdGroupPairs", []):
to_cidr.append(source_sg["GroupId"])
for source_sg in rule.get("PrefixListIds", []):
to_cidr.append(source_sg["PrefixListId"])
if not to_cidr:
to_cidr.append("0.0.0.0/0")
df = df.append(
{
"Region": region,
"Instance Name": inst_name,
"Instance-ID": inst_id,
"SG-Name": sg_name,
"SG-ID": sg_id,
"Direction": direction,
"Source": rule_source,
"Destination": to_cidr,
"Protocol": protocol,
"Ports": ports,
},
ignore_index=True,
)
elbv2 = boto3.client("elbv2", region)
instances = list(ec2r.instances.all())
load_balancers = elbv2.describe_load_balancers().get("LoadBalancers", [])
endpoints = ec2r.meta.client.describe_vpc_endpoints().get("VpcEndpoints", [])

resources = [
{"type": "instance", "data": instances},
{"type": "load_balancer", "data": load_balancers},
{"type": "endpoint", "data": endpoints},
]

for resource_type in resources:
for resource_data in resource_type["data"]:
if resource_type["type"] == "instance":
resource_id = resource_data.id
resource_name = get_name(resource_data, "instance")
elif resource_type["type"] == "load_balancer":
resource_id = resource_data.get("LoadBalancerArn")
resource_name = get_name(resource_data, "load_balancer")
elif resource_type["type"] == "endpoint":
resource_id = resource_data.get("VpcEndpointId")
resource_name = get_name(resource_data, "endpoint")

sgs = get_sgs(resource_data, resource_type["type"])

for sg in sgs:
sg_id = sg["GroupId"]
sg_name = get_sg_name(sg_id, region)
rules_inbound, rules_outbound = get_rules(sg_id, region)

# Process inbound rules and append to DataFrame
for rule in rules_inbound:
for ip_range in rule.get("IpRanges", []):
row = {
"Resource Type": resource_type["type"],
"Region": region,
"Resource Name": resource_name,
"Resource-ID": resource_id,
"SG-Name": sg_name,
"SG-ID": sg_id,
"Direction": "Inbound",
"Source": ip_range.get("CidrIp", ""),
"Destination": "",
"Protocol": rule.get("IpProtocol", ""),
"Ports": rule.get("FromPort", "N/A"),
}
row_df = pd.DataFrame([row], columns=columns)
df = pd.concat([df, row_df], ignore_index=True)

# Process outbound rules and append to DataFrame
for rule in rules_outbound:
for ip_range in rule.get("IpRanges", []):
row = {
"Resource Type": resource_type["type"],
"Region": region,
"Resource Name": resource_name,
"Resource-ID": resource_id,
"SG-Name": sg_name,
"SG-ID": sg_id,
"Direction": "Outbound",
"Source": "",
"Destination": ip_range.get("CidrIp", ""),
"Protocol": rule.get("IpProtocol", ""),
"Ports": rule.get("FromPort", "N/A"),
}
row_df = pd.DataFrame([row], columns=columns)
df = pd.concat([df, row_df], ignore_index=True)

# Save DataFrame to Excel file
time = datetime.datetime.now().strftime("%H-%M-%S_%d-%m-%Y")
file_name = "fw_policy-report-" + time + ".xlsx"
print(file_name + " has been created")
return df.to_excel(file_name)

df.to_excel(file_name)

if __name__ == "__main__":
main()
main()