Best Python code snippet using localstack_python
apigateway_listener.py
Source:apigateway_listener.py  
...463                target = kinesis_listener.ACTION_PUT_RECORDS464            if uri.endswith("kinesis:action/ListStreams"):465                target = kinesis_listener.ACTION_LIST_STREAMS466            # apply request templates467            new_data = apply_request_response_templates(468                data, integration.get("requestTemplates"), content_type=APPLICATION_JSON469            )470            # forward records to target kinesis stream471            headers = aws_stack.mock_aws_request_headers(service="kinesis")472            headers["X-Amz-Target"] = target473            result = common.make_http_request(474                url=config.TEST_KINESIS_URL, method="POST", data=new_data, headers=headers475            )476            # apply response template477            result = apply_request_response_templates(478                result, response_templates, content_type=APPLICATION_JSON479            )480            return result481        elif "states:action/" in uri:482            action = uri.split("/")[-1]483            payload = {}484            if APPLICATION_JSON in integration.get("requestTemplates", {}):485                payload = apply_request_response_templates(486                    data,487                    integration.get("requestTemplates"),488                    content_type=APPLICATION_JSON,489                    as_json=True,490                )491            else:492                payload = json.loads(data.decode("utf-8"))493            client = aws_stack.connect_to_service("stepfunctions")494            # Hot fix since step functions local package responses: Unsupported Operation: 'StartSyncExecution'495            method_name = (496                camel_to_snake_case(action) if action != "StartSyncExecution" else "start_execution"497            )498            try:499                method = getattr(client, method_name)500            except AttributeError:501                msg = "Invalid step function action: %s" % 
method_name502                LOG.error(msg)503                return make_error_response(msg, 400)504            result = method(505                **payload,506            )507            result = json_safe({k: result[k] for k in result if k not in "ResponseMetadata"})508            response = requests_response(509                content=result,510                headers=aws_stack.mock_aws_request_headers(),511            )512            if action == "StartSyncExecution":513                # poll for the execution result and return it514                result = await_sfn_execution_result(result["executionArn"])515                result_status = result.get("status")516                if result_status != "SUCCEEDED":517                    return make_error_response(518                        "StepFunctions execution %s failed with status '%s'"519                        % (result["executionArn"], result_status),520                        500,521                    )522                result = json_safe(result)523                response = requests_response(content=result)524            # apply response templates525            response = apply_request_response_templates(526                response, response_templates, content_type=APPLICATION_JSON527            )528            return response529        elif "s3:path/" in uri and method == "GET":530            s3 = aws_stack.connect_to_service("s3")531            uri_match = re.match(TARGET_REGEX_S3_URI, uri)532            if uri_match:533                bucket, object_key = uri_match.group("bucket", "object")534                LOG.debug("Getting request for bucket %s object %s", bucket, object_key)535                try:536                    object = s3.get_object(Bucket=bucket, Key=object_key)537                except s3.exceptions.NoSuchKey:538                    msg = "Object %s not found" % object_key539                    LOG.debug(msg)540                    return make_error_response(msg, 404)541               
 headers = aws_stack.mock_aws_request_headers(service="s3")542                if object.get("ContentType"):543                    headers["Content-Type"] = object["ContentType"]544                # stream used so large files do not fill memory545                response = request_response_stream(stream=object["Body"], headers=headers)546                return response547            else:548                msg = "Request URI does not match s3 specifications"549                LOG.warning(msg)550                return make_error_response(msg, 400)551        if method == "POST":552            if uri.startswith("arn:aws:apigateway:") and ":sqs:path" in uri:553                template = integration["requestTemplates"][APPLICATION_JSON]554                account_id, queue = uri.split("/")[-2:]555                region_name = uri.split(":")[3]556                new_request = "%s&QueueName=%s" % (557                    aws_stack.render_velocity_template(template, data),558                    queue,559                )560                headers = aws_stack.mock_aws_request_headers(service="sqs", region_name=region_name)561                url = urljoin(config.TEST_SQS_URL, "%s/%s" % (TEST_AWS_ACCOUNT_ID, queue))562                result = common.make_http_request(563                    url, method="POST", headers=headers, data=new_request564                )565                return result566        raise Exception(567            'API Gateway AWS integration action URI "%s", method "%s" not yet implemented'568            % (uri, method)569        )570    elif integration_type == "AWS_PROXY":571        if uri.startswith("arn:aws:apigateway:") and ":dynamodb:action" in uri:572            # arn:aws:apigateway:us-east-1:dynamodb:action/PutItem&Table=MusicCollection573            table_name = uri.split(":dynamodb:action")[1].split("&Table=")[1]574            action = uri.split(":dynamodb:action")[1].split("&Table=")[0]575            if "PutItem" in action and method == "PUT":576  
              response_template = response_templates.get("application/json")577                if response_template is None:578                    msg = "Invalid response template defined in integration response."579                    LOG.info("%s Existing: %s" % (msg, response_templates))580                    return make_error_response(msg, 404)581                response_template = json.loads(response_template)582                if response_template["TableName"] != table_name:583                    msg = "Invalid table name specified in integration response template."584                    return make_error_response(msg, 404)585                dynamo_client = aws_stack.connect_to_resource("dynamodb")586                table = dynamo_client.Table(table_name)587                event_data = {}588                data_dict = json.loads(data)589                for key, _ in response_template["Item"].items():590                    event_data[key] = data_dict[key]591                table.put_item(Item=event_data)592                response = requests_response(event_data)593                return response594        else:595            raise Exception(596                'API Gateway action uri "%s", integration type %s not yet implemented'597                % (uri, integration_type)598            )599    elif integration_type in ["HTTP_PROXY", "HTTP"]:600        if ":servicediscovery:" in uri:601            # check if this is a servicediscovery integration URI602            client = aws_stack.connect_to_service("servicediscovery")603            service_id = uri.split("/")[-1]604            instances = client.list_instances(ServiceId=service_id)["Instances"]605            instance = (instances or [None])[0]606            if instance and instance.get("Id"):607                uri = "http://%s/%s" % (instance["Id"], invocation_path.lstrip("/"))608        # apply custom request template609        data = apply_template(integration, "request", data)610        if 
isinstance(data, dict):611            data = json.dumps(data)612        uri = apply_request_parameter(integration=integration, path_params=path_params)613        function = getattr(requests, method.lower())614        result = function(uri, data=data, headers=headers)615        # apply custom response template616        data = apply_template(integration, "response", data)617        return result618    elif integration_type == "MOCK":619        # return empty response - details filled in via responseParameters above...620        return requests_response({})621    if method == "OPTIONS":622        # fall back to returning CORS headers if this is an OPTIONS request623        return get_cors_response(headers)624    raise Exception(625        'API Gateway integration type "%s", method "%s", URI "%s" not yet implemented'626        % (integration_type, method, uri)627    )628def get_stage_variables(api_id, stage):629    region_name = [name for name, region in apigateway_backends.items() if api_id in region.apis][0]630    api_gateway_client = aws_stack.connect_to_service("apigateway", region_name=region_name)631    response = api_gateway_client.get_stage(restApiId=api_id, stageName=stage)632    return response.get("variables", None)633def get_lambda_event_request_context(634    method,635    path,636    data,637    headers,638    integration_uri=None,639    resource_id=None,640    resource_path=None,641    auth_info={},642):643    api_id, stage, relative_path_w_query_params = get_api_id_stage_invocation_path(path, headers)644    relative_path, query_string_params = extract_query_string_params(645        path=relative_path_w_query_params646    )647    source_ip = headers.get("X-Forwarded-For", ",").split(",")[-2].strip()648    integration_uri = integration_uri or ""649    account_id = integration_uri.split(":lambda:path")[-1].split(":function:")[0].split(":")[-1]650    account_id = account_id or TEST_AWS_ACCOUNT_ID651    domain_name = 
f"{api_id}.execute-api.{LOCALHOST_HOSTNAME}"652    request_context = {653        # adding stage to the request context path.654        # https://github.com/localstack/localstack/issues/2210655        "path": "/" + stage + relative_path,656        "resourcePath": resource_path or relative_path,657        "apiId": api_id,658        "domainPrefix": api_id,659        "domainName": domain_name,660        "accountId": account_id,661        "resourceId": resource_id,662        "stage": stage,663        "identity": {664            "accountId": account_id,665            "sourceIp": source_ip,666            "userAgent": headers.get("User-Agent"),667        },668        "httpMethod": method,669        "protocol": "HTTP/1.1",670        "requestTime": datetime.datetime.utcnow(),671        "requestTimeEpoch": int(time.time() * 1000),672    }673    if isinstance(auth_info, dict) and auth_info.get("context"):674        request_context["authorizer"] = auth_info["context"]675    return request_context676def apply_request_response_templates(677    data: Union[Response, bytes],678    templates: Dict[str, str],679    content_type: str = None,680    as_json: bool = False,681):682    """Apply the matching request/response template (if it exists) to the payload data and return the result"""683    content_type = content_type or APPLICATION_JSON684    is_response = isinstance(data, Response)685    templates = templates or {}686    template = templates.get(content_type)687    if not template:688        return data689    content = (data.content if is_response else data) or ""690    result = aws_stack.render_velocity_template(template, content, as_json=as_json)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. 
LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
