Best Python code snippet using tempest_python
test_protocol.py
Source:test_protocol.py  
...77        self.protocol._drain = delayed_drain78    close_frame = Frame(True, OP_CLOSE, serialize_close(1000, "close"))79    local_close = Frame(True, OP_CLOSE, serialize_close(1000, "local"))80    remote_close = Frame(True, OP_CLOSE, serialize_close(1000, "remote"))81    def receive_frame(self, frame):82        """83        Make the protocol receive a frame.84        """85        write = self.protocol.data_received86        mask = not self.protocol.is_client87        frame.write(write, mask=mask)88    def receive_eof(self):89        """90        Make the protocol receive the end of the data stream.91        Since ``WebSocketCommonProtocol.eof_received`` returns ``None``, an92        actual transport would close itself after calling it. This function93        emulates that behavior.94        """95        self.protocol.eof_received()96        self.loop.call_soon(self.transport.close)97    def receive_eof_if_client(self):98        """99        Like receive_eof, but only if this is the client side.100        Since the server is supposed to initiate the termination of the TCP101        connection, this method helps making tests work for both sides.102        """103        if self.protocol.is_client:104            self.receive_eof()105    def close_connection(self, code=1000, reason="close"):106        """107        Execute a closing handshake.108        This puts the connection in the CLOSED state.109        """110        close_frame_data = serialize_close(code, reason)111        # Prepare the response to the closing handshake from the remote side.112        self.receive_frame(Frame(True, OP_CLOSE, close_frame_data))113        self.receive_eof_if_client()114        # Trigger the closing handshake from the local side and complete it.115        self.loop.run_until_complete(self.protocol.close(code, reason))116        # Empty the outgoing data stream so we can make assertions later on.117        self.assertOneFrameSent(True, OP_CLOSE, close_frame_data)118        assert self.protocol.state is State.CLOSED119    def half_close_connection_local(self, code=1000, reason="close"):120        """121        Start a closing handshake but do not complete it.122        The main difference with `close_connection` is that the connection is123        left in the CLOSING state until the event loop runs again.124        The current implementation returns a task that must be awaited or125        canceled, else asyncio complains about destroying a pending task.126        """127        close_frame_data = serialize_close(code, reason)128        # Trigger the closing handshake from the local endpoint.129        close_task = self.loop.create_task(self.protocol.close(code, reason))130        self.run_loop_once()  # wait_for executes131        self.run_loop_once()  # write_frame executes132        # Empty the outgoing data stream so we can make assertions later on.133        self.assertOneFrameSent(True, OP_CLOSE, close_frame_data)134        assert self.protocol.state is State.CLOSING135        # Complete the closing sequence at 1ms intervals so the test can run136        # at each point even it goes back to the event loop several times.137        self.loop.call_later(138            MS, self.receive_frame, Frame(True, OP_CLOSE, close_frame_data)139        )140        self.loop.call_later(2 * MS, self.receive_eof_if_client)141        # This task must be awaited or canceled by the caller.142        return close_task143    def half_close_connection_remote(self, code=1000, reason="close"):144        """145        Receive a closing handshake but do not complete it.146        The main difference with `close_connection` is that the connection is147        left in the CLOSING state until the event loop runs again.148        """149        # On the server side, websockets completes the closing handshake and150        # closes the TCP connection immediately. Yield to the event loop after151        # sending the close frame to run the test while the connection is in152        # the CLOSING state.153        if not self.protocol.is_client:154            self.make_drain_slow()155        close_frame_data = serialize_close(code, reason)156        # Trigger the closing handshake from the remote endpoint.157        self.receive_frame(Frame(True, OP_CLOSE, close_frame_data))158        self.run_loop_once()  # read_frame executes159        # Empty the outgoing data stream so we can make assertions later on.160        self.assertOneFrameSent(True, OP_CLOSE, close_frame_data)161        assert self.protocol.state is State.CLOSING162        # Complete the closing sequence at 1ms intervals so the test can run163        # at each point even it goes back to the event loop several times.164        self.loop.call_later(2 * MS, self.receive_eof_if_client)165    def process_invalid_frames(self):166        """167        Make the protocol fail quickly after simulating invalid data.168        To achieve this, this function triggers the protocol's eof_received,169        which interrupts pending reads waiting for more data.170        """171        self.run_loop_once()172        self.receive_eof()173        self.loop.run_until_complete(self.protocol.close_connection_task)174    def sent_frames(self):175        """176        Read all frames sent to the transport.177        """178        stream = asyncio.StreamReader(loop=self.loop)179        for (data,), kw in self.transport.write.call_args_list:180            stream.feed_data(data)181        self.transport.write.call_args_list = []182        stream.feed_eof()183        frames = []184        while not stream.at_eof():185            frames.append(186                self.loop.run_until_complete(187                    Frame.read(stream.readexactly, mask=self.protocol.is_client)188                )189            )190        return frames191    def last_sent_frame(self):192        """193        Read the last frame sent to the transport.194        This method assumes that at most one frame was sent. It raises an195        AssertionError otherwise.196        """197        frames = self.sent_frames()198        if frames:199            assert len(frames) == 1200            return frames[0]201    def assertFramesSent(self, *frames):202        self.assertEqual(self.sent_frames(), [Frame(*args) for args in frames])203    def assertOneFrameSent(self, *args):204        self.assertEqual(self.last_sent_frame(), Frame(*args))205    def assertNoFrameSent(self):206        self.assertIsNone(self.last_sent_frame())207    def assertConnectionClosed(self, code, message):208        # The following line guarantees that connection_lost was called.209        self.assertEqual(self.protocol.state, State.CLOSED)210        # A close frame was received.211        self.assertEqual(self.protocol.close_code, code)212        self.assertEqual(self.protocol.close_reason, message)213    def assertConnectionFailed(self, code, message):214        # The following line guarantees that connection_lost was called.215        self.assertEqual(self.protocol.state, State.CLOSED)216        # No close frame was received.217        self.assertEqual(self.protocol.close_code, 1006)218        self.assertEqual(self.protocol.close_reason, "")219        # A close frame was sent -- unless the connection was already lost.220        if code == 1006:221            self.assertNoFrameSent()222        else:223            self.assertOneFrameSent(True, OP_CLOSE, serialize_close(code, message))224    @contextlib.contextmanager225    def assertCompletesWithin(self, min_time, max_time):226        t0 = self.loop.time()227        yield228        t1 = self.loop.time()229        dt = t1 - t0230        self.assertGreaterEqual(dt, min_time, f"Too fast: {dt} < {min_time}")231        self.assertLess(dt, max_time, f"Too slow: {dt} >= {max_time}")232    # Test constructor.233    def test_timeout_backwards_compatibility(self):234        with warnings.catch_warnings(record=True) as recorded_warnings:235            protocol = WebSocketCommonProtocol(timeout=5)236        self.assertEqual(protocol.close_timeout, 5)237        self.assertEqual(len(recorded_warnings), 1)238        warning = recorded_warnings[0].message239        self.assertEqual(str(warning), "rename timeout to close_timeout")240        self.assertEqual(type(warning), DeprecationWarning)241    # Test public attributes.242    def test_local_address(self):243        get_extra_info = unittest.mock.Mock(return_value=("host", 4312))244        self.transport.get_extra_info = get_extra_info245        self.assertEqual(self.protocol.local_address, ("host", 4312))246        get_extra_info.assert_called_with("sockname")247    def test_local_address_before_connection(self):248        # Emulate the situation before connection_open() runs.249        _transport = self.protocol.transport250        del self.protocol.transport251        try:252            self.assertEqual(self.protocol.local_address, None)253        finally:254            self.protocol.transport = _transport255    def test_remote_address(self):256        get_extra_info = unittest.mock.Mock(return_value=("host", 4312))257        self.transport.get_extra_info = get_extra_info258        self.assertEqual(self.protocol.remote_address, ("host", 4312))259        get_extra_info.assert_called_with("peername")260    def test_remote_address_before_connection(self):261        # Emulate the situation before connection_open() runs.262        _transport = self.protocol.transport263        del self.protocol.transport264        try:265            self.assertEqual(self.protocol.remote_address, None)266        finally:267            self.protocol.transport = _transport268    def test_open(self):269        self.assertTrue(self.protocol.open)270        self.close_connection()271        self.assertFalse(self.protocol.open)272    def test_closed(self):273        self.assertFalse(self.protocol.closed)274        self.close_connection()275        self.assertTrue(self.protocol.closed)276    def test_wait_closed(self):277        wait_closed = self.loop.create_task(self.protocol.wait_closed())278        self.assertFalse(wait_closed.done())279        self.close_connection()280        self.assertTrue(wait_closed.done())281    # Test the recv coroutine.282    def test_recv_text(self):283        self.receive_frame(Frame(True, OP_TEXT, "café".encode("utf-8")))284        data = self.loop.run_until_complete(self.protocol.recv())285        self.assertEqual(data, "café")286    def test_recv_binary(self):287        self.receive_frame(Frame(True, OP_BINARY, b"tea"))288        data = self.loop.run_until_complete(self.protocol.recv())289        self.assertEqual(data, b"tea")290    def test_recv_on_closing_connection_local(self):291        close_task = self.half_close_connection_local()292        with self.assertRaises(ConnectionClosed):293            self.loop.run_until_complete(self.protocol.recv())294        self.loop.run_until_complete(close_task)  # cleanup295    def test_recv_on_closing_connection_remote(self):296        self.half_close_connection_remote()297        with self.assertRaises(ConnectionClosed):298            self.loop.run_until_complete(self.protocol.recv())299    def test_recv_on_closed_connection(self):300        self.close_connection()301        with self.assertRaises(ConnectionClosed):302            self.loop.run_until_complete(self.protocol.recv())303    def test_recv_protocol_error(self):304        self.receive_frame(Frame(True, OP_CONT, "café".encode("utf-8")))305        self.process_invalid_frames()306        self.assertConnectionFailed(1002, "")307    def test_recv_unicode_error(self):308        self.receive_frame(Frame(True, OP_TEXT, "café".encode("latin-1")))309        self.process_invalid_frames()310        self.assertConnectionFailed(1007, "")311    def test_recv_text_payload_too_big(self):312        self.protocol.max_size = 1024313        self.receive_frame(Frame(True, OP_TEXT, "café".encode("utf-8") * 205))314        self.process_invalid_frames()315        self.assertConnectionFailed(1009, "")316    def test_recv_binary_payload_too_big(self):317        self.protocol.max_size = 1024318        self.receive_frame(Frame(True, OP_BINARY, b"tea" * 342))319        self.process_invalid_frames()320        self.assertConnectionFailed(1009, "")321    def test_recv_text_no_max_size(self):322        self.protocol.max_size = None  # for test coverage323        self.receive_frame(Frame(True, OP_TEXT, "café".encode("utf-8") * 205))324        data = self.loop.run_until_complete(self.protocol.recv())325        self.assertEqual(data, "café" * 205)326    def test_recv_binary_no_max_size(self):327        self.protocol.max_size = None  # for test coverage328        self.receive_frame(Frame(True, OP_BINARY, b"tea" * 342))329        data = self.loop.run_until_complete(self.protocol.recv())330        self.assertEqual(data, b"tea" * 342)331    def test_recv_queue_empty(self):332        recv = self.loop.create_task(self.protocol.recv())333        with self.assertRaises(asyncio.TimeoutError):334            self.loop.run_until_complete(335                asyncio.wait_for(asyncio.shield(recv), timeout=MS)336            )337        self.receive_frame(Frame(True, OP_TEXT, "café".encode("utf-8")))338        data = self.loop.run_until_complete(recv)339        self.assertEqual(data, "café")340    def test_recv_queue_full(self):341        self.protocol.max_queue = 2342        # Test internals because it's hard to verify buffers from the outside.343        self.assertEqual(list(self.protocol.messages), [])344        self.receive_frame(Frame(True, OP_TEXT, "café".encode("utf-8")))345        self.run_loop_once()346        self.assertEqual(list(self.protocol.messages), ["café"])347        self.receive_frame(Frame(True, OP_BINARY, b"tea"))348        self.run_loop_once()349        self.assertEqual(list(self.protocol.messages), ["café", b"tea"])350        self.receive_frame(Frame(True, OP_BINARY, b"milk"))351        self.run_loop_once()352        self.assertEqual(list(self.protocol.messages), ["café", b"tea"])353        self.loop.run_until_complete(self.protocol.recv())354        self.run_loop_once()355        self.assertEqual(list(self.protocol.messages), [b"tea", b"milk"])356        self.loop.run_until_complete(self.protocol.recv())357        self.run_loop_once()358        self.assertEqual(list(self.protocol.messages), [b"milk"])359        self.loop.run_until_complete(self.protocol.recv())360        self.run_loop_once()361        self.assertEqual(list(self.protocol.messages), [])362    def test_recv_queue_no_limit(self):363        self.protocol.max_queue = None364        for _ in range(100):365            self.receive_frame(Frame(True, OP_TEXT, "café".encode("utf-8")))366            self.run_loop_once()367        # Incoming message queue can contain at least 100 messages.368        self.assertEqual(list(self.protocol.messages), ["café"] * 100)369        for _ in range(100):370            self.loop.run_until_complete(self.protocol.recv())371        self.assertEqual(list(self.protocol.messages), [])372    def test_recv_other_error(self):373        async def read_message():374            raise Exception("BOOM")375        self.protocol.read_message = read_message376        self.process_invalid_frames()377        self.assertConnectionFailed(1011, "")378    def test_recv_canceled(self):379        recv = self.loop.create_task(self.protocol.recv())380        self.loop.call_soon(recv.cancel)381        with self.assertRaises(asyncio.CancelledError):382            self.loop.run_until_complete(recv)383        # The next frame doesn't disappear in a vacuum (it used to).384        self.receive_frame(Frame(True, OP_TEXT, "café".encode("utf-8")))385        data = self.loop.run_until_complete(self.protocol.recv())386        self.assertEqual(data, "café")387    def test_recv_canceled_race_condition(self):388        recv = self.loop.create_task(389            asyncio.wait_for(self.protocol.recv(), timeout=0.000_001)390        )391        self.loop.call_soon(392            self.receive_frame, Frame(True, OP_TEXT, "café".encode("utf-8"))393        )394        with self.assertRaises(asyncio.TimeoutError):395            self.loop.run_until_complete(recv)396        # The previous frame doesn't disappear in a vacuum (it used to).397        self.receive_frame(Frame(True, OP_TEXT, "tea".encode("utf-8")))398        data = self.loop.run_until_complete(self.protocol.recv())399        # If we're getting "tea" there, it means "café" was swallowed (ha, ha).400        self.assertEqual(data, "café")401    def test_recv_when_transfer_data_cancelled(self):402        # Clog incoming queue.403        self.protocol.max_queue = 1404        self.receive_frame(Frame(True, OP_TEXT, "café".encode("utf-8")))405        self.receive_frame(Frame(True, OP_BINARY, b"tea"))406        self.run_loop_once()407        # Flow control kicks in (check with an implementation detail).408        self.assertFalse(self.protocol._put_message_waiter.done())409        # Schedule recv().410        recv = self.loop.create_task(self.protocol.recv())411        # Cancel transfer_data_task (again, implementation detail).412        self.protocol.fail_connection()413        self.run_loop_once()414        self.assertTrue(self.protocol.transfer_data_task.cancelled())415        # recv() completes properly.416        self.assertEqual(self.loop.run_until_complete(recv), "café")417    def test_recv_prevents_concurrent_calls(self):418        recv = self.loop.create_task(self.protocol.recv())419        with self.assertRaisesRegex(420            RuntimeError,421            "cannot call recv while another coroutine "422            "is already waiting for the next message",423        ):424            self.loop.run_until_complete(self.protocol.recv())425        recv.cancel()426    # Test the send coroutine.427    def test_send_text(self):428        self.loop.run_until_complete(self.protocol.send("café"))429        self.assertOneFrameSent(True, OP_TEXT, "café".encode("utf-8"))430    def test_send_binary(self):431        self.loop.run_until_complete(self.protocol.send(b"tea"))432        self.assertOneFrameSent(True, OP_BINARY, b"tea")433    def test_send_binary_from_bytearray(self):434        self.loop.run_until_complete(self.protocol.send(bytearray(b"tea")))435        self.assertOneFrameSent(True, OP_BINARY, b"tea")436    def test_send_binary_from_memoryview(self):437        self.loop.run_until_complete(self.protocol.send(memoryview(b"tea")))438        self.assertOneFrameSent(True, OP_BINARY, b"tea")439    def test_send_binary_from_non_contiguous_memoryview(self):440        self.loop.run_until_complete(self.protocol.send(memoryview(b"tteeaa")[::2]))441        self.assertOneFrameSent(True, OP_BINARY, b"tea")442    def test_send_type_error(self):443        with self.assertRaises(TypeError):444            self.loop.run_until_complete(self.protocol.send(42))445        self.assertNoFrameSent()446    def test_send_iterable_text(self):447        self.loop.run_until_complete(self.protocol.send(["ca", "fé"]))448        self.assertFramesSent(449            (False, OP_TEXT, "ca".encode("utf-8")),450            (False, OP_CONT, "fé".encode("utf-8")),451            (True, OP_CONT, "".encode("utf-8")),452        )453    def test_send_iterable_binary(self):454        self.loop.run_until_complete(self.protocol.send([b"te", b"a"]))455        self.assertFramesSent(456            (False, OP_BINARY, b"te"), (False, OP_CONT, b"a"), (True, OP_CONT, b"")457        )458    def test_send_iterable_binary_from_bytearray(self):459        self.loop.run_until_complete(460            self.protocol.send([bytearray(b"te"), bytearray(b"a")])461        )462        self.assertFramesSent(463            (False, OP_BINARY, b"te"), (False, OP_CONT, b"a"), (True, OP_CONT, b"")464        )465    def test_send_iterable_binary_from_memoryview(self):466        self.loop.run_until_complete(467            self.protocol.send([memoryview(b"te"), memoryview(b"a")])468        )469        self.assertFramesSent(470            (False, OP_BINARY, b"te"), (False, OP_CONT, b"a"), (True, OP_CONT, b"")471        )472    def test_send_iterable_binary_from_non_contiguous_memoryview(self):473        self.loop.run_until_complete(474            self.protocol.send([memoryview(b"ttee")[::2], memoryview(b"aa")[::2]])475        )476        self.assertFramesSent(477            (False, OP_BINARY, b"te"), (False, OP_CONT, b"a"), (True, OP_CONT, b"")478        )479    def test_send_empty_iterable(self):480        self.loop.run_until_complete(self.protocol.send([]))481        self.assertNoFrameSent()482    def test_send_iterable_type_error(self):483        with self.assertRaises(TypeError):484            self.loop.run_until_complete(self.protocol.send([42]))485        self.assertNoFrameSent()486    def test_send_iterable_mixed_type_error(self):487        with self.assertRaises(TypeError):488            self.loop.run_until_complete(self.protocol.send(["café", b"tea"]))489        self.assertFramesSent(490            (False, OP_TEXT, "café".encode("utf-8")),491            (True, OP_CLOSE, serialize_close(1011, "")),492        )493    def test_send_iterable_prevents_concurrent_send(self):494        self.make_drain_slow(2 * MS)495        async def send_iterable():496            await self.protocol.send(["ca", "fé"])497        async def send_concurrent():498            await asyncio.sleep(MS)499            await self.protocol.send(b"tea")500        self.loop.run_until_complete(asyncio.gather(send_iterable(), send_concurrent()))501        self.assertFramesSent(502            (False, OP_TEXT, "ca".encode("utf-8")),503            (False, OP_CONT, "fé".encode("utf-8")),504            (True, OP_CONT, "".encode("utf-8")),505            (True, OP_BINARY, b"tea"),506        )507    def test_send_async_iterable_text(self):508        self.loop.run_until_complete(self.protocol.send(async_iterable(["ca", "fé"])))509        self.assertFramesSent(510            (False, OP_TEXT, "ca".encode("utf-8")),511            (False, OP_CONT, "fé".encode("utf-8")),512            (True, OP_CONT, "".encode("utf-8")),513        )514    def test_send_async_iterable_binary(self):515        self.loop.run_until_complete(self.protocol.send(async_iterable([b"te", b"a"])))516        self.assertFramesSent(517            (False, OP_BINARY, b"te"), (False, OP_CONT, b"a"), (True, OP_CONT, b"")518        )519    def test_send_async_iterable_binary_from_bytearray(self):520        self.loop.run_until_complete(521            self.protocol.send(async_iterable([bytearray(b"te"), bytearray(b"a")]))522        )523        self.assertFramesSent(524            (False, OP_BINARY, b"te"), (False, OP_CONT, b"a"), (True, OP_CONT, b"")525        )526    def test_send_async_iterable_binary_from_memoryview(self):527        self.loop.run_until_complete(528            self.protocol.send(async_iterable([memoryview(b"te"), memoryview(b"a")]))529        )530        self.assertFramesSent(531            (False, OP_BINARY, b"te"), (False, OP_CONT, b"a"), (True, OP_CONT, b"")532        )533    def test_send_async_iterable_binary_from_non_contiguous_memoryview(self):534        self.loop.run_until_complete(535            self.protocol.send(536                async_iterable([memoryview(b"ttee")[::2], memoryview(b"aa")[::2]])537            )538        )539        self.assertFramesSent(540            (False, OP_BINARY, b"te"), (False, OP_CONT, b"a"), (True, OP_CONT, b"")541        )542    def test_send_empty_async_iterable(self):543        self.loop.run_until_complete(self.protocol.send(async_iterable([])))544        self.assertNoFrameSent()545    def test_send_async_iterable_type_error(self):546        with self.assertRaises(TypeError):547            self.loop.run_until_complete(self.protocol.send(async_iterable([42])))548        self.assertNoFrameSent()549    def test_send_async_iterable_mixed_type_error(self):550        with self.assertRaises(TypeError):551            self.loop.run_until_complete(552                self.protocol.send(async_iterable(["café", b"tea"]))553            )554        self.assertFramesSent(555            (False, OP_TEXT, "café".encode("utf-8")),556            (True, OP_CLOSE, serialize_close(1011, "")),557        )558    def test_send_async_iterable_prevents_concurrent_send(self):559        self.make_drain_slow(2 * MS)560        async def send_async_iterable():561            await self.protocol.send(async_iterable(["ca", "fé"]))562        async def send_concurrent():563            await asyncio.sleep(MS)564            await self.protocol.send(b"tea")565        self.loop.run_until_complete(566            asyncio.gather(send_async_iterable(), send_concurrent())567        )568        self.assertFramesSent(569            (False, OP_TEXT, "ca".encode("utf-8")),570            (False, OP_CONT, "fé".encode("utf-8")),571            (True, OP_CONT, "".encode("utf-8")),572            (True, OP_BINARY, b"tea"),573        )574    def test_send_on_closing_connection_local(self):575        close_task = self.half_close_connection_local()576        with self.assertRaises(ConnectionClosed):577            self.loop.run_until_complete(self.protocol.send("foobar"))578        self.assertNoFrameSent()579        self.loop.run_until_complete(close_task)  # cleanup580    def test_send_on_closing_connection_remote(self):581        self.half_close_connection_remote()582        with self.assertRaises(ConnectionClosed):583            self.loop.run_until_complete(self.protocol.send("foobar"))584        self.assertNoFrameSent()585    def test_send_on_closed_connection(self):586        self.close_connection()587        with self.assertRaises(ConnectionClosed):588            self.loop.run_until_complete(self.protocol.send("foobar"))589        self.assertNoFrameSent()590    # Test the ping coroutine.591    def test_ping_default(self):592        self.loop.run_until_complete(self.protocol.ping())593        # With our testing tools, it's more convenient to extract the expected594        # ping data from the library's internals than from the frame sent.595        ping_data = next(iter(self.protocol.pings))596        self.assertIsInstance(ping_data, bytes)597        self.assertEqual(len(ping_data), 4)598        self.assertOneFrameSent(True, OP_PING, ping_data)599    def test_ping_text(self):600        self.loop.run_until_complete(self.protocol.ping("café"))601        self.assertOneFrameSent(True, OP_PING, "café".encode("utf-8"))602    def test_ping_binary(self):603        self.loop.run_until_complete(self.protocol.ping(b"tea"))604        self.assertOneFrameSent(True, OP_PING, b"tea")605    def test_ping_binary_from_bytearray(self):606        self.loop.run_until_complete(self.protocol.ping(bytearray(b"tea")))607        self.assertOneFrameSent(True, OP_PING, b"tea")608    def test_ping_binary_from_memoryview(self):609        self.loop.run_until_complete(self.protocol.ping(memoryview(b"tea")))610        self.assertOneFrameSent(True, OP_PING, b"tea")611    def test_ping_binary_from_non_contiguous_memoryview(self):612        self.loop.run_until_complete(self.protocol.ping(memoryview(b"tteeaa")[::2]))613        self.assertOneFrameSent(True, OP_PING, b"tea")614    def test_ping_type_error(self):615        with self.assertRaises(TypeError):616            self.loop.run_until_complete(self.protocol.ping(42))617        self.assertNoFrameSent()618    def test_ping_on_closing_connection_local(self):619        close_task = self.half_close_connection_local()620        with self.assertRaises(ConnectionClosed):621            self.loop.run_until_complete(self.protocol.ping())622        self.assertNoFrameSent()623        self.loop.run_until_complete(close_task)  # cleanup624    def test_ping_on_closing_connection_remote(self):625        self.half_close_connection_remote()626        with self.assertRaises(ConnectionClosed):627            self.loop.run_until_complete(self.protocol.ping())628        self.assertNoFrameSent()629    def test_ping_on_closed_connection(self):630        self.close_connection()631        with self.assertRaises(ConnectionClosed):632            self.loop.run_until_complete(self.protocol.ping())633        self.assertNoFrameSent()634    # Test the pong coroutine.635    def test_pong_default(self):636        self.loop.run_until_complete(self.protocol.pong())637        self.assertOneFrameSent(True, OP_PONG, b"")638    def test_pong_text(self):639        self.loop.run_until_complete(self.protocol.pong("café"))640        self.assertOneFrameSent(True, OP_PONG, "café".encode("utf-8"))641    def test_pong_binary(self):642        self.loop.run_until_complete(self.protocol.pong(b"tea"))643        self.assertOneFrameSent(True, OP_PONG, b"tea")644    def test_pong_binary_from_bytearray(self):645        self.loop.run_until_complete(self.protocol.pong(bytearray(b"tea")))646        self.assertOneFrameSent(True, OP_PONG, b"tea")647    def test_pong_binary_from_memoryview(self):648        self.loop.run_until_complete(self.protocol.pong(memoryview(b"tea")))649        self.assertOneFrameSent(True, OP_PONG, b"tea")650    def test_pong_binary_from_non_contiguous_memoryview(self):651        self.loop.run_until_complete(self.protocol.pong(memoryview(b"tteeaa")[::2]))652        self.assertOneFrameSent(True, OP_PONG, b"tea")653    def test_pong_type_error(self):654        with self.assertRaises(TypeError):655            self.loop.run_until_complete(self.protocol.pong(42))656        self.assertNoFrameSent()657    def test_pong_on_closing_connection_local(self):658        close_task = self.half_close_connection_local()659        with self.assertRaises(ConnectionClosed):660            self.loop.run_until_complete(self.protocol.pong())661        self.assertNoFrameSent()662        self.loop.run_until_complete(close_task)  # cleanup663    def test_pong_on_closing_connection_remote(self):664        self.half_close_connection_remote()665        with self.assertRaises(ConnectionClosed):666            self.loop.run_until_complete(self.protocol.pong())667        self.assertNoFrameSent()668    def test_pong_on_closed_connection(self):669        self.close_connection()670        with self.assertRaises(ConnectionClosed):671            self.loop.run_until_complete(self.protocol.pong())672        self.assertNoFrameSent()673    # Test the protocol's logic for acknowledging pings with pongs.674    def test_answer_ping(self):675        self.receive_frame(Frame(True, OP_PING, b"test"))676        self.run_loop_once()677        self.assertOneFrameSent(True, OP_PONG, b"test")678    def test_ignore_pong(self):679        self.receive_frame(Frame(True, OP_PONG, b"test"))680        self.run_loop_once()681        self.assertNoFrameSent()682    def test_acknowledge_ping(self):683        ping = self.loop.run_until_complete(self.protocol.ping())684        self.assertFalse(ping.done())685        ping_frame = self.last_sent_frame()686        pong_frame = Frame(True, OP_PONG, ping_frame.data)687        self.receive_frame(pong_frame)688        self.run_loop_once()689        self.run_loop_once()690        self.assertTrue(ping.done())691    def test_abort_ping(self):692        ping = self.loop.run_until_complete(self.protocol.ping())693        # Remove the frame from the buffer, else close_connection() complains.694        self.last_sent_frame()695        self.assertFalse(ping.done())696        self.close_connection()697        self.assertTrue(ping.done())698        self.assertIsInstance(ping.exception(), ConnectionClosed)699    def test_abort_ping_does_not_log_exception_if_not_retreived(self):700        self.loop.run_until_complete(self.protocol.ping())701        # Get the internal Future, which isn't directly returned by ping().702        (ping,) = self.protocol.pings.values()703        # Remove the frame from the buffer, else close_connection() complains.704        self.last_sent_frame()705        self.close_connection()706        # Check a private attribute, for lack of a better solution.707        self.assertFalse(ping._log_traceback)708    def test_acknowledge_previous_pings(self):709        pings = [710            (self.loop.run_until_complete(self.protocol.ping()), self.last_sent_frame())711            for i in range(3)712        ]713        # Unsolicited pong doesn't acknowledge pings714        self.receive_frame(Frame(True, OP_PONG, b""))715        self.run_loop_once()716        self.run_loop_once()717        self.assertFalse(pings[0][0].done())718        self.assertFalse(pings[1][0].done())719        self.assertFalse(pings[2][0].done())720        # Pong acknowledges all previous pings721        self.receive_frame(Frame(True, OP_PONG, pings[1][1].data))722        self.run_loop_once()723        self.run_loop_once()724        self.assertTrue(pings[0][0].done())725        self.assertTrue(pings[1][0].done())726        self.assertFalse(pings[2][0].done())727    def test_acknowledge_aborted_ping(self):728        ping = self.loop.run_until_complete(self.protocol.ping())729        ping_frame = self.last_sent_frame()730        # Clog incoming queue. This lets connection_lost() abort pending pings731        # with a ConnectionClosed exception before transfer_data_task732        # terminates and close_connection cancels keepalive_ping_task.733        self.protocol.max_queue = 1734        self.receive_frame(Frame(True, OP_TEXT, b"1"))735        self.receive_frame(Frame(True, OP_TEXT, b"2"))736        # Add pong frame to the queue.737        pong_frame = Frame(True, OP_PONG, ping_frame.data)738        self.receive_frame(pong_frame)739        # Connection drops.740        self.receive_eof()741        self.loop.run_until_complete(self.protocol.wait_closed())742        # Ping receives a ConnectionClosed exception.743        with self.assertRaises(ConnectionClosed):744            ping.result()745        # transfer_data doesn't crash, which would be logged.746        with self.assertNoLogs():747            # Unclog incoming queue.748            self.loop.run_until_complete(self.protocol.recv())749            self.loop.run_until_complete(self.protocol.recv())750    def test_canceled_ping(self):751        ping = self.loop.run_until_complete(self.protocol.ping())752        ping_frame = self.last_sent_frame()753        ping.cancel()754        pong_frame = Frame(True, OP_PONG, ping_frame.data)755        self.receive_frame(pong_frame)756        self.run_loop_once()757        self.run_loop_once()758        self.assertTrue(ping.cancelled())759    def test_duplicate_ping(self):760        self.loop.run_until_complete(self.protocol.ping(b"foobar"))761        self.assertOneFrameSent(True, OP_PING, b"foobar")762        with self.assertRaises(ValueError):763            self.loop.run_until_complete(self.protocol.ping(b"foobar"))764        self.assertNoFrameSent()765    # Test the protocol's logic for rebuilding fragmented messages.766    def test_fragmented_text(self):767        self.receive_frame(Frame(False, OP_TEXT, "ca".encode("utf-8")))768        self.receive_frame(Frame(True, OP_CONT, "fé".encode("utf-8")))769        data = self.loop.run_until_complete(self.protocol.recv())770        self.assertEqual(data, "café")771    def test_fragmented_binary(self):772        self.receive_frame(Frame(False, OP_BINARY, b"t"))773        self.receive_frame(Frame(False, OP_CONT, b"e"))774        self.receive_frame(Frame(True, OP_CONT, b"a"))775        data = self.loop.run_until_complete(self.protocol.recv())776        self.assertEqual(data, b"tea")777    def test_fragmented_text_payload_too_big(self):778        self.protocol.max_size = 1024779        self.receive_frame(Frame(False, OP_TEXT, "café".encode("utf-8") * 100))780        self.receive_frame(Frame(True, OP_CONT, "café".encode("utf-8") * 105))781        self.process_invalid_frames()782        self.assertConnectionFailed(1009, "")783    def test_fragmented_binary_payload_too_big(self):784        self.protocol.max_size = 1024785        self.receive_frame(Frame(False, OP_BINARY, b"tea" * 171))786        self.receive_frame(Frame(True, OP_CONT, b"tea" * 171))787        self.process_invalid_frames()788        self.assertConnectionFailed(1009, "")789    def test_fragmented_text_no_max_size(self):790        self.protocol.max_size = None  # for test coverage791        self.receive_frame(Frame(False, OP_TEXT, "café".encode("utf-8") * 100))792        self.receive_frame(Frame(True, OP_CONT, "café".encode("utf-8") * 105))793        data = self.loop.run_until_complete(self.protocol.recv())794        self.assertEqual(data, "café" * 205)795    def test_fragmented_binary_no_max_size(self):796        self.protocol.max_size = None  # for test coverage797        self.receive_frame(Frame(False, OP_BINARY, b"tea" * 171))798        self.receive_frame(Frame(True, OP_CONT, b"tea" * 171))799        data = self.loop.run_until_complete(self.protocol.recv())800        self.assertEqual(data, b"tea" * 342)801    def test_control_frame_within_fragmented_text(self):802        self.receive_frame(Frame(False, OP_TEXT, "ca".encode("utf-8")))803        self.receive_frame(Frame(True, OP_PING, b""))804        self.receive_frame(Frame(True, OP_CONT, "fé".encode("utf-8")))805        data = self.loop.run_until_complete(self.protocol.recv())806        self.assertEqual(data, "café")807        self.assertOneFrameSent(True, OP_PONG, b"")808    def test_unterminated_fragmented_text(self):809        self.receive_frame(Frame(False, OP_TEXT, "ca".encode("utf-8")))810        # Missing the second part of the fragmented frame.811        self.receive_frame(Frame(True, OP_BINARY, b"tea"))812        self.process_invalid_frames()813        self.assertConnectionFailed(1002, "")814    def test_close_handshake_in_fragmented_text(self):815        self.receive_frame(Frame(False, OP_TEXT, "ca".encode("utf-8")))816        self.receive_frame(Frame(True, OP_CLOSE, b""))817        self.process_invalid_frames()818        # The RFC may have overlooked this case: it says that control frames819        # can be interjected in the middle of a fragmented message and that a820        # close frame must be echoed. Even though there's an unterminated821        # message, technically, the closing handshake was successful.822        self.assertConnectionClosed(1005, "")823    def test_connection_close_in_fragmented_text(self):824        self.receive_frame(Frame(False, OP_TEXT, "ca".encode("utf-8")))825        self.process_invalid_frames()826        self.assertConnectionFailed(1006, "")827    # Test miscellaneous code paths to ensure full coverage.828    def test_connection_lost(self):829        # Test calling connection_lost without going through close_connection.830        self.protocol.connection_lost(None)831        self.assertConnectionFailed(1006, "")832    def test_ensure_open_before_opening_handshake(self):833        # Simulate a bug by forcibly reverting the protocol state.834        self.protocol.state = State.CONNECTING835        with self.assertRaises(InvalidState):836            self.loop.run_until_complete(self.protocol.ensure_open())837    def test_ensure_open_during_unclean_close(self):838        # Process connection_made in order to start transfer_data_task.839        self.run_loop_once()840        # Ensure the test terminates quickly.841        self.loop.call_later(MS, self.receive_eof_if_client)842        # Simulate the case when close() times out sending a close frame.843        self.protocol.fail_connection()844        with self.assertRaises(ConnectionClosed):845            self.loop.run_until_complete(self.protocol.ensure_open())846    def test_legacy_recv(self):847        # By default legacy_recv in disabled.848        self.assertEqual(self.protocol.legacy_recv, False)849        self.close_connection()850        # Enable legacy_recv.851        self.protocol.legacy_recv = True852        # Now recv() returns None instead of raising ConnectionClosed.853        self.assertIsNone(self.loop.run_until_complete(self.protocol.recv()))854    def test_connection_closed_attributes(self):855        self.close_connection()856        with self.assertRaises(ConnectionClosed) as context:857            self.loop.run_until_complete(self.protocol.recv())858        connection_closed_exc = context.exception859        self.assertEqual(connection_closed_exc.code, 1000)860        self.assertEqual(connection_closed_exc.reason, "close")861    # Test the protocol logic for sending keepalive pings.862    def restart_protocol_with_keepalive_ping(863        self, ping_interval=3 * MS, ping_timeout=3 * MS864    ):865        initial_protocol = self.protocol866        # copied from tearDown867        self.transport.close()868        self.loop.run_until_complete(self.protocol.close())869        # copied from setUp, but enables keepalive pings870        self.protocol = WebSocketCommonProtocol(871            ping_interval=ping_interval, ping_timeout=ping_timeout872        )873        self.transport = TransportMock()874        self.transport.setup_mock(self.loop, self.protocol)875        self.protocol.is_client = initial_protocol.is_client876        self.protocol.side = initial_protocol.side877    def test_keepalive_ping(self):878        self.restart_protocol_with_keepalive_ping()879        # Ping is sent at 3ms and acknowledged at 4ms.880        self.loop.run_until_complete(asyncio.sleep(4 * MS))881        (ping_1,) = tuple(self.protocol.pings)882        self.assertOneFrameSent(True, OP_PING, ping_1)883        self.receive_frame(Frame(True, OP_PONG, ping_1))884        # Next ping is sent at 7ms.885        self.loop.run_until_complete(asyncio.sleep(4 * MS))886        (ping_2,) = tuple(self.protocol.pings)887        self.assertOneFrameSent(True, OP_PING, ping_2)888        # The keepalive ping task goes on.889        self.assertFalse(self.protocol.keepalive_ping_task.done())890    def test_keepalive_ping_not_acknowledged_closes_connection(self):891        self.restart_protocol_with_keepalive_ping()892        # Ping is sent at 3ms and not acknowleged.893        self.loop.run_until_complete(asyncio.sleep(4 * MS))894        (ping_1,) = tuple(self.protocol.pings)895        self.assertOneFrameSent(True, OP_PING, ping_1)896        # Connection is closed at 6ms.897        self.loop.run_until_complete(asyncio.sleep(4 * MS))898        self.assertOneFrameSent(True, OP_CLOSE, serialize_close(1011, ""))899        # The keepalive ping task is complete.900        self.assertEqual(self.protocol.keepalive_ping_task.result(), None)901    def test_keepalive_ping_stops_when_connection_closing(self):902        self.restart_protocol_with_keepalive_ping()903        close_task = self.half_close_connection_local()904        # No ping sent at 3ms because the closing handshake is in progress.905        self.loop.run_until_complete(asyncio.sleep(4 * MS))906        self.assertNoFrameSent()907        # The keepalive ping task terminated.908        self.assertTrue(self.protocol.keepalive_ping_task.cancelled())909        self.loop.run_until_complete(close_task)  # cleanup910    def test_keepalive_ping_stops_when_connection_closed(self):911        self.restart_protocol_with_keepalive_ping()912        self.close_connection()913        # The keepalive ping task terminated.914        self.assertTrue(self.protocol.keepalive_ping_task.cancelled())915    def test_keepalive_ping_does_not_crash_when_connection_lost(self):916        self.restart_protocol_with_keepalive_ping()917        # Clog incoming queue. This lets connection_lost() abort pending pings918        # with a ConnectionClosed exception before transfer_data_task919        # terminates and close_connection cancels keepalive_ping_task.920        self.protocol.max_queue = 1921        self.receive_frame(Frame(True, OP_TEXT, b"1"))922        self.receive_frame(Frame(True, OP_TEXT, b"2"))923        # Ping is sent at 3ms.924        self.loop.run_until_complete(asyncio.sleep(4 * MS))925        (ping_waiter,) = tuple(self.protocol.pings.values())926        # Connection drops.927        self.receive_eof()928        self.loop.run_until_complete(self.protocol.wait_closed())929        # The ping waiter receives a ConnectionClosed exception.930        with self.assertRaises(ConnectionClosed):931            ping_waiter.result()932        # The keepalive ping task terminated properly.933        self.assertIsNone(self.protocol.keepalive_ping_task.result())934        # Unclog incoming queue to terminate the test quickly.935        self.loop.run_until_complete(self.protocol.recv())936        self.loop.run_until_complete(self.protocol.recv())937    def test_keepalive_ping_with_no_ping_interval(self):938        self.restart_protocol_with_keepalive_ping(ping_interval=None)939        # No ping is sent at 3ms.940        self.loop.run_until_complete(asyncio.sleep(4 * MS))941        self.assertNoFrameSent()942    def test_keepalive_ping_with_no_ping_timeout(self):943        self.restart_protocol_with_keepalive_ping(ping_timeout=None)944        # Ping is sent at 3ms and not acknowleged.945        self.loop.run_until_complete(asyncio.sleep(4 * MS))946        (ping_1,) = tuple(self.protocol.pings)947        self.assertOneFrameSent(True, OP_PING, ping_1)948        # Next ping is sent at 7ms anyway.949        self.loop.run_until_complete(asyncio.sleep(4 * MS))950        ping_1_again, ping_2 = tuple(self.protocol.pings)951        self.assertEqual(ping_1, ping_1_again)952        self.assertOneFrameSent(True, OP_PING, ping_2)953        # The keepalive ping task goes on.954        self.assertFalse(self.protocol.keepalive_ping_task.done())955    def test_keepalive_ping_unexpected_error(self):956        self.restart_protocol_with_keepalive_ping()957        async def ping():958            raise Exception("BOOM")959        self.protocol.ping = ping960        # The keepalive ping task fails when sending a ping at 3ms.961        self.loop.run_until_complete(asyncio.sleep(4 * MS))962        # The keepalive ping task is complete.963        # It logs and swallows the exception.964        self.assertEqual(self.protocol.keepalive_ping_task.result(), None)965    # Test the protocol logic for closing the connection.966    def test_local_close(self):967        # Emulate how the remote endpoint answers the closing handshake.968        self.loop.call_later(MS, self.receive_frame, self.close_frame)969        self.loop.call_later(MS, self.receive_eof_if_client)970        # Run the closing handshake.971        self.loop.run_until_complete(self.protocol.close(reason="close"))972        self.assertConnectionClosed(1000, "close")973        self.assertOneFrameSent(*self.close_frame)974        # Closing the connection again is a no-op.975        self.loop.run_until_complete(self.protocol.close(reason="oh noes!"))976        self.assertConnectionClosed(1000, "close")977        self.assertNoFrameSent()978    def test_remote_close(self):979        # Emulate how the remote endpoint initiates the closing handshake.980        self.loop.call_later(MS, self.receive_frame, self.close_frame)981        self.loop.call_later(MS, self.receive_eof_if_client)982        # Wait for some data in order to process the handshake.983        # After recv() raises ConnectionClosed, the connection is closed.984        with self.assertRaises(ConnectionClosed):985            self.loop.run_until_complete(self.protocol.recv())986        self.assertConnectionClosed(1000, "close")987        self.assertOneFrameSent(*self.close_frame)988        # Closing the connection again is a no-op.989        self.loop.run_until_complete(self.protocol.close(reason="oh noes!"))990        self.assertConnectionClosed(1000, "close")991        self.assertNoFrameSent()992    def test_remote_close_and_connection_lost(self):993        self.make_drain_slow()994        # Drop the connection right after receiving a close frame,995        # which prevents echoing the close frame properly.996        self.receive_frame(self.close_frame)997        self.receive_eof()998        with self.assertNoLogs():999            self.loop.run_until_complete(self.protocol.close(reason="oh noes!"))1000        self.assertConnectionClosed(1000, "close")1001        self.assertOneFrameSent(*self.close_frame)1002    def test_simultaneous_close(self):1003        # Receive the incoming close frame right after self.protocol.close()1004        # starts executing. This reproduces the error described in:1005        # https://github.com/aaugustin/websockets/issues/3391006        self.loop.call_soon(self.receive_frame, self.remote_close)1007        self.loop.call_soon(self.receive_eof_if_client)1008        self.loop.run_until_complete(self.protocol.close(reason="local"))1009        self.assertConnectionClosed(1000, "remote")1010        # The current implementation sends a close frame in response to the1011        # close frame received from the remote end. It skips the close frame1012        # that should be sent as a result of calling close().1013        self.assertOneFrameSent(*self.remote_close)1014    def test_close_preserves_incoming_frames(self):1015        self.receive_frame(Frame(True, OP_TEXT, b"hello"))1016        self.loop.call_later(MS, self.receive_frame, self.close_frame)1017        self.loop.call_later(MS, self.receive_eof_if_client)1018        self.loop.run_until_complete(self.protocol.close(reason="close"))1019        self.assertConnectionClosed(1000, "close")1020        self.assertOneFrameSent(*self.close_frame)1021        next_message = self.loop.run_until_complete(self.protocol.recv())1022        self.assertEqual(next_message, "hello")1023    def test_close_protocol_error(self):1024        invalid_close_frame = Frame(True, OP_CLOSE, b"\x00")1025        self.receive_frame(invalid_close_frame)1026        self.receive_eof_if_client()1027        self.run_loop_once()1028        self.loop.run_until_complete(self.protocol.close(reason="close"))1029        self.assertConnectionFailed(1002, "")1030    def test_close_connection_lost(self):1031        self.receive_eof()1032        self.run_loop_once()1033        self.loop.run_until_complete(self.protocol.close(reason="close"))1034        self.assertConnectionFailed(1006, "")1035    def test_local_close_during_recv(self):1036        recv = self.loop.create_task(self.protocol.recv())1037        self.loop.call_later(MS, self.receive_frame, self.close_frame)1038        self.loop.call_later(MS, self.receive_eof_if_client)1039        self.loop.run_until_complete(self.protocol.close(reason="close"))1040        with self.assertRaises(ConnectionClosed):1041            self.loop.run_until_complete(recv)1042        self.assertConnectionClosed(1000, "close")1043    # There is no test_remote_close_during_recv because it would be identical1044    # to test_remote_close.1045    def test_remote_close_during_send(self):1046        self.make_drain_slow()1047        send = self.loop.create_task(self.protocol.send("hello"))1048        self.receive_frame(self.close_frame)1049        self.receive_eof()1050        with self.assertRaises(ConnectionClosed):1051            self.loop.run_until_complete(send)1052        self.assertConnectionClosed(1000, "close")1053    # There is no test_local_close_during_send because this cannot really1054    # happen, considering that writes are serialized.1055class ServerTests(CommonTests, AsyncioTestCase):1056    def setUp(self):1057        super().setUp()1058        self.protocol.is_client = False1059        self.protocol.side = "server"1060    def test_local_close_send_close_frame_timeout(self):1061        self.protocol.close_timeout = 10 * MS1062        self.make_drain_slow(50 * MS)1063        # If we can't send a close frame, time out in 10ms.1064        # Check the timing within -1/+9ms for robustness.1065        with self.assertCompletesWithin(9 * MS, 19 * MS):1066            self.loop.run_until_complete(self.protocol.close(reason="close"))1067        self.assertConnectionClosed(1006, "")1068    def test_local_close_receive_close_frame_timeout(self):1069        self.protocol.close_timeout = 10 * MS1070        # If the client doesn't send a close frame, time out in 10ms.1071        # Check the timing within -1/+9ms for robustness.1072        with self.assertCompletesWithin(9 * MS, 19 * MS):1073            self.loop.run_until_complete(self.protocol.close(reason="close"))1074        self.assertConnectionClosed(1006, "")1075    def test_local_close_connection_lost_timeout_after_write_eof(self):1076        self.protocol.close_timeout = 10 * MS1077        # If the client doesn't close its side of the TCP connection after we1078        # half-close our side with write_eof(), time out in 10ms.1079        # Check the timing within -1/+9ms for robustness.1080        with self.assertCompletesWithin(9 * MS, 19 * MS):1081            # HACK: disable write_eof => other end drops connection emulation.1082            self.transport._eof = True1083            self.receive_frame(self.close_frame)1084            self.loop.run_until_complete(self.protocol.close(reason="close"))1085        self.assertConnectionClosed(1000, "close")1086    def test_local_close_connection_lost_timeout_after_close(self):1087        self.protocol.close_timeout = 10 * MS1088        # If the client doesn't close its side of the TCP connection after we1089        # half-close our side with write_eof() and close it with close(), time1090        # out in 20ms.1091        # Check the timing within -1/+9ms for robustness.1092        with self.assertCompletesWithin(19 * MS, 29 * MS):1093            # HACK: disable write_eof => other end drops connection emulation.1094            self.transport._eof = True1095            # HACK: disable close => other end drops connection emulation.1096            self.transport._closing = True1097            self.receive_frame(self.close_frame)1098            self.loop.run_until_complete(self.protocol.close(reason="close"))1099        self.assertConnectionClosed(1000, "close")1100class ClientTests(CommonTests, AsyncioTestCase):1101    def setUp(self):1102        super().setUp()1103        self.protocol.is_client = True1104        self.protocol.side = "client"1105    def test_local_close_send_close_frame_timeout(self):1106        self.protocol.close_timeout = 10 * MS1107        self.make_drain_slow(50 * MS)1108        # If we can't send a close frame, time out in 20ms.1109        # - 10ms waiting for sending a close frame1110        # - 10ms waiting for receiving a half-close1111        # Check the timing within -1/+9ms for robustness.1112        with self.assertCompletesWithin(19 * MS, 29 * MS):1113            self.loop.run_until_complete(self.protocol.close(reason="close"))1114        self.assertConnectionClosed(1006, "")1115    def test_local_close_receive_close_frame_timeout(self):1116        self.protocol.close_timeout = 10 * MS1117        # If the server doesn't send a close frame, time out in 20ms:1118        # - 10ms waiting for receiving a close frame1119        # - 10ms waiting for receiving a half-close1120        # Check the timing within -1/+9ms for robustness.1121        with self.assertCompletesWithin(19 * MS, 29 * MS):1122            self.loop.run_until_complete(self.protocol.close(reason="close"))1123        self.assertConnectionClosed(1006, "")1124    def test_local_close_connection_lost_timeout_after_write_eof(self):1125        self.protocol.close_timeout = 10 * MS1126        # If the server doesn't half-close its side of the TCP connection1127        # after we send a close frame, time out in 20ms:1128        # - 10ms waiting for receiving a half-close1129        # - 10ms waiting for receiving a close after write_eof1130        # Check the timing within -1/+9ms for robustness.1131        with self.assertCompletesWithin(19 * MS, 29 * MS):1132            # HACK: disable write_eof => other end drops connection emulation.1133            self.transport._eof = True1134            self.receive_frame(self.close_frame)1135            self.loop.run_until_complete(self.protocol.close(reason="close"))1136        self.assertConnectionClosed(1000, "close")1137    def test_local_close_connection_lost_timeout_after_close(self):1138        self.protocol.close_timeout = 10 * MS1139        # If the client doesn't close its side of the TCP connection after we1140        # half-close our side with write_eof() and close it with close(), time1141        # out in 20ms.1142        # - 10ms waiting for receiving a half-close1143        # - 10ms waiting for receiving a close after write_eof1144        # - 10ms waiting for receiving a close after close1145        # Check the timing within -1/+9ms for robustness.1146        with self.assertCompletesWithin(29 * MS, 39 * MS):1147            # HACK: disable write_eof => other end drops connection emulation.1148            self.transport._eof = True1149            # HACK: disable close => other end drops connection emulation.1150            self.transport._closing = True1151            self.receive_frame(self.close_frame)1152            self.loop.run_until_complete(self.protocol.close(reason="close"))...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
