How to use the wait_for_volume_attachment_remove method in tempest

Best Python code snippet using tempest_python

test_waiters.py

Source:test_waiters.py Github

copy

Full Screen

...210 build_timeout=5,211 show_volume=show_volume)212 self.patch('time.time')213 self.patch('time.sleep')214 waiters.wait_for_volume_attachment_remove(client, uuids.volume_id,215 uuids.attachment_id)216 # Assert that show volume is called until the attachment is removed.217 show_volume.assert_has_calls = [mock.call(uuids.volume_id),218 mock.call(uuids.volume_id),219 mock.call(uuids.volume_id)]220 def test_wait_for_volume_attachment_timeout(self):221 show_volume = mock.MagicMock(return_value={222 'volume': {'attachments': [223 {'attachment_id': uuids.attachment_id}]}})224 client = mock.Mock(spec=volumes_client.VolumesClient,225 build_interval=1,226 build_timeout=1,227 show_volume=show_volume)228 self.patch('time.time', side_effect=[0., client.build_timeout + 1.])229 self.patch('time.sleep')230 # Assert that a timeout is raised if the attachment remains.231 self.assertRaises(lib_exc.TimeoutException,232 waiters.wait_for_volume_attachment_remove,233 client, uuids.volume_id, uuids.attachment_id)234 def test_wait_for_volume_attachment_not_present(self):235 show_volume = mock.MagicMock(return_value={236 'volume': {'attachments': []}})237 client = mock.Mock(spec=volumes_client.VolumesClient,238 build_interval=1,239 build_timeout=1,240 show_volume=show_volume)241 self.patch('time.time', side_effect=[0., client.build_timeout + 1.])242 self.patch('time.sleep')243 waiters.wait_for_volume_attachment_remove(client, uuids.volume_id,244 uuids.attachment_id)245 # Assert that show volume is only called once before we return...

Full Screen

Full Screen

manager.py

Source:manager.py Github

copy

Full Screen

1# Copyright 2021 Red Hat, Inc.2# All Rights Reserved.3#4# Licensed under the Apache License, Version 2.0 (the "License"); you may5# not use this file except in compliance with the License. You may obtain6# a copy of the License at7#8# http://www.apache.org/licenses/LICENSE-2.09#10# Unless required by applicable law or agreed to in writing, software11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the13# License for the specific language governing permissions and limitations14# under the License.15from oslo_log import log16from tempest.common import waiters17from tempest import config18from tempest.lib.common.utils import data_utils19from tempest.lib.common.utils import test_utils20from tempest.lib import exceptions as lib_exc21from tempest.scenario import manager22CONF = config.CONF23LOG = log.getLogger(__name__)24class ScenarioTest(manager.ScenarioTest):25 credentials = ['primary', 'admin']26 @classmethod27 def setup_clients(cls):28 super(ScenarioTest, cls).setup_clients()29 cls.admin_volume_types_client = cls.os_admin.volume_types_client_latest30 def _attached_volume_name(31 self, disks_list_before_attach, ip_address, private_key):32 ssh = self.get_remote_client(ip_address, private_key=private_key)33 def _wait_for_volume_available_on_system():34 disks_list_after_attach = ssh.list_disks()35 return len(disks_list_after_attach) > len(disks_list_before_attach)36 if not test_utils.call_until_true(_wait_for_volume_available_on_system,37 CONF.compute.build_timeout,38 CONF.compute.build_interval):39 raise lib_exc.TimeoutException40 disks_list_after_attach = ssh.list_disks()41 volume_name = [item for item in disks_list_after_attach42 if item not in disks_list_before_attach][0]43 return volume_name44 def _get_file_md5(self, ip_address, filename, dev_name=None,45 mount_path='/mnt', private_key=None, server=None):46 ssh_client = self.get_remote_client(ip_address,47 
private_key=private_key,48 server=server)49 if dev_name is not None:50 ssh_client.exec_command('sudo mount /dev/%s %s' % (dev_name,51 mount_path))52 md5_sum = ssh_client.exec_command(53 'sudo md5sum %s/%s|cut -c 1-32' % (mount_path, filename))54 if dev_name is not None:55 ssh_client.exec_command('sudo umount %s' % mount_path)56 return md5_sum57 def _count_files(self, ip_address, dev_name=None, mount_path='/mnt',58 private_key=None, server=None):59 ssh_client = self.get_remote_client(ip_address,60 private_key=private_key,61 server=server)62 if dev_name is not None:63 ssh_client.exec_command('sudo mount /dev/%s %s' % (dev_name,64 mount_path))65 count = ssh_client.exec_command('sudo ls -l %s | wc -l' % mount_path)66 if dev_name is not None:67 ssh_client.exec_command('sudo umount %s' % mount_path)68 # We subtract 2 from the count since `wc -l` also includes the count69 # of new line character and while creating the filesystem, a70 # lost+found folder is also created71 return int(count) - 272 def _make_fs(self, ip_address, private_key, server, dev_name, fs='ext4'):73 ssh_client = self.get_remote_client(ip_address,74 private_key=private_key,75 server=server)76 ssh_client.make_fs(dev_name, fs=fs)77 def create_md5_new_file(self, ip_address, filename, dev_name=None,78 mount_path='/mnt', private_key=None, server=None):79 ssh_client = self.get_remote_client(ip_address,80 private_key=private_key,81 server=server)82 if dev_name is not None:83 ssh_client.exec_command('sudo mount /dev/%s %s' % (dev_name,84 mount_path))85 ssh_client.exec_command(86 'sudo dd bs=1024 count=100 if=/dev/urandom of=/%s/%s' %87 (mount_path, filename))88 md5 = ssh_client.exec_command(89 'sudo md5sum -b %s/%s|cut -c 1-32' % (mount_path, filename))90 ssh_client.exec_command('sudo sync')91 if dev_name is not None:92 ssh_client.exec_command('sudo umount %s' % mount_path)93 return md594 def get_md5_from_file(self, instance, instance_ip, filename,95 dev_name=None):96 md5_sum = self._get_file_md5(instance_ip, 
filename=filename,97 dev_name=dev_name,98 private_key=self.keypair['private_key'],99 server=instance)100 count = self._count_files(instance_ip, dev_name=dev_name,101 private_key=self.keypair['private_key'],102 server=instance)103 return count, md5_sum104 def _attach_and_get_volume_device_name(self, server, volume, instance_ip,105 private_key):106 ssh_client = self.get_remote_client(107 instance_ip, private_key=private_key,108 server=server)109 # List disks before volume attachment110 disks_list_before_attach = ssh_client.list_disks()111 # Attach volume112 attachment = self.attach_volume(server, volume)113 # Find the difference between disks before and after attachment that114 # gives us the volume device name115 volume_device_name = self._attached_volume_name(116 disks_list_before_attach, instance_ip, private_key)117 return volume_device_name, attachment118 def create_volume_type(self, client=None, name=None, extra_specs=None):119 if not client:120 client = self.os_admin.volume_types_client_latest121 if not name:122 class_name = self.__class__.__name__123 name = data_utils.rand_name(class_name + '-volume-type')124 randomized_name = data_utils.rand_name('scenario-type-' + name)125 LOG.debug("Creating a volume type: %s with extra_specs %s",126 randomized_name, extra_specs)127 if extra_specs is None:128 extra_specs = {}129 volume_type = self.admin_volume_types_client.create_volume_type(130 name=randomized_name, extra_specs=extra_specs)['volume_type']131 self.addCleanup(self.cleanup_volume_type, volume_type)132 return volume_type133 def attach_volume(self, server, volume, device=None, tag=None):134 """Attaches volume to server and waits for 'in-use' volume status.135 The volume will be detached when the test tears down.136 :param server: The server to which the volume will be attached.137 :param volume: The volume to attach.138 :param device: Optional mountpoint for the attached volume. 
Note that139 this is not guaranteed for all hypervisors and is not recommended.140 :param tag: Optional device role tag to apply to the volume.141 """142 attach_kwargs = dict(volumeId=volume['id'])143 if device:144 attach_kwargs['device'] = device145 if tag:146 attach_kwargs['tag'] = tag147 attachment = self.servers_client.attach_volume(148 server['id'], **attach_kwargs)['volumeAttachment']149 # On teardown detach the volume and for multiattach volumes wait for150 # the attachment to be removed. For non-multiattach volumes wait for151 # the state of the volume to change to available. This is so we don't152 # error out when trying to delete the volume during teardown.153 if volume['multiattach']:154 att = waiters.wait_for_volume_attachment_create(155 self.volumes_client, volume['id'], server['id'])156 self.addCleanup(waiters.wait_for_volume_attachment_remove,157 self.volumes_client, volume['id'],158 att['attachment_id'])159 else:160 self.addCleanup(waiters.wait_for_volume_resource_status,161 self.volumes_client, volume['id'], 'available')162 waiters.wait_for_volume_resource_status(self.volumes_client,163 volume['id'], 'in-use')164 # Ignore 404s on detach in case the server is deleted or the volume165 # is already detached.166 self.addCleanup(self._detach_volume, server, volume)167 return attachment168 def _detach_volume(self, server, volume):169 """Helper method to detach a volume.170 Ignores 404 responses if the volume or server do not exist, or the171 volume is already detached from the server.172 """173 try:174 volume = self.volumes_client.show_volume(volume['id'])['volume']175 # Check the status. You can only detach an in-use volume, otherwise176 # the compute API will return a 400 response.177 if volume['status'] == 'in-use':178 self.servers_client.detach_volume(server['id'], volume['id'])179 except lib_exc.NotFound:180 # Ignore 404s on detach in case the server is deleted or the volume181 # is already detached....

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run tempest automation tests on the LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful