Best Python code snippet using playwright-python
network.py
Source:network.py  
# Excerpt from network.py: event-subscription methods of a Chrome DevTools
# Protocol "Network" domain wrapper.  The enclosing class header lies outside
# the excerpt, so the methods are shown dedented; each still takes ``self``
# and only uses ``self.client``.
#
# ... (the excerpt starts inside the "Network.dataReceived" listener method;
# only the tail below is visible, reproduced as-is)
#     """
#     event_name = "Network.dataReceived"
#     if listener is None:
#         future = self.client.loop.create_future()
#         def _listener(event: Optional[Dict] = None) -> None:
#             future.set_result(event)
#         self.client.once(event_name, _listener)
#         return future
#     self.client.on(event_name, listener)
#     return lambda: self.client.remove_listener(event_name, listener)


def eventSourceMessageReceived(
    self, listener: Optional[Callable[[Dict[str, Any]], Any]] = None
) -> Any:
    """
    Fired when EventSource message is received.

    See `https://chromedevtools.github.io/devtools-protocol/tot/Network#event-eventSourceMessageReceived`

    :param listener: Optional listener function
    :return: If a listener was supplied the return value is a callable that
        will remove the supplied listener, otherwise a future that resolves
        with the value of the event
    """
    event_name = "Network.eventSourceMessageReceived"
    if listener is not None:
        self.client.on(event_name, listener)
        return lambda: self.client.remove_listener(event_name, listener)

    future = self.client.loop.create_future()

    def _resolve(event: Optional[Dict] = None) -> None:
        # Skip futures the caller already cancelled; set_result() would raise.
        if not future.done():
            future.set_result(event)

    self.client.once(event_name, _resolve)
    return future


def loadingFailed(
    self, listener: Optional[Callable[[Dict[str, Any]], Any]] = None
) -> Any:
    """
    Fired when HTTP request has failed to load.

    See `https://chromedevtools.github.io/devtools-protocol/tot/Network#event-loadingFailed`

    :param listener: Optional listener function
    :return: If a listener was supplied the return value is a callable that
        will remove the supplied listener, otherwise a future that resolves
        with the value of the event
    """
    event_name = "Network.loadingFailed"
    if listener is not None:
        self.client.on(event_name, listener)
        return lambda: self.client.remove_listener(event_name, listener)

    future = self.client.loop.create_future()

    def _resolve(event: Optional[Dict] = None) -> None:
        # Skip futures the caller already cancelled; set_result() would raise.
        if not future.done():
            future.set_result(event)

    self.client.once(event_name, _resolve)
    return future


def loadingFinished(
    self, listener: Optional[Callable[[Dict[str, Any]], Any]] = None
) -> Any:
    """
    Fired when HTTP request has finished loading.

    See `https://chromedevtools.github.io/devtools-protocol/tot/Network#event-loadingFinished`

    :param listener: Optional listener function
    :return: If a listener was supplied the return value is a callable that
        will remove the supplied listener, otherwise a future that resolves
        with the value of the event
    """
    event_name = "Network.loadingFinished"
    if listener is not None:
        self.client.on(event_name, listener)
        return lambda: self.client.remove_listener(event_name, listener)

    future = self.client.loop.create_future()

    def _resolve(event: Optional[Dict] = None) -> None:
        # Skip futures the caller already cancelled; set_result() would raise.
        if not future.done():
            future.set_result(event)

    self.client.once(event_name, _resolve)
    return future


def requestIntercepted(
    self, listener: Optional[Callable[[Dict[str, Any]], Any]] = None
) -> Any:
    """
    Details of an intercepted HTTP request, which must be either allowed,
    blocked, modified or mocked.

    See `https://chromedevtools.github.io/devtools-protocol/tot/Network#event-requestIntercepted`

    :param listener: Optional listener function
    :return: If a listener was supplied the return value is a callable that
        will remove the supplied listener, otherwise a future that resolves
        with the value of the event
    """
    event_name = "Network.requestIntercepted"
    if listener is not None:
        self.client.on(event_name, listener)
        return lambda: self.client.remove_listener(event_name, listener)

    future = self.client.loop.create_future()

    def _resolve(event: Optional[Dict] = None) -> None:
        # Skip futures the caller already cancelled; set_result() would raise.
        if not future.done():
            future.set_result(event)

    self.client.once(event_name, _resolve)
    return future


def requestServedFromCache(
    self, listener: Optional[Callable[[Dict[str, Any]], Any]] = None
) -> Any:
    """
    Fired if request ended up loading from cache.

    See `https://chromedevtools.github.io/devtools-protocol/tot/Network#event-requestServedFromCache`

    :param listener: Optional listener function
    :return: If a listener was supplied the return value is a callable that
        will remove the supplied listener, otherwise a future that resolves
        with the value of the event
    """
    event_name = "Network.requestServedFromCache"
    if listener is not None:
        self.client.on(event_name, listener)
        return lambda: self.client.remove_listener(event_name, listener)

    future = self.client.loop.create_future()

    def _resolve(event: Optional[Dict] = None) -> None:
        # Skip futures the caller already cancelled; set_result() would raise.
        if not future.done():
            future.set_result(event)

    self.client.once(event_name, _resolve)
    return future


def requestWillBeSent(
    self, listener: Optional[Callable[[Dict[str, Any]], Any]] = None
) -> Any:
    """
    Fired when page is about to send HTTP request.

    See `https://chromedevtools.github.io/devtools-protocol/tot/Network#event-requestWillBeSent`

    :param listener: Optional listener function
    :return: If a listener was supplied the return value is a callable that
        will remove the supplied listener, otherwise a future that resolves
        with the value of the event
    """
    event_name = "Network.requestWillBeSent"
    if listener is not None:
        self.client.on(event_name, listener)
        return lambda: self.client.remove_listener(event_name, listener)

    future = self.client.loop.create_future()

    def _resolve(event: Optional[Dict] = None) -> None:
        # Skip futures the caller already cancelled; set_result() would raise.
        if not future.done():
            future.set_result(event)

    self.client.once(event_name, _resolve)
    return future


def resourceChangedPriority(
    self, listener: Optional[Callable[[Dict[str, Any]], Any]] = None
) -> Any:
    """
    Fired when resource loading priority is changed.

    See `https://chromedevtools.github.io/devtools-protocol/tot/Network#event-resourceChangedPriority`

    :param listener: Optional listener function
    :return: If a listener was supplied the return value is a callable that
        will remove the supplied listener, otherwise a future that resolves
        with the value of the event
    """
    event_name = "Network.resourceChangedPriority"
    if listener is not None:
        self.client.on(event_name, listener)
        return lambda: self.client.remove_listener(event_name, listener)

    future = self.client.loop.create_future()

    def _resolve(event: Optional[Dict] = None) -> None:
        # Skip futures the caller already cancelled; set_result() would raise.
        if not future.done():
            future.set_result(event)

    self.client.once(event_name, _resolve)
    return future


def signedExchangeReceived(
    self, listener: Optional[Callable[[Dict[str, Any]], Any]] = None
) -> Any:
    """
    Fired when a signed exchange was received over the network.

    See `https://chromedevtools.github.io/devtools-protocol/tot/Network#event-signedExchangeReceived`

    :param listener: Optional listener function
    :return: If a listener was supplied the return value is a callable that
        will remove the supplied listener, otherwise a future that resolves
        with the value of the event
    """
    event_name = "Network.signedExchangeReceived"
    if listener is not None:
        self.client.on(event_name, listener)
        return lambda: self.client.remove_listener(event_name, listener)

    future = self.client.loop.create_future()

    def _resolve(event: Optional[Dict] = None) -> None:
        # Skip futures the caller already cancelled; set_result() would raise.
        if not future.done():
            future.set_result(event)

    self.client.once(event_name, _resolve)
    return future


def responseReceived(
    self, listener: Optional[Callable[[Dict[str, Any]], Any]] = None
) -> Any:
    """
    Fired when HTTP response is available.

    See `https://chromedevtools.github.io/devtools-protocol/tot/Network#event-responseReceived`

    :param listener: Optional listener function
    :return: If a listener was supplied the return value is a callable that
        will remove the supplied listener, otherwise a future that resolves
        with the value of the event
    """
    event_name = "Network.responseReceived"
    if listener is not None:
        self.client.on(event_name, listener)
        return lambda: self.client.remove_listener(event_name, listener)

    future = self.client.loop.create_future()

    def _resolve(event: Optional[Dict] = None) -> None:
        # Skip futures the caller already cancelled; set_result() would raise.
        if not future.done():
            future.set_result(event)

    self.client.once(event_name, _resolve)
    return future


def webSocketClosed(
    self, listener: Optional[Callable[[Dict[str, Any]], Any]] = None
) -> Any:
    """
    Fired when WebSocket is closed.

    See `https://chromedevtools.github.io/devtools-protocol/tot/Network#event-webSocketClosed`

    :param listener: Optional listener function
    :return: If a listener was supplied the return value is a callable that
        will remove the supplied listener, otherwise a future that resolves
        with the value of the event
    """
    event_name = "Network.webSocketClosed"
    if listener is not None:
        self.client.on(event_name, listener)
        return lambda: self.client.remove_listener(event_name, listener)

    future = self.client.loop.create_future()

    def _resolve(event: Optional[Dict] = None) -> None:
        # Skip futures the caller already cancelled; set_result() would raise.
        if not future.done():
            future.set_result(event)

    self.client.once(event_name, _resolve)
    return future


def webSocketCreated(
    self, listener: Optional[Callable[[Dict[str, Any]], Any]] = None
) -> Any:
    """
    Fired upon WebSocket creation.

    See `https://chromedevtools.github.io/devtools-protocol/tot/Network#event-webSocketCreated`

    :param listener: Optional listener function
    :return: If a listener was supplied the return value is a callable that
        will remove the supplied listener, otherwise a future that resolves
        with the value of the event
    """
    event_name = "Network.webSocketCreated"
    if listener is not None:
        self.client.on(event_name, listener)
        return lambda: self.client.remove_listener(event_name, listener)

    future = self.client.loop.create_future()

    def _resolve(event: Optional[Dict] = None) -> None:
        # Skip futures the caller already cancelled; set_result() would raise.
        if not future.done():
            future.set_result(event)

    self.client.once(event_name, _resolve)
    return future


def webSocketFrameError(
    self, listener: Optional[Callable[[Dict[str, Any]], Any]] = None
) -> Any:
    """
    Fired when WebSocket message error occurs.

    See `https://chromedevtools.github.io/devtools-protocol/tot/Network#event-webSocketFrameError`

    :param listener: Optional listener function
    :return: If a listener was supplied the return value is a callable that
        will remove the supplied listener, otherwise a future that resolves
        with the value of the event
    """
    event_name = "Network.webSocketFrameError"
    if listener is not None:
        self.client.on(event_name, listener)
        return lambda: self.client.remove_listener(event_name, listener)

    future = self.client.loop.create_future()

    def _resolve(event: Optional[Dict] = None) -> None:
        # Skip futures the caller already cancelled; set_result() would raise.
        if not future.done():
            future.set_result(event)

    self.client.once(event_name, _resolve)
    return future


def webSocketFrameReceived(
    self, listener: Optional[Callable[[Dict[str, Any]], Any]] = None
) -> Any:
    """
    Fired when WebSocket message is received.

    See `https://chromedevtools.github.io/devtools-protocol/tot/Network#event-webSocketFrameReceived`

    :param listener: Optional listener function
    :return: If a listener was supplied the return value is a callable that
        will remove the supplied listener, otherwise a future that resolves
        with the value of the event
    """
    event_name = "Network.webSocketFrameReceived"
    if listener is not None:
        self.client.on(event_name, listener)
        return lambda: self.client.remove_listener(event_name, listener)

    future = self.client.loop.create_future()

    def _resolve(event: Optional[Dict] = None) -> None:
        # Skip futures the caller already cancelled; set_result() would raise.
        if not future.done():
            future.set_result(event)

    self.client.once(event_name, _resolve)
    return future


def webSocketFrameSent(
    self, listener: Optional[Callable[[Dict[str, Any]], Any]] = None
) -> Any:
    """
    Fired when WebSocket message is sent.

    See `https://chromedevtools.github.io/devtools-protocol/tot/Network#event-webSocketFrameSent`

    :param listener: Optional listener function
    :return: If a listener was supplied the return value is a callable that
        will remove the supplied listener, otherwise a future that resolves
        with the value of the event
    """
    event_name = "Network.webSocketFrameSent"
    if listener is not None:
        self.client.on(event_name, listener)
        return lambda: self.client.remove_listener(event_name, listener)

    future = self.client.loop.create_future()

    def _resolve(event: Optional[Dict] = None) -> None:
        # Skip futures the caller already cancelled; set_result() would raise.
        if not future.done():
            future.set_result(event)

    self.client.once(event_name, _resolve)
    return future


def webSocketHandshakeResponseReceived(
    self, listener: Optional[Callable[[Dict[str, Any]], Any]] = None
) -> Any:
    """
    Fired when WebSocket handshake response becomes available.

    See `https://chromedevtools.github.io/devtools-protocol/tot/Network#event-webSocketHandshakeResponseReceived`

    :param listener: Optional listener function
    :return: If a listener was supplied the return value is a callable that
        will remove the supplied listener, otherwise a future that resolves
        with the value of the event
    """
    event_name = "Network.webSocketHandshakeResponseReceived"
    if listener is not None:
        self.client.on(event_name, listener)
        return lambda: self.client.remove_listener(event_name, listener)

    future = self.client.loop.create_future()

    def _resolve(event: Optional[Dict] = None) -> None:
        # Skip futures the caller already cancelled; set_result() would raise.
        if not future.done():
            future.set_result(event)

    self.client.once(event_name, _resolve)
    return future


def webSocketWillSendHandshakeRequest(
    self, listener: Optional[Callable[[Dict[str, Any]], Any]] = None
) -> Any:
    """
    Fired when WebSocket is about to initiate handshake.

    See `https://chromedevtools.github.io/devtools-protocol/tot/Network#event-webSocketWillSendHandshakeRequest`

    :param listener: Optional listener function
    :return: If a listener was supplied the return value is a callable that
        will remove the supplied listener otherwise a future that resolves
        with the value of the event
    """
    event_name = "Network.webSocketWillSendHandshakeRequest"
    if listener is None:
        future = self.client.loop.create_future()

        def _listener(event: Optional[Dict] = None) -> None:
            future.set_result(event)

        self.client.once(event_name, _listener)
        return future
    self.client.on(event_name, listener)
    # ... (the excerpt is cut off here; the rest of this method is not
    # visible, so it is reproduced only as far as the page shows it)


# interfaces.py
Source:interfaces.py  
class PoolListener(object):
    # NOTE(review): the ``class`` statement and the opening of this docstring
    # lie above the excerpt.  The header is reconstructed from the references
    # to ``PoolListener`` in the text below -- confirm against the full file.
    """...

            # etc.

        # create a new pool with a listener
        p = QueuePool(..., listeners=[MyListener()])

        # add a listener after the fact
        p.add_listener(MyListener())

        # usage with create_engine()
        e = create_engine("url://", listeners=[MyListener()])

    All of the standard connection :class:`~sqlalchemy.pool.Pool` types can
    accept event listeners for key connection lifecycle events:
    creation, pool check-out and check-in.  There are no events fired
    when a connection closes.

    For any given DB-API connection, there will be one ``connect``
    event, `n` number of ``checkout`` events, and either `n` or `n - 1`
    ``checkin`` events.  (If a ``Connection`` is detached from its
    pool via the ``detach()`` method, it won't be checked back in.)

    These are low-level events for low-level objects: raw Python
    DB-API connections, without the conveniences of the SQLAlchemy
    ``Connection`` wrapper, ``Dialect`` services or ``ClauseElement``
    execution.  If you execute SQL through the connection, explicitly
    closing all cursors and other resources is recommended.

    Events also receive a ``_ConnectionRecord``, a long-lived internal
    ``Pool`` object that basically represents a "slot" in the
    connection pool.  ``_ConnectionRecord`` objects have one public
    attribute of note: ``info``, a dictionary whose contents are
    scoped to the lifetime of the DB-API connection managed by the
    record.  You can use this shared storage area however you like.

    There is no need to subclass ``PoolListener`` to handle events.
    Any class that implements one or more of these methods can be used
    as a pool listener.  The ``Pool`` will inspect the methods
    provided by a listener object and add the listener to one or more
    internal event queues based on its capabilities.  In terms of
    efficiency and function call overhead, you're much better off only
    providing implementations for the hooks you'll be using.

    """

    @classmethod
    def _adapt_listener(cls, self, listener):
        """Adapt a :class:`.PoolListener` to individual
        :class:`event.Dispatch` events.

        ``self`` is the event target handed to ``event.listen`` and
        ``listener`` is the object whose hook methods get registered; only
        the hooks the listener actually provides are hooked up.
        """
        hook_names = ('connect', 'first_connect', 'checkout', 'checkin')
        listener = util.as_interface(listener, methods=hook_names)
        for hook_name in hook_names:
            if hasattr(listener, hook_name):
                event.listen(self, hook_name, getattr(listener, hook_name))

    def connect(self, dbapi_con, con_record):
        """Called once for each new DB-API connection or Pool's ``creator()``.

        dbapi_con
          A newly connected raw DB-API connection (not a SQLAlchemy
          ``Connection`` wrapper).

        con_record
          The ``_ConnectionRecord`` that persistently manages the connection

        """

    def first_connect(self, dbapi_con, con_record):
        """Called exactly once for the first DB-API connection.

        dbapi_con
          A newly connected raw DB-API connection (not a SQLAlchemy
          ``Connection`` wrapper).

        con_record
          The ``_ConnectionRecord`` that persistently manages the connection

        """

    def checkout(self, dbapi_con, con_record, con_proxy):
        """Called when a connection is retrieved from the Pool.

        dbapi_con
          A raw DB-API connection

        con_record
          The ``_ConnectionRecord`` that persistently manages the connection

        con_proxy
          The ``_ConnectionFairy`` which manages the connection for the span of
          the current checkout.

        If you raise an ``exc.DisconnectionError``, the current
        connection will be disposed and a fresh connection retrieved.
        Processing of all checkout listeners will abort and restart
        using the new connection.
        """

    def checkin(self, dbapi_con, con_record):
        """Called when a connection returns to the pool.

        Note that the connection may be closed, and may be None if the
        connection has been invalidated.  ``checkin`` will not be called
        for detached connections.  (They do not return to the pool.)

        dbapi_con
          A raw DB-API connection

        con_record
          The ``_ConnectionRecord`` that persistently manages the connection

        """


class ConnectionProxy(object):
    """Allows interception of statement execution by Connections.

    .. note::

       :class:`.ConnectionProxy` is deprecated.   Please
       refer to :class:`.ConnectionEvents`.

    Either or both of the ``execute()`` and ``cursor_execute()``
    may be implemented to intercept compiled statement and
    cursor level executions, e.g.::

        class MyProxy(ConnectionProxy):
            def execute(self, conn, execute, clauseelement,
                        *multiparams, **params):
                print("compiled statement:", clauseelement)
                return execute(clauseelement, *multiparams, **params)

            def cursor_execute(self, execute, cursor, statement,
                               parameters, context, executemany):
                print("raw statement:", statement)
                return execute(cursor, statement, parameters, context)

    The ``execute`` argument is a function that will fulfill the default
    execution behavior for the operation.  The signature illustrated
    in the example should be used.

    The proxy is installed into an :class:`~sqlalchemy.engine.Engine` via
    the ``proxy`` argument::

        e = create_engine('someurl://', proxy=MyProxy())

    """

    @classmethod
    def _adapt_listener(cls, self, listener):
        """Register the hooks of the proxy *listener* as events on ``self``.

        ``execute`` and ``cursor_execute`` are wrapped so that the "execute"
        callable handed to the proxy simply echoes back the (possibly
        modified) arguments.  The transaction hooks are wrapped so that the
        proxy receives a no-op callback in place of the real operation.
        """

        def adapt_execute(conn, clauseelement, multiparams, params):

            def execute_wrapper(clauseelement, *multiparams, **params):
                return clauseelement, multiparams, params

            return listener.execute(conn, execute_wrapper,
                                    clauseelement, *multiparams,
                                    **params)

        event.listen(self, 'before_execute', adapt_execute)

        def adapt_cursor_execute(conn, cursor, statement,
                                 parameters, context, executemany):

            def execute_wrapper(cursor, statement, parameters, context):
                return statement, parameters

            return listener.cursor_execute(
                execute_wrapper,
                cursor,
                statement,
                parameters,
                context,
                executemany,
            )

        event.listen(self, 'before_cursor_execute', adapt_cursor_execute)

        def do_nothing_callback(*arg, **kw):
            pass

        def adapt_listener(fn):

            def go(conn, *arg, **kw):
                fn(conn, do_nothing_callback, *arg, **kw)

            return util.update_wrapper(go, fn)

        # Registration order matches the original hand-written sequence.
        for hook_name in (
            'begin',
            'rollback',
            'commit',
            'savepoint',
            'rollback_savepoint',
            'release_savepoint',
            'begin_twophase',
            'prepare_twophase',
            'rollback_twophase',
            'commit_twophase',
        ):
            event.listen(self, hook_name,
                         adapt_listener(getattr(listener, hook_name)))

    def execute(self, conn, execute, clauseelement, *multiparams, **params):
        """Intercept high level execute() events."""
        return execute(clauseelement, *multiparams, **params)

    def cursor_execute(self, execute, cursor, statement, parameters,
                       context, executemany):
        """Intercept low-level cursor execute() events."""
        return execute(cursor, statement, parameters, context)

    def begin(self, conn, begin):
        """Intercept begin() events."""
        return begin()

    def rollback(self, conn, rollback):
        """Intercept rollback() events."""
        return rollback()

    # def commit(self, conn, commit):
    # ... (the excerpt is cut off here; the rest of the class is not visible)


# events.py
Source:events.py  
# Excerpt from events.py.  ``handleEvent`` and ``unlisten`` are methods of a
# listener-wrapper class whose header lies outside the excerpt, so they are
# shown dedented.
#
# ... (the excerpt starts inside an ``__init__`` whose header is not visible;
# its visible tail is reproduced as-is)
#         self.kwargs = kwargs
#         self.removeMethod = removeMethod
#         self.removeMethodArgs = (self,)


def handleEvent(self, event):
    """Forward *event* to the wrapped listener.

    The listener is called as ``listener(event, *args, **kwargs)`` using the
    extra arguments stored on the instance; its return value is discarded.
    """
    self.listener(event, *self.args, **self.kwargs)


def unlisten(self):
    """Detach this wrapper by calling ``removeMethod(*removeMethodArgs)``."""
    self.removeMethod(*self.removeMethodArgs)


# def addEventListener(target, eventInterface, event, listener, *args, **kwargs):
#     """
#     Adds an event listener to `target`.
#     :param target: an object that supports listening to the events of the given
#                    type (the add*Listener methods must be inherited from a Java
#                    class so that autodetection will work)
#     :param eventInterface: the interface that the listener wrapper has to
#                            implement (e.g. :class:`java.awt.MouseListener`)
#     :param event: name(s) of the event(s) to listen for (e.g. "mouseClicked")
#     :param listener: callable that is called with ``(event, *args, **kwargs)``
#                      when the event is fired
#     :type eventInterface: Java interface
# ... (the excerpt is cut off here; the rest of this function is not visible)


