Best Python code snippet using localstack_python
ec2_vpc_nacl.py
Source:ec2_vpc_nacl.py  
# NOTE: this listing is an excerpt. It opens part-way through a function whose
# `def` line is not shown, so that fragment is preserved here as reference text
# only (re-indented, otherwise untouched) instead of being rewritten.
#
# ...
#     if not subnets:
#         default_nacl_id = find_default_vpc_nacl(vpc_id, client, module)[0]
#         subnets = find_subnet_ids_by_nacl_id(nacl_id, client, module)
#         if subnets:
#             replace_network_acl_association(default_nacl_id, subnets, client, module)
#             changed = True
#             return changed
#         changed = False
#         return changed
#     subs_added = subnets_added(nacl_id, subnets, client, module)
#     if subs_added:
#         replace_network_acl_association(nacl_id, subs_added, client, module)
#         changed = True
#     subs_removed = subnets_removed(nacl_id, subnets, client, module)
#     if subs_removed:
#         default_nacl_id = find_default_vpc_nacl(vpc_id, client, module)[0]
#         replace_network_acl_association(default_nacl_id, subs_removed, client, module)
#         changed = True
#     return changed


def nacls_changed(nacl, client, module):
    """Reconcile the ACL's ingress/egress entries with the module parameters.

    The ACL id is taken from ``nacl``; the entries themselves are re-read via
    ``describe_network_acl``. Entries equal to ``DEFAULT_EGRESS`` /
    ``DEFAULT_INGRESS`` are ignored.

    Returns True when ``rules_changed`` reported a change for either direction.
    """
    nacl_id = nacl['NetworkAcls'][0]['NetworkAclId']
    current = describe_network_acl(client, module)
    entries = current['NetworkAcls'][0]['Entries']
    egress = [e for e in entries if e['Egress'] is True and e != DEFAULT_EGRESS]
    ingress = [e for e in entries if e['Egress'] is False and e != DEFAULT_INGRESS]
    # Both directions must always be reconciled, so evaluate both calls before
    # combining the results (no short-circuiting).
    egress_changed = rules_changed(
        egress, module.params.get('egress'), True, nacl_id, client, module)
    ingress_changed = rules_changed(
        ingress, module.params.get('ingress'), False, nacl_id, client, module)
    return egress_changed or ingress_changed


def tags_changed(nacl_id, client, module):
    """Re-create the ACL's tags when they differ from the requested ones.

    The requested tags are ``module.params['tags']`` (values stringified) plus
    a ``Name`` tag taken from ``module.params['name']``.

    Returns True when the tags were rewritten, False otherwise (including when
    the ACL lookup returns nothing).
    """
    # Work on a copy: adding 'Name' must not leak into module.params['tags'].
    desired = dict(module.params.get('tags') or {})
    desired['Name'] = module.params.get('name')
    desired = {key: str(value) for key, value in desired.items()}

    nacl = find_acl_by_id(nacl_id, client, module)
    if not nacl['NetworkAcls']:
        return False

    # Compare as key -> value mappings so that swapped keys/values are not
    # mistaken for a match.
    current = {t['Key']: t['Value'] for t in nacl['NetworkAcls'][0].get('Tags', [])}
    if current == desired:
        return False

    # create_tags() removes the existing tags before writing the new set.
    create_tags(nacl_id, client, module)
    return True


def rules_changed(aws_rules, param_rules, Egress, nacl_id, client, module):
    """Delete entries that are no longer requested and create the missing ones.

    aws_rules   -- entries currently on the ACL for this direction
    param_rules -- requested rules in module-parameter (list) form; may be None
    Egress      -- True for egress rules, False for ingress rules

    Returns True when at least one entry was deleted or created.
    """
    rules = [process_rule_entry(entry, Egress) for entry in (param_rules or [])]

    removed_rules = [x for x in aws_rules if x not in rules]
    for rule in removed_rules:
        params = {
            'NetworkAclId': nacl_id,
            'RuleNumber': rule['RuleNumber'],
            'Egress': Egress,
        }
        delete_network_acl_entry(params, client, module)

    added_rules = [x for x in rules if x not in aws_rules]
    for rule in added_rules:
        rule['NetworkAclId'] = nacl_id
        create_network_acl_entry(rule, client, module)

    return bool(removed_rules or added_rules)


def process_rule_entry(entry, Egress):
    """Translate one positional rule from the module parameters into a dict.

    Positions read from ``entry``: 0 rule number, 1 protocol name (looked up in
    ``PROTOCOL_NUMBERS``), 2 rule action, 3 CIDR block, 4/5 ICMP type/code,
    6/7 port range from/to.
    """
    params = {
        'RuleNumber': entry[0],
        'Protocol': str(PROTOCOL_NUMBERS[entry[1]]),
        'RuleAction': entry[2],
        'Egress': Egress,
        'CidrBlock': entry[3],
    }
    if icmp_present(entry):
        params['IcmpTypeCode'] = {"Type": int(entry[4]), "Code": int(entry[5])}
    elif entry[6] or entry[7]:
        params['PortRange'] = {"From": entry[6], 'To': entry[7]}
    return params


def restore_default_associations(assoc_ids, default_nacl_id, client, module):
    """Point every association in ``assoc_ids`` at ``default_nacl_id[0]``.

    Returns True when there was at least one association to move, else False.
    """
    if not assoc_ids:
        return False
    for assoc_id in assoc_ids:
        params = {'NetworkAclId': default_nacl_id[0], 'AssociationId': assoc_id}
        restore_default_acl_association(params, client, module)
    return True


def construct_acl_entries(nacl, client, module):
    """Create all requested ingress entries, then all egress entries."""
    nacl_id = nacl['NetworkAcl']['NetworkAclId']
    for entry in module.params.get('ingress') or []:
        params = process_rule_entry(entry, Egress=False)
        params['NetworkAclId'] = nacl_id
        create_network_acl_entry(params, client, module)
    for rule in module.params.get('egress') or []:
        params = process_rule_entry(rule, Egress=True)
        params['NetworkAclId'] = nacl_id
        create_network_acl_entry(params, client, module)


## Module invocations
def setup_network_acl(client, module):
    """Create the named ACL, or reconcile it when it already exists.

    Returns a ``(changed, nacl_id)`` tuple.
    """
    nacl = describe_network_acl(client, module)
    if not nacl['NetworkAcls']:
        nacl = create_network_acl(module.params.get('vpc_id'), client, module)
        nacl_id = nacl['NetworkAcl']['NetworkAclId']
        create_tags(nacl_id, client, module)
        subnets = subnets_to_associate(nacl, client, module)
        replace_network_acl_association(nacl_id, subnets, client, module)
        construct_acl_entries(nacl, client, module)
        return True, nacl_id

    nacl_id = nacl['NetworkAcls'][0]['NetworkAclId']
    # All three reconcilers must run, in this order, regardless of the others'
    # results.
    results = (
        subnets_changed(nacl, client, module),
        nacls_changed(nacl, client, module),
        tags_changed(nacl_id, client, module),
    )
    changed = any(result is True for result in results)
    return changed, nacl_id


def remove_network_acl(client, module):
    """Delete the named ACL after moving its associations to the default ACL.

    Returns ``(changed, result)`` where ``result`` maps the ACL id (or the VPC
    id when no default ACL was found) to a status message.
    """
    changed = False
    result = dict()
    vpc_id = module.params.get('vpc_id')
    nacl = describe_network_acl(client, module)
    if not nacl['NetworkAcls']:
        return changed, result

    nacl_id = nacl['NetworkAcls'][0]['NetworkAclId']
    associations = nacl['NetworkAcls'][0]['Associations']
    assoc_ids = [a['NetworkAclAssociationId'] for a in associations]
    default_nacl_id = find_default_vpc_nacl(vpc_id, client, module)
    if not default_nacl_id:
        result = {vpc_id: "Default NACL ID not found - Check the VPC ID"}
        return changed, result

    # A no-op when the ACL has no associations; either way the ACL is deleted.
    restore_default_associations(assoc_ids, default_nacl_id, client, module)
    delete_network_acl(nacl_id, client, module)
    result[nacl_id] = "Successfully deleted"
    return True, result


#Boto3 client methods
def create_network_acl(vpc_id, client, module):
    """Create a network ACL in ``vpc_id`` and return the API response."""
    try:
        nacl = client.create_network_acl(VpcId=vpc_id)
    except botocore.exceptions.ClientError as e:
        module.fail_json(msg=str(e))
    return nacl


def create_network_acl_entry(params, client, module):
    """Create one ACL entry from ``params`` and return the API response."""
    try:
        result = client.create_network_acl_entry(**params)
    except botocore.exceptions.ClientError as e:
        module.fail_json(msg=str(e))
    return result


def create_tags(nacl_id, client, module):
    """Replace the ACL's tags with the set produced by ``load_tags(module)``."""
    try:
        delete_tags(nacl_id, client, module)
        client.create_tags(Resources=[nacl_id], Tags=load_tags(module))
    except botocore.exceptions.ClientError as e:
        module.fail_json(msg=str(e))


def delete_network_acl(nacl_id, client, module):
    """Delete the network ACL ``nacl_id``."""
    try:
        client.delete_network_acl(NetworkAclId=nacl_id)
    except botocore.exceptions.ClientError as e:
        module.fail_json(msg=str(e))


def delete_network_acl_entry(params, client, module):
    """Delete the ACL entry identified by ``params``."""
    try:
        client.delete_network_acl_entry(**params)
    except botocore.exceptions.ClientError as e:
        module.fail_json(msg=str(e))


def delete_tags(nacl_id, client, module):
    """Call ``delete_tags`` for the ACL without naming specific tags."""
    try:
        client.delete_tags(Resources=[nacl_id])
    except botocore.exceptions.ClientError as e:
        module.fail_json(msg=str(e))


def describe_acl_associations(subnets, client, module):
    """Return the association ids that bind ``subnets`` to their current ACLs.

    Every ACL in the response is inspected, so subnets attached to different
    ACLs are all covered and an empty response yields an empty list.
    """
    if not subnets:
        return []
    try:
        results = client.describe_network_acls(Filters=[
            {'Name': 'association.subnet-id', 'Values': subnets}
        ])
    except botocore.exceptions.ClientError as e:
        module.fail_json(msg=str(e))
    wanted = set(subnets)
    return [
        association['NetworkAclAssociationId']
        for acl in results['NetworkAcls']
        for association in acl['Associations']
        if association['SubnetId'] in wanted
    ]


def describe_network_acl(client, module):
    """Look up ACLs whose ``Name`` tag equals ``module.params['name']``."""
    try:
        nacl = client.describe_network_acls(Filters=[
            {'Name': 'tag:Name', 'Values': [module.params.get('name')]}
        ])
    except botocore.exceptions.ClientError as e:
        module.fail_json(msg=str(e))
    return nacl


def find_acl_by_id(nacl_id, client, module):
    """Return the ``describe_network_acls`` response for ``nacl_id``."""
    try:
        return client.describe_network_acls(NetworkAclIds=[nacl_id])
    except botocore.exceptions.ClientError as e:
        module.fail_json(msg=str(e))


def find_default_vpc_nacl(vpc_id, client, module):
    """Return a list with the ids of the ACLs in ``vpc_id`` flagged IsDefault."""
    try:
        response = client.describe_network_acls(Filters=[
            {'Name': 'vpc-id', 'Values': [vpc_id]}])
    except botocore.exceptions.ClientError as e:
        module.fail_json(msg=str(e))
    nacls = response['NetworkAcls']
    return [n['NetworkAclId'] for n in nacls if n['IsDefault']]


def find_subnet_ids_by_nacl_id(nacl_id, client, module):
    """Return the subnet ids associated with the ACL ``nacl_id``."""
    try:
        results = client.describe_network_acls(Filters=[
            {'Name': 'association.network-acl-id', 'Values': [nacl_id]}
        ])
    except botocore.exceptions.ClientError as e:
        module.fail_json(msg=str(e))
    acls = results['NetworkAcls']
    if not acls:
        return []
    return [a['SubnetId'] for a in acls[0]['Associations'] if a['SubnetId']]


def replace_network_acl_association(nacl_id, subnets, client, module):
    """Re-point the current associations of ``subnets`` at ACL ``nacl_id``."""
    for association_id in describe_acl_associations(subnets, client, module):
        try:
            client.replace_network_acl_association(
                NetworkAclId=nacl_id, AssociationId=association_id)
        except botocore.exceptions.ClientError as e:
            module.fail_json(msg=str(e))


def replace_network_acl_entry(entries, Egress, nacl_id, client, module):
    """Replace each entry in ``entries`` on ACL ``nacl_id``.

    ``Egress`` is accepted for signature compatibility but is not read here;
    the direction comes from each entry itself.
    """
    for entry in entries:
        # Copy so the caller's entry dicts are not modified.
        params = dict(entry, NetworkAclId=nacl_id)
        try:
            client.replace_network_acl_entry(**params)
        except botocore.exceptions.ClientError as e:
            module.fail_json(msg=str(e))


def restore_default_acl_association(params, client, module):
    """Call ``replace_network_acl_association`` with the prepared ``params``."""
    try:
        client.replace_network_acl_association(**params)
    except botocore.exceptions.ClientError as e:
        module.fail_json(msg=str(e))


# NOTE: the excerpt stops part-way through the next function. Because the rest
# of its body is not shown, it is preserved here as reference text only.
#
# def subnets_to_associate(nacl, client, module):
#     params = list(module.params.get('subnets'))
#     if not params:
#         return []
#     if params[0].startswith("subnet-"):
#         try:
#             subnets = client.describe_subnets(Filters=[
#                 {'Name': 'subnet-id', 'Values': params}])
#         except botocore.exceptions.ClientError as e:
#             module.fail_json(msg=str(e))
#     else:
#         try:
# ...
#
# Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
