Best Python code snippet using locust
main.py
Source:main.py  
1import inspect2import logging3import os4import importlib5import signal6import socket7import sys8import time9import gevent10import locust11from . import log12from .argument_parser import parse_locustfile_option, parse_options13from .env import Environment14from .log import setup_logging, greenlet_exception_logger15from . import stats16from .stats import print_error_report, print_percentile_stats, print_stats, stats_printer, stats_history17from .stats import StatsCSV, StatsCSVFileWriter18from .user import User19from .user.inspectuser import get_task_ratio_dict, print_task_ratio20from .util.timespan import parse_timespan21from .exception import AuthCredentialsError22from .shape import LoadTestShape23from .input_events import input_listener24version = locust.__version__25def is_user_class(item):26    """27    Check if a variable is a runnable (non-abstract) User class28    """29    return bool(inspect.isclass(item) and issubclass(item, User) and item.abstract is False)30def is_shape_class(item):31    """32    Check if a class is a LoadTestShape33    """34    return bool(35        inspect.isclass(item) and issubclass(item, LoadTestShape) and item.__dict__["__module__"] != "locust.shape"36    )37def load_locustfile(path):38    """39    Import given locustfile path and return (docstring, callables).40    Specifically, the locustfile's ``__doc__`` attribute (a string) and a41    dictionary of ``{'name': callable}`` containing all callables which pass42    the "is a Locust" test.43    """44    # Start with making sure the current working dir is in the sys.path45    sys.path.insert(0, os.getcwd())46    # Get directory and locustfile name47    directory, locustfile = os.path.split(path)48    # If the directory isn't in the PYTHONPATH, add it so our import will work49    added_to_path = False50    index = None51    if directory not in sys.path:52        sys.path.insert(0, directory)53        added_to_path = True54    # If the directory IS in the PYTHONPATH, move it to the front temporarily,55    # otherwise other locustfiles -- like Locusts's own -- may scoop the intended56    # one.57    else:58        i = sys.path.index(directory)59        if i != 0:60            # Store index for later restoration61            index = i62            # Add to front, then remove from original position63            sys.path.insert(0, directory)64            del sys.path[i + 1]65    # Perform the import66    source = importlib.machinery.SourceFileLoader(os.path.splitext(locustfile)[0], path)67    imported = source.load_module()68    # Remove directory from path if we added it ourselves (just to be neat)69    if added_to_path:70        del sys.path[0]71    # Put back in original index if we moved it72    if index is not None:73        sys.path.insert(index + 1, directory)74        del sys.path[0]75    # Return our two-tuple76    user_classes = {name: value for name, value in vars(imported).items() if is_user_class(value)}77    # Find shape class, if any, return it78    shape_classes = [value for name, value in vars(imported).items() if is_shape_class(value)]79    if shape_classes:80        shape_class = shape_classes[0]()81    else:82        shape_class = None83    return imported.__doc__, user_classes, shape_class84def create_environment(user_classes, options, events=None, shape_class=None):85    """86    Create an Environment instance from options87    """88    return Environment(89        user_classes=user_classes,90        shape_class=shape_class,91        tags=options.tags,92        exclude_tags=options.exclude_tags,93        events=events,94        host=options.host,95        reset_stats=options.reset_stats,96        stop_timeout=options.stop_timeout,97        parsed_options=options,98    )99def main():100    # find specified locustfile and make sure it exists, using a very simplified101    # command line parser that is only used to parse the -f option102    locustfile = parse_locustfile_option()103    # import the locustfile104    docstring, user_classes, shape_class = load_locustfile(locustfile)105    # parse all command line options106    options = parse_options()107    if options.headful:108        options.headless = False109    if options.slave or options.expect_slaves:110        sys.stderr.write("The --slave/--expect-slaves parameters have been renamed --worker/--expect-workers\n")111        sys.exit(1)112    if options.step_time or options.step_load or options.step_users or options.step_clients:113        sys.stderr.write(114            "The step load feature was removed in Locust 1.3. You can achieve similar results using a LoadTestShape class. See https://docs.locust.io/en/stable/generating-custom-load-shape.html\n"115        )116        sys.exit(1)117    if options.hatch_rate:118        sys.stderr.write("[DEPRECATED] The --hatch-rate parameter has been renamed --spawn-rate\n")119        options.spawn_rate = options.hatch_rate120    # setup logging121    if not options.skip_log_setup:122        if options.loglevel.upper() in ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]:123            setup_logging(options.loglevel, options.logfile)124        else:125            sys.stderr.write("Invalid --loglevel. Valid values are: DEBUG/INFO/WARNING/ERROR/CRITICAL\n")126            sys.exit(1)127    logger = logging.getLogger(__name__)128    greenlet_exception_handler = greenlet_exception_logger(logger)129    if options.list_commands:130        print("Available Users:")131        for name in user_classes:132            print("    " + name)133        sys.exit(0)134    if not user_classes:135        logger.error("No User class found!")136        sys.exit(1)137    # make sure specified User exists138    if options.user_classes:139        missing = set(options.user_classes) - set(user_classes.keys())140        if missing:141            logger.error("Unknown User(s): %s\n" % (", ".join(missing)))142            sys.exit(1)143        else:144            names = set(options.user_classes) & set(user_classes.keys())145            user_classes = [user_classes[n] for n in names]146    else:147        # list() call is needed to consume the dict_view object in Python 3148        user_classes = list(user_classes.values())149    if os.name != "nt" and not options.master:150        try:151            import resource152            minimum_open_file_limit = 10000153            current_open_file_limit = resource.getrlimit(resource.RLIMIT_NOFILE)[0]154            if current_open_file_limit < minimum_open_file_limit:155                # Increasing the limit to 10000 within a running process should work on at least MacOS.156                # It does not work on all OS:es, but we should be no worse off for trying.157                resource.setrlimit(resource.RLIMIT_NOFILE, [minimum_open_file_limit, resource.RLIM_INFINITY])158        except BaseException:159            logger.warning(160                (161                    f"System open file limit '{current_open_file_limit}' is below minimum setting '{minimum_open_file_limit}'. "162                    "It's not high enough for load testing, and the OS didn't allow locust to increase it by itself. "163                    "See https://github.com/locustio/locust/wiki/Installation#increasing-maximum-number-of-open-files-limit for more info."164                )165            )166    # create locust Environment167    environment = create_environment(user_classes, options, events=locust.events, shape_class=shape_class)168    if shape_class and (options.num_users or options.spawn_rate):169        logger.warning(170            "The specified locustfile contains a shape class but a conflicting argument was specified: users or spawn-rate. Ignoring arguments"171        )172    if options.show_task_ratio:173        print("\n Task ratio per User class")174        print("-" * 80)175        print_task_ratio(user_classes)176        print("\n Total task ratio")177        print("-" * 80)178        print_task_ratio(user_classes, total=True)179        sys.exit(0)180    if options.show_task_ratio_json:181        from json import dumps182        task_data = {183            "per_class": get_task_ratio_dict(user_classes),184            "total": get_task_ratio_dict(user_classes, total=True),185        }186        print(dumps(task_data))187        sys.exit(0)188    if options.master:189        runner = environment.create_master_runner(190            master_bind_host=options.master_bind_host,191            master_bind_port=options.master_bind_port,192        )193    elif options.worker:194        try:195            runner = environment.create_worker_runner(options.master_host, options.master_port)196            logger.debug("Connected to locust master: %s:%s", options.master_host, options.master_port)197        except socket.error as e:198            logger.error("Failed to connect to the Locust master: %s", e)199            sys.exit(-1)200    else:201        runner = environment.create_local_runner()202    # main_greenlet is pointing to runners.greenlet by default, it will point the web greenlet later if in web mode203    main_greenlet = runner.greenlet204    if options.run_time:205        if not options.headless:206            logger.error("The --run-time argument can only be used together with --headless")207            sys.exit(1)208        if options.worker:209            logger.error("--run-time should be specified on the master node, and not on worker nodes")210            sys.exit(1)211        try:212            options.run_time = parse_timespan(options.run_time)213        except ValueError:214            logger.error("Valid --run-time formats are: 20, 20s, 3m, 2h, 1h20m, 3h30m10s, etc.")215            sys.exit(1)216    if options.csv_prefix:217        stats_csv_writer = StatsCSVFileWriter(218            environment, stats.PERCENTILES_TO_REPORT, options.csv_prefix, options.stats_history_enabled219        )220    else:221        stats_csv_writer = StatsCSV(environment, stats.PERCENTILES_TO_REPORT)222    # start Web UI223    if not options.headless and not options.worker:224        # spawn web greenlet225        protocol = "https" if options.tls_cert and options.tls_key else "http"226        try:227            if options.web_host == "*":228                # special check for "*" so that we're consistent with --master-bind-host229                web_host = ""230            else:231                web_host = options.web_host232            if web_host:233                logger.info("Starting web interface at %s://%s:%s" % (protocol, web_host, options.web_port))234            else:235                logger.info(236                    "Starting web interface at %s://0.0.0.0:%s (accepting connections from all network interfaces)"237                    % (protocol, options.web_port)238                )239            web_ui = environment.create_web_ui(240                host=web_host,241                port=options.web_port,242                auth_credentials=options.web_auth,243                tls_cert=options.tls_cert,244                tls_key=options.tls_key,245                stats_csv_writer=stats_csv_writer,246                delayed_start=True,247            )248        except AuthCredentialsError:249            logger.error("Credentials supplied with --web-auth should have the format: username:password")250            sys.exit(1)251    else:252        web_ui = None253    # Fire locust init event which can be used by end-users' code to run setup code that254    # need access to the Environment, Runner or WebUI.255    environment.events.init.fire(environment=environment, runner=runner, web_ui=web_ui)256    if web_ui:257        web_ui.start()258        main_greenlet = web_ui.greenlet259    if options.headless:260        # headless mode261        if options.master:262            # wait for worker nodes to connect263            while len(runner.clients.ready) < options.expect_workers:264                logging.info(265                    "Waiting for workers to be ready, %s of %s connected",266                    len(runner.clients.ready),267                    options.expect_workers,268                )269                time.sleep(1)270        if not options.worker:271            # apply headless mode defaults272            if options.num_users is None:273                options.num_users = 1274            if options.spawn_rate is None:275                options.spawn_rate = 1276            # start the test277            if environment.shape_class:278                environment.runner.start_shape()279            else:280                runner.start(options.num_users, options.spawn_rate)281    def spawn_run_time_limit_greenlet():282        def timelimit_stop():283            logger.info("Time limit reached. Stopping Locust.")284            runner.quit()285        gevent.spawn_later(options.run_time, timelimit_stop).link_exception(greenlet_exception_handler)286    if options.run_time:287        logger.info("Run time limit set to %s seconds" % options.run_time)288        spawn_run_time_limit_greenlet()289    elif options.headless:290        logger.info("No run time limit set, use CTRL+C to interrupt.")291    else:292        pass  # dont log anything - not having a time limit is normal when not running headless293    input_listener_greenlet = None294    if not options.worker:295        # spawn input listener greenlet296        input_listener_greenlet = gevent.spawn(297            input_listener(298                {299                    "w": lambda: runner.spawn_users(1, 100)300                    if runner.state != "spawning"301                    else logging.warning("Already spawning users, can't spawn more right now"),302                    "W": lambda: runner.spawn_users(10, 100)303                    if runner.state != "spawning"304                    else logging.warning("Already spawning users, can't spawn more right now"),305                    "s": lambda: runner.stop_users(1)306                    if runner.state != "spawning"307                    else logging.warning("Spawning users, can't stop right now"),308                    "S": lambda: runner.stop_users(10)309                    if runner.state != "spawning"310                    else logging.warning("Spawning users, can't stop right now"),311                }312            )313        )314        input_listener_greenlet.link_exception(greenlet_exception_handler)315    stats_printer_greenlet = None316    if not options.only_summary and (options.print_stats or (options.headless and not options.worker)):317        # spawn stats printing greenlet318        stats_printer_greenlet = gevent.spawn(stats_printer(runner.stats))319        stats_printer_greenlet.link_exception(greenlet_exception_handler)320    if options.csv_prefix:321        gevent.spawn(stats_csv_writer.stats_writer).link_exception(greenlet_exception_handler)322    gevent.spawn(stats_history, runner)323    def shutdown():324        """325        Shut down locust by firing quitting event, printing/writing stats and exiting326        """327        logger.info("Running teardowns...")328        if input_listener_greenlet is not None:329            input_listener_greenlet.kill(block=False)330        environment.events.quitting.fire(environment=environment, reverse=True)331        # determine the process exit code332        if log.unhandled_greenlet_exception:333            code = 2334        elif environment.process_exit_code is not None:335            code = environment.process_exit_code336        elif len(runner.errors) or len(runner.exceptions):337            code = options.exit_code_on_error338        else:339            code = 0340        logger.info("Shutting down (exit code %s), bye." % code)341        if stats_printer_greenlet is not None:342            stats_printer_greenlet.kill(block=False)343        logger.info("Cleaning up runner...")344        if runner is not None:345            runner.quit()346        print_stats(runner.stats, current=False)347        print_percentile_stats(runner.stats)348        print_error_report(runner.stats)349        sys.exit(code)350    # install SIGTERM handler351    def sig_term_handler():352        logger.info("Got SIGTERM signal")353        shutdown()354    gevent.signal_handler(signal.SIGTERM, sig_term_handler)355    try:356        logger.info("Starting Locust %s" % version)357        main_greenlet.join()358        shutdown()359    except KeyboardInterrupt:...regulation.py
Source:regulation.py  
1from pycurb import PyCurbObject2class Regulation(PyCurbObject):3    fields = ['rule', 'user_classes', 'time_spans', 'priority', 'payment']4    def __init__(self,5                 rule,6                 user_classes=None,7                 time_spans=None,8                 priority=None,9                 payment=None):10        self.rule = rule11        self.user_classes = None if user_classes == [{}] else user_classes12        self.time_spans = None if time_spans == [] else time_spans13        self.priority = priority14        self.payment = payment15    def to_dict(self):...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
