Best Python code snippet using autotest_python
drone_manager.py
Source:drone_manager.py  
...131        self._call_all_drones('initialize', self._results_dir)132    def shutdown(self):133        for drone in self.get_drones():134            drone.shutdown()135    def _get_max_pidfile_refreshes(self):136        """137        Normally refresh() is called on every monitor_db.Dispatcher.tick().138        :return: The number of refresh() calls before we forget a pidfile.139        """140        pidfile_timeout = settings.get_value(141            scheduler_config.CONFIG_SECTION, 'max_pidfile_refreshes',142            type=int, default=2000)143        return pidfile_timeout144    def _add_drone(self, hostname):145        logging.info('Adding drone %s' % hostname)146        drone = drones.get_drone(hostname)147        if drone:148            self._drones[drone.hostname] = drone149            drone.call('initialize', self.absolute_path(''))150    def _remove_drone(self, hostname):151        self._drones.pop(hostname, None)152    def refresh_drone_configs(self):153        """154        Reread global config options for all drones.155        """156        section = scheduler_config.CONFIG_SECTION157        settings.parse_config_file()158        for hostname, drone in self._drones.iteritems():159            disabled = settings.get_value(section, '%s_disabled' % hostname,160                                          default='')161            drone.enabled = not bool(disabled)162            drone.max_processes = settings.get_value(163                section, '%s_max_processes' % hostname, type=int,164                default=scheduler_config.config.max_processes_per_drone)165            allowed_users = settings.get_value(section, '%s_users' % hostname,166                                               default=None)167            if allowed_users is not None:168                allowed_users = set(allowed_users.split())169            drone.allowed_users = allowed_users170        self._reorder_drone_queue()  # max_processes may have changed171    def get_drones(self):172        return self._drones.itervalues()173    def _get_drone_for_process(self, process):174        return self._drones[process.hostname]175    def _get_drone_for_pidfile_id(self, pidfile_id):176        pidfile_contents = self.get_pidfile_contents(pidfile_id)177        assert pidfile_contents.process is not None178        return self._get_drone_for_process(pidfile_contents.process)179    def _drop_old_pidfiles(self):180        # use items() since the dict is modified in unregister_pidfile()181        for pidfile_id, info in self._registered_pidfile_info.items():182            if info.age > self._get_max_pidfile_refreshes():183                logging.warning('dropping leaked pidfile %s', pidfile_id)184                self.unregister_pidfile(pidfile_id)185            else:186                info.age += 1187    def _reset(self):188        self._process_set = set()189        self._pidfiles = {}190        self._pidfiles_second_read = {}191        self._drone_queue = []192    def _call_all_drones(self, method, *args, **kwargs):193        all_results = {}194        for drone in self.get_drones():195            all_results[drone] = drone.call(method, *args, **kwargs)196        return all_results...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
