Best Python code snippet using lisa_python
test_testsuite.py
Source:test_testsuite.py  
...130    ut: TestCase, case_runbook: List[Any], expected_descriptions: List[str]131) -> List[TestCaseRuntimeData]:132    runbook = RunbookBuilder._validate_and_load({constants.TESTCASE: case_runbook})133    case_metadata = generate_cases_metadata()134    runbook.testcase = parse_testcase_filters(runbook.testcase_raw)135    filters = cast(List[schema.TestCase], runbook.testcase)136    selected = select_testcases(filters, case_metadata)137    ut.assertListEqual(expected_descriptions, [case.description for case in selected])138    return selected139class TestSuiteTestCase(TestCase):140    def generate_suite_instance(self) -> MockTestSuite:141        case_results = generate_cases_result()142        self.case_results = case_results[:2]143        suite_metadata = case_results[0].runtime_data.metadata.suite144        runbook = generate_runbook(is_single_env=True, local=True, remote=True)145        envs = load_environments(runbook)146        self.default_env = list(envs.values())[0]147        assert self.default_env148        test_suite = MockTestSuite(...runner.py
Source:runner.py  
# ... (the earlier part of runner.py, including the remaining imports, is
# elided in this excerpt)
from lisa.util.parallel import Task, TaskManager, cancel, set_global_task_manager
from lisa.util.perf_timer import Timer, create_timer
from lisa.util.subclasses import Factory
from lisa.variable import VariableEntry, get_case_variables, replace_variables


def parse_testcase_filters(raw_filters: List[Any]) -> List[schema.BaseTestCaseFilter]:
    """
    Convert raw test case filter entries into typed filter objects.

    Entries that have no `constants.TYPE` key get
    `constants.TESTCASE_TYPE_LISA` written into them before loading, so the
    passed-in entries are updated in place.

    return:
        The loaded filters. If raw_filters is empty or None, a single default
        `schema.TestCase` filter (name "test", criteria area "demo").
    """
    if not raw_filters:
        return [schema.TestCase(name="test", criteria=schema.Criteria(area="demo"))]

    factory = Factory[schema.BaseTestCaseFilter](schema.BaseTestCaseFilter)
    filters: List[schema.BaseTestCaseFilter] = []
    for raw_filter in raw_filters:
        if constants.TYPE not in raw_filter:
            raw_filter[constants.TYPE] = constants.TESTCASE_TYPE_LISA
        filters.append(factory.load_typed_runbook(raw_filter))
    return filters


def print_results(
    test_results: List[TestResultMessage],
    output_method: Callable[[str], Any],
) -> None:
    """
    Write every test result, then a per-status summary, through output_method.

    output_method is called once per emitted line, for example a logger's
    `info` method.
    """
    output_method("________________________________________")
    result_count_dict: Dict[TestStatus, int] = {}
    for test_result in test_results:
        result_name = test_result.full_name
        result_status = test_result.status
        output_method(
            f"{result_name:>50}: {result_status.name:<8} {test_result.message}"
        )
        result_count_dict[result_status] = result_count_dict.get(result_status, 0) + 1

    output_method("test result summary")
    output_method(f"    TOTAL    : {len(test_results)}")
    for key in TestStatus:
        count = result_count_dict.get(key, 0)
        if key == TestStatus.ATTEMPTED and count == 0:
            # attempted is confusing if user don't know it.
            # so hide it if there is no attempted cases.
            continue
        output_method(f"    {key.name:<9}: {count}")


class RunnerResult(notifier.Notifier):
    """
    This is an internal notifier. It is used to collect test results for the
    runner.
    """

    @classmethod
    def type_name(cls) -> str:
        # no type_name, not able to import from yaml book.
        return ""

    @classmethod
    def type_schema(cls) -> Type[schema.TypedSchema]:
        return schema.Notifier

    def _received_message(self, message: messages.MessageBase) -> None:
        # only TestResultMessage is subscribed, see _subscribed_message_type.
        assert isinstance(message, TestResultMessage), f"actual: {type(message)}"
        # keyed by id_, so a later message with the same id replaces the
        # earlier one.
        self.results[message.id_] = message

    def _subscribed_message_type(self) -> List[Type[messages.MessageBase]]:
        return [TestResultMessage]

    def _initialize(self, *args: Any, **kwargs: Any) -> None:
        self.results: Dict[str, TestResultMessage] = {}


class BaseRunner(BaseClassMixin, InitializableMixin):
    """
    Base runner of other runners. And other runners derived from this one.
    """

    def __init__(
        self,
        runbook: schema.Runbook,
        index: int,
        case_variables: Dict[str, Any],
    ) -> None:
        super().__init__()
        self._runbook = runbook

        self.id = f"{self.type_name()}_{index}"
        # generate_task_id increments before returning, so the first id is 0.
        self._task_id = -1
        self._log = get_logger("runner", str(index))
        self._log_handler: Optional[FileHandler] = None
        self._case_variables = case_variables
        self._timer = create_timer()
        # compared against elapsed seconds after multiplying by 60, see
        # _is_awaitable_timeout.
        self._wait_resource_timeout = runbook.wait_resource_timeout
        self._wait_resource_timers: Dict[str, Timer] = {}
        self._wait_resource_logged: bool = False

        self.canceled = False

    def __repr__(self) -> str:
        return self.id

    @property
    def is_done(self) -> bool:
        raise NotImplementedError()

    def fetch_task(self) -> Optional[Task[None]]:
        """
        return:
            The runnable task, which can return test results
        """
        raise NotImplementedError()

    def close(self) -> None:
        """
        Log the elapsed time and release the runner's own log handler, if any.
        """
        self._log.debug(f"Runner finished in {self._timer.elapsed_text()}.")
        if self._log_handler:
            remove_handler(self._log_handler)
            self._log_handler.close()

    def generate_task_id(self) -> int:
        """
        Return the next task id of this runner, counting up from 0.
        """
        self._task_id += 1
        return self._task_id

    def _initialize(self, *args: Any, **kwargs: Any) -> None:
        # do not put this logic to __init__, since the mkdir takes time.
        # default lisa runner doesn't need separated handler.
        self._log_folder = constants.RUN_LOCAL_LOG_PATH
        self._working_folder = constants.RUN_LOCAL_WORKING_PATH

        if self.type_name() != constants.TESTCASE_TYPE_LISA:
            # create separated folder and log for each non-default runner.
            runner_path_name = f"{self.type_name()}_runner"
            self._log_folder = self._log_folder / runner_path_name
            self._log_file_name = str(self._log_folder / f"{runner_path_name}.log")
            self._log_folder.mkdir(parents=True, exist_ok=True)
            self._log_handler = create_file_handler(
                Path(self._log_file_name), self._log
            )
            self._working_folder = self._working_folder / runner_path_name

    def _is_awaitable_timeout(self, name: str) -> bool:
        """
        Check whether waiting for the resource `name` has timed out.

        return:
            False, after sleeping 5 seconds, while the timer for `name` is
            still below the configured timeout; True once it is exceeded.
        """
        # create the timer only on first use, instead of building a throwaway
        # one as the default of every lookup.
        wait_timer = self._wait_resource_timers.get(name)
        if wait_timer is None:
            wait_timer = create_timer()
            self._wait_resource_timers[name] = wait_timer

        if wait_timer.elapsed(False) >= self._wait_resource_timeout * 60:
            self._log.info(f"{name} timeout on waiting for more resource...")
            return True

        # wait a while to prevent called too fast
        if not self._wait_resource_logged:
            self._log.info(f"{name} waiting for more resource...")
            self._wait_resource_logged = True
        time.sleep(5)
        return False

    def _reset_awaitable_timer(self, name: str) -> None:
        """
        Reset the wait timer of `name` and re-enable the "waiting" log line.
        """
        wait_timer = self._wait_resource_timers.get(name)
        if wait_timer is None:
            wait_timer = create_timer()
            self._wait_resource_timers[name] = wait_timer
        wait_timer.reset()
        self._wait_resource_logged = False


class RootRunner(Action):
    """
    The entry root runner, which starts other runners.
    """

    def __init__(self, runbook_builder: RunbookBuilder) -> None:
        super().__init__()
        self.exit_code: int = 0

        self._runbook_builder = runbook_builder

        self._log = get_logger("RootRunner")
        # this is to hold active runners, and will close them, if there is any
        # global error.
        self._runners: List[BaseRunner] = []
        self._runner_count: int = 0
        self._idle_logged: bool = False

    async def start(self) -> None:
        """
        Resolve the runbook, run all runners and set exit_code from the results.

        Any exception raised while preparing or running is logged and
        cancel() is called; cleanup always runs. exit_code is -1 when the
        results collector was never created.
        """
        await super().start()

        self._results_collector: Optional[RunnerResult] = None
        try:
            transformer.run(
                self._runbook_builder, phase=constants.TRANSFORMER_PHASE_INIT
            )

            # update runbook for notifiers
            raw_data = copy.deepcopy(self._runbook_builder.raw_data)
            constants.RUNBOOK = replace_variables(
                raw_data, self._runbook_builder._variables
            )
            runbook = self._runbook_builder.resolve()
            self._runbook_builder.dump_variables()

            self._max_concurrency = runbook.concurrency
            self._log.debug(f"max concurrency is {self._max_concurrency}")

            self._results_collector = RunnerResult(schema.Notifier())
            register_notifier(self._results_collector)

            self._start_loop()
        except Exception as identifier:
            self._log.exception(
                "canceling runner due to exception", exc_info=identifier
            )
            cancel()
        finally:
            self._cleanup()

        if self._results_collector:
            # the collector is only assigned after `runbook` was resolved, so
            # `runbook` is always bound in this branch.
            results = list(self._results_collector.results.values())
            print_results(results, self._log.info)

            if runbook.exit_with_failed_count:
                # pass failed count to exit code
                self.exit_code = sum(
                    1 for x in results if x.status == TestStatus.FAILED
                )
        else:
            # unknown error happened, see exception log for details.
            self.exit_code = -1

    async def stop(self) -> None:
        await super().stop()
        # TODO: to be implemented

    async def close(self) -> None:
        await super().close()

    def _fetch_runners(self) -> Iterator[BaseRunner]:
        """
        Yield the runners for the root runbook.

        With a combinator, one derived runbook is built per set of variables
        the combinator returns, until it returns None. Without one, the root
        runbook is used directly.
        """
        root_runbook = self._runbook_builder.resolve(self._runbook_builder.variables)

        if root_runbook.combinator:
            combinator_factory = Factory[Combinator](Combinator)
            combinator = combinator_factory.create_by_runbook(root_runbook.combinator)

            del self._runbook_builder.raw_data[constants.COMBINATOR]
            self._log.debug(
                f"found combinator '{combinator.type_name()}', to expand runbook."
            )
            combinator.initialize()
            while True:
                variables = combinator.fetch(self._runbook_builder.variables)
                if variables is None:
                    break
                sub_runbook_builder = self._runbook_builder.derive(variables=variables)
                transformer.run(
                    sub_runbook_builder, phase=constants.TRANSFORMER_PHASE_EXPANDED
                )
                yield from self._generate_runners(
                    sub_runbook_builder.resolve(), variables
                )
        else:
            # no combinator, use the root runbook
            transformer.run(
                self._runbook_builder, phase=constants.TRANSFORMER_PHASE_EXPANDED
            )
            yield from self._generate_runners(
                root_runbook, self._runbook_builder.variables
            )

    def _generate_runners(
        self, runbook: schema.Runbook, variables: Dict[str, VariableEntry]
    ) -> Iterator[BaseRunner]:
        """
        Create, initialize and yield one runner per filter type in the runbook.

        Each created runner is also appended to self._runners, and
        self._runner_count is used as its index.
        """
        # group filters by runner type
        case_variables = get_case_variables(variables)
        # the values are the raw (not yet parsed) runbook entries.
        runner_filters: Dict[str, List[Any]] = {}
        for raw_filter in runbook.testcase_raw:
            # by default run all filtered cases unless 'enable' is specified as false
            case_filter = schema.load_by_type(schema.BaseTestCaseFilter, raw_filter)
            if case_filter.enabled:
                runner_filters.setdefault(case_filter.type, []).append(raw_filter)
            else:
                self._log.debug(f"Skip disabled filter: {raw_filter}.")

        # initialize runners
        factory = Factory[BaseRunner](BaseRunner)
        for runner_name, raw_filters in runner_filters.items():
            self._log.debug(
                f"create runner {runner_name} with {len(raw_filters)} filter(s)."
            )

            # keep filters to current runner's only.
            # NOTE(review): the same runbook object is passed to every runner
            # and its testcase attribute is overwritten on each iteration;
            # confirm runners do not rely on it after the next one is created.
            runbook.testcase = parse_testcase_filters(raw_filters)
            runner = factory.create_by_type_name(
                type_name=runner_name,
                runbook=runbook,
                index=self._runner_count,
                case_variables=case_variables,
            )
            runner.initialize()
            self._runners.append(runner)
            self._runner_count += 1
            yield runner

    # ... the excerpt is truncated here; the source continues with:
    #     def _submit_runner_tasks(
    #         self,
    #         runner: BaseRunner,
    #         task_manager: TaskManager[None],
    #         ...


# Learn to execute automation testing from scratch with LambdaTest Learning Hub.
# Right from setting up the prerequisites to run your first automation test, to
# following best practices and diving deeper into advanced test scenarios.
# LambdaTest Learning Hubs compile a list of step-by-step guides to help you be
# proficient with different test automation frameworks i.e. Selenium, Cypress,
# TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
