Best Python code snippet using playwright-python
_network.py
Source:_network.py  
# Copyright (c) Microsoft Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import base64
import json
import mimetypes
from collections import defaultdict
from pathlib import Path
from types import SimpleNamespace
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Coroutine,
    Dict,
    List,
    Optional,
    Union,
    cast,
)
from urllib import parse

from playwright._impl._api_structures import (
    Headers,
    HeadersArray,
    RemoteAddr,
    RequestSizes,
    ResourceTiming,
    SecurityDetails,
)
from playwright._impl._api_types import Error
from playwright._impl._connection import (
    ChannelOwner,
    from_channel,
    from_nullable_channel,
)
from playwright._impl._event_context_manager import EventContextManagerImpl
from playwright._impl._helper import ContinueParameters, locals_to_params
from playwright._impl._wait_helper import WaitHelper

if TYPE_CHECKING:  # pragma: no cover
    from playwright._impl._fetch import APIResponse
    from playwright._impl._frame import Frame


class Request(ChannelOwner):
    """Channel-backed view of a single network request.

    Static facts (url, method, resource type, post data, provisional headers)
    are read from the initializer dict; the full header set and the response
    are fetched lazily over the channel.
    """

    def __init__(
        self, parent: ChannelOwner, type: str, guid: str, initializer: Dict
    ) -> None:
        super().__init__(parent, type, guid, initializer)
        self._redirected_from: Optional["Request"] = from_nullable_channel(
            initializer.get("redirectedFrom")
        )
        self._redirected_to: Optional["Request"] = None
        # Link the redirect chain in both directions.
        if self._redirected_from:
            self._redirected_from._redirected_to = self
        self._failure_text: Optional[str] = None
        # Placeholder timing; Response.__init__ overwrites most of these fields.
        self._timing: ResourceTiming = {
            "startTime": 0,
            "domainLookupStart": -1,
            "domainLookupEnd": -1,
            "connectStart": -1,
            "secureConnectionStart": -1,
            "connectEnd": -1,
            "requestStart": -1,
            "responseStart": -1,
            "responseEnd": -1,
        }
        self._provisional_headers = RawHeaders(self._initializer["headers"])
        self._all_headers_future: Optional[asyncio.Future[RawHeaders]] = None

    def __repr__(self) -> str:
        return f"<Request url={self.url!r} method={self.method!r}>"

    @property
    def url(self) -> str:
        return self._initializer["url"]

    @property
    def resource_type(self) -> str:
        return self._initializer["resourceType"]

    @property
    def method(self) -> str:
        return self._initializer["method"]

    async def sizes(self) -> RequestSizes:
        """Return the size information reported by the response channel.

        Raises:
            Error: if the request has no response.
        """
        response = await self.response()
        if not response:
            raise Error("Unable to fetch sizes for failed request")
        return await response._channel.send("sizes")

    @property
    def post_data(self) -> Optional[str]:
        """Post body decoded as text, or None when there is no (or an empty) body."""
        data = self.post_data_buffer
        if not data:
            return None
        return data.decode()

    @property
    def post_data_json(self) -> Optional[Any]:
        """Post body parsed as form data or JSON, or None when there is no body.

        Form-encoded bodies are returned as a dict of decoded fields; anything
        else is parsed as JSON.

        Raises:
            Error: if the body is not form-encoded and is not valid JSON.
        """
        post_data = self.post_data
        if not post_data:
            return None
        # The header may be missing entirely, and may carry parameters such as
        # "; charset=UTF-8" - compare only the media type, case-insensitively.
        content_type = self.headers.get("content-type", "")
        media_type = content_type.split(";", 1)[0].strip().lower()
        if media_type == "application/x-www-form-urlencoded":
            return dict(parse.parse_qsl(post_data))
        try:
            return json.loads(post_data)
        except Exception:
            raise Error(f"POST data is not a valid JSON object: {post_data}")

    @property
    def post_data_buffer(self) -> Optional[bytes]:
        """Raw post body, base64-decoded from the initializer, or None if absent."""
        b64_content = self._initializer.get("postData")
        if b64_content is None:
            return None
        return base64.b64decode(b64_content)

    async def response(self) -> Optional["Response"]:
        return from_nullable_channel(await self._channel.send("response"))

    @property
    def frame(self) -> "Frame":
        return from_channel(self._initializer["frame"])

    def is_navigation_request(self) -> bool:
        return self._initializer["isNavigationRequest"]

    @property
    def redirected_from(self) -> Optional["Request"]:
        return self._redirected_from

    @property
    def redirected_to(self) -> Optional["Request"]:
        return self._redirected_to

    @property
    def failure(self) -> Optional[str]:
        return self._failure_text

    @property
    def timing(self) -> ResourceTiming:
        return self._timing

    @property
    def headers(self) -> Headers:
        """Headers known from the initializer (no channel round trip)."""
        return self._provisional_headers.headers()

    async def all_headers(self) -> Headers:
        return (await self._actual_headers()).headers()

    async def headers_array(self) -> HeadersArray:
        return (await self._actual_headers()).headers_array()

    async def header_value(self, name: str) -> Optional[str]:
        return (await self._actual_headers()).get(name)

    async def _actual_headers(self) -> "RawHeaders":
        """Fetch the raw request headers once and share the result.

        Concurrent callers await the same future. If the fetch fails or is
        cancelled, the cached future is cleared and completed so that waiters
        are released and a later call can retry instead of hanging forever.
        """
        future = self._all_headers_future
        if not future:
            future = self._all_headers_future = asyncio.Future()
            try:
                headers = await self._channel.send("rawRequestHeaders")
            except asyncio.CancelledError:
                self._all_headers_future = None
                future.cancel()
                raise
            except Exception as exc:
                self._all_headers_future = None
                # Awaiting the future below re-raises this for the caller.
                future.set_exception(exc)
            else:
                future.set_result(RawHeaders(headers))
        return await future


class Route(ChannelOwner):
    """Handle for an intercepted request that can be aborted, fulfilled or continued."""

    def __init__(
        self, parent: ChannelOwner, type: str, guid: str, initializer: Dict
    ) -> None:
        super().__init__(parent, type, guid, initializer)

    def __repr__(self) -> str:
        return f"<Route request={self.request}>"

    @property
    def request(self) -> Request:
        return from_channel(self._initializer["request"])

    async def abort(self, errorCode: str = None) -> None:
        """Send "abort" for this route with the optional error code."""
        await self._race_with_page_close(
            self._channel.send("abort", locals_to_params(locals()))
        )

    async def fulfill(
        self,
        status: int = None,
        headers: Dict[str, str] = None,
        body: Union[str, bytes] = None,
        path: Union[str, Path] = None,
        contentType: str = None,
        response: "APIResponse" = None,
    ) -> None:
        """Fulfill the route with a synthesized or previously fetched response.

        Args:
            status: status code; falls back to ``response.status`` when a
                response is given and no truthy status is passed.
            headers: response headers; falls back to ``response.headers``.
            body: response body as text or bytes.
            path: file whose bytes are used as the body when ``body`` is
                neither str nor bytes.
            contentType: value for the ``content-type`` header; when omitted
                and ``path`` is given, it is guessed from the file name.
            response: response to take status, headers and body from.
        """
        # Must stay the first statement: locals() has to contain only the
        # call arguments at this point.
        params = locals_to_params(locals())
        if response:
            del params["response"]
            params["status"] = (
                params["status"] if params.get("status") else response.status
            )
            params["headers"] = (
                params["headers"] if params.get("headers") else response.headers
            )
            from playwright._impl._fetch import APIResponse

            if body is None and path is None and isinstance(response, APIResponse):
                if response._request._connection is self._connection:
                    # Same connection: refer to the fetched response by uid
                    # instead of shipping the body back.
                    params["fetchResponseUid"] = response._fetch_uid
                else:
                    body = await response.body()

        length = 0
        if isinstance(body, str):
            params["body"] = body
            params["isBase64"] = False
            length = len(body.encode())
        elif isinstance(body, bytes):
            params["body"] = base64.b64encode(body).decode()
            params["isBase64"] = True
            length = len(body)
        elif path:
            del params["path"]
            file_content = Path(path).read_bytes()
            params["body"] = base64.b64encode(file_content).decode()
            params["isBase64"] = True
            length = len(file_content)

        headers = {k.lower(): str(v) for k, v in params.get("headers", {}).items()}
        if params.get("contentType"):
            headers["content-type"] = params["contentType"]
        elif path:
            headers["content-type"] = (
                mimetypes.guess_type(str(Path(path)))[0] or "application/octet-stream"
            )
        if length and "content-length" not in headers:
            headers["content-length"] = str(length)
        params["headers"] = serialize_headers(headers)
        await self._race_with_page_close(self._channel.send("fulfill", params))

    async def continue_(
        self,
        url: str = None,
        method: str = None,
        headers: Dict[str, str] = None,
        postData: Union[str, bytes] = None,
    ) -> None:
        """Continue the route, sending only the overrides that were supplied.

        ``postData`` is base64-encoded before it is sent (text is encoded to
        bytes first).
        """
        overrides: ContinueParameters = {}
        if url:
            overrides["url"] = url
        if method:
            overrides["method"] = method
        if headers:
            overrides["headers"] = serialize_headers(headers)
        if isinstance(postData, str):
            overrides["postData"] = base64.b64encode(postData.encode()).decode()
        elif isinstance(postData, bytes):
            overrides["postData"] = base64.b64encode(postData).decode()
        await self._race_with_page_close(
            self._channel.send("continue", cast(Any, overrides))
        )

    def _internal_continue(self) -> None:
        """Schedule a best-effort ``continue_()`` whose failures are ignored."""

        async def continue_route() -> None:
            try:
                await self.continue_()
            except Exception:
                # Deliberately best-effort: errors here are not reported.
                pass

        asyncio.create_task(continue_route())

    async def _race_with_page_close(self, future: Coroutine) -> None:
        """Run ``future``, tolerating failures caused by the page going away.

        If the page closes or crashes first, any error from the call is
        swallowed. Otherwise the call's own outcome is surfaced, so a failed
        abort/fulfill/continue raises to the caller exactly as it does in the
        branch where no page is attached.
        """
        if hasattr(self.request.frame, "_page"):
            page = self.request.frame._page
            # When page closes or crashes, we catch any potential rejects from this Route.
            # Note that page could be missing when routing popup's initial request that
            # does not have a Page initialized just yet.
            fut = asyncio.create_task(future)
            await asyncio.wait(
                [fut, page._closed_or_crashed_future],
                return_when=asyncio.FIRST_COMPLETED,
            )
            if page._closed_or_crashed_future.done():
                await asyncio.gather(fut, return_exceptions=True)
            else:
                # The page is still alive, so the task is the one that
                # finished; retrieve its result to re-raise any error.
                fut.result()
        else:
            await future


class Response(ChannelOwner):
    """Channel-backed view of a network response."""

    def __init__(
        self, parent: ChannelOwner, type: str, guid: str, initializer: Dict
    ) -> None:
        super().__init__(parent, type, guid, initializer)
        self._request: Request = from_channel(self._initializer["request"])
        # Copy the timing reported with the response onto the owning request.
        # "responseEnd" is not part of this copy.
        timing = self._initializer["timing"]
        self._request._timing["startTime"] = timing["startTime"]
        self._request._timing["domainLookupStart"] = timing["domainLookupStart"]
        self._request._timing["domainLookupEnd"] = timing["domainLookupEnd"]
        self._request._timing["connectStart"] = timing["connectStart"]
        self._request._timing["secureConnectionStart"] = timing["secureConnectionStart"]
        self._request._timing["connectEnd"] = timing["connectEnd"]
        self._request._timing["requestStart"] = timing["requestStart"]
        self._request._timing["responseStart"] = timing["responseStart"]
        self._provisional_headers = RawHeaders(
            cast(HeadersArray, self._initializer["headers"])
        )
        self._raw_headers_future: Optional[asyncio.Future[RawHeaders]] = None
        self._finished_future: asyncio.Future[bool] = asyncio.Future()

    def __repr__(self) -> str:
        return f"<Response url={self.url!r} request={self.request}>"

    @property
    def url(self) -> str:
        return self._initializer["url"]

    @property
    def ok(self) -> bool:
        """True for status 0 or any status in the 200-299 range."""
        # Status 0 is for file:// URLs
        return self._initializer["status"] == 0 or (
            self._initializer["status"] >= 200 and self._initializer["status"] <= 299
        )

    @property
    def status(self) -> int:
        return self._initializer["status"]

    @property
    def status_text(self) -> str:
        return self._initializer["statusText"]

    @property
    def headers(self) -> Headers:
        """Headers known from the initializer (no channel round trip)."""
        return self._provisional_headers.headers()

    async def all_headers(self) -> Headers:
        return (await self._actual_headers()).headers()

    async def headers_array(self) -> HeadersArray:
        return (await self._actual_headers()).headers_array()

    async def header_value(self, name: str) -> Optional[str]:
        return (await self._actual_headers()).get(name)

    async def header_values(self, name: str) -> List[str]:
        return (await self._actual_headers()).get_all(name)

    async def _actual_headers(self) -> "RawHeaders":
        """Fetch the raw response headers once and share the result.

        Concurrent callers await the same future. If the fetch fails or is
        cancelled, the cached future is cleared and completed so that waiters
        are released and a later call can retry instead of hanging forever.
        """
        future = self._raw_headers_future
        if not future:
            future = self._raw_headers_future = asyncio.Future()
            try:
                headers = cast(
                    HeadersArray, await self._channel.send("rawResponseHeaders")
                )
            except asyncio.CancelledError:
                self._raw_headers_future = None
                future.cancel()
                raise
            except Exception as exc:
                self._raw_headers_future = None
                # Awaiting the future below re-raises this for the caller.
                future.set_exception(exc)
            else:
                future.set_result(RawHeaders(headers))
        return await future

    async def server_addr(self) -> Optional[RemoteAddr]:
        return await self._channel.send("serverAddr")

    async def security_details(self) -> Optional[SecurityDetails]:
        return await self._channel.send("securityDetails")

    async def finished(self) -> None:
        await self._finished_future

    async def body(self) -> bytes:
        """Return the response body, base64-decoded from the channel reply."""
        binary = await self._channel.send("body")
        return base64.b64decode(binary)

    async def text(self) -> str:
        content = await self.body()
        return content.decode()

    async def json(self) -> Any:
        return json.loads(await self.text())

    @property
    def request(self) -> Request:
        return self._request

    @property
    def frame(self) -> "Frame":
        return self._request.frame


class WebSocket(ChannelOwner):
    """Channel-backed web socket that re-emits frame, error and close events."""

    Events = SimpleNamespace(
        Close="close",
        FrameReceived="framereceived",
        FrameSent="framesent",
        Error="socketerror",
    )

    def __init__(
        self, parent: ChannelOwner, type: str, guid: str, initializer: Dict
    ) -> None:
        super().__init__(parent, type, guid, initializer)
        self._is_closed = False
        self._channel.on(
            "frameSent",
            lambda params: self._on_frame_sent(params["opcode"], params["data"]),
        )
        self._channel.on(
            "frameReceived",
            lambda params: self._on_frame_received(params["opcode"], params["data"]),
        )
        self._channel.on(
            "socketError",
            lambda params: self.emit(WebSocket.Events.Error, params["error"]),
        )
        self._channel.on("close", lambda params: self._on_close())

    def __repr__(self) -> str:
        return f"<WebSocket url={self.url!r}>"

    @property
    def url(self) -> str:
        return self._initializer["url"]

    def expect_event(
        self,
        event: str,
        predicate: Callable = None,
        timeout: float = None,
    ) -> EventContextManagerImpl:
        """Return a context manager that waits for ``event`` on this socket.

        The wait is rejected on timeout, when the parent emits "close", and -
        unless they are the awaited event themselves - on socket close or
        socket error. When ``timeout`` is None the parent's timeout settings
        supply the value.
        """
        if timeout is None:
            timeout = cast(Any, self._parent)._timeout_settings.timeout()
        wait_helper = WaitHelper(self, f"web_socket.expect_event({event})")
        wait_helper.reject_on_timeout(
            cast(float, timeout),
            f'Timeout {timeout}ms exceeded while waiting for event "{event}"',
        )
        if event != WebSocket.Events.Close:
            wait_helper.reject_on_event(
                self, WebSocket.Events.Close, Error("Socket closed")
            )
        if event != WebSocket.Events.Error:
            wait_helper.reject_on_event(
                self, WebSocket.Events.Error, Error("Socket error")
            )
        wait_helper.reject_on_event(self._parent, "close", Error("Page closed"))
        wait_helper.wait_for_event(self, event, predicate)
        return EventContextManagerImpl(wait_helper.result())

    async def wait_for_event(
        self, event: str, predicate: Callable = None, timeout: float = None
    ) -> Any:
        async with self.expect_event(event, predicate, timeout) as event_info:
            pass
        return await event_info

    def _on_frame_sent(self, opcode: int, data: str) -> None:
        # Opcode 2 payloads are base64-decoded to bytes; opcode 1 payloads are
        # emitted as received. Other opcodes are ignored.
        if opcode == 2:
            self.emit(WebSocket.Events.FrameSent, base64.b64decode(data))
        elif opcode == 1:
            self.emit(WebSocket.Events.FrameSent, data)

    def _on_frame_received(self, opcode: int, data: str) -> None:
        # Same opcode handling as _on_frame_sent.
        if opcode == 2:
            self.emit(WebSocket.Events.FrameReceived, base64.b64decode(data))
        elif opcode == 1:
            self.emit(WebSocket.Events.FrameReceived, data)

    def is_closed(self) -> bool:
        return self._is_closed

    def _on_close(self) -> None:
        self._is_closed = True
        self.emit(WebSocket.Events.Close, self)


def serialize_headers(headers: Dict[str, str]) -> HeadersArray:
    """Convert a name -> value mapping into a list of name/value dicts."""
    return [{"name": name, "value": value} for name, value in headers.items()]


class RawHeaders:
    """Case-insensitive, multi-value view over a list of name/value headers."""

    def __init__(self, headers: HeadersArray) -> None:
        self._headers_array = headers
        # lower-cased name -> {value: True}; the inner dict is used as an
        # insertion-ordered set, so duplicate values collapse.
        self._headers_map: Dict[str, Dict[str, bool]] = defaultdict(dict)
        for header in headers:
            self._headers_map[header["name"].lower()][header["value"]] = True

    def get(self, name: str) -> Optional[str]:
        """Return all values for ``name`` joined into one string, or None.

        Values are joined with "\\n" for ``set-cookie`` and ", " otherwise.
        """
        values = self.get_all(name)
        if not values:
            return None
        separator = "\n" if name.lower() == "set-cookie" else ", "
        return separator.join(values)

    def get_all(self, name: str) -> List[str]:
        """Return the distinct values recorded for ``name`` (case-insensitive)."""
        # Use .get() rather than indexing: indexing the defaultdict would
        # insert an empty entry for every unknown name, which headers() would
        # then report with a None value.
        return list(self._headers_map.get(name.lower(), {}))

    def headers(self) -> Dict[str, str]:
        """Return a dict of lower-cased header name -> joined value."""
        return {name: cast(str, self.get(name)) for name in self._headers_map}

    def headers_array(self) -> HeadersArray:
        # NOTE(review): this method's body is truncated in the listing;
        # returning the array stored by __init__ is presumed - confirm
        # against the upstream file.
        return self._headers_array


# NOTE: the original listing is truncated at this point; the next excerpt is
# from _fetch.py.
Source:_fetch.py  
...369                "fetchUid": self._fetch_uid,370            },371        )372    @property373    def _fetch_uid(self) -> str:374        return self._initializer["fetchUid"]375    async def _fetch_log(self) -> List[str]:376        return await self._request._channel.send(377            "fetchLog",378            {379                "fetchUid": self._fetch_uid,380            },381        )382def is_json_content_type(headers: network.HeadersArray = None) -> bool:383    if not headers:384        return False385    for header in headers:386        if header["name"] == "Content-Type":387            return header["value"].startswith("application/json")...LambdaTest’s Playwright tutorial gives you a broader view of the Playwright automation framework, its unique features, and its use cases, with examples to deepen your understanding of Playwright testing. The tutorial offers A-to-Z guidance, from installing the Playwright framework to best practices and advanced concepts.
Get 100 minutes of automation testing FREE!!
