...209 return {}210 query %= self._get_sql_id_list(id_list)211 rows = self._db.execute(query)212 return self._process_many2many_dict(rows, flip)213 def _get_ready_hosts(self):214 # We don't lose anything by re-doing these checks215 # even though we release hosts on the same conditions.216 # In the future we might have multiple clients that217 # release_hosts and/or lock them independent of the218 # scheduler tick.219 hosts = scheduler_models.Host.fetch(220 where="NOT afe_hosts.leased "221 "AND NOT afe_hosts.locked "222 "AND (afe_hosts.status IS NULL "223 "OR afe_hosts.status = 'Ready')")224 return dict((, host) for host in hosts)225 @metrics.SecondsTimerDecorator(_host_timer_name % 'get_job_acl_groups')226 def _get_job_acl_groups(self, job_ids):227 query = """228 SELECT, afe_acl_groups_users.aclgroup_id229 FROM afe_jobs230 INNER JOIN afe_users ON afe_users.login = afe_jobs.owner231 INNER JOIN afe_acl_groups_users ON232 afe_acl_groups_users.user_id = afe_users.id233 WHERE IN (%s)234 """235 return self._get_many2many_dict(query, job_ids)236 def _get_job_ineligible_hosts(self, job_ids):237 query = """238 SELECT job_id, host_id239 FROM afe_ineligible_host_queues240 WHERE job_id IN (%s)241 """242 return self._get_many2many_dict(query, job_ids)243 @metrics.SecondsTimerDecorator(_host_timer_name % 'get_job_dependencies')244 def _get_job_dependencies(self, job_ids):245 query = """246 SELECT job_id, label_id247 FROM afe_jobs_dependency_labels248 WHERE job_id IN (%s)249 """250 return self._get_many2many_dict(query, job_ids)251 @classmethod252 def find_unused_healty_hosts(cls):253 """Get hosts that are currently unused and in the READY state.254 @return: A list of host objects, one for each unused healthy host.255 """256 # Avoid any host with a currently active queue entry against it.257 hqe_join = ('LEFT JOIN afe_host_queue_entries AS active_hqe '258 'ON ( = active_hqe.host_id AND '259 '')260 # Avoid any host with a new special task against it. There are 2 cases261 # when an inactive but incomplete special task will not use the host262 # this tick: 1. When the host is locked 2. When an active hqe already263 # has special tasks for the same host. In both these cases this host264 # will not be in the ready hosts list anyway. In all other cases,265 # an incomplete special task will grab the host before a new job does266 # by assigning an agent to it.267 special_task_join = ('LEFT JOIN afe_special_tasks as new_tasks '268 'ON ( = new_tasks.host_id AND '269 'new_tasks.is_complete=0)')270 return scheduler_models.Host.fetch(271 joins='%s %s' % (hqe_join, special_task_join),272 where="active_hqe.host_id IS NULL AND new_tasks.host_id IS NULL "273 "AND afe_hosts.leased "274 "AND NOT afe_hosts.locked "275 "AND (afe_hosts.status IS NULL "276 "OR afe_hosts.status = 'Ready')")277 @metrics.SecondsTimerDecorator(_host_timer_name % 'set_leased')278 def set_leased(self, leased_value, **kwargs):279 """Modify the leased bit on the hosts with ids in host_ids.280 @param leased_value: The True/False value of the leased column for281 the hosts with ids in host_ids.282 @param kwargs: The args to use in finding matching hosts.283 """284'Setting leased = %s for the hosts that match %s',285 leased_value, kwargs)286 models.Host.objects.filter(**kwargs).update(leased=leased_value)287 @metrics.SecondsTimerDecorator(_host_timer_name % 'get_labels')288 def _get_labels(self, job_dependencies):289 """290 Calculate a dict mapping label id to label object so that we don't291 frequently round trip to the database every time we need a label.292 @param job_dependencies: A dict mapping an integer job id to a list of293 integer label id's. ie. {job_id: [label_id]}294 @return: A dict mapping an integer label id to a scheduler model label295 object. ie. {label_id: label_object}296 """297 id_to_label = dict()298 # Pull all the labels on hosts we might look at299 host_labels = scheduler_models.Label.fetch(300 where="id IN (SELECT label_id FROM afe_hosts_labels)")301 id_to_label.update([(, label) for label in host_labels])302 # and pull all the labels on jobs we might look at.303 job_label_set = set()304 for job_deps in job_dependencies.values():305 job_label_set.update(job_deps)306 # On the rare/impossible chance that no jobs have any labels, we307 # can skip this.308 if job_label_set:309 job_string_label_list = ','.join([str(x) for x in job_label_set])310 job_labels = scheduler_models.Label.fetch(311 where="id IN (%s)" % job_string_label_list)312 id_to_label.update([(, label) for label in job_labels])313 return id_to_label314 def refresh(self, pending_queue_entries):315 """Update the query manager.316 Cache information about a list of queue entries and eligible hosts317 from the database so clients can avoid expensive round trips during318 host acquisition.319 @param pending_queue_entries: A list of queue entries about which we320 need information.321 """322 self._hosts_available = self._get_ready_hosts()323 relevant_jobs = [queue_entry.job_id324 for queue_entry in pending_queue_entries]325 self._job_acls = self._get_job_acl_groups(relevant_jobs)326 self._ineligible_hosts = (self._get_job_ineligible_hosts(relevant_jobs))327 self._job_dependencies = (self._get_job_dependencies(relevant_jobs))328 host_ids = self._hosts_available.keys()...

...36 [scheduler] + self._metahost_schedulers)37'Metahost schedulers: %s',38 ', '.join(type(scheduler).__name__ for scheduler39 in self._metahost_schedulers))40 def _get_ready_hosts(self):41 # avoid any host with a currently active queue entry against it42 hosts = scheduler_models.Host.fetch(43 joins='LEFT JOIN afe_host_queue_entries AS active_hqe '44 'ON ( = active_hqe.host_id AND '45 '',46 where="active_hqe.host_id IS NULL "47 "AND NOT afe_hosts.locked "48 "AND (afe_hosts.status IS NULL "49 "OR afe_hosts.status = 'Ready')")50 return dict((, host) for host in hosts)51 def _get_sql_id_list(self, id_list):52 return ','.join(str(item_id) for item_id in id_list)53 def _get_many2many_dict(self, query, id_list, flip=False):54 if not id_list:55 return {}56 query %= self._get_sql_id_list(id_list)57 rows = self._db.execute(query)58 return self._process_many2many_dict(rows, flip)59 def _process_many2many_dict(self, rows, flip=False):60 result = {}61 for row in rows:62 left_id, right_id = int(row[0]), int(row[1])63 if flip:64 left_id, right_id = right_id, left_id65 result.setdefault(left_id, set()).add(right_id)66 return result67 def _get_job_acl_groups(self, job_ids):68 query = """69 SELECT, afe_acl_groups_users.aclgroup_id70 FROM afe_jobs71 INNER JOIN afe_users ON afe_users.login = afe_jobs.owner72 INNER JOIN afe_acl_groups_users ON73 afe_acl_groups_users.user_id = afe_users.id74 WHERE IN (%s)75 """76 return self._get_many2many_dict(query, job_ids)77 def _get_job_ineligible_hosts(self, job_ids):78 query = """79 SELECT job_id, host_id80 FROM afe_ineligible_host_queues81 WHERE job_id IN (%s)82 """83 return self._get_many2many_dict(query, job_ids)84 def _get_job_dependencies(self, job_ids):85 query = """86 SELECT job_id, label_id87 FROM afe_jobs_dependency_labels88 WHERE job_id IN (%s)89 """90 return self._get_many2many_dict(query, job_ids)91 def _get_host_acls(self, host_ids):92 query = """93 SELECT host_id, aclgroup_id94 FROM afe_acl_groups_hosts95 WHERE host_id IN (%s)96 """97 return self._get_many2many_dict(query, host_ids)98 def _get_label_hosts(self, host_ids):99 if not host_ids:100 return {}, {}101 query = """102 SELECT label_id, host_id103 FROM afe_hosts_labels104 WHERE host_id IN (%s)105 """ % self._get_sql_id_list(host_ids)106 rows = self._db.execute(query)107 labels_to_hosts = self._process_many2many_dict(rows)108 hosts_to_labels = self._process_many2many_dict(rows, flip=True)109 return labels_to_hosts, hosts_to_labels110 def _get_labels(self):111 return dict((, label) for label112 in scheduler_models.Label.fetch())113 def recovery_on_startup(self):114 for metahost_scheduler in self._metahost_schedulers:115 metahost_scheduler.recovery_on_startup()116 def refresh(self, pending_queue_entries):117 self._hosts_available = self._get_ready_hosts()118 relevant_jobs = [queue_entry.job_id119 for queue_entry in pending_queue_entries]120 self._job_acls = self._get_job_acl_groups(relevant_jobs)121 self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)122 self._job_dependencies = self._get_job_dependencies(relevant_jobs)123 host_ids = self._hosts_available.keys()124 self._host_acls = self._get_host_acls(host_ids)125 self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)126 self._labels = self._get_labels()127 def tick(self):128 for metahost_scheduler in self._metahost_schedulers:129 metahost_scheduler.tick()130 def hosts_in_label(self, label_id):131 return set(self._label_hosts.get(label_id, ()))...

