How to use _remove_user_classes_with_weight_zero method in locust

Best Python code snippet using locust

env.py

Source:env.py Github

copy

Full Screen

...77 If False, exceptions will be raised.78 """79 self.parsed_options = parsed_options80 """Reference to the parsed command line options (used to pre-populate fields in Web UI). May be None when using Locust as a library"""81 self._remove_user_classes_with_weight_zero()82 # Validate there's no class with the same name but in different modules83 if len({user_class.__name__ for user_class in self.user_classes}) != len(self.user_classes):84 raise ValueError(85 "The following user classes have the same class name: {}".format(86 ", ".join(map(methodcaller("fullname"), self.user_classes))87 )88 )89 if self.shape_class is not None and not isinstance(self.shape_class, LoadTestShape):90 raise ValueError(91 "shape_class should be instance of LoadTestShape or subclass LoadTestShape, but got: %s"92 % self.shape_class93 )94 def _create_runner(95 self,96 runner_class: Type[RunnerType],97 *args,98 **kwargs,99 ) -> RunnerType:100 if self.runner is not None:101 raise RunnerAlreadyExistsError(f"Environment.runner already exists ({self.runner})")102 self.runner = runner_class(self, *args, **kwargs)103 # Attach the runner to the shape class so that the shape class can access user count state104 if self.shape_class:105 self.shape_class.runner = self.runner106 return self.runner107 def create_local_runner(self) -> LocalRunner:108 """109 Create a :class:`LocalRunner <locust.runners.LocalRunner>` instance for this Environment110 """111 return self._create_runner(LocalRunner)112 def create_master_runner(self, master_bind_host="*", master_bind_port=5557) -> MasterRunner:113 """114 Create a :class:`MasterRunner <locust.runners.MasterRunner>` instance for this Environment115 :param master_bind_host: Interface/host that the master should use for incoming worker connections.116 Defaults to "*" which means all interfaces.117 :param master_bind_port: Port that the master should listen for incoming worker connections on118 """119 return self._create_runner(120 MasterRunner,121 
master_bind_host=master_bind_host,122 master_bind_port=master_bind_port,123 )124 def create_worker_runner(self, master_host: str, master_port: int) -> WorkerRunner:125 """126 Create a :class:`WorkerRunner <locust.runners.WorkerRunner>` instance for this Environment127 :param master_host: Host/IP of a running master node128 :param master_port: Port on master node to connect to129 """130 # Create a new RequestStats with use_response_times_cache set to False to save some memory131 # and CPU cycles, since the response_times_cache is not needed for Worker nodes132 self.stats = RequestStats(use_response_times_cache=False)133 return self._create_runner(134 WorkerRunner,135 master_host=master_host,136 master_port=master_port,137 )138 def create_web_ui(139 self,140 host="",141 port=8089,142 auth_credentials: Optional[str] = None,143 tls_cert: Optional[str] = None,144 tls_key: Optional[str] = None,145 stats_csv_writer: Optional[StatsCSV] = None,146 delayed_start=False,147 ) -> WebUI:148 """149 Creates a :class:`WebUI <locust.web.WebUI>` instance for this Environment and start running the web server150 :param host: Host/interface that the web server should accept connections to. Defaults to ""151 which means all interfaces152 :param port: Port that the web server should listen to153 :param auth_credentials: If provided (in format "username:password") basic auth will be enabled154 :param tls_cert: An optional path (str) to a TLS cert. If this is provided the web UI will be155 served over HTTPS156 :param tls_key: An optional path (str) to a TLS private key. If this is provided the web UI will be157 served over HTTPS158 :param stats_csv_writer: `StatsCSV <stats_csv.StatsCSV>` instance.159 :param delayed_start: Whether or not to delay starting web UI until `start()` is called. 
Delaying web UI start160 allows for adding Flask routes or Blueprints before accepting requests, avoiding errors.161 """162 self.web_ui = WebUI(163 self,164 host,165 port,166 auth_credentials=auth_credentials,167 tls_cert=tls_cert,168 tls_key=tls_key,169 stats_csv_writer=stats_csv_writer,170 delayed_start=delayed_start,171 )172 return self.web_ui173 def _filter_tasks_by_tags(self) -> None:174 """175 Filter the tasks on all the user_classes recursively, according to the tags and176 exclude_tags attributes177 """178 if getattr(self, "_tasks_filtered", False):179 return # only filter once180 self._tasks_filtered = True181 if self.tags is not None:182 tags = set(self.tags)183 elif self.parsed_options and self.parsed_options.tags:184 tags = set(self.parsed_options.tags)185 else:186 tags = None187 if self.exclude_tags is not None:188 exclude_tags = set(self.exclude_tags)189 elif self.parsed_options and self.parsed_options.exclude_tags:190 exclude_tags = set(self.parsed_options.exclude_tags)191 else:192 exclude_tags = None193 for user_class in self.user_classes:194 filter_tasks_by_tags(user_class, tags, exclude_tags)195 def _remove_user_classes_with_weight_zero(self) -> None:196 """197 Remove user classes having a weight of zero.198 """199 if len(self.user_classes) == 0:200 # Preserve previous behaviour that allowed no user classes to be specified.201 return202 filtered_user_classes = [203 user_class for user_class in self.user_classes if user_class.weight > 0 or user_class.fixed_count > 0204 ]205 if len(filtered_user_classes) == 0:206 # TODO: Better exception than `ValueError`?207 raise ValueError("There are no users with weight > 0.")208 self.user_classes[:] = filtered_user_classes209 def assign_equal_weights(self) -> None:...

Full Screen

Full Screen

queue.py

Source:queue.py Github

copy

Full Screen

...119 services_counselor_class,120 tio_class,121 too_class,122 ]123 environment._remove_user_classes_with_weight_zero()124 # if the user classes change between runs (because user classes125 # with 0 weights have been removed), we have to do some manual /126 # hacky cleanups127 setattr(environment.runner, "target_user_classes_count", {})128 setattr(environment.runner, "target_user_count", 0)129 setattr(environment.runner, "_users_dispatcher", None)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, such as Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run locust automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful