How to use get_process_name method in autotest

Best Python code snippet using autotest_python

parse_comments.py

Source:parse_comments.py Github

copy

Full Screen

...
def read_bzfile(item):
    """Parse one bz2 archive into its own comment database.

    ``item`` is a ``(bzfile, name)`` pair, or a falsy padding value such as
    the ``None`` entries ``iterate_by_batch`` appends to the last batch.

    Returns ``name`` once the archive has been parsed, or ``None`` when
    ``item`` was padding.
    """
    if not item:
        return None
    bzfile, name = item
    # if not db_exists(name):
    print(get_process_name(), 'Read Archive', bzfile, 'name', name)
    # Use a context manager so the archive is closed even if parsing raises.
    with bz2.BZ2File(bzfile, "r") as source_file:
        parse_comment(source_file, name)
    return name


def start_parse():
    """Parse every entry of the module-level ``archives`` with a process pool.

    ``max_threads`` from ``config.json`` is used both as the pool size and as
    the batch size.

    Returns the list of names reported back by ``read_bzfile``.
    """
    config = get_config('config.json')
    workers = config['max_threads']
    total = len(archives)
    processed = 0
    all_files = []
    with Pool(processes=workers) as pool:
        for batch_files in iterate_by_batch(archives, workers, None):
            results = pool.map(read_bzfile, batch_files, 1)
            # The last batch is padded with None; drop the padding so the
            # progress counter can never exceed `total`.
            files = [name for name in results if name is not None]
            processed += len(files)
            print('Processed', files, ',', processed, 'of', total)
            all_files += files
    return all_files


def db_exists(name):
    """Return True when ``<name>.db`` exists in one of the known db folders."""
    locations = (
        ('C:/db/{}.db', 'Exists in C:/db/'),
        ('D:/datasets/db/{}.db', 'Exists in D:/datasets/db'),
    )
    for pattern, message in locations:
        if os.path.isfile(pattern.format(name)):
            print(get_process_name(), 'File ', name, message)
            return True
    print(get_process_name(), 'File', name, 'Doesnt Exists')
    return False


def get_config(filename):
    """Read ``filename`` and return its content decoded as JSON."""
    with open(filename) as f:
        return json.load(f)


def iterate_by_batch(array_list, amount, fillvalue=None):
    """Yield tuples of ``amount`` consecutive items from ``array_list``.

    The same iterator is passed ``amount`` times to ``zip_longest``, so each
    tuple consumes the next ``amount`` items; the final tuple is padded with
    ``fillvalue``.
    """
    args = [iter(array_list)] * amount
    return zip_longest(*args, fillvalue=fillvalue)


def parse_comment(file, db_name):
    """Load the JSON-lines comments of ``file`` into the database ``db_name``.

    Every line is decoded as JSON. Lines that cannot be decoded, or that lack
    a required key, are printed and skipped. A comment is stored only when
    its score is at least 2 and ``acceptable(body)`` holds:

    * if ``find_existing_score(parent_id)`` is truthy, the stored comment is
      replaced only when the new score is higher;
    * otherwise the comment is inserted, with or without parent data.
    """
    total_rows = 0
    parsed_rows = 0
    replaced_rows = 0
    print(get_process_name(), 'Create db', db_name, 'in', db_folder)
    repository = CommentsRepository(SQLiteStorage(db_name, db_folder=db_folder))
    repository.create_table()
    for row in file:
        total_rows += 1
        try:
            row = json.loads(row)
            # Build the 't1_<id>' fallback only when 'name' is absent: as a
            # dict.get() default it was evaluated eagerly and raised for
            # rows that carry 'name' but no 'id'.
            comment_id = row['name'] if 'name' in row else 't1_' + row['id']
            parent_id = row['parent_id']
            body = format_data(row['body'])
            created_at = row['created_utc']
            score = row['score']
            subreddit = row['subreddit']
            parent_data = repository.find_parent_comment(parent_id)
        except Exception as e:
            # Best effort: report the bad row and keep going.
            print(get_process_name(), 'Exception', str(e))
            print(get_process_name(), row)
            continue
        if score >= 2 and acceptable(body):
            existing_comment_score = repository.find_existing_score(parent_id)
            if existing_comment_score:
                if score > existing_comment_score:
                    repository.replace_comment(comment_id, parent_id, parent_data, body, subreddit, created_at, score)
                    replaced_rows += 1
            else:
                if parent_data:
                    repository.insert_has_parent(comment_id, parent_id, parent_data, body, subreddit, created_at, score)
                    parsed_rows += 1
                else:
                    repository.insert_no_parent(comment_id, parent_id, body, subreddit, created_at, score)
        if total_rows % 500000 == 0:
            print(get_process_name(), 'File {}, Total rows read: {}, Paired rows: {}, Time: {}'.format(db_name, total_rows, parsed_rows, str(datetime.now())))
    print(get_process_name(), 'Finish ', db_name, "Total rows read: {}, Paired rows: {}".format(total_rows, parsed_rows))


def format_data(body):
    """Replace line breaks with ``<EOF>`` and double quotes with single quotes."""
    return body.replace("\n", "<EOF>").replace("\r", "<EOF>").replace('"', "'")


def acceptable(data, treshold=50):
    """Return True when a comment body is worth storing.

    Rejected: empty text, more than ``treshold`` space-separated words, more
    than 1000 characters, and the '[deleted]' / '[removed]' markers.
    """
    if len(data.split(' ')) > treshold or len(data) < 1:
        return False
    if len(data) > 1000:
        return False
    if data in ('[deleted]', '[removed]'):
        return False
    return True


def sizeof_fmt(num, suffix='B'):
    """Format ``num`` with binary (1024-based) unit prefixes, e.g. '1.5 KiB'."""
    for unit in ['', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi']:
        if abs(num) < 1024.0:
            return "%3.1f %s%s" % (num, unit, suffix)
        num /= 1024.0
    return "%.1f %s%s" % (num, 'Yi', suffix)


def move_file(file, dest):
    """Move ``file`` into the directory ``dest``, keeping its base name."""
    shutil.move(file, os.path.join(dest, os.path.basename(file)))


def check_disk_space():
    """Print the free space reported for ``C:``.

    NOTE(review): the unit of ``space`` depends on ``FreeSpace``, which is
    not visible here; the move-to-D: step below is disabled, so a low-space
    result currently only lists ``db_folder``.
    """
    space = FreeSpace(r'C:')
    print('Check disk space', space)
    if space < 10:
        files = os.listdir(db_folder)
        # move_file(os.path.join(db_folder, files[0]), 'D:/datasets/db')
        # print('Move file', files[0], 'to D:/datasets/db')


def get_process_name():
    """Return a 'Process <pid>' label for the current process."""
    return "Process {}".format(os.getpid())


if __name__ == "__main__":
    # NOTE(review): parse_comments is not defined in the visible excerpt
    # (start_parse is) - confirm it exists in the elided part of the file.
    parse_comments()
...

Full Screen

Full Screen

Process.py

Source:Process.py Github

copy

Full Screen

...13 def add_key_value(self, key, value):14 self[key] = value1516 def get_current_status(process_id):17 table_name = JobInfo.get_process_name(process_id)18 return str(MongoOperations.get_max_record_from_db(table_name))1920 def get_updated_application_id(table_name):21 curr_date = '2020-05-17'#str(datetime.datetime.now().date())22 return MongoOperations.get_latest_app_id(table_name, curr_date)2324 def insert_initial_process_detailss(process_id, jc_status):25 insert_document = {}26 insert_document["date"] = "2020-05-17"#str(datetime.datetime.now().date())27 insert_document["process_id"] = process_id28 insert_document["process_name"] = JobInfo.get_process_name(process_id)29 insert_document["status"] = jc_status30 for i in StatusInfo.get_be_trends_columns(process_id):31 insert_document[i] = None32 insert_document["output_count"] = None33 insert_document["application_id"] = None34 insert_document["start_time"] = str(datetime.datetime.now().time())35 insert_document["end_time"] = None36 insert_document["number_of_retries"] = None37 insert_document["list_of_alerts"] = None38 insert_document["failed_reasons"] = None39 MongoOperations.insert_record(DEVICE_RECON, insert_document)40 return None4142 def update_be_trends(process_id):43 table_name = JobInfo.get_process_name(process_id)44 appl_id = Processes.get_updated_application_id(table_name)45 # if appl_id is not None:46 # appl_id = "application_id_01" #delete thi after testing.47 be_trends_dict = StatusInfo.get_be_trends(appl_id)48 for be_column_name, be_column_value in be_trends_dict.items():49 MongoOperations.update_be_trends(table_name, appl_id, be_column_name, be_column_value)5051 def update_be_trends_for_after_failed(process_id):52 table_name = JobInfo.get_process_name(process_id)53 appl_id = Processes.get_updated_application_id(table_name)54 # if appl_id is not None:55 # appl_id = "application_id_01" #delete thi after testing.56 be_trends_dict = StatusInfo.get_be_trends(appl_id)57 for be_column_name, be_column_value 
in be_trends_dict.items():58 MongoOperations.update_be_trends_for_after_failed(table_name, appl_id, be_column_name, be_column_value)5960 def update_final_output_count(process_id):61 table_name = JobInfo.get_process_name(process_id)62 process_name = JobInfo.get_process_name(process_id)63 date = '2020-05-17'#str(datetime.datetime.now().date())64 output_count = StatusInfo.get_output_count(process_name, date)65 MongoOperations.update_final_output_count(table_name, output_count, date)6667 def update_application_id(process_id):68 table_name = JobInfo.get_process_name(process_id)69 yarn_process_name = JobInfo.get_yarn_process_name(process_id)70 appl_id = StatusInfo.get_application_id(yarn_process_name)71 MongoOperations.update_application_id(table_name, "2020-05-17", appl_id) #str(datetime.datetime.now().date())7273 def update_end_time(process_id):74 table_name = JobInfo.get_process_name(process_id)75 MongoOperations.update_end_time(table_name, '2020-05-17',#str(datetime.datetime.now().date()),76 str(datetime.datetime.now().time()))7778 def update_status(process_id):79 table_name = JobInfo.get_process_name(process_id)80 jc_name = JobInfo.get_jc_name(process_id)81 curr_status = StatusInfo.get_jc_status("2020-05-17", jc_name) #str(datetime.datetime.now().date()82 MongoOperations.update_status(table_name, curr_status)8384 def process_rerunning(process_id):85 yarn_process_name = JobInfo.get_yarn_process_name(process_id)86 return StatusInfo.check_if_the_process_rerunning(yarn_process_name)8788 def return_count_of_records(process_id):89 table_name = JobInfo.get_process_name(process_id)90 MongoOperations.return_count_of_records(table_name)9192 def check_for_todays_date_if_present(process_id):93 table_name = JobInfo.get_process_name(process_id)94 date = "2020-05-17" #str(datetime.datetime.now().date()95 return MongoOperations.check_for_todays_date_if_present(table_name, date)96 97 def check_for_yesterdays_date_if_present(process_id):98 table_name = 
JobInfo.get_process_name(process_id)99 date = "2020-05-17" #str(datetime.datetime.now().date()100 return MongoOperations.check_for_todays_date_if_present(table_name, date)101 102 def check_for_day_before_sterday_date_if_present(process_id):103 table_name = JobInfo.get_process_name(process_id)104 date = "2020-05-17" #str(datetime.datetime.now().date()105 return MongoOperations.check_for_todays_date_if_present(table_name, date)106 107 ...

Full Screen

Full Screen

platform_CheckCriticalProcesses.py

Source:platform_CheckCriticalProcesses.py Github

copy

Full Screen

...10 Builds a process list (without spawning 'ps'), and validates11 that among these processes all the expected processes are running.12 """13 version = 114 def get_process_name(self, pid):15 """Gathers info about one process, given its PID16 @param pid string representing the process ID17 @return string process name18 """19 with open(os.path.join('/proc', pid, 'status')) as pid_status_file:20 for line in pid_status_file:21 fields = re.split('\s+',line)22 if fields[0] == 'Name:':23 return fields[1]24 def get_process_list(self):25 """Returns the set the process names"""26 process_names = set()27 for pid in os.listdir('/proc'):28 if not pid.isdigit():29 continue30 # There can be a race where after we listdir(), a process31 # exits. In that case get_process_name will throw an IOError32 # becase /proc/NNNN won't exist.33 # In those cases, skip to the next go-round of our loop.34 try:35 process_names.add(self.get_process_name(pid))36 except IOError:37 continue38 return process_names39 def run_once(self, process_list):40 """41 Verify processes in |process_list| are running.42 @param process_list: list of process names to check43 """44 processes = self.get_process_list()45 missing_processes = []46 for p in process_list:47 processes_names = p.split('|')48 if set(processes_names).isdisjoint(processes):49 missing_processes.append(p)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful