...252 drone.hostname)253 drone.queue_call('cleanup_orphaned_containers')254 def _get_drone_for_process(self, process):255 return self._drones[process.hostname]256 def _get_drone_for_pidfile_id(self, pidfile_id):257 pidfile_contents = self.get_pidfile_contents(pidfile_id)258 if pidfile_contents.process is None:259 raise DroneManagerError('Fail to get a drone due to empty pidfile')260 return self._get_drone_for_process(pidfile_contents.process)261 def get_drone_for_pidfile_id(self, pidfile_id):262 """Public API for luciferlib.263 @param pidfile_id: PidfileId instance.264 """265 return self._get_drone_for_pidfile_id(pidfile_id)266 def _drop_old_pidfiles(self):267 # use items() since the dict is modified in unregister_pidfile()268 for pidfile_id, info in self._registered_pidfile_info.items():269 if info.age > self._get_max_pidfile_refreshes():270 logging.warning('dropping leaked pidfile %s', pidfile_id)271 self.unregister_pidfile(pidfile_id)272 else:273 info.age += 1274 def _reset(self):275 self._process_set = set()276 self._all_processes = {}277 self._pidfiles = {}278 self._pidfiles_second_read = {}279 self._drone_queue = []280 def _parse_pidfile(self, drone, raw_contents):281 """Parse raw pidfile contents.282 @param drone: The drone on which this pidfile was found.283 @param raw_contents: The raw contents of a pidfile, eg:284 "pid\nexit_staus\nnum_tests_failed\n".285 """286 contents = PidfileContents()287 if not raw_contents:288 return contents289 lines = raw_contents.splitlines()290 if len(lines) > 3:291 return InvalidPidfile('Corrupt pid file (%d lines):\n%s' %292 (len(lines), lines))293 try:294 pid = int(lines[0])295 contents.process = Process(drone.hostname, pid)296 # if len(lines) == 2, assume we caught Autoserv between writing297 # exit_status and num_failed_tests, so just ignore it and wait for298 # the next cycle299 if len(lines) == 3:300 contents.exit_status = int(lines[1])301 contents.num_tests_failed = int(lines[2])302 except ValueError, exc:303 return InvalidPidfile('Corrupt pid file: ' + str(exc.args))304 return contents305 def _process_pidfiles(self, drone, pidfiles, store_in_dict):306 for pidfile_path, contents in pidfiles.iteritems():307 pidfile_id = PidfileId(pidfile_path)308 contents = self._parse_pidfile(drone, contents)309 store_in_dict[pidfile_id] = contents310 def _add_process(self, drone, process_info):311 process = Process(drone.hostname, int(process_info['pid']),312 int(process_info['ppid']))313 self._process_set.add(process)314 def _add_autoserv_process(self, drone, process_info):315 assert process_info['comm'] == 'autoserv'316 # only root autoserv processes have pgid == pid317 if process_info['pgid'] != process_info['pid']:318 return319 self._add_process(drone, process_info)320 def _enqueue_drone(self, drone):321 heapq.heappush(self._drone_queue, _DroneHeapWrapper(drone))322 def _reorder_drone_queue(self):323 heapq.heapify(self._drone_queue)324 def _compute_active_processes(self, drone):325 drone.active_processes = 0326 for pidfile_id, contents in self._pidfiles.iteritems():327 is_running = contents.exit_status is None328 on_this_drone = (contents.process329 and contents.process.hostname == drone.hostname)330 if is_running and on_this_drone:331 info = self._registered_pidfile_info[pidfile_id]332 if info.num_processes is not None:333 drone.active_processes += info.num_processes334 metrics.Gauge('chromeos/autotest/drone/active_processes').set(335 drone.active_processes,336 fields={'drone_hostname': drone.hostname})337 def _check_drone_process_limit(self, drone):338 """339 Notify if the number of processes on |drone| is approaching limit.340 @param drone: A Drone object.341 """342 try:343 percent = float(drone.active_processes) / drone.max_processes344 except ZeroDivisionError:345 percent = 100346 metrics.Float('chromeos/autotest/drone/active_process_percentage'347 ).set(percent, fields={'drone_hostname': drone.hostname})348 def trigger_refresh(self):349 """Triggers a drone manager refresh.350 @raises DroneManagerError: If a drone has un-executed calls.351 Since they will get clobbered when we queue refresh calls.352 """353 self._reset()354 self._drop_old_pidfiles()355 pidfile_paths = [pidfile_id.path356 for pidfile_id in self._registered_pidfile_info]357 drones = list(self.get_drones())358 for drone in drones:359 calls = drone.get_calls()360 if calls:361 raise DroneManagerError('Drone %s has un-executed calls: %s '362 'which might get corrupted through '363 'this invocation' %364 (drone, [str(call) for call in calls]))365 drone.queue_call('refresh', pidfile_paths)366"Invoking drone refresh.")367 with metrics.SecondsTimer(368 'chromeos/autotest/drone_manager/trigger_refresh_duration'):369 self._refresh_task_queue.execute(drones, wait=False)370 def sync_refresh(self):371 """Complete the drone refresh started by trigger_refresh.372 Waits for all drone threads then refreshes internal datastructures373 with drone process information.374 """375 # This gives us a dictionary like what follows:376 # {drone: [{'pidfiles': (raw contents of pidfile paths),377 # 'autoserv_processes': (autoserv process info from ps),378 # 'all_processes': (all process info from ps),379 # 'parse_processes': (parse process infor from ps),380 # 'pidfile_second_read': (pidfile contents, again),}]381 # drone2: ...}382 # The values of each drone are only a list because this adheres to the383 # drone utility interface (each call is executed and its results are384 # places in a list, but since we never couple the refresh calls with385 # any other call, this list will always contain a single dict).386 with metrics.SecondsTimer(387 'chromeos/autotest/drone_manager/sync_refresh_duration'):388 all_results = self._refresh_task_queue.get_results()389"Drones refreshed.")390 # The loop below goes through and parses pidfile contents. Pidfiles391 # are used to track autoserv execution, and will always contain < 3392 # lines of the following: pid, exit code, number of tests. Each pidfile393 # is identified by a PidfileId object, which contains a unique pidfile394 # path (unique because it contains the job id) making it hashable.395 # All pidfiles are stored in the drone managers _pidfiles dict as:396 # {pidfile_id: pidfile_contents(Process(drone, pid),397 # exit_code, num_tests_failed)}398 # In handle agents, each agent knows its pidfile_id, and uses this399 # to retrieve the refreshed contents of its pidfile via the400 # PidfileRunMonitor (through its tick) before making decisions. If401 # the agent notices that its process has exited, it unregisters the402 # pidfile from the drone_managers._registered_pidfile_info dict403 # through its epilog.404 for drone, results_list in all_results.iteritems():405 results = results_list[0]406 drone_hostname = drone.hostname.replace('.', '_')407 for process_info in results['all_processes']:408 if process_info['comm'] == 'autoserv':409 self._add_autoserv_process(drone, process_info)410 drone_pid = drone.hostname, int(process_info['pid'])411 self._all_processes[drone_pid] = process_info412 for process_info in results['parse_processes']:413 self._add_process(drone, process_info)414 self._process_pidfiles(drone, results['pidfiles'], self._pidfiles)415 self._process_pidfiles(drone, results['pidfiles_second_read'],416 self._pidfiles_second_read)417 self._compute_active_processes(drone)418 if drone.enabled:419 self._enqueue_drone(drone)420 self._check_drone_process_limit(drone)421 def refresh(self):422 """Refresh all drones."""423 with metrics.SecondsTimer(424 'chromeos/autotest/drone_manager/refresh_duration'):425 self.trigger_refresh()426 self.sync_refresh()427 @metrics.SecondsTimerDecorator(428 'chromeos/autotest/drone_manager/execute_actions_duration')429 def execute_actions(self):430 """431 Called at the end of a scheduler cycle to execute all queued actions432 on drones.433 """434 # Invoke calls queued on all drones since the last call to execute435 # and wait for them to return.436 if _THREADED_DRONE_MANAGER:437 thread_lib.ThreadedTaskQueue(438 name='%s.execute_queue' % self._STATS_KEY).execute(439 self._drones.values())440 else:441 drone_task_queue.DroneTaskQueue().execute(self._drones.values())442 try:443 self._results_drone.execute_queued_calls()444 except error.AutoservError:445 m = 'chromeos/autotest/errors/results_repository_failed'446 metrics.Counter(m).increment(447 fields={'drone_hostname': self._results_drone.hostname})448 self._results_drone.clear_call_queue()449 def get_orphaned_autoserv_processes(self):450 """451 Returns a set of Process objects for orphaned processes only.452 """453 return set(process for process in self._process_set454 if process.ppid == 1)455 def kill_process(self, process):456 """457 Kill the given process.458 """459'killing %s', process)460 drone = self._get_drone_for_process(process)461 drone.queue_kill_process(process)462 def _ensure_directory_exists(self, path):463 if not os.path.exists(path):464 os.makedirs(path)465 def total_running_processes(self):466 return sum(drone.active_processes for drone in self.get_drones())467 def max_runnable_processes(self, username, drone_hostnames_allowed):468 """469 Return the maximum number of processes that can be run (in a single470 execution) given the current load on drones.471 @param username: login of user to run a process. may be None.472 @param drone_hostnames_allowed: list of drones that can be used. May be473 None474 """475 usable_drone_wrappers = [wrapper for wrapper in self._drone_queue476 if wrapper.drone.usable_by(username) and477 (drone_hostnames_allowed is None or478 wrapper.drone.hostname in479 drone_hostnames_allowed)]480 if not usable_drone_wrappers:481 # all drones disabled or inaccessible482 return 0483 runnable_processes = [484 wrapper.drone.max_processes - wrapper.drone.active_processes485 for wrapper in usable_drone_wrappers]486 return max([0] + runnable_processes)487 def _least_loaded_drone(self, drones):488 return min(drones, key=lambda d: d.used_capacity())489 def pick_drone_to_use(self, num_processes=1, prefer_ssp=False):490 """Return a drone to use.491 Various options can be passed to optimize drone selection.492 num_processes is the number of processes the drone is intended493 to run.494 prefer_ssp indicates whether drones supporting server-side495 packaging should be preferred. The returned drone is not496 guaranteed to support it.497 This public API is exposed for luciferlib to wrap.498 Returns a drone instance (see """500 return self._choose_drone_for_execution(501 num_processes=num_processes,502 username=None, # Always allow all drones503 drone_hostnames_allowed=None, # Always allow all drones504 require_ssp=prefer_ssp,505 )506 def _choose_drone_for_execution(self, num_processes, username,507 drone_hostnames_allowed,508 require_ssp=False):509 """Choose a drone to execute command.510 @param num_processes: Number of processes needed for execution.511 @param username: Name of the user to execute the command.512 @param drone_hostnames_allowed: A list of names of drone allowed.513 @param require_ssp: Require server-side packaging to execute the,514 command, default to False.515 @return: A drone object to be used for execution.516 """517 # cycle through drones is order of increasing used capacity until518 # we find one that can handle these processes519 checked_drones = []520 usable_drones = []521 # Drones do not support server-side packaging, used as backup if no522 # drone is found to run command requires server-side packaging.523 no_ssp_drones = []524 drone_to_use = None525 while self._drone_queue:526 drone = heapq.heappop(self._drone_queue).drone527 checked_drones.append(drone)528'Checking drone %s', drone.hostname)529 if not drone.usable_by(username):530 continue531 drone_allowed = (drone_hostnames_allowed is None532 or drone.hostname in drone_hostnames_allowed)533 if not drone_allowed:534 logging.debug('Drone %s not allowed: ', drone.hostname)535 continue536 if require_ssp and not drone.support_ssp:537 logging.debug('Drone %s does not support server-side '538 'packaging.', drone.hostname)539 no_ssp_drones.append(drone)540 continue541 usable_drones.append(drone)542 if drone.active_processes + num_processes <= drone.max_processes:543 drone_to_use = drone544 break545'Drone %s has %d active + %s requested > %s max',546 drone.hostname, drone.active_processes, num_processes,547 drone.max_processes)548 if not drone_to_use and usable_drones:549 # Drones are all over loaded, pick the one with least load.550 drone_summary = ','.join('%s %s/%s' % (drone.hostname,551 drone.active_processes,552 drone.max_processes)553 for drone in usable_drones)554 logging.error('No drone has capacity to handle %d processes (%s) '555 'for user %s', num_processes, drone_summary, username)556 drone_to_use = self._least_loaded_drone(usable_drones)557 elif not drone_to_use and require_ssp and no_ssp_drones:558 # No drone supports server-side packaging, choose the least loaded.559 drone_to_use = self._least_loaded_drone(no_ssp_drones)560 # refill _drone_queue561 for drone in checked_drones:562 self._enqueue_drone(drone)563 return drone_to_use564 def _substitute_working_directory_into_command(self, command,565 working_directory):566 for i, item in enumerate(command):567 if item is WORKING_DIRECTORY:568 command[i] = working_directory569 def execute_command(self, command, working_directory, pidfile_name,570 num_processes, log_file=None, paired_with_pidfile=None,571 username=None, drone_hostnames_allowed=None):572 """573 Execute the given command, taken as an argv list.574 @param command: command to execute as a list. if any item is575 WORKING_DIRECTORY, the absolute path to the working directory576 will be substituted for it.577 @param working_directory: directory in which the pidfile will be written578 @param pidfile_name: name of the pidfile this process will write579 @param num_processes: number of processes to account for from this580 execution581 @param log_file (optional): path (in the results repository) to hold582 command output.583 @param paired_with_pidfile (optional): a PidfileId for an584 already-executed process; the new process will execute on the585 same drone as the previous process.586 @param username (optional): login of the user responsible for this587 process.588 @param drone_hostnames_allowed (optional): hostnames of the drones that589 this command is allowed to590 execute on591 """592 abs_working_directory = self.absolute_path(working_directory)593 if not log_file:594 log_file = self.get_temporary_path('execute')595 log_file = self.absolute_path(log_file)596 self._substitute_working_directory_into_command(command,597 abs_working_directory)598 if paired_with_pidfile:599 drone = self._get_drone_for_pidfile_id(paired_with_pidfile)600 else:601 require_ssp = '--require-ssp' in command602 drone = self._choose_drone_for_execution(603 num_processes, username, drone_hostnames_allowed,604 require_ssp=require_ssp)605 # Enable --warn-no-ssp option for autoserv to log a warning and run606 # the command without using server-side packaging.607 if require_ssp and not drone.support_ssp:608 command.append('--warn-no-ssp')609 if not drone:610 raise DroneManagerError('command failed; no drones available: %s'611 % command)612"command = %s", command)613'log file = %s:%s', drone.hostname, log_file)...

