Best Python code snippet using lisa_python
schema.py
Source:schema.py  
...318            )319        return result320    def _get_key(self) -> str:321        return self.type322    def _call_requirement_method(self, method_name: str, capability: Any) -> Any:323        assert isinstance(capability, FeatureSettings), f"actual: {type(capability)}"324        # default FeatureSetting is a place holder, nothing to do.325        value = FeatureSettings.create(self.type)326        # try best to intersect the extended schemas327        if method_name == search_space.RequirementMethod.intersect:328            if self.extended_schemas and capability and capability.extended_schemas:329                value.extended_schemas = deep_update_dict(330                    self.extended_schemas,331                    capability.extended_schemas,332                )333            else:334                value.extended_schemas = (335                    capability.extended_schemas336                    if capability and capability.extended_schemas337                    else self.extended_schemas338                )339        return value340class DiskType(str, Enum):341    PremiumSSDLRS = "PremiumSSDLRS"342    Ephemeral = "Ephemeral"343    StandardHDDLRS = "StandardHDDLRS"344    StandardSSDLRS = "StandardSSDLRS"345# disk types are ordered by commonly and cost. The earlier is lower cost.346disk_type_priority: List[DiskType] = [347    DiskType.StandardHDDLRS,348    DiskType.StandardSSDLRS,349    DiskType.Ephemeral,350    DiskType.PremiumSSDLRS,351]352@dataclass_json()353@dataclass()354class DiskOptionSettings(FeatureSettings):355    type: str = constants.FEATURE_DISK356    disk_type: Optional[357        Union[search_space.SetSpace[DiskType], DiskType]358    ] = field(  # type:ignore359        default_factory=partial(360            search_space.SetSpace,361            items=[362                DiskType.StandardHDDLRS,363                DiskType.StandardSSDLRS,364                DiskType.Ephemeral,365                DiskType.PremiumSSDLRS,366            ],367        ),368        metadata=field_metadata(369            decoder=partial(search_space.decode_set_space_by_type, base_type=DiskType)370        ),371    )372    data_disk_count: search_space.CountSpace = field(373        default_factory=partial(search_space.IntRange, min=0),374        metadata=field_metadata(decoder=search_space.decode_count_space),375    )376    data_disk_caching_type: str = field(377        default=constants.DATADISK_CACHING_TYPE_NONE,378        metadata=field_metadata(379            validate=validate.OneOf(380                [381                    constants.DATADISK_CACHING_TYPE_NONE,382                    constants.DATADISK_CACHING_TYPE_READONLY,383                    constants.DATADISK_CACHING_TYPE_READYWRITE,384                ]385            ),386        ),387    )388    data_disk_iops: search_space.CountSpace = field(389        default_factory=partial(search_space.IntRange, min=0),390        metadata=field_metadata(391            allow_none=True, decoder=search_space.decode_count_space392        ),393    )394    data_disk_size: search_space.CountSpace = field(395        default_factory=partial(search_space.IntRange, min=0),396        metadata=field_metadata(397            allow_none=True, decoder=search_space.decode_count_space398        ),399    )400    max_data_disk_count: search_space.CountSpace = field(401        default=None,402        metadata=field_metadata(403            allow_none=True, decoder=search_space.decode_count_space404        ),405    )406    def __eq__(self, o: object) -> bool:407        assert isinstance(o, DiskOptionSettings), f"actual: {type(o)}"408        return (409            self.type == o.type410            and self.disk_type == o.disk_type411            and self.data_disk_count == o.data_disk_count412            and self.data_disk_caching_type == o.data_disk_caching_type413            and self.data_disk_iops == o.data_disk_iops414            and self.data_disk_size == o.data_disk_size415            and self.max_data_disk_count == o.max_data_disk_count416        )417    def __repr__(self) -> str:418        return (419            f"disk_type: {self.disk_type},"420            f"count: {self.data_disk_count},"421            f"caching: {self.data_disk_caching_type},"422            f"iops: {self.data_disk_iops},"423            f"size: {self.data_disk_size},"424            f"max_data_disk_count: {self.max_data_disk_count}"425        )426    def __str__(self) -> str:427        return self.__repr__()428    def __hash__(self) -> int:429        return super().__hash__()430    def check(self, capability: Any) -> search_space.ResultReason:431        result = super().check(capability)432        result.merge(433            search_space.check_countspace(434                self.data_disk_count, capability.data_disk_count435            ),436            "data_disk_count",437        )438        result.merge(439            search_space.check_countspace(440                self.max_data_disk_count, capability.max_data_disk_count441            ),442            "max_data_disk_count",443        )444        result.merge(445            search_space.check_countspace(446                self.data_disk_iops, capability.data_disk_iops447            ),448            "data_disk_iops",449        )450        return result451    def _get_key(self) -> str:452        return (453            f"{super()._get_key()}/{self.disk_type}/"454            f"{self.data_disk_count}/{self.data_disk_caching_type}/"455            f"{self.data_disk_iops}/{self.data_disk_size}"456        )457    def _call_requirement_method(self, method_name: str, capability: Any) -> Any:458        assert isinstance(capability, DiskOptionSettings), f"actual: {type(capability)}"459        parent_value = super()._call_requirement_method(method_name, capability)460        # convert parent type to child type461        value = DiskOptionSettings()462        value.extended_schemas = parent_value.extended_schemas463        search_space_countspace_method = getattr(464            search_space, f"{method_name}_countspace"465        )466        if self.disk_type or capability.disk_type:467            value.disk_type = getattr(468                search_space, f"{method_name}_setspace_by_priority"469            )(self.disk_type, capability.disk_type, disk_type_priority)470        if self.data_disk_count or capability.data_disk_count:471            value.data_disk_count = search_space_countspace_method(472                self.data_disk_count, capability.data_disk_count473            )474        if self.data_disk_iops or capability.data_disk_iops:475            value.data_disk_iops = search_space_countspace_method(476                self.data_disk_iops, capability.data_disk_iops477            )478        if self.data_disk_size or capability.data_disk_size:479            value.data_disk_size = search_space_countspace_method(480                self.data_disk_size, capability.data_disk_size481            )482        if self.data_disk_caching_type or capability.data_disk_caching_type:483            value.data_disk_caching_type = (484                self.data_disk_caching_type or capability.data_disk_caching_type485            )486        if self.max_data_disk_count or capability.max_data_disk_count:487            value.max_data_disk_count = search_space_countspace_method(488                self.max_data_disk_count, capability.max_data_disk_count489            )490        return value491class NetworkDataPath(str, Enum):492    Synthetic = "Synthetic"493    Sriov = "Sriov"494_network_data_path_priority: List[NetworkDataPath] = [495    NetworkDataPath.Sriov,496    NetworkDataPath.Synthetic,497]498@dataclass_json()499@dataclass()500class NetworkInterfaceOptionSettings(FeatureSettings):501    type: str = "NetworkInterface"502    data_path: Optional[503        Union[search_space.SetSpace[NetworkDataPath], NetworkDataPath]504    ] = field(  # type: ignore505        default_factory=partial(506            search_space.SetSpace,507            items=[508                NetworkDataPath.Synthetic,509                NetworkDataPath.Sriov,510            ],511        ),512        metadata=field_metadata(513            decoder=partial(514                search_space.decode_set_space_by_type, base_type=NetworkDataPath515            )516        ),517    )518    # nic_count is used for specifying associated nic count during provisioning vm519    nic_count: search_space.CountSpace = field(520        default_factory=partial(search_space.IntRange, min=1),521        metadata=field_metadata(decoder=search_space.decode_count_space),522    )523    # max_nic_count is used for getting the size max nic capability, it can be used to524    #  check how many nics the vm can be associated after provisioning525    max_nic_count: search_space.CountSpace = field(526        default_factory=partial(search_space.IntRange, min=1),527        metadata=field_metadata(528            allow_none=True, decoder=search_space.decode_count_space529        ),530    )531    def __eq__(self, o: object) -> bool:532        assert isinstance(o, NetworkInterfaceOptionSettings), f"actual: {type(o)}"533        return (534            self.type == o.type535            and self.data_path == o.data_path536            and self.nic_count == o.nic_count537            and self.max_nic_count == o.max_nic_count538        )539    def __repr__(self) -> str:540        return (541            f"data_path:{self.data_path}, nic_count:{self.nic_count},"542            f" max_nic_count:{self.max_nic_count}"543        )544    def __str__(self) -> str:545        return self.__repr__()546    def __hash__(self) -> int:547        return super().__hash__()548    def _get_key(self) -> str:549        return (550            f"{super()._get_key()}/{self.data_path}/{self.nic_count}"551            f"/{self.max_nic_count}"552        )553    def check(self, capability: Any) -> search_space.ResultReason:554        assert isinstance(555            capability, NetworkInterfaceOptionSettings556        ), f"actual: {type(capability)}"557        result = super().check(capability)558        result.merge(559            search_space.check_countspace(self.nic_count, capability.nic_count),560            "nic_count",561        )562        result.merge(563            search_space.check_setspace(self.data_path, capability.data_path),564            "data_path",565        )566        result.merge(567            search_space.check_countspace(self.max_nic_count, capability.max_nic_count),568            "max_nic_count",569        )570        return result571    def _call_requirement_method(self, method_name: str, capability: Any) -> Any:572        assert isinstance(573            capability, NetworkInterfaceOptionSettings574        ), f"actual: {type(capability)}"575        parent_value = super()._call_requirement_method(method_name, capability)576        # convert parent type to child type577        value = NetworkInterfaceOptionSettings()578        value.extended_schemas = parent_value.extended_schemas579        value.max_nic_count = getattr(search_space, f"{method_name}_countspace")(580            self.max_nic_count, capability.max_nic_count581        )582        if self.nic_count or capability.nic_count:583            value.nic_count = getattr(search_space, f"{method_name}_countspace")(584                self.nic_count, capability.nic_count585            )586        else:587            raise LisaException("nic_count cannot be zero")588        value.data_path = getattr(search_space, f"{method_name}_setspace_by_priority")(589            self.data_path, capability.data_path, _network_data_path_priority590        )591        return value592@dataclass_json()593@dataclass()594class FeaturesSpace(595    search_space.SetSpace[Union[str, FeatureSettings]],596):597    def __post_init__(self, *args: Any, **kwargs: Any) -> None:598        if self.items:599            for index, item in enumerate(self.items):600                if isinstance(item, dict):601                    item = load_by_type(FeatureSettings, item)602                    self.items[index] = item603@dataclass_json()604@dataclass()605class NodeSpace(search_space.RequirementMixin, TypedSchema, ExtendableSchemaMixin):606    type: str = field(607        default=constants.ENVIRONMENTS_NODES_REQUIREMENT,608        metadata=field_metadata(609            required=True,610            validate=validate.OneOf([constants.ENVIRONMENTS_NODES_REQUIREMENT]),611        ),612    )613    name: str = ""614    is_default: bool = field(default=False)615    node_count: search_space.CountSpace = field(616        default=search_space.IntRange(min=1),617        metadata=field_metadata(decoder=search_space.decode_count_space),618    )619    core_count: search_space.CountSpace = field(620        default=search_space.IntRange(min=1),621        metadata=field_metadata(decoder=search_space.decode_count_space),622    )623    memory_mb: search_space.CountSpace = field(624        default=search_space.IntRange(min=512),625        metadata=field_metadata(decoder=search_space.decode_count_space),626    )627    disk: Optional[DiskOptionSettings] = None628    network_interface: Optional[NetworkInterfaceOptionSettings] = None629    gpu_count: search_space.CountSpace = field(630        default=search_space.IntRange(min=0),631        metadata=field_metadata(decoder=search_space.decode_count_space),632    )633    # all features on requirement should be included.634    # all features on capability can be included.635    _features: Optional[FeaturesSpace] = field(636        default=None,637        metadata=field_metadata(allow_none=True, data_key="features"),638    )639    # set by requirements640    # capability's is ignored641    _excluded_features: Optional[FeaturesSpace] = field(642        default=None,643        metadata=field_metadata(644            allow_none=True,645            data_key="excluded_features",646        ),647    )648    def __post_init__(self, *args: Any, **kwargs: Any) -> None:649        # clarify types to avoid type errors in properties.650        self._features: Optional[search_space.SetSpace[FeatureSettings]]651        self._excluded_features: Optional[search_space.SetSpace[FeatureSettings]]652    def __eq__(self, o: object) -> bool:653        assert isinstance(o, NodeSpace), f"actual: {type(o)}"654        return (655            self.type == o.type656            and self.node_count == o.node_count657            and self.core_count == o.core_count658            and self.memory_mb == o.memory_mb659            and self.disk == o.disk660            and self.network_interface == o.network_interface661            and self.gpu_count == o.gpu_count662            and self.features == o.features663            and self.excluded_features == o.excluded_features664        )665    def __repr__(self) -> str:666        """667        override it for shorter text668        """669        return (670            f"type:{self.type},name:{self.name},"671            f"default:{self.is_default},"672            f"count:{self.node_count},core:{self.core_count},"673            f"mem:{self.memory_mb},disk:{self.disk},"674            f"network interface: {self.network_interface}, gpu:{self.gpu_count},"675            f"f:{self.features},ef:{self.excluded_features},"676            f"{super().__repr__()}"677        )678    @property679    def cost(self) -> float:680        core_count = search_space.generate_min_capability_countspace(681            self.core_count, self.core_count682        )683        gpu_count = search_space.generate_min_capability_countspace(684            self.gpu_count, self.gpu_count685        )686        return core_count + gpu_count * 100687    @property688    def features(self) -> Optional[search_space.SetSpace[FeatureSettings]]:689        self._features = self._create_feature_settings_list(self._features)690        if self._features is not None:691            self._features.is_allow_set = True692        return cast(Optional[search_space.SetSpace[FeatureSettings]], self._features)693    @features.setter694    def features(self, value: Optional[search_space.SetSpace[FeatureSettings]]) -> None:695        self._features = cast(FeaturesSpace, value)696    @property697    def excluded_features(self) -> Optional[search_space.SetSpace[FeatureSettings]]:698        if not self._excluded_features:699            self._excluded_features = self._create_feature_settings_list(700                self._excluded_features701            )702            if self._excluded_features is not None:703                self._excluded_features.is_allow_set = False704        return cast(705            Optional[search_space.SetSpace[FeatureSettings]], self._excluded_features706        )707    @excluded_features.setter708    def excluded_features(709        self, value: Optional[search_space.SetSpace[FeatureSettings]]710    ) -> None:711        self._excluded_features = cast(FeaturesSpace, value)712    def check(self, capability: Any) -> search_space.ResultReason:713        result = search_space.ResultReason()714        if capability is None:715            result.add_reason("capability shouldn't be None")716        if self.features:717            assert self.features.is_allow_set, "features should be allow set"718        if self.excluded_features:719            assert (720                not self.excluded_features.is_allow_set721            ), "excluded_features shouldn't be allow set"722        assert isinstance(capability, NodeSpace), f"actual: {type(capability)}"723        if (724            not capability.node_count725            or not capability.core_count726            or not capability.memory_mb727        ):728            result.add_reason(729                "node_count, core_count, memory_mb " "shouldn't be None or zero."730            )731        if isinstance(self.node_count, int) and isinstance(capability.node_count, int):732            if self.node_count > capability.node_count:733                result.add_reason(734                    f"capability node count {capability.node_count} "735                    f"must be more than requirement {self.node_count}"736                )737        else:738            result.merge(739                search_space.check_countspace(self.node_count, capability.node_count),740                "node_count",741            )742        result.merge(743            search_space.check_countspace(self.core_count, capability.core_count),744            "core_count",745        )746        result.merge(747            search_space.check_countspace(self.memory_mb, capability.memory_mb),748            "memory_mb",749        )750        if self.disk:751            result.merge(self.disk.check(capability.disk))752        if self.network_interface:753            result.merge(self.network_interface.check(capability.network_interface))754        result.merge(755            search_space.check_countspace(self.gpu_count, capability.gpu_count),756            "gpu_count",757        )758        if self.features:759            for feature in self.features:760                cap_feature = self._find_feature_by_type(761                    feature.type, capability.features762                )763                if cap_feature:764                    result.merge(feature.check(cap_feature))765                else:766                    result.add_reason(767                        f"no feature '{feature.type}' found in capability"768                    )769        if self.excluded_features:770            for feature in self.excluded_features:771                cap_feature = self._find_feature_by_type(772                    feature.type, capability.features773                )774                if cap_feature:775                    result.add_reason(776                        f"excluded feature '{feature.type}' found in capability"777                    )778        return result779    def expand_by_node_count(self) -> List[Any]:780        # expand node count in requirement to one,781        # so that's easy to compare equalization later.782        expanded_requirements: List[NodeSpace] = []783        node_count = search_space.generate_min_capability_countspace(784            self.node_count, self.node_count785        )786        for _ in range(node_count):787            expanded_copy = copy.copy(self)788            expanded_copy.node_count = 1789            expanded_requirements.append(expanded_copy)790        return expanded_requirements791    def has_feature(self, find_type: str) -> bool:792        result = False793        if not self.features:794            return result795        return any(feature for feature in self.features if feature.type == find_type)796    def _call_requirement_method(self, method_name: str, capability: Any) -> Any:797        assert isinstance(capability, NodeSpace), f"actual: {type(capability)}"798        # copy to duplicate extended schema799        value: NodeSpace = copy.deepcopy(self)800        if self.node_count or capability.node_count:801            if isinstance(self.node_count, int) and isinstance(802                capability.node_count, int803            ):804                # capability can have more node805                value.node_count = capability.node_count806            else:807                value.node_count = getattr(search_space, f"{method_name}_countspace")(808                    self.node_count, capability.node_count809                )810        else:...search_space.py
Source:search_space.py  
...48        return self._generate_min_capability(capability)49    def intersect(self, capability: Any) -> Any:50        self._validate_result(capability)51        return self._intersect(capability)52    def _call_requirement_method(self, method_name: str, capability: Any) -> Any:53        raise NotImplementedError(method_name)54    def _generate_min_capability(self, capability: Any) -> Any:55        return self._call_requirement_method(56            method_name=RequirementMethod.generate_min_capability,57            capability=capability,58        )59    def _intersect(self, capability: Any) -> Any:60        return self._call_requirement_method(61            method_name=RequirementMethod.intersect, capability=capability62        )63    def _validate_result(self, capability: Any) -> None:64        check_result = self.check(capability)65        if not check_result.result:66            raise NotMeetRequirementException(67                f"capability doesn't support requirement: {check_result.reasons}"68            )69T_SEARCH_SPACE = TypeVar("T_SEARCH_SPACE", bound=RequirementMixin)70@dataclass_json()71@dataclass72class IntRange(RequirementMixin):73    min: int = 074    max: int = field(default=sys.maxsize)75    max_inclusive: bool = True76    def __post_init__(self, *args: Any, **kwargs: Any) -> None:77        if self.min > self.max:78            raise LisaException(79                f"min: {self.min} shouldn't be greater than max: {self.max}"80            )81        elif self.min == self.max and self.max_inclusive is False:82            raise LisaException(83                "min shouldn't be equal to max, if max_includes is False."84            )85    def __repr__(self) -> str:86        max_value = self.max if self.max < sys.maxsize else ""87        max_inclusive = ""88        if max_value:89            max_inclusive = "(inc)" if self.max_inclusive else "(exc)"90        return f"[{self.min},{max_value}{max_inclusive}]"91    def __eq__(self, __o: object) -> bool:92        assert isinstance(__o, IntRange), f"actual type: {type(__o)}"93        return (94            self.min == __o.min95            and self.max == __o.max96            and self.max_inclusive == __o.max_inclusive97        )98    def check(self, capability: Any) -> ResultReason:99        result = ResultReason()100        if capability is None:101            result.add_reason("capability shouldn't be None")102        else:103            if isinstance(capability, IntRange):104                if capability.max < self.min:105                    result.add_reason(106                        f"capability max({capability.max}) is "107                        f"smaller than requirement min({self.min})"108                    )109                elif capability.max == self.min and not capability.max_inclusive:110                    result.add_reason(111                        f"capability max({capability.max}) equals "112                        f"to requirement min({self.min}), but "113                        f"capability is not max_inclusive"114                    )115                elif capability.min > self.max:116                    result.add_reason(117                        f"capability min({capability.min}) is "118                        f"bigger than requirement max({self.max})"119                    )120                elif capability.min == self.max and not self.max_inclusive:121                    result.add_reason(122                        f"capability min({capability.min}) equals "123                        f"to requirement max({self.max}), but "124                        f"requirement is not max_inclusive"125                    )126            elif isinstance(capability, int):127                if capability < self.min:128                    result.add_reason(129                        f"capability({capability}) is "130                        f"smaller than requirement min({self.min})"131                    )132                elif capability > self.max:133                    result.add_reason(134                        f"capability ({capability}) is "135                        f"bigger than requirement max({self.max})"136                    )137                elif capability == self.max and not self.max_inclusive:138                    result.add_reason(139                        f"capability({capability}) equals "140                        f"to requirement max({self.max}), but "141                        f"requirement is not max_inclusive"142                    )143            else:144                assert isinstance(capability, list), f"actual: {type(capability)}"145                temp_result = _one_of_matched(self, capability)146                if not temp_result.result:147                    result.add_reason(148                        "no capability matches requirement, "149                        f"requirement: {self}, capability: {capability}"150                    )151        return result152    def _generate_min_capability(self, capability: Any) -> int:153        if isinstance(capability, int):154            result: int = capability155        elif isinstance(capability, IntRange):156            if self.min < capability.min:157                result = capability.min158            else:159                result = self.min160        else:161            assert isinstance(capability, list), f"actual: {type(capability)}"162            result = self.max if self.max_inclusive else self.max - 1163            for cap_item in capability:164                temp_result = self.check(cap_item)165                if temp_result.result:166                    temp_min = self.generate_min_capability(cap_item)167                    result = min(temp_min, result)168        return result169    def _intersect(self, capability: Any) -> Any:170        if isinstance(capability, int):171            return capability172        elif isinstance(capability, IntRange):173            result = IntRange(174                min=self.min, max=self.max, max_inclusive=self.max_inclusive175            )176            if self.min < capability.min:177                result.min = capability.min178            if self.max > capability.max:179                result.max = capability.max180                result.max_inclusive = capability.max_inclusive181            elif self.max == capability.max:182                result.max_inclusive = capability.max_inclusive and self.max_inclusive183        else:184            raise NotImplementedError(185                f"IntRange doesn't support other intersect on {type(capability)}."186            )187        return result188CountSpace = Union[int, List[IntRange], IntRange, None]189def decode_count_space(data: Any) -> Any:190    """191    CountSpace is complex to marshmallow, so it needs customized decode.192    Anyway, marshmallow can encode it correctly.193    """194    decoded_data: CountSpace = None195    if data is None or isinstance(data, int) or isinstance(data, IntRange):196        decoded_data = data197    elif isinstance(data, list):198        decoded_data = []199        for item in data:200            if isinstance(item, dict):201                decoded_data.append(IntRange.schema().load(item))  # type: ignore202            else:203                assert isinstance(item, IntRange), f"actual: {type(item)}"204                decoded_data.append(item)205    else:206        assert isinstance(data, dict), f"actual: {type(data)}"207        decoded_data = IntRange.schema().load(data)  # type: ignore208    return decoded_data209def _one_of_matched(requirement: Any, capabilities: List[Any]) -> ResultReason:210    result = ResultReason()211    supported = False212    assert isinstance(requirement, RequirementMixin), f"actual: {type(requirement)}"213    for cap_item in capabilities:214        temp_result = requirement.check(cap_item)215        if temp_result.result:216            supported = True217            break218    if not supported:219        result.add_reason("no one meeting requirement")220    return result221@dataclass_json()222@dataclass223class SetSpace(RequirementMixin, Set[T]):224    is_allow_set: bool = False225    items: List[T] = field(default_factory=list)226    def __init__(227        self,228        is_allow_set: Optional[bool] = None,229        items: Optional[Iterable[T]] = None,230    ) -> None:231        self.items: List[T] = []232        if items:233            self.update(items)234        if is_allow_set is not None:235            self.is_allow_set = is_allow_set236    def __repr__(self) -> str:237        return (238            f"allowed:{self.is_allow_set},"239            f"items:[{','.join([str(x) for x in self])}]"240        )241    def __post_init__(self, *args: Any, **kwargs: Any) -> None:242        self.update(self.items)243    def check(self, capability: Any) -> ResultReason:244        result = ResultReason()245        if self.is_allow_set and len(self) > 0 and not capability:246            result.add_reason(247                "if requirements is allow set and len > 0, capability shouldn't be None"248            )249        assert isinstance(capability, SetSpace), f"actual: {type(capability)}"250        assert capability.is_allow_set, "capability must be allow set"251        # if self.options is not None:252        # cap_set = capability.options253        if result.result:254            if self.is_allow_set:255                if not capability.issuperset(self):256                    result.add_reason(257                        "capability cannot support some of requirements, "258                        f"requirement: '{self}'"259                        f"capability: '{capability}', "260                    )261            else:262                inter_set: Set[Any] = self.intersection(capability)263                if len(inter_set) > 0:264                    names: List[str] = []265                    for item in inter_set:266                        if isinstance(item, type):267                            names.append(item.__name__)268                        elif isinstance(item, object):269                            names.append(item.__class__.__name__)270                        else:271                            names.append(item)272                    result.add_reason(f"requirements excludes {names}")273        return result274    def add(self, element: T) -> None:275        super().add(element)276        self.items.append(element)277    def remove(self, element: T) -> None:278        super().remove(element)279        self.items.remove(element)280    def update(self, *s: Iterable[T]) -> None:281        super().update(*s)282        self.items.extend(*s)283    def _generate_min_capability(self, capability: Any) -> Optional[Set[T]]:284        result: Optional[SetSpace[T]] = None285        if self.is_allow_set and len(self) > 0:286            assert isinstance(capability, SetSpace), f"actual: {type(capability)}"287            result = SetSpace(is_allow_set=self.is_allow_set)288            if len(capability) > 0:289                for item in self:290                    if item in capability:291                        result.add(item)292        return result293    def _intersect(self, capability: Any) -> Any:294        return self._generate_min_capability(capability)295def decode_set_space(data: Any) -> Any:296    """297    not sure what's reason, __post_init__ won't be called automatically.298    So write this decoder to force it's called on deserializing299    """300    result = None301    if data:302        result = SetSpace.schema().load(data)  # type: ignore303    return result304def decode_set_space_by_type(305    data: Any, base_type: Type[T]306) -> Optional[Union[SetSpace[T], T]]:307    if isinstance(data, dict):308        new_data = SetSpace[T](is_allow_set=True)309        types = data.get("items", [])310        for item in types:311            new_data.add(base_type(item))  # type: ignore312        decoded_data: Optional[Union[SetSpace[T], T]] = new_data313    elif isinstance(data, list):314        new_data = SetSpace[T](is_allow_set=True)315        for item in data:316            new_data.add(base_type(item))  # type: ignore317        decoded_data = new_data318    elif isinstance(data, str):319        decoded_data = base_type(data)  # type: ignore320    elif isinstance(data, SetSpace):321        decoded_data = data322    else:323        raise LisaException(f"unknown data type: {type(data)}")324    return decoded_data325def check_countspace(requirement: CountSpace, capability: CountSpace) -> ResultReason:326    result = ResultReason()327    if requirement is not None:328        if capability is None:329            result.add_reason(330                "if requirements isn't None, capability shouldn't be None"331            )332        else:333            if isinstance(requirement, int):334                if isinstance(capability, int):335                    if requirement != capability:336                        result.add_reason(337                            "requirement is a number, capability should be exact "338                            f"much, but requirement: {requirement}, "339                            f"capability: {capability}"340                        )341                elif isinstance(capability, IntRange):342                    temp_result = capability.check(requirement)343                    if not temp_result.result:344                        result.add_reason(345                            "requirement is a number, capability should include it, "346                            f"but requirement: {requirement}, capability: {capability}"347                        )348                else:349                    assert isinstance(capability, list), f"actual: {type(capability)}"350                    temp_requirement = IntRange(min=requirement, max=requirement)351                    temp_result = _one_of_matched(temp_requirement, capability)352                    if not temp_result.result:353                        result.add_reason(354                            f"requirement is a number, no capability matched, "355                            f"requirement: {requirement}, capability: {capability}"356                        )357            elif isinstance(requirement, IntRange):358                result.merge(requirement.check(capability))359            else:360                assert isinstance(requirement, list), f"actual: {type(requirement)}"361                supported = False362                for req_item in requirement:363                    temp_result = req_item.check(capability)364                    if temp_result.result:365                        supported = True366                if not supported:367                    result.add_reason(368                        "no capability matches requirement, "369                        f"requirement: {requirement}, capability: {capability}"370                    )371    return result372def generate_min_capability_countspace(373    requirement: CountSpace, capability: CountSpace374) -> int:375    check_result = check_countspace(requirement, capability)376    if not check_result.result:377        raise NotMeetRequirementException(378            "cannot get min value, capability doesn't support requirement: "379            f"{check_result.reasons}"380        )381    if requirement is None:382        if capability:383            requirement = capability384            result: int = sys.maxsize385        else:386            result = 0387    if isinstance(requirement, int):388        result = requirement389    elif isinstance(requirement, IntRange):390        result = requirement.generate_min_capability(capability)391    else:392        assert isinstance(requirement, list), f"actual: {type(requirement)}"393        result = sys.maxsize394        for req_item in requirement:395            temp_result = req_item.check(capability)396            if temp_result.result:397                temp_min = req_item.generate_min_capability(capability)398                result = min(result, temp_min)399    return result400def intersect_countspace(requirement: CountSpace, capability: CountSpace) -> Any:401    check_result = check_countspace(requirement, capability)402    if not check_result.result:403        raise NotMeetRequirementException(404            "cannot get intersect, capability doesn't support requirement: "405            f"{check_result.reasons}"406        )407    if requirement is None and capability:408        return copy.copy(capability)409    if isinstance(requirement, int):410        result = requirement411    elif isinstance(requirement, IntRange):412        result = requirement.intersect(capability)413    else:414        raise LisaException(415            f"not support to get intersect on countspace type: {type(requirement)}"416        )417    return result418def check_setspace(419    requirement: Optional[Union[SetSpace[T], T]],420    capability: Optional[Union[SetSpace[T], T]],421) -> ResultReason:422    result = ResultReason()423    if capability is None:424        result.add_reason("capability shouldn't be None")425    else:426        if requirement is not None:427            has_met_check = False428            if not isinstance(capability, SetSpace):429                capability = SetSpace[T](items=[capability])430            if not isinstance(requirement, SetSpace):431                requirement = SetSpace[T](items=[requirement])432            for item in requirement:433                if item in capability:434                    has_met_check = True435                    break436            if not has_met_check:437                result.add_reason(438                    f"requirement not supported in capability. "439                    f"requirement: {requirement}, "440                    f"capability: {capability}"441                )442    return result443def generate_min_capability_setspace_by_priority(444    requirement: Optional[Union[SetSpace[T], T]],445    capability: Optional[Union[SetSpace[T], T]],446    priority_list: List[T],447) -> T:448    check_result = check_setspace(requirement, capability)449    if not check_result.result:450        raise NotMeetRequirementException(451            "cannot get min value, capability doesn't support requirement"452            f"{check_result.reasons}"453        )454    assert capability is not None, "Capability shouldn't be None"455    # Ensure that both cap and req are instance of SetSpace456    if not isinstance(capability, SetSpace):457        capability = SetSpace[T](items=[capability])458    if requirement is None:459        requirement = capability460    if not isinstance(requirement, SetSpace):461        requirement = SetSpace[T](items=[requirement])462    # Find min capability463    min_cap: Optional[T] = None464    for item in priority_list:465        if item in requirement and item in capability:466            min_cap = item467            break468    assert min_cap, (469        "Cannot find min capability on data path, "470        f"requirement: {requirement}"471        f"capability: {capability}"472    )473    return min_cap474def intersect_setspace_by_priority(475    requirement: Optional[Union[SetSpace[T], T]],476    capability: Optional[Union[SetSpace[T], T]],477    priority_list: List[T],478) -> Any:479    # intersect doesn't need to take care about priority.480    check_result = check_setspace(requirement, capability)481    if not check_result.result:482        raise NotMeetRequirementException(483            f"capability doesn't support requirement: {check_result.reasons}"484        )485    assert capability is not None, "Capability shouldn't be None"486    value = SetSpace[T]()487    # Ensure that both cap and req are instance of SetSpace488    if not isinstance(capability, SetSpace):489        capability = SetSpace[T](items=[capability])490    if requirement is None:491        requirement = capability492    if not isinstance(requirement, SetSpace):493        requirement = SetSpace[T](items=[requirement])494    # Find min capability495    for item in requirement:496        if item in capability:497            value.add(item)498    return value499def count_space_to_int_range(count_space: CountSpace) -> IntRange:500    if count_space is None:501        result = IntRange(min=sys.maxsize * -1, max=sys.maxsize)502    elif isinstance(count_space, int):503        result = IntRange(min=count_space, max=count_space)504    elif isinstance(count_space, IntRange):505        result = count_space506    else:507        raise LisaException(508            f"unsupported type: {type(count_space)}, value: '{count_space}'"509        )510    return result511def check(512    requirement: Union[T_SEARCH_SPACE, List[T_SEARCH_SPACE], None],513    capability: Union[T_SEARCH_SPACE, List[T_SEARCH_SPACE], None],514) -> ResultReason:515    result = ResultReason()516    if requirement is not None:517        if capability is None:518            result.add_reason(519                f"capability shouldn't be None, requirement: [{requirement}]"520            )521        elif isinstance(requirement, (list)):522            supported = False523            for req_item in requirement:524                temp_result = req_item.check(capability)525                if temp_result.result:526                    supported = True527            if not supported:528                result.add_reason(529                    "no capability meet any of requirement, "530                    f"requirement: {requirement}, capability: {capability}"531                )532        else:533            result.merge(requirement.check(capability))534    return result535def _call_requirement_method(536    method: str,537    requirement: Union[T_SEARCH_SPACE, List[T_SEARCH_SPACE], None],538    capability: Union[T_SEARCH_SPACE, List[T_SEARCH_SPACE], None],539) -> Any:540    check_result = check(requirement, capability)541    if not check_result.result:542        raise NotMeetRequirementException(543            f"cannot call {method}, capability doesn't support requirement"544        )545    result: Optional[T_SEARCH_SPACE] = None546    if requirement is None:547        if capability is not None:548            requirement = capability549    if (550        isinstance(requirement, list)551        and method == RequirementMethod.generate_min_capability552    ):553        result = None554        for req_item in requirement:555            temp_result = req_item.check(capability)556            if temp_result.result:557                temp_min = getattr(req_item, method)(capability)558                if result is None:559                    result = temp_min560                else:561                    # TODO: multiple matches found, not supported well yet562                    # It can be improved by implement __eq__, __lt__ functions.563                    result = min(result, temp_min)564    elif requirement is not None:565        result = getattr(requirement, method)(capability)566    return result567def generate_min_capability(568    requirement: Union[T_SEARCH_SPACE, List[T_SEARCH_SPACE], None],569    capability: Union[T_SEARCH_SPACE, List[T_SEARCH_SPACE], None],570) -> Any:571    return _call_requirement_method(572        RequirementMethod.generate_min_capability,573        requirement=requirement,574        capability=capability,575    )576def intersect(577    requirement: Union[T_SEARCH_SPACE, List[T_SEARCH_SPACE], None],578    capability: Union[T_SEARCH_SPACE, List[T_SEARCH_SPACE], None],579) -> Any:580    return _call_requirement_method(581        RequirementMethod.intersect, requirement=requirement, capability=capability582    )583def equal_list(first: Optional[List[Any]], second: Optional[List[Any]]) -> bool:584    if first is None or second is None:585        result = first is second586    else:587        result = len(first) == len(second)588        result = result and all(589            f_item == second[index] for index, f_item in enumerate(first)590        )591    return result592def create_set_space(593    included_set: Optional[Iterable[T]],594    excluded_set: Optional[Iterable[T]],...security_profile.py
Source:security_profile.py  
...53    def __hash__(self) -> int:54        return hash(self._get_key())55    def _get_key(self) -> str:56        return f"{self.type}/{self.security_profile}"57    def _call_requirement_method(self, method_name: str, capability: Any) -> Any:58        value = SecurityProfileSettings()59        value.security_profile = getattr(60            search_space, f"{method_name}_setspace_by_priority"61        )(62            self.security_profile,63            capability.security_profile,64            security_profile_priority,65        )66        value.encrypt_disk = self.encrypt_disk or capability.encrypt_disk67        return value68    def check(self, capability: Any) -> search_space.ResultReason:69        assert isinstance(70            capability, SecurityProfileSettings71        ), f"actual: {type(capability)}"...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
