Best Python code snippet using autotest_python
monitor_db.py
Source:monitor_db.py  
...
        agent.dispatcher = self
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)

    def get_agents_for_entry(self, queue_entry):
        """
        Find agents corresponding to the specified queue_entry.
        """
        return list(self._queue_entry_agents.get(queue_entry.id, set()))

    def host_has_agent(self, host):
        """
        Determine if there is currently an Agent present using this host.
        """
        return bool(self._host_agents.get(host.id, None))

    def remove_agent(self, agent):
        self._agents.remove(agent)
        self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
                                       agent)
        self._unregister_agent_for_ids(self._queue_entry_agents,
                                       agent.queue_entry_ids, agent)

    def _host_has_scheduled_special_task(self, host):
        return bool(models.SpecialTask.objects.filter(host__id=host.id,
                                                      is_active=False,
                                                      is_complete=False))

    def _recover_processes(self):
        agent_tasks = self._create_recovery_agent_tasks()
        self._register_pidfiles(agent_tasks)
        _drone_manager.refresh()
        self._recover_tasks(agent_tasks)
        self._recover_pending_entries()
        self._check_for_unrecovered_verifying_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()

    def _create_recovery_agent_tasks(self):
        return (self._get_queue_entry_agent_tasks()
                + self._get_special_task_agent_tasks(is_active=True))

    def _get_queue_entry_agent_tasks(self):
        """
        Get agent tasks for all hqe in the specified states.
        Loosely this translates to taking a hqe in one of the specified states,
        say parsing, and getting an AgentTask for it, like the FinalReparseTask,
        through _get_agent_task_for_queue_entry. Each queue entry can only have
        one agent task at a time, but there might be multiple queue entries in
        the group.
        @return: A list of AgentTasks.
        """
        # host queue entry statuses handled directly by AgentTasks
        # (Verifying is handled through SpecialTasks, so is not
        # listed here)
        statuses = (models.HostQueueEntry.Status.STARTING,
                    models.HostQueueEntry.Status.RUNNING,
                    models.HostQueueEntry.Status.GATHERING,
                    models.HostQueueEntry.Status.PARSING)
        status_list = ','.join("'%s'" % status for status in statuses)
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status IN (%s)' % status_list)
        agent_tasks = []
        used_queue_entries = set()
        hqe_count_by_status = {}
        for entry in queue_entries:
            try:
                hqe_count_by_status[entry.status] = (
                    hqe_count_by_status.get(entry.status, 0) + 1)
                if self.get_agents_for_entry(entry):
                    # already being handled
                    continue
                if entry in used_queue_entries:
                    # already picked up by a synchronous job
                    continue
                try:
                    agent_task = self._get_agent_task_for_queue_entry(entry)
                except scheduler_lib.SchedulerError:
                    # Probably being handled by lucifer crbug.com/809773
                    continue
                agent_tasks.append(agent_task)
                used_queue_entries.update(agent_task.queue_entries)
            except scheduler_lib.MalformedRecordError as e:
                logging.exception('Skipping agent task for a malformed hqe.')
                # TODO(akeshet): figure out a way to safely permanently discard
                # this errant HQE. It appears that calling entry.abort() is not
                # sufficient, as that already makes some assumptions about
                # record sanity that may be violated. See crbug.com/739530 for
                # context.
                m = 'chromeos/autotest/scheduler/skipped_malformed_hqe'
                metrics.Counter(m).increment()
        for status, count in hqe_count_by_status.iteritems():
            metrics.Gauge(
                'chromeos/autotest/scheduler/active_host_queue_entries'
            ).set(count, fields={'status': status})
        return agent_tasks

    def _get_special_task_agent_tasks(self, is_active=False):
        special_tasks = models.SpecialTask.objects.filter(
                is_active=is_active, is_complete=False)
        agent_tasks = []
        for task in special_tasks:
          try:
              agent_tasks.append(self._get_agent_task_for_special_task(task))
          except scheduler_lib.MalformedRecordError as e:
              logging.exception('Skipping agent task for malformed special '
                                'task.')
              m = 'chromeos/autotest/scheduler/skipped_malformed_special_task'
              metrics.Counter(m).increment()
        return agent_tasks

    def _get_agent_task_for_queue_entry(self, queue_entry):
        """
        Construct an AgentTask instance for the given active HostQueueEntry.
        @param queue_entry: a HostQueueEntry
        @return: an AgentTask to run the queue entry
        """
        task_entries = queue_entry.job.get_group_entries(queue_entry)
        self._check_for_duplicate_host_entries(task_entries)
        if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
                                  models.HostQueueEntry.Status.RUNNING):
            if queue_entry.is_hostless():
                return HostlessQueueTask(queue_entry=queue_entry)
            return QueueTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
            return postjob_task.GatherLogsTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.PARSING:
            return postjob_task.FinalReparseTask(queue_entries=task_entries)
        raise scheduler_lib.MalformedRecordError(
                '_get_agent_task_for_queue_entry got entry with '
                'invalid status %s: %s' % (queue_entry.status, queue_entry))

    def _check_for_duplicate_host_entries(self, task_entries):
        non_host_statuses = {models.HostQueueEntry.Status.PARSING}
        for task_entry in task_entries:
            using_host = (task_entry.host is not None
                          and task_entry.status not in non_host_statuses)
            if using_host:
                self._assert_host_has_no_agent(task_entry)

    def _assert_host_has_no_agent(self, entry):
        """
        @param entry: a HostQueueEntry or a SpecialTask
        """
        if self.host_has_agent(entry.host):
            agent = tuple(self._host_agents.get(entry.host.id))[0]
            raise scheduler_lib.MalformedRecordError(
                    'While scheduling %s, host %s already has a host agent %s'
                    % (entry, entry.host, agent.task))

    def _get_agent_task_for_special_task(self, special_task):
        """
        Construct an AgentTask class to run the given SpecialTask and add it
        to this dispatcher.
        A special task is created through schedule_special_tasks, but only if
        the host doesn't already have an agent. This happens through
        add_agent_task. All special agent tasks are given a host on creation,
        and a Null hqe. To create a SpecialAgentTask object, you need a
        models.SpecialTask. If the SpecialTask used to create a SpecialAgentTask
        object contains a hqe it's passed on to the special agent task, which
        creates a HostQueueEntry and saves it as it's queue_entry.
        @param special_task: a models.SpecialTask instance
        @returns an AgentTask to run this SpecialTask
        """
        self._assert_host_has_no_agent(special_task)
        special_agent_task_classes = (prejob_task.CleanupTask,
                                      prejob_task.VerifyTask,
                                      prejob_task.RepairTask,
                                      prejob_task.ResetTask,
                                      prejob_task.ProvisionTask)
        for agent_task_class in special_agent_task_classes:
            if agent_task_class.TASK_TYPE == special_task.task:
                return agent_task_class(task=special_task)
        raise scheduler_lib.MalformedRecordError(
                'No AgentTask class for task', str(special_task))

    def _register_pidfiles(self, agent_tasks):
        for agent_task in agent_tasks:
            agent_task.register_necessary_pidfiles()

    def _recover_tasks(self, agent_tasks):
        orphans = _drone_manager.get_orphaned_autoserv_processes()
        for agent_task in agent_tasks:
            agent_task.recover()
            if agent_task.monitor and agent_task.monitor.has_process():
                orphans.discard(agent_task.monitor.get_process())
            self.add_agent_task(agent_task)
        self._check_for_remaining_orphan_processes(orphans)

    def _get_unassigned_entries(self, status):
        for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"
                                                           % status):
            if entry.status == status and not self.get_agents_for_entry(entry):
                # The status can change during iteration, e.g., if job.run()
                # sets a group of queue entries to Starting
                yield entry

    def _check_for_remaining_orphan_processes(self, orphans):
        m = 'chromeos/autotest/errors/unrecovered_orphan_processes'
        metrics.Gauge(m).set(len(orphans))
        if not orphans:
            return
        subject = 'Unrecovered orphan autoserv processes remain'
        message = '\n'.join(str(process) for process in orphans)
        die_on_orphans = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)
        if die_on_orphans:
            raise RuntimeError(subject + '\n' + message)

    def _recover_pending_entries(self):
        for entry in self._get_unassigned_entries(
                models.HostQueueEntry.Status.PENDING):
            logging.info('Recovering Pending entry %s', entry)
            try:
                entry.on_pending()
            except scheduler_lib.MalformedRecordError as e:
                logging.exception(
                        'Skipping agent task for malformed special task.')
                m = 'chromeos/autotest/scheduler/skipped_malformed_special_task'
                metrics.Counter(m).increment()

    def _check_for_unrecovered_verifying_entries(self):
        # Verify is replaced by Reset.
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status = "%s"' % models.HostQueueEntry.Status.RESETTING)
        for queue_entry in queue_entries:
            special_tasks = models.SpecialTask.objects.filter(
                    task__in=(models.SpecialTask.Task.CLEANUP,
                              models.SpecialTask.Task.VERIFY,
                              models.SpecialTask.Task.RESET),
                    queue_entry__id=queue_entry.id,
                    is_complete=False)
            if special_tasks.count() == 0:
                logging.error('Unrecovered Resetting host queue entry: %s. '
                              'Setting status to Queued.', str(queue_entry))
                # Essentially this host queue entry was set to be Verifying
                # however no special task exists for entry. This occurs if the
                # scheduler dies between changing the status and creating the
                # special task. By setting it to queued, the job can restart
                # from the beginning and proceed correctly. This is much more
                # preferable than having monitor_db not launching.
                queue_entry.set_status('Queued')

    @_calls_log_tick_msg
    def _schedule_special_tasks(self):
        """
        Execute queued SpecialTasks that are ready to run on idle hosts.
        Special tasks include PreJobTasks like verify, reset and cleanup.
        They are created through _schedule_new_jobs and associated with a hqe
        This method translates SpecialTasks to the appropriate AgentTask and
        adds them to the dispatchers agents list, so _handle_agents can execute
        them.
        """
        # When the host scheduler is responsible for acquisition we only want
        # to run tasks with leased hosts. All hqe tasks will already have
        # leased hosts, and we don't want to run frontend tasks till the host
        # scheduler has vetted the assignment. Note that this doesn't include
        # frontend tasks with hosts leased by other active hqes.
        for task in self._job_query_manager.get_prioritized_special_tasks(
                only_tasks_with_leased_hosts=not self._inline_host_acquisition):
            if self.host_has_agent(task.host):
                continue
            try:
                self.add_agent_task(self._get_agent_task_for_special_task(task))
            except scheduler_lib.MalformedRecordError:
                logging.exception('Skipping schedule for malformed '
                                  'special task.')
                m = 'chromeos/autotest/scheduler/skipped_schedule_special_task'
                metrics.Counter(m).increment()

    def _reverify_remaining_hosts(self):
        # recover active hosts that have not yet been recovered, although this
        # should never happen
        message = ('Recovering active host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where(
                "status IN ('Repairing', 'Verifying', 'Cleaning', 'Provisioning')",
                print_message=message)

    DEFAULT_REQUESTED_BY_USER_ID = 1

    def _reverify_hosts_where(self, where,
                              print_message='Reverifying host %s'):
        full_where='locked = 0 AND invalid = 0 AND ' + where
        for host in scheduler_models.Host.fetch(where=full_where):
            if self.host_has_agent(host):
                # host has already been recovered in some way
                continue
            if self._host_has_scheduled_special_task(host):
                # host will have a special task scheduled on the next cycle
                continue
            if print_message:
                logging.error(print_message, host.hostname)
            try:
                user = models.User.objects.get(login='autotest_system')
            except models.User.DoesNotExist:
                user = models.User.objects.get(
                        id=self.DEFAULT_REQUESTED_BY_USER_ID)
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.RESET,
                    host=models.Host.objects.get(id=host.id),
                    requested_by=user)

    def _recover_hosts(self):
        # recover "Repair Failed" hosts
        message = 'Reverifying dead host %s'
        self._reverify_hosts_where("status = 'Repair Failed'",
                                   print_message=message)

    def _refresh_pending_queue_entries(self):
        """
        Lookup the pending HostQueueEntries and call our HostScheduler
        refresh() method given that list.  Return the list.
        @returns A list of pending HostQueueEntries sorted in priority order.
        """
        queue_entries = self._job_query_manager.get_pending_queue_entries(
                only_hostless=not self._inline_host_acquisition)
        if not queue_entries:
            return []
        return queue_entries

    def _schedule_hostless_job(self, queue_entry):
        """Schedule a hostless (suite) job.
        @param queue_entry: The queue_entry representing the hostless job.
        """
        if not luciferlib.is_enabled_for('STARTING'):
            self.add_agent_task(HostlessQueueTask(queue_entry))
        # Need to set execution_subdir before setting the status:
        # After a restart of the scheduler, agents will be restored for HQEs in
        # Starting, Running, Gathering, Parsing or Archiving. To do this, the
        # execution_subdir is needed. Therefore it must be set before entering
        # one of these states.
        # Otherwise, if the scheduler was interrupted between setting the status
        # and the execution_subdir, upon it's restart restoring agents would
        # fail.
        # Is there a way to get a status in one of these states without going
        # through this code? Following cases are possible:
        # - If it's aborted before being started:
        #     active bit will be 0, so there's nothing to parse, it will just be
        #     set to completed by _find_aborting. Critical statuses are skipped.
        # - If it's aborted or it fails after being started:
        #     It was started, so this code was executed.
        queue_entry.update_field('execution_subdir', 'hostless')
        queue_entry.set_status(models.HostQueueEntry.Status.STARTING)

    def _schedule_host_job(self, host, queue_entry):
        """Schedules a job on the given host.
        1. Assign the host to the hqe, if it isn't already assigned.
        2. Create a SpecialAgentTask for the hqe.
        3. Activate the hqe.
        @param queue_entry: The job to schedule.
        @param host: The host to schedule the job on.
        """
        if self.host_has_agent(host):
            host_agent_task = list(self._host_agents.get(host.id))[0].task
        else:
            self._host_scheduler.schedule_host_job(host, queue_entry)

    @_calls_log_tick_msg
    def _schedule_new_jobs(self):
        """
        Find any new HQEs and call schedule_pre_job_tasks for it.
        This involves setting the status of the HQE and creating a row in the
        db corresponding the the special task, through
        scheduler_models._queue_special_task. The new db row is then added as
        an agent to the dispatcher through _schedule_special_tasks and
        scheduled for execution on the drone through _handle_agents.
        """
        queue_entries = self._refresh_pending_queue_entries()
        key = 'scheduler.jobs_per_tick'
        new_hostless_jobs = 0
        new_jobs_with_hosts = 0
        new_jobs_need_hosts = 0
        host_jobs = []
        logging.debug('Processing %d queue_entries', len(queue_entries))
        for queue_entry in queue_entries:
            if queue_entry.is_hostless():
                self._schedule_hostless_job(queue_entry)
                new_hostless_jobs = new_hostless_jobs + 1
            else:
                host_jobs.append(queue_entry)
                new_jobs_need_hosts = new_jobs_need_hosts + 1
        metrics.Counter(
            'chromeos/autotest/scheduler/scheduled_jobs_hostless'
        ).increment_by(new_hostless_jobs)
        if not host_jobs:
            return
        if not self._inline_host_acquisition:
          # In this case, host_scheduler is responsible for scheduling
          # host_jobs. Scheduling the jobs ourselves can lead to DB corruption
          # since host_scheduler assumes it is the single process scheduling
          # host jobs.
          metrics.Gauge(
              'chromeos/autotest/errors/scheduler/unexpected_host_jobs').set(
                  len(host_jobs))
          return
        jobs_with_hosts = self._host_scheduler.find_hosts_for_jobs(host_jobs)
        for host_assignment in jobs_with_hosts:
            self._schedule_host_job(host_assignment.host, host_assignment.job)
            new_jobs_with_hosts = new_jobs_with_hosts + 1
        metrics.Counter(
            'chromeos/autotest/scheduler/scheduled_jobs_with_hosts'
        ).increment_by(new_jobs_with_hosts)

    @_calls_log_tick_msg
    def _send_to_lucifer(self):
        """
        Hand off ownership of a job to lucifer component.
        """
        if luciferlib.is_enabled_for('starting'):
            self._send_starting_to_lucifer()
        # TODO(crbug.com/810141): Older states need to be supported when
        # STARTING is toggled; some jobs may be in an intermediate state
        # at that moment.
        self._send_gathering_to_lucifer()
        self._send_parsing_to_lucifer()

    # TODO(crbug.com/748234): This is temporary to enable toggling
    # lucifer rollouts with an option.
    def _send_starting_to_lucifer(self):
        Status = models.HostQueueEntry.Status
        queue_entries_qs = (models.HostQueueEntry.objects
                            .filter(status=Status.STARTING))
        for queue_entry in queue_entries_qs:
            if self.get_agents_for_entry(queue_entry):
                continue
            job = queue_entry.job
            if luciferlib.is_lucifer_owned(job):
                continue
            drone = luciferlib.spawn_starting_job_handler(
                    manager=_drone_manager,
                    job=job)
            models.JobHandoff.objects.create(job=job, drone=drone.hostname())

    # TODO(crbug.com/748234): This is temporary to enable toggling
    # lucifer rollouts with an option.
    def _send_gathering_to_lucifer(self):
        Status = models.HostQueueEntry.Status
        queue_entries_qs = (models.HostQueueEntry.objects
                            .filter(status=Status.GATHERING))
        for queue_entry in queue_entries_qs:
            # If this HQE already has an agent, let monitor_db continue
            # owning it.
            if self.get_agents_for_entry(queue_entry):
                continue
            job = queue_entry.job
            if luciferlib.is_lucifer_owned(job):
                continue
            task = postjob_task.PostJobTask(
                    [queue_entry], log_file_name='/dev/null')
            pidfile_id = task._autoserv_monitor.pidfile_id
            autoserv_exit = task._autoserv_monitor.exit_code()
            try:
                drone = luciferlib.spawn_gathering_job_handler(
                        manager=_drone_manager,
                        job=job,
                        autoserv_exit=autoserv_exit,
                        pidfile_id=pidfile_id)
                models.JobHandoff.objects.create(job=job,
                                                 drone=drone.hostname())
            except drone_manager.DroneManagerError as e:
                logging.warning(
                    'Fail to get drone for job %s, skipping lucifer. Error: %s',
                    job.id, e)

    # TODO(crbug.com/748234): This is temporary to enable toggling
    # lucifer rollouts with an option.
    def _send_parsing_to_lucifer(self):
        Status = models.HostQueueEntry.Status
        queue_entries_qs = (models.HostQueueEntry.objects
                            .filter(status=Status.PARSING))
        for queue_entry in queue_entries_qs:
            # If this HQE already has an agent, let monitor_db continue
            # owning it.
            if self.get_agents_for_entry(queue_entry):
                continue
            job = queue_entry.job
            if luciferlib.is_lucifer_owned(job):
                continue
            # TODO(crbug.com/811877): Ignore split HQEs.
            if luciferlib.is_split_job(queue_entry.id):
                continue
            task = postjob_task.PostJobTask(
                    [queue_entry], log_file_name='/dev/null')
            pidfile_id = task._autoserv_monitor.pidfile_id
            autoserv_exit = task._autoserv_monitor.exit_code()
            try:
                drone = luciferlib.spawn_parsing_job_handler(
                        manager=_drone_manager,
                        job=job,
                        autoserv_exit=autoserv_exit,
                        pidfile_id=pidfile_id)
                models.JobHandoff.objects.create(job=job,
                                                 drone=drone.hostname())
            except drone_manager.DroneManagerError as e:
                logging.warning(
                    'Fail to get drone for job %s, skipping lucifer. Error: %s',
                    job.id, e)

    @_calls_log_tick_msg
    def _schedule_running_host_queue_entries(self):
        """
        Adds agents to the dispatcher.
        Any AgentTask, like the QueueTask, is wrapped in an Agent. The
        QueueTask for example, will have a job with a control file, and
        the agent will have methods that poll, abort and check if the queue
        task is finished. The dispatcher runs the agent_task, as well as
        other agents in it's _agents member, through _handle_agents, by
        calling the Agents tick().
        This method creates an agent for each HQE in one of (starting, running,
        gathering, parsing) states, and adds it to the dispatcher so
        it is handled by _handle_agents.
        """
        for agent_task in self._get_queue_entry_agent_tasks():
            self.add_agent_task(agent_task)

    @_calls_log_tick_msg
    def _find_aborting(self):
        """
        Looks through the afe_host_queue_entries for an aborted entry.
        The aborted bit is set on an HQE in many ways, the most common
        being when a user requests an abort through the frontend, which
        results in an rpc from the afe to abort_host_queue_entries.
        """
        jobs_to_stop = set()
        for entry in scheduler_models.HostQueueEntry.fetch(
                where='aborted=1 and complete=0'):
            # If the job is running on a shard, let the shard handle aborting
            # it and sync back the right status.
            if entry.job.shard_id is not None and not server_utils.is_shard():
                logging.info('Waiting for shard %s to abort hqe %s',
                        entry.job.shard_id, entry)
                continue
            logging.info('Aborting %s', entry)
            # The task would have started off with both is_complete and
            # is_active = False. Aborted tasks are neither active nor complete.
            # For all currently active tasks this will happen through the agent,
            # but we need to manually update the special tasks that haven't
            # started yet, because they don't have agents.
            models.SpecialTask.objects.filter(is_active=False,
                queue_entry_id=entry.id).update(is_complete=True)
            for agent in self.get_agents_for_entry(entry):
                agent.abort()
            entry.abort(self)
            jobs_to_stop.add(entry.job)
        logging.debug('Aborting %d jobs this tick.', len(jobs_to_stop))
        for job in jobs_to_stop:
            job.stop_if_necessary()

    @_calls_log_tick_msg
    def _find_aborted_special_tasks(self):
        """
        Find SpecialTasks that have been marked for abortion.
        Poll the database looking for SpecialTasks that are active
        and have been marked for abortion, then abort them.
        """
        # The completed and active bits are very important when it comes...
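
The listing above is easier to follow once you see the bookkeeping it relies on: the dispatcher indexes every Agent under each host id and queue-entry id it touches, so host_has_agent() and get_agents_for_entry() reduce to dictionary lookups. Below is a minimal, self-contained sketch of that pattern; it is not code from monitor_db.py, and the MiniRegistry and FakeAgent names and the example ids are invented for illustration.

# Minimal sketch of the agent-registry pattern used by the dispatcher above.
# MiniRegistry and FakeAgent are illustrative stand-ins, not autotest classes.

class FakeAgent(object):
    """Stand-in for the scheduler's Agent: records the ids it touches."""
    def __init__(self, host_ids, queue_entry_ids):
        self.host_ids = host_ids
        self.queue_entry_ids = queue_entry_ids


class MiniRegistry(object):
    def __init__(self):
        self._host_agents = {}         # host id -> set of agents
        self._queue_entry_agents = {}  # queue entry id -> set of agents

    def _register(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            agent_dict.setdefault(object_id, set()).add(agent)

    def _unregister(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            agent_dict.get(object_id, set()).discard(agent)

    def add_agent(self, agent):
        self._register(self._host_agents, agent.host_ids, agent)
        self._register(self._queue_entry_agents, agent.queue_entry_ids, agent)

    def remove_agent(self, agent):
        self._unregister(self._host_agents, agent.host_ids, agent)
        self._unregister(self._queue_entry_agents, agent.queue_entry_ids, agent)

    def host_has_agent(self, host_id):
        # Same idea as Dispatcher.host_has_agent(): a cheap dict lookup.
        return bool(self._host_agents.get(host_id))

    def agents_for_entry(self, queue_entry_id):
        # Same idea as Dispatcher.get_agents_for_entry().
        return list(self._queue_entry_agents.get(queue_entry_id, set()))


registry = MiniRegistry()
agent = FakeAgent(host_ids=[42], queue_entry_ids=[7])
registry.add_agent(agent)
assert registry.host_has_agent(42)
assert registry.agents_for_entry(7) == [agent]
registry.remove_agent(agent)
assert not registry.host_has_agent(42)

Because an agent is registered under every id it touches, a synchronous job that spans several hosts or queue entries is found by any of them, which is exactly what the recovery and scheduling paths above depend on.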
