How to use the `headers` method in autotest

Best Python code snippets using autotest_python

test_http_headers.py

Source:test_http_headers.py Github

copy

Full Screen

...199 """200 Tests for the backwards compatible C{dict} interface for L{Headers}201 provided by L{_DictHeaders}.202 """203 def headers(self, **kw):204 """205 Create a L{Headers} instance populated with the header name/values206 specified by C{kw} and a L{_DictHeaders} wrapped around it and return207 them both.208 """209 h = Headers()210 for k, v in kw.items():211 h.setRawHeaders(k.encode('ascii'), v)212 return h, _DictHeaders(h)213 def test_getItem(self):214 """215 L{_DictHeaders.__getitem__} returns a single header for the given name.216 """217 headers, wrapper = self.headers(test=[b"lemur"])218 self.assertEqual(wrapper[b"test"], b"lemur")219 def test_getItemMultiple(self):220 """221 L{_DictHeaders.__getitem__} returns only the last header value for a222 given name.223 """224 headers, wrapper = self.headers(test=[b"lemur", b"panda"])225 self.assertEqual(wrapper[b"test"], b"panda")226 def test_getItemMissing(self):227 """228 L{_DictHeaders.__getitem__} raises L{KeyError} if called with a header229 which is not present.230 """231 headers, wrapper = self.headers()232 exc = self.assertRaises(KeyError, wrapper.__getitem__, b"test")233 self.assertEqual(exc.args, (b"test",))234 def test_iteration(self):235 """236 L{_DictHeaders.__iter__} returns an iterator the elements of which237 are the lowercase name of each header present.238 """239 headers, wrapper = self.headers(foo=[b"lemur", b"panda"], bar=[b"baz"])240 self.assertEqual(set(list(wrapper)), set([b"foo", b"bar"]))241 def test_length(self):242 """243 L{_DictHeaders.__len__} returns the number of headers present.244 """245 headers, wrapper = self.headers()246 self.assertEqual(len(wrapper), 0)247 headers.setRawHeaders(b"foo", [b"bar"])248 self.assertEqual(len(wrapper), 1)249 headers.setRawHeaders(b"test", [b"lemur", b"panda"])250 self.assertEqual(len(wrapper), 2)251 def test_setItem(self):252 """253 L{_DictHeaders.__setitem__} sets a single header value for the given254 name.255 """256 headers, wrapper = 
self.headers()257 wrapper[b"test"] = b"lemur"258 self.assertEqual(headers.getRawHeaders(b"test"), [b"lemur"])259 def test_setItemOverwrites(self):260 """261 L{_DictHeaders.__setitem__} will replace any previous header values for262 the given name.263 """264 headers, wrapper = self.headers(test=[b"lemur", b"panda"])265 wrapper[b"test"] = b"lemur"266 self.assertEqual(headers.getRawHeaders(b"test"), [b"lemur"])267 def test_delItem(self):268 """269 L{_DictHeaders.__delitem__} will remove the header values for the given270 name.271 """272 headers, wrapper = self.headers(test=[b"lemur"])273 del wrapper[b"test"]274 self.assertFalse(headers.hasHeader(b"test"))275 def test_delItemMissing(self):276 """277 L{_DictHeaders.__delitem__} will raise L{KeyError} if the given name is278 not present.279 """280 headers, wrapper = self.headers()281 exc = self.assertRaises(KeyError, wrapper.__delitem__, b"test")282 self.assertEqual(exc.args, (b"test",))283 def test_keys(self, _method='keys', _requireList=not _PY3):284 """285 L{_DictHeaders.keys} will return a list of all present header names.286 """287 headers, wrapper = self.headers(test=[b"lemur"], foo=[b"bar"])288 keys = getattr(wrapper, _method)()289 if _requireList:290 self.assertIsInstance(keys, list)291 self.assertEqual(set(keys), set([b"foo", b"test"]))292 def test_iterkeys(self):293 """294 L{_DictHeaders.iterkeys} will return all present header names.295 """296 self.test_keys('iterkeys', False)297 def test_values(self, _method='values', _requireList=not _PY3):298 """299 L{_DictHeaders.values} will return a list of all present header values,300 returning only the last value for headers with more than one.301 """302 headers, wrapper = self.headers(303 foo=[b"lemur"], bar=[b"marmot", b"panda"])304 values = getattr(wrapper, _method)()305 if _requireList:306 self.assertIsInstance(values, list)307 self.assertEqual(set(values), set([b"lemur", b"panda"]))308 def test_itervalues(self):309 """310 L{_DictHeaders.itervalues} will return 
all present header values,311 returning only the last value for headers with more than one.312 """313 self.test_values('itervalues', False)314 def test_items(self, _method='items', _requireList=not _PY3):315 """316 L{_DictHeaders.items} will return a list of all present header names317 and values as tuples, returning only the last value for headers with318 more than one.319 """320 headers, wrapper = self.headers(321 foo=[b"lemur"], bar=[b"marmot", b"panda"])322 items = getattr(wrapper, _method)()323 if _requireList:324 self.assertIsInstance(items, list)325 self.assertEqual(326 set(items), set([(b"foo", b"lemur"), (b"bar", b"panda")]))327 def test_iteritems(self):328 """329 L{_DictHeaders.iteritems} will return all present header names and330 values as tuples, returning only the last value for headers with more331 than one.332 """333 self.test_items('iteritems', False)334 def test_clear(self):335 """336 L{_DictHeaders.clear} will remove all headers.337 """338 headers, wrapper = self.headers(foo=[b"lemur"], bar=[b"panda"])339 wrapper.clear()340 self.assertEqual(list(headers.getAllRawHeaders()), [])341 def test_copy(self):342 """343 L{_DictHeaders.copy} will return a C{dict} with all the same headers344 and the last value for each.345 """346 headers, wrapper = self.headers(347 foo=[b"lemur", b"panda"], bar=[b"marmot"])348 duplicate = wrapper.copy()349 self.assertEqual(duplicate, {b"foo": b"panda", b"bar": b"marmot"})350 def test_get(self):351 """352 L{_DictHeaders.get} returns the last value for the given header name.353 """354 headers, wrapper = self.headers(foo=[b"lemur", b"panda"])355 self.assertEqual(wrapper.get(b"foo"), b"panda")356 def test_getMissing(self):357 """358 L{_DictHeaders.get} returns C{None} for a header which is not present.359 """360 headers, wrapper = self.headers()361 self.assertIdentical(wrapper.get(b"foo"), None)362 def test_getDefault(self):363 """364 L{_DictHeaders.get} returns the last value for the given header name365 even when it is 
invoked with a default value.366 """367 headers, wrapper = self.headers(foo=[b"lemur"])368 self.assertEqual(wrapper.get(b"foo", b"bar"), b"lemur")369 def test_getDefaultMissing(self):370 """371 L{_DictHeaders.get} returns the default value specified if asked for a372 header which is not present.373 """374 headers, wrapper = self.headers()375 self.assertEqual(wrapper.get(b"foo", b"bar"), b"bar")376 def test_has_key(self):377 """378 L{_DictHeaders.has_key} returns C{True} if the given header is present,379 C{False} otherwise.380 """381 headers, wrapper = self.headers(foo=[b"lemur"])382 self.assertTrue(wrapper.has_key(b"foo"))383 self.assertFalse(wrapper.has_key(b"bar"))384 def test_contains(self):385 """386 L{_DictHeaders.__contains__} returns C{True} if the given header is387 present, C{False} otherwise.388 """389 headers, wrapper = self.headers(foo=[b"lemur"])390 self.assertIn(b"foo", wrapper)391 self.assertNotIn(b"bar", wrapper)392 def test_pop(self):393 """394 L{_DictHeaders.pop} returns the last header value associated with the395 given header name and removes the header.396 """397 headers, wrapper = self.headers(foo=[b"lemur", b"panda"])398 self.assertEqual(wrapper.pop(b"foo"), b"panda")399 self.assertIdentical(headers.getRawHeaders(b"foo"), None)400 def test_popMissing(self):401 """402 L{_DictHeaders.pop} raises L{KeyError} if passed a header name which is403 not present.404 """405 headers, wrapper = self.headers()406 self.assertRaises(KeyError, wrapper.pop, b"foo")407 def test_popDefault(self):408 """409 L{_DictHeaders.pop} returns the last header value associated with the410 given header name and removes the header, even if it is supplied with a411 default value.412 """413 headers, wrapper = self.headers(foo=[b"lemur"])414 self.assertEqual(wrapper.pop(b"foo", b"bar"), b"lemur")415 self.assertIdentical(headers.getRawHeaders(b"foo"), None)416 def test_popDefaultMissing(self):417 """418 L{_DictHeaders.pop} returns the default value is asked for a header419 name 
which is not present.420 """421 headers, wrapper = self.headers(foo=[b"lemur"])422 self.assertEqual(wrapper.pop(b"bar", b"baz"), b"baz")423 self.assertEqual(headers.getRawHeaders(b"foo"), [b"lemur"])424 def test_popitem(self):425 """426 L{_DictHeaders.popitem} returns some header name/value pair.427 """428 headers, wrapper = self.headers(foo=[b"lemur", b"panda"])429 self.assertEqual(wrapper.popitem(), (b"foo", b"panda"))430 self.assertIdentical(headers.getRawHeaders(b"foo"), None)431 def test_popitemEmpty(self):432 """433 L{_DictHeaders.popitem} raises L{KeyError} if there are no headers434 present.435 """436 headers, wrapper = self.headers()437 self.assertRaises(KeyError, wrapper.popitem)438 def test_update(self):439 """440 L{_DictHeaders.update} adds the header/value pairs in the C{dict} it is441 passed, overriding any existing values for those headers.442 """443 headers, wrapper = self.headers(foo=[b"lemur"])444 wrapper.update({b"foo": b"panda", b"bar": b"marmot"})445 self.assertEqual(headers.getRawHeaders(b"foo"), [b"panda"])446 self.assertEqual(headers.getRawHeaders(b"bar"), [b"marmot"])447 def test_updateWithKeywords(self):448 """449 L{_DictHeaders.update} adds header names given as keyword arguments450 with the keyword values as the header value.451 """452 headers, wrapper = self.headers(foo=[b"lemur"])453 wrapper.update(foo=b"panda", bar=b"marmot")454 self.assertEqual(headers.getRawHeaders(b"foo"), [b"panda"])455 self.assertEqual(headers.getRawHeaders(b"bar"), [b"marmot"])456 if _PY3:457 test_updateWithKeywords.skip = "Not yet supported on Python 3; see #6082."458 def test_setdefaultMissing(self):459 """460 If passed the name of a header which is not present,461 L{_DictHeaders.setdefault} sets the value of the given header to the462 specified default value and returns it.463 """464 headers, wrapper = self.headers(foo=[b"bar"])465 self.assertEqual(wrapper.setdefault(b"baz", b"quux"), b"quux")466 self.assertEqual(headers.getRawHeaders(b"foo"), [b"bar"])467 
self.assertEqual(headers.getRawHeaders(b"baz"), [b"quux"])468 def test_setdefaultPresent(self):469 """470 If passed the name of a header which is present,471 L{_DictHeaders.setdefault} makes no changes to the headers and472 returns the last value already associated with that header.473 """474 headers, wrapper = self.headers(foo=[b"bar", b"baz"])475 self.assertEqual(wrapper.setdefault(b"foo", b"quux"), b"baz")476 self.assertEqual(headers.getRawHeaders(b"foo"), [b"bar", b"baz"])477 def test_setdefaultDefault(self):478 """479 If a value is not passed to L{_DictHeaders.setdefault}, C{None} is480 used.481 """482 # This results in an invalid state for the headers, but maybe some483 # application is doing this an intermediate step towards some other484 # state. Anyway, it was broken with the old implementation so it's485 # broken with the new implementation. Compatibility, for the win.486 # -exarkun487 headers, wrapper = self.headers()488 self.assertIdentical(wrapper.setdefault(b"foo"), None)489 self.assertEqual(headers.getRawHeaders(b"foo"), [None])490 def test_dictComparison(self):491 """492 An instance of L{_DictHeaders} compares equal to a C{dict} which493 contains the same header/value pairs. For header names with multiple494 values, the last value only is considered.495 """496 headers, wrapper = self.headers(foo=[b"lemur"], bar=[b"panda", b"marmot"])497 self.assertNotEqual(wrapper, {b"foo": b"lemur", b"bar": b"panda"})498 self.assertEqual(wrapper, {b"foo": b"lemur", b"bar": b"marmot"})499 def test_otherComparison(self):500 """501 An instance of L{_DictHeaders} does not compare equal to other502 unrelated objects.503 """504 headers, wrapper = self.headers()505 self.assertNotEqual(wrapper, ())506 self.assertNotEqual(wrapper, object())507 self.assertNotEqual(wrapper, b"foo")508 if _PY3:509 # Python 3 lacks these APIs...

Full Screen

Full Screen

test_base.py

Source:test_base.py Github

copy

Full Screen

1# encoding: utf-82from nose import tools as nose_tools3import ckan.tests.helpers as helpers4import ckan.plugins as p5import ckan.tests.factories as factories6class TestRenderSnippet(helpers.FunctionalTestBase):7 """8 Test ``ckan.lib.base.render_snippet``.9 """10 @helpers.change_config('debug', True)11 def test_comment_present_if_debug_true(self):12 response = self._get_test_app().get('/')13 assert '<!-- Snippet ' in response14 @helpers.change_config('debug', False)15 def test_comment_absent_if_debug_false(self):16 response = self._get_test_app().get('/')17 assert '<!-- Snippet ' not in response18class TestGetUserForApikey(helpers.FunctionalTestBase):19 def test_apikey_missing(self):20 app = self._get_test_app()21 request_headers = {}22 app.get('/dataset/new', headers=request_headers, status=403)23 def test_apikey_in_authorization_header(self):24 user = factories.Sysadmin()25 app = self._get_test_app()26 request_headers = {'Authorization': str(user['apikey'])}27 app.get('/dataset/new', headers=request_headers)28 def test_apikey_in_x_ckan_header(self):29 user = factories.Sysadmin()30 app = self._get_test_app()31 # non-standard header name is defined in test-core.ini32 request_headers = {'X-Non-Standard-CKAN-API-Key': str(user['apikey'])}33 app.get('/dataset/new', headers=request_headers)34 def test_apikey_contains_unicode(self):35 # there is no valid apikey containing unicode, but we should fail36 # nicely if unicode is supplied37 app = self._get_test_app()38 request_headers = {'Authorization': '\xc2\xb7'}39 app.get('/dataset/new', headers=request_headers, status=403)40class TestCORS(helpers.FunctionalTestBase):41 def test_options(self):42 app = self._get_test_app()43 response = app.options(url='/', status=200)44 assert len(str(response.body)) == 0, 'OPTIONS must return no content'45 def test_cors_config_no_cors(self):46 '''47 No ckan.cors settings in config, so no Access-Control-Allow headers in48 response.49 '''50 app = self._get_test_app()51 response = 
app.get('/')52 response_headers = dict(response.headers)53 assert 'Access-Control-Allow-Origin' not in response_headers54 assert 'Access-Control-Allow-Methods' not in response_headers55 assert 'Access-Control-Allow-Headers' not in response_headers56 def test_cors_config_no_cors_with_origin(self):57 '''58 No ckan.cors settings in config, so no Access-Control-Allow headers in59 response, even with origin header in request.60 '''61 app = self._get_test_app()62 request_headers = {'Origin': 'http://thirdpartyrequests.org'}63 response = app.get('/', headers=request_headers)64 response_headers = dict(response.headers)65 assert 'Access-Control-Allow-Origin' not in response_headers66 assert 'Access-Control-Allow-Methods' not in response_headers67 assert 'Access-Control-Allow-Headers' not in response_headers68 @helpers.change_config('ckan.cors.origin_allow_all', 'true')69 def test_cors_config_origin_allow_all_true_no_origin(self):70 '''71 With origin_allow_all set to true, but no origin in the request72 header, no Access-Control-Allow headers should be in the response.73 '''74 app = self._get_test_app()75 response = app.get('/')76 response_headers = dict(response.headers)77 assert 'Access-Control-Allow-Origin' not in response_headers78 assert 'Access-Control-Allow-Methods' not in response_headers79 assert 'Access-Control-Allow-Headers' not in response_headers80 @helpers.change_config('ckan.cors.origin_allow_all', 'true')81 @helpers.change_config('ckan.site_url', 'http://test.ckan.org')82 def test_cors_config_origin_allow_all_true_with_origin(self):83 '''84 With origin_allow_all set to true, and an origin in the request85 header, the appropriate Access-Control-Allow headers should be in the86 response.87 '''88 app = self._get_test_app()89 request_headers = {'Origin': 'http://thirdpartyrequests.org'}90 response = app.get('/', headers=request_headers)91 response_headers = dict(response.headers)92 assert 'Access-Control-Allow-Origin' in response_headers93 
nose_tools.assert_equal(response_headers['Access-Control-Allow-Origin'], '*')94 nose_tools.assert_equal(response_headers['Access-Control-Allow-Methods'], "POST, PUT, GET, DELETE, OPTIONS")95 nose_tools.assert_equal(response_headers['Access-Control-Allow-Headers'], "X-CKAN-API-KEY, Authorization, Content-Type")96 @helpers.change_config('ckan.cors.origin_allow_all', 'false')97 @helpers.change_config('ckan.site_url', 'http://test.ckan.org')98 def test_cors_config_origin_allow_all_false_with_origin_without_whitelist(self):99 '''100 With origin_allow_all set to false, with an origin in the request101 header, but no whitelist defined, there should be no Access-Control-102 Allow headers in the response.103 '''104 app = self._get_test_app()105 request_headers = {'Origin': 'http://thirdpartyrequests.org'}106 response = app.get('/', headers=request_headers)107 response_headers = dict(response.headers)108 assert 'Access-Control-Allow-Origin' not in response_headers109 assert 'Access-Control-Allow-Methods' not in response_headers110 assert 'Access-Control-Allow-Headers' not in response_headers111 @helpers.change_config('ckan.cors.origin_allow_all', 'false')112 @helpers.change_config('ckan.cors.origin_whitelist', 'http://thirdpartyrequests.org')113 @helpers.change_config('ckan.site_url', 'http://test.ckan.org')114 def test_cors_config_origin_allow_all_false_with_whitelisted_origin(self):115 '''116 With origin_allow_all set to false, with an origin in the request117 header, and a whitelist defined (containing the origin), the118 appropriate Access-Control-Allow headers should be in the response.119 '''120 app = self._get_test_app()121 request_headers = {'Origin': 'http://thirdpartyrequests.org'}122 response = app.get('/', headers=request_headers)123 response_headers = dict(response.headers)124 assert 'Access-Control-Allow-Origin' in response_headers125 nose_tools.assert_equal(response_headers['Access-Control-Allow-Origin'], 'http://thirdpartyrequests.org')126 
nose_tools.assert_equal(response_headers['Access-Control-Allow-Methods'], "POST, PUT, GET, DELETE, OPTIONS")127 nose_tools.assert_equal(response_headers['Access-Control-Allow-Headers'], "X-CKAN-API-KEY, Authorization, Content-Type")128 @helpers.change_config('ckan.cors.origin_allow_all', 'false')129 @helpers.change_config('ckan.cors.origin_whitelist', 'http://google.com http://thirdpartyrequests.org http://yahoo.co.uk')130 @helpers.change_config('ckan.site_url', 'http://test.ckan.org')131 def test_cors_config_origin_allow_all_false_with_multiple_whitelisted_origins(self):132 '''133 With origin_allow_all set to false, with an origin in the request134 header, and a whitelist defining multiple allowed origins (containing135 the origin), the appropriate Access-Control-Allow headers should be in136 the response.137 '''138 app = self._get_test_app()139 request_headers = {'Origin': 'http://thirdpartyrequests.org'}140 response = app.get('/', headers=request_headers)141 response_headers = dict(response.headers)142 assert 'Access-Control-Allow-Origin' in response_headers143 nose_tools.assert_equal(response_headers['Access-Control-Allow-Origin'], 'http://thirdpartyrequests.org')144 nose_tools.assert_equal(response_headers['Access-Control-Allow-Methods'], "POST, PUT, GET, DELETE, OPTIONS")145 nose_tools.assert_equal(response_headers['Access-Control-Allow-Headers'], "X-CKAN-API-KEY, Authorization, Content-Type")146 @helpers.change_config('ckan.cors.origin_allow_all', 'false')147 @helpers.change_config('ckan.cors.origin_whitelist', 'http://google.com http://yahoo.co.uk')148 @helpers.change_config('ckan.site_url', 'http://test.ckan.org')149 def test_cors_config_origin_allow_all_false_with_whitelist_not_containing_origin(self):150 '''151 With origin_allow_all set to false, with an origin in the request152 header, and a whitelist defining multiple allowed origins (but not153 containing the requesting origin), there should be no Access-Control-154 Allow headers in the response.155 
'''156 app = self._get_test_app()157 request_headers = {'Origin': 'http://thirdpartyrequests.org'}158 response = app.get('/', headers=request_headers)159 response_headers = dict(response.headers)160 assert 'Access-Control-Allow-Origin' not in response_headers161 assert 'Access-Control-Allow-Methods' not in response_headers162 assert 'Access-Control-Allow-Headers' not in response_headers163class TestCORSFlask(helpers.FunctionalTestBase):164 @classmethod165 def setup_class(cls):166 super(TestCORSFlask, cls).setup_class()167 cls.app = cls._get_test_app()168 flask_app = cls.app.flask_app169 if not p.plugin_loaded('test_routing_plugin'):170 p.load('test_routing_plugin')171 plugin = p.get_plugin('test_routing_plugin')172 flask_app.register_blueprint(plugin.get_blueprint(),173 prioritise_rules=True)174 @classmethod175 def teardown_class(cls):176 super(TestCORSFlask, cls).teardown_class()177 p.unload('test_routing_plugin')178 def test_options(self):179 response = self.app.options(url='/simple_flask', status=200)180 assert len(str(response.body)) == 0, 'OPTIONS must return no content'181 def test_cors_config_no_cors(self):182 '''183 No ckan.cors settings in config, so no Access-Control-Allow headers in184 response.185 '''186 response = self.app.get('/simple_flask')187 response_headers = dict(response.headers)188 assert 'Access-Control-Allow-Origin' not in response_headers189 assert 'Access-Control-Allow-Methods' not in response_headers190 assert 'Access-Control-Allow-Headers' not in response_headers191 def test_cors_config_no_cors_with_origin(self):192 '''193 No ckan.cors settings in config, so no Access-Control-Allow headers in194 response, even with origin header in request.195 '''196 request_headers = {'Origin': 'http://thirdpartyrequests.org'}197 response = self.app.get('/simple_flask', headers=request_headers)198 response_headers = dict(response.headers)199 assert 'Access-Control-Allow-Origin' not in response_headers200 assert 'Access-Control-Allow-Methods' not in 
response_headers201 assert 'Access-Control-Allow-Headers' not in response_headers202 @helpers.change_config('ckan.cors.origin_allow_all', 'true')203 def test_cors_config_origin_allow_all_true_no_origin(self):204 '''205 With origin_allow_all set to true, but no origin in the request206 header, no Access-Control-Allow headers should be in the response.207 '''208 response = self.app.get('/simple_flask')209 response_headers = dict(response.headers)210 assert 'Access-Control-Allow-Origin' not in response_headers211 assert 'Access-Control-Allow-Methods' not in response_headers212 assert 'Access-Control-Allow-Headers' not in response_headers213 @helpers.change_config('ckan.cors.origin_allow_all', 'true')214 @helpers.change_config('ckan.site_url', 'http://test.ckan.org')215 def test_cors_config_origin_allow_all_true_with_origin(self):216 '''217 With origin_allow_all set to true, and an origin in the request218 header, the appropriate Access-Control-Allow headers should be in the219 response.220 '''221 request_headers = {'Origin': 'http://thirdpartyrequests.org'}222 response = self.app.get('/simple_flask', headers=request_headers)223 response_headers = dict(response.headers)224 assert 'Access-Control-Allow-Origin' in response_headers225 nose_tools.assert_equal(response_headers['Access-Control-Allow-Origin'], '*')226 nose_tools.assert_equal(response_headers['Access-Control-Allow-Methods'], "POST, PUT, GET, DELETE, OPTIONS")227 nose_tools.assert_equal(response_headers['Access-Control-Allow-Headers'], "X-CKAN-API-KEY, Authorization, Content-Type")228 @helpers.change_config('ckan.cors.origin_allow_all', 'false')229 @helpers.change_config('ckan.site_url', 'http://test.ckan.org')230 def test_cors_config_origin_allow_all_false_with_origin_without_whitelist(self):231 '''232 With origin_allow_all set to false, with an origin in the request233 header, but no whitelist defined, there should be no Access-Control-234 Allow headers in the response.235 '''236 request_headers = {'Origin': 
'http://thirdpartyrequests.org'}237 response = self.app.get('/simple_flask', headers=request_headers)238 response_headers = dict(response.headers)239 assert 'Access-Control-Allow-Origin' not in response_headers240 assert 'Access-Control-Allow-Methods' not in response_headers241 assert 'Access-Control-Allow-Headers' not in response_headers242 @helpers.change_config('ckan.cors.origin_allow_all', 'false')243 @helpers.change_config('ckan.cors.origin_whitelist', 'http://thirdpartyrequests.org')244 @helpers.change_config('ckan.site_url', 'http://test.ckan.org')245 def test_cors_config_origin_allow_all_false_with_whitelisted_origin(self):246 '''247 With origin_allow_all set to false, with an origin in the request248 header, and a whitelist defined (containing the origin), the249 appropriate Access-Control-Allow headers should be in the response.250 '''251 request_headers = {'Origin': 'http://thirdpartyrequests.org'}252 response = self.app.get('/simple_flask', headers=request_headers)253 response_headers = dict(response.headers)254 assert 'Access-Control-Allow-Origin' in response_headers255 nose_tools.assert_equal(response_headers['Access-Control-Allow-Origin'], 'http://thirdpartyrequests.org')256 nose_tools.assert_equal(response_headers['Access-Control-Allow-Methods'], "POST, PUT, GET, DELETE, OPTIONS")257 nose_tools.assert_equal(response_headers['Access-Control-Allow-Headers'], "X-CKAN-API-KEY, Authorization, Content-Type")258 @helpers.change_config('ckan.cors.origin_allow_all', 'false')259 @helpers.change_config('ckan.cors.origin_whitelist', 'http://google.com http://thirdpartyrequests.org http://yahoo.co.uk')260 @helpers.change_config('ckan.site_url', 'http://test.ckan.org')261 def test_cors_config_origin_allow_all_false_with_multiple_whitelisted_origins(self):262 '''263 With origin_allow_all set to false, with an origin in the request264 header, and a whitelist defining multiple allowed origins (containing265 the origin), the appropriate Access-Control-Allow headers 
should be in266 the response.267 '''268 request_headers = {'Origin': 'http://thirdpartyrequests.org'}269 response = self.app.get('/simple_flask', headers=request_headers)270 response_headers = dict(response.headers)271 assert 'Access-Control-Allow-Origin' in response_headers272 nose_tools.assert_equal(response_headers['Access-Control-Allow-Origin'], 'http://thirdpartyrequests.org')273 nose_tools.assert_equal(response_headers['Access-Control-Allow-Methods'], "POST, PUT, GET, DELETE, OPTIONS")274 nose_tools.assert_equal(response_headers['Access-Control-Allow-Headers'], "X-CKAN-API-KEY, Authorization, Content-Type")275 @helpers.change_config('ckan.cors.origin_allow_all', 'false')276 @helpers.change_config('ckan.cors.origin_whitelist', 'http://google.com http://yahoo.co.uk')277 @helpers.change_config('ckan.site_url', 'http://test.ckan.org')278 def test_cors_config_origin_allow_all_false_with_whitelist_not_containing_origin(self):279 '''280 With origin_allow_all set to false, with an origin in the request281 header, and a whitelist defining multiple allowed origins (but not282 containing the requesting origin), there should be no Access-Control-283 Allow headers in the response.284 '''285 request_headers = {'Origin': 'http://thirdpartyrequests.org'}286 response = self.app.get('/simple_flask', headers=request_headers)287 response_headers = dict(response.headers)288 assert 'Access-Control-Allow-Origin' not in response_headers289 assert 'Access-Control-Allow-Methods' not in response_headers...

Full Screen

Full Screen

test_warc.py

Source:test_warc.py Github

copy

Full Screen

1# Copyright (c) 2018 crocoite contributors2# 3# Permission is hereby granted, free of charge, to any person obtaining a copy4# of this software and associated documentation files (the "Software"), to deal5# in the Software without restriction, including without limitation the rights6# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell7# copies of the Software, and to permit persons to whom the Software is8# furnished to do so, subject to the following conditions:9# 10# The above copyright notice and this permission notice shall be included in11# all copies or substantial portions of the Software.12# 13# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR14# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,15# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE16# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER17# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,18# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN19# THE SOFTWARE.20from tempfile import NamedTemporaryFile21import json, urllib22from operator import itemgetter23from warcio.archiveiterator import ArchiveIterator24from yarl import URL25from multidict import CIMultiDict26from hypothesis import given, reproduce_failure27import hypothesis.strategies as st28import pytest29from .warc import WarcHandler30from .logger import Logger, WarcHandlerConsumer31from .controller import ControllerStart32from .behavior import Script, ScreenshotEvent, DomSnapshotEvent33from .browser import RequestResponsePair, Base64Body, UnicodeBody34from .test_browser import requestResponsePair, urls35def test_log ():36 logger = Logger ()37 with NamedTemporaryFile() as fd:38 with WarcHandler (fd, logger) as handler:39 warclogger = WarcHandlerConsumer (handler)40 logger.connect (warclogger)41 golden = []42 assert handler.log.tell () == 043 golden.append (logger.info 
(foo=1, bar='baz', encoding='äöü⇔ΓΨ'))44 assert handler.log.tell () != 045 handler.maxLogSize = 046 golden.append (logger.info (bar=1, baz='baz'))47 # should flush the log48 assert handler.log.tell () == 049 fd.seek (0)50 for it in ArchiveIterator (fd):51 headers = it.rec_headers52 assert headers['warc-type'] == 'metadata'53 assert 'warc-target-uri' not in headers54 assert headers['x-crocoite-type'] == 'log'55 assert headers['content-type'] == f'application/json; charset={handler.logEncoding}'56 while True:57 l = it.raw_stream.readline ()58 if not l:59 break60 data = json.loads (l.strip ())61 assert data == golden.pop (0)62def jsonObject ():63 """ JSON-encodable objects """64 return st.dictionaries (st.text (), st.one_of (st.integers (), st.text ()))65def viewport ():66 return st.builds (lambda x, y: f'{x}x{y}', st.integers (), st.integers ())67def event ():68 return st.one_of (69 st.builds (ControllerStart, jsonObject ()),70 st.builds (Script.fromStr, st.text (), st.one_of(st.none (), st.text ())),71 st.builds (ScreenshotEvent, urls (), st.integers (), st.binary ()),72 st.builds (DomSnapshotEvent, urls (), st.builds (lambda x: x.encode ('utf-8'), st.text ()), viewport()),73 requestResponsePair (),74 )75@pytest.mark.asyncio76@given (st.lists (event ()))77async def test_push (golden):78 def checkWarcinfoId (headers):79 if lastWarcinfoRecordid is not None:80 assert headers['WARC-Warcinfo-ID'] == lastWarcinfoRecordid81 lastWarcinfoRecordid = None82 # null logger83 logger = Logger ()84 with open('/tmp/test.warc.gz', 'w+b') as fd:85 with WarcHandler (fd, logger) as handler:86 for g in golden:87 await handler.push (g)88 fd.seek (0)89 it = iter (ArchiveIterator (fd))90 for g in golden:91 if isinstance (g, ControllerStart):92 rec = next (it)93 headers = rec.rec_headers94 assert headers['warc-type'] == 'warcinfo'95 assert 'warc-target-uri' not in headers96 assert 'x-crocoite-type' not in headers97 data = json.load (rec.raw_stream)98 assert data == g.payload99 
lastWarcinfoRecordid = headers['warc-record-id']100 assert lastWarcinfoRecordid101 elif isinstance (g, Script):102 rec = next (it)103 headers = rec.rec_headers104 assert headers['warc-type'] == 'resource'105 assert headers['content-type'] == 'application/javascript; charset=utf-8'106 assert headers['x-crocoite-type'] == 'script'107 checkWarcinfoId (headers)108 if g.path:109 assert URL (headers['warc-target-uri']) == URL ('file://' + g.abspath)110 else:111 assert 'warc-target-uri' not in headers112 data = rec.raw_stream.read ().decode ('utf-8')113 assert data == g.data114 elif isinstance (g, ScreenshotEvent):115 # XXX: check refers-to header116 rec = next (it)117 headers = rec.rec_headers118 assert headers['warc-type'] == 'conversion'119 assert headers['x-crocoite-type'] == 'screenshot'120 checkWarcinfoId (headers)121 assert URL (headers['warc-target-uri']) == g.url, (headers['warc-target-uri'], g.url)122 assert headers['warc-refers-to'] is None123 assert int (headers['X-Crocoite-Screenshot-Y-Offset']) == g.yoff124 assert rec.raw_stream.read () == g.data125 elif isinstance (g, DomSnapshotEvent):126 rec = next (it)127 headers = rec.rec_headers128 assert headers['warc-type'] == 'conversion'129 assert headers['x-crocoite-type'] == 'dom-snapshot'130 checkWarcinfoId (headers)131 assert URL (headers['warc-target-uri']) == g.url132 assert headers['warc-refers-to'] is None133 assert rec.raw_stream.read () == g.document134 elif isinstance (g, RequestResponsePair):135 rec = next (it)136 # request137 headers = rec.rec_headers138 assert headers['warc-type'] == 'request'139 assert 'x-crocoite-type' not in headers140 checkWarcinfoId (headers)141 assert URL (headers['warc-target-uri']) == g.url142 assert headers['x-chrome-request-id'] == g.id143 144 assert CIMultiDict (rec.http_headers.headers) == g.request.headers145 if g.request.hasPostData:146 if g.request.body is not None:147 assert rec.raw_stream.read () == g.request.body148 else:149 # body fetch failed150 assert 
headers['warc-truncated'] == 'unspecified'151 assert not rec.raw_stream.read ()152 else:153 assert not rec.raw_stream.read ()154 # response155 if g.response:156 rec = next (it)157 headers = rec.rec_headers158 httpheaders = rec.http_headers159 assert headers['warc-type'] == 'response'160 checkWarcinfoId (headers)161 assert URL (headers['warc-target-uri']) == g.url162 assert headers['x-chrome-request-id'] == g.id163 assert 'x-crocoite-type' not in headers164 # these are checked separately165 filteredHeaders = CIMultiDict (httpheaders.headers)166 for b in {'content-type', 'content-length'}:167 if b in g.response.headers:168 g.response.headers.popall (b)169 if b in filteredHeaders:170 filteredHeaders.popall (b)171 assert filteredHeaders == g.response.headers172 expectedContentType = g.response.mimeType173 if expectedContentType is not None:174 assert httpheaders['content-type'].startswith (expectedContentType)175 if g.response.body is not None:176 assert rec.raw_stream.read () == g.response.body177 assert httpheaders['content-length'] == str (len (g.response.body))178 # body is never truncated if it exists179 assert headers['warc-truncated'] is None180 # unencoded strings are converted to utf8181 if isinstance (g.response.body, UnicodeBody) and httpheaders['content-type'] is not None:182 assert httpheaders['content-type'].endswith ('; charset=utf-8')183 else:184 # body fetch failed185 assert headers['warc-truncated'] == 'unspecified'186 assert not rec.raw_stream.read ()187 # content-length header should be kept intact188 else:189 assert False, f"invalid golden type {type(g)}" # pragma: no cover190 # no further records191 with pytest.raises (StopIteration):...

Full Screen

Full Screen

test_tools.py

Source:test_tools.py Github

copy

Full Screen

# Copyright (c) 2018 crocoite contributors
# 
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# 
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
# 
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.

from tempfile import NamedTemporaryFile
from operator import itemgetter
from itertools import zip_longest
from io import BytesIO

import pytest
from warcio.archiveiterator import ArchiveIterator
from warcio.warcwriter import WARCWriter
from warcio.statusandheaders import StatusAndHeaders
from pkg_resources import parse_version

from .tools import mergeWarc, Errata, FixableErrata

# Fill value used by recordsEqual to detect record sequences of unequal length.
_MISSING = object()


@pytest.fixture
def writer():
    """
    Provide a gzip-compressed WARCWriter backed by a named temporary file.

    The tests hand ``writer.out.name`` to mergeWarc, so the backing file has
    to stay open while the test runs; it is closed once the test is done.
    """
    with NamedTemporaryFile() as fd:
        yield WARCWriter(fd, gzip=True)


def recordsEqual(golden, underTest):
    """
    Assert that two record sequences hold the same records in the same order.

    Both the WARC headers (minus the unpredictable ones listed below) and the
    HTTP headers of each pair are compared. The listed headers are removed
    from the records of both sequences as a side effect.

    Unlike a plain zip(), a difference in the number of records is reported
    as a failure instead of being silently ignored.
    """
    # Record ids are not predictable, so we cannot compare them. Ditto for
    # dates. Content-* seems to be added when writing to file.
    unpredictable = ('WARC-Record-Id', 'WARC-Block-Digest', 'WARC-Date',
            'Content-Length', 'Content-Type')
    for a, b in zip_longest(golden, underTest, fillvalue=_MISSING):
        assert a is not _MISSING, 'more records under test than expected'
        assert b is not _MISSING, 'fewer records under test than expected'
        for x in unpredictable:
            a.rec_headers.remove_header(x)
            b.rec_headers.remove_header(x)
        aheader = sorted(a.rec_headers.headers, key=itemgetter(0))
        bheader = sorted(b.rec_headers.headers, key=itemgetter(0))
        assert aheader == bheader
        assert a.http_headers == b.http_headers


def makeGolden(writer, records):
    """
    Prepend a warcinfo record to ``records`` (in place) and return the list.
    """
    # additional warcinfo is written. Content does not matter.
    record = writer.create_warc_record(
            '',
            'warcinfo',
            payload=b'',
            warc_headers_dict={'Content-Type': 'application/json; charset=utf-8'})
    records.insert(0, record)
    return records


def _makePair(writer, url, responsePayload):
    """
    Create (but do not write) a request record and a response record for
    ``url``; the response carries ``responsePayload`` as its body.
    """
    warcHeaders = {}
    httpHeaders = StatusAndHeaders('GET / HTTP/1.1', {}, is_http_request=True)
    request = writer.create_warc_record(url, 'request', payload=BytesIO(b'foobar'),
            warc_headers_dict=warcHeaders, http_headers=httpHeaders)
    httpHeaders = StatusAndHeaders('200 OK', {}, protocol='HTTP/1.1')
    response = writer.create_warc_record(url, 'response', payload=BytesIO(responsePayload),
            warc_headers_dict=warcHeaders, http_headers=httpHeaders)
    return [request, response]


def _assertMerged(writer, records):
    """
    Merge the file behind ``writer`` and compare the result against
    ``records`` preceded by the extra warcinfo record.
    """
    with NamedTemporaryFile() as output:
        mergeWarc([writer.out.name], output)
        output.seek(0)
        recordsEqual(makeGolden(writer, records), ArchiveIterator(output))


def test_unmodified(writer):
    """
    Single request/response pair, no revisits
    """
    records = _makePair(writer, 'http://example.com/', b'data')

    for r in records:
        writer.write_record(r)

    _assertMerged(writer, records)


def test_different_payload(writer):
    """
    Duplicate URL, but different payload
    """
    records = []
    for i in range(2):
        records.extend(_makePair(writer, 'http://example.com/',
                f'data{i}'.encode('utf8')))

    for r in records:
        writer.write_record(r)

    _assertMerged(writer, records)


def makeRevisit(writer, ref, dup):
    """ Make revisit record for reference """
    dupHeaders = dup.rec_headers
    refHeaders = ref.rec_headers
    record = writer.create_revisit_record(dupHeaders.get_header('WARC-Target-URI'),
            digest=refHeaders.get_header('WARC-Payload-Digest'),
            refers_to_uri=refHeaders.get_header('WARC-Target-URI'),
            refers_to_date=refHeaders.get_header('WARC-Date'),
            http_headers=dup.http_headers)
    record.rec_headers.add_header('WARC-Refers-To', refHeaders.get_header('WARC-Record-ID'))
    record.rec_headers.add_header('WARC-Truncated', 'length')
    return record


def test_resp_revisit_same_url(writer):
    """
    Duplicate record for the same URL, creates a revisit
    """
    records = []
    for i in range(2):
        records.extend(_makePair(writer, 'http://example.com/', b'data'))

    for r in records:
        writer.write_record(r)

    # The second response duplicates the first one (index 1), so the golden
    # data expects a revisit record in its place.
    dup = records.pop()
    ref = records[1]
    records.append(makeRevisit(writer, ref, dup))

    _assertMerged(writer, records)


def test_resp_revisit_other_url(writer):
    """
    Duplicate record for different URL, creates a revisit
    """
    records = []
    records.extend(_makePair(writer, 'http://example.com/', b'data'))
    records.extend(_makePair(writer, 'http://example.com/one', b'data'))

    for r in records:
        writer.write_record(r)

    # Same payload under a different URL: the golden data expects the last
    # response to be replaced by a revisit of the first one (index 1).
    dup = records.pop()
    ref = records[1]
    records.append(makeRevisit(writer, ref, dup))

    _assertMerged(writer, records)


def test_errata_contains():
    """ Test version matching """
    e = Errata('some-uuid', 'description', ['a<1.0'])
    assert {'a': parse_version('0.1')} in e
    assert {'a': parse_version('1.0')} not in e
    assert {'b': parse_version('1.0')} not in e

    e = Errata('some-uuid', 'description', ['a<1.0,>0.1'])
    assert {'a': parse_version('0.1')} not in e
    assert {'a': parse_version('0.2')} in e
    assert {'a': parse_version('1.0')} not in e

    # a AND b
    e = Errata('some-uuid', 'description', ['a<1.0', 'b>1.0'])
    assert {'a': parse_version('0.1')} not in e
    assert {'b': parse_version('1.1')} not in e
    assert {'a': parse_version('0.1'), 'b': parse_version('1.1')} in e


def test_errata_fixable ():
    e = Errata('some-uuid', 'description', ['a<1.0', 'b>1.0'])
    assert not e.fixable

    e = FixableErrata('some-uuid', 'description', ['a<1.0', 'b>1.0'])
    ...  # the snippet is cut off at this point on the source page

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful