Best Python code snippet using lisa_python
schema.py
Source:schema.py  
1# Copyright (c) Microsoft Corporation.2# Licensed under the MIT license.3import copy4from dataclasses import dataclass, field5from enum import Enum6from functools import partial7from pathlib import Path8from typing import Any, Dict, List, Optional, Type, TypeVar, Union, cast9from dataclasses_json import (10    CatchAll,11    DataClassJsonMixin,12    Undefined,13    config,14    dataclass_json,15)16from marshmallow import ValidationError, fields, validate17from lisa import search_space18from lisa.secret import PATTERN_HEADTAIL, add_secret19from lisa.util import (20    BaseClassMixin,21    LisaException,22    constants,23    deep_update_dict,24    field_metadata,25    strip_strs,26)27"""28Schema is dealt with three components,291. dataclasses. It's a builtin class, uses to define schema of an instance. field()30   function uses to describe a field.312. dataclasses_json. Serializer. config() function customizes this component.323. marshmallow. Validator. It's wrapped by dataclasses_json. config(mm_field=xxx)33   function customizes this component.34"""35T = TypeVar("T")36class ListableValidator(validate.Validator):37    default_message = ""38    def __init__(39        self,40        value_type: type,41        value_validator: Optional[42            Union[validate.Validator, List[validate.Validator]]43        ] = None,44        error: str = "",45    ) -> None:46        self._value_type: Any = value_type47        if value_validator is None:48            self._inner_validator: List[validate.Validator] = []49        elif callable(value_validator):50            self._inner_validator = [value_validator]51        elif isinstance(value_validator, list):52            self._inner_validator = list(value_validator)53        else:54            raise ValueError(55                "The 'value_validator' parameter must be a callable "56                "or a collection of callables."57            )58        self.error: str = error or self.default_message59    def _repr_args(self) -> str:60        return f"_inner_validator={self._inner_validator}"61    def _format_error(self, value: Any) -> str:62        return self.error.format(input=value)63    def __call__(self, value: Any) -> Any:64        if isinstance(value, self._value_type):65            if self._inner_validator:66                for validator in self._inner_validator:67                    validator(value)68        elif isinstance(value, list):69            for value_item in value:70                assert isinstance(value_item, self._value_type), (71                    f"must be '{self._value_type}' but '{value_item}' "72                    f"is '{type(value_item)}'"73                )74                if self._inner_validator:75                    for validator in self._inner_validator:76                        validator(value_item)77        elif value is not None:78            raise ValidationError(79                f"must be Union[{self._value_type}, List[{self._value_type}]], "80                f"but '{value}' is '{type(value)}'"81            )82        return value83@dataclass_json(undefined=Undefined.INCLUDE)84@dataclass85class ExtendableSchemaMixin:86    extended_schemas: CatchAll = field(default_factory=dict)  # type: ignore87    def get_extended_runbook(self, runbook_type: Type[T], type_name: str = "") -> T:88        """89        runbook_type: type of runbook90        field_name: the field name which stores the data, if it's "", get it from type91        """92        if not hasattr(self, "_extended_runbook"):93            type_name = self.__resolve_type_name(94                runbook_type=runbook_type, type_name=type_name95            )96            if self.extended_schemas and type_name in self.extended_schemas:97                self._extended_runbook: T = load_by_type(98                    runbook_type, self.extended_schemas[type_name]99                )100            else:101                # value may be filled outside, so hold and return an object.102                self._extended_runbook = runbook_type()103            # if there is any extra key, raise exception to help user find it earlier.104            if self.extended_schemas and len(self.extended_schemas) > 0:105                expected_extra_count = 0106                if type_name in self.extended_schemas:107                    expected_extra_count = 1108                if len(self.extended_schemas) > expected_extra_count:109                    extra_names = [110                        name for name in self.extended_schemas if name != type_name111                    ]112                    raise LisaException(113                        f"unknown keys in extendable schema [{runbook_type.__name__}]: "114                        f"{extra_names}"115                    )116        return self._extended_runbook117    def set_extended_runbook(self, runbook: Any, type_name: str = "") -> None:118        self._extended_runbook = runbook119        if self.extended_schemas and type_name in self.extended_schemas:120            # save extended runbook back to raw dict121            self.extended_schemas[type_name] = runbook.to_dict()122    def __resolve_type_name(self, runbook_type: Type[Any], type_name: str) -> str:123        assert issubclass(124            runbook_type, DataClassJsonMixin125        ), "runbook_type must annotate from DataClassJsonMixin"126        if not type_name:127            assert hasattr(self, constants.TYPE), (128                f"cannot find type attr on '{runbook_type.__name__}'."129                f"either set field_name or make sure type attr exists."130            )131            type_name = getattr(self, constants.TYPE)132        return type_name133    def __repr__(self) -> str:134        result = ""135        if hasattr(self, "_extended_runbook"):136            result = f"ext:{self._extended_runbook}"137        elif self.extended_schemas:138            result = f"ext:{self.extended_schemas}"139        return result140    def __hash__(self) -> int:141        return super().__hash__()142@dataclass_json()143@dataclass144class TypedSchema:145    type: str = field(default="", metadata=field_metadata(required=True))146    def __hash__(self) -> int:147        return super().__hash__()148@dataclass_json()149@dataclass150class Transformer(TypedSchema, ExtendableSchemaMixin):151    # the name can be referenced by other transformers. If it's not specified,152    # the type will be used.153    name: str = ""154    # prefix of generated variables. if it's not specified, the name will be155    # used. For example, a variable called "a" with the prefix "b", so the156    # variable name will be "b_a" in the variable dict157    prefix: str = ""158    # specify which transformers are depended.159    depends_on: List[str] = field(default_factory=list)160    # rename some of variables for easier use.161    rename: Dict[str, str] = field(default_factory=dict)162    # enable this transformer or not, only enabled transformers run actually.163    enabled: bool = True164    # decide when the transformer run. The init means run at very beginning165    # phase, which is before the combinator. The expanded means run after166    # combinator expanded variables.167    phase: str = field(168        default=constants.TRANSFORMER_PHASE_INIT,169        metadata=field_metadata(170            validate=validate.OneOf(171                [172                    constants.TRANSFORMER_PHASE_INIT,173                    constants.TRANSFORMER_PHASE_EXPANDED,174                    constants.TRANSFORMER_PHASE_CLEANUP,175                ]176            ),177        ),178    )179    def __post_init__(self, *args: Any, **kwargs: Any) -> None:180        if not self.name:181            self.name = self.type182        if not self.prefix:183            self.prefix = self.name184@dataclass_json()185@dataclass186class Combinator(TypedSchema, ExtendableSchemaMixin):187    type: str = field(188        default=constants.COMBINATOR_GRID, metadata=field_metadata(required=True)189    )190@dataclass_json()191@dataclass192class Strategy:193    """194    node_path is the path of yaml node. For example:195        environment.nodes196    if node_path doesn't present, it means to all.197    operations include:198    overwrite: default behavior. add non-exist items and replace exist.199    remove: remove specified path totally.200    add: add non-exist, not replace exist.201    """202    node_path: str = field(default="", metadata=field_metadata(required=True))203    operation: str = field(204        default=constants.OPERATION_OVERWRITE,205        metadata=field_metadata(206            required=True,207            validate=validate.OneOf(208                [209                    constants.OPERATION_ADD,210                    constants.OPERATION_OVERWRITE,211                    constants.OPERATION_REMOVE,212                ]213            ),214        ),215    )216@dataclass_json()217@dataclass218class Include:219    """220    Inclusion of runbook logic, for similar runs.221    """222    path: str = field(default="", metadata=field_metadata(required=True))223    strategy: Union[List[Strategy], Strategy, None] = None224@dataclass_json()225@dataclass226class Extension:227    path: str228    name: Optional[str] = None229    @classmethod230    def from_raw(cls, raw_data: Any) -> List["Extension"]:231        results: List[Extension] = []232        assert isinstance(raw_data, list), f"actual: {type(raw_data)}"233        for extension in raw_data:234            # convert to structured Extension235            if isinstance(extension, str):236                extension = Extension(path=extension)237            elif isinstance(extension, dict):238                extension = load_by_type(Extension, extension)239            results.append(extension)240        return results241@dataclass_json()242@dataclass243class Variable:244    """245    it uses to support variables in other fields.246    duplicate items will be overwritten one by one.247    if a variable is not defined here, LISA can fail earlier to ask check it.248    file path is relative to LISA command starts.249    """250    # If it's secret, it will be removed from log and other output information.251    # secret files also need to be removed after test252    # it's not recommended highly to put secret in runbook directly.253    is_secret: bool = False254    # continue to support v2 format. it's simple.255    file: str = field(256        default="",257        metadata=field_metadata(258            validate=validate.Regexp(r"([\w\W]+[.](xml|yml|yaml)$)|(^$)")259        ),260    )261    name: str = field(default="")262    value: Union[str, bool, int, Dict[Any, Any], List[Any], None] = field(default="")263    # True means this variable can be used in test cases.264    is_case_visible: bool = False265    mask: str = ""266    def __post_init__(self, *args: Any, **kwargs: Any) -> None:267        if self.file and (self.name or self.value):268            raise LisaException(269                f"file cannot be specified with name or value"270                f"file: '{self.file}'"271                f"name: '{self.name}'"272                f"value: '{self.value}'"273            )274@dataclass_json()275@dataclass276class Notifier(TypedSchema, ExtendableSchemaMixin):277    """278    it sends test progress and results to any place wanted.279    detail types are defined in notifier itself, allowed items are handled in code.280    """281    # A notifier is disabled, if it's false. It helps to disable notifier by282    # variables.283    enabled: bool = True284@dataclass_json()285@dataclass()286class FeatureSettings(287    search_space.RequirementMixin, TypedSchema, ExtendableSchemaMixin288):289    """290    It's the default feature setting. It's used by features without settings,291    and it's the base class of specified settings.292    """293    def __eq__(self, o: object) -> bool:294        assert isinstance(o, FeatureSettings), f"actual: {type(o)}"295        return self.type == o.type296    def __repr__(self) -> str:297        return self.type298    def __hash__(self) -> int:299        return hash(self._get_key())300    @staticmethod301    def create(302        type: str, extended_schemas: Optional[Dict[Any, Any]] = None303    ) -> "FeatureSettings":304        # If a feature has no setting, it will return the default settings.305        if extended_schemas:306            feature = FeatureSettings(type=type, extended_schemas=extended_schemas)307        else:308            feature = FeatureSettings(type=type)309        return feature310    def check(self, capability: Any) -> search_space.ResultReason:311        assert isinstance(capability, FeatureSettings), f"actual: {type(capability)}"312        # default FeatureSetting is a place holder, nothing to do.313        result = search_space.ResultReason()314        if self.type != capability.type:315            result.add_reason(316                f"settings are different, "317                f"requirement: {self.type}, capability: {capability.type}"318            )319        return result320    def _get_key(self) -> str:321        return self.type322    def _call_requirement_method(self, method_name: str, capability: Any) -> Any:323        assert isinstance(capability, FeatureSettings), f"actual: {type(capability)}"324        # default FeatureSetting is a place holder, nothing to do.325        value = FeatureSettings.create(self.type)326        # try best to intersect the extended schemas327        if method_name == search_space.RequirementMethod.intersect:328            if self.extended_schemas and capability and capability.extended_schemas:329                value.extended_schemas = deep_update_dict(330                    self.extended_schemas,331                    capability.extended_schemas,332                )333            else:334                value.extended_schemas = (335                    capability.extended_schemas336                    if capability and capability.extended_schemas337                    else self.extended_schemas338                )339        return value340class DiskType(str, Enum):341    PremiumSSDLRS = "PremiumSSDLRS"342    Ephemeral = "Ephemeral"343    StandardHDDLRS = "StandardHDDLRS"344    StandardSSDLRS = "StandardSSDLRS"345# disk types are ordered by commonly and cost. The earlier is lower cost.346disk_type_priority: List[DiskType] = [347    DiskType.StandardHDDLRS,348    DiskType.StandardSSDLRS,349    DiskType.Ephemeral,350    DiskType.PremiumSSDLRS,351]352@dataclass_json()353@dataclass()354class DiskOptionSettings(FeatureSettings):355    type: str = constants.FEATURE_DISK356    disk_type: Optional[357        Union[search_space.SetSpace[DiskType], DiskType]358    ] = field(  # type:ignore359        default_factory=partial(360            search_space.SetSpace,361            items=[362                DiskType.StandardHDDLRS,363                DiskType.StandardSSDLRS,364                DiskType.Ephemeral,365                DiskType.PremiumSSDLRS,366            ],367        ),368        metadata=field_metadata(369            decoder=partial(search_space.decode_set_space_by_type, base_type=DiskType)370        ),371    )372    data_disk_count: search_space.CountSpace = field(373        default_factory=partial(search_space.IntRange, min=0),374        metadata=field_metadata(decoder=search_space.decode_count_space),375    )376    data_disk_caching_type: str = field(377        default=constants.DATADISK_CACHING_TYPE_NONE,378        metadata=field_metadata(379            validate=validate.OneOf(380                [381                    constants.DATADISK_CACHING_TYPE_NONE,382                    constants.DATADISK_CACHING_TYPE_READONLY,383                    constants.DATADISK_CACHING_TYPE_READYWRITE,384                ]385            ),386        ),387    )388    data_disk_iops: search_space.CountSpace = field(389        default_factory=partial(search_space.IntRange, min=0),390        metadata=field_metadata(391            allow_none=True, decoder=search_space.decode_count_space392        ),393    )394    data_disk_size: search_space.CountSpace = field(395        default_factory=partial(search_space.IntRange, min=0),396        metadata=field_metadata(397            allow_none=True, decoder=search_space.decode_count_space398        ),399    )400    max_data_disk_count: search_space.CountSpace = field(401        default=None,402        metadata=field_metadata(403            allow_none=True, decoder=search_space.decode_count_space404        ),405    )406    def __eq__(self, o: object) -> bool:407        assert isinstance(o, DiskOptionSettings), f"actual: {type(o)}"408        return (409            self.type == o.type410            and self.disk_type == o.disk_type411            and self.data_disk_count == o.data_disk_count412            and self.data_disk_caching_type == o.data_disk_caching_type413            and self.data_disk_iops == o.data_disk_iops414            and self.data_disk_size == o.data_disk_size415            and self.max_data_disk_count == o.max_data_disk_count416        )417    def __repr__(self) -> str:418        return (419            f"disk_type: {self.disk_type},"420            f"count: {self.data_disk_count},"421            f"caching: {self.data_disk_caching_type},"422            f"iops: {self.data_disk_iops},"423            f"size: {self.data_disk_size},"424            f"max_data_disk_count: {self.max_data_disk_count}"425        )426    def __str__(self) -> str:427        return self.__repr__()428    def __hash__(self) -> int:429        return super().__hash__()430    def check(self, capability: Any) -> search_space.ResultReason:431        result = super().check(capability)432        result.merge(433            search_space.check_countspace(434                self.data_disk_count, capability.data_disk_count435            ),436            "data_disk_count",437        )438        result.merge(439            search_space.check_countspace(440                self.max_data_disk_count, capability.max_data_disk_count441            ),442            "max_data_disk_count",443        )444        result.merge(445            search_space.check_countspace(446                self.data_disk_iops, capability.data_disk_iops447            ),448            "data_disk_iops",449        )450        return result451    def _get_key(self) -> str:452        return (453            f"{super()._get_key()}/{self.disk_type}/"454            f"{self.data_disk_count}/{self.data_disk_caching_type}/"455            f"{self.data_disk_iops}/{self.data_disk_size}"456        )457    def _call_requirement_method(self, method_name: str, capability: Any) -> Any:458        assert isinstance(capability, DiskOptionSettings), f"actual: {type(capability)}"459        parent_value = super()._call_requirement_method(method_name, capability)460        # convert parent type to child type461        value = DiskOptionSettings()462        value.extended_schemas = parent_value.extended_schemas463        search_space_countspace_method = getattr(464            search_space, f"{method_name}_countspace"465        )466        if self.disk_type or capability.disk_type:467            value.disk_type = getattr(468                search_space, f"{method_name}_setspace_by_priority"469            )(self.disk_type, capability.disk_type, disk_type_priority)470        if self.data_disk_count or capability.data_disk_count:471            value.data_disk_count = search_space_countspace_method(472                self.data_disk_count, capability.data_disk_count473            )474        if self.data_disk_iops or capability.data_disk_iops:475            value.data_disk_iops = search_space_countspace_method(476                self.data_disk_iops, capability.data_disk_iops477            )478        if self.data_disk_size or capability.data_disk_size:479            value.data_disk_size = search_space_countspace_method(480                self.data_disk_size, capability.data_disk_size481            )482        if self.data_disk_caching_type or capability.data_disk_caching_type:483            value.data_disk_caching_type = (484                self.data_disk_caching_type or capability.data_disk_caching_type485            )486        if self.max_data_disk_count or capability.max_data_disk_count:487            value.max_data_disk_count = search_space_countspace_method(488                self.max_data_disk_count, capability.max_data_disk_count489            )490        return value491class NetworkDataPath(str, Enum):492    Synthetic = "Synthetic"493    Sriov = "Sriov"494_network_data_path_priority: List[NetworkDataPath] = [495    NetworkDataPath.Sriov,496    NetworkDataPath.Synthetic,497]498@dataclass_json()499@dataclass()500class NetworkInterfaceOptionSettings(FeatureSettings):501    type: str = "NetworkInterface"502    data_path: Optional[503        Union[search_space.SetSpace[NetworkDataPath], NetworkDataPath]504    ] = field(  # type: ignore505        default_factory=partial(506            search_space.SetSpace,507            items=[508                NetworkDataPath.Synthetic,509                NetworkDataPath.Sriov,510            ],511        ),512        metadata=field_metadata(513            decoder=partial(514                search_space.decode_set_space_by_type, base_type=NetworkDataPath515            )516        ),517    )518    # nic_count is used for specifying associated nic count during provisioning vm519    nic_count: search_space.CountSpace = field(520        default_factory=partial(search_space.IntRange, min=1),521        metadata=field_metadata(decoder=search_space.decode_count_space),522    )523    # max_nic_count is used for getting the size max nic capability, it can be used to524    #  check how many nics the vm can be associated after provisioning525    max_nic_count: search_space.CountSpace = field(526        default_factory=partial(search_space.IntRange, min=1),527        metadata=field_metadata(528            allow_none=True, decoder=search_space.decode_count_space529        ),530    )531    def __eq__(self, o: object) -> bool:532        assert isinstance(o, NetworkInterfaceOptionSettings), f"actual: {type(o)}"533        return (534            self.type == o.type535            and self.data_path == o.data_path536            and self.nic_count == o.nic_count537            and self.max_nic_count == o.max_nic_count538        )539    def __repr__(self) -> str:540        return (541            f"data_path:{self.data_path}, nic_count:{self.nic_count},"542            f" max_nic_count:{self.max_nic_count}"543        )544    def __str__(self) -> str:545        return self.__repr__()546    def __hash__(self) -> int:547        return super().__hash__()548    def _get_key(self) -> str:549        return (550            f"{super()._get_key()}/{self.data_path}/{self.nic_count}"551            f"/{self.max_nic_count}"552        )553    def check(self, capability: Any) -> search_space.ResultReason:554        assert isinstance(555            capability, NetworkInterfaceOptionSettings556        ), f"actual: {type(capability)}"557        result = super().check(capability)558        result.merge(559            search_space.check_countspace(self.nic_count, capability.nic_count),560            "nic_count",561        )562        result.merge(563            search_space.check_setspace(self.data_path, capability.data_path),564            "data_path",565        )566        result.merge(567            search_space.check_countspace(self.max_nic_count, capability.max_nic_count),568            "max_nic_count",569        )570        return result571    def _call_requirement_method(self, method_name: str, capability: Any) -> Any:572        assert isinstance(573            capability, NetworkInterfaceOptionSettings574        ), f"actual: {type(capability)}"575        parent_value = super()._call_requirement_method(method_name, capability)576        # convert parent type to child type577        value = NetworkInterfaceOptionSettings()578        value.extended_schemas = parent_value.extended_schemas579        value.max_nic_count = getattr(search_space, f"{method_name}_countspace")(580            self.max_nic_count, capability.max_nic_count581        )582        if self.nic_count or capability.nic_count:583            value.nic_count = getattr(search_space, f"{method_name}_countspace")(584                self.nic_count, capability.nic_count585            )586        else:587            raise LisaException("nic_count cannot be zero")588        value.data_path = getattr(search_space, f"{method_name}_setspace_by_priority")(589            self.data_path, capability.data_path, _network_data_path_priority590        )591        return value592@dataclass_json()593@dataclass()594class FeaturesSpace(595    search_space.SetSpace[Union[str, FeatureSettings]],596):597    def __post_init__(self, *args: Any, **kwargs: Any) -> None:598        if self.items:599            for index, item in enumerate(self.items):600                if isinstance(item, dict):601                    item = load_by_type(FeatureSettings, item)602                    self.items[index] = item603@dataclass_json()604@dataclass()605class NodeSpace(search_space.RequirementMixin, TypedSchema, ExtendableSchemaMixin):606    type: str = field(607        default=constants.ENVIRONMENTS_NODES_REQUIREMENT,608        metadata=field_metadata(609            required=True,610            validate=validate.OneOf([constants.ENVIRONMENTS_NODES_REQUIREMENT]),611        ),612    )613    name: str = ""614    is_default: bool = field(default=False)615    node_count: search_space.CountSpace = field(616        default=search_space.IntRange(min=1),617        metadata=field_metadata(decoder=search_space.decode_count_space),618    )619    core_count: search_space.CountSpace = field(620        default=search_space.IntRange(min=1),621        metadata=field_metadata(decoder=search_space.decode_count_space),622    )623    memory_mb: search_space.CountSpace = field(624        default=search_space.IntRange(min=512),625        metadata=field_metadata(decoder=search_space.decode_count_space),626    )627    disk: Optional[DiskOptionSettings] = None628    network_interface: Optional[NetworkInterfaceOptionSettings] = None629    gpu_count: search_space.CountSpace = field(630        default=search_space.IntRange(min=0),631        metadata=field_metadata(decoder=search_space.decode_count_space),632    )633    # all features on requirement should be included.634    # all features on capability can be included.635    _features: Optional[FeaturesSpace] = field(636        default=None,637        metadata=field_metadata(allow_none=True, data_key="features"),638    )639    # set by requirements640    # capability's is ignored641    _excluded_features: Optional[FeaturesSpace] = field(642        default=None,643        metadata=field_metadata(644            allow_none=True,645            data_key="excluded_features",646        ),647    )648    def __post_init__(self, *args: Any, **kwargs: Any) -> None:649        # clarify types to avoid type errors in properties.650        self._features: Optional[search_space.SetSpace[FeatureSettings]]651        self._excluded_features: Optional[search_space.SetSpace[FeatureSettings]]652    def __eq__(self, o: object) -> bool:653        assert isinstance(o, NodeSpace), f"actual: {type(o)}"654        return (655            self.type == o.type656            and self.node_count == o.node_count657            and self.core_count == o.core_count658            and self.memory_mb == o.memory_mb659            and self.disk == o.disk660            and self.network_interface == o.network_interface661            and self.gpu_count == o.gpu_count662            and self.features == o.features663            and self.excluded_features == o.excluded_features664        )665    def __repr__(self) -> str:666        """667        override it for shorter text668        """669        return (670            f"type:{self.type},name:{self.name},"671            f"default:{self.is_default},"672            f"count:{self.node_count},core:{self.core_count},"673            f"mem:{self.memory_mb},disk:{self.disk},"674            f"network interface: {self.network_interface}, gpu:{self.gpu_count},"675            f"f:{self.features},ef:{self.excluded_features},"676            f"{super().__repr__()}"677        )678    @property679    def cost(self) -> float:680        core_count = search_space.generate_min_capability_countspace(681            self.core_count, self.core_count682        )683        gpu_count = search_space.generate_min_capability_countspace(684            self.gpu_count, self.gpu_count685        )686        return core_count + gpu_count * 100687    @property688    def features(self) -> Optional[search_space.SetSpace[FeatureSettings]]:689        self._features = self._create_feature_settings_list(self._features)690        if self._features is not None:691            self._features.is_allow_set = True692        return cast(Optional[search_space.SetSpace[FeatureSettings]], self._features)693    @features.setter694    def features(self, value: Optional[search_space.SetSpace[FeatureSettings]]) -> None:695        self._features = cast(FeaturesSpace, value)696    @property697    def excluded_features(self) -> Optional[search_space.SetSpace[FeatureSettings]]:698        if not self._excluded_features:699            self._excluded_features = self._create_feature_settings_list(700                self._excluded_features701            )702            if self._excluded_features is not None:703                self._excluded_features.is_allow_set = False704        return cast(705            Optional[search_space.SetSpace[FeatureSettings]], self._excluded_features706        )707    @excluded_features.setter708    def excluded_features(709        self, value: Optional[search_space.SetSpace[FeatureSettings]]710    ) -> None:711        self._excluded_features = cast(FeaturesSpace, value)712    def check(self, capability: Any) -> search_space.ResultReason:713        result = search_space.ResultReason()714        if capability is None:715            result.add_reason("capability shouldn't be None")716        if self.features:717            assert self.features.is_allow_set, "features should be allow set"718        if self.excluded_features:719            assert (720                not self.excluded_features.is_allow_set721            ), "excluded_features shouldn't be allow set"722        assert isinstance(capability, NodeSpace), f"actual: {type(capability)}"723        if (724            not capability.node_count725            or not capability.core_count726            or not capability.memory_mb727        ):728            result.add_reason(729                "node_count, core_count, memory_mb " "shouldn't be None or zero."730            )731        if isinstance(self.node_count, int) and isinstance(capability.node_count, int):732            if self.node_count > capability.node_count:733                result.add_reason(734                    f"capability node count {capability.node_count} "735                    f"must be more than requirement {self.node_count}"736                )737        else:738            result.merge(739                search_space.check_countspace(self.node_count, capability.node_count),740                "node_count",741            )742        result.merge(743            search_space.check_countspace(self.core_count, capability.core_count),744            "core_count",745        )746        result.merge(747            search_space.check_countspace(self.memory_mb, capability.memory_mb),748            "memory_mb",749        )750        if self.disk:751            result.merge(self.disk.check(capability.disk))752        if self.network_interface:753            result.merge(self.network_interface.check(capability.network_interface))754        result.merge(755            search_space.check_countspace(self.gpu_count, capability.gpu_count),756            "gpu_count",757        )758        if self.features:759            for feature in self.features:760                cap_feature = self._find_feature_by_type(761                    feature.type, capability.features762                )763                if cap_feature:764                    result.merge(feature.check(cap_feature))765                else:766                    result.add_reason(767                        f"no feature '{feature.type}' found in capability"768                    )769        if self.excluded_features:770            for feature in self.excluded_features:771                cap_feature = self._find_feature_by_type(772                    feature.type, capability.features773                )774                if cap_feature:775                    result.add_reason(776                        f"excluded feature '{feature.type}' found in capability"777                    )778        return result779    def expand_by_node_count(self) -> List[Any]:780        # expand node count in requirement to one,781        # so that's easy to compare equalization later.782        expanded_requirements: List[NodeSpace] = []783        node_count = search_space.generate_min_capability_countspace(784            self.node_count, self.node_count785        )786        for _ in range(node_count):787            expanded_copy = copy.copy(self)788            expanded_copy.node_count = 1789            expanded_requirements.append(expanded_copy)790        return expanded_requirements791    def has_feature(self, find_type: str) -> bool:792        result = False793        if not self.features:794            return result795        return any(feature for feature in self.features if feature.type == find_type)796    def _call_requirement_method(self, method_name: str, capability: Any) -> Any:797        assert isinstance(capability, NodeSpace), f"actual: {type(capability)}"798        # copy to duplicate extended schema799        value: NodeSpace = copy.deepcopy(self)800        if self.node_count or capability.node_count:801            if isinstance(self.node_count, int) and isinstance(802                capability.node_count, int803            ):804                # capability can have more node805                value.node_count = capability.node_count806            else:807                value.node_count = getattr(search_space, f"{method_name}_countspace")(808                    self.node_count, capability.node_count809                )810        else:811            raise LisaException("node_count cannot be zero")812        if self.core_count or capability.core_count:813            value.core_count = getattr(search_space, f"{method_name}_countspace")(814                self.core_count, capability.core_count815            )816        else:817            raise LisaException("core_count cannot be zero")818        if self.memory_mb or capability.memory_mb:819            value.memory_mb = getattr(search_space, f"{method_name}_countspace")(820                self.memory_mb, capability.memory_mb821            )822        else:823            raise LisaException("memory_mb cannot be zero")824        if self.disk or capability.disk:825            value.disk = getattr(search_space, method_name)(self.disk, capability.disk)826        if self.network_interface or capability.network_interface:827            value.network_interface = getattr(search_space, method_name)(828                self.network_interface, capability.network_interface829            )830        if self.gpu_count or capability.gpu_count:831            value.gpu_count = getattr(search_space, f"{method_name}_countspace")(832                self.gpu_count, capability.gpu_count833            )834        else:835            value.gpu_count = 0836        if (837            capability.features838            and method_name == search_space.RequirementMethod.generate_min_capability839        ):840            # The requirement features are ignored, if cap doesn't have it.841            value.features = search_space.SetSpace[FeatureSettings](is_allow_set=True)842            for original_cap_feature in capability.features:843                capability_feature = self._get_or_create_feature_settings(844                    original_cap_feature845                )846                requirement_feature = (847                    self._find_feature_by_type(capability_feature.type, self.features)848                    or capability_feature849                )850                current_feature = getattr(requirement_feature, method_name)(851                    capability_feature852                )853                value.features.add(current_feature)854        elif method_name == search_space.RequirementMethod.intersect and (855            capability.features or self.features856        ):857            # This is a hack to work with lisa_runner. The capability features858            # are joined case req and runbook req. Here just take the results859            # from capability.860            value.features = capability.features861        if (862            capability.excluded_features863            and method_name == search_space.RequirementMethod.generate_min_capability864        ):865            # TODO: the min value for excluded feature is not clear. It may need866            # to be improved with real scenarios.867            value.excluded_features = search_space.SetSpace[FeatureSettings](868                is_allow_set=True869            )870            for original_cap_feature in capability.excluded_features:871                capability_feature = self._get_or_create_feature_settings(872                    original_cap_feature873                )874                requirement_feature = (875                    self._find_feature_by_type(876                        capability_feature.type, self.excluded_features877                    )878                    or capability_feature879                )880                current_feature = getattr(requirement_feature, method_name)(881                    capability_feature882                )883                value.excluded_features.add(current_feature)884        elif method_name == search_space.RequirementMethod.intersect and (885            capability.excluded_features or self.excluded_features886        ):887            # This is a hack to work with lisa_runner. The capability features888            # are joined case req and runbook req. Here just take the results889            # from capability.890            value.excluded_features = capability.excluded_features891        return value892    def _find_feature_by_type(893        self,894        find_type: str,895        features: Optional[search_space.SetSpace[Any]],896    ) -> Optional[FeatureSettings]:897        result: Optional[FeatureSettings] = None898        if not features:899            return result900        is_found = False901        for original_feature in features.items:902            feature = self._get_or_create_feature_settings(original_feature)903            if feature.type == find_type:904                is_found = True905                break906        if is_found:907            result = feature908        return result909    def _create_feature_settings_list(910        self, features: Optional[search_space.SetSpace[Any]]911    ) -> Optional[FeaturesSpace]:912        result: Optional[FeaturesSpace] = None913        if features is None:914            return result915        result = cast(916            FeaturesSpace,917            search_space.SetSpace[FeatureSettings](is_allow_set=features.is_allow_set),918        )919        for raw_feature in features.items:920            feature = self._get_or_create_feature_settings(raw_feature)921            result.add(feature)922        return result923    def _get_or_create_feature_settings(self, feature: Any) -> FeatureSettings:924        if isinstance(feature, str):925            feature_setting = FeatureSettings.create(feature)926        elif isinstance(feature, FeatureSettings):927            feature_setting = feature928        else:929            raise LisaException(930                f"unsupported type {type(feature)} found in features, "931                "only str and FeatureSettings supported."932            )933        return feature_setting934@dataclass_json()935@dataclass936class Capability(NodeSpace):937    type: str = constants.ENVIRONMENTS_NODES_REQUIREMENT938    def __post_init__(self, *args: Any, **kwargs: Any) -> None:939        super().__post_init__(*args, **kwargs)940        self.node_count = 1941@dataclass_json()942@dataclass943class Node(TypedSchema, ExtendableSchemaMixin):944    capability: Capability = field(default_factory=Capability)945    name: str = ""946    is_default: bool = field(default=False)947@dataclass_json()948@dataclass949class LocalNode(Node):950    type: str = constants.ENVIRONMENTS_NODES_LOCAL951@dataclass_json()952@dataclass953class RemoteNode(Node):954    type: str = constants.ENVIRONMENTS_NODES_REMOTE955    address: str = ""956    port: int = field(957        default=22,958        metadata=field_metadata(959            field_function=fields.Int, validate=validate.Range(min=1, max=65535)960        ),961    )962    public_address: str = ""963    public_port: int = field(964        default=22,965        metadata=field_metadata(966            field_function=fields.Int, validate=validate.Range(min=1, max=65535)967        ),968    )969    username: str = constants.DEFAULT_USER_NAME970    password: str = ""971    private_key_file: str = ""972    def __post_init__(self, *args: Any, **kwargs: Any) -> None:973        add_secret(self.username, PATTERN_HEADTAIL)974        add_secret(self.password)975        add_secret(self.private_key_file)976@dataclass_json()977@dataclass978class Environment:979    name: str = field(default="")980    topology: str = field(981        default=constants.ENVIRONMENTS_SUBNET,982        metadata=field_metadata(983            validate=validate.OneOf([constants.ENVIRONMENTS_SUBNET])984        ),985    )986    nodes_raw: Optional[List[Any]] = field(987        default=None,988        metadata=field_metadata(data_key=constants.NODES),989    )990    nodes_requirement: Optional[List[NodeSpace]] = None991    _original_nodes_requirement: Optional[List[NodeSpace]] = None992    def __post_init__(self, *args: Any, **kwargs: Any) -> None:993        self._original_nodes_requirement = self.nodes_requirement994        self.reload_requirements()995    def reload_requirements(self) -> None:996        results: List[Node] = []997        self.nodes = []998        self.nodes_requirement = None999        if self._original_nodes_requirement:1000            self.nodes_requirement = []1001            self.nodes_requirement.extend(copy.copy(self._original_nodes_requirement))1002        if self.nodes_raw:1003            for node_raw in self.nodes_raw:1004                node_type = node_raw[constants.TYPE]1005                if node_type == constants.ENVIRONMENTS_NODES_REQUIREMENT:1006                    original_req: NodeSpace = load_by_type(NodeSpace, node_raw)1007                    expanded_req = original_req.expand_by_node_count()1008                    if self.nodes_requirement is None:1009                        self.nodes_requirement = []1010                    self.nodes_requirement.extend(expanded_req)1011                else:1012                    # load base schema for future parsing1013                    node: Node = load_by_type(Node, node_raw)1014                    results.append(node)1015        self.nodes = results1016@dataclass_json()1017@dataclass1018class EnvironmentRoot:1019    warn_as_error: bool = field(default=False)1020    environments: List[Environment] = field(default_factory=list)1021@dataclass_json()1022@dataclass1023class Platform(TypedSchema, ExtendableSchemaMixin):1024    type: str = field(1025        default=constants.PLATFORM_READY,1026        metadata=field_metadata(required=True),1027    )1028    admin_username: str = constants.DEFAULT_USER_NAME1029    admin_password: str = ""1030    admin_private_key_file: str = ""1031    # no/False: means to delete the environment regardless case fail or pass1032    # yes/always/True: means to keep the environment regardless case fail or pass1033    keep_environment: Optional[Union[str, bool]] = constants.ENVIRONMENT_KEEP_NO1034    # platform can specify a default environment requirement1035    requirement: Optional[Dict[str, Any]] = None1036    def __post_init__(self, *args: Any, **kwargs: Any) -> None:1037        add_secret(self.admin_username, PATTERN_HEADTAIL)1038        add_secret(self.admin_password)1039        if self.type != constants.PLATFORM_READY:1040            if not self.admin_password and not self.admin_private_key_file:1041                raise LisaException(1042                    "one of admin_password and admin_private_key_file must be set"1043                )1044        if isinstance(self.keep_environment, bool):1045            if self.keep_environment:1046                self.keep_environment = constants.ENVIRONMENT_KEEP_ALWAYS1047            else:1048                self.keep_environment = constants.ENVIRONMENT_KEEP_NO1049        allow_list = [1050            constants.ENVIRONMENT_KEEP_ALWAYS,1051            constants.ENVIRONMENT_KEEP_FAILED,1052            constants.ENVIRONMENT_KEEP_NO,1053        ]1054        assert isinstance(self.keep_environment, str), (1055            f"keep_environment should be {allow_list} or bool, "1056            f"but it's {type(self.keep_environment)}, '{self.keep_environment}'"1057        )1058        if isinstance(self.keep_environment, str):1059            self.keep_environment = self.keep_environment.lower()1060            if self.keep_environment not in allow_list:1061                raise LisaException(1062                    f"keep_environment only can be set as one of {allow_list}"1063                )1064        # this requirement in platform will be applied to each test case1065        # requirement. It means the set value will override value in test cases.1066        # But the schema will be validated here. The original NodeSpace object holds1067        if self.requirement:1068            # validate schema of raw inputs1069            load_by_type(Capability, self.requirement)1070@dataclass_json()1071@dataclass1072class Criteria:1073    """1074    all rules in same criteria are AND condition.1075    we may support richer conditions later.1076    match case by name pattern1077    """1078    name: Optional[str] = None1079    area: Optional[str] = None1080    category: Optional[str] = None1081    priority: Optional[Union[int, List[int]]] = field(1082        default=None,1083        metadata=field_metadata(1084            validate=ListableValidator(int, validate.Range(min=0, max=4)),1085            allow_none=True,1086        ),1087    )1088    # tags is a simple way to include test cases within same topic.1089    tags: Optional[Union[str, List[str]]] = field(1090        default=None,1091        metadata=field_metadata(validate=ListableValidator(str), allow_none=True),1092    )1093    def __post_init__(self, *args: Any, **kwargs: Any) -> None:1094        strip_strs(self, ["name", "area", "category", "tags"])1095@dataclass_json()1096@dataclass1097class BaseTestCaseFilter(TypedSchema, ExtendableSchemaMixin, BaseClassMixin):1098    """1099    base test case filters for subclass factory1100    """1101    type: str = field(1102        default=constants.TESTCASE_TYPE_LISA,1103    )1104    # if it's false, current filter is ineffective.1105    enabled: bool = field(default=True)1106@dataclass_json()1107@dataclass1108class TestCase(BaseTestCaseFilter):1109    type: str = field(1110        default=constants.TESTCASE_TYPE_LISA,1111        metadata=field_metadata(1112            validate=validate.OneOf([constants.TESTCASE_TYPE_LISA]),1113        ),1114    )1115    name: str = ""1116    criteria: Optional[Criteria] = None1117    # specify use this rule to select or drop test cases. if it's forced include or1118    # exclude, it won't be effect by following select actions. And it fails if1119    # there are force rules conflict.1120    select_action: str = field(1121        default=constants.TESTCASE_SELECT_ACTION_INCLUDE,1122        metadata=config(1123            mm_field=fields.String(1124                validate=validate.OneOf(1125                    [1126                        # none means this action part doesn't include or exclude cases1127                        constants.TESTCASE_SELECT_ACTION_NONE,1128                        constants.TESTCASE_SELECT_ACTION_INCLUDE,1129                        constants.TESTCASE_SELECT_ACTION_FORCE_INCLUDE,1130                        constants.TESTCASE_SELECT_ACTION_EXCLUDE,1131                        constants.TESTCASE_SELECT_ACTION_FORCE_EXCLUDE,1132                    ]1133                )1134            ),1135        ),1136    )1137    # run this group of test cases several times1138    # default is 11139    times: int = field(1140        default=1,1141        metadata=field_metadata(1142            field_function=fields.Int, validate=validate.Range(min=1)1143        ),1144    )1145    # retry times if fails. Default is 0, not to retry.1146    retry: int = field(1147        default=0,1148        metadata=field_metadata(1149            field_function=fields.Int, validate=validate.Range(min=0)1150        ),1151    )1152    # each case with this rule will be run in a new environment.1153    use_new_environment: bool = False1154    # Once it's set, failed test result will be rewrite to success1155    # it uses to work around some cases temporarily, don't overuse it.1156    # default is false1157    ignore_failure: bool = False1158    # case should run on a specified environment1159    environment: str = ""1160    @classmethod1161    def type_name(cls) -> str:1162        return constants.TESTCASE_TYPE_LISA1163@dataclass_json()1164@dataclass1165class LegacyTestCase(BaseTestCaseFilter):1166    type: str = field(1167        default=constants.TESTCASE_TYPE_LEGACY,1168        metadata=field_metadata(1169            required=True,1170            validate=validate.OneOf([constants.TESTCASE_TYPE_LEGACY]),1171        ),1172    )1173    repo: str = "https://github.com/microsoft/lisa.git"1174    branch: str = "master"1175    command: str = ""1176    @classmethod1177    def type_name(cls) -> str:1178        return constants.TESTCASE_TYPE_LEGACY1179@dataclass_json()1180@dataclass1181class ConnectionInfo:1182    address: str = ""1183    port: int = field(1184        default=22,1185        metadata=field_metadata(1186            field_function=fields.Int, validate=validate.Range(min=1, max=65535)1187        ),1188    )1189    username: str = constants.DEFAULT_USER_NAME1190    password: Optional[str] = ""1191    private_key_file: Optional[str] = ""1192    def __post_init__(self, *args: Any, **kwargs: Any) -> None:1193        add_secret(self.username, PATTERN_HEADTAIL)1194        add_secret(self.password)1195        add_secret(self.private_key_file)1196        if not self.password and not self.private_key_file:1197            raise LisaException(1198                "at least one of password or private_key_file need to be set when "1199                "connecting"1200            )1201        elif not self.private_key_file:1202            # use password1203            # spurplus doesn't process empty string correctly, use None1204            self.private_key_file = None1205        else:1206            if not Path(self.private_key_file).exists():1207                raise FileNotFoundError(self.private_key_file)1208            self.password = None1209        if not self.username:1210            raise LisaException("username must be set")1211    def __str__(self) -> str:1212        return f"{self.username}@{self.address}:{self.port}"1213@dataclass_json()1214@dataclass1215class ProxyConnectionInfo(ConnectionInfo):1216    private_address: str = ""1217    private_port: int = field(1218        default=22,1219        metadata=field_metadata(1220            field_function=fields.Int, validate=validate.Range(min=1, max=65535)1221        ),1222    )1223@dataclass_json()1224@dataclass1225class Development:1226    enabled: bool = True1227    enable_trace: bool = False1228    mock_tcp_ping: bool = False1229    jump_boxes: List[ProxyConnectionInfo] = field(default_factory=list)1230@dataclass_json()1231@dataclass1232class Runbook:1233    # run name prefix to help grouping results and put it in title.1234    name: str = "not_named"1235    exit_with_failed_count: bool = True1236    test_project: str = ""1237    test_pass: str = ""1238    tags: Optional[List[str]] = None1239    concurrency: int = 11240    # minutes to wait for resource1241    wait_resource_timeout: float = 51242    include: Optional[List[Include]] = field(default=None)1243    extension: Optional[List[Union[str, Extension]]] = field(default=None)1244    variable: Optional[List[Variable]] = field(default=None)1245    transformer: Optional[List[Transformer]] = field(default=None)1246    combinator: Optional[Combinator] = field(default=None)1247    environment: Optional[EnvironmentRoot] = field(default=None)1248    notifier: Optional[List[Notifier]] = field(default=None)1249    platform: List[Platform] = field(default_factory=list)1250    #  will be parsed in runner.1251    testcase_raw: List[Any] = field(1252        default_factory=list, metadata=field_metadata(data_key=constants.TESTCASE)1253    )1254    dev: Optional[Development] = field(default=None)1255    def __post_init__(self, *args: Any, **kwargs: Any) -> None:1256        if not self.platform:1257            self.platform = [Platform(type=constants.PLATFORM_READY)]1258        if not self.testcase_raw:1259            self.testcase_raw = [1260                {1261                    constants.TESTCASE_CRITERIA: {1262                        constants.TESTCASE_CRITERIA_AREA: "demo"1263                    }1264                }1265            ]1266        self.testcase: List[Any] = []1267def load_by_type(schema_type: Type[T], raw_runbook: Any, many: bool = False) -> T:1268    """1269    Convert dict, list or base typed schema to specified typed schema.1270    """1271    if type(raw_runbook) == schema_type:1272        return raw_runbook1273    if not isinstance(raw_runbook, dict) and not many:1274        raw_runbook = raw_runbook.to_dict()1275    result: T = schema_type.schema().load(raw_runbook, many=many)  # type: ignore1276    return result1277def load_by_type_many(schema_type: Type[T], raw_runbook: Any) -> List[T]:1278    """1279    Convert raw list to list of typed schema. It has different returned type1280    with load_by_type.1281    """1282    result = load_by_type(schema_type, raw_runbook=raw_runbook, many=True)...search_space.py
Source:search_space.py  
...185                f"IntRange doesn't support other intersect on {type(capability)}."186            )187        return result188CountSpace = Union[int, List[IntRange], IntRange, None]189def decode_count_space(data: Any) -> Any:190    """191    CountSpace is complex to marshmallow, so it needs customized decode.192    Anyway, marshmallow can encode it correctly.193    """194    decoded_data: CountSpace = None195    if data is None or isinstance(data, int) or isinstance(data, IntRange):196        decoded_data = data197    elif isinstance(data, list):198        decoded_data = []199        for item in data:200            if isinstance(item, dict):201                decoded_data.append(IntRange.schema().load(item))  # type: ignore202            else:203                assert isinstance(item, IntRange), f"actual: {type(item)}"...nvme.py
Source:nvme.py  
1# Copyright (c) Microsoft Corporation.2# Licensed under the MIT license.3import re4from dataclasses import dataclass, field5from typing import Any, List, Type6from dataclasses_json import dataclass_json7from lisa import schema, search_space8from lisa.feature import Feature9from lisa.schema import FeatureSettings10from lisa.tools import Lspci, Nvmecli11from lisa.tools.lspci import PciDevice12from lisa.util import field_metadata13class Nvme(Feature):14    # crw------- 1 root root 251, 0 Jun 21 03:08 /dev/nvme015    _device_pattern = re.compile(r".*(?P<device_name>/dev/nvme[0-9]$)", re.MULTILINE)16    # brw-rw---- 1 root disk 259, 0 Jun 21 03:08 /dev/nvme0n117    _namespace_pattern = re.compile(18        r".*(?P<namespace>/dev/nvme[0-9]n[0-9]$)", re.MULTILINE19    )20    # '/dev/nvme0n1          351f1f720e5a00000001 Microsoft NVMe Direct Disk               1           0.00   B /   1.92  TB    512   B +  0 B   NVMDV001' # noqa: E50121    _namespace_cli_pattern = re.compile(22        r"(?P<namespace>/dev/nvme[0-9]n[0-9])", re.MULTILINE23    )24    _pci_device_name = "Non-Volatile memory controller"25    _ls_devices: str = ""26    @classmethod27    def settings_type(cls) -> Type[schema.FeatureSettings]:28        return NvmeSettings29    @classmethod30    def can_disable(cls) -> bool:31        return True32    def enabled(self) -> bool:33        return True34    def get_devices(self) -> List[str]:35        devices_list = []36        self._get_device_from_ls()37        for row in self._ls_devices.splitlines():38            matched_result = self._device_pattern.match(row)39            if matched_result:40                devices_list.append(matched_result.group("device_name"))41        return devices_list42    def get_namespaces(self) -> List[str]:43        namespaces = []44        self._get_device_from_ls()45        for row in self._ls_devices.splitlines():46            matched_result = self._namespace_pattern.match(row)47            if matched_result:48                namespaces.append(matched_result.group("namespace"))49        return namespaces50    def get_namespaces_from_cli(self) -> List[str]:51        namespaces_cli = []52        nvme_cli = self._node.tools[Nvmecli]53        nvme_list = nvme_cli.run("list", shell=True, sudo=True)54        for row in nvme_list.stdout.splitlines():55            matched_result = self._namespace_cli_pattern.match(row)56            if matched_result:57                namespaces_cli.append(matched_result.group("namespace"))58        return namespaces_cli59    def get_devices_from_lspci(self) -> List[PciDevice]:60        devices_from_lspci = []61        lspci_tool = self._node.tools[Lspci]62        device_list = lspci_tool.get_devices()63        devices_from_lspci = [64            x for x in device_list if self._pci_device_name == x.device_class65        ]66        return devices_from_lspci67    def get_raw_data_disks(self) -> List[str]:68        return self.get_namespaces()69    def _get_device_from_ls(self, force_run: bool = False) -> None:70        if (not self._ls_devices) or force_run:71            execute_results = self._node.execute(72                "ls -l /dev/nvme*", shell=True, sudo=True73            )74            self._ls_devices = execute_results.stdout75@dataclass_json()76@dataclass()77class NvmeSettings(FeatureSettings):78    type: str = "Nvme"79    disk_count: search_space.CountSpace = field(80        default=search_space.IntRange(min=0),81        metadata=field_metadata(decoder=search_space.decode_count_space),82    )83    def __eq__(self, o: object) -> bool:84        assert isinstance(o, NvmeSettings), f"actual: {type(o)}"85        return self.type == o.type and self.disk_count == o.disk_count86    def __repr__(self) -> str:87        return f"disk_count:{self.disk_count}"88    def __str__(self) -> str:89        return self.__repr__()90    def __hash__(self) -> int:91        return super().__hash__()92    def _get_key(self) -> str:93        return f"{super()._get_key()}/{self.disk_count}"94    def check(self, capability: Any) -> search_space.ResultReason:95        assert isinstance(capability, NvmeSettings), f"actual: {type(capability)}"96        result = super().check(capability)97        result.merge(98            search_space.check_countspace(self.disk_count, capability.disk_count),99            "disk_count",100        )101        return result102    def _generate_min_capability(self, capability: Any) -> Any:103        assert isinstance(capability, NvmeSettings), f"actual: {type(capability)}"104        min_value = NvmeSettings()105        if self.disk_count or capability.disk_count:106            min_value.disk_count = search_space.generate_min_capability_countspace(107                self.disk_count, capability.disk_count108            )...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
