Best Python code snippet using avocado_python
test_server_asyncio.py
Source:test_server_asyncio.py  
...77            serve.assert_awaited()78    @pytest.mark.skipif(PYTHON_VERSION < (3, 7), reason="requires python3.7 or above")79    @asyncio.coroutine80    def testTcpServerServeForever(self):81        ''' Test StartTcpServer serve_forever() method '''82        with patch('asyncio.base_events.Server.serve_forever', new_callable=asynctest.CoroutineMock) as serve:83            server = yield from StartTcpServer(context=self.context,address=("127.0.0.1", 0), loop=self.loop)84            yield from server.serve_forever()85            serve.assert_awaited()86    @asyncio.coroutine87    def testTcpServerServeForeverTwice(self):88        ''' Call on serve_forever() twice should result in a runtime error '''89        server = yield from StartTcpServer(context=self.context,address=("127.0.0.1", 0), loop=self.loop)90        if PYTHON_VERSION >= (3, 7):91            server_task = asyncio.create_task(server.serve_forever())92        else:93            server_task = asyncio.ensure_future(server.serve_forever())94        yield from server.serving95        with self.assertRaises(RuntimeError):96            yield from server.serve_forever()97        server.server_close()98    @asyncio.coroutine99    def testTcpServerReceiveData(self):100        ''' Test data sent on socket is received by internals - doesn't not process data '''101        data = b'\x01\x00\x00\x00\x00\x06\x01\x03\x00\x00\x00\x19'102        server = yield from StartTcpServer(context=self.context,address=("127.0.0.1", 0),loop=self.loop)103        if PYTHON_VERSION >= (3, 7):104            server_task = asyncio.create_task(server.serve_forever())105        else:106            server_task = asyncio.ensure_future(server.serve_forever())107        yield from server.serving108        with patch('pymodbus.transaction.ModbusSocketFramer.processIncomingPacket', new_callable=Mock) as process:109        # process = server.framer.processIncomingPacket = Mock()110            connected = self.loop.create_future()111            
random_port = server.server.sockets[0].getsockname()[1] # get the random server port112            class BasicClient(asyncio.BaseProtocol):113                def connection_made(self, transport):114                    self.transport = transport115                    self.transport.write(data)116                    connected.set_result(True)117                def eof_received(self):118                    pass119            transport, protocol = yield from self.loop.create_connection(BasicClient, host='127.0.0.1',port=random_port)120            yield from asyncio.sleep(0.1) # this may be better done by making an internal hook in the actual implementation121            # if this unit test fails on a machine, see if increasing the sleep time makes a difference, if it does122            # blame author for a fix123            if PYTHON_VERSION >= (3, 6):124                process.assert_called_once()125            self.assertTrue( process.call_args[1]["data"] == data )126            server.server_close()127    @asyncio.coroutine128    def testTcpServerRoundtrip(self):129        ''' Test sending and receiving data on tcp socket '''130        data = b"\x01\x00\x00\x00\x00\x06\x01\x03\x00\x00\x00\x01" # unit 1, read register131        expected_response = b'\x01\x00\x00\x00\x00\x05\x01\x03\x02\x00\x11' # value of 17 as per context132        server = yield from StartTcpServer(context=self.context,address=("127.0.0.1", 0),loop=self.loop)133        if PYTHON_VERSION >= (3, 7):134            server_task = asyncio.create_task(server.serve_forever())135        else:136            server_task = asyncio.ensure_future(server.serve_forever())137        yield from server.serving138        random_port = server.server.sockets[0].getsockname()[1] # get the random server port139        connected, done = self.loop.create_future(),self.loop.create_future()140        received_value = None141        class BasicClient(asyncio.BaseProtocol):142            def connection_made(self, transport):143 
               self.transport = transport144                self.transport.write(data)145                connected.set_result(True)146            def data_received(self, data):147                nonlocal received_value, done148                received_value = data149                done.set_result(True)150            def eof_received(self):151                pass152        transport, protocol = yield from self.loop.create_connection(BasicClient, host='127.0.0.1',port=random_port)153        yield from asyncio.wait_for(done, timeout=0.1)154        self.assertEqual(received_value, expected_response)155        transport.close()156        yield from asyncio.sleep(0)157        server.server_close()158    @asyncio.coroutine159    def testTcpServerConnectionLost(self):160        ''' Test tcp stream interruption '''161        data = b"\x01\x00\x00\x00\x00\x06\x01\x01\x00\x00\x00\x01"162        server = yield from StartTcpServer(context=self.context, address=("127.0.0.1", 0), loop=self.loop)163        if PYTHON_VERSION >= (3, 7):164            server_task = asyncio.create_task(server.serve_forever())165        else:166            server_task = asyncio.ensure_future(server.serve_forever())167        yield from server.serving168        random_port = server.server.sockets[0].getsockname()[1]     # get the random server port169        step1 = self.loop.create_future()170        # done = self.loop.create_future()171        # received_value = None172        time.sleep(1)173        class BasicClient(asyncio.BaseProtocol):174            def connection_made(self, transport):175                self.transport = transport176                step1.set_result(True)177        transport, protocol = yield from self.loop.create_connection(BasicClient, host='127.0.0.1', port=random_port)178        yield from step1179        # await asyncio.sleep(1)180        self.assertTrue(len(server.active_connections) == 1)181        protocol.transport.close()  # close isn't synchronous and there's no 
notification that it's done182        # so we have to wait a bit183        yield from asyncio.sleep(0.1)184        self.assertTrue(len(server.active_connections) == 0)185        server.server_close()186    @asyncio.coroutine187    def testTcpServerCloseActiveConnection(self):188        ''' Test server_close() while there are active TCP connections '''189        data = b"\x01\x00\x00\x00\x00\x06\x01\x01\x00\x00\x00\x01"190        server = yield from StartTcpServer(context=self.context,address=("127.0.0.1", 0),loop=self.loop)191        if PYTHON_VERSION >= (3, 7):192            server_task = asyncio.create_task(server.serve_forever())193        else:194            server_task = asyncio.ensure_future(server.serve_forever())195        yield from server.serving196        random_port = server.server.sockets[0].getsockname()[1] # get the random server port197        step1 = self.loop.create_future()198        done = self.loop.create_future()199        received_value = None200        class BasicClient(asyncio.BaseProtocol):201            def connection_made(self, transport):202                self.transport = transport203                step1.set_result(True)204        transport, protocol = yield from self.loop.create_connection(BasicClient, host='127.0.0.1',port=random_port)205        yield from step1206        server.server_close()207        # close isn't synchronous and there's no notification that it's done208        # so we have to wait a bit209        yield from asyncio.sleep(0.0)210        self.assertTrue( len(server.active_connections) == 0 )211    @asyncio.coroutine212    def testTcpServerNoSlave(self):213        ''' Test unknown slave unit exception '''214        context = ModbusServerContext(slaves={0x01: self.store, 0x02: self.store  }, single=False)215        data = b"\x01\x00\x00\x00\x00\x06\x05\x03\x00\x00\x00\x01" # get slave 5 function 3 (holding register)216        server = yield from StartTcpServer(context=context,address=("127.0.0.1", 
0),loop=self.loop)217        if PYTHON_VERSION >= (3, 7):218            server_task = asyncio.create_task(server.serve_forever())219        else:220            server_task = asyncio.ensure_future(server.serve_forever())221        yield from server.serving222        connect, receive, eof = self.loop.create_future(),self.loop.create_future(),self.loop.create_future()223        received_data = None224        random_port = server.server.sockets[0].getsockname()[1] # get the random server port225        class BasicClient(asyncio.BaseProtocol):226            def connection_made(self, transport):227                _logger.debug("Client connected")228                self.transport = transport229                transport.write(data)230                connect.set_result(True)231            def data_received(self, data):232                _logger.debug("Client received data")233                receive.set_result(True)234                received_data = data235            def eof_received(self):236                _logger.debug("Client stream eof")237                eof.set_result(True)238        transport, protocol = yield from self.loop.create_connection(BasicClient, host='127.0.0.1',port=random_port)239        yield from asyncio.wait_for(connect, timeout=0.1)240        self.assertFalse(eof.done())241        server.server_close()242    @asyncio.coroutine243    def testTcpServerModbusError(self):244        ''' Test sending garbage data on a TCP socket should drop the connection '''245        data = b"\x01\x00\x00\x00\x00\x06\x01\x03\x00\x00\x00\x01"  # get slave 5 function 3 (holding register)246        server = yield from StartTcpServer(context=self.context,address=("127.0.0.1", 0),loop=self.loop)247        if PYTHON_VERSION >= (3, 7):248            server_task = asyncio.create_task(server.serve_forever())249        else:250            server_task = asyncio.ensure_future(server.serve_forever())251        yield from server.serving252        with 
patch("pymodbus.register_read_message.ReadHoldingRegistersRequest.execute",253                   side_effect=NoSuchSlaveException):254            connect, receive, eof = self.loop.create_future(),self.loop.create_future(),self.loop.create_future()255            received_data = None256            random_port = server.server.sockets[0].getsockname()[1] # get the random server port257            class BasicClient(asyncio.BaseProtocol):258                def connection_made(self, transport):259                    _logger.debug("Client connected")260                    self.transport = transport261                    transport.write(data)262                    connect.set_result(True)263                def data_received(self, data):264                    _logger.debug("Client received data")265                    receive.set_result(True)266                    received_data = data267                def eof_received(self):268                    _logger.debug("Client stream eof")269                    eof.set_result(True)270            transport, protocol = yield from self.loop.create_connection(BasicClient, host='127.0.0.1',port=random_port)271            yield from asyncio.wait_for(connect, timeout=0.1)272            yield from asyncio.wait_for(receive, timeout=0.1)273            self.assertFalse(eof.done())274            transport.close()275            server.server_close()276    @asyncio.coroutine277    def testTcpServerInternalException(self):278        ''' Test sending garbage data on a TCP socket should drop the connection '''279        data = b"\x01\x00\x00\x00\x00\x06\x01\x03\x00\x00\x00\x01"  # get slave 5 function 3 (holding register)280        server = yield from StartTcpServer(context=self.context,address=("127.0.0.1", 0),loop=self.loop)281        if PYTHON_VERSION >= (3, 7):282            server_task = asyncio.create_task(server.serve_forever())283        else:284            server_task = asyncio.ensure_future(server.serve_forever())285        yield from 
server.serving286        with patch("pymodbus.register_read_message.ReadHoldingRegistersRequest.execute",287                   side_effect=Exception):288            connect, receive, eof = self.loop.create_future(),self.loop.create_future(),self.loop.create_future()289            received_data = None290            random_port = server.server.sockets[0].getsockname()[1] # get the random server port291            class BasicClient(asyncio.BaseProtocol):292                def connection_made(self, transport):293                    _logger.debug("Client connected")294                    self.transport = transport295                    transport.write(data)296                    connect.set_result(True)297                def data_received(self, data):298                    _logger.debug("Client received data")299                    receive.set_result(True)300                    received_data = data301                def eof_received(self):302                    _logger.debug("Client stream eof")303                    eof.set_result(True)304            transport, protocol = yield from self.loop.create_connection(BasicClient, host='127.0.0.1',port=random_port)305            yield from asyncio.wait_for(connect, timeout=0.1)306            yield from asyncio.wait_for(receive, timeout=0.1)307            self.assertFalse(eof.done())308            transport.close()309            server.server_close()310    #-----------------------------------------------------------------------#311    # Test ModbusTlsProtocol312    #-----------------------------------------------------------------------#313    @asyncio.coroutine314    def testStartTlsServer(self):315        ''' Test that the modbus tls asyncio server starts correctly '''316        with patch.object(ssl.SSLContext, 'load_cert_chain') as mock_method:317            identity = ModbusDeviceIdentification(info={0x00: 'VendorName'})318            self.loop = asynctest.Mock(self.loop)319            server = yield from 
StartTlsServer(context=self.context,loop=self.loop,identity=identity)320            self.assertEqual(server.control.Identity.VendorName, 'VendorName')321            self.assertIsNotNone(server.sslctx)322            if PYTHON_VERSION >= (3, 6):323                self.loop.create_server.assert_called_once()324    @pytest.mark.skipif(PYTHON_VERSION < (3, 7), reason="requires python3.7 or above")325    @asyncio.coroutine326    def testTlsServerServeNoDefer(self):327        ''' Test StartTcpServer without deferred start (immediate execution of server) '''328        with patch('asyncio.base_events.Server.serve_forever', new_callable=asynctest.CoroutineMock) as serve:329            with patch.object(ssl.SSLContext, 'load_cert_chain') as mock_method:330                server = yield from StartTlsServer(context=self.context,address=("127.0.0.1", 0), loop=self.loop, defer_start=False)331                serve.assert_awaited()332    @pytest.mark.skipif(PYTHON_VERSION < (3, 7), reason="requires python3.7 or above")333    @asyncio.coroutine334    def testTlsServerServeForever(self):335        ''' Test StartTcpServer serve_forever() method '''336        with patch('asyncio.base_events.Server.serve_forever', new_callable=asynctest.CoroutineMock) as serve:337            with patch.object(ssl.SSLContext, 'load_cert_chain') as mock_method:338                server = yield from StartTlsServer(context=self.context,address=("127.0.0.1", 0), loop=self.loop)339                yield from server.serve_forever()340                serve.assert_awaited()341    @asyncio.coroutine342    def testTlsServerServeForeverTwice(self):343        ''' Call on serve_forever() twice should result in a runtime error '''344        with patch.object(ssl.SSLContext, 'load_cert_chain') as mock_method:345            server = yield from StartTlsServer(context=self.context,address=("127.0.0.1", 0), loop=self.loop)346            if PYTHON_VERSION >= (3, 7):347                server_task = 
asyncio.create_task(server.serve_forever())348            else:349                server_task = asyncio.ensure_future(server.serve_forever())350            yield from server.serving351            with self.assertRaises(RuntimeError):352                yield from server.serve_forever()353            server.server_close()354    #-----------------------------------------------------------------------#355    # Test ModbusUdpProtocol356    #-----------------------------------------------------------------------#357    @asyncio.coroutine358    def testStartUdpServer(self):359        ''' Test that the modbus udp asyncio server starts correctly '''360        identity = ModbusDeviceIdentification(info={0x00: 'VendorName'})361        self.loop = asynctest.Mock(self.loop)362        server = yield from StartUdpServer(context=self.context,loop=self.loop,identity=identity)363        self.assertEqual(server.control.Identity.VendorName, 'VendorName')364        if PYTHON_VERSION >= (3, 6):365            self.loop.create_datagram_endpoint.assert_called_once()366    # async def testUdpServerServeNoDefer(self):367    #     ''' Test StartUdpServer without deferred start - NOT IMPLEMENTED - this test is hard to do without additional368    #       internal plumbing added to the implementation '''369    #     asyncio.base_events.Server.serve_forever = asynctest.CoroutineMock()370    #     server = yield from StartUdpServer(address=("127.0.0.1", 0), loop=self.loop, defer_start=False)371    #     server.server.serve_forever.assert_awaited()372    @pytest.mark.skipif(PYTHON_VERSION < (3, 7), reason="requires python3.7 or above")373    @asyncio.coroutine374    def testUdpServerServeForeverStart(self):375        ''' Test StartUdpServer serve_forever() method '''376        with patch('asyncio.base_events.Server.serve_forever', new_callable=asynctest.CoroutineMock) as serve:377            server = yield from StartTcpServer(context=self.context,address=("127.0.0.1", 0), loop=self.loop)378         
   yield from server.serve_forever()379            serve.assert_awaited()380    @asyncio.coroutine381    def testUdpServerServeForeverClose(self):382        ''' Test StartUdpServer serve_forever() method '''383        server = yield from StartUdpServer(context=self.context,address=("127.0.0.1", 0), loop=self.loop)384        if PYTHON_VERSION >= (3, 7):385            server_task = asyncio.create_task(server.serve_forever())386        else:387            server_task = asyncio.ensure_future(server.serve_forever())388        yield from server.serving389        self.assertTrue(asyncio.isfuture(server.on_connection_terminated))390        self.assertFalse(server.on_connection_terminated.done())391        server.server_close()392        self.assertTrue(server.protocol.is_closing())393    @asyncio.coroutine394    def testUdpServerServeForeverTwice(self):395        ''' Call on serve_forever() twice should result in a runtime error '''396        identity = ModbusDeviceIdentification(info={0x00: 'VendorName'})397        server = yield from StartUdpServer(context=self.context,address=("127.0.0.1", 0),398                                      loop=self.loop,identity=identity)399        if PYTHON_VERSION >= (3, 7):400            server_task = asyncio.create_task(server.serve_forever())401        else:402            server_task = asyncio.ensure_future(server.serve_forever())403        yield from server.serving404        with self.assertRaises(RuntimeError):405            yield from server.serve_forever()406        server.server_close()407    @asyncio.coroutine408    def testUdpServerReceiveData(self):409        ''' Test that the sending data on datagram socket gets data pushed to framer '''410        server = yield from StartUdpServer(context=self.context,address=("127.0.0.1", 0),loop=self.loop)411        if PYTHON_VERSION >= (3, 7):412            server_task = asyncio.create_task(server.serve_forever())413        else:414            server_task = 
asyncio.ensure_future(server.serve_forever())415        yield from server.serving416        with patch('pymodbus.transaction.ModbusSocketFramer.processIncomingPacket',new_callable=Mock) as process:417            server.endpoint.datagram_received(data=b"12345", addr=("127.0.0.1", 12345))418            yield from asyncio.sleep(0.1)419            process.seal()420            if PYTHON_VERSION >= (3, 6):421                process.assert_called_once()422            self.assertTrue( process.call_args[1]["data"] == b"12345" )423            server.server_close()424    @asyncio.coroutine425    def testUdpServerSendData(self):426        ''' Test that the modbus udp asyncio server correctly sends data outbound '''427        identity = ModbusDeviceIdentification(info={0x00: 'VendorName'})428        data = b'x\01\x00\x00\x00\x00\x06\x01\x03\x00\x00\x00\x19'429        server = yield from StartUdpServer(context=self.context,address=("127.0.0.1", 0))430        if PYTHON_VERSION >= (3, 7):431            server_task = asyncio.create_task(server.serve_forever())432        else:433            server_task = asyncio.ensure_future(server.serve_forever())434        yield from server.serving435        random_port = server.protocol._sock.getsockname()[1]436        received = server.endpoint.datagram_received = Mock(wraps=server.endpoint.datagram_received)437        done = self.loop.create_future()438        received_value = None439        class BasicClient(asyncio.DatagramProtocol):440            def connection_made(self, transport):441                self.transport = transport442                self.transport.sendto(data)443            def datagram_received(self, data, addr):444                nonlocal received_value, done445                print("received")446                received_value = data447                done.set_result(True)448                self.transport.close()449        transport, protocol = yield from self.loop.create_datagram_endpoint( BasicClient,450            
remote_addr=('127.0.0.1', random_port))451        yield from asyncio.sleep(0.1)452        if PYTHON_VERSION >= (3, 6):453            received.assert_called_once()454        self.assertEqual(received.call_args[0][0], data)455        server.server_close()456        self.assertTrue(server.protocol.is_closing())457        yield from asyncio.sleep(0.1)458    @asyncio.coroutine459    def testUdpServerRoundtrip(self):460        ''' Test sending and receiving data on udp socket'''461        data = b"\x01\x00\x00\x00\x00\x06\x01\x03\x00\x00\x00\x01" # unit 1, read register462        expected_response = b'\x01\x00\x00\x00\x00\x05\x01\x03\x02\x00\x11' # value of 17 as per context463        server = yield from StartUdpServer(context=self.context,address=("127.0.0.1", 0),loop=self.loop)464        if PYTHON_VERSION >= (3, 7):465            server_task = asyncio.create_task(server.serve_forever())466        else:467            server_task = asyncio.ensure_future(server.serve_forever())468        yield from server.serving469        random_port = server.protocol._sock.getsockname()[1]470        connected, done = self.loop.create_future(),self.loop.create_future()471        received_value = None472        class BasicClient(asyncio.DatagramProtocol):473            def connection_made(self, transport):474                self.transport = transport475                self.transport.sendto(data)476            def datagram_received(self, data, addr):477                nonlocal received_value, done478                print("received")479                received_value = data480                done.set_result(True)481        transport, protocol = yield from self.loop.create_datagram_endpoint( BasicClient,482                                                                        remote_addr=('127.0.0.1', random_port))483        yield from asyncio.wait_for(done, timeout=0.1)484        self.assertEqual(received_value, expected_response)485        transport.close()486        yield from 
asyncio.sleep(0)487        server.server_close()488    @asyncio.coroutine489    def testUdpServerException(self):490        ''' Test sending garbage data on a TCP socket should drop the connection '''491        garbage = b'\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF'492        server = yield from StartUdpServer(context=self.context,address=("127.0.0.1", 0),loop=self.loop)493        if PYTHON_VERSION >= (3, 7):494            server_task = asyncio.create_task(server.serve_forever())495        else:496            server_task = asyncio.ensure_future(server.serve_forever())497        yield from server.serving498        with patch('pymodbus.transaction.ModbusSocketFramer.processIncomingPacket',499                   new_callable=lambda: Mock(side_effect=Exception)) as process:500            connect, receive, eof = self.loop.create_future(),self.loop.create_future(),self.loop.create_future()501            received_data = None502            random_port = server.protocol._sock.getsockname()[1]  # get the random server port503            class BasicClient(asyncio.DatagramProtocol):504                def connection_made(self, transport):505                    _logger.debug("Client connected")506                    self.transport = transport507                    transport.sendto(garbage)508                    connect.set_result(True)509                def datagram_received(self, data, addr):510                    nonlocal receive511                    _logger.debug("Client received data")512                    receive.set_result(True)513                    received_data = data514            transport, protocol = yield from self.loop.create_datagram_endpoint(BasicClient,515                                                                           remote_addr=('127.0.0.1', random_port))516            yield from asyncio.wait_for(connect, timeout=0.1)517            self.assertFalse(receive.done())518            self.assertFalse(server.protocol._sock._closed)519            
server.server_close()520    # -----------------------------------------------------------------------#521    # Test ModbusServerFactory522    # -----------------------------------------------------------------------#523    def testModbusServerFactory(self):524        ''' Test the base class for all the clients '''525        with self.assertWarns(DeprecationWarning):526            factory = ModbusServerFactory(store=None)527    def testStopServer(self):528        with self.assertWarns(DeprecationWarning):529            StopServer()530    @asyncio.coroutine531    def testTcpServerException(self):532        ''' Sending garbage data on a TCP socket should drop the connection '''533        garbage = b'\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF'534        server = yield from StartTcpServer(context=self.context, address=("127.0.0.1", 0), loop=self.loop)535        if PYTHON_VERSION >= (3, 7):536            server_task = asyncio.create_task(server.serve_forever())537        else:538            server_task = asyncio.ensure_future(server.serve_forever())539        yield from server.serving540        with patch('pymodbus.transaction.ModbusSocketFramer.processIncomingPacket',541                   new_callable=lambda: Mock(side_effect=Exception)) as process:542            connect, receive, eof = self.loop.create_future(), self.loop.create_future(), self.loop.create_future()543            received_data = None544            random_port = server.server.sockets[0].getsockname()[1]  # get the random server port545            class BasicClient(asyncio.BaseProtocol):546                def connection_made(self, transport):547                    _logger.debug("Client connected")548                    self.transport = transport549                    transport.write(garbage)550                    connect.set_result(True)551                def data_received(self, data):552                    _logger.debug("Client received data")553                    receive.set_result(True)554                    
received_data = data555                def eof_received(self):556                    _logger.debug("Client stream eof")557                    eof.set_result(True)558            transport, protocol = yield from self.loop.create_connection(BasicClient, host='127.0.0.1',559                                                                         port=random_port)560            yield from asyncio.wait_for(connect, timeout=0.1)561            yield from asyncio.wait_for(eof, timeout=0.1)562            # neither of these should timeout if the test is successful563            server.server_close()564    @asyncio.coroutine565    def testTcpServerException(self):566        ''' Sending garbage data on a TCP socket should drop the connection '''567        garbage = b'\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF'568        server = yield from StartTcpServer(context=self.context, address=("127.0.0.1", 0), loop=self.loop)569        if PYTHON_VERSION >= (3, 7):570            server_task = asyncio.create_task(server.serve_forever())571        else:572            server_task = asyncio.ensure_future(server.serve_forever())573        yield from server.serving574        with patch('pymodbus.transaction.ModbusSocketFramer.processIncomingPacket',575                   new_callable=lambda: Mock(side_effect=Exception)) as process:576            connect, receive, eof = self.loop.create_future(), self.loop.create_future(), self.loop.create_future()577            received_data = None578            random_port = server.server.sockets[0].getsockname()[1]  # get the random server port579            class BasicClient(asyncio.BaseProtocol):580                def connection_made(self, transport):581                    _logger.debug("Client connected")582                    self.transport = transport583                    transport.write(garbage)584                    connect.set_result(True)585                def data_received(self, data):586                    _logger.debug("Client received data")...watchdog.py
Source:watchdog.py  
...22    server = WatchdogThreadingSocketServer((host, port), WatchdogTCPRequestHandler)23    handle_queue_thread = threading.Thread(target=handle_queue, args=(server, bi_queue, serve_forever))24    handle_queue_thread.start()25    try:26        server.serve_forever()27    except KeyboardInterrupt:28        pass29    finally:30        # shutdown server31        for unique_id in server.queued_data:32            # FIXME: Don't worked!33            pass34#            bi_queue.put('parent', )35        server.shutdown()36def handle_worker(plugins_conf, bi_queue, serve_forever):37    """38    :type plugins_conf: list39    :type bi_queue: packages.bidirectional_queue.BidirectionalQueue40    :type serve_forever: multiprocessing.Value...st2ss.py
Source:st2ss.py  
...5        for line in self.rfile:6            self.wfile.write(line)7if __name__ == '__main__':8    serv = TCPServer(('', 20000), EchoHandler)9    serv.serve_forever()10# Use StreamRequestHandler11from socketserver import ThreadingTCPServer, ForkingTCPServer12if __name__ == '__main__':13    serv = ThreadingTCPServer(('', 20000), EchoHandler)14    serv.serve_forever()15# The potential problem with forking or threaded servers is that they create a new process or thread for every client connection16if __name__ == '__main__':17    from threading import Thread18    NWORKERS = 1619    serv = TCPServer(('', 20000), EchoHandler)20    for i in range(NWORKERS):21        t = Thread(target=serv.serve_forever)22        t.daemon = True23        t.start()24    serv.serve_forever()25# First create an ordinary non-threaded server, then start it by calling serve_forever() from a pool of threads26if __name__ == '__main__':27    import socket28    serv = TCPServer(('', 20000), EchoHandler, bind_and_activate=False)29    serv.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)30    serv.server_bind()31    serv.server_activate()32    serv.serve_forever()33# Adjust the socket34if __name__ == '__main__':35    TCPServer.allow_reuse_address = True36    serv = TCPServer(('', 200000), EchoHandler)37    serv.serve_forever()38import socket39class EchoHandler(StreamRequestHandler):40    timeout = 541    rbufsize = -142    wbufsize = 043    disable_nagle_algorithm = False44    def handle(self):45        print('Got connection from ', self.client_address)46        try:47            for line in self.rfile:48                self.wfile.write(line)49        except socket.timeout:50            print('Time out!')51# StreamRequestHandler is more flexible...Learn to execute automation testing from scratch with LambdaTest Learning Hub, right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you become proficient with different test automation frameworks, e.g., Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
