How to use task_stop method in autotest

Best Python code snippet using autotest_python

task_base.py

Source:task_base.py Github

copy

Full Screen

...15 """16 Initializer17 Args:18 stop_timeout_secs (int): Number of seconds to wait for process to exit19 upon calling task_stop(). If the process fails to stop before the20 specified timeout, it will attemp to kill the process via brute21 force. If you would like to wait indefinitely, pass in `None`.22 """23 self._stop_timeout_secs = stop_timeout_secs24 self._task_process = None25 self.task_stopping_event = multiprocessing.Event()26 def task_worker(self):27 raise NotImplementedError28 def task_run(self):29 if self.task_stopping_event.is_set():30 return31 self._task_process = multiprocessing.Process(target=self.task_worker)32 self._task_process.start()33 def task_stop(self):34 # Signal the process to stop35 self.task_stopping_event.set()36 # Wait for the process to exit37 self._task_process.join(self._stop_timeout_secs)38 # If the process didn't exit, attempt to kill it39 if self._task_process.is_alive():40 os.kill(self._task_process.pid, signal.SIGKILL)41 if self._task_process.is_alive():42 return False43 return True44#45# ThreadTaskBase =====================================================================46#47class ThreadTaskBase(object):48 """49 Base class for creating an object that gets spawned as a separate thread50 Child class needs to implement the task_worker method, which should be51 designed to return if task_stopping_event is set52 """53 def __init__(self, stop_timeout_secs=None):54 """55 Initializer56 Args:57 stop_timeout_secs (int): Number of seconds to wait for thread to exit58 upon calling task_stop(). If you would like to wait indefinitely,59 pass in None.60 """61 self._stop_timeout_secs = stop_timeout_secs62 self._task_thread = None63 self.task_stopping_event = threading.Event()64 def task_worker(self):65 raise NotImplementedError66 def task_run(self):67 if self.task_stopping_event.is_set():68 return69 self._task_thread = threading.Thread(target=self.task_worker)70 self._task_thread.start()71 def task_stop(self):72 # Signal the thread to stop73 self.task_stopping_event.set()74 # Wait for the thread to exit75 self._task_thread.join(self._stop_timeout_secs)76 if self._task_thread.is_alive():77 return False...

Full Screen

Full Screen

test_task_context.py

Source:test_task_context.py Github

copy

Full Screen

...7 @hookimpl8 def task_start(self):9 self.events.append("task_start")10 @hookimpl11 def task_stop(self, failed):12 self.events.append(("task_stop", failed))13@pytest.fixture14def spy():15 instance = EventSpy()16 pm.register(instance)17 yield instance18 pm.unregister(instance)19def test_task_ok(spy):20 """task_context emits expected events during successful non-exit case"""21 # Initially no events22 assert not spy.events23 # Simulate a task24 with task_context():25 # task_start should have already been fired...

Full Screen

Full Screen

spider.py

Source:spider.py Github

copy

Full Screen

1import threading2import time3from queue import Queue4from settings import MAX_PAGE, DL_PSD, FILE_DIR, MAX_QUEUE, MAX_THREAD5from httprequest import UrlHandler, Download6class Producer(threading.Thread):7 def __init__(self, queue, task,):8 threading.Thread.__init__(self)9 self.task_stop = False10 self.task = task11 self.queue = queue12 def run(self):13 tasks = self.task.get_tasks()14 while True:15 try:16 task = next(tasks)17 except StopIteration:18 break19 else:20 self.queue.put(task)21 def stop(self):22 self.task_stop = True23class Consumer(threading.Thread):24 def __init__(self, queue, donwload):25 threading.Thread.__init__(self)26 self.task_stop = False27 self.queue = queue28 self.download = donwload29 def run(self):30 while not self.task_stop:31 try:32 data_ = self.queue.get(timeout=4)33 except Exception as e:34 print(e.args)35 break36 else:37 self.download(data_, dl_dir=FILE_DIR, has_psd=DL_PSD, ).work()38 self.queue.task_done()39 def stop(self):40 self.task_stop = True41def wrapper(func, *args, **kwargs):42 def timer():43 start_time = time.time()44 func(*args, **kwargs)45 end_time = time.time()46 print('总用时:-->', int(end_time - start_time))47 return timer48@wrapper49def main():50 q = Queue(MAX_QUEUE)51 t1_thread = []52 t2_thread = []53 for url in ['http://www.uimaker.com/uimakerdown/list_36_{}.html'.format(i) for i in range(1, MAX_PAGE)]:54 t1 = Producer(q, UrlHandler(url))55 t1.start()56 t1_thread.append(t1)57 for i in range(MAX_THREAD):58 t2 = Consumer(q, Download)59 t2.start()60 for t1 in t1_thread:61 t1.join()62 for t2 in t2_thread:63 t2.join()64 q.join()65if __name__ == '__main__':...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful