Best Python code snippet using localstack_python
responses.py
Source:responses.py  
...146            return self.delete_vpc_link()147        if request.method == "GET":148            return self.get_vpc_link()149        if request.method == "PATCH":150            return self.update_vpc_link()151    def vpc_links(self, request, full_url, headers):152        self.setup_class(request, full_url, headers)153        if request.method == "GET":154            return self.get_vpc_links()155        if request.method == "POST":156            return self.create_vpc_link()157    def create_api(self):158        params = json.loads(self.body)159        api_key_selection_expression = params.get("apiKeySelectionExpression")160        cors_configuration = params.get("corsConfiguration")161        credentials_arn = params.get("credentialsArn")162        description = params.get("description")163        disable_schema_validation = params.get("disableSchemaValidation")164        disable_execute_api_endpoint = params.get("disableExecuteApiEndpoint")165        name = params.get("name")166        protocol_type = params.get("protocolType")167        route_key = params.get("routeKey")168        route_selection_expression = params.get("routeSelectionExpression")169        tags = params.get("tags")170        target = params.get("target")171        version = params.get("version")172        if protocol_type not in ["HTTP", "WEBSOCKET"]:173            raise UnknownProtocol174        api = self.apigatewayv2_backend.create_api(175            api_key_selection_expression=api_key_selection_expression,176            cors_configuration=cors_configuration,177            credentials_arn=credentials_arn,178            description=description,179            disable_schema_validation=disable_schema_validation,180            disable_execute_api_endpoint=disable_execute_api_endpoint,181            name=name,182            protocol_type=protocol_type,183            route_key=route_key,184            route_selection_expression=route_selection_expression,185            tags=tags,186            target=target,187            version=version,188        )189        return 200, {}, json.dumps(api.to_json())190    def delete_api(self):191        api_id = self.path.split("/")[-1]192        self.apigatewayv2_backend.delete_api(api_id=api_id)193        return 200, "", "{}"194    def get_api(self):195        api_id = self.path.split("/")[-1]196        api = self.apigatewayv2_backend.get_api(api_id=api_id)197        return 200, {}, json.dumps(api.to_json())198    def get_apis(self):199        apis = self.apigatewayv2_backend.get_apis()200        return 200, {}, json.dumps({"items": [a.to_json() for a in apis]})201    def update_api(self):202        api_id = self.path.split("/")[-1]203        params = json.loads(self.body)204        api_key_selection_expression = params.get("apiKeySelectionExpression")205        cors_configuration = params.get("corsConfiguration")206        description = params.get("description")207        disable_schema_validation = params.get("disableSchemaValidation")208        disable_execute_api_endpoint = params.get("disableExecuteApiEndpoint")209        name = params.get("name")210        route_selection_expression = params.get("routeSelectionExpression")211        version = params.get("version")212        api = self.apigatewayv2_backend.update_api(213            api_id=api_id,214            api_key_selection_expression=api_key_selection_expression,215            cors_configuration=cors_configuration,216            description=description,217            disable_schema_validation=disable_schema_validation,218            disable_execute_api_endpoint=disable_execute_api_endpoint,219            name=name,220            route_selection_expression=route_selection_expression,221            version=version,222        )223        return 200, {}, json.dumps(api.to_json())224    def reimport_api(self):225        api_id = self.path.split("/")[-1]226        params = json.loads(self.body)227        body = params.get("body")228        fail_on_warnings = (229            str(self._get_param("failOnWarnings", "false")).lower() == "true"230        )231        api = self.apigatewayv2_backend.reimport_api(api_id, body, fail_on_warnings)232        return 201, {}, json.dumps(api.to_json())233    def create_authorizer(self):234        api_id = self.path.split("/")[-2]235        params = json.loads(self.body)236        auth_creds_arn = params.get("authorizerCredentialsArn")237        auth_payload_format_version = params.get("authorizerPayloadFormatVersion")238        auth_result_ttl = params.get("authorizerResultTtlInSeconds")239        authorizer_type = params.get("authorizerType")240        authorizer_uri = params.get("authorizerUri")241        enable_simple_response = params.get("enableSimpleResponses")242        identity_source = params.get("identitySource")243        identity_validation_expr = params.get("identityValidationExpression")244        jwt_config = params.get("jwtConfiguration")245        name = params.get("name")246        authorizer = self.apigatewayv2_backend.create_authorizer(247            api_id,248            auth_creds_arn=auth_creds_arn,249            auth_payload_format_version=auth_payload_format_version,250            auth_result_ttl=auth_result_ttl,251            authorizer_type=authorizer_type,252            authorizer_uri=authorizer_uri,253            enable_simple_response=enable_simple_response,254            identity_source=identity_source,255            identity_validation_expr=identity_validation_expr,256            jwt_config=jwt_config,257            name=name,258        )259        return 200, {}, json.dumps(authorizer.to_json())260    def delete_authorizer(self):261        api_id = self.path.split("/")[-3]262        authorizer_id = self.path.split("/")[-1]263        self.apigatewayv2_backend.delete_authorizer(api_id, authorizer_id)264        return 200, {}, "{}"265    def get_authorizer(self):266        api_id = self.path.split("/")[-3]267        authorizer_id = self.path.split("/")[-1]268        authorizer = self.apigatewayv2_backend.get_authorizer(api_id, authorizer_id)269        return 200, {}, json.dumps(authorizer.to_json())270    def update_authorizer(self):271        api_id = self.path.split("/")[-3]272        authorizer_id = self.path.split("/")[-1]273        params = json.loads(self.body)274        auth_creds_arn = params.get("authorizerCredentialsArn")275        auth_payload_format_version = params.get("authorizerPayloadFormatVersion")276        auth_result_ttl = params.get("authorizerResultTtlInSeconds")277        authorizer_type = params.get("authorizerType")278        authorizer_uri = params.get("authorizerUri")279        enable_simple_response = params.get("enableSimpleResponses")280        identity_source = params.get("identitySource")281        identity_validation_expr = params.get("identityValidationExpression")282        jwt_config = params.get("jwtConfiguration")283        name = params.get("name")284        authorizer = self.apigatewayv2_backend.update_authorizer(285            api_id,286            authorizer_id=authorizer_id,287            auth_creds_arn=auth_creds_arn,288            auth_payload_format_version=auth_payload_format_version,289            auth_result_ttl=auth_result_ttl,290            authorizer_type=authorizer_type,291            authorizer_uri=authorizer_uri,292            enable_simple_response=enable_simple_response,293            identity_source=identity_source,294            identity_validation_expr=identity_validation_expr,295            jwt_config=jwt_config,296            name=name,297        )298        return 200, {}, json.dumps(authorizer.to_json())299    def delete_cors_configuration(self):300        api_id = self.path.split("/")[-2]301        self.apigatewayv2_backend.delete_cors_configuration(api_id)302        return 200, {}, "{}"303    def create_model(self):304        api_id = self.path.split("/")[-2]305        params = json.loads(self.body)306        content_type = params.get("contentType")307        description = params.get("description")308        name = params.get("name")309        schema = params.get("schema")310        model = self.apigatewayv2_backend.create_model(311            api_id, content_type, description, name, schema312        )313        return 200, {}, json.dumps(model.to_json())314    def delete_model(self):315        api_id = self.path.split("/")[-3]316        model_id = self.path.split("/")[-1]317        self.apigatewayv2_backend.delete_model(api_id, model_id)318        return 200, {}, "{}"319    def get_model(self):320        api_id = self.path.split("/")[-3]321        model_id = self.path.split("/")[-1]322        model = self.apigatewayv2_backend.get_model(api_id, model_id)323        return 200, {}, json.dumps(model.to_json())324    def update_model(self):325        api_id = self.path.split("/")[-3]326        model_id = self.path.split("/")[-1]327        params = json.loads(self.body)328        content_type = params.get("contentType")329        description = params.get("description")330        name = params.get("name")331        schema = params.get("schema")332        model = self.apigatewayv2_backend.update_model(333            api_id,334            model_id,335            content_type=content_type,336            description=description,337            name=name,338            schema=schema,339        )340        return 200, {}, json.dumps(model.to_json())341    def get_tags(self):342        resource_arn = unquote(self.path.split("/tags/")[1])343        tags = self.apigatewayv2_backend.get_tags(resource_arn)344        return 200, {}, json.dumps({"tags": tags})345    def tag_resource(self):346        resource_arn = unquote(self.path.split("/tags/")[1])347        tags = json.loads(self.body).get("tags", {})348        self.apigatewayv2_backend.tag_resource(resource_arn, tags)349        return 201, {}, "{}"350    def untag_resource(self):351        resource_arn = unquote(self.path.split("/tags/")[1])352        tag_keys = self.querystring.get("tagKeys")353        self.apigatewayv2_backend.untag_resource(resource_arn, tag_keys)354        return 200, {}, "{}"355    def create_route(self):356        api_id = self.path.split("/")[-2]357        params = json.loads(self.body)358        api_key_required = params.get("apiKeyRequired", False)359        authorization_scopes = params.get("authorizationScopes")360        authorization_type = params.get("authorizationType", "NONE")361        authorizer_id = params.get("authorizerId")362        model_selection_expression = params.get("modelSelectionExpression")363        operation_name = params.get("operationName")364        request_models = params.get("requestModels")365        request_parameters = params.get("requestParameters")366        route_key = params.get("routeKey")367        route_response_selection_expression = params.get(368            "routeResponseSelectionExpression"369        )370        target = params.get("target")371        route = self.apigatewayv2_backend.create_route(372            api_id=api_id,373            api_key_required=api_key_required,374            authorization_scopes=authorization_scopes,375            authorization_type=authorization_type,376            authorizer_id=authorizer_id,377            model_selection_expression=model_selection_expression,378            operation_name=operation_name,379            request_models=request_models,380            request_parameters=request_parameters,381            route_key=route_key,382            route_response_selection_expression=route_response_selection_expression,383            target=target,384        )385        return 201, {}, json.dumps(route.to_json())386    def delete_route(self):387        api_id = self.path.split("/")[-3]388        route_id = self.path.split("/")[-1]389        self.apigatewayv2_backend.delete_route(api_id=api_id, route_id=route_id)390        return 200, {}, "{}"391    def delete_route_request_parameter(self):392        api_id = self.path.split("/")[-5]393        route_id = self.path.split("/")[-3]394        request_param = self.path.split("/")[-1]395        self.apigatewayv2_backend.delete_route_request_parameter(396            api_id, route_id, request_param397        )398        return 200, {}, "{}"399    def get_route(self):400        api_id = self.path.split("/")[-3]401        route_id = self.path.split("/")[-1]402        api = self.apigatewayv2_backend.get_route(api_id=api_id, route_id=route_id)403        return 200, {}, json.dumps(api.to_json())404    def get_routes(self):405        api_id = self.path.split("/")[-2]406        apis = self.apigatewayv2_backend.get_routes(api_id=api_id)407        return 200, {}, json.dumps({"items": [api.to_json() for api in apis]})408    def update_route(self):409        api_id = self.path.split("/")[-3]410        route_id = self.path.split("/")[-1]411        params = json.loads(self.body)412        api_key_required = params.get("apiKeyRequired")413        authorization_scopes = params.get("authorizationScopes")414        authorization_type = params.get("authorizationType")415        authorizer_id = params.get("authorizerId")416        model_selection_expression = params.get("modelSelectionExpression")417        operation_name = params.get("operationName")418        request_models = params.get("requestModels")419        request_parameters = params.get("requestParameters")420        route_key = params.get("routeKey")421        route_response_selection_expression = params.get(422            "routeResponseSelectionExpression"423        )424        target = params.get("target")425        api = self.apigatewayv2_backend.update_route(426            api_id=api_id,427            api_key_required=api_key_required,428            authorization_scopes=authorization_scopes,429            authorization_type=authorization_type,430            authorizer_id=authorizer_id,431            model_selection_expression=model_selection_expression,432            operation_name=operation_name,433            request_models=request_models,434            request_parameters=request_parameters,435            route_id=route_id,436            route_key=route_key,437            route_response_selection_expression=route_response_selection_expression,438            target=target,439        )440        return 200, {}, json.dumps(api.to_json())441    def create_route_response(self):442        api_id = self.path.split("/")[-4]443        route_id = self.path.split("/")[-2]444        params = json.loads(self.body)445        response_models = params.get("responseModels")446        route_response_key = params.get("routeResponseKey")447        model_selection_expression = params.get("modelSelectionExpression")448        route_response = self.apigatewayv2_backend.create_route_response(449            api_id,450            route_id,451            route_response_key,452            model_selection_expression=model_selection_expression,453            response_models=response_models,454        )455        return 200, {}, json.dumps(route_response.to_json())456    def delete_route_response(self):457        api_id = self.path.split("/")[-5]458        route_id = self.path.split("/")[-3]459        route_response_id = self.path.split("/")[-1]460        self.apigatewayv2_backend.delete_route_response(461            api_id, route_id, route_response_id462        )463        return 200, {}, "{}"464    def get_route_response(self):465        api_id = self.path.split("/")[-5]466        route_id = self.path.split("/")[-3]467        route_response_id = self.path.split("/")[-1]468        route_response = self.apigatewayv2_backend.get_route_response(469            api_id, route_id, route_response_id470        )471        return 200, {}, json.dumps(route_response.to_json())472    def create_integration(self):473        api_id = self.path.split("/")[-2]474        params = json.loads(self.body)475        connection_id = params.get("connectionId")476        connection_type = params.get("connectionType")477        content_handling_strategy = params.get("contentHandlingStrategy")478        credentials_arn = params.get("credentialsArn")479        description = params.get("description")480        integration_method = params.get("integrationMethod")481        integration_subtype = params.get("integrationSubtype")482        integration_type = params.get("integrationType")483        integration_uri = params.get("integrationUri")484        passthrough_behavior = params.get("passthroughBehavior")485        payload_format_version = params.get("payloadFormatVersion")486        request_parameters = params.get("requestParameters")487        request_templates = params.get("requestTemplates")488        response_parameters = params.get("responseParameters")489        template_selection_expression = params.get("templateSelectionExpression")490        timeout_in_millis = params.get("timeoutInMillis")491        tls_config = params.get("tlsConfig")492        integration = self.apigatewayv2_backend.create_integration(493            api_id=api_id,494            connection_id=connection_id,495            connection_type=connection_type,496            content_handling_strategy=content_handling_strategy,497            credentials_arn=credentials_arn,498            description=description,499            integration_method=integration_method,500            integration_subtype=integration_subtype,501            integration_type=integration_type,502            integration_uri=integration_uri,503            passthrough_behavior=passthrough_behavior,504            payload_format_version=payload_format_version,505            request_parameters=request_parameters,506            request_templates=request_templates,507            response_parameters=response_parameters,508            template_selection_expression=template_selection_expression,509            timeout_in_millis=timeout_in_millis,510            tls_config=tls_config,511        )512        return 200, {}, json.dumps(integration.to_json())513    def get_integration(self):514        api_id = self.path.split("/")[-3]515        integration_id = self.path.split("/")[-1]516        integration = self.apigatewayv2_backend.get_integration(517            api_id=api_id, integration_id=integration_id518        )519        return 200, {}, json.dumps(integration.to_json())520    def get_integrations(self):521        api_id = self.path.split("/")[-2]522        integrations = self.apigatewayv2_backend.get_integrations(api_id=api_id)523        return 200, {}, json.dumps({"items": [i.to_json() for i in integrations]})524    def delete_integration(self):525        api_id = self.path.split("/")[-3]526        integration_id = self.path.split("/")[-1]527        self.apigatewayv2_backend.delete_integration(528            api_id=api_id, integration_id=integration_id,529        )530        return 200, {}, "{}"531    def update_integration(self):532        api_id = self.path.split("/")[-3]533        integration_id = self.path.split("/")[-1]534        params = json.loads(self.body)535        connection_id = params.get("connectionId")536        connection_type = params.get("connectionType")537        content_handling_strategy = params.get("contentHandlingStrategy")538        credentials_arn = params.get("credentialsArn")539        description = params.get("description")540        integration_method = params.get("integrationMethod")541        integration_subtype = params.get("integrationSubtype")542        integration_type = params.get("integrationType")543        integration_uri = params.get("integrationUri")544        passthrough_behavior = params.get("passthroughBehavior")545        payload_format_version = params.get("payloadFormatVersion")546        request_parameters = params.get("requestParameters")547        request_templates = params.get("requestTemplates")548        response_parameters = params.get("responseParameters")549        template_selection_expression = params.get("templateSelectionExpression")550        timeout_in_millis = params.get("timeoutInMillis")551        tls_config = params.get("tlsConfig")552        integration = self.apigatewayv2_backend.update_integration(553            api_id=api_id,554            connection_id=connection_id,555            connection_type=connection_type,556            content_handling_strategy=content_handling_strategy,557            credentials_arn=credentials_arn,558            description=description,559            integration_id=integration_id,560            integration_method=integration_method,561            integration_subtype=integration_subtype,562            integration_type=integration_type,563            integration_uri=integration_uri,564            passthrough_behavior=passthrough_behavior,565            payload_format_version=payload_format_version,566            request_parameters=request_parameters,567            request_templates=request_templates,568            response_parameters=response_parameters,569            template_selection_expression=template_selection_expression,570            timeout_in_millis=timeout_in_millis,571            tls_config=tls_config,572        )573        return 200, {}, json.dumps(integration.to_json())574    def create_integration_response(self):575        api_id = self.path.split("/")[-4]576        int_id = self.path.split("/")[-2]577        params = json.loads(self.body)578        content_handling_strategy = params.get("contentHandlingStrategy")579        integration_response_key = params.get("integrationResponseKey")580        response_parameters = params.get("responseParameters")581        response_templates = params.get("responseTemplates")582        template_selection_expression = params.get("templateSelectionExpression")583        integration_response = self.apigatewayv2_backend.create_integration_response(584            api_id=api_id,585            integration_id=int_id,586            content_handling_strategy=content_handling_strategy,587            integration_response_key=integration_response_key,588            response_parameters=response_parameters,589            response_templates=response_templates,590            template_selection_expression=template_selection_expression,591        )592        return 200, {}, json.dumps(integration_response.to_json())593    def delete_integration_response(self):594        api_id = self.path.split("/")[-5]595        int_id = self.path.split("/")[-3]596        int_res_id = self.path.split("/")[-1]597        self.apigatewayv2_backend.delete_integration_response(598            api_id=api_id, integration_id=int_id, integration_response_id=int_res_id599        )600        return 200, {}, "{}"601    def get_integration_response(self):602        api_id = self.path.split("/")[-5]603        int_id = self.path.split("/")[-3]604        int_res_id = self.path.split("/")[-1]605        int_response = self.apigatewayv2_backend.get_integration_response(606            api_id=api_id, integration_id=int_id, integration_response_id=int_res_id607        )608        return 200, {}, json.dumps(int_response.to_json())609    def get_integration_responses(self):610        api_id = self.path.split("/")[-4]611        int_id = self.path.split("/")[-2]612        int_response = self.apigatewayv2_backend.get_integration_responses(613            api_id=api_id, integration_id=int_id614        )615        return 200, {}, json.dumps({"items": [res.to_json() for res in int_response]})616    def update_integration_response(self):617        api_id = self.path.split("/")[-5]618        int_id = self.path.split("/")[-3]619        int_res_id = self.path.split("/")[-1]620        params = json.loads(self.body)621        content_handling_strategy = params.get("contentHandlingStrategy")622        integration_response_key = params.get("integrationResponseKey")623        response_parameters = params.get("responseParameters")624        response_templates = params.get("responseTemplates")625        template_selection_expression = params.get("templateSelectionExpression")626        integration_response = self.apigatewayv2_backend.update_integration_response(627            api_id=api_id,628            integration_id=int_id,629            integration_response_id=int_res_id,630            content_handling_strategy=content_handling_strategy,631            integration_response_key=integration_response_key,632            response_parameters=response_parameters,633            response_templates=response_templates,634            template_selection_expression=template_selection_expression,635        )636        return 200, {}, json.dumps(integration_response.to_json())637    def create_vpc_link(self):638        params = json.loads(self.body)639        name = params.get("name")640        sg_ids = params.get("securityGroupIds")641        subnet_ids = params.get("subnetIds")642        tags = params.get("tags")643        vpc_link = self.apigatewayv2_backend.create_vpc_link(644            name, sg_ids, subnet_ids, tags645        )646        return 200, {}, json.dumps(vpc_link.to_json())647    def delete_vpc_link(self):648        vpc_link_id = self.path.split("/")[-1]649        self.apigatewayv2_backend.delete_vpc_link(vpc_link_id)650        return 200, {}, "{}"651    def get_vpc_link(self):652        vpc_link_id = self.path.split("/")[-1]653        vpc_link = self.apigatewayv2_backend.get_vpc_link(vpc_link_id)654        return 200, {}, json.dumps(vpc_link.to_json())655    def get_vpc_links(self):656        vpc_links = self.apigatewayv2_backend.get_vpc_links()657        return 200, {}, json.dumps({"items": [l.to_json() for l in vpc_links]})658    def update_vpc_link(self):659        vpc_link_id = self.path.split("/")[-1]660        params = json.loads(self.body)661        name = params.get("name")662        vpc_link = self.apigatewayv2_backend.update_vpc_link(vpc_link_id, name=name)...test_apigatewayv2_vpclinks.py
Source:test_apigatewayv2_vpclinks.py  
...84    client.delete_vpc_link(VpcLinkId=vpc_link_id)85    links = client.get_vpc_links()["Items"]86    links.should.have.length_of(0)87@mock_apigatewayv288def test_update_vpc_link():89    client = boto3.client("apigatewayv2", region_name="eu-north-1")90    vpc_link_id = client.create_vpc_link(91        Name="vpcl",92        SecurityGroupIds=["sg1", "sg2"],93        SubnetIds=["sid1", "sid2"],94        Tags={"key1": "value1"},95    )["VpcLinkId"]96    resp = client.update_vpc_link(VpcLinkId=vpc_link_id, Name="vpcl2")97    resp.should.have.key("CreatedDate")98    resp.should.have.key("Name").equals("vpcl2")99    resp.should.have.key("SecurityGroupIds").equals(["sg1", "sg2"])100    resp.should.have.key("SubnetIds").equals(["sid1", "sid2"])101    resp.should.have.key("Tags").equals({"key1": "value1"})102    resp.should.have.key("VpcLinkId")103    resp.should.have.key("VpcLinkStatus").equals("AVAILABLE")104    resp.should.have.key("VpcLinkVersion").equals("V2")105@mock_apigatewayv2106def test_untag_vpc_link():107    client = boto3.client("apigatewayv2", region_name="eu-west-1")108    vpc_link_id = client.create_vpc_link(109        Name="vpcl",110        SecurityGroupIds=["sg1", "sg2"],...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
