Best Python code snippet using yandex-tank
plugin.py
Source: plugin.py
...41        self._test_name = None42    @property43    def meta(self):44        if self._meta is None:45            self._meta = dict(self.get_lp_meta(), **self.cfg.get('meta', {}))46        return self._meta47    @property48    def test_name(self):49        if self._test_name is None:50            self._test_name = self.cfg.get('test_name') or self.core.info.get_value(['uploader', 'job_name'])51        return self._test_name52    def configure(self):53        pass54    def start_test(self):55        try:56            self.reader = self.core.job.generator_plugin.get_reader(parser=string_to_df_microsec)57        except TypeError:58            logger.error('Generator plugin does not support NeUploader')59            self.is_test_finished = lambda: -160            self.reader = []61    @thread_safe_property62    def col_map(self):63        return {64            'interval_real': self.data_session.new_true_metric,65            'connect_time': self.data_session.new_true_metric,66            'send_time': self.data_session.new_true_metric,67            'latency': self.data_session.new_true_metric,68            'receive_time': self.data_session.new_true_metric,69            'interval_event': self.data_session.new_true_metric,70            'net_code': self.data_session.new_event_metric,71            'proto_code': self.data_session.new_event_metric72        }73    @thread_safe_property74    def data_session(self):75        """76        :rtype: DataSession77        """78        if self._data_session is None:79            config_filenames = {'validated_conf.yaml', 'configinitial.yaml'}80            self._data_session = DataSession({'clients': self.clients_cfg},81                                             tankapi_info=self.tankapi_info(),82                                             config_filenames=config_filenames,83                                             artifacts_dir=self.core.artifacts_dir,84                                             
test_start=self.core.info.get_value(['generator', 'test_start'], 0) * 10**6)85            self.add_cleanup(self._cleanup)86            self._data_session.update_job(dict({'name': self.test_name,87                                                '__type': 'tank'},88                                               **self.meta))89            job_no = self._data_session.clients[0].job_number90            if job_no:91                self.publish('job_no', int(job_no))92                self.publish('web_link', urljoin(self.LUNA_LINK, job_no))93        return self._data_session94    def tankapi_info(self):95        meta = self.cfg.get('meta', {})96        return {97            'host': meta.get('tankapi_host'),98            'port': meta.get('tankapi_port'),99            'local_id': self.core.test_id100        }101    def _cleanup(self):102        self.upload_actual_rps(data=pandas.DataFrame([]), last_piece=True)103        uploader_metainfo = self.get_lp_meta()104        autostop_info = self.get_autostop_info()105        regressions = self.get_regressions_names(uploader_metainfo)106        lp_link = self.core.info.get_value(['uploader', 'web_link'])107        meta = self.meta108        meta.update(autostop_info)109        meta['regression'] = regressions110        meta['lunapark_link'] = lp_link111        self.data_session.update_job(meta)112        self.data_session.close(test_end=self.core.info.get_value(['generator', 'test_end'], 0) * 10**6)113    def is_test_finished(self):114        df = next(self.reader)115        if df is not None:116            self.upload(df)117        return -1118    def monitoring_data(self, data_list):119        self.upload_monitoring(data_list)120    def post_process(self, retcode):121        try:122            self.rps_uploader.start()123            for chunk in self.reader:124                if chunk is not None:125                    self.upload(chunk)126            self.upload_actual_rps(data=pandas.DataFrame([]), last_piece=True)127           
 if self.rps_uploader.is_alive():128                self.rps_uploader.join()129        except KeyboardInterrupt:130            logger.warning('Caught KeyboardInterrupt on Neuploader')131            self._cleanup()132        return retcode133    @property134    def is_telegraf(self):135        return True136    def get_metric_obj(self, col, case):137        """138        Generator of metric objects:139        Checks existent metrics and creates new metric if it does not exist.140        :param col:  str with column name141        :param case: str with case name142        :return: metric object143        """144        case_metrics = self.metrics_objs.get(case)145        if case_metrics is None:146            for col, constructor in self.col_map.items():147                self.metrics_objs.setdefault(case, {})[col] = constructor(148                    dict(self.meta,149                         name=col,150                         source='tank',151                         importance='high' if col in self.importance_high else ''),152                    raw=False, aggregate=True,153                    parent=self.get_metric_obj(col, self.OVERALL) if case != self.OVERALL else None,154                    case=case if case != self.OVERALL else None155                )156        return self.metrics_objs[case][col]157    def upload(self, df):158        self.upload_actual_rps(df)159        df_cases_set = set()160        for row in df.itertuples():161            if row.tag and isinstance(row.tag, str):162                df_cases_set.add(row.tag)163                if '|' in row.tag:164                    for tag in row.tag.split('|'):165                        df_cases_set.add(tag)166        for column in self.col_map:167            overall_metric_obj = self.get_metric_obj(column, self.OVERALL)168            df['value'] = df[column]169            result_df = self.filter_df_by_case(df, self.OVERALL)170            overall_metric_obj.put(result_df)171            for case_name in 
df_cases_set:172                case_metric_obj = self.get_metric_obj(column, case_name)173                df['value'] = df[column]174                result_df = self.filter_df_by_case(df, case_name)175                case_metric_obj.put(result_df)176    def upload_monitoring(self, data):177        for metric_name, df in self.monitoring_data_to_dfs(data).items():178            if metric_name not in self.monitoring_metrics:179                panel, metric = metric_name.split(':', 1)180                try:181                    group, name = metric.split('_', 1)182                except ValueError:183                    name = metric184                    group = '_OTHER_'185                self.monitoring_metrics[metric_name] =\186                    self.data_session.new_true_metric(187                        meta=dict(self.meta,188                                  name=name,189                                  group=group,190                                  host=panel,191                                  type='monitoring'))192            self.monitoring_metrics[metric_name].put(df)193    def upload_planned_rps(self):194        """ Uploads planned rps as a raw metric """195        df = self.parse_stpd()196        if not df.empty:197            self.rps_metrics['planned_rps_metrics_obj'] = self.data_session.new_true_metric(198                meta=dict(self.meta, name=self.PLANNED_RPS_METRICS_NAME, source='tank'),199                raw=True, aggregate=False, parent=None, case=None)200            self.rps_metrics['planned_rps_metrics_obj'].put(df)201    def upload_actual_rps(self, data, last_piece=False):202        """ Upload actual rps metric """203        if self.rps_metrics['actual_rps_metrics_obj'] is None:204            self.rps_metrics['actual_rps_metrics_obj'] = self.data_session.new_true_metric(205                meta=dict(self.meta, name=self.ACTUAL_RPS_METRICS_NAME),206                raw=True, aggregate=False, parent=None, case=None207            )208      
  df = self.count_actual_rps(data, last_piece)209        if not df.empty:210            self.rps_metrics['actual_rps_metrics_obj'].put(df)211    def parse_stpd(self):212        """  Reads rps plan from stpd file """213        stpd_file = self.core.info.get_value(['stepper', 'stpd_file'])214        if not stpd_file:215            logger.info('No stpd found, no planned_rps metrics')216            return pandas.DataFrame()217        rows_list = []218        test_start = int(self.core.info.get_value(['generator', 'test_start'], 0) * 10 ** 3)219        pattern = r'^\d+ (\d+)\s*.*$'220        regex = re.compile(pattern)221        try:222            with open(stpd_file) as stpd:223                for line in stpd:224                    if regex.match(line):225                        timestamp = int((int(line.split(' ')[1]) + test_start) / 1e3)  # seconds226                        rows_list.append(timestamp)227        except Exception:228            logger.warning('Failed to parse stpd file')229            logger.debug('', exc_info=True)230            return pandas.DataFrame()231        return self.rps_series_to_df(pandas.Series(rows_list))232    def count_actual_rps(self, data, last_piece):233        """ Counts actual rps on base of input chunk. Uses buffer for latest timestamp in df. 
"""234        if not last_piece and not data.empty:235            concat_ts = pandas.concat([(data.ts / 1e6).astype(int), self.rps_metrics['actual_rps_latest']])236            self.rps_metrics['actual_rps_latest'] = concat_ts.loc[lambda s: s == concat_ts.max()]237            series_to_send = concat_ts.loc[lambda s: s < concat_ts.max()]238            df = self.rps_series_to_df(series_to_send) if series_to_send.any else pandas.DataFrame([])239        else:240            df = self.rps_series_to_df(self.rps_metrics['actual_rps_latest'])241            self.rps_metrics['actual_rps_latest'] = pandas.Series()242        return df243    @staticmethod244    def monitoring_data_to_dfs(data):245        panels = {}246        for chunk in data:247            for panel_name, content in chunk['data'].items():248                if panel_name in panels:249                    for metric_name, value in content['metrics'].items():250                        if metric_name in panels[panel_name]:251                            panels[panel_name][metric_name]['value'].append(value)252                            panels[panel_name][metric_name]['ts'].append(chunk['timestamp'])253                        else:254                            panels[panel_name][metric_name] = {'value': [value], 'ts': [chunk['timestamp']]}255                else:256                    panels[panel_name] = {name: {'value': [value], 'ts': [chunk['timestamp']]} for name, value in content['metrics'].items()}257        return {'{}:{}'.format(panelk, name): pandas.DataFrame({'ts': [ts * 1000000 for ts in values['ts']], 'value': values['value']})258                for panelk, panelv in panels.items() for name, values in panelv.items()}259    @staticmethod260    def rps_series_to_df(series):261        df = series.value_counts().to_frame(name='value')262        df_to_send = df.rename_axis('ts')263        df_to_send.reset_index(inplace=True)264        df_to_send.loc[:, 'ts'] = (df_to_send['ts'] * 1e6).astype(int)265        
return df_to_send266    @staticmethod267    def filter_df_by_case(df, case):268        """269        Filter dataframe by case name. If case is '__overall__', return all rows.270        :param df: DataFrame271        :param case: str with case name272        :return: DataFrame with columns 'ts' and 'value'273        """274        case = case.strip()275        return df[['ts', 'value']] if case == Plugin.OVERALL else df[df.tag.str.strip() == case][['ts', 'value']]276    def get_lp_meta(self):277        uploader_meta = self.core.info.get_value(['uploader'])278        if not uploader_meta:279            logger.info('No uploader metainfo found')280            return {}281        else:282            meta_tags_names = ['component', 'description', 'name', 'person', 'task', 'version', 'lunapark_jobno']283            meta_tags = {key: uploader_meta.get(key) for key in meta_tags_names if key in uploader_meta}284            meta_tags.update({k: v if v is not None else '' for k, v in uploader_meta.get('meta', {}).items()})285            return meta_tags286    @staticmethod287    def get_regressions_names(uploader_metainfo):288        task, component_name = uploader_metainfo.get('task'), uploader_metainfo.get('component')289        if not task or not component_name:290            return []...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!
