Best Python code snippet using autotest_python
ec2_instance_cfg_engine.py
Source:ec2_instance_cfg_engine.py  
...121        log_file.write(json_log)122        log_file.close()123        self.log_object = None124        return125    def set_tunable(self, tunable, cmd,126                    get_tunable, log_index,127                    new_value, orig_value):128        '''129        Core function that actually does the job of setting tunable.130        orig_value will be passed only during recovery phase. orig_value131        param will be NULL during system configuration phase.132        '''133        # This is the action of current function invocation134        action = "{0} = {1}".format(tunable, new_value)135        # Fetch current tunable value in the system136        try:137            current_value = get_tunable(tunable)138        except Ec2AutotuneError, e:139            syslog(e.msg)140            return141        '''142        Current value is different from expected original value, then143        this tunable is being tweaked by system user and should be left144        untouched.145        '''146        if (orig_value is not None and orig_value != current_value):147            return148        # If new value to be set is same as current value, nothing to set149        if (new_value == current_value):150            return151        try:152            # Set new value & log the new value for recovery153            exec_cmds(cmd)154            syslog("set {0}".format(action))155            if (self.recovery_instance is False and current_value is not None):156                assert(orig_value is None)157                # Configuring system: log system changes158                tmp_log_object = {"Name": tunable,159                                  "Original": current_value,160                                  "Changed": new_value}161                self.log_object[log_index].append(tmp_log_object)162        except Ec2AutotuneError, e:163            syslog(e.msg)164        return165    def safe_set_tunable(self, tunable, data,166                         get_tunable, log_index,167                         new_value, orig_value):168        '''169        This function has the same functionality as set_tunable.170        This version uses python's internal write library call171        to set the tunable instead of forking to shell. We should172        deprecate calling set_tunable as much as possible in future173        to avoid forking to shell.174        '''175        '''176        Core function that actually does the job of setting tunable.177        orig_value will be passed only during recovery phase. orig_value178        param will be NULL during system configuration phase.179        '''180        # This is the action of current function invocation181        action = "{0} = {1}".format(tunable, new_value)182        # Fetch current tunable value in the system183        try:184            current_value = get_tunable(tunable)185        except Ec2AutotuneError, e:186            syslog(e.msg)187            return188        '''189        Current value is different from expected original value, then190        this tunable is being tweaked by system user and should be left191        untouched.192        '''193        if (orig_value is not None and orig_value != current_value):194            return195        # If new value to be set is same as current value, nothing to set196        if (new_value == current_value):197            return198        try:199            # Set new value & log the new value for recovery200            write_sysfs_file(tunable, data)201            syslog("set {0}".format(action))202            if (self.recovery_instance is False and current_value is not None):203                assert(orig_value is None)204                # Configuring system: log system changes205                tmp_log_object = {"Name": tunable,206                                  "Original": current_value,207                                  "Changed": new_value}208                self.log_object[log_index].append(tmp_log_object)209        except Ec2AutotuneError, e:210            syslog(e.msg)211        return212    def parse_tunable_output(self, output):213        '''214        Tunables values are in four formats (for tunables we are215        working):216        1: number217        2: [foo] bar218        3: foo219        4: number1\tnumber2\tnumber2220        Second format above is tricky as the value set is the one enclosed221        within []. In cases like these, we need to extract the string222        enclosed between [] and return it to caller.223        For all format, convert the output into a list and return224        '''225        if (output is None or len(output) == 0):226            return None227        output = output.strip()228        start = output.find("[")229        if (start != -1):230            end = output.find("]")231            if (end == -1):  # Should never happen232                raise Ec2AutotuneError(233                        "Parsing error of {0}".format(output))234            output = output[start+1:end]235        output = output.split()236        if (output[0].isdigit() is True):237            output = map(int, output)238        return output239    def convert_input_value(self, value):240        '''241        value will be a list, return string representation242        '''243        if (isinstance(value, list) is False):244            raise Ec2AutotuneError(245                     "input value is in an invalid format.")246        length = len(value)247        value = " ".join(map(str, value))248        # Embed multiple values inside strings249        if (length > 1):250            value = "\"{0}\"".format(value)251        return value252    def get_service_state(self, service):253        '''254        Returns the current status of service in the system255        '''256        try:257            output = get_piped_cmd_output(258                    "/bin/systemctl status {0}".format(service),259                    "/bin/grep Active:")260            if ("running" in output):261                return (["start"])262            elif ("dead" in output):263                return (["stop"])264            else:265                raise Ec2AutotuneError(266                        "{0} package not installed.".format(service))267        except Ec2AutotuneError, e:268            raise e269    def set_service_state(self, service, new_state, orig_state=None):270        '''271        Set the passed in service state.272        '''273        try:274            tmp_new_state = self.convert_input_value(new_state)275        except Ec2AutotuneError, e:276            syslog("Failed to set {0} = {1}".format(service, e.msg))277            return278        cmd = ["/bin/systemctl {0} {1}".format(tmp_new_state, service)]279        return self.set_tunable(service, cmd,280                                self.get_service_state, SERVICE,281                                new_state, orig_state)282    def get_sysctl_value(self, sysctl_setting):283        '''284        Get value of a particular kernel setting285        '''286        try:287            output = get_cmd_output("/sbin/sysctl {0}".format(sysctl_setting))288            if (len(output) == 0):289                return None290            output = output.split("=")[1]291            return self.parse_tunable_output(output)292        except Ec2AutotuneError, e:293            raise e294    def set_sysctl_value(self, sysctl_setting, new_value, orig_value=None):295        '''296        Set value of a particular kernel setting.297        '''298        # If the value to be configured is also being modified as part of299        # /etc/sysctl.d/*.conf, then this is a conflict.300        for ln in self.sysctl_conf:301            if (sysctl_setting in ln):302                syslog("Skipping {0} as it conflicts with "303                       "/etc/sysctl.d framework.".format(sysctl_setting))304                return305        try:306            tmp_new_value = self.convert_input_value(new_value)307        except Ec2AutotuneError, e:308            syslog("Failed to set {0} = {1}".format(sysctl_setting, e.msg))309            return310        cmd = ["/sbin/sysctl -q -w {0}={1}".format(sysctl_setting,311                                                   tmp_new_value)]312        return self.set_tunable(sysctl_setting, cmd,313                                self.get_sysctl_value, SYSCTL,314                                new_value, orig_value)315    def get_sysfs_value(self, sysfs_file):316        '''317        Get value of a particular sysfs setting318        '''319        try:320            output = read_sysfs_file(sysfs_file)321            if (len(output) == 0):322                return None323            return self.parse_tunable_output(output)324        except Ec2AutotuneError, e:325            raise e326    def set_sysfs_value(self, sysfs_file, new_value, orig_value=None):327        '''328        Set value of a particular sysfs setting.329        orig_value will be passed only during recovery phase. orig_value330        param will be NULL during system configuring phase.331        '''332        if (os.path.isfile(sysfs_file) is False):333            syslog("invalid sysfs_file={0}".format(sysfs_file))334            return335        try:336            tmp_new_value = self.convert_input_value(new_value)337        except Ec2AutotuneError, e:338            syslog("Failed to set {0} = {1}".format(sysfs_file, e.msg))339            return340        return self.safe_set_tunable(sysfs_file, tmp_new_value,341                                     self.get_sysfs_value, SYSFS,342                                     new_value, orig_value)343    def get_cpu_value(self, cpu_state):344        '''345        Get value of a particular CPU state346        '''347        try:348            retcode = 0349            output = None350            # Query frequency governor351            if (cpu_state == "p-state"):352                # Work only with intel_pstate driver353                driver = get_piped_cmd_output(354                             "/bin/cpupower frequency-info --driver",355                             "/bin/grep \"driver: intel_pstate\"")356                if(len(driver) > 0):357                    # Return current governor being used358                    output = get_piped_cmd_output(359                                 "/bin/cpupower frequency-info",360                                 "/bin/grep \"The governor\"")361                    start = output.find("\"")362                    if (start == -1):363                        raise Ec2AutotuneError(364                                    "Parsing error of current "365                                    "frequency-info governor")366                    end = output.find("\"", start+1)367                    if (end == -1):368                        raise Ec2AutotuneError(369                                    "Parsing error of current "370                                    "frequency-info governor")371                    return ([output[start+1:end]])372            # Query CPU idle state373            elif (cpu_state == "c-state"):374                # Work only with intel_idle driver375                driver = get_piped_cmd_output(376                             "/bin/cpupower idle-info --silent",377                             "/bin/grep \"driver: intel_idle\"")378                if(len(driver) > 0):379                    # Number of idle states380                    output = get_piped_cmd_output(381                                 "/bin/cpupower idle-info",382                                 "/bin/grep \"Number of idle states:\"")383                    max_states = int(output[(output.index("states: ") + 8):])384                    # Available idle states385                    idle_states = []386                    output = get_piped_cmd_output(387                                 "/bin/cpupower idle-info",388                                 "/bin/grep \"Available idle states:\"")389                    beg = 0390                    end = len(output)391                    for state in range(max_states):392                        try:393                            idx = output.rindex(" ", beg, end) + 1394                        except:395                            raise Ec2AutotuneError(396                                      "Parsing error of available idle states")397                        if (idx == -1):398                            raise Ec2AutotuneError(399                                    "Parsing error of available idle states")400                        idle_states.append(output[idx:end])401                        end = idx - 1402                    idle_states.reverse()403                    # Return deepest enabled state404                    output = get_piped_cmd_output(405                                "/bin/cpupower idle-info",406                                "/bin/grep DISABLED")407                    if (len(output) == 0):408                        # No state is disabled, return deepest state409                        return ([idle_states[max_states - 1]])410                    else:411                        index = output.index(" ")412                        output = output[:index]413                        return ([idle_states[idle_states.index(output) - 1]])414            else:415                # State should always be either p-state or c-state416                raise Ec2AutotuneError(417                    "Invalid {0} state".format(cpu_state))418        except Ec2AutotuneError, e:419            raise e420    def set_cpu_value(self, cpu_state, new_value, orig_value=None):421        '''422        Set value of a particular CPU state.423        orig_value will be passed only during recovery phase. orig_value424        param will be NULL during system configuring phase.425        We set this only on systems which has intel drivers (no acpi driver).426        '''427        try:428            tmp_new_value = self.convert_input_value(new_value)429        except Ec2AutotuneError, e:430            syslog("Failed to set {0} = {1}".format(cpu_state, e.msg))431            return432        if (cpu_state == "p-state"):433            '''434            P-state: value has to be one of the supported governors435            by intel_pstate driver.436            '''437            cmd = ["/bin/cpupower frequency-set -g {0}".format(tmp_new_value)]438        elif (cpu_state == "c-state"):439            '''440            C-state: value has to be one of the supported idle441            states by intel_idle driver442            '''443            idle_states = {  # state : latency444                            "POLL": 0,445                            "C1": 2,446                            "C1E": 10,447                            "C3": 40,448                            "C6": 133}449            if (not(tmp_new_value in idle_states)):450                raise Ec2AutotuneError(451                            "Invalid value for c-state = "452                            "{0}".format(tmp_new_value))453            cmd = (["/bin/cpupower idle-set --enable-all",454                   "/bin/cpupower idle-set --disable-by-latency {0}"455                    .format(str(idle_states[tmp_new_value] + 1))])456        else:457            raise Ec2AutotuneError(458                        "Invalid CPU state {0}".format(cpu_state))459        return self.set_tunable(cpu_state, cmd,460                                self.get_cpu_value, CPU,461                                new_value, orig_value)462    def configure_system(self, section, configure, dry_run=False):463        if (dry_run is True):464            fetch_configuration(self.auto_profile,465                                self.user_profile,466                                section,467                                None,468                                None,469                                None,470                                configure)471        else:472            fetch_configuration(self.auto_profile,473                                self.user_profile,...fsdev_disks.py
Source:fsdev_disks.py  
...397        """398        for dx in range(len(disks)):399            disk = disks[dx]['tunable']400            # Set the scheduler first before setting any other tunables401            self.set_tunable(disk, "scheduler",402                                   self.tune_loc["scheduler"],403                                   "cfq")404            # Now set all the tunable parameters we've been given405            for tune_desc in self.tune_list:406                self.set_tunable(disk, tune_desc[0],407                                 tune_desc[1],408                                 tune_desc[2])409    def set_tunable(self, disk, name, path, val):410        """411        Given a disk name, a path to a tunable value under _TUNE_PATH and the412        new value for the parameter, set the value and verify that the value413        has been successfully set.414        """415        fpath = partition.get_iosched_path(disk, path)416        # Things might go wrong so we'll catch exceptions417        try:418            step = "open tunable path"419            tunef = open(fpath, 'w', buffering=-1)420            step = "write new tunable value"421            tunef.write(val)422            step = "close the tunable path"423            tunef.close()424            step = "read back new tunable value"425            nval = open(fpath, 'r', buffering=-1).read().strip()426            # For 'scheduler' we need to fish out the bracketed value427            if name == "scheduler":428                nval = re.match(".*\[(.*)\].*", nval).group(1)429        except IOError, info:430            # Special case: for some reason 'max_sectors_kb' often doesn't work431            # with large values; try '128' if we haven't tried it already.432            if name == "max_sectors_kb" and info.errno == 22 and val != '128':433                self.set_tunable(disk, name, path, '128')434                return435            # Something went wrong, probably a 'config' problem of some kind436            raise Exception("Unable to set tunable value '" + name +437                            "' at step '" + step + "': " + str(info))438        except Exception:439            # We should only ever see 'IOError' above, but just in case ...440            raise Exception("Unable to set tunable value for " + name)441        # Make sure the new value is what we expected442        if nval != val:443            raise Exception("Unable to correctly set tunable value for " +444                            name + ": desired " + val + ", but found " + nval)...base.py
Source:base.py  
1"""2Base methods for the tunable tools3"""4# pylint: disable=C0303,C03305__all__ = [6    "sample",7]8import operator9import random10from functools import reduce11from itertools import product12from tabulate import tabulate13from ..finalize import finalize14class Sampler:15    "Base class for sampling values of a tunable object"16    def __init__(17        self,18        tunable,19        variables=None,20        n_samples=None,21        callback=None,22        callback_calls=False,23        label=None,24        **kwargs,25    ):26        "Initializes the tunable object and the variables"27        self.tunable = finalize(tunable).copy()28        self.compute_kwargs = kwargs29        if callback:30            self.callback = callback31        self.callback_calls = callback_calls32        if label:33            self.label = label34        if variables:35            self.variables = tuple(36                str(tunable.get_variable(var).key) for var in variables37            )38            set_vars = set(self.variables)39            set_tunable = set(self.tunable.tunable_variables)40            if not set_vars <= set_tunable:41                raise ValueError(42                    f"Variable(s) {set_vars-set_tunable} have been fixed and cannot be tuned"43                )44            # Fixes the tunable variable not involved45            for var in set_tunable.difference(set_vars):46                self.tunable.fix(var)47        else:48            self.variables = self.tunable.tunable_variables49        if n_samples:50            self.n_samples = n_samples51    @property52    def max_samples(self):53        "Size of the parameter space (product of variables' size)"54        lens = tuple(self.tunable[var].size for var in self.variables)55        return reduce(operator.mul, lens)56    @property57    def n_samples(self):58        "Number of samples"59        return getattr(self, "_n_samples", self.max_samples)60    @n_samples.setter61    def n_samples(self, value):62        if not (isinstance(value, int) and value > 0):63            raise ValueError("n_samples must be a positive integer")64        self._n_samples = min(value, self.max_samples)65    def __len__(self):66        self.n_samples67    @property68    def samples(self):69        "Samples of the parameters space"70        iters = tuple(self.tunable[var].values for var in self.variables)71        values = product(*iters)72        if self.n_samples >= self.max_samples:73            return tuple(values)74        idxs = sorted(random.sample(range(self.max_samples), self.n_samples))75        return tuple(76            val for idx, val in filter(lambda _: _[0] in idxs, enumerate(values))77        )78    def sample_values(self):79        "Returns the sampled values"80        return list(self)81    @property82    def callback(self):83        return getattr(self, "_callback", lambda _: _)84    @callback.setter85    def callback(self, value):86        "Function to be called on the result"87        if not callable(value):88            raise TypeError("callback must be a callable")89        self._callback = value90    def __iter__(self):91        for params in self.samples:92            tmp = self.tunable.copy()93            for var, val in zip(self.variables, params):94                tmp.fix(var, val)95            try:96                if self.callback_calls:97                    result = self.callback(lambda: tmp.compute(**self.compute_kwargs))98                else:99                    result = self.callback(tmp.compute(**self.compute_kwargs))100            except Exception as err:101                result = err102            yield params, result103    @property104    def label(self):105        "Label used for the result"106        return getattr(self, "_label", self.tunable.label)107    @label.setter108    def label(self, value):109        self._label = str(value)110    @property111    def headers(self):112        "Headers for the values returned by the sampler"113        return tuple(self.tunable[var].label for var in self.variables) + (self.label,)114    def tabulate(self, **kwargs):115        "Returns a table of the values"116        kwargs.setdefault("headers", self.headers)117        return tabulate((params + (repr(result),) for params, result in self), **kwargs)118    def _repr_html_(self):119        return self.tabulate(tablefmt="html")120def sample(tunable, *variables, samples=100, **kwargs):121    """122    Samples the value of the tunable object123    Parameters124    ----------125    variables: list of str126        Set of variables to sample.127    samples: int128        The number of samples to run. If None, all the combinations are sampled.129    kwargs: dict130        Variables passed to the compute function. See help(tunable.compute)131    """...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
