Best Python code snippet using autotest_python
postjob_task.py
Source:postjob_task.py  
...64        # the task.  post-job tasks continue when the job is aborted.65        pass66    def _pidfile_label(self):67        # '.autoserv_execute' -> 'autoserv'68        return self._pidfile_name()[1:-len('_execute')]69class SelfThrottledPostJobTask(PostJobTask):70    """71    PostJobTask that maintains its own process limit.72    We throttle tasks like parsing because we don't want them to73    hold up tests. At the same time we don't wish to build up load74    that will take forever to parse.75    """76    _num_running_processes = 077    # Last known limit of max processes, used to check whether78    # max processes config has been changed.79    _last_known_max_processes = 080    # Whether an email should be sent to notifiy process limit being hit.81    _notification_on = True82    # Once process limit is hit, an email will be sent.83    # To prevent spams, do not send another email until84    # it drops to lower than the following level.85    REVIVE_NOTIFICATION_THRESHOLD = 0.8086    @classmethod87    def _increment_running_processes(cls):88        cls._num_running_processes += 189        autotest_stats.Gauge('scheduler').send(90                '%s.num_running_processes' % cls.__name__,91                cls._num_running_processes)92    @classmethod93    def _decrement_running_processes(cls):94        cls._num_running_processes -= 195        autotest_stats.Gauge('scheduler').send(96                '%s.num_running_processes' % cls.__name__,97                cls._num_running_processes)98    @classmethod99    def _max_processes(cls):100        raise NotImplementedError101    @classmethod102    def _can_run_new_process(cls):103        return cls._num_running_processes < cls._max_processes()104    def _process_started(self):105        return bool(self.monitor)106    def tick(self):107        # override tick to keep trying to start until the process count goes108        # down and we can, at which point we revert to default behavior109        if 
self._process_started():110            super(SelfThrottledPostJobTask, self).tick()111        else:112            self._try_starting_process()113    def run(self):114        # override run() to not actually run unless we can115        self._try_starting_process()116    @classmethod117    def _notify_process_limit_hit(cls):118        """Send an email to notify that process limit is hit."""119        if cls._notification_on:120            subject = '%s: hitting max process limit.' % cls.__name__121            message = ('Running processes/Max processes: %d/%d'122                       % (cls._num_running_processes, cls._max_processes()))123            email_manager.manager.enqueue_notify_email(subject, message)124            cls._notification_on = False125    @classmethod126    def _reset_notification_switch_if_necessary(cls):127        """Reset _notification_on if necessary.128        Set _notification_on to True on the following cases:129        1) If the limit of max processes configuration changes;130        2) If _notification_on is False and the number of running processes131           drops to lower than a level defined in REVIVE_NOTIFICATION_THRESHOLD.132        """133        if cls._last_known_max_processes != cls._max_processes():134            cls._notification_on = True135            cls._last_known_max_processes = cls._max_processes()136            return137        percentage = float(cls._num_running_processes) / cls._max_processes()138        if (not cls._notification_on and139            percentage < cls.REVIVE_NOTIFICATION_THRESHOLD):140            cls._notification_on = True141    def _try_starting_process(self):142        self._reset_notification_switch_if_necessary()143        if not self._can_run_new_process():144            self._notify_process_limit_hit()145            return146        # actually run the command147        super(SelfThrottledPostJobTask, self).run()148        if self._process_started():149            
self._increment_running_processes()150    def finished(self, success):151        super(SelfThrottledPostJobTask, self).finished(success)152        if self._process_started():153            self._decrement_running_processes()154class GatherLogsTask(PostJobTask):155    """156    Task responsible for157    * gathering uncollected logs (if Autoserv crashed hard or was killed)158    * copying logs to the results repository159    * spawning CleanupTasks for hosts, if necessary160    * spawning a FinalReparseTask for the job161    * setting the final status of the host, directly or through a cleanup162    """163    def __init__(self, queue_entries, recover_run_monitor=None):164        self._job = queue_entries[0].job165        super(GatherLogsTask, self).__init__(166            queue_entries, log_file_name='.collect_crashinfo.log')167        self._set_ids(queue_entries=queue_entries)168    # TODO: Refactor into autoserv_utils. crbug.com/243090169    def _generate_command(self, results_dir):170        host_list = ','.join(queue_entry.host.hostname171                             for queue_entry in self.queue_entries)172        return [autoserv_utils.autoserv_path , '-p',173                '--pidfile-label=%s' % self._pidfile_label(),174                '--use-existing-results', '--collect-crashinfo',175                '-m', host_list, '-r', results_dir]176    @property177    def num_processes(self):178        return len(self.queue_entries)179    def _pidfile_name(self):180        return drone_manager.CRASHINFO_PID_FILE181    def prolog(self):182        self._check_queue_entry_statuses(183                self.queue_entries,184                allowed_hqe_statuses=(models.HostQueueEntry.Status.GATHERING,),185                allowed_host_statuses=(models.Host.Status.RUNNING,))186        super(GatherLogsTask, self).prolog()187    def epilog(self):188        super(GatherLogsTask, self).epilog()189        self._parse_results(self.queue_entries)190        self._reboot_hosts()191    
def _reboot_hosts(self):192        if self._autoserv_monitor.has_process():193            final_success = (self._final_status() ==194                             models.HostQueueEntry.Status.COMPLETED)195            num_tests_failed = self._autoserv_monitor.num_tests_failed()196        else:197            final_success = False198            num_tests_failed = 0199        reboot_after = self._job.reboot_after200        do_reboot = (201                # always reboot after aborted jobs202                self._final_status() == models.HostQueueEntry.Status.ABORTED203                or reboot_after == model_attributes.RebootAfter.ALWAYS204                or (reboot_after == model_attributes.RebootAfter.IF_ALL_TESTS_PASSED205                    and final_success and num_tests_failed == 0)206                or num_tests_failed > 0)207        for queue_entry in self.queue_entries:208            if do_reboot:209                # don't pass the queue entry to the CleanupTask. if the cleanup210                # fails, the job doesn't care -- it's over.211                models.SpecialTask.objects.create(212                        host=models.Host.objects.get(id=queue_entry.host.id),213                        task=models.SpecialTask.Task.CLEANUP,214                        requested_by=self._job.owner_model())215            else:216                queue_entry.host.set_status(models.Host.Status.READY)217    def run(self):218        autoserv_exit_code = self._autoserv_monitor.exit_code()219        # only run if Autoserv exited due to some signal. 
if we have no exit220        # code, assume something bad (and signal-like) happened.221        if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):222            super(GatherLogsTask, self).run()223        else:224            self.finished(True)225class FinalReparseTask(SelfThrottledPostJobTask):226    def __init__(self, queue_entries):227        super(FinalReparseTask, self).__init__(queue_entries,228                                               log_file_name='.parse.log')229        # don't use _set_ids, since we don't want to set the host_ids230        self.queue_entry_ids = [entry.id for entry in queue_entries]231    def _generate_command(self, results_dir):232        return [_parser_path, '--write-pidfile', '--record-duration',233                '-l', '2', '-r', '-o', results_dir]234    @property235    def num_processes(self):236        return 0 # don't include parser processes in accounting237    def _pidfile_name(self):238        return drone_manager.PARSER_PID_FILE239    @classmethod240    def _max_processes(cls):241        return scheduler_config.config.max_parse_processes242    def prolog(self):243        self._check_queue_entry_statuses(244                self.queue_entries,245                allowed_hqe_statuses=(models.HostQueueEntry.Status.PARSING,))246        super(FinalReparseTask, self).prolog()247    def epilog(self):248        super(FinalReparseTask, self).epilog()249        self._archive_results(self.queue_entries)250class ArchiveResultsTask(SelfThrottledPostJobTask):251    _ARCHIVING_FAILED_FILE = '.archiver_failed'252    def __init__(self, queue_entries):253        super(ArchiveResultsTask, self).__init__(queue_entries,254                                                 log_file_name='.archiving.log')255        # don't use _set_ids, since we don't want to set the host_ids256        self.queue_entry_ids = [entry.id for entry in queue_entries]257    def _pidfile_name(self):258        return drone_manager.ARCHIVER_PID_FILE259    
# TODO: Refactor into autoserv_utils. crbug.com/243090260    def _generate_command(self, results_dir):261        return [autoserv_utils.autoserv_path , '-p',262                '--pidfile-label=%s' % self._pidfile_label(), '-r', results_dir,263                '--use-existing-results', '--control-filename=control.archive',264                os.path.join(drones.AUTOTEST_INSTALL_DIR, 'scheduler',265                             'archive_results.control.srv')]266    @classmethod267    def _max_processes(cls):268        return scheduler_config.config.max_transfer_processes269    def prolog(self):270        self._check_queue_entry_statuses(271                self.queue_entries,...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
