How to use get_last_file method in Slash

Best Python code snippet using slash

core.py

Source:core.py Github

copy

Full Screen

...21def check_pro(ps_cmd):22 ex_result = ex_cmd(ps_cmd)23 status = ex_result[2]24 return status25def get_last_file(model):26 position_file = position_dir + model + '_position.json'27 with open(position_file,'r') as f:28 data = json.load(f,encoding='utf-8')29 return [data[-1]["inode"],data[-1]["pos"],data[-1]["file"]] #{u'inode': 922009, u'pos': 55574166, u'file': u'/data0/logs/clickstream/staytime.20171221160000'}30def check_size(model):31 size_ngixn_cmd = "du -sb " + get_last_file(model)[2] + "|awk '{print $1}'"32 size_flume_pos,size_nginx_log = int(get_last_file(model)[1]),int(ex_cmd(size_ngixn_cmd)[0])33 if size_flume_pos and size_nginx_log:34 diff_num = size_nginx_log - size_flume_pos35 get_flag = "YES"36 else:37 get_flag = "NO"38 diff_num = 100000039 return {"size_nginx_log": size_nginx_log, "size_flume_pos": size_flume_pos, "diff_num": diff_num, "get_flag": get_flag,"last_file":get_last_file(model)[2]}40def diff_model_allert(model):41 for i in range(3):42 size_list = check_size(model)43 if size_list["get_flag"] == "NO":44 printLog('''model,"未获取到对比数据,将会重试2次,间隔20秒"''',3)45 time.sleep(20)46 continue47 #print model + ": size_flume_pos :", size_list["size_flume_pos"], " size_nginx_log:", size_list["size_nginx_log"], "diff_num:",size_list["diff_num"]48 if size_list["diff_num"] >= allert_num:49 err_mess = "%s 同步延迟超过%sB延迟大小(日志实际大小-flume读取大小)为:%sB延迟读取文件: %s" % (model,allert_num,size_list["diff_num"],size_list["last_file"])50 printLog(err_mess,3)51 global read_err_model_list52 read_err_model_list.append({model:size_list["diff_num"]})53 global err_mess_list...

Full Screen

Full Screen

file_manager.py

Source:file_manager.py Github

copy

Full Screen

...24 if pair in s and os.path.isfile(os.path.join(path, s))]25 # a.sort(key=lambda s: os.path.getmtime(os.path.join(path, s)))26 return a27 @staticmethod28 def get_last_file(exchange, pair, stream):29 file_name = '{}_{}_{}'.format(exchange,30 pair.upper(),31 str(datetime.now().date()))32 return os.path.join(os.getcwd(), 'files', exchange, stream, file_name)33 @staticmethod34 def does_file_exist(exchange, stream, pair):35 file_path = FileManager.get_last_file(exchange, pair, stream)36 return os.path.exists(file_path)37 @staticmethod38 def save_data_frame_to_file(exchange, stream, pair, df):39 file_path = FileManager.get_last_file(exchange, pair, stream)40 with open(file_path, 'a') as file:41 df.to_csv(file, sep=',', header=file.tell() == 0, index=False)42 @staticmethod43 def save_data_to_file(exchange, stream, pair, data, columns=None):44 file_path = FileManager.get_last_file(exchange, pair, stream)45 with open(file_path, 'a') as file:46 if file.tell() == 0 and columns:47 file.write(columns+"\n")48 file.write(data)49 @staticmethod50 def save_trades_to_file(exchange, stream, pair, trade, columns=None):51 file_path = FileManager.get_last_file(exchange, pair, stream)52 if isinstance(trade, dict):53 with open(file_path, 'a') as file:54 w = csv.DictWriter(file, trade.keys())55 if file.tell() == 0:56 w.writeheader()57 w.writerow(trade)58 elif isinstance(trade, str):59 with open(file_path, 'a') as file:60 if file.tell() == 0 and columns:61 file.write(columns + "\n")...

Full Screen

Full Screen

Efine_RT.py

Source:Efine_RT.py Github

copy

Full Screen

...39decode_istance=deode_TL.reader("raw_data/")40def decode_data(filename,gemroc):41 out_name=("root_files/")+filename.split("raw_data/")[1].split(".")[0]+".root"42 decode_istance.write_root(filename,gemroc, 0, 0, out_name)43def get_last_file(mypath):44 onlyfiles = [f for f in os.listdir(mypath) if (os.path.isfile(os.path.join(mypath, f)) and 'missing_frames' not in f and "efine" in f)]45 onlyfiles.sort()46 return (os.path.join(mypath,onlyfiles[-1]))47def run_acq():48 while True:49 acquire_data(gemroc,acq_time)50 raw_f=get_last_file("raw_data/")51 decode_data(get_last_file("raw_data/"),gemroc)52 root_f=get_last_file("root_files/")53 f=R.TFile(get_last_file("root_files/"))54 f.tree.Draw("efine","")55 c.Update()56 os.remove(raw_f)57 os.remove(root_f)58 #root.update()59def status_run():60 running = True61 62def status_stop():63 running = False64 65#root = tk.Tk()66#frame = tk.Frame(root)67#frame.pack()...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run Slash automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful