Best Python code snippet using autotest_python
parse_comments.py
Source:parse_comments.py  
# ... (earlier part of parse_comments.py is not shown in this listing)
def read_bzfile(item):
    """Parse one bz2-compressed comment archive into its database.

    Args:
        item: A ``(bzfile, name)`` pair (archive path, database name), or a
            falsy value, which is ignored (``iterate_by_batch`` pads short
            batches with ``None``).

    Returns:
        The database name, or ``None`` when ``item`` is falsy.
    """
    if not item:
        return None
    bzfile, name = item
    # NOTE: the listing had a disabled ``db_exists(name)`` guard at this point.
    print(get_process_name(), 'Read Archive', bzfile, 'name', name)
    # The context manager closes the archive even if parsing raises.
    with bz2.BZ2File(bzfile, "r") as source_file:
        parse_comment(source_file, name)
    return name


def start_parse():
    """Parse every entry of the module-level ``archives`` with a process pool.

    Both the pool size and the batch size are taken from ``max_threads`` in
    ``config.json``.

    Returns:
        list: the values returned by ``read_bzfile`` for every archive.
    """
    config = get_config('config.json')
    total = len(archives)
    processed = 0
    all_files = []
    with Pool(processes=config['max_threads']) as pool:
        for batch_files in iterate_by_batch(archives, config['max_threads'], None):
            # The last batch is padded with None; drop the padding so the
            # progress counter cannot exceed ``total``.
            batch = [item for item in batch_files if item is not None]
            files = pool.map(read_bzfile, batch, 1)
            processed += len(files)
            print('Processed', files, ',', processed, 'of', total)
            all_files += files
    return all_files


def db_exists(name):
    """Return True when ``<name>.db`` is a file in one of the two db folders."""
    if os.path.isfile('C:/db/{}.db'.format(name)):
        print(get_process_name(), 'File ', name, 'Exists in C:/db/')
        return True
    if os.path.isfile('D:/datasets/db/{}.db'.format(name)):
        print(get_process_name(), 'File ', name, 'Exists in D:/datasets/db')
        return True
    print(get_process_name(), 'File', name, 'Doesnt Exists')
    return False


def get_config(filename):
    """Load and return the JSON document stored in ``filename``."""
    with open(filename) as f:
        return json.load(f)


def iterate_by_batch(array_list, amount, fillvalue=None):
    """Yield consecutive ``amount``-sized tuples from ``array_list``.

    The same iterator is repeated ``amount`` times, so ``zip_longest`` pulls
    consecutive items into each tuple; the final tuple is padded with
    ``fillvalue``.
    """
    args = [iter(array_list)] * amount
    return zip_longest(*args, fillvalue=fillvalue)


def parse_comment(file, db_name):
    """Read JSON comment rows from ``file`` and store them in ``db_name``.

    Each line of ``file`` is decoded as one JSON object. Rows that cannot be
    decoded, or that lack a required field, are printed and skipped. A row is
    stored only when its score is at least 2 and ``acceptable`` accepts its
    formatted body; if the parent already has a stored reply, that reply is
    replaced only by a higher-scoring one.

    Args:
        file: Iterable of JSON-encoded lines (e.g. an open ``BZ2File``).
        db_name: Name of the database passed to ``SQLiteStorage``.
    """
    total_rows = 0
    parsed_rows = 0
    replaced_rows = 0
    print(get_process_name(), 'Create db', db_name, 'in', db_folder)
    repository = CommentsRepository(SQLiteStorage(db_name, db_folder=db_folder))
    repository.create_table()
    for row in file:
        total_rows += 1
        try:
            row = json.loads(row)
            # Build the fallback id only when 'name' is missing; computing it
            # eagerly raised TypeError for rows that have 'name' but no 'id'.
            comment_id = row['name'] if 'name' in row else 't1_' + row['id']
            parent_id = row['parent_id']
            body = format_data(row['body'])
            created_at = row['created_utc']
            score = row['score']
            subreddit = row['subreddit']
            parent_data = repository.find_parent_comment(parent_id)
        except Exception as e:
            # Deliberately best-effort: one bad row must not abort the archive.
            print(get_process_name(), 'Exception', str(e))
            print(get_process_name(), row)
            continue
        if score >= 2 and acceptable(body):
            existing_comment_score = repository.find_existing_score(parent_id)
            if existing_comment_score:
                if score > existing_comment_score:
                    repository.replace_comment(comment_id, parent_id, parent_data, body, subreddit, created_at, score)
                    replaced_rows += 1
            else:
                if parent_data:
                    repository.insert_has_parent(comment_id, parent_id, parent_data, body, subreddit, created_at, score)
                    parsed_rows += 1
                else:
                    repository.insert_no_parent(comment_id, parent_id, body, subreddit, created_at, score)
        if total_rows % 500000 == 0:
            print(get_process_name(), 'File {}, Total rows read: {}, Paired rows: {}, Time: {}'.format(db_name, total_rows, parsed_rows, str(datetime.now())))
    print(get_process_name(), 'Finish ', db_name, "Total rows read: {}, Paired rows: {}".format(total_rows, parsed_rows))


def format_data(body):
    """Replace each newline / carriage return with ``<EOF>`` and ``"`` with ``'``."""
    return body.replace("\n", "<EOF>").replace("\r", "<EOF>").replace('"', "'")


def acceptable(data, treshold=50):
    """Return True when ``data`` is usable as a comment body.

    Rejected: empty text, more than ``treshold`` space-separated words, more
    than 1000 characters, or the ``[deleted]`` / ``[removed]`` placeholders.
    """
    if len(data.split(' ')) > treshold or len(data) < 1:
        return False
    if len(data) > 1000:
        return False
    if data == '[deleted]' or data == '[removed]':
        return False
    return True


def sizeof_fmt(num, suffix='B'):
    """Format ``num`` with binary prefixes (Ki, Mi, ...), e.g. ``1.5 KiB``."""
    for unit in ['', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi']:
        if abs(num) < 1024.0:
            return "%3.1f %s%s" % (num, unit, suffix)
        num /= 1024.0
    return "%.1f %s%s" % (num, 'Yi', suffix)


def move_file(file, dest):
    """Move ``file`` into the directory ``dest``, keeping its base name."""
    shutil.move(file, os.path.join(dest, os.path.basename(file)))


def check_disk_space():
    """Print the value reported by ``FreeSpace`` for drive ``C:``.

    NOTE(review): moving the first database file to D:/datasets/db when the
    value drops below 10 was disabled in the listing, so the low-space branch
    currently only lists ``db_folder``.
    """
    space = FreeSpace(r'C:')
    print('Check disk space', space)
    if space < 10:
        files = os.listdir(db_folder)


def get_process_name():
    """Return the label ``Process <pid>`` for the current process."""
    return "Process {}".format(os.getpid())


if __name__ == "__main__":
    # NOTE(review): ``parse_comments`` is not defined in the visible part of
    # this listing - confirm it exists above, or call ``start_parse`` instead.
    parse_comments()
# ... (listing truncated here; the next listing on the page is Process.py)
Source:Process.py  
...13    def add_key_value(self, key, value):14        self[key] = value1516    def get_current_status(process_id):17        table_name = JobInfo.get_process_name(process_id)18        return str(MongoOperations.get_max_record_from_db(table_name))1920    def get_updated_application_id(table_name):21        curr_date = '2020-05-17'#str(datetime.datetime.now().date())22        return MongoOperations.get_latest_app_id(table_name, curr_date)2324    def insert_initial_process_detailss(process_id, jc_status):25        insert_document = {}26        insert_document["date"] = "2020-05-17"#str(datetime.datetime.now().date())27        insert_document["process_id"] = process_id28        insert_document["process_name"] = JobInfo.get_process_name(process_id)29        insert_document["status"] = jc_status30        for i in StatusInfo.get_be_trends_columns(process_id):31            insert_document[i] = None32        insert_document["output_count"] = None33        insert_document["application_id"] = None34        insert_document["start_time"] = str(datetime.datetime.now().time())35        insert_document["end_time"] = None36        insert_document["number_of_retries"] = None37        insert_document["list_of_alerts"] = None38        insert_document["failed_reasons"] = None39        MongoOperations.insert_record(DEVICE_RECON, insert_document)40        return None4142    def update_be_trends(process_id):43        table_name = JobInfo.get_process_name(process_id)44        appl_id = Processes.get_updated_application_id(table_name)45        # if appl_id is not None:46        #     appl_id = "application_id_01" #delete thi after testing.47        be_trends_dict = StatusInfo.get_be_trends(appl_id)48        for be_column_name, be_column_value in be_trends_dict.items():49            MongoOperations.update_be_trends(table_name, appl_id, be_column_name, be_column_value)5051    def update_be_trends_for_after_failed(process_id):52        table_name = JobInfo.get_process_name(process_id)53        
appl_id = Processes.get_updated_application_id(table_name)54        # if appl_id is not None:55        #     appl_id = "application_id_01" #delete thi after testing.56        be_trends_dict = StatusInfo.get_be_trends(appl_id)57        for be_column_name, be_column_value in be_trends_dict.items():58            MongoOperations.update_be_trends_for_after_failed(table_name, appl_id, be_column_name, be_column_value)5960    def update_final_output_count(process_id):61        table_name = JobInfo.get_process_name(process_id)62        process_name = JobInfo.get_process_name(process_id)63        date = '2020-05-17'#str(datetime.datetime.now().date())64        output_count = StatusInfo.get_output_count(process_name, date)65        MongoOperations.update_final_output_count(table_name, output_count, date)6667    def update_application_id(process_id):68        table_name = JobInfo.get_process_name(process_id)69        yarn_process_name = JobInfo.get_yarn_process_name(process_id)70        appl_id = StatusInfo.get_application_id(yarn_process_name)71        MongoOperations.update_application_id(table_name, "2020-05-17", appl_id)    #str(datetime.datetime.now().date())7273    def update_end_time(process_id):74        table_name = JobInfo.get_process_name(process_id)75        MongoOperations.update_end_time(table_name, '2020-05-17',#str(datetime.datetime.now().date()),76                                        str(datetime.datetime.now().time()))7778    def update_status(process_id):79        table_name = JobInfo.get_process_name(process_id)80        jc_name = JobInfo.get_jc_name(process_id)81        curr_status = StatusInfo.get_jc_status("2020-05-17", jc_name)   #str(datetime.datetime.now().date()82        MongoOperations.update_status(table_name, curr_status)8384    def process_rerunning(process_id):85        yarn_process_name = JobInfo.get_yarn_process_name(process_id)86        return StatusInfo.check_if_the_process_rerunning(yarn_process_name)8788    def 
return_count_of_records(process_id):89        table_name = JobInfo.get_process_name(process_id)90        MongoOperations.return_count_of_records(table_name)9192    def check_for_todays_date_if_present(process_id):93        table_name = JobInfo.get_process_name(process_id)94        date = "2020-05-17" #str(datetime.datetime.now().date()95        return MongoOperations.check_for_todays_date_if_present(table_name, date)96    97    def check_for_yesterdays_date_if_present(process_id):98        table_name = JobInfo.get_process_name(process_id)99        date = "2020-05-17" #str(datetime.datetime.now().date()100        return MongoOperations.check_for_todays_date_if_present(table_name, date)101    102     def check_for_day_before_sterday_date_if_present(process_id):103        table_name = JobInfo.get_process_name(process_id)104        date = "2020-05-17" #str(datetime.datetime.now().date()105        return MongoOperations.check_for_todays_date_if_present(table_name, date)106    107    
...platform_CheckCriticalProcesses.py
Source:platform_CheckCriticalProcesses.py  
...10    Builds a process list (without spawning 'ps'), and validates11    that among these processes all the expected processes are running.12    """13    version = 114    def get_process_name(self, pid):15        """Gathers info about one process, given its PID16        @param pid string representing the process ID17        @return string process name18        """19        with open(os.path.join('/proc', pid, 'status')) as pid_status_file:20            for line in pid_status_file:21                fields = re.split('\s+',line)22                if fields[0] == 'Name:':23                    return fields[1]24    def get_process_list(self):25        """Returns the set the process names"""26        process_names = set()27        for pid in os.listdir('/proc'):28            if not pid.isdigit():29                continue30            # There can be a race where after we listdir(), a process31            # exits. In that case get_process_name will throw an IOError32            # becase /proc/NNNN won't exist.33            # In those cases, skip to the next go-round of our loop.34            try:35                process_names.add(self.get_process_name(pid))36            except IOError:37                continue38        return process_names39    def run_once(self, process_list):40        """41        Verify processes in |process_list| are running.42        @param process_list: list of process names to check43        """44        processes = self.get_process_list()45        missing_processes = []46        for p in process_list:47            processes_names = p.split('|')48            if set(processes_names).isdisjoint(processes):49                missing_processes.append(p)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. 
LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
