How to use test_auth method in tempest

Best Python code snippet using tempest_python

test_tempauth.py

Source:test_tempauth.py Github

copy

Full Screen

1# Copyright (c) 2011 OpenStack, LLC.2#3# Licensed under the Apache License, Version 2.0 (the "License");4# you may not use this file except in compliance with the License.5# You may obtain a copy of the License at6#7# http://www.apache.org/licenses/LICENSE-2.08#9# Unless required by applicable law or agreed to in writing, software10# distributed under the License is distributed on an "AS IS" BASIS,11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or12# implied.13# See the License for the specific language governing permissions and14# limitations under the License.15try:16 import simplejson as json17except ImportError:18 import json19import unittest20from contextlib import contextmanager21from time import time22from webob import Request, Response23from swift.common.middleware import tempauth as auth24class FakeMemcache(object):25 def __init__(self):26 self.store = {}27 def get(self, key):28 return self.store.get(key)29 def set(self, key, value, timeout=0):30 self.store[key] = value31 return True32 def incr(self, key, timeout=0):33 self.store[key] = self.store.setdefault(key, 0) + 134 return self.store[key]35 @contextmanager36 def soft_lock(self, key, timeout=0, retries=5):37 yield True38 def delete(self, key):39 try:40 del self.store[key]41 except Exception:42 pass43 return True44class FakeApp(object):45 def __init__(self, status_headers_body_iter=None, acl=None, sync_key=None):46 self.calls = 047 self.status_headers_body_iter = status_headers_body_iter48 if not self.status_headers_body_iter:49 self.status_headers_body_iter = iter([('404 Not Found', {}, '')])50 self.acl = acl51 self.sync_key = sync_key52 def __call__(self, env, start_response):53 self.calls += 154 self.request = Request.blank('', environ=env)55 if self.acl:56 self.request.acl = self.acl57 if self.sync_key:58 self.request.environ['swift_sync_key'] = self.sync_key59 if 'swift.authorize' in env:60 resp = env['swift.authorize'](self.request)61 if resp:62 return resp(env, start_response)63 status, headers, body = self.status_headers_body_iter.next()64 return Response(status=status, headers=headers,65 body=body)(env, start_response)66class FakeConn(object):67 def __init__(self, status_headers_body_iter=None):68 self.calls = 069 self.status_headers_body_iter = status_headers_body_iter70 if not self.status_headers_body_iter:71 self.status_headers_body_iter = iter([('404 Not Found', {}, '')])72 def request(self, method, path, headers):73 self.calls += 174 self.request_path = path75 self.status, self.headers, self.body = \76 self.status_headers_body_iter.next()77 self.status, self.reason = self.status.split(' ', 1)78 self.status = int(self.status)79 def getresponse(self):80 return self81 def read(self):82 body = self.body83 self.body = ''84 return body85class TestAuth(unittest.TestCase):86 def setUp(self):87 self.test_auth = auth.filter_factory({})(FakeApp())88 def _make_request(self, path, **kwargs):89 req = Request.blank(path, **kwargs)90 req.environ['swift.cache'] = FakeMemcache()91 return req92 def test_reseller_prefix_init(self):93 app = FakeApp()94 ath = auth.filter_factory({})(app)95 self.assertEquals(ath.reseller_prefix, 'AUTH_')96 ath = auth.filter_factory({'reseller_prefix': 'TEST'})(app)97 self.assertEquals(ath.reseller_prefix, 'TEST_')98 ath = auth.filter_factory({'reseller_prefix': 'TEST_'})(app)99 self.assertEquals(ath.reseller_prefix, 'TEST_')100 def test_auth_prefix_init(self):101 app = FakeApp()102 ath = auth.filter_factory({})(app)103 self.assertEquals(ath.auth_prefix, '/auth/')104 ath = auth.filter_factory({'auth_prefix': ''})(app)105 self.assertEquals(ath.auth_prefix, '/auth/')106 ath = auth.filter_factory({'auth_prefix': '/test/'})(app)107 self.assertEquals(ath.auth_prefix, '/test/')108 ath = auth.filter_factory({'auth_prefix': '/test'})(app)109 self.assertEquals(ath.auth_prefix, '/test/')110 ath = auth.filter_factory({'auth_prefix': 'test/'})(app)111 self.assertEquals(ath.auth_prefix, '/test/')112 ath = auth.filter_factory({'auth_prefix': 'test'})(app)113 self.assertEquals(ath.auth_prefix, '/test/')114 def test_top_level_deny(self):115 req = self._make_request('/')116 resp = req.get_response(self.test_auth)117 self.assertEquals(resp.status_int, 401)118 self.assertEquals(req.environ['swift.authorize'],119 self.test_auth.denied_response)120 def test_anon(self):121 req = self._make_request('/v1/AUTH_account')122 resp = req.get_response(self.test_auth)123 self.assertEquals(resp.status_int, 401)124 self.assertEquals(req.environ['swift.authorize'],125 self.test_auth.authorize)126 def test_override_asked_for_but_not_allowed(self):127 self.test_auth = \128 auth.filter_factory({'allow_overrides': 'false'})(FakeApp())129 req = self._make_request('/v1/AUTH_account',130 environ={'swift.authorize_override': True})131 resp = req.get_response(self.test_auth)132 self.assertEquals(resp.status_int, 401)133 self.assertEquals(req.environ['swift.authorize'],134 self.test_auth.authorize)135 def test_override_asked_for_and_allowed(self):136 self.test_auth = \137 auth.filter_factory({'allow_overrides': 'true'})(FakeApp())138 req = self._make_request('/v1/AUTH_account',139 environ={'swift.authorize_override': True})140 resp = req.get_response(self.test_auth)141 self.assertEquals(resp.status_int, 404)142 self.assertTrue('swift.authorize' not in req.environ)143 def test_override_default_allowed(self):144 req = self._make_request('/v1/AUTH_account',145 environ={'swift.authorize_override': True})146 resp = req.get_response(self.test_auth)147 self.assertEquals(resp.status_int, 404)148 self.assertTrue('swift.authorize' not in req.environ)149 def test_auth_deny_non_reseller_prefix(self):150 req = self._make_request('/v1/BLAH_account',151 headers={'X-Auth-Token': 'BLAH_t'})152 resp = req.get_response(self.test_auth)153 self.assertEquals(resp.status_int, 401)154 self.assertEquals(req.environ['swift.authorize'],155 self.test_auth.denied_response)156 def test_auth_deny_non_reseller_prefix_no_override(self):157 fake_authorize = lambda x: Response(status='500 Fake')158 req = self._make_request('/v1/BLAH_account',159 headers={'X-Auth-Token': 'BLAH_t'},160 environ={'swift.authorize': fake_authorize}161 )162 resp = req.get_response(self.test_auth)163 self.assertEquals(resp.status_int, 500)164 self.assertEquals(req.environ['swift.authorize'], fake_authorize)165 def test_auth_no_reseller_prefix_deny(self):166 # Ensures that when we have no reseller prefix, we don't deny a request167 # outright but set up a denial swift.authorize and pass the request on168 # down the chain.169 local_app = FakeApp()170 local_auth = auth.filter_factory({'reseller_prefix': ''})(local_app)171 req = self._make_request('/v1/account',172 headers={'X-Auth-Token': 't'})173 resp = req.get_response(local_auth)174 self.assertEquals(resp.status_int, 401)175 self.assertEquals(local_app.calls, 1)176 self.assertEquals(req.environ['swift.authorize'],177 local_auth.denied_response)178 def test_auth_no_reseller_prefix_no_token(self):179 # Check that normally we set up a call back to our authorize.180 local_auth = \181 auth.filter_factory({'reseller_prefix': ''})(FakeApp(iter([])))182 req = self._make_request('/v1/account')183 resp = req.get_response(local_auth)184 self.assertEquals(resp.status_int, 401)185 self.assertEquals(req.environ['swift.authorize'],186 local_auth.authorize)187 # Now make sure we don't override an existing swift.authorize when we188 # have no reseller prefix.189 local_auth = \190 auth.filter_factory({'reseller_prefix': ''})(FakeApp())191 local_authorize = lambda req: Response('test')192 req = self._make_request('/v1/account', environ={'swift.authorize':193 local_authorize})194 resp = req.get_response(local_auth)195 self.assertEquals(resp.status_int, 200)196 self.assertEquals(req.environ['swift.authorize'], local_authorize)197 def test_auth_fail(self):198 resp = self._make_request('/v1/AUTH_cfa',199 headers={'X-Auth-Token': 'AUTH_t'}).get_response(self.test_auth)200 self.assertEquals(resp.status_int, 401)201 def test_authorize_bad_path(self):202 req = self._make_request('/badpath')203 resp = self.test_auth.authorize(req)204 self.assertEquals(resp.status_int, 401)205 req = self._make_request('/badpath')206 req.remote_user = 'act:usr,act,AUTH_cfa'207 resp = self.test_auth.authorize(req)208 self.assertEquals(resp.status_int, 403)209 def test_authorize_account_access(self):210 req = self._make_request('/v1/AUTH_cfa')211 req.remote_user = 'act:usr,act,AUTH_cfa'212 self.assertEquals(self.test_auth.authorize(req), None)213 req = self._make_request('/v1/AUTH_cfa')214 req.remote_user = 'act:usr,act'215 resp = self.test_auth.authorize(req)216 self.assertEquals(resp.status_int, 403)217 def test_authorize_acl_group_access(self):218 req = self._make_request('/v1/AUTH_cfa')219 req.remote_user = 'act:usr,act'220 resp = self.test_auth.authorize(req)221 self.assertEquals(resp.status_int, 403)222 req = self._make_request('/v1/AUTH_cfa')223 req.remote_user = 'act:usr,act'224 req.acl = 'act'225 self.assertEquals(self.test_auth.authorize(req), None)226 req = self._make_request('/v1/AUTH_cfa')227 req.remote_user = 'act:usr,act'228 req.acl = 'act:usr'229 self.assertEquals(self.test_auth.authorize(req), None)230 req = self._make_request('/v1/AUTH_cfa')231 req.remote_user = 'act:usr,act'232 req.acl = 'act2'233 resp = self.test_auth.authorize(req)234 self.assertEquals(resp.status_int, 403)235 req = self._make_request('/v1/AUTH_cfa')236 req.remote_user = 'act:usr,act'237 req.acl = 'act:usr2'238 resp = self.test_auth.authorize(req)239 self.assertEquals(resp.status_int, 403)240 def test_deny_cross_reseller(self):241 # Tests that cross-reseller is denied, even if ACLs/group names match242 req = self._make_request('/v1/OTHER_cfa')243 req.remote_user = 'act:usr,act,AUTH_cfa'244 req.acl = 'act'245 resp = self.test_auth.authorize(req)246 self.assertEquals(resp.status_int, 403)247 def test_authorize_acl_referrer_access(self):248 req = self._make_request('/v1/AUTH_cfa/c')249 req.remote_user = 'act:usr,act'250 resp = self.test_auth.authorize(req)251 self.assertEquals(resp.status_int, 403)252 req = self._make_request('/v1/AUTH_cfa/c')253 req.remote_user = 'act:usr,act'254 req.acl = '.r:*,.rlistings'255 self.assertEquals(self.test_auth.authorize(req), None)256 req = self._make_request('/v1/AUTH_cfa/c')257 req.remote_user = 'act:usr,act'258 req.acl = '.r:*' # No listings allowed259 resp = self.test_auth.authorize(req)260 self.assertEquals(resp.status_int, 403)261 req = self._make_request('/v1/AUTH_cfa/c')262 req.remote_user = 'act:usr,act'263 req.acl = '.r:.example.com,.rlistings'264 resp = self.test_auth.authorize(req)265 self.assertEquals(resp.status_int, 403)266 req = self._make_request('/v1/AUTH_cfa/c')267 req.remote_user = 'act:usr,act'268 req.referer = 'http://www.example.com/index.html'269 req.acl = '.r:.example.com,.rlistings'270 self.assertEquals(self.test_auth.authorize(req), None)271 req = self._make_request('/v1/AUTH_cfa/c')272 resp = self.test_auth.authorize(req)273 self.assertEquals(resp.status_int, 401)274 req = self._make_request('/v1/AUTH_cfa/c')275 req.acl = '.r:*,.rlistings'276 self.assertEquals(self.test_auth.authorize(req), None)277 req = self._make_request('/v1/AUTH_cfa/c')278 req.acl = '.r:*' # No listings allowed279 resp = self.test_auth.authorize(req)280 self.assertEquals(resp.status_int, 401)281 req = self._make_request('/v1/AUTH_cfa/c')282 req.acl = '.r:.example.com,.rlistings'283 resp = self.test_auth.authorize(req)284 self.assertEquals(resp.status_int, 401)285 req = self._make_request('/v1/AUTH_cfa/c')286 req.referer = 'http://www.example.com/index.html'287 req.acl = '.r:.example.com,.rlistings'288 self.assertEquals(self.test_auth.authorize(req), None)289 def test_account_put_permissions(self):290 req = self._make_request('/v1/AUTH_new',291 environ={'REQUEST_METHOD': 'PUT'})292 req.remote_user = 'act:usr,act'293 resp = self.test_auth.authorize(req)294 self.assertEquals(resp.status_int, 403)295 req = self._make_request('/v1/AUTH_new',296 environ={'REQUEST_METHOD': 'PUT'})297 req.remote_user = 'act:usr,act,AUTH_other'298 resp = self.test_auth.authorize(req)299 self.assertEquals(resp.status_int, 403)300 # Even PUTs to your own account as account admin should fail301 req = self._make_request('/v1/AUTH_old',302 environ={'REQUEST_METHOD': 'PUT'})303 req.remote_user = 'act:usr,act,AUTH_old'304 resp = self.test_auth.authorize(req)305 self.assertEquals(resp.status_int, 403)306 req = self._make_request('/v1/AUTH_new',307 environ={'REQUEST_METHOD': 'PUT'})308 req.remote_user = 'act:usr,act,.reseller_admin'309 resp = self.test_auth.authorize(req)310 self.assertEquals(resp, None)311 # .super_admin is not something the middleware should ever see or care312 # about313 req = self._make_request('/v1/AUTH_new',314 environ={'REQUEST_METHOD': 'PUT'})315 req.remote_user = 'act:usr,act,.super_admin'316 resp = self.test_auth.authorize(req)317 self.assertEquals(resp.status_int, 403)318 def test_account_delete_permissions(self):319 req = self._make_request('/v1/AUTH_new',320 environ={'REQUEST_METHOD': 'DELETE'})321 req.remote_user = 'act:usr,act'322 resp = self.test_auth.authorize(req)323 self.assertEquals(resp.status_int, 403)324 req = self._make_request('/v1/AUTH_new',325 environ={'REQUEST_METHOD': 'DELETE'})326 req.remote_user = 'act:usr,act,AUTH_other'327 resp = self.test_auth.authorize(req)328 self.assertEquals(resp.status_int, 403)329 # Even DELETEs to your own account as account admin should fail330 req = self._make_request('/v1/AUTH_old',331 environ={'REQUEST_METHOD': 'DELETE'})332 req.remote_user = 'act:usr,act,AUTH_old'333 resp = self.test_auth.authorize(req)334 self.assertEquals(resp.status_int, 403)335 req = self._make_request('/v1/AUTH_new',336 environ={'REQUEST_METHOD': 'DELETE'})337 req.remote_user = 'act:usr,act,.reseller_admin'338 resp = self.test_auth.authorize(req)339 self.assertEquals(resp, None)340 # .super_admin is not something the middleware should ever see or care341 # about342 req = self._make_request('/v1/AUTH_new',343 environ={'REQUEST_METHOD': 'DELETE'})344 req.remote_user = 'act:usr,act,.super_admin'345 resp = self.test_auth.authorize(req)346 self.assertEquals(resp.status_int, 403)347 def test_get_token_fail(self):348 resp = self._make_request('/auth/v1.0').get_response(self.test_auth)349 self.assertEquals(resp.status_int, 401)350 resp = self._make_request('/auth/v1.0',351 headers={'X-Auth-User': 'act:usr',352 'X-Auth-Key': 'key'}).get_response(self.test_auth)353 self.assertEquals(resp.status_int, 401)354 def test_get_token_fail_invalid_x_auth_user_format(self):355 resp = self._make_request('/auth/v1/act/auth',356 headers={'X-Auth-User': 'usr',357 'X-Auth-Key': 'key'}).get_response(self.test_auth)358 self.assertEquals(resp.status_int, 401)359 def test_get_token_fail_non_matching_account_in_request(self):360 resp = self._make_request('/auth/v1/act/auth',361 headers={'X-Auth-User': 'act2:usr',362 'X-Auth-Key': 'key'}).get_response(self.test_auth)363 self.assertEquals(resp.status_int, 401)364 def test_get_token_fail_bad_path(self):365 resp = self._make_request('/auth/v1/act/auth/invalid',366 headers={'X-Auth-User': 'act:usr',367 'X-Auth-Key': 'key'}).get_response(self.test_auth)368 self.assertEquals(resp.status_int, 400)369 def test_get_token_fail_missing_key(self):370 resp = self._make_request('/auth/v1/act/auth',371 headers={'X-Auth-User': 'act:usr'}).get_response(self.test_auth)372 self.assertEquals(resp.status_int, 401)373 def test_allowed_sync_hosts(self):374 a = auth.filter_factory({'super_admin_key': 'supertest'})(FakeApp())375 self.assertEquals(a.allowed_sync_hosts, ['127.0.0.1'])376 a = auth.filter_factory({'super_admin_key': 'supertest',377 'allowed_sync_hosts':378 '1.1.1.1,2.1.1.1, 3.1.1.1 , 4.1.1.1,, , 5.1.1.1'})(FakeApp())379 self.assertEquals(a.allowed_sync_hosts,380 ['1.1.1.1', '2.1.1.1', '3.1.1.1', '4.1.1.1', '5.1.1.1'])381 def test_reseller_admin_is_owner(self):382 orig_authorize = self.test_auth.authorize383 owner_values = []384 def mitm_authorize(req):385 rv = orig_authorize(req)386 owner_values.append(req.environ.get('swift_owner', False))387 return rv388 self.test_auth.authorize = mitm_authorize389 req = self._make_request('/v1/AUTH_cfa',390 headers={'X-Auth-Token': 'AUTH_t'})391 req.remote_user = '.reseller_admin'392 self.test_auth.authorize(req)393 self.assertEquals(owner_values, [True])394 def test_admin_is_owner(self):395 orig_authorize = self.test_auth.authorize396 owner_values = []397 def mitm_authorize(req):398 rv = orig_authorize(req)399 owner_values.append(req.environ.get('swift_owner', False))400 return rv401 self.test_auth.authorize = mitm_authorize402 req = self._make_request('/v1/AUTH_cfa',403 headers={'X-Auth-Token': 'AUTH_t'})404 req.remote_user = 'AUTH_cfa'405 self.test_auth.authorize(req)406 self.assertEquals(owner_values, [True])407 def test_regular_is_not_owner(self):408 orig_authorize = self.test_auth.authorize409 owner_values = []410 def mitm_authorize(req):411 rv = orig_authorize(req)412 owner_values.append(req.environ.get('swift_owner', False))413 return rv414 self.test_auth.authorize = mitm_authorize415 req = self._make_request('/v1/AUTH_cfa/c',416 headers={'X-Auth-Token': 'AUTH_t'})417 req.remote_user = 'act:usr'418 self.test_auth.authorize(req)419 self.assertEquals(owner_values, [False])420 def test_sync_request_success(self):421 self.test_auth.app = FakeApp(iter([('204 No Content', {}, '')]),422 sync_key='secret')423 req = self._make_request('/v1/AUTH_cfa/c/o',424 environ={'REQUEST_METHOD': 'DELETE'},425 headers={'x-container-sync-key': 'secret',426 'x-timestamp': '123.456'})427 req.remote_addr = '127.0.0.1'428 resp = req.get_response(self.test_auth)429 self.assertEquals(resp.status_int, 204)430 def test_sync_request_fail_key(self):431 self.test_auth.app = FakeApp(iter([('204 No Content', {}, '')]),432 sync_key='secret')433 req = self._make_request('/v1/AUTH_cfa/c/o',434 environ={'REQUEST_METHOD': 'DELETE'},435 headers={'x-container-sync-key': 'wrongsecret',436 'x-timestamp': '123.456'})437 req.remote_addr = '127.0.0.1'438 resp = req.get_response(self.test_auth)439 self.assertEquals(resp.status_int, 401)440 self.test_auth.app = FakeApp(iter([('204 No Content', {}, '')]),441 sync_key='othersecret')442 req = self._make_request('/v1/AUTH_cfa/c/o',443 environ={'REQUEST_METHOD': 'DELETE'},444 headers={'x-container-sync-key': 'secret',445 'x-timestamp': '123.456'})446 req.remote_addr = '127.0.0.1'447 resp = req.get_response(self.test_auth)448 self.assertEquals(resp.status_int, 401)449 self.test_auth.app = FakeApp(iter([('204 No Content', {}, '')]),450 sync_key=None)451 req = self._make_request('/v1/AUTH_cfa/c/o',452 environ={'REQUEST_METHOD': 'DELETE'},453 headers={'x-container-sync-key': 'secret',454 'x-timestamp': '123.456'})455 req.remote_addr = '127.0.0.1'456 resp = req.get_response(self.test_auth)457 self.assertEquals(resp.status_int, 401)458 def test_sync_request_fail_no_timestamp(self):459 self.test_auth.app = FakeApp(iter([('204 No Content', {}, '')]),460 sync_key='secret')461 req = self._make_request('/v1/AUTH_cfa/c/o',462 environ={'REQUEST_METHOD': 'DELETE'},463 headers={'x-container-sync-key': 'secret'})464 req.remote_addr = '127.0.0.1'465 resp = req.get_response(self.test_auth)466 self.assertEquals(resp.status_int, 401)467 def test_sync_request_fail_sync_host(self):468 self.test_auth.app = FakeApp(iter([('204 No Content', {}, '')]),469 sync_key='secret')470 req = self._make_request('/v1/AUTH_cfa/c/o',471 environ={'REQUEST_METHOD': 'DELETE'},472 headers={'x-container-sync-key': 'secret',473 'x-timestamp': '123.456'})474 req.remote_addr = '127.0.0.2'475 resp = req.get_response(self.test_auth)476 self.assertEquals(resp.status_int, 401)477 def test_sync_request_success_lb_sync_host(self):478 self.test_auth.app = FakeApp(iter([('204 No Content', {}, '')]),479 sync_key='secret')480 req = self._make_request('/v1/AUTH_cfa/c/o',481 environ={'REQUEST_METHOD': 'DELETE'},482 headers={'x-container-sync-key': 'secret',483 'x-timestamp': '123.456',484 'x-forwarded-for': '127.0.0.1'})485 req.remote_addr = '127.0.0.2'486 resp = req.get_response(self.test_auth)487 self.assertEquals(resp.status_int, 204)488 self.test_auth.app = FakeApp(iter([('204 No Content', {}, '')]),489 sync_key='secret')490 req = self._make_request('/v1/AUTH_cfa/c/o',491 environ={'REQUEST_METHOD': 'DELETE'},492 headers={'x-container-sync-key': 'secret',493 'x-timestamp': '123.456',494 'x-cluster-client-ip': '127.0.0.1'})495 req.remote_addr = '127.0.0.2'496 resp = req.get_response(self.test_auth)497 self.assertEquals(resp.status_int, 204)498class TestParseUserCreation(unittest.TestCase):499 def test_parse_user_creation(self):500 auth_filter = auth.filter_factory({501 'user_test_tester3': 'testing',502 'user_admin_admin': 'admin .admin .reseller_admin',503 })(FakeApp())504 self.assertEquals(auth_filter.users, {505 'admin:admin': {506 'url': 'http://127.0.0.1:8080/v1/AUTH_admin', 507 'groups': ['.admin', '.reseller_admin'], 508 'key': 'admin'509 }, 'test:tester3': {510 'url': 'http://127.0.0.1:8080/v1/AUTH_test', 511 'groups': [], 512 'key': 'testing'513 },514 })515if __name__ == '__main__':...

Full Screen

Full Screen

test_keystoneauth.py

Source:test_keystoneauth.py Github

copy

Full Screen

1# Copyright (c) 2012 OpenStack, LLC.2#3# Licensed under the Apache License, Version 2.0 (the "License");4# you may not use this file except in compliance with the License.5# You may obtain a copy of the License at6#7# http://www.apache.org/licenses/LICENSE-2.08#9# Unless required by applicable law or agreed to in writing, software10# distributed under the License is distributed on an "AS IS" BASIS,11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or12# implied.13# See the License for the specific language governing permissions and14# limitations under the License.15import unittest16import webob17from swift.common.middleware import keystoneauth18class FakeApp(object):19 def __init__(self, status_headers_body_iter=None):20 self.calls = 021 self.status_headers_body_iter = status_headers_body_iter22 if not self.status_headers_body_iter:23 self.status_headers_body_iter = iter([('404 Not Found', {}, '')])24 def __call__(self, env, start_response):25 self.calls += 126 self.request = webob.Request.blank('', environ=env)27 if 'swift.authorize' in env:28 resp = env['swift.authorize'](self.request)29 if resp:30 return resp(env, start_response)31 status, headers, body = self.status_headers_body_iter.next()32 return webob.Response(status=status, headers=headers,33 body=body)(env, start_response)34class SwiftAuth(unittest.TestCase):35 def setUp(self):36 self.test_auth = keystoneauth.filter_factory({})(FakeApp())37 def _make_request(self, path=None, headers=None, **kwargs):38 if not path:39 path = '/v1/%s/c/o' % self.test_auth._get_account_for_tenant('foo')40 return webob.Request.blank(path, headers=headers, **kwargs)41 def _get_identity_headers(self, status='Confirmed', tenant_id='1',42 tenant_name='acct', user='usr', role=''):43 return dict(X_IDENTITY_STATUS=status,44 X_TENANT_ID=tenant_id,45 X_TENANT_NAME=tenant_name,46 X_ROLES=role,47 X_USER_NAME=user)48 def _get_successful_middleware(self):49 response_iter = iter([('200 OK', {}, '')])50 return keystoneauth.filter_factory({})(FakeApp(response_iter))51 def test_confirmed_identity_is_authorized(self):52 role = self.test_auth.reseller_admin_role53 headers = self._get_identity_headers(role=role)54 req = self._make_request('/v1/AUTH_acct/c', headers)55 resp = req.get_response(self._get_successful_middleware())56 self.assertEqual(resp.status_int, 200)57 def test_confirmed_identity_is_not_authorized(self):58 headers = self._get_identity_headers()59 req = self._make_request('/v1/AUTH_acct/c', headers)60 resp = req.get_response(self.test_auth)61 self.assertEqual(resp.status_int, 403)62 def test_anonymous_is_authorized_for_permitted_referrer(self):63 req = self._make_request(headers={'X_IDENTITY_STATUS': 'Invalid'})64 req.acl = '.r:*'65 resp = req.get_response(self._get_successful_middleware())66 self.assertEqual(resp.status_int, 200)67 def test_anonymous_is_not_authorized_for_unknown_reseller_prefix(self):68 req = self._make_request(path='/v1/BLAH_foo/c/o',69 headers={'X_IDENTITY_STATUS': 'Invalid'})70 resp = req.get_response(self.test_auth)71 self.assertEqual(resp.status_int, 401)72 def test_blank_reseller_prefix(self):73 conf = {'reseller_prefix': ''}74 test_auth = keystoneauth.filter_factory(conf)(FakeApp())75 account = tenant_id = 'foo'76 self.assertTrue(test_auth._reseller_check(account, tenant_id))77 def test_override_asked_for_but_not_allowed(self):78 conf = {'allow_overrides': 'false'}79 self.test_auth = keystoneauth.filter_factory(conf)(FakeApp())80 req = self._make_request('/v1/AUTH_account',81 environ={'swift.authorize_override': True})82 resp = req.get_response(self.test_auth)83 self.assertEquals(resp.status_int, 401)84 def test_override_asked_for_and_allowed(self):85 conf = {'allow_overrides': 'true'}86 self.test_auth = keystoneauth.filter_factory(conf)(FakeApp())87 req = self._make_request('/v1/AUTH_account',88 environ={'swift.authorize_override': True})89 resp = req.get_response(self.test_auth)90 self.assertEquals(resp.status_int, 404)91 def test_override_default_allowed(self):92 req = self._make_request('/v1/AUTH_account',93 environ={'swift.authorize_override': True})94 resp = req.get_response(self.test_auth)95 self.assertEquals(resp.status_int, 404)96 def test_quota_without_resller_admin_not_allowed(self): 97 headers = self._get_identity_headers(role='123')98 headers['x-account-meta-quota'] = 'L1'99 req = self._make_request('/v1/AUTH_account', headers=headers)100 req.method = 'PUT'101 resp = req.get_response(self.test_auth)102 self.assertEquals(resp.status_int, 403)103 req.method = 'POST'104 resp = req.get_response(self.test_auth)105 self.assertEquals(resp.status_int, 403)106 def test_quota_with_resller_admin_allowed(self):107 headers = self._get_identity_headers(108 role=self.test_auth.reseller_admin_role)109 headers['x-account-meta-quota'] = 'L1'110 req = self._make_request('/v1/AUTH_account', headers=headers)111 resp = req.get_response(self._get_successful_middleware())112 self.assertEquals(resp.status_int, 200)113class TestAuthorize(unittest.TestCase):114 def setUp(self):115 self.test_auth = keystoneauth.filter_factory({})(FakeApp())116 def _make_request(self, path, **kwargs):117 return webob.Request.blank(path, **kwargs)118 def _get_account(self, identity=None):119 if not identity:120 identity = self._get_identity()121 return self.test_auth._get_account_for_tenant(identity['tenant'][0])122 def _get_identity(self, tenant_id='tenant_id',123 tenant_name='tenant_name', user='user', roles=None):124 if not roles:125 roles = []126 return dict(tenant=(tenant_id, tenant_name), user=user, roles=roles)127 def _check_authenticate(self, account=None, identity=None, headers=None,128 exception=None, acl=None, env=None, path=None):129 if not identity:130 identity = self._get_identity()131 if not account:132 account = self._get_account(identity)133 if not path:134 path = '/v1/%s/c' % account135 default_env = {'keystone.identity': identity,136 'REMOTE_USER': identity['tenant']}137 if env:138 default_env.update(env)139 req = self._make_request(path, headers=headers, environ=default_env)140 req.acl = acl141 result = self.test_auth.authorize(req)142 if exception:143 self.assertTrue(isinstance(result, exception))144 else:145 self.assertTrue(result is None)146 return req147 def test_authorize_fails_for_unauthorized_user(self):148 self._check_authenticate(exception=webob.exc.HTTPForbidden)149 def test_authorize_fails_for_invalid_reseller_prefix(self):150 self._check_authenticate(account='BLAN_a',151 exception=webob.exc.HTTPForbidden)152 def test_authorize_succeeds_for_reseller_admin(self):153 roles = [self.test_auth.reseller_admin_role]154 identity = self._get_identity(roles=roles)155 req = self._check_authenticate(identity=identity)156 self.assertTrue(req.environ.get('swift_owner'))157 def test_authorize_succeeds_as_owner_for_operator_role(self):158 roles = self.test_auth.operator_roles.split(',')[0]159 identity = self._get_identity(roles=roles)160 req = self._check_authenticate(identity=identity)161 self.assertTrue(req.environ.get('swift_owner'))162 def _check_authorize_for_tenant_owner_match(self, exception=None):163 identity = self._get_identity()164 identity['user'] = identity['tenant'][1]165 req = self._check_authenticate(identity=identity, exception=exception)166 expected = bool(exception is None)167 self.assertEqual(bool(req.environ.get('swift_owner')), expected)168 def test_authorize_succeeds_as_owner_for_tenant_owner_match(self):169 self.test_auth.is_admin = True170 self._check_authorize_for_tenant_owner_match()171 def test_authorize_fails_as_owner_for_tenant_owner_match(self):172 self.test_auth.is_admin = False173 self._check_authorize_for_tenant_owner_match(174 exception=webob.exc.HTTPForbidden)175 def test_authorize_succeeds_for_container_sync(self):176 env = {'swift_sync_key': 'foo', 'REMOTE_ADDR': '127.0.0.1'}177 headers = {'x-container-sync-key': 'foo', 'x-timestamp': None}178 self._check_authenticate(env=env, headers=headers)179 def test_authorize_fails_for_invalid_referrer(self):180 env = {'HTTP_REFERER': 'http://invalid.com/index.html'}181 self._check_authenticate(acl='.r:example.com', env=env,182 exception=webob.exc.HTTPForbidden)183 def test_authorize_fails_for_referrer_without_rlistings(self):184 env = {'HTTP_REFERER': 'http://example.com/index.html'}185 self._check_authenticate(acl='.r:example.com', env=env,186 exception=webob.exc.HTTPForbidden)187 def test_authorize_succeeds_for_referrer_with_rlistings(self):188 env = {'HTTP_REFERER': 'http://example.com/index.html'}189 self._check_authenticate(acl='.r:example.com,.rlistings', env=env)190 def test_authorize_succeeds_for_referrer_with_obj(self):191 path = '/v1/%s/c/o' % self._get_account()192 env = {'HTTP_REFERER': 'http://example.com/index.html'}193 self._check_authenticate(acl='.r:example.com', env=env, path=path)194 def test_authorize_succeeds_for_user_role_in_roles(self):195 acl = 'allowme'196 identity = self._get_identity(roles=[acl])197 self._check_authenticate(identity=identity, acl=acl)198 def test_authorize_succeeds_for_tenant_name_user_in_roles(self):199 identity = self._get_identity()200 acl = '%s:%s' % (identity['tenant'][1], identity['user'])201 self._check_authenticate(identity=identity, acl=acl)202 def test_authorize_succeeds_for_tenant_id_user_in_roles(self):203 identity = self._get_identity()204 acl = '%s:%s' % (identity['tenant'][0], identity['user'])205 self._check_authenticate(identity=identity, acl=acl)206if __name__ == '__main__':...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run tempest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful