Best Python code snippet using yandex-tank
validator.py
Source:validator.py  
...211        results = {}212        for plugin_name, package, config in self.__parse_enabled_plugins():213            try:214                results[plugin_name] = \215                    self.__validate_plugin(config,216                                           load_plugin_schema(package))217            except ValidationError as e:218                errors[plugin_name] = e.errors219        if len(errors) > 0:220            raise ValidationError((dict(errors)))221        for plugin_name, plugin_conf in results.items():222            core_validated[plugin_name] = plugin_conf223        return core_validated224    def __validate_core(self):225        v = PatchedValidator(allow_unknown=self.PLUGINS_SCHEMA)226        result = v.validate(self.raw_config_dict, self.BASE_SCHEMA)227        if not result:228            errors = v.errors229            for key, value in tuple(errors.items()):230                if 'must be of dict type' in value:231                    errors[key] = ['unknown field']232            raise ValidationError(errors)233        normalized = v.normalized(self.raw_config_dict)234        return self.__set_core_dynamic_options(235            normalized) if self.with_dynamic_options else normalized236    def __validate_plugin(self, config, schema):237        schema.update(self.PLUGINS_SCHEMA['schema'])238        v = PatchedValidator(schema, allow_unknown=False)239        # .validate() makes .errors as side effect if there's any240        if not v.validate(config):241            raise ValidationError(v.errors)242        # .normalized() returns config with defaults243        return v.normalized(config)244    def __set_core_dynamic_options(self, config):245        for option, setter in self.DYNAMIC_OPTIONS.items():246            try:247                config[self.CORE_SECTION][option] = setter()248            except KeyError:249                config[self.CORE_SECTION] = {option: setter()}250        return config...consumers.py
Source:consumers.py  
...113                self.print_msg_socket(s_interrupt_plugin_name + ' is unable to be interrupted - not running')114            return115        116        self.__g_sPluginPathAbs = os.path.join(self.__g_sAbsPathBot, 'svplugins', s_plugin_name)117        if not self.__validate_plugin():118            self.print_msg_socket('invalid plugin')119            return120121        lst_command.append('config_loc='+ s_sv_acct_id + '/' + s_sv_brand_id)122        # prevent duplicated plugin request123        if s_plugin_unique_id in list(self.__g_dictPluginThread.keys()):  124            if s_plugin_name in list(self.__g_dictPluginThread[s_plugin_unique_id].keys()):  # means duplicated request125                if self.__g_dictPluginThread[s_plugin_unique_id][s_plugin_name].b_run:126                    self.print_msg_socket(s_plugin_name + ' is already in progress!')127                    self.print_msg_socket(str(self.__g_dictPluginThread[s_plugin_unique_id][s_plugin_name].b_run))128                    return129        try:130            oJobPlugin = importlib.import_module('svplugins.' + s_plugin_name + '.task')131            with oJobPlugin.svJobPlugin() as oJob: # to enforce each plugin follow strict guideline or remove from scheduler132                self.print_msg_socket(s_plugin_name + ' has been initiated')  # oJob.__class__.__name__ + '133                oJob.set_websocket_output(self.print_msg_socket)134                oJob.set_my_name(s_plugin_name)135                oJob.parse_command(lst_command)136                o_plugin_thread = ThreadWithTrace(target=oJob.do_task, args=(self.__cb_thread_done,))137                # self.print_msg_socket('begin - ignite')138                o_plugin_thread.start()139                # self.print_msg_socket('end - ignite')140        except Exception as err: # prevent websocket disconnection141            self.print_msg_socket(str(err))142        # except ImportError as err:143        #     self.print_msg_socket(self.__get_plugin_instruction('module'))144145        if self.__g_dictPluginThread.get(s_plugin_unique_id, self.__g_sSvNull) != self.__g_sSvNull:  # if brand thread exists146            self.__g_dictPluginThread[s_plugin_unique_id][s_plugin_name] = o_plugin_thread147        else:  # if new brand thread requested148            self.__g_dictPluginThread[s_plugin_unique_id] = {s_plugin_name: o_plugin_thread}149        self.__g_dictPluginThread[s_plugin_unique_id][s_plugin_name].b_run = True150151    def __cb_thread_done(self, s_plugin_name):152        # self.print_msg_socket('begin __cb_thread_done:' + s_plugin_name)153        # s_brand_name = self.scope["url_route"]["kwargs"]["brand_name"]154        sv_acct_id = self.scope["url_route"]["kwargs"]["sv_acct_id"]155        sv_brand_id = self.scope["url_route"]["kwargs"]["sv_brand_id"]156        s_plugin_unique_id = sv_acct_id + '_' + sv_brand_id157        if s_plugin_name in self.__g_dictPluginThread[s_plugin_unique_id]:158            self.__g_dictPluginThread[s_plugin_unique_id][s_plugin_name].b_run = False159            # self.print_msg_socket('end __cb_thread_done:' + s_plugin_name)160            self.print_msg_socket(s_plugin_name + ' has been finished')161        # else:162        #     self.__halt_all_thread(s_plugin_unique_id)163    164    def __halt_all_thread(self, s_plugin_unique_id=None):165        if s_plugin_unique_id:166            self.print_msg_socket('Trying to halt all task for ' + str(s_plugin_unique_id))167            for s_plugin_name, _ in self.__g_dictPluginThread[s_plugin_unique_id].items():168                self.__g_dictPluginThread[s_plugin_unique_id][s_plugin_name].b_run = False169                self.print_msg_socket(s_plugin_name + ' has been finished')170        else:171            self.print_msg_socket('Trying to halt all taks')172            for s_plugin_unique_id, dict_plugin_thread in self.__g_dictPluginThread.items():173                for s_plugin_name, _ in dict_plugin_thread.items():174                    self.__g_dictPluginThread[s_plugin_unique_id][s_plugin_name].b_run = False175                    self.print_msg_socket(s_plugin_name + ' has been finished')176177    def __validate_plugin(self):178        """ find the module in /svplugins directory """179        s_plugin_path_abs = os.path.join(self.__g_sPluginPathAbs, 'task.py')180        if os.path.isfile(s_plugin_path_abs):181            return True182        else:183            return False184185    def __get_plugin_instruction(self, s_content):186        """ find the README.md in plugin directory """187        dict_content = {'module': 'IMPORT_MODULE.md'}188        s_plugin_path_abs = os.path.join(self.__g_sPluginPathAbs, 'docs', dict_content[s_content])189        if os.path.isfile(s_plugin_path_abs):190            with open(s_plugin_path_abs, mode='r', encoding='utf-8') as f:191                s_content = f.readlines()
...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
