Best Python code snippet using lisa_python
runner.py
Source:runner.py  
...274        runner: BaseRunner,275        task_manager: TaskManager[None],276    ) -> bool:277        has_task: bool = False278        while not runner.is_done and task_manager.has_idle_worker():279            # fetch a task and submit280            task = runner.fetch_task()281            if task:282                if isinstance(task, Task):283                    task_manager.submit_task(task)284                else:285                    raise LisaException(f"Unknown task type: '{type(task)}'")286                has_task = True287            else:288                # current runner may not be done, but it doesn't289                # have task temporarily. The root runner can start290                # tasks from next runner.291                break292        return has_task293    def _start_loop(self) -> None:294        # in case all of runners are disabled295        runner_iterator = self._fetch_runners()296        remaining_runners: List[BaseRunner] = []297        run_message = messages.TestRunMessage(298            status=messages.TestRunStatus.RUNNING,299        )300        notifier.notify(run_message)301        task_manager = TaskManager[None](self._max_concurrency, is_verbose=True)302        # set the global task manager for cancellation check303        set_global_task_manager(task_manager)304        has_more_runner = True305        # run until no idle workers are available and all runner are closed306        while task_manager.wait_worker() or has_more_runner or remaining_runners:307            assert task_manager.has_idle_worker()308            # submit tasks until idle workers are available309            while task_manager.has_idle_worker():310                for runner in remaining_runners[:]:311                    has_task = self._submit_runner_tasks(runner, task_manager)312                    if runner.is_done:313                        runner.close()314                        remaining_runners.remove(runner)315                        
self._runners.remove(runner)316                    if has_task:317                        # This makes the loop is deep first. It intends to318                        # complete the prior runners firstly, instead of start319                        # later runners.320                        continue321                if not self._idle_logged:322                    self._log.debug(323                        f"running count: {task_manager.running_count}, "324                        f"id: {[x.id for x in remaining_runners]} "325                    )326                if task_manager.has_idle_worker():327                    if has_more_runner:328                        # add new runner up to max concurrency if idle workers329                        # are available330                        try:331                            while len(remaining_runners) < self._max_concurrency:332                                runner = next(runner_iterator)333                                remaining_runners.append(runner)334                                self._log.debug(f"Added runner {runner.id}")335                        except StopIteration:336                            has_more_runner = False337                        self._idle_logged = False338                    else:339                        # reduce CPU utilization from infinite loop when idle340                        # workers are present but no task to run....mainThread.py
Source:mainThread.py  
"""
author wanghaiying
date 20190601
description Main scheduling thread
"""
from threading import Event
import traceback
from schedule.errorRetryPolicy import ErrorRetryPolicy
from schedule.idleTime import IdleTime
from schedule.manageTaskStatus import ManageTaskStatus
from schedule.metadataValidator import MetadataValidator
from schedule.preCondition import PreCondition
from schedule.manageSchedule import Schedule
from schedule.task import TaskType
from schedule.taskList import TaskList
from schedule.taskWorker import TaskWorker
from schedule.taskWorkerPool import TaskWorkerPool
from schedule.threadBase import ThreadBase
from schedule.timerThread import TimerThread


def check_pre_condition(task):
    """Return True when ``task`` passes every pre-dispatch check.

    The checks run in order and stop at the first failure: the task's
    pre-condition, its error retry policy, and its metadata validation.
    When metadata validation fails, the validator's ``error_info`` is
    recorded on the task through ``task.update_validate_error``.
    """
    # Check the pre-condition.
    condition = PreCondition().create(task)
    if not condition.is_true():
        return False
    # Error retry policy.
    erp = ErrorRetryPolicy(task=task)
    if not erp.can_retry_now():
        return False
    # Validate the metadata.
    meta = MetadataValidator(task=task)
    if not meta.validate():
        # Change the task status to reflect the validation error.
        task.update_validate_error(meta.error_info)
        return False
    return True


def Singleton(cls):
    """Class decorator that keeps a single cached instance of ``cls``.

    The first call builds the instance from the supplied arguments; every
    later call returns that cached instance and ignores its arguments.
    The decorated name is bound to the factory function, not the class.
    """
    _instance = {}

    def _singleton(*args, **kwargs):
        if cls not in _instance:
            _instance[cls] = cls(*args, **kwargs)
        return _instance[cls]

    return _singleton


@Singleton
class MainThread(ThreadBase):
    """Main scheduling thread: hands tasks from a task list to idle workers.

    ``_threads`` holds the running workers, the timer thread and this
    object itself; ``_events`` holds the matching ``complete_event`` of
    each, in the same order.
    """

    def __init__(self, task_worker_pool=None, task_list=None, stop_flag=False, timer_thread=None,
                 manage_task_status=None, work_event=None, complete_event=None):
        """Wire the scheduler to its pool, task list and timer thread.

        ``task_worker_pool``, ``task_list``, ``timer_thread`` and
        ``manage_task_status`` are dereferenced unconditionally below, so
        they must be supplied even though they default to ``None``.
        ``work_event`` / ``complete_event`` default to fresh ``Event``
        objects created per call.
        """
        # A default of ``Event()`` in the signature is evaluated once at
        # definition time and would be shared by every construction.
        if work_event is None:
            work_event = Event()
        if complete_event is None:
            complete_event = Event()

        self._task_worker_pool = task_worker_pool
        self._task_list = task_list
        self._task_list.load_from_db()
        self._stop_flag = stop_flag
        self._timer_thread = timer_thread
        self._manage_task_status = manage_task_status
        manage_task_status.clear_task_status()
        self._threads = []
        self._events = []

        # Register at most ``capacity`` running workers. Iterating the
        # workers (instead of indexing 0..capacity-1) avoids an IndexError
        # when fewer workers than ``capacity`` are currently running.
        for index, running_worker in enumerate(self._task_worker_pool.running_workers):
            if index >= self._task_worker_pool.capacity:
                break
            self._threads.append(running_worker)
            self._events.append(running_worker.complete_event)

        # add timer thread
        self._threads.append(self._timer_thread)
        self._events.append(self._timer_thread.complete_event)

        # add main thread
        self._threads.append(self)
        ThreadBase.__init__(self, work_event, complete_event)
        self._events.append(self.complete_event)

    @property
    def events(self):
        """Completion events of the registered threads, in registration order."""
        return self._events

    @property
    def stop_flag(self):
        """True once ``clean`` has requested a stop (or if constructed that way)."""
        return self._stop_flag

    def run(self):
        """Run the timer thread, then the worker pool."""
        self._timer_thread.run()
        self._task_worker_pool.run()

    def clean(self):
        """Request a stop, signal completion, clean the pool and join this thread."""
        # Original author's open question: is a lock needed here?
        self._stop_flag = True
        self.complete_event.set()
        self._task_worker_pool.clean()
        # wait for main thread exit
        # NOTE(review): ``own_thread`` is presumably provided by ThreadBase - confirm.
        self.own_thread.join()

    def thread_proc(self):
        """Walk the registered events once, dispatching tasks for each index.

        The walk stops early only when a stop was requested *and* the pool
        reports that all tasks are complete.
        """
        i = 0
        if len(self.events) > 0:
            while (not self.stop_flag or not self._task_worker_pool.all_task_complete) and i < len(self.events):
                print("thread_proc while" + str(i))
                self.event_handler(i)
                i += 1

    def event_handler(self, i):
        """Handle the event at index ``i`` by dispatching tasks unless stopping."""
        thread = self._threads[i]
        # Completion handling is currently disabled:
        # if isinstance(thread, TaskWorker):
        #     self.on_task_complete(thread)
        if not self._stop_flag:
            self.loop_dispatch(i)

    def loop_dispatch(self, i):
        """Keep dispatching until ``dispatch_task`` reports nothing more to do."""
        print(" loop_dispatch " + str(i))
        while self.dispatch_task(i):
            pass

    def dispatch_task(self, i):
        """Assign the current task to an idle worker and start it.

        Returns True when a task was dispatched (the caller should try
        again), False when there is no current task or no idle worker.
        """
        print("     dispatch_task " + str(i))
        # Is there a task waiting to be assigned?
        task = self._task_list.current_task()
        if task is None:
            return False
        # Is there an idle worker?
        print("     has_idle_worker", self._task_worker_pool.has_idle_worker)
        if not self._task_worker_pool.has_idle_worker:
            return False
        # The checks below are currently disabled.
        # Only DSA and LOOP tasks need the idle-time check:
        # if task.task_type in (TaskType.DSA, TaskType.LOOP):
        #     # Is it trading idle time?
        #     if not IdleTime().is_idle_time(now):
        #         self._task_list.next()
        #         return True
        # Get the execution date from the schedule table:
        # sh = Schedule().create(task.schedule_type)
        # date_id = sh.get_excute_date_id(task, now)
        # if date_id <= 0:
        #     self._task_list.next()
        #     return True
        # task._current_execute_date_id = date_id
        # Check the pre-condition:
        # if not check_pre_condition(task):
        #     self._task_list.next()
        #     return True
        # Write the log before running the task:
        # task.insert_log()
        # link Task & Thread
        idle_worker = self._task_worker_pool.get_idle_worker()
        if idle_worker is None:
            # The pool can hand back None when its idle queue has emptied
            # since the check above; leave the task in place for a retry.
            return False
        task.task_worker = idle_worker
        idle_worker.task = task
        self._task_list.next()
        # Add idle_worker to the running_workers list.
        self._task_worker_pool.running_workers.append(idle_worker)
        # Run the task.
        idle_worker.execute_task()
        return True

    def on_task_complete(self, task_worker):
        """Update the finished task's log, unlink it, and recover the worker.

        NOTE(review): ``recovery_worker`` appears only as commented-out code
        in the visible taskWorkerPool.py excerpt - confirm it exists before
        re-enabling the call site in ``event_handler``.
        """
        if task_worker.task is not None:
            # update log
            task_worker.task.update_log()
            # Separate task & thread
            task_worker.task.task_worker = None
            task_worker.task = None
        self._task_worker_pool.recovery_worker(task_worker)


if __name__ == '__main__':
    mainThread = MainThread(task_worker_pool=TaskWorkerPool(), task_list=TaskList(local_ip='127.0.0.1'),
                            timer_thread=TimerThread(value=3, work_event=Event(), complete_event=Event()),
                            manage_task_status=ManageTaskStatus())
    # try:
    #     # mainThread.run()
    #     # mainThread.clean()
    #
    #     task_worker_pool = TaskWorkerPool()
    # except Exception as e:
    #     ... (the excerpt is truncated here)
Source:taskWorkerPool.py  
...35    @property36    def all_task_complete(self):37        return self.idle_workers.qsize() == self.capacity38    @property39    def has_idle_worker(self):40        # print("idle_workers qsize:" + str(self.idle_workers.qsize()))41        return self.idle_workers.qsize() > 042    def clean(self):43        self._stop_flag = True44        self._work_event.set()45    def get_idle_worker(self):46        if 0 == self.idle_workers.qsize():47            return None48        task_worker = self.idle_workers.get()49        # self.running_workers.remove(task_worker)50        return task_worker51    # def recovery_worker(self, task_worker):52    #     self.idle_workers.put(task_worker)53    def thread_proc(self):...Learn to execute automation testing from scratch with LambdaTest Learning Hub. The guides cover everything from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g., Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
