How to use remove_disk method in avocado

Best Python code snippet using avocado_python

image_model_capabilities.py

Source:image_model_capabilities.py Github

copy

Full Screen

...122 :type: bool123 """124 self._add_disk = add_disk125 @property126 def remove_disk(self):127 """Gets the remove_disk of this ImageModelCapabilities. # noqa: E501128 :return: The remove_disk of this ImageModelCapabilities. # noqa: E501129 :rtype: bool130 """131 return self._remove_disk132 @remove_disk.setter133 def remove_disk(self, remove_disk):134 """Sets the remove_disk of this ImageModelCapabilities.135 :param remove_disk: The remove_disk of this ImageModelCapabilities. # noqa: E501136 :type: bool137 """138 self._remove_disk = remove_disk139 @property140 def add_nic(self):141 """Gets the add_nic of this ImageModelCapabilities. # noqa: E501142 :return: The add_nic of this ImageModelCapabilities. # noqa: E501143 :rtype: bool144 """145 return self._add_nic146 @add_nic.setter147 def add_nic(self, add_nic):...

Full Screen

Full Screen

test_actions_remove_disk.py

Source:test_actions_remove_disk.py Github

copy

Full Screen

1# Copyright 2021 Canonical Ltd2#3# Licensed under the Apache License, Version 2.0 (the "License");4# you may not use this file except in compliance with the License.5# You may obtain a copy of the License at6#7# http://www.apache.org/licenses/LICENSE-2.08#9# Unless required by applicable law or agreed to in writing, software10# distributed under the License is distributed on an "AS IS" BASIS,11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.12# See the License for the specific language governing permissions and13# limitations under the License.14from unittest import mock15from actions import remove_disk16from test_utils import CharmTestCase17class RemoveDiskActionTests(CharmTestCase):18 @mock.patch.object(remove_disk.subprocess, 'check_output')19 def test_get_device_map(self, check_output):20 check_output.return_value = b'''21{22 "1": [{"devices": ["/dev/sdx1"]}],23 "2": [{"devices": ["/dev/sdc2", "/dev/sdc3"]}]24}25 '''26 rv = remove_disk.get_device_map()27 self.assertEqual(rv[0]['path'], '/dev/sdx1')28 self.assertEqual(rv[1]['id'], rv[2]['id'])29 def test_normalize_osd_id(self):30 self.assertEqual('osd.1', remove_disk.normalize_osd_id(1))31 self.assertEqual('osd.2', remove_disk.normalize_osd_id('osd.2'))32 self.assertEqual('osd.3', remove_disk.normalize_osd_id('3'))33 def test_map_device_id(self):34 dev_map = [35 {'id': 'osd.1', 'path': '/dev/sdc1'},36 {'id': 'osd.2', 'path': '/dev/sdd2'},37 {'id': 'osd.2', 'path': '/dev/sdx3'}38 ]39 self.assertEqual(40 'osd.1',41 remove_disk.map_device_to_id(dev_map, '/dev/sdc1'))42 self.assertIsNone(43 remove_disk.map_device_to_id(dev_map, '/dev/sdx4'))44 self.assertEqual(45 '/dev/sdd2',46 remove_disk.map_id_to_device(dev_map, 'osd.2'))47 self.assertIsNone(48 remove_disk.map_id_to_device(dev_map, 'osd.3'))49 @mock.patch.object(remove_disk, 'get_bcache_names')50 def test_action_osd_constructor(self, bcache_names):51 bcache_names.return_value = ('bcache0', '/dev/bcache0')52 dev_map = [53 {'path': '/dev/sdx1', 'id': 'osd.1'}54 ]55 with self.assertRaises(remove_disk.RemoveException):56 remove_disk.ActionOSD(dev_map, dev='/dev/sdx1', osd_id='osd.1')57 obj = remove_disk.ActionOSD(dev_map, dev='/dev/sdx1')58 self.assertEqual(obj.osd_id, 'osd.1')59 obj = remove_disk.ActionOSD(dev_map, osd_id='1')60 self.assertEqual(obj.device, '/dev/sdx1')61 @mock.patch.object(remove_disk, 'device_size')62 @mock.patch.object(remove_disk.charms_ceph.utils, 'stop_osd')63 @mock.patch.object(remove_disk, 'bcache_remove')64 @mock.patch.object(remove_disk.subprocess, 'call')65 @mock.patch.object(remove_disk.subprocess, 'check_call')66 @mock.patch.object(remove_disk, 'get_bcache_names')67 def test_action_osd_remove(self, get_bcache_names, check_call,68 call, bcache_remove, stop_osd, device_size):69 call.return_value = 070 get_bcache_names.return_value = ('/dev/backing', '/dev/caching')71 device_size.side_effect = lambda x: 1 if x == '/dev/caching' else 072 dev_map = [73 {'path': '/dev/bcache0', 'id': 'osd.1'}74 ]75 prefix_args = ['ceph', '--id', 'osd-removal']76 obj = remove_disk.ActionOSD(dev_map, osd_id='1')77 obj.remove(True, 1, True)78 call.assert_any_call(prefix_args + ['osd', 'safe-to-destroy', 'osd.1'])79 check_call.assert_any_call(prefix_args + ['osd', 'purge', 'osd.1',80 '--yes-i-really-mean-it'])81 check_call.assert_any_call(prefix_args + ['osd', 'crush', 'reweight',82 'osd.1', '0'])83 bcache_remove.assert_called_with(84 '/dev/bcache0', '/dev/backing', '/dev/caching')85 report = obj.report86 self.assertIn('/dev/backing', report)87 report = report['/dev/backing']88 self.assertIn('osd-ids', report)89 self.assertIn('osd.1', report['osd-ids'])90 self.assertIn('cache-devices', report)91 self.assertIn('partition-size', report)92 self.assertEqual('/dev/caching', report['cache-devices'])93 self.assertEqual(1, report['partition-size'])94 # Test the timeout check.95 with self.assertRaises(remove_disk.RemoveException):96 call.return_value = 197 obj.remove(False, 0, False)98 @mock.patch.object(remove_disk.hookenv, 'local_unit')99 @mock.patch.object(remove_disk.hookenv, 'action_set')100 def test_write_report(self, action_set, local_unit):101 output = {}102 local_unit.return_value = 'ceph-osd/0'103 action_set.side_effect = lambda x: output.update(x)104 report = {'dev@': {'osd-ids': 'osd.1', 'cache-devices': 'cache@',105 'partition-size': 5}}106 remove_disk.write_report(report, 'text')107 self.assertIn('message', output)108 msg = output['message']109 self.assertIn('juju run-action ceph-osd/0 add-disk', msg)110 self.assertIn('osd-devices=dev@', msg)111 self.assertIn('osd-ids=osd.1', msg)112 self.assertIn('cache-devices=cache@', msg)113 self.assertIn('partition-size=5', msg)114 def test_make_same_length(self):115 l1, l2 = [1], []116 remove_disk.make_same_length(l1, l2)117 self.assertEqual(len(l1), len(l2))118 self.assertIsNone(l2[0])119 prev_len = len(l1)120 remove_disk.make_same_length(l1, l2)...

Full Screen

Full Screen

disk_events.py

Source:disk_events.py Github

copy

Full Screen

...10 if osc.IS_FREEBSD:11 # TODO: Add support for multipath12 await middleware.call('disk.multipath_sync')13 await middleware.call('alert.oneshot_delete', 'SMART', disk_name)14async def remove_disk(middleware, disk_name):15 await (await middleware.call('disk.sync_all')).wait()16 if osc.IS_FREEBSD:17 await middleware.call('disk.multipath_sync')18 await middleware.call('alert.oneshot_delete', 'SMART', disk_name)19 # If a disk dies we need to reconfigure swaps so we are not left20 # with a single disk mirror swap, which may be a point of failure.21 asyncio.ensure_future(middleware.call('disk.swaps_configure'))22async def devd_devfs_hook(middleware, data):23 if data.get('subsystem') != 'CDEV':24 return25 if data['type'] == 'CREATE':26 disks = await middleware.run_in_thread(lambda: sysctl.filter('kern.disks')[0].value.split())27 # Device notified about is not a disk28 if data['cdev'] not in disks:29 return30 await added_disk(middleware, data['cdev'])31 elif data['type'] == 'DESTROY':32 # Device notified about is not a disk33 if not RE_ISDISK.match(data['cdev']):34 return35 await remove_disk(middleware, data['cdev'])36async def udev_block_devices_hook(middleware, data):37 if data.get('SUBSYSTEM') != 'block' or data.get('DEVTYPE') != 'disk' or data['SYS_NAME'].startswith((38 'sr', 'md', 'dm-', 'loop'39 )):40 return41 if data['ACTION'] == 'add':42 await added_disk(middleware, data['SYS_NAME'])43 elif data['ACTION'] == 'remove':44 await remove_disk(middleware, data['SYS_NAME'])45def setup(middleware):46 if osc.IS_LINUX:47 middleware.register_hook('udev.block', udev_block_devices_hook)48 else:49 # Listen to DEVFS events so we can sync on disk attach/detach...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run avocado automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful