Best Python code snippet using slash
reusable_executor.py
Source:reusable_executor.py  
1###############################################################################2# Reusable ProcessPoolExecutor3#4# author: Thomas Moreau and Olivier Grisel5#6import time7import warnings8import threading9import multiprocessing as mp10from .process_executor import ProcessPoolExecutor, EXTRA_QUEUED_CALLS11from .backend.context import cpu_count12from .backend import get_context13__all__ = ['get_reusable_executor']14# Python 2 compat helper15STRING_TYPE = type("")16# Singleton executor and id management17_executor_lock = threading.RLock()18_next_executor_id = 019_executor = None20_executor_kwargs = None21def _get_next_executor_id():22    """Ensure that each successive executor instance has a unique, monotonic id.23    The purpose of this monotonic id is to help debug and test automated24    instance creation.25    """26    global _next_executor_id27    with _executor_lock:28        executor_id = _next_executor_id29        _next_executor_id += 130        return executor_id31def get_reusable_executor(max_workers=None, context=None, timeout=10,32                          kill_workers=False, reuse="auto",33                          job_reducers=None, result_reducers=None,34                          initializer=None, initargs=(), env=None):35    """Return the current ReusableExectutor instance.36    Start a new instance if it has not been started already or if the previous37    instance was left in a broken state.38    If the previous instance does not have the requested number of workers, the39    executor is dynamically resized to adjust the number of workers prior to40    returning.41    Reusing a singleton instance spares the overhead of starting new worker42    processes and importing common python packages each time.43    ``max_workers`` controls the maximum number of tasks that can be running in44    parallel in worker processes. 
By default this is set to the number of45    CPUs on the host.46    Setting ``timeout`` (in seconds) makes idle workers automatically shutdown47    so as to release system resources. New workers are respawn upon submission48    of new tasks so that ``max_workers`` are available to accept the newly49    submitted tasks. Setting ``timeout`` to around 100 times the time required50    to spawn new processes and import packages in them (on the order of 100ms)51    ensures that the overhead of spawning workers is negligible.52    Setting ``kill_workers=True`` makes it possible to forcibly interrupt53    previously spawned jobs to get a new instance of the reusable executor54    with new constructor argument values.55    The ``job_reducers`` and ``result_reducers`` are used to customize the56    pickling of tasks and results send to the executor.57    When provided, the ``initializer`` is run first in newly spawned58    processes with argument ``initargs``.59    The environment variable in the child process are a copy of the values in60    the main process. One can provide a dict ``{ENV: VAL}`` where ``ENV`` and61    ``VAR`` are string literals to overwrite the environment variable ``ENV``62    in the child processes to value ``VAL``. The environment variables are set63    in the children before any module is loaded. 
This only works with with the64    ``loky`` context and it is unreliable on Windows with Python < 3.6.65    """66    _executor, _ = _ReusablePoolExecutor.get_reusable_executor(67        max_workers=max_workers, context=context, timeout=timeout,68        kill_workers=kill_workers, reuse=reuse, job_reducers=job_reducers,69        result_reducers=result_reducers, initializer=initializer,70        initargs=initargs, env=env71    )72    return _executor73class _ReusablePoolExecutor(ProcessPoolExecutor):74    def __init__(self, submit_resize_lock, max_workers=None, context=None,75                 timeout=None, executor_id=0, job_reducers=None,76                 result_reducers=None, initializer=None, initargs=(),77                 env=None):78        super(_ReusablePoolExecutor, self).__init__(79            max_workers=max_workers, context=context, timeout=timeout,80            job_reducers=job_reducers, result_reducers=result_reducers,81            initializer=initializer, initargs=initargs, env=env)82        self.executor_id = executor_id83        self._submit_resize_lock = submit_resize_lock84    @classmethod85    def get_reusable_executor(cls, max_workers=None, context=None, timeout=10,86                              kill_workers=False, reuse="auto",87                              job_reducers=None, result_reducers=None,88                              initializer=None, initargs=(), env=None):89        with _executor_lock:90            global _executor, _executor_kwargs91            executor = _executor92            if max_workers is None:93                if reuse is True and executor is not None:94                    max_workers = executor._max_workers95                else:96                    max_workers = cpu_count()97            elif max_workers <= 0:98                raise ValueError(99                    "max_workers must be greater than 0, got {}."100                    .format(max_workers))101            if isinstance(context, STRING_TYPE):102               
 context = get_context(context)103            if context is not None and context.get_start_method() == "fork":104                raise ValueError(105                    "Cannot use reusable executor with the 'fork' context"106                )107            kwargs = dict(context=context, timeout=timeout,108                          job_reducers=job_reducers,109                          result_reducers=result_reducers,110                          initializer=initializer, initargs=initargs,111                          env=env)112            if executor is None:113                is_reused = False114                mp.util.debug("Create a executor with max_workers={}."115                              .format(max_workers))116                executor_id = _get_next_executor_id()117                _executor_kwargs = kwargs118                _executor = executor = cls(119                    _executor_lock, max_workers=max_workers,120                    executor_id=executor_id, **kwargs)121            else:122                if reuse == 'auto':123                    reuse = kwargs == _executor_kwargs124                if (executor._flags.broken or executor._flags.shutdown125                        or not reuse):126                    if executor._flags.broken:127                        reason = "broken"128                    elif executor._flags.shutdown:129                        reason = "shutdown"130                    else:131                        reason = "arguments have changed"132                    mp.util.debug(133                        "Creating a new executor with max_workers={} as the "134                        "previous instance cannot be reused ({})."135                        .format(max_workers, reason))136                    executor.shutdown(wait=True, kill_workers=kill_workers)137                    _executor = executor = _executor_kwargs = None138                    # Recursive call to build a new instance139                    return 
cls.get_reusable_executor(max_workers=max_workers,140                                                     **kwargs)141                else:142                    mp.util.debug(143                        "Reusing existing executor with max_workers={}."144                        .format(executor._max_workers)145                    )146                    is_reused = True147                    executor._resize(max_workers)148        return executor, is_reused149    def submit(self, fn, *args, **kwargs):150        with self._submit_resize_lock:151            return super(_ReusablePoolExecutor, self).submit(152                fn, *args, **kwargs)153    def _resize(self, max_workers):154        with self._submit_resize_lock:155            if max_workers is None:156                raise ValueError("Trying to resize with max_workers=None")157            elif max_workers == self._max_workers:158                return159            if self._executor_manager_thread is None:160                # If the executor_manager_thread has not been started161                # then no processes have been spawned and we can just162                # update _max_workers and return163                self._max_workers = max_workers164                return165            self._wait_job_completion()166            # Some process might have returned due to timeout so check how many167            # children are still alive. 
Use the _process_management_lock to168            # ensure that no process are spawned or timeout during the resize.169            with self._processes_management_lock:170                processes = list(self._processes.values())171                nb_children_alive = sum(p.is_alive() for p in processes)172                self._max_workers = max_workers173                for _ in range(max_workers, nb_children_alive):174                    self._call_queue.put(None)175            while (len(self._processes) > max_workers176                   and not self._flags.broken):177                time.sleep(1e-3)178            self._adjust_process_count()179            processes = list(self._processes.values())180            while not all([p.is_alive() for p in processes]):181                time.sleep(1e-3)182    def _wait_job_completion(self):183        """Wait for the cache to be empty before resizing the pool."""184        # Issue a warning to the user about the bad effect of this usage.185        if len(self._pending_work_items) > 0:186            warnings.warn("Trying to resize an executor with running jobs: "187                          "waiting for jobs completion before resizing.",188                          UserWarning)189            mp.util.debug("Executor {} waiting for jobs completion before"190                          " resizing".format(self.executor_id))191        # Wait for the completion of the jobs192        while len(self._pending_work_items) > 0:193            time.sleep(1e-3)194    def _setup_queues(self, job_reducers, result_reducers):195        # As this executor can be resized, use a large queue size to avoid196        # underestimating capacity and introducing overhead197        queue_size = 2 * cpu_count() + EXTRA_QUEUED_CALLS198        super(_ReusablePoolExecutor, self)._setup_queues(...work_handler.py
Source:work_handler.py  
...59                )60            )61            self.workers[-1].start()62    def _build_workers(self, func, n=1, on_kill=None, timeout=1, max_queue_size=-1):63        self.kill_workers(self.workers)64        self.workers = []65        self.queue = Queue(max_queue_size)66        for _ in range(n):67            self.workers.append(68                ThreadWorker(69                    self.queue,70                    func,71                    on_kill=on_kill,72                    on_start=on_start,73                    timeout=timeout,74                )75            )76            self.workers[-1].start()77    def kill_workers(self):78        for worker in self.workers:79            worker.kill()80            worker.join()81    def kill_workers_on_completion(self):82        while True:83            time.sleep(0.2)84            if self.queue.empty():85                break86        self.kill_workers()87class RedisWorker:88    def __init__(self, q, on_kill=None):89        self.queue = q90        self.on_kill = on_kill91    def run(self):92        start_worker_cmd = shlex.split("rq worker " + self.queue.name)93        self.process = Popen(start_worker_cmd)94    def kill(self):95        self.process.kill()96        if self.on_kill:97            self.on_kill()98class RedisManager:99    def __init__(self, n=1, on_kill=None, q_name="default-redis-queue"):100        redis_conn = Redis()101        self.queue = rQueue(q_name, connection=redis_conn)102        start_redis_server_cmd = shlex.split("redis-server")103        self.server = Popen(start_redis_server_cmd)104        self.workers = []105        for _ in range(n):106            self.workers.append(RedisWorker(self.queue, on_kill=on_kill))107            self.workers[-1].run()108    def kill_workers_on_completion(self):109        while True:110            time.sleep(0.2)111            if len(self.queue.job_ids) == 0:112                break113        self.kill_workers()114    def kill_workers(self):115        
self.server.kill()116        for worker in self.workers:117            worker.kill()118class ProcessWorker(multiprocessing.Process):119    def __init__(self, q, func, timeout=1, on_kill=None, on_start=None):120        multiprocessing.Process.__init__(self)121        log.info("New Worker Process: {}".format(self.name))122        self._queue = q123        self._function = func124        self._timeout = timeout125        self._kill = False126        self._on_kill = on_kill127        self._on_start = on_start128        if self._on_start:129            self._on_start()130    def run(self):131        while not self._kill:132            try:133                log.debug("{} - Pulling from Queue".format(self.name))134                work = self._queue.get(timeout=self._timeout)135            except queue.Empty:136                log.debug("{} - Timed Out".format(self.name))137                continue138            if work:139                log.debug("{} - Running {}".format(self.name, self._function))140                log.debug("{} - Input: {}".format(self.name, work))141                self._function(work)142        if self._on_kill:143            self._on_kill()144        # print('Worker {} Dying'.format(os.getpid()))145    def kill(self):146        self._kill = True147class ProcessManager:148    def __init__(149        self, func, n=1, on_kill=None, on_start=None, timeout=1, max_queue_size=-1150    ):151        self.queue = mQueue(max_queue_size)152        self._timeout = timeout153        self.workers = []154        for _ in range(n):155            self.workers.append(156                ProcessWorker(157                    self.queue,158                    func,159                    on_kill=on_kill,160                    on_start=on_start,161                    timeout=timeout,162                )163            )164            self.workers[-1].start()165    def _build_workers(self, func, n=1, on_kill=None, timeout=1, max_queue_size=-1):166        
self.kill_workers(self.workers)167        self.workers = []168        self.queue = mQueue(max_queue_size)169        for _ in range(n):170            self.workers.append(171                ProcessWorker(172                    self.queue,173                    func,174                    on_kill=on_kill,175                    on_start=on_start,176                    timeout=timeout,177                )178            )179            self.workers[-1].start()180    def kill_workers(self):181        for worker in self.workers:182            worker.kill()183            worker.join()184    def kill_workers_on_completion(self):185        while True:186            time.sleep(0.2)187            if self.queue.empty():188                break189        self.kill_workers()190def WorkerManager(191    func=None,192    n=1,193    on_kill=None,194    on_start=None,195    timeout=1,196    max_queue_size=-1,197    parallelization="thread",198    q_name="default-redis-queue",199):200    if parallelization == "thread":201        assert callable(func)202        return ThreadManager(203            func,...manage.py
Source:manage.py  
...17    logging.info(f"{sender} â {message}")18def get_pid_file():19    pid_file = path.join(utils.project_dir(), WORKER_PIDS)20    return pid_file21def kill_workers():22    pid_file = get_pid_file()23    if not path.exists(pid_file):24        print("Looks like there's no workers to stop. Please verify it manually")25        return26    count = 027    with open(pid_file) as fp:28        for pid in fp:29            try:30                pid = int(pid)31            except ValueError:32                logging.debug(f"kill_workers could not parse pid `{pid}`")33                continue34            _info(f"Stopping worker process: {pid}", "kill_workers")35            outputs = getoutput(f"ps -p {pid}")36            if WORKER in outputs:37                os.kill(pid, signal.SIGTERM)38                count += 139            else:40                logging.debug(41                    f"Looks like process {pid} isn't a worker"42                    f"`ps -p` result: `{outputs}`"43                )44    os.remove(pid_file)45    _info(f"Stopped {count} workers", "kill_workers")46def erase_database():47    database = DatabaseAdapter()48    database.drop_urls_table()49    _info("Database cleaned", "erase_database")50def load_urls_to_database(urls_file: str):51    _info(f"Loading `{urls_file}` into database", "load_urls_to_database")52    count = 053    errors = 054    database = DatabaseAdapter()55    if not path.isfile(urls_file):56        _info(f"Given path: `{urls_file}` is not a file", "load_urls_to_database")57        return58    with open(urls_file) as fp:59        database.create_urls_table()60        with database.connect():61            with database.commit():62                for url in fp:63                    url = url.strip()64                    database.naked_insert_url(url)65                    count += 166    _info(f"Loaded {count} urls into database with {errors} errors", "load_urls_to_database")67def start_workers(count: int, debug: bool):68    
_info(f"Starting {count} workers", "start_workers")69    worker_cmd = [path.join(utils.project_dir(), WORKER)]70    if debug:71        worker_cmd.append("-d")72    with open(get_pid_file(), "at") as fp:73        for i in range(count):74            pid = Popen(worker_cmd).pid75            _info(f"Started worker with pid {pid}", "start_workers")76            fp.write(f"{pid}\n")77            # SQLite doesn't allow multi process safe connection78            # This delay is for letting workers at least start safely79            time.sleep(START_DELAY)80            # In general this is a weak decision. That shouldn't be done this way in81            # the Production82            # On the other hand, in the production we would use better DBMS which83            # Could handle row-level locks84def start_threaded_workers(count: int = 1):85    threads = [ThreadedWorker(i) for i in range(count)]86    for t in threads:87        time.sleep(0.3)88        t.start()89    _info(f"Starting {count} threaded workers", "start_threaded_workers")90    for t in threads:91        _info(f"{t.name} â finished it's work", "start_threaded_workers")92        t.join()93    _info(f"All {count} workers are done", "start_threaded_workers")94if __name__ == '__main__':95    parser = argparse.ArgumentParser(96        description="Workers manager.",97        epilog="""98        The above flags could be used simultaneously.99        However, actions would be executed in the certain order: 100        0 â `stop`101        1 â `erase`102        2 â `load`103        3 â `workers` & `debug`104        """)105    parser.add_argument("-s", "--stop", help="Stop all running workers", action="store_true")106    parser.add_argument("-e", "--erase", help="Erase database.", action="store_true")107    parser.add_argument("-l", "--load", help="Path to a file with urls to load into database",108                        type=str)109    parser.add_argument("-w", "--workers", help="Start given number of workers", 
type=int)110    parser.add_argument("-t", "--threads", help="Start given number of threaded workers",111                        type=int)112    parser.add_argument("-d", "--debug", help="Enable debug logging in workers",113                        action="store_true")114    args = parser.parse_args()115    utils.configure_logger(args.debug)116    if args.workers is not None and args.threads is not None:117        print("You couldn't start both threaded and process based workers simultaneously")118        exit(1)119    if args.stop:120        kill_workers()121    if args.erase:122        erase_database()123    if args.load:124        load_urls_to_database(args.load)125    if args.threads:126        if args.threads < 1:127            print("Please let us start at least 1 thread")128            exit(2)129        if args.threads > 10:130            print(f"We let you start {args.threads} but please reconsider it")131        start_threaded_workers(args.threads)132    if args.workers:...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
