Best Python code snippet using locust
test_runners.py
Source:test_runners.py  
...1799            master.start(1, 3)1800            self.assertEqual({"MyUser1": 1, "MyUser2": 0}, master.target_user_classes_count)1801            self.assertEqual(1, master.target_user_count)1802            self.assertEqual(3, master.spawn_rate)1803    def test_custom_message_send(self):1804        class MyUser(User):1805            wait_time = constant(1)1806            @task1807            def my_task(self):1808                pass1809        with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:1810            master = self.get_runner()1811            for i in range(5):1812                master.clients[i] = WorkerNode(str(i))1813            master.send_message("test_custom_msg", {"test_data": 123})1814            self.assertEqual(5, len(server.outbox))1815            for _, msg in server.outbox:1816                self.assertEqual("test_custom_msg", msg.type)1817                self.assertEqual(123, msg.data["test_data"])1818    def test_custom_message_receive(self):1819        class MyUser(User):1820            wait_time = constant(1)1821            @task1822            def my_task(self):1823                pass1824        with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:1825            test_custom_msg = [False]1826            test_custom_msg_data = [{}]1827            def on_custom_msg(msg, **kw):1828                test_custom_msg[0] = True1829                test_custom_msg_data[0] = msg.data1830            master = self.get_runner()1831            master.register_message("test_custom_msg", on_custom_msg)1832            server.mocked_send(Message("test_custom_msg", {"test_data": 123}, "dummy_id"))1833            self.assertTrue(test_custom_msg[0])1834            self.assertEqual(123, test_custom_msg_data[0]["test_data"])1835    def test_undefined_custom_message_receive(self):1836        class MyUser(User):1837            wait_time = constant(1)1838            @task1839            def my_task(self):1840                pass1841   
     with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:1842            test_custom_msg = [False]1843            def on_custom_msg(msg, **kw):1844                test_custom_msg[0] = True1845            master = self.get_runner()1846            master.register_message("test_custom_msg", on_custom_msg)1847            server.mocked_send(Message("unregistered_custom_msg", {}, "dummy_id"))1848            self.assertFalse(test_custom_msg[0])1849            self.assertEqual(1, len(self.mocked_log.warning))1850            msg = self.mocked_log.warning[0]1851            self.assertIn("Unknown message type recieved from worker", msg)1852    def test_wait_for_workers_report_after_ramp_up(self):1853        def assert_cache_hits():1854            self.assertEqual(master._wait_for_workers_report_after_ramp_up.cache_info().hits, 0)1855            master._wait_for_workers_report_after_ramp_up()1856            self.assertEqual(master._wait_for_workers_report_after_ramp_up.cache_info().hits, 1)1857        master = self.get_runner()1858        master._wait_for_workers_report_after_ramp_up.cache_clear()1859        self.assertEqual(master._wait_for_workers_report_after_ramp_up(), 0.1)1860        assert_cache_hits()1861        master._wait_for_workers_report_after_ramp_up.cache_clear()1862        with _patch_env("LOCUST_WAIT_FOR_WORKERS_REPORT_AFTER_RAMP_UP", "5.7"):1863            self.assertEqual(master._wait_for_workers_report_after_ramp_up(), 5.7)1864            assert_cache_hits()1865        master._wait_for_workers_report_after_ramp_up.cache_clear()1866        with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=1.5), _patch_env(1867            "LOCUST_WAIT_FOR_WORKERS_REPORT_AFTER_RAMP_UP", "5.7 * WORKER_REPORT_INTERVAL"1868        ):1869            self.assertEqual(master._wait_for_workers_report_after_ramp_up(), 5.7 * 1.5)1870            assert_cache_hits()1871        master._wait_for_workers_report_after_ramp_up.cache_clear()1872@contextmanager1873def 
_patch_env(name: str, value: str):1874    prev_value = os.getenv(name)1875    os.environ[name] = value1876    try:1877        yield1878    finally:1879        if prev_value is None:1880            del os.environ[name]1881        else:1882            os.environ[name] = prev_value1883class TestWorkerRunner(LocustTestCase):1884    def setUp(self):1885        super().setUp()1886        # self._report_to_master_event_handlers = [h for h in events.report_to_master._handlers]1887    def tearDown(self):1888        # events.report_to_master._handlers = self._report_to_master_event_handlers1889        super().tearDown()1890    def get_runner(self, environment=None, user_classes=None):1891        if environment is None:1892            environment = self.environment1893        user_classes = user_classes or []1894        environment.user_classes = user_classes1895        return WorkerRunner(environment, master_host="localhost", master_port=5557)1896    def test_worker_stop_timeout(self):1897        class MyTestUser(User):1898            _test_state = 01899            @task1900            def the_task(self):1901                MyTestUser._test_state = 11902                gevent.sleep(0.2)1903                MyTestUser._test_state = 21904        with mock.patch("locust.rpc.rpc.Client", mocked_rpc()) as client:1905            environment = Environment()1906            worker = self.get_runner(environment=environment, user_classes=[MyTestUser])1907            self.assertEqual(1, len(client.outbox))1908            self.assertEqual("client_ready", client.outbox[0].type)1909            client.mocked_send(1910                Message(1911                    "spawn",1912                    {1913                        "timestamp": 1605538584,1914                        "user_classes_count": {"MyTestUser": 1},1915                        "host": "",1916                        "stop_timeout": 1,1917                        "parsed_options": {},1918                    },1919                 
   "dummy_client_id",1920                )1921            )1922            # wait for worker to spawn locusts1923            self.assertIn("spawning", [m.type for m in client.outbox])1924            worker.spawning_greenlet.join()1925            self.assertEqual(1, len(worker.user_greenlets))1926            # check that locust has started running1927            gevent.sleep(0.01)1928            self.assertEqual(1, MyTestUser._test_state)1929            # send stop message1930            client.mocked_send(Message("stop", None, "dummy_client_id"))1931            worker.user_greenlets.join()1932            # check that locust user got to finish1933            self.assertEqual(2, MyTestUser._test_state)1934    def test_worker_without_stop_timeout(self):1935        class MyTestUser(User):1936            _test_state = 01937            @task1938            def the_task(self):1939                MyTestUser._test_state = 11940                gevent.sleep(0.2)1941                MyTestUser._test_state = 21942        with mock.patch("locust.rpc.rpc.Client", mocked_rpc()) as client:1943            environment = Environment(stop_timeout=None)1944            worker = self.get_runner(environment=environment, user_classes=[MyTestUser])1945            self.assertEqual(1, len(client.outbox))1946            self.assertEqual("client_ready", client.outbox[0].type)1947            client.mocked_send(1948                Message(1949                    "spawn",1950                    {1951                        "timestamp": 1605538584,1952                        "user_classes_count": {"MyTestUser": 1},1953                        "host": "",1954                        "stop_timeout": None,1955                        "parsed_options": {},1956                    },1957                    "dummy_client_id",1958                )1959            )1960            # print("outbox:", client.outbox)1961            # wait for worker to spawn locusts1962            self.assertIn("spawning", [m.type 
for m in client.outbox])1963            worker.spawning_greenlet.join()1964            self.assertEqual(1, len(worker.user_greenlets))1965            # check that locust has started running1966            gevent.sleep(0.01)1967            self.assertEqual(1, MyTestUser._test_state)1968            # send stop message1969            client.mocked_send(Message("stop", None, "dummy_client_id"))1970            worker.user_greenlets.join()1971            # check that locust user did not get to finish1972            self.assertEqual(1, MyTestUser._test_state)1973    def test_spawn_message_with_older_timestamp_is_rejected(self):1974        class MyUser(User):1975            wait_time = constant(1)1976            def start(self, group: Group):1977                # We do this so that the spawning does not finish1978                # too quickly1979                gevent.sleep(0.1)1980                return super().start(group)1981            @task1982            def my_task(self):1983                pass1984        with mock.patch("locust.rpc.rpc.Client", mocked_rpc()) as client:1985            environment = Environment()1986            worker = self.get_runner(environment=environment, user_classes=[MyUser])1987            client.mocked_send(1988                Message(1989                    "spawn",1990                    {1991                        "timestamp": 1605538584,1992                        "user_classes_count": {"MyUser": 10},1993                        "host": "",1994                        "stop_timeout": None,1995                        "parsed_options": {},1996                    },1997                    "dummy_client_id",1998                )1999            )2000            sleep(0.6)2001            self.assertEqual(STATE_SPAWNING, worker.state)2002            worker.spawning_greenlet.join()2003            self.assertEqual(10, worker.user_count)2004            # Send same timestamp as the first message2005            client.mocked_send(2006                
Message(2007                    "spawn",2008                    {2009                        "timestamp": 1605538584,2010                        "user_classes_count": {"MyUser": 9},2011                        "host": "",2012                        "stop_timeout": None,2013                        "parsed_options": {},2014                    },2015                    "dummy_client_id",2016                )2017            )2018            worker.spawning_greenlet.join()2019            # Still 10 users2020            self.assertEqual(10, worker.user_count)2021            # Send older timestamp than the first message2022            client.mocked_send(2023                Message(2024                    "spawn",2025                    {2026                        "timestamp": 1605538583,2027                        "user_classes_count": {"MyUser": 2},2028                        "host": "",2029                        "stop_timeout": None,2030                        "parsed_options": {},2031                    },2032                    "dummy_client_id",2033                )2034            )2035            worker.spawning_greenlet.join()2036            # Still 10 users2037            self.assertEqual(10, worker.user_count)2038            # Send newer timestamp than the first message2039            client.mocked_send(2040                Message(2041                    "spawn",2042                    {2043                        "timestamp": 1605538585,2044                        "user_classes_count": {"MyUser": 2},2045                        "host": "",2046                        "stop_timeout": None,2047                        "parsed_options": {},2048                    },2049                    "dummy_client_id",2050                )2051            )2052            worker.spawning_greenlet.join()2053            self.assertEqual(2, worker.user_count)2054            worker.quit()2055    def test_worker_messages_sent_to_master(self):2056        """2057        Ensure that 
worker includes both "user_count" and "user_classes_count"2058        when reporting to the master.2059        """2060        class MyUser(User):2061            wait_time = constant(1)2062            def start(self, group: Group):2063                # We do this so that the spawning does not finish2064                # too quickly2065                gevent.sleep(0.1)2066                return super().start(group)2067            @task2068            def my_task(self):2069                pass2070        with mock.patch("locust.rpc.rpc.Client", mocked_rpc()) as client:2071            environment = Environment()2072            worker = self.get_runner(environment=environment, user_classes=[MyUser])2073            client.mocked_send(2074                Message(2075                    "spawn",2076                    {2077                        "timestamp": 1605538584,2078                        "user_classes_count": {"MyUser": 10},2079                        "host": "",2080                        "stop_timeout": None,2081                        "parsed_options": {},2082                    },2083                    "dummy_client_id",2084                )2085            )2086            sleep(0.6)2087            self.assertEqual(STATE_SPAWNING, worker.state)2088            worker.spawning_greenlet.join()2089            self.assertEqual(10, worker.user_count)2090            sleep(2)2091            message = next((m for m in reversed(client.outbox) if m.type == "stats"), None)2092            self.assertIsNotNone(message)2093            self.assertIn("user_count", message.data)2094            self.assertIn("user_classes_count", message.data)2095            self.assertEqual(message.data["user_count"], 10)2096            self.assertEqual(message.data["user_classes_count"]["MyUser"], 10)2097            message = next((m for m in client.outbox if m.type == "spawning_complete"), None)2098            self.assertIsNotNone(message)2099            self.assertIn("user_count", 
message.data)2100            self.assertIn("user_classes_count", message.data)2101            self.assertEqual(message.data["user_count"], 10)2102            self.assertEqual(message.data["user_classes_count"]["MyUser"], 10)2103            worker.quit()2104    def test_worker_heartbeat_messages_sent_to_master(self):2105        """2106        Validate content of the heartbeat payload sent to the master.2107        """2108        class MyUser(User):2109            wait_time = constant(1)2110            @task2111            def my_task(self):2112                pass2113        with mock.patch("locust.rpc.rpc.Client", mocked_rpc()) as client:2114            environment = Environment()2115            worker = self.get_runner(environment=environment, user_classes=[MyUser])2116            t0 = time.perf_counter()2117            while len([m for m in client.outbox if m.type == "heartbeat"]) == 0:2118                self.assertLessEqual(time.perf_counter() - t0, 3)2119                sleep(0.1)2120            message = next((m for m in reversed(client.outbox) if m.type == "heartbeat"))2121            self.assertEqual(len(message.data), 2)2122            self.assertIn("state", message.data)2123            self.assertIn("current_cpu_usage", message.data)2124            worker.quit()2125    def test_change_user_count_during_spawning(self):2126        class MyUser(User):2127            wait_time = constant(1)2128            def start(self, group: Group):2129                # We do this so that the spawning does not finish2130                # too quickly2131                gevent.sleep(0.1)2132                return super().start(group)2133            @task2134            def my_task(self):2135                pass2136        with mock.patch("locust.rpc.rpc.Client", mocked_rpc()) as client:2137            environment = Environment()2138            worker = self.get_runner(environment=environment, user_classes=[MyUser])2139            client.mocked_send(2140                
Message(2141                    "spawn",2142                    {2143                        "timestamp": 1605538584,2144                        "user_classes_count": {"MyUser": 10},2145                        "host": "",2146                        "stop_timeout": None,2147                        "parsed_options": {},2148                    },2149                    "dummy_client_id",2150                )2151            )2152            sleep(0.6)2153            self.assertEqual(STATE_SPAWNING, worker.state)2154            client.mocked_send(2155                Message(2156                    "spawn",2157                    {2158                        "timestamp": 1605538585,2159                        "user_classes_count": {"MyUser": 9},2160                        "host": "",2161                        "stop_timeout": None,2162                        "parsed_options": {},2163                    },2164                    "dummy_client_id",2165                )2166            )2167            sleep(0)2168            worker.spawning_greenlet.join()2169            self.assertEqual(9, len(worker.user_greenlets))2170            worker.quit()2171    def test_computed_properties(self):2172        class MyUser1(User):2173            wait_time = constant(1)2174            @task2175            def my_task(self):2176                pass2177        class MyUser2(User):2178            wait_time = constant(1)2179            @task2180            def my_task(self):2181                pass2182        with mock.patch("locust.rpc.rpc.Client", mocked_rpc()) as client:2183            environment = Environment()2184            worker = self.get_runner(environment=environment, user_classes=[MyUser1, MyUser2])2185            client.mocked_send(2186                Message(2187                    "spawn",2188                    {2189                        "timestamp": 1605538584,2190                        "user_classes_count": {"MyUser1": 10, "MyUser2": 10},2191                        
"host": "",2192                        "stop_timeout": None,2193                        "parsed_options": {},2194                    },2195                    "dummy_client_id",2196                )2197            )2198            worker.spawning_greenlet.join()2199            self.assertDictEqual(worker.user_classes_count, {"MyUser1": 10, "MyUser2": 10})2200            self.assertDictEqual(worker.target_user_classes_count, {"MyUser1": 10, "MyUser2": 10})2201            self.assertEqual(worker.target_user_count, 20)2202            client.mocked_send(2203                Message(2204                    "spawn",2205                    {2206                        "timestamp": 1605538585,2207                        "user_classes_count": {"MyUser1": 1, "MyUser2": 2},2208                        "host": "",2209                        "stop_timeout": None,2210                        "parsed_options": {},2211                    },2212                    "dummy_client_id",2213                )2214            )2215            worker.spawning_greenlet.join()2216            self.assertDictEqual(worker.user_classes_count, {"MyUser1": 1, "MyUser2": 2})2217            self.assertDictEqual(worker.target_user_classes_count, {"MyUser1": 1, "MyUser2": 2})2218            self.assertEqual(worker.target_user_count, 3)2219            worker.quit()2220    def test_custom_message_send(self):2221        class MyUser(User):2222            wait_time = constant(1)2223            @task2224            def my_task(self):2225                pass2226        with mock.patch("locust.rpc.rpc.Client", mocked_rpc()) as client:2227            environment = Environment()2228            worker = self.get_runner(environment=environment, user_classes=[MyUser])2229            client.outbox.clear()2230            worker.send_message("test_custom_msg", {"test_data": 123})2231            self.assertEqual("test_custom_msg", client.outbox[0].type)2232            self.assertEqual(123, 
client.outbox[0].data["test_data"])2233            worker.quit()2234    def test_custom_message_receive(self):...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
