Best Python code snippet using yandex-tank
cumulative_criterions.py
Source:cumulative_criterions.py  
...86        return total_responses87    def get_rc(self):88        return 2589    def explain(self):90        items = self.get_criterion_parameters()91        explanation = "%(ratio).2f%% responses times higher than %(limit)sms for %(seconds_count)ss " \92            "since: %(since_time)s" % items93        if self.tag:94            explanation = explanation + " for tag %(tag)s" % items95        return explanation96    def get_criterion_parameters(self):97        parameters = {98            'ratio': self.total_fail_ratio * 100,99            'limit': self.rt_limit / 1000,100            'seconds_count': self.window_size,101            'since_time': self.cause_second[0]["ts"],102            'tag': self.tag103        }104        return parameters105    def widget_explain(self):106        items = self.get_criterion_parameters()107        return "%(ratio).2f%% times >%(limit)sms for %(seconds_count)ss" % items, self.total_fail_ratio108class TotalHTTPCodesCriterion(AbstractCriterion):109    ''' Cummulative HTTP Criterion '''110    @staticmethod111    def get_type_string():112        return 'total_http'113    def __init__(self, autostop, param_str):114        AbstractCriterion.__init__(self)115        self.seconds_count = 0116        params = param_str.split(',')117        self.codes_mask = params[0].lower()118        self.codes_regex = re.compile(self.codes_mask.replace("x", '.'))119        self.autostop = autostop120        self.data = deque()121        self.second_window = deque()122        level_str = params[1].strip()123        if level_str[-1:] == '%':124            self.level = float(level_str[:-1])125            self.is_relative = True126        else:127            self.level = int(level_str)128            self.is_relative = False129        self.seconds_limit = expand_to_seconds(params[2])130        self.tag = params[3].strip() if len(params) == 4 else None131    def notify(self, data, stat):132        matched_responses, total_responses = self.parse_data(data)133  
      if self.is_relative:134            if total_responses > 0:135                matched_responses = float(matched_responses) / total_responses * 100136            else:137                matched_responses = 1138        logger.debug(139            "HTTP codes matching mask %s: %s/%s", self.codes_mask,140            matched_responses, self.level)141        self.data.append(matched_responses)142        self.second_window.append((data, stat))143        if len(self.data) > self.seconds_limit:144            self.data.popleft()145            self.second_window.popleft()146        queue_len = 1147        if self.is_relative:148            queue_len = len(self.data)149        if (sum(self.data) / queue_len) >= self.level\150                and len(self.data) >= self.seconds_limit:  # yapf:disable151            self.cause_second = self.second_window[0]152            logger.debug(self.explain())153            return True154        return False155    def parse_data(self, data):156        # Parse data for specific tag if it is present157        if self.tag:158            if data["tagged"].get(self.tag):159                matched_responses = self.count_matched_codes(160                    self.codes_regex, data["tagged"][self.tag]["proto_code"]["count"])161                total_responses = data["tagged"][self.tag]["interval_real"]["len"]162            # matched_responses=0 if current tag differs from selected one163            else:164                matched_responses = 0165                total_responses = data["overall"]["interval_real"]["len"]166        # Parse data for overall167        else:168            matched_responses = self.count_matched_codes(169                self.codes_regex, data["overall"]["proto_code"]["count"])170            total_responses = data["overall"]["interval_real"]["len"]171        return matched_responses, total_responses172    def get_rc(self):173        return 26174    def get_level_str(self):175        ''' format level str '''176        if 
self.is_relative:177            level_str = str(self.level) + "%"178        else:179            level_str = self.level180        return level_str181    def explain(self):182        items = self.get_criterion_parameters()183        explanation = "%(code)s codes count higher than %(level)s for %(seconds_limit)ss, since %(since_time)s" % items184        if self.tag:185            explanation = explanation + " for tag %(tag)s" % items186        return explanation187    def get_criterion_parameters(self):188        parameters = {189            'code': self.codes_mask,190            'level': self.get_level_str(),191            'seconds_limit': self.seconds_limit,192            'since_time': self.cause_second[0]["ts"],193            'tag': self.tag194        }195        return parameters196    def widget_explain(self):197        items = self.get_criterion_parameters()198        explanation = "HTTP %(code)s>%(level)s for %(seconds_limit)ss" % items199        if self.is_relative:200            return explanation, sum(self.data)201        return explanation, 1.0202class TotalNetCodesCriterion(AbstractCriterion):203    ''' Cummulative Net Criterion '''204    @staticmethod205    def get_type_string():206        return 'total_net'207    def __init__(self, autostop, param_str):208        AbstractCriterion.__init__(self)209        self.seconds_count = 0210        params = param_str.split(',')211        self.codes_mask = params[0].lower()212        self.codes_regex = re.compile(self.codes_mask.replace("x", '.'))213        self.autostop = autostop214        self.data = deque()215        self.second_window = deque()216        level_str = params[1].strip()217        if level_str[-1:] == '%':218            self.level = float(level_str[:-1])219            self.is_relative = True220        else:221            self.level = int(level_str)222            self.is_relative = False223        self.seconds_limit = expand_to_seconds(params[2])224        self.tag = params[3].strip() if len(params) 
== 4 else None225    def notify(self, data, stat):226        matched_responses, total_responses = self.parse_data(data)227        if self.is_relative:228            if total_responses:229                matched_responses = float(matched_responses) / total_responses * 100230                logger.debug(231                    "Net codes matching mask %s: %s%%/%s", self.codes_mask,232                    round(matched_responses, 2), self.get_level_str())233            else:234                matched_responses = 1235        else:236            logger.debug(237                "Net codes matching mask %s: %s/%s", self.codes_mask,238                matched_responses, self.get_level_str())239        self.data.append(matched_responses)240        self.second_window.append((data, stat))241        if len(self.data) > self.seconds_limit:242            self.data.popleft()243            self.second_window.popleft()244        queue_len = 1245        if self.is_relative:246            queue_len = len(self.data)247        if (sum(self.data) / queue_len) >= self.level\248                and len(self.data) >= self.seconds_limit:  # yapf:disable249            self.cause_second = self.second_window[0]250            logger.debug(self.explain())251            return True252        return False253    def parse_data(self, data):254        # Count data for specific tag if it's present255        if self.tag:256            if data["tagged"].get(self.tag):257                codes = data["tagged"][self.tag]["net_code"]["count"].copy()258                if '0' in codes:259                    codes.pop('0')260                matched_responses = self.count_matched_codes(self.codes_regex, codes)261                total_responses = data["tagged"][self.tag]["interval_real"]["len"]262            # matched_responses=0 if current tag differs from selected one263            else:264                matched_responses = 0265                total_responses = data["overall"]["interval_real"]["len"]266        # 
Count data for overall267        else:268            codes = data["overall"]["net_code"]["count"].copy()269            if '0' in codes:270                codes.pop('0')271            matched_responses = self.count_matched_codes(self.codes_regex, codes)272            total_responses = data["overall"]["interval_real"]["len"]273        return matched_responses, total_responses274    def get_rc(self):275        return 27276    def get_level_str(self):277        ''' format level str '''278        if self.is_relative:279            level_str = str(self.level) + "%"280        else:281            level_str = str(self.level)282        return level_str283    def explain(self):284        items = self.get_criterion_parameters()285        explanation = "%(code)s net codes count higher than %(level)s for %(seconds_limit)ss, since %(since_time)s" \286            % items287        if self.tag:288            explanation = explanation + " for tag %(tag)s" % items289        return explanation290    def get_criterion_parameters(self):291        parameters = {292            'code': self.codes_mask,293            'level': self.get_level_str(),294            'seconds_limit': self.seconds_limit,295            'since_time': self.cause_second[0]["ts"],296            'tag': self.tag297        }298        return parameters299    def widget_explain(self):300        items = self.get_criterion_parameters()301        explanation = "Net %(code)s>%(level)s for %(seconds_limit)ss" % items302        if self.is_relative:303            return explanation, sum(self.data)304        return explanation, 1.0305class TotalNegativeHTTPCodesCriterion(AbstractCriterion):306    ''' Reversed HTTP Criterion '''307    @staticmethod308    def get_type_string():309        return 'negative_http'310    def __init__(self, autostop, param_str):311        AbstractCriterion.__init__(self)312        self.seconds_count = 0313        params = param_str.split(',')314        self.codes_mask = params[0].lower()315        
self.codes_regex = re.compile(self.codes_mask.replace("x", '.'))316        self.autostop = autostop317        self.data = deque()318        self.second_window = deque()319        level_str = params[1].strip()320        if level_str[-1:] == '%':321            self.level = float(level_str[:-1])322            self.is_relative = True323        else:324            self.level = int(level_str)325            self.is_relative = False326        self.seconds_limit = expand_to_seconds(params[2])327        self.tag = params[3].strip() if len(params) == 4 else None328    def notify(self, data, stat):329        matched_responses, total_responses = self.parse_data(data)330        if self.is_relative:331            if total_responses:332                matched_responses = float(matched_responses) / total_responses * 100333                matched_responses = 100 - matched_responses334            else:335                matched_responses = 1336            logger.debug(337                "HTTP codes matching mask not %s: %s/%s", self.codes_mask,338                round(matched_responses, 1), self.level)339        else:340            matched_responses = (341                total_responses - matched_responses)342            logger.debug(343                "HTTP codes matching mask not %s: %s/%s", self.codes_mask,344                matched_responses, self.level)345        self.data.append(matched_responses)346        self.second_window.append((data, stat))347        if len(self.data) > self.seconds_limit:348            self.data.popleft()349            self.second_window.popleft()350        queue_len = 1351        if self.is_relative:352            queue_len = len(self.data)353        if (sum(self.data) / queue_len) >= self.level\354                and len(self.data) >= self.seconds_limit:  # yapf:disable355            self.cause_second = self.second_window[0]356            logger.debug(self.explain())357            return True358        return False359    def parse_data(self, data):360  
      # Parse data for specific tag if it is present361        if self.tag:362            if data["tagged"].get(self.tag):363                matched_responses = self.count_matched_codes(364                    self.codes_regex, data["tagged"][self.tag]["proto_code"]["count"])365                total_responses = data["tagged"][self.tag]["interval_real"]["len"]366            # matched_responses=0 if current tag differs from selected one367            else:368                matched_responses = 0369                total_responses = data["overall"]["interval_real"]["len"]370        # Parse data for overall371        else:372            matched_responses = self.count_matched_codes(373                self.codes_regex, data["overall"]["proto_code"]["count"])374            total_responses = data["overall"]["interval_real"]["len"]375        return matched_responses, total_responses376    def get_rc(self):377        return 28378    def get_level_str(self):379        ''' format level str'''380        if self.is_relative:381            level_str = str(self.level) + "%"382        else:383            level_str = self.level384        return level_str385    def explain(self):386        items = self.get_criterion_parameters()387        explanation = "Not %(code)s codes count higher than %(level)s for %(seconds_limit)ss, since %(since_time)s" % items388        if self.tag:389            explanation = explanation + " for tag %(tag)s" % items390        return explanation391    def get_criterion_parameters(self):392        parameters = {393            'code': self.codes_mask,394            'level': self.get_level_str(),395            'seconds_limit': self.seconds_limit,396            'since_time': self.cause_second[0]["ts"],397            'tag': self.tag398        }399        return parameters400    def widget_explain(self):401        items = self.get_criterion_parameters()402        explanation = "HTTP not %(code)s>%(level)s for %(seconds_limit)ss" % items403        if 
self.is_relative:404            return explanation, sum(self.data)405        return explanation, 1.0406class TotalNegativeNetCodesCriterion(AbstractCriterion):407    ''' Reversed NET Criterion '''408    @staticmethod409    def get_type_string():410        return 'negative_net'411    def __init__(self, autostop, param_str):412        AbstractCriterion.__init__(self)413        self.seconds_count = 0414        params = param_str.split(',')415        self.codes_mask = params[0].lower()416        self.codes_regex = re.compile(self.codes_mask.replace("x", '.'))417        self.autostop = autostop418        self.data = deque()419        self.second_window = deque()420        level_str = params[1].strip()421        if level_str[-1:] == '%':422            self.level = float(level_str[:-1])423            self.is_relative = True424        else:425            self.level = int(level_str)426            self.is_relative = False427        self.seconds_limit = expand_to_seconds(params[2])428        self.tag = params[3].strip() if len(params) == 4 else None429    def notify(self, data, stat):430        matched_responses, total_responses = self.parse_data(data)431        if self.is_relative:432            if total_responses:433                matched_responses = float(matched_responses) / total_responses * 100434                matched_responses = 100 - matched_responses435            else:436                matched_responses = 1437            logger.debug(438                "Net codes matching mask not %s: %s/%s", self.codes_mask,439                round(matched_responses, 1), self.level)440        else:441            matched_responses = (442                total_responses - matched_responses)443            logger.debug(444                "Net codes matching mask not %s: %s/%s", self.codes_mask,445                matched_responses, self.level)446        self.data.append(matched_responses)447        self.second_window.append((data, stat))448        if len(self.data) > 
self.seconds_limit:449            self.data.popleft()450            self.second_window.popleft()451        queue_len = 1452        if self.is_relative:453            queue_len = len(self.data)454        if (sum(self.data) / queue_len) >= self.level \455                and len(self.data) >= self.seconds_limit:  # yapf:disable456            self.cause_second = self.second_window[0]457            logger.debug(self.explain())458            return True459        return False460    def parse_data(self, data):461        # Count data for specific tag if it's present462        if self.tag:463            if data["tagged"].get(self.tag):464                codes = data["tagged"][self.tag]["net_code"]["count"].copy()465                if '0' in codes:466                    codes.pop('0')467                matched_responses = self.count_matched_codes(self.codes_regex, codes)468                total_responses = data["tagged"][self.tag]["interval_real"]["len"]469            # matched_responses=0 if current tag differs from selected one470            else:471                matched_responses = 0472                total_responses = data["overall"]["interval_real"]["len"]473        # Count data for overall474        else:475            codes = data["overall"]["net_code"]["count"].copy()476            if '0' in codes:477                codes.pop('0')478            matched_responses = self.count_matched_codes(self.codes_regex, codes)479            total_responses = data["overall"]["interval_real"]["len"]480        return matched_responses, total_responses481    def get_rc(self):482        return 29483    def get_level_str(self):484        ''' format level str'''485        if self.is_relative:486            level_str = str(self.level) + "%"487        else:488            level_str = self.level489        return level_str490    def explain(self):491        items = self.get_criterion_parameters()492        explanation = "Not %(code)s codes count higher than %(level)s for %(seconds_limit)ss, 
since %(since_time)s"\493            % items494        if self.tag:495            explanation = explanation + " for tag %(tag)s" % items496        return explanation497    def get_criterion_parameters(self):498        parameters = {499            'code': self.codes_mask,500            'level': self.get_level_str(),501            'seconds_limit': self.seconds_limit,502            'since_time': self.cause_second[0]["ts"],503            'tag': self.tag504        }505        return parameters506    def widget_explain(self):507        items = self.get_criterion_parameters()508        explanation = "Net not %(code)s>%(level)s for %(seconds_limit)ss" % items509        if self.is_relative:510            return explanation, sum(self.data)511        return explanation, 1.0512class TotalHTTPTrendCriterion(AbstractCriterion):513    ''' HTTP Trend Criterion '''514    @staticmethod515    def get_type_string():516        return 'http_trend'517    def __init__(self, autostop, param_str):518        AbstractCriterion.__init__(self)519        self.seconds_count = 0520        params = param_str.split(',')521        self.codes_mask = params[0].lower()522        self.codes_regex = re.compile(self.codes_mask.replace("x", '.'))523        self.autostop = autostop524        self.tangents = deque()525        self.second_window = deque()526        self.total_tan = float()527        self.tangents.append(0)528        self.last = 0529        self.seconds_limit = expand_to_seconds(params[1])530        self.measurement_error = float()531        self.tag = params[2].strip() if len(params) == 3 else None532    def notify(self, data, stat):533        matched_responses = self.parse_data(data)534        self.tangents.append(matched_responses - self.last)535        self.second_window.append((data, stat))536        self.last = matched_responses537        if len(self.tangents) > self.seconds_limit:538            self.tangents.popleft()539            self.second_window.popleft()540        
self.measurement_error = self.calc_measurement_error(self.tangents)541        self.total_tan = float(sum(self.tangents) / len(self.tangents))542        logger.debug(543            "Last trend for http codes %s: %.2f +/- %.2f", self.codes_mask,544            self.total_tan, self.measurement_error)545        if self.total_tan + self.measurement_error < 0:546            self.cause_second = self.second_window[0]547            logger.debug(self.explain())548            return True549        return False550    def parse_data(self, data):551        # Count data for specific tag if it's present552        if self.tag:553            if data["tagged"].get(self.tag):554                matched_responses = self.count_matched_codes(555                    self.codes_regex, data["tagged"][self.tag]["proto_code"]["count"])556            # matched_responses=0 if current tag differs from selected one557            else:558                matched_responses = 0559        # Count data for overall if it's present560        else:561            matched_responses = self.count_matched_codes(562                self.codes_regex, data["overall"]["proto_code"]["count"])563        return matched_responses564    def calc_measurement_error(self, tangents):565        '''566        formula for measurement error567        sqrt ( (sum(1, n, (k_i - <k>)**2) / (n*(n-1)))568        '''569        if len(tangents) < 2:570            return 0.0571        avg_tan = float(sum(tangents) / len(tangents))572        numerator = float()573        for i in tangents:574            numerator += (i - avg_tan) * (i - avg_tan)575        return math.sqrt(numerator / len(tangents) / (len(tangents) - 1))576    def get_rc(self):577        return 30578    def explain(self):579        items = self.get_criterion_parameters()580        return "Last trend for %(code)s http codes " \581            "is %(total_tan).2f +/- %(measurement_err).2f for %(seconds_limit)ss, since %(since_time)s" % items582    def 
get_criterion_parameters(self):583        parameters = {584            'code': self.codes_mask,585            'total_tan': self.total_tan,586            'measurement_err': self.measurement_error,587            'seconds_limit': self.seconds_limit,588            'since_time': self.cause_second[0]["ts"],589            'tag': self.tag590        }591        return parameters592    def widget_explain(self):593        items = self.get_criterion_parameters()594        return "HTTP(%(code)s) trend is %(total_tan).2f +/- %(measurement_err).2f < 0 for %(seconds_limit)ss" % items, 1.0595class QuantileOfSaturationCriterion(AbstractCriterion):596    ''' Quantile of Saturation Criterion597        example: qsat(50ms, 3m, 10%) '''598    @staticmethod599    def get_type_string():600        return 'qsat'601    def __init__(self, autostop, param_str):602        AbstractCriterion.__init__(self)603        raise NotImplementedError604    #     self.autostop = autostop605    #     self.data = deque()606    #     self.second_window = deque()607    #     params = param_str.split(',')...criterions.py
Source:criterions.py  
...49        return rt_total, requests_number50    def get_rc(self):51        return self.RC_TIME52    def explain(self):53        items = self.get_criterion_parameters()54        explanation = "Average response time higher than %(limit)sms for %(seconds_count)ss, since %(since_time)s" % items55        if self.tag:56            explanation = explanation + " for tag %(tag)s" % items57        return explanation58    def get_criterion_parameters(self):59        parameters = {60            'limit': self.rt_limit,61            'seconds_count': self.seconds_count,62            'seconds_limit': self.seconds_limit,63            'since_time': self.cause_second[0]["ts"],64            'tag': self.tag65        }66        return parameters67    def widget_explain(self):68        items = self.get_criterion_parameters()69        return "Avg Time >%(limit)sms for %(seconds_count)s/%(seconds_limit)ss" % items, \70            float(self.seconds_count) / self.seconds_limit71class HTTPCodesCriterion(AbstractCriterion):72    """ HTTP codes criterion """73    @staticmethod74    def get_type_string():75        return 'http'76    def __init__(self, autostop, param_str):77        AbstractCriterion.__init__(self)78        self.seconds_count = 079        params = param_str.split(',')80        self.codes_mask = params[0].lower()81        self.codes_regex = re.compile(self.codes_mask.replace("x", '.'))82        self.autostop = autostop83        level_str = params[1].strip()84        if level_str[-1:] == '%':85            self.level = float(level_str[:-1]) / 10086            self.is_relative = True87        else:88            self.level = int(level_str)89            self.is_relative = False90        self.seconds_limit = expand_to_seconds(params[2])91        self.tag = params[3].strip() if len(params) == 4 else None92    def notify(self, data, stat):93        matched_responses, total_responses = self.parse_data(data)94        if self.is_relative:95            if total_responses:96                
matched_responses = float(matched_responses) / total_responses97            else:98                matched_responses = 099        logger.debug("HTTP codes matching mask %s: %s/%s", self.codes_mask, matched_responses, self.level)100        if matched_responses >= self.level:101            if not self.seconds_count:102                self.cause_second = (data, stat)103            logger.debug(self.explain())104            self.seconds_count += 1105            self.autostop.add_counting(self)106            if self.seconds_count >= self.seconds_limit:107                return True108        else:109            self.seconds_count = 0110        return False111    def parse_data(self, data):112        # Parse data for specific tag113        if self.tag:114            if data["tagged"].get(self.tag):115                total_responses = data["tagged"][self.tag]["interval_real"]["len"]116                matched_responses = self.count_matched_codes(117                    self.codes_regex, data["tagged"][self.tag]["proto_code"]["count"])118            # matched_responses=0 if current tag differs from selected one119            else:120                matched_responses = 0121                total_responses = data["overall"]["interval_real"]["len"]122        # Parse data for overall123        else:124            matched_responses = self.count_matched_codes(self.codes_regex, data["overall"]["proto_code"]["count"])125            total_responses = data["overall"]["interval_real"]["len"]126        return matched_responses, total_responses127    def get_rc(self):128        return self.RC_HTTP129    def get_level_str(self):130        """ format level str """131        if self.is_relative:132            level_str = str(100 * self.level) + "%"133        else:134            level_str = self.level135        return level_str136    def explain(self):137        items = self.get_criterion_parameters()138        explanation = "%(code)s codes count higher than %(level)s for %(seconds_count)ss, 
since %(since_time)s" % items139        if self.tag:140            explanation = explanation + " for tag %(tag)s" % items141        return explanation142    def get_criterion_parameters(self):143        parameters = {144            'code': self.codes_mask,145            'level': self.get_level_str(),146            'seconds_count': self.seconds_count,147            'seconds_limit': self.seconds_limit,148            'since_time': self.cause_second[0].get('ts'),149            'tag': self.tag150        }151        return parameters152    def widget_explain(self):153        items = self.get_criterion_parameters()154        return "HTTP %(code)s>%(level)s for %(seconds_count)s/%(seconds_limit)ss" % items, \155            float(self.seconds_count) / self.seconds_limit156class NetCodesCriterion(AbstractCriterion):157    """ Net codes criterion """158    @staticmethod159    def get_type_string():160        return 'net'161    def __init__(self, autostop, param_str):162        AbstractCriterion.__init__(self)163        self.seconds_count = 0164        params = param_str.split(',')165        self.codes_mask = params[0].lower()166        self.codes_regex = re.compile(self.codes_mask.replace("x", '.'))167        self.autostop = autostop168        level_str = params[1].strip()169        if level_str[-1:] == '%':170            self.level = float(level_str[:-1]) / 100171            self.is_relative = True172        else:173            self.level = int(level_str)174            self.is_relative = False175        self.seconds_limit = expand_to_seconds(params[2])176        self.tag = params[3].strip() if len(params) == 4 else None177    def notify(self, data, stat):178        matched_responses, total_responses = self.parse_data(data)179        if self.is_relative:180            if total_responses:181                matched_responses = float(matched_responses) / total_responses182            else:183                matched_responses = 0184        logger.debug("Net codes matching mask 
%s: %s/%s", self.codes_mask, matched_responses, self.level)185        if matched_responses >= self.level:186            if not self.seconds_count:187                self.cause_second = (data, stat)188            logger.debug(self.explain())189            self.seconds_count += 1190            self.autostop.add_counting(self)191            if self.seconds_count >= self.seconds_limit:192                return True193        else:194            self.seconds_count = 0195        return False196    def parse_data(self, data):197        # Count data for specific tag if it's present198        if self.tag:199            if data["tagged"].get(self.tag):200                total_responses = data["tagged"][self.tag]["interval_real"]["len"]201                code_count = data["tagged"][self.tag]["net_code"]["count"]202                codes = copy.deepcopy(code_count)203                if '0' in codes:204                    codes.pop('0')205                matched_responses = self.count_matched_codes(self.codes_regex, codes)206            # matched_responses=0 if current tag differs from selected one207            else:208                matched_responses = 0209                total_responses = data["overall"]["interval_real"]["len"]210        # Count data for overall211        else:212            code_count = data["overall"]["net_code"]["count"]213            total_responses = data["overall"]["interval_real"]["len"]214            codes = copy.deepcopy(code_count)215            if '0' in codes:216                codes.pop('0')217            matched_responses = self.count_matched_codes(self.codes_regex, codes)218        return matched_responses, total_responses219    def get_rc(self):220        return self.RC_NET221    def get_level_str(self):222        """ format level str """223        if self.is_relative:224            level_str = str(100 * self.level) + "%"225        else:226            level_str = self.level227        return level_str228    def explain(self):229        items = 
self.get_criterion_parameters()230        explanation = "%(code)s net codes count higher than %(level)s for %(seconds_count)ss, since %(since_time)s" \231                      % items232        if self.tag:233            explanation = explanation + " for tag %(tag)s" % items234        return explanation235    def get_criterion_parameters(self):236        parameters = {237            'code': self.codes_mask,238            'level': self.get_level_str(),239            'seconds_count': self.seconds_count,240            'seconds_limit': self.seconds_limit,241            'since_time': self.cause_second[0].get("ts"),242            'tag': self.tag243        }244        return parameters245    def widget_explain(self):246        items = self.get_criterion_parameters()247        return "Net %(code)s>%(level)s for %(seconds_count)s/%(seconds_limit)ss" % items, \248            float(self.seconds_count) / self.seconds_limit249class QuantileCriterion(AbstractCriterion):250    """ quantile criterion """251    @staticmethod252    def get_type_string():253        return 'quantile'254    def __init__(self, autostop, param_str):255        AbstractCriterion.__init__(self)256        self.seconds_count = 0257        params = param_str.split(',')258        self.quantile = float(params[0])259        self.rt_limit = expand_to_milliseconds(params[1])260        self.seconds_limit = expand_to_seconds(params[2])261        self.autostop = autostop262        self.tag = params[3].strip() if len(params) == 4 else None263    def notify(self, data, stat):264        quantiles = self.parse_data(data)265        logger.debug('Autostop quantiles for ts %s: %s', data['ts'], quantiles)266        if self.quantile not in quantiles.keys():267            logger.warning("No quantile %s in %s", self.quantile, quantiles)268        if self.quantile in quantiles.keys() and quantiles[self.quantile] / 1000.0 > self.rt_limit:269            if not self.seconds_count:270                self.cause_second = (data, 
stat)271            self.seconds_count += 1272            logger.debug(self.explain())273            self.autostop.add_counting(self)274            if self.seconds_count >= self.seconds_limit:275                return True276        else:277            self.seconds_count = 0278        return False279    def parse_data(self, data):280        # Parse data for specific tag281        if self.tag:282            if data["tagged"].get(self.tag):283                quantile_values = data["tagged"][self.tag]["interval_real"]["q"]["value"]284            # quantile_values empty if current tag differs from selected one285            else:286                quantile_values = []287        # Parse data for overall288        else:289            quantile_values = data["overall"]["interval_real"]["q"]["value"]290        quantiles = dict(zip(data["overall"]["interval_real"]["q"]["q"], quantile_values))291        return quantiles292    def get_rc(self):293        return self.RC_TIME294    def explain(self):295        items = self.get_criterion_parameters()296        explanation = "Percentile %(percentile)s higher than %(limit)sms for %(seconds_count)ss, since %(since_time)s" \297            % items298        if self.tag:299            explanation = explanation + " for tag %(tag)s" % items300        return explanation301    def get_criterion_parameters(self):302        parameters = {303            'percentile': self.quantile,304            'limit': self.rt_limit,305            'seconds_count': self.seconds_count,306            'seconds_limit': self.seconds_limit,307            'since_time': self.cause_second[0].get("ts"),308            'tag': self.tag309        }310        return parameters311    def widget_explain(self):312        items = self.get_criterion_parameters()313        return "%(percentile)s%% >%(limit)sms for %(seconds_count)s/%(seconds_limit)ss" % items, \314            float(self.seconds_count) / self.seconds_limit315class 
SteadyCumulativeQuantilesCriterion(AbstractCriterion):316    """ quantile criterion """317    @staticmethod318    def get_type_string():319        return 'steady_cumulative'320    def __init__(self, autostop, param_str):321        raise NotImplementedError322        AbstractCriterion.__init__(self)323        self.seconds_count = 0324        self.quantile_hash = ""325        self.seconds_limit = expand_to_seconds(param_str.split(',')[0])326        self.autostop = autostop327    def notify(self, data, stat):328        quantiles = dict(329            zip(data["overall"]["q"]["q"], data["overall"]["q"]["values"]))330        quantile_hash = json.dumps(quantiles)331        logging.debug("Cumulative quantiles hash: %s", quantile_hash)332        if self.quantile_hash == quantile_hash:333            if not self.seconds_count:334                self.cause_second = (data, stat)335            logger.debug(self.explain())336            self.seconds_count += 1337            self.autostop.add_counting(self)338            if self.seconds_count >= self.seconds_limit:339                return True340        else:341            self.seconds_count = 0342        self.quantile_hash = quantile_hash343        return False344    def get_rc(self):345        return self.RC_STEADY346    def explain(self):347        items = self.get_criterion_parameters()348        return "Cumulative percentiles are steady for %ss, since %s" % items349    def get_criterion_parameters(self):350        parameters = (351            self.seconds_count, self.cause_second[0]["ts"])352        return parameters353    def widget_explain(self):354        items = (self.seconds_count, self.seconds_limit)355        return "Steady for %s/%ss" % items, float(356            self.seconds_count) / self.seconds_limit357class TimeLimitCriterion(AbstractCriterion):358    """ time limit criterion """359    @staticmethod360    def get_type_string():361        return 'limit'362    def __init__(self, autostop, param_str):363        
AbstractCriterion.__init__(self)364        self.start_time = time.time()365        self.end_time = time.time()366        self.time_limit = expand_to_seconds(param_str)367    def notify(self, data, stat):368        self.end_time = time.time()369        return (self.end_time - self.start_time) > self.time_limit370    def get_rc(self):371        return self.RC_TIME372    def explain(self):373        return "Test time elapsed. Limit: %(limit)ss, actual time: %(actual)ss" % self.get_criterion_parameters()374    def get_criterion_parameters(self):375        parameters = {376            'limit': self.time_limit,377            'actual': self.end_time - self.start_time378        }379        return parameters380    def widget_explain(self):...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
