Best Python code snippet using lisa_python
runner.py
Source: runner.py
...200        await super().stop()201        # TODO: to be implemented202    async def close(self) -> None:203        await super().close()204    def _fetch_runners(self) -> Iterator[BaseRunner]:205        root_runbook = self._runbook_builder.resolve(self._runbook_builder.variables)206        if root_runbook.combinator:207            combinator_factory = Factory[Combinator](Combinator)208            combinator = combinator_factory.create_by_runbook(root_runbook.combinator)209            del self._runbook_builder.raw_data[constants.COMBINATOR]210            self._log.debug(211                f"found combinator '{combinator.type_name()}', to expand runbook."212            )213            combinator.initialize()214            while True:215                variables = combinator.fetch(self._runbook_builder.variables)216                if variables is None:217                    break218                sub_runbook_builder = self._runbook_builder.derive(variables=variables)219                transformer.run(220                    sub_runbook_builder, phase=constants.TRANSFORMER_PHASE_EXPANDED221                )222                runners = self._generate_runners(223                    sub_runbook_builder.resolve(), variables224                )225                for runner in runners:226                    yield runner227        else:228            # no combinator, use the root runbook229            transformer.run(230                self._runbook_builder, phase=constants.TRANSFORMER_PHASE_EXPANDED231            )232            for runner in self._generate_runners(233                root_runbook, self._runbook_builder.variables234            ):235                yield runner236    def _generate_runners(237        self, runbook: schema.Runbook, variables: Dict[str, VariableEntry]238    ) -> Iterator[BaseRunner]:239        # group filters by runner type240        case_variables = get_case_variables(variables)241        runner_filters: Dict[str, 
List[schema.BaseTestCaseFilter]] = {}242        for raw_filter in runbook.testcase_raw:243            # by default run all filtered cases unless 'enable' is specified as false244            filter = schema.load_by_type(schema.BaseTestCaseFilter, raw_filter)245            if filter.enabled:246                raw_filters: List[schema.BaseTestCaseFilter] = runner_filters.get(247                    filter.type, []248                )249                if not raw_filters:250                    runner_filters[filter.type] = raw_filters251                raw_filters.append(raw_filter)252            else:253                self._log.debug(f"Skip disabled filter: {raw_filter}.")254        # initialize runners255        factory = Factory[BaseRunner](BaseRunner)256        for runner_name, raw_filters in runner_filters.items():257            self._log.debug(258                f"create runner {runner_name} with {len(raw_filters)} filter(s)."259            )260            # keep filters to current runner's only.261            runbook.testcase = parse_testcase_filters(raw_filters)262            runner = factory.create_by_type_name(263                type_name=runner_name,264                runbook=runbook,265                index=self._runner_count,266                case_variables=case_variables,267            )268            runner.initialize()269            self._runners.append(runner)270            self._runner_count += 1271            yield runner272    def _submit_runner_tasks(273        self,274        runner: BaseRunner,275        task_manager: TaskManager[None],276    ) -> bool:277        has_task: bool = False278        while not runner.is_done and task_manager.has_idle_worker():279            # fetch a task and submit280            task = runner.fetch_task()281            if task:282                if isinstance(task, Task):283                    task_manager.submit_task(task)284                else:285                    raise LisaException(f"Unknown task type: 
'{type(task)}'")286                has_task = True287            else:288                # current runner may not be done, but it doesn't289                # have task temporarily. The root runner can start290                # tasks from next runner.291                break292        return has_task293    def _start_loop(self) -> None:294        # in case all of runners are disabled295        runner_iterator = self._fetch_runners()296        remaining_runners: List[BaseRunner] = []297        run_message = messages.TestRunMessage(298            status=messages.TestRunStatus.RUNNING,299        )300        notifier.notify(run_message)301        task_manager = TaskManager[None](self._max_concurrency, is_verbose=True)302        # set the global task manager for cancellation check303        set_global_task_manager(task_manager)304        has_more_runner = True305        # run until no idle workers are available and all runner are closed306        while task_manager.wait_worker() or has_more_runner or remaining_runners:307            assert task_manager.has_idle_worker()308            # submit tasks until idle workers are available309            while task_manager.has_idle_worker():...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
