Best Python code snippet using yandex-tank
tankcore.py
Source:tankcore.py  
...108        self.lock_dir = self.get_option(self.SECTION, 'lock_dir')109        with open(os.path.join(self.artifacts_dir, CONFIGINITIAL), 'w') as f:110            yaml.dump(self.configinitial, f)111        self.add_artifact_file(error_output)112        self.add_artifact_to_send(LPRequisites.CONFIGINITIAL, yaml.dump(self.configinitial))113        configinfo = self.config.validated.copy()114        configinfo.setdefault(self.SECTION, {})115        configinfo[self.SECTION][self.API_JOBNO] = self.test_id116        self.add_artifact_to_send(LPRequisites.CONFIGINFO, yaml.dump(configinfo))117        with open(os.path.join(self.artifacts_dir, VALIDATED_CONF), 'w') as f:118            yaml.dump(configinfo, f)119        logger.info('New test id %s' % self.test_id)120    @property121    def cfg_snapshot(self):122        if not self._cfg_snapshot:123            self._cfg_snapshot = str(self.config)124        return self._cfg_snapshot125    @staticmethod126    def get_available_options():127        # todo: should take this from schema128        return [129            "artifacts_base_dir", "artifacts_dir",130            "taskset_path", "affinity"131        ]132    @property133    def plugins(self):134        """135        :returns: {plugin_name: plugin_class, ...}136        :rtype: dict137        """138        if self._plugins is None:139            self.load_plugins()140            if self._plugins is None:141                self._plugins = {}142        return self._plugins143    @property144    def artifacts_base_dir(self):145        if not self._artifacts_base_dir:146            try:147                artifacts_base_dir = os.path.abspath(self.get_option(self.SECTION, "artifacts_base_dir"))148            except ValidationError:149                artifacts_base_dir = os.path.abspath('logs')150            if not os.path.exists(artifacts_base_dir):151                os.makedirs(artifacts_base_dir)152                os.chmod(self.artifacts_base_dir, 0o755)153            self._artifacts_base_dir = artifacts_base_dir154        return self._artifacts_base_dir155    def load_plugins(self):156        """157        Tells core to take plugin options and instantiate plugin classes158        """159        logger.info("Loading plugins...")160        for (plugin_name, plugin_path, plugin_cfg) in self.config.plugins:161            logger.debug("Loading plugin %s from %s", plugin_name, plugin_path)162            if plugin_path == "yandextank.plugins.Overload":163                logger.warning(164                    "Deprecated plugin name: 'yandextank.plugins.Overload'\n"165                    "There is a new generic plugin now.\n"166                    "Correcting to 'yandextank.plugins.DataUploader overload'")167                plugin_path = "yandextank.plugins.DataUploader overload"168            try:169                logger.info("Trying to import module: {}".format(plugin_name))170                logger.info("Path: {}".format(plugin_path))171                plugin = il.import_module(plugin_path)172            except ImportError:173                logger.warning('Plugin name %s path %s import error', plugin_name, plugin_path)174                logger.debug('Plugin name %s path %s import error', plugin_name, plugin_path, exc_info=True)175                raise176            try:177                instance = getattr(plugin, 'Plugin')(self, cfg=plugin_cfg, name=plugin_name)178            except AttributeError:179                logger.warning('Plugin %s classname should be `Plugin`', plugin_name)180                raise181            else:182                self.register_plugin(self.PLUGIN_PREFIX + plugin_name, instance)183        logger.debug("Plugin instances: %s", self._plugins)184    @property185    def job(self):186        if not self._job:187            # monitoring plugin188            monitorings = [plugin for plugin in self.plugins.values() if isinstance(plugin, MonitoringPlugin)]189            # generator plugin190            try:191                gen = self.get_plugin_of_type(GeneratorPlugin)192            except KeyError:193                logger.warning("Load generator not found")194                gen = GeneratorPlugin(self, {}, 'generator dummy')195            # aggregator196            aggregator = TankAggregator(gen)197            self._job = Job(monitoring_plugins=monitorings,198                            generator_plugin=gen,199                            aggregator=aggregator,200                            tank=socket.getfqdn())201        return self._job202    def plugins_configure(self):203        """        Call configure() on all plugins        """204        self.publish("core", "stage", "configure")205        logger.info("Configuring plugins...")206        self.taskset_affinity = self.get_option(self.SECTION, 'affinity')207        if self.taskset_affinity:208            self.__setup_taskset(self.taskset_affinity, pid=os.getpid())209        for plugin in self.plugins.values():210            if not self.interrupted.is_set():211                logger.debug("Configuring %s", plugin)212                plugin.configure()213                if isinstance(plugin, MonitoringDataListener):214                    self.monitoring_data_listeners.append(plugin)215    def plugins_prepare_test(self):216        """ Call prepare_test() on all plugins        """217        logger.info("Preparing test...")218        self.publish("core", "stage", "prepare")219        for plugin in self.plugins.values():220            if not self.interrupted.is_set():221                logger.debug("Preparing %s", plugin)222                plugin.prepare_test()223    def plugins_start_test(self):224        """        Call start_test() on all plugins        """225        if not self.interrupted.is_set():226            logger.info("Starting test...")227            self.publish("core", "stage", "start")228            self.job.aggregator.start_test()229            for plugin_name, plugin in self.plugins.items():230                logger.debug("Starting %s", plugin)231                start_time = time.time()232                plugin.start_test()233                logger.info("Plugin {0:s} required {1:f} seconds to start".format(plugin_name,234                                                                                  time.time() - start_time))235            self.publish('generator', 'test_start', self.job.generator_plugin.start_time)236    def wait_for_finish(self):237        """238        Call is_test_finished() on all plugins 'till one of them initiates exit239        """240        if not self.interrupted.is_set():241            logger.info("Waiting for test to finish...")242            logger.info('Artifacts dir: {dir}'.format(dir=self.artifacts_dir))243            self.publish("core", "stage", "shoot")244            if not self.plugins:245                raise RuntimeError("It's strange: we have no plugins loaded...")246        while not self.interrupted.is_set():247            begin_time = time.time()248            aggr_retcode = self.job.aggregator.is_test_finished()249            if aggr_retcode >= 0:250                return aggr_retcode251            for plugin_name, plugin in self.plugins.items():252                logger.debug("Polling %s", plugin)253                try:254                    retcode = plugin.is_test_finished()255                    if retcode >= 0:256                        return retcode257                except Exception:258                    logger.warning('Plugin {} failed:'.format(plugin_name), exc_info=True)259                    if isinstance(plugin, GeneratorPlugin):260                        return RetCode.ERROR261                    else:262                        logger.warning('Disabling plugin {}'.format(plugin_name))263                        plugin.is_test_finished = lambda: RetCode.CONTINUE264            end_time = time.time()265            diff = end_time - begin_time266            logger.debug("Polling took %s", diff)267            logger.debug("Tank status: %s", json.dumps(self.info.get_info_dict()))268            # screen refresh every 0.5 s269            if diff < 0.5:270                time.sleep(0.5 - diff)271        return 1272    def plugins_end_test(self, retcode):273        """        Call end_test() on all plugins        """274        logger.info("Finishing test...")275        self.publish("core", "stage", "end")276        self.publish('generator', 'test_end', time.time())277        logger.info("Stopping load generator and aggregator")278        retcode = self.job.aggregator.end_test(retcode)279        logger.debug("RC after: %s", retcode)280        logger.info('Stopping monitoring')281        for plugin in self.job.monitoring_plugins:282            logger.info('Stopping %s', plugin)283            retcode = plugin.end_test(retcode) or retcode284            logger.info('RC after: %s', retcode)285        for plugin in [p for p in self.plugins.values() if286                       p is not self.job.generator_plugin and p not in self.job.monitoring_plugins]:287            logger.debug("Finalize %s", plugin)288            try:289                logger.debug("RC before: %s", retcode)290                retcode = plugin.end_test(retcode)291                logger.debug("RC after: %s", retcode)292            except Exception:  # FIXME too broad exception clause293                logger.error("Failed finishing plugin %s", plugin, exc_info=True)294                if not retcode:295                    retcode = 1296        return retcode297    def plugins_post_process(self, retcode):298        """299        Call post_process() on all plugins300        """301        logger.info("Post-processing test...")302        self.publish("core", "stage", "post_process")303        for plugin in self.plugins.values():304            logger.debug("Post-process %s", plugin)305            try:306                logger.debug("RC before: %s", retcode)307                retcode = plugin.post_process(retcode)308                logger.debug("RC after: %s", retcode)309            except Exception:  # FIXME too broad exception clause310                logger.error("Failed post-processing plugin %s", plugin, exc_info=True)311                if not retcode:312                    retcode = 1313        return retcode314    def publish_monitoring_data(self, data):315        """sends pending data set to listeners"""316        for plugin in self.monitoring_data_listeners:317            # deep copy to ensure each listener gets it's own copy318            try:319                plugin.monitoring_data(copy.deepcopy(data))320            except Exception:321                logger.error("Plugin failed to process monitoring data", exc_info=True)322    def __setup_taskset(self, affinity, pid=None, args=None):323        """ if pid specified: set process w/ pid `pid` CPU affinity to specified `affinity` core(s)324            if args specified: modify list of args for Popen to start w/ taskset w/ affinity `affinity`325        """326        self.taskset_path = self.get_option(self.SECTION, 'taskset_path')327        if args:328            return [self.taskset_path, '-c', affinity] + args329        if pid:330            args = "%s -pc %s %s" % (self.taskset_path, affinity, pid)331            retcode, stdout, stderr = execute(args, shell=True, poll_period=0.1, catch_out=True)332            logger.debug('taskset for pid %s stdout: %s', pid, stdout)333            if retcode == 0:334                logger.info("Enabled taskset for pid %s with affinity %s", str(pid), affinity)335            else:336                logger.debug('Taskset setup failed w/ retcode :%s', retcode)337                raise KeyError(stderr)338    def _collect_artifacts(self, validation_failed=False):339        logger.debug("Collecting artifacts")340        logger.info("Artifacts dir: %s", self.artifacts_dir)341        for filename, keep in self.artifact_files.items():342            try:343                self.__collect_file(filename, keep)344            except Exception as ex:345                logger.warn("Failed to collect file %s: %s", filename, ex)346    def get_option(self, section, option, default=None):347        return self.config.get_option(section, option, default)348    def set_option(self, section, option, value):349        """350        Set an option in storage351        """352        raise NotImplementedError353    def set_exitcode(self, code):354        self.output['core']['exitcode'] = code355    def get_plugin_of_type(self, plugin_class):356        """357        Retrieve a plugin of desired class, KeyError raised otherwise358        """359        logger.debug("Searching for plugin: %s", plugin_class)360        matches = [plugin for plugin in self.plugins.values() if isinstance(plugin, plugin_class)]361        if matches:362            if len(matches) > 1:363                logger.debug(364                    "More then one plugin of type %s found. Using first one.",365                    plugin_class)366            return matches[-1]367        else:368            raise KeyError("Requested plugin type not found: %s" % plugin_class)369    def get_plugins_of_type(self, plugin_class):370        """371        Retrieve a list of plugins of desired class, KeyError raised otherwise372        """373        logger.debug("Searching for plugins: %s", plugin_class)374        matches = [plugin for plugin in self.plugins.values() if isinstance(plugin, plugin_class)]375        if matches:376            return matches377        else:378            raise KeyError("Requested plugin type not found: %s" % plugin_class)379    def get_jobno(self, plugin_name='plugin_lunapark'):380        uploader_plugin = self.plugins[plugin_name]381        return uploader_plugin.lp_job.number382    def __collect_file(self, filename, keep_original=False):383        """384        Move or copy single file to artifacts dir385        """386        dest = self.artifacts_dir + '/' + os.path.basename(filename)387        logger.debug("Collecting file: %s to %s", filename, dest)388        if not filename or not os.path.exists(filename):389            logger.warning("File not found to collect: %s", filename)390            return391        if os.path.exists(dest):392            # FIXME: 3 find a way to store artifacts anyway393            logger.warning("File already exists: %s", dest)394            return395        if keep_original:396            shutil.copy(filename, self.artifacts_dir)397        else:398            shutil.move(filename, self.artifacts_dir)399        os.chmod(dest, 0o644)400    def add_artifact_file(self, filename, keep_original=False):401        """402        Add file to be stored as result artifact on post-process phase403        """404        if filename:405            logger.debug(406                "Adding artifact file to collect (keep=%s): %s", keep_original,407                filename)408            self.artifact_files[filename] = keep_original409    def add_artifact_to_send(self, lp_requisites, content):410        self.artifacts_to_send.append((lp_requisites, content))411    def apply_shorthand_options(self, options, default_section='DEFAULT'):412        for option_str in options:413            key, value = option_str.split('=')414            try:415                section, option = key.split('.')416            except ValueError:417                section = default_section418                option = key419            logger.debug(420                "Override option: %s => [%s] %s=%s", option_str, section,421                option, value)422            self.set_option(section, option, value)423    def mkstemp(self, suffix, prefix, directory=None):...plugin.py
Source:plugin.py  
...131            self.monitoring = None132            self.die_on_fail = False133            return134        with open(self.config) as f:135            self.core.add_artifact_to_send(LPRequisites.MONITORING, str(f.read()))136        # FIXME [legacy] backward compatibility with Monitoring module137        # configuration below.138        self.monitoring.ssh_timeout = expand_to_seconds(139            self.get_option("ssh_timeout", "5s"))140        try:141            autostop = self.core.get_plugin_of_type(AutostopPlugin)142            autostop.add_criterion_class(MetricHigherCriterion)143            autostop.add_criterion_class(MetricLowerCriterion)144        except KeyError:145            logger.debug(146                "No autostop plugin found, not adding instances criterion")147    def prepare_test(self):148        if not self.config or self.config.lower() == 'none':149            return...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
