Best Python code snippet using lisa_python
ch_tests_tool.py
Source:ch_tests_tool.py  
...
        failures = [r.name for r in results if r.status == TestStatus.FAILED]
        if not failures:
            result.assert_exit_code()
        for r in results:
            self._send_subtest_msg(
                test_result.id_,
                environment,
                r.name,
                r.status,
            )
        assert_that(failures, f"Unexpected failures: {failures}").is_empty()

    def run_metrics_tests(
        self,
        test_result: TestResult,
        environment: Environment,
        hypervisor: str,
        log_path: Path,
        skip: Optional[List[str]] = None,
    ) -> None:
        perf_metrics_tests = self._list_perf_metrics_tests(hypervisor=hypervisor)
        failed_testcases = []
        for testcase in perf_metrics_tests:
            testcase_log_file = log_path.joinpath(f"{testcase}.log")
            status: TestStatus = TestStatus.QUEUED
            metrics: str = ""
            trace: str = ""
            try:
                result = self.run(
                    f"tests --hypervisor {hypervisor} --metrics -- -- \
                        --test-filter {testcase}",
                    timeout=self.TIME_OUT,
                    force_run=True,
                    cwd=self.repo_root,
                    no_info_log=False,  # print out result of each test
                    shell=True,
                )
                if result.exit_code == 0:
                    status = TestStatus.PASSED
                    metrics = self._process_perf_metric_test_result(result.stdout)
                else:
                    status = TestStatus.FAILED
                    trace = f"Testcase '{testcase}' failed: {result.stderr}"
                    failed_testcases.append(testcase)
            except Exception as e:
                self._log.info(f"Testcase failed, testcase name: {testcase}")
                status = TestStatus.FAILED
                trace = str(e)
                failed_testcases.append(testcase)
            msg = metrics if status == TestStatus.PASSED else trace
            self._send_subtest_msg(
                test_id=test_result.id_,
                environment=environment,
                test_name=testcase,
                test_status=status,
                test_message=msg,
            )
            # Write stdout of testcase to log as per given requirement
            with open(testcase_log_file, "w") as f:
                f.write(result.stdout)
        assert_that(
            failed_testcases, f"Failed Testcases: {failed_testcases}"
        ).is_empty()

    def _initialize(self, *args: Any, **kwargs: Any) -> None:
        tool_path = self.get_tool_path(use_global=True)
        self.repo_root = tool_path / "cloud-hypervisor"
        self.cmd_path = self.repo_root / "scripts" / "dev_cli.sh"

    def _install(self) -> bool:
        git = self.node.tools[Git]
        git.clone(self.repo, self.get_tool_path(use_global=True))
        if isinstance(self.node.os, CBLMariner):
            daemon_json_file = PurePath("/etc/docker/daemon.json")
            daemon_json = '{"default-ulimits":{"nofile":{"Hard":65535,"Name":"nofile","Soft":65535}}}'  # noqa: E501
            self.node.tools[Echo].write_to_file(
                daemon_json, daemon_json_file, sudo=True
            )
        self.node.execute("groupadd -f docker", expected_exit_code=0)
        username = self.node.tools[Whoami].get_username()
        res = self.node.execute("getent group docker", expected_exit_code=0)
        if username not in res.stdout:  # if current user is not in docker group
            self.node.execute(f"usermod -a -G docker {username}", sudo=True)
            # reboot for group membership change to take effect
            self.node.reboot()
        self.node.tools[Docker].start()
        return self._check_exists()

    def _extract_test_results(self, output: str) -> List[CloudHypervisorTestResult]:
        results: List[CloudHypervisorTestResult] = []
        # Cargo will output test status for each test separately in JSON format. Parse
        # the output line by line to obtain the list of all tests run along with their
        # outcomes.
        #
        # Example output:
        # { "type": "test", "event": "ok", "name": "integration::test_vfio" }
        lines = output.split("\n")
        for line in lines:
            result = {}
            try:
                result = json.loads(line)
            except json.decoder.JSONDecodeError:
                continue
            if type(result) is not dict:
                continue
            if "type" not in result or result["type"] != "test":
                continue
            if "event" not in result or result["event"] not in ["ok", "failed"]:
                continue
            status = TestStatus.PASSED if result["event"] == "ok" else TestStatus.FAILED
            results.append(
                CloudHypervisorTestResult(
                    name=result["name"],
                    status=status,
                )
            )
        return results

    def _send_subtest_msg(
        self,
        test_id: str,
        environment: Environment,
        test_name: str,
        test_status: TestStatus,
        test_message: str = "",
    ) -> None:
        subtest_msg = create_test_result_message(
            SubTestMessage, test_id, environment, test_name, test_status, test_message
        )
        notifier.notify(subtest_msg)

    def _list_perf_metrics_tests(self, hypervisor: str = "kvm") -> List[str]:
        tests_list = []
        result = self.run(
...
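For context, the _extract_test_results method above depends on cargo's JSON test output, where every line is an independent JSON event. A minimal standalone sketch of the same parsing idea, with a made-up sample_output string standing in for real cargo output:

import json
from typing import Dict, List

# Hypothetical cargo output; each line is an independent JSON event.
sample_output = """
{ "type": "suite", "event": "started", "test_count": 2 }
{ "type": "test", "event": "ok", "name": "integration::test_vfio" }
{ "type": "test", "event": "failed", "name": "integration::test_boot" }
"""

def parse_cargo_test_events(output: str) -> List[Dict[str, str]]:
    results = []
    for line in output.splitlines():
        try:
            event = json.loads(line)
        except json.JSONDecodeError:
            continue  # skip non-JSON lines (blank lines, build output, etc.)
        if not isinstance(event, dict) or event.get("type") != "test":
            continue
        if event.get("event") not in ("ok", "failed"):
            continue  # ignore suite events and skipped/ignored tests
        results.append({"name": event["name"], "status": event["event"]})
    return results

print(parse_cargo_test_events(sample_output))
# [{'name': 'integration::test_vfio', 'status': 'ok'},
#  {'name': 'integration::test_boot', 'status': 'failed'}]

Skipping lines that are not valid JSON, rather than failing on them, mirrors the tolerance the LISA tool applies, since the raw output can contain lines that are not test events.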
libvirt_tck_tool.py
Source:libvirt_tck_tool.py
...
        expected_fails = [r.name for r in results if r.status == TestStatus.ATTEMPTED]
        if not failures and not expected_fails:
            result.assert_exit_code()
        for r in results:
            self._send_subtest_msg(
                test_result.id_,
                environment,
                r.name,
                r.status,
            )
        archive_path = self.repo_root / "results.tar.gz"
        self.node.tools[Chmod].chmod(str(archive_path), "a+r", sudo=True)
        self.node.shell.copy_back(
            self.repo_root / "results.tar.gz",
            log_path / "libvirt_tck_results.tar.gz",
        )
        assert_that(failures, f"Unexpected failures: {failures}").is_empty()

    def _initialize(self, *args: Any, **kwargs: Any) -> None:
        tool_path = self.get_tool_path(use_global=True)
        self.repo_root = tool_path / "libvirt-tck"

    def _install_dep(self) -> None:
        posix_os: Posix = cast(Posix, self.node.os)
        git = self.node.tools[Git]
        git.clone(self.repo, self.get_tool_path(use_global=True), fail_on_exists=False)
        # install dependency packages
        for package in list(self.deps):
            if posix_os.is_package_in_repo(package):
                posix_os.install_packages(package)

    def _install(self) -> bool:
        self._install_dep()
        if isinstance(self.node.os, CBLMariner):
            # tell libvirt to run qemu as root
            libvirt_qemu_conf = PurePath("/etc/libvirt/qemu.conf")
            self.node.tools[Echo].write_to_file(
                'user = "root"',
                libvirt_qemu_conf,
                sudo=True,
                append=True,
            )
            self.node.tools[Echo].write_to_file(
                'group = "root"',
                libvirt_qemu_conf,
                sudo=True,
                append=True,
            )
            self.node.tools[Usermod].add_user_to_group("libvirt", sudo=True)
            # Workaround for error:
            #
            # error from service: GDBus.Error:org.gtk.GDBus.UnmappedGError.Quark
            # ._g_2dfile_2derror_2dquark.Code4: Failed to open file
            # ‘/proc/2192/status’: No such file or directory
            self.node.tools[Sed].substitute(
                "hidepid=2",
                "hidepid=0",
                "/etc/fstab",
                sudo=True,
            )
            self.node.reboot()
            # After reboot, libvirtd service is in failed state and needs to
            # be restarted manually. Doing it immediately after restarts
            # fails. So wait for a while before restarting libvirtd.
            # This is an issue in Mariner and below lines can be removed once
            # it has been addressed.
            tries = 0
            while tries <= 10:
                try:
                    self.node.tools[Service].restart_service("libvirtd")
                    break
                except Exception:
                    time.sleep(1)
                    tries += 1
        modules_to_install = " ".join(
            [
                "inc::latest",
                "Module::Build",
                "IO::Interface::Simple",
                "Net::OpenSSH",
            ]
        )
        self.node.execute(
            f"cpanm install {modules_to_install}",
            sudo=True,
            timeout=self.TIME_OUT,
        )
        self.node.execute(
            "perl Build.PL", cwd=self.repo_root, expected_exit_code=0, sudo=True
        )
        self.node.execute(
            "cpanm --installdeps .",
            cwd=self.repo_root,
            sudo=True,
            expected_exit_code=0,
            timeout=self.TIME_OUT,
        )
        self.node.execute(
            "./Build install", cwd=self.repo_root, sudo=True, expected_exit_code=0
        )
        return self._check_exists()

    def _is_expected_failure(self, name: str) -> bool:
        distro = type(self.node.os).__name__
        # The test name in the output appears with its full path. Whereas the
        # self.EXPECTED_FAILURES contain just the test names. That's why
        # endswith() is used here.
        return len([f for f in self.EXPECTED_FAILURES[distro] if name.endswith(f)]) > 0

    def _extract_test_results(self, output: str) -> List[LibvirtTckTestResult]:
        results: List[LibvirtTckTestResult] = []
        # output follows the JUnit XML schema
        testsuites = ETree.fromstring(output)
        for testsuite in testsuites:
            result = LibvirtTckTestResult()
            result.name = testsuite.attrib["name"]
            skipped = int(testsuite.attrib["tests"]) == 0
            failed = int(testsuite.attrib["failures"]) > 0
            if failed:
                if self._is_expected_failure(result.name):
                    result.status = TestStatus.ATTEMPTED
                else:
                    result.status = TestStatus.FAILED
            elif skipped:
                result.status = TestStatus.SKIPPED
            else:
                result.status = TestStatus.PASSED
            results.append(result)
        return results

    def _send_subtest_msg(
        self,
        test_id: str,
        environment: Environment,
        test_name: str,
        test_status: TestStatus,
    ) -> None:
        subtest_msg = create_test_result_message(
            SubTestMessage,
            test_id,
            environment,
            test_name,
            test_status,
        )
        notifier.notify(subtest_msg)
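A side note on the result parsing above: libvirt-tck results are read as JUnit-style XML, where each testsuite element carries "tests" and "failures" counters that get mapped onto a test status. A rough standalone sketch of that mapping, using a hypothetical report string (made-up suite names) rather than real tck output:

import xml.etree.ElementTree as ETree

# Hypothetical JUnit-style report with the same shape the tool expects.
sample_report = """
<testsuites>
  <testsuite name="domain/100-transient-lifecycle.t" tests="5" failures="0"/>
  <testsuite name="domain/200-disk-hotplug.t" tests="4" failures="1"/>
  <testsuite name="network/300-not-run.t" tests="0" failures="0"/>
</testsuites>
"""

def summarize(report_xml: str) -> dict:
    statuses = {}
    for testsuite in ETree.fromstring(report_xml):
        name = testsuite.attrib["name"]
        if int(testsuite.attrib["failures"]) > 0:
            statuses[name] = "FAILED"   # the tool downgrades known failures to ATTEMPTED
        elif int(testsuite.attrib["tests"]) == 0:
            statuses[name] = "SKIPPED"  # a suite that ran no tests counts as skipped
        else:
            statuses[name] = "PASSED"
    return statuses

print(summarize(sample_report))
# {'domain/100-transient-lifecycle.t': 'PASSED',
#  'domain/200-disk-hotplug.t': 'FAILED',
#  'network/300-not-run.t': 'SKIPPED'}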
mshv_root_tests.py
Source:mshv_root_tests.py
...
                    log,
                    node,
                    log_path,
                )
                self._send_subtest_msg(
                    result.id_,
                    environment,
                    test_name,
                    TestStatus.PASSED,
                )
            except Exception as e:
                failures += 1
                self._send_subtest_msg(
                    result.id_, environment, test_name, TestStatus.FAILED, repr(e)
                )
        assert_that(failures).is_equal_to(0)
        return

    def _mshv_stress_vm_create(
        self,
        times: int,
        cpus_per_vm: int,
        mem_per_vm_mb: int,
        log: Logger,
        node: Node,
        log_path: Path,
    ) -> None:
        log.info(
            f"MSHV stress VM create: times={times}, cpus_per_vm={cpus_per_vm}, mem_per_vm_mb={mem_per_vm_mb}"  # noqa: E501
        )
        hypervisor_fw_path = str(node.get_working_path() / self.HYPERVISOR_FW_NAME)
        disk_img_path = str(node.get_working_path() / self.DISK_IMG_NAME)
        cores = node.tools[Lscpu].get_core_count()
        vm_count = int(cores / cpus_per_vm)
        failures = 0
        for test_iter in range(times):
            log.info(f"Test iteration {test_iter + 1} of {times}")
            node.tools[Free].log_memory_stats_mb()
            procs = []
            for i in range(vm_count):
                log.info(f"Starting VM {i}")
                p = node.tools[CloudHypervisor].start_vm_async(
                    kernel=hypervisor_fw_path,
                    cpus=cpus_per_vm,
                    memory_mb=mem_per_vm_mb,
                    disk_path=disk_img_path,
                    disk_readonly=True,
                )
                assert_that(p).described_as(f"Failed to create VM {i}").is_not_none()
                procs.append(p)
                node.tools[Free].log_memory_stats_mb()
                assert_that(p.is_running()).described_as(
                    f"VM {i} failed to start"
                ).is_true()
            # keep the VMs running for a while
            time.sleep(10)
            for i in range(len(procs)):
                p = procs[i]
                if not p.is_running():
                    log.info(f"VM {i} was not running (OOM killed?)")
                    failures += 1
                    continue
                log.info(f"Killing VM {i}")
                p.kill()
            node.tools[Free].log_memory_stats_mb()
        dmesg_str = node.tools[Dmesg].get_output()
        dmesg_path = log_path / f"dmesg_{times}_{cpus_per_vm}_{mem_per_vm_mb}"
        with open(str(dmesg_path), "w") as f:
            f.write(dmesg_str)
        assert_that(failures).is_equal_to(0)

    def _send_subtest_msg(
        self,
        test_id: str,
        environment: Environment,
        test_name: str,
        test_status: TestStatus,
        test_msg: str = "",
    ) -> None:
        subtest_msg = create_test_result_message(
            SubTestMessage,
            test_id,
            environment,
            test_name,
            test_status,
            test_msg,
...
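The stress routine above derives the guest count from the available cores (vm_count = cores // cpus_per_vm), launches each guest asynchronously, and only records a failure when a guest has died before being explicitly killed. A loose standalone illustration of that launch/poll/kill bookkeeping, substituting plain sleep subprocesses (a POSIX-only stand-in) for Cloud Hypervisor guests:

import subprocess
import time

CORES = 8          # assumed host core count for the illustration
CPUS_PER_VM = 2
vm_count = CORES // CPUS_PER_VM  # same sizing rule as the test above

failures = 0
# Stand-ins for guests: long-running processes launched asynchronously.
procs = [subprocess.Popen(["sleep", "30"]) for _ in range(vm_count)]
time.sleep(10)  # keep the "VMs" running for a while
for i, p in enumerate(procs):
    if p.poll() is not None:  # process exited early, so count it as a failure
        print(f"VM {i} was not running (OOM killed?)")
        failures += 1
        continue
    p.kill()  # otherwise shut it down deliberately
assert failures == 0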
