AWS security group update script¶
Use this script to automatically update an AWS security group to allow ingress from Faculty on a set of ports.
Warning
This script will remove all other ingress rules from any security group it is applied to. We strongly recommend making a dedicated security group for access from Faculty, separate from other rules you may have configured.
Script source¶
"""
Change a security group's ingress rules to Faculty IP addresses.
"""
import argparse
from collections import namedtuple
import requests
import boto3
OtherSecurityGroup = namedtuple(
"OtherSecurityGroup",
[
"group_id",
"group_name",
"peering_status",
"user_id",
"vpc_id",
"vpc_peering_connection_id",
],
)
AccessRule = namedtuple(
"AccessRule",
[
"cidrs",
"ipv6_cidrs",
"other_security_groups",
"from_port",
"to_port",
"protocol",
],
)
def _add_entry(dictionary, key, value):
"""Add a value to a dictionary if the value is not None."""
if value is not None:
dictionary[key] = value
def rules_for_faculty(api_url, ports):
"""Generate a list of access rules for Faculty from the API.
Parameters
----------
api_url : str
The URL of the Faculty IP address API, e.g.
https://example.my.faculty.ai/api/cluster/ip-addresses
ports : list of int
The ports to allow access on from Faculty
Returns
-------
list of AccessRule
"""
response = requests.get(api_url)
response.raise_for_status()
ip_addresses = response.json()["ipAddresses"]
rules = []
for port in ports:
for ip in ip_addresses:
cidr = "{}/32".format(ip)
rules.append(
AccessRule(
(cidr,),
ipv6_cidrs=(),
other_security_groups=(),
from_port=port,
to_port=port,
protocol="tcp",
)
)
return rules
def extract_other_security_group(data):
return OtherSecurityGroup(
group_id=data.get("GroupId"),
group_name=data.get("GroupName"),
peering_status=data.get("PeeringStatus"),
user_id=data.get("UserId"),
vpc_id=data.get("VpcId"),
vpc_peering_connection_id=data.get("VpcPeeringConnectionId"),
)
def serialise_other_security_group(group):
data = {}
_add_entry(data, "GroupId", group.group_id)
_add_entry(data, "GroupName", group.group_name)
_add_entry(data, "PeeringStatus", group.peering_status)
_add_entry(data, "UserId", group.user_id)
_add_entry(data, "VpcId", group.vpc_id)
_add_entry(data, "VpcPeeringConnectionId", group.vpc_peering_connection_id)
return data
def current_ingress_rules(security_group):
"""Get the current ingress rules for a security group.
Parameters
----------
security_group : boto3.resources.factory.ec2.SecurityGroup
Returns
-------
list of AccessRule
"""
rules = []
for permission_set in security_group.ip_permissions:
protocol = permission_set["IpProtocol"]
from_port = permission_set.get("FromPort")
to_port = permission_set.get("ToPort")
cidrs = tuple(
ip_range["CidrIp"]
for ip_range in permission_set.get("IpRanges", [])
)
ipv6_cidrs = tuple(
ip_range["CidrIpv6"]
for ip_range in permission_set.get("Ipv6Ranges", [])
)
other_security_groups = tuple(
extract_other_security_group(group)
for group in permission_set.get("UserIdGroupPairs", [])
)
rules.append(
AccessRule(
cidrs,
ipv6_cidrs,
other_security_groups,
from_port,
to_port,
protocol,
)
)
return rules
def split_rule_sources(rules):
"""Split a sequence of access rules into sepearate sources.
Parameters
----------
rules : iterable of AccessRule
Yields
------
AccessRule
"""
for rule in rules:
for cidr in rule.cidrs:
yield AccessRule(
(cidr,), (), (), rule.from_port, rule.to_port, rule.protocol
)
for ipv6_cidr in rule.ipv6_cidrs:
yield AccessRule(
(),
(ipv6_cidr,),
(),
rule.from_port,
rule.to_port,
rule.protocol,
)
for group in rule.other_security_groups:
yield AccessRule(
(), (), (group,), rule.from_port, rule.to_port, rule.protocol
)
def generate_permission_set(rules):
"""Generate the payload for the AWS API describing a set of access rules.
Parameters
----------
rules : iterable of AccessRule
Returns
-------
list
The payload for the AWS API
"""
permissions = []
for rule in rules:
permission_set = {
"IpProtocol": rule.protocol,
"IpRanges": [{"CidrIp": cidr} for cidr in rule.cidrs],
"Ipv6Ranges": [{"CidrIpv6": cidr} for cidr in rule.ipv6_cidrs],
"UserIdGroupPairs": [
serialise_other_security_group(group)
for group in rule.other_security_groups
],
}
_add_entry(permission_set, "FromPort", rule.from_port)
_add_entry(permission_set, "ToPort", rule.to_port)
permissions.append(permission_set)
return permissions
def cli():
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
"faculty_api_url",
help="The URL of the Faculty API returning IP addresses",
)
parser.add_argument(
"security_group_id", help="The security group to update"
)
parser.add_argument(
"ports", type=int, nargs="+", help="The ports to allow access on"
)
parser.add_argument(
"--yes",
action="store_true",
help="Add this flag to actually apply the update",
)
args = parser.parse_args()
target = set(
split_rule_sources(
rules_for_faculty(args.faculty_api_url, args.ports)
)
)
EC2 = boto3.resource("ec2")
security_group = EC2.SecurityGroup(args.security_group_id)
current = set(split_rule_sources(current_ingress_rules(security_group)))
to_create = target - current
to_revoke = current - target
if not to_create and not to_revoke:
print("Security group up to date")
elif args.yes:
if to_revoke:
security_group.revoke_ingress(
IpPermissions=generate_permission_set(to_revoke)
)
print("Revoked {} security group rules".format(len(to_revoke)))
if to_create:
security_group.authorize_ingress(
IpPermissions=generate_permission_set(to_create)
)
print("Created {} security group rules".format(len(to_create)))
else:
if to_revoke:
print("Would revoke the following access rules:")
for rule in to_revoke:
print(" ", rule)
if to_create:
print("Would create the following access rules:")
for rule in to_create:
print(" ", rule)
if to_revoke or to_create:
print("Rerun with --yes to apply")
if __name__ == "__main__":
cli()
Usage¶
Run the script with --help
to get documentation about the command line
interface. Pass as arguments the URL of the IP address API on the relevant Faculty deployment, the security group
you want to update, and the ports you want to have open to ingress from
Faculty:
$ python update_faculty_aws_securitygroup.py \
https://example.my.faculty.ai/api/cluster/ip-addresses \
sg-00000000 5432 3306
By default, no changes are applied, but a summary of changes is instead printed
to the terminal. To apply the changes to the security group, run again adding
the --yes
flag.