How to use headers method in mountebank

Best JavaScript code snippet using mountebank

test_http_headers.py

Source:test_http_headers.py Github

copy

Full Screen

...199 """200 Tests for the backwards compatible C{dict} interface for L{Headers}201 provided by L{_DictHeaders}.202 """203 def headers(self, **kw):204 """205 Create a L{Headers} instance populated with the header name/values206 specified by C{kw} and a L{_DictHeaders} wrapped around it and return207 them both.208 """209 h = Headers()210 for k, v in kw.items():211 h.setRawHeaders(k.encode('ascii'), v)212 return h, _DictHeaders(h)213 def test_getItem(self):214 """215 L{_DictHeaders.__getitem__} returns a single header for the given name.216 """217 headers, wrapper = self.headers(test=[b"lemur"])218 self.assertEqual(wrapper[b"test"], b"lemur")219 def test_getItemMultiple(self):220 """221 L{_DictHeaders.__getitem__} returns only the last header value for a222 given name.223 """224 headers, wrapper = self.headers(test=[b"lemur", b"panda"])225 self.assertEqual(wrapper[b"test"], b"panda")226 def test_getItemMissing(self):227 """228 L{_DictHeaders.__getitem__} raises L{KeyError} if called with a header229 which is not present.230 """231 headers, wrapper = self.headers()232 exc = self.assertRaises(KeyError, wrapper.__getitem__, b"test")233 self.assertEqual(exc.args, (b"test",))234 def test_iteration(self):235 """236 L{_DictHeaders.__iter__} returns an iterator the elements of which237 are the lowercase name of each header present.238 """239 headers, wrapper = self.headers(foo=[b"lemur", b"panda"], bar=[b"baz"])240 self.assertEqual(set(list(wrapper)), set([b"foo", b"bar"]))241 def test_length(self):242 """243 L{_DictHeaders.__len__} returns the number of headers present.244 """245 headers, wrapper = self.headers()246 self.assertEqual(len(wrapper), 0)247 headers.setRawHeaders(b"foo", [b"bar"])248 self.assertEqual(len(wrapper), 1)249 headers.setRawHeaders(b"test", [b"lemur", b"panda"])250 self.assertEqual(len(wrapper), 2)251 def test_setItem(self):252 """253 L{_DictHeaders.__setitem__} sets a single header value for the given254 name.255 """256 headers, wrapper = 
self.headers()257 wrapper[b"test"] = b"lemur"258 self.assertEqual(headers.getRawHeaders(b"test"), [b"lemur"])259 def test_setItemOverwrites(self):260 """261 L{_DictHeaders.__setitem__} will replace any previous header values for262 the given name.263 """264 headers, wrapper = self.headers(test=[b"lemur", b"panda"])265 wrapper[b"test"] = b"lemur"266 self.assertEqual(headers.getRawHeaders(b"test"), [b"lemur"])267 def test_delItem(self):268 """269 L{_DictHeaders.__delitem__} will remove the header values for the given270 name.271 """272 headers, wrapper = self.headers(test=[b"lemur"])273 del wrapper[b"test"]274 self.assertFalse(headers.hasHeader(b"test"))275 def test_delItemMissing(self):276 """277 L{_DictHeaders.__delitem__} will raise L{KeyError} if the given name is278 not present.279 """280 headers, wrapper = self.headers()281 exc = self.assertRaises(KeyError, wrapper.__delitem__, b"test")282 self.assertEqual(exc.args, (b"test",))283 def test_keys(self, _method='keys', _requireList=not _PY3):284 """285 L{_DictHeaders.keys} will return a list of all present header names.286 """287 headers, wrapper = self.headers(test=[b"lemur"], foo=[b"bar"])288 keys = getattr(wrapper, _method)()289 if _requireList:290 self.assertIsInstance(keys, list)291 self.assertEqual(set(keys), set([b"foo", b"test"]))292 def test_iterkeys(self):293 """294 L{_DictHeaders.iterkeys} will return all present header names.295 """296 self.test_keys('iterkeys', False)297 def test_values(self, _method='values', _requireList=not _PY3):298 """299 L{_DictHeaders.values} will return a list of all present header values,300 returning only the last value for headers with more than one.301 """302 headers, wrapper = self.headers(303 foo=[b"lemur"], bar=[b"marmot", b"panda"])304 values = getattr(wrapper, _method)()305 if _requireList:306 self.assertIsInstance(values, list)307 self.assertEqual(set(values), set([b"lemur", b"panda"]))308 def test_itervalues(self):309 """310 L{_DictHeaders.itervalues} will return 
all present header values,311 returning only the last value for headers with more than one.312 """313 self.test_values('itervalues', False)314 def test_items(self, _method='items', _requireList=not _PY3):315 """316 L{_DictHeaders.items} will return a list of all present header names317 and values as tuples, returning only the last value for headers with318 more than one.319 """320 headers, wrapper = self.headers(321 foo=[b"lemur"], bar=[b"marmot", b"panda"])322 items = getattr(wrapper, _method)()323 if _requireList:324 self.assertIsInstance(items, list)325 self.assertEqual(326 set(items), set([(b"foo", b"lemur"), (b"bar", b"panda")]))327 def test_iteritems(self):328 """329 L{_DictHeaders.iteritems} will return all present header names and330 values as tuples, returning only the last value for headers with more331 than one.332 """333 self.test_items('iteritems', False)334 def test_clear(self):335 """336 L{_DictHeaders.clear} will remove all headers.337 """338 headers, wrapper = self.headers(foo=[b"lemur"], bar=[b"panda"])339 wrapper.clear()340 self.assertEqual(list(headers.getAllRawHeaders()), [])341 def test_copy(self):342 """343 L{_DictHeaders.copy} will return a C{dict} with all the same headers344 and the last value for each.345 """346 headers, wrapper = self.headers(347 foo=[b"lemur", b"panda"], bar=[b"marmot"])348 duplicate = wrapper.copy()349 self.assertEqual(duplicate, {b"foo": b"panda", b"bar": b"marmot"})350 def test_get(self):351 """352 L{_DictHeaders.get} returns the last value for the given header name.353 """354 headers, wrapper = self.headers(foo=[b"lemur", b"panda"])355 self.assertEqual(wrapper.get(b"foo"), b"panda")356 def test_getMissing(self):357 """358 L{_DictHeaders.get} returns C{None} for a header which is not present.359 """360 headers, wrapper = self.headers()361 self.assertIdentical(wrapper.get(b"foo"), None)362 def test_getDefault(self):363 """364 L{_DictHeaders.get} returns the last value for the given header name365 even when it is 
invoked with a default value.366 """367 headers, wrapper = self.headers(foo=[b"lemur"])368 self.assertEqual(wrapper.get(b"foo", b"bar"), b"lemur")369 def test_getDefaultMissing(self):370 """371 L{_DictHeaders.get} returns the default value specified if asked for a372 header which is not present.373 """374 headers, wrapper = self.headers()375 self.assertEqual(wrapper.get(b"foo", b"bar"), b"bar")376 def test_has_key(self):377 """378 L{_DictHeaders.has_key} returns C{True} if the given header is present,379 C{False} otherwise.380 """381 headers, wrapper = self.headers(foo=[b"lemur"])382 self.assertTrue(wrapper.has_key(b"foo"))383 self.assertFalse(wrapper.has_key(b"bar"))384 def test_contains(self):385 """386 L{_DictHeaders.__contains__} returns C{True} if the given header is387 present, C{False} otherwise.388 """389 headers, wrapper = self.headers(foo=[b"lemur"])390 self.assertIn(b"foo", wrapper)391 self.assertNotIn(b"bar", wrapper)392 def test_pop(self):393 """394 L{_DictHeaders.pop} returns the last header value associated with the395 given header name and removes the header.396 """397 headers, wrapper = self.headers(foo=[b"lemur", b"panda"])398 self.assertEqual(wrapper.pop(b"foo"), b"panda")399 self.assertIdentical(headers.getRawHeaders(b"foo"), None)400 def test_popMissing(self):401 """402 L{_DictHeaders.pop} raises L{KeyError} if passed a header name which is403 not present.404 """405 headers, wrapper = self.headers()406 self.assertRaises(KeyError, wrapper.pop, b"foo")407 def test_popDefault(self):408 """409 L{_DictHeaders.pop} returns the last header value associated with the410 given header name and removes the header, even if it is supplied with a411 default value.412 """413 headers, wrapper = self.headers(foo=[b"lemur"])414 self.assertEqual(wrapper.pop(b"foo", b"bar"), b"lemur")415 self.assertIdentical(headers.getRawHeaders(b"foo"), None)416 def test_popDefaultMissing(self):417 """418 L{_DictHeaders.pop} returns the default value is asked for a header419 name 
which is not present.420 """421 headers, wrapper = self.headers(foo=[b"lemur"])422 self.assertEqual(wrapper.pop(b"bar", b"baz"), b"baz")423 self.assertEqual(headers.getRawHeaders(b"foo"), [b"lemur"])424 def test_popitem(self):425 """426 L{_DictHeaders.popitem} returns some header name/value pair.427 """428 headers, wrapper = self.headers(foo=[b"lemur", b"panda"])429 self.assertEqual(wrapper.popitem(), (b"foo", b"panda"))430 self.assertIdentical(headers.getRawHeaders(b"foo"), None)431 def test_popitemEmpty(self):432 """433 L{_DictHeaders.popitem} raises L{KeyError} if there are no headers434 present.435 """436 headers, wrapper = self.headers()437 self.assertRaises(KeyError, wrapper.popitem)438 def test_update(self):439 """440 L{_DictHeaders.update} adds the header/value pairs in the C{dict} it is441 passed, overriding any existing values for those headers.442 """443 headers, wrapper = self.headers(foo=[b"lemur"])444 wrapper.update({b"foo": b"panda", b"bar": b"marmot"})445 self.assertEqual(headers.getRawHeaders(b"foo"), [b"panda"])446 self.assertEqual(headers.getRawHeaders(b"bar"), [b"marmot"])447 def test_updateWithKeywords(self):448 """449 L{_DictHeaders.update} adds header names given as keyword arguments450 with the keyword values as the header value.451 """452 headers, wrapper = self.headers(foo=[b"lemur"])453 wrapper.update(foo=b"panda", bar=b"marmot")454 self.assertEqual(headers.getRawHeaders(b"foo"), [b"panda"])455 self.assertEqual(headers.getRawHeaders(b"bar"), [b"marmot"])456 if _PY3:457 test_updateWithKeywords.skip = "Not yet supported on Python 3; see #6082."458 def test_setdefaultMissing(self):459 """460 If passed the name of a header which is not present,461 L{_DictHeaders.setdefault} sets the value of the given header to the462 specified default value and returns it.463 """464 headers, wrapper = self.headers(foo=[b"bar"])465 self.assertEqual(wrapper.setdefault(b"baz", b"quux"), b"quux")466 self.assertEqual(headers.getRawHeaders(b"foo"), [b"bar"])467 
self.assertEqual(headers.getRawHeaders(b"baz"), [b"quux"])468 def test_setdefaultPresent(self):469 """470 If passed the name of a header which is present,471 L{_DictHeaders.setdefault} makes no changes to the headers and472 returns the last value already associated with that header.473 """474 headers, wrapper = self.headers(foo=[b"bar", b"baz"])475 self.assertEqual(wrapper.setdefault(b"foo", b"quux"), b"baz")476 self.assertEqual(headers.getRawHeaders(b"foo"), [b"bar", b"baz"])477 def test_setdefaultDefault(self):478 """479 If a value is not passed to L{_DictHeaders.setdefault}, C{None} is480 used.481 """482 # This results in an invalid state for the headers, but maybe some483 # application is doing this an intermediate step towards some other484 # state. Anyway, it was broken with the old implementation so it's485 # broken with the new implementation. Compatibility, for the win.486 # -exarkun487 headers, wrapper = self.headers()488 self.assertIdentical(wrapper.setdefault(b"foo"), None)489 self.assertEqual(headers.getRawHeaders(b"foo"), [None])490 def test_dictComparison(self):491 """492 An instance of L{_DictHeaders} compares equal to a C{dict} which493 contains the same header/value pairs. For header names with multiple494 values, the last value only is considered.495 """496 headers, wrapper = self.headers(foo=[b"lemur"], bar=[b"panda", b"marmot"])497 self.assertNotEqual(wrapper, {b"foo": b"lemur", b"bar": b"panda"})498 self.assertEqual(wrapper, {b"foo": b"lemur", b"bar": b"marmot"})499 def test_otherComparison(self):500 """501 An instance of L{_DictHeaders} does not compare equal to other502 unrelated objects.503 """504 headers, wrapper = self.headers()505 self.assertNotEqual(wrapper, ())506 self.assertNotEqual(wrapper, object())507 self.assertNotEqual(wrapper, b"foo")508 if _PY3:509 # Python 3 lacks these APIs...

Full Screen

Full Screen

test_base.py

Source:test_base.py Github

copy

Full Screen

1# encoding: utf-82from nose import tools as nose_tools3import ckan.tests.helpers as helpers4import ckan.plugins as p5import ckan.tests.factories as factories6class TestRenderSnippet(helpers.FunctionalTestBase):7 """8 Test ``ckan.lib.base.render_snippet``.9 """10 @helpers.change_config('debug', True)11 def test_comment_present_if_debug_true(self):12 response = self._get_test_app().get('/')13 assert '<!-- Snippet ' in response14 @helpers.change_config('debug', False)15 def test_comment_absent_if_debug_false(self):16 response = self._get_test_app().get('/')17 assert '<!-- Snippet ' not in response18class TestGetUserForApikey(helpers.FunctionalTestBase):19 def test_apikey_missing(self):20 app = self._get_test_app()21 request_headers = {}22 app.get('/dataset/new', headers=request_headers, status=403)23 def test_apikey_in_authorization_header(self):24 user = factories.Sysadmin()25 app = self._get_test_app()26 request_headers = {'Authorization': str(user['apikey'])}27 app.get('/dataset/new', headers=request_headers)28 def test_apikey_in_x_ckan_header(self):29 user = factories.Sysadmin()30 app = self._get_test_app()31 # non-standard header name is defined in test-core.ini32 request_headers = {'X-Non-Standard-CKAN-API-Key': str(user['apikey'])}33 app.get('/dataset/new', headers=request_headers)34 def test_apikey_contains_unicode(self):35 # there is no valid apikey containing unicode, but we should fail36 # nicely if unicode is supplied37 app = self._get_test_app()38 request_headers = {'Authorization': '\xc2\xb7'}39 app.get('/dataset/new', headers=request_headers, status=403)40class TestCORS(helpers.FunctionalTestBase):41 def test_options(self):42 app = self._get_test_app()43 response = app.options(url='/', status=200)44 assert len(str(response.body)) == 0, 'OPTIONS must return no content'45 def test_cors_config_no_cors(self):46 '''47 No ckan.cors settings in config, so no Access-Control-Allow headers in48 response.49 '''50 app = self._get_test_app()51 response = 
app.get('/')52 response_headers = dict(response.headers)53 assert 'Access-Control-Allow-Origin' not in response_headers54 assert 'Access-Control-Allow-Methods' not in response_headers55 assert 'Access-Control-Allow-Headers' not in response_headers56 def test_cors_config_no_cors_with_origin(self):57 '''58 No ckan.cors settings in config, so no Access-Control-Allow headers in59 response, even with origin header in request.60 '''61 app = self._get_test_app()62 request_headers = {'Origin': 'http://thirdpartyrequests.org'}63 response = app.get('/', headers=request_headers)64 response_headers = dict(response.headers)65 assert 'Access-Control-Allow-Origin' not in response_headers66 assert 'Access-Control-Allow-Methods' not in response_headers67 assert 'Access-Control-Allow-Headers' not in response_headers68 @helpers.change_config('ckan.cors.origin_allow_all', 'true')69 def test_cors_config_origin_allow_all_true_no_origin(self):70 '''71 With origin_allow_all set to true, but no origin in the request72 header, no Access-Control-Allow headers should be in the response.73 '''74 app = self._get_test_app()75 response = app.get('/')76 response_headers = dict(response.headers)77 assert 'Access-Control-Allow-Origin' not in response_headers78 assert 'Access-Control-Allow-Methods' not in response_headers79 assert 'Access-Control-Allow-Headers' not in response_headers80 @helpers.change_config('ckan.cors.origin_allow_all', 'true')81 @helpers.change_config('ckan.site_url', 'http://test.ckan.org')82 def test_cors_config_origin_allow_all_true_with_origin(self):83 '''84 With origin_allow_all set to true, and an origin in the request85 header, the appropriate Access-Control-Allow headers should be in the86 response.87 '''88 app = self._get_test_app()89 request_headers = {'Origin': 'http://thirdpartyrequests.org'}90 response = app.get('/', headers=request_headers)91 response_headers = dict(response.headers)92 assert 'Access-Control-Allow-Origin' in response_headers93 
nose_tools.assert_equal(response_headers['Access-Control-Allow-Origin'], '*')94 nose_tools.assert_equal(response_headers['Access-Control-Allow-Methods'], "POST, PUT, GET, DELETE, OPTIONS")95 nose_tools.assert_equal(response_headers['Access-Control-Allow-Headers'], "X-CKAN-API-KEY, Authorization, Content-Type")96 @helpers.change_config('ckan.cors.origin_allow_all', 'false')97 @helpers.change_config('ckan.site_url', 'http://test.ckan.org')98 def test_cors_config_origin_allow_all_false_with_origin_without_whitelist(self):99 '''100 With origin_allow_all set to false, with an origin in the request101 header, but no whitelist defined, there should be no Access-Control-102 Allow headers in the response.103 '''104 app = self._get_test_app()105 request_headers = {'Origin': 'http://thirdpartyrequests.org'}106 response = app.get('/', headers=request_headers)107 response_headers = dict(response.headers)108 assert 'Access-Control-Allow-Origin' not in response_headers109 assert 'Access-Control-Allow-Methods' not in response_headers110 assert 'Access-Control-Allow-Headers' not in response_headers111 @helpers.change_config('ckan.cors.origin_allow_all', 'false')112 @helpers.change_config('ckan.cors.origin_whitelist', 'http://thirdpartyrequests.org')113 @helpers.change_config('ckan.site_url', 'http://test.ckan.org')114 def test_cors_config_origin_allow_all_false_with_whitelisted_origin(self):115 '''116 With origin_allow_all set to false, with an origin in the request117 header, and a whitelist defined (containing the origin), the118 appropriate Access-Control-Allow headers should be in the response.119 '''120 app = self._get_test_app()121 request_headers = {'Origin': 'http://thirdpartyrequests.org'}122 response = app.get('/', headers=request_headers)123 response_headers = dict(response.headers)124 assert 'Access-Control-Allow-Origin' in response_headers125 nose_tools.assert_equal(response_headers['Access-Control-Allow-Origin'], 'http://thirdpartyrequests.org')126 
nose_tools.assert_equal(response_headers['Access-Control-Allow-Methods'], "POST, PUT, GET, DELETE, OPTIONS")127 nose_tools.assert_equal(response_headers['Access-Control-Allow-Headers'], "X-CKAN-API-KEY, Authorization, Content-Type")128 @helpers.change_config('ckan.cors.origin_allow_all', 'false')129 @helpers.change_config('ckan.cors.origin_whitelist', 'http://google.com http://thirdpartyrequests.org http://yahoo.co.uk')130 @helpers.change_config('ckan.site_url', 'http://test.ckan.org')131 def test_cors_config_origin_allow_all_false_with_multiple_whitelisted_origins(self):132 '''133 With origin_allow_all set to false, with an origin in the request134 header, and a whitelist defining multiple allowed origins (containing135 the origin), the appropriate Access-Control-Allow headers should be in136 the response.137 '''138 app = self._get_test_app()139 request_headers = {'Origin': 'http://thirdpartyrequests.org'}140 response = app.get('/', headers=request_headers)141 response_headers = dict(response.headers)142 assert 'Access-Control-Allow-Origin' in response_headers143 nose_tools.assert_equal(response_headers['Access-Control-Allow-Origin'], 'http://thirdpartyrequests.org')144 nose_tools.assert_equal(response_headers['Access-Control-Allow-Methods'], "POST, PUT, GET, DELETE, OPTIONS")145 nose_tools.assert_equal(response_headers['Access-Control-Allow-Headers'], "X-CKAN-API-KEY, Authorization, Content-Type")146 @helpers.change_config('ckan.cors.origin_allow_all', 'false')147 @helpers.change_config('ckan.cors.origin_whitelist', 'http://google.com http://yahoo.co.uk')148 @helpers.change_config('ckan.site_url', 'http://test.ckan.org')149 def test_cors_config_origin_allow_all_false_with_whitelist_not_containing_origin(self):150 '''151 With origin_allow_all set to false, with an origin in the request152 header, and a whitelist defining multiple allowed origins (but not153 containing the requesting origin), there should be no Access-Control-154 Allow headers in the response.155 
'''156 app = self._get_test_app()157 request_headers = {'Origin': 'http://thirdpartyrequests.org'}158 response = app.get('/', headers=request_headers)159 response_headers = dict(response.headers)160 assert 'Access-Control-Allow-Origin' not in response_headers161 assert 'Access-Control-Allow-Methods' not in response_headers162 assert 'Access-Control-Allow-Headers' not in response_headers163class TestCORSFlask(helpers.FunctionalTestBase):164 @classmethod165 def setup_class(cls):166 super(TestCORSFlask, cls).setup_class()167 cls.app = cls._get_test_app()168 flask_app = cls.app.flask_app169 if not p.plugin_loaded('test_routing_plugin'):170 p.load('test_routing_plugin')171 plugin = p.get_plugin('test_routing_plugin')172 flask_app.register_blueprint(plugin.get_blueprint(),173 prioritise_rules=True)174 @classmethod175 def teardown_class(cls):176 super(TestCORSFlask, cls).teardown_class()177 p.unload('test_routing_plugin')178 def test_options(self):179 response = self.app.options(url='/simple_flask', status=200)180 assert len(str(response.body)) == 0, 'OPTIONS must return no content'181 def test_cors_config_no_cors(self):182 '''183 No ckan.cors settings in config, so no Access-Control-Allow headers in184 response.185 '''186 response = self.app.get('/simple_flask')187 response_headers = dict(response.headers)188 assert 'Access-Control-Allow-Origin' not in response_headers189 assert 'Access-Control-Allow-Methods' not in response_headers190 assert 'Access-Control-Allow-Headers' not in response_headers191 def test_cors_config_no_cors_with_origin(self):192 '''193 No ckan.cors settings in config, so no Access-Control-Allow headers in194 response, even with origin header in request.195 '''196 request_headers = {'Origin': 'http://thirdpartyrequests.org'}197 response = self.app.get('/simple_flask', headers=request_headers)198 response_headers = dict(response.headers)199 assert 'Access-Control-Allow-Origin' not in response_headers200 assert 'Access-Control-Allow-Methods' not in 
response_headers201 assert 'Access-Control-Allow-Headers' not in response_headers202 @helpers.change_config('ckan.cors.origin_allow_all', 'true')203 def test_cors_config_origin_allow_all_true_no_origin(self):204 '''205 With origin_allow_all set to true, but no origin in the request206 header, no Access-Control-Allow headers should be in the response.207 '''208 response = self.app.get('/simple_flask')209 response_headers = dict(response.headers)210 assert 'Access-Control-Allow-Origin' not in response_headers211 assert 'Access-Control-Allow-Methods' not in response_headers212 assert 'Access-Control-Allow-Headers' not in response_headers213 @helpers.change_config('ckan.cors.origin_allow_all', 'true')214 @helpers.change_config('ckan.site_url', 'http://test.ckan.org')215 def test_cors_config_origin_allow_all_true_with_origin(self):216 '''217 With origin_allow_all set to true, and an origin in the request218 header, the appropriate Access-Control-Allow headers should be in the219 response.220 '''221 request_headers = {'Origin': 'http://thirdpartyrequests.org'}222 response = self.app.get('/simple_flask', headers=request_headers)223 response_headers = dict(response.headers)224 assert 'Access-Control-Allow-Origin' in response_headers225 nose_tools.assert_equal(response_headers['Access-Control-Allow-Origin'], '*')226 nose_tools.assert_equal(response_headers['Access-Control-Allow-Methods'], "POST, PUT, GET, DELETE, OPTIONS")227 nose_tools.assert_equal(response_headers['Access-Control-Allow-Headers'], "X-CKAN-API-KEY, Authorization, Content-Type")228 @helpers.change_config('ckan.cors.origin_allow_all', 'false')229 @helpers.change_config('ckan.site_url', 'http://test.ckan.org')230 def test_cors_config_origin_allow_all_false_with_origin_without_whitelist(self):231 '''232 With origin_allow_all set to false, with an origin in the request233 header, but no whitelist defined, there should be no Access-Control-234 Allow headers in the response.235 '''236 request_headers = {'Origin': 
'http://thirdpartyrequests.org'}237 response = self.app.get('/simple_flask', headers=request_headers)238 response_headers = dict(response.headers)239 assert 'Access-Control-Allow-Origin' not in response_headers240 assert 'Access-Control-Allow-Methods' not in response_headers241 assert 'Access-Control-Allow-Headers' not in response_headers242 @helpers.change_config('ckan.cors.origin_allow_all', 'false')243 @helpers.change_config('ckan.cors.origin_whitelist', 'http://thirdpartyrequests.org')244 @helpers.change_config('ckan.site_url', 'http://test.ckan.org')245 def test_cors_config_origin_allow_all_false_with_whitelisted_origin(self):246 '''247 With origin_allow_all set to false, with an origin in the request248 header, and a whitelist defined (containing the origin), the249 appropriate Access-Control-Allow headers should be in the response.250 '''251 request_headers = {'Origin': 'http://thirdpartyrequests.org'}252 response = self.app.get('/simple_flask', headers=request_headers)253 response_headers = dict(response.headers)254 assert 'Access-Control-Allow-Origin' in response_headers255 nose_tools.assert_equal(response_headers['Access-Control-Allow-Origin'], 'http://thirdpartyrequests.org')256 nose_tools.assert_equal(response_headers['Access-Control-Allow-Methods'], "POST, PUT, GET, DELETE, OPTIONS")257 nose_tools.assert_equal(response_headers['Access-Control-Allow-Headers'], "X-CKAN-API-KEY, Authorization, Content-Type")258 @helpers.change_config('ckan.cors.origin_allow_all', 'false')259 @helpers.change_config('ckan.cors.origin_whitelist', 'http://google.com http://thirdpartyrequests.org http://yahoo.co.uk')260 @helpers.change_config('ckan.site_url', 'http://test.ckan.org')261 def test_cors_config_origin_allow_all_false_with_multiple_whitelisted_origins(self):262 '''263 With origin_allow_all set to false, with an origin in the request264 header, and a whitelist defining multiple allowed origins (containing265 the origin), the appropriate Access-Control-Allow headers 
should be in266 the response.267 '''268 request_headers = {'Origin': 'http://thirdpartyrequests.org'}269 response = self.app.get('/simple_flask', headers=request_headers)270 response_headers = dict(response.headers)271 assert 'Access-Control-Allow-Origin' in response_headers272 nose_tools.assert_equal(response_headers['Access-Control-Allow-Origin'], 'http://thirdpartyrequests.org')273 nose_tools.assert_equal(response_headers['Access-Control-Allow-Methods'], "POST, PUT, GET, DELETE, OPTIONS")274 nose_tools.assert_equal(response_headers['Access-Control-Allow-Headers'], "X-CKAN-API-KEY, Authorization, Content-Type")275 @helpers.change_config('ckan.cors.origin_allow_all', 'false')276 @helpers.change_config('ckan.cors.origin_whitelist', 'http://google.com http://yahoo.co.uk')277 @helpers.change_config('ckan.site_url', 'http://test.ckan.org')278 def test_cors_config_origin_allow_all_false_with_whitelist_not_containing_origin(self):279 '''280 With origin_allow_all set to false, with an origin in the request281 header, and a whitelist defining multiple allowed origins (but not282 containing the requesting origin), there should be no Access-Control-283 Allow headers in the response.284 '''285 request_headers = {'Origin': 'http://thirdpartyrequests.org'}286 response = self.app.get('/simple_flask', headers=request_headers)287 response_headers = dict(response.headers)288 assert 'Access-Control-Allow-Origin' not in response_headers289 assert 'Access-Control-Allow-Methods' not in response_headers...

Full Screen

Full Screen

test_warc.py

Source:test_warc.py Github

copy

Full Screen

1# Copyright (c) 2018 crocoite contributors2# 3# Permission is hereby granted, free of charge, to any person obtaining a copy4# of this software and associated documentation files (the "Software"), to deal5# in the Software without restriction, including without limitation the rights6# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell7# copies of the Software, and to permit persons to whom the Software is8# furnished to do so, subject to the following conditions:9# 10# The above copyright notice and this permission notice shall be included in11# all copies or substantial portions of the Software.12# 13# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR14# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,15# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE16# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER17# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,18# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN19# THE SOFTWARE.20from tempfile import NamedTemporaryFile21import json, urllib22from operator import itemgetter23from warcio.archiveiterator import ArchiveIterator24from yarl import URL25from multidict import CIMultiDict26from hypothesis import given, reproduce_failure27import hypothesis.strategies as st28import pytest29from .warc import WarcHandler30from .logger import Logger, WarcHandlerConsumer31from .controller import ControllerStart32from .behavior import Script, ScreenshotEvent, DomSnapshotEvent33from .browser import RequestResponsePair, Base64Body, UnicodeBody34from .test_browser import requestResponsePair, urls35def test_log ():36 logger = Logger ()37 with NamedTemporaryFile() as fd:38 with WarcHandler (fd, logger) as handler:39 warclogger = WarcHandlerConsumer (handler)40 logger.connect (warclogger)41 golden = []42 assert handler.log.tell () == 043 golden.append (logger.info 
(foo=1, bar='baz', encoding='äöü⇔ΓΨ'))44 assert handler.log.tell () != 045 handler.maxLogSize = 046 golden.append (logger.info (bar=1, baz='baz'))47 # should flush the log48 assert handler.log.tell () == 049 fd.seek (0)50 for it in ArchiveIterator (fd):51 headers = it.rec_headers52 assert headers['warc-type'] == 'metadata'53 assert 'warc-target-uri' not in headers54 assert headers['x-crocoite-type'] == 'log'55 assert headers['content-type'] == f'application/json; charset={handler.logEncoding}'56 while True:57 l = it.raw_stream.readline ()58 if not l:59 break60 data = json.loads (l.strip ())61 assert data == golden.pop (0)62def jsonObject ():63 """ JSON-encodable objects """64 return st.dictionaries (st.text (), st.one_of (st.integers (), st.text ()))65def viewport ():66 return st.builds (lambda x, y: f'{x}x{y}', st.integers (), st.integers ())67def event ():68 return st.one_of (69 st.builds (ControllerStart, jsonObject ()),70 st.builds (Script.fromStr, st.text (), st.one_of(st.none (), st.text ())),71 st.builds (ScreenshotEvent, urls (), st.integers (), st.binary ()),72 st.builds (DomSnapshotEvent, urls (), st.builds (lambda x: x.encode ('utf-8'), st.text ()), viewport()),73 requestResponsePair (),74 )75@pytest.mark.asyncio76@given (st.lists (event ()))77async def test_push (golden):78 def checkWarcinfoId (headers):79 if lastWarcinfoRecordid is not None:80 assert headers['WARC-Warcinfo-ID'] == lastWarcinfoRecordid81 lastWarcinfoRecordid = None82 # null logger83 logger = Logger ()84 with open('/tmp/test.warc.gz', 'w+b') as fd:85 with WarcHandler (fd, logger) as handler:86 for g in golden:87 await handler.push (g)88 fd.seek (0)89 it = iter (ArchiveIterator (fd))90 for g in golden:91 if isinstance (g, ControllerStart):92 rec = next (it)93 headers = rec.rec_headers94 assert headers['warc-type'] == 'warcinfo'95 assert 'warc-target-uri' not in headers96 assert 'x-crocoite-type' not in headers97 data = json.load (rec.raw_stream)98 assert data == g.payload99 
lastWarcinfoRecordid = headers['warc-record-id']100 assert lastWarcinfoRecordid101 elif isinstance (g, Script):102 rec = next (it)103 headers = rec.rec_headers104 assert headers['warc-type'] == 'resource'105 assert headers['content-type'] == 'application/javascript; charset=utf-8'106 assert headers['x-crocoite-type'] == 'script'107 checkWarcinfoId (headers)108 if g.path:109 assert URL (headers['warc-target-uri']) == URL ('file://' + g.abspath)110 else:111 assert 'warc-target-uri' not in headers112 data = rec.raw_stream.read ().decode ('utf-8')113 assert data == g.data114 elif isinstance (g, ScreenshotEvent):115 # XXX: check refers-to header116 rec = next (it)117 headers = rec.rec_headers118 assert headers['warc-type'] == 'conversion'119 assert headers['x-crocoite-type'] == 'screenshot'120 checkWarcinfoId (headers)121 assert URL (headers['warc-target-uri']) == g.url, (headers['warc-target-uri'], g.url)122 assert headers['warc-refers-to'] is None123 assert int (headers['X-Crocoite-Screenshot-Y-Offset']) == g.yoff124 assert rec.raw_stream.read () == g.data125 elif isinstance (g, DomSnapshotEvent):126 rec = next (it)127 headers = rec.rec_headers128 assert headers['warc-type'] == 'conversion'129 assert headers['x-crocoite-type'] == 'dom-snapshot'130 checkWarcinfoId (headers)131 assert URL (headers['warc-target-uri']) == g.url132 assert headers['warc-refers-to'] is None133 assert rec.raw_stream.read () == g.document134 elif isinstance (g, RequestResponsePair):135 rec = next (it)136 # request137 headers = rec.rec_headers138 assert headers['warc-type'] == 'request'139 assert 'x-crocoite-type' not in headers140 checkWarcinfoId (headers)141 assert URL (headers['warc-target-uri']) == g.url142 assert headers['x-chrome-request-id'] == g.id143 144 assert CIMultiDict (rec.http_headers.headers) == g.request.headers145 if g.request.hasPostData:146 if g.request.body is not None:147 assert rec.raw_stream.read () == g.request.body148 else:149 # body fetch failed150 assert 
headers['warc-truncated'] == 'unspecified'151 assert not rec.raw_stream.read ()152 else:153 assert not rec.raw_stream.read ()154 # response155 if g.response:156 rec = next (it)157 headers = rec.rec_headers158 httpheaders = rec.http_headers159 assert headers['warc-type'] == 'response'160 checkWarcinfoId (headers)161 assert URL (headers['warc-target-uri']) == g.url162 assert headers['x-chrome-request-id'] == g.id163 assert 'x-crocoite-type' not in headers164 # these are checked separately165 filteredHeaders = CIMultiDict (httpheaders.headers)166 for b in {'content-type', 'content-length'}:167 if b in g.response.headers:168 g.response.headers.popall (b)169 if b in filteredHeaders:170 filteredHeaders.popall (b)171 assert filteredHeaders == g.response.headers172 expectedContentType = g.response.mimeType173 if expectedContentType is not None:174 assert httpheaders['content-type'].startswith (expectedContentType)175 if g.response.body is not None:176 assert rec.raw_stream.read () == g.response.body177 assert httpheaders['content-length'] == str (len (g.response.body))178 # body is never truncated if it exists179 assert headers['warc-truncated'] is None180 # unencoded strings are converted to utf8181 if isinstance (g.response.body, UnicodeBody) and httpheaders['content-type'] is not None:182 assert httpheaders['content-type'].endswith ('; charset=utf-8')183 else:184 # body fetch failed185 assert headers['warc-truncated'] == 'unspecified'186 assert not rec.raw_stream.read ()187 # content-length header should be kept intact188 else:189 assert False, f"invalid golden type {type(g)}" # pragma: no cover190 # no further records191 with pytest.raises (StopIteration):...

Full Screen

Full Screen

test_tools.py

Source:test_tools.py Github

copy

Full Screen

1# Copyright (c) 2018 crocoite contributors2# 3# Permission is hereby granted, free of charge, to any person obtaining a copy4# of this software and associated documentation files (the "Software"), to deal5# in the Software without restriction, including without limitation the rights6# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell7# copies of the Software, and to permit persons to whom the Software is8# furnished to do so, subject to the following conditions:9# 10# The above copyright notice and this permission notice shall be included in11# all copies or substantial portions of the Software.12# 13# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR14# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,15# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE16# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER17# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,18# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN19# THE SOFTWARE.20from tempfile import NamedTemporaryFile21from operator import itemgetter22from io import BytesIO23import pytest24from warcio.archiveiterator import ArchiveIterator25from warcio.warcwriter import WARCWriter26from warcio.statusandheaders import StatusAndHeaders27from pkg_resources import parse_version28from .tools import mergeWarc, Errata, FixableErrata29@pytest.fixture30def writer():31 return WARCWriter (NamedTemporaryFile(), gzip=True)32def recordsEqual(golden, underTest):33 for a, b in zip (golden, underTest):34 # record ids are not predictable, so we cannot compare them. Dito for35 # dates. 
Content-* seems to be added when writing to file.36 for x in {'WARC-Record-Id', 'WARC-Block-Digest', 'WARC-Date',37 'Content-Length', 'Content-Type'}:38 a.rec_headers.remove_header(x)39 b.rec_headers.remove_header(x)40 aheader = sorted(a.rec_headers.headers, key=itemgetter(0))41 bheader = sorted(b.rec_headers.headers, key=itemgetter(0))42 assert aheader == bheader43 assert a.http_headers == b.http_headers44def makeGolden(writer, records):45 # additional warcinfo is written. Content does not matter.46 record = writer.create_warc_record (47 '',48 'warcinfo',49 payload=b'',50 warc_headers_dict={'Content-Type': 'application/json; charset=utf-8'})51 records.insert (0, record)52 return records53def test_unmodified(writer):54 """55 Single request/response pair, no revisits56 """57 records = []58 httpHeaders = StatusAndHeaders('GET / HTTP/1.1', {}, is_http_request=True)59 warcHeaders = {}60 record = writer.create_warc_record ('http://example.com/', 'request', payload=BytesIO(b'foobar'),61 warc_headers_dict=warcHeaders, http_headers=httpHeaders)62 records.append (record)63 httpHeaders = StatusAndHeaders('200 OK', {}, protocol='HTTP/1.1')64 record = writer.create_warc_record ('http://example.com/', 'response', payload=BytesIO(b'data'),65 warc_headers_dict=warcHeaders, http_headers=httpHeaders)66 records.append (record)67 for r in records:68 writer.write_record (r)69 output = NamedTemporaryFile()70 mergeWarc ([writer.out.name], output)71 output.seek(0)72 recordsEqual (makeGolden (writer, records), ArchiveIterator (output))73def test_different_payload(writer):74 """75 Duplicate URL, but different payload76 """77 records = []78 for i in range (2):79 httpHeaders = StatusAndHeaders('GET / HTTP/1.1', {}, is_http_request=True)80 warcHeaders = {}81 record = writer.create_warc_record ('http://example.com/', 'request', payload=BytesIO(b'foobar'),82 warc_headers_dict=warcHeaders, http_headers=httpHeaders)83 records.append (record)84 httpHeaders = StatusAndHeaders('200 OK', {}, 
protocol='HTTP/1.1')85 record = writer.create_warc_record ('http://example.com/', 'response',86 payload=BytesIO(f'data{i}'.encode ('utf8')),87 warc_headers_dict=warcHeaders, http_headers=httpHeaders)88 records.append (record)89 for r in records:90 writer.write_record (r)91 output = NamedTemporaryFile()92 mergeWarc ([writer.out.name], output)93 output.seek(0)94 recordsEqual (makeGolden (writer, records), ArchiveIterator (output))95def makeRevisit(writer, ref, dup):96 """ Make revisit record for reference """97 dupHeaders = dup.rec_headers98 refHeaders = ref.rec_headers99 record = writer.create_revisit_record (dupHeaders.get_header('WARC-Target-URI'),100 digest=refHeaders.get_header('WARC-Payload-Digest'),101 refers_to_uri=refHeaders.get_header('WARC-Target-URI'),102 refers_to_date=refHeaders.get_header('WARC-Date'),103 http_headers=dup.http_headers)104 record.rec_headers.add_header ('WARC-Refers-To', refHeaders.get_header('WARC-Record-ID'))105 record.rec_headers.add_header ('WARC-Truncated', 'length')106 return record107def test_resp_revisit_same_url(writer):108 """109 Duplicate record for the same URL, creates a revisit110 """111 records = []112 for i in range (2):113 httpHeaders = StatusAndHeaders('GET / HTTP/1.1', {}, is_http_request=True)114 warcHeaders = {}115 record = writer.create_warc_record ('http://example.com/', 'request', payload=BytesIO(b'foobar'),116 warc_headers_dict=warcHeaders, http_headers=httpHeaders)117 records.append (record)118 httpHeaders = StatusAndHeaders('200 OK', {}, protocol='HTTP/1.1')119 record = writer.create_warc_record ('http://example.com/', 'response', payload=BytesIO(b'data'),120 warc_headers_dict=warcHeaders, http_headers=httpHeaders)121 records.append (record)122 for r in records:123 writer.write_record (r)124 dup = records.pop ()125 ref = records[1]126 records.append (makeRevisit (writer, ref, dup))127 output = NamedTemporaryFile()128 mergeWarc ([writer.out.name], output)129 output.seek(0)130 recordsEqual (makeGolden (writer, 
records), ArchiveIterator (output))131def test_resp_revisit_other_url(writer):132 """133 Duplicate record for different URL, creates a revisit134 """135 records = []136 httpHeaders = StatusAndHeaders('GET / HTTP/1.1', {}, is_http_request=True)137 warcHeaders = {}138 record = writer.create_warc_record ('http://example.com/', 'request', payload=BytesIO(b'foobar'),139 warc_headers_dict=warcHeaders, http_headers=httpHeaders)140 records.append (record)141 httpHeaders = StatusAndHeaders('200 OK', {}, protocol='HTTP/1.1')142 record = writer.create_warc_record ('http://example.com/', 'response', payload=BytesIO(b'data'),143 warc_headers_dict=warcHeaders, http_headers=httpHeaders)144 records.append (record)145 httpHeaders = StatusAndHeaders('GET / HTTP/1.1', {}, is_http_request=True)146 warcHeaders = {}147 record = writer.create_warc_record ('http://example.com/one', 'request', payload=BytesIO(b'foobar'),148 warc_headers_dict=warcHeaders, http_headers=httpHeaders)149 records.append (record)150 httpHeaders = StatusAndHeaders('200 OK', {}, protocol='HTTP/1.1')151 record = writer.create_warc_record ('http://example.com/one', 'response', payload=BytesIO(b'data'),152 warc_headers_dict=warcHeaders, http_headers=httpHeaders)153 records.append (record)154 for r in records:155 writer.write_record (r)156 dup = records.pop ()157 ref = records[1]158 records.append (makeRevisit (writer, ref, dup))159 output = NamedTemporaryFile()160 mergeWarc ([writer.out.name], output)161 output.seek(0)162 recordsEqual (makeGolden (writer, records), ArchiveIterator (output))163def test_errata_contains():164 """ Test version matching """165 e = Errata('some-uuid', 'description', ['a<1.0'])166 assert {'a': parse_version('0.1')} in e167 assert {'a': parse_version('1.0')} not in e168 assert {'b': parse_version('1.0')} not in e169 e = Errata('some-uuid', 'description', ['a<1.0,>0.1'])170 assert {'a': parse_version('0.1')} not in e171 assert {'a': parse_version('0.2')} in e172 assert {'a': 
parse_version('1.0')} not in e173 # a AND b174 e = Errata('some-uuid', 'description', ['a<1.0', 'b>1.0'])175 assert {'a': parse_version('0.1')} not in e176 assert {'b': parse_version('1.1')} not in e177 assert {'a': parse_version('0.1'), 'b': parse_version('1.1')} in e178def test_errata_fixable ():179 e = Errata('some-uuid', 'description', ['a<1.0', 'b>1.0'])180 assert not e.fixable181 e = FixableErrata('some-uuid', 'description', ['a<1.0', 'b>1.0'])...

Full Screen

Full Screen

http_headers.py

Source:http_headers.py Github

copy

Full Screen

# -*- test-case-name: twisted.web.test.test_http_headers -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
An API for storing HTTP header names and values.
"""

from __future__ import division, absolute_import

try:
    # The abstract base classes live in collections.abc; the aliases that
    # used to be re-exported from collections were removed in Python 3.10.
    from collections.abc import MutableMapping
except ImportError:
    # Python 2 has no collections.abc module.
    from collections import MutableMapping

from twisted.python.compat import comparable, cmp


def _dashCapitalize(name):
    """
    Return a byte string which is capitalized using '-' as a word separator.

    @param name: The name of the header to capitalize.
    @type name: C{bytes}

    @return: The given header capitalized using '-' as a word separator.
    @rtype: C{bytes}
    """
    return b'-'.join([word.capitalize() for word in name.split(b'-')])


class _DictHeaders(MutableMapping):
    """
    A C{dict}-like wrapper around L{Headers} to provide backwards compatibility
    for L{twisted.web.http.Request.received_headers} and
    L{twisted.web.http.Request.headers} which used to be plain C{dict}
    instances.

    @type _headers: L{Headers}
    @ivar _headers: The real header storage object.
    """
    def __init__(self, headers):
        self._headers = headers

    def __getitem__(self, key):
        """
        Return the last value for header of C{key}.

        @raise KeyError: If no header named C{key} is present.
        """
        if self._headers.hasHeader(key):
            return self._headers.getRawHeaders(key)[-1]
        raise KeyError(key)

    def __setitem__(self, key, value):
        """
        Set the given header, replacing any values it previously had with the
        single value C{value}.
        """
        self._headers.setRawHeaders(key, [value])

    def __delitem__(self, key):
        """
        Delete the given header.

        @raise KeyError: If no header named C{key} is present.
        """
        if self._headers.hasHeader(key):
            self._headers.removeHeader(key)
        else:
            raise KeyError(key)

    def __iter__(self):
        """
        Return an iterator of the lowercase name of each header present.
        """
        for k, v in self._headers.getAllRawHeaders():
            yield k.lower()

    def __len__(self):
        """
        Return the number of distinct headers present.
        """
        # XXX Too many _
        return len(self._headers._rawHeaders)

    # Extra methods that MutableMapping doesn't care about but that we do.
    def copy(self):
        """
        Return a C{dict} mapping each header name to the last corresponding
        header value.
        """
        return dict(self.items())

    def has_key(self, key):
        """
        Return C{True} if C{key} is a header in this collection, C{False}
        otherwise.
        """
        return key in self


@comparable
class Headers(object):
    """
    This class stores the HTTP headers as both a parsed representation
    and the raw string representation. It converts between the two on
    demand.

    @cvar _caseMappings: A C{dict} that maps lowercase header names
        to their canonicalized representation.

    @ivar _rawHeaders: A C{dict} mapping header names as C{bytes} to C{lists} of
        header values as C{bytes}.
    """
    _caseMappings = {
        b'content-md5': b'Content-MD5',
        b'dnt': b'DNT',
        b'etag': b'ETag',
        b'p3p': b'P3P',
        b'te': b'TE',
        b'www-authenticate': b'WWW-Authenticate',
        b'x-xss-protection': b'X-XSS-Protection'}

    def __init__(self, rawHeaders=None):
        self._rawHeaders = {}
        if rawHeaders is not None:
            for name, values in rawHeaders.items():
                # Store a copy of each list so later changes to the caller's
                # list do not affect this object.
                self.setRawHeaders(name, values[:])

    def __repr__(self):
        """
        Return a string fully describing the headers set on this object.
        """
        return '%s(%r)' % (self.__class__.__name__, self._rawHeaders,)

    def __cmp__(self, other):
        """
        Define L{Headers} instances as being equal to each other if they have
        the same raw headers.
        """
        if isinstance(other, Headers):
            return cmp(
                sorted(self._rawHeaders.items()),
                sorted(other._rawHeaders.items()))
        return NotImplemented

    def copy(self):
        """
        Return a copy of itself with the same headers set.
        """
        return self.__class__(self._rawHeaders)

    def hasHeader(self, name):
        """
        Check for the existence of a given header.

        @type name: C{bytes}
        @param name: The name of the HTTP header to check for.

        @rtype: C{bool}
        @return: C{True} if the header exists, otherwise C{False}.
        """
        return name.lower() in self._rawHeaders

    def removeHeader(self, name):
        """
        Remove the named header from this header object.

        Removing a header which is not present is not an error.

        @type name: C{bytes}
        @param name: The name of the HTTP header to remove.

        @return: C{None}
        """
        self._rawHeaders.pop(name.lower(), None)

    def setRawHeaders(self, name, values):
        """
        Sets the raw representation of the given header.

        @type name: C{bytes}
        @param name: The name of the HTTP header to set the values for.

        @type values: C{list}
        @param values: A list of strings each one being a header value of
            the given name.

        @raise TypeError: If C{values} is not a C{list}.

        @return: C{None}
        """
        if not isinstance(values, list):
            raise TypeError("Header entry %r should be list but found "
                            "instance of %r instead" % (name, type(values)))
        self._rawHeaders[name.lower()] = values

    def addRawHeader(self, name, value):
        """
        Add a new raw value for the given header.

        @type name: C{bytes}
        @param name: The name of the header for which to set the value.

        @type value: C{bytes}
        @param value: The value to set for the named header.
        """
        values = self.getRawHeaders(name)
        if values is None:
            self.setRawHeaders(name, [value])
        else:
            # getRawHeaders returns the stored list itself, so appending here
            # updates the header in place.
            values.append(value)

    def getRawHeaders(self, name, default=None):
        """
        Returns a list of headers matching the given name as the raw string
        given.

        @type name: C{bytes}
        @param name: The name of the HTTP header to get the values of.

        @param default: The value to return if no header with the given C{name}
            exists.

        @rtype: C{list}
        @return: A C{list} of values for the given header.
        """
        return self._rawHeaders.get(name.lower(), default)

    def getAllRawHeaders(self):
        """
        Return an iterator of key, value pairs of all headers contained in this
        object, as strings. The keys are capitalized in canonical
        capitalization.
        """
        for k, v in self._rawHeaders.items():
            yield self._canonicalNameCaps(k), v

    def _canonicalNameCaps(self, name):
        """
        Return the canonical name for the given header.

        @type name: C{bytes}
        @param name: The all-lowercase header name to capitalize in its
            canonical form.

        @rtype: C{bytes}
        @return: The canonical name of the header.
        """
        return self._caseMappings.get(name, _dashCapitalize(name))

Full Screen

Full Screen

test_handshake.py

Source:test_handshake.py Github

copy

Full Screen

...17 response_headers = {}18 build_response(response_headers.__setitem__, response_key)19 check_response(response_headers.__getitem__, request_key)20 @contextlib.contextmanager21 def assert_invalid_request_headers(self):22 """23 Provide request headers for corruption.24 Assert that the transformation made them invalid.25 """26 headers = {}27 build_request(headers.__setitem__)28 yield headers29 with self.assertRaises(InvalidHandshake):30 check_request(headers.__getitem__)31 def test_request_invalid_upgrade(self):32 with self.assert_invalid_request_headers() as headers:33 headers['Upgrade'] = 'socketweb'34 def test_request_missing_upgrade(self):35 with self.assert_invalid_request_headers() as headers:36 del headers['Upgrade']37 def test_request_invalid_connection(self):38 with self.assert_invalid_request_headers() as headers:39 headers['Connection'] = 'Downgrade'40 def test_request_missing_connection(self):41 with self.assert_invalid_request_headers() as headers:42 del headers['Connection']43 def test_request_invalid_key_not_base64(self):44 with self.assert_invalid_request_headers() as headers:45 headers['Sec-WebSocket-Key'] = "!@#$%^&*()"46 def test_request_invalid_key_not_well_padded(self):47 with self.assert_invalid_request_headers() as headers:48 headers['Sec-WebSocket-Key'] = "CSIRmL8dWYxeAdr/XpEHRw"49 def test_request_invalid_key_not_16_bytes_long(self):50 with self.assert_invalid_request_headers() as headers:51 headers['Sec-WebSocket-Key'] = "ZLpprpvK4PE="52 def test_request_missing_key(self):53 with self.assert_invalid_request_headers() as headers:54 del headers['Sec-WebSocket-Key']55 def test_request_invalid_version(self):56 with self.assert_invalid_request_headers() as headers:57 headers['Sec-WebSocket-Version'] = '42'58 def test_request_missing_version(self):59 with self.assert_invalid_request_headers() as headers:60 del headers['Sec-WebSocket-Version']61 @contextlib.contextmanager62 def assert_invalid_response_headers(self, 
key='CSIRmL8dWYxeAdr/XpEHRw=='):63 """64 Provide response headers for corruption.65 Assert that the transformation made them invalid.66 """67 headers = {}68 build_response(headers.__setitem__, key)69 yield headers70 with self.assertRaises(InvalidHandshake):71 check_response(headers.__getitem__, key)72 def test_response_invalid_upgrade(self):73 with self.assert_invalid_response_headers() as headers:74 headers['Upgrade'] = 'socketweb'75 def test_response_missing_upgrade(self):76 with self.assert_invalid_response_headers() as headers:77 del headers['Upgrade']78 def test_response_invalid_connection(self):79 with self.assert_invalid_response_headers() as headers:80 headers['Connection'] = 'Downgrade'81 def test_response_missing_connection(self):82 with self.assert_invalid_response_headers() as headers:83 del headers['Connection']84 def test_response_invalid_accept(self):85 with self.assert_invalid_response_headers() as headers:86 other_key = "1Eq4UDEFQYg3YspNgqxv5g=="87 headers['Sec-WebSocket-Accept'] = accept(other_key)88 def test_response_missing_accept(self):89 with self.assert_invalid_response_headers() as headers:...

Full Screen

Full Screen

sort_includes.py

Source:sort_includes.py Github

copy

Full Screen

#!/usr/bin/env python

"""Script to sort the top-most block of #include lines.

Assumes the LLVM coding conventions.

Currently, this script only bothers sorting the llvm/... headers. Patches
welcome for more functionality, and sorting other header groups.
"""

import argparse
import os


def sort_includes(f):
    """Sort the #include lines of a specific file.

    The first contiguous run of ``#include`` lines is rewritten in place, in
    this group order: the API header (the first quoted include of a ``.c`` or
    ``.cpp`` file), local headers, llvm/clang project headers, then system
    (``<...>`` and gtest) headers. Every group except the API header is
    de-duplicated and sorted. Blank lines inside the run are dropped.

    Files under ``INPUTS/`` or ``test/`` trees, files with an unrecognised
    extension and files without any ``#include`` line are left untouched.

    Args:
        f: A text file object opened for reading and writing (``r+``) that
            has a ``name`` attribute.
    """
    # Skip files which are under INPUTS trees or test trees.
    if 'INPUTS/' in f.name or 'test/' in f.name:
        return

    ext = os.path.splitext(f.name)[1]
    if ext not in ['.cpp', '.c', '.h', '.inc', '.def']:
        return

    lines = f.readlines()
    # Only implementation files have a "main" API header to keep on top.
    look_for_api_header = ext in ['.cpp', '.c']
    found_headers = False
    headers_begin = 0
    headers_end = 0
    api_headers = []
    local_headers = []
    project_headers = []
    system_headers = []
    for i, l in enumerate(lines):
        if l.strip() == '':
            continue
        if l.startswith('#include'):
            if not found_headers:
                headers_begin = i
                found_headers = True
            headers_end = i
            header = l[len('#include'):].lstrip()
            # The last line of a file may lack a line terminator; without
            # one the include would be glued to whichever header is sorted
            # after it.
            if not header.endswith('\n'):
                header += '\n'
            if look_for_api_header and header.startswith('"'):
                api_headers.append(header)
                look_for_api_header = False
                continue
            if header.startswith(('<', '"gtest/')):
                system_headers.append(header)
                continue
            if header.startswith(('"llvm/', '"llvm-c/',
                                  '"clang/', '"clang-c/')):
                project_headers.append(header)
                continue
            local_headers.append(header)
            continue

        # Only allow comments and #defines prior to any includes. If either are
        # mixed with includes, the order might be sensitive.
        if found_headers:
            break
        if l.startswith(('//', '#define', '#ifndef')):
            continue
        break
    if not found_headers:
        return

    local_headers = sorted(set(local_headers))
    project_headers = sorted(set(project_headers))
    system_headers = sorted(set(system_headers))
    headers = api_headers + local_headers + project_headers + system_headers
    header_lines = ['#include ' + h for h in headers]
    lines = lines[:headers_begin] + header_lines + lines[headers_end + 1:]

    # Rewrite the file in place with the reordered include block.
    f.seek(0)
    f.truncate()
    f.writelines(lines)


def main():
    """Parse the command line and sort the includes of every file given."""
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument('files', nargs='+', type=argparse.FileType('r+'),
                        help='the source files to sort includes within')
    args = parser.parse_args()
    for f in args.files:
        sort_includes(f)


if __name__ == '__main__':
    main()

Full Screen

Full Screen

server.py

Source:server.py Github

copy

Full Screen

1# -*- coding: utf-8 -*-2from .config import Config3import json, requests, urllib4class Server(Config):5 _session = requests.session()6 timelineHeaders = {}7 liffHeaders = {}8 Headers = {}9 def __init__(self, appType=None):10 self.Headers = {}11 self.timelineHeaders = {}12 Config.__init__(self, appType)13 def parseUrl(self, path):14 return self.LINE_HOST_DOMAIN + path15 def urlEncode(self, url, path, params=[]):16 return url + path + '?' + urllib.parse.urlencode(params)17 def getJson(self, url, allowHeader=False):18 if allowHeader is False:19 return json.loads(self._session.get(url).text)20 else:21 return json.loads(self._session.get(url, headers=self.Headers).text)22 def setHeadersWithDict(self, headersDict):23 self.Headers.update(headersDict)24 def setHeaders(self, argument, value):25 self.Headers[argument] = value26 def setTimelineHeadersWithDict(self, headersDict):27 self.timelineHeaders.update(headersDict)28 def setTimelineHeaders(self, argument, value):29 self.timelineHeaders[argument] = value30 def setLiffHeadersWithDict(self, headersDict):31 self.liffHeaders.update(headersDict)32 def setLiffHeaders(self, key, value):33 self.liffHeaders[key] = value34 def additionalHeaders(self, source, newSource):35 headerList={}36 headerList.update(source)37 headerList.update(newSource)38 return headerList39 def optionsContent(self, url, data=None, headers=None):40 if headers is None:41 headers=self.Headers42 return self._session.options(url, headers=headers, data=data)43 def postContent(self, url, data=None, files=None, headers=None):44 if headers is None:45 headers=self.Headers46 return self._session.post(url, headers=headers, data=data, files=files)47 def getContent(self, url, headers=None):48 if headers is None:49 headers=self.Headers50 return self._session.get(url, headers=headers, stream=True)51 def deleteContent(self, url, data=None, headers=None):52 if headers is None:53 headers=self.Headers54 return self._session.delete(url, headers=headers, data=data)55 def 
putContent(self, url, data=None, headers=None):56 if headers is None:57 headers=self.Headers...

Full Screen

Full Screen

Using AI Code Generation

copy

Full Screen

1var mb = require('mountebank');2var port = 2525;3mb.create({4}, function () {5 console.log('mountebank started on port', port);6});7var imposter = {8 {9 {10 equals: {11 }12 }13 {14 is: {15 headers: {16 },17 body: JSON.stringify({18 "data": {

Full Screen

Using AI Code Generation

copy

Full Screen

1var imposter = {2 {3 {4 is: {5 headers: {6 }7 }8 }9 }10};11var mb = require('mountebank'),12 assert = require('assert'),13 Q = require('q'),14 request = require('request');15mb.create(imposter).then(function (isCreated) {16 assert(isCreated);17 return Q.nfcall(request, {18 });19}).then(function (response) {20 assert.equal(response[0].statusCode, 200);21 assert.equal(response[1].body, 'Hello, World!');22 assert.equal(response[1].headers['content-type'], 'text/plain');23 return mb.stop(4545);24}).done();

Full Screen

Using AI Code Generation

copy

Full Screen

1var mb = require('mountebank');2mb.create({port: 2525}, function (error, server) {3 server.post('/imposters', {4 {5 {6 is: {7 headers: {8 }9 }10 }11 }12 }, function (error, response) {13 if (error) {14 console.error(error);15 }16 else {17 console.log('Running on port', response.port);18 }19 });20});21var mb = require('mountebank');22mb.create({port: 2525}, function (error, server) {23 server.post('/imposters', {24 {25 {26 equals: {27 query: { key: 'value' },28 headers: { 'Content-Type': 'application/json' },29 body: { key: 'value' }30 }31 }32 {33 is: {34 }35 }36 }37 }, function (error, response) {38 if (error) {39 console.error(error);40 }41 else {42 console.log('Running on port', response.port);43 }44 });45});46var mb = require('mountebank');47mb.create({port: 2525}, function (error, server) {48 server.post('/imposters', {49 {50 {51 proxy: {52 }53 }54 }55 }, function (error, response) {56 if (error) {57 console.error(error);58 }59 else {60 console.log('Running on port', response.port);61 }62 });

Full Screen

Using AI Code Generation

copy

Full Screen

1var http = require('http');2var headers = {3};4var options = {5};6var req = http.request(options, function (res) {7 res.setEncoding('utf8');8 res.on('data', function (chunk) {9 console.log(chunk);10 });11});12req.write(JSON.stringify({13 "stubs": [{14 "responses": [{15 "is": {16 "headers": {17 },18 "body": JSON.stringify({19 })20 }21 }]22 }]23}));24req.end();

Full Screen

Using AI Code Generation

copy

Full Screen

1var express = require('express');2var app = express();3var bodyParser = require('body-parser');4var mb = require('mountebank');5var mbHelper = require('mountebank-helper');6var mbPort = 2525;7var mbHost = 'localhost';8var mbImposterPort = 4545;9var mbImposterProtocol = 'http';10var mbImposterName = 'test';11var mbImposterStub = {12 {13 is: {14 }15 }16};17var mbImposterPredicate = {18 equals: {19 }20};21var mbImposterStubWithPredicate = {22 {23 is: {24 }25 }26};27var mbImposterStubWithPredicateAndHeaders = {28 {29 is: {30 headers: {31 }32 }33 }34};35var mbImposterStubWithPredicateAndHeadersAndBody = {36 {37 is: {38 headers: {39 }40 }41 }42};43var mbImposterStubWithPredicateAndHeadersAndBodyAndTransform = {44 {45 is: {46 headers: {47 },48 _transformer: {49 input: {50 },51 output: {52 }53 }54 }55 }56};57var mbImposterStubWithPredicateAndHeadersAndBodyAndTransformAndDynamic = {

Full Screen

Using AI Code Generation

copy

Full Screen

1var mb = require('mountebank');2var port = 2525;3mb.create(port, function (error, mbServer) {4 if (error) {5 console.error(error);6 } else {7 mbServer.post('/imposters', {8 {9 {10 equals: {11 headers: {12 }13 }14 }15 {16 is: {17 headers: {18 },19 body: {20 }21 }22 }23 }24 }, function (error, response) {25 if (error) {26 console.error(error);27 } else {28 console.log(response);29 }30 });31 }32});33var mb = require('mountebank');34var port = 2525;35mb.create(port, function (error, mbServer) {36 if (error) {37 console.error(error);38 } else {39 mbServer.post('/imposters', {40 {41 {42 equals: {43 body: {

Full Screen

Using AI Code Generation

copy

Full Screen

1const mb = require('mountebank');2const { createImposter } = require('mountebank/src/models/mb');3const { createResponse } = require('mountebank/src/models/http/httpResponse');4const { createStub } = require('mountebank/src/models/http/httpStub');5const { createPredicateEquality } = require('mountebank/src/models/predicates');6const { createPredicateContains } = require('mountebank/src/models/predicates');7const { createPredicateMatches } = require('mountebank/src/models/predicates');8const { createPredicateStartsWith } = require('mountebank/src/models/predicates');9const { createPredicateEndsWith } = require('mountebank/src/models/predicates');10const { createPredicateExists } = require('mountebank/src/models/predicates');11const { createPredicateNot } = require('mountebank/src/models/predicates');12const { createPredicateAnd } = require('mountebank/src/models/predicates');13const { createPredicateOr } = require('mountebank/src/models/predicates');14const { createPredicateXpath } = require('mountebank/src/models/predicates');15const { createPredicateJsonPath } = require('mountebank/src/models/predicates');16const { createPredicateJsonTypes } = require('mountebank/src/models/predicates');17const { createPredicateJsonSchema } = require('mountebank/src/models/predicates');18const { createPredicateInjection } = require('mountebank/src/models/predicates');19const { createPredicate } = require('mountebank/src/models/predicates');20const { createProxyResponse } = require('mountebank/src/models/http/httpProxyResponse');21const { createProxyBehavior } = require('mountebank/src/models/http/httpProxyBehavior');22const { createTcpResponse } = require('mountebank/src/models/tcp/tcpResponse');23const { createTcpStub } = require('mountebank/src/models/tcp/tcpStub');24const { createTcpRequest } = require('mountebank/src/models/tcp/tcpRequest');25const { createTcpPredicate } = require('mountebank/src/models/tcp/tcpPredicate');26const { createTcpProxyResponse } = 
require('mountebank/src/models/tcp/tcpProxyResponse');27const { createTcpProxyBehavior } = require('mountebank/src/models/tcp/tcpProxyBehavior');28const { createTcpImposter } = require('mountebank

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run mountebank automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful