Best Python code snippet using lisa_python
storage.py
Source:storage.py  
...285        """,286        timeout=TIME_OUT,287        requirement=simple_requirement(disk=DiskStandardHDDLRS()),288    )289    def hot_add_disk_parallel(self, log: Logger, node: Node) -> None:290        self._hot_add_disk_parallel(291            log, node, DiskType.StandardHDDLRS, self.DEFAULT_DISK_SIZE_IN_GB292        )293    @TestCaseMetadata(294        description="""295        This test case will verify that the standard ssd data disks disks can296        be added serially while the vm is running. The test steps are same as297        `hot_add_disk_parallel`.298        """,299        timeout=TIME_OUT,300        requirement=simple_requirement(disk=DiskStandardSSDLRS()),301    )302    def hot_add_disk_parallel_standard_ssd(self, log: Logger, node: Node) -> None:303        self._hot_add_disk_parallel(304            log, node, DiskType.StandardSSDLRS, self.DEFAULT_DISK_SIZE_IN_GB305        )306    @TestCaseMetadata(307        description="""308        This test case will verify that the premium ssd data disks disks can309        be added serially while the vm is running. 
The test steps are same as310        `hot_add_disk_parallel`.311        """,312        timeout=TIME_OUT,313        requirement=simple_requirement(disk=DiskPremiumSSDLRS()),314    )315    def hot_add_disk_parallel_premium_ssd(self, log: Logger, node: Node) -> None:316        self._hot_add_disk_parallel(317            log, node, DiskType.PremiumSSDLRS, self.DEFAULT_DISK_SIZE_IN_GB318        )319    @TestCaseMetadata(320        description="""321        This test case will verify mount azure nfs on guest successfully.322        """,323        timeout=TIME_OUT,324        requirement=simple_requirement(supported_features=[Nfs]),325        priority=2,326    )327    def verify_azure_file_share_nfs(self, log: Logger, node: Node) -> None:328        nfs = node.features[Nfs]329        mount_dir = "/mount/azure_share"330        nfs.create_share()331        storage_account_name = nfs.storage_account_name332        mount_nfs = f"{storage_account_name}.file.core.windows.net"333        server_shared_dir = f"{nfs.storage_account_name}/{nfs.file_share_name}"334        try:335            node.tools[NFSClient].setup(336                mount_nfs,337                server_shared_dir,338                mount_dir,339            )340        except Exception as identifier:341            raise LisaException(342                f"fail to mount {server_shared_dir} into {mount_dir}"343                f"{identifier.__class__.__name__}: {identifier}."344            )345        finally:346            nfs.delete_share()347            node.tools[NFSClient].stop(mount_dir)348    def after_case(self, log: Logger, **kwargs: Any) -> None:349        node: Node = kwargs["node"]350        disk = node.features[Disk]351        # cleanup any disks added as part of the test352        # If the cleanup operation fails, mark node to be recycled353        try:354            disk.remove_data_disk()355        except Exception:356            raise BadEnvironmentStateException357    def _hot_add_disk_serial(358        
self, log: Logger, node: Node, type: DiskType, size: int359    ) -> None:360        disk = node.features[Disk]361        lsblk = node.tools[Lsblk]362        # get max data disk count for the node363        assert node.capability.disk364        assert isinstance(node.capability.disk.max_data_disk_count, int)365        max_data_disk_count = node.capability.disk.max_data_disk_count366        log.debug(f"max_data_disk_count: {max_data_disk_count}")367        # get the number of data disks already added to the vm368        assert isinstance(node.capability.disk.data_disk_count, int)369        current_data_disk_count = node.capability.disk.data_disk_count370        log.debug(f"current_data_disk_count: {current_data_disk_count}")371        # disks to be added to the vm372        disks_to_add = max_data_disk_count - current_data_disk_count373        # get partition info before adding data disk374        partitions_before_adding_disk = lsblk.get_disks(force_run=True)375        for _ in range(disks_to_add):376            # add data disk377            log.debug("Adding 1 managed disk")378            disks_added = disk.add_data_disk(1, type, size)379            # verify that partition count is increased by 1380            # and the size of partition is correct381            partitons_after_adding_disk = lsblk.get_disks(force_run=True)382            added_partitions = [383                item384                for item in partitons_after_adding_disk385                if item not in partitions_before_adding_disk386            ]387            log.debug(f"added_partitions: {added_partitions}")388            assert_that(added_partitions, "Data disk should be added").is_length(1)389            assert_that(390                added_partitions[0].size_in_gb,391                f"data disk { added_partitions[0].name} size should be equal to "392                f"{size} GB",393            ).is_equal_to(size)394            # remove data disk395            log.debug(f"Removing managed disk: 
{disks_added}")396            disk.remove_data_disk(disks_added)397            # verify that partition count is decreased by 1398            partition_after_removing_disk = lsblk.get_disks(force_run=True)399            added_partitions = [400                item401                for item in partitions_before_adding_disk402                if item not in partition_after_removing_disk403            ]404            assert_that(added_partitions, "data disks should not be present").is_length(405                0406            )407    def _hot_add_disk_parallel(408        self, log: Logger, node: Node, type: DiskType, size: int409    ) -> None:410        disk = node.features[Disk]411        lsblk = node.tools[Lsblk]412        # get max data disk count for the node413        assert node.capability.disk414        assert isinstance(node.capability.disk.max_data_disk_count, int)415        max_data_disk_count = node.capability.disk.max_data_disk_count416        log.debug(f"max_data_disk_count: {max_data_disk_count}")417        # get the number of data disks already added to the vm418        assert isinstance(node.capability.disk.data_disk_count, int)419        current_data_disk_count = node.capability.disk.data_disk_count420        log.debug(f"current_data_disk_count: {current_data_disk_count}")421        # disks to be added to the vm...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials on the LambdaTest YouTube channel to get step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!
