How to use _ssh_cmd method in avocado

Best Python code snippet using avocado_python

salt_utils.py

Source:salt_utils.py Github

copy

Full Screen

"""
Module to handle interaction with salt
"""
import json
import os
import time

import requests

from cfg_checker.common import config, logger, logger_cli
from cfg_checker.common.exception import InvalidReturnException, SaltException
from cfg_checker.common.other import shell


def _extract_password(_raw):
    """Extract the password from the raw JSON text returned by salt-call.

    :param _raw: text output of the salt-call command
    :return: value stored under the "local" key of the decoded JSON
    :raises InvalidReturnException: when _raw is not a string
    :raises SaltException: when _raw can not be decoded as JSON
    """
    if not isinstance(_raw, str):
        raise InvalidReturnException(_raw)
    try:
        _json = json.loads(_raw)
    except ValueError:
        raise SaltException(
            "# Return value is not a json: '{}'".format(_raw)
        )
    return _json["local"]


def get_remote_env_password():
    """Uses ssh call with configured options to get password from salt master

    :return: password string
    :raises InvalidReturnException: when the command returns nothing
    :raises SaltException: when the command can not be executed (OSError)
    """
    _salt_cmd = "salt-call --out=json pillar.get _param:salt_api_password"
    _ssh_cmd = ["ssh"]
    # Build SSH cmd
    if config.ssh_key:
        _ssh_cmd.append("-i " + config.ssh_key)
    if config.ssh_user:
        _ssh_cmd.append(config.ssh_user+'@'+config.ssh_host)
    else:
        _ssh_cmd.append(config.ssh_host)
    if config.ssh_uses_sudo:
        _ssh_cmd.append("sudo")
    _ssh_cmd.append(_salt_cmd)
    _ssh_cmd = " ".join(_ssh_cmd)
    logger_cli.debug("... calling salt: '{}'".format(_ssh_cmd))
    try:
        _result = shell(_ssh_cmd)
        if len(_result) < 1:
            # Closing quote added: the original message was unbalanced
            raise InvalidReturnException(
                "# Empty value returned for '{}'".format(
                    _ssh_cmd
                )
            )
        else:
            return _extract_password(_result)
    except OSError as e:
        raise SaltException(
            "Salt error calling '{}': '{}'\n"
            "\nConsider checking 'SALT_ENV' "
            "and '<pkg>/etc/<env>.env' files".format(_ssh_cmd, e.strerror)
        )


def get_local_password():
    """Calls salt locally to get password from the pillar

    :return: password string
    :raises SaltException: when the command can not be executed (OSError)
    """
    _cmd = []
    if config.ssh_uses_sudo:
        _cmd = ["sudo"]
    # salt commands
    _cmd.append("salt-call")
    _cmd.append("--out=json pillar.get _param:salt_api_password")
    # Join once so the error message shows the exact command that was run,
    # the same way get_remote_env_password reports it
    _cmd = " ".join(_cmd)
    try:
        _result = shell(_cmd)
    except OSError as e:
        raise SaltException(
            "Salt error calling '{}': '{}'\n"
            "\nConsider checking 'SALT_ENV' "
            "and '<pkg>/etc/<env>.env' files".format(_cmd, e.strerror)
        )
    return _extract_password(_result)


def list_to_target_string(node_list, separator):
    """Join node names into a single target string.

    :param node_list: list of node name strings
    :param separator: keyword placed between the names, e.g. 'and not'
    :return: names joined by ' <separator> '; empty string for empty list
    """
    return (' ' + separator + ' ').join(node_list)


class SaltRest(object):
    """Thin client for the salt REST api; logs in when instantiated."""
    _host = config.salt_host
    _port = config.salt_port
    uri = "http://" + config.salt_host + ":" + config.salt_port
    # Class-level dicts: both are shared by all instances and updated
    # in place by _login()
    _auth = {}
    default_headers = {
        'Accept': 'application/json',
        'Content-Type': 'application/json',
        'X-Auth-Token': None
    }

    def __init__(self):
        self._token = self._login()
        self.last_response = None

    # 'headers' defaults to the very same dict object as 'default_headers'.
    # _login() updates that dict in place, so the token it stores is sent
    # with every later call that relies on the default.
    def get(
        self,
        path='',
        headers=default_headers,
        cookies=None,
        timeout=None
    ):
        """Send GET request to '<uri>/<path>'.

        :return: requests response object
        """
        _path = os.path.join(self.uri, path)
        logger.debug("# GET '{}'\nHeaders: '{}'\nCookies: {}".format(
            _path,
            headers,
            cookies
        ))
        return requests.get(
            _path,
            headers=headers,
            cookies=cookies,
            timeout=timeout
        )

    def post(self, data, path='', headers=default_headers, cookies=None):
        """Send POST request with 'data' as json body to '<uri>/<path>'.

        :return: requests response object
        """
        if data is None:
            data = {}
        _path = os.path.join(self.uri, path)
        if path == 'login':
            # never write the password to the log
            _data = str(data).replace(self._pass, "*****")
        else:
            _data = data
        logger.debug(
            "# POST '{}'\nHeaders: '{}'\nCookies: {}\nBody: {}".format(
                _path,
                headers,
                cookies,
                _data
            )
        )
        return requests.post(
            _path,
            headers=headers,
            json=data,
            cookies=cookies
        )

    def _login(self):
        """Obtain the api password, log in and store the auth data.

        :return: auth token string
        :raises EnvironmentError: when the login response is not ok
        """
        # if there is no password - try to get local, if this available
        if config.salt_env == "local":
            _pass = get_local_password()
        else:
            _pass = get_remote_env_password()
        login_payload = {
            'username': config.salt_user,
            'password': _pass,
            'eauth': 'pam'
        }
        # post() reads self._pass to mask the password in the debug log
        self._pass = _pass
        logger.debug("# Logging in to salt master...")
        _response = self.post(login_payload, path='login')
        if _response.ok:
            self._auth['response'] = _response.json()['return'][0]
            self._auth['cookies'] = _response.cookies
            self.default_headers['X-Auth-Token'] = \
                self._auth['response']['token']
            return self._auth['response']['token']
        else:
            raise EnvironmentError(
                "# HTTP:{}, Not authorized?".format(_response.status_code)
            )

    def salt_request(self, fn, *args, **kwargs):
        """Call own method named 'fn' (e.g. 'get', 'post') and unpack result.

        :param fn: name of the method to call with *args and **kwargs
        :return: value of the 'return' key of the json response
        :raises EnvironmentError: when the response is not ok
        """
        # if token will expire in 5 min, re-login
        if self._auth['response']['expire'] < time.time() + 300:
            self._auth['response']['X-Auth-Token'] = self._login()
        _method = getattr(self, fn)
        _response = _method(*args, **kwargs)
        self.last_response = _response
        # log the body only when it is small
        _content = "..."
        _len = len(_response.content)
        if _len < 1024:
            _content = _response.content
        logger.debug(
            "# Response (HTTP {}/{}), {}: {}".format(
                _response.status_code,
                _response.reason,
                _len,
                _content
            )
        )
        if _response.ok:
            return _response.json()['return']
        else:
            raise EnvironmentError(
                "# Salt Error: HTTP:{}, '{}'".format(
                    _response.status_code,
                    _response.reason
                )
            )


class SaltRemote(SaltRest):
    """Helpers that run salt functions through the REST api."""
    master_node = ""

    def __init__(self):
        super(SaltRemote, self).__init__()

    def cmd(
        self,
        tgt,
        fun,
        param=None,
        client='local',
        kwarg=None,
        expr_form=None,
        tgt_type=None,
        timeout=None
    ):
        """Post a single command payload and return its result.

        Optional values are added to the payload only when they are set;
        'timeout' falls back to config.salt_timeout when it is None.

        :return: first item of the response list
        :raises EnvironmentError: when the response is not a list
        """
        _timeout = timeout if timeout is not None else config.salt_timeout
        _payload = {
            'fun': fun,
            'tgt': tgt,
            'client': client,
            'timeout': _timeout
        }
        if expr_form:
            _payload['expr_form'] = expr_form
        if tgt_type:
            _payload['tgt_type'] = tgt_type
        if param:
            _payload['arg'] = param
        if kwarg:
            _payload['kwarg'] = kwarg
        _response = self.salt_request('post', [_payload])
        if isinstance(_response, list):
            return _response[0]
        else:
            raise EnvironmentError(
                "# Unexpected response from from salt-api/LocalClient: "
                "{}".format(_response)
            )

    def run(self, fun, kwarg=None):
        """Post a payload for the 'runner' client.

        :return: first item of the response list
        :raises EnvironmentError: when the response is not a list
        """
        _payload = {
            'client': 'runner',
            'fun': fun,
            'timeout': config.salt_timeout
        }
        if kwarg:
            _payload['kwarg'] = kwarg
        _response = self.salt_request('post', [_payload])
        if isinstance(_response, list):
            return _response[0]
        else:
            raise EnvironmentError(
                "# Unexpected response from from salt-api/RunnerClient: "
                "{}".format(_response)
            )

    def wheel(self, fun, arg=None, kwarg=None):
        """Post a payload for the 'wheel' client.

        :return: 'data' part of the response when it reports success
        :raises EnvironmentError: when 'success' is false
        """
        _payload = {
            'client': 'wheel',
            'fun': fun,
            'timeout': config.salt_timeout
        }
        if arg:
            _payload['arg'] = arg
        if kwarg:
            _payload['kwarg'] = kwarg
        # NOTE(review): cmd() and run() post '[_payload]' and take item [0]
        # of the result, this one posts a bare dict and reads key 'data'
        # - confirm against the api this is intended
        _response = self.salt_request('post', _payload)['data']
        if _response['success']:
            return _response
        else:
            raise EnvironmentError(
                "# Salt Error: '{}'".format(_response['return']))

    def pillar_request(self, node_target, pillar_submodule, argument):
        """Run 'pillar.<pillar_submodule>' with 'argument' on the target.

        Target is matched as "list" when node_target is a list and
        as "compound" otherwise.
        """
        # example cli: 'salt "ctl01*" pillar.keys rsyslog'
        _type = "compound"
        if isinstance(node_target, list):
            _type = "list"
        return self.cmd(
            node_target,
            "pillar." + pillar_submodule,
            argument,
            expr_form=_type
        )

    def pillar_keys(self, node_target, argument):
        return self.pillar_request(node_target, 'keys', argument)

    def pillar_get(self, node_target, argument):
        return self.pillar_request(node_target, 'get', argument)

    def pillar_data(self, node_target, argument):
        return self.pillar_request(node_target, 'data', argument)

    def pillar_raw(self, node_target, argument):
        return self.pillar_request(node_target, 'raw', argument)

    def list_minions(self):
        """
        Fails in salt version 2016.3.8
        Works starting from 2017.7.7
        api returns dict of minions with grains

        :return: first item of the response, None on read timeout
            or empty response
        """
        try:
            _r = self.salt_request('get', 'minions', timeout=10)
        except requests.exceptions.ReadTimeout:
            logger_cli.debug("... timeout waiting list minions from Salt API")
            _r = None
        return _r[0] if _r else None

    def list_keys(self):
        """
        Fails in salt version 2016.3.8
        Works starting from 2017.7.7
        api should return dict:
        {
            'local': [],
            'minions': [],
            'minions_denied': [],
            'minions_pre': [],
            'minions_rejected': [],
        }
        """
        return self.salt_request('get', path='keys')

    def get_status(self):
        """
        Fails in salt version 2017.7.7
        'runner' client is the equivalent of 'salt-run'
        Returns the result of the 'manage.status' runner call
        """
        return self.run(
            'manage.status',
            kwarg={'timeout': 10}
        )

    def get_active_nodes(self):
        """Used when other minion list methods fail

        :return: list of node names with a truthy test.ping result
        """
        if config.skip_nodes:
            logger.info("# Nodes to be skipped: {0}".format(config.skip_nodes))
            _r = self.cmd(
                '* and not ' + list_to_target_string(
                    config.skip_nodes,
                    'and not'
                ),
                'test.ping',
                expr_form='compound')
        else:
            _r = self.cmd('*', 'test.ping')
        # Return all nodes that responded
        return [node for node, responded in _r.items() if responded]

    def get_monitoring_ip(self, param_name):
        """Get pillar value 'param_name' from the monitoring stack nodes.

        :return: value reported by the first node in the response
        """
        salt_output = self.cmd(
            'docker:client:stack:monitoring',
            'pillar.get',
            param=param_name,
            expr_form='pillar')
        # dict.keys() can not be indexed on Python 3; take the first value
        # the same way the other methods of this class do
        return [*salt_output.values()][0]

    def f_touch_master(self, path, makedirs=True):
        """Run 'file.touch' for 'path' on the master node."""
        _kwarg = {
            "makedirs": makedirs
        }
        salt_output = self.cmd(
            self.master_node,
            "file.touch",
            param=path,
            kwarg=_kwarg
        )
        return [*salt_output.values()][0]

    def f_append_master(self, path, strings_list, makedirs=True):
        """Run 'file.write' with 'path' and the strings on the master node."""
        _kwarg = {
            "makedirs": makedirs
        }
        _args = [path]
        _args.extend(strings_list)
        salt_output = self.cmd(
            self.master_node,
            "file.write",
            param=_args,
            kwarg=_kwarg
        )
        return [*salt_output.values()][0]

    def mkdir(self, target, path, tgt_type=None):
        """Run 'file.mkdir' for 'path' on the target nodes."""
        salt_output = self.cmd(
            target,
            "file.mkdir",
            param=path,
            expr_form=tgt_type
        )
        return salt_output

    def f_manage_file(self, target_path, source,
                      sfn='', ret='{}',
                      source_hash={},
                      user='root', group='root', backup_mode='755',
                      show_diff='base',
                      contents='', makedirs=True):
        """
        REST variation of file.get_managed
        CLI execution goes like this (10 args):
        salt cfg01\\* file.manage_file /root/test_scripts/pkg_versions.py
        '' '{}' /root/diff_pkg_version.py
        '{hash_type: 'md5', 'hsum': <md5sum>}' root root '755' base ''
        makedirs=True
            param: name - target file placement when managed
            param: source - source for the file
        """
        # NOTE(review): the 'source_hash' argument is never read,
        # this fixed value is always sent instead
        _source_hash = {
            "hash_type": "md5",
            "hsum": 000
        }
        _arg = [
            target_path,
            sfn,
            ret,
            source,
            _source_hash,
            user,
            group,
            backup_mode,
            show_diff,
            contents
        ]
        _kwarg = {
            "makedirs": makedirs
        }
        salt_output = self.cmd(
            self.master_node,
            "file.manage_file",
            param=_arg,
            kwarg=_kwarg
        )
        return [*salt_output.values()][0]

    def cache_file(self, target, source_path):
        """Run 'cp.cache_file' for 'source_path' on the target."""
        salt_output = self.cmd(
            target,
            "cp.cache_file",
            param=source_path
        )
        return [*salt_output.values()][0]

    def get_file(self, target, source_path, target_path, tgt_type=None):
        """Run 'cp.get_file' on the target nodes."""
        return self.cmd(
            target,
            "cp.get_file",
            param=[source_path, target_path],
            expr_form=tgt_type
        )

    # NOTE(review): body of this method is cut off in the visible source
    @staticmethod
    def compound_string_from_list(nodes_list):...

Full Screen

Full Screen

dirc_do_deploy.py

Source:dirc_do_deploy.py Github

copy

Full Screen

...26 get_jen_env['action_list'] = action.split(",")27 return get_jen_env28get_env = _get_env()29# 定义执行语句统一部分,用于简化后面的操作 #30def _ssh_cmd(action_cmd):31 for i in get_env['target_ip_list']:32 remote_ip = str(i)33 ssh_cmd = "ssh -o StrictHostKeyChecking=no -q " + get_env['target_user'] + "@" + remote_ip + " " + action_cmd34 return ssh_cmd35# 定义文件复制语句,以备调用 #36def _scp_cmd():37 for i in get_env['target_ip_list']:38 remote_ip = str(i)39 scp_cmd = "scp -r " + get_env['work_space'] + "/" + get_env['app_name'] + ' ' + get_env['target_user'] + "@" + remote_ip + ":" + APP_PATH40 return scp_cmd41# 定义查询进程PID方法,以备调用 #42def _check_pid():43 check_cmd = "ps -ef |grep Bootstrap|grep -v grep|awk \'{print $2}\'"44 run_check_pid = (_ssh_cmd(check_cmd))45 return run_check_pid46# 定义停止进程方法,以备调用 #47def _kill_pid(now_pid):48 kill_cmd = 'kill -9 ' + now_pid49 run_kill_sh = (_ssh_cmd(kill_cmd))50 return run_kill_sh51# 定义备份应用方法,以备调用 #52def _back_pkg():53 back_pkg_cmd = "mv" + ' ' + APP_PATH + get_env['app_name'] + ' ' + BACK_PATCH54 back_pkg_sh = (_ssh_cmd(back_pkg_cmd))55 return back_pkg_sh56# 定义启服务方法,以备调用 #57def _start_session():58 run_start_sh = (_ssh_cmd(START_SHELL))59 return run_start_sh60# 定义清空CACHE方法,以备调用 #61def _clear_cache():62 clear_cmd = 'rm -rf' + ' ' + APP_PATH + '*'63 run_clear_sh = (_ssh_cmd(clear_cmd))64 return run_clear_sh65# 定义回滚方法,以备调用 #66def _rollback_pkg():67 rollback_cmd = "mv" + ' ' + BACK_PATCH + get_env['app_name'] + ' ' + APP_PATH68 rollback__sh = (_ssh_cmd(rollback_cmd))69 return rollback__sh70for j in get_env['action_list']:71 if j == 'check': # 调用查询进程功能72 stat, result_id = commands.getstatusoutput(_check_pid())73 print stat, result_id74 elif j == 'stop': # 调用停止进程功能75 stat, result_id = commands.getstatusoutput(_check_pid())76 print stat, result_id77 kill_stat, kill_result = commands.getstatusoutput(_kill_pid(result_id))78 print kill_stat, kill_result79 elif j == 'start': # 调用单次启动服务功能80 start_stat, start_result = commands.getstatusoutput(_start_session())81 print 
start_stat, start_result82 elif j == 'deploy': # 调用完成服务部署功能...

Full Screen

Full Screen

iscsi.py

Source:iscsi.py Github

copy

Full Screen

...11 self.server_pw = server_pw12 self.pool = pool13 self.image_size = image_size14 self.device_link = device_link15def _ssh_cmd(host, rootpw, cmd, check=True):16 ssh_cmd = 'sshpass -p {} ssh -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no root@{} {}'.format(rootpw, host, cmd)17 print("Running SSH command: {}".format(ssh_cmd))18 rc = subprocess.run(ssh_cmd, shell=True, capture_output=True, check=check)19 return rc.stdout.decode('utf-8')20 21def _fetch_gateway_hostids(args):22 host_ids = []23 for gw in args.gateways:24 print("Fetching hostid from {}".format(gw))25 id = _ssh_cmd(gw, args.gateway_pw, 'hostid')26 host_ids.append(id.rstrip())27 return host_ids28def _image_name(server):29 return 'benchmaster-{}'.format(server)30def _setup_initiator(args):31 initator = 'iqn.2014-01.com.softiron.iscsi_gw_v0:' + args.gateways[0]32 for s in args.servers:33 _ssh_cmd(s, args.server_pw, "sed -i 's/InitiatorName=.*/InitiatorName={}/' /etc/iscsi/initiatorname.iscsi".format(initator))34 _ssh_cmd(s, args.server_pw, 'systemctl restart iscsid')35def _create_images(args):36 print("Creating images")37 for s in args.servers:38 name = _image_name(s)39 cmd = 'rbd create {}/{} --size {} --image-feature=layering,exclusive-lock'.format(args.pool, name, args.image_size)40 _ssh_cmd(args.gateways[0], args.gateway_pw, cmd)41 print("Created.")42def _delete_images(args):43 print("Deleting images")44 for s in args.servers:45 name = _image_name(s)46 cmd = 'rbd rm {} -p {}'.format(name, args.pool)47 _ssh_cmd(args.gateways[0], args.gateway_pw, cmd)48 print("Deleted")49def _configure_images_on_gateways(args, operation, host_ids):50 print("Configuring images")51 flag = ""52 if operation == 'export': flag = '-e'53 elif operation == 'unexport': flag = '-u'54 elif operation == 'reset': flag = '-r'55 else: raise Exception('Unsupported operation: {}'.format(operation))56 for s in args.servers:57 name = _image_name(s)58 cmd = 'rsmapadm -p {} {} {}'.format(args.pool, flag, name)59 for id in 
host_ids:60 cmd += ' -t {}'.format(id)61 _ssh_cmd(args.gateways[0], args.gateway_pw, cmd)62 print("Configured")63def _mount_images(args):64 print("Mounting images")65 for s in args.servers:66 name = _image_name(s)67 68 # Do discovery for each gateway69 for gw in args.gateways:70 discovery_cmd = 'iscsiadm --mode discoverydb --type sendtargets --portal {} --discover'.format(gw) 71 _ssh_cmd(s, args.server_pw, discovery_cmd)72 # Log in to the active and secondary ISCSI targets.73 for mode in ['act', 'nop']:74 login_cmd = 'iscsiadm --mode node --login --target iqn.2014-01.com.softiron:{}-{}-{}'.format(args.pool, name, mode)75 _ssh_cmd(s, args.server_pw, login_cmd)76 77 # Determine which devices they were assigned to.78 disk_cmd = 'ls -l /dev/disk/by-path/ | grep iqn.2014-01.com.softiron:{}-{}'.format(args.pool, name)79 lines = _ssh_cmd(s, args.server_pw, disk_cmd)80 mp_cmd = 'multipath -a'81 for l in lines.split('\n'):82 if l != '':83 mp_cmd += ' /dev/' + l[-3:]84 # We can't check the return code of this call, since rsmapamd returns the number of added mappings as the rc.85 # It shouldn't do that because it violates the standard, but we have to live with it, so we'll disable checking for 86 # this command.87 # (it should be output on stdout, or should only output as RC if we pass an explicit flag telling it to do so).88 _ssh_cmd(s, args.server_pw, mp_cmd, check=False)89 90 # Now we've added the mapping, tell multipath to use it.91 _ssh_cmd(s, args.server_pw, 'multipath -r')92 93 # Retrieve the device mapper entry94 dm = _ssh_cmd(s, args.server_pw, "multipath -l | grep benchmaster | awk '{print $3}'")95 # Create the link,96 link_cmd = 'ln -s /dev/{} {}'.format(str(dm).rstrip(), args.device_link)97 _ssh_cmd(s, args.server_pw, link_cmd)98 print("Mounted")99 100 101def _unmount_images(args):102 print("Unmounting images")103 for s in args.servers:104 _ssh_cmd(s, args.server_pw, 'unlink {}'.format(args.device_link), check=False)105 _ssh_cmd(s, args.server_pw, 'iscsiadm --mode 
node --logoutall=all', check=False)106 _ssh_cmd(s, args.server_pw, 'multipath -W', check=False)107 print("Unmounted")108 109def setup(args):110 host_ids = _fetch_gateway_hostids(args)111 _setup_initiator(args)112 _create_images(args)113 _configure_images_on_gateways(args, 'export', host_ids)114 _mount_images(args)115 print("Ready.")116def teardown(args):117 host_ids = _fetch_gateway_hostids(args)118 _unmount_images(args)119 _configure_images_on_gateways(args, 'unexport', host_ids)120 _delete_images(args)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run avocado automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful