Best Python code snippet using localstack_python
invocations.py
Source:invocations.py  
...284            if ":lambda:path" in uri:285                func_arn = (286                    uri.split(":lambda:path")[1].split("functions/")[1].split("/invocations")[0]287                )288            invocation_context.context = helpers.get_event_request_context(invocation_context)289            invocation_context.stage_variables = helpers.get_stage_variables(invocation_context)290            if invocation_context.authorizer_type:291                authorizer_context = {292                    invocation_context.authorizer_type: invocation_context.auth_context293                }294                invocation_context.context["authorizer"] = authorizer_context295            request_templates = RequestTemplates()296            payload = request_templates.render(invocation_context)297            # TODO: change this signature to InvocationContext as well!298            result = lambda_api.process_apigateway_invocation(299                func_arn,300                relative_path,301                payload,302                stage,303                api_id,304                headers,305                is_base64_encoded=invocation_context.is_data_base64_encoded,306                path_params=path_params,307                query_string_params=query_string_params,308                method=method,309                resource_path=resource_path,310                request_context=invocation_context.context,311                stage_variables=invocation_context.stage_variables,312            )313            if isinstance(result, FlaskResponse):314                response = flask_to_requests_response(result)315            elif isinstance(result, Response):316                response = result317            else:318                response = LambdaResponse()319                parsed_result = (320                    result if isinstance(result, dict) else json.loads(str(result or "{}"))321                )322                parsed_result = common.json_safe(parsed_result)323                parsed_result = {} if parsed_result is None else parsed_result324                response.status_code = int(parsed_result.get("statusCode", 200))325                parsed_headers = parsed_result.get("headers", {})326                if parsed_headers is not None:327                    response.headers.update(parsed_headers)328                try:329                    result_body = parsed_result.get("body")330                    if isinstance(result_body, dict):331                        response._content = json.dumps(result_body)332                    else:333                        body_bytes = to_bytes(to_str(result_body or ""))334                        if parsed_result.get("isBase64Encoded", False):335                            body_bytes = base64.b64decode(body_bytes)336                        response._content = body_bytes337                except Exception as e:338                    LOG.warning("Couldn't set Lambda response content: %s", e)339                    response._content = "{}"340                update_content_length(response)341                response.multi_value_headers = parsed_result.get("multiValueHeaders") or {}342            # apply custom response template343            invocation_context.response = response344            response_templates = ResponseTemplates()345            response_templates.render(invocation_context)346            invocation_context.response.headers["Content-Length"] = str(len(response.content or ""))347            return invocation_context.response348        raise Exception(349            f'API Gateway integration type "{integration_type}", action "{uri}", method "{method}"'350        )351    elif integration_type == "AWS":352        if "kinesis:action/" in uri:353            if uri.endswith("kinesis:action/PutRecord"):354                target = kinesis_listener.ACTION_PUT_RECORD355            elif uri.endswith("kinesis:action/PutRecords"):356                target = kinesis_listener.ACTION_PUT_RECORDS357            elif uri.endswith("kinesis:action/ListStreams"):358                target = kinesis_listener.ACTION_LIST_STREAMS359            else:360                LOG.info(361                    f"Unexpected API Gateway integration URI '{uri}' for integration type {integration_type}",362                )363                target = ""364            try:365                invocation_context.context = helpers.get_event_request_context(invocation_context)366                invocation_context.stage_variables = helpers.get_stage_variables(invocation_context)367                request_templates = RequestTemplates()368                payload = request_templates.render(invocation_context)369            except Exception as e:370                LOG.warning("Unable to convert API Gateway payload to str", e)371                raise372            # forward records to target kinesis stream373            headers = aws_stack.mock_aws_request_headers(374                service="kinesis", region_name=invocation_context.region_name375            )376            headers["X-Amz-Target"] = target377            result = common.make_http_request(378                url=config.service_url("kineses"), data=payload, headers=headers, method="POST"379            )380            # apply response template381            invocation_context.response = result382            response_templates = ResponseTemplates()383            response_templates.render(invocation_context)384            return invocation_context.response385        elif "states:action/" in uri:386            action = uri.split("/")[-1]387            if APPLICATION_JSON in integration.get("requestTemplates", {}):388                request_templates = RequestTemplates()389                payload = request_templates.render(invocation_context)390                payload = json.loads(payload)391            else:392                # XXX decoding in py3 sounds wrong, this actually might break393                payload = json.loads(data.decode("utf-8"))394            client = aws_stack.connect_to_service("stepfunctions")395            if isinstance(payload.get("input"), dict):396                payload["input"] = json.dumps(payload["input"])397            # Hot fix since step functions local package responses: Unsupported Operation: 'StartSyncExecution'398            method_name = (399                camel_to_snake_case(action) if action != "StartSyncExecution" else "start_execution"400            )401            try:402                method = getattr(client, method_name)403            except AttributeError:404                msg = "Invalid step function action: %s" % method_name405                LOG.error(msg)406                return make_error_response(msg, 400)407            result = method(**payload)408            result = json_safe({k: result[k] for k in result if k not in "ResponseMetadata"})409            response = requests_response(410                content=result,411                headers=aws_stack.mock_aws_request_headers(),412            )413            if action == "StartSyncExecution":414                # poll for the execution result and return it415                result = await_sfn_execution_result(result["executionArn"])416                result_status = result.get("status")417                if result_status != "SUCCEEDED":418                    return make_error_response(419                        "StepFunctions execution %s failed with status '%s'"420                        % (result["executionArn"], result_status),421                        500,422                    )423                result = json_safe(result)424                response = requests_response(content=result)425            # apply response templates426            invocation_context.response = response427            response_templates = ResponseTemplates()428            response_templates.render(invocation_context)429            # response = apply_request_response_templates(430            #     response, response_templates, content_type=APPLICATION_JSON431            # )432            return response433        # https://docs.aws.amazon.com/apigateway/api-reference/resource/integration/434        elif ("s3:path/" in uri or "s3:action/" in uri) and method == "GET":435            s3 = aws_stack.connect_to_service("s3")436            uri = apply_request_parameters(437                uri,438                integration=integration,439                path_params=path_params,440                query_params=query_string_params,441            )442            uri_match = re.match(TARGET_REGEX_PATH_S3_URI, uri) or re.match(443                TARGET_REGEX_ACTION_S3_URI, uri444            )445            if uri_match:446                bucket, object_key = uri_match.group("bucket", "object")447                LOG.debug("Getting request for bucket %s object %s", bucket, object_key)448                try:449                    object = s3.get_object(Bucket=bucket, Key=object_key)450                except s3.exceptions.NoSuchKey:451                    msg = "Object %s not found" % object_key452                    LOG.debug(msg)453                    return make_error_response(msg, 404)454                headers = aws_stack.mock_aws_request_headers(service="s3")455                if object.get("ContentType"):456                    headers["Content-Type"] = object["ContentType"]457                # stream used so large files do not fill memory458                response = request_response_stream(stream=object["Body"], headers=headers)459                return response460            else:461                msg = "Request URI does not match s3 specifications"462                LOG.warning(msg)463                return make_error_response(msg, 400)464        if method == "POST":465            if uri.startswith("arn:aws:apigateway:") and ":sqs:path" in uri:466                template = integration["requestTemplates"][APPLICATION_JSON]467                account_id, queue = uri.split("/")[-2:]468                region_name = uri.split(":")[3]469                if "GetQueueUrl" in template or "CreateQueue" in template:470                    request_templates = RequestTemplates()471                    payload = request_templates.render(invocation_context)472                    new_request = f"{payload}&QueueName={queue}"473                else:474                    request_templates = RequestTemplates()475                    payload = request_templates.render(invocation_context)476                    queue_url = f"{config.get_edge_url()}/{account_id}/{queue}"477                    new_request = f"{payload}&QueueUrl={queue_url}"478                headers = aws_stack.mock_aws_request_headers(service="sqs", region_name=region_name)479                url = urljoin(config.service_url("sqs"), f"{TEST_AWS_ACCOUNT_ID}/{queue}")480                result = common.make_http_request(481                    url, method="POST", headers=headers, data=new_request482                )483                return result484            elif uri.startswith("arn:aws:apigateway:") and ":sns:path" in uri:485                invocation_context.context = helpers.get_event_request_context(invocation_context)486                invocation_context.stage_variables = helpers.get_stage_variables(invocation_context)487                integration_response = SnsIntegration().invoke(invocation_context)488                return apply_request_response_templates(489                    integration_response, response_templates, content_type=APPLICATION_JSON490                )491        raise Exception(492            'API Gateway AWS integration action URI "%s", method "%s" not yet implemented'493            % (uri, method)494        )495    elif integration_type == "AWS_PROXY":496        if uri.startswith("arn:aws:apigateway:") and ":dynamodb:action" in uri:497            # arn:aws:apigateway:us-east-1:dynamodb:action/PutItem&Table=MusicCollection498            table_name = uri.split(":dynamodb:action")[1].split("&Table=")[1]499            action = uri.split(":dynamodb:action")[1].split("&Table=")[0]500            if "PutItem" in action and method == "PUT":501                response_template = response_templates.get("application/json")502                if response_template is None:503                    msg = "Invalid response template defined in integration response."504                    LOG.info("%s Existing: %s", msg, response_templates)505                    return make_error_response(msg, 404)506                response_template = json.loads(response_template)507                if response_template["TableName"] != table_name:508                    msg = "Invalid table name specified in integration response template."509                    return make_error_response(msg, 404)510                dynamo_client = aws_stack.connect_to_resource("dynamodb")511                table = dynamo_client.Table(table_name)512                event_data = {}513                data_dict = json.loads(data)514                for key, _ in response_template["Item"].items():515                    event_data[key] = data_dict[key]516                table.put_item(Item=event_data)517                response = requests_response(event_data)518                return response519        else:520            raise Exception(521                'API Gateway action uri "%s", integration type %s not yet implemented'522                % (uri, integration_type)523            )524    elif integration_type in ["HTTP_PROXY", "HTTP"]:525        if ":servicediscovery:" in uri:526            # check if this is a servicediscovery integration URI527            client = aws_stack.connect_to_service("servicediscovery")528            service_id = uri.split("/")[-1]529            instances = client.list_instances(ServiceId=service_id)["Instances"]530            instance = (instances or [None])[0]531            if instance and instance.get("Id"):532                uri = "http://%s/%s" % (instance["Id"], invocation_path.lstrip("/"))533        # apply custom request template534        invocation_context.context = helpers.get_event_request_context(invocation_context)535        invocation_context.stage_variables = helpers.get_stage_variables(invocation_context)536        request_templates = RequestTemplates()537        payload = request_templates.render(invocation_context)538        if isinstance(payload, dict):539            payload = json.dumps(payload)540        uri = apply_request_parameters(541            uri, integration=integration, path_params=path_params, query_params=query_string_params542        )543        result = requests.request(method=method, url=uri, data=payload, headers=headers)544        # apply custom response template545        invocation_context.response = result546        response_templates = ResponseTemplates()547        response_templates.render(invocation_context)548        return invocation_context.response...integration.py
Source:integration.py  
...97                response.headers[header_name] = value.strip("'")98        return response99class SnsIntegration(BackendIntegration):100    def invoke(self, invocation_context: ApiInvocationContext) -> Response:101        invocation_context.context = get_event_request_context(invocation_context)102        try:103            payload = self.request_templates.render(invocation_context)104        except Exception as e:105            LOG.warning("Failed to apply template for SNS integration", e)106            raise107        uri = (108            invocation_context.integration.get("uri")109            or invocation_context.integration.get("integrationUri")110            or ""111        )112        region_name = uri.split(":")[3]113        headers = aws_stack.mock_aws_request_headers(service="sns", region_name=region_name)114        return make_http_request(115            config.service_url("sns"), method="POST", headers=headers, data=payload116        )117class LambdaProxyIntegration(BackendIntegration):118    @classmethod119    def update_content_length(cls, response: Response):120        if response and response.content is not None:121            response.headers["Content-Length"] = str(len(response.content))122    def invoke(self, invocation_context: ApiInvocationContext):123        uri = (124            invocation_context.integration.get("uri")125            or invocation_context.integration.get("integrationUri")126            or ""127        )128        relative_path, query_string_params = extract_query_string_params(129            path=invocation_context.path_with_query_string130        )131        api_id = invocation_context.api_id132        stage = invocation_context.stage133        headers = invocation_context.headers134        resource_path = invocation_context.resource_path135        invocation_context.context = get_event_request_context(invocation_context)136        try:137            path_params = extract_path_params(path=relative_path, extracted_path=resource_path)138            invocation_context.path_params = path_params139        except Exception:140            path_params = {}141        func_arn = uri142        if ":lambda:path" in uri:143            func_arn = uri.split(":lambda:path")[1].split("functions/")[1].split("/invocations")[0]144        if invocation_context.authorizer_type:145            invocation_context.context["authorizer"] = invocation_context.auth_context146        payload = self.request_templates.render(invocation_context)147        # TODO: change this signature to InvocationContext as well!148        result = lambda_api.process_apigateway_invocation(149            func_arn,...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
