How to use recv_nonblocking method in Airtest

Best Python code snippet using Airtest

udp_interface.py

Source:udp_interface.py Github

copy

Full Screen

...68 self.__sockI.settimeout(None)69 except socket.timeout:70 return []71 res = [data]72 nb = self.recv_nonblocking()73 res = res + nb74 return res75 def recv_nonblocking(self):76 """77 This returns a list of bytestrings (all messages in the buffer)78 Returns an empty list if no message is present in the buffer79 Reads everything in the buffer80 """81 assert self.__sockI is not None, "receiver not initialized"82 res = []83 while True:84 try:85 if not WINDOWS:86 data, _ = self.__sockI.recvfrom(self.__buffersize, socket.MSG_DONTWAIT)87 else:88 self.__sockI.setblocking(False)89 data, _ = self.__sockI.recvfrom(self.__buffersize)90 except IOError:91 return res92 finally:93 if WINDOWS:94 self.__sockI.setblocking(True)95 res += [data]96def test_send(interface, msg):97 print(f"sending: {msg}")98 interface.send(msg)99def test_recv(interface, timeout=None):100 print(f"receiving (blocking)...")101 res = interface.recv(timeout)102 print(f"received: {res}")103def test_recv_nonblocking(interface):104 print(f"receiving (non blocking)...")105 res = interface.recv_nonblocking()106 print(f"received: {res}")107def main(args):108 # standard library imports109 import time110 ip_send = args.ipsend111 ip_recv = args.iprecv112 port_send = args.portsend113 port_recv = args.portrecv114 conn = UDPInterface()115 conn.init_sender(ip_send, port_send)116 conn.init_receiver(ip_recv, port_recv)117 test_send(conn, b"hello 0")118 test_send(conn, b"hello 1")119 test_recv(conn)120 test_send(conn, b"hello 2")121 test_send(conn, b"hello 3")122 test_recv_nonblocking(conn)123 test_recv_nonblocking(conn)124 test_send(conn, b"hello 4")125 test_recv(conn, timeout=1.0)126 test_recv(conn, timeout=1.0)127if __name__ == "__main__":128 # standard library imports129 import argparse130 parser = argparse.ArgumentParser()131 parser.add_argument('--ipsend', type=str, default="127.0.0.1", help='IP address of the drone if any.')132 parser.add_argument('--iprecv', type=str, default="127.0.0.1", help='local IP address if any.')133 parser.add_argument('--portsend', type=int, default=8989, help='Port to send udp messages to.')134 parser.add_argument('--portrecv', type=int, default=8989, help='Port to reveive udp messages from.')135 args = parser.parse_args()...

Full Screen

Full Screen

safesocket.py

Source:safesocket.py Github

copy

Full Screen

...37 ret = None38 finally:39 self.sock.settimeout(None)40 return ret41 def recv_nonblocking(self, size):42 self.sock.settimeout(0)43 try:44 ret = self.recv(size)45 except(socket.error) as e:46 #10035 no data when nonblocking47 if e.args[0] == 10035: #errno.EWOULDBLOCK: 尼玛errno似乎不一致48 ret = None49 #10053 connection abort by client50 #10054 connection reset by peer51 elif e.args[0] in [10053, 10054]: #errno.ECONNABORTED:52 raise53 else:54 raise55 return ret...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run Airtest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful