Best Python code snippet using lisa_python
instance_data_disk_test.py
Source:instance_data_disk_test.py  
1from .test_helper import argv_kiwi_tests2import sys3import mock4import random5from azure.common import AzureMissingResourceHttpError6from collections import namedtuple7from datetime import datetime8from mock import patch9from pytest import raises10from azurectl.account.service import AzureAccount11from azurectl.config.parser import Config12from azurectl.instance.data_disk import DataDisk13from collections import namedtuple14import azurectl15from azurectl.defaults import Defaults16from azurectl.azurectl_exceptions import (17    AzureDataDiskAttachError,18    AzureDataDiskCreateError,19    AzureDataDiskDeleteError,20    AzureDataDiskNoAvailableLun,21    AzureDataDiskShowError22)23class TestDataDisk:24    def setup(self):25        account = AzureAccount(26            Config(27                region_name='East US 2', filename='../data/config'28            )29        )30        self.service = mock.Mock()31        account.get_management_service = mock.Mock(return_value=self.service)32        account.get_blob_service_host_base = mock.Mock(33            return_value='test.url'34        )35        account.storage_key = mock.Mock()36        # now that that's done, instantiate a DataDisk with the account37        self.data_disk = DataDisk(account)38        # asynchronous API operations return a request object39        self.my_request = mock.Mock(request_id=Defaults.unify_id(42))40        # variables used in multiple tests41        self.cloud_service_name = 'mockcloudservice'42        self.instance_name = 'mockcloudserviceinstance1'43        self.lun = 044        self.host_caching = 'ReadWrite'45        self.disk_filename = 'mockcloudserviceinstance1-data-disk-0.vhd'46        self.disk_name = 'mockcloudserviceinstance1-data-disk-0'47        self.disk_url = (48            'https://' +49            account.storage_name() +50            '.blob.' +51            account.get_blob_service_host_base() + '/' +52            account.storage_container() + '/' +53            self.disk_filename54        )55        self.disk_label = 'Mock data disk'56        self.disk_size = 4257        self.timestamp = datetime.utcnow()58        self.time_string = datetime.isoformat(self.timestamp).replace(':', '_')59        self.account = account60    def test_attach_error(self):61        self.service.add_data_disk.side_effect = Exception62        with raises(AzureDataDiskAttachError):63            self.data_disk.attach(64                self.disk_name,65                self.cloud_service_name,66                self.instance_name,67                self.disk_label,68                self.lun,69                self.host_caching70            )71    @patch('azurectl.instance.data_disk.Storage')72    def test_create_error_on_add_disk(self, mock_storage):73        self.service.add_disk.side_effect = Exception74        with raises(AzureDataDiskCreateError):75            self.data_disk.create(76                identifier=self.instance_name,77                disk_size_in_gb=self.disk_size,78                label=self.disk_label79            )80    @patch('azurectl.instance.data_disk.Storage')81    def test_create_error_on_vhd_upload(self, mock_storage):82        mock_storage.side_effect = Exception83        with raises(AzureDataDiskCreateError):84            self.data_disk.create(85                identifier=self.instance_name, disk_size_in_gb=self.disk_size86            )87    def test_delete_error(self):88        self.service.delete_disk.side_effect = Exception89        with raises(AzureDataDiskDeleteError):90            self.data_disk.delete(self.disk_name)91    def test_detach_error(self):92        self.service.delete_data_disk.side_effect = Exception93        with raises(AzureDataDiskDeleteError):94            self.data_disk.detach(self.lun, self.cloud_service_name)95    def test_show_attached_error(self):96        self.service.get_data_disk.side_effect = Exception97        with raises(AzureDataDiskShowError):98            self.data_disk.show_attached(99                self.cloud_service_name, self.instance_name, self.lun100            )101    def test_show_attached_no_raise_for_all_lun_list(self):102        self.service.get_data_disk.side_effect = Exception103        result = self.data_disk.show_attached(104            self.cloud_service_name105        )106        assert result == []107    def test_show_error(self):108        self.service.get_disk.side_effect = Exception109        with raises(AzureDataDiskShowError):110            self.data_disk.show(self.disk_name)111    def test_no_available_lun_exception(self):112        self.service.get_data_disk.side_effect = iter([113            self.__create_mock_data_disk(i) for i in range(16)114        ])115        with raises(AzureDataDiskNoAvailableLun):116            self.data_disk._DataDisk__get_first_available_lun(117                self.cloud_service_name, self.instance_name118            )119    @patch('azurectl.instance.data_disk.datetime')120    def test_generate_filename(self, mock_timestamp):121        mock_timestamp.utcnow = mock.Mock(return_value=self.timestamp)122        mock_timestamp.isoformat = mock.Mock(return_value=self.time_string)123        expected = '%s-data-disk-%s.vhd' % (124            self.instance_name,125            self.time_string126        )127        result = self.data_disk._DataDisk__generate_filename(128            identifier=self.instance_name129        )130        assert result == expected131    def test_get_first_available_lun(self):132        self.service.get_data_disk.side_effect = iter([133            self.__create_mock_data_disk(0),134            self.__create_mock_data_disk(1),135            AzureMissingResourceHttpError('NOT FOUND', 404)136        ])137        result = self.data_disk._DataDisk__get_first_available_lun(138            self.cloud_service_name, self.instance_name139        )140        assert self.service.get_data_disk.call_count == 3141        assert result == 2  # 0 and 1 are taken142    @patch('azurectl.instance.data_disk.datetime')143    @patch('azurectl.instance.data_disk.Storage')144    def test_create(self, mock_storage, mock_datetime):145        self.service.add_disk.return_value = self.my_request146        mock_datetime.isoformat.return_value = '0'147        time_now = mock.Mock()148        time_now.strftime.return_value = 1471858765149        mock_datetime.now = mock.Mock(150            return_value=time_now151        )152        result = self.data_disk.create(153            identifier=self.instance_name,154            disk_size_in_gb=self.disk_size,155            label=self.disk_label156        )157        mock_storage.assert_called_once_with(158            self.account, self.account.storage_container()159        )160        self.service.add_disk.assert_called_once_with(161            media_link=self.disk_url,162            name=self.data_disk.data_disk_name.replace('.vhd', ''),163            label=self.disk_label,164            has_operating_system=False,165            os='Linux',166        )167    @patch('azurectl.instance.data_disk.Storage')168    def test_sizes_on_create(self, mock_storage_class):169        mock_storage = mock.Mock()170        mock_storage_class.return_value = mock_storage171        # size in GB * bytes/GB + 512 bytes for the footer172        blob_size_in_bytes = self.disk_size * 1073741824 + 512173        self.data_disk._DataDisk__generate_vhd_footer = mock.Mock(174            return_value='mock-footer'175        )176        self.data_disk._DataDisk__generate_filename = mock.Mock(177            return_value='mock-filename'178        )179        self.data_disk.create(180            identifier=self.instance_name,181            disk_size_in_gb=self.disk_size,182            label=self.disk_label183        )184        self.data_disk._DataDisk__generate_vhd_footer.assert_called_once_with(185            self.disk_size186        )187        mock_storage.upload_empty_image.assert_called_once_with(188            blob_size_in_bytes, 'mock-footer', 'mock-filename'189        )190    def test_show(self):191        self.service.get_disk.return_value = self.__create_mock_disk()192        expected = self.__create_expected_disk_output()193        result = self.data_disk.show(self.disk_name)194        self.service.get_disk.assert_called_once_with(195            self.disk_name196        )197        assert result == expected198    def test_show_attached(self):199        self.service.get_data_disk.return_value = self.__create_mock_data_disk(200            self.lun201        )202        expected = self.__create_expected_data_disk_output(self.lun)203        result = self.data_disk.show_attached(204            self.cloud_service_name, self.instance_name, self.lun205        )206        self.service.get_data_disk.assert_called_once_with(207            self.cloud_service_name,208            self.cloud_service_name,209            self.instance_name,210            self.lun211        )212        assert result == expected213    def test_list(self):214        self.service.list_disks.return_value = [self.__create_mock_disk()]215        expected = self.__create_expected_disk_list_output()216        result = self.data_disk.list()217        self.service.list_disks.assert_called_once_with()218        assert result == expected219    def test_list_empty(self):220        self.service.list_disks.side_effect = Exception221        result = self.data_disk.list()222        self.service.list_disks.assert_called_once_with()223        assert result == []224    def test_attach(self):225        self.service.add_data_disk.return_value = self.my_request226        result = self.data_disk.attach(227            self.disk_name,228            self.cloud_service_name,229            self.instance_name,230            self.disk_label,231            self.lun,232            self.host_caching233        )234        assert result == self.my_request.request_id235        self.service.add_data_disk.assert_called_once_with(236            self.cloud_service_name,237            self.cloud_service_name,238            self.instance_name,239            self.lun,240            host_caching=self.host_caching,241            disk_label=self.disk_label,242            disk_name=self.disk_name243        )244    @patch('azurectl.instance.data_disk.datetime')245    def test_attach_without_lun(self, mock_datetime):246        # mock no data disks attached has to result in lun 0 assigned later247        self.service.get_data_disk.side_effect = AzureMissingResourceHttpError(248            'NOT FOUND', 404249        )250        mock_datetime.isoformat.return_value = '0'251        self.service.add_data_disk.return_value = self.my_request252        result = self.data_disk.attach(253            self.disk_name,254            self.cloud_service_name255        )256        self.service.add_data_disk.assert_called_once_with(257            self.cloud_service_name,258            self.cloud_service_name,259            self.cloud_service_name,260            0,261            disk_name=self.disk_name262        )263    def test_attach_by_blob_name(self):264        # should send disk_name and source_media_link in order265        # to create a new data-disk266        self.service.add_data_disk.return_value = self.my_request267        self.service.list_disks.return_value = []268        result = self.data_disk.attach(269            None,270            self.cloud_service_name,271            lun=0,272            blob_name=self.disk_filename273        )274        self.service.add_data_disk.assert_called_once_with(275            self.cloud_service_name,276            self.cloud_service_name,277            self.cloud_service_name,278            0,279            disk_name=self.disk_name,280            source_media_link=self.disk_url281        )282    def test_find_data_disk_name_for_blob_name(self):283        mock_disks = [284            self.__create_mock_disk()285        ]286        result = self.data_disk._DataDisk__find_existing_disk_name_for_blob_name(287            self.disk_filename,288            mock_disks289        )290        assert result == self.disk_name291    def test_attach_by_blob_name_with_existing_data_disk(self):292        # should find a disk_name associated with blob_name and use it293        self.service.add_data_disk.return_value = self.my_request294        mock_disks = [295            self.__create_mock_disk()296        ]297        self.service.list_disks.return_value = mock_disks298        result = self.data_disk.attach(299            None,300            self.cloud_service_name,301            lun=0,302            blob_name=self.disk_filename303        )304        self.service.add_data_disk.assert_called_once_with(305            self.cloud_service_name,306            self.cloud_service_name,307            self.cloud_service_name,308            0,309            disk_name=self.disk_name310        )311    def test_attach_by_disk_name_and_blob_name(self):312        # should create a new data-disk with supplied disk_name and313        # source_media_link set to blob_name url314        self.service.add_data_disk.return_value = self.my_request315        result = self.data_disk.attach(316            self.disk_name,317            self.cloud_service_name,318            lun=0,319            blob_name=self.disk_filename320        )321        self.service.add_data_disk.assert_called_once_with(322            self.cloud_service_name,323            self.cloud_service_name,324            self.cloud_service_name,325            0,326            disk_name=self.disk_name,327            source_media_link=self.disk_url328        )329    def test_disk_name_or_blob_name_is_required(self):330        with raises(AzureDataDiskAttachError):331            self.data_disk.attach(332                None, self.cloud_service_name, lun=0, blob_name=None333            )334    def test_detach(self):335        self.service.delete_data_disk.return_value = self.my_request336        result = self.data_disk.detach(337            self.lun, self.cloud_service_name, self.instance_name338        )339        self.service.delete_data_disk.assert_called_once_with(340            self.cloud_service_name,341            self.cloud_service_name,342            self.instance_name,343            self.lun,344            delete_vhd=False345        )346        assert result == self.my_request.request_id347    def test_detach_no_instance_name(self):348        self.service.delete_data_disk.return_value = self.my_request349        result = self.data_disk.detach(350            self.lun, self.cloud_service_name351        )352        self.service.delete_data_disk.assert_called_once_with(353            self.cloud_service_name,354            self.cloud_service_name,355            self.cloud_service_name,356            self.lun,357            delete_vhd=False358        )359        assert result == self.my_request.request_id360    def test_delete(self):361        self.service.delete_disk.return_value = self.my_request362        result = self.data_disk.delete(self.disk_name)363        self.service.delete_disk.assert_called_once_with(364            self.disk_name, delete_vhd=True365        )366    def __create_mock_data_disk(self, lun):367        data_disk_type = namedtuple(368            'data_disk_type', [369                'host_caching', 'disk_label', 'disk_name', 'lun',370                'logical_disk_size_in_gb', 'media_link', 'source_media_link'371            ]372        )373        return data_disk_type(374            host_caching=self.host_caching,375            disk_label=self.disk_label,376            disk_name=self.disk_name,377            lun=lun,378            logical_disk_size_in_gb=self.disk_size,379            media_link=self.disk_url,380            source_media_link=''381        )382    def __create_mock_disk(self):383        disk_type = namedtuple(384            'disk_type', [385                'affinity_group', 'attached_to', 'has_operating_system',386                'is_corrupted', 'location', 'logical_disk_size_in_gb',387                'label', 'media_link', 'name', 'os', 'source_image_name'388            ]389        )390        attach_info_type = namedtuple(391            'attach_info_type', [392                'hosted_service_name', 'deployment_name', 'role_name'393            ]394        )395        return disk_type(396            affinity_group='',397            attached_to=attach_info_type(398                hosted_service_name='',399                deployment_name='',400                role_name=''401            ),402            has_operating_system=False,403            is_corrupted=False,404            location='',405            logical_disk_size_in_gb=self.disk_size,406            label=self.disk_label,407            media_link=self.disk_url,408            name=self.disk_name,409            os='Linux',410            source_image_name=''411        )412    def __create_expected_data_disk_output(self, lun):413        return [414            {415                'size': '%d GB' % self.disk_size,416                'label': self.disk_label,417                'disk-url': self.disk_url,418                'source-image-url': '',419                'lun': lun,420                'host-caching': 'ReadWrite'421            }422        ]423    def __create_expected_disk_output(self):424        return {425            'affinity_group': '',426            'attached_to': {427                'hosted_service_name': '',428                'deployment_name': '',429                'role_name': ''430            },431            'has_operating_system': False,432            'is_corrupted': False,433            'location': '',434            'logical_disk_size_in_gb': '%d GB' % self.disk_size,435            'label': self.disk_label,436            'media_link': self.disk_url,437            'name': self.disk_name,438            'os': 'Linux',439            'source_image_name': ''440        }441    def __create_expected_disk_list_output(self):442        return [443            {444                'is_attached': True,445                'name': self.disk_name446            }...service_code.py
Source:service_code.py  
1service_code_name = {2    1000: "wakeup",3    1001: "shutdown",4    1002: "restart",5    1003: "delete",6    1004: "update_name",7    1005: "update_config",8    1006: "enter_maintenance_mode",9    1007: "clear_all_desktop",10    1008: "download_desktop",11    1009: "cancel_download_desktop",12    1010: "add_data_disk",13    1011: "order",14    1012: "update_ip",15    1013: "update_difference_disk",16    1014: "pxe_start",17    1015: "send_torrent",18    1016: "upload_desktop",19    1017: "send_desktop",20    1018: "push_task_result",  # ä»»å¡ç»æ {"torrent_id": "xxxxx", "msg": "Success","result": 0 }21                               # ä»»å¡ç»æ {"torrent_id": "xxxxx", "msg": "Fail","result": 1 }22    1019: "enter_maintain", # è¿å
¥ç»´æ¤æ¨¡å¼23    1020: "upload_disk",24    1021: "bt_task_complete",  # btä»»å¡å®æéç¥25    1022: "watermark_switch",   # æ°´å°å¼å
³éç¥26    1023: "ssh_upload_desktop", 27    1024: "update_desktop_group_info",28    1025: "cancel_send_desktop",29    1026: "cancel_p_to_v",      # åæ¶p2v30    1030: "terminal_upgrade",31    9000: "heartbeat",32    9001: "terminal_login",33    9002: "terminal_logout",34    9003: "get_date_time",35    9004: "get_config_version_id",36    9005: "get_config_info",37    9006: "update_config_info",38    9007: "get_desktop_group_list",39    9008: "verify_admin_user",40    9009: "order_confirm",41    9010: "order_query",42    9011: "sync_desktop_info",43    9012: "get_desktop_info",44    9013: "desktop_upload",45    9014: "torrent_upload",46    9015: "task_result",    # 䏿¥ä»»å¡ç»æ47    9016: "operate_auth",48    9017: "init_desktop",   # åå§åæ¡é¢49    9018: "check_upload_state", # æ ¡éª50    9019: "bt_tracker",         # bt trackerå°å51    9020: "p_to_v_start",          # p2v52    9021: "p_to_v_state",53    # 9030: "win_login"           # windowç»ç«¯ç»å½54    9022: "diff_disk_upload",       # å·®åçä¸ä¼ ç£ç55    9023: "diff_disk_download",56    9024: "desktop_login",          # 䏿¥æ¡é¢ä¸è½½57    9025: "bt_task_state",         # btä»»å¡ç¶æ58    9026: "put_desktop_group_list",59    9027: "bt_upload_state",60    9028: "query_teach_pc",     #æ¥è¯¢æå¸æº61    9029: "update_performance",    # 䏿¥ç»ç«¯æ§è½ä¿¡æ¯62    9999: "upload_log"             # ç»ç«¯æ¥å¿ä¸ä¼ 63}...volume.py
Source:volume.py  
...17        return self.azure.add_disk(False, name, kwargs['media_link'], name, "Linux")18    def detach_volume(self, instance=None, volume=None):19        self.azure.delete_data_disk(self.ctx.service_name, instance, volume, 0, delete_vhd=False)20    def attach_volume(self, instance=None, volume=None, device=None, **kwargs):21        self.azure.add_data_disk(self, self.ctx.service_name, instance, volume, 0,22                                 host_caching=None, media_link=None, disk_label=volume,23                                 disk_name=volume, logical_disk_size_in_gb=kwargs['size'],24                                 source_media_link=None)25    def list_volume(self):26        return self.azure.list_disks()27    def remove_volume(self, volume):28        pass29    def query_volume_details(self, volume):...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
