Best Python code snippet using tempest_python
validation_resources.py
Source:validation_resources.py  
...107                    LOG.exception(msg, sgc_exc, cleanup_exc)108    LOG.debug("SSH Validation resource security group with tcp and icmp "109              "rules %s created", sg_name)110    return security_group111def create_validation_resources(clients, keypair=False, floating_ip=False,112                                security_group=False,113                                security_group_rules=False,114                                ethertype='IPv4', use_neutron=True,115                                floating_network_id=None,116                                floating_network_name=None):117    """Provision resources for VM ping/ssh testing118    Create resources required to be able to ping / ssh a virtual machine:119    keypair, security group, security group rules and a floating IP.120    Which of those resources are required may depend on the cloud setup and on121    the specific test and it can be controlled via the corresponding122    arguments.123    Provisioned resources are returned in a dictionary.124    :param clients: Instance of `tempest.lib.services.clients.ServiceClients`125        or of a subclass of it. Resources are provisioned using clients from126        `clients`.127    :param keypair: Whether to provision a keypair. Defaults to False.128    :param floating_ip: Whether to provision a floating IP. Defaults to False.129    :param security_group: Whether to provision a security group. Defaults to130        False.131    :param security_group_rules: Whether to provision security group rules.132        Defaults to False.133    :param ethertype: 'IPv4' or 'IPv6'. Honoured only in case neutron is used.134    :param use_neutron: When True resources are provisioned via neutron, when135        False resources are provisioned via nova.136    :param floating_network_id: The id of the network used to provision a137        floating IP. 
Only used if a floating IP is requested and with neutron.138    :param floating_network_name: The name of the floating IP pool used to139        provision the floating IP. Only used if a floating IP is requested and140        with nova-net.141    :returns: A dictionary with the resources in the format they are returned142        by the API. Valid keys are 'keypair', 'floating_ip' and143        'security_group'.144    Examples::145        from tempest.common import validation_resources as vr146        from tempest.lib import auth147        from tempest.lib.services import clients148        creds = auth.get_credentials('http://mycloud/identity/v3',149                                     username='me', project_name='me',150                                     password='secret', domain_name='Default')151        osclients = clients.ServiceClients(creds, 'http://mycloud/identity/v3')152        # Request keypair and floating IP153        resources = dict(keypair=True, security_group=False,154                         security_group_rules=False, floating_ip=True)155        resources = vr.create_validation_resources(156            osclients, use_neutron=True,157            floating_network_id='4240E68E-23DA-4C82-AC34-9FEFAA24521C',158            **resources)159        # The floating IP to be attached to the VM160        floating_ip = resources['floating_ip']['ip']161    """162    # Create and Return the validation resources required to validate a VM163    msg = ('Requested validation resources keypair %s, floating IP %s, '164           'security group %s')165    LOG.debug(msg, keypair, floating_ip, security_group)166    validation_data = {}167    try:168        if keypair:169            keypair_name = data_utils.rand_name('keypair')170            validation_data.update(171                clients.compute.KeyPairsClient().create_keypair(172                    name=keypair_name))173            LOG.debug("Validation resource key %s created", keypair_name)174        if 
security_group:175            validation_data['security_group'] = create_ssh_security_group(176                clients, add_rule=security_group_rules,177                use_neutron=use_neutron, ethertype=ethertype)178        if floating_ip:179            floating_ip_client = _network_service(180                clients, use_neutron).FloatingIPsClient()181            if use_neutron:182                floatingip = floating_ip_client.create_floatingip(183                    floating_network_id=floating_network_id)184                # validation_resources['floating_ip'] has historically looked185                # like a compute API POST /os-floating-ips response, so we need186                # to mangle it a bit for a Neutron response with different187                # fields.188                validation_data['floating_ip'] = floatingip['floatingip']189                validation_data['floating_ip']['ip'] = (190                    floatingip['floatingip']['floating_ip_address'])191            else:192                # NOTE(mriedem): The os-floating-ips compute API was deprecated193                # in the 2.36 microversion. Any tests for CRUD operations on194                # floating IPs using the compute API should be capped at 2.35.195                validation_data.update(floating_ip_client.create_floating_ip(196                    pool=floating_network_name))197            LOG.debug("Validation resource floating IP %s created",198                      validation_data['floating_ip'])199    except Exception as prov_exc:200        # If something goes wrong, cleanup as much as possible before we201        # re-raise the exception202        with excutils.save_and_reraise_exception():203            if validation_data:204                # Cleanup may fail as well205                try:206                    msg = ('Error while provisioning validation resources %s. 
'207                           'Trying to cleanup what we provisioned so far: %s')208                    # The exceptions logging is already handled, so using209                    # debug here just to provide more context210                    LOG.debug(msg, prov_exc, str(validation_data))211                    clear_validation_resources(212                        clients,213                        keypair=validation_data.get('keypair', None),214                        floating_ip=validation_data.get('floating_ip', None),215                        security_group=validation_data.get('security_group',216                                                           None),217                        use_neutron=use_neutron)218                except Exception as cleanup_exc:219                    msg = ('Error during cleanup of validation resources. '220                           'The cleanup was triggered by an exception during '221                           'the provisioning step.\n'222                           'Provisioning exception: %s\n'223                           'First cleanup exception: %s')224                    LOG.exception(msg, prov_exc, cleanup_exc)225    return validation_data226def clear_validation_resources(clients, keypair=None, floating_ip=None,227                               security_group=None, use_neutron=True):228    """Cleanup resources for VM ping/ssh testing229    Cleanup a set of resources provisioned via `create_validation_resources`.230    In case of errors during cleanup, the exception is logged and the cleanup231    process is continued. The first exception that was raised is re-raised232    after the cleanup is complete.233    :param clients: Instance of `tempest.lib.services.clients.ServiceClients`234        or of a subclass of it. Resources are provisioned using clients from235        `clients`.236    :param keypair: A dictionary with the keypair to be deleted. 
Defaults to237        None.238    :param floating_ip: A dictionary with the floating_ip to be deleted.239        Defaults to None.240    :param security_group: A dictionary with the security_group to be deleted.241        Defaults to None.242    :param use_neutron: When True resources are provisioned via neutron, when243        False resources are provisioned via nova.244    Examples::245        from tempest.common import validation_resources as vr246        from tempest.lib import auth247        from tempest.lib.services import clients248        creds = auth.get_credentials('http://mycloud/identity/v3',249                                     username='me', project_name='me',250                                     password='secret', domain_name='Default')251        osclients = clients.ServiceClients(creds, 'http://mycloud/identity/v3')252        # Request keypair and floating IP253        resources = dict(keypair=True, security_group=False,254                         security_group_rules=False, floating_ip=True)255        resources = vr.create_validation_resources(256            osclients, validation_resources=resources, use_neutron=True,257            floating_network_id='4240E68E-23DA-4C82-AC34-9FEFAA24521C')258        # Now cleanup the resources259        try:260            vr.clear_validation_resources(osclients, use_neutron=True,261                                          **resources)262        except Exception as e:263            LOG.exception('Something went wrong during cleanup, ignoring')264    """265    has_exception = None266    if keypair:267        keypair_client = clients.compute.KeyPairsClient()268        keypair_name = keypair['name']269        try:270            keypair_client.delete_keypair(keypair_name)271        except lib_exc.NotFound:272            LOG.warning(273                "Keypair %s is not found when attempting to delete",274                keypair_name275            )276        except Exception as exc:277            
LOG.exception('Exception raised while deleting key %s',278                          keypair_name)279            if not has_exception:280                has_exception = exc281    network_service = _network_service(clients, use_neutron)282    if security_group:283        security_group_client = network_service.SecurityGroupsClient()284        sec_id = security_group['id']285        try:286            security_group_client.delete_security_group(sec_id)287            security_group_client.wait_for_resource_deletion(sec_id)288        except lib_exc.NotFound:289            LOG.warning("Security group %s is not found when attempting "290                        "to delete", sec_id)291        except lib_exc.Conflict as exc:292            LOG.exception('Conflict while deleting security '293                          'group %s VM might not be deleted', sec_id)294            if not has_exception:295                has_exception = exc296        except Exception as exc:297            LOG.exception('Exception raised while deleting security '298                          'group %s', sec_id)299            if not has_exception:300                has_exception = exc301    if floating_ip:302        floating_ip_client = network_service.FloatingIPsClient()303        fip_id = floating_ip['id']304        try:305            if use_neutron:306                floating_ip_client.delete_floatingip(fip_id)307            else:308                floating_ip_client.delete_floating_ip(fip_id)309        except lib_exc.NotFound:310            LOG.warning('Floating ip %s not found while attempting to '311                        'delete', fip_id)312        except Exception as exc:313            LOG.exception('Exception raised while deleting ip %s', fip_id)314            if not has_exception:315                has_exception = exc316    if has_exception:317        raise has_exception318class ValidationResourcesFixture(fixtures.Fixture):319    """Fixture to provision and cleanup validation resources"""320   
 DICT_KEYS = ['keypair', 'security_group', 'floating_ip']321    def __init__(self, clients, keypair=False, floating_ip=False,322                 security_group=False, security_group_rules=False,323                 ethertype='IPv4', use_neutron=True, floating_network_id=None,324                 floating_network_name=None):325        """Create a ValidationResourcesFixture326        Create a ValidationResourcesFixture fixtures, which provisions the327        resources required to be able to ping / ssh a virtual machine upon328        setUp and clears them out upon cleanup. Resources are  keypair,329        security group, security group rules and a floating IP - depending330        on the params.331        The fixture exposes a dictionary that includes provisioned resources.332        :param clients: `tempest.lib.services.clients.ServiceClients` or of a333            subclass of it. Resources are provisioned using clients from334            `clients`.335        :param keypair: Whether to provision a keypair. Defaults to False.336        :param floating_ip: Whether to provision a floating IP.337            Defaults to False.338        :param security_group: Whether to provision a security group.339            Defaults to False.340        :param security_group_rules: Whether to provision security group rules.341            Defaults to False.342        :param ethertype: 'IPv4' or 'IPv6'. Honoured only if neutron is used.343        :param use_neutron: When True resources are provisioned via neutron,344            when False resources are provisioned via nova.345        :param floating_network_id: The id of the network used to provision a346            floating IP. Only used if a floating IP is requested in case347            neutron is used.348        :param floating_network_name: The name of the floating IP pool used to349            provision the floating IP. 
Only used if a floating IP is requested350            and with nova-net.351        :returns: A dictionary with the same keys as the input352            `validation_resources` and the resources for values in the format353             they are returned by the API.354        Examples::355            from tempest.common import validation_resources as vr356            from tempest.lib import auth357            from tempest.lib.services import clients358            import testtools359            class TestWithVR(testtools.TestCase):360                def setUp(self):361                    creds = auth.get_credentials(362                        'http://mycloud/identity/v3',363                         username='me', project_name='me',364                         password='secret', domain_name='Default')365                    osclients = clients.ServiceClients(366                        creds, 'http://mycloud/identity/v3')367                    # Request keypair and floating IP368                    resources = dict(keypair=True, security_group=False,369                                     security_group_rules=False,370                                     floating_ip=True)371                    network_id = '4240E68E-23DA-4C82-AC34-9FEFAA24521C'372                    self.vr = self.useFixture(vr.ValidationResourcesFixture(373                        osclients, use_neutron=True,374                        floating_network_id=network_id,375                        **resources)376                def test_use_ip(self):377                    # The floating IP to be attached to the VM378                    floating_ip = self.vr['floating_ip']['ip']379        """380        self._clients = clients381        self._keypair = keypair382        self._floating_ip = floating_ip383        self._security_group = security_group384        self._security_group_rules = security_group_rules385        self._ethertype = ethertype386        self._use_neutron = use_neutron387        
self._floating_network_id = floating_network_id388        self._floating_network_name = floating_network_name389        self._validation_resources = None390    def _setUp(self):391        msg = ('Requested setup of ValidationResources keypair %s, floating '392               'IP %s, security group %s')393        LOG.debug(msg, self._keypair, self._floating_ip, self._security_group)394        self._validation_resources = create_validation_resources(395            self._clients, keypair=self._keypair,396            floating_ip=self._floating_ip,397            security_group=self._security_group,398            security_group_rules=self._security_group_rules,399            ethertype=self._ethertype, use_neutron=self._use_neutron,400            floating_network_id=self._floating_network_id,401            floating_network_name=self._floating_network_name)402        # If provisioning raises an exception we won't have anything to403        # cleanup here, so we don't need a try-finally around provisioning404        vr = self._validation_resources405        self.addCleanup(clear_validation_resources, self._clients,406                        keypair=vr.get('keypair', None),407                        floating_ip=vr.get('floating_ip', None),408                        security_group=vr.get('security_group', None),...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
