Best Python code snippet using autotest_python
abstract_ssh.py
Source:abstract_ssh.py  
...110                    for pattern in patterns]111        else:112            return [utils.scp_remote_escape(path) + pattern113                    for pattern in patterns]114    def _make_rsync_compatible_source(self, source, is_local):115        """116        Applies the same logic as _make_rsync_ccompatible_globs, but applies if117        to an entire list of sources, producing a new list of sources,118        properly quoted.119        """120        return sum((self._make_rsync_compatible_globs(path, is_local)121                    for path in source), [])122    def _encode_remote_paths(self, paths, escape=True):123        """124        Given a list of file paths, encodes it as a single remote path, in the125        style used by rsync and scp.126        """127        if escape:128            paths = [utils.scp_remote_escape(path) for path in paths]129        return '%s@%s:"%s"' % (self.user, self.hostname, " ".join(paths))130    def _make_ssh_cmd(self, cmd):131        base_cmd = _make_ssh_cmd_default(user=self.user, port=self.port,132                                    opts=self.master_ssh_option,133                                    hosts_file=self.known_hosts_file)134        return '%s %s "%s"' % (base_cmd, self.hostname, utils.sh_escape(cmd))135    def _make_scp_cmd(self, sources, dest):136        """137        Given a list of source paths and a destination path, produces the138        appropriate scp command for encoding it. 
Remote paths must be139        pre-encoded.140        """141        command = ("scp -rq %s -o StrictHostKeyChecking=no -o "142                    "UserKnownHostsFile=%s -P %d %s '%s'")143        return command % (self.master_ssh_option, self.known_hosts_file,144                            self.port, " ".join(sources), dest)145    def _set_umask_perms(self, dest):146        """147        Given a destination file/dir (recursively) set the permissions on all148        the files and directories to the max allowed by running umask.149        """150        umask = os.umask(0)151        os.umask(umask)152        max_privs = 0777 & ~umask153        def set_file_privs(filename):154            file_stat = os.stat(filename)155            file_privs = max_privs156            # if the original file permissions do not have at least one157            # executable bit then do not set it anywhere158            if not file_stat.st_mode & 0111:159                file_privs &= ~0111160            os.chmod(filename, file_privs)161        for root, dirs, files in os.walk(dest, topdown=False):162            for dirname in dirs:163                os.chmod(os.path.join(root, dirname), max_privs)164            for filename in files:165                set_file_privs(os.path.join(root, filename))166        if os.path.isdir(dest):167            os.chmod(dest, max_privs)168        else:169            set_file_privs(dest)170    def get_file(self, source, dest, delete_dest=False, preserve_perm=True,171                preserve_symlinks=False):172        """173        Copy files from the remote host to a local path174        Directories will be copied recursively.175        Args:176            delete_dest: if it is true, the command will also clear out any177                        old files at dest that are not in the source178            preserve_perm: tells get_file() to try to preserve the sources179                            permissions on files and dirs180            preserve_symlinks: try 
to preserver symlinks instead of181                            transforming them into files/dirs on copy182        Raiseds:183            the scp command failed184        """185        self.start_master_ssh()186        if isinstance(source, basestring):187            source = [source]188        dest = os.path.abspath(dest)189        try_scp = True190        if try_scp:191            if delete_dest and os.path.isdir(dest):192                shutil.rmtree(dest)193                os.mkdir(dest)194            remote_source = self._make_rsync_compatible_source(source, False)195            if remote_source:196                remote_source = self._encode_remote_paths(remote_source,197                                                escape=False)198                local_dest = utils.sh_escape(dest)199                scp = self._make_scp_cmd([remote_source], local_dest)200                try:201                    utils.run(scp)202                except error.CmdError, e:203                    raise error.ServeRunError(e.args[0], e.args[1])204        if not preserve_perm:205            # have no way to tell scp to not try to preserve the permssions so206            # set them after copy instead.207            # for rsync we could use "--no-p --chmod=ugo-rmX" but those208            # options are only in veryrecent rsync versions209            self._set_umask_perms = (dest)210    def send_file(self, source, dest, delete_dest=False,211            preserve_symlinks=False):212        """213        Copy files from a local path to the remote host.214        """215        self.start_master_ssh()216        if isinstance(source, basestring):217            source_is_dir = os.path.isdir(source)218            source = [source]219        remote_dest = self._encode_remote_paths([dest])220        # if rsync is diabled or fails, try scp221        try_scp = True222        if try_scp:223            # scp has no equivalent to --delete, just drop the entire dest dir224            if 
delete_dest:225                dest_exists = False226                try:227                    self.run("test -x %s" % dest)228                    dest_exists = True229                except error.ServRunError:230                    pass231                dest_is_dir = False232                if dest_exists:233                    try:234                        self.run("test -d %s" % dest)235                        dest_is_dir = True236                    except error.ServRunError:237                        pass238                # if there is a list of more than one path, destination *has*239                # to be a dir.It therr's a single path being transferred and240                # it is a dir, the destination also has to be a dir241                if len(source) > 1 and source_is_dir:242                    dest_is_dir = True243                if dest_exists and dest_is_dir:244                    cmd = "rm -fr %s && mkdir %s" % (dest, dest)245                elif not dest_exists and dest_is_dir:246                    cmd = "mkdir %s" % dest247                    self.run(cmd)248            local_sources = self._make_rsync_compatible_source(source, True)249            if local_sources:250                scp = self._make_scp_cmd(local_sources, remote_dest)251                try:252                    utils.run(scp)   # utils.run253                except error.CmdError, e:254                    raise error.ServRunError(e.args[0], e.args[1])255    def ssh_ping(self, timeout=60):256        try:257            self.run("true", timeout=timeout, connect_timeout=timeout)258        except error.ServSSHTimeout:259            msg = "Host (ssh) verify timed out (timeout = %d)" % timeout260            raise error.ServSSHTimeout(msg)261        except error.ServSSHPermissionDeniedError:262            raise...Learn to execute automation testing from scratch with LambdaTest Learning Hub. 
It covers everything from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
