Best Python code snippet using locust
test_runners.py
Source:test_runners.py  
1import mock2import unittest3import gevent4from gevent import sleep5from gevent.queue import Queue6import greenlet7import locust8from locust import runners, constant, LoadTestShape9from locust.main import create_environment10from locust.user import User, TaskSet, task11from locust.env import Environment12from locust.exception import RPCError, StopUser13from locust.rpc import Message14from locust.runners import (15    LocalRunner,16    WorkerNode,17    WorkerRunner,18    STATE_INIT,19    STATE_SPAWNING,20    STATE_RUNNING,21    STATE_MISSING,22    STATE_STOPPING,23    STATE_STOPPED,24)25from locust.stats import RequestStats26from locust.test.testcases import LocustTestCase27NETWORK_BROKEN = "network broken"28def mocked_rpc():29    class MockedRpcServerClient:30        queue = Queue()31        outbox = []32        def __init__(self, *args, **kwargs):33            pass34        @classmethod35        def mocked_send(cls, message):36            cls.queue.put(message.serialize())37            sleep(0)38        def recv(self):39            results = self.queue.get()40            msg = Message.unserialize(results)41            if msg.data == NETWORK_BROKEN:42                raise RPCError()43            return msg44        def send(self, message):45            self.outbox.append(message)46        def send_to_client(self, message):47            self.outbox.append((message.node_id, message))48        def recv_from_client(self):49            results = self.queue.get()50            msg = Message.unserialize(results)51            if msg.data == NETWORK_BROKEN:52                raise RPCError()53            return msg.node_id, msg54        def close(self):55            raise RPCError()56    return MockedRpcServerClient57class mocked_options:58    def __init__(self):59        self.spawn_rate = 560        self.num_users = 561        self.host = "/"62        self.tags = None63        self.exclude_tags = None64        self.master_host = "localhost"65        self.master_port = 555766        self.master_bind_host = "*"67        self.master_bind_port = 555768        self.heartbeat_liveness = 369        self.heartbeat_interval = 170        self.stop_timeout = None71        self.connection_broken = False72    def reset_stats(self):73        pass74class HeyAnException(Exception):75    pass76class TestLocustRunner(LocustTestCase):77    def assert_locust_class_distribution(self, expected_distribution, classes):78        # Construct a {UserClass => count} dict from a list of user classes79        distribution = {}80        for user_class in classes:81            if not user_class in distribution:82                distribution[user_class] = 083            distribution[user_class] += 184        expected_str = str({k.__name__: v for k, v in expected_distribution.items()})85        actual_str = str({k.__name__: v for k, v in distribution.items()})86        self.assertEqual(87            expected_distribution,88            distribution,89            "Expected a User class distribution of %s but found %s"90            % (91                expected_str,92                actual_str,93            ),94        )95    def test_cpu_warning(self):96        _monitor_interval = runners.CPU_MONITOR_INTERVAL97        runners.CPU_MONITOR_INTERVAL = 2.098        try:99            class CpuUser(User):100                wait_time = constant(0.001)101                @task102                def cpu_task(self):103                    for i in range(1000000):104                        _ = 3 / 2105            environment = Environment(user_classes=[CpuUser])106            runner = LocalRunner(environment)107            self.assertFalse(runner.cpu_warning_emitted)108            runner.spawn_users(1, 1, wait=False)109            sleep(2.5)110            runner.quit()111            self.assertTrue(runner.cpu_warning_emitted)112        finally:113            runners.CPU_MONITOR_INTERVAL = _monitor_interval114    def test_weight_locusts(self):115        class BaseUser(User):116            pass117        class L1(BaseUser):118            weight = 101119        class L2(BaseUser):120            weight = 99121        class L3(BaseUser):122            weight = 100123        runner = Environment(user_classes=[L1, L2, L3]).create_local_runner()124        self.assert_locust_class_distribution({L1: 10, L2: 9, L3: 10}, runner.weight_users(29))125        self.assert_locust_class_distribution({L1: 10, L2: 10, L3: 10}, runner.weight_users(30))126        self.assert_locust_class_distribution({L1: 11, L2: 10, L3: 10}, runner.weight_users(31))127    def test_weight_locusts_fewer_amount_than_user_classes(self):128        class BaseUser(User):129            pass130        class L1(BaseUser):131            weight = 101132        class L2(BaseUser):133            weight = 99134        class L3(BaseUser):135            weight = 100136        runner = Environment(user_classes=[L1, L2, L3]).create_local_runner()137        self.assertEqual(1, len(runner.weight_users(1)))138        self.assert_locust_class_distribution({L1: 1}, runner.weight_users(1))139    def test_kill_locusts(self):140        triggered = [False]141        class BaseUser(User):142            wait_time = constant(1)143            @task144            class task_set(TaskSet):145                @task146                def trigger(self):147                    triggered[0] = True148        runner = Environment(user_classes=[BaseUser]).create_local_runner()149        runner.spawn_users(2, spawn_rate=2, wait=False)150        self.assertEqual(2, len(runner.user_greenlets))151        g1 = list(runner.user_greenlets)[0]152        g2 = list(runner.user_greenlets)[1]153        runner.stop_users(2)154        self.assertEqual(0, len(runner.user_greenlets))155        self.assertTrue(g1.dead)156        self.assertTrue(g2.dead)157        self.assertTrue(triggered[0])158    def test_start_event(self):159        class MyUser(User):160            wait_time = constant(1)161            task_run_count = 0162            @task163            def my_task(self):164                MyUser.task_run_count += 1165        test_start_run = [0]166        environment = Environment(user_classes=[MyUser])167        def on_test_start(*args, **kwargs):168            test_start_run[0] += 1169        environment.events.test_start.add_listener(on_test_start)170        runner = LocalRunner(environment)171        runner.start(user_count=3, spawn_rate=3, wait=False)172        runner.spawning_greenlet.get(timeout=3)173        self.assertEqual(1, test_start_run[0])174        self.assertEqual(3, MyUser.task_run_count)175    def test_stop_event(self):176        class MyUser(User):177            wait_time = constant(1)178            @task179            def my_task(self):180                pass181        test_stop_run = [0]182        environment = Environment(user_classes=[User])183        def on_test_stop(*args, **kwargs):184            test_stop_run[0] += 1185        environment.events.test_stop.add_listener(on_test_stop)186        runner = LocalRunner(environment)187        runner.start(user_count=3, spawn_rate=3, wait=False)188        self.assertEqual(0, test_stop_run[0])189        runner.stop()190        self.assertEqual(1, test_stop_run[0])191    def test_stop_event_quit(self):192        class MyUser(User):193            wait_time = constant(1)194            @task195            def my_task(self):196                pass197        test_stop_run = [0]198        environment = Environment(user_classes=[User])199        def on_test_stop(*args, **kwargs):200            test_stop_run[0] += 1201        environment.events.test_stop.add_listener(on_test_stop)202        runner = LocalRunner(environment)203        runner.start(user_count=3, spawn_rate=3, wait=False)204        self.assertEqual(0, test_stop_run[0])205        runner.quit()206        self.assertEqual(1, test_stop_run[0])207    def test_stop_event_stop_and_quit(self):208        class MyUser(User):209            wait_time = constant(1)210            @task211            def my_task(self):212                pass213        test_stop_run = [0]214        environment = Environment(user_classes=[MyUser])215        def on_test_stop(*args, **kwargs):216            test_stop_run[0] += 1217        environment.events.test_stop.add_listener(on_test_stop)218        runner = LocalRunner(environment)219        runner.start(user_count=3, spawn_rate=3, wait=False)220        self.assertEqual(0, test_stop_run[0])221        runner.stop()222        runner.quit()223        self.assertEqual(1, test_stop_run[0])224    def test_change_user_count_during_spawning(self):225        class MyUser(User):226            wait_time = constant(1)227            @task228            def my_task(self):229                pass230        environment = Environment(user_classes=[MyUser])231        runner = LocalRunner(environment)232        runner.start(user_count=10, spawn_rate=5, wait=False)233        sleep(0.6)234        runner.start(user_count=5, spawn_rate=5, wait=False)235        runner.spawning_greenlet.join()236        self.assertEqual(5, len(runner.user_greenlets))237        runner.quit()238    def test_reset_stats(self):239        class MyUser(User):240            @task241            class task_set(TaskSet):242                @task243                def my_task(self):244                    self.user.environment.events.request.fire(245                        request_type="GET",246                        name="/test",247                        response_time=666,248                        response_length=1337,249                        exception=None,250                        context={},251                    )252                    sleep(2)253        environment = Environment(user_classes=[MyUser], reset_stats=True)254        runner = LocalRunner(environment)255        runner.start(user_count=6, spawn_rate=12, wait=False)256        sleep(0.25)257        self.assertGreaterEqual(runner.stats.get("/test", "GET").num_requests, 3)258        sleep(0.3)259        self.assertLessEqual(runner.stats.get("/test", "GET").num_requests, 1)260        runner.quit()261    def test_no_reset_stats(self):262        class MyUser(User):263            @task264            class task_set(TaskSet):265                @task266                def my_task(self):267                    self.user.environment.events.request.fire(268                        request_type="GET",269                        name="/test",270                        response_time=666,271                        response_length=1337,272                        exception=None,273                        context={},274                    )275                    sleep(2)276        environment = Environment(reset_stats=False, user_classes=[MyUser])277        runner = LocalRunner(environment)278        runner.start(user_count=6, spawn_rate=12, wait=False)279        sleep(0.25)280        self.assertGreaterEqual(runner.stats.get("/test", "GET").num_requests, 3)281        sleep(0.3)282        self.assertEqual(6, runner.stats.get("/test", "GET").num_requests)283        runner.quit()284    def test_runner_reference_on_environment(self):285        env = Environment()286        runner = env.create_local_runner()287        self.assertEqual(env, runner.environment)288        self.assertEqual(runner, env.runner)289    def test_users_can_call_runner_quit_without_deadlocking(self):290        class BaseUser(User):291            stop_triggered = False292            @task293            def trigger(self):294                self.environment.runner.quit()295            def on_stop(self):296                BaseUser.stop_triggered = True297        runner = Environment(user_classes=[BaseUser]).create_local_runner()298        runner.spawn_users(1, 1, wait=False)299        timeout = gevent.Timeout(0.5)300        timeout.start()301        try:302            runner.greenlet.join()303        except gevent.Timeout:304            self.fail("Got Timeout exception, runner must have hung somehow.")305        finally:306            timeout.cancel()307        self.assertTrue(BaseUser.stop_triggered)308    def test_runner_quit_can_run_on_stop_for_multiple_users_concurrently(self):309        class BaseUser(User):310            stop_count = 0311            @task312            def trigger(self):313                pass314            def on_stop(self):315                gevent.sleep(0.1)316                BaseUser.stop_count += 1317        runner = Environment(user_classes=[BaseUser]).create_local_runner()318        runner.spawn_users(10, 10, wait=False)319        timeout = gevent.Timeout(0.3)320        timeout.start()321        try:322            runner.quit()323        except gevent.Timeout:324            self.fail("Got Timeout exception, runner must have hung somehow.")325        finally:326            timeout.cancel()327        self.assertEqual(10, BaseUser.stop_count)  # verify that all users executed on_stop328    def test_stop_users_with_spawn_rate(self):329        class MyUser(User):330            wait_time = constant(1)331            @task332            def my_task(self):333                pass334        environment = Environment(user_classes=[MyUser])335        runner = LocalRunner(environment)336        # Start load test, wait for users to start, then trigger ramp down337        runner.start(10, 10, wait=False)338        sleep(1)339        runner.start(2, 4, wait=False)340        # Wait a moment and then ensure the user count has started to drop but341        # not immediately to user_count342        sleep(1)343        user_count = len(runner.user_greenlets)344        self.assertTrue(user_count > 5, "User count has decreased too quickly: %i" % user_count)345        self.assertTrue(user_count < 10, "User count has not decreased at all: %i" % user_count)346        # Wait and ensure load test users eventually dropped to desired count347        sleep(2)348        user_count = len(runner.user_greenlets)349        self.assertTrue(user_count == 2, "User count has not decreased correctly to 2, it is : %i" % user_count)350class TestMasterWorkerRunners(LocustTestCase):351    def test_distributed_integration_run(self):352        """353        Full integration test that starts both a MasterRunner and three WorkerRunner instances354        and makes sure that their stats is sent to the Master.355        """356        class TestUser(User):357            wait_time = constant(0.1)358            @task359            def incr_stats(l):360                l.environment.events.request.fire(361                    request_type="GET",362                    name="/",363                    response_time=1337,364                    response_length=666,365                    exception=None,366                    context={},367                )368        with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3):369            # start a Master runner370            master_env = Environment(user_classes=[TestUser])371            master = master_env.create_master_runner("*", 0)372            sleep(0)373            # start 3 Worker runners374            workers = []375            for i in range(3):376                worker_env = Environment(user_classes=[TestUser])377                worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)378                workers.append(worker)379            # give workers time to connect380            sleep(0.1)381            # issue start command that should trigger TestUsers to be spawned in the Workers382            master.start(6, spawn_rate=1000)383            sleep(0.1)384            # check that worker nodes have started locusts385            for worker in workers:386                self.assertEqual(2, worker.user_count)387            # give time for users to generate stats, and stats to be sent to master388            sleep(1)389            master.quit()390            # make sure users are killed391            for worker in workers:392                self.assertEqual(0, worker.user_count)393        # check that stats are present in master394        self.assertGreater(395            master_env.runner.stats.total.num_requests,396            20,397            "For some reason the master node's stats has not come in",398        )399    def test_test_stop_event(self):400        class TestUser(User):401            wait_time = constant(0.1)402            @task403            def my_task(l):404                pass405        with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3):406            # start a Master runner407            master_env = Environment(user_classes=[TestUser])408            test_stop_count = {"master": 0, "worker": 0}409            @master_env.events.test_stop.add_listener410            def _(*args, **kwargs):411                test_stop_count["master"] += 1412            master = master_env.create_master_runner("*", 0)413            sleep(0)414            # start a Worker runner415            worker_env = Environment(user_classes=[TestUser])416            @worker_env.events.test_stop.add_listener417            def _(*args, **kwargs):418                test_stop_count["worker"] += 1419            worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)420            # give worker time to connect421            sleep(0.1)422            # issue start command that should trigger TestUsers to be spawned in the Workers423            master.start(2, spawn_rate=1000)424            sleep(0.1)425            # check that worker nodes have started locusts426            self.assertEqual(2, worker.user_count)427            # give time for users to generate stats, and stats to be sent to master428            sleep(0.1)429            master_env.events.quitting.fire(environment=master_env, reverse=True)430            master.quit()431            sleep(0.1)432            # make sure users are killed433            self.assertEqual(0, worker.user_count)434        # check the test_stop event was called one time in master and zero times in workder435        self.assertEqual(436            1,437            test_stop_count["master"],438            "The test_stop event was not called exactly one time in the master node",439        )440        self.assertEqual(441            0,442            test_stop_count["worker"],443            "The test_stop event was called in the worker node",444        )445    def test_distributed_shape(self):446        """447        Full integration test that starts both a MasterRunner and three WorkerRunner instances448        and tests a basic LoadTestShape with scaling up and down users449        """450        class TestUser(User):451            @task452            def my_task(self):453                pass454        class TestShape(LoadTestShape):455            def tick(self):456                run_time = self.get_run_time()457                if run_time < 2:458                    return (9, 9)459                elif run_time < 4:460                    return (21, 21)461                elif run_time < 6:462                    return (3, 21)463                else:464                    return None465        with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3):466            test_shape = TestShape()467            master_env = Environment(user_classes=[TestUser], shape_class=test_shape)468            master_env.shape_class.reset_time()469            master = master_env.create_master_runner("*", 0)470            workers = []471            for i in range(3):472                worker_env = Environment(user_classes=[TestUser])473                worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)474                workers.append(worker)475            # Give workers time to connect476            sleep(0.1)477            # Start a shape test478            master.start_shape()479            sleep(1)480            # Ensure workers have connected and started the correct amounf of users481            for worker in workers:482                self.assertEqual(3, worker.user_count, "Shape test has not reached stage 1")483                self.assertEqual(484                    9, test_shape.get_current_user_count(), "Shape is not seeing stage 1 runner user count correctly"485                )486            # Ensure new stage with more users has been reached487            sleep(2)488            for worker in workers:489                self.assertEqual(7, worker.user_count, "Shape test has not reached stage 2")490                self.assertEqual(491                    21, test_shape.get_current_user_count(), "Shape is not seeing stage 2 runner user count correctly"492                )493            # Ensure new stage with less users has been reached494            sleep(2)495            for worker in workers:496                self.assertEqual(1, worker.user_count, "Shape test has not reached stage 3")497                self.assertEqual(498                    3, test_shape.get_current_user_count(), "Shape is not seeing stage 3 runner user count correctly"499                )500            # Ensure test stops at the end501            sleep(2)502            for worker in workers:503                self.assertEqual(0, worker.user_count, "Shape test has not stopped")504                self.assertEqual(505                    0, test_shape.get_current_user_count(), "Shape is not seeing stopped runner user count correctly"506                )507    def test_distributed_shape_stop_and_restart(self):508        """509        Test stopping and then restarting a LoadTestShape510        """511        class TestUser(User):512            @task513            def my_task(self):514                pass515        class TestShape(LoadTestShape):516            def tick(self):517                run_time = self.get_run_time()518                if run_time < 10:519                    return (4, 4)520                else:521                    return None522        with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3):523            master_env = Environment(user_classes=[TestUser], shape_class=TestShape())524            master_env.shape_class.reset_time()525            master = master_env.create_master_runner("*", 0)526            workers = []527            for i in range(2):528                worker_env = Environment(user_classes=[TestUser])529                worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)530                workers.append(worker)531            # Give workers time to connect532            sleep(0.1)533            # Start a shape test and ensure workers have connected and started the correct amounf of users534            master.start_shape()535            sleep(1)536            for worker in workers:537                self.assertEqual(2, worker.user_count, "Shape test has not started correctly")538            # Stop the test and ensure all user count is 0539            master.stop()540            sleep(1)541            for worker in workers:542                self.assertEqual(0, worker.user_count, "Shape test has not stopped")543            # Then restart the test again and ensure workers have connected and started the correct amounf of users544            master.start_shape()545            sleep(1)546            for worker in workers:547                self.assertEqual(2, worker.user_count, "Shape test has not started again correctly")548            master.stop()549class TestMasterRunner(LocustTestCase):550    def setUp(self):551        super().setUp()552        self.environment = Environment(events=locust.events, catch_exceptions=False)553    def tearDown(self):554        super().tearDown()555    def get_runner(self):556        return self.environment.create_master_runner("*", 5557)557    def test_worker_connect(self):558        with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:559            master = self.get_runner()560            server.mocked_send(Message("client_ready", None, "zeh_fake_client1"))561            self.assertEqual(1, len(master.clients))562            self.assertTrue(563                "zeh_fake_client1" in master.clients, "Could not find fake client in master instance's clients dict"564            )565            server.mocked_send(Message("client_ready", None, "zeh_fake_client2"))566            server.mocked_send(Message("client_ready", None, "zeh_fake_client3"))567            server.mocked_send(Message("client_ready", None, "zeh_fake_client4"))568            self.assertEqual(4, len(master.clients))569            server.mocked_send(Message("quit", None, "zeh_fake_client3"))570            self.assertEqual(3, len(master.clients))571    def test_worker_stats_report_median(self):572        with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:573            master = self.get_runner()574            server.mocked_send(Message("client_ready", None, "fake_client"))575            master.stats.get("/", "GET").log(100, 23455)576            master.stats.get("/", "GET").log(800, 23455)577            master.stats.get("/", "GET").log(700, 23455)578            data = {"user_count": 1}579            self.environment.events.report_to_master.fire(client_id="fake_client", data=data)580            master.stats.clear_all()581            server.mocked_send(Message("stats", data, "fake_client"))582            s = master.stats.get("/", "GET")583            self.assertEqual(700, s.median_response_time)584    def test_worker_stats_report_with_none_response_times(self):585        with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:586            master = self.get_runner()587            server.mocked_send(Message("client_ready", None, "fake_client"))588            master.stats.get("/mixed", "GET").log(0, 23455)589            master.stats.get("/mixed", "GET").log(800, 23455)590            master.stats.get("/mixed", "GET").log(700, 23455)591            master.stats.get("/mixed", "GET").log(None, 23455)592            master.stats.get("/mixed", "GET").log(None, 23455)593            master.stats.get("/mixed", "GET").log(None, 23455)594            master.stats.get("/mixed", "GET").log(None, 23455)595            master.stats.get("/onlyNone", "GET").log(None, 23455)596            data = {"user_count": 1}597            self.environment.events.report_to_master.fire(client_id="fake_client", data=data)598            master.stats.clear_all()599            server.mocked_send(Message("stats", data, "fake_client"))600            s1 = master.stats.get("/mixed", "GET")601            self.assertEqual(700, s1.median_response_time)602            self.assertEqual(500, s1.avg_response_time)603            s2 = master.stats.get("/onlyNone", "GET")604            self.assertEqual(0, s2.median_response_time)605            self.assertEqual(0, s2.avg_response_time)606    def test_master_marks_downed_workers_as_missing(self):607        with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:608            master = self.get_runner()609            server.mocked_send(Message("client_ready", None, "fake_client"))610            sleep(6)611            # print(master.clients['fake_client'].__dict__)612            assert master.clients["fake_client"].state == STATE_MISSING613    def test_last_worker_quitting_stops_test(self):614        with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:615            master = self.get_runner()616            server.mocked_send(Message("client_ready", None, "fake_client1"))617            server.mocked_send(Message("client_ready", None, "fake_client2"))618            master.start(1, 2)619            server.mocked_send(Message("spawning", None, "fake_client1"))620            server.mocked_send(Message("spawning", None, "fake_client2"))621            server.mocked_send(Message("quit", None, "fake_client1"))622            sleep(0)623            self.assertEqual(1, len(master.clients.all))624            self.assertNotEqual(STATE_STOPPED, master.state, "Not all workers quit but test stopped anyway.")625            server.mocked_send(Message("quit", None, "fake_client2"))626            sleep(0)627            self.assertEqual(0, len(master.clients.all))628            self.assertEqual(STATE_STOPPED, master.state, "All workers quit but test didn't stop.")629    @mock.patch("locust.runners.HEARTBEAT_INTERVAL", new=0.1)630    def test_last_worker_missing_stops_test(self):631        with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:632            master = self.get_runner()633            server.mocked_send(Message("client_ready", None, "fake_client1"))634            server.mocked_send(Message("client_ready", None, "fake_client2"))635            server.mocked_send(Message("client_ready", None, "fake_client3"))636            master.start(3, 3)637            server.mocked_send(Message("spawning", None, "fake_client1"))638            server.mocked_send(Message("spawning", None, "fake_client2"))639            server.mocked_send(Message("spawning", None, "fake_client3"))640            sleep(0.2)641            server.mocked_send(642                Message("heartbeat", {"state": STATE_RUNNING, "current_cpu_usage": 50, "count": 1}, "fake_client1")643            )644            server.mocked_send(645                Message("heartbeat", {"state": STATE_RUNNING, "current_cpu_usage": 50, "count": 1}, "fake_client2")646            )647            server.mocked_send(648                Message("heartbeat", {"state": STATE_RUNNING, "current_cpu_usage": 50, "count": 1}, "fake_client3")649            )650            sleep(0.2)651            self.assertEqual(0, len(master.clients.missing))652            self.assertEqual(3, master.worker_count)653            self.assertNotIn(654                master.state, [STATE_STOPPED, STATE_STOPPING], "Not all workers went missing but test stopped anyway."655            )656            server.mocked_send(657                Message("heartbeat", {"state": STATE_RUNNING, "current_cpu_usage": 50, "count": 1}, "fake_client1")658            )659            sleep(0.4)660            self.assertEqual(2, len(master.clients.missing))661            self.assertEqual(1, master.worker_count)662            self.assertNotIn(663                master.state, [STATE_STOPPED, STATE_STOPPING], "Not all workers went missing but test stopped anyway."664            )665            sleep(0.2)666            self.assertEqual(3, len(master.clients.missing))667            self.assertEqual(0, master.worker_count)668            self.assertEqual(STATE_STOPPED, master.state, "All workers went missing but test didn't stop.")669    def test_master_total_stats(self):670        with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:671            master = self.get_runner()672            server.mocked_send(Message("client_ready", None, "fake_client"))673            stats = RequestStats()674            stats.log_request("GET", "/1", 100, 3546)675            stats.log_request("GET", "/1", 800, 56743)676            stats2 = RequestStats()677            stats2.log_request("GET", "/2", 700, 2201)678            server.mocked_send(679                Message(680                    "stats",681                    {682                        "stats": stats.serialize_stats(),683                        "stats_total": stats.total.serialize(),684                        "errors": stats.serialize_errors(),685                        "user_count": 1,686                    },687                    "fake_client",688                )689            )690            server.mocked_send(691                Message(692                    "stats",693                    {694                        "stats": stats2.serialize_stats(),695                        "stats_total": stats2.total.serialize(),696                        "errors": stats2.serialize_errors(),697                        "user_count": 2,698                    },699                    "fake_client",700                )701            )702            self.assertEqual(700, master.stats.total.median_response_time)703    def test_master_total_stats_with_none_response_times(self):704        with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:705            master = self.get_runner()706            server.mocked_send(Message("client_ready", None, "fake_client"))707            stats = RequestStats()708            stats.log_request("GET", "/1", 100, 3546)709            stats.log_request("GET", "/1", 800, 56743)710            stats.log_request("GET", "/1", None, 56743)711            stats2 = RequestStats()712            stats2.log_request("GET", "/2", 700, 2201)713            stats2.log_request("GET", "/2", None, 2201)714            stats3 = RequestStats()715            stats3.log_request("GET", "/3", None, 2201)716            server.mocked_send(717                Message(718                    "stats",719                    {720                        "stats": stats.serialize_stats(),721                        "stats_total": stats.total.serialize(),722                        "errors": stats.serialize_errors(),723                        "user_count": 1,724                    },725                    "fake_client",726                )727            )728            server.mocked_send(729                Message(730                    "stats",731                    {732                        "stats": stats2.serialize_stats(),733                        "stats_total": stats2.total.serialize(),734                        "errors": stats2.serialize_errors(),735                        "user_count": 2,736                    },737                    "fake_client",738                )739            )740            server.mocked_send(741                Message(742                    "stats",743                    {744                        "stats": stats3.serialize_stats(),745                        "stats_total": stats3.total.serialize(),746                        "errors": stats3.serialize_errors(),747                        "user_count": 2,748                    },749                    "fake_client",750                )751            )752            self.assertEqual(700, master.stats.total.median_response_time)753    def test_master_current_response_times(self):754        start_time = 1755        with mock.patch("time.time") as mocked_time:756            mocked_time.return_value = start_time757            with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:758                master = self.get_runner()759                self.environment.stats.reset_all()760                mocked_time.return_value += 1.0234761                server.mocked_send(Message("client_ready", None, "fake_client"))762                stats = RequestStats()763                stats.log_request("GET", "/1", 100, 3546)764                stats.log_request("GET", "/1", 800, 56743)765                server.mocked_send(766                    Message(767                        "stats",768                        {769                            "stats": stats.serialize_stats(),770                            "stats_total": stats.total.get_stripped_report(),771                            "errors": stats.serialize_errors(),772                            "user_count": 1,773                        },774                        "fake_client",775                    )776                )777                mocked_time.return_value += 1778                stats2 = RequestStats()779                stats2.log_request("GET", "/2", 400, 2201)780                server.mocked_send(781                    Message(782                        "stats",783                        {784                            "stats": stats2.serialize_stats(),785                            "stats_total": stats2.total.get_stripped_report(),786                            "errors": stats2.serialize_errors(),787                            "user_count": 2,788                        },789                        "fake_client",790                    )791                )792                mocked_time.return_value += 4793                self.assertEqual(400, master.stats.total.get_current_response_time_percentile(0.5))794                self.assertEqual(800, master.stats.total.get_current_response_time_percentile(0.95))795                # let 10 second pass, do some more requests, send it to the master and make796                # sure the current response time percentiles only accounts for these new requests797                mocked_time.return_value += 10.10023798                stats.log_request("GET", "/1", 20, 1)799                stats.log_request("GET", "/1", 30, 1)800                stats.log_request("GET", "/1", 3000, 1)801                server.mocked_send(802                    Message(803                        "stats",804                        {805                            "stats": stats.serialize_stats(),806                            "stats_total": stats.total.get_stripped_report(),807                            "errors": stats.serialize_errors(),808                            "user_count": 2,809                        },810                        "fake_client",811                    )812                )813                self.assertEqual(30, master.stats.total.get_current_response_time_percentile(0.5))814                self.assertEqual(3000, master.stats.total.get_current_response_time_percentile(0.95))815    def test_rebalance_locust_users_on_worker_connect(self):816        with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:817            master = self.get_runner()818            server.mocked_send(Message("client_ready", None, "zeh_fake_client1"))819            self.assertEqual(1, len(master.clients))820            self.assertTrue(821                "zeh_fake_client1" in master.clients, "Could not find fake client in master instance's clients dict"822            )823            master.start(100, 20)824            self.assertEqual(1, len(server.outbox))825            client_id, msg = server.outbox.pop()826            self.assertEqual(100, msg.data["num_users"])827            self.assertEqual(20, msg.data["spawn_rate"])828            # let another worker connect829            server.mocked_send(Message("client_ready", None, "zeh_fake_client2"))830            self.assertEqual(2, len(master.clients))831            self.assertEqual(2, len(server.outbox))832            client_id, msg = server.outbox.pop()833            self.assertEqual(50, msg.data["num_users"])834            self.assertEqual(10, msg.data["spawn_rate"])835            client_id, msg = server.outbox.pop()836            self.assertEqual(50, msg.data["num_users"])837            self.assertEqual(10, msg.data["spawn_rate"])838    def test_sends_spawn_data_to_ready_running_spawning_workers(self):839        """Sends spawn job to running, ready, or spawning workers"""840        with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:841            master = self.get_runner()842            master.clients[1] = WorkerNode(1)843            master.clients[2] = WorkerNode(2)844            master.clients[3] = WorkerNode(3)845            master.clients[1].state = STATE_INIT846            master.clients[2].state = STATE_SPAWNING847            master.clients[3].state = STATE_RUNNING848            master.start(user_count=5, spawn_rate=5)849            self.assertEqual(3, len(server.outbox))850    def test_start_event(self):851        """852        Tests that test_start event is fired853        """854        with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:855            master = self.get_runner()856            run_count = [0]857            @self.environment.events.test_start.add_listener858            def on_test_start(*a, **kw):859                run_count[0] += 1860            for i in range(5):861                server.mocked_send(Message("client_ready", None, "fake_client%i" % i))862            master.start(7, 7)863            self.assertEqual(5, len(server.outbox))864            self.assertEqual(1, run_count[0])865            # change number of users and check that test_start isn't fired again866            master.start(7, 7)867            self.assertEqual(1, run_count[0])868            # stop and start to make sure test_start is fired again869            master.stop()870            master.start(3, 3)871            self.assertEqual(2, run_count[0])872            master.quit()873    def test_stop_event(self):874        """875        Tests that test_stop event is fired876        """877        with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:878            master = self.get_runner()879            run_count = [0]880            @self.environment.events.test_stop.add_listener881            def on_test_stop(*a, **kw):882                run_count[0] += 1883            for i in range(5):884                server.mocked_send(Message("client_ready", None, "fake_client%i" % i))885            master.start(7, 7)886            self.assertEqual(5, len(server.outbox))887            master.stop()888            self.assertEqual(1, run_count[0])889            run_count[0] = 0890            for i in range(5):891                server.mocked_send(Message("client_ready", None, "fake_client%i" % i))892            master.start(7, 7)893            master.stop()894            master.quit()895            self.assertEqual(1, run_count[0])896    def test_stop_event_quit(self):897        """898        Tests that test_stop event is fired when quit() is called directly899        """900        with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:901            master = self.get_runner()902            run_count = [0]903            @self.environment.events.test_stop.add_listener904            def on_test_stop(*a, **kw):905                run_count[0] += 1906            for i in range(5):907                server.mocked_send(Message("client_ready", None, "fake_client%i" % i))908            master.start(7, 7)909            self.assertEqual(5, len(server.outbox))910            master.quit()911            self.assertEqual(1, run_count[0])912    def test_spawn_zero_locusts(self):913        class MyTaskSet(TaskSet):914            @task915            def my_task(self):916                pass917        class MyTestUser(User):918            tasks = [MyTaskSet]919            wait_time = constant(0.1)920        environment = Environment(user_classes=[MyTestUser])921        runner = LocalRunner(environment)922        timeout = gevent.Timeout(2.0)923        timeout.start()924        try:925            runner.start(0, 1, wait=True)926            runner.spawning_greenlet.join()927        except gevent.Timeout:928            self.fail("Got Timeout exception. A locust seems to have been spawned, even though 0 was specified.")929        finally:930            timeout.cancel()931    def test_spawn_uneven_locusts(self):932        """933        Tests that we can accurately spawn a certain number of locusts, even if it's not an934        even number of the connected workers935        """936        with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:937            master = self.get_runner()938            for i in range(5):939                server.mocked_send(Message("client_ready", None, "fake_client%i" % i))940            master.start(7, 7)941            self.assertEqual(5, len(server.outbox))942            num_users = 0943            for _, msg in server.outbox:944                num_users += msg.data["num_users"]945            self.assertEqual(7, num_users, "Total number of locusts that would have been spawned is not 7")946    def test_spawn_fewer_locusts_than_workers(self):947        with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:948            master = self.get_runner()949            for i in range(5):950                server.mocked_send(Message("client_ready", None, "fake_client%i" % i))951            master.start(2, 2)952            self.assertEqual(5, len(server.outbox))953            num_users = 0954            for _, msg in server.outbox:955                num_users += msg.data["num_users"]956            self.assertEqual(2, num_users, "Total number of locusts that would have been spawned is not 2")957    def test_custom_shape_scale_up(self):958        class MyUser(User):959            @task960            def my_task(self):961                pass962        class TestShape(LoadTestShape):963            def tick(self):964                run_time = self.get_run_time()965                if run_time < 2:966                    return (1, 1)967                elif run_time < 4:968                    return (2, 2)969                else:970                    return None971        self.environment.user_classes = [MyUser]972        self.environment.shape_class = TestShape()973        with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:974            master = self.get_runner()975            for i in range(5):976                server.mocked_send(Message("client_ready", None, "fake_client%i" % i))977            # Start the shape_worker978            self.environment.shape_class.reset_time()979            master.start_shape()980            sleep(0.5)981            # Wait for shape_worker to update user_count982            num_users = 0983            for _, msg in server.outbox:984                if msg.data:985                    num_users += msg.data["num_users"]986            self.assertEqual(987                1, num_users, "Total number of users in first stage of shape test is not 1: %i" % num_users988            )989            # Wait for shape_worker to update user_count again990            sleep(2)991            num_users = 0992            for _, msg in server.outbox:993                if msg.data:994                    num_users += msg.data["num_users"]995            self.assertEqual(996                3, num_users, "Total number of users in second stage of shape test is not 3: %i" % num_users997            )998            # Wait to ensure shape_worker has stopped the test999            sleep(3)1000            self.assertEqual("stopped", master.state, "The test has not been stopped by the shape class")1001    def test_custom_shape_scale_down(self):1002        class MyUser(User):1003            @task1004            def my_task(self):1005                pass1006        class TestShape(LoadTestShape):1007            def tick(self):1008                run_time = self.get_run_time()1009                if run_time < 2:1010                    return (5, 5)1011                elif run_time < 4:1012                    return (-4, 4)1013                else:1014                    return None1015        self.environment.user_classes = [MyUser]1016        self.environment.shape_class = TestShape()1017        with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:1018            master = self.get_runner()1019            for i in range(5):1020                server.mocked_send(Message("client_ready", None, "fake_client%i" % i))1021            # Start the shape_worker1022            self.environment.shape_class.reset_time()1023            master.start_shape()1024            sleep(0.5)1025            # Wait for shape_worker to update user_count1026            num_users = 01027            for _, msg in server.outbox:1028                if msg.data:1029                    num_users += msg.data["num_users"]1030            self.assertEqual(1031                5, num_users, "Total number of users in first stage of shape test is not 5: %i" % num_users1032            )1033            # Wait for shape_worker to update user_count again1034            sleep(2)1035            num_users = 01036            for _, msg in server.outbox:1037                if msg.data:1038                    num_users += msg.data["num_users"]1039            self.assertEqual(1040                1, num_users, "Total number of users in second stage of shape test is not 1: %i" % num_users1041            )1042            # Wait to ensure shape_worker has stopped the test1043            sleep(3)1044            self.assertEqual("stopped", master.state, "The test has not been stopped by the shape class")1045    def test_exception_in_task(self):1046        class MyUser(User):1047            @task1048            def will_error(self):1049                raise HeyAnException(":(")1050        self.environment.user_classes = [MyUser]1051        runner = self.environment.create_local_runner()1052        l = MyUser(self.environment)1053        self.assertRaises(HeyAnException, l.run)1054        self.assertRaises(HeyAnException, l.run)1055        self.assertEqual(1, len(runner.exceptions))1056        hash_key, exception = runner.exceptions.popitem()1057        self.assertTrue("traceback" in exception)1058        self.assertTrue("HeyAnException" in exception["traceback"])1059        self.assertEqual(2, exception["count"])1060    def test_exception_is_caught(self):1061        """Test that exceptions are stored, and execution continues"""1062        class MyTaskSet(TaskSet):1063            def __init__(self, *a, **kw):1064                super().__init__(*a, **kw)1065                self._task_queue = [self.will_error, self.will_stop]1066            @task(1)1067            def will_error(self):1068                raise HeyAnException(":(")1069            @task(1)1070            def will_stop(self):1071                raise StopUser()1072        class MyUser(User):1073            wait_time = constant(0.01)1074            tasks = [MyTaskSet]1075        # set config to catch exceptions in locust users1076        self.environment.catch_exceptions = True1077        self.environment.user_classes = [MyUser]1078        runner = LocalRunner(self.environment)1079        l = MyUser(self.environment)1080        # make sure HeyAnException isn't raised1081        l.run()1082        l.run()1083        # make sure we got two entries in the error log1084        self.assertEqual(2, len(self.mocked_log.error))1085        # make sure exception was stored1086        self.assertEqual(1, len(runner.exceptions))1087        hash_key, exception = runner.exceptions.popitem()1088        self.assertTrue("traceback" in exception)1089        self.assertTrue("HeyAnException" in exception["traceback"])1090        self.assertEqual(2, exception["count"])1091    def test_master_reset_connection(self):1092        """Test that connection will be reset when network issues found"""1093        with mock.patch("locust.runners.FALLBACK_INTERVAL", new=0.1):1094            with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:1095                master = self.get_runner()1096                self.assertEqual(0, len(master.clients))1097                server.mocked_send(Message("client_ready", NETWORK_BROKEN, "fake_client"))1098                self.assertTrue(master.connection_broken)1099                server.mocked_send(Message("client_ready", None, "fake_client"))1100                sleep(0.2)1101                self.assertFalse(master.connection_broken)1102                self.assertEqual(1, len(master.clients))1103                master.quit()1104class TestWorkerRunner(LocustTestCase):1105    def setUp(self):1106        super().setUp()1107        # self._report_to_master_event_handlers = [h for h in events.report_to_master._handlers]1108    def tearDown(self):1109        # events.report_to_master._handlers = self._report_to_master_event_handlers1110        super().tearDown()1111    def get_runner(self, environment=None, user_classes=[]):1112        if environment is None:1113            environment = self.environment1114        environment.user_classes = user_classes1115        return WorkerRunner(environment, master_host="localhost", master_port=5557)1116    def test_worker_stop_timeout(self):1117        class MyTestUser(User):1118            _test_state = 01119            @task1120            def the_task(self):1121                MyTestUser._test_state = 11122                gevent.sleep(0.2)1123                MyTestUser._test_state = 21124        with mock.patch("locust.rpc.rpc.Client", mocked_rpc()) as client:1125            environment = Environment()1126            test_start_run = [False]1127            @environment.events.test_start.add_listener1128            def on_test_start(_environment, **kw):1129                test_start_run[0] = True1130            worker = self.get_runner(environment=environment, user_classes=[MyTestUser])1131            self.assertEqual(1, len(client.outbox))1132            self.assertEqual("client_ready", client.outbox[0].type)1133            client.mocked_send(1134                Message(1135                    "spawn",1136                    {1137                        "spawn_rate": 1,1138                        "num_users": 1,1139                        "host": "",1140                        "stop_timeout": 1,1141                    },1142                    "dummy_client_id",1143                )1144            )1145            # print("outbox:", client.outbox)1146            # wait for worker to spawn locusts1147            self.assertIn("spawning", [m.type for m in client.outbox])1148            worker.spawning_greenlet.join()1149            self.assertEqual(1, len(worker.user_greenlets))1150            # check that locust has started running1151            gevent.sleep(0.01)1152            self.assertEqual(1, MyTestUser._test_state)1153            # send stop message1154            client.mocked_send(Message("stop", None, "dummy_client_id"))1155            worker.user_greenlets.join()1156            # check that locust user got to finish1157            self.assertEqual(2, MyTestUser._test_state)1158            # make sure the test_start was never fired on the worker1159            self.assertFalse(test_start_run[0])1160    def test_worker_without_stop_timeout(self):1161        class MyTestUser(User):1162            _test_state = 01163            @task1164            def the_task(self):1165                MyTestUser._test_state = 11166                gevent.sleep(0.2)1167                MyTestUser._test_state = 21168        with mock.patch("locust.rpc.rpc.Client", mocked_rpc()) as client:1169            environment = Environment(stop_timeout=None)1170            worker = self.get_runner(environment=environment, user_classes=[MyTestUser])1171            self.assertEqual(1, len(client.outbox))1172            self.assertEqual("client_ready", client.outbox[0].type)1173            client.mocked_send(1174                Message(1175                    "spawn",1176                    {1177                        "spawn_rate": 1,1178                        "num_users": 1,1179                        "host": "",1180                        "stop_timeout": None,1181                    },1182                    "dummy_client_id",1183                )1184            )1185            # print("outbox:", client.outbox)1186            # wait for worker to spawn locusts1187            self.assertIn("spawning", [m.type for m in client.outbox])1188            worker.spawning_greenlet.join()1189            self.assertEqual(1, len(worker.user_greenlets))1190            # check that locust has started running1191            gevent.sleep(0.01)1192            self.assertEqual(1, MyTestUser._test_state)1193            # send stop message1194            client.mocked_send(Message("stop", None, "dummy_client_id"))1195            worker.user_greenlets.join()1196            # check that locust user did not get to finish1197            self.assertEqual(1, MyTestUser._test_state)1198    def test_change_user_count_during_spawning(self):1199        class MyUser(User):1200            wait_time = constant(1)1201            @task1202            def my_task(self):1203                pass1204        with mock.patch("locust.rpc.rpc.Client", mocked_rpc()) as client:1205            environment = Environment()1206            worker = self.get_runner(environment=environment, user_classes=[MyUser])1207            client.mocked_send(1208                Message(1209                    "spawn",1210                    {1211                        "spawn_rate": 5,1212                        "num_users": 10,1213                        "host": "",1214                        "stop_timeout": None,1215                    },1216                    "dummy_client_id",1217                )1218            )1219            sleep(0.6)1220            self.assertEqual(STATE_SPAWNING, worker.state)1221            client.mocked_send(1222                Message(1223                    "spawn",1224                    {1225                        "spawn_rate": 5,1226                        "num_users": 9,1227                        "host": "",1228                        "stop_timeout": None,1229                    },1230                    "dummy_client_id",1231                )1232            )1233            sleep(0)1234            worker.spawning_greenlet.join()1235            self.assertEqual(9, len(worker.user_greenlets))1236            worker.quit()1237class TestMessageSerializing(unittest.TestCase):1238    def test_message_serialize(self):1239        msg = Message("client_ready", None, "my_id")1240        rebuilt = Message.unserialize(msg.serialize())1241        self.assertEqual(msg.type, rebuilt.type)1242        self.assertEqual(msg.data, rebuilt.data)1243        self.assertEqual(msg.node_id, rebuilt.node_id)1244class TestStopTimeout(LocustTestCase):1245    def test_stop_timeout(self):1246        short_time = 0.051247        class MyTaskSet(TaskSet):1248            @task1249            def my_task(self):1250                MyTaskSet.state = "first"1251                gevent.sleep(short_time)1252                MyTaskSet.state = "second"  # should only run when run time + stop_timeout is > short_time1253                gevent.sleep(short_time)1254                MyTaskSet.state = "third"  # should only run when run time + stop_timeout is > short_time * 21255        class MyTestUser(User):1256            tasks = [MyTaskSet]1257        environment = Environment(user_classes=[MyTestUser])1258        runner = environment.create_local_runner()1259        runner.start(1, 1, wait=False)1260        gevent.sleep(short_time / 2)1261        runner.quit()1262        self.assertEqual("first", MyTaskSet.state)1263        # exit with timeout1264        environment = Environment(user_classes=[MyTestUser], stop_timeout=short_time / 2)1265        runner = environment.create_local_runner()1266        runner.start(1, 1, wait=False)1267        gevent.sleep(short_time)1268        runner.quit()1269        self.assertEqual("second", MyTaskSet.state)1270        # allow task iteration to complete, with some margin1271        environment = Environment(user_classes=[MyTestUser], stop_timeout=short_time * 3)1272        runner = environment.create_local_runner()1273        runner.start(1, 1, wait=False)1274        gevent.sleep(short_time)1275        timeout = gevent.Timeout(short_time * 2)1276        timeout.start()1277        try:1278            runner.quit()1279            runner.greenlet.join()1280        except gevent.Timeout:1281            self.fail("Got Timeout exception. Some locusts must have kept running after iteration finish")1282        finally:1283            timeout.cancel()1284        self.assertEqual("third", MyTaskSet.state)1285    def test_stop_timeout_during_on_start(self):1286        short_time = 0.051287        class MyTaskSet(TaskSet):1288            finished_on_start = False1289            my_task_run = False1290            def on_start(self):1291                gevent.sleep(short_time)1292                MyTaskSet.finished_on_start = True1293            @task1294            def my_task(self):1295                MyTaskSet.my_task_run = True1296        class MyTestUser(User):1297            tasks = [MyTaskSet]1298        environment = create_environment([MyTestUser], mocked_options())1299        environment.stop_timeout = short_time1300        runner = environment.create_local_runner()1301        runner.start(1, 1)1302        gevent.sleep(short_time / 2)1303        runner.quit()1304        self.assertTrue(MyTaskSet.finished_on_start)1305        self.assertFalse(MyTaskSet.my_task_run)1306    def test_stop_timeout_exit_during_wait(self):1307        short_time = 0.051308        class MyTaskSet(TaskSet):1309            @task1310            def my_task(self):1311                pass1312        class MyTestUser(User):1313            tasks = [MyTaskSet]1314            wait_time = constant(1)1315        environment = Environment(user_classes=[MyTestUser], stop_timeout=short_time)1316        runner = environment.create_local_runner()1317        runner.start(1, 1)1318        gevent.sleep(short_time)  # sleep to make sure locust has had time to start waiting1319        timeout = gevent.Timeout(short_time)1320        timeout.start()1321        try:1322            runner.quit()1323            runner.greenlet.join()1324        except gevent.Timeout:1325            self.fail("Got Timeout exception. Waiting locusts should stop immediately, even when using stop_timeout.")1326        finally:1327            timeout.cancel()1328    def test_stop_timeout_with_interrupt(self):1329        short_time = 0.051330        class MySubTaskSet(TaskSet):1331            @task1332            def a_task(self):1333                gevent.sleep(0)1334                self.interrupt(reschedule=True)1335        class MyTaskSet(TaskSet):1336            tasks = [MySubTaskSet]1337        class MyTestUser(User):1338            tasks = [MyTaskSet]1339        environment = create_environment([MyTestUser], mocked_options())1340        environment.stop_timeout = short_time1341        runner = environment.create_local_runner()1342        runner.start(1, 1, wait=True)1343        gevent.sleep(0)1344        timeout = gevent.Timeout(short_time)1345        timeout.start()1346        try:1347            runner.quit()1348            runner.greenlet.join()1349        except gevent.Timeout:1350            self.fail("Got Timeout exception. Interrupted locusts should exit immediately during stop_timeout.")1351        finally:1352            timeout.cancel()1353    def test_stop_timeout_with_interrupt_no_reschedule(self):1354        state = [0]1355        class MySubTaskSet(TaskSet):1356            @task1357            def a_task(self):1358                gevent.sleep(0.1)1359                state[0] = 11360                self.interrupt(reschedule=False)1361        class MyTestUser(User):1362            tasks = [MySubTaskSet]1363            wait_time = constant(3)1364        environment = create_environment([MyTestUser], mocked_options())1365        environment.stop_timeout = 0.31366        runner = environment.create_local_runner()1367        runner.start(1, 1, wait=True)1368        gevent.sleep(0)1369        timeout = gevent.Timeout(0.11)1370        timeout.start()1371        try:1372            runner.quit()1373            runner.greenlet.join()1374        except gevent.Timeout:1375            self.fail("Got Timeout exception. Interrupted locusts should exit immediately during stop_timeout.")1376        finally:1377            timeout.cancel()1378        self.assertEqual(1, state[0])1379    def test_kill_locusts_with_stop_timeout(self):1380        short_time = 0.051381        class MyTaskSet(TaskSet):1382            @task1383            def my_task(self):1384                MyTaskSet.state = "first"1385                gevent.sleep(short_time)1386                MyTaskSet.state = "second"  # should only run when run time + stop_timeout is > short_time1387                gevent.sleep(short_time)1388                MyTaskSet.state = "third"  # should only run when run time + stop_timeout is > short_time * 21389        class MyTestUser(User):1390            tasks = [MyTaskSet]1391        environment = create_environment([MyTestUser], mocked_options())1392        runner = environment.create_local_runner()1393        runner.start(1, 1)1394        gevent.sleep(short_time / 2)1395        runner.stop_users(1)1396        self.assertEqual("first", MyTaskSet.state)1397        runner.quit()1398        environment.runner = None1399        environment.stop_timeout = short_time / 2  # exit with timeout1400        runner = environment.create_local_runner()1401        runner.start(1, 1)1402        gevent.sleep(short_time)1403        runner.stop_users(1)1404        self.assertEqual("second", MyTaskSet.state)1405        runner.quit()1406        environment.runner = None1407        environment.stop_timeout = short_time * 3  # allow task iteration to complete, with some margin1408        runner = environment.create_local_runner()1409        runner.start(1, 1)1410        gevent.sleep(short_time)1411        timeout = gevent.Timeout(short_time * 2)1412        timeout.start()1413        try:1414            runner.stop_users(1)1415            runner.user_greenlets.join()1416        except gevent.Timeout:1417            self.fail("Got Timeout exception. Some locusts must have kept running after iteration finish")1418        finally:1419            timeout.cancel()1420        self.assertEqual("third", MyTaskSet.state)1421    def test_users_can_call_runner_quit_with_stop_timeout(self):1422        class BaseUser(User):1423            wait_time = constant(1)1424            @task1425            def trigger(self):1426                self.environment.runner.quit()1427        runner = Environment(user_classes=[BaseUser]).create_local_runner()1428        runner.environment.stop_timeout = 11429        runner.spawn_users(1, 1, wait=False)1430        timeout = gevent.Timeout(0.5)1431        timeout.start()1432        try:1433            runner.greenlet.join()1434        except gevent.Timeout:1435            self.fail("Got Timeout exception, runner must have hung somehow.")1436        finally:1437            timeout.cancel()1438    def test_gracefully_handle_exceptions_in_listener(self):1439        class MyUser(User):1440            wait_time = constant(1)1441            @task1442            def my_task(self):1443                pass1444        test_stop_run = [0]1445        environment = Environment(user_classes=[User])1446        def on_test_stop_ok(*args, **kwargs):1447            test_stop_run[0] += 11448        def on_test_stop_fail(*args, **kwargs):1449            assert 01450        environment.events.test_stop.add_listener(on_test_stop_ok)1451        environment.events.test_stop.add_listener(on_test_stop_fail)1452        environment.events.test_stop.add_listener(on_test_stop_ok)1453        runner = LocalRunner(environment)1454        runner.start(user_count=3, spawn_rate=3, wait=False)1455        self.assertEqual(0, test_stop_run[0])1456        runner.stop()1457        self.assertEqual(2, test_stop_run[0])1458    def test_stop_timeout_with_ramp_down(self):1459        class MyTaskSet(TaskSet):1460            @task1461            def my_task(self):1462                gevent.sleep(1)1463        class MyTestUser(User):1464            tasks = [MyTaskSet]1465        environment = Environment(user_classes=[MyTestUser], stop_timeout=2)1466        runner = environment.create_local_runner()1467        # Start load test, wait for users to start, then trigger ramp down1468        runner.start(10, 10, wait=False)1469        sleep(1)1470        runner.start(2, 4, wait=False)1471        # Wait a moment and then ensure the user count has started to drop but1472        # not immediately to user_count1473        sleep(1.1)1474        user_count = len(runner.user_greenlets)1475        self.assertTrue(user_count > 5, "User count has decreased too quickly: %i" % user_count)1476        self.assertTrue(user_count < 10, "User count has not decreased at all: %i" % user_count)1477        # Wait and ensure load test users eventually dropped to desired count1478        sleep(2)1479        user_count = len(runner.user_greenlets)...shape_worker.py
Source:shape_worker.py  
1"""Utilities to load plugins to Agisoft Metashape2Copyright (C) 2021  Geoscan Ltd. https://www.geoscan.aero/3This program is free software: you can redistribute it and/or modify4it under the terms of the GNU General Public License as published by5the Free Software Foundation, either version 3 of the License, or6(at your option) any later version.7This program is distributed in the hope that it will be useful,8but WITHOUT ANY WARRANTY; without even the implied warranty of9MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the10GNU General Public License for more details.11You should have received a copy of the GNU General Public License12along with this program. If not, see <https://www.gnu.org/licenses/>.13"""14from functools import partial15from common.startup.initialization import ps, import_module16from installed_plugins.utils.utils import init_top_menu17TOP_MENU = init_top_menu()18def inject(trans):19    from shape_worker.shape_worker import grid_generator20    ps.app.addMenuItem(_(TOP_MENU) + "/" + _("Shape") + "/" + _("Build grid"), partial(grid_generator, trans))...msk_grid_builder.py
Source:msk_grid_builder.py  
1"""Utilities to load plugins to Agisoft Metashape2Copyright (C) 2021  Geoscan Ltd. https://www.geoscan.aero/3This program is free software: you can redistribute it and/or modify4it under the terms of the GNU General Public License as published by5the Free Software Foundation, either version 3 of the License, or6(at your option) any later version.7This program is distributed in the hope that it will be useful,8but WITHOUT ANY WARRANTY; without even the implied warranty of9MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the10GNU General Public License for more details.11You should have received a copy of the GNU General Public License12along with this program. If not, see <https://www.gnu.org/licenses/>.13"""14from functools import partial15from common.startup.initialization import ps, import_module16from installed_plugins.utils.utils import init_top_menu17TOP_MENU = init_top_menu()18def inject(trans):19    from shape_worker.msk_grid_builder import main20    ps.app.addMenuItem(_(TOP_MENU) + "/" + _("Shape") + "/" + _("Build MSK grid"), partial(main, trans))...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
