Best Python code snippet using locust
test_web.py
Source:test_web.py  
...21from locust.web import WebUI22from .testcases import LocustTestCase23from .util import create_tls_cert24class _HeaderCheckMixin:25    def _check_csv_headers(self, headers, exp_fn_prefix):26        # Check common headers for csv file download request27        self.assertIn("Content-Type", headers)28        content_type = headers["Content-Type"]29        self.assertIn("text/csv", content_type)30        self.assertIn("Content-disposition", headers)31        disposition = headers[32            "Content-disposition"33        ]  # e.g.: 'attachment; filename=requests_full_history_1597586811.5084946.csv'34        self.assertIn(exp_fn_prefix, disposition)35class TestWebUI(LocustTestCase, _HeaderCheckMixin):36    def setUp(self):37        super().setUp()38        parser = get_parser(default_config_files=[])39        self.environment.parsed_options = parser.parse_args([])40        self.stats = self.environment.stats41        self.web_ui = self.environment.create_web_ui("127.0.0.1", 0)42        self.web_ui.app.view_functions["request_stats"].clear_cache()43        gevent.sleep(0.01)44        self.web_port = self.web_ui.server.server_port45    def tearDown(self):46        super().tearDown()47        self.web_ui.stop()48        self.runner.quit()49    def test_web_ui_reference_on_environment(self):50        self.assertEqual(self.web_ui, self.environment.web_ui)51    def test_web_ui_no_runner(self):52        env = Environment()53        web_ui = WebUI(env, "127.0.0.1", 0)54        gevent.sleep(0.01)55        try:56            response = requests.get("http://127.0.0.1:%i/" % web_ui.server.server_port)57            self.assertEqual(500, response.status_code)58            self.assertEqual("Error: Locust Environment does not have any runner", response.text)59        finally:60            web_ui.stop()61    def test_index(self):62        self.assertEqual(200, requests.get("http://127.0.0.1:%i/" % self.web_port).status_code)63    def test_index_with_spawn_options(self):64        html_to_option = {65            "user_count": ["-u", "100"],66            "spawn_rate": ["-r", "10.0"],67        }68        for html_name_to_test in html_to_option.keys():69            # Test that setting each spawn option individually populates the corresponding field in the html, and none of the others70            self.environment.parsed_options = parse_options(html_to_option[html_name_to_test])71            response = requests.get("http://127.0.0.1:%i/" % self.web_port)72            self.assertEqual(200, response.status_code)73            d = pq(response.content.decode("utf-8"))74            for html_name in html_to_option.keys():75                start_value = d(f".start [name={html_name}]").attr("value")76                edit_value = d(f".edit [name={html_name}]").attr("value")77                if html_name_to_test == html_name:78                    self.assertEqual(html_to_option[html_name][1], start_value)79                    self.assertEqual(html_to_option[html_name][1], edit_value)80                else:81                    self.assertEqual("", start_value)82                    self.assertEqual("", edit_value)83    def test_stats_no_data(self):84        self.assertEqual(200, requests.get("http://127.0.0.1:%i/stats/requests" % self.web_port).status_code)85    def test_stats(self):86        self.stats.log_request("GET", "/<html>", 120, 5612)87        response = requests.get("http://127.0.0.1:%i/stats/requests" % self.web_port)88        self.assertEqual(200, response.status_code)89        data = json.loads(response.text)90        self.assertEqual(2, len(data["stats"]))  # one entry plus Aggregated91        self.assertEqual("/<html>", data["stats"][0]["name"])92        self.assertEqual("/<html>", data["stats"][0]["safe_name"])93        self.assertEqual("GET", data["stats"][0]["method"])94        self.assertEqual(120, data["stats"][0]["avg_response_time"])95        self.assertEqual("Aggregated", data["stats"][1]["name"])96        self.assertEqual(1, data["stats"][1]["num_requests"])97        self.assertEqual(120, data["stats"][1]["avg_response_time"])98    def test_stats_cache(self):99        self.stats.log_request("GET", "/test", 120, 5612)100        response = requests.get("http://127.0.0.1:%i/stats/requests" % self.web_port)101        self.assertEqual(200, response.status_code)102        data = json.loads(response.text)103        self.assertEqual(2, len(data["stats"]))  # one entry plus Aggregated104        # add another entry105        self.stats.log_request("GET", "/test2", 120, 5612)106        data = json.loads(requests.get("http://127.0.0.1:%i/stats/requests" % self.web_port).text)107        self.assertEqual(2, len(data["stats"]))  # old value should be cached now108        self.web_ui.app.view_functions["request_stats"].clear_cache()109        data = json.loads(requests.get("http://127.0.0.1:%i/stats/requests" % self.web_port).text)110        self.assertEqual(3, len(data["stats"]))  # this should no longer be cached111    def test_stats_rounding(self):112        self.stats.log_request("GET", "/test", 1.39764125, 2)113        self.stats.log_request("GET", "/test", 999.9764125, 1000)114        response = requests.get("http://127.0.0.1:%i/stats/requests" % self.web_port)115        self.assertEqual(200, response.status_code)116        data = json.loads(response.text)117        self.assertEqual(1, data["stats"][0]["min_response_time"])118        self.assertEqual(1000, data["stats"][0]["max_response_time"])119    def test_request_stats_csv(self):120        self.stats.log_request("GET", "/test2", 120, 5612)121        response = requests.get("http://127.0.0.1:%i/stats/requests/csv" % self.web_port)122        self.assertEqual(200, response.status_code)123        self._check_csv_headers(response.headers, "requests")124    def test_request_stats_full_history_csv_not_present(self):125        self.stats.log_request("GET", "/test2", 120, 5612)126        response = requests.get("http://127.0.0.1:%i/stats/requests_full_history/csv" % self.web_port)127        self.assertEqual(404, response.status_code)128    def test_failure_stats_csv(self):129        self.stats.log_error("GET", "/", Exception("Error1337"))130        response = requests.get("http://127.0.0.1:%i/stats/failures/csv" % self.web_port)131        self.assertEqual(200, response.status_code)132        self._check_csv_headers(response.headers, "failures")133    def test_request_stats_with_errors(self):134        self.stats.log_error("GET", "/", Exception("Error1337"))135        response = requests.get("http://127.0.0.1:%i/stats/requests" % self.web_port)136        self.assertEqual(200, response.status_code)137        self.assertIn("Error1337", response.text)138    def test_reset_stats(self):139        try:140            raise Exception("A cool test exception")141        except Exception as e:142            tb = e.__traceback__143            self.runner.log_exception("local", str(e), "".join(traceback.format_tb(tb)))144            self.runner.log_exception("local", str(e), "".join(traceback.format_tb(tb)))145        self.stats.log_request("GET", "/test", 120, 5612)146        self.stats.log_error("GET", "/", Exception("Error1337"))147        response = requests.get("http://127.0.0.1:%i/stats/reset" % self.web_port)148        self.assertEqual(200, response.status_code)149        self.assertEqual({}, self.stats.errors)150        self.assertEqual({}, self.runner.exceptions)151        self.assertEqual(0, self.stats.get("/", "GET").num_requests)152        self.assertEqual(0, self.stats.get("/", "GET").num_failures)153        self.assertEqual(0, self.stats.get("/test", "GET").num_requests)154        self.assertEqual(0, self.stats.get("/test", "GET").num_failures)155    def test_exceptions(self):156        try:157            raise Exception("A cool test exception")158        except Exception as e:159            tb = e.__traceback__160            self.runner.log_exception("local", str(e), "".join(traceback.format_tb(tb)))161            self.runner.log_exception("local", str(e), "".join(traceback.format_tb(tb)))162        response = requests.get("http://127.0.0.1:%i/exceptions" % self.web_port)163        self.assertEqual(200, response.status_code)164        self.assertIn("A cool test exception", response.text)165        response = requests.get("http://127.0.0.1:%i/stats/requests" % self.web_port)166        self.assertEqual(200, response.status_code)167    def test_exceptions_csv(self):168        try:169            raise Exception("Test exception")170        except Exception as e:171            tb = e.__traceback__172            self.runner.log_exception("local", str(e), "".join(traceback.format_tb(tb)))173            self.runner.log_exception("local", str(e), "".join(traceback.format_tb(tb)))174        response = requests.get("http://127.0.0.1:%i/exceptions/csv" % self.web_port)175        self.assertEqual(200, response.status_code)176        self._check_csv_headers(response.headers, "exceptions")177        reader = csv.reader(StringIO(response.text))178        rows = []179        for row in reader:180            rows.append(row)181        self.assertEqual(2, len(rows))182        self.assertEqual("Test exception", rows[1][1])183        self.assertEqual(2, int(rows[1][0]), "Exception count should be 2")184    def test_swarm_host_value_specified(self):185        class MyUser(User):186            wait_time = constant(1)187            @task(1)188            def my_task(self):189                pass190        self.environment.user_classes = [MyUser]191        response = requests.post(192            "http://127.0.0.1:%i/swarm" % self.web_port,193            data={"user_count": 5, "spawn_rate": 5, "host": "https://localhost"},194        )195        self.assertEqual(200, response.status_code)196        self.assertEqual("https://localhost", response.json()["host"])197        self.assertEqual(self.environment.host, "https://localhost")198    def test_swarm_host_value_not_specified(self):199        class MyUser(User):200            wait_time = constant(1)201            @task(1)202            def my_task(self):203                pass204        self.environment.user_classes = [MyUser]205        response = requests.post(206            "http://127.0.0.1:%i/swarm" % self.web_port,207            data={"user_count": 5, "spawn_rate": 5},208        )209        self.assertEqual(200, response.status_code)210        self.assertEqual(None, response.json()["host"])211        self.assertEqual(self.environment.host, None)212    def test_host_value_from_user_class(self):213        class MyUser(User):214            host = "http://example.com"215        self.environment.user_classes = [MyUser]216        response = requests.get("http://127.0.0.1:%i/" % self.web_port)217        self.assertEqual(200, response.status_code)218        self.assertIn("http://example.com", response.content.decode("utf-8"))219        self.assertNotIn("setting this will override the host on all User classes", response.content.decode("utf-8"))220    def test_host_value_from_multiple_user_classes(self):221        class MyUser(User):222            host = "http://example.com"223        class MyUser2(User):224            host = "http://example.com"225        self.environment.user_classes = [MyUser, MyUser2]226        response = requests.get("http://127.0.0.1:%i/" % self.web_port)227        self.assertEqual(200, response.status_code)228        self.assertIn("http://example.com", response.content.decode("utf-8"))229        self.assertNotIn("setting this will override the host on all User classes", response.content.decode("utf-8"))230    def test_host_value_from_multiple_user_classes_different_hosts(self):231        class MyUser(User):232            host = None233        class MyUser2(User):234            host = "http://example.com"235        self.environment.user_classes = [MyUser, MyUser2]236        response = requests.get("http://127.0.0.1:%i/" % self.web_port)237        self.assertEqual(200, response.status_code)238        self.assertNotIn("http://example.com", response.content.decode("utf-8"))239        self.assertIn("setting this will override the host on all User classes", response.content.decode("utf-8"))240    def test_report_page(self):241        self.stats.log_request("GET", "/test", 120, 5612)242        r = requests.get("http://127.0.0.1:%i/stats/report" % self.web_port)243        self.assertEqual(200, r.status_code)244        self.assertIn("<title>Test Report</title>", r.text)245        self.assertIn("charts-container", r.text)246        self.assertIn(247            '<a href="?download=1">Download the Report</a>',248            r.text,249            "Download report link not found in HTML content",250        )251    def test_report_page_empty_stats(self):252        r = requests.get("http://127.0.0.1:%i/stats/report" % self.web_port)253        self.assertEqual(200, r.status_code)254        self.assertIn("<title>Test Report</title>", r.text)255        self.assertIn("charts-container", r.text)256    def test_report_download(self):257        self.stats.log_request("GET", "/test", 120, 5612)258        r = requests.get("http://127.0.0.1:%i/stats/report?download=1" % self.web_port)259        self.assertEqual(200, r.status_code)260        self.assertIn("attachment", r.headers.get("Content-Disposition", ""))261        self.assertNotIn("Download the Report", r.text, "Download report link found in HTML content")262    def test_report_host(self):263        self.environment.host = "http://test.com"264        self.stats.log_request("GET", "/test", 120, 5612)265        r = requests.get("http://127.0.0.1:%i/stats/report" % self.web_port)266        self.assertEqual(200, r.status_code)267        self.assertIn("http://test.com", r.text)268    def test_report_host2(self):269        class MyUser(User):270            host = "http://test2.com"271            @task272            def my_task(self):273                pass274        self.environment.host = None275        self.environment.user_classes = [MyUser]276        self.stats.log_request("GET", "/test", 120, 5612)277        r = requests.get("http://127.0.0.1:%i/stats/report" % self.web_port)278        self.assertEqual(200, r.status_code)279        self.assertIn("http://test2.com", r.text)280    def test_report_exceptions(self):281        try:282            raise Exception("Test exception")283        except Exception as e:284            tb = e.__traceback__285            self.runner.log_exception("local", str(e), "".join(traceback.format_tb(tb)))286            self.runner.log_exception("local", str(e), "".join(traceback.format_tb(tb)))287        self.stats.log_request("GET", "/test", 120, 5612)288        r = requests.get("http://127.0.0.1:%i/stats/report" % self.web_port)289        # self.assertEqual(200, r.status_code)290        self.assertIn("<h2>Exceptions Statistics</h2>", r.text)291class TestWebUIAuth(LocustTestCase):292    def setUp(self):293        super().setUp()294        parser = get_parser(default_config_files=[])295        options = parser.parse_args(["--web-auth", "john:doe"])296        self.runner = Runner(self.environment)297        self.stats = self.runner.stats298        self.web_ui = self.environment.create_web_ui("127.0.0.1", 0, auth_credentials=options.web_auth)299        self.web_ui.app.view_functions["request_stats"].clear_cache()300        gevent.sleep(0.01)301        self.web_port = self.web_ui.server.server_port302    def tearDown(self):303        super().tearDown()304        self.web_ui.stop()305        self.runner.quit()306    def test_index_with_basic_auth_enabled_correct_credentials(self):307        self.assertEqual(308            200, requests.get("http://127.0.0.1:%i/?ele=phino" % self.web_port, auth=("john", "doe")).status_code309        )310    def test_index_with_basic_auth_enabled_incorrect_credentials(self):311        self.assertEqual(312            401, requests.get("http://127.0.0.1:%i/?ele=phino" % self.web_port, auth=("john", "invalid")).status_code313        )314    def test_index_with_basic_auth_enabled_blank_credentials(self):315        self.assertEqual(401, requests.get("http://127.0.0.1:%i/?ele=phino" % self.web_port).status_code)316class TestWebUIWithTLS(LocustTestCase):317    def setUp(self):318        super().setUp()319        tls_cert, tls_key = create_tls_cert("127.0.0.1")320        self.tls_cert_file = NamedTemporaryFile(delete=False)321        self.tls_key_file = NamedTemporaryFile(delete=False)322        with open(self.tls_cert_file.name, "w") as f:323            f.write(tls_cert.decode())324        with open(self.tls_key_file.name, "w") as f:325            f.write(tls_key.decode())326        parser = get_parser(default_config_files=[])327        options = parser.parse_args(328            [329                "--tls-cert",330                self.tls_cert_file.name,331                "--tls-key",332                self.tls_key_file.name,333            ]334        )335        self.runner = Runner(self.environment)336        self.stats = self.runner.stats337        self.web_ui = self.environment.create_web_ui("127.0.0.1", 0, tls_cert=options.tls_cert, tls_key=options.tls_key)338        gevent.sleep(0.01)339        self.web_port = self.web_ui.server.server_port340    def tearDown(self):341        super().tearDown()342        self.web_ui.stop()343        self.runner.quit()344        os.unlink(self.tls_cert_file.name)345        os.unlink(self.tls_key_file.name)346    def test_index_with_https(self):347        # Suppress only the single warning from urllib3 needed.348        from urllib3.exceptions import InsecureRequestWarning349        requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)350        self.assertEqual(200, requests.get("https://127.0.0.1:%i/" % self.web_port, verify=False).status_code)351class TestWebUIFullHistory(LocustTestCase, _HeaderCheckMixin):352    STATS_BASE_NAME = "web_test"353    STATS_FILENAME = "{}_stats.csv".format(STATS_BASE_NAME)354    STATS_HISTORY_FILENAME = "{}_stats_history.csv".format(STATS_BASE_NAME)355    STATS_FAILURES_FILENAME = "{}_failures.csv".format(STATS_BASE_NAME)356    def setUp(self):357        super().setUp()358        self.remove_files_if_exists()359        parser = get_parser(default_config_files=[])360        self.environment.parsed_options = parser.parse_args(["--csv", self.STATS_BASE_NAME, "--csv-full-history"])361        self.stats = self.environment.stats362        self.stats.CSV_STATS_INTERVAL_SEC = 0.02363        locust.stats.CSV_STATS_INTERVAL_SEC = 0.1364        self.stats_csv_writer = StatsCSVFileWriter(365            self.environment, stats.PERCENTILES_TO_REPORT, self.STATS_BASE_NAME, full_history=True366        )367        self.web_ui = self.environment.create_web_ui("127.0.0.1", 0, stats_csv_writer=self.stats_csv_writer)368        self.web_ui.app.view_functions["request_stats"].clear_cache()369        gevent.sleep(0.01)370        self.web_port = self.web_ui.server.server_port371    def tearDown(self):372        super().tearDown()373        self.web_ui.stop()374        self.runner.quit()375        self.remove_files_if_exists()376    def remove_file_if_exists(self, filename):377        if os.path.exists(filename):378            os.remove(filename)379    def remove_files_if_exists(self):380        self.remove_file_if_exists(self.STATS_FILENAME)381        self.remove_file_if_exists(self.STATS_HISTORY_FILENAME)382        self.remove_file_if_exists(self.STATS_FAILURES_FILENAME)383    def test_request_stats_full_history_csv(self):384        self.stats.log_request("GET", "/test", 1.39764125, 2)385        self.stats.log_request("GET", "/test", 999.9764125, 1000)386        self.stats.log_request("GET", "/test2", 120, 5612)387        greenlet = gevent.spawn(self.stats_csv_writer.stats_writer)388        gevent.sleep(0.01)389        self.stats_csv_writer.stats_history_flush()390        gevent.kill(greenlet)391        response = requests.get("http://127.0.0.1:%i/stats/requests_full_history/csv" % self.web_port)392        self.assertEqual(200, response.status_code)393        self._check_csv_headers(response.headers, "requests_full_history")394        self.assertIn("Content-Length", response.headers)395        reader = csv.reader(StringIO(response.text))396        rows = [r for r in reader]397        self.assertEqual(4, len(rows))398        self.assertEqual("Timestamp", rows[0][0])399        self.assertEqual("GET", rows[1][2])400        self.assertEqual("/test", rows[1][3])401        self.assertEqual("/test2", rows[2][3])402        self.assertEqual("", rows[3][2])...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
