Best Python code snippet using tempest_python
test_javelin.py
Source:test_javelin.py  
#!/usr/bin/env python
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import mock
from oslotest import mockpatch
from tempest_lib import exceptions as lib_exc

from tempest.cmd import javelin
from tempest.tests import base


class JavelinUnitTest(base.TestCase):
    """Shared fixture for the javelin tests: one mock client, one mock resource.

    ``fake_object`` is a ``MagicMock``; indexing it with any key returns the
    same child mock, which is what lets the assertions below compare
    ``self.fake_object['name']`` against what javelin passed along.
    """

    def setUp(self):
        super(JavelinUnitTest, self).setUp()
        javelin.setup_logging()
        self.fake_client = mock.MagicMock()
        self.fake_object = mock.MagicMock()

    def test_load_resources(self):
        with mock.patch('six.moves.builtins.open', mock.mock_open(),
                        create=True) as open_mock:
            with mock.patch('yaml.load', mock.MagicMock(),
                            create=True) as load_mock:
                javelin.load_resources(self.fake_object)
                load_mock.assert_called_once_with(open_mock(self.fake_object))

    def test_keystone_admin(self):
        self.useFixture(mockpatch.PatchObject(javelin, "OSClient"))
        javelin.OPTS = self.fake_object
        javelin.keystone_admin()
        javelin.OSClient.assert_called_once_with(
            self.fake_object.os_username,
            self.fake_object.os_password,
            self.fake_object.os_tenant_name)

    def test_client_for_user(self):
        fake_user = mock.MagicMock()
        javelin.USERS = {fake_user['name']: fake_user}
        self.useFixture(mockpatch.PatchObject(javelin, "OSClient"))
        javelin.client_for_user(fake_user['name'])
        javelin.OSClient.assert_called_once_with(
            fake_user['name'], fake_user['pass'], fake_user['tenant'])

    def test_client_for_non_existing_user(self):
        fake_non_existing_user = self.fake_object
        fake_user = mock.MagicMock()
        javelin.USERS = {fake_user['name']: fake_user}
        self.useFixture(mockpatch.PatchObject(javelin, "OSClient"))
        javelin.client_for_user(fake_non_existing_user['name'])
        self.assertFalse(javelin.OSClient.called)

    def test_attach_volumes(self):
        self.useFixture(mockpatch.PatchObject(javelin, "client_for_user",
                                              return_value=self.fake_client))
        self.useFixture(mockpatch.PatchObject(
            javelin, "_get_volume_by_name",
            return_value=self.fake_object.volume))
        self.useFixture(mockpatch.PatchObject(
            javelin, "_get_server_by_name",
            return_value=self.fake_object.server))
        javelin.attach_volumes([self.fake_object])
        mocked_function = self.fake_client.volumes.attach_volume
        mocked_function.assert_called_once_with(
            self.fake_object.volume['id'],
            self.fake_object.server['id'],
            self.fake_object['device'])


class TestCreateResources(JavelinUnitTest):
    """Checks that each ``javelin.create_*`` helper calls the expected client API."""

    def test_create_tenants(self):
        self.fake_client.identity.list_tenants.return_value = {'tenants': []}
        self.useFixture(mockpatch.PatchObject(javelin, "keystone_admin",
                                              return_value=self.fake_client))
        javelin.create_tenants([self.fake_object['name']])
        mocked_function = self.fake_client.identity.create_tenant
        mocked_function.assert_called_once_with(self.fake_object['name'])

    def test_create_duplicate_tenant(self):
        self.fake_client.identity.list_tenants.return_value = {'tenants': [
            {'name': self.fake_object['name']}]}
        self.useFixture(mockpatch.PatchObject(javelin, "keystone_admin",
                                              return_value=self.fake_client))
        javelin.create_tenants([self.fake_object['name']])
        mocked_function = self.fake_client.identity.create_tenant
        self.assertFalse(mocked_function.called)

    def test_create_users(self):
        self.fake_client.identity.get_tenant_by_name.return_value = \
            self.fake_object['tenant']
        self.fake_client.identity.get_user_by_username.side_effect = \
            lib_exc.NotFound("user is not found")
        self.useFixture(mockpatch.PatchObject(javelin, "keystone_admin",
                                              return_value=self.fake_client))
        javelin.create_users([self.fake_object])
        fake_tenant_id = self.fake_object['tenant']['id']
        fake_email = "%s@%s" % (self.fake_object['user'], fake_tenant_id)
        mocked_function = self.fake_client.identity.create_user
        mocked_function.assert_called_once_with(self.fake_object['name'],
                                                self.fake_object['password'],
                                                fake_tenant_id,
                                                fake_email,
                                                enabled=True)

    def test_create_user_missing_tenant(self):
        self.fake_client.identity.get_tenant_by_name.side_effect = \
            lib_exc.NotFound("tenant is not found")
        self.useFixture(mockpatch.PatchObject(javelin, "keystone_admin",
                                              return_value=self.fake_client))
        javelin.create_users([self.fake_object])
        mocked_function = self.fake_client.identity.create_user
        self.assertFalse(mocked_function.called)

    def test_create_objects(self):
        self.useFixture(mockpatch.PatchObject(javelin, "client_for_user",
                                              return_value=self.fake_client))
        self.useFixture(mockpatch.PatchObject(javelin, "_assign_swift_role"))
        self.useFixture(mockpatch.PatchObject(
            javelin, "_file_contents",
            return_value=self.fake_object.content))
        javelin.create_objects([self.fake_object])
        mocked_function = self.fake_client.containers.create_container
        mocked_function.assert_called_once_with(self.fake_object['container'])
        mocked_function = self.fake_client.objects.create_object
        mocked_function.assert_called_once_with(self.fake_object['container'],
                                                self.fake_object['name'],
                                                self.fake_object.content)

    def test_create_images(self):
        self.fake_client.images.create_image.return_value = \
            self.fake_object['body']
        self.useFixture(mockpatch.PatchObject(javelin, "client_for_user",
                                              return_value=self.fake_client))
        self.useFixture(mockpatch.PatchObject(javelin, "_get_image_by_name",
                                              return_value=[]))
        self.useFixture(mockpatch.PatchObject(javelin, "_resolve_image",
                                              return_value=(None, None)))
        with mock.patch('six.moves.builtins.open', mock.mock_open(),
                        create=True) as open_mock:
            javelin.create_images([self.fake_object])
        mocked_function = self.fake_client.images.create_image
        mocked_function.assert_called_once_with(self.fake_object['name'],
                                                self.fake_object['format'],
                                                self.fake_object['format'])
        mocked_function = self.fake_client.images.store_image_file
        fake_image_id = self.fake_object['body'].get('id')
        mocked_function.assert_called_once_with(fake_image_id, open_mock())

    def test_create_networks(self):
        self.fake_client.networks.list_networks.return_value = {
            'networks': []}
        self.useFixture(mockpatch.PatchObject(javelin, "client_for_user",
                                              return_value=self.fake_client))
        javelin.create_networks([self.fake_object])
        mocked_function = self.fake_client.networks.create_network
        mocked_function.assert_called_once_with(name=self.fake_object['name'])

    def test_create_subnet(self):
        fake_network = self.fake_object['network']
        self.useFixture(mockpatch.PatchObject(javelin, "client_for_user",
                                              return_value=self.fake_client))
        self.useFixture(mockpatch.PatchObject(javelin, "_get_resource_by_name",
                                              return_value=fake_network))
        fake_netaddr = mock.MagicMock()
        self.useFixture(mockpatch.PatchObject(javelin, "netaddr",
                                              return_value=fake_netaddr))
        fake_version = javelin.netaddr.IPNetwork().version
        javelin.create_subnets([self.fake_object])
        mocked_function = self.fake_client.networks.create_subnet
        mocked_function.assert_called_once_with(network_id=fake_network['id'],
                                                cidr=self.fake_object['range'],
                                                name=self.fake_object['name'],
                                                ip_version=fake_version)

    def test_create_volumes(self):
        self.useFixture(mockpatch.PatchObject(javelin, "client_for_user",
                                              return_value=self.fake_client))
        self.useFixture(mockpatch.PatchObject(javelin, "_get_volume_by_name",
                                              return_value=None))
        self.fake_client.volumes.create_volume.return_value = \
            self.fake_object.body
        javelin.create_volumes([self.fake_object])
        mocked_function = self.fake_client.volumes.create_volume
        mocked_function.assert_called_once_with(
            size=self.fake_object['gb'],
            display_name=self.fake_object['name'])
        mocked_function = self.fake_client.volumes.wait_for_volume_status
        mocked_function.assert_called_once_with(
            self.fake_object.body['volume']['id'],
            'available')

    def test_create_volume_existing(self):
        self.useFixture(mockpatch.PatchObject(javelin, "client_for_user",
                                              return_value=self.fake_client))
        self.useFixture(mockpatch.PatchObject(javelin, "_get_volume_by_name",
                                              return_value=self.fake_object))
        self.fake_client.volumes.create_volume.return_value = \
            self.fake_object.body
        javelin.create_volumes([self.fake_object])
        mocked_function = self.fake_client.volumes.create_volume
        self.assertFalse(mocked_function.called)
        mocked_function = self.fake_client.volumes.wait_for_volume_status
        self.assertFalse(mocked_function.called)

    def test_create_router(self):
        self.fake_client.networks.list_routers.return_value = {'routers': []}
        self.useFixture(mockpatch.PatchObject(javelin, "client_for_user",
                                              return_value=self.fake_client))
        javelin.create_routers([self.fake_object])
        mocked_function = self.fake_client.networks.create_router
        mocked_function.assert_called_once_with(self.fake_object['name'])

    def test_create_router_existing(self):
        self.fake_client.networks.list_routers.return_value = {
            'routers': [self.fake_object]}
        self.useFixture(mockpatch.PatchObject(javelin, "client_for_user",
                                              return_value=self.fake_client))
        javelin.create_routers([self.fake_object])
        mocked_function = self.fake_client.networks.create_router
        self.assertFalse(mocked_function.called)

    def test_create_secgroup(self):
        self.useFixture(mockpatch.PatchObject(javelin, "client_for_user",
                                              return_value=self.fake_client))
        self.fake_client.secgroups.list_security_groups.return_value = (
            {'security_groups': []})
        self.fake_client.secgroups.create_security_group.return_value = \
            {'security_group': {'id': self.fake_object['secgroup_id']}}
        javelin.create_secgroups([self.fake_object])
        mocked_function = self.fake_client.secgroups.create_security_group
        mocked_function.assert_called_once_with(
            name=self.fake_object['name'],
            description=self.fake_object['description'])


class TestDestroyResources(JavelinUnitTest):
    """Checks that each ``javelin.destroy_*`` helper calls the expected client API."""

    def test_destroy_tenants(self):
        fake_tenant = self.fake_object['tenant']
        fake_auth = self.fake_client
        fake_auth.identity.get_tenant_by_name.return_value = fake_tenant
        self.useFixture(mockpatch.PatchObject(javelin, "keystone_admin",
                                              return_value=fake_auth))
        javelin.destroy_tenants([fake_tenant])
        mocked_function = fake_auth.identity.delete_tenant
        mocked_function.assert_called_once_with(fake_tenant['id'])

    def test_destroy_users(self):
        fake_user = self.fake_object['user']
        fake_tenant = self.fake_object['tenant']
        fake_auth = self.fake_client
        fake_auth.identity.get_tenant_by_name.return_value = fake_tenant
        fake_auth.identity.get_user_by_username.return_value = fake_user
        self.useFixture(mockpatch.PatchObject(javelin, "keystone_admin",
                                              return_value=fake_auth))
        javelin.destroy_users([fake_user])
        mocked_function = fake_auth.identity.delete_user
        mocked_function.assert_called_once_with(fake_user['id'])

    def test_destroy_objects(self):
        self.fake_client.objects.delete_object.return_value = \
            {'status': "200"}, ""
        self.useFixture(mockpatch.PatchObject(javelin, "client_for_user",
                                              return_value=self.fake_client))
        javelin.destroy_objects([self.fake_object])
        mocked_function = self.fake_client.objects.delete_object
        # A misspelled assert_* name on a MagicMock is silently auto-created
        # and always "passes", so the real assertion method must be used.
        mocked_function.assert_called_once_with(self.fake_object['container'],
                                                self.fake_object['name'])

    def test_destroy_images(self):
        self.useFixture(mockpatch.PatchObject(javelin, "client_for_user",
                                              return_value=self.fake_client))
        self.useFixture(mockpatch.PatchObject(
            javelin, "_get_image_by_name",
            return_value=self.fake_object['image']))
        javelin.destroy_images([self.fake_object])
        mocked_function = self.fake_client.images.delete_image
        mocked_function.assert_called_once_with(
            self.fake_object['image']['id'])

    def test_destroy_networks(self):
        self.useFixture(mockpatch.PatchObject(javelin, "client_for_user",
                                              return_value=self.fake_client))
        self.useFixture(mockpatch.PatchObject(
            javelin, "_get_resource_by_name",
            return_value=self.fake_object['resource']))
        javelin.destroy_networks([self.fake_object])
        mocked_function = self.fake_client.networks.delete_network
        mocked_function.assert_called_once_with(
            self.fake_object['resource']['id'])

    def test_destroy_volumes(self):
        self.useFixture(mockpatch.PatchObject(javelin, "client_for_user",
                                              return_value=self.fake_client))
        self.useFixture(mockpatch.PatchObject(
            javelin, "_get_volume_by_name",
            return_value=self.fake_object.volume))
        javelin.destroy_volumes([self.fake_object])
        mocked_function = self.fake_client.volumes.detach_volume
        mocked_function.assert_called_once_with(self.fake_object.volume['id'])
        mocked_function = self.fake_client.volumes.delete_volume
        mocked_function.assert_called_once_with(self.fake_object.volume['id'])

    def test_destroy_subnets(self):
        self.useFixture(mockpatch.PatchObject(javelin, "client_for_user",
                                              return_value=self.fake_client))
        fake_subnet_id = self.fake_object['subnet_id']
        self.useFixture(mockpatch.PatchObject(javelin, "_get_resource_by_name",
                                              return_value={
                                                  'id': fake_subnet_id}))
        javelin.destroy_subnets([self.fake_object])
        mocked_function = self.fake_client.subnets.delete_subnet
        mocked_function.assert_called_once_with(fake_subnet_id)

    def test_destroy_routers(self):
        self.useFixture(mockpatch.PatchObject(javelin, "client_for_user",
                                              return_value=self.fake_client))

        # this function is used on 2 different occasions in the code
        def _fake_get_resource_by_name(*args):
            if args[1] == "routers":
                return {"id": self.fake_object['router_id']}
            elif args[1] == "subnets":
                return {"id": self.fake_object['subnet_id']}

        # Install the stub through the fixture so it is removed on cleanup;
        # a bare module assignment would leak into every later test.
        self.useFixture(mockpatch.PatchObject(
            javelin, "_get_resource_by_name",
            new=_fake_get_resource_by_name))
        javelin.destroy_routers([self.fake_object])
        mocked_function = self.fake_client.networks.delete_router
        mocked_function.assert_called_once_with(
            self.fake_object['router_id'])

    def test_destroy_secgroup(self):
        self.useFixture(mockpatch.PatchObject(javelin, "client_for_user",
                                              return_value=self.fake_client))
        fake_secgroup = {'id': self.fake_object['id']}
        self.useFixture(mockpatch.PatchObject(javelin, "_get_resource_by_name",
                                              return_value=fake_secgroup))
        javelin.destroy_secgroups([self.fake_object])
        mocked_function = self.fake_client.secgroups.delete_security_group
        # ... (the listing is truncated at this point in the source page)

DBPatch.py
Source:DBPatch.py  
1# Embedded file name: scripts/common/db/DBPatch.py2import debug_utils3import types4import inspect5def obj2xml(dataObject):6    """7    Output xml representation of an arbitrary object to debug output, for debug purposes.8    """9    if dataObject == None:10        pass11    elif dataObject == '__call__':12        pass13    elif dataObject == '__add__':14        pass15    elif isinstance(dataObject, types.IntType):16        debug_utils.LOG_NOTE(str(dataObject))17    elif isinstance(dataObject, types.LongType):18        debug_utils.LOG_NOTE(str(dataObject))19    elif isinstance(dataObject, types.FloatType):20        debug_utils.LOG_NOTE(str(dataObject))21    elif isinstance(dataObject, types.StringTypes):22        debug_utils.LOG_NOTE(str(dataObject))23    elif isinstance(dataObject, types.BooleanType):24        debug_utils.LOG_NOTE(str(dataObject))25    elif isinstance(dataObject, types.ListType):26        for item in dataObject:27            debug_utils.LOG_NOTE('<__listitem>')28            obj2xml(item)29            debug_utils.LOG_NOTE('</__listitem>')30    elif isinstance(dataObject, types.DictType):31        for key in dataObject.keys():32            debug_utils.LOG_NOTE('<__dictitem key="' + str(key) + '">')33            obj2xml(dataObject[key])34            debug_utils.LOG_NOTE('</__dictitem>')35    elif isinstance(dataObject, object):36        for attributeName, attributeValue in inspect.getmembers(dataObject):37            if len(attributeName) > 4 and attributeName[0:2] == '__' and attributeName[-2:] == '__':38                continue39            if attributeName == 'asBlob':40                continue41            if isinstance(attributeValue, types.TypeType):42                continue43            if isinstance(attributeValue, types.MethodType):44                debug_utils.LOG_NOTE('<method name="' + attributeName + '" />')45            else:46                debug_utils.LOG_NOTE('<' + attributeName + '>')47                obj2xml(attributeValue)48     
           debug_utils.LOG_NOTE('</' + attributeName + '>')49    else:50        debug_utils.LOG_NOTE('ERROR unknown type unknown data.')51    return52def patch(dataObject, patchObject):53    """54    Data inheritance implementation; every attribute of dataObject is updated with similar attribute of patchObject.55    56    dataObject  - object to be "patched" (updated)57    patchObject - well, it's a patch-object :)58    59    Examples:60        dataObject.x == 0, patchObject.x == 1 results in dataObject.x == 161        dataObject.x == 0, patchObject.x == None results in dataObject.x == 062        dataObject.x == None, patchObject.x == 1 results in dataObject.x == 163        dataObject.x == [1, 2], patchObject.x == [3] results in dataObject.x == [1, 2, 3]64        dataObject.x == ['a':1, 'b':2], patchObject.x == ['c':3] results in dataObject.x == ['a':1, 'b':2, 'c':3]65    """66    if isinstance(patchObject, types.IntType):67        dataObject = patchObject68    elif isinstance(patchObject, types.LongType):69        dataObject = patchObject70    elif isinstance(patchObject, types.FloatType):71        dataObject = patchObject72    elif isinstance(patchObject, types.StringTypes):73        dataObject = patchObject74    elif isinstance(patchObject, types.BooleanType):75        dataObject = patchObject76    elif isinstance(patchObject, types.ListType):77        for item in patchObject:78            dataObject.append(item)79    elif isinstance(patchObject, types.DictType):80        for key in patchObject.keys():81            patch(dataObject[key], patchObject[key])82    elif isinstance(patchObject, object):83        for attributeName, attributeValue in inspect.getmembers(patchObject):84            if attributeName[0:1] == '_' or isinstance(attributeValue, types.MethodType):85                continue86            elif hasattr(dataObject, attributeName):87                patch(getattr(dataObject, attributeName), attributeValue)88            else:89                
dataObject.__dict__[attributeName] = attributeValue90def update(dataObject, updateObject):91    """92    "Force" copy of every data field from updateObject to data object.93    dataObject and updateObject should be actual objects, not simple types like strings, integers, etc.94    95    dataObject  - object to be updated (fields are rewrited/added, as necessary)96    updateObject - well, it's a update-object :)97    98    Examples:99        dataObject.x == 0, updateObject.x == 1 results in dataObject.x == 1100        dataObject.x == 0, updateObject.x == None results in dataObject.x == 0101        dataObject.x == None, updateObject.x == 1 results in dataObject.x == 1102    """103    for attributeName, attributeValue in inspect.getmembers(updateObject):104        if attributeName[0:1] == '_' or isinstance(attributeValue, types.MethodType):105            continue106        else:...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
