Best Python code snippet using molotov_python
selector_events.py
Source: selector_events.py
"""Event loop using a selector and related classes.

A selector is a "notify-when-ready" multiplexer.  For a subclass which
also includes support for signal handling, see the unix_events sub-module.
"""

__all__ = ['BaseSelectorEventLoop']

import collections
import errno
import functools
import socket
import sys
import warnings
try:
    import ssl
except ImportError:  # pragma: no cover
    ssl = None

from . import base_events
from . import constants
from . import events
from . import futures
from . import selectors
from . import transports
from . import sslproto
from .coroutines import coroutine
from .log import logger


def _test_selector_event(selector, fd, event):
    """Return True if 'selector' monitors 'event' events for 'fd'.

    Returns False when 'fd' is not registered with the selector at all.
    """
    try:
        key = selector.get_key(fd)
    except KeyError:
        return False
    else:
        return bool(key.events & event)


class BaseSelectorEventLoop(base_events.BaseEventLoop):
    """Selector event loop.

    See events.EventLoop for API specification.
    """

    def __init__(self, selector=None):
        """Create the loop; use selectors.DefaultSelector() if none given."""
        super().__init__()

        if selector is None:
            selector = selectors.DefaultSelector()
        logger.debug('Using selector: %s', selector.__class__.__name__)
        self._selector = selector
        self._make_self_pipe()

    def _make_socket_transport(self, sock, protocol, waiter=None, *,
                               extra=None, server=None):
        """Return a _SelectorSocketTransport wrapping 'sock'."""
        return _SelectorSocketTransport(self, sock, protocol, waiter,
                                        extra, server)

    def _make_ssl_transport(self, rawsock, protocol, sslcontext, waiter=None,
                            *, server_side=False, server_hostname=None,
                            extra=None, server=None):
        """Return an SSL transport for 'rawsock'.

        Falls back to the legacy transport when sslproto reports that it
        is not available; otherwise an SSLProtocol is layered over a plain
        socket transport and its application transport is returned.
        """
        if not sslproto._is_sslproto_available():
            return self._make_legacy_ssl_transport(
                rawsock, protocol, sslcontext, waiter,
                server_side=server_side, server_hostname=server_hostname,
                extra=extra, server=server)

        ssl_protocol = sslproto.SSLProtocol(self, protocol, sslcontext, waiter,
                                            server_side, server_hostname)
        # The socket transport is created only for its side effects: it
        # attaches itself to ssl_protocol; the caller gets the app transport.
        _SelectorSocketTransport(self, rawsock, ssl_protocol,
                                 extra=extra, server=server)
        return ssl_protocol._app_transport

    def _make_legacy_ssl_transport(self, rawsock, protocol, sslcontext,
                                   waiter, *,
                                   server_side=False, server_hostname=None,
                                   extra=None, server=None):
        """Return a _SelectorSslTransport wrapping 'rawsock'."""
        # Use the legacy API: SSL_write, SSL_read, etc. The legacy API is used
        # on Python 3.4 and older, when ssl.MemoryBIO is not available.
        return _SelectorSslTransport(
            self, rawsock, protocol, sslcontext, waiter,
            server_side, server_hostname, extra, server)

    def _make_datagram_transport(self, sock, protocol,
                                 address=None, waiter=None, extra=None):
        """Return a _SelectorDatagramTransport wrapping 'sock'."""
        return _SelectorDatagramTransport(self, sock, protocol,
                                          address, waiter, extra)

    def close(self):
        """Close the loop, its self-pipe and its selector.

        Raises RuntimeError if the loop is running; does nothing if the
        loop is already closed.
        """
        if self.is_running():
            raise RuntimeError("Cannot close a running event loop")
        if self.is_closed():
            return
        self._close_self_pipe()
        super().close()
        if self._selector is not None:
            self._selector.close()
            self._selector = None

    def _socketpair(self):
        """Return a pair of connected sockets; subclasses must override."""
        raise NotImplementedError

    def _close_self_pipe(self):
        """Unregister and close both ends of the self-pipe."""
        self.remove_reader(self._ssock.fileno())
        self._ssock.close()
        self._ssock = None
        self._csock.close()
        self._csock = None
        self._internal_fds -= 1

    def _make_self_pipe(self):
        """Create the non-blocking self-pipe and start reading from it."""
        # A self-socket, really. :-)
        self._ssock, self._csock = self._socketpair()
        self._ssock.setblocking(False)
        self._csock.setblocking(False)
        self._internal_fds += 1
        self.add_reader(self._ssock.fileno(), self._read_from_self)

    def _process_self_data(self, data):
        """Hook called with each chunk read from the self-pipe (no-op)."""
        pass

    def _read_from_self(self):
        """Drain the self-pipe, passing each chunk to _process_self_data()."""
        while True:
            try:
                data = self._ssock.recv(4096)
                if not data:
                    break
                self._process_self_data(data)
            except InterruptedError:
                continue
            except BlockingIOError:
                break

    def _write_to_self(self):
        """Write a null byte to the self-pipe; OSError is swallowed."""
        # This may be called from a different thread, possibly after
        # _close_self_pipe() has been called or even while it is
        # running.  Guard for self._csock being None or closed.  When
        # a socket is closed, send() raises OSError (with errno set to
        # EBADF, but let's not rely on the exact error code).
        csock = self._csock
        if csock is not None:
            try:
                csock.send(b'\0')
            except OSError:
                if self._debug:
                    logger.debug("Fail to write a null byte into the "
                                 "self-pipe socket",
                                 exc_info=True)

    def _start_serving(self, protocol_factory, sock,
                       sslcontext=None, server=None):
        """Register _accept_connection() as the reader for 'sock'."""
        self.add_reader(sock.fileno(), self._accept_connection,
                        protocol_factory, sock, sslcontext, server)

    def _accept_connection(self, protocol_factory, sock,
                           sslcontext=None, server=None):
        """Accept one connection on 'sock' and schedule its setup task.

        Resource-exhaustion errors are reported to the exception handler
        and accepting is retried after constants.ACCEPT_RETRY_DELAY; other
        OSErrors are re-raised.
        """
        try:
            conn, addr = sock.accept()
            if self._debug:
                logger.debug("%r got a new connection from %r: %r",
                             server, addr, conn)
            conn.setblocking(False)
        except (BlockingIOError, InterruptedError, ConnectionAbortedError):
            pass  # False alarm.
        except OSError as exc:
            # There's nowhere to send the error, so just log it.
            if exc.errno in (errno.EMFILE, errno.ENFILE,
                             errno.ENOBUFS, errno.ENOMEM):
                # Some platforms (e.g. Linux) keep reporting the FD as
                # ready, so we remove the read handler temporarily.
                # We'll try again in a while.
                self.call_exception_handler({
                    'message': 'socket.accept() out of system resource',
                    'exception': exc,
                    'socket': sock,
                })
                self.remove_reader(sock.fileno())
                self.call_later(constants.ACCEPT_RETRY_DELAY,
                                self._start_serving,
                                protocol_factory, sock, sslcontext, server)
            else:
                raise  # The event loop will catch, log and ignore it.
        else:
            extra = {'peername': addr}
            accept = self._accept_connection2(protocol_factory, conn, extra,
                                              sslcontext, server)
            self.create_task(accept)

    @coroutine
    def _accept_connection2(self, protocol_factory, conn, extra,
                            sslcontext=None, server=None):
        """Create protocol and transport for 'conn' and wait until ready.

        Exceptions are reported to the loop's exception handler only when
        the loop is in debug mode; they are never propagated.
        """
        protocol = None
        transport = None
        try:
            protocol = protocol_factory()
            waiter = futures.Future(loop=self)
            if sslcontext:
                transport = self._make_ssl_transport(
                    conn, protocol, sslcontext, waiter=waiter,
                    server_side=True, extra=extra, server=server)
            else:
                transport = self._make_socket_transport(
                    conn, protocol, waiter=waiter, extra=extra,
                    server=server)

            try:
                yield from waiter
            except BaseException:
                # Close the transport on any failure (including
                # cancellation), then let the error continue upwards.
                transport.close()
                raise

            # It's now up to the protocol to handle the connection.
        except Exception as exc:
            if self._debug:
                context = {
                    'message': ('Error on transport creation '
                                'for incoming connection'),
                    'exception': exc,
                }
                if protocol is not None:
                    context['protocol'] = protocol
                if transport is not None:
                    context['transport'] = transport
                self.call_exception_handler(context)

    def add_reader(self, fd, callback, *args):
        """Add a reader callback."""
        self._check_closed()
        handle = events.Handle(callback, args, self)
        try:
            key = self._selector.get_key(fd)
        except KeyError:
            self._selector.register(fd, selectors.EVENT_READ,
                                    (handle, None))
        else:
            mask, (reader, writer) = key.events, key.data
            self._selector.modify(fd, mask | selectors.EVENT_READ,
                                  (handle, writer))
            # The previous reader (if any) has been replaced: cancel it.
            if reader is not None:
                reader.cancel()

    def remove_reader(self, fd):
        """Remove a reader callback.

        Return True if a reader was registered and has been cancelled,
        False otherwise (including when the loop is closed).
        """
        if self.is_closed():
            return False
        try:
            key = self._selector.get_key(fd)
        except KeyError:
            return False
        else:
            mask, (reader, writer) = key.events, key.data
            mask &= ~selectors.EVENT_READ
            if not mask:
                self._selector.unregister(fd)
            else:
                self._selector.modify(fd, mask, (None, writer))

            if reader is not None:
                reader.cancel()
                return True
            else:
                return False

    def add_writer(self, fd, callback, *args):
        """Add a writer callback."""
        self._check_closed()
        handle = events.Handle(callback, args, self)
        try:
            key = self._selector.get_key(fd)
        except KeyError:
            self._selector.register(fd, selectors.EVENT_WRITE,
                                    (None, handle))
        else:
            mask, (reader, writer) = key.events, key.data
            self._selector.modify(fd, mask | selectors.EVENT_WRITE,
                                  (reader, handle))
            # The previous writer (if any) has been replaced: cancel it.
            if writer is not None:
                writer.cancel()

    def remove_writer(self, fd):
        """Remove a writer callback.

        Return True if a writer was registered and has been cancelled,
        False otherwise (including when the loop is closed).
        """
        if self.is_closed():
            return False
        try:
            key = self._selector.get_key(fd)
        except KeyError:
            return False
        else:
            mask, (reader, writer) = key.events, key.data
            # Remove both writer and connector.
            mask &= ~selectors.EVENT_WRITE
            if not mask:
                self._selector.unregister(fd)
            else:
                self._selector.modify(fd, mask, (reader, None))

            if writer is not None:
                writer.cancel()
                return True
            else:
                return False

    def sock_recv(self, sock, n):
        """Receive data from the socket.

        The return value is a bytes object representing the data received.
        The maximum amount of data to be received at once is specified by
        n.

        This method is a coroutine.
        """
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")
        fut = futures.Future(loop=self)
        self._sock_recv(fut, False, sock, n)
        return fut

    def _sock_recv(self, fut, registered, sock, n):
        # _sock_recv() can add itself as an I/O callback if the operation can't
        # be done immediately. Don't use it directly, call sock_recv().
        fd = sock.fileno()
        if registered:
            # Remove the callback early.  It should be rare that the
            # selector says the fd is ready but the call still returns
            # EAGAIN, and I am willing to take a hit in that case in
            # order to simplify the common case.
            self.remove_reader(fd)
        if fut.cancelled():
            return
        try:
            data = sock.recv(n)
        except (BlockingIOError, InterruptedError):
            self.add_reader(fd, self._sock_recv, fut, True, sock, n)
        except Exception as exc:
            fut.set_exception(exc)
        else:
            fut.set_result(data)

    def sock_sendall(self, sock, data):
        """Send data to the socket.

        The socket must be connected to a remote socket. This method continues
        to send data from data until either all data has been sent or an
        error occurs. None is returned on success. On error, an exception is
        raised, and there is no way to determine how much data, if any, was
        successfully processed by the receiving end of the connection.

        This method is a coroutine.
        """
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")
        fut = futures.Future(loop=self)
        if data:
            self._sock_sendall(fut, False, sock, data)
        else:
            fut.set_result(None)
        return fut

    def _sock_sendall(self, fut, registered, sock, data):
        """Send as much of 'data' as possible; re-register for the rest."""
        fd = sock.fileno()

        if registered:
            self.remove_writer(fd)
        if fut.cancelled():
            return

        try:
            n = sock.send(data)
        except (BlockingIOError, InterruptedError):
            n = 0
        except Exception as exc:
            fut.set_exception(exc)
            return

        if n == len(data):
            fut.set_result(None)
        else:
            if n:
                data = data[n:]
            self.add_writer(fd, self._sock_sendall, fut, True, sock, data)

    def sock_connect(self, sock, address):
        """Connect to a remote socket at address.

        The address must be already resolved to avoid the trap of hanging the
        entire event loop when the address requires doing a DNS lookup. For
        example, it must be an IP address, not an hostname, for AF_INET and
        AF_INET6 address families. Use getaddrinfo() to resolve the hostname
        asynchronously.

        This method is a coroutine.
        """
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")
        fut = futures.Future(loop=self)
        try:
            if self._debug:
                base_events._check_resolved_address(sock, address)
        except ValueError as err:
            fut.set_exception(err)
        else:
            self._sock_connect(fut, sock, address)
        return fut

    def _sock_connect(self, fut, sock, address):
        """Start connecting; finish via _sock_connect_cb() if it blocks."""
        fd = sock.fileno()
        try:
            while True:
                try:
                    sock.connect(address)
                except InterruptedError:
                    continue
                else:
                    break
        except BlockingIOError:
            # Make sure the writer is removed once the future is done,
            # whatever the outcome (result, exception or cancellation).
            fut.add_done_callback(functools.partial(self._sock_connect_done,
                                                    fd))
            self.add_writer(fd, self._sock_connect_cb, fut, sock, address)
        except Exception as exc:
            fut.set_exception(exc)
        else:
            fut.set_result(None)

    def _sock_connect_done(self, fd, fut):
        """Done callback of the connect future: stop watching 'fd'."""
        self.remove_writer(fd)

    def _sock_connect_cb(self, fut, sock, address):
        """Writer callback: resolve 'fut' from the socket's SO_ERROR."""
        if fut.cancelled():
            return

        try:
            err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
            if err != 0:
                # Jump to any except clause below.
                raise OSError(err, 'Connect call failed %s' % (address,))
        except (BlockingIOError, InterruptedError):
            # socket is still registered, the callback will be retried later
            pass
        except Exception as exc:
            fut.set_exception(exc)
        else:
            fut.set_result(None)

    def sock_accept(self, sock):
        """Accept a connection.

        The socket must be bound to an address and listening for connections.
        The return value is a pair (conn, address) where conn is a new socket
        object usable to send and receive data on the connection, and address
        is the address bound to the socket on the other end of the connection.

        This method is a coroutine.
        """
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")
        fut = futures.Future(loop=self)
        self._sock_accept(fut, False, sock)
        return fut

    def _sock_accept(self, fut, registered, sock):
        """Try to accept now; otherwise retry when 'sock' is readable."""
        fd = sock.fileno()
        if registered:
            self.remove_reader(fd)
        if fut.cancelled():
            return
        try:
            conn, address = sock.accept()
            conn.setblocking(False)
        except (BlockingIOError, InterruptedError):
            self.add_reader(fd, self._sock_accept, fut, True, sock)
        except Exception as exc:
            fut.set_exception(exc)
        else:
            fut.set_result((conn, address))

    def _process_events(self, event_list):
        """Schedule the handles of ready events; drop cancelled ones."""
        for key, mask in event_list:
            fileobj, (reader, writer) = key.fileobj, key.data
            if mask & selectors.EVENT_READ and reader is not None:
                if reader._cancelled:
                    self.remove_reader(fileobj)
                else:
                    self._add_callback(reader)
            if mask & selectors.EVENT_WRITE and writer is not None:
                if writer._cancelled:
                    self.remove_writer(fileobj)
                else:
                    self._add_callback(writer)

    def _stop_serving(self, sock):
        """Stop accepting on 'sock' and close it."""
        self.remove_reader(sock.fileno())
        sock.close()


class _SelectorTransport(transports._FlowControlMixin,
                         transports.Transport):
    """Base class of the selector-based socket transports."""

    max_size = 256 * 1024  # Buffer size passed to recv().

    _buffer_factory = bytearray  # Constructs initial value for self._buffer.

    # Attribute used in the destructor: it must be set even if the constructor
    # is not called (see _SelectorSslTransport which may start by raising an
    # exception)
    _sock = None

    def __init__(self, loop, sock, protocol, extra=None, server=None):
        """Record socket details in 'extra' and attach to 'server' if any."""
        super().__init__(extra, loop)
        self._extra['socket'] = sock
        self._extra['sockname'] = sock.getsockname()
        if 'peername' not in self._extra:
            try:
                self._extra['peername'] = sock.getpeername()
            except socket.error:
                self._extra['peername'] = None
        self._sock = sock
        self._sock_fd = sock.fileno()
        self._protocol = protocol
        self._protocol_connected = True
        self._server = server
        self._buffer = self._buffer_factory()
        self._conn_lost = 0  # Set when call to connection_lost scheduled.
        self._closing = False  # Set when close() called.
        if self._server is not None:
            self._server._attach()

    def __repr__(self):
        """Describe the transport: state, fd and read/write polling status."""
        info = [self.__class__.__name__]
        if self._sock is None:
            info.append('closed')
        elif self._closing:
            info.append('closing')
        info.append('fd=%s' % self._sock_fd)
        # test if the transport was closed
        if self._loop is not None:
            polling = _test_selector_event(self._loop._selector,
                                           self._sock_fd, selectors.EVENT_READ)
            if polling:
                info.append('read=polling')
            else:
                info.append('read=idle')

            polling = _test_selector_event(self._loop._selector,
                                           self._sock_fd,
                                           selectors.EVENT_WRITE)
            if polling:
                state = 'polling'
            else:
                state = 'idle'

            bufsize = self.get_write_buffer_size()
            info.append('write=<%s, bufsize=%s>' % (state, bufsize))
        return '<%s>' % ' '.join(info)

    def abort(self):
        """Close the transport immediately, discarding buffered data."""
        self._force_close(None)

    def close(self):
        """Stop reading; connection_lost is scheduled once the buffer is empty."""
        if self._closing:
            return
        self._closing = True
        self._loop.remove_reader(self._sock_fd)
        if not self._buffer:
            self._conn_lost += 1
            self._loop.call_soon(self._call_connection_lost, None)

    # On Python 3.3 and older, objects with a destructor part of a reference
    # cycle are never destroyed. It's not more the case on Python 3.4 thanks
    # to the PEP 442.
    if sys.version_info >= (3, 4):
        def __del__(self):
            if self._sock is not None:
                warnings.warn("unclosed transport %r" % self, ResourceWarning)
                self._sock.close()

    def _fatal_error(self, exc, message='Fatal error on transport'):
        """Report 'exc' and force-close the transport.

        Broken-pipe / reset / aborted errors are only logged in debug mode;
        anything else goes to the loop's exception handler.
        """
        # Should be called from exception handler only.
        if isinstance(exc, (BrokenPipeError,
                            ConnectionResetError, ConnectionAbortedError)):
            if self._loop.get_debug():
                logger.debug("%r: %s", self, message, exc_info=True)
        else:
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        self._force_close(exc)

    def _force_close(self, exc):
        """Drop pending output, stop polling and schedule connection_lost."""
        if self._conn_lost:
            return
        if self._buffer:
            self._buffer.clear()
            self._loop.remove_writer(self._sock_fd)
        if not self._closing:
            self._closing = True
            self._loop.remove_reader(self._sock_fd)
        self._conn_lost += 1
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then close the socket and drop references."""
        try:
            if self._protocol_connected:
                self._protocol.connection_lost(exc)
        finally:
            self._sock.close()
            self._sock = None
            self._protocol = None
            self._loop = None
            server = self._server
            if server is not None:
                server._detach()
                self._server = None

    def get_write_buffer_size(self):
        """Return the number of bytes currently in the write buffer."""
        return len(self._buffer)


class _SelectorSocketTransport(_SelectorTransport):
    """Transport for a plain (non-SSL) stream socket."""

    def __init__(self, loop, sock, protocol, waiter=None,
                 extra=None, server=None):
        """Schedule connection_made(), start reading, then wake 'waiter'."""
        super().__init__(loop, sock, protocol, extra, server)
        self._eof = False
        self._paused = False

        self._loop.call_soon(self._protocol.connection_made, self)
        # only start reading when connection_made() has been called
        self._loop.call_soon(self._loop.add_reader,
                             self._sock_fd, self._read_ready)
        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(waiter._set_result_unless_cancelled, None)

    def pause_reading(self):
        """Stop polling the socket for reads.

        Raises RuntimeError if the transport is closing or already paused.
        """
        if self._closing:
            raise RuntimeError('Cannot pause_reading() when closing')
        if self._paused:
            raise RuntimeError('Already paused')
        self._paused = True
        self._loop.remove_reader(self._sock_fd)
        if self._loop.get_debug():
            logger.debug("%r pauses reading", self)

    def resume_reading(self):
        """Resume polling for reads; raises RuntimeError if not paused."""
        if not self._paused:
            raise RuntimeError('Not paused')
        self._paused = False
        if self._closing:
            return
        self._loop.add_reader(self._sock_fd, self._read_ready)
        if self._loop.get_debug():
            logger.debug("%r resumes reading", self)

    def _read_ready(self):
        """Reader callback: deliver received data or EOF to the protocol."""
        try:
            data = self._sock.recv(self.max_size)
        except (BlockingIOError, InterruptedError):
            pass
        except Exception as exc:
            self._fatal_error(exc, 'Fatal read error on socket transport')
        else:
            if data:
                self._protocol.data_received(data)
            else:
                if self._loop.get_debug():
                    logger.debug("%r received EOF", self)
                keep_open = self._protocol.eof_received()
                if keep_open:
                    # We're keeping the connection open so the
                    # protocol can write more, but we still can't
                    # receive more, so remove the reader callback.
                    self._loop.remove_reader(self._sock_fd)
                else:
                    self.close()

    def write(self, data):
        """Send 'data' now if possible, buffering whatever is left.

        Raises TypeError if 'data' is not bytes, bytearray or memoryview,
        and RuntimeError if write_eof() has already been called.
        """
        if not isinstance(data, (bytes, bytearray, memoryview)):
            # Interpolate the type into the message (it used to be passed
            # as a second exception argument and was never formatted).
            raise TypeError('data argument must be byte-ish (%r)'
                            % (type(data),))
        if self._eof:
            raise RuntimeError('Cannot call write() after write_eof()')
        if not data:
            return

        if self._conn_lost:
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('socket.send() raised exception.')
            self._conn_lost += 1
            return

        if not self._buffer:
            # Optimization: try to send now.
            try:
                n = self._sock.send(data)
            except (BlockingIOError, InterruptedError):
                pass
            except Exception as exc:
                self._fatal_error(exc, 'Fatal write error on socket transport')
                return
            else:
                data = data[n:]
                if not data:
                    return
            # Not all was written; register write handler.
            self._loop.add_writer(self._sock_fd, self._write_ready)

        # Add it to the buffer.
        self._buffer.extend(data)
        self._maybe_pause_protocol()

    def _write_ready(self):
        """Writer callback: flush the buffer and finish close/EOF if empty."""
        assert self._buffer, 'Data should not be empty'

        try:
            n = self._sock.send(self._buffer)
        except (BlockingIOError, InterruptedError):
            pass
        except Exception as exc:
            self._loop.remove_writer(self._sock_fd)
            self._buffer.clear()
            self._fatal_error(exc, 'Fatal write error on socket transport')
        else:
            if n:
                del self._buffer[:n]
            self._maybe_resume_protocol()  # May append to buffer.
            if not self._buffer:
                self._loop.remove_writer(self._sock_fd)
                if self._closing:
                    self._call_connection_lost(None)
                elif self._eof:
                    self._sock.shutdown(socket.SHUT_WR)

    def write_eof(self):
        """Shut down the write side once buffered data has been sent."""
        if self._eof:
            return
        self._eof = True
        if not self._buffer:
            self._sock.shutdown(socket.SHUT_WR)

    def can_write_eof(self):
        """Return True: this transport supports write_eof()."""
        return True


class _SelectorSslTransport(_SelectorTransport):
    """Legacy SSL transport built on an SSL-wrapped socket."""

    _buffer_factory = bytearray

    def __init__(self, loop, rawsock, protocol, sslcontext, waiter=None,
                 server_side=False, server_hostname=None,
                 extra=None, server=None):
        """Wrap 'rawsock' with SSL and start the handshake.

        Raises RuntimeError if the ssl module is not available.
        """
        if ssl is None:
            raise RuntimeError('stdlib ssl module not available')

        if not sslcontext:
            sslcontext = sslproto._create_transport_context(
                server_side, server_hostname)

        wrap_kwargs = {
            'server_side': server_side,
            'do_handshake_on_connect': False,
        }
        if server_hostname and not server_side:
            wrap_kwargs['server_hostname'] = server_hostname
        sslsock = sslcontext.wrap_socket(rawsock, **wrap_kwargs)

        super().__init__(loop, sslsock, protocol, extra, server)
        # the protocol connection is only made after the SSL handshake
        self._protocol_connected = False

        self._server_hostname = server_hostname
        self._waiter = waiter
        self._sslcontext = sslcontext
        self._paused = False

        # SSL-specific extra info.  (peercert is set later)
        self._extra.update(sslcontext=sslcontext)

        if self._loop.get_debug():
            logger.debug("%r starts SSL handshake", self)
            start_time = self._loop.time()
        else:
            start_time = None
        self._on_handshake(start_time)

    def _wakeup_waiter(self, exc=None):
        """Resolve the waiter (result None, or 'exc') unless cancelled."""
        if self._waiter is None:
            return
        if not self._waiter.cancelled():
            if exc is not None:
                self._waiter.set_exception(exc)
            else:
                self._waiter.set_result(None)
        self._waiter = None

    def _on_handshake(self, start_time):
        """Drive the SSL handshake; re-registers itself until it completes.

        'start_time' is the loop time at which the handshake started, or
        None when the loop was not in debug mode at that point.
        """
        try:
            self._sock.do_handshake()
        except ssl.SSLWantReadError:
            self._loop.add_reader(self._sock_fd,
                                  self._on_handshake, start_time)
            return
        except ssl.SSLWantWriteError:
            self._loop.add_writer(self._sock_fd,
                                  self._on_handshake, start_time)
            return
        except BaseException as exc:
            if self._loop.get_debug():
                logger.warning("%r: SSL handshake failed",
                               self, exc_info=True)
            self._loop.remove_reader(self._sock_fd)
            self._loop.remove_writer(self._sock_fd)
            self._sock.close()
            self._wakeup_waiter(exc)
            if isinstance(exc, Exception):
                return
            else:
                raise

        self._loop.remove_reader(self._sock_fd)
        self._loop.remove_writer(self._sock_fd)

        peercert = self._sock.getpeercert()
        if not hasattr(self._sslcontext, 'check_hostname'):
            # Verify hostname if requested, Python 3.4+ uses check_hostname
            # and checks the hostname in do_handshake()
            if (self._server_hostname and
                    self._sslcontext.verify_mode != ssl.CERT_NONE):
                try:
                    ssl.match_hostname(peercert, self._server_hostname)
                except Exception as exc:
                    if self._loop.get_debug():
                        logger.warning("%r: SSL handshake failed "
                                       "on matching the hostname",
                                       self, exc_info=True)
                    self._sock.close()
                    self._wakeup_waiter(exc)
                    return

        # Add extra info that becomes available after handshake.
        self._extra.update(peercert=peercert,
                           cipher=self._sock.cipher(),
                           compression=self._sock.compression(),
                           )

        self._read_wants_write = False
        self._write_wants_read = False
        self._loop.add_reader(self._sock_fd, self._read_ready)
        self._protocol_connected = True
        self._loop.call_soon(self._protocol.connection_made, self)
        # only wake up the waiter when connection_made() has been called
        self._loop.call_soon(self._wakeup_waiter)

        # start_time is None when debug mode was off at handshake start;
        # skip the timing log then instead of failing on None arithmetic.
        if start_time is not None and self._loop.get_debug():
            dt = self._loop.time() - start_time
            logger.debug("%r: SSL handshake took %.1f ms", self, dt * 1e3)

    def pause_reading(self):
        """Stop polling the socket for reads.

        Raises RuntimeError if the transport is closing or already paused.
        """
        # XXX This is a bit icky, given the comment at the top of
        # _read_ready().  Is it possible to evoke a deadlock?  I don't
        # know, although it doesn't look like it; write() will still
        # accept more data for the buffer and eventually the app will
        # call resume_reading() again, and things will flow again.

        if self._closing:
            raise RuntimeError('Cannot pause_reading() when closing')
        if self._paused:
            raise RuntimeError('Already paused')
        self._paused = True
        self._loop.remove_reader(self._sock_fd)
        if self._loop.get_debug():
            logger.debug("%r pauses reading", self)

    def resume_reading(self):
        """Resume polling for reads; raises RuntimeError if not paused."""
        if not self._paused:
            raise RuntimeError('Not paused')
        self._paused = False
        if self._closing:
            return
        self._loop.add_reader(self._sock_fd, self._read_ready)
        if self._loop.get_debug():
            logger.debug("%r resumes reading", self)

    def _read_ready(self):
        """Reader callback: retry a blocked write, then read from the socket."""
        if self._write_wants_read:
            self._write_wants_read = False
            self._write_ready()

            if self._buffer:
                self._loop.add_writer(self._sock_fd, self._write_ready)

        try:
            data = self._sock.recv(self.max_size)
        except (BlockingIOError, InterruptedError, ssl.SSLWantReadError):
            pass
        except ssl.SSLWantWriteError:
            # The read needs the socket to be writable first: swap the
            # reader for a writer and remember to retry the read.
            self._read_wants_write = True
            self._loop.remove_reader(self._sock_fd)
            self._loop.add_writer(self._sock_fd, self._write_ready)
        except Exception as exc:
            self._fatal_error(exc, 'Fatal read error on SSL transport')
        else:
            if data:
                self._protocol.data_received(data)
            else:
                try:
                    if self._loop.get_debug():
                        logger.debug("%r received EOF", self)
                    keep_open = self._protocol.eof_received()
                    if keep_open:
                        logger.warning('returning true from eof_received() '
                                       'has no effect when using ssl')
                finally:
                    self.close()

    def _write_ready(self):
        """Writer callback: retry a blocked read, then flush the buffer."""
        if self._read_wants_write:
            self._read_wants_write = False
            self._read_ready()

            if not (self._paused or self._closing):
                self._loop.add_reader(self._sock_fd, self._read_ready)

        if self._buffer:
            try:
                n = self._sock.send(self._buffer)
            except (BlockingIOError, InterruptedError, ssl.SSLWantWriteError):
                n = 0
            except ssl.SSLWantReadError:
                # The write needs the socket to be readable first: stop
                # polling for writes and remember to retry the write.
                n = 0
                self._loop.remove_writer(self._sock_fd)
                self._write_wants_read = True
            except Exception as exc:
                self._loop.remove_writer(self._sock_fd)
                self._buffer.clear()
                self._fatal_error(exc, 'Fatal write error on SSL transport')
                return

            if n:
                del self._buffer[:n]

        self._maybe_resume_protocol()  # May append to buffer.

        if not self._buffer:
            self._loop.remove_writer(self._sock_fd)
            if self._closing:
                self._call_connection_lost(None)

    def write(self, data):
        if not isinstance(data, (bytes, bytearray, memoryview)):
            raise TypeError('data argument must be byte-ish (%r)'
                            % (type(data),))
        if not data:
            return

        if self._conn_lost:
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('socket.send() raised exception.')
            self._conn_lost += 1
return845        if not self._buffer:846            self._loop.add_writer(self._sock_fd, self._write_ready)847        # Add it to the buffer.848        self._buffer.extend(data)849        self._maybe_pause_protocol()850    def can_write_eof(self):851        return False852class _SelectorDatagramTransport(_SelectorTransport):853    _buffer_factory = collections.deque854    def __init__(self, loop, sock, protocol, address=None,855                 waiter=None, extra=None):856        super().__init__(loop, sock, protocol, extra)857        self._address = address858        self._loop.call_soon(self._protocol.connection_made, self)859        # only start reading when connection_made() has been called860        self._loop.call_soon(self._loop.add_reader,861                             self._sock_fd, self._read_ready)862        if waiter is not None:863            # only wake up the waiter when connection_made() has been called864            self._loop.call_soon(waiter._set_result_unless_cancelled, None)865    def get_write_buffer_size(self):866        return sum(len(data) for data, _ in self._buffer)867    def _read_ready(self):868        try:869            data, addr = self._sock.recvfrom(self.max_size)870        except (BlockingIOError, InterruptedError):871            pass872        except OSError as exc:873            self._protocol.error_received(exc)874        except Exception as exc:875            self._fatal_error(exc, 'Fatal read error on datagram transport')876        else:877            self._protocol.datagram_received(data, addr)878    def sendto(self, data, addr=None):879        if not isinstance(data, (bytes, bytearray, memoryview)):880            raise TypeError('data argument must be byte-ish (%r)',881                            type(data))882        if not data:883            return884        if self._address and addr not in (None, self._address):885            raise ValueError('Invalid address: must be None or %s' %886                             
(self._address,))887        if self._conn_lost and self._address:888            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:889                logger.warning('socket.send() raised exception.')890            self._conn_lost += 1891            return892        if not self._buffer:893            # Attempt to send it right away first.894            try:895                if self._address:896                    self._sock.send(data)897                else:898                    self._sock.sendto(data, addr)899                return900            except (BlockingIOError, InterruptedError):901                self._loop.add_writer(self._sock_fd, self._sendto_ready)902            except OSError as exc:903                self._protocol.error_received(exc)904                return905            except Exception as exc:906                self._fatal_error(exc,907                                  'Fatal write error on datagram transport')908                return909        # Ensure that what we buffer is immutable.910        self._buffer.append((bytes(data), addr))911        self._maybe_pause_protocol()912    def _sendto_ready(self):913        while self._buffer:914            data, addr = self._buffer.popleft()915            try:916                if self._address:917                    self._sock.send(data)918                else:919                    self._sock.sendto(data, addr)920            except (BlockingIOError, InterruptedError):921                self._buffer.appendleft((data, addr))  # Try again later.922                break923            except OSError as exc:924                self._protocol.error_received(exc)925                return926            except Exception as exc:927                self._fatal_error(exc,928                                  'Fatal write error on datagram transport')929                return930        self._maybe_resume_protocol()  # May append to buffer.931        if not self._buffer:932            
self._loop.remove_writer(self._sock_fd)933            if self._closing:...test_rerun.py
Source:test_rerun.py  
# -*- coding: utf-8 -*-
from bamboo_engine.builder import *  # noqa
from bamboo_engine.engine import Engine
from pipeline.eri.runtime import BambooDjangoRuntime
from ..utils import *  # noqa


def _assert_finished(runtime, nodes, loop):
    """Assert that every node in *nodes* is FINISHED with ``state.loop == loop``."""
    for node in nodes:
        state = runtime.get_state(node.id)
        assert state.name == states.FINISHED
        assert state.loop == loop


def _assert_histories(runtime, node, expected):
    """Assert that the histories recorded for *node* match *expected*.

    *expected* is a list with one dict per history entry, mapping an attribute
    name of the history object (``inputs``, ``outputs``, ``loop``, ``retry``,
    ``skip``) to its expected value. Only the listed attributes are checked.
    Bool expectations are compared by identity, everything else by equality.
    """
    histories = runtime.get_histories(node.id)
    assert len(histories) == len(expected)
    for index, (history, fields) in enumerate(zip(histories, expected)):
        for attr, value in fields.items():
            actual = getattr(history, attr)
            if isinstance(value, bool):
                assert actual is value, "histories[%d].%s" % (index, attr)
            else:
                assert actual == value, "histories[%d].%s" % (index, attr)


def test_single_node_rerun():
    """Two activities are re-run through an exclusive gateway until loop 4.

    The gateway conditions compare ``${a_i}`` (act_2's ``_loop`` output) with
    ``${c}`` ("4"); its targets are act_1 and the end event.
    """
    start = EmptyStartEvent()
    act_1 = ServiceActivity(component_code="debug_node")
    act_2 = ServiceActivity(component_code="loop_count_node")
    eg = ExclusiveGateway(conditions={0: "${a_i} < ${c}", 1: "${a_i} >= ${c}"})
    end = EmptyEndEvent()

    act_2.component.inputs.input_a = Var(type=Var.SPLICE, value="${input_a}")

    start.extend(act_1).extend(act_2).extend(eg).connect(act_1, end)

    pipeline_data = Data()
    pipeline_data.inputs["${a_i}"] = NodeOutput(type=Var.SPLICE, source_act=act_2.id, source_key="_loop", value="")
    pipeline_data.inputs["${input_a}"] = Var(type=Var.SPLICE, value='${l.split(",")[a_i]}')
    pipeline_data.inputs["${l}"] = Var(type=Var.PLAIN, value="a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,q,r,s,t")
    pipeline_data.inputs["${c}"] = Var(type=Var.PLAIN, value="4")

    pipeline = build_tree(start, data=pipeline_data)
    runtime = BambooDjangoRuntime()
    engine = Engine(runtime)
    engine.run_pipeline(pipeline=pipeline, root_pipeline_data={}, cycle_tolerate=True)

    assert_all_finish([pipeline["id"]])

    _assert_finished(runtime, (act_1, eg, act_2), loop=4)

    assert_exec_data_equal(
        {
            act_1.id: {
                "inputs": {"_loop": 4, "_inner_loop": 4},
                "outputs": {"_loop": 4, "_inner_loop": 4, "_result": True},
            },
            act_2.id: {
                "inputs": {"_loop": 4, "_inner_loop": 4, "input_a": "e"},
                "outputs": {"_loop": 4, "_inner_loop": 4, "loop": 4, "input_a": "e", "_result": True},
            },
        }
    )

    _assert_histories(
        runtime,
        act_1,
        [
            {
                "inputs": {"_loop": 1, "_inner_loop": 1},
                "outputs": {"_loop": 1, "_inner_loop": 1, "_result": True},
                "loop": 1,
            },
            {
                "inputs": {"_loop": 2, "_inner_loop": 2},
                "outputs": {"_loop": 2, "_inner_loop": 2, "_result": True},
                "loop": 2,
            },
            {
                "inputs": {"_loop": 3, "_inner_loop": 3},
                "outputs": {"_loop": 3, "_inner_loop": 3, "_result": True},
                "loop": 3,
            },
        ],
    )

    _assert_histories(
        runtime,
        act_2,
        [
            {
                "inputs": {"_loop": 1, "_inner_loop": 1, "input_a": "b"},
                "outputs": {"_loop": 1, "_inner_loop": 1, "loop": 1, "input_a": "b", "_result": True},
                "loop": 1,
            },
            {
                "inputs": {"_loop": 2, "_inner_loop": 2, "input_a": "c"},
                "outputs": {"_loop": 2, "_inner_loop": 2, "loop": 2, "input_a": "c", "_result": True},
                "loop": 2,
            },
            {
                "inputs": {"_loop": 3, "_inner_loop": 3, "input_a": "d"},
                "outputs": {"_loop": 3, "_inner_loop": 3, "loop": 3, "input_a": "d", "_result": True},
                "loop": 3,
            },
        ],
    )


def test_subprocess_rerun():
    """A subprocess node is re-run through an exclusive gateway until loop 4.

    The gateway conditions compare ``${s_i}`` (act_2's ``_loop`` output) with
    4; its targets are the subprocess act_2 and the end event.
    """
    start_sub = EmptyStartEvent()
    act_1_sub = ServiceActivity(component_code="debug_node")
    end_sub = EmptyEndEvent()

    act_1_sub.component.inputs.input_a = Var(type=Var.SPLICE, value="${input_a}")

    start_sub.extend(act_1_sub).extend(end_sub)

    start = EmptyStartEvent()
    act_1 = ServiceActivity(component_code="debug_node")
    act_2 = SubProcess(
        start=start_sub,
        data={
            "inputs": {
                "${input_a}": {"type": "splice", "value": '${l.split(",")[a_i]}'},
                "${a_i}": {"type": "plain", "value": "", "is_param": True},
                "${l}": {"type": "plain", "value": "a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,q,r,s,t"},
                "${output_a}": {"type": "splice", "source_act": act_1_sub.id, "source_key": "input_a"},
            },
            "outputs": ["${output_a}"],
        },
        params={"${a_i}": {"type": "splice", "value": "${s_i}"}},
    )
    eg = ExclusiveGateway(conditions={0: "${s_i} < 4", 1: "${s_i} >= 4"})
    end = EmptyEndEvent()

    start.extend(act_1).extend(act_2).extend(eg).connect(act_2, end)

    pipeline_data = Data()
    pipeline_data.inputs["${s_i}"] = NodeOutput(type=Var.SPLICE, source_act=act_2.id, source_key="_loop", value="")

    pipeline = build_tree(start, data=pipeline_data)
    runtime = BambooDjangoRuntime()
    engine = Engine(runtime)
    engine.run_pipeline(pipeline=pipeline, root_pipeline_data={}, cycle_tolerate=True)

    assert_all_finish([pipeline["id"]])

    # end_sub used to be checked twice in a row; one check is sufficient.
    _assert_finished(runtime, (start_sub, act_1_sub, end_sub, act_2, eg), loop=4)

    assert_exec_data_equal(
        {
            act_1_sub.id: {
                "inputs": {"_loop": 4, "_inner_loop": 1, "input_a": "e"},
                "outputs": {"_loop": 4, "_inner_loop": 1, "input_a": "e", "_result": True},
            },
            act_1.id: {
                "inputs": {"_loop": 1, "_inner_loop": 1},
                "outputs": {"_loop": 1, "_inner_loop": 1, "_result": True},
            },
            act_2.id: {"inputs": {"${a_i}": 4}, "outputs": {"${output_a}": "e", "_loop": 4, "_inner_loop": 4}},
        }
    )

    _assert_histories(
        runtime,
        act_1_sub,
        [
            {
                "inputs": {"_loop": 1, "_inner_loop": 1, "input_a": "b"},
                "outputs": {"_loop": 1, "_inner_loop": 1, "input_a": "b", "_result": True},
                "loop": 1,
            },
            {
                "inputs": {"_loop": 2, "_inner_loop": 1, "input_a": "c"},
                "outputs": {"_loop": 2, "_inner_loop": 1, "input_a": "c", "_result": True},
                "loop": 2,
            },
            {
                "inputs": {"_loop": 3, "_inner_loop": 1, "input_a": "d"},
                "outputs": {"_loop": 3, "_inner_loop": 1, "input_a": "d", "_result": True},
                "loop": 3,
            },
        ],
    )

    _assert_histories(
        runtime,
        act_2,
        [
            {
                "inputs": {"${a_i}": 1},
                "outputs": {"${output_a}": "b", "_loop": 1, "_inner_loop": 1},
                "loop": 1,
            },
            {
                "inputs": {"${a_i}": 2},
                "outputs": {"${output_a}": "c", "_loop": 2, "_inner_loop": 2},
                "loop": 2,
            },
            {
                "inputs": {"${a_i}": 3},
                "outputs": {"${output_a}": "d", "_loop": 3, "_inner_loop": 3},
                "loop": 3,
            },
        ],
    )


def test_parallel_gateway_rerun():
    """Three parallel activities are re-run as a group until loop 3.

    The exclusive gateway after the converge gateway compares the ``_loop``
    outputs of act_2/act_3/act_4 (and ``${d}``) with ``${c}`` ("3"); its
    targets are act_1 and the end event.
    """
    start = EmptyStartEvent()
    act_1 = ServiceActivity(component_code="debug_node")
    pg = ParallelGateway()
    act_2 = ServiceActivity(component_code="loop_count_node")
    act_3 = ServiceActivity(component_code="loop_count_node")
    act_4 = ServiceActivity(component_code="loop_count_s_node")
    cg = ConvergeGateway()
    eg = ExclusiveGateway(
        conditions={
            0: "${a_i} < ${c} and ${b_i} < ${c} and ${c_i} < ${c} and ${d} < ${c}",
            1: "${a_i} >= ${c} and ${b_i} >= ${c} and ${c_i} >= ${c} and ${d} >= ${c}",
        }
    )
    end = EmptyEndEvent()

    act_2.component.inputs.input_a = Var(type=Var.SPLICE, value="${input_a}")
    act_3.component.inputs.input_a = Var(type=Var.SPLICE, value="${input_b}")
    act_4.component.inputs.input_a = Var(type=Var.SPLICE, value="${input_c}")

    start.extend(act_1).extend(pg).connect(act_2, act_3, act_4).to(pg).converge(cg).extend(eg).connect(act_1, end)

    pipeline = build_tree(
        start,
        data={
            "inputs": {
                "${a_i}": {"source_act": act_2.id, "source_key": "_loop", "type": "splice", "value": ""},
                "${b_i}": {"source_act": act_3.id, "source_key": "_loop", "type": "splice", "value": ""},
                "${c_i}": {"source_act": act_4.id, "source_key": "_loop", "type": "splice", "value": ""},
                "${input_a}": {"type": "splice", "value": '${l.split(",")[a_i]}'},
                "${input_b}": {"type": "splice", "value": '${l.split(",")[b_i]}'},
                "${input_c}": {"type": "splice", "value": '${l.split(",")[c_i]}'},
                "${d}": {"type": "splice", "value": "${c_i}"},
                "${l}": {"type": "plain", "value": "a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,q,r,s,t"},
                "${c}": {"type": "plain", "value": "3"},
            },
            "outputs": [],
        },
    )
    runtime = BambooDjangoRuntime()
    engine = Engine(runtime)
    engine.run_pipeline(pipeline=pipeline, root_pipeline_data={}, cycle_tolerate=True)

    assert_all_finish([pipeline["id"]])

    _assert_finished(runtime, (act_1, pg, act_2, act_3, act_4, eg), loop=3)

    assert_exec_data_equal(
        {
            act_1.id: {
                "inputs": {"_loop": 3, "_inner_loop": 3},
                "outputs": {"_loop": 3, "_inner_loop": 3, "_result": True},
            },
            act_2.id: {
                "inputs": {"_loop": 3, "_inner_loop": 3, "input_a": "d"},
                "outputs": {"_loop": 3, "_inner_loop": 3, "loop": 3, "input_a": "d", "_result": True},
            },
            act_3.id: {
                "inputs": {"_loop": 3, "_inner_loop": 3, "input_a": "d"},
                "outputs": {"_loop": 3, "_inner_loop": 3, "loop": 3, "input_a": "d", "_result": True},
            },
            act_4.id: {
                "inputs": {"_loop": 3, "_inner_loop": 3, "input_a": "d"},
                "outputs": {"count": 2, "_loop": 3, "_inner_loop": 3, "loop": 3, "input_a": "d", "_result": True},
            },
        }
    )

    _assert_histories(
        runtime,
        act_1,
        [
            {
                "inputs": {"_loop": 1, "_inner_loop": 1},
                "outputs": {"_loop": 1, "_inner_loop": 1, "_result": True},
                "loop": 1,
            },
            {
                "inputs": {"_loop": 2, "_inner_loop": 2},
                "outputs": {"_loop": 2, "_inner_loop": 2, "_result": True},
                "loop": 2,
            },
        ],
    )

    # act_2 and act_3 are expected to record identical histories.
    for node in (act_2, act_3):
        _assert_histories(
            runtime,
            node,
            [
                {
                    "inputs": {"_loop": 1, "_inner_loop": 1, "input_a": "b"},
                    "outputs": {"_loop": 1, "_inner_loop": 1, "loop": 1, "input_a": "b", "_result": True},
                    "loop": 1,
                },
                {
                    "inputs": {"_loop": 2, "_inner_loop": 2, "input_a": "c"},
                    "outputs": {"_loop": 2, "_inner_loop": 2, "loop": 2, "input_a": "c", "_result": True},
                    "loop": 2,
                },
            ],
        )

    _assert_histories(
        runtime,
        act_4,
        [
            {
                "inputs": {"_loop": 1, "_inner_loop": 1, "input_a": "b"},
                "outputs": {
                    "count": 2,
                    "_loop": 1,
                    "_inner_loop": 1,
                    "loop": 1,
                    "input_a": "b",
                    "_result": True,
                },
                "loop": 1,
            },
            {
                "inputs": {"_loop": 2, "_inner_loop": 2, "input_a": "c"},
                "outputs": {
                    "count": 2,
                    "_loop": 2,
                    "_inner_loop": 2,
                    "loop": 2,
                    "input_a": "c",
                    "_result": True,
                },
                "loop": 2,
            },
        ],
    )


def test_rerun_in_branch():
    """Two branches of a parallel gateway each loop on their own gateway.

    Each branch gateway compares the ``_loop`` output of the branch's first
    activity with 2; its targets are that activity and the converge gateway.
    """
    start = EmptyStartEvent()
    act_1 = ServiceActivity(component_code="debug_node")
    pg = ParallelGateway()
    # branch 1
    act_2 = ServiceActivity(component_code="loop_count_node")
    eg_1 = ExclusiveGateway(conditions={0: "${l_2} < 2", 1: "${l_2} >= 2"})
    # branch 2
    act_3 = ServiceActivity(component_code="loop_count_node")
    act_4 = ServiceActivity(component_code="loop_count_node")
    eg_2 = ExclusiveGateway(conditions={0: "${l_3} < 2", 1: "${l_3} >= 2"})
    # branch 3
    act_5 = ServiceActivity(component_code="loop_count_node")
    cg = ConvergeGateway()
    end = EmptyEndEvent()

    start.extend(act_1).extend(pg).connect(act_2, act_3, act_5)
    act_2.extend(eg_1).connect(act_2, cg)
    act_3.extend(act_4).extend(eg_2).connect(act_3, cg)
    act_5.extend(cg).extend(end)

    pipeline = build_tree(
        start,
        data={
            "inputs": {
                "${l_2}": {"source_act": act_2.id, "source_key": "_loop", "type": "splice", "value": ""},
                "${l_3}": {"source_act": act_3.id, "source_key": "_loop", "type": "splice", "value": ""},
            },
            "outputs": [],
        },
    )
    runtime = BambooDjangoRuntime()
    engine = Engine(runtime)
    engine.run_pipeline(pipeline=pipeline, root_pipeline_data={}, cycle_tolerate=True)

    assert_all_finish([pipeline["id"]])

    _assert_finished(runtime, (act_2, act_3, act_4, eg_1, eg_2), loop=2)


def test_retry_rerun():
    """A node that fails is retried twice, then looped until loop 4.

    The first retry passes empty inputs and is expected to fail again; the
    second passes ``{"can_go": True}`` and is expected to let the run finish.
    """
    start = EmptyStartEvent()
    act_1 = ServiceActivity(component_code="fail_at_second_node")
    eg = ExclusiveGateway(conditions={0: "${a_i} < ${c}", 1: "${a_i} >= ${c}"})
    end = EmptyEndEvent()

    act_1.component.inputs.key_1 = Var(type=Var.PLAIN, value="val_1")
    act_1.component.inputs.key_2 = Var(type=Var.PLAIN, value="val_2")

    start.extend(act_1).extend(eg).connect(act_1, end)

    pipeline = build_tree(
        start,
        data={
            "inputs": {
                "${a_i}": {"source_act": act_1.id, "source_key": "_loop", "type": "splice", "value": ""},
                "${c}": {"type": "plain", "value": "4"},
            },
            "outputs": [],
        },
    )
    runtime = BambooDjangoRuntime()
    engine = Engine(runtime)
    engine.run_pipeline(pipeline=pipeline, root_pipeline_data={}, cycle_tolerate=True)

    assert_all_failed([act_1.id])

    engine.retry_node(act_1.id, {})

    assert_all_failed([act_1.id])

    engine.retry_node(act_1.id, {"can_go": True})

    assert_all_finish([pipeline["id"]])

    _assert_finished(runtime, (act_1, eg), loop=4)

    assert_exec_data_equal(
        {
            act_1.id: {
                "inputs": {"_loop": 4, "_inner_loop": 4, "can_go": True},
                "outputs": {"loop": 4, "_loop": 4, "_inner_loop": 4, "can_go": True, "_result": True},
            }
        }
    )

    _assert_histories(
        runtime,
        act_1,
        [
            {
                "inputs": {"_loop": 1, "_inner_loop": 1, "key_1": "val_1", "key_2": "val_2"},
                "outputs": {"_loop": 1, "_inner_loop": 1, "_result": False},
                "retry": 0,
                "loop": 1,
            },
            {
                "inputs": {"_loop": 1, "_inner_loop": 1},
                "outputs": {"_loop": 1, "_inner_loop": 1, "_result": False},
                "retry": 1,
                "loop": 1,
            },
            {
                "inputs": {"_loop": 1, "_inner_loop": 1, "can_go": True},
                "outputs": {"loop": 1, "_loop": 1, "_inner_loop": 1, "can_go": True, "_result": True},
                "retry": 2,
                "loop": 1,
            },
            {
                "inputs": {"_loop": 2, "_inner_loop": 2, "can_go": True},
                "outputs": {"loop": 2, "_loop": 2, "_inner_loop": 2, "can_go": True, "_result": True},
                "loop": 2,
            },
            {
                "inputs": {"_loop": 3, "_inner_loop": 3, "can_go": True},
                "outputs": {"loop": 3, "_loop": 3, "_inner_loop": 3, "can_go": True, "_result": True},
                "loop": 3,
            },
        ],
    )


def test_skip_rerun():
    """A node that fails is skipped, then looped until loop 4."""
    start = EmptyStartEvent()
    act_1 = ServiceActivity(component_code="fail_at_second_node")
    eg = ExclusiveGateway(conditions={0: "${a_i} < ${c}", 1: "${a_i} >= ${c}"})
    end = EmptyEndEvent()

    act_1.component.inputs.key_1 = Var(type=Var.PLAIN, value="val_1")
    act_1.component.inputs.key_2 = Var(type=Var.PLAIN, value="val_2")

    start.extend(act_1).extend(eg).connect(act_1, end)

    pipeline = build_tree(
        start,
        data={
            "inputs": {
                "${a_i}": {"source_act": act_1.id, "source_key": "_loop", "type": "splice", "value": ""},
                "${c}": {"type": "plain", "value": "4"},
            },
            "outputs": [],
        },
    )
    runtime = BambooDjangoRuntime()
    engine = Engine(runtime)
    engine.run_pipeline(pipeline=pipeline, root_pipeline_data={}, cycle_tolerate=True)

    assert_all_failed([act_1.id])

    engine.skip_node(act_1.id)

    assert_all_finish([pipeline["id"]])

    _assert_finished(runtime, (act_1, eg), loop=4)

    assert_exec_data_equal(
        {
            act_1.id: {
                "inputs": {"_loop": 4, "_inner_loop": 4, "key_1": "val_1", "key_2": "val_2"},
                "outputs": {
                    "loop": 4,
                    "_loop": 4,
                    "_inner_loop": 4,
                    "key_1": "val_1",
                    "key_2": "val_2",
                    "_result": True,
                },
            }
        }
    )

    _assert_histories(
        runtime,
        act_1,
        [
            {
                "inputs": {"_loop": 1, "_inner_loop": 1, "key_1": "val_1", "key_2": "val_2"},
                "skip": False,
                "outputs": {"_result": False, "_inner_loop": 1, "_loop": 1},
                "loop": 1,
            },
            {
                "inputs": {"_loop": 1, "_inner_loop": 1, "key_1": "val_1", "key_2": "val_2"},
                "skip": True,
                "outputs": {"_result": False, "_inner_loop": 1, "_loop": 1},
                "loop": 1,
            },
            {
                "inputs": {"_loop": 2, "_inner_loop": 2, "key_1": "val_1", "key_2": "val_2"},
                "outputs": {
                    "loop": 2,
                    "_loop": 2,
                    "_inner_loop": 2,
                    "key_1": "val_1",
                    "key_2": "val_2",
                    "_result": True,
                },
                "skip": False,
                "loop": 2,
            },
            {
                "inputs": {"_loop": 3, "_inner_loop": 3, "key_1": "val_1", "key_2": "val_2"},
                "outputs": {
                    "loop": 3,
                    "_loop": 3,
                    "_inner_loop": 3,
                    "key_1": "val_1",
                    "key_2": "val_2",
                    "_result": True,
                },
                "skip": False,
            },
        ],
    )
    ...  # listing is truncated at this point in SOURCE
test.py
Source:test.py  
...7HOST = "192.168.80.43"8PORT = 809class TestLogin(aiounittest.AsyncTestCase):10    def setUp(self):11        self._loop = asyncio.new_event_loop()12        self.addCleanup(self._loop.close)13        self._user = USER14        self._password = PASSWORD15        self._host = HOST16        self._port = PORT17    def tearDown(self):18        self._loop.close()19    def test_succes(self):20        host = Host(21            host        = self._host,22            port        = self._port,23            username    = self._user,24            password    = self._password,25        )26        assert self._loop.run_until_complete(host.login())27        assert host.session_active28        self._loop.run_until_complete(host.logout())29    def test_wrong_password(self):30        host = Host(31            host        = self._host,32            port        = self._port,33            username    = self._user,34            password    = "wrongpass"35        )36        assert not self._loop.run_until_complete(host.login())37        assert not host.session_active38        assert not self._loop.run_until_complete(host.get_host_data())39        assert not self._loop.run_until_complete(host.get_states())40        assert not self._loop.run_until_complete(host.get_motion_state(0))41        assert not self._loop.run_until_complete(host.get_stream_source(0))42        assert not self._loop.run_until_complete(host.set_ftp(0, False))43    def test_wrong_user(self):44        host = Host(45            host=self._host,46            port=self._port,47            username="wronguser",48            password=self._password,49        )50        assert not self._loop.run_until_complete(host.login())51        assert not host.session_active52    def test_wrong_host(self):53        host = Host(54            host="192.168.1.0",55            port=self._port,56            username=self._user,57            password=self._password,58        )59        assert not self._loop.run_until_complete(host.login())60   
     assert not host.session_active61#endof class TestLogin62class TestGetData(aiounittest.AsyncTestCase):63    def setUp(self):64        self._loop = asyncio.new_event_loop()65        self.addCleanup(self._loop.close)66        self._user      = USER67        self._password  = PASSWORD68        self._host      = HOST69        self._port      = PORT70        self._host_device = Host(71            host        = self._host,72            port        = self._port,73            username    = self._user,74            password    = self._password,75        )76        assert self._loop.run_until_complete(self._host_device.login())77        assert self._host_device.session_active78    def test1_settings(self):79        assert self._loop.run_until_complete(self._host_device.get_host_data())80        self._host_device.is_admin81        assert self._host_device.host is not None82        assert self._host_device.port is not None83        assert self._host_device.channels is not None84        assert self._host_device.onvif_port is not None85        assert self._host_device.mac_address is not None86        assert self._host_device.serial is not None87        assert self._host_device.nvr_name is not None88        assert self._host_device.sw_version is not None89        assert self._host_device.model is not None90        assert self._host_device.manufacturer is not None91        assert self._host_device.rtmp_port is not None92        assert self._host_device.rtsp_port is not None93        assert self._host_device.stream is not None94        assert self._host_device.protocol is not None95        assert self._host_device.hdd_info is not None96        assert self._host_device.ptz_supported is not None97        self._host_device._users.append({"level": "guest", "userName": "guest"})98        self._host_device._username = "guest"99        assert not self._host_device.is_admin100    def test2_states(self):101        assert self._loop.run_until_complete(self._host_device.get_states())102   
     assert self._loop.run_until_complete(self._host_device.get_motion_state(0)) is not None103        self._host_device._ptz_support[0] = True104        self._host_device._ptz_presets[0]["test"] = 123105        assert (106            self._loop.run_until_complete(self._host_device.get_switchable_capabilities(0))107            is not None108        )109    def test3_images(self):110        assert self._loop.run_until_complete(self._host_device.get_snapshot(0)) is not None111        assert self._loop.run_until_complete(self._host_device.get_snapshot(100)) is None112        assert self._loop.run_until_complete(self._host_device.get_snapshot(0)) is not None113        assert self._loop.run_until_complete(self._host_device.get_stream_source(0)) is not None114    def test4_properties(self):115        assert self._loop.run_until_complete(self._host_device.get_states())116        assert self._host_device.motion_detection_state(0) is not None117        assert self._host_device.is_ia_enabled(0) is not None118        assert self._host_device.ftp_enabled(0) is not None119        assert self._host_device.email_enabled(0) is not None120        assert self._host_device.ir_enabled(0) is not None121        assert self._host_device.whiteled_enabled(0) is not None122        assert self._host_device.daynight_state(0) is not None123        assert self._host_device.recording_enabled(0) is not None124        assert self._host_device.audio_alarm_enabled(0) is not None125        assert self._host_device.ptz_presets(0) == {}  # Cam has no ptz126        assert self._host_device.sensititivy_presets(0) is not None127        get_ptz_response = [128            {129                "cmd": "GetPtzPreset",130                "code": 0,131                "value": {132                    "PtzPreset": [133                        {"enable": 0, "name": "Preset_1", "id": 0},134                        {"enable": 1, "name": "Preset_2", "id": 1},135                    ]136                },137            }138 
       ]139        self._host_device.map_channel_json_response(get_ptz_response, 0)140        assert self._host_device._ptz_presets[0] is not None141        assert self._host_device._ptz_presets_settings[0] is not None142        assert not self._loop.run_until_complete(143            self._host_device.send_setting([{"cmd": "wrong_command"}])144        )145        for _ in range(1):146            """FTP state."""147            assert self._loop.run_until_complete(self._host_device.set_ftp(0, True))148            assert self._host_device.ftp_enabled(0)149            assert self._loop.run_until_complete(self._host_device.set_ftp(0, False))150            assert not self._host_device.ftp_enabled(0)151            """Email state."""152            assert self._loop.run_until_complete(self._host_device.set_email(0, True))153            assert self._host_device.email_enabled(0)154            assert self._loop.run_until_complete(self._host_device.set_email(0, False))155            assert not self._host_device.email_enabled(0)156            """Audio state."""157            assert self._loop.run_until_complete(self._host_device.set_audio(0, True))158            assert self._host_device.audio_alarm_enabled(0)159            assert self._loop.run_until_complete(self._host_device.set_audio(0, False))160            assert not self._host_device.audio_alarm_enabled(0)161            """ir state."""162            assert self._loop.run_until_complete(self._host_device.set_ir_lights(0, True))163            assert self._host_device.ir_enabled(0)164            assert self._loop.run_until_complete(self._host_device.set_ir_lights(0, False))165            assert not self._host_device.ir_enabled(0)166            """Daynight state."""167            assert self._loop.run_until_complete(self._host_device.set_daynight(0, "Auto"))168            assert self._host_device.daynight_state(0)169            assert self._loop.run_until_complete(self._host_device.set_daynight(0, "Color"))170            
assert not self._host_device.daynight_state(0)171            """Recording state."""172            assert self._loop.run_until_complete(self._host_device.set_recording(0, True))173            assert self._host_device.recording_enabled(0)174            assert self._loop.run_until_complete(self._host_device.set_recording(0, False))175            assert not self._host_device.recording_enabled(0)176            """Motion detection state."""177            assert self._loop.run_until_complete(self._host_device.set_motion_detection(0, True))178            assert self._host_device.motion_detection_state(0) is not None  # Ignore state179            assert self._loop.run_until_complete(self._host_device.set_motion_detection(0, False))180            assert self._loop.run_until_complete(self._host_device.get_states())181            assert (182                self._loop.run_until_complete(self._host_device.get_stream_source(0)) is not None183            )184            assert (185                self._loop.run_until_complete(186                    self._host_device.set_ptz_command(0, "RIGHT", speed=10)187                )188                == False189            )190            assert (191                self._loop.run_until_complete(192                    self._host_device.set_ptz_command(0, "GOTO", preset=1)193                )194                == False195            )196            assert (197                self._loop.run_until_complete(self._host_device.set_ptz_command(0, "STOP"))198                == False199            )200            assert self._loop.run_until_complete(self._host_device.set_sensitivity(0, value=10))201            assert self._loop.run_until_complete(202                self._host_device.set_sensitivity(0, value=45, preset=0)203            )204            """ White Led State (Spotlight )  """205            """ required tests """206            """    turn off , night mode off """207            """    turn on, night mode off """208            """    turn 
off, , night mode on """209            """    turn on, night mode on , auto mode """210            """    turn off, night mode on, scheduled """211            """    turn on,  night mode on, scheduled mode """212            """    Turn on, NM on, auto Bright = 0 """213            """    Turn on, NM on, auto Bright = 100 """214            """    incorrect mode not 0,1,3 """215            """    incorrect brightness < 0 """216            """    incorrect brightness > 100 """217            assert self._loop.run_until_complete(self._host_device.set_whiteled(0, False,50,0))218            assert self._loop.run_until_complete(self._host_device.set_whiteled(0, True,50,0))219            assert self._loop.run_until_complete(self._host_device.set_whiteled(0, False,50,1))220            assert self._loop.run_until_complete(self._host_device.set_whiteled(0, True,50,1))221            assert self._loop.run_until_complete(self._host_device.set_whiteled(0, False,50,3))222            assert self._loop.run_until_complete(self._host_device.set_whiteled(0, True,50,3))223            """  so that effect can be seen on spotlight wait 2 seconds between changes """224            time.sleep(2)225            assert self._loop.run_until_complete(self._host_device.set_whiteled(0, True,0,1))226            time.sleep(2)227            assert self._loop.run_until_complete(self._host_device.set_whiteled(0, True,100,1))228            assert self._loop.run_until_complete(self._host_device.set_whiteled(0, True,100,2))229            time.sleep(2)230            """ now turn off light - does not require an assert """231            self.loop.run_until_complete(self._host_device.set_whiteled(0, False,50,0))232            """ with incorrect values the routine should return a False """233            assert not self._loop.run_until_complete(self._host_device.set_whiteled(0, True,-10,1))234            assert not self._loop.run_until_complete(self._host_device.set_whiteled(0, True,1000,1))235            """  now 
tests for setting the schedule for spotlight when night mode non auto"""236            assert self._loop.run_until_complete(self._host_device.set_spotlight_lighting_schedule(0, 5, 30, 17, 30))237            assert self._loop.run_until_complete(self._host_device.set_spotlight_lighting_schedule(0, 7, 30, 19, 30))238            # invalid parameters239            assert not self._loop.run_until_complete(self._host_device.set_spotlight_lighting_schedule(0, -1, 0, 18, 0))240            assert not self._loop.run_until_complete(self._host_device.set_spotlight_lighting_schedule(0, 24, 0, 18, 0))241            assert not self._loop.run_until_complete(self._host_device.set_spotlight_lighting_schedule(0, 6, -2, 18, 0))242            assert not self._loop.run_until_complete(self._host_device.set_spotlight_lighting_schedule(0, 6, 60, 18, 0))243            assert not self._loop.run_until_complete(self._host_device.set_spotlight_lighting_schedule(0, 6, 0, -3, 0))244            assert not self._loop.run_until_complete(self._host_device.set_spotlight_lighting_schedule(0, 6, 0, 24, 0))245            assert not self._loop.run_until_complete(self._host_device.set_spotlight_lighting_schedule(0, 6, 0, 18, -4))246            assert not self._loop.run_until_complete(self._host_device.set_spotlight_lighting_schedule(0, 18, 59, 19, 0))247            assert not self._loop.run_until_complete(self._host_device.set_spotlight_lighting_schedule(0, 18, 29, 18, 30))248            #  query should end time equals start time be an error249            assert not self._loop.run_until_complete(self._host_device.set_spotlight_lighting_schedule(0, 6, 0, 6, 0))250            #251            # check simplified call252            assert self._loop.run_until_complete(self._host_device.set_spotlight(0, True))253            assert self._loop.run_until_complete(self._host_device.set_spotlight(0, False))254            # test of siren255            assert self._loop.run_until_complete(self._host_device.set_siren(0, 
True))256            assert self._loop.run_until_complete(self._host_device.set_siren(0, False))257    def tearDown(self):258        self._loop.run_until_complete(self._host_device.logout())259        self._loop.close()260#endof class TestGetData261class TestSubscription(aiounittest.AsyncTestCase):262    def setUp(self):263        self._loop = asyncio.new_event_loop()264        self.addCleanup(self._loop.close)265        self._user = USER266        self._password = PASSWORD267        self._host = HOST268        self._port = PORT269    def tearDown(self):270        self._loop.close()271    def test_succes(self):272        host = Host(273            host=self._host,274            port=self._port,275            username=self._user,276            password=self._password,277        )...receiver.js
Source:receiver.js  
'use strict';

const { Writable } = require('stream');

const PerMessageDeflate = require('./permessage-deflate');
const {
  BINARY_TYPES,
  EMPTY_BUFFER,
  kStatusCode,
  kWebSocket
} = require('./constants');
const { concat, toArrayBuffer, unmask } = require('./buffer-util');
const { isValidStatusCode, isValidUTF8 } = require('./validation');

//
// Parser states. `startLoop()` dispatches on `this._state`; each handler
// either advances the state or clears `this._loop` to wait for more data.
//
const GET_INFO = 0;
const GET_PAYLOAD_LENGTH_16 = 1;
const GET_PAYLOAD_LENGTH_64 = 2;
const GET_MASK = 3;
const GET_DATA = 4;
const INFLATING = 5;

/**
 * HyBi Receiver implementation.
 *
 * A writable stream that buffers incoming chunks, parses them frame by frame
 * and emits `'message'`, `'ping'`, `'pong'` and `'conclude'` events.
 *
 * @extends stream.Writable
 */
class Receiver extends Writable {
  /**
   * Creates a Receiver instance.
   *
   * @param {String} binaryType The type for binary data
   * @param {Object} extensions An object containing the negotiated extensions
   * @param {Number} maxPayload The maximum allowed message length
   */
  constructor(binaryType, extensions, maxPayload) {
    super();

    this._binaryType = binaryType || BINARY_TYPES[0];
    this[kWebSocket] = undefined;
    this._extensions = extensions || {};
    // Coerced to an integer; a value of 0 disables the size checks below.
    this._maxPayload = maxPayload | 0;

    // Chunks received but not yet consumed, and their combined length.
    this._bufferedBytes = 0;
    this._buffers = [];

    // Fields describing the frame currently being parsed.
    this._compressed = false;
    this._payloadLength = 0;
    this._mask = undefined;
    // Opcode of the fragmented message in progress, or 0 when there is none.
    this._fragmented = 0;
    this._masked = false;
    this._fin = false;
    this._opcode = 0;

    // Accumulators for the message currently being assembled.
    this._totalPayloadLength = 0;
    this._messageLength = 0;
    this._fragments = [];

    this._state = GET_INFO;
    this._loop = false;
  }

  /**
   * Implements `Writable.prototype._write()`.
   *
   * @param {Buffer} chunk The chunk of data to write
   * @param {String} encoding The character encoding of `chunk`
   * @param {Function} cb Callback
   */
  _write(chunk, encoding, cb) {
    // The last frame handled was a close frame (opcode 0x08) and the parser
    // is back at a frame boundary: drop the chunk instead of buffering it.
    if (this._opcode === 0x08 && this._state == GET_INFO) return cb();

    this._bufferedBytes += chunk.length;
    this._buffers.push(chunk);
    this.startLoop(cb);
  }

  /**
   * Consumes `n` bytes from the buffered data.
   *
   * Callers check `this._bufferedBytes` before calling, so at least `n`
   * bytes are expected to be available.
   *
   * @param {Number} n The number of bytes to consume
   * @return {Buffer} The consumed bytes
   * @private
   */
  consume(n) {
    this._bufferedBytes -= n;

    // Exactly the first chunk: hand it over as is.
    if (n === this._buffers[0].length) return this._buffers.shift();

    // Less than the first chunk: split it in two.
    if (n < this._buffers[0].length) {
      const buf = this._buffers[0];
      this._buffers[0] = buf.slice(n);
      return buf.slice(0, n);
    }

    // The request spans several chunks: copy them into a new buffer.
    const dst = Buffer.allocUnsafe(n);

    do {
      const buf = this._buffers[0];

      // `dst.length - n` is the number of bytes already written to `dst`.
      if (n >= buf.length) {
        this._buffers.shift().copy(dst, dst.length - n);
      } else {
        buf.copy(dst, dst.length - n, 0, n);
        this._buffers[0] = buf.slice(n);
      }

      n -= buf.length;
    } while (n > 0);

    return dst;
  }

  /**
   * Starts the parsing loop.
   *
   * Runs the handler for the current state until one of them clears
   * `this._loop`, then calls `cb` with the last error, if any.
   *
   * @param {Function} cb Callback
   * @private
   */
  startLoop(cb) {
    var err;
    this._loop = true;

    do {
      switch (this._state) {
        case GET_INFO:
          err = this.getInfo();
          break;
        case GET_PAYLOAD_LENGTH_16:
          err = this.getPayloadLength16();
          break;
        case GET_PAYLOAD_LENGTH_64:
          err = this.getPayloadLength64();
          break;
        case GET_MASK:
          this.getMask();
          break;
        case GET_DATA:
          err = this.getData(cb);
          break;
        default:
          // `INFLATING`
          // Return without calling `cb`: the `decompress()` callback either
          // restarts the loop or calls `cb` with an error.
          this._loop = false;
          return;
      }
    } while (this._loop);

    cb(err);
  }

  /**
   * Reads the first two bytes of a frame.
   *
   * @return {(RangeError|undefined)} A possible error
   * @private
   */
  getInfo() {
    if (this._bufferedBytes < 2) {
      this._loop = false;
      return;
    }

    const buf = this.consume(2);

    if ((buf[0] & 0x30) !== 0x00) {
      this._loop = false;
      return error(RangeError, 'RSV2 and RSV3 must be clear', true, 1002);
    }

    // The 0x40 bit (RSV1) marks a compressed frame; it is accepted only when
    // the permessage-deflate extension is present in `this._extensions`.
    const compressed = (buf[0] & 0x40) === 0x40;

    if (compressed && !this._extensions[PerMessageDeflate.extensionName]) {
      this._loop = false;
      return error(RangeError, 'RSV1 must be clear', true, 1002);
    }

    this._fin = (buf[0] & 0x80) === 0x80;
    this._opcode = buf[0] & 0x0f;
    this._payloadLength = buf[1] & 0x7f;

    if (this._opcode === 0x00) {
      // Opcode 0 is valid only while a fragmented message is in progress and
      // must not carry RSV1.
      if (compressed) {
        this._loop = false;
        return error(RangeError, 'RSV1 must be clear', true, 1002);
      }

      if (!this._fragmented) {
        this._loop = false;
        return error(RangeError, 'invalid opcode 0', true, 1002);
      }

      // Continue with the opcode of the message being assembled.
      this._opcode = this._fragmented;
    } else if (this._opcode === 0x01 || this._opcode === 0x02) {
      // A new data frame must not start while a fragmented message is still
      // in progress.
      if (this._fragmented) {
        this._loop = false;
        return error(RangeError, `invalid opcode ${this._opcode}`, true, 1002);
      }

      this._compressed = compressed;
    } else if (this._opcode > 0x07 && this._opcode < 0x0b) {
      // Control frames (0x08, 0x09, 0x0a): FIN must be set, RSV1 must be
      // clear and the payload length must not exceed 0x7d (125).
      if (!this._fin) {
        this._loop = false;
        return error(RangeError, 'FIN must be set', true, 1002);
      }

      if (compressed) {
        this._loop = false;
        return error(RangeError, 'RSV1 must be clear', true, 1002);
      }

      if (this._payloadLength > 0x7d) {
        this._loop = false;
        return error(
          RangeError,
          `invalid payload length ${this._payloadLength}`,
          true,
          1002
        );
      }
    } else {
      this._loop = false;
      return error(RangeError, `invalid opcode ${this._opcode}`, true, 1002);
    }

    // First frame of a fragmented message: remember its opcode.
    if (!this._fin && !this._fragmented) this._fragmented = this._opcode;
    this._masked = (buf[1] & 0x80) === 0x80;

    // 126 and 127 announce an extended 16-bit or 64-bit payload length.
    if (this._payloadLength === 126) this._state = GET_PAYLOAD_LENGTH_16;
    else if (this._payloadLength === 127) this._state = GET_PAYLOAD_LENGTH_64;
    else return this.haveLength();
  }

  /**
   * Gets extended payload length (7+16).
   *
   * @return {(RangeError|undefined)} A possible error
   * @private
   */
  getPayloadLength16() {
    if (this._bufferedBytes < 2) {
      this._loop = false;
      return;
    }

    this._payloadLength = this.consume(2).readUInt16BE(0);
    return this.haveLength();
  }

  /**
   * Gets extended payload length (7+64).
   *
   * @return {(RangeError|undefined)} A possible error
   * @private
   */
  getPayloadLength64() {
    if (this._bufferedBytes < 8) {
      this._loop = false;
      return;
    }

    const buf = this.consume(8);
    // Upper 32 bits of the 64-bit length.
    const num = buf.readUInt32BE(0);

    //
    // The maximum safe integer in JavaScript is 2^53 - 1. An error is returned
    // if payload length is greater than this number.
    //
    if (num > Math.pow(2, 53 - 32) - 1) {
      this._loop = false;
      return error(
        RangeError,
        'Unsupported WebSocket frame: payload length > 2^53 - 1',
        false,
        1009
      );
    }

    this._payloadLength = num * Math.pow(2, 32) + buf.readUInt32BE(4);
    return this.haveLength();
  }

  /**
   * Payload length has been read.
   *
   * @return {(RangeError|undefined)} A possible error
   * @private
   */
  haveLength() {
    // Only frames with an opcode below 0x08 count towards the message size;
    // the limit applies only when `_maxPayload` is greater than 0.
    if (this._payloadLength && this._opcode < 0x08) {
      this._totalPayloadLength += this._payloadLength;
      if (this._totalPayloadLength > this._maxPayload && this._maxPayload > 0) {
        this._loop = false;
        return error(RangeError, 'Max payload size exceeded', false, 1009);
      }
    }

    if (this._masked) this._state = GET_MASK;
    else this._state = GET_DATA;
  }

  /**
   * Reads mask bytes.
   *
   * @private
   */
  getMask() {
    if (this._bufferedBytes < 4) {
      this._loop = false;
      return;
    }

    this._mask = this.consume(4);
    this._state = GET_DATA;
  }

  /**
   * Reads data bytes.
   *
   * @param {Function} cb Callback
   * @return {(Error|RangeError|undefined)} A possible error
   * @private
   */
  getData(cb) {
    var data = EMPTY_BUFFER;

    if (this._payloadLength) {
      // Wait until the whole payload has been buffered.
      if (this._bufferedBytes < this._payloadLength) {
        this._loop = false;
        return;
      }

      data = this.consume(this._payloadLength);
      if (this._masked) unmask(data, this._mask);
    }

    if (this._opcode > 0x07) return this.controlMessage(data);

    if (this._compressed) {
      // Park the loop in `INFLATING`; `decompress()` resumes it.
      this._state = INFLATING;
      this.decompress(data, cb);
      return;
    }

    if (data.length) {
      //
      // This message is not compressed so its length is the sum of the payload
      // length of all fragments.
      //
      this._messageLength = this._totalPayloadLength;
      this._fragments.push(data);
    }

    return this.dataMessage();
  }

  /**
   * Decompresses data.
   *
   * @param {Buffer} data Compressed data
   * @param {Function} cb Callback
   * @private
   */
  decompress(data, cb) {
    const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName];

    perMessageDeflate.decompress(data, this._fin, (err, buf) => {
      if (err) return cb(err);

      if (buf.length) {
        // For compressed messages the limit is applied to the inflated size.
        this._messageLength += buf.length;
        if (this._messageLength > this._maxPayload && this._maxPayload > 0) {
          return cb(
            error(RangeError, 'Max payload size exceeded', false, 1009)
          );
        }

        this._fragments.push(buf);
      }

      const er = this.dataMessage();
      if (er) return cb(er);

      // Resume parsing of whatever is still buffered.
      this.startLoop(cb);
    });
  }

  /**
   * Handles a data message.
   *
   * @return {(Error|undefined)} A possible error
   * @private
   */
  dataMessage() {
    if (this._fin) {
      const messageLength = this._messageLength;
      const fragments = this._fragments;

      // Reset the per-message accumulators before emitting.
      this._totalPayloadLength = 0;
      this._messageLength = 0;
      this._fragmented = 0;
      this._fragments = [];

      if (this._opcode === 2) {
        var data;

        if (this._binaryType === 'nodebuffer') {
          data = concat(fragments, messageLength);
        } else if (this._binaryType === 'arraybuffer') {
          data = toArrayBuffer(concat(fragments, messageLength));
        } else {
          // Any other binary type receives the fragment list unmerged.
          data = fragments;
        }

        this.emit('message', data);
      } else {
        const buf = concat(fragments, messageLength);

        if (!isValidUTF8(buf)) {
          this._loop = false;
          return error(Error, 'invalid UTF-8 sequence', true, 1007);
        }

        this.emit('message', buf.toString());
      }
    }

    this._state = GET_INFO;
  }

  /**
   * Handles a control message.
   *
   * @param {Buffer} data Data to handle
   * @return {(Error|RangeError|undefined)} A possible error
   * @private
   */
  controlMessage(data) {
    if (this._opcode === 0x08) {
      // Close frame: stop the parsing loop whatever the outcome.
      this._loop = false;

      if (data.length === 0) {
        this.emit('conclude', 1005, '');
        this.end();
      } else if (data.length === 1) {
        return error(RangeError, 'invalid payload length 1', true, 1002);
      } else {
        // The first two bytes are the status code, the rest is the reason.
        const code = data.readUInt16BE(0);

        if (!isValidStatusCode(code)) {
          return error(RangeError, `invalid status code ${code}`, true, 1002);
        }

        const buf = data.slice(2);

        if (!isValidUTF8(buf)) {
          return error(Error, 'invalid UTF-8 sequence', true, 1007);
        }

        this.emit('conclude', code, buf.toString());
        this.end();
      }
    } else if (this._opcode === 0x09) {
      this.emit('ping', data);
    } else {
      this.emit('pong', data);
    }

    this._state = GET_INFO;
  }
}

module.exports = Receiver;

/**
 * Builds an error object.
 *
 * 
@param {(Error|RangeError)} ErrorCtor The error constructor401 * @param {String} message The error message402 * @param {Boolean} prefix Specifies whether or not to add a default prefix to403 *     `message`404 * @param {Number} statusCode The status code405 * @return {(Error|RangeError)} The error406 * @private407 */408function error(ErrorCtor, message, prefix, statusCode) {409  const err = new ErrorCtor(410    prefix ? `Invalid WebSocket frame: ${message}` : message411  );412  Error.captureStackTrace(err, error);413  err[kStatusCode] = statusCode;414  return err;...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
