Best Python code snippet using autotest_python
job.py
Source:job.py  
...615                           (old_count, cpu_count))616            self._record_reboot_failure(subdir, 'reboot.verify_config',617                                        description, running_id=running_id)618            raise error.JobError('Reboot failed: %s' % description)619    def end_reboot(self, subdir, kernel, patches, running_id=None):620        self._check_post_reboot(subdir, running_id=running_id)621        # strip ::<timestamp> from the kernel version if present622        kernel = kernel.split("::")[0]623        kernel_info = {"kernel": kernel}624        for i, patch in enumerate(patches):625            kernel_info["patch%d" % i] = patch626        self.record("END GOOD", subdir, "reboot", optional_fields=kernel_info)627    def end_reboot_and_verify(self, expected_when, expected_id, subdir,628                              type='src', patches=[]):629        """ Check the passed kernel identifier against the command line630            and the running kernel, abort the job on missmatch. """631        logging.info("POST BOOT: checking booted kernel "632                     "mark=%d identity='%s' type='%s'",633                     expected_when, expected_id, type)634        running_id = utils.running_os_ident()635        cmdline = utils.read_one_line("/proc/cmdline")636        find_sum = re.compile(r'.*IDENT=(\d+)')637        m = find_sum.match(cmdline)638        cmdline_when = -1639        if m:640            cmdline_when = int(m.groups()[0])641        # We have all the facts, see if they indicate we642        # booted the requested kernel or not.643        bad = False644        if (type == 'src' and expected_id != running_id or645            type == 'rpm' and646            not running_id.startswith(expected_id + '::')):647            logging.error("Kernel identifier mismatch")648            bad = True649        if expected_when != cmdline_when:650            logging.error("Kernel command line mismatch")651            bad = True652        if bad:653            logging.error("   Expected Ident: " + expected_id)654            logging.error("    Running Ident: " + running_id)655            logging.error("    Expected Mark: %d", expected_when)656            logging.error("Command Line Mark: %d", cmdline_when)657            logging.error("     Command Line: " + cmdline)658            self._record_reboot_failure(subdir, "reboot.verify", "boot failure",659                                        running_id=running_id)660            raise error.JobError("Reboot returned with the wrong kernel")661        self.record('GOOD', subdir, 'reboot.verify',662                    utils.running_os_full_version())663        self.end_reboot(subdir, expected_id, patches, running_id=running_id)664    def partition(self, device, loop_size=0, mountpoint=None):665        """666        Work with a machine partition667            @param device: e.g. /dev/sda2, /dev/sdb1 etc...668            @param mountpoint: Specify a directory to mount to. If not specified669                               autotest tmp directory will be used.670            @param loop_size: Size of loopback device (in MB). Defaults to 0.671            @return: A L{client.bin.partition.partition} object672        """673        if not mountpoint:674            mountpoint = self.tmpdir675        return partition_lib.partition(self, device, loop_size, mountpoint)676    @utils.deprecated677    def filesystem(self, device, mountpoint=None, loop_size=0):...kernel.py
Source:kernel.py  
1import os, shutil, copy, pickle, re, glob, time, logging2from autotest_lib.client.bin import kernel_config, os_dep, kernelexpand, test3from autotest_lib.client.bin import utils4from autotest_lib.client.common_lib import log, error, packages5def tee_output_logdir_mark(fn):6    def tee_logdir_mark_wrapper(self, *args, **dargs):7        mark = self.__class__.__name__ + "." + fn.__name__8        logging.info("--- START %s ---", mark)9        self.job.logging.tee_redirect_debug_dir(self.log_dir)10        try:11            result = fn(self, *args, **dargs)12        finally:13            self.job.logging.restore()14            logging.info("--- END %s ---", mark)15        return result16    tee_logdir_mark_wrapper.__name__ = fn.__name__17    return tee_logdir_mark_wrapper18class kernel(object):19    """ Class for compiling kernels.20    Data for the object includes the src files21    used to create the kernel, patches applied, config (base + changes),22    the build directory itself, and logged output23    Properties:24            job25                    Backpointer to the job object we're part of26            autodir27                    Path to the top level autotest dir (/usr/local/autotest)28            src_dir29                    <tmp_dir>/src/30            build_dir31                    <tmp_dir>/linux/32            config_dir33                    <results_dir>/config/34            log_dir35                    <results_dir>/debug/36            results_dir37                    <results_dir>/results/38    """39    autodir = ''40    def __init__(self, job, base_tree, subdir, tmp_dir, build_dir, leave=False):41        """Initialize the kernel build environment42        job43                which job this build is part of44        base_tree45                base kernel tree. Can be one of the following:46                        1. A local tarball47                        2. A URL to a tarball48                        3. A local directory (will symlink it)49                        4. A shorthand expandable (eg '2.6.11-git3')50        subdir51                subdir in the results directory (eg "build")52                (holds config/, debug/, results/)53        tmp_dir54        leave55                Boolean, whether to leave existing tmpdir or not56        """57        self.job = job58        self.autodir = job.autodir59        self.src_dir    = os.path.join(tmp_dir, 'src')60        self.build_dir  = os.path.join(tmp_dir, build_dir)61                # created by get_kernel_tree62        self.config_dir = os.path.join(subdir, 'config')63        self.log_dir    = os.path.join(subdir, 'debug')64        self.results_dir = os.path.join(subdir, 'results')65        self.subdir     = os.path.basename(subdir)66        self.installed_as = None67        if not leave:68            if os.path.isdir(self.src_dir):69                utils.system('rm -rf ' + self.src_dir)70            if os.path.isdir(self.build_dir):71                utils.system('rm -rf ' + self.build_dir)72        if not os.path.exists(self.src_dir):73            os.mkdir(self.src_dir)74        for path in [self.config_dir, self.log_dir, self.results_dir]:75            if os.path.exists(path):76                utils.system('rm -rf ' + path)77            os.mkdir(path)78        logpath = os.path.join(self.log_dir, 'build_log')79        self.logfile = open(logpath, 'w+')80        self.applied_patches = []81        self.target_arch = None82        self.build_target = 'bzImage'83        self.build_image = None84        arch = utils.get_current_kernel_arch()85        if arch == 's390' or arch == 's390x':86            self.build_target = 'image'87        elif arch == 'ia64':88            self.build_target = 'all'89            self.build_image = 'vmlinux.gz'90        if not leave:91            self.logfile.write('BASE: %s\n' % base_tree)92            # Where we have direct version hint record that93            # for later configuration selection.94            shorthand = re.compile(r'^\d+\.\d+\.\d+')95            if shorthand.match(base_tree):96                self.base_tree_version = base_tree97            else:98                self.base_tree_version = None99            # Actually extract the tree.  Make sure we know it occured100            self.extract(base_tree)101    def kernelexpand(self, kernel):102        # If we have something like a path, just use it as it is103        if '/' in kernel:104            return [kernel]105        # Find the configured mirror list.106        mirrors = self.job.config_get('mirror.mirrors')107        if not mirrors:108            # LEGACY: convert the kernel.org mirror109            mirror = self.job.config_get('mirror.ftp_kernel_org')110            if mirror:111                korg = 'http://www.kernel.org/pub/linux/kernel'112                mirrors = [113                  [ korg + '/v2.6', mirror + '/v2.6' ],114                  [ korg + '/people/akpm/patches/2.6', mirror + '/akpm' ],115                  [ korg + '/people/mbligh', mirror + '/mbligh' ],116                ]117        patches = kernelexpand.expand_classic(kernel, mirrors)118        print patches119        return patches120    @log.record121    @tee_output_logdir_mark122    def extract(self, base_tree):123        if os.path.exists(base_tree):124            self.get_kernel_tree(base_tree)125        else:126            base_components = self.kernelexpand(base_tree)127            print 'kernelexpand: '128            print base_components129            self.get_kernel_tree(base_components.pop(0))130            if base_components:      # apply remaining patches131                self.patch(*base_components)132    @log.record133    @tee_output_logdir_mark134    def patch(self, *patches):135        """Apply a list of patches (in order)"""136        if not patches:137            return138        print 'Applying patches: ', patches139        self.apply_patches(self.get_patches(patches))140    @log.record141    @tee_output_logdir_mark142    def config(self, config_file = '', config_list = None, defconfig = False, make = None):143        self.set_cross_cc()144        config = kernel_config.kernel_config(self.job, self.build_dir,145                 self.config_dir, config_file, config_list,146                 defconfig, self.base_tree_version, make)147    def get_patches(self, patches):148        """fetch the patches to the local src_dir"""149        local_patches = []150        for patch in patches:151            dest = os.path.join(self.src_dir, os.path.basename(patch))152            # FIXME: this isn't unique. Append something to it153            # like wget does if it's not there?154            print "get_file %s %s %s %s" % (patch, dest, self.src_dir,155                                            os.path.basename(patch))156            utils.get_file(patch, dest)157            # probably safer to use the command, not python library158            md5sum = utils.system_output('md5sum ' + dest).split()[0]159            local_patches.append((patch, dest, md5sum))160        return local_patches161    def apply_patches(self, local_patches):162        """apply the list of patches, in order"""163        builddir = self.build_dir164        os.chdir(builddir)165        if not local_patches:166            return None167        for (spec, local, md5sum) in local_patches:168            if local.endswith('.bz2') or local.endswith('.gz'):169                ref = spec170            else:171                ref = utils.force_copy(local, self.results_dir)172                ref = self.job.relative_path(ref)173            patch_id = "%s %s %s" % (spec, ref, md5sum)174            log = "PATCH: " + patch_id + "\n"175            print log176            utils.cat_file_to_cmd(local, 'patch -p1 > /dev/null')177            self.logfile.write(log)178            self.applied_patches.append(patch_id)179    def get_kernel_tree(self, base_tree):180        """Extract/link base_tree to self.build_dir"""181        # if base_tree is a dir, assume uncompressed kernel182        if os.path.isdir(base_tree):183            print 'Symlinking existing kernel source'184            os.symlink(base_tree, self.build_dir)185        # otherwise, extract tarball186        else:187            os.chdir(os.path.dirname(self.src_dir))188            # Figure out local destination for tarball189            tarball = os.path.join(self.src_dir, os.path.basename(base_tree))190            utils.get_file(base_tree, tarball)191            print 'Extracting kernel tarball:', tarball, '...'192            utils.extract_tarball_to_dir(tarball, self.build_dir)193    def extraversion(self, tag, append=1):194        os.chdir(self.build_dir)195        extraversion_sub = r's/^EXTRAVERSION =\s*\(.*\)/EXTRAVERSION = '196        if append:197            p = extraversion_sub + '\\1-%s/' % tag198        else:199            p = extraversion_sub + '-%s/' % tag200        utils.system('mv Makefile Makefile.old')201        utils.system('sed "%s" < Makefile.old > Makefile' % p)202    @log.record203    @tee_output_logdir_mark204    def build(self, make_opts = '', logfile = '', extraversion='autotest'):205        """build the kernel206        make_opts207                additional options to make, if any208        """209        os_dep.commands('gcc', 'make')210        if logfile == '':211            logfile = os.path.join(self.log_dir, 'kernel_build')212        os.chdir(self.build_dir)213        if extraversion:214            self.extraversion(extraversion)215        self.set_cross_cc()216        # setup_config_file(config_file, config_overrides)217        # Not needed on 2.6, but hard to tell -- handle failure218        utils.system('make dep', ignore_status=True)219        threads = 2 * utils.count_cpus()220        build_string = 'make -j %d %s %s' % (threads, make_opts,221                                     self.build_target)222                                # eg make bzImage, or make zImage223        print build_string224        utils.system(build_string)225        if kernel_config.modules_needed('.config'):226            utils.system('make -j %d modules' % (threads))227        kernel_version = self.get_kernel_build_ver()228        kernel_version = re.sub('-autotest', '', kernel_version)229        self.logfile.write('BUILD VERSION: %s\n' % kernel_version)230        utils.force_copy(self.build_dir+'/System.map',231                                  self.results_dir)232    def build_timed(self, threads, timefile = '/dev/null', make_opts = '',233                                                    output = '/dev/null'):234        """time the bulding of the kernel"""235        os.chdir(self.build_dir)236        self.set_cross_cc()237        self.clean(logged=False)238        build_string = "/usr/bin/time -o %s make %s -j %s vmlinux" \239                                        % (timefile, make_opts, threads)240        build_string += ' > %s 2>&1' % output241        print build_string242        utils.system(build_string)243        if (not os.path.isfile('vmlinux')):244            errmsg = "no vmlinux found, kernel build failed"245            raise error.TestError(errmsg)246    @log.record247    @tee_output_logdir_mark248    def clean(self):249        """make clean in the kernel tree"""250        os.chdir(self.build_dir)251        print "make clean"252        utils.system('make clean > /dev/null 2> /dev/null')253    @log.record254    @tee_output_logdir_mark255    def mkinitrd(self, version, image, system_map, initrd):256        """Build kernel initrd image.257        Try to use distro specific way to build initrd image.258        Parameters:259                version260                        new kernel version261                image262                        new kernel image file263                system_map264                        System.map file265                initrd266                        initrd image file to build267        """268        vendor = utils.get_os_vendor()269        if os.path.isfile(initrd):270            print "Existing %s file, will remove it." % initrd271            os.remove(initrd)272        args = self.job.config_get('kernel.mkinitrd_extra_args')273        # don't leak 'None' into mkinitrd command274        if not args:275            args = ''276        if vendor in ['Red Hat', 'Fedora Core']:277            utils.system('mkinitrd %s %s %s' % (args, initrd, version))278        elif vendor in ['SUSE']:279            utils.system('mkinitrd %s -k %s -i %s -M %s' %280                         (args, image, initrd, system_map))281        elif vendor in ['Debian', 'Ubuntu']:282            if os.path.isfile('/usr/sbin/mkinitrd'):283                cmd = '/usr/sbin/mkinitrd'284            elif os.path.isfile('/usr/sbin/mkinitramfs'):285                cmd = '/usr/sbin/mkinitramfs'286            else:287                raise error.TestError('No Debian initrd builder')288            utils.system('%s %s -o %s %s' % (cmd, args, initrd, version))289        else:290            raise error.TestError('Unsupported vendor %s' % vendor)291    def set_build_image(self, image):292        self.build_image = image293    @log.record294    @tee_output_logdir_mark295    def install(self, tag='autotest', prefix = '/'):296        """make install in the kernel tree"""297        # Record that we have installed the kernel, and298        # the tag under which we installed it.299        self.installed_as = tag300        os.chdir(self.build_dir)301        if not os.path.isdir(prefix):302            os.mkdir(prefix)303        self.boot_dir = os.path.join(prefix, 'boot')304        if not os.path.isdir(self.boot_dir):305            os.mkdir(self.boot_dir)306        if not self.build_image:307            images = glob.glob('arch/*/boot/' + self.build_target)308            if len(images):309                self.build_image = images[0]310            else:311                self.build_image = self.build_target312        # remember installed files313        self.vmlinux = self.boot_dir + '/vmlinux-' + tag314        if (self.build_image != 'vmlinux'):315            self.image = self.boot_dir + '/vmlinuz-' + tag316        else:317            self.image = self.vmlinux318        self.system_map = self.boot_dir + '/System.map-' + tag319        self.config_file = self.boot_dir + '/config-' + tag320        self.initrd = ''321        # copy to boot dir322        utils.force_copy('vmlinux', self.vmlinux)323        if (self.build_image != 'vmlinux'):324            utils.force_copy(self.build_image, self.image)325        utils.force_copy('System.map', self.system_map)326        utils.force_copy('.config', self.config_file)327        if not kernel_config.modules_needed('.config'):328            return329        utils.system('make modules_install INSTALL_MOD_PATH=%s' % prefix)330        if prefix == '/':331            self.initrd = self.boot_dir + '/initrd-' + tag332            self.mkinitrd(self.get_kernel_build_ver(), self.image,333                          self.system_map, self.initrd)334    def add_to_bootloader(self, tag='autotest', args=''):335        """ add this kernel to bootloader, taking an336            optional parameter of space separated parameters337            e.g.:  kernel.add_to_bootloader('mykernel', 'ro acpi=off')338        """339        # remove existing entry if present340        self.job.bootloader.remove_kernel(tag)341        # pull the base argument set from the job config,342        baseargs = self.job.config_get('boot.default_args')343        if baseargs:344            args = baseargs + " " + args345        # otherwise populate from /proc/cmdline346        # if not baseargs:347        #       baseargs = open('/proc/cmdline', 'r').readline().strip()348        # NOTE: This is unnecessary, because boottool does it.349        root = None350        roots = [x for x in args.split() if x.startswith('root=')]351        if roots:352            root = re.sub('^root=', '', roots[0])353        arglist = [x for x in args.split() if not x.startswith('root=')]354        args = ' '.join(arglist)355        # add the kernel entry356        # add_kernel(image, title='autotest', initrd='')357        self.job.bootloader.add_kernel(self.image, tag, self.initrd, \358                                        args = args, root = root)359    def get_kernel_build_arch(self, arch=None):360        """361        Work out the current kernel architecture (as a kernel arch)362        """363        if not arch:364            arch = utils.get_current_kernel_arch()365        if re.match('i.86', arch):366            return 'i386'367        elif re.match('sun4u', arch):368            return 'sparc64'369        elif re.match('arm.*', arch):370            return 'arm'371        elif re.match('sa110', arch):372            return 'arm'373        elif re.match('s390x', arch):374            return 's390'375        elif re.match('parisc64', arch):376            return 'parisc'377        elif re.match('ppc.*', arch):378            return 'powerpc'379        elif re.match('mips.*', arch):380            return 'mips'381        else:382            return arch383    def get_kernel_build_release(self):384        releasem = re.compile(r'.*UTS_RELEASE\s+"([^"]+)".*');385        versionm = re.compile(r'.*UTS_VERSION\s+"([^"]+)".*');386        release = None387        version = None388        for f in [self.build_dir + "/include/linux/version.h",389                  self.build_dir + "/include/linux/utsrelease.h",390                  self.build_dir + "/include/linux/compile.h"]:391            if os.path.exists(f):392                fd = open(f, 'r')393                for line in fd.readlines():394                    m = releasem.match(line)395                    if m:396                        release = m.groups()[0]397                    m = versionm.match(line)398                    if m:399                        version = m.groups()[0]400                fd.close()401        return (release, version)402    def get_kernel_build_ident(self):403        (release, version) = self.get_kernel_build_release()404        if not release or not version:405            raise error.JobError('kernel has no identity')406        return release + '::' + version407    def boot(self, args='', ident=True):408        """ install and boot this kernel, do not care how409            just make it happen.410        """411        # If we can check the kernel identity do so.412        expected_ident = self.get_kernel_build_ident()413        if ident:414            when = int(time.time())415            args += " IDENT=%d" % (when)416            self.job.next_step_prepend(["job.end_reboot_and_verify", when,417                                        expected_ident, self.subdir,418                                        self.applied_patches])419        else:420            self.job.next_step_prepend(["job.end_reboot", self.subdir,421                                        expected_ident, self.applied_patches])422        # Check if the kernel has been installed, if not install423        # as the default tag and boot that.424        if not self.installed_as:425            self.install()426        # Boot the selected tag.427        self.add_to_bootloader(args=args, tag=self.installed_as)428        # Boot it.429        self.job.start_reboot()430        self.job.reboot(tag=self.installed_as)431    def get_kernel_build_ver(self):432        """Check Makefile and .config to return kernel version"""433        version = patchlevel = sublevel = extraversion = localversion = ''434        for line in open(self.build_dir + '/Makefile', 'r').readlines():435            if line.startswith('VERSION'):436                version = line[line.index('=') + 1:].strip()437            if line.startswith('PATCHLEVEL'):438                patchlevel = line[line.index('=') + 1:].strip()439            if line.startswith('SUBLEVEL'):440                sublevel = line[line.index('=') + 1:].strip()441            if line.startswith('EXTRAVERSION'):442                extraversion = line[line.index('=') + 1:].strip()443        for line in open(self.build_dir + '/.config', 'r').readlines():444            if line.startswith('CONFIG_LOCALVERSION='):445                localversion = line.rstrip().split('"')[1]446        return "%s.%s.%s%s%s" %(version, patchlevel, sublevel, extraversion, localversion)447    def set_build_target(self, build_target):448        if build_target:449            self.build_target = build_target450            print 'BUILD TARGET: %s' % self.build_target451    def set_cross_cc(self, target_arch=None, cross_compile=None,452                     build_target='bzImage'):453        """Set up to cross-compile.454                This is broken. We need to work out what the default455                compile produces, and if not, THEN set the cross456                compiler.457        """458        if self.target_arch:459            return460        # if someone has set build_target, don't clobber in set_cross_cc461        # run set_build_target before calling set_cross_cc462        if not self.build_target:463            self.set_build_target(build_target)464        # If no 'target_arch' given assume native compilation465        if target_arch is None:466            target_arch = utils.get_current_kernel_arch()467            if target_arch == 'ppc64':468                if self.build_target == 'bzImage':469                    self.build_target = 'vmlinux'470        if not cross_compile:471            cross_compile = self.job.config_get('kernel.cross_cc')472        if cross_compile:473            os.environ['CROSS_COMPILE'] = cross_compile474        else:475            if os.environ.has_key('CROSS_COMPILE'):476                del os.environ['CROSS_COMPILE']477        return                 # HACK. Crap out for now.478        # At this point I know what arch I *want* to build for479        # but have no way of working out what arch the default480        # compiler DOES build for.481        def install_package(package):482            raise NotImplementedError("I don't exist yet!")483        if target_arch == 'ppc64':484            install_package('ppc64-cross')485            cross_compile = os.path.join(self.autodir, 'sources/ppc64-cross/bin')486        elif target_arch == 'x86_64':487            install_package('x86_64-cross')488            cross_compile = os.path.join(self.autodir, 'sources/x86_64-cross/bin')489        os.environ['ARCH'] = self.target_arch = target_arch490        self.cross_compile = cross_compile491        if self.cross_compile:492            os.environ['CROSS_COMPILE'] = self.cross_compile493    def pickle_dump(self, filename):494        """dump a pickle of ourself out to the specified filename495        we can't pickle the backreference to job (it contains fd's),496        nor would we want to. Same for logfile (fd's).497        """498        temp = copy.copy(self)499        temp.job = None500        temp.logfile = None501        pickle.dump(temp, open(filename, 'w'))502class rpm_kernel(object):503    """ Class for installing rpm kernel package504    """505    def __init__(self, job, rpm_package, subdir):506        self.job = job507        self.rpm_package = rpm_package508        self.log_dir = os.path.join(subdir, 'debug')509        self.subdir = os.path.basename(subdir)510        if os.path.exists(self.log_dir):511            utils.system('rm -rf ' + self.log_dir)512        os.mkdir(self.log_dir)513        self.installed_as = None514    @log.record515    @tee_output_logdir_mark516    def install(self, tag='autotest', install_vmlinux=True):517        self.installed_as = tag518        self.image = None519        self.initrd = ''520        for rpm_pack in self.rpm_package:521            rpm_name = utils.system_output('rpm -qp ' + rpm_pack)522            # install523            utils.system('rpm -i --force ' + rpm_pack)524            # get file list525            files = utils.system_output('rpm -ql ' + rpm_name).splitlines()526            # search for vmlinuz527            for file in files:528                if file.startswith('/boot/vmlinuz'):529                    self.full_version = file[len('/boot/vmlinuz-'):]530                    self.image = file531                    self.rpm_flavour = rpm_name.split('-')[1]532                    # get version and release number533                    self.version, self.release = utils.system_output(534                            'rpm --queryformat="%{VERSION}\\n%{RELEASE}\\n" -q '535                            + rpm_name).splitlines()[0:2]536                    # prefer /boot/kernel-version before /boot/kernel537                    if self.full_version:538                        break539            # search for initrd540            for file in files:541                if file.startswith('/boot/initrd'):542                    self.initrd = file543                    # prefer /boot/initrd-version before /boot/initrd544                    if len(file) > len('/boot/initrd'):545                        break546        if self.image == None:547            errmsg = "specified rpm file(s) don't contain /boot/vmlinuz"548            raise error.TestError(errmsg)549        # install vmlinux550        if install_vmlinux:551            for rpm_pack in self.rpm_package:552                vmlinux = utils.system_output(553                        'rpm -q -l -p %s | grep /boot/vmlinux' % rpm_pack)554            utils.system('cd /; rpm2cpio %s | cpio -imuv .%s'555                         % (rpm_pack, vmlinux))556            if not os.path.exists(vmlinux):557                raise error.TestError('%s does not exist after installing %s'558                                      % (vmlinux, rpm_pack))559    def add_to_bootloader(self, tag='autotest', args=''):560        """ Add this kernel to bootloader561        """562        # remove existing entry if present563        self.job.bootloader.remove_kernel(tag)564        # pull the base argument set from the job config565        baseargs = self.job.config_get('boot.default_args')566        if baseargs:567            args = baseargs + ' ' + args568        # otherwise populate from /proc/cmdline569        # if not baseargs:570        #       baseargs = open('/proc/cmdline', 'r').readline().strip()571        # NOTE: This is unnecessary, because boottool does it.572        root = None573        roots = [x for x in args.split() if x.startswith('root=')]574        if roots:575            root = re.sub('^root=', '', roots[0])576        arglist = [x for x in args.split() if not x.startswith('root=')]577        args = ' '.join(arglist)578        # add the kernel entry579        self.job.bootloader.add_kernel(self.image, tag, self.initrd,580                                       args = args, root = root)581    def boot(self, args='', ident=True):582        """ install and boot this kernel583        """584        # Check if the kernel has been installed, if not install585        # as the default tag and boot that.586        if not self.installed_as:587            self.install()588        # If we can check the kernel identity do so.589        expected_ident = self.full_version590        if not expected_ident:591            expected_ident = '-'.join([self.version,592                                       self.rpm_flavour,593                                       self.release])594        if ident:595            when = int(time.time())596            args += " IDENT=%d" % (when)597            self.job.next_step_prepend(["job.end_reboot_and_verify",598                                        when, expected_ident, None, 'rpm'])599        else:600            self.job.next_step_prepend(["job.end_reboot", None,601                                        expected_ident, []])602        # Boot the selected tag.603        self.add_to_bootloader(args=args, tag=self.installed_as)604        # Boot it.605        self.job.start_reboot()606        self.job.reboot(tag=self.installed_as)607class rpm_kernel_suse(rpm_kernel):608    """ Class for installing openSUSE/SLE rpm kernel package609    """610    def install(self):611        # do not set the new kernel as the default one612        os.environ['PBL_AUTOTEST'] = '1'613        rpm_kernel.install(self, 'dummy')614        self.installed_as = self.job.bootloader.get_title_for_kernel(self.image)615        if not self.installed_as:616            errmsg = "cannot find installed kernel in bootloader configuration"617            raise error.TestError(errmsg)618    def add_to_bootloader(self, tag='dummy', args=''):619        """ Set parameters of this kernel in bootloader620        """621        # pull the base argument set from the job config622        baseargs = self.job.config_get('boot.default_args')623        if baseargs:624            args = baseargs + ' ' + args625        self.job.bootloader.add_args(tag, args)626def rpm_kernel_vendor(job, rpm_package, subdir):627        vendor = utils.get_os_vendor()628        if vendor == "SUSE":629            return rpm_kernel_suse(job, rpm_package, subdir)630        else:631            return rpm_kernel(job, rpm_package, subdir)632# just make the preprocessor a nop633def _preprocess_path_dummy(path):634    return path.strip()635# pull in some optional site-specific path pre-processing636preprocess_path = utils.import_site_function(__file__, 637    "autotest_lib.client.bin.site_kernel", "preprocess_path",638    _preprocess_path_dummy)639def auto_kernel(job, path, subdir, tmp_dir, build_dir, leave=False):640    """641    Create a kernel object, dynamically selecting the appropriate class to use642    based on the path provided.643    """644    kernel_paths = [preprocess_path(path)]645    if kernel_paths[0].endswith('.list'):646        # Fetch the list of packages to install647        kernel_list = os.path.join(tmp_dir, 'kernel.list')648        utils.get_file(kernel_paths[0], kernel_list)649        kernel_paths = [p.strip() for p in open(kernel_list).readlines()]650    if kernel_paths[0].endswith('.rpm'):651        rpm_paths = []652        for kernel_path in kernel_paths:653            if utils.is_url(kernel_path) or os.path.exists(kernel_path):654                rpm_paths.append(kernel_path)655            else:656                # Fetch the rpm into the job's packages directory and pass it to657                # rpm_kernel658                rpm_name = os.path.basename(kernel_path)659                # If the preprocessed path (kernel_path) is only a name then660                # search for the kernel in all the repositories, else fetch the661                # kernel from that specific path.662                job.pkgmgr.fetch_pkg(rpm_name, os.path.join(job.pkgdir, rpm_name),663                                     repo_url=os.path.dirname(kernel_path))664                rpm_paths.append(os.path.join(job.pkgdir, rpm_name))665        return rpm_kernel_vendor(job, rpm_paths, subdir)666    else:667        if len(kernel_paths) > 1:668            raise error.TestError("don't know what to do with more than one non-rpm kernel file")...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
