Best Python code snippet using localstack_python
apigateway_listener.py
Source:apigateway_listener.py  
...374    method = invocation_context.method375    headers = invocation_context.headers376    # run gateway authorizers for this request377    authorize_invocation(invocation_context)378    extracted_path, resource = get_target_resource_details(invocation_context)379    if not resource:380        return make_error_response("Unable to find path %s" % invocation_context.path, 404)381    # validate request382    validator = RequestValidator(invocation_context, aws_stack.connect_to_service("apigateway"))383    if not validator.is_request_valid():384        return make_error_response("Invalid request body", 400)385    api_key_required = resource.get("resourceMethods", {}).get(method, {}).get("apiKeyRequired")386    if not is_api_key_valid(api_key_required, headers, invocation_context.stage):387        return make_error_response("Access denied - invalid API key", 403)388    integrations = resource.get("resourceMethods", {})389    integration = integrations.get(method, {})390    if not integration:391        # HttpMethod: '*'392        # ResourcePath: '/*' - produces 'X-AMAZON-APIGATEWAY-ANY-METHOD'393        integration = integrations.get("ANY", {}) or integrations.get(394            "X-AMAZON-APIGATEWAY-ANY-METHOD", {}395        )396    integration = integration.get("methodIntegration")397    if not integration:398        if method == "OPTIONS" and "Origin" in headers:399            # default to returning CORS headers if this is an OPTIONS request400            return get_cors_response(headers)401        return make_error_response(402            "Unable to find integration for: %s %s (%s)" % (method, invocation_path, raw_path),403            404,404        )405    res_methods = resource.get("resourceMethods", {})406    meth_integration = res_methods.get(method, {}).get("methodIntegration", {})407    int_responses = meth_integration.get("integrationResponses", {})408    response_templates = int_responses.get("200", {}).get("responseTemplates", {})409    # update fields in invocation context, then forward request to next handler410    invocation_context.resource = resource411    invocation_context.resource_path = extracted_path412    invocation_context.response_templates = response_templates413    invocation_context.integration = integration414    return invoke_rest_api_integration(invocation_context)415def invoke_rest_api_integration(invocation_context: ApiInvocationContext):416    try:417        response = invoke_rest_api_integration_backend(invocation_context)418        # TODO remove this setter once all the integrations are migrated to the new response419        #  handling420        invocation_context.response = response421        response = apply_response_parameters(invocation_context)422        return response423    except Exception as e:424        msg = f"Error invoking integration for API Gateway ID '{invocation_context.api_id}': {e}"425        LOG.exception(msg)426        return make_error_response(msg, 400)427# TODO: refactor this to have a class per integration type to make it easy to428# test the encapsulated logic429def invoke_rest_api_integration_backend(invocation_context: ApiInvocationContext):430    # define local aliases from invocation context431    invocation_path = invocation_context.path_with_query_string432    method = invocation_context.method433    data = invocation_context.data434    headers = invocation_context.headers435    api_id = invocation_context.api_id436    stage = invocation_context.stage437    resource_path = invocation_context.resource_path438    response_templates = invocation_context.response_templates439    integration = invocation_context.integration440    # extract integration type and path parameters441    relative_path, query_string_params = extract_query_string_params(path=invocation_path)442    integration_type_orig = integration.get("type") or integration.get("integrationType") or ""443    integration_type = integration_type_orig.upper()444    uri = integration.get("uri") or integration.get("integrationUri") or ""445    # XXX we need replace the internal Authorization header with an Authorization header set from446    # the customer, even if it's empty that's what's expected in the integration.447    custom_auth_header = invocation_context.headers.pop(HEADER_LOCALSTACK_AUTHORIZATION, "")448    invocation_context.headers["Authorization"] = custom_auth_header449    try:450        path_params = extract_path_params(path=relative_path, extracted_path=resource_path)451        invocation_context.path_params = path_params452    except Exception:453        path_params = {}454    if (uri.startswith("arn:aws:apigateway:") and ":lambda:path" in uri) or uri.startswith(455        "arn:aws:lambda"456    ):457        if integration_type in ["AWS", "AWS_PROXY"]:458            func_arn = uri459            if ":lambda:path" in uri:460                func_arn = (461                    uri.split(":lambda:path")[1].split("functions/")[1].split("/invocations")[0]462                )463            invocation_context.context = get_event_request_context(invocation_context)464            invocation_context.stage_variables = helpers.get_stage_variables(invocation_context)465            if invocation_context.authorizer_type:466                authorizer_context = {467                    invocation_context.authorizer_type: invocation_context.auth_context468                }469                invocation_context.context["authorizer"] = authorizer_context470            request_templates = RequestTemplates()471            payload = request_templates.render(invocation_context)472            # TODO: change this signature to InvocationContext as well!473            result = lambda_api.process_apigateway_invocation(474                func_arn,475                relative_path,476                payload,477                stage,478                api_id,479                headers,480                is_base64_encoded=invocation_context.is_data_base64_encoded,481                path_params=path_params,482                query_string_params=query_string_params,483                method=method,484                resource_path=resource_path,485                request_context=invocation_context.context,486                stage_variables=invocation_context.stage_variables,487            )488            if isinstance(result, FlaskResponse):489                response = flask_to_requests_response(result)490            elif isinstance(result, Response):491                response = result492            else:493                response = LambdaResponse()494                parsed_result = (495                    result if isinstance(result, dict) else json.loads(str(result or "{}"))496                )497                parsed_result = common.json_safe(parsed_result)498                parsed_result = {} if parsed_result is None else parsed_result499                response.status_code = int(parsed_result.get("statusCode", 200))500                parsed_headers = parsed_result.get("headers", {})501                if parsed_headers is not None:502                    response.headers.update(parsed_headers)503                try:504                    result_body = parsed_result.get("body")505                    if isinstance(result_body, dict):506                        response._content = json.dumps(result_body)507                    else:508                        body_bytes = to_bytes(to_str(result_body or ""))509                        if parsed_result.get("isBase64Encoded", False):510                            body_bytes = base64.b64decode(body_bytes)511                        response._content = body_bytes512                except Exception as e:513                    LOG.warning("Couldn't set Lambda response content: %s", e)514                    response._content = "{}"515                update_content_length(response)516                response.multi_value_headers = parsed_result.get("multiValueHeaders") or {}517            # apply custom response template518            invocation_context.response = response519            response_templates = ResponseTemplates()520            response_templates.render(invocation_context)521            invocation_context.response.headers["Content-Length"] = str(len(response.content or ""))522            return invocation_context.response523        raise Exception(524            f'API Gateway integration type "{integration_type}", action "{uri}", method "{method}"'525        )526    elif integration_type == "AWS":527        if "kinesis:action/" in uri:528            if uri.endswith("kinesis:action/PutRecord"):529                target = kinesis_listener.ACTION_PUT_RECORD530            elif uri.endswith("kinesis:action/PutRecords"):531                target = kinesis_listener.ACTION_PUT_RECORDS532            elif uri.endswith("kinesis:action/ListStreams"):533                target = kinesis_listener.ACTION_LIST_STREAMS534            else:535                LOG.info(536                    f"Unexpected API Gateway integration URI '{uri}' for integration type {integration_type}",537                )538                target = ""539            try:540                invocation_context.context = get_event_request_context(invocation_context)541                invocation_context.stage_variables = helpers.get_stage_variables(invocation_context)542                request_templates = RequestTemplates()543                payload = request_templates.render(invocation_context)544            except Exception as e:545                LOG.warning("Unable to convert API Gateway payload to str", e)546                raise547            # forward records to target kinesis stream548            headers = aws_stack.mock_aws_request_headers(549                service="kinesis", region_name=invocation_context.region_name550            )551            headers["X-Amz-Target"] = target552            result = common.make_http_request(553                url=config.service_url("kineses"), data=payload, headers=headers, method="POST"554            )555            # apply response template556            invocation_context.response = result557            response_templates = ResponseTemplates()558            response_templates.render(invocation_context)559            return invocation_context.response560        elif "states:action/" in uri:561            action = uri.split("/")[-1]562            if APPLICATION_JSON in integration.get("requestTemplates", {}):563                request_templates = RequestTemplates()564                payload = request_templates.render(invocation_context)565                payload = json.loads(payload)566            else:567                # XXX decoding in py3 sounds wrong, this actually might break568                payload = json.loads(data.decode("utf-8"))569            client = aws_stack.connect_to_service("stepfunctions")570            if isinstance(payload.get("input"), dict):571                payload["input"] = json.dumps(payload["input"])572            # Hot fix since step functions local package responses: Unsupported Operation: 'StartSyncExecution'573            method_name = (574                camel_to_snake_case(action) if action != "StartSyncExecution" else "start_execution"575            )576            try:577                method = getattr(client, method_name)578            except AttributeError:579                msg = "Invalid step function action: %s" % method_name580                LOG.error(msg)581                return make_error_response(msg, 400)582            result = method(**payload)583            result = json_safe({k: result[k] for k in result if k not in "ResponseMetadata"})584            response = requests_response(585                content=result,586                headers=aws_stack.mock_aws_request_headers(),587            )588            if action == "StartSyncExecution":589                # poll for the execution result and return it590                result = await_sfn_execution_result(result["executionArn"])591                result_status = result.get("status")592                if result_status != "SUCCEEDED":593                    return make_error_response(594                        "StepFunctions execution %s failed with status '%s'"595                        % (result["executionArn"], result_status),596                        500,597                    )598                result = json_safe(result)599                response = requests_response(content=result)600            # apply response templates601            invocation_context.response = response602            response_templates = ResponseTemplates()603            response_templates.render(invocation_context)604            # response = apply_request_response_templates(605            #     response, response_templates, content_type=APPLICATION_JSON606            # )607            return response608        # https://docs.aws.amazon.com/apigateway/api-reference/resource/integration/609        elif ("s3:path/" in uri or "s3:action/" in uri) and method == "GET":610            s3 = aws_stack.connect_to_service("s3")611            uri = apply_request_parameters(612                uri,613                integration=integration,614                path_params=path_params,615                query_params=query_string_params,616            )617            uri_match = re.match(TARGET_REGEX_PATH_S3_URI, uri) or re.match(618                TARGET_REGEX_ACTION_S3_URI, uri619            )620            if uri_match:621                bucket, object_key = uri_match.group("bucket", "object")622                LOG.debug("Getting request for bucket %s object %s", bucket, object_key)623                try:624                    object = s3.get_object(Bucket=bucket, Key=object_key)625                except s3.exceptions.NoSuchKey:626                    msg = "Object %s not found" % object_key627                    LOG.debug(msg)628                    return make_error_response(msg, 404)629                headers = aws_stack.mock_aws_request_headers(service="s3")630                if object.get("ContentType"):631                    headers["Content-Type"] = object["ContentType"]632                # stream used so large files do not fill memory633                response = request_response_stream(stream=object["Body"], headers=headers)634                return response635            else:636                msg = "Request URI does not match s3 specifications"637                LOG.warning(msg)638                return make_error_response(msg, 400)639        if method == "POST":640            if uri.startswith("arn:aws:apigateway:") and ":sqs:path" in uri:641                template = integration["requestTemplates"][APPLICATION_JSON]642                account_id, queue = uri.split("/")[-2:]643                region_name = uri.split(":")[3]644                if "GetQueueUrl" in template or "CreateQueue" in template:645                    request_templates = RequestTemplates()646                    payload = request_templates.render(invocation_context)647                    new_request = f"{payload}&QueueName={queue}"648                else:649                    request_templates = RequestTemplates()650                    payload = request_templates.render(invocation_context)651                    queue_url = f"{config.get_edge_url()}/{account_id}/{queue}"652                    new_request = f"{payload}&QueueUrl={queue_url}"653                headers = aws_stack.mock_aws_request_headers(service="sqs", region_name=region_name)654                url = urljoin(config.service_url("sqs"), f"{TEST_AWS_ACCOUNT_ID}/{queue}")655                result = common.make_http_request(656                    url, method="POST", headers=headers, data=new_request657                )658                return result659            elif uri.startswith("arn:aws:apigateway:") and ":sns:path" in uri:660                invocation_context.context = get_event_request_context(invocation_context)661                invocation_context.stage_variables = helpers.get_stage_variables(invocation_context)662                integration_response = SnsIntegration().invoke(invocation_context)663                return apply_request_response_templates(664                    integration_response, response_templates, content_type=APPLICATION_JSON665                )666        raise Exception(667            'API Gateway AWS integration action URI "%s", method "%s" not yet implemented'668            % (uri, method)669        )670    elif integration_type == "AWS_PROXY":671        if uri.startswith("arn:aws:apigateway:") and ":dynamodb:action" in uri:672            # arn:aws:apigateway:us-east-1:dynamodb:action/PutItem&Table=MusicCollection673            table_name = uri.split(":dynamodb:action")[1].split("&Table=")[1]674            action = uri.split(":dynamodb:action")[1].split("&Table=")[0]675            if "PutItem" in action and method == "PUT":676                response_template = response_templates.get("application/json")677                if response_template is None:678                    msg = "Invalid response template defined in integration response."679                    LOG.info("%s Existing: %s", msg, response_templates)680                    return make_error_response(msg, 404)681                response_template = json.loads(response_template)682                if response_template["TableName"] != table_name:683                    msg = "Invalid table name specified in integration response template."684                    return make_error_response(msg, 404)685                dynamo_client = aws_stack.connect_to_resource("dynamodb")686                table = dynamo_client.Table(table_name)687                event_data = {}688                data_dict = json.loads(data)689                for key, _ in response_template["Item"].items():690                    event_data[key] = data_dict[key]691                table.put_item(Item=event_data)692                response = requests_response(event_data)693                return response694        else:695            raise Exception(696                'API Gateway action uri "%s", integration type %s not yet implemented'697                % (uri, integration_type)698            )699    elif integration_type in ["HTTP_PROXY", "HTTP"]:700        if ":servicediscovery:" in uri:701            # check if this is a servicediscovery integration URI702            client = aws_stack.connect_to_service("servicediscovery")703            service_id = uri.split("/")[-1]704            instances = client.list_instances(ServiceId=service_id)["Instances"]705            instance = (instances or [None])[0]706            if instance and instance.get("Id"):707                uri = "http://%s/%s" % (instance["Id"], invocation_path.lstrip("/"))708        # apply custom request template709        invocation_context.context = get_event_request_context(invocation_context)710        invocation_context.stage_variables = helpers.get_stage_variables(invocation_context)711        request_templates = RequestTemplates()712        payload = request_templates.render(invocation_context)713        if isinstance(payload, dict):714            payload = json.dumps(payload)715        uri = apply_request_parameters(716            uri, integration=integration, path_params=path_params, query_params=query_string_params717        )718        result = requests.request(method=method, url=uri, data=payload, headers=headers)719        # apply custom response template720        invocation_context.response = result721        response_templates = ResponseTemplates()722        response_templates.render(invocation_context)723        return invocation_context.response724    elif integration_type == "MOCK":725        # TODO: apply tell don't ask principle inside ResponseTemplates or InvocationContext726        invocation_context.stage_variables = helpers.get_stage_variables(invocation_context)727        invocation_context.response = requests_response({})728        response_templates = ResponseTemplates()729        response_templates.render(invocation_context)730        return invocation_context.response731    if method == "OPTIONS":732        # fall back to returning CORS headers if this is an OPTIONS request733        return get_cors_response(headers)734    raise Exception(735        'API Gateway integration type "%s", method "%s", URI "%s" not yet implemented'736        % (integration_type, method, uri)737    )738def get_target_resource_details(invocation_context: ApiInvocationContext) -> Tuple[str, Dict]:739    """Look up and return the API GW resource (path pattern + resource dict) for the given invocation context."""740    path_map = helpers.get_rest_api_paths(741        rest_api_id=invocation_context.api_id, region_name=invocation_context.region_name742    )743    relative_path = invocation_context.invocation_path744    try:745        extracted_path, resource = get_resource_for_path(path=relative_path, path_map=path_map)746        invocation_context.resource = resource747        return extracted_path, resource748    except Exception:749        return None, None750def get_target_resource_method(invocation_context: ApiInvocationContext) -> Optional[Dict]:751    """Look up and return the API GW resource method for the given invocation context."""752    _, resource = get_target_resource_details(invocation_context)753    if not resource:754        return None755    methods = resource.get("resourceMethods") or {}756    method_name = invocation_context.method.upper()757    return methods.get(method_name) or methods.get("ANY")758def get_event_request_context(invocation_context: ApiInvocationContext):759    method = invocation_context.method760    path = invocation_context.path761    headers = invocation_context.headers762    integration_uri = invocation_context.integration_uri763    resource_path = invocation_context.resource_path764    resource_id = invocation_context.resource_id765    set_api_id_stage_invocation_path(invocation_context)766    relative_path, query_string_params = extract_query_string_params(...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
