How to use _sort_workers method in locust

Best Python code snippet using locust

dispatch.py

Source:dispatch.py Github

copy

Full Screen

...43 :param worker_nodes: List of worker nodes44 :param user_classes: The user classes45 """46 self._worker_nodes = worker_nodes47 self._sort_workers()48 self._user_classes = sorted(user_classes, key=attrgetter("__name__"))49 assert len(user_classes) > 050 assert len(set(self._user_classes)) == len(self._user_classes)51 self._target_user_count: int = None52 self._spawn_rate: float = None53 self._user_count_per_dispatch_iteration: int = None54 self._wait_between_dispatch: float = None55 self._initial_users_on_workers = {56 worker_node.id: {user_class.__name__: 0 for user_class in self._user_classes}57 for worker_node in worker_nodes58 }59 self._users_on_workers = self._fast_users_on_workers_copy(self._initial_users_on_workers)60 self._current_user_count = self.get_current_user_count()61 self._dispatcher_generator: Generator[Dict[str, Dict[str, int]], None, None] = None62 self._user_generator = self._user_gen()63 self._worker_node_generator = itertools.cycle(self._worker_nodes)64 # To keep track of how long it takes for each dispatch iteration to compute65 self._dispatch_iteration_durations: List[float] = []66 self._active_users: List[Tuple[WorkerNode, str]] = []67 # TODO: Test that attribute is set when dispatching and unset when done dispatching68 self._dispatch_in_progress = False69 self._rebalance = False70 self._try_dispatch_fixed = True71 self._no_user_to_spawn = False72 def get_current_user_count(self) -> int:73 # need to ignore type due to https://github.com/python/mypy/issues/150774 return sum(map(sum, map(dict.values, self._users_on_workers.values()))) # type: ignore75 @property76 def dispatch_in_progress(self):77 return self._dispatch_in_progress78 @property79 def dispatch_iteration_durations(self) -> List[float]:80 return self._dispatch_iteration_durations81 def __next__(self) -> Dict[str, Dict[str, int]]:82 users_on_workers = next(self._dispatcher_generator)83 # TODO: Is this necessary to copy the users_on_workers if we know84 # it won't be mutated by external code?85 return self._fast_users_on_workers_copy(users_on_workers)86 def _sort_workers(self):87 # Sorting workers ensures repeatable behaviour88 worker_nodes_by_id = sorted(self._worker_nodes, key=lambda w: w.id)89 # Give every worker an index indicating how many workers came before it on that host90 workers_per_host = defaultdict(lambda: 0)91 for worker_node in worker_nodes_by_id:92 host = worker_node.id.split("_")[0]93 worker_node._index_within_host = workers_per_host[host]94 workers_per_host[host] = workers_per_host[host] + 195 # Sort again, first by index within host, to ensure Users get started evenly across hosts96 self._worker_nodes = sorted(self._worker_nodes, key=lambda worker: (worker._index_within_host, worker.id))97 def _dispatcher(self) -> Generator[Dict[str, Dict[str, int]], None, None]:98 self._dispatch_in_progress = True99 if self._rebalance:100 self._rebalance = False101 yield self._users_on_workers102 if self._current_user_count == self._target_user_count:103 return104 if self._current_user_count == self._target_user_count:105 yield self._initial_users_on_workers106 self._dispatch_in_progress = False107 return108 while self._current_user_count < self._target_user_count:109 with self._wait_between_dispatch_iteration_context():110 yield self._add_users_on_workers()111 if self._rebalance:112 self._rebalance = False113 yield self._users_on_workers114 if self._no_user_to_spawn:115 self._no_user_to_spawn = False116 break117 while self._current_user_count > self._target_user_count:118 with self._wait_between_dispatch_iteration_context():119 yield self._remove_users_from_workers()120 if self._rebalance:121 self._rebalance = False122 yield self._users_on_workers123 self._dispatch_in_progress = False124 def new_dispatch(self, target_user_count: int, spawn_rate: float) -> None:125 """126 Initialize a new dispatch cycle.127 :param target_user_count: The desired user count at the end of the dispatch cycle128 :param spawn_rate: The spawn rate129 """130 self._target_user_count = target_user_count131 self._spawn_rate = spawn_rate132 self._user_count_per_dispatch_iteration = max(1, math.floor(self._spawn_rate))133 self._wait_between_dispatch = self._user_count_per_dispatch_iteration / self._spawn_rate134 self._initial_users_on_workers = self._users_on_workers135 self._users_on_workers = self._fast_users_on_workers_copy(self._initial_users_on_workers)136 self._current_user_count = self.get_current_user_count()137 self._dispatcher_generator = self._dispatcher()138 self._dispatch_iteration_durations.clear()139 def add_worker(self, worker_node: "WorkerNode") -> None:140 """141 This method is to be called when a new worker connects to the master. When142 a new worker is added, the users dispatcher will flag that a rebalance is required143 and ensure that the next dispatch iteration will be made to redistribute the users144 on the new pool of workers.145 :param worker_node: The worker node to add.146 """147 self._worker_nodes.append(worker_node)148 self._sort_workers()149 self._prepare_rebalance()150 def remove_worker(self, worker_node: "WorkerNode") -> None:151 """152 This method is similar to the above `add_worker`. When a worker disconnects153 (because of e.g. network failure, worker failure, etc.), this method will ensure that the next154 dispatch iteration redistributes the users on the remaining workers.155 :param worker_node: The worker node to remove.156 """157 self._worker_nodes = [w for w in self._worker_nodes if w.id != worker_node.id]158 if len(self._worker_nodes) == 0:159 # TODO: Test this160 return161 self._prepare_rebalance()162 def _prepare_rebalance(self) -> None:...

Full Screen

Full Screen

scheduler.py

Source:scheduler.py Github

copy

Full Screen

...131 worker.devices.add(device)132 worker.load_index += device.load_index133 return workers134 @staticmethod135 def _sort_workers(worker: Worker) -> Tuple[int, int]:136 """Helper sorting method when we have no load indexes"""137 try:138 return len(worker), min(device.id_ for device in worker.devices)139 except ValueError:140 return len(worker), 0141 @staticmethod142 def get_devices_per_worker(worker_count: int, devices_count: int) -> List[int]:143 """Calculates how many devices can be assigned per worker144 Return example: 3 workers, 8 devices: [3, 3, 2]145 """146 devices_per_worker = []147 while worker_count:148 try:149 per_worker = math.ceil(devices_count/worker_count)150 except ZeroDivisionError:151 per_worker = 0152 devices_per_worker.append(per_worker)153 devices_count -= per_worker154 worker_count -= 1155 return devices_per_worker156 @classmethod157 def balance_with_count_per_worker(158 cls, workers: Set[Worker], devices: Set[Device]) -> Set[Worker]:159 """Worker rebalance method which assigns 'equal' number of devices per worker160 with minimal changes to worker's assigned devices161 """162 devices_per_worker = cls.get_devices_per_worker(len(workers), len(devices))163 devices = set(devices)164 ordered_workers = sorted(workers, key=lambda item: cls._sort_workers(item), reverse=True)165 for worker in ordered_workers:166 worker_device_count = max(devices_per_worker)167 worker_new_devices = set() # type: Set[Device]168 worker.load_index = 0169 for device in sorted(devices):170 if worker_device_count == len(worker_new_devices):171 break172 elif device in worker:173 worker_new_devices.add(device)174 devices -= worker_new_devices175 try:176 devices_per_worker.remove(len(worker_new_devices))177 except ValueError:178 pass179 worker.devices = worker_new_devices180 for leftover_device in sorted(devices):181 worker = sorted(workers, key=lambda item: cls._sort_workers(item))[0]182 worker.add_device(leftover_device)183 return workers184 @classmethod185 def balance_devices_per_worker(186 cls, workers: Set[Worker], devices: Set[Device], cache: Cache,187 worker_load_deviation: float = 0.0) -> Set[Worker]:188 """Main balancing function that decides how we balance devices189 which depends on the state of cache190 """191 if not workers or not devices:192 return workers193 devices, system_msg_count, interval = cls.fetch_cache_data(devices, cache)194 workers = set(workers)195 if system_msg_count:...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run locust automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful