Best Python code snippet using autotest_python
job.py
Source:job.py  
...602        """603        return self.partition(device, loop_size, mountpoint)604    def enable_external_logging(self):605        pass606    def disable_external_logging(self):607        pass608    def reboot_setup(self):609        # save the partition list and mount points, as well as the cpu count610        partition_list = partition_lib.get_partition_list(self,611                                                          exclude_swap=False)612        mount_info = partition_lib.get_mount_info(partition_list)613        self._state.set('client', 'mount_info', mount_info)614        self._state.set('client', 'cpu_count', utils.count_cpus())615    def reboot(self):616        self.reboot_setup()617        self.harness.run_reboot()618        # HACK: using this as a module sometimes hangs shutdown, so if it's619        # installed unload it first620        utils.system("modprobe -r netconsole", ignore_status=True)621        # sync first, so that a sync during shutdown doesn't time out622        utils.system("sync; sync", ignore_status=True)623        utils.system("(sleep 5; reboot) </dev/null >/dev/null 2>&1 &")624        self.quit()625    def noop(self, text):626        logging.info("job: noop: " + text)627    @_run_test_complete_on_exit628    def parallel(self, *tasklist):629        """Run tasks in parallel"""630        pids = []631        old_log_filename = self._logger.global_filename632        for i, task in enumerate(tasklist):633            assert isinstance(task, (tuple, list))634            self._logger.global_filename = old_log_filename + (".%d" % i)635            def task_func():636                # stub out _record_indent with a process-local one637                base_record_indent = self._record_indent638                proc_local = self._job_state.property_factory(639                    '_state', '_record_indent.%d' % os.getpid(),640                    base_record_indent, namespace='client')641                self.__class__._record_indent = proc_local642  
              task[0](*task[1:])643            pids.append(parallel.fork_start(self.resultdir, task_func))644        old_log_path = os.path.join(self.resultdir, old_log_filename)645        old_log = open(old_log_path, "a")646        exceptions = []647        for i, pid in enumerate(pids):648            # wait for the task to finish649            try:650                parallel.fork_waitfor(self.resultdir, pid)651            except Exception, e:652                exceptions.append(e)653            # copy the logs from the subtask into the main log654            new_log_path = old_log_path + (".%d" % i)655            if os.path.exists(new_log_path):656                new_log = open(new_log_path)657                old_log.write(new_log.read())658                new_log.close()659                old_log.flush()660                os.remove(new_log_path)661        old_log.close()662        self._logger.global_filename = old_log_filename663        # handle any exceptions raised by the parallel tasks664        if exceptions:665            msg = "%d task(s) failed in job.parallel" % len(exceptions)666            raise error.JobError(msg)667    def quit(self):668        # XXX: should have a better name.669        self.harness.run_pause()670        raise error.JobContinue("more to come")671    def complete(self, status):672        """Write pending reports, clean up, and exit"""673        # write out a job HTML report674        try:675            html_report.create_report(self.resultdir)676        except Exception, e:677            logging.error("Error writing job HTML report: %s", e)678        # We are about to exit 'complete' so clean up the control file.679        dest = os.path.join(self.resultdir, os.path.basename(self._state_file))680        shutil.move(self._state_file, dest)681        self.harness.run_complete()682        self.disable_external_logging()683        sys.exit(status)684    def _load_state(self):685        # grab any initial state and set up $CONTROL.state 
as the backing file686        init_state_file = self.control + '.init.state'687        self._state_file = self.control + '.state'688        if os.path.exists(init_state_file):689            shutil.move(init_state_file, self._state_file)690        self._state.set_backing_file(self._state_file)691        # initialize the state engine, if necessary692        has_steps = self._state.has('client', 'steps')693        if not self._is_continuation and has_steps:694            raise RuntimeError('Loaded state can only contain client.steps if '695                               'this is a continuation')696        if not has_steps:...server_job.py
Source:server_job.py  
...317        """318        Start or restart external logging mechanism.319        """320        pass321    def disable_external_logging(self):322        """323        Pause or stop external logging mechanism.324        """325        pass326    def use_external_logging(self):327        """328        Return True if external logging should be used.329        """330        return False331    def _make_parallel_wrapper(self, function, machines, log):332        """Wrap function as appropriate for calling by parallel_simple."""333        is_forking = not (len(machines) == 1 and self.machines == machines)334        if self._parse_job and is_forking and log:335            def wrapper(machine):336                self._parse_job += "/" + machine337                self._using_parser = True338                self.machines = [machine]339                self.push_execution_context(machine)340                os.chdir(self.resultdir)341                utils.write_keyval(self.resultdir, {"hostname": machine})342                self.init_parser()343                result = function(machine)344                self.cleanup_parser()345                return result346        elif len(machines) > 1 and log:347            def wrapper(machine):348                self.push_execution_context(machine)349                os.chdir(self.resultdir)350                machine_data = {'hostname': machine,351                                'status_version': str(self._STATUS_VERSION)}352                utils.write_keyval(self.resultdir, machine_data)353                result = function(machine)354                return result355        else:356            wrapper = function357        return wrapper358    def parallel_simple(self, function, machines, log=True, timeout=None,359                        return_results=False):360        """361        Run 'function' using parallel_simple, with an extra wrapper to handle362        the necessary setup for continuous parsing, if possible. 
If continuous363        parsing is already properly initialized then this should just work.364        :param function: A callable to run in parallel given each machine.365        :param machines: A list of machine names to be passed one per subcommand366                invocation of function.367        :param log: If True, output will be written to output in a subdirectory368                named after each machine.369        :param timeout: Seconds after which the function call should timeout.370        :param return_results: If True instead of an AutoServError being raised371                on any error a list of the results|exceptions from the function372                called on each arg is returned.  [default: False]373        :raise error.AutotestError: If any of the functions failed.374        """375        wrapper = self._make_parallel_wrapper(function, machines, log)376        return subcommand.parallel_simple(wrapper, machines,377                                          log=log, timeout=timeout,378                                          return_results=return_results)379    def parallel_on_machines(self, function, machines, timeout=None):380        """381        :param function: Called in parallel with one machine as its argument.382        :param machines: A list of machines to call function(machine) on.383        :param timeout: Seconds after which the function call should timeout.384        :return: A list of machines on which function(machine) returned385                without raising an exception.386        """387        results = self.parallel_simple(function, machines, timeout=timeout,388                                       return_results=True)389        success_machines = []390        for result, machine in itertools.izip(results, machines):391            if not isinstance(result, Exception):392                success_machines.append(machine)393        return success_machines394    _USE_TEMP_DIR = object()395    def run(self, cleanup=False, 
install_before=False, install_after=False,396            collect_crashdumps=True, namespace={}, control=None,397            control_file_dir=None, only_collect_crashinfo=False):398        # for a normal job, make sure the uncollected logs file exists399        # for a crashinfo-only run it should already exist, bail out otherwise400        created_uncollected_logs = False401        if self.resultdir and not os.path.exists(self._uncollected_log_file):402            if only_collect_crashinfo:403                # if this is a crashinfo-only run, and there were no existing404                # uncollected logs, just bail out early405                logging.info("No existing uncollected logs, "406                             "skipping crashinfo collection")407                return408            else:409                log_file = open(self._uncollected_log_file, "w")410                pickle.dump([], log_file)411                log_file.close()412                created_uncollected_logs = True413        # use a copy so changes don't affect the original dictionary414        namespace = namespace.copy()415        machines = self.machines416        if control is None:417            if self.control is None:418                control = ''419            else:420                control = self._load_control_file(self.control)421        if control_file_dir is None:422            control_file_dir = self.resultdir423        self.aborted = False424        namespace['machines'] = machines425        namespace['args'] = self.args426        namespace['job'] = self427        namespace['ssh_user'] = self._ssh_user428        namespace['ssh_port'] = self._ssh_port429        namespace['ssh_pass'] = self._ssh_pass430        test_start_time = int(time.time())431        if self.resultdir:432            os.chdir(self.resultdir)433            # touch status.log so that the parser knows a job is running here434            open(self.get_status_log_path(), 'a').close()435            
self.enable_external_logging()436        collect_crashinfo = True437        temp_control_file_dir = None438        try:439            try:440                if install_before and machines:441                    self._execute_code(INSTALL_CONTROL_FILE, namespace)442                if only_collect_crashinfo:443                    return444                # determine the dir to write the control files to445                cfd_specified = (control_file_dir and control_file_dir is not446                                 self._USE_TEMP_DIR)447                if cfd_specified:448                    temp_control_file_dir = None449                else:450                    temp_control_file_dir = tempfile.mkdtemp(451                        suffix='temp_control_file_dir')452                    control_file_dir = temp_control_file_dir453                server_control_file = os.path.join(control_file_dir,454                                                   self._control_filename)455                client_control_file = os.path.join(control_file_dir,456                                                   CLIENT_CONTROL_FILENAME)457                if self._client:458                    namespace['control'] = control459                    utils.open_write_close(client_control_file, control)460                    shutil.copyfile(CLIENT_WRAPPER_CONTROL_FILE,461                                    server_control_file)462                else:463                    utils.open_write_close(server_control_file, control)464                logging.info("Processing control file")465                self._execute_code(server_control_file, namespace)466                logging.info("Finished processing control file")467                # no error occurred, so we don't need to collect crashinfo468                collect_crashinfo = False469            except Exception, e:470                try:471                    logging.exception(472                        'Exception escaped control file, job 
aborting:')473                    self.record('INFO', None, None, str(e),474                                {'job_abort_reason': str(e)})475                except Exception:476                    pass  # don't let logging exceptions here interfere477                raise478        finally:479            if temp_control_file_dir:480                # Clean up temp directory used for copies of the control files481                try:482                    shutil.rmtree(temp_control_file_dir)483                except Exception, e:484                    logging.warn('Could not remove temp directory %s: %s',485                                 temp_control_file_dir, e)486            if machines and (collect_crashdumps or collect_crashinfo):487                namespace['test_start_time'] = test_start_time488                if collect_crashinfo:489                    # includes crashdumps490                    self._execute_code(CRASHINFO_CONTROL_FILE, namespace)491                else:492                    self._execute_code(CRASHDUMPS_CONTROL_FILE, namespace)493            if self._uncollected_log_file and created_uncollected_logs:494                os.remove(self._uncollected_log_file)495            self.disable_external_logging()496            if cleanup and machines:497                self._execute_code(CLEANUP_CONTROL_FILE, namespace)498            if install_after and machines:499                self._execute_code(INSTALL_CONTROL_FILE, namespace)500    def run_test(self, url, *args, **dargs):501        """502        Summon a test object and run it.503        tag504                tag to add to testname505        url506                url of the test to run507        """508        group, testname = self.pkgmgr.get_package_name(url, 'test')509        testname, subdir, tag = self._build_tagged_test_name(testname, dargs)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. 
It covers everything from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, such as Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
