How to use the get_agents_for_entry method in autotest

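In the autotest scheduler (monitor_db), the dispatcher indexes every live Agent by the ids of the host queue entries (HQEs) it covers: add_agent_task registers each agent in the _queue_entry_agents mapping, and get_agents_for_entry(queue_entry) looks that mapping up, returning the agents registered for the given entry, or an empty list if there are none. The scheduler relies on this in two ways: as an "is this entry already being handled?" guard during recovery and before handing jobs off to lucifer, and as the way _find_aborting locates the agents it must abort. The snippet below from monitor_db.py shows the method and its call sites in context.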

monitor_db.py

Source: monitor_db.py (GitHub)

...
        agent.dispatcher = self
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)

    def get_agents_for_entry(self, queue_entry):
        """
        Find agents corresponding to the specified queue_entry.
        """
        return list(self._queue_entry_agents.get(queue_entry.id, set()))

    def host_has_agent(self, host):
        """
        Determine if there is currently an Agent present using this host.
        """
        return bool(self._host_agents.get(host.id, None))

    def remove_agent(self, agent):
        self._agents.remove(agent)
        self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
                                       agent)
        self._unregister_agent_for_ids(self._queue_entry_agents,
                                       agent.queue_entry_ids, agent)

    def _host_has_scheduled_special_task(self, host):
        return bool(models.SpecialTask.objects.filter(host__id=host.id,
                                                      is_active=False,
                                                      is_complete=False))

    def _recover_processes(self):
        agent_tasks = self._create_recovery_agent_tasks()
        self._register_pidfiles(agent_tasks)
        _drone_manager.refresh()
        self._recover_tasks(agent_tasks)
        self._recover_pending_entries()
        self._check_for_unrecovered_verifying_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()

    def _create_recovery_agent_tasks(self):
        return (self._get_queue_entry_agent_tasks()
                + self._get_special_task_agent_tasks(is_active=True))

    def _get_queue_entry_agent_tasks(self):
        """
        Get agent tasks for all hqe in the specified states.

        Loosely this translates to taking a hqe in one of the specified states,
        say parsing, and getting an AgentTask for it, like the FinalReparseTask,
        through _get_agent_task_for_queue_entry. Each queue entry can only have
        one agent task at a time, but there might be multiple queue entries in
        the group.

        @return: A list of AgentTasks.
        """
        # host queue entry statuses handled directly by AgentTasks
        # (Verifying is handled through SpecialTasks, so is not
        # listed here)
        statuses = (models.HostQueueEntry.Status.STARTING,
                    models.HostQueueEntry.Status.RUNNING,
                    models.HostQueueEntry.Status.GATHERING,
                    models.HostQueueEntry.Status.PARSING)
        status_list = ','.join("'%s'" % status for status in statuses)
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status IN (%s)' % status_list)

        agent_tasks = []
        used_queue_entries = set()
        hqe_count_by_status = {}
        for entry in queue_entries:
            try:
                hqe_count_by_status[entry.status] = (
                        hqe_count_by_status.get(entry.status, 0) + 1)
                if self.get_agents_for_entry(entry):
                    # already being handled
                    continue
                if entry in used_queue_entries:
                    # already picked up by a synchronous job
                    continue
                try:
                    agent_task = self._get_agent_task_for_queue_entry(entry)
                except scheduler_lib.SchedulerError:
                    # Probably being handled by lucifer crbug.com/809773
                    continue
                agent_tasks.append(agent_task)
                used_queue_entries.update(agent_task.queue_entries)
            except scheduler_lib.MalformedRecordError as e:
                logging.exception('Skipping agent task for a malformed hqe.')
                # TODO(akeshet): figure out a way to safely permanently discard
                # this errant HQE. It appears that calling entry.abort() is not
                # sufficient, as that already makes some assumptions about
                # record sanity that may be violated. See crbug.com/739530 for
                # context.
                m = 'chromeos/autotest/scheduler/skipped_malformed_hqe'
                metrics.Counter(m).increment()

        for status, count in hqe_count_by_status.iteritems():
            metrics.Gauge(
                    'chromeos/autotest/scheduler/active_host_queue_entries'
            ).set(count, fields={'status': status})

        return agent_tasks

    def _get_special_task_agent_tasks(self, is_active=False):
        special_tasks = models.SpecialTask.objects.filter(
                is_active=is_active, is_complete=False)
        agent_tasks = []
        for task in special_tasks:
            try:
                agent_tasks.append(self._get_agent_task_for_special_task(task))
            except scheduler_lib.MalformedRecordError as e:
                logging.exception('Skipping agent task for malformed special '
                                  'task.')
                m = 'chromeos/autotest/scheduler/skipped_malformed_special_task'
                metrics.Counter(m).increment()
        return agent_tasks

    def _get_agent_task_for_queue_entry(self, queue_entry):
        """
        Construct an AgentTask instance for the given active HostQueueEntry.

        @param queue_entry: a HostQueueEntry
        @return: an AgentTask to run the queue entry
        """
        task_entries = queue_entry.job.get_group_entries(queue_entry)
        self._check_for_duplicate_host_entries(task_entries)

        if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
                                  models.HostQueueEntry.Status.RUNNING):
            if queue_entry.is_hostless():
                return HostlessQueueTask(queue_entry=queue_entry)
            return QueueTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
            return postjob_task.GatherLogsTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.PARSING:
            return postjob_task.FinalReparseTask(queue_entries=task_entries)

        raise scheduler_lib.MalformedRecordError(
                '_get_agent_task_for_queue_entry got entry with '
                'invalid status %s: %s' % (queue_entry.status, queue_entry))

    def _check_for_duplicate_host_entries(self, task_entries):
        non_host_statuses = {models.HostQueueEntry.Status.PARSING}
        for task_entry in task_entries:
            using_host = (task_entry.host is not None
                          and task_entry.status not in non_host_statuses)
            if using_host:
                self._assert_host_has_no_agent(task_entry)

    def _assert_host_has_no_agent(self, entry):
        """
        @param entry: a HostQueueEntry or a SpecialTask
        """
        if self.host_has_agent(entry.host):
            agent = tuple(self._host_agents.get(entry.host.id))[0]
            raise scheduler_lib.MalformedRecordError(
                    'While scheduling %s, host %s already has a host agent %s'
                    % (entry, entry.host, agent.task))

    def _get_agent_task_for_special_task(self, special_task):
        """
        Construct an AgentTask class to run the given SpecialTask and add it
        to this dispatcher.

        A special task is created through schedule_special_tasks, but only if
        the host doesn't already have an agent. This happens through
        add_agent_task. All special agent tasks are given a host on creation,
        and a Null hqe. To create a SpecialAgentTask object, you need a
        models.SpecialTask. If the SpecialTask used to create a SpecialAgentTask
        object contains a hqe it's passed on to the special agent task, which
        creates a HostQueueEntry and saves it as its queue_entry.

        @param special_task: a models.SpecialTask instance
        @returns an AgentTask to run this SpecialTask
        """
        self._assert_host_has_no_agent(special_task)

        special_agent_task_classes = (prejob_task.CleanupTask,
                                      prejob_task.VerifyTask,
                                      prejob_task.RepairTask,
                                      prejob_task.ResetTask,
                                      prejob_task.ProvisionTask)
        for agent_task_class in special_agent_task_classes:
            if agent_task_class.TASK_TYPE == special_task.task:
                return agent_task_class(task=special_task)

        raise scheduler_lib.MalformedRecordError(
                'No AgentTask class for task', str(special_task))

    def _register_pidfiles(self, agent_tasks):
        for agent_task in agent_tasks:
            agent_task.register_necessary_pidfiles()

    def _recover_tasks(self, agent_tasks):
        orphans = _drone_manager.get_orphaned_autoserv_processes()
        for agent_task in agent_tasks:
            agent_task.recover()
            if agent_task.monitor and agent_task.monitor.has_process():
                orphans.discard(agent_task.monitor.get_process())
            self.add_agent_task(agent_task)
        self._check_for_remaining_orphan_processes(orphans)

    def _get_unassigned_entries(self, status):
        for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"
                                                           % status):
            if entry.status == status and not self.get_agents_for_entry(entry):
                # The status can change during iteration, e.g., if job.run()
                # sets a group of queue entries to Starting
                yield entry

    def _check_for_remaining_orphan_processes(self, orphans):
        m = 'chromeos/autotest/errors/unrecovered_orphan_processes'
        metrics.Gauge(m).set(len(orphans))
        if not orphans:
            return

        subject = 'Unrecovered orphan autoserv processes remain'
        message = '\n'.join(str(process) for process in orphans)
        die_on_orphans = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)
        if die_on_orphans:
            raise RuntimeError(subject + '\n' + message)

    def _recover_pending_entries(self):
        for entry in self._get_unassigned_entries(
                models.HostQueueEntry.Status.PENDING):
            logging.info('Recovering Pending entry %s', entry)
            try:
                entry.on_pending()
            except scheduler_lib.MalformedRecordError as e:
                logging.exception(
                        'Skipping agent task for malformed special task.')
                m = 'chromeos/autotest/scheduler/skipped_malformed_special_task'
                metrics.Counter(m).increment()

    def _check_for_unrecovered_verifying_entries(self):
        # Verify is replaced by Reset.
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status = "%s"' % models.HostQueueEntry.Status.RESETTING)
        for queue_entry in queue_entries:
            special_tasks = models.SpecialTask.objects.filter(
                    task__in=(models.SpecialTask.Task.CLEANUP,
                              models.SpecialTask.Task.VERIFY,
                              models.SpecialTask.Task.RESET),
                    queue_entry__id=queue_entry.id,
                    is_complete=False)
            if special_tasks.count() == 0:
                logging.error('Unrecovered Resetting host queue entry: %s. '
                              'Setting status to Queued.', str(queue_entry))
                # Essentially this host queue entry was set to be Verifying
                # however no special task exists for entry. This occurs if the
                # scheduler dies between changing the status and creating the
                # special task. By setting it to queued, the job can restart
                # from the beginning and proceed correctly. This is much more
                # preferable than having monitor_db not launching.
                queue_entry.set_status('Queued')

    @_calls_log_tick_msg
    def _schedule_special_tasks(self):
        """
        Execute queued SpecialTasks that are ready to run on idle hosts.

        Special tasks include PreJobTasks like verify, reset and cleanup.
        They are created through _schedule_new_jobs and associated with a hqe
        This method translates SpecialTasks to the appropriate AgentTask and
        adds them to the dispatchers agents list, so _handle_agents can execute
        them.
        """
        # When the host scheduler is responsible for acquisition we only want
        # to run tasks with leased hosts. All hqe tasks will already have
        # leased hosts, and we don't want to run frontend tasks till the host
        # scheduler has vetted the assignment. Note that this doesn't include
        # frontend tasks with hosts leased by other active hqes.
        for task in self._job_query_manager.get_prioritized_special_tasks(
                only_tasks_with_leased_hosts=not self._inline_host_acquisition):
            if self.host_has_agent(task.host):
                continue
            try:
                self.add_agent_task(self._get_agent_task_for_special_task(task))
            except scheduler_lib.MalformedRecordError:
                logging.exception('Skipping schedule for malformed '
                                  'special task.')
                m = 'chromeos/autotest/scheduler/skipped_schedule_special_task'
                metrics.Counter(m).increment()

    def _reverify_remaining_hosts(self):
        # recover active hosts that have not yet been recovered, although this
        # should never happen
        message = ('Recovering active host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where(
                "status IN ('Repairing', 'Verifying', 'Cleaning', 'Provisioning')",
                print_message=message)

    DEFAULT_REQUESTED_BY_USER_ID = 1

    def _reverify_hosts_where(self, where,
                              print_message='Reverifying host %s'):
        full_where = 'locked = 0 AND invalid = 0 AND ' + where
        for host in scheduler_models.Host.fetch(where=full_where):
            if self.host_has_agent(host):
                # host has already been recovered in some way
                continue
            if self._host_has_scheduled_special_task(host):
                # host will have a special task scheduled on the next cycle
                continue
            if print_message:
                logging.error(print_message, host.hostname)
            try:
                user = models.User.objects.get(login='autotest_system')
            except models.User.DoesNotExist:
                user = models.User.objects.get(
                        id=self.DEFAULT_REQUESTED_BY_USER_ID)
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.RESET,
                    host=models.Host.objects.get(id=host.id),
                    requested_by=user)

    def _recover_hosts(self):
        # recover "Repair Failed" hosts
        message = 'Reverifying dead host %s'
        self._reverify_hosts_where("status = 'Repair Failed'",
                                   print_message=message)

    def _refresh_pending_queue_entries(self):
        """
        Lookup the pending HostQueueEntries and call our HostScheduler
        refresh() method given that list. Return the list.

        @returns A list of pending HostQueueEntries sorted in priority order.
        """
        queue_entries = self._job_query_manager.get_pending_queue_entries(
                only_hostless=not self._inline_host_acquisition)
        if not queue_entries:
            return []
        return queue_entries

    def _schedule_hostless_job(self, queue_entry):
        """Schedule a hostless (suite) job.

        @param queue_entry: The queue_entry representing the hostless job.
        """
        if not luciferlib.is_enabled_for('STARTING'):
            self.add_agent_task(HostlessQueueTask(queue_entry))

        # Need to set execution_subdir before setting the status:
        # After a restart of the scheduler, agents will be restored for HQEs in
        # Starting, Running, Gathering, Parsing or Archiving. To do this, the
        # execution_subdir is needed. Therefore it must be set before entering
        # one of these states.
        # Otherwise, if the scheduler was interrupted between setting the status
        # and the execution_subdir, upon its restart restoring agents would
        # fail.
        # Is there a way to get a status in one of these states without going
        # through this code? Following cases are possible:
        # - If it's aborted before being started:
        #   active bit will be 0, so there's nothing to parse, it will just be
        #   set to completed by _find_aborting. Critical statuses are skipped.
        # - If it's aborted or it fails after being started:
        #   It was started, so this code was executed.
        queue_entry.update_field('execution_subdir', 'hostless')
        queue_entry.set_status(models.HostQueueEntry.Status.STARTING)

    def _schedule_host_job(self, host, queue_entry):
        """Schedules a job on the given host.

        1. Assign the host to the hqe, if it isn't already assigned.
        2. Create a SpecialAgentTask for the hqe.
        3. Activate the hqe.

        @param queue_entry: The job to schedule.
        @param host: The host to schedule the job on.
        """
        if self.host_has_agent(host):
            host_agent_task = list(self._host_agents.get(host.id))[0].task
        else:
            self._host_scheduler.schedule_host_job(host, queue_entry)

    @_calls_log_tick_msg
    def _schedule_new_jobs(self):
        """
        Find any new HQEs and call schedule_pre_job_tasks for it.

        This involves setting the status of the HQE and creating a row in the
        db corresponding to the special task, through
        scheduler_models._queue_special_task. The new db row is then added as
        an agent to the dispatcher through _schedule_special_tasks and
        scheduled for execution on the drone through _handle_agents.
        """
        queue_entries = self._refresh_pending_queue_entries()

        key = 'scheduler.jobs_per_tick'
        new_hostless_jobs = 0
        new_jobs_with_hosts = 0
        new_jobs_need_hosts = 0
        host_jobs = []
        logging.debug('Processing %d queue_entries', len(queue_entries))
        for queue_entry in queue_entries:
            if queue_entry.is_hostless():
                self._schedule_hostless_job(queue_entry)
                new_hostless_jobs = new_hostless_jobs + 1
            else:
                host_jobs.append(queue_entry)
                new_jobs_need_hosts = new_jobs_need_hosts + 1

        metrics.Counter(
                'chromeos/autotest/scheduler/scheduled_jobs_hostless'
        ).increment_by(new_hostless_jobs)
        if not host_jobs:
            return
        if not self._inline_host_acquisition:
            # In this case, host_scheduler is responsible for scheduling
            # host_jobs. Scheduling the jobs ourselves can lead to DB corruption
            # since host_scheduler assumes it is the single process scheduling
            # host jobs.
            metrics.Gauge(
                    'chromeos/autotest/errors/scheduler/unexpected_host_jobs').set(
                            len(host_jobs))
            return

        jobs_with_hosts = self._host_scheduler.find_hosts_for_jobs(host_jobs)
        for host_assignment in jobs_with_hosts:
            self._schedule_host_job(host_assignment.host, host_assignment.job)
            new_jobs_with_hosts = new_jobs_with_hosts + 1

        metrics.Counter(
                'chromeos/autotest/scheduler/scheduled_jobs_with_hosts'
        ).increment_by(new_jobs_with_hosts)

    @_calls_log_tick_msg
    def _send_to_lucifer(self):
        """
        Hand off ownership of a job to lucifer component.
        """
        if luciferlib.is_enabled_for('starting'):
            self._send_starting_to_lucifer()
        # TODO(crbug.com/810141): Older states need to be supported when
        # STARTING is toggled; some jobs may be in an intermediate state
        # at that moment.
        self._send_gathering_to_lucifer()
        self._send_parsing_to_lucifer()

    # TODO(crbug.com/748234): This is temporary to enable toggling
    # lucifer rollouts with an option.
    def _send_starting_to_lucifer(self):
        Status = models.HostQueueEntry.Status
        queue_entries_qs = (models.HostQueueEntry.objects
                            .filter(status=Status.STARTING))
        for queue_entry in queue_entries_qs:
            if self.get_agents_for_entry(queue_entry):
                continue
            job = queue_entry.job
            if luciferlib.is_lucifer_owned(job):
                continue
            drone = luciferlib.spawn_starting_job_handler(
                    manager=_drone_manager,
                    job=job)
            models.JobHandoff.objects.create(job=job, drone=drone.hostname())

    # TODO(crbug.com/748234): This is temporary to enable toggling
    # lucifer rollouts with an option.
    def _send_gathering_to_lucifer(self):
        Status = models.HostQueueEntry.Status
        queue_entries_qs = (models.HostQueueEntry.objects
                            .filter(status=Status.GATHERING))
        for queue_entry in queue_entries_qs:
            # If this HQE already has an agent, let monitor_db continue
            # owning it.
            if self.get_agents_for_entry(queue_entry):
                continue
            job = queue_entry.job
            if luciferlib.is_lucifer_owned(job):
                continue
            task = postjob_task.PostJobTask(
                    [queue_entry], log_file_name='/dev/null')
            pidfile_id = task._autoserv_monitor.pidfile_id
            autoserv_exit = task._autoserv_monitor.exit_code()
            try:
                drone = luciferlib.spawn_gathering_job_handler(
                        manager=_drone_manager,
                        job=job,
                        autoserv_exit=autoserv_exit,
                        pidfile_id=pidfile_id)
                models.JobHandoff.objects.create(job=job,
                                                 drone=drone.hostname())
            except drone_manager.DroneManagerError as e:
                logging.warning(
                        'Fail to get drone for job %s, skipping lucifer. Error: %s',
                        job.id, e)

    # TODO(crbug.com/748234): This is temporary to enable toggling
    # lucifer rollouts with an option.
    def _send_parsing_to_lucifer(self):
        Status = models.HostQueueEntry.Status
        queue_entries_qs = (models.HostQueueEntry.objects
                            .filter(status=Status.PARSING))
        for queue_entry in queue_entries_qs:
            # If this HQE already has an agent, let monitor_db continue
            # owning it.
            if self.get_agents_for_entry(queue_entry):
                continue
            job = queue_entry.job
            if luciferlib.is_lucifer_owned(job):
                continue
            # TODO(crbug.com/811877): Ignore split HQEs.
            if luciferlib.is_split_job(queue_entry.id):
                continue
            task = postjob_task.PostJobTask(
                    [queue_entry], log_file_name='/dev/null')
            pidfile_id = task._autoserv_monitor.pidfile_id
            autoserv_exit = task._autoserv_monitor.exit_code()
            try:
                drone = luciferlib.spawn_parsing_job_handler(
                        manager=_drone_manager,
                        job=job,
                        autoserv_exit=autoserv_exit,
                        pidfile_id=pidfile_id)
                models.JobHandoff.objects.create(job=job,
                                                 drone=drone.hostname())
            except drone_manager.DroneManagerError as e:
                logging.warning(
                        'Fail to get drone for job %s, skipping lucifer. Error: %s',
                        job.id, e)

    @_calls_log_tick_msg
    def _schedule_running_host_queue_entries(self):
        """
        Adds agents to the dispatcher.

        Any AgentTask, like the QueueTask, is wrapped in an Agent. The
        QueueTask for example, will have a job with a control file, and
        the agent will have methods that poll, abort and check if the queue
        task is finished. The dispatcher runs the agent_task, as well as
        other agents in its _agents member, through _handle_agents, by
        calling the Agent's tick().

        This method creates an agent for each HQE in one of (starting, running,
        gathering, parsing) states, and adds it to the dispatcher so
        it is handled by _handle_agents.
        """
        for agent_task in self._get_queue_entry_agent_tasks():
            self.add_agent_task(agent_task)

    @_calls_log_tick_msg
    def _find_aborting(self):
        """
        Looks through the afe_host_queue_entries for an aborted entry.

        The aborted bit is set on an HQE in many ways, the most common
        being when a user requests an abort through the frontend, which
        results in an rpc from the afe to abort_host_queue_entries.
        """
        jobs_to_stop = set()
        for entry in scheduler_models.HostQueueEntry.fetch(
                where='aborted=1 and complete=0'):
            # If the job is running on a shard, let the shard handle aborting
            # it and sync back the right status.
            if entry.job.shard_id is not None and not server_utils.is_shard():
                logging.info('Waiting for shard %s to abort hqe %s',
                             entry.job.shard_id, entry)
                continue
            logging.info('Aborting %s', entry)

            # The task would have started off with both is_complete and
            # is_active = False. Aborted tasks are neither active nor complete.
            # For all currently active tasks this will happen through the agent,
            # but we need to manually update the special tasks that haven't
            # started yet, because they don't have agents.
            models.SpecialTask.objects.filter(
                    is_active=False,
                    queue_entry_id=entry.id).update(is_complete=True)

            for agent in self.get_agents_for_entry(entry):
                agent.abort()
            entry.abort(self)
            jobs_to_stop.add(entry.job)
        logging.debug('Aborting %d jobs this tick.', len(jobs_to_stop))
        for job in jobs_to_stop:
            job.stop_if_necessary()

    @_calls_log_tick_msg
    def _find_aborted_special_tasks(self):
        """
        Find SpecialTasks that have been marked for abortion.

        Poll the database looking for SpecialTasks that are active
        and have been marked for abortion, then abort them.
        """
        # The completed and active bits are very important when it comes...
