Best Python code snippet using autotest_python
worker.py
Source:worker.py  
# Copyright (c) 2021 - present / Neuralmagic, Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
General code for parallelizing the workers
"""

import time  # noqa: F401  (existing module-level import, kept)
from queue import Empty, Full, Queue
from threading import Lock, Thread
from typing import Any, Callable, Iterator, List


__all__ = ["ParallelWorker"]


# how long a single blocking put/get on the source queue waits before the
# shutdown / liveness flags are re-checked
_QUEUE_POLL_SECONDS = 0.01
# how long the consumer waits for one result before re-checking for completion
_RESULT_POLL_SECONDS = 1.0


class ParallelWorker(object):
    """
    Multi threading worker to parallelize tasks

    Values are handed to ``worker_func`` on ``num_workers`` threads and the
    return values are collected by iterating over the instance. Results are
    yielded in completion order, not submission order.

    Iteration stops once :meth:`shutdown` has been called, or once the pool is
    not indefinite, no asynchronous producer is still adding values, and every
    value that was added has had its result yielded.

    NOTE: an exception raised by ``worker_func`` is not caught; it ends that
    worker thread and no result is produced for the value, so the pending count
    for that value is never released.

    :param worker_func: the function to parallelize across multiple tasks
    :param num_workers: number of workers to use
    :param indefinite: True to keep the thread pooling running so that
        more tasks can be added, False to stop after no more tasks are added
    :param max_source_size: the maximum size for the source queue
    """

    def __init__(
        self,
        worker_func: Callable,
        num_workers: int,
        indefinite: bool,
        max_source_size: int = -1,
    ):
        self._worker_func = worker_func
        self._num_workers = num_workers
        # number of added values whose result has not been yielded yet; it is
        # updated from producer threads and the consumer, hence the lock
        self._pending_count = 0
        self._pending_lock = Lock()
        self._source_queue = (
            Queue(maxsize=max_source_size) if max_source_size > 0 else Queue()
        )
        self._completed = Queue()
        # the queues below are used as thread-safe flags: non-empty means "set"
        self._indefinite = Queue()
        self._shutdown = Queue()
        # one token per running add_async / add_async_generator thread
        self._producers = Queue()
        self.indefinite = indefinite

    def __iter__(self) -> Iterator[Any]:
        """
        :return: an iterator over the results of ``worker_func``, yielded as
            they complete
        """
        while self._shutdown.empty() and not self._is_drained():
            try:
                res = self._completed.get(block=True, timeout=_RESULT_POLL_SECONDS)
            except Empty:
                continue

            self._count_pending(-1)
            yield res

    def __len__(self):
        """
        :return: the number of added values whose result has not been yielded
        """
        return self._pending_count

    @property
    def indefinite(self) -> bool:
        """
        :return: True to keep the thread pooling running so that
            more tasks can be added, False to stop after no more tasks are added
        """
        return not self._indefinite.empty()

    @indefinite.setter
    def indefinite(self, value: bool):
        """
        :param value: True to keep the thread pooling running so that
            more tasks can be added, False to stop after no more tasks are added
        """
        if value:
            if self._indefinite.empty():
                self._indefinite.put(True)
            return

        # non-blocking so a concurrent clear can never leave this call waiting
        ParallelWorker._drain(self._indefinite)

    def start(self):
        """
        Start the workers
        """
        for _ in range(self._num_workers):
            Thread(
                target=ParallelWorker._worker,
                args=(
                    self._worker_func,
                    self._source_queue,
                    self._completed,
                    self._indefinite,
                    self._shutdown,
                    self._producers,
                ),
            ).start()

    def shutdown(self):
        """
        Stop the workers
        """
        self._shutdown.put(True)

    def add(self, vals: List[Any]):
        """
        Blocks until every value has been queued or shutdown is requested.

        :param vals: the values to add for processing work
        """
        self._count_pending(len(vals))
        ParallelWorker._adder(vals, self._source_queue, self._shutdown)

    def add_async(self, vals: List[Any]):
        """
        Queue the values from a background thread so the caller is not blocked
        when the source queue is full.

        :param vals: the values to add for async workers
        """
        self._count_pending(len(vals))
        self._spawn_producer(
            ParallelWorker._async_adder,
            (vals, self._source_queue, self._shutdown, self._producers),
        )

    def add_async_generator(self, gen: Iterator[Any]):
        """
        Values are pulled from the generator on a background thread and counted
        as pending as they are pulled. Once the generator is exhausted the
        indefinite flag is cleared, so iteration ends after the remaining
        results have been consumed.

        :param gen: add an async generator to pull values from for processing
        """
        self._spawn_producer(self._gen_adder, (gen,))

    def add_item(self, val: Any):
        """
        Blocks until the value has been queued or shutdown is requested.

        :param val: add a single item for processing
        """
        self._count_pending(1)
        ParallelWorker._put(val, self._source_queue, self._shutdown)

    def _count_pending(self, delta: int):
        # adjust the pending count; callers run on different threads
        with self._pending_lock:
            self._pending_count += delta

    def _is_drained(self) -> bool:
        # True when no further result can arrive. The flags are read before the
        # pending count on purpose: a producer only drops its token after its
        # last value was counted, so "no producers" followed by "nothing
        # pending" cannot miss a value.
        return (
            self._indefinite.empty()
            and self._producers.empty()
            and self._pending_count < 1
            and self._completed.empty()
        )

    def _spawn_producer(self, target: Callable, args: tuple):
        # register the producer in the calling thread, before the thread runs,
        # so workers started right afterwards already see that work is coming
        self._producers.put(True)
        try:
            Thread(target=target, args=args).start()
        except Exception:
            ParallelWorker._release(self._producers)
            raise

    def _gen_adder(self, gen: Iterator[Any]):
        # thread body for add_async_generator
        try:
            for val in gen:
                # count before queueing so the result can never be yielded
                # ahead of its own increment
                self._count_pending(1)
                if not ParallelWorker._put(val, self._source_queue, self._shutdown):
                    return

            # an exhausted generator ends indefinite mode
            self.indefinite = False
        finally:
            ParallelWorker._release(self._producers)

    @staticmethod
    def _worker(
        worker_func: Callable,
        source_queue: Queue,
        completed: Queue,
        indefinite: Queue,
        shutdown: Queue,
        producers: Queue,
    ):
        # thread body for a single worker
        while True:
            if not shutdown.empty():
                return

            # read the "more work may arrive" flags before the source queue: a
            # producer drops its token only after its last put, so seeing no
            # producers and then an empty queue means nothing is left behind
            idle = indefinite.empty() and producers.empty()
            if idle and source_queue.empty():
                return

            try:
                val = source_queue.get(block=True, timeout=_QUEUE_POLL_SECONDS)
            except Empty:
                continue

            try:
                completed.put(worker_func(val))
            finally:
                # keep the queue's unfinished-task count balanced even when
                # worker_func raises
                source_queue.task_done()

    @staticmethod
    def _adder(vals: List[Any], source_queue: Queue, shutdown: Queue):
        # queue every value in order, giving up as soon as shutdown is set
        for val in vals:
            if not ParallelWorker._put(val, source_queue, shutdown):
                return

    @staticmethod
    def _async_adder(
        vals: List[Any], source_queue: Queue, shutdown: Queue, producers: Queue
    ):
        # thread body for add_async
        try:
            ParallelWorker._adder(vals, source_queue, shutdown)
        finally:
            ParallelWorker._release(producers)

    @staticmethod
    def _put(val: Any, source_queue: Queue, shutdown: Queue) -> bool:
        # retry a short blocking put until it succeeds (True) or shutdown is
        # requested (False)
        while shutdown.empty():
            try:
                source_queue.put(val, block=True, timeout=_QUEUE_POLL_SECONDS)
                return True
            except Full:
                continue

        return False

    @staticmethod
    def _release(producers: Queue):
        # drop one producer token
        try:
            producers.get_nowait()
        except Empty:
            # every producer adds exactly one token, so this is not expected;
            # an already-empty flag is the desired end state either way
            pass

    @staticmethod
    def _drain(flag: Queue):
        # clear a queue-backed flag without ever blocking
        while True:
            try:
                flag.get_nowait()
            except Empty:
                return


# --- page text that followed the code listing (not part of worker.py) ---
# Learn to execute automation testing from scratch with LambdaTest Learning
# Hub. Right from setting up the prerequisites to run your first automation
# test, to following best practices and diving deeper into advanced test
# scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to
# help you be proficient with different test automation frameworks i.e.
# Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
