Best Python code snippet using playwright-python
test_wsgi_apps.py
Source:test_wsgi_apps.py  
...32        with self.assertRaisesRegex(AssertionError, r'expect true-value'):33            self.response.status = 40434        self.assertIs(self.response.status, consts.Statuses.BAD_REQUEST)35    def test_headers(self):36        def assert_headers(headers):37            self.assertEqual(self.response.headers, headers)38            # Test methods of _Response.Headers class.39            self.assertEqual(sorted(self.response.headers), sorted(headers))40            self.assertEqual(bool(self.response.headers), bool(headers))41            self.assertEqual(len(self.response.headers), len(headers))42            for key in headers:43                with self.subTest(key):44                    self.assertIn(key, self.response.headers)45                    self.assertEqual(self.response.headers[key], headers[key])46            self.assertNotIn('no-such-key', self.response.headers)47        assert_headers({})48        self.response.headers['p'] = 'q'49        self.response.headers['r'] = 's'50        assert_headers({'p': 'q', 'r': 's'})51        del self.response.headers['r']52        assert_headers({'p': 'q'})53        self.response.commit()54        assert_headers({'p': 'q'})55        with self.assertRaisesRegex(AssertionError, r'expect true-value'):56            self.response.headers['x'] = 'y'57        with self.assertRaisesRegex(AssertionError, r'expect true-value'):58            del self.response.headers['x']59        assert_headers({'p': 'q'})60        self.response.close()61        assert_headers({'p': 'q'})62        with self.assertRaisesRegex(AssertionError, r'expect true-value'):63            self.response.headers['x'] = 'y'64        with self.assertRaisesRegex(AssertionError, r'expect true-value'):65            del self.response.headers['x']66        assert_headers({'p': 'q'})67    @kernels.with_kernel68    def test_reset(self):69        self.response.status = 40070        self.response.headers['p'] = 'q'71        kernels.run(self.response.write(b'hello 
world'))72        self.response.reset()73        self.assertIs(self.response.status, consts.Statuses.OK)74        self.assertEqual(self.response.headers, {})75        self.assertIsNone(self.response._precommit.read_nonblocking())76        self.response.commit()77        with self.assertRaisesRegex(AssertionError, r'expect true-value'):78            self.response.reset()79        self.response.close()80        with self.assertRaisesRegex(AssertionError, r'expect true-value'):...utils.py
Source:utils.py  
import datetime
import hashlib
import hmac
from copy import deepcopy
from functools import wraps
from urllib.parse import urlparse, parse_qsl, quote

SCOPE = 'dl1_request'

# Digest names accepted as the last component of a 'DL-HMAC-<NAME>' algorithm.
_SUPPORTED_HASHES = ('sha224', 'sha256', 'sha384', 'sha512')


def _require(condition, message=''):
    """Raise ``AssertionError(message)`` unless ``condition`` is truthy.

    Replaces bare ``assert`` statements for input validation: ``assert`` is
    removed when Python runs with ``-O``, which would silently disable the
    checks.  ``AssertionError`` is kept as the exception type so existing
    callers that catch it keep working.
    """
    if not condition:
        raise AssertionError(message)


class DLSigner(object):
    """Builds ``Authorization`` headers for DL-HMAC signed requests.

    An instance holds the values that stay constant between requests
    (service, keys, algorithm, solution); :meth:`sign` derives the
    per-request signature.
    """

    __slots__ = ['service', 'access_key', 'secret_key', 'algorithm', 'solution', 'hash_method']

    def __init__(self, service, access_key, secret_key, algorithm='DL-HMAC-SHA256', solution='RING'):
        """
        Signer initialization, accepts arguments that are constant in
        signing process and not related to specific request

        :param service: Name of the service; becomes part of the scope.
        :param access_key: Key that allows you to access to API
        :param secret_key: Key used to derive the signing key.
        :param algorithm: One of following hashing algorithms:
            * DL-HMAC-SHA224,
            * DL-HMAC-SHA256, - if algorithm param is missing, used as default value
            * DL-HMAC-SHA384,
            * DL-HMAC-SHA512
        :param solution: Solution which aggregates a several services.
        :raises AssertionError: if a required argument is empty or the
            algorithm is not one of the supported ones.
        """
        _require(service, 'Missing service parameter.')
        _require(access_key, 'Missing access_key parameter.')
        _require(secret_key, 'Missing secret_key parameter')
        _require(algorithm.startswith('DL-HMAC-SHA'), 'Invalid hashing method.')
        hash_name = algorithm.split('-')[-1].lower()
        _require(hash_name in _SUPPORTED_HASHES, 'Invalid hashing algorithm.')

        self.service = service
        self.access_key = access_key
        self.secret_key = secret_key
        self.solution = solution
        self.algorithm = algorithm
        # Stored as the hashlib constructor itself (e.g. hashlib.sha256).
        self.hash_method = getattr(hashlib, hash_name)

    @staticmethod
    def _check_sign_params(request):
        """Validate the request dictionary and return a deep copy of it.

        The copy gets an ``X-DL-Date`` header (current UTC time) when the
        caller did not supply one under any casing; the caller's dictionary
        is never modified.

        :raises AssertionError: if headers, Host or Content-Type are missing,
            or the body is neither ``bytes`` nor ``bytearray``.
        """
        _require('headers' in request, 'Missing headers.')
        header_names = {name.lower() for name in request['headers']}
        _require('host' in header_names, 'Missing Host parameter.')
        if 'body' in request:
            # bytes is accepted as well as bytearray: the default payload is
            # b'' and the error message has always asked for bytes.
            _require(
                isinstance(request['body'], (bytes, bytearray)),
                f'Body must be instance of bytes. not {type(request["body"])}',
            )
        _require('content-type' in header_names, 'Missing Content-Type parameter.')

        copied_request = deepcopy(request)
        if 'x-dl-date' not in header_names:
            # Same output as the deprecated datetime.utcnow().
            now = datetime.datetime.now(datetime.timezone.utc)
            copied_request['headers']['X-DL-Date'] = now.strftime('%Y%m%dT%H%M%SZ')
        return copied_request

    @staticmethod
    def _get_date(request):
        """Return the value of the X-DL-Date header, matched case-insensitively.

        Header presence is validated case-insensitively, so the lookup has to
        be as well; otherwise a caller-supplied ``x-dl-date`` header would
        pass validation and then fail with ``KeyError`` here.
        """
        headers = request['headers']
        if 'X-DL-Date' in headers:
            return headers['X-DL-Date']
        for name, value in headers.items():
            if name.lower() == 'x-dl-date':
                return value
        raise KeyError('X-DL-Date')

    def _get_scope(self, date):
        """Return '<YYYYMMDD>/<solution>/<service>/<SCOPE>' for the given date string."""
        return date[:8] + '/' + self.solution + '/' + self.service + '/' + SCOPE

    def _sign(self, key, msg, hex_output=False):
        """Performs hashing, returns digest or hexdigest depending on 'hex_output' argument."""
        key = key if isinstance(key, bytes) else key.encode('utf-8')
        msg = msg if isinstance(msg, bytes) else msg.encode('utf-8')
        sign = hmac.new(key, msg, self.hash_method)
        return sign.hexdigest() if hex_output else sign.digest()

    def _get_canonical_request(self, request):
        """Return formatted string of canonical request data.

        Layout (newline separated): method, quoted path, sorted and quoted
        query parameters, canonical headers, signed headers, payload hash.
        """
        method = request['method']
        uri = urlparse(request.get('uri', '/'))
        payload = request.get('body', b'')
        headers = self._get_headers(request)
        params = parse_qsl(uri.query, keep_blank_values=True)
        params = '&'.join(
            '{}={}'.format(quote(k, safe='-_.~'), quote(v, safe='-_.~'))
            for k, v in sorted(params)
        )
        return "{method}\n{uri}\n{params}\n{canonical_headers}\n{signed_headers}\n{payload_hash}".format(
            method=method,
            uri=quote(uri.path, safe='/-_.~'),
            params=params,
            canonical_headers=headers['canonical_headers'],
            signed_headers=headers['signed_headers'],
            payload_hash=self.hash_method(payload).hexdigest()
        )

    @staticmethod
    def _get_headers(request):
        """Method returning dictionary with formatted strings of canonical_headers and signed_headers.

        Headers are ordered by lower-cased name; names are lower-cased and
        values stripped of surrounding whitespace.
        """
        ordered = sorted(request['headers'].items(), key=lambda item: item[0].lower())
        canonical_headers = '\n'.join(
            '{}:{}'.format(name.lower(), value.strip()) for name, value in ordered
        )
        signed_headers = ';'.join(name.lower() for name, _ in ordered)
        return {'canonical_headers': canonical_headers,
                'signed_headers': signed_headers}

    def _get_string_to_sign(self, canonical_request, date):
        """Return the string to sign: algorithm, date, scope and canonical request hash."""
        return "{algorithm}\n{date}\n{scope}\n{canonical_request_hash}".format(
            algorithm=self.algorithm,
            date=date,
            scope=self._get_scope(date),
            canonical_request_hash=self.hash_method(canonical_request.encode('utf-8')).hexdigest()
        )

    def _get_signing_key(self, date):
        """Derive the signing key by chaining HMACs over date, solution, service and SCOPE."""
        key = self._sign('DL' + self.secret_key, date[:8])
        key = self._sign(key, self.solution)
        key = self._sign(key, self.service)
        key = self._sign(key, SCOPE)
        return key

    def _get_signature(self, request):
        """Get_signature is calling other methods to process data to finally
            return a signature. """
        date = self._get_date(request)
        canonical_request = self._get_canonical_request(request)
        string_to_sign = self._get_string_to_sign(canonical_request, date)
        signing_key = self._get_signing_key(date)
        return self._sign(signing_key, string_to_sign, True)

    def sign(self, original_request):
        """
        Signs request and returns dictionary with parameters required for authorization process.
            :param original_request: has to be an instance of dict with keys:
                * method: - with values POST/GET/PUT/DELETE
                * uri: URI of the request. If there is no URI given in request dict,
                program will insert default value of URI.
                * headers: - headers of your requests. This key has to be a dictionary.
                    Into headers you have to put 'host' and 'content-type' keys.
                * body: - optional, bytes or bytearray.
        :returns: dict with 'Authorization' and 'X-DL-Date' keys.
        :raises AssertionError: if the request fails validation.
        """
        request = self._check_sign_params(original_request)
        date = self._get_date(request)
        return {
            'Authorization':
                '{algorithm} Credential={credentials},SignedHeaders={signed_headers},Signature={signature}'.format(
                    algorithm=self.algorithm.upper(),
                    credentials=self.access_key + '/' + self._get_scope(date),
                    signed_headers=self._get_headers(request)['signed_headers'],
                    signature=self._get_signature(request)
                ),
            'X-DL-Date': date}


class ClientError(Exception):
    """Error carrying a code, a message and optional extra data."""

    def __init__(self, code, message, data=''):
        # Forward to Exception so that ``args`` is populated.
        super().__init__(code, message, data)
        self.code = code
        self.message = message
        self.data = data

    def __str__(self):
        return f'code={self.code} message={self.message}' \
            f'{f" data={self.data}" if self.data else ""}'


def attr_generator(cls):
    """Return a class decorator that installs a dynamic ``__getattr__``.

    On the decorated class, looking up a missing attribute whose name is a
    valid identifier not starting with '_' returns ``cls(name, instance)``;
    any other missing name raises ``AttributeError``.
    """
    # Named so that it does not shadow the builtin ``getattr``.
    def _dynamic_getattr(self, item):
        if item.isidentifier() and not item.startswith('_'):
            return cls(item, self)
        raise AttributeError(f'{cls.__name__} must be public method')

    def add_attr(klass):
        klass.__getattr__ = _dynamic_getattr
        return klass

    return add_attr


def convert(func, iterable_type):
    @wraps(func)
    def converted(*args, **kwargs):
        return iterable_type(func(*args, **kwargs))
    # ... (snippet is cut off here in the source; the rest of this
    # definition is not visible and has not been reconstructed)
# test_confluence.py
Source:test_confluence.py  
...115        self.api._session = client116        got = self.api.get_author('foo')117        self.assertEqual(got['userKey'], userKey)118class TestConfluenceHeaders(unittest.TestCase):119    def assert_headers(self, got, want):120        for name, value in want.items():121            self.assertEqual(got[name], value)122    def testHeaders(self):123        headers = ['Cookie: NAME=VALUE', 'X-CUSTOM-HEADER: VALUE']124        want = {'Cookie': 'NAME=VALUE', 'X-CUSTOM-HEADER': 'VALUE'}125        api = Confluence(api_url='https://wiki.example.com/rest/api',126                         headers=headers)127        self.assert_headers(api._session.headers, want)128    def testHeadersDuplicates(self):129        # HTTP headers are case insensitive. If multiple headers with the same130        # name are passed, the last one should win.131        headers = ['X-CUSTOM-HEADER: foo', 'X-Custom-Header: bar', 'x-custom-header: baz']132        want = {'x-custom-header': 'baz'}133        api = Confluence(api_url='https://wiki.example.com/rest/api',134                         headers=headers)135        self.assert_headers(api._session.headers, want)136    def testHeadersNoValue(self):137        # If no value is set, an empty string should be used.138        headers = ['X-CUSTOM-HEADER-1:', 'X-CUSTOM-HEADER-2']139        want = {'X-CUSTOM-HEADER-1': '', 'X-CUSTOM-HEADER-2': ''}140        api = Confluence(api_url='https://wiki.example.com/rest/api',141                         headers=headers)142        self.assert_headers(api._session.headers, want)143if __name__ == '__main__':...test_download.py
Source:test_download.py  
...18    # our google test urls dont support HEAD19    req = requests.get(url)20    # we test against binary response: Content-Length not accurate as gzip-encoded21    assert file.stat().st_size == len(req.content)22def assert_headers(returned_headers):23    assert isinstance(returned_headers, requests.structures.CaseInsensitiveDict)24    assert returned_headers["Content-Type"] == "image/x-icon"25def get_dest_file(tmp_path):26    return tmp_path.joinpath("favicon.ico")27def test_missing_dest(tmp_path):28    with pytest.raises(requests.exceptions.ConnectionError):29        stream_file(url="http://some_url", byte_stream=io.BytesIO())30def test_invalid_url(tmp_path, invalid_url):31    dest_file = tmp_path / "favicon.ico"32    with pytest.raises(requests.exceptions.ConnectionError):33        stream_file(url=invalid_url, fpath=dest_file)34def test_no_output_supplied(valid_http_url):35    with pytest.raises(36        ValueError, match="Either file path or a bytesIO object is needed"37    ):38        stream_file(url=valid_http_url)39def test_first_block_download(valid_http_url):40    byte_stream = io.BytesIO()41    size, ret = stream_file(42        url=valid_http_url, byte_stream=byte_stream, only_first_block=True43    )44    assert_headers(ret)45    # valid_http_url randomly returns gzip-encoded content.46    # otherwise, expected size is default block size47    expected = 3062 if ret.get("Content-Encoding") == "gzip" else 102448    assert len(byte_stream.read()) <= expected49@pytest.mark.slow50def test_save_http(tmp_path, valid_http_url):51    dest_file = tmp_path / "favicon.ico"52    size, ret = stream_file(url=valid_http_url, fpath=dest_file)53    assert_headers(ret)54    assert_downloaded_file(valid_http_url, dest_file)55@pytest.mark.slow56def test_save_https(tmp_path, valid_https_url):57    dest_file = tmp_path / "favicon.ico"58    size, ret = stream_file(url=valid_https_url, fpath=dest_file)59    assert_headers(ret)60    assert_downloaded_file(valid_https_url, 
dest_file)61@pytest.mark.slow62def test_stream_to_bytes(valid_https_url):63    byte_stream = io.BytesIO()64    size, ret = stream_file(url=valid_https_url, byte_stream=byte_stream)65    assert_headers(ret)66    assert byte_stream.read() == requests.get(valid_https_url).content67@pytest.mark.slow68def test_save_parent_folder_missing(tmp_path, valid_http_url):69    dest_file = tmp_path / "some-folder" / "favicon.ico"70    with pytest.raises(IOError):71        stream_file(url=valid_http_url, fpath=dest_file)72@pytest.mark.slow73def test_save_http_error(tmp_path, http_error_url):74    dest_file = tmp_path / "favicon.ico"75    with pytest.raises(requests.exceptions.HTTPError):76        stream_file(url=http_error_url, fpath=dest_file)77@pytest.mark.slow78def test_large_download_http(tmp_path, valid_http_url):79    dest_file = tmp_path / "favicon.ico"...LambdaTest’s Playwright tutorial will give you a broader idea about the Playwright automation framework, its unique features, and use cases with examples to exceed your understanding of Playwright testing. This tutorial will give A to Z guidance, from installing the Playwright framework to some best practices and advanced concepts.
Get 100 minutes of automation testing FREE!!
