job.py
Source: job.py
1"""The main job wrapper2This is the core infrastructure.3Copyright Andy Whitcroft, Martin J. Bligh 20064"""5# pylint: disable=missing-docstring6import copy7from datetime import datetime8import getpass9import glob10import logging11import os12import re13import shutil14import sys15import time16import traceback17import types18import weakref19import common20from autotest_lib.client.bin import client_logging_config21from autotest_lib.client.bin import harness22from autotest_lib.client.bin import local_host23from autotest_lib.client.bin import parallel24from autotest_lib.client.bin import partition as partition_lib25from autotest_lib.client.bin import profilers26from autotest_lib.client.bin import sysinfo27from autotest_lib.client.bin import test28from autotest_lib.client.bin import utils29from autotest_lib.client.common_lib import barrier30from autotest_lib.client.common_lib import base_job31from autotest_lib.client.common_lib import control_data32from autotest_lib.client.common_lib import error33from autotest_lib.client.common_lib import global_config34from autotest_lib.client.common_lib import logging_manager35from autotest_lib.client.common_lib import packages36from autotest_lib.client.cros import cros_logging37from autotest_lib.client.tools import html_report38GLOBAL_CONFIG = global_config.global_config39LAST_BOOT_TAG = object()40JOB_PREAMBLE = """41from autotest_lib.client.common_lib.error import *42from autotest_lib.client.bin.utils import *43"""44class StepError(error.AutotestError):45    pass46class NotAvailableError(error.AutotestError):47    pass48def _run_test_complete_on_exit(f):49    """Decorator for job methods that automatically calls50    self.harness.run_test_complete when the method exits, if appropriate."""51    def wrapped(self, *args, **dargs):52        try:53            return f(self, *args, **dargs)54        finally:55            if self._logger.global_filename == 'status':56                self.harness.run_test_complete()57                if self.drop_caches:58                    utils.drop_caches()59    wrapped.__name__ = f.__name__60    wrapped.__doc__ = f.__doc__61    wrapped.__dict__.update(f.__dict__)62    return wrapped63class status_indenter(base_job.status_indenter):64    """Provide a status indenter that is backed by job._record_prefix."""65    def __init__(self, job_):66        self._job = weakref.proxy(job_)  # avoid a circular reference67    @property68    def indent(self):69        return self._job._record_indent70    def increment(self):71        self._job._record_indent += 172    def decrement(self):73        self._job._record_indent -= 174class base_client_job(base_job.base_job):75    """The client-side concrete implementation of base_job.76    Optional properties provided by this implementation:77        control78        harness79    """80    _WARNING_DISABLE_DELAY = 581    # _record_indent is a persistent property, but only on the client82    _job_state = base_job.base_job._job_state83    _record_indent = _job_state.property_factory(84        '_state', '_record_indent', 0, namespace='client')85    _max_disk_usage_rate = _job_state.property_factory(86        '_state', '_max_disk_usage_rate', 0.0, namespace='client')87    def __init__(self, control, options, drop_caches=True):88        """89        Prepare a client side job object.90        @param control: The control file (pathname of).91        @param options: an object which includes:92                jobtag: The job tag string (eg "default").93                cont: If this is the continuation of this 
                harness_type: An alternative server harness.  [None]
                use_external_logging: If true, the enable_external_logging
                          method will be called during construction.  [False]
        @param drop_caches: If true, utils.drop_caches() is called before and
                between all tests.  [True]
        """
        super(base_client_job, self).__init__(options=options)
        self._pre_record_init(control, options)
        try:
            self._post_record_init(control, options, drop_caches)
        except Exception, err:
            self.record(
                    'ABORT', None, None, 'client.bin.job.__init__ failed: %s' %
                    str(err))
            raise

    @classmethod
    def _get_environ_autodir(cls):
        return os.environ['AUTODIR']

    @classmethod
    def _find_base_directories(cls):
        """
        Determine locations of autodir and clientdir (which are the same)
        using os.environ. Serverdir does not exist in this context.
        """
        autodir = clientdir = cls._get_environ_autodir()
        return autodir, clientdir, None

    @classmethod
    def _parse_args(cls, args):
        return re.findall("[^\s]*?['|\"].*?['|\"]|[^\s]+", args)

    def _find_resultdir(self, options):
        """
        Determine the directory for storing results. On a client this is
        always <autodir>/results/<tag>, where tag is passed in on the command
        line as an option.
        """
        output_dir_config = GLOBAL_CONFIG.get_config_value('CLIENT',
                                                           'output_dir',
                                                           default="")
        if options.output_dir:
            basedir = options.output_dir
        elif output_dir_config:
            basedir = output_dir_config
        else:
            basedir = self.autodir
        return os.path.join(basedir, 'results', options.tag)

    def _get_status_logger(self):
        """Return a reference to the status logger."""
        return self._logger

    def _pre_record_init(self, control, options):
        """
        Initialization function that should perform ONLY the required
        setup so that the self.record() method works.

        As of now self.record() needs self.resultdir, self._group_level,
        self.harness and of course self._logger.
        """
        if not options.cont:
            self._cleanup_debugdir_files()
            self._cleanup_results_dir()
        logging_manager.configure_logging(
            client_logging_config.ClientLoggingConfig(),
            results_dir=self.resultdir,
            verbose=options.verbose)
        logging.info('Writing results to %s', self.resultdir)
        # init_group_level needs the state
        self.control = os.path.realpath(control)
        self._is_continuation = options.cont
        self._current_step_ancestry = []
        self._next_step_index = 0
        self._load_state()
        _harness = self.handle_persistent_option(options, 'harness')
        _harness_args = self.handle_persistent_option(options, 'harness_args')
        self.harness = harness.select(_harness, self, _harness_args)
        if self.control:
            parsed_control = control_data.parse_control(
                    self.control, raise_warnings=False)
            self.fast = parsed_control.fast
        # set up the status logger
        def client_job_record_hook(entry):
            msg_tag = ''
            if '.' in self._logger.global_filename:
                msg_tag = self._logger.global_filename.split('.', 1)[1]
            # send the entry to the job harness
            message = '\n'.join([entry.message] + entry.extra_message_lines)
            rendered_entry = self._logger.render_entry(entry)
            self.harness.test_status_detail(entry.status_code, entry.subdir,
                                            entry.operation, message, msg_tag,
                                            entry.fields)
            self.harness.test_status(rendered_entry, msg_tag)
            # send the entry to stdout, if it's enabled
            logging.info(rendered_entry)
        self._logger = base_job.status_logger(
            self, status_indenter(self), record_hook=client_job_record_hook)

    def _post_record_init(self, control, options, drop_caches):
        """
        Perform job initialization not required by self.record().
        """
        self._init_drop_caches(drop_caches)
        self._init_packages()
        self.sysinfo = sysinfo.sysinfo(self.resultdir)
        self._load_sysinfo_state()
        if not options.cont:
            download = os.path.join(self.testdir, 'download')
            if not os.path.exists(download):
                os.mkdir(download)
            shutil.copyfile(self.control,
                            os.path.join(self.resultdir, 'control'))
        self.control = control
        self.logging = logging_manager.get_logging_manager(
                manage_stdout_and_stderr=True, redirect_fds=True)
        self.logging.start_logging()
        self.profilers = profilers.profilers(self)
        self.machines = [options.hostname]
        self.machine_dict_list = [{'hostname': options.hostname}]
        # Client side tests should always run the same whether or not they are
        # running in the lab.
        self.in_lab = False
        self.hosts = set([local_host.LocalHost(hostname=options.hostname)])
        self.args = []
        if options.args:
            self.args = self._parse_args(options.args)
        if options.user:
            self.user = options.user
        else:
            self.user = getpass.getuser()
        self.sysinfo.log_per_reboot_data()
        if not options.cont:
            self.record('START', None, None)
        self.harness.run_start()
        if options.log:
            self.enable_external_logging()
        self.num_tests_run = None
        self.num_tests_failed = None
        self.warning_loggers = None
        self.warning_manager = None

    def _init_drop_caches(self, drop_caches):
        """
        Perform the drop caches initialization.
        """
        self.drop_caches_between_iterations = (
                GLOBAL_CONFIG.get_config_value('CLIENT',
                                               'drop_caches_between_iterations',
                                               type=bool, default=True))
        self.drop_caches = drop_caches
        if self.drop_caches:
            utils.drop_caches()

    def _init_packages(self):
        """
        Perform the packages support initialization.
        """
        self.pkgmgr = packages.PackageManager(
            self.autodir, run_function_dargs={'timeout': 3600})

    def _cleanup_results_dir(self):
        """Delete everything in resultsdir"""
resultsdir"""247        assert os.path.exists(self.resultdir)248        list_files = glob.glob('%s/*' % self.resultdir)249        for f in list_files:250            if os.path.isdir(f):251                shutil.rmtree(f)252            elif os.path.isfile(f):253                os.remove(f)254    def _cleanup_debugdir_files(self):255        """256        Delete any leftover debugdir files257        """258        list_files = glob.glob("/tmp/autotest_results_dir.*")259        for f in list_files:260            os.remove(f)261    def disable_warnings(self, warning_type):262        self.record("INFO", None, None,263                    "disabling %s warnings" % warning_type,264                    {"warnings.disable": warning_type})265        time.sleep(self._WARNING_DISABLE_DELAY)266    def enable_warnings(self, warning_type):267        time.sleep(self._WARNING_DISABLE_DELAY)268        self.record("INFO", None, None,269                    "enabling %s warnings" % warning_type,270                    {"warnings.enable": warning_type})271    def monitor_disk_usage(self, max_rate):272        """\273        Signal that the job should monitor disk space usage on /274        and generate a warning if a test uses up disk space at a275        rate exceeding 'max_rate'.276        Parameters:277             max_rate - the maximium allowed rate of disk consumption278                        during a test, in MB/hour, or 0 to indicate279                        no limit.280        """281        self._max_disk_usage_rate = max_rate282    def control_get(self):283        return self.control284    def control_set(self, control):285        self.control = os.path.abspath(control)286    def harness_select(self, which, harness_args):287        self.harness = harness.select(which, self, harness_args)288    def setup_dirs(self, results_dir, tmp_dir):289        if not tmp_dir:290            tmp_dir = os.path.join(self.tmpdir, 'build')291        if not os.path.exists(tmp_dir):292            os.mkdir(tmp_dir)293        if not os.path.isdir(tmp_dir):294            e_msg = "Temp dir (%s) is not a dir - args backwards?" % self.tmpdir295            raise ValueError(e_msg)296        # We label the first build "build" and then subsequent ones297        # as "build.2", "build.3", etc. Whilst this is a little bit298        # inconsistent, 99.9% of jobs will only have one build299        # (that's not done as kernbench, sparse, or buildtest),300        # so it works out much cleaner. One of life's compromises.301        if not results_dir:302            results_dir = os.path.join(self.resultdir, 'build')303            i = 2304            while os.path.exists(results_dir):305                results_dir = os.path.join(self.resultdir, 'build.%d' % i)306                i += 1307        if not os.path.exists(results_dir):308            os.mkdir(results_dir)309        return (results_dir, tmp_dir)310    def barrier(self, *args, **kwds):311        """Create a barrier object"""312        return barrier.barrier(*args, **kwds)313    def install_pkg(self, name, pkg_type, install_dir):314        '''315        This method is a simple wrapper around the actual package316        installation method in the Packager class. This is used317        internally by the profilers, deps and tests code.318        name : name of the package (ex: sleeptest, dbench etc.)319        pkg_type : Type of the package (ex: test, dep etc.)320        install_dir : The directory in which the source is actually321                      untarred into. 
        '''
        if self.pkgmgr.repositories:
            self.pkgmgr.install_pkg(name, pkg_type, self.pkgdir, install_dir)

    def add_repository(self, repo_urls):
        '''
        Adds the repository locations to the job so that packages
        can be fetched from them when needed. The repository list
        needs to be a string list
        Ex: job.add_repository(['http://blah1','http://blah2'])
        '''
        for repo_url in repo_urls:
            self.pkgmgr.add_repository(repo_url)
        # Fetch the packages' checksum file that contains the checksums
        # of all the packages if it is not already fetched. The checksum
        # is always fetched whenever a job is first started. This
        # is not done in the job's constructor as we don't have the list of
        # the repositories there (and obviously don't care about this file
        # if we are not using the repos)
        try:
            checksum_file_path = os.path.join(self.pkgmgr.pkgmgr_dir,
                                              packages.CHECKSUM_FILE)
            self.pkgmgr.fetch_pkg(packages.CHECKSUM_FILE,
                                  checksum_file_path, use_checksum=False)
        except error.PackageFetchError:
            # packaging system might not be working in this case
            # Silently fall back to the normal case
            pass

    def require_gcc(self):
        """
        Test whether gcc is installed on the machine.
        """
        # check if gcc is installed on the system.
        try:
            utils.system('which gcc')
        except error.CmdError:
            raise NotAvailableError('gcc is required by this job and is '
                                    'not available on the system')

    def setup_dep(self, deps):
        """Set up the dependencies for this test.

        deps is a list of libraries required for this test.
        """
        # Fetch the deps from the repositories and set them up.
        for dep in deps:
            dep_dir = os.path.join(self.autodir, 'deps', dep)
            # Search for the dependency in the repositories if specified,
            # else check locally.
            try:
                self.install_pkg(dep, 'dep', dep_dir)
            except error.PackageInstallError:
                # see if the dep is there locally
                pass
            # dep_dir might not exist if it is not fetched from the repos
            if not os.path.exists(dep_dir):
                raise error.TestError("Dependency %s does not exist" % dep)
            os.chdir(dep_dir)
            if execfile('%s.py' % dep, {}) is None:
                logging.info('Dependency %s successfully built', dep)

    def _runtest(self, url, tag, timeout, args, dargs):
        try:
            l = lambda: test.runtest(self, url, tag, args, dargs)
            pid = parallel.fork_start(self.resultdir, l)
            if timeout:
                logging.debug('Waiting for pid %d for %d seconds', pid, timeout)
                parallel.fork_waitfor_timed(self.resultdir, pid, timeout)
            else:
                parallel.fork_waitfor(self.resultdir, pid)
        except error.TestBaseException:
            # These are already classified with an error type (exit_status)
            raise
        except error.JobError:
            raise  # Caught further up and turned into an ABORT.
        except Exception, e:
            # Converts all other exceptions thrown by the test regardless
            # of phase into a TestError(TestBaseException) subclass that
            # reports them with their full stack trace.
            raise error.UnhandledTestError(e)

    def _run_test_base(self, url, *args, **dargs):
        """
        Prepares arguments and run functions to run_test and run_test_detail.

        @param url A url that identifies the test to run.
        @param tag An optional keyword argument that will be added to the
            test and subdir name.
        @param subdir_tag An optional keyword argument that will be added
            to the subdir name.
        @returns:
                subdir: Test subdirectory
                testname: Test name
                group_func: Actual test run function
                timeout: Test timeout
        """
        _group, testname = self.pkgmgr.get_package_name(url, 'test')
        testname, subdir, tag = self._build_tagged_test_name(testname, dargs)
        self._make_test_outputdir(subdir)
        timeout = dargs.pop('timeout', None)
        if timeout:
            logging.debug('Test has timeout: %d sec.', timeout)

        def log_warning(reason):
            self.record("WARN", subdir, testname, reason)

        @disk_usage_monitor.watch(log_warning, "/", self._max_disk_usage_rate)
        def group_func():
            try:
                self._runtest(url, tag, timeout, args, dargs)
            except error.TestBaseException, detail:
                # The error is already classified, record it properly.
                self.record(detail.exit_status, subdir, testname, str(detail))
                raise
            else:
                self.record('GOOD', subdir, testname, 'completed successfully')
        return (subdir, testname, group_func, timeout)

    @_run_test_complete_on_exit
    def run_test(self, url, *args, **dargs):
        """
        Summon a test object and run it.

        @param url A url that identifies the test to run.
        @param tag An optional keyword argument that will be added to the
            test and subdir name.
        @param subdir_tag An optional keyword argument that will be added
            to the subdir name.
        @returns True if the test passes, False otherwise.
        """
        (subdir, testname, group_func, timeout) = self._run_test_base(url,
                                                                      *args,
                                                                      **dargs)
        try:
            self._rungroup(subdir, testname, group_func, timeout)
            return True
        except error.TestBaseException:
            return False
        # Any other exception here will be given to the caller
        #
        # NOTE: The only exception possible from the control file here
        # is error.JobError as _runtest() turns all others into an
        # UnhandledTestError that is caught above.

    @_run_test_complete_on_exit
    def run_test_detail(self, url, *args, **dargs):
        """
        Summon a test object and run it, returning test status.

        @param url A url that identifies the test to run.
        @param tag An optional keyword argument that will be added to the
            test and subdir name.
        @param subdir_tag An optional keyword argument that will be added
            to the subdir name.
        @returns Test status
        @see: client/common_lib/error.py, exit_status
        """
        (subdir, testname, group_func, timeout) = self._run_test_base(url,
                                                                      *args,
                                                                      **dargs)
        try:
            self._rungroup(subdir, testname, group_func, timeout)
            return 'GOOD'
        except error.TestBaseException, detail:
            return detail.exit_status

    def _rungroup(self, subdir, testname, function, timeout, *args, **dargs):
        """\
        subdir:
                name of the group
        testname:
                name of the test to run, or support step
        function:
                subroutine to run
        *args:
                arguments for the function

        Returns the result of the passed in function
        """
        try:
            optional_fields = None
            if timeout:
                optional_fields = {}
                optional_fields['timeout'] = timeout
            self.record('START', subdir, testname,
                        optional_fields=optional_fields)
            self._state.set('client', 'unexpected_reboot', (subdir, testname))
            try:
                result = function(*args, **dargs)
                self.record('END GOOD', subdir, testname)
                return result
            except error.TestBaseException, e:
                self.record('END %s' % e.exit_status, subdir, testname)
                raise
            except error.JobError, e:
                self.record('END ABORT', subdir, testname)
                raise
            except Exception, e:
                # This should only ever happen due to a bug in the given
                # function's code.  The common case of being called by
                # run_test() will never reach this.  If a control file called
                # run_group() itself, bugs in its function will be caught
                # here.
                err_msg = str(e) + '\n' + traceback.format_exc()
                self.record('END ERROR', subdir, testname, err_msg)
                raise
        finally:
            self._state.discard('client', 'unexpected_reboot')

    def run_group(self, function, tag=None, **dargs):
        """
        Run a function nested within a group level.

        function:
                Callable to run.
        tag:
                An optional tag name for the group.  If None (default)
                function.__name__ will be used.
        **dargs:
                Named arguments for the function.
        """
        if tag:
            name = tag
        else:
            name = function.__name__
        try:
            return self._rungroup(subdir=None, testname=name,
                                  function=function, timeout=None, **dargs)
        except (SystemExit, error.TestBaseException):
            raise
        # If there was a different exception, turn it into a TestError.
        # It will be caught by step_engine or _run_step_fn.
        except Exception, e:
            raise error.UnhandledTestError(e)

    def cpu_count(self):
        return utils.count_cpus()  # use total system count

    def start_reboot(self):
        self.record('START', None, 'reboot')
        self.record('GOOD', None, 'reboot.start')

    def _record_reboot_failure(self, subdir, operation, status,
                               running_id=None):
        self.record("ABORT", subdir, operation, status)
        if not running_id:
            running_id = utils.running_os_ident()
        kernel = {"kernel": running_id.split("::")[0]}
        self.record("END ABORT", subdir, 'reboot', optional_fields=kernel)

    def _check_post_reboot(self, subdir, running_id=None):
        """
        Perform post-boot checks, such as whether the system configuration
        has changed across reboots (specifically, CPUs and partitions).

        @param subdir: The subdir to use in the job.record call.
        @param running_id: An optional running_id to include in the reboot
            failure log message
        @raise JobError: Raised if the current configuration does not match the
            pre-reboot configuration.
        """
        # check to see if any partitions have changed
        partition_list = partition_lib.get_partition_list(self,
                                                          exclude_swap=False)
        mount_info = partition_lib.get_mount_info(partition_list)
        old_mount_info = self._state.get('client', 'mount_info')
        if mount_info != old_mount_info:
            new_entries = mount_info - old_mount_info
            old_entries = old_mount_info - mount_info
            description = ("mounted partitions are different after reboot "
                           "(old entries: %s, new entries: %s)" %
                           (old_entries, new_entries))
            self._record_reboot_failure(subdir, "reboot.verify_config",
                                        description, running_id=running_id)
            raise error.JobError("Reboot failed: %s" % description)
        # check to see if any CPUs have changed
        cpu_count = utils.count_cpus()
        old_count = self._state.get('client', 'cpu_count')
        if cpu_count != old_count:
            description = ('Number of CPUs changed after reboot '
                           '(old count: %d, new count: %d)' %
                           (old_count, cpu_count))
            self._record_reboot_failure(subdir, 'reboot.verify_config',
                                        description, running_id=running_id)
            raise error.JobError('Reboot failed: %s' % description)

    def partition(self, device, loop_size=0, mountpoint=None):
        """
        Work with a machine partition

            @param device: e.g. /dev/sda2, /dev/sdb1 etc...
            @param mountpoint: Specify a directory to mount to. If not specified
                               autotest tmp directory will be used.
            @param loop_size: Size of loopback device (in MB). Defaults to 0.

            @return: A L{client.bin.partition.partition} object
        """
        if not mountpoint:
            mountpoint = self.tmpdir
        return partition_lib.partition(self, device, loop_size, mountpoint)

    @utils.deprecated
    def filesystem(self, device, mountpoint=None, loop_size=0):
        """Same as partition

        @deprecated: Use partition method instead
        """
        return self.partition(device, loop_size, mountpoint)

    def enable_external_logging(self):
        pass

    def disable_external_logging(self):
        pass

    def reboot_setup(self):
        # save the partition list and mount points, as well as the cpu count
        partition_list = partition_lib.get_partition_list(self,
                                                          exclude_swap=False)
        mount_info = partition_lib.get_mount_info(partition_list)
        self._state.set('client', 'mount_info', mount_info)
        self._state.set('client', 'cpu_count', utils.count_cpus())

    def reboot(self):
        self.reboot_setup()
        self.harness.run_reboot()
        # HACK: using this as a module sometimes hangs shutdown, so if it's
        # installed unload it first
        utils.system("modprobe -r netconsole", ignore_status=True)
        # sync first, so that a sync during shutdown doesn't time out
        utils.system("sync; sync", ignore_status=True)
        utils.system("(sleep 5; reboot) </dev/null >/dev/null 2>&1 &")
        self.quit()

    def noop(self, text):
        logging.info("job: noop: " + text)

    @_run_test_complete_on_exit
    def parallel(self, *tasklist):
        """Run tasks in parallel"""
        pids = []
        old_log_filename = self._logger.global_filename
        for i, task in enumerate(tasklist):
            assert isinstance(task, (tuple, list))
            self._logger.global_filename = old_log_filename + (".%d" % i)

            def task_func():
                # stub out _record_indent with a process-local one
                base_record_indent = self._record_indent
                proc_local = self._job_state.property_factory(
                    '_state', '_record_indent.%d' % os.getpid(),
                    base_record_indent, namespace='client')
                self.__class__._record_indent = proc_local
                task[0](*task[1:])
            pids.append(parallel.fork_start(self.resultdir, task_func))
        old_log_path = os.path.join(self.resultdir, old_log_filename)
        old_log = open(old_log_path, "a")
        exceptions = []
        for i, pid in enumerate(pids):
            # wait for the task to finish
            try:
                parallel.fork_waitfor(self.resultdir, pid)
            except Exception, e:
                exceptions.append(e)
            # copy the logs from the subtask into the main log
            new_log_path = old_log_path + (".%d" % i)
            if os.path.exists(new_log_path):
                new_log = open(new_log_path)
                old_log.write(new_log.read())
                new_log.close()
                old_log.flush()
                os.remove(new_log_path)
        old_log.close()
        self._logger.global_filename = old_log_filename
        # handle any exceptions raised by the parallel tasks
        if exceptions:
            msg = "%d task(s) failed in job.parallel" % len(exceptions)
            raise error.JobError(msg)

    def quit(self):
        # XXX: should have a better name.
        self.harness.run_pause()
        raise error.JobContinue("more to come")

    def complete(self, status):
        """Write pending reports, clean up, and exit"""
        # write out a job HTML report
        try:
            html_report.create_report(self.resultdir)
        except Exception, e:
            logging.error("Error writing job HTML report: %s", e)
        # We are about to exit 'complete' so clean up the control file.
        dest = os.path.join(self.resultdir, os.path.basename(self._state_file))
        shutil.move(self._state_file, dest)
        self.harness.run_complete()
        self.disable_external_logging()
        sys.exit(status)

    def _load_state(self):
        # grab any initial state and set up $CONTROL.state as the backing file
        init_state_file = self.control + '.init.state'
        self._state_file = self.control + '.state'
        if os.path.exists(init_state_file):
            shutil.move(init_state_file, self._state_file)
        self._state.set_backing_file(self._state_file)
        # initialize the state engine, if necessary
        has_steps = self._state.has('client', 'steps')
        if not self._is_continuation and has_steps:
            raise RuntimeError('Loaded state can only contain client.steps if '
                               'this is a continuation')
        if not has_steps:
            logging.debug('Initializing the state engine')
            self._state.set('client', 'steps', [])

    def handle_persistent_option(self, options, option_name):
        """
        Select option from command line or persistent state.
        Store selected option to allow standalone client to continue
        after reboot with previously selected options.

        Priority:
        1. explicitly specified via command line
        2. stored in state file (if continuing job '-c')
        3. default == None
        """
        option = None
        cmd_line_option = getattr(options, option_name)
        if cmd_line_option:
            option = cmd_line_option
            self._state.set('client', option_name, option)
        else:
            stored_option = self._state.get('client', option_name, None)
            if stored_option:
                option = stored_option
        logging.debug('Persistent option %s now set to %s', option_name, option)
        return option

    def __create_step_tuple(self, fn, args, dargs):
        # Legacy code passes in an array where the first arg is
        # the function or its name.
        if isinstance(fn, list):
            assert(len(args) == 0)
            assert(len(dargs) == 0)
            args = fn[1:]
            fn = fn[0]
        # Pickling actual functions is hairy, thus we have to call
        # them by name.  Unfortunately, this means only functions
        # defined globally can be used as a next step.
        if callable(fn):
            fn = fn.__name__
        if not isinstance(fn, types.StringTypes):
            raise StepError("Next steps must be functions or "
                            "strings containing the function name")
        ancestry = copy.copy(self._current_step_ancestry)
        return (ancestry, fn, args, dargs)

    def next_step_append(self, fn, *args, **dargs):
        """Define the next step and place it at the end"""
        steps = self._state.get('client', 'steps')
        steps.append(self.__create_step_tuple(fn, args, dargs))
        self._state.set('client', 'steps', steps)

    def next_step(self, fn, *args, **dargs):
        """Create a new step and place it after any steps added
        while running the current step but before any steps added in
        previous steps"""
        steps = self._state.get('client', 'steps')
        steps.insert(self._next_step_index,
                     self.__create_step_tuple(fn, args, dargs))
        self._next_step_index += 1
        self._state.set('client', 'steps', steps)

    def next_step_prepend(self, fn, *args, **dargs):
        """Insert a new step, executing first"""
        steps = self._state.get('client', 'steps')
        steps.insert(0, self.__create_step_tuple(fn, args, dargs))
        self._next_step_index += 1
        self._state.set('client', 'steps', steps)

    def _run_step_fn(self, local_vars, fn, args, dargs):
        """Run a (step) function within the given context"""
        local_vars['__args'] = args
        local_vars['__dargs'] = dargs
        try:
            exec('__ret = %s(*__args, **__dargs)' % fn, local_vars, local_vars)
            return local_vars['__ret']
        except SystemExit:
            raise  # Send error.JobContinue and JobComplete on up to runjob.
        except error.TestNAError, detail:
            self.record(detail.exit_status, None, fn, str(detail))
        except Exception, detail:
            raise error.UnhandledJobError(detail)

    def _create_frame(self, global_vars, ancestry, fn_name):
        """Set up the environment like it would have been when this
        function was first defined.

        Child step engine 'implementations' must have 'return locals()'
        at the end of their steps.  Because of this, we can call the
        parent function and get back all child functions (i.e. those
        defined within it).

        Unfortunately, the call stack of the function calling
        job.next_step might have been deeper than the function it
        added.  In order to make sure that the environment is what it
        should be, we need to then pop off the frames we built until
        we find the frame where the function was first defined."""
        # The copies ensure that the parent frames are not modified
        # while building child frames.  This matters if we then
        # pop some frames in the next part of this function.
        current_frame = copy.copy(global_vars)
        frames = [current_frame]
        for steps_fn_name in ancestry:
            ret = self._run_step_fn(current_frame, steps_fn_name, [], {})
            current_frame = copy.copy(ret)
            frames.append(current_frame)
        # Walk up the stack frames until we find the place fn_name was defined.
        while len(frames) > 2:
            if fn_name not in frames[-2]:
                break
            if frames[-2][fn_name] != frames[-1][fn_name]:
                break
            frames.pop()
            ancestry.pop()
        return (frames[-1], ancestry)

    def _add_step_init(self, local_vars, current_function):
        """If the function returned a dictionary that includes a
        function named 'step_init', prepend it to our list of steps.
        This will only get run the first time a function with a nested
        use of the step engine is run."""
        if (isinstance(local_vars, dict) and
            'step_init' in local_vars and
            callable(local_vars['step_init'])):
            # The init step is a child of the function
            # we were just running.
            self._current_step_ancestry.append(current_function)
            self.next_step_prepend('step_init')

    def step_engine(self):
        """The multi-run engine used when the control file defines step_init.

        Does the next step.
        """
        # Set up the environment and then interpret the control file.
        # Some control files will have code outside of functions,
        # which means we need to have our state engine initialized
        # before reading in the file.
        global_control_vars = {'job': self,
                               'args': self.args}
        exec(JOB_PREAMBLE, global_control_vars, global_control_vars)
        try:
            execfile(self.control, global_control_vars, global_control_vars)
        except error.TestNAError, detail:
            self.record(detail.exit_status, None, self.control, str(detail))
        except SystemExit:
            raise  # Send error.JobContinue and JobComplete on up to runjob.
        except Exception, detail:
            # Syntax errors or other general Python exceptions coming out of
            # the top level of the control file itself go through here.
            raise error.UnhandledJobError(detail)
        # If we loaded in a mid-job state file, then we presumably
        # know what steps we have yet to run.
        if not self._is_continuation:
            if 'step_init' in global_control_vars:
                self.next_step(global_control_vars['step_init'])
        else:
            # if last job failed due to unexpected reboot, record it as fail
            # so harness gets called
            last_job = self._state.get('client', 'unexpected_reboot', None)
            if last_job:
                subdir, testname = last_job
                self.record('FAIL', subdir, testname, 'unexpected reboot')
                self.record('END FAIL', subdir, testname)
        # Iterate through the steps.  If we reboot, we'll simply
        # continue iterating on the next step.
        while len(self._state.get('client', 'steps')) > 0:
            steps = self._state.get('client', 'steps')
            (ancestry, fn_name, args, dargs) = steps.pop(0)
            self._state.set('client', 'steps', steps)
            self._next_step_index = 0
            ret = self._create_frame(global_control_vars, ancestry, fn_name)
            local_vars, self._current_step_ancestry = ret
            local_vars = self._run_step_fn(local_vars, fn_name, args, dargs)
            self._add_step_init(local_vars, fn_name)

    def add_sysinfo_command(self, command, logfile=None, on_every_test=False):
        self._add_sysinfo_loggable(sysinfo.command(command, logf=logfile),
                                   on_every_test)

    def add_sysinfo_logfile(self, file, on_every_test=False):
        self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test)

    def _add_sysinfo_loggable(self, loggable, on_every_test):
        if on_every_test:
            self.sysinfo.test_loggables.add(loggable)
        else:
            self.sysinfo.boot_loggables.add(loggable)
        self._save_sysinfo_state()

    def _load_sysinfo_state(self):
        state = self._state.get('client', 'sysinfo', None)
        if state:
            self.sysinfo.deserialize(state)

    def _save_sysinfo_state(self):
        state = self.sysinfo.serialize()
        self._state.set('client', 'sysinfo', state)


class disk_usage_monitor:
    def __init__(self, logging_func, device, max_mb_per_hour):
        self.func = logging_func
        self.device = device
        self.max_mb_per_hour = max_mb_per_hour

    def start(self):
        self.initial_space = utils.freespace(self.device)
        self.start_time = time.time()

    def stop(self):
        # if no maximum usage rate was set, we don't need to
        # generate any warnings
        if not self.max_mb_per_hour:
            return
        final_space = utils.freespace(self.device)
        used_space = self.initial_space - final_space
        stop_time = time.time()
        total_time = stop_time - self.start_time
        # round up the time to one minute, to keep extremely short
        # tests from generating false positives due to short, badly
        # timed bursts of activity
        total_time = max(total_time, 60.0)
        # determine the usage rate
        bytes_per_sec = used_space / total_time
        mb_per_sec = bytes_per_sec / 1024**2
        mb_per_hour = mb_per_sec * 60 * 60
        if mb_per_hour > self.max_mb_per_hour:
            msg = ("disk space on %s was consumed at a rate of %.2f MB/hour")
            msg %= (self.device, mb_per_hour)
            self.func(msg)

    @classmethod
    def watch(cls, *monitor_args, **monitor_dargs):
        """Generic decorator to wrap a function call with the
        standard create-monitor -> start -> call -> stop idiom."""
        def decorator(func):
            def watched_func(*args, **dargs):
                monitor = cls(*monitor_args, **monitor_dargs)
                monitor.start()
                try:
                    func(*args, **dargs)
                finally:
                    monitor.stop()
            return watched_func
        return decorator
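
# Annotation (editor's note, not part of the original source): this is the
# decorator applied in _run_test_base() above, e.g.
#
#     @disk_usage_monitor.watch(log_warning, "/", self._max_disk_usage_rate)
#     def group_func():
#         ...
#
# watch() builds the monitor, calls start() before running the wrapped
# function, and always calls stop() afterwards; stop() invokes the logging
# callback when the measured consumption rate exceeds the MB/hour limit.
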
def runjob(control, drop_caches, options):
    """
    Run a job using the given control file.

    This is the main interface to this module.

    @see base_job.__init__ for parameter info.
    """
    control = os.path.abspath(control)
    state = control + '.state'
    # Ensure state file is cleaned up before the job starts to run if autotest
    # is not running with the --continue flag
    if not options.cont and os.path.isfile(state):
        logging.debug('Cleaning up previously found state file')
        os.remove(state)
    # instantiate the job object ready for the control file.
    myjob = None
    try:
        # Check that the control file is valid
        if not os.path.exists(control):
            raise error.JobError(control + ": control file not found")
        # When continuing, the job is complete when there is no
        # state file, ensure we don't try and continue.
        if options.cont and not os.path.exists(state):
            raise error.JobComplete("all done")
        myjob = job(control=control, drop_caches=drop_caches, options=options)
        # Load in the user's control file, may do any one of:
        #  1) execute in toto
        #  2) define steps, and select the first via next_step()
        myjob.step_engine()
    except error.JobContinue:
        sys.exit(5)
    except error.JobComplete:
        sys.exit(1)
    except error.JobError, instance:
        logging.error("JOB ERROR: " + str(instance))
        if myjob:
            command = None
            if len(instance.args) > 1:
                command = instance.args[1]
                myjob.record('ABORT', None, command, str(instance))
            myjob.record('END ABORT', None, None, str(instance))
            assert myjob._record_indent == 0
            myjob.complete(1)
        else:
            sys.exit(1)
    except Exception, e:
        # NOTE: job._run_step_fn and job.step_engine will turn things into
        # a JobError for us.  If we get here, it's likely an autotest bug.
        msg = str(e) + '\n' + traceback.format_exc()
        logging.critical("JOB ERROR (autotest bug?): " + msg)
        if myjob:
            myjob.record('END ABORT', None, None, msg)
            assert myjob._record_indent == 0
            myjob.complete(1)
        else:
            sys.exit(1)
    # If we get here, then we assume the job is complete and good.
    myjob.record('END GOOD', None, None)
    assert myjob._record_indent == 0
    myjob.complete(0)
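
# Annotation (editor's note, not part of the original source): runjob()
# encodes the job lifecycle in its exit status. error.JobContinue, raised by
# job.reboot() via quit(), exits with 5, signalling the launcher that the job
# intends to resume after reboot with --continue; error.JobComplete and the
# error paths exit with 1, and a clean run exits 0 through myjob.complete(0).
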
class job(base_client_job):
    def __init__(self, *args, **kwargs):
        base_client_job.__init__(self, *args, **kwargs)

    def run_test(self, url, *args, **dargs):
        log_pauser = cros_logging.LogRotationPauser()
        passed = False
        try:
            log_pauser.begin()
            passed = base_client_job.run_test(self, url, *args, **dargs)
            if not passed:
                # Save the VM state immediately after the test failure.
                # This is a NOOP if the test isn't running in a VM or
                # if the VM is not properly configured to save state.
                _group, testname = self.pkgmgr.get_package_name(url, 'test')
                now = datetime.now().strftime('%I:%M:%S.%f')
                checkpoint_name = '%s-%s' % (testname, now)
                utils.save_vm_state(checkpoint_name)
        finally:
            log_pauser.end()
        return passed

    def reboot(self):
        self.reboot_setup()
        self.harness.run_reboot()
        # sync first, so that a sync during shutdown doesn't time out
        utils.system('sync; sync', ignore_status=True)
        utils.system('reboot </dev/null >/dev/null 2>&1 &')
        self.quit()

    def require_gcc(self):...
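
For orientation: everything above is driven by a control file, which step_engine() executes with 'job' and 'args' already injected into its namespace, queuing a top-level step_init as the first step on a fresh run. Below is a minimal, hypothetical control-file sketch (the sleeptest test name and the step layout are illustrative, not taken from this page) showing how the step engine, next_step(), run_test(), and reboot() fit together. Steps are pickled by function name into $CONTROL.state, so only globally defined functions can be queued.

# Hypothetical control file sketch (Python 2, matching the job.py above).
def step_init():
    # Queued by name; the persisted state file is what lets the job
    # resume its remaining steps after a reboot.
    job.next_step(step_one)
    job.next_step(step_two)

def step_one():
    # run_test() returns True on success, False on any TestBaseException.
    job.run_test('sleeptest')
    job.reboot()   # raises JobContinue via quit(); runjob() exits with 5

def step_two():
    # Executed on the continuation run (autotest --continue / -c).
    job.run_test('sleeptest', tag='after_reboot')

On the first invocation the file is exec'd top to bottom, then the queued steps run one at a time; after the reboot the client is re-invoked with the continue flag and the remaining steps are popped off the persisted queue.
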
autotest.py
Source: autotest.py
...
        if section > 0:
            args.append('-c')
        if self.tag:
            args.append('-t %s' % self.tag)
        if self.host.job.use_external_logging():
            args.append('-l')
        if self.host.hostname:
            args.append('--hostname=%s' % self.host.hostname)
        args.append('--user=%s' % self.host.job.user)
        args.append(self.remote_control_file)
        return args

    def get_background_cmd(self, section):
        cmd = ['nohup', os.path.join(self.autodir, 'bin/autotest_client')]
        cmd += self.get_base_cmd_args(section)
        cmd += ['>/dev/null', '2>/dev/null', '&']
        return ' '.join(cmd)

    def get_daemon_cmd(self, section, monitor_dir):
        cmd = ['nohup', os.path.join(self.autodir, 'bin/autotestd'),
               monitor_dir, '-H autoserv']...
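
This fragment is the server-side piece that assembles the client invocation, and its flags mirror the options consumed by job.py above: -c for a continuation section, -t for the job tag, -l for external logging, plus --hostname and --user. A hypothetical rendering, assuming for illustration an autodir of /usr/local/autotest, tag 'default', hostname 'client1', and user 'autotest' (none of these values come from this page):

# Hypothetical result of get_background_cmd(section=1) under the assumptions
# above; <remote_control_file> stands in for whatever control file was
# pushed to the host. The joined command would look something like:
#
#   nohup /usr/local/autotest/bin/autotest_client -c -t default -l \
#       --hostname=client1 --user=autotest <remote_control_file> \
#       >/dev/null 2>/dev/null &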
