How to use the cpu_log_warning method in Locust

Best Python code snippet using locust

runners.py

Source: runners.py (GitHub)

copy

Full Screen

...102 # I (cyberwiz) commented out this logging, because it is too noisy even for debug level103 # Uncomment it if you are specifically debugging state transitions104 # logger.debug("Updating state to '%s', old state was '%s'" % (new_state, self.state))105 self.state = new_state106 def cpu_log_warning(self):107 """Called at the end of the test to repeat the warning & return the status"""108 if self.cpu_warning_emitted:109 logger.warning(110 "CPU usage was too high at some point during the test! See https://docs.locust.io/en/stable/running-locust-distributed.html for how to distribute the load over multiple CPU cores or machines"111 )112 return True113 return False114 def weight_users(self, amount) -> List[Type[User]]:115 """116 Distributes the amount of users for each WebLocust-class according to it's weight117 returns a list "bucket" with the weighted users118 """119 bucket = []120 weight_sum = sum([user.weight for user in self.user_classes])121 residuals = {}122 for user in self.user_classes:123 if self.environment.host is not None:124 user.host = self.environment.host125 # create users depending on weight126 percent = user.weight / float(weight_sum)127 num_users = int(round(amount * percent))128 bucket.extend([user for x in range(num_users)])129 # used to keep track of the amount of rounding was done if we need130 # to add/remove some instances from bucket131 residuals[user] = amount * percent - round(amount * percent)132 if len(bucket) < amount:133 # We got too few User classes in the bucket, so we need to create a few extra users,134 # and we do this by iterating over each of the User classes - starting with the one135 # where the residual from the rounding was the largest - and creating one of each until136 # we get the correct amount137 for user in [l for l, r in sorted(residuals.items(), key=lambda x: x[1], reverse=True)][138 : amount - len(bucket)139 ]:140 bucket.append(user)141 elif len(bucket) > amount:142 # We've got too many users due to rounding 
errors so we need to remove some143 for user in [l for l, r in sorted(residuals.items(), key=lambda x: x[1])][: len(bucket) - amount]:144 bucket.remove(user)145 return bucket146 def spawn_users(self, spawn_count, spawn_rate, wait=False):147 bucket = self.weight_users(spawn_count)148 spawn_count = len(bucket)149 if self.state == STATE_INIT or self.state == STATE_STOPPED:150 self.update_state(STATE_SPAWNING)151 existing_count = len(self.user_greenlets)152 logger.info(153 "Spawning %i users at the rate %g users/s (%i users already running)..."154 % (spawn_count, spawn_rate, existing_count)155 )156 occurrence_count = dict([(l.__name__, 0) for l in self.user_classes])157 def spawn():158 sleep_time = 1.0 / spawn_rate159 while True:160 if not bucket:161 logger.info(162 "All users spawned: %s (%i total running)"163 % (164 ", ".join(["%s: %d" % (name, count) for name, count in occurrence_count.items()]),165 len(self.user_greenlets),166 )167 )168 self.environment.events.spawning_complete.fire(user_count=len(self.user_greenlets))169 return170 user_class = bucket.pop(random.randint(0, len(bucket) - 1))171 occurrence_count[user_class.__name__] += 1172 new_user = user_class(self.environment)173 new_user.start(self.user_greenlets)174 if len(self.user_greenlets) % 10 == 0:175 logger.debug("%i users spawned" % len(self.user_greenlets))176 if bucket:177 gevent.sleep(sleep_time)178 spawn()179 if wait:180 self.user_greenlets.join()181 logger.info("All users stopped\n")182 def stop_users(self, user_count, stop_rate=None):183 """184 Stop `user_count` weighted users at a rate of `stop_rate`185 """186 if user_count == 0 or stop_rate == 0:187 return188 bucket = self.weight_users(user_count)189 user_count = len(bucket)190 to_stop = []191 for user_greenlet in self.user_greenlets:192 try:193 user = user_greenlet.args[0]194 except IndexError:195 logger.error(196 "While stopping users, we encountered a user that didnt have proper args %s", user_greenlet197 )198 continue199 for user_class in 
bucket:200 if isinstance(user, user_class):201 to_stop.append(user)202 bucket.remove(user_class)203 break204 if not to_stop:205 return206 if stop_rate is None or stop_rate >= user_count:207 sleep_time = 0208 logger.info("Stopping %i users" % (user_count))209 else:210 sleep_time = 1.0 / stop_rate211 logger.info("Stopping %i users at rate of %g users/s" % (user_count, stop_rate))212 async_calls_to_stop = Group()213 stop_group = Group()214 while True:215 user_to_stop: User = to_stop.pop(random.randint(0, len(to_stop) - 1))216 logger.debug("Stopping %s" % user_to_stop._greenlet.name)217 if user_to_stop._greenlet is greenlet.getcurrent():218 # User called runner.quit(), so dont block waiting for killing to finish"219 user_to_stop._group.killone(user_to_stop._greenlet, block=False)220 elif self.environment.stop_timeout:221 async_calls_to_stop.add(gevent.spawn_later(0, user_to_stop.stop, force=False))222 stop_group.add(user_to_stop._greenlet)223 else:224 async_calls_to_stop.add(gevent.spawn_later(0, user_to_stop.stop, force=True))225 if to_stop:226 gevent.sleep(sleep_time)227 else:228 break229 async_calls_to_stop.join()230 if not stop_group.join(timeout=self.environment.stop_timeout):231 logger.info(232 "Not all users finished their tasks & terminated in %s seconds. Stopping them..."233 % self.environment.stop_timeout234 )235 stop_group.kill(block=True)236 logger.info("%i Users have been stopped, %g still running" % (user_count, len(self.user_greenlets)))237 def monitor_cpu(self):238 process = psutil.Process()239 while True:240 self.current_cpu_usage = process.cpu_percent()241 if self.current_cpu_usage > 90 and not self.cpu_warning_emitted:242 logging.warning(243 "CPU usage above 90%! This may constrain your throughput and may even give inconsistent response time measurements! 
See https://docs.locust.io/en/stable/running-locust-distributed.html for how to distribute the load over multiple CPU cores or machines"244 )245 self.cpu_warning_emitted = True246 gevent.sleep(CPU_MONITOR_INTERVAL)247 def start(self, user_count, spawn_rate, wait=False):248 """249 Start running a load test250 :param user_count: Total number of users to start251 :param spawn_rate: Number of users to spawn per second252 :param wait: If True calls to this method will block until all users are spawned.253 If False (the default), a greenlet that spawns the users will be254 started and the call to this method will return immediately.255 """256 if self.state != STATE_RUNNING and self.state != STATE_SPAWNING:257 self.stats.clear_all()258 self.exceptions = {}259 self.cpu_warning_emitted = False260 self.worker_cpu_warning_emitted = False261 self.target_user_count = user_count262 if self.state != STATE_INIT and self.state != STATE_STOPPED:263 logger.debug(264 "Updating running test with %d users, %.2f spawn rate and wait=%r" % (user_count, spawn_rate, wait)265 )266 self.update_state(STATE_SPAWNING)267 if self.user_count > user_count:268 # Stop some users269 stop_count = self.user_count - user_count270 self.stop_users(stop_count, spawn_rate)271 elif self.user_count < user_count:272 # Spawn some users273 spawn_count = user_count - self.user_count274 self.spawn_users(spawn_count=spawn_count, spawn_rate=spawn_rate)275 else:276 self.environment.events.spawning_complete.fire(user_count=self.user_count)277 else:278 self.spawn_rate = spawn_rate279 self.spawn_users(user_count, spawn_rate=spawn_rate, wait=wait)280 def start_shape(self):281 if self.shape_greenlet:282 logger.info("There is an ongoing shape test running. Editing is disabled")283 return284 logger.info("Shape test starting. 
User count and spawn rate are ignored for this type of load test")285 self.update_state(STATE_INIT)286 self.shape_greenlet = self.greenlet.spawn(self.shape_worker)287 self.shape_greenlet.link_exception(greenlet_exception_handler)288 self.environment.shape_class.reset_time()289 def shape_worker(self):290 logger.info("Shape worker starting")291 while self.state == STATE_INIT or self.state == STATE_SPAWNING or self.state == STATE_RUNNING:292 new_state = self.environment.shape_class.tick()293 if new_state is None:294 logger.info("Shape test stopping")295 if self.environment.parsed_options and self.environment.parsed_options.headless:296 self.quit()297 else:298 self.stop()299 elif self.shape_last_state == new_state:300 gevent.sleep(1)301 else:302 user_count, spawn_rate = new_state303 logger.info("Shape test updating to %d users at %.2f spawn rate" % (user_count, spawn_rate))304 self.start(user_count=user_count, spawn_rate=spawn_rate)305 self.shape_last_state = new_state306 def stop(self):307 """308 Stop a running load test by stopping all running users309 """310 logger.debug("Stopping all users")311 self.update_state(STATE_CLEANUP)312 # if we are currently spawning users we need to kill the spawning greenlet first313 if self.spawning_greenlet and not self.spawning_greenlet.ready():314 self.spawning_greenlet.kill(block=True)315 self.stop_users(self.user_count)316 self.update_state(STATE_STOPPED)317 self.cpu_log_warning()318 def quit(self):319 """320 Stop any running load test and kill all greenlets for the runner321 """322 self.stop()323 self.greenlet.kill(block=True)324 def log_exception(self, node_id, msg, formatted_tb):325 key = hash(formatted_tb)326 row = self.exceptions.setdefault(key, {"count": 0, "msg": msg, "traceback": formatted_tb, "nodes": set()})327 row["count"] += 1328 row["nodes"].add(node_id)329 self.exceptions[key] = row330class LocalRunner(Runner):331 """332 Runner for running single process load test333 """334 def __init__(self, environment):335 """336 
:param environment: Environment instance337 """338 super().__init__(environment)339 # register listener thats logs the exception for the local runner340 def on_user_error(user_instance, exception, tb):341 formatted_tb = "".join(traceback.format_tb(tb))342 self.log_exception("local", str(exception), formatted_tb)343 self.environment.events.user_error.add_listener(on_user_error)344 def start(self, user_count, spawn_rate, wait=False):345 self.target_user_count = user_count346 if spawn_rate > 100:347 logger.warning(348 "Your selected spawn rate is very high (>100), and this is known to sometimes cause issues. Do you really need to ramp up that fast?"349 )350 if self.state != STATE_RUNNING and self.state != STATE_SPAWNING:351 # if we're not already running we'll fire the test_start event352 self.environment.events.test_start.fire(environment=self.environment)353 if self.spawning_greenlet:354 # kill existing spawning_greenlet before we start a new one355 self.spawning_greenlet.kill(block=True)356 self.spawning_greenlet = self.greenlet.spawn(357 lambda: super(LocalRunner, self).start(user_count, spawn_rate, wait=wait)358 )359 self.spawning_greenlet.link_exception(greenlet_exception_handler)360 def stop(self):361 if self.state == STATE_STOPPED:362 return363 super().stop()364 self.environment.events.test_stop.fire(environment=self.environment)365class DistributedRunner(Runner):366 def __init__(self, *args, **kwargs):367 super().__init__(*args, **kwargs)368 setup_distributed_stats_event_listeners(self.environment.events, self.stats)369class WorkerNode:370 def __init__(self, id, state=STATE_INIT, heartbeat_liveness=HEARTBEAT_LIVENESS):371 self.id = id372 self.state = state373 self.user_count = 0374 self.heartbeat = heartbeat_liveness375 self.cpu_usage = 0376 self.cpu_warning_emitted = False377class MasterRunner(DistributedRunner):378 """379 Runner used to run distributed load tests across multiple processes and/or machines.380 MasterRunner doesn't spawn any user greenlets 
itself. Instead it expects381 :class:`WorkerRunners <WorkerRunner>` to connect to it, which it will then direct382 to start and stop user greenlets. Stats sent back from the383 :class:`WorkerRunners <WorkerRunner>` will aggregated.384 """385 def __init__(self, environment, master_bind_host, master_bind_port):386 """387 :param environment: Environment instance388 :param master_bind_host: Host/interface to use for incoming worker connections389 :param master_bind_port: Port to use for incoming worker connections390 """391 super().__init__(environment)392 self.worker_cpu_warning_emitted = False393 self.master_bind_host = master_bind_host394 self.master_bind_port = master_bind_port395 class WorkerNodesDict(dict):396 def get_by_state(self, state):397 return [c for c in self.values() if c.state == state]398 @property399 def all(self):400 return self.values()401 @property402 def ready(self):403 return self.get_by_state(STATE_INIT)404 @property405 def spawning(self):406 return self.get_by_state(STATE_SPAWNING)407 @property408 def running(self):409 return self.get_by_state(STATE_RUNNING)410 @property411 def missing(self):412 return self.get_by_state(STATE_MISSING)413 self.clients = WorkerNodesDict()414 try:415 self.server = rpc.Server(master_bind_host, master_bind_port)416 except RPCError as e:417 if e.args[0] == "Socket bind failure: Address already in use":418 port_string = (419 master_bind_host + ":" + str(master_bind_port) if master_bind_host != "*" else str(master_bind_port)420 )421 logger.error(422 f"The Locust master port ({port_string}) was busy. Close any applications using that port - perhaps an old instance of Locust master is still running? 
({e.args[0]})"423 )424 sys.exit(1)425 else:426 raise427 self.greenlet.spawn(self.heartbeat_worker).link_exception(greenlet_exception_handler)428 self.greenlet.spawn(self.client_listener).link_exception(greenlet_exception_handler)429 # listener that gathers info on how many users the worker has spawned430 def on_worker_report(client_id, data):431 if client_id not in self.clients:432 logger.info("Discarded report from unrecognized worker %s", client_id)433 return434 self.clients[client_id].user_count = data["user_count"]435 self.environment.events.worker_report.add_listener(on_worker_report)436 # register listener that sends quit message to worker nodes437 def on_quitting(environment, **kw):438 self.quit()439 self.environment.events.quitting.add_listener(on_quitting)440 @property441 def user_count(self):442 return sum([c.user_count for c in self.clients.values()])443 def cpu_log_warning(self):444 warning_emitted = Runner.cpu_log_warning(self)445 if self.worker_cpu_warning_emitted:446 logger.warning("CPU usage threshold was exceeded on workers during the test!")447 warning_emitted = True448 return warning_emitted449 def start(self, user_count, spawn_rate):450 self.target_user_count = user_count451 num_workers = len(self.clients.ready) + len(self.clients.running) + len(self.clients.spawning)452 if not num_workers:453 logger.warning(454 "You are running in distributed mode but have no worker servers connected. "455 "Please connect workers prior to swarming."456 )457 return458 self.spawn_rate = spawn_rate...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run locust automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful