...588 task=models.SpecialTask.Task.CLEANUP,589 host=models.Host.objects.get(,590 requested_by=self.job.owner_model())591 self.set_status(Status.ABORTED)592 self.job.abort_delay_ready_task()593 def get_group_name(self):594 atomic_group = self.atomic_group595 if not atomic_group:596 return ''597 # Look at any meta_host and dependency labels and pick the first598 # one that also specifies this atomic group. Use that label name599 # as the group name if possible (it is more specific).600 for label in self.get_labels():601 if label.atomic_group_id:602 assert label.atomic_group_id == atomic_group.id603 return label.name604 return atomic_group.name605 def execution_tag(self):606 assert self.execution_subdir607 return "%s/%s" % (self.job.tag(), self.execution_subdir)608 def execution_path(self):609 return self.execution_tag()610 def set_started_on_now(self):611 self.update_field('started_on', def is_hostless(self):613 return (self.host_id is None614 and self.meta_host is None615 and self.atomic_group_id is None)616class Job(DBObject):617 _table_name = 'afe_jobs'618 _fields = ('id', 'owner', 'name', 'priority', 'control_file',619 'control_type', 'created_on', 'synch_count', 'timeout',620 'run_verify', 'email_list', 'reboot_before', 'reboot_after',621 'parse_failed_repair', 'max_runtime_hrs', 'drone_set_id',622 'parameterized_job_id')623 # This does not need to be a column in the DB. The delays are likely to624 # be configured short. If the scheduler is stopped and restarted in625 # the middle of a job's delay cycle, the delay cycle will either be626 # repeated or skipped depending on the number of Pending machines found627 # when the restarted scheduler recovers to track it. Not a problem.628 #629 # A reference to the DelayedCallTask that will wake up the job should630 # no other HQEs change state in time. Its end_time attribute is used631 # by our run_with_ready_delay() method to determine if the wait is over.632 _delay_ready_task = None633 # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending() on634 # all status='Pending' atomic group HQEs incase a delay was running when the635 # scheduler was restarted and no more hosts ever successfully exit Verify.636 def __init__(self, id=None, row=None, **kwargs):637 assert id or row638 super(Job, self).__init__(id=id, row=row, **kwargs)639 self._owner_model = None # caches model instance of owner640 def model(self):641 return models.Job.objects.get( def owner_model(self):643 # work around the fact that the Job owner field is a string, not a644 # foreign key645 if not self._owner_model:646 self._owner_model = models.User.objects.get(login=self.owner)647 return self._owner_model648 def is_server_job(self):649 return self.control_type != 2650 def tag(self):651 return "%s-%s" % (, self.owner)652 def get_host_queue_entries(self):653 rows = _db.execute("""654 SELECT * FROM afe_host_queue_entries655 WHERE job_id= %s656 """, (,))657 entries = [HostQueueEntry(row=i) for i in rows]658 assert len(entries)>0659 return entries660 def get_execution_details(self):661 """662 Get test execution details for this job.663 @return: Dictionary with test execution details664 """665 def _find_test_jobs(rows):666 """667 Here we are looking for tests such as SERVER_JOB and CLIENT_JOB.*668 Those are autotest 'internal job' tests, so they should not be669 counted when evaluating the test stats.670 @param rows: List of rows (matrix) with database results.671 """672 job_test_pattern = re.compile('SERVER|CLIENT\\_JOB\.[\d]')673 n_test_jobs = 0674 for r in rows:675 test_name = r[0]676 if job_test_pattern.match(test_name):677 n_test_jobs += 1678 return n_test_jobs679 stats = {}680 rows = _db.execute("""681 SELECT t.test, s.word, t.reason682 FROM tko_tests AS t, tko_jobs AS j, tko_status AS s683 WHERE t.job_idx = j.job_idx684 AND s.status_idx = t.status685 AND j.afe_job_id = %s686 ORDER BY t.reason687 """ % failed_rows = [r for r in rows if not 'GOOD' in r]689 n_test_jobs = _find_test_jobs(rows)690 n_test_jobs_failed = _find_test_jobs(failed_rows)691 total_executed = len(rows) - n_test_jobs692 total_failed = len(failed_rows) - n_test_jobs_failed693 if total_executed > 0:694 success_rate = 100 - ((total_failed / float(total_executed)) * 100)695 else:696 success_rate = 0697 stats['total_executed'] = total_executed698 stats['total_failed'] = total_failed699 stats['total_passed'] = total_executed - total_failed700 stats['success_rate'] = success_rate701 status_header = ("Test Name", "Status", "Reason")702 if failed_rows:703 stats['failed_rows'] = utils.matrix_to_string(failed_rows,704 status_header)705 else:706 stats['failed_rows'] = ''707 time_row = _db.execute("""708 SELECT started_time, finished_time709 FROM tko_jobs710 WHERE afe_job_id = %s711 """ % if time_row:713 t_begin, t_end = time_row[0]714 try:715 delta = t_end - t_begin716 minutes, seconds = divmod(delta.seconds, 60)717 hours, minutes = divmod(minutes, 60)718 stats['execution_time'] = ("%02d:%02d:%02d" %719 (hours, minutes, seconds))720 # One of t_end or t_begin are None721 except TypeError:722 stats['execution_time'] = '(could not determine)'723 else:724 stats['execution_time'] = '(none)'725 return stats726 def set_status(self, status, update_queues=False):727 self.update_field('status',status)728 if update_queues:729 for queue_entry in self.get_host_queue_entries():730 queue_entry.set_status(status)731 def keyval_dict(self):732 return self.model().keyval_dict()733 def _atomic_and_has_started(self):734 """735 @returns True if any of the HostQueueEntries associated with this job736 have entered the Status.STARTING state or beyond.737 """738 atomic_entries = models.HostQueueEntry.objects.filter(739, atomic_group__isnull=False)740 if atomic_entries.count() <= 0:741 return False742 # These states may *only* be reached if has been called.743 started_statuses = (models.HostQueueEntry.Status.STARTING,744 models.HostQueueEntry.Status.RUNNING,745 models.HostQueueEntry.Status.COMPLETED)746 started_entries = atomic_entries.filter(status__in=started_statuses)747 return started_entries.count() > 0748 def _hosts_assigned_count(self):749 """The number of HostQueueEntries assigned a Host for this job."""750 entries = models.HostQueueEntry.objects.filter(,751 host__isnull=False)752 return entries.count()753 def _pending_count(self):754 """The number of HostQueueEntries for this job in the Pending state."""755 pending_entries = models.HostQueueEntry.objects.filter(756, status=models.HostQueueEntry.Status.PENDING)757 return pending_entries.count()758 def _max_hosts_needed_to_run(self, atomic_group):759 """760 @param atomic_group: The AtomicGroup associated with this job that we761 are using to set an upper bound on the threshold.762 @returns The maximum number of HostQueueEntries assigned a Host before763 this job can run.764 """765 return min(self._hosts_assigned_count(),766 atomic_group.max_number_of_machines)767 def _min_hosts_needed_to_run(self):768 """Return the minumum number of hsots needed to run this job."""769 return self.synch_count770 def is_ready(self):771 # NOTE: Atomic group jobs stop reporting ready after they have been772 # started to avoid launching multiple copies of one atomic job.773 # Only possible if synch_count is less than than half the number of774 # machines in the atomic group.775 pending_count = self._pending_count()776 atomic_and_has_started = self._atomic_and_has_started()777 ready = (pending_count >= self.synch_count778 and not atomic_and_has_started)779 if not ready:780 'Job %s not ready: %s pending, %s required '782 '(Atomic and started: %s)',783 self, pending_count, self.synch_count,784 atomic_and_has_started)785 return ready786 def num_machines(self, clause = None):787 sql = "job_id=%s" % self.id788 if clause:789 sql += " AND (%s)" % clause790 return self.count(sql, table='afe_host_queue_entries')791 def num_queued(self):792 return self.num_machines('not complete')793 def num_active(self):794 return self.num_machines('active')795 def num_complete(self):796 return self.num_machines('complete')797 def is_finished(self):798 return self.num_complete() == self.num_machines()799 def _not_yet_run_entries(self, include_verifying=True):800 statuses = [models.HostQueueEntry.Status.QUEUED,801 models.HostQueueEntry.Status.PENDING]802 if include_verifying:803 statuses.append(models.HostQueueEntry.Status.VERIFYING)804 return models.HostQueueEntry.objects.filter(,805 status__in=statuses)806 def _stop_all_entries(self):807 entries_to_stop = self._not_yet_run_entries(808 include_verifying=False)809 for child_entry in entries_to_stop:810 assert not child_entry.complete, (811 '%s status=%s, active=%s, complete=%s' %812 (, child_entry.status,,813 child_entry.complete))814 if child_entry.status == models.HostQueueEntry.Status.PENDING:815 = models.Host.Status.READY816 child_entry.status = models.HostQueueEntry.Status.STOPPED818 def stop_if_necessary(self):820 not_yet_run = self._not_yet_run_entries()821 if not_yet_run.count() < self.synch_count:822 self._stop_all_entries()823 def write_to_machines_file(self, queue_entry):824 hostname = file_path = os.path.join(self.tag(), '.machines')826 _drone_manager.write_lines_to_file(file_path, [hostname])827 def _next_group_name(self, group_name=''):828 """@returns a directory name to use for the next host group results."""829 if group_name:830 # Sanitize for use as a pathname.831 group_name = group_name.replace(os.path.sep, '_')832 if group_name.startswith('.'):833 group_name = '_' + group_name[1:]834 # Add a separator between the group name and 'group%d'.835 group_name += '.'836 group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))837 query = models.HostQueueEntry.objects.filter(838'execution_subdir').distinct()839 subdirs = (entry['execution_subdir'] for entry in query)840 group_matches = (group_count_re.match(subdir) for subdir in subdirs)841 ids = [int( for match in group_matches if match]842 if ids:843 next_id = max(ids) + 1844 else:845 next_id = 0846 return '%sgroup%d' % (group_name, next_id)847 def get_group_entries(self, queue_entry_from_group):848 """849 @param queue_entry_from_group: A HostQueueEntry instance to find other850 group entries on this job for.851 @returns A list of HostQueueEntry objects all executing this job as852 part of the same group as the one supplied (having the same853 execution_subdir).854 """855 execution_subdir = queue_entry_from_group.execution_subdir856 return list(HostQueueEntry.fetch(857 where='job_id=%s AND execution_subdir=%s',858 params=(, execution_subdir)))859 def _should_run_cleanup(self, queue_entry):860 if self.reboot_before == model_attributes.RebootBefore.ALWAYS:861 return True862 elif self.reboot_before == model_attributes.RebootBefore.IF_DIRTY:863 return return False865 def _should_run_verify(self, queue_entry):866 do_not_verify = ( ==867 host_protections.Protection.DO_NOT_VERIFY)868 if do_not_verify:869 return False870 return self.run_verify871 def schedule_pre_job_tasks(self, queue_entry):872 """873 Get a list of tasks to perform before the host_queue_entry874 may be used to run this Job (such as Cleanup & Verify).875 @returns A list of tasks to be done to the given queue_entry before876 it should be considered be ready to run this job. The last877 task in the list calls HostQueueEntry.on_pending(), which878 continues the flow of the job.879 """880 if self._should_run_cleanup(queue_entry):881 task = models.SpecialTask.Task.CLEANUP882 elif self._should_run_verify(queue_entry):883 task = models.SpecialTask.Task.VERIFY884 else:885 queue_entry.on_pending()886 return887 queue_entry = models.HostQueueEntry.objects.get( models.SpecialTask.objects.create(889 host=models.Host.objects.get(id=queue_entry.host_id),890 queue_entry=queue_entry, task=task)891 def _assign_new_group(self, queue_entries, group_name=''):892 if len(queue_entries) == 1:893 group_subdir_name = queue_entries[0].host.hostname894 else:895 group_subdir_name = self._next_group_name(group_name)896'Running synchronous job %d hosts %s as %s',897, [ for entry in queue_entries],898 group_subdir_name)899 for queue_entry in queue_entries:900 queue_entry.set_execution_subdir(group_subdir_name)901 def _choose_group_to_run(self, include_queue_entry):902 """903 @returns A tuple containing a list of HostQueueEntry instances to be904 used to run this Job, a string group name to suggest giving905 to this job in the results database.906 """907 atomic_group = include_queue_entry.atomic_group908 chosen_entries = [include_queue_entry]909 if atomic_group:910 num_entries_wanted = atomic_group.max_number_of_machines911 else:912 num_entries_wanted = self.synch_count913 num_entries_wanted -= len(chosen_entries)914 if num_entries_wanted > 0:915 where_clause = 'job_id = %s AND status = "Pending" AND id != %s'916 pending_entries = list(HostQueueEntry.fetch(917 where=where_clause,918 params=(, # Sort the chosen hosts by hostname before slicing.920 def cmp_queue_entries_by_hostname(entry_a, entry_b):921 return Host.cmp_for_sort(, pending_entries.sort(cmp=cmp_queue_entries_by_hostname)923 chosen_entries += pending_entries[:num_entries_wanted]924 # Sanity check. We'll only ever be called if this can be met.925 if len(chosen_entries) < self.synch_count:926 message = ('job %s got less than %s chosen entries: %s' % (927, self.synch_count, chosen_entries))928 logging.error(message)929 email_manager.manager.enqueue_notify_email(930 'Job not started, too few chosen entries', message)931 return []932 group_name = include_queue_entry.get_group_name()933 self._assign_new_group(chosen_entries, group_name=group_name)934 return chosen_entries935 def run_if_ready(self, queue_entry):936 """937 Run this job by kicking its HQEs into status='Starting' if enough938 hosts are ready for it to run.939 Cleans up by kicking HQEs into status='Stopped' if this Job is not940 ready to run.941 """942 if not self.is_ready():943 self.stop_if_necessary()944 elif queue_entry.atomic_group:945 self.run_with_ready_delay(queue_entry)946 else:947 def run_with_ready_delay(self, queue_entry):949 """950 Start a delay to wait for more hosts to enter Pending state before951 launching an atomic group job. Once set, the a delay cannot be reset.952 @param queue_entry: The HostQueueEntry object to get atomic group953 info from and pass to run_if_ready when the delay is up.954 @returns An Agent to run the job as appropriate or None if a delay955 has already been set.956 """957 assert queue_entry.job_id == self.id958 assert queue_entry.atomic_group959 delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts960 over_max_threshold = (self._pending_count() >=961 self._max_hosts_needed_to_run(queue_entry.atomic_group))962 delay_expired = (self._delay_ready_task and963 time.time() >= self._delay_ready_task.end_time)964 # Delay is disabled or we already have enough? Do not wait to run.965 if not delay or over_max_threshold or delay_expired:966 else:968 queue_entry.set_status(models.HostQueueEntry.Status.WAITING)969 def request_abort(self):970 """Request that this Job be aborted on the next scheduler cycle."""971 self.model().abort()972 def schedule_delayed_callback_task(self, queue_entry):973 queue_entry.set_status(models.HostQueueEntry.Status.PENDING)974 if self._delay_ready_task:975 return None976 delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts977 def run_job_after_delay():978'Job %s done waiting for extra hosts.', self)979 # Check to see if the job is still relevant. It could have aborted980 # while we were waiting or hosts could have disappearred, etc.981 if self._pending_count() < self._min_hosts_needed_to_run():982'Job %s had too few Pending hosts after waiting '983 'for extras. Not running.', self)984 self.request_abort()985 return986 return'Job %s waiting up to %s seconds for more hosts.',988, delay)989 self._delay_ready_task = DelayedCallTask(delay_seconds=delay,990 callback=run_job_after_delay)991 return self._delay_ready_task992 def run(self, queue_entry):993 """994 @param queue_entry: The HostQueueEntry instance calling this method.995 """996 if queue_entry.atomic_group and self._atomic_and_has_started():997 logging.error(' called on running atomic Job %d '998 'with HQE %s.',, queue_entry)999 return1000 queue_entries = self._choose_group_to_run(queue_entry)1001 if queue_entries:1002 self._finish_run(queue_entries)1003 def _finish_run(self, queue_entries):1004 for queue_entry in queue_entries:1005 queue_entry.set_status(models.HostQueueEntry.Status.STARTING)1006 self.abort_delay_ready_task()1007 def abort_delay_ready_task(self):1008 """Abort the delayed task associated with this job, if any."""1009 if self._delay_ready_task:1010 # Cancel any pending callback that would try to run again1011 # as we are already running.1012 self._delay_ready_task.abort()1013 def __str__(self):...

