Best Python code snippet using yandex-tank
plugin.py
Source:plugin.py  
# NOTE: this excerpt begins inside a class whose header is not visible here.
# The tail of that class (three abstract methods) is preserved verbatim below
# as a comment instead of being re-attached to a guessed class definition:
#
#     @abstractmethod
#     def lock_target(self):
#         """method for locking target during..."""
#
#     @abstractmethod
#     def set_imbalance_and_dsc(self):
#         """method for imbalance detection"""
#
#     @abstractmethod
#     def is_target_locked(self):
#         """method for checking target lock"""


def chop(data_list, chunk_size):
    """Recursively split ``data_list`` into pieces small enough to upload.

    The size of a piece is estimated as ``sys.getsizeof(str(piece))``.

    :param data_list: sliceable sequence of monitoring records
    :param chunk_size: maximum estimated size of a single piece
    :return: list of slices of ``data_list``, in the original order
    """
    if sys.getsizeof(str(data_list)) <= chunk_size:
        return [data_list]
    elif len(data_list) <= 1:
        # Cannot be split any further (``<= 1`` also stops the recursion for
        # an empty list combined with a tiny chunk_size).
        logger.info("Too large piece of Telegraf data. Might experience upload problems.")
        return [data_list]
    else:
        # Integer division: a float index would raise TypeError on Python 3.
        mid = len(data_list) // 2
        return chop(data_list[:mid], chunk_size) + chop(data_list[mid:], chunk_size)


class Plugin(AbstractPlugin, AggregateResultListener,
             MonitoringDataListener):
    """Uploader plugin: pushes aggregated test data, monitoring data and
    events to a remote backend through a job object (see ``lp_job``)."""

    RC_STOP_FROM_WEB = 8
    VERSION = '3.0'
    SECTION = 'uploader'

    def __init__(self, core, cfg, name):
        """Create the queues and the (not yet started) uploader threads."""
        AbstractPlugin.__init__(self, core, cfg, name)
        self.data_queue = Queue()
        self.monitoring_queue = Queue()
        if self.core.error_log:
            # Events are only collected when the core provides an error log.
            self.events_queue = Queue()
            self.events_reader = EventsReader(self.core.error_log)
            self.events_processing = Drain(self.events_reader, self.events_queue)
            self.add_cleanup(self.stop_events_processing)
            self.events_processing.start()
            self.events = threading.Thread(target=self.__events_uploader)
            self.events.daemon = True
        self.retcode = -1
        self._target = None
        self.task_name = ''
        self.token_file = None
        self.version_tested = None
        self.send_status_period = 10
        self.status_sender = threading.Thread(target=self.__send_status)
        self.status_sender.daemon = True
        self.upload = threading.Thread(target=self.__data_uploader)
        self.upload.daemon = True
        self.monitoring = threading.Thread(target=self.__monitoring_uploader)
        self.monitoring.daemon = True
        self._is_telegraf = None
        self.backend_type = BackendTypes.identify_backend(self.cfg['api_address'], self.cfg_section_name)
        self._task = None
        # '' means "not resolved yet"; see the ``api_token`` property.
        self._api_token = ''
        self._lp_job = None
        self._lock_duration = None
        self._info = None
        self.locked_targets = []
        self.web_link = None
        self.finished = False

    def set_option(self, option, value):
        """Store ``value`` under ``meta`` in the config and publish it."""
        self.cfg.setdefault('meta', {})[option] = value
        self.core.publish(self.SECTION, 'meta.{}'.format(option), value)

    @staticmethod
    def get_key():
        """Return the path of this module."""
        return __file__

    @property
    def lock_duration(self):
        """Target lock duration, computed once and cached.

        Uses the generator's ``duration`` when it is truthy, otherwise the
        ``target_lock_duration`` option converted by ``expand_to_seconds``.
        """
        if self._lock_duration is None:
            info = self.get_generator_info()
            self._lock_duration = info.duration if info.duration else \
                expand_to_seconds(self.get_option("target_lock_duration"))
        return self._lock_duration

    def get_available_options(self):
        """Return the list of option names understood by this plugin."""
        opts = [
            "api_address",
            "writer_endpoint",
            "task",
            "job_name",
            "job_dsc",
            "notify",
            "ver", "component",
            "operator",
            "jobno_file",
            "ignore_target_lock",
            "target_lock_duration",
            "lock_targets",
            "jobno",
            "upload_token",
            'connection_timeout',
            'network_attempts',
            'api_attempts',
            'maintenance_attempts',
            'network_timeout',
            'api_timeout',
            'maintenance_timeout',
            'strict_lock',
            'send_status_period',
            'log_data_requests',
            'log_monitoring_requests',
            'log_status_requests',
            'log_other_requests',
            'threads_timeout',
            'chunk_size'
        ]
        return opts

    def configure(self):
        """Publish the component, task and job name options to the core."""
        self.core.publish(self.SECTION, 'component', self.get_option('component'))
        self.core.publish(self.SECTION, 'task', self.get_option('task'))
        self.core.publish(self.SECTION, 'job_name', self.get_option('job_name'))

    def check_task_is_open(self):
        """Verify that the configured task exists and has status ``Open``.

        Does nothing unless the backend is ``BackendTypes.LUNAPARK``.

        :raises RuntimeError: task is not open, not found, reported an error,
            has an unknown data format, or the check failed with HTTP 400/500
        :raises requests.exceptions.HTTPError: any other HTTP failure
        """
        if self.backend_type != BackendTypes.LUNAPARK:
            return
        TASK_TIP = 'The task should be connected to Lunapark.' \
                   'Open startrek task page, click "actions" -> "load testing".'

        logger.debug("Check if task %s is open", self.task)
        try:
            task_data = self.lp_job.get_task_data(self.task)[0]
            try:
                task_status = task_data['status']
                if task_status == 'Open':
                    logger.info("Task %s is ok", self.task)
                    self.task_name = str(task_data['name'])
                else:
                    logger.info("Task %s:", self.task)
                    logger.info(task_data)
                    raise RuntimeError("Task is not open")
            except KeyError:
                # No 'status' (or 'name') key: the backend may have sent an
                # error description instead.
                try:
                    error = task_data['error']
                    raise RuntimeError(
                        "Task %s error: %s\n%s" %
                        (self.task, error, TASK_TIP))
                except KeyError:
                    raise RuntimeError(
                        'Unknown task data format:\n{}'.format(task_data))
        except requests.exceptions.HTTPError as ex:
            logger.error(
                "Failed to check task status for '%s': %s", self.task, ex)
            if ex.response.status_code == 404:
                raise RuntimeError("Task not found: %s\n%s" % (self.task, TASK_TIP))
            elif ex.response.status_code == 500 or ex.response.status_code == 400:
                raise RuntimeError(
                    "Unable to check task staus, id: %s, error code: %s" %
                    (self.task, ex.response.status_code))
            raise

    @staticmethod
    def search_task_from_cwd(cwd):
        """Find an issue key in the name of ``cwd`` or one of its parents.

        :param cwd: directory to start the upward search from
        :return: the upper-cased issue key, e.g. ``ABC-123``
        :raises RuntimeError: no directory on the way up matches
        """
        issue = re.compile("^([A-Za-z]+-[0-9]+)(-.*)?")
        start_dir = cwd
        while cwd:
            logger.debug("Checking if dir is named like JIRA issue: %s", cwd)
            found = issue.match(os.path.basename(cwd))
            if found:
                return found.group(1).upper()

            newdir = os.path.abspath(os.path.join(cwd, os.path.pardir))
            if newdir == cwd:
                # Reached the filesystem root.
                break
            cwd = newdir

        # Report the directory the search started from, not the process cwd.
        raise RuntimeError(
            "task=dir requested, but no JIRA issue name in cwd: %s" %
            start_dir)

    def prepare_test(self):
        """Lock targets, create or attach to the job and send its meta info.

        If the job cannot be created or the backend is unreachable, the
        uploading hooks of this instance are replaced with no-ops so the test
        can continue without the uploader.
        """
        info = self.get_generator_info()
        port = info.port
        instances = info.instances
        if info.ammo_file is not None:
            if info.ammo_file.startswith(("http://", "https://")):
                ammo_path = info.ammo_file
            else:
                ammo_path = os.path.realpath(info.ammo_file)
        else:
            logger.warning('Failed to get info about ammo path')
            ammo_path = 'Undefined'
        loop_count = int(info.loop_count)

        try:
            lp_job = self.lp_job
            self.add_cleanup(self.unlock_targets)
            self.locked_targets = self.check_and_lock_targets(strict=self.get_option('strict_lock'),
                                                              ignore=self.get_option('ignore_target_lock'))
            if lp_job._number:
                # Attaching to an already existing job.
                self.make_symlink(lp_job._number)
                self.check_task_is_open()
            else:
                self.check_task_is_open()
                lp_job.create()
                self.make_symlink(lp_job.number)
            self.publish('job_no', lp_job.number)
        except (APIClient.JobNotCreated, APIClient.NotAvailable, APIClient.NetworkError) as e:
            logger.error(e)
            logger.error(
                'Failed to connect to Lunapark, disabling DataUploader')
            self.start_test = lambda *a, **kw: None
            self.post_process = lambda *a, **kw: None
            self.on_aggregated_data = lambda *a, **kw: None
            self.monitoring_data = lambda *a, **kw: None
            return

        cmdline = ' '.join(sys.argv)
        lp_job.edit_metainfo(
            instances=instances,
            ammo_path=ammo_path,
            loop_count=loop_count,
            regression_component=self.get_option("component"),
            cmdline=cmdline,
        )

        self.core.job.subscribe_plugin(self)

        try:
            console = self.core.get_plugin_of_type(ConsolePlugin)
        except KeyError as ex:
            logger.debug(ex)
            console = None

        if console:
            console.add_info_widget(JobInfoWidget(self))

        self.set_option('target_host', self.target)
        self.set_option('target_port', port)
        self.set_option('cmdline', cmdline)
        self.set_option('ammo_path', ammo_path)
        self.set_option('loop_count', loop_count)
        self.__save_conf()

    def start_test(self):
        """Start the sender threads and publish the job number and web link."""
        self.add_cleanup(self.join_threads)
        self.status_sender.start()
        self.upload.start()
        self.monitoring.start()
        if self.core.error_log:
            self.events.start()

        self.web_link = urljoin(self.lp_job.api_client.base_url, str(self.lp_job.number))
        logger.info("Web link: %s", self.web_link)

        self.publish("jobno", self.lp_job.number)
        self.publish("web_link", self.web_link)

        jobno_file = self.get_option("jobno_file", '')
        if jobno_file:
            logger.debug("Saving jobno to: %s", jobno_file)
            with open(jobno_file, 'w') as fdes:
                fdes.write(str(self.lp_job.number))
            self.core.add_artifact_file(jobno_file)
        self.__save_conf()

    def is_test_finished(self):
        """Return the current ``retcode`` (initialised to -1 in ``__init__``)."""
        # NOTE(review): presumably a negative value means "keep running" for
        # the core -- confirm against the caller.
        return self.retcode

    def end_test(self, retcode):
        """Interrupt the job on a non-zero ``retcode``, send configs, unlock."""
        if retcode != 0:
            self.lp_job.interrupted.set()
        self.__save_conf()
        self.unlock_targets()
        return retcode

    def close_job(self):
        """Close the job with the stored return code."""
        self.lp_job.close(self.retcode)

    def join_threads(self):
        """Interrupt the job and wait for the uploader threads to stop."""
        self.lp_job.interrupted.set()
        if self.monitoring.is_alive():
            self.monitoring.join()
        if self.upload.is_alive():
            self.upload.join()

    def stop_events_processing(self):
        """Shut down the events reader, drain and uploader thread."""
        self.events_queue.put(None)
        self.events_reader.close()
        self.events_processing.close()
        if self.events_processing.is_alive():
            self.events_processing.join()
        if self.events.is_alive():
            self.lp_job.interrupted.set()
            self.events.join()

    def post_process(self, rc):
        """Flush the queues, wait for the senders and report autostop cause.

        :param rc: return code of the test; stored in ``self.retcode``
        :return: ``rc`` unchanged
        """
        self.retcode = rc
        # ``None`` is the stop marker understood by the uploader loops.
        self.monitoring_queue.put(None)
        self.data_queue.put(None)
        if self.core.error_log:
            self.events_queue.put(None)
            self.events_reader.close()
            self.events_processing.close()
            # Joining a thread that was never started raises RuntimeError.
            if self.events.is_alive():
                self.events.join()
        logger.info("Waiting for sender threads to join.")
        if self.monitoring.is_alive():
            self.monitoring.join()
        if self.upload.is_alive():
            self.upload.join()
        self.finished = True
        logger.info(
            "Web link: %s", self.web_link)
        autostop = None
        try:
            autostop = self.core.get_plugin_of_type(AutostopPlugin)
        except KeyError as ex:
            logger.debug(ex)

        if autostop and autostop.cause_criterion:
            self.lp_job.set_imbalance_and_dsc(
                autostop.imbalance_rps, autostop.cause_criterion.explain())
        else:
            logger.debug("No autostop cause detected")
        self.__save_conf()
        return rc

    def on_aggregated_data(self, data, stats):
        """
        @data: aggregated data
        @stats: stats about gun
        """
        if not self.lp_job.interrupted.is_set():
            self.data_queue.put((data, stats))

    def monitoring_data(self, data_list):
        """Queue monitoring data, split into pieces of ``chunk_size``."""
        if not self.lp_job.interrupted.is_set():
            if len(data_list) > 0:
                for chunk in chop(data_list, self.get_option("chunk_size")):
                    self.monitoring_queue.put(chunk)

    def __send_status(self):
        """Thread body: periodically send the core status until interrupted."""
        logger.info('Status sender thread started')
        lp_job = self.lp_job

        while not lp_job.interrupted.is_set():
            try:
                lp_job.send_status(self.core.info.get_info_dict())
                time.sleep(self.get_option('send_status_period'))
            except (APIClient.NetworkError, APIClient.NotAvailable) as e:
                logger.warning('Failed to send status')
                logger.debug(e)
                break
            except APIClient.StoppedFromOnline:
                logger.info("Test stopped from Lunapark")
                self.retcode = self.RC_STOP_FROM_WEB
                break
            if self.finished:
                break
        logger.info("Closed Status sender thread")

    def __uploader(self, queue, sender_method, name='Uploader'):
        """Thread body shared by all uploaders.

        Takes entries from ``queue`` and passes them to ``sender_method``
        until the job is interrupted or a ``None`` entry is received, then
        discards what is left in the queue.
        """
        logger.info('{} thread started'.format(name))
        while not self.lp_job.interrupted.is_set():
            try:
                entry = queue.get(timeout=1)
                if entry is None:
                    logger.info("{} queue returned None".format(name))
                    break
                sender_method(entry)
            except Empty:
                continue
            except APIClient.StoppedFromOnline:
                logger.warning("Lunapark is rejecting {} data".format(name))
                break
            except (APIClient.NetworkError, APIClient.NotAvailable, APIClient.UnderMaintenance) as e:
                logger.warning('Failed to push {} data'.format(name))
                logger.warning(e)
                self.lp_job.interrupted.set()
            except Exception:
                exc_type, exc_value, exc_traceback = sys.exc_info()
                # One argument per placeholder; a single tuple argument makes
                # the logging module fail to format the record.
                logger.error("Mysterious exception:\n%s\n%s\n%s", exc_type, exc_value, exc_traceback)
                break
        # purge queue
        while not queue.empty():
            if queue.get_nowait() is None:
                break
        logger.info("Closing {} thread".format(name))

    def __data_uploader(self):
        """Thread body: upload ``(data, stats)`` pairs from ``data_queue``."""
        self.__uploader(self.data_queue,
                        lambda entry: self.lp_job.push_test_data(*entry),
                        'Data Uploader')

    def __monitoring_uploader(self):
        """Thread body: upload chunks from ``monitoring_queue``."""
        self.__uploader(self.monitoring_queue,
                        self.lp_job.push_monitoring_data,
                        'Monitoring Uploader')

    def __events_uploader(self):
        """Thread body: upload entries from ``events_queue``."""
        self.__uploader(self.events_queue,
                        self.lp_job.push_events_data,
                        'Events Uploader')

    # TODO: why we do it here? should be in core
    def __save_conf(self):
        """Send every artifact listed in ``core.artifacts_to_send`` to the job."""
        for requisites, content in self.core.artifacts_to_send:
            self.lp_job.send_config(requisites, content)

    def parse_lock_targets(self):
        """Return the list of hosts to lock.

        With ``lock_targets`` set to ``'auto'`` (the default) the detected
        target is used; otherwise the option value is treated as a collection
        of hosts. Falsy hosts are dropped.
        """
        # prepare target lock list
        locks_list_cfg = self.get_option('lock_targets', 'auto')

        if locks_list_cfg == 'auto':
            target = self.target
            if target:
                locks_set = {target}
            else:
                # ``{target} or fallback`` never reaches the fallback because
                # a one-element set is always truthy, hence the explicit test.
                logger.warning("Target lock set to 'auto', but no target info available")
                locks_set = set()
        else:
            locks_set = set(locks_list_cfg)
        targets_to_lock = [host for host in locks_set if host]
        return targets_to_lock

    def lock_targets(self, targets_to_lock, ignore, strict):
        """Try to lock every target; return those that were locked."""
        locked_targets = [target for target in targets_to_lock
                          if self.lp_job.lock_target(target, self.lock_duration, ignore, strict)]
        return locked_targets

    def unlock_targets(self):
        """Unlock every target recorded in ``self.locked_targets``."""
        logger.info("Unlocking targets: %s", self.locked_targets)
        for target in self.locked_targets:
            logger.info(target)
            self.lp_job.api_client.unlock_target(target)

    def check_and_lock_targets(self, strict, ignore):
        """Resolve the targets to lock, lock them and return the locked ones."""
        targets_list = self.parse_lock_targets()
        logger.info('Locking targets: %s', targets_list)
        locked_targets = self.lock_targets(targets_list, ignore=ignore, strict=strict)
        logger.info('Locked targets: %s', locked_targets)
        return locked_targets

    def make_symlink(self, name):
        """Create ``<artifacts_base_dir>/lunapark/<name>`` pointing at the
        artifacts directory of the current test."""
        PLUGIN_DIR = os.path.join(self.core.artifacts_base_dir, 'lunapark')
        if not os.path.exists(PLUGIN_DIR):
            os.makedirs(PLUGIN_DIR)
        try:
            os.symlink(
                os.path.relpath(
                    self.core.artifacts_dir,
                    PLUGIN_DIR),
                os.path.join(
                    PLUGIN_DIR,
                    str(name)))
        # this exception catch for filesystems w/o symlinks
        except OSError:
            logger.warning('Unable to create symlink for artifact: %s', name)

    def _get_user_agent(self):
        """Return ``Uploader/<VERSION>`` followed by the core user agent."""
        plugin_agent = 'Uploader/{}'.format(self.VERSION)
        return ' '.join((plugin_agent,
                         self.core.get_user_agent()))

    def __get_operator(self):
        """Return the ``operator`` option or, if it is empty, the login name
        of the effective user. Failures are logged and re-raised."""
        try:
            return self.get_option(
                'operator') or pwd.getpwuid(
                os.geteuid())[0]
        except Exception:
            logger.error(
                "Couldn't get username from the OS. Please, set the 'meta.operator' option explicitly in your config "
                "file.")
            raise

    def __get_api_client(self):
        """Build the API client that matches ``self.backend_type``.

        :raises RuntimeError: the backend type is not a known one
        """
        logger.info('Using {} backend'.format(self.backend_type))
        self._api_token = None
        if self.backend_type == BackendTypes.LUNAPARK:
            client = APIClient
        elif self.backend_type == BackendTypes.OVERLOAD:
            client = OverloadClient
            self._api_token = self.read_token(self.get_option("token_file"))
        elif self.backend_type == BackendTypes.CLOUD:
            return CloudGRPCClient(core_interrupted=self.interrupted,
                                   base_url=self.get_option('api_address'),
                                   api_attempts=self.get_option('api_attempts'),
                                   connection_timeout=self.get_option('connection_timeout'))
        else:
            raise RuntimeError("Backend type doesn't match any of the expected")

        return client(base_url=self.get_option('api_address'),
                      writer_url=self.get_option('writer_endpoint'),
                      network_attempts=self.get_option('network_attempts'),
                      api_attempts=self.get_option('api_attempts'),
                      maintenance_attempts=self.get_option('maintenance_attempts'),
                      network_timeout=self.get_option('network_timeout'),
                      api_timeout=self.get_option('api_timeout'),
                      maintenance_timeout=self.get_option('maintenance_timeout'),
                      connection_timeout=self.get_option('connection_timeout'),
                      user_agent=self._get_user_agent(),
                      api_token=self.api_token,
                      core_interrupted=self.interrupted)

    @property
    def lp_job(self):
        """
        Job object, created on first access; its attributes are published to
        the core at that moment.

        :rtype: LPJob
        """
        if self._lp_job is None:
            self._lp_job = self.__get_lp_job()
            self.core.publish(self.SECTION, 'job_no', self._lp_job.number)
            self.core.publish(self.SECTION, 'web_link', self._lp_job.web_link)
            self.core.publish(self.SECTION, 'job_name', self._lp_job.name)
            self.core.publish(self.SECTION, 'job_dsc', self._lp_job.description)
            self.core.publish(self.SECTION, 'person', self._lp_job.person)
            self.core.publish(self.SECTION, 'task', self._lp_job.task)
            self.core.publish(self.SECTION, 'version', self._lp_job.version)
            self.core.publish(self.SECTION, 'component', self.get_option('component'))
            self.core.publish(self.SECTION, 'meta', self.cfg.get('meta', {}))
        return self._lp_job

    def __get_lp_job(self):
        """
        Build the job object for the configured backend.

        :rtype: LPJob
        """
        api_client = self.__get_api_client()

        info = self.get_generator_info()
        port = info.port
        # A schedule given as a str or dict is replaced by an empty list.
        loadscheme = [] if isinstance(info.rps_schedule, (str, dict)) else info.rps_schedule

        if self.backend_type == BackendTypes.CLOUD:
            lp_job = CloudLoadTestingJob(client=api_client,
                                         target_host=self.target,
                                         target_port=port,
                                         tank_job_id=self.core.test_id,
                                         storage=self.core.storage,
                                         name=self.get_option('job_name', 'untitled'),
                                         description=self.get_option('job_dsc'),
                                         load_scheme=loadscheme)
        else:
            lp_job = LPJob(client=api_client,
                           target_host=self.target,
                           target_port=port,
                           number=self.cfg.get('jobno'),
                           token=self.get_option('upload_token'),
                           person=self.__get_operator(),
                           task=self.task,
                           name=self.get_option('job_name', 'untitled'),
                           description=self.get_option('job_dsc'),
                           tank=self.core.job.tank,
                           notify_list=self.get_option("notify"),
                           load_scheme=loadscheme,
                           version=self.get_option('ver'),
                           log_data_requests=self.get_option('log_data_requests'),
                           log_monitoring_requests=self.get_option('log_monitoring_requests'),
                           log_status_requests=self.get_option('log_status_requests'),
                           log_other_requests=self.get_option('log_other_requests'),
                           add_cleanup=lambda: self.add_cleanup(self.close_job))
            lp_job.send_config(LPRequisites.CONFIGINITIAL, yaml.dump(self.core.configinitial))
        return lp_job

    @property
    def task(self):
        """Task name, resolved once; the value ``'dir'`` of the ``task``
        option means "derive it from the current working directory"."""
        if self._task is None:
            task = self.get_option('task')
            if task == 'dir':
                task = self.search_task_from_cwd(os.getcwd())
            self._task = task
        return self._task

    @property
    def api_token(self):
        """API token for the backend, resolved on first access.

        :raises RuntimeError: the backend type is not a known one
        """
        if self._api_token == '':
            if self.backend_type in [BackendTypes.LUNAPARK, BackendTypes.CLOUD]:
                self._api_token = None
            elif self.backend_type == BackendTypes.OVERLOAD:
                self._api_token = self.read_token(self.get_option("token_file", ""))
            else:
                raise RuntimeError("Backend type doesn't match any of the expected")
        return self._api_token

    @staticmethod
    def read_token(filename):
        """Read the API token from ``filename``.

        :return: file content with surrounding whitespace stripped
        :raises RuntimeError: ``filename`` is empty or cannot be read
        """
        if filename:
            logger.debug("Trying to read token from %s", filename)
            try:
                with open(filename, 'r') as handle:
                    data = handle.read().strip()
                    logger.info(
                        "Read authentication token from %s, "
                        "token length is %d bytes", filename, len(str(data)))
            except IOError:
                logger.error(
                    "Failed to read Overload API token from %s", filename)
                logger.info(
                    "Get your Overload API token from https://overload.yandex.net and provide it via 'overload.token_file' parameter"
                )
                raise RuntimeError("API token error")
            return data
        else:
            logger.error("Overload API token filename is not defined")
            logger.info(
                "Get your Overload API token from https://overload.yandex.net and provide it via 'overload.token_file' parameter"
            )
            raise RuntimeError("API token error")

    def get_generator_info(self):
        """Return the info object of the core's generator plugin."""
        return self.core.job.generator_plugin.get_info()

    @property
    def target(self):
        """Target address reported by the generator, resolved once."""
        if self._target is None:
            self._target = self.get_generator_info().address
            logger.info("Detected target: %s", self._target)
        return self._target


