How to use serve_asgi_adapter method in localstack

Best Python code snippet using localstack_python

test_asgi.py

Source:test_asgi.py Github

copy

Full Screen

...7from typing import List8import requests9from werkzeug import Request, Response10LOG = logging.getLogger(__name__)11def test_serve_asgi_adapter(serve_asgi_adapter):12 request_list: List[Request] = []13 @Request.application14 def app(request: Request) -> Response:15 request_list.append(request)16 return Response("ok", 200)17 server = serve_asgi_adapter(app)18 response0 = requests.get(server.url + "/foobar?foo=bar", headers={"x-amz-target": "testing"})19 assert response0.ok20 assert response0.text == "ok"21 response1 = requests.get(server.url + "/compute", data='{"foo": "bar"}')22 assert response1.ok23 assert response1.text == "ok"24 request0 = request_list[0]25 assert request0.path == "/foobar"26 assert request0.query_string == b"foo=bar"27 assert request0.full_path == "/foobar?foo=bar"28 assert request0.headers["x-amz-target"] == "testing"29 assert dict(request0.args) == {"foo": "bar"}30 request1 = request_list[1]31 assert request1.path == "/compute"32 assert request1.get_data() == b'{"foo": "bar"}'33def test_requests_are_not_blocking_the_server(serve_asgi_adapter):34 queue = Queue()35 @Request.application36 def app(request: Request) -> Response:37 time.sleep(1)38 queue.put_nowait(request)39 return Response("ok", 200)40 server = serve_asgi_adapter(app)41 then = time.time()42 Thread(target=requests.get, args=(server.url,)).start()43 Thread(target=requests.get, args=(server.url,)).start()44 Thread(target=requests.get, args=(server.url,)).start()45 Thread(target=requests.get, args=(server.url,)).start()46 # get the four responses47 queue.get(timeout=5)48 queue.get(timeout=5)49 queue.get(timeout=5)50 queue.get(timeout=5)51 assert (time.time() - then) < 4, "requests did not seem to be parallelized"52def test_chunked_transfer_encoding_response(serve_asgi_adapter):53 # this test makes sure that creating a response with a generator automatically creates a54 # transfer-encoding=chunked response55 @Request.application56 def app(_request: Request) -> Response:57 def _gen():58 yield "foo"59 yield "bar\n"60 yield "baz\n"61 return Response(_gen(), 200)62 server = serve_asgi_adapter(app)63 response = requests.get(server.url)64 assert response.headers["Transfer-Encoding"] == "chunked"65 it = response.iter_lines()66 assert next(it) == b"foobar"67 assert next(it) == b"baz"68def test_chunked_transfer_encoding_request(serve_asgi_adapter):69 request_list: List[Request] = []70 @Request.application71 def app(request: Request) -> Response:72 request_list.append(request)73 stream = request.stream74 data = bytearray()75 for i, item in enumerate(stream):76 data.extend(item)77 if i == 0:78 assert item == b"foobar\n"79 if i == 1:80 assert item == b"baz"81 return Response(data.decode("utf-8"), 200)82 server = serve_asgi_adapter(app)83 def gen():84 yield b"foo"85 yield b"bar\n"86 yield b"baz"87 response = requests.post(server.url, gen())88 assert response.ok89 assert response.text == "foobar\nbaz"90 assert request_list[0].headers["Transfer-Encoding"].lower() == "chunked"91def test_input_stream_methods(serve_asgi_adapter):92 @Request.application93 def app(request: Request) -> Response:94 assert request.stream.read(1) == b"f"95 assert request.stream.readline(10) == b"ood\n"96 assert request.stream.readline(3) == b"bar"97 assert next(request.stream) == b"ber\n"98 assert request.stream.readlines(3) == [b"fizz\n"]99 assert request.stream.readline() == b"buzz\n"100 assert request.stream.read() == b"really\ndone"101 assert request.stream.read(10) == b""102 return Response("ok", 200)103 server = serve_asgi_adapter(app)104 def gen():105 yield b"fo"106 yield b"od\n"107 yield b"barber\n"108 yield b"fizz\n"109 yield b"buzz\n"110 yield b"really\n"111 yield b"done"112 response = requests.post(server.url, data=gen())113 assert response.ok114 assert response.text == "ok"115def test_input_stream_readlines(serve_asgi_adapter):116 @Request.application117 def app(request: Request) -> Response:118 assert request.stream.readlines() == [b"fizz\n", b"buzz\n", b"done"]119 return Response("ok", 200)120 server = serve_asgi_adapter(app)121 def gen():122 yield b"fizz\n"123 yield b"buzz\n"124 yield b"done"125 response = requests.post(server.url, data=gen())126 assert response.ok127 assert response.text == "ok"128def test_input_stream_readlines_with_limit(serve_asgi_adapter):129 @Request.application130 def app(request: Request) -> Response:131 assert request.stream.readlines(1000) == [b"fizz\n", b"buzz\n", b"done"]132 return Response("ok", 200)133 server = serve_asgi_adapter(app)134 def gen():135 yield b"fizz\n"136 yield b"buzz\n"137 yield b"done"138 response = requests.post(server.url, data=gen())139 assert response.ok140 assert response.text == "ok"141def test_multipart_post(serve_asgi_adapter):142 @Request.application143 def app(request: Request) -> Response:144 assert request.mimetype == "multipart/form-data"145 result = {}146 for k, file_storage in request.files.items():147 result[k] = file_storage.stream.read().decode("utf-8")148 return Response(json.dumps(result), 200)149 server = serve_asgi_adapter(app)150 response = requests.post(server.url, files={"foo": "bar", "baz": "ed"})151 assert response.ok152 assert response.json() == {"foo": "bar", "baz": "ed"}153def test_utf8_path(serve_asgi_adapter):154 @Request.application155 def app(request: Request) -> Response:156 assert request.path == "/foo/Ā0Ä"157 assert request.environ["PATH_INFO"] == "/foo/Ä\x800Ã\x84"158 return Response("ok", 200)159 server = serve_asgi_adapter(app)160 response = requests.get(server.url + "/foo/Ā0Ä")161 assert response.ok162def test_serve_multiple_apps(serve_asgi_adapter):163 @Request.application164 def app0(request: Request) -> Response:165 return Response("ok0", 200)166 @Request.application167 def app1(request: Request) -> Response:168 return Response("ok1", 200)169 server0 = serve_asgi_adapter(app0)170 server1 = serve_asgi_adapter(app1)171 executor = ThreadPoolExecutor(6)172 response0_ftr = executor.submit(requests.get, server0.url)173 response1_ftr = executor.submit(requests.get, server1.url)174 response2_ftr = executor.submit(requests.get, server0.url)175 response3_ftr = executor.submit(requests.get, server1.url)176 response4_ftr = executor.submit(requests.get, server0.url)177 response5_ftr = executor.submit(requests.get, server1.url)178 executor.shutdown()179 result0 = response0_ftr.result(timeout=2)180 assert result0.ok181 assert result0.text == "ok0"182 result1 = response1_ftr.result(timeout=2)183 assert result1.ok184 assert result1.text == "ok1"...

Full Screen

Full Screen

test_proxy.py

Source:test_proxy.py Github

copy

Full Screen

...14 both the router and the server.15 """16 router = Router(dispatcher=handler_dispatcher())17 app = Request.application(router.dispatch)18 return router, serve_asgi_adapter(app)19class TestPathForwarder:20 def test_get_with_path_rule(self, router_server, httpserver: HTTPServer):21 router, proxy = router_server22 backend = httpserver23 backend.expect_request("/").respond_with_data("ok/")24 backend.expect_request("/bar").respond_with_data("ok/bar")25 backend.expect_request("/bar/ed").respond_with_data("ok/bar/ed")26 router.add("/foo/<path:path>", ProxyHandler(backend.url_for("/")))27 response = requests.get(proxy.url + "/foo/bar")28 assert response.ok29 assert response.text == "ok/bar"30 response = requests.get(proxy.url + "/foo/bar/ed")31 assert response.ok32 assert response.text == "ok/bar/ed"33 response = requests.get(proxy.url)34 assert not response.ok35 response = requests.get(proxy.url + "/bar")36 assert not response.ok37 backend.check()38 def test_get_with_plain_rule(self, router_server, httpserver: HTTPServer):39 router, proxy = router_server40 backend = httpserver41 backend.expect_request("/").respond_with_data("ok")42 router.add("/foo", ProxyHandler(backend.url_for("/")))43 response = requests.get(proxy.url + "/foo")44 assert response.ok45 assert response.text == "ok"46 response = requests.get(proxy.url + "/foo/bar")47 assert not response.ok48 def test_get_with_different_base_url(self, router_server, httpserver: HTTPServer):49 router, proxy = router_server50 backend = httpserver51 backend.expect_request("/bar/ed").respond_with_data("ok/bar/ed")52 backend.expect_request("/bar/ed/baz").respond_with_data("ok/bar/ed/baz")53 router.add("/foo/<path:path>", ProxyHandler(backend.url_for("/bar")))54 response = requests.get(proxy.url + "/foo/ed")55 assert response.ok56 assert response.text == "ok/bar/ed"57 response = requests.get(proxy.url + "/foo/ed/baz")58 assert response.ok59 assert response.text == "ok/bar/ed/baz"60 def test_get_with_different_base_url_plain_rule(self, router_server, httpserver: HTTPServer):61 router, proxy = router_server62 backend = httpserver63 backend.expect_request("/bar").respond_with_data("ok/bar")64 backend.expect_request("/bar/").respond_with_data("ok/bar/")65 router.add("/foo", ProxyHandler(backend.url_for("/bar")))66 response = requests.get(proxy.url + "/foo")67 assert response.ok68 assert response.text == "ok/bar/" # it's calling /bar/ because it's part of the root URL69 def test_xff_header(self, router_server, httpserver: HTTPServer):70 router, proxy = router_server71 backend = httpserver72 def _echo_headers(request):73 return Response(json.dumps(dict(request.headers)), mimetype="application/json")74 backend.expect_request("/echo").respond_with_handler(_echo_headers)75 router.add("/<path:path>", ProxyHandler(backend.url_for("/")))76 response = requests.get(proxy.url + "/echo")77 assert response.ok78 headers = response.json()79 assert headers["X-Forwarded-For"] == "127.0.0.1"80 # check that it appends remote address correctly if an header is already present81 response = requests.get(proxy.url + "/echo", headers={"X-Forwarded-For": "127.0.0.2"})82 assert response.ok83 headers = response.json()84 assert headers["X-Forwarded-For"] == "127.0.0.2, 127.0.0.1"85 def test_post_form_data_with_query_args(self, router_server, httpserver: HTTPServer):86 router, proxy = router_server87 backend = httpserver88 def _handler(request: Request):89 data = {90 "args": request.args,91 "form": request.form,92 }93 return Response(json.dumps(data), mimetype="application/json")94 backend.expect_request("/form").respond_with_handler(_handler)95 router.add("/<path:path>", ProxyHandler(backend.url_for("/")))96 response = requests.post(97 proxy.url + "/form?q=yes",98 data={"foo": "bar", "baz": "ed"},99 headers={"Content-Type": "application/x-www-form-urlencoded"},100 )101 assert response.ok102 doc = response.json()103 assert doc == {"args": {"q": "yes"}, "form": {"foo": "bar", "baz": "ed"}}104@pytest.mark.parametrize("consume_data", [True, False])105def test_forward_files_and_form_data_proxy_consumes_data(106 consume_data, serve_asgi_adapter, tmp_path107):108 """Tests that, when the proxy consumes (or doesn't) consume the request object's data prior to forwarding,109 the request is forwarded correctly. not using httpserver here because it consumes werkzeug data incorrectly (it110 calls request.get_data())"""111 @Request.application112 def _backend_handler(request: Request):113 data = {114 "data": request.data.decode("utf-8"),115 "args": request.args,116 "form": request.form,117 "files": {118 name: storage.stream.read().decode("utf-8")119 for name, storage in request.files.items()120 },121 }122 return Response(json.dumps(data), mimetype="application/json")123 @Request.application124 def _proxy_handler(request: Request):125 # heuristic to check whether the stream has been consumed126 assert getattr(request, "_cached_data", None) is None, "data has already been cached"127 if consume_data:128 assert (129 not request.data130 ) # data should be empty because it is consumed by parsing form data131 return forward(request, forward_base_url=backend.url)132 backend = serve_asgi_adapter(_backend_handler)133 proxy = serve_asgi_adapter(_proxy_handler)134 tmp_file_1 = tmp_path / "temp_file_1.txt"135 tmp_file_1.write_text("1: hello\nworld")136 tmp_file_2 = tmp_path / "temp_file_2.txt"137 tmp_file_2.write_text("2: foobar")138 response = requests.post(139 proxy.url,140 params={"q": "yes"},141 data={"foo": "bar", "baz": "ed"},142 files={"upload_file_1": open(tmp_file_1, "rb"), "upload_file_2": open(tmp_file_2, "rb")},143 )144 assert response.ok145 doc = response.json()146 assert doc == {147 "data": "",...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful