How to use swarm method in locust

Best Python code snippet using locust

multiswarm.py

Source:multiswarm.py Github

copy

Full Screen

...104 logbook = tools.Logbook()105 logbook.header = "gen", "nswarm", "evals", "error", "offline_error", "avg", "max"106 107 # Generate the initial population108 population = [toolbox.swarm(n=NPARTICLES) for _ in range(NSWARMS)]109 110 # Evaluate each particle111 for swarm in population:112 for part in swarm:113 part.fitness.values = toolbox.evaluate(part)114 # Update swarm's attractors personal best and global best115 if not part.best or part.fitness > part.bestfit:116 part.best = toolbox.clone(part[:]) # Get the position117 part.bestfit.values = part.fitness.values # Get the fitness118 if not swarm.best or part.fitness > swarm.bestfit:119 swarm.best = toolbox.clone(part[:]) # Get the position120 swarm.bestfit.values = part.fitness.values # Get the fitness121 record = stats.compile(itertools.chain(*population))122 logbook.record(gen=0, evals=mpb.nevals, nswarm=len(population),123 error=mpb.currentError(), offline_error=mpb.offlineError(), **record)124 if verbose:125 print(logbook.stream)126 127 generation = 1128 while mpb.nevals < 5e5:129 # Check for convergence130 rexcl = (BOUNDS[1] - BOUNDS[0]) / (2 * len(population)**(1.0/NDIM))131 132 not_converged = 0133 worst_swarm_idx = None134 worst_swarm = None135 for i, swarm in enumerate(population):136 # Compute the diameter of the swarm137 for p1, p2 in itertools.combinations(swarm, 2):138 d = math.sqrt(sum((x1 - x2)**2. 
for x1, x2 in zip(p1, p2)))139 if d > 2*rexcl:140 not_converged += 1141 # Search for the worst swarm according to its global best142 if not worst_swarm or swarm.bestfit < worst_swarm.bestfit:143 worst_swarm_idx = i144 worst_swarm = swarm145 break146 147 # If all swarms have converged, add a swarm148 if not_converged == 0:149 population.append(toolbox.swarm(n=NPARTICLES))150 # If too many swarms are roaming, remove the worst swarm151 elif not_converged > NEXCESS:152 population.pop(worst_swarm_idx)153 154 # Update and evaluate the swarm155 for swarm in population:156 # Check for change157 if swarm.best and toolbox.evaluate(swarm.best) != swarm.bestfit.values:158 # Convert particles to quantum particles159 swarm[:] = toolbox.convert(swarm, rcloud=RCLOUD, centre=swarm.best)160 swarm.best = None161 del swarm.bestfit.values162 163 for part in swarm:164 # Not necessary to update if it is a new swarm165 # or a swarm just converted to quantum166 if swarm.best and part.best:167 toolbox.update(part, swarm.best)168 part.fitness.values = toolbox.evaluate(part)169 170 # Update swarm's attractors personal best and global best171 if not part.best or part.fitness > part.bestfit:172 part.best = toolbox.clone(part[:])173 part.bestfit.values = part.fitness.values174 if not swarm.best or part.fitness > swarm.bestfit:175 swarm.best = toolbox.clone(part[:])176 swarm.bestfit.values = part.fitness.values177 178 record = stats.compile(itertools.chain(*population))179 logbook.record(gen=generation, evals=mpb.nevals, nswarm=len(population),180 error=mpb.currentError(), offline_error=mpb.offlineError(), **record)181 if verbose:182 print(logbook.stream)183 184 # Apply exclusion185 reinit_swarms = set()186 for s1, s2 in itertools.combinations(range(len(population)), 2):187 # Swarms must have a best and not already be set to reinitialize188 if population[s1].best and population[s2].best and not (s1 in reinit_swarms or s2 in reinit_swarms):189 dist = 0190 for x1, x2 in zip(population[s1].best, 
population[s2].best):191 dist += (x1 - x2)**2.192 dist = math.sqrt(dist)193 if dist < rexcl:194 if population[s1].bestfit <= population[s2].bestfit:195 reinit_swarms.add(s1)196 else:197 reinit_swarms.add(s2)198 199 # Reinitialize and evaluate swarms200 for s in reinit_swarms:201 population[s] = toolbox.swarm(n=NPARTICLES)202 for part in population[s]:203 part.fitness.values = toolbox.evaluate(part)204 205 # Update swarm's attractors personal best and global best206 if not part.best or part.fitness > part.bestfit:207 part.best = toolbox.clone(part[:])208 part.bestfit.values = part.fitness.values209 if not population[s].best or part.fitness > population[s].bestfit:210 population[s].best = toolbox.clone(part[:])211 population[s].bestfit.values = part.fitness.values212 generation += 1213if __name__ == "__main__":...

Full Screen

Full Screen

operators.py

Source:operators.py Github

copy

Full Screen

...18 You can use this method to update your personal best positions.19 .. code-block:: python20 import pyswarms.backend as P21 from pyswarms.backend.swarms import Swarm22 my_swarm = P.create_swarm(n_particles, dimensions)23 # Inside the for-loop...24 for i in range(iters):25 # It updates the swarm internally26 my_swarm.pbest_pos, my_swarm.pbest_cost = P.update_pbest(my_swarm)27 It updates your :code:`current_pbest` with the personal bests acquired by28 comparing the (1) cost of the current positions and the (2) personal29 bests your swarm has attained.30 If the cost of the current position is less than the cost of the personal31 best, then the current position replaces the previous personal best32 position.33 Parameters34 ----------35 swarm : pyswarms.backend.swarm.Swarm36 a Swarm instance37 Returns38 -------39 numpy.ndarray40 New personal best positions of shape :code:`(n_particles, n_dimensions)`41 numpy.ndarray42 New personal best costs of shape :code:`(n_particles,)`43 """44 try:45 # Infer dimensions from positions46 dimensions = swarm.dimensions47 # Create a 1-D and 2-D mask based from comparisons48 mask_cost = swarm.current_cost < swarm.pbest_cost49 mask_pos = np.repeat(mask_cost[:, np.newaxis], dimensions, axis=1)50 # Apply masks51 new_pbest_pos = np.where(~mask_pos, swarm.pbest_pos, swarm.position)52 new_pbest_cost = np.where(53 ~mask_cost, swarm.pbest_cost, swarm.current_cost54 )55 except AttributeError:56 rep.logger.exception(57 "Please pass a Swarm class. You passed {}".format(type(swarm))58 )59 raise60 else:61 return (new_pbest_pos, new_pbest_cost)62def compute_velocity(swarm, clamp, vh, bounds=None):63 """Update the velocity matrix64 This method updates the velocity matrix using the best and current65 positions of the swarm. The velocity matrix is computed using the66 cognitive and social terms of the swarm. The velocity is handled67 by a :code:`VelocityHandler`.68 A sample usage can be seen with the following:69 .. 
code-block :: python70 import pyswarms.backend as P71 from pyswarms.swarms.backend import Swarm, VelocityHandler72 my_swarm = P.create_swarm(n_particles, dimensions)73 my_vh = VelocityHandler(strategy="invert")74 for i in range(iters):75 # Inside the for-loop76 my_swarm.velocity = compute_velocity(my_swarm, clamp, my_vh, bounds)77 Parameters78 ----------79 swarm : pyswarms.backend.swarms.Swarm80 a Swarm instance81 clamp : tuple of floats, optional82 a tuple of size 2 where the first entry is the minimum velocity83 and the second entry is the maximum velocity. It84 sets the limits for velocity clamping.85 vh : pyswarms.backend.handlers.VelocityHandler86 a VelocityHandler object with a specified handling strategy.87 For further information see :mod:`pyswarms.backend.handlers`.88 bounds : tuple of numpy.ndarray or list, optional89 a tuple of size 2 where the first entry is the minimum bound while90 the second entry is the maximum bound. Each array must be of shape91 :code:`(dimensions,)`.92 Returns93 -------94 numpy.ndarray95 Updated velocity matrix96 """97 try:98 # Prepare parameters99 swarm_size = swarm.position.shape100 c1 = swarm.options["c1"]101 c2 = swarm.options["c2"]102 w = swarm.options["w"]103 # Compute for cognitive and social terms104 cognitive = (105 c1106 * np.random.uniform(0, 1, swarm_size)107 * (swarm.pbest_pos - swarm.position)108 )109 social = (110 c2111 * np.random.uniform(0, 1, swarm_size)112 * (swarm.best_pos - swarm.position)113 )114 # Compute temp velocity (subject to clamping if possible)115 temp_velocity = (w * swarm.velocity) + cognitive + social116 updated_velocity = vh(117 temp_velocity, clamp, position=swarm.position, bounds=bounds118 )119 except AttributeError:120 rep.logger.exception(121 "Please pass a Swarm class. 
You passed {}".format(type(swarm))122 )123 raise124 except KeyError:125 rep.logger.exception("Missing keyword in swarm.options")126 raise127 else:128 return updated_velocity129def compute_position(swarm, bounds, bh):130 """Update the position matrix131 This method updates the position matrix given the current position and the132 velocity. If bounded, the positions are handled by a133 :code:`BoundaryHandler` instance134 .. code-block :: python135 import pyswarms.backend as P136 from pyswarms.swarms.backend import Swarm, VelocityHandler137 my_swarm = P.create_swarm(n_particles, dimensions)138 my_bh = BoundaryHandler(strategy="intermediate")139 for i in range(iters):140 # Inside the for-loop141 my_swarm.position = compute_position(my_swarm, bounds, my_bh)142 Parameters143 ----------144 swarm : pyswarms.backend.swarms.Swarm145 a Swarm instance146 bounds : tuple of numpy.ndarray or list, optional147 a tuple of size 2 where the first entry is the minimum bound while148 the second entry is the maximum bound. Each array must be of shape149 :code:`(dimensions,)`.150 bh : pyswarms.backend.handlers.BoundaryHandler151 a BoundaryHandler object with a specified handling strategy...

Full Screen

Full Screen

task1.py

Source:task1.py Github

copy

Full Screen

"""1-D swarm alignment simulation on a ring of circumference 1.

Each agent is a two-element list ``[position, direction_flag]``.  Agents
move a fixed step per time step, look at the agents within a perception
radius (measured around the ring), and flip their direction flag when
more than half of those neighbours carry the opposite flag, or, rarely,
spontaneously.
"""
import numpy as np
import random


def createSwarm(n):
    """Create ``n`` agents with random positions and direction flags.

    Parameters
    ----------
    n : int
        Number of agents.

    Returns
    -------
    list of [float, int]
        One ``[position, flag]`` pair per agent; the position is drawn
        uniformly from [0, 1) and the flag is a random bit (0 or 1).
    """
    positions = np.random.uniform(0, 1, n)
    return [[pos, random.getrandbits(1)] for pos in positions]


def sim(n, timesteps, r, step=0.001, verbose=True):
    """Run the swarm simulation and record the flag-1 count over time.

    Parameters
    ----------
    n : int
        Number of agents.
    timesteps : int
        Number of simulation steps.
    r : float
        Perception radius used to build each agent's neighbourhood.
    step : float, optional
        Distance an agent travels per time step.
    verbose : bool, optional
        When true (the default), print the flag-1 count of every step.

    Returns
    -------
    list of int
        For every time step, the number of agents whose direction flag
        was 1 at the start of that step.
    """
    swarm = createSwarm(n)
    swarmRange = range(n)

    lefties = []
    for _ in range(timesteps):
        # NOTE(review): agents with flag 1 are counted as "left goers"
        # here, yet they are the ones whose position increases below.
        # Only the label is ambiguous; the dynamics are symmetric.
        leftGoers = sum(1 for j in swarmRange if swarm[j][1])
        lefties.append(leftGoers)

        neighbourhood = getNeighbourhood(swarm, swarmRange, r)
        for j in getSwitchers(neighbourhood, swarm, swarmRange):
            swarm[j][1] = 1 - swarm[j][1]

        if verbose:
            print(leftGoers)

        for j in swarmRange:
            agent = swarm[j]
            delta = step if agent[1] else -step
            # Wrap around the ring.  The previous code attempted this with
            # `==` comparisons (no-ops), so agents left the [0, 1] interval.
            agent[0] = (agent[0] + delta) % 1.0
    return lefties


def getSwitchers(neighbourhood, swarm, swarmRange):
    """Return the indices of the agents that flip direction this step.

    An agent flips spontaneously when a uniform draw from [0, 100) is at
    most 0.15 (i.e. a 0.15 % chance), or otherwise when strictly more than
    half of its neighbours carry the opposite direction flag.

    Parameters
    ----------
    neighbourhood : list of list of int
        ``neighbourhood[i]`` holds the indices of agent ``i``'s neighbours.
    swarm : list of [float, int]
        The agents as ``[position, flag]`` pairs.
    swarmRange : iterable of int
        Indices of the agents to consider.

    Returns
    -------
    list of int
        Indices of the agents that switch direction.
    """
    switchers = []
    for i in swarmRange:
        p = np.random.uniform(0, 100)
        if p <= 0.15:
            switchers.append(i)
            continue
        opposing = sum(
            1 for k in neighbourhood[i] if swarm[k][1] != swarm[i][1]
        )
        if opposing > len(neighbourhood[i]) / 2:
            switchers.append(i)
    return switchers


def getNeighbourhood(swarm, swarmRange, r=0.045):
    """Return, for every agent, the indices of the agents within ``r``.

    Distance is measured around the ring, so an agent near 0 and an agent
    near 1 can be neighbours.  The same formula is used for every agent,
    which keeps the relation symmetric; the previous edge-case branch for
    agents near 0 computed ``1 - xi - xj`` and reported wrong neighbours.

    Parameters
    ----------
    swarm : list of [float, int]
        The agents as ``[position, flag]`` pairs, positions in [0, 1].
    swarmRange : iterable of int
        Indices of the agents to consider.
    r : float, optional
        Perception radius; neighbours lie strictly closer than ``r``.

    Returns
    -------
    list of list of int
        ``result[k]`` lists the neighbours of the k-th index yielded by
        ``swarmRange``.
    """
    indices = list(swarmRange)
    neighbourhood = []
    for i in indices:
        xi = swarm[i][0]
        near = []
        for j in indices:
            if j == i:
                continue
            direct = abs(xi - swarm[j][0])
            # Shortest way round the ring: directly, or across the 0/1 seam.
            if min(direct, 1 - direct) < r:
                near.append(j)
        neighbourhood.append(near)
    return neighbourhood


n = 20
timesteps = 500
r = 0.045


def main():
    """Run the simulation with the module-level settings and plot it."""
    # Imported here so that the simulation functions above can be imported
    # and tested on machines without a plotting backend.
    from matplotlib import pyplot as plt

    plt.xlim(0, timesteps)
    plt.ylim(0, n)
    plt.plot(list(range(timesteps)), sim(n, timesteps, r))
    plt.show()


if __name__ == "__main__":
    main()

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run locust automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful