How to use the _extract_test_results method in LISA

Best Python code snippet using lisa_python

ch_tests_tool.py

Source:ch_tests_tool.py Github

copy

Full Screen

# --- Excerpt from ch_tests_tool.py (LISA Cloud Hypervisor test tool). ---
# The scrape begins mid-way through a test-runner method; its visible tail is
# preserved below as a comment because the enclosing `def` is not shown.
#
#             cwd=self.repo_root,
#             no_info_log=False,  # print out result of each test
#             shell=True,
#         )
#         results = self._extract_test_results(result.stdout)
#         failures = [r.name for r in results if r.status == TestStatus.FAILED]
#         if not failures:
#             result.assert_exit_code()
#         for r in results:
#             self._send_subtest_msg(
#                 test_result.id_,
#                 environment,
#                 r.name,
#                 r.status,
#             )
#         assert_that(failures, f"Unexpected failures: {failures}").is_empty()

def run_metrics_tests(
    self,
    test_result: TestResult,
    environment: Environment,
    hypervisor: str,
    log_path: Path,
    skip: Optional[List[str]] = None,
) -> None:
    """Run every perf-metrics testcase for *hypervisor*.

    Each testcase's outcome is reported as a subtest message, and each
    testcase's output is written to ``<log_path>/<testcase>.log``.  Fails
    (via ``assert_that``) if any testcase failed.

    NOTE(review): the ``skip`` parameter is accepted but never used in the
    visible code — confirm whether filtering was intended.
    """
    perf_metrics_tests = self._list_perf_metrics_tests(hypervisor=hypervisor)
    failed_testcases = []
    for testcase in perf_metrics_tests:
        testcase_log_file = log_path.joinpath(f"{testcase}.log")
        status: TestStatus = TestStatus.QUEUED
        metrics: str = ""
        trace: str = ""
        # FIX: `result` was previously unbound (first iteration) or stale
        # (later iterations) when self.run() raised, so the log write at the
        # bottom of the loop could crash with NameError or record the wrong
        # testcase's stdout.
        result = None
        try:
            result = self.run(
                f"tests --hypervisor {hypervisor} --metrics -- -- \
                --test-filter {testcase}",
                timeout=self.TIME_OUT,
                force_run=True,
                cwd=self.repo_root,
                no_info_log=False,  # print out result of each test
                shell=True,
            )
            if result.exit_code == 0:
                status = TestStatus.PASSED
                metrics = self._process_perf_metric_test_result(result.stdout)
            else:
                status = TestStatus.FAILED
                trace = f"Testcase '{testcase}' failed: {result.stderr}"
                failed_testcases.append(testcase)
        except Exception as e:
            # FIX: corrected "tescase" typo in the log message.
            self._log.info(f"Testcase failed, testcase name: {testcase}")
            status = TestStatus.FAILED
            trace = str(e)
            failed_testcases.append(testcase)
        msg = metrics if status == TestStatus.PASSED else trace
        self._send_subtest_msg(
            test_id=test_result.id_,
            environment=environment,
            test_name=testcase,
            test_status=status,
            test_message=msg,
        )
        # Write stdout of testcase to log as per given requirement.
        # FIX: fall back to the failure trace when self.run() never returned.
        with open(testcase_log_file, "w") as f:
            f.write(result.stdout if result is not None else trace)
    assert_that(
        failed_testcases, f"Failed Testcases: {failed_testcases}"
    ).is_empty()

def _initialize(self, *args: Any, **kwargs: Any) -> None:
    """Cache the cloned repo root and the dev_cli.sh entry point."""
    tool_path = self.get_tool_path(use_global=True)
    self.repo_root = tool_path / "cloud-hypervisor"
    self.cmd_path = self.repo_root / "scripts" / "dev_cli.sh"

def _install(self) -> bool:
    """Clone cloud-hypervisor and prepare Docker.

    CBL-Mariner needs extra setup: raised nofile ulimits for the Docker
    daemon and docker-group membership for the current user.

    NOTE(review): indentation was lost in the scraped source; the nesting of
    the groupadd/usermod/reboot steps under the CBLMariner branch is a
    reconstruction — confirm against the upstream file.
    """
    git = self.node.tools[Git]
    git.clone(self.repo, self.get_tool_path(use_global=True))
    if isinstance(self.node.os, CBLMariner):
        daemon_json_file = PurePath("/etc/docker/daemon.json")
        daemon_json = '{"default-ulimits":{"nofile":{"Hard":65535,"Name":"nofile","Soft":65535}}}'  # noqa: E501
        self.node.tools[Echo].write_to_file(
            daemon_json, daemon_json_file, sudo=True
        )
        self.node.execute("groupadd -f docker", expected_exit_code=0)
        username = self.node.tools[Whoami].get_username()
        res = self.node.execute("getent group docker", expected_exit_code=0)
        if username not in res.stdout:  # if current user is not in docker group
            self.node.execute(f"usermod -a -G docker {username}", sudo=True)
            # reboot for group membership change to take effect
            self.node.reboot()
    self.node.tools[Docker].start()
    return self._check_exists()

def _extract_test_results(self, output: str) -> List[CloudHypervisorTestResult]:
    results: List[CloudHypervisorTestResult] = []
    # Cargo will output test status for each test separately in JSON format.
    # Parse the output line by line to obtain the list of all tests run along
    # with their outcomes.
    #
    # Example output:
    # { "type": "test", "event": "ok", "name": "integration::test_vfio" }
    lines = output.split("\n")
    for line in lines:
        result = {}
        try:
            result = json.loads(line)
        except json.decoder.JSONDecodeError:
            continue
        # ... (remainder truncated in the scraped source)

Full Screen

Full Screen

Results.py

Source:Results.py Github

copy

Full Screen

# --- Excerpt from Results.py. The scrape begins inside _extract_coverage;
# its visible tail is kept as a comment because the `def` line is not shown.
#
#     coverage.coverage = cov['coverage']
#     coverage.all_coverage = cov['all_coverage']
#     coverage.dep_coverage = cov['dep_coverage']
#     return coverage

def _extract_test_results(test) -> TestResults:
    """Convert a raw result mapping into a TestResults object.

    Passes ``None`` straight through so missing test sections stay ``None``.
    """
    if test is None:
        return None
    outcome = TestResults()
    for field in ("error", "failing", "passing", "execution_time"):
        setattr(outcome, field, test[field])
    return outcome

results = Results()
with open(os.path.join(os.path.dirname(__file__), '..', 'raw_results.json'), 'r') as fd:
    data = json.load(fd)
    for lib_id in data:
        library = Library()
        results.libs.append(library)
        library.repo = lib_id
        for v in data[lib_id]:
            if "woodstox" in lib_id and (v == "6.1.1" or v == "6.0.2"):
                continue
            version = Version(library)
            library.versions.append(version)
            version.version = v
            l = data[lib_id][v]  # noqa: E741 — kept: truncated code below may use `l`
            version.compiled = l['compiled']
            version.debloat = l['debloat']
            version.type_nb_class = l['type_nb_class']
            version.type_nb_class_abstract = l['type_nb_class_abstract']
            version.type_nb_interface = l['type_nb_interface']
            version.type_nb_constant = l['type_nb_constant']
            version.type_nb_signeton = l['type_nb_signeton']
            version.type_nb_enum = l['type_nb_enum']
            version.type_nb_exception = l['type_nb_exception']
            version.type_nb_unknown = l['type_nb_unknown']
            version.nb_class = l['nb_class']
            version.nb_method = l['nb_method']
            version.nb_debloat_class = l['nb_debloat_class']
            version.nb_preserved_class = l['nb_preserved_class']
            version.nb_debloat_method = l['nb_debloat_method']
            version.debloat_time = l['debloat_time']
            version.original_execution_time = l['original_execution_time']
            version.debloat_execution_time = l['debloat_execution_time']
            version.original_jar_size = l['original_jar_size']
            version.debloat_jar_size = l['debloat_jar_size']
            version.workaround_jar_size = l['workaround_jar_size']
            version.dependencies = _extract_dependencies(l['dependencies'])
            version.coverage = _extract_coverage(l['coverage'])
            version.original_test = _extract_test_results(l['original_test'])
            version.debloat_test = _extract_test_results(l['debloat_test'])
            for i in l['clients']:
                c = l['clients'][i]
                client = Client(version)
                version.clients.append(client)
                client.repo = c['repo_name']
                client.compiled = c['compiled']
                client.debloat = c['debloat']
                client.groupId = c['groupId']
                client.artifactId = c['artifactId']
                client.static_use = c['static_use']
                client.test_cover_lib = c['test_cover_lib']
                client.original_test = _extract_test_results(
                    c['original_test'])
                client.debloat_test = _extract_test_results(c['debloat_test'])
                client.coverage_original = _extract_coverage(
                    c['coverage_original'])
                client.coverage_debloat = _extract_coverage(
                    c['coverage_debloat'])
                client.original_execution_time = c['original_execution_time']
                # ... (remainder truncated in the scraped source)

Full Screen

Full Screen

libvirt_tck_tool.py

Source:libvirt_tck_tool.py Github

copy

Full Screen

# --- Excerpt from libvirt_tck_tool.py (LISA libvirt TCK tool). ---
# The scrape begins mid-way through a test-runner method; its visible tail is
# preserved below as a comment because the enclosing `def` is not shown.
#
#             cwd=self.repo_root,
#             sudo=True,
#             shell=True,
#         )
#         results = self._extract_test_results(result.stdout)
#         failures = [r.name for r in results if r.status == TestStatus.FAILED]
#         expected_fails = [r.name for r in results if r.status == TestStatus.ATTEMPTED]
#         if not failures and not expected_fails:
#             result.assert_exit_code()
#         for r in results:
#             self._send_subtest_msg(
#                 test_result.id_,
#                 environment,
#                 r.name,
#                 r.status,
#             )
#         archive_path = self.repo_root / "results.tar.gz"
#         self.node.tools[Chmod].chmod(str(archive_path), "a+r", sudo=True)
#         self.node.shell.copy_back(
#             self.repo_root / "results.tar.gz",
#             log_path / "libvirt_tck_results.tar.gz",
#         )
#         assert_that(failures, f"Unexpected failures: {failures}").is_empty()

def _initialize(self, *args: Any, **kwargs: Any) -> None:
    """Cache the cloned libvirt-tck repo root."""
    tool_path = self.get_tool_path(use_global=True)
    self.repo_root = tool_path / "libvirt-tck"

def _install_dep(self) -> None:
    """Clone the repo and install whatever dependency packages the distro
    repositories actually provide."""
    posix_os: Posix = cast(Posix, self.node.os)
    git = self.node.tools[Git]
    git.clone(self.repo, self.get_tool_path(use_global=True), fail_on_exists=False)
    # install dependency packages
    for package in list(self.deps):
        if posix_os.is_package_in_repo(package):
            posix_os.install_packages(package)

def _install(self) -> bool:
    """Install libvirt-tck: distro packages, Mariner-specific libvirt/qemu
    configuration, then the Perl build.

    NOTE(review): indentation was lost in the scraped source; the extent of
    the CBLMariner branch below is a reconstruction — confirm against the
    upstream file.
    """
    self._install_dep()
    if isinstance(self.node.os, CBLMariner):
        # tell libvirt to run qemu as root
        libvirt_qemu_conf = PurePath("/etc/libvirt/qemu.conf")
        self.node.tools[Echo].write_to_file(
            'user = "root"',
            libvirt_qemu_conf,
            sudo=True,
            append=True,
        )
        self.node.tools[Echo].write_to_file(
            'group = "root"',
            libvirt_qemu_conf,
            sudo=True,
            append=True,
        )
        self.node.tools[Usermod].add_user_to_group("libvirt", sudo=True)
        # Workaround for error:
        #
        # error from service: GDBus.Error:org.gtk.GDBus.UnmappedGError.Quark
        # ._g_2dfile_2derror_2dquark.Code4: Failed to open file
        # "/proc/2192/status": No such file or directory
        self.node.tools[Sed].substitute(
            "hidepid=2",
            "hidepid=0",
            "/etc/fstab",
            sudo=True,
        )
        self.node.reboot()
        # After reboot, libvirtd service is in failed state and needs to
        # be restarted manually. Doing it immediately after restarts
        # fails. So wait for a while before restarting libvirtd.
        # This is an issue in Mariner and below lines can be removed once
        # it has been addressed.
        tries = 0
        while tries <= 10:
            try:
                self.node.tools[Service].restart_service("libvirtd")
                break
            except Exception:
                time.sleep(1)
                tries += 1
    modules_to_install = " ".join(
        [
            "inc::latest",
            "Module::Build",
            "IO::Interface::Simple",
            "Net::OpenSSH",
        ]
    )
    self.node.execute(
        f"cpanm install {modules_to_install}",
        sudo=True,
        timeout=self.TIME_OUT,
    )
    self.node.execute(
        "perl Build.PL", cwd=self.repo_root, expected_exit_code=0, sudo=True
    )
    self.node.execute(
        "cpanm --installdeps .",
        cwd=self.repo_root,
        sudo=True,
        expected_exit_code=0,
        timeout=self.TIME_OUT,
    )
    self.node.execute(
        "./Build install", cwd=self.repo_root, sudo=True, expected_exit_code=0
    )
    return self._check_exists()

def _is_expected_failure(self, name: str) -> bool:
    """Return True if *name* is a known-expected failure for this distro.

    The test name in the output appears with its full path, whereas
    self.EXPECTED_FAILURES contains just the test names. That's why
    endswith() is used here.
    """
    distro = type(self.node.os).__name__
    # FIX: use .get() so a distro without an expected-failures entry returns
    # False instead of raising KeyError, and any() instead of building a
    # throwaway list just to take its length.
    return any(name.endswith(f) for f in self.EXPECTED_FAILURES.get(distro, []))

def _extract_test_results(self, output: str) -> List[LibvirtTckTestResult]:
    results: List[LibvirtTckTestResult] = []
    # output follows the JUnit XML schema
    testsuites = ETree.fromstring(output)
    for testsuite in testsuites:
        result = LibvirtTckTestResult()
        result.name = testsuite.attrib["name"]
        skipped = int(testsuite.attrib["tests"]) == 0
        failed = int(testsuite.attrib["failures"]) > 0
        if failed:
            if self._is_expected_failure(result.name):
                result.status = TestStatus.ATTEMPTED
            else:
                result.status = TestStatus.FAILED
        elif skipped:
            pass  # (remainder truncated in the scraped source)

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub. Right from setting up the prerequisites to running your first automation test, to following best practices and diving deeper into advanced test scenarios, the LambdaTest Learning Hub compiles a list of step-by-step guides to help you become proficient with different test automation frameworks, i.e., Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to the video tutorials on the LambdaTest YouTube channel to get step-by-step demonstrations from industry experts.

Run lisa automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful