How to use _on_close method in Playwright Python

Best Python code snippet using playwright-python

Run Playwright Python automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

websocket_transport.py

Source: websocket_transport.py Github

copy
1import websocket
2import threading
3import requests
4import traceback
5import time
6import ssl
7from .reconnection import ConnectionStateChecker
8from .connection import ConnectionState
9from ...messages.ping_message import PingMessage
10from ...hub.errors import HubError, HubConnectionError, UnAuthorizedHubError
11from ...protocol.messagepack_protocol import MessagePackHubProtocol
12from ...protocol.json_hub_protocol import JsonHubProtocol
13from ..base_transport import BaseTransport
14from ...helpers import Helpers
15
16class WebsocketTransport(BaseTransport):
17    def __init__(self,
18            url="",
19            headers=None,
20            keep_alive_interval=15,
21            reconnection_handler=None,
22            verify_ssl=False,
23            skip_negotiation=False,
24            enable_trace=False,
25            **kwargs):
26        super(WebsocketTransport, self).__init__(**kwargs)
27        self._ws = None
28        self.enable_trace = enable_trace
29        self._thread = None
30        self.skip_negotiation = skip_negotiation
31        self.url = url
32        if headers is None:
33            self.headers = dict()
34        else:
35            self.headers = headers
36        self.handshake_received = False
37        self.token = None  # auth
38        self.state = ConnectionState.disconnected
39        self.connection_alive = False
40        self._thread = None
41        self._ws = None
42        self.verify_ssl = verify_ssl
43        self.connection_checker = ConnectionStateChecker(
44            lambda: self.send(PingMessage()),
45            keep_alive_interval
46        )
47        self.reconnection_handler = reconnection_handler
48
49        if len(self.logger.handlers) > 0:
50            websocket.enableTrace(self.enable_trace, self.logger.handlers[0])
51    
52    def is_running(self):
53        return self.state != ConnectionState.disconnected
54
55    def stop(self):
56        if self.state == ConnectionState.connected:
57            self.connection_checker.stop()
58            self._ws.close()
59            self.state = ConnectionState.disconnected
60            self.handshake_received = False
61
62    def start(self):
63        if not self.skip_negotiation:
64            self.negotiate()
65
66        if self.state == ConnectionState.connected:
67            self.logger.warning("Already connected unable to start")
68            return False
69
70        self.state = ConnectionState.connecting
71        self.logger.debug("start url:" + self.url)
72        
73        self._ws = websocket.WebSocketApp(
74            self.url,
75            header=self.headers,
76            on_message=self.on_message,
77            on_error=self.on_socket_error,
78            on_close=self.on_close,
79            on_open=self.on_open,
80            )
81            
82        self._thread = threading.Thread(
83            target=lambda: self._ws.run_forever(
84                sslopt={"cert_reqs": ssl.CERT_NONE}
85                if not self.verify_ssl else {}
86            ))
87        self._thread.daemon = True
88        self._thread.start()
89        return True
90
91    def negotiate(self):
92        negotiate_url = Helpers.get_negotiate_url(self.url)
93        self.logger.debug("Negotiate url:{0}".format(negotiate_url))
94
95        response = requests.post(
96            negotiate_url, headers=self.headers, verify=self.verify_ssl)
97        self.logger.debug(
98            "Response status code{0}".format(response.status_code))
99
100        if response.status_code != 200:
101            raise HubError(response.status_code)\
102                if response.status_code != 401 else UnAuthorizedHubError()
103
104        data = response.json()
105
106        if "connectionId" in data.keys():
107            self.url = Helpers.encode_connection_id(
108                self.url, data["connectionId"])
109
110        # Azure
111        if 'url' in data.keys() and 'accessToken' in data.keys():
112            Helpers.get_logger().debug(
113                "Azure url, reformat headers, token and url {0}".format(data))
114            self.url = data["url"]\
115                if data["url"].startswith("ws") else\
116                Helpers.http_to_websocket(data["url"])
117            self.token = data["accessToken"]
118            self.headers = {"Authorization": "Bearer " + self.token}
119
120
121    def evaluate_handshake(self, message):
122        self.logger.debug("Evaluating handshake {0}".format(message))
123        msg, messages = self.protocol.decode_handshake(message)
124        if msg.error is None or msg.error == "":
125            self.handshake_received = True
126            self.state = ConnectionState.connected
127            if self.reconnection_handler is not None:
128                self.reconnection_handler.reconnecting = False
129                if not self.connection_checker.running:
130                    self.connection_checker.start()
131        else:
132            self.logger.error(msg.error)
133            self.on_socket_error(self._ws, msg.error)
134            self.stop()
135            self.state = ConnectionState.disconnected
136        return messages
137
138    def on_open(self, _):
139        self.logger.debug("-- web socket open --")
140        msg = self.protocol.handshake_message()
141        self.send(msg)
142
143    def on_close(self, callback, close_status_code, close_reason):
144        self.logger.debug("-- web socket close --")
145        self.logger.debug(close_status_code)
146        self.logger.debug(close_reason)
147        self.state = ConnectionState.disconnected
148        if self._on_close is not None and callable(self._on_close):
149            self._on_close()
150        if callback is not None and callable(callback):
151            callback()
152
153    def on_reconnect(self):
154        self.logger.debug("-- web socket reconnecting --")
155        self.state = ConnectionState.disconnected
156        if self._on_close is not None and callable(self._on_close):
157            self._on_close()
158
159    def on_socket_error(self, app, error):
160        """
161        Args:
162            _: Required to support websocket-client version equal or greater than 0.58.0
163            error ([type]): [description]
164
165        Raises:
166            HubError: [description]
167        """
168        self.logger.debug("-- web socket error --")
169        self.logger.error(traceback.format_exc(10, True))
170        self.logger.error("{0} {1}".format(self, error))
171        self.logger.error("{0} {1}".format(error, type(error)))
172        self._on_close()
173        self.state = ConnectionState.disconnected
174        #raise HubError(error)
175
176    def on_message(self, app, raw_message):
177        self.logger.debug("Message received{0}".format(raw_message))
178        if not self.handshake_received:
179            messages = self.evaluate_handshake(raw_message)
180            if self._on_open is not None and callable(self._on_open):
181                self.state = ConnectionState.connected
182                self._on_open()
183
184            if len(messages) > 0:
185                return self._on_message(messages)
186
187            return []
188        
189        return self._on_message(
190            self.protocol.parse_messages(raw_message))
191
192    def send(self, message):
193        self.logger.debug("Sending message {0}".format(message))
194        try:
195            self._ws.send(
196                self.protocol.encode(message),
197                opcode=0x2
198                if type(self.protocol) == MessagePackHubProtocol else
199                0x1)
200            self.connection_checker.last_message = time.time()
201            if self.reconnection_handler is not None:
202                self.reconnection_handler.reset()
203        except (
204                websocket._exceptions.WebSocketConnectionClosedException,
205                OSError) as ex:
206            self.handshake_received = False
207            self.logger.warning("Connection closed {0}".format(ex))
208            self.state = ConnectionState.disconnected
209            if self.reconnection_handler is None:
210                if self._on_close is not None and\
211                        callable(self._on_close):
212                    self._on_close()
213                raise ValueError(str(ex))
214            # Connection closed
215            self.handle_reconnect()
216        except Exception as ex:
217            raise ex
218
219    def handle_reconnect(self):
220        if not self.reconnection_handler.reconnecting and self._on_reconnect is not None and \
221                callable(self._on_reconnect):
222            self._on_reconnect()
223        self.reconnection_handler.reconnecting = True
224        try:
225            self.stop()
226            self.start()
227        except Exception as ex:
228            self.logger.error(ex)
229            sleep_time = self.reconnection_handler.next()
230            threading.Thread(
231                target=self.deferred_reconnect,
232                args=(sleep_time,)
233            ).start()
234
235    def deferred_reconnect(self, sleep_time):
236        time.sleep(sleep_time)
237        try:
238            if not self.connection_alive:
239                self.send(PingMessage())
240        except Exception as ex:
241            self.logger.error(ex)
242            self.reconnection_handler.reconnecting = False
243            self.connection_alive = False
Full Screen

popupdialog.py

Source: popupdialog.py Github

copy
1
2from __future__ import absolute_import
3from asciimatics.widgets import PopUpDialog as APopUpDialog
4
5
6class PopUpDialog(APopUpDialog):
7
8    def __init__(self,
9                 screen,
10                 text,
11                 buttons,
12                 on_click=None,
13                 on_close=None):
14
15        super(PopUpDialog, self).__init__(screen,
16                                          text,
17                                          buttons,
18                                          on_close=lambda x: self._on_parent_click(x),
19                                          has_shadow=True)
20        self._on_click = on_click
21        self._on_close = on_close
22
23    def _on_parent_click(self, selected):
24        """
25        On parent click
26        :param selected:
27        :return:
28        """
29
30        if self._on_click:
31            self._on_click(selected)
32
33        if self._on_close:
34            self._on_close()
35
36    def close(self):
37        """
38        Close the popup
39        :return:
40        """
41
42        if self in self._scene.effects:
43            self._scene.remove_effect(self)
44
45        if self._on_close:
46            self._on_close()
47
Full Screen

slider_template_dialog.py

Source: slider_template_dialog.py Github

copy
1"""
2This file is part of the Custom Slider Framework licensed under the Creative Commons Attribution-NoDerivatives 4.0 International public license (CC BY-ND 4.0).
3
4https://creativecommons.org/licenses/by-nd/4.0/
5https://creativecommons.org/licenses/by-nd/4.0/legalcode
6
7Copyright (c) COLONOLNUTTY
8"""
9from typing import Callable
10
11from cncustomsliderframework.commonlib.dialogs.option_dialogs.options.objects.common_dialog_input_text_option import \
12    CommonDialogInputTextOption
13from cncustomsliderframework.dtos.sliders.slider import CSFSlider
14from cncustomsliderframework.enums.string_ids import CSFStringId
15from cncustomsliderframework.modinfo import ModInfo
16from cncustomsliderframework.slider_templates.slider_template import CSFSliderTemplate
17from sims.sim_info import SimInfo
18from sims4communitylib.dialogs.common_choice_outcome import CommonChoiceOutcome
19from sims4communitylib.dialogs.common_ok_dialog import CommonOkDialog
20from sims4communitylib.dialogs.ok_cancel_dialog import CommonOkCancelDialog
21from sims4communitylib.dialogs.option_dialogs.common_choose_object_option_dialog import CommonChooseObjectOptionDialog
22from sims4communitylib.dialogs.option_dialogs.options.common_dialog_option_context import CommonDialogOptionContext
23from sims4communitylib.dialogs.option_dialogs.options.objects.common_dialog_action_option import \
24    CommonDialogActionOption
25from sims4communitylib.dialogs.option_dialogs.options.objects.common_dialog_object_option import \
26    CommonDialogObjectOption
27from sims4communitylib.logging.has_log import HasLog
28from sims4communitylib.mod_support.mod_identity import CommonModIdentity
29from sims4communitylib.utils.common_function_utils import CommonFunctionUtils
30from sims4communitylib.utils.common_icon_utils import CommonIconUtils
31from sims4communitylib.utils.sims.common_sim_name_utils import CommonSimNameUtils
32
33
34class CSFSliderTemplateDialog(HasLog):
35    """ A dialog for loading, saving, and applying slider templates. """
36    _SELECTED_TEMPLATE: CSFSliderTemplate = None
37    
38    def __init__(self, on_close: Callable[[], None]=CommonFunctionUtils.noop):
39        super().__init__()
40        self._on_close = on_close
41        from cncustomsliderframework.slider_templates.slider_template_utils import CSFSliderTemplateUtils
42        self._template_utils = CSFSliderTemplateUtils()
43
44    # noinspection PyMissingOrEmptyDocstring
45    @property
46    def mod_identity(self) -> CommonModIdentity:
47        return ModInfo.get_identity()
48
49    # noinspection PyMissingOrEmptyDocstring
50    @property
51    def log_identifier(self) -> str:
52        return 'csf_slider_template_dialog'
53
54    def open(self, sim_info: SimInfo, page: int=1) -> None:
55        """ Open the dialog. """
56        self.log.format_with_message('Opening dialog.', sim=sim_info)
57
58        def _on_close() -> None:
59            self.log.debug('Slider Template dialog closed.')
60            if self._on_close is not None:
61                self._on_close()
62
63        def _reopen() -> None:
64            self.log.debug('Reopening slider template dialog.')
65            self.open(sim_info, page=option_dialog.current_page)
66
67        option_dialog = CommonChooseObjectOptionDialog(
68            CSFStringId.SLIDER_TEMPLATES_NAME,
69            CSFStringId.SLIDER_TEMPLATES_DESCRIPTION,
70            mod_identity=self.mod_identity,
71            on_close=_on_close,
72            per_page=400
73        )
74
75        option_dialog.add_option(
76            CommonDialogActionOption(
77                CommonDialogOptionContext(
78                    CSFStringId.SELECTED_TEMPLATE,
79                    0,
80                    title_tokens=(CSFSliderTemplateDialog._SELECTED_TEMPLATE.template_name if CSFSliderTemplateDialog._SELECTED_TEMPLATE is not None else CSFStringId.NO_TEMPLATE_SELECTED,)
81                ),
82                on_chosen=lambda *_, **__: self._select_template(sim_info, on_close=_reopen)
83            )
84        )
85
86        def _on_apply_template_to_sim() -> None:
87            self.log.debug('Confirming all sliders reset.')
88            if CSFSliderTemplateDialog._SELECTED_TEMPLATE is None:
89                def _on_acknowledge(_) -> None:
90                    _reopen()
91
92                CommonOkDialog(
93                    CSFStringId.NO_TEMPLATE_SELECTED,
94                    CSFStringId.PLEASE_SELECT_A_TEMPLATE,
95                    mod_identity=self.mod_identity
96                ).show(on_acknowledged=_on_acknowledge)
97                return
98
99            def _on_confirm(_) -> None:
100                self.log.debug('Applying template to Sim.')
101                CSFSliderTemplateDialog._SELECTED_TEMPLATE.apply_to_sim(sim_info)
102                _reopen()
103
104            def _on_cancel(_) -> None:
105                self.log.debug('Cancelled template apply.')
106                _reopen()
107
108            CommonOkCancelDialog(
109                CSFStringId.CONFIRMATION,
110                CSFStringId.APPLY_TEMPLATE_TO_SIM_CONFIRMATION_DESCRIPTION,
111                description_tokens=(sim_info,),
112                mod_identity=self.mod_identity
113            ).show(on_ok_selected=_on_confirm, on_cancel_selected=_on_cancel)
114
115        self.log.debug('Opening Customize Slider dialog.')
116
117        option_dialog.add_option(
118            CommonDialogActionOption(
119                CommonDialogOptionContext(
120                    CSFStringId.APPLY_TEMPLATE_TO_SIM_NAME,
121                    CSFStringId.APPLY_TEMPLATE_TO_SIM_DESCRIPTION,
122                    description_tokens=(sim_info,),
123                    is_enabled=CSFSliderTemplateDialog._SELECTED_TEMPLATE is not None
124                ),
125                on_chosen=lambda *_, **__: _on_apply_template_to_sim()
126            )
127        )
128
129        def _on_save_as_template(_: str, template_name: str, outcome: CommonChoiceOutcome):
130            if _ is None or template_name is None or CommonChoiceOutcome.is_error_or_cancel(outcome):
131                self.log.debug('No template name entered, dialog closed.')
132                _reopen()
133                return
134            self.log.format_with_message('Template name entered.', template_name=template_name)
135            if template_name in self._template_utils.template_library:
136                def _on_yes(_) -> None:
137                    self.log.debug('Saving template.')
138                    self._template_utils.save_sliders_of(sim_info, template_name)
139                    _reopen()
140
141                def _on_no(_) -> None:
142                    self.log.debug('Cancelled saving template.')
143                    _reopen()
144
145                CommonOkCancelDialog(
146                    CSFStringId.TEMPLATE_ALREADY_EXISTS_NAME,
147                    CSFStringId.TEMPLATE_ALREADY_EXISTS_DESCRIPTION,
148                    description_tokens=(template_name,),
149                    ok_text_identifier=CSFStringId.YES,
150                    cancel_text_identifier=CSFStringId.NO,
151                    mod_identity=self.mod_identity
152                ).show(on_ok_selected=_on_yes, on_cancel_selected=_on_no)
153                return
154
155            self._template_utils.save_sliders_of(sim_info, template_name)
156            _reopen()
157
158        option_dialog.add_option(
159            CommonDialogInputTextOption(
160                self.mod_identity,
161                'Save Template From Sim',
162                CommonSimNameUtils.get_full_name(sim_info),
163                CommonDialogOptionContext(
164                    CSFStringId.CREATE_TEMPLATE_FROM_SIM_NAME,
165                    CSFStringId.CREATE_TEMPLATE_FROM_SIM_DESCRIPTION,
166                    title_tokens=(sim_info,),
167                    description_tokens=(sim_info,)
168                ),
169                on_chosen=_on_save_as_template,
170                dialog_description_identifier=CSFStringId.ENTER_A_NAME_FOR_YOUR_NEW_TEMPLATE
171            )
172        )
173
174        option_dialog.add_option(
175            CommonDialogActionOption(
176                CommonDialogOptionContext(
177                    CSFStringId.VIEW_TEMPLATE_NAME,
178                    CSFStringId.VIEW_TEMPLATE_DESCRIPTION,
179                    is_enabled=CSFSliderTemplateDialog._SELECTED_TEMPLATE is not None,
180                    title_tokens=(CSFSliderTemplateDialog._SELECTED_TEMPLATE.template_name if CSFSliderTemplateDialog._SELECTED_TEMPLATE is not None else CSFStringId.NO_TEMPLATE_SELECTED,)
181                ),
182                on_chosen=lambda *_, **__: self._view_template(sim_info, on_close=_reopen)
183            )
184        )
185
186        option_dialog.show(
187            sim_info=sim_info,
188            page=page
189        )
190
191    def _select_template(self, sim_info: SimInfo, on_close: Callable[[], None]=None):
192        self.log.format_with_message('Opening dialog.', sim=sim_info)
193
194        def _on_close() -> None:
195            self.log.debug('Slider Template dialog closed.')
196            if on_close is not None:
197                on_close()
198
199        option_dialog = CommonChooseObjectOptionDialog(
200            CSFStringId.SELECTED_TEMPLATE,
201            CSFStringId.PLEASE_SELECT_A_TEMPLATE,
202            mod_identity=self.mod_identity,
203            on_close=_on_close,
204            per_page=400
205        )
206
207        self.log.debug('Opening Customize Slider dialog.')
208
209        def _on_chosen(_: str, _chosen_template: CSFSliderTemplate):
210            if _chosen_template is None:
211                self.log.debug('No template name entered, dialog closed.')
212                _on_close()
213                return
214            self.log.format_with_message('Template name entered.', template_name=_chosen_template.template_name)
215            CSFSliderTemplateDialog._SELECTED_TEMPLATE = _chosen_template
216            _on_close()
217
218        for (template_name, template) in self._template_utils.template_library.items():
219            template: CSFSliderTemplate = template
220            option_dialog.add_option(
221                CommonDialogObjectOption(
222                    template_name,
223                    template,
224                    CommonDialogOptionContext(
225                        template.display_name,
226                        0,
227                        icon=CommonIconUtils.load_filled_circle_icon() if CSFSliderTemplateDialog._SELECTED_TEMPLATE == template else CommonIconUtils.load_unfilled_circle_icon(),
228                    ),
229                    on_chosen=_on_chosen
230                )
231            )
232
233        if not option_dialog.has_options():
234            def _on_acknowledge(_) -> None:
235                _on_close()
236
237            CommonOkDialog(
238                CSFStringId.NO_TEMPLATES_DETECTED_NAME,
239                CSFStringId.NO_TEMPLATES_DETECTED_DESCRIPTION,
240                mod_identity=self.mod_identity
241            ).show(on_acknowledged=_on_acknowledge)
242            return
243
244        option_dialog.show(
245            sim_info=sim_info
246        )
247
248    def _view_template(self, sim_info: SimInfo, on_close: Callable[[], None]=None):
249        self.log.format_with_message('Opening view template dialog.', sim=sim_info)
250
251        def _on_close() -> None:
252            self.log.debug('Slider Template dialog closed.')
253            if on_close is not None:
254                on_close()
255
256        def _reopen() -> None:
257            self._view_template(sim_info, on_close=on_close)
258
259        if CSFSliderTemplateDialog._SELECTED_TEMPLATE is None:
260            def _on_acknowledge(_) -> None:
261                _on_close()
262
263            CommonOkDialog(
264                CSFStringId.NO_TEMPLATE_SELECTED,
265                CSFStringId.PLEASE_SELECT_A_TEMPLATE,
266                mod_identity=self.mod_identity
267            ).show(on_acknowledged=_on_acknowledge)
268            return
269
270        option_dialog = CommonChooseObjectOptionDialog(
271            CSFStringId.VIEW_TEMPLATE_NAME,
272            CSFStringId.VIEW_TEMPLATE_DESCRIPTION,
273            mod_identity=self.mod_identity,
274            on_close=_on_close,
275            per_page=400
276        )
277
278        for (slider, amount) in CSFSliderTemplateDialog._SELECTED_TEMPLATE.get_sliders(sim_info):
279            slider: CSFSlider = slider
280            option_dialog.add_option(
281                CommonDialogObjectOption(
282                    slider.unique_identifier,
283                    slider,
284                    CommonDialogOptionContext(
285                        0,
286                        CSFStringId.STRING_PLUS_STRING,
287                        description_tokens=(slider.display_name, str(amount),),
288                        icon=CommonIconUtils.load_arrow_right_icon(),
289                        is_enabled=False
290                    ),
291                    on_chosen=lambda *_, **__: _reopen()
292                )
293            )
294
295        if not option_dialog.has_options():
296            def _on_acknowledge(_) -> None:
297                _on_close()
298
299            CommonOkDialog(
300                CSFStringId.NO_SLIDERS_DETECTED_NAME,
301                CSFStringId.NO_SLIDERS_DETECTED_DESCRIPTION,
302                mod_identity=self.mod_identity
303            ).show(on_acknowledged=_on_acknowledge)
304            return
305
306        option_dialog.show(
307            sim_info=sim_info
308        )
309
Full Screen

Accelerate Your Automation Test Cycles With LambdaTest

Leverage LambdaTest’s cloud-based platform to execute your automation tests in parallel and trim down your test execution time significantly. Your first 100 automation testing minutes are on us.

Try LambdaTest

Run Python Tests on LambdaTest Cloud Grid

Execute automation tests with Playwright Python on a cloud-based Grid of 3000+ real browsers and operating systems for both web and mobile applications.

Test now for Free
LambdaTestX

We use cookies to give you the best experience. Cookies help to provide a more personalized experience and relevant advertising for you, and web analytics for us. Learn More in our Cookies policy, Privacy & Terms of service

Allow Cookie
Sarah

I hope you find the best code examples for your project.

If you want to accelerate automated browser testing, try LambdaTest. Your first 100 automation testing minutes are FREE.

Sarah Elson (Product & Growth Lead)