How to use test_swarm_endpoint_is_non_blocking method in locust

Best Python code snippet using locust

test_runners.py

Source:test_runners.py Github

copy

Full Screen

...409 self.assertFalse(test_custom_msg[0])410 self.assertEqual(1, len(self.mocked_log.warning))411 msg = self.mocked_log.warning[0]412 self.assertIn("Unknown message type recieved", msg)413 def test_swarm_endpoint_is_non_blocking(self):414 class TestUser1(User):415 @task416 def my_task(self):417 gevent.sleep(600)418 class TestUser2(User):419 @task420 def my_task(self):421 gevent.sleep(600)422 stop_timeout = 0423 env = Environment(user_classes=[TestUser1, TestUser2], stop_timeout=stop_timeout)424 local_runner = env.create_local_runner()425 web_ui = env.create_web_ui("127.0.0.1", 0)426 gevent.sleep(0.1)427 ts = time.perf_counter()428 response = requests.post(429 "http://127.0.0.1:{}/swarm".format(web_ui.server.server_port),430 data={"user_count": 20, "spawn_rate": 5, "host": "https://localhost"},431 )432 self.assertEqual(200, response.status_code)433 self.assertTrue(0 <= time.perf_counter() - ts <= 1, "swarm endpoint is blocking")434 ts = time.perf_counter()435 while local_runner.state != STATE_RUNNING:436 self.assertTrue(time.perf_counter() - ts <= 4, local_runner.state)437 gevent.sleep(0.1)438 self.assertTrue(3 <= time.perf_counter() - ts <= 5)439 self.assertEqual(local_runner.user_count, 20)440 local_runner.stop()441 web_ui.stop()442 def test_can_call_stop_endpoint_if_currently_swarming(self):443 class TestUser1(User):444 @task445 def my_task(self):446 gevent.sleep(600)447 class TestUser2(User):448 @task449 def my_task(self):450 gevent.sleep(600)451 stop_timeout = 5452 env = Environment(user_classes=[TestUser1, TestUser2], stop_timeout=stop_timeout)453 local_runner = env.create_local_runner()454 web_ui = env.create_web_ui("127.0.0.1", 0)455 gevent.sleep(0.1)456 ts = time.perf_counter()457 response = requests.post(458 "http://127.0.0.1:{}/swarm".format(web_ui.server.server_port),459 data={"user_count": 20, "spawn_rate": 1, "host": "https://localhost"},460 )461 self.assertEqual(200, response.status_code)462 self.assertTrue(0 <= time.perf_counter() - ts <= 1, "swarm 
endpoint is blocking")463 gevent.sleep(5)464 self.assertEqual(local_runner.state, STATE_SPAWNING)465 self.assertLessEqual(local_runner.user_count, 10)466 ts = time.perf_counter()467 response = requests.get(468 "http://127.0.0.1:{}/stop".format(web_ui.server.server_port),469 )470 self.assertEqual(200, response.status_code)471 self.assertTrue(stop_timeout <= time.perf_counter() - ts <= stop_timeout + 5, "stop endpoint took too long")472 ts = time.perf_counter()473 while local_runner.state != STATE_STOPPED:474 self.assertTrue(time.perf_counter() - ts <= 2)475 gevent.sleep(0.1)476 self.assertLessEqual(local_runner.user_count, 0)477 local_runner.stop()478 web_ui.stop()479class TestMasterWorkerRunners(LocustTestCase):480 def test_distributed_integration_run(self):481 """482 Full integration test that starts both a MasterRunner and three WorkerRunner instances483 and makes sure that their stats is sent to the Master.484 """485 class TestUser(User):486 wait_time = constant(0.1)487 @task488 def incr_stats(self):489 self.environment.events.request.fire(490 request_type="GET",491 name="/",492 response_time=1337,493 response_length=666,494 exception=None,495 context={},496 )497 with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3):498 # start a Master runner499 master_env = Environment(user_classes=[TestUser])500 master = master_env.create_master_runner("*", 0)501 sleep(0)502 # start 3 Worker runners503 workers = []504 for i in range(3):505 worker_env = Environment(user_classes=[TestUser])506 worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)507 workers.append(worker)508 # give workers time to connect509 sleep(0.1)510 # issue start command that should trigger TestUsers to be spawned in the Workers511 master.start(6, spawn_rate=1000)512 sleep(0.1)513 # check that worker nodes have started locusts514 for worker in workers:515 self.assertEqual(2, worker.user_count)516 # give time for users to generate stats, and stats to be sent to master517 
sleep(1)518 master.quit()519 # make sure users are killed520 for worker in workers:521 self.assertEqual(0, worker.user_count)522 # check that stats are present in master523 self.assertGreater(524 master_env.runner.stats.total.num_requests,525 20,526 "For some reason the master node's stats has not come in",527 )528 def test_distributed_run_with_custom_args(self):529 """530 Full integration test that starts both a MasterRunner and three WorkerRunner instances531 and makes sure that their stats is sent to the Master.532 """533 class TestUser(User):534 wait_time = constant(0.1)535 @task536 def incr_stats(self):537 self.environment.events.request.fire(538 request_type="GET",539 name=self.environment.parsed_options.my_str_argument,540 response_time=self.environment.parsed_options.my_int_argument,541 response_length=666,542 exception=None,543 context={},544 )545 @locust.events.init_command_line_parser.add_listener546 def _(parser, **kw):547 parser.add_argument("--my-int-argument", type=int)548 parser.add_argument("--my-str-argument", type=str, default="NOOOO")549 with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3):550 # start a Master runner551 master_env = Environment(user_classes=[TestUser])552 master = master_env.create_master_runner("*", 0)553 master_env.parsed_options = parse_options(554 [555 "--my-int-argument",556 "42",557 "--my-str-argument",558 "cool-string",559 ]560 )561 sleep(0)562 # start 3 Worker runners563 workers = []564 for i in range(3):565 worker_env = Environment(user_classes=[TestUser])566 worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)567 workers.append(worker)568 # give workers time to connect569 sleep(0.1)570 # issue start command that should trigger TestUsers to be spawned in the Workers571 master.start(6, spawn_rate=1000)572 sleep(0.1)573 # check that worker nodes have started locusts574 for worker in workers:575 self.assertEqual(2, worker.user_count)576 # give time for users to generate stats, and stats to 
be sent to master577 sleep(1)578 master.quit()579 # make sure users are killed580 for worker in workers:581 self.assertEqual(0, worker.user_count)582 self.assertEqual(master_env.runner.stats.total.max_response_time, 42)583 self.assertEqual(master_env.runner.stats.get("cool-string", "GET").avg_response_time, 42)584 def test_test_stop_event(self):585 class TestUser(User):586 wait_time = constant(0.1)587 @task588 def my_task(l):589 pass590 with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3):591 # start a Master runner592 master_env = Environment(user_classes=[TestUser])593 test_stop_count = {"master": 0, "worker": 0}594 @master_env.events.test_stop.add_listener595 def _(*args, **kwargs):596 test_stop_count["master"] += 1597 master = master_env.create_master_runner("*", 0)598 sleep(0)599 # start a Worker runner600 worker_env = Environment(user_classes=[TestUser])601 @worker_env.events.test_stop.add_listener602 def _(*args, **kwargs):603 test_stop_count["worker"] += 1604 worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)605 # give worker time to connect606 sleep(0.1)607 # issue start command that should trigger TestUsers to be spawned in the Workers608 master.start(2, spawn_rate=1000)609 sleep(0.1)610 # check that worker nodes have started locusts611 self.assertEqual(2, worker.user_count)612 # give time for users to generate stats, and stats to be sent to master613 sleep(0.1)614 master_env.events.quitting.fire(environment=master_env, reverse=True)615 master.quit()616 sleep(0.1)617 # make sure users are killed618 self.assertEqual(0, worker.user_count)619 # check the test_stop event was called one time in master and one time in worker620 self.assertEqual(621 1,622 test_stop_count["master"],623 "The test_stop event was not called exactly one time in the master node",624 )625 self.assertEqual(626 1,627 test_stop_count["worker"],628 "The test_stop event was not called exactly one time in the worker node",629 )630 def 
test_distributed_shape(self):631 """632 Full integration test that starts both a MasterRunner and three WorkerRunner instances633 and tests a basic LoadTestShape with scaling up and down users634 """635 class TestUser(User):636 @task637 def my_task(self):638 pass639 class TestShape(LoadTestShape):640 def tick(self):641 run_time = self.get_run_time()642 if run_time < 2:643 return 9, 9644 elif run_time < 4:645 return 21, 21646 elif run_time < 6:647 return 3, 21648 else:649 return None650 with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3):651 test_shape = TestShape()652 master_env = Environment(user_classes=[TestUser], shape_class=test_shape)653 master_env.shape_class.reset_time()654 master = master_env.create_master_runner("*", 0)655 workers = []656 for i in range(3):657 worker_env = Environment(user_classes=[TestUser])658 worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)659 workers.append(worker)660 # Give workers time to connect661 sleep(0.1)662 # Start a shape test663 master.start_shape()664 sleep(1)665 # Ensure workers have connected and started the correct amount of users666 for worker in workers:667 self.assertEqual(3, worker.user_count, "Shape test has not reached stage 1")668 self.assertEqual(669 9, test_shape.get_current_user_count(), "Shape is not seeing stage 1 runner user count correctly"670 )671 self.assertDictEqual(master.reported_user_classes_count, {"TestUser": 9})672 # Ensure new stage with more users has been reached673 sleep(2)674 for worker in workers:675 self.assertEqual(7, worker.user_count, "Shape test has not reached stage 2")676 self.assertEqual(677 21, test_shape.get_current_user_count(), "Shape is not seeing stage 2 runner user count correctly"678 )679 self.assertDictEqual(master.reported_user_classes_count, {"TestUser": 21})680 # Ensure new stage with less users has been reached681 sleep(2)682 for worker in workers:683 self.assertEqual(1, worker.user_count, "Shape test has not reached stage 3")684 
self.assertEqual(685 3, test_shape.get_current_user_count(), "Shape is not seeing stage 3 runner user count correctly"686 )687 self.assertDictEqual(master.reported_user_classes_count, {"TestUser": 3})688 # Ensure test stops at the end689 sleep(2)690 for worker in workers:691 self.assertEqual(0, worker.user_count, "Shape test has not stopped")692 self.assertEqual(693 0, test_shape.get_current_user_count(), "Shape is not seeing stopped runner user count correctly"694 )695 self.assertDictEqual(master.reported_user_classes_count, {"TestUser": 0})696 self.assertEqual("stopped", master.state)697 def test_distributed_shape_with_stop_timeout(self):698 """699 Full integration test that starts both a MasterRunner and five WorkerRunner instances700 and tests a basic LoadTestShape with scaling up and down users701 """702 class TestUser1(User):703 def start(self, group: Group):704 gevent.sleep(0.5)705 return super().start(group)706 @task707 def my_task(self):708 gevent.sleep(0)709 class TestUser2(User):710 def start(self, group: Group):711 gevent.sleep(0.5)712 return super().start(group)713 @task714 def my_task(self):715 gevent.sleep(600)716 class TestUser3(User):717 def start(self, group: Group):718 gevent.sleep(0.5)719 return super().start(group)720 @task721 def my_task(self):722 gevent.sleep(600)723 class TestShape(LoadTestShape):724 def tick(self):725 run_time = self.get_run_time()726 if run_time < 10:727 return 15, 3728 elif run_time < 30:729 return 5, 10730 else:731 return None732 locust_worker_additional_wait_before_ready_after_stop = 5733 with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3), _patch_env(734 "LOCUST_WORKER_ADDITIONAL_WAIT_BEFORE_READY_AFTER_STOP",735 str(locust_worker_additional_wait_before_ready_after_stop),736 ):737 stop_timeout = 5738 master_env = Environment(739 user_classes=[TestUser1, TestUser2, TestUser3], shape_class=TestShape(), stop_timeout=stop_timeout740 )741 master_env.shape_class.reset_time()742 master = 
master_env.create_master_runner("*", 0)743 workers = []744 for i in range(5):745 worker_env = Environment(user_classes=[TestUser1, TestUser2, TestUser3])746 worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)747 workers.append(worker)748 # Give workers time to connect749 sleep(0.1)750 self.assertEqual(STATE_INIT, master.state)751 self.assertEqual(5, len(master.clients.ready))752 # Re-order `workers` so that it is sorted by `id`.753 # This is required because the dispatch is done754 # on the sorted workers.755 workers = sorted(workers, key=lambda w: w.client_id)756 # Start a shape test757 master.start_shape()758 # First stage759 ts = time.time()760 while master.state != STATE_SPAWNING:761 self.assertTrue(time.time() - ts <= 1, master.state)762 sleep()763 sleep(5 - (time.time() - ts)) # runtime = 5s764 ts = time.time()765 while master.state != STATE_RUNNING:766 self.assertTrue(time.time() - ts <= 1, master.state)767 sleep()768 self.assertEqual(STATE_RUNNING, master.state)769 w1 = {"TestUser1": 1, "TestUser2": 1, "TestUser3": 1}770 w2 = {"TestUser1": 1, "TestUser2": 1, "TestUser3": 1}771 w3 = {"TestUser1": 1, "TestUser2": 1, "TestUser3": 1}772 w4 = {"TestUser1": 1, "TestUser2": 1, "TestUser3": 1}773 w5 = {"TestUser1": 1, "TestUser2": 1, "TestUser3": 1}774 self.assertDictEqual(w1, workers[0].user_classes_count)775 self.assertDictEqual(w2, workers[1].user_classes_count)776 self.assertDictEqual(w3, workers[2].user_classes_count)777 self.assertDictEqual(w4, workers[3].user_classes_count)778 self.assertDictEqual(w5, workers[4].user_classes_count)779 self.assertDictEqual(w1, master.clients[workers[0].client_id].user_classes_count)780 self.assertDictEqual(w2, master.clients[workers[1].client_id].user_classes_count)781 self.assertDictEqual(w3, master.clients[workers[2].client_id].user_classes_count)782 self.assertDictEqual(w4, master.clients[workers[3].client_id].user_classes_count)783 self.assertDictEqual(w5, 
master.clients[workers[4].client_id].user_classes_count)784 sleep(5 - (time.time() - ts)) # runtime = 10s785 # Fourth stage786 ts = time.time()787 while master.state != STATE_SPAWNING:788 self.assertTrue(time.time() - ts <= 1, master.state)789 sleep()790 sleep(5 - (time.time() - ts)) # runtime = 15s791 # Fourth stage - Excess TestUser1 have been stopped but792 # TestUser2/TestUser3 have not reached stop timeout yet, so793 # their number are unchanged794 ts = time.time()795 while master.state != STATE_RUNNING:796 self.assertTrue(time.time() - ts <= 1, master.state)797 sleep()798 delta = time.time() - ts799 w1 = {"TestUser1": 1, "TestUser2": 1, "TestUser3": 1}800 w2 = {"TestUser1": 0, "TestUser2": 1, "TestUser3": 1}801 w3 = {"TestUser1": 0, "TestUser2": 1, "TestUser3": 1}802 w4 = {"TestUser1": 1, "TestUser2": 1, "TestUser3": 1}803 w5 = {"TestUser1": 0, "TestUser2": 1, "TestUser3": 1}804 self.assertDictEqual(w1, workers[0].user_classes_count)805 self.assertDictEqual(w2, workers[1].user_classes_count)806 self.assertDictEqual(w3, workers[2].user_classes_count)807 self.assertDictEqual(w4, workers[3].user_classes_count)808 self.assertDictEqual(w5, workers[4].user_classes_count)809 self.assertDictEqual(w1, master.clients[workers[0].client_id].user_classes_count)810 self.assertDictEqual(w2, master.clients[workers[1].client_id].user_classes_count)811 self.assertDictEqual(w3, master.clients[workers[2].client_id].user_classes_count)812 self.assertDictEqual(w4, master.clients[workers[3].client_id].user_classes_count)813 self.assertDictEqual(w5, master.clients[workers[4].client_id].user_classes_count)814 sleep(1 - delta) # runtime = 16s815 # Fourth stage - All users are now at the desired number816 ts = time.time()817 while master.state != STATE_RUNNING:818 self.assertTrue(time.time() - ts <= 1, master.state)819 sleep()820 delta = time.time() - ts821 w1 = {"TestUser1": 1, "TestUser2": 0, "TestUser3": 0}822 w2 = {"TestUser1": 0, "TestUser2": 1, "TestUser3": 0}823 w3 = 
{"TestUser1": 0, "TestUser2": 0, "TestUser3": 1}824 w4 = {"TestUser1": 1, "TestUser2": 0, "TestUser3": 0}825 w5 = {"TestUser1": 0, "TestUser2": 1, "TestUser3": 0}826 self.assertDictEqual(w1, workers[0].user_classes_count)827 self.assertDictEqual(w2, workers[1].user_classes_count)828 self.assertDictEqual(w3, workers[2].user_classes_count)829 self.assertDictEqual(w4, workers[3].user_classes_count)830 self.assertDictEqual(w5, workers[4].user_classes_count)831 self.assertDictEqual(w1, master.clients[workers[0].client_id].user_classes_count)832 self.assertDictEqual(w2, master.clients[workers[1].client_id].user_classes_count)833 self.assertDictEqual(w3, master.clients[workers[2].client_id].user_classes_count)834 self.assertDictEqual(w4, master.clients[workers[3].client_id].user_classes_count)835 self.assertDictEqual(w5, master.clients[workers[4].client_id].user_classes_count)836 sleep(10 - delta) # runtime = 26s837 # Sleep stop_timeout and make sure the test has stopped838 sleep(5) # runtime = 31s839 self.assertEqual(STATE_STOPPING, master.state)840 sleep(stop_timeout) # runtime = 36s841 # We wait for "stop_timeout" seconds to let the workers reconnect as "ready" with the master.842 # The reason for waiting an additional "stop_timeout" when we already waited for "stop_timeout"843 # above is that when a worker receives the stop message, it can take up to "stop_timeout"844 # for the worker to send the "client_stopped" message then an additional "stop_timeout" seconds845 # to send the "client_ready" message.846 ts = time.time()847 while len(master.clients.ready) != len(workers):848 self.assertTrue(849 time.time() - ts <= stop_timeout + locust_worker_additional_wait_before_ready_after_stop,850 f"expected {len(workers)} workers to be ready but only {len(master.clients.ready)} workers are",851 )852 sleep()853 sleep(1)854 # Check that no users are running855 w1 = {"TestUser1": 0, "TestUser2": 0, "TestUser3": 0}856 w2 = {"TestUser1": 0, "TestUser2": 0, "TestUser3": 0}857 w3 = 
{"TestUser1": 0, "TestUser2": 0, "TestUser3": 0}858 w4 = {"TestUser1": 0, "TestUser2": 0, "TestUser3": 0}859 w5 = {"TestUser1": 0, "TestUser2": 0, "TestUser3": 0}860 self.assertDictEqual(w1, workers[0].user_classes_count)861 self.assertDictEqual(w2, workers[1].user_classes_count)862 self.assertDictEqual(w3, workers[2].user_classes_count)863 self.assertDictEqual(w4, workers[3].user_classes_count)864 self.assertDictEqual(w5, workers[4].user_classes_count)865 self.assertDictEqual(w1, master.clients[workers[0].client_id].user_classes_count)866 self.assertDictEqual(w2, master.clients[workers[1].client_id].user_classes_count)867 self.assertDictEqual(w3, master.clients[workers[2].client_id].user_classes_count)868 self.assertDictEqual(w4, master.clients[workers[3].client_id].user_classes_count)869 self.assertDictEqual(w5, master.clients[workers[4].client_id].user_classes_count)870 ts = time.time()871 while master.state != STATE_STOPPED:872 self.assertTrue(time.time() - ts <= 5, master.state)873 sleep()874 master.stop()875 @unittest.skip876 def test_distributed_shape_fuzzy_test(self):877 """878 Incredibility useful test to find issues with dispatch logic. 
This test allowed to find879 multiple small corner cases with the new dispatch logic of locust v2.880 The test is disabled by default because it takes a lot of time to run and has randomness to it.881 However, it is advised to run it a few times (you can run it in parallel) when modifying the dispatch logic.882 """883 class BaseUser(User):884 @task885 def my_task(self):886 gevent.sleep(600)887 class TestUser01(BaseUser):888 pass889 class TestUser02(BaseUser):890 pass891 class TestUser03(BaseUser):892 pass893 class TestUser04(BaseUser):894 pass895 class TestUser05(BaseUser):896 pass897 class TestUser06(BaseUser):898 pass899 class TestUser07(BaseUser):900 pass901 class TestUser08(BaseUser):902 pass903 class TestUser09(BaseUser):904 pass905 class TestUser10(BaseUser):906 pass907 class TestUser11(BaseUser):908 pass909 class TestUser12(BaseUser):910 pass911 class TestUser13(BaseUser):912 pass913 class TestUser14(BaseUser):914 pass915 class TestUser15(BaseUser):916 pass917 class TestShape(LoadTestShape):918 def __init__(self):919 super().__init__()920 self.stages = []921 runtime = 0922 for _ in range(100):923 runtime += random.uniform(3, 15)924 self.stages.append((runtime, random.randint(1, 100), random.uniform(0.1, 10)))925 def tick(self):926 run_time = self.get_run_time()927 for stage in self.stages:928 if run_time < stage[0]:929 return stage[1], stage[2]930 user_classes = [931 TestUser01,932 TestUser02,933 TestUser03,934 TestUser04,935 TestUser05,936 TestUser06,937 TestUser07,938 TestUser08,939 TestUser09,940 TestUser10,941 TestUser11,942 TestUser12,943 TestUser13,944 TestUser14,945 TestUser15,946 ]947 chosen_user_classes = random.sample(user_classes, k=random.randint(1, len(user_classes)))948 for user_class in chosen_user_classes:949 user_class.weight = random.uniform(1, 20)950 locust_worker_additional_wait_before_ready_after_stop = 5951 with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3), _patch_env(952 
"LOCUST_WORKER_ADDITIONAL_WAIT_BEFORE_READY_AFTER_STOP",953 str(locust_worker_additional_wait_before_ready_after_stop),954 ):955 stop_timeout = 5956 master_env = Environment(957 user_classes=chosen_user_classes, shape_class=TestShape(), stop_timeout=stop_timeout958 )959 master_env.shape_class.reset_time()960 master = master_env.create_master_runner("*", 0)961 workers = []962 for i in range(random.randint(1, 30)):963 worker_env = Environment(user_classes=chosen_user_classes)964 worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)965 workers.append(worker)966 # Give workers time to connect967 sleep(0.1)968 self.assertEqual(STATE_INIT, master.state)969 self.assertEqual(len(workers), len(master.clients.ready))970 # Start a shape test971 master.start_shape()972 ts = time.time()973 while master.state != STATE_STOPPED:974 self.assertTrue(time.time() - ts <= master_env.shape_class.stages[-1][0] + 60, master.state)975 print(976 "{:.2f}/{:.2f} | {} | {:.0f} | ".format(977 time.time() - ts,978 master_env.shape_class.stages[-1][0],979 master.state,980 sum(master.reported_user_classes_count.values()),981 )982 + json.dumps(dict(sorted(master.reported_user_classes_count.items(), key=itemgetter(0))))983 )984 sleep(1)985 master.stop()986 def test_distributed_shape_stop_and_restart(self):987 """988 Test stopping and then restarting a LoadTestShape989 """990 class TestUser(User):991 @task992 def my_task(self):993 pass994 class TestShape(LoadTestShape):995 def tick(self):996 run_time = self.get_run_time()997 if run_time < 10:998 return 4, 4999 else:1000 return None1001 with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3):1002 master_env = Environment(user_classes=[TestUser], shape_class=TestShape())1003 master_env.shape_class.reset_time()1004 master = master_env.create_master_runner("*", 0)1005 workers = []1006 for i in range(2):1007 worker_env = Environment(user_classes=[TestUser])1008 worker = worker_env.create_worker_runner("127.0.0.1", 
master.server.port)1009 workers.append(worker)1010 # Give workers time to connect1011 sleep(0.1)1012 # Start a shape test and ensure workers have connected and started the correct amount of users1013 master.start_shape()1014 sleep(1)1015 for worker in workers:1016 self.assertEqual(2, worker.user_count, "Shape test has not started correctly")1017 # Stop the test and ensure all user count is 01018 master.stop()1019 sleep(1)1020 for worker in workers:1021 self.assertEqual(0, worker.user_count, "Shape test has not stopped")1022 # Then restart the test again and ensure workers have connected and started the correct amount of users1023 master.start_shape()1024 sleep(1)1025 for worker in workers:1026 self.assertEqual(2, worker.user_count, "Shape test has not started again correctly")1027 master.stop()1028 def test_distributed_shape_statuses_transition(self):1029 """1030 Full integration test that starts both a MasterRunner and five WorkerRunner instances1031 The goal of this test is to validate the status on the master is correctly transitioned for each of the1032 test phases.1033 """1034 class TestUser1(User):1035 @task1036 def my_task(self):1037 gevent.sleep(600)1038 class TestShape(LoadTestShape):1039 def tick(self):1040 run_time = self.get_run_time()1041 if run_time < 5:1042 return 5, 2.51043 elif run_time < 10:1044 return 10, 2.51045 elif run_time < 15:1046 return 15, 2.51047 else:1048 return None1049 locust_worker_additional_wait_before_ready_after_stop = 21050 with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3), _patch_env(1051 "LOCUST_WORKER_ADDITIONAL_WAIT_BEFORE_READY_AFTER_STOP",1052 str(locust_worker_additional_wait_before_ready_after_stop),1053 ):1054 stop_timeout = 01055 master_env = Environment(user_classes=[TestUser1], shape_class=TestShape(), stop_timeout=stop_timeout)1056 master_env.shape_class.reset_time()1057 master = master_env.create_master_runner("*", 0)1058 workers = []1059 for i in range(5):1060 worker_env = 
Environment(user_classes=[TestUser1])1061 worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)1062 workers.append(worker)1063 # Give workers time to connect1064 sleep(0.1)1065 self.assertEqual(STATE_INIT, master.state)1066 self.assertEqual(5, len(master.clients.ready))1067 statuses = []1068 ts = time.perf_counter()1069 master.start_shape()1070 while master.state != STATE_STOPPED:1071 # +5s buffer to let master stop1072 self.assertTrue(1073 time.perf_counter() - ts <= 30 + locust_worker_additional_wait_before_ready_after_stop + 5,1074 master.state,1075 )1076 statuses.append((time.perf_counter() - ts, master.state, master.user_count))1077 sleep(0.1)1078 self.assertEqual(statuses[0][1], STATE_INIT)1079 stage = 11080 tolerance = 1 # in s1081 for (t1, state1, user_count1), (t2, state2, user_count2) in zip(statuses[:-1], statuses[1:]):1082 if state1 == STATE_SPAWNING and state2 == STATE_RUNNING and stage == 1:1083 self.assertTrue(2.5 - tolerance <= t2 <= 2.5 + tolerance)1084 elif state1 == STATE_RUNNING and state2 == STATE_SPAWNING and stage == 1:1085 self.assertTrue(5 - tolerance <= t2 <= 5 + tolerance)1086 stage += 11087 elif state1 == STATE_SPAWNING and state2 == STATE_RUNNING and stage == 2:1088 self.assertTrue(7.5 - tolerance <= t2 <= 7.5 + tolerance)1089 elif state1 == STATE_RUNNING and state2 == STATE_SPAWNING and stage == 2:1090 self.assertTrue(10 - tolerance <= t2 <= 10 + tolerance)1091 stage += 11092 elif state1 == STATE_SPAWNING and state2 == STATE_RUNNING and stage == 3:1093 self.assertTrue(12.5 - tolerance <= t2 <= 12.5 + tolerance)1094 elif state1 == STATE_RUNNING and state2 == STATE_SPAWNING and stage == 3:1095 self.assertTrue(15 - tolerance <= t2 <= 15 + tolerance)1096 stage += 11097 elif state1 == STATE_RUNNING and state2 == STATE_STOPPED and stage == 3:1098 self.assertTrue(15 - tolerance <= t2 <= 15 + tolerance)1099 def test_swarm_endpoint_is_non_blocking(self):1100 class TestUser1(User):1101 @task1102 def my_task(self):1103 
gevent.sleep(600)1104 class TestUser2(User):1105 @task1106 def my_task(self):1107 gevent.sleep(600)1108 with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3):1109 stop_timeout = 01110 master_env = Environment(user_classes=[TestUser1, TestUser2], stop_timeout=stop_timeout)1111 master = master_env.create_master_runner("*", 0)1112 web_ui = master_env.create_web_ui("127.0.0.1", 0)1113 workers = []...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run locust automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now!

Get 100 minutes of automation testing FREE!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful