Best Python code snippet using autotest_python
agent_task.py
Source: agent_task.py
...168        """169        To be overridden.170        """171        assert not self.monitor172        self.register_necessary_pidfiles()173    def _log_file(self):174        if not self._log_file_name:175            return None176        return os.path.join(self._working_directory(), self._log_file_name)177    def cleanup(self):178        log_file = self._log_file()179        if self.monitor and log_file:180            self.monitor.try_copy_to_results_repository(log_file)181    def epilog(self):182        """183        To be overridden.184        """185        self.cleanup()186        logging.info("%s finished with success=%s", type(self).__name__,187                     self.success)188    def start(self):189        if not self.started:190            self.prolog()191            self.run()192        self.started = True193    def abort(self):194        if self.monitor:195            self.monitor.kill()196        self.done = True197        self.aborted = True198        self.cleanup()199    def _get_consistent_execution_path(self, execution_entries):200        first_execution_path = execution_entries[0].execution_path()201        for execution_entry in execution_entries[1:]:202            assert execution_entry.execution_path() == first_execution_path, (203                '%s (%s) != %s (%s)' % (execution_entry.execution_path(),204                                        execution_entry,205                                        first_execution_path,206                                        execution_entries[0]))207        return first_execution_path208    def _copy_results(self, execution_entries, use_monitor=None):209        """210        @param execution_entries: list of objects with execution_path() method211        """212        if use_monitor is not None and not use_monitor.has_process():213            return214        assert len(execution_entries) > 0215        if use_monitor is None:216            assert self.monitor217            use_monitor = 
self.monitor218        assert use_monitor.has_process()219        execution_path = self._get_consistent_execution_path(execution_entries)220        results_path = execution_path + '/'221        use_monitor.try_copy_to_results_repository(results_path)222    def _parse_results(self, queue_entries):223        for queue_entry in queue_entries:224            queue_entry.set_status(models.HostQueueEntry.Status.PARSING)225    def _archive_results(self, queue_entries):226        for queue_entry in queue_entries:227            queue_entry.set_status(models.HostQueueEntry.Status.ARCHIVING)228    def _command_line(self):229        """230        Return the command line to run.  Must be overridden.231        """232        raise NotImplementedError233    @property234    def num_processes(self):235        """236        Return the number of processes forked by this BaseAgentTask's process.237        It may only be approximate.  To be overridden if necessary.238        """239        return 1240    def _paired_with_monitor(self):241        """242        If this BaseAgentTask's process must run on the same machine as some243        previous process, this method should be overridden to return a244        PidfileRunMonitor for that process.245        """246        return self._NullMonitor()247    @property248    def owner_username(self):249        """250        Return login of user responsible for this task.  May be None.  Must be251        overridden.252        """253        raise NotImplementedError254    def _working_directory(self):255        """256        Return the directory where this BaseAgentTask's process executes.257        Must be overridden.258        """259        raise NotImplementedError260    def _pidfile_name(self):261        """262        Return the name of the pidfile this BaseAgentTask's process uses.  
To be263        overridden if necessary.264        """265        return drone_manager.AUTOSERV_PID_FILE266    def _check_paired_results_exist(self):267        if not self._paired_with_monitor().has_process():268            metadata = {269                    '_type': 'scheduler_error',270                    'error': 'No paired results in task',271                    'task': str(self),272                    'pidfile_id': str(self._paired_with_monitor().pidfile_id)}273            autotest_stats.Counter('no_paired_results_in_task',274                                   metadata=metadata).increment()275            self.finished(False)276            return False277        return True278    def _create_monitor(self):279        assert not self.monitor280        self.monitor = pidfile_monitor.PidfileRunMonitor()281    def run(self):282        if not self._check_paired_results_exist():283            return284        self._create_monitor()285        self.monitor.run(286                self._command_line(), self._working_directory(),287                num_processes=self.num_processes,288                nice_level=AUTOSERV_NICE_LEVEL, log_file=self._log_file(),289                pidfile_name=self._pidfile_name(),290                paired_with_pidfile=self._paired_with_monitor().pidfile_id,291                username=self.owner_username,292                drone_hostnames_allowed=self.get_drone_hostnames_allowed())293    def get_drone_hostnames_allowed(294            self, restricted_subnets=utils.RESTRICTED_SUBNETS,295            enable_drone_in_subnet=ENABLE_DRONE_IN_RESTRICTED_SUBNET):296        filtered_drones = None297        has_unrestricted_host = False298        if (self.hostnames and restricted_subnets and enable_drone_in_subnet):299            for hostname in self.hostnames.values():300                subnet = utils.get_restricted_subnet(hostname,301                                                     restricted_subnets)302                # Return an empty set if the 
list of hosts exists both in303                # restricted and unrestricted subnet. No drone can work in such304                # case.305                if ((not subnet and filtered_drones is not None) or306                    (subnet and has_unrestricted_host)):307                    logging.error('The test has some DUT in restricted subnet, '308                                  'but some in unrestricted subnet. Therefore, '309                                  'no drone is available to run the test.')310                    return set()311                if not subnet:312                    has_unrestricted_host = True313                    continue314                server_ip_map=system_utils.DroneCache.get_drone_ip_map()315                filtered_drones_for_host = set(316                        utils.get_servers_in_same_subnet(317                                subnet[0], subnet[1],318                                server_ip_map=server_ip_map))319                logging.info('DUT %s is in restricted subnet, drone can only '320                             'be chosen from %s', hostname,321                             filtered_drones_for_host)322                if filtered_drones is None:323                    filtered_drones = filtered_drones_for_host324                else:325                    filtered_drones = set.intersection(326                            filtered_drones, filtered_drones_for_host)327                # If filtered_drones is an empty set, that means no drone is328                # allowed to run the task. 
This is different fron None, which329                # means all drones are allowed.330                if filtered_drones == set():331                    logging.error('DUT(s) is in restricted subnet, but no '332                                  'drone is available to run the test.')333                    return filtered_drones334        # If host is not in restricted subnet, use the unrestricted drones only.335        if (filtered_drones is None and restricted_subnets and336            enable_drone_in_subnet):337            filtered_drones = set(338                    system_utils.DroneCache.get_unrestricted_drones(339                            restricted_subnets=restricted_subnets))340        if not models.DroneSet.drone_sets_enabled():341            return filtered_drones342        hqes = models.HostQueueEntry.objects.filter(id__in=self.queue_entry_ids)343        if not hqes:344            # Only special tasks could be missing host queue entries345            assert isinstance(self, SpecialAgentTask)346            return self._user_or_global_default_drone_set(347                    self.task, self.task.requested_by)348        job_ids = hqes.values_list('job', flat=True).distinct()349        assert job_ids.count() == 1, ("BaseAgentTask's queue entries "350                                      "span multiple jobs")351        job = models.Job.objects.get(id=job_ids[0])352        drone_set = job.drone_set353        if not drone_set:354            return self._user_or_global_default_drone_set(job, job.user())355        if filtered_drones:356            return set.intersection(filtered_drones,357                                    drone_set.get_drone_hostnames())358        else:359            return drone_set.get_drone_hostnames()360    def _user_or_global_default_drone_set(self, obj_with_owner, user):361        """362        Returns the user's default drone set, if present.363        Otherwise, returns the global default drone set.364        """365        
default_hostnames = models.DroneSet.get_default().get_drone_hostnames()366        if not user:367            logging.warning('%s had no owner; using default drone set',368                         obj_with_owner)369            return default_hostnames370        if not user.drone_set:371            logging.warning('User %s has no default drone set, using global '372                         'default', user.login)373            return default_hostnames374        return user.drone_set.get_drone_hostnames()375    def register_necessary_pidfiles(self):376        pidfile_id = self._drone_manager.get_pidfile_id_from(377                self._working_directory(), self._pidfile_name())378        self._drone_manager.register_pidfile(pidfile_id)379        paired_pidfile_id = self._paired_with_monitor().pidfile_id380        if paired_pidfile_id:381            self._drone_manager.register_pidfile(paired_pidfile_id)382    def recover(self):383        if not self._check_paired_results_exist():384            return385        self._create_monitor()386        self.monitor.attach_to_existing_process(387                self._working_directory(), pidfile_name=self._pidfile_name(),388                num_processes=self.num_processes)389        if not self.monitor.has_process():...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!
