How to use get_identity_version method in tempest

Best Python code snippet using tempest_python

test.py

Source:test.py Github

copy

Full Screen

...266 In general skip checks that require an API call are discouraged.267 If one is really needed it may be implemented either in the268 resource_setup or at test level.269 """270 identity_version = cls.get_identity_version()271 if 'admin' in cls.credentials and not credentials.is_admin_available(272 identity_version=identity_version):273 msg = "Missing Identity Admin API credentials in configuration."274 raise cls.skipException(msg)275 if 'alt' in cls.credentials and not credentials.is_alt_available(276 identity_version=identity_version):277 msg = "Missing a 2nd set of API credentials in configuration."278 raise cls.skipException(msg)279 if hasattr(cls, 'identity_version'):280 if cls.identity_version == 'v2':281 if not CONF.identity_feature_enabled.api_v2:282 raise cls.skipException("Identity api v2 is not enabled")283 elif cls.identity_version == 'v3':284 if not CONF.identity_feature_enabled.api_v3:285 raise cls.skipException("Identity api v3 is not enabled")286 @classmethod287 def setup_credentials(cls):288 """Allocate credentials and the client managers from them.289 A test class that requires network resources must override290 setup_credentials and defined the required resources before super291 is invoked.292 """293 for credentials_type in cls.credentials:294 # This may raise an exception in case credentials are not available295 # In that case we want to let the exception through and the test296 # fail accordingly297 if isinstance(credentials_type, six.string_types):298 manager = cls.get_client_manager(299 credential_type=credentials_type)300 setattr(cls, 'os_%s' % credentials_type, manager)301 # Setup some common aliases302 # TODO(andreaf) The aliases below are a temporary hack303 # to avoid changing too much code in one patch. They should304 # be removed eventually305 if credentials_type == 'primary':306 cls.os = cls.manager = cls.os_primary307 if credentials_type == 'admin':308 cls.os_adm = cls.admin_manager = cls.os_admin309 if credentials_type == 'alt':310 cls.alt_manager = cls.os_alt311 elif isinstance(credentials_type, list):312 manager = cls.get_client_manager(roles=credentials_type[1:],313 force_new=True)314 setattr(cls, 'os_roles_%s' % credentials_type[0], manager)315 @classmethod316 def setup_clients(cls):317 """Create links to the clients into the test object."""318 # TODO(andreaf) There is a fair amount of code that could me moved from319 # base / test classes in here. Ideally tests should be able to only320 # specify which client is `client` and nothing else.321 pass322 @classmethod323 def resource_setup(cls):324 """Class level resource setup for test cases."""325 if hasattr(cls, "os"):326 cls.validation_resources = vresources.create_validation_resources(327 cls.os, cls.validation_resources)328 else:329 LOG.warning("Client manager not found, validation resources not"330 " created")331 @classmethod332 def resource_cleanup(cls):333 """Class level resource cleanup for test cases.334 Resource cleanup must be able to handle the case of partially setup335 resources, in case a failure during `resource_setup` should happen.336 """337 if cls.validation_resources:338 if hasattr(cls, "os"):339 vresources.clear_validation_resources(cls.os,340 cls.validation_resources)341 cls.validation_resources = {}342 else:343 LOG.warning("Client manager not found, validation resources "344 "not deleted")345 def setUp(self):346 super(BaseTestCase, self).setUp()347 if not self.setUpClassCalled:348 raise RuntimeError("setUpClass does not calls the super's"349 "setUpClass in the "350 + self.__class__.__name__)351 at_exit_set.add(self.__class__)352 test_timeout = os.environ.get('OS_TEST_TIMEOUT', 0)353 try:354 test_timeout = int(test_timeout)355 except ValueError:356 test_timeout = 0357 if test_timeout > 0:358 self.useFixture(fixtures.Timeout(test_timeout, gentle=True))359 if (os.environ.get('OS_STDOUT_CAPTURE') == 'True' or360 os.environ.get('OS_STDOUT_CAPTURE') == '1'):361 stdout = self.useFixture(fixtures.StringStream('stdout')).stream362 self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout))363 if (os.environ.get('OS_STDERR_CAPTURE') == 'True' or364 os.environ.get('OS_STDERR_CAPTURE') == '1'):365 stderr = self.useFixture(fixtures.StringStream('stderr')).stream366 self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))367 if (os.environ.get('OS_LOG_CAPTURE') != 'False' and368 os.environ.get('OS_LOG_CAPTURE') != '0'):369 self.useFixture(fixtures.LoggerFixture(nuke_handlers=False,370 format=self.log_format,371 level=None))372 @property373 def credentials_provider(self):374 return self._get_credentials_provider()375 @property376 def identity_utils(self):377 """A client that abstracts v2 and v3 identity operations.378 This can be used for creating and tearing down projects in tests. It379 should not be used for testing identity features.380 """381 if CONF.identity.auth_version == 'v2':382 client = self.os_admin.identity_client383 project_client = self.os_admin.tenants_client384 roles_client = self.os_admin.roles_client385 users_client = self.os_admin.users_client386 else:387 client = self.os_admin.identity_v3_client388 project_client = None389 roles_client = None390 users_client = None391 try:392 domain = client.auth_provider.credentials.project_domain_name393 except AttributeError:394 domain = 'Default'395 return cred_client.get_creds_client(client, project_client,396 roles_client,397 users_client,398 project_domain_name=domain)399 @classmethod400 def get_identity_version(cls):401 """Returns the identity version used by the test class"""402 identity_version = getattr(cls, 'identity_version', None)403 return identity_version or CONF.identity.auth_version404 @classmethod405 def _get_credentials_provider(cls):406 """Returns a credentials provider407 If no credential provider exists yet creates one.408 It uses self.identity_version if defined, or the configuration value409 """410 if (not hasattr(cls, '_creds_provider') or not cls._creds_provider or411 not cls._creds_provider.name == cls.__name__):412 force_tenant_isolation = getattr(cls, 'force_tenant_isolation',413 False)414 cls._creds_provider = credentials.get_credentials_provider(415 name=cls.__name__, network_resources=cls.network_resources,416 force_tenant_isolation=force_tenant_isolation,417 identity_version=cls.get_identity_version())418 return cls._creds_provider419 @classmethod420 def get_client_manager(cls, credential_type=None, roles=None,421 force_new=None):422 """Returns an OpenStack client manager423 Returns an OpenStack client manager based on either credential_type424 or a list of roles. If neither is specified, it defaults to425 credential_type 'primary'426 :param credential_type: string - primary, alt or admin427 :param roles: list of roles428 :returns: the created client manager429 :raises skipException: if the requested credentials are not available430 """431 if all([roles, credential_type]):432 msg = "Cannot get credentials by type and roles at the same time"433 raise ValueError(msg)434 if not any([roles, credential_type]):435 credential_type = 'primary'436 cred_provider = cls._get_credentials_provider()437 if roles:438 for role in roles:439 if not cred_provider.is_role_available(role):440 skip_msg = (441 "%s skipped because the configured credential provider"442 " is not able to provide credentials with the %s role "443 "assigned." % (cls.__name__, role))444 raise cls.skipException(skip_msg)445 params = dict(roles=roles)446 if force_new is not None:447 params.update(force_new=force_new)448 creds = cred_provider.get_creds_by_roles(**params)449 else:450 credentials_method = 'get_%s_creds' % credential_type451 if hasattr(cred_provider, credentials_method):452 creds = getattr(cred_provider, credentials_method)()453 else:454 raise exceptions.InvalidCredentials(455 "Invalid credentials type %s" % credential_type)456 return clients.Manager(credentials=creds, service=cls._service,457 api_microversions=cls.services_microversion)458 @classmethod459 def clear_credentials(cls):460 """Clears creds if set"""461 if hasattr(cls, '_creds_provider'):462 cls._creds_provider.clear_creds()463 @classmethod464 def set_validation_resources(cls, keypair=None, floating_ip=None,465 security_group=None,466 security_group_rules=None):467 """Specify which ssh server validation resources should be created.468 Each of the argument must be set to either None, True or False, with469 None - use default from config (security groups and security group470 rules get created when set to None)471 False - Do not create the validation resource472 True - create the validation resource473 @param keypair474 @param security_group475 @param security_group_rules476 @param floating_ip477 """478 if not CONF.validation.run_validation:479 return480 if keypair is None:481 if CONF.validation.auth_method.lower() == "keypair":482 keypair = True483 else:484 keypair = False485 if floating_ip is None:486 if CONF.validation.connect_method.lower() == "floating":487 floating_ip = True488 else:489 floating_ip = False490 if security_group is None:491 security_group = CONF.validation.security_group492 if security_group_rules is None:493 security_group_rules = CONF.validation.security_group_rules494 if not cls.validation_resources:495 cls.validation_resources = {496 'keypair': keypair,497 'security_group': security_group,498 'security_group_rules': security_group_rules,499 'floating_ip': floating_ip}500 @classmethod501 def set_network_resources(cls, network=False, router=False, subnet=False,502 dhcp=False):503 """Specify which network resources should be created504 @param network505 @param router506 @param subnet507 @param dhcp508 """509 # network resources should be set only once from callers510 # in order to ensure that even if it's called multiple times in511 # a chain of overloaded methods, the attribute is set only512 # in the leaf class513 if not cls.network_resources:514 cls.network_resources = {515 'network': network,516 'router': router,517 'subnet': subnet,518 'dhcp': dhcp}519 @classmethod520 def get_tenant_network(cls):521 """Get the network to be used in testing522 :return: network dict including 'id' and 'name'523 """524 # Make sure cred_provider exists and get a network client525 networks_client = cls.get_client_manager().compute_networks_client526 cred_provider = cls._get_credentials_provider()527 # In case of nova network, isolated tenants are not able to list the528 # network configured in fixed_network_name, even if the can use it529 # for their servers, so using an admin network client to validate530 # the network name531 if (not CONF.service_available.neutron and532 credentials.is_admin_available(533 identity_version=cls.get_identity_version())):534 admin_creds = cred_provider.get_admin_creds()535 admin_manager = clients.Manager(536 admin_creds, api_microversions=cls.services_microversion)537 networks_client = admin_manager.compute_networks_client538 return fixed_network.get_tenant_network(539 cred_provider, networks_client, CONF.compute.fixed_network_name)540 def assertEmpty(self, list, msg=None):541 self.assertTrue(len(list) == 0, msg)542 def assertNotEmpty(self, list, msg=None):543 self.assertTrue(len(list) > 0, msg)544class NegativeAutoTest(BaseTestCase):545 _resources = {}546 @classmethod547 def setUpClass(cls):548 super(NegativeAutoTest, cls).setUpClass()549 os = cls.get_client_manager(credential_type='primary')550 cls.client = os.negative_client551 @staticmethod552 def load_tests(*args):553 """Wrapper for testscenarios554 To set the mandatory scenarios variable only in case a real test555 loader is in place. Will be automatically called in case the variable556 "load_tests" is set.557 """558 if getattr(args[0], 'suiteClass', None) is not None:559 loader, standard_tests, pattern = args560 else:561 standard_tests, module, loader = args562 for test in testtools.iterate_tests(standard_tests):563 schema = getattr(test, '_schema', None)564 if schema is not None:565 setattr(test, 'scenarios',566 NegativeAutoTest.generate_scenario(schema))567 return testscenarios.load_tests_apply_scenarios(*args)568 @staticmethod569 def generate_scenario(description):570 """Generates the test scenario list for a given description.571 :param description: A file or dictionary with the following entries:572 name (required) name for the api573 http-method (required) one of HEAD,GET,PUT,POST,PATCH,DELETE574 url (required) the url to be appended to the catalog url with '%s'575 for each resource mentioned576 resources: (optional) A list of resource names such as "server",577 "flavor", etc. with an element for each '%s' in the url. This578 method will call self.get_resource for each element when579 constructing the positive test case template so negative580 subclasses are expected to return valid resource ids when581 appropriate.582 json-schema (optional) A valid json schema that will be used to583 create invalid data for the api calls. For "GET" and "HEAD",584 the data is used to generate query strings appended to the url,585 otherwise for the body of the http call.586 """587 LOG.debug(description)588 generator = importutils.import_class(589 CONF.negative.test_generator)()590 generator.validate_schema(description)591 schema = description.get("json-schema", None)592 resources = description.get("resources", [])593 scenario_list = []594 expected_result = None595 for resource in resources:596 if isinstance(resource, dict):597 expected_result = resource['expected_result']598 resource = resource['name']599 LOG.debug("Add resource to test %s" % resource)600 scn_name = "inv_res_%s" % (resource)601 scenario_list.append((scn_name, {"resource": (resource,602 str(uuid.uuid4())),603 "expected_result": expected_result604 }))605 if schema is not None:606 for scenario in generator.generate_scenarios(schema):607 scenario_list.append((scenario['_negtest_name'],608 scenario))609 LOG.debug(scenario_list)610 return scenario_list611 def execute(self, description):612 """Execute a http call613 Execute a http call on an api that are expected to614 result in client errors. First it uses invalid resources that are part615 of the url, and then invalid data for queries and http request bodies.616 :param description: A json file or dictionary with the following617 entries:618 name (required) name for the api619 http-method (required) one of HEAD,GET,PUT,POST,PATCH,DELETE620 url (required) the url to be appended to the catalog url with '%s'621 for each resource mentioned622 resources: (optional) A list of resource names such as "server",623 "flavor", etc. with an element for each '%s' in the url. This624 method will call self.get_resource for each element when625 constructing the positive test case template so negative626 subclasses are expected to return valid resource ids when627 appropriate.628 json-schema (optional) A valid json schema that will be used to629 create invalid data for the api calls. For "GET" and "HEAD",630 the data is used to generate query strings appended to the url,631 otherwise for the body of the http call.632 """633 LOG.info("Executing %s" % description["name"])634 LOG.debug(description)635 generator = importutils.import_class(636 CONF.negative.test_generator)()637 schema = description.get("json-schema", None)638 method = description["http-method"]639 url = description["url"]640 expected_result = None641 if "default_result_code" in description:642 expected_result = description["default_result_code"]643 resources = [self.get_resource(r) for644 r in description.get("resources", [])]645 if hasattr(self, "resource"):646 # Note(mkoderer): The resources list already contains an invalid647 # entry (see get_resource).648 # We just send a valid json-schema with it649 valid_schema = None650 if schema:651 valid_schema = \652 valid.ValidTestGenerator().generate_valid(schema)653 new_url, body = self._http_arguments(valid_schema, url, method)654 elif hasattr(self, "_negtest_name"):655 schema_under_test = \656 valid.ValidTestGenerator().generate_valid(schema)657 local_expected_result = \658 generator.generate_payload(self, schema_under_test)659 if local_expected_result is not None:660 expected_result = local_expected_result661 new_url, body = \662 self._http_arguments(schema_under_test, url, method)663 else:664 raise Exception("testscenarios are not active. Please make sure "665 "that your test runner supports the load_tests "666 "mechanism")667 if "admin_client" in description and description["admin_client"]:668 if not credentials.is_admin_available(669 identity_version=self.get_identity_version()):670 msg = ("Missing Identity Admin API credentials in"671 "configuration.")672 raise self.skipException(msg)673 creds = self.credentials_provider.get_admin_creds()674 os_adm = clients.Manager(credentials=creds)675 client = os_adm.negative_client676 else:677 client = self.client678 resp, resp_body = client.send_request(method, new_url,679 resources, body=body)680 self._check_negative_response(expected_result, resp.status, resp_body)681 def _http_arguments(self, json_dict, url, method):682 LOG.debug("dict: %s url: %s method: %s" % (json_dict, url, method))683 if not json_dict:...

Full Screen

Full Screen

test_credentials_factory.py

Source:test_credentials_factory.py Github

copy

Full Screen

...143 mock_get.assert_called_once_with(144 name=mock_cls.__name__,145 network_resources=mock_cls.network_resources,146 force_tenant_isolation=tenant_iso,147 identity_version=mock_cls.get_identity_version())148 def _do_client_manager_test(self, cls, roles=None):149 mock_gcm = mock.Mock()150 mock_man = mock.Mock()151 mock_gcm.return_value = mock_man152 my_super = mock.Mock()153 my_super.get_client_manager = mock_gcm154 with mock.patch('cinnamon_role.credentials_factory.super') as mock_sup:155 mock_sup.return_value = my_super156 cm = creds._get_client_manager(cls, credential_type='foo',157 roles=roles, force_new=True)158 self.assertEqual(mock_man, cm)159 mock_gcm.assert_called_once_with(credential_type='foo', roles=roles,...

Full Screen

Full Screen

create_instance.py

Source:create_instance.py Github

copy

Full Screen

...21USER = os.environ.get('SSH_USER') or 'root'22SSH_PASSWORD = os.environ.get('SSH_PASSWORD') or ''23SSH_KEY = os.environ.get('SSH_KEY') or None24SSH_WAIT_TIME = int(os.environ.get('SSH_WAIT_TIME') or 120)25def get_identity_version(auth_url):26 return auth_url.split('/')[-1][-1]27def get_session():28 kwargs = {'auth_url': OS_AUTH_URL, 'username': OS_USERNAME,29 'password': OS_PASSWORD}30 auth_version = get_identity_version(OS_AUTH_URL)31 if auth_version == '3':32 kwargs.update({'user_domain_name': OS_USER_DOMAIN_NAME,33 'project_domain_name': OS_PROJECT_DOMAIN_NAME,34 'project_name': OS_PROJECT_NAME})35 loader = loading.get_plugin_loader('password')36 auth = loader.load_from_options(**kwargs)37 sess = session.Session(auth=auth, verify=False)38 return sess39def get_nova_client():40 return novaclient.Client(version='2', session=get_session())41def get_neutron_client():42 return neutronclient.Client(session=get_session())43def get_glance_client():44 return glanceclient(version='2', session=get_session())...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run tempest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful