Best Python code snippet using localstack_python
test_client_request.py
Source:test_client_request.py  
...  # earlier lines of test_client_request.py are elided in this excerpt
import aiohttp
from aiohttp import BaseConnector, helpers
from aiohttp.client_reqrep import ClientRequest, ClientResponse


# NOTE(review): ``pytest.yield_fixture`` is a deprecated alias; plain
# ``pytest.fixture`` accepts ``yield`` since pytest 3.0.  Kept as-is because
# the pytest version this file targets is not visible here.
@pytest.yield_fixture
def make_request(loop):
    """Yield a factory that builds ``ClientRequest`` objects bound to *loop*.

    After the test finishes, the most recently created request (if any) is
    closed on *loop*.
    """
    request = None

    def maker(*args, **kwargs):
        nonlocal request
        request = ClientRequest(*args, loop=loop, **kwargs)
        return request

    yield maker
    if request is not None:
        loop.run_until_complete(request.close())


# --- method and HTTP version -------------------------------------------------

def test_method1(make_request):
    req = make_request('get', 'http://python.org/')
    assert req.method == 'GET'


def test_method2(make_request):
    req = make_request('head', 'http://python.org/')
    assert req.method == 'HEAD'


def test_method3(make_request):
    req = make_request('HEAD', 'http://python.org/')
    assert req.method == 'HEAD'


def test_version_1_0(make_request):
    req = make_request('get', 'http://python.org/', version='1.0')
    assert req.version == (1, 0)


def test_version_default(make_request):
    req = make_request('get', 'http://python.org/')
    assert req.version == (1, 1)


def test_version_err(make_request):
    with pytest.raises(ValueError):
        make_request('get', 'http://python.org/', version='1.c')


# --- host / port / ssl derived from the URL ----------------------------------

def test_host_port_default_http(make_request):
    req = make_request('get', 'http://python.org/')
    assert req.host == 'python.org'
    assert req.port == 80
    assert not req.ssl


def test_host_port_default_https(make_request):
    req = make_request('get', 'https://python.org/')
    assert req.host == 'python.org'
    assert req.port == 443
    assert req.ssl


def test_host_port_nondefault_http(make_request):
    req = make_request('get', 'http://python.org:960/')
    assert req.host == 'python.org'
    assert req.port == 960
    assert not req.ssl


def test_host_port_nondefault_https(make_request):
    req = make_request('get', 'https://python.org:960/')
    assert req.host == 'python.org'
    assert req.port == 960
    assert req.ssl


def test_host_port_default_ws(make_request):
    req = make_request('get', 'ws://python.org/')
    assert req.host == 'python.org'
    assert req.port == 80
    assert not req.ssl


def test_host_port_default_wss(make_request):
    req = make_request('get', 'wss://python.org/')
    assert req.host == 'python.org'
    assert req.port == 443
    assert req.ssl


def test_host_port_nondefault_ws(make_request):
    req = make_request('get', 'ws://python.org:960/')
    assert req.host == 'python.org'
    assert req.port == 960
    assert not req.ssl


def test_host_port_nondefault_wss(make_request):
    req = make_request('get', 'wss://python.org:960/')
    assert req.host == 'python.org'
    assert req.port == 960
    assert req.ssl


def test_host_port_err(make_request):
    with pytest.raises(ValueError):
        make_request('get', 'http://python.org:123e/')


def test_hostname_err(make_request):
    with pytest.raises(ValueError):
        make_request('get', 'http://:8080/')


# --- Host header --------------------------------------------------------------

def test_host_header_host_without_port(make_request):
    req = make_request('get', 'http://python.org/')
    assert req.headers['HOST'] == 'python.org'


def test_host_header_host_with_default_port(make_request):
    req = make_request('get', 'http://python.org:80/')
    assert req.headers['HOST'] == 'python.org:80'


def test_host_header_host_with_nondefault_port(make_request):
    req = make_request('get', 'http://python.org:99/')
    assert req.headers['HOST'] == 'python.org:99'


def test_host_header_explicit_host(make_request):
    req = make_request('get', 'http://python.org/',
                       headers={'host': 'example.com'})
    assert req.headers['HOST'] == 'example.com'


def test_host_header_explicit_host_with_port(make_request):
    req = make_request('get', 'http://python.org/',
                       headers={'host': 'example.com:99'})
    assert req.headers['HOST'] == 'example.com:99'


def test_default_loop(loop):
    asyncio.set_event_loop(loop)
    req = ClientRequest('get', 'http://python.org/')
    assert req.loop is loop


# --- default and user-supplied headers ----------------------------------------

def test_default_headers_useragent(make_request):
    req = make_request('get', 'http://python.org/')

    assert 'SERVER' not in req.headers
    assert 'USER-AGENT' in req.headers


def test_default_headers_useragent_custom(make_request):
    req = make_request('get', 'http://python.org/',
                       headers={'user-agent': 'my custom agent'})

    assert 'USER-Agent' in req.headers
    assert 'my custom agent' == req.headers['User-Agent']


def test_skip_default_useragent_header(make_request):
    req = make_request('get', 'http://python.org/',
                       skip_auto_headers={upstr('user-agent')})

    assert 'User-Agent' not in req.headers


def test_headers(make_request):
    req = make_request('get', 'http://python.org/',
                       headers={'Content-Type': 'text/plain'})

    assert 'CONTENT-TYPE' in req.headers
    assert req.headers['CONTENT-TYPE'] == 'text/plain'
    assert req.headers['ACCEPT-ENCODING'] == 'gzip, deflate'


def test_headers_list(make_request):
    req = make_request('get', 'http://python.org/',
                       headers=[('Content-Type', 'text/plain')])
    assert 'CONTENT-TYPE' in req.headers
    assert req.headers['CONTENT-TYPE'] == 'text/plain'


def test_headers_default(make_request):
    req = make_request('get', 'http://python.org/',
                       headers={'ACCEPT-ENCODING': 'deflate'})
    assert req.headers['ACCEPT-ENCODING'] == 'deflate'


# --- URL validation -----------------------------------------------------------

def test_invalid_url(make_request):
    with pytest.raises(ValueError):
        make_request('get', 'hiwpefhipowhefopw')


def test_invalid_idna(make_request):
    with pytest.raises(ValueError):
        make_request('get', 'http://\u2061owhefopw.com')


def test_no_path(make_request):
    req = make_request('get', 'http://python.org')
    assert '/' == req.path


# --- IPv6 literals ------------------------------------------------------------

def test_ipv6_default_http_port(make_request):
    req = make_request('get', 'http://[2001:db8::1]/')
    assert req.host == '2001:db8::1'
    assert req.port == 80
    assert not req.ssl


def test_ipv6_default_https_port(make_request):
    req = make_request('get', 'https://[2001:db8::1]/')
    assert req.host == '2001:db8::1'
    assert req.port == 443
    assert req.ssl


def test_ipv6_nondefault_http_port(make_request):
    req = make_request('get', 'http://[2001:db8::1]:960/')
    assert req.host == '2001:db8::1'
    assert req.port == 960
    assert not req.ssl


def test_ipv6_nondefault_https_port(make_request):
    req = make_request('get', 'https://[2001:db8::1]:960/')
    assert req.host == '2001:db8::1'
    assert req.port == 960
    assert req.ssl


# --- basic auth ---------------------------------------------------------------

def test_basic_auth(make_request):
    req = make_request('get', 'http://python.org',
                       auth=aiohttp.helpers.BasicAuth('nkim', '1234'))
    assert 'AUTHORIZATION' in req.headers
    assert 'Basic bmtpbToxMjM0' == req.headers['AUTHORIZATION']


def test_basic_auth_utf8(make_request):
    # The password literal was mojibake in the scraped text; the expected
    # header below is base64 of the UTF-8 bytes of 'nkim:секрет'.
    req = make_request('get', 'http://python.org',
                       auth=aiohttp.helpers.BasicAuth('nkim', 'секрет',
                                                      'utf-8'))
    assert 'AUTHORIZATION' in req.headers
    assert 'Basic bmtpbTrRgdC10LrRgNC10YI=' == req.headers['AUTHORIZATION']


def test_basic_auth_tuple_forbidden(make_request):
    with pytest.raises(TypeError):
        make_request('get', 'http://python.org',
                     auth=('nkim', '1234'))


def test_basic_auth_from_url(make_request):
    req = make_request('get', 'http://nkim:1234@python.org')
    assert 'AUTHORIZATION' in req.headers
    assert 'Basic bmtpbToxMjM0' == req.headers['AUTHORIZATION']
    assert 'python.org' == req.netloc


def test_basic_auth_from_url_overriden(make_request):
    req = make_request('get', 'http://garbage@python.org',
                       auth=aiohttp.BasicAuth('nkim', '1234'))
    assert 'AUTHORIZATION' in req.headers
    assert 'Basic bmtpbToxMjM0' == req.headers['AUTHORIZATION']
    assert 'python.org' == req.netloc


# --- path quoting, params and fragments ---------------------------------------

def test_path_is_not_double_encoded1(make_request):
    req = make_request('get', "http://0.0.0.0/get/test case")
    assert req.path == "/get/test%20case"


