How to use _create_response method in tempest

Best Python code snippet using tempest_python

test_zfssarest.py

Source:test_zfssarest.py Github

copy

Full Screen

...39 'property': 'manila_managed',40 'description': 'Managed by Manila',41 'type': 'Boolean',42 }43 def _create_response(self, status):44 response = fake_zfssa.FakeResponse(status)45 return response46 def test_enable_service(self):47 self.mock_object(self._zfssa.rclient, 'put')48 self._zfssa.rclient.put.return_value = self._create_response(49 restclient.Status.ACCEPTED)50 self._zfssa.enable_service('nfs')51 self.assertEqual(1, self._zfssa.rclient.put.call_count)52 self._zfssa.rclient.put.return_value = self._create_response(53 restclient.Status.OK)54 self.assertRaises(exception.ShareBackendException,55 self._zfssa.enable_service,56 'nfs')57 def test_verify_avail_space(self):58 self.mock_object(self._zfssa, 'verify_project')59 self.mock_object(self._zfssa, 'get_project_stats')60 self._zfssa.get_project_stats.return_value = 200061 self._zfssa.verify_avail_space(self.pool,62 self.project,63 self.share,64 1000)65 self.assertEqual(1, self._zfssa.verify_project.call_count)66 self.assertEqual(1, self._zfssa.get_project_stats.call_count)67 self._zfssa.verify_project.assert_called_with(self.pool, self.project)68 self._zfssa.get_project_stats.assert_called_with(self.pool,69 self.project)70 self._zfssa.get_project_stats.return_value = 90071 self.assertRaises(exception.ShareBackendException,72 self._zfssa.verify_avail_space,73 self.pool,74 self.project,75 self.share,76 1000)77 def test_create_project(self):78 self.mock_object(self._zfssa, 'verify_pool')79 self.mock_object(self._zfssa.rclient, 'get')80 self.mock_object(self._zfssa.rclient, 'post')81 arg = {82 'name': self.project,83 'sharesmb': 'off',84 'sharenfs': 'off',85 'mountpoint': 'fakemnpt',86 }87 self._zfssa.rclient.get.return_value = self._create_response(88 restclient.Status.NOT_FOUND)89 self._zfssa.rclient.post.return_value = self._create_response(90 restclient.Status.CREATED)91 self._zfssa.create_project(self.pool, self.project, arg)92 self.assertEqual(1, self._zfssa.rclient.get.call_count)93 self.assertEqual(1, self._zfssa.rclient.post.call_count)94 self.assertEqual(1, self._zfssa.verify_pool.call_count)95 self._zfssa.verify_pool.assert_called_with(self.pool)96 self._zfssa.rclient.post.return_value = self._create_response(97 restclient.Status.NOT_FOUND)98 self.assertRaises(exception.ShareBackendException,99 self._zfssa.create_project,100 self.pool,101 self.project,102 arg)103 def test_create_share(self):104 self.mock_object(self._zfssa, 'verify_avail_space')105 self.mock_object(self._zfssa.rclient, 'get')106 self.mock_object(self._zfssa.rclient, 'post')107 self._zfssa.rclient.get.return_value = self._create_response(108 restclient.Status.NOT_FOUND)109 self._zfssa.rclient.post.return_value = self._create_response(110 restclient.Status.CREATED)111 arg = {112 "name": self.share,113 "quota": 1,114 }115 self._zfssa.create_share(self.pool, self.project, arg)116 self.assertEqual(1, self._zfssa.rclient.get.call_count)117 self.assertEqual(1, self._zfssa.rclient.post.call_count)118 self.assertEqual(1, self._zfssa.verify_avail_space.call_count)119 self._zfssa.verify_avail_space.assert_called_with(self.pool,120 self.project,121 arg,122 arg['quota'])123 self._zfssa.rclient.post.return_value = self._create_response(124 restclient.Status.NOT_FOUND)125 self.assertRaises(exception.ShareBackendException,126 self._zfssa.create_share,127 self.pool,128 self.project,129 arg)130 self._zfssa.rclient.get.return_value = self._create_response(131 restclient.Status.OK)132 self.assertRaises(exception.ShareBackendException,133 self._zfssa.create_share,134 self.pool,135 self.project,136 arg)137 def test_modify_share(self):138 self.mock_object(self._zfssa.rclient, 'put')139 self._zfssa.rclient.put.return_value = self._create_response(140 restclient.Status.ACCEPTED)141 arg = {"name": "dummyname"}142 svc = self._zfssa.share_path % (self.pool, self.project, self.share)143 self._zfssa.modify_share(self.pool, self.project, self.share, arg)144 self.assertEqual(1, self._zfssa.rclient.put.call_count)145 self._zfssa.rclient.put.assert_called_with(svc, arg)146 self._zfssa.rclient.put.return_value = self._create_response(147 restclient.Status.BAD_REQUEST)148 self.assertRaises(exception.ShareBackendException,149 self._zfssa.modify_share,150 self.pool,151 self.project,152 self.share,153 arg)154 def test_delete_share(self):155 self.mock_object(self._zfssa.rclient, 'delete')156 self._zfssa.rclient.delete.return_value = self._create_response(157 restclient.Status.NO_CONTENT)158 svc = self._zfssa.share_path % (self.pool, self.project, self.share)159 self._zfssa.delete_share(self.pool, self.project, self.share)160 self.assertEqual(1, self._zfssa.rclient.delete.call_count)161 self._zfssa.rclient.delete.assert_called_with(svc)162 def test_create_snapshot(self):163 self.mock_object(self._zfssa.rclient, 'post')164 self._zfssa.rclient.post.return_value = self._create_response(165 restclient.Status.CREATED)166 arg = {"name": self.snap}167 svc = self._zfssa.snapshots_path % (self.pool,168 self.project,169 self.share)170 self._zfssa.create_snapshot(self.pool,171 self.project,172 self.share,173 self.snap)174 self.assertEqual(1, self._zfssa.rclient.post.call_count)175 self._zfssa.rclient.post.assert_called_with(svc, arg)176 self._zfssa.rclient.post.return_value = self._create_response(177 restclient.Status.BAD_REQUEST)178 self.assertRaises(exception.ShareBackendException,179 self._zfssa.create_snapshot,180 self.pool,181 self.project,182 self.share,183 self.snap)184 def test_delete_snapshot(self):185 self.mock_object(self._zfssa.rclient, 'delete')186 self._zfssa.rclient.delete.return_value = self._create_response(187 restclient.Status.NO_CONTENT)188 svc = self._zfssa.snapshot_path % (self.pool,189 self.project,190 self.share,191 self.snap)192 self._zfssa.delete_snapshot(self.pool,193 self.project,194 self.share,195 self.snap)196 self.assertEqual(1, self._zfssa.rclient.delete.call_count)197 self._zfssa.rclient.delete.assert_called_with(svc)198 self._zfssa.rclient.delete.return_value = self._create_response(199 restclient.Status.BAD_REQUEST)200 self.assertRaises(exception.ShareBackendException,201 self._zfssa.delete_snapshot,202 self.pool,203 self.project,204 self.share,205 self.snap)206 def test_clone_snapshot(self):207 self.mock_object(self._zfssa, 'verify_avail_space')208 self.mock_object(self._zfssa.rclient, 'put')209 self._zfssa.rclient.put.return_value = self._create_response(210 restclient.Status.CREATED)211 snapshot = {212 "id": self.snap,213 "share_id": self.share,214 }215 clone = {216 "id": "cloneid",217 "size": 1,218 }219 arg = {220 "name": "dummyname",221 "quota": 1,222 }223 self._zfssa.clone_snapshot(self.pool,224 self.project,225 snapshot,226 clone,227 arg)228 self.assertEqual(1, self._zfssa.rclient.put.call_count)229 self.assertEqual(1, self._zfssa.verify_avail_space.call_count)230 self._zfssa.verify_avail_space.assert_called_with(self.pool,231 self.project,232 clone['id'],233 clone['size'])234 self._zfssa.rclient.put.return_value = self._create_response(235 restclient.Status.NOT_FOUND)236 self.assertRaises(exception.ShareBackendException,237 self._zfssa.clone_snapshot,238 self.pool,239 self.project,240 snapshot,241 clone,242 arg)243 def _create_entry(self, sharenfs, ip):244 if sharenfs == 'off':245 sharenfs = 'sec=sys'246 entry = (',rw=@%s' % ip)247 if '/' not in ip:248 entry = entry + '/32'249 arg = {'sharenfs': sharenfs + entry}250 return arg251 def test_allow_access_nfs(self):252 self.mock_object(self._zfssa, 'get_share')253 self.mock_object(self._zfssa, 'modify_share')254 details = {"sharenfs": "off"}255 access = {256 "access_type": "nonip",257 "access_to": "foo",258 }259 # invalid access type260 self.assertRaises(exception.InvalidShareAccess,261 self._zfssa.allow_access_nfs,262 self.pool,263 self.project,264 self.share,265 access)266 # valid entry267 access.update({"access_type": "ip"})268 arg = self._create_entry("off", access['access_to'])269 self._zfssa.get_share.return_value = details270 self._zfssa.allow_access_nfs(self.pool,271 self.project,272 self.share,273 access)274 self.assertEqual(1, self._zfssa.get_share.call_count)275 self.assertEqual(1, self._zfssa.modify_share.call_count)276 self._zfssa.get_share.assert_called_with(self.pool,277 self.project,278 self.share)279 self._zfssa.modify_share.assert_called_with(self.pool,280 self.project,281 self.share,282 arg)283 # add another entry284 access.update({"access_to": "10.0.0.1/24"})285 arg = self._create_entry("off", access['access_to'])286 self._zfssa.allow_access_nfs(self.pool,287 self.project,288 self.share,289 access)290 self.assertEqual(2, self._zfssa.modify_share.call_count)291 self._zfssa.modify_share.assert_called_with(self.pool,292 self.project,293 self.share,294 arg)295 # verify modify_share is not called if sharenfs='on'296 details = {"sharenfs": "on"}297 self._zfssa.get_share.return_value = details298 self._zfssa.allow_access_nfs(self.pool,299 self.project,300 self.share,301 access)302 self.assertEqual(2, self._zfssa.modify_share.call_count)303 # verify modify_share is not called if ip is already in the list304 access.update({"access_to": "10.0.0.1/24"})305 details = self._create_entry("off", access['access_to'])306 self._zfssa.get_share.return_value = details307 self._zfssa.allow_access_nfs(self.pool,308 self.project,309 self.share,310 access)311 self.assertEqual(2, self._zfssa.modify_share.call_count)312 def test_deny_access_nfs(self):313 self.mock_object(self._zfssa, 'get_share')314 self.mock_object(self._zfssa, 'modify_share')315 data1 = self._create_entry("off", "10.0.0.1")316 access = {317 "access_type": "nonip",318 "access_to": "foo",319 }320 # invalid access_type321 self.assertRaises(exception.InvalidShareAccess,322 self._zfssa.deny_access_nfs,323 self.pool,324 self.project,325 self.share,326 access)327 # valid entry328 access.update({"access_type": "ip"})329 self._zfssa.get_share.return_value = data1330 self._zfssa.deny_access_nfs(self.pool,331 self.project,332 self.share,333 access)334 self.assertEqual(1, self._zfssa.get_share.call_count)335 self.assertEqual(0, self._zfssa.modify_share.call_count)336 self._zfssa.get_share.assert_called_with(self.pool,337 self.project,338 self.share)339 # another valid entry340 data1 = self._create_entry(data1['sharenfs'], '10.0.0.2/24')341 data2 = self._create_entry(data1['sharenfs'], access['access_to'])342 self._zfssa.get_share.return_value = data2343 self._zfssa.deny_access_nfs(self.pool,344 self.project,345 self.share,346 access)347 self.assertEqual(2, self._zfssa.get_share.call_count)348 self.assertEqual(1, self._zfssa.modify_share.call_count)349 self._zfssa.get_share.assert_called_with(self.pool,350 self.project,351 self.share)352 self._zfssa.modify_share.assert_called_with(self.pool,353 self.project,354 self.share,355 data1)356 def test_create_schema_negative(self):357 self.mock_object(self._zfssa.rclient, 'get')358 self.mock_object(self._zfssa.rclient, 'post')359 self._zfssa.rclient.post.return_value = self._create_response(360 restclient.Status.NOT_FOUND)361 self.assertRaises(exception.ShareBackendException,362 self._zfssa.create_schema,363 self.schema)364 def test_create_schema_property_exists(self):365 self.mock_object(self._zfssa.rclient, 'get')366 self.mock_object(self._zfssa.rclient, 'post')367 self._zfssa.rclient.get.return_value = self._create_response(368 restclient.Status.OK)369 self._zfssa.create_schema(self.schema)370 self.assertEqual(1, self._zfssa.rclient.get.call_count)371 self.assertEqual(0, self._zfssa.rclient.post.call_count)372 def test_create_schema(self):373 self.mock_object(self._zfssa.rclient, 'get')374 self.mock_object(self._zfssa.rclient, 'post')375 self._zfssa.rclient.get.return_value = self._create_response(376 restclient.Status.NOT_FOUND)377 self._zfssa.rclient.post.return_value = self._create_response(378 restclient.Status.CREATED)379 self._zfssa.create_schema(self.schema)380 self.assertEqual(1, self._zfssa.rclient.get.call_count)...

Full Screen

Full Screen

handler.py

Source:handler.py Github

copy

Full Screen

...74 'No handler configured for: {} {}'.format(request.method, request.url)75 )76 return func(request, **params)77 def _get_requests(self, _):78 return self._create_response(json.dumps(79 [r.to_dict() for r in self.storage.load_requests()]80 ).encode('utf-8'))81 def _get_last_request(self, _):82 request = self.storage.load_last_request()83 if request is not None:84 request = request.to_dict()85 return self._create_response(json.dumps(request).encode('utf-8'))86 def _clear_requests(self, _):87 self.storage.clear_requests()88 return self._create_response(json.dumps({'status': 'ok'}).encode('utf-8'))89 def _get_request_body(self, _, request_id):90 body = self.storage.load_request_body(request_id[0])91 return self._create_response(body, 'application/octet-stream')92 def _get_response_body(self, _, request_id):93 body = self.storage.load_response_body(request_id[0])94 return self._create_response(body, 'application/octet-stream')95 def _find_request(self, _, path):96 request = self.storage.find(path[0])97 if request is not None:98 request = request.to_dict()99 return self._create_response(json.dumps(request).encode('utf-8'))100 def _set_header_overrides(self, request):101 headers = json.loads(request.body.decode('utf-8'))102 self.modifier.headers = headers103 return self._create_response(json.dumps({'status': 'ok'}).encode('utf-8'))104 def _clear_header_overrides(self, _):105 del self.modifier.headers106 return self._create_response(json.dumps({'status': 'ok'}).encode('utf-8'))107 def _get_header_overrides(self, _):108 return self._create_response(json.dumps(self.modifier.headers).encode('utf-8'))109 def _set_param_overrides(self, request):110 params = json.loads(request.body.decode('utf-8'))111 self.modifier.params = params112 return self._create_response(json.dumps({'status': 'ok'}).encode('utf-8'))113 def _clear_param_overrides(self, _):114 del self.modifier.params115 return self._create_response(json.dumps({'status': 'ok'}).encode('utf-8'))116 def _get_param_overrides(self, _):117 return self._create_response(json.dumps(self.modifier.params).encode('utf-8'))118 def _set_querystring_overrides(self, request):119 querystring = json.loads(request.body.decode('utf-8'))['overrides']120 self.modifier.querystring = querystring121 return self._create_response(json.dumps({'status': 'ok'}).encode('utf-8'))122 def _clear_querystring_overrides(self, _):123 del self.modifier.querystring124 return self._create_response(json.dumps({'status': 'ok'}).encode('utf-8'))125 def _get_querystring_overrides(self, _):126 return self._create_response(json.dumps({127 'overrides': self.modifier.querystring}128 ).encode('utf-8'))129 def _set_rewrite_rules(self, request):130 rewrite_rules = json.loads(request.body.decode('utf-8'))131 self.modifier.rewrite_rules = rewrite_rules132 return self._create_response(json.dumps({'status': 'ok'}).encode('utf-8'))133 def _clear_rewrite_rules(self, _):134 del self.modifier.rewrite_rules135 return self._create_response(json.dumps({'status': 'ok'}).encode('utf-8'))136 def _get_rewrite_rules(self, _):137 return self._create_response(json.dumps(self.modifier.rewrite_rules).encode('utf-8'))138 def _set_scopes(self, request):139 scopes = json.loads(request.body.decode('utf-8'))140 self.scopes = scopes141 return self._create_response(json.dumps({'status': 'ok'}).encode('utf-8'))142 def _reset_scopes(self, _):143 self.scopes = []144 return self._create_response(json.dumps({'status': 'ok'}).encode('utf-8'))145 def _get_scopes(self, _):146 return self._create_response(json.dumps(self.scopes).encode('utf-8'))147 def _initialise(self, request):148 options = json.loads(request.body.decode('utf-8'))149 self.initialise(options)150 return self._create_response(json.dumps({'status': 'ok'}).encode('utf-8'))151 def _create_response(self, body, content_type='application/json'):152 response = Response(153 status_code=200,154 reason='OK',155 headers={156 'Content-Type': content_type,157 },158 body=body159 )160 response.headers['Content-Length'] = str(len(response.body))161 return response162 def initialise(self, options):163 """Perform any initialisation actions.164 Args:165 options: The selenium wire options....

Full Screen

Full Screen

test_main.py

Source:test_main.py Github

copy

Full Screen

1import nest_asyncio2import json3import pytest4from fastapi.testclient import TestClient5from app.utils import Constants6from starlette.status import HTTP_201_CREATED, HTTP_200_OK7from fastapi_cache.backends.redis import RedisCacheBackend, CACHE_KEY8from app.main import connect, app9from config.app_config import TEST_HOST, TEST_PORT, TEST_INDEX10from fastapi_cache import caches11nest_asyncio.apply()12client = TestClient(app)13@pytest.fixture14async def f_backend() -> None:15 return await connect(TEST_HOST, TEST_PORT, TEST_INDEX)16def test_create_item_bad_token():17 response = client.post(18 "/api/person",19 headers={"X-Token": "coneofsilence"},20 json={"first_name": "Nithish", "last_name": "Mohan", "age": "28", "favourite_color": "red"}21 )22 assert response.status_code == 40123@pytest.mark.asyncio24async def test_create_person(f_backend: RedisCacheBackend) -> None:25 access_token = await authorization()26 response = await create_person(access_token)27 assert response.status_code == HTTP_201_CREATED28 caches.remove(CACHE_KEY)29@pytest.mark.asyncio30async def test_fetch_person(f_backend: RedisCacheBackend):31 access_token = await authorization()32 create_response = await create_person(access_token)33 _create_response = json.loads(create_response.content)34 _fetch_response = await fetch_person(_create_response['first_name'],35 _create_response['last_name'], access_token)36 assert _fetch_response.status_code == HTTP_200_OK37 assert json.dumps(_create_response, sort_keys=True) == \38 json.dumps(json.loads(_fetch_response.content), sort_keys=True)39 caches.remove(CACHE_KEY)40@pytest.mark.asyncio41async def test_update_person(f_backend: RedisCacheBackend):42 access_token = await authorization()43 create_response = await create_person(access_token)44 _create_response = json.loads(create_response.content)45 update_json = {"first_name": _create_response['first_name'],46 "last_name": _create_response['last_name'],47 "age": "29", "favourite_color": "blue"}48 response = client.put(49 "/api/person",50 headers={"Authorization": "Bearer {}".format(access_token)},51 json=update_json52 )53 fetch_response = await fetch_person(_create_response['first_name'],54 _create_response['last_name'], access_token)55 _fetch_response = json.loads(fetch_response.content)56 assert response.status_code == HTTP_200_OK57 assert json.dumps(58 _fetch_response, sort_keys=True) == json.dumps(update_json, sort_keys=True)59 caches.remove(CACHE_KEY)60@pytest.mark.asyncio61async def test_delete_person(f_backend: RedisCacheBackend) -> None:62 access_token = await authorization()63 response = client.delete(64 "/api/person",65 headers={"Authorization": "Bearer {}".format(access_token)},66 json={"first_name": "nithish", "last_name": "mohan"}67 )68 assert response.status_code == HTTP_200_OK69 caches.remove(CACHE_KEY)70async def authorization():71 result = client.post(72 "/api/login",73 json={"username": Constants.username.value, "password": Constants.password.value}74 )75 return json.loads(result.content)['access_token']76async def create_person(access_token):77 result = client.post(78 "/api/person",79 headers={"Authorization": "Bearer {}".format(access_token)},80 json={"first_name": "Nithish", "last_name": "Mohan", "age": "28", "favourite_color": "red"},81 )82 return result83async def fetch_person(first_name, last_name, access_token):84 response = client.get(85 "/api/person",86 headers={"Authorization": "Bearer {}".format(access_token)},87 params={"first_name": first_name,88 "last_name": last_name},89 )...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run tempest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful