How to use the resolve_endpoints method in localstack

Best Python code snippet using localstack_python

intelliment.py

Source:intelliment.py Github

copy

Full Screen

...54 "services": resolve_services(policy)55 }],56 "tags" : policy["tags"] if "tags" in policy else []57 }58 policy_endpoints = resolve_endpoints(scenario_id, policy, internet)59 if is_aws_related(policy_endpoints):60 create_aws_related_requirement(scenario_id, requirement, policy_endpoints)61 else:62 tags = ["ansible"]63 create_requirement(scenario_id, requirement, tags, policy_endpoints["source"], policy_endpoints["source_type"], policy_endpoints["destination"], policy_endpoints["destination_type"])64""" 65 Creates AWS related requirement solving its security group or network ACL related 66"""67def create_aws_related_requirement(scenario_id, requirement, policy_endpoints):68 69 aws = get_aws_fields(policy_endpoints)70 server = request + scenario_id + "/objects?types=objects_group&name=" + aws["aws_namefield"]71 response = requests.get(server, headers = headers)72 data = response.json()["data"]73 74 for obj in data:75 obj_id = obj["id"]76 if "objects" in obj.keys():77 for net in obj["objects"]:78 if net["id"] == aws["aws_related_element"]:79 if aws["aws_related_element_type"] == "source":80 create_requirement(scenario_id, requirement, aws["aws_tag"], obj_id, "id", policy_endpoints["destination"], policy_endpoints["destination_type"])81 else:82 create_requirement(scenario_id, requirement, aws["aws_tag"], policy_endpoints["source"], policy_endpoints["source_type"], obj_id, "id")83""" 84 Creates a requirement via Intelliment API 85"""86def create_requirement(scenario_id, requirement, tags, source, source_type, destination, destination_type):87 88 requirement["tags"] += tags89 requirement["source"] = {90 "type": source_type,91 "value": source92 }93 requirement["destination"] = {94 "type": destination_type,95 "value": destination96 } 97 server = request + scenario_id + "/requirements"98 requests.post(server, headers = headers, data = json.dumps(requirement))99 time.sleep(1)100""" 101 Return the id of internet element on scenario attached to AWS 102"""103def 
get_internet_id(scenario_id):104 105 server = request + scenario_id + "/objects?types=internet"106 response = requests.get(server, headers = headers)107 data = response.json()["data"]108 return data[0]["id"]109""" 110 Resolves policies endpoints (source and destination) identified by tags, IPs and/or names 111"""112def resolve_endpoints(scenario_id, policy, internet):113 114 policy_endpoints = {115 "source" : "",116 "source_type" : "",117 "destination" : "",118 "destination_type" : ""119 }120 policy_endpoints = resolve_endpoints_by_tags(scenario_id, policy, policy_endpoints)121 policy_endpoints = resolve_enpoint(scenario_id, "source", "source_type", internet, policy, policy_endpoints)122 policy_endpoints = resolve_enpoint(scenario_id, "destination", "destination_type", internet, policy, policy_endpoints)123 return policy_endpoints124""" 125 Resolves endpoint by field and field type 126"""...

Full Screen

Full Screen

connect.py

Source:connect.py Github

copy

Full Screen

...82 # Get runtime bees83 source = getattr(source_hive, source_candidate.bee_name)84 target = getattr(target_hive, target_candidate.bee_name)85 return source, target86def resolve_endpoints(source, target):87 """Find resolved endpoints for source/targets which are dervived connection sources/targets (Hives)"""88 # TODO: register connection, or insert a listener function in between89 hive_source = isinstance(source, ConnectSourceDerived)90 hive_target = isinstance(target, ConnectTargetDerived)91 # Find appropriate bees to connect within respective hives92 if hive_source and hive_target:93 source, target = find_connection_between_hives(source, target)94 else:95 if hive_source:96 source = source._hive_get_connect_source(target)97 elif hive_target:98 target = target._hive_get_connect_target(source)99 return source, target100def build_connection(source, target):101 """Runtime connection builder between source and target"""102 source, target = resolve_endpoints(source, target)103 # raises an Exception if incompatible104 source._hive_is_connectable_source(target)105 target._hive_is_connectable_target(source)106 debug_context = get_debug_context()107 if debug_context is not None:108 debug_context.build_connection(source, target)109 else:110 target._hive_connect_target(source)111 source._hive_connect_source(target)112class Connection(Bindable):113 def __init__(self, source, target):114 self._source = source115 self._target = target116 super().__init__()...

Full Screen

Full Screen

lambda_function.py

Source:lambda_function.py Github

copy

Full Screen

# ... (excerpt starts at line 9 of lambda_function.py; earlier lines are not shown)
def resolve_endpoints(endpoints):
    """Resolve the desired targets and return them as a list of /32 CIDRs.

    Args:
        endpoints: Comma-separated host names. Whitespace around each name
            is ignored and empty items (e.g. from a trailing comma) are
            skipped instead of being handed to the resolver.

    Returns:
        A list with one "<ip>/32" string per IPv4 address, in resolution
        order. The list is also printed.

    Raises:
        socket.gaierror / socket.herror: if a host name cannot be resolved.
    """
    endpoints_list = []
    for endpoint in endpoints.split(','):
        endpoint = endpoint.strip()
        if not endpoint:
            continue
        # gethostbyname_ex returns (hostname, aliaslist, ipaddrlist)
        _, _, addresses = socket.gethostbyname_ex(endpoint)
        endpoints_list.extend(ip + "/32" for ip in addresses)
    print(endpoints_list)
    return endpoints_list


# Resolved once at module level because the result is used more than once in the code.
desired_routes = resolve_endpoints(endpoints_name_list)


# resolves the current route table with all its records
def get_route_table_routes(rtb):
    """Return the routes of the route table with id ``rtb``."""
    route_table = ec2.RouteTable(rtb)
    route_list = route_table.routes_attribute
    return route_list


# filter out any route that is not associated with the relevant NAT-GW
def find_associate_routes():
    """Return the destination CIDR blocks of routes that use the NAT gateway.

    Routes through the NAT gateway that carry no 'DestinationCidrBlock'
    entry are skipped rather than raising KeyError.
    """
    match = []
    for route in get_route_table_routes(route_table_id):
        if route.get('NatGatewayId') != nat_gw_id:
            continue
        cidr = route.get('DestinationCidrBlock')
        if cidr is not None:
            match.append(cidr)
    return match


# takes the current NAT-GW routes, compares them with the DNS resolving results
# and returns all the routes that are not in the DNS list
def find_unused_routes():
    ...  # excerpt ends here; the body is not shown

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, right from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful