Best Python code snippet using playwright-python
browser_tab.py
Source:browser_tab.py  
...983            reply = self.network_manager.get(request)984        reply.finished.connect(callback)985        self._replies.add(reply)986        return reply987    def _on_request_finished(self, callback, method, body, headers,988                             follow_redirects, redirects_remaining):989        """ Handle redirects and call the callback. """990        reply = self.sender()991        try:992            if not follow_redirects:993                callback()994                return995            if not redirects_remaining:996                callback()  # XXX: should it be an error?997                return998            redirect_url = reply.attribute(QNetworkRequest.RedirectionTargetAttribute)999            if redirect_url is None:  # no redirect1000                callback()1001                return..._page.py
Source:_page.py  
...181            ),182        )183        self._channel.on(184            "requestFinished",185            lambda params: self._on_request_finished(186                from_channel(params["request"]), params["responseEndTiming"]187            ),188        )189        self._channel.on(190            "response",191            lambda params: self.emit(192                Page.Events.Response, from_channel(params["response"])193            ),194        )195        self._channel.on(196            "route",197            lambda params: self._on_route(198                from_channel(params["route"]), from_channel(params["request"])199            ),200        )201        self._channel.on(202            "video",203            lambda params: cast(Video, self.video)._set_relative_path(204                params["relativePath"]205            ),206        )207        self._channel.on(208            "webSocket",209            lambda params: self.emit(210                Page.Events.WebSocket, from_channel(params["webSocket"])211            ),212        )213        self._channel.on(214            "worker", lambda params: self._on_worker(from_channel(params["worker"]))215        )216    def _set_browser_context(self, context: "BrowserContext") -> None:217        self._browser_context = context218        self._timeout_settings = TimeoutSettings(context._timeout_settings)219    def _on_request_failed(220        self,221        request: Request,222        response_end_timing: float,223        failure_text: str = None,224    ) -> None:225        request._failure_text = failure_text226        if request._timing:227            request._timing["responseEnd"] = response_end_timing228        self.emit(Page.Events.RequestFailed, request)229    def _on_request_finished(230        self, request: Request, response_end_timing: float231    ) -> None:232        if request._timing:233            request._timing["responseEnd"] = response_end_timing234        self.emit(Page.Events.RequestFinished, 
request)235    def _on_frame_attached(self, frame: Frame) -> None:236        frame._page = self237        self._frames.append(frame)238        self.emit(Page.Events.FrameAttached, frame)239    def _on_frame_detached(self, frame: Frame) -> None:240        self._frames.remove(frame)241        frame._detached = True242        self.emit(Page.Events.FrameDetached, frame)243    def _on_route(self, route: Route, request: Request) -> None:...switch.py
Source:switch.py  
...198            self.logger.debug(f"Firing request event.")199            self._current_profile = None200            self._reset_turn_off_timer()201            self.fire_event(EVENT_TYPE_AUTOMATIC_LIGHTING, entity_id=self.entity_id, type=EVENT_DATA_TYPE_REQUEST)202        def _on_request_finished(*args: Any) -> None:203            """ Triggered when the request event has finished. """204            self._reset_request_timer()205            if self.is_blocked:206                return207            if self._current_profile:208                self.logger.debug(f"Turning on profile {self._current_profile.id} with the following values: { {CONF_ENTITY_ID: self._current_profile.lights, **self._current_profile.attributes} }")209                self._current_status = self._current_profile.status210                self._turn_off_unused_entities(self._tracked_lights, self._current_profile.lights)211                self.call_service(LIGHT_DOMAIN, SERVICE_TURN_ON, entity_id=self._current_profile.lights, **self._current_profile.attributes)212            else:213                self.logger.debug(f"No profile was provided.")214                self._current_status = STATUS_IDLE215                self._turn_off_unused_entities(self._tracked_lights, [])216            self.async_schedule_update_ha_state(True)..._browser_context.py
Source:_browser_context.py  
...125            ),126        )127        self._channel.on(128            "requestFinished",129            lambda params: self._on_request_finished(130                from_channel(params["request"]),131                from_nullable_channel(params.get("response")),132                params["responseEndTiming"],133                from_nullable_channel(params.get("page")),134            ),135        )136        self._closed_future: asyncio.Future = asyncio.Future()137        self.once(138            self.Events.Close, lambda context: self._closed_future.set_result(True)139        )140    def __repr__(self) -> str:141        return f"<BrowserContext browser={self.browser}>"142    def _on_page(self, page: Page) -> None:143        self._pages.append(page)144        self.emit(BrowserContext.Events.Page, page)145        if page._opener and not page._opener.is_closed():146            page._opener.emit(Page.Events.Popup, page)147    def _on_route(self, route: Route, request: Request) -> None:148        for handler_entry in self._routes:149            if handler_entry.matches(request.url):150                try:151                    handler_entry.handle(route, request)152                finally:153                    if not handler_entry.is_active:154                        self._routes.remove(handler_entry)155                        if not len(self._routes) == 0:156                            asyncio.create_task(self._disable_interception())157                break158        route._internal_continue()159    def _on_binding(self, binding_call: BindingCall) -> None:160        func = self._bindings.get(binding_call._initializer["name"])161        if func is None:162            return163        asyncio.create_task(binding_call.call(func))164    def set_default_navigation_timeout(self, timeout: float) -> None:165        self._timeout_settings.set_navigation_timeout(timeout)166        self._channel.send_no_reply(167            "setDefaultNavigationTimeoutNoReply", 
dict(timeout=timeout)168        )169    def set_default_timeout(self, timeout: float) -> None:170        self._timeout_settings.set_timeout(timeout)171        self._channel.send_no_reply("setDefaultTimeoutNoReply", dict(timeout=timeout))172    @property173    def pages(self) -> List[Page]:174        return self._pages.copy()175    @property176    def browser(self) -> Optional["Browser"]:177        return self._browser178    async def new_page(self) -> Page:179        if self._owner_page:180            raise Error("Please use browser.new_context()")181        return from_channel(await self._channel.send("newPage"))182    async def cookies(self, urls: Union[str, List[str]] = None) -> List[Cookie]:183        if urls is None:184            urls = []185        if not isinstance(urls, list):186            urls = [urls]187        return await self._channel.send("cookies", dict(urls=urls))188    async def add_cookies(self, cookies: List[SetCookieParam]) -> None:189        await self._channel.send("addCookies", dict(cookies=cookies))190    async def clear_cookies(self) -> None:191        await self._channel.send("clearCookies")192    async def grant_permissions(193        self, permissions: List[str], origin: str = None194    ) -> None:195        await self._channel.send("grantPermissions", locals_to_params(locals()))196    async def clear_permissions(self) -> None:197        await self._channel.send("clearPermissions")198    async def set_geolocation(self, geolocation: Geolocation = None) -> None:199        await self._channel.send("setGeolocation", locals_to_params(locals()))200    async def set_extra_http_headers(self, headers: Dict[str, str]) -> None:201        await self._channel.send(202            "setExtraHTTPHeaders", dict(headers=serialize_headers(headers))203        )204    async def set_offline(self, offline: bool) -> None:205        await self._channel.send("setOffline", dict(offline=offline))206    async def add_init_script(207        self, script: str = None, 
path: Union[str, Path] = None208    ) -> None:209        if path:210            script = (await async_readfile(path)).decode()211        if not isinstance(script, str):212            raise Error("Either path or script parameter must be specified")213        await self._channel.send("addInitScript", dict(source=script))214    async def expose_binding(215        self, name: str, callback: Callable, handle: bool = None216    ) -> None:217        for page in self._pages:218            if name in page._bindings:219                raise Error(220                    f'Function "{name}" has been already registered in one of the pages'221                )222        if name in self._bindings:223            raise Error(f'Function "{name}" has been already registered')224        self._bindings[name] = callback225        await self._channel.send(226            "exposeBinding", dict(name=name, needsHandle=handle or False)227        )228    async def expose_function(self, name: str, callback: Callable) -> None:229        await self.expose_binding(name, lambda source, *args: callback(*args))230    async def route(231        self, url: URLMatch, handler: RouteHandlerCallback, times: int = None232    ) -> None:233        self._routes.insert(234            0,235            RouteHandler(URLMatcher(self._options.get("baseURL"), url), handler, times),236        )237        if len(self._routes) == 1:238            await self._channel.send(239                "setNetworkInterceptionEnabled", dict(enabled=True)240            )241    async def unroute(242        self, url: URLMatch, handler: Optional[RouteHandlerCallback] = None243    ) -> None:244        self._routes = list(245            filter(246                lambda r: r.matcher.match != url or (handler and r.handler != handler),247                self._routes,248            )249        )250        if len(self._routes) == 0:251            await self._disable_interception()252    async def _disable_interception(self) -> None:253        
await self._channel.send("setNetworkInterceptionEnabled", dict(enabled=False))254    def expect_event(255        self,256        event: str,257        predicate: Callable = None,258        timeout: float = None,259    ) -> EventContextManagerImpl:260        if timeout is None:261            timeout = self._timeout_settings.timeout()262        wait_helper = WaitHelper(self, f"browser_context.expect_event({event})")263        wait_helper.reject_on_timeout(264            timeout, f'Timeout {timeout}ms exceeded while waiting for event "{event}"'265        )266        if event != BrowserContext.Events.Close:267            wait_helper.reject_on_event(268                self, BrowserContext.Events.Close, Error("Context closed")269            )270        wait_helper.wait_for_event(self, event, predicate)271        return EventContextManagerImpl(wait_helper.result())272    def _on_close(self) -> None:273        if self._browser:274            self._browser._contexts.remove(self)275        self.emit(BrowserContext.Events.Close, self)276    async def close(self) -> None:277        try:278            if self._options.get("recordHar"):279                har = cast(280                    Artifact, from_channel(await self._channel.send("harExport"))281                )282                await har.save_as(283                    cast(Dict[str, str], self._options["recordHar"])["path"]284                )285                await har.delete()286            await self._channel.send("close")287            await self._closed_future288        except Exception as e:289            if not is_safe_close_error(e):290                raise e291    async def _pause(self) -> None:292        await self._channel.send("pause")293    async def storage_state(self, path: Union[str, Path] = None) -> StorageState:294        result = await self._channel.send_return_as_dict("storageState")295        if path:296            await async_writefile(path, json.dumps(result))297        return result298    async 
def wait_for_event(299        self, event: str, predicate: Callable = None, timeout: float = None300    ) -> Any:301        async with self.expect_event(event, predicate, timeout) as event_info:302            pass303        return await event_info304    def expect_page(305        self,306        predicate: Callable[[Page], bool] = None,307        timeout: float = None,308    ) -> EventContextManagerImpl[Page]:309        return self.expect_event(BrowserContext.Events.Page, predicate, timeout)310    def _on_background_page(self, page: Page) -> None:311        self._background_pages.add(page)312        self.emit(BrowserContext.Events.BackgroundPage, page)313    def _on_service_worker(self, worker: Worker) -> None:314        worker._context = self315        self._service_workers.add(worker)316        self.emit(BrowserContext.Events.ServiceWorker, worker)317    def _on_request_failed(318        self,319        request: Request,320        response_end_timing: float,321        failure_text: Optional[str],322        page: Optional[Page],323    ) -> None:324        request._failure_text = failure_text325        if request._timing:326            request._timing["responseEnd"] = response_end_timing327        self.emit(BrowserContext.Events.RequestFailed, request)328        if page:329            page.emit(Page.Events.RequestFailed, request)330    def _on_request_finished(331        self,332        request: Request,333        response: Optional[Response],334        response_end_timing: float,335        page: Optional[Page],336    ) -> None:337        if request._timing:338            request._timing["responseEnd"] = response_end_timing339        self.emit(BrowserContext.Events.RequestFinished, request)340        if page:341            page.emit(Page.Events.RequestFinished, request)342        if response:343            response._finished_future.set_result(True)344    def _on_request(self, request: Request, page: Optional[Page]) -> None:...LambdaTest’s Playwright tutorial will 
give you a broader idea of the Playwright automation framework, its unique features, and its use cases, with examples to deepen your understanding of Playwright testing. This tutorial provides A-to-Z guidance, from installing the Playwright framework to best practices and advanced concepts.
Get 100 automation test minutes FREE!!
