Best Python code snippet using lisa_python
storage.py
Source:storage.py  
...85            supported_platform_type=[AZURE],86        ),87    )88    def verify_resource_disk_mtab_entry(self, log: Logger, node: RemoteNode) -> None:89        resource_disk_mount_point = get_resource_disk_mount_point(log, node)90        # os disk(root disk) is the entry with mount point `/' in the output91        # of `mount` command92        os_disk = (93            node.features[Disk]94            .get_partition_with_mount_point(self.os_disk_mount_point)95            .disk96        )97        mtab = node.tools[Cat].run("/etc/mtab").stdout98        resource_disk_from_mtab = get_matched_str(99            mtab, self._get_mtab_mount_point_regex(resource_disk_mount_point)100        )101        assert (102            resource_disk_from_mtab103        ), f"resource disk mountpoint not found {resource_disk_mount_point}"104        assert_that(105            resource_disk_from_mtab, "Resource disk should not be equal to os disk"106        ).is_not_equal_to(os_disk)107    @TestCaseMetadata(108        description="""109        This test will check that the swap is correctly configured on the VM.110        Steps:111        1. Check if swap file/partition is configured by checking the output of112        `swapon -s` and `lsblk`.113        2. Check swap status in `waagent.conf`.114        3. Verify that truth value in step 1 and step 2 match.115        """,116        priority=1,117        requirement=simple_requirement(supported_platform_type=[AZURE]),118    )119    def verify_swap(self, node: RemoteNode) -> None:120        is_swap_enabled_wa_agent = node.tools[Waagent].is_swap_enabled()121        is_swap_enabled_distro = node.tools[Swap].is_swap_enabled()122        assert_that(123            is_swap_enabled_distro,124            "swap configuration from waagent.conf and distro should match",125        ).is_equal_to(is_swap_enabled_wa_agent)126    @TestCaseMetadata(127        description="""128        This test will check that the file IO operations are working correctly129        Steps:130        1. Get the mount point for the resource disk. If `/var/log/cloud-init.log`131        file is present, mount location is `/mnt`, otherwise it is obtained from132        `ResourceDisk.MountPoint` entry in `waagent.conf` configuration file.133        2. Verify that resource disk is mounted from the output of `mount` command.134        3. Write a text file to the resource disk.135        4. Read the text file and verify that content is same.136        """,137        priority=1,138        requirement=simple_requirement(139            disk=AzureDiskOptionSettings(has_resource_disk=True),140            supported_platform_type=[AZURE],141        ),142    )143    def verify_resource_disk_io(self, log: Logger, node: RemoteNode) -> None:144        resource_disk_mount_point = get_resource_disk_mount_point(log, node)145        # verify that resource disk is mounted146        # function returns successfully if disk matching mount point is present147        node.features[Disk].get_partition_with_mount_point(resource_disk_mount_point)148        file_path = f"{resource_disk_mount_point}/sample.txt"149        original_text = "Writing to resource disk!!!"150        # write content to the file151        node.tools[Echo].write_to_file(152            original_text, node.get_pure_path(file_path), sudo=True153        )154        # read content from the file155        read_text = node.tools[Cat].read(file_path, force_run=True, sudo=True)156        assert_that(157            read_text,158            "content read from file should be equal to content written to file",...ResourceDiskUtil.py
Source:ResourceDiskUtil.py  
...109					tokens =Utils.HandlerUtil.HandlerUtility.split(self.logger, entry)110					#Return the 3rd column of this line111					return tokens[2] if len(tokens) > 2 else None112		return None113	def get_resource_disk_mount_point(self,option=1): # pylint: disable=R0912,R0914114		try:115			"""116			if option = 0 then partition will be returned eg sdb1117			if option = 1 then mount point will be returned eg /mnt/resource118			"""119			device = self.device_for_ide_port()120			if device is None:121				self.logger.log('unable to detect disk topology',True,'Error')122			if device is not None:123				partition = "{0}{1}".format(device,"1")  #assuming only one resourde disk partition124			else:125				partition=""126			self.logger.log("Resource disk partition: {0} ".format(partition),True)127			if(option==0):...common.py
Source:common.py  
1from pathlib import PurePosixPath2from lisa import Logger, RemoteNode3from lisa.operating_system import CentOs4from lisa.sut_orchestrator.azure.tools import Waagent5def get_resource_disk_mount_point(6    log: Logger,7    node: RemoteNode,8) -> str:9    # by default, cloudinit will use /mnt as mount point of resource disk10    # in CentOS, cloud.cfg.d/91-azure_datasource.cfg customize mount point as11    # /mnt/resource12    if (13        node.shell.exists(PurePosixPath("/var/log/cloud-init.log"))14        and node.shell.exists(PurePosixPath("/var/lib/cloud/instance"))15        and not isinstance(node.os, CentOs)16    ):17        log.debug("Disk handled by cloud-init.")18        mount_point = "/mnt"19    else:20        log.debug("Disk handled by waagent.")21        mount_point = node.tools[Waagent].get_resource_disk_mount_point()...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
