Best Python code snippet using autotest_python
drone_manager.py
Source:drone_manager.py  
...326            num_processes = self._extract_num_processes(command)327            drone = self._choose_drone_for_execution(num_processes)328        logging.info("command = %s" % command)329        logging.info('log file = %s:%s' % (drone.hostname, log_file))330        self._write_attached_files(command, drone)331        drone.queue_call('execute_command', command, working_directory,332                         log_file, pidfile_name)333        pidfile_path = self.absolute_path(os.path.join(working_directory,334                                                       pidfile_name))335        pidfile_id = PidfileId(pidfile_path)336        self.register_pidfile(pidfile_id)337        return pidfile_id338    def get_pidfile_id_from(self, execution_tag, pidfile_name):339        path = os.path.join(self.absolute_path(execution_tag), pidfile_name)340        return PidfileId(path)341    def register_pidfile(self, pidfile_id):342        """343        Indicate that the DroneManager should look for the given pidfile when344        refreshing.345        """346        self._pidfile_age[pidfile_id] = 0347    def get_pidfile_contents(self, pidfile_id, use_second_read=False):348        """349        Retrieve a PidfileContents object for the given pidfile_id.  If350        use_second_read is True, use results that were read after the processes351        were checked, instead of before.352        """353        self.register_pidfile(pidfile_id)354        if use_second_read:355            pidfile_map = self._pidfiles_second_read356        else:357            pidfile_map = self._pidfiles358        return pidfile_map.get(pidfile_id, PidfileContents())359    def is_process_running(self, process):360        """361        Check if the given process is in the running process list.362        """363        return process in self._process_set364    def get_temporary_path(self, base_name):365        """366        Get a new temporary path guaranteed to be unique across all drones367        for this scheduler execution.368        """369        self._temporary_path_counter += 1370        return os.path.join(drone_utility._TEMPORARY_DIRECTORY,371                            '%s.%s' % (base_name, self._temporary_path_counter))372    def absolute_path(self, path):373        return os.path.join(self._results_dir, path)374    def _copy_results_helper(self, process, source_path, destination_path,375                             to_results_repository=False):376        full_source = self.absolute_path(source_path)377        full_destination = self.absolute_path(destination_path)378        source_drone = self._get_drone_for_process(process)379        if to_results_repository:380            source_drone.send_file_to(self._results_drone, full_source,381                                      full_destination, can_fail=True)382        else:383            source_drone.queue_call('copy_file_or_directory', full_source,384                                    full_destination)385    def copy_to_results_repository(self, process, source_path,386                                   destination_path=None):387        """388        Copy results from the given process at source_path to destination_path389        in the results repository.390        """391        if destination_path is None:392            destination_path = source_path393        self._copy_results_helper(process, source_path, destination_path,394                                  to_results_repository=True)395    def copy_results_on_drone(self, process, source_path, destination_path):396        """397        Copy a results directory from one place to another on the drone.398        """399        self._copy_results_helper(process, source_path, destination_path)400    def _write_attached_files(self, command, drone):401        execution_tag = self._extract_execution_tag(' '.join(command))402        attached_files = self._attached_files.pop(execution_tag, {})403        for file_path, contents in attached_files.iteritems():404            drone.queue_call('write_to_file', self.absolute_path(file_path),405                             contents)406    def attach_file_to_execution(self, execution_tag, file_contents,407                                 file_path=None):408        """409        When the process for execution_tag is executed, the given file contents410        will be placed in a file on the drone.  Returns the path at which the411        file will be placed.412        """413        if not file_path:414            file_path = self.get_temporary_path('attach')...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
