Best Python code snippet using localstack_python
test_zfssarest.py
Source:test_zfssarest.py  
# ... (excerpt of test_zfssarest.py; earlier lines are not shown)
# NOTE: the functions below are methods of a test-case class whose header
# lies outside this excerpt; they are listed here at column 0.
# The excerpt opens inside a dict literal whose beginning is not shown:
#         'property': 'manila_managed',
#         'description': 'Managed by Manila',
#         'type': 'Boolean',
#     }


def _create_response(self, status):
    """Return a ``fake_zfssa.FakeResponse`` built from ``status``."""
    return fake_zfssa.FakeResponse(status)


def test_enable_service(self):
    """enable_service succeeds on ACCEPTED and raises on OK."""
    self.mock_object(self._zfssa.rclient, 'put')
    self._zfssa.rclient.put.return_value = self._create_response(
        restclient.Status.ACCEPTED)
    self._zfssa.enable_service('nfs')
    self.assertEqual(1, self._zfssa.rclient.put.call_count)

    self._zfssa.rclient.put.return_value = self._create_response(
        restclient.Status.OK)
    self.assertRaises(exception.ShareBackendException,
                      self._zfssa.enable_service,
                      'nfs')


def test_verify_avail_space(self):
    """verify_avail_space passes with stats of 2000 and raises with 900."""
    self.mock_object(self._zfssa, 'verify_project')
    self.mock_object(self._zfssa, 'get_project_stats')
    self._zfssa.get_project_stats.return_value = 2000
    self._zfssa.verify_avail_space(self.pool,
                                   self.project,
                                   self.share,
                                   1000)
    self.assertEqual(1, self._zfssa.verify_project.call_count)
    self.assertEqual(1, self._zfssa.get_project_stats.call_count)
    self._zfssa.verify_project.assert_called_with(self.pool, self.project)
    self._zfssa.get_project_stats.assert_called_with(self.pool,
                                                     self.project)

    # Same 1000 request, but the mocked stats now report only 900.
    self._zfssa.get_project_stats.return_value = 900
    self.assertRaises(exception.ShareBackendException,
                      self._zfssa.verify_avail_space,
                      self.pool,
                      self.project,
                      self.share,
                      1000)


def test_create_project(self):
    """create_project verifies the pool, then GETs and POSTs once each."""
    self.mock_object(self._zfssa, 'verify_pool')
    self.mock_object(self._zfssa.rclient, 'get')
    self.mock_object(self._zfssa.rclient, 'post')
    arg = {
        'name': self.project,
        'sharesmb': 'off',
        'sharenfs': 'off',
        'mountpoint': 'fakemnpt',
    }
    self._zfssa.rclient.get.return_value = self._create_response(
        restclient.Status.NOT_FOUND)
    self._zfssa.rclient.post.return_value = self._create_response(
        restclient.Status.CREATED)
    self._zfssa.create_project(self.pool, self.project, arg)
    self.assertEqual(1, self._zfssa.rclient.get.call_count)
    self.assertEqual(1, self._zfssa.rclient.post.call_count)
    self.assertEqual(1, self._zfssa.verify_pool.call_count)
    self._zfssa.verify_pool.assert_called_with(self.pool)

    # A NOT_FOUND answer to the POST must surface as a backend exception.
    self._zfssa.rclient.post.return_value = self._create_response(
        restclient.Status.NOT_FOUND)
    self.assertRaises(exception.ShareBackendException,
                      self._zfssa.create_project,
                      self.pool,
                      self.project,
                      arg)


def test_create_share(self):
    """create_share checks space, GETs and POSTs; bad statuses raise."""
    self.mock_object(self._zfssa, 'verify_avail_space')
    self.mock_object(self._zfssa.rclient, 'get')
    self.mock_object(self._zfssa.rclient, 'post')
    self._zfssa.rclient.get.return_value = self._create_response(
        restclient.Status.NOT_FOUND)
    self._zfssa.rclient.post.return_value = self._create_response(
        restclient.Status.CREATED)
    arg = {
        "name": self.share,
        "quota": 1,
    }
    self._zfssa.create_share(self.pool, self.project, arg)
    self.assertEqual(1, self._zfssa.rclient.get.call_count)
    self.assertEqual(1, self._zfssa.rclient.post.call_count)
    self.assertEqual(1, self._zfssa.verify_avail_space.call_count)
    self._zfssa.verify_avail_space.assert_called_with(self.pool,
                                                      self.project,
                                                      arg,
                                                      arg['quota'])

    # POST answered with NOT_FOUND.
    self._zfssa.rclient.post.return_value = self._create_response(
        restclient.Status.NOT_FOUND)
    self.assertRaises(exception.ShareBackendException,
                      self._zfssa.create_share,
                      self.pool,
                      self.project,
                      arg)

    # GET answered with OK.
    self._zfssa.rclient.get.return_value = self._create_response(
        restclient.Status.OK)
    self.assertRaises(exception.ShareBackendException,
                      self._zfssa.create_share,
                      self.pool,
                      self.project,
                      arg)


def test_modify_share(self):
    """modify_share PUTs to the share path; BAD_REQUEST raises."""
    self.mock_object(self._zfssa.rclient, 'put')
    self._zfssa.rclient.put.return_value = self._create_response(
        restclient.Status.ACCEPTED)
    arg = {"name": "dummyname"}
    svc = self._zfssa.share_path % (self.pool, self.project, self.share)
    self._zfssa.modify_share(self.pool, self.project, self.share, arg)
    self.assertEqual(1, self._zfssa.rclient.put.call_count)
    self._zfssa.rclient.put.assert_called_with(svc, arg)

    self._zfssa.rclient.put.return_value = self._create_response(
        restclient.Status.BAD_REQUEST)
    self.assertRaises(exception.ShareBackendException,
                      self._zfssa.modify_share,
                      self.pool,
                      self.project,
                      self.share,
                      arg)


def test_delete_share(self):
    """delete_share issues one DELETE on the share path."""
    self.mock_object(self._zfssa.rclient, 'delete')
    self._zfssa.rclient.delete.return_value = self._create_response(
        restclient.Status.NO_CONTENT)
    svc = self._zfssa.share_path % (self.pool, self.project, self.share)
    self._zfssa.delete_share(self.pool, self.project, self.share)
    self.assertEqual(1, self._zfssa.rclient.delete.call_count)
    self._zfssa.rclient.delete.assert_called_with(svc)


def test_create_snapshot(self):
    """create_snapshot POSTs the snapshot name; BAD_REQUEST raises."""
    self.mock_object(self._zfssa.rclient, 'post')
    self._zfssa.rclient.post.return_value = self._create_response(
        restclient.Status.CREATED)
    arg = {"name": self.snap}
    svc = self._zfssa.snapshots_path % (self.pool,
                                        self.project,
                                        self.share)
    self._zfssa.create_snapshot(self.pool,
                                self.project,
                                self.share,
                                self.snap)
    self.assertEqual(1, self._zfssa.rclient.post.call_count)
    self._zfssa.rclient.post.assert_called_with(svc, arg)

    self._zfssa.rclient.post.return_value = self._create_response(
        restclient.Status.BAD_REQUEST)
    self.assertRaises(exception.ShareBackendException,
                      self._zfssa.create_snapshot,
                      self.pool,
                      self.project,
                      self.share,
                      self.snap)


def test_delete_snapshot(self):
    """delete_snapshot DELETEs the snapshot path; BAD_REQUEST raises."""
    self.mock_object(self._zfssa.rclient, 'delete')
    self._zfssa.rclient.delete.return_value = self._create_response(
        restclient.Status.NO_CONTENT)
    svc = self._zfssa.snapshot_path % (self.pool,
                                       self.project,
                                       self.share,
                                       self.snap)
    self._zfssa.delete_snapshot(self.pool,
                                self.project,
                                self.share,
                                self.snap)
    self.assertEqual(1, self._zfssa.rclient.delete.call_count)
    self._zfssa.rclient.delete.assert_called_with(svc)

    self._zfssa.rclient.delete.return_value = self._create_response(
        restclient.Status.BAD_REQUEST)
    self.assertRaises(exception.ShareBackendException,
                      self._zfssa.delete_snapshot,
                      self.pool,
                      self.project,
                      self.share,
                      self.snap)


def test_clone_snapshot(self):
    """clone_snapshot checks space and PUTs once; NOT_FOUND raises."""
    self.mock_object(self._zfssa, 'verify_avail_space')
    self.mock_object(self._zfssa.rclient, 'put')
    self._zfssa.rclient.put.return_value = self._create_response(
        restclient.Status.CREATED)
    snapshot = {
        "id": self.snap,
        "share_id": self.share,
    }
    clone = {
        "id": "cloneid",
        "size": 1,
    }
    arg = {
        "name": "dummyname",
        "quota": 1,
    }
    self._zfssa.clone_snapshot(self.pool,
                               self.project,
                               snapshot,
                               clone,
                               arg)
    self.assertEqual(1, self._zfssa.rclient.put.call_count)
    self.assertEqual(1, self._zfssa.verify_avail_space.call_count)
    self._zfssa.verify_avail_space.assert_called_with(self.pool,
                                                      self.project,
                                                      clone['id'],
                                                      clone['size'])

    self._zfssa.rclient.put.return_value = self._create_response(
        restclient.Status.NOT_FOUND)
    self.assertRaises(exception.ShareBackendException,
                      self._zfssa.clone_snapshot,
                      self.pool,
                      self.project,
                      snapshot,
                      clone,
                      arg)


def _create_entry(self, sharenfs, ip):
    """Return a ``{'sharenfs': ...}`` dict with an ``rw=@ip`` entry appended.

    A ``sharenfs`` value of ``'off'`` is replaced by ``'sec=sys'`` before
    the entry is appended, and ``'/32'`` is added to ``ip`` when it
    contains no ``'/'``.
    """
    if sharenfs == 'off':
        sharenfs = 'sec=sys'
    entry = ',rw=@%s' % ip
    if '/' not in ip:
        entry += '/32'
    return {'sharenfs': sharenfs + entry}


def test_allow_access_nfs(self):
    """allow_access_nfs rejects non-ip access and modifies only as needed."""
    self.mock_object(self._zfssa, 'get_share')
    self.mock_object(self._zfssa, 'modify_share')
    details = {"sharenfs": "off"}
    access = {
        "access_type": "nonip",
        "access_to": "foo",
    }

    # invalid access type
    self.assertRaises(exception.InvalidShareAccess,
                      self._zfssa.allow_access_nfs,
                      self.pool,
                      self.project,
                      self.share,
                      access)

    # valid entry
    access.update({"access_type": "ip"})
    arg = self._create_entry("off", access['access_to'])
    self._zfssa.get_share.return_value = details
    self._zfssa.allow_access_nfs(self.pool,
                                 self.project,
                                 self.share,
                                 access)
    self.assertEqual(1, self._zfssa.get_share.call_count)
    self.assertEqual(1, self._zfssa.modify_share.call_count)
    self._zfssa.get_share.assert_called_with(self.pool,
                                             self.project,
                                             self.share)
    self._zfssa.modify_share.assert_called_with(self.pool,
                                                self.project,
                                                self.share,
                                                arg)

    # add another entry
    access.update({"access_to": "10.0.0.1/24"})
    arg = self._create_entry("off", access['access_to'])
    self._zfssa.allow_access_nfs(self.pool,
                                 self.project,
                                 self.share,
                                 access)
    self.assertEqual(2, self._zfssa.modify_share.call_count)
    self._zfssa.modify_share.assert_called_with(self.pool,
                                                self.project,
                                                self.share,
                                                arg)

    # verify modify_share is not called if sharenfs='on'
    details = {"sharenfs": "on"}
    self._zfssa.get_share.return_value = details
    self._zfssa.allow_access_nfs(self.pool,
                                 self.project,
                                 self.share,
                                 access)
    self.assertEqual(2, self._zfssa.modify_share.call_count)

    # verify modify_share is not called if ip is already in the list
    access.update({"access_to": "10.0.0.1/24"})
    details = self._create_entry("off", access['access_to'])
    self._zfssa.get_share.return_value = details
    self._zfssa.allow_access_nfs(self.pool,
                                 self.project,
                                 self.share,
                                 access)
    self.assertEqual(2, self._zfssa.modify_share.call_count)


def test_deny_access_nfs(self):
    """deny_access_nfs rejects non-ip access and modifies only as needed."""
    self.mock_object(self._zfssa, 'get_share')
    self.mock_object(self._zfssa, 'modify_share')
    data1 = self._create_entry("off", "10.0.0.1")
    access = {
        "access_type": "nonip",
        "access_to": "foo",
    }

    # invalid access_type
    self.assertRaises(exception.InvalidShareAccess,
                      self._zfssa.deny_access_nfs,
                      self.pool,
                      self.project,
                      self.share,
                      access)

    # valid entry
    access.update({"access_type": "ip"})
    self._zfssa.get_share.return_value = data1
    self._zfssa.deny_access_nfs(self.pool,
                                self.project,
                                self.share,
                                access)
    self.assertEqual(1, self._zfssa.get_share.call_count)
    self.assertEqual(0, self._zfssa.modify_share.call_count)
    self._zfssa.get_share.assert_called_with(self.pool,
                                             self.project,
                                             self.share)

    # another valid entry
    data1 = self._create_entry(data1['sharenfs'], '10.0.0.2/24')
    data2 = self._create_entry(data1['sharenfs'], access['access_to'])
    self._zfssa.get_share.return_value = data2
    self._zfssa.deny_access_nfs(self.pool,
                                self.project,
                                self.share,
                                access)
    self.assertEqual(2, self._zfssa.get_share.call_count)
    self.assertEqual(1, self._zfssa.modify_share.call_count)
    self._zfssa.get_share.assert_called_with(self.pool,
                                             self.project,
                                             self.share)
    self._zfssa.modify_share.assert_called_with(self.pool,
                                                self.project,
                                                self.share,
                                                data1)


def test_create_schema_negative(self):
    """create_schema raises when the POST returns NOT_FOUND."""
    self.mock_object(self._zfssa.rclient, 'get')
    self.mock_object(self._zfssa.rclient, 'post')
    self._zfssa.rclient.post.return_value = self._create_response(
        restclient.Status.NOT_FOUND)
    self.assertRaises(exception.ShareBackendException,
                      self._zfssa.create_schema,
                      self.schema)


def test_create_schema_property_exists(self):
    """create_schema does not POST when the GET returns OK."""
    self.mock_object(self._zfssa.rclient, 'get')
    self.mock_object(self._zfssa.rclient, 'post')
    self._zfssa.rclient.get.return_value = self._create_response(
        restclient.Status.OK)
    self._zfssa.create_schema(self.schema)
    self.assertEqual(1, self._zfssa.rclient.get.call_count)
    self.assertEqual(0, self._zfssa.rclient.post.call_count)


def test_create_schema(self):
    """create_schema with a NOT_FOUND lookup and a CREATED post."""
    self.mock_object(self._zfssa.rclient, 'get')
    self.mock_object(self._zfssa.rclient, 'post')
    self._zfssa.rclient.get.return_value = self._create_response(
        restclient.Status.NOT_FOUND)
    self._zfssa.rclient.post.return_value = self._create_response(
        restclient.Status.CREATED)
    self._zfssa.create_schema(self.schema)
    self.assertEqual(1, self._zfssa.rclient.get.call_count)
    # ... (the listing is truncated here; the rest of this test is not shown)

# ...handler.py
Source:handler.py  
# ... (excerpt of handler.py; earlier lines are not shown)
# NOTE: the functions below are methods of a class whose header lies outside
# this excerpt; they are listed here at column 0.
# The excerpt opens inside a method whose beginning is not shown:
#             'No handler configured for: {} {}'.format(request.method, request.url)
#         )
#     return func(request, **params)


def _get_requests(self, _):
    """Return every request from storage as a JSON list of dicts."""
    return self._create_json_response(
        [r.to_dict() for r in self.storage.load_requests()]
    )


def _get_last_request(self, _):
    """Return the last request from storage as JSON (``null`` if none)."""
    request = self.storage.load_last_request()
    if request is not None:
        request = request.to_dict()
    return self._create_json_response(request)


def _clear_requests(self, _):
    """Clear the stored requests and acknowledge with a status payload."""
    self.storage.clear_requests()
    return self._create_json_response({'status': 'ok'})


def _get_request_body(self, _, request_id):
    """Return the stored request body for ``request_id[0]`` as raw bytes."""
    body = self.storage.load_request_body(request_id[0])
    return self._create_response(body, 'application/octet-stream')


def _get_response_body(self, _, request_id):
    """Return the stored response body for ``request_id[0]`` as raw bytes."""
    body = self.storage.load_response_body(request_id[0])
    return self._create_response(body, 'application/octet-stream')


def _find_request(self, _, path):
    """Return the request found for ``path[0]`` as JSON (``null`` if none)."""
    request = self.storage.find(path[0])
    if request is not None:
        request = request.to_dict()
    return self._create_json_response(request)


def _set_header_overrides(self, request):
    """Set the modifier's headers from the JSON request body."""
    headers = json.loads(request.body.decode('utf-8'))
    self.modifier.headers = headers
    return self._create_json_response({'status': 'ok'})


def _clear_header_overrides(self, _):
    """Delete the modifier's header overrides."""
    del self.modifier.headers
    return self._create_json_response({'status': 'ok'})


def _get_header_overrides(self, _):
    """Return the modifier's header overrides as JSON."""
    return self._create_json_response(self.modifier.headers)


def _set_param_overrides(self, request):
    """Set the modifier's params from the JSON request body."""
    params = json.loads(request.body.decode('utf-8'))
    self.modifier.params = params
    return self._create_json_response({'status': 'ok'})


def _clear_param_overrides(self, _):
    """Delete the modifier's param overrides."""
    del self.modifier.params
    return self._create_json_response({'status': 'ok'})


def _get_param_overrides(self, _):
    """Return the modifier's param overrides as JSON."""
    return self._create_json_response(self.modifier.params)


def _set_querystring_overrides(self, request):
    """Set the modifier's querystring from the body's ``overrides`` key."""
    querystring = json.loads(request.body.decode('utf-8'))['overrides']
    self.modifier.querystring = querystring
    return self._create_json_response({'status': 'ok'})


def _clear_querystring_overrides(self, _):
    """Delete the modifier's querystring override."""
    del self.modifier.querystring
    return self._create_json_response({'status': 'ok'})


def _get_querystring_overrides(self, _):
    """Return the querystring override wrapped under an ``overrides`` key."""
    return self._create_json_response({
        'overrides': self.modifier.querystring,
    })


def _set_rewrite_rules(self, request):
    """Set the modifier's rewrite rules from the JSON request body."""
    rewrite_rules = json.loads(request.body.decode('utf-8'))
    self.modifier.rewrite_rules = rewrite_rules
    return self._create_json_response({'status': 'ok'})


def _clear_rewrite_rules(self, _):
    """Delete the modifier's rewrite rules."""
    del self.modifier.rewrite_rules
    return self._create_json_response({'status': 'ok'})


def _get_rewrite_rules(self, _):
    """Return the modifier's rewrite rules as JSON."""
    return self._create_json_response(self.modifier.rewrite_rules)


def _set_scopes(self, request):
    """Set ``self.scopes`` from the JSON request body."""
    scopes = json.loads(request.body.decode('utf-8'))
    self.scopes = scopes
    return self._create_json_response({'status': 'ok'})


def _reset_scopes(self, _):
    """Reset ``self.scopes`` to an empty list."""
    self.scopes = []
    return self._create_json_response({'status': 'ok'})


def _get_scopes(self, _):
    """Return ``self.scopes`` as JSON."""
    return self._create_json_response(self.scopes)


def _initialise(self, request):
    """Decode the JSON request body and pass it to ``initialise``."""
    options = json.loads(request.body.decode('utf-8'))
    self.initialise(options)
    return self._create_json_response({'status': 'ok'})


def _create_json_response(self, payload):
    """Serialise ``payload`` to UTF-8 JSON and wrap it in a response.

    Shared by the handlers above so the dumps/encode step lives in one
    place; the resulting bytes are passed to ``_create_response`` with its
    default content type.
    """
    return self._create_response(json.dumps(payload).encode('utf-8'))


def _create_response(self, body, content_type='application/json'):
    """Build a 200 OK ``Response`` around ``body``.

    Args:
        body: The response body; its length becomes ``Content-Length``.
        content_type: Value for the ``Content-Type`` header.

    Returns:
        The populated ``Response`` object.
    """
    response = Response(
        status_code=200,
        reason='OK',
        headers={
            'Content-Type': content_type,
        },
        body=body
    )
    response.headers['Content-Length'] = str(len(response.body))
    return response


def initialise(self, options):
    """Perform any initialisation actions.

    Args:
        options: The selenium wire options.
    """
    # ... (the listing is truncated here; the rest of the docstring and the
    # method body are not shown)

# ...test_main.py
Source:test_main.py  
import nest_asyncio
import json
import pytest
from fastapi.testclient import TestClient
from app.utils import Constants
from starlette.status import HTTP_201_CREATED, HTTP_200_OK
from fastapi_cache.backends.redis import RedisCacheBackend, CACHE_KEY
from app.main import connect, app
from config.app_config import TEST_HOST, TEST_PORT, TEST_INDEX
from fastapi_cache import caches

nest_asyncio.apply()
client = TestClient(app)


def _auth_headers(access_token):
    """Return the bearer ``Authorization`` header for ``access_token``."""
    return {"Authorization": "Bearer {}".format(access_token)}


@pytest.fixture
async def f_backend() -> None:
    """Connect to the test backend configured in ``config.app_config``."""
    return await connect(TEST_HOST, TEST_PORT, TEST_INDEX)


def test_create_item_bad_token():
    """A POST carrying only an ``X-Token`` header is rejected with 401."""
    response = client.post(
        "/api/person",
        headers={"X-Token": "coneofsilence"},
        json={"first_name": "Nithish", "last_name": "Mohan",
              "age": "28", "favourite_color": "red"}
    )
    assert response.status_code == 401


@pytest.mark.asyncio
async def test_create_person(f_backend: RedisCacheBackend) -> None:
    """Creating a person with a valid token returns 201."""
    access_token = await authorization()
    response = await create_person(access_token)
    assert response.status_code == HTTP_201_CREATED
    caches.remove(CACHE_KEY)


@pytest.mark.asyncio
async def test_fetch_person(f_backend: RedisCacheBackend):
    """A created person can be fetched back with identical content."""
    access_token = await authorization()
    create_response = await create_person(access_token)
    created = json.loads(create_response.content)
    fetch_response = await fetch_person(created['first_name'],
                                        created['last_name'],
                                        access_token)
    assert fetch_response.status_code == HTTP_200_OK
    # Compare canonical (key-sorted) JSON so key order cannot matter.
    fetched = json.loads(fetch_response.content)
    assert json.dumps(created, sort_keys=True) == \
        json.dumps(fetched, sort_keys=True)
    caches.remove(CACHE_KEY)


@pytest.mark.asyncio
async def test_update_person(f_backend: RedisCacheBackend):
    """A PUT updates the person; a later fetch returns the new values."""
    access_token = await authorization()
    create_response = await create_person(access_token)
    created = json.loads(create_response.content)
    update_json = {"first_name": created['first_name'],
                   "last_name": created['last_name'],
                   "age": "29", "favourite_color": "blue"}
    response = client.put(
        "/api/person",
        headers=_auth_headers(access_token),
        json=update_json
    )
    fetch_response = await fetch_person(created['first_name'],
                                        created['last_name'],
                                        access_token)
    fetched = json.loads(fetch_response.content)
    assert response.status_code == HTTP_200_OK
    assert json.dumps(
        fetched, sort_keys=True) == json.dumps(update_json, sort_keys=True)
    caches.remove(CACHE_KEY)


@pytest.mark.asyncio
async def test_delete_person(f_backend: RedisCacheBackend) -> None:
    """Deleting a person with a valid token returns 200."""
    # NOTE(review): this test does not create the person it deletes, and it
    # uses lower-case names while create_person sends capitalised ones --
    # confirm the intended dependency on earlier tests / name normalisation.
    access_token = await authorization()
    response = client.delete(
        "/api/person",
        headers=_auth_headers(access_token),
        json={"first_name": "nithish", "last_name": "mohan"}
    )
    assert response.status_code == HTTP_200_OK
    caches.remove(CACHE_KEY)


async def authorization():
    """Log in with the credentials from ``Constants``; return the token."""
    result = client.post(
        "/api/login",
        json={"username": Constants.username.value,
              "password": Constants.password.value}
    )
    return json.loads(result.content)['access_token']


async def create_person(access_token):
    """POST the fixed sample person and return the raw response."""
    result = client.post(
        "/api/person",
        headers=_auth_headers(access_token),
        json={"first_name": "Nithish", "last_name": "Mohan",
              "age": "28", "favourite_color": "red"},
    )
    return result


async def fetch_person(first_name, last_name, access_token):
    """GET the person identified by ``first_name`` and ``last_name``."""
    response = client.get(
        "/api/person",
        headers=_auth_headers(access_token),
        params={"first_name": first_name,
                "last_name": last_name},
    )
    # ... (the listing is truncated here; callers read ``status_code`` and
    # ``content`` from the value this function returns)

# ...
# Learn to execute automation testing from scratch with LambdaTest Learning Hub.
It covers everything from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g., Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
