How to use recv_nonblocking method in Airtest

Best Python code snippet using Airtest

udp_interface.py

Source:udp_interface.py Github

copy

Full Screen

...68            self.__sockI.settimeout(None)69        except socket.timeout:70            return []71        res = [data]72        nb = self.recv_nonblocking()73        res = res + nb74        return res75    def recv_nonblocking(self):76        """77        This returns a list of bytestrings (all messages in the buffer)78        Returns an empty list if no message is present in the buffer79        Reads everything in the buffer80        """81        assert self.__sockI is not None, "receiver not initialized"82        res = []83        while True:84            try:85                if not WINDOWS:86                    data, _ = self.__sockI.recvfrom(self.__buffersize, socket.MSG_DONTWAIT)87                else:88                    self.__sockI.setblocking(False)89                    data, _ = self.__sockI.recvfrom(self.__buffersize)90            except IOError:91                return res92            finally:93                if WINDOWS:94                    self.__sockI.setblocking(True)95            res += [data]96def test_send(interface, msg):97    print(f"sending: {msg}")98    interface.send(msg)99def test_recv(interface, timeout=None):100    print(f"receiving (blocking)...")101    res = interface.recv(timeout)102    print(f"received: {res}")103def test_recv_nonblocking(interface):104    print(f"receiving (non blocking)...")105    res = interface.recv_nonblocking()106    print(f"received: {res}")107def main(args):108    # standard library imports109    import time110    ip_send = args.ipsend111    ip_recv = args.iprecv112    port_send = args.portsend113    port_recv = args.portrecv114    conn = UDPInterface()115    conn.init_sender(ip_send, port_send)116    conn.init_receiver(ip_recv, port_recv)117    test_send(conn, b"hello 0")118    test_send(conn, b"hello 1")119    test_recv(conn)120    test_send(conn, b"hello 2")121    test_send(conn, b"hello 3")122    test_recv_nonblocking(conn)123    test_recv_nonblocking(conn)124    test_send(conn, b"hello 4")125    test_recv(conn, timeout=1.0)126    test_recv(conn, timeout=1.0)127if __name__ == "__main__":128    # standard library imports129    import argparse130    parser = argparse.ArgumentParser()131    parser.add_argument('--ipsend', type=str, default="127.0.0.1", help='IP address of the drone if any.')132    parser.add_argument('--iprecv', type=str, default="127.0.0.1", help='local IP address if any.')133    parser.add_argument('--portsend', type=int, default=8989, help='Port to send udp messages to.')134    parser.add_argument('--portrecv', type=int, default=8989, help='Port to reveive udp messages from.')135    args = parser.parse_args()...

Full Screen

Full Screen

safesocket.py

Source:safesocket.py Github

copy

Full Screen

...37            ret = None38        finally:39            self.sock.settimeout(None)40        return ret41    def recv_nonblocking(self, size):42        self.sock.settimeout(0)43        try:44            ret = self.recv(size)45        except(socket.error) as e:46            #10035 no data when nonblocking47            if e.args[0] == 10035: #errno.EWOULDBLOCK: 尼玛errno似乎不一致48                ret = None49            #10053 connection abort by client50            #10054 connection reset by peer51            elif e.args[0] in [10053, 10054]: #errno.ECONNABORTED:52                raise53            else:54                raise55        return ret...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run Airtest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful