Best Python code snippet using localstack_python
test_apigateway.py
Source:test_apigateway.py  
"""Unit tests for the API Gateway listener helpers, VTL utilities and JSON patching."""

import json
import unittest
from unittest.mock import Mock

import boto3
from localstack import config
from localstack.constants import APPLICATION_JSON
from localstack.services.apigateway import apigateway_listener
from localstack.services.apigateway.apigateway_listener import (
    ApiInvocationContext,
    RequestValidator,
    apply_template,
)
from localstack.services.apigateway.helpers import apply_json_patch_safe
from localstack.utils.aws import templating
from localstack.utils.common import clone


class ApiGatewayPathsTest(unittest.TestCase):
    """Tests for path/query extraction, resource matching and request validation."""

    def test_extract_query_params(self):
        """The query string is split off the path; repeated keys become a list."""
        path, query_params = apigateway_listener.extract_query_string_params(
            "/foo/bar?foo=foo&bar=bar&bar=baz"
        )
        self.assertEqual("/foo/bar", path)
        self.assertEqual({"foo": "foo", "bar": ["bar", "baz"]}, query_params)

    def test_extract_path_params(self):
        """Placeholders in the resource path are bound to the request path segments."""
        # (request path, resource path template, expected parameters)
        cases = (
            ("/foo/bar", "/foo/{param1}", {"param1": "bar"}),
            (
                "/foo/bar1/bar2",
                "/foo/{param1}/{param2}",
                {"param1": "bar1", "param2": "bar2"},
            ),
            ("/foo/bar", "/foo/bar", {}),
            ("/foo/bar/baz", "/foo/{proxy+}", {"proxy": "bar/baz"}),
        )
        for request_path, template, expected in cases:
            # subTest keeps going after a failure, so every broken case is reported
            with self.subTest(path=request_path, template=template):
                params = apigateway_listener.extract_path_params(request_path, template)
                self.assertEqual(expected, params)

    def test_path_matches(self):
        """``get_resource_for_path`` returns the expected resource path, or None."""
        # (request path, candidate resources, expected matched path or None)
        cases = (
            ("/foo/bar", {"/foo/{param1}": {}}, "/foo/{param1}"),
            ("/foo/bar", {"/foo/bar": {}, "/foo/{param1}": {}}, "/foo/bar"),
            ("/foo/bar/baz", {"/foo/bar": {}, "/foo/{proxy+}": {}}, "/foo/{proxy+}"),
            ("/foo/bar/baz", {"/{proxy+}": {}, "/foo/{proxy+}": {}}, "/foo/{proxy+}"),
            ("/foo/bar", {"/foo/bar1": {}, "/foo/bar2": {}}, None),
            ("/foo/bar", {"/{param1}/bar1": {}, "/foo/bar2": {}}, None),
            (
                "/foo/bar",
                {"/{param1}/{param2}/foo/{param3}": {}, "/{param}/bar": {}},
                "/{param}/bar",
            ),
            ("/foo/bar", {"/{param1}/{param2}": {}, "/{param}/bar": {}}, "/{param}/bar"),
            (
                "/foo/baz",
                {"/{param1}/{param2}": {}, "/{param1}/bar": {}},
                "/{param1}/{param2}",
            ),
            (
                "/foo/bar/baz",
                {"/{param1}/{param2}/baz": {}, "/{param1}/bar/{param2}": {}},
                "/{param1}/{param2}/baz",
            ),
            (
                "/foo/bar/baz",
                {"/{param1}/{param2}/baz": {}, "/{param1}/{param2}/{param2}": {}},
                "/{param1}/{param2}/baz",
            ),
            ("/foo/bar/baz", {"/foo123/{param1}/baz": {}}, None),
            (
                "/foo/bar/baz",
                {"/foo/{param1}/baz": {}, "/foo/{param1}/{param2}": {}},
                "/foo/{param1}/baz",
            ),
        )
        for request_path, resources, expected in cases:
            with self.subTest(path=request_path, resources=list(resources)):
                result = apigateway_listener.get_resource_for_path(request_path, resources)
                if expected is None:
                    self.assertIsNone(result)
                else:
                    # a match is a (resource path, resource details) pair
                    matched_path, _ = result
                    self.assertEqual(expected, matched_path)

    def test_apply_request_parameters(self):
        """Path and query parameters are substituted into the integration URI."""
        integration = {
            "type": "HTTP_PROXY",
            "httpMethod": "ANY",
            "uri": "https://httpbin.org/anything/{proxy}",
            "requestParameters": {"integration.request.path.proxy": "method.request.path.proxy"},
            "passthroughBehavior": "WHEN_NO_MATCH",
            "timeoutInMillis": 29000,
            "cacheNamespace": "041fa782",
            "cacheKeyParameters": [],
        }
        uri = apigateway_listener.apply_request_parameters(
            uri="https://httpbin.org/anything/{proxy}",
            integration=integration,
            path_params={"proxy": "foo/bar/baz"},
            query_params={"param": "foobar"},
        )
        self.assertEqual("https://httpbin.org/anything/foo/bar/baz?param=foobar", uri)

    def test_if_request_is_valid_with_no_resource_methods(self):
        """A context without any resource set is reported as valid."""
        ctx = ApiInvocationContext("POST", "/", b"", {})
        validator = RequestValidator(ctx, None)
        self.assertTrue(validator.is_request_valid())

    def test_if_request_is_valid_with_no_matching_method(self):
        """A POST against a resource that only declares GET is reported as valid."""
        ctx = ApiInvocationContext("POST", "/", b"", {})
        ctx.resource = {"resourceMethods": {"GET": {}}}
        validator = RequestValidator(ctx, None)
        self.assertTrue(validator.is_request_valid())

    def test_if_request_is_valid_with_no_validator(self):
        """A method whose validator id is a single blank is reported as valid."""
        ctx = ApiInvocationContext("POST", "/", b"", {})
        ctx.api_id = "deadbeef"
        ctx.resource = {"resourceMethods": {"POST": {"requestValidatorId": " "}}}
        validator = RequestValidator(ctx, None)
        self.assertTrue(validator.is_request_valid())

    def test_if_request_has_body_validator(self):
        """A JSON object body passes validation against an ``object`` schema."""
        apigateway_client = self._mock_client()
        apigateway_client.get_request_validator.return_value = {"validateRequestBody": True}
        apigateway_client.get_model.return_value = {"schema": '{"type": "object"}'}
        ctx = ApiInvocationContext("POST", "/", '{"id":"1"}', {})
        ctx.api_id = "deadbeef"
        ctx.resource = {
            "resourceMethods": {
                "POST": {
                    "requestValidatorId": "112233",
                    "requestModels": {"application/json": "schemaName"},
                }
            }
        }
        validator = RequestValidator(ctx, apigateway_client)
        self.assertTrue(validator.is_request_valid())

    def test_request_validate_body_with_no_request_model(self):
        """Body validation fails when the method declares no request models."""
        apigateway_client = self._mock_client()
        apigateway_client.get_request_validator.return_value = {"validateRequestBody": True}
        ctx = ApiInvocationContext("POST", "/", '{"id":"1"}', {})
        ctx.api_id = "deadbeef"
        ctx.resource = {
            "resourceMethods": {
                "POST": {
                    "requestValidatorId": "112233",
                    "requestModels": None,
                }
            }
        }
        validator = RequestValidator(ctx, apigateway_client)
        self.assertFalse(validator.is_request_valid())

    def test_request_validate_body_with_no_model_for_schema_name(self):
        """Body validation fails when ``get_model`` returns nothing for the schema name."""
        apigateway_client = self._mock_client()
        apigateway_client.get_request_validator.return_value = {"validateRequestBody": True}
        apigateway_client.get_model.return_value = None
        ctx = ApiInvocationContext("POST", "/", '{"id":"1"}', {})
        ctx.api_id = "deadbeef"
        ctx.resource = {
            "resourceMethods": {
                "POST": {
                    "requestValidatorId": "112233",
                    "requestModels": {"application/json": "schemaName"},
                }
            }
        }
        validator = RequestValidator(ctx, apigateway_client)
        self.assertFalse(validator.is_request_valid())

    def _mock_client(self):
        """Return a ``Mock`` whose spec is a boto3 API Gateway client."""
        # The real client is only used as the spec, so the mock rejects
        # attributes that the API Gateway client does not have.
        return Mock(spec=boto3.client("apigateway", region_name=config.DEFAULT_REGION))


def test_render_template_values():
    """Check ``VelocityUtil`` URL encoding/decoding and JavaScript escaping."""
    util = templating.VelocityUtil()

    encoded = util.urlEncode("x=a+b")
    assert encoded == "x%3Da%2Bb"

    decoded = util.urlDecode("x=a+b")
    assert decoded == "x=a b"

    # (input value, expected result of escapeJavaScript)
    escape_tests = (
        ("it's", '"it\'s"'),
        ("0010", "10"),
        ("true", "true"),
        ("True", '"True"'),
        ("1.021", "1.021"),
        ("'''", "\"'''\""),
        ('""', '""'),
        ('"""', '"\\"\\"\\""'),
        ('{"foo": 123}', '{"foo": 123}'),
        ('{"foo"": 123}', '"{\\"foo\\"\\": 123}"'),
        (1, "1"),
        (True, "true"),
    )
    for raw, expected in escape_tests:
        escaped = util.escapeJavaScript(raw)
        assert escaped == expected
        # we should be able to json.loads in all of the cases!
        json.loads(escaped)


class TestJSONPatch(unittest.TestCase):
    """Tests for ``apply_json_patch_safe``."""

    def test_apply_json_patch(self):
        """``replace`` and ``add`` operations are applied to a copy of the subject."""
        apply = apply_json_patch_safe

        # test replacing array index
        subject = {"root": [{"arr": ["1", "abc"]}]}
        result = apply(clone(subject), {"op": "replace", "path": "/root/0/arr/0", "value": 2})
        self.assertEqual({"arr": [2, "abc"]}, result["root"][0])

        # test replacing endpoint config type
        operation = {"op": "replace", "path": "/endpointConfiguration/types/0", "value": "EDGE"}
        subject = {
            "id": "b5d563g3yx",
            "endpointConfiguration": {"types": ["REGIONAL"], "vpcEndpointIds": []},
        }
        result = apply(clone(subject), operation)
        self.assertEqual(["EDGE"], result["endpointConfiguration"]["types"])

        # test appending to an array with the "-" index
        operation = {"op": "add", "path": "/features/-", "value": "feat2"}
        subject = {"features": ["feat1"]}
        result = apply(clone(subject), operation)
        self.assertEqual(["feat1", "feat2"], result["features"])


class TestApplyTemplate(unittest.TestCase):
    """Tests for rendering integration request templates via ``apply_template``."""

    def test_apply_template(self):
        """A JSON payload field is extracted and rendered as an escaped JSON string."""
        int_type = {
            "type": "HTTP",
            "requestTemplates": {
                APPLICATION_JSON: "$util.escapeJavaScript($input.json('$.message'))"
            },
        }
        resp_type = "request"
        inv_payload = '{"action":"$default","message":"foobar"}'
        rendered = apply_template(int_type, resp_type, inv_payload)
        self.assertEqual('"foobar"', rendered)

    def test_apply_template_no_json_payload(self):
        """Render the same template against a payload that is not JSON."""
        int_type = {
            "type": "HTTP",
            "requestTemplates": {
                APPLICATION_JSON: "$util.escapeJavaScript($input.json('$.message'))"
            },
        }
        resp_type = "request"
        inv_payload = "#foobar123"
        rendered = apply_template(int_type, resp_type, inv_payload)
        # NOTE(review): the excerpt is cut off after this call; the assertion on
        # `rendered` is not visible here and must be restored from the upstream file.
        ...


# ... (excerpt truncated; the next snippet on the page is get_request_validator.py)
Source:get_request_validator.py  
...54        return GetRequestValidatorResult(55            request_validator_id=self.request_validator_id,56            validate_request_body=self.validate_request_body,57            validate_request_parameters=self.validate_request_parameters)58def get_request_validator(request_validator_id: Optional[str] = None,59                          rest_api_id: Optional[str] = None,60                          opts: Optional[pulumi.InvokeOptions] = None) -> AwaitableGetRequestValidatorResult:61    """62    Resource Type definition for AWS::ApiGateway::RequestValidator63    :param str request_validator_id: ID of the request validator.64    :param str rest_api_id: The identifier of the targeted API entity.65    """66    __args__ = dict()67    __args__['requestValidatorId'] = request_validator_id68    __args__['restApiId'] = rest_api_id69    opts = pulumi.InvokeOptions.merge(_utilities.get_invoke_opts_defaults(), opts)70    __ret__ = pulumi.runtime.invoke('aws-native:apigateway:getRequestValidator', __args__, opts=opts, typ=GetRequestValidatorResult).value71    return AwaitableGetRequestValidatorResult(72        request_validator_id=__ret__.request_validator_id,...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
