How to use the _pending_count attribute in autotest

# Copyright (c) 2021 - present / Neuralmagic, Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
General code for parallelizing the workers
"""

import time
from queue import Empty, Full, Queue
from threading import Thread
from typing import Any, Callable, Iterator, List


__all__ = ["ParallelWorker"]


class ParallelWorker(object):
    """
    Multi threading worker to parallelize tasks

    Values are pushed onto a source queue, worker threads apply
    ``worker_func`` to each value, and the results are consumed by iterating
    over the instance. Iteration stops once a shutdown was requested, or once
    the worker is not indefinite, no added values are pending, and no
    completed results are left.

    :param worker_func: the function to parallelize across multiple tasks
    :param num_workers: number of workers to use
    :param indefinite: True to keep the thread pooling running so that
        more tasks can be added, False to stop after no more tasks are added
    :param max_source_size: the maximum size for the source queue,
        values of 0 or less give an unbounded queue
    """

    def __init__(
        self,
        worker_func: Callable,
        num_workers: int,
        indefinite: bool,
        max_source_size: int = -1,
    ):
        self._worker_func = worker_func
        self._num_workers = num_workers
        # values added through add / add_async / add_item whose results
        # have not been yielded by __iter__ yet
        self._pending_count = 0
        self._source_queue = (
            Queue(maxsize=max_source_size) if max_source_size > 0 else Queue()
        )
        self._completed = Queue()
        # queues are used as flags shared with the threads:
        # a non-empty queue means the flag is set
        self._indefinite = Queue()
        self._shutdown = Queue()
        self.indefinite = indefinite

    def __iter__(self) -> Iterator[Any]:
        """
        :return: an iterator over the completed results, in completion order
        """
        while self._shutdown.empty() and not (
            self._indefinite.empty()
            and self._pending_count < 1
            and self._completed.empty()
        ):
            try:
                res = self._completed.get(block=True, timeout=1.0)
            except Empty:
                # nothing finished yet, re-check the stop conditions
                continue

            # values fed through add_async_generator are never counted as
            # pending, so their results must not drive the count below zero
            if self._pending_count > 0:
                self._pending_count -= 1

            yield res

    def __len__(self):
        """
        :return: the number of added values whose results were not yielded yet,
            never negative so that len() can not raise a ValueError
        """
        return max(self._pending_count, 0)

    @property
    def indefinite(self) -> bool:
        """
        :return: True to keep the thread pooling running so that
            more tasks can be added, False to stop after no more tasks are added
        """
        return not self._indefinite.empty()

    @indefinite.setter
    def indefinite(self, value: bool):
        """
        :param value: True to keep the thread pooling running so that
            more tasks can be added, False to stop after no more tasks are added
        """
        if value and self._indefinite.empty():
            self._indefinite.put(True)
        elif not value and not self._indefinite.empty():
            self._indefinite.get()

    def start(self):
        """
        Start the workers
        """
        for _ in range(self._num_workers):
            Thread(
                target=ParallelWorker._worker,
                args=(
                    self._worker_func,
                    self._source_queue,
                    self._completed,
                    self._indefinite,
                    self._shutdown,
                ),
            ).start()

    def shutdown(self):
        """
        Stop the workers
        """
        self._shutdown.put(True)

    def add(self, vals: List[Any]):
        """
        Add the values from the calling thread, returns once all of them are
        on the source queue or a shutdown was requested

        :param vals: the values to add for processing work
        """
        self._pending_count += len(vals)
        ParallelWorker._adder(vals, self._source_queue, self._shutdown)

    def add_async(self, vals: List[Any]):
        """
        Add the values from a new background thread

        :param vals: the values to add for async workers
        """
        self._pending_count += len(vals)
        Thread(
            target=ParallelWorker._adder,
            args=(vals, self._source_queue, self._shutdown),
        ).start()

    def add_async_generator(self, gen: Iterator[Any]):
        """
        Pull values from the generator in a new background thread.
        These values are not included in the pending count.

        :param gen: add an async generator to pull values from for processing
        """
        Thread(
            target=ParallelWorker._gen_adder,
            args=(gen, self._source_queue, self._shutdown, self._indefinite),
        ).start()

    def add_item(self, val: Any):
        """
        :param val: add a single item for processing
        """
        self._pending_count += 1
        self._source_queue.put(val)

    @staticmethod
    def _worker(
        worker_func: Callable,
        source_queue: Queue,
        completed: Queue,
        indefinite: Queue,
        shutdown: Queue,
    ):
        """
        Thread target: process values from the source queue until a shutdown
        is requested, or the source queue is empty while not indefinite

        :param worker_func: the function to apply to each value
        :param source_queue: the queue to pull values from
        :param completed: the queue to put the results on
        :param indefinite: flag queue, non-empty keeps the worker alive
            while the source queue is empty
        :param shutdown: flag queue, non-empty stops the worker
        """
        while True:
            if not shutdown.empty() or (source_queue.empty() and indefinite.empty()):
                return

            try:
                val = source_queue.get(block=True, timeout=0.01)
            except Empty:
                continue

            # NOTE: an exception raised by worker_func ends this thread and
            # no result is put on the completed queue for the value
            res = worker_func(val)
            completed.put(res)
            source_queue.task_done()

    @staticmethod
    def _adder(vals: List[Any], source_queue: Queue, shutdown: Queue):
        """
        Put the values on the source queue one by one, retrying while the
        queue is full and giving up as soon as a shutdown is requested

        :param vals: the values to add
        :param source_queue: the queue to put the values on
        :param shutdown: flag queue, non-empty stops the adding
        """
        index = 0

        while index < len(vals) and shutdown.empty():
            try:
                source_queue.put(vals[index], block=True, timeout=0.01)
                index += 1
            except Full:
                continue

    @staticmethod
    def _gen_adder(
        gen: Iterator[Any], source_queue: Queue, shutdown: Queue, indefinite: Queue
    ):
        """
        Put every value from the generator on the source queue, holding the
        indefinite flag while doing so and clearing it afterwards

        :param gen: the generator to pull the values from
        :param source_queue: the queue to put the values on
        :param shutdown: flag queue, non-empty stops the adding
        :param indefinite: flag queue, set on entry and drained at the end
        """
        indefinite.put(True)

        for val in gen:
            while True:
                if not shutdown.empty():
                    return

                try:
                    source_queue.put(val, block=True, timeout=0.01)
                    break
                except Full:
                    continue

        # give some time for everything to complete since we didn't add to pending count
        # need to architect this better in the future to get rid of
        # the edge case (last items don't complete in 1 sec)
        while not source_queue.empty():
            time.sleep(0.1)
        time.sleep(1.0)

        while not indefinite.empty():
            try:
                indefinite.get_nowait()
            except Empty:
                # the flag was drained elsewhere in the meantime,
                # the loop condition ends the loop
                pass

