Best Python code snippet using Locust
runners.py
Source: runners.py
...
        # I (cyberwiz) commented out this logging, because it is too noisy even for debug level
        # Uncomment it if you are specifically debugging state transitions
        # logger.debug("Updating state to '%s', old state was '%s'" % (new_state, self.state))
        self.state = new_state

    def cpu_log_warning(self):
        """Called at the end of the test to repeat the warning & return the status"""
        if self.cpu_warning_emitted:
            logger.warning(
                "CPU usage was too high at some point during the test! See https://docs.locust.io/en/stable/running-locust-distributed.html for how to distribute the load over multiple CPU cores or machines"
            )
            return True
        return False

    def weight_users(self, amount) -> List[Type[User]]:
        """
        Distributes the amount of users for each WebLocust-class according to its weight
        returns a list "bucket" with the weighted users
        """
        bucket = []
        weight_sum = sum([user.weight for user in self.user_classes])
        residuals = {}
        for user in self.user_classes:
            if self.environment.host is not None:
                user.host = self.environment.host
            # create users depending on weight
            percent = user.weight / float(weight_sum)
            num_users = int(round(amount * percent))
            bucket.extend([user for x in range(num_users)])
            # used to keep track of the amount of rounding that was done, in case we need
            # to add/remove some instances from bucket
            residuals[user] = amount * percent - round(amount * percent)
        if len(bucket) < amount:
            # We got too few User classes in the bucket, so we need to create a few extra users,
            # and we do this by iterating over each of the User classes - starting with the one
            # where the residual from the rounding was the largest - and creating one of each until
            # we get the correct amount
            for user in [l for l, r in sorted(residuals.items(), key=lambda x: x[1], reverse=True)][
                : amount - len(bucket)
            ]:
                bucket.append(user)
        elif len(bucket) > amount:
            # We've got too many users due to rounding errors so we need to remove some
            for user in [l for l, r in sorted(residuals.items(), key=lambda x: x[1])][: len(bucket) - amount]:
                bucket.remove(user)
        return bucket

    def spawn_users(self, spawn_count, spawn_rate, wait=False):
        bucket = self.weight_users(spawn_count)
        spawn_count = len(bucket)
        if self.state == STATE_INIT or self.state == STATE_STOPPED:
            self.update_state(STATE_SPAWNING)
        existing_count = len(self.user_greenlets)
        logger.info(
            "Spawning %i users at the rate %g users/s (%i users already running)..."
            % (spawn_count, spawn_rate, existing_count)
        )
        occurrence_count = dict([(l.__name__, 0) for l in self.user_classes])

        def spawn():
            sleep_time = 1.0 / spawn_rate
            while True:
                if not bucket:
                    logger.info(
                        "All users spawned: %s (%i total running)"
                        % (
                            ", ".join(["%s: %d" % (name, count) for name, count in occurrence_count.items()]),
                            len(self.user_greenlets),
                        )
                    )
                    self.environment.events.spawning_complete.fire(user_count=len(self.user_greenlets))
                    return
                user_class = bucket.pop(random.randint(0, len(bucket) - 1))
                occurrence_count[user_class.__name__] += 1
                new_user = user_class(self.environment)
                new_user.start(self.user_greenlets)
                if len(self.user_greenlets) % 10 == 0:
                    logger.debug("%i users spawned" % len(self.user_greenlets))
                if bucket:
                    gevent.sleep(sleep_time)

        spawn()
        if wait:
            self.user_greenlets.join()
            logger.info("All users stopped\n")

    def stop_users(self, user_count, stop_rate=None):
        """
        Stop `user_count` weighted users at a rate of `stop_rate`
        """
        if user_count == 0 or stop_rate == 0:
            return
        bucket = self.weight_users(user_count)
        user_count = len(bucket)
        to_stop = []
        for user_greenlet in self.user_greenlets:
            try:
                user = user_greenlet.args[0]
            except IndexError:
                logger.error(
                    "While stopping users, we encountered a user that didnt have proper args %s", user_greenlet
                )
                continue
            for user_class in bucket:
                if isinstance(user, user_class):
                    to_stop.append(user)
                    bucket.remove(user_class)
                    break
        if not to_stop:
            return
        if stop_rate is None or stop_rate >= user_count:
            sleep_time = 0
            logger.info("Stopping %i users" % (user_count))
        else:
            sleep_time = 1.0 / stop_rate
            logger.info("Stopping %i users at rate of %g users/s" % (user_count, stop_rate))
        async_calls_to_stop = Group()
        stop_group = Group()
        while True:
            user_to_stop: User = to_stop.pop(random.randint(0, len(to_stop) - 1))
            logger.debug("Stopping %s" % user_to_stop._greenlet.name)
            if user_to_stop._greenlet is greenlet.getcurrent():
                # User called runner.quit(), so don't block waiting for killing to finish
                user_to_stop._group.killone(user_to_stop._greenlet, block=False)
            elif self.environment.stop_timeout:
                async_calls_to_stop.add(gevent.spawn_later(0, user_to_stop.stop, force=False))
                stop_group.add(user_to_stop._greenlet)
            else:
                async_calls_to_stop.add(gevent.spawn_later(0, user_to_stop.stop, force=True))
            if to_stop:
                gevent.sleep(sleep_time)
            else:
                break
        async_calls_to_stop.join()
        if not stop_group.join(timeout=self.environment.stop_timeout):
            logger.info(
                "Not all users finished their tasks & terminated in %s seconds. Stopping them..."
                % self.environment.stop_timeout
            )
            stop_group.kill(block=True)
        logger.info("%i Users have been stopped, %g still running" % (user_count, len(self.user_greenlets)))

    def monitor_cpu(self):
        process = psutil.Process()
        while True:
            self.current_cpu_usage = process.cpu_percent()
            if self.current_cpu_usage > 90 and not self.cpu_warning_emitted:
                logging.warning(
                    "CPU usage above 90%! This may constrain your throughput and may even give inconsistent response time measurements! See https://docs.locust.io/en/stable/running-locust-distributed.html for how to distribute the load over multiple CPU cores or machines"
                )
                self.cpu_warning_emitted = True
            gevent.sleep(CPU_MONITOR_INTERVAL)

    def start(self, user_count, spawn_rate, wait=False):
        """
        Start running a load test

        :param user_count: Total number of users to start
        :param spawn_rate: Number of users to spawn per second
        :param wait: If True calls to this method will block until all users are spawned.
                     If False (the default), a greenlet that spawns the users will be
                     started and the call to this method will return immediately.
        """
        if self.state != STATE_RUNNING and self.state != STATE_SPAWNING:
            self.stats.clear_all()
            self.exceptions = {}
            self.cpu_warning_emitted = False
            self.worker_cpu_warning_emitted = False
            self.target_user_count = user_count
        if self.state != STATE_INIT and self.state != STATE_STOPPED:
            logger.debug(
                "Updating running test with %d users, %.2f spawn rate and wait=%r" % (user_count, spawn_rate, wait)
            )
            self.update_state(STATE_SPAWNING)
            if self.user_count > user_count:
                # Stop some users
                stop_count = self.user_count - user_count
                self.stop_users(stop_count, spawn_rate)
            elif self.user_count < user_count:
                # Spawn some users
                spawn_count = user_count - self.user_count
                self.spawn_users(spawn_count=spawn_count, spawn_rate=spawn_rate)
            else:
                self.environment.events.spawning_complete.fire(user_count=self.user_count)
        else:
            self.spawn_rate = spawn_rate
            self.spawn_users(user_count, spawn_rate=spawn_rate, wait=wait)

    def start_shape(self):
        if self.shape_greenlet:
            logger.info("There is an ongoing shape test running. Editing is disabled")
            return
        logger.info("Shape test starting. User count and spawn rate are ignored for this type of load test")
        self.update_state(STATE_INIT)
        self.shape_greenlet = self.greenlet.spawn(self.shape_worker)
        self.shape_greenlet.link_exception(greenlet_exception_handler)
        self.environment.shape_class.reset_time()

    def shape_worker(self):
        logger.info("Shape worker starting")
        while self.state == STATE_INIT or self.state == STATE_SPAWNING or self.state == STATE_RUNNING:
            new_state = self.environment.shape_class.tick()
            if new_state is None:
                logger.info("Shape test stopping")
                if self.environment.parsed_options and self.environment.parsed_options.headless:
                    self.quit()
                else:
                    self.stop()
            elif self.shape_last_state == new_state:
                gevent.sleep(1)
            else:
                user_count, spawn_rate = new_state
                logger.info("Shape test updating to %d users at %.2f spawn rate" % (user_count, spawn_rate))
                self.start(user_count=user_count, spawn_rate=spawn_rate)
                self.shape_last_state = new_state

    def stop(self):
        """
        Stop a running load test by stopping all running users
        """
        logger.debug("Stopping all users")
        self.update_state(STATE_CLEANUP)
        # if we are currently spawning users we need to kill the spawning greenlet first
        if self.spawning_greenlet and not self.spawning_greenlet.ready():
            self.spawning_greenlet.kill(block=True)
        self.stop_users(self.user_count)
        self.update_state(STATE_STOPPED)
        self.cpu_log_warning()

    def quit(self):
        """
        Stop any running load test and kill all greenlets for the runner
        """
        self.stop()
        self.greenlet.kill(block=True)

    def log_exception(self, node_id, msg, formatted_tb):
        key = hash(formatted_tb)
        row = self.exceptions.setdefault(key, {"count": 0, "msg": msg, "traceback": formatted_tb, "nodes": set()})
        row["count"] += 1
        row["nodes"].add(node_id)
        self.exceptions[key] = row


class LocalRunner(Runner):
    """
    Runner for running single process load test
    """

    def __init__(self, environment):
        """
        :param environment: Environment instance
        """
        super().__init__(environment)

        # register listener that logs the exception for the local runner
        def on_user_error(user_instance, exception, tb):
            formatted_tb = "".join(traceback.format_tb(tb))
            self.log_exception("local", str(exception), formatted_tb)

        self.environment.events.user_error.add_listener(on_user_error)

    def start(self, user_count, spawn_rate, wait=False):
        self.target_user_count = user_count
        if spawn_rate > 100:
            logger.warning(
                "Your selected spawn rate is very high (>100), and this is known to sometimes cause issues. Do you really need to ramp up that fast?"
            )
        if self.state != STATE_RUNNING and self.state != STATE_SPAWNING:
            # if we're not already running we'll fire the test_start event
            self.environment.events.test_start.fire(environment=self.environment)
        if self.spawning_greenlet:
            # kill existing spawning_greenlet before we start a new one
            self.spawning_greenlet.kill(block=True)
        self.spawning_greenlet = self.greenlet.spawn(
            lambda: super(LocalRunner, self).start(user_count, spawn_rate, wait=wait)
        )
        self.spawning_greenlet.link_exception(greenlet_exception_handler)

    def stop(self):
        if self.state == STATE_STOPPED:
            return
        super().stop()
        self.environment.events.test_stop.fire(environment=self.environment)


class DistributedRunner(Runner):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        setup_distributed_stats_event_listeners(self.environment.events, self.stats)


class WorkerNode:
    def __init__(self, id, state=STATE_INIT, heartbeat_liveness=HEARTBEAT_LIVENESS):
        self.id = id
        self.state = state
        self.user_count = 0
        self.heartbeat = heartbeat_liveness
        self.cpu_usage = 0
        self.cpu_warning_emitted = False


class MasterRunner(DistributedRunner):
    """
    Runner used to run distributed load tests across multiple processes and/or machines.

    MasterRunner doesn't spawn any user greenlets itself. Instead it expects
    :class:`WorkerRunners <WorkerRunner>` to connect to it, which it will then direct
    to start and stop user greenlets. Stats sent back from the
    :class:`WorkerRunners <WorkerRunner>` will be aggregated.
    """

    def __init__(self, environment, master_bind_host, master_bind_port):
        """
        :param environment: Environment instance
        :param master_bind_host: Host/interface to use for incoming worker connections
        :param master_bind_port: Port to use for incoming worker connections
        """
        super().__init__(environment)
        self.worker_cpu_warning_emitted = False
        self.master_bind_host = master_bind_host
        self.master_bind_port = master_bind_port

        class WorkerNodesDict(dict):
            def get_by_state(self, state):
                return [c for c in self.values() if c.state == state]

            @property
            def all(self):
                return self.values()

            @property
            def ready(self):
                return self.get_by_state(STATE_INIT)

            @property
            def spawning(self):
                return self.get_by_state(STATE_SPAWNING)

            @property
            def running(self):
                return self.get_by_state(STATE_RUNNING)

            @property
            def missing(self):
                return self.get_by_state(STATE_MISSING)

        self.clients = WorkerNodesDict()
        try:
            self.server = rpc.Server(master_bind_host, master_bind_port)
        except RPCError as e:
            if e.args[0] == "Socket bind failure: Address already in use":
                port_string = (
                    master_bind_host + ":" + str(master_bind_port) if master_bind_host != "*" else str(master_bind_port)
                )
                logger.error(
                    f"The Locust master port ({port_string}) was busy. Close any applications using that port - perhaps an old instance of Locust master is still running? ({e.args[0]})"
                )
                sys.exit(1)
            else:
                raise
        self.greenlet.spawn(self.heartbeat_worker).link_exception(greenlet_exception_handler)
        self.greenlet.spawn(self.client_listener).link_exception(greenlet_exception_handler)

        # listener that gathers info on how many users the worker has spawned
        def on_worker_report(client_id, data):
            if client_id not in self.clients:
                logger.info("Discarded report from unrecognized worker %s", client_id)
                return
            self.clients[client_id].user_count = data["user_count"]

        self.environment.events.worker_report.add_listener(on_worker_report)

        # register listener that sends quit message to worker nodes
        def on_quitting(environment, **kw):
            self.quit()

        self.environment.events.quitting.add_listener(on_quitting)

    @property
    def user_count(self):
        return sum([c.user_count for c in self.clients.values()])

    def cpu_log_warning(self):
        warning_emitted = Runner.cpu_log_warning(self)
        if self.worker_cpu_warning_emitted:
            logger.warning("CPU usage threshold was exceeded on workers during the test!")
            warning_emitted = True
        return warning_emitted

    def start(self, user_count, spawn_rate):
        self.target_user_count = user_count
        num_workers = len(self.clients.ready) + len(self.clients.running) + len(self.clients.spawning)
        if not num_workers:
            logger.warning(
                "You are running in distributed mode but have no worker servers connected. "
                "Please connect workers prior to swarming."
            )
            return
        self.spawn_rate = spawn_rate
...
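For context, these runner classes are normally driven either by the locust command line or programmatically when Locust is used as a library. The sketch below shows one plausible way to exercise LocalRunner (and, through it, Runner.spawn_users and Runner.stop_users) from a script. It assumes a Locust version where Environment.create_local_runner() exists; the ApiUser class, its weight and the 30-second run time are hypothetical example values, not part of runners.py.

# Minimal sketch of driving a LocalRunner programmatically ("library" mode).
# ApiUser and the timings below are made-up example values.
import gevent
from locust import User, task, between
from locust.env import Environment

class ApiUser(User):
    wait_time = between(1, 2)
    weight = 3  # consumed by Runner.weight_users() when building the spawn bucket

    @task
    def noop(self):
        pass

env = Environment(user_classes=[ApiUser])
runner = env.create_local_runner()           # a LocalRunner as defined above
runner.start(user_count=10, spawn_rate=2)    # delegates to Runner.spawn_users()
gevent.spawn_later(30, runner.quit)          # stop the whole test after ~30 seconds
runner.greenlet.join()                       # block until the runner's greenlets exit

Similarly, start_shape() and shape_worker() expect the environment to carry a load shape object whose tick() returns either a (user_count, spawn_rate) tuple or None to end the test. A minimal sketch, assuming a Locust version that exposes LoadTestShape and that the shape instance is attached to env.shape_class before runner.start_shape() is called:

# Hypothetical step shape: 20 users for the first 60 seconds, then stop.
from locust import LoadTestShape

class StepShape(LoadTestShape):
    def tick(self):
        if self.get_run_time() < 60:
            return (20, 2)   # (user_count, spawn_rate) picked up by shape_worker()
        return None          # None makes shape_worker() stop (or quit) the test

# env.shape_class = StepShape(); runner.start_shape()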
