How to use _initialize_package_installation method in lisa

Best Python code snippet using lisa_python

operating_system.py

Source:operating_system.py Github

copy

Full Screen

...295 Return Value - bool296 """297 package_name = self.__resolve_package_name(package)298 if self._first_time_installation:299 self._initialize_package_installation()300 self._first_time_installation = False301 return self._is_package_in_repo(package_name)302 def update_packages(303 self,304 packages: Union[str, Tool, Type[Tool], Sequence[Union[str, Tool, Type[Tool]]]],305 ) -> None:306 package_names = self._get_package_list(packages)307 self._update_packages(package_names)308 def capture_system_information(self, saved_path: Path) -> None:309 # avoid to involve node, it's ok if some command doesn't exist.310 self._node.execute("uname -vrmo").save_stdout_to_file(saved_path / "uname.txt")311 self._node.execute(312 "uptime -s || last reboot -F | head -1 | awk '{print $9,$6,$7,$8}'",313 shell=True,314 ).save_stdout_to_file(saved_path / "uptime.txt")315 self._node.execute("modinfo hv_netvsc").save_stdout_to_file(316 saved_path / "modinfo-hv_netvsc.txt"317 )318 try:319 self._node.shell.copy_back(320 self._node.get_pure_path("/etc/os-release"),321 saved_path / "os-release.txt",322 )323 except FileNotFoundError:324 self._log.debug("File /etc/os-release doesn't exist.")325 def get_package_information(326 self, package_name: str, use_cached: bool = True327 ) -> VersionInfo:328 found = self._packages.get(package_name, None)329 if found and use_cached:330 return found331 return self._get_package_information(package_name)332 def get_repositories(self) -> List[RepositoryInfo]:333 raise NotImplementedError("get_repositories is not implemented")334 def _process_extra_package_args(self, extra_args: Optional[List[str]]) -> str:335 if extra_args:336 add_args = " ".join(extra_args)337 else:338 add_args = ""339 return add_args340 def _install_packages(341 self,342 packages: List[str],343 signed: bool = True,344 timeout: int = 600,345 extra_args: Optional[List[str]] = None,346 ) -> None:347 raise NotImplementedError()348 def _update_packages(self, packages: Optional[List[str]] = None) -> None:349 raise NotImplementedError()350 def _package_exists(self, package: str) -> bool:351 raise NotImplementedError()352 def _is_package_in_repo(self, package: str) -> bool:353 raise NotImplementedError()354 def _initialize_package_installation(self) -> None:355 # sub os can override it, but it's optional356 pass357 def _get_package_information(self, package_name: str) -> VersionInfo:358 raise NotImplementedError()359 def _get_version_info_from_named_regex_match(360 self, package_name: str, named_matches: Match[str]361 ) -> VersionInfo:362 essential_matches = ["major", "minor", "build"]363 # verify all essential keys are in our match dict364 assert_that(365 all(map(lambda x: x in named_matches.groupdict().keys(), essential_matches))366 ).described_as(367 "VersionInfo fetch could not identify all required parameters."368 ).is_true()369 # fill in 'patch' version if it's missing370 patch_match = named_matches.group("patch")371 if not patch_match:372 patch_match = "0"373 major_match = named_matches.group("major")374 minor_match = named_matches.group("minor")375 build_match = named_matches.group("build")376 major, minor, patch = map(377 int,378 [major_match, minor_match, patch_match],379 )380 build_match = named_matches.group("build")381 self._node.log.debug(382 f"Found {package_name} version "383 f"{major_match}.{minor_match}.{patch_match}-{build_match}"384 )385 return VersionInfo(major, minor, patch, build=build_match)386 def _cache_and_return_version_info(387 self, package_name: str, info: VersionInfo388 ) -> VersionInfo:389 self._packages[package_name] = info390 return info391 def _get_information(self) -> OsInformation:392 # try to set version info from /etc/os-release.393 cat = self._node.tools[Cat]394 cmd_result = cat.run(395 "/etc/os-release",396 expected_exit_code=0,397 expected_exit_code_failure_message="error on get os information",398 )399 vendor: str = ""400 release: str = ""401 codename: str = ""402 full_version: str = ""403 for row in cmd_result.stdout.splitlines():404 os_release_info = self._os_info_pattern.match(row)405 if not os_release_info:406 continue407 if os_release_info.group("name") == "NAME":408 vendor = os_release_info.group("value")409 elif os_release_info.group("name") == "VERSION_ID":410 release = os_release_info.group("value")411 elif os_release_info.group("name") == "VERSION":412 codename = get_matched_str(413 os_release_info.group("value"),414 self._distro_codename_pattern,415 )416 elif os_release_info.group("name") == "PRETTY_NAME":417 full_version = os_release_info.group("value")418 if vendor == "":419 raise LisaException("OS vendor information not found")420 if release == "":421 raise LisaException("OS release information not found")422 information = OsInformation(423 version=self._parse_version(release),424 vendor=vendor,425 release=release,426 codename=codename,427 full_version=full_version,428 )429 return information430 def _get_package_list(431 self,432 packages: Union[433 str,434 Tool,435 Type[Tool],436 Sequence[Union[str, Tool, Type[Tool]]],437 ],438 ) -> List[str]:439 package_names: List[str] = []440 if isinstance(packages, (str, Tool, type)):441 packages = [packages]442 package_names = [self.__resolve_package_name(item) for item in packages]443 if self._first_time_installation:444 self._first_time_installation = False445 self._initialize_package_installation()446 return package_names447 def _install_package_from_url(448 self,449 package_url: str,450 package_name: str = "",451 signed: bool = True,452 timeout: int = 600,453 ) -> None:454 """455 Used if the package to be installed needs to be downloaded from a url first.456 """457 # when package is URL, download the package first at the working path.458 wget_tool = self._node.tools[Wget]459 pkg = wget_tool.get(package_url, str(self._node.working_path), package_name)460 self.install_packages(pkg, signed, timeout)461 def wait_running_process(self, process_name: str, timeout: int = 5) -> None:462 # by default, wait for 5 minutes463 timeout = 60 * timeout464 timer = create_timer()465 while timeout > timer.elapsed(False):466 cmd_result = self._node.execute(f"pidof {process_name}")467 if cmd_result.exit_code == 1:468 # not found dpkg or zypper process, it's ok to exit.469 break470 time.sleep(1)471 if timeout < timer.elapsed():472 raise Exception(f"timeout to wait previous {process_name} process stop.")473 def __resolve_package_name(self, package: Union[str, Tool, Type[Tool]]) -> str:474 """475 A package can be a string or a tool or a type of tool.476 Resolve it to a standard package_name so it can be installed.477 """478 if isinstance(package, str):479 package_name = package480 elif isinstance(package, Tool):481 package_name = package.package_name482 else:483 assert isinstance(package, type), f"actual:{type(package)}"484 # Create a temp object, it doesn't query.485 # So they can be queried together.486 tool = package.create(self._node)487 package_name = tool.package_name488 return package_name489class BSD(Posix):490 ...491class MacOS(Posix):492 @classmethod493 def name_pattern(cls) -> Pattern[str]:494 return re.compile("^Darwin$")495class Linux(Posix):496 ...497class CoreOs(Linux):498 @classmethod499 def name_pattern(cls) -> Pattern[str]:500 return re.compile("^coreos|Flatcar|flatcar$")501@dataclass502# `apt-get update` repolist is of the form `<status>:<id> <uri> <name> <metadata>`503# Example:504# Get:5 http://azure.archive.ubuntu.com/ubuntu focal-updates/main amd64 Packages [1298 kB] # noqa: E501505class DebianRepositoryInfo(RepositoryInfo):506 # status for the repository. Examples: `Hit`, `Get`507 status: str508 # id for the repository. Examples : 1, 2509 id: str510 # uri for the repository. Example: `http://azure.archive.ubuntu.com/ubuntu`511 uri: str512 # metadata for the repository. Example: `amd64 Packages [1298 kB]`513 metadata: str514class Debian(Linux):515 # Get:5 http://azure.archive.ubuntu.com/ubuntu focal-updates/main amd64 Packages [1298 kB] # noqa: E501516 _debian_repository_info_pattern = re.compile(517 r"(?P<status>\S+):(?P<id>\d+)\s+(?P<uri>\S+)\s+(?P<name>\S+)"518 r"\s+(?P<metadata>.*)\s*"519 )520 """ Package: dpdk521 Version: 20.11.3-0ubuntu1~backport20.04-202111041420~ubuntu20.04.1522 Version: 1:2.25.1-1ubuntu3.2523 """524 _debian_package_information_regex = re.compile(525 r"Package: ([a-zA-Z0-9:_\-\.]+)\r?\n" # package name group526 r"Version: ([a-zA-Z0-9:_\-\.~+]+)\r?\n" # version number group527 )528 _debian_version_splitter_regex = re.compile(529 r"([0-9]+:)?" # some examples have a mystery number followed by a ':' (git)530 r"(?P<major>[0-9]+)\." # major531 r"(?P<minor>[0-9]+)\." # minor532 r"(?P<patch>[0-9]+)" # patch533 r"-(?P<build>[a-zA-Z0-9-_\.~+]+)" # build534 )535 # apt-cache policy git536 # git:537 # Installed: 1:2.17.1-1ubuntu0.9538 # Candidate: 1:2.17.1-1ubuntu0.9539 # Version table:540 # *** 1:2.17.1-1ubuntu0.9 500541 # 500 http://azure.archive.ubuntu.com/ubuntu bionic-updates/main amd64 Packages # noqa: E501542 # 500 http://security.ubuntu.com/ubuntu bionic-security/main amd64 Packages # noqa: E501543 # 100 /var/lib/dpkg/status544 # 1:2.17.0-1ubuntu1 500545 # 500 http://azure.archive.ubuntu.com/ubuntu bionic/main amd64 Packages546 # apt-cache policy mock547 # mock:548 # Installed: (none)549 # Candidate: 1.3.2-2550 # Version table:551 # 1.3.2-2 500552 # 500 http://azure.archive.ubuntu.com/ubuntu bionic/universe amd64 Packages # noqa: E501553 # apt-cache policy test554 # N: Unable to locate package test555 _package_candidate_pattern = re.compile(556 r"([\w\W]*?)(Candidate: \(none\)|Unable to locate package.*)", re.M557 )558 # E: The repository 'http://azure.archive.ubuntu.com/ubuntu groovy Release' no longer has a Release file. # noqa: E501559 _repo_not_exist_pattern = re.compile("does not have a Release file", re.M)560 @classmethod561 def name_pattern(cls) -> Pattern[str]:562 return re.compile("^debian|Forcepoint|Kali$")563 def get_apt_error(self, stdout: str) -> List[str]:564 error_lines: List[str] = []565 for line in stdout.splitlines(keepends=False):566 if line.startswith("E: "):567 error_lines.append(line)568 return error_lines569 def _get_package_information(self, package_name: str) -> VersionInfo:570 # run update of package info571 apt_info = self._node.execute(572 f"apt show {package_name}",573 expected_exit_code=0,574 expected_exit_code_failure_message=(575 f"Could not find package information for package {package_name}"576 ),577 )578 match = self._debian_package_information_regex.search(apt_info.stdout)579 if not match:580 raise LisaException(581 "Package information parsing could not find regex match "582 f" for {package_name} using regex "583 f"{self._debian_package_information_regex.pattern}"584 )585 version_str = match.group(2)586 match = self._debian_version_splitter_regex.search(version_str)587 if not match:588 raise LisaException(589 f"Could not parse version info: {version_str} "590 "for package {package_name}"591 )592 self._node.log.debug(f"Attempting to parse version string: {version_str}")593 version_info = self._get_version_info_from_named_regex_match(594 package_name, match595 )596 return self._cache_and_return_version_info(package_name, version_info)597 def wait_running_package_process(self) -> None:598 is_first_time: bool = True599 # wait for 10 minutes600 timeout = 60 * 10601 timer = create_timer()602 while timeout > timer.elapsed(False):603 # fix the dpkg, in case it's broken.604 dpkg_result = self._node.execute(605 "dpkg --force-all --configure -a", sudo=True606 )607 pidof_result = self._node.execute("pidof dpkg dpkg-deb")608 if dpkg_result.exit_code == 0 and pidof_result.exit_code == 1:609 # not found dpkg process, it's ok to exit.610 break611 if is_first_time:612 is_first_time = False613 self._log.debug("found system dpkg process, waiting it...")614 time.sleep(1)615 if timeout < timer.elapsed():616 raise Exception("timeout to wait previous dpkg process stop.")617 def get_repositories(self) -> List[RepositoryInfo]:618 self._initialize_package_installation()619 repo_list_str = self._node.execute("apt-get update", sudo=True).stdout620 repositories: List[RepositoryInfo] = []621 for line in repo_list_str.splitlines():622 matched = self._debian_repository_info_pattern.search(line)623 if matched:624 repositories.append(625 DebianRepositoryInfo(626 name=matched.group("name"),627 status=matched.group("status"),628 id=matched.group("id"),629 uri=matched.group("uri"),630 metadata=matched.group("metadata"),631 )632 )633 return repositories634 @retry(tries=10, delay=5)635 def add_repository(636 self,637 repo: str,638 no_gpgcheck: bool = True,639 repo_name: Optional[str] = None,640 keys_location: Optional[List[str]] = None,641 ) -> None:642 if keys_location:643 for key_location in keys_location:644 wget = self._node.tools[Wget]645 key_file_path = wget.get(646 url=key_location,647 file_path=str(self._node.working_path),648 force_run=True,649 )650 self._node.execute(651 cmd=f"apt-key add {key_file_path}",652 sudo=True,653 expected_exit_code=0,654 expected_exit_code_failure_message="fail to add apt key",655 )656 # This command will trigger apt update too, so it doesn't need to update657 # repos again.658 self._node.execute(659 cmd=f'apt-add-repository -y "{repo}"',660 sudo=True,661 expected_exit_code=0,662 expected_exit_code_failure_message="fail to add repository",663 )664 # apt update will not be triggered on Debian during add repo665 if type(self._node.os) == Debian:666 self._node.execute("apt-get update", sudo=True)667 @retry(tries=10, delay=5)668 def _initialize_package_installation(self) -> None:669 # wait running system package process.670 self.wait_running_package_process()671 result = self._node.execute("apt-get update", sudo=True)672 if self._repo_not_exist_pattern.search(result.stdout):673 raise RepoNotExistException(self._node.os)674 result.assert_exit_code(message="\n".join(self.get_apt_error(result.stdout)))675 @retry(tries=10, delay=5)676 def _install_packages(677 self,678 packages: List[str],679 signed: bool = True,680 timeout: int = 600,681 extra_args: Optional[List[str]] = None,682 ) -> None:683 file_packages = []684 for index, package in enumerate(packages):685 if package.endswith(".deb"):686 # If the package is a .deb file then it would first need to be unpacked.687 # using dpkg command before installing it like other packages.688 file_packages.append(package)689 package = Path(package).stem690 packages[index] = package691 add_args = self._process_extra_package_args(extra_args)692 command = (693 f"DEBIAN_FRONTEND=noninteractive apt-get {add_args} "694 f"-y install {' '.join(packages)}"695 )696 if not signed:697 command += " --allow-unauthenticated"698 self.wait_running_package_process()699 if file_packages:700 self._node.execute(701 f"dpkg -i {' '.join(file_packages)}", sudo=True, timeout=timeout702 )703 # after install package, need update the repo704 self._initialize_package_installation()705 install_result = self._node.execute(706 command, shell=True, sudo=True, timeout=timeout707 )708 # get error lines.709 install_result.assert_exit_code(710 0,711 f"Failed to install {packages}, "712 f"please check the package name and repo are correct or not.\n"713 + "\n".join(self.get_apt_error(install_result.stdout))714 + "\n",715 )716 def _package_exists(self, package: str) -> bool:717 command = "dpkg --get-selections"718 result = self._node.execute(command, sudo=True, shell=True)719 package_pattern = re.compile(f"{package}([ \t]+)install")720 # Not installed package not shown in the output721 # Uninstall package will show as deinstall722 # vim deinstall723 # vim-common install724 if len(list(filter(package_pattern.match, result.stdout.splitlines()))) == 1:725 return True726 return False727 def _is_package_in_repo(self, package: str) -> bool:728 command = f"apt-cache policy {package}"729 result = self._node.execute(command, sudo=True, shell=True)730 matched = get_matched_str(result.stdout, self._package_candidate_pattern)731 if matched:732 return False733 return True734 def _get_information(self) -> OsInformation:735 # try to set version info from /etc/os-release.736 cat = self._node.tools[Cat]737 cmd_result = cat.run(738 "/etc/os-release",739 expected_exit_code=0,740 expected_exit_code_failure_message="error on get os information",741 )742 vendor: str = ""743 release: str = ""744 codename: str = ""745 full_version: str = ""746 for row in cmd_result.stdout.splitlines():747 os_release_info = super()._os_info_pattern.match(row)748 if not os_release_info:749 continue750 if os_release_info.group("name") == "NAME":751 vendor = os_release_info.group("value")752 elif os_release_info.group("name") == "VERSION":753 codename = get_matched_str(754 os_release_info.group("value"),755 super()._distro_codename_pattern,756 )757 elif os_release_info.group("name") == "PRETTY_NAME":758 full_version = os_release_info.group("value")759 # version return from /etc/os-release is integer in debian760 # so get the precise version from /etc/debian_version761 # e.g.762 # marketplace image - credativ debian 9-backports 9.20190313.0763 # version from /etc/os-release is 9764 # version from /etc/debian_version is 9.8765 # marketplace image - debian debian-10 10-backports-gen2 0.20210201.535766 # version from /etc/os-release is 10767 # version from /etc/debian_version is 10.7768 cmd_result = cat.run(769 "/etc/debian_version",770 expected_exit_code=0,771 expected_exit_code_failure_message="error on get debian version",772 )773 release = cmd_result.stdout774 if vendor == "":775 raise LisaException("OS vendor information not found")776 if release == "":777 raise LisaException("OS release information not found")778 information = OsInformation(779 version=self._parse_version(release),780 vendor=vendor,781 release=release,782 codename=codename,783 full_version=full_version,784 )785 return information786 def _update_packages(self, packages: Optional[List[str]] = None) -> None:787 command = (788 "DEBIAN_FRONTEND=noninteractive apt-get upgrade -y "789 '-o Dpkg::Options::="--force-confdef" -o Dpkg::Options::="--force-confold" '790 )791 if packages:792 command += " ".join(packages)793 self._node.execute(command, sudo=True, timeout=3600)794class Ubuntu(Debian):795 __lsb_os_info_pattern = re.compile(796 r"^(?P<name>.*):(\s+)(?P<value>.*?)?$", re.MULTILINE797 )798 # gnulinux-5.11.0-1011-azure-advanced-3fdd2548-1430-450b-b16d-9191404598fb799 # prefix: gnulinux800 # postfix: advanced-3fdd2548-1430-450b-b16d-9191404598fb801 __menu_id_parts_pattern = re.compile(802 r"^(?P<prefix>.*?)-.*-(?P<postfix>.*?-.*?-.*?-.*?-.*?-.*?)?$"803 )804 @classmethod805 def name_pattern(cls) -> Pattern[str]:806 return re.compile("^Ubuntu|ubuntu$")807 def replace_boot_kernel(self, kernel_version: str) -> None:808 # set installed kernel to default809 #810 # get boot entry id811 # positive example:812 # menuentry 'Ubuntu, with Linux 5.11.0-1011-azure' --class ubuntu813 # --class gnu-linux --class gnu --class os $menuentry_id_option814 # 'gnulinux-5.11.0-1011-azure-advanced-3fdd2548-1430-450b-b16d-9191404598fb' {815 #816 # negative example:817 # menuentry 'Ubuntu, with Linux 5.11.0-1011-azure (recovery mode)'818 # --class ubuntu --class gnu-linux --class gnu --class os $menuentry_id_option819 # 'gnulinux-5.11.0-1011-azure-recovery-3fdd2548-1430-450b-b16d-9191404598fb' {820 cat = self._node.tools[Cat]821 menu_id_pattern = re.compile(822 r"^.*?menuentry '.*?(?:"823 + kernel_version824 + r"[^ ]*?)(?<! \(recovery mode\))' "825 r".*?\$menuentry_id_option .*?'(?P<menu_id>.*)'.*$",826 re.M,827 )828 result = cat.run("/boot/grub/grub.cfg", sudo=True)829 submenu_id = get_matched_str(result.stdout, menu_id_pattern)830 assert submenu_id, (831 f"cannot find sub menu id from grub config by pattern: "832 f"{menu_id_pattern.pattern}"833 )834 self._log.debug(f"matched submenu_id: {submenu_id}")835 # get first level menu id in boot menu836 # input is the sub menu id like:837 # gnulinux-5.11.0-1011-azure-advanced-3fdd2548-1430-450b-b16d-9191404598fb838 # output is,839 # gnulinux-advanced-3fdd2548-1430-450b-b16d-9191404598fb840 menu_id = self.__menu_id_parts_pattern.sub(841 r"\g<prefix>-\g<postfix>", submenu_id842 )843 assert menu_id, f"cannot composite menu id from {submenu_id}"844 # composite boot menu in grub845 menu_entry = f"{menu_id}>{submenu_id}"846 self._log.debug(f"composited menu_entry: {menu_entry}")847 self._replace_default_entry(menu_entry)848 self._node.execute("update-grub", sudo=True)849 try:850 # install tool packages851 self.install_packages(852 [853 f"linux-tools-{kernel_version}-azure",854 f"linux-cloud-tools-{kernel_version}-azure",855 f"linux-headers-{kernel_version}-azure",856 ]857 )858 except Exception as identifier:859 self._log.debug(860 f"ignorable error on install packages after replaced kernel: "861 f"{identifier}"862 )863 def _get_information(self) -> OsInformation:864 cmd_result = self._node.execute(865 cmd="lsb_release -a",866 shell=True,867 no_error_log=True,868 expected_exit_code=0,869 expected_exit_code_failure_message="error on get os information",870 )871 assert cmd_result.stdout, "not found os information from 'lsb_release -a'"872 for row in cmd_result.stdout.splitlines():873 os_release_info = self.__lsb_os_info_pattern.match(row)874 if os_release_info:875 if os_release_info.group("name") == "Distributor ID":876 vendor = os_release_info.group("value")877 elif os_release_info.group("name") == "Release":878 release = os_release_info.group("value")879 elif os_release_info.group("name") == "Codename":880 codename = os_release_info.group("value")881 elif os_release_info.group("name") == "Description":882 full_version = os_release_info.group("value")883 if vendor == "":884 raise LisaException("OS vendor information not found")885 if release == "":886 raise LisaException("OS release information not found")887 information = OsInformation(888 version=self._parse_version(release),889 vendor=vendor,890 release=release,891 codename=codename,892 full_version=full_version,893 )894 return information895 def _replace_default_entry(self, entry: str) -> None:896 self._log.debug(f"set boot entry to: {entry}")897 sed = self._node.tools[Sed]898 sed.substitute(899 regexp="GRUB_DEFAULT=.*",900 replacement=f"GRUB_DEFAULT='{entry}'",901 file="/etc/default/grub",902 sudo=True,903 )904 # output to log for troubleshooting905 cat = self._node.tools[Cat]906 cat.run("/etc/default/grub")907class FreeBSD(BSD):908 @retry(tries=10, delay=5)909 def _install_packages(910 self,911 packages: List[str],912 signed: bool = True,913 timeout: int = 600,914 extra_args: Optional[List[str]] = None,915 ) -> None:916 if self._first_time_installation:917 self._initialize_package_installation()918 self._first_time_installation = False919 command = f"env ASSUME_ALWAYS_YES=yes pkg install -y {' '.join(packages)}"920 install_result = self._node.execute(921 command, shell=True, sudo=True, timeout=timeout922 )923 # get error lines.924 install_result.assert_exit_code(925 0,926 f"Failed to install {packages}, "927 f"please check the package name and repo are correct or not.\n",928 )929 @retry(tries=10, delay=5)930 def _initialize_package_installation(self) -> None:931 result = self._node.execute("pkg update", sudo=True)932 result.assert_exit_code(message="fail to run pkg update")933class OpenBSD(BSD):934 ...935@dataclass936# dnf repolist is of the form `<id> <name>`937# Example:938# microsoft-azure-rhel8-eus Microsoft Azure RPMs for RHEL8 Extended Update Support939class RPMRepositoryInfo(RepositoryInfo):940 # id for the repository, for example: microsoft-azure-rhel8-eus941 id: str942# Linux distros that use RPM.943class RPMDistro(Linux):944 # microsoft-azure-rhel8-eus Microsoft Azure RPMs for RHEL8 Extended Update Support945 _rpm_repository_info_pattern = re.compile(r"(?P<id>\S+)\s+(?P<name>\S.*\S)\s*")946 # ex: dpdk-20.11-3.el8.x86_64 or dpdk-18.11.8-1.el7_8.x86_64947 _rpm_version_splitter_regex = re.compile(948 r"(?P<package_name>[a-zA-Z0-9\-_]+)-"949 r"(?P<major>[0-9]+)\."950 r"(?P<minor>[0-9]+)\.?"951 r"(?P<patch>[0-9]+)?"952 r"(?P<build>-[a-zA-Z0-9-_\.]+)?"953 )954 def get_repositories(self) -> List[RepositoryInfo]:955 if self._first_time_installation:956 self._initialize_package_installation()957 self._first_time_installation = False958 repo_list_str = self._node.execute(959 f"{self._dnf_tool()} repolist", sudo=True960 ).stdout.splitlines()961 # skip to the first entry in the output962 for index, repo_str in enumerate(repo_list_str):963 if repo_str.startswith("repo id"):964 header_index = index965 break966 repo_list_str = repo_list_str[header_index + 1 :]967 repositories: List[RepositoryInfo] = []968 for line in repo_list_str:969 repo_info = self._rpm_repository_info_pattern.search(line)970 if repo_info:971 repositories.append(972 RPMRepositoryInfo(973 name=repo_info.group("name"), id=repo_info.group("id").lower()974 )975 )976 return repositories977 def add_repository(978 self,979 repo: str,980 no_gpgcheck: bool = True,981 repo_name: Optional[str] = None,982 keys_location: Optional[List[str]] = None,983 ) -> None:984 self._node.tools[YumConfigManager].add_repository(repo, no_gpgcheck)985 def _get_package_information(self, package_name: str) -> VersionInfo:986 rpm_info = self._node.execute(987 f"rpm -q {package_name}",988 expected_exit_code=0,989 expected_exit_code_failure_message=(990 f"Could not find package information for package {package_name}"991 ),992 )993 # rpm package should be of format (package_name)-(version)994 matches = self._rpm_version_splitter_regex.search(rpm_info.stdout)995 if not matches:996 raise LisaException(997 f"Could not parse package version {rpm_info} for {package_name}"998 )999 self._node.log.debug(f"Attempting to parse version string: {rpm_info.stdout}")1000 version_info = self._get_version_info_from_named_regex_match(1001 package_name, matches1002 )1003 return self._cache_and_return_version_info(package_name, version_info)1004 def _install_packages(1005 self,1006 packages: List[str],1007 signed: bool = True,1008 timeout: int = 600,1009 extra_args: Optional[List[str]] = None,1010 ) -> None:1011 add_args = self._process_extra_package_args(extra_args)1012 command = f"{self._dnf_tool()} install {add_args} -y {' '.join(packages)}"1013 if not signed:1014 command += " --nogpgcheck"1015 self._node.execute(1016 command,1017 shell=True,1018 sudo=True,1019 timeout=timeout,1020 expected_exit_code=0,1021 expected_exit_code_failure_message=f"Failed to install {packages}.",1022 )1023 self._log.debug(f"{packages} is/are installed successfully.")1024 def _package_exists(self, package: str) -> bool:1025 command = f"{self._dnf_tool()} list installed {package}"1026 result = self._node.execute(command, sudo=True)1027 if result.exit_code == 0:1028 for row in result.stdout.splitlines():1029 if package in row:1030 return True1031 return False1032 def _is_package_in_repo(self, package: str) -> bool:1033 command = f"{self._dnf_tool()} list {package} -y"1034 result = self._node.execute(command, sudo=True, shell=True)1035 return 0 == result.exit_code1036 def _dnf_tool(self) -> str:1037 return "dnf"1038 def _update_packages(self, packages: Optional[List[str]] = None) -> None:1039 command = f"{self._dnf_tool()} -y --nogpgcheck update "1040 if packages:1041 command += " ".join(packages)1042 self._node.execute(command, sudo=True, timeout=3600)1043class Fedora(RPMDistro):1044 # Red Hat Enterprise Linux Server 7.8 (Maipo) => 7.81045 _fedora_release_pattern_version = re.compile(r"^.*release\s+([0-9\.]+).*$")1046 # 305.40.1.el8_4.x86_641047 # 240.el8.x86_641048 __kernel_version_parts_pattern = re.compile(1049 r"^(?P<part1>\d+)\.(?P<part2>\d+)?\.?(?P<part3>\d+)?\.?"1050 r"(?P<distro>.*?)\.(?P<platform>.*?)$"1051 )1052 @classmethod1053 def name_pattern(cls) -> Pattern[str]:1054 return re.compile("^Fedora|fedora$")1055 def get_kernel_information(self, force_run: bool = False) -> KernelInformation:1056 kernel_information = super().get_kernel_information(force_run)1057 # original parts: version_parts=['4', '18', '0', '305.40.1.el8_4.x86_64', '']1058 # target parts: version_parts=['4', '18', '0', '305', '40', '1', 'el8_4',1059 # 'x86_64']1060 groups = find_group_in_lines(1061 kernel_information.version_parts[3], self.__kernel_version_parts_pattern1062 )1063 new_parts = kernel_information.version_parts[:3]1064 # the default '1' is trying to build a meaningful Redhat version number.1065 new_parts.extend(1066 [1067 groups["part1"],1068 groups["part2"],1069 groups["part3"],1070 groups["distro"],1071 groups["platform"],1072 ]1073 )1074 for index, part in enumerate(new_parts):1075 if part is None:1076 new_parts[index] = ""1077 kernel_information.version_parts = new_parts1078 return kernel_information1079 def install_epel(self) -> None:1080 # Extra Packages for Enterprise Linux (EPEL) is a special interest group1081 # (SIG) from the Fedora Project that provides a set of additional packages1082 # for RHEL (and CentOS, and others) from the Fedora sources.1083 major = self._node.os.information.version.major1084 assert_that(major).described_as(1085 "Fedora/RedHat version must be greater than 7"1086 ).is_greater_than_or_equal_to(7)1087 epel_release_rpm_name = f"epel-release-latest-{major}.noarch.rpm"1088 self.install_packages(1089 f"https://dl.fedoraproject.org/pub/epel/{epel_release_rpm_name}"1090 )1091 # replace $releasever to 8 for 8.x1092 if major == 8:1093 sed = self._node.tools[Sed]1094 sed.substitute("$releasever", "8", "/etc/yum.repos.d/epel*.repo", sudo=True)1095 def _verify_package_result(self, result: ExecutableResult, packages: Any) -> None:1096 # yum returns exit_code=1 if DNF handled an error with installation.1097 # We do not want to fail if exit_code=1, but warn since something may1098 # potentially have gone wrong.1099 if result.exit_code == 1:1100 self._log.debug(f"DNF handled error with installation of {packages}")1101 elif result.exit_code == 0:1102 self._log.debug(f"{packages} is/are installed successfully.")1103 else:1104 raise LisaException(1105 f"Failed to install {packages}. exit_code: {result.exit_code}"1106 )1107 def group_install_packages(self, group_name: str) -> None:1108 # trigger to run _initialize_package_installation1109 self._get_package_list(group_name)1110 result = self._node.execute(f'yum -y groupinstall "{group_name}"', sudo=True)1111 self._verify_package_result(result, group_name)1112 def _get_information(self) -> OsInformation:1113 cmd_result = self._node.execute(1114 # Typical output of 'cat /etc/fedora-release' is -1115 # Fedora release 22 (Twenty Two)1116 cmd="cat /etc/fedora-release",1117 no_error_log=True,1118 expected_exit_code=0,1119 expected_exit_code_failure_message="error on get os information",1120 )1121 full_version = cmd_result.stdout1122 if "Fedora" not in full_version:1123 raise LisaException("OS version information not found")1124 vendor = "Fedora"1125 release = get_matched_str(full_version, self._fedora_release_pattern_version)1126 codename = get_matched_str(full_version, self._distro_codename_pattern)1127 information = OsInformation(1128 version=self._parse_version(release),1129 vendor=vendor,1130 release=release,1131 codename=codename,1132 full_version=full_version,1133 )1134 return information1135class Redhat(Fedora):1136 # Red Hat Enterprise Linux Server release 6.9 (Santiago)1137 # CentOS release 6.9 (Final)1138 # CentOS Linux release 8.3.20111139 __legacy_redhat_information_pattern = re.compile(1140 r"^(?P<vendor>.*?)?(?: Enterprise Linux Server)?(?: Linux)?"1141 r"(?: release)? (?P<version>[0-9\.]+)(?: \((?P<codename>.*).*\))?$"1142 )1143 # Oracle Linux Server1144 # Red Hat Enterprise Linux Server1145 # Red Hat Enterprise Linux1146 __vendor_pattern = re.compile(1147 r"^(?P<vendor>.*?)?(?: Enterprise)?(?: Linux)?(?: Server)?$"1148 )1149 @classmethod1150 def name_pattern(cls) -> Pattern[str]:1151 return re.compile("^rhel|Red|AlmaLinux|Rocky|Scientific|acronis|Actifio$")1152 def replace_boot_kernel(self, kernel_version: str) -> None:1153 # Redhat kernel is replaced when installing RPM. For source code1154 # installation, it's implemented in source code installer.1155 ...1156 def capture_system_information(self, saved_path: Path) -> None:1157 super().capture_system_information(saved_path)1158 self._node.shell.copy_back(1159 self._node.get_pure_path("/etc/redhat-release"),1160 saved_path / "redhat-release.txt",1161 )1162 @retry(tries=10, delay=5)1163 def _initialize_package_installation(self) -> None:1164 information = self._get_information()1165 # We may hit issue when run any yum command, caused by out of date1166 # rhui-microsoft-azure-rhel package.1167 # Use below command to update rhui-microsoft-azure-rhel package from microsoft1168 # repo to resolve the issue.1169 # Details please refer https://docs.microsoft.com/en-us/azure/virtual-machines/workloads/redhat/redhat-rhui#azure-rhui-infrastructure # noqa: E5011170 if "Red Hat" == information.vendor:1171 self._node.execute(1172 "yum update -y --disablerepo='*' --enablerepo='*microsoft*' ",1173 sudo=True,1174 expected_exit_code=0,1175 )1176 def _install_packages(1177 self,1178 packages: List[str],1179 signed: bool = True,1180 timeout: int = 600,1181 extra_args: Optional[List[str]] = None,1182 ) -> None:1183 add_args = self._process_extra_package_args(extra_args)1184 command = f"yum install {add_args} -y {' '.join(packages)}"1185 if not signed:1186 command += " --nogpgcheck"1187 install_result = self._node.execute(1188 command, shell=True, sudo=True, timeout=timeout1189 )1190 # RedHat will fail package installation is a single missing package is1191 # detected, therefore we check the output to see if we were missing1192 # a package. If so, fail. Otherwise we will warn in verify package result.1193 if install_result.exit_code == 1:1194 missing_packages = []1195 for line in install_result.stdout.splitlines():1196 if line.startswith("No match for argument:"):1197 package = line.split(":")[1].strip()1198 missing_packages.append(package)1199 if missing_packages:1200 raise MissingPackagesException(missing_packages)1201 super()._verify_package_result(install_result, packages)1202 def _package_exists(self, package: str) -> bool:1203 command = f"yum list installed {package}"1204 result = self._node.execute(command, sudo=True)1205 if result.exit_code == 0:1206 return True1207 return False1208 def _is_package_in_repo(self, package: str) -> bool:1209 command = f"yum --showduplicates list {package}"1210 result = self._node.execute(command, sudo=True, shell=True)1211 return 0 == result.exit_code1212 def _get_information(self) -> OsInformation:1213 # The higher version above 7.0 support os-version.1214 try:1215 information = super(Fedora, self)._get_information()1216 # remove Linux Server in vendor1217 information.vendor = get_matched_str(1218 information.vendor, self.__vendor_pattern1219 )1220 except Exception:1221 # Parse /etc/redhat-release to support 6.x and 8.x. Refer to1222 # examples of __legacy_redhat_information_pattern.1223 cmd_result = self._node.execute(1224 cmd="cat /etc/redhat-release", no_error_log=True, expected_exit_code=01225 )1226 full_version = cmd_result.stdout1227 matches = self.__legacy_redhat_information_pattern.match(full_version)1228 assert matches, f"cannot match version information from: {full_version}"1229 assert matches.group("vendor")1230 information = OsInformation(1231 version=self._parse_version(matches.group("version")),1232 vendor=matches.group("vendor"),1233 release=matches.group("version"),1234 codename=matches.group("codename"),1235 full_version=full_version,1236 )1237 return information1238 def _update_packages(self, packages: Optional[List[str]] = None) -> None:1239 command = "yum -y --nogpgcheck update "1240 if packages:1241 command += " ".join(packages)1242 # older images cost much longer time when update packages1243 # smaller sizes cost much longer time when update packages, e.g.1244 # Basic_A1, Standard_A5, Standard_A1_v2, Standard_D11245 # redhat rhel 7-lvm 7.7.2019102813 Basic_A1 cost 2371.568 seconds1246 # redhat rhel 8.1 8.1.2020020415 Basic_A0 cost 2409.116 seconds1247 self._node.execute(command, sudo=True, timeout=3600)1248 def _dnf_tool(self) -> str:1249 return "yum"1250class CentOs(Redhat):1251 @classmethod1252 def name_pattern(cls) -> Pattern[str]:1253 return re.compile("^CentOS|Centos|centos|clear-linux-os$")1254 def capture_system_information(self, saved_path: Path) -> None:1255 super(Linux, self).capture_system_information(saved_path)1256 self._node.shell.copy_back(1257 self._node.get_pure_path("/etc/centos-release"),1258 saved_path / "centos-release.txt",1259 )1260 def _initialize_package_installation(self) -> None:1261 information = self._get_information()1262 if 8 == information.version.major:1263 # refer https://www.centos.org/centos-linux-eol/ CentOS 8 is EOL,1264 # old repo mirror was moved to vault.centos.org1265 # CentOS-AppStream.repo, CentOS-Base.repo may contain non-existed1266 # repo use skip_if_unavailable to avoid installation issues brought1267 # in by above issue1268 cmd_results = self._node.execute("yum repolist -v", sudo=True)1269 if 0 != cmd_results.exit_code:1270 self._node.tools[YumConfigManager].set_opt("skip_if_unavailable=true")1271class Oracle(Redhat):1272 @classmethod1273 def name_pattern(cls) -> Pattern[str]:1274 # The name is "Oracle Linux Server", which doesn't support the default1275 # full match.1276 return re.compile("^Oracle")1277class CBLMariner(RPMDistro):1278 @classmethod1279 def name_pattern(cls) -> Pattern[str]:1280 return re.compile("^Common Base Linux Mariner|mariner$")1281 def __init__(self, node: Any) -> None:1282 super().__init__(node)1283 self._dnf_tool_name: str1284 def _initialize_package_installation(self) -> None:1285 result = self._node.execute("command -v dnf", no_info_log=True, shell=True)1286 if result.exit_code == 0:1287 self._dnf_tool_name = "dnf"1288 return1289 self._dnf_tool_name = "tdnf -q"1290 def _dnf_tool(self) -> str:1291 return self._dnf_tool_name1292@dataclass1293# `zypper lr` repolist is of the form1294# `<id>|<alias>|<name>|<enabled>|<gpg_check>|<refresh>`1295# Example:1296# # 4 | repo-oss | Main Repository | Yes | (r ) Yes | Yes1297class SuseRepositoryInfo(RepositoryInfo):1298 # id for the repository. Example: 41299 id: str1300 # alias for the repository. Example: repo-oss1301 alias: str1302 # is repository enabled. Example: True/False1303 enabled: bool1304 # is gpg_check enabled. Example: True/False1305 gpg_check: bool1306 # is repository refreshed. Example: True/False1307 refresh: bool1308class Suse(Linux):1309 # 55 | Web_and_Scripting_Module_x86_64:SLE-Module-Web-Scripting15-SP2-Updates | SLE-Module-Web-Scripting15-SP2-Updates | Yes | ( p) Yes | Yes # noqa: E5011310 # 4 | repo-oss | Main Repository | Yes | (r ) Yes | Yes # noqa: E5011311 _zypper_table_entry = re.compile(1312 r"\s*(?P<id>\d+)\s+[|]\s+(?P<alias>\S.+\S)\s+\|\s+(?P<name>\S.+\S)\s+\|"1313 r"\s+(?P<enabled>\S.*\S)\s+\|\s+(?P<gpg_check>\S.*\S)\s+\|"1314 r"\s+(?P<refresh>\S.*\S)\s*"1315 )1316 # Warning: There are no enabled repositories defined.1317 _no_repo_defined = re.compile("There are no enabled repositories defined.", re.M)1318 @classmethod1319 def name_pattern(cls) -> Pattern[str]:1320 return re.compile("^SUSE|opensuse-leap$")1321 def get_repositories(self) -> List[RepositoryInfo]:1322 # Parse output of command "zypper lr"1323 # Example output:1324 # 1 | Basesystem_Module_x86_64:SLE-Module-Basesystem15-SP2-Debuginfo-Pool | SLE-Module-Basesystem15-SP2-Debuginfo-Pool | No | ---- | ---- # noqa: E5011325 # 2 | Basesystem_Module_x86_64:SLE-Module-Basesystem15-SP2-Debuginfo-Updates | SLE-Module-Basesystem15-SP2-Debuginfo-Updates | No | ---- | ---- # noqa: E5011326 self._initialize_package_installation()1327 output = filter_ansi_escape(self._node.execute("zypper lr", sudo=True).stdout)1328 repo_list: List[RepositoryInfo] = []1329 for line in output.splitlines():1330 matched = self._zypper_table_entry.search(line)1331 if matched:1332 is_repository_enabled = (1333 True if "Yes" in matched.group("enabled") else False1334 )1335 is_gpg_check_enabled = (1336 True if "Yes" in matched.group("gpg_check") else False1337 )1338 is_repository_refreshed = (1339 True if "Yes" in matched.group("refresh") else False1340 )1341 if matched:1342 repo_list.append(1343 SuseRepositoryInfo(1344 name=matched.group("name"),1345 id=matched.group("id"),1346 alias=matched.group("alias"),1347 enabled=is_repository_enabled,1348 gpg_check=is_gpg_check_enabled,1349 refresh=is_repository_refreshed,1350 )1351 )1352 return repo_list1353 def add_repository(1354 self,1355 repo: str,1356 no_gpgcheck: bool = True,1357 repo_name: Optional[str] = None,1358 keys_location: Optional[List[str]] = None,1359 ) -> None:1360 cmd = "zypper ar"1361 if no_gpgcheck:1362 cmd += " -G "1363 cmd += f" {repo} {repo_name}"1364 cmd_result = self._node.execute(cmd=cmd, sudo=True)1365 if "already exists. Please use another alias." not in cmd_result.stdout:1366 cmd_result.assert_exit_code(0, f"fail to add repo {repo}")1367 else:1368 self._log.debug(f"repo {repo_name} already exist")1369 def _initialize_package_installation(self) -> None:1370 self.wait_running_process("zypper")1371 output = self._node.execute(1372 "zypper --non-interactive --gpg-auto-import-keys refresh", sudo=True1373 ).stdout1374 if self._no_repo_defined.search(output):1375 raise LisaException(1376 f"There are no enabled repositories defined in "1377 f"{self._node.os.name} {self._node.os.information.version}"1378 )1379 def _install_packages(1380 self,1381 packages: List[str],1382 signed: bool = True,1383 timeout: int = 600,...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run lisa automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful