How to use add_data_disk method in lisa

Best Python code snippet using lisa_python

instance_data_disk_test.py

Source:instance_data_disk_test.py Github

copy

Full Screen

1from .test_helper import argv_kiwi_tests2import sys3import mock4import random5from azure.common import AzureMissingResourceHttpError6from collections import namedtuple7from datetime import datetime8from mock import patch9from pytest import raises10from azurectl.account.service import AzureAccount11from azurectl.config.parser import Config12from azurectl.instance.data_disk import DataDisk13from collections import namedtuple14import azurectl15from azurectl.defaults import Defaults16from azurectl.azurectl_exceptions import (17 AzureDataDiskAttachError,18 AzureDataDiskCreateError,19 AzureDataDiskDeleteError,20 AzureDataDiskNoAvailableLun,21 AzureDataDiskShowError22)23class TestDataDisk:24 def setup(self):25 account = AzureAccount(26 Config(27 region_name='East US 2', filename='../data/config'28 )29 )30 self.service = mock.Mock()31 account.get_management_service = mock.Mock(return_value=self.service)32 account.get_blob_service_host_base = mock.Mock(33 return_value='test.url'34 )35 account.storage_key = mock.Mock()36 # now that that's done, instantiate a DataDisk with the account37 self.data_disk = DataDisk(account)38 # asynchronous API operations return a request object39 self.my_request = mock.Mock(request_id=Defaults.unify_id(42))40 # variables used in multiple tests41 self.cloud_service_name = 'mockcloudservice'42 self.instance_name = 'mockcloudserviceinstance1'43 self.lun = 044 self.host_caching = 'ReadWrite'45 self.disk_filename = 'mockcloudserviceinstance1-data-disk-0.vhd'46 self.disk_name = 'mockcloudserviceinstance1-data-disk-0'47 self.disk_url = (48 'https://' +49 account.storage_name() +50 '.blob.' +51 account.get_blob_service_host_base() + '/' +52 account.storage_container() + '/' +53 self.disk_filename54 )55 self.disk_label = 'Mock data disk'56 self.disk_size = 4257 self.timestamp = datetime.utcnow()58 self.time_string = datetime.isoformat(self.timestamp).replace(':', '_')59 self.account = account60 def test_attach_error(self):61 self.service.add_data_disk.side_effect = Exception62 with raises(AzureDataDiskAttachError):63 self.data_disk.attach(64 self.disk_name,65 self.cloud_service_name,66 self.instance_name,67 self.disk_label,68 self.lun,69 self.host_caching70 )71 @patch('azurectl.instance.data_disk.Storage')72 def test_create_error_on_add_disk(self, mock_storage):73 self.service.add_disk.side_effect = Exception74 with raises(AzureDataDiskCreateError):75 self.data_disk.create(76 identifier=self.instance_name,77 disk_size_in_gb=self.disk_size,78 label=self.disk_label79 )80 @patch('azurectl.instance.data_disk.Storage')81 def test_create_error_on_vhd_upload(self, mock_storage):82 mock_storage.side_effect = Exception83 with raises(AzureDataDiskCreateError):84 self.data_disk.create(85 identifier=self.instance_name, disk_size_in_gb=self.disk_size86 )87 def test_delete_error(self):88 self.service.delete_disk.side_effect = Exception89 with raises(AzureDataDiskDeleteError):90 self.data_disk.delete(self.disk_name)91 def test_detach_error(self):92 self.service.delete_data_disk.side_effect = Exception93 with raises(AzureDataDiskDeleteError):94 self.data_disk.detach(self.lun, self.cloud_service_name)95 def test_show_attached_error(self):96 self.service.get_data_disk.side_effect = Exception97 with raises(AzureDataDiskShowError):98 self.data_disk.show_attached(99 self.cloud_service_name, self.instance_name, self.lun100 )101 def test_show_attached_no_raise_for_all_lun_list(self):102 self.service.get_data_disk.side_effect = Exception103 result = self.data_disk.show_attached(104 self.cloud_service_name105 )106 assert result == []107 def test_show_error(self):108 self.service.get_disk.side_effect = Exception109 with raises(AzureDataDiskShowError):110 self.data_disk.show(self.disk_name)111 def test_no_available_lun_exception(self):112 self.service.get_data_disk.side_effect = iter([113 self.__create_mock_data_disk(i) for i in range(16)114 ])115 with raises(AzureDataDiskNoAvailableLun):116 self.data_disk._DataDisk__get_first_available_lun(117 self.cloud_service_name, self.instance_name118 )119 @patch('azurectl.instance.data_disk.datetime')120 def test_generate_filename(self, mock_timestamp):121 mock_timestamp.utcnow = mock.Mock(return_value=self.timestamp)122 mock_timestamp.isoformat = mock.Mock(return_value=self.time_string)123 expected = '%s-data-disk-%s.vhd' % (124 self.instance_name,125 self.time_string126 )127 result = self.data_disk._DataDisk__generate_filename(128 identifier=self.instance_name129 )130 assert result == expected131 def test_get_first_available_lun(self):132 self.service.get_data_disk.side_effect = iter([133 self.__create_mock_data_disk(0),134 self.__create_mock_data_disk(1),135 AzureMissingResourceHttpError('NOT FOUND', 404)136 ])137 result = self.data_disk._DataDisk__get_first_available_lun(138 self.cloud_service_name, self.instance_name139 )140 assert self.service.get_data_disk.call_count == 3141 assert result == 2 # 0 and 1 are taken142 @patch('azurectl.instance.data_disk.datetime')143 @patch('azurectl.instance.data_disk.Storage')144 def test_create(self, mock_storage, mock_datetime):145 self.service.add_disk.return_value = self.my_request146 mock_datetime.isoformat.return_value = '0'147 time_now = mock.Mock()148 time_now.strftime.return_value = 1471858765149 mock_datetime.now = mock.Mock(150 return_value=time_now151 )152 result = self.data_disk.create(153 identifier=self.instance_name,154 disk_size_in_gb=self.disk_size,155 label=self.disk_label156 )157 mock_storage.assert_called_once_with(158 self.account, self.account.storage_container()159 )160 self.service.add_disk.assert_called_once_with(161 media_link=self.disk_url,162 name=self.data_disk.data_disk_name.replace('.vhd', ''),163 label=self.disk_label,164 has_operating_system=False,165 os='Linux',166 )167 @patch('azurectl.instance.data_disk.Storage')168 def test_sizes_on_create(self, mock_storage_class):169 mock_storage = mock.Mock()170 mock_storage_class.return_value = mock_storage171 # size in GB * bytes/GB + 512 bytes for the footer172 blob_size_in_bytes = self.disk_size * 1073741824 + 512173 self.data_disk._DataDisk__generate_vhd_footer = mock.Mock(174 return_value='mock-footer'175 )176 self.data_disk._DataDisk__generate_filename = mock.Mock(177 return_value='mock-filename'178 )179 self.data_disk.create(180 identifier=self.instance_name,181 disk_size_in_gb=self.disk_size,182 label=self.disk_label183 )184 self.data_disk._DataDisk__generate_vhd_footer.assert_called_once_with(185 self.disk_size186 )187 mock_storage.upload_empty_image.assert_called_once_with(188 blob_size_in_bytes, 'mock-footer', 'mock-filename'189 )190 def test_show(self):191 self.service.get_disk.return_value = self.__create_mock_disk()192 expected = self.__create_expected_disk_output()193 result = self.data_disk.show(self.disk_name)194 self.service.get_disk.assert_called_once_with(195 self.disk_name196 )197 assert result == expected198 def test_show_attached(self):199 self.service.get_data_disk.return_value = self.__create_mock_data_disk(200 self.lun201 )202 expected = self.__create_expected_data_disk_output(self.lun)203 result = self.data_disk.show_attached(204 self.cloud_service_name, self.instance_name, self.lun205 )206 self.service.get_data_disk.assert_called_once_with(207 self.cloud_service_name,208 self.cloud_service_name,209 self.instance_name,210 self.lun211 )212 assert result == expected213 def test_list(self):214 self.service.list_disks.return_value = [self.__create_mock_disk()]215 expected = self.__create_expected_disk_list_output()216 result = self.data_disk.list()217 self.service.list_disks.assert_called_once_with()218 assert result == expected219 def test_list_empty(self):220 self.service.list_disks.side_effect = Exception221 result = self.data_disk.list()222 self.service.list_disks.assert_called_once_with()223 assert result == []224 def test_attach(self):225 self.service.add_data_disk.return_value = self.my_request226 result = self.data_disk.attach(227 self.disk_name,228 self.cloud_service_name,229 self.instance_name,230 self.disk_label,231 self.lun,232 self.host_caching233 )234 assert result == self.my_request.request_id235 self.service.add_data_disk.assert_called_once_with(236 self.cloud_service_name,237 self.cloud_service_name,238 self.instance_name,239 self.lun,240 host_caching=self.host_caching,241 disk_label=self.disk_label,242 disk_name=self.disk_name243 )244 @patch('azurectl.instance.data_disk.datetime')245 def test_attach_without_lun(self, mock_datetime):246 # mock no data disks attached has to result in lun 0 assigned later247 self.service.get_data_disk.side_effect = AzureMissingResourceHttpError(248 'NOT FOUND', 404249 )250 mock_datetime.isoformat.return_value = '0'251 self.service.add_data_disk.return_value = self.my_request252 result = self.data_disk.attach(253 self.disk_name,254 self.cloud_service_name255 )256 self.service.add_data_disk.assert_called_once_with(257 self.cloud_service_name,258 self.cloud_service_name,259 self.cloud_service_name,260 0,261 disk_name=self.disk_name262 )263 def test_attach_by_blob_name(self):264 # should send disk_name and source_media_link in order265 # to create a new data-disk266 self.service.add_data_disk.return_value = self.my_request267 self.service.list_disks.return_value = []268 result = self.data_disk.attach(269 None,270 self.cloud_service_name,271 lun=0,272 blob_name=self.disk_filename273 )274 self.service.add_data_disk.assert_called_once_with(275 self.cloud_service_name,276 self.cloud_service_name,277 self.cloud_service_name,278 0,279 disk_name=self.disk_name,280 source_media_link=self.disk_url281 )282 def test_find_data_disk_name_for_blob_name(self):283 mock_disks = [284 self.__create_mock_disk()285 ]286 result = self.data_disk._DataDisk__find_existing_disk_name_for_blob_name(287 self.disk_filename,288 mock_disks289 )290 assert result == self.disk_name291 def test_attach_by_blob_name_with_existing_data_disk(self):292 # should find a disk_name associated with blob_name and use it293 self.service.add_data_disk.return_value = self.my_request294 mock_disks = [295 self.__create_mock_disk()296 ]297 self.service.list_disks.return_value = mock_disks298 result = self.data_disk.attach(299 None,300 self.cloud_service_name,301 lun=0,302 blob_name=self.disk_filename303 )304 self.service.add_data_disk.assert_called_once_with(305 self.cloud_service_name,306 self.cloud_service_name,307 self.cloud_service_name,308 0,309 disk_name=self.disk_name310 )311 def test_attach_by_disk_name_and_blob_name(self):312 # should create a new data-disk with supplied disk_name and313 # source_media_link set to blob_name url314 self.service.add_data_disk.return_value = self.my_request315 result = self.data_disk.attach(316 self.disk_name,317 self.cloud_service_name,318 lun=0,319 blob_name=self.disk_filename320 )321 self.service.add_data_disk.assert_called_once_with(322 self.cloud_service_name,323 self.cloud_service_name,324 self.cloud_service_name,325 0,326 disk_name=self.disk_name,327 source_media_link=self.disk_url328 )329 def test_disk_name_or_blob_name_is_required(self):330 with raises(AzureDataDiskAttachError):331 self.data_disk.attach(332 None, self.cloud_service_name, lun=0, blob_name=None333 )334 def test_detach(self):335 self.service.delete_data_disk.return_value = self.my_request336 result = self.data_disk.detach(337 self.lun, self.cloud_service_name, self.instance_name338 )339 self.service.delete_data_disk.assert_called_once_with(340 self.cloud_service_name,341 self.cloud_service_name,342 self.instance_name,343 self.lun,344 delete_vhd=False345 )346 assert result == self.my_request.request_id347 def test_detach_no_instance_name(self):348 self.service.delete_data_disk.return_value = self.my_request349 result = self.data_disk.detach(350 self.lun, self.cloud_service_name351 )352 self.service.delete_data_disk.assert_called_once_with(353 self.cloud_service_name,354 self.cloud_service_name,355 self.cloud_service_name,356 self.lun,357 delete_vhd=False358 )359 assert result == self.my_request.request_id360 def test_delete(self):361 self.service.delete_disk.return_value = self.my_request362 result = self.data_disk.delete(self.disk_name)363 self.service.delete_disk.assert_called_once_with(364 self.disk_name, delete_vhd=True365 )366 def __create_mock_data_disk(self, lun):367 data_disk_type = namedtuple(368 'data_disk_type', [369 'host_caching', 'disk_label', 'disk_name', 'lun',370 'logical_disk_size_in_gb', 'media_link', 'source_media_link'371 ]372 )373 return data_disk_type(374 host_caching=self.host_caching,375 disk_label=self.disk_label,376 disk_name=self.disk_name,377 lun=lun,378 logical_disk_size_in_gb=self.disk_size,379 media_link=self.disk_url,380 source_media_link=''381 )382 def __create_mock_disk(self):383 disk_type = namedtuple(384 'disk_type', [385 'affinity_group', 'attached_to', 'has_operating_system',386 'is_corrupted', 'location', 'logical_disk_size_in_gb',387 'label', 'media_link', 'name', 'os', 'source_image_name'388 ]389 )390 attach_info_type = namedtuple(391 'attach_info_type', [392 'hosted_service_name', 'deployment_name', 'role_name'393 ]394 )395 return disk_type(396 affinity_group='',397 attached_to=attach_info_type(398 hosted_service_name='',399 deployment_name='',400 role_name=''401 ),402 has_operating_system=False,403 is_corrupted=False,404 location='',405 logical_disk_size_in_gb=self.disk_size,406 label=self.disk_label,407 media_link=self.disk_url,408 name=self.disk_name,409 os='Linux',410 source_image_name=''411 )412 def __create_expected_data_disk_output(self, lun):413 return [414 {415 'size': '%d GB' % self.disk_size,416 'label': self.disk_label,417 'disk-url': self.disk_url,418 'source-image-url': '',419 'lun': lun,420 'host-caching': 'ReadWrite'421 }422 ]423 def __create_expected_disk_output(self):424 return {425 'affinity_group': '',426 'attached_to': {427 'hosted_service_name': '',428 'deployment_name': '',429 'role_name': ''430 },431 'has_operating_system': False,432 'is_corrupted': False,433 'location': '',434 'logical_disk_size_in_gb': '%d GB' % self.disk_size,435 'label': self.disk_label,436 'media_link': self.disk_url,437 'name': self.disk_name,438 'os': 'Linux',439 'source_image_name': ''440 }441 def __create_expected_disk_list_output(self):442 return [443 {444 'is_attached': True,445 'name': self.disk_name446 }...

Full Screen

Full Screen

service_code.py

Source:service_code.py Github

copy

Full Screen

1service_code_name = {2 1000: "wakeup",3 1001: "shutdown",4 1002: "restart",5 1003: "delete",6 1004: "update_name",7 1005: "update_config",8 1006: "enter_maintenance_mode",9 1007: "clear_all_desktop",10 1008: "download_desktop",11 1009: "cancel_download_desktop",12 1010: "add_data_disk",13 1011: "order",14 1012: "update_ip",15 1013: "update_difference_disk",16 1014: "pxe_start",17 1015: "send_torrent",18 1016: "upload_desktop",19 1017: "send_desktop",20 1018: "push_task_result", # 任务结果 {"torrent_id": "xxxxx", "msg": "Success","result": 0 }21 # 任务结果 {"torrent_id": "xxxxx", "msg": "Fail","result": 1 }22 1019: "enter_maintain", # 进入维护模式23 1020: "upload_disk",24 1021: "bt_task_complete", # bt任务完成通知25 1022: "watermark_switch", # 水印开关通知26 1023: "ssh_upload_desktop", 27 1024: "update_desktop_group_info",28 1025: "cancel_send_desktop",29 1026: "cancel_p_to_v", # 取消p2v30 1030: "terminal_upgrade",31 9000: "heartbeat",32 9001: "terminal_login",33 9002: "terminal_logout",34 9003: "get_date_time",35 9004: "get_config_version_id",36 9005: "get_config_info",37 9006: "update_config_info",38 9007: "get_desktop_group_list",39 9008: "verify_admin_user",40 9009: "order_confirm",41 9010: "order_query",42 9011: "sync_desktop_info",43 9012: "get_desktop_info",44 9013: "desktop_upload",45 9014: "torrent_upload",46 9015: "task_result", # 上报任务结果47 9016: "operate_auth",48 9017: "init_desktop", # 初始化桌面49 9018: "check_upload_state", # 校验50 9019: "bt_tracker", # bt tracker地址51 9020: "p_to_v_start", # p2v52 9021: "p_to_v_state",53 # 9030: "win_login" # window终端登录54 9022: "diff_disk_upload", # 差分盘上传磁盘55 9023: "diff_disk_download",56 9024: "desktop_login", # 上报桌面下载57 9025: "bt_task_state", # bt任务状态58 9026: "put_desktop_group_list",59 9027: "bt_upload_state",60 9028: "query_teach_pc", #查询教师机61 9029: "update_performance", # 上报终端性能信息62 9999: "upload_log" # 终端日志上传63}...

Full Screen

Full Screen

volume.py

Source:volume.py Github

copy

Full Screen

...17 return self.azure.add_disk(False, name, kwargs['media_link'], name, "Linux")18 def detach_volume(self, instance=None, volume=None):19 self.azure.delete_data_disk(self.ctx.service_name, instance, volume, 0, delete_vhd=False)20 def attach_volume(self, instance=None, volume=None, device=None, **kwargs):21 self.azure.add_data_disk(self, self.ctx.service_name, instance, volume, 0,22 host_caching=None, media_link=None, disk_label=volume,23 disk_name=volume, logical_disk_size_in_gb=kwargs['size'],24 source_media_link=None)25 def list_volume(self):26 return self.azure.list_disks()27 def remove_volume(self, volume):28 pass29 def query_volume_details(self, volume):...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run lisa automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful