How to use the _get_queue_entry_agent_tasks method in autotest

Best Python code snippet using autotest_python

monitor_db.py

Source:monitor_db.py Github

copy

Full Screen

...387 # leave around files when they die388 _drone_manager.execute_actions()389 _drone_manager.reinitialize_drones()390 def _create_recovery_agent_tasks(self):391 return (self._get_queue_entry_agent_tasks()392 + self._get_special_task_agent_tasks(is_active=True))393 def _get_queue_entry_agent_tasks(self):394 """395 Get agent tasks for all hqe in the specified states.396 Loosely this translates to taking a hqe in one of the specified states,397 say parsing, and getting an AgentTask for it, like the FinalReparseTask,398 through _get_agent_task_for_queue_entry. Each queue entry can only have399 one agent task at a time, but there might be multiple queue entries in400 the group.401 @return: A list of AgentTasks.402 """403 # host queue entry statuses handled directly by AgentTasks (Verifying is404 # handled through SpecialTasks, so is not listed here)405 statuses = (models.HostQueueEntry.Status.STARTING,406 models.HostQueueEntry.Status.RUNNING,407 models.HostQueueEntry.Status.GATHERING,408 models.HostQueueEntry.Status.PARSING,409 models.HostQueueEntry.Status.ARCHIVING)410 status_list = ','.join("'%s'" % status for status in statuses)411 queue_entries = scheduler_models.HostQueueEntry.fetch(412 where='status IN (%s)' % status_list)413 autotest_stats.Gauge('scheduler.jobs_per_tick').send(414 'running', len(queue_entries))415 agent_tasks = []416 used_queue_entries = set()417 for entry in queue_entries:418 if self.get_agents_for_entry(entry):419 # already being handled420 continue421 if entry in used_queue_entries:422 # already picked up by a synchronous job423 continue424 agent_task = self._get_agent_task_for_queue_entry(entry)425 agent_tasks.append(agent_task)426 used_queue_entries.update(agent_task.queue_entries)427 return agent_tasks428 def _get_special_task_agent_tasks(self, is_active=False):429 special_tasks = models.SpecialTask.objects.filter(430 is_active=is_active, is_complete=False)431 return [self._get_agent_task_for_special_task(task)432 for task in 
special_tasks]433 def _get_agent_task_for_queue_entry(self, queue_entry):434 """435 Construct an AgentTask instance for the given active HostQueueEntry.436 @param queue_entry: a HostQueueEntry437 @return: an AgentTask to run the queue entry438 """439 task_entries = queue_entry.job.get_group_entries(queue_entry)440 self._check_for_duplicate_host_entries(task_entries)441 if queue_entry.status in (models.HostQueueEntry.Status.STARTING,442 models.HostQueueEntry.Status.RUNNING):443 if queue_entry.is_hostless():444 return HostlessQueueTask(queue_entry=queue_entry)445 return QueueTask(queue_entries=task_entries)446 if queue_entry.status == models.HostQueueEntry.Status.GATHERING:447 return postjob_task.GatherLogsTask(queue_entries=task_entries)448 if queue_entry.status == models.HostQueueEntry.Status.PARSING:449 return postjob_task.FinalReparseTask(queue_entries=task_entries)450 if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:451 return postjob_task.ArchiveResultsTask(queue_entries=task_entries)452 raise scheduler_lib.SchedulerError(453 '_get_agent_task_for_queue_entry got entry with '454 'invalid status %s: %s' % (queue_entry.status, queue_entry))455 def _check_for_duplicate_host_entries(self, task_entries):456 non_host_statuses = (models.HostQueueEntry.Status.PARSING,457 models.HostQueueEntry.Status.ARCHIVING)458 for task_entry in task_entries:459 using_host = (task_entry.host is not None460 and task_entry.status not in non_host_statuses)461 if using_host:462 self._assert_host_has_no_agent(task_entry)463 def _assert_host_has_no_agent(self, entry):464 """465 @param entry: a HostQueueEntry or a SpecialTask466 """467 if self.host_has_agent(entry.host):468 agent = tuple(self._host_agents.get(entry.host.id))[0]469 raise scheduler_lib.SchedulerError(470 'While scheduling %s, host %s already has a host agent %s'471 % (entry, entry.host, agent.task))472 def _get_agent_task_for_special_task(self, special_task):473 """474 Construct an AgentTask class to run the 
given SpecialTask and add it475 to this dispatcher.476 A special task is created through schedule_special_tasks, but only if477 the host doesn't already have an agent. This happens through478 add_agent_task. All special agent tasks are given a host on creation,479 and a Null hqe. To create a SpecialAgentTask object, you need a480 models.SpecialTask. If the SpecialTask used to create a SpecialAgentTask481 object contains a hqe it's passed on to the special agent task, which482 creates a HostQueueEntry and saves it as it's queue_entry.483 @param special_task: a models.SpecialTask instance484 @returns an AgentTask to run this SpecialTask485 """486 self._assert_host_has_no_agent(special_task)487 special_agent_task_classes = (prejob_task.CleanupTask,488 prejob_task.VerifyTask,489 prejob_task.RepairTask,490 prejob_task.ResetTask,491 prejob_task.ProvisionTask)492 for agent_task_class in special_agent_task_classes:493 if agent_task_class.TASK_TYPE == special_task.task:494 return agent_task_class(task=special_task)495 raise scheduler_lib.SchedulerError(496 'No AgentTask class for task', str(special_task))497 def _register_pidfiles(self, agent_tasks):498 for agent_task in agent_tasks:499 agent_task.register_necessary_pidfiles()500 def _recover_tasks(self, agent_tasks):501 orphans = _drone_manager.get_orphaned_autoserv_processes()502 for agent_task in agent_tasks:503 agent_task.recover()504 if agent_task.monitor and agent_task.monitor.has_process():505 orphans.discard(agent_task.monitor.get_process())506 self.add_agent_task(agent_task)507 self._check_for_remaining_orphan_processes(orphans)508 def _get_unassigned_entries(self, status):509 for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"510 % status):511 if entry.status == status and not self.get_agents_for_entry(entry):512 # The status can change during iteration, e.g., if job.run()513 # sets a group of queue entries to Starting514 yield entry515 def _check_for_remaining_orphan_processes(self, 
orphans):516 if not orphans:517 return518 subject = 'Unrecovered orphan autoserv processes remain'519 message = '\n'.join(str(process) for process in orphans)520 email_manager.manager.enqueue_notify_email(subject, message)521 die_on_orphans = global_config.global_config.get_config_value(522 scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)523 if die_on_orphans:524 raise RuntimeError(subject + '\n' + message)525 def _recover_pending_entries(self):526 for entry in self._get_unassigned_entries(527 models.HostQueueEntry.Status.PENDING):528 logging.info('Recovering Pending entry %s', entry)529 entry.on_pending()530 def _check_for_unrecovered_verifying_entries(self):531 queue_entries = scheduler_models.HostQueueEntry.fetch(532 where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)533 unrecovered_hqes = []534 for queue_entry in queue_entries:535 special_tasks = models.SpecialTask.objects.filter(536 task__in=(models.SpecialTask.Task.CLEANUP,537 models.SpecialTask.Task.VERIFY),538 queue_entry__id=queue_entry.id,539 is_complete=False)540 if special_tasks.count() == 0:541 unrecovered_hqes.append(queue_entry)542 if unrecovered_hqes:543 message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)544 raise scheduler_lib.SchedulerError(545 '%d unrecovered verifying host queue entries:\n%s' %546 (len(unrecovered_hqes), message))547 def _schedule_special_tasks(self):548 """549 Execute queued SpecialTasks that are ready to run on idle hosts.550 Special tasks include PreJobTasks like verify, reset and cleanup.551 They are created through _schedule_new_jobs and associated with a hqe552 This method translates SpecialTasks to the appropriate AgentTask and553 adds them to the dispatchers agents list, so _handle_agents can execute554 them.555 """556 # When the host scheduler is responsible for acquisition we only want557 # to run tasks with leased hosts. 
All hqe tasks will already have558 # leased hosts, and we don't want to run frontend tasks till the host559 # scheduler has vetted the assignment. Note that this doesn't include560 # frontend tasks with hosts leased by other active hqes.561 for task in self._job_query_manager.get_prioritized_special_tasks(562 only_tasks_with_leased_hosts=not _inline_host_acquisition):563 if self.host_has_agent(task.host):564 continue565 self.add_agent_task(self._get_agent_task_for_special_task(task))566 def _reverify_remaining_hosts(self):567 # recover active hosts that have not yet been recovered, although this568 # should never happen569 message = ('Recovering active host %s - this probably indicates a '570 'scheduler bug')571 self._reverify_hosts_where(572 "status IN ('Repairing', 'Verifying', 'Cleaning', 'Provisioning')",573 print_message=message)574 def _reverify_hosts_where(self, where,575 print_message='Reverifying host %s'):576 full_where='locked = 0 AND invalid = 0 AND ' + where577 for host in scheduler_models.Host.fetch(where=full_where):578 if self.host_has_agent(host):579 # host has already been recovered in some way580 continue581 if self._host_has_scheduled_special_task(host):582 # host will have a special task scheduled on the next tick583 continue584 if print_message:585 logging.info(print_message, host.hostname)586 models.SpecialTask.objects.create(587 task=models.SpecialTask.Task.CLEANUP,588 host=models.Host.objects.get(id=host.id))589 def _recover_hosts(self):590 # recover "Repair Failed" hosts591 message = 'Reverifying dead host %s'592 self._reverify_hosts_where("status = 'Repair Failed'",593 print_message=message)594 def _refresh_pending_queue_entries(self):595 """596 Lookup the pending HostQueueEntries and call our HostScheduler597 refresh() method given that list. 
Return the list.598 @returns A list of pending HostQueueEntries sorted in priority order.599 """600 queue_entries = self._job_query_manager.get_pending_queue_entries(601 only_hostless=not _inline_host_acquisition)602 if not queue_entries:603 return []604 return queue_entries605 def _schedule_hostless_job(self, queue_entry):606 """Schedule a hostless (suite) job.607 @param queue_entry: The queue_entry representing the hostless job.608 """609 self.add_agent_task(HostlessQueueTask(queue_entry))610 # Need to set execution_subdir before setting the status:611 # After a restart of the scheduler, agents will be restored for HQEs in612 # Starting, Running, Gathering, Parsing or Archiving. To do this, the613 # execution_subdir is needed. Therefore it must be set before entering614 # one of these states.615 # Otherwise, if the scheduler was interrupted between setting the status616 # and the execution_subdir, upon it's restart restoring agents would617 # fail.618 # Is there a way to get a status in one of these states without going619 # through this code? Following cases are possible:620 # - If it's aborted before being started:621 # active bit will be 0, so there's nothing to parse, it will just be622 # set to completed by _find_aborting. Critical statuses are skipped.623 # - If it's aborted or it fails after being started:624 # It was started, so this code was executed.625 queue_entry.update_field('execution_subdir', 'hostless')626 queue_entry.set_status(models.HostQueueEntry.Status.STARTING)627 def _schedule_host_job(self, host, queue_entry):628 """Schedules a job on the given host.629 1. Assign the host to the hqe, if it isn't already assigned.630 2. Create a SpecialAgentTask for the hqe.631 3. 
Activate the hqe.632 @param queue_entry: The job to schedule.633 @param host: The host to schedule the job on.634 """635 if self.host_has_agent(host):636 host_agent_task = list(self._host_agents.get(host.id))[0].task637 subject = 'Host with agents assigned to an HQE'638 message = ('HQE: %s assigned host %s, but the host has '639 'agent: %s for queue_entry %s. The HQE '640 'will have to try and acquire a host next tick ' %641 (queue_entry, host.hostname, host_agent_task,642 host_agent_task.queue_entry))643 email_manager.manager.enqueue_notify_email(subject, message)644 else:645 self._host_scheduler.schedule_host_job(host, queue_entry)646 def _schedule_new_jobs(self):647 """648 Find any new HQEs and call schedule_pre_job_tasks for it.649 This involves setting the status of the HQE and creating a row in the650 db corresponding the the special task, through651 scheduler_models._queue_special_task. The new db row is then added as652 an agent to the dispatcher through _schedule_special_tasks and653 scheduled for execution on the drone through _handle_agents.654 """655 queue_entries = self._refresh_pending_queue_entries()656 key = 'scheduler.jobs_per_tick'657 new_hostless_jobs = 0658 new_jobs_with_hosts = 0659 new_jobs_need_hosts = 0660 host_jobs = []661 logging.debug('Processing %d queue_entries', len(queue_entries))662 for queue_entry in queue_entries:663 if queue_entry.is_hostless():664 self._schedule_hostless_job(queue_entry)665 new_hostless_jobs = new_hostless_jobs + 1666 else:667 host_jobs.append(queue_entry)668 new_jobs_need_hosts = new_jobs_need_hosts + 1669 autotest_stats.Gauge(key).send('new_hostless_jobs', new_hostless_jobs)670 if not host_jobs:671 return672 if not _inline_host_acquisition:673 message = ('Found %s jobs that need hosts though '674 '_inline_host_acquisition=%s. Will acquire hosts.' 
%675 ([str(job) for job in host_jobs],676 _inline_host_acquisition))677 email_manager.manager.enqueue_notify_email(678 'Processing unexpected host acquisition requests', message)679 jobs_with_hosts = self._host_scheduler.find_hosts_for_jobs(host_jobs)680 for host_assignment in jobs_with_hosts:681 self._schedule_host_job(host_assignment.host, host_assignment.job)682 new_jobs_with_hosts = new_jobs_with_hosts + 1683 autotest_stats.Gauge(key).send('new_jobs_with_hosts',684 new_jobs_with_hosts)685 autotest_stats.Gauge(key).send('new_jobs_without_hosts',686 new_jobs_need_hosts -687 new_jobs_with_hosts)688 def _schedule_running_host_queue_entries(self):689 """690 Adds agents to the dispatcher.691 Any AgentTask, like the QueueTask, is wrapped in an Agent. The692 QueueTask for example, will have a job with a control file, and693 the agent will have methods that poll, abort and check if the queue694 task is finished. The dispatcher runs the agent_task, as well as695 other agents in it's _agents member, through _handle_agents, by696 calling the Agents tick().697 This method creates an agent for each HQE in one of (starting, running,698 gathering, parsing, archiving) states, and adds it to the dispatcher so699 it is handled by _handle_agents.700 """701 for agent_task in self._get_queue_entry_agent_tasks():702 self.add_agent_task(agent_task)703 def _schedule_delay_tasks(self):704 for entry in scheduler_models.HostQueueEntry.fetch(705 where='status = "%s"' % models.HostQueueEntry.Status.WAITING):706 task = entry.job.schedule_delayed_callback_task(entry)707 if task:708 self.add_agent_task(task)709 def _find_aborting(self):710 """711 Looks through the afe_host_queue_entries for an aborted entry.712 The aborted bit is set on an HQE in many ways, the most common713 being when a user requests an abort through the frontend, which714 results in an rpc from the afe to abort_host_queue_entries.715 """...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful