How to use the process_files method in tempest

Best Python code snippets using tempest_python

organize_files.py

Source: organize_files.py (GitHub)


import os
import shutil
import re
import sys
import pandas as pd
from datetime import datetime

pd.set_option('display.float_format', lambda x: '%.3f' % x)

# Number of header rows to skip for each report type when building a DataFrame.
skip_rows = {'Client Billing Descending': 0, "Fee Breakdown by Dept and Fee Earner": 3,
             "Fee Summary by Dept and Fee Earner": 3, "Fees Billed": 3,
             "Matter Source of Business inc Matter Bills": 0, "Matters Opened by FE": 3,
             "Payment Received Analysis": 3, "Total Hours by Fee Earner-With Billings": 0}

def remove_cols(df):
    """
    Takes a DataFrame as input and returns its column names, dropping any
    column whose name contains "Textbox" as well as a few known summary columns.
    """
    txt_chk = re.compile(r'Textbox')
    tot_hrs_col_name = ["RecordedHours2", "NonChargeHours2", "WOHours2", "TotalHour2", "bankRef"]
    new_cols = [col_name for col_name in df.columns
                if not txt_chk.search(col_name) and col_name not in tot_hrs_col_name]
    return new_cols

def get_rows(dct, match):
    """
    Takes a dictionary and a filename and returns the number of rows to skip
    for that filename when creating a DataFrame.
    """
    for val in dct.keys():
        if re.match(val, match):
            return dct[val]

def get_date_from_Filename(fname):
    """
    Accepts a filename in fname_date.csv format, extracts the date embedded
    in the filename and returns it as a datetime object.
    """
    pattern = re.compile(r'_\d*')
    match = pattern.findall(fname)
    dt = match[0].split("_")[1]
    file_date = pd.to_datetime(dt, format='%d%m%Y')
    return file_date

def categorize_files(file_loc, log_writer):
    if os.name == 'posix':
        os.system('clear')
        separator = "/"
    else:
        os.system('cls')
        separator = "\\"
    unprocessed = file_loc + separator + "Unprocessed"
    processed = file_loc + separator + "Processed"
    ess_fold_list = [unprocessed, processed]

    for fold in ess_fold_list:
        if os.path.exists(fold):
            log_writer.info(f'\n[{fold}] already exists in [{file_loc}]\n')
        else:
            os.mkdir(fold)  # fold is already a full path
            log_writer.info(f'\nCreating folder {fold} in {file_loc}\n')
    total_csv = len([f for f in os.listdir(file_loc) if f.endswith('.csv')])
    if total_csv > 0:
        log_writer.info(f"Found {total_csv} csv files")
    else:
        log_writer.info("No CSV files found. Exiting.")
        sys.exit()
    process_files = {}
    discard_files = {}
    file_list = os.listdir(file_loc)
    log_writer.info("Any xlsx file, and any file with Pie in its name, will not be processed")
    # Segregate the files.
    discard_files['all_pie'] = [files for files in file_list if re.search(r'[a-zA-Z\s]+Pie \w+_\d+\.csv', files)]
    discard_files['all_xlsx'] = [files for files in file_list if files.endswith(".xlsx")]
    # Move the above files to the Unprocessed folder before moving ahead.
    for f in discard_files.keys():
        log_writer.info(f'Moving out files of list [{f}] to folder [{unprocessed}]')
        try:
            for file in discard_files[f]:
                shutil.move(file_loc + separator + file, unprocessed)
        except shutil.Error:
            log_writer.info("File already exists in destination")
        else:
            log_writer.info("Files moved")

    process_files['client_billing'] = [files for files in file_list if re.search(r'Client [a-zA-Z\s]+_\d+\.csv', files)]
    cnt = len(process_files['client_billing'])
    log_writer.info(f'Found [{cnt}] files of Client Billing')

    process_files['fee_brkdn_dept_fe'] = [files for files in file_list if re.search(r'Fee Breakdown [a-zA-Z\s]+_\d+\.csv', files)]
    cnt = len(process_files['fee_brkdn_dept_fe'])
    log_writer.info(f'Found [{cnt}] files of Fee Breakdown by Dept')

    process_files['fee_summ_dept_fe'] = [files for files in file_list if re.search(r'Fee Summary [a-zA-Z\s]+_\d+\.csv', files)]
    cnt = len(process_files['fee_summ_dept_fe'])
    log_writer.info(f'Found [{cnt}] files of Fee Summary by Dept')

    process_files['fees_billed'] = [files for files in file_list if re.search(r'Fees B[a-zA-Z\s]+_\d+\.csv', files)]
    cnt = len(process_files['fees_billed'])
    log_writer.info(f'Found [{cnt}] files of Fees Billed')

    process_files['matter_src'] = [files for files in file_list if re.search(r'Matter Source [a-zA-Z\s()]+_\d+\.csv', files)]
    cnt = len(process_files['matter_src'])
    log_writer.info(f'Found [{cnt}] files of Matter Source Reference')

    process_files['matter_opened'] = [files for files in file_list if re.search(r'Matters Open[a-zA-Z\s()]+_\d+\.csv', files)]
    cnt = len(process_files['matter_opened'])
    log_writer.info(f'Found [{cnt}] files of Matters Opened by FE')

    process_files['payment_rcv'] = [files for files in file_list if re.search(r'Payment [a-zA-Z\s()]+_\d+\.csv', files)]
    cnt = len(process_files['payment_rcv'])
    log_writer.info(f'Found [{cnt}] files of Payment Received')

    process_files['tot_hrs_fe'] = [files for files in file_list if re.search(r'[tT]otal[a-zA-Z\s-]*_\d+\.csv', files)]
    cnt = len(process_files['tot_hrs_fe'])
    log_writer.info(f'Found [{cnt}] files of Total Hours by Fee Earner')

    for f in process_files.keys():
        log_writer.info(f'Moving category [{f}]')
        try:
            for file in process_files[f]:
                shutil.move(file_loc + separator + file, processed)
        except shutil.Error:
            log_writer.info("Error moving file {}".format(f))
        else:
            for file in process_files[f]:
                log_writer.info(f'Moving file --> {file}')

    return process_files

def concat_files(dict_list, file_loc, log_writer):
    df_all_files = {}
    dict_fname = ""
    # If a filename does not match a key in skip_rows, it will not be considered.
    # date = (datetime.now()).strftime("%m-%d-%y")
    log_writer.info(f'Following keys will be processed - [{dict_list.keys()}]')
    for file_cat in dict_list.keys():
        df_final = pd.DataFrame()
        log_writer.info('*' * 50)
        log_writer.info(f'Processing category {file_cat}')
        for file in dict_list[file_cat]:
            dict_fname = file.split("_")[0]
            dfc_file = pd.read_csv(file_loc + "/Processed/" + file, skiprows=get_rows(skip_rows, file))
            dfc_file = dfc_file[remove_cols(dfc_file)]
            processing_date = get_date_from_Filename(file)
            dfc_file["Date_Added"] = processing_date
            df_final = pd.concat([df_final, dfc_file], ignore_index=True)
        df_final = df_final.fillna(0)  # assign the result; fillna() does not modify in place
        # Strip currency symbols and thousands separators; turn (n) into -n.
        df_final = (df_final.replace(re.compile(r'£'), "")
                            .replace(re.compile(r','), "")
                            .replace(re.compile(r'\('), "-")
                            .replace(re.compile(r'\)'), ""))
        for cols in df_final.columns:
            try:
                df_final[cols].astype(float)
            except (ValueError, TypeError):
                continue
            else:
                df_final[cols] = df_final[cols].astype(float)
        df_all_files[dict_fname] = df_final.sort_values(by="Date_Added", ascending=True)
    for f in df_all_files.keys():
        rows = df_all_files[f].shape[0]
        log_writer.info(f'Will add -> {rows} entries for [{f}] to the database')
...
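
Taken together, categorize_files sorts the raw CSV exports into Processed and Unprocessed folders and returns the category-to-filename mapping that concat_files then loads into one DataFrame per report type. A minimal driver tying the two together might look like the sketch below; the logging setup and the reports_dir path are illustrative assumptions, not part of the original script.

import logging

# Hypothetical driver for the snippet above; "./reports" is a placeholder path.
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s')
log_writer = logging.getLogger('organize_files')

reports_dir = './reports'                                # assumed location of the raw CSV exports
categories = categorize_files(reports_dir, log_writer)   # move files, return {category: [filenames]}
concat_files(categories, reports_dir, log_writer)        # build one concatenated DataFrame per report type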


create_data_records.py

Source: create_data_records.py (GitHub)


...
    if _opts.dataset == 'imagenet':
        if _opts.split == 'val':
            from src.data.dataprepare.imagenet.records_writer import ValRecordsWriter
            records_writer = ValRecordsWriter(_opts.data_dir, _opts.target_dir)
            records_writer.process_files()
            return
        if _opts.split == 'train':
            from src.data.dataprepare.imagenet.records_writer import TrainRecordsWriter
            records_writer = TrainRecordsWriter(_opts.data_dir, _opts.target_dir)
            records_writer.process_files()
            return
    if _opts.dataset == 'stanford_dogs':
        if _opts.split == 'val':
            from src.data.dataprepare.stanford_dogs.records_writer import ValRecordsWriter
            records_writer = ValRecordsWriter(_opts.data_dir, _opts.target_dir)
            records_writer.process_files()
            return
        if _opts.split == 'train':
            from src.data.dataprepare.stanford_dogs.records_writer import TrainRecordsWriter
            records_writer = TrainRecordsWriter(_opts.data_dir, _opts.target_dir)
            records_writer.process_files()
            return
    if _opts.dataset == 'cub200':
        assert _opts.data_dir is not None
        from src.data.dataprepare.cub200.records_writer import RecordsWriter
        records_writer = RecordsWriter(_opts.data_dir, 1 if _opts.split == 'train' else 0, _opts.target_dir)
        records_writer.process_files()
        return
    if _opts.dataset == 'kodak':
        assert _opts.data_dir is not None
        from src.data.dataprepare.kodak.records_writer import RecordsWriter
        records_writer = RecordsWriter(_opts.data_dir, _opts.target_dir)
        records_writer.process_files()
        return

if __name__ == '__main__':
...
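
The dispatcher above only needs each writer to accept its source and target directories and expose a process_files() entry point. The real classes live in src.data.dataprepare and are not shown here, so the sketch below is an assumed shape of that interface rather than the actual implementation; only the constructor arguments and the process_files() call are taken from the snippet.

import os

class RecordsWriter:
    """Hypothetical sketch of the writer interface the dispatcher relies on."""

    def __init__(self, data_dir, target_dir):
        self.data_dir = data_dir      # directory holding the raw dataset files
        self.target_dir = target_dir  # directory the records are written to

    def process_files(self):
        # Walk the source directory and emit one record per file; the real
        # writers presumably serialize image/label pairs into record shards.
        os.makedirs(self.target_dir, exist_ok=True)
        for name in sorted(os.listdir(self.data_dir)):
            src = os.path.join(self.data_dir, name)
            if os.path.isfile(src):
                self._write_record(src)

    def _write_record(self, src):
        # Placeholder for the dataset-specific serialization step.
        print(f'writing record for {src}')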


runMain.py

Source: runMain.py (GitHub)


...
    if filename.endswith(".xml"):
        print('Opening file {0} ...'.format(filename))
        try:
            # with open(path + filename) as f:
            #     process_files(f.read(), filename)
            process_files(filename)
        except IOError as exc:
            if exc.errno != errno.EISDIR:
                raise
    if filename.endswith(".txt"):
        print('Opening file {0} ...'.format(filename))
        try:
            # with open(path + filename) as f:
            #     process_files(f.read(), filename)
            process_files(filename)
        except IOError as exc:
            if exc.errno != errno.EISDIR:
...
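
The fragment only shows the body of a loop, so the sketch below reconstructs a plausible surrounding function; the run helper, its path parameter, and the stub process_files are assumptions for illustration. The errno.EISDIR check mirrors the original: if process_files is handed a directory instead of a file, the resulting IOError is swallowed, while every other I/O error is re-raised.

import errno
import os

def process_files(filename):
    # Hypothetical stand-in; the real process_files is defined elsewhere.
    print('Processing {0} ...'.format(filename))

def run(path):
    # Assumed outer loop: hand every .xml or .txt file to process_files.
    for filename in os.listdir(path):
        if filename.endswith((".xml", ".txt")):
            print('Opening file {0} ...'.format(filename))
            try:
                process_files(filename)
            except IOError as exc:
                if exc.errno != errno.EISDIR:
                    raise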


