Best Python code snippet using playwright-python
test_http_headers.py
Source:test_http_headers.py  
...199    """200    Tests for the backwards compatible C{dict} interface for L{Headers}201    provided by L{_DictHeaders}.202    """203    def headers(self, **kw):204        """205        Create a L{Headers} instance populated with the header name/values206        specified by C{kw} and a L{_DictHeaders} wrapped around it and return207        them both.208        """209        h = Headers()210        for k, v in kw.items():211            h.setRawHeaders(k.encode('ascii'), v)212        return h, _DictHeaders(h)213    def test_getItem(self):214        """215        L{_DictHeaders.__getitem__} returns a single header for the given name.216        """217        headers, wrapper = self.headers(test=[b"lemur"])218        self.assertEqual(wrapper[b"test"], b"lemur")219    def test_getItemMultiple(self):220        """221        L{_DictHeaders.__getitem__} returns only the last header value for a222        given name.223        """224        headers, wrapper = self.headers(test=[b"lemur", b"panda"])225        self.assertEqual(wrapper[b"test"], b"panda")226    def test_getItemMissing(self):227        """228        L{_DictHeaders.__getitem__} raises L{KeyError} if called with a header229        which is not present.230        """231        headers, wrapper = self.headers()232        exc = self.assertRaises(KeyError, wrapper.__getitem__, b"test")233        self.assertEqual(exc.args, (b"test",))234    def test_iteration(self):235        """236        L{_DictHeaders.__iter__} returns an iterator the elements of which237        are the lowercase name of each header present.238        """239        headers, wrapper = self.headers(foo=[b"lemur", b"panda"], bar=[b"baz"])240        self.assertEqual(set(list(wrapper)), set([b"foo", b"bar"]))241    def test_length(self):242        """243        L{_DictHeaders.__len__} returns the number of headers present.244        """245        headers, wrapper = self.headers()246        self.assertEqual(len(wrapper), 0)247        headers.setRawHeaders(b"foo", [b"bar"])248        self.assertEqual(len(wrapper), 1)249        headers.setRawHeaders(b"test", [b"lemur", b"panda"])250        self.assertEqual(len(wrapper), 2)251    def test_setItem(self):252        """253        L{_DictHeaders.__setitem__} sets a single header value for the given254        name.255        """256        headers, wrapper = self.headers()257        wrapper[b"test"] = b"lemur"258        self.assertEqual(headers.getRawHeaders(b"test"), [b"lemur"])259    def test_setItemOverwrites(self):260        """261        L{_DictHeaders.__setitem__} will replace any previous header values for262        the given name.263        """264        headers, wrapper = self.headers(test=[b"lemur", b"panda"])265        wrapper[b"test"] = b"lemur"266        self.assertEqual(headers.getRawHeaders(b"test"), [b"lemur"])267    def test_delItem(self):268        """269        L{_DictHeaders.__delitem__} will remove the header values for the given270        name.271        """272        headers, wrapper = self.headers(test=[b"lemur"])273        del wrapper[b"test"]274        self.assertFalse(headers.hasHeader(b"test"))275    def test_delItemMissing(self):276        """277        L{_DictHeaders.__delitem__} will raise L{KeyError} if the given name is278        not present.279        """280        headers, wrapper = self.headers()281        exc = self.assertRaises(KeyError, wrapper.__delitem__, b"test")282        self.assertEqual(exc.args, (b"test",))283    def test_keys(self, _method='keys', _requireList=not _PY3):284        """285        L{_DictHeaders.keys} will return a list of all present header names.286        """287        headers, wrapper = self.headers(test=[b"lemur"], foo=[b"bar"])288        keys = getattr(wrapper, _method)()289        if _requireList:290            self.assertIsInstance(keys, list)291        self.assertEqual(set(keys), set([b"foo", b"test"]))292    def test_iterkeys(self):293        """294        L{_DictHeaders.iterkeys} will return all present header names.295        """296        self.test_keys('iterkeys', False)297    def test_values(self, _method='values', _requireList=not _PY3):298        """299        L{_DictHeaders.values} will return a list of all present header values,300        returning only the last value for headers with more than one.301        """302        headers, wrapper = self.headers(303            foo=[b"lemur"], bar=[b"marmot", b"panda"])304        values = getattr(wrapper, _method)()305        if _requireList:306            self.assertIsInstance(values, list)307        self.assertEqual(set(values), set([b"lemur", b"panda"]))308    def test_itervalues(self):309        """310        L{_DictHeaders.itervalues} will return all present header values,311        returning only the last value for headers with more than one.312        """313        self.test_values('itervalues', False)314    def test_items(self, _method='items', _requireList=not _PY3):315        """316        L{_DictHeaders.items} will return a list of all present header names317        and values as tuples, returning only the last value for headers with318        more than one.319        """320        headers, wrapper = self.headers(321            foo=[b"lemur"], bar=[b"marmot", b"panda"])322        items = getattr(wrapper, _method)()323        if _requireList:324            self.assertIsInstance(items, list)325        self.assertEqual(326            set(items), set([(b"foo", b"lemur"), (b"bar", b"panda")]))327    def test_iteritems(self):328        """329        L{_DictHeaders.iteritems} will return all present header names and330        values as tuples, returning only the last value for headers with more331        than one.332        """333        self.test_items('iteritems', False)334    def test_clear(self):335        """336        L{_DictHeaders.clear} will remove all headers.337        """338        headers, wrapper = self.headers(foo=[b"lemur"], bar=[b"panda"])339        wrapper.clear()340        self.assertEqual(list(headers.getAllRawHeaders()), [])341    def test_copy(self):342        """343        L{_DictHeaders.copy} will return a C{dict} with all the same headers344        and the last value for each.345        """346        headers, wrapper = self.headers(347            foo=[b"lemur", b"panda"], bar=[b"marmot"])348        duplicate = wrapper.copy()349        self.assertEqual(duplicate, {b"foo": b"panda", b"bar": b"marmot"})350    def test_get(self):351        """352        L{_DictHeaders.get} returns the last value for the given header name.353        """354        headers, wrapper = self.headers(foo=[b"lemur", b"panda"])355        self.assertEqual(wrapper.get(b"foo"), b"panda")356    def test_getMissing(self):357        """358        L{_DictHeaders.get} returns C{None} for a header which is not present.359        """360        headers, wrapper = self.headers()361        self.assertIdentical(wrapper.get(b"foo"), None)362    def test_getDefault(self):363        """364        L{_DictHeaders.get} returns the last value for the given header name365        even when it is invoked with a default value.366        """367        headers, wrapper = self.headers(foo=[b"lemur"])368        self.assertEqual(wrapper.get(b"foo", b"bar"), b"lemur")369    def test_getDefaultMissing(self):370        """371        L{_DictHeaders.get} returns the default value specified if asked for a372        header which is not present.373        """374        headers, wrapper = self.headers()375        self.assertEqual(wrapper.get(b"foo", b"bar"), b"bar")376    def test_has_key(self):377        """378        L{_DictHeaders.has_key} returns C{True} if the given header is present,379        C{False} otherwise.380        """381        headers, wrapper = self.headers(foo=[b"lemur"])382        self.assertTrue(wrapper.has_key(b"foo"))383        self.assertFalse(wrapper.has_key(b"bar"))384    def test_contains(self):385        """386        L{_DictHeaders.__contains__} returns C{True} if the given header is387        present, C{False} otherwise.388        """389        headers, wrapper = self.headers(foo=[b"lemur"])390        self.assertIn(b"foo", wrapper)391        self.assertNotIn(b"bar", wrapper)392    def test_pop(self):393        """394        L{_DictHeaders.pop} returns the last header value associated with the395        given header name and removes the header.396        """397        headers, wrapper = self.headers(foo=[b"lemur", b"panda"])398        self.assertEqual(wrapper.pop(b"foo"), b"panda")399        self.assertIdentical(headers.getRawHeaders(b"foo"), None)400    def test_popMissing(self):401        """402        L{_DictHeaders.pop} raises L{KeyError} if passed a header name which is403        not present.404        """405        headers, wrapper = self.headers()406        self.assertRaises(KeyError, wrapper.pop, b"foo")407    def test_popDefault(self):408        """409        L{_DictHeaders.pop} returns the last header value associated with the410        given header name and removes the header, even if it is supplied with a411        default value.412        """413        headers, wrapper = self.headers(foo=[b"lemur"])414        self.assertEqual(wrapper.pop(b"foo", b"bar"), b"lemur")415        self.assertIdentical(headers.getRawHeaders(b"foo"), None)416    def test_popDefaultMissing(self):417        """418        L{_DictHeaders.pop} returns the default value is asked for a header419        name which is not present.420        """421        headers, wrapper = self.headers(foo=[b"lemur"])422        self.assertEqual(wrapper.pop(b"bar", b"baz"), b"baz")423        self.assertEqual(headers.getRawHeaders(b"foo"), [b"lemur"])424    def test_popitem(self):425        """426        L{_DictHeaders.popitem} returns some header name/value pair.427        """428        headers, wrapper = self.headers(foo=[b"lemur", b"panda"])429        self.assertEqual(wrapper.popitem(), (b"foo", b"panda"))430        self.assertIdentical(headers.getRawHeaders(b"foo"), None)431    def test_popitemEmpty(self):432        """433        L{_DictHeaders.popitem} raises L{KeyError} if there are no headers434        present.435        """436        headers, wrapper = self.headers()437        self.assertRaises(KeyError, wrapper.popitem)438    def test_update(self):439        """440        L{_DictHeaders.update} adds the header/value pairs in the C{dict} it is441        passed, overriding any existing values for those headers.442        """443        headers, wrapper = self.headers(foo=[b"lemur"])444        wrapper.update({b"foo": b"panda", b"bar": b"marmot"})445        self.assertEqual(headers.getRawHeaders(b"foo"), [b"panda"])446        self.assertEqual(headers.getRawHeaders(b"bar"), [b"marmot"])447    def test_updateWithKeywords(self):448        """449        L{_DictHeaders.update} adds header names given as keyword arguments450        with the keyword values as the header value.451        """452        headers, wrapper = self.headers(foo=[b"lemur"])453        wrapper.update(foo=b"panda", bar=b"marmot")454        self.assertEqual(headers.getRawHeaders(b"foo"), [b"panda"])455        self.assertEqual(headers.getRawHeaders(b"bar"), [b"marmot"])456    if _PY3:457        test_updateWithKeywords.skip = "Not yet supported on Python 3; see #6082."458    def test_setdefaultMissing(self):459        """460        If passed the name of a header which is not present,461        L{_DictHeaders.setdefault} sets the value of the given header to the462        specified default value and returns it.463        """464        headers, wrapper = self.headers(foo=[b"bar"])465        self.assertEqual(wrapper.setdefault(b"baz", b"quux"), b"quux")466        self.assertEqual(headers.getRawHeaders(b"foo"), [b"bar"])467        self.assertEqual(headers.getRawHeaders(b"baz"), [b"quux"])468    def test_setdefaultPresent(self):469        """470        If passed the name of a header which is present,471        L{_DictHeaders.setdefault} makes no changes to the headers and472        returns the last value already associated with that header.473        """474        headers, wrapper = self.headers(foo=[b"bar", b"baz"])475        self.assertEqual(wrapper.setdefault(b"foo", b"quux"), b"baz")476        self.assertEqual(headers.getRawHeaders(b"foo"), [b"bar", b"baz"])477    def test_setdefaultDefault(self):478        """479        If a value is not passed to L{_DictHeaders.setdefault}, C{None} is480        used.481        """482        # This results in an invalid state for the headers, but maybe some483        # application is doing this an intermediate step towards some other484        # state.  Anyway, it was broken with the old implementation so it's485        # broken with the new implementation.  Compatibility, for the win.486        # -exarkun487        headers, wrapper = self.headers()488        self.assertIdentical(wrapper.setdefault(b"foo"), None)489        self.assertEqual(headers.getRawHeaders(b"foo"), [None])490    def test_dictComparison(self):491        """492        An instance of L{_DictHeaders} compares equal to a C{dict} which493        contains the same header/value pairs.  For header names with multiple494        values, the last value only is considered.495        """496        headers, wrapper = self.headers(foo=[b"lemur"], bar=[b"panda", b"marmot"])497        self.assertNotEqual(wrapper, {b"foo": b"lemur", b"bar": b"panda"})498        self.assertEqual(wrapper, {b"foo": b"lemur", b"bar": b"marmot"})499    def test_otherComparison(self):500        """501        An instance of L{_DictHeaders} does not compare equal to other502        unrelated objects.503        """504        headers, wrapper = self.headers()505        self.assertNotEqual(wrapper, ())506        self.assertNotEqual(wrapper, object())507        self.assertNotEqual(wrapper, b"foo")508    if _PY3:509        # Python 3 lacks these APIs...test_base.py
Source:test_base.py  
1# encoding: utf-82from nose import tools as nose_tools3import ckan.tests.helpers as helpers4import ckan.plugins as p5import ckan.tests.factories as factories6class TestRenderSnippet(helpers.FunctionalTestBase):7    """8    Test ``ckan.lib.base.render_snippet``.9    """10    @helpers.change_config('debug', True)11    def test_comment_present_if_debug_true(self):12        response = self._get_test_app().get('/')13        assert '<!-- Snippet ' in response14    @helpers.change_config('debug', False)15    def test_comment_absent_if_debug_false(self):16        response = self._get_test_app().get('/')17        assert '<!-- Snippet ' not in response18class TestGetUserForApikey(helpers.FunctionalTestBase):19    def test_apikey_missing(self):20        app = self._get_test_app()21        request_headers = {}22        app.get('/dataset/new', headers=request_headers, status=403)23    def test_apikey_in_authorization_header(self):24        user = factories.Sysadmin()25        app = self._get_test_app()26        request_headers = {'Authorization': str(user['apikey'])}27        app.get('/dataset/new', headers=request_headers)28    def test_apikey_in_x_ckan_header(self):29        user = factories.Sysadmin()30        app = self._get_test_app()31        # non-standard header name is defined in test-core.ini32        request_headers = {'X-Non-Standard-CKAN-API-Key': str(user['apikey'])}33        app.get('/dataset/new', headers=request_headers)34    def test_apikey_contains_unicode(self):35        # there is no valid apikey containing unicode, but we should fail36        # nicely if unicode is supplied37        app = self._get_test_app()38        request_headers = {'Authorization': '\xc2\xb7'}39        app.get('/dataset/new', headers=request_headers, status=403)40class TestCORS(helpers.FunctionalTestBase):41    def test_options(self):42        app = self._get_test_app()43        response = app.options(url='/', status=200)44        assert len(str(response.body)) == 0, 'OPTIONS must return no content'45    def test_cors_config_no_cors(self):46        '''47        No ckan.cors settings in config, so no Access-Control-Allow headers in48        response.49        '''50        app = self._get_test_app()51        response = app.get('/')52        response_headers = dict(response.headers)53        assert 'Access-Control-Allow-Origin' not in response_headers54        assert 'Access-Control-Allow-Methods' not in response_headers55        assert 'Access-Control-Allow-Headers' not in response_headers56    def test_cors_config_no_cors_with_origin(self):57        '''58        No ckan.cors settings in config, so no Access-Control-Allow headers in59        response, even with origin header in request.60        '''61        app = self._get_test_app()62        request_headers = {'Origin': 'http://thirdpartyrequests.org'}63        response = app.get('/', headers=request_headers)64        response_headers = dict(response.headers)65        assert 'Access-Control-Allow-Origin' not in response_headers66        assert 'Access-Control-Allow-Methods' not in response_headers67        assert 'Access-Control-Allow-Headers' not in response_headers68    @helpers.change_config('ckan.cors.origin_allow_all', 'true')69    def test_cors_config_origin_allow_all_true_no_origin(self):70        '''71        With origin_allow_all set to true, but no origin in the request72        header, no Access-Control-Allow headers should be in the response.73        '''74        app = self._get_test_app()75        response = app.get('/')76        response_headers = dict(response.headers)77        assert 'Access-Control-Allow-Origin' not in response_headers78        assert 'Access-Control-Allow-Methods' not in response_headers79        assert 'Access-Control-Allow-Headers' not in response_headers80    @helpers.change_config('ckan.cors.origin_allow_all', 'true')81    @helpers.change_config('ckan.site_url', 'http://test.ckan.org')82    def test_cors_config_origin_allow_all_true_with_origin(self):83        '''84        With origin_allow_all set to true, and an origin in the request85        header, the appropriate Access-Control-Allow headers should be in the86        response.87        '''88        app = self._get_test_app()89        request_headers = {'Origin': 'http://thirdpartyrequests.org'}90        response = app.get('/', headers=request_headers)91        response_headers = dict(response.headers)92        assert 'Access-Control-Allow-Origin' in response_headers93        nose_tools.assert_equal(response_headers['Access-Control-Allow-Origin'], '*')94        nose_tools.assert_equal(response_headers['Access-Control-Allow-Methods'], "POST, PUT, GET, DELETE, OPTIONS")95        nose_tools.assert_equal(response_headers['Access-Control-Allow-Headers'], "X-CKAN-API-KEY, Authorization, Content-Type")96    @helpers.change_config('ckan.cors.origin_allow_all', 'false')97    @helpers.change_config('ckan.site_url', 'http://test.ckan.org')98    def test_cors_config_origin_allow_all_false_with_origin_without_whitelist(self):99        '''100        With origin_allow_all set to false, with an origin in the request101        header, but no whitelist defined, there should be no Access-Control-102        Allow headers in the response.103        '''104        app = self._get_test_app()105        request_headers = {'Origin': 'http://thirdpartyrequests.org'}106        response = app.get('/', headers=request_headers)107        response_headers = dict(response.headers)108        assert 'Access-Control-Allow-Origin' not in response_headers109        assert 'Access-Control-Allow-Methods' not in response_headers110        assert 'Access-Control-Allow-Headers' not in response_headers111    @helpers.change_config('ckan.cors.origin_allow_all', 'false')112    @helpers.change_config('ckan.cors.origin_whitelist', 'http://thirdpartyrequests.org')113    @helpers.change_config('ckan.site_url', 'http://test.ckan.org')114    def test_cors_config_origin_allow_all_false_with_whitelisted_origin(self):115        '''116        With origin_allow_all set to false, with an origin in the request117        header, and a whitelist defined (containing the origin), the118        appropriate Access-Control-Allow headers should be in the response.119        '''120        app = self._get_test_app()121        request_headers = {'Origin': 'http://thirdpartyrequests.org'}122        response = app.get('/', headers=request_headers)123        response_headers = dict(response.headers)124        assert 'Access-Control-Allow-Origin' in response_headers125        nose_tools.assert_equal(response_headers['Access-Control-Allow-Origin'], 'http://thirdpartyrequests.org')126        nose_tools.assert_equal(response_headers['Access-Control-Allow-Methods'], "POST, PUT, GET, DELETE, OPTIONS")127        nose_tools.assert_equal(response_headers['Access-Control-Allow-Headers'], "X-CKAN-API-KEY, Authorization, Content-Type")128    @helpers.change_config('ckan.cors.origin_allow_all', 'false')129    @helpers.change_config('ckan.cors.origin_whitelist', 'http://google.com http://thirdpartyrequests.org http://yahoo.co.uk')130    @helpers.change_config('ckan.site_url', 'http://test.ckan.org')131    def test_cors_config_origin_allow_all_false_with_multiple_whitelisted_origins(self):132        '''133        With origin_allow_all set to false, with an origin in the request134        header, and a whitelist defining multiple allowed origins (containing135        the origin), the appropriate Access-Control-Allow headers should be in136        the response.137        '''138        app = self._get_test_app()139        request_headers = {'Origin': 'http://thirdpartyrequests.org'}140        response = app.get('/', headers=request_headers)141        response_headers = dict(response.headers)142        assert 'Access-Control-Allow-Origin' in response_headers143        nose_tools.assert_equal(response_headers['Access-Control-Allow-Origin'], 'http://thirdpartyrequests.org')144        nose_tools.assert_equal(response_headers['Access-Control-Allow-Methods'], "POST, PUT, GET, DELETE, OPTIONS")145        nose_tools.assert_equal(response_headers['Access-Control-Allow-Headers'], "X-CKAN-API-KEY, Authorization, Content-Type")146    @helpers.change_config('ckan.cors.origin_allow_all', 'false')147    @helpers.change_config('ckan.cors.origin_whitelist', 'http://google.com http://yahoo.co.uk')148    @helpers.change_config('ckan.site_url', 'http://test.ckan.org')149    def test_cors_config_origin_allow_all_false_with_whitelist_not_containing_origin(self):150        '''151        With origin_allow_all set to false, with an origin in the request152        header, and a whitelist defining multiple allowed origins (but not153        containing the requesting origin), there should be no Access-Control-154        Allow headers in the response.155        '''156        app = self._get_test_app()157        request_headers = {'Origin': 'http://thirdpartyrequests.org'}158        response = app.get('/', headers=request_headers)159        response_headers = dict(response.headers)160        assert 'Access-Control-Allow-Origin' not in response_headers161        assert 'Access-Control-Allow-Methods' not in response_headers162        assert 'Access-Control-Allow-Headers' not in response_headers163class TestCORSFlask(helpers.FunctionalTestBase):164    @classmethod165    def setup_class(cls):166        super(TestCORSFlask, cls).setup_class()167        cls.app = cls._get_test_app()168        flask_app = cls.app.flask_app169        if not p.plugin_loaded('test_routing_plugin'):170            p.load('test_routing_plugin')171            plugin = p.get_plugin('test_routing_plugin')172            flask_app.register_blueprint(plugin.get_blueprint(),173                                         prioritise_rules=True)174    @classmethod175    def teardown_class(cls):176        super(TestCORSFlask, cls).teardown_class()177        p.unload('test_routing_plugin')178    def test_options(self):179        response = self.app.options(url='/simple_flask', status=200)180        assert len(str(response.body)) == 0, 'OPTIONS must return no content'181    def test_cors_config_no_cors(self):182        '''183        No ckan.cors settings in config, so no Access-Control-Allow headers in184        response.185        '''186        response = self.app.get('/simple_flask')187        response_headers = dict(response.headers)188        assert 'Access-Control-Allow-Origin' not in response_headers189        assert 'Access-Control-Allow-Methods' not in response_headers190        assert 'Access-Control-Allow-Headers' not in response_headers191    def test_cors_config_no_cors_with_origin(self):192        '''193        No ckan.cors settings in config, so no Access-Control-Allow headers in194        response, even with origin header in request.195        '''196        request_headers = {'Origin': 'http://thirdpartyrequests.org'}197        response = self.app.get('/simple_flask', headers=request_headers)198        response_headers = dict(response.headers)199        assert 'Access-Control-Allow-Origin' not in response_headers200        assert 'Access-Control-Allow-Methods' not in response_headers201        assert 'Access-Control-Allow-Headers' not in response_headers202    @helpers.change_config('ckan.cors.origin_allow_all', 'true')203    def test_cors_config_origin_allow_all_true_no_origin(self):204        '''205        With origin_allow_all set to true, but no origin in the request206        header, no Access-Control-Allow headers should be in the response.207        '''208        response = self.app.get('/simple_flask')209        response_headers = dict(response.headers)210        assert 'Access-Control-Allow-Origin' not in response_headers211        assert 'Access-Control-Allow-Methods' not in response_headers212        assert 'Access-Control-Allow-Headers' not in response_headers213    @helpers.change_config('ckan.cors.origin_allow_all', 'true')214    @helpers.change_config('ckan.site_url', 'http://test.ckan.org')215    def test_cors_config_origin_allow_all_true_with_origin(self):216        '''217        With origin_allow_all set to true, and an origin in the request218        header, the appropriate Access-Control-Allow headers should be in the219        response.220        '''221        request_headers = {'Origin': 'http://thirdpartyrequests.org'}222        response = self.app.get('/simple_flask', headers=request_headers)223        response_headers = dict(response.headers)224        assert 'Access-Control-Allow-Origin' in response_headers225        nose_tools.assert_equal(response_headers['Access-Control-Allow-Origin'], '*')226        nose_tools.assert_equal(response_headers['Access-Control-Allow-Methods'], "POST, PUT, GET, DELETE, OPTIONS")227        nose_tools.assert_equal(response_headers['Access-Control-Allow-Headers'], "X-CKAN-API-KEY, Authorization, Content-Type")228    @helpers.change_config('ckan.cors.origin_allow_all', 'false')229    @helpers.change_config('ckan.site_url', 'http://test.ckan.org')230    def test_cors_config_origin_allow_all_false_with_origin_without_whitelist(self):231        '''232        With origin_allow_all set to false, with an origin in the request233        header, but no whitelist defined, there should be no Access-Control-234        Allow headers in the response.235        '''236        request_headers = {'Origin': 'http://thirdpartyrequests.org'}237        response = self.app.get('/simple_flask', headers=request_headers)238        response_headers = dict(response.headers)239        assert 'Access-Control-Allow-Origin' not in response_headers240        assert 'Access-Control-Allow-Methods' not in response_headers241        assert 'Access-Control-Allow-Headers' not in response_headers242    @helpers.change_config('ckan.cors.origin_allow_all', 'false')243    @helpers.change_config('ckan.cors.origin_whitelist', 'http://thirdpartyrequests.org')244    @helpers.change_config('ckan.site_url', 'http://test.ckan.org')245    def test_cors_config_origin_allow_all_false_with_whitelisted_origin(self):246        '''247        With origin_allow_all set to false, with an origin in the request248        header, and a whitelist defined (containing the origin), the249        appropriate Access-Control-Allow headers should be in the response.250        '''251        request_headers = {'Origin': 'http://thirdpartyrequests.org'}252        response = self.app.get('/simple_flask', headers=request_headers)253        response_headers = dict(response.headers)254        assert 'Access-Control-Allow-Origin' in response_headers255        nose_tools.assert_equal(response_headers['Access-Control-Allow-Origin'], 'http://thirdpartyrequests.org')256        nose_tools.assert_equal(response_headers['Access-Control-Allow-Methods'], "POST, PUT, GET, DELETE, OPTIONS")257        nose_tools.assert_equal(response_headers['Access-Control-Allow-Headers'], "X-CKAN-API-KEY, Authorization, Content-Type")258    @helpers.change_config('ckan.cors.origin_allow_all', 'false')259    @helpers.change_config('ckan.cors.origin_whitelist', 'http://google.com http://thirdpartyrequests.org http://yahoo.co.uk')260    @helpers.change_config('ckan.site_url', 'http://test.ckan.org')261    def test_cors_config_origin_allow_all_false_with_multiple_whitelisted_origins(self):262        '''263        With origin_allow_all set to false, with an origin in the request264        header, and a whitelist defining multiple allowed origins (containing265        the origin), the appropriate Access-Control-Allow headers should be in266        the response.267        '''268        request_headers = {'Origin': 'http://thirdpartyrequests.org'}269        response = self.app.get('/simple_flask', headers=request_headers)270        response_headers = dict(response.headers)271        assert 'Access-Control-Allow-Origin' in response_headers272        nose_tools.assert_equal(response_headers['Access-Control-Allow-Origin'], 'http://thirdpartyrequests.org')273        nose_tools.assert_equal(response_headers['Access-Control-Allow-Methods'], "POST, PUT, GET, DELETE, OPTIONS")274        nose_tools.assert_equal(response_headers['Access-Control-Allow-Headers'], "X-CKAN-API-KEY, Authorization, Content-Type")275    @helpers.change_config('ckan.cors.origin_allow_all', 'false')276    @helpers.change_config('ckan.cors.origin_whitelist', 'http://google.com http://yahoo.co.uk')277    @helpers.change_config('ckan.site_url', 'http://test.ckan.org')278    def test_cors_config_origin_allow_all_false_with_whitelist_not_containing_origin(self):279        '''280        With origin_allow_all set to false, with an origin in the request281        header, and a whitelist defining multiple allowed origins (but not282        containing the requesting origin), there should be no Access-Control-283        Allow headers in the response.284        '''285        request_headers = {'Origin': 'http://thirdpartyrequests.org'}286        response = self.app.get('/simple_flask', headers=request_headers)287        response_headers = dict(response.headers)288        assert 'Access-Control-Allow-Origin' not in response_headers289        assert 'Access-Control-Allow-Methods' not in response_headers...test_warc.py
Source:test_warc.py  
1# Copyright (c) 2018 crocoite contributors2# 3# Permission is hereby granted, free of charge, to any person obtaining a copy4# of this software and associated documentation files (the "Software"), to deal5# in the Software without restriction, including without limitation the rights6# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell7# copies of the Software, and to permit persons to whom the Software is8# furnished to do so, subject to the following conditions:9# 10# The above copyright notice and this permission notice shall be included in11# all copies or substantial portions of the Software.12# 13# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR14# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,15# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE16# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER17# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,18# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN19# THE SOFTWARE.20from tempfile import NamedTemporaryFile21import json, urllib22from operator import itemgetter23from warcio.archiveiterator import ArchiveIterator24from yarl import URL25from multidict import CIMultiDict26from hypothesis import given, reproduce_failure27import hypothesis.strategies as st28import pytest29from .warc import WarcHandler30from .logger import Logger, WarcHandlerConsumer31from .controller import ControllerStart32from .behavior import Script, ScreenshotEvent, DomSnapshotEvent33from .browser import RequestResponsePair, Base64Body, UnicodeBody34from .test_browser import requestResponsePair, urls35def test_log ():36    logger = Logger ()37    with NamedTemporaryFile() as fd:38        with WarcHandler (fd, logger) as handler:39            warclogger = WarcHandlerConsumer (handler)40            logger.connect (warclogger)41            golden = []42            assert handler.log.tell () == 043            golden.append (logger.info (foo=1, bar='baz', encoding='äöüâÎΨ'))44            assert handler.log.tell () != 045            handler.maxLogSize = 046            golden.append (logger.info (bar=1, baz='baz'))47            # should flush the log48            assert handler.log.tell () == 049        fd.seek (0)50        for it in ArchiveIterator (fd):51            headers = it.rec_headers52            assert headers['warc-type'] == 'metadata'53            assert 'warc-target-uri' not in headers54            assert headers['x-crocoite-type'] == 'log'55            assert headers['content-type'] == f'application/json; charset={handler.logEncoding}'56            while True:57                l = it.raw_stream.readline ()58                if not l:59                    break60                data = json.loads (l.strip ())61                assert data == golden.pop (0)62def jsonObject ():63    """ JSON-encodable objects """64    return st.dictionaries (st.text (), st.one_of (st.integers (), st.text ()))65def viewport ():66    return st.builds (lambda x, y: f'{x}x{y}', st.integers (), st.integers ())67def event ():68    return st.one_of (69            st.builds (ControllerStart, jsonObject ()),70            st.builds (Script.fromStr, st.text (), st.one_of(st.none (), st.text ())),71            st.builds (ScreenshotEvent, urls (), st.integers (), st.binary ()),72            st.builds (DomSnapshotEvent, urls (), st.builds (lambda x: x.encode ('utf-8'), st.text ()), viewport()),73            requestResponsePair (),74            )75@pytest.mark.asyncio76@given (st.lists (event ()))77async def test_push (golden):78    def checkWarcinfoId (headers):79        if lastWarcinfoRecordid is not None:80            assert headers['WARC-Warcinfo-ID'] == lastWarcinfoRecordid81    lastWarcinfoRecordid = None82    # null logger83    logger = Logger ()84    with open('/tmp/test.warc.gz', 'w+b') as fd:85        with WarcHandler (fd, logger) as handler:86            for g in golden:87                await handler.push (g)88        fd.seek (0)89        it = iter (ArchiveIterator (fd))90        for g in golden:91            if isinstance (g, ControllerStart):92                rec = next (it)93                headers = rec.rec_headers94                assert headers['warc-type'] == 'warcinfo'95                assert 'warc-target-uri' not in headers96                assert 'x-crocoite-type' not in headers97                data = json.load (rec.raw_stream)98                assert data == g.payload99                lastWarcinfoRecordid = headers['warc-record-id']100                assert lastWarcinfoRecordid101            elif isinstance (g, Script):102                rec = next (it)103                headers = rec.rec_headers104                assert headers['warc-type'] == 'resource'105                assert headers['content-type'] == 'application/javascript; charset=utf-8'106                assert headers['x-crocoite-type'] == 'script'107                checkWarcinfoId (headers)108                if g.path:109                    assert URL (headers['warc-target-uri']) == URL ('file://' + g.abspath)110                else:111                    assert 'warc-target-uri' not in headers112                data = rec.raw_stream.read ().decode ('utf-8')113                assert data == g.data114            elif isinstance (g, ScreenshotEvent):115                # XXX: check refers-to header116                rec = next (it)117                headers = rec.rec_headers118                assert headers['warc-type'] == 'conversion'119                assert headers['x-crocoite-type'] == 'screenshot'120                checkWarcinfoId (headers)121                assert URL (headers['warc-target-uri']) == g.url, (headers['warc-target-uri'], g.url)122                assert headers['warc-refers-to'] is None123                assert int (headers['X-Crocoite-Screenshot-Y-Offset']) == g.yoff124                assert rec.raw_stream.read () == g.data125            elif isinstance (g, DomSnapshotEvent):126                rec = next (it)127                headers = rec.rec_headers128                assert headers['warc-type'] == 'conversion'129                assert headers['x-crocoite-type'] == 'dom-snapshot'130                checkWarcinfoId (headers)131                assert URL (headers['warc-target-uri']) == g.url132                assert headers['warc-refers-to'] is None133                assert rec.raw_stream.read () == g.document134            elif isinstance (g, RequestResponsePair):135                rec = next (it)136                # request137                headers = rec.rec_headers138                assert headers['warc-type'] == 'request'139                assert 'x-crocoite-type' not in headers140                checkWarcinfoId (headers)141                assert URL (headers['warc-target-uri']) == g.url142                assert headers['x-chrome-request-id'] == g.id143                144                assert CIMultiDict (rec.http_headers.headers) == g.request.headers145                if g.request.hasPostData:146                    if g.request.body is not None:147                        assert rec.raw_stream.read () == g.request.body148                    else:149                        # body fetch failed150                        assert headers['warc-truncated'] == 'unspecified'151                        assert not rec.raw_stream.read ()152                else:153                    assert not rec.raw_stream.read ()154                # response155                if g.response:156                    rec = next (it)157                    headers = rec.rec_headers158                    httpheaders = rec.http_headers159                    assert headers['warc-type'] == 'response'160                    checkWarcinfoId (headers)161                    assert URL (headers['warc-target-uri']) == g.url162                    assert headers['x-chrome-request-id'] == g.id163                    assert 'x-crocoite-type' not in headers164                    # these are checked separately165                    filteredHeaders = CIMultiDict (httpheaders.headers)166                    for b in {'content-type', 'content-length'}:167                        if b in g.response.headers:168                            g.response.headers.popall (b)169                        if b in filteredHeaders:170                            filteredHeaders.popall (b)171                    assert filteredHeaders == g.response.headers172                    expectedContentType = g.response.mimeType173                    if expectedContentType is not None:174                        assert httpheaders['content-type'].startswith (expectedContentType)175                    if g.response.body is not None:176                        assert rec.raw_stream.read () == g.response.body177                        assert httpheaders['content-length'] == str (len (g.response.body))178                        # body is never truncated if it exists179                        assert headers['warc-truncated'] is None180                        # unencoded strings are converted to utf8181                        if isinstance (g.response.body, UnicodeBody) and httpheaders['content-type'] is not None:182                            assert httpheaders['content-type'].endswith ('; charset=utf-8')183                    else:184                        # body fetch failed185                        assert headers['warc-truncated'] == 'unspecified'186                        assert not rec.raw_stream.read ()187                        # content-length header should be kept intact188            else:189                assert False, f"invalid golden type {type(g)}" # pragma: no cover190        # no further records191        with pytest.raises (StopIteration):...test_tools.py
Source:test_tools.py  
1# Copyright (c) 2018 crocoite contributors2# 3# Permission is hereby granted, free of charge, to any person obtaining a copy4# of this software and associated documentation files (the "Software"), to deal5# in the Software without restriction, including without limitation the rights6# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell7# copies of the Software, and to permit persons to whom the Software is8# furnished to do so, subject to the following conditions:9# 10# The above copyright notice and this permission notice shall be included in11# all copies or substantial portions of the Software.12# 13# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR14# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,15# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE16# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER17# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,18# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN19# THE SOFTWARE.20from tempfile import NamedTemporaryFile21from operator import itemgetter22from io import BytesIO23import pytest24from warcio.archiveiterator import ArchiveIterator25from warcio.warcwriter import WARCWriter26from warcio.statusandheaders import StatusAndHeaders27from pkg_resources import parse_version28from .tools import mergeWarc, Errata, FixableErrata29@pytest.fixture30def writer():31    return WARCWriter (NamedTemporaryFile(), gzip=True)32def recordsEqual(golden, underTest):33    for a, b in zip (golden, underTest):34        # record ids are not predictable, so we cannot compare them. Dito for35        # dates. Content-* seems to be added when writing to file.36        for x in {'WARC-Record-Id', 'WARC-Block-Digest', 'WARC-Date',37                'Content-Length', 'Content-Type'}:38            a.rec_headers.remove_header(x)39            b.rec_headers.remove_header(x)40        aheader = sorted(a.rec_headers.headers, key=itemgetter(0))41        bheader = sorted(b.rec_headers.headers, key=itemgetter(0))42        assert aheader == bheader43        assert a.http_headers == b.http_headers44def makeGolden(writer, records):45    # additional warcinfo is written. Content does not matter.46    record = writer.create_warc_record (47            '',48            'warcinfo',49            payload=b'',50            warc_headers_dict={'Content-Type': 'application/json; charset=utf-8'})51    records.insert (0, record)52    return records53def test_unmodified(writer):54    """55    Single request/response pair, no revisits56    """57    records = []58    httpHeaders = StatusAndHeaders('GET / HTTP/1.1', {}, is_http_request=True)59    warcHeaders = {}60    record = writer.create_warc_record ('http://example.com/', 'request', payload=BytesIO(b'foobar'),61            warc_headers_dict=warcHeaders, http_headers=httpHeaders)62    records.append (record)63    httpHeaders = StatusAndHeaders('200 OK', {}, protocol='HTTP/1.1')64    record = writer.create_warc_record ('http://example.com/', 'response', payload=BytesIO(b'data'),65            warc_headers_dict=warcHeaders, http_headers=httpHeaders)66    records.append (record)67    for r in records:68        writer.write_record (r)69    output = NamedTemporaryFile()70    mergeWarc ([writer.out.name], output)71    output.seek(0)72    recordsEqual (makeGolden (writer, records), ArchiveIterator (output))73def test_different_payload(writer):74    """75    Duplicate URL, but different payload76    """77    records = []78    for i in range (2):79        httpHeaders = StatusAndHeaders('GET / HTTP/1.1', {}, is_http_request=True)80        warcHeaders = {}81        record = writer.create_warc_record ('http://example.com/', 'request', payload=BytesIO(b'foobar'),82                warc_headers_dict=warcHeaders, http_headers=httpHeaders)83        records.append (record)84        httpHeaders = StatusAndHeaders('200 OK', {}, protocol='HTTP/1.1')85        record = writer.create_warc_record ('http://example.com/', 'response',86                payload=BytesIO(f'data{i}'.encode ('utf8')),87                warc_headers_dict=warcHeaders, http_headers=httpHeaders)88        records.append (record)89    for r in records:90        writer.write_record (r)91    output = NamedTemporaryFile()92    mergeWarc ([writer.out.name], output)93    output.seek(0)94    recordsEqual (makeGolden (writer, records), ArchiveIterator (output))95def makeRevisit(writer, ref, dup):96    """ Make revisit record for reference """97    dupHeaders = dup.rec_headers98    refHeaders = ref.rec_headers99    record = writer.create_revisit_record (dupHeaders.get_header('WARC-Target-URI'),100            digest=refHeaders.get_header('WARC-Payload-Digest'),101            refers_to_uri=refHeaders.get_header('WARC-Target-URI'),102            refers_to_date=refHeaders.get_header('WARC-Date'),103            http_headers=dup.http_headers)104    record.rec_headers.add_header ('WARC-Refers-To', refHeaders.get_header('WARC-Record-ID'))105    record.rec_headers.add_header ('WARC-Truncated', 'length')106    return record107def test_resp_revisit_same_url(writer):108    """109    Duplicate record for the same URL, creates a revisit110    """111    records = []112    for i in range (2):113        httpHeaders = StatusAndHeaders('GET / HTTP/1.1', {}, is_http_request=True)114        warcHeaders = {}115        record = writer.create_warc_record ('http://example.com/', 'request', payload=BytesIO(b'foobar'),116                warc_headers_dict=warcHeaders, http_headers=httpHeaders)117        records.append (record)118        httpHeaders = StatusAndHeaders('200 OK', {}, protocol='HTTP/1.1')119        record = writer.create_warc_record ('http://example.com/', 'response', payload=BytesIO(b'data'),120                warc_headers_dict=warcHeaders, http_headers=httpHeaders)121        records.append (record)122    for r in records:123        writer.write_record (r)124    dup = records.pop ()125    ref = records[1]126    records.append (makeRevisit (writer, ref, dup))127    output = NamedTemporaryFile()128    mergeWarc ([writer.out.name], output)129    output.seek(0)130    recordsEqual (makeGolden (writer, records), ArchiveIterator (output))131def test_resp_revisit_other_url(writer):132    """133    Duplicate record for different URL, creates a revisit134    """135    records = []136    httpHeaders = StatusAndHeaders('GET / HTTP/1.1', {}, is_http_request=True)137    warcHeaders = {}138    record = writer.create_warc_record ('http://example.com/', 'request', payload=BytesIO(b'foobar'),139            warc_headers_dict=warcHeaders, http_headers=httpHeaders)140    records.append (record)141    httpHeaders = StatusAndHeaders('200 OK', {}, protocol='HTTP/1.1')142    record = writer.create_warc_record ('http://example.com/', 'response', payload=BytesIO(b'data'),143            warc_headers_dict=warcHeaders, http_headers=httpHeaders)144    records.append (record)145    httpHeaders = StatusAndHeaders('GET / HTTP/1.1', {}, is_http_request=True)146    warcHeaders = {}147    record = writer.create_warc_record ('http://example.com/one', 'request', payload=BytesIO(b'foobar'),148            warc_headers_dict=warcHeaders, http_headers=httpHeaders)149    records.append (record)150    httpHeaders = StatusAndHeaders('200 OK', {}, protocol='HTTP/1.1')151    record = writer.create_warc_record ('http://example.com/one', 'response', payload=BytesIO(b'data'),152            warc_headers_dict=warcHeaders, http_headers=httpHeaders)153    records.append (record)154    for r in records:155        writer.write_record (r)156    dup = records.pop ()157    ref = records[1]158    records.append (makeRevisit (writer, ref, dup))159    output = NamedTemporaryFile()160    mergeWarc ([writer.out.name], output)161    output.seek(0)162    recordsEqual (makeGolden (writer, records), ArchiveIterator (output))163def test_errata_contains():164    """ Test version matching """165    e = Errata('some-uuid', 'description', ['a<1.0'])166    assert {'a': parse_version('0.1')} in e167    assert {'a': parse_version('1.0')} not in e168    assert {'b': parse_version('1.0')} not in e169    e = Errata('some-uuid', 'description', ['a<1.0,>0.1'])170    assert {'a': parse_version('0.1')} not in e171    assert {'a': parse_version('0.2')} in e172    assert {'a': parse_version('1.0')} not in e173    # a AND b174    e = Errata('some-uuid', 'description', ['a<1.0', 'b>1.0'])175    assert {'a': parse_version('0.1')} not in e176    assert {'b': parse_version('1.1')} not in e177    assert {'a': parse_version('0.1'), 'b': parse_version('1.1')} in e178def test_errata_fixable ():179    e = Errata('some-uuid', 'description', ['a<1.0', 'b>1.0'])180    assert not e.fixable181    e = FixableErrata('some-uuid', 'description', ['a<1.0', 'b>1.0'])...LambdaTest’s Playwright tutorial will give you a broader idea about the Playwright automation framework, its unique features, and use cases with examples to exceed your understanding of Playwright testing. This tutorial will give A to Z guidance, from installing the Playwright framework to some best practices and advanced concepts.
Get 100 minutes of automation test minutes FREE!!
