...541 if not queue_entries:542 return []543 self._host_scheduler.refresh(queue_entries)544 return queue_entries545 def _schedule_atomic_group(self, queue_entry):546 """547 Schedule the given queue_entry on an atomic group of hosts.548 Returns immediately if there are insufficient available hosts.549 Creates new HostQueueEntries based off of queue_entry for the550 scheduled hosts and starts them all running.551 """552 # This is a virtual host queue entry representing an entire553 # atomic group, find a group and schedule their hosts.554 group_hosts = self._host_scheduler.find_eligible_atomic_group(555 queue_entry)556 if not group_hosts:557 return558'Expanding atomic group entry %s with hosts %s',559 queue_entry,560 ', '.join(host.hostname for host in group_hosts))561 for assigned_host in group_hosts[1:]:562 # Create a new HQE for every additional assigned_host.563 new_hqe = scheduler_models.HostQueueEntry.clone(queue_entry)564 new_hqe.set_host(assigned_host)566 self._run_queue_entry(new_hqe)567 # The first assigned host uses the original HostQueueEntry568 queue_entry.set_host(group_hosts[0])569 self._run_queue_entry(queue_entry)570 def _schedule_hostless_job(self, queue_entry):571 self.add_agent_task(HostlessQueueTask(queue_entry))572 queue_entry.set_status(models.HostQueueEntry.Status.STARTING)573 def _schedule_new_jobs(self):574 queue_entries = self._refresh_pending_queue_entries()575 if not queue_entries:576 return577 for queue_entry in queue_entries:578 is_unassigned_atomic_group = (579 queue_entry.atomic_group_id is not None and580 queue_entry.host_id is None)581 if queue_entry.is_hostless():582 self._schedule_hostless_job(queue_entry)583 elif is_unassigned_atomic_group:584 self._schedule_atomic_group(queue_entry)585 else:586 assigned_host = self._host_scheduler.schedule_entry(queue_entry)587 if assigned_host and not self.host_has_agent(assigned_host):588 assert == queue_entry.host_id589 self._run_queue_entry(queue_entry)590 def _schedule_running_host_queue_entries(self):591 for agent_task in self._get_queue_entry_agent_tasks():592 self.add_agent_task(agent_task)593 def _schedule_delay_tasks(self):594 for entry in scheduler_models.HostQueueEntry.fetch(595 where='status = "%s"' % models.HostQueueEntry.Status.WAITING):596 task = entry.job.schedule_delayed_callback_task(entry)597 if task:598 self.add_agent_task(task)...

