Best Python code snippet using lisa_python
node.py
Source:node.py  
...92        return self._shell is not None and self._shell.is_connected93    @property94    def local_log_path(self) -> Path:95        if not self._local_log_path:96            part_name = self._get_node_part_path()97            log_path = constants.RUN_LOCAL_LOG_PATH / self._base_part_path / part_name98            self._local_log_path = log_path99            self._local_log_path.mkdir(parents=True, exist_ok=True)100        return self._local_log_path101    @property102    def local_working_path(self) -> Path:103        if not self._local_working_path:104            part_name = self._get_node_part_path()105            self._local_working_path = (106                constants.RUN_LOCAL_WORKING_PATH / self._base_part_path / part_name107            )108            self._local_working_path.mkdir(parents=True, exist_ok=True)109        return self._local_working_path110    @property111    def working_path(self) -> PurePath:112        """113        The working path may be a remote path on remote node. It uses to put executable.114        """115        if not self._working_path:116            self._working_path = self.get_working_path()117            self.shell.mkdir(self._working_path, parents=True, exist_ok=True)118            self.log.debug(f"working path is: '{self._working_path}'")119        return self._working_path120    @property121    def nics(self) -> Nics:122        if self._nics is None:123            self._nics = Nics(self)124            self._nics.initialize()125        return self._nics126    @property127    def is_dirty(self) -> bool:128        return self._is_dirty129    @classmethod130    def create(131        cls,132        index: int,133        runbook: schema.Node,134        logger_name: str = "node",135        base_part_path: Optional[Path] = None,136        parent_logger: Optional[Logger] = None,137    ) -> Node:138        if not cls._factory:139            cls._factory = subclasses.Factory[Node](Node)140        node = cls._factory.create_by_runbook(141            index=index,142            runbook=runbook,143            logger_name=logger_name,144            base_part_path=base_part_path,145            parent_logger=parent_logger,146        )147        node.log.debug(148            f"created, type: '{node.__class__.__name__}', default: {runbook.is_default}"149        )150        return node151    def reboot(self, time_out: int = 300) -> None:152        self.tools[Reboot].reboot(time_out)153    def execute(154        self,155        cmd: str,156        shell: bool = False,157        sudo: bool = False,158        no_error_log: bool = False,159        no_info_log: bool = True,160        no_debug_log: bool = False,161        cwd: Optional[PurePath] = None,162        timeout: int = 600,163        update_envs: Optional[Dict[str, str]] = None,164        expected_exit_code: Optional[int] = None,165        expected_exit_code_failure_message: str = "",166    ) -> ExecutableResult:167        process = self.execute_async(168            cmd,169            shell=shell,170            sudo=sudo,171            no_error_log=no_error_log,172            no_info_log=no_info_log,173            no_debug_log=no_debug_log,174            cwd=cwd,175            update_envs=update_envs,176        )177        return process.wait_result(178            timeout=timeout,179            expected_exit_code=expected_exit_code,180            expected_exit_code_failure_message=expected_exit_code_failure_message,181        )182    def execute_async(183        self,184        cmd: str,185        shell: bool = False,186        sudo: bool = False,187        no_error_log: bool = False,188        no_info_log: bool = True,189        no_debug_log: bool = False,190        cwd: Optional[PurePath] = None,191        update_envs: Optional[Dict[str, str]] = None,192    ) -> Process:193        self.initialize()194        if sudo and not self.support_sudo:195            raise LisaException(196                f"node doesn't support [command] or [sudo], cannot execute: {cmd}"197            )198        return self._execute(199            cmd,200            shell=shell,201            sudo=sudo,202            no_error_log=no_error_log,203            no_info_log=no_info_log,204            no_debug_log=no_debug_log,205            cwd=cwd,206            update_envs=update_envs,207        )208    def cleanup(self) -> None:209        self.log.debug("cleaning up...")210        if hasattr(self, "_log_handler") and self._log_handler:211            remove_handler(self._log_handler, self.log)212            self._log_handler.close()213    def close(self) -> None:214        self.log.debug("closing node connection...")215        if self._shell:216            self._shell.close()217        if self._nics:218            self._nics = None219    def get_pure_path(self, path: str) -> PurePath:220        # spurplus doesn't support PurePath, so it needs to resolve by the221        # node's os here.222        if self.is_posix:223            return PurePosixPath(path)224        else:225            return PureWindowsPath(path)226    def get_case_working_path(self, case_unique_name: str) -> PurePath:227        working_path = self.working_path / "tests" / case_unique_name228        self.shell.mkdir(path=working_path, exist_ok=True)229        return working_path230    def capture_system_information(self, name: str = "") -> None:231        """232        download key files or outputs of commands to a subfolder of the node.233        """234        saved_path = self.local_log_path / f"{get_datetime_path()}_captured_{name}"235        saved_path.mkdir(parents=True, exist_ok=True)236        self.log.debug(f"capturing system information to {saved_path}.")237        self.os.capture_system_information(saved_path)238    def find_partition_with_freespace(self, size_in_gb: int) -> str:239        if self.os.is_windows:240            raise NotImplementedError(241                (242                    "find_partition_with_freespace was called on a Windows node, "243                    "this function is not implemented for Windows"244                )245            )246        mount = self.tools[Mount]247        lsblk = self.tools[Lsblk]248        disks = lsblk.get_disks()249        # find a disk/partition with required space250        for disk in disks:251            # we do not write on the os disk252            if disk.is_os_disk:253                continue254            # if the disk contains partition, check the partitions255            if len(disk.partitions) > 0:256                for partition in disk.partitions:257                    if not partition.size_in_gb >= size_in_gb:258                        continue259                    # mount partition if it is not mounted260                    partition_name = partition.name261                    if not partition.is_mounted:262                        mountpoint = f"{PATH_REMOTE_ROOT}/{partition_name}"263                        mount.mount(partition.device_name, mountpoint, format=True)264                    else:265                        mountpoint = partition.mountpoint266                    # some distro use absolute path wrt to the root, so we need to267                    # requery the mount point after mounting268                    return lsblk.find_mountpoint_by_volume_name(269                        partition_name, force_run=True270                    )271            else:272                if not disk.size_in_gb >= size_in_gb:273                    continue274                # mount the disk if it isn't mounted275                disk_name = disk.name276                if not disk.is_mounted:277                    mountpoint = f"{PATH_REMOTE_ROOT}/{disk_name}"278                    self.tools[Mkfs].format_disk(disk.device_name, FileSystem.ext4)279                    mount.mount(disk.device_name, mountpoint, format=True)280                else:281                    mountpoint = disk.mountpoint282                # some distro use absolute path wrt to the root, so we need to requery283                # the mount point after mounting284                return lsblk.find_mountpoint_by_volume_name(disk_name, force_run=True)285        raise LisaException(286            f"No partition with Required disk space of {size_in_gb}GB found"287        )288    def get_working_path(self) -> PurePath:289        """290        It returns the path with expanded environment variables, but not create291        the folder. So, it can be used to locate a relative path from it, and292        not create extra folders.293        """294        raise NotImplementedError()295    def mark_dirty(self) -> None:296        self.log.debug("mark node to dirty")297        self._is_dirty = True298    def test_connection(self) -> bool:299        try:300            self.execute("date")301            return True302        except Exception as identifier:303            self.log.debug(f"cannot access VM {self.name}, error is {identifier}")304        return False305    def _initialize(self, *args: Any, **kwargs: Any) -> None:306        if not hasattr(self, "_log_handler"):307            self._log_handler = create_file_handler(308                self.local_log_path / "node.log", self.log309            )310        self.log.info(f"initializing node '{self.name}' {self}")311        self.shell.initialize()312        self.os: OperatingSystem = OperatingSystem.create(self)313        self.capture_system_information("started")314    def _execute(315        self,316        cmd: str,317        shell: bool = False,318        sudo: bool = False,319        no_error_log: bool = False,320        no_info_log: bool = False,321        no_debug_log: bool = False,322        cwd: Optional[PurePath] = None,323        update_envs: Optional[Dict[str, str]] = None,324    ) -> Process:325        cmd_id = str(randint(0, 10000))326        process = Process(cmd_id, self.shell, parent_logger=self.log)327        process.start(328            cmd,329            shell=shell,330            sudo=sudo,331            no_error_log=no_error_log,332            no_info_log=no_info_log,333            no_debug_log=no_debug_log,334            cwd=cwd,335            update_envs=update_envs,336        )337        return process338    def _get_node_part_path(self) -> PurePath:339        path_name = self.name340        if not path_name:341            if self.index:342                index = self.index343            else:344                index = randint(0, 10000)345            path_name = f"node-{index}"346        return PurePath(path_name)347class RemoteNode(Node):348    def __repr__(self) -> str:349        # it's used to handle UT failure.350        if hasattr(self, "_connection_info"):351            return str(self._connection_info)352        return ""...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
