How to use the _merge_test_requirements method in lisa

The Python snippets below, taken from the LISA project, show where _merge_test_requirements is defined (lisa_runner.py) and how it is exercised (test_lisa_runner.py).
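
For orientation, here is a minimal, hedged sketch of the call shape. It only restates what the snippets on this page show: the method takes the queued test results, the environments loaded so far, and the current platform type, and it merges requirements into the environment collection in place. The helper names (generate_runner, generate_cases_result, constants.PLATFORM_MOCK) come from LISA's unit tests shown further down; treat this as a sketch, not a standalone script.

# Minimal sketch based on the snippets below; not a standalone script.
envs = load_environments(env_runbook)                  # environments loaded from the runbook (may be empty)
runner = generate_runner(env_runbook)                  # mock runner helper from LISA's unit tests
test_results = test_testsuite.generate_cases_result()  # queued TestResult objects

runner._merge_test_requirements(
    test_results=test_results,              # List[TestResult]
    existing_environments=envs,             # Environments; extended in place with generated candidates
    platform_type=constants.PLATFORM_MOCK,  # the running platform's type name
)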

lisa_runner.py

Source: lisa_runner.py (GitHub)



...
        # load environments
        runbook_environments = load_environments(self._runbook.environment)
        if not runbook_environments:
            # if no runbook environment defined, generate from requirements
            self._merge_test_requirements(
                test_results=self.test_results,
                existing_environments=runbook_environments,
                platform_type=self.platform.type_name(),
            )
        self.environments: List[Environment] = [
            x for x in runbook_environments.values()
        ]
        self._log.debug(f"candidate environment count: {len(self.environments)}")

    @property
    def is_done(self) -> bool:
        is_all_results_completed = all(
            result.is_completed for result in self.test_results
        )
        # all environment should not be used and not be deployed.
        is_all_environment_completed = hasattr(self, "environments") and all(
            (not env.is_in_use)
            and (env.status in [EnvironmentStatus.Prepared, EnvironmentStatus.Deleted])
            for env in self.environments
        )
        return is_all_results_completed and is_all_environment_completed

    def fetch_task(self) -> Optional[Task[None]]:
        self._prepare_environments()
        self._cleanup_deleted_environments()
        self._cleanup_done_results()
        # sort environments by status
        available_environments = self._sort_environments(self.environments)
        available_results = self._sort_test_results(
            [x for x in self.test_results if x.can_run]
        )
        # check deletable environments
        delete_task = self._delete_unused_environments()
        if delete_task:
            return delete_task
        if available_results and available_environments:
            for priority in range(6):
                can_run_results = self._get_results_by_priority(
                    available_results, priority
                )
                if not can_run_results:
                    continue
                # it means there are test cases and environment, so it needs to
                # schedule task.
                for environment in available_environments:
                    if environment.is_in_use:
                        # skip in used environments
                        continue
                    # try to pick the designated test result.
                    environment_results = [
                        x
                        for x in can_run_results
                        if environment.source_test_result
                        and x.id_ == environment.source_test_result.id_
                    ]
                    if not environment_results:
                        environment_results = self._get_runnable_test_results(
                            test_results=can_run_results, environment=environment
                        )
                    if not environment_results:
                        continue
                    task = self._dispatch_test_result(
                        environment=environment, test_results=environment_results
                    )
                    # there is more checking conditions. If some conditions doesn't
                    # meet, the task is None. If so, not return, and try next
                    # conditions or skip this test case.
                    if task:
                        return task
                if not any(
                    x.is_in_use or x.status == EnvironmentStatus.New
                    for x in available_environments
                ):
                    # if there is no environment in used, new, and results are
                    # not fit envs. those results cannot be run.
                    self._skip_test_results(can_run_results)
        elif available_results:
            # no available environments, so mark all test results skipped.
            self._skip_test_results(available_results)
        self.status = ActionStatus.SUCCESS
        return None

    def close(self) -> None:
        if hasattr(self, "environments") and self.environments:
            for environment in self.environments:
                self._delete_environment_task(environment, [])
        super().close()

    def _dispatch_test_result(
        self, environment: Environment, test_results: List[TestResult]
    ) -> Optional[Task[None]]:
        check_cancelled()
        assert test_results
        can_run_results = test_results
        # deploy
        if environment.status == EnvironmentStatus.Prepared and can_run_results:
            return self._generate_task(
                task_method=self._deploy_environment_task,
                environment=environment,
                test_results=can_run_results[:1],
            )
        # run on deployed environment
        can_run_results = [x for x in can_run_results if x.can_run]
        if environment.status == EnvironmentStatus.Deployed and can_run_results:
            selected_test_results = self._get_test_results_to_run(
                test_results=test_results, environment=environment
            )
            if selected_test_results:
                return self._generate_task(
                    task_method=self._run_test_task,
                    environment=environment,
                    test_results=selected_test_results,
                    case_variables=self._case_variables,
                )
            # Check if there is case to run in a connected environment. If so,
            # initialize the environment
            initialization_results = self._get_runnable_test_results(
                test_results=test_results,
                environment_status=EnvironmentStatus.Connected,
                environment=environment,
            )
            if initialization_results:
                return self._generate_task(
                    task_method=self._initialize_environment_task,
                    environment=environment,
                    test_results=initialization_results,
                )
        # run on connected environment
        can_run_results = [x for x in can_run_results if x.can_run]
        if environment.status == EnvironmentStatus.Connected and can_run_results:
            selected_test_results = self._get_test_results_to_run(
                test_results=test_results, environment=environment
            )
            if selected_test_results:
                return self._generate_task(
                    task_method=self._run_test_task,
                    environment=environment,
                    test_results=selected_test_results,
                    case_variables=self._case_variables,
                )
        return None

    def _delete_unused_environments(self) -> Optional[Task[None]]:
        available_environments = self._sort_environments(self.environments)
        # check deletable environments
        for environment in available_environments:
            # if an environment is in using, or not deployed, they won't be
            # deleted until end of runner.
            if environment.is_in_use or environment.status in [
                EnvironmentStatus.New,
                EnvironmentStatus.Prepared,
            ]:
                continue
            can_run_results = self._get_runnable_test_results(
                self.test_results, environment=environment
            )
            if not can_run_results:
                # no more test need this environment, delete it.
                self._log.debug(
                    f"generating delete environment task on '{environment.name}'"
                )
                return self._generate_task(
                    task_method=self._delete_environment_task,
                    environment=environment,
                    test_results=[],
                )
        return None

    def _prepare_environments(self) -> None:
        if all(x.status != EnvironmentStatus.New for x in self.environments):
            return
        proceeded_environments: List[Environment] = []
        for candidate_environment in self.environments:
            success = True
            if candidate_environment.status == EnvironmentStatus.New:
                success = self._prepare_environment(candidate_environment)
            if success:
                proceeded_environments.append(candidate_environment)
        # sort by environment source and cost cases
        # user defined should be higher priority than test cases' requirement
        proceeded_environments.sort(key=lambda x: (not x.is_predefined, x.cost))
        self.environments = proceeded_environments

    def _deploy_environment_task(
        self, environment: Environment, test_results: List[TestResult]
    ) -> None:
        try:
            try:
                self.platform.deploy_environment(environment)
                assert (
                    environment.status == EnvironmentStatus.Deployed
                ), f"actual: {environment.status}"
                self._reset_awaitable_timer("deploy")
            except ResourceAwaitableException as identifier:
                if self._is_awaitable_timeout("deploy"):
                    self._log.info(
                        f"[{environment.name}] timeout on waiting for more resource: "
                        f"{identifier}, skip assigning case."
                    )
                    raise SkippedException(identifier)
                else:
                    # rerun prepare to calculate resource again.
                    environment.status = EnvironmentStatus.New
        except Exception as identifier:
            self._attach_failed_environment_to_result(
                environment=environment,
                result=test_results[0],
                exception=identifier,
            )
            self._delete_environment_task(environment=environment, test_results=[])

    def _initialize_environment_task(
        self, environment: Environment, test_results: List[TestResult]
    ) -> None:
        self._log.debug(f"start initializing task on '{environment.name}'")
        assert test_results
        try:
            environment.initialize()
            assert (
                environment.status == EnvironmentStatus.Connected
            ), f"actual: {environment.status}"
        except Exception as identifier:
            self._attach_failed_environment_to_result(
                environment=environment,
                result=test_results[0],
                exception=identifier,
            )
            self._delete_environment_task(environment=environment, test_results=[])

    def _run_test_task(
        self,
        environment: Environment,
        test_results: List[TestResult],
        case_variables: Dict[str, VariableEntry],
    ) -> None:
        self._log.debug(
            f"start running cases on '{environment.name}', "
            f"case count: {len(test_results)}, "
            f"status {environment.status.name}"
        )
        assert test_results
        assert len(test_results) == 1, (
            f"single test result to run, " f"but {len(test_results)} found."
        )
        test_result = test_results[0]
        suite_metadata = test_result.runtime_data.metadata.suite
        test_suite: TestSuite = suite_metadata.test_class(
            suite_metadata,
        )
        test_suite.start(
            environment=environment,
            case_results=test_results,
            case_variables=case_variables,
        )
        # release environment reference to optimize memory.
        test_result.environment = None
        # Some test cases may break the ssh connections. To reduce side effects
        # on next test cases, close the connection after each test run. It will
        # be connected on the next command automatically.
        environment.nodes.close()
        # Try to connect node(s), if cannot access node(s) of this environment,
        # set the current environment as Bad. So that this environment won't be reused.
        if not is_unittest() and not environment.nodes.test_connections():
            environment.status = EnvironmentStatus.Bad
            self._log.debug(
                f"set environment '{environment.name}' as bad, "
                f"because after test case '{test_result.name}', "
                f"node(s) cannot be accessible."
            )
            environment.nodes.close()
        # keep failed environment, not to delete
        if (
            test_result.status == TestStatus.FAILED
            and self.platform.runbook.keep_environment
            == constants.ENVIRONMENT_KEEP_FAILED
        ):
            self._log.debug(
                f"keep environment '{environment.name}', "
                f"because keep_environment is 'failed', "
                f"and test case '{test_result.name}' failed on it."
            )
            environment.status = EnvironmentStatus.Deleted
        # if an environment is in bad status, it will be deleted, not run more
        # test cases. But if the setting is to keep failed environment, it may
        # be kept in above logic.
        if environment.status == EnvironmentStatus.Bad or environment.is_dirty:
            self._log.debug(
                f"delete environment '{environment.name}', "
                f"because it's in Bad status or marked as dirty."
            )
            self._delete_environment_task(
                environment=environment, test_results=test_results
            )

    def _delete_environment_task(
        self, environment: Environment, test_results: List[TestResult]
    ) -> None:
        """
        May be called async
        """
        # the predefined environment shouldn't be deleted, because it
        # serves all test cases.
        if environment.status == EnvironmentStatus.Deleted or (
            environment.status == EnvironmentStatus.Prepared
            and not environment.is_in_use
        ):
            # The prepared only environment doesn't need to be deleted.
            # It may cause platform fail to delete non-existing environment.
            environment.status = EnvironmentStatus.Deleted
        else:
            try:
                self.platform.delete_environment(environment)
            except Exception as identifier:
                self._log.debug(
                    f"error on deleting environment '{environment.name}': {identifier}"
                )

    def _prepare_environment(self, environment: Environment) -> bool:
        success = True
        try:
            try:
                self.platform.prepare_environment(environment)
                self._reset_awaitable_timer("prepare")
            except ResourceAwaitableException as identifier:
                # if timed out, raise the exception and skip the test case. If
                # not, do nothing to keep env as new to try next time.
                if self._is_awaitable_timeout("prepare"):
                    raise SkippedException(identifier)
        except Exception as identifier:
            success = False
            matched_result = self._match_failed_environment_with_result(
                environment=environment,
                candidate_results=self.test_results,
                exception=identifier,
            )
            self._attach_failed_environment_to_result(
                environment=environment,
                result=matched_result,
                exception=identifier,
            )
        return success

    def _cleanup_deleted_environments(self) -> None:
        # remove reference to unused environments. It can save memory on big runs.
        new_environments: List[Environment] = []
        for environment in self.environments[:]:
            if environment.status != EnvironmentStatus.Deleted:
                new_environments.append(environment)
        self.environments = new_environments

    def _cleanup_done_results(self) -> None:
        # remove reference to completed test results. It can save memory on big runs.
        remaining_results: List[TestResult] = []
        for test_result in self.test_results[:]:
            if not test_result.is_completed:
                remaining_results.append(test_result)
        self.test_results = remaining_results

    def _get_results_by_priority(
        self, test_results: List[TestResult], priority: int
    ) -> List[TestResult]:
        if not test_results:
            return []
        test_results = [
            x for x in test_results if x.runtime_data.metadata.priority == priority
        ]
        return test_results

    def _generate_task(
        self,
        task_method: Callable[..., None],
        environment: Environment,
        test_results: List[TestResult],
        **kwargs: Any,
    ) -> Task[None]:
        assert not environment.is_in_use
        environment.is_in_use = True
        for test_result in test_results:
            # return assigned but not run cases
            if test_result.status == TestStatus.QUEUED:
                test_result.set_status(TestStatus.ASSIGNED, "")
        task = partial(
            self._run_task,
            task_method,
            environment=environment,
            test_results=test_results,
            **kwargs,
        )
        return Task(self.generate_task_id(), task, self._log)

    def _run_task(
        self,
        task_method: Callable[..., None],
        environment: Environment,
        test_results: List[TestResult],
        **kwargs: Any,
    ) -> None:
        assert environment.is_in_use
        task_method(environment=environment, test_results=test_results, **kwargs)
        for test_result in test_results:
            # return assigned but not run cases
            if test_result.status == TestStatus.ASSIGNED:
                test_result.set_status(TestStatus.QUEUED, "")
        environment.is_in_use = False

    def _match_failed_environment_with_result(
        self,
        environment: Environment,
        candidate_results: List[TestResult],
        exception: Exception,
    ) -> TestResult:
        if environment.source_test_result and environment.source_test_result.is_queued:
            matched_results = [environment.source_test_result]
        else:
            matched_results = self._get_runnable_test_results(
                test_results=candidate_results,
                environment=environment,
            )
        if not matched_results:
            self._log.info(
                "No requirement of test case is suitable for the preparation "
                f"error of the environment '{environment.name}'. "
                "Randomly attach a test case to this environment. "
                "This may be because the platform failed before populating the "
                "features into this environment.",
            )
            matched_results = [
                result for result in self.test_results if result.is_queued
            ]
        if not matched_results:
            raise LisaException(
                "There are no remaining test results to run, so preparation "
                "errors cannot be appended to the test results. Please correct "
                "the error and run again. "
                f"original exception: {exception}"
            )
        return matched_results[0]

    def _attach_failed_environment_to_result(
        self,
        environment: Environment,
        result: TestResult,
        exception: Exception,
    ) -> None:
        # make first fit test case failed by deployment,
        # so deployment failure can be tracked.
        environment.platform = self.platform
        result.environment = environment
        result.handle_exception(exception=exception, log=self._log, phase="deployment")
        self._log.info(
            f"'{environment.name}' attached to test case "
            f"'{result.runtime_data.metadata.full_name}({result.id_})': "
            f"{exception}"
        )
        # release environment reference to optimize memory.
        result.environment = None

    def _get_runnable_test_results(
        self,
        test_results: List[TestResult],
        use_new_environment: Optional[bool] = None,
        environment_status: Optional[EnvironmentStatus] = None,
        environment: Optional[Environment] = None,
    ) -> List[TestResult]:
        results = [
            x
            for x in test_results
            if x.is_queued
            and (
                use_new_environment is None
                or x.runtime_data.use_new_environment == use_new_environment
            )
            and (
                environment_status is None
                or x.runtime_data.metadata.requirement.environment_status
                == environment_status
            )
        ]
        if environment:
            runnable_results: List[TestResult] = []
            for result in results:
                try:
                    if result.check_environment(
                        environment=environment, save_reason=True
                    ) and (
                        not result.runtime_data.use_new_environment
                        or environment.is_new
                    ):
                        runnable_results.append(result)
                except SkippedException as identifier:
                    # when check the environment, the test result may be marked
                    # as skipped, due to the test result is assumed not to match
                    # any environment.
                    result.handle_exception(identifier, log=self._log, phase="check")
            results = runnable_results
        # only select one test case, which needs the new environment. Others
        # will be dropped to next environment.
        if sum(1 for x in results if x.runtime_data.use_new_environment) > 1:
            new_results: List[TestResult] = []
            has_new_result: bool = False
            for x in results:
                if x.runtime_data.use_new_environment:
                    # skip from second new result
                    if has_new_result:
                        continue
                    has_new_result = True
                    new_results.append(x)
                else:
                    new_results.append(x)
            results = new_results
        results = self._sort_test_results(results)
        return results

    def _get_test_results_to_run(
        self, test_results: List[TestResult], environment: Environment
    ) -> List[TestResult]:
        to_run_results = self._get_runnable_test_results(
            test_results=test_results,
            environment_status=environment.status,
            environment=environment,
        )
        if to_run_results:
            to_run_test_result = next(
                (x for x in to_run_results if x.runtime_data.use_new_environment),
                None,
            )
            if not to_run_test_result:
                to_run_test_result = to_run_results[0]
            to_run_results = [to_run_test_result]
        return to_run_results

    def _sort_environments(self, environments: List[Environment]) -> List[Environment]:
        results: List[Environment] = []
        # sort environments by the status list
        sorted_status = [
            EnvironmentStatus.Connected,
            EnvironmentStatus.Deployed,
            EnvironmentStatus.Prepared,
            EnvironmentStatus.New,
        ]
        if environments:
            for status in sorted_status:
                results.extend(
                    x for x in environments if x.status == status and x.is_alive
                )
        return results

    def _sort_test_results(self, test_results: List[TestResult]) -> List[TestResult]:
        results = test_results.copy()
        # sort by priority, use new environment, environment status and suite name.
        results.sort(
            key=lambda r: str(r.runtime_data.metadata.suite.name),
        )
        # this step make sure Deployed is before Connected
        results.sort(
            reverse=True,
            key=lambda r: str(r.runtime_data.metadata.requirement.environment_status),
        )
        results.sort(
            reverse=True,
            key=lambda r: str(r.runtime_data.use_new_environment),
        )
        results.sort(key=lambda r: r.runtime_data.metadata.priority)
        return results

    def _skip_test_results(
        self,
        test_results: List[TestResult],
        additional_reason: str = "no available environment",
    ) -> None:
        for test_result in test_results:
            if test_result.is_completed:
                # already completed, don't skip it.
                continue
            if test_result.check_results and test_result.check_results.reasons:
                reasons = f"{additional_reason}: {test_result.check_results.reasons}"
            else:
                reasons = additional_reason
            test_result.set_status(TestStatus.SKIPPED, reasons)

    def _merge_test_requirements(
        self,
        test_results: List[TestResult],
        existing_environments: Environments,
        platform_type: str,
    ) -> None:
        assert platform_type
        platform_type_set = search_space.SetSpace[str](
            is_allow_set=True, items=[platform_type]
        )
        # if platform defined requirement, replace the requirement from
        # test case.
        for test_result in test_results:
            platform_requirement = self._create_platform_requirement()
            test_req: TestCaseRequirement = test_result.runtime_data.requirement
...


test_lisa_runner.py

Source: test_lisa_runner.py (GitHub)



...
            [x for x in envs],
        )
        runner = generate_runner(None)
        test_results = test_testsuite.generate_cases_result()
        runner._merge_test_requirements(
            test_results=test_results,
            existing_environments=envs,
            platform_type=constants.PLATFORM_MOCK,
        )
        # 3 cases create 3 environments.
        self.assertListEqual(
            ["generated_0", "generated_1", "generated_2"],
            list(envs),
        )
        self.verify_test_results(
            expected_test_order=["mock_ut1", "mock_ut2", "mock_ut3"],
            expected_envs=["", "", ""],
            expected_status=[TestStatus.QUEUED, TestStatus.QUEUED, TestStatus.QUEUED],
            expected_message=["", "", ""],
            test_results=test_results,
        )

    def test_merge_req(self) -> None:
        # each test case will create an environment candidate.
        env_runbook = generate_env_runbook(remote=True)
        envs = load_environments(env_runbook)
        self.assertListEqual(
            ["customized_0"],
            list(envs),
        )
        runner = generate_runner(env_runbook)
        test_results = test_testsuite.generate_cases_result()
        runner._merge_test_requirements(
            test_results=test_results,
            existing_environments=envs,
            platform_type=constants.PLATFORM_MOCK,
        )
        self.assertListEqual(
            ["customized_0", "generated_1", "generated_2", "generated_3"],
            list(envs),
        )
        self.assertListEqual(
            [
                TestStatus.QUEUED,
                TestStatus.QUEUED,
                TestStatus.QUEUED,
            ],
            [x.status for x in test_results],
        )

    def test_merge_req_create_on_use_new(self) -> None:
        # same runbook as test_merge_req_run_not_create_on_equal
        # but all 3 cases asks a new env, so create 3 envs
        # note, when running cases, predefined env will be treat as a new env.
        env_runbook = generate_env_runbook(remote=True)
        envs = load_environments(env_runbook)
        self.assertListEqual(
            ["customized_0"],
            list(envs),
        )
        runner = generate_runner(env_runbook)
        test_results = test_testsuite.generate_cases_result()
        for test_result in test_results:
            test_result.runtime_data.use_new_environment = True
        runner._merge_test_requirements(
            test_results=test_results,
            existing_environments=envs,
            platform_type=constants.PLATFORM_MOCK,
        )
        # All 3 cases needed a new environment, so it created 3.
        self.assertListEqual(
            ["customized_0", "generated_1", "generated_2", "generated_3"],
            list(envs),
        )
        self.verify_test_results(
            expected_test_order=["mock_ut1", "mock_ut2", "mock_ut3"],
            expected_envs=["", "", ""],
            expected_status=[TestStatus.QUEUED, TestStatus.QUEUED, TestStatus.QUEUED],
            expected_message=["", "", ""],
            test_results=test_results,
        )

    def test_merge_req_all_generated(self) -> None:
        # force to use existing env, not to create new.
        # this case doesn't provide predefined env, but no case skipped on this stage.
        env_runbook = generate_env_runbook(is_single_env=False)
        envs = load_environments(env_runbook)
        self.assertListEqual(
            [],
            list(envs),
        )
        runner = generate_runner(None)
        test_results = test_testsuite.generate_cases_result()
        runner._merge_test_requirements(
            test_results=test_results,
            existing_environments=envs,
            platform_type=constants.PLATFORM_MOCK,
        )
        self.assertListEqual(
            ["generated_0", "generated_1", "generated_2"],
            list(envs),
        )
        self.verify_test_results(
            expected_test_order=["mock_ut1", "mock_ut2", "mock_ut3"],
            expected_envs=["", "", ""],
            expected_status=[
                TestStatus.QUEUED,
                TestStatus.QUEUED,
                TestStatus.QUEUED,
            ],
            expected_message=["", "", ""],
            test_results=test_results,
        )

    def test_merge_req_platform_type_checked(self) -> None:
        # check if current platform supported,
        # for example, some case run on azure only.
        # platform check happens in runner, so this case is here
        # a simple check is enough. More covered by search_space.SetSpace
        env_runbook = generate_env_runbook(is_single_env=False)
        envs = load_environments(env_runbook)
        self.assertListEqual(
            [],
            list(envs),
        )
        runner = generate_runner(None)
        test_results = test_testsuite.generate_cases_result()
        for test_result in test_results:
            metadata = test_result.runtime_data.metadata
            metadata.requirement = simple_requirement(
                supported_platform_type=["does-not-exist"]
            )
        runner._merge_test_requirements(
            test_results=test_results,
            existing_environments=envs,
            platform_type=constants.PLATFORM_MOCK,
        )
        platform_unsupported = "capability cannot support some of requirement"
        self.verify_test_results(
            expected_test_order=["mock_ut1", "mock_ut2", "mock_ut3"],
            expected_envs=["", "", ""],
            expected_status=[
                TestStatus.SKIPPED,
                TestStatus.SKIPPED,
                TestStatus.SKIPPED,
            ],
            expected_message=[...
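
Read together, these tests pin down the method's observable behavior: with no predefined environments, each queued case yields a generated_<n> candidate; a predefined customized_0 environment is kept and generated candidates are appended after it; use_new_environment forces one candidate per case; and a requirement pinned to an unsupported platform marks the case SKIPPED instead of producing a candidate. Below is a condensed, hedged restatement of the first check, assuming the same mock helpers (generate_env_runbook, generate_runner, generate_cases_result) used in the tests above.

# Condensed sketch of test_merge_req_all_generated; same assumptions as the tests above.
envs = load_environments(generate_env_runbook(is_single_env=False))  # starts empty
runner = generate_runner(None)
test_results = test_testsuite.generate_cases_result()
runner._merge_test_requirements(
    test_results=test_results,
    existing_environments=envs,
    platform_type=constants.PLATFORM_MOCK,
)
# one candidate environment per queued case, named generated_<n>...
assert list(envs) == ["generated_0", "generated_1", "generated_2"]
# ...and nothing is skipped at this stage; all cases stay QUEUED.
assert all(x.status == TestStatus.QUEUED for x in test_results)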


