Best Python code snippet using slash
evaluator.py
Source:evaluator.py  
...25        try:26            dictionary_ids = dictionaries if dictionaries else self.lexonomy_client.dictionaries()["dictionaries"]27        except HTTPError as error:28            report["available"] = False29            self._add_errors(report, [str(error)])30        #sys.stderr.write(f'Evaluating {len(dictionary_ids):d} dictionaries\n')31        return self.loop_dictionary_retrieval(dictionary_ids, limit, report)32    def loop_dictionary_retrieval(self, dictionary_ids, limit, report):33        if limit == -1:34            limit = len(dictionary_ids)35        count = 036        dicts = []37        for dictionary_id in dictionary_ids:38            try:39                if count < limit:40                    #sys.stderr.write(f"Loading Metadata of {dictionary_id} \n")41                    metadata = Metadata(self.lexonomy_client.about(dictionary_id))42                    dictionary = Dictionary(dictionary_id, metadata)43                    dicts.append(dictionary)44                count += 145            except HTTPError as error:46                self._add_errors(report, [str(error)])47                report["available"] = False48                #sys.stderr.write(f'Failed loading {dictionary_id} dictionary \n')49        return dicts, report50    def evaluate_metadata(self, dictionaries: [Dictionary]) -> dict:51        report = {}52        for dictionary in dictionaries:53            #sys.stderr.write(f'Evaluating {dictionary}')54            #sys.stderr.flush()55            metadata_report = {}56            metadata = dictionary.metadata57            if dictionary.metadata.errors:58                metadata_report['errors'] = dictionary.metadata.errors59            for metadata_evaluator in self.metadata_metrics_evaluators:60                #sys.stderr.write(str(metadata_evaluator))61                #sys.stderr.flush()62                metadata_evaluator.analyze(metadata)63                metadata_report.update(metadata_evaluator.result())64            report[dictionary.id] = {'metadata_report': metadata_report}65            #sys.stderr.write(str(metadata_report))66            #sys.stderr.flush()67        return report68    def evaluate_entries(self, dictionaries: [Dictionary], max_entries=None) -> dict:69        if max_entries is None:70            max_entries = 10071        report = {}72        for dictionary in dictionaries:73            entry_report = {}74            entry_counter = max_entries if max_entries is not None else dictionary.metadata.entry_count75            self._loop_entries_endpoint(dictionary, entry_report, entry_counter)76            #sys.stderr.write("\n")77            self._collect_entry_metrics(entry_report, self.entry_metrics_evaluators)78            report[dictionary.id] = {'entry_report': entry_report}79        return report80    @staticmethod81    def entry_evaluation_report_as_dataframe(report: dict):82        return pd.DataFrame.from_dict({i: report['dictionaries'][i]['entry_report']83                                       for i in report['dictionaries'].keys()},84                                      orient='index')85    @staticmethod86    def metadata_evaluation_report_as_dataframe(report: dict):87        return pd.DataFrame.from_dict({i: report['dictionaries'][i]['metadata_report']88                                       for i in report['dictionaries'].keys()},89                                      orient='index', dtype=object)90    @staticmethod91    def entry_report(dictionary_id, report: dict):92        return report[dictionary_id]['entry_report']93    def visualize(self, final_report):94        dataframe = self.entry_evaluation_report_as_dataframe(final_report).drop('errors', axis=1)95        dataframe = dataframe.apply(lambda x: x / x.max(), axis=0)96        dataframe['dict_type'] = 097        parallel_coordinates(dataframe, "dict_type", axvlines=True)98        plt.show()99    def aggregated_evaluation(self, report: dict):100        df = self.metadata_evaluation_report_as_dataframe(report)101        if Vocabulary.SIZE_OF_DICTIONARY in df:102            report[Vocabulary.AGGREGATION_METRICS] = {103                Vocabulary.DICTIONARY_SIZE: {104                    'min': float(df[Vocabulary.SIZE_OF_DICTIONARY].min()),105                    'max': float(df[Vocabulary.SIZE_OF_DICTIONARY].max()),106                    'mean': float(df[Vocabulary.SIZE_OF_DICTIONARY].mean()),107                    'median': float(df[Vocabulary.SIZE_OF_DICTIONARY].median())108                }109            }110        return report111    def _loop_entries_endpoint(self, dictionary, entry_report, max_entries, entries_limit=100):112        entries_offset = 0113        while entries_offset <= max_entries:114            try:115                entries = self.lexonomy_client.list(dictionary.id, limit=entries_limit, offset=entries_offset)116            except HTTPError as error:117                self._add_errors(entry_report, [str(error)])118            except JSONDecodeError as error:119                self._add_errors(entry_report, [str(error)])120            except RequestException as error:121                self._add_errors(entry_report, [str(error)])122            if not entries:123                break124            entries_offset = self._handle_entries(dictionary, entries, entry_report, max_entries, entries_offset)125            #sys.stderr.write(str(entries_offset) + '...')126            #sys.stderr.flush()127            if len(entries) < entries_limit:128                break129    @staticmethod130    def _collect_entry_metrics(entry_report, entry_metrics_evaluators: [EntryMetric]):131        for entry_metric in entry_metrics_evaluators:132            if entry_metric.result():133                #sys.stderr.write(str(entry_metric))134                #sys.stderr.write(str(entry_metric.result()))135                #sys.stderr.write("\n")136                #sys.stderr.flush()137                entry_report.update(entry_metric.result())138            entry_metric.reset()139    def _handle_entries(self, dictionary, entries, entry_report, max_entries, entries_offset):140        for entry in entries:141            entries_offset += 1142            if entries_offset > max_entries:143                break144            try:145                entry = Entry(entry)146                if entry.errors:147                    self._add_errors(entry_report, entry.errors)148                else:149                    self._entry_report(dictionary.id, entry_report, entry)150            except HTTPError:151                self._add_errors(entry_report, f'Failed to retrieve lemmas for dictionary {dictionary.id}')152            except ParseError as parse_error:153                self._add_errors(entry_report, [str(parse_error)])154            except JSONDecodeError as json_decode_error:155                self._add_errors(entry_report, [str(json_decode_error)])156            except RequestException as json_decode_error:157                self._add_errors(entry_report, [str(json_decode_error)])158        return entries_offset159    def evaluation_report(self, dictionary_report:dict, entry_report: dict, metadata_report: dict):160        for key in entry_report.keys():161            if key not in dictionary_report['dictionaries']:162                dictionary_report['dictionaries'][key] = {'entry_report': {}, 'metadata_report': {}}163            dictionary_report['dictionaries'][key]['entry_report'] = entry_report[key]['entry_report']164        for key in metadata_report.keys():165            if key not in dictionary_report['dictionaries']:166                dictionary_report['dictionaries'][key] = {'entry_report': {}, 'metadata_report': {}}167            dictionary_report['dictionaries'][key]['metadata_report'] = metadata_report[key]['metadata_report']168        return dictionary_report169    def _entry_report(self, dictionary_id: str, entry_report: dict, entry: Entry):170        retrieved_entry: JsonEntry = self._retrieve_entry(dictionary_id, entry, entry_report)171        if retrieved_entry is not None:172            if retrieved_entry.errors:173                self._add_errors(entry_report, retrieved_entry.errors)174            self._run_entry_metrics_evaluators(retrieved_entry, entry)175    def _retrieve_entry(self, dictionary_id, entry: Entry, entry_report: dict) -> JsonEntry:176        if "json" in entry.formats:177            try:178                return JsonEntry(self.lexonomy_client.json(dictionary_id, entry.id))179            except JSONDecodeError as jde:180                raise JSONDecodeError(f"Error parsing json response {entry.id}: {str(jde)}")181        elif "tei" in entry.formats:182            tei_entry = self.lexonomy_client.tei(dictionary_id, entry.id)183            try:184                tei_entry_element = validate_tei(tei_entry)185                return JsonEntry.from_tei_entry(tei_entry_element, entry.id)186            except ParseError as pe:187                raise ParseError(f"Error with entry {entry.id}: {str(pe)}")188        elif "ontolex" in entry.formats:189            ontolex_entry = self.lexonomy_client.ontolex(dictionary_id, entry.id)190            #try:191            ontolex_entry_element = validate_ontolex(ontolex_entry)192            return JsonEntry.from_ontolex_entry(ontolex_entry_element, entry.id)193            #except Exception as error:194            #    raise error195        else:196            self._add_errors(entry_report, ["Entry has no supported formats"])197            return None198    def _run_entry_metrics_evaluators(self, entry_details, entry_metadata):199        for entry_metric in self.entry_metrics_evaluators:200            entry_metric.accumulate(entry_details, entry_metadata)201    def _prepare_report(self, dictionary):202        if dictionary.id not in self.report['dictionaries']:203            self.report['dictionaries'][dictionary.id] = {'entry_report': {}, 'metadata_report': {}}204    def _add_entry_report(self, dictionary, entry_report):205        self.report['dictionaries'][dictionary.id]['entry_report'] = entry_report206    def _add_errors(self, entry_report, errors):207        if "errors" not in entry_report:208            entry_report["errors"] = []...test_evaluate.py
Source:test_evaluate.py  
1# nuScenes dev-kit.2# Code written by Holger Caesar, 2019.3import json4import os5import random6import shutil7import sys8import unittest9from typing import Dict, Optional, Any10import numpy as np11from tqdm import tqdm12from nuscenes import NuScenes13from nuscenes.eval.common.config import config_factory14from nuscenes.eval.tracking.constants import TRACKING_NAMES15from nuscenes.eval.tracking.evaluate import TrackingEval16from nuscenes.eval.tracking.utils import category_to_tracking_name17from nuscenes.utils.splits import create_splits_scenes18class TestMain(unittest.TestCase):19    res_mockup = 'nusc_eval.json'20    res_eval_folder = 'tmp'21    def tearDown(self):22        if os.path.exists(self.res_mockup):23            os.remove(self.res_mockup)24        if os.path.exists(self.res_eval_folder):25            shutil.rmtree(self.res_eval_folder)26    @staticmethod27    def _mock_submission(nusc: NuScenes,28                         split: str,29                         add_errors: bool = False) -> Dict[str, dict]:30        """31        Creates "reasonable" submission (results and metadata) by looping through the mini-val set, adding 1 GT32        prediction per sample. Predictions will be permuted randomly along all axes.33        :param nusc: NuScenes instance.34        :param split: Dataset split to use.35        :param add_errors: Whether to use GT or add errors to it.36        """37        def random_class(category_name: str, _add_errors: bool = False) -> Optional[str]:38            # Alter 10% of the valid labels.39            class_names = sorted(TRACKING_NAMES)40            tmp = category_to_tracking_name(category_name)41            if tmp is None:42                return None43            else:44                if not _add_errors or np.random.rand() < .9:45                    return tmp46                else:47                    return class_names[np.random.randint(0, len(class_names) - 1)]48        def random_id(instance_token: str, _add_errors: bool = False) -> str:49            # Alter 10% of the valid ids to be a random string, which hopefully corresponds to a new track.50            if not _add_errors or np.random.rand() < .9:51                _tracking_id = instance_token + '_pred'52            else:53                _tracking_id = str(np.random.randint(0, sys.maxsize))54            return _tracking_id55        mock_meta = {56            'use_camera': False,57            'use_lidar': True,58            'use_radar': False,59            'use_map': False,60            'use_external': False,61        }62        mock_results = {}63        # Get all samples in the current evaluation split.64        splits = create_splits_scenes()65        val_samples = []66        for sample in nusc.sample:67            if nusc.get('scene', sample['scene_token'])['name'] in splits[split]:68                val_samples.append(sample)69        # Prepare results.70        instance_to_score = dict()71        for sample in tqdm(val_samples, leave=False):72            sample_res = []73            for ann_token in sample['anns']:74                ann = nusc.get('sample_annotation', ann_token)75                translation = np.array(ann['translation'])76                size = np.array(ann['size'])77                rotation = np.array(ann['rotation'])78                velocity = nusc.box_velocity(ann_token)[:2]79                tracking_id = random_id(ann['instance_token'], _add_errors=add_errors)80                tracking_name = random_class(ann['category_name'], _add_errors=add_errors)81                # Skip annotations for classes not part of the detection challenge.82                if tracking_name is None:83                    continue84                # Skip annotations with 0 lidar/radar points.85                num_pts = ann['num_lidar_pts'] + ann['num_radar_pts']86                if num_pts == 0:87                    continue88                # If we randomly assign a score in [0, 1] to each box and later average over the boxes in the track,89                # the average score will be around 0.5 and we will have 0 predictions above that.90                # Therefore we assign the same scores to each box in a track.91                if ann['instance_token'] not in instance_to_score:92                    instance_to_score[ann['instance_token']] = random.random()93                tracking_score = instance_to_score[ann['instance_token']]94                tracking_score = np.clip(tracking_score + random.random() * 0.3, 0, 1)95                if add_errors:96                    translation += 4 * (np.random.rand(3) - 0.5)97                    size *= (np.random.rand(3) + 0.5)98                    rotation += (np.random.rand(4) - 0.5) * .199                    velocity *= np.random.rand(3)[:2] + 0.5100                sample_res.append({101                        'sample_token': sample['token'],102                        'translation': list(translation),103                        'size': list(size),104                        'rotation': list(rotation),105                        'velocity': list(velocity),106                        'tracking_id': tracking_id,107                        'tracking_name': tracking_name,108                        'tracking_score': tracking_score109                    })110            mock_results[sample['token']] = sample_res111        mock_submission = {112            'meta': mock_meta,113            'results': mock_results114        }115        return mock_submission116    @unittest.skip117    def basic_test(self,118                   eval_set: str = 'mini_val',119                   add_errors: bool = False,120                   render_curves: bool = False) -> Dict[str, Any]:121        """122        Run the evaluation with fixed randomness on the specified subset, with or without introducing errors in the123        submission.124        :param eval_set: Which split to evaluate on.125        :param add_errors: Whether to use GT as submission or introduce additional errors.126        :param render_curves: Whether to render stats curves to disk.127        :return: The metrics returned by the evaluation.128        """129        random.seed(42)130        np.random.seed(42)131        assert 'NUSCENES' in os.environ, 'Set NUSCENES env. variable to enable tests.'132        if eval_set.startswith('mini'):133            version = 'v1.0-mini'134        elif eval_set == 'test':135            version = 'v1.0-test'136        else:137            version = 'v1.0-trainval'138        nusc = NuScenes(version=version, dataroot=os.environ['NUSCENES'], verbose=False)139        with open(self.res_mockup, 'w') as f:140            mock = self._mock_submission(nusc, eval_set, add_errors=add_errors)141            json.dump(mock, f, indent=2)142        cfg = config_factory('tracking_nips_2019')143        nusc_eval = TrackingEval(cfg, self.res_mockup, eval_set=eval_set, output_dir=self.res_eval_folder,144                                 nusc_version=version, nusc_dataroot=os.environ['NUSCENES'], verbose=False)145        metrics = nusc_eval.main(render_curves=render_curves)146        return metrics147    @unittest.skip148    def test_delta_mock(self,149                        eval_set: str = 'mini_val',150                        render_curves: bool = False):151        """152        This tests runs the evaluation for an arbitrary random set of predictions.153        This score is then captured in this very test such that if we change the eval code,154        this test will trigger if the results changed.155        :param eval_set: Which set to evaluate on.156        :param render_curves: Whether to render stats curves to disk.157        """158        # Run the evaluation with errors.159        metrics = self.basic_test(eval_set, add_errors=True, render_curves=render_curves)160        # Compare metrics to known solution.161        if eval_set == 'mini_val':162            self.assertAlmostEqual(metrics['amota'], 0.23766771095785147)163            self.assertAlmostEqual(metrics['amotp'], 1.5275400961369252)164            self.assertAlmostEqual(metrics['motar'], 0.3726570200013319)165            self.assertAlmostEqual(metrics['mota'], 0.25003943918566174)166            self.assertAlmostEqual(metrics['motp'], 1.2976508610883917)167        else:168            print('Skipping checks due to choice of custom eval_set: %s' % eval_set)169    @unittest.skip170    def test_delta_gt(self,171                      eval_set: str = 'mini_val',172                      render_curves: bool = False):173        """174        This tests runs the evaluation with the ground truth used as predictions.175        This should result in a perfect score for every metric.176        This score is then captured in this very test such that if we change the eval code,177        this test will trigger if the results changed.178        :param eval_set: Which set to evaluate on.179        :param render_curves: Whether to render stats curves to disk.180        """181        # Run the evaluation without errors.182        metrics = self.basic_test(eval_set, add_errors=False, render_curves=render_curves)183        # Compare metrics to known solution. Do not check:184        # - MT/TP (hard to figure out here).185        # - AMOTA/AMOTP (unachieved recall values lead to hard unintuitive results).186        if eval_set == 'mini_val':187            self.assertAlmostEqual(metrics['amota'], 1.0)188            self.assertAlmostEqual(metrics['amotp'], 0.0, delta=1e-5)189            self.assertAlmostEqual(metrics['motar'], 1.0)190            self.assertAlmostEqual(metrics['recall'], 1.0)191            self.assertAlmostEqual(metrics['mota'], 1.0)192            self.assertAlmostEqual(metrics['motp'], 0.0, delta=1e-5)193            self.assertAlmostEqual(metrics['faf'], 0.0)194            self.assertAlmostEqual(metrics['ml'], 0.0)195            self.assertAlmostEqual(metrics['fp'], 0.0)196            self.assertAlmostEqual(metrics['fn'], 0.0)197            self.assertAlmostEqual(metrics['ids'], 0.0)198            self.assertAlmostEqual(metrics['frag'], 0.0)199            self.assertAlmostEqual(metrics['tid'], 0.0)200            self.assertAlmostEqual(metrics['lgd'], 0.0)201        else:202            print('Skipping checks due to choice of custom eval_set: %s' % eval_set)203if __name__ == '__main__':...interface.py
Source:interface.py  
...73        vars.update({'errors': self._errors, 'data': self._data, 'messages': self._messages,74                     'wallabag_host': self._cfg.wallabag_host,75                     'tags': [t.tag for t in wallabag.make_tags(self._cfg.tag)]})76        return vars77    def _add_errors(self, errors):78        self._errors.update(errors)79    def _set_data(self, data):80        self._data = data81    def _add_message(self, msg):82        self._messages.append(msg)83    @property84    def _session(self):85        return self.request.app['session_maker']86class IndexView(ViewBase):87    @aiohttp_jinja2.template("index.html")88    async def get(self):89        return self._template({})90    @aiohttp_jinja2.template("index.html")91    async def post(self):92        data = await self.request.post()93        self._set_data(data)94        validator = Validator(self.request.app.loop, data)95        await asyncio.gather(validator.validate_emails(),96                             validator.validate_credentials())97        self._add_errors(validator.errors)98        if validator.success:99            user = models.User(name=validator.username, kindle_mail=validator.kindle_email,100                               email=validator.notify_email)101            with self._session as session:102                if session.query(models.User.name).filter(models.User.name == validator.username).count() != 0:103                    self._add_errors({'user': "User is already registered"})104                elif not await self._wallabag.get_token(user, validator.password):105                    self._add_errors({'auth': 'Cannot authenticate at wallabag server to get a token'})106                else:107                    session.add(user)108                    session.commit()109                    self._add_message(f'User {validator.username} successfully registered')110                    self._set_data({})111                    logger.info("User {user} registered", user=validator.username)112        return self._template({})113class ReLoginView(ViewBase):114    @aiohttp_jinja2.template("relogin.html")115    async def get(self):116        return self._template({'action': 'update', 'description': 'Refresh'})117    @aiohttp_jinja2.template("relogin.html")118    async def post(self):119        data = await self.request.post()120        self._set_data(data)121        validator = Validator(self.request.app.loop, data)122        await validator.validate_credentials()123        self._add_errors(validator.errors)124        if validator.success:125            with self._session as session:126                user = session.query(models.User).filter(models.User.name == validator.username).first()127                if user is None:128                    self._add_errors({'user': 'User not registered'})129                else:130                    if await self._wallabag.get_token(user, validator.password):131                        user.active = True132                        session.commit()133                        self._add_message(f"User {validator.username} successfully updated.")134                        logger.info("User {user} successfully updated.", user=user)135                    else:136                        self._add_errors({'auth': "Authentication against wallabag server failed"})137        return self._template({'action': 'update', 'description': 'Refresh'})138class DeleteView(ViewBase):139    @aiohttp_jinja2.template("relogin.html")140    async def get(self):141        return self._template({'action': 'delete', 'description': 'Delete'})142    @aiohttp_jinja2.template("relogin.html")143    async def post(self):144        data = await self.request.post()145        self._set_data(data)146        validator = Validator(self.request.app.loop, data)147        await validator.validate_credentials()148        self._add_errors(validator.errors)149        if validator.success:150            with self._session as session:151                user = session.query(models.User).filter(models.User.name == validator.username).first()152                if user is None:153                    self._add_errors({'user': 'User not registered'})154                else:155                    if await self._wallabag.get_token(user, validator.password):156                        session.delete(user)157                        session.commit()158                        self._add_message(f"User {validator.username} successfully deleted.")159                        logger.info("User {user} successfully deleted.", user=user)160                    else:161                        self._add_errors({'auth': "Authentication against wallabag server failed"})162        return self._template({'action': 'delete', 'description': 'Delete'})163class App:164    def __init__(self, config, wallabag):165        self.config = config166        self.wallabag = wallabag167        self.app = web.Application()168        self.site = None  # type: web.TCPSite169        self.setup_app()170        self.setup_routes()171    def setup_app(self):172        self.app['config'] = self.config173        self.app['wallabag'] = self.wallabag174        self.app['session_maker'] = models.context_session(self.config)175        aiohttp_jinja2.setup(...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
