# How to use num_queued method in autotest

Best Python code snippets using autotest_python

mm1_queue.py

Source:mm1_queue.py

"""Implements a discrete-time M/M/1 Queue environment.

Converted from continuous time and simplified to a fixed set of states as per:
https://www.aaai.org/Papers/AAAI/1996/AAAI96-130.pdf

State Representation
---------------------
In the paper, state has 2 components: (jobs_in_queue: int, job_available: bool)
We have packed these into a single integer:
  state = jobs_in_queue*2 + job_available.

Invalid Actions
---------------------
In the paper, the algorithm does not have the option to admit a job when none
is available. Since our MDP transitions and rewards are stored as matrices, we
modified the reward/transition function for the ADMIT action when no job has
arrived. The transitions are applied as if CONTINUE was chosen, but an extra
penalty is added to the reward. Essentially, it's always worse to ADMIT than
to CONTINUE when no jobs have arrived.

Finite Implementation
---------------------
In the paper, the algorithm can (in theory) store an unbounded number of jobs.
Our implementation has a limit on the number of jobs that can be stored so that
we can represent transition and reward dynamics with a fixed-size matrix.
This limit should be set such that no reasonable algorithm would admit a new
job when the limit of jobs has been reached.
If the algorithm chooses to ADMIT a new job when the maximum is reached, there
is no reward for admitting the job. Further, there is no probability that an
existing job will be completed on the transition. Essentially, ADMIT at
capacity gives the same reward as CONTINUE, but the expected next state is
worse than CONTINUE.
"""
import functools
import itertools
from typing import Callable

import numpy as np

from differential_value_iteration.environments import structure

# Holding-cost function: number of jobs waiting -> cost (positive number).
COST_FN = Callable[[int], float]

# Action indices into the transition/reward matrices.
CONTINUE = 0
ADMIT = 1


def linear_cost_fn(cost_constant: float, jobs_waiting: int) -> float:
  """Returns a holding cost that grows linearly with the number of jobs.

  Args:
    cost_constant: Per-job cost multiplier. Must be positive.
    jobs_waiting: Number of jobs currently held. Must be non-negative.

  Returns:
    jobs_waiting * cost_constant.

  Raises:
    ValueError: If cost_constant is not positive or jobs_waiting is negative.
  """
  if cost_constant <= 0.:
    raise ValueError(f'cost_constant must be positive, got: {cost_constant}')
  if jobs_waiting < 0:
    raise ValueError(f'jobs_waiting must be non-negative, got: {jobs_waiting}')
  return jobs_waiting * cost_constant


def global_state_to_paper_state(global_state: int):
  """Unpacks a packed state int into (jobs_in_queue, new_job) as in the paper."""
  jobs_in_queue = global_state // 2
  new_job = global_state % 2 == 1
  return jobs_in_queue, new_job


def to_global_state(jobs_in_queue: int, new_job: bool):
  """Packs (jobs_in_queue, new_job) into a single state integer."""
  return jobs_in_queue * 2 + new_job


def create(arrival_rate: float, service_rate: float, admit_reward: float,
           cost_fn: COST_FN, max_stored_jobs: int,
           dtype: np.dtype) -> structure.MarkovDecisionProcess:
  """Creates a new MM1 Queue MDP.

  Args:
    arrival_rate: Rate of new jobs arriving. Should be positive.
    service_rate: Rate that accepted jobs are processed. Should be positive.
    admit_reward: Reward for accepting a job for processing.
    cost_fn: Function that returns cost (expressed as a positive number) for
      holding some number of jobs.
    max_stored_jobs: Limit on number of stored jobs so that transitions and
      rewards can be stored as fixed-size matrices.
    dtype: NumPy dtype of MDP, should be a float type, probably np.float64.

  Returns: An MDP.
  """
  # With two competing event sources, the next event is an arrival with
  # probability arrival_rate / (arrival_rate + service_rate), else a
  # completion. The two probabilities sum to 1.
  arrive_prob = arrival_rate / (arrival_rate + service_rate)
  complete_prob = service_rate / (arrival_rate + service_rate)
  joint_rate = service_rate + arrival_rate
  # Each queue length contributes 2 states: job available / not available.
  num_states = max_stored_jobs * 2
  num_actions = 2
  transitions = np.zeros((num_actions, num_states, num_states), dtype=dtype)
  rewards = np.zeros((num_actions, num_states), dtype=dtype)
  transition_possibilities = itertools.product(
      range(max_stored_jobs),
      [False, True],
      [ADMIT, CONTINUE])
  for num_queued, new_job, action in transition_possibilities:
    s = to_global_state(jobs_in_queue=num_queued, new_job=new_job)
    # Base Case.
    if num_queued == 0:
      if not new_job:
        # Empty queue, no job to admit: the complete_prob mass keeps the state
        # unchanged since there is no job to complete at queue length 0.
        no_new_job_next = s
        new_job_next = to_global_state(jobs_in_queue=num_queued,
                                       new_job=True)
        transitions[action, s, no_new_job_next] = complete_prob
        transitions[action, s, new_job_next] = arrive_prob
        if action == CONTINUE:
          rewards[action, s] = 0.
        elif action == ADMIT:
          # Fixed penalty for the invalid ADMIT when no job has arrived.
          rewards[action, s] = -1.
      elif new_job:
        if action == CONTINUE:
          # Decline the job; the queue stays empty.
          no_new_job_next = to_global_state(jobs_in_queue=num_queued,
                                            new_job=False)
          new_job_next = s
          transitions[action, s, no_new_job_next] = complete_prob
          transitions[action, s, new_job_next] = arrive_prob
          rewards[action, s] = 0.
        elif action == ADMIT:
          # Admit the job; it may complete (back to empty queue) or another
          # job may arrive while it is queued.
          no_new_job_next = to_global_state(jobs_in_queue=num_queued,
                                            new_job=False)
          new_job_next = to_global_state(jobs_in_queue=num_queued + 1,
                                         new_job=True)
          transitions[action, s, no_new_job_next] = complete_prob
          transitions[action, s, new_job_next] = arrive_prob
          rewards[action, s] = (admit_reward - cost_fn(
              jobs_waiting=num_queued + 1)) * joint_rate
    # General Case.
    elif num_queued > 0 and (num_queued < max_stored_jobs - 1):
      if not new_job:
        no_new_job_next = to_global_state(jobs_in_queue=num_queued - 1,
                                          new_job=False)
        new_job_next = to_global_state(jobs_in_queue=num_queued,
                                       new_job=True)
        transitions[action, s, no_new_job_next] = complete_prob
        transitions[action, s, new_job_next] = arrive_prob
        if action == CONTINUE:
          rewards[action, s] = (-cost_fn(jobs_waiting=num_queued)) * joint_rate
        elif action == ADMIT:
          # Small penalty for invalid action.
          rewards[action, s] = (-cost_fn(
              jobs_waiting=num_queued + 1)) * joint_rate
      elif new_job:
        if action == CONTINUE:
          no_new_job_next = to_global_state(jobs_in_queue=num_queued - 1,
                                            new_job=False)
          new_job_next = to_global_state(jobs_in_queue=num_queued,
                                         new_job=True)
          transitions[action, s, no_new_job_next] = complete_prob
          transitions[action, s, new_job_next] = arrive_prob
          rewards[action, s] = (-cost_fn(jobs_waiting=num_queued)) * joint_rate
        elif action == ADMIT:
          no_new_job_next = to_global_state(jobs_in_queue=num_queued,
                                            new_job=False)
          new_job_next = to_global_state(jobs_in_queue=num_queued + 1,
                                         new_job=True)
          transitions[action, s, no_new_job_next] = complete_prob
          transitions[action, s, new_job_next] = arrive_prob
          rewards[action, s] = (admit_reward - cost_fn(
              jobs_waiting=num_queued + 1)) * joint_rate
    # In our finite model, we cannot add more jobs in this state.
    elif num_queued == max_stored_jobs - 1:
      # Same as general case.
      if not new_job:
        new_job_next = to_global_state(jobs_in_queue=num_queued,
                                       new_job=True)
        no_new_job_next = to_global_state(jobs_in_queue=num_queued - 1,
                                          new_job=False)
        transitions[action, s, new_job_next] = arrive_prob
        transitions[action, s, no_new_job_next] = complete_prob
        if action == CONTINUE:
          rewards[action, s] = (-cost_fn(jobs_waiting=num_queued)) * joint_rate
        elif action == ADMIT:
          # Small penalty for invalid action.
          rewards[action, s] = (-cost_fn(
              jobs_waiting=num_queued + 1)) * joint_rate
      elif new_job:
        # Same as general case.
        if action == CONTINUE:
          new_job_next = to_global_state(jobs_in_queue=num_queued,
                                         new_job=True)
          no_new_job_next = to_global_state(jobs_in_queue=num_queued - 1,
                                            new_job=False)
          transitions[action, s, new_job_next] = arrive_prob
          transitions[action, s, no_new_job_next] = complete_prob
          rewards[action, s] = -cost_fn(jobs_waiting=num_queued) * joint_rate
        elif action == ADMIT:
          # Stuck here, cannot add another job to the queue. Note: no
          # completion is possible on this transition (see module docstring).
          new_job_next = to_global_state(jobs_in_queue=num_queued,
                                         new_job=True)
          no_new_job_next = to_global_state(jobs_in_queue=num_queued,
                                            new_job=False)
          transitions[action, s, new_job_next] = arrive_prob
          transitions[action, s, no_new_job_next] = complete_prob
          # Same as passing b/c could not admit job.
          rewards[action, s] = -cost_fn(jobs_waiting=num_queued) * joint_rate
  # BUGFIX: the original accessed cost_fn.func.__name__, which raises
  # AttributeError for any cost_fn that is not a functools.partial, even
  # though COST_FN only promises a plain callable. Fall back to the callable
  # itself when there is no .func attribute.
  cost_fn_name = getattr(cost_fn, 'func', cost_fn).__name__
  name = (f'MM1 {arrival_rate}:{service_rate}:{admit_reward}:'
          f'{max_stored_jobs}:{dtype}:{cost_fn_name}')
  return structure.MarkovDecisionProcess(transitions=transitions,
                                         rewards=rewards,
                                         name=name)


MM1_QUEUE_1 = functools.partial(create,
                                arrival_rate=1.,
                                service_rate=1.,
                                admit_reward=10.,
                                cost_fn=functools.partial(linear_cost_fn,
                                                          cost_constant=1.),
                                max_stored_jobs=20)
MM1_QUEUE_2 = functools.partial(create,
                                arrival_rate=1.5,
                                service_rate=1.,
                                admit_reward=4.,
                                cost_fn=functools.partial(linear_cost_fn,
                                                          cost_constant=1.),
                                max_stored_jobs=20)
MM1_QUEUE_3 = functools.partial(create,
                                arrival_rate=1.,
                                service_rate=1.5,
                                admit_reward=4.,
                                cost_fn=functools.partial(linear_cost_fn,
                                                          cost_constant=1.),
                                # NOTE(review): source truncated at this
                                # point; value assumed to match
                                # MM1_QUEUE_1/2 -- confirm against original.
                                max_stored_jobs=20)

test_clients.py

Source:test_clients.py

from collections import defaultdict
import json

import pytest

from pyglidein_server import clients, resources
from pyglidein_server.condor import JobCounts
from pyglidein_server.util import Error


def test_clients_init():
    # Constructing an empty Clients container must not raise.
    clients.Clients()


def test_clients_update_single():
    # A single queue for one site round-trips through update()/get().
    queues = {
        'foo': {
            'resources': {},
            'num_processing': 10,
            'num_queued': 0,
        }
    }
    cl = clients.Clients()
    cl.update('foo', queues)
    data = cl.get('foo')
    assert len(data) == 1


def test_clients_update_multi():
    # Two queues ('bar', 'baz') registered under one site name ('foo') both
    # appear in that site's data.
    queues = {
        'bar': {
            'resources': {'memory': 2},
            'num_processing': 10,
            'num_queued': 11,
        },
        'baz': {
            'resources': {'memory': 4},
            'num_processing': 12,
            'num_queued': 13,
        }
    }
    cl = clients.Clients()
    cl.update('foo', queues)
    data = cl.get('foo')
    assert len(data) == 2


def test_clients_bad_resource():
    # An unknown resource key ('foo') must be rejected with an Error whose
    # reason mentions 'resources'.
    queues = {
        'foo': {
            'resources': {'foo': 1},
            'num_processing': 10,
            'num_queued': 0,
        }
    }
    cl = clients.Clients()
    with pytest.raises(Error) as exc_info:
        cl.update('foo', queues)
    assert 'resources' in exc_info.value.reason


def test_get_json():
    # get_json() output must be JSON-serializable (Resources keys are
    # replaced by hashable stand-ins).
    queues = {
        'foo': {
            'resources': {},
            'num_processing': 10,
            'num_queued': 0,
        }
    }
    cl = clients.Clients()
    cl.update('foo', queues)
    data = cl.get_json()
    assert len(data) == 1
    values = list(data.values())
    assert len(values) == 1
    json.dumps(data)


####### Matching tests #######

# Each tuple: (glidein queue state per site, condor job counts, site under
# test, expected {queue ref: number of glideins to submit}).
testdata = [
    # single site, single resource
    ({'site': {'q1': {'resources': {}, 'num_processing': 0, 'num_queued': 0}}},
     [{'resources': {}, 'processing': 0, 'queued': 1}],
     'site',
     {'q1': 1}),
    ({'site': {'q1': {'resources': {}, 'num_processing': 5, 'num_queued': 0}}},
     [{'resources': {}, 'processing': 5, 'queued': 10}],
     'site',
     {'q1': 8}),
    ({'site': {'q1': {'resources': {}, 'num_processing': 5, 'num_queued': 1}}},
     [{'resources': {}, 'processing': 5, 'queued': 10}],
     'site',
     {'q1': 4}),
    ({'site': {'q1': {'resources': {}, 'num_processing': 5, 'num_queued': 2}}},
     [{'resources': {}, 'processing': 5, 'queued': 10}],
     'site',
     {'q1': 2}),
    ({'site': {'q1': {'resources': {}, 'num_processing': 5, 'num_queued': 3}}},
     [{'resources': {}, 'processing': 5, 'queued': 10}],
     'site',
     {}),
    ({'site': {'q1': {'resources': {}, 'num_processing': 50, 'num_queued': 20}}},
     [{'resources': {}, 'processing': 50, 'queued': 100}],
     'site',
     {'q1': 12}),
    # single site, jobs of different sizes
    ({'site': {'q1': {'resources': {'memory':2}, 'num_processing': 50, 'num_queued': 20}}},
     [{'resources': {}, 'processing': 25, 'queued': 50},
      {'resources': {'memory':2}, 'processing': 25, 'queued': 50},
      ],
     'site',
     {'q1': 2}),
    ({'site': {'q1': {'resources': {'memory':2}, 'num_processing': 50, 'num_queued': 10}}},
     [{'resources': {}, 'processing': 45, 'queued': 90},
      {'resources': {'memory':2}, 'processing': 5, 'queued': 10},
      ],
     'site',
     {'q1': 14}),
    # multi-site
    ({'site': {'q1': {'resources': {'memory':2}, 'num_processing': 20, 'num_queued': 10}},
      'site2': {'q2': {'resources': {}, 'num_processing': 30, 'num_queued': 20}},
      },
     [{'resources': {}, 'processing': 45, 'queued': 90},
      {'resources': {'memory':2}, 'processing': 5, 'queued': 10},
      ],
     'site',
     {'q1': 1}),
    ({'site': {'q1': {'resources': {'memory':2}, 'num_processing': 20, 'num_queued': 10}},
      'site2': {'q2': {'resources': {}, 'num_processing': 30, 'num_queued': 20}},
      },
     [{'resources': {}, 'processing': 45, 'queued': 90},
      {'resources': {'memory':2}, 'processing': 5, 'queued': 10},
      ],
     'site2',
     {}),
    ({'site': {'q1': {'resources': {'memory':2}, 'num_processing': 20, 'num_queued': 10}},
      'site2': {'q2': {'resources': {}, 'num_processing': 30, 'num_queued': 20}},
      },
     [{'resources': {}, 'processing': 45, 'queued': 500},
      {'resources': {'memory':2}, 'processing': 5, 'queued': 10},
      ],
     'site',
     {'q1': 45}),
    ({'site': {'q1': {'resources': {'memory':2}, 'num_processing': 20, 'num_queued': 10}},
      'site2': {'q2': {'resources': {}, 'num_processing': 30, 'num_queued': 20}},
      },
     [{'resources': {}, 'processing': 45, 'queued': 500},
      {'resources': {'memory':2}, 'processing': 5, 'queued': 10},
      ],
     'site2',
     {'q2': 73}),
    # larger resources
    ({'site': {'q1': {'resources': {'memory':1}, 'num_processing': 20, 'num_queued': 10}}},
     [{'resources': {}, 'processing': 50, 'queued': 10},
      {'resources': {'memory':2}, 'processing': 0, 'queued': 1000},
      ],
     'site',
     {}),
]


class FakeCondor:
    # Minimal stand-in for the condor cache used by Clients.match():
    # aggregates the parametrized job-count dicts into a
    # {Resources: JobCounts} mapping, the same shape get() returns on the
    # real object.
    def __init__(self, data):
        self.data = defaultdict(JobCounts)
        for d in data:
            res = resources.Resources(d['resources'])
            self.data[res]['_sum']['processing'] += d['processing']
            self.data[res]['_sum']['queued'] += d['queued']

    def get(self):
        return self.data


@pytest.mark.parametrize('glideins,condor,name,expected', testdata)
def test_clients_match(glideins, condor, name, expected):
    cl = clients.Clients()
    for site in glideins:
        cl.update(site, glideins[site])
    ret = cl.match(name, FakeCondor(condor))
    # NOTE(review): source is truncated at this point -- presumably followed
    # by `assert ret == expected`; confirm against the original file.

clients.py

Source:clients.py

from copy import deepcopy
import math
import logging

from .resources import Resources
from .util import Error

logger = logging.getLogger(__name__)


class Clients:
    """
    Stores client (site) information.

    Each client may have N resource queues. Lookups are by `Resources`,
    specifically separating queued and processing resources.
    """
    def __init__(self):
        # Mapping: client name -> {Resources: {'ref', 'num_queued',
        # 'num_processing'}}.
        self.data = {}

    def update(self, name, queues):
        """
        Update a client.

        Valid queues are a dict of queue information, including
        `resources`, `num_queued`, and `num_processing` for each client queue.
        The keys are used as references, since the resources may be binned
        and alter slightly.

        Args:
            name (str): name of client
            queues (dict): queue information
        """
        if not isinstance(queues, dict):
            raise Error('client data must be a dict of queue statuses')
        ret = {}
        for ref in queues:
            queue = queues[ref]
            # validate: each queue must have exactly these three keys, and
            # may only name resources Resources knows about.
            if set(queue.keys()) != {'resources', 'num_queued', 'num_processing'}:
                raise Error('client data must have keys: resources, num_queued, num_processing')
            if set(queue['resources']) - set(Resources.RESOURCE_DEFAULTS):
                raise Error(f'client data resources must be: {set(Resources.RESOURCE_DEFAULTS)}')
            # set resources; the original ref is kept so match() can report
            # results by the caller's queue name.
            ret[Resources(queue['resources'], tolerance=1)] = {
                'ref': ref,
                'num_queued': queue['num_queued'],
                'num_processing': queue['num_processing'],
            }
        # do update: replaces any previous record for this client atomically.
        self.data[name] = ret

    def get(self, name):
        """Get client data"""
        return self.data[name]

    def get_all(self):
        """Get all client data"""
        return self.data

    def get_json(self):
        """Get client data in json format"""
        ret = {}
        for k in self.data:
            r = {}
            for res in self.data[k]:
                # Resources objects are not JSON keys; use their hash as a
                # stable stand-in and stash the raw resources alongside.
                key = hash(res)
                r[key] = deepcopy(self.data[k][res])
                r[key]['_resources'] = res.resources
            ret[k] = r
        return ret

    def match(self, name, condor_queue):
        """
        Perform matching for a client.

        This matches a client against the condor queue and other clients,
        to determine if it should submit more glideins on any of its queues.

        Args:
            name (str): name of client
            condor_queue (CondorCache): condor queue

        Returns:
            dict: name of queue and number of jobs to submit
        """
        condor_jobs = condor_queue.get()
        ret = {}
        for res in self.data[name]:
            queue = self.data[name][res]
            jobs_queued = 0.
            jobs_processing = 0.
            # Count condor jobs whose resource request fits this queue,
            # weighted by res.mismatch(r); exact weighting semantics live in
            # Resources -- confirm there.
            for r in condor_jobs:
                if r <= res:
                    mismatch = res.mismatch(r)
                    jobs_queued += mismatch * condor_jobs[r]['_sum']['queued']
                    jobs_processing += mismatch * condor_jobs[r]['_sum']['processing']
            # Fraction of matched jobs already running; 1.0 when nothing is
            # processing so an idle pool does not zero out the demand signal.
            if jobs_processing > 0:
                job_ratio = jobs_processing / (jobs_processing + jobs_queued)
            else:
                job_ratio = 1.
            logger.debug(f'jobs_queued: {jobs_queued}')
            logger.debug(f'jobs_processing: {jobs_processing}')
            logger.debug(f'job_ratio: {job_ratio}')
            glideins_queued = 0.
            glideins_processing = 0.
            # Same weighted tally, but over glideins across ALL sites that
            # could also serve these jobs.
            for site in self.data:
                for r in self.data[site]:
                    if r <= res:
                        mismatch = res.mismatch(r)
                        glideins_queued += mismatch * self.data[site][r]['num_queued']
                        glideins_processing += mismatch * self.data[site][r]['num_processing']
            if glideins_processing > 0:
                glidein_util = glideins_processing / (glideins_processing + glideins_queued)
            else:
                glidein_util = 1.
            logger.debug(f'glideins_queued: {glideins_queued}')
            logger.debug(f'glideins_processing: {glideins_processing}')
            logger.debug(f'glidein_util: {glidein_util}')
            # Demand = unmet jobs, damped by job_ratio^(1/4) and
            # glidein_util^2 so low utilization suppresses new submissions.
            global_queue = (jobs_queued - glideins_queued) * math.pow(job_ratio, 1/4) * math.pow(glidein_util, 2)
            logger.debug(f'global_queue: {global_queue}')
            # Subtract what this queue already has pending; never negative.
            local_queue = max(global_queue - queue['num_queued'], 0)
            logger.debug(f'local_queue: {local_queue}')
            if local_queue > 0:
                ret[queue['ref']] = math.ceil(local_queue)
        # NOTE(review): source is truncated at this point -- the method
        # presumably ends with `return ret`; confirm against the original.

## Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub — right from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g., Selenium, Cypress, and TestNG.