How to use the `test_custom_message_send` method in Locust

Best Python code snippet using locust

test_runners.py

Source: test_runners.py (GitHub)

copy

Full Screen

...1799 master.start(1, 3)1800 self.assertEqual({"MyUser1": 1, "MyUser2": 0}, master.target_user_classes_count)1801 self.assertEqual(1, master.target_user_count)1802 self.assertEqual(3, master.spawn_rate)1803 def test_custom_message_send(self):1804 class MyUser(User):1805 wait_time = constant(1)1806 @task1807 def my_task(self):1808 pass1809 with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:1810 master = self.get_runner()1811 for i in range(5):1812 master.clients[i] = WorkerNode(str(i))1813 master.send_message("test_custom_msg", {"test_data": 123})1814 self.assertEqual(5, len(server.outbox))1815 for _, msg in server.outbox:1816 self.assertEqual("test_custom_msg", msg.type)1817 self.assertEqual(123, msg.data["test_data"])1818 def test_custom_message_receive(self):1819 class MyUser(User):1820 wait_time = constant(1)1821 @task1822 def my_task(self):1823 pass1824 with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:1825 test_custom_msg = [False]1826 test_custom_msg_data = [{}]1827 def on_custom_msg(msg, **kw):1828 test_custom_msg[0] = True1829 test_custom_msg_data[0] = msg.data1830 master = self.get_runner()1831 master.register_message("test_custom_msg", on_custom_msg)1832 server.mocked_send(Message("test_custom_msg", {"test_data": 123}, "dummy_id"))1833 self.assertTrue(test_custom_msg[0])1834 self.assertEqual(123, test_custom_msg_data[0]["test_data"])1835 def test_undefined_custom_message_receive(self):1836 class MyUser(User):1837 wait_time = constant(1)1838 @task1839 def my_task(self):1840 pass1841 with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:1842 test_custom_msg = [False]1843 def on_custom_msg(msg, **kw):1844 test_custom_msg[0] = True1845 master = self.get_runner()1846 master.register_message("test_custom_msg", on_custom_msg)1847 server.mocked_send(Message("unregistered_custom_msg", {}, "dummy_id"))1848 self.assertFalse(test_custom_msg[0])1849 self.assertEqual(1, len(self.mocked_log.warning))1850 msg = 
self.mocked_log.warning[0]1851 self.assertIn("Unknown message type recieved from worker", msg)1852 def test_wait_for_workers_report_after_ramp_up(self):1853 def assert_cache_hits():1854 self.assertEqual(master._wait_for_workers_report_after_ramp_up.cache_info().hits, 0)1855 master._wait_for_workers_report_after_ramp_up()1856 self.assertEqual(master._wait_for_workers_report_after_ramp_up.cache_info().hits, 1)1857 master = self.get_runner()1858 master._wait_for_workers_report_after_ramp_up.cache_clear()1859 self.assertEqual(master._wait_for_workers_report_after_ramp_up(), 0.1)1860 assert_cache_hits()1861 master._wait_for_workers_report_after_ramp_up.cache_clear()1862 with _patch_env("LOCUST_WAIT_FOR_WORKERS_REPORT_AFTER_RAMP_UP", "5.7"):1863 self.assertEqual(master._wait_for_workers_report_after_ramp_up(), 5.7)1864 assert_cache_hits()1865 master._wait_for_workers_report_after_ramp_up.cache_clear()1866 with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=1.5), _patch_env(1867 "LOCUST_WAIT_FOR_WORKERS_REPORT_AFTER_RAMP_UP", "5.7 * WORKER_REPORT_INTERVAL"1868 ):1869 self.assertEqual(master._wait_for_workers_report_after_ramp_up(), 5.7 * 1.5)1870 assert_cache_hits()1871 master._wait_for_workers_report_after_ramp_up.cache_clear()1872@contextmanager1873def _patch_env(name: str, value: str):1874 prev_value = os.getenv(name)1875 os.environ[name] = value1876 try:1877 yield1878 finally:1879 if prev_value is None:1880 del os.environ[name]1881 else:1882 os.environ[name] = prev_value1883class TestWorkerRunner(LocustTestCase):1884 def setUp(self):1885 super().setUp()1886 # self._report_to_master_event_handlers = [h for h in events.report_to_master._handlers]1887 def tearDown(self):1888 # events.report_to_master._handlers = self._report_to_master_event_handlers1889 super().tearDown()1890 def get_runner(self, environment=None, user_classes=None):1891 if environment is None:1892 environment = self.environment1893 user_classes = user_classes or []1894 environment.user_classes 
= user_classes1895 return WorkerRunner(environment, master_host="localhost", master_port=5557)1896 def test_worker_stop_timeout(self):1897 class MyTestUser(User):1898 _test_state = 01899 @task1900 def the_task(self):1901 MyTestUser._test_state = 11902 gevent.sleep(0.2)1903 MyTestUser._test_state = 21904 with mock.patch("locust.rpc.rpc.Client", mocked_rpc()) as client:1905 environment = Environment()1906 worker = self.get_runner(environment=environment, user_classes=[MyTestUser])1907 self.assertEqual(1, len(client.outbox))1908 self.assertEqual("client_ready", client.outbox[0].type)1909 client.mocked_send(1910 Message(1911 "spawn",1912 {1913 "timestamp": 1605538584,1914 "user_classes_count": {"MyTestUser": 1},1915 "host": "",1916 "stop_timeout": 1,1917 "parsed_options": {},1918 },1919 "dummy_client_id",1920 )1921 )1922 # wait for worker to spawn locusts1923 self.assertIn("spawning", [m.type for m in client.outbox])1924 worker.spawning_greenlet.join()1925 self.assertEqual(1, len(worker.user_greenlets))1926 # check that locust has started running1927 gevent.sleep(0.01)1928 self.assertEqual(1, MyTestUser._test_state)1929 # send stop message1930 client.mocked_send(Message("stop", None, "dummy_client_id"))1931 worker.user_greenlets.join()1932 # check that locust user got to finish1933 self.assertEqual(2, MyTestUser._test_state)1934 def test_worker_without_stop_timeout(self):1935 class MyTestUser(User):1936 _test_state = 01937 @task1938 def the_task(self):1939 MyTestUser._test_state = 11940 gevent.sleep(0.2)1941 MyTestUser._test_state = 21942 with mock.patch("locust.rpc.rpc.Client", mocked_rpc()) as client:1943 environment = Environment(stop_timeout=None)1944 worker = self.get_runner(environment=environment, user_classes=[MyTestUser])1945 self.assertEqual(1, len(client.outbox))1946 self.assertEqual("client_ready", client.outbox[0].type)1947 client.mocked_send(1948 Message(1949 "spawn",1950 {1951 "timestamp": 1605538584,1952 "user_classes_count": {"MyTestUser": 1},1953 
"host": "",1954 "stop_timeout": None,1955 "parsed_options": {},1956 },1957 "dummy_client_id",1958 )1959 )1960 # print("outbox:", client.outbox)1961 # wait for worker to spawn locusts1962 self.assertIn("spawning", [m.type for m in client.outbox])1963 worker.spawning_greenlet.join()1964 self.assertEqual(1, len(worker.user_greenlets))1965 # check that locust has started running1966 gevent.sleep(0.01)1967 self.assertEqual(1, MyTestUser._test_state)1968 # send stop message1969 client.mocked_send(Message("stop", None, "dummy_client_id"))1970 worker.user_greenlets.join()1971 # check that locust user did not get to finish1972 self.assertEqual(1, MyTestUser._test_state)1973 def test_spawn_message_with_older_timestamp_is_rejected(self):1974 class MyUser(User):1975 wait_time = constant(1)1976 def start(self, group: Group):1977 # We do this so that the spawning does not finish1978 # too quickly1979 gevent.sleep(0.1)1980 return super().start(group)1981 @task1982 def my_task(self):1983 pass1984 with mock.patch("locust.rpc.rpc.Client", mocked_rpc()) as client:1985 environment = Environment()1986 worker = self.get_runner(environment=environment, user_classes=[MyUser])1987 client.mocked_send(1988 Message(1989 "spawn",1990 {1991 "timestamp": 1605538584,1992 "user_classes_count": {"MyUser": 10},1993 "host": "",1994 "stop_timeout": None,1995 "parsed_options": {},1996 },1997 "dummy_client_id",1998 )1999 )2000 sleep(0.6)2001 self.assertEqual(STATE_SPAWNING, worker.state)2002 worker.spawning_greenlet.join()2003 self.assertEqual(10, worker.user_count)2004 # Send same timestamp as the first message2005 client.mocked_send(2006 Message(2007 "spawn",2008 {2009 "timestamp": 1605538584,2010 "user_classes_count": {"MyUser": 9},2011 "host": "",2012 "stop_timeout": None,2013 "parsed_options": {},2014 },2015 "dummy_client_id",2016 )2017 )2018 worker.spawning_greenlet.join()2019 # Still 10 users2020 self.assertEqual(10, worker.user_count)2021 # Send older timestamp than the first message2022 
client.mocked_send(2023 Message(2024 "spawn",2025 {2026 "timestamp": 1605538583,2027 "user_classes_count": {"MyUser": 2},2028 "host": "",2029 "stop_timeout": None,2030 "parsed_options": {},2031 },2032 "dummy_client_id",2033 )2034 )2035 worker.spawning_greenlet.join()2036 # Still 10 users2037 self.assertEqual(10, worker.user_count)2038 # Send newer timestamp than the first message2039 client.mocked_send(2040 Message(2041 "spawn",2042 {2043 "timestamp": 1605538585,2044 "user_classes_count": {"MyUser": 2},2045 "host": "",2046 "stop_timeout": None,2047 "parsed_options": {},2048 },2049 "dummy_client_id",2050 )2051 )2052 worker.spawning_greenlet.join()2053 self.assertEqual(2, worker.user_count)2054 worker.quit()2055 def test_worker_messages_sent_to_master(self):2056 """2057 Ensure that worker includes both "user_count" and "user_classes_count"2058 when reporting to the master.2059 """2060 class MyUser(User):2061 wait_time = constant(1)2062 def start(self, group: Group):2063 # We do this so that the spawning does not finish2064 # too quickly2065 gevent.sleep(0.1)2066 return super().start(group)2067 @task2068 def my_task(self):2069 pass2070 with mock.patch("locust.rpc.rpc.Client", mocked_rpc()) as client:2071 environment = Environment()2072 worker = self.get_runner(environment=environment, user_classes=[MyUser])2073 client.mocked_send(2074 Message(2075 "spawn",2076 {2077 "timestamp": 1605538584,2078 "user_classes_count": {"MyUser": 10},2079 "host": "",2080 "stop_timeout": None,2081 "parsed_options": {},2082 },2083 "dummy_client_id",2084 )2085 )2086 sleep(0.6)2087 self.assertEqual(STATE_SPAWNING, worker.state)2088 worker.spawning_greenlet.join()2089 self.assertEqual(10, worker.user_count)2090 sleep(2)2091 message = next((m for m in reversed(client.outbox) if m.type == "stats"), None)2092 self.assertIsNotNone(message)2093 self.assertIn("user_count", message.data)2094 self.assertIn("user_classes_count", message.data)2095 self.assertEqual(message.data["user_count"], 10)2096 
self.assertEqual(message.data["user_classes_count"]["MyUser"], 10)2097 message = next((m for m in client.outbox if m.type == "spawning_complete"), None)2098 self.assertIsNotNone(message)2099 self.assertIn("user_count", message.data)2100 self.assertIn("user_classes_count", message.data)2101 self.assertEqual(message.data["user_count"], 10)2102 self.assertEqual(message.data["user_classes_count"]["MyUser"], 10)2103 worker.quit()2104 def test_worker_heartbeat_messages_sent_to_master(self):2105 """2106 Validate content of the heartbeat payload sent to the master.2107 """2108 class MyUser(User):2109 wait_time = constant(1)2110 @task2111 def my_task(self):2112 pass2113 with mock.patch("locust.rpc.rpc.Client", mocked_rpc()) as client:2114 environment = Environment()2115 worker = self.get_runner(environment=environment, user_classes=[MyUser])2116 t0 = time.perf_counter()2117 while len([m for m in client.outbox if m.type == "heartbeat"]) == 0:2118 self.assertLessEqual(time.perf_counter() - t0, 3)2119 sleep(0.1)2120 message = next((m for m in reversed(client.outbox) if m.type == "heartbeat"))2121 self.assertEqual(len(message.data), 2)2122 self.assertIn("state", message.data)2123 self.assertIn("current_cpu_usage", message.data)2124 worker.quit()2125 def test_change_user_count_during_spawning(self):2126 class MyUser(User):2127 wait_time = constant(1)2128 def start(self, group: Group):2129 # We do this so that the spawning does not finish2130 # too quickly2131 gevent.sleep(0.1)2132 return super().start(group)2133 @task2134 def my_task(self):2135 pass2136 with mock.patch("locust.rpc.rpc.Client", mocked_rpc()) as client:2137 environment = Environment()2138 worker = self.get_runner(environment=environment, user_classes=[MyUser])2139 client.mocked_send(2140 Message(2141 "spawn",2142 {2143 "timestamp": 1605538584,2144 "user_classes_count": {"MyUser": 10},2145 "host": "",2146 "stop_timeout": None,2147 "parsed_options": {},2148 },2149 "dummy_client_id",2150 )2151 )2152 sleep(0.6)2153 
self.assertEqual(STATE_SPAWNING, worker.state)2154 client.mocked_send(2155 Message(2156 "spawn",2157 {2158 "timestamp": 1605538585,2159 "user_classes_count": {"MyUser": 9},2160 "host": "",2161 "stop_timeout": None,2162 "parsed_options": {},2163 },2164 "dummy_client_id",2165 )2166 )2167 sleep(0)2168 worker.spawning_greenlet.join()2169 self.assertEqual(9, len(worker.user_greenlets))2170 worker.quit()2171 def test_computed_properties(self):2172 class MyUser1(User):2173 wait_time = constant(1)2174 @task2175 def my_task(self):2176 pass2177 class MyUser2(User):2178 wait_time = constant(1)2179 @task2180 def my_task(self):2181 pass2182 with mock.patch("locust.rpc.rpc.Client", mocked_rpc()) as client:2183 environment = Environment()2184 worker = self.get_runner(environment=environment, user_classes=[MyUser1, MyUser2])2185 client.mocked_send(2186 Message(2187 "spawn",2188 {2189 "timestamp": 1605538584,2190 "user_classes_count": {"MyUser1": 10, "MyUser2": 10},2191 "host": "",2192 "stop_timeout": None,2193 "parsed_options": {},2194 },2195 "dummy_client_id",2196 )2197 )2198 worker.spawning_greenlet.join()2199 self.assertDictEqual(worker.user_classes_count, {"MyUser1": 10, "MyUser2": 10})2200 self.assertDictEqual(worker.target_user_classes_count, {"MyUser1": 10, "MyUser2": 10})2201 self.assertEqual(worker.target_user_count, 20)2202 client.mocked_send(2203 Message(2204 "spawn",2205 {2206 "timestamp": 1605538585,2207 "user_classes_count": {"MyUser1": 1, "MyUser2": 2},2208 "host": "",2209 "stop_timeout": None,2210 "parsed_options": {},2211 },2212 "dummy_client_id",2213 )2214 )2215 worker.spawning_greenlet.join()2216 self.assertDictEqual(worker.user_classes_count, {"MyUser1": 1, "MyUser2": 2})2217 self.assertDictEqual(worker.target_user_classes_count, {"MyUser1": 1, "MyUser2": 2})2218 self.assertEqual(worker.target_user_count, 3)2219 worker.quit()2220 def test_custom_message_send(self):2221 class MyUser(User):2222 wait_time = constant(1)2223 @task2224 def my_task(self):2225 pass2226 
with mock.patch("locust.rpc.rpc.Client", mocked_rpc()) as client:2227 environment = Environment()2228 worker = self.get_runner(environment=environment, user_classes=[MyUser])2229 client.outbox.clear()2230 worker.send_message("test_custom_msg", {"test_data": 123})2231 self.assertEqual("test_custom_msg", client.outbox[0].type)2232 self.assertEqual(123, client.outbox[0].data["test_data"])2233 worker.quit()2234 def test_custom_message_receive(self):...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run locust automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now!!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful