How to use the set_execution_subdir method in autotest

Best Python code snippet using autotest_python Github


Full Screen

...464 blocks = IneligibleHostQueue.fetch(465 'job_id=%d and host_id=%d' % (, host_id))466 for block in blocks:467 block.delete()468 def set_execution_subdir(self, subdir=None):469 if subdir is None:470 assert self.host471 subdir = self.update_field('execution_subdir', subdir)473 def _get_hostname(self):474 if return return 'no host'477 def get_dbg_str(self):478 """Get a debug string to identify this host.479 @return: A string containing the hqe and job id.480 """481 try:482 return 'HQE: %s, for job: %s' % (, self.job_id)483 except AttributeError as e:484 return 'HQE has not been initialized yet: %s' % e485 def __str__(self):486 flags = []487 if flags.append('active')489 if self.complete:490 flags.append('complete')491 if self.deleted:492 flags.append('deleted')493 if self.aborted:494 flags.append('aborted')495 flags_str = ','.join(flags)496 if flags_str:497 flags_str = ' [%s]' % flags_str498 return ("%s and host: %s has status:%s%s" %499 (self.get_dbg_str(), self._get_hostname(), self.status,500 flags_str))501 def set_status(self, status):502"%s -> %s", self, status)503 self.update_field('status', status)504 active = (status in models.HostQueueEntry.ACTIVE_STATUSES)505 complete = (status in models.HostQueueEntry.COMPLETE_STATUSES)506 self.update_field('active', active)507 # The ordering of these operations is important. Once we set the508 # complete bit this job will become indistinguishable from all509 # the other complete jobs, unless we first set shard_id to NULL510 # to signal to the shard_client that we need to upload it. However,511 # we can only set both these after we've updated finished_on etc512 # within _on_complete or the job will get synced in an intermediate513 # state. 
This means that if someone sigkills the scheduler between514 # setting finished_on and complete, we will have inconsistent jobs.515 # This should be fine, because nothing critical checks finished_on,516 # and the scheduler should never be killed mid-tick.517 if complete:518 self._on_complete(status)519 self._email_on_job_complete()520 self.update_field('complete', complete)521 should_email_status = (status.lower() in _notify_email_statuses or522 'all' in _notify_email_statuses)523 if should_email_status:524 self._email_on_status(status)525 logging.debug('HQE Set Status Complete')526 def _on_complete(self, status):527 metric_fields = {'status': status.lower()}528 if metric_fields['board'] = or ''530 if len( == 1:531 metric_fields['pool'] =[0]532 else:533 metric_fields['pool'] = 'MULTIPLE'534 else:535 metric_fields['board'] = 'NO_HOST'536 metric_fields['pool'] = 'NO_HOST'537 self._COMPLETION_COUNT_METRIC.increment(fields=metric_fields)538 if status is not models.HostQueueEntry.Status.ABORTED:539 self.job.stop_if_necessary()540 if self.started_on:541 self.set_finished_on_now()542 self._log_trace()543 if self.job.shard_id is not None:544 # If shard_id is None, the job will be synced back to the master545 self.job.update_field('shard_id', None)546 if not self.execution_subdir:547 return548 # unregister any possible pidfiles associated with this queue entry549 for pidfile_name in drone_manager.ALL_PIDFILE_NAMES:550 pidfile_id = _drone_manager.get_pidfile_id_from(551 self.execution_path(), pidfile_name=pidfile_name)552 _drone_manager.unregister_pidfile(pidfile_id)553 def _log_trace(self):554 """Emits a Cloud Trace span for the HQE's duration."""555 if self.started_on and self.finished_on:556 span = cloud_trace.Span('HQE', spanId='0',557 traceId=hqe_trace_id( # TODO(phobbs) make a .SetStart() and .SetEnd() helper method559 span.startTime = types.Timestamp()560 span.startTime.FromDatetime(self.started_on)561 span.endTime = types.Timestamp()562 
span.endTime.FromDatetime(self.finished_on)563 # TODO(phobbs) any LogSpan calls need to be wrapped in this for564 # safety during tests, so this should be caught within LogSpan.565 try:566 cloud_trace.LogSpan(span)567 except IOError as e:568 if e.errno == errno.ENOENT:569 logging.warning('Error writing to cloud trace results '570 'directory: %s', e)571 def _get_status_email_contents(self, status, summary=None, hostname=None):572 """573 Gather info for the status notification e-mails.574 If needed, we could start using the Django templating engine to create575 the subject and the e-mail body, but that doesn't seem necessary right576 now.577 @param status: Job status text. Mandatory.578 @param summary: Job summary text. Optional.579 @param hostname: A hostname for the job. Optional.580 @return: Tuple (subject, body) for the notification e-mail.581 """582 job_stats = Job( subject = ('Autotest | Job ID: %s "%s" | Status: %s ' %584 (,, status))585 if hostname is not None:586 subject += '| Hostname: %s ' % hostname587 if status not in ["1 Failed", "Failed"]:588 subject += '| Success Rate: %.2f %%' % job_stats['success_rate']589 body = "Job ID: %s\n" % self.job.id590 body += "Job name: %s\n" % self.job.name591 if hostname is not None:592 body += "Host: %s\n" % hostname593 if summary is not None:594 body += "Summary: %s\n" % summary595 body += "Status: %s\n" % status596 body += "Results interface URL: %s\n" % self._view_job_url()597 body += "Execution time (HH:MM:SS): %s\n" % job_stats['execution_time']598 if int(job_stats['total_executed']) > 0:599 body += "User tests executed: %s\n" % job_stats['total_executed']600 body += "User tests passed: %s\n" % job_stats['total_passed']601 body += "User tests failed: %s\n" % job_stats['total_failed']602 body += ("User tests success rate: %.2f %%\n" %603 job_stats['success_rate'])604 if job_stats['failed_rows']:605 body += "Failures:\n"606 body += job_stats['failed_rows']607 return subject, body608 def _email_on_status(self, 
status):609 hostname = self._get_hostname()610 subject, body = self._get_status_email_contents(status, None, hostname)611 email_manager.manager.send_email(self.job.email_list, subject, body)612 def _email_on_job_complete(self):613 if not self.job.is_finished():614 return615 summary = []616 hosts_queue = HostQueueEntry.fetch('job_id = %s' % for queue_entry in hosts_queue:618 summary.append("Host: %s Status: %s" %619 (queue_entry._get_hostname(),620 queue_entry.status))621 summary = "\n".join(summary)622 status_counts = models.Job.objects.get_status_counts(623 [])[]624 status = ', '.join('%d %s' % (count, status) for status, count625 in status_counts.iteritems())626 subject, body = self._get_status_email_contents(status, summary, None)627 email_manager.manager.send_email(self.job.email_list, subject, body)628 def schedule_pre_job_tasks(self):629"%s/%s/%s (job %s, entry %s) scheduled on %s, status=%s",630, self.meta_host, self.atomic_group_id,631,,, self.status)632 self._do_schedule_pre_job_tasks()633 def _do_schedule_pre_job_tasks(self):634 self.job.schedule_pre_job_tasks(queue_entry=self)635 def requeue(self):636 assert self.host637 self.set_status(models.HostQueueEntry.Status.QUEUED)638 self.update_field('started_on', None)639 self.update_field('finished_on', None)640 # verify/cleanup failure sets the execution subdir, so reset it here641 self.set_execution_subdir('')642 if self.meta_host:643 self.set_host(None)644 @property645 def aborted_by(self):646 self._load_abort_info()647 return self._aborted_by648 @property649 def aborted_on(self):650 self._load_abort_info()651 return self._aborted_on652 def _load_abort_info(self):653 """ Fetch info about who aborted the job. 
"""654 if hasattr(self, "_aborted_by"):655 return656 rows = _db.execute("""657 SELECT afe_users.login,658 afe_aborted_host_queue_entries.aborted_on659 FROM afe_aborted_host_queue_entries660 INNER JOIN afe_users661 ON = afe_aborted_host_queue_entries.aborted_by_id662 WHERE afe_aborted_host_queue_entries.queue_entry_id = %s663 """, (,))664 if rows:665 self._aborted_by, self._aborted_on = rows[0]666 else:667 self._aborted_by = self._aborted_on = None668 def on_pending(self):669 """670 Called when an entry in a synchronous job has passed verify. If the671 job is ready to run, sets the entries to STARTING. Otherwise, it leaves672 them in PENDING.673 """674 self.set_status(models.HostQueueEntry.Status.PENDING)675 if not raise scheduler_lib.NoHostIdError(677 'Failed to recover a job whose host_queue_entry_id=%r due'678 ' to no host_id.'679 % # Some debug code here: sends an email if an asynchronous job does not682 # immediately enter Starting.683 # TODO: Remove this once we figure out why asynchronous jobs are getting684 # stuck in Pending.685 self.job.run_if_ready(queue_entry=self)686 if (self.job.synch_count == 1 and687 self.status == models.HostQueueEntry.Status.PENDING):688 subject = 'Job %s (id %s)' % (, message = 'Asynchronous job stuck in Pending'690 email_manager.manager.enqueue_notify_email(subject, message)691 def abort(self, dispatcher):692 assert self.aborted and not self.complete693 Status = models.HostQueueEntry.Status694 if self.status in {Status.GATHERING, Status.PARSING}:695 # do nothing; post-job tasks will finish and then mark this entry696 # with status "Aborted" and take care of the host697 return698 if self.status in {Status.STARTING, Status.PENDING, Status.RUNNING}:699 # If hqe is in any of these status, it should not have any700 # unfinished agent before it can be aborted.701 agents = dispatcher.get_agents_for_entry(self)702 # Agent with finished task can be left behind. 
This is added to703 # handle the special case of aborting hostless job in STARTING704 # status, in which the agent has only a HostlessQueueTask705 # associated. The finished HostlessQueueTask will be cleaned up in706 # the next tick, so it's safe to leave the agent there. Without707 # filtering out finished agent, HQE abort won't be able to proceed.708 assert all([agent.is_done() for agent in agents])709 # If hqe is still in STARTING status, it may not have assigned a710 # host yet.711 if elif (self.status == Status.VERIFYING or714 self.status == Status.RESETTING):715 models.SpecialTask.objects.create(716 task=models.SpecialTask.Task.CLEANUP,717 host=models.Host.objects.get(,718 requested_by=self.job.owner_model())719 elif self.status == Status.PROVISIONING:720 models.SpecialTask.objects.create(721 task=models.SpecialTask.Task.REPAIR,722 host=models.Host.objects.get(,723 requested_by=self.job.owner_model())724 self.set_status(Status.ABORTED)725 def execution_tag(self):726 SQL_SUSPECT_ENTRIES = ('SELECT * FROM afe_host_queue_entries WHERE '727 'complete!=1 AND execution_subdir="" AND '728 'status!="Queued";')729 SQL_FIX_SUSPECT_ENTRY = ('UPDATE afe_host_queue_entries SET '730 'status="Aborted" WHERE id=%s;')731 try:732 assert self.execution_subdir733 except AssertionError:734 # TODO(scottz): Remove temporary fix/info gathering pathway for735 # once issue is root caused.736 logging.error('No execution_subdir for host queue id:%s.', logging.error('====DB DEBUG====\n%s', SQL_SUSPECT_ENTRIES)738 for row in _db.execute(SQL_SUSPECT_ENTRIES):739 logging.error(row)740 logging.error('====DB DEBUG====\n')741 fix_query = SQL_FIX_SUSPECT_ENTRY % self.id742 logging.error('EXECUTING: %s', fix_query)743 _db.execute(SQL_FIX_SUSPECT_ENTRY % raise AssertionError(('self.execution_subdir not found. 
'745 'See log for details.'))746 return "%s/%s" % (self.job.tag(), self.execution_subdir)747 def execution_path(self):748 return self.execution_tag()749 def set_started_on_now(self):750 self.update_field('started_on', def set_finished_on_now(self):752 self.update_field('finished_on', def is_hostless(self):754 return (self.host_id is None755 and self.meta_host is None)756def hqe_trace_id(hqe_id):757 """Constructs the canonical trace id based on the HQE's id.758 Encodes 'HQE' in base16 and concatenates with the hex representation759 of the HQE's id.760 @param hqe_id: The HostQueueEntry's id.761 Returns:762 A trace id (in hex format)763 """764 return base64.b16encode('HQE') + hex(hqe_id)[2:]765class Job(DBObject):766 _table_name = 'afe_jobs'767 _fields = ('id', 'owner', 'name', 'priority', 'control_file',768 'control_type', 'created_on', 'synch_count', 'timeout',769 'run_verify', 'email_list', 'reboot_before', 'reboot_after',770 'parse_failed_repair', 'max_runtime_hrs', 'drone_set_id',771 'parameterized_job_id', 'max_runtime_mins', 'parent_job_id',772 'test_retry', 'run_reset', 'timeout_mins', 'shard_id',773 'require_ssp')774 # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending() on775 # all status='Pending' atomic group HQEs incase a delay was running when the776 # scheduler was restarted and no more hosts ever successfully exit Verify.777 def __init__(self, id=None, row=None, **kwargs):778 assert id or row779 super(Job, self).__init__(id=id, row=row, **kwargs)780 self._owner_model = None # caches model instance of owner781 self.update_image_path = None # path of OS image to install782 def model(self):783 return models.Job.objects.get( def owner_model(self):785 # work around the fact that the Job owner field is a string, not a786 # foreign key787 if not self._owner_model:788 self._owner_model = models.User.objects.get(login=self.owner)789 return self._owner_model790 def tag(self):791 return "%s-%s" % (, self.owner)792 def 
get_execution_details(self):793 """794 Get test execution details for this job.795 @return: Dictionary with test execution details796 """797 def _find_test_jobs(rows):798 """799 Here we are looking for tests such as SERVER_JOB and CLIENT_JOB.*800 Those are autotest 'internal job' tests, so they should not be801 counted when evaluating the test stats.802 @param rows: List of rows (matrix) with database results.803 """804 job_test_pattern = re.compile('SERVER|CLIENT\\_JOB\.[\d]')805 n_test_jobs = 0806 for r in rows:807 test_name = r[0]808 if job_test_pattern.match(test_name):809 n_test_jobs += 1810 return n_test_jobs811 stats = {}812 rows = _db.execute("""813 SELECT t.test, s.word, t.reason814 FROM tko_tests AS t, tko_jobs AS j, tko_status AS s815 WHERE t.job_idx = j.job_idx816 AND s.status_idx = t.status817 AND j.afe_job_id = %s818 ORDER BY t.reason819 """ % failed_rows = [r for r in rows if not r[1] == 'GOOD']821 n_test_jobs = _find_test_jobs(rows)822 n_test_jobs_failed = _find_test_jobs(failed_rows)823 total_executed = len(rows) - n_test_jobs824 total_failed = len(failed_rows) - n_test_jobs_failed825 if total_executed > 0:826 success_rate = 100 - ((total_failed / float(total_executed)) * 100)827 else:828 success_rate = 0829 stats['total_executed'] = total_executed830 stats['total_failed'] = total_failed831 stats['total_passed'] = total_executed - total_failed832 stats['success_rate'] = success_rate833 status_header = ("Test Name", "Status", "Reason")834 if failed_rows:835 stats['failed_rows'] = utils.matrix_to_string(failed_rows,836 status_header)837 else:838 stats['failed_rows'] = ''839 time_row = _db.execute("""840 SELECT started_time, finished_time841 FROM tko_jobs842 WHERE afe_job_id = %s843 """ % if time_row:845 t_begin, t_end = time_row[0]846 try:847 delta = t_end - t_begin848 minutes, seconds = divmod(delta.seconds, 60)849 hours, minutes = divmod(minutes, 60)850 stats['execution_time'] = ("%02d:%02d:%02d" %851 (hours, minutes, seconds))852 # One of t_end or 
t_begin are None853 except TypeError:854 stats['execution_time'] = '(could not determine)'855 else:856 stats['execution_time'] = '(none)'857 return stats858 def keyval_dict(self):859 return self.model().keyval_dict()860 def _pending_count(self):861 """The number of HostQueueEntries for this job in the Pending state."""862 pending_entries = models.HostQueueEntry.objects.filter(863, status=models.HostQueueEntry.Status.PENDING)864 return pending_entries.count()865 def is_ready(self):866 pending_count = self._pending_count()867 ready = (pending_count >= self.synch_count)868 if not ready:869 'Job %s not ready: %s pending, %s required ',871 self, pending_count, self.synch_count)872 return ready873 def num_machines(self, clause = None):874 sql = "job_id=%s" % self.id875 if clause:876 sql += " AND (%s)" % clause877 return self.count(sql, table='afe_host_queue_entries')878 def num_queued(self):879 return self.num_machines('not complete')880 def num_active(self):881 return self.num_machines('active')882 def num_complete(self):883 return self.num_machines('complete')884 def is_finished(self):885 return self.num_complete() == self.num_machines()886 def _not_yet_run_entries(self, include_active=True):887 if include_active:888 statuses = list(models.HostQueueEntry.PRE_JOB_STATUSES)889 else:890 statuses = list(models.HostQueueEntry.IDLE_PRE_JOB_STATUSES)891 return models.HostQueueEntry.objects.filter(,892 status__in=statuses)893 def _stop_all_entries(self):894 """Stops the job's inactive pre-job HQEs."""895 entries_to_stop = self._not_yet_run_entries(896 include_active=False)897 for child_entry in entries_to_stop:898 assert not child_entry.complete, (899 '%s status=%s, active=%s, complete=%s' %900 (, child_entry.status,,901 child_entry.complete))902 if child_entry.status == models.HostQueueEntry.Status.PENDING:903 = models.Host.Status.READY904 child_entry.status = models.HostQueueEntry.Status.STOPPED906 def stop_if_necessary(self):908 not_yet_run = self._not_yet_run_entries()909 
if not_yet_run.count() < self.synch_count:910 self._stop_all_entries()911 def _next_group_name(self):912 """@returns a directory name to use for the next host group results."""913 group_name = ''914 group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))915 query = models.HostQueueEntry.objects.filter(916'execution_subdir').distinct()917 subdirs = (entry['execution_subdir'] for entry in query)918 group_matches = (group_count_re.match(subdir) for subdir in subdirs)919 ids = [int( for match in group_matches if match]920 if ids:921 next_id = max(ids) + 1922 else:923 next_id = 0924 return '%sgroup%d' % (group_name, next_id)925 def get_group_entries(self, queue_entry_from_group):926 """927 @param queue_entry_from_group: A HostQueueEntry instance to find other928 group entries on this job for.929 @returns A list of HostQueueEntry objects all executing this job as930 part of the same group as the one supplied (having the same931 execution_subdir).932 """933 execution_subdir = queue_entry_from_group.execution_subdir934 return list(HostQueueEntry.fetch(935 where='job_id=%s AND execution_subdir=%s',936 params=(, execution_subdir)))937 def _should_run_cleanup(self, queue_entry):938 if self.reboot_before == model_attributes.RebootBefore.ALWAYS:939 return True940 elif self.reboot_before == model_attributes.RebootBefore.IF_DIRTY:941 return return False943 def _should_run_verify(self, queue_entry):944 do_not_verify = ( ==945 host_protections.Protection.DO_NOT_VERIFY)946 if do_not_verify:947 return False948 # If RebootBefore is set to NEVER, then we won't run reset because949 # we can't cleanup, so we need to weaken a Reset into a Verify.950 weaker_reset = (self.run_reset and951 self.reboot_before == model_attributes.RebootBefore.NEVER)952 return self.run_verify or weaker_reset953 def _should_run_reset(self, queue_entry):954 can_verify = ( !=955 host_protections.Protection.DO_NOT_VERIFY)956 can_reboot = self.reboot_before != model_attributes.RebootBefore.NEVER957 
return (can_reboot and can_verify and (self.run_reset or958 (self._should_run_cleanup(queue_entry) and959 self._should_run_verify(queue_entry))))960 def _should_run_provision(self, queue_entry):961 """962 Determine if the queue_entry needs to have a provision task run before963 it to provision @param queue_entry: The host queue entry in question.965 @returns: True if we should schedule a provision task, False otherwise.966 """967 # If we get to this point, it means that the scheduler has already968 # vetted that all the unprovisionable labels match, so we can just969 # find all labels on the job that aren't on the host to get the list970 # of what we need to provision. (See the scheduling logic in971 # where we discard all972 # actionable labels when assigning jobs to hosts.)973 job_labels = { for x in queue_entry.get_labels()}974 # Skip provision if `skip_provision` is listed in the job labels.975 if provision.SKIP_PROVISION in job_labels:976 return False977 _, host_labels = # If there are any labels on the job that are not on the host and they979 # are labels that provisioning knows how to change, then that means980 # there is provisioning work to do. 
If there's no provisioning work to981 # do, then obviously we have no reason to schedule a provision task!982 diff = job_labels - set(host_labels)983 if any([provision.Provision.acts_on(x) for x in diff]):984 return True985 return False986 def _queue_special_task(self, queue_entry, task):987 """988 Create a special task and associate it with a host queue entry.989 @param queue_entry: The queue entry this special task should be990 associated with.991 @param task: One of the members of the enum models.SpecialTask.Task.992 @returns: None993 """994 models.SpecialTask.objects.create(995 host=models.Host.objects.get(id=queue_entry.host_id),996 queue_entry=queue_entry, task=task)997 def schedule_pre_job_tasks(self, queue_entry):998 """999 Queue all of the special tasks that need to be run before a host1000 queue entry may run.1001 If no special taskes need to be scheduled, then |on_pending| will be1002 called directly.1003 @returns None1004 """1005 task_queued = False1006 hqe_model = models.HostQueueEntry.objects.get( if self._should_run_provision(queue_entry):1008 self._queue_special_task(hqe_model,1009 models.SpecialTask.Task.PROVISION)1010 task_queued = True1011 elif self._should_run_reset(queue_entry):1012 self._queue_special_task(hqe_model, models.SpecialTask.Task.RESET)1013 task_queued = True1014 else:1015 if self._should_run_cleanup(queue_entry):1016 self._queue_special_task(hqe_model,1017 models.SpecialTask.Task.CLEANUP)1018 task_queued = True1019 if self._should_run_verify(queue_entry):1020 self._queue_special_task(hqe_model,1021 models.SpecialTask.Task.VERIFY)1022 task_queued = True1023 if not task_queued:1024 queue_entry.on_pending()1025 def _assign_new_group(self, queue_entries):1026 if len(queue_entries) == 1:1027 group_subdir_name = queue_entries[0].host.hostname1028 else:1029 group_subdir_name = self._next_group_name()1030'Running synchronous job %d hosts %s as %s',1031, [ for entry in queue_entries],1032 group_subdir_name)1033 for queue_entry in 
queue_entries:1034 queue_entry.set_execution_subdir(group_subdir_name)1035 def _choose_group_to_run(self, include_queue_entry):1036 """1037 @returns A tuple containing a list of HostQueueEntry instances to be1038 used to run this Job, a string group name to suggest giving1039 to this job in the results database.1040 """1041 chosen_entries = [include_queue_entry]1042 num_entries_wanted = self.synch_count1043 num_entries_wanted -= len(chosen_entries)1044 if num_entries_wanted > 0:1045 where_clause = 'job_id = %s AND status = "Pending" AND id != %s'1046 pending_entries = list(HostQueueEntry.fetch(1047 where=where_clause,1048 params=(,

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:


You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now!!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?