How to use _sort_environments method in lisa

Best Python code snippet using lisa_python

lisa_runner.py

Source:lisa_runner.py Github

copy

Full Screen

...81 self._prepare_environments()82 self._cleanup_deleted_environments()83 self._cleanup_done_results()84 # sort environments by status85 available_environments = self._sort_environments(self.environments)86 available_results = self._sort_test_results(87 [x for x in self.test_results if x.can_run]88 )89 # check deletable environments90 delete_task = self._delete_unused_environments()91 if delete_task:92 return delete_task93 if available_results and available_environments:94 for priority in range(6):95 can_run_results = self._get_results_by_priority(96 available_results, priority97 )98 if not can_run_results:99 continue100 # it means there are test cases and environment, so it needs to101 # schedule task.102 for environment in available_environments:103 if environment.is_in_use:104 # skip in used environments105 continue106 # try to pick the designated test result.107 environment_results = [108 x109 for x in can_run_results110 if environment.source_test_result111 and x.id_ == environment.source_test_result.id_112 ]113 if not environment_results:114 environment_results = self._get_runnable_test_results(115 test_results=can_run_results, environment=environment116 )117 if not environment_results:118 continue119 task = self._dispatch_test_result(120 environment=environment, test_results=environment_results121 )122 # there is more checking conditions. If some conditions doesn't123 # meet, the task is None. If so, not return, and try next124 # conditions or skip this test case.125 if task:126 return task127 if not any(128 x.is_in_use or x.status == EnvironmentStatus.New129 for x in available_environments130 ):131 # if there is no environment in used, new, and results are132 # not fit envs. 
those results cannot be run.133 self._skip_test_results(can_run_results)134 elif available_results:135 # no available environments, so mark all test results skipped.136 self._skip_test_results(available_results)137 self.status = ActionStatus.SUCCESS138 return None139 def close(self) -> None:140 if hasattr(self, "environments") and self.environments:141 for environment in self.environments:142 self._delete_environment_task(environment, [])143 super().close()144 def _dispatch_test_result(145 self, environment: Environment, test_results: List[TestResult]146 ) -> Optional[Task[None]]:147 check_cancelled()148 assert test_results149 can_run_results = test_results150 # deploy151 if environment.status == EnvironmentStatus.Prepared and can_run_results:152 return self._generate_task(153 task_method=self._deploy_environment_task,154 environment=environment,155 test_results=can_run_results[:1],156 )157 # run on deployed environment158 can_run_results = [x for x in can_run_results if x.can_run]159 if environment.status == EnvironmentStatus.Deployed and can_run_results:160 selected_test_results = self._get_test_results_to_run(161 test_results=test_results, environment=environment162 )163 if selected_test_results:164 return self._generate_task(165 task_method=self._run_test_task,166 environment=environment,167 test_results=selected_test_results,168 case_variables=self._case_variables,169 )170 # Check if there is case to run in a connected environment. 
If so,171 # initialize the environment172 initialization_results = self._get_runnable_test_results(173 test_results=test_results,174 environment_status=EnvironmentStatus.Connected,175 environment=environment,176 )177 if initialization_results:178 return self._generate_task(179 task_method=self._initialize_environment_task,180 environment=environment,181 test_results=initialization_results,182 )183 # run on connected environment184 can_run_results = [x for x in can_run_results if x.can_run]185 if environment.status == EnvironmentStatus.Connected and can_run_results:186 selected_test_results = self._get_test_results_to_run(187 test_results=test_results, environment=environment188 )189 if selected_test_results:190 return self._generate_task(191 task_method=self._run_test_task,192 environment=environment,193 test_results=selected_test_results,194 case_variables=self._case_variables,195 )196 return None197 def _delete_unused_environments(self) -> Optional[Task[None]]:198 available_environments = self._sort_environments(self.environments)199 # check deletable environments200 for environment in available_environments:201 # if an environment is in using, or not deployed, they won't be202 # deleted until end of runner.203 if environment.is_in_use or environment.status in [204 EnvironmentStatus.New,205 EnvironmentStatus.Prepared,206 ]:207 continue208 can_run_results = self._get_runnable_test_results(209 self.test_results, environment=environment210 )211 if not can_run_results:212 # no more test need this environment, delete it.213 self._log.debug(214 f"generating delete environment task on '{environment.name}'"215 )216 return self._generate_task(217 task_method=self._delete_environment_task,218 environment=environment,219 test_results=[],220 )221 return None222 def _prepare_environments(self) -> None:223 if all(x.status != EnvironmentStatus.New for x in self.environments):224 return225 proceeded_environments: List[Environment] = []226 for candidate_environment in 
self.environments:227 success = True228 if candidate_environment.status == EnvironmentStatus.New:229 success = self._prepare_environment(candidate_environment)230 if success:231 proceeded_environments.append(candidate_environment)232 # sort by environment source and cost cases233 # user defined should be higher priority than test cases' requirement234 proceeded_environments.sort(key=lambda x: (not x.is_predefined, x.cost))235 self.environments = proceeded_environments236 def _deploy_environment_task(237 self, environment: Environment, test_results: List[TestResult]238 ) -> None:239 try:240 try:241 self.platform.deploy_environment(environment)242 assert (243 environment.status == EnvironmentStatus.Deployed244 ), f"actual: {environment.status}"245 self._reset_awaitable_timer("deploy")246 except ResourceAwaitableException as identifier:247 if self._is_awaitable_timeout("deploy"):248 self._log.info(249 f"[{environment.name}] timeout on waiting for more resource: "250 f"{identifier}, skip assigning case."251 )252 raise SkippedException(identifier)253 else:254 # rerun prepare to calculate resource again.255 environment.status = EnvironmentStatus.New256 except Exception as identifier:257 self._attach_failed_environment_to_result(258 environment=environment,259 result=test_results[0],260 exception=identifier,261 )262 self._delete_environment_task(environment=environment, test_results=[])263 def _initialize_environment_task(264 self, environment: Environment, test_results: List[TestResult]265 ) -> None:266 self._log.debug(f"start initializing task on '{environment.name}'")267 assert test_results268 try:269 environment.initialize()270 assert (271 environment.status == EnvironmentStatus.Connected272 ), f"actual: {environment.status}"273 except Exception as identifier:274 self._attach_failed_environment_to_result(275 environment=environment,276 result=test_results[0],277 exception=identifier,278 )279 self._delete_environment_task(environment=environment, test_results=[])280 
# (excerpt continues) Methods of the runner class, shown dedented by one level.
# NOTE(review): indentation was reconstructed from a collapsed listing; confirm
# branch nesting against the upstream file where marked.
def _run_test_task(
    self,
    environment: Environment,
    test_results: List[TestResult],
    case_variables: Dict[str, VariableEntry],
) -> None:
    """Run exactly one test result on ``environment`` and tidy up afterwards.

    After the suite finishes, node connections are closed and re-tested. The
    environment is marked Bad when nodes are unreachable, and deleted when it
    is Bad or dirty, unless it was marked to be kept for a failed case.
    """
    self._log.debug(
        f"start running cases on '{environment.name}', "
        f"case count: {len(test_results)}, "
        f"status {environment.status.name}"
    )
    assert test_results
    assert len(test_results) == 1, (
        f"single test result to run, " f"but {len(test_results)} found."
    )
    test_result = test_results[0]
    suite_metadata = test_result.runtime_data.metadata.suite
    test_suite: TestSuite = suite_metadata.test_class(
        suite_metadata,
    )
    test_suite.start(
        environment=environment,
        case_results=test_results,
        case_variables=case_variables,
    )

    # release environment reference to optimize memory.
    test_result.environment = None

    # Some test cases may break the ssh connections. To reduce side effects
    # on next test cases, close the connection after each test run. It will
    # be connected on the next command automatically.
    environment.nodes.close()
    # Try to connect node(s), if cannot access node(s) of this environment,
    # set the current environment as Bad. So that this environment won't be reused.
    if not is_unittest() and not environment.nodes.test_connections():
        environment.status = EnvironmentStatus.Bad
        self._log.debug(
            f"set environment '{environment.name}' as bad, "
            f"because after test case '{test_result.name}', "
            f"node(s) cannot be accessible."
        )
    # NOTE(review): placed after the `if`; the collapsed listing does not show
    # whether this close is inside the branch -- confirm upstream.
    environment.nodes.close()

    # keep failed environment, not to delete
    if (
        test_result.status == TestStatus.FAILED
        and self.platform.runbook.keep_environment
        == constants.ENVIRONMENT_KEEP_FAILED
    ):
        self._log.debug(
            f"keep environment '{environment.name}', "
            f"because keep_environment is 'failed', "
            f"and test case '{test_result.name}' failed on it."
        )
        # Marking it Deleted makes _delete_environment_task skip the platform
        # delete call, so the real resources stay around.
        environment.status = EnvironmentStatus.Deleted

    # if an environment is in bad status, it will be deleted, not run more
    # test cases. But if the setting is to keep failed environment, it may
    # be kept in above logic.
    if environment.status == EnvironmentStatus.Bad or environment.is_dirty:
        self._log.debug(
            f"delete environment '{environment.name}', "
            f"because it's in Bad status or marked as dirty."
        )
        self._delete_environment_task(
            environment=environment, test_results=test_results
        )


def _delete_environment_task(
    self, environment: Environment, test_results: List[TestResult]
) -> None:
    """
    May be called async

    Delete ``environment`` through the platform. An environment that is
    already Deleted, or only Prepared and not in use, is just marked Deleted.
    Platform errors are logged at debug level and not raised (best effort).
    """
    # the predefined environment shouldn't be deleted, because it
    # serves all test cases.
    if environment.status == EnvironmentStatus.Deleted or (
        environment.status == EnvironmentStatus.Prepared
        and not environment.is_in_use
    ):
        # The prepared only environment doesn't need to be deleted.
        # It may cause platform fail to delete non-existing environment.
        environment.status = EnvironmentStatus.Deleted
    else:
        try:
            self.platform.delete_environment(environment)
        except Exception as identifier:
            self._log.debug(
                f"error on deleting environment '{environment.name}': "
                f"{identifier}"
            )


def _prepare_environment(self, environment: Environment) -> bool:
    """Prepare one environment on the platform.

    Returns ``True`` on success, or when the platform asked to wait and the
    "prepare" wait has not timed out. Returns ``False`` after attaching the
    failure to a matching test result.
    """
    success = True
    try:
        try:
            self.platform.prepare_environment(environment)
            self._reset_awaitable_timer("prepare")
        except ResourceAwaitableException as identifier:
            # if timed out, raise the exception and skip the test case. If
            # not, do nothing to keep env as new to try next time.
            if self._is_awaitable_timeout("prepare"):
                raise SkippedException(identifier) from identifier
    except Exception as identifier:
        success = False

        matched_result = self._match_failed_environment_with_result(
            environment=environment,
            candidate_results=self.test_results,
            exception=identifier,
        )
        self._attach_failed_environment_to_result(
            environment=environment,
            result=matched_result,
            exception=identifier,
        )

    return success


def _cleanup_deleted_environments(self) -> None:
    """Drop environments whose status is Deleted from ``self.environments``."""
    # remove reference to unused environments. It can save memory on big runs.
    self.environments = [
        environment
        for environment in self.environments
        if environment.status != EnvironmentStatus.Deleted
    ]


def _cleanup_done_results(self) -> None:
    """Drop completed test results from ``self.test_results``."""
    # remove reference to completed test results. It can save memory on big runs.
    self.test_results = [
        test_result
        for test_result in self.test_results
        if not test_result.is_completed
    ]


def _get_results_by_priority(
    self, test_results: List[TestResult], priority: int
) -> List[TestResult]:
    """Return the results whose metadata priority equals ``priority``."""
    if not test_results:
        return []

    return [
        x for x in test_results if x.runtime_data.metadata.priority == priority
    ]


def _generate_task(
    self,
    task_method: Callable[..., None],
    environment: Environment,
    test_results: List[TestResult],
    **kwargs: Any,
) -> Task[None]:
    """Reserve ``environment`` and wrap ``task_method`` into a ``Task``.

    The environment is flagged in use and QUEUED results become ASSIGNED;
    ``_run_task`` undoes both when the task finishes.
    """
    assert not environment.is_in_use
    environment.is_in_use = True
    for test_result in test_results:
        # return assigned but not run cases
        if test_result.status == TestStatus.QUEUED:
            test_result.set_status(TestStatus.ASSIGNED, "")

    task = partial(
        self._run_task,
        task_method,
        environment=environment,
        test_results=test_results,
        **kwargs,
    )
    return Task(self.generate_task_id(), task, self._log)


def _run_task(
    self,
    task_method: Callable[..., None],
    environment: Environment,
    test_results: List[TestResult],
    **kwargs: Any,
) -> None:
    """Run ``task_method`` and then release the reserved environment.

    The release happens in ``finally`` so an exception escaping
    ``task_method`` cannot leave the environment flagged in use or leave
    results in ASSIGNED; the exception still propagates to the caller.
    """
    assert environment.is_in_use
    try:
        task_method(environment=environment, test_results=test_results, **kwargs)
    finally:
        for test_result in test_results:
            # return assigned but not run cases
            if test_result.status == TestStatus.ASSIGNED:
                test_result.set_status(TestStatus.QUEUED, "")

        environment.is_in_use = False


