How to use the user_classes_count property in Locust

Best Python code snippet using locust

test_runners.py

Source:test_runners.py Github

copy

Full Screen

...355 runner.start(user_count=5, spawn_rate=5, wait=False)356 runner.spawning_greenlet.join()357 self.assertDictEqual({"MyUser1": 3, "MyUser2": 2}, runner.user_classes_count)358 runner.quit()359 def test_user_classes_count(self):360 class MyUser1(User):361 wait_time = constant(0)362 @task363 def my_task(self):364 pass365 class MyUser2(User):366 wait_time = constant(0)367 @task368 def my_task(self):369 pass370 environment = Environment(user_classes=[MyUser1, MyUser2])371 runner = LocalRunner(environment)372 runner.start(user_count=10, spawn_rate=5, wait=False)373 runner.spawning_greenlet.join()...

Full Screen

Full Screen

runners.py

Source:runners.py Github

copy

Full Screen

...126 :returns: Number of currently running users127 """128 return len(self.user_greenlets)129 @property130 def user_classes_count(self) -> Dict[str, int]:131 """132 :returns: Number of currently running users for each user class133 """134 user_classes_count = {user_class.__name__: 0 for user_class in self.user_classes}135 for user_greenlet in self.user_greenlets:136 try:137 user = user_greenlet.args[0]138 except IndexError:139 # TODO: Find out why args is sometimes empty. In gevent code,140 # the supplied args are cleared in the gevent.greenlet.Greenlet.__free,141 # so it seems a good place to start investigating. My suspicion is that142 # the supplied args are emptied whenever the greenlet is dead, so we can143 # simply ignore the greenlets with empty args.144 logger.debug(145 "ERROR: While calculating number of running users, we encountered a user that didnt have proper args %s (user_greenlet.dead=%s)",146 user_greenlet,147 user_greenlet.dead,148 )149 continue150 user_classes_count[user.__class__.__name__] += 1151 return user_classes_count152 def update_state(self, new_state):153 """154 Updates the current state155 """156 # I (cyberwiz) commented out this logging, because it is too noisy even for debug level157 # Uncomment it if you are specifically debugging state transitions158 # logger.debug("Updating state to '%s', old state was '%s'" % (new_state, self.state))159 self.state = new_state160 def cpu_log_warning(self):161 """Called at the end of the test to repeat the warning & return the status"""162 if self.cpu_warning_emitted:163 logger.warning(164 "CPU usage was too high at some point during the test! 
See https://docs.locust.io/en/stable/running-locust-distributed.html for how to distribute the load over multiple CPU cores or machines"165 )166 return True167 return False168 def spawn_users(self, user_classes_spawn_count: Dict[str, int], wait: bool = False):169 if self.state == STATE_INIT or self.state == STATE_STOPPED:170 self.update_state(STATE_SPAWNING)171 logger.debug(172 "Spawning additional %s (%s already running)..."173 % (json.dumps(user_classes_spawn_count), json.dumps(self.user_classes_count))174 )175 def spawn(user_class: str, spawn_count: int):176 n = 0177 while n < spawn_count:178 new_user = self.user_classes_by_name[user_class](self.environment)179 new_user.start(self.user_greenlets)180 n += 1181 if n % 10 == 0 or n == spawn_count:182 logger.debug("%i users spawned" % self.user_count)183 logger.debug("All users of class %s spawned" % user_class)184 for user_class, spawn_count in user_classes_spawn_count.items():185 spawn(user_class, spawn_count)186 if wait:187 self.user_greenlets.join()188 logger.info("All users stopped\n")189 def stop_users(self, user_classes_stop_count: Dict[str, int]):190 async_calls_to_stop = Group()191 stop_group = Group()192 for user_class, stop_count in user_classes_stop_count.items():193 if self.user_classes_count[user_class] == 0:194 continue195 to_stop = []196 for user_greenlet in self.user_greenlets:197 if len(to_stop) == stop_count:198 break199 try:200 user = user_greenlet.args[0]201 except IndexError:202 logger.error(203 "While stopping users, we encountered a user that didnt have proper args %s", user_greenlet204 )205 continue206 if isinstance(user, self.user_classes_by_name[user_class]):207 to_stop.append(user)208 if not to_stop:209 continue210 while True:211 user_to_stop: User = to_stop.pop()212 logger.debug("Stopping %s" % user_to_stop.greenlet.name)213 if user_to_stop.greenlet is greenlet.getcurrent():214 # User called runner.quit(), so don't block waiting for killing to finish215 
user_to_stop.group.killone(user_to_stop.greenlet, block=False)216 elif self.environment.stop_timeout:217 async_calls_to_stop.add(gevent.spawn_later(0, user_to_stop.stop, force=False))218 stop_group.add(user_to_stop.greenlet)219 else:220 async_calls_to_stop.add(gevent.spawn_later(0, user_to_stop.stop, force=True))221 if not to_stop:222 break223 async_calls_to_stop.join()224 if not stop_group.join(timeout=self.environment.stop_timeout):225 logger.info(226 "Not all users finished their tasks & terminated in %s seconds. Stopping them..."227 % self.environment.stop_timeout228 )229 stop_group.kill(block=True)230 logger.debug(231 "%g users have been stopped, %g still running", sum(user_classes_stop_count.values()), self.user_count232 )233 def monitor_cpu(self):234 process = psutil.Process()235 while True:236 self.current_cpu_usage = process.cpu_percent()237 if self.current_cpu_usage > 90 and not self.cpu_warning_emitted:238 logging.warning(239 "CPU usage above 90%! This may constrain your throughput and may even give inconsistent response time measurements! 
See https://docs.locust.io/en/stable/running-locust-distributed.html for how to distribute the load over multiple CPU cores or machines"240 )241 self.cpu_warning_emitted = True242 gevent.sleep(CPU_MONITOR_INTERVAL)243 def start(self, user_count: int, spawn_rate: float, wait: bool = False):244 """245 Start running a load test246 :param user_count: Total number of users to start247 :param spawn_rate: Number of users to spawn per second248 :param wait: If True calls to this method will block until all users are spawned.249 If False (the default), a greenlet that spawns the users will be250 started and the call to this method will return immediately.251 """252 if self.state != STATE_RUNNING and self.state != STATE_SPAWNING:253 self.stats.clear_all()254 self.exceptions = {}255 self.cpu_warning_emitted = False256 self.worker_cpu_warning_emitted = False257 self.environment.events.test_start.fire(environment=self.environment)258 if wait and user_count - self.user_count > spawn_rate:259 raise ValueError("wait is True but the amount of users to add is greater than the spawn rate")260 for user_class in self.user_classes:261 if self.environment.host is not None:262 user_class.host = self.environment.host263 if self.state != STATE_INIT and self.state != STATE_STOPPED:264 self.update_state(STATE_SPAWNING)265 if self._users_dispatcher is None:266 self._users_dispatcher = UsersDispatcher(267 worker_nodes=[self._local_worker_node], user_classes=self.user_classes268 )269 logger.info("Ramping to %d users at a rate of %.2f per second" % (user_count, spawn_rate))270 self._users_dispatcher.new_dispatch(user_count, spawn_rate)271 try:272 for dispatched_users in self._users_dispatcher:273 user_classes_spawn_count = {}274 user_classes_stop_count = {}275 user_classes_count = dispatched_users[self._local_worker_node.id]276 logger.debug("Ramping to %s" % _format_user_classes_count_for_log(user_classes_count))277 for user_class, user_class_count in user_classes_count.items():278 if 
self.user_classes_count[user_class] > user_class_count:279 user_classes_stop_count[user_class] = self.user_classes_count[user_class] - user_class_count280 elif self.user_classes_count[user_class] < user_class_count:281 user_classes_spawn_count[user_class] = user_class_count - self.user_classes_count[user_class]282 if wait:283 # spawn_users will block, so we need to call stop_users first284 self.stop_users(user_classes_stop_count)285 self.spawn_users(user_classes_spawn_count, wait)286 else:287 # call spawn_users before stopping the users since stop_users288 # can be blocking because of the stop_timeout289 self.spawn_users(user_classes_spawn_count, wait)290 self.stop_users(user_classes_stop_count)291 self._local_worker_node.user_classes_count = next(iter(dispatched_users.values()))292 except KeyboardInterrupt:293 # TODO: Find a cleaner way to handle that294 # We need to catch keyboard interrupt. Otherwise, if KeyboardInterrupt is received while in295 # a gevent.sleep inside the dispatch_users function, locust won't gracefully shutdown.296 self.quit()297 logger.info("All users spawned: %s" % _format_user_classes_count_for_log(self.user_classes_count))298 self.environment.events.spawning_complete.fire(user_count=sum(self.target_user_classes_count.values()))299 def start_shape(self):300 if self.shape_greenlet:301 logger.info("There is an ongoing shape test running. Editing is disabled")302 return303 logger.info("Shape test starting. 
User count and spawn rate are ignored for this type of load test")304 self.update_state(STATE_INIT)305 self.shape_greenlet = self.greenlet.spawn(self.shape_worker)306 self.shape_greenlet.link_exception(greenlet_exception_handler)307 self.environment.shape_class.reset_time()308 def shape_worker(self):309 logger.info("Shape worker starting")310 while self.state == STATE_INIT or self.state == STATE_SPAWNING or self.state == STATE_RUNNING:311 new_state = self.environment.shape_class.tick()312 if new_state is None:313 logger.info("Shape test stopping")314 if self.environment.parsed_options and self.environment.parsed_options.headless:315 self.quit()316 else:317 self.stop()318 self.shape_greenlet = None319 self.shape_last_state = None320 return321 elif self.shape_last_state == new_state:322 gevent.sleep(1)323 else:324 user_count, spawn_rate = new_state325 logger.info("Shape test updating to %d users at %.2f spawn rate" % (user_count, spawn_rate))326 # TODO: This `self.start()` call is blocking until the ramp-up is completed. 
This can leads327 # to unexpected behaviours such as the one in the following example:328 # A load test shape has the following stages:329 # stage 1: (user_count=100, spawn_rate=1) for t < 50s330 # stage 2: (user_count=120, spawn_rate=1) for t < 100s331 # stage 3: (user_count=130, spawn_rate=1) for t < 120s332 # Because the first stage will take 100s to complete, the second stage333 # will be skipped completely because the shape worker will be blocked334 # at the `self.start()` of the first stage.335 # Of couse, this isn't a problem if the load test shape is well-defined.336 # We should probably use a `gevent.timeout` with a duration a little over337 # `(user_count - prev_user_count) / spawn_rate` in order to limit the runtime338 # of each load test shape stage.339 self.start(user_count=user_count, spawn_rate=spawn_rate)340 self.shape_last_state = new_state341 def stop(self):342 """343 Stop a running load test by stopping all running users344 """345 if self.state == STATE_STOPPED:346 return347 logger.debug("Stopping all users")348 self.update_state(STATE_CLEANUP)349 # if we are currently spawning users we need to kill the spawning greenlet first350 if self.spawning_greenlet and not self.spawning_greenlet.ready():351 self.spawning_greenlet.kill(block=True)352 if self.environment.shape_class is not None and self.shape_greenlet is not greenlet.getcurrent():353 # If the test was not started yet and locust is354 # stopped/quit, shape_greenlet will be None.355 if self.shape_greenlet is not None:356 self.shape_greenlet.kill(block=True)357 self.shape_greenlet = None358 self.shape_last_state = None359 self.stop_users(self.user_classes_count)360 self.update_state(STATE_STOPPED)361 self.cpu_log_warning()362 self.environment.events.test_stop.fire(environment=self.environment)363 def quit(self):364 """365 Stop any running load test and kill all greenlets for the runner366 """367 self.stop()368 self.greenlet.kill(block=True)369 def log_exception(self, node_id, msg, 
formatted_tb):370 key = hash(formatted_tb)371 row = self.exceptions.setdefault(key, {"count": 0, "msg": msg, "traceback": formatted_tb, "nodes": set()})372 row["count"] += 1373 row["nodes"].add(node_id)374 self.exceptions[key] = row375 @property376 def target_user_count(self) -> int:377 return sum(self.target_user_classes_count.values())378 def register_message(self, msg_type, listener):379 """380 Register a listener for a custom message from another node381 :param msg_type: The type of the message to listen for382 :param listener: The function to execute when the message is received383 """384 self.custom_messages[msg_type] = listener385class LocalRunner(Runner):386 """387 Runner for running single process load test388 """389 def __init__(self, environment):390 """391 :param environment: Environment instance392 """393 super().__init__(environment)394 # register listener thats logs the exception for the local runner395 def on_user_error(user_instance, exception, tb):396 formatted_tb = "".join(traceback.format_tb(tb))397 self.log_exception("local", str(exception), formatted_tb)398 self.environment.events.user_error.add_listener(on_user_error)399 def start(self, user_count: int, spawn_rate: float, wait: bool = False):400 if spawn_rate > 100:401 logger.warning(402 "Your selected spawn rate is very high (>100), and this is known to sometimes cause issues. 
Do you really need to ramp up that fast?"403 )404 if self.spawning_greenlet:405 # kill existing spawning_greenlet before we start a new one406 self.spawning_greenlet.kill(block=True)407 self.spawning_greenlet = self.greenlet.spawn(408 lambda: super(LocalRunner, self).start(user_count, spawn_rate, wait=wait)409 )410 self.spawning_greenlet.link_exception(greenlet_exception_handler)411 def stop(self):412 if self.state == STATE_STOPPED:413 return414 super().stop()415 def send_message(self, msg_type, data=None):416 """417 Emulates internodal messaging by calling registered listeners418 :param msg_type: The type of the message to emulate sending419 :param data: Optional data to include420 """421 logger.debug(f"Running locally: sending {msg_type} message to self")422 if msg_type in self.custom_messages:423 listener = self.custom_messages[msg_type]424 msg = Message(msg_type, data, "local")425 listener(environment=self.environment, msg=msg)426 else:427 logger.warning(f"Unknown message type recieved: {msg_type}")428class DistributedRunner(Runner):429 def __init__(self, *args, **kwargs):430 super().__init__(*args, **kwargs)431 self._local_worker_node = None432 setup_distributed_stats_event_listeners(self.environment.events, self.stats)433class WorkerNode:434 def __init__(self, id: str, state=STATE_INIT, heartbeat_liveness=HEARTBEAT_LIVENESS):435 self.id: str = id436 self.state = state437 self.heartbeat = heartbeat_liveness438 self.cpu_usage = 0439 self.cpu_warning_emitted = False440 # The reported users running on the worker441 self.user_classes_count: Dict[str, int] = {}442 @property443 def user_count(self) -> int:444 return sum(self.user_classes_count.values())445class WorkerNodes(MutableMapping):446 def __init__(self):447 self._worker_nodes = {}448 def get_by_state(self, state) -> List[WorkerNode]:449 return [c for c in self.values() if c.state == state]450 @property451 def all(self) -> ValuesView[WorkerNode]:452 return self.values()453 @property454 def ready(self) -> 
List[WorkerNode]:455 return self.get_by_state(STATE_INIT)456 @property457 def spawning(self) -> List[WorkerNode]:458 return self.get_by_state(STATE_SPAWNING)459 @property460 def running(self) -> List[WorkerNode]:461 return self.get_by_state(STATE_RUNNING)462 @property463 def missing(self) -> List[WorkerNode]:464 return self.get_by_state(STATE_MISSING)465 def __setitem__(self, k: str, v: WorkerNode) -> None:466 self._worker_nodes[k] = v467 def __delitem__(self, k: str) -> None:468 del self._worker_nodes[k]469 def __getitem__(self, k: str) -> WorkerNode:470 return self._worker_nodes[k]471 def __len__(self) -> int:472 return len(self._worker_nodes)473 def __iter__(self) -> Iterator[WorkerNode]:474 return iter(self._worker_nodes)475class MasterRunner(DistributedRunner):476 """477 Runner used to run distributed load tests across multiple processes and/or machines.478 MasterRunner doesn't spawn any user greenlets itself. Instead it expects479 :class:`WorkerRunners <WorkerRunner>` to connect to it, which it will then direct480 to start and stop user greenlets. 
Stats sent back from the481 :class:`WorkerRunners <WorkerRunner>` will aggregated.482 """483 def __init__(self, environment, master_bind_host, master_bind_port):484 """485 :param environment: Environment instance486 :param master_bind_host: Host/interface to use for incoming worker connections487 :param master_bind_port: Port to use for incoming worker connections488 """489 super().__init__(environment)490 self.worker_cpu_warning_emitted = False491 self.master_bind_host = master_bind_host492 self.master_bind_port = master_bind_port493 self.spawn_rate: float = 0494 self.clients = WorkerNodes()495 try:496 self.server = rpc.Server(master_bind_host, master_bind_port)497 except RPCError as e:498 if e.args[0] == "Socket bind failure: Address already in use":499 port_string = (500 master_bind_host + ":" + str(master_bind_port) if master_bind_host != "*" else str(master_bind_port)501 )502 logger.error(503 f"The Locust master port ({port_string}) was busy. Close any applications using that port - perhaps an old instance of Locust master is still running? 
({e.args[0]})"504 )505 sys.exit(1)506 else:507 raise508 self._users_dispatcher: Union[UsersDispatcher, None] = None509 self.greenlet.spawn(self.heartbeat_worker).link_exception(greenlet_exception_handler)510 self.greenlet.spawn(self.client_listener).link_exception(greenlet_exception_handler)511 # listener that gathers info on how many users the worker has spawned512 def on_worker_report(client_id, data):513 if client_id not in self.clients:514 logger.info("Discarded report from unrecognized worker %s", client_id)515 return516 self.clients[client_id].user_classes_count = data["user_classes_count"]517 self.environment.events.worker_report.add_listener(on_worker_report)518 # register listener that sends quit message to worker nodes519 def on_quitting(environment, **kw):520 self.quit()521 self.environment.events.quitting.add_listener(on_quitting)522 @property523 def user_count(self) -> int:524 return sum(c.user_count for c in self.clients.values())525 def cpu_log_warning(self):526 warning_emitted = Runner.cpu_log_warning(self)527 if self.worker_cpu_warning_emitted:528 logger.warning("CPU usage threshold was exceeded on workers during the test!")529 warning_emitted = True530 return warning_emitted531 def start(self, user_count: int, spawn_rate: float, **kwargs) -> None:532 num_workers = len(self.clients.ready) + len(self.clients.running) + len(self.clients.spawning)533 if not num_workers:534 logger.warning(535 "You are running in distributed mode but have no worker servers connected. 
"536 "Please connect workers prior to swarming."537 )538 return539 for user_class in self.user_classes:540 if self.environment.host is not None:541 user_class.host = self.environment.host542 self.spawn_rate = spawn_rate543 if self._users_dispatcher is None:544 self._users_dispatcher = UsersDispatcher(545 worker_nodes=list(self.clients.values()), user_classes=self.user_classes546 )547 logger.info(548 "Sending spawn jobs of %d users at %.2f spawn rate to %d ready clients"549 % (user_count, spawn_rate, num_workers)550 )551 worker_spawn_rate = float(spawn_rate) / (num_workers or 1)552 if worker_spawn_rate > 100:553 logger.warning(554 "Your selected spawn rate is very high (>100/worker), and this is known to sometimes cause issues. Do you really need to ramp up that fast?"555 )556 if self.state != STATE_RUNNING and self.state != STATE_SPAWNING:557 self.stats.clear_all()558 self.exceptions = {}559 self.environment.events.test_start.fire(environment=self.environment)560 if self.environment.shape_class:561 self.environment.shape_class.reset_time()562 self.update_state(STATE_SPAWNING)563 self._users_dispatcher.new_dispatch(target_user_count=user_count, spawn_rate=spawn_rate)564 try:565 for dispatched_users in self._users_dispatcher:566 dispatch_greenlets = Group()567 for worker_node_id, worker_user_classes_count in dispatched_users.items():568 data = {569 "timestamp": time.time(),570 "user_classes_count": worker_user_classes_count,571 "host": self.environment.host,572 "stop_timeout": self.environment.stop_timeout,573 "parsed_options": vars(self.environment.parsed_options)574 if self.environment.parsed_options575 else {},576 }577 dispatch_greenlets.add(578 gevent.spawn_later(579 0,580 self.server.send_to_client,581 Message("spawn", data, worker_node_id),582 )583 )584 dispatched_user_count = sum(map(sum, map(methodcaller("values"), dispatched_users.values())))585 logger.debug(586 "Sending spawn messages for %g total users to %i client(s)",587 dispatched_user_count,588 
len(dispatch_greenlets),589 )590 dispatch_greenlets.join()591 logger.debug(592 "Currently spawned users: %s" % _format_user_classes_count_for_log(self.reported_user_classes_count)593 )594 self.target_user_classes_count = _aggregate_dispatched_users(dispatched_users)595 except KeyboardInterrupt:596 # TODO: Find a cleaner way to handle that597 # We need to catch keyboard interrupt. Otherwise, if KeyboardInterrupt is received while in598 # a gevent.sleep inside the dispatch_users function, locust won't gracefully shutdown.599 self.quit()600 # Wait a little for workers to report their users to the master601 # so that we can give an accurate log message below and fire the `spawning_complete` event602 # when the user count is really at the desired value.603 timeout = gevent.Timeout(self._wait_for_workers_report_after_ramp_up())604 timeout.start()605 try:606 while self.user_count != self.target_user_count:607 gevent.sleep()608 except gevent.Timeout:609 pass610 finally:611 timeout.cancel()612 self.environment.events.spawning_complete.fire(user_count=sum(self.target_user_classes_count.values()))613 logger.info("All users spawned: %s" % _format_user_classes_count_for_log(self.reported_user_classes_count))614 @functools.lru_cache()615 def _wait_for_workers_report_after_ramp_up(self) -> float:616 """617 The amount of time to wait after a ramp-up in order for all the workers to report their state618 to the master. If not supplied by the user, it is 100ms by default. If the supplied value is a number,619 it is taken as-is. If the supplied value is a pattern like "some_number * WORKER_REPORT_INTERVAL",620 the value will be "some_number * WORKER_REPORT_INTERVAL". The most sensible value would be something621 like "1.25 * WORKER_REPORT_INTERVAL". 
However, some users might find it too high, so it is left622 to a really small value of 100ms by default.623 """624 locust_wait_for_workers_report_after_ramp_up = os.getenv("LOCUST_WAIT_FOR_WORKERS_REPORT_AFTER_RAMP_UP")625 if locust_wait_for_workers_report_after_ramp_up is None:626 return 0.1627 match = re.search(628 r"^(?P<coeff>(\d+)|(\d+\.\d+))[ ]*\*[ ]*WORKER_REPORT_INTERVAL$",629 locust_wait_for_workers_report_after_ramp_up,630 )631 if match is None:632 assert float(locust_wait_for_workers_report_after_ramp_up) >= 0633 return float(locust_wait_for_workers_report_after_ramp_up)634 else:635 return float(match.group("coeff")) * WORKER_REPORT_INTERVAL636 def stop(self, send_stop_to_client: bool = True):637 if self.state not in [STATE_INIT, STATE_STOPPED, STATE_STOPPING]:638 logger.debug("Stopping...")639 self.update_state(STATE_STOPPING)640 if self.environment.shape_class is not None and self.shape_greenlet is not greenlet.getcurrent():641 self.shape_greenlet.kill(block=True)642 self.shape_greenlet = None643 self.shape_last_state = None644 self._users_dispatcher = None645 if send_stop_to_client:646 for client in self.clients.all:647 logger.debug("Sending stop message to client %s" % client.id)648 self.server.send_to_client(Message("stop", None, client.id))649 # Give an additional 60s for all workers to stop650 timeout = gevent.Timeout(self.environment.stop_timeout or 0 + 60)651 timeout.start()652 try:653 while self.user_count != 0:654 gevent.sleep(1)655 except gevent.Timeout:656 logger.error("Timeout waiting for all workers to stop")657 finally:658 timeout.cancel()659 self.environment.events.test_stop.fire(environment=self.environment)660 def quit(self):661 self.stop(send_stop_to_client=False)662 logger.debug("Quitting...")663 for client in self.clients.all:664 logger.debug("Sending quit message to client %s" % (client.id))665 self.server.send_to_client(Message("quit", None, client.id))666 gevent.sleep(0.5) # wait for final stats report from all workers667 
self.greenlet.kill(block=True)668 def check_stopped(self):669 if (670 not self.state == STATE_INIT671 and not self.state == STATE_STOPPED672 and all(map(lambda x: x.state not in (STATE_RUNNING, STATE_SPAWNING, STATE_INIT), self.clients.all))673 ):674 self.update_state(STATE_STOPPED)675 def heartbeat_worker(self):676 while True:677 gevent.sleep(HEARTBEAT_INTERVAL)678 if self.connection_broken:679 self.reset_connection()680 continue681 for client in self.clients.all:682 if client.heartbeat < 0 and client.state != STATE_MISSING:683 logger.info("Worker %s failed to send heartbeat, setting state to missing." % str(client.id))684 client.state = STATE_MISSING685 client.user_classes_count = {}686 if self._users_dispatcher is not None:687 self._users_dispatcher.remove_worker(client)688 # TODO: If status is `STATE_RUNNING`, call self.start()689 if self.worker_count <= 0:690 logger.info("The last worker went missing, stopping test.")691 self.stop()692 self.check_stopped()693 else:694 client.heartbeat -= 1695 def reset_connection(self):696 logger.info("Reset connection to worker")697 try:698 self.server.close()699 self.server = rpc.Server(self.master_bind_host, self.master_bind_port)700 except RPCError as e:701 logger.error("Temporary failure when resetting connection: %s, will retry later." % (e))702 def client_listener(self):703 while True:704 try:705 client_id, msg = self.server.recv_from_client()706 except RPCError as e:707 logger.error("RPCError found when receiving from client: %s" % (e))708 self.connection_broken = True709 gevent.sleep(FALLBACK_INTERVAL)710 continue711 self.connection_broken = False712 msg.node_id = client_id713 if msg.type == "client_ready":714 if not msg.data:715 logger.error(f"An old (pre 2.0) worker tried to connect ({client_id}). 
That's not going to work.")716 continue717 elif msg.data != __version__ and msg.data != -1:718 logger.warning(719 f"A worker ({client_id}) running a different version ({msg.data}) connected, master version is {__version__}"720 )721 worker_node_id = msg.node_id722 self.clients[worker_node_id] = WorkerNode(worker_node_id, heartbeat_liveness=HEARTBEAT_LIVENESS)723 if self._users_dispatcher is not None:724 self._users_dispatcher.add_worker(worker_node=self.clients[worker_node_id])725 if not self._users_dispatcher.dispatch_in_progress and self.state == STATE_RUNNING:726 # TODO: Test this situation727 self.start(self.target_user_count, self.spawn_rate)728 logger.info(729 "Client %r reported as ready. Currently %i clients ready to swarm."730 % (worker_node_id, len(self.clients.ready + self.clients.running + self.clients.spawning))731 )732 # if self.state == STATE_RUNNING or self.state == STATE_SPAWNING:733 # # TODO: Necessary now that UsersDispatcher handles that?734 # # balance the load distribution when new client joins735 # self.start(self.target_user_count, self.spawn_rate)736 # emit a warning if the worker's clock seem to be out of sync with our clock737 # if abs(time() - msg.data["time"]) > 5.0:738 # warnings.warn("The worker node's clock seem to be out of sync. 
For the statistics to be correct the different locust servers need to have synchronized clocks.")739 elif msg.type == "client_stopped":740 client = self.clients[msg.node_id]741 del self.clients[msg.node_id]742 if self._users_dispatcher is not None:743 self._users_dispatcher.remove_worker(client)744 if not self._users_dispatcher.dispatch_in_progress and self.state == STATE_RUNNING:745 # TODO: Test this situation746 self.start(self.target_user_count, self.spawn_rate)747 logger.info("Removing %s client from running clients" % (msg.node_id))748 elif msg.type == "heartbeat":749 if msg.node_id in self.clients:750 c = self.clients[msg.node_id]751 c.heartbeat = HEARTBEAT_LIVENESS752 client_state = msg.data["state"]753 if c.state == STATE_MISSING:754 logger.info(755 "Worker %s self-healed with heartbeat, setting state to %s." % (str(c.id), client_state)756 )757 if self._users_dispatcher is not None:758 self._users_dispatcher.add_worker(worker_node=c)759 if not self._users_dispatcher.dispatch_in_progress and self.state == STATE_RUNNING:760 # TODO: Test this situation761 self.start(self.target_user_count, self.spawn_rate)762 c.state = client_state763 c.cpu_usage = msg.data["current_cpu_usage"]764 if not c.cpu_warning_emitted and c.cpu_usage > 90:765 self.worker_cpu_warning_emitted = True # used to fail the test in the end766 c.cpu_warning_emitted = True # used to suppress logging for this node767 logger.warning(768 "Worker %s exceeded cpu threshold (will only log this once per worker)" % (msg.node_id)769 )770 elif msg.type == "stats":771 self.environment.events.worker_report.fire(client_id=msg.node_id, data=msg.data)772 elif msg.type == "spawning":773 self.clients[msg.node_id].state = STATE_SPAWNING774 elif msg.type == "spawning_complete":775 self.clients[msg.node_id].state = STATE_RUNNING776 self.clients[msg.node_id].user_classes_count = msg.data["user_classes_count"]777 elif msg.type == "quit":778 if msg.node_id in self.clients:779 client = self.clients[msg.node_id]780 del 
self.clients[msg.node_id]781 if self._users_dispatcher is not None:782 self._users_dispatcher.remove_worker(client)783 if not self._users_dispatcher.dispatch_in_progress and self.state == STATE_RUNNING:784 # TODO: Test this situation785 self.start(self.target_user_count, self.spawn_rate)786 logger.info(787 "Client %r quit. Currently %i clients connected." % (msg.node_id, len(self.clients.ready))788 )789 if self.worker_count - len(self.clients.missing) <= 0:790 logger.info("The last worker quit, stopping test.")791 self.stop()792 if self.environment.parsed_options and self.environment.parsed_options.headless:793 self.quit()794 elif msg.type == "exception":795 self.log_exception(msg.node_id, msg.data["msg"], msg.data["traceback"])796 elif msg.type in self.custom_messages:797 logger.debug(f"Recieved {msg.type} message from worker {msg.node_id}")798 self.custom_messages[msg.type](environment=self.environment, msg=msg)799 else:800 logger.warning(f"Unknown message type recieved from worker {msg.node_id}: {msg.type}")801 self.check_stopped()802 @property803 def worker_count(self):804 return len(self.clients.ready) + len(self.clients.spawning) + len(self.clients.running)805 @property806 def reported_user_classes_count(self) -> Dict[str, int]:807 reported_user_classes_count = defaultdict(lambda: 0)808 for client in self.clients.ready + self.clients.spawning + self.clients.running:809 for name, count in client.user_classes_count.items():810 reported_user_classes_count[name] += count811 return reported_user_classes_count812 def send_message(self, msg_type, data=None, client_id=None):813 """814 Sends a message to attached worker node(s)815 :param msg_type: The type of the message to send816 :param data: Optional data to send817 :param client_id: Optional id of the target worker node.818 If None, will send to all attached workers819 """820 if client_id:...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run locust automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now!!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful