Best Python code snippet using lisa_python
lisa_runner.py
Source:lisa_runner.py  
...253                else:254                    # rerun prepare to calculate resource again.255                    environment.status = EnvironmentStatus.New256        except Exception as identifier:257            self._attach_failed_environment_to_result(258                environment=environment,259                result=test_results[0],260                exception=identifier,261            )262            self._delete_environment_task(environment=environment, test_results=[])263    def _initialize_environment_task(264        self, environment: Environment, test_results: List[TestResult]265    ) -> None:266        self._log.debug(f"start initializing task on '{environment.name}'")267        assert test_results268        try:269            environment.initialize()270            assert (271                environment.status == EnvironmentStatus.Connected272            ), f"actual: {environment.status}"273        except Exception as identifier:274            self._attach_failed_environment_to_result(275                environment=environment,276                result=test_results[0],277                exception=identifier,278            )279            self._delete_environment_task(environment=environment, test_results=[])280    def _run_test_task(281        self,282        environment: Environment,283        test_results: List[TestResult],284        case_variables: Dict[str, VariableEntry],285    ) -> None:286        self._log.debug(287            f"start running cases on '{environment.name}', "288            f"case count: {len(test_results)}, "289            f"status {environment.status.name}"290        )291        assert test_results292        assert len(test_results) == 1, (293            f"single test result to run, " f"but {len(test_results)} found."294        )295        test_result = test_results[0]296        suite_metadata = test_result.runtime_data.metadata.suite297        test_suite: TestSuite = suite_metadata.test_class(298            
suite_metadata,299        )300        test_suite.start(301            environment=environment,302            case_results=test_results,303            case_variables=case_variables,304        )305        # release environment reference to optimize memory.306        test_result.environment = None307        # Some test cases may break the ssh connections. To reduce side effects308        # on next test cases, close the connection after each test run. It will309        # be connected on the next command automatically.310        environment.nodes.close()311        # Try to connect node(s), if cannot access node(s) of this environment,312        # set the current environment as Bad. So that this environment won't be reused.313        if not is_unittest() and not environment.nodes.test_connections():314            environment.status = EnvironmentStatus.Bad315            self._log.debug(316                f"set environment '{environment.name}' as bad, "317                f"because after test case '{test_result.name}', "318                f"node(s) cannot be accessible."319            )320        environment.nodes.close()321        # keep failed environment, not to delete322        if (323            test_result.status == TestStatus.FAILED324            and self.platform.runbook.keep_environment325            == constants.ENVIRONMENT_KEEP_FAILED326        ):327            self._log.debug(328                f"keep environment '{environment.name}', "329                f"because keep_environment is 'failed', "330                f"and test case '{test_result.name}' failed on it."331            )332            environment.status = EnvironmentStatus.Deleted333        # if an environment is in bad status, it will be deleted, not run more334        # test cases. 
But if the setting is to keep failed environment, it may335        # be kept in above logic.336        if environment.status == EnvironmentStatus.Bad or environment.is_dirty:337            self._log.debug(338                f"delete environment '{environment.name}', "339                f"because it's in Bad status or marked as dirty."340            )341            self._delete_environment_task(342                environment=environment, test_results=test_results343            )344    def _delete_environment_task(345        self, environment: Environment, test_results: List[TestResult]346    ) -> None:347        """348        May be called async349        """350        # the predefined environment shouldn't be deleted, because it351        # serves all test cases.352        if environment.status == EnvironmentStatus.Deleted or (353            environment.status == EnvironmentStatus.Prepared354            and not environment.is_in_use355        ):356            # The prepared only environment doesn't need to be deleted.357            # It may cause platform fail to delete non-existing environment.358            environment.status = EnvironmentStatus.Deleted359        else:360            try:361                self.platform.delete_environment(environment)362            except Exception as identifier:363                self._log.debug(364                    f"error on deleting environment '{environment.name}': {identifier}"365                )366    def _prepare_environment(self, environment: Environment) -> bool:367        success = True368        try:369            try:370                self.platform.prepare_environment(environment)371                self._reset_awaitable_timer("prepare")372            except ResourceAwaitableException as identifier:373                # if timed out, raise the exception and skip the test case. 
If374                # not, do nothing to keep env as new to try next time.375                if self._is_awaitable_timeout("prepare"):376                    raise SkippedException(identifier)377        except Exception as identifier:378            success = False379            matched_result = self._match_failed_environment_with_result(380                environment=environment,381                candidate_results=self.test_results,382                exception=identifier,383            )384            self._attach_failed_environment_to_result(385                environment=environment,386                result=matched_result,387                exception=identifier,388            )389        return success390    def _cleanup_deleted_environments(self) -> None:391        # remove reference to unused environments. It can save memory on big runs.392        new_environments: List[Environment] = []393        for environment in self.environments[:]:394            if environment.status != EnvironmentStatus.Deleted:395                new_environments.append(environment)396        self.environments = new_environments397    def _cleanup_done_results(self) -> None:398        # remove reference to completed test results. 
It can save memory on big runs.399        remaining_results: List[TestResult] = []400        for test_result in self.test_results[:]:401            if not test_result.is_completed:402                remaining_results.append(test_result)403        self.test_results = remaining_results404    def _get_results_by_priority(405        self, test_results: List[TestResult], priority: int406    ) -> List[TestResult]:407        if not test_results:408            return []409        test_results = [410            x for x in test_results if x.runtime_data.metadata.priority == priority411        ]412        return test_results413    def _generate_task(414        self,415        task_method: Callable[..., None],416        environment: Environment,417        test_results: List[TestResult],418        **kwargs: Any,419    ) -> Task[None]:420        assert not environment.is_in_use421        environment.is_in_use = True422        for test_result in test_results:423            # return assigned but not run cases424            if test_result.status == TestStatus.QUEUED:425                test_result.set_status(TestStatus.ASSIGNED, "")426        task = partial(427            self._run_task,428            task_method,429            environment=environment,430            test_results=test_results,431            **kwargs,432        )433        return Task(self.generate_task_id(), task, self._log)434    def _run_task(435        self,436        task_method: Callable[..., None],437        environment: Environment,438        test_results: List[TestResult],439        **kwargs: Any,440    ) -> None:441        assert environment.is_in_use442        task_method(environment=environment, test_results=test_results, **kwargs)443        for test_result in test_results:444            # return assigned but not run cases445            if test_result.status == TestStatus.ASSIGNED:446                test_result.set_status(TestStatus.QUEUED, "")447        environment.is_in_use = False448    def 
_match_failed_environment_with_result(449        self,450        environment: Environment,451        candidate_results: List[TestResult],452        exception: Exception,453    ) -> TestResult:454        if environment.source_test_result and environment.source_test_result.is_queued:455            matched_results = [environment.source_test_result]456        else:457            matched_results = self._get_runnable_test_results(458                test_results=candidate_results,459                environment=environment,460            )461        if not matched_results:462            self._log.info(463                "No requirement of test case is suitable for the preparation "464                f"error of the environment '{environment.name}'. "465                "Randomly attach a test case to this environment. "466                "This may be because the platform failed before populating the "467                "features into this environment.",468            )469            matched_results = [470                result for result in self.test_results if result.is_queued471            ]472        if not matched_results:473            raise LisaException(474                "There are no remaining test results to run, so preparation "475                "errors cannot be appended to the test results. Please correct "476                "the error and run again. 
"477                f"original exception: {exception}"478            )479        return matched_results[0]480    def _attach_failed_environment_to_result(481        self,482        environment: Environment,483        result: TestResult,484        exception: Exception,485    ) -> None:486        # make first fit test case failed by deployment,487        # so deployment failure can be tracked.488        environment.platform = self.platform489        result.environment = environment490        result.handle_exception(exception=exception, log=self._log, phase="deployment")491        self._log.info(492            f"'{environment.name}' attached to test case "493            f"'{result.runtime_data.metadata.full_name}({result.id_})': "494            f"{exception}"...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
