Best Python code snippet using autotest_python
scheduler_models.py
Source:scheduler_models.py  
...502        for pidfile_name in drone_manager.ALL_PIDFILE_NAMES:503            pidfile_id = _drone_manager.get_pidfile_id_from(504                self.execution_path(), pidfile_name=pidfile_name)505            _drone_manager.unregister_pidfile(pidfile_id)506    def _get_status_email_contents(self, status, summary=None, hostname=None):507        """508        Gather info for the status notification e-mails.509        If needed, we could start using the Django templating engine to create510        the subject and the e-mail body, but that doesn't seem necessary right511        now.512        :param status: Job status text. Mandatory.513        :param summary: Job summary text. Optional.514        :param hostname: A hostname for the job. Optional.515        :return: Tuple (subject, body) for the notification e-mail.516        """517        job_stats = Job(id=self.job.id).get_execution_details()518        subject = 'Autotest #%s' % self.job.id519        if hostname is not None:520            subject += ' | %s on %s' % (self.job.name, hostname)521        else:522            subject += ' | %s' % self.job.name523        status = status.split()[-1]524        if status == "Completed":525            subject += ' | success: %.2f%%' % job_stats['success_rate']526        else:527            subject += ' | %s' % status528        body = ""529        if int(job_stats['total_executed']) > 0:530            body += ("run: %s | pass: %s | skip: %s | fail: %s" %531                     (job_stats['total_executed'], job_stats['total_passed'],532                      job_stats['total_skipped'], job_stats['total_failed']))533            e_time = job_stats['execution_time']534            if e_time not in ['(could not determine)', '(none)']:535                body += " | runtime: %s" % e_time536            if status == "Completed":537                body += " | success: %.2f%%" % job_stats['success_rate']538            else:539                body += "\n[Warning] Job status: %s" % status540            body += "\n%s\n\n" % self._view_job_url()541        body += job_stats['fail_detail']542        body += job_stats['warn_detail']543        body += job_stats['skip_detail']544        body += job_stats['pass_detail']545        keyval_list = job_stats['keyval_dict_list']546        if keyval_list:547            for kv in keyval_list:548                k, v = kv.items()[0]549                body += "%s:\n" % k550                for part in v.split():551                    body += "  %s\n" % part552                body += "\n"553        if hostname is not None:554            body += "Job was run on host %s\n" % hostname555        body += ("For more details, check the full report on the web:\n%s\n" %556                 self._view_job_url())557        return subject, body558    def _email_on_status(self, status):559        hostname = self._get_hostname()560        subject, body = self._get_status_email_contents(status, None, hostname)561        mail.manager.send(self.job.email_list, subject, body)562    def _email_admin_on_status(self, status):563        hostname = self._get_hostname()564        subject, body = self._get_status_email_contents(status, None, hostname)565        mail.manager.send(_grid_admin_email, subject, body)566    def _email_on_job_complete(self, email_admin=False):567        if not self.job.is_finished():568            return569        summary = []570        hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)571        for queue_entry in hosts_queue:572            summary.append("Host: %s Status: %s" %573                           (queue_entry._get_hostname(),574                            queue_entry.status))575        summary = "\n".join(summary)576        status_counts = models.Job.objects.get_status_counts(577            [self.job.id])[self.job.id]578        status = ', '.join('%d %s' % (count, status) for status, count579                           in status_counts.iteritems())580        subject, body = self._get_status_email_contents(status, summary, None)581        if email_admin:582            mail.manager.send(_grid_admin_email, subject, body)583        else:584            mail.manager.send(self.job.email_list, subject, body)585    def schedule_pre_job_tasks(self):586        logging.info("%s/%s/%s (job %s, entry %s) scheduled on %s, status=%s",587                     self.job.name, self.meta_host, self.atomic_group_id,588                     self.job.id, self.id, self.host.hostname, self.status)589        # just before doing any actual work reserve the host if requested590        if self.job.reserve_hosts:591            logging.info("Job %s, reserving %s for %s",592                         self.job.id, self.host.hostname, self.job.owner)593            reservations.create([self.host.hostname], self.job.owner)594        self._do_schedule_pre_job_tasks()...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
