How to use add_target_http_parameters method in localstack

Best Python code snippet using localstack_python

message_forwarding.py

Source:message_forwarding.py Github

copy

Full Screen

...204 "Connection": "close",205 }206 endpoint = add_api_destination_authorization(destination, headers, event)207 if http_parameters:208 endpoint = add_target_http_parameters(http_parameters, endpoint, headers, event)209 result = requests.request(210 method=method, url=endpoint, data=json.dumps(event or {}), headers=headers211 )212 if result.status_code >= 400:213 LOG.debug("Received code %s forwarding events: %s %s", result.status_code, method, endpoint)214 if result.status_code == 429 or 500 <= result.status_code <= 600:215 pass # TODO: retry logic (only retry on 429 and 5xx response status)216def add_api_destination_authorization(destination, headers, event):217 connection_arn = destination.get("ConnectionArn", "")218 connection_name = re.search(r"connection\/([a-zA-Z0-9-_]+)\/", connection_arn).group(1)219 connection_region = extract_region_from_arn(connection_arn)220 # Using backend directly due to boto hiding passwords, keys and secret values221 event_backend = moto_events_backends.get(connection_region)222 connection = event_backend.describe_connection(name=connection_name)223 headers.update(auth_keys_from_connection(connection))224 auth_parameters = connection.get("AuthParameters", {})225 invocation_parameters = auth_parameters.get("InvocationHttpParameters")226 endpoint = destination.get("InvocationEndpoint")227 if invocation_parameters:228 header_parameters = list_of_parameters_to_object(229 invocation_parameters.get("HeaderParameters", [])230 )231 headers.update(header_parameters)232 body_parameters = list_of_parameters_to_object(233 invocation_parameters.get("BodyParameters", [])234 )235 event.update(body_parameters)236 query_parameters = invocation_parameters.get("QueryStringParameters", [])237 query_object = list_of_parameters_to_object(query_parameters)238 endpoint = add_query_params_to_url(endpoint, query_object)239 return endpoint240def add_target_http_parameters(http_parameters: Dict, endpoint: str, headers: Dict, body):241 endpoint = add_path_parameters_to_url(endpoint, http_parameters.get("PathParameterValues", []))242 # The request should prioritze connection header/query parameters over target params if there is an overlap243 query_params = http_parameters.get("QueryStringParameters", {})244 prev_query_params = extract_query_string_params(endpoint)[1]245 query_params.update(prev_query_params)246 endpoint = add_query_params_to_url(endpoint, query_params)247 target_headers = http_parameters.get("HeaderParameters", {})248 for target_header in target_headers.keys():249 if target_header not in headers:250 headers.update({target_header: target_headers.get(target_header)})...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful