How to use the get_processed_tests method in autotest

Best Python code snippets using autotest_python (from GitHub)


Full Screen

...56 self.state_file = os.path.join(os.path.dirname(__file__), 'harness_beaker.state')57 self.recipe_id = recipe_id58 self.labc_url = os.environ.get('BEAKER_LAB_CONTROLLER_URL')59 self.hostname = os.environ.get('HOSTNAME')60 self.tests = self.get_processed_tests()61 self.watchdog_pid = None62 self.offline = False63 self.cmd = None64 # handle legacy rhts scripts called from inside tests65 os.environ['PATH'] = "%s:%s" % ('/var/cache/autotest', os.environ['PATH'])66 if harness_args:67'harness_args: %s' % harness_args)68 os.environ['AUTOTEST_HARNESS_ARGS'] = harness_args69 self.args = self.parse_args(harness_args, is_bootstrap)70 logging.debug('harness_beaker: state_file: <%s>', self.state_file)71 logging.debug('harness_beaker: hostname: <%s>', self.hostname)72 logging.debug('harness_beaker: labc_url: <%s>', self.labc_url)73 if not self.hostname:74 raise error.HarnessError('Need valid hostname')75 # hack for flexible debug environment76 labc = not self.offline and self.labc_url or None77 self.bkr_proxy = BkrProxy(self.recipe_id, labc)78 self.setupInitSymlink()79 def parse_args(self, args, is_bootstrap):80 if not args:81 return82 for a in args.split(','):83 if a == 'offline':84 # use cached recipe and stay offline whole time85 self.offline = True86 elif a[:5] == 'cache':87 if len(a) > 5 and a[5] == '=':88 # cache a different recipe instead89 self.recipe_id = a[6:]90 # remotely retrieve recipe, but stay offline during run91 if not is_bootstrap:92 self.offline = True93 elif a[:8] == 'quickcmd':94 if len(a) < 8 or a[8] != '=':95 raise error.HarnessError("Bad use of 'quickcmd'")96 self.cmd = a[9:]97 else:98 raise error.HarnessError("Unknown beaker harness arg: %s" % a)99 def parse_quickcmd(self, args):100 # hack allow tests to quickly submit feedback through harness101 if not args:102 return103 if 'BEAKER_TASK_ID' not in os.environ:104 raise error.HarnessError("No BEAKER_TASK_ID set")105 task_id = os.environ['BEAKER_TASK_ID']106 # Commands are from tests and should be 
reported as results107 cmd, q_args = args.split(':')108 if cmd == 'submit_log':109 try:110 # rhts_submit_log has as args: -S -T -l111 # we just care about -l112 f = None113 arg_list = q_args.split(' ')114 while arg_list:115 arg = arg_list.pop(0)116 if arg == '-l':117 f = arg_list.pop(0)118 break119 if not f:120 raise HarnessException("Argument -l not found in q_args "121 "'%s'" % q_args)122 self.bkr_proxy.task_upload_file(task_id, f)123 except Exception:124 logging.critical('ERROR: Failed to process quick cmd %s' % cmd)125 elif cmd == 'submit_result':126 def init_args(testname='Need/a/testname/here', status="None", logfile=None, score="0"):127 return testname, status, logfile, score128 try:129 # report_result has TESTNAME STATUS LOGFILE SCORE130 arg_list = q_args.split(' ')131 testname, status, logfile, score = init_args(*arg_list)132 resultid = self.bkr_proxy.task_result(task_id, status,133 testname, score, '')134 if (logfile and os.path.isfile(logfile) and135 os.path.getsize(logfile) != 0):136 self.bkr_proxy.result_upload_file(task_id, resultid, logfile)137 # save the dmesg file138 dfile = '/tmp/beaker.dmesg'139 utils.system('dmesg -c > %s' % dfile)140 if os.path.getsize(dfile) != 0:141 self.bkr_proxy.result_upload_file(task_id, resultid, dfile)142 # os.remove(dfile)143 except Exception:144 logging.critical('ERROR: Failed to process quick cmd %s' % cmd)145 elif cmd == 'reboot':146 # we are in a stub job. Can't use self.job.reboot() :-(147 utils.system("sync; sync; reboot")148 self.run_pause()149 raise error.JobContinue("more to come")150 else:151 raise error.HarnessError("Bad sub-quickcmd: %s" % cmd)152 def bootstrap(self, fetchdir):153 '''How to kickstart autotest when you have no control file?154 You download the beaker XML, convert it to a control file155 and pass it back to autotest. Much like bootstrapping.. :-)156 '''157 # hack to sneakily pass results back to beaker without running158 # autotest. 
Need to avoid calling get_recipe below159 if self.cmd:160 self.parse_quickcmd(self.cmd)161 return None162 recipe = self.init_recipe_from_beaker()163 # remove stale file164 if os.path.isfile(self.state_file):165 os.remove(self.state_file)166 self.tests = {}167 # sanity check168 if self.recipe_id != raise error.HarnessError('Recipe mismatch: machine %s.. != XML %s..' %170 (self.recipe_id, # create unique name172 control_file_name = recipe.job_id + '_' + + '.control'173 control_file_path = fetchdir + '/' + control_file_name174 logging.debug('setting up control file - %s' % control_file_path)175 control_file = open(control_file_path, 'w')176 try:177 # convert recipe xml into control file178 for task in recipe.tasks:179 self.convert_task_to_control(fetchdir, control_file, task)180 # getting the task id later, will be hard, store it in file/memory181 self.write_processed_tests(self.get_test_name(task), control_file.close()183 except HarnessException:184 # hook to bail out on reservesys systems and not run autotest185 return None186 except Exception, ex:187 os.remove(control_file_path)188 raise error.HarnessError('beaker_harness: convert failed with -> %s' % ex)189 # autotest should find this under FETCHDIRTEST because it is unique190 return control_file_path191 def init_recipe_from_beaker(self):192 logging.debug('Contacting beaker to get task details')193 bxp = BeakerXMLParser()194 recipe_xml = self.get_recipe_from_LC()195 recipes_dict = bxp.parse_xml(recipe_xml)196 return self.find_recipe(recipes_dict)197 def init_task_params(self, task):198 logging.debug('PrepareTaskParams')199 if task is None:200 raise error.HarnessError('No valid task')201 for (name, value) in task.params.items():202 logging.debug('adding to os.environ: <%s=%s>', name, value)203 os.environ[name] = value204 def get_recipe_from_LC(self):205 logging.debug('trying to get recipe from LC:')206 try:207 recipe = self.bkr_proxy.get_recipe()208 except Exception, exc:209 raise error.HarnessError('Failed to 
retrieve xml: %s' % exc)210 return recipe211 def find_recipe(self, recipes_dict):212 if self.hostname in recipes_dict:213 return recipes_dict[self.hostname]214 for h in recipes_dict:215 if self.recipe_id == recipes_dict[h].id:216 return recipes_dict[h]217 raise error.HarnessError('No valid recipe for host %s' % self.hostname)218 # the block below was taken from standalone harness219 def setupInitSymlink(self):220 logging.debug('Symlinking init scripts')221 autodir = os.environ.get('AUTODIR')222 rc = os.path.join(autodir, 'tools/autotest')223 if os.path.isfile(rc) and os.path.islink(rc):224 # nothing to do225 return226 # see if system supports event.d versus inittab227 if os.path.exists('/etc/event.d'):228 # NB: assuming current runlevel is default229 initdefault = utils.system_output('/sbin/runlevel').split()[1]230 elif os.path.exists('/etc/inittab'):231 initdefault = utils.system_output('grep :initdefault: /etc/inittab')232 initdefault = initdefault.split(':')[1]233 else:234 initdefault = '2'235 try:236 utils.system('ln -sf %s /etc/init.d/autotest' % rc)237 utils.system('ln -sf %s /etc/rc%s.d/S99autotest' % (rc, initdefault))238 logging.debug('Labeling init scripts with unconfined_exec_t')239 utils.system('chcon -h system_u:object_r:unconfined_exec_t:s0 /etc/init.d/autotest')240 utils.system('chcon -h system_u:object_r:unconfined_exec_t:s0 /etc/rc%s.d/S99autotest' % initdefault)241 autotest_init = os.path.join(autodir, 'tools/autotest')242 ret = os.system('chcon system_u:object_r:unconfined_exec_t:s0 %s' % autotest_init)243 logging.debug('chcon returned <%s>', ret)244 except Exception:245 logging.warning('Linking init scripts failed')246 def get_test_name(self, task):247 name = re.sub('-', '_', task.rpmName)248 return re.sub('\.', '_', name)249 def convert_task_to_control(self, fetchdir, control, task):250 """Tasks are really just:251 # yum install $TEST252 # cd /mnt/tests/$TEST253 # make run254 Convert that into a test module with a control file255 """256 timeout 
= ''257 if task.timeout:258 timeout = ", timeout=%s" % task.timeout259 # python doesn't like '-' in its class names260 rpm_name = self.get_test_name(task)261 rpm_dir = fetchdir + '/' + rpm_name262 rpm_file = rpm_dir + '/' + rpm_name + '.py'263 if task.status == 'Completed' and not self.offline:264 logging.debug("SKIP Completed test %s" % rpm_name)265 return266 if task.status == 'Running' and not self.offline:267 if'reservesys', task.rpmName):268 logging.debug("Found reservesys, skipping execution")269 raise HarnessException('executing under a reservesys')270 else:271 logging.warning("Found Running test %s that isn't reservesys" % task.rpmName)272 # append test name to control file273 logging.debug('adding test %s to control file' % rpm_name)274 # Trick to avoid downloading XML all the time275 # statically update each TASK_ID276 control.write("os.environ['BEAKER_TASK_ID']='%s'\n" % control.write("job.run_test('%s'%s)\n" % (rpm_name, timeout))278 # TODO check for git commands in task.params279 # create the test itself280 logging.debug('setting up test %s' % (rpm_file))281 if not os.path.exists(rpm_dir):282 os.mkdir(rpm_dir)283 test = open(rpm_file, 'w')284 test.write("import os\n")285 test.write("from autotest.client import test, utils\n\n")286 test.write("class %s(test.test):\n" % rpm_name)287 test.write(" version=1\n\n")288 test.write(" def initialize(self):\n")289 test.write(" utils.system('yum install -y %s')\n" % task.rpmName)290 for param in task.params:291 test.write(" os.environ['%s']='%s'\n" % (param, task.params[param]))292 test.write(" def run_once(self):\n")293 test.write(" os.chdir('%s')\n" % task.rpmPath)294 test.write(" raw_output = utils.system_output('make run', retain_output=True)\n")295 test.write(" self.results = raw_output\n")296 test.close()297 def run_start(self):298 """A run within this job is starting"""299 logging.debug('run_start')300 try:301 self.start_watchdog(BEAKER_CONSOLE_HEARTBEAT)302 except Exception:303 logging.critical('ERROR: 
Failed to start watchdog')304 def run_pause(self):305 """A run within this job is completing (expect continue)"""306 logging.debug('run_pause')307 def run_reboot(self):308 """A run within this job is performing a reboot309 (expect continue following reboot)310 """311 logging.debug('run_reboot')312 def run_abort(self):313 """A run within this job is aborting. It all went wrong"""314 logging.debug('run_abort')315 self.bkr_proxy.recipe_abort()316 self.tear_down()317 def run_complete(self):318 """A run within this job is completing (all done)"""319 logging.debug('run_complete')320 self.tear_down()321 def run_test_complete(self):322 """A test run by this job is complete. Note that if multiple323 tests are run in parallel, this will only be called when all324 of the parallel runs complete."""325 logging.debug('run_test_complete')326 def test_status(self, status, tag):327 """A test within this job is completing"""328 logging.debug('test_status ' + status + ' / ' + tag)329 def test_status_detail(self, code, subdir, operation, status, tag,330 optional_fields):331 """A test within this job is completing (detail)"""332 logging.debug('test_status_detail %s / %s / %s / %s / %s / %s',333 code, subdir, operation, status, tag, str(optional_fields))334 if not subdir:335 # recipes - covered by run_start/complete/abort336 return337 """The mapping between beaker tasks and non-beaker tasks is not easy to338 separate. Therefore we use the START and END markers along with the339 environment variable BEAKER_TASK_ID to help us.340 We keep an on-disk-file that stores the tests we have seen (or will run341 [add by the conversion function above]). If the test is expected, it342 will have a task id associated with it and we can communicate with beaker343 about it. 
Otherwise if no 'id' is found, assume this is a sub-task that344 beaker doesn't care about and keep all the results contained to the345 beaker results directory.346 """347 if code.startswith('START'):348 if subdir in self.tests and self.tests[subdir] != '0':349 # predefined beaker task350 self.bkr_proxy.task_start(self.tests[subdir])351 else:352 # some random sub-task, save for cleanup purposes353 self.write_processed_tests(subdir)354 return355 elif code.startswith('END'):356 if subdir in self.tests and self.tests[subdir] != '0':357 # predefined beaker task358 self.upload_task_files(self.tests[subdir], subdir)359 self.bkr_proxy.task_stop(self.tests[subdir])360 return361 else:362 if subdir in self.tests and self.tests[subdir] != '0':363 # predefine beaker tasks, will upload on END364 task_id = self.tests[subdir]365 task_upload = False366 else:367 # some random sub-task, save upload as task result368 # because there is no beaker task to add them too369 # task id was not saved in dictionary, get it from env370 if 'BEAKER_TASK_ID' not in os.environ:371 raise error.HarnessError("No BEAKER_TASK_ID set")372 task_id = os.environ['BEAKER_TASK_ID']373 task_upload = True374 bkr_status = get_beaker_code(code)375 try:376 resultid = self.bkr_proxy.task_result(task_id, bkr_status,377 subdir, 1, '')378 if task_upload:379 self.upload_result_files(task_id, resultid, subdir)380 except Exception:381 logging.critical('ERROR: Failed to process test results')382 def tear_down(self):383 '''called from complete and abort. 
clean up and shutdown'''384 self.kill_watchdog()385 if self.recipe_id != '0':386 self.upload_recipe_files()387 self.bkr_proxy.recipe_stop()388 os.remove(self.state_file)389 def start_watchdog(self, heartbeat):390 logging.debug('harness: Starting watchdog process, heartbeat: %d' % heartbeat)391 try:392 pid = os.fork()393 if pid == 0:394 self.watchdog_loop(heartbeat)395 else:396 self.watchdog_pid = pid397 logging.debug('harness: Watchdog process started, pid: %d', self.watchdog_pid)398 except OSError, e:399 logging.error('harness: fork in start_watchdog failed: %d (%s)\n' % (e.errno, e.strerror))400 def kill_watchdog(self):401 logging.debug('harness: Killing watchdog, pid: %d', self.watchdog_pid)402 utils.nuke_pid(self.watchdog_pid)403 self.watchdog_pid = None404 def watchdog_loop(self, heartbeat):405 while True:406 time.sleep(heartbeat)407'[-- MARK -- %s]' % time.asctime(time.localtime(time.time())))408 sys.exit()409 def get_processed_tests(self):410 tests = {}411 if not os.path.isfile(self.state_file):412 return tests413 f = open(self.state_file, 'r')414 lines = f.readlines()415 f.close()416 for line in lines:417 subdir, t_id = line.strip().split()418 # duplicates result from multiple writers419 # once during the conversion and then again420 # during an update of a test run421 # former has task ids, latter will not422 if subdir not in tests:423 tests[subdir] = t_id424 return tests425 def write_processed_tests(self, subdir, t_id='0'):426 f = open(self.state_file, 'a')427 f.write(subdir + ' ' + t_id + '\n')428 f.close()429 def upload_recipe_files(self):430 path = self.job.resultdir431 # refresh latest executed tests432 tests = self.get_processed_tests()433 logging.debug("Recipe filtering following tests: %s" % tests)434 for root, dirnames, files in os.walk(path):435 '''do not upload previously uploaded results files'''436 for d in dirnames:437 if d in tests:438 dirnames.remove(d)439 for name in files:440 # strip full path441 remotepath = re.sub(path, "", root)442 # 
The localfile has the full path443 localfile = os.path.join(root, name)444 if os.path.getsize(localfile) == 0:445 continue # skip empty files446 # Upload the file...

Full Screen

Full Screen


Full Screen

...59 header.setSectionResizeMode(1, QtWidgets.QHeaderView.Stretch)60 header.setSectionResizeMode(2, QtWidgets.QHeaderView.Stretch)61 header.setSectionResizeMode(3, QtWidgets.QHeaderView.Stretch)62 header.setSectionResizeMode(4, QtWidgets.QHeaderView.Stretch)63 self.get_processed_tests([None, None, None, self.username])64 self.scroll.setWidgetResizable(True)65 self.scroll.setFixedHeight(300)66 # Bottom buttons67 self.back_home = QPushButton("Back (Home)")68 self.reset_btn = QPushButton("Reset")69 self.filter_btn = QPushButton("Filter")70 bottom_btns.addWidget(self.back_home)71 bottom_btns.addWidget(self.reset_btn)72 bottom_btns.addWidget(self.filter_btn)73 self.reset_btn.clicked.connect(self.handle_reset_btn)74 self.filter_btn.clicked.connect(self.handle_filter_btn)75 if user_type == "labtech":76 self.back_home.clicked.connect(self.labtech_back)77 elif user_type == "labtester":78 self.back_home.clicked.connect(self.labtester_back)79 def labtech_back(self):80 self.close()81 labtech_home.LabTechHome(self.connection, self.username).exec()82 def labtester_back(self):83 self.close()84 lab_tester_home.LabTesterHome(self.connection, self.username).exec()85 def handle_reset_btn(self):86 # clear result87 self.get_processed_tests([None, None, None, self.username])88 # clear filters89 self.from_date_tested.clear()90 self.to_date_tested.clear()91 self.test_result.setCurrentText('ALL')92 def handle_filter_btn(self):93 params = []94 try:95 # from date96 if self.from_date_tested.text() == '':97 params.append(None)98 else:99 params.append(datetime.strptime(self.from_date_tested.text(), '%Y-%m-%d'))100 # to date101 if self.to_date_tested.text() == '':102 params.append(None)103 else:104 params.append(datetime.strptime(self.to_date_tested.text(), '%Y-%m-%d'))105 except ValueError:106 popup.Error('Date must be valid and in the format YYYY-MM-DD.').exec()107 return108 # test result109 if self.test_result.currentText() == 'ALL':110 params.append(None)111 else:112 
params.append(self.test_result.currentText())113 params.append(self.username)114 self.get_processed_tests(params)115 def get_processed_tests(self, params):116 self.cursor.callproc('tests_processed', params)117 self.cursor.execute('SELECT * FROM tests_processed_result;')118 results = self.cursor.fetchall()119 self.table.setRowCount(len(results))120 self.table.setSortingEnabled(False) # important: this line must be executed before populating table contents121 row_num = 0122 for result in results:123 self.table.setItem(row_num, 0, QTableWidgetItem(result['test_id']))124 # id.setStyleSheet("color: blue; text-decoration: underline;")125 self.table.setItem(row_num, 1, QTableWidgetItem(result['pool_id']))126 test_date = result['test_date'].strftime("%Y-%m-%d")127 self.table.setItem(row_num, 2, QTableWidgetItem(test_date))128 process_date = result['process_date'].strftime("%Y-%m-%d")129 self.table.setItem(row_num, 3, QTableWidgetItem(process_date))...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub. Right from setting up the prerequisites to running your first automation test, to following best practices and diving deeper into advanced test scenarios, the LambdaTest Learning Hub compiles a list of step-by-step guides to help you become proficient with different test automation frameworks such as Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:


You could also refer to the video tutorials on the LambdaTest YouTube channel to get step-by-step demonstrations from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?