def _match_failed_environment_with_result(
    self,
    environment: Environment,
    candidate_results: List[TestResult],
    exception: Exception,
) -> TestResult:
    """Choose the test result that a failed environment should be attached to.

    Preference order: the environment's still-queued source result, then a
    runnable candidate for the environment, then any queued result.

    Raises:
        LisaException: when no queued result remains to carry the error.
    """
    if environment.source_test_result and environment.source_test_result.is_queued:
        matched_results = [environment.source_test_result]
    else:
        matched_results = self._get_runnable_test_results(
            test_results=candidate_results,
            environment=environment,
        )
    if not matched_results:
        self._log.info(
            "No requirement of test case is suitable for the preparation "
            f"error of the environment '{environment.name}'. "
            "Randomly attach a test case to this environment. "
            "This may be because the platform failed before populating the "
            "features into this environment.",
        )
        matched_results = [
            result for result in self.test_results if result.is_queued
        ]
    if not matched_results:
        raise LisaException(
            "There are no remaining test results to run, so preparation "
            "errors cannot be appended to the test results. Please correct "
            "the error and run again. "
            f"original exception: {exception}"
        )
    return matched_results[0]


def _attach_failed_environment_to_result(
    self,
    environment: Environment,
    result: TestResult,
    exception: Exception,
) -> None:
    """Record an environment failure on ``result`` in the "deployment" phase."""
    # make first fit test case failed by deployment,
    # so deployment failure can be tracked.
    environment.platform = self.platform
    result.environment = environment
    result.handle_exception(exception=exception, log=self._log, phase="deployment")
    self._log.info(
        f"'{environment.name}' attached to test case "
        f"'{result.runtime_data.metadata.full_name}({result.id_})': "
        f"{exception}"
    )
    # release environment reference to optimize memory.
    result.environment = None


def _get_runnable_test_results(
    self,
    test_results: List[TestResult],
    use_new_environment: Optional[bool] = None,
    environment_status: Optional[EnvironmentStatus] = None,
    environment: Optional[Environment] = None,
) -> List[TestResult]:
    """Filter queued results by the given criteria and return them sorted.

    Each optional filter is ignored when it is ``None``. When ``environment``
    is given, only results that pass ``check_environment`` on it are kept.
    At most one result that needs a new environment is returned.
    """
    results = [
        x
        for x in test_results
        if x.is_queued
        and (
            use_new_environment is None
            or x.runtime_data.use_new_environment == use_new_environment
        )
        and (
            environment_status is None
            or x.runtime_data.metadata.requirement.environment_status
            == environment_status
        )
    ]
    if environment:
        runnable_results: List[TestResult] = []
        for result in results:
            try:
                if result.check_environment(
                    environment=environment, save_reason=True
                ) and (
                    not result.runtime_data.use_new_environment
                    or environment.is_new
                ):
                    runnable_results.append(result)
            except SkippedException as identifier:
                # when check the environment, the test result may be marked
                # as skipped, due to the test result is assumed not to match
                # any environment.
                result.handle_exception(identifier, log=self._log, phase="check")
        results = runnable_results

    # only select one test case, which needs the new environment. Others
    # will be dropped to next environment.
    if sum(1 for x in results if x.runtime_data.use_new_environment) > 1:
        new_results: List[TestResult] = []
        has_new_result: bool = False
        for x in results:
            if x.runtime_data.use_new_environment:
                # skip from second new result
                if has_new_result:
                    continue
                has_new_result = True
                new_results.append(x)
            else:
                new_results.append(x)
        results = new_results

    results = self._sort_test_results(results)
    return results


def _get_test_results_to_run(
    self, test_results: List[TestResult], environment: Environment
) -> List[TestResult]:
    """Return at most one result to run on ``environment``.

    A result that needs a new environment is preferred; otherwise the first
    runnable result is used. Returns an empty list when nothing is runnable.
    """
    to_run_results = self._get_runnable_test_results(
        test_results=test_results,
        environment_status=environment.status,
        environment=environment,
    )
    if to_run_results:
        to_run_test_result = next(
            (x for x in to_run_results if x.runtime_data.use_new_environment),
            None,
        )
        if not to_run_test_result:
            to_run_test_result = to_run_results[0]
        to_run_results = [to_run_test_result]

    return to_run_results


def _sort_environments(self, environments: List[Environment]) -> List[Environment]:
    """Return alive environments ordered Connected, Deployed, Prepared, New.

    Environments in any other status, or not alive, are left out.
    """
    results: List[Environment] = []
    # sort environments by the status list
    sorted_status = [
        EnvironmentStatus.Connected,
        EnvironmentStatus.Deployed,
        EnvironmentStatus.Prepared,
        EnvironmentStatus.New,
    ]
    if environments:
        for status in sorted_status:
            results.extend(
                x for x in environments if x.status == status and x.is_alive
            )
    return results
# ... (excerpt truncated here)

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. The hub covers everything from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run lisa automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful