Best Python code snippet using fMBT_python
ior_test_base.py
Source:ior_test_base.py  
1#!/usr/bin/python2"""3(C) Copyright 2018-2021 Intel Corporation.4SPDX-License-Identifier: BSD-2-Clause-Patent5"""6import os7import threading8from ClusterShell.NodeSet import NodeSet9from dfuse_test_base import DfuseTestBase10from ior_utils import IorCommand11from command_utils_base import CommandFailure12from job_manager_utils import Mpirun13from general_utils import pcmd14from daos_utils import DaosCommand15from mpio_utils import MpioUtils16from test_utils_pool import TestPool17from test_utils_container import TestContainer18class IorTestBase(DfuseTestBase):19    # pylint: disable=too-many-ancestors20    """Base IOR test class.21    :avocado: recursive22    """23    IOR_WRITE_PATTERN = "Commencing write performance test"24    IOR_READ_PATTERN = "Commencing read performance test"25    def __init__(self, *args, **kwargs):26        """Initialize a IorTestBase object."""27        super().__init__(*args, **kwargs)28        self.ior_cmd = None29        self.processes = None30        self.hostfile_clients_slots = None31        self.container = None32    def setUp(self):33        """Set up each test case."""34        # obtain separate logs35        self.update_log_file_names()36        # Start the servers and agents37        super().setUp()38        # Get the parameters for IOR39        self.ior_cmd = IorCommand()40        self.ior_cmd.get_params(self)41        self.processes = self.params.get("np", '/run/ior/client_processes/*')42        self.subprocess = self.params.get("subprocess", '/run/ior/*', False)43    def create_pool(self):44        """Create a TestPool object to use with ior."""45        # Get the pool params46        self.pool = TestPool(self.context, self.get_dmg_command())47        self.pool.get_params(self)48        # Create a pool49        self.pool.create()50    def create_cont(self):51        """Create a TestContainer object to be used to create container.52        """53        # Get container params54        self.container = TestContainer(55            self.pool, daos_command=DaosCommand(self.bin))56        self.container.get_params(self)57        # create container58        self.container.create()59    def display_pool_space(self, pool=None):60        """Display the current pool space.61        If the TestPool object has a DmgCommand object assigned, also display62        the free pool space per target.63        Args:64            pool (TestPool, optional): The pool for which to display space.65                    Default is self.pool.66        """67        if not pool:68            pool = self.pool69        pool.display_pool_daos_space()70        if pool.dmg:71            pool.set_query_data()72    def run_ior_with_pool(self, intercept=None, test_file_suffix="",73                          test_file="daos:testFile", create_pool=True,74                          create_cont=True, stop_dfuse=True, plugin_path=None,75                          timeout=None, fail_on_warning=False,76                          mount_dir=None):77        # pylint: disable=too-many-arguments78        """Execute ior with optional overrides for ior flags and object_class.79        If specified the ior flags and ior daos object class parameters will80        override the values read from the yaml file.81        Args:82            intercept (str, optional): path to the interception library. Shall83                    be used only for POSIX through DFUSE. Defaults to None.84            test_file_suffix (str, optional): suffix to add to the end of the85                test file name. Defaults to "".86            test_file (str, optional): ior test file name. Defaults to87                "daos:testFile". Is ignored when using POSIX through DFUSE.88            create_pool (bool, optional): If it is true, create pool and89                container else just run the ior. Defaults to True.90            create_cont (bool, optional): Create new container. Default is True91            stop_dfuse (bool, optional): Stop dfuse after ior command is92                finished. Default is True.93            plugin_path (str, optional): HDF5 vol connector library path.94                This will enable dfuse (xattr) working directory which is95                needed to run vol connector for DAOS. Default is None.96            timeout (int, optional): command timeout. Defaults to None.97            fail_on_warning (bool, optional): Controls whether the test98                should fail if a 'WARNING' is found. Default is False.99            mount_dir (str, optional): Create specific mount point100        Returns:101            CmdResult: result of the ior command execution102        """103        if create_pool:104            self.update_ior_cmd_with_pool(create_cont)105        # start dfuse if api is POSIX or HDF5 with vol connector106        if self.ior_cmd.api.value == "POSIX" or plugin_path:107            # Connect to the pool, create container and then start dfuse108            if not self.dfuse:109                self.start_dfuse(110                    self.hostlist_clients, self.pool, self.container, mount_dir)111        # setup test file for POSIX or HDF5 with vol connector112        if self.ior_cmd.api.value == "POSIX" or plugin_path:113            test_file = os.path.join(self.dfuse.mount_dir.value, "testfile")114        elif self.ior_cmd.api.value == "DFS":115            test_file = os.path.join("/", "testfile")116        self.ior_cmd.test_file.update("".join([test_file, test_file_suffix]))117        job_manager = self.get_ior_job_manager_command()118        job_manager.timeout = timeout119        try:120            out = self.run_ior(job_manager, self.processes,121                               intercept, plugin_path=plugin_path,122                               fail_on_warning=fail_on_warning)123        finally:124            if stop_dfuse:125                self.stop_dfuse()126        return out127    def update_ior_cmd_with_pool(self, create_cont=True):128        """Update ior_cmd with pool.129        Args:130          create_cont (bool, optional): create a container. Defaults to True.131        """132        # Create a pool if one does not already exist133        if self.pool is None:134            self.create_pool()135        # Create a container, if needed.136        # Don't pass uuid and pool handle to IOR.137        # It will not enable checksum feature138        if create_cont:139            self.pool.connect()140            self.create_cont()141        # Update IOR params with the pool and container params142        self.ior_cmd.set_daos_params(self.server_group, self.pool,143                                     self.container.uuid)144    def get_ior_job_manager_command(self, custom_ior_cmd=None):145        """Get the MPI job manager command for IOR.146        Args:147            custom_ior_cmd (IorCommand): Custom IorCommand instance to create148            job_manager with.149        Returns:150            str: the path for the mpi job manager command151        """152        # Initialize MpioUtils if IOR is running in MPIIO or DFS mode153        if self.ior_cmd.api.value in ["MPIIO", "POSIX", "DFS", "HDF5"]:154            mpio_util = MpioUtils()155            if mpio_util.mpich_installed(self.hostlist_clients) is False:156                self.fail("Exiting Test: Mpich not installed")157        else:158            self.fail("Unsupported IOR API")159        if custom_ior_cmd:160            self.job_manager = Mpirun(custom_ior_cmd, self.subprocess, "mpich")161        else:162            self.job_manager = Mpirun(self.ior_cmd, self.subprocess, "mpich")163        return self.job_manager164    def check_subprocess_status(self, operation="write"):165        """Check subprocess status."""166        if operation == "write":167            self.ior_cmd.pattern = self.IOR_WRITE_PATTERN168        elif operation == "read":169            self.ior_cmd.pattern = self.IOR_READ_PATTERN170        else:171            self.fail("Exiting Test: Inappropriate operation type \172                      for subprocess status check")173        if not self.ior_cmd.check_ior_subprocess_status(174                self.job_manager.process, self.ior_cmd):175            self.fail("Exiting Test: Subprocess not running")176    def run_ior(self, manager, processes, intercept=None, display_space=True,177                plugin_path=None, fail_on_warning=False, pool=None):178        """Run the IOR command.179        Args:180            manager (str): mpi job manager command181            processes (int): number of host processes182            intercept (str, optional): path to interception library.183            display_space (bool, optional): Whether to display the pool184                space. Defaults to True.185            plugin_path (str, optional): HDF5 vol connector library path.186                This will enable dfuse (xattr) working directory which is187                needed to run vol connector for DAOS. Default is None.188            fail_on_warning (bool, optional): Controls whether the test189                should fail if a 'WARNING' is found. Default is False.190            pool (TestPool, optional): The pool for which to display space.191                Default is self.pool.192        """193        env = self.ior_cmd.get_default_env(str(manager), self.client_log)194        if intercept:195            env["LD_PRELOAD"] = intercept196        if plugin_path:197            env["HDF5_VOL_CONNECTOR"] = "daos"198            env["HDF5_PLUGIN_PATH"] = str(plugin_path)199            manager.working_dir.value = self.dfuse.mount_dir.value200        manager.assign_hosts(201            self.hostlist_clients, self.workdir, self.hostfile_clients_slots)202        manager.assign_processes(processes)203        manager.assign_environment(env)204        if not pool:205            pool = self.pool206        try:207            if display_space:208                self.display_pool_space(pool)209            out = manager.run()210            if self.subprocess:211                return out212            if fail_on_warning:213                report_warning = self.fail214            else:215                report_warning = self.log.warning216            for line in out.stdout_text.splitlines():217                if 'WARNING' in line:218                    report_warning("IOR command issued warnings.\n")219            return out220        except CommandFailure as error:221            self.log.error("IOR Failed: %s", str(error))222            self.fail("Test was expected to pass but it failed.\n")223        finally:224            if not self.subprocess and display_space:225                self.display_pool_space(pool)226    def stop_ior(self):227        """Stop IOR process.228        Args:229            manager (str): mpi job manager command230        """231        self.log.info("<IOR> Stopping in-progress IOR command: %s",232                      str(self.job_manager))233        try:234            out = self.job_manager.stop()235            return out236        except CommandFailure as error:237            self.log.error("IOR stop Failed: %s", str(error))238            self.fail("Test was expected to pass but it failed.\n")239        finally:240            self.display_pool_space()241    def run_ior_threads_il(self, results, intercept, with_clients,242                           without_clients):243        """Execute 2 IOR threads in parallel. One thread with interception244        library (IL) and one without.245        Args:246            results (dict): Dictionary to store the IOR results that gets247                printed in the IOR output.248            intercept (str): Path to the interception library. Shall be used249                only for POSIX through DFUSE.250            with_clients (list): List of clients that use IL.251            without_clients (list): List of clients that doesn't use IL.252        """253        # We can't use the shared self.ior_cmd, so we need to create the254        # IorCommand object for each thread.255        ior_cmd1 = IorCommand()256        ior_cmd1.get_params(self)257        # Update IOR params with the pool and container params258        ior_cmd1.set_daos_params(259            self.server_group, self.pool, self.container.uuid)260        ior_cmd2 = IorCommand()261        ior_cmd2.get_params(self)262        ior_cmd2.set_daos_params(263            self.server_group, self.pool, self.container.uuid)264        # start dfuse for POSIX api. This is specific to interception library265        # test requirements.266        self.start_dfuse(self.hostlist_clients, self.pool, self.container)267        # Create two threads and run in parallel.268        thread1 = self.create_ior_thread(269            ior_cmd1, with_clients, 1, results, intercept)270        thread2 = self.create_ior_thread(271            ior_cmd2, without_clients, 2, results, None)272        thread1.start()273        thread2.start()274        thread1.join()275        thread2.join()276        self.stop_dfuse()277    def create_ior_thread(self, ior_command, clients, job_num, results,278                          intercept=None):279        """Create a new thread for ior run.280        Args:281            ior_command (IorCommand): IOR command instance.282            clients (list): hosts on which to run ior283            job_num (int): Assigned job number284            results (dict): A dictionary object to store the ior metrics285            intercept (path): Path to interception library286        """287        job = threading.Thread(288            target=self.run_custom_ior_cmd,289            args=[ior_command, clients, results, job_num, intercept])290        return job291    def run_custom_ior_cmd(self, ior_command, clients, results, job_num,292                           intercept=None):293        """Run customized IOR command, not self.ior_cmd.294        Expected to be used with a threaded code where multiple IOR commands are295        executed in parallel.296        Display pool space before running it for a reference.297        Args:298            ior_command (IorCommand): Custom IOR command instance.299            clients (list): hosts on which to run ior300            results (dict): A dictionary object to store the ior metrics301            job_num (int): Assigned job number302            intercept (str, optional): path to interception library. Defaults to303                None.304        """305        self.log.info("--- IOR Thread %d: Start ---", job_num)306        tsize = ior_command.transfer_size.value307        testfile = os.path.join(308            self.dfuse.mount_dir.value, "testfile{}{}".format(tsize, job_num))309        if intercept:310            testfile += "intercept"311        ior_command.test_file.update(testfile)312        # Get the custom job manager that's associated with this thread.313        manager = self.get_ior_job_manager_command(custom_ior_cmd=ior_command)314        procs = (self.processes // len(self.hostlist_clients)) * len(clients)315        env = ior_command.get_default_env(str(manager), self.client_log)316        if intercept:317            env["LD_PRELOAD"] = intercept318        manager.assign_hosts(319            clients, self.workdir, self.hostfile_clients_slots)320        manager.assign_processes(procs)321        manager.assign_environment(env)322        self.display_pool_space()323        self.log.info("--- IOR Thread %d: Starting IOR ---", job_num)324        try:325            ior_output = manager.run()326            results[job_num] = IorCommand.get_ior_metrics(ior_output)327        except CommandFailure as error:328            self.log.error("IOR Failed: %s", str(error))329            self.fail("IOR thread failed!")330        finally:331            self.display_pool_space()332        self.log.info("--- IOR Thread %d: End ---", job_num)333    def verify_pool_size(self, original_pool_info, processes):334        """Validate the pool size.335        Args:336            original_pool_info (PoolInfo): Pool info prior to IOR337            processes (int): number of processes338        """339        # Get the current pool size for comparison340        current_pool_info = self.pool.pool.pool_query()341        # If Transfer size is < 4K, Pool size will verified against NVMe, else342        # it will be checked against SCM343        if self.ior_cmd.transfer_size.value >= 4096:344            self.log.info(345                "Size is > 4K,Size verification will be done with NVMe size")346            storage_index = 1347        else:348            self.log.info(349                "Size is < 4K,Size verification will be done with SCM size")350            storage_index = 0351        actual_pool_size = \352            original_pool_info.pi_space.ps_space.s_free[storage_index] - \353            current_pool_info.pi_space.ps_space.s_free[storage_index]354        expected_pool_size = self.ior_cmd.get_aggregate_total(processes)355        if actual_pool_size < expected_pool_size:356            self.fail(357                "Pool Free Size did not match: actual={}, expected={}".format(358                    actual_pool_size, expected_pool_size))359    def execute_cmd(self, command, fail_on_err=True, display_output=True):360        """Execute cmd using general_utils.pcmd.361        Args:362            command (str): the command to execute on the client hosts363            fail_on_err (bool, optional): whether or not to fail the test if364                command returns a non zero return code. Defaults to True.365            display_output (bool, optional): whether or not to display output.366                Defaults to True.367        Returns:368            dict: a dictionary of return codes keys and accompanying NodeSet369                values indicating which hosts yielded the return code.370        """371        try:372            # Execute the bash command on each client host373            result = self._execute_command(command, fail_on_err, display_output)374        except CommandFailure as error:375            # Report an error if any command fails376            self.log.error("DfuseSparseFile Test Failed: %s", str(error))377            self.fail("Test was expected to pass but it failed.\n")378        return result379    def _execute_command(self, command, fail_on_err=True, display_output=True):380        """Execute the command on all client hosts.381        Optionally verify if the command returns a non zero return code.382        Args:383            command (str): the command to execute on the client hosts384            fail_on_err (bool, optional): whether or not to fail the test if385                command returns a non zero return code. Defaults to True.386            display_output (bool, optional): whether or not to display output.387                Defaults to True.388        Raises:389            CommandFailure: if 'fail_on_err' is set and the command fails on at390                least one of the client hosts391        Returns:392            dict: a dictionary of return codes keys and accompanying NodeSet393                values indicating which hosts yielded the return code.394        """395        result = pcmd(396            self.hostlist_clients, command, verbose=display_output, timeout=300)397        if 0 not in result and fail_on_err:398            hosts = [str(399                nodes) for code, nodes in list(400                    result.items()) if code != 0]401            raise CommandFailure(402                "Error running '{}' on the following hosts: {}".format(403                    command, NodeSet(",".join(hosts))))...ior_small.py
Source:ior_small.py  
1#!/usr/bin/python2"""3(C) Copyright 2018-2021 Intel Corporation.4SPDX-License-Identifier: BSD-2-Clause-Patent5"""6import os7from ior_test_base import IorTestBase8from avocado.core.exceptions import TestFail9from general_utils import get_random_string10class IorSmall(IorTestBase):11    # pylint: disable=too-many-ancestors12    # pylint: disable=too-few-public-methods13    """Test class Description: Runs IOR with 1 server with basic parameters.14    :avocado: recursive15    """16    def test_ior_small(self):17        """Jira ID: DAOS-2715, DAOS-3657, DAOS-4909.18        Test Description:19            Purpose of this test is to have small ior test to check basic20            functionality for DFS, MPIIO and HDF5 api21        Use case:22            Run ior with read, write, CheckWrite, CheckRead in ssf mode.23            Run ior with read, write, CheckWrite, CheckRead in fpp mode.24            Run ior with read, write, CheckWrite and access to random25                offset instead of sequential.26            All above three cases to be run with single client and27                multiple client processes in two separate nodes.28        :avocado: tags=all,pr,daily_regression,hw,large,daosio,iorsmall29        :avocado: tags=DAOS_561030        """31        results = []32        cncl_tickets = []33        dfuse_mount_dir = None34        ior_timeout = self.params.get("ior_timeout", '/run/ior/*')35        flags = self.params.get("ior_flags", '/run/ior/iorflags/*')36        apis = self.params.get("ior_api", '/run/ior/iorflags/*')37        mount_dir = self.params.get("mount_dir", "/run/dfuse/*")38        transfer_block_size = self.params.get("transfer_block_size",39                                              '/run/ior/iorflags/*')40        obj_class = self.params.get("obj_class", '/run/ior/iorflags/*')41        for oclass in obj_class:42            self.ior_cmd.dfs_oclass.update(oclass)43            for api in apis:44                if api == "HDF5-VOL":45                    self.ior_cmd.api.update("HDF5")46                    hdf5_plugin_path = self.params.get(47                        "plugin_path", '/run/hdf5_vol/*')48                    flags_w_k = " ".join([flags[0]] + ["-k"])49                    self.ior_cmd.flags.update(flags_w_k, "ior.flags")50                else:51                    # run tests for different variants52                    self.ior_cmd.flags.update(flags[0], "ior.flags")53                    hdf5_plugin_path = None54                    self.ior_cmd.api.update(api)55                for test in transfer_block_size:56                    # update transfer and block size57                    self.ior_cmd.transfer_size.update(test[0])58                    self.ior_cmd.block_size.update(test[1])59                    # run ior60                    if api == "HDF5-VOL":61                        sub_dir = get_random_string(5)62                        dfuse_mount_dir = os.path.join(mount_dir, sub_dir)63                    try:64                        self.run_ior_with_pool(65                            plugin_path=hdf5_plugin_path, timeout=ior_timeout,66                            mount_dir=dfuse_mount_dir)67                        results.append(["PASS", str(self.ior_cmd)])68                    except TestFail:69                        results.append(["FAIL", str(self.ior_cmd)])70        # Running a variant for ior fpp71        self.ior_cmd.flags.update(flags[1])72        self.ior_cmd.api.update(apis[0])73        self.ior_cmd.block_size.update((transfer_block_size[1])[1])74        self.ior_cmd.transfer_size.update((transfer_block_size[1])[0])75        self.ior_cmd.dfs_oclass.update(obj_class[0])76        # run ior77        try:78            self.run_ior_with_pool(plugin_path=None, timeout=ior_timeout)79            results.append(["PASS", str(self.ior_cmd)])80        except TestFail:81            results.append(["FAIL", str(self.ior_cmd)])82        self.log.error("Summary of IOR small test results:")83        errors = False84        for item in results:85            self.log.info("  %s  %s", item[0], item[1])86            if item[0] == "FAIL":87                errors = True88        if errors:89            self.fail("Test FAILED")90        if cncl_tickets:...extract-ior-args.py
Source:extract-ior-args.py  
1#!/usr/bin/env python32"""Extracts IOR arguments from IOR outputs3Finds IOR output files, extracts the command line used to generate that output4file, then identifies how many unique IOR configurations were used.  Useful5for finding common arguments across the outputs of large parameter sweeps.6"""7import os8import re9import gzip10# ior-scaling-rdma.vast.v3-naive ior+incompress       --stoneWallingWearOut=1 -C -D=45 -F -e -g -k -vv -w11# ior-scaling-rdma.vast.v3-naive ior+incompress       --stoneWallingWearOut=1 -C -D=45 -F -e -g -r -vv12# ior-scaling-rdma.vast.v4       ior+incompress       --stoneWallingWearOut=1 -C -D=45 -F -e -g -k -l=random -r -vv13# ior-scaling-rdma.vast.v4       ior+incompress       --stoneWallingWearOut=1 -C -D=45 -F -e -g -k -l=random -vv -w14# ior-scaling-rdma.vast.v4       ior+incompress       --stoneWallingWearOut=1 -C -D=45 -F -e -g -l=random -r -vv15# randio                         glior-3.3            -C -D=45 -F -e -g -k -r -vv -z16# randio                         glior-3.3            -C -D=45 -F -e -g -vv -w -z17# randio.odirect                 glior-3.3            --posix.odirect -C -D=45 -F -e -g -k -r -vv -z18# randio.odirect                 glior-3.3            --posix.odirect -C -D=45 -F -e -g -vv -w -z19# randio.vs-size                 glior-3.3            -C -D=45 -F -g -r -vv -z20# randio.vs-size                 ior+incompress       -C -D=300 -F -k -l=random -vv -w21DATASET_NAMES = {22    "ior-scaling-rdma.vast.v3-naive": "Bandwidth, Naive",23    "ior-scaling-rdma.vast.v4": "Bandwidth, Aged",24    "randio": "IOPS, Buffered I/O",25    "randio.odirect": "IOPS, Direct I/O",26    "randio.vs-size": "IOPS vs. Size",27}28def decode_command_line(line):29    line = line.split(':', 1)[-1]30    line = re.sub(r' -[btsp]\s+\S+', '', line)31    line = re.sub(r' -O stoneWallingStatusFile=\S+', '', line)32    line = re.sub(r' -O ', ' --', line)33    line = re.sub(r' -l ', ' -l=', line)34    line = re.sub(r' -D ', ' -D=', line)35    line = re.sub(r' -o \S+', '', line)36    line = re.sub(r'/ior-n\S+.out ', ' ', line)37    ior_exe, args = line.split(None, 1)38    # /global/u2/g/glock/src/git/n10/vast-eval/results/ior-scaling-rdma.vast.v4/../../src/ior+incompress/install.cgpu/bin/ior39    ior_exe = re.sub(r'\S+/([^/]+)/([^/]+)/bin/(ior|IOR)', r'\1', ior_exe)40    return ior_exe.strip(), args.strip()41def extract_commands(filename):42    if filename.endswith('.gz'):43        opener = gzip.open44    else:45        opener = open46    found = 047    with opener(filename, 'r') as outfile:48        for line in outfile:49            if isinstance(line, bytes):50                line = line.decode()51            if line.startswith('Command line'):52                found += 153                yield decode_command_line(line)54if __name__ == "__main__":55    ior_cmds = set()56    for dirname, subdirs, files in os.walk(os.getcwd()):57        for fname in files:58            if fname.endswith('.out') or fname.endswith('.out.gz'):59                dataset_name = os.path.basename(dirname)60                for ior_exe, ior_args in extract_commands(os.path.join(dirname, fname)):61                    ior_cmds.add("{} {} {}".format(62                        dataset_name,63                        ior_exe,64                        " ".join(sorted(ior_args.split()))65                    ))66    for ior_cmd in sorted(list(ior_cmds)):67        dataset_name, ior_exe, args = ior_cmd.split(None, 2)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
