update_faculty_aws_securitygroup.py

Use this script to automatically update an AWS security group to allow ingress from Faculty on a set of ports.

Warning

This script will remove all other ingress rules from any security group it is applied to. We strongly recommend making a dedicated security group for access from Faculty, separate from other rules you may have configured.

Script source

"""
Change a security group's ingress rules to Faculty IP addresses.
"""


import argparse
from collections import namedtuple

import requests
import boto3


# Immutable, hashable description of another security group referenced as a
# rule source. Each field holds the value read from the similarly named key
# (GroupId, GroupName, ...) of an AWS group pair, or None when absent.
OtherSecurityGroup = namedtuple(
    "OtherSecurityGroup",
    "group_id group_name peering_status user_id vpc_id "
    "vpc_peering_connection_id",
)
# Immutable, hashable description of one ingress rule: the sources allowed in
# (IPv4 CIDRs, IPv6 CIDRs, other security groups) plus the port range and
# protocol they are allowed on.
AccessRule = namedtuple(
    "AccessRule",
    "cidrs ipv6_cidrs other_security_groups from_port to_port protocol",
)


def _add_entry(dictionary, key, value):
    """Store ``value`` under ``key`` in ``dictionary`` unless it is None.

    The dictionary is modified in place; nothing is returned. Other falsy
    values (0, "", empty containers) are still stored.
    """
    if value is None:
        return
    dictionary[key] = value


def rules_for_faculty(api_url, ports, timeout=30):
    """Generate a list of access rules for Faculty from the API.

    One rule is produced for every combination of port and IP address
    returned by the API, ordered by port and then by IP address.

    Parameters
    ----------
    api_url : str
        The URL of the Faculty IP address API, e.g.
        https://example.my.faculty.ai/api/cluster/ip-addresses
    ports : list of int
        The ports to allow access on from Faculty
    timeout : float or tuple, optional
        Timeout in seconds passed to ``requests.get`` (default 30), so an
        unresponsive API cannot hang the script indefinitely.

    Returns
    -------
    list of AccessRule

    Raises
    ------
    requests.exceptions.RequestException
        If the request fails, times out, or returns an HTTP error status.
    KeyError
        If the JSON response has no ``ipAddresses`` entry.
    """

    # requests applies no timeout unless one is given explicitly.
    response = requests.get(api_url, timeout=timeout)
    response.raise_for_status()
    ip_addresses = response.json()["ipAddresses"]

    # NOTE(review): the "/32" suffix assumes the API returns bare IPv4
    # addresses - confirm against the API's response format.
    return [
        AccessRule(
            cidrs=("{}/32".format(ip),),
            ipv6_cidrs=(),
            other_security_groups=(),
            from_port=port,
            to_port=port,
            protocol="tcp",
        )
        for port in ports
        for ip in ip_addresses
    ]


def extract_other_security_group(data):
    """Build an OtherSecurityGroup from a dictionary describing a group.

    Parameters
    ----------
    data : dict
        Mapping that may contain the keys ``GroupId``, ``GroupName``,
        ``PeeringStatus``, ``UserId``, ``VpcId`` and
        ``VpcPeeringConnectionId``.

    Returns
    -------
    OtherSecurityGroup
        Fields whose key is missing from ``data`` are set to None.
    """
    field_to_key = (
        ("group_id", "GroupId"),
        ("group_name", "GroupName"),
        ("peering_status", "PeeringStatus"),
        ("user_id", "UserId"),
        ("vpc_id", "VpcId"),
        ("vpc_peering_connection_id", "VpcPeeringConnectionId"),
    )
    values = {field: data.get(key) for field, key in field_to_key}
    return OtherSecurityGroup(**values)


def serialise_other_security_group(group):
    """Convert an OtherSecurityGroup back into a dictionary.

    Parameters
    ----------
    group : OtherSecurityGroup

    Returns
    -------
    dict
        Keyed by ``GroupId``, ``GroupName``, ``PeeringStatus``, ``UserId``,
        ``VpcId`` and ``VpcPeeringConnectionId``; fields that are None are
        left out entirely.
    """
    candidates = (
        ("GroupId", group.group_id),
        ("GroupName", group.group_name),
        ("PeeringStatus", group.peering_status),
        ("UserId", group.user_id),
        ("VpcId", group.vpc_id),
        ("VpcPeeringConnectionId", group.vpc_peering_connection_id),
    )
    return {key: value for key, value in candidates if value is not None}


def _rule_from_permission(permission):
    """Translate one entry of ``ip_permissions`` into an AccessRule."""
    protocol = permission["IpProtocol"]
    # Port bounds are optional in a permission entry, so they may be None.
    first_port = permission.get("FromPort")
    last_port = permission.get("ToPort")

    ipv4_sources = tuple(
        entry["CidrIp"] for entry in permission.get("IpRanges", [])
    )
    ipv6_sources = tuple(
        entry["CidrIpv6"] for entry in permission.get("Ipv6Ranges", [])
    )
    group_sources = tuple(
        extract_other_security_group(pair)
        for pair in permission.get("UserIdGroupPairs", [])
    )

    return AccessRule(
        cidrs=ipv4_sources,
        ipv6_cidrs=ipv6_sources,
        other_security_groups=group_sources,
        from_port=first_port,
        to_port=last_port,
        protocol=protocol,
    )


def current_ingress_rules(security_group):
    """Read the ingress rules currently configured on a security group.

    Only the ``IpRanges``, ``Ipv6Ranges`` and ``UserIdGroupPairs`` sources of
    each permission are read; any other keys in a permission are ignored.

    Parameters
    ----------
    security_group : boto3.resources.factory.ec2.SecurityGroup

    Returns
    -------
    list of AccessRule
        One rule per entry in ``security_group.ip_permissions``.
    """
    return [
        _rule_from_permission(permission)
        for permission in security_group.ip_permissions
    ]


def split_rule_sources(rules):
    """Split access rules so that each yielded rule has exactly one source.

    For every input rule, one rule is yielded per IPv4 CIDR, then one per
    IPv6 CIDR, then one per other security group, each keeping the original
    port range and protocol. A rule with no sources yields nothing.

    Parameters
    ----------
    rules : iterable of AccessRule

    Yields
    ------
    AccessRule
    """
    for original in rules:
        for ipv4 in original.cidrs:
            yield AccessRule(
                cidrs=(ipv4,),
                ipv6_cidrs=(),
                other_security_groups=(),
                from_port=original.from_port,
                to_port=original.to_port,
                protocol=original.protocol,
            )
        for ipv6 in original.ipv6_cidrs:
            yield AccessRule(
                cidrs=(),
                ipv6_cidrs=(ipv6,),
                other_security_groups=(),
                from_port=original.from_port,
                to_port=original.to_port,
                protocol=original.protocol,
            )
        for other_group in original.other_security_groups:
            yield AccessRule(
                cidrs=(),
                ipv6_cidrs=(),
                other_security_groups=(other_group,),
                from_port=original.from_port,
                to_port=original.to_port,
                protocol=original.protocol,
            )


def generate_permission_set(rules):
    """Build the ``IpPermissions`` payload describing a set of access rules.

    Parameters
    ----------
    rules : iterable of AccessRule

    Returns
    -------
    list
        One dictionary per rule, in iteration order, with the keys
        ``IpProtocol``, ``IpRanges``, ``Ipv6Ranges`` and ``UserIdGroupPairs``;
        ``FromPort`` and ``ToPort`` are included only when not None.
    """

    def to_permission(rule):
        ipv4_ranges = [{"CidrIp": cidr} for cidr in rule.cidrs]
        ipv6_ranges = [{"CidrIpv6": cidr} for cidr in rule.ipv6_cidrs]
        group_pairs = [
            serialise_other_security_group(group)
            for group in rule.other_security_groups
        ]
        permission = {
            "IpProtocol": rule.protocol,
            "IpRanges": ipv4_ranges,
            "Ipv6Ranges": ipv6_ranges,
            "UserIdGroupPairs": group_pairs,
        }
        # Port bounds may be None, in which case the keys are omitted.
        if rule.from_port is not None:
            permission["FromPort"] = rule.from_port
        if rule.to_port is not None:
            permission["ToPort"] = rule.to_port
        return permission

    return [to_permission(rule) for rule in rules]


def cli():
    """Command line entry point: sync a security group with Faculty's IPs.

    Compares the rules the security group should have (one per Faculty IP
    address and requested port) with the rules it currently has. Without
    ``--yes`` the differences are only printed; with ``--yes`` surplus rules
    are revoked and then missing rules are authorized.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "faculty_api_url",
        help="The URL of the Faculty API returning IP addresses",
    )
    parser.add_argument(
        "security_group_id", help="The security group to update"
    )
    parser.add_argument(
        "ports", type=int, nargs="+", help="The ports to allow access on"
    )
    parser.add_argument(
        "--yes",
        action="store_true",
        help="Add this flag to actually apply the update",
    )
    args = parser.parse_args()

    # Split both sides into single-source rules so they can be compared as
    # sets regardless of how the sources are grouped.
    faculty_rules = rules_for_faculty(args.faculty_api_url, args.ports)
    desired = set(split_rule_sources(faculty_rules))

    ec2 = boto3.resource("ec2")
    security_group = ec2.SecurityGroup(args.security_group_id)
    existing = set(split_rule_sources(current_ingress_rules(security_group)))

    missing = desired - existing
    surplus = existing - desired

    if not (missing or surplus):
        print("Security group up to date")
        return

    if not args.yes:
        # Dry run: report what would change without touching the group.
        pending_changes = (
            ("Would revoke the following access rules:", surplus),
            ("Would create the following access rules:", missing),
        )
        for heading, pending in pending_changes:
            if pending:
                print(heading)
                for rule in pending:
                    print("  ", rule)
        print("Rerun with --yes to apply")
        return

    if surplus:
        security_group.revoke_ingress(
            IpPermissions=generate_permission_set(surplus)
        )
        print("Revoked {} security group rules".format(len(surplus)))
    if missing:
        security_group.authorize_ingress(
            IpPermissions=generate_permission_set(missing)
        )
        print("Created {} security group rules".format(len(missing)))

# Run the command line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    cli()

Usage

Run the script with --help to get documentation about the command line interface. Pass as arguments the URL of the IP address API on the relevant Faculty deployment, the security group you want to update, and the ports you want to have open to ingress from Faculty:

$ python update_faculty_aws_securitygroup.py \
    https://example.my.faculty.ai/api/cluster/ip-addresses \
    sg-00000000 5432 3306

By default, no changes are applied; instead, a summary of the changes that would be made is printed to the terminal. To apply the changes to the security group, run the script again with the --yes flag.