Best Python code snippet using yandex-tank
plugin.py
Source:plugin.py  
...100        return __file__101    @property102    def lock_duration(self):103        if self._lock_duration is None:104            info = self.get_generator_info()105            self._lock_duration = info.duration if info.duration else \106                expand_to_seconds(self.get_option("target_lock_duration"))107        return self._lock_duration108    def get_available_options(self):109        opts = [110            "api_address",111            "writer_endpoint",112            "task",113            "job_name",114            "job_dsc",115            "notify",116            "ver", "component",117            "operator",118            "jobno_file",119            "ignore_target_lock",120            "target_lock_duration",121            "lock_targets",122            "jobno",123            "upload_token",124            'connection_timeout',125            'network_attempts',126            'api_attempts',127            'maintenance_attempts',128            'network_timeout',129            'api_timeout',130            'maintenance_timeout',131            'strict_lock',132            'send_status_period',133            'log_data_requests',134            'log_monitoring_requests',135            'log_status_requests',136            'log_other_requests',137            'threads_timeout',138            'chunk_size'139        ]140        return opts141    def configure(self):142        self.core.publish(self.SECTION, 'component', self.get_option('component'))143        self.core.publish(self.SECTION, 'task', self.get_option('task'))144        self.core.publish(self.SECTION, 'job_name', self.get_option('job_name'))145    def check_task_is_open(self):146        if self.backend_type == BackendTypes.OVERLOAD:147            return148        TASK_TIP = 'The task should be connected to Lunapark.' 
\149                   'Open startrek task page, click "actions" -> "load testing".'150        logger.debug("Check if task %s is open", self.task)151        try:152            task_data = self.lp_job.get_task_data(self.task)[0]153            try:154                task_status = task_data['status']155                if task_status == 'Open':156                    logger.info("Task %s is ok", self.task)157                    self.task_name = str(task_data['name'])158                else:159                    logger.info("Task %s:" % self.task)160                    logger.info(task_data)161                    raise RuntimeError("Task is not open")162            except KeyError:163                try:164                    error = task_data['error']165                    raise RuntimeError(166                        "Task %s error: %s\n%s" %167                        (self.task, error, TASK_TIP))168                except KeyError:169                    raise RuntimeError(170                        'Unknown task data format:\n{}'.format(task_data))171        except requests.exceptions.HTTPError as ex:172            logger.error(173                "Failed to check task status for '%s': %s", self.task, ex)174            if ex.response.status_code == 404:175                raise RuntimeError("Task not found: %s\n%s" % (self.task, TASK_TIP))176            elif ex.response.status_code == 500 or ex.response.status_code == 400:177                raise RuntimeError(178                    "Unable to check task staus, id: %s, error code: %s" %179                    (self.task, ex.response.status_code))180            raise ex181    @staticmethod182    def search_task_from_cwd(cwd):183        issue = re.compile("^([A-Za-z]+-[0-9]+)(-.*)?")184        while cwd:185            logger.debug("Checking if dir is named like JIRA issue: %s", cwd)186            if issue.match(os.path.basename(cwd)):187                res = re.search(issue, os.path.basename(cwd))188                return 
res.group(1).upper()189            newdir = os.path.abspath(os.path.join(cwd, os.path.pardir))190            if newdir == cwd:191                break192            else:193                cwd = newdir194        raise RuntimeError(195            "task=dir requested, but no JIRA issue name in cwd: %s" %196            os.getcwd())197    def prepare_test(self):198        info = self.get_generator_info()199        port = info.port200        instances = info.instances201        if info.ammo_file is not None:202            if info.ammo_file.startswith("http://") or info.ammo_file.startswith("https://"):203                ammo_path = info.ammo_file204            else:205                ammo_path = os.path.realpath(info.ammo_file)206        else:207            logger.warning('Failed to get info about ammo path')208            ammo_path = 'Undefined'209        loop_count = int(info.loop_count)210        try:211            lp_job = self.lp_job212            self.add_cleanup(self.unlock_targets)213            self.locked_targets = self.check_and_lock_targets(strict=self.get_option('strict_lock'),214                                                              ignore=self.get_option('ignore_target_lock'))215            if lp_job._number:216                self.make_symlink(lp_job._number)217                self.check_task_is_open()218            else:219                self.check_task_is_open()220                lp_job.create()221                self.make_symlink(lp_job.number)222            self.publish('job_no', lp_job.number)223        except (APIClient.JobNotCreated, APIClient.NotAvailable, APIClient.NetworkError) as e:224            logger.error(e)225            logger.error(226                'Failed to connect to Lunapark, disabling DataUploader')227            self.start_test = lambda *a, **kw: None228            self.post_process = lambda *a, **kw: None229            self.on_aggregated_data = lambda *a, **kw: None230            self.monitoring_data = lambda *a, **kw: 
None231            return232        cmdline = ' '.join(sys.argv)233        lp_job.edit_metainfo(234            instances=instances,235            ammo_path=ammo_path,236            loop_count=loop_count,237            regression_component=self.get_option("component"),238            cmdline=cmdline,239        )240        self.core.job.subscribe_plugin(self)241        try:242            console = self.core.get_plugin_of_type(ConsolePlugin)243        except KeyError:244            logger.debug("Console plugin not found", exc_info=True)245            console = None246        if console:247            console.add_info_widget(JobInfoWidget(self))248        self.set_option('target_host', self.target)249        self.set_option('target_port', port)250        self.set_option('cmdline', cmdline)251        self.set_option('ammo_path', ammo_path)252        self.set_option('loop_count', loop_count)253        self.__save_conf()254    def start_test(self):255        self.add_cleanup(self.join_threads)256        self.status_sender.start()257        self.upload.start()258        self.monitoring.start()259        if self.core.error_log:260            self.events.start()261        self.web_link = urljoin(self.lp_job.api_client.base_url, str(self.lp_job.number))262        logger.info("Web link: %s", self.web_link)263        self.publish("jobno", self.lp_job.number)264        self.publish("web_link", self.web_link)265        jobno_file = self.get_option("jobno_file", '')266        if jobno_file:267            logger.debug("Saving jobno to: %s", jobno_file)268            with open(jobno_file, 'w') as fdes:269                fdes.write(str(self.lp_job.number))270            self.core.add_artifact_file(jobno_file)271        self.__save_conf()272    def is_test_finished(self):273        return self.retcode274    def end_test(self, retcode):275        if retcode != 0:276            self.lp_job.interrupted.set()277        self.__save_conf()278        self.unlock_targets()279        return 
retcode280    def close_job(self):281        self.lp_job.close(self.retcode)282    def join_threads(self):283        self.lp_job.interrupted.set()284        if self.monitoring.is_alive():285            self.monitoring.join()286        if self.upload.is_alive():287            self.upload.join()288    def stop_events_processing(self):289        self.events_queue.put(None)290        self.events_reader.close()291        self.events_processing.close()292        if self.events_processing.is_alive():293            self.events_processing.join()294        if self.events.is_alive():295            self.lp_job.interrupted.set()296            self.events.join()297    def post_process(self, rc):298        self.retcode = rc299        self.monitoring_queue.put(None)300        self.data_queue.put(None)301        if self.core.error_log:302            self.events_queue.put(None)303            self.events_reader.close()304            self.events_processing.close()305            self.events.join()306        logger.info("Waiting for sender threads to join.")307        if self.monitoring.is_alive():308            self.monitoring.join()309        if self.upload.is_alive():310            self.upload.join()311        self.finished = True312        logger.info(313            "Web link: %s", self.web_link)314        autostop = None315        try:316            autostop = self.core.get_plugin_of_type(AutostopPlugin)317        except KeyError:318            logger.debug("No autostop plugin loaded", exc_info=True)319        if autostop and autostop.cause_criterion:320            self.lp_job.set_imbalance_and_dsc(321                autostop.imbalance_rps, autostop.cause_criterion.explain())322        else:323            logger.debug("No autostop cause detected")324        self.__save_conf()325        return rc326    def on_aggregated_data(self, data, stats):327        """328        @data: aggregated data329        @stats: stats about gun330        """331        if not 
self.lp_job.interrupted.is_set():332            self.data_queue.put((data, stats))333    def monitoring_data(self, data_list):334        if not self.lp_job.interrupted.is_set():335            if len(data_list) > 0:336                [self.monitoring_queue.put(chunk) for chunk in chop(data_list, self.get_option("chunk_size"))]337    def __send_status(self):338        logger.info('Status sender thread started')339        lp_job = self.lp_job340        while not lp_job.interrupted.is_set():341            try:342                self.lp_job.send_status(self.core.info.get_info_dict())343                time.sleep(self.get_option('send_status_period'))344            except (APIClient.NetworkError, APIClient.NotAvailable) as e:345                logger.warn('Failed to send status')346                logger.debug(e)347                break348            except APIClient.StoppedFromOnline:349                logger.info("Test stopped from Lunapark")350                self.retcode = self.RC_STOP_FROM_WEB351                break352            if self.finished:353                break354        logger.info("Closed Status sender thread")355    def __uploader(self, queue, sender_method, name='Uploader'):356        logger.info('{} thread started'.format(name))357        while not self.lp_job.interrupted.is_set():358            try:359                entry = queue.get(timeout=1)360                if entry is None:361                    logger.info("{} queue returned None".format(name))362                    break363                sender_method(entry)364            except Empty:365                continue366            except APIClient.StoppedFromOnline:367                logger.warning("Lunapark is rejecting {} data".format(name))368                break369            except (APIClient.NetworkError, APIClient.NotAvailable, APIClient.UnderMaintenance) as e:370                logger.warn('Failed to push {} data'.format(name))371                logger.warn(e)372                
self.lp_job.interrupted.set()373            except Exception:374                exc_type, exc_value, exc_traceback = sys.exc_info()375                logger.error("Mysterious exception:\n%s\n%s\n%s", (exc_type, exc_value, exc_traceback))376                break377        # purge queue378        while not queue.empty():379            if queue.get_nowait() is None:380                break381        logger.info("Closing {} thread".format(name))382    def __data_uploader(self):383        self.__uploader(self.data_queue,384                        lambda entry: self.lp_job.push_test_data(*entry),385                        'Data Uploader')386    def __monitoring_uploader(self):387        self.__uploader(self.monitoring_queue,388                        self.lp_job.push_monitoring_data,389                        'Monitoring Uploader')390    def __events_uploader(self):391        self.__uploader(self.events_queue,392                        self.lp_job.push_events_data,393                        'Events Uploader')394    # TODO: why we do it here? 
should be in core395    def __save_conf(self):396        for requisites, content in self.core.artifacts_to_send:397            self.lp_job.send_config(requisites, content)398    def parse_lock_targets(self):399        # prepare target lock list400        locks_list_cfg = self.get_option('lock_targets', 'auto')401        def no_target():402            logging.warn("Target lock set to 'auto', but no target info available")403            return {}404        locks_set = {self.target} or no_target() if locks_list_cfg == 'auto' else set(locks_list_cfg)405        targets_to_lock = [host for host in locks_set if host]406        return targets_to_lock407    def lock_targets(self, targets_to_lock, ignore, strict):408        locked_targets = [target for target in targets_to_lock409                          if self.lp_job.lock_target(target, self.lock_duration, ignore, strict)]410        return locked_targets411    def unlock_targets(self):412        logger.info("Unlocking targets: %s", self.locked_targets)413        for target in self.locked_targets:414            logger.info(target)415            self.lp_job.api_client.unlock_target(target)416    def check_and_lock_targets(self, strict, ignore):417        targets_list = self.parse_lock_targets()418        logger.info('Locking targets: %s', targets_list)419        locked_targets = self.lock_targets(targets_list, ignore=ignore, strict=strict)420        logger.info('Locked targets: %s', locked_targets)421        return locked_targets422    def make_symlink(self, name):423        PLUGIN_DIR = os.path.join(self.core.artifacts_base_dir, 'lunapark')424        if not os.path.exists(PLUGIN_DIR):425            os.makedirs(PLUGIN_DIR)426        try:427            os.symlink(428                os.path.relpath(429                    self.core.artifacts_dir,430                    PLUGIN_DIR),431                os.path.join(432                    PLUGIN_DIR,433                    str(name)))434        # this exception catch for filesystems 
w/o symlinks435        except OSError:436            logger.warning('Unable to create symlink for artifact: %s', name)437    def _get_user_agent(self):438        plugin_agent = 'Uploader/{}'.format(self.VERSION)439        return ' '.join((plugin_agent,440                         self.core.get_user_agent()))441    def __get_operator(self):442        try:443            return self.get_option(444                'operator') or pwd.getpwuid(445                os.geteuid())[0]446        except:  # noqa: E722447            logger.error(448                "Couldn't get username from the OS. Please, set the 'meta.operator' option explicitly in your config "449                "file.")450            raise451    def __get_api_client(self):452        logging.info('Using {} backend'.format(self.backend_type))453        if self.backend_type == BackendTypes.LUNAPARK:454            client = APIClient455            self._api_token = None456        elif self.backend_type == BackendTypes.OVERLOAD:457            client = OverloadClient458            self._api_token = self.read_token(self.get_option("token_file"))459        else:460            raise RuntimeError("Backend type doesn't match any of the expected")461        return client(base_url=self.get_option('api_address'),462                      writer_url=self.get_option('writer_endpoint'),463                      network_attempts=self.get_option('network_attempts'),464                      api_attempts=self.get_option('api_attempts'),465                      maintenance_attempts=self.get_option('maintenance_attempts'),466                      network_timeout=self.get_option('network_timeout'),467                      api_timeout=self.get_option('api_timeout'),468                      maintenance_timeout=self.get_option('maintenance_timeout'),469                      connection_timeout=self.get_option('connection_timeout'),470                      user_agent=self._get_user_agent(),471                      
api_token=self.api_token,472                      core_interrupted=self.interrupted)473    @property474    def lp_job(self):475        """476        :rtype: LPJob477        """478        if self._lp_job is None:479            self._lp_job = self.__get_lp_job()480            self.core.publish(self.SECTION, 'job_no', self._lp_job.number)481            self.core.publish(self.SECTION, 'web_link', self._lp_job.web_link)482            self.core.publish(self.SECTION, 'job_name', self._lp_job.name)483            self.core.publish(self.SECTION, 'job_dsc', self._lp_job.description)484            self.core.publish(self.SECTION, 'person', self._lp_job.person)485            self.core.publish(self.SECTION, 'task', self._lp_job.task)486            self.core.publish(self.SECTION, 'version', self._lp_job.version)487            self.core.publish(self.SECTION, 'component', self.get_option('component'))488            self.core.publish(self.SECTION, 'meta', self.cfg.get('meta', {}))489        return self._lp_job490    def __get_lp_job(self):491        """492        :rtype: LPJob493        """494        api_client = self.__get_api_client()495        info = self.get_generator_info()496        port = info.port497        loadscheme = [] if isinstance(info.rps_schedule, (str, dict)) else info.rps_schedule498        lp_job = LPJob(client=api_client,499                       target_host=self.target,500                       target_port=port,501                       number=self.cfg.get('jobno', None),502                       token=self.get_option('upload_token'),503                       person=self.__get_operator(),504                       task=self.task,505                       name=self.get_option('job_name', 'untitled'),506                       description=self.get_option('job_dsc'),507                       tank=self.core.job.tank,508                       notify_list=self.get_option("notify"),509                       load_scheme=loadscheme,510                       
version=self.get_option('ver'),511                       log_data_requests=self.get_option('log_data_requests'),512                       log_monitoring_requests=self.get_option('log_monitoring_requests'),513                       log_status_requests=self.get_option('log_status_requests'),514                       log_other_requests=self.get_option('log_other_requests'),515                       add_cleanup=lambda: self.add_cleanup(self.close_job))516        lp_job.send_config(LPRequisites.CONFIGINITIAL, yaml.dump(self.core.configinitial))517        return lp_job518    @property519    def task(self):520        if self._task is None:521            task = self.get_option('task')522            if task == 'dir':523                task = self.search_task_from_cwd(os.getcwd())524            self._task = task525        return self._task526    @property527    def api_token(self):528        if self._api_token == '':529            if self.backend_type == BackendTypes.LUNAPARK:530                self._api_token = None531            elif self.backend_type == BackendTypes.OVERLOAD:532                self._api_token = self.read_token(self.get_option("token_file", ""))533            else:534                raise RuntimeError("Backend type doesn't match any of the expected")535        return self._api_token536    @staticmethod537    def read_token(filename):538        if filename:539            logger.debug("Trying to read token from %s", filename)540            try:541                with open(filename, 'r') as handle:542                    data = handle.read().strip()543                    logger.info(544                        "Read authentication token from %s, "545                        "token length is %d bytes", filename, len(str(data)))546            except IOError:547                logger.error(548                    "Failed to read Overload API token from %s", filename)549                logger.info(550                    "Get your Overload API token from 
https://overload.yandex.net and provide it via 'overload.token_file' parameter"551                )552                raise RuntimeError("API token error")553            return data554        else:555            logger.error("Overload API token filename is not defined")556            logger.info(557                "Get your Overload API token from https://overload.yandex.net and provide it via 'overload.token_file' parameter"558            )559            raise RuntimeError("API token error")560    def get_generator_info(self):561        return self.core.job.generator_plugin.get_info()562    @property563    def target(self):564        if self._target is None:565            self._target = self.get_generator_info().address566            logger.info("Detected target: %s", self.target)567        return self._target568class JobInfoWidget(AbstractInfoWidget):569    def __init__(self, sender):570        # type: (Plugin) -> object571        AbstractInfoWidget.__init__(self)572        self.owner = sender573    def get_index(self):574        return 1575    def render(self, screen):576        template = "Author: " + screen.markup.RED + "%s" + \577                   screen.markup.RESET + \578                   "%s\n   Job: %s %s\n  Task: %s %s\n   Web: %s%s"579        data = (self.owner.lp_job.person[:1], self.owner.lp_job.person[1:],...main.py
Source:main.py  
...10import datetime11import itertools12def main(): 13    fake = Faker()14    gen = Generator(get_generator_info(),fake)15    gen.generate_customers()16    gen.generator_products()17    gen.generate_order_headers()18    gen.generate_order_details()19#ToDo: gen info could be set as config file ....refactor... 20#ToDo: product_list list of str.....move external to config or pass in as list... refactor21def get_generator_info() -> GeneratorInfo:22    gen_info = GeneratorInfo(23        order_date_order = datetime.date(2021,1,1)24        ,order_date_ship = datetime.date(2021,1,1)25        ,number_of_orders = 100026        ,number_of_customers = 1027        ,product_list = ["Spoon", "Fork", "Knife", "Plate", "Bowl", "Spork", "Cup", "Glass", "Guzzler", "Straw"] # for use with Faker Provider28        ,tax_rate = 0.06    29    )30    return gen_info31if __name__ == "__main__":...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!
