How to use is_task_alive method in avocado

Best Python code snippet using avocado_python

task.py

Source:task.py Github

copy

Full Screen

...30 # 如是新加入的任务则启动任务31 if not task._started.is_set():32 task.start()33 # 如任务没有存活则停止任务34 if not task.is_task_alive():35 logger.debug('daemon run stop')36 task.stop()37 # 如果任务已经停止则清理任务38 if task._stopped.is_set():39 if task.is_kill_when_stop():40 # 杀掉任务41 self.kill_task(task.name)42 else:43 # 设置为待清理44 self._tasks_running[i] = None45 # 每秒轮询46 time.sleep(1)47 logger.debug('daemon run finish')48 def exit(self):49 '''退出守护线程50 :return:51 '''52 logger.debug('daemon exit start')53 # 结束所有在运行的任务54 for task in self._tasks_running:55 task.stop()56 # 清空任务列表和在运行任务列表57 self._tasks = {}58 self._tasks_running = []59 # 设置退出标识60 self._exit.set()61 logger.debug('daemon exit finish')62 def add_task(self, class_, name, callback = None, params = None,63 **kwargs):64 task = class_(name = name, callback = callback,65 params = params, **kwargs)66 if self.has_task(name):67 return False68 self._tasks[name] = task69 self._tasks_running.append(task)70 return True71 def has_task(self, name):72 return name in self._tasks73 def kill_task(self, name):74 try:75 task = self._tasks.get(name, None)76 assert task77 if task.is_task_alive():78 task.stop()79 task.on_kill()80 if task in self._tasks_running:81 index = self._tasks_running.index(task)82 self._tasks_running[index] = None83 self._tasks.pop(name)84 return True85 except AssertionError:86 return True87 except Exception as e:88 return False89 def query_tasks(self):90 return [{'name': task.name,91 'running': True if task in self._tasks_running else92 False}93 for task in self._tasks]94 def has_task_running(self):95 return len(self._tasks_running) > 096 def call(self, name, method, *args, **kwargs):97 task = self._tasks.get(name, None)98 if task:99 return getattr(task, method)(*args, **kwargs)100 def getattr(self, name, attribute):101 task = self._tasks.get(name, None)102 if task:103 return getattr(task, attribute)104 def setattr(self, name, attribute, value):105 task = self._tasks.get(name, None)106 if task:107 return setattr(task, attribute, value)108class TaskRunner(Thread):109 """任务运行器110 """111 def __init__(self, **kwargs):112 try:113 # 取出任务名称作为线程名称114 name = kwargs.get('name', None)115 assert name116 kwargs.pop('name')117 super().__init__(name = name, kwargs = kwargs)118 # 回调句柄119 self._callback = kwargs.get('callback', None)120 # 运行参数121 self._params = kwargs.get('params', None)122 if self._params:123 for k, v in self._params.items():124 setattr(self, f'_{k}', v)125 # 线程控制成员126 self._stopped = Event() # 线程停止标识127 self._stop_by_user = False # 被用户终止标识128 self._stop_by_self = False # 被任务自身终止标识129 self._stop_by_daemon = False # 被守护线程终止标识130 self._run_times = 0 # 运行次数131 except AssertionError:132 raise TaskRunnerInitError()133 def run(self):134 # 前置运行,获得轮询间隔135 interval = self._on_before_runloop()136 interval = interval\137 if interval and (isinstance(interval, int)138 or isinstance(interval, float)) else 0.1139 while not self._stopped.is_set():140 self._run_times = self._run_times + 1141 logger.debug(142 f"[{urldecode(self.name)}] run loop start ({self._run_times})")143 # 主体运行144 self._on_runloop()145 # 按间隔轮询146 time.sleep(interval)147 logger.debug(148 f"[{urldecode(self.name)}] run loop end ({self._run_times})")149 # 后置运行,用于清场150 self._on_after_runloop()151 def stop(self):152 try:153 logger.debug(f"[{urldecode(self.name)}] stop server start")154 self._on_before_stop()155 self._stopped.set()156 self._stop_by_user = True157 self._on_after_stop()158 logger.debug(f"[{urldecode(self.name)}] stop server end")159 return True, True, self._get_stop_success_message()160 except Exception:161 return False, False, self._get_stop_success_message()162 def is_task_alive(self):163 pass164 def is_kill_when_stop(self):165 pass166 def on_kill(self):167 pass168 def _on_before_runloop(self):169 return 0.1170 def _on_runloop(self):171 pass172 def _on_after_runloop(self):173 pass174 def _on_before_stop(self):175 pass176 def _on_after_stop(self):177 pass178 def _get_stop_success_message(self):179 return _('Task has stopped')180 def _get_stop_fail_message(self):181 return _('Task stop failed')182class HeartbeatInitError(RuntimeError):183 pass184class OnceInitError(RuntimeError):185 pass186class HeartbeatRunner(TaskRunner):187 """心跳运行器188 """189 def __init__(self, **kwargs):190 super().__init__(**kwargs)191 try:192 assert self._callback193 assert self._interval194 assert self._command195 from server.android.device import\196 _client # todo change to current_client197 self._command_func = getattr(_client,198 f'cmd_{self._command}',199 None)200 self._alive_func = getattr(_client,201 f'alive_{self._command}',202 None)203 assert self._command_func204 except AssertionError:205 raise HeartbeatInitError()206 def _on_before_runloop(self):207 return self._interval208 def _on_runloop(self):209 if self._command_func:210 self._callback('android',211 (self._command, self.name) +212 self._command_func(self.name,213 **self._params))214 else:215 self._callback('android',216 (self._command, self.name, False, False,217 None))218 def _on_after_runloop(self):219 logger.debug(f'[{urldecode(self.name)}] after run loop')220 def is_task_alive(self):221 if self._alive_func:222 return self._alive_func(self.name, **self._params)223 else:224 return True225 def is_kill_when_stop(self):226 return True227class OnceRunner(TaskRunner):228 """一次性运行器229 """230 def __init__(self, **kwargs):231 super().__init__(**kwargs)232 try:233 assert self._callback234 assert self._command235 from server.android.device import\236 _client # todo change to current_client237 self._command_func = getattr(_client,238 f'cmd_{self._command}',239 None)240 assert self._command_func241 except AssertionError:242 raise OnceInitError()243 def _on_runloop(self):244 self._callback('android',245 (self._command, self.name) +246 self._command_func(self.name,247 **self._params))248 def is_task_alive(self):249 return False250 def is_kill_when_stop(self):...

Full Screen

Full Screen

test_spawner.py

Source:test_spawner.py Github

copy

Full Screen

...14 loop = asyncio.get_event_loop()15 spawned = loop.run_until_complete(self.spawner.spawn_task(self.runtime_task))16 self.assertTrue(spawned)17 def test_never_spawned(self):18 self.assertFalse(self.spawner.is_task_alive(self.runtime_task))19 self.assertFalse(self.spawner.is_task_alive(self.runtime_task))20class Mock(Process):21 def setUp(self):22 runnable = nrunner.Runnable('noop', 'uri')23 task = nrunner.Task('1', runnable)24 self.runtime_task = RuntimeTask(task)25 self.spawner = MockSpawner()26 def test_spawned_is_alive(self):27 loop = asyncio.get_event_loop()28 loop.run_until_complete(self.spawner.spawn_task(self.runtime_task))29 self.assertTrue(self.spawner.is_task_alive(self.runtime_task))30 self.assertFalse(self.spawner.is_task_alive(self.runtime_task))31class RandomMock(Mock):32 def setUp(self):33 runnable = nrunner.Runnable('noop', 'uri')34 task = nrunner.Task('1', runnable)35 self.runtime_task = RuntimeTask(task)36 self.spawner = MockRandomAliveSpawner()37 def test_spawned_is_alive(self):38 loop = asyncio.get_event_loop()39 loop.run_until_complete(self.spawner.spawn_task(self.runtime_task))40 # The likelihood of the random spawner returning the task is41 # not alive is 1 in 5. This gives the random code 1000042 # chances of returning False, so it should, famous last words,43 # be pretty safe44 finished = False45 for _ in range(10000):46 if not self.spawner.is_task_alive(self.runtime_task):47 finished = True48 break49 self.assertTrue(finished)50if __name__ == '__main__':...

Full Screen

Full Screen

taskkiller.py

Source:taskkiller.py Github

copy

Full Screen

2import os3def get_tasks():4 with open('tasks.txt') as f:5 return [x.rstrip('\n') for x in f.readlines()]6def is_task_alive(tasks):7 for x in os.popen('tasklist').readlines():8 if len(x) > 1:9 task = x.split()[0]10 if task in tasks:11 return task12def main():13 tasks = get_tasks()14 while True:15 task = is_task_alive(tasks)16 if task:17 os.system(f'taskkill /f /im {task}')18 ctypes.windll.user32.MessageBoxW(0, 'Get back to work!', 'Alert!', 0)19if __name__ == "__main__":...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run avocado automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful