How to use the serve_forever method in avocado

Best Python code snippet using avocado_python

test_server_asyncio.py

Source:test_server_asyncio.py Github

copy

Full Screen

...77 serve.assert_awaited()78 @pytest.mark.skipif(PYTHON_VERSION < (3, 7), reason="requires python3.7 or above")79 @asyncio.coroutine80 def testTcpServerServeForever(self):81 ''' Test StartTcpServer serve_forever() method '''82 with patch('asyncio.base_events.Server.serve_forever', new_callable=asynctest.CoroutineMock) as serve:83 server = yield from StartTcpServer(context=self.context,address=("127.0.0.1", 0), loop=self.loop)84 yield from server.serve_forever()85 serve.assert_awaited()86 @asyncio.coroutine87 def testTcpServerServeForeverTwice(self):88 ''' Call on serve_forever() twice should result in a runtime error '''89 server = yield from StartTcpServer(context=self.context,address=("127.0.0.1", 0), loop=self.loop)90 if PYTHON_VERSION >= (3, 7):91 server_task = asyncio.create_task(server.serve_forever())92 else:93 server_task = asyncio.ensure_future(server.serve_forever())94 yield from server.serving95 with self.assertRaises(RuntimeError):96 yield from server.serve_forever()97 server.server_close()98 @asyncio.coroutine99 def testTcpServerReceiveData(self):100 ''' Test data sent on socket is received by internals - doesn't not process data '''101 data = b'\x01\x00\x00\x00\x00\x06\x01\x03\x00\x00\x00\x19'102 server = yield from StartTcpServer(context=self.context,address=("127.0.0.1", 0),loop=self.loop)103 if PYTHON_VERSION >= (3, 7):104 server_task = asyncio.create_task(server.serve_forever())105 else:106 server_task = asyncio.ensure_future(server.serve_forever())107 yield from server.serving108 with patch('pymodbus.transaction.ModbusSocketFramer.processIncomingPacket', new_callable=Mock) as process:109 # process = server.framer.processIncomingPacket = Mock()110 connected = self.loop.create_future()111 random_port = server.server.sockets[0].getsockname()[1] # get the random server port112 class BasicClient(asyncio.BaseProtocol):113 def connection_made(self, transport):114 self.transport = transport115 self.transport.write(data)116 connected.set_result(True)117 
def eof_received(self):118 pass119 transport, protocol = yield from self.loop.create_connection(BasicClient, host='127.0.0.1',port=random_port)120 yield from asyncio.sleep(0.1) # this may be better done by making an internal hook in the actual implementation121 # if this unit test fails on a machine, see if increasing the sleep time makes a difference, if it does122 # blame author for a fix123 if PYTHON_VERSION >= (3, 6):124 process.assert_called_once()125 self.assertTrue( process.call_args[1]["data"] == data )126 server.server_close()127 @asyncio.coroutine128 def testTcpServerRoundtrip(self):129 ''' Test sending and receiving data on tcp socket '''130 data = b"\x01\x00\x00\x00\x00\x06\x01\x03\x00\x00\x00\x01" # unit 1, read register131 expected_response = b'\x01\x00\x00\x00\x00\x05\x01\x03\x02\x00\x11' # value of 17 as per context132 server = yield from StartTcpServer(context=self.context,address=("127.0.0.1", 0),loop=self.loop)133 if PYTHON_VERSION >= (3, 7):134 server_task = asyncio.create_task(server.serve_forever())135 else:136 server_task = asyncio.ensure_future(server.serve_forever())137 yield from server.serving138 random_port = server.server.sockets[0].getsockname()[1] # get the random server port139 connected, done = self.loop.create_future(),self.loop.create_future()140 received_value = None141 class BasicClient(asyncio.BaseProtocol):142 def connection_made(self, transport):143 self.transport = transport144 self.transport.write(data)145 connected.set_result(True)146 def data_received(self, data):147 nonlocal received_value, done148 received_value = data149 done.set_result(True)150 def eof_received(self):151 pass152 transport, protocol = yield from self.loop.create_connection(BasicClient, host='127.0.0.1',port=random_port)153 yield from asyncio.wait_for(done, timeout=0.1)154 self.assertEqual(received_value, expected_response)155 transport.close()156 yield from asyncio.sleep(0)157 server.server_close()158 @asyncio.coroutine159 def 
testTcpServerConnectionLost(self):160 ''' Test tcp stream interruption '''161 data = b"\x01\x00\x00\x00\x00\x06\x01\x01\x00\x00\x00\x01"162 server = yield from StartTcpServer(context=self.context, address=("127.0.0.1", 0), loop=self.loop)163 if PYTHON_VERSION >= (3, 7):164 server_task = asyncio.create_task(server.serve_forever())165 else:166 server_task = asyncio.ensure_future(server.serve_forever())167 yield from server.serving168 random_port = server.server.sockets[0].getsockname()[1] # get the random server port169 step1 = self.loop.create_future()170 # done = self.loop.create_future()171 # received_value = None172 time.sleep(1)173 class BasicClient(asyncio.BaseProtocol):174 def connection_made(self, transport):175 self.transport = transport176 step1.set_result(True)177 transport, protocol = yield from self.loop.create_connection(BasicClient, host='127.0.0.1', port=random_port)178 yield from step1179 # await asyncio.sleep(1)180 self.assertTrue(len(server.active_connections) == 1)181 protocol.transport.close() # close isn't synchronous and there's no notification that it's done182 # so we have to wait a bit183 yield from asyncio.sleep(0.1)184 self.assertTrue(len(server.active_connections) == 0)185 server.server_close()186 @asyncio.coroutine187 def testTcpServerCloseActiveConnection(self):188 ''' Test server_close() while there are active TCP connections '''189 data = b"\x01\x00\x00\x00\x00\x06\x01\x01\x00\x00\x00\x01"190 server = yield from StartTcpServer(context=self.context,address=("127.0.0.1", 0),loop=self.loop)191 if PYTHON_VERSION >= (3, 7):192 server_task = asyncio.create_task(server.serve_forever())193 else:194 server_task = asyncio.ensure_future(server.serve_forever())195 yield from server.serving196 random_port = server.server.sockets[0].getsockname()[1] # get the random server port197 step1 = self.loop.create_future()198 done = self.loop.create_future()199 received_value = None200 class BasicClient(asyncio.BaseProtocol):201 def connection_made(self, 
transport):202 self.transport = transport203 step1.set_result(True)204 transport, protocol = yield from self.loop.create_connection(BasicClient, host='127.0.0.1',port=random_port)205 yield from step1206 server.server_close()207 # close isn't synchronous and there's no notification that it's done208 # so we have to wait a bit209 yield from asyncio.sleep(0.0)210 self.assertTrue( len(server.active_connections) == 0 )211 @asyncio.coroutine212 def testTcpServerNoSlave(self):213 ''' Test unknown slave unit exception '''214 context = ModbusServerContext(slaves={0x01: self.store, 0x02: self.store }, single=False)215 data = b"\x01\x00\x00\x00\x00\x06\x05\x03\x00\x00\x00\x01" # get slave 5 function 3 (holding register)216 server = yield from StartTcpServer(context=context,address=("127.0.0.1", 0),loop=self.loop)217 if PYTHON_VERSION >= (3, 7):218 server_task = asyncio.create_task(server.serve_forever())219 else:220 server_task = asyncio.ensure_future(server.serve_forever())221 yield from server.serving222 connect, receive, eof = self.loop.create_future(),self.loop.create_future(),self.loop.create_future()223 received_data = None224 random_port = server.server.sockets[0].getsockname()[1] # get the random server port225 class BasicClient(asyncio.BaseProtocol):226 def connection_made(self, transport):227 _logger.debug("Client connected")228 self.transport = transport229 transport.write(data)230 connect.set_result(True)231 def data_received(self, data):232 _logger.debug("Client received data")233 receive.set_result(True)234 received_data = data235 def eof_received(self):236 _logger.debug("Client stream eof")237 eof.set_result(True)238 transport, protocol = yield from self.loop.create_connection(BasicClient, host='127.0.0.1',port=random_port)239 yield from asyncio.wait_for(connect, timeout=0.1)240 self.assertFalse(eof.done())241 server.server_close()242 @asyncio.coroutine243 def testTcpServerModbusError(self):244 ''' Test sending garbage data on a TCP socket should drop the 
connection '''245 data = b"\x01\x00\x00\x00\x00\x06\x01\x03\x00\x00\x00\x01" # get slave 5 function 3 (holding register)246 server = yield from StartTcpServer(context=self.context,address=("127.0.0.1", 0),loop=self.loop)247 if PYTHON_VERSION >= (3, 7):248 server_task = asyncio.create_task(server.serve_forever())249 else:250 server_task = asyncio.ensure_future(server.serve_forever())251 yield from server.serving252 with patch("pymodbus.register_read_message.ReadHoldingRegistersRequest.execute",253 side_effect=NoSuchSlaveException):254 connect, receive, eof = self.loop.create_future(),self.loop.create_future(),self.loop.create_future()255 received_data = None256 random_port = server.server.sockets[0].getsockname()[1] # get the random server port257 class BasicClient(asyncio.BaseProtocol):258 def connection_made(self, transport):259 _logger.debug("Client connected")260 self.transport = transport261 transport.write(data)262 connect.set_result(True)263 def data_received(self, data):264 _logger.debug("Client received data")265 receive.set_result(True)266 received_data = data267 def eof_received(self):268 _logger.debug("Client stream eof")269 eof.set_result(True)270 transport, protocol = yield from self.loop.create_connection(BasicClient, host='127.0.0.1',port=random_port)271 yield from asyncio.wait_for(connect, timeout=0.1)272 yield from asyncio.wait_for(receive, timeout=0.1)273 self.assertFalse(eof.done())274 transport.close()275 server.server_close()276 @asyncio.coroutine277 def testTcpServerInternalException(self):278 ''' Test sending garbage data on a TCP socket should drop the connection '''279 data = b"\x01\x00\x00\x00\x00\x06\x01\x03\x00\x00\x00\x01" # get slave 5 function 3 (holding register)280 server = yield from StartTcpServer(context=self.context,address=("127.0.0.1", 0),loop=self.loop)281 if PYTHON_VERSION >= (3, 7):282 server_task = asyncio.create_task(server.serve_forever())283 else:284 server_task = asyncio.ensure_future(server.serve_forever())285 yield 
from server.serving286 with patch("pymodbus.register_read_message.ReadHoldingRegistersRequest.execute",287 side_effect=Exception):288 connect, receive, eof = self.loop.create_future(),self.loop.create_future(),self.loop.create_future()289 received_data = None290 random_port = server.server.sockets[0].getsockname()[1] # get the random server port291 class BasicClient(asyncio.BaseProtocol):292 def connection_made(self, transport):293 _logger.debug("Client connected")294 self.transport = transport295 transport.write(data)296 connect.set_result(True)297 def data_received(self, data):298 _logger.debug("Client received data")299 receive.set_result(True)300 received_data = data301 def eof_received(self):302 _logger.debug("Client stream eof")303 eof.set_result(True)304 transport, protocol = yield from self.loop.create_connection(BasicClient, host='127.0.0.1',port=random_port)305 yield from asyncio.wait_for(connect, timeout=0.1)306 yield from asyncio.wait_for(receive, timeout=0.1)307 self.assertFalse(eof.done())308 transport.close()309 server.server_close()310 #-----------------------------------------------------------------------#311 # Test ModbusTlsProtocol312 #-----------------------------------------------------------------------#313 @asyncio.coroutine314 def testStartTlsServer(self):315 ''' Test that the modbus tls asyncio server starts correctly '''316 with patch.object(ssl.SSLContext, 'load_cert_chain') as mock_method:317 identity = ModbusDeviceIdentification(info={0x00: 'VendorName'})318 self.loop = asynctest.Mock(self.loop)319 server = yield from StartTlsServer(context=self.context,loop=self.loop,identity=identity)320 self.assertEqual(server.control.Identity.VendorName, 'VendorName')321 self.assertIsNotNone(server.sslctx)322 if PYTHON_VERSION >= (3, 6):323 self.loop.create_server.assert_called_once()324 @pytest.mark.skipif(PYTHON_VERSION < (3, 7), reason="requires python3.7 or above")325 @asyncio.coroutine326 def testTlsServerServeNoDefer(self):327 ''' Test 
StartTcpServer without deferred start (immediate execution of server) '''328 with patch('asyncio.base_events.Server.serve_forever', new_callable=asynctest.CoroutineMock) as serve:329 with patch.object(ssl.SSLContext, 'load_cert_chain') as mock_method:330 server = yield from StartTlsServer(context=self.context,address=("127.0.0.1", 0), loop=self.loop, defer_start=False)331 serve.assert_awaited()332 @pytest.mark.skipif(PYTHON_VERSION < (3, 7), reason="requires python3.7 or above")333 @asyncio.coroutine334 def testTlsServerServeForever(self):335 ''' Test StartTcpServer serve_forever() method '''336 with patch('asyncio.base_events.Server.serve_forever', new_callable=asynctest.CoroutineMock) as serve:337 with patch.object(ssl.SSLContext, 'load_cert_chain') as mock_method:338 server = yield from StartTlsServer(context=self.context,address=("127.0.0.1", 0), loop=self.loop)339 yield from server.serve_forever()340 serve.assert_awaited()341 @asyncio.coroutine342 def testTlsServerServeForeverTwice(self):343 ''' Call on serve_forever() twice should result in a runtime error '''344 with patch.object(ssl.SSLContext, 'load_cert_chain') as mock_method:345 server = yield from StartTlsServer(context=self.context,address=("127.0.0.1", 0), loop=self.loop)346 if PYTHON_VERSION >= (3, 7):347 server_task = asyncio.create_task(server.serve_forever())348 else:349 server_task = asyncio.ensure_future(server.serve_forever())350 yield from server.serving351 with self.assertRaises(RuntimeError):352 yield from server.serve_forever()353 server.server_close()354 #-----------------------------------------------------------------------#355 # Test ModbusUdpProtocol356 #-----------------------------------------------------------------------#357 @asyncio.coroutine358 def testStartUdpServer(self):359 ''' Test that the modbus udp asyncio server starts correctly '''360 identity = ModbusDeviceIdentification(info={0x00: 'VendorName'})361 self.loop = asynctest.Mock(self.loop)362 server = yield from 
StartUdpServer(context=self.context,loop=self.loop,identity=identity)363 self.assertEqual(server.control.Identity.VendorName, 'VendorName')364 if PYTHON_VERSION >= (3, 6):365 self.loop.create_datagram_endpoint.assert_called_once()366 # async def testUdpServerServeNoDefer(self):367 # ''' Test StartUdpServer without deferred start - NOT IMPLEMENTED - this test is hard to do without additional368 # internal plumbing added to the implementation '''369 # asyncio.base_events.Server.serve_forever = asynctest.CoroutineMock()370 # server = yield from StartUdpServer(address=("127.0.0.1", 0), loop=self.loop, defer_start=False)371 # server.server.serve_forever.assert_awaited()372 @pytest.mark.skipif(PYTHON_VERSION < (3, 7), reason="requires python3.7 or above")373 @asyncio.coroutine374 def testUdpServerServeForeverStart(self):375 ''' Test StartUdpServer serve_forever() method '''376 with patch('asyncio.base_events.Server.serve_forever', new_callable=asynctest.CoroutineMock) as serve:377 server = yield from StartTcpServer(context=self.context,address=("127.0.0.1", 0), loop=self.loop)378 yield from server.serve_forever()379 serve.assert_awaited()380 @asyncio.coroutine381 def testUdpServerServeForeverClose(self):382 ''' Test StartUdpServer serve_forever() method '''383 server = yield from StartUdpServer(context=self.context,address=("127.0.0.1", 0), loop=self.loop)384 if PYTHON_VERSION >= (3, 7):385 server_task = asyncio.create_task(server.serve_forever())386 else:387 server_task = asyncio.ensure_future(server.serve_forever())388 yield from server.serving389 self.assertTrue(asyncio.isfuture(server.on_connection_terminated))390 self.assertFalse(server.on_connection_terminated.done())391 server.server_close()392 self.assertTrue(server.protocol.is_closing())393 @asyncio.coroutine394 def testUdpServerServeForeverTwice(self):395 ''' Call on serve_forever() twice should result in a runtime error '''396 identity = ModbusDeviceIdentification(info={0x00: 'VendorName'})397 server = yield 
from StartUdpServer(context=self.context,address=("127.0.0.1", 0),398 loop=self.loop,identity=identity)399 if PYTHON_VERSION >= (3, 7):400 server_task = asyncio.create_task(server.serve_forever())401 else:402 server_task = asyncio.ensure_future(server.serve_forever())403 yield from server.serving404 with self.assertRaises(RuntimeError):405 yield from server.serve_forever()406 server.server_close()407 @asyncio.coroutine408 def testUdpServerReceiveData(self):409 ''' Test that the sending data on datagram socket gets data pushed to framer '''410 server = yield from StartUdpServer(context=self.context,address=("127.0.0.1", 0),loop=self.loop)411 if PYTHON_VERSION >= (3, 7):412 server_task = asyncio.create_task(server.serve_forever())413 else:414 server_task = asyncio.ensure_future(server.serve_forever())415 yield from server.serving416 with patch('pymodbus.transaction.ModbusSocketFramer.processIncomingPacket',new_callable=Mock) as process:417 server.endpoint.datagram_received(data=b"12345", addr=("127.0.0.1", 12345))418 yield from asyncio.sleep(0.1)419 process.seal()420 if PYTHON_VERSION >= (3, 6):421 process.assert_called_once()422 self.assertTrue( process.call_args[1]["data"] == b"12345" )423 server.server_close()424 @asyncio.coroutine425 def testUdpServerSendData(self):426 ''' Test that the modbus udp asyncio server correctly sends data outbound '''427 identity = ModbusDeviceIdentification(info={0x00: 'VendorName'})428 data = b'x\01\x00\x00\x00\x00\x06\x01\x03\x00\x00\x00\x19'429 server = yield from StartUdpServer(context=self.context,address=("127.0.0.1", 0))430 if PYTHON_VERSION >= (3, 7):431 server_task = asyncio.create_task(server.serve_forever())432 else:433 server_task = asyncio.ensure_future(server.serve_forever())434 yield from server.serving435 random_port = server.protocol._sock.getsockname()[1]436 received = server.endpoint.datagram_received = Mock(wraps=server.endpoint.datagram_received)437 done = self.loop.create_future()438 received_value = None439 
class BasicClient(asyncio.DatagramProtocol):440 def connection_made(self, transport):441 self.transport = transport442 self.transport.sendto(data)443 def datagram_received(self, data, addr):444 nonlocal received_value, done445 print("received")446 received_value = data447 done.set_result(True)448 self.transport.close()449 transport, protocol = yield from self.loop.create_datagram_endpoint( BasicClient,450 remote_addr=('127.0.0.1', random_port))451 yield from asyncio.sleep(0.1)452 if PYTHON_VERSION >= (3, 6):453 received.assert_called_once()454 self.assertEqual(received.call_args[0][0], data)455 server.server_close()456 self.assertTrue(server.protocol.is_closing())457 yield from asyncio.sleep(0.1)458 @asyncio.coroutine459 def testUdpServerRoundtrip(self):460 ''' Test sending and receiving data on udp socket'''461 data = b"\x01\x00\x00\x00\x00\x06\x01\x03\x00\x00\x00\x01" # unit 1, read register462 expected_response = b'\x01\x00\x00\x00\x00\x05\x01\x03\x02\x00\x11' # value of 17 as per context463 server = yield from StartUdpServer(context=self.context,address=("127.0.0.1", 0),loop=self.loop)464 if PYTHON_VERSION >= (3, 7):465 server_task = asyncio.create_task(server.serve_forever())466 else:467 server_task = asyncio.ensure_future(server.serve_forever())468 yield from server.serving469 random_port = server.protocol._sock.getsockname()[1]470 connected, done = self.loop.create_future(),self.loop.create_future()471 received_value = None472 class BasicClient(asyncio.DatagramProtocol):473 def connection_made(self, transport):474 self.transport = transport475 self.transport.sendto(data)476 def datagram_received(self, data, addr):477 nonlocal received_value, done478 print("received")479 received_value = data480 done.set_result(True)481 transport, protocol = yield from self.loop.create_datagram_endpoint( BasicClient,482 remote_addr=('127.0.0.1', random_port))483 yield from asyncio.wait_for(done, timeout=0.1)484 self.assertEqual(received_value, expected_response)485 
transport.close()486 yield from asyncio.sleep(0)487 server.server_close()488 @asyncio.coroutine489 def testUdpServerException(self):490 ''' Test sending garbage data on a TCP socket should drop the connection '''491 garbage = b'\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF'492 server = yield from StartUdpServer(context=self.context,address=("127.0.0.1", 0),loop=self.loop)493 if PYTHON_VERSION >= (3, 7):494 server_task = asyncio.create_task(server.serve_forever())495 else:496 server_task = asyncio.ensure_future(server.serve_forever())497 yield from server.serving498 with patch('pymodbus.transaction.ModbusSocketFramer.processIncomingPacket',499 new_callable=lambda: Mock(side_effect=Exception)) as process:500 connect, receive, eof = self.loop.create_future(),self.loop.create_future(),self.loop.create_future()501 received_data = None502 random_port = server.protocol._sock.getsockname()[1] # get the random server port503 class BasicClient(asyncio.DatagramProtocol):504 def connection_made(self, transport):505 _logger.debug("Client connected")506 self.transport = transport507 transport.sendto(garbage)508 connect.set_result(True)509 def datagram_received(self, data, addr):510 nonlocal receive511 _logger.debug("Client received data")512 receive.set_result(True)513 received_data = data514 transport, protocol = yield from self.loop.create_datagram_endpoint(BasicClient,515 remote_addr=('127.0.0.1', random_port))516 yield from asyncio.wait_for(connect, timeout=0.1)517 self.assertFalse(receive.done())518 self.assertFalse(server.protocol._sock._closed)519 server.server_close()520 # -----------------------------------------------------------------------#521 # Test ModbusServerFactory522 # -----------------------------------------------------------------------#523 def testModbusServerFactory(self):524 ''' Test the base class for all the clients '''525 with self.assertWarns(DeprecationWarning):526 factory = ModbusServerFactory(store=None)527 def testStopServer(self):528 with 
self.assertWarns(DeprecationWarning):529 StopServer()530 @asyncio.coroutine531 def testTcpServerException(self):532 ''' Sending garbage data on a TCP socket should drop the connection '''533 garbage = b'\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF'534 server = yield from StartTcpServer(context=self.context, address=("127.0.0.1", 0), loop=self.loop)535 if PYTHON_VERSION >= (3, 7):536 server_task = asyncio.create_task(server.serve_forever())537 else:538 server_task = asyncio.ensure_future(server.serve_forever())539 yield from server.serving540 with patch('pymodbus.transaction.ModbusSocketFramer.processIncomingPacket',541 new_callable=lambda: Mock(side_effect=Exception)) as process:542 connect, receive, eof = self.loop.create_future(), self.loop.create_future(), self.loop.create_future()543 received_data = None544 random_port = server.server.sockets[0].getsockname()[1] # get the random server port545 class BasicClient(asyncio.BaseProtocol):546 def connection_made(self, transport):547 _logger.debug("Client connected")548 self.transport = transport549 transport.write(garbage)550 connect.set_result(True)551 def data_received(self, data):552 _logger.debug("Client received data")553 receive.set_result(True)554 received_data = data555 def eof_received(self):556 _logger.debug("Client stream eof")557 eof.set_result(True)558 transport, protocol = yield from self.loop.create_connection(BasicClient, host='127.0.0.1',559 port=random_port)560 yield from asyncio.wait_for(connect, timeout=0.1)561 yield from asyncio.wait_for(eof, timeout=0.1)562 # neither of these should timeout if the test is successful563 server.server_close()564 @asyncio.coroutine565 def testTcpServerException(self):566 ''' Sending garbage data on a TCP socket should drop the connection '''567 garbage = b'\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF'568 server = yield from StartTcpServer(context=self.context, address=("127.0.0.1", 0), loop=self.loop)569 if PYTHON_VERSION >= (3, 7):570 server_task = 
asyncio.create_task(server.serve_forever())571 else:572 server_task = asyncio.ensure_future(server.serve_forever())573 yield from server.serving574 with patch('pymodbus.transaction.ModbusSocketFramer.processIncomingPacket',575 new_callable=lambda: Mock(side_effect=Exception)) as process:576 connect, receive, eof = self.loop.create_future(), self.loop.create_future(), self.loop.create_future()577 received_data = None578 random_port = server.server.sockets[0].getsockname()[1] # get the random server port579 class BasicClient(asyncio.BaseProtocol):580 def connection_made(self, transport):581 _logger.debug("Client connected")582 self.transport = transport583 transport.write(garbage)584 connect.set_result(True)585 def data_received(self, data):586 _logger.debug("Client received data")...

Full Screen

Full Screen

watchdog.py

Source:watchdog.py Github

copy

Full Screen

...22 server = WatchdogThreadingSocketServer((host, port), WatchdogTCPRequestHandler)23 handle_queue_thread = threading.Thread(target=handle_queue, args=(server, bi_queue, serve_forever))24 handle_queue_thread.start()25 try:26 server.serve_forever()27 except KeyboardInterrupt:28 pass29 finally:30 # shutdown server31 for unique_id in server.queued_data:32 # FIXME: Don't worked!33 pass34# bi_queue.put('parent', )35 server.shutdown()36def handle_worker(plugins_conf, bi_queue, serve_forever):37 """38 :type plugins_conf: list39 :type bi_queue: packages.bidirectional_queue.BidirectionalQueue40 :type serve_forever: multiprocessing.Value...

Full Screen

Full Screen

st2ss.py

Source:st2ss.py Github

copy

Full Screen

...5 for line in self.rfile:6 self.wfile.write(line)7if __name__ == '__main__':8 serv = TCPServer(('', 20000), EchoHandler)9 serv.serve_forever()10# 使用StreamRequestHandler11from socketserver import ThreadingTCPServer, ForkingTCPServer12if __name__ == '__main__':13 serv = ThreadingTCPServer(('', 20000), EchoHandler)14 serv.serve_forever()15# fork或者线程服务器潜在问题是会为每个客户端连接创建一个新的进程或者线程16if __name__ == '__main__':17 from threading import Thread18 NWORKERS = 1619 serv = TCPServer(('', 20000), EchoHandler)20 for i in range(NWORKERS):21 t = Thread(target=serv.serve_forever)22 t.daemon = True23 t.start()24 serv.serve_forever()25# 先创建一个普通的非线程服务器,然后在一个线程池中使用serve_forever()方法来启动26if __name__ == '__main__':27 import socket28 serv = TCPServer(('', 20000), EchoHandler, bind_and_activate=False)29 serv.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)30 serv.server_bind()31 serv.server_activate()32 serv.serve_forever()33# 调整socket34if __name__ == '__main__':35 TCPServer.allow_reuse_address = True36 serv = TCPServer(('', 200000), EchoHandler)37 serv.serve_forever()38import socket39class EchoHandler(StreamRequestHandler):40 timeout = 541 rbufsize = -142 wbufsize = 043 disable_nagle_algorithm = False44 def handle(self):45 print('Got connection from ', self.client_address)46 try:47 for line in self.rfile:48 self.wfile.write(line)49 except socket.timeout:50 print('Time out!')51# StreamRequestHandler更加灵活...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, such as Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run avocado automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now!!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful