Best Python code snippet using locust
dispatch.py
Source:dispatch.py  
...43        :param worker_nodes: List of worker nodes44        :param user_classes: The user classes45        """46        self._worker_nodes = worker_nodes47        self._sort_workers()48        self._user_classes = sorted(user_classes, key=attrgetter("__name__"))49        assert len(user_classes) > 050        assert len(set(self._user_classes)) == len(self._user_classes)51        self._target_user_count: int = None52        self._spawn_rate: float = None53        self._user_count_per_dispatch_iteration: int = None54        self._wait_between_dispatch: float = None55        self._initial_users_on_workers = {56            worker_node.id: {user_class.__name__: 0 for user_class in self._user_classes}57            for worker_node in worker_nodes58        }59        self._users_on_workers = self._fast_users_on_workers_copy(self._initial_users_on_workers)60        self._current_user_count = self.get_current_user_count()61        self._dispatcher_generator: Generator[Dict[str, Dict[str, int]], None, None] = None62        self._user_generator = self._user_gen()63        self._worker_node_generator = itertools.cycle(self._worker_nodes)64        # To keep track of how long it takes for each dispatch iteration to compute65        self._dispatch_iteration_durations: List[float] = []66        self._active_users: List[Tuple[WorkerNode, str]] = []67        # TODO: Test that attribute is set when dispatching and unset when done dispatching68        self._dispatch_in_progress = False69        self._rebalance = False70        self._try_dispatch_fixed = True71        self._no_user_to_spawn = False72    def get_current_user_count(self) -> int:73        # need to ignore type due to https://github.com/python/mypy/issues/150774        return sum(map(sum, map(dict.values, self._users_on_workers.values())))  # type: ignore75    @property76    def dispatch_in_progress(self):77        return self._dispatch_in_progress78    @property79    def dispatch_iteration_durations(self) -> List[float]:80        return self._dispatch_iteration_durations81    def __next__(self) -> Dict[str, Dict[str, int]]:82        users_on_workers = next(self._dispatcher_generator)83        # TODO: Is this necessary to copy the users_on_workers if we know84        #       it won't be mutated by external code?85        return self._fast_users_on_workers_copy(users_on_workers)86    def _sort_workers(self):87        # Sorting workers ensures repeatable behaviour88        worker_nodes_by_id = sorted(self._worker_nodes, key=lambda w: w.id)89        # Give every worker an index indicating how many workers came before it on that host90        workers_per_host = defaultdict(lambda: 0)91        for worker_node in worker_nodes_by_id:92            host = worker_node.id.split("_")[0]93            worker_node._index_within_host = workers_per_host[host]94            workers_per_host[host] = workers_per_host[host] + 195        # Sort again, first by index within host, to ensure Users get started evenly across hosts96        self._worker_nodes = sorted(self._worker_nodes, key=lambda worker: (worker._index_within_host, worker.id))97    def _dispatcher(self) -> Generator[Dict[str, Dict[str, int]], None, None]:98        self._dispatch_in_progress = True99        if self._rebalance:100            self._rebalance = False101            yield self._users_on_workers102            if self._current_user_count == self._target_user_count:103                return104        if self._current_user_count == self._target_user_count:105            yield self._initial_users_on_workers106            self._dispatch_in_progress = False107            return108        while self._current_user_count < self._target_user_count:109            with self._wait_between_dispatch_iteration_context():110                yield self._add_users_on_workers()111                if self._rebalance:112                    self._rebalance = False113                    yield self._users_on_workers114                if self._no_user_to_spawn:115                    self._no_user_to_spawn = False116                    break117        while self._current_user_count > self._target_user_count:118            with self._wait_between_dispatch_iteration_context():119                yield self._remove_users_from_workers()120                if self._rebalance:121                    self._rebalance = False122                    yield self._users_on_workers123        self._dispatch_in_progress = False124    def new_dispatch(self, target_user_count: int, spawn_rate: float) -> None:125        """126        Initialize a new dispatch cycle.127        :param target_user_count: The desired user count at the end of the dispatch cycle128        :param spawn_rate: The spawn rate129        """130        self._target_user_count = target_user_count131        self._spawn_rate = spawn_rate132        self._user_count_per_dispatch_iteration = max(1, math.floor(self._spawn_rate))133        self._wait_between_dispatch = self._user_count_per_dispatch_iteration / self._spawn_rate134        self._initial_users_on_workers = self._users_on_workers135        self._users_on_workers = self._fast_users_on_workers_copy(self._initial_users_on_workers)136        self._current_user_count = self.get_current_user_count()137        self._dispatcher_generator = self._dispatcher()138        self._dispatch_iteration_durations.clear()139    def add_worker(self, worker_node: "WorkerNode") -> None:140        """141        This method is to be called when a new worker connects to the master. When142        a new worker is added, the users dispatcher will flag that a rebalance is required143        and ensure that the next dispatch iteration will be made to redistribute the users144        on the new pool of workers.145        :param worker_node: The worker node to add.146        """147        self._worker_nodes.append(worker_node)148        self._sort_workers()149        self._prepare_rebalance()150    def remove_worker(self, worker_node: "WorkerNode") -> None:151        """152        This method is similar to the above `add_worker`. When a worker disconnects153        (because of e.g. network failure, worker failure, etc.), this method will ensure that the next154        dispatch iteration redistributes the users on the remaining workers.155        :param worker_node: The worker node to remove.156        """157        self._worker_nodes = [w for w in self._worker_nodes if w.id != worker_node.id]158        if len(self._worker_nodes) == 0:159            # TODO: Test this160            return161        self._prepare_rebalance()162    def _prepare_rebalance(self) -> None:...scheduler.py
Source:scheduler.py  
...131            worker.devices.add(device)132            worker.load_index += device.load_index133        return workers134    @staticmethod135    def _sort_workers(worker: Worker) -> Tuple[int, int]:136        """Helper sorting method when we have no load indexes"""137        try:138            return len(worker), min(device.id_ for device in worker.devices)139        except ValueError:140            return len(worker), 0141    @staticmethod142    def get_devices_per_worker(worker_count: int, devices_count: int) -> List[int]:143        """Calculates how many devices can be assigned per worker144        Return example: 3 workers, 8 devices: [3, 3, 2]145        """146        devices_per_worker = []147        while worker_count:148            try:149                per_worker = math.ceil(devices_count/worker_count)150            except ZeroDivisionError:151                per_worker = 0152            devices_per_worker.append(per_worker)153            devices_count -= per_worker154            worker_count -= 1155        return devices_per_worker156    @classmethod157    def balance_with_count_per_worker(158            cls, workers: Set[Worker], devices: Set[Device]) -> Set[Worker]:159        """Worker rebalance method which assigns 'equal' number of devices per worker160        with minimal changes to worker's assigned devices161        """162        devices_per_worker = cls.get_devices_per_worker(len(workers), len(devices))163        devices = set(devices)164        ordered_workers = sorted(workers, key=lambda item: cls._sort_workers(item), reverse=True)165        for worker in ordered_workers:166            worker_device_count = max(devices_per_worker)167            worker_new_devices = set()  # type: Set[Device]168            worker.load_index = 0169            for device in sorted(devices):170                if worker_device_count == len(worker_new_devices):171                    break172                elif device in worker:173                    worker_new_devices.add(device)174            devices -= worker_new_devices175            try:176                devices_per_worker.remove(len(worker_new_devices))177            except ValueError:178                pass179            worker.devices = worker_new_devices180        for leftover_device in sorted(devices):181            worker = sorted(workers, key=lambda item: cls._sort_workers(item))[0]182            worker.add_device(leftover_device)183        return workers184    @classmethod185    def balance_devices_per_worker(186            cls, workers: Set[Worker], devices: Set[Device], cache: Cache,187            worker_load_deviation: float = 0.0) -> Set[Worker]:188        """Main balancing function that decides how we balance devices189        which depends on the state of cache190        """191        if not workers or not devices:192            return workers193        devices, system_msg_count, interval = cls.fetch_cache_data(devices, cache)194        workers = set(workers)195        if system_msg_count:...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
