How to use create_job_by_test method in autotest

Best Python code snippet using autotest_python Github


Full Screen

...269 def get_host_diagnosis_interval(self, host_id, end_time, success):270 return'get_host_diagnosis_interval',271 host_id=host_id, end_time=end_time,272 success=success)273 def create_job_by_test(self, tests, kernel=None, use_container=False,274 kernel_cmdline=None, **dargs):275 """276 Given a test name, fetch the appropriate control file from the server277 and submit it.278 @param kernel: A comma separated list of kernel versions to boot.279 @param kernel_cmdline: The command line used to boot all kernels listed280 in the kernel parameter.281 Returns a list of job objects282 """283 assert ('hosts' in dargs or284 'atomic_group_name' in dargs and 'synch_count' in dargs)285 if kernel:286 kernel_list = re.split('[\s,]+', kernel.strip())287 kernel_info = []288 for version in kernel_list:289 kernel_dict = {'version': version}290 if kernel_cmdline is not None:291 kernel_dict['cmdline'] = kernel_cmdline292 kernel_info.append(kernel_dict)293 else:294 kernel_info = None295 control_file = self.generate_control_file(296 tests=tests, kernel=kernel_info, use_container=use_container)297 if control_file.is_server:298 dargs['control_type'] = control_data.CONTROL_TYPE_NAMES.SERVER299 else:300 dargs['control_type'] = control_data.CONTROL_TYPE_NAMES.CLIENT301 dargs['dependencies'] = dargs.get('dependencies', []) + \302 control_file.dependencies303 dargs['control_file'] = control_file.control_file304 if not dargs.get('synch_count', None):305 dargs['synch_count'] = control_file.synch_count306 if 'hosts' in dargs and len(dargs['hosts']) < dargs['synch_count']:307 # will not be able to satisfy this request308 return None309 return self.create_job(**dargs)310 def create_job(self, control_file, name=' ', priority='Medium',311 control_type=control_data.CONTROL_TYPE_NAMES.CLIENT, **dargs):312 id ='create_job', name=name, priority=priority,313 control_file=control_file, control_type=control_type, **dargs)314 return self.get_jobs(id=id)[0]315 def run_test_suites(self, pairings, kernel, 
kernel_label=None,316 priority='Medium', wait=True, poll_interval=10,317 email_from=None, email_to=None, timeout_mins=10080,318 max_runtime_mins=10080, kernel_cmdline=None):319 """320 Run a list of test suites on a particular kernel.321 Poll for them to complete, and return whether they worked or not.322 @param pairings: List of MachineTestPairing objects to invoke.323 @param kernel: Name of the kernel to run.324 @param kernel_label: Label (string) of the kernel to run such as325 '<kernel-version> : <config> : <date>'326 If any pairing object has its job_label attribute set it327 will override this value for that particular job.328 @param kernel_cmdline: The command line to boot the kernel(s) with.329 @param wait: boolean - Wait for the results to come back?330 @param poll_interval: Interval between polling for job results (in mins)331 @param email_from: Send notification email upon completion from here.332 @param email_from: Send notification email upon completion to here.333 """334 jobs = []335 for pairing in pairings:336 try:337 new_job = self.invoke_test(pairing, kernel, kernel_label,338 priority, timeout_mins=timeout_mins,339 kernel_cmdline=kernel_cmdline,340 max_runtime_mins=max_runtime_mins)341 if not new_job:342 continue343 jobs.append(new_job)344 except Exception, e:345 traceback.print_exc()346 if not wait or not jobs:347 return348 tko = TKO()349 while True:350 time.sleep(60 * poll_interval)351 result = self.poll_all_jobs(tko, jobs, email_from, email_to)352 if result is not None:353 return result354 def result_notify(self, job, email_from, email_to):355 """356 Notify about the result of a job. 
Will always print, if email data357 is provided, will send email for it as well.358 job: job object to notify about359 email_from: send notification email upon completion from here360 email_from: send notification email upon completion to here361 """362 if job.result == True:363 subject = 'Testing PASSED: '364 else:365 subject = 'Testing FAILED: '366 subject += '%s : %s\n' % (, text = []368 for platform in job.results_platform_map:369 for status in job.results_platform_map[platform]:370 if status == 'Total':371 continue372 for host in job.results_platform_map[platform][status]:373 text.append('%20s %10s %10s' % (platform, status, host))374 if status == 'Failed':375 for test_status in job.test_status[host].fail:376 text.append('(%s, %s) : %s' % \377 (host, test_status.test_name,378 test_status.reason))379 text.append('')380 base_url = 'http://' + self.server381 params = ('columns=test',382 'rows=machine_group',383 "condition=tag~'%s-%%25'" %,384 'title=Report')385 query_string = '&'.join(params)386 url = '%s/tko/compose_query.cgi?%s' % (base_url, query_string)387 text.append(url + '\n')388 url = '%s/afe/#tab_id=view_job&object_id=%s' % (base_url, text.append(url + '\n')390 body = '\n'.join(text)391 print '---------------------------------------------------'392 print 'Subject: ', subject393 print body394 print '---------------------------------------------------'395 if email_from and email_to:396 print 'Sending email ...'397 utils.send_email(email_from, email_to, subject, body)398 print399 def print_job_result(self, job):400 """401 Print the result of a single job.402 job: a job object403 """404 if job.result is None:405 print 'PENDING',406 elif job.result == True:407 print 'PASSED',408 elif job.result == False:409 print 'FAILED',410 elif job.result == "Abort":411 print 'ABORT',412 print ' %s : %s' % (, def poll_all_jobs(self, tko, jobs, email_from=None, email_to=None):414 """415 Poll all jobs in a list.416 jobs: list of job objects to poll417 email_from: send 
notification email upon completion from here418 email_from: send notification email upon completion to here419 Returns:420 a) All complete successfully (return True)421 b) One or more has failed (return False)422 c) Cannot tell yet (return None)423 """424 results = []425 for job in jobs:426 if getattr(job, 'result', None) is None:427 job.result = self.poll_job_results(tko, job)428 if job.result is not None:429 self.result_notify(job, email_from, email_to)430 results.append(job.result)431 self.print_job_result(job)432 if None in results:433 return None434 elif False in results or "Abort" in results:435 return False436 else:437 return True438 def _included_platform(self, host, platforms):439 """440 See if host's platforms matches any of the patterns in the included441 platforms list.442 """443 if not platforms:444 return True # No filtering of platforms445 for platform in platforms:446 if, host.platform):447 return True448 return False449 def invoke_test(self, pairing, kernel, kernel_label, priority='Medium',450 kernel_cmdline=None, **dargs):451 """452 Given a pairing of a control file to a machine label, find all machines453 with that label, and submit that control file to them.454 @param kernel_label: Label (string) of the kernel to run such as455 '<kernel-version> : <config> : <date>'456 If any pairing object has its job_label attribute set it457 will override this value for that particular job.458 @returns A list of job objects.459 """460 # The pairing can override the job label.461 if pairing.job_label:462 kernel_label = pairing.job_label463 job_name = '%s : %s' % (pairing.machine_label, kernel_label)464 hosts = self.get_hosts(multiple_labels=[pairing.machine_label])465 platforms = pairing.platforms466 hosts = [h for h in hosts if self._included_platform(h, platforms)]467 dead_statuses = self.host_statuses(live=False)468 host_list = [h.hostname for h in hosts if h.status not in dead_statuses]469 print 'HOSTS: %s' % host_list470 if pairing.atomic_group_sched:471 
dargs['synch_count'] = pairing.synch_count472 dargs['atomic_group_name'] = pairing.machine_label473 else:474 dargs['hosts'] = host_list475 new_job = self.create_job_by_test(name=job_name,476 dependencies=[pairing.machine_label],477 tests=[pairing.control_file],478 priority=priority,479 kernel=kernel,480 kernel_cmdline=kernel_cmdline,481 use_container=pairing.container,482 **dargs)483 if new_job:484 if pairing.testname:485 new_job.testname = pairing.testname486 print 'Invoked test %s : %s' % (, job_name)487 return new_job488 def _job_test_results(self, tko, job, debug, tests=[]):489 """...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:


You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?