Best Python code snippet using localstack_python
lambda_api.py
Source:lambda_api.py  
...208def get_stage_variables(api_id, stage):209    api_gateway_client = aws_stack.connect_to_service('apigateway')210    response = api_gateway_client.get_stage(restApiId=api_id, stageName=stage)211    return response.get('variables', None)212def fix_proxy_path_params(path_params):213    proxy_path_param_value = path_params.get('proxy+')214    if not proxy_path_param_value:215        return216    del path_params['proxy+']217    path_params['proxy'] = proxy_path_param_value218def message_attributes_to_lower(message_attrs):219    """ Convert message attribute details (first characters) to lower case (e.g., stringValue, dataType). """220    message_attrs = message_attrs or {}221    for _, attr in message_attrs.items():222        if not isinstance(attr, dict):223            continue224        for key, value in dict(attr).items():225            attr[first_char_to_lower(key)] = attr.pop(key)226    return message_attrs227def process_apigateway_invocation(func_arn, path, payload, stage, api_id, headers={},228                                  resource_path=None, method=None, path_params={},229                                  query_string_params=None, request_context={}, event_context={}):230    try:231        resource_path = resource_path or path232        path_params = dict(path_params)233        fix_proxy_path_params(path_params)234        event = {235            'path': path,236            'headers': dict(headers),237            'multiValueHeaders': multi_value_dict_for_list(headers),238            'pathParameters': path_params,239            'body': payload,240            'isBase64Encoded': False,241            'resource': resource_path,242            'httpMethod': method,243            'queryStringParameters': query_string_params,244            'multiValueQueryStringParameters': multi_value_dict_for_list(query_string_params),245            'requestContext': request_context,246            'stageVariables': get_stage_variables(api_id, stage),247        
}...integration.py
Source:integration.py  
...149            key = to_str(key)150            temp_mv_dict[key].append(value)151        return dict((k, tuple(v)) for k, v in temp_mv_dict.items())152    @staticmethod153    def fix_proxy_path_params(path_params):154        proxy_path_param_value = path_params.get("proxy+")155        if not proxy_path_param_value:156            return157        del path_params["proxy+"]158        path_params["proxy"] = proxy_path_param_value159    @classmethod160    def construct_invocation_event(161        cls, method, path, headers, data, query_string_params=None, is_base64_encoded=False162    ):163        query_string_params = query_string_params or parse_request_data(method, path, "")164        # AWS canonical header names, converting them to lower-case165        headers = canonicalize_headers(headers)166        return {167            "path": path,168            "headers": dict(headers),169            "multiValueHeaders": cls.multi_value_dict_for_list(headers),170            "body": data,171            "isBase64Encoded": is_base64_encoded,172            "httpMethod": method,173            "queryStringParameters": query_string_params or None,174            "multiValueQueryStringParameters": cls.multi_value_dict_for_list(query_string_params)175            or None,176        }177    @classmethod178    def process_apigateway_invocation(179        cls,180        func_arn,181        path,182        payload,183        invocation_context: ApiInvocationContext,184        query_string_params=None,185    ) -> str:186        if (path_params := invocation_context.path_params) is None:187            path_params = {}188        if (request_context := invocation_context.context) is None:189            request_context = {}190        try:191            resource_path = invocation_context.resource_path or path192            event = cls.construct_invocation_event(193                invocation_context.method,194                path,195                invocation_context.headers,196                
payload,197                query_string_params,198                invocation_context.is_data_base64_encoded,199            )200            path_params = dict(path_params)201            cls.fix_proxy_path_params(path_params)202            event["pathParameters"] = path_params203            event["resource"] = resource_path204            event["requestContext"] = request_context205            event["stageVariables"] = invocation_context.stage_variables206            LOG.debug(207                "Running Lambda function %s from API Gateway invocation: %s %s",208                func_arn,209                invocation_context.method or "GET",210                path,211            )212            asynchronous = invocation_context.headers.get("X-Amz-Invocation-Type") == "'Event'"213            return call_lambda(214                function_arn=func_arn, event=to_bytes(json.dumps(event)), asynchronous=asynchronous215            )...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
