Best Python code snippet using lisa_python
operating_system.py
Source:operating_system.py  
...253        return cls.__name__254    @classmethod255    def name_pattern(cls) -> Pattern[str]:256        return re.compile(f"^{cls.type_name()}$")257    def replace_boot_kernel(self, kernel_version: str) -> None:258        raise NotImplementedError("update boot entry is not implemented")259    def get_kernel_information(self, force_run: bool = False) -> KernelInformation:260        uname = self._node.tools[Uname]261        uname_result = uname.get_linux_information(force_run=force_run)262        parts: List[str] = [str(x) for x in uname_result.kernel_version]263        kernel_information = KernelInformation(264            version=uname_result.kernel_version,265            raw_version=uname_result.kernel_version_raw,266            hardware_platform=uname_result.hardware_platform,267            operating_system=uname_result.operating_system,268            version_parts=parts,269        )270        return kernel_information271    def install_packages(272        self,273        packages: Union[274            str,275            Tool,276            Type[Tool],277            Sequence[Union[str, Tool, Type[Tool]]],278        ],279        signed: bool = False,280        timeout: int = 1200,281        extra_args: Optional[List[str]] = None,282    ) -> None:283        package_names = self._get_package_list(packages)284        self._install_packages(package_names, signed, timeout, extra_args)285    def package_exists(self, package: Union[str, Tool, Type[Tool]]) -> bool:286        """287        Query if a package/tool is installed on the node.288        Return Value - bool289        """290        package_name = self.__resolve_package_name(package)291        return self._package_exists(package_name)292    def is_package_in_repo(self, package: Union[str, Tool, Type[Tool]]) -> bool:293        """294        Query if a package/tool exists in the repo295        Return Value - bool296        """297        package_name = self.__resolve_package_name(package)298        if 
self._first_time_installation:299            self._initialize_package_installation()300            self._first_time_installation = False301        return self._is_package_in_repo(package_name)302    def update_packages(303        self,304        packages: Union[str, Tool, Type[Tool], Sequence[Union[str, Tool, Type[Tool]]]],305    ) -> None:306        package_names = self._get_package_list(packages)307        self._update_packages(package_names)308    def capture_system_information(self, saved_path: Path) -> None:309        # avoid to involve node, it's ok if some command doesn't exist.310        self._node.execute("uname -vrmo").save_stdout_to_file(saved_path / "uname.txt")311        self._node.execute(312            "uptime -s || last reboot -F | head -1 | awk '{print $9,$6,$7,$8}'",313            shell=True,314        ).save_stdout_to_file(saved_path / "uptime.txt")315        self._node.execute("modinfo hv_netvsc").save_stdout_to_file(316            saved_path / "modinfo-hv_netvsc.txt"317        )318        try:319            self._node.shell.copy_back(320                self._node.get_pure_path("/etc/os-release"),321                saved_path / "os-release.txt",322            )323        except FileNotFoundError:324            self._log.debug("File /etc/os-release doesn't exist.")325    def get_package_information(326        self, package_name: str, use_cached: bool = True327    ) -> VersionInfo:328        found = self._packages.get(package_name, None)329        if found and use_cached:330            return found331        return self._get_package_information(package_name)332    def get_repositories(self) -> List[RepositoryInfo]:333        raise NotImplementedError("get_repositories is not implemented")334    def _process_extra_package_args(self, extra_args: Optional[List[str]]) -> str:335        if extra_args:336            add_args = " ".join(extra_args)337        else:338            add_args = ""339        return add_args340    def _install_packages(341       
 self,342        packages: List[str],343        signed: bool = True,344        timeout: int = 600,345        extra_args: Optional[List[str]] = None,346    ) -> None:347        raise NotImplementedError()348    def _update_packages(self, packages: Optional[List[str]] = None) -> None:349        raise NotImplementedError()350    def _package_exists(self, package: str) -> bool:351        raise NotImplementedError()352    def _is_package_in_repo(self, package: str) -> bool:353        raise NotImplementedError()354    def _initialize_package_installation(self) -> None:355        # sub os can override it, but it's optional356        pass357    def _get_package_information(self, package_name: str) -> VersionInfo:358        raise NotImplementedError()359    def _get_version_info_from_named_regex_match(360        self, package_name: str, named_matches: Match[str]361    ) -> VersionInfo:362        essential_matches = ["major", "minor", "build"]363        # verify all essential keys are in our match dict364        assert_that(365            all(map(lambda x: x in named_matches.groupdict().keys(), essential_matches))366        ).described_as(367            "VersionInfo fetch could not identify all required parameters."368        ).is_true()369        # fill in 'patch' version if it's missing370        patch_match = named_matches.group("patch")371        if not patch_match:372            patch_match = "0"373        major_match = named_matches.group("major")374        minor_match = named_matches.group("minor")375        build_match = named_matches.group("build")376        major, minor, patch = map(377            int,378            [major_match, minor_match, patch_match],379        )380        build_match = named_matches.group("build")381        self._node.log.debug(382            f"Found {package_name} version "383            f"{major_match}.{minor_match}.{patch_match}-{build_match}"384        )385        return VersionInfo(major, minor, patch, build=build_match)386    def 
_cache_and_return_version_info(387        self, package_name: str, info: VersionInfo388    ) -> VersionInfo:389        self._packages[package_name] = info390        return info391    def _get_information(self) -> OsInformation:392        # try to set version info from /etc/os-release.393        cat = self._node.tools[Cat]394        cmd_result = cat.run(395            "/etc/os-release",396            expected_exit_code=0,397            expected_exit_code_failure_message="error on get os information",398        )399        vendor: str = ""400        release: str = ""401        codename: str = ""402        full_version: str = ""403        for row in cmd_result.stdout.splitlines():404            os_release_info = self._os_info_pattern.match(row)405            if not os_release_info:406                continue407            if os_release_info.group("name") == "NAME":408                vendor = os_release_info.group("value")409            elif os_release_info.group("name") == "VERSION_ID":410                release = os_release_info.group("value")411            elif os_release_info.group("name") == "VERSION":412                codename = get_matched_str(413                    os_release_info.group("value"),414                    self._distro_codename_pattern,415                )416            elif os_release_info.group("name") == "PRETTY_NAME":417                full_version = os_release_info.group("value")418        if vendor == "":419            raise LisaException("OS vendor information not found")420        if release == "":421            raise LisaException("OS release information not found")422        information = OsInformation(423            version=self._parse_version(release),424            vendor=vendor,425            release=release,426            codename=codename,427            full_version=full_version,428        )429        return information430    def _get_package_list(431        self,432        packages: Union[433            str,434            Tool,435  
          Type[Tool],436            Sequence[Union[str, Tool, Type[Tool]]],437        ],438    ) -> List[str]:439        package_names: List[str] = []440        if isinstance(packages, (str, Tool, type)):441            packages = [packages]442        package_names = [self.__resolve_package_name(item) for item in packages]443        if self._first_time_installation:444            self._first_time_installation = False445            self._initialize_package_installation()446        return package_names447    def _install_package_from_url(448        self,449        package_url: str,450        package_name: str = "",451        signed: bool = True,452        timeout: int = 600,453    ) -> None:454        """455        Used if the package to be installed needs to be downloaded from a url first.456        """457        # when package is URL, download the package first at the working path.458        wget_tool = self._node.tools[Wget]459        pkg = wget_tool.get(package_url, str(self._node.working_path), package_name)460        self.install_packages(pkg, signed, timeout)461    def wait_running_process(self, process_name: str, timeout: int = 5) -> None:462        # by default, wait for 5 minutes463        timeout = 60 * timeout464        timer = create_timer()465        while timeout > timer.elapsed(False):466            cmd_result = self._node.execute(f"pidof {process_name}")467            if cmd_result.exit_code == 1:468                # not found dpkg or zypper process, it's ok to exit.469                break470            time.sleep(1)471        if timeout < timer.elapsed():472            raise Exception(f"timeout to wait previous {process_name} process stop.")473    def __resolve_package_name(self, package: Union[str, Tool, Type[Tool]]) -> str:474        """475        A package can be a string or a tool or a type of tool.476        Resolve it to a standard package_name so it can be installed.477        """478        if isinstance(package, str):479            
package_name = package480        elif isinstance(package, Tool):481            package_name = package.package_name482        else:483            assert isinstance(package, type), f"actual:{type(package)}"484            # Create a temp object, it doesn't query.485            # So they can be queried together.486            tool = package.create(self._node)487            package_name = tool.package_name488        return package_name489class BSD(Posix):490    ...491class MacOS(Posix):492    @classmethod493    def name_pattern(cls) -> Pattern[str]:494        return re.compile("^Darwin$")495class Linux(Posix):496    ...497class CoreOs(Linux):498    @classmethod499    def name_pattern(cls) -> Pattern[str]:500        return re.compile("^coreos|Flatcar|flatcar$")501@dataclass502# `apt-get update` repolist is of the form `<status>:<id> <uri> <name> <metadata>`503# Example:504# Get:5 http://azure.archive.ubuntu.com/ubuntu focal-updates/main amd64 Packages [1298 kB] # noqa: E501505class DebianRepositoryInfo(RepositoryInfo):506    # status for the repository. Examples: `Hit`, `Get`507    status: str508    # id for the repository. Examples : 1, 2509    id: str510    # uri for the repository. Example: `http://azure.archive.ubuntu.com/ubuntu`511    uri: str512    # metadata for the repository. 
Example: `amd64 Packages [1298 kB]`513    metadata: str514class Debian(Linux):515    # Get:5 http://azure.archive.ubuntu.com/ubuntu focal-updates/main amd64 Packages [1298 kB] # noqa: E501516    _debian_repository_info_pattern = re.compile(517        r"(?P<status>\S+):(?P<id>\d+)\s+(?P<uri>\S+)\s+(?P<name>\S+)"518        r"\s+(?P<metadata>.*)\s*"519    )520    """ Package: dpdk521        Version: 20.11.3-0ubuntu1~backport20.04-202111041420~ubuntu20.04.1522        Version: 1:2.25.1-1ubuntu3.2523    """524    _debian_package_information_regex = re.compile(525        r"Package: ([a-zA-Z0-9:_\-\.]+)\r?\n"  # package name group526        r"Version: ([a-zA-Z0-9:_\-\.~+]+)\r?\n"  # version number group527    )528    _debian_version_splitter_regex = re.compile(529        r"([0-9]+:)?"  # some examples have a mystery number followed by a ':' (git)530        r"(?P<major>[0-9]+)\."  # major531        r"(?P<minor>[0-9]+)\."  # minor532        r"(?P<patch>[0-9]+)"  # patch533        r"-(?P<build>[a-zA-Z0-9-_\.~+]+)"  # build534    )535    # apt-cache policy git536    # git:537    #   Installed: 1:2.17.1-1ubuntu0.9538    #   Candidate: 1:2.17.1-1ubuntu0.9539    #   Version table:540    #  *** 1:2.17.1-1ubuntu0.9 500541    #         500 http://azure.archive.ubuntu.com/ubuntu bionic-updates/main amd64 Packages # noqa: E501542    #         500 http://security.ubuntu.com/ubuntu bionic-security/main amd64 Packages # noqa: E501543    #         100 /var/lib/dpkg/status544    #      1:2.17.0-1ubuntu1 500545    #         500 http://azure.archive.ubuntu.com/ubuntu bionic/main amd64 Packages546    # apt-cache policy mock547    # mock:548    #   Installed: (none)549    #   Candidate: 1.3.2-2550    #   Version table:551    #      1.3.2-2 500552    #         500 http://azure.archive.ubuntu.com/ubuntu bionic/universe amd64 Packages # noqa: E501553    # apt-cache policy test554    # N: Unable to locate package test555    _package_candidate_pattern = re.compile(556        r"([\w\W]*?)(Candidate: 
\(none\)|Unable to locate package.*)", re.M557    )558    # E: The repository 'http://azure.archive.ubuntu.com/ubuntu groovy Release' no longer has a Release file. # noqa: E501559    _repo_not_exist_pattern = re.compile("does not have a Release file", re.M)560    @classmethod561    def name_pattern(cls) -> Pattern[str]:562        return re.compile("^debian|Forcepoint|Kali$")563    def get_apt_error(self, stdout: str) -> List[str]:564        error_lines: List[str] = []565        for line in stdout.splitlines(keepends=False):566            if line.startswith("E: "):567                error_lines.append(line)568        return error_lines569    def _get_package_information(self, package_name: str) -> VersionInfo:570        # run update of package info571        apt_info = self._node.execute(572            f"apt show {package_name}",573            expected_exit_code=0,574            expected_exit_code_failure_message=(575                f"Could not find package information for package {package_name}"576            ),577        )578        match = self._debian_package_information_regex.search(apt_info.stdout)579        if not match:580            raise LisaException(581                "Package information parsing could not find regex match "582                f" for {package_name} using regex "583                f"{self._debian_package_information_regex.pattern}"584            )585        version_str = match.group(2)586        match = self._debian_version_splitter_regex.search(version_str)587        if not match:588            raise LisaException(589                f"Could not parse version info: {version_str} "590                "for package {package_name}"591            )592        self._node.log.debug(f"Attempting to parse version string: {version_str}")593        version_info = self._get_version_info_from_named_regex_match(594            package_name, match595        )596        return self._cache_and_return_version_info(package_name, version_info)597    def 
wait_running_package_process(self) -> None:598        is_first_time: bool = True599        # wait for 10 minutes600        timeout = 60 * 10601        timer = create_timer()602        while timeout > timer.elapsed(False):603            # fix the dpkg, in case it's broken.604            dpkg_result = self._node.execute(605                "dpkg --force-all --configure -a", sudo=True606            )607            pidof_result = self._node.execute("pidof dpkg dpkg-deb")608            if dpkg_result.exit_code == 0 and pidof_result.exit_code == 1:609                # not found dpkg process, it's ok to exit.610                break611            if is_first_time:612                is_first_time = False613                self._log.debug("found system dpkg process, waiting it...")614            time.sleep(1)615        if timeout < timer.elapsed():616            raise Exception("timeout to wait previous dpkg process stop.")617    def get_repositories(self) -> List[RepositoryInfo]:618        self._initialize_package_installation()619        repo_list_str = self._node.execute("apt-get update", sudo=True).stdout620        repositories: List[RepositoryInfo] = []621        for line in repo_list_str.splitlines():622            matched = self._debian_repository_info_pattern.search(line)623            if matched:624                repositories.append(625                    DebianRepositoryInfo(626                        name=matched.group("name"),627                        status=matched.group("status"),628                        id=matched.group("id"),629                        uri=matched.group("uri"),630                        metadata=matched.group("metadata"),631                    )632                )633        return repositories634    @retry(tries=10, delay=5)635    def add_repository(636        self,637        repo: str,638        no_gpgcheck: bool = True,639        repo_name: Optional[str] = None,640        keys_location: Optional[List[str]] = None,641    ) -> None:642    
    if keys_location:643            for key_location in keys_location:644                wget = self._node.tools[Wget]645                key_file_path = wget.get(646                    url=key_location,647                    file_path=str(self._node.working_path),648                    force_run=True,649                )650                self._node.execute(651                    cmd=f"apt-key add {key_file_path}",652                    sudo=True,653                    expected_exit_code=0,654                    expected_exit_code_failure_message="fail to add apt key",655                )656        # This command will trigger apt update too, so it doesn't need to update657        # repos again.658        self._node.execute(659            cmd=f'apt-add-repository -y "{repo}"',660            sudo=True,661            expected_exit_code=0,662            expected_exit_code_failure_message="fail to add repository",663        )664        # apt update will not be triggered on Debian during add repo665        if type(self._node.os) == Debian:666            self._node.execute("apt-get update", sudo=True)667    @retry(tries=10, delay=5)668    def _initialize_package_installation(self) -> None:669        # wait running system package process.670        self.wait_running_package_process()671        result = self._node.execute("apt-get update", sudo=True)672        if self._repo_not_exist_pattern.search(result.stdout):673            raise RepoNotExistException(self._node.os)674        result.assert_exit_code(message="\n".join(self.get_apt_error(result.stdout)))675    @retry(tries=10, delay=5)676    def _install_packages(677        self,678        packages: List[str],679        signed: bool = True,680        timeout: int = 600,681        extra_args: Optional[List[str]] = None,682    ) -> None:683        file_packages = []684        for index, package in enumerate(packages):685            if package.endswith(".deb"):686                # If the package is a .deb file then it would 
first need to be unpacked.687                # using dpkg command before installing it like other packages.688                file_packages.append(package)689                package = Path(package).stem690                packages[index] = package691        add_args = self._process_extra_package_args(extra_args)692        command = (693            f"DEBIAN_FRONTEND=noninteractive apt-get {add_args} "694            f"-y install {' '.join(packages)}"695        )696        if not signed:697            command += " --allow-unauthenticated"698        self.wait_running_package_process()699        if file_packages:700            self._node.execute(701                f"dpkg -i {' '.join(file_packages)}", sudo=True, timeout=timeout702            )703            # after install package, need update the repo704            self._initialize_package_installation()705        install_result = self._node.execute(706            command, shell=True, sudo=True, timeout=timeout707        )708        # get error lines.709        install_result.assert_exit_code(710            0,711            f"Failed to install {packages}, "712            f"please check the package name and repo are correct or not.\n"713            + "\n".join(self.get_apt_error(install_result.stdout))714            + "\n",715        )716    def _package_exists(self, package: str) -> bool:717        command = "dpkg --get-selections"718        result = self._node.execute(command, sudo=True, shell=True)719        package_pattern = re.compile(f"{package}([ \t]+)install")720        # Not installed package not shown in the output721        # Uninstall package will show as deinstall722        # vim                                             deinstall723        # vim-common                                      install724        if len(list(filter(package_pattern.match, result.stdout.splitlines()))) == 1:725            return True726        return False727    def _is_package_in_repo(self, package: str) -> bool:728        
command = f"apt-cache policy {package}"729        result = self._node.execute(command, sudo=True, shell=True)730        matched = get_matched_str(result.stdout, self._package_candidate_pattern)731        if matched:732            return False733        return True734    def _get_information(self) -> OsInformation:735        # try to set version info from /etc/os-release.736        cat = self._node.tools[Cat]737        cmd_result = cat.run(738            "/etc/os-release",739            expected_exit_code=0,740            expected_exit_code_failure_message="error on get os information",741        )742        vendor: str = ""743        release: str = ""744        codename: str = ""745        full_version: str = ""746        for row in cmd_result.stdout.splitlines():747            os_release_info = super()._os_info_pattern.match(row)748            if not os_release_info:749                continue750            if os_release_info.group("name") == "NAME":751                vendor = os_release_info.group("value")752            elif os_release_info.group("name") == "VERSION":753                codename = get_matched_str(754                    os_release_info.group("value"),755                    super()._distro_codename_pattern,756                )757            elif os_release_info.group("name") == "PRETTY_NAME":758                full_version = os_release_info.group("value")759        # version return from /etc/os-release is integer in debian760        # so get the precise version from /etc/debian_version761        # e.g.762        # marketplace image - credativ debian 9-backports 9.20190313.0763        # version from /etc/os-release is 9764        # version from /etc/debian_version is 9.8765        # marketplace image - debian debian-10 10-backports-gen2 0.20210201.535766        # version from /etc/os-release is 10767        # version from /etc/debian_version is 10.7768        cmd_result = cat.run(769            "/etc/debian_version",770            
expected_exit_code=0,771            expected_exit_code_failure_message="error on get debian version",772        )773        release = cmd_result.stdout774        if vendor == "":775            raise LisaException("OS vendor information not found")776        if release == "":777            raise LisaException("OS release information not found")778        information = OsInformation(779            version=self._parse_version(release),780            vendor=vendor,781            release=release,782            codename=codename,783            full_version=full_version,784        )785        return information786    def _update_packages(self, packages: Optional[List[str]] = None) -> None:787        command = (788            "DEBIAN_FRONTEND=noninteractive apt-get upgrade -y "789            '-o Dpkg::Options::="--force-confdef" -o Dpkg::Options::="--force-confold" '790        )791        if packages:792            command += " ".join(packages)793        self._node.execute(command, sudo=True, timeout=3600)794class Ubuntu(Debian):795    __lsb_os_info_pattern = re.compile(796        r"^(?P<name>.*):(\s+)(?P<value>.*?)?$", re.MULTILINE797    )798    # gnulinux-5.11.0-1011-azure-advanced-3fdd2548-1430-450b-b16d-9191404598fb799    # prefix: gnulinux800    # postfix: advanced-3fdd2548-1430-450b-b16d-9191404598fb801    __menu_id_parts_pattern = re.compile(802        r"^(?P<prefix>.*?)-.*-(?P<postfix>.*?-.*?-.*?-.*?-.*?-.*?)?$"803    )804    @classmethod805    def name_pattern(cls) -> Pattern[str]:806        return re.compile("^Ubuntu|ubuntu$")807    def replace_boot_kernel(self, kernel_version: str) -> None:808        # set installed kernel to default809        #810        # get boot entry id811        # positive example:812        #         menuentry 'Ubuntu, with Linux 5.11.0-1011-azure' --class ubuntu813        # --class gnu-linux --class gnu --class os $menuentry_id_option814        # 'gnulinux-5.11.0-1011-azure-advanced-3fdd2548-1430-450b-b16d-9191404598fb' {815        #816  
      # negative example:817        #         menuentry 'Ubuntu, with Linux 5.11.0-1011-azure (recovery mode)'818        # --class ubuntu --class gnu-linux --class gnu --class os $menuentry_id_option819        # 'gnulinux-5.11.0-1011-azure-recovery-3fdd2548-1430-450b-b16d-9191404598fb' {820        cat = self._node.tools[Cat]821        menu_id_pattern = re.compile(822            r"^.*?menuentry '.*?(?:"823            + kernel_version824            + r"[^ ]*?)(?<! \(recovery mode\))' "825            r".*?\$menuentry_id_option .*?'(?P<menu_id>.*)'.*$",826            re.M,827        )828        result = cat.run("/boot/grub/grub.cfg", sudo=True)829        submenu_id = get_matched_str(result.stdout, menu_id_pattern)830        assert submenu_id, (831            f"cannot find sub menu id from grub config by pattern: "832            f"{menu_id_pattern.pattern}"833        )834        self._log.debug(f"matched submenu_id: {submenu_id}")835        # get first level menu id in boot menu836        # input is the sub menu id like:837        # gnulinux-5.11.0-1011-azure-advanced-3fdd2548-1430-450b-b16d-9191404598fb838        # output is,839        # gnulinux-advanced-3fdd2548-1430-450b-b16d-9191404598fb840        menu_id = self.__menu_id_parts_pattern.sub(841            r"\g<prefix>-\g<postfix>", submenu_id842        )843        assert menu_id, f"cannot composite menu id from {submenu_id}"844        # composite boot menu in grub845        menu_entry = f"{menu_id}>{submenu_id}"846        self._log.debug(f"composited menu_entry: {menu_entry}")847        self._replace_default_entry(menu_entry)848        self._node.execute("update-grub", sudo=True)849        try:850            # install tool packages851            self.install_packages(852                [853                    f"linux-tools-{kernel_version}-azure",854                    f"linux-cloud-tools-{kernel_version}-azure",855                    f"linux-headers-{kernel_version}-azure",856                ]857            )858    
    except Exception as identifier:859            self._log.debug(860                f"ignorable error on install packages after replaced kernel: "861                f"{identifier}"862            )863    def _get_information(self) -> OsInformation:864        cmd_result = self._node.execute(865            cmd="lsb_release -a",866            shell=True,867            no_error_log=True,868            expected_exit_code=0,869            expected_exit_code_failure_message="error on get os information",870        )871        assert cmd_result.stdout, "not found os information from 'lsb_release -a'"872        for row in cmd_result.stdout.splitlines():873            os_release_info = self.__lsb_os_info_pattern.match(row)874            if os_release_info:875                if os_release_info.group("name") == "Distributor ID":876                    vendor = os_release_info.group("value")877                elif os_release_info.group("name") == "Release":878                    release = os_release_info.group("value")879                elif os_release_info.group("name") == "Codename":880                    codename = os_release_info.group("value")881                elif os_release_info.group("name") == "Description":882                    full_version = os_release_info.group("value")883        if vendor == "":884            raise LisaException("OS vendor information not found")885        if release == "":886            raise LisaException("OS release information not found")887        information = OsInformation(888            version=self._parse_version(release),889            vendor=vendor,890            release=release,891            codename=codename,892            full_version=full_version,893        )894        return information895    def _replace_default_entry(self, entry: str) -> None:896        self._log.debug(f"set boot entry to: {entry}")897        sed = self._node.tools[Sed]898        sed.substitute(899            regexp="GRUB_DEFAULT=.*",900            
replacement=f"GRUB_DEFAULT='{entry}'",901            file="/etc/default/grub",902            sudo=True,903        )904        # output to log for troubleshooting905        cat = self._node.tools[Cat]906        cat.run("/etc/default/grub")907class FreeBSD(BSD):908    @retry(tries=10, delay=5)909    def _install_packages(910        self,911        packages: List[str],912        signed: bool = True,913        timeout: int = 600,914        extra_args: Optional[List[str]] = None,915    ) -> None:916        if self._first_time_installation:917            self._initialize_package_installation()918        self._first_time_installation = False919        command = f"env ASSUME_ALWAYS_YES=yes pkg install -y {' '.join(packages)}"920        install_result = self._node.execute(921            command, shell=True, sudo=True, timeout=timeout922        )923        # get error lines.924        install_result.assert_exit_code(925            0,926            f"Failed to install {packages}, "927            f"please check the package name and repo are correct or not.\n",928        )929    @retry(tries=10, delay=5)930    def _initialize_package_installation(self) -> None:931        result = self._node.execute("pkg update", sudo=True)932        result.assert_exit_code(message="fail to run pkg update")933class OpenBSD(BSD):934    ...935@dataclass936# dnf repolist is of the form `<id> <name>`937# Example:938# microsoft-azure-rhel8-eus  Microsoft Azure RPMs for RHEL8 Extended Update Support939class RPMRepositoryInfo(RepositoryInfo):940    # id for the repository, for example: microsoft-azure-rhel8-eus941    id: str942# Linux distros that use RPM.943class RPMDistro(Linux):944    # microsoft-azure-rhel8-eus  Microsoft Azure RPMs for RHEL8 Extended Update Support945    _rpm_repository_info_pattern = re.compile(r"(?P<id>\S+)\s+(?P<name>\S.*\S)\s*")946    # ex: dpdk-20.11-3.el8.x86_64 or dpdk-18.11.8-1.el7_8.x86_64947    _rpm_version_splitter_regex = re.compile(948        
r"(?P<package_name>[a-zA-Z0-9\-_]+)-"949        r"(?P<major>[0-9]+)\."950        r"(?P<minor>[0-9]+)\.?"951        r"(?P<patch>[0-9]+)?"952        r"(?P<build>-[a-zA-Z0-9-_\.]+)?"953    )954    def get_repositories(self) -> List[RepositoryInfo]:955        if self._first_time_installation:956            self._initialize_package_installation()957            self._first_time_installation = False958        repo_list_str = self._node.execute(959            f"{self._dnf_tool()} repolist", sudo=True960        ).stdout.splitlines()961        # skip to the first entry in the output962        for index, repo_str in enumerate(repo_list_str):963            if repo_str.startswith("repo id"):964                header_index = index965                break966        repo_list_str = repo_list_str[header_index + 1 :]967        repositories: List[RepositoryInfo] = []968        for line in repo_list_str:969            repo_info = self._rpm_repository_info_pattern.search(line)970            if repo_info:971                repositories.append(972                    RPMRepositoryInfo(973                        name=repo_info.group("name"), id=repo_info.group("id").lower()974                    )975                )976        return repositories977    def add_repository(978        self,979        repo: str,980        no_gpgcheck: bool = True,981        repo_name: Optional[str] = None,982        keys_location: Optional[List[str]] = None,983    ) -> None:984        self._node.tools[YumConfigManager].add_repository(repo, no_gpgcheck)985    def _get_package_information(self, package_name: str) -> VersionInfo:986        rpm_info = self._node.execute(987            f"rpm -q {package_name}",988            expected_exit_code=0,989            expected_exit_code_failure_message=(990                f"Could not find package information for package {package_name}"991            ),992        )993        # rpm package should be of format (package_name)-(version)994        matches = 
self._rpm_version_splitter_regex.search(rpm_info.stdout)995        if not matches:996            raise LisaException(997                f"Could not parse package version {rpm_info} for {package_name}"998            )999        self._node.log.debug(f"Attempting to parse version string: {rpm_info.stdout}")1000        version_info = self._get_version_info_from_named_regex_match(1001            package_name, matches1002        )1003        return self._cache_and_return_version_info(package_name, version_info)1004    def _install_packages(1005        self,1006        packages: List[str],1007        signed: bool = True,1008        timeout: int = 600,1009        extra_args: Optional[List[str]] = None,1010    ) -> None:1011        add_args = self._process_extra_package_args(extra_args)1012        command = f"{self._dnf_tool()} install {add_args} -y {' '.join(packages)}"1013        if not signed:1014            command += " --nogpgcheck"1015        self._node.execute(1016            command,1017            shell=True,1018            sudo=True,1019            timeout=timeout,1020            expected_exit_code=0,1021            expected_exit_code_failure_message=f"Failed to install {packages}.",1022        )1023        self._log.debug(f"{packages} is/are installed successfully.")1024    def _package_exists(self, package: str) -> bool:1025        command = f"{self._dnf_tool()} list installed {package}"1026        result = self._node.execute(command, sudo=True)1027        if result.exit_code == 0:1028            for row in result.stdout.splitlines():1029                if package in row:1030                    return True1031        return False1032    def _is_package_in_repo(self, package: str) -> bool:1033        command = f"{self._dnf_tool()} list {package} -y"1034        result = self._node.execute(command, sudo=True, shell=True)1035        return 0 == result.exit_code1036    def _dnf_tool(self) -> str:1037        return "dnf"1038    def _update_packages(self, packages: 
Optional[List[str]] = None) -> None:1039        command = f"{self._dnf_tool()} -y --nogpgcheck update "1040        if packages:1041            command += " ".join(packages)1042        self._node.execute(command, sudo=True, timeout=3600)1043class Fedora(RPMDistro):1044    # Red Hat Enterprise Linux Server 7.8 (Maipo) => 7.81045    _fedora_release_pattern_version = re.compile(r"^.*release\s+([0-9\.]+).*$")1046    # 305.40.1.el8_4.x86_641047    # 240.el8.x86_641048    __kernel_version_parts_pattern = re.compile(1049        r"^(?P<part1>\d+)\.(?P<part2>\d+)?\.?(?P<part3>\d+)?\.?"1050        r"(?P<distro>.*?)\.(?P<platform>.*?)$"1051    )1052    @classmethod1053    def name_pattern(cls) -> Pattern[str]:1054        return re.compile("^Fedora|fedora$")1055    def get_kernel_information(self, force_run: bool = False) -> KernelInformation:1056        kernel_information = super().get_kernel_information(force_run)1057        # original parts: version_parts=['4', '18', '0', '305.40.1.el8_4.x86_64', '']1058        # target parts: version_parts=['4', '18', '0', '305', '40', '1', 'el8_4',1059        #   'x86_64']1060        groups = find_group_in_lines(1061            kernel_information.version_parts[3], self.__kernel_version_parts_pattern1062        )1063        new_parts = kernel_information.version_parts[:3]1064        # the default '1' is trying to build a meaningful Redhat version number.1065        new_parts.extend(1066            [1067                groups["part1"],1068                groups["part2"],1069                groups["part3"],1070                groups["distro"],1071                groups["platform"],1072            ]1073        )1074        for index, part in enumerate(new_parts):1075            if part is None:1076                new_parts[index] = ""1077        kernel_information.version_parts = new_parts1078        return kernel_information1079    def install_epel(self) -> None:1080        # Extra Packages for Enterprise Linux (EPEL) is a special interest 
group1081        # (SIG) from the Fedora Project that provides a set of additional packages1082        # for RHEL (and CentOS, and others) from the Fedora sources.1083        major = self._node.os.information.version.major1084        assert_that(major).described_as(1085            "Fedora/RedHat version must be greater than 7"1086        ).is_greater_than_or_equal_to(7)1087        epel_release_rpm_name = f"epel-release-latest-{major}.noarch.rpm"1088        self.install_packages(1089            f"https://dl.fedoraproject.org/pub/epel/{epel_release_rpm_name}"1090        )1091        # replace $releasever to 8 for 8.x1092        if major == 8:1093            sed = self._node.tools[Sed]1094            sed.substitute("$releasever", "8", "/etc/yum.repos.d/epel*.repo", sudo=True)1095    def _verify_package_result(self, result: ExecutableResult, packages: Any) -> None:1096        # yum returns exit_code=1 if DNF handled an error with installation.1097        # We do not want to fail if exit_code=1, but warn since something may1098        # potentially have gone wrong.1099        if result.exit_code == 1:1100            self._log.debug(f"DNF handled error with installation of {packages}")1101        elif result.exit_code == 0:1102            self._log.debug(f"{packages} is/are installed successfully.")1103        else:1104            raise LisaException(1105                f"Failed to install {packages}. 
exit_code: {result.exit_code}"1106            )1107    def group_install_packages(self, group_name: str) -> None:1108        # trigger to run _initialize_package_installation1109        self._get_package_list(group_name)1110        result = self._node.execute(f'yum -y groupinstall "{group_name}"', sudo=True)1111        self._verify_package_result(result, group_name)1112    def _get_information(self) -> OsInformation:1113        cmd_result = self._node.execute(1114            # Typical output of 'cat /etc/fedora-release' is -1115            # Fedora release 22 (Twenty Two)1116            cmd="cat /etc/fedora-release",1117            no_error_log=True,1118            expected_exit_code=0,1119            expected_exit_code_failure_message="error on get os information",1120        )1121        full_version = cmd_result.stdout1122        if "Fedora" not in full_version:1123            raise LisaException("OS version information not found")1124        vendor = "Fedora"1125        release = get_matched_str(full_version, self._fedora_release_pattern_version)1126        codename = get_matched_str(full_version, self._distro_codename_pattern)1127        information = OsInformation(1128            version=self._parse_version(release),1129            vendor=vendor,1130            release=release,1131            codename=codename,1132            full_version=full_version,1133        )1134        return information1135class Redhat(Fedora):1136    # Red Hat Enterprise Linux Server release 6.9 (Santiago)1137    # CentOS release 6.9 (Final)1138    # CentOS Linux release 8.3.20111139    __legacy_redhat_information_pattern = re.compile(1140        r"^(?P<vendor>.*?)?(?: Enterprise Linux Server)?(?: Linux)?"1141        r"(?: release)? 
(?P<version>[0-9\.]+)(?: \((?P<codename>.*).*\))?$"1142    )1143    # Oracle Linux Server1144    # Red Hat Enterprise Linux Server1145    # Red Hat Enterprise Linux1146    __vendor_pattern = re.compile(1147        r"^(?P<vendor>.*?)?(?: Enterprise)?(?: Linux)?(?: Server)?$"1148    )1149    @classmethod1150    def name_pattern(cls) -> Pattern[str]:1151        return re.compile("^rhel|Red|AlmaLinux|Rocky|Scientific|acronis|Actifio$")1152    def replace_boot_kernel(self, kernel_version: str) -> None:1153        # Redhat kernel is replaced when installing RPM. For source code1154        # installation, it's implemented in source code installer.1155        ...1156    def capture_system_information(self, saved_path: Path) -> None:1157        super().capture_system_information(saved_path)1158        self._node.shell.copy_back(1159            self._node.get_pure_path("/etc/redhat-release"),1160            saved_path / "redhat-release.txt",1161        )1162    @retry(tries=10, delay=5)1163    def _initialize_package_installation(self) -> None:1164        information = self._get_information()1165        # We may hit issue when run any yum command, caused by out of date1166        #  rhui-microsoft-azure-rhel package....kernel_installer.py
Source:kernel_installer.py  
...103        # for ubuntu cvm kernel, there is no menuentry added into grub file104        if hasattr(installer.runbook, "source"):105            if installer.runbook.source != "linux-image-azure-fde":106                posix = cast(Posix, node.os)107                posix.replace_boot_kernel(installed_kernel_version)108            else:109                efi_files = node.execute(110                    "ls -t /usr/lib/linux/efi/kernel.efi-*-azure-cvm",111                    sudo=True,112                    shell=True,113                    expected_exit_code=0,114                    expected_exit_code_failure_message=(115                        "fail to find kernel.efi file for kernel type "116                        " linux-image-azure-fde"117                    ),118                )119                efi_file = efi_files.stdout.splitlines()[0]120                node.execute(121                    (...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
