Best Python code snippet using autotest_python
job.py
Source: job.py
...
        """
        self._init_drop_caches(drop_caches)
        self._init_packages()

        self.sysinfo = sysinfo.sysinfo(self.resultdir)
        self._load_sysinfo_state()

        if not options.cont:
            download = os.path.join(self.testdir, 'download')
            if not os.path.exists(download):
                os.mkdir(download)
            shutil.copyfile(self.control,
                            os.path.join(self.resultdir, 'control'))

        self.control = control

        self.logging = logging_manager.get_logging_manager(
                manage_stdout_and_stderr=True, redirect_fds=True)
        self.logging.start_logging()

        self.profilers = profilers.profilers(self)

        self.machines = [options.hostname]
        self.machine_dict_list = [{'hostname': options.hostname}]
        # Client side tests should always run the same whether or not they are
        # running in the lab.
        self.in_lab = False
        self.hosts = set([local_host.LocalHost(hostname=options.hostname)])

        self.args = []
        if options.args:
            self.args = self._parse_args(options.args)

        if options.user:
            self.user = options.user
        else:
            self.user = getpass.getuser()

        self.sysinfo.log_per_reboot_data()

        if not options.cont:
            self.record('START', None, None)

        self.harness.run_start()

        if options.log:
            self.enable_external_logging()

        self.num_tests_run = None
        self.num_tests_failed = None

        self.warning_loggers = None
        self.warning_manager = None

    def _init_drop_caches(self, drop_caches):
        """
        Perform the drop caches initialization.
        """
        self.drop_caches_between_iterations = (
                GLOBAL_CONFIG.get_config_value('CLIENT',
                                               'drop_caches_between_iterations',
                                               type=bool, default=True))
        self.drop_caches = drop_caches
        if self.drop_caches:
            utils.drop_caches()

    def _init_packages(self):
        """
        Perform the packages support initialization.
        """
        self.pkgmgr = packages.PackageManager(
            self.autodir, run_function_dargs={'timeout': 3600})

    def _cleanup_results_dir(self):
        """Delete everything in resultsdir"""
        assert os.path.exists(self.resultdir)
        list_files = glob.glob('%s/*' % self.resultdir)
        for f in list_files:
            if os.path.isdir(f):
                shutil.rmtree(f)
            elif os.path.isfile(f):
                os.remove(f)

    def _cleanup_debugdir_files(self):
        """
        Delete any leftover debugdir files
        """
        list_files = glob.glob("/tmp/autotest_results_dir.*")
        for f in list_files:
            os.remove(f)

    def disable_warnings(self, warning_type):
        self.record("INFO", None, None,
                    "disabling %s warnings" % warning_type,
                    {"warnings.disable": warning_type})
        time.sleep(self._WARNING_DISABLE_DELAY)

    def enable_warnings(self, warning_type):
        time.sleep(self._WARNING_DISABLE_DELAY)
        self.record("INFO", None, None,
                    "enabling %s warnings" % warning_type,
                    {"warnings.enable": warning_type})
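    # Editor's usage sketch (not part of job.py): a control file can bracket
    # an operation that is expected to trip the log monitors with these two
    # calls.  The warning type string 'NETWORK' and the helper function are
    # illustrative assumptions, not names taken from this file.
    #
    #     job.disable_warnings('NETWORK')
    #     flaky_network_operation()
    #     job.enable_warnings('NETWORK')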
    def monitor_disk_usage(self, max_rate):
        """\
        Signal that the job should monitor disk space usage on /
        and generate a warning if a test uses up disk space at a
        rate exceeding 'max_rate'.

        Parameters:
             max_rate - the maximum allowed rate of disk consumption
                        during a test, in MB/hour, or 0 to indicate
                        no limit.
        """
        self._max_disk_usage_rate = max_rate

    def control_get(self):
        return self.control

    def control_set(self, control):
        self.control = os.path.abspath(control)

    def harness_select(self, which, harness_args):
        self.harness = harness.select(which, self, harness_args)

    def setup_dirs(self, results_dir, tmp_dir):
        if not tmp_dir:
            tmp_dir = os.path.join(self.tmpdir, 'build')
        if not os.path.exists(tmp_dir):
            os.mkdir(tmp_dir)
        if not os.path.isdir(tmp_dir):
            e_msg = "Temp dir (%s) is not a dir - args backwards?" % self.tmpdir
            raise ValueError(e_msg)

        # We label the first build "build" and then subsequent ones
        # as "build.2", "build.3", etc. Whilst this is a little bit
        # inconsistent, 99.9% of jobs will only have one build
        # (that's not done as kernbench, sparse, or buildtest),
        # so it works out much cleaner. One of life's compromises.
        if not results_dir:
            results_dir = os.path.join(self.resultdir, 'build')
            i = 2
            while os.path.exists(results_dir):
                results_dir = os.path.join(self.resultdir, 'build.%d' % i)
                i += 1
        if not os.path.exists(results_dir):
            os.mkdir(results_dir)

        return (results_dir, tmp_dir)

    def barrier(self, *args, **kwds):
        """Create a barrier object"""
        return barrier.barrier(*args, **kwds)

    def install_pkg(self, name, pkg_type, install_dir):
        '''
        This method is a simple wrapper around the actual package
        installation method in the Packager class. This is used
        internally by the profilers, deps and tests code.

        name : name of the package (ex: sleeptest, dbench etc.)
        pkg_type : Type of the package (ex: test, dep etc.)
        install_dir : The directory in which the source is actually
                      untarred into. (ex: client/profilers/<name> for profilers)
        '''
        if self.pkgmgr.repositories:
            self.pkgmgr.install_pkg(name, pkg_type, self.pkgdir, install_dir)
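    # Usage sketch (not part of job.py): per the docstring above the rate is
    # in MB/hour, so a control file might cap disk consumption like this; the
    # specific value of 500 is an illustrative assumption.
    #
    #     job.monitor_disk_usage(500)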
    def add_repository(self, repo_urls):
        '''
        Adds the repository locations to the job so that packages
        can be fetched from them when needed. The repository list
        needs to be a string list
        Ex: job.add_repository(['http://blah1','http://blah2'])
        '''
        for repo_url in repo_urls:
            self.pkgmgr.add_repository(repo_url)

        # Fetch the packages' checksum file that contains the checksums
        # of all the packages if it is not already fetched. The checksum
        # is always fetched whenever a job is first started. This
        # is not done in the job's constructor as we don't have the list of
        # the repositories there (and obviously don't care about this file
        # if we are not using the repos)
        try:
            checksum_file_path = os.path.join(self.pkgmgr.pkgmgr_dir,
                                              packages.CHECKSUM_FILE)
            self.pkgmgr.fetch_pkg(packages.CHECKSUM_FILE,
                                  checksum_file_path, use_checksum=False)
        except error.PackageFetchError:
            # packaging system might not be working in this case
            # Silently fall back to the normal case
            pass

    def require_gcc(self):
        """
        Test whether gcc is installed on the machine.
        """
        # check if gcc is installed on the system.
        try:
            utils.system('which gcc')
        except error.CmdError:
            raise NotAvailableError('gcc is required by this job and is '
                                    'not available on the system')

    def setup_dep(self, deps):
        """Set up the dependencies for this test.
        deps is a list of libraries required for this test.
        """
        # Fetch the deps from the repositories and set them up.
        for dep in deps:
            dep_dir = os.path.join(self.autodir, 'deps', dep)
            # Search for the dependency in the repositories if specified,
            # else check locally.
            try:
                self.install_pkg(dep, 'dep', dep_dir)
            except error.PackageInstallError:
                # see if the dep is there locally
                pass

            # dep_dir might not exist if it is not fetched from the repos
            if not os.path.exists(dep_dir):
                raise error.TestError("Dependency %s does not exist" % dep)

            os.chdir(dep_dir)
            if execfile('%s.py' % dep, {}) is None:
                logging.info('Dependency %s successfully built', dep)

    def _runtest(self, url, tag, timeout, args, dargs):
        try:
            l = lambda: test.runtest(self, url, tag, args, dargs)
            pid = parallel.fork_start(self.resultdir, l)

            if timeout:
                logging.debug('Waiting for pid %d for %d seconds', pid, timeout)
                parallel.fork_waitfor_timed(self.resultdir, pid, timeout)
            else:
                parallel.fork_waitfor(self.resultdir, pid)
        except error.TestBaseException:
            # These are already classified with an error type (exit_status)
            raise
        except error.JobError:
            raise  # Caught further up and turned into an ABORT.
        except Exception, e:
            # Converts all other exceptions thrown by the test regardless
            # of phase into a TestError(TestBaseException) subclass that
            # reports them with their full stack trace.
            raise error.UnhandledTestError(e)
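    # Usage sketch (not part of job.py): a control file can point the job at
    # a package repository and then pull in a dependency before running
    # tests.  The URL is a placeholder and 'libaio' is an illustrative dep
    # name, not something this file guarantees exists.
    #
    #     job.add_repository(['http://myserver/packages'])
    #     job.setup_dep(['libaio'])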
    def _run_test_base(self, url, *args, **dargs):
        """
        Prepares arguments and run functions to run_test and run_test_detail.

        @param url A url that identifies the test to run.
        @param tag An optional keyword argument that will be added to the
            test and subdir name.
        @param subdir_tag An optional keyword argument that will be added
            to the subdir name.

        @returns:
                subdir: Test subdirectory
                testname: Test name
                group_func: Actual test run function
                timeout: Test timeout
        """
        _group, testname = self.pkgmgr.get_package_name(url, 'test')
        testname, subdir, tag = self._build_tagged_test_name(testname, dargs)
        self._make_test_outputdir(subdir)

        timeout = dargs.pop('timeout', None)
        if timeout:
            logging.debug('Test has timeout: %d sec.', timeout)

        def log_warning(reason):
            self.record("WARN", subdir, testname, reason)

        @disk_usage_monitor.watch(log_warning, "/", self._max_disk_usage_rate)
        def group_func():
            try:
                self._runtest(url, tag, timeout, args, dargs)
            except error.TestBaseException, detail:
                # The error is already classified, record it properly.
                self.record(detail.exit_status, subdir, testname, str(detail))
                raise
            else:
                self.record('GOOD', subdir, testname, 'completed successfully')

        return (subdir, testname, group_func, timeout)

    @_run_test_complete_on_exit
    def run_test(self, url, *args, **dargs):
        """
        Summon a test object and run it.

        @param url A url that identifies the test to run.
        @param tag An optional keyword argument that will be added to the
            test and subdir name.
        @param subdir_tag An optional keyword argument that will be added
            to the subdir name.

        @returns True if the test passes, False otherwise.
        """
        (subdir, testname, group_func, timeout) = self._run_test_base(url,
                                                                      *args,
                                                                      **dargs)
        try:
            self._rungroup(subdir, testname, group_func, timeout)
            return True
        except error.TestBaseException:
            return False
        # Any other exception here will be given to the caller
        #
        # NOTE: The only exception possible from the control file here
        # is error.JobError as _runtest() turns all others into an
        # UnhandledTestError that is caught above.

    @_run_test_complete_on_exit
    def run_test_detail(self, url, *args, **dargs):
        """
        Summon a test object and run it, returning test status.

        @param url A url that identifies the test to run.
        @param tag An optional keyword argument that will be added to the
            test and subdir name.
        @param subdir_tag An optional keyword argument that will be added
            to the subdir name.

        @returns Test status
        @see: client/common_lib/error.py, exit_status
        """
        (subdir, testname, group_func, timeout) = self._run_test_base(url,
                                                                      *args,
                                                                      **dargs)
        try:
            self._rungroup(subdir, testname, group_func, timeout)
            return 'GOOD'
        except error.TestBaseException, detail:
            return detail.exit_status
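    # Usage sketch (not part of job.py): 'sleeptest' ships with autotest and
    # makes a convenient smoke test; the tag and keyword arguments here are
    # illustrative.  run_test() collapses the result to a boolean, while
    # run_test_detail() surfaces the recorded exit_status string.
    #
    #     passed = job.run_test('sleeptest', seconds=10, tag='quick')
    #     status = job.run_test_detail('sleeptest', seconds=10)  # e.g. 'GOOD'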
    def _rungroup(self, subdir, testname, function, timeout, *args, **dargs):
        """\
        subdir:
                name of the group
        testname:
                name of the test to run, or support step
        function:
                subroutine to run
        *args:
                arguments for the function

        Returns the result of the passed in function
        """
        try:
            optional_fields = None
            if timeout:
                optional_fields = {}
                optional_fields['timeout'] = timeout

            self.record('START', subdir, testname,
                        optional_fields=optional_fields)

            self._state.set('client', 'unexpected_reboot', (subdir, testname))
            try:
                result = function(*args, **dargs)
                self.record('END GOOD', subdir, testname)
                return result
            except error.TestBaseException, e:
                self.record('END %s' % e.exit_status, subdir, testname)
                raise
            except error.JobError, e:
                self.record('END ABORT', subdir, testname)
                raise
            except Exception, e:
                # This should only ever happen due to a bug in the given
                # function's code.  The common case of being called by
                # run_test() will never reach this.  If a control file called
                # run_group() itself, bugs in its function will be caught
                # here.
                err_msg = str(e) + '\n' + traceback.format_exc()
                self.record('END ERROR', subdir, testname, err_msg)
                raise
        finally:
            self._state.discard('client', 'unexpected_reboot')

    def run_group(self, function, tag=None, **dargs):
        """
        Run a function nested within a group level.

        function:
                Callable to run.
        tag:
                An optional tag name for the group.  If None (default)
                function.__name__ will be used.
        **dargs:
                Named arguments for the function.
        """
        if tag:
            name = tag
        else:
            name = function.__name__

        try:
            return self._rungroup(subdir=None, testname=name,
                                  function=function, timeout=None, **dargs)
        except (SystemExit, error.TestBaseException):
            raise
        # If there was a different exception, turn it into a TestError.
        # It will be caught by step_engine or _run_step_fn.
        except Exception, e:
            raise error.UnhandledTestError(e)
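    # Usage sketch (not part of job.py): grouping related tests under a
    # single START/END record in the status log; the test names and tag are
    # illustrative assumptions.
    #
    #     def io_tests():
    #         job.run_test('dbench')
    #         job.run_test('bonnie')
    #
    #     job.run_group(io_tests, tag='io')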
    def cpu_count(self):
        return utils.count_cpus()  # use total system count

    def start_reboot(self):
        self.record('START', None, 'reboot')
        self.record('GOOD', None, 'reboot.start')

    def _record_reboot_failure(self, subdir, operation, status,
                               running_id=None):
        self.record("ABORT", subdir, operation, status)
        if not running_id:
            running_id = utils.running_os_ident()
        kernel = {"kernel": running_id.split("::")[0]}
        self.record("END ABORT", subdir, 'reboot', optional_fields=kernel)

    def _check_post_reboot(self, subdir, running_id=None):
        """
        Perform post boot checks, such as whether the system configuration
        has changed across reboots (specifically, CPUs and partitions).

        @param subdir: The subdir to use in the job.record call.
        @param running_id: An optional running_id to include in the reboot
            failure log message

        @raise JobError: Raised if the current configuration does not match
            the pre-reboot configuration.
        """
        # check to see if any partitions have changed
        partition_list = partition_lib.get_partition_list(self,
                                                          exclude_swap=False)
        mount_info = partition_lib.get_mount_info(partition_list)
        old_mount_info = self._state.get('client', 'mount_info')
        if mount_info != old_mount_info:
            new_entries = mount_info - old_mount_info
            old_entries = old_mount_info - mount_info
            description = ("mounted partitions are different after reboot "
                           "(old entries: %s, new entries: %s)" %
                           (old_entries, new_entries))
            self._record_reboot_failure(subdir, "reboot.verify_config",
                                        description, running_id=running_id)
            raise error.JobError("Reboot failed: %s" % description)

        # check to see if any CPUs have changed
        cpu_count = utils.count_cpus()
        old_count = self._state.get('client', 'cpu_count')
        if cpu_count != old_count:
            description = ('Number of CPUs changed after reboot '
                           '(old count: %d, new count: %d)' %
                           (old_count, cpu_count))
            self._record_reboot_failure(subdir, 'reboot.verify_config',
                                        description, running_id=running_id)
            raise error.JobError('Reboot failed: %s' % description)
    def partition(self, device, loop_size=0, mountpoint=None):
        """
        Work with a machine partition

            @param device: e.g. /dev/sda2, /dev/sdb1 etc...
            @param mountpoint: Specify a directory to mount to. If not
                               specified, the autotest tmp directory will
                               be used.
            @param loop_size: Size of loopback device (in MB). Defaults to 0.

            @return: A L{client.bin.partition.partition} object
        """
        if not mountpoint:
            mountpoint = self.tmpdir
        return partition_lib.partition(self, device, loop_size, mountpoint)

    @utils.deprecated
    def filesystem(self, device, mountpoint=None, loop_size=0):
        """ Same as partition

        @deprecated: Use partition method instead
        """
        return self.partition(device, loop_size, mountpoint)

    def enable_external_logging(self):
        pass

    def disable_external_logging(self):
        pass

    def reboot_setup(self):
        # save the partition list and mount points, as well as the cpu count
        partition_list = partition_lib.get_partition_list(self,
                                                          exclude_swap=False)
        mount_info = partition_lib.get_mount_info(partition_list)
        self._state.set('client', 'mount_info', mount_info)
        self._state.set('client', 'cpu_count', utils.count_cpus())

    def reboot(self):
        self.reboot_setup()
        self.harness.run_reboot()

        # HACK: using this as a module sometimes hangs shutdown, so if it's
        # installed unload it first
        utils.system("modprobe -r netconsole", ignore_status=True)

        # sync first, so that a sync during shutdown doesn't time out
        utils.system("sync; sync", ignore_status=True)

        utils.system("(sleep 5; reboot) </dev/null >/dev/null 2>&1 &")
        self.quit()

    def noop(self, text):
        logging.info("job: noop: " + text)
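    # Usage sketch (not part of job.py): the partition object returned by
    # job.partition() is typically formatted and mounted before use.  The
    # device path is a placeholder, and mkfs()/mount() are methods of the
    # partition class in client/bin/partition.py (stated here as an
    # assumption about that module, not something defined in this file).
    #
    #     part = job.partition('/dev/sdb1')
    #     part.mkfs('ext3')
    #     part.mount()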
    @_run_test_complete_on_exit
    def parallel(self, *tasklist):
        """Run tasks in parallel"""
        pids = []
        old_log_filename = self._logger.global_filename
        for i, task in enumerate(tasklist):
            assert isinstance(task, (tuple, list))
            self._logger.global_filename = old_log_filename + (".%d" % i)
            def task_func():
                # stub out _record_indent with a process-local one
                base_record_indent = self._record_indent
                proc_local = self._job_state.property_factory(
                    '_state', '_record_indent.%d' % os.getpid(),
                    base_record_indent, namespace='client')
                self.__class__._record_indent = proc_local
                task[0](*task[1:])
            pids.append(parallel.fork_start(self.resultdir, task_func))

        old_log_path = os.path.join(self.resultdir, old_log_filename)
        old_log = open(old_log_path, "a")
        exceptions = []
        for i, pid in enumerate(pids):
            # wait for the task to finish
            try:
                parallel.fork_waitfor(self.resultdir, pid)
            except Exception, e:
                exceptions.append(e)
            # copy the logs from the subtask into the main log
            new_log_path = old_log_path + (".%d" % i)
            if os.path.exists(new_log_path):
                new_log = open(new_log_path)
                old_log.write(new_log.read())
                new_log.close()
                old_log.flush()
                os.remove(new_log_path)
        old_log.close()

        self._logger.global_filename = old_log_filename

        # handle any exceptions raised by the parallel tasks
        if exceptions:
            msg = "%d task(s) failed in job.parallel" % len(exceptions)
            raise error.JobError(msg)

    def quit(self):
        # XXX: should have a better name.
        self.harness.run_pause()
        raise error.JobContinue("more to come")

    def complete(self, status):
        """Write pending reports, clean up, and exit"""
        # write out a job HTML report
        try:
            html_report.create_report(self.resultdir)
        except Exception, e:
            logging.error("Error writing job HTML report: %s", e)

        # We are about to exit 'complete' so clean up the control file.
        dest = os.path.join(self.resultdir, os.path.basename(self._state_file))
        shutil.move(self._state_file, dest)

        self.harness.run_complete()
        self.disable_external_logging()
        sys.exit(status)

    def _load_state(self):
        # grab any initial state and set up $CONTROL.state as the backing file
        init_state_file = self.control + '.init.state'
        self._state_file = self.control + '.state'
        if os.path.exists(init_state_file):
            shutil.move(init_state_file, self._state_file)
        self._state.set_backing_file(self._state_file)

        # initialize the state engine, if necessary
        has_steps = self._state.has('client', 'steps')
        if not self._is_continuation and has_steps:
            raise RuntimeError('Loaded state can only contain client.steps if '
                               'this is a continuation')

        if not has_steps:
            logging.debug('Initializing the state engine')
            self._state.set('client', 'steps', [])

    def handle_persistent_option(self, options, option_name):
        """
        Select option from command line or persistent state.
        Store selected option to allow standalone client to continue
        after reboot with previously selected options.
        Priority:
        1. explicitly specified via command line
        2. stored in state file (if continuing job '-c')
        3. default == None
        """
        option = None
        cmd_line_option = getattr(options, option_name)
        if cmd_line_option:
            option = cmd_line_option
            self._state.set('client', option_name, option)
        else:
            stored_option = self._state.get('client', option_name, None)
            if stored_option:
                option = stored_option
        logging.debug('Persistent option %s now set to %s', option_name,
                      option)
        return option
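    # Usage sketch (not part of job.py): each task handed to job.parallel()
    # is a list of a callable followed by its arguments, and each task runs
    # in its own forked subprocess with its own log file.  The task functions
    # here are illustrative assumptions.
    #
    #     job.parallel([run_client_workload],
    #                  [run_background_load, '30s'])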
    def __create_step_tuple(self, fn, args, dargs):
        # Legacy code passes in an array where the first arg is
        # the function or its name.
        if isinstance(fn, list):
            assert(len(args) == 0)
            assert(len(dargs) == 0)
            args = fn[1:]
            fn = fn[0]
        # Pickling actual functions is hairy, thus we have to call
        # them by name.  Unfortunately, this means only functions
        # defined globally can be used as a next step.
        if callable(fn):
            fn = fn.__name__
        if not isinstance(fn, types.StringTypes):
            raise StepError("Next steps must be functions or "
                            "strings containing the function name")
        ancestry = copy.copy(self._current_step_ancestry)
        return (ancestry, fn, args, dargs)

    def next_step_append(self, fn, *args, **dargs):
        """Define the next step and place it at the end"""
        steps = self._state.get('client', 'steps')
        steps.append(self.__create_step_tuple(fn, args, dargs))
        self._state.set('client', 'steps', steps)

    def next_step(self, fn, *args, **dargs):
        """Create a new step and place it after any steps added
        while running the current step but before any steps added in
        previous steps"""
        steps = self._state.get('client', 'steps')
        steps.insert(self._next_step_index,
                     self.__create_step_tuple(fn, args, dargs))
        self._next_step_index += 1
        self._state.set('client', 'steps', steps)

    def next_step_prepend(self, fn, *args, **dargs):
        """Insert a new step, executing first"""
        steps = self._state.get('client', 'steps')
        steps.insert(0, self.__create_step_tuple(fn, args, dargs))
        self._next_step_index += 1
        self._state.set('client', 'steps', steps)

    def _run_step_fn(self, local_vars, fn, args, dargs):
        """Run a (step) function within the given context"""
        local_vars['__args'] = args
        local_vars['__dargs'] = dargs
        try:
            exec('__ret = %s(*__args, **__dargs)' % fn, local_vars, local_vars)
            return local_vars['__ret']
        except SystemExit:
            raise  # Send error.JobContinue and JobComplete on up to runjob.
        except error.TestNAError, detail:
            self.record(detail.exit_status, None, fn, str(detail))
        except Exception, detail:
            raise error.UnhandledJobError(detail)
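    # Usage sketch (not part of job.py): a step-engine control file defines a
    # global step_init() that queues further global functions.  Steps are
    # persisted by name (see __create_step_tuple above), which is what lets
    # the job resume at step_test after the reboot; the function names are
    # illustrative.
    #
    #     def step_init():
    #         job.next_step(step_test)
    #         job.reboot()
    #
    #     def step_test():
    #         job.run_test('sleeptest')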
    def _create_frame(self, global_vars, ancestry, fn_name):
        """Set up the environment like it would have been when this
        function was first defined.

        Child step engine 'implementations' must have 'return locals()'
        at the end of their steps.  Because of this, we can call the
        parent function and get back all child functions (i.e. those
        defined within it).

        Unfortunately, the call stack of the function calling
        job.next_step might have been deeper than the function it
        added.  In order to make sure that the environment is what it
        should be, we need to then pop off the frames we built until
        we find the frame where the function was first defined."""
        # The copies ensure that the parent frames are not modified
        # while building child frames.  This matters if we then
        # pop some frames in the next part of this function.
        current_frame = copy.copy(global_vars)
        frames = [current_frame]
        for steps_fn_name in ancestry:
            ret = self._run_step_fn(current_frame, steps_fn_name, [], {})
            current_frame = copy.copy(ret)
            frames.append(current_frame)

        # Walk up the stack frames until we find the place fn_name was
        # defined.
        while len(frames) > 2:
            if fn_name not in frames[-2]:
                break
            if frames[-2][fn_name] != frames[-1][fn_name]:
                break
            frames.pop()
            ancestry.pop()

        return (frames[-1], ancestry)

    def _add_step_init(self, local_vars, current_function):
        """If the function returned a dictionary that includes a
        function named 'step_init', prepend it to our list of steps.
        This will only get run the first time a function with a nested
        use of the step engine is run."""
        if (isinstance(local_vars, dict) and
                'step_init' in local_vars and
                callable(local_vars['step_init'])):
            # The init step is a child of the function
            # we were just running.
            self._current_step_ancestry.append(current_function)
            self.next_step_prepend('step_init')
    def step_engine(self):
        """The multi-run engine used when the control file defines step_init.

        Does the next step.
        """
        # Set up the environment and then interpret the control file.
        # Some control files will have code outside of functions,
        # which means we need to have our state engine initialized
        # before reading in the file.
        global_control_vars = {'job': self,
                               'args': self.args}
        exec(JOB_PREAMBLE, global_control_vars, global_control_vars)
        try:
            execfile(self.control, global_control_vars, global_control_vars)
        except error.TestNAError, detail:
            self.record(detail.exit_status, None, self.control, str(detail))
        except SystemExit:
            raise  # Send error.JobContinue and JobComplete on up to runjob.
        except Exception, detail:
            # Syntax errors or other general Python exceptions coming out of
            # the top level of the control file itself go through here.
            raise error.UnhandledJobError(detail)

        # If we loaded in a mid-job state file, then we presumably
        # know what steps we have yet to run.
        if not self._is_continuation:
            if 'step_init' in global_control_vars:
                self.next_step(global_control_vars['step_init'])
        else:
            # if last job failed due to unexpected reboot, record it as fail
            # so harness gets called
            last_job = self._state.get('client', 'unexpected_reboot', None)
            if last_job:
                subdir, testname = last_job
                self.record('FAIL', subdir, testname, 'unexpected reboot')
                self.record('END FAIL', subdir, testname)

        # Iterate through the steps.  If we reboot, we'll simply
        # continue iterating on the next step.
        while len(self._state.get('client', 'steps')) > 0:
            steps = self._state.get('client', 'steps')
            (ancestry, fn_name, args, dargs) = steps.pop(0)
            self._state.set('client', 'steps', steps)

            self._next_step_index = 0
            ret = self._create_frame(global_control_vars, ancestry, fn_name)
            local_vars, self._current_step_ancestry = ret
            local_vars = self._run_step_fn(local_vars, fn_name, args, dargs)
            self._add_step_init(local_vars, fn_name)

    def add_sysinfo_command(self, command, logfile=None, on_every_test=False):
        self._add_sysinfo_loggable(sysinfo.command(command, logf=logfile),
                                   on_every_test)

    def add_sysinfo_logfile(self, file, on_every_test=False):
        self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test)

    def _add_sysinfo_loggable(self, loggable, on_every_test):
        if on_every_test:
            self.sysinfo.test_loggables.add(loggable)
        else:
            self.sysinfo.boot_loggables.add(loggable)
        self._save_sysinfo_state()

    def _load_sysinfo_state(self):
        state = self._state.get('client', 'sysinfo', None)
        if state:
            self.sysinfo.deserialize(state)

    def _save_sysinfo_state(self):
        state = self.sysinfo.serialize()
        self._state.set('client', 'sysinfo', state)


class disk_usage_monitor:

    def __init__(self, logging_func, device, max_mb_per_hour):
        self.func = logging_func
        self.device = device
        self.max_mb_per_hour = max_mb_per_hour

    def start(self):
        self.initial_space = utils.freespace(self.device)
        self.start_time = time.time()
...
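A short usage sketch for the sysinfo hooks above (not from the autotest source; the command and log path are illustrative): a control file can register extra data to collect, either once per boot or around every test.

    job.add_sysinfo_command('dmesg -c', logfile='dmesg', on_every_test=True)
    job.add_sysinfo_logfile('/var/log/messages', on_every_test=False)

Both calls funnel into _add_sysinfo_loggable(), which also persists the sysinfo configuration so it survives the job's restarts across reboots.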