# domstorage.py
Source:domstorage.py  
# Excerpt from domstorage.py: event-subscription methods of a Chrome DevTools
# Protocol "DOMStorage" domain wrapper.  The enclosing class header lies
# outside the excerpt, so the methods are shown dedented; each still takes
# ``self`` and only uses ``self.client``.
#
# ... (the excerpt starts inside the "DOMStorage.domStorageItemAdded" listener
# method; only the tail below is visible, reproduced as-is)
#     """
#     event_name = "DOMStorage.domStorageItemAdded"
#     if listener is None:
#         future = self.client.loop.create_future()
#         def _listener(event: Optional[Dict] = None) -> None:
#             future.set_result(event)
#         self.client.once(event_name, _listener)
#         return future
#     self.client.on(event_name, listener)
#     return lambda: self.client.remove_listener(event_name, listener)


def domStorageItemRemoved(
    self, listener: Optional[Callable[[Dict[str, Any]], Any]] = None
) -> Any:
    """
    Listen for, or await, the ``DOMStorage.domStorageItemRemoved`` event.

    See `https://chromedevtools.github.io/devtools-protocol/tot/DOMStorage#event-domStorageItemRemoved`

    :param listener: Optional listener function
    :return: If a listener was supplied the return value is a callable that
        will remove the supplied listener, otherwise a future that resolves
        with the value of the event
    """
    event_name = "DOMStorage.domStorageItemRemoved"
    if listener is not None:
        self.client.on(event_name, listener)
        return lambda: self.client.remove_listener(event_name, listener)

    future = self.client.loop.create_future()

    def _resolve(event: Optional[Dict] = None) -> None:
        # Skip futures the caller already cancelled; set_result() would raise.
        if not future.done():
            future.set_result(event)

    self.client.once(event_name, _resolve)
    return future


def domStorageItemUpdated(
    self, listener: Optional[Callable[[Dict[str, Any]], Any]] = None
) -> Any:
    """
    Listen for, or await, the ``DOMStorage.domStorageItemUpdated`` event.

    See `https://chromedevtools.github.io/devtools-protocol/tot/DOMStorage#event-domStorageItemUpdated`

    :param listener: Optional listener function
    :return: If a listener was supplied the return value is a callable that
        will remove the supplied listener, otherwise a future that resolves
        with the value of the event
    """
    event_name = "DOMStorage.domStorageItemUpdated"
    if listener is not None:
        self.client.on(event_name, listener)
        return lambda: self.client.remove_listener(event_name, listener)

    future = self.client.loop.create_future()

    def _resolve(event: Optional[Dict] = None) -> None:
        # Skip futures the caller already cancelled; set_result() would raise.
        if not future.done():
            future.set_result(event)

    self.client.once(event_name, _resolve)
    return future


def domStorageItemsCleared(
    self, listener: Optional[Callable[[Dict[str, Any]], Any]] = None
) -> Any:
    """
    See `https://chromedevtools.github.io/devtools-protocol/tot/DOMStorage#event-domStorageItemsCleared`

    :param listener: Optional listener function
    :return: If a listener was supplied the return value is a callable that
        will remove the supplied listener otherwise a future that resolves
        with the value of the event
    """
    event_name = "DOMStorage.domStorageItemsCleared"
    if listener is None:
        future = self.client.loop.create_future()

        def _listener(event: Optional[Dict] = None) -> None:
            future.set_result(event)

        self.client.once(event_name, _listener)
        return future
    self.client.on(event_name, listener)
    # ... (the excerpt is cut off here; the rest of this method is not
    # visible, so it is reproduced only as far as the page shows it)


# LambdaTest’s Playwright tutorial will give you a broader idea about the
# Playwright automation framework, its unique features, and use cases with
# examples to exceed your understanding of Playwright testing. This tutorial
# will give A to Z guidance, from installing the Playwright framework to some
# best practices and advanced concepts.
Get 100 minutes of automation testing FREE!!