def test_path_is_not_double_encoded2(make_request):
    req = make_request('get', "http://0.0.0.0/get/test%2fcase")
    assert req.path == "/get/test%2fcase"


def test_path_is_not_double_encoded3(make_request):
    req = make_request('get', "http://0.0.0.0/get/test%20case")
    assert req.path == "/get/test%20case"


def test_path_safe_chars_preserved(make_request):
    req = make_request('get', "http://0.0.0.0/get/%:=")
    assert req.path == "/get/%:="


def test_params_are_added_before_fragment1(make_request):
    req = make_request('GET', "http://example.com/path#fragment",
                       params={"a": "b"})
    assert req.url == "http://example.com/path?a=b#fragment"


def test_params_are_added_before_fragment2(make_request):
    req = make_request('GET', "http://example.com/path?key=value#fragment",
                       params={"a": "b"})
    assert req.url == "http://example.com/path?key=value&a=b#fragment"


def test_path_not_contain_fragment1(make_request):
    req = make_request('GET', "http://example.com/path#fragment")
    assert req.path == "/path"


def test_path_not_contain_fragment2(make_request):
    req = make_request('GET', "http://example.com/path?key=value#fragment")
    assert req.path == "/path?key=value"


# --- cookies ------------------------------------------------------------------

def test_cookies(make_request):
    req = make_request('get', 'http://test.com/path',
                       cookies={'cookie1': 'val1'})

    assert 'COOKIE' in req.headers
    assert 'cookie1=val1' == req.headers['COOKIE']


def test_cookies_merge_with_headers(make_request):
    req = make_request('get', 'http://test.com/path',
                       headers={'cookie': 'cookie1=val1'},
                       cookies={'cookie2': 'val2'})

    assert 'cookie1=val1; cookie2=val2' == req.headers['COOKIE']


# --- query-string handling ----------------------------------------------------

def test_unicode_get1(make_request):
    req = make_request('get', 'http://python.org',
                       params={'foo': 'f\xf8\xf8'})
    assert '/?foo=f%C3%B8%C3%B8' == req.path


def test_unicode_get2(make_request):
    req = make_request('', 'http://python.org',
                       params={'f\xf8\xf8': 'f\xf8\xf8'})

    assert '/?f%C3%B8%C3%B8=f%C3%B8%C3%B8' == req.path


def test_unicode_get3(make_request):
    req = make_request('', 'http://python.org', params={'foo': 'foo'})
    assert '/?foo=foo' == req.path


def test_unicode_get4(make_request):
    def join(*suffix):
        return urllib.parse.urljoin('http://python.org/', '/'.join(suffix))

    req = make_request('', join('\xf8'), params={'foo': 'foo'})
    assert '/%C3%B8?foo=foo' == req.path


def test_query_multivalued_param(make_request):
    for meth in ClientRequest.ALL_METHODS:
        req = make_request(
            meth, 'http://python.org',
            params=(('test', 'foo'), ('test', 'baz')))

        assert req.path == '/?test=foo&test=baz'


def test_query_str_param(make_request):
    for meth in ClientRequest.ALL_METHODS:
        req = make_request(meth, 'http://python.org', params='test=foo')
        assert req.path == '/?test=foo'


def test_query_bytes_param_raises(make_request):
    for meth in ClientRequest.ALL_METHODS:
        with pytest.raises(TypeError) as ctx:
            make_request(meth, 'http://python.org', params=b'test=foo')
        assert re.match('not a valid non-string.*or mapping', str(ctx.value))


def test_query_str_param_is_not_encoded(make_request):
    for meth in ClientRequest.ALL_METHODS:
        req = make_request(meth, 'http://python.org', params='test=f+oo')
        assert req.path == '/?test=f+oo'


def test_params_update_path_and_url(make_request):
    req = make_request('get', 'http://python.org',
                       params=(('test', 'foo'), ('test', 'baz')))
    assert req.path == '/?test=foo&test=baz'
    assert req.url == 'http://python.org/?test=foo&test=baz'


def test_params_empty_path_and_url(make_request):
    req_empty = make_request('get', 'http://python.org', params={})
    assert req_empty.path == '/'
    assert req_empty.url == 'http://python.org/'
    req_none = make_request('get', 'http://python.org')
    assert req_none.path == '/'
    assert req_none.url == 'http://python.org/'


# --- netloc generation --------------------------------------------------------

def test_gen_netloc_all(make_request):
    req = make_request('get',
                       'https://aiohttp:pwpwpw@' +
                       '12345678901234567890123456789' +
                       '012345678901234567890:8080')
    assert req.netloc == ('12345678901234567890123456789' +
                          '012345678901234567890:8080')


def test_gen_netloc_no_port(make_request):
    req = make_request('get',
                       'https://aiohttp:pwpwpw@' +
                       '12345678901234567890123456789' +
                       '012345678901234567890/')
    assert req.netloc == ('12345678901234567890123456789' +
                          '012345678901234567890')


def test_gen_notloc_failed(make_request):
    with pytest.raises(ValueError) as excinfo:
        make_request('get',
                     'https://aiohttp:pwpwpw@' +
                     '123456789012345678901234567890123456789' +
                     '01234567890123456789012345/')
    # The message check has to run after the ``with`` block: inside it, the
    # statement following the raising call is never reached.  Python 3
    # exceptions also have no ``.message`` attribute, so compare ``str()``.
    assert str(excinfo.value) == "URL has an invalid label."


class TestClientRequest(unittest.TestCase):

    def setUp(self):
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(None)

        self.transport = mock.Mock()
        self.connection = mock.Mock()
        self.protocol = mock.Mock()
        self.protocol.writer.drain.return_value = ()
        self.stream = aiohttp.StreamParser(loop=self.loop)
        self.connector = BaseConnector(loop=self.loop)


...  # remainder of test_client_request.py is elided in this excerpt
test_web_websocket.py
Source:test_web_websocket.py  
...  # earlier lines of test_web_websocket.py are elided in this excerpt
# Tail of a definition whose header is not visible in this excerpt:
#     ret = mock.Mock()
#     ret.set_parser.return_value = ret
#     return ret


@pytest.fixture
def make_request(app, writer, reader):
    """Return a factory for mocked requests wired to *app*/*writer*/*reader*.

    When ``headers`` is omitted the factory supplies a websocket-upgrade
    header set; ``protocols=True`` additionally sets
    ``SEC-WEBSOCKET-PROTOCOL`` on whichever headers object is in use.
    """
    def maker(method, path, headers=None, protocols=False):
        if headers is None:
            headers = CIMultiDict(
                {'HOST': 'server.example.com',
                 'UPGRADE': 'websocket',
                 'CONNECTION': 'Upgrade',
                 'SEC-WEBSOCKET-KEY': 'dGhlIHNhbXBsZSBub25jZQ==',
                 'ORIGIN': 'http://example.com',
                 'SEC-WEBSOCKET-VERSION': '13'})
        if protocols:
            headers['SEC-WEBSOCKET-PROTOCOL'] = 'chat, superchat'

        return make_mocked_request(method, path, headers,
                                   app=app, writer=writer, reader=reader)

    return maker


# --- operations on a response that was never prepared -------------------------

def test_nonstarted_ping():
    ws = WebSocketResponse()
    with pytest.raises(RuntimeError):
        ws.ping()


def test_nonstarted_pong():
    ws = WebSocketResponse()
    with pytest.raises(RuntimeError):
        ws.pong()


def test_nonstarted_send_str():
    ws = WebSocketResponse()
    with pytest.raises(RuntimeError):
        ws.send_str('string')


def test_nonstarted_send_bytes():
    ws = WebSocketResponse()
    with pytest.raises(RuntimeError):
        ws.send_bytes(b'bytes')


def test_nonstarted_send_json():
    ws = WebSocketResponse()
    with pytest.raises(RuntimeError):
        ws.send_json({'type': 'json'})


@asyncio.coroutine
def test_nonstarted_close():
    ws = WebSocketResponse()
    with pytest.raises(RuntimeError):
        yield from ws.close()


@asyncio.coroutine
def test_nonstarted_receive_str():
    ws = WebSocketResponse()
    with pytest.raises(RuntimeError):
        yield from ws.receive_str()


@asyncio.coroutine
def test_nonstarted_receive_bytes():
    ws = WebSocketResponse()
    with pytest.raises(RuntimeError):
        yield from ws.receive_bytes()


@asyncio.coroutine
def test_nonstarted_receive_json():
    ws = WebSocketResponse()
    with pytest.raises(RuntimeError):
        yield from ws.receive_json()


# --- type checks on send/receive ----------------------------------------------

@asyncio.coroutine
def test_receive_str_nonstring(make_request):
    req = make_request('GET', '/')
    ws = WebSocketResponse()
    yield from ws.prepare(req)

    @asyncio.coroutine
    def receive():
        return WSMessage(WSMsgType.BINARY, b'data', b'')

    ws.receive = receive

    with pytest.raises(TypeError):
        yield from ws.receive_str()


@asyncio.coroutine
def test_receive_bytes_nonsbytes(make_request):
    req = make_request('GET', '/')
    ws = WebSocketResponse()
    yield from ws.prepare(req)

    @asyncio.coroutine
    def receive():
        return WSMessage(WSMsgType.TEXT, 'data', b'')

    ws.receive = receive

    with pytest.raises(TypeError):
        yield from ws.receive_bytes()


@asyncio.coroutine
def test_send_str_nonstring(make_request):
    req = make_request('GET', '/')
    ws = WebSocketResponse()
    yield from ws.prepare(req)
    with pytest.raises(TypeError):
        ws.send_str(b'bytes')


@asyncio.coroutine
def test_send_bytes_nonbytes(make_request):
    req = make_request('GET', '/')
    ws = WebSocketResponse()
    yield from ws.prepare(req)
    with pytest.raises(TypeError):
        ws.send_bytes('string')


@asyncio.coroutine
def test_send_json_nonjson(make_request):
    req = make_request('GET', '/')
    ws = WebSocketResponse()
    yield from ws.prepare(req)
    with pytest.raises(TypeError):
        ws.send_json(set())


def test_write_non_prepared():
    ws = WebSocketResponse()
    with pytest.raises(RuntimeError):
        ws.write(b'data')


# --- WebSocketReady -----------------------------------------------------------

def test_websocket_ready():
    websocket_ready = WebSocketReady(True, 'chat')
    assert websocket_ready.ok is True
    assert websocket_ready.protocol == 'chat'


def test_websocket_not_ready():
    websocket_ready = WebSocketReady(False, None)
    assert websocket_ready.ok is False
    assert websocket_ready.protocol is None


def test_websocket_ready_unknown_protocol():
    websocket_ready = WebSocketReady(True, None)
    assert websocket_ready.ok is True
    assert websocket_ready.protocol is None


def test_bool_websocket_ready():
    websocket_ready = WebSocketReady(True, None)
    assert bool(websocket_ready) is True


def test_bool_websocket_not_ready():
    websocket_ready = WebSocketReady(False, None)
    assert bool(websocket_ready) is False


# --- can_prepare --------------------------------------------------------------

def test_can_prepare_ok(make_request):
    req = make_request('GET', '/', protocols=True)
    ws = WebSocketResponse(protocols=('chat',))
    assert (True, 'chat') == ws.can_prepare(req)


def test_can_prepare_unknown_protocol(make_request):
    req = make_request('GET', '/')
    ws = WebSocketResponse()
    assert (True, None) == ws.can_prepare(req)


def test_can_prepare_invalid_method(make_request):
    req = make_request('POST', '/')
    ws = WebSocketResponse()
    assert (False, None) == ws.can_prepare(req)


def test_can_prepare_without_upgrade(make_request):
    req = make_request('GET', '/',
                       headers=CIMultiDict({}))
    ws = WebSocketResponse()
    assert (False, None) == ws.can_prepare(req)


@asyncio.coroutine
def test_can_prepare_started(make_request):
    req = make_request('GET', '/')
    ws = WebSocketResponse()
    yield from ws.prepare(req)
    with pytest.raises(RuntimeError) as ctx:
        ws.can_prepare(req)

    assert 'Already started' in str(ctx.value)


# --- operations after close ---------------------------------------------------

def test_closed_after_ctor():
    ws = WebSocketResponse()
    assert not ws.closed
    assert ws.close_code is None


@asyncio.coroutine
def test_send_str_closed(make_request):
    req = make_request('GET', '/')
    ws = WebSocketResponse()
    yield from ws.prepare(req)
    yield from ws.close()
    with pytest.raises(RuntimeError):
        ws.send_str('string')


@asyncio.coroutine
def test_send_bytes_closed(make_request):
    req = make_request('GET', '/')
    ws = WebSocketResponse()
    yield from ws.prepare(req)
    yield from ws.close()
    with pytest.raises(RuntimeError):
        ws.send_bytes(b'bytes')


@asyncio.coroutine
def test_send_json_closed(make_request):
    req = make_request('GET', '/')
    ws = WebSocketResponse()
    yield from ws.prepare(req)
    yield from ws.close()
    with pytest.raises(RuntimeError):
        ws.send_json({'type': 'json'})


@asyncio.coroutine
def test_ping_closed(make_request):
    req = make_request('GET', '/')
    ws = WebSocketResponse()
    yield from ws.prepare(req)
    yield from ws.close()
    with pytest.raises(RuntimeError):
        ws.ping()


@asyncio.coroutine
def test_pong_closed(make_request):
    req = make_request('GET', '/')
    ws = WebSocketResponse()
    yield from ws.prepare(req)
    yield from ws.close()
    with pytest.raises(RuntimeError):
        ws.pong()


@asyncio.coroutine
def test_close_idempotent(make_request, writer):
    req = make_request('GET', '/')
    ws = WebSocketResponse()
    yield from ws.prepare(req)
    assert (yield from ws.close(code=1, message='message1'))
    assert ws.closed
    assert not (yield from ws.close(code=2, message='message2'))


# --- prepare / write_eof ------------------------------------------------------

@asyncio.coroutine
def test_start_invalid_method(make_request):
    req = make_request('POST', '/')
    ws = WebSocketResponse()
    with pytest.raises(HTTPMethodNotAllowed):
        yield from ws.prepare(req)


@asyncio.coroutine
def test_start_without_upgrade(make_request):
    req = make_request('GET', '/',
                       headers=CIMultiDict({}))
    ws = WebSocketResponse()
    with pytest.raises(HTTPBadRequest):
        yield from ws.prepare(req)


@asyncio.coroutine
def test_wait_closed_before_start():
    ws = WebSocketResponse()
    with pytest.raises(RuntimeError):
        yield from ws.close()


@asyncio.coroutine
def test_write_eof_not_started():
    ws = WebSocketResponse()
    with pytest.raises(RuntimeError):
        yield from ws.write_eof()


@asyncio.coroutine
def test_write_eof_idempotent(make_request):
    req = make_request('GET', '/')
    ws = WebSocketResponse()
    yield from ws.prepare(req)
    yield from ws.close()

    yield from ws.write_eof()
    yield from ws.write_eof()
    yield from ws.write_eof()


# --- receive ------------------------------------------------------------------

@asyncio.coroutine
def test_receive_exc_in_reader(make_request, loop, reader):
    req = make_request('GET', '/')
    ws = WebSocketResponse()
    yield from ws.prepare(req)

    exc = ValueError()
    res = helpers.create_future(loop)
    res.set_exception(exc)
    reader.read = make_mocked_coro(res)

    msg = yield from ws.receive()
    assert msg.type == WSMsgType.ERROR
    assert msg.type is msg.tp
    assert msg.data is exc
    assert ws.exception() is exc


@asyncio.coroutine
def test_receive_cancelled(make_request, loop, reader):
    req = make_request('GET', '/')
    ws = WebSocketResponse()
    yield from ws.prepare(req)

    res = helpers.create_future(loop)
    res.set_exception(asyncio.CancelledError())
    reader.read = make_mocked_coro(res)

    with pytest.raises(asyncio.CancelledError):
        yield from ws.receive()


@asyncio.coroutine
def test_receive_timeouterror(make_request, loop, reader):
    req = make_request('GET', '/')
    ws = WebSocketResponse()
    yield from ws.prepare(req)

    res = helpers.create_future(loop)
    res.set_exception(asyncio.TimeoutError())
    reader.read = make_mocked_coro(res)

    with pytest.raises(asyncio.TimeoutError):
        yield from ws.receive()


@asyncio.coroutine
def test_receive_client_disconnected(make_request, loop, reader):
    req = make_request('GET', '/')
    ws = WebSocketResponse()
    yield from ws.prepare(req)

    exc = errors.ClientDisconnectedError()
    res = helpers.create_future(loop)
    res.set_exception(exc)
    reader.read = make_mocked_coro(res)

    msg = yield from ws.receive()
    assert ws.closed
    assert msg.type == WSMsgType.CLOSE
    assert msg.type is msg.tp
    assert msg.data is None
    assert ws.exception() is None


@asyncio.coroutine
def test_multiple_receive_on_close_connection(make_request):
    req = make_request('GET', '/')
    ws = WebSocketResponse()
    yield from ws.prepare(req)
    yield from ws.close()

    yield from ws.receive()
    yield from ws.receive()
    yield from ws.receive()
    yield from ws.receive()

    with pytest.raises(RuntimeError):
        yield from ws.receive()


@asyncio.coroutine
def test_concurrent_receive(make_request):
    req = make_request('GET', '/')
    ws = WebSocketResponse()
    yield from ws.prepare(req)
    ws._waiting = True

    with pytest.raises(RuntimeError):
        yield from ws.receive()


# --- errors raised while closing ----------------------------------------------

@asyncio.coroutine
def test_close_exc(make_request, reader, loop):
    req = make_request('GET', '/')
    ws = WebSocketResponse()
    yield from ws.prepare(req)

    exc = ValueError()
    reader.read.return_value = helpers.create_future(loop)
    reader.read.return_value.set_exception(exc)

    yield from ws.close()
    assert ws.closed
    assert ws.exception() is exc

    # Reset the private flag so close() can be exercised a second time.
    ws._closed = False
    reader.read.return_value = helpers.create_future(loop)
    reader.read.return_value.set_exception(asyncio.CancelledError())
    with pytest.raises(asyncio.CancelledError):
        yield from ws.close()
    assert ws.close_code == 1006


@asyncio.coroutine
def test_close_exc2(make_request):
    req = make_request('GET', '/')
    ws = WebSocketResponse()
    yield from ws.prepare(req)

    exc = ValueError()
    ws._writer = mock.Mock()
    ws._writer.close.side_effect = exc

    yield from ws.close()
    assert ws.closed
    assert ws.exception() is exc

    # Reset the private flag so close() can be exercised a second time.
    ws._closed = False
    ws._writer.close.side_effect = asyncio.CancelledError()
    with pytest.raises(asyncio.CancelledError):
        yield from ws.close()


# --- deprecated start()/can_start() aliases -----------------------------------

def test_start_twice_idempotent(make_request):
    req = make_request('GET', '/')
    ws = WebSocketResponse()
    with pytest.warns(DeprecationWarning):
        impl1 = ws.start(req)
        impl2 = ws.start(req)
        assert impl1 is impl2


def test_can_start_ok(make_request):
    req = make_request('GET', '/', protocols=True)
    ws = WebSocketResponse(protocols=('chat',))
    with pytest.warns(DeprecationWarning):
        assert (True, 'chat') == ws.can_start(req)


def test_msgtype_alias():
    # deprecated since 1.0
    ...  # body is elided in this excerpt


test_web_request.py
Source:test_web_request.py  
...  # earlier lines of test_web_request.py are elided in this excerpt
from multidict import CIMultiDict, MultiDict
from aiohttp.protocol import HttpVersion
from aiohttp.test_utils import make_mocked_request


@pytest.fixture
def make_request():
    """Expose ``make_mocked_request`` to tests under the name ``make_request``."""
    return make_mocked_request


def test_ctor(make_request):
    req = make_request('GET', '/path/to?a=1&b=2')

    assert 'GET' == req.method
    assert HttpVersion(1, 1) == req.version
    assert req.host is None
    assert '/path/to?a=1&b=2' == req.path_qs
    assert '/path/to' == req.path
    assert 'a=1&b=2' == req.query_string
    assert CIMultiDict() == req.headers
    assert () == req.raw_headers

    get = req.GET
    assert MultiDict([('a', '1'), ('b', '2')]) == get
    # second call should return the same object
    assert get is req.GET

    assert req.keep_alive

    # just make sure that all lines of make_mocked_request covered
    headers = CIMultiDict(FOO='bar')
    reader = mock.Mock()
    writer = mock.Mock()
    payload = mock.Mock()
    transport = mock.Mock()
    app = mock.Mock()

    req = make_request('GET', '/path/to?a=1&b=2', headers=headers,
                       writer=writer, reader=reader, payload=payload,
                       transport=transport, app=app)
    assert req.app is app
    assert req.content is payload
    assert req.transport is transport
    assert req._reader is reader
    assert req._writer is writer
    assert req.headers == headers
    assert req.raw_headers == ((b'Foo', b'bar'),)


def test_doubleslashes(make_request):
    req = make_request('GET', '//foo/')
    assert '//foo/' == req.path


def test_POST(make_request):
    req = make_request('POST', '/')
    with pytest.raises(RuntimeError):
        req.POST

    marker = object()
    req._post = marker
    assert req.POST is marker
    assert req.POST is marker


# --- content type / charset / length ------------------------------------------

def test_content_type_not_specified(make_request):
    req = make_request('Get', '/')
    assert 'application/octet-stream' == req.content_type


def test_content_type_from_spec(make_request):
    req = make_request('Get', '/',
                       CIMultiDict([('CONTENT-TYPE', 'application/json')]))
    assert 'application/json' == req.content_type


def test_content_type_from_spec_with_charset(make_request):
    req = make_request(
        'Get', '/',
        CIMultiDict([('CONTENT-TYPE', 'text/html; charset=UTF-8')]))
    assert 'text/html' == req.content_type
    assert 'UTF-8' == req.charset


def test_calc_content_type_on_getting_charset(make_request):
    req = make_request(
        'Get', '/',
        CIMultiDict([('CONTENT-TYPE', 'text/html; charset=UTF-8')]))
    assert 'UTF-8' == req.charset
    assert 'text/html' == req.content_type


def test_urlencoded_querystring(make_request):
    # The expected value was mojibake in the scraped text; the query string
    # above is the percent-encoded UTF-8 of 'текст'.
    req = make_request('GET',
                       '/yandsearch?text=%D1%82%D0%B5%D0%BA%D1%81%D1%82')
    assert {'text': 'текст'} == req.GET


def test_non_ascii_path(make_request):
    # Path literal restored from mojibake in the scraped text.
    req = make_request('GET', '/путь')
    assert '/путь' == req.path


def test_content_length(make_request):
    req = make_request('Get', '/',
                       CIMultiDict([('CONTENT-LENGTH', '123')]))

    assert 123 == req.content_length


# --- keep-alive ---------------------------------------------------------------

def test_non_keepalive_on_http10(make_request):
    req = make_request('GET', '/', version=HttpVersion(1, 0))
    assert not req.keep_alive


def test_non_keepalive_on_closing(make_request):
    req = make_request('GET', '/', closing=True)
    assert not req.keep_alive


# --- post() -------------------------------------------------------------------

@asyncio.coroutine
def test_call_POST_on_GET_request(make_request):
    req = make_request('GET', '/')

    ret = yield from req.post()
    assert CIMultiDict() == ret


@asyncio.coroutine
def test_call_POST_on_weird_content_type(make_request):
    req = make_request(
        'POST', '/',
        headers=CIMultiDict({'CONTENT-TYPE': 'something/weird'}))

    ret = yield from req.post()
    assert CIMultiDict() == ret


@asyncio.coroutine
def test_call_POST_twice(make_request):
    req = make_request('GET', '/')

    ret1 = yield from req.post()
    ret2 = yield from req.post()
    assert ret1 is ret2


# --- cookies ------------------------------------------------------------------

def test_no_request_cookies(make_request):
    req = make_request('GET', '/')

    assert req.cookies == {}

    cookies = req.cookies
    assert cookies is req.cookies


def test_request_cookie(make_request):
    headers = CIMultiDict(COOKIE='cookie1=value1; cookie2=value2')
    req = make_request('GET', '/', headers=headers)

    assert req.cookies == {'cookie1': 'value1',
                           'cookie2': 'value2'}


def test_request_cookie__set_item(make_request):
    headers = CIMultiDict(COOKIE='name=value')
    req = make_request('GET', '/', headers=headers)

    assert req.cookies == {'name': 'value'}

    with pytest.raises(TypeError):
        req.cookies['my'] = 'value'


# --- misc request attributes --------------------------------------------------

def test_match_info(make_request):
    req = make_request('GET', '/')
    assert req.match_info is None
    match = {'a': 'b'}
    req._match_info = match
    assert match is req.match_info


def test_request_is_dict(make_request):
    req = make_request('GET', '/')
    assert isinstance(req, dict)
    req['key'] = 'value'
    assert 'value' == req['key']


def test_copy(make_request):
    req = make_request('GET', '/')
    with pytest.raises(NotImplementedError):
        req.copy()


def test___repr__(make_request):
    req = make_request('GET', '/path/to')
    assert "<Request GET /path/to >" == repr(req)


def test___repr___non_ascii_path(make_request):
    req = make_request('GET', '/path/\U0001f415\U0001f308')
    assert "<Request GET /path/\\U0001f415\\U0001f308 >" == repr(req)


# --- scheme detection ---------------------------------------------------------

def test_http_scheme(make_request):
    req = make_request('GET', '/')
    assert "http" == req.scheme


def test_https_scheme_by_ssl_transport(make_request):
    req = make_request('GET', '/', sslcontext=True)
    assert "https" == req.scheme


def test_https_scheme_by_secure_proxy_ssl_header(make_request):
    req = make_request('GET', '/',
                       secure_proxy_ssl_header=('X-HEADER', '1'),
                       headers=CIMultiDict({'X-HEADER': '1'}))
    assert "https" == req.scheme


def test_https_scheme_by_secure_proxy_ssl_header_false_test(make_request):
    req = make_request('GET', '/',
                       secure_proxy_ssl_header=('X-HEADER', '1'),
                       headers=CIMultiDict({'X-HEADER': '0'}))
    assert "http" == req.scheme


def test_raw_headers(make_request):
    req = make_request('GET', '/',
                       headers=CIMultiDict({'X-HEADER': 'aaa'}))
    ...  # remainder of this test and of the file is elided in this excerpt


# Learn to execute automation testing from scratch with LambdaTest Learning
# Hub. Right from setting up the prerequisites to run your first automation
# test, to following best practices and diving deeper into advanced test
# scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to
# help you be proficient with different test automation frameworks i.e.
# Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!
