How to use fail_path method in avocado

Best Python code snippet using avocado_python

merge.py

Source:merge.py Github

copy

Full Screen

1import csv2import sys3import os4import matplotlib.pyplot as plt5import numpy as np6def parseFailCsv(filename):7 fail_path = []8 fail_fund = []9 indices = []10 maximum = 011 with open(filename) as csv_file:12 csv_reader = csv.reader(csv_file, delimiter=',')13 line_count = 014 for row in csv_reader:15 if line_count == 0:16 indices = [i for i, x in enumerate(row) if x == "average"]17 line_count += 118 else:19 fail_path.append(row[indices[0]])20 fail_fund.append(row[indices[1]])21 line_count += 122 if (maximum < float(row[indices[0]])):23 maximum = float(row[indices[0]])24 if (maximum < float(row[indices[1]])):25 maximum = float(row[indices[1]])26 return (fail_path, fail_fund, maximum)27def parseImbalanceCsv(filename):28 time = []29 imbalance = []30 indices = []31 with open(filename) as csv_file:32 csv_reader = csv.reader(csv_file, delimiter=',')33 line_count = 034 for row in csv_reader:35 if line_count == 0:36 id_time = [i for i, x in enumerate(row) if x == "time"]37 indices = [i for i, x in enumerate(row) if x == "average"]38 line_count += 139 else:40 time.append(row[id_time[0]])41 imbalance.append(row[indices[0]])42 line_count += 143 return (time, imbalance)44def parseDir(dir):45 fail_path = []46 fail_fund = []47 imbalance = []48 imb_tag = []49 fail_tag = []50 time = []51 for filename in os.listdir(dir):52 if filename.endswith("_imbalance.csv"):53 (tm, imb) = parseImbalanceCsv(os.path.join(dir, filename))54 imbalance.append(imb)55 imb_tag.append(filename.split("_imbalance.csv")[0])56 time = tm57 else:58 continue59 y_max = 060 for filename in os.listdir(dir):61 if filename.endswith("_fail.csv"):62 (fail_p, fail_f, fail_max) = parseFailCsv(os.path.join(dir, filename))63 if (fail_max > y_max):64 y_max = fail_max65 fail_path.append(fail_p)66 fail_fund.append(fail_f)67 fail_tag.append(filename.split("_fail.csv")[0])68 else:69 continue70 return (time, fail_tag, fail_path, fail_fund, imb_tag, imbalance, y_max)71def plot(dir, time, fail_tag, fail_path, fail_fund, imb_tag, imbalance,72 fail_max):73 time = [float(i) for i in time]74 for j, ff in enumerate(fail_fund):75 ff = [float(i) for i in fail_fund[j]]76 plt.plot(time, ff, label=fail_tag[j])77 plt.axis([0, int(time[len(time) - 1] + 1), 0, fail_max + 10])78 plt.xlabel('time')79 plt.ylabel('fail transmit')80 plt.legend()81 figure = plt.gcf()82 figure.set_size_inches(10.5, 5.5)83 plt.savefig(os.path.join(dir, 'fail.png'))84 plt.clf()85 for j, tag in enumerate(fail_tag):86 if 'opt' in tag:87 ff = []88 cp_tag = 'default_' + tag.split('_')[1]89 cp_idx = fail_tag.index(cp_tag)90 for k, opt_f in enumerate(fail_fund[j]):91 ff.append(float(fail_fund[cp_idx][k]) - float(opt_f))92 plt.plot(time, ff, label=tag.split('_')[1])93 plt.xlabel('time')94 plt.ylabel('fail transmit')95 plt.legend()96 figure = plt.gcf()97 figure.set_size_inches(10.5, 5.5)98 plt.savefig(os.path.join(dir, 'fail_compare.png'))99 plt.clf()100 for j, imb in enumerate(imbalance):101 imb = [float(i) for i in imbalance[j]]102 plt.plot(time, imb, label=imb_tag[j])103 plt.axis([0, int(time[len(time) - 1] + 1), 0, 1])104 plt.xlabel('time')105 plt.ylabel('channel imbalance')106 plt.legend()107 figure = plt.gcf()108 figure.set_size_inches(10.5, 5.5)109 plt.savefig(os.path.join(dir, 'imbalance.png'))110def outCsv(dir):111 (time, fail_tag, fail_path, fail_fund, imb_tag, imbalance, fail_max) = parseDir(dir)112 with open(os.path.join(dir, 'merge.csv'), 'w') as csv_file:113 writer = csv.writer(csv_file)114 tag = ['time'] + imb_tag + ['']115 tag = tag + ['time'] + fail_tag + ['']116 tag = tag + ['time'] + fail_tag117 writer.writerow(tag)118 for i in range(len(fail_path[0])):119 row = []120 row.append(time[i])121 for d in imbalance:122 row.append(d[i])123 row.append('')124 row.append(time[i])125 for d in fail_path:126 row.append(d[i])127 row.append('')128 row.append(time[i])129 for d in fail_fund:130 row.append(d[i])131 writer.writerow(row)132 plot(dir, time, fail_tag, fail_path, fail_fund, imb_tag, imbalance,133 fail_max)134if __name__ == '__main__':135 if (len(sys.argv)) < 2:136 print('No directory')137 exit()...

Full Screen

Full Screen

dataqueue.py

Source:dataqueue.py Github

copy

Full Screen

1"""Data stores."""2import json3import os4import shutil5import errno6import settings7DATA_STORE = os.path.join(settings.DATASTORE_PATH, 'datastores')8def make_sure_path_exists(path):9 """Create dir if it doesn't exist."""10 try:11 os.makedirs(path)12 except OSError as exception:13 if exception.errno != errno.EEXIST:14 raise15class DataQueue(object):16 """Handle data streams."""17 def __init__(self, name):18 """Constructor."""19 self.name = name20 self.PENDING_PATH = os.path.join(DATA_STORE, name, 'pending')21 self.SUCCESS_PATH = os.path.join(DATA_STORE, name, 'success')22 self.FAIL_PATH = os.path.join(DATA_STORE, name, 'fail')23 self.create_stores(name)24 def destroy_stores(self):25 """Destroy all the stores."""26 shutil.rmtree(os.path.join(DATA_STORE, self.name))27 def create_stores(self, name):28 """Create the stores."""29 make_sure_path_exists(self.PENDING_PATH)30 make_sure_path_exists(self.SUCCESS_PATH)31 make_sure_path_exists(self.FAIL_PATH)32 def add(self, data, id):33 """Write a data piece to the input stream."""34 filename = os.path.join(self.PENDING_PATH, '%s.json' % id)35 output_file = open(filename, 'wt')36 output_file.write(json.dumps(data))37 output_file.close()38 def success(self, id):39 """Move a data piece into the success stream."""40 filename_source = os.path.join(self.PENDING_PATH, '%s.json' % id)41 filename_destination = os.path.join(self.SUCCESS_PATH, '%s.json' % id)42 shutil.move(filename_source, filename_destination)43 def fail(self, id, error=None):44 """Move a data piece into the success stream."""45 filename_source = os.path.join(self.PENDING_PATH, '%s.json' % id)46 filename_destination = os.path.join(self.FAIL_PATH, '%s.json' % id)47 shutil.move(filename_source, filename_destination)48 if error:49 self.write_failure_notice(id, error)50 def write_failure_notice(self, id, error):51 """Write the reason why."""52 filename = os.path.join(self.FAIL_PATH, '%s.log' % id)53 output_file = open(filename, 'wt')54 output_file.write(error)55 output_file.close()56 def read(self, id):57 """Read the file and return json object."""58 filename = os.path.join(self.PENDING_PATH, '%s.json' % id)59 file_handle = open(filename, 'rt')60 data = json.load(file_handle)61 file_handle.close()62 return data63 def find(self, id):64 """Get the item no matter which stream it's gone in."""65 seek_paths = [66 self.PENDING_PATH,67 self.SUCCESS_PATH,68 self.FAIL_PATH69 ]70 for path in seek_paths:71 filename = os.path.join(self.PENDING_PATH, '%s.json' % id)72 if os.path.exists(filename):73 file_handle = open(filename, 'rt')74 data = json.load(file_handle)75 file_handle.close()76 return data77 return None78 def next(self):79 """Get the next file from pending."""80 files = os.listdir(self.PENDING_PATH)81 files.sort()82 if len(files) > 0:83 next_id = files[0].split('.json')[0]84 return next_id, self.read(next_id)85 return None, None86 def status(self):87 """Get status of queues."""88 return {89 'name': self.name,90 'pending': len(os.listdir(self.PENDING_PATH)),91 'success': len(os.listdir(self.SUCCESS_PATH)),92 'fail': len(os.listdir(self.FAIL_PATH)),93 }94 def requeue(self):95 """Put the failed items back in."""96 files = os.listdir(self.FAIL_PATH)97 for file_name in files:98 if not file_name.endswith('.log'):99 filename_destination = os.path.join(self.PENDING_PATH, file_name)100 filename_source = os.path.join(self.FAIL_PATH, file_name)...

Full Screen

Full Screen

练习.py

Source:练习.py Github

copy

Full Screen

1# D:\\agon2import os3def next_free(func):4 '初始化装饰器'5 def wrapper(*args,**kwargs):6 res=func(*args,**kwargs)7 next(res)8 return res9 return wrapper10@next_free11def searcher(target):12 while True: #为什么while要放到yield的上面,因为如果在下面send一次后就找不到yield了,就会报错并结束运行13 fail_path = yield14 for i in os.walk(fail_path):15 for j in i[-1]:16 fail_path=("%s\\%s" %(i[0],j))17 target.send(fail_path)18@next_free19def opener(target):20 while True:21 fail_path=yield22 with open(fail_path) as f:23 target.send((fail_path,f))24@next_free25def greper(word,target):26 while True:27 fail_path,f = yield28 for line in f:29 if word in line:30 target.send((fail_path))31@next_free32def printer():33 while True:34 fail_path=yield35 print(fail_path)36g=searcher(opener(greper("python",printer())))...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run avocado automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful