How to use _check_paired_results_exist method in autotest

Best Python code snippet using autotest_python

agent_task.py

Source:agent_task.py Github

copy

Full Screen

# NOTE: The definitions below take `self` and are methods from the interior
# of a class whose header is outside this excerpt (presumably BaseAgentTask,
# going by the docstring and assertion text -- confirm against the full file).
#
# ... tail of the preceding method; its `def` line is not part of the excerpt:
#         Return the name of the pidfile this BaseAgentTask's process uses. To be
#         overridden if necessary.
#         """
#         return drone_manager.AUTOSERV_PID_FILE


def _check_paired_results_exist(self):
    """
    Check that the paired monitor has a process before this task proceeds.

    When `self._paired_with_monitor().has_process()` is false, this
    increments the 'no_paired_results_in_task' counter (with metadata naming
    this task and the paired pidfile id), calls `self.finished(False)` and
    returns False. Otherwise it returns True and has no other effect.
    """
    if not self._paired_with_monitor().has_process():
        metadata = {
                '_type': 'scheduler_error',
                'error': 'No paired results in task',
                'task': str(self),
                'pidfile_id': str(self._paired_with_monitor().pidfile_id)}
        autotest_stats.Counter('no_paired_results_in_task',
                               metadata=metadata).increment()
        self.finished(False)
        return False
    return True


def _create_monitor(self):
    """
    Create a fresh PidfileRunMonitor and store it in `self.monitor`.

    Asserts that no monitor is currently set.
    """
    assert not self.monitor
    self.monitor = pidfile_monitor.PidfileRunMonitor()


def run(self):
    """
    Launch this task's command through a newly created pidfile monitor.

    Returns without doing anything further when
    `_check_paired_results_exist()` reports False.
    """
    if not self._check_paired_results_exist():
        return

    self._create_monitor()
    self.monitor.run(
            self._command_line(), self._working_directory(),
            num_processes=self.num_processes,
            nice_level=AUTOSERV_NICE_LEVEL, log_file=self._log_file(),
            pidfile_name=self._pidfile_name(),
            paired_with_pidfile=self._paired_with_monitor().pidfile_id,
            username=self.owner_username,
            drone_hostnames_allowed=self.get_drone_hostnames_allowed())


def get_drone_hostnames_allowed(
        self, restricted_subnets=utils.RESTRICTED_SUBNETS,
        enable_drone_in_subnet=ENABLE_DRONE_IN_RESTRICTED_SUBNET):
    """
    Work out which drones may run this task.

    The result combines two filters:
      1. Subnet filtering, applied only when both `restricted_subnets` and
         `enable_drone_in_subnet` are truthy. Hosts in a restricted subnet
         limit the choice to the servers returned by
         `utils.get_servers_in_same_subnet`; otherwise only the drones from
         `DroneCache.get_unrestricted_drones` are eligible.
      2. Drone-set filtering, applied only when
         `models.DroneSet.drone_sets_enabled()` is true.

    `restricted_subnets` is passed through to `utils.get_restricted_subnet`
    and `DroneCache.get_unrestricted_drones`.
    `enable_drone_in_subnet` switches subnet filtering on or off.

    Returns a set of drone hostnames. An empty set means no drone may run
    the task. None (only possible when subnet filtering is off and drone
    sets are disabled) means all drones are allowed.
    """
    filtered_drones = None
    has_unrestricted_host = False
    if (self.hostnames and restricted_subnets and enable_drone_in_subnet):
        for hostname in self.hostnames.values():
            subnet = utils.get_restricted_subnet(hostname,
                                                 restricted_subnets)

            # Return an empty set if the list of hosts exists both in
            # restricted and unrestricted subnet. No drone can work in such
            # case.
            if ((not subnet and filtered_drones is not None) or
                    (subnet and has_unrestricted_host)):
                logging.error('The test has some DUT in restricted subnet, '
                              'but some in unrestricted subnet. Therefore, '
                              'no drone is available to run the test.')
                return set()

            if not subnet:
                has_unrestricted_host = True
                continue

            server_ip_map = system_utils.DroneCache.get_drone_ip_map()
            filtered_drones_for_host = set(
                    utils.get_servers_in_same_subnet(
                            subnet[0], subnet[1],
                            server_ip_map=server_ip_map))
            logging.info('DUT %s is in restricted subnet, drone can only '
                         'be chosen from %s', hostname,
                         filtered_drones_for_host)
            if filtered_drones is None:
                filtered_drones = filtered_drones_for_host
            else:
                filtered_drones = set.intersection(
                        filtered_drones, filtered_drones_for_host)

            # If filtered_drones is an empty set, that means no drone is
            # allowed to run the task. This is different from None, which
            # means all drones are allowed.
            if filtered_drones == set():
                logging.error('DUT(s) is in restricted subnet, but no '
                              'drone is available to run the test.')
                return filtered_drones

    # If host is not in restricted subnet, use the unrestricted drones only.
    if (filtered_drones is None and restricted_subnets and
            enable_drone_in_subnet):
        filtered_drones = set(
                system_utils.DroneCache.get_unrestricted_drones(
                        restricted_subnets=restricted_subnets))

    if not models.DroneSet.drone_sets_enabled():
        return filtered_drones

    hqes = models.HostQueueEntry.objects.filter(id__in=self.queue_entry_ids)
    if not hqes:
        # Only special tasks could be missing host queue entries
        assert isinstance(self, SpecialAgentTask)
        return self._user_or_global_default_drone_set(
                self.task, self.task.requested_by)

    job_ids = hqes.values_list('job', flat=True).distinct()
    assert job_ids.count() == 1, ("BaseAgentTask's queue entries "
                                  "span multiple jobs")

    job = models.Job.objects.get(id=job_ids[0])
    drone_set = job.drone_set
    if not drone_set:
        return self._user_or_global_default_drone_set(job, job.user())

    # Compare against None rather than relying on truthiness: an empty set
    # (e.g. no unrestricted drones exist) means "no drone allowed" and must
    # not fall through to the unfiltered drone set, which is what the
    # drone-sets-disabled branch above already guarantees.
    if filtered_drones is not None:
        return set.intersection(filtered_drones,
                                drone_set.get_drone_hostnames())
    return drone_set.get_drone_hostnames()


def _user_or_global_default_drone_set(self, obj_with_owner, user):
    """
    Returns the user's default drone set, if present.

    Otherwise, returns the global default drone set.

    `obj_with_owner` is only used in the warning logged when `user` is
    falsy. `user` is the owner whose `drone_set` is preferred; it may be
    falsy.
    """
    default_hostnames = models.DroneSet.get_default().get_drone_hostnames()
    if not user:
        logging.warning('%s had no owner; using default drone set',
                        obj_with_owner)
        return default_hostnames
    if not user.drone_set:
        logging.warning('User %s has no default drone set, using global '
                        'default', user.login)
        return default_hostnames
    return user.drone_set.get_drone_hostnames()


def register_necessary_pidfiles(self):
    """
    Register this task's pidfile with the drone manager.

    The paired monitor's pidfile is registered as well when its
    `pidfile_id` is truthy.
    """
    pidfile_id = self._drone_manager.get_pidfile_id_from(
            self._working_directory(), self._pidfile_name())
    self._drone_manager.register_pidfile(pidfile_id)

    paired_pidfile_id = self._paired_with_monitor().pidfile_id
    if paired_pidfile_id:
        self._drone_manager.register_pidfile(paired_pidfile_id)


def recover(self):
    """
    Try to attach to an existing process for this task.

    Returns early when `_check_paired_results_exist()` reports False. If the
    new monitor finds no process after attaching, `self.monitor` is reset to
    None so the task can be started normally later; otherwise
    `self.started` is set to True and the recovery is logged.
    """
    if not self._check_paired_results_exist():
        return

    self._create_monitor()
    self.monitor.attach_to_existing_process(
            self._working_directory(), pidfile_name=self._pidfile_name(),
            num_processes=self.num_processes)
    if not self.monitor.has_process():
        # no process to recover; wait to be started normally
        self.monitor = None
        return

    self.started = True
    logging.info('Recovering process %s for %s at %s',
                 self.monitor.get_process(), type(self).__name__,
                 self._working_directory())


# The excerpt is truncated inside the next method's signature:
#     def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses,...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. From setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios, LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful