How to use the receive_frame method in tempest

Best Python code snippet using tempest_python

test_protocol.py

Source: test_protocol.py (GitHub)

copy

Full Screen

...77 self.protocol._drain = delayed_drain78 close_frame = Frame(True, OP_CLOSE, serialize_close(1000, "close"))79 local_close = Frame(True, OP_CLOSE, serialize_close(1000, "local"))80 remote_close = Frame(True, OP_CLOSE, serialize_close(1000, "remote"))81 def receive_frame(self, frame):82 """83 Make the protocol receive a frame.84 """85 write = self.protocol.data_received86 mask = not self.protocol.is_client87 frame.write(write, mask=mask)88 def receive_eof(self):89 """90 Make the protocol receive the end of the data stream.91 Since ``WebSocketCommonProtocol.eof_received`` returns ``None``, an92 actual transport would close itself after calling it. This function93 emulates that behavior.94 """95 self.protocol.eof_received()96 self.loop.call_soon(self.transport.close)97 def receive_eof_if_client(self):98 """99 Like receive_eof, but only if this is the client side.100 Since the server is supposed to initiate the termination of the TCP101 connection, this method helps making tests work for both sides.102 """103 if self.protocol.is_client:104 self.receive_eof()105 def close_connection(self, code=1000, reason="close"):106 """107 Execute a closing handshake.108 This puts the connection in the CLOSED state.109 """110 close_frame_data = serialize_close(code, reason)111 # Prepare the response to the closing handshake from the remote side.112 self.receive_frame(Frame(True, OP_CLOSE, close_frame_data))113 self.receive_eof_if_client()114 # Trigger the closing handshake from the local side and complete it.115 self.loop.run_until_complete(self.protocol.close(code, reason))116 # Empty the outgoing data stream so we can make assertions later on.117 self.assertOneFrameSent(True, OP_CLOSE, close_frame_data)118 assert self.protocol.state is State.CLOSED119 def half_close_connection_local(self, code=1000, reason="close"):120 """121 Start a closing handshake but do not complete it.122 The main difference with `close_connection` is that the connection is123 left in the CLOSING state 
until the event loop runs again.124 The current implementation returns a task that must be awaited or125 canceled, else asyncio complains about destroying a pending task.126 """127 close_frame_data = serialize_close(code, reason)128 # Trigger the closing handshake from the local endpoint.129 close_task = self.loop.create_task(self.protocol.close(code, reason))130 self.run_loop_once() # wait_for executes131 self.run_loop_once() # write_frame executes132 # Empty the outgoing data stream so we can make assertions later on.133 self.assertOneFrameSent(True, OP_CLOSE, close_frame_data)134 assert self.protocol.state is State.CLOSING135 # Complete the closing sequence at 1ms intervals so the test can run136 # at each point even it goes back to the event loop several times.137 self.loop.call_later(138 MS, self.receive_frame, Frame(True, OP_CLOSE, close_frame_data)139 )140 self.loop.call_later(2 * MS, self.receive_eof_if_client)141 # This task must be awaited or canceled by the caller.142 return close_task143 def half_close_connection_remote(self, code=1000, reason="close"):144 """145 Receive a closing handshake but do not complete it.146 The main difference with `close_connection` is that the connection is147 left in the CLOSING state until the event loop runs again.148 """149 # On the server side, websockets completes the closing handshake and150 # closes the TCP connection immediately. 
Yield to the event loop after151 # sending the close frame to run the test while the connection is in152 # the CLOSING state.153 if not self.protocol.is_client:154 self.make_drain_slow()155 close_frame_data = serialize_close(code, reason)156 # Trigger the closing handshake from the remote endpoint.157 self.receive_frame(Frame(True, OP_CLOSE, close_frame_data))158 self.run_loop_once() # read_frame executes159 # Empty the outgoing data stream so we can make assertions later on.160 self.assertOneFrameSent(True, OP_CLOSE, close_frame_data)161 assert self.protocol.state is State.CLOSING162 # Complete the closing sequence at 1ms intervals so the test can run163 # at each point even it goes back to the event loop several times.164 self.loop.call_later(2 * MS, self.receive_eof_if_client)165 def process_invalid_frames(self):166 """167 Make the protocol fail quickly after simulating invalid data.168 To achieve this, this function triggers the protocol's eof_received,169 which interrupts pending reads waiting for more data.170 """171 self.run_loop_once()172 self.receive_eof()173 self.loop.run_until_complete(self.protocol.close_connection_task)174 def sent_frames(self):175 """176 Read all frames sent to the transport.177 """178 stream = asyncio.StreamReader(loop=self.loop)179 for (data,), kw in self.transport.write.call_args_list:180 stream.feed_data(data)181 self.transport.write.call_args_list = []182 stream.feed_eof()183 frames = []184 while not stream.at_eof():185 frames.append(186 self.loop.run_until_complete(187 Frame.read(stream.readexactly, mask=self.protocol.is_client)188 )189 )190 return frames191 def last_sent_frame(self):192 """193 Read the last frame sent to the transport.194 This method assumes that at most one frame was sent. 
It raises an195 AssertionError otherwise.196 """197 frames = self.sent_frames()198 if frames:199 assert len(frames) == 1200 return frames[0]201 def assertFramesSent(self, *frames):202 self.assertEqual(self.sent_frames(), [Frame(*args) for args in frames])203 def assertOneFrameSent(self, *args):204 self.assertEqual(self.last_sent_frame(), Frame(*args))205 def assertNoFrameSent(self):206 self.assertIsNone(self.last_sent_frame())207 def assertConnectionClosed(self, code, message):208 # The following line guarantees that connection_lost was called.209 self.assertEqual(self.protocol.state, State.CLOSED)210 # A close frame was received.211 self.assertEqual(self.protocol.close_code, code)212 self.assertEqual(self.protocol.close_reason, message)213 def assertConnectionFailed(self, code, message):214 # The following line guarantees that connection_lost was called.215 self.assertEqual(self.protocol.state, State.CLOSED)216 # No close frame was received.217 self.assertEqual(self.protocol.close_code, 1006)218 self.assertEqual(self.protocol.close_reason, "")219 # A close frame was sent -- unless the connection was already lost.220 if code == 1006:221 self.assertNoFrameSent()222 else:223 self.assertOneFrameSent(True, OP_CLOSE, serialize_close(code, message))224 @contextlib.contextmanager225 def assertCompletesWithin(self, min_time, max_time):226 t0 = self.loop.time()227 yield228 t1 = self.loop.time()229 dt = t1 - t0230 self.assertGreaterEqual(dt, min_time, f"Too fast: {dt} < {min_time}")231 self.assertLess(dt, max_time, f"Too slow: {dt} >= {max_time}")232 # Test constructor.233 def test_timeout_backwards_compatibility(self):234 with warnings.catch_warnings(record=True) as recorded_warnings:235 protocol = WebSocketCommonProtocol(timeout=5)236 self.assertEqual(protocol.close_timeout, 5)237 self.assertEqual(len(recorded_warnings), 1)238 warning = recorded_warnings[0].message239 self.assertEqual(str(warning), "rename timeout to close_timeout")240 self.assertEqual(type(warning), 
DeprecationWarning)241 # Test public attributes.242 def test_local_address(self):243 get_extra_info = unittest.mock.Mock(return_value=("host", 4312))244 self.transport.get_extra_info = get_extra_info245 self.assertEqual(self.protocol.local_address, ("host", 4312))246 get_extra_info.assert_called_with("sockname")247 def test_local_address_before_connection(self):248 # Emulate the situation before connection_open() runs.249 _transport = self.protocol.transport250 del self.protocol.transport251 try:252 self.assertEqual(self.protocol.local_address, None)253 finally:254 self.protocol.transport = _transport255 def test_remote_address(self):256 get_extra_info = unittest.mock.Mock(return_value=("host", 4312))257 self.transport.get_extra_info = get_extra_info258 self.assertEqual(self.protocol.remote_address, ("host", 4312))259 get_extra_info.assert_called_with("peername")260 def test_remote_address_before_connection(self):261 # Emulate the situation before connection_open() runs.262 _transport = self.protocol.transport263 del self.protocol.transport264 try:265 self.assertEqual(self.protocol.remote_address, None)266 finally:267 self.protocol.transport = _transport268 def test_open(self):269 self.assertTrue(self.protocol.open)270 self.close_connection()271 self.assertFalse(self.protocol.open)272 def test_closed(self):273 self.assertFalse(self.protocol.closed)274 self.close_connection()275 self.assertTrue(self.protocol.closed)276 def test_wait_closed(self):277 wait_closed = self.loop.create_task(self.protocol.wait_closed())278 self.assertFalse(wait_closed.done())279 self.close_connection()280 self.assertTrue(wait_closed.done())281 # Test the recv coroutine.282 def test_recv_text(self):283 self.receive_frame(Frame(True, OP_TEXT, "café".encode("utf-8")))284 data = self.loop.run_until_complete(self.protocol.recv())285 self.assertEqual(data, "café")286 def test_recv_binary(self):287 self.receive_frame(Frame(True, OP_BINARY, b"tea"))288 data = 
self.loop.run_until_complete(self.protocol.recv())289 self.assertEqual(data, b"tea")290 def test_recv_on_closing_connection_local(self):291 close_task = self.half_close_connection_local()292 with self.assertRaises(ConnectionClosed):293 self.loop.run_until_complete(self.protocol.recv())294 self.loop.run_until_complete(close_task) # cleanup295 def test_recv_on_closing_connection_remote(self):296 self.half_close_connection_remote()297 with self.assertRaises(ConnectionClosed):298 self.loop.run_until_complete(self.protocol.recv())299 def test_recv_on_closed_connection(self):300 self.close_connection()301 with self.assertRaises(ConnectionClosed):302 self.loop.run_until_complete(self.protocol.recv())303 def test_recv_protocol_error(self):304 self.receive_frame(Frame(True, OP_CONT, "café".encode("utf-8")))305 self.process_invalid_frames()306 self.assertConnectionFailed(1002, "")307 def test_recv_unicode_error(self):308 self.receive_frame(Frame(True, OP_TEXT, "café".encode("latin-1")))309 self.process_invalid_frames()310 self.assertConnectionFailed(1007, "")311 def test_recv_text_payload_too_big(self):312 self.protocol.max_size = 1024313 self.receive_frame(Frame(True, OP_TEXT, "café".encode("utf-8") * 205))314 self.process_invalid_frames()315 self.assertConnectionFailed(1009, "")316 def test_recv_binary_payload_too_big(self):317 self.protocol.max_size = 1024318 self.receive_frame(Frame(True, OP_BINARY, b"tea" * 342))319 self.process_invalid_frames()320 self.assertConnectionFailed(1009, "")321 def test_recv_text_no_max_size(self):322 self.protocol.max_size = None # for test coverage323 self.receive_frame(Frame(True, OP_TEXT, "café".encode("utf-8") * 205))324 data = self.loop.run_until_complete(self.protocol.recv())325 self.assertEqual(data, "café" * 205)326 def test_recv_binary_no_max_size(self):327 self.protocol.max_size = None # for test coverage328 self.receive_frame(Frame(True, OP_BINARY, b"tea" * 342))329 data = self.loop.run_until_complete(self.protocol.recv())330 
self.assertEqual(data, b"tea" * 342)331 def test_recv_queue_empty(self):332 recv = self.loop.create_task(self.protocol.recv())333 with self.assertRaises(asyncio.TimeoutError):334 self.loop.run_until_complete(335 asyncio.wait_for(asyncio.shield(recv), timeout=MS)336 )337 self.receive_frame(Frame(True, OP_TEXT, "café".encode("utf-8")))338 data = self.loop.run_until_complete(recv)339 self.assertEqual(data, "café")340 def test_recv_queue_full(self):341 self.protocol.max_queue = 2342 # Test internals because it's hard to verify buffers from the outside.343 self.assertEqual(list(self.protocol.messages), [])344 self.receive_frame(Frame(True, OP_TEXT, "café".encode("utf-8")))345 self.run_loop_once()346 self.assertEqual(list(self.protocol.messages), ["café"])347 self.receive_frame(Frame(True, OP_BINARY, b"tea"))348 self.run_loop_once()349 self.assertEqual(list(self.protocol.messages), ["café", b"tea"])350 self.receive_frame(Frame(True, OP_BINARY, b"milk"))351 self.run_loop_once()352 self.assertEqual(list(self.protocol.messages), ["café", b"tea"])353 self.loop.run_until_complete(self.protocol.recv())354 self.run_loop_once()355 self.assertEqual(list(self.protocol.messages), [b"tea", b"milk"])356 self.loop.run_until_complete(self.protocol.recv())357 self.run_loop_once()358 self.assertEqual(list(self.protocol.messages), [b"milk"])359 self.loop.run_until_complete(self.protocol.recv())360 self.run_loop_once()361 self.assertEqual(list(self.protocol.messages), [])362 def test_recv_queue_no_limit(self):363 self.protocol.max_queue = None364 for _ in range(100):365 self.receive_frame(Frame(True, OP_TEXT, "café".encode("utf-8")))366 self.run_loop_once()367 # Incoming message queue can contain at least 100 messages.368 self.assertEqual(list(self.protocol.messages), ["café"] * 100)369 for _ in range(100):370 self.loop.run_until_complete(self.protocol.recv())371 self.assertEqual(list(self.protocol.messages), [])372 def test_recv_other_error(self):373 async def read_message():374 raise 
Exception("BOOM")375 self.protocol.read_message = read_message376 self.process_invalid_frames()377 self.assertConnectionFailed(1011, "")378 def test_recv_canceled(self):379 recv = self.loop.create_task(self.protocol.recv())380 self.loop.call_soon(recv.cancel)381 with self.assertRaises(asyncio.CancelledError):382 self.loop.run_until_complete(recv)383 # The next frame doesn't disappear in a vacuum (it used to).384 self.receive_frame(Frame(True, OP_TEXT, "café".encode("utf-8")))385 data = self.loop.run_until_complete(self.protocol.recv())386 self.assertEqual(data, "café")387 def test_recv_canceled_race_condition(self):388 recv = self.loop.create_task(389 asyncio.wait_for(self.protocol.recv(), timeout=0.000_001)390 )391 self.loop.call_soon(392 self.receive_frame, Frame(True, OP_TEXT, "café".encode("utf-8"))393 )394 with self.assertRaises(asyncio.TimeoutError):395 self.loop.run_until_complete(recv)396 # The previous frame doesn't disappear in a vacuum (it used to).397 self.receive_frame(Frame(True, OP_TEXT, "tea".encode("utf-8")))398 data = self.loop.run_until_complete(self.protocol.recv())399 # If we're getting "tea" there, it means "café" was swallowed (ha, ha).400 self.assertEqual(data, "café")401 def test_recv_when_transfer_data_cancelled(self):402 # Clog incoming queue.403 self.protocol.max_queue = 1404 self.receive_frame(Frame(True, OP_TEXT, "café".encode("utf-8")))405 self.receive_frame(Frame(True, OP_BINARY, b"tea"))406 self.run_loop_once()407 # Flow control kicks in (check with an implementation detail).408 self.assertFalse(self.protocol._put_message_waiter.done())409 # Schedule recv().410 recv = self.loop.create_task(self.protocol.recv())411 # Cancel transfer_data_task (again, implementation detail).412 self.protocol.fail_connection()413 self.run_loop_once()414 self.assertTrue(self.protocol.transfer_data_task.cancelled())415 # recv() completes properly.416 self.assertEqual(self.loop.run_until_complete(recv), "café")417 def 
test_recv_prevents_concurrent_calls(self):418 recv = self.loop.create_task(self.protocol.recv())419 with self.assertRaisesRegex(420 RuntimeError,421 "cannot call recv while another coroutine "422 "is already waiting for the next message",423 ):424 self.loop.run_until_complete(self.protocol.recv())425 recv.cancel()426 # Test the send coroutine.427 def test_send_text(self):428 self.loop.run_until_complete(self.protocol.send("café"))429 self.assertOneFrameSent(True, OP_TEXT, "café".encode("utf-8"))430 def test_send_binary(self):431 self.loop.run_until_complete(self.protocol.send(b"tea"))432 self.assertOneFrameSent(True, OP_BINARY, b"tea")433 def test_send_binary_from_bytearray(self):434 self.loop.run_until_complete(self.protocol.send(bytearray(b"tea")))435 self.assertOneFrameSent(True, OP_BINARY, b"tea")436 def test_send_binary_from_memoryview(self):437 self.loop.run_until_complete(self.protocol.send(memoryview(b"tea")))438 self.assertOneFrameSent(True, OP_BINARY, b"tea")439 def test_send_binary_from_non_contiguous_memoryview(self):440 self.loop.run_until_complete(self.protocol.send(memoryview(b"tteeaa")[::2]))441 self.assertOneFrameSent(True, OP_BINARY, b"tea")442 def test_send_type_error(self):443 with self.assertRaises(TypeError):444 self.loop.run_until_complete(self.protocol.send(42))445 self.assertNoFrameSent()446 def test_send_iterable_text(self):447 self.loop.run_until_complete(self.protocol.send(["ca", "fé"]))448 self.assertFramesSent(449 (False, OP_TEXT, "ca".encode("utf-8")),450 (False, OP_CONT, "fé".encode("utf-8")),451 (True, OP_CONT, "".encode("utf-8")),452 )453 def test_send_iterable_binary(self):454 self.loop.run_until_complete(self.protocol.send([b"te", b"a"]))455 self.assertFramesSent(456 (False, OP_BINARY, b"te"), (False, OP_CONT, b"a"), (True, OP_CONT, b"")457 )458 def test_send_iterable_binary_from_bytearray(self):459 self.loop.run_until_complete(460 self.protocol.send([bytearray(b"te"), bytearray(b"a")])461 )462 self.assertFramesSent(463 (False, 
OP_BINARY, b"te"), (False, OP_CONT, b"a"), (True, OP_CONT, b"")464 )465 def test_send_iterable_binary_from_memoryview(self):466 self.loop.run_until_complete(467 self.protocol.send([memoryview(b"te"), memoryview(b"a")])468 )469 self.assertFramesSent(470 (False, OP_BINARY, b"te"), (False, OP_CONT, b"a"), (True, OP_CONT, b"")471 )472 def test_send_iterable_binary_from_non_contiguous_memoryview(self):473 self.loop.run_until_complete(474 self.protocol.send([memoryview(b"ttee")[::2], memoryview(b"aa")[::2]])475 )476 self.assertFramesSent(477 (False, OP_BINARY, b"te"), (False, OP_CONT, b"a"), (True, OP_CONT, b"")478 )479 def test_send_empty_iterable(self):480 self.loop.run_until_complete(self.protocol.send([]))481 self.assertNoFrameSent()482 def test_send_iterable_type_error(self):483 with self.assertRaises(TypeError):484 self.loop.run_until_complete(self.protocol.send([42]))485 self.assertNoFrameSent()486 def test_send_iterable_mixed_type_error(self):487 with self.assertRaises(TypeError):488 self.loop.run_until_complete(self.protocol.send(["café", b"tea"]))489 self.assertFramesSent(490 (False, OP_TEXT, "café".encode("utf-8")),491 (True, OP_CLOSE, serialize_close(1011, "")),492 )493 def test_send_iterable_prevents_concurrent_send(self):494 self.make_drain_slow(2 * MS)495 async def send_iterable():496 await self.protocol.send(["ca", "fé"])497 async def send_concurrent():498 await asyncio.sleep(MS)499 await self.protocol.send(b"tea")500 self.loop.run_until_complete(asyncio.gather(send_iterable(), send_concurrent()))501 self.assertFramesSent(502 (False, OP_TEXT, "ca".encode("utf-8")),503 (False, OP_CONT, "fé".encode("utf-8")),504 (True, OP_CONT, "".encode("utf-8")),505 (True, OP_BINARY, b"tea"),506 )507 def test_send_async_iterable_text(self):508 self.loop.run_until_complete(self.protocol.send(async_iterable(["ca", "fé"])))509 self.assertFramesSent(510 (False, OP_TEXT, "ca".encode("utf-8")),511 (False, OP_CONT, "fé".encode("utf-8")),512 (True, OP_CONT, 
"".encode("utf-8")),513 )514 def test_send_async_iterable_binary(self):515 self.loop.run_until_complete(self.protocol.send(async_iterable([b"te", b"a"])))516 self.assertFramesSent(517 (False, OP_BINARY, b"te"), (False, OP_CONT, b"a"), (True, OP_CONT, b"")518 )519 def test_send_async_iterable_binary_from_bytearray(self):520 self.loop.run_until_complete(521 self.protocol.send(async_iterable([bytearray(b"te"), bytearray(b"a")]))522 )523 self.assertFramesSent(524 (False, OP_BINARY, b"te"), (False, OP_CONT, b"a"), (True, OP_CONT, b"")525 )526 def test_send_async_iterable_binary_from_memoryview(self):527 self.loop.run_until_complete(528 self.protocol.send(async_iterable([memoryview(b"te"), memoryview(b"a")]))529 )530 self.assertFramesSent(531 (False, OP_BINARY, b"te"), (False, OP_CONT, b"a"), (True, OP_CONT, b"")532 )533 def test_send_async_iterable_binary_from_non_contiguous_memoryview(self):534 self.loop.run_until_complete(535 self.protocol.send(536 async_iterable([memoryview(b"ttee")[::2], memoryview(b"aa")[::2]])537 )538 )539 self.assertFramesSent(540 (False, OP_BINARY, b"te"), (False, OP_CONT, b"a"), (True, OP_CONT, b"")541 )542 def test_send_empty_async_iterable(self):543 self.loop.run_until_complete(self.protocol.send(async_iterable([])))544 self.assertNoFrameSent()545 def test_send_async_iterable_type_error(self):546 with self.assertRaises(TypeError):547 self.loop.run_until_complete(self.protocol.send(async_iterable([42])))548 self.assertNoFrameSent()549 def test_send_async_iterable_mixed_type_error(self):550 with self.assertRaises(TypeError):551 self.loop.run_until_complete(552 self.protocol.send(async_iterable(["café", b"tea"]))553 )554 self.assertFramesSent(555 (False, OP_TEXT, "café".encode("utf-8")),556 (True, OP_CLOSE, serialize_close(1011, "")),557 )558 def test_send_async_iterable_prevents_concurrent_send(self):559 self.make_drain_slow(2 * MS)560 async def send_async_iterable():561 await self.protocol.send(async_iterable(["ca", "fé"]))562 async def 
send_concurrent():563 await asyncio.sleep(MS)564 await self.protocol.send(b"tea")565 self.loop.run_until_complete(566 asyncio.gather(send_async_iterable(), send_concurrent())567 )568 self.assertFramesSent(569 (False, OP_TEXT, "ca".encode("utf-8")),570 (False, OP_CONT, "fé".encode("utf-8")),571 (True, OP_CONT, "".encode("utf-8")),572 (True, OP_BINARY, b"tea"),573 )574 def test_send_on_closing_connection_local(self):575 close_task = self.half_close_connection_local()576 with self.assertRaises(ConnectionClosed):577 self.loop.run_until_complete(self.protocol.send("foobar"))578 self.assertNoFrameSent()579 self.loop.run_until_complete(close_task) # cleanup580 def test_send_on_closing_connection_remote(self):581 self.half_close_connection_remote()582 with self.assertRaises(ConnectionClosed):583 self.loop.run_until_complete(self.protocol.send("foobar"))584 self.assertNoFrameSent()585 def test_send_on_closed_connection(self):586 self.close_connection()587 with self.assertRaises(ConnectionClosed):588 self.loop.run_until_complete(self.protocol.send("foobar"))589 self.assertNoFrameSent()590 # Test the ping coroutine.591 def test_ping_default(self):592 self.loop.run_until_complete(self.protocol.ping())593 # With our testing tools, it's more convenient to extract the expected594 # ping data from the library's internals than from the frame sent.595 ping_data = next(iter(self.protocol.pings))596 self.assertIsInstance(ping_data, bytes)597 self.assertEqual(len(ping_data), 4)598 self.assertOneFrameSent(True, OP_PING, ping_data)599 def test_ping_text(self):600 self.loop.run_until_complete(self.protocol.ping("café"))601 self.assertOneFrameSent(True, OP_PING, "café".encode("utf-8"))602 def test_ping_binary(self):603 self.loop.run_until_complete(self.protocol.ping(b"tea"))604 self.assertOneFrameSent(True, OP_PING, b"tea")605 def test_ping_binary_from_bytearray(self):606 self.loop.run_until_complete(self.protocol.ping(bytearray(b"tea")))607 self.assertOneFrameSent(True, OP_PING, 
b"tea")608 def test_ping_binary_from_memoryview(self):609 self.loop.run_until_complete(self.protocol.ping(memoryview(b"tea")))610 self.assertOneFrameSent(True, OP_PING, b"tea")611 def test_ping_binary_from_non_contiguous_memoryview(self):612 self.loop.run_until_complete(self.protocol.ping(memoryview(b"tteeaa")[::2]))613 self.assertOneFrameSent(True, OP_PING, b"tea")614 def test_ping_type_error(self):615 with self.assertRaises(TypeError):616 self.loop.run_until_complete(self.protocol.ping(42))617 self.assertNoFrameSent()618 def test_ping_on_closing_connection_local(self):619 close_task = self.half_close_connection_local()620 with self.assertRaises(ConnectionClosed):621 self.loop.run_until_complete(self.protocol.ping())622 self.assertNoFrameSent()623 self.loop.run_until_complete(close_task) # cleanup624 def test_ping_on_closing_connection_remote(self):625 self.half_close_connection_remote()626 with self.assertRaises(ConnectionClosed):627 self.loop.run_until_complete(self.protocol.ping())628 self.assertNoFrameSent()629 def test_ping_on_closed_connection(self):630 self.close_connection()631 with self.assertRaises(ConnectionClosed):632 self.loop.run_until_complete(self.protocol.ping())633 self.assertNoFrameSent()634 # Test the pong coroutine.635 def test_pong_default(self):636 self.loop.run_until_complete(self.protocol.pong())637 self.assertOneFrameSent(True, OP_PONG, b"")638 def test_pong_text(self):639 self.loop.run_until_complete(self.protocol.pong("café"))640 self.assertOneFrameSent(True, OP_PONG, "café".encode("utf-8"))641 def test_pong_binary(self):642 self.loop.run_until_complete(self.protocol.pong(b"tea"))643 self.assertOneFrameSent(True, OP_PONG, b"tea")644 def test_pong_binary_from_bytearray(self):645 self.loop.run_until_complete(self.protocol.pong(bytearray(b"tea")))646 self.assertOneFrameSent(True, OP_PONG, b"tea")647 def test_pong_binary_from_memoryview(self):648 self.loop.run_until_complete(self.protocol.pong(memoryview(b"tea")))649 
self.assertOneFrameSent(True, OP_PONG, b"tea")650 def test_pong_binary_from_non_contiguous_memoryview(self):651 self.loop.run_until_complete(self.protocol.pong(memoryview(b"tteeaa")[::2]))652 self.assertOneFrameSent(True, OP_PONG, b"tea")653 def test_pong_type_error(self):654 with self.assertRaises(TypeError):655 self.loop.run_until_complete(self.protocol.pong(42))656 self.assertNoFrameSent()657 def test_pong_on_closing_connection_local(self):658 close_task = self.half_close_connection_local()659 with self.assertRaises(ConnectionClosed):660 self.loop.run_until_complete(self.protocol.pong())661 self.assertNoFrameSent()662 self.loop.run_until_complete(close_task) # cleanup663 def test_pong_on_closing_connection_remote(self):664 self.half_close_connection_remote()665 with self.assertRaises(ConnectionClosed):666 self.loop.run_until_complete(self.protocol.pong())667 self.assertNoFrameSent()668 def test_pong_on_closed_connection(self):669 self.close_connection()670 with self.assertRaises(ConnectionClosed):671 self.loop.run_until_complete(self.protocol.pong())672 self.assertNoFrameSent()673 # Test the protocol's logic for acknowledging pings with pongs.674 def test_answer_ping(self):675 self.receive_frame(Frame(True, OP_PING, b"test"))676 self.run_loop_once()677 self.assertOneFrameSent(True, OP_PONG, b"test")678 def test_ignore_pong(self):679 self.receive_frame(Frame(True, OP_PONG, b"test"))680 self.run_loop_once()681 self.assertNoFrameSent()682 def test_acknowledge_ping(self):683 ping = self.loop.run_until_complete(self.protocol.ping())684 self.assertFalse(ping.done())685 ping_frame = self.last_sent_frame()686 pong_frame = Frame(True, OP_PONG, ping_frame.data)687 self.receive_frame(pong_frame)688 self.run_loop_once()689 self.run_loop_once()690 self.assertTrue(ping.done())691 def test_abort_ping(self):692 ping = self.loop.run_until_complete(self.protocol.ping())693 # Remove the frame from the buffer, else close_connection() complains.694 self.last_sent_frame()695 
self.assertFalse(ping.done())696 self.close_connection()697 self.assertTrue(ping.done())698 self.assertIsInstance(ping.exception(), ConnectionClosed)699 def test_abort_ping_does_not_log_exception_if_not_retreived(self):700 self.loop.run_until_complete(self.protocol.ping())701 # Get the internal Future, which isn't directly returned by ping().702 (ping,) = self.protocol.pings.values()703 # Remove the frame from the buffer, else close_connection() complains.704 self.last_sent_frame()705 self.close_connection()706 # Check a private attribute, for lack of a better solution.707 self.assertFalse(ping._log_traceback)708 def test_acknowledge_previous_pings(self):709 pings = [710 (self.loop.run_until_complete(self.protocol.ping()), self.last_sent_frame())711 for i in range(3)712 ]713 # Unsolicited pong doesn't acknowledge pings714 self.receive_frame(Frame(True, OP_PONG, b""))715 self.run_loop_once()716 self.run_loop_once()717 self.assertFalse(pings[0][0].done())718 self.assertFalse(pings[1][0].done())719 self.assertFalse(pings[2][0].done())720 # Pong acknowledges all previous pings721 self.receive_frame(Frame(True, OP_PONG, pings[1][1].data))722 self.run_loop_once()723 self.run_loop_once()724 self.assertTrue(pings[0][0].done())725 self.assertTrue(pings[1][0].done())726 self.assertFalse(pings[2][0].done())727 def test_acknowledge_aborted_ping(self):728 ping = self.loop.run_until_complete(self.protocol.ping())729 ping_frame = self.last_sent_frame()730 # Clog incoming queue. 
This lets connection_lost() abort pending pings731 # with a ConnectionClosed exception before transfer_data_task732 # terminates and close_connection cancels keepalive_ping_task.733 self.protocol.max_queue = 1734 self.receive_frame(Frame(True, OP_TEXT, b"1"))735 self.receive_frame(Frame(True, OP_TEXT, b"2"))736 # Add pong frame to the queue.737 pong_frame = Frame(True, OP_PONG, ping_frame.data)738 self.receive_frame(pong_frame)739 # Connection drops.740 self.receive_eof()741 self.loop.run_until_complete(self.protocol.wait_closed())742 # Ping receives a ConnectionClosed exception.743 with self.assertRaises(ConnectionClosed):744 ping.result()745 # transfer_data doesn't crash, which would be logged.746 with self.assertNoLogs():747 # Unclog incoming queue.748 self.loop.run_until_complete(self.protocol.recv())749 self.loop.run_until_complete(self.protocol.recv())750 def test_canceled_ping(self):751 ping = self.loop.run_until_complete(self.protocol.ping())752 ping_frame = self.last_sent_frame()753 ping.cancel()754 pong_frame = Frame(True, OP_PONG, ping_frame.data)755 self.receive_frame(pong_frame)756 self.run_loop_once()757 self.run_loop_once()758 self.assertTrue(ping.cancelled())759 def test_duplicate_ping(self):760 self.loop.run_until_complete(self.protocol.ping(b"foobar"))761 self.assertOneFrameSent(True, OP_PING, b"foobar")762 with self.assertRaises(ValueError):763 self.loop.run_until_complete(self.protocol.ping(b"foobar"))764 self.assertNoFrameSent()765 # Test the protocol's logic for rebuilding fragmented messages.766 def test_fragmented_text(self):767 self.receive_frame(Frame(False, OP_TEXT, "ca".encode("utf-8")))768 self.receive_frame(Frame(True, OP_CONT, "fé".encode("utf-8")))769 data = self.loop.run_until_complete(self.protocol.recv())770 self.assertEqual(data, "café")771 def test_fragmented_binary(self):772 self.receive_frame(Frame(False, OP_BINARY, b"t"))773 self.receive_frame(Frame(False, OP_CONT, b"e"))774 self.receive_frame(Frame(True, OP_CONT, b"a"))775 
data = self.loop.run_until_complete(self.protocol.recv())776 self.assertEqual(data, b"tea")777 def test_fragmented_text_payload_too_big(self):778 self.protocol.max_size = 1024779 self.receive_frame(Frame(False, OP_TEXT, "café".encode("utf-8") * 100))780 self.receive_frame(Frame(True, OP_CONT, "café".encode("utf-8") * 105))781 self.process_invalid_frames()782 self.assertConnectionFailed(1009, "")783 def test_fragmented_binary_payload_too_big(self):784 self.protocol.max_size = 1024785 self.receive_frame(Frame(False, OP_BINARY, b"tea" * 171))786 self.receive_frame(Frame(True, OP_CONT, b"tea" * 171))787 self.process_invalid_frames()788 self.assertConnectionFailed(1009, "")789 def test_fragmented_text_no_max_size(self):790 self.protocol.max_size = None # for test coverage791 self.receive_frame(Frame(False, OP_TEXT, "café".encode("utf-8") * 100))792 self.receive_frame(Frame(True, OP_CONT, "café".encode("utf-8") * 105))793 data = self.loop.run_until_complete(self.protocol.recv())794 self.assertEqual(data, "café" * 205)795 def test_fragmented_binary_no_max_size(self):796 self.protocol.max_size = None # for test coverage797 self.receive_frame(Frame(False, OP_BINARY, b"tea" * 171))798 self.receive_frame(Frame(True, OP_CONT, b"tea" * 171))799 data = self.loop.run_until_complete(self.protocol.recv())800 self.assertEqual(data, b"tea" * 342)801 def test_control_frame_within_fragmented_text(self):802 self.receive_frame(Frame(False, OP_TEXT, "ca".encode("utf-8")))803 self.receive_frame(Frame(True, OP_PING, b""))804 self.receive_frame(Frame(True, OP_CONT, "fé".encode("utf-8")))805 data = self.loop.run_until_complete(self.protocol.recv())806 self.assertEqual(data, "café")807 self.assertOneFrameSent(True, OP_PONG, b"")808 def test_unterminated_fragmented_text(self):809 self.receive_frame(Frame(False, OP_TEXT, "ca".encode("utf-8")))810 # Missing the second part of the fragmented frame.811 self.receive_frame(Frame(True, OP_BINARY, b"tea"))812 self.process_invalid_frames()813 
self.assertConnectionFailed(1002, "")814 def test_close_handshake_in_fragmented_text(self):815 self.receive_frame(Frame(False, OP_TEXT, "ca".encode("utf-8")))816 self.receive_frame(Frame(True, OP_CLOSE, b""))817 self.process_invalid_frames()818 # The RFC may have overlooked this case: it says that control frames819 # can be interjected in the middle of a fragmented message and that a820 # close frame must be echoed. Even though there's an unterminated821 # message, technically, the closing handshake was successful.822 self.assertConnectionClosed(1005, "")823 def test_connection_close_in_fragmented_text(self):824 self.receive_frame(Frame(False, OP_TEXT, "ca".encode("utf-8")))825 self.process_invalid_frames()826 self.assertConnectionFailed(1006, "")827 # Test miscellaneous code paths to ensure full coverage.828 def test_connection_lost(self):829 # Test calling connection_lost without going through close_connection.830 self.protocol.connection_lost(None)831 self.assertConnectionFailed(1006, "")832 def test_ensure_open_before_opening_handshake(self):833 # Simulate a bug by forcibly reverting the protocol state.834 self.protocol.state = State.CONNECTING835 with self.assertRaises(InvalidState):836 self.loop.run_until_complete(self.protocol.ensure_open())837 def test_ensure_open_during_unclean_close(self):838 # Process connection_made in order to start transfer_data_task.839 self.run_loop_once()840 # Ensure the test terminates quickly.841 self.loop.call_later(MS, self.receive_eof_if_client)842 # Simulate the case when close() times out sending a close frame.843 self.protocol.fail_connection()844 with self.assertRaises(ConnectionClosed):845 self.loop.run_until_complete(self.protocol.ensure_open())846 def test_legacy_recv(self):847 # By default legacy_recv in disabled.848 self.assertEqual(self.protocol.legacy_recv, False)849 self.close_connection()850 # Enable legacy_recv.851 self.protocol.legacy_recv = True852 # Now recv() returns None instead of raising 
ConnectionClosed.853 self.assertIsNone(self.loop.run_until_complete(self.protocol.recv()))854 def test_connection_closed_attributes(self):855 self.close_connection()856 with self.assertRaises(ConnectionClosed) as context:857 self.loop.run_until_complete(self.protocol.recv())858 connection_closed_exc = context.exception859 self.assertEqual(connection_closed_exc.code, 1000)860 self.assertEqual(connection_closed_exc.reason, "close")861 # Test the protocol logic for sending keepalive pings.862 def restart_protocol_with_keepalive_ping(863 self, ping_interval=3 * MS, ping_timeout=3 * MS864 ):865 initial_protocol = self.protocol866 # copied from tearDown867 self.transport.close()868 self.loop.run_until_complete(self.protocol.close())869 # copied from setUp, but enables keepalive pings870 self.protocol = WebSocketCommonProtocol(871 ping_interval=ping_interval, ping_timeout=ping_timeout872 )873 self.transport = TransportMock()874 self.transport.setup_mock(self.loop, self.protocol)875 self.protocol.is_client = initial_protocol.is_client876 self.protocol.side = initial_protocol.side877 def test_keepalive_ping(self):878 self.restart_protocol_with_keepalive_ping()879 # Ping is sent at 3ms and acknowledged at 4ms.880 self.loop.run_until_complete(asyncio.sleep(4 * MS))881 (ping_1,) = tuple(self.protocol.pings)882 self.assertOneFrameSent(True, OP_PING, ping_1)883 self.receive_frame(Frame(True, OP_PONG, ping_1))884 # Next ping is sent at 7ms.885 self.loop.run_until_complete(asyncio.sleep(4 * MS))886 (ping_2,) = tuple(self.protocol.pings)887 self.assertOneFrameSent(True, OP_PING, ping_2)888 # The keepalive ping task goes on.889 self.assertFalse(self.protocol.keepalive_ping_task.done())890 def test_keepalive_ping_not_acknowledged_closes_connection(self):891 self.restart_protocol_with_keepalive_ping()892 # Ping is sent at 3ms and not acknowleged.893 self.loop.run_until_complete(asyncio.sleep(4 * MS))894 (ping_1,) = tuple(self.protocol.pings)895 self.assertOneFrameSent(True, OP_PING, 
ping_1)896 # Connection is closed at 6ms.897 self.loop.run_until_complete(asyncio.sleep(4 * MS))898 self.assertOneFrameSent(True, OP_CLOSE, serialize_close(1011, ""))899 # The keepalive ping task is complete.900 self.assertEqual(self.protocol.keepalive_ping_task.result(), None)901 def test_keepalive_ping_stops_when_connection_closing(self):902 self.restart_protocol_with_keepalive_ping()903 close_task = self.half_close_connection_local()904 # No ping sent at 3ms because the closing handshake is in progress.905 self.loop.run_until_complete(asyncio.sleep(4 * MS))906 self.assertNoFrameSent()907 # The keepalive ping task terminated.908 self.assertTrue(self.protocol.keepalive_ping_task.cancelled())909 self.loop.run_until_complete(close_task) # cleanup910 def test_keepalive_ping_stops_when_connection_closed(self):911 self.restart_protocol_with_keepalive_ping()912 self.close_connection()913 # The keepalive ping task terminated.914 self.assertTrue(self.protocol.keepalive_ping_task.cancelled())915 def test_keepalive_ping_does_not_crash_when_connection_lost(self):916 self.restart_protocol_with_keepalive_ping()917 # Clog incoming queue. 
This lets connection_lost() abort pending pings918 # with a ConnectionClosed exception before transfer_data_task919 # terminates and close_connection cancels keepalive_ping_task.920 self.protocol.max_queue = 1921 self.receive_frame(Frame(True, OP_TEXT, b"1"))922 self.receive_frame(Frame(True, OP_TEXT, b"2"))923 # Ping is sent at 3ms.924 self.loop.run_until_complete(asyncio.sleep(4 * MS))925 (ping_waiter,) = tuple(self.protocol.pings.values())926 # Connection drops.927 self.receive_eof()928 self.loop.run_until_complete(self.protocol.wait_closed())929 # The ping waiter receives a ConnectionClosed exception.930 with self.assertRaises(ConnectionClosed):931 ping_waiter.result()932 # The keepalive ping task terminated properly.933 self.assertIsNone(self.protocol.keepalive_ping_task.result())934 # Unclog incoming queue to terminate the test quickly.935 self.loop.run_until_complete(self.protocol.recv())936 self.loop.run_until_complete(self.protocol.recv())937 def test_keepalive_ping_with_no_ping_interval(self):938 self.restart_protocol_with_keepalive_ping(ping_interval=None)939 # No ping is sent at 3ms.940 self.loop.run_until_complete(asyncio.sleep(4 * MS))941 self.assertNoFrameSent()942 def test_keepalive_ping_with_no_ping_timeout(self):943 self.restart_protocol_with_keepalive_ping(ping_timeout=None)944 # Ping is sent at 3ms and not acknowleged.945 self.loop.run_until_complete(asyncio.sleep(4 * MS))946 (ping_1,) = tuple(self.protocol.pings)947 self.assertOneFrameSent(True, OP_PING, ping_1)948 # Next ping is sent at 7ms anyway.949 self.loop.run_until_complete(asyncio.sleep(4 * MS))950 ping_1_again, ping_2 = tuple(self.protocol.pings)951 self.assertEqual(ping_1, ping_1_again)952 self.assertOneFrameSent(True, OP_PING, ping_2)953 # The keepalive ping task goes on.954 self.assertFalse(self.protocol.keepalive_ping_task.done())955 def test_keepalive_ping_unexpected_error(self):956 self.restart_protocol_with_keepalive_ping()957 async def ping():958 raise Exception("BOOM")959 
self.protocol.ping = ping960 # The keepalive ping task fails when sending a ping at 3ms.961 self.loop.run_until_complete(asyncio.sleep(4 * MS))962 # The keepalive ping task is complete.963 # It logs and swallows the exception.964 self.assertEqual(self.protocol.keepalive_ping_task.result(), None)965 # Test the protocol logic for closing the connection.966 def test_local_close(self):967 # Emulate how the remote endpoint answers the closing handshake.968 self.loop.call_later(MS, self.receive_frame, self.close_frame)969 self.loop.call_later(MS, self.receive_eof_if_client)970 # Run the closing handshake.971 self.loop.run_until_complete(self.protocol.close(reason="close"))972 self.assertConnectionClosed(1000, "close")973 self.assertOneFrameSent(*self.close_frame)974 # Closing the connection again is a no-op.975 self.loop.run_until_complete(self.protocol.close(reason="oh noes!"))976 self.assertConnectionClosed(1000, "close")977 self.assertNoFrameSent()978 def test_remote_close(self):979 # Emulate how the remote endpoint initiates the closing handshake.980 self.loop.call_later(MS, self.receive_frame, self.close_frame)981 self.loop.call_later(MS, self.receive_eof_if_client)982 # Wait for some data in order to process the handshake.983 # After recv() raises ConnectionClosed, the connection is closed.984 with self.assertRaises(ConnectionClosed):985 self.loop.run_until_complete(self.protocol.recv())986 self.assertConnectionClosed(1000, "close")987 self.assertOneFrameSent(*self.close_frame)988 # Closing the connection again is a no-op.989 self.loop.run_until_complete(self.protocol.close(reason="oh noes!"))990 self.assertConnectionClosed(1000, "close")991 self.assertNoFrameSent()992 def test_remote_close_and_connection_lost(self):993 self.make_drain_slow()994 # Drop the connection right after receiving a close frame,995 # which prevents echoing the close frame properly.996 self.receive_frame(self.close_frame)997 self.receive_eof()998 with self.assertNoLogs():999 
self.loop.run_until_complete(self.protocol.close(reason="oh noes!"))1000 self.assertConnectionClosed(1000, "close")1001 self.assertOneFrameSent(*self.close_frame)1002 def test_simultaneous_close(self):1003 # Receive the incoming close frame right after self.protocol.close()1004 # starts executing. This reproduces the error described in:1005 # https://github.com/aaugustin/websockets/issues/3391006 self.loop.call_soon(self.receive_frame, self.remote_close)1007 self.loop.call_soon(self.receive_eof_if_client)1008 self.loop.run_until_complete(self.protocol.close(reason="local"))1009 self.assertConnectionClosed(1000, "remote")1010 # The current implementation sends a close frame in response to the1011 # close frame received from the remote end. It skips the close frame1012 # that should be sent as a result of calling close().1013 self.assertOneFrameSent(*self.remote_close)1014 def test_close_preserves_incoming_frames(self):1015 self.receive_frame(Frame(True, OP_TEXT, b"hello"))1016 self.loop.call_later(MS, self.receive_frame, self.close_frame)1017 self.loop.call_later(MS, self.receive_eof_if_client)1018 self.loop.run_until_complete(self.protocol.close(reason="close"))1019 self.assertConnectionClosed(1000, "close")1020 self.assertOneFrameSent(*self.close_frame)1021 next_message = self.loop.run_until_complete(self.protocol.recv())1022 self.assertEqual(next_message, "hello")1023 def test_close_protocol_error(self):1024 invalid_close_frame = Frame(True, OP_CLOSE, b"\x00")1025 self.receive_frame(invalid_close_frame)1026 self.receive_eof_if_client()1027 self.run_loop_once()1028 self.loop.run_until_complete(self.protocol.close(reason="close"))1029 self.assertConnectionFailed(1002, "")1030 def test_close_connection_lost(self):1031 self.receive_eof()1032 self.run_loop_once()1033 self.loop.run_until_complete(self.protocol.close(reason="close"))1034 self.assertConnectionFailed(1006, "")1035 def test_local_close_during_recv(self):1036 recv = 
self.loop.create_task(self.protocol.recv())1037 self.loop.call_later(MS, self.receive_frame, self.close_frame)1038 self.loop.call_later(MS, self.receive_eof_if_client)1039 self.loop.run_until_complete(self.protocol.close(reason="close"))1040 with self.assertRaises(ConnectionClosed):1041 self.loop.run_until_complete(recv)1042 self.assertConnectionClosed(1000, "close")1043 # There is no test_remote_close_during_recv because it would be identical1044 # to test_remote_close.1045 def test_remote_close_during_send(self):1046 self.make_drain_slow()1047 send = self.loop.create_task(self.protocol.send("hello"))1048 self.receive_frame(self.close_frame)1049 self.receive_eof()1050 with self.assertRaises(ConnectionClosed):1051 self.loop.run_until_complete(send)1052 self.assertConnectionClosed(1000, "close")1053 # There is no test_local_close_during_send because this cannot really1054 # happen, considering that writes are serialized.1055class ServerTests(CommonTests, AsyncioTestCase):1056 def setUp(self):1057 super().setUp()1058 self.protocol.is_client = False1059 self.protocol.side = "server"1060 def test_local_close_send_close_frame_timeout(self):1061 self.protocol.close_timeout = 10 * MS1062 self.make_drain_slow(50 * MS)1063 # If we can't send a close frame, time out in 10ms.1064 # Check the timing within -1/+9ms for robustness.1065 with self.assertCompletesWithin(9 * MS, 19 * MS):1066 self.loop.run_until_complete(self.protocol.close(reason="close"))1067 self.assertConnectionClosed(1006, "")1068 def test_local_close_receive_close_frame_timeout(self):1069 self.protocol.close_timeout = 10 * MS1070 # If the client doesn't send a close frame, time out in 10ms.1071 # Check the timing within -1/+9ms for robustness.1072 with self.assertCompletesWithin(9 * MS, 19 * MS):1073 self.loop.run_until_complete(self.protocol.close(reason="close"))1074 self.assertConnectionClosed(1006, "")1075 def test_local_close_connection_lost_timeout_after_write_eof(self):1076 self.protocol.close_timeout = 
10 * MS1077 # If the client doesn't close its side of the TCP connection after we1078 # half-close our side with write_eof(), time out in 10ms.1079 # Check the timing within -1/+9ms for robustness.1080 with self.assertCompletesWithin(9 * MS, 19 * MS):1081 # HACK: disable write_eof => other end drops connection emulation.1082 self.transport._eof = True1083 self.receive_frame(self.close_frame)1084 self.loop.run_until_complete(self.protocol.close(reason="close"))1085 self.assertConnectionClosed(1000, "close")1086 def test_local_close_connection_lost_timeout_after_close(self):1087 self.protocol.close_timeout = 10 * MS1088 # If the client doesn't close its side of the TCP connection after we1089 # half-close our side with write_eof() and close it with close(), time1090 # out in 20ms.1091 # Check the timing within -1/+9ms for robustness.1092 with self.assertCompletesWithin(19 * MS, 29 * MS):1093 # HACK: disable write_eof => other end drops connection emulation.1094 self.transport._eof = True1095 # HACK: disable close => other end drops connection emulation.1096 self.transport._closing = True1097 self.receive_frame(self.close_frame)1098 self.loop.run_until_complete(self.protocol.close(reason="close"))1099 self.assertConnectionClosed(1000, "close")1100class ClientTests(CommonTests, AsyncioTestCase):1101 def setUp(self):1102 super().setUp()1103 self.protocol.is_client = True1104 self.protocol.side = "client"1105 def test_local_close_send_close_frame_timeout(self):1106 self.protocol.close_timeout = 10 * MS1107 self.make_drain_slow(50 * MS)1108 # If we can't send a close frame, time out in 20ms.1109 # - 10ms waiting for sending a close frame1110 # - 10ms waiting for receiving a half-close1111 # Check the timing within -1/+9ms for robustness.1112 with self.assertCompletesWithin(19 * MS, 29 * MS):1113 self.loop.run_until_complete(self.protocol.close(reason="close"))1114 self.assertConnectionClosed(1006, "")1115 def test_local_close_receive_close_frame_timeout(self):1116 
self.protocol.close_timeout = 10 * MS1117 # If the server doesn't send a close frame, time out in 20ms:1118 # - 10ms waiting for receiving a close frame1119 # - 10ms waiting for receiving a half-close1120 # Check the timing within -1/+9ms for robustness.1121 with self.assertCompletesWithin(19 * MS, 29 * MS):1122 self.loop.run_until_complete(self.protocol.close(reason="close"))1123 self.assertConnectionClosed(1006, "")1124 def test_local_close_connection_lost_timeout_after_write_eof(self):1125 self.protocol.close_timeout = 10 * MS1126 # If the server doesn't half-close its side of the TCP connection1127 # after we send a close frame, time out in 20ms:1128 # - 10ms waiting for receiving a half-close1129 # - 10ms waiting for receiving a close after write_eof1130 # Check the timing within -1/+9ms for robustness.1131 with self.assertCompletesWithin(19 * MS, 29 * MS):1132 # HACK: disable write_eof => other end drops connection emulation.1133 self.transport._eof = True1134 self.receive_frame(self.close_frame)1135 self.loop.run_until_complete(self.protocol.close(reason="close"))1136 self.assertConnectionClosed(1000, "close")1137 def test_local_close_connection_lost_timeout_after_close(self):1138 self.protocol.close_timeout = 10 * MS1139 # If the client doesn't close its side of the TCP connection after we1140 # half-close our side with write_eof() and close it with close(), time1141 # out in 20ms.1142 # - 10ms waiting for receiving a half-close1143 # - 10ms waiting for receiving a close after write_eof1144 # - 10ms waiting for receiving a close after close1145 # Check the timing within -1/+9ms for robustness.1146 with self.assertCompletesWithin(29 * MS, 39 * MS):1147 # HACK: disable write_eof => other end drops connection emulation.1148 self.transport._eof = True1149 # HACK: disable close => other end drops connection emulation.1150 self.transport._closing = True1151 self.receive_frame(self.close_frame)1152 
self.loop.run_until_complete(self.protocol.close(reason="close"))...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run tempest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful