Best Python code snippet using tempest_python
basic.py
Source:basic.py  
# Copyright 2013: Mirantis Inc.
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

from rally.common import logging
from rally.task import validation

from rally_openstack.common.services.identity import identity
from rally_openstack.task import scenario


class KeystoneBasic(scenario.OpenStackScenario):
    """Base class for Keystone scenarios with initialized service object."""

    def __init__(self, context=None, admin_clients=None, clients=None):
        """Build the identity service helpers used by the scenarios.

        ``admin_keystone`` and ``keystone`` are only created when the parent
        initializer has set ``_admin_clients`` / ``_clients`` respectively,
        so either attribute may be absent on an instance.
        """
        super(KeystoneBasic, self).__init__(context, admin_clients, clients)
        if hasattr(self, "_admin_clients"):
            self.admin_keystone = identity.Identity(
                self._admin_clients, name_generator=self.generate_random_name,
                atomic_inst=self.atomic_actions())
        if hasattr(self, "_clients"):
            self.keystone = identity.Identity(
                self._clients, name_generator=self.generate_random_name,
                atomic_inst=self.atomic_actions())


@validation.add("required_platform", platform="openstack", admin=True)
@scenario.configure(context={"admin_cleanup@openstack": ["keystone"]},
                    name="KeystoneBasic.create_user",
                    platform="openstack")
class CreateUser(KeystoneBasic):

    @logging.log_deprecated_args(
        "The 'name_length' argument to create_user is ignored",
        "0.1.2", ["name_length"], once=True)
    def run(self, name_length=10, **kwargs):
        """Create a keystone user with random name.

        :param name_length: deprecated and ignored
        :param kwargs: Other optional parameters to create users like
                         "tenant_id", "enabled".
        """
        self.admin_keystone.create_user(**kwargs)


@validation.add("required_platform", platform="openstack", admin=True)
@scenario.configure(context={"admin_cleanup@openstack": ["keystone"]},
                    name="KeystoneBasic.create_delete_user",
                    platform="openstack")
class CreateDeleteUser(KeystoneBasic):

    @logging.log_deprecated_args(
        "The 'name_length' argument to create_delete_user is ignored",
        "0.1.2", ["name_length"], once=True)
    def run(self, name_length=10, **kwargs):
        """Create a keystone user with random name and then delete it.

        :param name_length: deprecated and ignored
        :param kwargs: Other optional parameters to create users like
                         "tenant_id", "enabled".
        """
        user = self.admin_keystone.create_user(**kwargs)
        self.admin_keystone.delete_user(user.id)


@validation.add("required_platform", platform="openstack", admin=True)
@scenario.configure(context={"admin_cleanup@openstack": ["keystone"]},
                    name="KeystoneBasic.create_user_set_enabled_and_delete",
                    platform="openstack")
class CreateUserSetEnabledAndDelete(KeystoneBasic):

    def run(self, enabled=True, **kwargs):
        """Create a keystone user, enable or disable it, and delete it.

        :param enabled: Initial state of user 'enabled' flag. The user
                        will be created with 'enabled' set to this
                        value, and then it will be toggled.
        :param kwargs: Other optional parameters to create user.
        """
        user = self.admin_keystone.create_user(enabled=enabled, **kwargs)
        self.admin_keystone.update_user(user.id, enabled=(not enabled))
        self.admin_keystone.delete_user(user.id)


@validation.add("required_platform", platform="openstack", admin=True)
@scenario.configure(context={"admin_cleanup@openstack": ["keystone"]},
                    name="KeystoneBasic.create_tenant",
                    platform="openstack")
class CreateTenant(KeystoneBasic):

    @logging.log_deprecated_args(
        "The 'name_length' argument to create_tenant is ignored",
        "0.1.2", ["name_length"], once=True)
    def run(self, name_length=10, **kwargs):
        """Create a keystone tenant with random name.

        :param name_length: deprecated and ignored
        :param kwargs: Other optional parameters
        """
        self.admin_keystone.create_project(**kwargs)


@validation.add("required_platform", platform="openstack", admin=True)
@scenario.configure(context={"admin_cleanup@openstack": ["keystone"]},
                    name="KeystoneBasic.authenticate_user_and_validate_token",
                    platform="openstack")
class AuthenticateUserAndValidateToken(KeystoneBasic):

    def run(self):
        """Authenticate and validate a keystone token."""
        token = self.admin_keystone.fetch_token()
        self.admin_keystone.validate_token(token)


@validation.add("number", param_name="users_per_tenant", minval=1)
@validation.add("required_platform", platform="openstack", admin=True)
@scenario.configure(context={"admin_cleanup@openstack": ["keystone"]},
                    name="KeystoneBasic.create_tenant_with_users",
                    platform="openstack")
class CreateTenantWithUsers(KeystoneBasic):

    @logging.log_deprecated_args(
        "The 'name_length' argument to create_tenant_with_users is ignored",
        "0.1.2", ["name_length"], once=True)
    def run(self, users_per_tenant, name_length=10, **kwargs):
        """Create a keystone tenant and several users belonging to it.

        The scenario does not return anything.

        :param users_per_tenant: number of users to create for the tenant
        :param name_length: deprecated and ignored
        :param kwargs: Other optional parameters for tenant creation
        """
        tenant = self.admin_keystone.create_project(**kwargs)
        self.admin_keystone.create_users(tenant.id,
                                         number_of_users=users_per_tenant)


@validation.add("required_platform", platform="openstack", admin=True)
@scenario.configure(context={"admin_cleanup@openstack": ["keystone"]},
                    name="KeystoneBasic.create_and_list_users",
                    platform="openstack")
class CreateAndListUsers(KeystoneBasic):

    @logging.log_deprecated_args(
        "The 'name_length' argument to create_and_list_users is ignored",
        "0.1.2", ["name_length"], once=True)
    def run(self, name_length=10, **kwargs):
        """Create a keystone user with random name and list all users.

        :param name_length: deprecated and ignored
        :param kwargs: Other optional parameters to create users like
                         "tenant_id", "enabled".
        """
        # A caller-supplied "name" is discarded before the user is created.
        kwargs.pop("name", None)
        self.admin_keystone.create_user(**kwargs)
        self.admin_keystone.list_users()


@validation.add("required_platform", platform="openstack", admin=True)
@scenario.configure(context={"admin_cleanup@openstack": ["keystone"]},
                    name="KeystoneBasic.create_and_list_tenants",
                    platform="openstack")
class CreateAndListTenants(KeystoneBasic):

    @logging.log_deprecated_args(
        "The 'name_length' argument to create_and_list_tenants is ignored",
        "0.1.2", ["name_length"], once=True)
    def run(self, name_length=10, **kwargs):
        """Create a keystone tenant with random name and list all tenants.

        :param name_length: deprecated and ignored
        :param kwargs: Other optional parameters
        """
        self.admin_keystone.create_project(**kwargs)
        self.admin_keystone.list_projects()


@validation.add("required_platform", platform="openstack",
                admin=True, users=True)
@scenario.configure(context={"admin_cleanup@openstack": ["keystone"]},
                    name="KeystoneBasic.add_and_remove_user_role",
                    platform="openstack")
class AddAndRemoveUserRole(KeystoneBasic):

    def run(self):
        """Create a user role add to a user and disassociate."""
        tenant_id = self.context["tenant"]["id"]
        user_id = self.context["user"]["id"]
        role = self.admin_keystone.create_role()
        self.admin_keystone.add_role(role_id=role.id, user_id=user_id,
                                     project_id=tenant_id)
        self.admin_keystone.revoke_role(role.id, user_id=user_id,
                                        project_id=tenant_id)


@validation.add("required_platform", platform="openstack", admin=True)
@scenario.configure(context={"admin_cleanup@openstack": ["keystone"]},
                    name="KeystoneBasic.create_and_delete_role",
                    platform="openstack")
class CreateAndDeleteRole(KeystoneBasic):

    def run(self):
        """Create a user role and delete it."""
        role = self.admin_keystone.create_role()
        self.admin_keystone.delete_role(role.id)


@validation.add("required_platform", platform="openstack",
                admin=True, users=True)
@scenario.configure(context={"admin_cleanup@openstack": ["keystone"]},
                    name="KeystoneBasic.create_add_and_list_user_roles",
                    platform="openstack")
class CreateAddAndListUserRoles(KeystoneBasic):

    def run(self):
        """Create user role, add it and list user roles for given user."""
        tenant_id = self.context["tenant"]["id"]
        user_id = self.context["user"]["id"]
        role = self.admin_keystone.create_role()
        self.admin_keystone.add_role(user_id=user_id, role_id=role.id,
                                     project_id=tenant_id)
        self.admin_keystone.list_roles(user_id=user_id, project_id=tenant_id)


@validation.add("required_platform", platform="openstack", admin=True)
@scenario.configure(context={"admin_cleanup@openstack": ["keystone"]},
                    name="KeystoneBasic.get_entities",
                    platform="openstack")
class GetEntities(KeystoneBasic):

    def run(self, service_name="keystone"):
        """Get instance of a tenant, user, role and service by id's.

        An ephemeral tenant, user, and role are each created. By
        default, fetches the 'keystone' service. This can be
        overridden (for instance, to get the 'Identity Service'
        service on older OpenStack), or None can be passed explicitly
        to service_name to create a new service and then query it by
        ID.

        :param service_name: The name of the service to get by ID; or
                             None, to create an ephemeral service and
                             get it by ID.
        """
        project = self.admin_keystone.create_project()
        user = self.admin_keystone.create_user(project_id=project.id)
        role = self.admin_keystone.create_role()
        self.admin_keystone.get_project(project.id)
        self.admin_keystone.get_user(user.id)
        self.admin_keystone.get_role(role.id)
        if service_name is None:
            service = self.admin_keystone.create_service()
        else:
            service = self.admin_keystone.get_service_by_name(service_name)
        # NOTE(review): if get_service_by_name() yields None for an unknown
        # name, the line below fails with AttributeError -- confirm the
        # helper's contract.
        self.admin_keystone.get_service(service.id)


@validation.add("required_platform", platform="openstack", admin=True)
@scenario.configure(context={"admin_cleanup@openstack": ["keystone"]},
                    name="KeystoneBasic.create_and_delete_service",
                    platform="openstack")
class CreateAndDeleteService(KeystoneBasic):

    @logging.log_deprecated_args(
        "The 'name' argument to create_and_delete_service will be ignored",
        "0.0.5", ["name"])
    def run(self, name=None, service_type=None, description=None):
        """Create and delete service.

        :param name: deprecated and ignored
        :param service_type: type of the service
        :param description: description of the service
        """
        service = self.admin_keystone.create_service(service_type=service_type,
                                                     description=description)
        self.admin_keystone.delete_service(service.id)


@validation.add("required_platform", platform="openstack", admin=True)
@scenario.configure(context={"admin_cleanup@openstack": ["keystone"]},
                    name="KeystoneBasic.create_update_and_delete_tenant",
                    platform="openstack")
class CreateUpdateAndDeleteTenant(KeystoneBasic):

    @logging.log_deprecated_args(
        "The 'name_length' argument to create_update_and_delete_tenant is "
        "ignored", "0.1.2", ["name_length"], once=True)
    def run(self, name_length=None, **kwargs):
        """Create, update and delete tenant.

        :param name_length: deprecated and ignored
        :param kwargs: Other optional parameters for tenant creation
        """
        project = self.admin_keystone.create_project(**kwargs)
        new_name = self.generate_random_name()
        new_description = self.generate_random_name()
        self.admin_keystone.update_project(project.id, name=new_name,
                                           description=new_description)
        self.admin_keystone.delete_project(project.id)


@validation.add("required_platform", platform="openstack", admin=True)
@scenario.configure(context={"admin_cleanup@openstack": ["keystone"]},
                    name="KeystoneBasic.create_user_update_password",
                    platform="openstack")
class CreateUserUpdatePassword(KeystoneBasic):

    @logging.log_deprecated_args(
        "The 'name_length' and 'password_length' arguments to "
        "create_user_update_password are ignored",
        "0.1.2", ["name_length", "password_length"], once=True)
    def run(self, name_length=None, password_length=None):
        """Create user and update password for that user.

        :param name_length: deprecated and ignored
        :param password_length: deprecated and ignored
        """
        user = self.admin_keystone.create_user()
        # The new password is simply another generated random name.
        password = self.generate_random_name()
        self.admin_keystone.update_user(user.id, password=password)


@validation.add("required_platform", platform="openstack", admin=True)
@scenario.configure(context={"admin_cleanup@openstack": ["keystone"]},
                    name="KeystoneBasic.create_and_list_services",
                    platform="openstack")
class CreateAndListServices(KeystoneBasic):

    @logging.log_deprecated_args(
        "The 'name' argument to create_and_list_services will be ignored",
        "0.0.5", ["name"])
    def run(self, name=None, service_type=None, description=None):
        """Create and list services.

        :param name: deprecated and ignored
        :param service_type: type of the service
        :param description: description of the service
        """
        self.admin_keystone.create_service(service_type=service_type,
                                           description=description)
        self.admin_keystone.list_services()


@validation.add("required_platform", platform="openstack", users=True)
@scenario.configure(context={"cleanup@openstack": ["keystone"]},
                    name="KeystoneBasic.create_and_list_ec2credentials",
                    platform="openstack")
class CreateAndListEc2Credentials(KeystoneBasic):

    def run(self):
        """Create and List all keystone ec2-credentials."""
        self.keystone.create_ec2credentials(
            self.context["user"]["id"],
            project_id=self.context["tenant"]["id"])
        self.keystone.list_ec2credentials(self.context["user"]["id"])


@validation.add("required_platform", platform="openstack", users=True)
@scenario.configure(context={"cleanup@openstack": ["keystone"]},
                    name="KeystoneBasic.create_and_delete_ec2credential",
                    platform="openstack")
class CreateAndDeleteEc2Credential(KeystoneBasic):

    def run(self):
        """Create and delete keystone ec2-credential."""
        creds = self.keystone.create_ec2credentials(
            self.context["user"]["id"],
            project_id=self.context["tenant"]["id"])
        self.keystone.delete_ec2credential(
            self.context["user"]["id"], access=creds.access)


@validation.add("required_platform", platform="openstack", admin=True)
@scenario.configure(context={"admin_cleanup@openstack": ["keystone"]},
                    name="KeystoneBasic.create_and_get_role",
                    platform="openstack")
class CreateAndGetRole(KeystoneBasic):

    def run(self, **kwargs):
        """Create a user role and get it detailed information.

        :param kwargs: Optional additional arguments for roles creation
        """
        role = self.admin_keystone.create_role(**kwargs)
        self.admin_keystone.get_role(role.id)


@validation.add("required_platform", platform="openstack", admin=True)
@scenario.configure(context={"admin_cleanup@openstack": ["keystone"]},
                    name="KeystoneBasic.create_and_list_roles",
                    platform="openstack")
class CreateAddListRoles(KeystoneBasic):

    def run(self, create_role_kwargs=None, list_role_kwargs=None):
        """Create a role, then list all roles.

        :param create_role_kwargs: Optional additional arguments for
                                   roles create
        :param list_role_kwargs: Optional additional arguments for roles list
        """
        create_role_kwargs = create_role_kwargs or {}
        list_role_kwargs = list_role_kwargs or {}

        role = self.admin_keystone.create_role(**create_role_kwargs)
        msg = "Role isn't created"
        self.assertTrue(role, err_msg=msg)

        all_roles = self.admin_keystone.list_roles(**list_role_kwargs)
        msg = ("Created role is not in the"
               " list of all available roles")
        self.assertIn(role, all_roles, err_msg=msg)


@validation.add("required_platform", platform="openstack", admin=True)
@scenario.configure(context={"admin_cleanup@openstack": ["keystone"]},
                    name="KeystoneBasic.create_and_update_user",
                    platform="openstack")
class CreateAndUpdateUser(KeystoneBasic):

    def run(self, create_user_kwargs=None, update_user_kwargs=None):
        """Create user and update the user.

        :param create_user_kwargs: Optional additional arguments for user
                                   creation
        :param update_user_kwargs: Optional additional arguments for user
                                   update
        """
        create_user_kwargs = create_user_kwargs or {}
        # Both arguments default to None; without this normalisation the
        # ``**`` unpacking and the loop below raise TypeError when
        # update_user_kwargs is omitted.
        update_user_kwargs = update_user_kwargs or {}

        user = self.admin_keystone.create_user(**create_user_kwargs)
        self.admin_keystone.update_user(user.id, **update_user_kwargs)
        user_data = self.admin_clients("keystone").users.get(user.id)

        for args in update_user_kwargs:
            msg = ("%s isn't updated" % args)
            # NOTE(review): the listing is cut off in the middle of the
            # statement below, so its remaining arguments are not visible.
            # Presumably the fetched attribute is compared against
            # update_user_kwargs[args] with err_msg=msg -- confirm against
            # the complete file before restoring the call.
            # self.assertEqual(getattr(user_data, str(args)),...

# (End of the truncated basic.py listing; the page continues with
# test_utils.py.)
Source:test_utils.py  
# Copyright 2013: Mirantis Inc.
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import mock

from rally.plugins.openstack.scenarios.keystone import utils
from tests.unit import fakes
from tests.unit import test

# Dotted prefix used to build mock.patch targets inside the utils module.
UTILS = "rally.plugins.openstack.scenarios.keystone.utils."


class KeystoneUtilsTestCase(test.TestCase):
    """Tests for the module-level helpers of the keystone scenario utils."""

    @mock.patch("rally.common.utils.name_matches_object")
    def test_is_temporary(self, mock_name_matches_object):
        resource = mock.Mock()
        self.assertEqual(utils.is_temporary(resource),
                         mock_name_matches_object.return_value)
        mock_name_matches_object.assert_called_once_with(
            resource.name, utils.KeystoneScenario)


class KeystoneScenarioTestCase(test.ScenarioTestCase):
    """Tests for the atomic actions of ``utils.KeystoneScenario``.

    Each test calls one private helper, checks the keystone client call it
    is expected to make and verifies the name of the recorded atomic action.
    """

    # uuid4 is patched so the password passed to users.create is predictable.
    @mock.patch("uuid.uuid4", return_value="pwd")
    def test_user_create(self, mock_uuid4):
        scenario = utils.KeystoneScenario(self.context)
        scenario.generate_random_name = mock.Mock(return_value="foobarov")

        result = scenario._user_create()

        self.assertEqual(
            self.admin_clients("keystone").users.create.return_value, result)
        self.admin_clients("keystone").users.create.assert_called_once_with(
            "foobarov",
            password=mock_uuid4.return_value,
            email="foobarov@rally.me")
        mock_uuid4.assert_called_with()
        self._test_atomic_action_timer(scenario.atomic_actions(),
                                       "keystone.create_user")

    def test_update_user_enabled(self):
        user = mock.Mock()
        enabled = mock.Mock()
        scenario = utils.KeystoneScenario(self.context)

        scenario._update_user_enabled(user, enabled)

        users = self.admin_clients("keystone").users
        users.update_enabled.assert_called_once_with(user, enabled)
        self._test_atomic_action_timer(scenario.atomic_actions(),
                                       "keystone.update_user_enabled")

    def test_role_create(self):
        scenario = utils.KeystoneScenario(self.context)
        scenario.generate_random_name = mock.Mock()

        result = scenario._role_create()

        self.assertEqual(
            self.admin_clients("keystone").roles.create.return_value, result)
        self.admin_clients("keystone").roles.create.assert_called_once_with(
            scenario.generate_random_name.return_value)
        self._test_atomic_action_timer(scenario.atomic_actions(),
                                       "keystone.create_role")

    def test_list_roles_for_user(self):
        user = mock.MagicMock()
        tenant = mock.MagicMock()
        scenario = utils.KeystoneScenario(self.context)

        scenario._list_roles_for_user(user, tenant)

        roles = self.admin_clients("keystone").roles
        roles.roles_for_user.assert_called_once_with(user, tenant)
        self._test_atomic_action_timer(scenario.atomic_actions(),
                                       "keystone.list_roles")

    def test_role_add(self):
        user = mock.MagicMock()
        role = mock.MagicMock()
        tenant = mock.MagicMock()
        scenario = utils.KeystoneScenario(self.context)

        scenario._role_add(user=user.id, role=role.id, tenant=tenant.id)

        roles = self.admin_clients("keystone").roles
        roles.add_user_role.assert_called_once_with(user.id, role.id,
                                                    tenant.id)
        self._test_atomic_action_timer(scenario.atomic_actions(),
                                       "keystone.add_role")

    def test_user_delete(self):
        resource = fakes.FakeResource()
        resource.delete = mock.MagicMock()
        scenario = utils.KeystoneScenario(self.context)

        scenario._resource_delete(resource)

        resource.delete.assert_called_once_with()
        # The expected timer name is derived from the resource's class name.
        r = "keystone.delete_%s" % resource.__class__.__name__.lower()
        self._test_atomic_action_timer(scenario.atomic_actions(), r)

    def test_role_remove(self):
        user = mock.MagicMock()
        role = mock.MagicMock()
        tenant = mock.MagicMock()
        scenario = utils.KeystoneScenario(self.context)

        scenario._role_remove(user=user, role=role, tenant=tenant)

        roles = self.admin_clients("keystone").roles
        roles.remove_user_role.assert_called_once_with(user, role, tenant)
        self._test_atomic_action_timer(scenario.atomic_actions(),
                                       "keystone.remove_role")

    def test_tenant_create(self):
        scenario = utils.KeystoneScenario(self.context)
        scenario.generate_random_name = mock.Mock()

        result = scenario._tenant_create()

        self.assertEqual(
            self.admin_clients("keystone").tenants.create.return_value, result)
        self.admin_clients("keystone").tenants.create.assert_called_once_with(
            scenario.generate_random_name.return_value)
        self._test_atomic_action_timer(scenario.atomic_actions(),
                                       "keystone.create_tenant")

    def test_service_create(self):
        service_type = "service_type"
        description = "description"
        scenario = utils.KeystoneScenario(self.context)
        scenario.generate_random_name = mock.Mock()

        result = scenario._service_create(service_type=service_type,
                                          description=description)

        self.assertEqual(
            self.admin_clients("keystone").services.create.return_value,
            result)
        self.admin_clients("keystone").services.create.assert_called_once_with(
            scenario.generate_random_name.return_value,
            service_type, description)
        self._test_atomic_action_timer(scenario.atomic_actions(),
                                       "keystone.create_service")

    def test_tenant_create_with_users(self):
        tenant = mock.MagicMock()
        scenario = utils.KeystoneScenario(self.context)
        scenario.generate_random_name = mock.Mock(return_value="foobarov")

        scenario._users_create(tenant, users_per_tenant=1)

        # The generated name is expected as user name, password and the
        # local part of the e-mail address.
        self.admin_clients("keystone").users.create.assert_called_once_with(
            "foobarov", password="foobarov", email="foobarov@rally.me",
            tenant_id=tenant.id)
        self._test_atomic_action_timer(scenario.atomic_actions(),
                                       "keystone.create_users")

    def test_list_users(self):
        scenario = utils.KeystoneScenario(self.context)
        scenario._list_users()
        self.admin_clients("keystone").users.list.assert_called_once_with()
        self._test_atomic_action_timer(scenario.atomic_actions(),
                                       "keystone.list_users")

    def test_list_tenants(self):
        scenario = utils.KeystoneScenario(self.context)
        scenario._list_tenants()
        self.admin_clients("keystone").tenants.list.assert_called_once_with()
        self._test_atomic_action_timer(scenario.atomic_actions(),
                                       "keystone.list_tenants")

    def test_list_services(self):
        scenario = utils.KeystoneScenario(self.context)
        scenario._list_services()
        self.admin_clients("keystone").services.list.assert_called_once_with()
        self._test_atomic_action_timer(scenario.atomic_actions(),
                                       "keystone.service_list")

    def test_delete_service(self):
        service = mock.MagicMock()
        scenario = utils.KeystoneScenario(self.context)
        scenario._delete_service(service_id=service.id)
        self.admin_clients("keystone").services.delete.assert_called_once_with(
            service.id)
        self._test_atomic_action_timer(scenario.atomic_actions(),
                                       "keystone.delete_service")

    def test_get_tenant(self):
        tenant = mock.MagicMock()
        scenario = utils.KeystoneScenario(self.context)
        scenario._get_tenant(tenant_id=tenant.id)
        self.admin_clients("keystone").tenants.get.assert_called_once_with(
            tenant.id)
        self._test_atomic_action_timer(scenario.atomic_actions(),
                                       "keystone.get_tenant")

    def test_get_user(self):
        user = mock.MagicMock()
        scenario = utils.KeystoneScenario(self.context)
        scenario._get_user(user_id=user.id)
        self.admin_clients("keystone").users.get.assert_called_once_with(
            user.id)
        self._test_atomic_action_timer(scenario.atomic_actions(),
                                       "keystone.get_user")

    def test_get_role(self):
        role = mock.MagicMock()
        scenario = utils.KeystoneScenario(self.context)
        scenario._get_role(role_id=role.id)
        self.admin_clients("keystone").roles.get.assert_called_once_with(
            role.id)
        self._test_atomic_action_timer(scenario.atomic_actions(),
                                       "keystone.get_role")

    def test_get_service(self):
        service = mock.MagicMock()
        scenario = utils.KeystoneScenario(self.context)
        scenario._get_service(service_id=service.id)
        self.admin_clients("keystone").services.get.assert_called_once_with(
            service.id)
        self._test_atomic_action_timer(scenario.atomic_actions(),
                                       "keystone.get_service")

    def test_update_tenant(self):
        tenant = mock.MagicMock()
        description = "new description"
        scenario = utils.KeystoneScenario(self.context)
        scenario.generate_random_name = mock.Mock()

        scenario._update_tenant(tenant=tenant, description=description)

        self.admin_clients("keystone").tenants.update.assert_called_once_with(
            tenant.id, scenario.generate_random_name.return_value,
            description)
        self._test_atomic_action_timer(scenario.atomic_actions(),
                                       "keystone.update_tenant")

    def test_update_user_password(self):
        password = "pswd"
        user = mock.MagicMock()
        scenario = utils.KeystoneScenario(self.context)

        scenario._update_user_password(password=password, user_id=user.id)

        users = self.admin_clients("keystone").users
        users.update_password.assert_called_once_with(user.id, password)
        self._test_atomic_action_timer(scenario.atomic_actions(),
                                       "keystone.update_user_password")

    @mock.patch("rally.plugins.openstack.scenario.OpenStackScenario."
                "admin_clients")
    def test_update_user_password_v3(self,
                                     mock_open_stack_scenario_admin_clients):
        password = "pswd"
        user = mock.MagicMock()
        scenario = utils.KeystoneScenario()
        # Make the mocked client report ``version == "v3"``; a PropertyMock
        # has to be attached to the mock's type to act as a property.
        type(mock_open_stack_scenario_admin_clients.return_value).version = (
            mock.PropertyMock(return_value="v3"))

        scenario._update_user_password(password=password, user_id=user.id)

        # With a v3 client users.update(..., password=...) is expected
        # rather than users.update_password(...).
        mock_open_stack_scenario_admin_clients(
            "keystone").users.update.assert_called_once_with(
            user.id, password=password)
        self._test_atomic_action_timer(scenario.atomic_actions(),
                                       "keystone.update_user_password")

    def test_get_service_by_name(self):
        scenario = utils.KeystoneScenario(self.context)
        svc_foo, svc_bar = mock.Mock(), mock.Mock()
        scenario._list_services = mock.Mock(return_value=[svc_foo, svc_bar])
        self.assertEqual(scenario._get_service_by_name(svc_bar.name), svc_bar)
        # An unknown service name is expected to yield None.
        self.assertIsNone(scenario._get_service_by_name("spam"))

    @mock.patch(UTILS + "KeystoneScenario.clients")
    def test_create_ec2credentials(self, mock_clients):
        scenario = utils.KeystoneScenario(self.context)
        creds = mock.Mock()
        mock_clients("keystone").ec2.create.return_value = creds

        create_creds = scenario._create_ec2credentials("user_id",
                                                       "tenant_id")

        self.assertEqual(create_creds, creds)
        mock_clients("keystone").ec2.create.assert_called_once_with(
            "user_id", "tenant_id")
        self._test_atomic_action_timer(scenario.atomic_actions(),
                                       "keystone.create_ec2creds")

    @mock.patch(UTILS + "KeystoneScenario.clients")
    def test_list_ec2credentials(self, mock_clients):
        scenario = utils.KeystoneScenario(self.context)
        creds_list = mock.Mock()
        mock_clients("keystone").ec2.list.return_value = creds_list

        list_creds = scenario._list_ec2credentials("user_id")

        self.assertEqual(list_creds, creds_list)
        mock_clients("keystone").ec2.list.assert_called_once_with("user_id")
        self._test_atomic_action_timer(scenario.atomic_actions(),
                                       "keystone.list_ec2creds")

    @mock.patch(UTILS + "KeystoneScenario.clients")
    def test_delete_ec2credentials(self, mock_clients):
        scenario = utils.KeystoneScenario(self.context)
        mock_clients("keystone").ec2.delete = mock.MagicMock()

        scenario._delete_ec2credential("user_id", "access")

        mock_clients("keystone").ec2.delete.assert_called_once_with("user_id",
                                                                    "access")
        # NOTE(review): the listing is cut off in the middle of the call
        # below, so the expected atomic action name is not visible here;
        # restore it from the complete file.
        # self._test_atomic_action_timer(scenario.atomic_actions(),...

# (End of the truncated test_utils.py listing; the page continues with
# utils.py.)
Source: utils.py
# Copyright 2013: Mirantis Inc.
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import uuid

from rally.common import utils
from rally.plugins.openstack import scenario
from rally.task import atomic


def is_temporary(resource):
    """Check ``resource.name`` against the KeystoneScenario class.

    :param resource: object exposing a ``name`` attribute
    :returns: whatever ``utils.name_matches_object`` returns for that name
    """
    return utils.name_matches_object(resource.name, KeystoneScenario)


class KeystoneScenario(scenario.OpenStackScenario):
    """Base class for Keystone scenarios with basic atomic actions."""

    @atomic.action_timer("keystone.create_user")
    def _user_create(self, email=None, **kwargs):
        """Creates keystone user with random name.

        :param email: e-mail of the new user; defaults to
                      "<random name>@rally.me"
        :param kwargs: Other optional parameters to create users like
                        "tenant_id", "enabled". A "password" key, if
                        present, is used instead of a random UUID.
        :returns: keystone user instance
        """
        name = self.generate_random_name()
        # NOTE(boris-42): password and email parameters are required by
        #                 keystone client v2.0. This should be cleaned up
        #                 when we switch to v3.
        password = kwargs.pop("password", str(uuid.uuid4()))
        email = email or (name + "@rally.me")
        return self.admin_clients("keystone").users.create(
            name, password=password, email=email, **kwargs)

    @atomic.action_timer("keystone.update_user_enabled")
    def _update_user_enabled(self, user, enabled):
        """Enable or disable a user.

        :param user: The user to enable or disable
        :param enabled: Boolean indicating if the user should be
                        enabled (True) or disabled (False)
        """
        self.admin_clients("keystone").users.update_enabled(user, enabled)

    def _resource_delete(self, resource):
        """Delete keystone resource.

        The atomic action is named after the lower-cased class name of
        the resource, e.g. "keystone.delete_<classname>".

        :param resource: object exposing a ``delete()`` method
        """
        r = "keystone.delete_%s" % resource.__class__.__name__.lower()
        with atomic.ActionTimer(self, r):
            resource.delete()

    @atomic.action_timer("keystone.create_tenant")
    def _tenant_create(self, **kwargs):
        """Creates keystone tenant with random name.

        :param kwargs: Other optional parameters
        :returns: keystone tenant instance
        """
        name = self.generate_random_name()
        return self.admin_clients("keystone").tenants.create(name, **kwargs)

    @atomic.action_timer("keystone.create_service")
    def _service_create(self, service_type="rally_test_type",
                        description=None):
        """Creates keystone service with random name.

        :param service_type: type of the service
        :param description: description of the service; a random name is
                            used when it is not given
        :returns: keystone service instance
        """
        description = description or self.generate_random_name()
        return self.admin_clients("keystone").services.create(
            self.generate_random_name(),
            service_type, description)

    @atomic.action_timer("keystone.create_users")
    def _users_create(self, tenant, users_per_tenant):
        """Adds users to a tenant.

        Each user gets a random name, which is also used as the password
        and as the local part of the "@rally.me" e-mail address.

        :param tenant: tenant object
        :param users_per_tenant: number of users in per tenant
        """
        for _ in range(users_per_tenant):
            name = self.generate_random_name()
            password = name
            email = name + "@rally.me"
            self.admin_clients("keystone").users.create(
                name, password=password, email=email, tenant_id=tenant.id)

    @atomic.action_timer("keystone.create_role")
    def _role_create(self):
        """Creates keystone user role with random name.

        :returns: keystone user role instance
        """
        return self.admin_clients("keystone").roles.create(
            self.generate_random_name())

    @atomic.action_timer("keystone.list_users")
    def _list_users(self):
        """List users."""
        return self.admin_clients("keystone").users.list()

    @atomic.action_timer("keystone.list_tenants")
    def _list_tenants(self):
        """List tenants."""
        return self.admin_clients("keystone").tenants.list()

    # NOTE(review): the action name below is "service_list" while sibling
    # actions use the "list_*" form; it is kept as-is because it is a
    # runtime string that other code may match on.
    @atomic.action_timer("keystone.service_list")
    def _list_services(self):
        """List services."""
        return self.admin_clients("keystone").services.list()

    @atomic.action_timer("keystone.list_roles")
    def _list_roles_for_user(self, user, tenant):
        """List user roles.

        :param user: user for whom roles will be listed
        :param tenant: tenant on which user have roles
        :returns: result of ``roles.roles_for_user(user, tenant)``
        """
        return self.admin_clients("keystone").roles.roles_for_user(
            user, tenant)

    @atomic.action_timer("keystone.add_role")
    def _role_add(self, user, role, tenant):
        """Add role to a given user on a tenant.

        :param user: user to be assigned the role to
        :param role: user role to assign with
        :param tenant: tenant on which assignation will take place
        """
        self.admin_clients("keystone").roles.add_user_role(user, role, tenant)

    @atomic.action_timer("keystone.remove_role")
    def _role_remove(self, user, role, tenant):
        """Dissociate user with role.

        :param user: user to be stripped with role
        :param role: role to be dissociated with user
        :param tenant: tenant on which assignation took place
        """
        self.admin_clients("keystone").roles.remove_user_role(user,
                                                              role, tenant)

    @atomic.action_timer("keystone.get_tenant")
    def _get_tenant(self, tenant_id):
        """Get given tenant.

        :param tenant_id: value passed to ``tenants.get``
        :returns: result of ``tenants.get(tenant_id)``
        """
        return self.admin_clients("keystone").tenants.get(tenant_id)

    @atomic.action_timer("keystone.get_user")
    def _get_user(self, user_id):
        """Get given user.

        :param user_id: value passed to ``users.get``
        :returns: result of ``users.get(user_id)``
        """
        return self.admin_clients("keystone").users.get(user_id)

    @atomic.action_timer("keystone.get_role")
    def _get_role(self, role_id):
        """Get given user role.

        :param role_id: value passed to ``roles.get``
        :returns: result of ``roles.get(role_id)``
        """
        return self.admin_clients("keystone").roles.get(role_id)

    @atomic.action_timer("keystone.get_service")
    def _get_service(self, service_id):
        """Get service with given service id.

        :param service_id: id for service object
        :returns: result of ``services.get(service_id)``
        """
        return self.admin_clients("keystone").services.get(service_id)

    def _get_service_by_name(self, name):
        """Find a service by name among the listed services.

        :param name: name to compare with each service's ``name``
        :returns: the first service whose name equals ``name``, or None
                  when no listed service matches
        """
        for i in self._list_services():
            if i.name == name:
                return i
        return None

    @atomic.action_timer("keystone.delete_service")
    def _delete_service(self, service_id):
        """Delete service.

        :param service_id: service to be deleted
        """
        self.admin_clients("keystone").services.delete(service_id)

    @atomic.action_timer("keystone.update_tenant")
    def _update_tenant(self, tenant, description=None):
        """Update tenant name and description.

        The new name is always a freshly generated random name.

        :param tenant: tenant to be updated
        :param description: tenant description to be set; a random name
                            is used when it is not given
        """
        name = self.generate_random_name()
        description = description or self.generate_random_name()
        self.admin_clients("keystone").tenants.update(tenant.id,
                                                      name, description)

    @atomic.action_timer("keystone.update_user_password")
    def _update_user_password(self, user_id, password):
        """Update user password.

        :param user_id: id of the user
        :param password: new password
        """
        admin_clients = self.admin_clients("keystone")
        # The "v3" client takes the password through the generic update
        # call; any other version goes through update_password.
        if admin_clients.version == "v3":
            admin_clients.users.update(user_id, password=password)
        else:
            admin_clients.users.update_password(user_id, password)

    @atomic.action_timer("keystone.create_ec2creds")
    def _create_ec2credentials(self, user_id, tenant_id):
        """Create ec2credentials.

        :param user_id: User ID for which to create credentials
        :param tenant_id: Tenant ID for which to create credentials
        :returns: Created ec2-credentials object
        """
        return self.clients("keystone").ec2.create(user_id, tenant_id)

    @atomic.action_timer("keystone.list_ec2creds")
    def _list_ec2credentials(self, user_id):
        """List of access/secret pairs for a user_id.

        :param user_id: List all ec2-credentials for User ID
        :returns: Return ec2-credentials list
        """
        return self.clients("keystone").ec2.list(user_id)

    @atomic.action_timer("keystone.delete_ec2creds")
    def _delete_ec2credential(self, user_id, access):
        """Delete ec2credential.

        :param user_id: User ID for which to delete credential
        :param access: access key for ec2credential to delete
        """
        # NOTE(review): the listing is cut off at this point on the source
        # page; whatever follows the docstring is not shown. The "..."
        # below is the page's own truncation marker, not the real body.
        ...


# ---------------------------------------------------------------------------
# Page text that followed the truncated listing:
#
# Learn to execute automation testing from scratch with LambdaTest Learning
# Hub. Right from setting up the prerequisites to run your first automation
# test, to following best practices and diving deeper into advanced test
# scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides
# to help you become proficient with different test automation frameworks,
# e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
