Best Python code snippet using yandex-tank
agent.py
Source:agent.py  
...177            shell=True,178            stdout=subprocess.PIPE,179            stderr=subprocess.PIPE,180            stdin=subprocess.PIPE, )181    def read_startup_config(self, cfg_file='agent_startup.cfg'):182        try:183            config = ConfigParser.ConfigParser()184            with open(os.path.join(self.working_dir, cfg_file), 'rb') as f:185                config.readfp(f)186            if config.has_section('startup'):187                for option in config.options('startup'):188                    if option.startswith('cmd'):189                        self.startups.append(config.get('startup', option))190            if config.has_section('shutdown'):191                for option in config.options('shutdown'):192                    if option.startswith('cmd'):193                        self.shutdowns.append(config.get('shutdown', option))194            if config.has_section('source'):195                for option in config.options('source'):196                    if option.startswith('file'):197                        self.custom_sources.append(config.get('source', option))198            logger.info(199                'Successfully loaded startup config.\n'200                'Startups: %s\n'201                'Shutdowns: %s\n', self.startups, self.shutdowns)202        except:203            logger.error(204                'Error trying to read agent startup config', exc_info=True)205    def run(self):206        logger.info("Running startup commands")207        for cmnd in self.startups:208            logger.debug("Run: %s", cmnd)209            proc = self.popen(cmnd)210            logger.info('Started with pid %d', proc.pid)211            self.startup_processes.append(proc)212        logger.info('Starting metrics collector..')213        cmnd = "{telegraf} -config {working_dir}/agent.cfg".format(214            telegraf=self.telegraf_path, working_dir=self.working_dir)215        self.collector = self.popen(cmnd)216        logger.info('Started with pid 
%d', self.collector.pid)217        telegraf_output = self.working_dir + '/monitoring.rawdata'218        sources = [telegraf_output] + self.custom_sources219        for _ in range(10):220            self.collector.poll()221            if not self.collector.returncode:222                logger.info("Waiting for telegraf...")223            else:224                logger.info(225                    "Telegraf with pid %d ended with code %d",226                    self.collector.pid, self.collector.returncode)227            if os.path.isfile(telegraf_output):228                break229            time.sleep(1)230        self.drain = Drain(231            Consolidator([iter(DataReader(f)) for f in sources]), self.results)232        self.drain.start()233        self.drain_stdout = Drain(234            DataReader(235                self.collector.stdout, pipe=True), self.results_stdout)236        self.drain_stdout.start()237        self.drain_err = Drain(238            DataReader(239                self.collector.stderr, pipe=True), self.results_err)240        self.drain_err.start()241        while not self.finished:242            for _ in range(self.results.qsize()):243                try:244                    data = self.results.get_nowait()245                    logger.debug(246                        'send %s bytes of data to collector', len(data))247                    sys.stdout.write(str(data) + '\n')248                    sys.stdout.flush()249                except q.Empty:250                    break251                except:252                    logger.error(253                        'Something nasty happend trying to send data',254                        exc_info=True)255            for _ in range(self.results_stdout.qsize()):256                try:257                    data = self.results_stdout.get_nowait()258                    if data:259                        collector_logger.info("STDOUT: %s", data)260                except q.Empty:261                   
 break262            for _ in range(self.results_err.qsize()):263                try:264                    data = self.results_err.get_nowait()265                    if data:266                        collector_logger.info("STDERR: %s", data.rstrip('\n'))267                except q.Empty:268                    break269            time.sleep(1)270        self.drain.close()271        self.drain_stdout.close()272        self.drain_err.close()273        self.stop()274    def proc_stop(self, proc, kill=False):275        proc.poll()276        if proc.returncode is None:277            try:278                if kill:279                    logger.info("Killing PID %s", proc.pid)280                    os.killpg(proc.pid, signal.SIGKILL)281                else:282                    logger.debug("Terminating: %s", proc.pid)283                    os.killpg(proc.pid, signal.SIGTERM)284                    proc.wait()285                    logger.info(286                        'Retcode for PID %s %s', proc.pid, proc.returncode)287            except OSError as ex:288                if ex.errno == 3:289                    logger.info("PID %s already died", proc.pid)290    def kill(self):291        logger.info("Forced stop")292        for proc in self.startup_processes:293            self.proc_stop(proc, kill=True)294        self.proc_stop(self.collector, kill=True)295    def stop(self):296        logger.info("Terminating startup commands")297        for proc in self.startup_processes:298            self.proc_stop(proc)299        logger.info('Terminating collector process: %s', self.collector)300        self.proc_stop(self.collector)301        logger.info("Running shutdown commands")302        for cmnd in self.shutdowns:303            logger.debug("Run: %s", cmnd)304            subprocess.call(cmnd, shell=True)305        self.finished = True306        logger.info("Worker thread finished")307        sys.stderr.write('stopped\n')308def main():309    fname = os.path.dirname(__file__) 
+ "/_agent.log"310    logging.basicConfig(311        level=logging.DEBUG,312        filename=fname,313        format='%(asctime)s [%(levelname)s] %(name)s:%(lineno)d %(message)s')314    parser = OptionParser()315    parser.add_option(316        "",317        "--telegraf",318        dest="telegraf_path",319        help="telegraf_path",320        default="/tmp/telegraf")321    parser.add_option(322        "",323        "--host",324        dest="hostname_path",325        help="telegraf_path",326        default="/usr/bin/telegraf")327    (options, args) = parser.parse_args()328    logger.info('Init')329    customs_script = os.path.dirname(__file__) + '/agent_customs.sh'330    try:331        logger.info(332            'Trying to make telegraf executable: %s', options.telegraf_path)333        # 0o755 compatible with old python versions. 744 is NOT enough334        os.chmod(options.telegraf_path, 493)335    except OSError:336        logger.warning(337            'Unable to set %s access rights to execute.',338            options.telegraf_path,339            exc_info=True)340    try:341        logger.info(342            'Trying to make customs script executable: %s', customs_script)343        # 0o755 compatible with old python versions. 
744 is NOT enough344        os.chmod(customs_script, 493)345    except OSError:346        logger.warning(347            'Unable to set %s access rights to execute.',348            customs_script,349            exc_info=True)350    worker = AgentWorker(options.telegraf_path)351    worker.read_startup_config()352    logger.info('Starting AgentWorker: %s', worker)353    worker.start()354    try:355        logger.debug("Check for any stdin command for shutdown")356        cmd = sys.stdin.readline()357        if cmd:358            logger.info("Stdin cmd received: %s", cmd)359    except KeyboardInterrupt:360        logger.debug("Interrupted")361    except:362        logger.error(363            "Something nasty happened while waiting for stop", exc_info=True)364    worker.finished = True365    agent_finished = False...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!
