Best Python code snippet using yandex-tank
plugin.py
Source:plugin.py  
...147                    "Unable to check task staus, id: %s, error code: %s" %148                    (self.task, ex.response.status_code))149            raise ex150    @staticmethod151    def search_task_from_cwd(cwd):152        issue = re.compile("^([A-Za-z]+-[0-9]+)(-.*)?")153        while cwd:154            logger.debug("Checking if dir is named like JIRA issue: %s", cwd)155            if issue.match(os.path.basename(cwd)):156                res = re.search(issue, os.path.basename(cwd))157                return res.group(1).upper()158            newdir = os.path.abspath(os.path.join(cwd, os.path.pardir))159            if newdir == cwd:160                break161            else:162                cwd = newdir163        raise RuntimeError(164            "task=dir requested, but no JIRA issue name in cwd: %s" %165            os.getcwd())166    def prepare_test(self):167        info = self.generator_info168        port = info.port169        instances = info.instances170        if info.ammo_file.startswith(171                "http://") or info.ammo_file.startswith("https://"):172            ammo_path = info.ammo_file173        else:174            ammo_path = os.path.realpath(info.ammo_file)175        duration = int(info.duration)176        if duration:177            self.lock_target_duration = duration178        loop_count = info.loop_count179        lp_job = self.lp_job180        self.locked_targets = self.check_and_lock_targets(strict=bool(181            int(self.get_option('strict_lock', '0'))), ignore=self.ignore_target_lock)182        try:183            if lp_job._number:184                self.make_symlink(lp_job._number)185                self.check_task_is_open()186            else:187                self.check_task_is_open()188                lp_job.create()189                self.make_symlink(lp_job.number)190            self.core.publish(self.SECTION, 'jobno', lp_job.number)191        except (APIClient.JobNotCreated, APIClient.NotAvailable, APIClient.NetworkError) as e:192            logger.error(e.message)193            logger.error(194                'Failed to connect to Lunapark, disabling DataUploader')195            self.start_test = lambda *a, **kw: None196            self.post_process = lambda *a, **kw: None197            self.on_aggregated_data = lambda *a, **kw: None198            self.monitoring_data = lambda *a, **kw: None199            return200        cmdline = ' '.join(sys.argv)201        lp_job.edit_metainfo(202            instances=instances,203            ammo_path=ammo_path,204            loop_count=loop_count,205            is_regression=self.get_option(206                'regress',207                '0'),208            regression_component=self.regression_component,209            cmdline=cmdline,210        )  # todo: tanktype?211        self.core.job.subscribe_plugin(self)212        try:213            console = self.core.get_plugin_of_type(ConsolePlugin)214        except KeyError:215            logger.debug("Console plugin not found", exc_info=True)216            console = None217        if console:218            console.add_info_widget(JobInfoWidget(self))219        self.set_option('target_host', self.target)220        self.set_option('target_port', port)221        self.set_option('cmdline', cmdline)222        self.set_option('ammo_path', ammo_path)223        self.set_option('loop_count', loop_count)224        self.__save_conf()225    def start_test(self):226        self.on_air = True227        status_sender = threading.Thread(target=self.__send_status)228        status_sender.daemon = True229        status_sender.start()230        self.status_sender = status_sender231        upload = threading.Thread(target=self.__data_uploader)232        upload.daemon = True233        upload.start()234        self.upload = upload235        monitoring = threading.Thread(target=self.__monitoring_uploader)236        monitoring.daemon = True237        monitoring.start()238        self.monitoring = monitoring239        web_link = urljoin(self.lp_job.api_client.base_url, str(self.lp_job.number))240        logger.info("Web link: %s", web_link)241        self.publish("jobno", self.lp_job.number)242        self.publish("web_link", web_link)243        self.set_option("jobno", str(self.lp_job.number))244        if self.jobno_file:245            logger.debug("Saving jobno to: %s", self.jobno_file)246            fdes = open(self.jobno_file, 'w')247            fdes.write(str(self.lp_job.number))248            fdes.close()249        self.__save_conf()250    def is_test_finished(self):251        return self.retcode252    def end_test(self, retcode):253        self.on_air = False254        self.monitoring_queue.put(None)255        self.data_queue.put(None)256        self.__save_conf()257        timeout = int(self.get_option('threads_timeout', '60'))258        logger.info(259            'Waiting for sender threads to join for {} seconds ("meta.threads_timeout" config option)'.format(timeout))260        self.monitoring.join(timeout=timeout)261        if self.monitoring.isAlive():262            logger.error('Monitoring thread joining timed out. Terminating.')263        self.upload.join(timeout=timeout)264        if self.upload.isAlive():265            logger.error('Upload thread joining timed out. Terminating.')266        self.unlock_targets(self.locked_targets)267        return retcode268    def post_process(self, rc):269        try:270            self.lp_job.close(rc)271        except Exception:  # pylint: disable=W0703272            logger.warning("Failed to close job", exc_info=True)273        logger.info(274            "Web link: %s%s",275            self.lp_job.api_client.base_url,276            self.lp_job.number)277        autostop = None278        try:279            autostop = self.core.get_plugin_of_type(AutostopPlugin)280        except KeyError:281            logger.debug("No autostop plugin loaded", exc_info=True)282        if autostop and autostop.cause_criterion:283            rps = 0284            if autostop.cause_criterion.cause_second:285                rps = autostop.cause_criterion.cause_second[286                    1]["metrics"]["reqps"]287                if not rps:288                    rps = autostop.cause_criterion.cause_second[0][289                        "overall"]["interval_real"]["len"]290            self.lp_job.set_imbalance_and_dsc(291                int(rps), autostop.cause_criterion.explain())292        else:293            logger.debug("No autostop cause detected")294        self.__save_conf()295        return rc296    def on_aggregated_data(self, data, stats):297        """298        @data: aggregated data299        @stats: stats about gun300        """301        if self.lp_job.is_alive:302            self.data_queue.put((data, stats))303    def monitoring_data(self, data_list):304        if self.lp_job.is_alive:305            if len(data_list) > 0:306                if self.is_telegraf:307                    # telegraf308                    self.monitoring_queue.put(data_list)309                else:310                    # monitoring311                    [self.monitoring_queue.put(data) for data in data_list]312    @property313    def is_telegraf(self):314        if self._is_telegraf is None:315            self._is_telegraf = 'Telegraf' in self.core.job.monitoring_plugin.__module__316        return self._is_telegraf317    def _core_with_tank_api(self):318        """319        Return True if we are running under Tank API320        """321        api_found = False322        try:323            import yandex_tank_api.worker  # pylint: disable=F0401324        except ImportError:325            logger.debug("Attempt to import yandex_tank_api.worker failed")326        else:327            api_found = isinstance(self.core, yandex_tank_api.worker.TankCore)328        logger.debug("We are%s running under API server", '' if api_found else ' likely not')329        return api_found330    def __send_status(self):331        logger.info('Status sender thread started')332        lp_job = self.lp_job333        while lp_job.is_alive and self.on_air:334            try:335                self.lp_job.send_status(self.core.status)336                time.sleep(self.send_status_period)337            except (APIClient.NetworkError, APIClient.NotAvailable) as e:338                logger.warn('Failed to send status')339                logger.debug(e.message)340                break341            except APIClient.StoppedFromOnline:342                logger.info("Test stopped from Lunapark")343                lp_job.is_alive = False344                self.retcode = 8345                break346        logger.info("Closing Status sender thread")347    def __data_uploader(self):348        logger.info('Data uploader thread started')349        lp_job = self.lp_job350        queue = self.data_queue351        while lp_job.is_alive:352            try:353                entry = queue.get(timeout=1)354                if entry is not None:355                    data, stats = entry356                else:357                    logger.info("Data uploader queue returned None")358                    break359                lp_job.push_test_data(data, stats)360            except Empty:361                continue362            except APIClient.StoppedFromOnline:363                logger.info("Test stopped from Lunapark")364                lp_job.is_alive = False365                self.retcode = 8366                break367            except Exception as e:368                logger.info("Mysterious exception: %s", e)369                self.retcode = 8370                break371        logger.info("Closing Data uploader thread")372    def __monitoring_uploader(self):373        logger.info('Monitoring uploader thread started')374        lp_job = self.lp_job375        queue = self.monitoring_queue376        while lp_job.is_alive:377            try:378                data = queue.get(timeout=1)379                if data is not None:380                    lp_job.push_monitoring_data(data)381                else:382                    logger.info('Monitoring queue returned None')383                    break384            except Empty:385                continue386            except (APIClient.NetworkError, APIClient.NotAvailable, APIClient.UnderMaintenance) as e:387                logger.warn('Failed to push monitoring data')388                logger.warn(e.message)389                break390            except APIClient.StoppedFromOnline:391                logger.info("Test stopped from Lunapark")392                lp_job.is_alive = False393                self.retcode = 8394                break395        logger.info('Closing Monitoring uploader thread')396    def __save_conf(self):397        config_copy = self.get_option('copy_config_to', '')398        if config_copy:399            self.core.config.flush(config_copy)400        config = copy.copy(self.core.config.config)401        try:402            config_filename = self.core.job.monitoring_plugin.config403            if config_filename and config_filename not in ['none', 'auto']:404                with open(config_filename) as config_file:405                    config.set(406                        self.core.job.monitoring_plugin.SECTION,407                        "config_contents",408                        config_file.read())409        except Exception:  # pylint: disable=W0703410            logger.warning("Can't get monitoring config", exc_info=True)411        output = StringIO()412        config.write(output)413        self.lp_job.send_config_snapshot(output.getvalue())414        with open(os.path.join(self.core.artifacts_dir, 'saved_conf.ini'), 'w') as f:415            config.write(f)416    def parse_lock_targets(self):417        # prepare target lock list418        locks_list_cfg = self.get_option('lock_targets', 'auto').strip()419        def no_target():420            logging.warn("Target lock set to 'auto', but no target info available")421            return ''422        locks_list = (self.target or no_target() if locks_list_cfg.lower() == 'auto' else locks_list_cfg).split('\n')423        targets_to_lock = [host for host in locks_list if host]424        return targets_to_lock425    def lock_targets(self, targets_to_lock, ignore, strict):426        locked_targets = [target for target in targets_to_lock427                          if self.lp_job.lock_target(target, self.lock_target_duration, ignore, strict)]428        return locked_targets429    def unlock_targets(self, locked_targets):430        logger.info("Unlocking targets: %s", locked_targets)431        for target in locked_targets:432            logger.info(target)433            self.lp_job.api_client.unlock_target(target)434    def check_and_lock_targets(self, strict, ignore):435        targets_list = self.parse_lock_targets()436        logger.info('Locking targets: %s', targets_list)437        locked_targets = self.lock_targets(targets_list, ignore=ignore, strict=strict)438        logger.info('Locked targets: %s', locked_targets)439        return locked_targets440    def make_symlink(self, name):441        PLUGIN_DIR = os.path.join(self.core.artifacts_base_dir, 'lunapark')442        if not os.path.exists(PLUGIN_DIR):443            os.makedirs(PLUGIN_DIR)444        os.symlink(445            os.path.relpath(446                self.core.artifacts_dir,447                PLUGIN_DIR),448            os.path.join(449                PLUGIN_DIR,450                str(name)))451    def _get_user_agent(self):452        plugin_agent = 'Uploader/{}'.format(self.VERSION)453        return ' '.join((plugin_agent,454                         self.core.get_user_agent()))455    def __get_operator(self):456        try:457            return self.get_option(458                'operator',459                '') or pwd.getpwuid(460                os.geteuid())[0]461        except:462            logger.error(463                "Couldn't get username from the OS. Please, set the 'meta.operator' option explicitly in your config file.")464            raise465    def __get_api_client(self):466        if self.backend_type == BackendTypes.LUNAPARK:467            client = APIClient468            self._api_token = None469        elif self.backend_type == BackendTypes.OVERLOAD:470            client = OverloadClient471            self._api_token = self.read_token(self.get_option("token_file", ""))472        else:473            raise RuntimeError("Backend type doesn't match any of the expected")474        return client(base_url=self.get_option('api_address'),475                      writer_url=self.get_option('writer_endpoint', ""),476                      network_attempts=int(self.get_option('network_attempts', 60)),477                      api_attempts=int(self.get_option('api_attempts', 60)),478                      maintenance_attempts=int(self.get_option('maintenance_attempts', 10)),479                      network_timeout=int(self.get_option('network_timeout', 10)),480                      api_timeout=int(self.get_option('api_timeout', 10)),481                      maintenance_timeout=int(self.get_option('maintenance_timeout', 60)),482                      connection_timeout=int(self.get_option('connection_timeout', 30)),483                      user_agent=self._get_user_agent(),484                      api_token=self.api_token)485    @property486    def lp_job(self):487        if self._lp_job is None:488            self._lp_job = self.__get_lp_job()489        return self._lp_job490    def __get_lp_job(self):491        api_client = self.__get_api_client()492        info = self.generator_info493        port = info.port494        loadscheme = [] if isinstance(info.rps_schedule,495                                      str) else info.rps_schedule496        return LPJob(client=api_client,497                     target_host=self.target,498                     target_port=port,499                     number=self.get_option('jobno', ''),500                     token=self.get_option('upload_token', ''),501                     person=self.__get_operator(),502                     task=self.task,503                     name=self.get_option('job_name', 'none').decode('utf8'),504                     description=self.get_option('job_dsc', '').decode('utf8'),505                     tank=self.core.job.tank,506                     notify_list=self.get_option("notify", '').split(' '),507                     load_scheme=loadscheme,508                     version=self.get_option('ver', ''),509                     log_data_requests=bool(int(self.get_option('log_data_requests', '0'))),510                     log_monitoring_requests=bool(int(self.get_option('log_monitoring_requests', '0'))),511                     log_status_requests=bool(int(self.get_option('log_status_requests', '0'))),512                     log_other_requests=bool(int(self.get_option('log_other_requests', '0'))))513    @property514    def task(self):515        if self._task is None:516            task = self.get_option('task', '')517            if task == 'dir':518                task = self.search_task_from_cwd(os.getcwd())519            self._task = task520        return self._task521    @property522    def api_token(self):523        if self._api_token == '':524            if self.backend_type == BackendTypes.LUNAPARK:525                self._api_token = None526            elif self.backend_type == BackendTypes.OVERLOAD:527                self._api_token = self.read_token(self.get_option("token_file", ""))528            else:529                raise RuntimeError("Backend type doesn't match any of the expected")530        return self._api_token531    @staticmethod532    def read_token(filename):...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
