Best Python code snippet using localstack_python
message_forwarding.py
Source:message_forwarding.py  
...204        "Connection": "close",205    }206    endpoint = add_api_destination_authorization(destination, headers, event)207    if http_parameters:208        endpoint = add_target_http_parameters(http_parameters, endpoint, headers, event)209    result = requests.request(210        method=method, url=endpoint, data=json.dumps(event or {}), headers=headers211    )212    if result.status_code >= 400:213        LOG.debug("Received code %s forwarding events: %s %s", result.status_code, method, endpoint)214        if result.status_code == 429 or 500 <= result.status_code <= 600:215            pass  # TODO: retry logic (only retry on 429 and 5xx response status)216def add_api_destination_authorization(destination, headers, event):217    connection_arn = destination.get("ConnectionArn", "")218    connection_name = re.search(r"connection\/([a-zA-Z0-9-_]+)\/", connection_arn).group(1)219    connection_region = extract_region_from_arn(connection_arn)220    # Using backend directly due to boto hiding passwords, keys and secret values221    event_backend = moto_events_backends.get(connection_region)222    connection = event_backend.describe_connection(name=connection_name)223    headers.update(auth_keys_from_connection(connection))224    auth_parameters = connection.get("AuthParameters", {})225    invocation_parameters = auth_parameters.get("InvocationHttpParameters")226    endpoint = destination.get("InvocationEndpoint")227    if invocation_parameters:228        header_parameters = list_of_parameters_to_object(229            invocation_parameters.get("HeaderParameters", [])230        )231        headers.update(header_parameters)232        body_parameters = list_of_parameters_to_object(233            invocation_parameters.get("BodyParameters", [])234        )235        event.update(body_parameters)236        query_parameters = invocation_parameters.get("QueryStringParameters", [])237        query_object = list_of_parameters_to_object(query_parameters)238        endpoint = 
add_query_params_to_url(endpoint, query_object)239    return endpoint240def add_target_http_parameters(http_parameters: Dict, endpoint: str, headers: Dict, body):241    endpoint = add_path_parameters_to_url(endpoint, http_parameters.get("PathParameterValues", []))242    # The request should prioritze connection header/query parameters over target params if there is an overlap243    query_params = http_parameters.get("QueryStringParameters", {})244    prev_query_params = extract_query_string_params(endpoint)[1]245    query_params.update(prev_query_params)246    endpoint = add_query_params_to_url(endpoint, query_params)247    target_headers = http_parameters.get("HeaderParameters", {})248    for target_header in target_headers.keys():249        if target_header not in headers:250            headers.update({target_header: target_headers.get(target_header)})...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!
