Best Python code snippet using lisa_python
lisa_runner.py
Source:lisa_runner.py  
...129                    for x in available_environments130                ):131                    # if there is no environment in used, new, and results are132                    # not fit envs. those results cannot be run.133                    self._skip_test_results(can_run_results)134        elif available_results:135            # no available environments, so mark all test results skipped.136            self._skip_test_results(available_results)137            self.status = ActionStatus.SUCCESS138        return None139    def close(self) -> None:140        if hasattr(self, "environments") and self.environments:141            for environment in self.environments:142                self._delete_environment_task(environment, [])143        super().close()144    def _dispatch_test_result(145        self, environment: Environment, test_results: List[TestResult]146    ) -> Optional[Task[None]]:147        check_cancelled()148        assert test_results149        can_run_results = test_results150        # deploy151        if environment.status == EnvironmentStatus.Prepared and can_run_results:152            return self._generate_task(153                task_method=self._deploy_environment_task,154                environment=environment,155                test_results=can_run_results[:1],156            )157        # run on deployed environment158        can_run_results = [x for x in can_run_results if x.can_run]159        if environment.status == EnvironmentStatus.Deployed and can_run_results:160            selected_test_results = self._get_test_results_to_run(161                test_results=test_results, environment=environment162            )163            if selected_test_results:164                return self._generate_task(165                    task_method=self._run_test_task,166                    environment=environment,167                    test_results=selected_test_results,168                    case_variables=self._case_variables,169                )170   
         # Check if there is case to run in a connected environment. If so,171            # initialize the environment172            initialization_results = self._get_runnable_test_results(173                test_results=test_results,174                environment_status=EnvironmentStatus.Connected,175                environment=environment,176            )177            if initialization_results:178                return self._generate_task(179                    task_method=self._initialize_environment_task,180                    environment=environment,181                    test_results=initialization_results,182                )183        # run on connected environment184        can_run_results = [x for x in can_run_results if x.can_run]185        if environment.status == EnvironmentStatus.Connected and can_run_results:186            selected_test_results = self._get_test_results_to_run(187                test_results=test_results, environment=environment188            )189            if selected_test_results:190                return self._generate_task(191                    task_method=self._run_test_task,192                    environment=environment,193                    test_results=selected_test_results,194                    case_variables=self._case_variables,195                )196        return None197    def _delete_unused_environments(self) -> Optional[Task[None]]:198        available_environments = self._sort_environments(self.environments)199        # check deletable environments200        for environment in available_environments:201            # if an environment is in using, or not deployed, they won't be202            # deleted until end of runner.203            if environment.is_in_use or environment.status in [204                EnvironmentStatus.New,205                EnvironmentStatus.Prepared,206            ]:207                continue208            can_run_results = self._get_runnable_test_results(209                
self.test_results, environment=environment210            )211            if not can_run_results:212                # no more test need this environment, delete it.213                self._log.debug(214                    f"generating delete environment task on '{environment.name}'"215                )216                return self._generate_task(217                    task_method=self._delete_environment_task,218                    environment=environment,219                    test_results=[],220                )221        return None222    def _prepare_environments(self) -> None:223        if all(x.status != EnvironmentStatus.New for x in self.environments):224            return225        proceeded_environments: List[Environment] = []226        for candidate_environment in self.environments:227            success = True228            if candidate_environment.status == EnvironmentStatus.New:229                success = self._prepare_environment(candidate_environment)230            if success:231                proceeded_environments.append(candidate_environment)232        # sort by environment source and cost cases233        # user defined should be higher priority than test cases' requirement234        proceeded_environments.sort(key=lambda x: (not x.is_predefined, x.cost))235        self.environments = proceeded_environments236    def _deploy_environment_task(237        self, environment: Environment, test_results: List[TestResult]238    ) -> None:239        try:240            try:241                self.platform.deploy_environment(environment)242                assert (243                    environment.status == EnvironmentStatus.Deployed244                ), f"actual: {environment.status}"245                self._reset_awaitable_timer("deploy")246            except ResourceAwaitableException as identifier:247                if self._is_awaitable_timeout("deploy"):248                    self._log.info(249                        f"[{environment.name}] timeout 
on waiting for more resource: "250                        f"{identifier}, skip assigning case."251                    )252                    raise SkippedException(identifier)253                else:254                    # rerun prepare to calculate resource again.255                    environment.status = EnvironmentStatus.New256        except Exception as identifier:257            self._attach_failed_environment_to_result(258                environment=environment,259                result=test_results[0],260                exception=identifier,261            )262            self._delete_environment_task(environment=environment, test_results=[])263    def _initialize_environment_task(264        self, environment: Environment, test_results: List[TestResult]265    ) -> None:266        self._log.debug(f"start initializing task on '{environment.name}'")267        assert test_results268        try:269            environment.initialize()270            assert (271                environment.status == EnvironmentStatus.Connected272            ), f"actual: {environment.status}"273        except Exception as identifier:274            self._attach_failed_environment_to_result(275                environment=environment,276                result=test_results[0],277                exception=identifier,278            )279            self._delete_environment_task(environment=environment, test_results=[])280    def _run_test_task(281        self,282        environment: Environment,283        test_results: List[TestResult],284        case_variables: Dict[str, VariableEntry],285    ) -> None:286        self._log.debug(287            f"start running cases on '{environment.name}', "288            f"case count: {len(test_results)}, "289            f"status {environment.status.name}"290        )291        assert test_results292        assert len(test_results) == 1, (293            f"single test result to run, " f"but {len(test_results)} found."294        )295        test_result = 
test_results[0]296        suite_metadata = test_result.runtime_data.metadata.suite297        test_suite: TestSuite = suite_metadata.test_class(298            suite_metadata,299        )300        test_suite.start(301            environment=environment,302            case_results=test_results,303            case_variables=case_variables,304        )305        # release environment reference to optimize memory.306        test_result.environment = None307        # Some test cases may break the ssh connections. To reduce side effects308        # on next test cases, close the connection after each test run. It will309        # be connected on the next command automatically.310        environment.nodes.close()311        # Try to connect node(s), if cannot access node(s) of this environment,312        # set the current environment as Bad. So that this environment won't be reused.313        if not is_unittest() and not environment.nodes.test_connections():314            environment.status = EnvironmentStatus.Bad315            self._log.debug(316                f"set environment '{environment.name}' as bad, "317                f"because after test case '{test_result.name}', "318                f"node(s) cannot be accessible."319            )320        environment.nodes.close()321        # keep failed environment, not to delete322        if (323            test_result.status == TestStatus.FAILED324            and self.platform.runbook.keep_environment325            == constants.ENVIRONMENT_KEEP_FAILED326        ):327            self._log.debug(328                f"keep environment '{environment.name}', "329                f"because keep_environment is 'failed', "330                f"and test case '{test_result.name}' failed on it."331            )332            environment.status = EnvironmentStatus.Deleted333        # if an environment is in bad status, it will be deleted, not run more334        # test cases. 
But if the setting is to keep failed environment, it may335        # be kept in above logic.336        if environment.status == EnvironmentStatus.Bad or environment.is_dirty:337            self._log.debug(338                f"delete environment '{environment.name}', "339                f"because it's in Bad status or marked as dirty."340            )341            self._delete_environment_task(342                environment=environment, test_results=test_results343            )344    def _delete_environment_task(345        self, environment: Environment, test_results: List[TestResult]346    ) -> None:347        """348        May be called async349        """350        # the predefined environment shouldn't be deleted, because it351        # serves all test cases.352        if environment.status == EnvironmentStatus.Deleted or (353            environment.status == EnvironmentStatus.Prepared354            and not environment.is_in_use355        ):356            # The prepared only environment doesn't need to be deleted.357            # It may cause platform fail to delete non-existing environment.358            environment.status = EnvironmentStatus.Deleted359        else:360            try:361                self.platform.delete_environment(environment)362            except Exception as identifier:363                self._log.debug(364                    f"error on deleting environment '{environment.name}': {identifier}"365                )366    def _prepare_environment(self, environment: Environment) -> bool:367        success = True368        try:369            try:370                self.platform.prepare_environment(environment)371                self._reset_awaitable_timer("prepare")372            except ResourceAwaitableException as identifier:373                # if timed out, raise the exception and skip the test case. 
If374                # not, do nothing to keep env as new to try next time.375                if self._is_awaitable_timeout("prepare"):376                    raise SkippedException(identifier)377        except Exception as identifier:378            success = False379            matched_result = self._match_failed_environment_with_result(380                environment=environment,381                candidate_results=self.test_results,382                exception=identifier,383            )384            self._attach_failed_environment_to_result(385                environment=environment,386                result=matched_result,387                exception=identifier,388            )389        return success390    def _cleanup_deleted_environments(self) -> None:391        # remove reference to unused environments. It can save memory on big runs.392        new_environments: List[Environment] = []393        for environment in self.environments[:]:394            if environment.status != EnvironmentStatus.Deleted:395                new_environments.append(environment)396        self.environments = new_environments397    def _cleanup_done_results(self) -> None:398        # remove reference to completed test results. 
It can save memory on big runs.399        remaining_results: List[TestResult] = []400        for test_result in self.test_results[:]:401            if not test_result.is_completed:402                remaining_results.append(test_result)403        self.test_results = remaining_results404    def _get_results_by_priority(405        self, test_results: List[TestResult], priority: int406    ) -> List[TestResult]:407        if not test_results:408            return []409        test_results = [410            x for x in test_results if x.runtime_data.metadata.priority == priority411        ]412        return test_results413    def _generate_task(414        self,415        task_method: Callable[..., None],416        environment: Environment,417        test_results: List[TestResult],418        **kwargs: Any,419    ) -> Task[None]:420        assert not environment.is_in_use421        environment.is_in_use = True422        for test_result in test_results:423            # return assigned but not run cases424            if test_result.status == TestStatus.QUEUED:425                test_result.set_status(TestStatus.ASSIGNED, "")426        task = partial(427            self._run_task,428            task_method,429            environment=environment,430            test_results=test_results,431            **kwargs,432        )433        return Task(self.generate_task_id(), task, self._log)434    def _run_task(435        self,436        task_method: Callable[..., None],437        environment: Environment,438        test_results: List[TestResult],439        **kwargs: Any,440    ) -> None:441        assert environment.is_in_use442        task_method(environment=environment, test_results=test_results, **kwargs)443        for test_result in test_results:444            # return assigned but not run cases445            if test_result.status == TestStatus.ASSIGNED:446                test_result.set_status(TestStatus.QUEUED, "")447        environment.is_in_use = False448    def 
_match_failed_environment_with_result(449        self,450        environment: Environment,451        candidate_results: List[TestResult],452        exception: Exception,453    ) -> TestResult:454        if environment.source_test_result and environment.source_test_result.is_queued:455            matched_results = [environment.source_test_result]456        else:457            matched_results = self._get_runnable_test_results(458                test_results=candidate_results,459                environment=environment,460            )461        if not matched_results:462            self._log.info(463                "No requirement of test case is suitable for the preparation "464                f"error of the environment '{environment.name}'. "465                "Randomly attach a test case to this environment. "466                "This may be because the platform failed before populating the "467                "features into this environment.",468            )469            matched_results = [470                result for result in self.test_results if result.is_queued471            ]472        if not matched_results:473            raise LisaException(474                "There are no remaining test results to run, so preparation "475                "errors cannot be appended to the test results. Please correct "476                "the error and run again. 
"477                f"original exception: {exception}"478            )479        return matched_results[0]480    def _attach_failed_environment_to_result(481        self,482        environment: Environment,483        result: TestResult,484        exception: Exception,485    ) -> None:486        # make first fit test case failed by deployment,487        # so deployment failure can be tracked.488        environment.platform = self.platform489        result.environment = environment490        result.handle_exception(exception=exception, log=self._log, phase="deployment")491        self._log.info(492            f"'{environment.name}' attached to test case "493            f"'{result.runtime_data.metadata.full_name}({result.id_})': "494            f"{exception}"495        )496        # release environment reference to optimize memory.497        result.environment = None498    def _get_runnable_test_results(499        self,500        test_results: List[TestResult],501        use_new_environment: Optional[bool] = None,502        environment_status: Optional[EnvironmentStatus] = None,503        environment: Optional[Environment] = None,504    ) -> List[TestResult]:505        results = [506            x507            for x in test_results508            if x.is_queued509            and (510                use_new_environment is None511                or x.runtime_data.use_new_environment == use_new_environment512            )513            and (514                environment_status is None515                or x.runtime_data.metadata.requirement.environment_status516                == environment_status517            )518        ]519        if environment:520            runnable_results: List[TestResult] = []521            for result in results:522                try:523                    if result.check_environment(524                        environment=environment, save_reason=True525                    ) and (526                        not 
result.runtime_data.use_new_environment527                        or environment.is_new528                    ):529                        runnable_results.append(result)530                except SkippedException as identifier:531                    # when check the environment, the test result may be marked532                    # as skipped, due to the test result is assumed not to match533                    # any environment.534                    result.handle_exception(identifier, log=self._log, phase="check")535            results = runnable_results536        # only select one test case, which needs the new environment. Others537        # will be dropped to next environment.538        if sum(1 for x in results if x.runtime_data.use_new_environment) > 1:539            new_results: List[TestResult] = []540            has_new_result: bool = False541            for x in results:542                if x.runtime_data.use_new_environment:543                    # skip from second new result544                    if has_new_result:545                        continue546                    has_new_result = True547                    new_results.append(x)548                else:549                    new_results.append(x)550            results = new_results551        results = self._sort_test_results(results)552        return results553    def _get_test_results_to_run(554        self, test_results: List[TestResult], environment: Environment555    ) -> List[TestResult]:556        to_run_results = self._get_runnable_test_results(557            test_results=test_results,558            environment_status=environment.status,559            environment=environment,560        )561        if to_run_results:562            to_run_test_result = next(563                (x for x in to_run_results if x.runtime_data.use_new_environment),564                None,565            )566            if not to_run_test_result:567                to_run_test_result = to_run_results[0]568            
to_run_results = [to_run_test_result]569        return to_run_results570    def _sort_environments(self, environments: List[Environment]) -> List[Environment]:571        results: List[Environment] = []572        # sort environments by the status list573        sorted_status = [574            EnvironmentStatus.Connected,575            EnvironmentStatus.Deployed,576            EnvironmentStatus.Prepared,577            EnvironmentStatus.New,578        ]579        if environments:580            for status in sorted_status:581                results.extend(582                    x for x in environments if x.status == status and x.is_alive583                )584        return results585    def _sort_test_results(self, test_results: List[TestResult]) -> List[TestResult]:586        results = test_results.copy()587        # sort by priority, use new environment, environment status and suite name.588        results.sort(589            key=lambda r: str(r.runtime_data.metadata.suite.name),590        )591        # this step make sure Deployed is before Connected592        results.sort(593            reverse=True,594            key=lambda r: str(r.runtime_data.metadata.requirement.environment_status),595        )596        results.sort(597            reverse=True,598            key=lambda r: str(r.runtime_data.use_new_environment),599        )600        results.sort(key=lambda r: r.runtime_data.metadata.priority)601        return results602    def _skip_test_results(603        self,604        test_results: List[TestResult],605        additional_reason: str = "no available environment",606    ) -> None:607        for test_result in test_results:608            if test_result.is_completed:609                # already completed, don't skip it.610                continue611            if test_result.check_results and test_result.check_results.reasons:612                reasons = f"{additional_reason}: {test_result.check_results.reasons}"613            else:614                reasons 
= additional_reason615            test_result.set_status(TestStatus.SKIPPED, reasons)616    def _merge_test_requirements(...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!
