How to use the make_request method in localstack

Best Python code snippet using localstack_python

test_client_request.py

Source:test_client_request.py Github

copy

Full Screen

...15import aiohttp16from aiohttp import BaseConnector, helpers17from aiohttp.client_reqrep import ClientRequest, ClientResponse18@pytest.yield_fixture19def make_request(loop):20 request = None21 def maker(*args, **kwargs):22 nonlocal request23 request = ClientRequest(*args, loop=loop, **kwargs)24 return request25 yield maker26 if request is not None:27 loop.run_until_complete(request.close())28def test_method1(make_request):29 req = make_request('get', 'http://python.org/')30 assert req.method == 'GET'31def test_method2(make_request):32 req = make_request('head', 'http://python.org/')33 assert req.method == 'HEAD'34def test_method3(make_request):35 req = make_request('HEAD', 'http://python.org/')36 assert req.method == 'HEAD'37def test_version_1_0(make_request):38 req = make_request('get', 'http://python.org/', version='1.0')39 assert req.version == (1, 0)40def test_version_default(make_request):41 req = make_request('get', 'http://python.org/')42 assert req.version == (1, 1)43def test_version_err(make_request):44 with pytest.raises(ValueError):45 make_request('get', 'http://python.org/', version='1.c')46def test_host_port_default_http(make_request):47 req = make_request('get', 'http://python.org/')48 assert req.host == 'python.org'49 assert req.port == 8050 assert not req.ssl51def test_host_port_default_https(make_request):52 req = make_request('get', 'https://python.org/')53 assert req.host == 'python.org'54 assert req.port == 44355 assert req.ssl56def test_host_port_nondefault_http(make_request):57 req = make_request('get', 'http://python.org:960/')58 assert req.host == 'python.org'59 assert req.port == 96060 assert not req.ssl61def test_host_port_nondefault_https(make_request):62 req = make_request('get', 'https://python.org:960/')63 assert req.host == 'python.org'64 assert req.port == 96065 assert req.ssl66def test_host_port_default_ws(make_request):67 req = make_request('get', 'ws://python.org/')68 assert req.host == 'python.org'69 assert req.port == 8070 
assert not req.ssl71def test_host_port_default_wss(make_request):72 req = make_request('get', 'wss://python.org/')73 assert req.host == 'python.org'74 assert req.port == 44375 assert req.ssl76def test_host_port_nondefault_ws(make_request):77 req = make_request('get', 'ws://python.org:960/')78 assert req.host == 'python.org'79 assert req.port == 96080 assert not req.ssl81def test_host_port_nondefault_wss(make_request):82 req = make_request('get', 'wss://python.org:960/')83 assert req.host == 'python.org'84 assert req.port == 96085 assert req.ssl86def test_host_port_err(make_request):87 with pytest.raises(ValueError):88 make_request('get', 'http://python.org:123e/')89def test_hostname_err(make_request):90 with pytest.raises(ValueError):91 make_request('get', 'http://:8080/')92def test_host_header_host_without_port(make_request):93 req = make_request('get', 'http://python.org/')94 assert req.headers['HOST'] == 'python.org'95def test_host_header_host_with_default_port(make_request):96 req = make_request('get', 'http://python.org:80/')97 assert req.headers['HOST'] == 'python.org:80'98def test_host_header_host_with_nondefault_port(make_request):99 req = make_request('get', 'http://python.org:99/')100 assert req.headers['HOST'] == 'python.org:99'101def test_host_header_explicit_host(make_request):102 req = make_request('get', 'http://python.org/',103 headers={'host': 'example.com'})104 assert req.headers['HOST'] == 'example.com'105def test_host_header_explicit_host_with_port(make_request):106 req = make_request('get', 'http://python.org/',107 headers={'host': 'example.com:99'})108 assert req.headers['HOST'] == 'example.com:99'109def test_default_loop(loop):110 asyncio.set_event_loop(loop)111 req = ClientRequest('get', 'http://python.org/')112 assert req.loop is loop113def test_default_headers_useragent(make_request):114 req = make_request('get', 'http://python.org/')115 assert 'SERVER' not in req.headers116 assert 'USER-AGENT' in req.headers117def 
test_default_headers_useragent_custom(make_request):118 req = make_request('get', 'http://python.org/',119 headers={'user-agent': 'my custom agent'})120 assert 'USER-Agent' in req.headers121 assert 'my custom agent' == req.headers['User-Agent']122def test_skip_default_useragent_header(make_request):123 req = make_request('get', 'http://python.org/',124 skip_auto_headers=set([upstr('user-agent')]))125 assert 'User-Agent' not in req.headers126def test_headers(make_request):127 req = make_request('get', 'http://python.org/',128 headers={'Content-Type': 'text/plain'})129 assert 'CONTENT-TYPE' in req.headers130 assert req.headers['CONTENT-TYPE'] == 'text/plain'131 assert req.headers['ACCEPT-ENCODING'] == 'gzip, deflate'132def test_headers_list(make_request):133 req = make_request('get', 'http://python.org/',134 headers=[('Content-Type', 'text/plain')])135 assert 'CONTENT-TYPE' in req.headers136 assert req.headers['CONTENT-TYPE'] == 'text/plain'137def test_headers_default(make_request):138 req = make_request('get', 'http://python.org/',139 headers={'ACCEPT-ENCODING': 'deflate'})140 assert req.headers['ACCEPT-ENCODING'] == 'deflate'141def test_invalid_url(make_request):142 with pytest.raises(ValueError):143 make_request('get', 'hiwpefhipowhefopw')144def test_invalid_idna(make_request):145 with pytest.raises(ValueError):146 make_request('get', 'http://\u2061owhefopw.com')147def test_no_path(make_request):148 req = make_request('get', 'http://python.org')149 assert '/' == req.path150def test_ipv6_default_http_port(make_request):151 req = make_request('get', 'http://[2001:db8::1]/')152 assert req.host == '2001:db8::1'153 assert req.port == 80154 assert not req.ssl155def test_ipv6_default_https_port(make_request):156 req = make_request('get', 'https://[2001:db8::1]/')157 assert req.host == '2001:db8::1'158 assert req.port == 443159 assert req.ssl160def test_ipv6_nondefault_http_port(make_request):161 req = make_request('get', 'http://[2001:db8::1]:960/')162 assert req.host == 
'2001:db8::1'163 assert req.port == 960164 assert not req.ssl165def test_ipv6_nondefault_https_port(make_request):166 req = make_request('get', 'https://[2001:db8::1]:960/')167 assert req.host == '2001:db8::1'168 assert req.port == 960169 assert req.ssl170def test_basic_auth(make_request):171 req = make_request('get', 'http://python.org',172 auth=aiohttp.helpers.BasicAuth('nkim', '1234'))173 assert 'AUTHORIZATION' in req.headers174 assert 'Basic bmtpbToxMjM0' == req.headers['AUTHORIZATION']175def test_basic_auth_utf8(make_request):176 req = make_request('get', 'http://python.org',177 auth=aiohttp.helpers.BasicAuth('nkim', 'секрет',178 'utf-8'))179 assert 'AUTHORIZATION' in req.headers180 assert 'Basic bmtpbTrRgdC10LrRgNC10YI=' == req.headers['AUTHORIZATION']181def test_basic_auth_tuple_forbidden(make_request):182 with pytest.raises(TypeError):183 make_request('get', 'http://python.org',184 auth=('nkim', '1234'))185def test_basic_auth_from_url(make_request):186 req = make_request('get', 'http://nkim:1234@python.org')187 assert 'AUTHORIZATION' in req.headers188 assert 'Basic bmtpbToxMjM0' == req.headers['AUTHORIZATION']189 assert 'python.org' == req.netloc190def test_basic_auth_from_url_overriden(make_request):191 req = make_request('get', 'http://garbage@python.org',192 auth=aiohttp.BasicAuth('nkim', '1234'))193 assert 'AUTHORIZATION' in req.headers194 assert 'Basic bmtpbToxMjM0' == req.headers['AUTHORIZATION']195 assert 'python.org' == req.netloc196def test_path_is_not_double_encoded1(make_request):197 req = make_request('get', "http://0.0.0.0/get/test case")198 assert req.path == "/get/test%20case"199def test_path_is_not_double_encoded2(make_request):200 req = make_request('get', "http://0.0.0.0/get/test%2fcase")201 assert req.path == "/get/test%2fcase"202def test_path_is_not_double_encoded3(make_request):203 req = make_request('get', "http://0.0.0.0/get/test%20case")204 assert req.path == "/get/test%20case"205def test_path_safe_chars_preserved(make_request):206 
req = make_request('get', "http://0.0.0.0/get/%:=")207 assert req.path == "/get/%:="208def test_params_are_added_before_fragment1(make_request):209 req = make_request('GET', "http://example.com/path#fragment",210 params={"a": "b"})211 assert req.url == "http://example.com/path?a=b#fragment"212def test_params_are_added_before_fragment2(make_request):213 req = make_request('GET', "http://example.com/path?key=value#fragment",214 params={"a": "b"})215 assert req.url == "http://example.com/path?key=value&a=b#fragment"216def test_path_not_contain_fragment1(make_request):217 req = make_request('GET', "http://example.com/path#fragment")218 assert req.path == "/path"219def test_path_not_contain_fragment2(make_request):220 req = make_request('GET', "http://example.com/path?key=value#fragment")221 assert req.path == "/path?key=value"222def test_cookies(make_request):223 req = make_request('get', 'http://test.com/path',224 cookies={'cookie1': 'val1'})225 assert 'COOKIE' in req.headers226 assert 'cookie1=val1' == req.headers['COOKIE']227def test_cookies_merge_with_headers(make_request):228 req = make_request('get', 'http://test.com/path',229 headers={'cookie': 'cookie1=val1'},230 cookies={'cookie2': 'val2'})231 assert 'cookie1=val1; cookie2=val2' == req.headers['COOKIE']232def test_unicode_get1(make_request):233 req = make_request('get', 'http://python.org',234 params={'foo': 'f\xf8\xf8'})235 assert '/?foo=f%C3%B8%C3%B8' == req.path236def test_unicode_get2(make_request):237 req = make_request('', 'http://python.org',238 params={'f\xf8\xf8': 'f\xf8\xf8'})239 assert '/?f%C3%B8%C3%B8=f%C3%B8%C3%B8' == req.path240def test_unicode_get3(make_request):241 req = make_request('', 'http://python.org', params={'foo': 'foo'})242 assert '/?foo=foo' == req.path243def test_unicode_get4(make_request):244 def join(*suffix):245 return urllib.parse.urljoin('http://python.org/', '/'.join(suffix))246 req = make_request('', join('\xf8'), params={'foo': 'foo'})247 assert '/%C3%B8?foo=foo' == 
req.path248def test_query_multivalued_param(make_request):249 for meth in ClientRequest.ALL_METHODS:250 req = make_request(251 meth, 'http://python.org',252 params=(('test', 'foo'), ('test', 'baz')))253 assert req.path == '/?test=foo&test=baz'254def test_query_str_param(make_request):255 for meth in ClientRequest.ALL_METHODS:256 req = make_request(meth, 'http://python.org', params='test=foo')257 assert req.path == '/?test=foo'258def test_query_bytes_param_raises(make_request):259 for meth in ClientRequest.ALL_METHODS:260 with pytest.raises(TypeError) as ctx:261 make_request(meth, 'http://python.org', params=b'test=foo')262 assert re.match('not a valid non-string.*or mapping', str(ctx.value))263def test_query_str_param_is_not_encoded(make_request):264 for meth in ClientRequest.ALL_METHODS:265 req = make_request(meth, 'http://python.org', params='test=f+oo')266 assert req.path == '/?test=f+oo'267def test_params_update_path_and_url(make_request):268 req = make_request('get', 'http://python.org',269 params=(('test', 'foo'), ('test', 'baz')))270 assert req.path == '/?test=foo&test=baz'271 assert req.url == 'http://python.org/?test=foo&test=baz'272def test_params_empty_path_and_url(make_request):273 req_empty = make_request('get', 'http://python.org', params={})274 assert req_empty.path == '/'275 assert req_empty.url == 'http://python.org/'276 req_none = make_request('get', 'http://python.org')277 assert req_none.path == '/'278 assert req_none.url == 'http://python.org/'279def test_gen_netloc_all(make_request):280 req = make_request('get',281 'https://aiohttp:pwpwpw@' +282 '12345678901234567890123456789' +283 '012345678901234567890:8080')284 assert req.netloc == '12345678901234567890123456789' +\285 '012345678901234567890:8080'286def test_gen_netloc_no_port(make_request):287 req = make_request('get',288 'https://aiohttp:pwpwpw@' +289 '12345678901234567890123456789' +290 '012345678901234567890/')291 assert req.netloc == '12345678901234567890123456789' +\292 
'012345678901234567890'293def test_gen_notloc_failed(make_request):294 with pytest.raises(ValueError) as excinfo:295 make_request('get',296 'https://aiohttp:pwpwpw@' +297 '123456789012345678901234567890123456789' +298 '01234567890123456789012345/')299 assert excinfo.value.message == "URL has an invalid label."300class TestClientRequest(unittest.TestCase):301 def setUp(self):302 self.loop = asyncio.new_event_loop()303 asyncio.set_event_loop(None)304 self.transport = mock.Mock()305 self.connection = mock.Mock()306 self.protocol = mock.Mock()307 self.protocol.writer.drain.return_value = ()308 self.stream = aiohttp.StreamParser(loop=self.loop)309 self.connector = BaseConnector(loop=self.loop)...

Full Screen

Full Screen

test_web_websocket.py

Source:test_web_websocket.py Github

copy

Full Screen

...21 ret = mock.Mock()22 ret.set_parser.return_value = ret23 return ret24@pytest.fixture25def make_request(app, writer, reader):26 def maker(method, path, headers=None, protocols=False):27 if headers is None:28 headers = CIMultiDict(29 {'HOST': 'server.example.com',30 'UPGRADE': 'websocket',31 'CONNECTION': 'Upgrade',32 'SEC-WEBSOCKET-KEY': 'dGhlIHNhbXBsZSBub25jZQ==',33 'ORIGIN': 'http://example.com',34 'SEC-WEBSOCKET-VERSION': '13'})35 if protocols:36 headers['SEC-WEBSOCKET-PROTOCOL'] = 'chat, superchat'37 return make_mocked_request(method, path, headers,38 app=app, writer=writer, reader=reader)39 return maker40def test_nonstarted_ping():41 ws = WebSocketResponse()42 with pytest.raises(RuntimeError):43 ws.ping()44def test_nonstarted_pong():45 ws = WebSocketResponse()46 with pytest.raises(RuntimeError):47 ws.pong()48def test_nonstarted_send_str():49 ws = WebSocketResponse()50 with pytest.raises(RuntimeError):51 ws.send_str('string')52def test_nonstarted_send_bytes():53 ws = WebSocketResponse()54 with pytest.raises(RuntimeError):55 ws.send_bytes(b'bytes')56def test_nonstarted_send_json():57 ws = WebSocketResponse()58 with pytest.raises(RuntimeError):59 ws.send_json({'type': 'json'})60@asyncio.coroutine61def test_nonstarted_close():62 ws = WebSocketResponse()63 with pytest.raises(RuntimeError):64 yield from ws.close()65@asyncio.coroutine66def test_nonstarted_receive_str():67 ws = WebSocketResponse()68 with pytest.raises(RuntimeError):69 yield from ws.receive_str()70@asyncio.coroutine71def test_nonstarted_receive_bytes():72 ws = WebSocketResponse()73 with pytest.raises(RuntimeError):74 yield from ws.receive_bytes()75@asyncio.coroutine76def test_nonstarted_receive_json():77 ws = WebSocketResponse()78 with pytest.raises(RuntimeError):79 yield from ws.receive_json()80@asyncio.coroutine81def test_receive_str_nonstring(make_request):82 req = make_request('GET', '/')83 ws = WebSocketResponse()84 yield from ws.prepare(req)85 @asyncio.coroutine86 def receive():87 return 
WSMessage(WSMsgType.BINARY, b'data', b'')88 ws.receive = receive89 with pytest.raises(TypeError):90 yield from ws.receive_str()91@asyncio.coroutine92def test_receive_bytes_nonsbytes(make_request):93 req = make_request('GET', '/')94 ws = WebSocketResponse()95 yield from ws.prepare(req)96 @asyncio.coroutine97 def receive():98 return WSMessage(WSMsgType.TEXT, 'data', b'')99 ws.receive = receive100 with pytest.raises(TypeError):101 yield from ws.receive_bytes()102@asyncio.coroutine103def test_send_str_nonstring(make_request):104 req = make_request('GET', '/')105 ws = WebSocketResponse()106 yield from ws.prepare(req)107 with pytest.raises(TypeError):108 ws.send_str(b'bytes')109@asyncio.coroutine110def test_send_bytes_nonbytes(make_request):111 req = make_request('GET', '/')112 ws = WebSocketResponse()113 yield from ws.prepare(req)114 with pytest.raises(TypeError):115 ws.send_bytes('string')116@asyncio.coroutine117def test_send_json_nonjson(make_request):118 req = make_request('GET', '/')119 ws = WebSocketResponse()120 yield from ws.prepare(req)121 with pytest.raises(TypeError):122 ws.send_json(set())123def test_write_non_prepared():124 ws = WebSocketResponse()125 with pytest.raises(RuntimeError):126 ws.write(b'data')127def test_websocket_ready():128 websocket_ready = WebSocketReady(True, 'chat')129 assert websocket_ready.ok is True130 assert websocket_ready.protocol == 'chat'131def test_websocket_not_ready():132 websocket_ready = WebSocketReady(False, None)133 assert websocket_ready.ok is False134 assert websocket_ready.protocol is None135def test_websocket_ready_unknown_protocol():136 websocket_ready = WebSocketReady(True, None)137 assert websocket_ready.ok is True138 assert websocket_ready.protocol is None139def test_bool_websocket_ready():140 websocket_ready = WebSocketReady(True, None)141 assert bool(websocket_ready) is True142def test_bool_websocket_not_ready():143 websocket_ready = WebSocketReady(False, None)144 assert bool(websocket_ready) is False145def 
test_can_prepare_ok(make_request):146 req = make_request('GET', '/', protocols=True)147 ws = WebSocketResponse(protocols=('chat',))148 assert(True, 'chat') == ws.can_prepare(req)149def test_can_prepare_unknown_protocol(make_request):150 req = make_request('GET', '/')151 ws = WebSocketResponse()152 assert (True, None) == ws.can_prepare(req)153def test_can_prepare_invalid_method(make_request):154 req = make_request('POST', '/')155 ws = WebSocketResponse()156 assert (False, None) == ws.can_prepare(req)157def test_can_prepare_without_upgrade(make_request):158 req = make_request('GET', '/',159 headers=CIMultiDict({}))160 ws = WebSocketResponse()161 assert (False, None) == ws.can_prepare(req)162@asyncio.coroutine163def test_can_prepare_started(make_request):164 req = make_request('GET', '/')165 ws = WebSocketResponse()166 yield from ws.prepare(req)167 with pytest.raises(RuntimeError) as ctx:168 ws.can_prepare(req)169 assert 'Already started' in str(ctx.value)170def test_closed_after_ctor():171 ws = WebSocketResponse()172 assert not ws.closed173 assert ws.close_code is None174@asyncio.coroutine175def test_send_str_closed(make_request):176 req = make_request('GET', '/')177 ws = WebSocketResponse()178 yield from ws.prepare(req)179 yield from ws.close()180 with pytest.raises(RuntimeError):181 ws.send_str('string')182@asyncio.coroutine183def test_send_bytes_closed(make_request):184 req = make_request('GET', '/')185 ws = WebSocketResponse()186 yield from ws.prepare(req)187 yield from ws.close()188 with pytest.raises(RuntimeError):189 ws.send_bytes(b'bytes')190@asyncio.coroutine191def test_send_json_closed(make_request):192 req = make_request('GET', '/')193 ws = WebSocketResponse()194 yield from ws.prepare(req)195 yield from ws.close()196 with pytest.raises(RuntimeError):197 ws.send_json({'type': 'json'})198@asyncio.coroutine199def test_ping_closed(make_request):200 req = make_request('GET', '/')201 ws = WebSocketResponse()202 yield from ws.prepare(req)203 yield from 
ws.close()204 with pytest.raises(RuntimeError):205 ws.ping()206@asyncio.coroutine207def test_pong_closed(make_request):208 req = make_request('GET', '/')209 ws = WebSocketResponse()210 yield from ws.prepare(req)211 yield from ws.close()212 with pytest.raises(RuntimeError):213 ws.pong()214@asyncio.coroutine215def test_close_idempotent(make_request, writer):216 req = make_request('GET', '/')217 ws = WebSocketResponse()218 yield from ws.prepare(req)219 assert (yield from ws.close(code=1, message='message1'))220 assert ws.closed221 assert not (yield from ws.close(code=2, message='message2'))222@asyncio.coroutine223def test_start_invalid_method(make_request):224 req = make_request('POST', '/')225 ws = WebSocketResponse()226 with pytest.raises(HTTPMethodNotAllowed):227 yield from ws.prepare(req)228@asyncio.coroutine229def test_start_without_upgrade(make_request):230 req = make_request('GET', '/',231 headers=CIMultiDict({}))232 ws = WebSocketResponse()233 with pytest.raises(HTTPBadRequest):234 yield from ws.prepare(req)235@asyncio.coroutine236def test_wait_closed_before_start():237 ws = WebSocketResponse()238 with pytest.raises(RuntimeError):239 yield from ws.close()240@asyncio.coroutine241def test_write_eof_not_started():242 ws = WebSocketResponse()243 with pytest.raises(RuntimeError):244 yield from ws.write_eof()245@asyncio.coroutine246def test_write_eof_idempotent(make_request):247 req = make_request('GET', '/')248 ws = WebSocketResponse()249 yield from ws.prepare(req)250 yield from ws.close()251 yield from ws.write_eof()252 yield from ws.write_eof()253 yield from ws.write_eof()254@asyncio.coroutine255def test_receive_exc_in_reader(make_request, loop, reader):256 req = make_request('GET', '/')257 ws = WebSocketResponse()258 yield from ws.prepare(req)259 exc = ValueError()260 res = helpers.create_future(loop)261 res.set_exception(exc)262 reader.read = make_mocked_coro(res)263 msg = yield from ws.receive()264 assert msg.type == WSMsgType.ERROR265 assert msg.type is 
msg.tp266 assert msg.data is exc267 assert ws.exception() is exc268@asyncio.coroutine269def test_receive_cancelled(make_request, loop, reader):270 req = make_request('GET', '/')271 ws = WebSocketResponse()272 yield from ws.prepare(req)273 res = helpers.create_future(loop)274 res.set_exception(asyncio.CancelledError())275 reader.read = make_mocked_coro(res)276 with pytest.raises(asyncio.CancelledError):277 yield from ws.receive()278@asyncio.coroutine279def test_receive_timeouterror(make_request, loop, reader):280 req = make_request('GET', '/')281 ws = WebSocketResponse()282 yield from ws.prepare(req)283 res = helpers.create_future(loop)284 res.set_exception(asyncio.TimeoutError())285 reader.read = make_mocked_coro(res)286 with pytest.raises(asyncio.TimeoutError):287 yield from ws.receive()288@asyncio.coroutine289def test_receive_client_disconnected(make_request, loop, reader):290 req = make_request('GET', '/')291 ws = WebSocketResponse()292 yield from ws.prepare(req)293 exc = errors.ClientDisconnectedError()294 res = helpers.create_future(loop)295 res.set_exception(exc)296 reader.read = make_mocked_coro(res)297 msg = yield from ws.receive()298 assert ws.closed299 assert msg.type == WSMsgType.CLOSE300 assert msg.type is msg.tp301 assert msg.data is None302 assert ws.exception() is None303@asyncio.coroutine304def test_multiple_receive_on_close_connection(make_request):305 req = make_request('GET', '/')306 ws = WebSocketResponse()307 yield from ws.prepare(req)308 yield from ws.close()309 yield from ws.receive()310 yield from ws.receive()311 yield from ws.receive()312 yield from ws.receive()313 with pytest.raises(RuntimeError):314 yield from ws.receive()315@asyncio.coroutine316def test_concurrent_receive(make_request):317 req = make_request('GET', '/')318 ws = WebSocketResponse()319 yield from ws.prepare(req)320 ws._waiting = True321 with pytest.raises(RuntimeError):322 yield from ws.receive()323@asyncio.coroutine324def test_close_exc(make_request, reader, loop):325 req 
= make_request('GET', '/')326 ws = WebSocketResponse()327 yield from ws.prepare(req)328 exc = ValueError()329 reader.read.return_value = helpers.create_future(loop)330 reader.read.return_value.set_exception(exc)331 yield from ws.close()332 assert ws.closed333 assert ws.exception() is exc334 ws._closed = False335 reader.read.return_value = helpers.create_future(loop)336 reader.read.return_value.set_exception(asyncio.CancelledError())337 with pytest.raises(asyncio.CancelledError):338 yield from ws.close()339 assert ws.close_code == 1006340@asyncio.coroutine341def test_close_exc2(make_request):342 req = make_request('GET', '/')343 ws = WebSocketResponse()344 yield from ws.prepare(req)345 exc = ValueError()346 ws._writer = mock.Mock()347 ws._writer.close.side_effect = exc348 yield from ws.close()349 assert ws.closed350 assert ws.exception() is exc351 ws._closed = False352 ws._writer.close.side_effect = asyncio.CancelledError()353 with pytest.raises(asyncio.CancelledError):354 yield from ws.close()355def test_start_twice_idempotent(make_request):356 req = make_request('GET', '/')357 ws = WebSocketResponse()358 with pytest.warns(DeprecationWarning):359 impl1 = ws.start(req)360 impl2 = ws.start(req)361 assert impl1 is impl2362def test_can_start_ok(make_request):363 req = make_request('GET', '/', protocols=True)364 ws = WebSocketResponse(protocols=('chat',))365 with pytest.warns(DeprecationWarning):366 assert (True, 'chat') == ws.can_start(req)367def test_msgtype_alias():368 # deprecated since 1.0...

Full Screen

Full Screen

test_web_request.py

Source:test_web_request.py Github

copy

Full Screen

...4from multidict import CIMultiDict, MultiDict5from aiohttp.protocol import HttpVersion6from aiohttp.test_utils import make_mocked_request7@pytest.fixture8def make_request():9 return make_mocked_request10def test_ctor(make_request):11 req = make_request('GET', '/path/to?a=1&b=2')12 assert 'GET' == req.method13 assert HttpVersion(1, 1) == req.version14 assert req.host is None15 assert '/path/to?a=1&b=2' == req.path_qs16 assert '/path/to' == req.path17 assert 'a=1&b=2' == req.query_string18 assert CIMultiDict() == req.headers19 assert () == req.raw_headers20 get = req.GET21 assert MultiDict([('a', '1'), ('b', '2')]) == get22 # second call should return the same object23 assert get is req.GET24 assert req.keep_alive25 # just make sure that all lines of make_mocked_request covered26 headers = CIMultiDict(FOO='bar')27 reader = mock.Mock()28 writer = mock.Mock()29 payload = mock.Mock()30 transport = mock.Mock()31 app = mock.Mock()32 req = make_request('GET', '/path/to?a=1&b=2', headers=headers,33 writer=writer, reader=reader, payload=payload,34 transport=transport, app=app)35 assert req.app is app36 assert req.content is payload37 assert req.transport is transport38 assert req._reader is reader39 assert req._writer is writer40 assert req.headers == headers41 assert req.raw_headers == ((b'Foo', b'bar'),)42def test_doubleslashes(make_request):43 req = make_request('GET', '//foo/')44 assert '//foo/' == req.path45def test_POST(make_request):46 req = make_request('POST', '/')47 with pytest.raises(RuntimeError):48 req.POST49 marker = object()50 req._post = marker51 assert req.POST is marker52 assert req.POST is marker53def test_content_type_not_specified(make_request):54 req = make_request('Get', '/')55 assert 'application/octet-stream' == req.content_type56def test_content_type_from_spec(make_request):57 req = make_request('Get', '/',58 CIMultiDict([('CONTENT-TYPE', 'application/json')]))59 assert 'application/json' == req.content_type60def 
test_content_type_from_spec_with_charset(make_request):61 req = make_request(62 'Get', '/',63 CIMultiDict([('CONTENT-TYPE', 'text/html; charset=UTF-8')]))64 assert 'text/html' == req.content_type65 assert 'UTF-8' == req.charset66def test_calc_content_type_on_getting_charset(make_request):67 req = make_request(68 'Get', '/',69 CIMultiDict([('CONTENT-TYPE', 'text/html; charset=UTF-8')]))70 assert 'UTF-8' == req.charset71 assert 'text/html' == req.content_type72def test_urlencoded_querystring(make_request):73 req = make_request('GET',74 '/yandsearch?text=%D1%82%D0%B5%D0%BA%D1%81%D1%82')75 assert {'text': 'текст'} == req.GET76def test_non_ascii_path(make_request):77 req = make_request('GET', '/путь')78 assert '/путь' == req.path79def test_content_length(make_request):80 req = make_request('Get', '/',81 CIMultiDict([('CONTENT-LENGTH', '123')]))82 assert 123 == req.content_length83def test_non_keepalive_on_http10(make_request):84 req = make_request('GET', '/', version=HttpVersion(1, 0))85 assert not req.keep_alive86def test_non_keepalive_on_closing(make_request):87 req = make_request('GET', '/', closing=True)88 assert not req.keep_alive89@asyncio.coroutine90def test_call_POST_on_GET_request(make_request):91 req = make_request('GET', '/')92 ret = yield from req.post()93 assert CIMultiDict() == ret94@asyncio.coroutine95def test_call_POST_on_weird_content_type(make_request):96 req = make_request(97 'POST', '/',98 headers=CIMultiDict({'CONTENT-TYPE': 'something/weird'}))99 ret = yield from req.post()100 assert CIMultiDict() == ret101@asyncio.coroutine102def test_call_POST_twice(make_request):103 req = make_request('GET', '/')104 ret1 = yield from req.post()105 ret2 = yield from req.post()106 assert ret1 is ret2107def test_no_request_cookies(make_request):108 req = make_request('GET', '/')109 assert req.cookies == {}110 cookies = req.cookies111 assert cookies is req.cookies112def test_request_cookie(make_request):113 headers = CIMultiDict(COOKIE='cookie1=value1; 
cookie2=value2')114 req = make_request('GET', '/', headers=headers)115 assert req.cookies == {'cookie1': 'value1',116 'cookie2': 'value2'}117def test_request_cookie__set_item(make_request):118 headers = CIMultiDict(COOKIE='name=value')119 req = make_request('GET', '/', headers=headers)120 assert req.cookies == {'name': 'value'}121 with pytest.raises(TypeError):122 req.cookies['my'] = 'value'123def test_match_info(make_request):124 req = make_request('GET', '/')125 assert req.match_info is None126 match = {'a': 'b'}127 req._match_info = match128 assert match is req.match_info129def test_request_is_dict(make_request):130 req = make_request('GET', '/')131 assert isinstance(req, dict)132 req['key'] = 'value'133 assert 'value' == req['key']134def test_copy(make_request):135 req = make_request('GET', '/')136 with pytest.raises(NotImplementedError):137 req.copy()138def test___repr__(make_request):139 req = make_request('GET', '/path/to')140 assert "<Request GET /path/to >" == repr(req)141def test___repr___non_ascii_path(make_request):142 req = make_request('GET', '/path/\U0001f415\U0001f308')143 assert "<Request GET /path/\\U0001f415\\U0001f308 >" == repr(req)144def test_http_scheme(make_request):145 req = make_request('GET', '/')146 assert "http" == req.scheme147def test_https_scheme_by_ssl_transport(make_request):148 req = make_request('GET', '/', sslcontext=True)149 assert "https" == req.scheme150def test_https_scheme_by_secure_proxy_ssl_header(make_request):151 req = make_request('GET', '/',152 secure_proxy_ssl_header=('X-HEADER', '1'),153 headers=CIMultiDict({'X-HEADER': '1'}))154 assert "https" == req.scheme155def test_https_scheme_by_secure_proxy_ssl_header_false_test(make_request):156 req = make_request('GET', '/',157 secure_proxy_ssl_header=('X-HEADER', '1'),158 headers=CIMultiDict({'X-HEADER': '0'}))159 assert "http" == req.scheme160def test_raw_headers(make_request):161 req = make_request('GET', '/',162 headers=CIMultiDict({'X-HEADER': 'aaa'}))...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful