How to use hotunplug method in avocado

Best Python code snippet using avocado_python Github


Full Screen

import time

from virttest.qemu_monitor import QMPCmdError

from provider import job_utils
from provider.blockdev_stream_nowait import BlockdevStreamNowaitTest


class BlockdevStreamHotunplugTest(BlockdevStreamNowaitTest):
    """
    Block stream with hotunplug test
    """

    def hotunplug_frontend_device(self):
        """
        device_del the frontend device during stream,
        the device CAN be removed without any issue
        """
        self.main_vm.monitor.cmd('device_del', {'id': self.base_tag})

    def wait_till_frontend_device_deleted(self):
        """
        Wait till the device is removed from the output of query-block

        Polls once per second, at most 'device_del_timeout' times
        (default 60), and fails the test when the device is still
        reported after the last poll.
        """
        def _is_device_deleted(device):
            # 'qdev' is optional in a query-block entry, so an entry
            # without it simply cannot reference the device
            return not any(
                device in item.get("qdev", "")
                for item in self.main_vm.monitor.query("block")
            )

        tmo = self.params.get_numeric('device_del_timeout', 60)
        for _ in range(tmo):
            if _is_device_deleted(self.base_tag):
                break
            time.sleep(1)
        else:
            # The loop ran out without seeing the device disappear.
            # NOTE(review): the 'self.test.fail(' prefix was missing from
            # the listing and is restored here - confirm against upstream
            self.test.fail('Failed to hotunplug the frontend device')

    def hotunplug_format_node(self):
        """
        blockdev-del the format nodes during streaming,
        the nodes CANNOT be removed for they are busy

        The QMP command is expected to raise QMPCmdError whose text
        contains params['block_node_busy_error']; any other error text,
        or no error at all, fails the test.
        """
        try:
            self.main_vm.monitor.cmd('blockdev-del',
                                     {'node-name': self.params['node']})
        except QMPCmdError as e:
            if self.params['block_node_busy_error'] not in str(e):
                # NOTE(review): 'self.test.fail(' restored, see above
                self.test.fail('Unexpected error: %s' % str(e))
        else:
            self.test.fail('blockdev-del succeeded unexpectedly')

    def do_test(self):
        """
        Run the scenario: start the stream job, hotunplug while it is
        active, then let the job finish and verify the data.
        """
        self.snapshot_test()
        self.blockdev_stream()
        job_utils.check_block_jobs_started(
            self.main_vm, [self._job],
            self.params.get_numeric('job_started_timeout', 60)
        )
        self.hotunplug_frontend_device()
        self.wait_till_frontend_device_deleted()
        self.hotunplug_format_node()
        # The job must still be making progress after the unplug attempts
        job_utils.check_block_jobs_running(
            self.main_vm, [self._job],
            self.params.get_numeric('job_running_timeout', 300)
        )
        # Per the QMP reference a speed of 0 means "unlimited"
        self.main_vm.monitor.cmd(
            "block-job-set-speed", {'device': self._job, 'speed': 0})
        self.wait_stream_job_completed()
        self.check_backing_file()
        self.clone_vm.create()
        self.mount_data_disks()
        self.verify_data_file()


def run(test, params, env):
    """
    Block stream with hotunplug test

    test steps:
        1. boot VM with a 2G data disk
        2. format the data disk and mount it
        3. create a file
        4. add a snapshot image to VM via qmp commands
        5. do live snapshot (base: data, overlay: snapshot)
        6. do block-stream
        7. hotunplug the frontend device with device_del (OK)
        8. hotunplug the format node with blockdev-del (ERROR)
        9. check stream continues and wait stream done
       10. restart VM with the snapshot disk
       11. check the file and its md5

    :param test: test object
    :param params: test configuration dict
    :param env: env object
    """
    stream_test = BlockdevStreamHotunplugTest(test, params, env)
    ...  # the remainder of this snippet is truncated in the listing

Full Screen

Full Screen Github


Full Screen

import time

from virttest.qemu_monitor import QMPCmdError

from provider.blockdev_mirror_nowait import BlockdevMirrorNowaitTest


class BlockdevMirrorHotunplugTest(BlockdevMirrorNowaitTest):
    """
    Block mirror with hotunplug test
    """

    def hotunplug_frontend_devices(self):
        """
        device_del the frontend devices during mirroring,
        the devices CAN be removed without any issue
        """
        # A plain loop: the calls are made only for their side effect
        for device in self._source_images:
            self.main_vm.monitor.cmd('device_del', {'id': device})

    def wait_till_frontend_devices_deleted(self):
        """
        Wait till the devices are removed from the output of query-block

        Each source image is polled once per second, at most
        'device_del_timeout' times (default 60); the test fails when a
        device is still reported after its last poll.
        """
        def _is_device_deleted(device):
            # 'qdev' is optional in a query-block entry, so an entry
            # without it simply cannot reference the device
            return not any(
                device in item.get("qdev", "")
                for item in self.main_vm.monitor.query("block")
            )

        # The timeout does not depend on the device, read it only once
        tmo = self.params.get_numeric('device_del_timeout', 60)

        for device in self._source_images:
            for _ in range(tmo):
                if _is_device_deleted(device):
                    break
                time.sleep(1)
            else:
                # The loop ran out without seeing the device disappear.
                # NOTE(review): the 'self.test.fail(' prefix was missing
                # from the listing and is restored here - confirm
                # against upstream
                self.test.fail('Failed to hotunplug the frontend device')

    def hotunplug_format_nodes(self):
        """
        blockdev-del the format nodes during mirroring,
        the nodes CANNOT be removed for they are busy

        For every source node the QMP command is expected to raise
        QMPCmdError whose text contains params['block_node_busy_error']
        formatted with the node name; any other error text, or no error
        at all, fails the test.
        """
        for node in self._source_nodes:
            try:
                self.main_vm.monitor.cmd('blockdev-del',
                                         {'node-name': node})
            except QMPCmdError as e:
                err = self.params['block_node_busy_error'] % node
                if err not in str(e):
                    # NOTE(review): 'self.test.fail(' restored, see above
                    self.test.fail('Unexpected error: %s' % str(e))
            else:
                self.test.fail('blockdev-del succeeded unexpectedly')

    def do_test(self):
        """
        Run the scenario: start the mirror jobs, hotunplug while they
        are active, then let the jobs finish and verify the data.
        """
        self.blockdev_mirror()
        self.check_block_jobs_started(self._jobs)
        self.hotunplug_frontend_devices()
        self.wait_till_frontend_devices_deleted()
        self.hotunplug_format_nodes()
        # The jobs must still be making progress after the unplug attempts
        self.check_block_jobs_running(self._jobs)
        self.wait_mirror_jobs_completed()
        self.clone_vm_with_mirrored_images()
        self.verify_data_files()


def run(test, params, env):
    """
    Block mirror with hotunplug test

    test steps:
        1. boot VM with a 2G data disk
        2. format the data disk and mount it
        3. create a file
        4. add a target disk for mirror to VM via qmp commands
        5. do block-mirror with sync mode full
        6. hotunplug the frontend device with device_del (OK)
        7. hotunplug the format node with blockdev-del (ERROR)
        8. check mirror continues and wait mirror done
        9. restart VM with the mirror disk
       10. check the file and its md5

    :param test: test object
    :param params: test configuration dict
    :param env: env object
    """
    mirror_test = BlockdevMirrorHotunplugTest(test, params, env)
    ...  # the remainder of this snippet is truncated in the listing

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:


You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run avocado automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?