Best Python code snippet using autotest_python
server_job.py
Source: server_job.py
...
        test_start_time = int(time.time())
        if self.resultdir:
            os.chdir(self.resultdir)
            # touch status.log so that the parser knows a job is running here
            open(self.get_status_log_path(), 'a').close()
            self.enable_external_logging()
        collect_crashinfo = True
        temp_control_file_dir = None
        try:
            if install_before and machines:
                self._execute_code(INSTALL_CONTROL_FILE, namespace)
            if not only_collect_crashinfo:
                # determine the dir to write the control files to
                cfd_specified = (control_file_dir
                                 and control_file_dir is not self.USE_TEMP_DIR)
                if cfd_specified:
                    temp_control_file_dir = None
                else:
                    temp_control_file_dir = tempfile.mkdtemp(
                        suffix='temp_control_file_dir')
                    control_file_dir = temp_control_file_dir
                server_control_file = os.path.join(control_file_dir,
                                                   SERVER_CONTROL_FILENAME)
                client_control_file = os.path.join(control_file_dir,
                                                   CLIENT_CONTROL_FILENAME)
                if self.client:
                    namespace['control'] = control
                    utils.open_write_close(client_control_file, control)
                    shutil.copyfile(CLIENT_WRAPPER_CONTROL_FILE,
                                    server_control_file)
                else:
                    utils.open_write_close(server_control_file, control)
                logging.info("Processing control file")
                self._execute_code(server_control_file, namespace)
                logging.info("Finished processing control file")
                # no error occurred, so we don't need to collect crashinfo
                collect_crashinfo = False
        finally:
            if temp_control_file_dir:
                # Clean up temp directory used for copies of the control files
                try:
                    shutil.rmtree(temp_control_file_dir)
                except Exception, e:
                    logging.warn('Could not remove temp directory %s: %s',
                                 temp_control_file_dir, e)
            if machines and (collect_crashdumps or collect_crashinfo):
                namespace['test_start_time'] = test_start_time
                if collect_crashinfo:
                    # includes crashdumps
                    self._execute_code(CRASHINFO_CONTROL_FILE, namespace)
                else:
                    self._execute_code(CRASHDUMPS_CONTROL_FILE, namespace)
            if self.uncollected_log_file:
                os.remove(self.uncollected_log_file)
            self.disable_external_logging()
            if cleanup and machines:
                self._execute_code(CLEANUP_CONTROL_FILE, namespace)
            if install_after and machines:
                self._execute_code(INSTALL_CONTROL_FILE, namespace)

    def set_test_tag(self, tag=''):
        """Set tag to be added to test name of all following run_test steps."""
        self.test_tag = tag

    def run_test(self, url, *args, **dargs):
        """
        Summon a test object and run it.
        tag
                tag to add to testname
        url
                url of the test to run
        """
        (group, testname) = self.pkgmgr.get_package_name(url, 'test')
        tag = dargs.pop('tag', None)
        if tag is None:
            tag = self.test_tag
        if tag:
            testname += '.' + str(tag)
        subdir = testname
        outputdir = os.path.join(self.resultdir, subdir)
        if os.path.exists(outputdir):
            msg = ("%s already exists, test <%s> may have"
                   " already run with tag <%s>" % (outputdir, testname, tag))
            raise error.TestError(msg)
        os.mkdir(outputdir)

        def group_func():
            try:
                test.runtest(self, url, tag, args, dargs)
            except error.TestBaseException, e:
                self.record(e.exit_status, subdir, testname, str(e))
                raise
            except Exception, e:
                info = str(e) + "\n" + traceback.format_exc()
                self.record('FAIL', subdir, testname, info)
                raise
            else:
                self.record('GOOD', subdir, testname, 'completed successfully')

        result, exc_info = self._run_group(testname, subdir, group_func)
        if exc_info and isinstance(exc_info[1], error.TestBaseException):
            return False
        elif exc_info:
            raise exc_info[0], exc_info[1], exc_info[2]
        else:
            return True

    def _run_group(self, name, subdir, function, *args, **dargs):
        """\
        Underlying method for running something inside of a group.
        """
        result, exc_info = None, None
        old_record_prefix = self.record_prefix
        try:
            self.record('START', subdir, name)
            self.record_prefix += '\t'
            try:
                result = function(*args, **dargs)
            finally:
                self.record_prefix = old_record_prefix
        except error.TestBaseException, e:
            self.record("END %s" % e.exit_status, subdir, name)
            exc_info = sys.exc_info()
        except Exception, e:
            err_msg = str(e) + '\n'
            err_msg += traceback.format_exc()
            self.record('END ABORT', subdir, name, err_msg)
            raise error.JobError(name + ' failed\n' + traceback.format_exc())
        else:
            self.record('END GOOD', subdir, name)

        return result, exc_info

    def run_group(self, function, *args, **dargs):
        """\
        function:
                subroutine to run
        *args:
                arguments for the function
        """
        name = function.__name__
        # Allow the tag for the group to be specified.
        tag = dargs.pop('tag', None)
        if tag:
            name = tag

        return self._run_group(name, None, function, *args, **dargs)[0]

    def run_reboot(self, reboot_func, get_kernel_func):
        """\
        A specialization of run_group meant specifically for handling
        a reboot. Includes support for capturing the kernel version
        after the reboot.
        reboot_func: a function that carries out the reboot
        get_kernel_func: a function that returns a string
        representing the kernel version.
        """
        old_record_prefix = self.record_prefix
        try:
            self.record('START', None, 'reboot')
            self.record_prefix += '\t'
            reboot_func()
        except Exception, e:
            self.record_prefix = old_record_prefix
            err_msg = str(e) + '\n' + traceback.format_exc()
            self.record('END FAIL', None, 'reboot', err_msg)
            raise
        else:
            kernel = get_kernel_func()
            self.record_prefix = old_record_prefix
            self.record('END GOOD', None, 'reboot',
                        optional_fields={"kernel": kernel})

    def run_control(self, path):
        """Execute a control file found at path (relative to the autotest
        path). Intended for executing a control file within a control file,
        not for running the top-level job control file."""
        path = os.path.join(self.autodir, path)
        control_file = self._load_control_file(path)
        self.run(control=control_file, control_file_dir=self.USE_TEMP_DIR)

    def add_sysinfo_command(self, command, logfile=None, on_every_test=False):
        self._add_sysinfo_loggable(sysinfo.command(command, logf=logfile),
                                   on_every_test)

    def add_sysinfo_logfile(self, file, on_every_test=False):
        self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test)

    def _add_sysinfo_loggable(self, loggable, on_every_test):
        if on_every_test:
            self.sysinfo.test_loggables.add(loggable)
        else:
            self.sysinfo.boot_loggables.add(loggable)

    def record(self, status_code, subdir, operation, status='',
               optional_fields=None):
        """
        Record job-level status
        The intent is to make this file both machine parseable and
        human readable. That involves a little more complexity, but
        really isn't all that bad ;-)
        Format is <status code>\t<subdir>\t<operation>\t<status>
        status code: see common_lib.log.is_valid_status()
                     for valid status definition
        subdir: MUST be a relevant subdirectory in the results,
        or None, which will be represented as '----'
        operation: description of what you ran (e.g. "dbench", or
                                        "mkfs -t foobar /dev/sda9")
        status: error message or "completed successfully"
        ------------------------------------------------------------
        Initial tabs indicate indent levels for grouping, and is
        governed by self.record_prefix
        multiline messages have secondary lines prefaced by a double
        space ('  ')
        Executing this method will trigger the logging of all new
        warnings to date from the various console loggers.
        """
        # poll all our warning loggers for new warnings
        warnings = self._read_warnings()
        old_record_prefix = self.record_prefix
        try:
            if status_code.startswith("END "):
                self.record_prefix += "\t"
            for timestamp, msg in warnings:
                self._record("WARN", None, None, msg, timestamp)
        finally:
            self.record_prefix = old_record_prefix
        # write out the actual status log line
        self._record(status_code, subdir, operation, status,
                     optional_fields=optional_fields)

    def _read_warnings(self):
        """Poll all the warning loggers and extract any new warnings that have
        been logged. If the warnings belong to a category that is currently
        disabled, this method will discard them and they will no longer be
        retrievable.
        Returns a list of (timestamp, message) tuples, where timestamp is an
        integer epoch timestamp."""
        warnings = []
        while True:
            # pull in a line of output from every logger that has
            # output ready to be read
            loggers, _, _ = select.select(self.warning_loggers, [], [], 0)
            closed_loggers = set()
            for logger in loggers:
                line = logger.readline()
                # record any broken pipes (aka line == empty)
                if len(line) == 0:
                    closed_loggers.add(logger)
                    continue
                # parse out the warning
                timestamp, msgtype, msg = line.split('\t', 2)
                timestamp = int(timestamp)
                # if the warning is valid, add it to the results
                if self.warning_manager.is_valid(timestamp, msgtype):
                    warnings.append((timestamp, msg.strip()))
            # stop listening to loggers that are closed
            self.warning_loggers -= closed_loggers
            # stop if none of the loggers have any output left
            if not loggers:
                break
        # sort into timestamp order
        warnings.sort()
        return warnings

    def disable_warnings(self, warning_type):
        self.warning_manager.disable_warnings(warning_type)
        self.record("INFO", None, None,
                    "disabling %s warnings" % warning_type,
                    {"warnings.disable": warning_type})

    def enable_warnings(self, warning_type):
        self.warning_manager.enable_warnings(warning_type)
        self.record("INFO", None, None,
                    "enabling %s warnings" % warning_type,
                    {"warnings.enable": warning_type})

    def get_status_log_path(self, subdir=None):
        """Return the path to the job status log.
        @param subdir - Optional parameter indicating that you want the path
            to a subdirectory status log.
        @returns The path where the status log should be.
        """
        if self.resultdir:
            if subdir:
                return os.path.join(self.resultdir, subdir, "status.log")
            else:
                return os.path.join(self.resultdir, "status.log")
        else:
            return None

    def _update_uncollected_logs_list(self, update_func):
        """Updates the uncollected logs list in a multi-process safe manner.
        @param update_func - a function that updates the list of uncollected
            logs. Should take one parameter, the list to be updated.
        """
        if self.uncollected_log_file:
            log_file = open(self.uncollected_log_file, "r+")
            fcntl.flock(log_file, fcntl.LOCK_EX)
            try:
                uncollected_logs = pickle.load(log_file)
                update_func(uncollected_logs)
                log_file.seek(0)
                log_file.truncate()
                pickle.dump(uncollected_logs, log_file)
                log_file.flush()
            finally:
                fcntl.flock(log_file, fcntl.LOCK_UN)
                log_file.close()

    def add_client_log(self, hostname, remote_path, local_path):
        """Adds a new set of client logs to the list of uncollected logs,
        to allow for future log recovery.
        @param hostname - the hostname of the machine holding the logs
        @param remote_path - the directory on the remote machine holding logs
        @param local_path - the local directory to copy the logs into
        """
        def update_func(logs_list):
            logs_list.append((hostname, remote_path, local_path))
        self._update_uncollected_logs_list(update_func)

    def remove_client_log(self, hostname, remote_path, local_path):
        """Removes a set of client logs from the list of uncollected logs,
        to allow for future log recovery.
        @param hostname - the hostname of the machine holding the logs
        @param remote_path - the directory on the remote machine holding logs
        @param local_path - the local directory to copy the logs into
        """
        def update_func(logs_list):
            logs_list.remove((hostname, remote_path, local_path))
        self._update_uncollected_logs_list(update_func)

    def _render_record(self, status_code, subdir, operation, status='',
                       epoch_time=None, record_prefix=None,
                       optional_fields=None):
        """
        Internal Function to generate a record to be written into a
        status log. For use by server_job.* classes only.
        """
        if subdir:
            if re.match(r'[\n\t]', subdir):
                raise ValueError('Invalid character in subdir string')
            substr = subdir
        else:
            substr = '----'

        if not log.is_valid_status(status_code):
            raise ValueError('Invalid status code supplied: %s' % status_code)
        if not operation:
            operation = '----'
        if re.match(r'[\n\t]', operation):
            raise ValueError('Invalid character in operation string')
        operation = operation.rstrip()
        status = status.rstrip()
        status = re.sub(r"\t", "  ", status)
        # Ensure any continuation lines are marked so we can
        # detect them in the status file to ensure it is parsable.
        status = re.sub(r"\n", "\n" + self.record_prefix + "  ", status)

        if not optional_fields:
            optional_fields = {}

        # Generate timestamps for inclusion in the logs
        if epoch_time is None:
            epoch_time = int(time.time())
        local_time = time.localtime(epoch_time)
        optional_fields["timestamp"] = str(epoch_time)
        optional_fields["localtime"] = time.strftime("%b %d %H:%M:%S",
                                                     local_time)

        fields = [status_code, substr, operation]
        fields += ["%s=%s" % x for x in optional_fields.iteritems()]
        fields.append(status)

        if record_prefix is None:
            record_prefix = self.record_prefix

        msg = '\t'.join(str(x) for x in fields)
        return record_prefix + msg + '\n'

    def _record_prerendered(self, msg):
        """
        Record a pre-rendered msg into the status logs. The only
        change this makes to the message is to add on the local
        indentation. Should not be called outside of server_job.*
        classes. Unlike _record, this does not write the message
        to standard output.
        """
        lines = []
        status_file = self.get_status_log_path()
        status_log = open(status_file, 'a')
        for line in msg.splitlines():
            line = self.record_prefix + line + '\n'
            lines.append(line)
            status_log.write(line)
        status_log.close()
        self.__parse_status(lines)

    def _fill_server_control_namespace(self, namespace, protect=True):
        """
        Prepare a namespace to be used when executing server control files.
        This sets up the control file API by importing modules and making them
        available under the appropriate names within namespace.
        For use by _execute_code().
        Args:
          namespace: The namespace dictionary to fill in.
          protect: Boolean.  If True (the default) any operation that would
              clobber an existing entry in namespace will cause an error.
        Raises:
          error.AutoservError: When a name would be clobbered by import.
        """
        def _import_names(module_name, names=()):
            """
            Import a module and assign named attributes into namespace.
            Args:
                module_name: The string module name.
                names: A limiting list of names to import from module_name.  If
                    empty (the default), all names are imported from the module
                    similar to a "from foo.bar import *" statement.
            Raises:
                error.AutoservError: When a name being imported would clobber
                    a name already in namespace.
            """
            module = __import__(module_name, {}, {}, names)
            # No names supplied?  Import * from the lowest level module.
            # (Ugh, why do I have to implement this part myself?)
            if not names:
                for submodule_name in module_name.split('.')[1:]:
                    module = getattr(module, submodule_name)
                if hasattr(module, '__all__'):
                    names = getattr(module, '__all__')
                else:
                    names = dir(module)
            # Install each name into namespace, checking to make sure it
            # doesn't override anything that already exists.
            for name in names:
                # Check for conflicts to help prevent future problems.
                if name in namespace and protect:
                    if namespace[name] is not getattr(module, name):
                        raise error.AutoservError('importing name '
                                '%s from %s %r would override %r' %
                                (name, module_name, getattr(module, name),
                                 namespace[name]))
                    else:
                        # Encourage cleanliness and the use of __all__ for a
                        # more concrete API with fewer surprises on '*' imports.
                        warnings.warn('%s (%r) being imported from %s for use '
                                      'in server control files is not the '
                                      'first occurrence of that import.' %
                                      (name, namespace[name], module_name))
                namespace[name] = getattr(module, name)

        # This is the equivalent of prepending a bunch of import statements to
        # the front of the control script.
        namespace.update(os=os, sys=sys, logging=logging)
        _import_names('autotest_lib.server',
                ('hosts', 'autotest', 'kvm', 'git', 'standalone_profiler',
                 'source_kernel', 'rpm_kernel', 'deb_kernel', 'git_kernel'))
        _import_names('autotest_lib.server.subcommand',
                      ('parallel', 'parallel_simple', 'subcommand'))
        _import_names('autotest_lib.server.utils',
                      ('run', 'get_tmp_dir', 'sh_escape', 'parse_machine'))
        _import_names('autotest_lib.client.common_lib.error')
        _import_names('autotest_lib.client.common_lib.barrier', ('barrier',))

        # Inject ourself as the job object into other classes within the API.
        # (Yuck, this injection is a gross thing to be part of a public API. -gps)
        #
        # XXX Base & SiteAutotest do not appear to use .job.  Who does?
        namespace['autotest'].Autotest.job = self
        # server.hosts.base_classes.Host uses .job.
        namespace['hosts'].Host.job = self

    def _execute_code(self, code_file, namespace, protect=True):
        """
        Execute code using a copy of namespace as a server control script.
        Unless protect_namespace is explicitly set to False, the dict will not
        be modified.
        Args:
          code_file: The filename of the control file to execute.
          namespace: A dict containing names to make available during execution.
          protect: Boolean.  If True (the default) a copy of the namespace dict
              is used during execution to prevent the code from modifying its
              contents outside of this function.  If False the raw dict is
              passed in and modifications will be allowed.
        """
        if protect:
            namespace = namespace.copy()
        self._fill_server_control_namespace(namespace, protect=protect)
        # TODO: Simplify and get rid of the special cases for only 1 machine.
        if len(self.machines) > 1:
            machines_text = '\n'.join(self.machines) + '\n'
            # Only rewrite the file if it does not match our machine list.
            try:
                machines_f = open(MACHINES_FILENAME, 'r')
                existing_machines_text = machines_f.read()
                machines_f.close()
            except EnvironmentError:
                existing_machines_text = None
            if machines_text != existing_machines_text:
                utils.open_write_close(MACHINES_FILENAME, machines_text)
        execfile(code_file, namespace, namespace)

    def _record(self, status_code, subdir, operation, status='',
                epoch_time=None, optional_fields=None):
        """
        Actual function for recording a single line into the status
        logs. Should never be called directly, only by job.record as
        this would bypass the console monitor logging.
        """
        msg = self._render_record(status_code, subdir, operation, status,
                                  epoch_time, optional_fields=optional_fields)
        status_file = self.get_status_log_path()
        sys.stdout.write(msg)
        if status_file:
            open(status_file, "a").write(msg)
        if subdir:
            sub_status_file = self.get_status_log_path(subdir)
            open(sub_status_file, "a").write(msg)
        self.__parse_status(msg.splitlines())

    def __parse_status(self, new_lines):
        if not self.using_parser:
            return
        new_tests = self.parser.process_lines(new_lines)
        for test in new_tests:
            self.__insert_test(test)

    def __insert_test(self, test):
        """
        An internal method to insert a new test result into the
        database. This method will not raise an exception, even if an
        error occurs during the insert, to avoid failing a test
        simply because of unexpected database issues."""
...