How to use schedule_pre_job_tasks method in autotest

Best Python code snippet using autotest_python

scheduler_models.py

Source: scheduler_models.py (GitHub)


...
        status = ', '.join('%d %s' % (count, status) for status, count
                           in status_counts.iteritems())
        subject, body = self._get_status_email_contents(status, summary, None)
        email_manager.manager.send_email(self.job.email_list, subject, body)

    def schedule_pre_job_tasks(self):
        logging.info("%s/%s/%s (job %s, entry %s) scheduled on %s, status=%s",
                     self.job.name, self.meta_host, self.atomic_group_id,
                     self.job.id, self.id, self.host.hostname, self.status)
        self._do_schedule_pre_job_tasks()

    def _do_schedule_pre_job_tasks(self):
        self.job.schedule_pre_job_tasks(queue_entry=self)

    def requeue(self):
        assert self.host
        self.set_status(models.HostQueueEntry.Status.QUEUED)
        self.update_field('started_on', None)
        self.update_field('finished_on', None)
        # verify/cleanup failure sets the execution subdir, so reset it here
        self.set_execution_subdir('')
        if self.meta_host:
            self.set_host(None)

    @property
    def aborted_by(self):
        self._load_abort_info()
        return self._aborted_by

    @property
    def aborted_on(self):
        self._load_abort_info()
        return self._aborted_on

    def _load_abort_info(self):
        """ Fetch info about who aborted the job. """
        if hasattr(self, "_aborted_by"):
            return
        rows = _db.execute("""
                SELECT afe_users.login,
                       afe_aborted_host_queue_entries.aborted_on
                FROM afe_aborted_host_queue_entries
                INNER JOIN afe_users
                ON afe_users.id = afe_aborted_host_queue_entries.aborted_by_id
                WHERE afe_aborted_host_queue_entries.queue_entry_id = %s
                """, (self.id,))
        if rows:
            self._aborted_by, self._aborted_on = rows[0]
        else:
            self._aborted_by = self._aborted_on = None

    def on_pending(self):
        """
        Called when an entry in a synchronous job has passed verify. If the
        job is ready to run, sets the entries to STARTING. Otherwise, it leaves
        them in PENDING.
        """
        self.set_status(models.HostQueueEntry.Status.PENDING)
        if not self.host:
            raise scheduler_lib.NoHostIdError(
                    'Failed to recover a job whose host_queue_entry_id=%r due'
                    ' to no host_id.'
                    % self.id)
        self.host.set_status(models.Host.Status.PENDING)
        # Some debug code here: sends an email if an asynchronous job does not
        # immediately enter Starting.
        # TODO: Remove this once we figure out why asynchronous jobs are getting
        # stuck in Pending.
        self.job.run_if_ready(queue_entry=self)
        if (self.job.synch_count == 1 and
                self.status == models.HostQueueEntry.Status.PENDING):
            subject = 'Job %s (id %s)' % (self.job.name, self.job.id)
            message = 'Asynchronous job stuck in Pending'
            email_manager.manager.enqueue_notify_email(subject, message)

    def abort(self, dispatcher):
        assert self.aborted and not self.complete
        Status = models.HostQueueEntry.Status
        if self.status in {Status.GATHERING, Status.PARSING}:
            # do nothing; post-job tasks will finish and then mark this entry
            # with status "Aborted" and take care of the host
            return
        if self.status in {Status.STARTING, Status.PENDING, Status.RUNNING}:
            # If hqe is in any of these status, it should not have any
            # unfinished agent before it can be aborted.
            agents = dispatcher.get_agents_for_entry(self)
            # Agent with finished task can be left behind. This is added to
            # handle the special case of aborting hostless job in STARTING
            # status, in which the agent has only a HostlessQueueTask
            # associated. The finished HostlessQueueTask will be cleaned up in
            # the next tick, so it's safe to leave the agent there. Without
            # filtering out finished agent, HQE abort won't be able to proceed.
            assert all([agent.is_done() for agent in agents])
            # If hqe is still in STARTING status, it may not have assigned a
            # host yet.
            if self.host:
                self.host.set_status(models.Host.Status.READY)
        elif (self.status == Status.VERIFYING or
              self.status == Status.RESETTING):
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.CLEANUP,
                    host=models.Host.objects.get(id=self.host.id),
                    requested_by=self.job.owner_model())
        elif self.status == Status.PROVISIONING:
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.REPAIR,
                    host=models.Host.objects.get(id=self.host.id),
                    requested_by=self.job.owner_model())
        self.set_status(Status.ABORTED)

    def execution_tag(self):
        SQL_SUSPECT_ENTRIES = ('SELECT * FROM afe_host_queue_entries WHERE '
                               'complete!=1 AND execution_subdir="" AND '
                               'status!="Queued";')
        SQL_FIX_SUSPECT_ENTRY = ('UPDATE afe_host_queue_entries SET '
                                 'status="Aborted" WHERE id=%s;')
        try:
            assert self.execution_subdir
        except AssertionError:
            # TODO(scottz): Remove temporary fix/info gathering pathway for
            # crosbug.com/31595 once issue is root caused.
            logging.error('No execution_subdir for host queue id:%s.', self.id)
            logging.error('====DB DEBUG====\n%s', SQL_SUSPECT_ENTRIES)
            for row in _db.execute(SQL_SUSPECT_ENTRIES):
                logging.error(row)
            logging.error('====DB DEBUG====\n')
            fix_query = SQL_FIX_SUSPECT_ENTRY % self.id
            logging.error('EXECUTING: %s', fix_query)
            _db.execute(SQL_FIX_SUSPECT_ENTRY % self.id)
            raise AssertionError(('self.execution_subdir not found. '
                                  'See log for details.'))
        return "%s/%s" % (self.job.tag(), self.execution_subdir)

    def execution_path(self):
        return self.execution_tag()

    def set_started_on_now(self):
        self.update_field('started_on', datetime.datetime.now())

    def set_finished_on_now(self):
        self.update_field('finished_on', datetime.datetime.now())

    def is_hostless(self):
        return (self.host_id is None
                and self.meta_host is None)


def hqe_trace_id(hqe_id):
    """Constructs the canonical trace id based on the HQE's id.

    Encodes 'HQE' in base16 and concatenates with the hex representation
    of the HQE's id.

    @param hqe_id: The HostQueueEntry's id.

    Returns:
        A trace id (in hex format)
    """
    return base64.b16encode('HQE') + hex(hqe_id)[2:]


class Job(DBObject):
    _table_name = 'afe_jobs'
    _fields = ('id', 'owner', 'name', 'priority', 'control_file',
               'control_type', 'created_on', 'synch_count', 'timeout',
               'run_verify', 'email_list', 'reboot_before', 'reboot_after',
               'parse_failed_repair', 'max_runtime_hrs', 'drone_set_id',
               'parameterized_job_id', 'max_runtime_mins', 'parent_job_id',
               'test_retry', 'run_reset', 'timeout_mins', 'shard_id',
               'require_ssp')

    # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending() on
    # all status='Pending' atomic group HQEs incase a delay was running when the
    # scheduler was restarted and no more hosts ever successfully exit Verify.

    def __init__(self, id=None, row=None, **kwargs):
        assert id or row
        super(Job, self).__init__(id=id, row=row, **kwargs)
        self._owner_model = None  # caches model instance of owner
        self.update_image_path = None  # path of OS image to install

    def model(self):
        return models.Job.objects.get(id=self.id)

    def owner_model(self):
        # work around the fact that the Job owner field is a string, not a
        # foreign key
        if not self._owner_model:
            self._owner_model = models.User.objects.get(login=self.owner)
        return self._owner_model

    def tag(self):
        return "%s-%s" % (self.id, self.owner)

    def get_execution_details(self):
        """
        Get test execution details for this job.

        @return: Dictionary with test execution details
        """
        def _find_test_jobs(rows):
            """
            Here we are looking for tests such as SERVER_JOB and CLIENT_JOB.*
            Those are autotest 'internal job' tests, so they should not be
            counted when evaluating the test stats.

            @param rows: List of rows (matrix) with database results.
            """
            job_test_pattern = re.compile('SERVER|CLIENT\\_JOB\.[\d]')
            n_test_jobs = 0
            for r in rows:
                test_name = r[0]
                if job_test_pattern.match(test_name):
                    n_test_jobs += 1
            return n_test_jobs

        stats = {}
        rows = _db.execute("""
                SELECT t.test, s.word, t.reason
                FROM tko_tests AS t, tko_jobs AS j, tko_status AS s
                WHERE t.job_idx = j.job_idx
                AND s.status_idx = t.status
                AND j.afe_job_id = %s
                ORDER BY t.reason
                """ % self.id)

        failed_rows = [r for r in rows if not r[1] == 'GOOD']
        n_test_jobs = _find_test_jobs(rows)
        n_test_jobs_failed = _find_test_jobs(failed_rows)

        total_executed = len(rows) - n_test_jobs
        total_failed = len(failed_rows) - n_test_jobs_failed

        if total_executed > 0:
            success_rate = 100 - ((total_failed / float(total_executed)) * 100)
        else:
            success_rate = 0

        stats['total_executed'] = total_executed
        stats['total_failed'] = total_failed
        stats['total_passed'] = total_executed - total_failed
        stats['success_rate'] = success_rate

        status_header = ("Test Name", "Status", "Reason")
        if failed_rows:
            stats['failed_rows'] = utils.matrix_to_string(failed_rows,
                                                          status_header)
        else:
            stats['failed_rows'] = ''

        time_row = _db.execute("""
                SELECT started_time, finished_time
                FROM tko_jobs
                WHERE afe_job_id = %s
                """ % self.id)

        if time_row:
            t_begin, t_end = time_row[0]
            try:
                delta = t_end - t_begin
                minutes, seconds = divmod(delta.seconds, 60)
                hours, minutes = divmod(minutes, 60)
                stats['execution_time'] = ("%02d:%02d:%02d" %
                                           (hours, minutes, seconds))
            # One of t_end or t_begin are None
            except TypeError:
                stats['execution_time'] = '(could not determine)'
        else:
            stats['execution_time'] = '(none)'

        return stats

    def keyval_dict(self):
        return self.model().keyval_dict()

    def _pending_count(self):
        """The number of HostQueueEntries for this job in the Pending state."""
        pending_entries = models.HostQueueEntry.objects.filter(
                job=self.id, status=models.HostQueueEntry.Status.PENDING)
        return pending_entries.count()

    def is_ready(self):
        pending_count = self._pending_count()
        ready = (pending_count >= self.synch_count)
        if not ready:
            logging.info(
                    'Job %s not ready: %s pending, %s required ',
                    self, pending_count, self.synch_count)
        return ready

    def num_machines(self, clause = None):
        sql = "job_id=%s" % self.id
        if clause:
            sql += " AND (%s)" % clause
        return self.count(sql, table='afe_host_queue_entries')

    def num_queued(self):
        return self.num_machines('not complete')

    def num_active(self):
        return self.num_machines('active')

    def num_complete(self):
        return self.num_machines('complete')

    def is_finished(self):
        return self.num_complete() == self.num_machines()

    def _not_yet_run_entries(self, include_active=True):
        if include_active:
            statuses = list(models.HostQueueEntry.PRE_JOB_STATUSES)
        else:
            statuses = list(models.HostQueueEntry.IDLE_PRE_JOB_STATUSES)
        return models.HostQueueEntry.objects.filter(job=self.id,
                                                    status__in=statuses)

    def _stop_all_entries(self):
        """Stops the job's inactive pre-job HQEs."""
        entries_to_stop = self._not_yet_run_entries(
                include_active=False)
        for child_entry in entries_to_stop:
            assert not child_entry.complete, (
                '%s status=%s, active=%s, complete=%s' %
                (child_entry.id, child_entry.status, child_entry.active,
                 child_entry.complete))
            if child_entry.status == models.HostQueueEntry.Status.PENDING:
                child_entry.host.status = models.Host.Status.READY
                child_entry.host.save()
            child_entry.status = models.HostQueueEntry.Status.STOPPED
            child_entry.save()

    def stop_if_necessary(self):
        not_yet_run = self._not_yet_run_entries()
        if not_yet_run.count() < self.synch_count:
            self._stop_all_entries()

    def _next_group_name(self):
        """@returns a directory name to use for the next host group results."""
        group_name = ''
        group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
        query = models.HostQueueEntry.objects.filter(
                job=self.id).values('execution_subdir').distinct()
        subdirs = (entry['execution_subdir'] for entry in query)
        group_matches = (group_count_re.match(subdir) for subdir in subdirs)
        ids = [int(match.group(1)) for match in group_matches if match]
        if ids:
            next_id = max(ids) + 1
        else:
            next_id = 0
        return '%sgroup%d' % (group_name, next_id)

    def get_group_entries(self, queue_entry_from_group):
        """
        @param queue_entry_from_group: A HostQueueEntry instance to find other
                group entries on this job for.
        @returns A list of HostQueueEntry objects all executing this job as
                part of the same group as the one supplied (having the same
                execution_subdir).
        """
        execution_subdir = queue_entry_from_group.execution_subdir
        return list(HostQueueEntry.fetch(
            where='job_id=%s AND execution_subdir=%s',
            params=(self.id, execution_subdir)))

    def _should_run_cleanup(self, queue_entry):
        if self.reboot_before == model_attributes.RebootBefore.ALWAYS:
            return True
        elif self.reboot_before == model_attributes.RebootBefore.IF_DIRTY:
            return queue_entry.host.dirty
        return False

    def _should_run_verify(self, queue_entry):
        do_not_verify = (queue_entry.host.protection ==
                         host_protections.Protection.DO_NOT_VERIFY)
        if do_not_verify:
            return False
        # If RebootBefore is set to NEVER, then we won't run reset because
        # we can't cleanup, so we need to weaken a Reset into a Verify.
        weaker_reset = (self.run_reset and
                self.reboot_before == model_attributes.RebootBefore.NEVER)
        return self.run_verify or weaker_reset

    def _should_run_reset(self, queue_entry):
        can_verify = (queue_entry.host.protection !=
                      host_protections.Protection.DO_NOT_VERIFY)
        can_reboot = self.reboot_before != model_attributes.RebootBefore.NEVER
        return (can_reboot and can_verify and (self.run_reset or
                (self._should_run_cleanup(queue_entry) and
                 self._should_run_verify(queue_entry))))

    def _should_run_provision(self, queue_entry):
        """
        Determine if the queue_entry needs to have a provision task run before
        it to provision queue_entry.host.

        @param queue_entry: The host queue entry in question.
        @returns: True if we should schedule a provision task, False otherwise.
        """
        # If we get to this point, it means that the scheduler has already
        # vetted that all the unprovisionable labels match, so we can just
        # find all labels on the job that aren't on the host to get the list
        # of what we need to provision. (See the scheduling logic in
        # host_scheduler.py:is_host_eligable_for_job() where we discard all
        # actionable labels when assigning jobs to hosts.)
        job_labels = {x.name for x in queue_entry.get_labels()}
        # Skip provision if `skip_provision` is listed in the job labels.
        if provision.SKIP_PROVISION in job_labels:
            return False
        _, host_labels = queue_entry.host.platform_and_labels()
        # If there are any labels on the job that are not on the host and they
        # are labels that provisioning knows how to change, then that means
        # there is provisioning work to do. If there's no provisioning work to
        # do, then obviously we have no reason to schedule a provision task!
        diff = job_labels - set(host_labels)
        if any([provision.Provision.acts_on(x) for x in diff]):
            return True
        return False

    def _queue_special_task(self, queue_entry, task):
        """
        Create a special task and associate it with a host queue entry.

        @param queue_entry: The queue entry this special task should be
                associated with.
        @param task: One of the members of the enum models.SpecialTask.Task.
        @returns: None
        """
        models.SpecialTask.objects.create(
                host=models.Host.objects.get(id=queue_entry.host_id),
                queue_entry=queue_entry, task=task)

    def schedule_pre_job_tasks(self, queue_entry):
        """
        Queue all of the special tasks that need to be run before a host
        queue entry may run.

        If no special tasks need to be scheduled, then |on_pending| will be
        called directly.

        @returns None
        """
        task_queued = False
        hqe_model = models.HostQueueEntry.objects.get(id=queue_entry.id)

        if self._should_run_provision(queue_entry):
            self._queue_special_task(hqe_model,
                                     models.SpecialTask.Task.PROVISION)
            task_queued = True
        elif self._should_run_reset(queue_entry):
...
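To put the listing in context: HostQueueEntry.schedule_pre_job_tasks() logs the job/host assignment and delegates to Job.schedule_pre_job_tasks(queue_entry=...), which queues PROVISION/RESET/VERIFY/CLEANUP special tasks as needed (or, per its docstring, lets the entry go straight to on_pending when nothing has to run first). The sketch below shows one way a caller might drive that path. It is illustrative only, not code from the autotest sources: the schedule_new_entries helper is hypothetical, and the initialize() setup call and the exact fetch() filter are assumptions about the surrounding scheduler environment.

    # Illustrative sketch only -- not from the autotest sources. Assumes the
    # scheduler environment is configured (AFE database reachable, Django
    # models importable) and that entry_ids refer to queued entries whose
    # hosts have already been assigned by the host scheduler.
    from autotest_lib.scheduler import scheduler_models


    def schedule_new_entries(entry_ids):
        """Kick off pre-job special tasks for the given HostQueueEntry ids."""
        # Assumed setup step: wire up the scheduler's database connection.
        # In the real scheduler this happens once at startup.
        scheduler_models.initialize()

        entries = scheduler_models.HostQueueEntry.fetch(
                where='id IN (%s)' % ','.join(str(i) for i in entry_ids))
        for entry in entries:
            if entry.is_hostless():
                # Hostless entries have no host to provision, reset or verify.
                continue
            # Logs the scheduling decision, then calls
            # Job.schedule_pre_job_tasks(queue_entry=entry), which queues the
            # pre-job SpecialTasks or lets the entry proceed to on_pending().
            entry.schedule_pre_job_tasks()

In the scheduler itself this call is made per tick for newly assigned entries; the helper above just makes that flow explicit for readers following the snippet.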


Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.


YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

