How to use the is_navigation_request method in Playwright Python

Best Python code snippets using playwright-python

test_interception.py

Source: test_interception.py (GitHub)

copy

Full Screen

...22 assert "empty.html" in request.url23 assert request.headers["user-agent"]24 assert request.method == "GET"25 assert request.post_data is None26 assert request.is_navigation_request()27 assert request.resource_type == "document"28 assert request.frame == page.main_frame29 assert request.frame.url == "about:blank"30 await route.continue_()31 intercepted.append(True)32 await page.route("**/empty.html", handle_request)33 response = await page.goto(server.EMPTY_PAGE)34 assert response.ok35 assert len(intercepted) == 136async def test_page_route_should_unroute(page: Page, server):37 intercepted = []38 def handler1(route):39 intercepted.append(1)40 asyncio.create_task(route.continue_())41 await page.route("**/empty.html", handler1)42 await page.route(43 "**/empty.html",44 lambda route: (45 intercepted.append(2), # type: ignore46 asyncio.create_task(route.continue_()),47 ),48 )49 await page.route(50 "**/empty.html",51 lambda route: (52 intercepted.append(3), # type: ignore53 asyncio.create_task(route.continue_()),54 ),55 )56 await page.route(57 "**/*",58 lambda route: (59 intercepted.append(4), # type: ignore60 asyncio.create_task(route.continue_()),61 ),62 )63 await page.goto(server.EMPTY_PAGE)64 assert intercepted == [1]65 intercepted = []66 await page.unroute("**/empty.html", handler1)67 await page.goto(server.EMPTY_PAGE)68 assert intercepted == [2]69 intercepted = []70 await page.unroute("**/empty.html")71 await page.goto(server.EMPTY_PAGE)72 assert intercepted == [4]73async def test_page_route_should_work_when_POST_is_redirected_with_302(page, server):74 server.set_redirect("/rredirect", "/empty.html")75 await page.goto(server.EMPTY_PAGE)76 await page.route("**/*", lambda route: route.continue_())77 await page.set_content(78 """79 <form action='/rredirect' method='post'>80 <input type="hidden" id="foo" name="foo" value="FOOBAR">81 </form>82 """83 )84 async with page.expect_navigation():85 await page.eval_on_selector("form", "form => form.submit()"),86# @see 
https://github.com/GoogleChrome/puppeteer/issues/397387async def test_page_route_should_work_when_header_manipulation_headers_with_redirect(88 page, server89):90 server.set_redirect("/rrredirect", "/empty.html")91 await page.route(92 "**/*",93 lambda route: route.continue_(headers={**route.request.headers, "foo": "bar"}),94 )95 await page.goto(server.PREFIX + "/rrredirect")96# @see https://github.com/GoogleChrome/puppeteer/issues/474397async def test_page_route_should_be_able_to_remove_headers(page, server):98 async def handle_request(route):99 headers = route.request.headers100 if "origin" in headers:101 del headers["origin"]102 await route.continue_(headers=headers)103 await page.route(104 "**/*", # remove "origin" header105 handle_request,106 )107 [serverRequest, _] = await asyncio.gather(108 server.wait_for_request("/empty.html"), page.goto(server.PREFIX + "/empty.html")109 )110 assert serverRequest.getHeader("origin") is None111async def test_page_route_should_contain_referer_header(page, server):112 requests = []113 await page.route(114 "**/*",115 lambda route: (116 requests.append(route.request),117 asyncio.create_task(route.continue_()),118 ),119 )120 await page.goto(server.PREFIX + "/one-style.html")121 assert "/one-style.css" in requests[1].url122 assert "/one-style.html" in requests[1].headers["referer"]123async def test_page_route_should_properly_return_navigation_response_when_URL_has_cookies(124 context, page, server125):126 # Setup cookie.127 await page.goto(server.EMPTY_PAGE)128 await context.add_cookies(129 [{"url": server.EMPTY_PAGE, "name": "foo", "value": "bar"}]130 )131 # Setup request interception.132 await page.route("**/*", lambda route: route.continue_())133 response = await page.reload()134 assert response.status == 200135async def test_page_route_should_show_custom_HTTP_headers(page, server):136 await page.set_extra_http_headers({"foo": "bar"})137 def assert_headers(request):138 assert request.headers["foo"] == "bar"139 await 
page.route(140 "**/*",141 lambda route: (142 assert_headers(route.request),143 asyncio.create_task(route.continue_()),144 ),145 )146 response = await page.goto(server.EMPTY_PAGE)147 assert response.ok148# @see https://github.com/GoogleChrome/puppeteer/issues/4337149async def test_page_route_should_work_with_redirect_inside_sync_XHR(page, server):150 await page.goto(server.EMPTY_PAGE)151 server.set_redirect("/logo.png", "/pptr.png")152 await page.route("**/*", lambda route: route.continue_())153 status = await page.evaluate(154 """async() => {155 const request = new XMLHttpRequest();156 request.open('GET', '/logo.png', false); // `false` makes the request synchronous157 request.send(null);158 return request.status;159 }"""160 )161 assert status == 200162async def test_page_route_should_work_with_custom_referer_headers(page, server):163 await page.set_extra_http_headers({"referer": server.EMPTY_PAGE})164 def assert_headers(route):165 assert route.request.headers["referer"] == server.EMPTY_PAGE166 await page.route(167 "**/*",168 lambda route: (169 assert_headers(route),170 asyncio.create_task(route.continue_()),171 ),172 )173 response = await page.goto(server.EMPTY_PAGE)174 assert response.ok175async def test_page_route_should_be_abortable(page, server):176 await page.route(r"/\.css$/", lambda route: asyncio.create_task(route.abort()))177 failed = []178 def handle_request(request):179 if request.url.includes(".css"):180 failed.append(True)181 page.on("requestfailed", handle_request)182 response = await page.goto(server.PREFIX + "/one-style.html")183 assert response.ok184 assert response.request.failure is None185 assert len(failed) == 0186async def test_page_route_should_be_abortable_with_custom_error_codes(187 page: Page, server, is_webkit, is_firefox188):189 await page.route(190 "**/*",191 lambda route: route.abort("internetdisconnected"),192 )193 failed_requests = []194 page.on("requestfailed", lambda request: failed_requests.append(request))195 with 
pytest.raises(Error):196 await page.goto(server.EMPTY_PAGE)197 assert len(failed_requests) == 1198 failed_request = failed_requests[0]199 if is_webkit:200 assert failed_request.failure == "Request intercepted"201 elif is_firefox:202 assert failed_request.failure == "NS_ERROR_OFFLINE"203 else:204 assert failed_request.failure == "net::ERR_INTERNET_DISCONNECTED"205async def test_page_route_should_send_referer(page, server):206 await page.set_extra_http_headers({"referer": "http://google.com/"})207 await page.route("**/*", lambda route: route.continue_())208 [request, _] = await asyncio.gather(209 server.wait_for_request("/grid.html"),210 page.goto(server.PREFIX + "/grid.html"),211 )212 assert request.getHeader("referer") == "http://google.com/"213async def test_page_route_should_fail_navigation_when_aborting_main_resource(214 page, server, is_webkit, is_firefox215):216 await page.route("**/*", lambda route: route.abort())217 with pytest.raises(Error) as exc:218 await page.goto(server.EMPTY_PAGE)219 assert exc220 if is_webkit:221 assert "Request intercepted" in exc.value.message222 elif is_firefox:223 assert "NS_ERROR_FAILURE" in exc.value.message224 else:225 assert "net::ERR_FAILED" in exc.value.message226async def test_page_route_should_not_work_with_redirects(page, server):227 intercepted = []228 await page.route(229 "**/*",230 lambda route: (231 asyncio.create_task(route.continue_()),232 intercepted.append(route.request),233 ),234 )235 server.set_redirect("/non-existing-page.html", "/non-existing-page-2.html")236 server.set_redirect("/non-existing-page-2.html", "/non-existing-page-3.html")237 server.set_redirect("/non-existing-page-3.html", "/non-existing-page-4.html")238 server.set_redirect("/non-existing-page-4.html", "/empty.html")239 response = await page.goto(server.PREFIX + "/non-existing-page.html")240 assert response.status == 200241 assert "empty.html" in response.url242 assert len(intercepted) == 1243 assert intercepted[0].resource_type == "document"244 
assert intercepted[0].is_navigation_request()245 assert "/non-existing-page.html" in intercepted[0].url246 chain = []247 r = response.request248 while r:249 chain.append(r)250 assert r.is_navigation_request()251 r = r.redirected_from252 assert len(chain) == 5253 assert "/empty.html" in chain[0].url254 assert "/non-existing-page-4.html" in chain[1].url255 assert "/non-existing-page-3.html" in chain[2].url256 assert "/non-existing-page-2.html" in chain[3].url257 assert "/non-existing-page.html" in chain[4].url258 for idx, _ in enumerate(chain):259 assert chain[idx].redirected_to == (chain[idx - 1] if idx > 0 else None)260async def test_page_route_should_work_with_redirects_for_subresources(page, server):261 intercepted = []262 await page.route(263 "**/*",264 lambda route: (265 asyncio.create_task(route.continue_()),266 intercepted.append(route.request),267 ),268 )269 server.set_redirect("/one-style.css", "/two-style.css")270 server.set_redirect("/two-style.css", "/three-style.css")271 server.set_redirect("/three-style.css", "/four-style.css")272 server.set_route(273 "/four-style.css",274 lambda req: (req.write(b"body {box-sizing: border-box; }"), req.finish()),275 )276 response = await page.goto(server.PREFIX + "/one-style.html")277 assert response.status == 200278 assert "one-style.html" in response.url279 assert len(intercepted) == 2280 assert intercepted[0].resource_type == "document"281 assert "one-style.html" in intercepted[0].url282 r = intercepted[1]283 for url in [284 "/one-style.css",285 "/two-style.css",286 "/three-style.css",287 "/four-style.css",288 ]:289 assert r.resource_type == "stylesheet"290 assert url in r.url291 r = r.redirected_to292 assert r is None293async def test_page_route_should_work_with_equal_requests(page, server):294 await page.goto(server.EMPTY_PAGE)295 hits = [True]296 def handle_request(request, hits):297 request.write(str(len(hits) * 11).encode())298 request.finish()299 hits.append(True)300 server.set_route("/zzz", lambda r: 
handle_request(r, hits))301 spinner = []302 async def handle_route(route):303 if len(spinner) == 1:304 await route.abort()305 spinner.pop(0)306 else:307 await route.continue_()308 spinner.append(True)309 # Cancel 2nd request.310 await page.route("**/*", handle_route)311 results = []312 for idx in range(3):313 results.append(314 await page.evaluate(315 """() => fetch('/zzz').then(response => response.text()).catch(e => 'FAILED')"""316 )317 )318 assert results == ["11", "FAILED", "22"]319async def test_page_route_should_navigate_to_dataURL_and_not_fire_dataURL_requests(320 page, server321):322 requests = []323 await page.route(324 "**/*",325 lambda route: (326 requests.append(route.request),327 asyncio.create_task(route.continue_()),328 ),329 )330 data_URL = "data:text/html,<div>yo</div>"331 response = await page.goto(data_URL)332 assert response is None333 assert len(requests) == 0334async def test_page_route_should_be_able_to_fetch_dataURL_and_not_fire_dataURL_requests(335 page, server336):337 await page.goto(server.EMPTY_PAGE)338 requests = []339 await page.route(340 "**/*",341 lambda route: (342 requests.append(route.request),343 asyncio.create_task(route.continue_()),344 ),345 )346 data_URL = "data:text/html,<div>yo</div>"347 text = await page.evaluate("url => fetch(url).then(r => r.text())", data_URL)348 assert text == "<div>yo</div>"349 assert len(requests) == 0350async def test_page_route_should_navigate_to_URL_with_hash_and_and_fire_requests_without_hash(351 page, server352):353 requests = []354 await page.route(355 "**/*",356 lambda route: (357 requests.append(route.request),358 asyncio.create_task(route.continue_()),359 ),360 )361 response = await page.goto(server.EMPTY_PAGE + "#hash")362 assert response.status == 200363 assert response.url == server.EMPTY_PAGE364 assert len(requests) == 1365 assert requests[0].url == server.EMPTY_PAGE366async def test_page_route_should_work_with_encoded_server(page, server):367 # The requestWillBeSent will report encoded 
URL, whereas interception will368 # report URL as-is. @see crbug.com/759388369 await page.route("**/*", lambda route: route.continue_())370 response = await page.goto(server.PREFIX + "/some nonexisting page")371 assert response.status == 404372async def test_page_route_should_work_with_encoded_server___2(page, server):373 # The requestWillBeSent will report URL as-is, whereas interception will374 # report encoded URL for stylesheet. @see crbug.com/759388375 requests = []376 await page.route(377 "**/*",378 lambda route: (379 asyncio.create_task(route.continue_()),380 requests.append(route.request),381 ),382 )383 response = await page.goto(384 f"""data:text/html,<link rel="stylesheet" href="{server.PREFIX}/fonts?helvetica|arial"/>"""385 )386 assert response is None387 assert len(requests) == 1388 assert (await requests[0].response()).status == 404389async def test_page_route_should_not_throw_Invalid_Interception_Id_if_the_request_was_cancelled(390 page, server391):392 await page.set_content("<iframe></iframe>")393 route_future = asyncio.Future()394 await page.route("**/*", lambda r, _: route_future.set_result(r))395 async with page.expect_request("**/*"):396 await page.eval_on_selector(397 "iframe", """(frame, url) => frame.src = url""", server.EMPTY_PAGE398 )399 # Delete frame to cause request to be canceled.400 await page.eval_on_selector("iframe", "frame => frame.remove()")401 route = await route_future402 await route.continue_()403async def test_page_route_should_intercept_main_resource_during_cross_process_navigation(404 page, server405):406 await page.goto(server.EMPTY_PAGE)407 intercepted = []408 await page.route(409 server.CROSS_PROCESS_PREFIX + "/empty.html",410 lambda route: (411 intercepted.append(True),412 asyncio.create_task(route.continue_()),413 ),414 )415 response = await page.goto(server.CROSS_PROCESS_PREFIX + "/empty.html")416 assert response.ok417 assert len(intercepted) == 1418@pytest.mark.skip_browser("webkit")419async def 
test_page_route_should_create_a_redirect(page, server):420 await page.goto(server.PREFIX + "/empty.html")421 async def handle_route(route, request):422 if request.url != (server.PREFIX + "/redirect_this"):423 return await route.continue_()424 await route.fulfill(status=301, headers={"location": "/empty.html"})425 await page.route(426 "**/*",427 handle_route,428 )429 text = await page.evaluate(430 """async url => {431 const data = await fetch(url);432 return data.text();433 }""",434 server.PREFIX + "/redirect_this",435 )436 assert text == ""437async def test_page_route_should_support_cors_with_GET(page, server):438 await page.goto(server.EMPTY_PAGE)439 async def handle_route(route, request):440 headers = (441 {"access-control-allow-origin": "*"}442 if request.url.endswith("allow")443 else {}444 )445 await route.fulfill(446 content_type="application/json",447 headers=headers,448 status=200,449 body=json.dumps(["electric", "gas"]),450 )451 await page.route(452 "**/cars*",453 handle_route,454 )455 # Should succeed456 resp = await page.evaluate(457 """async () => {458 const response = await fetch('https://example.com/cars?allow', { mode: 'cors' });459 return response.json();460 }"""461 )462 assert resp == ["electric", "gas"]463 # Should be rejected464 with pytest.raises(Error) as exc:465 await page.evaluate(466 """async () => {467 const response = await fetch('https://example.com/cars?reject', { mode: 'cors' });468 return response.json();469 }"""470 )471 assert "failed" in exc.value.message472async def test_page_route_should_support_cors_with_POST(page, server):473 await page.goto(server.EMPTY_PAGE)474 await page.route(475 "**/cars",476 lambda route: route.fulfill(477 content_type="application/json",478 headers={"Access-Control-Allow-Origin": "*"},479 status=200,480 body=json.dumps(["electric", "gas"]),481 ),482 )483 resp = await page.evaluate(484 """async () => {485 const response = await fetch('https://example.com/cars', {486 method: 'POST',487 headers: { 
'Content-Type': 'application/json' },488 mode: 'cors',489 body: JSON.stringify({ 'number': 1 })490 });491 return response.json();492 }"""493 )494 assert resp == ["electric", "gas"]495async def test_page_route_should_support_cors_for_different_methods(page, server):496 await page.goto(server.EMPTY_PAGE)497 await page.route(498 "**/cars",499 lambda route, request: route.fulfill(500 content_type="application/json",501 headers={"Access-Control-Allow-Origin": "*"},502 status=200,503 body=json.dumps([request.method, "electric", "gas"]),504 ),505 )506 # First POST507 resp = await page.evaluate(508 """async () => {509 const response = await fetch('https://example.com/cars', {510 method: 'POST',511 headers: { 'Content-Type': 'application/json' },512 mode: 'cors',513 body: JSON.stringify({ 'number': 1 })514 });515 return response.json();516 }"""517 )518 assert resp == ["POST", "electric", "gas"]519 # Then DELETE520 resp = await page.evaluate(521 """async () => {522 const response = await fetch('https://example.com/cars', {523 method: 'DELETE',524 headers: {},525 mode: 'cors',526 body: ''527 });528 return response.json();529 }"""530 )531 assert resp == ["DELETE", "electric", "gas"]532async def test_request_fulfill_should_work_a(page, server):533 await page.route(534 "**/*",535 lambda route: route.fulfill(536 status=201,537 headers={"foo": "bar"},538 content_type="text/html",539 body="Yo, page!",540 ),541 )542 response = await page.goto(server.EMPTY_PAGE)543 assert response.status == 201544 assert response.headers["foo"] == "bar"545 assert await page.evaluate("() => document.body.textContent") == "Yo, page!"546async def test_request_fulfill_should_work_with_status_code_422(page, server):547 await page.route(548 "**/*",549 lambda route: route.fulfill(status=422, body="Yo, page!"),550 )551 response = await page.goto(server.EMPTY_PAGE)552 assert response.status == 422553 assert response.status_text == "Unprocessable Entity"554 assert await page.evaluate("() => 
document.body.textContent") == "Yo, page!"555async def test_request_fulfill_should_allow_mocking_binary_responses(556 page: Page, server, assert_to_be_golden, assetdir557):558 await page.route(559 "**/*",560 lambda route: route.fulfill(561 content_type="image/png",562 body=(assetdir / "pptr.png").read_bytes(),563 ),564 )565 await page.evaluate(566 """PREFIX => {567 const img = document.createElement('img');568 img.src = PREFIX + '/does-not-exist.png';569 document.body.appendChild(img);570 return new Promise(fulfill => img.onload = fulfill);571 }""",572 server.PREFIX,573 )574 img = await page.query_selector("img")575 assert img576 assert_to_be_golden(await img.screenshot(), "mock-binary-response.png")577async def test_request_fulfill_should_allow_mocking_svg_with_charset(578 page, server, assert_to_be_golden579):580 await page.route(581 "**/*",582 lambda route: route.fulfill(583 content_type="image/svg+xml ; charset=utf-8",584 body='<svg width="50" height="50" version="1.1" xmlns="http://www.w3.org/2000/svg"><rect x="10" y="10" width="30" height="30" stroke="black" fill="transparent" stroke-width="5"/></svg>',585 ),586 )587 await page.evaluate(588 """PREFIX => {589 const img = document.createElement('img');590 img.src = PREFIX + '/does-not-exist.svg';591 document.body.appendChild(img);592 return new Promise((f, r) => { img.onload = f; img.onerror = r; });593 }""",594 server.PREFIX,595 )596 img = await page.query_selector("img")597 assert_to_be_golden(await img.screenshot(), "mock-svg.png")598async def test_request_fulfill_should_work_with_file_path(599 page: Page, server, assert_to_be_golden, assetdir600):601 await page.route(602 "**/*",603 lambda route: route.fulfill(604 content_type="shouldBeIgnored", path=assetdir / "pptr.png"605 ),606 )607 await page.evaluate(608 """PREFIX => {609 const img = document.createElement('img');610 img.src = PREFIX + '/does-not-exist.png';611 document.body.appendChild(img);612 return new Promise(fulfill => img.onload = fulfill);613 
}""",614 server.PREFIX,615 )616 img = await page.query_selector("img")617 assert img618 assert_to_be_golden(await img.screenshot(), "mock-binary-response.png")619async def test_request_fulfill_should_stringify_intercepted_request_response_headers(620 page, server621):622 await page.route(623 "**/*",624 lambda route: route.fulfill(625 status=200, headers={"foo": True}, body="Yo, page!"626 ),627 )628 response = await page.goto(server.EMPTY_PAGE)629 assert response.status == 200630 headers = response.headers631 assert headers["foo"] == "True"632 assert await page.evaluate("() => document.body.textContent") == "Yo, page!"633async def test_request_fulfill_should_not_modify_the_headers_sent_to_the_server(634 page, server635):636 await page.goto(server.PREFIX + "/empty.html")637 interceptedRequests = []638 # this is just to enable request interception, which disables caching in chromium639 await page.route(server.PREFIX + "/unused", lambda route, req: None)640 server.set_route(641 "/something",642 lambda response: (643 interceptedRequests.append(response),644 response.setHeader("Access-Control-Allow-Origin", "*"),645 response.write(b"done"),646 response.finish(),647 ),648 )649 text = await page.evaluate(650 """async url => {651 const data = await fetch(url);652 return data.text();653 }""",654 server.CROSS_PROCESS_PREFIX + "/something",655 )656 assert text == "done"657 playwrightRequest = asyncio.Future()658 await page.route(659 server.CROSS_PROCESS_PREFIX + "/something",660 lambda route, request: (661 playwrightRequest.set_result(request),662 asyncio.create_task(route.continue_(headers={**request.headers})),663 ),664 )665 textAfterRoute = await page.evaluate(666 """async url => {667 const data = await fetch(url);668 return data.text();669 }""",670 server.CROSS_PROCESS_PREFIX + "/something",671 )672 assert textAfterRoute == "done"673 assert len(interceptedRequests) == 2674 assert (675 interceptedRequests[0].requestHeaders == interceptedRequests[1].requestHeaders676 
)677async def test_request_fulfill_should_include_the_origin_header(page, server):678 await page.goto(server.PREFIX + "/empty.html")679 interceptedRequest = []680 await page.route(681 server.CROSS_PROCESS_PREFIX + "/something",682 lambda route, request: (683 interceptedRequest.append(request),684 asyncio.create_task(685 route.fulfill(686 headers={"Access-Control-Allow-Origin": "*"},687 content_type="text/plain",688 body="done",689 )690 ),691 ),692 )693 text = await page.evaluate(694 """async url => {695 const data = await fetch(url);696 return data.text();697 }""",698 server.CROSS_PROCESS_PREFIX + "/something",699 )700 assert text == "done"701 assert len(interceptedRequest) == 1702 assert interceptedRequest[0].headers["origin"] == server.PREFIX703async def test_request_fulfill_should_work_with_request_interception(page, server):704 requests = {}705 async def _handle_route(route: Route):706 requests[route.request.url.split("/").pop()] = route.request707 await route.continue_()708 await page.route("**/*", _handle_route)709 server.set_redirect("/rrredirect", "/frames/one-frame.html")710 await page.goto(server.PREFIX + "/rrredirect")711 assert requests["rrredirect"].is_navigation_request()712 assert requests["frame.html"].is_navigation_request()713 assert requests["script.js"].is_navigation_request() is False714 assert requests["style.css"].is_navigation_request() is False715async def test_Interception_should_work_with_request_interception(716 browser: Browser, https_server717):718 context = await browser.new_context(ignore_https_errors=True)719 page = await context.new_page()720 await page.route("**/*", lambda route: asyncio.ensure_future(route.continue_()))721 response = await page.goto(https_server.EMPTY_PAGE)722 assert response723 assert response.status == 200724 await context.close()725async def test_ignore_http_errors_service_worker_should_intercept_after_a_service_worker(726 page, server727):728 await page.goto(server.PREFIX + 
"/serviceworkers/fetchdummy/sw.html")...

Full Screen

Full Screen

test_network.py

Source: test_network.py (GitHub)

copy

Full Screen

...23 assert "empty.html" in request.url24 assert request.headers["user-agent"]25 assert request.method == "GET"26 assert request.post_data is None27 assert request.is_navigation_request()28 assert request.resource_type == "document"29 assert request.frame == page.main_frame30 assert request.frame.url == "about:blank"31 await route.fulfill(body="Text")32 await page.route(33 "**/empty.html",34 lambda route, request: asyncio.create_task(handle_request(route, request)),35 )36 response = await page.goto(server.EMPTY_PAGE)37 assert response.ok38 assert await response.text() == "Text"39async def test_request_continue(page, server):40 async def handle_request(route, request, intercepted):41 intercepted.append(True)42 await route.continue_()43 intercepted = list()44 await page.route(45 "**/*",46 lambda route, request: asyncio.create_task(47 handle_request(route, request, intercepted)48 ),49 )50 response = await page.goto(server.EMPTY_PAGE)51 assert response.ok52 assert intercepted == [True]53 assert await page.title() == ""54async def test_page_events_request_should_fire_for_navigation_requests(55 page: Page, server56):57 requests = []58 page.on("request", lambda r: requests.append(r))59 await page.goto(server.EMPTY_PAGE)60 assert len(requests) == 161async def test_page_events_request_should_fire_for_iframes(page, server, utils):62 requests = []63 page.on("request", lambda r: requests.append(r))64 await page.goto(server.EMPTY_PAGE)65 await utils.attach_frame(page, "frame1", server.EMPTY_PAGE)66 assert len(requests) == 267async def test_page_events_request_should_fire_for_fetches(page, server):68 requests = []69 page.on("request", lambda r: requests.append(r))70 await page.goto(server.EMPTY_PAGE)71 await page.evaluate('() => fetch("/empty.html")')72 assert len(requests) == 273async def test_page_events_request_should_report_requests_and_responses_handled_by_service_worker(74 page: Page, server75):76 await page.goto(server.PREFIX + "/serviceworkers/fetchdummy/sw.html")77 
await page.evaluate("() => window.activationPromise")78 sw_response = None79 async with page.expect_request("**/*") as request_info:80 sw_response = await page.evaluate('() => fetchDummy("foo")')81 request = await request_info.value82 assert sw_response == "responseFromServiceWorker:foo"83 assert request.url == server.PREFIX + "/serviceworkers/fetchdummy/foo"84 response = await request.response()85 assert response.url == server.PREFIX + "/serviceworkers/fetchdummy/foo"86 assert await response.text() == "responseFromServiceWorker:foo"87async def test_request_frame_should_work_for_main_frame_navigation_request(88 page, server89):90 requests = []91 page.on("request", lambda r: requests.append(r))92 await page.goto(server.EMPTY_PAGE)93 assert len(requests) == 194 assert requests[0].frame == page.main_frame95async def test_request_frame_should_work_for_subframe_navigation_request(96 page, server, utils97):98 await page.goto(server.EMPTY_PAGE)99 requests = []100 page.on("request", lambda r: requests.append(r))101 await utils.attach_frame(page, "frame1", server.EMPTY_PAGE)102 assert len(requests) == 1103 assert requests[0].frame == page.frames[1]104async def test_request_frame_should_work_for_fetch_requests(page, server):105 await page.goto(server.EMPTY_PAGE)106 requests: List[Request] = []107 page.on("request", lambda r: requests.append(r))108 await page.evaluate('() => fetch("/digits/1.png")')109 requests = [r for r in requests if "favicon" not in r.url]110 assert len(requests) == 1111 assert requests[0].frame == page.main_frame112async def test_request_headers_should_work(113 page, server, is_chromium, is_firefox, is_webkit114):115 response = await page.goto(server.EMPTY_PAGE)116 if is_chromium:117 assert "Chrome" in response.request.headers["user-agent"]118 elif is_firefox:119 assert "Firefox" in response.request.headers["user-agent"]120 elif is_webkit:121 assert "WebKit" in response.request.headers["user-agent"]122async def 
test_request_headers_should_get_the_same_headers_as_the_server(123 page: Page, server, is_webkit, is_win124):125 server_request_headers_future: Future[Dict[str, str]] = asyncio.Future()126 def handle(request):127 normalized_headers = {128 key.decode().lower(): value[0].decode()129 for key, value in request.requestHeaders.getAllRawHeaders()130 }131 server_request_headers_future.set_result(normalized_headers)132 request.write(b"done")133 request.finish()134 server.set_route("/empty.html", handle)135 response = await page.goto(server.EMPTY_PAGE)136 server_headers = await server_request_headers_future137 if is_webkit and is_win:138 # Curl does not show accept-encoding and accept-language139 server_headers.pop("accept-encoding")140 server_headers.pop("accept-language")141 assert cast(Response, response).request.headers == server_headers142async def test_request_headers_should_get_the_same_headers_as_the_server_cors(143 page: Page, server, is_webkit, is_win144):145 await page.goto(server.PREFIX + "/empty.html")146 server_request_headers_future: Future[Dict[str, str]] = asyncio.Future()147 def handle_something(request):148 normalized_headers = {149 key.decode().lower(): value[0].decode()150 for key, value in request.requestHeaders.getAllRawHeaders()151 }152 server_request_headers_future.set_result(normalized_headers)153 request.setHeader("Access-Control-Allow-Origin", "*")154 request.write(b"done")155 request.finish()156 server.set_route("/something", handle_something)157 text = None158 async with page.expect_request("**/*") as request_info:159 text = await page.evaluate(160 """async url => {161 const data = await fetch(url);162 return data.text();163 }""",164 server.CROSS_PROCESS_PREFIX + "/something",165 )166 request = await request_info.value167 assert text == "done"168 server_headers = await server_request_headers_future169 if is_webkit and is_win:170 # Curl does not show accept-encoding and accept-language171 server_headers.pop("accept-encoding")172 
server_headers.pop("accept-language")173 assert request.headers == server_headers174async def test_response_headers_should_work(page, server):175 server.set_route("/empty.html", lambda r: (r.setHeader("foo", "bar"), r.finish()))176 response = await page.goto(server.EMPTY_PAGE)177 assert response.headers["foo"] == "bar"178async def test_request_post_data_should_work(page, server):179 await page.goto(server.EMPTY_PAGE)180 server.set_route("/post", lambda r: r.finish())181 requests = []182 page.on("request", lambda r: requests.append(r))183 await page.evaluate(184 '() => fetch("./post", { method: "POST", body: JSON.stringify({foo: "bar"})})'185 )186 assert len(requests) == 1187 assert requests[0].post_data == '{"foo":"bar"}'188async def test_request_post_data__should_be_undefined_when_there_is_no_post_data(189 page, server190):191 response = await page.goto(server.EMPTY_PAGE)192 assert response.request.post_data is None193async def test_should_parse_the_json_post_data(page, server):194 await page.goto(server.EMPTY_PAGE)195 server.set_route("/post", lambda req: req.finish())196 requests = []197 page.on("request", lambda r: requests.append(r))198 await page.evaluate(199 """() => fetch('./post', { method: 'POST', body: JSON.stringify({ foo: 'bar' }) })"""200 )201 assert len(requests) == 1202 assert requests[0].post_data_json == {"foo": "bar"}203async def test_should_parse_the_data_if_content_type_is_form_urlencoded(page, server):204 await page.goto(server.EMPTY_PAGE)205 server.set_route("/post", lambda req: req.finish())206 requests = []207 page.on("request", lambda r: requests.append(r))208 await page.set_content(209 """<form method='POST' action='/post'><input type='text' name='foo' value='bar'><input type='number' name='baz' value='123'><input type='submit'></form>"""210 )211 await page.click("input[type=submit]")212 assert len(requests) == 1213 assert requests[0].post_data_json == {"foo": "bar", "baz": "123"}214async def 
test_should_be_undefined_when_there_is_no_post_data(page, server):215 response = await page.goto(server.EMPTY_PAGE)216 assert response.request.post_data_json is None217async def test_should_work_with_binary_post_data(page, server):218 await page.goto(server.EMPTY_PAGE)219 server.set_route("/post", lambda req: req.finish())220 requests = []221 page.on("request", lambda r: requests.append(r))222 await page.evaluate(223 """async () => {224 await fetch('./post', { method: 'POST', body: new Uint8Array(Array.from(Array(256).keys())) })225 }"""226 )227 assert len(requests) == 1228 buffer = requests[0].post_data_buffer229 assert len(buffer) == 256230 for i in range(256):231 assert buffer[i] == i232async def test_should_work_with_binary_post_data_and_interception(page, server):233 await page.goto(server.EMPTY_PAGE)234 server.set_route("/post", lambda req: req.finish())235 requests = []236 await page.route("/post", lambda route: asyncio.ensure_future(route.continue_()))237 page.on("request", lambda r: requests.append(r))238 await page.evaluate(239 """async () => {240 await fetch('./post', { method: 'POST', body: new Uint8Array(Array.from(Array(256).keys())) })241 }"""242 )243 assert len(requests) == 1244 buffer = requests[0].post_data_buffer245 assert len(buffer) == 256246 for i in range(256):247 assert buffer[i] == i248async def test_response_text_should_work(page, server):249 response = await page.goto(server.PREFIX + "/simple.json")250 assert await response.text() == '{"foo": "bar"}\n'251async def test_response_text_should_return_uncompressed_text(page, server):252 server.enable_gzip("/simple.json")253 response = await page.goto(server.PREFIX + "/simple.json")254 assert response.headers["content-encoding"] == "gzip"255 assert await response.text() == '{"foo": "bar"}\n'256async def test_response_text_should_throw_when_requesting_body_of_redirected_response(257 page, server258):259 server.set_redirect("/foo.html", "/empty.html")260 response = await page.goto(server.PREFIX + 
"/foo.html")261 redirected_from = response.request.redirected_from262 assert redirected_from263 redirected = await redirected_from.response()264 assert redirected.status == 302265 error = None266 try:267 await redirected.text()268 except Error as exc:269 error = exc270 assert "Response body is unavailable for redirect responses" in error.message271async def test_response_json_should_work(page, server):272 response = await page.goto(server.PREFIX + "/simple.json")273 assert await response.json() == {"foo": "bar"}274async def test_response_body_should_work(page, server, assetdir):275 response = await page.goto(server.PREFIX + "/pptr.png")276 with open(277 assetdir / "pptr.png",278 "rb",279 ) as fd:280 assert fd.read() == await response.body()281async def test_response_body_should_work_with_compression(page, server, assetdir):282 server.enable_gzip("/pptr.png")283 response = await page.goto(server.PREFIX + "/pptr.png")284 with open(285 assetdir / "pptr.png",286 "rb",287 ) as fd:288 assert fd.read() == await response.body()289async def test_response_status_text_should_work(page, server):290 server.set_route("/cool", lambda r: (r.setResponseCode(200, b"cool!"), r.finish()))291 response = await page.goto(server.PREFIX + "/cool")292 assert response.status_text == "cool!"293async def test_request_resource_type_should_return_event_source(page, server):294 SSE_MESSAGE = {"foo": "bar"}295 # 1. Setup server-sent events on server that immediately sends a message to the client.296 server.set_route(297 "/sse",298 lambda r: (299 r.setHeader("Content-Type", "text/event-stream"),300 r.setHeader("Connection", "keep-alive"),301 r.setHeader("Cache-Control", "no-cache"),302 r.setResponseCode(200),303 r.write(f"data: {json.dumps(SSE_MESSAGE)}\n\n".encode()),304 r.finish(),305 ),306 )307 # 2. Subscribe to page request events.308 await page.goto(server.EMPTY_PAGE)309 requests = []310 page.on("request", lambda r: requests.append(r))311 # 3. 
Connect to EventSource in browser and return first message.312 assert (313 await page.evaluate(314 """() => {315 const eventSource = new EventSource('/sse');316 return new Promise(resolve => {317 eventSource.onmessage = e => resolve(JSON.parse(e.data));318 });319 }"""320 )321 == SSE_MESSAGE322 )323 assert requests[0].resource_type == "eventsource"324async def test_network_events_request(page, server):325 requests = []326 page.on("request", lambda r: requests.append(r))327 await page.goto(server.EMPTY_PAGE)328 assert len(requests) == 1329 assert requests[0].url == server.EMPTY_PAGE330 assert requests[0].resource_type == "document"331 assert requests[0].method == "GET"332 assert await requests[0].response()333 assert requests[0].frame == page.main_frame334 assert requests[0].frame.url == server.EMPTY_PAGE335async def test_network_events_response(page, server):336 responses = []337 page.on("response", lambda r: responses.append(r))338 await page.goto(server.EMPTY_PAGE)339 assert len(responses) == 1340 assert responses[0].url == server.EMPTY_PAGE341 assert responses[0].status == 200342 assert responses[0].ok343 assert responses[0].request344async def test_network_events_request_failed(345 page, server, is_chromium, is_webkit, is_mac, is_win346):347 def handle_request(request):348 request.setHeader("Content-Type", "text/css")349 request.transport.loseConnection()350 server.set_route("/one-style.css", handle_request)351 failed_requests = []352 page.on("requestfailed", lambda request: failed_requests.append(request))353 await page.goto(server.PREFIX + "/one-style.html")354 assert len(failed_requests) == 1355 assert "one-style.css" in failed_requests[0].url356 assert await failed_requests[0].response() is None357 assert failed_requests[0].resource_type == "stylesheet"358 if is_chromium:359 assert failed_requests[0].failure == "net::ERR_EMPTY_RESPONSE"360 elif is_webkit:361 if is_mac:362 assert failed_requests[0].failure == "The network connection was lost."363 elif 
is_win:364 assert (365 failed_requests[0].failure366 == "Server returned nothing (no headers, no data)"367 )368 else:369 assert failed_requests[0].failure == "Message Corrupt"370 else:371 assert failed_requests[0].failure == "NS_ERROR_NET_RESET"372 assert failed_requests[0].frame373async def test_network_events_request_finished(page, server):374 async with page.expect_event("requestfinished") as event_info:375 await page.goto(server.EMPTY_PAGE)376 request = await event_info.value377 assert request.url == server.EMPTY_PAGE378 assert await request.response()379 assert request.frame == page.main_frame380 assert request.frame.url == server.EMPTY_PAGE381async def test_network_events_should_fire_events_in_proper_order(page, server):382 events = []383 page.on("request", lambda request: events.append("request"))384 page.on("response", lambda response: events.append("response"))385 response = await page.goto(server.EMPTY_PAGE)386 await response.finished()387 events.append("requestfinished")388 assert events == ["request", "response", "requestfinished"]389async def test_network_events_should_support_redirects(page, server):390 events = []391 page.on("request", lambda request: events.append(f"{request.method} {request.url}"))392 page.on(393 "response", lambda response: events.append(f"{response.status} {response.url}")394 )395 page.on("requestfinished", lambda request: events.append(f"DONE {request.url}"))396 page.on("requestfailed", lambda request: events.append(f"FAIL {request.url}"))397 server.set_redirect("/foo.html", "/empty.html")398 FOO_URL = server.PREFIX + "/foo.html"399 response = await page.goto(FOO_URL)400 await response.finished()401 assert events == [402 f"GET {FOO_URL}",403 f"302 {FOO_URL}",404 f"DONE {FOO_URL}",405 f"GET {server.EMPTY_PAGE}",406 f"200 {server.EMPTY_PAGE}",407 f"DONE {server.EMPTY_PAGE}",408 ]409 redirected_from = response.request.redirected_from410 assert "/foo.html" in redirected_from.url411 assert redirected_from.redirected_from is None412 
assert redirected_from.redirected_to == response.request413async def test_request_is_navigation_request_should_work(page, server):414 pytest.skip(msg="test")415 requests = {}416 def handle_request(request):417 requests[request.url().split("/").pop()] = request418 page.on("request", handle_request)419 server.set_redirect("/rrredirect", "/frames/one-frame.html")420 await page.goto(server.PREFIX + "/rrredirect")421 print("kek")422 assert requests.get("rrredirect").is_navigation_request()423 assert requests.get("one-frame.html").is_navigation_request()424 assert requests.get("frame.html").is_navigation_request()425 assert requests.get("script.js").is_navigation_request() is False426 assert requests.get("style.css").is_navigation_request() is False427async def test_request_is_navigation_request_should_work_when_navigating_to_image(428 page, server429):430 requests = []431 page.on("request", lambda r: requests.append(r))432 await page.goto(server.PREFIX + "/pptr.png")433 assert requests[0].is_navigation_request()434async def test_set_extra_http_headers_should_work(page, server):435 await page.set_extra_http_headers({"foo": "bar"})436 request = (437 await asyncio.gather(438 server.wait_for_request("/empty.html"),439 page.goto(server.EMPTY_PAGE),440 )441 )[0]442 assert request.getHeader("foo") == "bar"443async def test_set_extra_http_headers_should_work_with_redirects(page, server):444 server.set_redirect("/foo.html", "/empty.html")445 await page.set_extra_http_headers({"foo": "bar"})446 request = (447 await asyncio.gather(...

Full Screen

Full Screen

handler.py

Source:handler.py Github

copy

Full Screen

...252 stats_prefix = "playwright/request_count"253 self.stats.inc_value(stats_prefix)254 self.stats.inc_value(f"{stats_prefix}/resource_type/{request.resource_type}")255 self.stats.inc_value(f"{stats_prefix}/method/{request.method}")256 if request.is_navigation_request():257 self.stats.inc_value(f"{stats_prefix}/navigation")258 def _increment_response_stats(self, response: PlaywrightResponse) -> None:259 stats_prefix = "playwright/response_count"260 self.stats.inc_value(stats_prefix)261 self.stats.inc_value(f"{stats_prefix}/resource_type/{response.request.resource_type}")262 self.stats.inc_value(f"{stats_prefix}/method/{response.request.method}")263 def _make_close_page_callback(self, context_name: str) -> Callable:264 def close_page_callback() -> None:265 if context_name in self.context_semaphores:266 self.context_semaphores[context_name].release()267 return close_page_callback268 def _make_close_browser_context_callback(self, name: str) -> Callable:269 def close_browser_context_callback() -> None:270 logger.debug("Browser context closed: '%s'", name)271 if name in self.contexts:272 self.contexts.pop(name)273 if name in self.context_semaphores:274 self.context_semaphores.pop(name)275 return close_browser_context_callback276 def _make_request_handler(277 self, method: str, scrapy_headers: Headers, body: Optional[bytes], encoding: str = "utf8"278 ) -> Callable:279 async def _request_handler(route: Route, playwright_request: PlaywrightRequest) -> None:280 """Override request headers, method and body."""281 if self.abort_request and self.abort_request(playwright_request):282 await route.abort()283 self.stats.inc_value("playwright/request_count/aborted")284 return None285 processed_headers = await self.process_request_headers(286 self.browser_type, playwright_request, scrapy_headers287 )288 # the request that reaches the callback should contain the headers that were sent289 scrapy_headers.clear()290 scrapy_headers.update(processed_headers)291 overrides: dict = 
{"headers": processed_headers}292 if playwright_request.is_navigation_request():293 overrides["method"] = method294 if body is not None:295 overrides["post_data"] = body.decode(encoding)296 try:297 await route.continue_(**overrides)298 except Exception as ex: # pylint: disable=broad-except299 if _is_safe_close_error(ex):300 logger.warning(301 f"{playwright_request}: failed processing Playwright request ({ex})"302 )303 else:304 raise305 return _request_handler306def _possible_encodings(headers: Headers, text: str) -> Generator[str, None, None]:...

Full Screen

Full Screen

_network.py

Source:_network.py Github

copy

Full Screen

...93 return from_nullable_channel(await self._channel.send("response"))94 @property95 def frame(self) -> "Frame":96 return from_channel(self._initializer["frame"])97 def is_navigation_request(self) -> bool:98 return self._initializer["isNavigationRequest"]99 @property100 def redirected_from(self) -> Optional["Request"]:101 return self._redirected_from102 @property103 def redirected_to(self) -> Optional["Request"]:104 return self._redirected_to105 @property106 def failure(self) -> Optional[str]:107 return self._failure_text108 @property109 def timing(self) -> ResourceTiming:110 return self._timing111class Route(ChannelOwner):...

Full Screen

Full Screen

Playwright tutorial

LambdaTest’s Playwright tutorial will give you a broader idea of the Playwright automation framework, its unique features, and its use cases, with examples to deepen your understanding of Playwright testing. This tutorial gives A-to-Z guidance, from installing the Playwright framework to best practices and advanced concepts.

Chapters:

  1. What is Playwright: Playwright is comparatively new but has already gained considerable popularity. Get to know some of the history of Playwright, along with some interesting facts connected with it.
  2. How To Install Playwright: Learn in detail which basic configuration and dependencies are required for installing Playwright and running a test. Get step-by-step directions for installing the Playwright automation framework.
  3. Playwright Futuristic Features: Launched in 2020, Playwright quickly gained huge popularity because of helpful features such as the Playwright Test Generator and Inspector, the Playwright Reporter, and the Playwright auto-waiting mechanism. Read up on those features to master Playwright testing.
  4. What is Component Testing: Component testing in Playwright is a unique feature that allows a tester to test a single component of a web application without integrating it with other elements. Learn how to perform component testing with the Playwright automation framework.
  5. Inputs And Buttons In Playwright: Every website has Input boxes and buttons; learn about testing inputs and buttons with different scenarios and examples.
  6. Functions and Selectors in Playwright: Learn how to launch the Chromium browser with Playwright. Also, gain a better understanding of some important functions like “BrowserContext,” which allows you to run multiple browser sessions, and “newPage” which interacts with a page.
  7. Handling Alerts and Dropdowns in Playwright: Playwright interacts with different types of alerts and pop-ups, such as simple, confirmation, and prompt, and with different types of dropdowns, such as single-select and multi-select. Get hands-on with handling alerts and dropdowns in Playwright testing.
  8. Playwright vs Puppeteer: Get to know the differences between the two testing frameworks: how they differ from one another, which browsers they support, and what features they provide.
  9. Run Playwright Tests on LambdaTest: Playwright testing with LambdaTest takes test performance to the utmost. You can run multiple Playwright tests in parallel with the LambdaTest test cloud. Get a step-by-step guide to running your Playwright tests on the LambdaTest platform.
  10. Playwright Python Tutorial: The Playwright automation framework supports all major languages, such as Python, JavaScript, TypeScript, and .NET. However, there are various advantages to Python end-to-end testing with Playwright because of its versatile utility. Get the hang of Playwright Python testing with this chapter.
  11. Playwright End To End Testing Tutorial: Get hands-on with Playwright end-to-end testing and learn to use some exciting features such as Trace Viewer, debugging, networking, component testing, visual testing, and many more.
  12. Playwright Video Tutorial: Watch video tutorials on Playwright testing from experts and get a step-by-step, in-depth explanation of Playwright automation testing.

Run Playwright Python automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful