Best Python code snippet using lisa_python
features.py
Source:features.py  
...1060        add_disk_names = [managed_disk.name for managed_disk in managed_disks]1061        self.disks += add_disk_names1062        self._node.capability.disk.data_disk_count += len(managed_disks)1063        return add_disk_names1064    def remove_data_disk(self, names: Optional[List[str]] = None) -> None:1065        assert self._node.capability.disk1066        platform: AzurePlatform = self._platform  # type: ignore1067        compute_client = get_compute_client(platform)1068        # if names is None, remove all data disks1069        if names is None:1070            names = self.disks1071        # detach managed disk1072        azure_platform: AzurePlatform = self._platform  # type: ignore1073        vm = get_vm(azure_platform, self._node)1074        # remove managed disk1075        data_disks = vm.storage_profile.data_disks1076        data_disks[:] = [disk for disk in data_disks if disk.name not in names]1077        # update vm1078        async_vm_update = compute_client.virtual_machines.begin_create_or_update(...storage.py
Source:storage.py  
...350        disk = node.features[Disk]351        # cleanup any disks added as part of the test352        # If the cleanup operation fails, mark node to be recycled353        try:354            disk.remove_data_disk()355        except Exception:356            raise BadEnvironmentStateException357    def _hot_add_disk_serial(358        self, log: Logger, node: Node, type: DiskType, size: int359    ) -> None:360        disk = node.features[Disk]361        lsblk = node.tools[Lsblk]362        # get max data disk count for the node363        assert node.capability.disk364        assert isinstance(node.capability.disk.max_data_disk_count, int)365        max_data_disk_count = node.capability.disk.max_data_disk_count366        log.debug(f"max_data_disk_count: {max_data_disk_count}")367        # get the number of data disks already added to the vm368        assert isinstance(node.capability.disk.data_disk_count, int)369        current_data_disk_count = node.capability.disk.data_disk_count370        log.debug(f"current_data_disk_count: {current_data_disk_count}")371        # disks to be added to the vm372        disks_to_add = max_data_disk_count - current_data_disk_count373        # get partition info before adding data disk374        partitions_before_adding_disk = lsblk.get_disks(force_run=True)375        for _ in range(disks_to_add):376            # add data disk377            log.debug("Adding 1 managed disk")378            disks_added = disk.add_data_disk(1, type, size)379            # verify that partition count is increased by 1380            # and the size of partition is correct381            partitons_after_adding_disk = lsblk.get_disks(force_run=True)382            added_partitions = [383                item384                for item in partitons_after_adding_disk385                if item not in partitions_before_adding_disk386            ]387            log.debug(f"added_partitions: {added_partitions}")388            assert_that(added_partitions, "Data disk should be added").is_length(1)389            assert_that(390                added_partitions[0].size_in_gb,391                f"data disk { added_partitions[0].name} size should be equal to "392                f"{size} GB",393            ).is_equal_to(size)394            # remove data disk395            log.debug(f"Removing managed disk: {disks_added}")396            disk.remove_data_disk(disks_added)397            # verify that partition count is decreased by 1398            partition_after_removing_disk = lsblk.get_disks(force_run=True)399            added_partitions = [400                item401                for item in partitions_before_adding_disk402                if item not in partition_after_removing_disk403            ]404            assert_that(added_partitions, "data disks should not be present").is_length(405                0406            )407    def _hot_add_disk_parallel(408        self, log: Logger, node: Node, type: DiskType, size: int409    ) -> None:410        disk = node.features[Disk]411        lsblk = node.tools[Lsblk]412        # get max data disk count for the node413        assert node.capability.disk414        assert isinstance(node.capability.disk.max_data_disk_count, int)415        max_data_disk_count = node.capability.disk.max_data_disk_count416        log.debug(f"max_data_disk_count: {max_data_disk_count}")417        # get the number of data disks already added to the vm418        assert isinstance(node.capability.disk.data_disk_count, int)419        current_data_disk_count = node.capability.disk.data_disk_count420        log.debug(f"current_data_disk_count: {current_data_disk_count}")421        # disks to be added to the vm422        disks_to_add = max_data_disk_count - current_data_disk_count423        # get partition info before adding data disks424        partitions_before_adding_disks = lsblk.get_disks(force_run=True)425        # add data disks426        log.debug(f"Adding {disks_to_add} managed disks")427        disks_added = disk.add_data_disk(disks_to_add, type, size)428        # verify that partition count is increased by disks_to_add429        # and the size of partition is correct430        partitons_after_adding_disks = lsblk.get_disks(force_run=True)431        added_partitions = [432            item433            for item in partitons_after_adding_disks434            if item not in partitions_before_adding_disks435        ]436        log.debug(f"added_partitions: {added_partitions}")437        assert_that(438            added_partitions, f"{disks_to_add} disks should be added"439        ).is_length(disks_to_add)440        for partition in added_partitions:441            assert_that(442                partition.size_in_gb,443                f"data disk {partition.name} size should be equal to {size} GB",444            ).is_equal_to(size)445        # remove data disks446        log.debug(f"Removing managed disks: {disks_added}")447        disk.remove_data_disk(disks_added)448        # verify that partition count is decreased by disks_to_add449        partition_after_removing_disk = lsblk.get_disks(force_run=True)450        added_partitions = [451            item452            for item in partitions_before_adding_disks453            if item not in partition_after_removing_disk454        ]455        assert_that(added_partitions, "data disks should not be present").is_length(0)456    def _get_managed_disk_id(self, identifier: str) -> str:457        return f"disk_{identifier}"458    def _get_mtab_mount_point_regex(self, mount_point: str) -> Pattern[str]:459        regex = re.compile(rf".*\s+\/dev\/(?P<partition>\D+).*\s+{mount_point}.*")...disks.py
Source:disks.py  
...40        type: schema.DiskType = schema.DiskType.StandardHDDLRS,41        size_in_gb: int = 20,42    ) -> List[str]:43        raise NotImplementedError44    def remove_data_disk(self, names: Optional[List[str]] = None) -> None:45        raise NotImplementedError46    def _initialize(self, *args: Any, **kwargs: Any) -> None:47        self.disks: List[str] = []48DiskEphemeral = partial(schema.DiskOptionSettings, disk_type=schema.DiskType.Ephemeral)49DiskPremiumSSDLRS = partial(50    schema.DiskOptionSettings, disk_type=schema.DiskType.PremiumSSDLRS51)52DiskStandardHDDLRS = partial(53    schema.DiskOptionSettings, disk_type=schema.DiskType.StandardHDDLRS54)55DiskStandardSSDLRS = partial(56    schema.DiskOptionSettings, disk_type=schema.DiskType.StandardSSDLRS...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
