Best Python code snippet using autotest_python
autotest_remote.py
Source:autotest_remote.py  
# NOTE: this excerpt begins mid-function. The lines below are the tail of a
# helper whose ``def`` line is not part of the excerpt; they are kept only as
# context and are not executable on their own.
#
#             os_dep.command(path)
#         except ValueError:
#             return False
#     return True


def _client_system_wide_install(host):
    """
    Check whether every path in SYSTEM_WIDE_PATHS is executable on the host.

    :param host: Host instance used to run ``test -x`` for each path.
    :return: True if all checks succeed, False as soon as one fails.
    """
    for path in SYSTEM_WIDE_PATHS:
        try:
            host.run('test -x %s' % utils.sh_escape(path))
        except Exception:
            # Any failure to run the check is treated as "not installed".
            return False
    return True


# For now, the only fully developed distro package is in Fedora/RHEL
_yum_install_cmd = 'yum -y install autotest-framework'
_yum_uninstall_cmd = 'yum -y remove autotest-framework'
INSTALL_CLIENT_CMD_MAPPING = {'Fedora': _yum_install_cmd,
                              'RHEL': _yum_install_cmd}
UNINSTALL_CLIENT_CMD_MAPPING = {'Fedora': _yum_uninstall_cmd,
                                'RHEL': _yum_uninstall_cmd}


class BaseAutotest(installable_object.InstallableObject):

    """
    This class represents the Autotest program.

    Autotest is used to run tests automatically and collect the results.
    It also supports profilers.

    Implementation details:
    This is a leaf class in an abstract class hierarchy, it must
    implement the unimplemented methods in parent classes.
    """

    # Class-wide default; changed through set_install_in_tmpdir().
    install_in_tmpdir = False

    def __init__(self, host=None):
        """
        :param host: Default Host instance used when a method is called
                without an explicit host.
        """
        self.host = host
        self.got = False
        self.installed = False
        self.serverdir = utils.get_server_dir()
        self.os_vendor = client_utils.get_os_vendor()
        self.server_system_wide_install = _server_system_wide_install()
        super(BaseAutotest, self).__init__()

    @classmethod
    def set_install_in_tmpdir(cls, flag):
        """
        Sets a flag that controls whether or not Autotest should by
        default be installed in a "standard" directory (e.g.
        /home/autotest) or a temporary directory.
        """
        cls.install_in_tmpdir = flag

    @classmethod
    def get_client_autodir_paths(cls, host):
        """
        Return the candidate client install directories from the settings.

        :param host: Unused here; kept as part of the method's interface.
        :return: The AUTOSERV/client_autodir_paths setting, as a list.
        """
        return settings.get_value('AUTOSERV', 'client_autodir_paths',
                                  type=list)

    @classmethod
    def get_installed_autodir(cls, host):
        """
        Find where the Autotest client is installed on the host.

        :return: an absolute path to an installed Autotest client root.
        :raise AutodirNotFoundError if no Autotest installation can be found.
        """
        autodir = host.get_autodir()
        if autodir:
            logging.debug('Using existing host autodir: %s', autodir)
            return autodir

        if not _server_system_wide_install():
            for path in Autotest.get_client_autodir_paths(host):
                autotest_binary = os.path.join(path, CLIENT_BINARY)
                try:
                    host.run('test -x %s' % utils.sh_escape(autotest_binary))
                    host.run('test -w %s' % utils.sh_escape(path))
                except error.AutoservRunError:
                    logging.debug('%s does not exist on %s', autotest_binary,
                                  host.hostname)
                    continue
                logging.debug('Found existing autodir at %s', path)
                return path
        else:
            for path in Autotest.get_client_autodir_paths(host):
                # A failed check must fall through to the next candidate;
                # without the guard the loop could never reach a second path
                # and leaked AutoservRunError instead of the documented
                # AutodirNotFoundError.
                try:
                    host.run('test -w %s' % utils.sh_escape(path))
                except error.AutoservRunError:
                    logging.debug('%s is not writable on %s', path,
                                  host.hostname)
                    continue
                logging.debug('Found existing autodir at %s', path)
                host.autodir = path
                return path

        raise AutodirNotFoundError

    @classmethod
    def get_install_dir(cls, host):
        """
        Determines the location where autotest should be installed on
        host. If cls.install_in_tmpdir is set, it will return a unique
        temporary directory that autotest can be installed in. Otherwise, looks
        for an existing installation to use; if none is found, looks for a
        usable directory in the global config client_autodir_paths.
        """
        try:
            install_dir = cls.get_installed_autodir(host)
        except AutodirNotFoundError:
            install_dir = cls._find_installable_dir(host)

        if cls.install_in_tmpdir:
            return host.get_tmp_dir(parent=install_dir)
        return install_dir

    @classmethod
    def _find_installable_dir(cls, host):
        """
        Return the first configured path that can be created and written to.

        :raise error.AutoservInstallError: If no candidate path is usable.
        """
        client_autodir_paths = cls.get_client_autodir_paths(host)
        for path in client_autodir_paths:
            try:
                host.run('mkdir -p %s' % utils.sh_escape(path))
                host.run('test -w %s' % utils.sh_escape(path))
                return path
            except error.AutoservRunError:
                logging.debug('Failed to create %s', path)
        raise error.AutoservInstallError(
            'Unable to find a place to install Autotest; tried %s' %
            ', '.join(client_autodir_paths))

    def _create_test_output_dir(self, host, autodir):
        """
        Create the test output directory on the host.

        The directory is the COMMON/test_output_dir setting, defaulting to
        ``<autodir>/tmp``.
        """
        tmpdir = os.path.join(autodir, 'tmp')
        state_autodir = settings.get_value('COMMON', 'test_output_dir',
                                           default=tmpdir)
        host.run('mkdir -p %s' % utils.sh_escape(state_autodir))

    def get_fetch_location(self):
        """
        Return the PACKAGES/fetch_location repositories in reverse order.

        :return: A new list; the list obtained from the settings is not
                modified.
        """
        repos = settings.get_value("PACKAGES", 'fetch_location', type=list,
                                   default=[])
        # Build a reversed copy rather than reversing in place, so the object
        # handed out by the settings layer is left untouched.
        return repos[::-1]

    def install(self, host=None, autodir=None):
        """Install autotest using every available install mode."""
        self._install(host=host, autodir=autodir)

    def install_full_client(self, host=None, autodir=None):
        """Install autotest without the autoserv or packaging modes."""
        self._install(host=host, autodir=autodir, use_autoserv=False,
                      use_packaging=False)

    def install_no_autoserv(self, host=None, autodir=None):
        """Install autotest without the autoserv-dependent modes."""
        self._install(host=host, autodir=autodir, use_autoserv=False)

    def _install_using_packaging(self, host, autodir):
        """
        Install the client from the configured package repositories.

        :raise error.PackageInstallError: If no repository is configured.
        """
        repos = self.get_fetch_location()
        if not repos:
            raise error.PackageInstallError("No repos to install an "
                                            "autotest client from")
        pkgmgr = packages.PackageManager(autodir, hostname=host.hostname,
                                         repo_urls=repos,
                                         do_locking=False,
                                         run_function=host.run,
                                         run_function_dargs=dict(timeout=600))
        # The packages dir is used to store all the packages that
        # are fetched on that client. (for the tests,deps etc.
        # too apart from the client)
        pkg_dir = os.path.join(autodir, 'packages')
        # clean up the autodir except for the packages directory
        # (autodir is escaped like every other path sent to the shell here)
        host.run('cd %s && ls | grep -v "^packages$"'
                 ' | xargs rm -rf && rm -rf .[^.]*' %
                 utils.sh_escape(autodir))
        pkgmgr.install_pkg('autotest', 'client', pkg_dir, autodir,
                           preserve_install_dir=True)
        self._create_test_output_dir(host, autodir)
        logging.info("Installation of autotest completed")
        self.installed = True

    def _install_using_send_file(self, host, autodir):
        """
        Copy a "light" client to the host.

        Everything under self.source_material is sent except the tests,
        site_tests, deps and profilers directories, which are recreated on
        the host as (nearly) empty packages.
        """
        dirs_to_exclude = set(["tests", "site_tests", "deps", "profilers"])
        light_files = [os.path.join(self.source_material, f)
                       for f in os.listdir(self.source_material)
                       if f not in dirs_to_exclude]

        # there should be one and only one grubby tarball
        grubby_glob = os.path.join(self.source_material,
                                   "deps/grubby/grubby-*.tar.bz2")
        grubby_tarball_paths = glob.glob(grubby_glob)
        if grubby_tarball_paths:
            grubby_tarball_path = grubby_tarball_paths[0]
            if os.path.exists(grubby_tarball_path):
                light_files.append(grubby_tarball_path)

        host.send_file(light_files, autodir, delete_dest=True)

        profilers_autodir = os.path.join(autodir, 'profilers')
        profilers_init = os.path.join(self.source_material, 'profilers',
                                      '__init__.py')
        host.run("mkdir -p %s" % utils.sh_escape(profilers_autodir))
        host.send_file(profilers_init, profilers_autodir, delete_dest=True)
        dirs_to_exclude.discard("profilers")

        # create empty dirs for all the stuff we excluded
        commands = []
        for path in dirs_to_exclude:
            abs_path = os.path.join(autodir, path)
            abs_path = utils.sh_escape(abs_path)
            commands.append("mkdir -p '%s'" % abs_path)
            commands.append("touch '%s'/__init__.py" % abs_path)
        host.run(';'.join(commands))

    def _install(self, host=None, autodir=None, use_autoserv=True,
                 use_packaging=True):
        """
        Install autotest.

        :param host: A Host instance on which autotest will be installed
        :param autodir: Location on the remote host to install to
        :param use_autoserv: Enable install modes that depend on the client
            running with the autoserv harness
        :param use_packaging: Enable install modes that use the packaging
            system
        :raise error.AutoservError: If it wasn't possible to install the
            client after trying all available methods
        """
        if not host:
            host = self.host
        if not self.got:
            self.get()
        host.wait_up(timeout=30)
        host.setup()
        logging.info("Installing autotest on %s", host.hostname)

        if self.server_system_wide_install:
            msg_install = ("Autotest seems to be installed in the "
                           "client on a system wide location, proceeding...")
            logging.info("Verifying client package install")
            if _client_system_wide_install(host):
                logging.info(msg_install)
                self.installed = True
                return

            install_cmd = INSTALL_CLIENT_CMD_MAPPING.get(self.os_vendor, None)
            if install_cmd is not None:
                # The package is NOT there yet at this point, so say what is
                # really happening instead of claiming it is installed.
                logging.info("Autotest client package not found on %s, "
                             "installing it with: %s",
                             host.hostname, install_cmd)
                host.run(install_cmd)
                if _client_system_wide_install(host):
                    logging.info(msg_install)
                    self.installed = True
                    return

            raise error.AutoservError("The autotest client package "
                                      "does not seem to be installed "
                                      "on %s" % host.hostname)

        # set up the autotest directory on the remote machine
        if not autodir:
            autodir = self.get_install_dir(host)
        logging.info('Using installation dir %s', autodir)
        host.set_autodir(autodir)
        host.run('mkdir -p %s' % utils.sh_escape(autodir))

        # make sure there are no files in $AUTODIR/results
        results_path = os.path.join(autodir, 'results')
        host.run('rm -rf %s/*' % utils.sh_escape(results_path),
                 ignore_status=True)

        # Fetch the autotest client from the nearest repository
        if use_packaging:
            try:
                self._install_using_packaging(host, autodir)
                # _install_using_packaging() above already performs these
                # steps; they are repeated here unchanged (mkdir -p is
                # harmless to run twice).
                self._create_test_output_dir(host, autodir)
                logging.info("Installation of autotest completed")
                self.installed = True
                return
            except (error.PackageInstallError, error.AutoservRunError,
                    SettingsError) as e:
                logging.info("Could not install autotest using the packaging "
                             "system: %s. Trying other methods", e)

        # try to install from file or directory
        if self.source_material:
            supports_autoserv_packaging = settings.get_value(
                "PACKAGES", "serve_packages_from_autoserv", type=bool)
            # Copy autotest recursively
            if supports_autoserv_packaging and use_autoserv:
                self._install_using_send_file(host, autodir)
            else:
                host.send_file(self.source_material, autodir,
                               delete_dest=True)
            self._create_test_output_dir(host, autodir)
            logging.info("Installation of autotest completed")
            self.installed = True
            return

        # host.hostname is the attribute used everywhere else in this file.
        raise error.AutoservError('Could not install autotest on '
                                  'target machine: %s' % host.hostname)

    def uninstall(self, host=None):
        """
        Uninstall (i.e. delete) autotest. Removes the autotest client install
        from the specified host.

        :param host: a Host instance from which the client will be removed;
                defaults to the host given at construction time.
        """
        if not self.installed:
            return
        # Resolve the default host first: the system-wide branch below also
        # needs it, and previously called host.run() on None.
        if not host:
            host = self.host

        if self.server_system_wide_install:
            uninstall_cmd = UNINSTALL_CLIENT_CMD_MAPPING.get(self.os_vendor,
                                                             None)
            if uninstall_cmd is not None:
                logging.info("Trying to uninstall autotest using distro "
                             "provided package manager")
                host.run(uninstall_cmd)
                # The package is gone, so a later run() must reinstall.
                self.installed = False
            return

        autodir = host.get_autodir()
        if not autodir:
            return

        # perform the actual uninstall
        host.run("rm -rf %s" % utils.sh_escape(autodir), ignore_status=True)
        host.set_autodir(None)
        self.installed = False

    def get(self, location=None):
        """
        Fetch the client source, defaulting to ``<serverdir>/../client``.

        :param location: Where to get the client from; passed on to the
                parent class' get().
        """
        if not location:
            location = os.path.join(self.serverdir, '../client')
            location = os.path.abspath(location)
        if not self.server_system_wide_install:
            # If there's stuff run on our client directory already, it
            # can cause problems. Try giving it a quick clean first.
            cwd = os.getcwd()
            os.chdir(location)
            try:
                utils.system('tools/make_clean', ignore_status=True)
            finally:
                os.chdir(cwd)
        super(BaseAutotest, self).get(location)
        self.got = True

    def run(self, control_file, results_dir='.', host=None, timeout=None,
            tag=None, parallel_flag=False, background=False,
            client_disconnect_timeout=None):
        """
        Run an autotest job on the remote machine.

        :param control_file: An open file-like-obj of the control file.
        :param results_dir: A str path where the results should be stored
                on the local filesystem.
        :param host: A Host instance on which the control file should
                be run.
        :param timeout: Maximum number of seconds to wait for the run or None.
        :param tag: Tag name for the client side instance of autotest.
        :param parallel_flag: Flag set when multiple jobs are run at the
                same time.
        :param background: Indicates that the client should be launched as
                a background job; the code calling run will be responsible
                for monitoring the client and collecting the results.
        :param client_disconnect_timeout: Seconds to wait for the remote host
                to come back after a reboot. Defaults to the host setting for
                DEFAULT_REBOOT_TIMEOUT.

        :raise AutotestRunError: If there is a problem executing
                the control file.
        """
        host = self._get_host_and_setup(host)
        results_dir = os.path.abspath(results_dir)

        if client_disconnect_timeout is None:
            client_disconnect_timeout = host.DEFAULT_REBOOT_TIMEOUT

        if tag:
            results_dir = os.path.join(results_dir, tag)

        atrun = _Run(host, results_dir, tag, parallel_flag, background)
        self._do_run(control_file, results_dir, host, atrun, timeout,
                     client_disconnect_timeout)

    def _get_host_and_setup(self, host):
        """
        Resolve the host to use, install autotest if needed and wait for it.

        :return: The given host, or self.host when none was given.
        """
        if not host:
            host = self.host
        if not self.installed:
            self.install(host)

        host.wait_up(timeout=30)
        return host

    def _do_run(self, control_file, results_dir, host, atrun, timeout,
                client_disconnect_timeout):
        """
        Prepare the control file on the host and execute it through atrun.

        A prologue (user args, package repositories, server-side profilers)
        is prepended to the control file before it is sent to the host.
        """
        try:
            atrun.verify_machine()
        except Exception:
            logging.error("Verify failed on %s. Reinstalling autotest",
                          host.hostname)
            self.install(host)
        # Verified again unconditionally, so a failed reinstall raises here.
        atrun.verify_machine()

        debug = os.path.join(results_dir, 'debug')
        try:
            os.makedirs(debug)
        except OSError as e:
            # Best effort, as before: an already existing directory is the
            # expected case; anything else is only logged.
            if not os.path.isdir(debug):
                logging.debug("Could not create %s: %s", debug, e)

        delete_file_list = [atrun.remote_control_file,
                            atrun.remote_control_state,
                            atrun.manual_control_file,
                            atrun.manual_control_state]
        cmd = ';'.join('rm -f ' + control for control in delete_file_list)
        host.run(cmd, ignore_status=True)

        tmppath = utils.get(control_file)

        # build up the initialization prologue for the control file
        prologue_lines = []

        # Add the additional user arguments
        prologue_lines.append("args = %r\n" % self.job.args)

        # If the packaging system is being used, add the repository list.
        repos = None
        try:
            repos = self.get_fetch_location()
            # NOTE(review): the manager object itself is never used; it is
            # presumably constructed only so that a bad configuration raises
            # SettingsError here -- confirm before removing.
            packages.PackageManager('autotest', hostname=host.hostname,
                                    repo_urls=repos)
            prologue_lines.append('job.add_repository(%s)\n' % repos)
        except SettingsError as e:
            # If repos is defined packaging is enabled so log the error
            if repos:
                logging.error(e)

        # on full-size installs, turn on any profilers the server is using
        if not atrun.background:
            # .items() works on both Python 2 and 3 (iteritems() is 2-only).
            running_profilers = host.job.profilers.add_log.items()
            for profiler, (args, dargs) in running_profilers:
                call_args = [repr(profiler)]
                call_args += [repr(arg) for arg in args]
                call_args += ["%s=%r" % item for item in dargs.items()]
                prologue_lines.append("job.profilers.add(%s)\n"
                                      % ", ".join(call_args))
        cfile = "".join(prologue_lines)

        # Context managers make sure both handles are closed (and the new
        # content flushed) before the file is sent to the host.
        with open(tmppath) as control_in:
            cfile += control_in.read()
        with open(tmppath, "w") as control_out:
            control_out.write(cfile)

        # Create and copy state file to remote_control_file + '.state'
        state_file = host.job.preprocess_client_state()
        host.send_file(state_file, atrun.remote_control_init_state)
        os.remove(state_file)

        # Copy control_file to remote_control_file on the host
        host.send_file(tmppath, atrun.remote_control_file)
        if os.path.abspath(tmppath) != os.path.abspath(control_file):
            os.remove(tmppath)

        atrun.execute_control(
            timeout=timeout,
            client_disconnect_timeout=client_disconnect_timeout)

    def run_timed_test(self, test_name, results_dir='.', host=None,
                       timeout=None, *args, **dargs):
        """
        Assemble a tiny little control file to just run one test,
        and run it as an autotest client-side test

        :param test_name: Name of the test given to job.run_test().
        :param timeout: Maximum number of seconds to wait for the run or None.
        :param args: Extra positional arguments for job.run_test().
        :param dargs: Extra keyword arguments for job.run_test().
        """
        if not host:
            host = self.host
        if not self.installed:
            self.install(host)
        opts = ["%s=%s" % (o[0], repr(o[1])) for o in dargs.items()]
        # A real list is required here: on Python 3 map() returns an iterator
        # that cannot be concatenated with a list.
        cmd = ", ".join([repr(test_name)] + [repr(a) for a in args] + opts)
        control = "job.run_test(%s)\n" % cmd
        self.run(control, results_dir, host, timeout=timeout)

    def run_test(self, test_name, results_dir='.', host=None, *args, **dargs):
        """
        Run a single client-side test with no timeout.

        Same as run_timed_test() with timeout=None.
        """
        # timeout must be passed positionally: with ``timeout=None`` as a
        # keyword, the first element of *args also landed in the ``timeout``
        # slot and the call failed with "multiple values" TypeError.
        self.run_timed_test(test_name, results_dir, host, None,
                            *args, **dargs)


class _BaseRun(object):

    """
    Represents a run of autotest control file.  This class maintains
    all the state necessary as an autotest control file is executed.

    It is not intended to be used directly, rather control files
    should be run using the run method in Autotest.
    """

    def __init__(self, host, results_dir, tag, parallel_flag, background):
        """
        Compute the control and state file locations used by this run.

        :param host: Host instance the control file runs on.
        :param results_dir: Local directory where results are stored.
        :param tag: Optional tag appended to the control file name.
        :param parallel_flag: Flag set when multiple jobs are run at the
                same time.
        :param background: Whether the client is launched as a background
                job.
        """
        self.host = host
        self.results_dir = results_dir
        self.env = host.env
        self.tag = tag
        self.parallel_flag = parallel_flag
        self.background = background
        self.server_system_wide_install = _server_system_wide_install()

        self.autodir = Autotest.get_installed_autodir(self.host)

        tmpdir = os.path.join(self.autodir, 'tmp')
        state_dir = settings.get_value('COMMON', 'test_output_dir',
                                       default=tmpdir)

        # With a system wide install the control file lives in the state
        # directory, otherwise directly in the autodir.
        if self.server_system_wide_install:
            control = os.path.join(state_dir, 'control')
        else:
            control = os.path.join(self.autodir, 'control')

        if tag:
            control += '.' + tag

        control_name = os.path.basename(control)

        self.manual_control_file = control
        self.manual_control_init_state = os.path.join(
            state_dir, control_name + ".init.state")
        self.manual_control_state = os.path.join(
            state_dir, control_name + ".state")

        self.remote_control_file = control + '.autoserv'
        self.remote_control_init_state = os.path.join(
            state_dir, control_name + ".autoserv.init.state")
        self.remote_control_state = os.path.join(
            state_dir, control_name + ".autoserv.state")

        logging.debug("Remote control file: %s", self.remote_control_file)
        logging.debug("Remote control init state: %s",
                      self.remote_control_init_state)
        logging.debug("Remote control state: %s", self.remote_control_state)

        self.config_file = os.path.join(self.autodir, 'global_config.ini')

    def _verify_machine_system_wide(self):
        """
        :raise error.AutoservInstallError: If the system wide client is not
                found on the host.
        """
        if not _client_system_wide_install(self.host):
            raise error.AutoservInstallError("Autotest does not appear "
                                             "to be installed")

    def _verify_machine_local(self):
        """
        :raise error.AutoservInstallError: If the client binary under
                self.autodir is not executable on the host.
        """
        binary = os.path.join(self.autodir, CLIENT_BINARY)
        try:
            # Escaped like every other path sent to the shell in this file.
            self.host.run('test -x %s' % utils.sh_escape(binary))
        except Exception:
            raise error.AutoservInstallError("Autotest does not appear "
                                             "to be installed")

    def verify_machine(self):
        """Check that the autotest client is present on the host."""
        if self.server_system_wide_install:
            self._verify_machine_system_wide()
        else:
            self._verify_machine_local()

# ... the remainder of autotest_remote.py is not included in this excerpt.
# The next excerpt is from auto_remote.py.
Source:auto_remote.py  
...54        if self.server_system_wide_install:55            msg_install = ("Caliper seems to be installed in the client on a system wide"56                           " location, proceeding...")57            logging.info("Verifying client package install")58#            if _client_system_wide_install(host):59#                logging.info(msg_install)60#                self.installed = True61#                return62#63#            install_cmd = INSATLL_CLIENT_CMD_MAPPING.get(self.os_vendor, None)64#            if install_cmd is not None:65#                logging.info(msg_install)66#                host.run(install_cmd)67#                if _client_system_wide_install(host):68#                    logging.info("Caliper seems to be installed in the client on a system wide location")69#                    self.installed = True70#                    return71#            raise error.ServError("The caliper client package does not seem to be installed on %s"72#                                    % host.hostname)73#        # set up the caliper directory on the remote machine74#        if not autodir:75#            autodir = self.get_install_dir(host)76#        logging.info('Using installation dir %s', autodir)77#        host.set_autodir(sutodir)78#        host.run('mkdir -p %s' % utils.sh_escape(autodir))79#       80#        # make sure there are no files in $AUTODIR/results81#        results_path = os.path.join(autodir, 'results')...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
