How to use _make_rsync_compatible_source method in autotest

Best Python code snippet using autotest_python

abstract_ssh.py

Source:abstract_ssh.py Github

copy

Full Screen

...110 for pattern in patterns]111 else:112 return [utils.scp_remote_escape(path) + pattern113 for pattern in patterns]114 def _make_rsync_compatible_source(self, source, is_local):115 """116 Applies the same logic as _make_rsync_ccompatible_globs, but applies if117 to an entire list of sources, producing a new list of sources,118 properly quoted.119 """120 return sum((self._make_rsync_compatible_globs(path, is_local)121 for path in source), [])122 def _encode_remote_paths(self, paths, escape=True):123 """124 Given a list of file paths, encodes it as a single remote path, in the125 style used by rsync and scp.126 """127 if escape:128 paths = [utils.scp_remote_escape(path) for path in paths]129 return '%s@%s:"%s"' % (self.user, self.hostname, " ".join(paths))130 def _make_ssh_cmd(self, cmd):131 base_cmd = _make_ssh_cmd_default(user=self.user, port=self.port,132 opts=self.master_ssh_option,133 hosts_file=self.known_hosts_file)134 return '%s %s "%s"' % (base_cmd, self.hostname, utils.sh_escape(cmd))135 def _make_scp_cmd(self, sources, dest):136 """137 Given a list of source paths and a destination path, produces the138 appropriate scp command for encoding it. Remote paths must be139 pre-encoded.140 """141 command = ("scp -rq %s -o StrictHostKeyChecking=no -o "142 "UserKnownHostsFile=%s -P %d %s '%s'")143 return command % (self.master_ssh_option, self.known_hosts_file,144 self.port, " ".join(sources), dest)145 def _set_umask_perms(self, dest):146 """147 Given a destination file/dir (recursively) set the permissions on all148 the files and directories to the max allowed by running umask.149 """150 umask = os.umask(0)151 os.umask(umask)152 max_privs = 0777 & ~umask153 def set_file_privs(filename):154 file_stat = os.stat(filename)155 file_privs = max_privs156 # if the original file permissions do not have at least one157 # executable bit then do not set it anywhere158 if not file_stat.st_mode & 0111:159 file_privs &= ~0111160 os.chmod(filename, file_privs)161 for root, dirs, files in os.walk(dest, topdown=False):162 for dirname in dirs:163 os.chmod(os.path.join(root, dirname), max_privs)164 for filename in files:165 set_file_privs(os.path.join(root, filename))166 if os.path.isdir(dest):167 os.chmod(dest, max_privs)168 else:169 set_file_privs(dest)170 def get_file(self, source, dest, delete_dest=False, preserve_perm=True,171 preserve_symlinks=False):172 """173 Copy files from the remote host to a local path174 Directories will be copied recursively.175 Args:176 delete_dest: if it is true, the command will also clear out any177 old files at dest that are not in the source178 preserve_perm: tells get_file() to try to preserve the sources179 permissions on files and dirs180 preserve_symlinks: try to preserver symlinks instead of181 transforming them into files/dirs on copy182 Raiseds:183 the scp command failed184 """185 self.start_master_ssh()186 if isinstance(source, basestring):187 source = [source]188 dest = os.path.abspath(dest)189 try_scp = True190 if try_scp:191 if delete_dest and os.path.isdir(dest):192 shutil.rmtree(dest)193 os.mkdir(dest)194 remote_source = self._make_rsync_compatible_source(source, False)195 if remote_source:196 remote_source = self._encode_remote_paths(remote_source,197 escape=False)198 local_dest = utils.sh_escape(dest)199 scp = self._make_scp_cmd([remote_source], local_dest)200 try:201 utils.run(scp)202 except error.CmdError, e:203 raise error.ServeRunError(e.args[0], e.args[1])204 if not preserve_perm:205 # have no way to tell scp to not try to preserve the permssions so206 # set them after copy instead.207 # for rsync we could use "--no-p --chmod=ugo-rmX" but those208 # options are only in veryrecent rsync versions209 self._set_umask_perms = (dest)210 def send_file(self, source, dest, delete_dest=False,211 preserve_symlinks=False):212 """213 Copy files from a local path to the remote host.214 """215 self.start_master_ssh()216 if isinstance(source, basestring):217 source_is_dir = os.path.isdir(source)218 source = [source]219 remote_dest = self._encode_remote_paths([dest])220 # if rsync is diabled or fails, try scp221 try_scp = True222 if try_scp:223 # scp has no equivalent to --delete, just drop the entire dest dir224 if delete_dest:225 dest_exists = False226 try:227 self.run("test -x %s" % dest)228 dest_exists = True229 except error.ServRunError:230 pass231 dest_is_dir = False232 if dest_exists:233 try:234 self.run("test -d %s" % dest)235 dest_is_dir = True236 except error.ServRunError:237 pass238 # if there is a list of more than one path, destination *has*239 # to be a dir.It therr's a single path being transferred and240 # it is a dir, the destination also has to be a dir241 if len(source) > 1 and source_is_dir:242 dest_is_dir = True243 if dest_exists and dest_is_dir:244 cmd = "rm -fr %s && mkdir %s" % (dest, dest)245 elif not dest_exists and dest_is_dir:246 cmd = "mkdir %s" % dest247 self.run(cmd)248 local_sources = self._make_rsync_compatible_source(source, True)249 if local_sources:250 scp = self._make_scp_cmd(local_sources, remote_dest)251 try:252 utils.run(scp) # utils.run253 except error.CmdError, e:254 raise error.ServRunError(e.args[0], e.args[1])255 def ssh_ping(self, timeout=60):256 try:257 self.run("true", timeout=timeout, connect_timeout=timeout)258 except error.ServSSHTimeout:259 msg = "Host (ssh) verify timed out (timeout = %d)" % timeout260 raise error.ServSSHTimeout(msg)261 except error.ServSSHPermissionDeniedError:262 raise...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful