How to use the post_data_buffer property in Playwright Python

Best Python code snippet using playwright-python

test_network.py

Source: test_network.py (GitHub)

copy

Full Screen

# Copyright (c) Microsoft Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Network tests: request/response events, headers, post data and extra headers.

Every test relies on fixtures (``page``, ``server``, ``browser``, ``utils``,
``assetdir`` and the ``is_*`` platform flags) that are defined outside this
listing.
"""

import asyncio
import json
from asyncio.futures import Future
from typing import Dict, List, cast

import pytest

from playwright.async_api import Error, Page, Request, Response


async def test_request_fulfill(page, server):
    async def handle_request(route, request):
        assert route.request == request
        assert "empty.html" in request.url
        assert request.headers["user-agent"]
        assert request.method == "GET"
        assert request.post_data is None
        assert request.is_navigation_request()
        assert request.resource_type == "document"
        assert request.frame == page.main_frame
        assert request.frame.url == "about:blank"
        await route.fulfill(body="Text")

    # The route callback is synchronous, so the coroutine is scheduled as a task.
    await page.route(
        "**/empty.html",
        lambda route, request: asyncio.create_task(handle_request(route, request)),
    )

    response = await page.goto(server.EMPTY_PAGE)
    assert response.ok
    assert await response.text() == "Text"


async def test_request_continue(page, server):
    async def handle_request(route, request, intercepted):
        intercepted.append(True)
        await route.continue_()

    intercepted = list()
    await page.route(
        "**/*",
        lambda route, request: asyncio.create_task(
            handle_request(route, request, intercepted)
        ),
    )

    response = await page.goto(server.EMPTY_PAGE)
    assert response.ok
    assert intercepted == [True]
    assert await page.title() == ""


async def test_page_events_request_should_fire_for_navigation_requests(
    page: Page, server
):
    requests = []
    page.on("request", lambda r: requests.append(r))
    await page.goto(server.EMPTY_PAGE)
    assert len(requests) == 1


async def test_page_events_request_should_fire_for_iframes(page, server, utils):
    requests = []
    page.on("request", lambda r: requests.append(r))
    await page.goto(server.EMPTY_PAGE)
    await utils.attach_frame(page, "frame1", server.EMPTY_PAGE)
    assert len(requests) == 2


async def test_page_events_request_should_fire_for_fetches(page, server):
    requests = []
    page.on("request", lambda r: requests.append(r))
    await page.goto(server.EMPTY_PAGE)
    await page.evaluate('() => fetch("/empty.html")')
    assert len(requests) == 2


async def test_page_events_request_should_report_requests_and_responses_handled_by_service_worker(
    page: Page, server
):
    await page.goto(server.PREFIX + "/serviceworkers/fetchdummy/sw.html")
    await page.evaluate("() => window.activationPromise")
    sw_response = None
    async with page.expect_request("**/*") as request_info:
        sw_response = await page.evaluate('() => fetchDummy("foo")')
    request = await request_info.value
    assert sw_response == "responseFromServiceWorker:foo"
    assert request.url == server.PREFIX + "/serviceworkers/fetchdummy/foo"
    response = await request.response()
    assert response.url == server.PREFIX + "/serviceworkers/fetchdummy/foo"
    assert await response.text() == "responseFromServiceWorker:foo"


async def test_request_frame_should_work_for_main_frame_navigation_request(
    page, server
):
    requests = []
    page.on("request", lambda r: requests.append(r))
    await page.goto(server.EMPTY_PAGE)
    assert len(requests) == 1
    assert requests[0].frame == page.main_frame


async def test_request_frame_should_work_for_subframe_navigation_request(
    page, server, utils
):
    await page.goto(server.EMPTY_PAGE)
    requests = []
    page.on("request", lambda r: requests.append(r))
    await utils.attach_frame(page, "frame1", server.EMPTY_PAGE)
    assert len(requests) == 1
    assert requests[0].frame == page.frames[1]


async def test_request_frame_should_work_for_fetch_requests(page, server):
    await page.goto(server.EMPTY_PAGE)
    requests: List[Request] = []
    page.on("request", lambda r: requests.append(r))
    await page.evaluate('() => fetch("/digits/1.png")')
    # Ignore any favicon request so only the explicit fetch is counted.
    requests = [r for r in requests if "favicon" not in r.url]
    assert len(requests) == 1
    assert requests[0].frame == page.main_frame


async def test_request_headers_should_work(
    page, server, is_chromium, is_firefox, is_webkit
):
    response = await page.goto(server.EMPTY_PAGE)
    if is_chromium:
        assert "Chrome" in response.request.headers["user-agent"]
    elif is_firefox:
        assert "Firefox" in response.request.headers["user-agent"]
    elif is_webkit:
        assert "WebKit" in response.request.headers["user-agent"]


async def test_request_headers_should_get_the_same_headers_as_the_server(
    page: Page, server, is_webkit, is_win
):
    server_request_headers_future: Future[Dict[str, str]] = asyncio.Future()

    def handle(request):
        # Lower-case the names and keep the first value of each raw header.
        normalized_headers = {
            key.decode().lower(): value[0].decode()
            for key, value in request.requestHeaders.getAllRawHeaders()
        }
        server_request_headers_future.set_result(normalized_headers)
        request.write(b"done")
        request.finish()

    server.set_route("/empty.html", handle)
    response = await page.goto(server.EMPTY_PAGE)
    server_headers = await server_request_headers_future
    if is_webkit and is_win:
        # Curl does not show accept-encoding and accept-language
        server_headers.pop("accept-encoding")
        server_headers.pop("accept-language")
    assert cast(Response, response).request.headers == server_headers


async def test_request_headers_should_get_the_same_headers_as_the_server_cors(
    page: Page, server, is_webkit, is_win
):
    await page.goto(server.PREFIX + "/empty.html")
    server_request_headers_future: Future[Dict[str, str]] = asyncio.Future()

    def handle_something(request):
        normalized_headers = {
            key.decode().lower(): value[0].decode()
            for key, value in request.requestHeaders.getAllRawHeaders()
        }
        server_request_headers_future.set_result(normalized_headers)
        request.setHeader("Access-Control-Allow-Origin", "*")
        request.write(b"done")
        request.finish()

    server.set_route("/something", handle_something)

    text = None
    async with page.expect_request("**/*") as request_info:
        text = await page.evaluate(
            """async url => {
                const data = await fetch(url);
                return data.text();
            }""",
            server.CROSS_PROCESS_PREFIX + "/something",
        )
    request = await request_info.value
    assert text == "done"
    server_headers = await server_request_headers_future
    if is_webkit and is_win:
        # Curl does not show accept-encoding and accept-language
        server_headers.pop("accept-encoding")
        server_headers.pop("accept-language")
    assert request.headers == server_headers


async def test_response_headers_should_work(page, server):
    server.set_route("/empty.html", lambda r: (r.setHeader("foo", "bar"), r.finish()))
    response = await page.goto(server.EMPTY_PAGE)
    assert response.headers["foo"] == "bar"


async def test_request_post_data_should_work(page, server):
    await page.goto(server.EMPTY_PAGE)
    server.set_route("/post", lambda r: r.finish())
    requests = []
    page.on("request", lambda r: requests.append(r))
    await page.evaluate(
        '() => fetch("./post", { method: "POST", body: JSON.stringify({foo: "bar"})})'
    )
    assert len(requests) == 1
    assert requests[0].post_data == '{"foo":"bar"}'


async def test_request_post_data__should_be_undefined_when_there_is_no_post_data(
    page, server
):
    response = await page.goto(server.EMPTY_PAGE)
    assert response.request.post_data is None


async def test_should_parse_the_json_post_data(page, server):
    await page.goto(server.EMPTY_PAGE)
    server.set_route("/post", lambda req: req.finish())
    requests = []
    page.on("request", lambda r: requests.append(r))
    await page.evaluate(
        """() => fetch('./post', { method: 'POST', body: JSON.stringify({ foo: 'bar' }) })"""
    )
    assert len(requests) == 1
    assert requests[0].post_data_json == {"foo": "bar"}


async def test_should_parse_the_data_if_content_type_is_form_urlencoded(page, server):
    await page.goto(server.EMPTY_PAGE)
    server.set_route("/post", lambda req: req.finish())
    requests = []
    page.on("request", lambda r: requests.append(r))
    await page.set_content(
        """<form method='POST' action='/post'><input type='text' name='foo' value='bar'><input type='number' name='baz' value='123'><input type='submit'></form>"""
    )
    await page.click("input[type=submit]")
    assert len(requests) == 1
    assert requests[0].post_data_json == {"foo": "bar", "baz": "123"}


async def test_should_be_undefined_when_there_is_no_post_data(page, server):
    response = await page.goto(server.EMPTY_PAGE)
    assert response.request.post_data_json is None


async def test_should_work_with_binary_post_data(page, server):
    await page.goto(server.EMPTY_PAGE)
    server.set_route("/post", lambda req: req.finish())
    requests = []
    page.on("request", lambda r: requests.append(r))
    await page.evaluate(
        """async () => {
            await fetch('./post', { method: 'POST', body: new Uint8Array(Array.from(Array(256).keys())) })
        }"""
    )
    assert len(requests) == 1
    buffer = requests[0].post_data_buffer
    assert len(buffer) == 256
    # The page posts the byte values 0..255 in order.
    assert buffer == bytes(range(256))


async def test_should_work_with_binary_post_data_and_interception(page, server):
    await page.goto(server.EMPTY_PAGE)
    server.set_route("/post", lambda req: req.finish())
    requests = []
    await page.route("/post", lambda route: asyncio.ensure_future(route.continue_()))
    page.on("request", lambda r: requests.append(r))
    await page.evaluate(
        """async () => {
            await fetch('./post', { method: 'POST', body: new Uint8Array(Array.from(Array(256).keys())) })
        }"""
    )
    assert len(requests) == 1
    buffer = requests[0].post_data_buffer
    assert len(buffer) == 256
    # The page posts the byte values 0..255 in order.
    assert buffer == bytes(range(256))


async def test_response_text_should_work(page, server):
    response = await page.goto(server.PREFIX + "/simple.json")
    assert await response.text() == '{"foo": "bar"}\n'


async def test_response_text_should_return_uncompressed_text(page, server):
    server.enable_gzip("/simple.json")
    response = await page.goto(server.PREFIX + "/simple.json")
    assert response.headers["content-encoding"] == "gzip"
    assert await response.text() == '{"foo": "bar"}\n'


async def test_response_text_should_throw_when_requesting_body_of_redirected_response(
    page, server
):
    server.set_redirect("/foo.html", "/empty.html")
    response = await page.goto(server.PREFIX + "/foo.html")
    redirected_from = response.request.redirected_from
    assert redirected_from
    redirected = await redirected_from.response()
    assert redirected.status == 302
    # pytest.raises fails with a clear message when no Error is raised, instead
    # of an AttributeError from reading ``.message`` on ``None``.
    with pytest.raises(Error) as exc_info:
        await redirected.text()
    assert (
        "Response body is unavailable for redirect responses"
        in exc_info.value.message
    )


async def test_response_json_should_work(page, server):
    response = await page.goto(server.PREFIX + "/simple.json")
    assert await response.json() == {"foo": "bar"}


async def test_response_body_should_work(page, server, assetdir):
    response = await page.goto(server.PREFIX + "/pptr.png")
    with open(
        assetdir / "pptr.png",
        "rb",
    ) as fd:
        assert fd.read() == await response.body()


async def test_response_body_should_work_with_compression(page, server, assetdir):
    server.enable_gzip("/pptr.png")
    response = await page.goto(server.PREFIX + "/pptr.png")
    with open(
        assetdir / "pptr.png",
        "rb",
    ) as fd:
        assert fd.read() == await response.body()


async def test_response_status_text_should_work(page, server):
    server.set_route("/cool", lambda r: (r.setResponseCode(200, b"cool!"), r.finish()))
    response = await page.goto(server.PREFIX + "/cool")
    assert response.status_text == "cool!"


async def test_request_resource_type_should_return_event_source(page, server):
    SSE_MESSAGE = {"foo": "bar"}
    # 1. Setup server-sent events on server that immediately sends a message to the client.
    server.set_route(
        "/sse",
        lambda r: (
            r.setHeader("Content-Type", "text/event-stream"),
            r.setHeader("Connection", "keep-alive"),
            r.setHeader("Cache-Control", "no-cache"),
            r.setResponseCode(200),
            r.write(f"data: {json.dumps(SSE_MESSAGE)}\n\n".encode()),
            r.finish(),
        ),
    )
    # 2. Subscribe to page request events.
    await page.goto(server.EMPTY_PAGE)
    requests = []
    page.on("request", lambda r: requests.append(r))
    # 3. Connect to EventSource in browser and return first message.
    assert (
        await page.evaluate(
            """() => {
                const eventSource = new EventSource('/sse');
                return new Promise(resolve => {
                    eventSource.onmessage = e => resolve(JSON.parse(e.data));
                });
            }"""
        )
        == SSE_MESSAGE
    )
    assert requests[0].resource_type == "eventsource"


async def test_network_events_request(page, server):
    requests = []
    page.on("request", lambda r: requests.append(r))
    await page.goto(server.EMPTY_PAGE)
    assert len(requests) == 1
    assert requests[0].url == server.EMPTY_PAGE
    assert requests[0].resource_type == "document"
    assert requests[0].method == "GET"
    assert await requests[0].response()
    assert requests[0].frame == page.main_frame
    assert requests[0].frame.url == server.EMPTY_PAGE


async def test_network_events_response(page, server):
    responses = []
    page.on("response", lambda r: responses.append(r))
    await page.goto(server.EMPTY_PAGE)
    assert len(responses) == 1
    assert responses[0].url == server.EMPTY_PAGE
    assert responses[0].status == 200
    assert responses[0].ok
    assert responses[0].request


async def test_network_events_request_failed(
    page, server, is_chromium, is_webkit, is_mac, is_win
):
    def handle_request(request):
        request.setHeader("Content-Type", "text/css")
        # Drop the connection without a response so the request fails.
        request.transport.loseConnection()

    server.set_route("/one-style.css", handle_request)

    failed_requests = []
    page.on("requestfailed", lambda request: failed_requests.append(request))
    await page.goto(server.PREFIX + "/one-style.html")
    assert len(failed_requests) == 1
    assert "one-style.css" in failed_requests[0].url
    assert await failed_requests[0].response() is None
    assert failed_requests[0].resource_type == "stylesheet"
    # The failure text differs per browser engine and platform.
    if is_chromium:
        assert failed_requests[0].failure == "net::ERR_EMPTY_RESPONSE"
    elif is_webkit:
        if is_mac:
            assert failed_requests[0].failure == "The network connection was lost."
        elif is_win:
            assert (
                failed_requests[0].failure
                == "Server returned nothing (no headers, no data)"
            )
        else:
            assert failed_requests[0].failure == "Message Corrupt"
    else:
        assert failed_requests[0].failure == "NS_ERROR_NET_RESET"
    assert failed_requests[0].frame


async def test_network_events_request_finished(page, server):
    async with page.expect_event("requestfinished") as event_info:
        await page.goto(server.EMPTY_PAGE)
    request = await event_info.value
    assert request.url == server.EMPTY_PAGE
    assert await request.response()
    assert request.frame == page.main_frame
    assert request.frame.url == server.EMPTY_PAGE


async def test_network_events_should_fire_events_in_proper_order(page, server):
    events = []
    page.on("request", lambda request: events.append("request"))
    page.on("response", lambda response: events.append("response"))
    response = await page.goto(server.EMPTY_PAGE)
    await response.finished()
    events.append("requestfinished")
    assert events == ["request", "response", "requestfinished"]


async def test_network_events_should_support_redirects(page, server):
    events = []
    page.on("request", lambda request: events.append(f"{request.method} {request.url}"))
    page.on(
        "response", lambda response: events.append(f"{response.status} {response.url}")
    )
    page.on("requestfinished", lambda request: events.append(f"DONE {request.url}"))
    page.on("requestfailed", lambda request: events.append(f"FAIL {request.url}"))
    server.set_redirect("/foo.html", "/empty.html")
    FOO_URL = server.PREFIX + "/foo.html"
    response = await page.goto(FOO_URL)
    await response.finished()
    assert events == [
        f"GET {FOO_URL}",
        f"302 {FOO_URL}",
        f"DONE {FOO_URL}",
        f"GET {server.EMPTY_PAGE}",
        f"200 {server.EMPTY_PAGE}",
        f"DONE {server.EMPTY_PAGE}",
    ]
    redirected_from = response.request.redirected_from
    assert "/foo.html" in redirected_from.url
    assert redirected_from.redirected_from is None
    assert redirected_from.redirected_to == response.request


async def test_request_is_navigation_request_should_work(page, server):
    # The reason is passed positionally: the ``msg=`` keyword is deprecated in
    # pytest 7, while a positional reason works on old and new versions alike.
    pytest.skip("test")
    requests = {}

    def handle_request(request):
        # ``url`` is a property (a str), so it must not be called.
        requests[request.url.split("/").pop()] = request

    page.on("request", handle_request)
    server.set_redirect("/rrredirect", "/frames/one-frame.html")
    await page.goto(server.PREFIX + "/rrredirect")
    assert requests.get("rrredirect").is_navigation_request()
    assert requests.get("one-frame.html").is_navigation_request()
    assert requests.get("frame.html").is_navigation_request()
    assert requests.get("script.js").is_navigation_request() is False
    assert requests.get("style.css").is_navigation_request() is False


async def test_request_is_navigation_request_should_work_when_navigating_to_image(
    page, server
):
    requests = []
    page.on("request", lambda r: requests.append(r))
    await page.goto(server.PREFIX + "/pptr.png")
    assert requests[0].is_navigation_request()


async def test_set_extra_http_headers_should_work(page, server):
    await page.set_extra_http_headers({"foo": "bar"})

    # gather() returns results in argument order; [0] is the server-side request.
    request = (
        await asyncio.gather(
            server.wait_for_request("/empty.html"),
            page.goto(server.EMPTY_PAGE),
        )
    )[0]
    assert request.getHeader("foo") == "bar"


async def test_set_extra_http_headers_should_work_with_redirects(page, server):
    server.set_redirect("/foo.html", "/empty.html")
    await page.set_extra_http_headers({"foo": "bar"})

    request = (
        await asyncio.gather(
            server.wait_for_request("/empty.html"),
            page.goto(server.PREFIX + "/foo.html"),
        )
    )[0]
    assert request.getHeader("foo") == "bar"


async def test_set_extra_http_headers_should_work_with_extra_headers_from_browser_context(
    browser, server
):
    context = await browser.new_context()
    await context.set_extra_http_headers({"foo": "bar"})

    page = await context.new_page()
    request = (
        await asyncio.gather(
            server.wait_for_request("/empty.html"),
            page.goto(server.EMPTY_PAGE),
        )
    )[0]
    await context.close()
    assert request.getHeader("foo") == "bar"


async def test_set_extra_http_headers_should_override_extra_headers_from_browser_context(
    browser, server
):
    context = await browser.new_context(extra_http_headers={"fOo": "bAr", "baR": "foO"})

    page = await context.new_page()
    await page.set_extra_http_headers({"Foo": "Bar"})

    request = (
        await asyncio.gather(
            server.wait_for_request("/empty.html"),
            page.goto(server.EMPTY_PAGE),
        )
    )[0]
    await context.close()
    assert request.getHeader("foo") == "Bar"
    assert request.getHeader("bar") == "foO"


async def test_set_extra_http_headers_should_throw_for_non_string_header_values(
    page, server
):
    error = None
    try:
        await page.set_extra_http_headers({"foo": 1})
    except Error as exc:
        error = exc
    # ... (the listing is truncated at this point on the source page)

Full Screen

Full Screen

_fetch.py

Source: _fetch.py (GitHub)

copy

Full Screen

# Copyright (c) Microsoft Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import base64
import json
import pathlib
import typing
from pathlib import Path
from typing import Any, Dict, List, Optional, Union, cast

import playwright._impl._network as network
from playwright._impl._api_structures import (
    FilePayload,
    FormField,
    Headers,
    HttpCredentials,
    ProxySettings,
    ServerFilePayload,
    StorageState,
)
from playwright._impl._connection import ChannelOwner, from_channel
from playwright._impl._helper import (
    Error,
    NameValue,
    async_readfile,
    async_writefile,
    is_file_payload,
    is_safe_close_error,
    locals_to_params,
    object_to_array,
)
from playwright._impl._network import serialize_headers
from playwright._impl._tracing import Tracing

if typing.TYPE_CHECKING:
    from playwright._impl._playwright import Playwright


class APIRequest:
    """Factory for ``APIRequestContext`` objects bound to a Playwright instance."""

    def __init__(self, playwright: "Playwright") -> None:
        self.playwright = playwright
        self._loop = playwright._loop
        self._dispatcher_fiber = playwright._connection._dispatcher_fiber

    async def new_context(
        self,
        baseURL: str = None,
        extraHTTPHeaders: Dict[str, str] = None,
        httpCredentials: HttpCredentials = None,
        ignoreHTTPSErrors: bool = None,
        proxy: ProxySettings = None,
        userAgent: str = None,
        timeout: float = None,
        storageState: Union[StorageState, str, Path] = None,
    ) -> "APIRequestContext":
        """Create a request context via the ``newRequest`` channel call.

        A ``storageState`` that is not a dict is treated as a file path whose
        JSON content is loaded; ``extraHTTPHeaders`` are serialized before
        being sent.
        """
        # Must stay the first statement: locals() is meant to hold only the
        # call arguments at this point.
        params = locals_to_params(locals())
        if "storageState" in params:
            storage_state = params["storageState"]
            if not isinstance(storage_state, dict) and storage_state:
                params["storageState"] = json.loads(
                    (await async_readfile(storage_state)).decode()
                )
        if "extraHTTPHeaders" in params:
            params["extraHTTPHeaders"] = serialize_headers(params["extraHTTPHeaders"])
        context = cast(
            APIRequestContext,
            from_channel(await self.playwright._channel.send("newRequest", params)),
        )
        context._tracing._local_utils = self.playwright._utils
        return context


class APIRequestContext(ChannelOwner):
    """Channel-backed context that issues HTTP requests through ``fetch``.

    The verb helpers (``get``, ``post``, ...) all delegate to ``fetch`` with
    the matching ``method``.
    """

    def __init__(
        self, parent: ChannelOwner, type: str, guid: str, initializer: Dict
    ) -> None:
        super().__init__(parent, type, guid, initializer)
        self._tracing: Tracing = from_channel(initializer["tracing"])

    async def dispose(self) -> None:
        await self._channel.send("dispose")

    async def delete(
        self,
        url: str,
        params: Dict[str, Union[bool, float, str]] = None,
        headers: Headers = None,
        data: Union[Any, bytes, str] = None,
        form: Dict[str, Union[bool, float, str]] = None,
        multipart: Dict[str, Union[bytes, bool, float, str, FilePayload]] = None,
        timeout: float = None,
        failOnStatusCode: bool = None,
        ignoreHTTPSErrors: bool = None,
    ) -> "APIResponse":
        return await self.fetch(
            url,
            method="DELETE",
            params=params,
            headers=headers,
            data=data,
            form=form,
            multipart=multipart,
            timeout=timeout,
            failOnStatusCode=failOnStatusCode,
            ignoreHTTPSErrors=ignoreHTTPSErrors,
        )

    async def head(
        self,
        url: str,
        params: Dict[str, Union[bool, float, str]] = None,
        headers: Headers = None,
        timeout: float = None,
        failOnStatusCode: bool = None,
        ignoreHTTPSErrors: bool = None,
    ) -> "APIResponse":
        return await self.fetch(
            url,
            method="HEAD",
            params=params,
            headers=headers,
            timeout=timeout,
            failOnStatusCode=failOnStatusCode,
            ignoreHTTPSErrors=ignoreHTTPSErrors,
        )

    async def get(
        self,
        url: str,
        params: Dict[str, Union[bool, float, str]] = None,
        headers: Headers = None,
        timeout: float = None,
        failOnStatusCode: bool = None,
        ignoreHTTPSErrors: bool = None,
    ) -> "APIResponse":
        return await self.fetch(
            url,
            method="GET",
            params=params,
            headers=headers,
            timeout=timeout,
            failOnStatusCode=failOnStatusCode,
            ignoreHTTPSErrors=ignoreHTTPSErrors,
        )

    async def patch(
        self,
        url: str,
        params: Dict[str, Union[bool, float, str]] = None,
        headers: Headers = None,
        data: Union[Any, bytes, str] = None,
        form: Dict[str, Union[bool, float, str]] = None,
        multipart: Dict[str, Union[bytes, bool, float, str, FilePayload]] = None,
        timeout: float = None,
        failOnStatusCode: bool = None,
        ignoreHTTPSErrors: bool = None,
    ) -> "APIResponse":
        return await self.fetch(
            url,
            method="PATCH",
            params=params,
            headers=headers,
            data=data,
            form=form,
            multipart=multipart,
            timeout=timeout,
            failOnStatusCode=failOnStatusCode,
            ignoreHTTPSErrors=ignoreHTTPSErrors,
        )

    async def put(
        self,
        url: str,
        params: Dict[str, Union[bool, float, str]] = None,
        headers: Headers = None,
        data: Union[Any, bytes, str] = None,
        form: Dict[str, Union[bool, float, str]] = None,
        multipart: Dict[str, Union[bytes, bool, float, str, FilePayload]] = None,
        timeout: float = None,
        failOnStatusCode: bool = None,
        ignoreHTTPSErrors: bool = None,
    ) -> "APIResponse":
        return await self.fetch(
            url,
            method="PUT",
            params=params,
            headers=headers,
            data=data,
            form=form,
            multipart=multipart,
            timeout=timeout,
            failOnStatusCode=failOnStatusCode,
            ignoreHTTPSErrors=ignoreHTTPSErrors,
        )

    async def post(
        self,
        url: str,
        params: Dict[str, Union[bool, float, str]] = None,
        headers: Headers = None,
        data: Union[Any, bytes, str] = None,
        form: Dict[str, Union[bool, float, str]] = None,
        multipart: Dict[str, Union[bytes, bool, float, str, FilePayload]] = None,
        timeout: float = None,
        failOnStatusCode: bool = None,
        ignoreHTTPSErrors: bool = None,
    ) -> "APIResponse":
        return await self.fetch(
            url,
            method="POST",
            params=params,
            headers=headers,
            data=data,
            form=form,
            multipart=multipart,
            timeout=timeout,
            failOnStatusCode=failOnStatusCode,
            ignoreHTTPSErrors=ignoreHTTPSErrors,
        )

    async def fetch(
        self,
        urlOrRequest: Union[str, network.Request],
        params: Dict[str, Union[bool, float, str]] = None,
        method: str = None,
        headers: Headers = None,
        data: Union[Any, bytes, str] = None,
        form: Dict[str, Union[bool, float, str]] = None,
        multipart: Dict[str, Union[bytes, bool, float, str, FilePayload]] = None,
        timeout: float = None,
        failOnStatusCode: bool = None,
        ignoreHTTPSErrors: bool = None,
    ) -> "APIResponse":
        """Send a request over the ``fetch`` channel and wrap the result.

        ``urlOrRequest`` is either a URL string or a ``network.Request``. For a
        ``Request``, its url, method, headers and post data are the defaults
        for whatever is not passed explicitly. At most one of ``data``,
        ``form`` and ``multipart`` may be given.

        Raises:
            Error: if ``data`` is of an unsupported type.
        """
        request = urlOrRequest if isinstance(urlOrRequest, network.Request) else None
        assert request or isinstance(
            urlOrRequest, str
        ), "First argument must be either URL string or Request"
        assert (
            (1 if data else 0) + (1 if form else 0) + (1 if multipart else 0)
        ) <= 1, "Only one of 'data', 'form' or 'multipart' can be specified"
        url = request.url if request else urlOrRequest
        method = method or (request.method if request else "GET")
        # Cannot call allHeaders() here as the request may be paused inside route handler.
        headers_obj = headers or (request.headers if request else None)
        serialized_headers = serialize_headers(headers_obj) if headers_obj else None
        json_data: Any = None
        form_data: Optional[List[NameValue]] = None
        multipart_data: Optional[List[FormField]] = None
        post_data_buffer: Optional[bytes] = None
        # Compare against None rather than truthiness so that falsy but valid
        # payloads (0, False, {}, []) are sent instead of silently dropped.
        if data is not None:
            if isinstance(data, str):
                if is_json_content_type(serialized_headers):
                    json_data = data
                else:
                    post_data_buffer = data.encode()
            elif isinstance(data, bytes):
                post_data_buffer = data
            elif isinstance(data, (dict, list, int, bool)):
                json_data = data
            else:
                raise Error(f"Unsupported 'data' type: {type(data)}")
        elif form:
            form_data = object_to_array(form)
        elif multipart:
            multipart_data = []
            # Convert file-like values to ServerFilePayload structs.
            for name, value in multipart.items():
                if is_file_payload(value):
                    payload = cast(FilePayload, value)
                    assert isinstance(
                        payload["buffer"], bytes
                    ), f"Unexpected buffer type of 'data.{name}'"
                    multipart_data.append(
                        FormField(name=name, file=file_payload_to_json(payload))
                    )
                elif isinstance(value, str):
                    multipart_data.append(FormField(name=name, value=value))
        # With no explicit body, fall back to the original request's post data.
        if (
            post_data_buffer is None
            and json_data is None
            and form_data is None
            and multipart_data is None
        ):
            post_data_buffer = request.post_data_buffer if request else None
        post_data = (
            base64.b64encode(post_data_buffer).decode() if post_data_buffer else None
        )

        def filter_none(values: Dict) -> Dict:
            # Entries whose value is None are left out of the channel payload.
            return {k: v for k, v in values.items() if v is not None}

        response = await self._channel.send(
            "fetch",
            filter_none(
                {
                    "url": url,
                    "params": object_to_array(params),
                    "method": method,
                    "headers": serialized_headers,
                    "postData": post_data,
                    "jsonData": json_data,
                    "formData": form_data,
                    "multipartData": multipart_data,
                    "timeout": timeout,
                    "failOnStatusCode": failOnStatusCode,
                    "ignoreHTTPSErrors": ignoreHTTPSErrors,
                }
            ),
        )
        return APIResponse(self, response)

    async def storage_state(
        self, path: Union[pathlib.Path, str] = None
    ) -> StorageState:
        """Return the ``storageState`` channel result; write it as JSON to ``path`` if given."""
        result = await self._channel.send_return_as_dict("storageState")
        if path:
            await async_writefile(path, json.dumps(result))
        return result


def file_payload_to_json(payload: FilePayload) -> ServerFilePayload:
    """Convert a ``FilePayload`` to a ``ServerFilePayload`` with a base64-encoded buffer."""
    return ServerFilePayload(
        name=payload["name"],
        mimeType=payload["mimeType"],
        buffer=base64.b64encode(payload["buffer"]).decode(),
    )


class APIResponse:
    """Read-only view over a ``fetch`` result; the body is retrieved on demand."""

    def __init__(self, context: APIRequestContext, initializer: Dict) -> None:
        self._loop = context._loop
        self._dispatcher_fiber = context._connection._dispatcher_fiber
        self._request = context
        self._initializer = initializer
        self._headers = network.RawHeaders(initializer["headers"])

    def __repr__(self) -> str:
        return f"<APIResponse url={self.url!r} status={self.status!r} status_text={self.status_text!r}>"

    @property
    def ok(self) -> bool:
        return self.status >= 200 and self.status <= 299

    @property
    def url(self) -> str:
        return self._initializer["url"]

    @property
    def status(self) -> int:
        return self._initializer["status"]

    @property
    def status_text(self) -> str:
        return self._initializer["statusText"]

    @property
    def headers(self) -> Headers:
        return self._headers.headers()

    @property
    def headers_array(self) -> network.HeadersArray:
        return self._headers.headers_array()

    async def body(self) -> bytes:
        """Fetch and base64-decode the response body.

        Raises:
            Error: "Response has been disposed" when no body is returned or
                the channel reports a safe-close error; other errors are
                re-raised unchanged.
        """
        try:
            result = await self._request._channel.send_return_as_dict(
                "fetchResponseBody",
                {
                    "fetchUid": self._fetch_uid,
                },
            )
            if result is None:
                raise Error("Response has been disposed")
            return base64.b64decode(result["binary"])
        except Error as exc:
            if is_safe_close_error(exc):
                raise Error("Response has been disposed")
            raise exc

    async def text(self) -> str:
        content = await self.body()
        return content.decode()

    async def json(self) -> Any:
        content = await self.text()
        return json.loads(content)

    async def dispose(self) -> None:
        await self._request._channel.send(
            "disposeAPIResponse",
            {
                "fetchUid": self._fetch_uid,
            },
        )

    @property
    def _fetch_uid(self) -> str:
        return self._initializer["fetchUid"]

    async def _fetch_log(self) -> List[str]:
        return await self._request._channel.send(
            "fetchLog",
            {
                "fetchUid": self._fetch_uid,
            },
        )


def is_json_content_type(headers: network.HeadersArray = None) -> bool:
    if not headers:
        return False
    for header in headers:
        if header["name"] == "Content-Type":
            return header["value"].startswith("application/json")
    # ... (the listing is truncated at this point on the source page)

Full Screen

Full Screen

_network.py

Source: _network.py (GitHub)

copy

Full Screen

...80 if content_type == "application/x-www-form-urlencoded":81 return dict(parse.parse_qsl(post_data))82 return json.loads(post_data)83 @property84 def post_data_buffer(self) -> Optional[bytes]:85 b64_content = self._initializer.get("postData")86 if not b64_content:87 return None88 return base64.b64decode(b64_content)89 @property90 def headers(self) -> Dict[str, str]:91 return self._headers92 async def response(self) -> Optional["Response"]:93 return from_nullable_channel(await self._channel.send("response"))94 @property95 def frame(self) -> "Frame":96 return from_channel(self._initializer["frame"])97 def is_navigation_request(self) -> bool:98 return self._initializer["isNavigationRequest"]...

Full Screen

Full Screen

main.py

Source:main.py Github

copy

Full Screen

"""Generate an unlabeled dataset for Poseidon.

Launches headless Chromium through Playwright, opens ``LANDING`` and routes
every browser request through an aiohttp session (optionally via a proxy).
Bodies of responses whose URL contains ``/api2/payload`` are written under
``parts/``; the text of the first 'strong' element of frames whose URL
contains ``bframe`` is collected in ``labels.json``.

The ``parts/`` directory and ``document.html`` must already exist in the
working directory; this script does not create them.
"""
from os import device_encoding
import sys
import asyncio
import requests
import time
import aiohttp
import json
import random
import argparse
from threading import Thread
from playwright.async_api import async_playwright

parser = argparse.ArgumentParser(description='Script to generate an unlabeled dataset for Poseidon')
parser.add_argument('--threads', dest='threads', action='store', type=int, default=1, help='How many browsers to run in parallel for collection')
parser.add_argument('--proxy-url', dest='proxy_url', action='store', type=str, required=False, help='The url of the proxy')
parser.add_argument('--proxy-port-min', dest='proxy_port_min', action='store', type=int, required=False, help='The minimum port (if one port then set --proxy-port-max to --proxy-port-min)')
parser.add_argument('--proxy-port-max', dest='proxy_port_max', action='store', type=int, required=False, help='The maximum port')
parser.add_argument('--proxy-user', dest='proxy_user', action='store', type=str, required=False, help='Username of proxy auth (if auth exists)')
parser.add_argument('--proxy-pass', dest='proxy_pass', action='store', type=str, required=False, help='Password of proxy auth (if auth exists)')
args = parser.parse_args()

if args.proxy_url and (args.proxy_port_min is None or args.proxy_port_max is None):
    parser.error("--proxy-url requires --proxy-port-min and --proxy-port-max.")
if args.proxy_url and len([x for x in (args.proxy_user, args.proxy_pass) if x is not None]) == 1:
    parser.error("--proxy-user and --proxy-pass must be given together.")

LANDING = "https://recaptcha-demo.appspot.com/recaptcha-v2-invisible.php"


async def download_image(response):
    """Write the body of ``response`` to a uniquely named ``.jpeg`` in ``parts/``."""
    # Fetch the body before opening the file so that a failed fetch does not
    # leave an empty file behind.
    body = await response.body()
    path = 'parts/' + str(time.time_ns()) + str(random.randint(0, 10000)) + '.jpeg'
    with open(path, 'wb') as f:
        f.write(body)


def _pick_proxy():
    """Return the proxy URL for one upstream request, or None if no proxy is set."""
    # Truthiness (not ``is not None``) matches the argument validation above,
    # which only guarantees the port range when --proxy-url is non-empty.
    if not args.proxy_url:
        return None
    port = random.randint(args.proxy_port_min, args.proxy_port_max)
    if args.proxy_user is not None:
        return 'http://{}:{}@{}:{}'.format(args.proxy_user, args.proxy_pass, args.proxy_url, port)
    return 'http://{}:{}'.format(args.proxy_url, port)


async def capture_route(session, route):
    """Fulfill an intercepted browser request.

    Requests whose URL contains ``LANDING`` are answered with the local
    ``document.html``. Every other request is replayed through ``session``
    (same method, URL, headers and raw post data, optionally via a proxy) and
    the upstream response is handed back to the browser.
    """
    request = route.request
    url = request.url

    if LANDING in url:
        with open('document.html') as doc:
            return await route.fulfill(body=doc.read(), content_type='text/html', status=200, headers={'Access-Control-Allow-Origin': '*'})

    async with session.request(
        method=request.method,
        url=url,
        data=request.post_data_buffer,
        headers=request.headers,
        proxy=_pick_proxy(),
    ) as response:
        content_type = response.headers.get('content-type')

        print(response.status, response.url)
        return await route.fulfill(body=await response.read(), content_type=content_type, status=response.status, headers=response.headers)


async def main_async():
    """Run one browser session until stdin reaches end-of-file."""
    async with aiohttp.ClientSession() as session:
        async with async_playwright() as p:
            # Currently unused: the device descriptor is only referenced by
            # the disabled ``**iphone_11`` option below.
            iphone_11 = p.devices["iPhone 11 Pro"]
            browser = await p.chromium.launch(headless=True)
            context = await browser.new_context(
                # **iphone_11,
                locale="en-US",
                geolocation={"longitude": 12.492507, "latitude": 41.889938},
                permissions=["geolocation"],
            )
            page = await context.new_page()

            async def load_page():
                """Click reload in an existing ``bframe`` frame, else (re)open LANDING and submit."""
                # A loop instead of recursion: repeated navigation failures
                # must not grow the call stack without bound.
                while True:
                    bframes = [frame for frame in page.main_frame.child_frames if 'bframe' in frame.url]
                    if bframes:
                        bframe = bframes[0]
                        print(bframe)
                        el = await bframe.wait_for_selector('.rc-button-reload')
                        await el.click()
                        return
                    try:
                        await page.goto(LANDING)
                        el = await page.wait_for_selector('#submit')
                        await el.click()
                        return
                    except Exception:
                        # Best-effort retry, as in the original script.
                        continue

            async def ensure_image(response):
                """Save payload images; reload after payload or userverify responses."""
                if '/api2/payload' in response.url:
                    asyncio.ensure_future(download_image(response))
                elif '/api2/userverify' not in response.url:
                    return
                await load_page()

            async def check_labels(frame):
                """Record the 'strong' text of a ``bframe`` frame in ``labels.json``."""
                if 'bframe' in frame.url:
                    el = await frame.wait_for_selector('strong')
                    label = await el.text_content()

                    # A missing or unparsable labels file starts a fresh list.
                    try:
                        with open('labels.json') as f:
                            data = json.loads(f.read())
                    except (FileNotFoundError, ValueError):
                        data = []

                    if label not in data:
                        data.append(label)

                    with open('labels.json', 'w') as f:
                        f.write(json.dumps(data))

                    # NOTE(review): the source text lost its indentation; this
                    # call is assumed to belong to the ``bframe`` branch --
                    # confirm against the original script.
                    return await load_page()

            await page.route('**', lambda route: asyncio.ensure_future(capture_route(session, route)))
            page.on('response', lambda response: asyncio.ensure_future(ensure_image(response)))
            page.on('framenavigated', lambda frame: asyncio.ensure_future(check_labels(frame)))
            await load_page()

            print('Press CTRL-D to stop')
            reader = asyncio.StreamReader()
            pipe = sys.stdin
            # Inside a coroutine the running loop is the one to use.
            loop = asyncio.get_running_loop()
            await loop.connect_read_pipe(lambda: asyncio.StreamReaderProtocol(reader), pipe)
            async for line in reader:
                print(f'Got: {line.decode()!r}')


def main():
    """Thread entry point: run ``main_async`` on a fresh event loop."""
    asyncio.run(main_async())


if __name__ == '__main__':
    for _ in range(args.threads):
        Thread(target=main).start()

Full Screen

Full Screen

Playwright tutorial

LambdaTest’s Playwright tutorial will give you a broader idea of the Playwright automation framework, its unique features, and its use cases, with examples to deepen your understanding of Playwright testing. This tutorial provides A-to-Z guidance, from installing the Playwright framework to best practices and advanced concepts.

Chapters:

  1. What is Playwright : Playwright is comparatively new but has gained good popularity. Get to know some history of the Playwright with some interesting facts connected with it.
  2. How To Install Playwright : Learn in detail about what basic configuration and dependencies are required for installing Playwright and run a test. Get a step-by-step direction for installing the Playwright automation framework.
  3. Playwright Futuristic Features: Launched in 2020, Playwright quickly gained huge popularity because of useful features such as the Playwright Test Generator and Inspector, the Playwright Reporter, and the Playwright auto-waiting mechanism. Read up on those features to master Playwright testing.
  4. What is Component Testing: Component testing in Playwright is a unique feature that allows a tester to test a single component of a web application without integrating them with other elements. Learn how to perform Component testing on the Playwright automation framework.
  5. Inputs And Buttons In Playwright: Every website has Input boxes and buttons; learn about testing inputs and buttons with different scenarios and examples.
  6. Functions and Selectors in Playwright: Learn how to launch the Chromium browser with Playwright. Also, gain a better understanding of some important functions like “BrowserContext,” which allows you to run multiple browser sessions, and “newPage” which interacts with a page.
  7. Handling Alerts and Dropdowns in Playwright : Playwright interacts with different types of alerts and pop-ups, such as simple, confirmation, and prompt, and with different types of dropdowns, such as single-select and multi-select. Get hands-on with handling alerts and dropdowns in Playwright testing.
  8. Playwright vs Puppeteer: Get to know the differences between the two testing frameworks: how they differ from one another, which browsers they support, and what features they provide.
  9. Run Playwright Tests on LambdaTest: Playwright testing with LambdaTest leverages test performance to the utmost. You can run multiple Playwright tests in parallel with the LambdaTest test cloud. Get a step-by-step guide to running your Playwright tests on the LambdaTest platform.
  10. Playwright Python Tutorial: The Playwright automation framework supports all major languages, such as Python, JavaScript, TypeScript, and .NET. However, there are various advantages to Python end-to-end testing with Playwright because of its versatility. Get the hang of Playwright Python testing with this chapter.
  11. Playwright End To End Testing Tutorial: Get hands-on with Playwright end-to-end testing and learn to use some exciting features such as TraceViewer, Debugging, Networking, Component testing, Visual testing, and many more.
  12. Playwright Video Tutorial: Watch the video tutorials on Playwright testing from experts and get a step-by-step, in-depth explanation of Playwright automation testing.

Run Playwright Python automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful