Best Python code snippet using locust
test_runners.py
Source:test_runners.py  
...180            def my_task(self):181                pass182        environment = Environment(user_classes=[MyUser])183        @environment.events.test_stopping.add_listener184        def on_test_stopping(*_, **__):185            self.runner_stopping = True186        @environment.events.test_stop.add_listener187        def on_test_stop(*_, **__):188            self.runner_stopped = True189        runner = LocalRunner(environment)190        runner.start(user_count=3, spawn_rate=3, wait=False)191        self.assertFalse(self.runner_stopping)192        self.assertFalse(self.runner_stopped)193        runner.stop()194        self.assertTrue(self.runner_stopping)195        self.assertTrue(self.runner_stopped)196    def test_stop_event_quit(self):197        class MyUser(User):198            wait_time = constant(1)199            @task200            def my_task(self):201                pass202        environment = Environment(user_classes=[MyUser])203        @environment.events.test_stopping.add_listener204        def on_test_stopping(*_, **__):205            self.runner_stopping = True206        @environment.events.test_stop.add_listener207        def on_test_stop(*_, **__):208            self.runner_stopped = True209        runner = LocalRunner(environment)210        runner.start(user_count=3, spawn_rate=3, wait=False)211        self.assertFalse(self.runner_stopping)212        self.assertFalse(self.runner_stopped)213        runner.quit()214        self.assertTrue(self.runner_stopping)215        self.assertTrue(self.runner_stopped)216    def test_stop_event_stop_and_quit(self):217        class MyUser(User):218            wait_time = constant(1)219            @task220            def my_task(self):221                pass222        environment = Environment(user_classes=[MyUser])223        @environment.events.test_stopping.add_listener224        def on_test_stopping(*_, **__):225            self.runner_stopping = True226        @environment.events.test_stop.add_listener227        def on_test_stop(*_, **__):228            self.runner_stopped = True229        runner = LocalRunner(environment)230        runner.start(user_count=3, spawn_rate=3, wait=False)231        self.assertFalse(self.runner_stopping)232        self.assertFalse(self.runner_stopped)233        runner.stop()234        runner.quit()235        self.assertTrue(self.runner_stopping)236        self.assertTrue(self.runner_stopped)237    def test_stopping_event(self):238        on_stop_called = [False]239        class MyUser(User):240            on_stop_called = False241            wait_time = constant(1)242            @task243            def my_task(self):244                pass245            def on_stop(self):246                MyUser.on_stop_called = True247        environment = Environment(user_classes=[MyUser])248        @environment.events.test_stopping.add_listener249        def on_test_stopping(*_, **__):250            on_stop_called[0] = MyUser.on_stop_called251            self.runner_stopping = True252        runner = LocalRunner(environment)253        runner.start(user_count=3, spawn_rate=3, wait=False)254        runner.quit()255        self.assertTrue(self.runner_stopping)256        self.assertFalse(on_stop_called[0])257    def test_change_user_count_during_spawning(self):258        class MyUser(User):259            wait_time = constant(1)260            @task261            def my_task(self):262                pass263        environment = Environment(user_classes=[MyUser])264        runner = LocalRunner(environment)265        
runner.start(user_count=10, spawn_rate=5, wait=False)266        sleep(0.6)267        runner.start(user_count=5, spawn_rate=5, wait=False)268        runner.spawning_greenlet.join()269        self.assertEqual(5, len(runner.user_greenlets))270        runner.quit()271    def test_reset_stats(self):272        class MyUser(User):273            @task274            class task_set(TaskSet):275                @task276                def my_task(self):277                    self.user.environment.events.request.fire(278                        request_type="GET",279                        name="/test",280                        response_time=666,281                        response_length=1337,282                        exception=None,283                        context={},284                    )285                    # Make sure each user only run this task once during the test286                    sleep(30)287        environment = Environment(user_classes=[MyUser], reset_stats=True)288        runner = LocalRunner(environment)289        runner.start(user_count=6, spawn_rate=1, wait=False)290        sleep(3)291        self.assertGreaterEqual(runner.stats.get("/test", "GET").num_requests, 3)292        sleep(3.25)293        self.assertLessEqual(runner.stats.get("/test", "GET").num_requests, 1)294        runner.quit()295    def test_no_reset_stats(self):296        class MyUser(User):297            @task298            class task_set(TaskSet):299                @task300                def my_task(self):301                    self.user.environment.events.request.fire(302                        request_type="GET",303                        name="/test",304                        response_time=666,305                        response_length=1337,306                        exception=None,307                        context={},308                    )309                    sleep(2)310        environment = Environment(reset_stats=False, user_classes=[MyUser])311        runner = LocalRunner(environment)312        runner.start(user_count=6, spawn_rate=12, wait=False)313        sleep(0.25)314        self.assertGreaterEqual(runner.stats.get("/test", "GET").num_requests, 3)315        sleep(0.3)316        self.assertEqual(6, runner.stats.get("/test", "GET").num_requests)317        runner.quit()318    def test_runner_reference_on_environment(self):319        env = Environment()320        runner = env.create_local_runner()321        self.assertEqual(env, runner.environment)322        self.assertEqual(runner, env.runner)323    def test_users_can_call_runner_quit_without_deadlocking(self):324        class BaseUser(User):325            stop_triggered = False326            @task327            def trigger(self):328                self.environment.runner.quit()329            def on_stop(self):330                BaseUser.stop_triggered = True331        runner = Environment(user_classes=[BaseUser]).create_local_runner()332        users = runner.spawn_users({BaseUser.__name__: 1}, wait=False)333        self.assertEqual(1, len(users))334        timeout = gevent.Timeout(0.5)335        timeout.start()336        try:337            runner.greenlet.join()338        except gevent.Timeout:339            self.fail("Got Timeout exception, runner must have hung somehow.")340        finally:341            timeout.cancel()342        self.assertTrue(BaseUser.stop_triggered)343    def test_runner_quit_can_run_on_stop_for_multiple_users_concurrently(self):344        class BaseUser(User):345            stop_count = 0346            @task347          
  def trigger(self):348                pass349            def on_stop(self):350                gevent.sleep(0.1)351                BaseUser.stop_count += 1352        runner = Environment(user_classes=[BaseUser]).create_local_runner()353        users = runner.spawn_users({BaseUser.__name__: 10}, wait=False)354        self.assertEqual(10, len(users))355        timeout = gevent.Timeout(0.3)356        timeout.start()357        try:358            runner.quit()359        except gevent.Timeout:360            self.fail("Got Timeout exception, runner must have hung somehow.")361        finally:362            timeout.cancel()363        self.assertEqual(10, BaseUser.stop_count)  # verify that all users executed on_stop364    def test_stop_users_with_spawn_rate(self):365        """366        The spawn rate does not have an effect on the rate at which the users are stopped.367        It is expected that the excess users will be stopped as soon as possible in parallel368        (while respecting the stop_timeout).369        """370        class MyUser(User):371            wait_time = constant(1)372            @task373            def my_task(self):374                pass375        environment = Environment(user_classes=[MyUser])376        runner = LocalRunner(environment)377        # Start load test, wait for users to start, then trigger ramp down378        ts = time.time()379        runner.start(10, 10, wait=False)380        runner.spawning_greenlet.join()381        delta = time.time() - ts382        self.assertTrue(383            0 <= delta <= 0.05, f"Expected user count to increase to 10 instantaneously, instead it took {delta:f}"384        )385        self.assertTrue(386            runner.user_count == 10, "User count has not decreased correctly to 2, it is : %i" % runner.user_count387        )388        ts = time.time()389        runner.start(2, 4, wait=False)390        runner.spawning_greenlet.join()391        delta = time.time() - ts392        self.assertTrue(0 <= delta <= 1.05, f"Expected user count to decrease to 2 in 1s, instead it took {delta:f}")393        self.assertTrue(394            runner.user_count == 2, "User count has not decreased correctly to 2, it is : %i" % runner.user_count395        )396    def test_attributes_populated_when_calling_start(self):397        class MyUser1(User):398            wait_time = constant(0)399            @task400            def my_task(self):401                pass402        class MyUser2(User):403            wait_time = constant(0)404            @task405            def my_task(self):406                pass407        environment = Environment(user_classes=[MyUser1, MyUser2])408        runner = LocalRunner(environment)409        runner.start(user_count=10, spawn_rate=5, wait=False)410        runner.spawning_greenlet.join()411        self.assertDictEqual({"MyUser1": 5, "MyUser2": 5}, runner.user_classes_count)412        runner.start(user_count=5, spawn_rate=5, wait=False)413        runner.spawning_greenlet.join()414        self.assertDictEqual({"MyUser1": 3, "MyUser2": 2}, runner.user_classes_count)415        runner.quit()416    def test_user_classes_count(self):417        class MyUser1(User):418            wait_time = constant(0)419            @task420            def my_task(self):421                pass422        class MyUser2(User):423            wait_time = constant(0)424            @task425            def my_task(self):426                pass427        environment = Environment(user_classes=[MyUser1, MyUser2])428        runner = 
LocalRunner(environment)429        runner.start(user_count=10, spawn_rate=5, wait=False)430        runner.spawning_greenlet.join()431        self.assertDictEqual({"MyUser1": 5, "MyUser2": 5}, runner.user_classes_count)432        runner.start(user_count=5, spawn_rate=5, wait=False)433        runner.spawning_greenlet.join()434        self.assertDictEqual({"MyUser1": 3, "MyUser2": 2}, runner.user_classes_count)435        runner.quit()436    def test_host_class_attribute_from_web(self):437        """If host is left empty from the webUI, we should not use it"""438        class MyUser1(User):439            host = "https://host1.com"440            @task441            def my_task(self):442                pass443        class MyUser2(User):444            host = "https://host2.com"445            @task446            def my_task(self):447                pass448        opts = mocked_options()449        # If left empty on the web, we get an empty string as host450        opts.host = ""451        environment = create_environment([MyUser1, MyUser2], opts)452        runner = LocalRunner(environment)453        # Start the runner to trigger problematic code454        runner.start(user_count=2, spawn_rate=1, wait=False)455        runner.spawning_greenlet.join()456        # Make sure we did not overwrite the host variable457        self.assertEqual(MyUser1.host, "https://host1.com")458        self.assertEqual(MyUser2.host, "https://host2.com")459        runner.quit()460    def test_custom_message(self):461        class MyUser(User):462            wait_time = constant(1)463            @task464            def my_task(self):465                pass466        test_custom_msg = [False]467        test_custom_msg_data = [{}]468        def on_custom_msg(msg, **kw):469            test_custom_msg[0] = True470            test_custom_msg_data[0] = msg.data471        environment = Environment(user_classes=[MyUser])472        runner = LocalRunner(environment)473        runner.register_message("test_custom_msg", on_custom_msg)474        runner.send_message("test_custom_msg", {"test_data": 123})475        self.assertTrue(test_custom_msg[0])476        self.assertEqual(123, test_custom_msg_data[0]["test_data"])477    def test_undefined_custom_message(self):478        class MyUser(User):479            wait_time = constant(1)480            @task481            def my_task(self):482                pass483        test_custom_msg = [False]484        def on_custom_msg(msg, **kw):485            test_custom_msg[0] = True486        environment = Environment(user_classes=[MyUser])487        runner = LocalRunner(environment)488        runner.register_message("test_custom_msg", on_custom_msg)489        runner.send_message("test_different_custom_msg")490        self.assertFalse(test_custom_msg[0])491        self.assertEqual(1, len(self.mocked_log.warning))492        msg = self.mocked_log.warning[0]493        self.assertIn("Unknown message type received", msg)494    def test_swarm_endpoint_is_non_blocking(self):495        class TestUser1(User):496            @task497            def my_task(self):498                gevent.sleep(600)499        class TestUser2(User):500            @task501            def my_task(self):502                gevent.sleep(600)503        stop_timeout = 0504        env = Environment(user_classes=[TestUser1, TestUser2], stop_timeout=stop_timeout)505        local_runner = env.create_local_runner()506        web_ui = env.create_web_ui("127.0.0.1", 0)507        gevent.sleep(0.1)508        ts = time.perf_counter()509        
response = requests.post(510            f"http://127.0.0.1:{web_ui.server.server_port}/swarm",511            data={"user_count": 20, "spawn_rate": 5, "host": "https://localhost"},512        )513        self.assertEqual(200, response.status_code)514        self.assertTrue(0 <= time.perf_counter() - ts <= 1, "swarm endpoint is blocking")515        ts = time.perf_counter()516        while local_runner.state != STATE_RUNNING:517            self.assertTrue(time.perf_counter() - ts <= 4, local_runner.state)518            gevent.sleep(0.1)519        self.assertTrue(3 <= time.perf_counter() - ts <= 5)520        self.assertEqual(local_runner.user_count, 20)521        local_runner.stop()522        web_ui.stop()523    def test_can_call_stop_endpoint_if_currently_swarming(self):524        class TestUser1(User):525            @task526            def my_task(self):527                gevent.sleep(600)528        class TestUser2(User):529            @task530            def my_task(self):531                gevent.sleep(600)532        stop_timeout = 5533        env = Environment(user_classes=[TestUser1, TestUser2], stop_timeout=stop_timeout)534        local_runner = env.create_local_runner()535        web_ui = env.create_web_ui("127.0.0.1", 0)536        gevent.sleep(0.1)537        ts = time.perf_counter()538        response = requests.post(539            f"http://127.0.0.1:{web_ui.server.server_port}/swarm",540            data={"user_count": 20, "spawn_rate": 1, "host": "https://localhost"},541        )542        self.assertEqual(200, response.status_code)543        self.assertTrue(0 <= time.perf_counter() - ts <= 1, "swarm endpoint is blocking")544        gevent.sleep(5)545        self.assertEqual(local_runner.state, STATE_SPAWNING)546        self.assertLessEqual(local_runner.user_count, 10)547        ts = time.perf_counter()548        response = requests.get(549            f"http://127.0.0.1:{web_ui.server.server_port}/stop",550        )551        self.assertEqual(200, response.status_code)552        self.assertTrue(stop_timeout <= time.perf_counter() - ts <= stop_timeout + 5, "stop endpoint took too long")553        ts = time.perf_counter()554        while local_runner.state != STATE_STOPPED:555            self.assertTrue(time.perf_counter() - ts <= 2)556            gevent.sleep(0.1)557        self.assertLessEqual(local_runner.user_count, 0)558        local_runner.stop()559        web_ui.stop()560    def test_target_user_count_is_set_before_ramp_up(self):561        """Test for https://github.com/locustio/locust/issues/1883"""562        class MyUser1(User):563            wait_time = constant(0)564            @task565            def my_task(self):566                pass567        environment = Environment(user_classes=[MyUser1])568        runner = LocalRunner(environment)569        test_start_event_fired = [False]570        @environment.events.test_start.add_listener571        def on_test_start(*args, **kwargs):572            test_start_event_fired[0] = True573            self.assertEqual(runner.target_user_count, 3)574        runner.start(user_count=3, spawn_rate=1, wait=False)575        gevent.sleep(1)576        self.assertEqual(runner.target_user_count, 3)577        self.assertEqual(runner.user_count, 1)578        # However, target_user_classes_count is only updated at the end of the ramp-up/ramp-down579        # due to the way it is implemented.580        self.assertDictEqual({}, runner.target_user_classes_count)581        runner.spawning_greenlet.join()582        
self.assertEqual(runner.target_user_count, 3)583        self.assertEqual(runner.user_count, 3)584        self.assertDictEqual({"MyUser1": 3}, runner.target_user_classes_count)585        runner.quit()586        self.assertTrue(test_start_event_fired[0])587    def test_stop_users_count(self):588        user_count = 10589        class BaseUser1(User):590            wait_time = constant(1)591            @task592            def task_a(self):593                pass594        class BaseUser2(BaseUser1):595            wait_time = constant(1)596        runner = Environment(user_classes=[BaseUser1, BaseUser2]).create_local_runner()597        runner.start(user_count=user_count, spawn_rate=10)598        sleep(1)599        self.assertEqual(user_count, runner.user_count)600        runner.stop()601        sleep(1)602        self.assertEqual(0, runner.user_count)603class TestMasterWorkerRunners(LocustTestCase):604    def test_distributed_integration_run(self):605        """606        Full integration test that starts both a MasterRunner and three WorkerRunner instances607        and makes sure that their stats is sent to the Master.608        """609        class TestUser(User):610            wait_time = constant(0.1)611            @task612            def incr_stats(self):613                self.environment.events.request.fire(614                    request_type="GET",615                    name="/",616                    response_time=1337,617                    response_length=666,618                    exception=None,619                    context={},620                )621        with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3):622            # start a Master runner623            master_env = Environment(user_classes=[TestUser])624            master = master_env.create_master_runner("*", 0)625            sleep(0)626            # start 3 Worker runners627            workers = []628            for i in range(3):629                worker_env = Environment(user_classes=[TestUser])630                worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)631                workers.append(worker)632            # give workers time to connect633            sleep(0.1)634            # issue start command that should trigger TestUsers to be spawned in the Workers635            master.start(6, spawn_rate=1000)636            sleep(0.1)637            # check that worker nodes have started locusts638            for worker in workers:639                self.assertEqual(2, worker.user_count)640            # give time for users to generate stats, and stats to be sent to master641            sleep(1)642            master.quit()643            # make sure users are killed644            for worker in workers:645                self.assertEqual(0, worker.user_count)646        # check that stats are present in master647        self.assertGreater(648            master_env.runner.stats.total.num_requests,649            20,650            "For some reason the master node's stats has not come in",651        )652    def test_distributed_rebalanced_integration_run(self):653        """654        Full integration test that starts both a MasterRunner and three WorkerRunner instances655        and makes sure that their stats is sent to the Master.656        """657        class TestUser(User):658            wait_time = constant(0.1)659            @task660            def incr_stats(self):661                self.environment.events.request.fire(662                    request_type="GET",663                   
 name="/",664                    response_time=1337,665                    response_length=666,666                    exception=None,667                    context={},668                )669        with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3), patch_env(670            "LOCUST_WAIT_FOR_WORKERS_REPORT_AFTER_RAMP_UP", "0.1"671        ):672            # start a Master runner673            options = parse_options(["--enable-rebalancing"])674            master_env = Environment(user_classes=[TestUser], parsed_options=options)675            master = master_env.create_master_runner("*", 0)676            sleep(0)677            # start 3 Worker runners678            workers = []679            def add_worker():680                worker_env = Environment(user_classes=[TestUser])681                worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)682                workers.append(worker)683            for i in range(3):684                add_worker()685            # give workers time to connect686            sleep(0.1)687            # issue start command that should trigger TestUsers to be spawned in the Workers688            master.start(6, spawn_rate=1000)689            sleep(0.1)690            # check that worker nodes have started locusts691            for worker in workers:692                self.assertEqual(2, worker.user_count)693            # give time for users to generate stats, and stats to be sent to master694            # Add 1 more workers (should be 4 now)695            add_worker()696            @retry(AssertionError, tries=10, delay=0.5)697            def check_rebalanced_true():698                for worker in workers:699                    self.assertTrue(worker.user_count > 0)700            # Check that all workers have a user count > 0 at least701            check_rebalanced_true()702            # Add 2 more workers (should be 6 now)703            add_worker()704            add_worker()705            @retry(AssertionError, tries=10, delay=0.5)706            def check_rebalanced_equals():707                for worker in workers:708                    self.assertEqual(1, worker.user_count)709            # Check that all workers have a user count = 1 now710            check_rebalanced_equals()711            # Simulate that some workers are missing by "killing" them abrutly712            for i in range(3):713                workers[i].greenlet.kill(block=True)714            @retry(AssertionError, tries=10, delay=1)715            def check_master_worker_missing_count():716                self.assertEqual(3, len(master.clients.missing))717            # Check that master detected the missing workers718            check_master_worker_missing_count()719            @retry(AssertionError, tries=10, delay=1)720            def check_remaing_worker_new_user_count():721                for i in range(3, 6):722                    self.assertEqual(2, workers[i].user_count)723            # Check that remaining workers have a new count of user due to rebalancing.724            check_remaing_worker_new_user_count()725            sleep(1)726            # Finally quit and check states of remaining workers.727            master.quit()728            # make sure users are killed on remaining workers729            for i in range(3, 6):730                self.assertEqual(0, workers[i].user_count)731        # check that stats are present in master732        self.assertGreater(733            master_env.runner.stats.total.num_requests,734            20,735            
"For some reason the master node's stats has not come in",736        )737    def test_distributed_run_with_custom_args(self):738        """739        Full integration test that starts both a MasterRunner and three WorkerRunner instances740        and makes sure that their stats is sent to the Master.741        """742        class TestUser(User):743            wait_time = constant(0.1)744            @task745            def incr_stats(self):746                self.environment.events.request.fire(747                    request_type="GET",748                    name=self.environment.parsed_options.my_str_argument,749                    response_time=self.environment.parsed_options.my_int_argument,750                    response_length=666,751                    exception=None,752                    context={},753                )754        @locust.events.init_command_line_parser.add_listener755        def _(parser, **kw):756            parser.add_argument("--my-int-argument", type=int)757            parser.add_argument("--my-str-argument", type=str, default="NOOOO")758        with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3):759            # start a Master runner760            master_env = Environment(user_classes=[TestUser])761            master = master_env.create_master_runner("*", 0)762            master_env.parsed_options = parse_options(763                [764                    "--my-int-argument",765                    "42",766                    "--my-str-argument",767                    "cool-string",768                ]769            )770            sleep(0)771            # start 3 Worker runners772            workers = []773            for i in range(3):774                worker_env = Environment(user_classes=[TestUser])775                worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)776                workers.append(worker)777            # give workers time to connect778            sleep(0.1)779            # issue start command that should trigger TestUsers to be spawned in the Workers780            master.start(6, spawn_rate=1000)781            sleep(0.1)782            # check that worker nodes have started locusts783            for worker in workers:784                self.assertEqual(2, worker.user_count)785            # give time for users to generate stats, and stats to be sent to master786            sleep(1)787            master.quit()788            # make sure users are killed789            for worker in workers:790                self.assertEqual(0, worker.user_count)791        self.assertEqual(master_env.runner.stats.total.max_response_time, 42)792        self.assertEqual(master_env.runner.stats.get("cool-string", "GET").avg_response_time, 42)793    def test_test_stop_event(self):794        class TestUser(User):795            wait_time = constant(0.1)796            @task797            def my_task(l):798                pass799        with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3):800            # start a Master runner801            master_env = Environment(user_classes=[TestUser])802            test_stop_count = {"master": 0, "worker": 0}803            @master_env.events.test_stop.add_listener804            def _(*args, **kwargs):805                test_stop_count["master"] += 1806            master = master_env.create_master_runner("*", 0)807            sleep(0)808            # start a Worker runner809            worker_env = Environment(user_classes=[TestUser])810            
@worker_env.events.test_stop.add_listener811            def _(*args, **kwargs):812                test_stop_count["worker"] += 1813            worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)814            # give worker time to connect815            sleep(0.1)816            # issue start command that should trigger TestUsers to be spawned in the Workers817            master.start(2, spawn_rate=1000)818            sleep(0.1)819            # check that worker nodes have started locusts820            self.assertEqual(2, worker.user_count)821            # give time for users to generate stats, and stats to be sent to master822            sleep(0.1)823            master_env.events.quitting.fire(environment=master_env, reverse=True)824            master.quit()825            sleep(0.1)826            # make sure users are killed827            self.assertEqual(0, worker.user_count)828        # check the test_stop event was called one time in master and one time in worker829        self.assertEqual(830            1,831            test_stop_count["master"],832            "The test_stop event was not called exactly one time in the master node",833        )834        self.assertEqual(835            1,836            test_stop_count["worker"],837            "The test_stop event was not called exactly one time in the worker node",838        )839    def test_distributed_shape(self):840        """841        Full integration test that starts both a MasterRunner and three WorkerRunner instances842        and tests a basic LoadTestShape with scaling up and down users843        """844        class TestUser(User):845            @task846            def my_task(self):847                pass848        class TestShape(LoadTestShape):849            def tick(self):850                run_time = self.get_run_time()851                if run_time < 2:852                    return 9, 9853                elif run_time < 4:854                    return 21, 21855                elif run_time < 6:856                    return 3, 21857                else:858                    return None859        with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3):860            test_shape = TestShape()861            master_env = Environment(user_classes=[TestUser], shape_class=test_shape)862            master_env.shape_class.reset_time()863            master = master_env.create_master_runner("*", 0)864            workers = []865            for i in range(3):866                worker_env = Environment(user_classes=[TestUser])867                worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)868                workers.append(worker)869            # Give workers time to connect870            sleep(0.1)871            # Start a shape test872            master.start_shape()873            sleep(1)874            # Ensure workers have connected and started the correct amount of users875            for worker in workers:876                self.assertEqual(3, worker.user_count, "Shape test has not reached stage 1")877                self.assertEqual(878                    9, test_shape.get_current_user_count(), "Shape is not seeing stage 1 runner user count correctly"879                )880            self.assertDictEqual(master.reported_user_classes_count, {"TestUser": 9})881            # Ensure new stage with more users has been reached882            sleep(2)883            for worker in workers:884                self.assertEqual(7, worker.user_count, "Shape test has not reached stage 
2")885                self.assertEqual(886                    21, test_shape.get_current_user_count(), "Shape is not seeing stage 2 runner user count correctly"887                )888            self.assertDictEqual(master.reported_user_classes_count, {"TestUser": 21})889            # Ensure new stage with less users has been reached890            sleep(2)891            for worker in workers:892                self.assertEqual(1, worker.user_count, "Shape test has not reached stage 3")893                self.assertEqual(894                    3, test_shape.get_current_user_count(), "Shape is not seeing stage 3 runner user count correctly"895                )896            self.assertDictEqual(master.reported_user_classes_count, {"TestUser": 3})897            # Ensure test stops at the end898            sleep(2)899            for worker in workers:900                self.assertEqual(0, worker.user_count, "Shape test has not stopped")901                self.assertEqual(902                    0, test_shape.get_current_user_count(), "Shape is not seeing stopped runner user count correctly"903                )904            self.assertDictEqual(master.reported_user_classes_count, {"TestUser": 0})905            self.assertEqual("stopped", master.state)906    def test_distributed_shape_with_fixed_users(self):907        """908        Full integration test that starts both a MasterRunner and three WorkerRunner instances909        and tests a basic LoadTestShape with scaling up and down users with 'fixed count' users910        """911        class TestUser(User):912            @task913            def my_task(self):914                pass915        class FixedUser1(User):916            fixed_count = 1917            @task918            def my_task(self):919                pass920        class FixedUser2(User):921            fixed_count = 11922            @task923            def my_task(self):924                pass925        class TestShape(LoadTestShape):926            def tick(self):927                run_time = self.get_run_time()928                if run_time < 1:929                    return 12, 12930                elif run_time < 2:931                    return 36, 24932                elif run_time < 3:933                    return 12, 24934                else:935                    return None936        with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3):937            test_shape = TestShape()938            master_env = Environment(user_classes=[TestUser, FixedUser1, FixedUser2], shape_class=test_shape)939            master_env.shape_class.reset_time()940            master = master_env.create_master_runner("*", 0)941            workers = []942            for _ in range(3):943                worker_env = Environment(user_classes=[TestUser, FixedUser1, FixedUser2])944                worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)945                workers.append(worker)946            # Give workers time to connect947            sleep(0.1)948            # Start a shape test949            master.start_shape()950            sleep(1)951            # Ensure workers have connected and started the correct amount of users (fixed is spawn first)952            for worker in workers:953                self.assertEqual(4, worker.user_count, "Shape test has not reached stage 1")954                self.assertEqual(955                    12, test_shape.get_current_user_count(), "Shape is not seeing stage 1 runner user count correctly"956                )957            
self.assertDictEqual(master.reported_user_classes_count, {"FixedUser1": 1, "FixedUser2": 11, "TestUser": 0})958            # Ensure new stage with more users has been reached959            sleep(1)960            for worker in workers:961                self.assertEqual(12, worker.user_count, "Shape test has not reached stage 2")962                self.assertEqual(963                    36, test_shape.get_current_user_count(), "Shape is not seeing stage 2 runner user count correctly"964                )965            self.assertDictEqual(966                master.reported_user_classes_count, {"FixedUser1": 1, "FixedUser2": 11, "TestUser": 24}967            )968            # Ensure new stage with less users has been reached969            # and expected count of the fixed users is present970            sleep(1)971            for worker in workers:972                self.assertEqual(4, worker.user_count, "Shape test has not reached stage 3")973                self.assertEqual(974                    12, test_shape.get_current_user_count(), "Shape is not seeing stage 3 runner user count correctly"975                )976            self.assertDictEqual(master.reported_user_classes_count, {"FixedUser1": 1, "FixedUser2": 11, "TestUser": 0})977            # Ensure test stops at the end978            sleep(0.5)979            for worker in workers:980                self.assertEqual(0, worker.user_count, "Shape test has not stopped")981                self.assertEqual(982                    0, test_shape.get_current_user_count(), "Shape is not seeing stopped runner user count correctly"983                )984            self.assertDictEqual(master.reported_user_classes_count, {"FixedUser1": 0, "FixedUser2": 0, "TestUser": 0})985            try:986                with gevent.Timeout(3.0):987                    while master.state != STATE_STOPPED:988                        sleep(0.1)989            finally:990                self.assertEqual(STATE_STOPPED, master.state)991    def test_distributed_shape_with_stop_timeout(self):992        """993        Full integration test that starts both a MasterRunner and five WorkerRunner instances994        and tests a basic LoadTestShape with scaling up and down users995        """996        class TestUser1(User):997            def start(self, group: Group):998                gevent.sleep(0.5)999                return super().start(group)1000            @task1001            def my_task(self):1002                gevent.sleep(0)1003        class TestUser2(User):1004            def start(self, group: Group):1005                gevent.sleep(0.5)1006                return super().start(group)1007            @task1008            def my_task(self):1009                gevent.sleep(600)1010        class TestUser3(User):1011            def start(self, group: Group):1012                gevent.sleep(0.5)1013                return super().start(group)1014            @task1015            def my_task(self):1016                gevent.sleep(600)1017        class TestShape(LoadTestShape):1018            def tick(self):1019                run_time = self.get_run_time()1020                if run_time < 10:1021                    return 15, 31022                elif run_time < 30:1023                    return 5, 101024                else:1025                    return None1026        locust_worker_additional_wait_before_ready_after_stop = 51027        with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3), patch_env(1028            
"LOCUST_WORKER_ADDITIONAL_WAIT_BEFORE_READY_AFTER_STOP",1029            str(locust_worker_additional_wait_before_ready_after_stop),1030        ):1031            stop_timeout = 51032            master_env = Environment(1033                user_classes=[TestUser1, TestUser2, TestUser3], shape_class=TestShape(), stop_timeout=stop_timeout1034            )1035            master_env.shape_class.reset_time()1036            master = master_env.create_master_runner("*", 0)1037            workers = []1038            for i in range(5):1039                worker_env = Environment(user_classes=[TestUser1, TestUser2, TestUser3])1040                worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)1041                workers.append(worker)1042            # Give workers time to connect1043            sleep(0.1)1044            self.assertEqual(STATE_INIT, master.state)1045            self.assertEqual(5, len(master.clients.ready))1046            # Re-order `workers` so that it is sorted by `id`.1047            # This is required because the dispatch is done1048            # on the sorted workers.1049            workers = sorted(workers, key=lambda w: w.client_id)1050            # Start a shape test1051            master.start_shape()1052            # First stage1053            ts = time.time()1054            while master.state != STATE_SPAWNING:1055                self.assertTrue(time.time() - ts <= 1, master.state)1056                sleep()1057            sleep(5 - (time.time() - ts))  # runtime = 5s1058            ts = time.time()1059            while master.state != STATE_RUNNING:1060                self.assertTrue(time.time() - ts <= 1, master.state)1061                sleep()1062            self.assertEqual(STATE_RUNNING, master.state)1063            w1 = {"TestUser1": 1, "TestUser2": 1, "TestUser3": 1}1064            w2 = {"TestUser1": 1, "TestUser2": 1, "TestUser3": 1}1065            w3 = {"TestUser1": 1, "TestUser2": 1, "TestUser3": 1}1066            w4 = {"TestUser1": 1, "TestUser2": 1, "TestUser3": 1}1067            w5 = {"TestUser1": 1, "TestUser2": 1, "TestUser3": 1}1068            self.assertDictEqual(w1, workers[0].user_classes_count)1069            self.assertDictEqual(w2, workers[1].user_classes_count)1070            self.assertDictEqual(w3, workers[2].user_classes_count)1071            self.assertDictEqual(w4, workers[3].user_classes_count)1072            self.assertDictEqual(w5, workers[4].user_classes_count)1073            self.assertDictEqual(w1, master.clients[workers[0].client_id].user_classes_count)1074            self.assertDictEqual(w2, master.clients[workers[1].client_id].user_classes_count)1075            self.assertDictEqual(w3, master.clients[workers[2].client_id].user_classes_count)1076            self.assertDictEqual(w4, master.clients[workers[3].client_id].user_classes_count)1077            self.assertDictEqual(w5, master.clients[workers[4].client_id].user_classes_count)1078            sleep(5 - (time.time() - ts))  # runtime = 10s1079            # Fourth stage1080            ts = time.time()1081            while master.state != STATE_SPAWNING:1082                self.assertTrue(time.time() - ts <= 1, master.state)1083                sleep()1084            sleep(5 - (time.time() - ts))  # runtime = 15s1085            # Fourth stage - Excess TestUser1 have been stopped but1086            #                TestUser2/TestUser3 have not reached stop timeout yet, so1087            #                their number are unchanged1088            ts = 
time.time()1089            while master.state != STATE_RUNNING:1090                self.assertTrue(time.time() - ts <= 1, master.state)1091                sleep()1092            delta = time.time() - ts1093            w1 = {"TestUser1": 1, "TestUser2": 1, "TestUser3": 1}1094            w2 = {"TestUser1": 0, "TestUser2": 1, "TestUser3": 1}1095            w3 = {"TestUser1": 0, "TestUser2": 1, "TestUser3": 1}1096            w4 = {"TestUser1": 1, "TestUser2": 1, "TestUser3": 1}1097            w5 = {"TestUser1": 0, "TestUser2": 1, "TestUser3": 1}1098            self.assertDictEqual(w1, workers[0].user_classes_count)1099            self.assertDictEqual(w2, workers[1].user_classes_count)1100            self.assertDictEqual(w3, workers[2].user_classes_count)1101            self.assertDictEqual(w4, workers[3].user_classes_count)1102            self.assertDictEqual(w5, workers[4].user_classes_count)1103            self.assertDictEqual(w1, master.clients[workers[0].client_id].user_classes_count)1104            self.assertDictEqual(w2, master.clients[workers[1].client_id].user_classes_count)1105            self.assertDictEqual(w3, master.clients[workers[2].client_id].user_classes_count)1106            self.assertDictEqual(w4, master.clients[workers[3].client_id].user_classes_count)1107            self.assertDictEqual(w5, master.clients[workers[4].client_id].user_classes_count)1108            sleep(1 - delta)  # runtime = 16s1109            # Fourth stage - All users are now at the desired number1110            ts = time.time()1111            while master.state != STATE_RUNNING:1112                self.assertTrue(time.time() - ts <= 1, master.state)1113                sleep()1114            delta = time.time() - ts1115            w1 = {"TestUser1": 1, "TestUser2": 0, "TestUser3": 0}1116            w2 = {"TestUser1": 0, "TestUser2": 1, "TestUser3": 0}1117            w3 = {"TestUser1": 0, "TestUser2": 0, "TestUser3": 1}1118            w4 = {"TestUser1": 1, "TestUser2": 0, "TestUser3": 0}1119            w5 = {"TestUser1": 0, "TestUser2": 1, "TestUser3": 0}1120            self.assertDictEqual(w1, workers[0].user_classes_count)1121            self.assertDictEqual(w2, workers[1].user_classes_count)1122            self.assertDictEqual(w3, workers[2].user_classes_count)1123            self.assertDictEqual(w4, workers[3].user_classes_count)1124            self.assertDictEqual(w5, workers[4].user_classes_count)1125            self.assertDictEqual(w1, master.clients[workers[0].client_id].user_classes_count)1126            self.assertDictEqual(w2, master.clients[workers[1].client_id].user_classes_count)1127            self.assertDictEqual(w3, master.clients[workers[2].client_id].user_classes_count)1128            self.assertDictEqual(w4, master.clients[workers[3].client_id].user_classes_count)1129            self.assertDictEqual(w5, master.clients[workers[4].client_id].user_classes_count)1130            sleep(10 - delta)  # runtime = 26s1131            # Sleep stop_timeout and make sure the test has stopped1132            sleep(5)  # runtime = 31s1133            self.assertEqual(STATE_STOPPING, master.state)1134            sleep(stop_timeout)  # runtime = 36s1135            # We wait for "stop_timeout" seconds to let the workers reconnect as "ready" with the master.1136            # The reason for waiting an additional "stop_timeout" when we already waited for "stop_timeout"1137            # above is that when a worker receives the stop message, it can take up to "stop_timeout"1138            # for the worker to 
send the "client_stopped" message then an additional "stop_timeout" seconds1139            # to send the "client_ready" message.1140            ts = time.time()1141            while len(master.clients.ready) != len(workers):1142                self.assertTrue(1143                    time.time() - ts <= stop_timeout + locust_worker_additional_wait_before_ready_after_stop,1144                    f"expected {len(workers)} workers to be ready but only {len(master.clients.ready)} workers are",1145                )1146                sleep()1147            sleep(1)1148            # Check that no users are running1149            w1 = {"TestUser1": 0, "TestUser2": 0, "TestUser3": 0}1150            w2 = {"TestUser1": 0, "TestUser2": 0, "TestUser3": 0}1151            w3 = {"TestUser1": 0, "TestUser2": 0, "TestUser3": 0}1152            w4 = {"TestUser1": 0, "TestUser2": 0, "TestUser3": 0}1153            w5 = {"TestUser1": 0, "TestUser2": 0, "TestUser3": 0}1154            self.assertDictEqual(w1, workers[0].user_classes_count)1155            self.assertDictEqual(w2, workers[1].user_classes_count)1156            self.assertDictEqual(w3, workers[2].user_classes_count)1157            self.assertDictEqual(w4, workers[3].user_classes_count)1158            self.assertDictEqual(w5, workers[4].user_classes_count)1159            self.assertDictEqual(w1, master.clients[workers[0].client_id].user_classes_count)1160            self.assertDictEqual(w2, master.clients[workers[1].client_id].user_classes_count)1161            self.assertDictEqual(w3, master.clients[workers[2].client_id].user_classes_count)1162            self.assertDictEqual(w4, master.clients[workers[3].client_id].user_classes_count)1163            self.assertDictEqual(w5, master.clients[workers[4].client_id].user_classes_count)1164            ts = time.time()1165            while master.state != STATE_STOPPED:1166                self.assertTrue(time.time() - ts <= 5, master.state)1167                sleep()1168            master.stop()1169    @unittest.skip(reason="takes a lot of time and has randomness to it")1170    def test_distributed_shape_fuzzy_test(self):1171        """1172        Incredibility useful test to find issues with dispatch logic. 
This test allowed to find1173        multiple small corner cases with the new dispatch logic of locust v2.1174        The test is disabled by default because it takes a lot of time to run and has randomness to it.1175        However, it is advised to run it a few times (you can run it in parallel) when modifying the dispatch logic.1176        """1177        class BaseUser(User):1178            @task1179            def my_task(self):1180                gevent.sleep(600)1181        class TestUser01(BaseUser):1182            pass1183        class TestUser02(BaseUser):1184            pass1185        class TestUser03(BaseUser):1186            pass1187        class TestUser04(BaseUser):1188            pass1189        class TestUser05(BaseUser):1190            pass1191        class TestUser06(BaseUser):1192            pass1193        class TestUser07(BaseUser):1194            pass1195        class TestUser08(BaseUser):1196            pass1197        class TestUser09(BaseUser):1198            pass1199        class TestUser10(BaseUser):1200            pass1201        class TestUser11(BaseUser):1202            pass1203        class TestUser12(BaseUser):1204            pass1205        class TestUser13(BaseUser):1206            pass1207        class TestUser14(BaseUser):1208            pass1209        class TestUser15(BaseUser):1210            pass1211        class TestShape(LoadTestShape):1212            def __init__(self):1213                super().__init__()1214                self.stages = []1215                runtime = 01216                for _ in range(100):1217                    runtime += random.uniform(3, 15)1218                    self.stages.append((runtime, random.randint(1, 100), random.uniform(0.1, 10)))1219            def tick(self):1220                run_time = self.get_run_time()1221                for stage in self.stages:1222                    if run_time < stage[0]:1223                        return stage[1], stage[2]1224        user_classes = [1225            TestUser01,1226            TestUser02,1227            TestUser03,1228            TestUser04,1229            TestUser05,1230            TestUser06,1231            TestUser07,1232            TestUser08,1233            TestUser09,1234            TestUser10,1235            TestUser11,1236            TestUser12,1237            TestUser13,1238            TestUser14,1239            TestUser15,1240        ]1241        chosen_user_classes = random.sample(user_classes, k=random.randint(1, len(user_classes)))1242        for user_class in chosen_user_classes:1243            user_class.weight = random.uniform(1, 20)1244        locust_worker_additional_wait_before_ready_after_stop = 51245        with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3), patch_env(1246            "LOCUST_WORKER_ADDITIONAL_WAIT_BEFORE_READY_AFTER_STOP",1247            str(locust_worker_additional_wait_before_ready_after_stop),1248        ):1249            stop_timeout = 51250            master_env = Environment(1251                user_classes=chosen_user_classes, shape_class=TestShape(), stop_timeout=stop_timeout1252            )1253            master_env.shape_class.reset_time()1254            master = master_env.create_master_runner("*", 0)1255            workers = []1256            for i in range(random.randint(1, 30)):1257                worker_env = Environment(user_classes=chosen_user_classes)1258                worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)1259                workers.append(worker)1260         
   # Give workers time to connect1261            sleep(0.1)1262            self.assertEqual(STATE_INIT, master.state)1263            self.assertEqual(len(workers), len(master.clients.ready))1264            # Start a shape test1265            master.start_shape()1266            ts = time.time()1267            while master.state != STATE_STOPPED:1268                self.assertTrue(time.time() - ts <= master_env.shape_class.stages[-1][0] + 60, master.state)1269                print(1270                    "{:.2f}/{:.2f} | {} | {:.0f} | ".format(1271                        time.time() - ts,1272                        master_env.shape_class.stages[-1][0],1273                        master.state,1274                        sum(master.reported_user_classes_count.values()),1275                    )1276                    + json.dumps(dict(sorted(master.reported_user_classes_count.items(), key=itemgetter(0))))1277                )1278                sleep(1)1279            master.stop()1280    def test_distributed_shape_stop_and_restart(self):1281        """1282        Test stopping and then restarting a LoadTestShape1283        """1284        class TestUser(User):1285            @task1286            def my_task(self):1287                pass1288        class TestShape(LoadTestShape):1289            def tick(self):1290                run_time = self.get_run_time()1291                if run_time < 10:1292                    return 4, 41293                else:1294                    return None1295        with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3):1296            master_env = Environment(user_classes=[TestUser], shape_class=TestShape())1297            master_env.shape_class.reset_time()1298            master = master_env.create_master_runner("*", 0)1299            workers = []1300            for i in range(2):1301                worker_env = Environment(user_classes=[TestUser])1302                worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)1303                workers.append(worker)1304            # Give workers time to connect1305            sleep(0.1)1306            # Start a shape test and ensure workers have connected and started the correct amount of users1307            master.start_shape()1308            sleep(1)1309            for worker in workers:1310                self.assertEqual(2, worker.user_count, "Shape test has not started correctly")1311            # Stop the test and ensure all user count is 01312            master.stop()1313            sleep(1)1314            for worker in workers:1315                self.assertEqual(0, worker.user_count, "Shape test has not stopped")1316            # Then restart the test again and ensure workers have connected and started the correct amount of users1317            master.start_shape()1318            sleep(1)1319            for worker in workers:1320                self.assertEqual(2, worker.user_count, "Shape test has not started again correctly")1321            master.stop()1322    def test_distributed_stop_with_stopping_state(self):1323        """1324        Test stopping state when workers have stopped and now ready for a next test1325        """1326        class TestUser(User):1327            @task1328            def my_task(self):1329                pass1330        with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3):1331            master_env = Environment(user_classes=[TestUser])1332            master = master_env.create_master_runner("*", 0)1333            workers = 
[]1334            for i in range(3):1335                worker_env = Environment(user_classes=[TestUser])1336                worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)1337                workers.append(worker)1338        for worker in workers:1339            worker.send_message("client_stopped", None)1340        sleep(1)1341        for worker in workers:1342            self.assertEqual(STATE_INIT, worker.state, "Worker sent a client_stopped, should be ready once stopped")1343        self.assertEqual(STATE_STOPPED, master.state)1344    def test_distributed_shape_statuses_transition(self):1345        """1346        Full integration test that starts both a MasterRunner and five WorkerRunner instances1347        The goal of this test is to validate the status on the master is correctly transitioned for each of the1348        test phases.1349        """1350        class TestUser1(User):1351            @task1352            def my_task(self):1353                gevent.sleep(600)1354        class TestShape(LoadTestShape):1355            def tick(self):1356                run_time = self.get_run_time()1357                if run_time < 5:1358                    return 5, 2.51359                elif run_time < 10:1360                    return 10, 2.51361                elif run_time < 15:1362                    return 15, 2.51363                else:1364                    return None1365        locust_worker_additional_wait_before_ready_after_stop = 21366        with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3), patch_env(1367            "LOCUST_WORKER_ADDITIONAL_WAIT_BEFORE_READY_AFTER_STOP",1368            str(locust_worker_additional_wait_before_ready_after_stop),1369        ):1370            stop_timeout = 01371            master_env = Environment(user_classes=[TestUser1], shape_class=TestShape(), stop_timeout=stop_timeout)1372            master_env.shape_class.reset_time()1373            master = master_env.create_master_runner("*", 0)1374            workers = []1375            for i in range(5):1376                worker_env = Environment(user_classes=[TestUser1])1377                worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)1378                workers.append(worker)1379            # Give workers time to connect1380            sleep(0.1)1381            self.assertEqual(STATE_INIT, master.state)1382            self.assertEqual(5, len(master.clients.ready))1383            statuses = []1384            ts = time.perf_counter()1385            master.start_shape()1386            while master.state != STATE_STOPPED:1387                # +5s buffer to let master stop1388                self.assertTrue(1389                    time.perf_counter() - ts <= 30 + locust_worker_additional_wait_before_ready_after_stop + 5,1390                    master.state,1391                )1392                statuses.append((time.perf_counter() - ts, master.state, master.user_count))1393                sleep(0.1)1394            self.assertEqual(statuses[0][1], STATE_INIT)1395            stage = 11396            tolerance = 1  # in s1397            for (t1, state1, user_count1), (t2, state2, user_count2) in zip(statuses[:-1], statuses[1:]):1398                if state1 == STATE_SPAWNING and state2 == STATE_RUNNING and stage == 1:1399                    self.assertTrue(2.5 - tolerance <= t2 <= 2.5 + tolerance)1400                elif state1 == STATE_RUNNING and state2 == STATE_SPAWNING and stage == 1:1401                    self.assertTrue(5 - 
    def test_swarm_endpoint_is_non_blocking(self):
        class TestUser1(User):
            @task
            def my_task(self):
                gevent.sleep(600)

        class TestUser2(User):
            @task
            def my_task(self):
                gevent.sleep(600)

        with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3):
            stop_timeout = 0
            master_env = Environment(user_classes=[TestUser1, TestUser2], stop_timeout=stop_timeout)
            master = master_env.create_master_runner("*", 0)
            web_ui = master_env.create_web_ui("127.0.0.1", 0)

            workers = []
            for i in range(2):
                worker_env = Environment(user_classes=[TestUser1, TestUser2])
                worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)
                workers.append(worker)

            # Give workers time to connect
            sleep(0.1)
            self.assertEqual(STATE_INIT, master.state)
            self.assertEqual(len(master.clients.ready), len(workers))

            ts = time.perf_counter()
            response = requests.post(
                f"http://127.0.0.1:{web_ui.server.server_port}/swarm",
                data={"user_count": 20, "spawn_rate": 5, "host": "https://localhost"},
            )
            self.assertEqual(200, response.status_code)
            self.assertTrue(0 <= time.perf_counter() - ts <= 1, "swarm endpoint is blocking")

            ts = time.perf_counter()
            while master.state != STATE_RUNNING:
                self.assertTrue(time.perf_counter() - ts <= 4, master.state)
                gevent.sleep(0.1)
            self.assertTrue(3 <= time.perf_counter() - ts <= 5)
            self.assertEqual(master.user_count, 20)

            master.stop()
            web_ui.stop()
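
    # Editor's note (a sketch, not part of the original suite): the web UI's
    # POST /swarm handler only schedules the ramp-up and returns at once, which is
    # what the timing assertions above verify. Hedged usage example, assuming a
    # web UI listening on locust's default port 8089:
    #
    #     resp = requests.post(
    #         "http://127.0.0.1:8089/swarm",
    #         data={"user_count": 20, "spawn_rate": 5, "host": "https://localhost"},
    #     )
    #     assert resp.status_code == 200  # returns before spawning has finished
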
    def test_can_call_stop_endpoint_if_currently_swarming(self):
        class TestUser1(User):
            @task
            def my_task(self):
                gevent.sleep(600)

        class TestUser2(User):
            @task
            def my_task(self):
                gevent.sleep(600)

        with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3):
            stop_timeout = 5
            master_env = Environment(user_classes=[TestUser1, TestUser2], stop_timeout=stop_timeout)
            master = master_env.create_master_runner("*", 0)
            web_ui = master_env.create_web_ui("127.0.0.1", 0)

            workers = []
            for i in range(2):
                worker_env = Environment(user_classes=[TestUser1, TestUser2])
                worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)
                workers.append(worker)

            # Give workers time to connect
            sleep(0.1)
            self.assertEqual(STATE_INIT, master.state)
            self.assertEqual(len(master.clients.ready), len(workers))

            ts = time.perf_counter()
            response = requests.post(
                f"http://127.0.0.1:{web_ui.server.server_port}/swarm",
                data={"user_count": 20, "spawn_rate": 1, "host": "https://localhost"},
            )
            self.assertEqual(200, response.status_code)
            self.assertTrue(0 <= time.perf_counter() - ts <= 1, "swarm endpoint is blocking")

            gevent.sleep(5)
            self.assertEqual(master.state, STATE_SPAWNING)
            self.assertLessEqual(master.user_count, 10)

            ts = time.perf_counter()
            response = requests.get(
                f"http://127.0.0.1:{web_ui.server.server_port}/stop",
            )
            self.assertEqual(200, response.status_code)
            self.assertTrue(stop_timeout <= time.perf_counter() - ts <= stop_timeout + 5, "stop endpoint took too long")

            ts = time.perf_counter()
            while master.state != STATE_STOPPED:
                self.assertTrue(time.perf_counter() - ts <= 2)
                gevent.sleep(0.1)
            self.assertLessEqual(master.user_count, 0)

            master.stop()
            web_ui.stop()

    def test_target_user_count_is_set_before_ramp_up(self):
        """Test for https://github.com/locustio/locust/issues/1883"""

        class MyUser1(User):
            wait_time = constant(0)

            @task
            def my_task(self):
                pass

        with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3):
            # start a Master runner
            master_env = Environment(user_classes=[MyUser1])
            master = master_env.create_master_runner("*", 0)

            test_start_event_fired = [False]

            @master_env.events.test_start.add_listener
            def on_test_start(*args, **kwargs):
                test_start_event_fired[0] = True
                self.assertEqual(master.target_user_count, 3)

            sleep(0)
            # start 1 worker runner
            worker_env = Environment(user_classes=[MyUser1])
            worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)
            # give worker time to connect
            sleep(0.1)

            gevent.spawn(master.start, 3, spawn_rate=1)
            sleep(1)
            self.assertEqual(master.target_user_count, 3)
            self.assertEqual(master.user_count, 1)
            # However, target_user_classes_count is only updated at the end of the
            # ramp-up/ramp-down due to the way it is implemented.
            self.assertDictEqual({}, master.target_user_classes_count)

            sleep(2)
            self.assertEqual(master.target_user_count, 3)
            self.assertEqual(master.user_count, 3)
            self.assertDictEqual({"MyUser1": 3}, master.target_user_classes_count)

            master.quit()
            # make sure users are killed
            self.assertEqual(0, worker.user_count)
            self.assertTrue(test_start_event_fired[0])
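
    # Editor's note (a sketch, not part of the original suite): the point of the
    # test above is that master.target_user_count is already set when test_start
    # fires, so listeners may rely on it. Minimal listener, assuming an Environment
    # named `environment`:
    #
    #     @environment.events.test_start.add_listener
    #     def announce_target(environment, **kwargs):
    #         print("ramping to", environment.runner.target_user_count, "users")
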
    def test_long_running_test_start_is_run_to_completion_on_worker(self):
        """Test for https://github.com/locustio/locust/issues/1986"""

        class MyUser1(User):
            wait_time = constant(0)

            @task
            def my_task(self):
                pass

        with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3):
            master_env = Environment(user_classes=[MyUser1])
            master = master_env.create_master_runner("*", 0)
            sleep(0)

            # start 1 worker runner
            worker_env = Environment(user_classes=[MyUser1])
            worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)

            test_start_exec_count = 0

            @worker_env.events.test_start.add_listener
            def on_test_start(*_, **__):
                nonlocal test_start_exec_count
                test_start_exec_count += 1
                sleep(3)

            # give worker time to connect
            sleep(0.1)

            gevent.spawn(master.start, 3, spawn_rate=1)
            t0 = time.perf_counter()
            while master.user_count != 3:
                self.assertLessEqual(time.perf_counter() - t0, 5, "Expected 3 users to be spawned")
                sleep(0.1)

            master.quit()
            # make sure users are killed
            self.assertEqual(0, worker.user_count)
            self.assertEqual(test_start_exec_count, 1)


class TestMasterRunner(LocustRunnerTestCase):
    def setUp(self):
        super().setUp()
        self.environment = Environment(events=locust.events, catch_exceptions=False)

    def tearDown(self):
        super().tearDown()

    def get_runner(self, user_classes=None):
        if user_classes is not None:
            self.environment.user_classes = user_classes
        return self.environment.create_master_runner("*", 5557)

    def test_worker_connect(self):
        with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:
            master = self.get_runner()
            server.mocked_send(Message("client_ready", __version__, "zeh_fake_client1"))
            self.assertEqual(1, len(master.clients))
            self.assertTrue(
                "zeh_fake_client1" in master.clients, "Could not find fake client in master instance's clients dict"
            )
            server.mocked_send(Message("client_ready", __version__, "zeh_fake_client2"))
            server.mocked_send(Message("client_ready", __version__, "zeh_fake_client3"))
            server.mocked_send(Message("client_ready", __version__, "zeh_fake_client4"))
            self.assertEqual(4, len(master.clients))

            server.mocked_send(Message("quit", None, "zeh_fake_client3"))
            self.assertEqual(3, len(master.clients))
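
    # Editor's note (a sketch, not part of the original suite): the worker
    # handshake is a "client_ready" message whose data field carries the worker's
    # locust version and whose last field is the worker id; a "quit" message
    # deregisters the worker again, mirroring the mocked sends above:
    #
    #     server.mocked_send(Message("client_ready", __version__, "worker-1"))
    #     server.mocked_send(Message("quit", None, "worker-1"))
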
"other_version_mismatch_should_just_give_a_warning"))1598            self.assertEqual(1, len(self.mocked_log.warning))1599            self.assertEqual(1, len(master.clients))1600            server.mocked_send(Message("client_ready", -1, "version_check_bypass_should_not_warn"))1601            self.assertEqual(1, len(self.mocked_log.warning))1602            self.assertEqual(2, len(master.clients))1603            server.mocked_send(1604                Message("client_ready", __version__ + "1", "difference_in_patch_version_should_not_warn")1605            )1606            self.assertEqual(3, len(master.clients))1607            self.assertEqual(1, len(self.mocked_log.warning))1608    def test_worker_stats_report_median(self):1609        with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:1610            master = self.get_runner()1611            server.mocked_send(Message("client_ready", __version__, "fake_client"))1612            master.stats.get("/", "GET").log(100, 23455)1613            master.stats.get("/", "GET").log(800, 23455)1614            master.stats.get("/", "GET").log(700, 23455)1615            data = {"user_count": 1}1616            self.environment.events.report_to_master.fire(client_id="fake_client", data=data)1617            master.stats.clear_all()1618            server.mocked_send(Message("stats", data, "fake_client"))1619            s = master.stats.get("/", "GET")1620            self.assertEqual(700, s.median_response_time)1621    def test_worker_stats_report_with_none_response_times(self):1622        with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:1623            master = self.get_runner()1624            server.mocked_send(Message("client_ready", __version__, "fake_client"))1625            master.stats.get("/mixed", "GET").log(0, 23455)1626            master.stats.get("/mixed", "GET").log(800, 23455)1627            master.stats.get("/mixed", "GET").log(700, 23455)1628            master.stats.get("/mixed", "GET").log(None, 23455)1629            master.stats.get("/mixed", "GET").log(None, 23455)1630            master.stats.get("/mixed", "GET").log(None, 23455)1631            master.stats.get("/mixed", "GET").log(None, 23455)1632            master.stats.get("/onlyNone", "GET").log(None, 23455)1633            data = {"user_count": 1}1634            self.environment.events.report_to_master.fire(client_id="fake_client", data=data)1635            master.stats.clear_all()1636            server.mocked_send(Message("stats", data, "fake_client"))1637            s1 = master.stats.get("/mixed", "GET")1638            self.assertEqual(700, s1.median_response_time)1639            self.assertEqual(500, s1.avg_response_time)1640            s2 = master.stats.get("/onlyNone", "GET")1641            self.assertEqual(0, s2.median_response_time)1642            self.assertEqual(0, s2.avg_response_time)1643    def test_master_marks_downed_workers_as_missing(self):1644        with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:1645            master = self.get_runner()1646            server.mocked_send(Message("client_ready", __version__, "fake_client"))1647            sleep(6)1648            # print(master.clients['fake_client'].__dict__)1649            assert master.clients["fake_client"].state == STATE_MISSING1650    def test_last_worker_quitting_stops_test(self):1651        class TestUser(User):1652            @task1653            def my_task(self):1654                gevent.sleep(600)1655        with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as 
    def test_last_worker_quitting_stops_test(self):
        class TestUser(User):
            @task
            def my_task(self):
                gevent.sleep(600)

        with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server, patch_env(
            "LOCUST_WAIT_FOR_WORKERS_REPORT_AFTER_RAMP_UP", "0.1"
        ):
            master = self.get_runner(user_classes=[TestUser])
            server.mocked_send(Message("client_ready", __version__, "fake_client1"))
            server.mocked_send(Message("client_ready", __version__, "fake_client2"))

            master.start(1, 2)
            server.mocked_send(Message("spawning", None, "fake_client1"))
            server.mocked_send(Message("spawning", None, "fake_client2"))

            server.mocked_send(Message("quit", None, "fake_client1"))
            sleep(0.1)
            self.assertEqual(1, len(master.clients.all))
            self.assertNotEqual(STATE_STOPPED, master.state, "Not all workers quit but test stopped anyway.")

            server.mocked_send(Message("quit", None, "fake_client2"))
            sleep(0.1)
            self.assertEqual(0, len(master.clients.all))
            self.assertEqual(STATE_STOPPED, master.state, "All workers quit but test didn't stop.")

    @mock.patch("locust.runners.HEARTBEAT_INTERVAL", new=0.1)
    def test_last_worker_missing_stops_test(self):
        class TestUser(User):
            @task
            def my_task(self):
                gevent.sleep(600)

        with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server, patch_env(
            "LOCUST_WAIT_FOR_WORKERS_REPORT_AFTER_RAMP_UP", "0.1"
        ):
            master = self.get_runner(user_classes=[TestUser])
            server.mocked_send(Message("client_ready", __version__, "fake_client1"))
            server.mocked_send(Message("client_ready", __version__, "fake_client2"))
            server.mocked_send(Message("client_ready", __version__, "fake_client3"))

            master.start(3, 3)
            server.mocked_send(Message("spawning", None, "fake_client1"))
            server.mocked_send(Message("spawning", None, "fake_client2"))
            server.mocked_send(Message("spawning", None, "fake_client3"))
            sleep(0.2)

            server.mocked_send(
                Message(
                    "heartbeat",
                    {"state": STATE_RUNNING, "current_cpu_usage": 50, "current_memory_usage": 200, "count": 1},
                    "fake_client1",
                )
            )
            server.mocked_send(
                Message(
                    "heartbeat",
                    {"state": STATE_RUNNING, "current_cpu_usage": 50, "current_memory_usage": 200, "count": 1},
                    "fake_client2",
                )
            )
            server.mocked_send(
                Message(
                    "heartbeat",
                    {"state": STATE_RUNNING, "current_cpu_usage": 50, "current_memory_usage": 200, "count": 1},
                    "fake_client3",
                )
            )
            sleep(0.2)
            self.assertEqual(0, len(master.clients.missing))
            self.assertEqual(3, master.worker_count)
            self.assertNotIn(
                master.state, [STATE_STOPPED, STATE_STOPPING], "Not all workers went missing but test stopped anyway."
            )

            server.mocked_send(
                Message(
                    "heartbeat",
                    {"state": STATE_RUNNING, "current_cpu_usage": 50, "current_memory_usage": 200, "count": 1},
                    "fake_client1",
                )
            )
            sleep(0.4)
            self.assertEqual(2, len(master.clients.missing))
            self.assertEqual(1, master.worker_count)
            self.assertNotIn(
                master.state, [STATE_STOPPED, STATE_STOPPING], "Not all workers went missing but test stopped anyway."
            )

            sleep(0.2)
            self.assertEqual(3, len(master.clients.missing))
            self.assertEqual(0, master.worker_count)
            self.assertEqual(STATE_STOPPED, master.state, "All workers went missing but test didn't stop.")
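
    # Editor's note (a sketch, not part of the original suite): a worker heartbeat
    # carries the worker's state plus basic resource usage; the master uses it
    # both for liveness tracking and for the UI. The payload has the same shape as
    # the mocked messages above:
    #
    #     heartbeat = {
    #         "state": STATE_RUNNING,
    #         "current_cpu_usage": 50,
    #         "current_memory_usage": 200,
    #         "count": 1,
    #     }
    #     server.mocked_send(Message("heartbeat", heartbeat, "worker-1"))
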
    @mock.patch("locust.runners.HEARTBEAT_INTERVAL", new=0.1)
    @mock.patch("locust.runners.HEARTBEAT_DEAD_INTERNAL", new=-3)
    def test_worker_missing_after_heartbeat_dead_interval(self):
        class TestUser(User):
            @task
            def my_task(self):
                gevent.sleep(600)

        with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server, patch_env(
            "LOCUST_WAIT_FOR_WORKERS_REPORT_AFTER_RAMP_UP", "0.1"
        ):
            master = self.get_runner(user_classes=[TestUser])
            server.mocked_send(Message("client_ready", __version__, "fake_client1"))
            server.mocked_send(Message("client_ready", __version__, "fake_client2"))
            server.mocked_send(Message("client_ready", __version__, "fake_client3"))

            master.start(3, 3)
            server.mocked_send(Message("spawning", None, "fake_client1"))
            server.mocked_send(Message("spawning", None, "fake_client2"))
            server.mocked_send(Message("spawning", None, "fake_client3"))
            sleep(0.1)

            server.mocked_send(
                Message(
                    "heartbeat",
                    {"state": STATE_RUNNING, "current_cpu_usage": 50, "current_memory_usage": 200, "count": 1},
                    "fake_client1",
                )
            )
            server.mocked_send(
                Message(
                    "heartbeat",
                    {"state": STATE_RUNNING, "current_cpu_usage": 50, "current_memory_usage": 200, "count": 1},
                    "fake_client2",
                )
            )
            server.mocked_send(
                Message(
                    "heartbeat",
                    {"state": STATE_RUNNING, "current_cpu_usage": 50, "current_memory_usage": 200, "count": 1},
                    "fake_client3",
                )
            )
            sleep(0.1)
            # initially, all workers are in the active state
            self.assertEqual(0, len(master.clients.missing))
            self.assertEqual(3, master.worker_count)

            server.mocked_send(
                Message(
                    "heartbeat",
                    {"state": STATE_RUNNING, "current_cpu_usage": 50, "current_memory_usage": 200, "count": 1},
                    "fake_client1",
                )
            )
            server.mocked_send(
                Message(
                    "heartbeat",
                    {"state": STATE_RUNNING, "current_cpu_usage": 50, "current_memory_usage": 200, "count": 1},
                    "fake_client2",
                )
            )
            sleep(0.6)
            # four heartbeat intervals have passed since the last heartbeats, so all
            # three workers are now in the missing state
            self.assertEqual(3, len(master.clients.missing))
            self.assertEqual(0, master.worker_count)

            server.mocked_send(
                Message(
                    "heartbeat",
                    {"state": STATE_RUNNING, "current_cpu_usage": 50, "current_memory_usage": 200, "count": 1},
                    "fake_client1",
                )
            )
            server.mocked_send(
                Message(
                    "heartbeat",
                    {"state": STATE_RUNNING, "current_cpu_usage": 50, "current_memory_usage": 200, "count": 1},
                    "fake_client2",
                )
            )
            sleep(0.2)
            # heartbeats were received from two workers again, so they are active;
            # fake_client3 has breached HEARTBEAT_DEAD_INTERNAL, so it is removed
            # from the worker list entirely
            self.assertEqual(0, len(master.clients.missing))
            self.assertEqual(2, master.worker_count)
            master.stop()
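
    # Editor's note (hypothetical check, not part of the original suite): with
    # HEARTBEAT_DEAD_INTERNAL patched to a negative value above, a worker that
    # goes missing breaches the "dead" cutoff immediately and is dropped from the
    # client list rather than lingering in the missing state, e.g.:
    #
    #     assert "fake_client3" not in master.clients
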
    def test_master_total_stats(self):
        with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:
            master = self.get_runner()
            server.mocked_send(Message("client_ready", __version__, "fake_client"))

            stats = RequestStats()
            stats.log_request("GET", "/1", 100, 3546)
            stats.log_request("GET", "/1", 800, 56743)
            stats2 = RequestStats()
            stats2.log_request("GET", "/2", 700, 2201)

            server.mocked_send(
                Message(
                    "stats",
                    {
                        "stats": stats.serialize_stats(),
                        "stats_total": stats.total.serialize(),
                        "errors": stats.serialize_errors(),
                        "user_count": 1,
                    },
                    "fake_client",
                )
            )
            server.mocked_send(
                Message(
                    "stats",
                    {
                        "stats": stats2.serialize_stats(),
                        "stats_total": stats2.total.serialize(),
                        "errors": stats2.serialize_errors(),
                        "user_count": 2,
                    },
                    "fake_client",
                )
            )
            self.assertEqual(700, master.stats.total.median_response_time)

    def test_master_total_stats_with_none_response_times(self):
        with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:
            master = self.get_runner()
            server.mocked_send(Message("client_ready", __version__, "fake_client"))

            stats = RequestStats()
            stats.log_request("GET", "/1", 100, 3546)
            stats.log_request("GET", "/1", 800, 56743)
            stats.log_request("GET", "/1", None, 56743)
            stats2 = RequestStats()
            stats2.log_request("GET", "/2", 700, 2201)
            stats2.log_request("GET", "/2", None, 2201)
            stats3 = RequestStats()
            stats3.log_request("GET", "/3", None, 2201)

            server.mocked_send(
                Message(
                    "stats",
                    {
                        "stats": stats.serialize_stats(),
                        "stats_total": stats.total.serialize(),
                        "errors": stats.serialize_errors(),
                        "user_count": 1,
                    },
                    "fake_client",
                )
            )
            server.mocked_send(
                Message(
                    "stats",
                    {
                        "stats": stats2.serialize_stats(),
                        "stats_total": stats2.total.serialize(),
                        "errors": stats2.serialize_errors(),
                        "user_count": 2,
                    },
                    "fake_client",
                )
            )
            server.mocked_send(
                Message(
                    "stats",
                    {
                        "stats": stats3.serialize_stats(),
                        "stats_total": stats3.total.serialize(),
                        "errors": stats3.serialize_errors(),
                        "user_count": 2,
                    },
                    "fake_client",
                )
            )
            self.assertEqual(700, master.stats.total.median_response_time)
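
    # Editor's note (worked check, not part of the original suite): requests logged
    # with a response time of None (e.g. connection failures) are counted in
    # num_requests but excluded from averages and percentiles. In the test above,
    # the aggregated total's non-None samples are 100, 800 and 700 ms, so the
    # median is 700 even though three more None samples were logged.
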
    def test_master_current_response_times(self):
        start_time = 1
        with mock.patch("time.time") as mocked_time:
            mocked_time.return_value = start_time
            with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:
                master = self.get_runner()
                self.environment.stats.reset_all()
                mocked_time.return_value += 1.0234
                server.mocked_send(Message("client_ready", __version__, "fake_client"))

                stats = RequestStats()
                stats.log_request("GET", "/1", 100, 3546)
                stats.log_request("GET", "/1", 800, 56743)
                server.mocked_send(
                    Message(
                        "stats",
                        {
                            "stats": stats.serialize_stats(),
                            "stats_total": stats.total.get_stripped_report(),
                            "errors": stats.serialize_errors(),
                            "user_count": 1,
                        },
                        "fake_client",
                    )
                )
                mocked_time.return_value += 1

                stats2 = RequestStats()
                stats2.log_request("GET", "/2", 400, 2201)
                server.mocked_send(
                    Message(
                        "stats",
                        {
                            "stats": stats2.serialize_stats(),
                            "stats_total": stats2.total.get_stripped_report(),
                            "errors": stats2.serialize_errors(),
                            "user_count": 2,
                        },
                        "fake_client",
                    )
                )
                mocked_time.return_value += 4
                self.assertEqual(400, master.stats.total.get_current_response_time_percentile(0.5))
                self.assertEqual(800, master.stats.total.get_current_response_time_percentile(0.95))

                # let 10 seconds pass, do some more requests, send them to the master
                # and make sure the current response time percentiles only account
                # for these new requests
                mocked_time.return_value += 10.10023
                stats.log_request("GET", "/1", 20, 1)
                stats.log_request("GET", "/1", 30, 1)
                stats.log_request("GET", "/1", 3000, 1)
                server.mocked_send(
                    Message(
                        "stats",
                        {
                            "stats": stats.serialize_stats(),
                            "stats_total": stats.total.get_stripped_report(),
                            "errors": stats.serialize_errors(),
                            "user_count": 2,
                        },
                        "fake_client",
                    )
                )
                self.assertEqual(30, master.stats.total.get_current_response_time_percentile(0.5))
                self.assertEqual(3000, master.stats.total.get_current_response_time_percentile(0.95))
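
    # Editor's note (a sketch, not part of the original suite): the "current"
    # percentiles look only at a sliding window of roughly the last 10 seconds
    # (CURRENT_RESPONSE_TIME_PERCENTILE_WINDOW in locust.stats), which is why
    # advancing the mocked clock by ~10s above makes the earlier samples drop out:
    #
    #     p50 = master.stats.total.get_current_response_time_percentile(0.5)
    #     p95 = master.stats.total.get_current_response_time_percentile(0.95)
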
        stats.log_request("GET", "/1", 20, 1)1945                stats.log_request("GET", "/1", 30, 1)1946                stats.log_request("GET", "/1", 3000, 1)1947                server.mocked_send(1948                    Message(1949                        "stats",1950                        {1951                            "stats": stats.serialize_stats(),1952                            "stats_total": stats.total.get_stripped_report(),1953                            "errors": stats.serialize_errors(),1954                            "user_count": 2,1955                        },1956                        "fake_client",1957                    )1958                )1959                self.assertEqual(30, master.stats.total.get_current_response_time_percentile(0.5))1960                self.assertEqual(3000, master.stats.total.get_current_response_time_percentile(0.95))1961    @mock.patch("locust.runners.HEARTBEAT_INTERVAL", new=600)1962    def test_rebalance_locust_users_on_worker_connect(self):1963        class TestUser(User):1964            @task1965            def my_task(self):1966                pass1967        with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:1968            master = self.get_runner(user_classes=[TestUser])1969            server.mocked_send(Message("client_ready", __version__, "zeh_fake_client1"))1970            self.assertEqual(1, len(master.clients))1971            self.assertTrue(1972                "zeh_fake_client1" in master.clients, "Could not find fake client in master instance's clients dict"1973            )1974            master.start(100, 20)1975            self.assertEqual(6, len(server.outbox))1976            # First element of the outbox list is ack msg. That is why it is skipped in for loop1977            for i, (_, msg) in enumerate(server.outbox[1:].copy()):1978                self.assertDictEqual({"TestUser": int((i + 1) * 20)}, msg.data["user_classes_count"])1979                server.outbox.pop()1980            # Normally, this attribute would be updated when the1981            # master receives the report from the worker.1982            master.clients["zeh_fake_client1"].user_classes_count = {"TestUser": 100}1983            # let another worker connect1984            server.mocked_send(Message("client_ready", __version__, "zeh_fake_client2"))1985            self.assertEqual(2, len(master.clients))1986            sleep(0.1)  # give time for messages to be sent to clients1987            self.assertEqual(4, len(server.outbox))1988            client_id, msg = server.outbox.pop()1989            self.assertEqual({"TestUser": 50}, msg.data["user_classes_count"])1990            client_id, msg = server.outbox.pop()1991            self.assertEqual({"TestUser": 50}, msg.data["user_classes_count"])1992    def test_sends_spawn_data_to_ready_running_spawning_workers(self):1993        """Sends spawn job to running, ready, or spawning workers"""1994        class TestUser(User):1995            @task1996            def my_task(self):1997                pass1998        with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:1999            master = self.get_runner(user_classes=[TestUser])2000            master.clients[1] = WorkerNode("1")2001            master.clients[2] = WorkerNode("2")2002            master.clients[3] = WorkerNode("3")2003            master.clients[1].state = STATE_INIT2004            master.clients[2].state = STATE_SPAWNING2005            master.clients[3].state = STATE_RUNNING2006            
    def test_sends_spawn_data_to_ready_running_spawning_workers(self):
        """Sends spawn job to running, ready, or spawning workers"""

        class TestUser(User):
            @task
            def my_task(self):
                pass

        with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:
            master = self.get_runner(user_classes=[TestUser])
            master.clients[1] = WorkerNode("1")
            master.clients[2] = WorkerNode("2")
            master.clients[3] = WorkerNode("3")
            master.clients[1].state = STATE_INIT
            master.clients[2].state = STATE_SPAWNING
            master.clients[3].state = STATE_RUNNING
            master.start(user_count=5, spawn_rate=5)
            self.assertEqual(3, len(server.outbox))

    def test_start_event(self):
        """
        Tests that the test_start event is fired
        """

        class TestUser(User):
            @task
            def my_task(self):
                pass

        with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:
            master = self.get_runner(user_classes=[TestUser])

            run_count = [0]

            @self.environment.events.test_start.add_listener
            def on_test_start(*a, **kw):
                run_count[0] += 1

            for i in range(5):
                server.mocked_send(Message("client_ready", __version__, "fake_client%i" % i))

            master.start(7, 7)
            self.assertEqual(10, len(server.outbox))
            self.assertEqual(1, run_count[0])

            # change the number of users and check that test_start isn't fired again
            master.start(7, 7)
            self.assertEqual(1, run_count[0])

            # stop and start to make sure test_start is fired again
            master.stop()
            master.start(3, 3)
            self.assertEqual(2, run_count[0])
            master.quit()

    def test_stop_event(self):
        """
        Tests that the test_stop event is fired
        """

        class TestUser(User):
            @task
            def my_task(self):
                pass

        with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:
            master = self.get_runner(user_classes=[TestUser])

            @self.environment.events.test_stopping.add_listener
            def on_test_stopping(*_, **__):
                self.runner_stopping = True

            @self.environment.events.test_stop.add_listener
            def on_test_stop(*_, **__):
                self.runner_stopped = True

            for i in range(5):
                server.mocked_send(Message("client_ready", __version__, "fake_client%i" % i))

            master.start(7, 7)
            self.assertEqual(10, len(server.outbox))
            master.stop()
            self.assertTrue(self.runner_stopping)
            self.assertTrue(self.runner_stopped)

            self.reset_state()
            for i in range(5):
                server.mocked_send(Message("client_ready", __version__, "fake_client%i" % i))
            master.start(7, 7)
            master.stop()
            master.quit()
            self.assertTrue(self.runner_stopping)
            self.assertTrue(self.runner_stopped)
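
    # Editor's note (a sketch, not part of the original suite): test_stopping fires
    # when a stop is initiated and test_stop once the test has stopped; quit()
    # triggers both even without an explicit stop() first. Minimal listener,
    # assuming an Environment named `environment`:
    #
    #     @environment.events.test_stop.add_listener
    #     def on_test_stop(environment, **kwargs):
    #         print("test finished")
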
    def test_stop_event_quit(self):
        """
        Tests that the test_stop event is fired when quit() is called directly
        """

        class TestUser(User):
            @task
            def my_task(self):
                pass

        with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:
            master = self.get_runner(user_classes=[TestUser])

            @self.environment.events.test_stopping.add_listener
            def on_test_stopping(*_, **__):
                self.runner_stopping = True

            @self.environment.events.test_stop.add_listener
            def on_test_stop(*_, **__):
                self.runner_stopped = True

            for i in range(5):
                server.mocked_send(Message("client_ready", __version__, "fake_client%i" % i))

            master.start(7, 7)
            self.assertEqual(10, len(server.outbox))
            master.quit()
            self.assertTrue(self.runner_stopping)
            self.assertTrue(self.runner_stopped)

    def test_spawn_zero_locusts(self):
        class MyTaskSet(TaskSet):
            @task...