How to use the _connect_socket method in autotest

Best Python code snippet using autotest_python Github


Full Screen

...31 except socket.error as e:32 serversocket.close()33 raise e34 return serversocket35 def _add_connect_socket(self, client):36 if self._connect_socket is not None:37 self._del_connect_socket()38 self._inputs.append(client)39 self._connect_socket = client40"add connected socket fd = %d"%(client.fileno()))41 def _del_connect_socket(self):42 if self._connect_socket is not None:43 self._inputs.remove(self._connect_socket)44 self._connect_socket.close()45 self._connect_socket = None46 def _net_to_uart(self):47 try:48 data = self._connect_socket.recv(2048)49 except socket.error as e:50 logger.warning("read %d socket error:%s"%(self._connect_socket.fileno(), repr(e)))51 self._del_connect_socket()52 return 053 54 if len(data) == 0:55 self._del_connect_socket()56 return 057 self._total_net_size += len(data)58"recev %s net byte size %d total recev :%d"%(self._uartname ,len(data),\59 self._total_net_size))60"net to uart data:%s"%(logger.print_list2hex(list(bytearray.fromhex(data.encode("hex"))))))61 try:62 self._uart_device.write((data))63 except Exception as e:64 logger.warning("write %s device error %s"%(self._uartname,repr(e)))65 return 066 return 167 def _uart_to_net(self):68 uart_data = if self._connect_socket is None:70 return 071 if not uart_data:72 return 073 else :74 self._total_uart_size += len(uart_data)75" read %s data byte size :%d total read size : %d"%\76 (self._uartname,len(uart_data),self._total_uart_size))77"uart to net data:%s"%(logger.print_list2hex(list(bytearray.fromhex(uart_data.encode("hex"))))))78 try:79 self._connect_socket.send(bytes(uart_data))80"client %d send byte size : %d"%(self._connect_socket.fileno(),len(uart_data)))81 except socket.error as e:82 logger.warning("device %s net send error"%(self._uartname))83 self._del_connect_socket()84 return 185 def run(self):86 logger.init("uart2net.log")87 logger.setLevel("warning")88 try:89 self._server_socket = self._create_server_socket(self._port)90 except socket.error as e:91 logger.boot("%s 
device create port %d server socket failed:%s" %(self._uartname, self._port, repr(e)))92 return93 else:94 logger.boot("%s device create server socket fd=%s port=%d" %(self._uartname, self._server_socket.fileno(), self._port))95 self._inputs.append(self._server_socket)96 while not self._uart_stop:97 '''nonboclking mode to poll the socket'''98 read_able_list =, [], [], 0)[0]99 if self._server_socket in read_able_list:100 client,addr=self._server_socket.accept()101 self._del_connect_socket()102 self._add_connect_socket(client)103 continue104 if self._connect_socket in read_able_list:105 try:106 self._net_to_uart()107 except Exception as e:108 logger.error("%s net to uart error:%s"%(self._uartname, repr(e)))109 break110 try:111 ret_uart2net = self._uart_to_net()112 except Exception as e:113 logger.error("%s uart to net error:%s"%(self._uartname, repr(e)))114 break115 if ret_uart2net == 0 and len(read_able_list) == 0:116 time.sleep(0.05)117 self._del_connect_socket()118 self._server_socket.close()119 def stop(self):120 self._uart_stop = True121def create_uart_server():122 for name, bus in Profile.get_buses().iteritems():123 if bus['bus'] == 'uart' and bus.has_key('port'):124 uartserver = UartServer(name, bus['port'])...

Full Screen

Full Screen Github


Full Screen

...28 except socket.error as e:29 serversocket.close()30 raise e31 return serversocket32 def _add_connect_socket(self, client):33 if self._connect_socket is not None:34 self._del_connect_socket()35 self._inputs.append(client)36 self._connect_socket = client37"add connected socket fd = %d"%(client.fileno()))38 def _del_connect_socket(self):39 if self._connect_socket is not None:40 self._inputs.remove(self._connect_socket)41 self._connect_socket.close()42 self._connect_socket = None43 def _tcp_to_device(self):44 try:45 data = self._connect_socket.recv(2048)46 except socket.error as e:47 logger.warning("read %d socket error:%s"%(self._connect_socket.fileno(), repr(e)))48 self._del_connect_socket()49 return 050 51 if len(data) == 0:52 self._del_connect_socket()53 return 054 self._total_net_size += len(data)55"recev tcp port %s byte size %d, total recev :%d"%(self._port ,len(data),\56 self._total_net_size))57"tcp to device data:%s"%(logger.print_list2hex(list(bytearray.fromhex(data.encode("hex"))))))58 try:59 self._device.write((data))60 except Exception as e:61 logger.warning("device write data byte size %d error :%s"%(len(data),repr(e)))62 return 063 return 164 def _device_to_tcp(self):65 device_data = if self._connect_socket is None:67 return 068 if not device_data:69 return 070 else :71 self._total_device_size += len(device_data)72"device read data byte size :%d, total read size : %d"%\73 (len(device_data),self._total_device_size))74"device to tcp data:%s"%(logger.print_list2hex(list(bytearray.fromhex(device_data.encode("hex"))))))75 try:76 self._connect_socket.send(bytes(device_data))77"client %d send byte size : %d"%(self._connect_socket.fileno(),len(device_data)))78 except socket.error as e:79 logger.warning("device send to tcp port %s error"%(self._port))80 self._del_connect_socket()81 return 182 def run(self):83 logger.init("tcp2device.log")84 logger.setLevel("warning")85 try:86 self._server_socket = self._create_server_socket(self._port)87 except socket.error 
as e:88 logger.boot("create tcp port %d to device server socket failed:%s" %(self._port, repr(e)))89 return90 else:91 logger.boot("create tcp to device server,server socket fd=%s, port=%d" %(self._server_socket.fileno(), self._port))92 self._inputs.append(self._server_socket)93 while not self._is_stop:94 '''nonboclking mode to poll the socket'''95 read_able_list =, [], [], 0)[0]96 if self._server_socket in read_able_list:97 client,addr=self._server_socket.accept()98 self._del_connect_socket()99 self._add_connect_socket(client)100 continue101 if self._connect_socket in read_able_list:102 try:103 self._tcp_to_device()104 except Exception as e:105 logger.error("tcp port %s to deivce error:%s"%(self._port, repr(e)))106 break107 try:108 ret_device2tcp = self._device_to_tcp()109 except Exception as e:110 logger.error("device to tcp port %s error:%s"%(self._port, repr(e)))111 break112 if ret_device2tcp == 0 and len(read_able_list) == 0:113 time.sleep(0.1)114 self._del_connect_socket()115 self._server_socket.close()116 def stop(self):...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:


You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?