How to use _actual_headers method in Playwright Python

Best Python code snippet using playwright-python

test_conn.py

Source: test_conn.py (GitHub)

copy

Full Screen

1"""Tests for TCP connection handling, including proper and timely close."""2from __future__ import absolute_import, division, print_function3__metaclass__ = type4import errno5import socket6import time7import logging8import traceback as traceback_9from collections import namedtuple10from six.moves import range, http_client, urllib11import six12import pytest13from jaraco.text import trim, unwrap14from cheroot.test import helper, webtest15from cheroot._compat import IS_CI, IS_MACOS, IS_PYPY, IS_WINDOWS16import cheroot.server17IS_SLOW_ENV = IS_MACOS or IS_WINDOWS18timeout = 119pov = 'pPeErRsSiIsStTeEnNcCeE oOfF vViIsSiIoOnN'20class Controller(helper.Controller):21 """Controller for serving WSGI apps."""22 def hello(req, resp):23 """Render Hello world."""24 return 'Hello, world!'25 def pov(req, resp):26 """Render ``pov`` value."""27 return pov28 def stream(req, resp):29 """Render streaming response."""30 if 'set_cl' in req.environ['QUERY_STRING']:31 resp.headers['Content-Length'] = str(10)32 def content():33 for x in range(10):34 yield str(x)35 return content()36 def upload(req, resp):37 """Process file upload and render thank."""38 if not req.environ['REQUEST_METHOD'] == 'POST':39 raise AssertionError(40 "'POST' != request.method %r" %41 req.environ['REQUEST_METHOD'],42 )43 return "thanks for '%s'" % req.environ['wsgi.input'].read()44 def custom_204(req, resp):45 """Render response with status 204."""46 resp.status = '204'47 return 'Code = 204'48 def custom_304(req, resp):49 """Render response with status 304."""50 resp.status = '304'51 return 'Code = 304'52 def err_before_read(req, resp):53 """Render response with status 500."""54 resp.status = '500 Internal Server Error'55 return 'ok'56 def one_megabyte_of_a(req, resp):57 """Render 1MB response."""58 return ['a' * 1024] * 102459 def wrong_cl_buffered(req, resp):60 """Render buffered response with invalid length value."""61 resp.headers['Content-Length'] = '5'62 return 'I have too many bytes'63 def 
wrong_cl_unbuffered(req, resp):64 """Render unbuffered response with invalid length value."""65 resp.headers['Content-Length'] = '5'66 return ['I too', ' have too many bytes']67 def _munge(string):68 """Encode PATH_INFO correctly depending on Python version.69 WSGI 1.0 is a mess around unicode. Create endpoints70 that match the PATH_INFO that it produces.71 """72 if six.PY2:73 return string74 return string.encode('utf-8').decode('latin-1')75 handlers = {76 '/hello': hello,77 '/pov': pov,78 '/page1': pov,79 '/page2': pov,80 '/page3': pov,81 '/stream': stream,82 '/upload': upload,83 '/custom/204': custom_204,84 '/custom/304': custom_304,85 '/err_before_read': err_before_read,86 '/one_megabyte_of_a': one_megabyte_of_a,87 '/wrong_cl_buffered': wrong_cl_buffered,88 '/wrong_cl_unbuffered': wrong_cl_unbuffered,89 }90class ErrorLogMonitor:91 """Mock class to access the server error_log calls made by the server."""92 ErrorLogCall = namedtuple('ErrorLogCall', ['msg', 'level', 'traceback'])93 def __init__(self):94 """Initialize the server error log monitor/interceptor.95 If you need to ignore a particular error message use the property96 ``ignored_msgs`` by appending to the list the expected error messages.97 """98 self.calls = []99 # to be used the the teardown validation100 self.ignored_msgs = []101 def __call__(self, msg='', level=logging.INFO, traceback=False):102 """Intercept the call to the server error_log method."""103 if traceback:104 tblines = traceback_.format_exc()105 else:106 tblines = ''107 self.calls.append(ErrorLogMonitor.ErrorLogCall(msg, level, tblines))108@pytest.fixture109def raw_testing_server(wsgi_server_client):110 """Attach a WSGI app to the given server and preconfigure it."""111 app = Controller()112 def _timeout(req, resp):113 return str(wsgi_server.timeout)114 app.handlers['/timeout'] = _timeout115 wsgi_server = wsgi_server_client.server_instance116 wsgi_server.wsgi_app = app117 wsgi_server.max_request_body_size = 1001118 wsgi_server.timeout = 
timeout119 wsgi_server.server_client = wsgi_server_client120 wsgi_server.keep_alive_conn_limit = 2121 return wsgi_server122@pytest.fixture123def testing_server(raw_testing_server, monkeypatch):124 """Modify the "raw" base server to monitor the error_log messages.125 If you need to ignore a particular error message use the property126 ``testing_server.error_log.ignored_msgs`` by appending to the list127 the expected error messages.128 """129 # patch the error_log calls of the server instance130 monkeypatch.setattr(raw_testing_server, 'error_log', ErrorLogMonitor())131 yield raw_testing_server132 # Teardown verification, in case that the server logged an133 # error that wasn't notified to the client or we just made a mistake.134 # pylint: disable=possibly-unused-variable135 for c_msg, c_level, c_traceback in raw_testing_server.error_log.calls:136 if c_level <= logging.WARNING:137 continue138 assert c_msg in raw_testing_server.error_log.ignored_msgs, (139 'Found error in the error log: '140 "message = '{c_msg}', level = '{c_level}'\n"141 '{c_traceback}'.format(**locals()),142 )143@pytest.fixture144def test_client(testing_server):145 """Get and return a test client out of the given server."""146 return testing_server.server_client147def header_exists(header_name, headers):148 """Check that a header is present."""149 return header_name.lower() in (k.lower() for (k, _) in headers)150def header_has_value(header_name, header_value, headers):151 """Check that a header with a given value is present."""152 return header_name.lower() in (153 k.lower() for (k, v) in headers154 if v == header_value155 )156def test_HTTP11_persistent_connections(test_client):157 """Test persistent HTTP/1.1 connections."""158 # Initialize a persistent HTTP connection159 http_connection = test_client.get_connection()160 http_connection.auto_open = False161 http_connection.connect()162 # Make the first request and assert there's no "Connection: close".163 status_line, actual_headers, actual_resp_body 
= test_client.get(164 '/pov', http_conn=http_connection,165 )166 actual_status = int(status_line[:3])167 assert actual_status == 200168 assert status_line[4:] == 'OK'169 assert actual_resp_body == pov.encode()170 assert not header_exists('Connection', actual_headers)171 # Make another request on the same connection.172 status_line, actual_headers, actual_resp_body = test_client.get(173 '/page1', http_conn=http_connection,174 )175 actual_status = int(status_line[:3])176 assert actual_status == 200177 assert status_line[4:] == 'OK'178 assert actual_resp_body == pov.encode()179 assert not header_exists('Connection', actual_headers)180 # Test client-side close.181 status_line, actual_headers, actual_resp_body = test_client.get(182 '/page2', http_conn=http_connection,183 headers=[('Connection', 'close')],184 )185 actual_status = int(status_line[:3])186 assert actual_status == 200187 assert status_line[4:] == 'OK'188 assert actual_resp_body == pov.encode()189 assert header_has_value('Connection', 'close', actual_headers)190 # Make another request on the same connection, which should error.191 with pytest.raises(http_client.NotConnected):192 test_client.get('/pov', http_conn=http_connection)193@pytest.mark.parametrize(194 'set_cl',195 (196 False, # Without Content-Length197 True, # With Content-Length198 ),199)200def test_streaming_11(test_client, set_cl):201 """Test serving of streaming responses with HTTP/1.1 protocol."""202 # Initialize a persistent HTTP connection203 http_connection = test_client.get_connection()204 http_connection.auto_open = False205 http_connection.connect()206 # Make the first request and assert there's no "Connection: close".207 status_line, actual_headers, actual_resp_body = test_client.get(208 '/pov', http_conn=http_connection,209 )210 actual_status = int(status_line[:3])211 assert actual_status == 200212 assert status_line[4:] == 'OK'213 assert actual_resp_body == pov.encode()214 assert not header_exists('Connection', actual_headers)215 # Make 
another, streamed request on the same connection.216 if set_cl:217 # When a Content-Length is provided, the content should stream218 # without closing the connection.219 status_line, actual_headers, actual_resp_body = test_client.get(220 '/stream?set_cl=Yes', http_conn=http_connection,221 )222 assert header_exists('Content-Length', actual_headers)223 assert not header_has_value('Connection', 'close', actual_headers)224 assert not header_exists('Transfer-Encoding', actual_headers)225 assert actual_status == 200226 assert status_line[4:] == 'OK'227 assert actual_resp_body == b'0123456789'228 else:229 # When no Content-Length response header is provided,230 # streamed output will either close the connection, or use231 # chunked encoding, to determine transfer-length.232 status_line, actual_headers, actual_resp_body = test_client.get(233 '/stream', http_conn=http_connection,234 )235 assert not header_exists('Content-Length', actual_headers)236 assert actual_status == 200237 assert status_line[4:] == 'OK'238 assert actual_resp_body == b'0123456789'239 chunked_response = False240 for k, v in actual_headers:241 if k.lower() == 'transfer-encoding':242 if str(v) == 'chunked':243 chunked_response = True244 if chunked_response:245 assert not header_has_value('Connection', 'close', actual_headers)246 else:247 assert header_has_value('Connection', 'close', actual_headers)248 # Make another request on the same connection, which should249 # error.250 with pytest.raises(http_client.NotConnected):251 test_client.get('/pov', http_conn=http_connection)252 # Try HEAD.253 # See https://www.bitbucket.org/cherrypy/cherrypy/issue/864.254 # TODO: figure out how can this be possible on an closed connection255 # (chunked_response case)256 status_line, actual_headers, actual_resp_body = test_client.head(257 '/stream', http_conn=http_connection,258 )259 assert actual_status == 200260 assert status_line[4:] == 'OK'261 assert actual_resp_body == b''262 assert not 
header_exists('Transfer-Encoding', actual_headers)263@pytest.mark.parametrize(264 'set_cl',265 (266 False, # Without Content-Length267 True, # With Content-Length268 ),269)270def test_streaming_10(test_client, set_cl):271 """Test serving of streaming responses with HTTP/1.0 protocol."""272 original_server_protocol = test_client.server_instance.protocol273 test_client.server_instance.protocol = 'HTTP/1.0'274 # Initialize a persistent HTTP connection275 http_connection = test_client.get_connection()276 http_connection.auto_open = False277 http_connection.connect()278 # Make the first request and assert Keep-Alive.279 status_line, actual_headers, actual_resp_body = test_client.get(280 '/pov', http_conn=http_connection,281 headers=[('Connection', 'Keep-Alive')],282 protocol='HTTP/1.0',283 )284 actual_status = int(status_line[:3])285 assert actual_status == 200286 assert status_line[4:] == 'OK'287 assert actual_resp_body == pov.encode()288 assert header_has_value('Connection', 'Keep-Alive', actual_headers)289 # Make another, streamed request on the same connection.290 if set_cl:291 # When a Content-Length is provided, the content should292 # stream without closing the connection.293 status_line, actual_headers, actual_resp_body = test_client.get(294 '/stream?set_cl=Yes', http_conn=http_connection,295 headers=[('Connection', 'Keep-Alive')],296 protocol='HTTP/1.0',297 )298 actual_status = int(status_line[:3])299 assert actual_status == 200300 assert status_line[4:] == 'OK'301 assert actual_resp_body == b'0123456789'302 assert header_exists('Content-Length', actual_headers)303 assert header_has_value('Connection', 'Keep-Alive', actual_headers)304 assert not header_exists('Transfer-Encoding', actual_headers)305 else:306 # When a Content-Length is not provided,307 # the server should close the connection.308 status_line, actual_headers, actual_resp_body = test_client.get(309 '/stream', http_conn=http_connection,310 headers=[('Connection', 'Keep-Alive')],311 
protocol='HTTP/1.0',312 )313 actual_status = int(status_line[:3])314 assert actual_status == 200315 assert status_line[4:] == 'OK'316 assert actual_resp_body == b'0123456789'317 assert not header_exists('Content-Length', actual_headers)318 assert not header_has_value('Connection', 'Keep-Alive', actual_headers)319 assert not header_exists('Transfer-Encoding', actual_headers)320 # Make another request on the same connection, which should error.321 with pytest.raises(http_client.NotConnected):322 test_client.get(323 '/pov', http_conn=http_connection,324 protocol='HTTP/1.0',325 )326 test_client.server_instance.protocol = original_server_protocol327@pytest.mark.parametrize(328 'http_server_protocol',329 (330 'HTTP/1.0',331 pytest.param(332 'HTTP/1.1',333 marks=pytest.mark.xfail(334 IS_PYPY and IS_CI,335 reason='Fails under PyPy in CI for unknown reason',336 strict=False,337 ),338 ),339 ),340)341def test_keepalive(test_client, http_server_protocol):342 """Test Keep-Alive enabled connections."""343 original_server_protocol = test_client.server_instance.protocol344 test_client.server_instance.protocol = http_server_protocol345 http_client_protocol = 'HTTP/1.0'346 # Initialize a persistent HTTP connection347 http_connection = test_client.get_connection()348 http_connection.auto_open = False349 http_connection.connect()350 # Test a normal HTTP/1.0 request.351 status_line, actual_headers, actual_resp_body = test_client.get(352 '/page2',353 protocol=http_client_protocol,354 )355 actual_status = int(status_line[:3])356 assert actual_status == 200357 assert status_line[4:] == 'OK'358 assert actual_resp_body == pov.encode()359 assert not header_exists('Connection', actual_headers)360 # Test a keep-alive HTTP/1.0 request.361 status_line, actual_headers, actual_resp_body = test_client.get(362 '/page3', headers=[('Connection', 'Keep-Alive')],363 http_conn=http_connection, protocol=http_client_protocol,364 )365 actual_status = int(status_line[:3])366 assert actual_status == 200367 
assert status_line[4:] == 'OK'368 assert actual_resp_body == pov.encode()369 assert header_has_value('Connection', 'Keep-Alive', actual_headers)370 assert header_has_value(371 'Keep-Alive',372 'timeout={test_client.server_instance.timeout}'.format(**locals()),373 actual_headers,374 )375 # Remove the keep-alive header again.376 status_line, actual_headers, actual_resp_body = test_client.get(377 '/page3', http_conn=http_connection,378 protocol=http_client_protocol,379 )380 actual_status = int(status_line[:3])381 assert actual_status == 200382 assert status_line[4:] == 'OK'383 assert actual_resp_body == pov.encode()384 assert not header_exists('Connection', actual_headers)385 assert not header_exists('Keep-Alive', actual_headers)386 test_client.server_instance.protocol = original_server_protocol387def test_keepalive_conn_management(test_client):388 """Test management of Keep-Alive connections."""389 test_client.server_instance.timeout = 2390 def connection():391 # Initialize a persistent HTTP connection392 http_connection = test_client.get_connection()393 http_connection.auto_open = False394 http_connection.connect()395 return http_connection396 def request(conn, keepalive=True):397 status_line, actual_headers, actual_resp_body = test_client.get(398 '/page3', headers=[('Connection', 'Keep-Alive')],399 http_conn=conn, protocol='HTTP/1.0',400 )401 actual_status = int(status_line[:3])402 assert actual_status == 200403 assert status_line[4:] == 'OK'404 assert actual_resp_body == pov.encode()405 if keepalive:406 assert header_has_value('Connection', 'Keep-Alive', actual_headers)407 assert header_has_value(408 'Keep-Alive',409 'timeout={test_client.server_instance.timeout}'.410 format(**locals()),411 actual_headers,412 )413 else:414 assert not header_exists('Connection', actual_headers)415 assert not header_exists('Keep-Alive', actual_headers)416 def check_server_idle_conn_count(count, timeout=1.0):417 deadline = time.time() + timeout418 while True:419 n = 
test_client.server_instance._connections._num_connections420 if n == count:421 return422 assert time.time() <= deadline, (423 'idle conn count mismatch, wanted {count}, got {n}'.424 format(**locals()),425 )426 disconnect_errors = (427 http_client.BadStatusLine,428 http_client.CannotSendRequest,429 http_client.NotConnected,430 )431 # Make a new connection.432 c1 = connection()433 request(c1)434 check_server_idle_conn_count(1)435 # Make a second one.436 c2 = connection()437 request(c2)438 check_server_idle_conn_count(2)439 # Reusing the first connection should still work.440 request(c1)441 check_server_idle_conn_count(2)442 # Creating a new connection should still work, but we should443 # have run out of available connections to keep alive, so the444 # server should tell us to close.445 c3 = connection()446 request(c3, keepalive=False)447 check_server_idle_conn_count(2)448 # Show that the third connection was closed.449 with pytest.raises(disconnect_errors):450 request(c3)451 check_server_idle_conn_count(2)452 # Wait for some of our timeout.453 time.sleep(1.2)454 # Refresh the second connection.455 request(c2)456 check_server_idle_conn_count(2)457 # Wait for the remainder of our timeout, plus one tick.458 time.sleep(1.2)459 check_server_idle_conn_count(1)460 # First connection should now be expired.461 with pytest.raises(disconnect_errors):462 request(c1)463 check_server_idle_conn_count(1)464 # But the second one should still be valid.465 request(c2)466 check_server_idle_conn_count(1)467 # Restore original timeout.468 test_client.server_instance.timeout = timeout469@pytest.mark.parametrize(470 ('simulated_exception', 'error_number', 'exception_leaks'),471 (472 pytest.param(473 socket.error, errno.ECONNRESET, False,474 id='socket.error(ECONNRESET)',475 ),476 pytest.param(477 socket.error, errno.EPIPE, False,478 id='socket.error(EPIPE)',479 ),480 pytest.param(481 socket.error, errno.ENOTCONN, False,482 id='simulated socket.error(ENOTCONN)',483 ),484 pytest.param(485 
None, # <-- don't raise an artificial exception486 errno.ENOTCONN, False,487 id='real socket.error(ENOTCONN)',488 marks=pytest.mark.xfail(489 IS_WINDOWS,490 reason='Now reproducible this way on Windows',491 ),492 ),493 pytest.param(494 socket.error, errno.ESHUTDOWN, False,495 id='socket.error(ESHUTDOWN)',496 ),497 pytest.param(RuntimeError, 666, True, id='RuntimeError(666)'),498 pytest.param(socket.error, -1, True, id='socket.error(-1)'),499 ) + (500 () if six.PY2 else (501 pytest.param(502 ConnectionResetError, errno.ECONNRESET, False,503 id='ConnectionResetError(ECONNRESET)',504 ),505 pytest.param(506 BrokenPipeError, errno.EPIPE, False,507 id='BrokenPipeError(EPIPE)',508 ),509 pytest.param(510 BrokenPipeError, errno.ESHUTDOWN, False,511 id='BrokenPipeError(ESHUTDOWN)',512 ),513 )514 ),515)516def test_broken_connection_during_tcp_fin(517 error_number, exception_leaks,518 mocker, monkeypatch,519 simulated_exception, test_client,520):521 """Test there's no traceback on broken connection during close.522 It artificially causes :py:data:`~errno.ECONNRESET` /523 :py:data:`~errno.EPIPE` / :py:data:`~errno.ESHUTDOWN` /524 :py:data:`~errno.ENOTCONN` as well as unrelated :py:exc:`RuntimeError`525 and :py:exc:`socket.error(-1) <socket.error>` on the server socket when526 :py:meth:`socket.shutdown() <socket.socket.shutdown>` is called. 
It's527 triggered by closing the client socket before the server had a chance528 to respond.529 The expectation is that only :py:exc:`RuntimeError` and a530 :py:exc:`socket.error` with an unusual error code would leak.531 With the :py:data:`None`-parameter, a real non-simulated532 :py:exc:`OSError(107, 'Transport endpoint is not connected')533 <OSError>` happens.534 """535 exc_instance = (536 None if simulated_exception is None537 else simulated_exception(error_number, 'Simulated socket error')538 )539 old_close_kernel_socket = (540 test_client.server_instance.541 ConnectionClass._close_kernel_socket542 )543 def _close_kernel_socket(self):544 monkeypatch.setattr( # `socket.shutdown` is read-only otherwise545 self, 'socket',546 mocker.mock_module.Mock(wraps=self.socket),547 )548 if exc_instance is not None:549 monkeypatch.setattr(550 self.socket, 'shutdown',551 mocker.mock_module.Mock(side_effect=exc_instance),552 )553 _close_kernel_socket.fin_spy = mocker.spy(self.socket, 'shutdown')554 try:555 old_close_kernel_socket(self)556 except simulated_exception:557 _close_kernel_socket.exception_leaked = True558 else:559 _close_kernel_socket.exception_leaked = False560 monkeypatch.setattr(561 test_client.server_instance.ConnectionClass,562 '_close_kernel_socket',563 _close_kernel_socket,564 )565 conn = test_client.get_connection()566 conn.auto_open = False567 conn.connect()568 conn.send(b'GET /hello HTTP/1.1')569 conn.send(('Host: %s' % conn.host).encode('ascii'))570 conn.close()571 # Let the server attempt TCP shutdown:572 for _ in range(10 * (2 if IS_SLOW_ENV else 1)):573 time.sleep(0.1)574 if hasattr(_close_kernel_socket, 'exception_leaked'):575 break576 if exc_instance is not None: # simulated by us577 assert _close_kernel_socket.fin_spy.spy_exception is exc_instance578 else: # real579 assert isinstance(580 _close_kernel_socket.fin_spy.spy_exception, socket.error,581 )582 assert _close_kernel_socket.fin_spy.spy_exception.errno == error_number583 assert 
_close_kernel_socket.exception_leaked is exception_leaks584@pytest.mark.parametrize(585 'timeout_before_headers',586 (587 True,588 False,589 ),590)591def test_HTTP11_Timeout(test_client, timeout_before_headers):592 """Check timeout without sending any data.593 The server will close the connection with a 408.594 """595 conn = test_client.get_connection()596 conn.auto_open = False597 conn.connect()598 if not timeout_before_headers:599 # Connect but send half the headers only.600 conn.send(b'GET /hello HTTP/1.1')601 conn.send(('Host: %s' % conn.host).encode('ascii'))602 # else: Connect but send nothing.603 # Wait for our socket timeout604 time.sleep(timeout * 2)605 # The request should have returned 408 already.606 response = conn.response_class(conn.sock, method='GET')607 response.begin()608 assert response.status == 408609 conn.close()610def test_HTTP11_Timeout_after_request(test_client):611 """Check timeout after at least one request has succeeded.612 The server should close the connection without 408.613 """614 fail_msg = "Writing to timed out socket didn't fail as it should have: %s"615 # Make an initial request616 conn = test_client.get_connection()617 conn.putrequest('GET', '/timeout?t=%s' % timeout, skip_host=True)618 conn.putheader('Host', conn.host)619 conn.endheaders()620 response = conn.response_class(conn.sock, method='GET')621 response.begin()622 assert response.status == 200623 actual_body = response.read()624 expected_body = str(timeout).encode()625 assert actual_body == expected_body626 # Make a second request on the same socket627 conn._output(b'GET /hello HTTP/1.1')628 conn._output(('Host: %s' % conn.host).encode('ascii'))629 conn._send_output()630 response = conn.response_class(conn.sock, method='GET')631 response.begin()632 assert response.status == 200633 actual_body = response.read()634 expected_body = b'Hello, world!'635 assert actual_body == expected_body636 # Wait for our socket timeout637 time.sleep(timeout * 2)638 # Make another request on 
the same socket, which should error639 conn._output(b'GET /hello HTTP/1.1')640 conn._output(('Host: %s' % conn.host).encode('ascii'))641 conn._send_output()642 response = conn.response_class(conn.sock, method='GET')643 try:644 response.begin()645 except (socket.error, http_client.BadStatusLine):646 pass647 except Exception as ex:648 pytest.fail(fail_msg % ex)649 else:650 if response.status != 408:651 pytest.fail(fail_msg % response.read())652 conn.close()653 # Make another request on a new socket, which should work654 conn = test_client.get_connection()655 conn.putrequest('GET', '/pov', skip_host=True)656 conn.putheader('Host', conn.host)657 conn.endheaders()658 response = conn.response_class(conn.sock, method='GET')659 response.begin()660 assert response.status == 200661 actual_body = response.read()662 expected_body = pov.encode()663 assert actual_body == expected_body664 # Make another request on the same socket,665 # but timeout on the headers666 conn.send(b'GET /hello HTTP/1.1')667 # Wait for our socket timeout668 time.sleep(timeout * 2)669 response = conn.response_class(conn.sock, method='GET')670 try:671 response.begin()672 except (socket.error, http_client.BadStatusLine):673 pass674 except Exception as ex:675 pytest.fail(fail_msg % ex)676 else:677 if response.status != 408:678 pytest.fail(fail_msg % response.read())679 conn.close()680 # Retry the request on a new connection, which should work681 conn = test_client.get_connection()682 conn.putrequest('GET', '/pov', skip_host=True)683 conn.putheader('Host', conn.host)684 conn.endheaders()685 response = conn.response_class(conn.sock, method='GET')686 response.begin()687 assert response.status == 200688 actual_body = response.read()689 expected_body = pov.encode()690 assert actual_body == expected_body691 conn.close()692def test_HTTP11_pipelining(test_client):693 """Test HTTP/1.1 pipelining.694 :py:mod:`http.client` doesn't support this directly.695 """696 conn = test_client.get_connection()697 # Put request 
1698 conn.putrequest('GET', '/hello', skip_host=True)699 conn.putheader('Host', conn.host)700 conn.endheaders()701 for trial in range(5):702 # Put next request703 conn._output(704 ('GET /hello?%s HTTP/1.1' % trial).encode('iso-8859-1'),705 )706 conn._output(('Host: %s' % conn.host).encode('ascii'))707 conn._send_output()708 # Retrieve previous response709 response = conn.response_class(conn.sock, method='GET')710 # there is a bug in python3 regarding the buffering of711 # ``conn.sock``. Until that bug get's fixed we will712 # monkey patch the ``response`` instance.713 # https://bugs.python.org/issue23377714 if not six.PY2:715 response.fp = conn.sock.makefile('rb', 0)716 response.begin()717 body = response.read(13)718 assert response.status == 200719 assert body == b'Hello, world!'720 # Retrieve final response721 response = conn.response_class(conn.sock, method='GET')722 response.begin()723 body = response.read()724 assert response.status == 200725 assert body == b'Hello, world!'726 conn.close()727def test_100_Continue(test_client):728 """Test 100-continue header processing."""729 conn = test_client.get_connection()730 # Try a page without an Expect request header first.731 # Note that http.client's response.begin automatically ignores732 # 100 Continue responses, so we must manually check for it.733 conn.putrequest('POST', '/upload', skip_host=True)734 conn.putheader('Host', conn.host)735 conn.putheader('Content-Type', 'text/plain')736 conn.putheader('Content-Length', '4')737 conn.endheaders()738 conn.send(b"d'oh")739 response = conn.response_class(conn.sock, method='POST')740 _version, status, _reason = response._read_status()741 assert status != 100742 conn.close()743 # Now try a page with an Expect header...744 conn.connect()745 conn.putrequest('POST', '/upload', skip_host=True)746 conn.putheader('Host', conn.host)747 conn.putheader('Content-Type', 'text/plain')748 conn.putheader('Content-Length', '17')749 conn.putheader('Expect', '100-continue')750 
conn.endheaders()751 response = conn.response_class(conn.sock, method='POST')752 # ...assert and then skip the 100 response753 version, status, reason = response._read_status()754 assert status == 100755 while True:756 line = response.fp.readline().strip()757 if line:758 pytest.fail(759 '100 Continue should not output any headers. Got %r' %760 line,761 )762 else:763 break764 # ...send the body765 body = b'I am a small file'766 conn.send(body)767 # ...get the final response768 response.begin()769 status_line, _actual_headers, actual_resp_body = webtest.shb(response)770 actual_status = int(status_line[:3])771 assert actual_status == 200772 expected_resp_body = ("thanks for '%s'" % body).encode()773 assert actual_resp_body == expected_resp_body774 conn.close()775@pytest.mark.parametrize(776 'max_request_body_size',777 (778 0,779 1001,780 ),781)782def test_readall_or_close(test_client, max_request_body_size):783 """Test a max_request_body_size of 0 (the default) and 1001."""784 old_max = test_client.server_instance.max_request_body_size785 test_client.server_instance.max_request_body_size = max_request_body_size786 conn = test_client.get_connection()787 # Get a POST page with an error788 conn.putrequest('POST', '/err_before_read', skip_host=True)789 conn.putheader('Host', conn.host)790 conn.putheader('Content-Type', 'text/plain')791 conn.putheader('Content-Length', '1000')792 conn.putheader('Expect', '100-continue')793 conn.endheaders()794 response = conn.response_class(conn.sock, method='POST')795 # ...assert and then skip the 100 response796 _version, status, _reason = response._read_status()797 assert status == 100798 skip = True799 while skip:800 skip = response.fp.readline().strip()801 # ...send the body802 conn.send(b'x' * 1000)803 # ...get the final response804 response.begin()805 status_line, _actual_headers, actual_resp_body = webtest.shb(response)806 actual_status = int(status_line[:3])807 assert actual_status == 500808 # Now try a working page with an Expect 
header...809 conn._output(b'POST /upload HTTP/1.1')810 conn._output(('Host: %s' % conn.host).encode('ascii'))811 conn._output(b'Content-Type: text/plain')812 conn._output(b'Content-Length: 17')813 conn._output(b'Expect: 100-continue')814 conn._send_output()815 response = conn.response_class(conn.sock, method='POST')816 # ...assert and then skip the 100 response817 version, status, reason = response._read_status()818 assert status == 100819 skip = True820 while skip:821 skip = response.fp.readline().strip()822 # ...send the body823 body = b'I am a small file'824 conn.send(body)825 # ...get the final response826 response.begin()827 status_line, actual_headers, actual_resp_body = webtest.shb(response)828 actual_status = int(status_line[:3])829 assert actual_status == 200830 expected_resp_body = ("thanks for '%s'" % body).encode()831 assert actual_resp_body == expected_resp_body832 conn.close()833 test_client.server_instance.max_request_body_size = old_max834def test_No_Message_Body(test_client):835 """Test HTTP queries with an empty response body."""836 # Initialize a persistent HTTP connection837 http_connection = test_client.get_connection()838 http_connection.auto_open = False839 http_connection.connect()840 # Make the first request and assert there's no "Connection: close".841 status_line, actual_headers, actual_resp_body = test_client.get(842 '/pov', http_conn=http_connection,843 )844 actual_status = int(status_line[:3])845 assert actual_status == 200846 assert status_line[4:] == 'OK'847 assert actual_resp_body == pov.encode()848 assert not header_exists('Connection', actual_headers)849 # Make a 204 request on the same connection.850 status_line, actual_headers, actual_resp_body = test_client.get(851 '/custom/204', http_conn=http_connection,852 )853 actual_status = int(status_line[:3])854 assert actual_status == 204855 assert not header_exists('Content-Length', actual_headers)856 assert actual_resp_body == b''857 assert not header_exists('Connection', 
actual_headers)858 # Make a 304 request on the same connection.859 status_line, actual_headers, actual_resp_body = test_client.get(860 '/custom/304', http_conn=http_connection,861 )862 actual_status = int(status_line[:3])863 assert actual_status == 304864 assert not header_exists('Content-Length', actual_headers)865 assert actual_resp_body == b''866 assert not header_exists('Connection', actual_headers)867@pytest.mark.xfail(868 reason=unwrap(869 trim("""870 Headers from earlier request leak into the request871 line for a subsequent request, resulting in 400872 instead of 413. See cherrypy/cheroot#69 for details.873 """),874 ),875)876def test_Chunked_Encoding(test_client):877 """Test HTTP uploads with chunked transfer-encoding."""878 # Initialize a persistent HTTP connection879 conn = test_client.get_connection()880 # Try a normal chunked request (with extensions)881 body = (882 b'8;key=value\r\nxx\r\nxxxx\r\n5\r\nyyyyy\r\n0\r\n'883 b'Content-Type: application/json\r\n'884 b'\r\n'885 )886 conn.putrequest('POST', '/upload', skip_host=True)887 conn.putheader('Host', conn.host)888 conn.putheader('Transfer-Encoding', 'chunked')889 conn.putheader('Trailer', 'Content-Type')890 # Note that this is somewhat malformed:891 # we shouldn't be sending Content-Length.892 # RFC 2616 says the server should ignore it.893 conn.putheader('Content-Length', '3')894 conn.endheaders()895 conn.send(body)896 response = conn.getresponse()897 status_line, _actual_headers, actual_resp_body = webtest.shb(response)898 actual_status = int(status_line[:3])899 assert actual_status == 200900 assert status_line[4:] == 'OK'901 expected_resp_body = ("thanks for '%s'" % b'xx\r\nxxxxyyyyy').encode()902 assert actual_resp_body == expected_resp_body903 # Try a chunked request that exceeds server.max_request_body_size.904 # Note that the delimiters and trailer are included.905 body = b'\r\n'.join((b'3e3', b'x' * 995, b'0', b'', b''))906 conn.putrequest('POST', '/upload', skip_host=True)907 
conn.putheader('Host', conn.host)908 conn.putheader('Transfer-Encoding', 'chunked')909 conn.putheader('Content-Type', 'text/plain')910 # Chunked requests don't need a content-length911 # conn.putheader("Content-Length", len(body))912 conn.endheaders()913 conn.send(body)914 response = conn.getresponse()915 status_line, actual_headers, actual_resp_body = webtest.shb(response)916 actual_status = int(status_line[:3])917 assert actual_status == 413918 conn.close()919def test_Content_Length_in(test_client):920 """Try a non-chunked request where Content-Length exceeds limit.921 (server.max_request_body_size).922 Assert error before body send.923 """924 # Initialize a persistent HTTP connection925 conn = test_client.get_connection()926 conn.putrequest('POST', '/upload', skip_host=True)927 conn.putheader('Host', conn.host)928 conn.putheader('Content-Type', 'text/plain')929 conn.putheader('Content-Length', '9999')930 conn.endheaders()931 response = conn.getresponse()932 status_line, _actual_headers, actual_resp_body = webtest.shb(response)933 actual_status = int(status_line[:3])934 assert actual_status == 413935 expected_resp_body = (936 b'The entity sent with the request exceeds '937 b'the maximum allowed bytes.'938 )939 assert actual_resp_body == expected_resp_body940 conn.close()941def test_Content_Length_not_int(test_client):942 """Test that malicious Content-Length header returns 400."""943 status_line, _actual_headers, actual_resp_body = test_client.post(944 '/upload',945 headers=[946 ('Content-Type', 'text/plain'),947 ('Content-Length', 'not-an-integer'),948 ],949 )950 actual_status = int(status_line[:3])951 assert actual_status == 400952 assert actual_resp_body == b'Malformed Content-Length Header.'953@pytest.mark.parametrize(954 ('uri', 'expected_resp_status', 'expected_resp_body'),955 (956 (957 '/wrong_cl_buffered', 500,958 (959 b'The requested resource returned more bytes than the '960 b'declared Content-Length.'961 ),962 ),963 ('/wrong_cl_unbuffered', 200, b'I 
too'),964 ),965)966def test_Content_Length_out(967 test_client,968 uri, expected_resp_status, expected_resp_body,969):970 """Test response with Content-Length less than the response body.971 (non-chunked response)972 """973 conn = test_client.get_connection()974 conn.putrequest('GET', uri, skip_host=True)975 conn.putheader('Host', conn.host)976 conn.endheaders()977 response = conn.getresponse()978 status_line, _actual_headers, actual_resp_body = webtest.shb(response)979 actual_status = int(status_line[:3])980 assert actual_status == expected_resp_status981 assert actual_resp_body == expected_resp_body982 conn.close()983 # the server logs the exception that we had verified from the984 # client perspective. Tell the error_log verification that985 # it can ignore that message.986 test_client.server_instance.error_log.ignored_msgs.extend((987 # Python 3.7+:988 "ValueError('Response body exceeds the declared Content-Length.')",989 # Python 2.7-3.6 (macOS?):990 "ValueError('Response body exceeds the declared Content-Length.',)",991 ))992@pytest.mark.xfail(993 reason='Sometimes this test fails due to low timeout. 
'994 'Ref: https://github.com/cherrypy/cherrypy/issues/598',995)996def test_598(test_client):997 """Test serving large file with a read timeout in place."""998 # Initialize a persistent HTTP connection999 conn = test_client.get_connection()1000 remote_data_conn = urllib.request.urlopen(1001 '%s://%s:%s/one_megabyte_of_a'1002 % ('http', conn.host, conn.port),1003 )1004 buf = remote_data_conn.read(512)1005 time.sleep(timeout * 0.6)1006 remaining = (1024 * 1024) - 5121007 while remaining:1008 data = remote_data_conn.read(remaining)1009 if not data:1010 break1011 buf += data1012 remaining -= len(data)1013 assert len(buf) == 1024 * 10241014 assert buf == b'a' * 1024 * 10241015 assert remaining == 01016 remote_data_conn.close()1017@pytest.mark.parametrize(1018 'invalid_terminator',1019 (1020 b'\n\n',1021 b'\r\n\n',1022 ),1023)1024def test_No_CRLF(test_client, invalid_terminator):1025 """Test HTTP queries with no valid CRLF terminators."""1026 # Initialize a persistent HTTP connection1027 conn = test_client.get_connection()1028 # (b'%s' % b'') is not supported in Python 3.4, so just use bytes.join()1029 conn.send(b''.join((b'GET /hello HTTP/1.1', invalid_terminator)))1030 response = conn.response_class(conn.sock, method='GET')1031 response.begin()1032 actual_resp_body = response.read()1033 expected_resp_body = b'HTTP requires CRLF terminators'1034 assert actual_resp_body == expected_resp_body1035 conn.close()1036class FaultySelect:1037 """Mock class to insert errors in the selector.select method."""1038 def __init__(self, original_select):1039 """Initilize helper class to wrap the selector.select method."""1040 self.original_select = original_select1041 self.request_served = False1042 self.os_error_triggered = False1043 def __call__(self, timeout):1044 """Intercept the calls to selector.select."""1045 if self.request_served:1046 self.os_error_triggered = True1047 raise OSError('Error while selecting the client socket.')1048 return self.original_select(timeout)1049class 
FaultyGetMap:1050 """Mock class to insert errors in the selector.get_map method."""1051 def __init__(self, original_get_map):1052 """Initilize helper class to wrap the selector.get_map method."""1053 self.original_get_map = original_get_map1054 self.sabotage_conn = False1055 self.conn_closed = False1056 def __call__(self):1057 """Intercept the calls to selector.get_map."""1058 sabotage_targets = (1059 conn for _, (_, _, _, conn) in self.original_get_map().items()1060 if isinstance(conn, cheroot.server.HTTPConnection)1061 ) if self.sabotage_conn and not self.conn_closed else ()1062 for conn in sabotage_targets:1063 # close the socket to cause OSError1064 conn.close()1065 self.conn_closed = True1066 return self.original_get_map()1067def test_invalid_selected_connection(test_client, monkeypatch):1068 """Test the error handling segment of HTTP connection selection.1069 See :py:meth:`cheroot.connections.ConnectionManager.get_conn`.1070 """1071 # patch the select method1072 faux_select = FaultySelect(1073 test_client.server_instance._connections._selector.select,1074 )1075 monkeypatch.setattr(1076 test_client.server_instance._connections._selector,1077 'select',1078 faux_select,1079 )1080 # patch the get_map method1081 faux_get_map = FaultyGetMap(1082 test_client.server_instance._connections._selector._selector.get_map,1083 )1084 monkeypatch.setattr(1085 test_client.server_instance._connections._selector._selector,1086 'get_map',1087 faux_get_map,1088 )1089 # request a page with connection keep-alive to make sure1090 # we'll have a connection to be modified.1091 resp_status, _resp_headers, _resp_body = test_client.request(1092 '/page1', headers=[('Connection', 'Keep-Alive')],1093 )1094 assert resp_status == '200 OK'1095 # trigger the internal errors1096 faux_get_map.sabotage_conn = faux_select.request_served = True1097 # give time to make sure the error gets handled1098 time.sleep(test_client.server_instance.expiration_interval * 2)1099 assert 
faux_select.os_error_triggered...

Full Screen

Full Screen

_network.py

Source:_network.py Github

copy

Full Screen

...137 @property138 def headers(self) -> Headers:139 return self._provisional_headers.headers()140 async def all_headers(self) -> Headers:141 return (await self._actual_headers()).headers()142 async def headers_array(self) -> HeadersArray:143 return (await self._actual_headers()).headers_array()144 async def header_value(self, name: str) -> Optional[str]:145 return (await self._actual_headers()).get(name)146 async def _actual_headers(self) -> "RawHeaders":147 if not self._all_headers_future:148 self._all_headers_future = asyncio.Future()149 headers = await self._channel.send("rawRequestHeaders")150 self._all_headers_future.set_result(RawHeaders(headers))151 return await self._all_headers_future152class Route(ChannelOwner):153 def __init__(154 self, parent: ChannelOwner, type: str, guid: str, initializer: Dict155 ) -> None:156 super().__init__(parent, type, guid, initializer)157 def __repr__(self) -> str:158 return f"<Route request={self.request}>"159 @property160 def request(self) -> Request:161 return from_channel(self._initializer["request"])162 async def abort(self, errorCode: str = None) -> None:163 await self._race_with_page_close(164 self._channel.send("abort", locals_to_params(locals()))165 )166 async def fulfill(167 self,168 status: int = None,169 headers: Dict[str, str] = None,170 body: Union[str, bytes] = None,171 path: Union[str, Path] = None,172 contentType: str = None,173 response: "APIResponse" = None,174 ) -> None:175 params = locals_to_params(locals())176 if response:177 del params["response"]178 params["status"] = (179 params["status"] if params.get("status") else response.status180 )181 params["headers"] = (182 params["headers"] if params.get("headers") else response.headers183 )184 from playwright._impl._fetch import APIResponse185 if body is None and path is None and isinstance(response, APIResponse):186 if response._request._connection is self._connection:187 params["fetchResponseUid"] = response._fetch_uid188 else:189 body = await 
response.body()190 length = 0191 if isinstance(body, str):192 params["body"] = body193 params["isBase64"] = False194 length = len(body.encode())195 elif isinstance(body, bytes):196 params["body"] = base64.b64encode(body).decode()197 params["isBase64"] = True198 length = len(body)199 elif path:200 del params["path"]201 file_content = Path(path).read_bytes()202 params["body"] = base64.b64encode(file_content).decode()203 params["isBase64"] = True204 length = len(file_content)205 headers = {k.lower(): str(v) for k, v in params.get("headers", {}).items()}206 if params.get("contentType"):207 headers["content-type"] = params["contentType"]208 elif path:209 headers["content-type"] = (210 mimetypes.guess_type(str(Path(path)))[0] or "application/octet-stream"211 )212 if length and "content-length" not in headers:213 headers["content-length"] = str(length)214 params["headers"] = serialize_headers(headers)215 await self._race_with_page_close(self._channel.send("fulfill", params))216 async def continue_(217 self,218 url: str = None,219 method: str = None,220 headers: Dict[str, str] = None,221 postData: Union[str, bytes] = None,222 ) -> None:223 overrides: ContinueParameters = {}224 if url:225 overrides["url"] = url226 if method:227 overrides["method"] = method228 if headers:229 overrides["headers"] = serialize_headers(headers)230 if isinstance(postData, str):231 overrides["postData"] = base64.b64encode(postData.encode()).decode()232 elif isinstance(postData, bytes):233 overrides["postData"] = base64.b64encode(postData).decode()234 await self._race_with_page_close(235 self._channel.send("continue", cast(Any, overrides))236 )237 def _internal_continue(self) -> None:238 async def continue_route() -> None:239 try:240 await self.continue_()241 except Exception:242 pass243 asyncio.create_task(continue_route())244 async def _race_with_page_close(self, future: Coroutine) -> None:245 if hasattr(self.request.frame, "_page"):246 page = self.request.frame._page247 # When page closes or 
crashes, we catch any potential rejects from this Route.248 # Note that page could be missing when routing popup's initial request that249 # does not have a Page initialized just yet.250 fut = asyncio.create_task(future)251 await asyncio.wait(252 [fut, page._closed_or_crashed_future],253 return_when=asyncio.FIRST_COMPLETED,254 )255 if page._closed_or_crashed_future.done():256 await asyncio.gather(fut, return_exceptions=True)257 else:258 await future259class Response(ChannelOwner):260 def __init__(261 self, parent: ChannelOwner, type: str, guid: str, initializer: Dict262 ) -> None:263 super().__init__(parent, type, guid, initializer)264 self._request: Request = from_channel(self._initializer["request"])265 timing = self._initializer["timing"]266 self._request._timing["startTime"] = timing["startTime"]267 self._request._timing["domainLookupStart"] = timing["domainLookupStart"]268 self._request._timing["domainLookupEnd"] = timing["domainLookupEnd"]269 self._request._timing["connectStart"] = timing["connectStart"]270 self._request._timing["secureConnectionStart"] = timing["secureConnectionStart"]271 self._request._timing["connectEnd"] = timing["connectEnd"]272 self._request._timing["requestStart"] = timing["requestStart"]273 self._request._timing["responseStart"] = timing["responseStart"]274 self._provisional_headers = RawHeaders(275 cast(HeadersArray, self._initializer["headers"])276 )277 self._raw_headers_future: Optional[asyncio.Future[RawHeaders]] = None278 self._finished_future: asyncio.Future[bool] = asyncio.Future()279 def __repr__(self) -> str:280 return f"<Response url={self.url!r} request={self.request}>"281 @property282 def url(self) -> str:283 return self._initializer["url"]284 @property285 def ok(self) -> bool:286 # Status 0 is for file:// URLs287 return self._initializer["status"] == 0 or (288 self._initializer["status"] >= 200 and self._initializer["status"] <= 299289 )290 @property291 def status(self) -> int:292 return self._initializer["status"]293 
@property294 def status_text(self) -> str:295 return self._initializer["statusText"]296 @property297 def headers(self) -> Headers:298 return self._provisional_headers.headers()299 async def all_headers(self) -> Headers:300 return (await self._actual_headers()).headers()301 async def headers_array(self) -> HeadersArray:302 return (await self._actual_headers()).headers_array()303 async def header_value(self, name: str) -> Optional[str]:304 return (await self._actual_headers()).get(name)305 async def header_values(self, name: str) -> List[str]:306 return (await self._actual_headers()).get_all(name)307 async def _actual_headers(self) -> "RawHeaders":308 if not self._raw_headers_future:309 self._raw_headers_future = asyncio.Future()310 headers = cast(HeadersArray, await self._channel.send("rawResponseHeaders"))311 self._raw_headers_future.set_result(RawHeaders(headers))312 return await self._raw_headers_future313 async def server_addr(self) -> Optional[RemoteAddr]:314 return await self._channel.send("serverAddr")315 async def security_details(self) -> Optional[SecurityDetails]:316 return await self._channel.send("securityDetails")317 async def finished(self) -> None:318 await self._finished_future319 async def body(self) -> bytes:320 binary = await self._channel.send("body")321 return base64.b64decode(binary)...

Full Screen

Full Screen

tabular.py

Source:tabular.py Github

copy

Full Screen

1# This file is part of the GBI project.2# Copyright (C) 2012 Omniscale GmbH & Co. KG <http://omniscale.com>3#4# Licensed under the Apache License, Version 2.0 (the "License");5# you may not use this file except in compliance with the License.6# You may obtain a copy of the License at7#8# http://www.apache.org/licenses/LICENSE-2.09#10# Unless required by applicable law or agreed to in writing, software11# distributed under the License is distributed on an "AS IS" BASIS,12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.13# See the License for the specific language governing permissions and14# limitations under the License.15"""Convert GeoJSON-like datastructures to tabular datasets"""16from io import BytesIO17import csv18from geobox.ext.odf import opendocument, style, table, text19class Tabular(object):20 """21 Collection dict-items into a list of rows.22 :param headers: expected headers. columns will be sorted23 in this order.24 """25 def __init__(self, headers=None, additional_headers=False):26 self.headers = headers or []27 self.additional_headers = additional_headers28 self._actual_headers = set()29 self._row_dicts = []30 def add(self, item):31 """32 Add dictionary as a new row.33 """34 self._actual_headers.update(set(item.keys()))35 self._row_dicts.append(item)36 def as_rows(self, with_headers=False):37 """38 Return all rows. 
The first row contains the39 header, if with_headers is true.40 columns are sorted by Taular.headers first, then any additional41 keys that are found in the added row dicts.42 """43 headers = list(self.headers)44 if self.additional_headers:45 for h in self._actual_headers:46 if h not in headers:47 headers.append(h)48 rows = []49 if with_headers:50 rows.append(headers)51 for row_dict in self._row_dicts:52 row = []53 for h in headers:54 row.append(row_dict.get(h))55 rows.append(row)56 return rows57def geojson_to_rows(doc, headers=None):58 """59 Collect properties of all GeoJSON Features as list of rows.60 :param doc: GeoJSON dictionary with FeatureCollection or Feature61 :param headers: List of expected property keys. Expected keys appear62 as the first columns in the output rows.63 :returns: list of rows, first row contains header names (property keys).64 """65 tabular = Tabular(headers)66 _add_geojson(tabular, doc)67 return tabular.as_rows(with_headers=True)68def _add_geojson(tabular, doc):69 if doc.get('type') == 'Feature':70 tabular.add(doc.get('properties', {}))71 elif doc.get('type') == 'FeatureCollection':72 for feature in doc.get('features', []):73 _add_geojson(tabular, feature)74odf_bold_style = style.Style(name="bold", family="paragraph")75odf_bold_style.addElement(style.TextProperties(fontweight="bold", fontweightasian="bold", fontweightcomplex="bold"))76def ods_export(rows, with_headers=False, name=None):77 """78 Export rows as OpenDocument spreadsheet.79 :params with_headers: If True, output first row as bold text.80 :params name: Name of the worksheet.81 :returns: ODS file content as string82 (use open(x, 'wb') when writing to a file).83 """84 # ODS export with code from tablib85 # (c) 2011 by Kenneth Reitz, MIT licensed86 wb = opendocument.OpenDocumentSpreadsheet()87 wb.automaticstyles.addElement(odf_bold_style)88 ws = table.Table(name=name or 'Export')89 wb.spreadsheet.addElement(ws)90 for i, row in enumerate(rows):91 row_number = i + 192 odf_row = 
table.TableRow(stylename=odf_bold_style, defaultcellstylename='bold')93 for j, col in enumerate(row):94 try:95 col = unicode(col, errors='ignore')96 except TypeError:97 ## col is already unicode98 pass99 ws.addElement(table.TableColumn())100 # bold headers101 if (row_number == 1) and with_headers:102 odf_row.setAttribute('stylename', odf_bold_style)103 ws.addElement(odf_row)104 cell = table.TableCell()105 p = text.P()106 p.addElement(text.Span(text=col, stylename=odf_bold_style))107 cell.addElement(p)108 odf_row.addElement(cell)109 # wrap the rest110 else:111 ws.addElement(odf_row)112 if isinstance(col, (int, float)):113 cell = table.TableCell(valuetype='float', value=str(col))114 else:115 cell = table.TableCell()116 cell.addElement(text.P(text=col))117 odf_row.addElement(cell)118 stream = BytesIO()119 wb.save(stream)120 return stream.getvalue()121def csv_export(rows):122 """123 Export rows as CSV file.124 :returns: CSV file content as string125 (use open(x, 'wb') when writing to a file).126 """127 stream = BytesIO()128 writer = csv.writer(stream)129 for row in rows:130 for i, col in enumerate(row):131 if isinstance(col, basestring):132 col = col.encode('latin-1', errors='replace')133 row[i] = col134 writer.writerow(row)135 return stream.getvalue()136if __name__ == '__main__':137 import json138 doc = json.load(open('geobox/test/test.geojson'))139 rows = geojson_to_rows(doc, headers=['Name'])140 with open('/tmp/foo.ods', 'wb') as f:141 f.write(ods_export(rows, with_headers=True))142 with open('/tmp/foo.csv', 'wb') as f:...

Full Screen

Full Screen

Playwright tutorial

LambdaTest’s Playwright tutorial will give you a broader idea about the Playwright automation framework, its unique features, and its use cases, with examples to deepen your understanding of Playwright testing. This tutorial offers A-to-Z guidance, from installing the Playwright framework to best practices and advanced concepts.

Chapters:

  1. What is Playwright : Playwright is comparatively new but has quickly become popular. Get to know some of Playwright's history, along with some interesting facts connected with it.
  2. How To Install Playwright : Learn in detail about what basic configuration and dependencies are required for installing Playwright and run a test. Get a step-by-step direction for installing the Playwright automation framework.
  3. Playwright Futuristic Features: Launched in 2020, Playwright quickly gained huge popularity because of helpful features such as the Playwright Test Generator and Inspector, the Playwright Reporter, the Playwright auto-waiting mechanism, etc. Read up on those features to master Playwright testing.
  4. What is Component Testing: Component testing in Playwright is a unique feature that allows a tester to test a single component of a web application without integrating it with other elements. Learn how to perform component testing with the Playwright automation framework.
  5. Inputs And Buttons In Playwright: Every website has Input boxes and buttons; learn about testing inputs and buttons with different scenarios and examples.
  6. Functions and Selectors in Playwright: Learn how to launch the Chromium browser with Playwright. Also, gain a better understanding of some important functions like “BrowserContext,” which allows you to run multiple browser sessions, and “newPage” which interacts with a page.
  7. Handling Alerts and Dropdowns in Playwright : Playwright interacts with different types of alerts and pop-ups, such as simple, confirmation, and prompt, and with different types of dropdowns, such as single selector and multi-selector. Get hands-on with handling alerts and dropdowns in Playwright testing.
  8. Playwright vs Puppeteer: Get to know about the difference between two testing frameworks and how they are different than one another, which browsers they support, and what features they provide.
  9. Run Playwright Tests on LambdaTest: Playwright testing with LambdaTest leverages test performance to the utmost. You can run multiple Playwright tests in parallel with the LambdaTest test cloud. Get a step-by-step guide to running your Playwright tests on the LambdaTest platform.
  10. Playwright Python Tutorial: The Playwright automation framework supports all major languages, such as Python, JavaScript, TypeScript, .NET, etc. However, there are various advantages to Python end-to-end testing with Playwright because of its versatile utility. Get the hang of Playwright Python testing with this chapter.
  11. Playwright End To End Testing Tutorial: Get your hands on with Playwright end-to-end testing and learn to use some exciting features such as TraceViewer, Debugging, Networking, Component testing, Visual testing, and many more.
  12. Playwright Video Tutorial: Watch the video tutorials on Playwright testing from experts and get a step-by-step, in-depth explanation of Playwright automation testing.

Run Playwright Python automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful