Best Python code snippet using tempest_python
organize_files.py
Source:organize_files.py  
from ast import Raise
import os
import shutil
import re
import pandas as pd
from datetime import datetime
import sys

pd.set_option('display.float_format', lambda x: '%.3f' % x)

# Value passed as ``skiprows`` to ``pd.read_csv`` for each report type.  Keys
# are matched against the start of the CSV file name (see ``get_rows``).
skip_rows = {'Client Billing Descending': 0, "Fee Breakdown by Dept and Fee Earner": 3,
             "Fee Summary by Dept and Fee Earner": 3, "Fees Billed": 3, "Matter Source of Business inc Matter Bills": 0,
             "Matters Opened by FE": 3, "Payment Received Analysis": 3, "Total Hours by Fee Earner-With Billings": 0}

# Files matching this pattern are moved to the Unprocessed folder.
_PIE_PATTERN = re.compile(r'[\sa-zA-Z\s]+Pie \w+_\d+.csv')

# (result key, file-name pattern, label used in the "Found ..." log line)
_PROCESS_CATEGORIES = (
    ('client_billing', re.compile(r'Client [a-zA-Z\s]+_\d+.csv'), 'Client BIlling'),
    ('fee_brkdn_dept_fe', re.compile(r'Fee Breakdown [a-zA-Z\s]+_\d+.csv'), 'Fee Breakdown by Dept'),
    ('fee_summ_dept_fe', re.compile(r'Fee Summary [a-zA-Z\s]+_\d+.csv'), 'Fee Summary by Dept'),
    ('fees_billed', re.compile(r'Fees B[a-zA-Z\s]+_\d+.csv'), 'Fees BIlled'),
    ('matter_src', re.compile(r'Matter Source [a-zA-Z\s()]+_\d+.csv'), 'Matter Source Reference'),
    ('matter_opened', re.compile(r'Matters Open[\sa-zA-Z\s()]+_\d+.csv'), 'Matter Opened by FE'),
    ('payment_rcv', re.compile(r'Payment [\sa-zA-Z\s()]+_\d+.csv'), 'Payment Received'),
    ('tot_hrs_fe', re.compile(r'([tT]otal[\sa-z-A-Z\s]*_\d+.csv)'), 'Total Hours by Fee Earner'),
)


def remove_cols(df):
    """
    Return the column labels of ``df`` that should be kept.

    A column is left out when its name contains the word ``Textbox`` or when
    it is one of the fixed names in ``excluded`` below.  ``df`` itself is not
    modified; the caller selects the returned labels.

    :param df: DataFrame whose columns are inspected.
    :return: list of the remaining column labels, in their original order.
    """
    excluded = {"RecordedHours2", "NonChargeHours2", "WOHours2", "TotalHour2", "bankRef"}
    # str() keeps non-string labels (e.g. integer column names) from raising.
    return [col for col in df.columns
            if "Textbox" not in str(col) and col not in excluded]


def get_rows(dct, match):
    """
    Look up how many rows to skip for a file name.

    Each key of ``dct`` is applied with ``re.match`` (i.e. anchored at the
    start) to ``match``; the value of the first key that matches is returned.

    :param dct: mapping of file-name prefix/pattern -> rows to skip.
    :param match: file name to look up.
    :return: the mapped value, or ``None`` when no key matches.
    """
    for pattern, n_rows in dct.items():
        if re.match(pattern, match):
            return n_rows
    return None


def get_date_from_Filename(fname):
    """
    Extract the date stamp from a file name of the form ``<name>_<ddmmYYYY>.csv``.

    The first underscore that is directly followed by digits marks the date;
    those digits are parsed with the ``%d%m%Y`` format.  Underscores that are
    not followed by digits (e.g. inside the report name) are ignored.

    :param fname: file name to inspect.
    :return: the parsed date as returned by ``pd.to_datetime``.
    :raises ValueError: when the name holds no ``_<digits>`` stamp or the
        digits do not form a ``ddmmYYYY`` date.
    """
    found = re.search(r'_(\d+)', fname)
    if found is None:
        raise ValueError(f"No '_<digits>' date stamp found in file name {fname!r}")
    return pd.to_datetime(found.group(1), format='%d%m%Y')


def _move_files(names, src_dir, dst_dir, log_writer):
    """Move each named file from ``src_dir`` into ``dst_dir``, logging every outcome."""
    for name in names:
        try:
            shutil.move(os.path.join(src_dir, name), dst_dir)
        except OSError as exc:
            # shutil.Error (e.g. destination already exists) is an OSError too.
            # One failure must not stop the remaining files from being moved.
            log_writer.info(f'Error Moving File {name}: {exc}')
        else:
            log_writer.info(f'Moving File --> {name}')


def categorize_files(file_loc, log_writer):
    """
    Sort the files found in ``file_loc`` into ``Processed`` / ``Unprocessed``.

    Steps:
      1. clear the terminal and make sure both sub-folders exist;
      2. exit the program (``sys.exit``) when the folder holds no ``.csv`` file;
      3. move "Pie" reports and ``.xlsx`` files to ``Unprocessed``;
      4. match the remaining names against the known report patterns and move
         every match to ``Processed``.

    :param file_loc: directory holding the exported report files.
    :param log_writer: logger-like object; only ``.info(str)`` is used.
    :return: dict mapping category key -> list of matching file names.
    """
    os.system('clear' if os.name == 'posix' else 'cls')

    unprocessed = os.path.join(file_loc, "Unprocessed")
    processed = os.path.join(file_loc, "Processed")

    for fold in (unprocessed, processed):
        if os.path.exists(fold):
            log_writer.info(f'\n[{fold}] already Exists in [{file_loc}]\n')
        else:
            # ``fold`` is already the full path; do not prefix file_loc again.
            os.mkdir(fold)
            log_writer.info(f'\nCreating Folder {fold} in {file_loc}\n')

    file_list = os.listdir(file_loc)
    total_csv = sum(1 for name in file_list if name.endswith('.csv'))
    if total_csv == 0:
        log_writer.info("No CSV Files Found. Exiting.")
        sys.exit()
    log_writer.info(f"Found {total_csv} csv files")

    log_writer.info(f"Any xls file and files having Pie in the names will not be Processed")

    # Segregating the files that are never processed.
    discard_files = {
        'all_pie': [name for name in file_list if _PIE_PATTERN.search(name)],
        'all_xlsx': [name for name in file_list if name.endswith(".xlsx")],
    }
    # Move the above files to the Unprocessed folder before moving ahead.
    for label, names in discard_files.items():
        log_writer.info(f'Moving out files of list [{label}] to folder [{unprocessed}]')
        _move_files(names, file_loc, unprocessed, log_writer)

    # Discarded files have left file_loc; keep them out of the report
    # categories even when their name also matches a report pattern.
    discarded = {name for names in discard_files.values() for name in names}
    candidates = [name for name in file_list if name not in discarded]

    process_files = {}
    for key, pattern, label in _PROCESS_CATEGORIES:
        process_files[key] = [name for name in candidates if pattern.search(name)]
        cnt = len(process_files[key])
        log_writer.info(f'Found [{cnt}] files of {label} ')

    for key, names in process_files.items():
        log_writer.info(f'Moving category [{key}]')
        _move_files(names, file_loc, processed, log_writer)

    return process_files


def concat_files(dict_list, file_loc, log_writer):
    """
    Load every categorised CSV from ``<file_loc>/Processed`` and stack the
    files of each category into one DataFrame.

    For each file the unwanted columns are dropped (``remove_cols``), a
    ``Date_Added`` column is set from the date in the file name, and the
    characters ``£`` ``,`` ``(`` ``)`` are rewritten so that columns which
    then convert cleanly are cast to float.

    NOTE: the listing this function was recovered from is truncated after the
    final logging loop; whatever follows (including the return value) is not
    visible here, so the visible statements are kept unchanged.

    :param dict_list: dict of category key -> list of file names, as returned
        by ``categorize_files``.
    :param file_loc: directory that contains the ``Processed`` folder.
    :param log_writer: logger-like object; only ``.info(str)`` is used.
    """
    df_all_files = {}
    dict_fname= ""
    # If a Filename is not in this Dictionary, then it will not be Considered.
    # date = (datetime.now()).strftime("%m-%d-%y")
    log_writer.info(f'Following Keys will be processed - [{dict_list.keys()}]')
    for file_cat in dict_list.keys():
        df_final = pd.DataFrame()
        log_writer.info('*' * 50)
        log_writer.info(f'Processing Category {file_cat}')
        for file in dict_list[file_cat]:
            # Report name without the date stamp; used as the result key.
            dict_fname = file.split("_")[0]
            dfc_file = pd.read_csv((file_loc + "/Processed/" + file), skiprows=get_rows(skip_rows, file))
            dfc_file = dfc_file[remove_cols(dfc_file)]
            processing_date = get_date_from_Filename(file)
            dfc_file["Date_Added"] = processing_date
            df_final = pd.concat([df_final, dfc_file], ignore_index=True)
            # NOTE(review): the result of fillna() is discarded, so this line
            # has no effect - confirm whether NaN -> 0 was intended.
            df_final.fillna(0)
            df_final = df_final.replace(re.compile(r'£'), "").replace(re.compile(r','), "").replace(re.compile(r'\('),"-").replace(re.compile(r'\)'), "")
            for cols in df_final.columns:
                try:
                    df_final[cols].astype(float)
                except:
                    continue
                    # log_writer.info(f'Skipping Column {cols}')
                else:
                    # log_writer.info(f'Converting {cols} to float')
                    df_final[cols] = df_final[cols].astype(float)
        # NOTE(review): for a category with no files df_final has no
        # "Date_Added" column and dict_fname still holds the previous
        # category's name - confirm how empty categories should be handled.
        df_all_files[dict_fname] = df_final.sort_values(by="Date_Added",ascending=True)
    for f in df_all_files.keys():
        rows = df_all_files[f].shape[0]
        log_writer.info(f'Will Add -> {rows} entries for [{f}] to the database')
    # ... (the remainder of concat_files is cut off in the source listing)
# create_data_records.py
Source:create_data_records.py  
...10    if _opts.dataset == 'imagenet':11        if _opts.split == 'val':12            from src.data.dataprepare.imagenet.records_writer import ValRecordsWriter13            records_writer = ValRecordsWriter(_opts.data_dir, _opts.target_dir)14            records_writer.process_files()15            return16        if _opts.split == 'train':17            from src.data.dataprepare.imagenet.records_writer import TrainRecordsWriter18            records_writer = TrainRecordsWriter(_opts.data_dir, _opts.target_dir)19            records_writer.process_files()20            return21    if _opts.dataset == 'stanford_dogs':22        if _opts.split == 'val':23            from src.data.dataprepare.stanford_dogs.records_writer import ValRecordsWriter24            records_writer = ValRecordsWriter(_opts.data_dir, _opts.target_dir)25            records_writer.process_files()26            return27        if _opts.split == 'train':28            from src.data.dataprepare.stanford_dogs.records_writer import TrainRecordsWriter29            records_writer = TrainRecordsWriter(_opts.data_dir, _opts.target_dir)30            records_writer.process_files()31            return32    if _opts.dataset == 'cub200':33        assert _opts.data_dir is not None34        from src.data.dataprepare.cub200.records_writer import RecordsWriter35        records_writer = RecordsWriter(_opts.data_dir, 1 if _opts.split == 'train' else 0, _opts.target_dir)36        records_writer.process_files()37        return38    if _opts.dataset == 'kodak':39        assert _opts.data_dir is not None40        from src.data.dataprepare.kodak.records_writer import RecordsWriter41        records_writer = RecordsWriter(_opts.data_dir, _opts.target_dir)42        records_writer.process_files()43        return44if __name__ == '__main__':...runMain.py
Source:runMain.py  
...6    if filename.endswith(".xml"):7        print('Opening file {0} ...'.format(filename))8        try:9            # with open(path + filename) as f:10            #    process_files(f.read(), filename)11            process_files(filename)12        except IOError as exc:13            if exc.errno != errno.EISDIR:14                raise15    if filename.endswith(".txt"):16        print('Opening file {0} ...'.format(filename))17        try:18            # with open(path + filename) as f:19            #    process_files(f.read(), filename)20            process_files(filename)21        except IOError as exc:22            if exc.errno != errno.EISDIR:...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
