How to use _get_drone_for_pidfile_id method in autotest

Best Python code snippet using autotest_python

drone_manager.py

Source:drone_manager.py Github

copy

Full Screen

...252 drone.hostname)253 drone.queue_call('cleanup_orphaned_containers')254 def _get_drone_for_process(self, process):255 return self._drones[process.hostname]256 def _get_drone_for_pidfile_id(self, pidfile_id):257 pidfile_contents = self.get_pidfile_contents(pidfile_id)258 if pidfile_contents.process is None:259 raise DroneManagerError('Fail to get a drone due to empty pidfile')260 return self._get_drone_for_process(pidfile_contents.process)261 def get_drone_for_pidfile_id(self, pidfile_id):262 """Public API for luciferlib.263 @param pidfile_id: PidfileId instance.264 """265 return self._get_drone_for_pidfile_id(pidfile_id)266 def _drop_old_pidfiles(self):267 # use items() since the dict is modified in unregister_pidfile()268 for pidfile_id, info in self._registered_pidfile_info.items():269 if info.age > self._get_max_pidfile_refreshes():270 logging.warning('dropping leaked pidfile %s', pidfile_id)271 self.unregister_pidfile(pidfile_id)272 else:273 info.age += 1274 def _reset(self):275 self._process_set = set()276 self._all_processes = {}277 self._pidfiles = {}278 self._pidfiles_second_read = {}279 self._drone_queue = []280 def _parse_pidfile(self, drone, raw_contents):281 """Parse raw pidfile contents.282 @param drone: The drone on which this pidfile was found.283 @param raw_contents: The raw contents of a pidfile, eg:284 "pid\nexit_staus\nnum_tests_failed\n".285 """286 contents = PidfileContents()287 if not raw_contents:288 return contents289 lines = raw_contents.splitlines()290 if len(lines) > 3:291 return InvalidPidfile('Corrupt pid file (%d lines):\n%s' %292 (len(lines), lines))293 try:294 pid = int(lines[0])295 contents.process = Process(drone.hostname, pid)296 # if len(lines) == 2, assume we caught Autoserv between writing297 # exit_status and num_failed_tests, so just ignore it and wait for298 # the next cycle299 if len(lines) == 3:300 contents.exit_status = int(lines[1])301 contents.num_tests_failed = int(lines[2])302 except ValueError, exc:303 return InvalidPidfile('Corrupt pid file: ' + str(exc.args))304 return contents305 def _process_pidfiles(self, drone, pidfiles, store_in_dict):306 for pidfile_path, contents in pidfiles.iteritems():307 pidfile_id = PidfileId(pidfile_path)308 contents = self._parse_pidfile(drone, contents)309 store_in_dict[pidfile_id] = contents310 def _add_process(self, drone, process_info):311 process = Process(drone.hostname, int(process_info['pid']),312 int(process_info['ppid']))313 self._process_set.add(process)314 def _add_autoserv_process(self, drone, process_info):315 assert process_info['comm'] == 'autoserv'316 # only root autoserv processes have pgid == pid317 if process_info['pgid'] != process_info['pid']:318 return319 self._add_process(drone, process_info)320 def _enqueue_drone(self, drone):321 heapq.heappush(self._drone_queue, _DroneHeapWrapper(drone))322 def _reorder_drone_queue(self):323 heapq.heapify(self._drone_queue)324 def _compute_active_processes(self, drone):325 drone.active_processes = 0326 for pidfile_id, contents in self._pidfiles.iteritems():327 is_running = contents.exit_status is None328 on_this_drone = (contents.process329 and contents.process.hostname == drone.hostname)330 if is_running and on_this_drone:331 info = self._registered_pidfile_info[pidfile_id]332 if info.num_processes is not None:333 drone.active_processes += info.num_processes334 metrics.Gauge('chromeos/autotest/drone/active_processes').set(335 drone.active_processes,336 fields={'drone_hostname': drone.hostname})337 def _check_drone_process_limit(self, drone):338 """339 Notify if the number of processes on |drone| is approaching limit.340 @param drone: A Drone object.341 """342 try:343 percent = float(drone.active_processes) / drone.max_processes344 except ZeroDivisionError:345 percent = 100346 metrics.Float('chromeos/autotest/drone/active_process_percentage'347 ).set(percent, fields={'drone_hostname': drone.hostname})348 def trigger_refresh(self):349 """Triggers a drone manager refresh.350 @raises DroneManagerError: If a drone has un-executed calls.351 Since they will get clobbered when we queue refresh calls.352 """353 self._reset()354 self._drop_old_pidfiles()355 pidfile_paths = [pidfile_id.path356 for pidfile_id in self._registered_pidfile_info]357 drones = list(self.get_drones())358 for drone in drones:359 calls = drone.get_calls()360 if calls:361 raise DroneManagerError('Drone %s has un-executed calls: %s '362 'which might get corrupted through '363 'this invocation' %364 (drone, [str(call) for call in calls]))365 drone.queue_call('refresh', pidfile_paths)366 logging.info("Invoking drone refresh.")367 with metrics.SecondsTimer(368 'chromeos/autotest/drone_manager/trigger_refresh_duration'):369 self._refresh_task_queue.execute(drones, wait=False)370 def sync_refresh(self):371 """Complete the drone refresh started by trigger_refresh.372 Waits for all drone threads then refreshes internal datastructures373 with drone process information.374 """375 # This gives us a dictionary like what follows:376 # {drone: [{'pidfiles': (raw contents of pidfile paths),377 # 'autoserv_processes': (autoserv process info from ps),378 # 'all_processes': (all process info from ps),379 # 'parse_processes': (parse process infor from ps),380 # 'pidfile_second_read': (pidfile contents, again),}]381 # drone2: ...}382 # The values of each drone are only a list because this adheres to the383 # drone utility interface (each call is executed and its results are384 # places in a list, but since we never couple the refresh calls with385 # any other call, this list will always contain a single dict).386 with metrics.SecondsTimer(387 'chromeos/autotest/drone_manager/sync_refresh_duration'):388 all_results = self._refresh_task_queue.get_results()389 logging.info("Drones refreshed.")390 # The loop below goes through and parses pidfile contents. Pidfiles391 # are used to track autoserv execution, and will always contain < 3392 # lines of the following: pid, exit code, number of tests. Each pidfile393 # is identified by a PidfileId object, which contains a unique pidfile394 # path (unique because it contains the job id) making it hashable.395 # All pidfiles are stored in the drone managers _pidfiles dict as:396 # {pidfile_id: pidfile_contents(Process(drone, pid),397 # exit_code, num_tests_failed)}398 # In handle agents, each agent knows its pidfile_id, and uses this399 # to retrieve the refreshed contents of its pidfile via the400 # PidfileRunMonitor (through its tick) before making decisions. If401 # the agent notices that its process has exited, it unregisters the402 # pidfile from the drone_managers._registered_pidfile_info dict403 # through its epilog.404 for drone, results_list in all_results.iteritems():405 results = results_list[0]406 drone_hostname = drone.hostname.replace('.', '_')407 for process_info in results['all_processes']:408 if process_info['comm'] == 'autoserv':409 self._add_autoserv_process(drone, process_info)410 drone_pid = drone.hostname, int(process_info['pid'])411 self._all_processes[drone_pid] = process_info412 for process_info in results['parse_processes']:413 self._add_process(drone, process_info)414 self._process_pidfiles(drone, results['pidfiles'], self._pidfiles)415 self._process_pidfiles(drone, results['pidfiles_second_read'],416 self._pidfiles_second_read)417 self._compute_active_processes(drone)418 if drone.enabled:419 self._enqueue_drone(drone)420 self._check_drone_process_limit(drone)421 def refresh(self):422 """Refresh all drones."""423 with metrics.SecondsTimer(424 'chromeos/autotest/drone_manager/refresh_duration'):425 self.trigger_refresh()426 self.sync_refresh()427 @metrics.SecondsTimerDecorator(428 'chromeos/autotest/drone_manager/execute_actions_duration')429 def execute_actions(self):430 """431 Called at the end of a scheduler cycle to execute all queued actions432 on drones.433 """434 # Invoke calls queued on all drones since the last call to execute435 # and wait for them to return.436 if _THREADED_DRONE_MANAGER:437 thread_lib.ThreadedTaskQueue(438 name='%s.execute_queue' % self._STATS_KEY).execute(439 self._drones.values())440 else:441 drone_task_queue.DroneTaskQueue().execute(self._drones.values())442 try:443 self._results_drone.execute_queued_calls()444 except error.AutoservError:445 m = 'chromeos/autotest/errors/results_repository_failed'446 metrics.Counter(m).increment(447 fields={'drone_hostname': self._results_drone.hostname})448 self._results_drone.clear_call_queue()449 def get_orphaned_autoserv_processes(self):450 """451 Returns a set of Process objects for orphaned processes only.452 """453 return set(process for process in self._process_set454 if process.ppid == 1)455 def kill_process(self, process):456 """457 Kill the given process.458 """459 logging.info('killing %s', process)460 drone = self._get_drone_for_process(process)461 drone.queue_kill_process(process)462 def _ensure_directory_exists(self, path):463 if not os.path.exists(path):464 os.makedirs(path)465 def total_running_processes(self):466 return sum(drone.active_processes for drone in self.get_drones())467 def max_runnable_processes(self, username, drone_hostnames_allowed):468 """469 Return the maximum number of processes that can be run (in a single470 execution) given the current load on drones.471 @param username: login of user to run a process. may be None.472 @param drone_hostnames_allowed: list of drones that can be used. May be473 None474 """475 usable_drone_wrappers = [wrapper for wrapper in self._drone_queue476 if wrapper.drone.usable_by(username) and477 (drone_hostnames_allowed is None or478 wrapper.drone.hostname in479 drone_hostnames_allowed)]480 if not usable_drone_wrappers:481 # all drones disabled or inaccessible482 return 0483 runnable_processes = [484 wrapper.drone.max_processes - wrapper.drone.active_processes485 for wrapper in usable_drone_wrappers]486 return max([0] + runnable_processes)487 def _least_loaded_drone(self, drones):488 return min(drones, key=lambda d: d.used_capacity())489 def pick_drone_to_use(self, num_processes=1, prefer_ssp=False):490 """Return a drone to use.491 Various options can be passed to optimize drone selection.492 num_processes is the number of processes the drone is intended493 to run.494 prefer_ssp indicates whether drones supporting server-side495 packaging should be preferred. The returned drone is not496 guaranteed to support it.497 This public API is exposed for luciferlib to wrap.498 Returns a drone instance (see drones.py).499 """500 return self._choose_drone_for_execution(501 num_processes=num_processes,502 username=None, # Always allow all drones503 drone_hostnames_allowed=None, # Always allow all drones504 require_ssp=prefer_ssp,505 )506 def _choose_drone_for_execution(self, num_processes, username,507 drone_hostnames_allowed,508 require_ssp=False):509 """Choose a drone to execute command.510 @param num_processes: Number of processes needed for execution.511 @param username: Name of the user to execute the command.512 @param drone_hostnames_allowed: A list of names of drone allowed.513 @param require_ssp: Require server-side packaging to execute the,514 command, default to False.515 @return: A drone object to be used for execution.516 """517 # cycle through drones is order of increasing used capacity until518 # we find one that can handle these processes519 checked_drones = []520 usable_drones = []521 # Drones do not support server-side packaging, used as backup if no522 # drone is found to run command requires server-side packaging.523 no_ssp_drones = []524 drone_to_use = None525 while self._drone_queue:526 drone = heapq.heappop(self._drone_queue).drone527 checked_drones.append(drone)528 logging.info('Checking drone %s', drone.hostname)529 if not drone.usable_by(username):530 continue531 drone_allowed = (drone_hostnames_allowed is None532 or drone.hostname in drone_hostnames_allowed)533 if not drone_allowed:534 logging.debug('Drone %s not allowed: ', drone.hostname)535 continue536 if require_ssp and not drone.support_ssp:537 logging.debug('Drone %s does not support server-side '538 'packaging.', drone.hostname)539 no_ssp_drones.append(drone)540 continue541 usable_drones.append(drone)542 if drone.active_processes + num_processes <= drone.max_processes:543 drone_to_use = drone544 break545 logging.info('Drone %s has %d active + %s requested > %s max',546 drone.hostname, drone.active_processes, num_processes,547 drone.max_processes)548 if not drone_to_use and usable_drones:549 # Drones are all over loaded, pick the one with least load.550 drone_summary = ','.join('%s %s/%s' % (drone.hostname,551 drone.active_processes,552 drone.max_processes)553 for drone in usable_drones)554 logging.error('No drone has capacity to handle %d processes (%s) '555 'for user %s', num_processes, drone_summary, username)556 drone_to_use = self._least_loaded_drone(usable_drones)557 elif not drone_to_use and require_ssp and no_ssp_drones:558 # No drone supports server-side packaging, choose the least loaded.559 drone_to_use = self._least_loaded_drone(no_ssp_drones)560 # refill _drone_queue561 for drone in checked_drones:562 self._enqueue_drone(drone)563 return drone_to_use564 def _substitute_working_directory_into_command(self, command,565 working_directory):566 for i, item in enumerate(command):567 if item is WORKING_DIRECTORY:568 command[i] = working_directory569 def execute_command(self, command, working_directory, pidfile_name,570 num_processes, log_file=None, paired_with_pidfile=None,571 username=None, drone_hostnames_allowed=None):572 """573 Execute the given command, taken as an argv list.574 @param command: command to execute as a list. if any item is575 WORKING_DIRECTORY, the absolute path to the working directory576 will be substituted for it.577 @param working_directory: directory in which the pidfile will be written578 @param pidfile_name: name of the pidfile this process will write579 @param num_processes: number of processes to account for from this580 execution581 @param log_file (optional): path (in the results repository) to hold582 command output.583 @param paired_with_pidfile (optional): a PidfileId for an584 already-executed process; the new process will execute on the585 same drone as the previous process.586 @param username (optional): login of the user responsible for this587 process.588 @param drone_hostnames_allowed (optional): hostnames of the drones that589 this command is allowed to590 execute on591 """592 abs_working_directory = self.absolute_path(working_directory)593 if not log_file:594 log_file = self.get_temporary_path('execute')595 log_file = self.absolute_path(log_file)596 self._substitute_working_directory_into_command(command,597 abs_working_directory)598 if paired_with_pidfile:599 drone = self._get_drone_for_pidfile_id(paired_with_pidfile)600 else:601 require_ssp = '--require-ssp' in command602 drone = self._choose_drone_for_execution(603 num_processes, username, drone_hostnames_allowed,604 require_ssp=require_ssp)605 # Enable --warn-no-ssp option for autoserv to log a warning and run606 # the command without using server-side packaging.607 if require_ssp and not drone.support_ssp:608 command.append('--warn-no-ssp')609 if not drone:610 raise DroneManagerError('command failed; no drones available: %s'611 % command)612 logging.info("command = %s", command)613 logging.info('log file = %s:%s', drone.hostname, log_file)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful