Best Python code snippet using tempest_python
test_waiters.py
Source:test_waiters.py  
...210                           build_timeout=5,211                           show_volume=show_volume)212        self.patch('time.time')213        self.patch('time.sleep')214        waiters.wait_for_volume_attachment_remove(client, uuids.volume_id,215                                                  uuids.attachment_id)216        # Assert that show volume is called until the attachment is removed.217        show_volume.assert_has_calls = [mock.call(uuids.volume_id),218                                        mock.call(uuids.volume_id),219                                        mock.call(uuids.volume_id)]220    def test_wait_for_volume_attachment_timeout(self):221        show_volume = mock.MagicMock(return_value={222            'volume': {'attachments': [223                {'attachment_id': uuids.attachment_id}]}})224        client = mock.Mock(spec=volumes_client.VolumesClient,225                           build_interval=1,226                           build_timeout=1,227                           show_volume=show_volume)228        self.patch('time.time', side_effect=[0., client.build_timeout + 1.])229        self.patch('time.sleep')230        # Assert that a timeout is raised if the attachment remains.231        self.assertRaises(lib_exc.TimeoutException,232                          waiters.wait_for_volume_attachment_remove,233                          client, uuids.volume_id, uuids.attachment_id)234    def test_wait_for_volume_attachment_not_present(self):235        show_volume = mock.MagicMock(return_value={236            'volume': {'attachments': []}})237        client = mock.Mock(spec=volumes_client.VolumesClient,238                           build_interval=1,239                           build_timeout=1,240                           show_volume=show_volume)241        self.patch('time.time', side_effect=[0., client.build_timeout + 1.])242        self.patch('time.sleep')243        waiters.wait_for_volume_attachment_remove(client, uuids.volume_id,244              
                                    uuids.attachment_id)245        # Assert that show volume is only called once before we return...manager.py
Source:manager.py  
# Copyright 2021 Red Hat, Inc.
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

from oslo_log import log

from tempest.common import waiters
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib.common.utils import test_utils
from tempest.lib import exceptions as lib_exc
from tempest.scenario import manager

CONF = config.CONF
LOG = log.getLogger(__name__)


class ScenarioTest(manager.ScenarioTest):
    """Scenario test base with helpers for volumes attached to servers.

    Adds helpers that attach/detach volumes, create volume types and
    read/write files on an attached device over SSH.
    """

    credentials = ['primary', 'admin']

    @classmethod
    def setup_clients(cls):
        """Set up the parent clients plus the admin volume-types client."""
        super(ScenarioTest, cls).setup_clients()
        cls.admin_volume_types_client = cls.os_admin.volume_types_client_latest

    def _attached_volume_name(
            self, disks_list_before_attach, ip_address, private_key):
        """Return the name of the disk that appeared after an attachment.

        Polls the guest's disk list (via ``test_utils.call_until_true`` with
        ``CONF.compute.build_timeout`` and ``CONF.compute.build_interval``)
        until it is longer than ``disks_list_before_attach``.

        :param disks_list_before_attach: Disk list captured before attaching.
        :param ip_address: Address used to open the SSH connection.
        :param private_key: Private key used for the SSH connection.
        :returns: The first disk present now but absent from
            ``disks_list_before_attach``.
        :raises lib_exc.TimeoutException: If no additional disk shows up
            before the polling gives up.
        """
        ssh = self.get_remote_client(ip_address, private_key=private_key)

        def _wait_for_volume_available_on_system():
            disks_list_after_attach = ssh.list_disks()
            return len(disks_list_after_attach) > len(disks_list_before_attach)

        if not test_utils.call_until_true(_wait_for_volume_available_on_system,
                                          CONF.compute.build_timeout,
                                          CONF.compute.build_interval):
            raise lib_exc.TimeoutException

        disks_list_after_attach = ssh.list_disks()
        volume_name = [item for item in disks_list_after_attach
                       if item not in disks_list_before_attach][0]
        return volume_name

    def _get_file_md5(self, ip_address, filename, dev_name=None,
                      mount_path='/mnt', private_key=None, server=None):
        """Return the md5 checksum of ``mount_path/filename`` on the guest.

        When ``dev_name`` is given, ``/dev/<dev_name>`` is mounted on
        ``mount_path`` before the checksum is taken and unmounted afterwards.

        :returns: The output of ``md5sum ... | cut -c 1-32`` as returned by
            the remote client.
        """
        ssh_client = self.get_remote_client(ip_address,
                                            private_key=private_key,
                                            server=server)
        if dev_name is not None:
            ssh_client.exec_command('sudo mount /dev/%s %s' % (dev_name,
                                                               mount_path))
        md5_sum = ssh_client.exec_command(
            'sudo md5sum %s/%s|cut -c 1-32' % (mount_path, filename))
        if dev_name is not None:
            ssh_client.exec_command('sudo umount %s' % mount_path)
        return md5_sum

    def _count_files(self, ip_address, dev_name=None, mount_path='/mnt',
                     private_key=None, server=None):
        """Return the number of entries under ``mount_path`` on the guest.

        When ``dev_name`` is given, ``/dev/<dev_name>`` is mounted on
        ``mount_path`` before listing and unmounted afterwards.

        :returns: Line count of ``ls -l <mount_path>`` minus 2, as an int.
        """
        ssh_client = self.get_remote_client(ip_address,
                                            private_key=private_key,
                                            server=server)
        if dev_name is not None:
            ssh_client.exec_command('sudo mount /dev/%s %s' % (dev_name,
                                                               mount_path))
        count = ssh_client.exec_command('sudo ls -l %s | wc -l' % mount_path)
        if dev_name is not None:
            ssh_client.exec_command('sudo umount %s' % mount_path)
        # We subtract 2 from the count: `ls -l` prints a leading "total"
        # line, and creating the filesystem also creates a lost+found
        # folder.
        return int(count) - 2

    def _make_fs(self, ip_address, private_key, server, dev_name, fs='ext4'):
        """Create a filesystem of type ``fs`` on ``dev_name`` in the guest."""
        ssh_client = self.get_remote_client(ip_address,
                                            private_key=private_key,
                                            server=server)
        ssh_client.make_fs(dev_name, fs=fs)

    def create_md5_new_file(self, ip_address, filename, dev_name=None,
                            mount_path='/mnt', private_key=None, server=None):
        """Write a random 100 KiB file on the guest and return its md5.

        When ``dev_name`` is given, ``/dev/<dev_name>`` is mounted on
        ``mount_path`` first and unmounted once the data has been synced.

        :returns: The output of ``md5sum -b ... | cut -c 1-32`` as returned
            by the remote client.
        """
        ssh_client = self.get_remote_client(ip_address,
                                            private_key=private_key,
                                            server=server)
        if dev_name is not None:
            ssh_client.exec_command('sudo mount /dev/%s %s' % (dev_name,
                                                               mount_path))
        # Use the same "<mount_path>/<filename>" form as the md5sum command
        # below so both commands always refer to the same file.
        ssh_client.exec_command(
            'sudo dd bs=1024 count=100 if=/dev/urandom of=%s/%s' %
            (mount_path, filename))
        md5 = ssh_client.exec_command(
            'sudo md5sum -b %s/%s|cut -c 1-32' % (mount_path, filename))
        # Flush the data to the device before it is unmounted.
        ssh_client.exec_command('sudo sync')
        if dev_name is not None:
            ssh_client.exec_command('sudo umount %s' % mount_path)
        return md5

    def get_md5_from_file(self, instance, instance_ip, filename,
                          dev_name=None):
        """Return ``(file_count, md5_sum)`` for a file on the instance.

        Uses ``self.keypair['private_key']`` for the SSH connections.
        """
        md5_sum = self._get_file_md5(instance_ip, filename=filename,
                                     dev_name=dev_name,
                                     private_key=self.keypair['private_key'],
                                     server=instance)
        count = self._count_files(instance_ip, dev_name=dev_name,
                                  private_key=self.keypair['private_key'],
                                  server=instance)
        return count, md5_sum

    def _attach_and_get_volume_device_name(self, server, volume, instance_ip,
                                           private_key):
        """Attach ``volume`` to ``server`` and find its device in the guest.

        :returns: A ``(volume_device_name, attachment)`` tuple.
        """
        ssh_client = self.get_remote_client(
            instance_ip, private_key=private_key,
            server=server)
        # List disks before volume attachment
        disks_list_before_attach = ssh_client.list_disks()
        # Attach volume
        attachment = self.attach_volume(server, volume)
        # Find the difference between disks before and after attachment that
        # gives us the volume device name
        volume_device_name = self._attached_volume_name(
            disks_list_before_attach, instance_ip, private_key)
        return volume_device_name, attachment

    def create_volume_type(self, client=None, name=None, extra_specs=None):
        """Create a volume type and schedule its cleanup.

        :param client: Volume types client used for the request. Defaults to
            ``self.os_admin.volume_types_client_latest``.
        :param name: Base name for the type; when omitted one is derived
            from the test class name. A randomized ``scenario-type-`` prefix
            is always applied.
        :param extra_specs: Optional dict of extra specs; defaults to ``{}``.
        :returns: The ``volume_type`` body of the create response.
        """
        if not client:
            client = self.os_admin.volume_types_client_latest
        if not name:
            class_name = self.__class__.__name__
            name = data_utils.rand_name(class_name + '-volume-type')
        randomized_name = data_utils.rand_name('scenario-type-' + name)
        LOG.debug("Creating a volume type: %s with extra_specs %s",
                  randomized_name, extra_specs)
        if extra_specs is None:
            extra_specs = {}
        # Use the resolved client so that a caller-supplied client is
        # honoured instead of being silently ignored.
        volume_type = client.create_volume_type(
            name=randomized_name, extra_specs=extra_specs)['volume_type']
        self.addCleanup(self.cleanup_volume_type, volume_type)
        return volume_type

    def attach_volume(self, server, volume, device=None, tag=None):
        """Attaches volume to server and waits for 'in-use' volume status.

        The volume will be detached when the test tears down.

        :param server: The server to which the volume will be attached.
        :param volume: The volume to attach.
        :param device: Optional mountpoint for the attached volume. Note that
            this is not guaranteed for all hypervisors and is not recommended.
        :param tag: Optional device role tag to apply to the volume.
        :returns: The ``volumeAttachment`` body of the attach response.
        """
        attach_kwargs = dict(volumeId=volume['id'])
        if device:
            attach_kwargs['device'] = device
        if tag:
            attach_kwargs['tag'] = tag

        attachment = self.servers_client.attach_volume(
            server['id'], **attach_kwargs)['volumeAttachment']

        # On teardown detach the volume and for multiattach volumes wait for
        # the attachment to be removed. For non-multiattach volumes wait for
        # the state of the volume to change to available. This is so we don't
        # error out when trying to delete the volume during teardown.
        if volume['multiattach']:
            att = waiters.wait_for_volume_attachment_create(
                self.volumes_client, volume['id'], server['id'])
            self.addCleanup(waiters.wait_for_volume_attachment_remove,
                            self.volumes_client, volume['id'],
                            att['attachment_id'])
        else:
            self.addCleanup(waiters.wait_for_volume_resource_status,
                            self.volumes_client, volume['id'], 'available')
            waiters.wait_for_volume_resource_status(self.volumes_client,
                                                    volume['id'], 'in-use')

        # Ignore 404s on detach in case the server is deleted or the volume
        # is already detached. Registered last so that it runs before the
        # wait-for-removal cleanups above.
        self.addCleanup(self._detach_volume, server, volume)
        return attachment

    def _detach_volume(self, server, volume):
        """Helper method to detach a volume.

        Ignores 404 responses if the volume or server do not exist, or the
        volume is already detached from the server.
        """
        try:
            volume = self.volumes_client.show_volume(volume['id'])['volume']
            # Check the status. You can only detach an in-use volume, otherwise
            # the compute API will return a 400 response.
            if volume['status'] == 'in-use':
                self.servers_client.detach_volume(server['id'], volume['id'])
        except lib_exc.NotFound:
            # Ignore 404s on detach in case the server is deleted or the volume
            # is already detached.
            # NOTE(review): the excerpt is cut off at this point; a no-op
            # handler is what the docstring and comment above describe.
            pass

# ... (the source excerpt ends here)
#
# Learn to execute automation testing from scratch with LambdaTest Learning
# Hub. Right from setting up the prerequisites to run your first automation
# test, to following best practices and diving deeper into advanced test
# scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides
# to help you be proficient with different test automation frameworks i.e.
# Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
