Best Python code snippet using autotest_python
server_job.py
Source: server_job.py
...
    def _get_status_logger(self):
        """Return a reference to the status logger."""
        return self._logger

    @staticmethod
    def _load_control_file(path):
        f = open(path)
        try:
            control_file = f.read()
        finally:
            f.close()
        return re.sub('\r', '', control_file)

    def _register_subcommand_hooks(self):
        """
        Register some hooks into the subcommand modules that allow us
        to properly clean up self.hosts created in forked subprocesses.
        """
        def on_fork(cmd):
            self._existing_hosts_on_fork = set(self.hosts)
        def on_join(cmd):
            new_hosts = self.hosts - self._existing_hosts_on_fork
            for host in new_hosts:
                host.close()
        subcommand.subcommand.register_fork_hook(on_fork)
        subcommand.subcommand.register_join_hook(on_join)

    # TODO crbug.com/285395 add a kwargs parameter.
    def _make_namespace(self):
        """Create a namespace dictionary to be passed along to control file.

        Creates a namespace argument populated with standard values:
        machines, job, ssh_user, ssh_port, ssh_pass, ssh_verbosity_flag,
        and ssh_options.
        """
        namespace = {'machines' : self.machine_dict_list,
                     'job' : self,
                     'ssh_user' : self._ssh_user,
                     'ssh_port' : self._ssh_port,
                     'ssh_pass' : self._ssh_pass,
                     'ssh_verbosity_flag' : self._ssh_verbosity_flag,
                     'ssh_options' : self._ssh_options}
        return namespace

    def cleanup(self, labels):
        """Cleanup machines.

        @param labels: Comma separated job labels, will be used to
                       determine special task actions.
        """
        if not self.machines:
            raise error.AutoservError('No machines specified to cleanup')
        if self.resultdir:
            os.chdir(self.resultdir)

        namespace = self._make_namespace()
        namespace.update({'job_labels': labels, 'args': ''})
        self._execute_code(CLEANUP_CONTROL_FILE, namespace, protect=False)

    def verify(self, labels):
        """Verify machines are all ssh-able.

        @param labels: Comma separated job labels, will be used to
                       determine special task actions.
        """
        if not self.machines:
            raise error.AutoservError('No machines specified to verify')
        if self.resultdir:
            os.chdir(self.resultdir)

        namespace = self._make_namespace()
        namespace.update({'job_labels': labels, 'args': ''})
        self._execute_code(VERIFY_CONTROL_FILE, namespace, protect=False)

    def reset(self, labels):
        """Reset machines by first cleanup then verify each machine.

        @param labels: Comma separated job labels, will be used to
                       determine special task actions.
        """
        if not self.machines:
            raise error.AutoservError('No machines specified to reset.')
        if self.resultdir:
            os.chdir(self.resultdir)

        namespace = self._make_namespace()
        namespace.update({'job_labels': labels, 'args': ''})
        self._execute_code(RESET_CONTROL_FILE, namespace, protect=False)
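The CLEANUP/VERIFY/RESET control segments above are executed via _execute_code() with the dictionary built by _make_namespace() (plus 'job_labels' and 'args') as their namespace. A minimal, hypothetical segment might consume those injected names as sketched below; the per-host body is illustrative only and is not the actual control_segments file shipped with Autotest.

# Hypothetical control segment: 'machines', 'job', 'job_labels' and 'ssh_user'
# are not defined in this snippet -- they are injected by _make_namespace() and
# the namespace.update() calls in cleanup()/verify()/reset().
for machine in machines:
    hostname = machine['hostname']
    job.record('START', None, 'cleanup')
    # ... per-host work would go here, e.g. connecting as ssh_user/ssh_port ...
    job.record('END GOOD', None, 'cleanup', 'cleaned %s' % hostname)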
    def repair(self, labels):
        """Repair machines.

        @param labels: Comma separated job labels, will be used to
                       determine special task actions.
        """
        if not self.machines:
            raise error.AutoservError('No machines specified to repair')
        if self.resultdir:
            os.chdir(self.resultdir)

        namespace = self._make_namespace()
        namespace.update({'job_labels': labels, 'args': ''})
        self._execute_code(REPAIR_CONTROL_FILE, namespace, protect=False)

    def provision(self, labels):
        """
        Provision all hosts to match |labels|.

        @param labels: A comma separated string of labels to provision the
                       host to.
        """
        control = self._load_control_file(PROVISION_CONTROL_FILE)
        self.run(control=control, job_labels=labels)

    def precheck(self):
        """
        Perform any additional checks in derived classes.
        """
        pass

    def enable_external_logging(self):
        """
        Start or restart external logging mechanism.
        """
        pass

    def disable_external_logging(self):
        """
        Pause or stop external logging mechanism.
        """
        pass

    def use_external_logging(self):
        """
        Return True if external logging should be used.
        """
        return False

    def _make_parallel_wrapper(self, function, machines, log):
        """Wrap function as appropriate for calling by parallel_simple."""
        # machines could be a list of dictionaries, e.g.,
        # [{'host_attributes': {}, 'hostname': '100.96.51.226'}]
        # The dictionary is generated in server_job.__init__, refer to
        # variable machine_dict_list, then passed in with namespace, see method
        # server_job._make_namespace.
        # To compare the machines to self.machines, which is a list of machine
        # hostnames, we need to convert machines back to a list of hostnames.
        if (machines and isinstance(machines, list)
            and isinstance(machines[0], dict)):
            machines = [m['hostname'] for m in machines]
        if len(machines) > 1 and log:
            def wrapper(machine):
                hostname = server_utils.get_hostname_from_machine(machine)
                self.push_execution_context(hostname)
                os.chdir(self.resultdir)
                machine_data = {'hostname' : hostname,
                                'status_version' : str(self._STATUS_VERSION)}
                utils.write_keyval(self.resultdir, machine_data)
                result = function(machine)
                return result
        else:
            wrapper = function
        return wrapper
    def parallel_simple(self, function, machines, log=True, timeout=None,
                        return_results=False):
        """
        Run 'function' using parallel_simple, with an extra wrapper to handle
        the necessary setup for continuous parsing, if possible. If continuous
        parsing is already properly initialized then this should just work.

        @param function: A callable to run in parallel given each machine.
        @param machines: A list of machine names to be passed one per subcommand
                invocation of function.
        @param log: If True, output will be written to output in a subdirectory
                named after each machine.
        @param timeout: Seconds after which the function call should timeout.
        @param return_results: If True instead of an AutoServError being raised
                on any error a list of the results|exceptions from the function
                called on each arg is returned.  [default: False]

        @raises error.AutotestError: If any of the functions failed.
        """
        wrapper = self._make_parallel_wrapper(function, machines, log)
        return subcommand.parallel_simple(
                wrapper, machines,
                subdir_name_constructor=server_utils.get_hostname_from_machine,
                log=log, timeout=timeout, return_results=return_results)

    def parallel_on_machines(self, function, machines, timeout=None):
        """
        @param function: Called in parallel with one machine as its argument.
        @param machines: A list of machines to call function(machine) on.
        @param timeout: Seconds after which the function call should timeout.

        @returns A list of machines on which function(machine) returned
                without raising an exception.
        """
        results = self.parallel_simple(function, machines, timeout=timeout,
                                       return_results=True)
        success_machines = []
        for result, machine in itertools.izip(results, machines):
            if not isinstance(result, Exception):
                success_machines.append(machine)
        return success_machines
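A rough usage sketch for parallel_on_machines follows; reboot_machine is a hypothetical helper and hosts.create_host is assumed to be the server-side host factory normally available to control files. Each machine is handled in its own forked subcommand, and machines whose call raised are dropped from the returned list.

# Hypothetical per-machine callable; any exception marks that machine as failed.
def reboot_machine(machine):
    host = hosts.create_host(machine)
    host.reboot()
    host.wait_up(timeout=120)

# Only machines whose reboot_machine() call returned without raising survive.
healthy = job.parallel_on_machines(reboot_machine, job.machines, timeout=600)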
    def record_skipped_test(self, skipped_test, message=None):
        """Insert a failure record into status.log for this test."""
        msg = message
        if msg is None:
            msg = 'No valid machines found for test %s.' % skipped_test
        logging.info(msg)
        self.record('START', None, skipped_test.test_name)
        self.record('INFO', None, skipped_test.test_name, msg)
        self.record('END TEST_NA', None, skipped_test.test_name, msg)

    def _has_failed_tests(self):
        """Parse status log for failed tests.

        This checks the current working directory and is intended only for use
        by the run() method.

        @return boolean
        """
        path = os.getcwd()

        # TODO(ayatane): Copied from tko/parse.py.  Needs extensive refactor to
        # make code reuse plausible.
        job_keyval = tko_models.job.read_keyval(path)
        status_version = job_keyval.get("status_version", 0)

        # parse out the job
        parser = parser_lib.parser(status_version)
        job = parser.make_job(path)
        status_log = os.path.join(path, "status.log")
        if not os.path.exists(status_log):
            status_log = os.path.join(path, "status")
        if not os.path.exists(status_log):
            logging.warning("! Unable to parse job, no status file")
            return True

        # parse the status logs
        status_lines = open(status_log).readlines()
        parser.start(job)
        tests = parser.end(status_lines)

        # parser.end can return the same object multiple times, so filter out
        # dups
        job.tests = []
        already_added = set()
        for test in tests:
            if test not in already_added:
                already_added.add(test)
                job.tests.append(test)

        failed = False
        for test in job.tests:
            # The current job is still running and shouldn't count as failed.
            # The parser will fail to parse the exit status of the job since it
            # hasn't exited yet (this running right now is the job).
            failed = failed or (test.status != 'GOOD'
                                and not _is_current_server_job(test))
        return failed

    def _collect_crashes(self, namespace, collect_crashinfo):
        """Collect crashes.

        @param namespace: namespace dict.
        @param collect_crashinfo: whether to collect crashinfo in addition to
                dumps
        """
        if collect_crashinfo:
            # includes crashdumps
            crash_control_file = CRASHINFO_CONTROL_FILE
        else:
            crash_control_file = CRASHDUMPS_CONTROL_FILE
        self._execute_code(crash_control_file, namespace)

    _USE_TEMP_DIR = object()
    def run(self, collect_crashdumps=True, namespace={}, control=None,
            control_file_dir=None, verify_job_repo_url=False,
            only_collect_crashinfo=False, skip_crash_collection=False,
            job_labels='', use_packaging=True):
        # for a normal job, make sure the uncollected logs file exists
        # for a crashinfo-only run it should already exist, bail out otherwise
        created_uncollected_logs = False
        logging.info("I am PID %s", os.getpid())
        if self.resultdir and not os.path.exists(self._uncollected_log_file):
            if only_collect_crashinfo:
                # if this is a crashinfo-only run, and there were no existing
                # uncollected logs, just bail out early
                logging.info("No existing uncollected logs, "
                             "skipping crashinfo collection")
                return
            else:
                log_file = open(self._uncollected_log_file, "w")
                pickle.dump([], log_file)
                log_file.close()
                created_uncollected_logs = True

        # use a copy so changes don't affect the original dictionary
        namespace = namespace.copy()
        machines = self.machines
        if control is None:
            if self.control is None:
                control = ''
            elif self._use_client_trampoline:
                control = self._load_control_file(
                        CLIENT_TRAMPOLINE_CONTROL_FILE)
                # repr of a string is safe for eval.
                control = (('trampoline_testname = %r\n' % str(self.control))
                           + control)
            else:
                control = self._load_control_file(self.control)
        if control_file_dir is None:
            control_file_dir = self.resultdir

        self.aborted = False
        namespace.update(self._make_namespace())
        namespace.update({
                'args': self.args,
                'job_labels': job_labels,
                'gtest_runner': site_gtest_runner.gtest_runner(),
        })
        test_start_time = int(time.time())

        if self.resultdir:
            os.chdir(self.resultdir)
            # touch status.log so that the parser knows a job is running here
            open(self.get_status_log_path(), 'a').close()
            self.enable_external_logging()

        collect_crashinfo = True
        temp_control_file_dir = None
        try:
            try:
                if not self.fast:
                    with metrics.SecondsTimer(
                            'chromeos/autotest/job/get_network_stats',
                            fields = {'stage': 'start'}):
                        namespace['network_stats_label'] = 'at-start'
                        self._execute_code(GET_NETWORK_STATS_CONTROL_FILE,
                                           namespace)

                if only_collect_crashinfo:
                    return

                # If the verify_job_repo_url option is set but we're unable
                # to actually verify that the job_repo_url contains the autotest
                # package, this job will fail.
                if verify_job_repo_url:
                    self._execute_code(VERIFY_JOB_REPO_URL_CONTROL_FILE,
                                       namespace)
                else:
                    logging.warning('Not checking if job_repo_url contains '
                                    'autotest packages on %s', machines)

                # determine the dir to write the control files to
                cfd_specified = (control_file_dir
                                 and control_file_dir is not self._USE_TEMP_DIR)
                if cfd_specified:
                    temp_control_file_dir = None
                else:
                    temp_control_file_dir = tempfile.mkdtemp(
                        suffix='temp_control_file_dir')
                    control_file_dir = temp_control_file_dir
                server_control_file = os.path.join(control_file_dir,
                                                   self._control_filename)
                client_control_file = os.path.join(control_file_dir,
                                                   CLIENT_CONTROL_FILENAME)
                if self._client:
                    namespace['control'] = control
                    utils.open_write_close(client_control_file, control)
                    shutil.copyfile(CLIENT_WRAPPER_CONTROL_FILE,
                                    server_control_file)
                else:
                    utils.open_write_close(server_control_file, control)

                logging.info("Processing control file")
                namespace['use_packaging'] = use_packaging
                self._execute_code(server_control_file, namespace)
                logging.info("Finished processing control file")

                # If no device error occurred, no need to collect crashinfo.
                collect_crashinfo = self.failed_with_device_error
            except Exception as e:
                try:
                    logging.exception(
                            'Exception escaped control file, job aborting:')
                    reason = re.sub(base_job.status_log_entry.BAD_CHAR_REGEX,
                                    ' ', str(e))
                    self.record('INFO', None, None, str(e),
                                {'job_abort_reason': reason})
                except:
                    pass # don't let logging exceptions here interfere
                raise
        finally:
            if temp_control_file_dir:
                # Clean up temp directory used for copies of the control files
                try:
                    shutil.rmtree(temp_control_file_dir)
                except Exception as e:
                    logging.warning('Could not remove temp directory %s: %s',
                                    temp_control_file_dir, e)

            if machines and (collect_crashdumps or collect_crashinfo):
                if skip_crash_collection or self.fast:
                    logging.info('Skipping crash dump/info collection '
                                 'as requested.')
                else:
                    with metrics.SecondsTimer(
                            'chromeos/autotest/job/collect_crashinfo'):
                        namespace['test_start_time'] = test_start_time
                        # Remove crash files for passing tests.
                        # TODO(ayatane): Tests that create crash files should be
                        # reported.
                        namespace['has_failed_tests'] = self._has_failed_tests()
                        self._collect_crashes(namespace, collect_crashinfo)
            self.disable_external_logging()
            if self._uncollected_log_file and created_uncollected_logs:
                os.remove(self._uncollected_log_file)

            if not self.fast:
                with metrics.SecondsTimer(
                        'chromeos/autotest/job/get_network_stats',
                        fields = {'stage': 'end'}):
                    namespace['network_stats_label'] = 'at-end'
                    self._execute_code(GET_NETWORK_STATS_CONTROL_FILE,
                                       namespace)
    def run_test(self, url, *args, **dargs):
        """
        Summon a test object and run it.

        tag
                tag to add to testname
        url
                url of the test to run
        """
        if self._disable_sysinfo:
            dargs['disable_sysinfo'] = True

        group, testname = self.pkgmgr.get_package_name(url, 'test')
        testname, subdir, tag = self._build_tagged_test_name(testname, dargs)
        outputdir = self._make_test_outputdir(subdir)

        def group_func():
            try:
                test.runtest(self, url, tag, args, dargs)
            except error.TestBaseException as e:
                self.record(e.exit_status, subdir, testname, str(e))
                raise
            except Exception as e:
                info = str(e) + "\n" + traceback.format_exc()
                self.record('FAIL', subdir, testname, info)
                raise
            else:
                self.record('GOOD', subdir, testname, 'completed successfully')

        try:
            result = self._run_group(testname, subdir, group_func)
        except error.TestBaseException as e:
            return False
        else:
            return True
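In a server-side control file run_test is usually invoked through the injected job object. A minimal sketch is shown below; the test name, tag and host keyword are illustrative only, not something this method requires.

# run_test returns True on success and False if the test raised a
# TestBaseException; the status.log records are written either way.
passed = job.run_test('dummy_Pass', tag='smoke', host=host)
if not passed:
    logging.info('dummy_Pass (smoke) failed; see status.log for details')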
    def _run_group(self, name, subdir, function, *args, **dargs):
        """Underlying method for running something inside of a group."""
        result, exc_info = None, None
        try:
            self.record('START', subdir, name)
            result = function(*args, **dargs)
        except error.TestBaseException as e:
            self.record("END %s" % e.exit_status, subdir, name)
            raise
        except Exception as e:
            err_msg = str(e) + '\n'
            err_msg += traceback.format_exc()
            self.record('END ABORT', subdir, name, err_msg)
            raise error.JobError(name + ' failed\n' + traceback.format_exc())
        else:
            self.record('END GOOD', subdir, name)
        finally:
            for hook in self._post_run_hooks:
                hook()

        return result

    def run_group(self, function, *args, **dargs):
        """\
        @param function: subroutine to run
        @returns: (result, exc_info). When the call succeeds, result contains
                the return value of |function| and exc_info is None. If
                |function| raises an exception, exc_info contains the tuple
                returned by sys.exc_info(), and result is None.
        """
        name = function.__name__
        # Allow the tag for the group to be specified.
        tag = dargs.pop('tag', None)
        if tag:
            name = tag

        try:
            result = self._run_group(name, None, function, *args, **dargs)[0]
        except error.TestBaseException:
            return None, sys.exc_info()
        return result, None

    def run_op(self, op, op_func, get_kernel_func):
        """\
        A specialization of run_group meant specifically for handling
        management operation. Includes support for capturing the kernel version
        after the operation.

        Args:
           op: name of the operation.
           op_func: a function that carries out the operation (reboot, suspend)
           get_kernel_func: a function that returns a string
                            representing the kernel version.
        """
        try:
            self.record('START', None, op)
            op_func()
        except Exception as e:
            err_msg = str(e) + '\n' + traceback.format_exc()
            self.record('END FAIL', None, op, err_msg)
            raise
        else:
            kernel = get_kernel_func()
            self.record('END GOOD', None, op,
                        optional_fields={"kernel": kernel})
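A short sketch of how a control file might use these two wrappers follows; install_fw and the uname-based kernel lookup are placeholder examples, and host is assumed to be an already-created host object.

# run_group: brackets the call with START and a matching END record
# (GOOD, ABORT, or the exception's exit status) and returns (result, exc_info).
def install_fw():
    # placeholder body for whatever work the control file wants grouped
    pass

result, exc_info = job.run_group(install_fw, tag='fw_install')

# run_op: like run_group, but also records the kernel version reported by
# get_kernel_func after the operation completes.
job.run_op('reboot', host.reboot,
           lambda: host.run('uname -r').stdout.strip())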
    def run_control(self, path):
        """Execute a control file found at path (relative to the autotest
        path). Intended for executing a control file within a control file,
        not for running the top-level job control file."""
        path = os.path.join(self.autodir, path)
        control_file = self._load_control_file(path)
        self.run(control=control_file, control_file_dir=self._USE_TEMP_DIR)

    def add_sysinfo_command(self, command, logfile=None, on_every_test=False):
        self._add_sysinfo_loggable(sysinfo.command(command, logf=logfile),
                                   on_every_test)

    def add_sysinfo_logfile(self, file, on_every_test=False):
        self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test)

    def _add_sysinfo_loggable(self, loggable, on_every_test):
        if on_every_test:
            self.sysinfo.test_loggables.add(loggable)
        else:
            self.sysinfo.boot_loggables.add(loggable)

    def _read_warnings(self):
        """Poll all the warning loggers and extract any new warnings that have
        been logged. If the warnings belong to a category that is currently...
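The excerpt is truncated above. As a usage sketch for the sysinfo hooks (the command and log path are arbitrary examples), loggables registered with on_every_test=True are collected after every test, otherwise once per boot:

job.add_sysinfo_command('dmesg', logfile='dmesg', on_every_test=True)
job.add_sysinfo_logfile('/var/log/messages')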