How to use the on_test_stopping method in Locust

Best Python code snippet using locust

test_runners.py

Source: test_runners.py (GitHub)

copy

Full Screen

...180 def my_task(self):181 pass182 environment = Environment(user_classes=[MyUser])183 @environment.events.test_stopping.add_listener184 def on_test_stopping(*_, **__):185 self.runner_stopping = True186 @environment.events.test_stop.add_listener187 def on_test_stop(*_, **__):188 self.runner_stopped = True189 runner = LocalRunner(environment)190 runner.start(user_count=3, spawn_rate=3, wait=False)191 self.assertFalse(self.runner_stopping)192 self.assertFalse(self.runner_stopped)193 runner.stop()194 self.assertTrue(self.runner_stopping)195 self.assertTrue(self.runner_stopped)196 def test_stop_event_quit(self):197 class MyUser(User):198 wait_time = constant(1)199 @task200 def my_task(self):201 pass202 environment = Environment(user_classes=[MyUser])203 @environment.events.test_stopping.add_listener204 def on_test_stopping(*_, **__):205 self.runner_stopping = True206 @environment.events.test_stop.add_listener207 def on_test_stop(*_, **__):208 self.runner_stopped = True209 runner = LocalRunner(environment)210 runner.start(user_count=3, spawn_rate=3, wait=False)211 self.assertFalse(self.runner_stopping)212 self.assertFalse(self.runner_stopped)213 runner.quit()214 self.assertTrue(self.runner_stopping)215 self.assertTrue(self.runner_stopped)216 def test_stop_event_stop_and_quit(self):217 class MyUser(User):218 wait_time = constant(1)219 @task220 def my_task(self):221 pass222 environment = Environment(user_classes=[MyUser])223 @environment.events.test_stopping.add_listener224 def on_test_stopping(*_, **__):225 self.runner_stopping = True226 @environment.events.test_stop.add_listener227 def on_test_stop(*_, **__):228 self.runner_stopped = True229 runner = LocalRunner(environment)230 runner.start(user_count=3, spawn_rate=3, wait=False)231 self.assertFalse(self.runner_stopping)232 self.assertFalse(self.runner_stopped)233 runner.stop()234 runner.quit()235 self.assertTrue(self.runner_stopping)236 self.assertTrue(self.runner_stopped)237 def test_stopping_event(self):238 
on_stop_called = [False]239 class MyUser(User):240 on_stop_called = False241 wait_time = constant(1)242 @task243 def my_task(self):244 pass245 def on_stop(self):246 MyUser.on_stop_called = True247 environment = Environment(user_classes=[MyUser])248 @environment.events.test_stopping.add_listener249 def on_test_stopping(*_, **__):250 on_stop_called[0] = MyUser.on_stop_called251 self.runner_stopping = True252 runner = LocalRunner(environment)253 runner.start(user_count=3, spawn_rate=3, wait=False)254 runner.quit()255 self.assertTrue(self.runner_stopping)256 self.assertFalse(on_stop_called[0])257 def test_change_user_count_during_spawning(self):258 class MyUser(User):259 wait_time = constant(1)260 @task261 def my_task(self):262 pass263 environment = Environment(user_classes=[MyUser])264 runner = LocalRunner(environment)265 runner.start(user_count=10, spawn_rate=5, wait=False)266 sleep(0.6)267 runner.start(user_count=5, spawn_rate=5, wait=False)268 runner.spawning_greenlet.join()269 self.assertEqual(5, len(runner.user_greenlets))270 runner.quit()271 def test_reset_stats(self):272 class MyUser(User):273 @task274 class task_set(TaskSet):275 @task276 def my_task(self):277 self.user.environment.events.request.fire(278 request_type="GET",279 name="/test",280 response_time=666,281 response_length=1337,282 exception=None,283 context={},284 )285 # Make sure each user only run this task once during the test286 sleep(30)287 environment = Environment(user_classes=[MyUser], reset_stats=True)288 runner = LocalRunner(environment)289 runner.start(user_count=6, spawn_rate=1, wait=False)290 sleep(3)291 self.assertGreaterEqual(runner.stats.get("/test", "GET").num_requests, 3)292 sleep(3.25)293 self.assertLessEqual(runner.stats.get("/test", "GET").num_requests, 1)294 runner.quit()295 def test_no_reset_stats(self):296 class MyUser(User):297 @task298 class task_set(TaskSet):299 @task300 def my_task(self):301 self.user.environment.events.request.fire(302 request_type="GET",303 
name="/test",304 response_time=666,305 response_length=1337,306 exception=None,307 context={},308 )309 sleep(2)310 environment = Environment(reset_stats=False, user_classes=[MyUser])311 runner = LocalRunner(environment)312 runner.start(user_count=6, spawn_rate=12, wait=False)313 sleep(0.25)314 self.assertGreaterEqual(runner.stats.get("/test", "GET").num_requests, 3)315 sleep(0.3)316 self.assertEqual(6, runner.stats.get("/test", "GET").num_requests)317 runner.quit()318 def test_runner_reference_on_environment(self):319 env = Environment()320 runner = env.create_local_runner()321 self.assertEqual(env, runner.environment)322 self.assertEqual(runner, env.runner)323 def test_users_can_call_runner_quit_without_deadlocking(self):324 class BaseUser(User):325 stop_triggered = False326 @task327 def trigger(self):328 self.environment.runner.quit()329 def on_stop(self):330 BaseUser.stop_triggered = True331 runner = Environment(user_classes=[BaseUser]).create_local_runner()332 users = runner.spawn_users({BaseUser.__name__: 1}, wait=False)333 self.assertEqual(1, len(users))334 timeout = gevent.Timeout(0.5)335 timeout.start()336 try:337 runner.greenlet.join()338 except gevent.Timeout:339 self.fail("Got Timeout exception, runner must have hung somehow.")340 finally:341 timeout.cancel()342 self.assertTrue(BaseUser.stop_triggered)343 def test_runner_quit_can_run_on_stop_for_multiple_users_concurrently(self):344 class BaseUser(User):345 stop_count = 0346 @task347 def trigger(self):348 pass349 def on_stop(self):350 gevent.sleep(0.1)351 BaseUser.stop_count += 1352 runner = Environment(user_classes=[BaseUser]).create_local_runner()353 users = runner.spawn_users({BaseUser.__name__: 10}, wait=False)354 self.assertEqual(10, len(users))355 timeout = gevent.Timeout(0.3)356 timeout.start()357 try:358 runner.quit()359 except gevent.Timeout:360 self.fail("Got Timeout exception, runner must have hung somehow.")361 finally:362 timeout.cancel()363 self.assertEqual(10, BaseUser.stop_count) # verify 
# that all users executed on_stop
def test_stop_users_with_spawn_rate(self):
    """
    The spawn rate does not have an effect on the rate at which the users are stopped.
    It is expected that the excess users will be stopped as soon as possible in parallel
    (while respecting the stop_timeout).

    The test ramps up to 10 users, then requests 2 users with a spawn rate of 4 and
    checks how long each transition took and the resulting ``runner.user_count``.
    """

    class MyUser(User):
        wait_time = constant(1)

        @task
        def my_task(self):
            pass

    environment = Environment(user_classes=[MyUser])
    runner = LocalRunner(environment)

    # Start load test, wait for users to start, then trigger ramp down
    ts = time.time()
    runner.start(10, 10, wait=False)
    runner.spawning_greenlet.join()
    delta = time.time() - ts
    self.assertTrue(
        0 <= delta <= 0.05, f"Expected user count to increase to 10 instantaneously, instead it took {delta:f}"
    )
    # This is the ramp-UP check: the message must talk about reaching 10 users,
    # not about decreasing to 2 (that wording belongs to the check further down).
    self.assertEqual(
        10, runner.user_count, f"User count has not increased correctly to 10, it is : {runner.user_count}"
    )

    ts = time.time()
    runner.start(2, 4, wait=False)
    runner.spawning_greenlet.join()
    delta = time.time() - ts
    self.assertTrue(0 <= delta <= 1.05, f"Expected user count to decrease to 2 in 1s, instead it took {delta:f}")
    self.assertEqual(
        2, runner.user_count, f"User count has not decreased correctly to 2, it is : {runner.user_count}"
    )

    # Shut the runner down like the sibling tests do, so the 2 remaining users
    # are not left running after this test returns.
    runner.quit()


def test_attributes_populated_when_calling_start(self):
    """
    Check that ``runner.user_classes_count`` is updated by each ``start()`` call:
    5/5 after ramping up to 10 users across two classes, 3/2 after ramping down to 5.
    """

    class MyUser1(User):
        wait_time = constant(0)

        @task
        def my_task(self):
            pass

    class MyUser2(User):
        wait_time = constant(0)

        @task
        def my_task(self):
            pass

    environment = Environment(user_classes=[MyUser1, MyUser2])
    runner = LocalRunner(environment)

    # Ramp up: 10 users split across the two user classes.
    runner.start(user_count=10, spawn_rate=5, wait=False)
    runner.spawning_greenlet.join()
    self.assertDictEqual({"MyUser1": 5, "MyUser2": 5}, runner.user_classes_count)

    # Ramp down: 5 users in total.
    runner.start(user_count=5, spawn_rate=5, wait=False)
    runner.spawning_greenlet.join()
    self.assertDictEqual({"MyUser1": 3, "MyUser2": 2}, runner.user_classes_count)

    runner.quit()
def test_user_classes_count(self):417 class MyUser1(User):418 wait_time = constant(0)419 @task420 def my_task(self):421 pass422 class MyUser2(User):423 wait_time = constant(0)424 @task425 def my_task(self):426 pass427 environment = Environment(user_classes=[MyUser1, MyUser2])428 runner = LocalRunner(environment)429 runner.start(user_count=10, spawn_rate=5, wait=False)430 runner.spawning_greenlet.join()431 self.assertDictEqual({"MyUser1": 5, "MyUser2": 5}, runner.user_classes_count)432 runner.start(user_count=5, spawn_rate=5, wait=False)433 runner.spawning_greenlet.join()434 self.assertDictEqual({"MyUser1": 3, "MyUser2": 2}, runner.user_classes_count)435 runner.quit()436 def test_host_class_attribute_from_web(self):437 """If host is left empty from the webUI, we should not use it"""438 class MyUser1(User):439 host = "https://host1.com"440 @task441 def my_task(self):442 pass443 class MyUser2(User):444 host = "https://host2.com"445 @task446 def my_task(self):447 pass448 opts = mocked_options()449 # If left empty on the web, we get an empty string as host450 opts.host = ""451 environment = create_environment([MyUser1, MyUser2], opts)452 runner = LocalRunner(environment)453 # Start the runner to trigger problematic code454 runner.start(user_count=2, spawn_rate=1, wait=False)455 runner.spawning_greenlet.join()456 # Make sure we did not overwrite the host variable457 self.assertEqual(MyUser1.host, "https://host1.com")458 self.assertEqual(MyUser2.host, "https://host2.com")459 runner.quit()460 def test_custom_message(self):461 class MyUser(User):462 wait_time = constant(1)463 @task464 def my_task(self):465 pass466 test_custom_msg = [False]467 test_custom_msg_data = [{}]468 def on_custom_msg(msg, **kw):469 test_custom_msg[0] = True470 test_custom_msg_data[0] = msg.data471 environment = Environment(user_classes=[MyUser])472 runner = LocalRunner(environment)473 runner.register_message("test_custom_msg", on_custom_msg)474 runner.send_message("test_custom_msg", {"test_data": 
123})475 self.assertTrue(test_custom_msg[0])476 self.assertEqual(123, test_custom_msg_data[0]["test_data"])477 def test_undefined_custom_message(self):478 class MyUser(User):479 wait_time = constant(1)480 @task481 def my_task(self):482 pass483 test_custom_msg = [False]484 def on_custom_msg(msg, **kw):485 test_custom_msg[0] = True486 environment = Environment(user_classes=[MyUser])487 runner = LocalRunner(environment)488 runner.register_message("test_custom_msg", on_custom_msg)489 runner.send_message("test_different_custom_msg")490 self.assertFalse(test_custom_msg[0])491 self.assertEqual(1, len(self.mocked_log.warning))492 msg = self.mocked_log.warning[0]493 self.assertIn("Unknown message type received", msg)494 def test_swarm_endpoint_is_non_blocking(self):495 class TestUser1(User):496 @task497 def my_task(self):498 gevent.sleep(600)499 class TestUser2(User):500 @task501 def my_task(self):502 gevent.sleep(600)503 stop_timeout = 0504 env = Environment(user_classes=[TestUser1, TestUser2], stop_timeout=stop_timeout)505 local_runner = env.create_local_runner()506 web_ui = env.create_web_ui("127.0.0.1", 0)507 gevent.sleep(0.1)508 ts = time.perf_counter()509 response = requests.post(510 f"http://127.0.0.1:{web_ui.server.server_port}/swarm",511 data={"user_count": 20, "spawn_rate": 5, "host": "https://localhost"},512 )513 self.assertEqual(200, response.status_code)514 self.assertTrue(0 <= time.perf_counter() - ts <= 1, "swarm endpoint is blocking")515 ts = time.perf_counter()516 while local_runner.state != STATE_RUNNING:517 self.assertTrue(time.perf_counter() - ts <= 4, local_runner.state)518 gevent.sleep(0.1)519 self.assertTrue(3 <= time.perf_counter() - ts <= 5)520 self.assertEqual(local_runner.user_count, 20)521 local_runner.stop()522 web_ui.stop()523 def test_can_call_stop_endpoint_if_currently_swarming(self):524 class TestUser1(User):525 @task526 def my_task(self):527 gevent.sleep(600)528 class TestUser2(User):529 @task530 def my_task(self):531 gevent.sleep(600)532 
stop_timeout = 5533 env = Environment(user_classes=[TestUser1, TestUser2], stop_timeout=stop_timeout)534 local_runner = env.create_local_runner()535 web_ui = env.create_web_ui("127.0.0.1", 0)536 gevent.sleep(0.1)537 ts = time.perf_counter()538 response = requests.post(539 f"http://127.0.0.1:{web_ui.server.server_port}/swarm",540 data={"user_count": 20, "spawn_rate": 1, "host": "https://localhost"},541 )542 self.assertEqual(200, response.status_code)543 self.assertTrue(0 <= time.perf_counter() - ts <= 1, "swarm endpoint is blocking")544 gevent.sleep(5)545 self.assertEqual(local_runner.state, STATE_SPAWNING)546 self.assertLessEqual(local_runner.user_count, 10)547 ts = time.perf_counter()548 response = requests.get(549 f"http://127.0.0.1:{web_ui.server.server_port}/stop",550 )551 self.assertEqual(200, response.status_code)552 self.assertTrue(stop_timeout <= time.perf_counter() - ts <= stop_timeout + 5, "stop endpoint took too long")553 ts = time.perf_counter()554 while local_runner.state != STATE_STOPPED:555 self.assertTrue(time.perf_counter() - ts <= 2)556 gevent.sleep(0.1)557 self.assertLessEqual(local_runner.user_count, 0)558 local_runner.stop()559 web_ui.stop()560 def test_target_user_count_is_set_before_ramp_up(self):561 """Test for https://github.com/locustio/locust/issues/1883"""562 class MyUser1(User):563 wait_time = constant(0)564 @task565 def my_task(self):566 pass567 environment = Environment(user_classes=[MyUser1])568 runner = LocalRunner(environment)569 test_start_event_fired = [False]570 @environment.events.test_start.add_listener571 def on_test_start(*args, **kwargs):572 test_start_event_fired[0] = True573 self.assertEqual(runner.target_user_count, 3)574 runner.start(user_count=3, spawn_rate=1, wait=False)575 gevent.sleep(1)576 self.assertEqual(runner.target_user_count, 3)577 self.assertEqual(runner.user_count, 1)578 # However, target_user_classes_count is only updated at the end of the ramp-up/ramp-down579 # due to the way it is implemented.580 
self.assertDictEqual({}, runner.target_user_classes_count)581 runner.spawning_greenlet.join()582 self.assertEqual(runner.target_user_count, 3)583 self.assertEqual(runner.user_count, 3)584 self.assertDictEqual({"MyUser1": 3}, runner.target_user_classes_count)585 runner.quit()586 self.assertTrue(test_start_event_fired[0])587 def test_stop_users_count(self):588 user_count = 10589 class BaseUser1(User):590 wait_time = constant(1)591 @task592 def task_a(self):593 pass594 class BaseUser2(BaseUser1):595 wait_time = constant(1)596 runner = Environment(user_classes=[BaseUser1, BaseUser2]).create_local_runner()597 runner.start(user_count=user_count, spawn_rate=10)598 sleep(1)599 self.assertEqual(user_count, runner.user_count)600 runner.stop()601 sleep(1)602 self.assertEqual(0, runner.user_count)603class TestMasterWorkerRunners(LocustTestCase):604 def test_distributed_integration_run(self):605 """606 Full integration test that starts both a MasterRunner and three WorkerRunner instances607 and makes sure that their stats is sent to the Master.608 """609 class TestUser(User):610 wait_time = constant(0.1)611 @task612 def incr_stats(self):613 self.environment.events.request.fire(614 request_type="GET",615 name="/",616 response_time=1337,617 response_length=666,618 exception=None,619 context={},620 )621 with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3):622 # start a Master runner623 master_env = Environment(user_classes=[TestUser])624 master = master_env.create_master_runner("*", 0)625 sleep(0)626 # start 3 Worker runners627 workers = []628 for i in range(3):629 worker_env = Environment(user_classes=[TestUser])630 worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)631 workers.append(worker)632 # give workers time to connect633 sleep(0.1)634 # issue start command that should trigger TestUsers to be spawned in the Workers635 master.start(6, spawn_rate=1000)636 sleep(0.1)637 # check that worker nodes have started locusts638 for worker in 
workers:639 self.assertEqual(2, worker.user_count)640 # give time for users to generate stats, and stats to be sent to master641 sleep(1)642 master.quit()643 # make sure users are killed644 for worker in workers:645 self.assertEqual(0, worker.user_count)646 # check that stats are present in master647 self.assertGreater(648 master_env.runner.stats.total.num_requests,649 20,650 "For some reason the master node's stats has not come in",651 )652 def test_distributed_rebalanced_integration_run(self):653 """654 Full integration test that starts both a MasterRunner and three WorkerRunner instances655 and makes sure that their stats is sent to the Master.656 """657 class TestUser(User):658 wait_time = constant(0.1)659 @task660 def incr_stats(self):661 self.environment.events.request.fire(662 request_type="GET",663 name="/",664 response_time=1337,665 response_length=666,666 exception=None,667 context={},668 )669 with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3), patch_env(670 "LOCUST_WAIT_FOR_WORKERS_REPORT_AFTER_RAMP_UP", "0.1"671 ):672 # start a Master runner673 options = parse_options(["--enable-rebalancing"])674 master_env = Environment(user_classes=[TestUser], parsed_options=options)675 master = master_env.create_master_runner("*", 0)676 sleep(0)677 # start 3 Worker runners678 workers = []679 def add_worker():680 worker_env = Environment(user_classes=[TestUser])681 worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)682 workers.append(worker)683 for i in range(3):684 add_worker()685 # give workers time to connect686 sleep(0.1)687 # issue start command that should trigger TestUsers to be spawned in the Workers688 master.start(6, spawn_rate=1000)689 sleep(0.1)690 # check that worker nodes have started locusts691 for worker in workers:692 self.assertEqual(2, worker.user_count)693 # give time for users to generate stats, and stats to be sent to master694 # Add 1 more workers (should be 4 now)695 add_worker()696 @retry(AssertionError, 
tries=10, delay=0.5)697 def check_rebalanced_true():698 for worker in workers:699 self.assertTrue(worker.user_count > 0)700 # Check that all workers have a user count > 0 at least701 check_rebalanced_true()702 # Add 2 more workers (should be 6 now)703 add_worker()704 add_worker()705 @retry(AssertionError, tries=10, delay=0.5)706 def check_rebalanced_equals():707 for worker in workers:708 self.assertEqual(1, worker.user_count)709 # Check that all workers have a user count = 1 now710 check_rebalanced_equals()711 # Simulate that some workers are missing by "killing" them abrutly712 for i in range(3):713 workers[i].greenlet.kill(block=True)714 @retry(AssertionError, tries=10, delay=1)715 def check_master_worker_missing_count():716 self.assertEqual(3, len(master.clients.missing))717 # Check that master detected the missing workers718 check_master_worker_missing_count()719 @retry(AssertionError, tries=10, delay=1)720 def check_remaing_worker_new_user_count():721 for i in range(3, 6):722 self.assertEqual(2, workers[i].user_count)723 # Check that remaining workers have a new count of user due to rebalancing.724 check_remaing_worker_new_user_count()725 sleep(1)726 # Finally quit and check states of remaining workers.727 master.quit()728 # make sure users are killed on remaining workers729 for i in range(3, 6):730 self.assertEqual(0, workers[i].user_count)731 # check that stats are present in master732 self.assertGreater(733 master_env.runner.stats.total.num_requests,734 20,735 "For some reason the master node's stats has not come in",736 )737 def test_distributed_run_with_custom_args(self):738 """739 Full integration test that starts both a MasterRunner and three WorkerRunner instances740 and makes sure that their stats is sent to the Master.741 """742 class TestUser(User):743 wait_time = constant(0.1)744 @task745 def incr_stats(self):746 self.environment.events.request.fire(747 request_type="GET",748 name=self.environment.parsed_options.my_str_argument,749 
response_time=self.environment.parsed_options.my_int_argument,750 response_length=666,751 exception=None,752 context={},753 )754 @locust.events.init_command_line_parser.add_listener755 def _(parser, **kw):756 parser.add_argument("--my-int-argument", type=int)757 parser.add_argument("--my-str-argument", type=str, default="NOOOO")758 with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3):759 # start a Master runner760 master_env = Environment(user_classes=[TestUser])761 master = master_env.create_master_runner("*", 0)762 master_env.parsed_options = parse_options(763 [764 "--my-int-argument",765 "42",766 "--my-str-argument",767 "cool-string",768 ]769 )770 sleep(0)771 # start 3 Worker runners772 workers = []773 for i in range(3):774 worker_env = Environment(user_classes=[TestUser])775 worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)776 workers.append(worker)777 # give workers time to connect778 sleep(0.1)779 # issue start command that should trigger TestUsers to be spawned in the Workers780 master.start(6, spawn_rate=1000)781 sleep(0.1)782 # check that worker nodes have started locusts783 for worker in workers:784 self.assertEqual(2, worker.user_count)785 # give time for users to generate stats, and stats to be sent to master786 sleep(1)787 master.quit()788 # make sure users are killed789 for worker in workers:790 self.assertEqual(0, worker.user_count)791 self.assertEqual(master_env.runner.stats.total.max_response_time, 42)792 self.assertEqual(master_env.runner.stats.get("cool-string", "GET").avg_response_time, 42)793 def test_test_stop_event(self):794 class TestUser(User):795 wait_time = constant(0.1)796 @task797 def my_task(l):798 pass799 with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3):800 # start a Master runner801 master_env = Environment(user_classes=[TestUser])802 test_stop_count = {"master": 0, "worker": 0}803 @master_env.events.test_stop.add_listener804 def _(*args, **kwargs):805 test_stop_count["master"] += 
1806 master = master_env.create_master_runner("*", 0)807 sleep(0)808 # start a Worker runner809 worker_env = Environment(user_classes=[TestUser])810 @worker_env.events.test_stop.add_listener811 def _(*args, **kwargs):812 test_stop_count["worker"] += 1813 worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)814 # give worker time to connect815 sleep(0.1)816 # issue start command that should trigger TestUsers to be spawned in the Workers817 master.start(2, spawn_rate=1000)818 sleep(0.1)819 # check that worker nodes have started locusts820 self.assertEqual(2, worker.user_count)821 # give time for users to generate stats, and stats to be sent to master822 sleep(0.1)823 master_env.events.quitting.fire(environment=master_env, reverse=True)824 master.quit()825 sleep(0.1)826 # make sure users are killed827 self.assertEqual(0, worker.user_count)828 # check the test_stop event was called one time in master and one time in worker829 self.assertEqual(830 1,831 test_stop_count["master"],832 "The test_stop event was not called exactly one time in the master node",833 )834 self.assertEqual(835 1,836 test_stop_count["worker"],837 "The test_stop event was not called exactly one time in the worker node",838 )839 def test_distributed_shape(self):840 """841 Full integration test that starts both a MasterRunner and three WorkerRunner instances842 and tests a basic LoadTestShape with scaling up and down users843 """844 class TestUser(User):845 @task846 def my_task(self):847 pass848 class TestShape(LoadTestShape):849 def tick(self):850 run_time = self.get_run_time()851 if run_time < 2:852 return 9, 9853 elif run_time < 4:854 return 21, 21855 elif run_time < 6:856 return 3, 21857 else:858 return None859 with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3):860 test_shape = TestShape()861 master_env = Environment(user_classes=[TestUser], shape_class=test_shape)862 master_env.shape_class.reset_time()863 master = master_env.create_master_runner("*", 0)864 
workers = []865 for i in range(3):866 worker_env = Environment(user_classes=[TestUser])867 worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)868 workers.append(worker)869 # Give workers time to connect870 sleep(0.1)871 # Start a shape test872 master.start_shape()873 sleep(1)874 # Ensure workers have connected and started the correct amount of users875 for worker in workers:876 self.assertEqual(3, worker.user_count, "Shape test has not reached stage 1")877 self.assertEqual(878 9, test_shape.get_current_user_count(), "Shape is not seeing stage 1 runner user count correctly"879 )880 self.assertDictEqual(master.reported_user_classes_count, {"TestUser": 9})881 # Ensure new stage with more users has been reached882 sleep(2)883 for worker in workers:884 self.assertEqual(7, worker.user_count, "Shape test has not reached stage 2")885 self.assertEqual(886 21, test_shape.get_current_user_count(), "Shape is not seeing stage 2 runner user count correctly"887 )888 self.assertDictEqual(master.reported_user_classes_count, {"TestUser": 21})889 # Ensure new stage with less users has been reached890 sleep(2)891 for worker in workers:892 self.assertEqual(1, worker.user_count, "Shape test has not reached stage 3")893 self.assertEqual(894 3, test_shape.get_current_user_count(), "Shape is not seeing stage 3 runner user count correctly"895 )896 self.assertDictEqual(master.reported_user_classes_count, {"TestUser": 3})897 # Ensure test stops at the end898 sleep(2)899 for worker in workers:900 self.assertEqual(0, worker.user_count, "Shape test has not stopped")901 self.assertEqual(902 0, test_shape.get_current_user_count(), "Shape is not seeing stopped runner user count correctly"903 )904 self.assertDictEqual(master.reported_user_classes_count, {"TestUser": 0})905 self.assertEqual("stopped", master.state)906 def test_distributed_shape_with_fixed_users(self):907 """908 Full integration test that starts both a MasterRunner and three WorkerRunner instances909 and tests a 
basic LoadTestShape with scaling up and down users with 'fixed count' users910 """911 class TestUser(User):912 @task913 def my_task(self):914 pass915 class FixedUser1(User):916 fixed_count = 1917 @task918 def my_task(self):919 pass920 class FixedUser2(User):921 fixed_count = 11922 @task923 def my_task(self):924 pass925 class TestShape(LoadTestShape):926 def tick(self):927 run_time = self.get_run_time()928 if run_time < 1:929 return 12, 12930 elif run_time < 2:931 return 36, 24932 elif run_time < 3:933 return 12, 24934 else:935 return None936 with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3):937 test_shape = TestShape()938 master_env = Environment(user_classes=[TestUser, FixedUser1, FixedUser2], shape_class=test_shape)939 master_env.shape_class.reset_time()940 master = master_env.create_master_runner("*", 0)941 workers = []942 for _ in range(3):943 worker_env = Environment(user_classes=[TestUser, FixedUser1, FixedUser2])944 worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)945 workers.append(worker)946 # Give workers time to connect947 sleep(0.1)948 # Start a shape test949 master.start_shape()950 sleep(1)951 # Ensure workers have connected and started the correct amount of users (fixed is spawn first)952 for worker in workers:953 self.assertEqual(4, worker.user_count, "Shape test has not reached stage 1")954 self.assertEqual(955 12, test_shape.get_current_user_count(), "Shape is not seeing stage 1 runner user count correctly"956 )957 self.assertDictEqual(master.reported_user_classes_count, {"FixedUser1": 1, "FixedUser2": 11, "TestUser": 0})958 # Ensure new stage with more users has been reached959 sleep(1)960 for worker in workers:961 self.assertEqual(12, worker.user_count, "Shape test has not reached stage 2")962 self.assertEqual(963 36, test_shape.get_current_user_count(), "Shape is not seeing stage 2 runner user count correctly"964 )965 self.assertDictEqual(966 master.reported_user_classes_count, {"FixedUser1": 1, 
"FixedUser2": 11, "TestUser": 24}967 )968 # Ensure new stage with less users has been reached969 # and expected count of the fixed users is present970 sleep(1)971 for worker in workers:972 self.assertEqual(4, worker.user_count, "Shape test has not reached stage 3")973 self.assertEqual(974 12, test_shape.get_current_user_count(), "Shape is not seeing stage 3 runner user count correctly"975 )976 self.assertDictEqual(master.reported_user_classes_count, {"FixedUser1": 1, "FixedUser2": 11, "TestUser": 0})977 # Ensure test stops at the end978 sleep(0.5)979 for worker in workers:980 self.assertEqual(0, worker.user_count, "Shape test has not stopped")981 self.assertEqual(982 0, test_shape.get_current_user_count(), "Shape is not seeing stopped runner user count correctly"983 )984 self.assertDictEqual(master.reported_user_classes_count, {"FixedUser1": 0, "FixedUser2": 0, "TestUser": 0})985 try:986 with gevent.Timeout(3.0):987 while master.state != STATE_STOPPED:988 sleep(0.1)989 finally:990 self.assertEqual(STATE_STOPPED, master.state)991 def test_distributed_shape_with_stop_timeout(self):992 """993 Full integration test that starts both a MasterRunner and five WorkerRunner instances994 and tests a basic LoadTestShape with scaling up and down users995 """996 class TestUser1(User):997 def start(self, group: Group):998 gevent.sleep(0.5)999 return super().start(group)1000 @task1001 def my_task(self):1002 gevent.sleep(0)1003 class TestUser2(User):1004 def start(self, group: Group):1005 gevent.sleep(0.5)1006 return super().start(group)1007 @task1008 def my_task(self):1009 gevent.sleep(600)1010 class TestUser3(User):1011 def start(self, group: Group):1012 gevent.sleep(0.5)1013 return super().start(group)1014 @task1015 def my_task(self):1016 gevent.sleep(600)1017 class TestShape(LoadTestShape):1018 def tick(self):1019 run_time = self.get_run_time()1020 if run_time < 10:1021 return 15, 31022 elif run_time < 30:1023 return 5, 101024 else:1025 return None1026 
locust_worker_additional_wait_before_ready_after_stop = 51027 with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3), patch_env(1028 "LOCUST_WORKER_ADDITIONAL_WAIT_BEFORE_READY_AFTER_STOP",1029 str(locust_worker_additional_wait_before_ready_after_stop),1030 ):1031 stop_timeout = 51032 master_env = Environment(1033 user_classes=[TestUser1, TestUser2, TestUser3], shape_class=TestShape(), stop_timeout=stop_timeout1034 )1035 master_env.shape_class.reset_time()1036 master = master_env.create_master_runner("*", 0)1037 workers = []1038 for i in range(5):1039 worker_env = Environment(user_classes=[TestUser1, TestUser2, TestUser3])1040 worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)1041 workers.append(worker)1042 # Give workers time to connect1043 sleep(0.1)1044 self.assertEqual(STATE_INIT, master.state)1045 self.assertEqual(5, len(master.clients.ready))1046 # Re-order `workers` so that it is sorted by `id`.1047 # This is required because the dispatch is done1048 # on the sorted workers.1049 workers = sorted(workers, key=lambda w: w.client_id)1050 # Start a shape test1051 master.start_shape()1052 # First stage1053 ts = time.time()1054 while master.state != STATE_SPAWNING:1055 self.assertTrue(time.time() - ts <= 1, master.state)1056 sleep()1057 sleep(5 - (time.time() - ts)) # runtime = 5s1058 ts = time.time()1059 while master.state != STATE_RUNNING:1060 self.assertTrue(time.time() - ts <= 1, master.state)1061 sleep()1062 self.assertEqual(STATE_RUNNING, master.state)1063 w1 = {"TestUser1": 1, "TestUser2": 1, "TestUser3": 1}1064 w2 = {"TestUser1": 1, "TestUser2": 1, "TestUser3": 1}1065 w3 = {"TestUser1": 1, "TestUser2": 1, "TestUser3": 1}1066 w4 = {"TestUser1": 1, "TestUser2": 1, "TestUser3": 1}1067 w5 = {"TestUser1": 1, "TestUser2": 1, "TestUser3": 1}1068 self.assertDictEqual(w1, workers[0].user_classes_count)1069 self.assertDictEqual(w2, workers[1].user_classes_count)1070 self.assertDictEqual(w3, workers[2].user_classes_count)1071 
self.assertDictEqual(w4, workers[3].user_classes_count)1072 self.assertDictEqual(w5, workers[4].user_classes_count)1073 self.assertDictEqual(w1, master.clients[workers[0].client_id].user_classes_count)1074 self.assertDictEqual(w2, master.clients[workers[1].client_id].user_classes_count)1075 self.assertDictEqual(w3, master.clients[workers[2].client_id].user_classes_count)1076 self.assertDictEqual(w4, master.clients[workers[3].client_id].user_classes_count)1077 self.assertDictEqual(w5, master.clients[workers[4].client_id].user_classes_count)1078 sleep(5 - (time.time() - ts)) # runtime = 10s1079 # Fourth stage1080 ts = time.time()1081 while master.state != STATE_SPAWNING:1082 self.assertTrue(time.time() - ts <= 1, master.state)1083 sleep()1084 sleep(5 - (time.time() - ts)) # runtime = 15s1085 # Fourth stage - Excess TestUser1 have been stopped but1086 # TestUser2/TestUser3 have not reached stop timeout yet, so1087 # their number are unchanged1088 ts = time.time()1089 while master.state != STATE_RUNNING:1090 self.assertTrue(time.time() - ts <= 1, master.state)1091 sleep()1092 delta = time.time() - ts1093 w1 = {"TestUser1": 1, "TestUser2": 1, "TestUser3": 1}1094 w2 = {"TestUser1": 0, "TestUser2": 1, "TestUser3": 1}1095 w3 = {"TestUser1": 0, "TestUser2": 1, "TestUser3": 1}1096 w4 = {"TestUser1": 1, "TestUser2": 1, "TestUser3": 1}1097 w5 = {"TestUser1": 0, "TestUser2": 1, "TestUser3": 1}1098 self.assertDictEqual(w1, workers[0].user_classes_count)1099 self.assertDictEqual(w2, workers[1].user_classes_count)1100 self.assertDictEqual(w3, workers[2].user_classes_count)1101 self.assertDictEqual(w4, workers[3].user_classes_count)1102 self.assertDictEqual(w5, workers[4].user_classes_count)1103 self.assertDictEqual(w1, master.clients[workers[0].client_id].user_classes_count)1104 self.assertDictEqual(w2, master.clients[workers[1].client_id].user_classes_count)1105 self.assertDictEqual(w3, master.clients[workers[2].client_id].user_classes_count)1106 self.assertDictEqual(w4, 
master.clients[workers[3].client_id].user_classes_count)1107 self.assertDictEqual(w5, master.clients[workers[4].client_id].user_classes_count)1108 sleep(1 - delta) # runtime = 16s1109 # Fourth stage - All users are now at the desired number1110 ts = time.time()1111 while master.state != STATE_RUNNING:1112 self.assertTrue(time.time() - ts <= 1, master.state)1113 sleep()1114 delta = time.time() - ts1115 w1 = {"TestUser1": 1, "TestUser2": 0, "TestUser3": 0}1116 w2 = {"TestUser1": 0, "TestUser2": 1, "TestUser3": 0}1117 w3 = {"TestUser1": 0, "TestUser2": 0, "TestUser3": 1}1118 w4 = {"TestUser1": 1, "TestUser2": 0, "TestUser3": 0}1119 w5 = {"TestUser1": 0, "TestUser2": 1, "TestUser3": 0}1120 self.assertDictEqual(w1, workers[0].user_classes_count)1121 self.assertDictEqual(w2, workers[1].user_classes_count)1122 self.assertDictEqual(w3, workers[2].user_classes_count)1123 self.assertDictEqual(w4, workers[3].user_classes_count)1124 self.assertDictEqual(w5, workers[4].user_classes_count)1125 self.assertDictEqual(w1, master.clients[workers[0].client_id].user_classes_count)1126 self.assertDictEqual(w2, master.clients[workers[1].client_id].user_classes_count)1127 self.assertDictEqual(w3, master.clients[workers[2].client_id].user_classes_count)1128 self.assertDictEqual(w4, master.clients[workers[3].client_id].user_classes_count)1129 self.assertDictEqual(w5, master.clients[workers[4].client_id].user_classes_count)1130 sleep(10 - delta) # runtime = 26s1131 # Sleep stop_timeout and make sure the test has stopped1132 sleep(5) # runtime = 31s1133 self.assertEqual(STATE_STOPPING, master.state)1134 sleep(stop_timeout) # runtime = 36s1135 # We wait for "stop_timeout" seconds to let the workers reconnect as "ready" with the master.1136 # The reason for waiting an additional "stop_timeout" when we already waited for "stop_timeout"1137 # above is that when a worker receives the stop message, it can take up to "stop_timeout"1138 # for the worker to send the "client_stopped" message then an 
additional "stop_timeout" seconds1139 # to send the "client_ready" message.1140 ts = time.time()1141 while len(master.clients.ready) != len(workers):1142 self.assertTrue(1143 time.time() - ts <= stop_timeout + locust_worker_additional_wait_before_ready_after_stop,1144 f"expected {len(workers)} workers to be ready but only {len(master.clients.ready)} workers are",1145 )1146 sleep()1147 sleep(1)1148 # Check that no users are running1149 w1 = {"TestUser1": 0, "TestUser2": 0, "TestUser3": 0}1150 w2 = {"TestUser1": 0, "TestUser2": 0, "TestUser3": 0}1151 w3 = {"TestUser1": 0, "TestUser2": 0, "TestUser3": 0}1152 w4 = {"TestUser1": 0, "TestUser2": 0, "TestUser3": 0}1153 w5 = {"TestUser1": 0, "TestUser2": 0, "TestUser3": 0}1154 self.assertDictEqual(w1, workers[0].user_classes_count)1155 self.assertDictEqual(w2, workers[1].user_classes_count)1156 self.assertDictEqual(w3, workers[2].user_classes_count)1157 self.assertDictEqual(w4, workers[3].user_classes_count)1158 self.assertDictEqual(w5, workers[4].user_classes_count)1159 self.assertDictEqual(w1, master.clients[workers[0].client_id].user_classes_count)1160 self.assertDictEqual(w2, master.clients[workers[1].client_id].user_classes_count)1161 self.assertDictEqual(w3, master.clients[workers[2].client_id].user_classes_count)1162 self.assertDictEqual(w4, master.clients[workers[3].client_id].user_classes_count)1163 self.assertDictEqual(w5, master.clients[workers[4].client_id].user_classes_count)1164 ts = time.time()1165 while master.state != STATE_STOPPED:1166 self.assertTrue(time.time() - ts <= 5, master.state)1167 sleep()1168 master.stop()1169 @unittest.skip(reason="takes a lot of time and has randomness to it")1170 def test_distributed_shape_fuzzy_test(self):1171 """1172 Incredibility useful test to find issues with dispatch logic. 
This test allowed to find1173 multiple small corner cases with the new dispatch logic of locust v2.1174 The test is disabled by default because it takes a lot of time to run and has randomness to it.1175 However, it is advised to run it a few times (you can run it in parallel) when modifying the dispatch logic.1176 """1177 class BaseUser(User):1178 @task1179 def my_task(self):1180 gevent.sleep(600)1181 class TestUser01(BaseUser):1182 pass1183 class TestUser02(BaseUser):1184 pass1185 class TestUser03(BaseUser):1186 pass1187 class TestUser04(BaseUser):1188 pass1189 class TestUser05(BaseUser):1190 pass1191 class TestUser06(BaseUser):1192 pass1193 class TestUser07(BaseUser):1194 pass1195 class TestUser08(BaseUser):1196 pass1197 class TestUser09(BaseUser):1198 pass1199 class TestUser10(BaseUser):1200 pass1201 class TestUser11(BaseUser):1202 pass1203 class TestUser12(BaseUser):1204 pass1205 class TestUser13(BaseUser):1206 pass1207 class TestUser14(BaseUser):1208 pass1209 class TestUser15(BaseUser):1210 pass1211 class TestShape(LoadTestShape):1212 def __init__(self):1213 super().__init__()1214 self.stages = []1215 runtime = 01216 for _ in range(100):1217 runtime += random.uniform(3, 15)1218 self.stages.append((runtime, random.randint(1, 100), random.uniform(0.1, 10)))1219 def tick(self):1220 run_time = self.get_run_time()1221 for stage in self.stages:1222 if run_time < stage[0]:1223 return stage[1], stage[2]1224 user_classes = [1225 TestUser01,1226 TestUser02,1227 TestUser03,1228 TestUser04,1229 TestUser05,1230 TestUser06,1231 TestUser07,1232 TestUser08,1233 TestUser09,1234 TestUser10,1235 TestUser11,1236 TestUser12,1237 TestUser13,1238 TestUser14,1239 TestUser15,1240 ]1241 chosen_user_classes = random.sample(user_classes, k=random.randint(1, len(user_classes)))1242 for user_class in chosen_user_classes:1243 user_class.weight = random.uniform(1, 20)1244 locust_worker_additional_wait_before_ready_after_stop = 51245 with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", 
new=0.3), patch_env(1246 "LOCUST_WORKER_ADDITIONAL_WAIT_BEFORE_READY_AFTER_STOP",1247 str(locust_worker_additional_wait_before_ready_after_stop),1248 ):1249 stop_timeout = 51250 master_env = Environment(1251 user_classes=chosen_user_classes, shape_class=TestShape(), stop_timeout=stop_timeout1252 )1253 master_env.shape_class.reset_time()1254 master = master_env.create_master_runner("*", 0)1255 workers = []1256 for i in range(random.randint(1, 30)):1257 worker_env = Environment(user_classes=chosen_user_classes)1258 worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)1259 workers.append(worker)1260 # Give workers time to connect1261 sleep(0.1)1262 self.assertEqual(STATE_INIT, master.state)1263 self.assertEqual(len(workers), len(master.clients.ready))1264 # Start a shape test1265 master.start_shape()1266 ts = time.time()1267 while master.state != STATE_STOPPED:1268 self.assertTrue(time.time() - ts <= master_env.shape_class.stages[-1][0] + 60, master.state)1269 print(1270 "{:.2f}/{:.2f} | {} | {:.0f} | ".format(1271 time.time() - ts,1272 master_env.shape_class.stages[-1][0],1273 master.state,1274 sum(master.reported_user_classes_count.values()),1275 )1276 + json.dumps(dict(sorted(master.reported_user_classes_count.items(), key=itemgetter(0))))1277 )1278 sleep(1)1279 master.stop()1280 def test_distributed_shape_stop_and_restart(self):1281 """1282 Test stopping and then restarting a LoadTestShape1283 """1284 class TestUser(User):1285 @task1286 def my_task(self):1287 pass1288 class TestShape(LoadTestShape):1289 def tick(self):1290 run_time = self.get_run_time()1291 if run_time < 10:1292 return 4, 41293 else:1294 return None1295 with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3):1296 master_env = Environment(user_classes=[TestUser], shape_class=TestShape())1297 master_env.shape_class.reset_time()1298 master = master_env.create_master_runner("*", 0)1299 workers = []1300 for i in range(2):1301 worker_env = 
Environment(user_classes=[TestUser])1302 worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)1303 workers.append(worker)1304 # Give workers time to connect1305 sleep(0.1)1306 # Start a shape test and ensure workers have connected and started the correct amount of users1307 master.start_shape()1308 sleep(1)1309 for worker in workers:1310 self.assertEqual(2, worker.user_count, "Shape test has not started correctly")1311 # Stop the test and ensure all user count is 01312 master.stop()1313 sleep(1)1314 for worker in workers:1315 self.assertEqual(0, worker.user_count, "Shape test has not stopped")1316 # Then restart the test again and ensure workers have connected and started the correct amount of users1317 master.start_shape()1318 sleep(1)1319 for worker in workers:1320 self.assertEqual(2, worker.user_count, "Shape test has not started again correctly")1321 master.stop()1322 def test_distributed_stop_with_stopping_state(self):1323 """1324 Test stopping state when workers have stopped and now ready for a next test1325 """1326 class TestUser(User):1327 @task1328 def my_task(self):1329 pass1330 with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3):1331 master_env = Environment(user_classes=[TestUser])1332 master = master_env.create_master_runner("*", 0)1333 workers = []1334 for i in range(3):1335 worker_env = Environment(user_classes=[TestUser])1336 worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)1337 workers.append(worker)1338 for worker in workers:1339 worker.send_message("client_stopped", None)1340 sleep(1)1341 for worker in workers:1342 self.assertEqual(STATE_INIT, worker.state, "Worker sent a client_stopped, should be ready once stopped")1343 self.assertEqual(STATE_STOPPED, master.state)1344 def test_distributed_shape_statuses_transition(self):1345 """1346 Full integration test that starts both a MasterRunner and five WorkerRunner instances1347 The goal of this test is to validate the status on the master is 
correctly transitioned for each of the1348 test phases.1349 """1350 class TestUser1(User):1351 @task1352 def my_task(self):1353 gevent.sleep(600)1354 class TestShape(LoadTestShape):1355 def tick(self):1356 run_time = self.get_run_time()1357 if run_time < 5:1358 return 5, 2.51359 elif run_time < 10:1360 return 10, 2.51361 elif run_time < 15:1362 return 15, 2.51363 else:1364 return None1365 locust_worker_additional_wait_before_ready_after_stop = 21366 with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3), patch_env(1367 "LOCUST_WORKER_ADDITIONAL_WAIT_BEFORE_READY_AFTER_STOP",1368 str(locust_worker_additional_wait_before_ready_after_stop),1369 ):1370 stop_timeout = 01371 master_env = Environment(user_classes=[TestUser1], shape_class=TestShape(), stop_timeout=stop_timeout)1372 master_env.shape_class.reset_time()1373 master = master_env.create_master_runner("*", 0)1374 workers = []1375 for i in range(5):1376 worker_env = Environment(user_classes=[TestUser1])1377 worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)1378 workers.append(worker)1379 # Give workers time to connect1380 sleep(0.1)1381 self.assertEqual(STATE_INIT, master.state)1382 self.assertEqual(5, len(master.clients.ready))1383 statuses = []1384 ts = time.perf_counter()1385 master.start_shape()1386 while master.state != STATE_STOPPED:1387 # +5s buffer to let master stop1388 self.assertTrue(1389 time.perf_counter() - ts <= 30 + locust_worker_additional_wait_before_ready_after_stop + 5,1390 master.state,1391 )1392 statuses.append((time.perf_counter() - ts, master.state, master.user_count))1393 sleep(0.1)1394 self.assertEqual(statuses[0][1], STATE_INIT)1395 stage = 11396 tolerance = 1 # in s1397 for (t1, state1, user_count1), (t2, state2, user_count2) in zip(statuses[:-1], statuses[1:]):1398 if state1 == STATE_SPAWNING and state2 == STATE_RUNNING and stage == 1:1399 self.assertTrue(2.5 - tolerance <= t2 <= 2.5 + tolerance)1400 elif state1 == STATE_RUNNING and state2 == 
STATE_SPAWNING and stage == 1:1401 self.assertTrue(5 - tolerance <= t2 <= 5 + tolerance)1402 stage += 11403 elif state1 == STATE_SPAWNING and state2 == STATE_RUNNING and stage == 2:1404 self.assertTrue(7.5 - tolerance <= t2 <= 7.5 + tolerance)1405 elif state1 == STATE_RUNNING and state2 == STATE_SPAWNING and stage == 2:1406 self.assertTrue(10 - tolerance <= t2 <= 10 + tolerance)1407 stage += 11408 elif state1 == STATE_SPAWNING and state2 == STATE_RUNNING and stage == 3:1409 self.assertTrue(12.5 - tolerance <= t2 <= 12.5 + tolerance)1410 elif state1 == STATE_RUNNING and state2 == STATE_SPAWNING and stage == 3:1411 self.assertTrue(15 - tolerance <= t2 <= 15 + tolerance)1412 stage += 11413 elif state1 == STATE_RUNNING and state2 == STATE_STOPPED and stage == 3:1414 self.assertTrue(15 - tolerance <= t2 <= 15 + tolerance)1415 def test_swarm_endpoint_is_non_blocking(self):1416 class TestUser1(User):1417 @task1418 def my_task(self):1419 gevent.sleep(600)1420 class TestUser2(User):1421 @task1422 def my_task(self):1423 gevent.sleep(600)1424 with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3):1425 stop_timeout = 01426 master_env = Environment(user_classes=[TestUser1, TestUser2], stop_timeout=stop_timeout)1427 master = master_env.create_master_runner("*", 0)1428 web_ui = master_env.create_web_ui("127.0.0.1", 0)1429 workers = []1430 for i in range(2):1431 worker_env = Environment(user_classes=[TestUser1, TestUser2])1432 worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)1433 workers.append(worker)1434 # Give workers time to connect1435 sleep(0.1)1436 self.assertEqual(STATE_INIT, master.state)1437 self.assertEqual(len(master.clients.ready), len(workers))1438 ts = time.perf_counter()1439 response = requests.post(1440 f"http://127.0.0.1:{web_ui.server.server_port}/swarm",1441 data={"user_count": 20, "spawn_rate": 5, "host": "https://localhost"},1442 )1443 self.assertEqual(200, response.status_code)1444 self.assertTrue(0 <= time.perf_counter() - 
ts <= 1, "swarm endpoint is blocking")1445 ts = time.perf_counter()1446 while master.state != STATE_RUNNING:1447 self.assertTrue(time.perf_counter() - ts <= 4, master.state)1448 gevent.sleep(0.1)1449 self.assertTrue(3 <= time.perf_counter() - ts <= 5)1450 self.assertEqual(master.user_count, 20)1451 master.stop()1452 web_ui.stop()1453 def test_can_call_stop_endpoint_if_currently_swarming(self):1454 class TestUser1(User):1455 @task1456 def my_task(self):1457 gevent.sleep(600)1458 class TestUser2(User):1459 @task1460 def my_task(self):1461 gevent.sleep(600)1462 with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3):1463 stop_timeout = 51464 master_env = Environment(user_classes=[TestUser1, TestUser2], stop_timeout=stop_timeout)1465 master = master_env.create_master_runner("*", 0)1466 web_ui = master_env.create_web_ui("127.0.0.1", 0)1467 workers = []1468 for i in range(2):1469 worker_env = Environment(user_classes=[TestUser1, TestUser2])1470 worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)1471 workers.append(worker)1472 # Give workers time to connect1473 sleep(0.1)1474 self.assertEqual(STATE_INIT, master.state)1475 self.assertEqual(len(master.clients.ready), len(workers))1476 ts = time.perf_counter()1477 response = requests.post(1478 f"http://127.0.0.1:{web_ui.server.server_port}/swarm",1479 data={"user_count": 20, "spawn_rate": 1, "host": "https://localhost"},1480 )1481 self.assertEqual(200, response.status_code)1482 self.assertTrue(0 <= time.perf_counter() - ts <= 1, "swarm endpoint is blocking")1483 gevent.sleep(5)1484 self.assertEqual(master.state, STATE_SPAWNING)1485 self.assertLessEqual(master.user_count, 10)1486 ts = time.perf_counter()1487 response = requests.get(1488 f"http://127.0.0.1:{web_ui.server.server_port}/stop",1489 )1490 self.assertEqual(200, response.status_code)1491 self.assertTrue(stop_timeout <= time.perf_counter() - ts <= stop_timeout + 5, "stop endpoint took too long")1492 ts = time.perf_counter()1493 while 
master.state != STATE_STOPPED:1494 self.assertTrue(time.perf_counter() - ts <= 2)1495 gevent.sleep(0.1)1496 self.assertLessEqual(master.user_count, 0)1497 master.stop()1498 web_ui.stop()1499 def test_target_user_count_is_set_before_ramp_up(self):1500 """Test for https://github.com/locustio/locust/issues/1883"""1501 class MyUser1(User):1502 wait_time = constant(0)1503 @task1504 def my_task(self):1505 pass1506 with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3):1507 # start a Master runner1508 master_env = Environment(user_classes=[MyUser1])1509 master = master_env.create_master_runner("*", 0)1510 test_start_event_fired = [False]1511 @master_env.events.test_start.add_listener1512 def on_test_start(*args, **kwargs):1513 test_start_event_fired[0] = True1514 self.assertEqual(master.target_user_count, 3)1515 sleep(0)1516 # start 1 worker runner1517 worker_env = Environment(user_classes=[MyUser1])1518 worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)1519 # give worker time to connect1520 sleep(0.1)1521 gevent.spawn(master.start, 3, spawn_rate=1)1522 sleep(1)1523 self.assertEqual(master.target_user_count, 3)1524 self.assertEqual(master.user_count, 1)1525 # However, target_user_classes_count is only updated at the end of the ramp-up/ramp-down1526 # due to the way it is implemented.1527 self.assertDictEqual({}, master.target_user_classes_count)1528 sleep(2)1529 self.assertEqual(master.target_user_count, 3)1530 self.assertEqual(master.user_count, 3)1531 self.assertDictEqual({"MyUser1": 3}, master.target_user_classes_count)1532 master.quit()1533 # make sure users are killed1534 self.assertEqual(0, worker.user_count)1535 self.assertTrue(test_start_event_fired[0])1536 def test_long_running_test_start_is_run_to_completion_on_worker(self):1537 """Test for https://github.com/locustio/locust/issues/1986"""1538 class MyUser1(User):1539 wait_time = constant(0)1540 @task1541 def my_task(self):1542 pass1543 with 
mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3):1544 master_env = Environment(user_classes=[MyUser1])1545 master = master_env.create_master_runner("*", 0)1546 sleep(0)1547 # start 1 worker runner1548 worker_env = Environment(user_classes=[MyUser1])1549 worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)1550 test_start_exec_count = 01551 @worker_env.events.test_start.add_listener1552 def on_test_start(*_, **__):1553 nonlocal test_start_exec_count1554 test_start_exec_count += 11555 sleep(3)1556 # give worker time to connect1557 sleep(0.1)1558 gevent.spawn(master.start, 3, spawn_rate=1)1559 t0 = time.perf_counter()1560 while master.user_count != 3:1561 self.assertLessEqual(time.perf_counter() - t0, 5, "Expected 3 users to be spawned")1562 sleep(0.1)1563 master.quit()1564 # make sure users are killed1565 self.assertEqual(0, worker.user_count)1566 self.assertEqual(test_start_exec_count, 1)1567class TestMasterRunner(LocustRunnerTestCase):1568 def setUp(self):1569 super().setUp()1570 self.environment = Environment(events=locust.events, catch_exceptions=False)1571 def tearDown(self):1572 super().tearDown()1573 def get_runner(self, user_classes=None):1574 if user_classes is not None:1575 self.environment.user_classes = user_classes1576 return self.environment.create_master_runner("*", 5557)1577 def test_worker_connect(self):1578 with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:1579 master = self.get_runner()1580 server.mocked_send(Message("client_ready", __version__, "zeh_fake_client1"))1581 self.assertEqual(1, len(master.clients))1582 self.assertTrue(1583 "zeh_fake_client1" in master.clients, "Could not find fake client in master instance's clients dict"1584 )1585 server.mocked_send(Message("client_ready", __version__, "zeh_fake_client2"))1586 server.mocked_send(Message("client_ready", __version__, "zeh_fake_client3"))1587 server.mocked_send(Message("client_ready", __version__, "zeh_fake_client4"))1588 self.assertEqual(4, 
len(master.clients))1589 server.mocked_send(Message("quit", None, "zeh_fake_client3"))1590 self.assertEqual(3, len(master.clients))1591 def test_worker_connect_with_special_versions(self):1592 with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:1593 master = self.get_runner()1594 server.mocked_send(Message("client_ready", None, "1.x_style_client_should_not_be_allowed"))1595 self.assertEqual(1, len(self.mocked_log.error))1596 self.assertEqual(0, len(master.clients))1597 server.mocked_send(Message("client_ready", "abcd", "other_version_mismatch_should_just_give_a_warning"))1598 self.assertEqual(1, len(self.mocked_log.warning))1599 self.assertEqual(1, len(master.clients))1600 server.mocked_send(Message("client_ready", -1, "version_check_bypass_should_not_warn"))1601 self.assertEqual(1, len(self.mocked_log.warning))1602 self.assertEqual(2, len(master.clients))1603 server.mocked_send(1604 Message("client_ready", __version__ + "1", "difference_in_patch_version_should_not_warn")1605 )1606 self.assertEqual(3, len(master.clients))1607 self.assertEqual(1, len(self.mocked_log.warning))1608 def test_worker_stats_report_median(self):1609 with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:1610 master = self.get_runner()1611 server.mocked_send(Message("client_ready", __version__, "fake_client"))1612 master.stats.get("/", "GET").log(100, 23455)1613 master.stats.get("/", "GET").log(800, 23455)1614 master.stats.get("/", "GET").log(700, 23455)1615 data = {"user_count": 1}1616 self.environment.events.report_to_master.fire(client_id="fake_client", data=data)1617 master.stats.clear_all()1618 server.mocked_send(Message("stats", data, "fake_client"))1619 s = master.stats.get("/", "GET")1620 self.assertEqual(700, s.median_response_time)1621 def test_worker_stats_report_with_none_response_times(self):1622 with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:1623 master = self.get_runner()1624 server.mocked_send(Message("client_ready", __version__, 
"fake_client"))1625 master.stats.get("/mixed", "GET").log(0, 23455)1626 master.stats.get("/mixed", "GET").log(800, 23455)1627 master.stats.get("/mixed", "GET").log(700, 23455)1628 master.stats.get("/mixed", "GET").log(None, 23455)1629 master.stats.get("/mixed", "GET").log(None, 23455)1630 master.stats.get("/mixed", "GET").log(None, 23455)1631 master.stats.get("/mixed", "GET").log(None, 23455)1632 master.stats.get("/onlyNone", "GET").log(None, 23455)1633 data = {"user_count": 1}1634 self.environment.events.report_to_master.fire(client_id="fake_client", data=data)1635 master.stats.clear_all()1636 server.mocked_send(Message("stats", data, "fake_client"))1637 s1 = master.stats.get("/mixed", "GET")1638 self.assertEqual(700, s1.median_response_time)1639 self.assertEqual(500, s1.avg_response_time)1640 s2 = master.stats.get("/onlyNone", "GET")1641 self.assertEqual(0, s2.median_response_time)1642 self.assertEqual(0, s2.avg_response_time)1643 def test_master_marks_downed_workers_as_missing(self):1644 with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:1645 master = self.get_runner()1646 server.mocked_send(Message("client_ready", __version__, "fake_client"))1647 sleep(6)1648 # print(master.clients['fake_client'].__dict__)1649 assert master.clients["fake_client"].state == STATE_MISSING1650 def test_last_worker_quitting_stops_test(self):1651 class TestUser(User):1652 @task1653 def my_task(self):1654 gevent.sleep(600)1655 with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server, patch_env(1656 "LOCUST_WAIT_FOR_WORKERS_REPORT_AFTER_RAMP_UP", "0.1"1657 ):1658 master = self.get_runner(user_classes=[TestUser])1659 server.mocked_send(Message("client_ready", __version__, "fake_client1"))1660 server.mocked_send(Message("client_ready", __version__, "fake_client2"))1661 master.start(1, 2)1662 server.mocked_send(Message("spawning", None, "fake_client1"))1663 server.mocked_send(Message("spawning", None, "fake_client2"))1664 server.mocked_send(Message("quit", None, 
"fake_client1"))1665 sleep(0.1)1666 self.assertEqual(1, len(master.clients.all))1667 self.assertNotEqual(STATE_STOPPED, master.state, "Not all workers quit but test stopped anyway.")1668 server.mocked_send(Message("quit", None, "fake_client2"))1669 sleep(0.1)1670 self.assertEqual(0, len(master.clients.all))1671 self.assertEqual(STATE_STOPPED, master.state, "All workers quit but test didn't stop.")1672 @mock.patch("locust.runners.HEARTBEAT_INTERVAL", new=0.1)1673 def test_last_worker_missing_stops_test(self):1674 class TestUser(User):1675 @task1676 def my_task(self):1677 gevent.sleep(600)1678 with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server, patch_env(1679 "LOCUST_WAIT_FOR_WORKERS_REPORT_AFTER_RAMP_UP", "0.1"1680 ):1681 master = self.get_runner(user_classes=[TestUser])1682 server.mocked_send(Message("client_ready", __version__, "fake_client1"))1683 server.mocked_send(Message("client_ready", __version__, "fake_client2"))1684 server.mocked_send(Message("client_ready", __version__, "fake_client3"))1685 master.start(3, 3)1686 server.mocked_send(Message("spawning", None, "fake_client1"))1687 server.mocked_send(Message("spawning", None, "fake_client2"))1688 server.mocked_send(Message("spawning", None, "fake_client3"))1689 sleep(0.2)1690 server.mocked_send(1691 Message(1692 "heartbeat",1693 {"state": STATE_RUNNING, "current_cpu_usage": 50, "current_memory_usage": 200, "count": 1},1694 "fake_client1",1695 )1696 )1697 server.mocked_send(1698 Message(1699 "heartbeat",1700 {"state": STATE_RUNNING, "current_cpu_usage": 50, "current_memory_usage": 200, "count": 1},1701 "fake_client2",1702 )1703 )1704 server.mocked_send(1705 Message(1706 "heartbeat",1707 {"state": STATE_RUNNING, "current_cpu_usage": 50, "current_memory_usage": 200, "count": 1},1708 "fake_client3",1709 )1710 )1711 sleep(0.2)1712 self.assertEqual(0, len(master.clients.missing))1713 self.assertEqual(3, master.worker_count)1714 self.assertNotIn(1715 master.state, [STATE_STOPPED, STATE_STOPPING], "Not 
all workers went missing but test stopped anyway."1716 )1717 server.mocked_send(1718 Message(1719 "heartbeat",1720 {"state": STATE_RUNNING, "current_cpu_usage": 50, "current_memory_usage": 200, "count": 1},1721 "fake_client1",1722 )1723 )1724 sleep(0.4)1725 self.assertEqual(2, len(master.clients.missing))1726 self.assertEqual(1, master.worker_count)1727 self.assertNotIn(1728 master.state, [STATE_STOPPED, STATE_STOPPING], "Not all workers went missing but test stopped anyway."1729 )1730 sleep(0.2)1731 self.assertEqual(3, len(master.clients.missing))1732 self.assertEqual(0, master.worker_count)1733 self.assertEqual(STATE_STOPPED, master.state, "All workers went missing but test didn't stop.")1734 @mock.patch("locust.runners.HEARTBEAT_INTERVAL", new=0.1)1735 @mock.patch("locust.runners.HEARTBEAT_DEAD_INTERNAL", new=-3)1736 def test_worker_missing_after_heartbeat_dead_interval(self):1737 class TestUser(User):1738 @task1739 def my_task(self):1740 gevent.sleep(600)1741 with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server, patch_env(1742 "LOCUST_WAIT_FOR_WORKERS_REPORT_AFTER_RAMP_UP", "0.1"1743 ):1744 master = self.get_runner(user_classes=[TestUser])1745 server.mocked_send(Message("client_ready", __version__, "fake_client1"))1746 server.mocked_send(Message("client_ready", __version__, "fake_client2"))1747 server.mocked_send(Message("client_ready", __version__, "fake_client3"))1748 master.start(3, 3)1749 server.mocked_send(Message("spawning", None, "fake_client1"))1750 server.mocked_send(Message("spawning", None, "fake_client2"))1751 server.mocked_send(Message("spawning", None, "fake_client3"))1752 sleep(0.1)1753 server.mocked_send(1754 Message(1755 "heartbeat",1756 {"state": STATE_RUNNING, "current_cpu_usage": 50, "current_memory_usage": 200, "count": 1},1757 "fake_client1",1758 )1759 )1760 server.mocked_send(1761 Message(1762 "heartbeat",1763 {"state": STATE_RUNNING, "current_cpu_usage": 50, "current_memory_usage": 200, "count": 1},1764 "fake_client2",1765 
)1766 )1767 server.mocked_send(1768 Message(1769 "heartbeat",1770 {"state": STATE_RUNNING, "current_cpu_usage": 50, "current_memory_usage": 200, "count": 1},1771 "fake_client3",1772 )1773 )1774 sleep(0.1)1775 # initially all workers are in active state1776 self.assertEqual(0, len(master.clients.missing))1777 self.assertEqual(3, master.worker_count)1778 server.mocked_send(1779 Message(1780 "heartbeat",1781 {"state": STATE_RUNNING, "current_cpu_usage": 50, "current_memory_usage": 200, "count": 1},1782 "fake_client1",1783 )1784 )1785 server.mocked_send(1786 Message(1787 "heartbeat",1788 {"state": STATE_RUNNING, "current_cpu_usage": 50, "current_memory_usage": 200, "count": 1},1789 "fake_client2",1790 )1791 )1792 sleep(0.6)1793 # 4 intervals are passed since all 3 heart beats all workers are in missing state1794 self.assertEqual(3, len(master.clients.missing))1795 self.assertEqual(0, master.worker_count)1796 server.mocked_send(1797 Message(1798 "heartbeat",1799 {"state": STATE_RUNNING, "current_cpu_usage": 50, "current_memory_usage": 200, "count": 1},1800 "fake_client1",1801 )1802 )1803 server.mocked_send(1804 Message(1805 "heartbeat",1806 {"state": STATE_RUNNING, "current_cpu_usage": 50, "current_memory_usage": 200, "count": 1},1807 "fake_client2",1808 )1809 )1810 sleep(0.2)1811 # hearbeat received from two workers so they are active, for fake_client3 HEARTBEAT_DEAD_INTERNAL has been breached, so it will be removed from worker list1812 self.assertEqual(0, len(master.clients.missing))1813 self.assertEqual(2, master.worker_count)1814 master.stop()1815 def test_master_total_stats(self):1816 with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:1817 master = self.get_runner()1818 server.mocked_send(Message("client_ready", __version__, "fake_client"))1819 stats = RequestStats()1820 stats.log_request("GET", "/1", 100, 3546)1821 stats.log_request("GET", "/1", 800, 56743)1822 stats2 = RequestStats()1823 stats2.log_request("GET", "/2", 700, 2201)1824 
server.mocked_send(1825 Message(1826 "stats",1827 {1828 "stats": stats.serialize_stats(),1829 "stats_total": stats.total.serialize(),1830 "errors": stats.serialize_errors(),1831 "user_count": 1,1832 },1833 "fake_client",1834 )1835 )1836 server.mocked_send(1837 Message(1838 "stats",1839 {1840 "stats": stats2.serialize_stats(),1841 "stats_total": stats2.total.serialize(),1842 "errors": stats2.serialize_errors(),1843 "user_count": 2,1844 },1845 "fake_client",1846 )1847 )1848 self.assertEqual(700, master.stats.total.median_response_time)1849 def test_master_total_stats_with_none_response_times(self):1850 with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:1851 master = self.get_runner()1852 server.mocked_send(Message("client_ready", __version__, "fake_client"))1853 stats = RequestStats()1854 stats.log_request("GET", "/1", 100, 3546)1855 stats.log_request("GET", "/1", 800, 56743)1856 stats.log_request("GET", "/1", None, 56743)1857 stats2 = RequestStats()1858 stats2.log_request("GET", "/2", 700, 2201)1859 stats2.log_request("GET", "/2", None, 2201)1860 stats3 = RequestStats()1861 stats3.log_request("GET", "/3", None, 2201)1862 server.mocked_send(1863 Message(1864 "stats",1865 {1866 "stats": stats.serialize_stats(),1867 "stats_total": stats.total.serialize(),1868 "errors": stats.serialize_errors(),1869 "user_count": 1,1870 },1871 "fake_client",1872 )1873 )1874 server.mocked_send(1875 Message(1876 "stats",1877 {1878 "stats": stats2.serialize_stats(),1879 "stats_total": stats2.total.serialize(),1880 "errors": stats2.serialize_errors(),1881 "user_count": 2,1882 },1883 "fake_client",1884 )1885 )1886 server.mocked_send(1887 Message(1888 "stats",1889 {1890 "stats": stats3.serialize_stats(),1891 "stats_total": stats3.total.serialize(),1892 "errors": stats3.serialize_errors(),1893 "user_count": 2,1894 },1895 "fake_client",1896 )1897 )1898 self.assertEqual(700, master.stats.total.median_response_time)1899 def test_master_current_response_times(self):1900 start_time = 
11901 with mock.patch("time.time") as mocked_time:1902 mocked_time.return_value = start_time1903 with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:1904 master = self.get_runner()1905 self.environment.stats.reset_all()1906 mocked_time.return_value += 1.02341907 server.mocked_send(Message("client_ready", __version__, "fake_client"))1908 stats = RequestStats()1909 stats.log_request("GET", "/1", 100, 3546)1910 stats.log_request("GET", "/1", 800, 56743)1911 server.mocked_send(1912 Message(1913 "stats",1914 {1915 "stats": stats.serialize_stats(),1916 "stats_total": stats.total.get_stripped_report(),1917 "errors": stats.serialize_errors(),1918 "user_count": 1,1919 },1920 "fake_client",1921 )1922 )1923 mocked_time.return_value += 11924 stats2 = RequestStats()1925 stats2.log_request("GET", "/2", 400, 2201)1926 server.mocked_send(1927 Message(1928 "stats",1929 {1930 "stats": stats2.serialize_stats(),1931 "stats_total": stats2.total.get_stripped_report(),1932 "errors": stats2.serialize_errors(),1933 "user_count": 2,1934 },1935 "fake_client",1936 )1937 )1938 mocked_time.return_value += 41939 self.assertEqual(400, master.stats.total.get_current_response_time_percentile(0.5))1940 self.assertEqual(800, master.stats.total.get_current_response_time_percentile(0.95))1941 # let 10 second pass, do some more requests, send it to the master and make1942 # sure the current response time percentiles only accounts for these new requests1943 mocked_time.return_value += 10.100231944 stats.log_request("GET", "/1", 20, 1)1945 stats.log_request("GET", "/1", 30, 1)1946 stats.log_request("GET", "/1", 3000, 1)1947 server.mocked_send(1948 Message(1949 "stats",1950 {1951 "stats": stats.serialize_stats(),1952 "stats_total": stats.total.get_stripped_report(),1953 "errors": stats.serialize_errors(),1954 "user_count": 2,1955 },1956 "fake_client",1957 )1958 )1959 self.assertEqual(30, master.stats.total.get_current_response_time_percentile(0.5))1960 self.assertEqual(3000, 
master.stats.total.get_current_response_time_percentile(0.95))1961 @mock.patch("locust.runners.HEARTBEAT_INTERVAL", new=600)1962 def test_rebalance_locust_users_on_worker_connect(self):1963 class TestUser(User):1964 @task1965 def my_task(self):1966 pass1967 with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:1968 master = self.get_runner(user_classes=[TestUser])1969 server.mocked_send(Message("client_ready", __version__, "zeh_fake_client1"))1970 self.assertEqual(1, len(master.clients))1971 self.assertTrue(1972 "zeh_fake_client1" in master.clients, "Could not find fake client in master instance's clients dict"1973 )1974 master.start(100, 20)1975 self.assertEqual(6, len(server.outbox))1976 # First element of the outbox list is ack msg. That is why it is skipped in for loop1977 for i, (_, msg) in enumerate(server.outbox[1:].copy()):1978 self.assertDictEqual({"TestUser": int((i + 1) * 20)}, msg.data["user_classes_count"])1979 server.outbox.pop()1980 # Normally, this attribute would be updated when the1981 # master receives the report from the worker.1982 master.clients["zeh_fake_client1"].user_classes_count = {"TestUser": 100}1983 # let another worker connect1984 server.mocked_send(Message("client_ready", __version__, "zeh_fake_client2"))1985 self.assertEqual(2, len(master.clients))1986 sleep(0.1) # give time for messages to be sent to clients1987 self.assertEqual(4, len(server.outbox))1988 client_id, msg = server.outbox.pop()1989 self.assertEqual({"TestUser": 50}, msg.data["user_classes_count"])1990 client_id, msg = server.outbox.pop()1991 self.assertEqual({"TestUser": 50}, msg.data["user_classes_count"])1992 def test_sends_spawn_data_to_ready_running_spawning_workers(self):1993 """Sends spawn job to running, ready, or spawning workers"""1994 class TestUser(User):1995 @task1996 def my_task(self):1997 pass1998 with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:1999 master = self.get_runner(user_classes=[TestUser])2000 master.clients[1] = 
WorkerNode("1")2001 master.clients[2] = WorkerNode("2")2002 master.clients[3] = WorkerNode("3")2003 master.clients[1].state = STATE_INIT2004 master.clients[2].state = STATE_SPAWNING2005 master.clients[3].state = STATE_RUNNING2006 master.start(user_count=5, spawn_rate=5)2007 self.assertEqual(3, len(server.outbox))2008 def test_start_event(self):2009 """2010 Tests that test_start event is fired2011 """2012 class TestUser(User):2013 @task2014 def my_task(self):2015 pass2016 with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:2017 master = self.get_runner(user_classes=[TestUser])2018 run_count = [0]2019 @self.environment.events.test_start.add_listener2020 def on_test_start(*a, **kw):2021 run_count[0] += 12022 for i in range(5):2023 server.mocked_send(Message("client_ready", __version__, "fake_client%i" % i))2024 master.start(7, 7)2025 self.assertEqual(10, len(server.outbox))2026 self.assertEqual(1, run_count[0])2027 # change number of users and check that test_start isn't fired again2028 master.start(7, 7)2029 self.assertEqual(1, run_count[0])2030 # stop and start to make sure test_start is fired again2031 master.stop()2032 master.start(3, 3)2033 self.assertEqual(2, run_count[0])2034 master.quit()2035 def test_stop_event(self):2036 """2037 Tests that test_stop event is fired2038 """2039 class TestUser(User):2040 @task2041 def my_task(self):2042 pass2043 with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:2044 master = self.get_runner(user_classes=[TestUser])2045 @self.environment.events.test_stopping.add_listener2046 def on_test_stopping(*_, **__):2047 self.runner_stopping = True2048 @self.environment.events.test_stop.add_listener2049 def on_test_stop(*_, **__):2050 self.runner_stopped = True2051 for i in range(5):2052 server.mocked_send(Message("client_ready", __version__, "fake_client%i" % i))2053 master.start(7, 7)2054 self.assertEqual(10, len(server.outbox))2055 master.stop()2056 self.assertTrue(self.runner_stopping)2057 
self.assertTrue(self.runner_stopped)2058 self.reset_state()2059 for i in range(5):2060 server.mocked_send(Message("client_ready", __version__, "fake_client%i" % i))2061 master.start(7, 7)2062 master.stop()2063 master.quit()2064 self.assertTrue(self.runner_stopping)2065 self.assertTrue(self.runner_stopped)2066 def test_stop_event_quit(self):2067 """2068 Tests that test_stop event is fired when quit() is called directly2069 """2070 class TestUser(User):2071 @task2072 def my_task(self):2073 pass2074 with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:2075 master = self.get_runner(user_classes=[TestUser])2076 @self.environment.events.test_stopping.add_listener2077 def on_test_stopping(*_, **__):2078 self.runner_stopping = True2079 @self.environment.events.test_stop.add_listener2080 def on_test_stop(*_, **__):2081 self.runner_stopped = True2082 for i in range(5):2083 server.mocked_send(Message("client_ready", __version__, "fake_client%i" % i))2084 master.start(7, 7)2085 self.assertEqual(10, len(server.outbox))2086 master.quit()2087 self.assertTrue(self.runner_stopping)2088 self.assertTrue(self.runner_stopped)2089 def test_spawn_zero_locusts(self):2090 class MyTaskSet(TaskSet):2091 @task...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run locust automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful