How to use has_idle_worker method in lisa

Best Python code snippet using lisa_python

runner.py

Source:runner.py Github

copy

Full Screen

...274 runner: BaseRunner,275 task_manager: TaskManager[None],276 ) -> bool:277 has_task: bool = False278 while not runner.is_done and task_manager.has_idle_worker():279 # fetch a task and submit280 task = runner.fetch_task()281 if task:282 if isinstance(task, Task):283 task_manager.submit_task(task)284 else:285 raise LisaException(f"Unknown task type: '{type(task)}'")286 has_task = True287 else:288 # current runner may not be done, but it doesn't289 # have task temporarily. The root runner can start290 # tasks from next runner.291 break292 return has_task293 def _start_loop(self) -> None:294 # in case all of runners are disabled295 runner_iterator = self._fetch_runners()296 remaining_runners: List[BaseRunner] = []297 run_message = messages.TestRunMessage(298 status=messages.TestRunStatus.RUNNING,299 )300 notifier.notify(run_message)301 task_manager = TaskManager[None](self._max_concurrency, is_verbose=True)302 # set the global task manager for cancellation check303 set_global_task_manager(task_manager)304 has_more_runner = True305 # run until no idle workers are available and all runner are closed306 while task_manager.wait_worker() or has_more_runner or remaining_runners:307 assert task_manager.has_idle_worker()308 # submit tasks until idle workers are available309 while task_manager.has_idle_worker():310 for runner in remaining_runners[:]:311 has_task = self._submit_runner_tasks(runner, task_manager)312 if runner.is_done:313 runner.close()314 remaining_runners.remove(runner)315 self._runners.remove(runner)316 if has_task:317 # This makes the loop is deep first. It intends to318 # complete the prior runners firstly, instead of start319 # later runners.320 continue321 if not self._idle_logged:322 self._log.debug(323 f"running count: {task_manager.running_count}, "324 f"id: {[x.id for x in remaining_runners]} "325 )326 if task_manager.has_idle_worker():327 if has_more_runner:328 # add new runner up to max concurrency if idle workers329 # are available330 try:331 while len(remaining_runners) < self._max_concurrency:332 runner = next(runner_iterator)333 remaining_runners.append(runner)334 self._log.debug(f"Added runner {runner.id}")335 except StopIteration:336 has_more_runner = False337 self._idle_logged = False338 else:339 # reduce CPU utilization from infinite loop when idle340 # workers are present but no task to run....

Full Screen

Full Screen

mainThread.py

Source:mainThread.py Github

copy

Full Screen

1"""2author wanghaiying3date 201906014description 主调度线程5"""6from threading import Event7import traceback8from schedule.errorRetryPolicy import ErrorRetryPolicy9from schedule.idleTime import IdleTime10from schedule.manageTaskStatus import ManageTaskStatus11from schedule.metadataValidator import MetadataValidator12from schedule.preCondition import PreCondition13from schedule.manageSchedule import Schedule14from schedule.task import TaskType15from schedule.taskList import TaskList16from schedule.taskWorker import TaskWorker17from schedule.taskWorkerPool import TaskWorkerPool18from schedule.threadBase import ThreadBase19from schedule.timerThread import TimerThread20def check_pre_condition(task):21 # 判断前置条件22 condition = PreCondition().create(task)23 if not condition.is_true():24 return False25 # 错误重试策略26 erp = ErrorRetryPolicy(task=task)27 if not erp.can_retry_now():28 return False29 # 判断元数据30 meta = MetadataValidator(task=task)31 if not meta.validate():32 # 改变状态33 task.update_validate_error(meta.error_info)34 return False35 return True36def Singleton(cls):37 _instance = {}38 def _singleton(*args, **kargs):39 if cls not in _instance:40 _instance[cls] = cls(*args, **kargs)41 return _instance[cls]42 return _singleton43@Singleton44class MainThread(ThreadBase):45 def __init__(self, task_worker_pool=None, task_list=None, stop_flag=False, timer_thread=None,46 manage_task_status=None, work_event=Event(), complete_event=Event()):47 self._task_worker_pool = task_worker_pool48 self._task_list = task_list49 self._task_list.load_from_db()50 self._stop_flag = stop_flag51 self._timer_thread = timer_thread52 self._manage_task_status = manage_task_status53 manage_task_status.clear_task_status()54 self._threads = list()55 self._events = list()56 i = 057 if len(self._task_worker_pool.running_workers) > 0:58 while i < self._task_worker_pool.capacity:59 running_worker = self._task_worker_pool.running_workers[i]60 self._threads.append(running_worker)61 self._events.append(running_worker.complete_event)62 i += 163 # add timer thread64 self._threads.append(self._timer_thread)65 self._events.append(self._timer_thread.complete_event)66 # add main thread67 self._threads.append(self)68 ThreadBase.__init__(self, work_event, complete_event)69 self._events.append(self.complete_event)70 @property71 def events(self):72 return self._events73 @property74 def stop_flag(self):75 return self._stop_flag76 def run(self):77 self._timer_thread.run()78 self._task_worker_pool.run()79 def clean(self):80 # 是否需要lock81 self._stop_flag = True82 self.complete_event.set()83 self._task_worker_pool.clean()84 # wait for main thread exit85 self.own_thread.join()86 def thread_proc(self):87 i = 088 if len(self.events) > 0:89 while (not self.stop_flag or not self._task_worker_pool.all_task_complete) and i < len(self.events):90 print("thread_proc while" + str(i))91 self.event_handler(i)92 i += 193 def event_handler(self, i):94 thread = self._threads[i]95 # if isinstance(thread, TaskWorker):96 # self.on_task_complete(thread)97 if not self._stop_flag:98 self.loop_dispatch(i)99 def loop_dispatch(self, i):100 print(" loop_dispatch " + str(i))101 while self.dispatch_task(i):102 pass103 def dispatch_task(self, i):104 print(" dispatch_task " + str(i))105 # 是否有待分配任务106 task = self._task_list.current_task()107 if task is None:108 return False109 # 是否有空闲worker110 print(" has_idle_worker", self._task_worker_pool.has_idle_worker)111 if not self._task_worker_pool.has_idle_worker:112 return False113 # 只有DSA和LOOP任务才需要判断空闲时间114 # if task.task_type in (TaskType.DSA, TaskType.LOOP):115 # # 是否交易空闲时间116 # if not IdleTime().is_idle_time(now):117 # self._task_list.next()118 # return True119 # 从调度时间表获取执行日期120 # sh = Schedule().create(task.schedule_type)121 # date_id = sh.get_excute_date_id(task, now)122 # if date_id <= 0:123 # self._task_list.next()124 # return True125 # task._current_execute_date_id = date_id126 # 判断前置条件127 # if not check_pre_condition(task):128 # self._task_list.next()129 # return True130 # 写日志. log before run task131 # task.insert_log()132 # link Task & Thread133 idle_worker = self._task_worker_pool.get_idle_worker()134 task.task_worker = idle_worker135 idle_worker.task = task136 self._task_list.next()137 # idle_worker添加到running_workers数组里138 self._task_worker_pool.running_workers.append(idle_worker)139 # 执行任务. run task140 idle_worker.execute_task()141 return True142 def on_task_complete(self, task_worker):143 if task_worker.task is not None:144 # update log145 task_worker.task.update_log()146 # Separate task & thread147 task_worker.task.task_worker = None148 task_worker.task = None149 self._task_worker_pool.recovery_worker(task_worker)150if __name__ == '__main__':151 mainThread = MainThread(task_worker_pool=TaskWorkerPool(), task_list=TaskList(local_ip='127.0.0.1'),152 timer_thread=TimerThread(value=3, work_event=Event(), complete_event=Event()),153 manage_task_status=ManageTaskStatus())154 # try:155 # # mainThread.run()156 # # mainThread.clean()157 #158 # task_worker_pool = TaskWorkerPool()159 # except Exception as e:...

Full Screen

Full Screen

taskWorkerPool.py

Source:taskWorkerPool.py Github

copy

Full Screen

...35 @property36 def all_task_complete(self):37 return self.idle_workers.qsize() == self.capacity38 @property39 def has_idle_worker(self):40 # print("idle_workers qsize:" + str(self.idle_workers.qsize()))41 return self.idle_workers.qsize() > 042 def clean(self):43 self._stop_flag = True44 self._work_event.set()45 def get_idle_worker(self):46 if 0 == self.idle_workers.qsize():47 return None48 task_worker = self.idle_workers.get()49 # self.running_workers.remove(task_worker)50 return task_worker51 # def recovery_worker(self, task_worker):52 # self.idle_workers.put(task_worker)53 def thread_proc(self):...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run lisa automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful