Best Python code snippet using autotest_python
scheduler_models.py
Source:scheduler_models.py  
...588                    task=models.SpecialTask.Task.CLEANUP,589                    host=models.Host.objects.get(id=self.host.id),590                    requested_by=self.job.owner_model())591        self.set_status(Status.ABORTED)592        self.job.abort_delay_ready_task()593    def get_group_name(self):594        atomic_group = self.atomic_group595        if not atomic_group:596            return ''597        # Look at any meta_host and dependency labels and pick the first598        # one that also specifies this atomic group.  Use that label name599        # as the group name if possible (it is more specific).600        for label in self.get_labels():601            if label.atomic_group_id:602                assert label.atomic_group_id == atomic_group.id603                return label.name604        return atomic_group.name605    def execution_tag(self):606        assert self.execution_subdir607        return "%s/%s" % (self.job.tag(), self.execution_subdir)608    def execution_path(self):609        return self.execution_tag()610    def set_started_on_now(self):611        self.update_field('started_on', datetime.datetime.now())612    def is_hostless(self):613        return (self.host_id is None614                and self.meta_host is None615                and self.atomic_group_id is None)616class Job(DBObject):617    _table_name = 'afe_jobs'618    _fields = ('id', 'owner', 'name', 'priority', 'control_file',619               'control_type', 'created_on', 'synch_count', 'timeout',620               'run_verify', 'email_list', 'reboot_before', 'reboot_after',621               'parse_failed_repair', 'max_runtime_hrs', 'drone_set_id',622               'parameterized_job_id')623    # This does not need to be a column in the DB.  The delays are likely to624    # be configured short.  
If the scheduler is stopped and restarted in625    # the middle of a job's delay cycle, the delay cycle will either be626    # repeated or skipped depending on the number of Pending machines found627    # when the restarted scheduler recovers to track it.  Not a problem.628    #629    # A reference to the DelayedCallTask that will wake up the job should630    # no other HQEs change state in time.  Its end_time attribute is used631    # by our run_with_ready_delay() method to determine if the wait is over.632    _delay_ready_task = None633    # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending() on634    # all status='Pending' atomic group HQEs incase a delay was running when the635    # scheduler was restarted and no more hosts ever successfully exit Verify.636    def __init__(self, id=None, row=None, **kwargs):637        assert id or row638        super(Job, self).__init__(id=id, row=row, **kwargs)639        self._owner_model = None # caches model instance of owner640    def model(self):641        return models.Job.objects.get(id=self.id)642    def owner_model(self):643        # work around the fact that the Job owner field is a string, not a644        # foreign key645        if not self._owner_model:646            self._owner_model = models.User.objects.get(login=self.owner)647        return self._owner_model648    def is_server_job(self):649        return self.control_type != 2650    def tag(self):651        return "%s-%s" % (self.id, self.owner)652    def get_host_queue_entries(self):653        rows = _db.execute("""654                SELECT * FROM afe_host_queue_entries655                WHERE job_id= %s656        """, (self.id,))657        entries = [HostQueueEntry(row=i) for i in rows]658        assert len(entries)>0659        return entries660    def get_execution_details(self):661        """662        Get test execution details for this job.663        @return: Dictionary with test execution details664        """665        def 
_find_test_jobs(rows):666            """667            Here we are looking for tests such as SERVER_JOB and CLIENT_JOB.*668            Those are autotest 'internal job' tests, so they should not be669            counted when evaluating the test stats.670            @param rows: List of rows (matrix) with database results.671            """672            job_test_pattern = re.compile('SERVER|CLIENT\\_JOB\.[\d]')673            n_test_jobs = 0674            for r in rows:675                test_name = r[0]676                if job_test_pattern.match(test_name):677                    n_test_jobs += 1678            return n_test_jobs679        stats = {}680        rows = _db.execute("""681                SELECT t.test, s.word, t.reason682                FROM tko_tests AS t, tko_jobs AS j, tko_status AS s683                WHERE t.job_idx = j.job_idx684                AND s.status_idx = t.status685                AND j.afe_job_id = %s686                ORDER BY t.reason687                """ % self.id)688        failed_rows = [r for r in rows if not 'GOOD' in r]689        n_test_jobs = _find_test_jobs(rows)690        n_test_jobs_failed = _find_test_jobs(failed_rows)691        total_executed = len(rows) - n_test_jobs692        total_failed = len(failed_rows) - n_test_jobs_failed693        if total_executed > 0:694            success_rate = 100 - ((total_failed / float(total_executed)) * 100)695        else:696            success_rate = 0697        stats['total_executed'] = total_executed698        stats['total_failed'] = total_failed699        stats['total_passed'] = total_executed - total_failed700        stats['success_rate'] = success_rate701        status_header = ("Test Name", "Status", "Reason")702        if failed_rows:703            stats['failed_rows'] = utils.matrix_to_string(failed_rows,704                                                          status_header)705        else:706            stats['failed_rows'] = ''707        time_row = _db.execute("""708       
            SELECT started_time, finished_time709                   FROM tko_jobs710                   WHERE afe_job_id = %s711                   """ % self.id)712        if time_row:713            t_begin, t_end = time_row[0]714            try:715                delta = t_end - t_begin716                minutes, seconds = divmod(delta.seconds, 60)717                hours, minutes = divmod(minutes, 60)718                stats['execution_time'] = ("%02d:%02d:%02d" %719                                           (hours, minutes, seconds))720            # One of t_end or t_begin are None721            except TypeError:722                stats['execution_time'] = '(could not determine)'723        else:724            stats['execution_time'] = '(none)'725        return stats726    def set_status(self, status, update_queues=False):727        self.update_field('status',status)728        if update_queues:729            for queue_entry in self.get_host_queue_entries():730                queue_entry.set_status(status)731    def keyval_dict(self):732        return self.model().keyval_dict()733    def _atomic_and_has_started(self):734        """735        @returns True if any of the HostQueueEntries associated with this job736        have entered the Status.STARTING state or beyond.737        """738        atomic_entries = models.HostQueueEntry.objects.filter(739                job=self.id, atomic_group__isnull=False)740        if atomic_entries.count() <= 0:741            return False742        # These states may *only* be reached if Job.run() has been called.743        started_statuses = (models.HostQueueEntry.Status.STARTING,744                            models.HostQueueEntry.Status.RUNNING,745                            models.HostQueueEntry.Status.COMPLETED)746        started_entries = atomic_entries.filter(status__in=started_statuses)747        return started_entries.count() > 0748    def _hosts_assigned_count(self):749        """The number of HostQueueEntries assigned a 
Host for this job."""750        entries = models.HostQueueEntry.objects.filter(job=self.id,751                                                       host__isnull=False)752        return entries.count()753    def _pending_count(self):754        """The number of HostQueueEntries for this job in the Pending state."""755        pending_entries = models.HostQueueEntry.objects.filter(756                job=self.id, status=models.HostQueueEntry.Status.PENDING)757        return pending_entries.count()758    def _max_hosts_needed_to_run(self, atomic_group):759        """760        @param atomic_group: The AtomicGroup associated with this job that we761                are using to set an upper bound on the threshold.762        @returns The maximum number of HostQueueEntries assigned a Host before763                this job can run.764        """765        return min(self._hosts_assigned_count(),766                   atomic_group.max_number_of_machines)767    def _min_hosts_needed_to_run(self):768        """Return the minumum number of hsots needed to run this job."""769        return self.synch_count770    def is_ready(self):771        # NOTE: Atomic group jobs stop reporting ready after they have been772        # started to avoid launching multiple copies of one atomic job.773        # Only possible if synch_count is less than than half the number of774        # machines in the atomic group.775        pending_count = self._pending_count()776        atomic_and_has_started = self._atomic_and_has_started()777        ready = (pending_count >= self.synch_count778                 and not atomic_and_has_started)779        if not ready:780            logging.info(781                    'Job %s not ready: %s pending, %s required '782                    '(Atomic and started: %s)',783                    self, pending_count, self.synch_count,784                    atomic_and_has_started)785        return ready786    def num_machines(self, clause = None):787        sql = "job_id=%s" % 
self.id788        if clause:789            sql += " AND (%s)" % clause790        return self.count(sql, table='afe_host_queue_entries')791    def num_queued(self):792        return self.num_machines('not complete')793    def num_active(self):794        return self.num_machines('active')795    def num_complete(self):796        return self.num_machines('complete')797    def is_finished(self):798        return self.num_complete() == self.num_machines()799    def _not_yet_run_entries(self, include_verifying=True):800        statuses = [models.HostQueueEntry.Status.QUEUED,801                    models.HostQueueEntry.Status.PENDING]802        if include_verifying:803            statuses.append(models.HostQueueEntry.Status.VERIFYING)804        return models.HostQueueEntry.objects.filter(job=self.id,805                                                    status__in=statuses)806    def _stop_all_entries(self):807        entries_to_stop = self._not_yet_run_entries(808            include_verifying=False)809        for child_entry in entries_to_stop:810            assert not child_entry.complete, (811                '%s status=%s, active=%s, complete=%s' %812                (child_entry.id, child_entry.status, child_entry.active,813                 child_entry.complete))814            if child_entry.status == models.HostQueueEntry.Status.PENDING:815                child_entry.host.status = models.Host.Status.READY816                child_entry.host.save()817            child_entry.status = models.HostQueueEntry.Status.STOPPED818            child_entry.save()819    def stop_if_necessary(self):820        not_yet_run = self._not_yet_run_entries()821        if not_yet_run.count() < self.synch_count:822            self._stop_all_entries()823    def write_to_machines_file(self, queue_entry):824        hostname = queue_entry.host.hostname825        file_path = os.path.join(self.tag(), '.machines')826        _drone_manager.write_lines_to_file(file_path, [hostname])827    def 
_next_group_name(self, group_name=''):828        """@returns a directory name to use for the next host group results."""829        if group_name:830            # Sanitize for use as a pathname.831            group_name = group_name.replace(os.path.sep, '_')832            if group_name.startswith('.'):833                group_name = '_' + group_name[1:]834            # Add a separator between the group name and 'group%d'.835            group_name += '.'836        group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))837        query = models.HostQueueEntry.objects.filter(838            job=self.id).values('execution_subdir').distinct()839        subdirs = (entry['execution_subdir'] for entry in query)840        group_matches = (group_count_re.match(subdir) for subdir in subdirs)841        ids = [int(match.group(1)) for match in group_matches if match]842        if ids:843            next_id = max(ids) + 1844        else:845            next_id = 0846        return '%sgroup%d' % (group_name, next_id)847    def get_group_entries(self, queue_entry_from_group):848        """849        @param queue_entry_from_group: A HostQueueEntry instance to find other850                group entries on this job for.851        @returns A list of HostQueueEntry objects all executing this job as852                part of the same group as the one supplied (having the same853                execution_subdir).854        """855        execution_subdir = queue_entry_from_group.execution_subdir856        return list(HostQueueEntry.fetch(857            where='job_id=%s AND execution_subdir=%s',858            params=(self.id, execution_subdir)))859    def _should_run_cleanup(self, queue_entry):860        if self.reboot_before == model_attributes.RebootBefore.ALWAYS:861            return True862        elif self.reboot_before == model_attributes.RebootBefore.IF_DIRTY:863            return queue_entry.host.dirty864        return False865    def _should_run_verify(self, 
queue_entry):866        do_not_verify = (queue_entry.host.protection ==867                         host_protections.Protection.DO_NOT_VERIFY)868        if do_not_verify:869            return False870        return self.run_verify871    def schedule_pre_job_tasks(self, queue_entry):872        """873        Get a list of tasks to perform before the host_queue_entry874        may be used to run this Job (such as Cleanup & Verify).875        @returns A list of tasks to be done to the given queue_entry before876                it should be considered be ready to run this job.  The last877                task in the list calls HostQueueEntry.on_pending(), which878                continues the flow of the job.879        """880        if self._should_run_cleanup(queue_entry):881            task = models.SpecialTask.Task.CLEANUP882        elif self._should_run_verify(queue_entry):883            task = models.SpecialTask.Task.VERIFY884        else:885            queue_entry.on_pending()886            return887        queue_entry = models.HostQueueEntry.objects.get(id=queue_entry.id)888        models.SpecialTask.objects.create(889                host=models.Host.objects.get(id=queue_entry.host_id),890                queue_entry=queue_entry, task=task)891    def _assign_new_group(self, queue_entries, group_name=''):892        if len(queue_entries) == 1:893            group_subdir_name = queue_entries[0].host.hostname894        else:895            group_subdir_name = self._next_group_name(group_name)896            logging.info('Running synchronous job %d hosts %s as %s',897                self.id, [entry.host.hostname for entry in queue_entries],898                group_subdir_name)899        for queue_entry in queue_entries:900            queue_entry.set_execution_subdir(group_subdir_name)901    def _choose_group_to_run(self, include_queue_entry):902        """903        @returns A tuple containing a list of HostQueueEntry instances to be904                used to run this 
Job, a string group name to suggest giving905                to this job in the results database.906        """907        atomic_group = include_queue_entry.atomic_group908        chosen_entries = [include_queue_entry]909        if atomic_group:910            num_entries_wanted = atomic_group.max_number_of_machines911        else:912            num_entries_wanted = self.synch_count913        num_entries_wanted -= len(chosen_entries)914        if num_entries_wanted > 0:915            where_clause = 'job_id = %s AND status = "Pending" AND id != %s'916            pending_entries = list(HostQueueEntry.fetch(917                     where=where_clause,918                     params=(self.id, include_queue_entry.id)))919            # Sort the chosen hosts by hostname before slicing.920            def cmp_queue_entries_by_hostname(entry_a, entry_b):921                return Host.cmp_for_sort(entry_a.host, entry_b.host)922            pending_entries.sort(cmp=cmp_queue_entries_by_hostname)923            chosen_entries += pending_entries[:num_entries_wanted]924        # Sanity check.  
We'll only ever be called if this can be met.925        if len(chosen_entries) < self.synch_count:926            message = ('job %s got less than %s chosen entries: %s' % (927                    self.id, self.synch_count, chosen_entries))928            logging.error(message)929            email_manager.manager.enqueue_notify_email(930                    'Job not started, too few chosen entries', message)931            return []932        group_name = include_queue_entry.get_group_name()933        self._assign_new_group(chosen_entries, group_name=group_name)934        return chosen_entries935    def run_if_ready(self, queue_entry):936        """937        Run this job by kicking its HQEs into status='Starting' if enough938        hosts are ready for it to run.939        Cleans up by kicking HQEs into status='Stopped' if this Job is not940        ready to run.941        """942        if not self.is_ready():943            self.stop_if_necessary()944        elif queue_entry.atomic_group:945            self.run_with_ready_delay(queue_entry)946        else:947            self.run(queue_entry)948    def run_with_ready_delay(self, queue_entry):949        """950        Start a delay to wait for more hosts to enter Pending state before951        launching an atomic group job.  
Once set, the a delay cannot be reset.952        @param queue_entry: The HostQueueEntry object to get atomic group953                info from and pass to run_if_ready when the delay is up.954        @returns An Agent to run the job as appropriate or None if a delay955                has already been set.956        """957        assert queue_entry.job_id == self.id958        assert queue_entry.atomic_group959        delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts960        over_max_threshold = (self._pending_count() >=961                self._max_hosts_needed_to_run(queue_entry.atomic_group))962        delay_expired = (self._delay_ready_task and963                         time.time() >= self._delay_ready_task.end_time)964        # Delay is disabled or we already have enough?  Do not wait to run.965        if not delay or over_max_threshold or delay_expired:966            self.run(queue_entry)967        else:968            queue_entry.set_status(models.HostQueueEntry.Status.WAITING)969    def request_abort(self):970        """Request that this Job be aborted on the next scheduler cycle."""971        self.model().abort()972    def schedule_delayed_callback_task(self, queue_entry):973        queue_entry.set_status(models.HostQueueEntry.Status.PENDING)974        if self._delay_ready_task:975            return None976        delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts977        def run_job_after_delay():978            logging.info('Job %s done waiting for extra hosts.', self)979            # Check to see if the job is still relevant.  It could have aborted980            # while we were waiting or hosts could have disappearred, etc.981            if self._pending_count() < self._min_hosts_needed_to_run():982                logging.info('Job %s had too few Pending hosts after waiting '983                             'for extras.  
Not running.', self)984                self.request_abort()985                return986            return self.run(queue_entry)987        logging.info('Job %s waiting up to %s seconds for more hosts.',988                     self.id, delay)989        self._delay_ready_task = DelayedCallTask(delay_seconds=delay,990                                                 callback=run_job_after_delay)991        return self._delay_ready_task992    def run(self, queue_entry):993        """994        @param queue_entry: The HostQueueEntry instance calling this method.995        """996        if queue_entry.atomic_group and self._atomic_and_has_started():997            logging.error('Job.run() called on running atomic Job %d '998                          'with HQE %s.', self.id, queue_entry)999            return1000        queue_entries = self._choose_group_to_run(queue_entry)1001        if queue_entries:1002            self._finish_run(queue_entries)1003    def _finish_run(self, queue_entries):1004        for queue_entry in queue_entries:1005            queue_entry.set_status(models.HostQueueEntry.Status.STARTING)1006        self.abort_delay_ready_task()1007    def abort_delay_ready_task(self):1008        """Abort the delayed task associated with this job, if any."""1009        if self._delay_ready_task:1010            # Cancel any pending callback that would try to run again1011            # as we are already running.1012            self._delay_ready_task.abort()1013    def __str__(self):...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
