...194 if flip:195 left_id, right_id = right_id, left_id196 result.setdefault(left_id, set()).add(right_id)197 return result198 def _get_sql_id_list(self, id_list):199 return ','.join(str(item_id) for item_id in id_list)200 def _get_many2many_dict(self, query, id_list, flip=False):201 if not id_list:202 return {}203 query %= self._get_sql_id_list(id_list)204 rows = self._db.execute(query)205 return self._process_many2many_dict(rows, flip)206 @_host_timer.decorate207 def _get_ready_hosts(self):208 # We don't lose anything by re-doing these checks209 # even though we release hosts on the same conditions.210 # In the future we might have multiple clients that211 # release_hosts and/or lock them independent of the212 # scheduler tick.213 hosts = scheduler_models.Host.fetch(214 where="NOT afe_hosts.leased "215 "AND NOT afe_hosts.locked "216 "AND (afe_hosts.status IS NULL "217 "OR afe_hosts.status = 'Ready')")218 return dict((, host) for host in hosts)219 @_host_timer.decorate220 def _get_job_acl_groups(self, job_ids):221 query = """222 SELECT, afe_acl_groups_users.aclgroup_id223 FROM afe_jobs224 INNER JOIN afe_users ON afe_users.login = afe_jobs.owner225 INNER JOIN afe_acl_groups_users ON226 afe_acl_groups_users.user_id = afe_users.id227 WHERE IN (%s)228 """229 return self._get_many2many_dict(query, job_ids)230 @_host_timer.decorate231 def _get_job_ineligible_hosts(self, job_ids):232 query = """233 SELECT job_id, host_id234 FROM afe_ineligible_host_queues235 WHERE job_id IN (%s)236 """237 return self._get_many2many_dict(query, job_ids)238 @_host_timer.decorate239 def _get_job_dependencies(self, job_ids):240 query = """241 SELECT job_id, label_id242 FROM afe_jobs_dependency_labels243 WHERE job_id IN (%s)244 """245 return self._get_many2many_dict(query, job_ids)246 @_host_timer.decorate247 def _get_host_acls(self, host_ids):248 query = """249 SELECT host_id, aclgroup_id250 FROM afe_acl_groups_hosts251 WHERE host_id IN (%s)252 """253 return self._get_many2many_dict(query, host_ids)254 @_host_timer.decorate255 def _get_label_hosts(self, host_ids):256 if not host_ids:257 return {}, {}258 query = """259 SELECT label_id, host_id260 FROM afe_hosts_labels261 WHERE host_id IN (%s)262 """ % self._get_sql_id_list(host_ids)263 rows = self._db.execute(query)264 labels_to_hosts = self._process_many2many_dict(rows)265 hosts_to_labels = self._process_many2many_dict(rows, flip=True)266 return labels_to_hosts, hosts_to_labels267 @classmethod268 def find_unused_healty_hosts(cls):269 """Get hosts that are currently unused and in the READY state.270 @return: A list of host objects, one for each unused healthy host.271 """272 # Avoid any host with a currently active queue entry against it.273 hqe_join = ('LEFT JOIN afe_host_queue_entries AS active_hqe '274 'ON ( = active_hqe.host_id AND '275 '')276 # Avoid any host with a new special task against it. There are 2 cases...

...47 "AND NOT afe_hosts.locked "48 "AND (afe_hosts.status IS NULL "49 "OR afe_hosts.status = 'Ready')")50 return dict((, host) for host in hosts)51 def _get_sql_id_list(self, id_list):52 return ','.join(str(item_id) for item_id in id_list)53 def _get_many2many_dict(self, query, id_list, flip=False):54 if not id_list:55 return {}56 query %= self._get_sql_id_list(id_list)57 rows = self._db.execute(query)58 return self._process_many2many_dict(rows, flip)59 def _process_many2many_dict(self, rows, flip=False):60 result = {}61 for row in rows:62 left_id, right_id = int(row[0]), int(row[1])63 if flip:64 left_id, right_id = right_id, left_id65 result.setdefault(left_id, set()).add(right_id)66 return result67 def _get_job_acl_groups(self, job_ids):68 query = """69 SELECT, afe_acl_groups_users.aclgroup_id70 FROM afe_jobs71 INNER JOIN afe_users ON afe_users.login = afe_jobs.owner72 INNER JOIN afe_acl_groups_users ON73 afe_acl_groups_users.user_id = afe_users.id74 WHERE IN (%s)75 """76 return self._get_many2many_dict(query, job_ids)77 def _get_job_ineligible_hosts(self, job_ids):78 query = """79 SELECT job_id, host_id80 FROM afe_ineligible_host_queues81 WHERE job_id IN (%s)82 """83 return self._get_many2many_dict(query, job_ids)84 def _get_job_dependencies(self, job_ids):85 query = """86 SELECT job_id, label_id87 FROM afe_jobs_dependency_labels88 WHERE job_id IN (%s)89 """90 return self._get_many2many_dict(query, job_ids)91 def _get_host_acls(self, host_ids):92 query = """93 SELECT host_id, aclgroup_id94 FROM afe_acl_groups_hosts95 WHERE host_id IN (%s)96 """97 return self._get_many2many_dict(query, host_ids)98 def _get_label_hosts(self, host_ids):99 if not host_ids:100 return {}, {}101 query = """102 SELECT label_id, host_id103 FROM afe_hosts_labels104 WHERE host_id IN (%s)105 """ % self._get_sql_id_list(host_ids)106 rows = self._db.execute(query)107 labels_to_hosts = self._process_many2many_dict(rows)108 hosts_to_labels = self._process_many2many_dict(rows, flip=True)109 return labels_to_hosts, hosts_to_labels110 def _get_labels(self):111 return dict((, label) for label112 in scheduler_models.Label.fetch())113 def recovery_on_startup(self):114 for metahost_scheduler in self._metahost_schedulers:115 metahost_scheduler.recovery_on_startup()116 def refresh(self, pending_queue_entries):117 self._hosts_available = self._get_ready_hosts()118 relevant_jobs = [queue_entry.job_id119 for queue_entry in pending_queue_entries]...

