Best Python code snippet using lisa_python
testsuite.py
Source:testsuite.py  
...167        if hasattr(self, "_timer"):168            self.elapsed = self._timer.elapsed(False)169        fields = ["status", "elapsed", "id_", "log_file"]170        result_message = TestResultMessage()171        set_filtered_fields(self, result_message, fields=fields)172        metadata_fields = [173            "area",174            "category",175            "tags",176            "description",177            "priority",178            "owner",179        ]180        metadata_information = fields_to_dict(181            src=self.runtime_data.metadata, fields=metadata_fields182        )183        self.information.update(metadata_information)184        # get information of default node, and send to notifier.185        if self.environment:186            self.information.update(self.environment.get_information())187            self.information["environment"] = self.environment.name188        result_message.information.update(self.information)189        result_message.message = self.message[0:2048] if self.message else ""190        result_message.name = self.runtime_data.metadata.name191        result_message.full_name = self.runtime_data.metadata.full_name192        result_message.suite_name = self.runtime_data.metadata.suite.name193        result_message.suite_full_name = self.runtime_data.metadata.suite.full_name194        result_message.stacktrace = stacktrace195        # some extensions may need to update or fill information.196        plugin_manager.hook.update_test_result_message(message=result_message)197        notifier.notify(result_message)198@dataclass199class TestCaseRequirement:200    environment: Optional[EnvironmentSpace] = None201    environment_status: EnvironmentStatus = EnvironmentStatus.Connected202    platform_type: Optional[search_space.SetSpace[str]] = None203    os_type: Optional[search_space.SetSpace[Type[OperatingSystem]]] = None204def _create_test_case_requirement(205    node: schema.NodeSpace,206    supported_platform_type: Optional[List[str]] = None,207    unsupported_platform_type: Optional[List[str]] = None,208    supported_os: Optional[List[Type[OperatingSystem]]] = None,209    unsupported_os: Optional[List[Type[OperatingSystem]]] = None,210    supported_features: Optional[211        List[Union[Type[Feature], schema.FeatureSettings, str]]212    ] = None,213    unsupported_features: Optional[214        List[Union[Type[Feature], schema.FeatureSettings, str]]215    ] = None,216    environment_status: EnvironmentStatus = EnvironmentStatus.Connected,217) -> TestCaseRequirement:218    if supported_features:219        node.features = search_space.SetSpace[schema.FeatureSettings](220            is_allow_set=True,221            items=[Feature.get_feature_settings(x) for x in supported_features],222        )223    if unsupported_features:224        node.excluded_features = search_space.SetSpace[schema.FeatureSettings](225            is_allow_set=False,226            items=[Feature.get_feature_settings(x) for x in unsupported_features],227        )228    nodes: List[schema.NodeSpace] = [node]229    platform_types = search_space.create_set_space(230        supported_platform_type, unsupported_platform_type, "platform type"231    )232    # Most test cases are applied to Linux, exclude Windows by default.233    if unsupported_os is None and supported_os is None:234        unsupported_os = [Windows]235    os = search_space.create_set_space(supported_os, unsupported_os, "operating system")236    return TestCaseRequirement(237        environment=EnvironmentSpace(nodes=nodes),238        platform_type=platform_types,239        os_type=os,240        environment_status=environment_status,241    )242def node_requirement(243    node: schema.NodeSpace,244    supported_platform_type: Optional[List[str]] = None,245    unsupported_platform_type: Optional[List[str]] = None,246    supported_os: Optional[List[Type[OperatingSystem]]] = None,247    unsupported_os: Optional[List[Type[OperatingSystem]]] = None,248    environment_status: EnvironmentStatus = EnvironmentStatus.Connected,249) -> TestCaseRequirement:250    return _create_test_case_requirement(251        node,252        supported_platform_type,253        unsupported_platform_type,254        supported_os,255        unsupported_os,256        None,257        None,258        environment_status,259    )260def simple_requirement(261    min_count: int = 1,262    min_core_count: int = 1,263    min_nic_count: Optional[int] = None,264    min_data_disk_count: Optional[int] = None,265    disk: Optional[schema.DiskOptionSettings] = None,266    network_interface: Optional[schema.NetworkInterfaceOptionSettings] = None,267    supported_platform_type: Optional[List[str]] = None,268    unsupported_platform_type: Optional[List[str]] = None,269    supported_os: Optional[List[Type[OperatingSystem]]] = None,270    unsupported_os: Optional[List[Type[OperatingSystem]]] = None,271    supported_features: Optional[272        List[Union[Type[Feature], schema.FeatureSettings, str]]273    ] = None,274    unsupported_features: Optional[275        List[Union[Type[Feature], schema.FeatureSettings, str]]276    ] = None,277    environment_status: EnvironmentStatus = EnvironmentStatus.Connected,278) -> TestCaseRequirement:279    """280    define a simple requirement to support most test cases.281    """282    node = schema.NodeSpace()283    node.node_count = search_space.IntRange(min=min_count)284    node.core_count = search_space.IntRange(min=min_core_count)285    if min_data_disk_count or disk:286        if not disk:287            disk = schema.DiskOptionSettings()288        if min_data_disk_count:289            disk.data_disk_count = search_space.IntRange(min=min_data_disk_count)290        node.disk = disk291    if min_nic_count or network_interface:292        if not network_interface:293            network_interface = schema.NetworkInterfaceOptionSettings()294        if min_nic_count:295            network_interface.nic_count = search_space.IntRange(min=min_nic_count)296        node.network_interface = network_interface297    return _create_test_case_requirement(298        node,299        supported_platform_type,300        unsupported_platform_type,301        supported_os,302        unsupported_os,303        supported_features,304        unsupported_features,305        environment_status,306    )307DEFAULT_REQUIREMENT = simple_requirement()308class TestSuiteMetadata:309    def __init__(310        self,311        area: str,312        category: str,313        description: str,314        tags: Optional[List[str]] = None,315        name: str = "",316        requirement: TestCaseRequirement = DEFAULT_REQUIREMENT,317        owner: str = "Microsoft",318        full_name: str = "",319    ) -> None:320        self.name = name321        self.full_name = full_name322        self.cases: List[TestCaseMetadata] = []323        self.tags: List[str] = tags if tags else []324        self.area = area325        self.category = category326        if tags:327            self.tags = tags328        else:329            self.tags = []330        self.description = description331        self.requirement = requirement332        self.owner = owner333    def __call__(self, test_class: Type[TestSuite]) -> Callable[..., object]:334        self.test_class = test_class335        if not self.name:336            self.name = test_class.__name__337        self.full_name = test_class.__qualname__338        _add_suite_metadata(self)339        @wraps(self.test_class)340        def wrapper(341            test_class: Type[TestSuite],342            metadata: TestSuiteMetadata,343        ) -> TestSuite:344            return test_class(metadata)345        return wrapper346class TestCaseMetadata:347    def __init__(348        self,349        description: str,350        priority: int = 2,351        timeout: int = 3600,352        use_new_environment: bool = False,353        owner: str = "",354        requirement: Optional[TestCaseRequirement] = None,355    ) -> None:356        self.suite: TestSuiteMetadata357        self.priority = priority358        self.description = description359        self.timeout = timeout360        self.use_new_environment = use_new_environment361        if requirement:362            self.requirement = requirement363        self._owner = owner364    def __getattr__(self, key: str) -> Any:365        # return attributes of test suite, if it's not redefined in case level366        assert self.suite, "suite is not set before use metadata"367        return getattr(self.suite, key)368    def __call__(self, func: Callable[..., None]) -> Callable[..., None]:369        self.name = func.__name__370        self.full_name = func.__qualname__371        self.qualname = func.__qualname__372        self._func = func373        _add_case_metadata(self)374        @wraps(self._func)375        def wrapper(*args: Any, **kwargs: Any) -> None:376            parameters: Dict[str, Any] = {}377            for name in kwargs.keys():378                if name in func.__annotations__:379                    parameters[name] = kwargs[name]380            func(*args, **parameters)381        return wrapper382    @property383    def owner(self) -> str:384        if self._owner:385            return self._owner386        return self.suite.owner387class TestCaseRuntimeData:388    def __init__(self, metadata: TestCaseMetadata):389        self.metadata = metadata390        # all runtime setting fields391        self.select_action: str = ""392        self.times: int = 1393        self.retry: int = 0394        self.use_new_environment: bool = metadata.use_new_environment395        self.ignore_failure: bool = False396        self.environment_name: str = ""397    def __getattr__(self, key: str) -> Any:398        # return attributes of metadata for convenient399        assert self.metadata400        return getattr(self.metadata, key)401    def __repr__(self) -> str:402        return (403            f"name: {self.metadata.name}, "404            f"action: {self.select_action}, "405            f"times: {self.times}, retry: {self.retry}, "406            f"new_env: {self.use_new_environment}, "407            f"ignore_failure: {self.ignore_failure}, "408            f"env_name: {self.environment_name}"409        )410    def clone(self) -> TestCaseRuntimeData:411        cloned = TestCaseRuntimeData(self.metadata)412        fields = [413            constants.TESTCASE_SELECT_ACTION,414            constants.TESTCASE_TIMES,415            constants.TESTCASE_RETRY,416            constants.TESTCASE_USE_NEW_ENVIRONMENT,417            constants.TESTCASE_IGNORE_FAILURE,418            constants.ENVIRONMENT,419        ]420        set_filtered_fields(self, cloned, fields)421        return cloned422class TestSuite:423    def __init__(424        self,425        metadata: TestSuiteMetadata,426    ) -> None:427        super().__init__()428        self._metadata = metadata429        self._should_stop = False430        self.__log = get_logger("suite", metadata.name)431    def before_suite(self, log: Logger, **kwargs: Any) -> None:432        ...433    def after_suite(self, log: Logger, **kwargs: Any) -> None:434        ......features.py
Source:features.py  
...342        value = AwsDiskOptionSettings()343        super_value = schema.DiskOptionSettings._call_requirement_method(344            self, method_name, capability345        )346        set_filtered_fields(super_value, value, ["data_disk_count"])347        cap_disk_type = capability.disk_type348        if isinstance(cap_disk_type, search_space.SetSpace):349            assert (350                len(cap_disk_type) > 0351            ), "capability should have at least one disk type, but it's empty"352        elif isinstance(cap_disk_type, schema.DiskType):353            cap_disk_type = search_space.SetSpace[schema.DiskType](354                is_allow_set=True, items=[cap_disk_type]355            )356        else:357            raise LisaException(358                f"unknown disk type on capability, type: {cap_disk_type}"359            )360        value.disk_type = getattr(search_space, f"{method_name}_setspace_by_priority")(...testselector.py
Source:testselector.py  
...103        constants.TESTCASE_RETRY,104        constants.TESTCASE_IGNORE_FAILURE,105        constants.ENVIRONMENT,106    ]107    set_filtered_fields(case_runbook, applied_case_data, fields)108    applied_case_data.use_new_environment = (109        applied_case_data.use_new_environment or case_runbook.use_new_environment110    )111    # use default value from selector112    applied_case_data.select_action = action113def _force_check(114    name: str,115    is_force: bool,116    force_expected_set: Set[str],117    force_exclusive_set: Set[str],118    temp_force_exclusive_set: Set[str],119    case_runbook: schema.TestCase,120) -> bool:121    is_skip = False...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
