Best Python code snippet using avocado_python
salt_utils.py
Source:salt_utils.py  
1"""2Module to handle interaction with salt3"""4import json5import os6import time7import requests8from cfg_checker.common import config, logger, logger_cli9from cfg_checker.common.exception import InvalidReturnException, SaltException10from cfg_checker.common.other import shell11def _extract_password(_raw):12    if not isinstance(_raw, str):13        raise InvalidReturnException(_raw)14    else:15        try:16            _json = json.loads(_raw)17        except ValueError:18            raise SaltException(19                "# Return value is not a json: '{}'".format(_raw)20            )21    return _json["local"]22def get_remote_env_password():23    """Uses ssh call with configured options to get password from salt master24    :return: password string25    """26    _salt_cmd = "salt-call --out=json pillar.get _param:salt_api_password"27    _ssh_cmd = ["ssh"]28    # Build SSH cmd29    if config.ssh_key:30        _ssh_cmd.append("-i " + config.ssh_key)31    if config.ssh_user:32        _ssh_cmd.append(config.ssh_user+'@'+config.ssh_host)33    else:34        _ssh_cmd.append(config.ssh_host)35    if config.ssh_uses_sudo:36        _ssh_cmd.append("sudo")37    _ssh_cmd.append(_salt_cmd)38    _ssh_cmd = " ".join(_ssh_cmd)39    logger_cli.debug("... calling salt: '{}'".format(_ssh_cmd))40    try:41        _result = shell(_ssh_cmd)42        if len(_result) < 1:43            raise InvalidReturnException(44                "# Empty value returned for '{}".format(45                    _ssh_cmd46                )47            )48        else:49            return _extract_password(_result)50    except OSError as e:51        raise SaltException(52            "Salt error calling '{}': '{}'\n"53            "\nConsider checking 'SALT_ENV' "54            "and '<pkg>/etc/<env>.env' files".format(_ssh_cmd, e.strerror)55        )56def get_local_password():57    """Calls salt locally to get password from the pillar58    :return: password string59    """60    _cmd = []61    if config.ssh_uses_sudo:62        _cmd = ["sudo"]63    # salt commands64    _cmd.append("salt-call")65    _cmd.append("--out=json pillar.get _param:salt_api_password")66    try:67        _result = shell(" ".join(_cmd))68    except OSError as e:69        raise SaltException(70            "Salt error calling '{}': '{}'\n"71            "\nConsider checking 'SALT_ENV' "72            "and '<pkg>/etc/<env>.env' files".format(_cmd, e.strerror)73        )74    return _extract_password(_result)75def list_to_target_string(node_list, separator):76    result = ''77    for node in node_list:78        result += node + ' ' + separator + ' '79    return result[:-(len(separator)+2)]80class SaltRest(object):81    _host = config.salt_host82    _port = config.salt_port83    uri = "http://" + config.salt_host + ":" + config.salt_port84    _auth = {}85    default_headers = {86        'Accept': 'application/json',87        'Content-Type': 'application/json',88        'X-Auth-Token': None89    }90    def __init__(self):91        self._token = self._login()92        self.last_response = None93    def get(94        self,95        path='',96        headers=default_headers,97        cookies=None,98        timeout=None99    ):100        _path = os.path.join(self.uri, path)101        logger.debug("# GET '{}'\nHeaders: '{}'\nCookies: {}".format(102            _path,103            headers,104            cookies105        ))106        return requests.get(107            _path,108            headers=headers,109            cookies=cookies,110            timeout=timeout111        )112    def post(self, data, path='', headers=default_headers, cookies=None):113        if data is None:114            data = {}115        _path = os.path.join(self.uri, path)116        if path == 'login':117            _data = str(data).replace(self._pass, "*****")118        else:119            _data = data120        logger.debug(121            "# POST '{}'\nHeaders: '{}'\nCookies: {}\nBody: {}".format(122                _path,123                headers,124                cookies,125                _data126            )127        )128        return requests.post(129            os.path.join(self.uri, path),130            headers=headers,131            json=data,132            cookies=cookies133        )134    def _login(self):135        # if there is no password - try to get local, if this available136        if config.salt_env == "local":137            _pass = get_local_password()138        else:139            _pass = get_remote_env_password()140        login_payload = {141            'username': config.salt_user,142            'password': _pass,143            'eauth': 'pam'144        }145        self._pass = _pass146        logger.debug("# Logging in to salt master...")147        _response = self.post(login_payload, path='login')148        if _response.ok:149            self._auth['response'] = _response.json()['return'][0]150            self._auth['cookies'] = _response.cookies151            self.default_headers['X-Auth-Token'] = \152                self._auth['response']['token']153            return self._auth['response']['token']154        else:155            raise EnvironmentError(156                "# HTTP:{}, Not authorized?".format(_response.status_code)157            )158    def salt_request(self, fn, *args, **kwargs):159        # if token will expire in 5 min, re-login160        if self._auth['response']['expire'] < time.time() + 300:161            self._auth['response']['X-Auth-Token'] = self._login()162        _method = getattr(self, fn)163        _response = _method(*args, **kwargs)164        self.last_response = _response165        _content = "..."166        _len = len(_response.content)167        if _len < 1024:168            _content = _response.content169        logger.debug(170            "# Response (HTTP {}/{}), {}: {}".format(171                _response.status_code,172                _response.reason,173                _len,174                _content175            )176        )177        if _response.ok:178            return _response.json()['return']179        else:180            raise EnvironmentError(181                "# Salt Error: HTTP:{}, '{}'".format(182                    _response.status_code,183                    _response.reason184                )185            )186class SaltRemote(SaltRest):187    master_node = ""188    def __init__(self):189        super(SaltRemote, self).__init__()190    def cmd(191            self,192            tgt,193            fun,194            param=None,195            client='local',196            kwarg=None,197            expr_form=None,198            tgt_type=None,199            timeout=None200    ):201        _timeout = timeout if timeout is not None else config.salt_timeout202        _payload = {203            'fun': fun,204            'tgt': tgt,205            'client': client,206            'timeout': _timeout207        }208        if expr_form:209            _payload['expr_form'] = expr_form210        if tgt_type:211            _payload['tgt_type'] = tgt_type212        if param:213            _payload['arg'] = param214        if kwarg:215            _payload['kwarg'] = kwarg216        _response = self.salt_request('post', [_payload])217        if isinstance(_response, list):218            return _response[0]219        else:220            raise EnvironmentError(221                "# Unexpected response from from salt-api/LocalClient: "222                "{}".format(_response)223            )224    def run(self, fun, kwarg=None):225        _payload = {226            'client': 'runner',227            'fun': fun,228            'timeout': config.salt_timeout229        }230        if kwarg:231            _payload['kwarg'] = kwarg232        _response = self.salt_request('post', [_payload])233        if isinstance(_response, list):234            return _response[0]235        else:236            raise EnvironmentError(237                "# Unexpected response from from salt-api/RunnerClient: "238                "{}".format(_response)239            )240    def wheel(self, fun, arg=None, kwarg=None):241        _payload = {242            'client': 'wheel',243            'fun': fun,244            'timeout': config.salt_timeout245        }246        if arg:247            _payload['arg'] = arg248        if kwarg:249            _payload['kwarg'] = kwarg250        _response = self.salt_request('post', _payload)['data']251        if _response['success']:252            return _response253        else:254            raise EnvironmentError(255                "# Salt Error: '{}'".format(_response['return']))256    def pillar_request(self, node_target, pillar_submodule, argument):257        # example cli: 'salt "ctl01*" pillar.keys rsyslog'258        _type = "compound"259        if isinstance(node_target, list):260            _type = "list"261        return self.cmd(262            node_target,263            "pillar." + pillar_submodule,264            argument,265            expr_form=_type266        )267    def pillar_keys(self, node_target, argument):268        return self.pillar_request(node_target, 'keys', argument)269    def pillar_get(self, node_target, argument):270        return self.pillar_request(node_target, 'get', argument)271    def pillar_data(self, node_target, argument):272        return self.pillar_request(node_target, 'data', argument)273    def pillar_raw(self, node_target, argument):274        return self.pillar_request(node_target, 'raw', argument)275    def list_minions(self):276        """277            Fails in salt version 2016.3.8278            Works starting from 2017.7.7279            api returns dict of minions with grains280        """281        try:282            _r = self.salt_request('get', 'minions', timeout=10)283        except requests.exceptions.ReadTimeout:284            logger_cli.debug("... timeout waiting list minions from Salt API")285            _r = None286        return _r[0] if _r else None287    def list_keys(self):288        """289            Fails in salt version 2016.3.8290            Works starting from 2017.7.7291            api should return dict:292            {293                'local': [],294                'minions': [],295                'minions_denied': [],296                'minions_pre': [],297                'minions_rejected': [],298            }299        """300        return self.salt_request('get', path='keys')301    def get_status(self):302        """303            Fails in salt version 2017.7.7304            'runner' client is the equivalent of 'salt-run'305            Returns the306        """307        return self.run(308            'manage.status',309            kwarg={'timeout': 10}310        )311    def get_active_nodes(self):312        """Used when other minion list metods fail313        :return: json result from salt test.ping314        """315        if config.skip_nodes:316            logger.info("# Nodes to be skipped: {0}".format(config.skip_nodes))317            _r = self.cmd(318                '* and not ' + list_to_target_string(319                    config.skip_nodes,320                    'and not'321                ),322                'test.ping',323                expr_form='compound')324        else:325            _r = self.cmd('*', 'test.ping')326        # Return all nodes that responded327        return [node for node in _r.keys() if _r[node]]328    def get_monitoring_ip(self, param_name):329        salt_output = self.cmd(330            'docker:client:stack:monitoring',331            'pillar.get',332            param=param_name,333            expr_form='pillar')334        return salt_output[salt_output.keys()[0]]335    def f_touch_master(self, path, makedirs=True):336        _kwarg = {337            "makedirs": makedirs338        }339        salt_output = self.cmd(340            self.master_node,341            "file.touch",342            param=path,343            kwarg=_kwarg344        )345        return [*salt_output.values()][0]346    def f_append_master(self, path, strings_list, makedirs=True):347        _kwarg = {348            "makedirs": makedirs349        }350        _args = [path]351        _args.extend(strings_list)352        salt_output = self.cmd(353            self.master_node,354            "file.write",355            param=_args,356            kwarg=_kwarg357        )358        return [*salt_output.values()][0]359    def mkdir(self, target, path, tgt_type=None):360        salt_output = self.cmd(361            target,362            "file.mkdir",363            param=path,364            expr_form=tgt_type365        )366        return salt_output367    def f_manage_file(self, target_path, source,368                      sfn='', ret='{}',369                      source_hash={},370                      user='root', group='root', backup_mode='755',371                      show_diff='base',372                      contents='', makedirs=True):373        """374        REST variation of file.get_managed375        CLI execution goes like this (10 agrs):376        salt cfg01\\* file.manage_file /root/test_scripts/pkg_versions.py377        '' '{}' /root/diff_pkg_version.py378        '{hash_type: 'md5', 'hsum': <md5sum>}' root root '755' base ''379        makedirs=True380            param: name - target file placement when managed381            param: source - source for the file382        """383        _source_hash = {384            "hash_type": "md5",385            "hsum": 000386        }387        _arg = [388            target_path,389            sfn,390            ret,391            source,392            _source_hash,393            user,394            group,395            backup_mode,396            show_diff,397            contents398        ]399        _kwarg = {400            "makedirs": makedirs401        }402        salt_output = self.cmd(403            self.master_node,404            "file.manage_file",405            param=_arg,406            kwarg=_kwarg407        )408        return [*salt_output.values()][0]409    def cache_file(self, target, source_path):410        salt_output = self.cmd(411            target,412            "cp.cache_file",413            param=source_path414        )415        return [*salt_output.values()][0]416    def get_file(self, target, source_path, target_path, tgt_type=None):417        return self.cmd(418            target,419            "cp.get_file",420            param=[source_path, target_path],421            expr_form=tgt_type422        )423    @staticmethod424    def compound_string_from_list(nodes_list):...dirc_do_deploy.py
Source:dirc_do_deploy.py  
...26    get_jen_env['action_list'] = action.split(",")27    return get_jen_env28get_env = _get_env()29#  å®ä¹æ§è¡è¯å¥ç»ä¸é¨åï¼ç¨äºç®ååé¢çæä½  #30def _ssh_cmd(action_cmd):31    for i in get_env['target_ip_list']:32        remote_ip = str(i)33        ssh_cmd = "ssh -o StrictHostKeyChecking=no -q  " + get_env['target_user'] + "@" + remote_ip + " " + action_cmd34        return ssh_cmd35#  å®ä¹æä»¶å¤å¶è¯å¥ï¼ä»¥å¤è°ç¨  #36def _scp_cmd():37    for i in get_env['target_ip_list']:38        remote_ip = str(i)39        scp_cmd = "scp -r " + get_env['work_space'] + "/" + get_env['app_name'] + ' ' + get_env['target_user'] + "@" + remote_ip + ":" + APP_PATH40        return scp_cmd41#  å®ä¹æ¥è¯¢è¿ç¨PIDæ¹æ³ï¼ä»¥å¤è°ç¨  #42def _check_pid():43    check_cmd = "ps -ef |grep Bootstrap|grep -v grep|awk  \'{print $2}\'"44    run_check_pid = (_ssh_cmd(check_cmd))45    return run_check_pid46#  å®ä¹åæ¢è¿ç¨æ¹æ³ï¼ä»¥å¤è°ç¨  #47def _kill_pid(now_pid):48    kill_cmd = 'kill -9 ' + now_pid49    run_kill_sh = (_ssh_cmd(kill_cmd))50    return run_kill_sh51#  å®ä¹å¤ä»½åºç¨æ¹æ³ï¼ä»¥å¤è°ç¨  #52def _back_pkg():53    back_pkg_cmd = "mv" + ' ' + APP_PATH + get_env['app_name'] + ' ' + BACK_PATCH54    back_pkg_sh = (_ssh_cmd(back_pkg_cmd))55    return back_pkg_sh56#  å®ä¹å¯æå¡æ¹æ³ï¼ä»¥å¤è°ç¨  #57def _start_session():58    run_start_sh = (_ssh_cmd(START_SHELL))59    return run_start_sh60#  å®ä¹æ¸
空CACHEæ¹æ³ï¼ä»¥å¤è°ç¨  #61def _clear_cache():62    clear_cmd = 'rm -rf' + ' ' + APP_PATH + '*'63    run_clear_sh = (_ssh_cmd(clear_cmd))64    return run_clear_sh65#  å®ä¹åæ»æ¹æ³ï¼ä»¥å¤è°ç¨  #66def _rollback_pkg():67    rollback_cmd = "mv" + ' ' + BACK_PATCH + get_env['app_name'] + ' ' + APP_PATH68    rollback__sh = (_ssh_cmd(rollback_cmd))69    return rollback__sh70for j in get_env['action_list']:71    if j == 'check':    # è°ç¨æ¥è¯¢è¿ç¨åè½72        stat, result_id = commands.getstatusoutput(_check_pid())73        print stat, result_id74    elif j == 'stop':   # è°ç¨åæ¢è¿ç¨åè½75        stat, result_id = commands.getstatusoutput(_check_pid())76        print stat, result_id77        kill_stat, kill_result = commands.getstatusoutput(_kill_pid(result_id))78        print kill_stat, kill_result79    elif j == 'start':  # è°ç¨å次å¯å¨æå¡åè½80        start_stat, start_result = commands.getstatusoutput(_start_session())81        print start_stat, start_result82    elif j == 'deploy':  # è°ç¨å®ææå¡é¨ç½²åè½...iscsi.py
Source:iscsi.py  
...11        self.server_pw = server_pw12        self.pool = pool13        self.image_size = image_size14        self.device_link = device_link15def _ssh_cmd(host, rootpw, cmd, check=True):16    ssh_cmd = 'sshpass -p {} ssh -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no root@{} {}'.format(rootpw, host, cmd)17    print("Running SSH command: {}".format(ssh_cmd))18    rc = subprocess.run(ssh_cmd, shell=True, capture_output=True, check=check)19    return rc.stdout.decode('utf-8')20     21def _fetch_gateway_hostids(args):22    host_ids = []23    for gw in args.gateways:24        print("Fetching hostid from {}".format(gw))25        id = _ssh_cmd(gw, args.gateway_pw, 'hostid')26        host_ids.append(id.rstrip())27    return host_ids28def _image_name(server):29    return 'benchmaster-{}'.format(server)30def _setup_initiator(args):31    initator = 'iqn.2014-01.com.softiron.iscsi_gw_v0:' + args.gateways[0]32    for s in args.servers:33        _ssh_cmd(s, args.server_pw, "sed -i 's/InitiatorName=.*/InitiatorName={}/' /etc/iscsi/initiatorname.iscsi".format(initator))34        _ssh_cmd(s, args.server_pw, 'systemctl restart iscsid')35def _create_images(args):36    print("Creating images")37    for s in args.servers:38        name = _image_name(s)39        cmd = 'rbd create {}/{} --size {} --image-feature=layering,exclusive-lock'.format(args.pool, name, args.image_size)40        _ssh_cmd(args.gateways[0], args.gateway_pw, cmd)41    print("Created.")42def _delete_images(args):43    print("Deleting images")44    for s in args.servers:45        name = _image_name(s)46        cmd = 'rbd rm {} -p {}'.format(name, args.pool)47        _ssh_cmd(args.gateways[0], args.gateway_pw, cmd)48    print("Deleted")49def _configure_images_on_gateways(args, operation, host_ids):50    print("Configuring images")51    flag = ""52    if    operation == 'export': flag = '-e'53    elif  operation == 'unexport': flag = '-u'54    elif  operation == 'reset': flag = '-r'55    else: raise Exception('Unsupported operation: {}'.format(operation))56    for s in args.servers:57        name = _image_name(s)58        cmd = 'rsmapadm -p {} {} {}'.format(args.pool, flag, name)59        for id in host_ids:60            cmd += ' -t {}'.format(id)61        _ssh_cmd(args.gateways[0], args.gateway_pw, cmd)62    print("Configured")63def _mount_images(args):64    print("Mounting images")65    for s in args.servers:66        name = _image_name(s)67    68        # Do discovery for each gateway69        for gw in args.gateways:70            discovery_cmd = 'iscsiadm --mode discoverydb --type sendtargets --portal {} --discover'.format(gw)    71            _ssh_cmd(s, args.server_pw, discovery_cmd)72        # Log in to the active and secondary ISCSI targets.73        for mode in ['act', 'nop']:74            login_cmd = 'iscsiadm --mode node --login --target iqn.2014-01.com.softiron:{}-{}-{}'.format(args.pool, name, mode)75            _ssh_cmd(s, args.server_pw, login_cmd)76    77        # Determine which devices they were assigned to.78        disk_cmd = 'ls -l /dev/disk/by-path/ | grep iqn.2014-01.com.softiron:{}-{}'.format(args.pool, name)79        lines = _ssh_cmd(s, args.server_pw, disk_cmd)80        mp_cmd = 'multipath -a'81        for l in lines.split('\n'):82            if l != '':83                mp_cmd += ' /dev/' + l[-3:]84        # We can't check the return code of this call, since rsmapamd returns the number of added mappings as the rc.85        # It shouldn't do that because it violates the standard, but we have to live with it, so we'll disable checking for 86        # this command.87        # (it should be output on stdout, or should only output as RC if we pass an explicit flag telling it to do so).88        _ssh_cmd(s, args.server_pw, mp_cmd, check=False)89       90        # Now we've added the mapping, tell multipath to use it.91        _ssh_cmd(s, args.server_pw, 'multipath -r')92    93        # Retrieve the device mapper entry94        dm = _ssh_cmd(s, args.server_pw, "multipath -l | grep benchmaster | awk '{print $3}'")95        # Create the link,96        link_cmd = 'ln -s /dev/{} {}'.format(str(dm).rstrip(), args.device_link)97        _ssh_cmd(s, args.server_pw, link_cmd)98    print("Mounted")99   100 101def _unmount_images(args):102    print("Unmounting images")103    for s in args.servers:104        _ssh_cmd(s, args.server_pw, 'unlink {}'.format(args.device_link), check=False)105        _ssh_cmd(s, args.server_pw, 'iscsiadm --mode node --logoutall=all', check=False)106        _ssh_cmd(s, args.server_pw, 'multipath -W', check=False)107    print("Unmounted")108                    109def setup(args):110    host_ids = _fetch_gateway_hostids(args)111    _setup_initiator(args)112    _create_images(args)113    _configure_images_on_gateways(args, 'export', host_ids)114    _mount_images(args)115    print("Ready.")116def teardown(args):117    host_ids = _fetch_gateway_hostids(args)118    _unmount_images(args)119    _configure_images_on_gateways(args, 'unexport', host_ids)120    _delete_images(args)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
