Best Python code snippet using yandex-tank
load_plan.py
Source:load_plan.py  
# ... (the listing starts part-way through the preceding class, whose header
# and first methods are not shown; the visible tail is kept verbatim below)
#
#         return self.duration
#
#     def __len__(self):
#         '''Return total ammo count'''
#         return int(self.duration / 1000 * self.rps)
#
#     def get_rps_list(self):
#         return [(int(self.rps), self.duration / 1000)]
#
#     def __repr__(self):
#         return 'const(%s, %s)' % (self.rps, self.duration / 1000)


class Line(object):
    '''Load plan with linear load'''

    def __init__(self, minrps, maxrps, duration):
        """
        :param minrps: rps at the start of the step
        :param maxrps: rps at the end of the step
        :param duration: milliseconds
        """
        self.minrps = float(minrps)
        self.maxrps = float(maxrps)
        # Kept internally in seconds; get_duration() converts back to ms.
        self.duration = duration / 1000.0
        # rps gained per second (raises ZeroDivisionError for duration == 0).
        self.slope = (self.maxrps - self.minrps) / self.duration

    def ts(self, n):
        """
        :param n: number of charge
        :return: when to shoot nth charge, milliseconds
        """
        try:
            # Coefficients of slope/2 * t^2 + minrps * t - n = 0.
            # NOTE(review): solve_quadratic is defined elsewhere; presumably
            # it returns both roots and raises ZeroDivisionError when the
            # leading coefficient is zero (flat line) -- confirm.
            _, root2 = solve_quadratic(self.slope / 2.0, self.minrps, -n)
        except ZeroDivisionError:
            # Constant rate: the nth charge is shot at n / rps seconds.
            root2 = float(n) / self.minrps
        return int(root2 * 1000)

    def __iter__(self):
        """
        :return: timestamps for each charge
        """
        return (self.ts(n) for n in range(len(self)))

    def rps_at(self, t):
        '''Return rps for second t (0 outside of the step)'''
        if 0 <= t <= self.duration:
            return self.minrps + (
                float(self.maxrps - self.minrps) * t / self.duration)
        return 0

    def get_duration(self):
        '''Return load duration in milliseconds'''
        return int(self.duration * 1000)

    def __len__(self):
        '''Return total ammo count'''
        # Mean rps multiplied by the duration in seconds, truncated.
        return int((self.maxrps + self.minrps) / 2.0 * self.duration)

    def get_float_rps_list(self):
        '''
        get list of constant load parts (we have no constant load at all, but tank will think so),
        with parts durations (the duration is split evenly between the
        integer rps values and truncated to whole seconds)
        '''
        int_rps = range(int(self.minrps), int(self.maxrps) + 1)
        step_duration = float(self.duration) / len(int_rps)
        return [(rps, int(step_duration)) for rps in int_rps]

    def get_rps_list(self):
        """
        get list of each second's rps
        :returns: list of tuples (rps, duration of corresponding rps in seconds)
        :rtype: list
        """
        seconds = range(0, int(self.duration) + 1)
        # Consecutive seconds with the same rounded rps collapse into one
        # (rps, number_of_seconds) entry.
        rounded = (proper_round(self.rps_at(t)) for t in seconds)
        return [(rps, len(list(group))) for rps, group in groupby(rounded)]


class Composite(object):
    '''Load plan with multiple steps'''

    def __init__(self, steps):
        self.steps = steps

    def __iter__(self):
        '''Yield the timestamps of all steps, each step shifted by the
        total duration of the steps before it'''
        base = 0
        for step in self.steps:
            for ts in step:
                yield ts + base
            base += step.get_duration()

    def get_duration(self):
        '''Return total duration'''
        return sum(step.get_duration() for step in self.steps)

    def __len__(self):
        '''Return total ammo count'''
        return sum(len(step) for step in self.steps)

    def get_rps_list(self):
        '''Return the concatenated rps lists of all steps'''
        return list(
            chain.from_iterable(step.get_rps_list() for step in self.steps))


class Stairway(Composite):
    '''Load plan made of constant-load steps going from minrps to maxrps'''

    def __init__(self, minrps, maxrps, increment, step_duration):
        """
        :param minrps: rps of the first step
        :param maxrps: rps of the last step
        :param increment: rps difference between neighbouring steps
        :param step_duration: duration passed to every Const step
        """
        if maxrps < minrps:
            # Descending stairway: walk downwards.
            increment = -increment
        n_steps = int((maxrps - minrps) / increment)
        steps = [
            Const(minrps + i * increment, step_duration)
            for i in range(0, n_steps + 1)
        ]
        # If the increment does not divide the range evenly, finish with
        # one extra step exactly at maxrps.
        last_rps = minrps + n_steps * increment
        if (increment > 0 and last_rps < maxrps) or (
                increment < 0 and last_rps > maxrps):
            steps.append(Const(maxrps, step_duration))
        super(Stairway, self).__init__(steps)


class StepFactory(object):
    '''Builds load plan steps from schedule strings such as "line(1, 10, 10m)"'''

    # Raw strings keep "\s" from being treated as an (invalid) string escape.
    # The duration is captured as a whole: a repeated capturing group would
    # keep only its last repetition ("1h30m" -> "30m").
    _NUMBER = r'([0-9.]+)'
    _DURATION = r'((?:[0-9.]+[dhms]?)+)'
    _SEP = r',\s*'
    _LINE_PATTERN = _NUMBER + _SEP + _NUMBER + _SEP + _DURATION + r'\)'
    _CONST_PATTERN = _NUMBER + _SEP + _DURATION + r'\)'
    _STAIRWAY_PATTERN = (
        _NUMBER + _SEP + _NUMBER + _SEP + _NUMBER + _SEP + _DURATION + r'\)')

    @staticmethod
    def _parse(pattern, params, load_type):
        '''Return the groups captured by pattern in params.

        :raises ValueError: if params do not match the pattern
        '''
        match = re.search(pattern, params)
        if match is None:
            raise ValueError(
                'Malformed parameters for "%s" load type: "%s"' %
                (load_type, params))
        return match.groups()

    @staticmethod
    def line(params):
        '''Build a Line from "minrps, maxrps, duration)"'''
        minrps, maxrps, duration = StepFactory._parse(
            StepFactory._LINE_PATTERN, params, 'line')
        return Line(float(minrps), float(maxrps), parse_duration(duration))

    @staticmethod
    def const(params):
        '''Build a Const from "rps, duration)"'''
        rps, duration = StepFactory._parse(
            StepFactory._CONST_PATTERN, params, 'const')
        return Const(float(rps), parse_duration(duration))

    @staticmethod
    def stairway(params):
        '''Build a Stairway from "minrps, maxrps, increment, duration)"'''
        minrps, maxrps, increment, duration = StepFactory._parse(
            StepFactory._STAIRWAY_PATTERN, params, 'step')
        return Stairway(
            float(minrps),
            float(maxrps), float(increment), parse_duration(duration))

    @staticmethod
    def produce(step_config):
        '''Build the load plan step described by step_config.

        :param step_config: string of the form "<load type>(<params>)"
        :raises NotImplementedError: for an unknown load type
        '''
        _plans = {
            'line': StepFactory.line,
            'const': StepFactory.const,
            'step': StepFactory.stairway,
        }
        # params keeps the closing parenthesis; the patterns expect it.
        load_type, params = step_config.split('(')
        load_type = load_type.strip()
        if load_type in _plans:
            return _plans[load_type](params)
        else:
            raise NotImplementedError(
                'No such load type implemented: "%s"' % load_type)


def create(rps_schedule):
    """
    Create Load Plan as defined in schedule. Publish info about its duration.
    """
    if len(rps_schedule) > 1:
        lp = Composite(
            [StepFactory.produce(step_config) for step_config in rps_schedule])
    else:
        lp = StepFactory.produce(rps_schedule[0])
    # get_duration() is in milliseconds; the published value is in seconds.
    info.status.publish('duration', lp.get_duration() / 1000)
    info.status.publish('steps', lp.get_rps_list())
    info.status.lp_len = len(lp)
    # ... (the listing is truncated here)

# monitoring.py
Source:monitoring.py  
...8    await ws.prepare(req)9    global last_tick10    async for msg in ws:11        if msg.type == web.WSMsgType.text:12            rps_list = get_rps_list()13            await ws.send_json(data=rps_list)14            while not ws.closed:15                rps_list = get_rps_list()16                requests_num = 0 if not len(rps_list) else rps_list[len(rps_list) - 1]17                seconds_passed = time() - last_tick18                if seconds_passed < 1:19                    await asyncio.sleep(1 - seconds_passed)20                last_tick = time()21                print('REQ', rps_list)22                try:23                    await ws.send_str(str(requests_num))24                except ConnectionResetError:25                    break26        elif msg.type == web.WSMsgType.close:27            break...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
