How to use the kill_workers method in Slash

Best Python code snippets using slash

reusable_executor.py

Source: reusable_executor.py (GitHub)


###############################################################################
# Reusable ProcessPoolExecutor
#
# author: Thomas Moreau and Olivier Grisel
#
import time
import warnings
import threading
import multiprocessing as mp
from .process_executor import ProcessPoolExecutor, EXTRA_QUEUED_CALLS
from .backend.context import cpu_count
from .backend import get_context

__all__ = ['get_reusable_executor']

# Python 2 compat helper
STRING_TYPE = type("")

# Singleton executor and id management
_executor_lock = threading.RLock()
_next_executor_id = 0
_executor = None
_executor_kwargs = None

def _get_next_executor_id():
    """Ensure that each successive executor instance has a unique, monotonic id.
    The purpose of this monotonic id is to help debug and test automated
    instance creation.
    """
    global _next_executor_id
    with _executor_lock:
        executor_id = _next_executor_id
        _next_executor_id += 1
        return executor_id

def get_reusable_executor(max_workers=None, context=None, timeout=10,
                          kill_workers=False, reuse="auto",
                          job_reducers=None, result_reducers=None,
                          initializer=None, initargs=(), env=None):
    """Return the current ReusableExecutor instance.
    Start a new instance if it has not been started already or if the previous
    instance was left in a broken state.
    If the previous instance does not have the requested number of workers, the
    executor is dynamically resized to adjust the number of workers prior to
    returning.
    Reusing a singleton instance spares the overhead of starting new worker
    processes and importing common python packages each time.
    ``max_workers`` controls the maximum number of tasks that can be running in
    parallel in worker processes. By default this is set to the number of
    CPUs on the host.
    Setting ``timeout`` (in seconds) makes idle workers automatically shut down
    so as to release system resources. New workers are respawned upon
    submission of new tasks so that ``max_workers`` are available to accept the
    newly submitted tasks. Setting ``timeout`` to around 100 times the time
    required to spawn new processes and import packages in them (on the order
    of 100ms) ensures that the overhead of spawning workers is negligible.
    Setting ``kill_workers=True`` makes it possible to forcibly interrupt
    previously spawned jobs to get a new instance of the reusable executor
    with new constructor argument values.
    The ``job_reducers`` and ``result_reducers`` are used to customize the
    pickling of tasks and results sent to the executor.
    When provided, the ``initializer`` is run first in newly spawned
    processes with argument ``initargs``.
    The environment variables in the child processes are a copy of the values
    in the main process. One can provide a dict ``{ENV: VAL}`` where ``ENV``
    and ``VAL`` are string literals to overwrite the environment variable
    ``ENV`` in the child processes to value ``VAL``. The environment variables
    are set in the children before any module is loaded. This only works with
    the ``loky`` context and it is unreliable on Windows with Python < 3.6.
    """
    _executor, _ = _ReusablePoolExecutor.get_reusable_executor(
        max_workers=max_workers, context=context, timeout=timeout,
        kill_workers=kill_workers, reuse=reuse, job_reducers=job_reducers,
        result_reducers=result_reducers, initializer=initializer,
        initargs=initargs, env=env
    )
    return _executor

class _ReusablePoolExecutor(ProcessPoolExecutor):
    def __init__(self, submit_resize_lock, max_workers=None, context=None,
                 timeout=None, executor_id=0, job_reducers=None,
                 result_reducers=None, initializer=None, initargs=(),
                 env=None):
        super(_ReusablePoolExecutor, self).__init__(
            max_workers=max_workers, context=context, timeout=timeout,
            job_reducers=job_reducers, result_reducers=result_reducers,
            initializer=initializer, initargs=initargs, env=env)
        self.executor_id = executor_id
        self._submit_resize_lock = submit_resize_lock

    @classmethod
    def get_reusable_executor(cls, max_workers=None, context=None, timeout=10,
                              kill_workers=False, reuse="auto",
                              job_reducers=None, result_reducers=None,
                              initializer=None, initargs=(), env=None):
        with _executor_lock:
            global _executor, _executor_kwargs
            executor = _executor
            if max_workers is None:
                if reuse is True and executor is not None:
                    max_workers = executor._max_workers
                else:
                    max_workers = cpu_count()
            elif max_workers <= 0:
                raise ValueError(
                    "max_workers must be greater than 0, got {}."
                    .format(max_workers))
            if isinstance(context, STRING_TYPE):
                context = get_context(context)
            if context is not None and context.get_start_method() == "fork":
                raise ValueError(
                    "Cannot use reusable executor with the 'fork' context"
                )
            kwargs = dict(context=context, timeout=timeout,
                          job_reducers=job_reducers,
                          result_reducers=result_reducers,
                          initializer=initializer, initargs=initargs,
                          env=env)
            if executor is None:
                is_reused = False
                mp.util.debug("Create an executor with max_workers={}."
                              .format(max_workers))
                executor_id = _get_next_executor_id()
                _executor_kwargs = kwargs
                _executor = executor = cls(
                    _executor_lock, max_workers=max_workers,
                    executor_id=executor_id, **kwargs)
            else:
                if reuse == 'auto':
                    reuse = kwargs == _executor_kwargs
                if (executor._flags.broken or executor._flags.shutdown
                        or not reuse):
                    if executor._flags.broken:
                        reason = "broken"
                    elif executor._flags.shutdown:
                        reason = "shutdown"
                    else:
                        reason = "arguments have changed"
                    mp.util.debug(
                        "Creating a new executor with max_workers={} as the "
                        "previous instance cannot be reused ({})."
                        .format(max_workers, reason))
                    executor.shutdown(wait=True, kill_workers=kill_workers)
                    _executor = executor = _executor_kwargs = None
                    # Recursive call to build a new instance
                    return cls.get_reusable_executor(max_workers=max_workers,
                                                     **kwargs)
                else:
                    mp.util.debug(
                        "Reusing existing executor with max_workers={}."
                        .format(executor._max_workers)
                    )
                    is_reused = True
                    executor._resize(max_workers)
        return executor, is_reused

    def submit(self, fn, *args, **kwargs):
        with self._submit_resize_lock:
            return super(_ReusablePoolExecutor, self).submit(
                fn, *args, **kwargs)

    def _resize(self, max_workers):
        with self._submit_resize_lock:
            if max_workers is None:
                raise ValueError("Trying to resize with max_workers=None")
            elif max_workers == self._max_workers:
                return
            if self._executor_manager_thread is None:
                # If the executor_manager_thread has not been started
                # then no processes have been spawned and we can just
                # update _max_workers and return
                self._max_workers = max_workers
                return
            self._wait_job_completion()
            # Some process might have returned due to timeout so check how many
            # children are still alive. Use the _processes_management_lock to
            # ensure that no process are spawned or timeout during the resize.
            with self._processes_management_lock:
                processes = list(self._processes.values())
                nb_children_alive = sum(p.is_alive() for p in processes)
                self._max_workers = max_workers
                for _ in range(max_workers, nb_children_alive):
                    self._call_queue.put(None)
            while (len(self._processes) > max_workers
                    and not self._flags.broken):
                time.sleep(1e-3)
            self._adjust_process_count()
            processes = list(self._processes.values())
            while not all([p.is_alive() for p in processes]):
                time.sleep(1e-3)

    def _wait_job_completion(self):
        """Wait for the cache to be empty before resizing the pool."""
        # Issue a warning to the user about the bad effect of this usage.
        if len(self._pending_work_items) > 0:
            warnings.warn("Trying to resize an executor with running jobs: "
                          "waiting for jobs completion before resizing.",
                          UserWarning)
            mp.util.debug("Executor {} waiting for jobs completion before"
                          " resizing".format(self.executor_id))
        # Wait for the completion of the jobs
        while len(self._pending_work_items) > 0:
            time.sleep(1e-3)

    def _setup_queues(self, job_reducers, result_reducers):
        # As this executor can be resized, use a large queue size to avoid
        # underestimating capacity and introducing overhead
        queue_size = 2 * cpu_count() + EXTRA_QUEUED_CALLS
        super(_ReusablePoolExecutor, self)._setup_queues(...
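
The effect of ``kill_workers`` is easiest to see through loky's public entry point. Below is a minimal sketch, assuming ``loky`` is installed; ``slow_task`` and the parameter values are illustrative, not part of the snippet above. Note that changing a constructor argument such as ``timeout`` is what invalidates the singleton (changing only ``max_workers`` merely resizes it), and ``kill_workers=True`` lets the old instance be torn down without waiting for its running jobs:

import time
from loky import get_reusable_executor

def slow_task(i):
    time.sleep(60)  # simulate a job that would block a graceful shutdown
    return i

# First call spawns the singleton executor.
executor = get_reusable_executor(max_workers=2, timeout=10)
futures = [executor.submit(slow_task, i) for i in range(2)]

# timeout differs from the previous call, so the singleton cannot be
# reused; kill_workers=True forcibly interrupts the running slow_task
# jobs instead of waiting a minute for them to finish.
executor = get_reusable_executor(max_workers=2, timeout=20, kill_workers=True)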


work_handler.py

Source: work_handler.py (GitHub)


...
                )
            )
            self.workers[-1].start()

    def _build_workers(self, func, n=1, on_kill=None, on_start=None, timeout=1,
                       max_queue_size=-1):
        self.kill_workers()
        self.workers = []
        self.queue = Queue(max_queue_size)
        for _ in range(n):
            self.workers.append(
                ThreadWorker(
                    self.queue,
                    func,
                    on_kill=on_kill,
                    on_start=on_start,
                    timeout=timeout,
                )
            )
            self.workers[-1].start()

    def kill_workers(self):
        for worker in self.workers:
            worker.kill()
            worker.join()

    def kill_workers_on_completion(self):
        while True:
            time.sleep(0.2)
            if self.queue.empty():
                break
        self.kill_workers()

class RedisWorker:
    def __init__(self, q, on_kill=None):
        self.queue = q
        self.on_kill = on_kill

    def run(self):
        start_worker_cmd = shlex.split("rq worker " + self.queue.name)
        self.process = Popen(start_worker_cmd)

    def kill(self):
        self.process.kill()
        if self.on_kill:
            self.on_kill()

class RedisManager:
    def __init__(self, n=1, on_kill=None, q_name="default-redis-queue"):
        redis_conn = Redis()
        self.queue = rQueue(q_name, connection=redis_conn)
        start_redis_server_cmd = shlex.split("redis-server")
        self.server = Popen(start_redis_server_cmd)
        self.workers = []
        for _ in range(n):
            self.workers.append(RedisWorker(self.queue, on_kill=on_kill))
            self.workers[-1].run()

    def kill_workers_on_completion(self):
        while True:
            time.sleep(0.2)
            if len(self.queue.job_ids) == 0:
                break
        self.kill_workers()

    def kill_workers(self):
        self.server.kill()
        for worker in self.workers:
            worker.kill()

class ProcessWorker(multiprocessing.Process):
    def __init__(self, q, func, timeout=1, on_kill=None, on_start=None):
        multiprocessing.Process.__init__(self)
        log.info("New Worker Process: {}".format(self.name))
        self._queue = q
        self._function = func
        self._timeout = timeout
        self._kill = False
        self._on_kill = on_kill
        self._on_start = on_start
        if self._on_start:
            self._on_start()

    def run(self):
        while not self._kill:
            try:
                log.debug("{} - Pulling from Queue".format(self.name))
                work = self._queue.get(timeout=self._timeout)
            except queue.Empty:
                log.debug("{} - Timed Out".format(self.name))
                continue
            if work:
                log.debug("{} - Running {}".format(self.name, self._function))
                log.debug("{} - Input: {}".format(self.name, work))
                self._function(work)
        if self._on_kill:
            self._on_kill()
        # print('Worker {} Dying'.format(os.getpid()))

    def kill(self):
        self._kill = True

class ProcessManager:
    def __init__(
        self, func, n=1, on_kill=None, on_start=None, timeout=1, max_queue_size=-1
    ):
        self.queue = mQueue(max_queue_size)
        self._timeout = timeout
        self.workers = []
        for _ in range(n):
            self.workers.append(
                ProcessWorker(
                    self.queue,
                    func,
                    on_kill=on_kill,
                    on_start=on_start,
                    timeout=timeout,
                )
            )
            self.workers[-1].start()

    def _build_workers(self, func, n=1, on_kill=None, on_start=None, timeout=1,
                       max_queue_size=-1):
        self.kill_workers()
        self.workers = []
        self.queue = mQueue(max_queue_size)
        for _ in range(n):
            self.workers.append(
                ProcessWorker(
                    self.queue,
                    func,
                    on_kill=on_kill,
                    on_start=on_start,
                    timeout=timeout,
                )
            )
            self.workers[-1].start()

    def kill_workers(self):
        for worker in self.workers:
            worker.kill()
            worker.join()

    def kill_workers_on_completion(self):
        while True:
            time.sleep(0.2)
            if self.queue.empty():
                break
        self.kill_workers()

def WorkerManager(
    func=None,
    n=1,
    on_kill=None,
    on_start=None,
    timeout=1,
    max_queue_size=-1,
    parallelization="thread",
    q_name="default-redis-queue",
):
    if parallelization == "thread":
        assert callable(func)
        return ThreadManager(
            func,
...
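
The thread and process managers above all follow the same cooperative shutdown recipe: ``kill()`` only sets a flag that each worker's polling loop re-checks, and ``kill_workers`` then joins the workers so the caller knows they have actually exited. Here is a self-contained sketch of that pattern; ``SimpleWorker`` and ``SimpleManager`` are hypothetical names, not part of the snippet's repository:

import queue
import threading
import time

class SimpleWorker(threading.Thread):
    def __init__(self, q, func, timeout=1):
        super().__init__()
        self._queue = q
        self._func = func
        self._timeout = timeout
        self._kill = False

    def run(self):
        # Poll with a timeout so the kill flag is re-checked periodically
        # instead of blocking forever on an empty queue.
        while not self._kill:
            try:
                work = self._queue.get(timeout=self._timeout)
            except queue.Empty:
                continue
            self._func(work)

    def kill(self):
        self._kill = True

class SimpleManager:
    def __init__(self, func, n=2):
        self.queue = queue.Queue()
        self.workers = [SimpleWorker(self.queue, func) for _ in range(n)]
        for w in self.workers:
            w.start()

    def kill_workers(self):
        # Set every worker's flag first, then join each one so the method
        # only returns once all workers have exited their run() loop.
        for w in self.workers:
            w.kill()
            w.join()

manager = SimpleManager(print, n=2)
for item in range(5):
    manager.queue.put(item)
time.sleep(1)
manager.kill_workers()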


manage.py

Source: manage.py (GitHub)


...
    logging.info(f"{sender} — {message}")

def get_pid_file():
    pid_file = path.join(utils.project_dir(), WORKER_PIDS)
    return pid_file

def kill_workers():
    pid_file = get_pid_file()
    if not path.exists(pid_file):
        print("Looks like there are no workers to stop. Please verify it manually")
        return
    count = 0
    with open(pid_file) as fp:
        for pid in fp:
            try:
                pid = int(pid)
            except ValueError:
                logging.debug(f"kill_workers could not parse pid `{pid}`")
                continue
            _info(f"Stopping worker process: {pid}", "kill_workers")
            outputs = getoutput(f"ps -p {pid}")
            if WORKER in outputs:
                os.kill(pid, signal.SIGTERM)
                count += 1
            else:
                logging.debug(
                    f"Looks like process {pid} isn't a worker. "
                    f"`ps -p` result: `{outputs}`"
                )
    os.remove(pid_file)
    _info(f"Stopped {count} workers", "kill_workers")

def erase_database():
    database = DatabaseAdapter()
    database.drop_urls_table()
    _info("Database cleaned", "erase_database")

def load_urls_to_database(urls_file: str):
    _info(f"Loading `{urls_file}` into database", "load_urls_to_database")
    count = 0
    errors = 0
    database = DatabaseAdapter()
    if not path.isfile(urls_file):
        _info(f"Given path: `{urls_file}` is not a file", "load_urls_to_database")
        return
    with open(urls_file) as fp:
        database.create_urls_table()
        with database.connect():
            with database.commit():
                for url in fp:
                    url = url.strip()
                    database.naked_insert_url(url)
                    count += 1
    _info(f"Loaded {count} urls into database with {errors} errors", "load_urls_to_database")

def start_workers(count: int, debug: bool):
    _info(f"Starting {count} workers", "start_workers")
    worker_cmd = [path.join(utils.project_dir(), WORKER)]
    if debug:
        worker_cmd.append("-d")
    with open(get_pid_file(), "at") as fp:
        for i in range(count):
            pid = Popen(worker_cmd).pid
            _info(f"Started worker with pid {pid}", "start_workers")
            fp.write(f"{pid}\n")
            # SQLite doesn't allow multi-process safe connections.
            # This delay is for letting workers at least start safely.
            time.sleep(START_DELAY)
    # In general this is a weak decision and shouldn't be done this way in
    # production. On the other hand, in production we would use a better
    # DBMS which could handle row-level locks.

def start_threaded_workers(count: int = 1):
    threads = [ThreadedWorker(i) for i in range(count)]
    for t in threads:
        time.sleep(0.3)
        t.start()
    _info(f"Starting {count} threaded workers", "start_threaded_workers")
    for t in threads:
        _info(f"{t.name} — finished its work", "start_threaded_workers")
        t.join()
    _info(f"All {count} workers are done", "start_threaded_workers")

if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description="Workers manager.",
        epilog="""
        The above flags could be used simultaneously.
        However, actions would be executed in a certain order:
        0 — `stop`
        1 — `erase`
        2 — `load`
        3 — `workers` & `debug`
        """)
    parser.add_argument("-s", "--stop", help="Stop all running workers", action="store_true")
    parser.add_argument("-e", "--erase", help="Erase database.", action="store_true")
    parser.add_argument("-l", "--load", help="Path to a file with urls to load into database",
                        type=str)
    parser.add_argument("-w", "--workers", help="Start given number of workers", type=int)
    parser.add_argument("-t", "--threads", help="Start given number of threaded workers",
                        type=int)
    parser.add_argument("-d", "--debug", help="Enable debug logging in workers",
                        action="store_true")
    args = parser.parse_args()
    utils.configure_logger(args.debug)
    if args.workers is not None and args.threads is not None:
        print("You can't start both threaded and process-based workers simultaneously")
        exit(1)
    if args.stop:
        kill_workers()
    if args.erase:
        erase_database()
    if args.load:
        load_urls_to_database(args.load)
    if args.threads:
        if args.threads < 1:
            print("Please let us start at least 1 thread")
            exit(2)
        if args.threads > 10:
            print(f"We let you start {args.threads} but please reconsider it")
        start_threaded_workers(args.threads)
    if args.workers:
...
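
Stripped of the project-specific helpers (``utils``, ``WORKER_PIDS``, ``_info``), this ``kill_workers`` reduces to a pid-file recipe: read each recorded pid, confirm with ``ps`` that it still belongs to a worker, send SIGTERM, and delete the file. A self-contained sketch under those assumptions; the file name ``workers.pid`` and the ``ps`` invocation are illustrative:

import os
import signal
from subprocess import getoutput

PID_FILE = "workers.pid"

def kill_workers(worker_name="worker"):
    if not os.path.exists(PID_FILE):
        return 0
    count = 0
    with open(PID_FILE) as fp:
        for line in fp:
            try:
                pid = int(line)
            except ValueError:
                continue
            # Only signal the pid if `ps` confirms it still runs a worker;
            # pids can be recycled by unrelated processes after a crash.
            if worker_name in getoutput(f"ps -p {pid} -o comm="):
                os.kill(pid, signal.SIGTERM)
                count += 1
    os.remove(PID_FILE)  # stale entries are gone either way
    return count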


Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub: right from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks such as Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run Slash automation tests on the LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now!

Get 100 minutes of automation testing FREE!

Next-Gen App & Browser Testing Cloud
