Best Python code snippet using autotest_python
base_packages.py
Source:base_packages.py  
# NOTE(review): this excerpt starts inside RepositoryFetcher.__init__; the
# class statement, the __init__ signature and most of its docstring precede
# the visible text. The two header lines below are reconstructed from the
# subclasses (``class HttpFetcher(RepositoryFetcher)`` and
# ``super(GitFetcher, self).__init__(package_manager, repository_url)``) --
# confirm them against the full file.
class RepositoryFetcher(object):

    def __init__(self, package_manager, repository_url):
        """
        Remember the package manager and the repository this fetcher serves.

        :param package_manager: object whose ``_run_command`` is used to
                                execute shell commands
        :type repository_url: string
        :param repository_url: location of the repository
        """
        self.run_command = package_manager._run_command
        self.url = repository_url
        self.pkgmgr = package_manager

    def install_pkg_setup(self, name, fetch_dir, install):
        """
        Install setup for a package based on fetcher type.

        :type name: string
        :param name:  The filename to be munged
        :type fetch_dir: string
        :param fetch_dir: The destination path to be munged
        :type install: boolean
        :param install: Whether this is called from the install path or not
        :return: tuple with (name, fetch_dir)
        """
        if install:
            # on the install path the destination is a file inside fetch_dir
            # named after the package, with '/' flattened to '_'
            fetch_dir = os.path.join(fetch_dir, re.sub("/", "_", name))
        return (name, fetch_dir)

    def fetch_pkg_file(self, filename, dest_path):
        """
        Fetch a package file from a package repository.

        Subclasses must override this method.

        :type filename: string
        :param filename: The filename of the package file to fetch.
        :type dest_path: string
        :param dest_path: Destination path to download the file to.
        :raise PackageFetchError: if the fetch failed
        """
        raise NotImplementedError()

    def install_pkg_post(self, filename, fetch_dir,
                         install_dir, preserve_install_dir=False):
        """
        Fetcher specific post install

        :type filename: string
        :param filename: The filename of the package to install
        :type fetch_dir: string
        :param fetch_dir: The fetched path of the package
        :type install_dir: string
        :param install_dir: The path to install the package to
        :type preserve_install_dir: boolean
        :param preserve_install_dir: Preserve the install directory
        """
        # check to see if the install_dir exists and if it does
        # then check to see if the .checksum file is the latest
        install_dir_exists = False
        try:
            self.pkgmgr._run_command("ls %s" % install_dir)
            install_dir_exists = True
        except (error.CmdError, error.AutoservRunError):
            # a failing 'ls' simply means the directory is not there yet
            pass

        fetch_path = os.path.join(fetch_dir, re.sub("/", "_", filename))
        if (install_dir_exists and
                not self.pkgmgr.untar_required(fetch_path, install_dir)):
            return

        # untar the package into install_dir and
        # update the checksum in that directory
        if not preserve_install_dir:
            # Make sure we clean up the install_dir
            self.pkgmgr._run_command('rm -rf %s' % install_dir)
        self.pkgmgr._run_command('mkdir -p %s' % install_dir)

        self.pkgmgr.untar_pkg(fetch_path, install_dir)


class HttpFetcher(RepositoryFetcher):

    '''
    Repository Fetcher using HTTP
    '''

    #
    # parameters: url, destination file path
    #
    wget_cmd_pattern = 'wget --connect-timeout=15 -nv %s -O %s'

    def _quick_http_test(self):
        """
        Runs a wget command with a 30s timeout

        This checks that the repository is reachable, and avoids the need to
        wait for a full 10min timeout.

        :raise PackageFetchError: if the repository URL cannot be fetched
        """
        # just make a temp file to write a test fetch into
        mktemp = 'mktemp -u /tmp/tmp.XXXXXX'
        dest_file_path = self.run_command(mktemp).stdout.strip()

        try:
            # build up a wget command
            http_cmd = self.wget_cmd_pattern % (self.url, dest_file_path)
            try:
                self.run_command(http_cmd, _run_command_dargs={'timeout': 30})
            except Exception as e:
                msg = 'HTTP test failed, unable to contact %s: %s'
                raise error.PackageFetchError(msg % (self.url, e))
        finally:
            # always remove the scratch file, whether the probe worked or not
            self.run_command('rm -rf %s' % dest_file_path)

    def fetch_pkg_file(self, filename, dest_path):
        """
        Fetch a package file from a package repository.

        :type filename: string
        :param filename: The filename of the package file to fetch.
        :type dest_path: string
        :param dest_path: Destination path to download the file to.
        :raise PackageFetchError: if the fetch failed
        """
        logging.info('Fetching %s from %s to %s', filename, self.url,
                     dest_path)

        # do a quick test to verify the repo is reachable
        self._quick_http_test()

        # try to retrieve the package via http
        package_url = os.path.join(self.url, filename)
        try:
            cmd = self.wget_cmd_pattern % (package_url, dest_path)
            result = self.run_command(cmd)

            file_exists = self.run_command(
                'ls %s' % dest_path,
                _run_command_dargs={'ignore_status': True}).exit_status == 0
            if not file_exists:
                logging.error('wget failed: %s', result)
                raise error.CmdError(cmd, result)

            logging.debug('Successfully fetched %s from %s', filename,
                          package_url)
        except error.CmdError:
            # remove whatever junk was retrieved when the get failed
            self.run_command('rm -f %s' % dest_path)

            raise error.PackageFetchError('%s not found in %s' % (filename,
                                                                  package_url))


class GitFetcher(RepositoryFetcher):

    """
    A git based repository fetcher
    """

    #
    # parameters: url, destination file path, <branch>:<file name>
    #
    git_archive_cmd_pattern = 'git archive --remote=%s -o %s %s'

    def __init__(self, package_manager, repository_url):
        """
        Initializes a new GitFetcher

        :type package_manager: BasePackageManager class
        :param package_manager: an instance of BasePackageManager class
        :type repository_url: string
        :param repository_url: The base URL of the git repository
        """
        super(GitFetcher, self).__init__(package_manager, repository_url)
        self._set_repo_url_branch(repository_url)
        logging.debug('GitFetcher initialized with repo=%s and branch=%s',
                      self.url, self.branch)

    def _set_repo_url_branch(self, repository_url):
        '''
        Parse the url, look for a branch and set it accordingly

        Sets ``self.branch`` (default "master") and ``self.url`` (the
        repository URL with the ``:<branch>`` part removed).

        :type repository_url: string
        :param repository_url: The base URL of the git repository
        '''
        # do we have branch info in the repository_url?
        branch = "master"
        pieces = repository_url.split(":")
        if len(pieces) > 2:
            # we have a branch
            branch = pieces[2]
            # the branch is literal text, not a regular expression
            repository_url = re.sub(":" + re.escape(branch), "",
                                    repository_url)

        # Store the stripped URL: the original computed it and then threw it
        # away, leaving the ':<branch>' suffix in the --remote argument.
        self.url = repository_url
        self.branch = branch

    def fetch_pkg_file(self, filename, dest_path):
        """
        Fetch a package file and save it to the given destination path

        git is an SCM, you can download the test directly.  No need to fetch
        a bz2'd tarball file.  However 'filename' is <type>-<name>.tar.bz2
        break this up and only fetch <name>.

        :type filename: string
        :param filename: The filename of the package file to fetch.
        :type dest_path: string
        :param dest_path: Destination path to download the file to.
        :raise PackageFetchError: if the archive could not be produced
        """
        logging.info('Fetching %s from %s to %s', filename, self.url,
                     dest_path)
        name, _ = self.pkgmgr.parse_tarball_name(filename)
        package_path = self.branch + " " + name
        try:
            cmd = self.git_archive_cmd_pattern % (self.url, dest_path,
                                                  package_path)
            result = self.run_command(cmd)

            file_exists = self.run_command(
                'ls %s' % dest_path,
                _run_command_dargs={'ignore_status': True}).exit_status == 0
            if not file_exists:
                logging.error('git archive failed: %s', result)
                raise error.CmdError(cmd, result)

            logging.debug('Successfully fetched %s from %s', package_path,
                          self.url)
        except error.CmdError:
            raise error.PackageFetchError('%s not found in %s' % (name,
                                                                  package_path))

    def install_pkg_post(self, filename, fetch_dir, install_dir,
                         preserve_install_dir=False):
        """
        Unpack a fetched git archive next to the install directory.

        :type filename: string
        :param filename: The filename of the package to install
        :type fetch_dir: string
        :param fetch_dir: The directory the archive was fetched into
        :type install_dir: string
        :param install_dir: The path to install the package to
        :type preserve_install_dir: boolean
        :param preserve_install_dir: accepted for interface compatibility;
                                     not used by this fetcher
        """
        os_dep.command("tar")
        filename, _ = self.pkgmgr.parse_tarball_name(filename)
        # drop the package name from install_dir (literal text, not a regex)
        install_path = install_dir.replace(filename, "")
        base_name = re.sub("/", "_", filename)
        for suffix in ['', '.tar', '.tar.bz2']:
            # the suffix goes AFTER the name; the original prepended it
            pkg_name = "%s%s" % (base_name, suffix)
            fetch_path = os.path.join(fetch_dir, pkg_name)
            if os.path.exists(fetch_path):
                self.pkgmgr._run_command('tar -xf %s -C %s' % (fetch_path,
                                                               install_path))


class LocalFilesystemFetcher(RepositoryFetcher):

    """
    Repository fetcher that copies packages with ``cp``
    """

    def fetch_pkg_file(self, filename, dest_path):
        """
        Copy a package file from the repository path to dest_path.

        :type filename: string
        :param filename: The filename of the package file to fetch.
        :type dest_path: string
        :param dest_path: Destination path to copy the file to.
        :raise PackageFetchError: if the copy command failed
        """
        logging.info('Fetching %s from %s to %s', filename, self.url,
                     dest_path)
        local_path = os.path.join(self.url, filename)
        try:
            self.run_command('cp %s %s' % (local_path, dest_path))
            logging.debug('Successfully fetched %s from %s', filename,
                          local_path)
        except error.CmdError as e:
            raise error.PackageFetchError(
                'Package %s could not be fetched from %s'
                % (filename, self.url), e)


class BasePackageManager(object):

    def __init__(self, pkgmgr_dir, hostname=None, repo_urls=None,
                 upload_paths=None, do_locking=True, run_function=utils.run,
                 run_function_args=(), run_function_dargs=None):
        '''
        Initializes a new BasePackageManager instance

        One of most used interfaces on this class is the _run_command(), which
        is controlled by the run_function parameter. It defaults to utils.run()
        but a custom method (if provided) should be of the same schema as
        utils.run. It should return a CmdResult object and throw a CmdError
        exception. The reason for using a separate function to run the commands
        is that the same code can be run to fetch a package on the local
        machine or on a remote machine (in which case ssh_host's run function
        is passed in for run_function).

        :type pkgmgr_dir: string
        :param pkgmgr_dir: A directory that can be used by the package manager
                to dump stuff (like checksum files of the repositories etc)
        :type hostname: string
        :param hostname: hostname from where to fetch a list of package repos
        :type repo_urls: list of strings
        :param repo_urls: The list of the repository urls which is consulted
                whilst fetching the package
        :type upload_paths: list of strings
        :param upload_paths: The list of the upload of repositories to which
                the package is uploaded to
        :type do_locking: boolean
        :param do_locking: Enable locking when the packages are installed.
        :type run_function: function
        :param run_function: function used to execute commands.
        :type run_function_args: tuple
        :param run_function_args: positional (tuple-like) arguments to
                run_function
        :type run_function_dargs: dictionary
        :param run_function_dargs: named (dictionary-like) arguments to
                run_function
        '''
        # In memory dictionary that stores the checksum's of packages
        self._checksum_dict = {}

        self.pkgmgr_dir = pkgmgr_dir
        self.do_locking = do_locking
        self.hostname = hostname
        self.repositories = []

        # None stands in for "no extra keyword arguments" so that no mutable
        # object is shared between instances through the default value
        if run_function_dargs is None:
            run_function_dargs = {}

        # Create an internal function that is a simple wrapper of
        # run_function and takes in the args and dargs as arguments
        def _run_command(command, _run_command_args=run_function_args,
                         _run_command_dargs=None):
            '''
            Special internal function that takes in a command as
            argument and passes it on to run_function (if specified).
            The _run_command_dargs are merged into run_function_dargs
            with the former having more precedence than the latter.
            '''
            new_dargs = dict(run_function_dargs)
            if _run_command_dargs:
                new_dargs.update(_run_command_dargs)
            # avoid polluting logs with extremely verbose packaging output
            new_dargs['stdout_tee'] = None

            return run_function(command, *_run_command_args,
                                **new_dargs)

        self._run_command = _run_command

        # Process the repository URLs
        if not repo_urls:
            repo_urls = []
        elif hostname:
            repo_urls = self.get_mirror_list(repo_urls)
        for url in repo_urls:
            self.add_repository(url)

        # Process the upload URLs
        if not upload_paths:
            self.upload_paths = []
        else:
            self.upload_paths = list(upload_paths)

    def add_repository(self, repo):
        """
        Register a repository, given as a URL string or a fetcher instance.

        :param repo: url string or RepositoryFetcher instance
        :raise TypeError: if repo is neither of the two
        """
        if isinstance(repo, basestring):
            self.repositories.append(self.get_fetcher(repo))
        elif isinstance(repo, RepositoryFetcher):
            self.repositories.append(repo)
        else:
            raise TypeError("repo must be RepositoryFetcher or url string")

    def get_fetcher(self, url):
        """
        Build the fetcher matching the scheme of url.

        ``http://`` and ``https://`` URLs get an HttpFetcher, ``git://`` URLs
        a GitFetcher and everything else a LocalFilesystemFetcher.

        :type url: string
        :param url: repository location
        :return: a RepositoryFetcher subclass instance bound to this manager
        """
        # wget handles both schemes; https used to fall through to 'cp'
        if url.startswith(('http://', 'https://')):
            return HttpFetcher(self, url)
        elif url.startswith('git://'):
            return GitFetcher(self, url)
        else:
            return LocalFilesystemFetcher(self, url)

    def repo_check(self, repo):
        '''
        Check to make sure the repo is in a sane state:
        ensure we have at least XX amount of free space
        Make sure we can write to the repo

        Only repos starting with '/' or 'ssh:' are checked.

        :raise RepoError: if any of the checks fails
        '''
        if not repo.startswith('/') and not repo.startswith('ssh:'):
            return
        try:
            create_directory(repo)
            check_diskspace(repo)
            check_write(repo)
        except (error.RepoWriteError, error.RepoUnknownError,
                error.RepoDiskFullError) as e:
            raise error.RepoError("ERROR: Repo %s: %s" % (repo, e))

    def upkeep(self, custom_repos=None):
        '''
        Clean up custom upload/download areas
        '''
        from autotest.server import subcommand
        if not custom_repos:
            # Not all package types necessarily require or allow custom repos
            try:
                custom_repos = settings.get_value('PACKAGES',
                                                  'custom_upload_location').split(',')
            except SettingsError:
                custom_repos = []
            try:
                custom_download = settings.get_value('PACKAGES',
                                                     'custom_download_location')
                custom_repos += [custom_download]
            except SettingsError:
                pass

            if not custom_repos:
                return

        subcommand.parallel_simple(trim_custom_directories, custom_repos,
                                   log=False)

    def install_pkg(self, name, pkg_type, fetch_dir, install_dir,
                    preserve_install_dir=False, repo_url=None):
        '''
        Remove install_dir if it already exists and then recreate it unless
        preserve_install_dir is specified as True.
        Fetch the package into the pkg_dir. Untar the package into install_dir
        The assumption is that packages are of the form :
        <pkg_type>.<pkg_name>.tar.bz2
        name        : name of the package
        type        : type of the package
        fetch_dir   : The directory into which the package tarball will be
                      fetched to.
        install_dir : the directory where the package files will be untarred to
        repo_url    : the url of the repository to fetch the package from.

        Raises PackageInstallError if the package could not be fetched.
        '''
        # do_locking flag is on by default unless you disable it (typically
        # in the cases where packages are directly installed from the server
        # onto the client in which case fcntl stuff wont work as the code
        # will run on the server in that case..
        lockfile = None
        if self.do_locking:
            lockfile_name = '.%s-%s-lock' % (re.sub("/", "_", name), pkg_type)
            lockfile = open(os.path.join(self.pkgmgr_dir, lockfile_name), 'w')

        try:
            if lockfile is not None:
                fcntl.flock(lockfile, fcntl.LOCK_EX)

            self._run_command('mkdir -p %s' % fetch_dir)

            pkg_name = self.get_tarball_name(name, pkg_type)
            try:
                # Fetch the package into fetch_dir
                fetcher = self.fetch_pkg(pkg_name, fetch_dir, use_checksum=True,
                                         repo_url=repo_url, install=True)

                fetcher.install_pkg_post(pkg_name, fetch_dir, install_dir,
                                         preserve_install_dir)
            except error.PackageFetchError as why:
                raise error.PackageInstallError(
                    'Installation of %s(type:%s) failed : %s'
                    % (name, pkg_type, why))
        finally:
            if lockfile is not None:
                try:
                    fcntl.flock(lockfile, fcntl.LOCK_UN)
                finally:
                    # close the handle even if unlocking raised
                    lockfile.close()

    def fetch_pkg(self, pkg_name, dest_path, repo_url=None, use_checksum=False,
                  install=False):
        '''
        Fetch the package into dest_dir from repo_url. By default repo_url
        is None and the package is looked in all the repositories specified.
        Otherwise it fetches it from the specific repo_url.
        pkg_name     : name of the package (ex: test-sleeptest.tar.bz2,
                                            dep-gcc.tar.bz2, kernel.1-1.rpm)
        repo_url     : the URL of the repository where the package is located.
        dest_path    : complete path of where the package will be fetched to.
        use_checksum : This is set to False to fetch the packages.checksum file
                       so that the checksum comparison is bypassed for the
                       checksum file itself. This is used internally by the
                       packaging system. It should be ignored by externals
                       callers of this method who use it fetch custom packages.
        install      : install path has unique name and destination requirements
                       that vary based on the fetcher that is used.  So call them
                       here as opposed to install_pkg.
        '''
        try:
            self._run_command("ls %s" % os.path.dirname(dest_path))
        except (error.CmdError, error.AutoservRunError):
            raise error.PackageFetchError("Please provide a valid "
                                          "destination: %s " % dest_path)

        # See if the package was already fetched earlier, if so
        # the checksums need to be compared and the package is now
        # fetched only if they differ.
        pkg_exists = False
        try:
            self._run_command("ls %s" % dest_path)
            pkg_exists = True
        except (error.CmdError, error.AutoservRunError):
            pass

        # if a repository location is explicitly provided, fetch the package
        # from there and return
        if repo_url:
            repositories = [self.get_fetcher(repo_url)]
        elif self.repositories:
            repositories = self.repositories
        else:
            raise error.PackageFetchError("No repository urls specified")

        # install the package from the package repos, try the repos in
        # reverse order, assuming that the 'newest' repos are most desirable
        for fetcher in reversed(repositories):
            try:
                # Checksums are never used for git fetchers. Decide this per
                # fetcher: the original overwrote use_checksum, which switched
                # checksums off for every repository tried after a git one.
                check_sums = (use_checksum and
                              not isinstance(fetcher, GitFetcher))

                # different fetchers have different install requirements
                dest = fetcher.install_pkg_setup(pkg_name, dest_path,
                                                 install)[1]

                # Fetch the package if it is not there, the checksum does
                # not match, or checksums are disabled entirely
                need_to_fetch = (not check_sums or
                                 not pkg_exists or
                                 not self.compare_checksum(dest, fetcher.url))
                if need_to_fetch:
                    fetcher.fetch_pkg_file(pkg_name, dest)
                    # update checksum so we won't refetch next time.
                    if check_sums:
                        self.update_checksum(dest)
                return fetcher
            except (error.PackageFetchError, error.AutoservRunError):
                # The package could not be found in this repo, continue looking
                # NOTE(review): the excerpt is cut off after 'pkg_name,'; the
                # second argument is assumed to be fetcher.url -- confirm.
                logging.debug('%s could not be fetched from %s', pkg_name,
                              fetcher.url)

        # NOTE(review): the excerpt ends here. The rest of fetch_pkg (what
        # happens when no repository yields the package) and the remainder of
        # the class are not visible and must be restored from the full file.


# ...Learn to execute automation testing from scratch with LambdaTest Learning
# Hub. Right from setting up the prerequisites to run your first automation
# test, to following best practices and diving deeper into advanced test
# scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to
# help you be proficient with different test automation frameworks i.e.
# Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation testing minutes FREE!!
