How to use get_event_request_context method in localstack

Best Python code snippet using localstack_python

invocations.py

Source:invocations.py Github

copy

Full Screen

...284 if ":lambda:path" in uri:285 func_arn = (286 uri.split(":lambda:path")[1].split("functions/")[1].split("/invocations")[0]287 )288 invocation_context.context = helpers.get_event_request_context(invocation_context)289 invocation_context.stage_variables = helpers.get_stage_variables(invocation_context)290 if invocation_context.authorizer_type:291 authorizer_context = {292 invocation_context.authorizer_type: invocation_context.auth_context293 }294 invocation_context.context["authorizer"] = authorizer_context295 request_templates = RequestTemplates()296 payload = request_templates.render(invocation_context)297 # TODO: change this signature to InvocationContext as well!298 result = lambda_api.process_apigateway_invocation(299 func_arn,300 relative_path,301 payload,302 stage,303 api_id,304 headers,305 is_base64_encoded=invocation_context.is_data_base64_encoded,306 path_params=path_params,307 query_string_params=query_string_params,308 method=method,309 resource_path=resource_path,310 request_context=invocation_context.context,311 stage_variables=invocation_context.stage_variables,312 )313 if isinstance(result, FlaskResponse):314 response = flask_to_requests_response(result)315 elif isinstance(result, Response):316 response = result317 else:318 response = LambdaResponse()319 parsed_result = (320 result if isinstance(result, dict) else json.loads(str(result or "{}"))321 )322 parsed_result = common.json_safe(parsed_result)323 parsed_result = {} if parsed_result is None else parsed_result324 response.status_code = int(parsed_result.get("statusCode", 200))325 parsed_headers = parsed_result.get("headers", {})326 if parsed_headers is not None:327 response.headers.update(parsed_headers)328 try:329 result_body = parsed_result.get("body")330 if isinstance(result_body, dict):331 response._content = json.dumps(result_body)332 else:333 body_bytes = to_bytes(to_str(result_body or ""))334 if parsed_result.get("isBase64Encoded", False):335 body_bytes = 
base64.b64decode(body_bytes)336 response._content = body_bytes337 except Exception as e:338 LOG.warning("Couldn't set Lambda response content: %s", e)339 response._content = "{}"340 update_content_length(response)341 response.multi_value_headers = parsed_result.get("multiValueHeaders") or {}342 # apply custom response template343 invocation_context.response = response344 response_templates = ResponseTemplates()345 response_templates.render(invocation_context)346 invocation_context.response.headers["Content-Length"] = str(len(response.content or ""))347 return invocation_context.response348 raise Exception(349 f'API Gateway integration type "{integration_type}", action "{uri}", method "{method}"'350 )351 elif integration_type == "AWS":352 if "kinesis:action/" in uri:353 if uri.endswith("kinesis:action/PutRecord"):354 target = kinesis_listener.ACTION_PUT_RECORD355 elif uri.endswith("kinesis:action/PutRecords"):356 target = kinesis_listener.ACTION_PUT_RECORDS357 elif uri.endswith("kinesis:action/ListStreams"):358 target = kinesis_listener.ACTION_LIST_STREAMS359 else:360 LOG.info(361 f"Unexpected API Gateway integration URI '{uri}' for integration type {integration_type}",362 )363 target = ""364 try:365 invocation_context.context = helpers.get_event_request_context(invocation_context)366 invocation_context.stage_variables = helpers.get_stage_variables(invocation_context)367 request_templates = RequestTemplates()368 payload = request_templates.render(invocation_context)369 except Exception as e:370 LOG.warning("Unable to convert API Gateway payload to str", e)371 raise372 # forward records to target kinesis stream373 headers = aws_stack.mock_aws_request_headers(374 service="kinesis", region_name=invocation_context.region_name375 )376 headers["X-Amz-Target"] = target377 result = common.make_http_request(378 url=config.service_url("kineses"), data=payload, headers=headers, method="POST"379 )380 # apply response template381 invocation_context.response = result382 
response_templates = ResponseTemplates()383 response_templates.render(invocation_context)384 return invocation_context.response385 elif "states:action/" in uri:386 action = uri.split("/")[-1]387 if APPLICATION_JSON in integration.get("requestTemplates", {}):388 request_templates = RequestTemplates()389 payload = request_templates.render(invocation_context)390 payload = json.loads(payload)391 else:392 # XXX decoding in py3 sounds wrong, this actually might break393 payload = json.loads(data.decode("utf-8"))394 client = aws_stack.connect_to_service("stepfunctions")395 if isinstance(payload.get("input"), dict):396 payload["input"] = json.dumps(payload["input"])397 # Hot fix since step functions local package responses: Unsupported Operation: 'StartSyncExecution'398 method_name = (399 camel_to_snake_case(action) if action != "StartSyncExecution" else "start_execution"400 )401 try:402 method = getattr(client, method_name)403 except AttributeError:404 msg = "Invalid step function action: %s" % method_name405 LOG.error(msg)406 return make_error_response(msg, 400)407 result = method(**payload)408 result = json_safe({k: result[k] for k in result if k not in "ResponseMetadata"})409 response = requests_response(410 content=result,411 headers=aws_stack.mock_aws_request_headers(),412 )413 if action == "StartSyncExecution":414 # poll for the execution result and return it415 result = await_sfn_execution_result(result["executionArn"])416 result_status = result.get("status")417 if result_status != "SUCCEEDED":418 return make_error_response(419 "StepFunctions execution %s failed with status '%s'"420 % (result["executionArn"], result_status),421 500,422 )423 result = json_safe(result)424 response = requests_response(content=result)425 # apply response templates426 invocation_context.response = response427 response_templates = ResponseTemplates()428 response_templates.render(invocation_context)429 # response = apply_request_response_templates(430 # response, response_templates, 
content_type=APPLICATION_JSON431 # )432 return response433 # https://docs.aws.amazon.com/apigateway/api-reference/resource/integration/434 elif ("s3:path/" in uri or "s3:action/" in uri) and method == "GET":435 s3 = aws_stack.connect_to_service("s3")436 uri = apply_request_parameters(437 uri,438 integration=integration,439 path_params=path_params,440 query_params=query_string_params,441 )442 uri_match = re.match(TARGET_REGEX_PATH_S3_URI, uri) or re.match(443 TARGET_REGEX_ACTION_S3_URI, uri444 )445 if uri_match:446 bucket, object_key = uri_match.group("bucket", "object")447 LOG.debug("Getting request for bucket %s object %s", bucket, object_key)448 try:449 object = s3.get_object(Bucket=bucket, Key=object_key)450 except s3.exceptions.NoSuchKey:451 msg = "Object %s not found" % object_key452 LOG.debug(msg)453 return make_error_response(msg, 404)454 headers = aws_stack.mock_aws_request_headers(service="s3")455 if object.get("ContentType"):456 headers["Content-Type"] = object["ContentType"]457 # stream used so large files do not fill memory458 response = request_response_stream(stream=object["Body"], headers=headers)459 return response460 else:461 msg = "Request URI does not match s3 specifications"462 LOG.warning(msg)463 return make_error_response(msg, 400)464 if method == "POST":465 if uri.startswith("arn:aws:apigateway:") and ":sqs:path" in uri:466 template = integration["requestTemplates"][APPLICATION_JSON]467 account_id, queue = uri.split("/")[-2:]468 region_name = uri.split(":")[3]469 if "GetQueueUrl" in template or "CreateQueue" in template:470 request_templates = RequestTemplates()471 payload = request_templates.render(invocation_context)472 new_request = f"{payload}&QueueName={queue}"473 else:474 request_templates = RequestTemplates()475 payload = request_templates.render(invocation_context)476 queue_url = f"{config.get_edge_url()}/{account_id}/{queue}"477 new_request = f"{payload}&QueueUrl={queue_url}"478 headers = 
aws_stack.mock_aws_request_headers(service="sqs", region_name=region_name)479 url = urljoin(config.service_url("sqs"), f"{TEST_AWS_ACCOUNT_ID}/{queue}")480 result = common.make_http_request(481 url, method="POST", headers=headers, data=new_request482 )483 return result484 elif uri.startswith("arn:aws:apigateway:") and ":sns:path" in uri:485 invocation_context.context = helpers.get_event_request_context(invocation_context)486 invocation_context.stage_variables = helpers.get_stage_variables(invocation_context)487 integration_response = SnsIntegration().invoke(invocation_context)488 return apply_request_response_templates(489 integration_response, response_templates, content_type=APPLICATION_JSON490 )491 raise Exception(492 'API Gateway AWS integration action URI "%s", method "%s" not yet implemented'493 % (uri, method)494 )495 elif integration_type == "AWS_PROXY":496 if uri.startswith("arn:aws:apigateway:") and ":dynamodb:action" in uri:497 # arn:aws:apigateway:us-east-1:dynamodb:action/PutItem&Table=MusicCollection498 table_name = uri.split(":dynamodb:action")[1].split("&Table=")[1]499 action = uri.split(":dynamodb:action")[1].split("&Table=")[0]500 if "PutItem" in action and method == "PUT":501 response_template = response_templates.get("application/json")502 if response_template is None:503 msg = "Invalid response template defined in integration response."504 LOG.info("%s Existing: %s", msg, response_templates)505 return make_error_response(msg, 404)506 response_template = json.loads(response_template)507 if response_template["TableName"] != table_name:508 msg = "Invalid table name specified in integration response template."509 return make_error_response(msg, 404)510 dynamo_client = aws_stack.connect_to_resource("dynamodb")511 table = dynamo_client.Table(table_name)512 event_data = {}513 data_dict = json.loads(data)514 for key, _ in response_template["Item"].items():515 event_data[key] = data_dict[key]516 table.put_item(Item=event_data)517 response = 
requests_response(event_data)518 return response519 else:520 raise Exception(521 'API Gateway action uri "%s", integration type %s not yet implemented'522 % (uri, integration_type)523 )524 elif integration_type in ["HTTP_PROXY", "HTTP"]:525 if ":servicediscovery:" in uri:526 # check if this is a servicediscovery integration URI527 client = aws_stack.connect_to_service("servicediscovery")528 service_id = uri.split("/")[-1]529 instances = client.list_instances(ServiceId=service_id)["Instances"]530 instance = (instances or [None])[0]531 if instance and instance.get("Id"):532 uri = "http://%s/%s" % (instance["Id"], invocation_path.lstrip("/"))533 # apply custom request template534 invocation_context.context = helpers.get_event_request_context(invocation_context)535 invocation_context.stage_variables = helpers.get_stage_variables(invocation_context)536 request_templates = RequestTemplates()537 payload = request_templates.render(invocation_context)538 if isinstance(payload, dict):539 payload = json.dumps(payload)540 uri = apply_request_parameters(541 uri, integration=integration, path_params=path_params, query_params=query_string_params542 )543 result = requests.request(method=method, url=uri, data=payload, headers=headers)544 # apply custom response template545 invocation_context.response = result546 response_templates = ResponseTemplates()547 response_templates.render(invocation_context)548 return invocation_context.response...

Full Screen

Full Screen

integration.py

Source:integration.py Github

copy

Full Screen

...97 response.headers[header_name] = value.strip("'")98 return response99class SnsIntegration(BackendIntegration):100 def invoke(self, invocation_context: ApiInvocationContext) -> Response:101 invocation_context.context = get_event_request_context(invocation_context)102 try:103 payload = self.request_templates.render(invocation_context)104 except Exception as e:105 LOG.warning("Failed to apply template for SNS integration", e)106 raise107 uri = (108 invocation_context.integration.get("uri")109 or invocation_context.integration.get("integrationUri")110 or ""111 )112 region_name = uri.split(":")[3]113 headers = aws_stack.mock_aws_request_headers(service="sns", region_name=region_name)114 return make_http_request(115 config.service_url("sns"), method="POST", headers=headers, data=payload116 )117class LambdaProxyIntegration(BackendIntegration):118 @classmethod119 def update_content_length(cls, response: Response):120 if response and response.content is not None:121 response.headers["Content-Length"] = str(len(response.content))122 def invoke(self, invocation_context: ApiInvocationContext):123 uri = (124 invocation_context.integration.get("uri")125 or invocation_context.integration.get("integrationUri")126 or ""127 )128 relative_path, query_string_params = extract_query_string_params(129 path=invocation_context.path_with_query_string130 )131 api_id = invocation_context.api_id132 stage = invocation_context.stage133 headers = invocation_context.headers134 resource_path = invocation_context.resource_path135 invocation_context.context = get_event_request_context(invocation_context)136 try:137 path_params = extract_path_params(path=relative_path, extracted_path=resource_path)138 invocation_context.path_params = path_params139 except Exception:140 path_params = {}141 func_arn = uri142 if ":lambda:path" in uri:143 func_arn = uri.split(":lambda:path")[1].split("functions/")[1].split("/invocations")[0]144 if invocation_context.authorizer_type:145 
invocation_context.context["authorizer"] = invocation_context.auth_context146 payload = self.request_templates.render(invocation_context)147 # TODO: change this signature to InvocationContext as well!148 result = lambda_api.process_apigateway_invocation(149 func_arn,...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful