Best Python code snippet using locust
test_runners.py
Source:test_runners.py  
...
        runner.start(user_count=5, spawn_rate=5, wait=False)
        runner.spawning_greenlet.join()
        self.assertDictEqual({"MyUser1": 3, "MyUser2": 2}, runner.user_classes_count)
        runner.quit()

    def test_user_classes_count(self):
        class MyUser1(User):
            wait_time = constant(0)

            @task
            def my_task(self):
                pass

        class MyUser2(User):
            wait_time = constant(0)

            @task
            def my_task(self):
                pass

        environment = Environment(user_classes=[MyUser1, MyUser2])
        runner = LocalRunner(environment)
        runner.start(user_count=10, spawn_rate=5, wait=False)
        runner.spawning_greenlet.join()
...
runners.py
Source:runners.py  
...
        :returns: Number of currently running users
        """
        return len(self.user_greenlets)

    @property
    def user_classes_count(self) -> Dict[str, int]:
        """
        :returns: Number of currently running users for each user class
        """
        user_classes_count = {user_class.__name__: 0 for user_class in self.user_classes}
        for user_greenlet in self.user_greenlets:
            try:
                user = user_greenlet.args[0]
            except IndexError:
                # TODO: Find out why args is sometimes empty. In gevent code,
                #       the supplied args are cleared in the gevent.greenlet.Greenlet.__free,
                #       so it seems a good place to start investigating. My suspicion is that
                #       the supplied args are emptied whenever the greenlet is dead, so we can
                #       simply ignore the greenlets with empty args.
                logger.debug(
                    "ERROR: While calculating number of running users, we encountered a user that didn't have proper args %s (user_greenlet.dead=%s)",
                    user_greenlet,
                    user_greenlet.dead,
                )
                continue
            user_classes_count[user.__class__.__name__] += 1
        return user_classes_count

    def update_state(self, new_state):
        """
        Updates the current state
        """
        # I (cyberwiz) commented out this logging, because it is too noisy even for debug level
        # Uncomment it if you are specifically debugging state transitions
        # logger.debug("Updating state to '%s', old state was '%s'" % (new_state, self.state))
        self.state = new_state

    def cpu_log_warning(self):
        """Called at the end of the test to repeat the warning & return the status"""
        if self.cpu_warning_emitted:
            logger.warning(
                "CPU usage was too high at some point during the test! See https://docs.locust.io/en/stable/running-locust-distributed.html for how to distribute the load over multiple CPU cores or machines"
            )
            return True
        return False
    def spawn_users(self, user_classes_spawn_count: Dict[str, int], wait: bool = False):
        if self.state == STATE_INIT or self.state == STATE_STOPPED:
            self.update_state(STATE_SPAWNING)

        logger.debug(
            "Spawning additional %s (%s already running)..."
            % (json.dumps(user_classes_spawn_count), json.dumps(self.user_classes_count))
        )

        def spawn(user_class: str, spawn_count: int):
            n = 0
            while n < spawn_count:
                new_user = self.user_classes_by_name[user_class](self.environment)
                new_user.start(self.user_greenlets)
                n += 1
                if n % 10 == 0 or n == spawn_count:
                    logger.debug("%i users spawned" % self.user_count)
            logger.debug("All users of class %s spawned" % user_class)

        for user_class, spawn_count in user_classes_spawn_count.items():
            spawn(user_class, spawn_count)

        if wait:
            self.user_greenlets.join()
            logger.info("All users stopped\n")

    def stop_users(self, user_classes_stop_count: Dict[str, int]):
        async_calls_to_stop = Group()
        stop_group = Group()

        for user_class, stop_count in user_classes_stop_count.items():
            if self.user_classes_count[user_class] == 0:
                continue

            to_stop = []
            for user_greenlet in self.user_greenlets:
                if len(to_stop) == stop_count:
                    break
                try:
                    user = user_greenlet.args[0]
                except IndexError:
                    logger.error(
                        "While stopping users, we encountered a user that didn't have proper args %s", user_greenlet
                    )
                    continue
                if isinstance(user, self.user_classes_by_name[user_class]):
                    to_stop.append(user)

            if not to_stop:
                continue

            while True:
                user_to_stop: User = to_stop.pop()
                logger.debug("Stopping %s" % user_to_stop.greenlet.name)
                if user_to_stop.greenlet is greenlet.getcurrent():
                    # User called runner.quit(), so don't block waiting for killing to finish
                    user_to_stop.group.killone(user_to_stop.greenlet, block=False)
                elif self.environment.stop_timeout:
                    async_calls_to_stop.add(gevent.spawn_later(0, user_to_stop.stop, force=False))
                    stop_group.add(user_to_stop.greenlet)
                else:
                    async_calls_to_stop.add(gevent.spawn_later(0, user_to_stop.stop, force=True))
                if not to_stop:
                    break

        async_calls_to_stop.join()

        if not stop_group.join(timeout=self.environment.stop_timeout):
            logger.info(
                "Not all users finished their tasks & terminated in %s seconds. Stopping them..."
                % self.environment.stop_timeout
            )
            stop_group.kill(block=True)

        logger.debug(
            "%g users have been stopped, %g still running", sum(user_classes_stop_count.values()), self.user_count
        )
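The graceful-stop branch above is driven by self.environment.stop_timeout: when it is set, stop_users() asks each user to finish its current task and only force-kills the greenlets still alive once the timeout expires. A minimal sketch of exercising that behaviour with a LocalRunner (the user class, sleep durations and the 10-second timeout are illustrative assumptions, not taken from the snippet):

import gevent
from locust import User, task, constant
from locust.env import Environment

class SlowUser(User):
    wait_time = constant(0)

    @task
    def long_task(self):
        gevent.sleep(5)  # stands in for a task that takes a while to finish

env = Environment(user_classes=[SlowUser], stop_timeout=10)
runner = env.create_local_runner()
runner.start(user_count=3, spawn_rate=3)
gevent.sleep(2)
runner.stop()   # stop_users() waits up to ~10s for tasks to finish before killing greenlets
runner.quit()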
    def monitor_cpu(self):
        process = psutil.Process()
        while True:
            self.current_cpu_usage = process.cpu_percent()
            if self.current_cpu_usage > 90 and not self.cpu_warning_emitted:
                logging.warning(
                    "CPU usage above 90%! This may constrain your throughput and may even give inconsistent response time measurements! See https://docs.locust.io/en/stable/running-locust-distributed.html for how to distribute the load over multiple CPU cores or machines"
                )
                self.cpu_warning_emitted = True
            gevent.sleep(CPU_MONITOR_INTERVAL)

    def start(self, user_count: int, spawn_rate: float, wait: bool = False):
        """
        Start running a load test

        :param user_count: Total number of users to start
        :param spawn_rate: Number of users to spawn per second
        :param wait: If True calls to this method will block until all users are spawned.
                     If False (the default), a greenlet that spawns the users will be
                     started and the call to this method will return immediately.
        """
        if self.state != STATE_RUNNING and self.state != STATE_SPAWNING:
            self.stats.clear_all()
            self.exceptions = {}
            self.cpu_warning_emitted = False
            self.worker_cpu_warning_emitted = False
            self.environment.events.test_start.fire(environment=self.environment)

        if wait and user_count - self.user_count > spawn_rate:
            raise ValueError("wait is True but the amount of users to add is greater than the spawn rate")

        for user_class in self.user_classes:
            if self.environment.host is not None:
                user_class.host = self.environment.host

        if self.state != STATE_INIT and self.state != STATE_STOPPED:
            self.update_state(STATE_SPAWNING)

        if self._users_dispatcher is None:
            self._users_dispatcher = UsersDispatcher(
                worker_nodes=[self._local_worker_node], user_classes=self.user_classes
            )

        logger.info("Ramping to %d users at a rate of %.2f per second" % (user_count, spawn_rate))

        self._users_dispatcher.new_dispatch(user_count, spawn_rate)

        try:
            for dispatched_users in self._users_dispatcher:
                user_classes_spawn_count = {}
                user_classes_stop_count = {}
                user_classes_count = dispatched_users[self._local_worker_node.id]
                logger.debug("Ramping to %s" % _format_user_classes_count_for_log(user_classes_count))
                for user_class, user_class_count in user_classes_count.items():
                    if self.user_classes_count[user_class] > user_class_count:
                        user_classes_stop_count[user_class] = self.user_classes_count[user_class] - user_class_count
                    elif self.user_classes_count[user_class] < user_class_count:
                        user_classes_spawn_count[user_class] = user_class_count - self.user_classes_count[user_class]

                if wait:
                    # spawn_users will block, so we need to call stop_users first
                    self.stop_users(user_classes_stop_count)
                    self.spawn_users(user_classes_spawn_count, wait)
                else:
                    # call spawn_users before stopping the users since stop_users
                    # can be blocking because of the stop_timeout
                    self.spawn_users(user_classes_spawn_count, wait)
                    self.stop_users(user_classes_stop_count)

                self._local_worker_node.user_classes_count = next(iter(dispatched_users.values()))

        except KeyboardInterrupt:
            # TODO: Find a cleaner way to handle that
            # We need to catch keyboard interrupt. Otherwise, if KeyboardInterrupt is received while in
            # a gevent.sleep inside the dispatch_users function, locust won't gracefully shutdown.
            self.quit()

        logger.info("All users spawned: %s" % _format_user_classes_count_for_log(self.user_classes_count))

        self.environment.events.spawning_complete.fire(user_count=sum(self.target_user_classes_count.values()))
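start() fires the test_start event before ramping and spawning_complete once the dispatcher has reached the target, while stop() (further down) fires test_stop. These are ordinary locust event hooks, so a locustfile can observe the ramp from the outside; a short sketch (listener names are arbitrary):

from locust import events

@events.test_start.add_listener
def on_test_start(environment, **kwargs):
    print("Ramp-up starting against host:", environment.host)

@events.spawning_complete.add_listener
def on_spawning_complete(user_count, **kwargs):
    print(f"All {user_count} users spawned")

@events.test_stop.add_listener
def on_test_stop(environment, **kwargs):
    print("Test stopped")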
    def start_shape(self):
        if self.shape_greenlet:
            logger.info("There is an ongoing shape test running. Editing is disabled")
            return

        logger.info("Shape test starting. User count and spawn rate are ignored for this type of load test")
        self.update_state(STATE_INIT)
        self.shape_greenlet = self.greenlet.spawn(self.shape_worker)
        self.shape_greenlet.link_exception(greenlet_exception_handler)
        self.environment.shape_class.reset_time()

    def shape_worker(self):
        logger.info("Shape worker starting")
        while self.state == STATE_INIT or self.state == STATE_SPAWNING or self.state == STATE_RUNNING:
            new_state = self.environment.shape_class.tick()
            if new_state is None:
                logger.info("Shape test stopping")
                if self.environment.parsed_options and self.environment.parsed_options.headless:
                    self.quit()
                else:
                    self.stop()
                self.shape_greenlet = None
                self.shape_last_state = None
                return
            elif self.shape_last_state == new_state:
                gevent.sleep(1)
            else:
                user_count, spawn_rate = new_state
                logger.info("Shape test updating to %d users at %.2f spawn rate" % (user_count, spawn_rate))
                # TODO: This `self.start()` call is blocking until the ramp-up is completed. This can lead
                #       to unexpected behaviours such as the one in the following example:
                #       A load test shape has the following stages:
                #           stage 1: (user_count=100, spawn_rate=1) for t < 50s
                #           stage 2: (user_count=120, spawn_rate=1) for t < 100s
                #           stage 3: (user_count=130, spawn_rate=1) for t < 120s
                #        Because the first stage will take 100s to complete, the second stage
                #        will be skipped completely because the shape worker will be blocked
                #        at the `self.start()` of the first stage.
                #        Of course, this isn't a problem if the load test shape is well-defined.
                #        We should probably use a `gevent.timeout` with a duration a little over
                #        `(user_count - prev_user_count) / spawn_rate` in order to limit the runtime
                #        of each load test shape stage.
                self.start(user_count=user_count, spawn_rate=spawn_rate)
                self.shape_last_state = new_state
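shape_worker keeps calling self.environment.shape_class.tick() and re-ramps whenever the returned (user_count, spawn_rate) tuple changes, stopping the test when tick() returns None. A minimal shape class that drives this loop (the stage boundaries are made up for illustration):

from locust import LoadTestShape

class StagesShape(LoadTestShape):
    # (end_time_in_seconds, user_count, spawn_rate) - illustrative values
    stages = [(60, 10, 1), (120, 50, 5), (180, 100, 10)]

    def tick(self):
        run_time = self.get_run_time()
        for end_time, user_count, spawn_rate in self.stages:
            if run_time < end_time:
                return user_count, spawn_rate
        return None  # tells shape_worker to stop the test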
    def stop(self):
        """
        Stop a running load test by stopping all running users
        """
        if self.state == STATE_STOPPED:
            return
        logger.debug("Stopping all users")
        self.update_state(STATE_CLEANUP)

        # if we are currently spawning users we need to kill the spawning greenlet first
        if self.spawning_greenlet and not self.spawning_greenlet.ready():
            self.spawning_greenlet.kill(block=True)

        if self.environment.shape_class is not None and self.shape_greenlet is not greenlet.getcurrent():
            # If the test was not started yet and locust is
            # stopped/quit, shape_greenlet will be None.
            if self.shape_greenlet is not None:
                self.shape_greenlet.kill(block=True)
                self.shape_greenlet = None
            self.shape_last_state = None

        self.stop_users(self.user_classes_count)
        self.update_state(STATE_STOPPED)

        self.cpu_log_warning()
        self.environment.events.test_stop.fire(environment=self.environment)

    def quit(self):
        """
        Stop any running load test and kill all greenlets for the runner
        """
        self.stop()
        self.greenlet.kill(block=True)

    def log_exception(self, node_id, msg, formatted_tb):
        key = hash(formatted_tb)
        row = self.exceptions.setdefault(key, {"count": 0, "msg": msg, "traceback": formatted_tb, "nodes": set()})
        row["count"] += 1
        row["nodes"].add(node_id)
        self.exceptions[key] = row

    @property
    def target_user_count(self) -> int:
        return sum(self.target_user_classes_count.values())

    def register_message(self, msg_type, listener):
        """
        Register a listener for a custom message from another node

        :param msg_type: The type of the message to listen for
        :param listener: The function to execute when the message is received
        """
        self.custom_messages[msg_type] = listener


class LocalRunner(Runner):
    """
    Runner for running single process load test
    """

    def __init__(self, environment):
        """
        :param environment: Environment instance
        """
        super().__init__(environment)

        # register listener that logs the exception for the local runner
        def on_user_error(user_instance, exception, tb):
            formatted_tb = "".join(traceback.format_tb(tb))
            self.log_exception("local", str(exception), formatted_tb)

        self.environment.events.user_error.add_listener(on_user_error)

    def start(self, user_count: int, spawn_rate: float, wait: bool = False):
        if spawn_rate > 100:
            logger.warning(
                "Your selected spawn rate is very high (>100), and this is known to sometimes cause issues. Do you really need to ramp up that fast?"
            )

        if self.spawning_greenlet:
            # kill existing spawning_greenlet before we start a new one
            self.spawning_greenlet.kill(block=True)
        self.spawning_greenlet = self.greenlet.spawn(
            lambda: super(LocalRunner, self).start(user_count, spawn_rate, wait=wait)
        )
        self.spawning_greenlet.link_exception(greenlet_exception_handler)

    def stop(self):
        if self.state == STATE_STOPPED:
            return
        super().stop()

    def send_message(self, msg_type, data=None):
        """
        Emulates internodal messaging by calling registered listeners

        :param msg_type: The type of the message to emulate sending
        :param data: Optional data to include
        """
        logger.debug(f"Running locally: sending {msg_type} message to self")
        if msg_type in self.custom_messages:
            listener = self.custom_messages[msg_type]
            msg = Message(msg_type, data, "local")
            listener(environment=self.environment, msg=msg)
        else:
            logger.warning(f"Unknown message type received: {msg_type}")
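LocalRunner is the runner exercised by the test snippet at the top of this page. Used directly, the flow looks roughly like this (the user class and counts are illustrative):

import gevent
from locust import User, task, constant
from locust.env import Environment

class MyUser(User):
    wait_time = constant(1)

    @task
    def my_task(self):
        pass

env = Environment(user_classes=[MyUser])
runner = env.create_local_runner()          # returns a LocalRunner
runner.start(user_count=10, spawn_rate=5)   # spawning happens in a separate greenlet
runner.spawning_greenlet.join()             # block until all users are spawned
print(runner.user_classes_count)            # e.g. {"MyUser": 10}
gevent.sleep(5)
runner.quit()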
class DistributedRunner(Runner):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._local_worker_node = None
        setup_distributed_stats_event_listeners(self.environment.events, self.stats)


class WorkerNode:
    def __init__(self, id: str, state=STATE_INIT, heartbeat_liveness=HEARTBEAT_LIVENESS):
        self.id: str = id
        self.state = state
        self.heartbeat = heartbeat_liveness
        self.cpu_usage = 0
        self.cpu_warning_emitted = False
        # The reported users running on the worker
        self.user_classes_count: Dict[str, int] = {}

    @property
    def user_count(self) -> int:
        return sum(self.user_classes_count.values())


class WorkerNodes(MutableMapping):
    def __init__(self):
        self._worker_nodes = {}

    def get_by_state(self, state) -> List[WorkerNode]:
        return [c for c in self.values() if c.state == state]

    @property
    def all(self) -> ValuesView[WorkerNode]:
        return self.values()

    @property
    def ready(self) -> List[WorkerNode]:
        return self.get_by_state(STATE_INIT)

    @property
    def spawning(self) -> List[WorkerNode]:
        return self.get_by_state(STATE_SPAWNING)

    @property
    def running(self) -> List[WorkerNode]:
        return self.get_by_state(STATE_RUNNING)

    @property
    def missing(self) -> List[WorkerNode]:
        return self.get_by_state(STATE_MISSING)

    def __setitem__(self, k: str, v: WorkerNode) -> None:
        self._worker_nodes[k] = v

    def __delitem__(self, k: str) -> None:
        del self._worker_nodes[k]

    def __getitem__(self, k: str) -> WorkerNode:
        return self._worker_nodes[k]

    def __len__(self) -> int:
        return len(self._worker_nodes)

    def __iter__(self) -> Iterator[WorkerNode]:
        return iter(self._worker_nodes)
class MasterRunner(DistributedRunner):
    """
    Runner used to run distributed load tests across multiple processes and/or machines.

    MasterRunner doesn't spawn any user greenlets itself. Instead it expects
    :class:`WorkerRunners <WorkerRunner>` to connect to it, which it will then direct
    to start and stop user greenlets. Stats sent back from the
    :class:`WorkerRunners <WorkerRunner>` will be aggregated.
    """

    def __init__(self, environment, master_bind_host, master_bind_port):
        """
        :param environment: Environment instance
        :param master_bind_host: Host/interface to use for incoming worker connections
        :param master_bind_port: Port to use for incoming worker connections
        """
        super().__init__(environment)
        self.worker_cpu_warning_emitted = False
        self.master_bind_host = master_bind_host
        self.master_bind_port = master_bind_port
        self.spawn_rate: float = 0

        self.clients = WorkerNodes()
        try:
            self.server = rpc.Server(master_bind_host, master_bind_port)
        except RPCError as e:
            if e.args[0] == "Socket bind failure: Address already in use":
                port_string = (
                    master_bind_host + ":" + str(master_bind_port) if master_bind_host != "*" else str(master_bind_port)
                )
                logger.error(
                    f"The Locust master port ({port_string}) was busy. Close any applications using that port - perhaps an old instance of Locust master is still running? ({e.args[0]})"
                )
                sys.exit(1)
            else:
                raise

        self._users_dispatcher: Union[UsersDispatcher, None] = None

        self.greenlet.spawn(self.heartbeat_worker).link_exception(greenlet_exception_handler)
        self.greenlet.spawn(self.client_listener).link_exception(greenlet_exception_handler)

        # listener that gathers info on how many users the worker has spawned
        def on_worker_report(client_id, data):
            if client_id not in self.clients:
                logger.info("Discarded report from unrecognized worker %s", client_id)
                return
            self.clients[client_id].user_classes_count = data["user_classes_count"]

        self.environment.events.worker_report.add_listener(on_worker_report)

        # register listener that sends quit message to worker nodes
        def on_quitting(environment, **kw):
            self.quit()

        self.environment.events.quitting.add_listener(on_quitting)

    @property
    def user_count(self) -> int:
        return sum(c.user_count for c in self.clients.values())

    def cpu_log_warning(self):
        warning_emitted = Runner.cpu_log_warning(self)
        if self.worker_cpu_warning_emitted:
            logger.warning("CPU usage threshold was exceeded on workers during the test!")
            warning_emitted = True
        return warning_emitted
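In practice a MasterRunner is usually created through the Environment helpers rather than instantiated directly; a rough sketch of the distributed setup (host, port and user counts are placeholders):

from locust.env import Environment

env = Environment(user_classes=[MyUser])  # MyUser as defined in the earlier sketch
master = env.create_master_runner(master_bind_host="*", master_bind_port=5557)

# On each worker process or machine, a WorkerRunner connects back to the master:
# worker_env = Environment(user_classes=[MyUser])
# worker = worker_env.create_worker_runner(master_host="127.0.0.1", master_port=5557)

master.start(user_count=100, spawn_rate=10)  # sends "spawn" messages to the workers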
    def start(self, user_count: int, spawn_rate: float, **kwargs) -> None:
        num_workers = len(self.clients.ready) + len(self.clients.running) + len(self.clients.spawning)
        if not num_workers:
            logger.warning(
                "You are running in distributed mode but have no worker servers connected. "
                "Please connect workers prior to swarming."
            )
            return

        for user_class in self.user_classes:
            if self.environment.host is not None:
                user_class.host = self.environment.host

        self.spawn_rate = spawn_rate

        if self._users_dispatcher is None:
            self._users_dispatcher = UsersDispatcher(
                worker_nodes=list(self.clients.values()), user_classes=self.user_classes
            )

        logger.info(
            "Sending spawn jobs of %d users at %.2f spawn rate to %d ready clients"
            % (user_count, spawn_rate, num_workers)
        )

        worker_spawn_rate = float(spawn_rate) / (num_workers or 1)
        if worker_spawn_rate > 100:
            logger.warning(
                "Your selected spawn rate is very high (>100/worker), and this is known to sometimes cause issues. Do you really need to ramp up that fast?"
            )

        if self.state != STATE_RUNNING and self.state != STATE_SPAWNING:
            self.stats.clear_all()
            self.exceptions = {}
            self.environment.events.test_start.fire(environment=self.environment)
            if self.environment.shape_class:
                self.environment.shape_class.reset_time()

        self.update_state(STATE_SPAWNING)

        self._users_dispatcher.new_dispatch(target_user_count=user_count, spawn_rate=spawn_rate)

        try:
            for dispatched_users in self._users_dispatcher:
                dispatch_greenlets = Group()
                for worker_node_id, worker_user_classes_count in dispatched_users.items():
                    data = {
                        "timestamp": time.time(),
                        "user_classes_count": worker_user_classes_count,
                        "host": self.environment.host,
                        "stop_timeout": self.environment.stop_timeout,
                        "parsed_options": vars(self.environment.parsed_options)
                        if self.environment.parsed_options
                        else {},
                    }
                    dispatch_greenlets.add(
                        gevent.spawn_later(
                            0,
                            self.server.send_to_client,
                            Message("spawn", data, worker_node_id),
                        )
                    )
                dispatched_user_count = sum(map(sum, map(methodcaller("values"), dispatched_users.values())))
                logger.debug(
                    "Sending spawn messages for %g total users to %i client(s)",
                    dispatched_user_count,
                    len(dispatch_greenlets),
                )
                dispatch_greenlets.join()

                logger.debug(
                    "Currently spawned users: %s" % _format_user_classes_count_for_log(self.reported_user_classes_count)
                )

            self.target_user_classes_count = _aggregate_dispatched_users(dispatched_users)

        except KeyboardInterrupt:
            # TODO: Find a cleaner way to handle that
            # We need to catch keyboard interrupt. Otherwise, if KeyboardInterrupt is received while in
            # a gevent.sleep inside the dispatch_users function, locust won't gracefully shutdown.
            self.quit()

        # Wait a little for workers to report their users to the master
        # so that we can give an accurate log message below and fire the `spawning_complete` event
        # when the user count is really at the desired value.
        timeout = gevent.Timeout(self._wait_for_workers_report_after_ramp_up())
        timeout.start()
        try:
            while self.user_count != self.target_user_count:
                gevent.sleep()
        except gevent.Timeout:
            pass
        finally:
            timeout.cancel()

        self.environment.events.spawning_complete.fire(user_count=sum(self.target_user_classes_count.values()))

        logger.info("All users spawned: %s" % _format_user_classes_count_for_log(self.reported_user_classes_count))
    @functools.lru_cache()
    def _wait_for_workers_report_after_ramp_up(self) -> float:
        """
        The amount of time to wait after a ramp-up in order for all the workers to report their state
        to the master. If not supplied by the user, it is 100ms by default. If the supplied value is a
        plain number, it is taken as-is. If the supplied value is a pattern like
        "some_number * WORKER_REPORT_INTERVAL", the result is that multiple of WORKER_REPORT_INTERVAL.
        The most sensible value would be something like "1.25 * WORKER_REPORT_INTERVAL". However, some
        users might find it too high, so it is left at a really small value of 100ms by default.
        """
        locust_wait_for_workers_report_after_ramp_up = os.getenv("LOCUST_WAIT_FOR_WORKERS_REPORT_AFTER_RAMP_UP")
        if locust_wait_for_workers_report_after_ramp_up is None:
            return 0.1

        match = re.search(
            r"^(?P<coeff>(\d+)|(\d+\.\d+))[ ]*\*[ ]*WORKER_REPORT_INTERVAL$",
            locust_wait_for_workers_report_after_ramp_up,
        )
        if match is None:
            assert float(locust_wait_for_workers_report_after_ramp_up) >= 0
            return float(locust_wait_for_workers_report_after_ramp_up)
        else:
            return float(match.group("coeff")) * WORKER_REPORT_INTERVAL

    def stop(self, send_stop_to_client: bool = True):
        if self.state not in [STATE_INIT, STATE_STOPPED, STATE_STOPPING]:
            logger.debug("Stopping...")
            self.update_state(STATE_STOPPING)

            if self.environment.shape_class is not None and self.shape_greenlet is not greenlet.getcurrent():
                self.shape_greenlet.kill(block=True)
                self.shape_greenlet = None
                self.shape_last_state = None

            self._users_dispatcher = None

            if send_stop_to_client:
                for client in self.clients.all:
                    logger.debug("Sending stop message to client %s" % client.id)
                    self.server.send_to_client(Message("stop", None, client.id))

                # Give an additional 60s for all workers to stop
                timeout = gevent.Timeout(self.environment.stop_timeout or 0 + 60)
                timeout.start()
                try:
                    while self.user_count != 0:
                        gevent.sleep(1)
                except gevent.Timeout:
                    logger.error("Timeout waiting for all workers to stop")
                finally:
                    timeout.cancel()

            self.environment.events.test_stop.fire(environment=self.environment)
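Per the regex above, LOCUST_WAIT_FOR_WORKERS_REPORT_AFTER_RAMP_UP accepts either a plain number of seconds or a multiple of WORKER_REPORT_INTERVAL; since the method is cached with lru_cache, the variable has to be set before the first ramp-up. For example (values illustrative):

import os

# Either a plain number of seconds...
os.environ["LOCUST_WAIT_FOR_WORKERS_REPORT_AFTER_RAMP_UP"] = "0.5"
# ...or a multiplier of the worker report interval:
os.environ["LOCUST_WAIT_FOR_WORKERS_REPORT_AFTER_RAMP_UP"] = "1.25 * WORKER_REPORT_INTERVAL"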
    def quit(self):
        self.stop(send_stop_to_client=False)
        logger.debug("Quitting...")
        for client in self.clients.all:
            logger.debug("Sending quit message to client %s" % (client.id))
            self.server.send_to_client(Message("quit", None, client.id))
        gevent.sleep(0.5)  # wait for final stats report from all workers
        self.greenlet.kill(block=True)

    def check_stopped(self):
        if (
            not self.state == STATE_INIT
            and not self.state == STATE_STOPPED
            and all(map(lambda x: x.state not in (STATE_RUNNING, STATE_SPAWNING, STATE_INIT), self.clients.all))
        ):
            self.update_state(STATE_STOPPED)

    def heartbeat_worker(self):
        while True:
            gevent.sleep(HEARTBEAT_INTERVAL)
            if self.connection_broken:
                self.reset_connection()
                continue

            for client in self.clients.all:
                if client.heartbeat < 0 and client.state != STATE_MISSING:
                    logger.info("Worker %s failed to send heartbeat, setting state to missing." % str(client.id))
                    client.state = STATE_MISSING
                    client.user_classes_count = {}
                    if self._users_dispatcher is not None:
                        self._users_dispatcher.remove_worker(client)
                        # TODO: If status is `STATE_RUNNING`, call self.start()
                    if self.worker_count <= 0:
                        logger.info("The last worker went missing, stopping test.")
                        self.stop()
                        self.check_stopped()
                else:
                    client.heartbeat -= 1

    def reset_connection(self):
        logger.info("Reset connection to worker")
        try:
            self.server.close()
            self.server = rpc.Server(self.master_bind_host, self.master_bind_port)
        except RPCError as e:
            logger.error("Temporary failure when resetting connection: %s, will retry later." % (e))

    def client_listener(self):
        while True:
            try:
                client_id, msg = self.server.recv_from_client()
            except RPCError as e:
                logger.error("RPCError found when receiving from client: %s" % (e))
                self.connection_broken = True
                gevent.sleep(FALLBACK_INTERVAL)
                continue
            self.connection_broken = False
            msg.node_id = client_id
            if msg.type == "client_ready":
                if not msg.data:
                    logger.error(f"An old (pre 2.0) worker tried to connect ({client_id}). That's not going to work.")
                    continue
                elif msg.data != __version__ and msg.data != -1:
                    logger.warning(
                        f"A worker ({client_id}) running a different version ({msg.data}) connected, master version is {__version__}"
                    )

                worker_node_id = msg.node_id
                self.clients[worker_node_id] = WorkerNode(worker_node_id, heartbeat_liveness=HEARTBEAT_LIVENESS)

                if self._users_dispatcher is not None:
                    self._users_dispatcher.add_worker(worker_node=self.clients[worker_node_id])
                    if not self._users_dispatcher.dispatch_in_progress and self.state == STATE_RUNNING:
                        # TODO: Test this situation
                        self.start(self.target_user_count, self.spawn_rate)

                logger.info(
                    "Client %r reported as ready. Currently %i clients ready to swarm."
                    % (worker_node_id, len(self.clients.ready + self.clients.running + self.clients.spawning))
                )

                # if self.state == STATE_RUNNING or self.state == STATE_SPAWNING:
                #     # TODO: Necessary now that UsersDispatcher handles that?
                #     # balance the load distribution when new client joins
                #     self.start(self.target_user_count, self.spawn_rate)

                # emit a warning if the worker's clock seems to be out of sync with our clock
                # if abs(time() - msg.data["time"]) > 5.0:
                #     warnings.warn("The worker node's clock seems to be out of sync. For the statistics to be correct the different locust servers need to have synchronized clocks.")
            elif msg.type == "client_stopped":
                client = self.clients[msg.node_id]
                del self.clients[msg.node_id]
                if self._users_dispatcher is not None:
                    self._users_dispatcher.remove_worker(client)
                    if not self._users_dispatcher.dispatch_in_progress and self.state == STATE_RUNNING:
                        # TODO: Test this situation
                        self.start(self.target_user_count, self.spawn_rate)
                logger.info("Removing %s client from running clients" % (msg.node_id))
            elif msg.type == "heartbeat":
                if msg.node_id in self.clients:
                    c = self.clients[msg.node_id]
                    c.heartbeat = HEARTBEAT_LIVENESS
                    client_state = msg.data["state"]
                    if c.state == STATE_MISSING:
                        logger.info(
                            "Worker %s self-healed with heartbeat, setting state to %s." % (str(c.id), client_state)
                        )
                        if self._users_dispatcher is not None:
                            self._users_dispatcher.add_worker(worker_node=c)
                            if not self._users_dispatcher.dispatch_in_progress and self.state == STATE_RUNNING:
                                # TODO: Test this situation
                                self.start(self.target_user_count, self.spawn_rate)
                    c.state = client_state
                    c.cpu_usage = msg.data["current_cpu_usage"]
                    if not c.cpu_warning_emitted and c.cpu_usage > 90:
                        self.worker_cpu_warning_emitted = True  # used to fail the test in the end
                        c.cpu_warning_emitted = True  # used to suppress logging for this node
                        logger.warning(
                            "Worker %s exceeded cpu threshold (will only log this once per worker)" % (msg.node_id)
                        )
            elif msg.type == "stats":
                self.environment.events.worker_report.fire(client_id=msg.node_id, data=msg.data)
            elif msg.type == "spawning":
                self.clients[msg.node_id].state = STATE_SPAWNING
            elif msg.type == "spawning_complete":
                self.clients[msg.node_id].state = STATE_RUNNING
                self.clients[msg.node_id].user_classes_count = msg.data["user_classes_count"]
            elif msg.type == "quit":
                if msg.node_id in self.clients:
                    client = self.clients[msg.node_id]
                    del self.clients[msg.node_id]
                    if self._users_dispatcher is not None:
                        self._users_dispatcher.remove_worker(client)
                        if not self._users_dispatcher.dispatch_in_progress and self.state == STATE_RUNNING:
                            # TODO: Test this situation
                            self.start(self.target_user_count, self.spawn_rate)
                    logger.info(
                        "Client %r quit. Currently %i clients connected." % (msg.node_id, len(self.clients.ready))
                    )
                    if self.worker_count - len(self.clients.missing) <= 0:
                        logger.info("The last worker quit, stopping test.")
                        self.stop()
                        if self.environment.parsed_options and self.environment.parsed_options.headless:
                            self.quit()
            elif msg.type == "exception":
                self.log_exception(msg.node_id, msg.data["msg"], msg.data["traceback"])
            elif msg.type in self.custom_messages:
                logger.debug(f"Received {msg.type} message from worker {msg.node_id}")
                self.custom_messages[msg.type](environment=self.environment, msg=msg)
            else:
                logger.warning(f"Unknown message type received from worker {msg.node_id}: {msg.type}")

            self.check_stopped()

    @property
    def worker_count(self):
        return len(self.clients.ready) + len(self.clients.spawning) + len(self.clients.running)

    @property
    def reported_user_classes_count(self) -> Dict[str, int]:
        reported_user_classes_count = defaultdict(lambda: 0)
        for client in self.clients.ready + self.clients.spawning + self.clients.running:
            for name, count in client.user_classes_count.items():
                reported_user_classes_count[name] += count
        return reported_user_classes_count

    def send_message(self, msg_type, data=None, client_id=None):
        """
        Sends a message to attached worker node(s)

        :param msg_type: The type of the message to send
        :param data: Optional data to send
        :param client_id: Optional id of the target worker node.
                          If None, will send to all attached workers
        """
        if client_id:
...
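register_message and send_message make up the custom-messaging API shown above: the master (or worker) forwards any message whose type is found in custom_messages to the registered listener, and LocalRunner simply loops the message back to itself. A sketch of a round trip, assuming runner is a LocalRunner created as in the earlier sketch ("acknowledge_users" and the payload are made-up names for illustration):

def on_acknowledge_users(environment, msg, **kwargs):
    print("Received custom message:", msg.data)

runner.register_message("acknowledge_users", on_acknowledge_users)
runner.send_message("acknowledge_users", {"count": 42})  # listener is called with environment and msg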