class JobInfoWidget(AbstractInfoWidget):
    """Console widget showing the author, job, task and web link."""

    def __init__(self, sender):
        # type: (Plugin) -> object
        AbstractInfoWidget.__init__(self)
        self.owner = sender

    def get_index(self):
        """Return the widget index (always 1)."""
        return 1

    def render(self, screen):
        """Return the widget text; the first letter of the author is
        highlighted with ``screen.markup.RED``."""
        template = "Author: " + screen.markup.RED + "%s" + \
                   screen.markup.RESET + \
                   "%s\n   Job: %s %s\n  Task: %s %s\n   Web: %s%s"
        data = (self.owner.lp_job.person[:1], self.owner.lp_job.person[1:],
                self.owner.lp_job.number, self.owner.lp_job.name, self.owner.lp_job.task,
                # todo: task_name from api_client.get_task_data()
                self.owner.lp_job.task, self.owner.lp_job.api_client.base_url,
                self.owner.lp_job.number)

        return template % data


class LPJob(Job):
    """Job that forwards data and status to the backend through an API
    client; the job is created on first access to ``number`` or ``token``."""

    def __init__(
        self,
        client,
        target_host,
        target_port,
        person,
        task,
        name,
        description,
        tank,
        log_data_requests=False,
        log_other_requests=False,
        log_status_requests=False,
        log_monitoring_requests=False,
        number=None,
        token=None,
        notify_list=None,
        version=None,
        detailed_time=None,
        load_scheme=None,
        add_cleanup=lambda: None
    ):
        """
        :param client: APIClient
        :param log_data_requests: bool
        :param log_other_requests: bool
        :param log_status_requests: bool
        :param log_monitoring_requests: bool
        :param number: number of an existing job; requires ``token``
        :param token: upload token of an existing job; requires ``number``
        :param add_cleanup: callable invoked once the job number is known
        """
        assert bool(number) == bool(
            token), 'Job number and upload token should come together'
        self.log_other_requests = log_other_requests
        self.log_data_requests = log_data_requests
        self.log_status_requests = log_status_requests
        self.log_monitoring_requests = log_monitoring_requests
        self.name = name
        self.tank = tank
        self.target_host = target_host
        self.target_port = target_port
        self.person = person
        self.task = task
        self.interrupted = threading.Event()
        self._number = number
        self._token = token
        self.api_client = client
        self.notify_list = notify_list
        self.description = description
        self.version = version
        self.detailed_time = detailed_time
        self.load_scheme = load_scheme
        self.is_finished = False
        self.web_link = ''
        self.add_cleanup = add_cleanup
        if self._number:
            self.add_cleanup()

    def push_test_data(self, data, stats):
        """Push aggregated data; on a backend failure mark the job interrupted."""
        if not self.interrupted.is_set():
            try:
                self.api_client.push_test_data(
                    self.number, self.token, data, stats, self.interrupted, trace=self.log_data_requests)
            except (APIClient.NotAvailable, APIClient.NetworkError, APIClient.UnderMaintenance):
                logger.warning('Failed to push test data')
                self.interrupted.set()

    def edit_metainfo(
        self,
        instances=0,
        ammo_path=None,
        loop_count=None,
        regression_component=None,
        cmdline=None,
        is_starred=False,
        tank_type=1
    ):
        """Send the job meta info; backend failures are logged, not raised."""
        try:
            self.api_client.edit_job_metainfo(jobno=self.number,
                                              job_name=self.name,
                                              job_dsc=self.description,
                                              instances=instances,
                                              ammo_path=ammo_path,
                                              loop_count=loop_count,
                                              version_tested=self.version,
                                              component=regression_component,
                                              cmdline=cmdline,
                                              is_starred=is_starred,
                                              tank_type=tank_type,
                                              trace=self.log_other_requests)
        except (APIClient.NotAvailable, APIClient.StoppedFromOnline, APIClient.NetworkError,
                APIClient.UnderMaintenance) as e:
            logger.warning('Failed to edit job metainfo on Lunapark')
            logger.warning(e)

    @property
    def number(self):
        """Job number; creates the job if it does not exist yet."""
        if not self._number:
            self.create()
        return self._number

    @property
    def token(self):
        """Upload token; creates the job if it does not exist yet."""
        if not self._token:
            self.create()
        return self._token

    def close(self, rc):
        """Close the job with return code ``rc``; ``True`` if never created."""
        if self._number:
            return self.api_client.close_job(self.number, rc, trace=self.log_other_requests)
        else:
            return True

    def create(self):
        """Create the job through the API client and run the cleanup hook."""
        self._number, self._token = self.api_client.new_job(task=self.task,
                                                            person=self.person,
                                                            tank=self.tank,
                                                            loadscheme=self.load_scheme,
                                                            target_host=self.target_host,
                                                            target_port=self.target_port,
                                                            detailed_time=self.detailed_time,
                                                            notify_list=self.notify_list,
                                                            trace=self.log_other_requests)
        self.add_cleanup()
        logger.info('Job created: {}'.format(self._number))
        self.web_link = urljoin(self.api_client.base_url, str(self._number))

    def send_status(self, status):
        """Send ``status`` if the job exists and is not interrupted."""
        if self._number and not self.interrupted.is_set():
            self.api_client.send_status(
                self.number,
                self.token,
                status,
                trace=self.log_status_requests)

    def get_task_data(self, task):
        """Return the API client's data for ``task``."""
        return self.api_client.get_task_data(
            task, trace=self.log_other_requests)

    def send_config(self, lp_requisites, content):
        """Send a config ``content`` tagged with ``lp_requisites``."""
        self.api_client.send_config(self.number, lp_requisites, content, trace=self.log_other_requests)

    def push_monitoring_data(self, data):
        """Push monitoring data unless the job is interrupted."""
        if not self.interrupted.is_set():
            self.api_client.push_monitoring_data(
                self.number, self.token, data, self.interrupted, trace=self.log_monitoring_requests)

    def push_events_data(self, data):
        """Push events data unless the job is interrupted."""
        if not self.interrupted.is_set():
            self.api_client.push_events_data(self.number, self.person, data)

    def lock_target(self, lock_target, lock_target_duration, ignore, strict):
        """Lock ``lock_target`` for ``lock_target_duration``.

        :param ignore: give up instead of waiting when the lock is not obtained
        :param strict: re-raise availability errors instead of returning False
        :return: ``True`` when locked, ``False`` when the lock was skipped
        """
        lock_wait_timeout = 10
        # ignore: a single zero timeout; otherwise an endless stream of
        # ``lock_wait_timeout`` values (the sentinel 0 is never produced).
        maintenance_timeouts = iter([0]) if ignore else iter(lambda: lock_wait_timeout, 0)
        while True:
            try:
                self.api_client.lock_target(lock_target,
                                            lock_target_duration,
                                            trace=self.log_other_requests,
                                            maintenance_timeouts=maintenance_timeouts,
                                            maintenance_msg="Target is locked.\nManual unlock link: %s/%s" % (
                                                self.api_client.base_url,
                                                self.api_client.get_manual_unlock_link(lock_target)
                                            ))
                return True
            except (APIClient.NotAvailable, APIClient.StoppedFromOnline) as e:
                logger.info('Target is not locked due to %s', e)
                if ignore:
                    logger.info('ignore_target_locks = 1')
                    return False
                elif strict:
                    raise
                else:
                    logger.info('strict_lock = 0')
                    return False
            except APIClient.UnderMaintenance:
                logger.info('Target is locked')
                if ignore:
                    logger.info('ignore_target_locks = 1')
                    return False
                logger.info("Manual unlock link: %s/%s",
                            self.api_client.base_url,
                            self.api_client.get_manual_unlock_link(lock_target))
                continue

    def set_imbalance_and_dsc(self, rps, comment):
        """Report the imbalance ``rps`` and a ``comment`` for this job."""
        return self.api_client.set_imbalance_and_dsc(self.number, rps, comment)

    def is_target_locked(self, host, strict):
        """Ask the API client whether ``host`` is locked.

        Retries while the backend is under maintenance. On other backend
        errors re-raises when ``strict`` is set, otherwise returns
        ``{'status': 'ok'}``.
        """
        while True:
            try:
                return self.api_client.is_target_locked(
                    host, trace=self.log_other_requests)
            except APIClient.UnderMaintenance:
                logger.info('Target is locked, retrying...')
                continue
            except (APIClient.StoppedFromOnline, APIClient.NotAvailable, APIClient.NetworkError):
                logger.info('Can\'t check whether target is locked\n')
                if strict:
                    logger.warning('Stopping test due to strict_lock')
                    raise
                else:
                    logger.warning('strict_lock is False, proceeding')
                    return {'status': 'ok'}


class CloudLoadTestingJob(Job):
    """Job for the cloud backend; several operations are only logged."""

    def __init__(
        self,
        client,
        target_host,
        target_port,
        name,
        description,
        tank_job_id,
        storage,
        load_scheme=None,
    ):
        """
        :param client: API client used to create tests and push data
        :param tank_job_id: local job id used as the key in ``storage``
        :param storage: object mapping local job ids to cloud job ids
        """
        self.target_host = target_host
        self.target_port = target_port
        self.tank_job_id = tank_job_id
        self.name = name
        self.description = description
        self._number = None  # cloud job id
        self.api_client = client
        self.load_scheme = load_scheme
        self.interrupted = threading.Event()
        self.storage = storage
        # self.create()  # FIXME check it out, maybe it is useless

    def push_test_data(self, data, stats):
        """Push aggregated data; on failure mark the job interrupted."""
        if not self.interrupted.is_set():
            try:
                self.api_client.push_test_data(
                    self.number, data, stats, self.interrupted)
            except (CloudGRPCClient.NotAvailable, CloudGRPCClient.AgentIdNotFound, RuntimeError):
                logger.warning('Failed to push test data')
                self.interrupted.set()

    def edit_metainfo(self, *args, **kwargs):
        """No-op: only logs that the meta info is already set."""
        logger.info('Cloud service has already set metainfo')

    # cloud job id
    @property
    def number(self):
        """Cloud job id.

        :raises UnknownJobNumber: the job has not been created yet
        """
        if not self._number:
            raise self.UnknownJobNumber('Job number is unknown')
        return self._number

    def close(self, *args, **kwargs):
        """No-op: only logs that the job cannot be closed in cloud mode."""
        logger.debug('Cannot close job in the cloud mode')

    def create(self):
        """Reuse the cloud job id stored for ``tank_job_id`` or create a test."""
        cloud_job_id = self.storage.get_cloud_job_id(self.tank_job_id)
        if cloud_job_id is None:
            response = self.api_client.create_test(self.target_host, self.target_port, self.name, self.description, self.load_scheme)
            metadata = test_service_pb2.CreateTestMetadata()
            response.metadata.Unpack(metadata)
            self._number = metadata.id
            # Store the id only once it is known. ``cloud_job_id`` is None in
            # this branch and this class has no ``core`` attribute, so the
            # new id is stored under the local ``tank_job_id``.
            # NOTE(review): argument order (cloud id, local id) is kept as it
            # was in the call being replaced -- confirm against the storage.
            self.storage.push_job(self._number, self.tank_job_id)
            logger.info('Job was created: {}'.format(self._number))
        else:
            self._number = cloud_job_id

    def send_status(self, *args, **kwargs):
        """No-op: only writes a debug message."""
        logger.debug('Tank client is sending the status')

    def send_config(self, *args, **kwargs):
        """No-op: configs are not sent to the cloud service."""
        logger.debug('Do not send config to the cloud service')

    # NOTE: the excerpt is cut off in the middle of the next definition; the
    # visible part is preserved verbatim below:
    #
    #     def push_monitoring_data(self, *args,
**kwargs):893        logger.debug('Do not push monitoring data for cloud service')894    def push_events_data(self, *args, **kwargs):895        logger.debug('Do not push event data for cloud service')896    def lock_target(self, *args, **kwargs):897        logger.debug('Target locking is not implemented for cloud')898    def set_imbalance_and_dsc(self, *args, **kwargs):899        logger.debug('Imbalance detection is not implemented for cloud')900    def is_target_locked(self, *args, **kwargs):901        logger.debug('Target locking is not implemented for cloud')902class EventsReader(FileScanner):903    """904    Parse lines and return stats905    """906    def __init__(self, *args, **kwargs):907        super(EventsReader, self).__init__(*args, **kwargs)908    def _read_data(self, lines):909        results = []910        for line in lines:911            # 2018-03-30 13:40:50,541\tCan't get monitoring config912            data = line.split("\t")...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
