How to use replace_network_acl_association method in localstack

Best Python code snippet using localstack_python

ec2_vpc_nacl.py

Source:ec2_vpc_nacl.py Github

copy

Full Screen

...166 if not subnets:167 default_nacl_id = find_default_vpc_nacl(vpc_id, client, module)[0]168 subnets = find_subnet_ids_by_nacl_id(nacl_id, client, module)169 if subnets:170 replace_network_acl_association(default_nacl_id, subnets, client, module)171 changed = True172 return changed173 changed = False174 return changed175 subs_added = subnets_added(nacl_id, subnets, client, module)176 if subs_added:177 replace_network_acl_association(nacl_id, subs_added, client, module)178 changed = True179 subs_removed = subnets_removed(nacl_id, subnets, client, module)180 if subs_removed:181 default_nacl_id = find_default_vpc_nacl(vpc_id, client, module)[0]182 replace_network_acl_association(default_nacl_id, subs_removed, client, module)183 changed = True184 return changed185def nacls_changed(nacl, client, module):186 changed = False187 params = dict()188 params['egress'] = module.params.get('egress')189 params['ingress'] = module.params.get('ingress')190 nacl_id = nacl['NetworkAcls'][0]['NetworkAclId']191 nacl = describe_network_acl(client, module)192 entries = nacl['NetworkAcls'][0]['Entries']193 tmp_egress = [entry for entry in entries if entry['Egress'] is True and DEFAULT_EGRESS !=entry]194 tmp_ingress = [entry for entry in entries if entry['Egress'] is False]195 egress = [rule for rule in tmp_egress if DEFAULT_EGRESS != rule]196 ingress = [rule for rule in tmp_ingress if DEFAULT_INGRESS != rule]197 if rules_changed(egress, params['egress'], True, nacl_id, client, module):198 changed = True199 if rules_changed(ingress, params['ingress'], False, nacl_id, client, module):200 changed = True201 return changed202def tags_changed(nacl_id, client, module):203 changed = False204 tags = dict()205 if module.params.get('tags'):206 tags = module.params.get('tags')207 tags['Name'] = module.params.get('name')208 nacl = find_acl_by_id(nacl_id, client, module)209 if nacl['NetworkAcls']:210 nacl_values = [t.values() for t in nacl['NetworkAcls'][0]['Tags']]211 nacl_tags = [item for sublist in nacl_values for item in sublist]212 tag_values = [[key, str(value)] for key, value in tags.iteritems()]213 tags = [item for sublist in tag_values for item in sublist]214 if sorted(nacl_tags) == sorted(tags):215 changed = False216 return changed217 else:218 delete_tags(nacl_id, client, module)219 create_tags(nacl_id, client, module)220 changed = True221 return changed222 return changed223def rules_changed(aws_rules, param_rules, Egress, nacl_id, client, module):224 changed = False225 rules = list()226 for entry in param_rules:227 rules.append(process_rule_entry(entry, Egress))228 if rules == aws_rules:229 return changed230 else:231 removed_rules = [x for x in aws_rules if x not in rules]232 if removed_rules:233 params = dict()234 for rule in removed_rules:235 params['NetworkAclId'] = nacl_id236 params['RuleNumber'] = rule['RuleNumber']237 params['Egress'] = Egress238 delete_network_acl_entry(params, client, module)239 changed = True240 added_rules = [x for x in rules if x not in aws_rules]241 if added_rules:242 for rule in added_rules:243 rule['NetworkAclId'] = nacl_id244 create_network_acl_entry(rule, client, module)245 changed = True246 return changed247def process_rule_entry(entry, Egress):248 params = dict()249 params['RuleNumber'] = entry[0]250 params['Protocol'] = str(PROTOCOL_NUMBERS[entry[1]])251 params['RuleAction'] = entry[2]252 params['Egress'] = Egress253 params['CidrBlock'] = entry[3]254 if icmp_present(entry):255 params['IcmpTypeCode'] = {"Type": int(entry[4]), "Code": int(entry[5])}256 else:257 if entry[6] or entry[7]:258 params['PortRange'] = {"From": entry[6], 'To': entry[7]}259 return params260def restore_default_associations(assoc_ids, default_nacl_id, client, module):261 if assoc_ids:262 params = dict()263 params['NetworkAclId'] = default_nacl_id[0]264 for assoc_id in assoc_ids:265 params['AssociationId'] = assoc_id266 restore_default_acl_association(params, client, module)267 return True268def construct_acl_entries(nacl, client, module):269 for entry in module.params.get('ingress'):270 params = process_rule_entry(entry, Egress=False)271 params['NetworkAclId'] = nacl['NetworkAcl']['NetworkAclId']272 create_network_acl_entry(params, client, module)273 for rule in module.params.get('egress'):274 params = process_rule_entry(rule, Egress=True)275 params['NetworkAclId'] = nacl['NetworkAcl']['NetworkAclId']276 create_network_acl_entry(params, client, module)277## Module invocations278def setup_network_acl(client, module):279 changed = False280 nacl = describe_network_acl(client, module)281 if not nacl['NetworkAcls']:282 nacl = create_network_acl(module.params.get('vpc_id'), client, module)283 nacl_id = nacl['NetworkAcl']['NetworkAclId']284 create_tags(nacl_id, client, module)285 subnets = subnets_to_associate(nacl, client, module)286 replace_network_acl_association(nacl_id, subnets, client, module)287 construct_acl_entries(nacl, client, module)288 changed = True289 return(changed, nacl['NetworkAcl']['NetworkAclId'])290 else:291 changed = False292 nacl_id = nacl['NetworkAcls'][0]['NetworkAclId']293 subnet_result = subnets_changed(nacl, client, module)294 nacl_result = nacls_changed(nacl, client, module)295 tag_result = tags_changed(nacl_id, client, module)296 if subnet_result is True or nacl_result is True or tag_result is True:297 changed = True298 return(changed, nacl_id)299 return (changed, nacl_id)300def remove_network_acl(client, module):301 changed = False302 result = dict()303 vpc_id = module.params.get('vpc_id')304 nacl = describe_network_acl(client, module)305 if nacl['NetworkAcls']:306 nacl_id = nacl['NetworkAcls'][0]['NetworkAclId']307 associations = nacl['NetworkAcls'][0]['Associations']308 assoc_ids = [a['NetworkAclAssociationId'] for a in associations]309 default_nacl_id = find_default_vpc_nacl(vpc_id, client, module)310 if not default_nacl_id:311 result = {vpc_id: "Default NACL ID not found - Check the VPC ID"}312 return changed, result313 if restore_default_associations(assoc_ids, default_nacl_id, client, module):314 delete_network_acl(nacl_id, client, module)315 changed = True316 result[nacl_id] = "Successfully deleted"317 return changed, result318 if not assoc_ids: 319 delete_network_acl(nacl_id, client, module)320 changed = True321 result[nacl_id] = "Successfully deleted"322 return changed, result 323 return changed, result324#Boto3 client methods325def create_network_acl(vpc_id, client, module):326 try:327 nacl = client.create_network_acl(VpcId=vpc_id)328 except botocore.exceptions.ClientError as e:329 module.fail_json(msg=str(e))330 return nacl331def create_network_acl_entry(params, client, module):332 try:333 result = client.create_network_acl_entry(**params)334 except botocore.exceptions.ClientError as e:335 module.fail_json(msg=str(e))336 return result337def create_tags(nacl_id, client, module):338 try:339 delete_tags(nacl_id, client, module)340 client.create_tags(Resources=[nacl_id], Tags=load_tags(module))341 except botocore.exceptions.ClientError as e:342 module.fail_json(msg=str(e))343def delete_network_acl(nacl_id, client, module):344 try:345 client.delete_network_acl(NetworkAclId=nacl_id)346 except botocore.exceptions.ClientError as e:347 module.fail_json(msg=str(e))348def delete_network_acl_entry(params, client, module):349 try:350 client.delete_network_acl_entry(**params)351 except botocore.exceptions.ClientError as e:352 module.fail_json(msg=str(e))353def delete_tags(nacl_id, client, module):354 try:355 client.delete_tags(Resources=[nacl_id])356 except botocore.exceptions.ClientError as e:357 module.fail_json(msg=str(e))358def describe_acl_associations(subnets, client, module):359 if not subnets:360 return []361 try:362 results = client.describe_network_acls(Filters=[363 {'Name': 'association.subnet-id', 'Values': subnets}364 ])365 except botocore.exceptions.ClientError as e:366 module.fail_json(msg=str(e))367 associations = results['NetworkAcls'][0]['Associations']368 return [a['NetworkAclAssociationId'] for a in associations if a['SubnetId'] in subnets]369def describe_network_acl(client, module):370 try:371 nacl = client.describe_network_acls(Filters=[372 {'Name': 'tag:Name', 'Values': [module.params.get('name')]}373 ])374 except botocore.exceptions.ClientError as e:375 module.fail_json(msg=str(e))376 return nacl377def find_acl_by_id(nacl_id, client, module):378 try:379 return client.describe_network_acls(NetworkAclIds=[nacl_id])380 except botocore.exceptions.ClientError as e:381 module.fail_json(msg=str(e))382def find_default_vpc_nacl(vpc_id, client, module):383 try:384 response = client.describe_network_acls(Filters=[385 {'Name': 'vpc-id', 'Values': [vpc_id]}])386 except botocore.exceptions.ClientError as e:387 module.fail_json(msg=str(e))388 nacls = response['NetworkAcls']389 return [n['NetworkAclId'] for n in nacls if n['IsDefault'] == True]390def find_subnet_ids_by_nacl_id(nacl_id, client, module):391 try:392 results = client.describe_network_acls(Filters=[393 {'Name': 'association.network-acl-id', 'Values': [nacl_id]}394 ])395 except botocore.exceptions.ClientError as e:396 module.fail_json(msg=str(e))397 if results['NetworkAcls']:398 associations = results['NetworkAcls'][0]['Associations']399 return [s['SubnetId'] for s in associations if s['SubnetId']]400 else:401 return []402def replace_network_acl_association(nacl_id, subnets, client, module):403 params = dict()404 params['NetworkAclId'] = nacl_id405 for association in describe_acl_associations(subnets, client, module):406 params['AssociationId'] = association407 try:408 client.replace_network_acl_association(**params)409 except botocore.exceptions.ClientError as e:410 module.fail_json(msg=str(e))411def replace_network_acl_entry(entries, Egress, nacl_id, client, module):412 params = dict()413 for entry in entries:414 params = entry415 params['NetworkAclId'] = nacl_id416 try:417 client.replace_network_acl_entry(**params)418 except botocore.exceptions.ClientError as e:419 module.fail_json(msg=str(e))420def restore_default_acl_association(params, client, module):421 try:422 client.replace_network_acl_association(**params)423 except botocore.exceptions.ClientError as e:424 module.fail_json(msg=str(e))425def subnets_to_associate(nacl, client, module):426 params = list(module.params.get('subnets'))427 if not params:428 return []429 if params[0].startswith("subnet-"):430 try:431 subnets = client.describe_subnets(Filters=[432 {'Name': 'subnet-id', 'Values': params}])433 except botocore.exceptions.ClientError as e:434 module.fail_json(msg=str(e))435 else:436 try:...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful