Best Python code snippet using pytest-benchmark
terminal.py
Source:terminal.py  
...169        else:170            fill = ''171        line = str(line)172        self._tw.write("\r" + line + fill, **markup)173    def write_sep(self, sep, title=None, **markup):174        self.ensure_newline()175        self._tw.sep(sep, title, **markup)176    def section(self, title, sep="=", **kw):177        self._tw.sep(sep, title, **kw)178    def line(self, msg, **kw):179        self._tw.line(msg, **kw)180    def pytest_internalerror(self, excrepr):181        for line in py.builtin.text(excrepr).split("\n"):182            self.write_line("INTERNALERROR> " + line)183        return 1184    def pytest_logwarning(self, code, fslocation, message, nodeid):185        warnings = self.stats.setdefault("warnings", [])186        warning = WarningReport(code=code, fslocation=fslocation,187                                message=message, nodeid=nodeid)188        warnings.append(warning)189    def pytest_plugin_registered(self, plugin):190        if self.config.option.traceconfig:191            msg = "PLUGIN registered: %s" % (plugin,)192            # XXX this event may happen during setup/teardown time193            #     which unfortunately captures our output here194            #     which garbles our output if we use self.write_line195            self.write_line(msg)196    def pytest_deselected(self, items):197        self.stats.setdefault('deselected', []).extend(items)198    def pytest_runtest_logstart(self, nodeid, location):199        # ensure that the path is printed before the200        # 1st test of a module starts running201        if self.showlongtestinfo:202            line = self._locationline(nodeid, *location)203            self.write_ensure_prefix(line, "")204        elif self.showfspath:205            fsid = nodeid.split("::")[0]206            self.write_fspath_result(fsid, "")207    def pytest_runtest_logreport(self, report):208        rep = report209        res = self.config.hook.pytest_report_teststatus(report=rep)210        cat, letter, word = res211        self.stats.setdefault(cat, []).append(rep)212        self._tests_ran = True213        if not letter and not word:214            # probably passed setup/teardown215            return216        if self.verbosity <= 0:217            if not hasattr(rep, 'node') and self.showfspath:218                self.write_fspath_result(rep.nodeid, letter)219            else:220                self._tw.write(letter)221        else:222            if isinstance(word, tuple):223                word, markup = word224            else:225                if rep.passed:226                    markup = {'green': True}227                elif rep.failed:228                    markup = {'red': True}229                elif rep.skipped:230                    markup = {'yellow': True}231            line = self._locationline(rep.nodeid, *rep.location)232            if not hasattr(rep, 'node'):233                self.write_ensure_prefix(line, word, **markup)234                # self._tw.write(word, **markup)235            else:236                self.ensure_newline()237                if hasattr(rep, 'node'):238                    self._tw.write("[%s] " % rep.node.gateway.id)239                self._tw.write(word, **markup)240                self._tw.write(" " + line)241                self.currentfspath = -2242    def pytest_collection(self):243        if not self.isatty and self.config.option.verbose >= 1:244            self.write("collecting ... ", bold=True)245    def pytest_collectreport(self, report):246        if report.failed:247            self.stats.setdefault("error", []).append(report)248        elif report.skipped:249            self.stats.setdefault("skipped", []).append(report)250        items = [x for x in report.result if isinstance(x, pytest.Item)]251        self._numcollected += len(items)252        if self.isatty:253            # self.write_fspath_result(report.nodeid, 'E')254            self.report_collect()255    def report_collect(self, final=False):256        if self.config.option.verbose < 0:257            return258        errors = len(self.stats.get('error', []))259        skipped = len(self.stats.get('skipped', []))260        if final:261            line = "collected "262        else:263            line = "collecting "264        line += str(self._numcollected) + " item" + ('' if self._numcollected == 1 else 's')265        if errors:266            line += " / %d errors" % errors267        if skipped:268            line += " / %d skipped" % skipped269        if self.isatty:270            self.rewrite(line, bold=True, erase=True)271            if final:272                self.write('\n')273        else:274            self.write_line(line)275    def pytest_collection_modifyitems(self):276        self.report_collect(True)277    @pytest.hookimpl(trylast=True)278    def pytest_sessionstart(self, session):279        self._sessionstarttime = time.time()280        if not self.showheader:281            return282        self.write_sep("=", "test session starts", bold=True)283        verinfo = platform.python_version()284        msg = "platform %s -- Python %s" % (sys.platform, verinfo)285        if hasattr(sys, 'pypy_version_info'):286            verinfo = ".".join(map(str, sys.pypy_version_info[:3]))287            msg += "[pypy-%s-%s]" % (verinfo, sys.pypy_version_info[3])288        msg += ", pytest-%s, py-%s, pluggy-%s" % (289               pytest.__version__, py.__version__, pluggy.__version__)290        if self.verbosity > 0 or self.config.option.debug or \291           getattr(self.config.option, 'pastebin', None):292            msg += " -- " + str(sys.executable)293        self.write_line(msg)294        lines = self.config.hook.pytest_report_header(295            config=self.config, startdir=self.startdir)296        self._write_report_lines_from_hooks(lines)297    def _write_report_lines_from_hooks(self, lines):298        lines.reverse()299        for line in flatten(lines):300            self.write_line(line)301    def pytest_report_header(self, config):302        inifile = ""303        if config.inifile:304            inifile = " " + config.rootdir.bestrelpath(config.inifile)305        lines = ["rootdir: %s, inifile:%s" % (config.rootdir, inifile)]306        plugininfo = config.pluginmanager.list_plugin_distinfo()307        if plugininfo:308            lines.append(309                "plugins: %s" % ", ".join(_plugin_nameversions(plugininfo)))310        return lines311    def pytest_collection_finish(self, session):312        if self.config.option.collectonly:313            self._printcollecteditems(session.items)314            if self.stats.get('failed'):315                self._tw.sep("!", "collection failures")316                for rep in self.stats.get('failed'):317                    rep.toterminal(self._tw)318                return 1319            return 0320        lines = self.config.hook.pytest_report_collectionfinish(321            config=self.config, startdir=self.startdir, items=session.items)322        self._write_report_lines_from_hooks(lines)323    def _printcollecteditems(self, items):324        # to print out items and their parent collectors325        # we take care to leave out Instances aka ()326        # because later versions are going to get rid of them anyway327        if self.config.option.verbose < 0:328            if self.config.option.verbose < -1:329                counts = {}330                for item in items:331                    name = item.nodeid.split('::', 1)[0]332                    counts[name] = counts.get(name, 0) + 1333                for name, count in sorted(counts.items()):334                    self._tw.line("%s: %d" % (name, count))335            else:336                for item in items:337                    nodeid = item.nodeid338                    nodeid = nodeid.replace("::()::", "::")339                    self._tw.line(nodeid)340            return341        stack = []342        indent = ""343        for item in items:344            needed_collectors = item.listchain()[1:]  # strip root node345            while stack:346                if stack == needed_collectors[:len(stack)]:347                    break348                stack.pop()349            for col in needed_collectors[len(stack):]:350                stack.append(col)351                # if col.name == "()":352                #    continue353                indent = (len(stack) - 1) * "  "354                self._tw.line("%s%s" % (indent, col))355    @pytest.hookimpl(hookwrapper=True)356    def pytest_sessionfinish(self, exitstatus):357        outcome = yield358        outcome.get_result()359        self._tw.line("")360        summary_exit_codes = (361            EXIT_OK, EXIT_TESTSFAILED, EXIT_INTERRUPTED, EXIT_USAGEERROR,362            EXIT_NOTESTSCOLLECTED)363        if exitstatus in summary_exit_codes:364            self.config.hook.pytest_terminal_summary(terminalreporter=self,365                                                     exitstatus=exitstatus)366            self.summary_errors()367            self.summary_failures()368            self.summary_warnings()369            self.summary_passes()370        if exitstatus == EXIT_INTERRUPTED:371            self._report_keyboardinterrupt()372            del self._keyboardinterrupt_memo373        self.summary_deselected()374        self.summary_stats()375    def pytest_keyboard_interrupt(self, excinfo):376        self._keyboardinterrupt_memo = excinfo.getrepr(funcargs=True)377    def pytest_unconfigure(self):378        if hasattr(self, '_keyboardinterrupt_memo'):379            self._report_keyboardinterrupt()380    def _report_keyboardinterrupt(self):381        excrepr = self._keyboardinterrupt_memo382        msg = excrepr.reprcrash.message383        self.write_sep("!", msg)384        if "KeyboardInterrupt" in msg:385            if self.config.option.fulltrace:386                excrepr.toterminal(self._tw)387            else:388                self._tw.line("to show a full traceback on KeyboardInterrupt use --fulltrace", yellow=True)389                excrepr.reprcrash.toterminal(self._tw)390    def _locationline(self, nodeid, fspath, lineno, domain):391        def mkrel(nodeid):392            line = self.config.cwd_relative_nodeid(nodeid)393            if domain and line.endswith(domain):394                line = line[:-len(domain)]395                values = domain.split("[")396                values[0] = values[0].replace('.', '::')  # don't replace '.' in params397                line += "[".join(values)398            return line399        # collect_fspath comes from testid which has a "/"-normalized path400        if fspath:401            res = mkrel(nodeid).replace("::()", "")  # parens-normalization402            if nodeid.split("::")[0] != fspath.replace("\\", nodes.SEP):403                res += " <- " + self.startdir.bestrelpath(fspath)404        else:405            res = "[location]"406        return res + " "407    def _getfailureheadline(self, rep):408        if hasattr(rep, 'location'):409            fspath, lineno, domain = rep.location410            return domain411        else:412            return "test session"  # XXX?413    def _getcrashline(self, rep):414        try:415            return str(rep.longrepr.reprcrash)416        except AttributeError:417            try:418                return str(rep.longrepr)[:50]419            except AttributeError:420                return ""421    #422    # summaries for sessionfinish423    #424    def getreports(self, name):425        values = []426        for x in self.stats.get(name, []):427            if not hasattr(x, '_pdbshown'):428                values.append(x)429        return values430    def summary_warnings(self):431        if self.hasopt("w"):432            all_warnings = self.stats.get("warnings")433            if not all_warnings:434                return435            grouped = itertools.groupby(all_warnings, key=lambda wr: wr.get_location(self.config))436            self.write_sep("=", "warnings summary", yellow=True, bold=False)437            for location, warnings in grouped:438                self._tw.line(str(location) or '<undetermined location>')439                for w in warnings:440                    lines = w.message.splitlines()441                    indented = '\n'.join('  ' + x for x in lines)442                    self._tw.line(indented)443                self._tw.line()444            self._tw.line('-- Docs: http://doc.pytest.org/en/latest/warnings.html')445    def summary_passes(self):446        if self.config.option.tbstyle != "no":447            if self.hasopt("P"):448                reports = self.getreports('passed')449                if not reports:450                    return451                self.write_sep("=", "PASSES")452                for rep in reports:453                    msg = self._getfailureheadline(rep)454                    self.write_sep("_", msg)455                    self._outrep_summary(rep)456    def print_teardown_sections(self, rep):457        for secname, content in rep.sections:458            if 'teardown' in secname:459                self._tw.sep('-', secname)460                if content[-1:] == "\n":461                    content = content[:-1]462                self._tw.line(content)463    def summary_failures(self):464        if self.config.option.tbstyle != "no":465            reports = self.getreports('failed')466            if not reports:467                return468            self.write_sep("=", "FAILURES")469            for rep in reports:470                if self.config.option.tbstyle == "line":471                    line = self._getcrashline(rep)472                    self.write_line(line)473                else:474                    msg = self._getfailureheadline(rep)475                    markup = {'red': True, 'bold': True}476                    self.write_sep("_", msg, **markup)477                    self._outrep_summary(rep)478                    for report in self.getreports(''):479                        if report.nodeid == rep.nodeid and report.when == 'teardown':480                            self.print_teardown_sections(report)481    def summary_errors(self):482        if self.config.option.tbstyle != "no":483            reports = self.getreports('error')484            if not reports:485                return486            self.write_sep("=", "ERRORS")487            for rep in self.stats['error']:488                msg = self._getfailureheadline(rep)489                if not hasattr(rep, 'when'):490                    # collect491                    msg = "ERROR collecting " + msg492                elif rep.when == "setup":493                    msg = "ERROR at setup of " + msg494                elif rep.when == "teardown":495                    msg = "ERROR at teardown of " + msg496                self.write_sep("_", msg)497                self._outrep_summary(rep)498    def _outrep_summary(self, rep):499        rep.toterminal(self._tw)500        for secname, content in rep.sections:501            self._tw.sep("-", secname)502            if content[-1:] == "\n":503                content = content[:-1]504            self._tw.line(content)505    def summary_stats(self):506        session_duration = time.time() - self._sessionstarttime507        (line, color) = build_summary_stats_line(self.stats)508        msg = "%s in %.2f seconds" % (line, session_duration)509        markup = {color: True, 'bold': True}510        if self.verbosity >= 0:511            self.write_sep("=", msg, **markup)512        if self.verbosity == -1:513            self.write_line(msg, **markup)514    def summary_deselected(self):515        if 'deselected' in self.stats:516            self.write_sep("=", "%d tests deselected" % (517                len(self.stats['deselected'])), bold=True)518def repr_pythonversion(v=None):519    if v is None:520        v = sys.version_info521    try:522        return "%s.%s.%s-%s-%s" % v523    except (TypeError, ValueError):524        return str(v)525def flatten(values):526    for x in values:527        if isinstance(x, (list, tuple)):528            for y in flatten(x):529                yield y530        else:...cross_validation.py
Source:cross_validation.py  
1# coding=utf-82""""3    Cross Validation fo Recommender Algorithms4"""5# © 2018. Case Recommender (MIT License)6from collections import defaultdict7import numpy as np8import shutil9from caserec.utils.split_database import SplitDatabase10__author__ = 'removed for double blind review'11class CrossValidation(object):12    def __init__(self, input_file, recommender, dir_folds, k_folds=10, header=None, sep='\t', write_predictions=False,13                 write_sep='\t', recommender_verbose=False, evaluation_in_fold_verbose=True, metrics=None,14                 as_table=False, table_sep='\t', del_folds=False, random_seed=None):15        """16        Cross Validation17        This strategy is responsible to divide the database in K folds, in which each fold contain a train and a test18        set. Its also responsible to run and evaluate the recommender results in each fold and calculate the mean and19        the standard deviation.20        Usage:21            >> rec = MostPopular(as_binary=True)22            >> CrossValidation(db, rec, fold_d, evaluation_in_fold_verbose=False).compute()23        :param input_file: Database file24        :type input_file: str25        :param recommender: Initialize the recommender algorithm. e.g.: MostPopular(as_binary=True)26        :type recommender: class27        :param dir_folds: Directory to write folds (train and test files)28        :type dir_folds: str29        :param k_folds: How much folds the strategy will divide30        :type k_folds: int, default 1031        :param header: Skip header line32        :type header: int, default None33        :param sep: Delimiter for input files34        :type sep: str, default '\t'35        :param write_predictions: Write the recommender predictions in each fold36        :type write_predictions: bool, default False37        :param write_sep: Delimiter for output files38        :type write_sep: str, default '\t'39        :param recommender_verbose: Print header of recommender in each fold40        :type recommender_verbose: bool, default False41        :param evaluation_in_fold_verbose: Print evaluation of recommender in each fold42        :type evaluation_in_fold_verbose: bool, default True43        :param metrics: List of evaluation metrics44        :type metrics: str, default None45        :param as_table: Print the evaluation results as table46        :type as_table: bool, default False47        :param table_sep: Delimiter for print results (only work with verbose=True and as_table=True)48        :type table_sep: str, default '\t'49        :param del_folds: Delete folds after evaluation50        :type del_folds: bool, default False51        :param random_seed: Random seed52        :type random_seed: int, default None53        """54        self.input_file = input_file55        self.recommender = recommender56        self.dir_folds = dir_folds57        self.k_folds = k_folds58        self.header = header59        self.sep = sep60        self.write_predictions = write_predictions61        self.write_sep = write_sep62        self.recommender_verbose = recommender_verbose63        self.evaluation_in_fold_verbose = evaluation_in_fold_verbose64        self.metrics = metrics65        self.as_table = as_table66        self.table_sep = table_sep67        self.del_folds = del_folds68        self.random_seed = random_seed69        # internal vars70        self.folds_results = defaultdict(list)71    def generate_folds(self):72        """73        Method to generate folds with k fold cross validation74        """75        SplitDatabase(input_file=self.input_file, n_splits=self.k_folds, dir_folds=self.dir_folds,76                      sep_read=self.sep, header=self.header).kfoldcrossvalidation(random_state=self.random_seed)77    def execute_algorithm(self):78        """79        Method to run recommender algorithm in k folds80        """81        for k in range(self.k_folds):82            train_file = self.dir_folds + 'folds/%d/train.dat' % k83            test_file = self.dir_folds + 'folds/%d/test.dat' % k84            self.recommender.train_file = train_file85            self.recommender.test_file = test_file86            if self.write_predictions:87                output_file = self.dir_folds + 'folds/%d/output.dat' % k88                self.recommender.output_file = output_file89            self.recommender.compute(verbose=self.recommender_verbose,90                                     verbose_evaluation=self.evaluation_in_fold_verbose, metrics=self.metrics)91            if self.metrics is None:92                self.metrics = self.recommender.evaluation_results.keys()93            for metric in self.metrics:94                self.folds_results[metric.upper()].append(self.recommender.evaluation_results[metric.upper()])95    def evaluate(self, verbose=True):96        """97        Method to evaluate folds results and generate mean and standard deviation98        :param verbose: If True, print evaluation results99        :type verbose: bool, default True100        """101        mean_dict = defaultdict(dict)102        std_dict = defaultdict(dict)103        for metric in self.metrics:104            mean_dict[metric.upper()] = np.mean(self.folds_results[metric.upper()])105            std_dict[metric.upper()] = np.std(self.folds_results[metric.upper()])106        if verbose:107            if self.as_table:108                header = ''109                values_mean = ''110                values_std = ''111                for metric in self.metrics:112                    header += metric.upper() + self.table_sep113                    values_mean += str(round(mean_dict[metric.upper()], 6)) + self.table_sep114                    values_std += str(round(std_dict[metric.upper()], 6)) + self.table_sep115                print('Metric%s%s' % (self.table_sep, header))116                print('Mean%s%s' % (self.table_sep, values_mean))117                print('STD%s%s' % (self.table_sep, values_std))118            else:119                evaluation_mean = 'Mean:: '120                evaluation_std = 'STD:: '121                for metrics in self.metrics:122                    evaluation_mean += "%s: %.6f " % (metrics.upper(), mean_dict[metrics.upper()])123                    evaluation_std += "%s: %.6f " % (metrics.upper(), std_dict[metrics.upper()])124                print(evaluation_mean)125                print(evaluation_std)126    def erase_folds(self):127        """128        Method to delete folds after evaluation129        """130        folds = self.dir_folds + 'folds/'131        shutil.rmtree(folds)132    def compute(self, verbose=True):133        """134        Method to run the cross validation135        :param verbose: If True, print header136        :type verbose: bool, default True137        """138        if verbose:139            print("[Case Recommender: Cross Validation]\n")140            print("Database:: %s \nRecommender Algorithm:: %s | K Folds: %d\n" % (self.input_file,141                                                                                  self.recommender.recommender_name,142                                                                                  self.k_folds))143        self.generate_folds()144        self.execute_algorithm()145        self.evaluate(verbose)146        if self.del_folds:...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
