How to use the write_processed_tests method in autotest

Best Python code snippet using autotest_python

harness_beaker.py

Source: harness_beaker.py (GitHub)

...
            # convert recipe xml into control file
            for task in recipe.tasks:
                self.convert_task_to_control(fetchdir, control_file, task)
                # getting the task id later, will be hard, store it in file/memory
                self.write_processed_tests(self.get_test_name(task), task.id)
            control_file.close()
        except HarnessException:
            # hook to bail out on reservesys systems and not run autotest
            return None
        except Exception, ex:
            os.remove(control_file_path)
            raise error.HarnessError('beaker_harness: convert failed with -> %s' % ex)
        # autotest should find this under FETCHDIRTEST because it is unique
        return control_file_path

    def init_recipe_from_beaker(self):
        logging.debug('Contacting beaker to get task details')
        bxp = BeakerXMLParser()
        recipe_xml = self.get_recipe_from_LC()
        recipes_dict = bxp.parse_xml(recipe_xml)
        return self.find_recipe(recipes_dict)

    def init_task_params(self, task):
        logging.debug('PrepareTaskParams')
        if task is None:
            raise error.HarnessError('No valid task')
        for (name, value) in task.params.items():
            logging.debug('adding to os.environ: <%s=%s>', name, value)
            os.environ[name] = value

    def get_recipe_from_LC(self):
        logging.debug('trying to get recipe from LC:')
        try:
            recipe = self.bkr_proxy.get_recipe()
        except Exception, exc:
            raise error.HarnessError('Failed to retrieve xml: %s' % exc)
        return recipe

    def find_recipe(self, recipes_dict):
        if self.hostname in recipes_dict:
            return recipes_dict[self.hostname]
        for h in recipes_dict:
            if self.recipe_id == recipes_dict[h].id:
                return recipes_dict[h]
        raise error.HarnessError('No valid recipe for host %s' % self.hostname)

    # the block below was taken from standalone harness
    def setupInitSymlink(self):
        logging.debug('Symlinking init scripts')
        autodir = os.environ.get('AUTODIR')
        rc = os.path.join(autodir, 'tools/autotest')
        if os.path.isfile(rc) and os.path.islink(rc):
            # nothing to do
            return
        # see if system supports event.d versus inittab
        if os.path.exists('/etc/event.d'):
            # NB: assuming current runlevel is default
            initdefault = utils.system_output('/sbin/runlevel').split()[1]
        elif os.path.exists('/etc/inittab'):
            initdefault = utils.system_output('grep :initdefault: /etc/inittab')
            initdefault = initdefault.split(':')[1]
        else:
            initdefault = '2'
        try:
            utils.system('ln -sf %s /etc/init.d/autotest' % rc)
            utils.system('ln -sf %s /etc/rc%s.d/S99autotest' % (rc, initdefault))
            logging.debug('Labeling init scripts with unconfined_exec_t')
            utils.system('chcon -h system_u:object_r:unconfined_exec_t:s0 /etc/init.d/autotest')
            utils.system('chcon -h system_u:object_r:unconfined_exec_t:s0 /etc/rc%s.d/S99autotest' % initdefault)
            autotest_init = os.path.join(autodir, 'tools/autotest')
            ret = os.system('chcon system_u:object_r:unconfined_exec_t:s0 %s' % autotest_init)
            logging.debug('chcon returned <%s>', ret)
        except Exception:
            logging.warning('Linking init scripts failed')

    def get_test_name(self, task):
        name = re.sub('-', '_', task.rpmName)
        return re.sub('\.', '_', name)

    def convert_task_to_control(self, fetchdir, control, task):
        """Tasks are really just:
        # yum install $TEST
        # cd /mnt/tests/$TEST
        # make run
        Convert that into a test module with a control file
        """
        timeout = ''
        if task.timeout:
            timeout = ", timeout=%s" % task.timeout
        # python doesn't like '-' in its class names
        rpm_name = self.get_test_name(task)
        rpm_dir = fetchdir + '/' + rpm_name
        rpm_file = rpm_dir + '/' + rpm_name + '.py'
        if task.status == 'Completed' and not self.offline:
            logging.debug("SKIP Completed test %s" % rpm_name)
            return
        if task.status == 'Running' and not self.offline:
            if re.search('reservesys', task.rpmName):
                logging.debug("Found reservesys, skipping execution")
                raise HarnessException('executing under a reservesys')
            else:
                logging.warning("Found Running test %s that isn't reservesys" % task.rpmName)
        # append test name to control file
        logging.debug('adding test %s to control file' % rpm_name)
        # Trick to avoid downloading XML all the time
        # statically update each TASK_ID
        control.write("os.environ['BEAKER_TASK_ID']='%s'\n" % task.id)
        control.write("job.run_test('%s'%s)\n" % (rpm_name, timeout))
        # TODO check for git commands in task.params
        # create the test itself
        logging.debug('setting up test %s' % (rpm_file))
        if not os.path.exists(rpm_dir):
            os.mkdir(rpm_dir)
        test = open(rpm_file, 'w')
        test.write("import os\n")
        test.write("from autotest.client import test, utils\n\n")
        test.write("class %s(test.test):\n" % rpm_name)
        test.write("    version=1\n\n")
        test.write("    def initialize(self):\n")
        test.write("        utils.system('yum install -y %s')\n" % task.rpmName)
        for param in task.params:
            test.write("        os.environ['%s']='%s'\n" % (param, task.params[param]))
        test.write("    def run_once(self):\n")
        test.write("        os.chdir('%s')\n" % task.rpmPath)
        test.write("        raw_output = utils.system_output('make run', retain_output=True)\n")
        test.write("        self.results = raw_output\n")
        test.close()

    def run_start(self):
        """A run within this job is starting"""
        logging.debug('run_start')
        try:
            self.start_watchdog(BEAKER_CONSOLE_HEARTBEAT)
        except Exception:
            logging.critical('ERROR: Failed to start watchdog')

    def run_pause(self):
        """A run within this job is completing (expect continue)"""
        logging.debug('run_pause')

    def run_reboot(self):
        """A run within this job is performing a reboot
        (expect continue following reboot)
        """
        logging.debug('run_reboot')

    def run_abort(self):
        """A run within this job is aborting. It all went wrong"""
        logging.debug('run_abort')
        self.bkr_proxy.recipe_abort()
        self.tear_down()

    def run_complete(self):
        """A run within this job is completing (all done)"""
        logging.debug('run_complete')
        self.tear_down()

    def run_test_complete(self):
        """A test run by this job is complete. Note that if multiple
        tests are run in parallel, this will only be called when all
        of the parallel runs complete."""
        logging.debug('run_test_complete')

    def test_status(self, status, tag):
        """A test within this job is completing"""
        logging.debug('test_status ' + status + ' / ' + tag)

    def test_status_detail(self, code, subdir, operation, status, tag,
                           optional_fields):
        """A test within this job is completing (detail)"""
        logging.debug('test_status_detail %s / %s / %s / %s / %s / %s',
                      code, subdir, operation, status, tag, str(optional_fields))
        if not subdir:
            # recipes - covered by run_start/complete/abort
            return
        """The mapping between beaker tasks and non-beaker tasks is not easy to
        separate. Therefore we use the START and END markers along with the
        environment variable BEAKER_TASK_ID to help us.
        We keep an on-disk-file that stores the tests we have seen (or will run
        [add by the conversion function above]). If the test is expected, it
        will have a task id associated with it and we can communicate with beaker
        about it. Otherwise if no 'id' is found, assume this is a sub-task that
        beaker doesn't care about and keep all the results contained to the
        beaker results directory.
        """
        if code.startswith('START'):
            if subdir in self.tests and self.tests[subdir] != '0':
                # predefined beaker task
                self.bkr_proxy.task_start(self.tests[subdir])
            else:
                # some random sub-task, save for cleanup purposes
                self.write_processed_tests(subdir)
            return
        elif code.startswith('END'):
            if subdir in self.tests and self.tests[subdir] != '0':
                # predefined beaker task
                self.upload_task_files(self.tests[subdir], subdir)
                self.bkr_proxy.task_stop(self.tests[subdir])
            return
        else:
            if subdir in self.tests and self.tests[subdir] != '0':
                # predefine beaker tasks, will upload on END
                task_id = self.tests[subdir]
                task_upload = False
            else:
                # some random sub-task, save upload as task result
                # because there is no beaker task to add them too
                # task id was not saved in dictionary, get it from env
                if 'BEAKER_TASK_ID' not in os.environ:
                    raise error.HarnessError("No BEAKER_TASK_ID set")
                task_id = os.environ['BEAKER_TASK_ID']
                task_upload = True
            bkr_status = get_beaker_code(code)
            try:
                resultid = self.bkr_proxy.task_result(task_id, bkr_status,
                                                      subdir, 1, '')
                if task_upload:
                    self.upload_result_files(task_id, resultid, subdir)
            except Exception:
                logging.critical('ERROR: Failed to process test results')

    def tear_down(self):
        '''called from complete and abort. clean up and shutdown'''
        self.kill_watchdog()
        if self.recipe_id != '0':
            self.upload_recipe_files()
            self.bkr_proxy.recipe_stop()
        os.remove(self.state_file)

    def start_watchdog(self, heartbeat):
        logging.debug('harness: Starting watchdog process, heartbeat: %d' % heartbeat)
        try:
            pid = os.fork()
            if pid == 0:
                self.watchdog_loop(heartbeat)
            else:
                self.watchdog_pid = pid
                logging.debug('harness: Watchdog process started, pid: %d', self.watchdog_pid)
        except OSError, e:
            logging.error('harness: fork in start_watchdog failed: %d (%s)\n' % (e.errno, e.strerror))

    def kill_watchdog(self):
        logging.debug('harness: Killing watchdog, pid: %d', self.watchdog_pid)
        utils.nuke_pid(self.watchdog_pid)
        self.watchdog_pid = None

    def watchdog_loop(self, heartbeat):
        while True:
            time.sleep(heartbeat)
            logging.info('[-- MARK -- %s]' % time.asctime(time.localtime(time.time())))
        sys.exit()

    def get_processed_tests(self):
        tests = {}
        if not os.path.isfile(self.state_file):
            return tests
        f = open(self.state_file, 'r')
        lines = f.readlines()
        f.close()
        for line in lines:
            subdir, t_id = line.strip().split()
            # duplicates result from multiple writers
            # once during the conversion and then again
            # during an update of a test run
            # former has task ids, latter will not
            if subdir not in tests:
                tests[subdir] = t_id
        return tests

    def write_processed_tests(self, subdir, t_id='0'):
        f = open(self.state_file, 'a')
        f.write(subdir + ' ' + t_id + '\n')
        f.close()

    def upload_recipe_files(self):
        path = self.job.resultdir
        # refresh latest executed tests
        tests = self.get_processed_tests()
        logging.debug("Recipe filtering following tests: %s" % tests)
        for root, dirnames, files in os.walk(path):
            '''do not upload previously uploaded results files'''
            for d in dirnames:
                if d in tests:
                    dirnames.remove(d)
            for name in files:
...
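For reference, write_processed_tests itself is tiny: it appends one "subdir task_id" pair per line to the harness state file, and get_processed_tests later reads that file back into a dictionary in which the first entry recorded for each subdir wins. The sketch below isolates that round trip so it can be run outside of autotest; the BeakerStateDemo class and the /tmp path are illustrative stand-ins and not part of the autotest API.

# Minimal sketch of the state-file round trip used by the beaker harness.
# BeakerStateDemo and the /tmp path are hypothetical; in autotest the
# equivalent logic lives on the harness object in harness_beaker.py.
import os


class BeakerStateDemo(object):

    def __init__(self, state_file):
        self.state_file = state_file

    def write_processed_tests(self, subdir, t_id='0'):
        # append one "subdir task_id" pair per call
        f = open(self.state_file, 'a')
        f.write(subdir + ' ' + t_id + '\n')
        f.close()

    def get_processed_tests(self):
        # read the file back into {subdir: task_id}; the first entry for a
        # subdir wins, so a conversion-time entry with a real beaker id is
        # not overwritten by a later '0' entry for the same subdir
        tests = {}
        if not os.path.isfile(self.state_file):
            return tests
        for line in open(self.state_file):
            subdir, t_id = line.strip().split()
            if subdir not in tests:
                tests[subdir] = t_id
        return tests


if __name__ == '__main__':
    state_path = '/tmp/beaker_state_demo'                     # hypothetical location
    if os.path.exists(state_path):
        os.remove(state_path)                                 # start from a clean file
    demo = BeakerStateDemo(state_path)
    demo.write_processed_tests('kernel_smoke_test', '1234')   # known beaker task id
    demo.write_processed_tests('random_subtest')              # no id, defaults to '0'
    demo.write_processed_tests('kernel_smoke_test')           # duplicate, ignored on read
    print(demo.get_processed_tests())
    # expected: {'kernel_smoke_test': '1234', 'random_subtest': '0'}

In the harness above, write_processed_tests is called from two places: once per task during the recipe-to-control-file conversion, where the real beaker task id is recorded, and again from test_status_detail for unexpected sub-tests, which fall back to the default id of '0'. That is why get_processed_tests keeps only the first entry per subdir, and why upload_recipe_files can use the resulting dictionary to skip result directories that belong to already-handled beaker tasks.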

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, right from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks such as Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run autotest automation tests on the LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now!!

Get 100 automation test minutes FREE!!

