Best Python code snippet using autotest_python
s3.py
Source:s3.py  
...72	def clear_state(self, s_app_dir, s_exchange_basename, s_symbol, s_odf_basename):73		(s_odf_bucket, s_odf_remote_path) = self.get_odf_remote_path(s_exchange_basename, 74																	s_odf_basename)7576		if self.remote_path_exists(s_odf_bucket, s_odf_remote_path):77			self.remote_remove(s_odf_remote_path)7879		self.clear_fce_state(s_app_dir, s_exchange_basename, s_symbol, s_odf_basename)8081	def clear_fce_state(self, s_app_dir, s_exchange_basename, s_symbol, s_odf_basename):82		# Delete tmp path, fce output directory on s3 and chunk output directory on s383		fce_pathspec = self.get_fce_pathspec(s_app_dir, s_exchange_basename, s_symbol, s_odf_basename)8485		(s_fce_local_dir, s_fce_local_path) = self.get_fce_local_path(self, fce_pathspec, fce_pathspec.s_fce_header_filename)8687		if self.local_path_exists(s_fce_local_path):88			self.local_remove(s_fce_local_path)89			90		(s_fce_bucket, s_fce_remote_path) = self.get_fce_remote_path(self, fce_pathspec, fce_pathspec.s_fce_header_filename)91		92		if self.remote_path_exists(s_fce_bucket, s_fce_remote_path):93			self.remote_remove(s_fce_bucket, s_fce_remote_path)94	95	def get_odf_remote_path(self, s_exchange_basename, s_odf_basename):9697		s_odf_remote_path = self.remote_make_path(self.s_bucket, s_exchange_basename, "odf", s_odf_basename)98		s_odf_remote_path = '.'.join([s_odf_remote_path, "rs3"])99		s_odf_bucket = self.remote_bucket(s_odf_remote_path)100101		if not self.remote_bucket_exists(s_odf_bucket):102			self.remote_make_bucket(s_odf_bucket)103104		return (s_odf_bucket, s_odf_remote_path)105106	def save_odf(self, s_exchange_basename, s_odf_basename, odf_obj):107		108		(s_odf_local_dir, s_odf_local_path) = self.get_odf_local_path(s_exchange_basename, s_odf_basename)109		110		if not self.local_path_exists(s_odf_local_dir):111			self.local_make_dirs(s_odf_local_dir)112113		odf_obj.to_bin_file(s_odf_local_path)114		115		(s_odf_bucket, s_odf_remote_path) = self.get_odf_remote_path(s_exchange_basename, s_odf_basename)116		117		self.upload_file(s_odf_local_path, s_odf_bucket, s_odf_remote_path)118		119		return True		120121	'''122		def get_fce_pathspec(self, s_app_dir,123									s_exchange_basename,124									s_symbol,125									s_odf_basename):126			return context.FCEContext(s_app_dir,127									s_exchange_basename,128									s_symbol,129									s_odf_basename)130	'''131132	def get_fce_local_path(self, ctx, s_fce_header_filename):133		s_fce_local_path = self.lsep.join([ctx.s_fce_local_dir, "18", 134											str(ctx.odf_jsunnoon), s_fce_header_filename])135		136		s_fce_local_dir = self.local_dirname(s_fce_local_path)137		return (s_fce_local_dir, s_fce_local_path)138	139	def get_fce_remote_path(self, ctx, s_fce_header_filename):140				141		s_fce_remote_path = self.remote_make_path(self.s_bucket, ctx.s_fce_remote_prefix, "18", 142											str(ctx.odf_jsunnoon), s_fce_header_filename)143		144		s_fce_bucket = self.remote_bucket(s_fce_remote_path)145		#log.debug(s_fce_bucket)146		#log.debug(s_fce_remote_path)147		return (s_fce_bucket, s_fce_remote_path)148149	def open_fce(self, ctx, s_fce_header_filename, key=None):150151		(s_fce_local_dir, s_fce_local_path) = self.get_fce_local_path(ctx, s_fce_header_filename)152153		if not self.local_path_exists(s_fce_local_path):154			(s_fce_bucket, s_fce_remote_path) = self.get_fce_remote_path(ctx, s_fce_header_filename)			155			156			if self.remote_path_exists(s_fce_bucket, s_fce_remote_path):157				if not self.local_path_exists(s_fce_local_dir):158					self.local_make_dirs(s_fce_local_dir)159				self.download_file(s_fce_bucket, s_fce_remote_path, s_fce_local_path)160			else:161				return None162		163		fp_bin = open(s_fce_local_path, "rb")164165		fce_obj = fce.FCE()166167		fce_obj.read_bin_stream(fp_bin, key)168		169		fp_bin.close()170171		return fce_obj172173	def fce_exists(self, ctx, s_fce_header_filename):174		175		(s_fce_local_dir, s_fce_local_path) = self.get_fce_local_path(ctx, s_fce_header_filename)176		177		if self.local_path_exists(s_fce_local_path):178			return True179180		(s_fce_bucket, s_fce_remote_path) = self.get_fce_remote_path(ctx, s_fce_header_filename)181		182		if self.remote_path_exists(s_fce_bucket, s_fce_remote_path):183			if not self.local_path_exists(s_fce_local_dir):184				self.local_make_dirs(s_fce_local_dir)185			self.download_file(s_fce_bucket, s_fce_remote_path, s_fce_local_path)186			return True187188		return False189190	def save_fce(self, ctx, s_fce_header_filename, fce_obj, key=None, b_save_csv=False):191		192		# First save the fce in the tmp directory193		(s_fce_local_dir, s_fce_local_path) = self.get_fce_local_path(ctx, s_fce_header_filename)194195		if not self.local_path_exists(s_fce_local_dir):196			self.local_make_dirs(s_fce_local_dir)197198		fce_obj.to_bin_file(s_fce_local_path, key)199		200		if b_save_csv:201			fce_obj.to_csv_file(s_fce_local_path + ".csv")202		203		# copy the tmp file to S3 directory204		(s_fce_bucket, s_fce_remote_path) = self.get_fce_remote_path(ctx, s_fce_header_filename)205		206		#log.debug(s_fce_bucket)207		#log.debug(s_fce_remote_path)208		self.upload_file(s_fce_local_path, s_fce_bucket, s_fce_remote_path)209		210211	def get_chunk_file_local_path(self, ctx, L_no, fce_jsunnoon, s_chunk_file_name):212		213		s_chunk_file_local_path = self.lsep.join([ctx.s_fce_local_dir, str(L_no), str(fce_jsunnoon), s_chunk_file_name])214		215		s_chunk_local_dir = self.local_dirname(s_chunk_file_local_path)216		217		if not self.local_path_exists(s_chunk_local_dir):218			self.local_make_dirs(s_chunk_local_dir)219		220		return (s_chunk_local_dir, s_chunk_file_local_path)221222	def get_chunk_file_remote_path(self, ctx, L_no, fce_jsunnoon, s_chunk_file_name):223		224		s_chunk_file_remote_path = self.remote_make_path(self.s_bucket, ctx.s_fce_remote_prefix, str(L_no), str(fce_jsunnoon), s_chunk_file_name)225		226		s_chunk_file_bucket = self.remote_bucket(s_chunk_file_remote_path)227		228		if not self.remote_bucket_exists(s_chunk_file_bucket):229			try:230				self.remote_make_bucket(s_chunk_file_bucket)231			except:232				log.error(s_chunk_file_bucket)233				log.error(s_chunk_file_remote_path)234				raise235			236		return (s_chunk_file_bucket, s_chunk_file_remote_path)237238	def chunk_file_exists(self, ctx, L_no, fce_jsunnoon, s_chunk_file_name):239		240		(s_chunk_file_local_dir, s_chunk_file_local_path) = self.get_chunk_file_local_path(ctx, L_no, fce_jsunnoon, s_chunk_file_name)241		242		if self.local_path_exists(s_chunk_file_local_path):243			return True244245		(s_chunk_file_bucket, s_chunk_file_remote_path) = self.get_chunk_file_remote_path(ctx, L_no, fce_jsunnoon, s_chunk_file_name)246		247		if self.remote_path_exists(s_chunk_file_bucket, s_chunk_file_remote_path):248			if not self.local_path_exists(s_chunk_file_local_dir):249				self.local_make_dirs(s_chunk_file_local_dir)250			self.download_file(s_chunk_file_bucket, s_chunk_file_remote_path, s_chunk_file_local_path)251			return True252253		return False254255	def open_chunk_file(self, ctx, L_no, fce_jsunnoon, s_chunk_file_name, chunk_size, key):256		257		(s_chunk_file_local_dir, s_chunk_file_local_path) = self.get_chunk_file_local_path(ctx, L_no, fce_jsunnoon, s_chunk_file_name)258		259		chunk_arr_short = chunk.read_short_chunk_array(s_chunk_file_local_path, s_chunk_file_name, chunk_size, key)260		261		return chunk_arr_short262	263	def save_chunk_file(self, ctx, L_no, fce_jsunnoon, chunk_array, key=None, b_do_csv_chunk=False):264		265		s_chunk_file_name = chunk_array.get_name()266		267		# First save the fce in the tmp directory268		(s_chunk_file_local_dir, s_chunk_file_local_path) = self.get_chunk_file_local_path(ctx, L_no, fce_jsunnoon, s_chunk_file_name)269270		if not self.local_path_exists(s_chunk_file_local_dir):271			self.local_make_dirs(s_chunk_file_local_dir)272273		chunk_array.to_bin_file_short(s_chunk_file_local_path, key)274		275		if b_do_csv_chunk:276			s_chunk_csv_local_path = '.'.join([s_chunk_file_local_path, "csv"])277			#log.debug("Saving chunk CSV: %s..." % (s_chunk_csv_local_path))278			chunk_array.save_csv(s_chunk_csv_local_path)279			280		# copy the tmp file to S3 directory281		(s_chunk_file_bucket, s_chunk_file_remote_path) = self.get_chunk_file_remote_path(ctx, L_no, fce_jsunnoon, s_chunk_file_name)282		283		self.upload_file(s_chunk_file_local_path, s_chunk_file_bucket, s_chunk_file_remote_path)284		285	def download_file(self, s_bucket, s_remote_file_path, s_local_file_path):286		287		s_local_dir = self.local_dirname(s_local_file_path)288		289		if not self.local_path_exists(s_local_dir):290			self.local_make_dirs(s_local_dir)291		292		self.download(s_bucket, s_remote_file_path, s_local_file_path)293294	def upload_file(self, s_local_file_path, s_bucket, s_remote_file_path):295		296		if not self.remote_bucket_exists(s_bucket):297			self.remote_make_bucket(s_bucket)298		299		self.upload(s_local_file_path, s_bucket, s_remote_file_path)300301	def move_up(self, s_local_file_path, s_bucket, s_remote_file_path):302		303		self.upload_file(s_local_file_path, s_bucket, s_remote_file_path)304		305		self.local_remove(s_local_file_path)306307	def move_down(self, s_bucket, s_remote_file_path, s_local_file_path):308		309		self.download_file(s_bucket, s_remote_file_path, s_local_file_path)310		311		self.remote_remove(s_bucket, s_remote_file_path)312313314class S3Store(AbstractFileStore):315	316	rsep = "/"317318	def __init__(self, s_bucket, s_aws_access_id, s_aws_secret_access_key, s_aws_region):319		super(S3Store, self).__init__()320		self.connection = boto.s3.connection.S3Connection(s_aws_access_id,321														s_aws_secret_access_key)322		self.s_bucket = s_bucket323324	def get_key(self, s_bucket, s_path):325		bucket = boto.s3.bucket.Bucket(connection=self.connection, name=s_bucket)326		key = boto.s3.key.Key(bucket)327		key.key = s_path328		return key329	330	def remote_make_path(self, s_bucket, *kargs):331		return self.rsep.join(kargs)332	333	def remote_abspath(self, s_bucket):334		return s_bucket335	336	def remote_path_exists(self, s_bucket, s_path):337		key = self.get_key(s_bucket, s_path)338		return key.exists()339	340	def remote_rmtree(self, s_bucket, s_path):341		self.remote_remove(s_bucket, s_path)342343	def remote_remove(self, s_bucket, s_path):344		key = self.get_key(s_bucket, s_path)345		key.delete()346	347	def remote_bucket(self, s_path):348		return self.s_bucket349	350	def remote_bucket_exists(self, s_bucket):351		bucket = boto.s3.bucket.Bucket(connection=self.connection, name=s_bucket)352		if not bucket:353			return False354		return True355		356	def remote_make_bucket(self, s_path):357		pass358	359	def upload(self, s_local_file_path, s_bucket, s_remote_file_path):360		key = self.get_key(s_bucket, s_remote_file_path)361		key.set_contents_from_filename(s_local_file_path)362		return key363364	def download(self, s_bucket, s_remote_file_path, s_local_file_path):365		key = self.get_key(s_bucket, s_remote_file_path)366		key.get_contents_to_filename(s_local_file_path)367		return key368369	370class LocalS3Store(AbstractFileStore):371	372	rsep = os.sep373	374	def __init__(self, s_remote_root):375		self.s_bucket = self.remote_abspath(s_remote_root)376		super(LocalS3Store, self).__init__()377378	def remote_make_path(self, s_bucket, *kargs):379		ls_args = [s_bucket]380		ls_args = ls_args + list(kargs)381		return self.rsep.join(ls_args)382	383	def remote_abspath(self, s_bucket):384		return os.path.abspath(s_bucket)385	386	def remote_path_exists(self, s_bucket, s_path):387		return os.path.exists(s_path)388	389	def remote_rmtree(self, s_path):390		return shutil.rmtree(s_path)391392	def remote_remove(self, s_bucket, s_path):393		return os.remove(s_path)394			395	def remote_bucket(self, s_path):396		return os.path.dirname(s_path)397	398	def remote_bucket_exists(self, s_bucket):399		return os.path.exists(s_bucket)400		
...path.py
Source:path.py  
...35        if os.path.exists(s):36            return s37        else:38            raise argparse.ArgumentTypeError(f"readable_dir:{s} is not a valid path")39    def remote_path_exists(self, s):40        re_path = "(/[a-zA-Z0-9_\s]+)*" # e.g. /home/user/Documents41        re_rem = "([a-zA-Z0-9]+:)" # e.g. GoogleDrive:42        if re.fullmatch('{}{}'.format(re_rem, re_path), s) == None:43            raise argparse.ArgumentTypeError(f'{s} is not a valid remote path')44        remote = s.split(':')[0]45        output = os.popen("rclone listremotes --long")46        remote_list = output.read().split()47        if remote + ":" in remote_list[::2]:48            pos = remote_list.index(remote + ":")49            self.remote_type = remote_list[pos+1]50            if self.remote_type != "drive":51                raise argparse.ArgumentError(f"The type of the given remote {remote} is not 'drive', and therefore currently not supported")52        else: raise argparse.ArgumentTypeError(f'{remote} is not a valid remote')53        return s...hpcutils.py
Source:hpcutils.py  
...19def Generate2mass(field_name):20def ListLCDir(remote_path, stfp):21    sftp.chdir(remote_path)22    return stfp.listdir()23def remote_path_exists(path, stfp):24	try:25		stfp.stat(path)26		return True27	except IOError, e:28		return False29def LoadRemoteKeylist(field_name, stfp):30	fname = remote_keylist_fname(field_name)31	32	if not remote_path_exists(fname, stfp): return None33	34	stfp.get(fname, '%s/keylist_field%s.txt'%(keylist_dir, field_name)35	return np.loadtxt(fname, dtype=keylist_dt)36def FindLightcurvePathOnDella(ID,MPI_COMM):37	with closing(SSHClient()) as ssh:38	    ssh.load_system_host_keys() #NOTE: no AutoAddPolicy() 39	    ssh.connect(d['hostname'], username=d.get('user'))40	    with closing(ssh.open_sftp()) as sftp:41			for field in field_info:42				lcpath = "%s/%s.tfalc"%(field_info[field]['path'], ID)43				if remote_path_exists(lcpath, stfp): return lcpath...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
