How to use listener method in ATX

Best Python code snippet using ATX

network.py

Source:network.py Github

copy

Full Screen

...444 """445 event_name = "Network.dataReceived"446 if listener is None:447 future = self.client.loop.create_future()448 def _listener(event: Optional[Dict] = None) -> None:449 future.set_result(event)450 self.client.once(event_name, _listener)451 return future452 self.client.on(event_name, listener)453 return lambda: self.client.remove_listener(event_name, listener)454 def eventSourceMessageReceived(455 self, listener: Optional[Callable[[Dict[str, Any]], Any]] = None456 ) -> Any:457 """458 Fired when EventSource message is received.459 See `https://chromedevtools.github.io/devtools-protocol/tot/Network#event-eventSourceMessageReceived`460 :param listener: Optional listener function461 :return: If a listener was supplied the return value is a callable that462 will remove the supplied listener otherwise a future that resolves463 with the value of the event464 """465 event_name = "Network.eventSourceMessageReceived"466 if listener is None:467 future = self.client.loop.create_future()468 def _listener(event: Optional[Dict] = None) -> None:469 future.set_result(event)470 self.client.once(event_name, _listener)471 return future472 self.client.on(event_name, listener)473 return lambda: self.client.remove_listener(event_name, listener)474 def loadingFailed(475 self, listener: Optional[Callable[[Dict[str, Any]], Any]] = None476 ) -> Any:477 """478 Fired when HTTP request has failed to load.479 See `https://chromedevtools.github.io/devtools-protocol/tot/Network#event-loadingFailed`480 :param listener: Optional listener function481 :return: If a listener was supplied the return value is a callable that482 will remove the supplied listener otherwise a future that resolves483 with the value of the event484 """485 event_name = "Network.loadingFailed"486 if listener is None:487 future = self.client.loop.create_future()488 def _listener(event: Optional[Dict] = None) -> None:489 future.set_result(event)490 self.client.once(event_name, _listener)491 return future492 self.client.on(event_name, listener)493 return lambda: self.client.remove_listener(event_name, listener)494 def loadingFinished(495 self, listener: Optional[Callable[[Dict[str, Any]], Any]] = None496 ) -> Any:497 """498 Fired when HTTP request has finished loading.499 See `https://chromedevtools.github.io/devtools-protocol/tot/Network#event-loadingFinished`500 :param listener: Optional listener function501 :return: If a listener was supplied the return value is a callable that502 will remove the supplied listener otherwise a future that resolves503 with the value of the event504 """505 event_name = "Network.loadingFinished"506 if listener is None:507 future = self.client.loop.create_future()508 def _listener(event: Optional[Dict] = None) -> None:509 future.set_result(event)510 self.client.once(event_name, _listener)511 return future512 self.client.on(event_name, listener)513 return lambda: self.client.remove_listener(event_name, listener)514 def requestIntercepted(515 self, listener: Optional[Callable[[Dict[str, Any]], Any]] = None516 ) -> Any:517 """518 Details of an intercepted HTTP request, which must be either allowed, blocked, modified or519 mocked.520 See `https://chromedevtools.github.io/devtools-protocol/tot/Network#event-requestIntercepted`521 :param listener: Optional listener function522 :return: If a listener was supplied the return value is a callable that523 will remove the supplied listener otherwise a future that resolves524 with the value of the event525 """526 event_name = "Network.requestIntercepted"527 if listener is None:528 future = self.client.loop.create_future()529 def _listener(event: Optional[Dict] = None) -> None:530 future.set_result(event)531 self.client.once(event_name, _listener)532 return future533 self.client.on(event_name, listener)534 return lambda: self.client.remove_listener(event_name, listener)535 def requestServedFromCache(536 self, listener: Optional[Callable[[Dict[str, Any]], Any]] = None537 ) -> Any:538 """539 Fired if request ended up loading from cache.540 See `https://chromedevtools.github.io/devtools-protocol/tot/Network#event-requestServedFromCache`541 :param listener: Optional listener function542 :return: If a listener was supplied the return value is a callable that543 will remove the supplied listener otherwise a future that resolves544 with the value of the event545 """546 event_name = "Network.requestServedFromCache"547 if listener is None:548 future = self.client.loop.create_future()549 def _listener(event: Optional[Dict] = None) -> None:550 future.set_result(event)551 self.client.once(event_name, _listener)552 return future553 self.client.on(event_name, listener)554 return lambda: self.client.remove_listener(event_name, listener)555 def requestWillBeSent(556 self, listener: Optional[Callable[[Dict[str, Any]], Any]] = None557 ) -> Any:558 """559 Fired when page is about to send HTTP request.560 See `https://chromedevtools.github.io/devtools-protocol/tot/Network#event-requestWillBeSent`561 :param listener: Optional listener function562 :return: If a listener was supplied the return value is a callable that563 will remove the supplied listener otherwise a future that resolves564 with the value of the event565 """566 event_name = "Network.requestWillBeSent"567 if listener is None:568 future = self.client.loop.create_future()569 def _listener(event: Optional[Dict] = None) -> None:570 future.set_result(event)571 self.client.once(event_name, _listener)572 return future573 self.client.on(event_name, listener)574 return lambda: self.client.remove_listener(event_name, listener)575 def resourceChangedPriority(576 self, listener: Optional[Callable[[Dict[str, Any]], Any]] = None577 ) -> Any:578 """579 Fired when resource loading priority is changed580 See `https://chromedevtools.github.io/devtools-protocol/tot/Network#event-resourceChangedPriority`581 :param listener: Optional listener function582 :return: If a listener was supplied the return value is a callable that583 will remove the supplied listener otherwise a future that resolves584 with the value of the event585 """586 event_name = "Network.resourceChangedPriority"587 if listener is None:588 future = self.client.loop.create_future()589 def _listener(event: Optional[Dict] = None) -> None:590 future.set_result(event)591 self.client.once(event_name, _listener)592 return future593 self.client.on(event_name, listener)594 return lambda: self.client.remove_listener(event_name, listener)595 def signedExchangeReceived(596 self, listener: Optional[Callable[[Dict[str, Any]], Any]] = None597 ) -> Any:598 """599 Fired when a signed exchange was received over the network600 See `https://chromedevtools.github.io/devtools-protocol/tot/Network#event-signedExchangeReceived`601 :param listener: Optional listener function602 :return: If a listener was supplied the return value is a callable that603 will remove the supplied listener otherwise a future that resolves604 with the value of the event605 """606 event_name = "Network.signedExchangeReceived"607 if listener is None:608 future = self.client.loop.create_future()609 def _listener(event: Optional[Dict] = None) -> None:610 future.set_result(event)611 self.client.once(event_name, _listener)612 return future613 self.client.on(event_name, listener)614 return lambda: self.client.remove_listener(event_name, listener)615 def responseReceived(616 self, listener: Optional[Callable[[Dict[str, Any]], Any]] = None617 ) -> Any:618 """619 Fired when HTTP response is available.620 See `https://chromedevtools.github.io/devtools-protocol/tot/Network#event-responseReceived`621 :param listener: Optional listener function622 :return: If a listener was supplied the return value is a callable that623 will remove the supplied listener otherwise a future that resolves624 with the value of the event625 """626 event_name = "Network.responseReceived"627 if listener is None:628 future = self.client.loop.create_future()629 def _listener(event: Optional[Dict] = None) -> None:630 future.set_result(event)631 self.client.once(event_name, _listener)632 return future633 self.client.on(event_name, listener)634 return lambda: self.client.remove_listener(event_name, listener)635 def webSocketClosed(636 self, listener: Optional[Callable[[Dict[str, Any]], Any]] = None637 ) -> Any:638 """639 Fired when WebSocket is closed.640 See `https://chromedevtools.github.io/devtools-protocol/tot/Network#event-webSocketClosed`641 :param listener: Optional listener function642 :return: If a listener was supplied the return value is a callable that643 will remove the supplied listener otherwise a future that resolves644 with the value of the event645 """646 event_name = "Network.webSocketClosed"647 if listener is None:648 future = self.client.loop.create_future()649 def _listener(event: Optional[Dict] = None) -> None:650 future.set_result(event)651 self.client.once(event_name, _listener)652 return future653 self.client.on(event_name, listener)654 return lambda: self.client.remove_listener(event_name, listener)655 def webSocketCreated(656 self, listener: Optional[Callable[[Dict[str, Any]], Any]] = None657 ) -> Any:658 """659 Fired upon WebSocket creation.660 See `https://chromedevtools.github.io/devtools-protocol/tot/Network#event-webSocketCreated`661 :param listener: Optional listener function662 :return: If a listener was supplied the return value is a callable that663 will remove the supplied listener otherwise a future that resolves664 with the value of the event665 """666 event_name = "Network.webSocketCreated"667 if listener is None:668 future = self.client.loop.create_future()669 def _listener(event: Optional[Dict] = None) -> None:670 future.set_result(event)671 self.client.once(event_name, _listener)672 return future673 self.client.on(event_name, listener)674 return lambda: self.client.remove_listener(event_name, listener)675 def webSocketFrameError(676 self, listener: Optional[Callable[[Dict[str, Any]], Any]] = None677 ) -> Any:678 """679 Fired when WebSocket message error occurs.680 See `https://chromedevtools.github.io/devtools-protocol/tot/Network#event-webSocketFrameError`681 :param listener: Optional listener function682 :return: If a listener was supplied the return value is a callable that683 will remove the supplied listener otherwise a future that resolves684 with the value of the event685 """686 event_name = "Network.webSocketFrameError"687 if listener is None:688 future = self.client.loop.create_future()689 def _listener(event: Optional[Dict] = None) -> None:690 future.set_result(event)691 self.client.once(event_name, _listener)692 return future693 self.client.on(event_name, listener)694 return lambda: self.client.remove_listener(event_name, listener)695 def webSocketFrameReceived(696 self, listener: Optional[Callable[[Dict[str, Any]], Any]] = None697 ) -> Any:698 """699 Fired when WebSocket message is received.700 See `https://chromedevtools.github.io/devtools-protocol/tot/Network#event-webSocketFrameReceived`701 :param listener: Optional listener function702 :return: If a listener was supplied the return value is a callable that703 will remove the supplied listener otherwise a future that resolves704 with the value of the event705 """706 event_name = "Network.webSocketFrameReceived"707 if listener is None:708 future = self.client.loop.create_future()709 def _listener(event: Optional[Dict] = None) -> None:710 future.set_result(event)711 self.client.once(event_name, _listener)712 return future713 self.client.on(event_name, listener)714 return lambda: self.client.remove_listener(event_name, listener)715 def webSocketFrameSent(716 self, listener: Optional[Callable[[Dict[str, Any]], Any]] = None717 ) -> Any:718 """719 Fired when WebSocket message is sent.720 See `https://chromedevtools.github.io/devtools-protocol/tot/Network#event-webSocketFrameSent`721 :param listener: Optional listener function722 :return: If a listener was supplied the return value is a callable that723 will remove the supplied listener otherwise a future that resolves724 with the value of the event725 """726 event_name = "Network.webSocketFrameSent"727 if listener is None:728 future = self.client.loop.create_future()729 def _listener(event: Optional[Dict] = None) -> None:730 future.set_result(event)731 self.client.once(event_name, _listener)732 return future733 self.client.on(event_name, listener)734 return lambda: self.client.remove_listener(event_name, listener)735 def webSocketHandshakeResponseReceived(736 self, listener: Optional[Callable[[Dict[str, Any]], Any]] = None737 ) -> Any:738 """739 Fired when WebSocket handshake response becomes available.740 See `https://chromedevtools.github.io/devtools-protocol/tot/Network#event-webSocketHandshakeResponseReceived`741 :param listener: Optional listener function742 :return: If a listener was supplied the return value is a callable that743 will remove the supplied listener otherwise a future that resolves744 with the value of the event745 """746 event_name = "Network.webSocketHandshakeResponseReceived"747 if listener is None:748 future = self.client.loop.create_future()749 def _listener(event: Optional[Dict] = None) -> None:750 future.set_result(event)751 self.client.once(event_name, _listener)752 return future753 self.client.on(event_name, listener)754 return lambda: self.client.remove_listener(event_name, listener)755 def webSocketWillSendHandshakeRequest(756 self, listener: Optional[Callable[[Dict[str, Any]], Any]] = None757 ) -> Any:758 """759 Fired when WebSocket is about to initiate handshake.760 See `https://chromedevtools.github.io/devtools-protocol/tot/Network#event-webSocketWillSendHandshakeRequest`761 :param listener: Optional listener function762 :return: If a listener was supplied the return value is a callable that763 will remove the supplied listener otherwise a future that resolves764 with the value of the event765 """766 event_name = "Network.webSocketWillSendHandshakeRequest"767 if listener is None:768 future = self.client.loop.create_future()769 def _listener(event: Optional[Dict] = None) -> None:770 future.set_result(event)771 self.client.once(event_name, _listener)772 return future773 self.client.on(event_name, listener)...

Full Screen

Full Screen

interfaces.py

Source:interfaces.py Github

copy

Full Screen

...22 # etc.23 # create a new pool with a listener24 p = QueuePool(..., listeners=[MyListener()])25 # add a listener after the fact26 p.add_listener(MyListener())27 # usage with create_engine()28 e = create_engine("url://", listeners=[MyListener()])29 All of the standard connection :class:`~sqlalchemy.pool.Pool` types can30 accept event listeners for key connection lifecycle events:31 creation, pool check-out and check-in. There are no events fired32 when a connection closes.33 For any given DB-API connection, there will be one ``connect``34 event, `n` number of ``checkout`` events, and either `n` or `n - 1`35 ``checkin`` events. (If a ``Connection`` is detached from its36 pool via the ``detach()`` method, it won't be checked back in.)37 These are low-level events for low-level objects: raw Python38 DB-API connections, without the conveniences of the SQLAlchemy39 ``Connection`` wrapper, ``Dialect`` services or ``ClauseElement``40 execution. If you execute SQL through the connection, explicitly41 closing all cursors and other resources is recommended.42 Events also receive a ``_ConnectionRecord``, a long-lived internal43 ``Pool`` object that basically represents a "slot" in the44 connection pool. ``_ConnectionRecord`` objects have one public45 attribute of note: ``info``, a dictionary whose contents are46 scoped to the lifetime of the DB-API connection managed by the47 record. You can use this shared storage area however you like.48 There is no need to subclass ``PoolListener`` to handle events.49 Any class that implements one or more of these methods can be used50 as a pool listener. The ``Pool`` will inspect the methods51 provided by a listener object and add the listener to one or more52 internal event queues based on its capabilities. In terms of53 efficiency and function call overhead, you're much better off only54 providing implementations for the hooks you'll be using.55 """56 @classmethod57 def _adapt_listener(cls, self, listener):58 """Adapt a :class:`.PoolListener` to individual59 :class:`event.Dispatch` events.60 """61 listener = util.as_interface(listener,62 methods=('connect', 'first_connect',63 'checkout', 'checkin'))64 if hasattr(listener, 'connect'):65 event.listen(self, 'connect', listener.connect)66 if hasattr(listener, 'first_connect'):67 event.listen(self, 'first_connect', listener.first_connect)68 if hasattr(listener, 'checkout'):69 event.listen(self, 'checkout', listener.checkout)70 if hasattr(listener, 'checkin'):71 event.listen(self, 'checkin', listener.checkin)72 def connect(self, dbapi_con, con_record):73 """Called once for each new DB-API connection or Pool's ``creator()``.74 dbapi_con75 A newly connected raw DB-API connection (not a SQLAlchemy76 ``Connection`` wrapper).77 con_record78 The ``_ConnectionRecord`` that persistently manages the connection79 """80 def first_connect(self, dbapi_con, con_record):81 """Called exactly once for the first DB-API connection.82 dbapi_con83 A newly connected raw DB-API connection (not a SQLAlchemy84 ``Connection`` wrapper).85 con_record86 The ``_ConnectionRecord`` that persistently manages the connection87 """88 def checkout(self, dbapi_con, con_record, con_proxy):89 """Called when a connection is retrieved from the Pool.90 dbapi_con91 A raw DB-API connection92 con_record93 The ``_ConnectionRecord`` that persistently manages the connection94 con_proxy95 The ``_ConnectionFairy`` which manages the connection for the span of96 the current checkout.97 If you raise an ``exc.DisconnectionError``, the current98 connection will be disposed and a fresh connection retrieved.99 Processing of all checkout listeners will abort and restart100 using the new connection.101 """102 def checkin(self, dbapi_con, con_record):103 """Called when a connection returns to the pool.104 Note that the connection may be closed, and may be None if the105 connection has been invalidated. ``checkin`` will not be called106 for detached connections. (They do not return to the pool.)107 dbapi_con108 A raw DB-API connection109 con_record110 The ``_ConnectionRecord`` that persistently manages the connection111 """112class ConnectionProxy(object):113 """Allows interception of statement execution by Connections.114 .. note::115 :class:`.ConnectionProxy` is deprecated. Please116 refer to :class:`.ConnectionEvents`.117 Either or both of the ``execute()`` and ``cursor_execute()``118 may be implemented to intercept compiled statement and119 cursor level executions, e.g.::120 class MyProxy(ConnectionProxy):121 def execute(self, conn, execute, clauseelement,122 *multiparams, **params):123 print "compiled statement:", clauseelement124 return execute(clauseelement, *multiparams, **params)125 def cursor_execute(self, execute, cursor, statement,126 parameters, context, executemany):127 print "raw statement:", statement128 return execute(cursor, statement, parameters, context)129 The ``execute`` argument is a function that will fulfill the default130 execution behavior for the operation. The signature illustrated131 in the example should be used.132 The proxy is installed into an :class:`~sqlalchemy.engine.Engine` via133 the ``proxy`` argument::134 e = create_engine('someurl://', proxy=MyProxy())135 """136 @classmethod137 def _adapt_listener(cls, self, listener):138 def adapt_execute(conn, clauseelement, multiparams, params):139 def execute_wrapper(clauseelement, *multiparams, **params):140 return clauseelement, multiparams, params141 return listener.execute(conn, execute_wrapper,142 clauseelement, *multiparams,143 **params)144 event.listen(self, 'before_execute', adapt_execute)145 def adapt_cursor_execute(conn, cursor, statement,146 parameters, context, executemany):147 def execute_wrapper(148 cursor,149 statement,150 parameters,151 context,152 ):153 return statement, parameters154 return listener.cursor_execute(155 execute_wrapper,156 cursor,157 statement,158 parameters,159 context,160 executemany,161 )162 event.listen(self, 'before_cursor_execute', adapt_cursor_execute)163 def do_nothing_callback(*arg, **kw):164 pass165 def adapt_listener(fn):166 def go(conn, *arg, **kw):167 fn(conn, do_nothing_callback, *arg, **kw)168 return util.update_wrapper(go, fn)169 event.listen(self, 'begin', adapt_listener(listener.begin))170 event.listen(self, 'rollback',171 adapt_listener(listener.rollback))172 event.listen(self, 'commit', adapt_listener(listener.commit))173 event.listen(self, 'savepoint',174 adapt_listener(listener.savepoint))175 event.listen(self, 'rollback_savepoint',176 adapt_listener(listener.rollback_savepoint))177 event.listen(self, 'release_savepoint',178 adapt_listener(listener.release_savepoint))179 event.listen(self, 'begin_twophase',180 adapt_listener(listener.begin_twophase))181 event.listen(self, 'prepare_twophase',182 adapt_listener(listener.prepare_twophase))183 event.listen(self, 'rollback_twophase',184 adapt_listener(listener.rollback_twophase))185 event.listen(self, 'commit_twophase',186 adapt_listener(listener.commit_twophase))187 def execute(self, conn, execute, clauseelement, *multiparams, **params):188 """Intercept high level execute() events."""189 return execute(clauseelement, *multiparams, **params)190 def cursor_execute(self, execute, cursor, statement, parameters,191 context, executemany):192 """Intercept low-level cursor execute() events."""193 return execute(cursor, statement, parameters, context)194 def begin(self, conn, begin):195 """Intercept begin() events."""196 return begin()197 def rollback(self, conn, rollback):198 """Intercept rollback() events."""199 return rollback()200 def commit(self, conn, commit):...

Full Screen

Full Screen

events.py

Source:events.py Github

copy

Full Screen

...43 self.kwargs = kwargs44 self.removeMethod = removeMethod45 self.removeMethodArgs = (self,)46 def handleEvent(self, event):47 self.listener(event, *self.args, **self.kwargs)48 def unlisten(self):49 self.removeMethod(*self.removeMethodArgs)50def addEventListener(target, eventInterface, event, listener, *args, **kwargs):51 """52 Adds an event listener to `target`.53 :param target: an object that supports listening to the events of the given54 type (the add*Listener methods must be inherited from a Java55 class so that autodetection will work)56 :param eventInterface: the interface that the listener wrapper has to57 implement (e.g. :class:`java.awt.MouseListener`)58 :param event: name(s) of the event(s) to listen for (e.g. "mouseClicked")59 :param listener: callable that is called with ``(event, *args, **kwargs)``60 when the event is fired61 :type eventInterface: Java interface...

Full Screen

Full Screen

domstorage.py

Source:domstorage.py Github

copy

Full Screen

...84 """85 event_name = "DOMStorage.domStorageItemAdded"86 if listener is None:87 future = self.client.loop.create_future()88 def _listener(event: Optional[Dict] = None) -> None:89 future.set_result(event)90 self.client.once(event_name, _listener)91 return future92 self.client.on(event_name, listener)93 return lambda: self.client.remove_listener(event_name, listener)94 def domStorageItemRemoved(95 self, listener: Optional[Callable[[Dict[str, Any]], Any]] = None96 ) -> Any:97 """98 See `https://chromedevtools.github.io/devtools-protocol/tot/DOMStorage#event-domStorageItemRemoved`99 :param listener: Optional listener function100 :return: If a listener was supplied the return value is a callable that101 will remove the supplied listener otherwise a future that resolves102 with the value of the event103 """104 event_name = "DOMStorage.domStorageItemRemoved"105 if listener is None:106 future = self.client.loop.create_future()107 def _listener(event: Optional[Dict] = None) -> None:108 future.set_result(event)109 self.client.once(event_name, _listener)110 return future111 self.client.on(event_name, listener)112 return lambda: self.client.remove_listener(event_name, listener)113 def domStorageItemUpdated(114 self, listener: Optional[Callable[[Dict[str, Any]], Any]] = None115 ) -> Any:116 """117 See `https://chromedevtools.github.io/devtools-protocol/tot/DOMStorage#event-domStorageItemUpdated`118 :param listener: Optional listener function119 :return: If a listener was supplied the return value is a callable that120 will remove the supplied listener otherwise a future that resolves121 with the value of the event122 """123 event_name = "DOMStorage.domStorageItemUpdated"124 if listener is None:125 future = self.client.loop.create_future()126 def _listener(event: Optional[Dict] = None) -> None:127 future.set_result(event)128 self.client.once(event_name, _listener)129 return future130 self.client.on(event_name, listener)131 return lambda: self.client.remove_listener(event_name, listener)132 def domStorageItemsCleared(133 self, listener: Optional[Callable[[Dict[str, Any]], Any]] = None134 ) -> Any:135 """136 See `https://chromedevtools.github.io/devtools-protocol/tot/DOMStorage#event-domStorageItemsCleared`137 :param listener: Optional listener function138 :return: If a listener was supplied the return value is a callable that139 will remove the supplied listener otherwise a future that resolves140 with the value of the event141 """142 event_name = "DOMStorage.domStorageItemsCleared"143 if listener is None:144 future = self.client.loop.create_future()145 def _listener(event: Optional[Dict] = None) -> None:146 future.set_result(event)147 self.client.once(event_name, _listener)148 return future149 self.client.on(event_name, listener)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run ATX automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful