Best Python code snippet using Contexts
_NodeFileTransferMethods.py
Source:_NodeFileTransferMethods.py  
1import threading2import os3import sys4sys.path.append(os.path.dirname(os.path.realpath(__file__)))5from _utils import file_size, md5, eprint6from _NodeInternalClasses import Future, Thread7def _write_to_file(self, data, filename):8	self._request(-2, data=data, file_name=filename)9def _process__write_to_file(self, request):10	try:11		filename = os.path.abspath(request["data"]["file_name"])12		if self._out_file != None and self._out_file.name != filename:13			self._out_file.close()14			self._out_file = None15		if self._out_file == None:16			if not os.path.isdir(os.path.dirname(filename)):17				os.makedirs(os.path.dirname(filename))18			self._out_file = open(filename, "wb")19		self._out_file.write(request["data"]["data"])20		self._out_file.flush()21	except:22		pass23def _close_file(self):24	self._request(-2)25def _process__close_file(self, request):26	try:27		if self._out_file != None:28			self._out_file.close()29			self._out_file = None30	except:31		pass32def get_file(self, src_filename, dest_filename = None, block=True):33	session_id = self._get_session_id()34	if dest_filename == None:35		dest_filename = os.path.basename(src_filename)36	self._request(session_id, src_filename=src_filename, file_size=file_size(dest_filename), md5=md5(dest_filename), block=block)37	def session():38		response = self._recv_response(session_id)39		if response["cancel"] or response["data"]["same_file"]:40			return41		if not response["success"]:42			eprint(response["traceback"])43			raise response["exception"]44		45		if not os.path.isdir(os.path.dirname(os.path.abspath(dest_filename))):46			os.makedirs(os.path.dirname(os.path.abspath(dest_filename)))47		try:48			file = open(dest_filename, "wb")49			self._respond_ok(session_id)50			file.close()51		except BaseException as e:52			self._respond_exception(session_id, e)53			if block:54				raise e55			else:56				return57		recved_size = 058		full_size = response["data"]["file_size"]59		with open(dest_filename, "wb") as file:60			while recved_size < full_size:61				response = self._recv_response(session_id)62				if response["cancel"]:63					break64				file.write(response["data"]["data"])65				file.flush()66				recved_size += len(response["data"]["data"])67	if block:68		session()69	else:70		thread = threading.Thread(target=session)71		thread.start()72		return Future(self, session_id)73def _process_get_file(self, request):74	session_id = request["session_id"]75	def session():76		src_filename = request["data"]["src_filename"]77		src_file_size = file_size(src_filename)78		same_file = (src_file_size != 0 and request["data"]["file_size"] == src_file_size and \79			         request["data"]["md5"] == md5(src_filename))80		try:81			file = open(src_filename, "rb")82			file.close()83			self._respond_ok(session_id, same_file=same_file, file_size=src_file_size)84		except BaseException as e:85			self._respond_exception(session_id, e, same_file=same_file, block=request["block"])86			self._make_signal(session_id)87			return88		if same_file:89			if not request["block"]:90				self._put_result(session_id)91			self._make_signal(session_id)92			return93		response = self._recv_response(session_id)94		if not response["success"]:95			if not request["block"]:96				self._put_exception(session_id, response["exception"])97			self._make_signal(session_id)98			return99		100		block_size = 8192*1024101		sent_size = 0102		with open(src_filename, "rb") as file:103			while sent_size < src_file_size:104				data = file.read(block_size)105				self._respond_ok(session_id, data=data)106				sent_size += len(data)107		if not request["block"]:108			self._put_result(session_id)109		self._make_signal(session_id)110	thread = Thread(target=session)111	thread.start()112	signal = self._recv_signal(session_id)113	if signal["cancel"] and thread.is_alive():114		thread.kill()115		thread.join()116		self._respond_ok(session_id, cancel=True)117		self._put_result(session_id, cancelled=True)118	else:119		thread.join()120def put_file(self, src_filename, dest_filename = None, block=True):121	file = open(src_filename, "rb")122	file.close()123	session_id = self._get_session_id()124	if dest_filename == None:125		dest_filename = os.path.basename(src_filename)126	src_file_size = file_size(src_filename)127	self._stop_send = False128	self._request(session_id, md5=md5(src_filename), file_size=src_file_size, dest_filename=dest_filename, block=block)129	130	def session():131		response = self._recv_response(session_id)132		if response["cancel"] or response["data"]["same_file"]:133			return134		if not response["success"]:135			eprint(response["traceback"])136			raise response["exception"]137		block_size = 8192*1024138		sent_size = 0139		with open(src_filename, "rb") as file:140			while sent_size < src_file_size:141				if self._stop_send:142					break143				data = file.read(block_size)144				self._respond_ok(session_id, data=data)145				sent_size += len(data)146	if block:147		session()148	else:149		thread = Thread(target=session)150		thread.start()151		return Future(self, session_id)152def _process_put_file(self, request):153	session_id = request["session_id"]154	def session():155		dest_filename = request["data"]["dest_filename"]156		full_size = request["data"]["file_size"]157		if full_size == file_size(dest_filename) and \158		   request["data"]["md5"] == md5(dest_filename):159			self._respond_ok(session_id, same_file=True, block=request["block"])160			self._make_signal(session_id)161			return162		163		if not os.path.isdir(os.path.dirname(os.path.abspath(dest_filename))):164			os.makedirs(os.path.dirname(os.path.abspath(dest_filename)))165		166		try:167			file = open(dest_filename, "wb")168			file.close()169			self._respond_ok(session_id, same_file=False)170		except BaseException as e:171			self._respond_exception(session_id, e, same_file=False, block=request["block"])172			self._make_signal(session_id)173			return174		recved_size = 0175		with open(dest_filename, "wb") as file:176			while recved_size < full_size:177				response = self._recv_response(session_id)					178				file.write(response["data"]["data"])179				file.flush()180				recved_size += len(response["data"]["data"])181		if not request["block"]:182			self._put_result(session_id)183		self._make_signal(session_id)184	thread = Thread(target=session)185	thread.start()186	signal = self._recv_signal(session_id)187	if signal["cancel"] and thread.is_alive():188		thread.kill()189		thread.join()190		self._respond_ok(session_id, cancel=True)191		self._put_result(session_id, cancelled=True)192	else:193		thread.join()194def get_folder(self, src_foldername, dest_foldername = None, block=True):195	session_id = self._get_session_id()196	if dest_foldername == None:197		dest_foldername = os.path.basename(src_foldername)198	self._request(session_id, src_foldername=src_foldername, block=block)199	# self._request(session_id, src_foldername=src_foldername, block=block, debug="I need folder \"" + src_foldername + "\"")200	def session():201		response = self._recv_response(session_id)202		if response["cancel"]:203			return204		if not response["success"]:205			eprint(response["traceback"])206			raise response["exception"]207		for folder in response["data"]["folders"]:208			if not os.path.isdir(dest_foldername + "/" + folder):209				try:210					os.makedirs(dest_foldername + "/" + folder)211				except:212					eprint(self._traceback(False))213		self._respond_ok(session_id)214		# self._respond_ok(session_id, debug="Created all folders. Please send files.")215		while True:216			response = self._recv_response(session_id)217			if response["cancel"] or response["data"]["finished"]:218				return219			if not response["success"]:220				eprint(response["traceback"])221				continue222			dest_filename = dest_foldername + "/" + response["data"]["file_name"]223			same_file = (response["data"]["file_size"] == file_size(dest_filename) and \224				         response["data"]["md5"] == md5(dest_filename))225			if same_file:226				self._respond_ok(session_id, same_file = same_file)227				# self._respond_ok(session_id, same_file = same_file, debug="Already have file: " + dest_filename)228				continue229			else:230				try:231					file = open(dest_filename, "wb")232					self._respond_ok(session_id, same_file = same_file)233					# self._respond_ok(session_id, same_file = same_file, debug="I don't have this file, please send me.")234					file.close()235				except BaseException as e:236					eprint(self._traceback())237					self._respond_exception(session_id, e)238					# self._respond_exception(session_id, e, debug="Error")239					continue240			recved_size = 0241			full_size = response["data"]["file_size"]242			with open(dest_filename, "wb") as file:243				while recved_size < full_size:244					response = self._recv_response(session_id)245					if response["cancel"]:246						return247					file.write(response["data"]["data"])248					file.flush()249					recved_size += len(response["data"]["data"])250	if block:251		session()252	else:253		thread = threading.Thread(target=session)254		thread.start()255		return Future(self, session_id)256def _process_get_folder(self, request):257	session_id = request["session_id"]258	def session():259		src_foldername = request["data"]["src_foldername"]260		if not os.path.isdir(src_foldername):261			self._respond_exception(session_id, FileNotFoundError(src_foldername + " is not a folder."), block=request["block"])262			self._make_signal(session_id)263			return264		i = len(src_foldername)-1265		while src_foldername[i] in ['/', '\\']:266			i -= 1267		folders = []268		for root, dirs, files in os.walk(src_foldername):269			for name in dirs:270				folders.append(os.path.join(root, name)[i+2:])271		self._respond_ok(session_id, folders=folders)272		# self._respond_ok(session_id, folders=folders, debug="Please create folders: " + str(folders))273		response = self._recv_response(session_id)274		for root, dirs, files in os.walk(src_foldername):275			for name in files:276				src_filename = os.path.join(root, name)277				src_file_size = file_size(src_filename)278				relative_file_name = src_filename[i+2:]279				280				try:281					file = open(src_filename, "rb")282					file.close()283					self._respond_ok(session_id, finished=False, file_name=relative_file_name, file_size=src_file_size, md5=md5(src_filename))284					# self._respond_ok(session_id, finished=False, file_name=relative_file_name, file_size=src_file_size, md5=md5(src_filename), debug="Do you have file " + src_filename + "?")285				except BaseException as e:286					self._respond_exception(session_id, e)287					# self._respond_exception(session_id, e, debug="I cannot open file: " + src_filename)288					continue289				response = self._recv_response(session_id)290				if not response["success"] or response["data"]["same_file"]:291					continue292				block_size = 8192*1024293				sent_size = 0294				with open(src_filename, "rb") as file:295					while sent_size < src_file_size:296						data = file.read(block_size)297						self._respond_ok(session_id, data=data)298						# self._respond_ok(session_id, data=data, debug="Please write " + str(len(data)) + " bytes data to file " + relative_file_name)299						sent_size += len(data)300		self._respond_ok(session_id, finished=True)301		# self._respond_ok(session_id, finished=True, debug="Sent all files.")302		if not request["block"]:303			self._put_result(session_id)304		self._make_signal(session_id)305	thread = Thread(target=session)306	thread.start()307	signal = self._recv_signal(session_id)308	if signal["cancel"] and thread.is_alive():309		thread.kill()310		thread.join()311		self._respond_ok(session_id, cancel=True)312		# self._respond_ok(session_id, cancel=True, debug="Cancel.")313		self._put_result(session_id, cancelled=True)314	else:315		thread.join()316def put_folder(self, src_foldername, dest_foldername = None, block = True):317	if not os.path.isdir(src_foldername):318		raise FileNotFoundError(src_foldername + " is not a folder.")319	session_id = self._get_session_id()320	if dest_foldername == None:321		dest_foldername = os.path.basename(src_foldername)322	self._stop_send = False323	self._request(session_id, dest_foldername=dest_foldername, block=block)324	# self._request(session_id, dest_foldername=dest_foldername, block=block, debug="I will send you folder: " + dest_foldername)325	def session():326		response = self._recv_response(session_id)327		if response["cancel"]:328			return329		if not response["success"]:330			eprint(response["traceback"])331			raise response["exception"]332		i = len(src_foldername)-1333		while src_foldername[i] in ['/', '\\']:334			i -= 1335		folders = []336		for root, dirs, files in os.walk(src_foldername):337			for name in dirs:338				folders.append(os.path.join(root, name)[i+2:])339		self._respond_ok(session_id, folders=folders)340		# self._respond_ok(session_id, folders=folders, debug="Please create folders: " + str(folders))341		response = self._recv_response(session_id)342		if response["cancel"]:343			return344		for root, dirs, files in os.walk(src_foldername):345			for name in files:346				src_filename = os.path.join(root, name)347				src_file_size = file_size(src_filename)348				relative_file_name = src_filename[i+2:]349				350				try:351					file = open(src_filename, "rb")352					file.close()353					self._respond_ok(session_id, finished=False, file_name=relative_file_name, file_size=src_file_size, md5=md5(src_filename))354					# self._respond_ok(session_id, finished=False, file_name=relative_file_name, file_size=src_file_size, md5=md5(src_filename), debug="Do you have file " + relative_file_name + "?")355				except BaseException as e:356					self._respond_exception(session_id, e) # , debug="Error")357					continue358				response = self._recv_response(session_id)359				if response["cancel"]:360					return361				if not response["success"] or response["data"]["same_file"]:362					continue363				block_size = 8192*1024364				sent_size = 0365				with open(src_filename, "rb") as file:366					while sent_size < src_file_size:367						if self._stop_send:368							return369						data = file.read(block_size)370						self._respond_ok(session_id, data=data)371						# self._respond_ok(session_id, data=data, debug="Please write " + str(len(data)) + " bytes to file " + relative_file_name)372						sent_size += len(data)373		self._respond_ok(session_id, finished=True)374		# self._respond_ok(session_id, finished=True, debug="Sent all files.")375	if block:376		session()377	else:378		thread = threading.Thread(target=session)379		thread.start()380		return Future(self, session_id)381def _process_put_folder(self, request):382	session_id = request["session_id"]383	def session():384		dest_foldername = request["data"]["dest_foldername"]385		try:386			if not os.path.isdir(dest_foldername):387				os.makedirs(dest_foldername)388			self._respond_ok(session_id)389			# self._respond_ok(session_id, debug="Created folder " + dest_foldername)390		except BaseException as e:391			self._respond_exception(session_id, e, block=request["block"])392			# self._respond_exception(session_id, e, block=request["block"], debug="Cannot create folder " + dest_foldername)393			self._make_signal(session_id)394			return395		response = self._recv_response(session_id)396		for folder in response["data"]["folders"]:397			if not os.path.isdir(dest_foldername + "/" + folder):398				try:399					os.makedirs(dest_foldername + "/" + folder)400				except:401					pass402		self._respond_ok(session_id)403		# self._respond_ok(session_id, debug="Created all folders.")404		while True:405			response = self._recv_response(session_id)406			if not response["success"]:407				continue408			if response["data"]["finished"]:409				break410			dest_filename = dest_foldername + "/" + response["data"]["file_name"]411			same_file = (response["data"]["file_size"] == file_size(dest_filename) and \412				         response["data"]["md5"] == md5(dest_filename))413			if same_file:414				self._respond_ok(session_id, same_file = same_file)415				# self._respond_ok(session_id, same_file = same_file, debug="Already have file " + dest_filename)416				continue417			else:418				try:419					file = open(dest_filename, "wb")420					self._respond_ok(session_id, same_file = same_file)421					# self._respond_ok(session_id, same_file = same_file, debug="I don't have this file, please send me.")422					file.close()423				except BaseException as e:424					self._respond_exception(session_id, e)425					# self._respond_exception(session_id, e, debug="Error")426					continue427			recved_size = 0428			full_size = response["data"]["file_size"]429			with open(dest_filename, "wb") as file:430				while recved_size < full_size:431					response = self._recv_response(session_id)432					file.write(response["data"]["data"])433					file.flush()434					recved_size += len(response["data"]["data"])435		436		if not request["block"]:437			self._put_result(session_id)438		self._make_signal(session_id)439	thread = Thread(target=session)440	thread.start()441	signal = self._recv_signal(session_id)442	if signal["cancel"] and thread.is_alive():443		thread.kill()444		thread.join()445		self._respond_ok(session_id, cancel=True)446		# self._respond_ok(session_id, cancel=True, debug="Cancel.")447		self._put_result(session_id, cancelled=True)448	else:...reqNewItem.py
Source:reqNewItem.py  
1from itemRec.newRec import itemSearch2# from newRec import itemSearch3import pandas as pd4DB_user = 'mdmig'5import os6LOCATION = r"C:/Users/ezmedicom/PycharmProjects/pythonProject/instantclient_19_11"7os.environ["PATH"] = LOCATION + ";" + os.environ["PATH"]  # íê²½ë³ì ë±ë¡8import cx_Oracle9con = cx_Oracle.connect(DB_user, 'medilinx00', '10.29.22.8:1522/mdvans', encoding="UTF-8")10file_DB = 'ICOMATCH'11req_DB = 'MDVUDIITEMREQUEST'12sql = """SELECT 13                seq14               ,reqDate15               ,reqFileId16               ,ATC.UUID_SEQ AS reqFileNm17               ,ATC.FILE_PATH AS reqFilePath18               ,reqDiv19               ,reqState20               ,sameFileId21               ,accFileId22               ,reqRemark23               ,regUserId24               ,regDate25               ,modDate26               ,addcolumn27        FROM MDVUDIITEMREQUEST REQ28        LEFT JOIN ICOMATCH ATC ON ATC.UUID = REQ.REQFILEID AND ATC.UUID_SEQ = REQ.reqFileIdSeq29        WHERE REQSTATE = 'N'30        AND ATC.FILE_NAME IS NOT NULL31        AND ATC.FILE_PATH IS NOT NULL32        ORDER BY SEQ"""33swtich_df = pd.read_sql("SELECT * FROM MDVUDIITEMREQUEST WHERE REQSTATE = 'P' ORDER BY SEQ DESC", con)34if swtich_df.empty:35    file_df = pd.read_sql(sql, con)36else:37    file_df = pd.DataFrame(columns = ['SEQ', 'REQFILEID', 'REQFILENM', 'REQFILEPATH', 'REQDIV', 'ADDCOLUMN'])38file_df = file_df[file_df['REQFILEID'].notnull()]39file_df = file_df.reset_index(drop=True)40update_df = file_df[['SEQ', 'REQFILEID', 'REQFILENM', 'REQFILEPATH', 'REQDIV', 'ADDCOLUMN']]41update_df['ADDCOLUMN'] = update_df['ADDCOLUMN'].fillna('')42# update_df['REQFILEPATH'] = update_df['REQFILEPATH'].str.replace('Z:', '/z')      # Linux43ing_sql = "UPDATE " + req_DB44ing_sql += " SET REQSTATE = 'P', MODDATE = SYSDATE, MODUSERID = 'pyadmin', RESULTMSG = ''"45ing_sql += " WHERE SEQ = :SEQ"46file_sql = " INSERT INTO " + file_DB + " ("47file_sql += " HOUSE_CODE, UUID, UUID_SEQ, FILE_NAME, FILE_PATH, FILE_SIZE, FILE_EXTENSION,"48file_sql += " REAL_FILE_NAME, BIZ_TYPE, ADD_DATE, ADD_TIME, ADD_USER_ID )"49file_sql += " SELECT "50file_sql += " '100', SEQ_ATTACHID.NEXTVAL, :UUID_SEQ, :FILE_NAME, :FILE_PATH, :FILE_SIZE, :FILE_EXTENSION,"51file_sql += " :REAL_FILE_NAME, 'UDI', TO_CHAR(SYSDATE,'YYYYMMDD'), TO_CHAR(SYSDATE,'HH24MISS'), 'pyadmin'"52file_sql += " FROM DUAL "53end1_sql = " UPDATE " + req_DB + " SET (REQSTATE, SAMEFILEID, MODDATE, MODUSERID) ="54end1_sql += " (SELECT DISTINCT 'Y' AS REQSTATE, SAME.UUID AS SAMEFILEID, SYSDATE AS MODDATE, 'pyadmin' AS MODUSERID FROM ICOMATCH"55end1_sql += " LEFT JOIN  ICOMATCH SAME ON SAME.FILE_NAME = :SAME_FILE_NAME)"56end1_sql += " WHERE SEQ = :SEQ"57end2_sql = " UPDATE " + req_DB + " SET (REQSTATE, ACCFILEID, MODDATE, MODUSERID) ="58end2_sql += " (SELECT DISTINCT 'Y' AS REQSTATE, ACC.UUID AS ACCFILEID, SYSDATE AS MODDATE, 'pyadmin' AS MODUSERID FROM ICOMATCH"59end2_sql += " LEFT JOIN  ICOMATCH ACC ON ACC.FILE_NAME = :ACC_FILE_NAME)"60end2_sql += " WHERE SEQ = :SEQ"61end3_sql = " UPDATE " + req_DB + " SET (REQSTATE, SAMEFILEID, ACCFILEID, MODDATE, MODUSERID) ="62end3_sql += " (SELECT DISTINCT 'Y' AS REQSTATE, SAME.UUID AS SAMEFILEID, ACC.UUID AS ACCFILEID, SYSDATE AS MODDATE, 'pyadmin' AS MODUSERID FROM ICOMATCH"63end3_sql += " LEFT JOIN  ICOMATCH SAME ON SAME.FILE_NAME = :SAME_FILE_NAME"64end3_sql += " LEFT JOIN  ICOMATCH ACC ON ACC.FILE_NAME = :ACC_FILE_NAME)"65end3_sql += " WHERE SEQ = :SEQ"66error_sql = "UPDATE " + req_DB67error_sql += " SET REQSTATE = 'E', MODDATE = SYSDATE, MODUSERID = 'pyadmin', RESULTMSG = :RESULTMSG"68error_sql += " WHERE SEQ = :SEQ"69try:70    cur = con.cursor()71    for file_data in update_df.to_dict('index').values():72        seq = file_data['SEQ']73        try:74            cur.prepare(ing_sql)75            print('íì¼ ìì± ìì')76            cur.execute(ing_sql, {'SEQ': seq})77            print('íì¼ ìì± ì¤')78        except Exception as e1:79            errMsg = ">>>> :: " + str(e1)80            con.rollback()81            print(errMsg)82        finally:83            try:84                con.commit()85            except Exception as e3:86                print('commit err' + str(e3))87        # í목ì¶ì²88        file_path = file_data['REQFILEPATH']89        file_nm = file_data['REQFILENM']90        div = file_data['REQDIV']91        if file_data['ADDCOLUMN'] == '':92            sel_cols = []93        else:94            sel_cols = file_data['ADDCOLUMN'].split(',')95        try :96            same_file, acc_file = itemSearch.item_search(file_path, file_nm, div, seq, sel_cols)97            print('íì¼ ìì± ìë£')98        except Exception as e1:99            print('íì¼ ì
ë ¥ ì¤ë¥')100            errMsg = str(e1)101            cur.prepare(error_sql)102            cur.execute(error_sql, {'SEQ': seq, 'RESULTMSG': errMsg})103        try:104            if file_data['REQDIV'] == '1':105                cur.prepare(file_sql)106                cur.execute(file_sql, {'UUID_SEQ': same_file,107                                       'FILE_NAME': same_file,108                                       'FILE_PATH': file_path,109                                       'FILE_SIZE': os.path.getsize(file_path + same_file),110                                       'FILE_EXTENSION': same_file.split('.')[1],111                                       'REAL_FILE_NAME': same_file})112                cur.prepare(end1_sql)113                cur.execute(end1_sql, {'SAME_FILE_NAME': same_file,114                                       'SEQ': seq})115                print('ìí ì
ë°ì´í¸ ìë£')116            elif file_data['REQDIV'] == '2':117                cur.prepare(file_sql)118                cur.execute(file_sql, {'UUID_SEQ': acc_file,119                                       'FILE_NAME': acc_file,120                                       'FILE_PATH': file_path,121                                       'FILE_SIZE': os.path.getsize(file_path + acc_file),122                                       'FILE_EXTENSION': acc_file.split('.')[1],123                                       'REAL_FILE_NAME': acc_file})124                cur.prepare(end2_sql)125                cur.execute(end2_sql, {'ACC_FILE_NAME': acc_file,126                                       'SEQ': seq})127                print('ìí ì
ë°ì´í¸ ìë£')128            elif file_data['REQDIV'] == '3':129                cur.prepare(file_sql)130                cur.execute(file_sql, {'UUID_SEQ': same_file,131                                       'FILE_NAME': same_file,132                                       'FILE_PATH': file_path,133                                       'FILE_SIZE': os.path.getsize(file_path + same_file),134                                       'FILE_EXTENSION': same_file.split('.')[1],135                                       'REAL_FILE_NAME': same_file})136                cur.prepare(file_sql)137                cur.execute(file_sql, {'UUID_SEQ': acc_file,138                                       'FILE_NAME': acc_file,139                                       'FILE_PATH': file_path,140                                       'FILE_SIZE': os.path.getsize(file_path + acc_file),141                                       'FILE_EXTENSION': acc_file.split('.')[1],142                                       'REAL_FILE_NAME': acc_file})143                cur.prepare(end3_sql)144                cur.execute(end3_sql, {'SAME_FILE_NAME': same_file,145                                       'ACC_FILE_NAME': acc_file,146                                       'SEQ': seq})147                print('ìí ì
ë°ì´í¸ ìë£')148            else:149                print('REQDIV error')150        except Exception as e:151            print('update err : ' + str(e))152        finally:153            try:154                con.commit()155            except Exception as e3:156                print('commit err : ' + str(e3))157except Exception as e1:158    print('íì¼ ì
ë ¥ ì¤ë¥')159    errMsg = str(e1)160    cur.prepare(error_sql)161    cur.execute(error_sql, {'SEQ': seq, 'RESULTMSG': errMsg})162    con.commit()163finally:164    try:165        cur.close()166    except Exception as e3:...test.py
Source:test.py  
1import unittest2from os import listdir, path, remove3from os.path import isfile, join4import filecmp5import netCDF46import numpy as np7from radarconverter.radar_grib2netcdf.radar_grib2netcdf import radar_grib2netcdf8from radarconverter.radar_netcdf2grib.radar_netcdf2grib import radar_netcdf2grib9path_grib1 = "dataset/grib1/"10path_grib2 = "dataset/grib2/"11path_netcdf = "dataset/netcdf/"12path_temps = "dataset/temps/"13def clear_dir(dir_name):14    """15    Delete all file ins pecified dir16    """17    print("CLEANING DIR {} ...".format(dir_name))18    for f in listdir(dir_name):19            if isfile(join(dir_name, f)):20                remove(join(dir_name, f))21    print("CLEAN DIR {} DONE".format(dir_name))22def read_netcdf_data(name_nc):23    # Apertura del file netcdf24    ncid = netCDF4.Dataset(name_nc)25    for k in ncid.variables.keys():26        if k == 'cum_pr_mm':  # Estraggo gli attributi del campo di pioggia27            varid_pr = ncid.variables['cum_pr_mm']28            temp = [varid_pr[:]][0]29            temp = np.array(temp)30            cum_pr_mm = temp[0]31        if k == 'lat':32            varid_pr = ncid.variables['lat']33            temp = [varid_pr[:]][0]34            temp = np.array(temp)35            lats = temp[0]36        if k == 'lon':37            varid_pr = ncid.variables['lon']38            temp = [varid_pr[:]][0]39            temp = np.array(temp)40            lons = temp[0]41    return lats, lons, cum_pr_mm42class TestMethods(unittest.TestCase):43    """44    Insert45    """46    def test_convertion_grib1(self):47        print("TEST GRIB1")48        same_file = True49        files = [f for f in listdir(path_grib1) if isfile(join(path_grib1, f))]50        for f in files:51            grib_filename_orig = path_grib1 + path.basename(f)52            grib_filename_new = path_temps + path.basename(f)53            nc_filename = path_temps + path.basename(f).replace("grib1", "nc")54            radar_grib2netcdf(grib_filename_orig, nc_filename)55            radar_netcdf2grib(nc_filename, grib_filename_new, 1)56            if not filecmp.cmp(grib_filename_orig, grib_filename_new):57                same_file = False58                print("Errore su file: {}".format(grib_filename_orig))59        self.assertTrue(same_file)60        clear_dir(path_temps)61    '''62    def test_convertion_grib2(self):63        print("TEST GRIB2")64        same_file = True65        files = [f for f in listdir(path_grib2) if isfile(join(path_grib2, f))]66        for f in files:67            grib_filename_orig = path_grib2 + path.basename(f)68            grib_filename_new = path_temps + path.basename(f)69            nc_filename = path_temps + path.basename(f).replace("grib2", "nc")70            radar_grib2netcdf(grib_filename_orig, nc_filename)71            radar_netcdf2grib(nc_filename, grib_filename_new, 2)72            if not filecmp.cmp(grib_filename_orig, grib_filename_new):73                same_file = False74                print("Errore su file: {}".format(grib_filename_orig))75        self.assertTrue(same_file)76        clear_dir(path_temps)77    '''78    def test_convertion_netcdf(self):79        print("TEST NETCDF")80        same_file = True81        files = [f for f in listdir(path_netcdf) if isfile(join(path_netcdf, f))]82        for f in files:83            nc_filename_orig = path_netcdf + path.basename(f)84            nc_filename_new = path_temps + path.basename(f)85            grib_filename = path_temps + path.basename(f).replace("nc", "grib1")86            radar_netcdf2grib(nc_filename_orig, grib_filename, 1)87            radar_grib2netcdf(grib_filename, nc_filename_new)88            if not filecmp.cmp(nc_filename_orig, nc_filename_new):89                lats_new, lons_new, cum_pr_mm_new = read_netcdf_data(nc_filename_new)90                lats_orig, lons_orig, cum_pr_mm_orig = read_netcdf_data(nc_filename_orig)91                error_max_lat = np.max(np.abs(lats_new - lats_orig))92                error_max_lon = np.max(np.abs(lons_new - lons_orig))93                error_max_cum_pr_mm = np.max(np.abs(cum_pr_mm_new - cum_pr_mm_orig))94                error_max = error_max_lat95                if error_max < error_max_lon:96                    error_max = error_max_lon97                if error_max < error_max_cum_pr_mm:98                    error_max = error_max_cum_pr_mm99                if error_max >= 0.005:100                    print("errore massmo {}".format(str(error_max)))101                    same_file = False102                    print("Errore su file: {}".format(nc_filename_orig))103        self.assertTrue(same_file)104        clear_dir(path_temps)105if __name__ == '__main__':...test_cli.py
Source:test_cli.py  
1import unittest2from hashlib import md53from tempfile import gettempdir4class Test(unittest.TestCase):5    def test_link(self):6        import string7        from os.path import join, split, exists8        from os import makedirs9        from shutil import rmtree10        tmp = join(gettempdir(), "dupln")11        data = dict(12            filter(13                (lambda _: isinstance(_[1], str)),14                ((k, getattr(string, k)) for k in dir(string) if k[0].isalpha()),15            )16        )17        # pprint.pprint(data)18        def test(sepins=[2, 5, 8]):19            exists(tmp) and rmtree(tmp)20            same_file = {}21            def put_bytes(b, path):22                h = md5()23                h.update(b)24                q = (len(b), h.hexdigest())25                if q in same_file:26                    same_file[q] += 127                else:28                    same_file[q] = 129                dir = split(path)[0]30                exists(dir) or makedirs(dir)31                with open(path, "wb") as o:32                    o.write(b)33            for k, v in data.items():34                b = v.encode("UTF-8")35                put_bytes(b[::-1], join(tmp, k))  # reversed36                for i in sepins:37                    if i < len(k):38                        k = k[:i] + "/" + k[i:]39                        put_bytes(b, join(tmp, k))40            return dict(41                same_file=sum(1 for count in same_file.values() if count > 1),42                unique_files=len(same_file),43                disk_size=sum(size_hash[0] for size_hash in same_file.keys()),44                size=sum(45                    size_hash[0] * count for size_hash, count in same_file.items()46                ),47                files=sum(same_file.values()),48                same_size=len(set(size_hash[0] for size_hash in same_file.keys())),49                linked=sum(count - 1 for count in same_file.values() if count > 1),50            )51        v = test()52        from .__main__ import App53        # Total disk_size 836b; files 26; inodes 26; same_size 8; size 836b;54        total = App().main(["cmd", "stat", tmp])55        self.assertEqual(total.disk_size, v["size"])56        self.assertEqual(total.files, v["files"])57        self.assertEqual(total.inodes, v["files"])58        self.assertEqual(total.same_size, v["same_size"])59        self.assertEqual(total.size, v["size"])60        # Total devices 1; disk_size 836b; files 26; inodes 26; size 836b; unique_size 8;61        total = App().main(["cmd", "unique_files", tmp])62        self.assertEqual(total.disk_size, v["size"])63        self.assertEqual(total.files, v["files"])64        self.assertEqual(total.inodes, v["files"])65        self.assertEqual(total.size, v["size"])66        self.assertEqual(total.unique_size, v["same_size"])67        # disk_size ; files ; inodes ; linked ; same_hash ; same_size ; size ;68        total = App().main(["cmd", "link", tmp])69        self.assertEqual(total.disk_size, v["disk_size"])70        self.assertEqual(total.files, v["files"])71        self.assertEqual(total.inodes, v["files"])72        self.assertEqual(total.linked, v["linked"])73        self.assertEqual(total.same_hash, v["same_file"])74        self.assertEqual(total.same_size, v["same_size"])75        self.assertEqual(total.size, v["size"])76        #77        total = App().main(["cmd", "unique_files", tmp])78        self.assertEqual(total.disk_size, v["disk_size"])79        self.assertEqual(total.files, v["files"])80        self.assertEqual(total.inodes, v["unique_files"])81        self.assertEqual(total.size, v["size"])82        self.assertEqual(total.unique_size, v["same_size"])83        #84        total = App().main(["cmd", "stat", tmp])85        self.assertEqual(total.disk_size, v["disk_size"])86        self.assertEqual(total.files, v["files"])87        self.assertEqual(total.same_size, v["same_size"])88        self.assertEqual(total.inodes, v["unique_files"])...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
