Best Python code snippet using locust
dispatch.py
Source:dispatch.py  
...99                    self._rebalance = False100                    yield self._users_on_workers101        while self._current_user_count > self._target_user_count:102            with self._wait_between_dispatch_iteration_context():103                yield self._remove_users_from_workers()104                if self._rebalance:105                    self._rebalance = False106                    yield self._users_on_workers107        self._dispatch_in_progress = False108    def new_dispatch(self, target_user_count: int, spawn_rate: float) -> None:109        """110        Initialize a new dispatch cycle.111        :param target_user_count: The desired user count at the end of the dispatch cycle112        :param spawn_rate: The spawn rate113        """114        self._target_user_count = target_user_count115        self._spawn_rate = spawn_rate116        self._user_count_per_dispatch_iteration = max(1, math.floor(self._spawn_rate))117        self._wait_between_dispatch = self._user_count_per_dispatch_iteration / self._spawn_rate118        self._initial_users_on_workers = self._users_on_workers119        self._users_on_workers = self._fast_users_on_workers_copy(self._initial_users_on_workers)120        self._current_user_count = sum(map(sum, map(dict.values, self._users_on_workers.values())))121        self._dispatcher_generator = self._dispatcher()122        self._dispatch_iteration_durations.clear()123    def add_worker(self, worker_node: "WorkerNode") -> None:124        """125        This method is to be called when a new worker connects to the master. 
When126        a new worker is added, the users dispatcher will flag that a rebalance is required127        and ensure that the next dispatch iteration will be made to redistribute the users128        on the new pool of workers.129        :param worker_node: The worker node to add.130        """131        self._worker_nodes.append(worker_node)132        self._worker_nodes = sorted(self._worker_nodes, key=lambda w: w.id)133        self._prepare_rebalance()134    def remove_worker(self, worker_node: "WorkerNode") -> None:135        """136        This method is similar to the above `add_worker`. When a worker disconnects137        (because of e.g. network failure, worker failure, etc.), this method will ensure that the next138        dispatch iteration redistributes the users on the remaining workers.139        :param worker_node: The worker node to remove.140        """141        self._worker_nodes = [w for w in self._worker_nodes if w.id != worker_node.id]142        if len(self._worker_nodes) == 0:143            # TODO: Test this144            return145        self._prepare_rebalance()146    def _prepare_rebalance(self) -> None:147        """148        When a rebalance is required because of added and/or removed workers, we compute the desired state as if149        we started from 0 user. 
So, if we were currently running 500 users, then the `_distribute_users` will150        perform a fake ramp-up without any waiting and return the final distribution.151        """152        users_on_workers, user_gen, worker_gen, active_users = self._distribute_users(self._current_user_count)153        self._users_on_workers = users_on_workers154        self._active_users = active_users155        # It's important to reset the generators by using the ones from `_distribute_users`156        # so that the next iterations are smooth and continuous.157        self._user_generator = user_gen158        self._worker_node_generator = worker_gen159        self._rebalance = True160    @contextlib.contextmanager161    def _wait_between_dispatch_iteration_context(self) -> None:162        t0_rel = time.perf_counter()163        # We don't use `try: ... finally: ...` because we don't want to sleep164        # if there's an exception within the context.165        yield166        delta = time.perf_counter() - t0_rel167        self._dispatch_iteration_durations.append(delta)168        # print("Dispatch cycle took {:.3f}ms".format(delta * 1000))169        if self._current_user_count == self._target_user_count:170            # No sleep when this is the last dispatch iteration171            return172        sleep_duration = max(0.0, self._wait_between_dispatch - delta)173        gevent.sleep(sleep_duration)174    def _add_users_on_workers(self) -> Dict[str, Dict[str, int]]:175        """Add users on the workers until the target number of users is reached for the current dispatch iteration176        :return: The users that we want to run on the workers177        """178        current_user_count_target = min(179            self._current_user_count + self._user_count_per_dispatch_iteration, self._target_user_count180        )181        for user in self._user_generator:182            worker_node = next(self._worker_node_generator)183            self._users_on_workers[worker_node.id][user] 
+= 1184            self._current_user_count += 1185            self._active_users.append((worker_node, user))186            if self._current_user_count >= current_user_count_target:187                return self._users_on_workers188    def _remove_users_from_workers(self) -> Dict[str, Dict[str, int]]:189        """Remove users from the workers until the target number of users is reached for the current dispatch iteration190        :return: The users that we want to run on the workers191        """192        current_user_count_target = max(193            self._current_user_count - self._user_count_per_dispatch_iteration, self._target_user_count194        )195        while True:196            try:197                worker_node, user = self._active_users.pop()198            except IndexError:199                return self._users_on_workers200            self._users_on_workers[worker_node.id][user] -= 1201            self._current_user_count -= 1202            if self._current_user_count == 0 or self._current_user_count <= current_user_count_target:...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
