Best Python code snippet using playwright-python
test_network.py
Source:test_network.py  
# Copyright (c) Microsoft Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Network tests: routing, request/response events, headers, post data and bodies."""

import asyncio
import json
from asyncio.futures import Future
from typing import Dict, List, cast

import pytest

from playwright.async_api import Error, Page, Request, Response


async def test_request_fulfill(page, server):
    """A routed navigation request exposes its properties and can be fulfilled."""

    async def handle_request(route, request):
        assert route.request == request
        assert "empty.html" in request.url
        assert request.headers["user-agent"]
        assert request.method == "GET"
        assert request.post_data is None
        assert request.is_navigation_request()
        assert request.resource_type == "document"
        assert request.frame == page.main_frame
        # The frame has not committed the navigation yet while the route is held.
        assert request.frame.url == "about:blank"
        await route.fulfill(body="Text")

    await page.route(
        "**/empty.html",
        lambda route, request: asyncio.create_task(handle_request(route, request)),
    )
    response = await page.goto(server.EMPTY_PAGE)
    assert response.ok
    assert await response.text() == "Text"


async def test_request_continue(page, server):
    """A routed request that is continued still loads the page normally."""

    async def handle_request(route, request, intercepted):
        intercepted.append(True)
        await route.continue_()

    intercepted = []
    await page.route(
        "**/*",
        lambda route, request: asyncio.create_task(
            handle_request(route, request, intercepted)
        ),
    )
    response = await page.goto(server.EMPTY_PAGE)
    assert response.ok
    assert intercepted == [True]
    assert await page.title() == ""


async def test_page_events_request_should_fire_for_navigation_requests(
    page: Page, server
):
    """Navigating to the empty page emits exactly one "request" event."""
    requests = []
    page.on("request", lambda r: requests.append(r))
    await page.goto(server.EMPTY_PAGE)
    assert len(requests) == 1


async def test_page_events_request_should_fire_for_iframes(page, server, utils):
    """Attaching an iframe adds a second "request" event after the navigation."""
    requests = []
    page.on("request", lambda r: requests.append(r))
    await page.goto(server.EMPTY_PAGE)
    await utils.attach_frame(page, "frame1", server.EMPTY_PAGE)
    assert len(requests) == 2


async def test_page_events_request_should_fire_for_fetches(page, server):
    """An in-page fetch() adds a second "request" event after the navigation."""
    requests = []
    page.on("request", lambda r: requests.append(r))
    await page.goto(server.EMPTY_PAGE)
    await page.evaluate('() => fetch("/empty.html")')
    assert len(requests) == 2


async def test_page_events_request_should_report_requests_and_responses_handled_by_service_worker(
    page: Page, server
):
    """A fetch answered by the service worker is still reported with its response."""
    await page.goto(server.PREFIX + "/serviceworkers/fetchdummy/sw.html")
    await page.evaluate("() => window.activationPromise")
    sw_response = None
    async with page.expect_request("**/*") as request_info:
        sw_response = await page.evaluate('() => fetchDummy("foo")')
    request = await request_info.value
    assert sw_response == "responseFromServiceWorker:foo"
    assert request.url == server.PREFIX + "/serviceworkers/fetchdummy/foo"
    response = await request.response()
    assert response.url == server.PREFIX + "/serviceworkers/fetchdummy/foo"
    assert await response.text() == "responseFromServiceWorker:foo"


async def test_request_frame_should_work_for_main_frame_navigation_request(
    page, server
):
    """The navigation request's frame is the page's main frame."""
    requests = []
    page.on("request", lambda r: requests.append(r))
    await page.goto(server.EMPTY_PAGE)
    assert len(requests) == 1
    assert requests[0].frame == page.main_frame


async def test_request_frame_should_work_for_subframe_navigation_request(
    page, server, utils
):
    """A request issued by an attached iframe reports that iframe as its frame."""
    await page.goto(server.EMPTY_PAGE)
    # Subscribe only after the main navigation so that just the iframe is seen.
    requests = []
    page.on("request", lambda r: requests.append(r))
    await utils.attach_frame(page, "frame1", server.EMPTY_PAGE)
    assert len(requests) == 1
    assert requests[0].frame == page.frames[1]


async def test_request_frame_should_work_for_fetch_requests(page, server):
    """A fetch() issued from the main document reports the main frame."""
    await page.goto(server.EMPTY_PAGE)
    requests: List[Request] = []
    page.on("request", lambda r: requests.append(r))
    await page.evaluate('() => fetch("/digits/1.png")')
    # Ignore any favicon request so that only the fetch is counted.
    requests = [r for r in requests if "favicon" not in r.url]
    assert len(requests) == 1
    assert requests[0].frame == page.main_frame


async def test_request_headers_should_work(
    page, server, is_chromium, is_firefox, is_webkit
):
    """The request's user-agent header names the browser under test."""
    response = await page.goto(server.EMPTY_PAGE)
    if is_chromium:
        assert "Chrome" in response.request.headers["user-agent"]
    elif is_firefox:
        assert "Firefox" in response.request.headers["user-agent"]
    elif is_webkit:
        assert "WebKit" in response.request.headers["user-agent"]


async def test_request_headers_should_get_the_same_headers_as_the_server(
    page: Page, server, is_webkit, is_win
):
    """Headers reported for a navigation equal the headers the server received."""
    server_request_headers_future: Future[Dict[str, str]] = asyncio.Future()

    def handle(request):
        # Lower-case the names and keep the first value of each raw header.
        normalized_headers = {
            key.decode().lower(): value[0].decode()
            for key, value in request.requestHeaders.getAllRawHeaders()
        }
        server_request_headers_future.set_result(normalized_headers)
        request.write(b"done")
        request.finish()

    server.set_route("/empty.html", handle)
    response = await page.goto(server.EMPTY_PAGE)
    server_headers = await server_request_headers_future
    if is_webkit and is_win:
        # Curl does not show accept-encoding and accept-language
        server_headers.pop("accept-encoding")
        server_headers.pop("accept-language")
    assert cast(Response, response).request.headers == server_headers


async def test_request_headers_should_get_the_same_headers_as_the_server_cors(
    page: Page, server, is_webkit, is_win
):
    """Headers reported for a cross-origin fetch equal what the server received."""
    await page.goto(server.PREFIX + "/empty.html")
    server_request_headers_future: Future[Dict[str, str]] = asyncio.Future()

    def handle_something(request):
        # Lower-case the names and keep the first value of each raw header.
        normalized_headers = {
            key.decode().lower(): value[0].decode()
            for key, value in request.requestHeaders.getAllRawHeaders()
        }
        server_request_headers_future.set_result(normalized_headers)
        request.setHeader("Access-Control-Allow-Origin", "*")
        request.write(b"done")
        request.finish()

    server.set_route("/something", handle_something)
    text = None
    async with page.expect_request("**/*") as request_info:
        text = await page.evaluate(
            """async url => {
                const data = await fetch(url);
                return data.text();
            }""",
            server.CROSS_PROCESS_PREFIX + "/something",
        )
    request = await request_info.value
    assert text == "done"
    server_headers = await server_request_headers_future
    if is_webkit and is_win:
        # Curl does not show accept-encoding and accept-language
        server_headers.pop("accept-encoding")
        server_headers.pop("accept-language")
    assert request.headers == server_headers


async def test_response_headers_should_work(page, server):
    """A header set by the server route is visible on the response."""
    # The tuple inside the lambda simply runs both calls in order.
    server.set_route("/empty.html", lambda r: (r.setHeader("foo", "bar"), r.finish()))
    response = await page.goto(server.EMPTY_PAGE)
    assert response.headers["foo"] == "bar"


async def test_request_post_data_should_work(page, server):
    """post_data returns the body of a POST issued through fetch()."""
    await page.goto(server.EMPTY_PAGE)
    server.set_route("/post", lambda r: r.finish())
    requests = []
    page.on("request", lambda r: requests.append(r))
    await page.evaluate(
        '() => fetch("./post", { method: "POST", body: JSON.stringify({foo: "bar"})})'
    )
    assert len(requests) == 1
    assert requests[0].post_data == '{"foo":"bar"}'


async def test_request_post_data__should_be_undefined_when_there_is_no_post_data(
    page, server
):
    """post_data is None for a plain navigation request."""
    response = await page.goto(server.EMPTY_PAGE)
    assert response.request.post_data is None


async def test_should_parse_the_json_post_data(page, server):
    """post_data_json returns the decoded JSON body of a POST."""
    await page.goto(server.EMPTY_PAGE)
    server.set_route("/post", lambda req: req.finish())
    requests = []
    page.on("request", lambda r: requests.append(r))
    await page.evaluate(
        """() => fetch('./post', { method: 'POST', body: JSON.stringify({ foo: 'bar' }) })"""
    )
    assert len(requests) == 1
    assert requests[0].post_data_json == {"foo": "bar"}


async def test_should_parse_the_data_if_content_type_is_form_urlencoded(page, server):
    """post_data_json returns the fields of a submitted HTML form as strings."""
    await page.goto(server.EMPTY_PAGE)
    server.set_route("/post", lambda req: req.finish())
    requests = []
    page.on("request", lambda r: requests.append(r))
    await page.set_content(
        """<form method='POST' action='/post'><input type='text' name='foo' value='bar'><input type='number' name='baz' value='123'><input type='submit'></form>"""
    )
    await page.click("input[type=submit]")
    assert len(requests) == 1
    assert requests[0].post_data_json == {"foo": "bar", "baz": "123"}


async def test_should_be_undefined_when_there_is_no_post_data(page, server):
    """post_data_json is None for a plain navigation request."""
    response = await page.goto(server.EMPTY_PAGE)
    assert response.request.post_data_json is None


async def test_should_work_with_binary_post_data(page, server):
    """post_data_buffer returns all 256 byte values of a binary POST body."""
    await page.goto(server.EMPTY_PAGE)
    server.set_route("/post", lambda req: req.finish())
    requests = []
    page.on("request", lambda r: requests.append(r))
    await page.evaluate(
        """async () => {
        await fetch('./post', { method: 'POST', body: new Uint8Array(Array.from(Array(256).keys())) })
    }"""
    )
    assert len(requests) == 1
    buffer = requests[0].post_data_buffer
    assert len(buffer) == 256
    for i in range(256):
        assert buffer[i] == i


async def test_should_work_with_binary_post_data_and_interception(page, server):
    """post_data_buffer is intact when a route registered for "/post" continues."""
    await page.goto(server.EMPTY_PAGE)
    server.set_route("/post", lambda req: req.finish())
    requests = []
    await page.route("/post", lambda route: asyncio.ensure_future(route.continue_()))
    page.on("request", lambda r: requests.append(r))
    await page.evaluate(
        """async () => {
        await fetch('./post', { method: 'POST', body: new Uint8Array(Array.from(Array(256).keys())) })
    }"""
    )
    assert len(requests) == 1
    buffer = requests[0].post_data_buffer
    assert len(buffer) == 256
    for i in range(256):
        assert buffer[i] == i


async def test_response_text_should_work(page, server):
    """response.text() returns the served file content."""
    response = await page.goto(server.PREFIX + "/simple.json")
    assert await response.text() == '{"foo": "bar"}\n'


async def test_response_text_should_return_uncompressed_text(page, server):
    """response.text() returns decoded text for a gzip-encoded response."""
    server.enable_gzip("/simple.json")
    response = await page.goto(server.PREFIX + "/simple.json")
    assert response.headers["content-encoding"] == "gzip"
    assert await response.text() == '{"foo": "bar"}\n'


async def test_response_text_should_throw_when_requesting_body_of_redirected_response(
    page, server
):
    """Reading the body of the 302 response in a redirect chain raises Error."""
    server.set_redirect("/foo.html", "/empty.html")
    response = await page.goto(server.PREFIX + "/foo.html")
    redirected_from = response.request.redirected_from
    assert redirected_from
    redirected = await redirected_from.response()
    assert redirected.status == 302
    # pytest.raises fails the test clearly if no Error is raised, instead of
    # tripping over an attribute access on None.
    with pytest.raises(Error) as exc_info:
        await redirected.text()
    assert (
        "Response body is unavailable for redirect responses"
        in exc_info.value.message
    )


async def test_response_json_should_work(page, server):
    """response.json() returns the parsed JSON document."""
    response = await page.goto(server.PREFIX + "/simple.json")
    assert await response.json() == {"foo": "bar"}


async def test_response_body_should_work(page, server, assetdir):
    """response.body() equals the bytes of the asset file on disk."""
    response = await page.goto(server.PREFIX + "/pptr.png")
    with open(
        assetdir / "pptr.png",
        "rb",
    ) as fd:
        assert fd.read() == await response.body()


async def test_response_body_should_work_with_compression(page, server, assetdir):
    """response.body() equals the asset bytes when gzip is enabled for the path."""
    server.enable_gzip("/pptr.png")
    response = await page.goto(server.PREFIX + "/pptr.png")
    with open(
        assetdir / "pptr.png",
        "rb",
    ) as fd:
        assert fd.read() == await response.body()


async def test_response_status_text_should_work(page, server):
    """status_text reflects the reason phrase set by the server route."""
    server.set_route("/cool", lambda r: (r.setResponseCode(200, b"cool!"), r.finish()))
    response = await page.goto(server.PREFIX + "/cool")
    assert response.status_text == "cool!"


async def test_request_resource_type_should_return_event_source(page, server):
    """A request made by EventSource reports resource_type "eventsource"."""
    SSE_MESSAGE = {"foo": "bar"}
    # 1. Setup server-sent events on server that immediately sends a message to the client.
    server.set_route(
        "/sse",
        lambda r: (
            r.setHeader("Content-Type", "text/event-stream"),
            r.setHeader("Connection", "keep-alive"),
            r.setHeader("Cache-Control", "no-cache"),
            r.setResponseCode(200),
            r.write(f"data: {json.dumps(SSE_MESSAGE)}\n\n".encode()),
            r.finish(),
        ),
    )
    # 2. Subscribe to page request events.
    await page.goto(server.EMPTY_PAGE)
    requests = []
    page.on("request", lambda r: requests.append(r))
    # 3. Connect to EventSource in browser and return first message.
    assert (
        await page.evaluate(
            """() => {
      const eventSource = new EventSource('/sse');
      return new Promise(resolve => {
        eventSource.onmessage = e => resolve(JSON.parse(e.data));
      });
    }"""
        )
        == SSE_MESSAGE
    )
    assert requests[0].resource_type == "eventsource"


async def test_network_events_request(page, server):
    """The "request" event carries url, resource type, method, response and frame."""
    requests = []
    page.on("request", lambda r: requests.append(r))
    await page.goto(server.EMPTY_PAGE)
    assert len(requests) == 1
    assert requests[0].url == server.EMPTY_PAGE
    assert requests[0].resource_type == "document"
    assert requests[0].method == "GET"
    assert await requests[0].response()
    assert requests[0].frame == page.main_frame
    assert requests[0].frame.url == server.EMPTY_PAGE


async def test_network_events_response(page, server):
    """The "response" event carries url, status, ok flag and the request."""
    responses = []
    page.on("response", lambda r: responses.append(r))
    await page.goto(server.EMPTY_PAGE)
    assert len(responses) == 1
    assert responses[0].url == server.EMPTY_PAGE
    assert responses[0].status == 200
    assert responses[0].ok
    assert responses[0].request


async def test_network_events_request_failed(
    page, server, is_chromium, is_webkit, is_mac, is_win
):
    """A stylesheet whose connection is dropped emits "requestfailed"."""

    def handle_request(request):
        request.setHeader("Content-Type", "text/css")
        # Close the transport without finishing the response.
        request.transport.loseConnection()

    server.set_route("/one-style.css", handle_request)
    failed_requests = []
    page.on("requestfailed", lambda request: failed_requests.append(request))
    await page.goto(server.PREFIX + "/one-style.html")
    assert len(failed_requests) == 1
    assert "one-style.css" in failed_requests[0].url
    assert await failed_requests[0].response() is None
    assert failed_requests[0].resource_type == "stylesheet"
    # The expected failure text depends on the browser and, for WebKit, the platform.
    if is_chromium:
        assert failed_requests[0].failure == "net::ERR_EMPTY_RESPONSE"
    elif is_webkit:
        if is_mac:
            assert failed_requests[0].failure == "The network connection was lost."
        elif is_win:
            assert (
                failed_requests[0].failure
                == "Server returned nothing (no headers, no data)"
            )
        else:
            assert failed_requests[0].failure == "Message Corrupt"
    else:
        assert failed_requests[0].failure == "NS_ERROR_NET_RESET"
    assert failed_requests[0].frame


async def test_network_events_request_finished(page, server):
    """The "requestfinished" event carries the request with response and frame."""
    async with page.expect_event("requestfinished") as event_info:
        await page.goto(server.EMPTY_PAGE)
    request = await event_info.value
    assert request.url == server.EMPTY_PAGE
    assert await request.response()
    assert request.frame == page.main_frame
    assert request.frame.url == server.EMPTY_PAGE


async def test_network_events_should_fire_events_in_proper_order(page, server):
    """"request" is observed before "response" for a navigation."""
    events = []
    page.on("request", lambda request: events.append("request"))
    page.on("response", lambda response: events.append("response"))
    response = await page.goto(server.EMPTY_PAGE)
    await response.finished()
    # "requestfinished" is recorded by hand once response.finished() resolves;
    # no listener is registered for that event here.
    events.append("requestfinished")
    assert events == ["request", "response", "requestfinished"]


async def test_network_events_should_support_redirects(page, server):
    """A redirect yields request/response/finished events for both hops, in order."""
    events = []
    page.on("request", lambda request: events.append(f"{request.method} {request.url}"))
    page.on(
        "response", lambda response: events.append(f"{response.status} {response.url}")
    )
    page.on("requestfinished", lambda request: events.append(f"DONE {request.url}"))
    page.on("requestfailed", lambda request: events.append(f"FAIL {request.url}"))
    server.set_redirect("/foo.html", "/empty.html")
    FOO_URL = server.PREFIX + "/foo.html"
    response = await page.goto(FOO_URL)
    await response.finished()
    assert events == [
        f"GET {FOO_URL}",
        f"302 {FOO_URL}",
        f"DONE {FOO_URL}",
        f"GET {server.EMPTY_PAGE}",
        f"200 {server.EMPTY_PAGE}",
        f"DONE {server.EMPTY_PAGE}",
    ]
    redirected_from = response.request.redirected_from
    assert "/foo.html" in redirected_from.url
    assert redirected_from.redirected_from is None
    assert redirected_from.redirected_to == response.request


async def test_request_is_navigation_request_should_work(page, server):
    """Currently skipped; would check is_navigation_request() per resource."""
    # The reason is passed positionally: the `msg` keyword was deprecated in
    # pytest 7 and removed in pytest 8, while the positional form works on all.
    pytest.skip("test")
    requests = {}

    def handle_request(request):
        # Key each request by the last path segment of its URL.
        requests[request.url.split("/").pop()] = request

    page.on("request", handle_request)
    server.set_redirect("/rrredirect", "/frames/one-frame.html")
    await page.goto(server.PREFIX + "/rrredirect")
    assert requests.get("rrredirect").is_navigation_request()
    assert requests.get("one-frame.html").is_navigation_request()
    assert requests.get("frame.html").is_navigation_request()
    assert requests.get("script.js").is_navigation_request() is False
    assert requests.get("style.css").is_navigation_request() is False


async def test_request_is_navigation_request_should_work_when_navigating_to_image(
    page, server
):
    """Navigating directly to an image is still a navigation request."""
    requests = []
    page.on("request", lambda r: requests.append(r))
    await page.goto(server.PREFIX + "/pptr.png")
    assert requests[0].is_navigation_request()


async def test_set_extra_http_headers_should_work(page, server):
    """Extra headers set on the page reach the server."""
    await page.set_extra_http_headers({"foo": "bar"})
    # gather() returns results in argument order; [0] is the server-side request.
    request = (
        await asyncio.gather(
            server.wait_for_request("/empty.html"),
            page.goto(server.EMPTY_PAGE),
        )
    )[0]
    assert request.getHeader("foo") == "bar"


async def test_set_extra_http_headers_should_work_with_redirects(page, server):
    """Extra headers set on the page are also sent to the redirect target."""
    server.set_redirect("/foo.html", "/empty.html")
    await page.set_extra_http_headers({"foo": "bar"})
    request = (
        await asyncio.gather(
            server.wait_for_request("/empty.html"),
            page.goto(server.PREFIX + "/foo.html"),
        )
    )[0]
    assert request.getHeader("foo") == "bar"


async def test_set_extra_http_headers_should_work_with_extra_headers_from_browser_context(
    browser, server
):
    """Extra headers set on the browser context reach the server."""
    context = await browser.new_context()
    await context.set_extra_http_headers({"foo": "bar"})
    page = await context.new_page()
    request = (
        await asyncio.gather(
            server.wait_for_request("/empty.html"),
            page.goto(server.EMPTY_PAGE),
        )
    )[0]
    await context.close()
    assert request.getHeader("foo") == "bar"


async def test_set_extra_http_headers_should_override_extra_headers_from_browser_context(
    browser, server
):
    """Page-level extra headers override context-level ones of the same name."""
    context = await browser.new_context(extra_http_headers={"fOo": "bAr", "baR": "foO"})
    page = await context.new_page()
    await page.set_extra_http_headers({"Foo": "Bar"})
    request = (
        await asyncio.gather(
            server.wait_for_request("/empty.html"),
            page.goto(server.EMPTY_PAGE),
        )
    )[0]
    await context.close()
    # "foo" carries the page-level value; "bar" keeps the context-level value.
    assert request.getHeader("foo") == "Bar"
    assert request.getHeader("bar") == "foO"


async def test_set_extra_http_headers_should_throw_for_non_string_header_values(
    page, server
):
    error = None
    try:
        await page.set_extra_http_headers({"foo": 1})
    except Error as exc:
        error = exc
    # NOTE(review): the source listing is truncated at this point ("..."), so
    # whatever check on `error` follows in the full file is not shown here.


# (next listing in the source page: _fetch.py)
Source:_fetch.py  
1# Copyright (c) Microsoft Corporation.2#3# Licensed under the Apache License, Version 2.0 (the "License");4# you may not use this file except in compliance with the License.5# You may obtain a copy of the License at6#7# http://www.apache.org/licenses/LICENSE-2.08#9# Unless required by applicable law or agreed to in writing, software10# distributed under the License is distributed on an "AS IS" BASIS,11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.12# See the License for the specific language governing permissions and13# limitations under the License.14import base6415import json16import pathlib17import typing18from pathlib import Path19from typing import Any, Dict, List, Optional, Union, cast20import playwright._impl._network as network21from playwright._impl._api_structures import (22    FilePayload,23    FormField,24    Headers,25    HttpCredentials,26    ProxySettings,27    ServerFilePayload,28    StorageState,29)30from playwright._impl._connection import ChannelOwner, from_channel31from playwright._impl._helper import (32    Error,33    NameValue,34    async_readfile,35    async_writefile,36    is_file_payload,37    is_safe_close_error,38    locals_to_params,39    object_to_array,40)41from playwright._impl._network import serialize_headers42from playwright._impl._tracing import Tracing43if typing.TYPE_CHECKING:44    from playwright._impl._playwright import Playwright45class APIRequest:46    def __init__(self, playwright: "Playwright") -> None:47        self.playwright = playwright48        self._loop = playwright._loop49        self._dispatcher_fiber = playwright._connection._dispatcher_fiber50    async def new_context(51        self,52        baseURL: str = None,53        extraHTTPHeaders: Dict[str, str] = None,54        httpCredentials: HttpCredentials = None,55        ignoreHTTPSErrors: bool = None,56        proxy: ProxySettings = None,57        userAgent: str = None,58        timeout: float = None,59        storageState: 
Union[StorageState, str, Path] = None,60    ) -> "APIRequestContext":61        params = locals_to_params(locals())62        if "storageState" in params:63            storage_state = params["storageState"]64            if not isinstance(storage_state, dict) and storage_state:65                params["storageState"] = json.loads(66                    (await async_readfile(storage_state)).decode()67                )68        if "extraHTTPHeaders" in params:69            params["extraHTTPHeaders"] = serialize_headers(params["extraHTTPHeaders"])70        context = cast(71            APIRequestContext,72            from_channel(await self.playwright._channel.send("newRequest", params)),73        )74        context._tracing._local_utils = self.playwright._utils75        return context76class APIRequestContext(ChannelOwner):77    def __init__(78        self, parent: ChannelOwner, type: str, guid: str, initializer: Dict79    ) -> None:80        super().__init__(parent, type, guid, initializer)81        self._tracing: Tracing = from_channel(initializer["tracing"])82    async def dispose(self) -> None:83        await self._channel.send("dispose")84    async def delete(85        self,86        url: str,87        params: Dict[str, Union[bool, float, str]] = None,88        headers: Headers = None,89        data: Union[Any, bytes, str] = None,90        form: Dict[str, Union[bool, float, str]] = None,91        multipart: Dict[str, Union[bytes, bool, float, str, FilePayload]] = None,92        timeout: float = None,93        failOnStatusCode: bool = None,94        ignoreHTTPSErrors: bool = None,95    ) -> "APIResponse":96        return await self.fetch(97            url,98            method="DELETE",99            params=params,100            headers=headers,101            data=data,102            form=form,103            multipart=multipart,104            timeout=timeout,105            failOnStatusCode=failOnStatusCode,106            ignoreHTTPSErrors=ignoreHTTPSErrors,107        
)108    async def head(109        self,110        url: str,111        params: Dict[str, Union[bool, float, str]] = None,112        headers: Headers = None,113        timeout: float = None,114        failOnStatusCode: bool = None,115        ignoreHTTPSErrors: bool = None,116    ) -> "APIResponse":117        return await self.fetch(118            url,119            method="HEAD",120            params=params,121            headers=headers,122            timeout=timeout,123            failOnStatusCode=failOnStatusCode,124            ignoreHTTPSErrors=ignoreHTTPSErrors,125        )126    async def get(127        self,128        url: str,129        params: Dict[str, Union[bool, float, str]] = None,130        headers: Headers = None,131        timeout: float = None,132        failOnStatusCode: bool = None,133        ignoreHTTPSErrors: bool = None,134    ) -> "APIResponse":135        return await self.fetch(136            url,137            method="GET",138            params=params,139            headers=headers,140            timeout=timeout,141            failOnStatusCode=failOnStatusCode,142            ignoreHTTPSErrors=ignoreHTTPSErrors,143        )144    async def patch(145        self,146        url: str,147        params: Dict[str, Union[bool, float, str]] = None,148        headers: Headers = None,149        data: Union[Any, bytes, str] = None,150        form: Dict[str, Union[bool, float, str]] = None,151        multipart: Dict[str, Union[bytes, bool, float, str, FilePayload]] = None,152        timeout: float = None,153        failOnStatusCode: bool = None,154        ignoreHTTPSErrors: bool = None,155    ) -> "APIResponse":156        return await self.fetch(157            url,158            method="PATCH",159            params=params,160            headers=headers,161            data=data,162            form=form,163            multipart=multipart,164            timeout=timeout,165            failOnStatusCode=failOnStatusCode,166            
ignoreHTTPSErrors=ignoreHTTPSErrors,167        )168    async def put(169        self,170        url: str,171        params: Dict[str, Union[bool, float, str]] = None,172        headers: Headers = None,173        data: Union[Any, bytes, str] = None,174        form: Dict[str, Union[bool, float, str]] = None,175        multipart: Dict[str, Union[bytes, bool, float, str, FilePayload]] = None,176        timeout: float = None,177        failOnStatusCode: bool = None,178        ignoreHTTPSErrors: bool = None,179    ) -> "APIResponse":180        return await self.fetch(181            url,182            method="PUT",183            params=params,184            headers=headers,185            data=data,186            form=form,187            multipart=multipart,188            timeout=timeout,189            failOnStatusCode=failOnStatusCode,190            ignoreHTTPSErrors=ignoreHTTPSErrors,191        )192    async def post(193        self,194        url: str,195        params: Dict[str, Union[bool, float, str]] = None,196        headers: Headers = None,197        data: Union[Any, bytes, str] = None,198        form: Dict[str, Union[bool, float, str]] = None,199        multipart: Dict[str, Union[bytes, bool, float, str, FilePayload]] = None,200        timeout: float = None,201        failOnStatusCode: bool = None,202        ignoreHTTPSErrors: bool = None,203    ) -> "APIResponse":204        return await self.fetch(205            url,206            method="POST",207            params=params,208            headers=headers,209            data=data,210            form=form,211            multipart=multipart,212            timeout=timeout,213            failOnStatusCode=failOnStatusCode,214            ignoreHTTPSErrors=ignoreHTTPSErrors,215        )216    async def fetch(217        self,218        urlOrRequest: Union[str, network.Request],219        params: Dict[str, Union[bool, float, str]] = None,220        method: str = None,221        headers: Headers = None,222        data: 
Union[Any, bytes, str] = None,223        form: Dict[str, Union[bool, float, str]] = None,224        multipart: Dict[str, Union[bytes, bool, float, str, FilePayload]] = None,225        timeout: float = None,226        failOnStatusCode: bool = None,227        ignoreHTTPSErrors: bool = None,228    ) -> "APIResponse":229        request = urlOrRequest if isinstance(urlOrRequest, network.Request) else None230        assert request or isinstance(231            urlOrRequest, str232        ), "First argument must be either URL string or Request"233        assert (234            (1 if data else 0) + (1 if form else 0) + (1 if multipart else 0)235        ) <= 1, "Only one of 'data', 'form' or 'multipart' can be specified"236        url = request.url if request else urlOrRequest237        method = method or (request.method if request else "GET")238        # Cannot call allHeaders() here as the request may be paused inside route handler.239        headers_obj = headers or (request.headers if request else None)240        serialized_headers = serialize_headers(headers_obj) if headers_obj else None241        json_data: Any = None242        form_data: Optional[List[NameValue]] = None243        multipart_data: Optional[List[FormField]] = None244        post_data_buffer: Optional[bytes] = None245        if data:246            if isinstance(data, str):247                if is_json_content_type(serialized_headers):248                    json_data = data249                else:250                    post_data_buffer = data.encode()251            elif isinstance(data, bytes):252                post_data_buffer = data253            elif isinstance(data, (dict, list, int, bool)):254                json_data = data255            else:256                raise Error(f"Unsupported 'data' type: {type(data)}")257        elif form:258            form_data = object_to_array(form)259        elif multipart:260            multipart_data = []261            # Convert file-like values to 
ServerFilePayload structs.262            for name, value in multipart.items():263                if is_file_payload(value):264                    payload = cast(FilePayload, value)265                    assert isinstance(266                        payload["buffer"], bytes267                    ), f"Unexpected buffer type of 'data.{name}'"268                    multipart_data.append(269                        FormField(name=name, file=file_payload_to_json(payload))270                    )271                elif isinstance(value, str):272                    multipart_data.append(FormField(name=name, value=value))273        if (274            post_data_buffer is None275            and json_data is None276            and form_data is None277            and multipart_data is None278        ):279            post_data_buffer = request.post_data_buffer if request else None280        post_data = (281            base64.b64encode(post_data_buffer).decode() if post_data_buffer else None282        )283        def filter_none(input: Dict) -> Dict:284            return {k: v for k, v in input.items() if v is not None}285        response = await self._channel.send(286            "fetch",287            filter_none(288                {289                    "url": url,290                    "params": object_to_array(params),291                    "method": method,292                    "headers": serialized_headers,293                    "postData": post_data,294                    "jsonData": json_data,295                    "formData": form_data,296                    "multipartData": multipart_data,297                    "timeout": timeout,298                    "failOnStatusCode": failOnStatusCode,299                    "ignoreHTTPSErrors": ignoreHTTPSErrors,300                }301            ),302        )303        return APIResponse(self, response)304    async def storage_state(305        self, path: Union[pathlib.Path, str] = None306    ) -> StorageState:307        result = 
def file_payload_to_json(payload: "FilePayload") -> "ServerFilePayload":
    """Convert a file payload into a ``ServerFilePayload``.

    ``name`` and ``mimeType`` are copied as-is; ``buffer`` is base64-encoded
    and decoded to text.
    """
    return ServerFilePayload(
        name=payload["name"],
        mimeType=payload["mimeType"],
        buffer=base64.b64encode(payload["buffer"]).decode(),
    )


class APIResponse:
    """Response object backed by an initializer dict and a request context.

    URL, status, status text and the fetch uid are read straight from the
    initializer. The body is not stored here: it is requested on demand over
    the owning context's channel, keyed by the fetch uid.
    """

    def __init__(self, context: "APIRequestContext", initializer: Dict) -> None:
        self._loop = context._loop
        self._dispatcher_fiber = context._connection._dispatcher_fiber
        self._request = context
        self._initializer = initializer
        self._headers = network.RawHeaders(initializer["headers"])

    def __repr__(self) -> str:
        return f"<APIResponse url={self.url!r} status={self.status!r} status_text={self.status_text!r}>"

    @property
    def ok(self) -> bool:
        """``True`` when ``status`` lies in the inclusive range 200-299."""
        return 200 <= self.status <= 299

    @property
    def url(self) -> str:
        """The ``"url"`` entry of the initializer."""
        return self._initializer["url"]

    @property
    def status(self) -> int:
        """The ``"status"`` entry of the initializer."""
        return self._initializer["status"]

    @property
    def status_text(self) -> str:
        """The ``"statusText"`` entry of the initializer."""
        return self._initializer["statusText"]

    @property
    def headers(self) -> "Headers":
        """Headers as produced by the raw-headers wrapper's ``headers()``."""
        return self._headers.headers()

    @property
    def headers_array(self) -> "network.HeadersArray":
        """Headers as produced by the raw-headers wrapper's ``headers_array()``."""
        return self._headers.headers_array()

    async def body(self) -> bytes:
        """Fetch the response body and return it as bytes.

        Sends ``"fetchResponseBody"`` with this response's fetch uid and
        base64-decodes the ``"binary"`` field of the result.

        Raises:
            Error: ``"Response has been disposed"`` when the channel returns
                ``None`` or when the caught error satisfies
                ``is_safe_close_error``; any other ``Error`` is re-raised.
        """
        try:
            result = await self._request._channel.send_return_as_dict(
                "fetchResponseBody",
                {
                    "fetchUid": self._fetch_uid,
                },
            )
            if result is None:
                raise Error("Response has been disposed")
            return base64.b64decode(result["binary"])
        except Error as exc:
            if is_safe_close_error(exc):
                raise Error("Response has been disposed")
            raise exc

    async def text(self) -> str:
        """Return the body decoded with ``bytes.decode()`` defaults."""
        content = await self.body()
        return content.decode()

    async def json(self) -> Any:
        """Return the body text parsed with ``json.loads``."""
        content = await self.text()
        return json.loads(content)

    async def dispose(self) -> None:
        """Send ``"disposeAPIResponse"`` for this response's fetch uid."""
        await self._request._channel.send(
            "disposeAPIResponse",
            {
                "fetchUid": self._fetch_uid,
            },
        )

    @property
    def _fetch_uid(self) -> str:
        return self._initializer["fetchUid"]

    async def _fetch_log(self) -> List[str]:
        return await self._request._channel.send(
            "fetchLog",
            {
                "fetchUid": self._fetch_uid,
            },
        )
Source:_network.py  
# Class-body excerpt: each definition below takes ``self``; the enclosing
# class header lies outside this listing.
@property
def post_data_buffer(self) -> Optional[bytes]:
    """Return the request body as bytes, or ``None`` when there is none.

    Reads ``"postData"`` from the initializer; a missing or empty value
    yields ``None``, otherwise the value is base64-decoded.
    """
    b64_content = self._initializer.get("postData")
    if not b64_content:
        return None
    return base64.b64decode(b64_content)


@property
def headers(self) -> Dict[str, str]:
    """Return the stored ``self._headers`` mapping."""
    return self._headers


async def response(self) -> Optional["Response"]:
    """Send ``"response"`` over the channel and convert the reply.

    The reply is passed through ``from_nullable_channel``.
    """
    return from_nullable_channel(await self._channel.send("response"))


@property
def frame(self) -> "Frame":
    """Return ``from_channel`` applied to the initializer's ``"frame"``."""
    return from_channel(self._initializer["frame"])


def is_navigation_request(self) -> bool:
    """Return the initializer's ``"isNavigationRequest"`` flag."""
    return self._initializer["isNavigationRequest"]
Source:main.py  
"""Drive headless Chromium against a reCAPTCHA demo page and save challenge data.

Every browser request is intercepted and either answered from a local
``document.html`` (for the landing URL) or replayed through an ``aiohttp``
session, optionally via a proxy. Payload responses are written under
``parts/`` and challenge labels are accumulated in ``labels.json``.
"""
from os import device_encoding
import sys
import asyncio
import requests
import time
import aiohttp
import json
import random
import argparse
from threading import Thread
from playwright.async_api import async_playwright

parser = argparse.ArgumentParser(description='Script to generate an unlabeled dataset for Poseidon')
parser.add_argument('--threads', dest='threads', action='store', type=int, default=1, help='How many browsers to run in parallel for collection')
parser.add_argument('--proxy-url', dest='proxy_url', action='store', type=str, required=False, help='The url of the proxy')
parser.add_argument('--proxy-port-min', dest='proxy_port_min', action='store', type=int, required=False, help='The minimum port (if one port then set --proxy-port-max to --proxy-port-min)')
parser.add_argument('--proxy-port-max', dest='proxy_port_max', action='store', type=int, required=False, help='The maximum port')
parser.add_argument('--proxy-user', dest='proxy_user', action='store', type=str, required=False, help='Username of proxy auth (if auth exists)')
parser.add_argument('--proxy-pass', dest='proxy_pass', action='store', type=str, required=False, help='Password of proxy auth (if auth exists)')
args = parser.parse_args()

if args.proxy_url and (args.proxy_port_min is None or args.proxy_port_max is None):
    parser.error("--proxy-url requires --proxy-port-min and --proxy-port-max.")
# Exactly one of user/pass being set is the invalid combination.
if args.proxy_url and (args.proxy_user is None) != (args.proxy_pass is None):
    parser.error("--proxy-user and --proxy-pass must be given together.")

LANDING = "https://recaptcha-demo.appspot.com/recaptcha-v2-invisible.php"


async def download_image(response):
    """Write the response body to a ``.jpeg`` file under ``parts/``.

    The file name is the current time in nanoseconds followed by a random
    integer in ``[0, 10000]``.
    """
    # Fetch the body before opening the file so a failed fetch does not
    # leave an empty file behind.
    data = await response.body()
    with open(f'parts/{time.time_ns()}{random.randint(0, 10000)}.jpeg', 'wb') as f:
        f.write(data)


async def capture_route(session, route):
    """Fulfill an intercepted route locally or by replaying it upstream.

    Requests whose URL contains ``LANDING`` are answered with the contents of
    ``document.html``. Every other request is re-issued through ``session``
    (using a proxy on a randomly chosen port when ``--proxy-url`` is set) and
    the route is fulfilled with the upstream status, headers and body.
    """
    request = route.request
    headers = request.headers
    method = request.method
    post_data_buffer = request.post_data_buffer
    url = request.url

    if LANDING in url:
        with open('document.html') as doc:
            return await route.fulfill(body=doc.read(), content_type='text/html', status=200, headers={ 'Access-Control-Allow-Origin': '*' })

    proxy = None
    if args.proxy_url is not None:
        port = random.randint(args.proxy_port_min, args.proxy_port_max)
        if args.proxy_user is not None:
            proxy = f'http://{args.proxy_user}:{args.proxy_pass}@{args.proxy_url}:{port}'
        else:
            proxy = f'http://{args.proxy_url}:{port}'

    async with session.request(method=method, url=url, data=post_data_buffer, headers=headers, proxy=proxy) as response:
        content_type = response.headers.get('content-type')

        print(response.status, response.url)
        return await route.fulfill(body=await response.read(), content_type=content_type, status=response.status, headers=response.headers)


async def main_async():
    """Run one browser session: wire up the handlers, then echo stdin until EOF."""
    async with aiohttp.ClientSession() as session:
        async with async_playwright() as p:
            # Device emulation is currently disabled (see the commented-out
            # kwarg below); the descriptor is still looked up.
            iphone_11 = p.devices["iPhone 11 Pro"]
            browser = await p.chromium.launch(headless=True)
            context = await browser.new_context(
                # **iphone_11,
                locale="en-US",
                geolocation={"longitude": 12.492507, "latitude": 41.889938 },
                permissions=["geolocation"],
            )
            page = await context.new_page()

            async def load_page():
                """Reload the challenge if its frame exists, else open the landing page.

                Landing-page failures are retried indefinitely. The retry is
                a loop rather than recursion so repeated fast failures cannot
                exhaust the interpreter's recursion limit.
                """
                while True:
                    bframe = next(
                        (f for f in page.main_frame.child_frames if 'bframe' in f.url),
                        None,
                    )
                    if bframe is not None:
                        print(bframe)
                        el = await bframe.wait_for_selector('.rc-button-reload')
                        await el.click()
                        return
                    try:
                        await page.goto(LANDING)
                        el = await page.wait_for_selector('#submit')
                        await el.click()
                        return
                    except Exception:
                        # Best-effort: start over, re-checking for the frame.
                        continue

            async def ensure_image(response):
                """Save payload responses; reload after payload or userverify responses."""
                if '/api2/payload' in response.url:
                    asyncio.ensure_future(download_image(response))
                elif '/api2/userverify' not in response.url:
                    return
                await load_page()

            async def check_labels(frame):
                """Record the label shown in a 'bframe' frame, then reload."""
                if 'bframe' not in frame.url:
                    return
                el = await frame.wait_for_selector('strong')
                label = await el.text_content()
                with open('labels.json', "r+") as f:
                    try:
                        data = json.loads(f.read())
                    except ValueError:
                        # Empty or unparsable file: start a fresh list.
                        data = []

                    if label not in data:
                        data.append(label)

                    # Rewrite the file in place with the updated list.
                    f.seek(0)
                    f.write(json.dumps(data))
                    f.truncate()

                return await load_page()
                # return await check_labels(frame)

            await page.route('**', lambda route: asyncio.ensure_future(capture_route(session, route)))
            page.on('response', lambda response: asyncio.ensure_future(ensure_image(response)))
            page.on('framenavigated', lambda frame: asyncio.ensure_future(check_labels(frame)))
            await load_page()

            print('Press CTRL-D to stop')
            # NOTE(review): every thread started by the entry guard runs this
            # coroutine and attaches sys.stdin to its own loop - confirm that
            # is intended when --threads > 1.
            reader = asyncio.StreamReader()
            pipe = sys.stdin
            loop = asyncio.get_running_loop()
            await loop.connect_read_pipe(lambda: asyncio.StreamReaderProtocol(reader), pipe)
            async for line in reader:
                print(f'Got: {line.decode()!r}')


def main():
    """Run ``main_async`` on a fresh event loop."""
    asyncio.run(main_async())


if __name__ == '__main__':
    for _ in range(args.threads):
        Thread(target=main).start()
Get 100 minutes of automation testing FREE!!
