Best Python code snippet using lisa_python
lisa_runner.py
Source:lisa_runner.py  
...79        return is_all_results_completed and is_all_environment_completed80    def fetch_task(self) -> Optional[Task[None]]:81        self._prepare_environments()82        self._cleanup_deleted_environments()83        self._cleanup_done_results()84        # sort environments by status85        available_environments = self._sort_environments(self.environments)86        available_results = self._sort_test_results(87            [x for x in self.test_results if x.can_run]88        )89        # check deletable environments90        delete_task = self._delete_unused_environments()91        if delete_task:92            return delete_task93        if available_results and available_environments:94            for priority in range(6):95                can_run_results = self._get_results_by_priority(96                    available_results, priority97                )98                if not can_run_results:99                    continue100                # it means there are test cases and environment, so it needs to101                # schedule task.102                for environment in available_environments:103                    if environment.is_in_use:104                        # skip in used environments105                        continue106                    # try to pick the designated test result.107                    environment_results = [108                        x109                        for x in can_run_results110                        if environment.source_test_result111                        and x.id_ == environment.source_test_result.id_112                    ]113                    if not environment_results:114                        environment_results = self._get_runnable_test_results(115                            test_results=can_run_results, environment=environment116                        )117                    if not environment_results:118                        continue119                    task = self._dispatch_test_result(120     
                   environment=environment, test_results=environment_results121                    )122                    # there is more checking conditions. If some conditions doesn't123                    # meet, the task is None. If so, not return, and try next124                    # conditions or skip this test case.125                    if task:126                        return task127                if not any(128                    x.is_in_use or x.status == EnvironmentStatus.New129                    for x in available_environments130                ):131                    # if there is no environment in used, new, and results are132                    # not fit envs. those results cannot be run.133                    self._skip_test_results(can_run_results)134        elif available_results:135            # no available environments, so mark all test results skipped.136            self._skip_test_results(available_results)137            self.status = ActionStatus.SUCCESS138        return None139    def close(self) -> None:140        if hasattr(self, "environments") and self.environments:141            for environment in self.environments:142                self._delete_environment_task(environment, [])143        super().close()144    def _dispatch_test_result(145        self, environment: Environment, test_results: List[TestResult]146    ) -> Optional[Task[None]]:147        check_cancelled()148        assert test_results149        can_run_results = test_results150        # deploy151        if environment.status == EnvironmentStatus.Prepared and can_run_results:152            return self._generate_task(153                task_method=self._deploy_environment_task,154                environment=environment,155                test_results=can_run_results[:1],156            )157        # run on deployed environment158        can_run_results = [x for x in can_run_results if x.can_run]159        if environment.status == EnvironmentStatus.Deployed and 
can_run_results:160            selected_test_results = self._get_test_results_to_run(161                test_results=test_results, environment=environment162            )163            if selected_test_results:164                return self._generate_task(165                    task_method=self._run_test_task,166                    environment=environment,167                    test_results=selected_test_results,168                    case_variables=self._case_variables,169                )170            # Check if there is case to run in a connected environment. If so,171            # initialize the environment172            initialization_results = self._get_runnable_test_results(173                test_results=test_results,174                environment_status=EnvironmentStatus.Connected,175                environment=environment,176            )177            if initialization_results:178                return self._generate_task(179                    task_method=self._initialize_environment_task,180                    environment=environment,181                    test_results=initialization_results,182                )183        # run on connected environment184        can_run_results = [x for x in can_run_results if x.can_run]185        if environment.status == EnvironmentStatus.Connected and can_run_results:186            selected_test_results = self._get_test_results_to_run(187                test_results=test_results, environment=environment188            )189            if selected_test_results:190                return self._generate_task(191                    task_method=self._run_test_task,192                    environment=environment,193                    test_results=selected_test_results,194                    case_variables=self._case_variables,195                )196        return None197    def _delete_unused_environments(self) -> Optional[Task[None]]:198        available_environments = self._sort_environments(self.environments)199        # 
check deletable environments200        for environment in available_environments:201            # if an environment is in using, or not deployed, they won't be202            # deleted until end of runner.203            if environment.is_in_use or environment.status in [204                EnvironmentStatus.New,205                EnvironmentStatus.Prepared,206            ]:207                continue208            can_run_results = self._get_runnable_test_results(209                self.test_results, environment=environment210            )211            if not can_run_results:212                # no more test need this environment, delete it.213                self._log.debug(214                    f"generating delete environment task on '{environment.name}'"215                )216                return self._generate_task(217                    task_method=self._delete_environment_task,218                    environment=environment,219                    test_results=[],220                )221        return None222    def _prepare_environments(self) -> None:223        if all(x.status != EnvironmentStatus.New for x in self.environments):224            return225        proceeded_environments: List[Environment] = []226        for candidate_environment in self.environments:227            success = True228            if candidate_environment.status == EnvironmentStatus.New:229                success = self._prepare_environment(candidate_environment)230            if success:231                proceeded_environments.append(candidate_environment)232        # sort by environment source and cost cases233        # user defined should be higher priority than test cases' requirement234        proceeded_environments.sort(key=lambda x: (not x.is_predefined, x.cost))235        self.environments = proceeded_environments236    def _deploy_environment_task(237        self, environment: Environment, test_results: List[TestResult]238    ) -> None:239        try:240            try:241   
             self.platform.deploy_environment(environment)242                assert (243                    environment.status == EnvironmentStatus.Deployed244                ), f"actual: {environment.status}"245                self._reset_awaitable_timer("deploy")246            except ResourceAwaitableException as identifier:247                if self._is_awaitable_timeout("deploy"):248                    self._log.info(249                        f"[{environment.name}] timeout on waiting for more resource: "250                        f"{identifier}, skip assigning case."251                    )252                    raise SkippedException(identifier)253                else:254                    # rerun prepare to calculate resource again.255                    environment.status = EnvironmentStatus.New256        except Exception as identifier:257            self._attach_failed_environment_to_result(258                environment=environment,259                result=test_results[0],260                exception=identifier,261            )262            self._delete_environment_task(environment=environment, test_results=[])263    def _initialize_environment_task(264        self, environment: Environment, test_results: List[TestResult]265    ) -> None:266        self._log.debug(f"start initializing task on '{environment.name}'")267        assert test_results268        try:269            environment.initialize()270            assert (271                environment.status == EnvironmentStatus.Connected272            ), f"actual: {environment.status}"273        except Exception as identifier:274            self._attach_failed_environment_to_result(275                environment=environment,276                result=test_results[0],277                exception=identifier,278            )279            self._delete_environment_task(environment=environment, test_results=[])280    def _run_test_task(281        self,282        environment: Environment,283        
test_results: List[TestResult],284        case_variables: Dict[str, VariableEntry],285    ) -> None:286        self._log.debug(287            f"start running cases on '{environment.name}', "288            f"case count: {len(test_results)}, "289            f"status {environment.status.name}"290        )291        assert test_results292        assert len(test_results) == 1, (293            f"single test result to run, " f"but {len(test_results)} found."294        )295        test_result = test_results[0]296        suite_metadata = test_result.runtime_data.metadata.suite297        test_suite: TestSuite = suite_metadata.test_class(298            suite_metadata,299        )300        test_suite.start(301            environment=environment,302            case_results=test_results,303            case_variables=case_variables,304        )305        # release environment reference to optimize memory.306        test_result.environment = None307        # Some test cases may break the ssh connections. To reduce side effects308        # on next test cases, close the connection after each test run. It will309        # be connected on the next command automatically.310        environment.nodes.close()311        # Try to connect node(s), if cannot access node(s) of this environment,312        # set the current environment as Bad. 
So that this environment won't be reused.313        if not is_unittest() and not environment.nodes.test_connections():314            environment.status = EnvironmentStatus.Bad315            self._log.debug(316                f"set environment '{environment.name}' as bad, "317                f"because after test case '{test_result.name}', "318                f"node(s) cannot be accessible."319            )320        environment.nodes.close()321        # keep failed environment, not to delete322        if (323            test_result.status == TestStatus.FAILED324            and self.platform.runbook.keep_environment325            == constants.ENVIRONMENT_KEEP_FAILED326        ):327            self._log.debug(328                f"keep environment '{environment.name}', "329                f"because keep_environment is 'failed', "330                f"and test case '{test_result.name}' failed on it."331            )332            environment.status = EnvironmentStatus.Deleted333        # if an environment is in bad status, it will be deleted, not run more334        # test cases. 
But if the setting is to keep failed environment, it may335        # be kept in above logic.336        if environment.status == EnvironmentStatus.Bad or environment.is_dirty:337            self._log.debug(338                f"delete environment '{environment.name}', "339                f"because it's in Bad status or marked as dirty."340            )341            self._delete_environment_task(342                environment=environment, test_results=test_results343            )344    def _delete_environment_task(345        self, environment: Environment, test_results: List[TestResult]346    ) -> None:347        """348        May be called async349        """350        # the predefined environment shouldn't be deleted, because it351        # serves all test cases.352        if environment.status == EnvironmentStatus.Deleted or (353            environment.status == EnvironmentStatus.Prepared354            and not environment.is_in_use355        ):356            # The prepared only environment doesn't need to be deleted.357            # It may cause platform fail to delete non-existing environment.358            environment.status = EnvironmentStatus.Deleted359        else:360            try:361                self.platform.delete_environment(environment)362            except Exception as identifier:363                self._log.debug(364                    f"error on deleting environment '{environment.name}': {identifier}"365                )366    def _prepare_environment(self, environment: Environment) -> bool:367        success = True368        try:369            try:370                self.platform.prepare_environment(environment)371                self._reset_awaitable_timer("prepare")372            except ResourceAwaitableException as identifier:373                # if timed out, raise the exception and skip the test case. 
If374                # not, do nothing to keep env as new to try next time.375                if self._is_awaitable_timeout("prepare"):376                    raise SkippedException(identifier)377        except Exception as identifier:378            success = False379            matched_result = self._match_failed_environment_with_result(380                environment=environment,381                candidate_results=self.test_results,382                exception=identifier,383            )384            self._attach_failed_environment_to_result(385                environment=environment,386                result=matched_result,387                exception=identifier,388            )389        return success390    def _cleanup_deleted_environments(self) -> None:391        # remove reference to unused environments. It can save memory on big runs.392        new_environments: List[Environment] = []393        for environment in self.environments[:]:394            if environment.status != EnvironmentStatus.Deleted:395                new_environments.append(environment)396        self.environments = new_environments397    def _cleanup_done_results(self) -> None:398        # remove reference to completed test results. It can save memory on big runs.399        remaining_results: List[TestResult] = []400        for test_result in self.test_results[:]:401            if not test_result.is_completed:402                remaining_results.append(test_result)403        self.test_results = remaining_results404    def _get_results_by_priority(405        self, test_results: List[TestResult], priority: int406    ) -> List[TestResult]:407        if not test_results:408            return []409        test_results = [410            x for x in test_results if x.runtime_data.metadata.priority == priority411        ]...Learn to execute automation testing from scratch with LambdaTest Learning Hub. 
From setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios, the LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
