...624 status = ', '.join('%d %s' % (count, status) for status, count625 in status_counts.iteritems())626 subject, body = self._get_status_email_contents(status, summary, None)627 email_manager.manager.send_email(self.job.email_list, subject, body)628 def schedule_pre_job_tasks(self):629"%s/%s/%s (job %s, entry %s) scheduled on %s, status=%s",630, self.meta_host, self.atomic_group_id,631,,, self.status)632 self._do_schedule_pre_job_tasks()633 def _do_schedule_pre_job_tasks(self):634 self.job.schedule_pre_job_tasks(queue_entry=self)635 def requeue(self):636 assert self.host637 self.set_status(models.HostQueueEntry.Status.QUEUED)638 self.update_field('started_on', None)639 self.update_field('finished_on', None)640 # verify/cleanup failure sets the execution subdir, so reset it here641 self.set_execution_subdir('')642 if self.meta_host:643 self.set_host(None)644 @property645 def aborted_by(self):646 self._load_abort_info()647 return self._aborted_by648 @property649 def aborted_on(self):650 self._load_abort_info()651 return self._aborted_on652 def _load_abort_info(self):653 """ Fetch info about who aborted the job. """654 if hasattr(self, "_aborted_by"):655 return656 rows = _db.execute("""657 SELECT afe_users.login,658 afe_aborted_host_queue_entries.aborted_on659 FROM afe_aborted_host_queue_entries660 INNER JOIN afe_users661 ON = afe_aborted_host_queue_entries.aborted_by_id662 WHERE afe_aborted_host_queue_entries.queue_entry_id = %s663 """, (,))664 if rows:665 self._aborted_by, self._aborted_on = rows[0]666 else:667 self._aborted_by = self._aborted_on = None668 def on_pending(self):669 """670 Called when an entry in a synchronous job has passed verify. If the671 job is ready to run, sets the entries to STARTING. Otherwise, it leaves672 them in PENDING.673 """674 self.set_status(models.HostQueueEntry.Status.PENDING)675 if not raise scheduler_lib.NoHostIdError(677 'Failed to recover a job whose host_queue_entry_id=%r due'678 ' to no host_id.'679 % # Some debug code here: sends an email if an asynchronous job does not682 # immediately enter Starting.683 # TODO: Remove this once we figure out why asynchronous jobs are getting684 # stuck in Pending.685 self.job.run_if_ready(queue_entry=self)686 if (self.job.synch_count == 1 and687 self.status == models.HostQueueEntry.Status.PENDING):688 subject = 'Job %s (id %s)' % (, message = 'Asynchronous job stuck in Pending'690 email_manager.manager.enqueue_notify_email(subject, message)691 def abort(self, dispatcher):692 assert self.aborted and not self.complete693 Status = models.HostQueueEntry.Status694 if self.status in {Status.GATHERING, Status.PARSING}:695 # do nothing; post-job tasks will finish and then mark this entry696 # with status "Aborted" and take care of the host697 return698 if self.status in {Status.STARTING, Status.PENDING, Status.RUNNING}:699 # If hqe is in any of these status, it should not have any700 # unfinished agent before it can be aborted.701 agents = dispatcher.get_agents_for_entry(self)702 # Agent with finished task can be left behind. This is added to703 # handle the special case of aborting hostless job in STARTING704 # status, in which the agent has only a HostlessQueueTask705 # associated. The finished HostlessQueueTask will be cleaned up in706 # the next tick, so it's safe to leave the agent there. Without707 # filtering out finished agent, HQE abort won't be able to proceed.708 assert all([agent.is_done() for agent in agents])709 # If hqe is still in STARTING status, it may not have assigned a710 # host yet.711 if elif (self.status == Status.VERIFYING or714 self.status == Status.RESETTING):715 models.SpecialTask.objects.create(716 task=models.SpecialTask.Task.CLEANUP,717 host=models.Host.objects.get(,718 requested_by=self.job.owner_model())719 elif self.status == Status.PROVISIONING:720 models.SpecialTask.objects.create(721 task=models.SpecialTask.Task.REPAIR,722 host=models.Host.objects.get(,723 requested_by=self.job.owner_model())724 self.set_status(Status.ABORTED)725 def execution_tag(self):726 SQL_SUSPECT_ENTRIES = ('SELECT * FROM afe_host_queue_entries WHERE '727 'complete!=1 AND execution_subdir="" AND '728 'status!="Queued";')729 SQL_FIX_SUSPECT_ENTRY = ('UPDATE afe_host_queue_entries SET '730 'status="Aborted" WHERE id=%s;')731 try:732 assert self.execution_subdir733 except AssertionError:734 # TODO(scottz): Remove temporary fix/info gathering pathway for735 # once issue is root caused.736 logging.error('No execution_subdir for host queue id:%s.', logging.error('====DB DEBUG====\n%s', SQL_SUSPECT_ENTRIES)738 for row in _db.execute(SQL_SUSPECT_ENTRIES):739 logging.error(row)740 logging.error('====DB DEBUG====\n')741 fix_query = SQL_FIX_SUSPECT_ENTRY % self.id742 logging.error('EXECUTING: %s', fix_query)743 _db.execute(SQL_FIX_SUSPECT_ENTRY % raise AssertionError(('self.execution_subdir not found. '745 'See log for details.'))746 return "%s/%s" % (self.job.tag(), self.execution_subdir)747 def execution_path(self):748 return self.execution_tag()749 def set_started_on_now(self):750 self.update_field('started_on', def set_finished_on_now(self):752 self.update_field('finished_on', def is_hostless(self):754 return (self.host_id is None755 and self.meta_host is None)756def hqe_trace_id(hqe_id):757 """Constructs the canonical trace id based on the HQE's id.758 Encodes 'HQE' in base16 and concatenates with the hex representation759 of the HQE's id.760 @param hqe_id: The HostQueueEntry's id.761 Returns:762 A trace id (in hex format)763 """764 return base64.b16encode('HQE') + hex(hqe_id)[2:]765class Job(DBObject):766 _table_name = 'afe_jobs'767 _fields = ('id', 'owner', 'name', 'priority', 'control_file',768 'control_type', 'created_on', 'synch_count', 'timeout',769 'run_verify', 'email_list', 'reboot_before', 'reboot_after',770 'parse_failed_repair', 'max_runtime_hrs', 'drone_set_id',771 'parameterized_job_id', 'max_runtime_mins', 'parent_job_id',772 'test_retry', 'run_reset', 'timeout_mins', 'shard_id',773 'require_ssp')774 # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending() on775 # all status='Pending' atomic group HQEs incase a delay was running when the776 # scheduler was restarted and no more hosts ever successfully exit Verify.777 def __init__(self, id=None, row=None, **kwargs):778 assert id or row779 super(Job, self).__init__(id=id, row=row, **kwargs)780 self._owner_model = None # caches model instance of owner781 self.update_image_path = None # path of OS image to install782 def model(self):783 return models.Job.objects.get( def owner_model(self):785 # work around the fact that the Job owner field is a string, not a786 # foreign key787 if not self._owner_model:788 self._owner_model = models.User.objects.get(login=self.owner)789 return self._owner_model790 def tag(self):791 return "%s-%s" % (, self.owner)792 def get_execution_details(self):793 """794 Get test execution details for this job.795 @return: Dictionary with test execution details796 """797 def _find_test_jobs(rows):798 """799 Here we are looking for tests such as SERVER_JOB and CLIENT_JOB.*800 Those are autotest 'internal job' tests, so they should not be801 counted when evaluating the test stats.802 @param rows: List of rows (matrix) with database results.803 """804 job_test_pattern = re.compile('SERVER|CLIENT\\_JOB\.[\d]')805 n_test_jobs = 0806 for r in rows:807 test_name = r[0]808 if job_test_pattern.match(test_name):809 n_test_jobs += 1810 return n_test_jobs811 stats = {}812 rows = _db.execute("""813 SELECT t.test, s.word, t.reason814 FROM tko_tests AS t, tko_jobs AS j, tko_status AS s815 WHERE t.job_idx = j.job_idx816 AND s.status_idx = t.status817 AND j.afe_job_id = %s818 ORDER BY t.reason819 """ % failed_rows = [r for r in rows if not r[1] == 'GOOD']821 n_test_jobs = _find_test_jobs(rows)822 n_test_jobs_failed = _find_test_jobs(failed_rows)823 total_executed = len(rows) - n_test_jobs824 total_failed = len(failed_rows) - n_test_jobs_failed825 if total_executed > 0:826 success_rate = 100 - ((total_failed / float(total_executed)) * 100)827 else:828 success_rate = 0829 stats['total_executed'] = total_executed830 stats['total_failed'] = total_failed831 stats['total_passed'] = total_executed - total_failed832 stats['success_rate'] = success_rate833 status_header = ("Test Name", "Status", "Reason")834 if failed_rows:835 stats['failed_rows'] = utils.matrix_to_string(failed_rows,836 status_header)837 else:838 stats['failed_rows'] = ''839 time_row = _db.execute("""840 SELECT started_time, finished_time841 FROM tko_jobs842 WHERE afe_job_id = %s843 """ % if time_row:845 t_begin, t_end = time_row[0]846 try:847 delta = t_end - t_begin848 minutes, seconds = divmod(delta.seconds, 60)849 hours, minutes = divmod(minutes, 60)850 stats['execution_time'] = ("%02d:%02d:%02d" %851 (hours, minutes, seconds))852 # One of t_end or t_begin are None853 except TypeError:854 stats['execution_time'] = '(could not determine)'855 else:856 stats['execution_time'] = '(none)'857 return stats858 def keyval_dict(self):859 return self.model().keyval_dict()860 def _pending_count(self):861 """The number of HostQueueEntries for this job in the Pending state."""862 pending_entries = models.HostQueueEntry.objects.filter(863, status=models.HostQueueEntry.Status.PENDING)864 return pending_entries.count()865 def is_ready(self):866 pending_count = self._pending_count()867 ready = (pending_count >= self.synch_count)868 if not ready:869 'Job %s not ready: %s pending, %s required ',871 self, pending_count, self.synch_count)872 return ready873 def num_machines(self, clause = None):874 sql = "job_id=%s" % self.id875 if clause:876 sql += " AND (%s)" % clause877 return self.count(sql, table='afe_host_queue_entries')878 def num_queued(self):879 return self.num_machines('not complete')880 def num_active(self):881 return self.num_machines('active')882 def num_complete(self):883 return self.num_machines('complete')884 def is_finished(self):885 return self.num_complete() == self.num_machines()886 def _not_yet_run_entries(self, include_active=True):887 if include_active:888 statuses = list(models.HostQueueEntry.PRE_JOB_STATUSES)889 else:890 statuses = list(models.HostQueueEntry.IDLE_PRE_JOB_STATUSES)891 return models.HostQueueEntry.objects.filter(,892 status__in=statuses)893 def _stop_all_entries(self):894 """Stops the job's inactive pre-job HQEs."""895 entries_to_stop = self._not_yet_run_entries(896 include_active=False)897 for child_entry in entries_to_stop:898 assert not child_entry.complete, (899 '%s status=%s, active=%s, complete=%s' %900 (, child_entry.status,,901 child_entry.complete))902 if child_entry.status == models.HostQueueEntry.Status.PENDING:903 = models.Host.Status.READY904 child_entry.status = models.HostQueueEntry.Status.STOPPED906 def stop_if_necessary(self):908 not_yet_run = self._not_yet_run_entries()909 if not_yet_run.count() < self.synch_count:910 self._stop_all_entries()911 def _next_group_name(self):912 """@returns a directory name to use for the next host group results."""913 group_name = ''914 group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))915 query = models.HostQueueEntry.objects.filter(916'execution_subdir').distinct()917 subdirs = (entry['execution_subdir'] for entry in query)918 group_matches = (group_count_re.match(subdir) for subdir in subdirs)919 ids = [int( for match in group_matches if match]920 if ids:921 next_id = max(ids) + 1922 else:923 next_id = 0924 return '%sgroup%d' % (group_name, next_id)925 def get_group_entries(self, queue_entry_from_group):926 """927 @param queue_entry_from_group: A HostQueueEntry instance to find other928 group entries on this job for.929 @returns A list of HostQueueEntry objects all executing this job as930 part of the same group as the one supplied (having the same931 execution_subdir).932 """933 execution_subdir = queue_entry_from_group.execution_subdir934 return list(HostQueueEntry.fetch(935 where='job_id=%s AND execution_subdir=%s',936 params=(, execution_subdir)))937 def _should_run_cleanup(self, queue_entry):938 if self.reboot_before == model_attributes.RebootBefore.ALWAYS:939 return True940 elif self.reboot_before == model_attributes.RebootBefore.IF_DIRTY:941 return return False943 def _should_run_verify(self, queue_entry):944 do_not_verify = ( ==945 host_protections.Protection.DO_NOT_VERIFY)946 if do_not_verify:947 return False948 # If RebootBefore is set to NEVER, then we won't run reset because949 # we can't cleanup, so we need to weaken a Reset into a Verify.950 weaker_reset = (self.run_reset and951 self.reboot_before == model_attributes.RebootBefore.NEVER)952 return self.run_verify or weaker_reset953 def _should_run_reset(self, queue_entry):954 can_verify = ( !=955 host_protections.Protection.DO_NOT_VERIFY)956 can_reboot = self.reboot_before != model_attributes.RebootBefore.NEVER957 return (can_reboot and can_verify and (self.run_reset or958 (self._should_run_cleanup(queue_entry) and959 self._should_run_verify(queue_entry))))960 def _should_run_provision(self, queue_entry):961 """962 Determine if the queue_entry needs to have a provision task run before963 it to provision @param queue_entry: The host queue entry in question.965 @returns: True if we should schedule a provision task, False otherwise.966 """967 # If we get to this point, it means that the scheduler has already968 # vetted that all the unprovisionable labels match, so we can just969 # find all labels on the job that aren't on the host to get the list970 # of what we need to provision. (See the scheduling logic in971 # where we discard all972 # actionable labels when assigning jobs to hosts.)973 job_labels = { for x in queue_entry.get_labels()}974 # Skip provision if `skip_provision` is listed in the job labels.975 if provision.SKIP_PROVISION in job_labels:976 return False977 _, host_labels = # If there are any labels on the job that are not on the host and they979 # are labels that provisioning knows how to change, then that means980 # there is provisioning work to do. If there's no provisioning work to981 # do, then obviously we have no reason to schedule a provision task!982 diff = job_labels - set(host_labels)983 if any([provision.Provision.acts_on(x) for x in diff]):984 return True985 return False986 def _queue_special_task(self, queue_entry, task):987 """988 Create a special task and associate it with a host queue entry.989 @param queue_entry: The queue entry this special task should be990 associated with.991 @param task: One of the members of the enum models.SpecialTask.Task.992 @returns: None993 """994 models.SpecialTask.objects.create(995 host=models.Host.objects.get(id=queue_entry.host_id),996 queue_entry=queue_entry, task=task)997 def schedule_pre_job_tasks(self, queue_entry):998 """999 Queue all of the special tasks that need to be run before a host1000 queue entry may run.1001 If no special taskes need to be scheduled, then |on_pending| will be1002 called directly.1003 @returns None1004 """1005 task_queued = False1006 hqe_model = models.HostQueueEntry.objects.get( if self._should_run_provision(queue_entry):1008 self._queue_special_task(hqe_model,1009 models.SpecialTask.Task.PROVISION)1010 task_queued = True1011 elif self._should_run_reset(queue_entry):...

