Best Python code snippet using locust
test_runners.py
Source:test_runners.py  
...438        self.assertTrue(3 <= time.perf_counter() - ts <= 5)439        self.assertEqual(local_runner.user_count, 20)440        local_runner.stop()441        web_ui.stop()442    def test_can_call_stop_endpoint_if_currently_swarming(self):443        class TestUser1(User):444            @task445            def my_task(self):446                gevent.sleep(600)447        class TestUser2(User):448            @task449            def my_task(self):450                gevent.sleep(600)451        stop_timeout = 5452        env = Environment(user_classes=[TestUser1, TestUser2], stop_timeout=stop_timeout)453        local_runner = env.create_local_runner()454        web_ui = env.create_web_ui("127.0.0.1", 0)455        gevent.sleep(0.1)456        ts = time.perf_counter()457        response = requests.post(458            "http://127.0.0.1:{}/swarm".format(web_ui.server.server_port),459            data={"user_count": 20, "spawn_rate": 1, "host": "https://localhost"},460        )461        self.assertEqual(200, response.status_code)462        self.assertTrue(0 <= time.perf_counter() - ts <= 1, "swarm endpoint is blocking")463        gevent.sleep(5)464        self.assertEqual(local_runner.state, STATE_SPAWNING)465        self.assertLessEqual(local_runner.user_count, 10)466        ts = time.perf_counter()467        response = requests.get(468            "http://127.0.0.1:{}/stop".format(web_ui.server.server_port),469        )470        self.assertEqual(200, response.status_code)471        self.assertTrue(stop_timeout <= time.perf_counter() - ts <= stop_timeout + 5, "stop endpoint took too long")472        ts = time.perf_counter()473        while local_runner.state != STATE_STOPPED:474            self.assertTrue(time.perf_counter() - ts <= 2)475            gevent.sleep(0.1)476        self.assertLessEqual(local_runner.user_count, 0)477        local_runner.stop()478        web_ui.stop()479class TestMasterWorkerRunners(LocustTestCase):480    def test_distributed_integration_run(self):481        """482        Full integration test that starts both a MasterRunner and three WorkerRunner instances483        and makes sure that their stats is sent to the Master.484        """485        class TestUser(User):486            wait_time = constant(0.1)487            @task488            def incr_stats(self):489                self.environment.events.request.fire(490                    request_type="GET",491                    name="/",492                    response_time=1337,493                    response_length=666,494                    exception=None,495                    context={},496                )497        with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3):498            # start a Master runner499            master_env = Environment(user_classes=[TestUser])500            master = master_env.create_master_runner("*", 0)501            sleep(0)502            # start 3 Worker runners503            workers = []504            for i in range(3):505                worker_env = Environment(user_classes=[TestUser])506                worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)507                workers.append(worker)508            # give workers time to connect509            sleep(0.1)510            # issue start command that should trigger TestUsers to be spawned in the Workers511            master.start(6, spawn_rate=1000)512            sleep(0.1)513            # check that worker nodes have started locusts514            for worker in workers:515                self.assertEqual(2, worker.user_count)516            # give time for users to generate stats, and stats to be sent to master517            sleep(1)518            master.quit()519            # make sure users are killed520            for worker in workers:521                self.assertEqual(0, worker.user_count)522        # check that stats are present in master523        self.assertGreater(524            master_env.runner.stats.total.num_requests,525            20,526            "For some reason the master node's stats has not come in",527        )528    def test_distributed_run_with_custom_args(self):529        """530        Full integration test that starts both a MasterRunner and three WorkerRunner instances531        and makes sure that their stats is sent to the Master.532        """533        class TestUser(User):534            wait_time = constant(0.1)535            @task536            def incr_stats(self):537                self.environment.events.request.fire(538                    request_type="GET",539                    name=self.environment.parsed_options.my_str_argument,540                    response_time=self.environment.parsed_options.my_int_argument,541                    response_length=666,542                    exception=None,543                    context={},544                )545        @locust.events.init_command_line_parser.add_listener546        def _(parser, **kw):547            parser.add_argument("--my-int-argument", type=int)548            parser.add_argument("--my-str-argument", type=str, default="NOOOO")549        with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3):550            # start a Master runner551            master_env = Environment(user_classes=[TestUser])552            master = master_env.create_master_runner("*", 0)553            master_env.parsed_options = parse_options(554                [555                    "--my-int-argument",556                    "42",557                    "--my-str-argument",558                    "cool-string",559                ]560            )561            sleep(0)562            # start 3 Worker runners563            workers = []564            for i in range(3):565                worker_env = Environment(user_classes=[TestUser])566                worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)567                workers.append(worker)568            # give workers time to connect569            sleep(0.1)570            # issue start command that should trigger TestUsers to be spawned in the Workers571            master.start(6, spawn_rate=1000)572            sleep(0.1)573            # check that worker nodes have started locusts574            for worker in workers:575                self.assertEqual(2, worker.user_count)576            # give time for users to generate stats, and stats to be sent to master577            sleep(1)578            master.quit()579            # make sure users are killed580            for worker in workers:581                self.assertEqual(0, worker.user_count)582        self.assertEqual(master_env.runner.stats.total.max_response_time, 42)583        self.assertEqual(master_env.runner.stats.get("cool-string", "GET").avg_response_time, 42)584    def test_test_stop_event(self):585        class TestUser(User):586            wait_time = constant(0.1)587            @task588            def my_task(l):589                pass590        with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3):591            # start a Master runner592            master_env = Environment(user_classes=[TestUser])593            test_stop_count = {"master": 0, "worker": 0}594            @master_env.events.test_stop.add_listener595            def _(*args, **kwargs):596                test_stop_count["master"] += 1597            master = master_env.create_master_runner("*", 0)598            sleep(0)599            # start a Worker runner600            worker_env = Environment(user_classes=[TestUser])601            @worker_env.events.test_stop.add_listener602            def _(*args, **kwargs):603                test_stop_count["worker"] += 1604            worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)605            # give worker time to connect606            sleep(0.1)607            # issue start command that should trigger TestUsers to be spawned in the Workers608            master.start(2, spawn_rate=1000)609            sleep(0.1)610            # check that worker nodes have started locusts611            self.assertEqual(2, worker.user_count)612            # give time for users to generate stats, and stats to be sent to master613            sleep(0.1)614            master_env.events.quitting.fire(environment=master_env, reverse=True)615            master.quit()616            sleep(0.1)617            # make sure users are killed618            self.assertEqual(0, worker.user_count)619        # check the test_stop event was called one time in master and one time in worker620        self.assertEqual(621            1,622            test_stop_count["master"],623            "The test_stop event was not called exactly one time in the master node",624        )625        self.assertEqual(626            1,627            test_stop_count["worker"],628            "The test_stop event was not called exactly one time in the worker node",629        )630    def test_distributed_shape(self):631        """632        Full integration test that starts both a MasterRunner and three WorkerRunner instances633        and tests a basic LoadTestShape with scaling up and down users634        """635        class TestUser(User):636            @task637            def my_task(self):638                pass639        class TestShape(LoadTestShape):640            def tick(self):641                run_time = self.get_run_time()642                if run_time < 2:643                    return 9, 9644                elif run_time < 4:645                    return 21, 21646                elif run_time < 6:647                    return 3, 21648                else:649                    return None650        with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3):651            test_shape = TestShape()652            master_env = Environment(user_classes=[TestUser], shape_class=test_shape)653            master_env.shape_class.reset_time()654            master = master_env.create_master_runner("*", 0)655            workers = []656            for i in range(3):657                worker_env = Environment(user_classes=[TestUser])658                worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)659                workers.append(worker)660            # Give workers time to connect661            sleep(0.1)662            # Start a shape test663            master.start_shape()664            sleep(1)665            # Ensure workers have connected and started the correct amount of users666            for worker in workers:667                self.assertEqual(3, worker.user_count, "Shape test has not reached stage 1")668                self.assertEqual(669                    9, test_shape.get_current_user_count(), "Shape is not seeing stage 1 runner user count correctly"670                )671            self.assertDictEqual(master.reported_user_classes_count, {"TestUser": 9})672            # Ensure new stage with more users has been reached673            sleep(2)674            for worker in workers:675                self.assertEqual(7, worker.user_count, "Shape test has not reached stage 2")676                self.assertEqual(677                    21, test_shape.get_current_user_count(), "Shape is not seeing stage 2 runner user count correctly"678                )679            self.assertDictEqual(master.reported_user_classes_count, {"TestUser": 21})680            # Ensure new stage with less users has been reached681            sleep(2)682            for worker in workers:683                self.assertEqual(1, worker.user_count, "Shape test has not reached stage 3")684                self.assertEqual(685                    3, test_shape.get_current_user_count(), "Shape is not seeing stage 3 runner user count correctly"686                )687            self.assertDictEqual(master.reported_user_classes_count, {"TestUser": 3})688            # Ensure test stops at the end689            sleep(2)690            for worker in workers:691                self.assertEqual(0, worker.user_count, "Shape test has not stopped")692                self.assertEqual(693                    0, test_shape.get_current_user_count(), "Shape is not seeing stopped runner user count correctly"694                )695            self.assertDictEqual(master.reported_user_classes_count, {"TestUser": 0})696            self.assertEqual("stopped", master.state)697    def test_distributed_shape_with_stop_timeout(self):698        """699        Full integration test that starts both a MasterRunner and five WorkerRunner instances700        and tests a basic LoadTestShape with scaling up and down users701        """702        class TestUser1(User):703            def start(self, group: Group):704                gevent.sleep(0.5)705                return super().start(group)706            @task707            def my_task(self):708                gevent.sleep(0)709        class TestUser2(User):710            def start(self, group: Group):711                gevent.sleep(0.5)712                return super().start(group)713            @task714            def my_task(self):715                gevent.sleep(600)716        class TestUser3(User):717            def start(self, group: Group):718                gevent.sleep(0.5)719                return super().start(group)720            @task721            def my_task(self):722                gevent.sleep(600)723        class TestShape(LoadTestShape):724            def tick(self):725                run_time = self.get_run_time()726                if run_time < 10:727                    return 15, 3728                elif run_time < 30:729                    return 5, 10730                else:731                    return None732        locust_worker_additional_wait_before_ready_after_stop = 5733        with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3), _patch_env(734            "LOCUST_WORKER_ADDITIONAL_WAIT_BEFORE_READY_AFTER_STOP",735            str(locust_worker_additional_wait_before_ready_after_stop),736        ):737            stop_timeout = 5738            master_env = Environment(739                user_classes=[TestUser1, TestUser2, TestUser3], shape_class=TestShape(), stop_timeout=stop_timeout740            )741            master_env.shape_class.reset_time()742            master = master_env.create_master_runner("*", 0)743            workers = []744            for i in range(5):745                worker_env = Environment(user_classes=[TestUser1, TestUser2, TestUser3])746                worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)747                workers.append(worker)748            # Give workers time to connect749            sleep(0.1)750            self.assertEqual(STATE_INIT, master.state)751            self.assertEqual(5, len(master.clients.ready))752            # Re-order `workers` so that it is sorted by `id`.753            # This is required because the dispatch is done754            # on the sorted workers.755            workers = sorted(workers, key=lambda w: w.client_id)756            # Start a shape test757            master.start_shape()758            # First stage759            ts = time.time()760            while master.state != STATE_SPAWNING:761                self.assertTrue(time.time() - ts <= 1, master.state)762                sleep()763            sleep(5 - (time.time() - ts))  # runtime = 5s764            ts = time.time()765            while master.state != STATE_RUNNING:766                self.assertTrue(time.time() - ts <= 1, master.state)767                sleep()768            self.assertEqual(STATE_RUNNING, master.state)769            w1 = {"TestUser1": 1, "TestUser2": 1, "TestUser3": 1}770            w2 = {"TestUser1": 1, "TestUser2": 1, "TestUser3": 1}771            w3 = {"TestUser1": 1, "TestUser2": 1, "TestUser3": 1}772            w4 = {"TestUser1": 1, "TestUser2": 1, "TestUser3": 1}773            w5 = {"TestUser1": 1, "TestUser2": 1, "TestUser3": 1}774            self.assertDictEqual(w1, workers[0].user_classes_count)775            self.assertDictEqual(w2, workers[1].user_classes_count)776            self.assertDictEqual(w3, workers[2].user_classes_count)777            self.assertDictEqual(w4, workers[3].user_classes_count)778            self.assertDictEqual(w5, workers[4].user_classes_count)779            self.assertDictEqual(w1, master.clients[workers[0].client_id].user_classes_count)780            self.assertDictEqual(w2, master.clients[workers[1].client_id].user_classes_count)781            self.assertDictEqual(w3, master.clients[workers[2].client_id].user_classes_count)782            self.assertDictEqual(w4, master.clients[workers[3].client_id].user_classes_count)783            self.assertDictEqual(w5, master.clients[workers[4].client_id].user_classes_count)784            sleep(5 - (time.time() - ts))  # runtime = 10s785            # Fourth stage786            ts = time.time()787            while master.state != STATE_SPAWNING:788                self.assertTrue(time.time() - ts <= 1, master.state)789                sleep()790            sleep(5 - (time.time() - ts))  # runtime = 15s791            # Fourth stage - Excess TestUser1 have been stopped but792            #                TestUser2/TestUser3 have not reached stop timeout yet, so793            #                their number are unchanged794            ts = time.time()795            while master.state != STATE_RUNNING:796                self.assertTrue(time.time() - ts <= 1, master.state)797                sleep()798            delta = time.time() - ts799            w1 = {"TestUser1": 1, "TestUser2": 1, "TestUser3": 1}800            w2 = {"TestUser1": 0, "TestUser2": 1, "TestUser3": 1}801            w3 = {"TestUser1": 0, "TestUser2": 1, "TestUser3": 1}802            w4 = {"TestUser1": 1, "TestUser2": 1, "TestUser3": 1}803            w5 = {"TestUser1": 0, "TestUser2": 1, "TestUser3": 1}804            self.assertDictEqual(w1, workers[0].user_classes_count)805            self.assertDictEqual(w2, workers[1].user_classes_count)806            self.assertDictEqual(w3, workers[2].user_classes_count)807            self.assertDictEqual(w4, workers[3].user_classes_count)808            self.assertDictEqual(w5, workers[4].user_classes_count)809            self.assertDictEqual(w1, master.clients[workers[0].client_id].user_classes_count)810            self.assertDictEqual(w2, master.clients[workers[1].client_id].user_classes_count)811            self.assertDictEqual(w3, master.clients[workers[2].client_id].user_classes_count)812            self.assertDictEqual(w4, master.clients[workers[3].client_id].user_classes_count)813            self.assertDictEqual(w5, master.clients[workers[4].client_id].user_classes_count)814            sleep(1 - delta)  # runtime = 16s815            # Fourth stage - All users are now at the desired number816            ts = time.time()817            while master.state != STATE_RUNNING:818                self.assertTrue(time.time() - ts <= 1, master.state)819                sleep()820            delta = time.time() - ts821            w1 = {"TestUser1": 1, "TestUser2": 0, "TestUser3": 0}822            w2 = {"TestUser1": 0, "TestUser2": 1, "TestUser3": 0}823            w3 = {"TestUser1": 0, "TestUser2": 0, "TestUser3": 1}824            w4 = {"TestUser1": 1, "TestUser2": 0, "TestUser3": 0}825            w5 = {"TestUser1": 0, "TestUser2": 1, "TestUser3": 0}826            self.assertDictEqual(w1, workers[0].user_classes_count)827            self.assertDictEqual(w2, workers[1].user_classes_count)828            self.assertDictEqual(w3, workers[2].user_classes_count)829            self.assertDictEqual(w4, workers[3].user_classes_count)830            self.assertDictEqual(w5, workers[4].user_classes_count)831            self.assertDictEqual(w1, master.clients[workers[0].client_id].user_classes_count)832            self.assertDictEqual(w2, master.clients[workers[1].client_id].user_classes_count)833            self.assertDictEqual(w3, master.clients[workers[2].client_id].user_classes_count)834            self.assertDictEqual(w4, master.clients[workers[3].client_id].user_classes_count)835            self.assertDictEqual(w5, master.clients[workers[4].client_id].user_classes_count)836            sleep(10 - delta)  # runtime = 26s837            # Sleep stop_timeout and make sure the test has stopped838            sleep(5)  # runtime = 31s839            self.assertEqual(STATE_STOPPING, master.state)840            sleep(stop_timeout)  # runtime = 36s841            # We wait for "stop_timeout" seconds to let the workers reconnect as "ready" with the master.842            # The reason for waiting an additional "stop_timeout" when we already waited for "stop_timeout"843            # above is that when a worker receives the stop message, it can take up to "stop_timeout"844            # for the worker to send the "client_stopped" message then an additional "stop_timeout" seconds845            # to send the "client_ready" message.846            ts = time.time()847            while len(master.clients.ready) != len(workers):848                self.assertTrue(849                    time.time() - ts <= stop_timeout + locust_worker_additional_wait_before_ready_after_stop,850                    f"expected {len(workers)} workers to be ready but only {len(master.clients.ready)} workers are",851                )852                sleep()853            sleep(1)854            # Check that no users are running855            w1 = {"TestUser1": 0, "TestUser2": 0, "TestUser3": 0}856            w2 = {"TestUser1": 0, "TestUser2": 0, "TestUser3": 0}857            w3 = {"TestUser1": 0, "TestUser2": 0, "TestUser3": 0}858            w4 = {"TestUser1": 0, "TestUser2": 0, "TestUser3": 0}859            w5 = {"TestUser1": 0, "TestUser2": 0, "TestUser3": 0}860            self.assertDictEqual(w1, workers[0].user_classes_count)861            self.assertDictEqual(w2, workers[1].user_classes_count)862            self.assertDictEqual(w3, workers[2].user_classes_count)863            self.assertDictEqual(w4, workers[3].user_classes_count)864            self.assertDictEqual(w5, workers[4].user_classes_count)865            self.assertDictEqual(w1, master.clients[workers[0].client_id].user_classes_count)866            self.assertDictEqual(w2, master.clients[workers[1].client_id].user_classes_count)867            self.assertDictEqual(w3, master.clients[workers[2].client_id].user_classes_count)868            self.assertDictEqual(w4, master.clients[workers[3].client_id].user_classes_count)869            self.assertDictEqual(w5, master.clients[workers[4].client_id].user_classes_count)870            ts = time.time()871            while master.state != STATE_STOPPED:872                self.assertTrue(time.time() - ts <= 5, master.state)873                sleep()874            master.stop()875    @unittest.skip876    def test_distributed_shape_fuzzy_test(self):877        """878        Incredibility useful test to find issues with dispatch logic. This test allowed to find879        multiple small corner cases with the new dispatch logic of locust v2.880        The test is disabled by default because it takes a lot of time to run and has randomness to it.881        However, it is advised to run it a few times (you can run it in parallel) when modifying the dispatch logic.882        """883        class BaseUser(User):884            @task885            def my_task(self):886                gevent.sleep(600)887        class TestUser01(BaseUser):888            pass889        class TestUser02(BaseUser):890            pass891        class TestUser03(BaseUser):892            pass893        class TestUser04(BaseUser):894            pass895        class TestUser05(BaseUser):896            pass897        class TestUser06(BaseUser):898            pass899        class TestUser07(BaseUser):900            pass901        class TestUser08(BaseUser):902            pass903        class TestUser09(BaseUser):904            pass905        class TestUser10(BaseUser):906            pass907        class TestUser11(BaseUser):908            pass909        class TestUser12(BaseUser):910            pass911        class TestUser13(BaseUser):912            pass913        class TestUser14(BaseUser):914            pass915        class TestUser15(BaseUser):916            pass917        class TestShape(LoadTestShape):918            def __init__(self):919                super().__init__()920                self.stages = []921                runtime = 0922                for _ in range(100):923                    runtime += random.uniform(3, 15)924                    self.stages.append((runtime, random.randint(1, 100), random.uniform(0.1, 10)))925            def tick(self):926                run_time = self.get_run_time()927                for stage in self.stages:928                    if run_time < stage[0]:929                        return stage[1], stage[2]930        user_classes = [931            TestUser01,932            TestUser02,933            TestUser03,934            TestUser04,935            TestUser05,936            TestUser06,937            TestUser07,938            TestUser08,939            TestUser09,940            TestUser10,941            TestUser11,942            TestUser12,943            TestUser13,944            TestUser14,945            TestUser15,946        ]947        chosen_user_classes = random.sample(user_classes, k=random.randint(1, len(user_classes)))948        for user_class in chosen_user_classes:949            user_class.weight = random.uniform(1, 20)950        locust_worker_additional_wait_before_ready_after_stop = 5951        with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3), _patch_env(952            "LOCUST_WORKER_ADDITIONAL_WAIT_BEFORE_READY_AFTER_STOP",953            str(locust_worker_additional_wait_before_ready_after_stop),954        ):955            stop_timeout = 5956            master_env = Environment(957                user_classes=chosen_user_classes, shape_class=TestShape(), stop_timeout=stop_timeout958            )959            master_env.shape_class.reset_time()960            master = master_env.create_master_runner("*", 0)961            workers = []962            for i in range(random.randint(1, 30)):963                worker_env = Environment(user_classes=chosen_user_classes)964                worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)965                workers.append(worker)966            # Give workers time to connect967            sleep(0.1)968            self.assertEqual(STATE_INIT, master.state)969            self.assertEqual(len(workers), len(master.clients.ready))970            # Start a shape test971            master.start_shape()972            ts = time.time()973            while master.state != STATE_STOPPED:974                self.assertTrue(time.time() - ts <= master_env.shape_class.stages[-1][0] + 60, master.state)975                print(976                    "{:.2f}/{:.2f} | {} | {:.0f} | ".format(977                        time.time() - ts,978                        master_env.shape_class.stages[-1][0],979                        master.state,980                        sum(master.reported_user_classes_count.values()),981                    )982                    + json.dumps(dict(sorted(master.reported_user_classes_count.items(), key=itemgetter(0))))983                )984                sleep(1)985            master.stop()986    def test_distributed_shape_stop_and_restart(self):987        """988        Test stopping and then restarting a LoadTestShape989        """990        class TestUser(User):991            @task992            def my_task(self):993                pass994        class TestShape(LoadTestShape):995            def tick(self):996                run_time = self.get_run_time()997                if run_time < 10:998                    return 4, 4999                else:1000                    return None1001        with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3):1002            master_env = Environment(user_classes=[TestUser], shape_class=TestShape())1003            master_env.shape_class.reset_time()1004            master = master_env.create_master_runner("*", 0)1005            workers = []1006            for i in range(2):1007                worker_env = Environment(user_classes=[TestUser])1008                worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)1009                workers.append(worker)1010            # Give workers time to connect1011            sleep(0.1)1012            # Start a shape test and ensure workers have connected and started the correct amount of users1013            master.start_shape()1014            sleep(1)1015            for worker in workers:1016                self.assertEqual(2, worker.user_count, "Shape test has not started correctly")1017            # Stop the test and ensure all user count is 01018            master.stop()1019            sleep(1)1020            for worker in workers:1021                self.assertEqual(0, worker.user_count, "Shape test has not stopped")1022            # Then restart the test again and ensure workers have connected and started the correct amount of users1023            master.start_shape()1024            sleep(1)1025            for worker in workers:1026                self.assertEqual(2, worker.user_count, "Shape test has not started again correctly")1027            master.stop()1028    def test_distributed_shape_statuses_transition(self):1029        """1030        Full integration test that starts both a MasterRunner and five WorkerRunner instances1031        The goal of this test is to validate the status on the master is correctly transitioned for each of the1032        test phases.1033        """1034        class TestUser1(User):1035            @task1036            def my_task(self):1037                gevent.sleep(600)1038        class TestShape(LoadTestShape):1039            def tick(self):1040                run_time = self.get_run_time()1041                if run_time < 5:1042                    return 5, 2.51043                elif run_time < 10:1044                    return 10, 2.51045                elif run_time < 15:1046                    return 15, 2.51047                else:1048                    return None1049        locust_worker_additional_wait_before_ready_after_stop = 21050        with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3), _patch_env(1051            "LOCUST_WORKER_ADDITIONAL_WAIT_BEFORE_READY_AFTER_STOP",1052            str(locust_worker_additional_wait_before_ready_after_stop),1053        ):1054            stop_timeout = 01055            master_env = Environment(user_classes=[TestUser1], shape_class=TestShape(), stop_timeout=stop_timeout)1056            master_env.shape_class.reset_time()1057            master = master_env.create_master_runner("*", 0)1058            workers = []1059            for i in range(5):1060                worker_env = Environment(user_classes=[TestUser1])1061                worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)1062                workers.append(worker)1063            # Give workers time to connect1064            sleep(0.1)1065            self.assertEqual(STATE_INIT, master.state)1066            self.assertEqual(5, len(master.clients.ready))1067            statuses = []1068            ts = time.perf_counter()1069            master.start_shape()1070            while master.state != STATE_STOPPED:1071                # +5s buffer to let master stop1072                self.assertTrue(1073                    time.perf_counter() - ts <= 30 + locust_worker_additional_wait_before_ready_after_stop + 5,1074                    master.state,1075                )1076                statuses.append((time.perf_counter() - ts, master.state, master.user_count))1077                sleep(0.1)1078            self.assertEqual(statuses[0][1], STATE_INIT)1079            stage = 11080            tolerance = 1  # in s1081            for (t1, state1, user_count1), (t2, state2, user_count2) in zip(statuses[:-1], statuses[1:]):1082                if state1 == STATE_SPAWNING and state2 == STATE_RUNNING and stage == 1:1083                    self.assertTrue(2.5 - tolerance <= t2 <= 2.5 + tolerance)1084                elif state1 == STATE_RUNNING and state2 == STATE_SPAWNING and stage == 1:1085                    self.assertTrue(5 - tolerance <= t2 <= 5 + tolerance)1086                    stage += 11087                elif state1 == STATE_SPAWNING and state2 == STATE_RUNNING and stage == 2:1088                    self.assertTrue(7.5 - tolerance <= t2 <= 7.5 + tolerance)1089                elif state1 == STATE_RUNNING and state2 == STATE_SPAWNING and stage == 2:1090                    self.assertTrue(10 - tolerance <= t2 <= 10 + tolerance)1091                    stage += 11092                elif state1 == STATE_SPAWNING and state2 == STATE_RUNNING and stage == 3:1093                    self.assertTrue(12.5 - tolerance <= t2 <= 12.5 + tolerance)1094                elif state1 == STATE_RUNNING and state2 == STATE_SPAWNING and stage == 3:1095                    self.assertTrue(15 - tolerance <= t2 <= 15 + tolerance)1096                    stage += 11097                elif state1 == STATE_RUNNING and state2 == STATE_STOPPED and stage == 3:1098                    self.assertTrue(15 - tolerance <= t2 <= 15 + tolerance)1099    def test_swarm_endpoint_is_non_blocking(self):1100        class TestUser1(User):1101            @task1102            def my_task(self):1103                gevent.sleep(600)1104        class TestUser2(User):1105            @task1106            def my_task(self):1107                gevent.sleep(600)1108        with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3):1109            stop_timeout = 01110            master_env = Environment(user_classes=[TestUser1, TestUser2], stop_timeout=stop_timeout)1111            master = master_env.create_master_runner("*", 0)1112            web_ui = master_env.create_web_ui("127.0.0.1", 0)1113            workers = []1114            for i in range(2):1115                worker_env = Environment(user_classes=[TestUser1, TestUser2])1116                worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)1117                workers.append(worker)1118            # Give workers time to connect1119            sleep(0.1)1120            self.assertEqual(STATE_INIT, master.state)1121            self.assertEqual(len(master.clients.ready), len(workers))1122            ts = time.perf_counter()1123            response = requests.post(1124                "http://127.0.0.1:{}/swarm".format(web_ui.server.server_port),1125                data={"user_count": 20, "spawn_rate": 5, "host": "https://localhost"},1126            )1127            self.assertEqual(200, response.status_code)1128            self.assertTrue(0 <= time.perf_counter() - ts <= 1, "swarm endpoint is blocking")1129            ts = time.perf_counter()1130            while master.state != STATE_RUNNING:1131                self.assertTrue(time.perf_counter() - ts <= 4, master.state)1132                gevent.sleep(0.1)1133            self.assertTrue(3 <= time.perf_counter() - ts <= 5)1134            self.assertEqual(master.user_count, 20)1135            master.stop()1136            web_ui.stop()1137    def test_can_call_stop_endpoint_if_currently_swarming(self):1138        class TestUser1(User):1139            @task1140            def my_task(self):1141                gevent.sleep(600)1142        class TestUser2(User):1143            @task1144            def my_task(self):1145                gevent.sleep(600)1146        with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3):1147            stop_timeout = 51148            master_env = Environment(user_classes=[TestUser1, TestUser2], stop_timeout=stop_timeout)1149            master = master_env.create_master_runner("*", 0)1150            web_ui = master_env.create_web_ui("127.0.0.1", 0)1151            workers = []...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
