How to use _email_on_job_complete method in autotest

...515 # This should be fine, because nothing critical checks finished_on,516 # and the scheduler should never be killed mid-tick.517 if complete:518 self._on_complete(status)519 self._email_on_job_complete()520 self.update_field('complete', complete)521 should_email_status = (status.lower() in _notify_email_statuses or522 'all' in _notify_email_statuses)523 if should_email_status:524 self._email_on_status(status)525 logging.debug('HQE Set Status Complete')526 def _on_complete(self, status):527 metric_fields = {'status': status.lower()}528 if metric_fields['board'] = or ''530 if len( == 1:531 metric_fields['pool'] =[0]532 else:533 metric_fields['pool'] = 'MULTIPLE'534 else:535 metric_fields['board'] = 'NO_HOST'536 metric_fields['pool'] = 'NO_HOST'537 self._COMPLETION_COUNT_METRIC.increment(fields=metric_fields)538 if status is not models.HostQueueEntry.Status.ABORTED:539 self.job.stop_if_necessary()540 if self.started_on:541 self.set_finished_on_now()542 self._log_trace()543 if self.job.shard_id is not None:544 # If shard_id is None, the job will be synced back to the master545 self.job.update_field('shard_id', None)546 if not self.execution_subdir:547 return548 # unregister any possible pidfiles associated with this queue entry549 for pidfile_name in drone_manager.ALL_PIDFILE_NAMES:550 pidfile_id = _drone_manager.get_pidfile_id_from(551 self.execution_path(), pidfile_name=pidfile_name)552 _drone_manager.unregister_pidfile(pidfile_id)553 def _log_trace(self):554 """Emits a Cloud Trace span for the HQE's duration."""555 if self.started_on and self.finished_on:556 span = cloud_trace.Span('HQE', spanId='0',557 traceId=hqe_trace_id( # TODO(phobbs) make a .SetStart() and .SetEnd() helper method559 span.startTime = types.Timestamp()560 span.startTime.FromDatetime(self.started_on)561 span.endTime = types.Timestamp()562 span.endTime.FromDatetime(self.finished_on)563 # TODO(phobbs) any LogSpan calls need to be wrapped in this for564 # safety during tests, so this should be caught within LogSpan.565 try:566 cloud_trace.LogSpan(span)567 except IOError as e:568 if e.errno == errno.ENOENT:569 logging.warning('Error writing to cloud trace results '570 'directory: %s', e)571 def _get_status_email_contents(self, status, summary=None, hostname=None):572 """573 Gather info for the status notification e-mails.574 If needed, we could start using the Django templating engine to create575 the subject and the e-mail body, but that doesn't seem necessary right576 now.577 @param status: Job status text. Mandatory.578 @param summary: Job summary text. Optional.579 @param hostname: A hostname for the job. Optional.580 @return: Tuple (subject, body) for the notification e-mail.581 """582 job_stats = Job( subject = ('Autotest | Job ID: %s "%s" | Status: %s ' %584 (,, status))585 if hostname is not None:586 subject += '| Hostname: %s ' % hostname587 if status not in ["1 Failed", "Failed"]:588 subject += '| Success Rate: %.2f %%' % job_stats['success_rate']589 body = "Job ID: %s\n" % self.job.id590 body += "Job name: %s\n" % self.job.name591 if hostname is not None:592 body += "Host: %s\n" % hostname593 if summary is not None:594 body += "Summary: %s\n" % summary595 body += "Status: %s\n" % status596 body += "Results interface URL: %s\n" % self._view_job_url()597 body += "Execution time (HH:MM:SS): %s\n" % job_stats['execution_time']598 if int(job_stats['total_executed']) > 0:599 body += "User tests executed: %s\n" % job_stats['total_executed']600 body += "User tests passed: %s\n" % job_stats['total_passed']601 body += "User tests failed: %s\n" % job_stats['total_failed']602 body += ("User tests success rate: %.2f %%\n" %603 job_stats['success_rate'])604 if job_stats['failed_rows']:605 body += "Failures:\n"606 body += job_stats['failed_rows']607 return subject, body608 def _email_on_status(self, status):609 hostname = self._get_hostname()610 subject, body = self._get_status_email_contents(status, None, hostname)611 email_manager.manager.send_email(self.job.email_list, subject, body)612 def _email_on_job_complete(self):613 if not self.job.is_finished():614 return615 summary = []616 hosts_queue = HostQueueEntry.fetch('job_id = %s' % for queue_entry in hosts_queue:618 summary.append("Host: %s Status: %s" %619 (queue_entry._get_hostname(),620 queue_entry.status))621 summary = "\n".join(summary)622 status_counts = models.Job.objects.get_status_counts(623 [])[]624 status = ', '.join('%d %s' % (count, status) for status, count625 in status_counts.iteritems())626 subject, body = self._get_status_email_contents(status, summary, None)...

