Best Python code snippet using playwright-python
Source: test_data.py
...
    def run(self):
        "Main master loop."
        self.start()
        util._setproctitle("master [%s]" % self.proc_name)
        self.manage_workers()
        while True:
            try:
                self.reap_workers()
                sig = self.SIG_QUEUE.pop(0) if len(self.SIG_QUEUE) else None
                if sig is None:
                    self.sleep()
                    self.murder_workers()
                    self.manage_workers()
                    continue
                if sig not in self.SIG_NAMES:
                    self.log.info("Ignoring unknown signal: %s", sig)
                    continue
                signame = self.SIG_NAMES.get(sig)
                handler = getattr(self, "handle_%s" % signame, None)
                if not handler:
                    self.log.error("Unhandled signal: %s", signame)
                    continue
                self.log.info("Handling signal: %s", signame)
                handler()
                self.wakeup()
            except StopIteration:
                self.halt()
            except KeyboardInterrupt:
                self.halt()
            except HaltServer, inst:
                self.halt(reason=inst.reason, exit_status=inst.exit_status)
            except SystemExit:
                raise
            except Exception:
                self.log.info("Unhandled exception in main loop:\n%s",
                              traceback.format_exc())
                self.stop(False)
                if self.pidfile is not None:
                    self.pidfile.unlink()
                sys.exit(-1)

    def handle_chld(self, sig, frame):
        "SIGCHLD handling"
        self.wakeup()

    def handle_hup(self):
        """\
        HUP handling.
        - Reload configuration
        - Start the new worker processes with a new configuration
        - Gracefully shutdown the old worker processes
        """
        self.log.info("Hang up: %s", self.master_name)
        self.reload()

    def handle_quit(self):
        "SIGQUIT handling"
        raise StopIteration

    def handle_int(self):
        "SIGINT handling"
        self.stop(False)
        raise StopIteration

    def handle_term(self):
        "SIGTERM handling"
        self.stop(False)
        raise StopIteration

    def handle_ttin(self):
        """\
        SIGTTIN handling.
        Increases the number of workers by one.
        """
        self.num_workers += 1
        self.manage_workers()

    def handle_ttou(self):
        """\
        SIGTTOU handling.
        Decreases the number of workers by one.
        """
        if self.num_workers <= 1:
            return
        self.num_workers -= 1
        self.manage_workers()

    def handle_usr1(self):
        """\
        SIGUSR1 handling.
        Kill all workers by sending them a SIGUSR1
        """
        self.kill_workers(signal.SIGUSR1)
        self.log.reopen_files()

    def handle_usr2(self):
        """\
        SIGUSR2 handling.
        Creates a new master/worker set as a slave of the current
        master without affecting old workers. Use this to do live
        deployment with the ability to backout a change.
        """
        self.reexec()

    def handle_winch(self):
        "SIGWINCH handling"
        if os.getppid() == 1 or os.getpgrp() != os.getpid():
            self.log.info("graceful stop of workers")
            self.num_workers = 0
            self.kill_workers(signal.SIGQUIT)
        else:
            self.log.info("SIGWINCH ignored. Not daemonized")

    def wakeup(self):
        """\
        Wake up the arbiter by writing to the PIPE
        """
        try:
            os.write(self.PIPE[1], '.')
        except IOError, e:
            if e.errno not in [errno.EAGAIN, errno.EINTR]:
                raise

    def halt(self, reason=None, exit_status=0):
        """ halt arbiter """
        self.stop()
        self.log.info("Shutting down: %s", self.master_name)
        if reason is not None:
            self.log.info("Reason: %s", reason)
        if self.pidfile is not None:
            self.pidfile.unlink()
        sys.exit(exit_status)

    def sleep(self):
        """\
        Sleep until PIPE is readable or we timeout.
        A readable PIPE means a signal occurred.
        """
        try:
            ready = select.select([self.PIPE[0]], [], [], 1.0)
            if not ready[0]:
                return
            while os.read(self.PIPE[0], 1):
                pass
        except select.error, e:
            if e[0] not in [errno.EAGAIN, errno.EINTR]:
                raise
        except OSError, e:
            if e.errno not in [errno.EAGAIN, errno.EINTR]:
                raise
        except KeyboardInterrupt:
            sys.exit()

    def stop(self, graceful=True):
        """\
        Stop workers
        :attr graceful: boolean, If True (the default) workers will be
        killed gracefully (ie. trying to wait for the current connection)
        """
        try:
            self.LISTENER.close()
        except Exception:
            pass
        self.LISTENER = None
        sig = signal.SIGQUIT
        if not graceful:
            sig = signal.SIGTERM
        limit = time.time() + self.cfg.graceful_timeout
        while self.WORKERS and time.time() < limit:
            self.kill_workers(sig)
            time.sleep(0.1)
            self.reap_workers()
        self.kill_workers(signal.SIGKILL)

    def reexec(self):
        """\
        Relaunch the master and workers.
        """
        if self.pidfile is not None:
            self.pidfile.rename("%s.oldbin" % self.pidfile.fname)
        self.reexec_pid = os.fork()
        if self.reexec_pid != 0:
            self.master_name = "Old Master"
            return
        os.environ['GUNICORN_FD'] = str(self.LISTENER.fileno())
        os.chdir(self.START_CTX['cwd'])
        self.cfg.pre_exec(self)
        util.closerange(3, self.LISTENER.fileno())
        util.closerange(self.LISTENER.fileno() + 1, util.get_maxfd())
        os.execvpe(self.START_CTX[0], self.START_CTX['args'], os.environ)

    def reload(self):
        old_address = self.cfg.address
        # reload conf
        self.app.reload()
        self.setup(self.app)
        # reopen log files
        self.log.reopen_files()
        # do we need to change listener ?
        if old_address != self.cfg.address:
            self.LISTENER.close()
            self.LISTENER = create_socket(self.cfg, self.log)
            self.log.info("Listening at: %s", self.LISTENER)
        # do some actions on reload
        self.cfg.on_reload(self)
        # unlink pidfile
        if self.pidfile is not None:
            self.pidfile.unlink()
        # create new pidfile
        if self.cfg.pidfile is not None:
            self.pidfile = Pidfile(self.cfg.pidfile)
            self.pidfile.create(self.pid)
        # set new proc_name
        util._setproctitle("master [%s]" % self.proc_name)
        # spawn new workers
        for i in range(self.cfg.workers):
            self.spawn_worker()
        # manage workers
        self.manage_workers()

    def murder_workers(self):
        """\
        Kill unused/idle workers
        """
        for (pid, worker) in self.WORKERS.items():
            try:
                if time.time() - worker.tmp.last_update() <= self.timeout:
                    continue
            except ValueError:
                continue
            self.log.critical("WORKER TIMEOUT (pid:%s)", pid)
            self.kill_worker(pid, signal.SIGKILL)

    def reap_workers(self):
        """\
        Reap workers to avoid zombie processes
        """
        try:
            while True:
                wpid, status = os.waitpid(-1, os.WNOHANG)
                if not wpid:
                    break
                if self.reexec_pid == wpid:
                    self.reexec_pid = 0
                else:
                    # A worker said it cannot boot. We'll shutdown
                    # to avoid infinite start/stop cycles.
                    exitcode = status >> 8
                    if exitcode == self.WORKER_BOOT_ERROR:
                        reason = "Worker failed to boot."
                        raise HaltServer(reason, self.WORKER_BOOT_ERROR)
                    worker = self.WORKERS.pop(wpid, None)
                    if not worker:
                        continue
                    worker.tmp.close()
        except OSError, e:
            if e.errno == errno.ECHILD:
                pass

    def manage_workers(self):
        """\
        Maintain the number of workers by spawning or killing
        as required.
        """
        if len(self.WORKERS.keys()) < self.num_workers:
            self.spawn_workers()
        workers = self.WORKERS.items()
        workers.sort(key=lambda w: w[1].age)
        while len(workers) > self.num_workers:
            (pid, _) = workers.pop(0)
            self.kill_worker(pid, signal.SIGQUIT)

    def spawn_worker(self):
        self.worker_age += 1
        worker = self.worker_class(self.worker_age, self.pid, self.LISTENER,
                                   self.app, self.timeout / 2.0,
                                   self.cfg, self.log)
        self.cfg.pre_fork(self, worker)
        pid = os.fork()
        if pid != 0:
            self.WORKERS[pid] = worker
            return pid
        # Process Child
        worker_pid = os.getpid()
        try:
            util._setproctitle("worker [%s]" % self.proc_name)
            self.log.info("Booting worker with pid: %s", worker_pid)
            self.cfg.post_fork(self, worker)
            worker.init_process()
            sys.exit(0)
        except SystemExit:
            raise
        except:
            self.log.debug("Exception in worker process:\n%s",
                           traceback.format_exc())
            if not worker.booted:
                sys.exit(self.WORKER_BOOT_ERROR)
            sys.exit(-1)
        finally:
            self.log.info("Worker exiting (pid: %s)", worker_pid)
            try:
                worker.tmp.close()
                self.cfg.worker_exit(self, worker)
            except:
                pass

    def spawn_workers(self):
        """\
        Spawn new workers as needed.
        This is where a worker process leaves the main loop
        of the master process.
        """
        for i in range(self.num_workers - len(self.WORKERS.keys())):
            self.spawn_worker()

    def kill_workers(self, sig):
        """\
        Kill all workers with the signal `sig`
        :attr sig: `signal.SIG*` value
        """
        for pid in self.WORKERS.keys():
            self.kill_worker(pid, sig)

    def kill_worker(self, pid, sig):
        """\
        Kill a worker
        :attr pid: int, worker pid
        :attr sig: `signal.SIG*` value
        """
        try:
            os.kill(pid, sig)
...
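This excerpt is the master-process loop of a Python 2-era gunicorn arbiter (note the `except HaltServer, inst:` syntax). Its core trick is the self-pipe pattern: signal handlers only queue the signal number and write a single byte to `PIPE`, so the main loop can sleep in `select()` yet wake immediately when a signal arrives, then dispatch to the matching `handle_*` method by name. A minimal, standalone Python 3 sketch of that pattern (an assumed simplification for illustration, not gunicorn's actual code) could look like this:

import errno
import os
import select
import signal

# Handlers only enqueue and write one wakeup byte to a self-pipe;
# the main loop sleeps in select() and dispatches by signal name.
SIG_QUEUE = []
PIPE = os.pipe()
for fd in PIPE:
    os.set_blocking(fd, False)

def enqueue(sig, frame):
    SIG_QUEUE.append(sig)
    try:
        os.write(PIPE[1], b'.')  # wake the select() below
    except OSError as e:
        if e.errno not in (errno.EAGAIN, errno.EINTR):
            raise

def handle_hup():
    print("would reload configuration here")

def handle_term():
    print("graceful shutdown")
    raise SystemExit(0)

for s in (signal.SIGHUP, signal.SIGTERM):
    signal.signal(s, enqueue)

while True:
    ready, _, _ = select.select([PIPE[0]], [], [], 1.0)
    if ready:
        try:
            while os.read(PIPE[0], 1):  # drain the wakeup bytes
                pass
        except BlockingIOError:
            pass
    while SIG_QUEUE:
        name = signal.Signals(SIG_QUEUE.pop(0)).name  # e.g. "SIGHUP"
        handler = globals().get("handle_%s" % name[3:].lower())
        if handler:
            handler()

Keeping the handler down to an append and a pipe write means all real work happens in the loop, and the one-second `select()` timeout doubles as the arbiter's housekeeping tick (reaping, murdering, and respawning workers).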
Source: test_workers.py
import unittest
from mock import patch, Mock, DEFAULT
import zmq
import json
import config.test_config as config
import common.broker.workers as workers
import time
from collections import OrderedDict
from common.broker.workers import WorkerShutdown
from ModelingMachine.engine.vertex_factory import VertexCache, ModelCache
from config.engine import EngConfig


class ZmqFakeSocket(object):
    def __init__(self, socktype):
        self.opts = {}
        self.messages = []
        self.recv_message = ['', workers.Protocol.REQUEST, 'client_address', '', 'service_name', '"request"']

    def bind(self, addr):
        pass

    def connect(self, host):
        pass

    def close(self):
        pass

    def setsockopt(self, opt, *args):
        self.opts[opt] = args

    def send_multipart(self, message):
        self.messages.append(str(message))

    def recv_multipart(self):
        return self.recv_message


class WorkersTestCase(unittest.TestCase):
    @classmethod
    def setUpClass(self):
        self.workers_context_patch = patch('common.broker.workers.zmq.Context')
        self.workers_context_mock = self.workers_context_patch.start()
        self.workers_context_mock.return_value.socket = ZmqFakeSocket
        self.workers_poller_patch = patch('common.broker.workers.zmq.Poller')
        self.workers_poller_mock = self.workers_poller_patch.start()
        self.workers_get_id_patch = patch('common.broker.workers.Workers.get_id')
        self.workers_get_id_mock = self.workers_get_id_patch.start()
        self.workers_get_id_mock.return_value = "1"
        self.workers_hb_patch = patch('common.broker.workers.Workers.send_heartbeats')
        self.workers_hb_mock = self.workers_hb_patch.start()
        self.workers_cp_patch = patch('common.broker.workers.Workers.check_pipes')
        self.workers_cp_mock = self.workers_cp_patch.start()
        self.workers_add_services_patch = patch('common.broker.workers.Workers.add_services')
        self.workers_add_services_mock = self.workers_add_services_patch.start()
        self.workers_register_patch = patch('common.broker.workers.Workers.register')
        self.workers_register_mock = self.workers_register_patch.start()
        self.workers_wp_patch = patch('common.broker.workers.WorkerProcess')
        self.workers_wp_mock = self.workers_wp_patch.start()
        self.workers_wp_mock.side_effect = Exception('WorkerProcess disabled')

    @classmethod
    def tearDownClass(self):
        self.workers_context_patch.stop()
        self.workers_poller_patch.stop()
        self.workers_get_id_patch.stop()
        self.workers_hb_patch.stop()
        self.workers_cp_patch.stop()
        self.workers_add_services_patch.stop()
        self.workers_register_patch.stop()
        self.workers_wp_patch.stop()

    def setUp(self):
        self.workers = workers.Workers(broker=None)
        self.workers.services = []
        self.workers.reconnect_to_broker()

    def test_send_services(self):
        self.workers.send_services()
        # message body "[]" is the empty json list of services
        self.assertEqual(self.workers.worker_socket.messages[-1], str(['', workers.Protocol.STATUS, "1", "[]"]))

    def test_process_msg(self):
        self.assertIsNone(self.workers.process_msg(workers.Protocol.HEARTBEAT, [""]))
        self.assertRaises(WorkerShutdown, self.workers.process_msg, workers.Protocol.SHUTDOWN, [""])
        self.assertEqual(self.workers.worker_socket.messages[-1], str(['', workers.Protocol.SHUTDOWN, "1"]))
        self.assertIsNone(self.workers.process_msg(workers.Protocol.STATUS, [""]))
        # message body "[]" is the empty json list of services
        self.assertEqual(self.workers.worker_socket.messages[-1], str(['', workers.Protocol.STATUS, "1", "[]"]))
        # this creates a new worker_socket
        self.assertIsNone(self.workers.process_msg(workers.Protocol.DISCONNECT, [""]))
        self.assertIsNone(self.workers.process_msg(workers.Protocol.INITIALIZE, [""]))
        self.assertEqual(self.workers.worker_socket.messages[-1], str(['', workers.Protocol.DISCONNECT, ""]))
        self.assertItemsEqual(self.workers.process_msg(workers.Protocol.REQUEST, ["client_address", "", "body"]), ["body"])

    @patch('common.broker.workers.FLIPPERS', autospec=True)
    def test_add_service(self, mock_flippers):
        mock_flippers.request_accounting = False
        self.workers.add_service("service_name")
        self.assertItemsEqual(self.workers.services, [{"name": "service_name", "request": None}])

    @patch('common.broker.workers.FLIPPERS', autospec=True)
    def test_add_service_with_flipper_on(self, mock_flippers):
        mock_flippers.request_accounting = True
        self.workers.add_service("service_name")
        self.assertItemsEqual(self.workers.services,
                [{"name": "service_name", "request": None, 'request_id': None}])

    @patch('common.broker.workers.FLIPPERS', autospec=True)
    def test_assign_request(self, mock_flippers):
        mock_flippers.request_accounting = False
        self.workers.add_service("service_name")
        self.assertTrue(self.workers.assign_request("service_name", {'qid': '1'}))
        self.assertItemsEqual(self.workers.services, [{"name": "service_name", "request": {'qid': '1'}}])
        # the one service is already occupied by a request
        self.assertFalse(self.workers.assign_request("service_name", {'qid': '1'}))
        self.assertItemsEqual(self.workers.services, [{"name": "service_name", "request": {'qid': '1'}}])
        # service 'manager' is always accepted
        self.assertTrue(self.workers.assign_request("manager", {'qid': '1'}))
        self.assertItemsEqual(self.workers.services, [{"name": "service_name", "request": {'qid': '1'}}])

    @patch('common.broker.workers.FLIPPERS', autospec=True)
    def test_clear_request(self, mock_flippers):
        mock_flippers.request_accounting = False
        self.workers.add_service("service_name")
        self.workers.assign_request("service_name", {'qid': '1'})
        self.workers.clear_request("service_name", {'qid': '1'})
        self.assertItemsEqual(self.workers.services, [{"name": "service_name", "request": None}])
        self.assertEqual(self.workers.worker_socket.messages[0], str(['', workers.Protocol.STATUS, "1", '["service_name"]']))
        self.assertTrue(self.workers.clear_request("manager", ""))

    @patch('common.broker.workers.FLIPPERS', autospec=True)
    def test_clear_request_with_flipper_on(self, mock_flippers):
        mock_flippers.request_accounting = True
        self.workers.add_service("service_name")
        self.workers.assign_request("service_name", {'qid': '1'})
        self.workers.clear_request("service_name", {'qid': '1'})
        self.assertItemsEqual(self.workers.services, [
            {"name": "service_name", "request": None, 'request_id': None}])
        self.assertEqual(self.workers.worker_socket.messages[0], str(['', workers.Protocol.STATUS, "1", '["service_name"]']))
        self.assertTrue(self.workers.clear_request("manager", ""))

    def test_cleanup_processes(self):
        with patch('common.broker.workers.sys', autospec=True) as mock_sys:
            wp = Mock()
            wp.is_alive.return_value = False
            wp.service = 'service1'
            wp.request = 'request1'
            wp2 = Mock()
            wp2.is_alive.return_value = True
            self.workers.worker_processes = [wp, wp2]
            with patch.object(self.workers, "clear_request") as mock_clear_request:
                self.workers.cleanup_processes()
                mock_clear_request.assert_called_once_with('service1', 'request1')
            self.assertItemsEqual(self.workers.worker_processes, [wp2])

    def test_poll_socket(self):
        self.workers.poller.poll.return_value = False
        self.assertIsNone(self.workers.poll_socket())
        self.workers.poller.poll.return_value = True
        self.assertEqual(self.workers.poll_socket(), self.workers.worker_socket.recv_message)

    def test_process_request(self):
        self.workers.poller.poll.return_value = True
        self.assertIsNone(self.workers.process_request())

    def test_process_manager_request_to_kill(self):
        service = 'manager'
        request = [service, '{"command": "kill"}']
        with patch.multiple(self.workers, wait_for_request=DEFAULT, kill_worker_by_request=DEFAULT, run_request=DEFAULT) as mocks:
            mocks['wait_for_request'].return_value = request
            result = self.workers.process_request()
            self.assertIsNone(result)
            mocks['kill_worker_by_request'].assert_called_once_with({'command': 'kill'})

    def test_process_manager_request_to_broadcast(self):
        service = 'manager'
        self.workers.add_service('service_name')
        self.workers.add_service('manager')
        request = [service, '{"command": "broadcast_command"}']
        with patch.multiple(self.workers, wait_for_request=DEFAULT, add_worker=DEFAULT) as mocks:
            mocks['wait_for_request'].return_value = request
            result = self.workers.process_request()
            self.assertIsNone(result)
            mocks['add_worker'].assert_called_once_with({'command': 'broadcast_command'}, None)

    def test_process_predict_request_with_cache(self):
        service = 'fit_single'
        self.workers.add_service('service_name')
        self.workers.add_service('fit_single')
        self.workers.model_cache.has_model_cache = True
        req = {'command': 'predict_whatever', 'pid': '1234', 'blueprint_id': '1234', 'dataset_id': '1234', 'samplepct': '50', 'partitions': [[-1, -1]]}
        request = [service, json.dumps(req)]
        with patch.multiple(self.workers, wait_for_request=DEFAULT, add_worker=DEFAULT) as mocks:
            mocks['wait_for_request'].return_value = request
            self.workers.model_cache.get_cached_model = Mock()
            self.workers.model_cache.get_cached_model.return_value = 'test_cache'
            result = self.workers.process_request()
            self.assertIsNone(result)
            mocks['add_worker'].assert_called_once_with(req, 'test_cache')

    def test_process_predict_request_without_cache(self):
        service = 'fit_single'
        self.workers.add_service('service_name')
        self.workers.add_service('fit_single')
        self.workers.model_cache.has_model_cache = False
        req = {'command': 'predict_whatever', 'pid': '1234', 'blueprint_id': '1234', 'dataset_id': '1234', 'samplepct': '50', 'partitions': [[-1, -1]]}
        request = [service, json.dumps(req)]
        with patch.multiple(self.workers, wait_for_request=DEFAULT, add_worker=DEFAULT) as mocks:
            mocks['wait_for_request'].return_value = request
            self.workers.model_cache.get_cached_model = Mock()
            self.workers.model_cache.get_cached_model.return_value = 'test_cache'
            result = self.workers.process_request()
            self.assertIsNone(result)
            mocks['add_worker'].assert_called_once_with(req, None)

    def test_get_cached_model(self):
        self.workers.model_cache.has_model_cache = 3
        req = {'command': 'predict_whatever', 'pid': '1234', 'blueprint_id': '1234', 'dataset_id': '1234', 'samplepct': '50', 'partitions': [[-1, -1]]}
        # test get new model
        out = self.workers.model_cache.get_cached_model(req)
        self.assertIsInstance(out, VertexCache)
        self.assertEqual(OrderedDict(), self.workers.model_cache.cached_models)
        # test update
        self.workers.model_cache.update_cached_model(out, req)
        self.assertEqual(out, self.workers.model_cache.cached_models.values()[0])
        # test get existing model
        out2 = self.workers.model_cache.get_cached_model(req)
        self.assertEqual(out, out2)

    def test_shutdown(self):
        with patch.multiple(self.workers, try_run_once_at_shutdown=DEFAULT,
                            current_requests=DEFAULT) as mocks:
            self.workers.stop = True
            self.workers.stop_time = time.time()
            self.workers.worker_processes = [1]
            mocks['current_requests'].return_value = [{'pid': 'pid', 'uid': 'uid'}]
            self.assertFalse(self.workers.shutdown())
            self.workers.stop = False
            self.assertFalse(self.workers.shutdown())


if __name__ == '__main__':
...
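These tests cut the broker workers off from real ZeroMQ by patching `zmq.Context` so that its `socket` factory hands back `ZmqFakeSocket`, a stub that records every frame passed to `send_multipart()`; assertions can then compare exact wire-level payloads such as `worker_socket.messages[-1]`. A pared-down sketch of the same technique (hedged: it assumes pyzmq is importable, and the class and test names here are illustrative, not the project's):

import unittest
from unittest.mock import patch  # Python 3; the listing above uses the `mock` backport

class FakeSocket(object):
    """Stands in for a zmq socket: records frames instead of sending them."""
    def __init__(self, socktype):
        self.messages = []

    def connect(self, addr):
        pass

    def send_multipart(self, parts):
        self.messages.append(parts)

class FakeSocketDemo(unittest.TestCase):
    def test_sent_frames_are_recorded(self):
        # Patch the Context class; make its instances hand out FakeSocket.
        with patch('zmq.Context') as ctx_cls:
            ctx_cls.return_value.socket = FakeSocket
            import zmq
            sock = zmq.Context().socket(None)  # socktype is ignored by the fake
            sock.connect('tcp://localhost:5555')
            sock.send_multipart([b'', b'hello'])
            self.assertEqual(sock.messages, [[b'', b'hello']])

if __name__ == '__main__':
    unittest.main()

Because the fake keeps every frame it is handed, the tests never need a running broker, yet they still exercise the exact message layout the protocol expects.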
Source: bench_frameworks.py
...
    start = time.time()
    res = func(*args, **kwargs)
    elapsed = time.time() - start
    return res, elapsed


def spawn_workers(n_workers):
    from ipyparallel.apps.ipengineapp import launch_new_instance
    pids = []
    import os
    for _ in range(n_workers):
        pid = os.fork()
        if pid == 0:
            launch_new_instance()
        else:
            pids.append(pid)
    # launch_new_instance()
    return pids


def bench_fiber(tasks, workers, task_duration, warmup=True, pool=None):
    if warmup:
        if not pool:
            pool = fiber.Pool(workers)
        pool.map(sleep_worker, [task_duration for x in range(tasks)],
                 chunksize=1)
        logger.debug("warm up finished")
    res, elapsed = timeit(
        pool.map, sleep_worker, [task_duration for x in range(tasks)],
        chunksize=1,
    )
    return elapsed


def bench_fiber_seq(tasks, workers, task_duration, warmup=True, pool=None):
    def run(pool, duration):
        res = [None] * workers
        for i in range(tasks // workers):
            for j in range(workers):
                handle = pool.apply_async(sleep_worker, (duration,))
                res[j] = handle
            for j in range(workers):
                res[j].get()
    if warmup:
        if not pool:
            pool = mp.Pool(workers)
        pool.map(sleep_worker, [task_duration for x in range(tasks)],
                 chunksize=1)
        logger.debug("warm up finished")
    res, elapsed = timeit(run, pool, task_duration)
    return elapsed


def bench_mp(tasks, workers, task_duration, warmup=True):
    logger.debug("benchmarking multiprocessing")
    pool = None
    if warmup:
        logger.debug("warming up")
        pool = mp.Pool(workers)
        pool.map(sleep_worker, [task_duration for x in range(tasks)],
                 chunksize=1)
        logger.debug("warm up finished")
    res, elapsed = timeit(
        pool.map, sleep_worker, [task_duration for x in range(tasks)],
        chunksize=1
    )
    return elapsed


def bench_mp_seq(tasks, workers, task_duration, warmup=True, pool=None):
    def run(pool, duration):
        res = [None] * workers
        for i in range(tasks // workers):
            for j in range(workers):
                handle = pool.apply_async(sleep_worker, (duration,))
                res[j] = handle
            for j in range(workers):
                res[j].get()
    if warmup:
        if not pool:
            pool = mp.Pool(workers)
        pool.map(sleep_worker, [task_duration for x in range(tasks)],
                 chunksize=1)
        logger.debug("warm up finished")
    res, elapsed = timeit(run, pool, task_duration)
    return elapsed


def pyspark_parallel(sc, tasks, task_duration):
    nums = sc.parallelize([task_duration for i in range(tasks)])
    nums.map(sleep_worker).collect()


def pyspark_parallel_seq(sc, tasks, task_duration, workers):
    for i in range(tasks // workers):
        nums = sc.parallelize([task_duration for i in range(workers)])
        nums.map(sleep_worker).collect()


def bench_spark(tasks, workers, task_duration, warmup=True, sc=None):
    if warmup:
        pyspark_parallel(sc, tasks, task_duration)
    res, elapsed = timeit(pyspark_parallel, sc, tasks, task_duration)
    return elapsed


def bench_spark_seq(tasks, workers, task_duration, warmup=True, sc=None):
    if warmup:
        pyspark_parallel(sc, tasks, task_duration)
    res, elapsed = timeit(pyspark_parallel_seq, sc, tasks,
                          task_duration, workers)
    return elapsed


def bench_ray(tasks, workers, task_duration, warmup=True):
    import ray

    @ray.remote
    def ray_sleep(duration):
        time.sleep(duration)

    if warmup:
        ray.get([ray_sleep.remote(task_duration) for x in range(tasks)])
    res, elapsed = timeit(
        ray.get, [ray_sleep.remote(task_duration) for x in range(tasks)]
    )
    return elapsed


def bench_ray_seq(tasks, workers, task_duration, warmup=True):
    import ray

    @ray.remote
    def ray_sleep(duration):
        time.sleep(duration)

    def ray_parallel_seq(tasks, workers):
        for i in range(tasks // workers):
            ray.get([ray_sleep.remote(task_duration) for x in range(workers)])

    if warmup:
        ray.get([ray_sleep.remote(task_duration) for x in range(tasks)])
    res, elapsed = timeit(
        ray_parallel_seq, tasks, workers
    )
    return elapsed


def bench_ipp_seq(tasks, workers, task_duration, warmup=True):
    from ipyparallel import Client
    rc = Client()
    # dview = rc[:]
    dview = rc.load_balanced_view()
    if warmup:
        dview.map_sync(sleep_worker, [task_duration for i in range(tasks)])
    dview.block = True

    def run(tasks):
        objs = [dview.apply_async(sleep_worker, task_duration) for i in range(tasks)]
        for task in objs:
            task.get()

    res, elapsed = timeit(
        run, tasks
    )
    return elapsed


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        'frameworks', nargs='+',
        choices=['mp', 'fiber', 'pyspark', 'ray', 'ipyparallel'],
        help='frameworks to benchmark'
    )
    parser.add_argument('-t', '--total-duration', type=int, default=1,
                        help='total running time')
    parser.add_argument('-d', '--task-duration', type=float, default=None,
                        choices=[0.001, 0.01, 0.1, 1],
                        help='task duration in ms')
    args = parser.parse_args()
    workers = 5
    max_duration = args.total_duration
    results = {}
    frameworks = args.frameworks
    for framework in frameworks:
        results[framework] = []
        results[framework + "_seq"] = []
    if "pyspark" in frameworks:
        from pyspark import SparkContext
        import pyspark
        sc = SparkContext()
        conf = pyspark.SparkConf().setAll([("spark.cores.max", 5)])
        sc.stop()
        sc = pyspark.SparkContext(conf=conf)
    if "ray" in frameworks:
        import ray
        ray.init()
    if "fiber" in frameworks:
        import fiber.pool
        fiber_pool = fiber.Pool(workers)
    if "ipyparallel" in frameworks:
        print("before popen")
        # ipp_controller = subprocess.Popen(["ipcontroller", "--ip", "*"])
        print("after popen")
        import atexit
        import signal
        import os
        # atexit.register(ipp_controller.kill)
        pids = spawn_workers(workers)
        for pid in pids:
            atexit.register(os.kill, pid, signal.SIGKILL)
        time.sleep(4)
    for i in range(4):
        factor = 10 ** i
        duration = 1 / factor
        if args.task_duration is not None:
            print(args.task_duration, duration, type(args.task_duration), type(duration))
            if args.task_duration != duration:
                continue
        tasks = int(max_duration * workers / duration)
        print(
            "Benchmarking {} workers with {} tasks each takes {} "
            "seconds".format(
...
Source: workers_real_time_statistics.py
...
        :rtype: dict
        """
        return self._properties['activity_statistics']

    @property
    def total_workers(self):
        """
        :returns: The total_workers
        :rtype: unicode
        """
        return self._properties['total_workers']

    @property
    def workspace_sid(self):
        """
        :returns: The workspace_sid
        :rtype: unicode
        """
        return self._properties['workspace_sid']

    @property
    def url(self):
...

LambdaTest’s Playwright tutorial gives you a broader idea of the Playwright automation framework, its unique features, and its use cases, with examples to deepen your understanding of Playwright testing. The tutorial provides A-to-Z guidance, from installing the Playwright framework to best practices and advanced concepts.
