Best Python code snippet using playwright-python
_browser_context.py
Source:_browser_context.py  
...98            lambda params: self._on_background_page(from_channel(params["page"])),99        )100        self._channel.on(101            "serviceWorker",102            lambda params: self._on_service_worker(from_channel(params["worker"])),103        )104        self._channel.on(105            "request",106            lambda params: self._on_request(107                from_channel(params["request"]),108                from_nullable_channel(params.get("page")),109            ),110        )111        self._channel.on(112            "response",113            lambda params: self._on_response(114                from_channel(params["response"]),115                from_nullable_channel(params.get("page")),116            ),117        )118        self._channel.on(119            "requestFailed",120            lambda params: self._on_request_failed(121                from_channel(params["request"]),122                params["responseEndTiming"],123                params.get("failureText"),124                from_nullable_channel(params.get("page")),125            ),126        )127        self._channel.on(128            "requestFinished",129            lambda params: self._on_request_finished(130                from_channel(params["request"]),131                from_nullable_channel(params.get("response")),132                params["responseEndTiming"],133                from_nullable_channel(params.get("page")),134            ),135        )136        self._closed_future: asyncio.Future = asyncio.Future()137        self.once(138            self.Events.Close, lambda context: self._closed_future.set_result(True)139        )140    def __repr__(self) -> str:141        return f"<BrowserContext browser={self.browser}>"142    def _on_page(self, page: Page) -> None:143        self._pages.append(page)144        self.emit(BrowserContext.Events.Page, page)145        if page._opener and not page._opener.is_closed():146            page._opener.emit(Page.Events.Popup, page)147    def 
_on_route(self, route: Route, request: Request) -> None:148        for handler_entry in self._routes:149            if handler_entry.matches(request.url):150                try:151                    handler_entry.handle(route, request)152                finally:153                    if not handler_entry.is_active:154                        self._routes.remove(handler_entry)155                        if not len(self._routes) == 0:156                            asyncio.create_task(self._disable_interception())157                break158        route._internal_continue()159    def _on_binding(self, binding_call: BindingCall) -> None:160        func = self._bindings.get(binding_call._initializer["name"])161        if func is None:162            return163        asyncio.create_task(binding_call.call(func))164    def set_default_navigation_timeout(self, timeout: float) -> None:165        self._timeout_settings.set_navigation_timeout(timeout)166        self._channel.send_no_reply(167            "setDefaultNavigationTimeoutNoReply", dict(timeout=timeout)168        )169    def set_default_timeout(self, timeout: float) -> None:170        self._timeout_settings.set_timeout(timeout)171        self._channel.send_no_reply("setDefaultTimeoutNoReply", dict(timeout=timeout))172    @property173    def pages(self) -> List[Page]:174        return self._pages.copy()175    @property176    def browser(self) -> Optional["Browser"]:177        return self._browser178    async def new_page(self) -> Page:179        if self._owner_page:180            raise Error("Please use browser.new_context()")181        return from_channel(await self._channel.send("newPage"))182    async def cookies(self, urls: Union[str, List[str]] = None) -> List[Cookie]:183        if urls is None:184            urls = []185        if not isinstance(urls, list):186            urls = [urls]187        return await self._channel.send("cookies", dict(urls=urls))188    async def add_cookies(self, cookies: 
List[SetCookieParam]) -> None:189        await self._channel.send("addCookies", dict(cookies=cookies))190    async def clear_cookies(self) -> None:191        await self._channel.send("clearCookies")192    async def grant_permissions(193        self, permissions: List[str], origin: str = None194    ) -> None:195        await self._channel.send("grantPermissions", locals_to_params(locals()))196    async def clear_permissions(self) -> None:197        await self._channel.send("clearPermissions")198    async def set_geolocation(self, geolocation: Geolocation = None) -> None:199        await self._channel.send("setGeolocation", locals_to_params(locals()))200    async def set_extra_http_headers(self, headers: Dict[str, str]) -> None:201        await self._channel.send(202            "setExtraHTTPHeaders", dict(headers=serialize_headers(headers))203        )204    async def set_offline(self, offline: bool) -> None:205        await self._channel.send("setOffline", dict(offline=offline))206    async def add_init_script(207        self, script: str = None, path: Union[str, Path] = None208    ) -> None:209        if path:210            script = (await async_readfile(path)).decode()211        if not isinstance(script, str):212            raise Error("Either path or script parameter must be specified")213        await self._channel.send("addInitScript", dict(source=script))214    async def expose_binding(215        self, name: str, callback: Callable, handle: bool = None216    ) -> None:217        for page in self._pages:218            if name in page._bindings:219                raise Error(220                    f'Function "{name}" has been already registered in one of the pages'221                )222        if name in self._bindings:223            raise Error(f'Function "{name}" has been already registered')224        self._bindings[name] = callback225        await self._channel.send(226            "exposeBinding", dict(name=name, needsHandle=handle or False)227        )228  
  async def expose_function(self, name: str, callback: Callable) -> None:229        await self.expose_binding(name, lambda source, *args: callback(*args))230    async def route(231        self, url: URLMatch, handler: RouteHandlerCallback, times: int = None232    ) -> None:233        self._routes.insert(234            0,235            RouteHandler(URLMatcher(self._options.get("baseURL"), url), handler, times),236        )237        if len(self._routes) == 1:238            await self._channel.send(239                "setNetworkInterceptionEnabled", dict(enabled=True)240            )241    async def unroute(242        self, url: URLMatch, handler: Optional[RouteHandlerCallback] = None243    ) -> None:244        self._routes = list(245            filter(246                lambda r: r.matcher.match != url or (handler and r.handler != handler),247                self._routes,248            )249        )250        if len(self._routes) == 0:251            await self._disable_interception()252    async def _disable_interception(self) -> None:253        await self._channel.send("setNetworkInterceptionEnabled", dict(enabled=False))254    def expect_event(255        self,256        event: str,257        predicate: Callable = None,258        timeout: float = None,259    ) -> EventContextManagerImpl:260        if timeout is None:261            timeout = self._timeout_settings.timeout()262        wait_helper = WaitHelper(self, f"browser_context.expect_event({event})")263        wait_helper.reject_on_timeout(264            timeout, f'Timeout {timeout}ms exceeded while waiting for event "{event}"'265        )266        if event != BrowserContext.Events.Close:267            wait_helper.reject_on_event(268                self, BrowserContext.Events.Close, Error("Context closed")269            )270        wait_helper.wait_for_event(self, event, predicate)271        return EventContextManagerImpl(wait_helper.result())272    def _on_close(self) -> None:273        if self._browser:274   
         self._browser._contexts.remove(self)275        self.emit(BrowserContext.Events.Close, self)276    async def close(self) -> None:277        try:278            if self._options.get("recordHar"):279                har = cast(280                    Artifact, from_channel(await self._channel.send("harExport"))281                )282                await har.save_as(283                    cast(Dict[str, str], self._options["recordHar"])["path"]284                )285                await har.delete()286            await self._channel.send("close")287            await self._closed_future288        except Exception as e:289            if not is_safe_close_error(e):290                raise e291    async def _pause(self) -> None:292        await self._channel.send("pause")293    async def storage_state(self, path: Union[str, Path] = None) -> StorageState:294        result = await self._channel.send_return_as_dict("storageState")295        if path:296            await async_writefile(path, json.dumps(result))297        return result298    async def wait_for_event(299        self, event: str, predicate: Callable = None, timeout: float = None300    ) -> Any:301        async with self.expect_event(event, predicate, timeout) as event_info:302            pass303        return await event_info304    def expect_page(305        self,306        predicate: Callable[[Page], bool] = None,307        timeout: float = None,308    ) -> EventContextManagerImpl[Page]:309        return self.expect_event(BrowserContext.Events.Page, predicate, timeout)310    def _on_background_page(self, page: Page) -> None:311        self._background_pages.add(page)312        self.emit(BrowserContext.Events.BackgroundPage, page)313    def _on_service_worker(self, worker: Worker) -> None:314        worker._context = self315        self._service_workers.add(worker)316        self.emit(BrowserContext.Events.ServiceWorker, worker)317    def _on_request_failed(318        self,319        request: Request,320  
      response_end_timing: float,321        failure_text: Optional[str],322        page: Optional[Page],323    ) -> None:324        request._failure_text = failure_text325        if request._timing:326            request._timing["responseEnd"] = response_end_timing327        self.emit(BrowserContext.Events.RequestFailed, request)..._chromium_browser_context.py
Source:_chromium_browser_context.py  
...33            lambda params: self._on_background_page(from_channel(params["page"])),34        )35        self._channel.on(36            "crServiceWorker",37            lambda params: self._on_service_worker(from_channel(params["worker"])),38        )39    def _on_background_page(self, page: Page) -> None:40        self._background_pages.add(page)41        self.emit(ChromiumBrowserContext.Events.BackgroundPage, page)42    def _on_service_worker(self, worker: Worker) -> None:43        worker._context = self44        self._service_workers.add(worker)45        self.emit(ChromiumBrowserContext.Events.ServiceWorker, worker)46    @property47    def background_pages(self) -> List[Page]:48        return list(self._background_pages)49    @property50    def service_workers(self) -> List[Worker]:51        return list(self._service_workers)52    async def new_cdp_session(self, page: Page) -> CDPSession:53        return from_channel(54            await self._channel.send("crNewCDPSession", {"page": page._channel})...LambdaTest’s Playwright tutorial will give you a broader idea about the Playwright automation framework, its unique features, and use cases with examples to exceed your understanding of Playwright testing. This tutorial will give A to Z guidance, from installing the Playwright framework to some best practices and advanced concepts.
Get 100 minutes of automation testing FREE!!
