Best Python code snippet using yandex-tank
client.py
Source:client.py  
...346            self.ssh.rm_r(self.path['AGENT_REMOTE_FOLDER'])347        except Exception:348            logger.error("Unable to get agent artefacts", exc_info=True)349        if not self.successfull_stop:350            self._kill_agent()351        return log_filename, data_filename352    def _wait_for_stop(self):353        done = False354        agent_stop_timeout = 5355        while not done:356            if not self.stop_sent:357                logger.info('Session was broken on %s, switch to kill', self.host)358                break359            if (self.stop_sent + agent_stop_timeout) < time.time():360                logger.info("Agent hasn't finished in %s sec, force quit on %s",361                            agent_stop_timeout,362                            self.host)363                break364            if self.session.finished():365                logger.debug('Session ended with status %s with %s',366                             self.session.exit_status(),367                             self.host)368                self.successfull_stop = True369                done = True370            agent_stderr = self.session.read_err_maybe()371            if agent_stderr and 'stopped' in agent_stderr:372                logger.debug('Got stopped message from %s', self.host)373                done = True374    def _kill_agent(self):375        if self.agent_remote_folder:376            tpl = ('main_p=$(pgrep -f "[p]ython.*{folder}");'377                   'tlgrf_p=$(pgrep -f "[t]elegraf.*{folder}");'378                   'descendent_pids(){{ if [ "x$1" != "x" ]; then '379                   'pids=$(pgrep -P $1); echo $pids;'380                   'for p in $pids; do descendent_pids $p; done; fi }};'381                   'all_p=$(descendent_pids ${{main_p}});'382                   'if [ "x${{main_p}}${{tlgrf_p}}${{all_p}}" != "x" ] ; then '383                   ' kill -9 ${{main_p}} ${{tlgrf_p}} ${{all_p}}; fi')384            cmd = tpl.format(folder=self.agent_remote_folder)385            out, errors, err_code = self.ssh.execute(cmd)386            if errors:387                logger.error(388                    "[%s] error while killing agent: '%s'",...SSHer.py
Source:SSHer.py  
...198        cmd = ' '.join(pieces)199        self._info.log( 'execute: %s' % cmd )200        failed, output, error = spawn( cmd )201        if private_key:202            self._kill_agent(pid)203        if failed and not suppressException:204            msg = '%r failed: %s' % (205                cmd, error )206            raise RemoteAccessError, msg207        return failed, output, error208    def _create_agent(self):209        cmd = 'eval `ssh-agent`  > /dev/null; echo $SSH_AUTH_SOCK; echo $SSH_AGENT_PID'210        failed, output, error = spawn( cmd )211        if failed:212            msg = '%r failed: %s' % (213                cmd, error )214            raise RuntimeError, msg215        lines = output.splitlines()216        socket = lines[0].strip()217        pid = lines[1].strip()218        return socket, pid219    220    def _create_agent_with_key(self, private_key):221        socket, pid = self._create_agent()222        cmd = 'SSH_AUTH_SOCK=%s ssh-add "%s" > /dev/null; ' % (223            socket, private_key)224        failed, output, error = spawn( cmd )225        if failed:226            self._kill_agent(pid)227            msg = '%r failed: %s' % (228                cmd, error )229            raise RuntimeError, msg230        return socket, pid231    def _kill_agent(self, pid):232        cmd = 'kill -9 %s' % pid233        failed, output, error = spawn( cmd )234        if failed:235            msg = '%r failed: %s' % (236                cmd, error )237            raise RuntimeError, msg238        return239    def _isdirectory(self, server, path):240        if _localhost(server):241            return os.path.isdir(path)242        cmd = '''python -c "import os; print int(os.path.isdir('%s'))"''' % path243        failed, out, err = self.execute(cmd, server, '')244        return int(out)245        ...__init__.py
Source:__init__.py  
2from . import parse_agent_variables3import logging4import os5import atexit6def _kill_agent():7    logging.info('killing previously started ssh-agent')8    subprocess.run(['ssh-agent', '-k'])9    del os.environ['SSH_AUTH_SOCK']10    del os.environ['SSH_AGENT_PID']11def _setup_agent():12    process = subprocess.run(['ssh-agent', '-s'], stdout=subprocess.PIPE, text=True)13    agent_data = parse_agent_variables.parse(process.stdout)14    logging.info('ssh agent data: {}'.format(agent_data))15    logging.info('exporting ssh agent environment variables')16    os.environ['SSH_AUTH_SOCK'] = agent_data['SSH_AUTH_SOCK']17    os.environ['SSH_AGENT_PID'] = agent_data['SSH_AGENT_PID']18    atexit.register(_kill_agent)19def agent_up():20    return os.environ.get('SSH_AUTH_SOCK') is not None...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
