How to use authorize_invocation method in localstack

Best Python code snippet using localstack_python

invocations.py

Source:invocations.py Github

copy

Full Screen

...
# ------------
def run_authorizer(invocation_context: ApiInvocationContext, authorizer: Dict) -> None:
    """Run one authorizer for the given invocation.

    Placeholder: the body is empty, so no authorization is performed and
    nothing is ever raised from here.
    """
    # TODO implement authorizers
    pass


def authorize_invocation(invocation_context: ApiInvocationContext) -> None:
    """Run every authorizer configured on the invoked REST API.

    Looks up the authorizers of ``invocation_context.api_id`` through the
    "apigateway" client (at most 100 are requested) and passes each one to
    ``run_authorizer``.
    """
    client = aws_stack.connect_to_service("apigateway")
    authorizers = client.get_authorizers(restApiId=invocation_context.api_id, limit=100).get(
        "items", []
    )
    for authorizer in authorizers:
        run_authorizer(invocation_context, authorizer)


def validate_api_key(api_key: str, stage: str) -> bool:
    """Return True if ``api_key`` is a key of a usage plan attached to ``stage``.

    A usage plan is considered when one of its ``apiStages`` entries has a
    "stage" equal to ``stage``; only the stage name is compared here.
    """
    usage_plan_ids = []
    client = aws_stack.connect_to_service("apigateway")
    usage_plans = client.get_usage_plans()
    for item in usage_plans.get("items", []):
        api_stages = item.get("apiStages", [])
        # the plan id is added once per matching stage entry
        usage_plan_ids.extend(
            item.get("id") for api_stage in api_stages if api_stage.get("stage") == stage
        )
    for usage_plan_id in usage_plan_ids:
        usage_plan_keys = client.get_usage_plan_keys(usagePlanId=usage_plan_id)
        for key in usage_plan_keys.get("items", []):
            if key.get("value") == api_key:
                return True
    return False


def is_api_key_valid(is_api_key_required: bool, headers: Dict[str, str], stage: str) -> bool:
    """Check the request's API key when the method requires one.

    Returns True when no key is required, False when the "X-API-Key" header
    is missing or empty, and otherwise the result of ``validate_api_key``.
    """
    if not is_api_key_required:
        return True
    api_key = headers.get("X-API-Key")
    if not api_key:
        return False
    return validate_api_key(api_key, stage)


def update_content_length(response: Response) -> None:
    """Set the "Content-Length" header to the length of ``response.content``.

    Nothing is changed when ``response`` is falsy or its content is None.
    """
    if response and response.content is not None:
        response.headers["Content-Length"] = str(len(response.content))


def apply_request_parameters(
    uri: str, integration: Dict[str, Any], path_params: Dict[str, str], query_params: Dict[str, str]
):
    """Apply the integration's ``requestParameters`` mapping to the target URL.

    Path parameters that are mapped 1:1 (``integration.request.path.X`` ->
    ``method.request.path.X``) are substituted for ``{X}`` in the URI. Unless
    the integration type is "HTTP_PROXY", query parameters without such a 1:1
    querystring mapping are removed. Note that ``query_params`` is modified in
    place. Returns the result of ``add_query_params_to_url``.
    """
    request_parameters = integration.get("requestParameters")
    # fall back to the URI stored on the integration when none is passed in
    uri = uri or integration.get("uri") or integration.get("integrationUri") or ""
    if request_parameters:
        for key in path_params:
            # check if path_params is present in the integration request parameters
            request_param_key = f"integration.request.path.{key}"
            request_param_value = f"method.request.path.{key}"
            if request_parameters.get(request_param_key) == request_param_value:
                uri = uri.replace(f"{{{key}}}", path_params[key])
    if integration.get("type") != "HTTP_PROXY" and request_parameters:
        # iterate over a copy because entries are popped from query_params
        for key in query_params.copy():
            request_query_key = f"integration.request.querystring.{key}"
            request_param_val = f"method.request.querystring.{key}"
            if request_parameters.get(request_query_key, None) != request_param_val:
                query_params.pop(key)
    return add_query_params_to_url(uri, query_params)


def apply_response_parameters(invocation_context: ApiInvocationContext):
    """Copy header mappings from the integration response onto the response.

    The integration response matching the response status code is used; if
    there is no match, the single configured entry is used, and if several
    are configured the response is returned untouched. Only
    ``method.response.header.*`` parameters are handled; their values have
    surrounding single quotes stripped. Returns the (mutated) response.
    """
    response = invocation_context.response
    integration = invocation_context.integration
    int_responses = integration.get("integrationResponses") or {}
    if not int_responses:
        return response
    entries = list(int_responses.keys())
    return_code = str(response.status_code)
    if return_code not in entries:
        if len(entries) > 1:
            # ambiguous: no entry matches and there is more than one candidate
            LOG.info("Found multiple integration response status codes: %s", entries)
            return response
        return_code = entries[0]
    response_params = int_responses[return_code].get("responseParameters", {})
    for key, value in response_params.items():
        # TODO: add support for method.response.body, etc ...
        if str(key).lower().startswith("method.response.header."):
            header_name = key[len("method.response.header.") :]
            response.headers[header_name] = value.strip("'")
    return response


def invoke_rest_api_from_request(invocation_context: ApiInvocationContext):
    """Entry point: invoke the REST API, mapping AuthorizationError to a 403.

    ``helpers.set_api_id_stage_invocation_path`` is called on the context
    first (presumably it fills in api id, stage and path - confirm in helpers).
    """
    helpers.set_api_id_stage_invocation_path(invocation_context)
    try:
        return invoke_rest_api(invocation_context)
    except AuthorizationError as e:
        api_id = invocation_context.api_id
        return make_error_response("Not authorized to invoke REST API %s: %s" % (api_id, e), 403)


def invoke_rest_api(invocation_context: ApiInvocationContext):
    """Authorize, resolve, validate and dispatch a REST API invocation.

    Visible part: runs the authorizers, returns 404 when no resource matches,
    400 when the request validator rejects the request, and 403 when a
    required API key is missing or invalid. The listing is cut off after the
    integration lookup.
    """
    invocation_path = invocation_context.path_with_query_string
    raw_path = invocation_context.path or invocation_path
    method = invocation_context.method
    headers = invocation_context.headers
    # run gateway authorizers for this request
    authorize_invocation(invocation_context)
    extracted_path, resource = helpers.get_target_resource_details(invocation_context)
    if not resource:
        return make_error_response("Unable to find path %s" % invocation_context.path, 404)
    # validate request
    validator = RequestValidator(invocation_context, aws_stack.connect_to_service("apigateway"))
    if not validator.is_request_valid():
        return make_error_response("Invalid request body", 400)
    api_key_required = resource.get("resourceMethods", {}).get(method, {}).get("apiKeyRequired")
    if not is_api_key_valid(api_key_required, headers, invocation_context.stage):
        return make_error_response("Access denied - invalid API key", 403)
    integrations = resource.get("resourceMethods", {})
    integration = integrations.get(method, {})
    if not integration:
        # HttpMethod: '*'
        ...

Full Screen

Full Screen

apigateway_listener.py

Source:apigateway_listener.py Github

copy

Full Screen

# ... (listing starts mid-file; the preceding, truncated definition ends with `return data`)


def run_authorizer(api_id, headers, authorizer):
    """Run one authorizer for a request.

    Placeholder: the body is empty, so no authorization is performed.
    """
    # TODO implement authorizers
    pass


def authorize_invocation(api_id, headers):
    """Run every authorizer configured on REST API ``api_id``.

    At most 100 authorizers are requested from the "apigateway" client; each
    one is passed to ``run_authorizer`` together with the request headers.
    """
    client = aws_stack.connect_to_service('apigateway')
    authorizers = client.get_authorizers(restApiId=api_id, limit=100).get('items', [])
    for authorizer in authorizers:
        run_authorizer(api_id, headers, authorizer)


def validate_api_key(api_key, stage):
    """Return True if ``api_key`` is a key of any usage plan attached to ``stage``.

    Every usage plan that has an ``apiStages`` entry whose "stage" equals
    ``stage`` is considered, and every key of those plans is compared. The
    earlier logic kept only the last matching plan and did not check all of
    its keys, so valid keys could be rejected and a plan without any keys
    could be accepted.
    """
    client = aws_stack.connect_to_service('apigateway')
    usage_plans = client.get_usage_plans()

    # collect the ids of all plans attached to the stage (each id once)
    usage_plan_ids = []
    for item in usage_plans.get('items', []):
        plan_id = item.get('id')
        if not plan_id or plan_id in usage_plan_ids:
            continue
        if any(api_stage.get('stage') == stage for api_stage in item.get('apiStages', [])):
            usage_plan_ids.append(plan_id)

    for usage_plan_id in usage_plan_ids:
        usage_plan_keys = client.get_usage_plan_keys(usagePlanId=usage_plan_id)
        if any(key.get('value') == api_key for key in usage_plan_keys.get('items', [])):
            return True
    return False


def is_api_key_valid(is_api_key_required, headers, stage):
    """Check the request's API key when the method requires one.

    Returns True when no key is required, False when the 'X-API-Key' header
    is missing or empty, and otherwise the result of ``validate_api_key``.
    """
    if not is_api_key_required:
        return True
    api_key = headers.get('X-API-Key')
    if not api_key:
        return False
    return validate_api_key(api_key, stage)


def update_content_length(response):
    """Set the 'Content-Length' header to the length of ``response.content``.

    An empty body yields "0"; nothing is changed when ``response`` is falsy
    or its content is None.
    """
    # compare against None so that an empty body still gets a length of 0
    if response and response.content is not None:
        response.headers['Content-Length'] = str(len(response.content))


def apply_template(integration, req_res_type, data, path_params=None, query_params=None,
                   headers=None):
    """Render the integration's JSON mapping template for ``data``, if any.

    Only integrations of type 'HTTP' or 'AWS' are templated; the template is
    looked up under ``'<req_res_type>Templates'``. The template context
    exposes ``body`` (the payload) and ``params`` (a lookup over path, query
    string and header parameters). Returns the rendered payload, or ``data``
    unchanged when no template applies.
    """
    if integration['type'] in ['HTTP', 'AWS']:
        # apply custom request template
        template = integration.get('%sTemplates' % req_res_type, {}).get(APPLICATION_JSON)
        if template:

            def _params(name=None):
                # See https://docs.aws.amazon.com/apigateway/latest/developerguide/
                #    api-gateway-mapping-template-reference.html#input-variable-reference
                # Returns "request parameter from the path, query string, or header value
                # (searched in that order)". Later updates overwrite earlier ones, so the
                # lowest-precedence source (headers) is merged first and the path last.
                combined = {}
                combined.update(headers or {})
                combined.update(query_params or {})
                combined.update(path_params or {})
                return combined if not name else combined.get(name)

            context = {'body': data, 'params': _params}
            data = aws_stack.render_velocity_template(template, context)
    return data


def get_api_id_stage_invocation_path(path):
    """Split a user request path into ``(api_id, stage, relative_path)``.

    The three parts are the capture groups of ``PATH_REGEX_USER_REQUEST``; the
    relative path (which may carry a query string) gets a leading '/'.
    """
    # NOTE(review): a path that does not match leaves search_match as None and
    # fails on .group() - confirm callers only pass matching paths
    search_match = re.search(PATH_REGEX_USER_REQUEST, path)
    api_id = search_match.group(1)
    stage = search_match.group(2)
    relative_path_w_query_params = '/%s' % search_match.group(3)
    return api_id, stage, relative_path_w_query_params


def invoke_rest_api_from_request(method, path, data, headers, context=None):
    """Entry point: invoke the REST API addressed by ``path``.

    An ``AuthorizationError`` raised during the invocation is turned into a
    403 error response.
    """
    # use a fresh dict per call instead of a shared mutable default
    context = {} if context is None else context
    api_id, stage, relative_path_w_query_params = get_api_id_stage_invocation_path(path)
    try:
        return invoke_rest_api(api_id, stage, method, relative_path_w_query_params,
                               data, headers, path=path, context=context)
    except AuthorizationError as e:
        return make_error_response('Not authorized to invoke REST API %s: %s' % (api_id, e), 403)


# NOTE(review): the mutable default `context={}` is left untouched here because
# the rest of this function is not visible in the listing.
def invoke_rest_api(api_id, stage, method, invocation_path, data, headers, path=None, context={}):
    """Authorize, resolve and dispatch a REST API invocation.

    Visible part: runs the authorizers, returns 404 when the path cannot be
    resolved and 403 when a required API key is missing or invalid, then
    looks up the method integration (falling back to 'ANY'). The listing is
    cut off after that lookup.
    """
    path = path or invocation_path
    relative_path, query_string_params = extract_query_string_params(path=invocation_path)
    # run gateway authorizers for this request
    authorize_invocation(api_id, headers)
    path_map = helpers.get_rest_api_paths(rest_api_id=api_id)
    try:
        extracted_path, resource = get_resource_for_path(path=relative_path, path_map=path_map)
    except Exception:
        return make_error_response('Unable to find path %s' % path, 404)
    api_key_required = resource.get('resourceMethods', {}).get(method, {}).get('apiKeyRequired')
    if not is_api_key_valid(api_key_required, headers, stage):
        return make_error_response('Access denied - invalid API key', 403)
    integrations = resource.get('resourceMethods', {})
    integration = integrations.get(method, {})
    if not integration:
        integration = integrations.get('ANY', {})
    integration = integration.get('methodIntegration')
    if not integration:
        ...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful