Best Python code snippet using yandex-tank
__init__.py
Source:__init__.py  
# ... (earlier lines of __init__.py are not part of this excerpt; presumably
# the decorator that registers `up` with the CLI group sits just above)
#
# NOTE: the commands below are documented with `#` comments rather than
# docstrings on purpose: click shows a command function's docstring as its
# `--help` text, so adding docstrings would change user-visible output.
@click.argument('instance', default=1, metavar='<instance>', type=click.INT)
def up(instance):
    # Start the machine for the given instance number.
    _load_host(instance).up()


def _load_host(instance):
    """Build the Host for ``coreos-<instance>`` and attach its config.

    Args:
        instance: Integer instance number; zero-padded to two digits to
            form the machine name (e.g. 1 -> 'coreos-01').

    Returns:
        A Host rooted at ``base_host.host_path(name)`` whose ``config`` is
        set to ``base_host.get_host_config(name)``.
    """
    name = 'coreos-{:02d}'.format(instance)
    host = Host(root=base_host.host_path(name))
    host.config = base_host.get_host_config(name)
    return host


@cli.command('pause', short_help='Pause the machine (aka `vagrant suspend <instance>`)')
@click.pass_context
@click.argument('instance', default=1, metavar='<instance>', type=click.INT)
def pause(ctx, instance):
    # Pause the machine for the given instance number.
    _load_host(instance).pause()


@cli.command('stop', short_help='Stop the machine (aka `vagrant halt <instance>`)')
@click.pass_context
@click.argument('instance', default=1, metavar='<instance>', type=click.INT)
def stop(ctx, instance):
    # Stop the machine for the given instance number.
    _load_host(instance).stop()


@cli.command('restart', short_help='Restart the machine (aka `vagrant reload <instance>`)')
@click.pass_context
@click.argument('instance', default=1, metavar='<instance>', type=click.INT)
def restart(ctx, instance):
    # Restart the machine for the given instance number.
    _load_host(instance).restart()


@cli.command('remove', short_help='Removes the machine (aka `vagrant destroy <instance>`)')
@click.pass_context
@click.argument('instance', default=1, metavar='<instance>', type=click.INT)
@click.option('--force', '-f', help='Do not argue to remove the machine', is_flag=True)
def remove(ctx, instance, force):
    # Remove the machine; --force skips the interactive confirmation.
    host = _load_host(instance)
    confirmed = force or click.confirm(
        "Are you sure you want to remove '{}'?".format(host.name))
    if not confirmed:
        return
    host.remove()


@cli.command('recreate', short_help='Rebuilds the machine (aka `vagrant destroy + up <instance>`)')
@click.pass_context
@click.argument('instance', default=1, metavar='<instance>', type=click.INT)
@click.option('--force', '-f', help='Do not argue to recreate the machine', is_flag=True)
def recreate(ctx, instance, force):
    # Recreate the machine; --force skips the interactive confirmation.
    host = _load_host(instance)
    confirmed = force or click.confirm(
        "Are you sure you want to recreate '{}'?".format(host.name))
    if not confirmed:
        return
    host.recreate()


@cli.command('state', short_help='Get the status of a machine (aka `vagrant status <instance>`)')
@click.pass_context
@click.argument('instance', default=1, metavar='<instance>', type=click.INT)
def state(ctx, instance):
    # Print the 'state' entry of the machine's status mapping.
    host = _load_host(instance)
    click.echo(host.status.get('state'))


@cli.command('status-all', short_help='Get the status of all machines (aka `vagrant status`)')
@click.pass_context
def status_all(ctx):
    # Print a table with one row per entry of Host().status_all.
    host = Host()
    # .items() instead of the Python 2-only .iteritems(): works on both
    # Python 2 and Python 3 mappings.
    table = [
        [instance, status.get('state'), status.get('provider')]
        for instance, status in host.status_all.items()
    ]
    click.echo(tabulate(table, headers=['Instance', 'Status', 'Provider']))


@cli.command('ssh', short_help='SSH into the machine (aka `vagrant ssh <instance>`)')
@click.pass_context
@click.argument('instance', default=1, metavar='<instance>', type=click.INT)
@click.option('--command', '-c', default=None, help='Run a one-off command via SSH')
@click.option('--force', '-f', is_flag=True, help='Do not prompt')
@click.option('--restart', is_flag=True, help='Reload the instance')
@click.option('--recreate', is_flag=True, help='Recreate the instance')
def ssh(ctx, instance, command, force, restart, recreate):
    # Optionally recreate/restart the machine, then run host.ssh() and echo
    # whatever it returns.
    host = _load_host(instance)
    try:
        host.ping()
        if recreate:
            confirmed = force or click.confirm(
                "Are you sure you want to recreate '{}'?".format(host.name))
            if not confirmed:
                return
            host.recreate()
        elif restart:
            host.restart()
        result = host.ssh(command, stdout=True)
        if result is not None:
            click.echo(''.join(result))
    except base_host.HostDownException:
        # NOTE(review): confirm_host_up presumably offers to boot the host
        # and returns a falsy value when the user declines -- confirm.
        if not utils.confirm_host_up(force=force, host=host):
            return
        # Retry this same command, now without prompting.
        ctx.invoke(ssh, instance=instance, command=command, force=True,
                   restart=restart, recreate=recreate)


@cli.command('ssh-config', short_help='Print the SSH config (aka `vagrant ssh-config <instance>`)')
@click.pass_context
@click.argument('instance', default=1, metavar='<instance>', type=click.INT)
@click.option('--fetch', '-F', is_flag=True, help='Refetch ssh-config')
@click.option('--force', '-f', is_flag=True, help='Do not argue')
def ssh_config(ctx, instance, fetch, force):
    # Print host.flat_ssh_config for the given instance.
    host = _load_host(instance)
    host.fetch = fetch
    try:
        host.ping()
        click.echo(host.flat_ssh_config)
    except base_host.HostDownException:
        if not utils.confirm_host_up(force=force, host=host):
            return
        # Retry this same command, now without prompting.
        ctx.invoke(ssh_config, instance=instance, fetch=fetch, force=True)


@cli.command('ssh-command', short_help='Print the SSH command')
@click.pass_context
@click.argument('instance', default=1, metavar='<instance>', type=click.INT)
@click.option('--fetch', '-F', is_flag=True, help='Refetch ssh-config')
@click.option('--command', '-c', default=None, help='Run a one-off command via ssh')
@click.option('--force', '-f', is_flag=True, help='Do not argue')
def ssh_command(ctx, instance, fetch, command, force):
    # Print the space-joined parts returned by host.ssh_command(command).
    host = _load_host(instance)
    host.fetch = fetch
    try:
        host.ping()
        click.echo(' '.join(host.ssh_command(command)))
    except base_host.HostDownException:
        if not utils.confirm_host_up(force=force, host=host):
            return
        # Retry this same command, now without prompting.
        ctx.invoke(ssh_command, instance=instance, fetch=fetch,
                   command=command, force=True)


@cli.command('ip', short_help='Fetch the local ip')
@click.pass_context
@click.argument('instance', default=1, metavar='<instance>', type=click.INT)
@click.option('--fetch', '-F', is_flag=True, help='Refetch ssh-config')
@click.option('--force', '-f', is_flag=True, help='Do not argue')
def ip(ctx, instance, fetch, force):
    # Print host.ip for the given instance.
    host = _load_host(instance)
    host.fetch = fetch
    try:
        click.echo(host.ip)
    except base_host.HostDownException:
        if not utils.confirm_host_up(force=force, host=host):
            return
        # Retry *this* command. The previous code invoked `ssh_command`
        # here, which printed the SSH command line instead of the IP.
        ctx.invoke(ip, instance=instance, fetch=fetch, force=True)


@cli.command('update-status', short_help='Updates the status of the machine')
@click.pass_context
@click.argument('instance', default=1, metavar='<instance>', type=click.INT)
def update_status(ctx, instance):
    # Refresh the machine's status via Host.update_status().
    _load_host(instance).update_status()


class Host(base_host.BaseHost):
    """Host backed by a lazily created ``vagrant.Vagrant`` handle.

    Only the beginning of this class is visible in the excerpt.
    """

    _status = None
    _vagrant = None      # cached vagrant.Vagrant instance, created on demand
    fetch = False        # set from the commands' --fetch flag
    yes_to_all = False
    version = VERSION

    @property
    def vagrant(self):
        """Return the Vagrant handle for ``self.root``, creating it once."""
        if self._vagrant is None:
            self._vagrant = vagrant.Vagrant(
                root=self.root, quiet_stdout=False, quiet_stderr=False)
        return self._vagrant

    @property
    def ip_list(self):
        ...  # body truncated in this excerpt

# (the __init__.py excerpt ends here; the page continues with main.py)
Source:main.py  
...9class MonitorClient():10    def __init__(self):11        self.r = RedisHelper()12        self.ip = settings.ClientIP13        self.host_config = self.get_host_config()14    def start(self):   # start monitoring 15        self.handle()16    def get_host_config(self):  # fetch the server-side configuration 17        config_key = ("HostConfig::%s" %self.ip)18        config = self.r.get(config_key)19        if config:20            config = json.loads(config.decode())21        print(config)22        return config23    def handle(self):  # run the monitoring according to the configuration 24        if self.host_config:25            while True:26                for service,val in self.host_config.items():27                    if len(val) < 3:28                        self.host_config[service].append(0)29                    plugin_name,interval,last_run_time = val30                    service_time = time.time() - last_run_time...test_db.py
Source:test_db.py  
...16class DBTest(unittest.TestCase):17    def get_test_db(self):18        signac.db.get_database('testing', hostname='testing')19    def test_get_connector(self):20        host_config = signac.common.host.get_host_config(hostname='testing')21        signac.common.host.get_connector(host_config)22    def test_get_connector_no_client(self):23        host_config = signac.common.host.get_host_config(hostname='testing')24        c = signac.common.host.get_connector(host_config)25        with self.assertRaises(RuntimeError):26            c.client27    def test_get_client(self):28        host_config = signac.common.host.get_host_config(hostname='testing')29        signac.common.host.get_client(host_config)30    def test_connector_get_host(self):31        host_config = signac.common.host.get_host_config(hostname='testing')32        c = signac.common.host.get_connector(host_config)33        self.assertEqual(host_config['url'], c.host)34        self.assertEqual(host_config, c.config)35    def test_logout(self):36        host_config = signac.common.host.get_host_config(hostname='testing')37        c = signac.common.host.get_connector(host_config)38        with self.assertRaises(RuntimeError):39            c.client40        c.connect()41        c.client42        c.authenticate()43        c.logout()44if __name__ == '__main__':...Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
