Best Python code snippet using lisa_python
environment.py
Source:environment.py  
# ... (excerpt begins mid-file; the two commented lines below close a
# definition whose opening lies above this excerpt)
#     ],
# )
_global_environment_id = 0
_global_environment_id_lock: Lock = Lock()


def _get_environment_id() -> int:
    """
    Return a unique id across threads and runners.

    The module-level counter is read and incremented while holding
    _global_environment_id_lock, so concurrent callers get distinct values.
    """
    # only the counter is rebound here; the lock is merely read.
    global _global_environment_id

    with _global_environment_id_lock:
        # not named `id`, to avoid shadowing the builtin.
        current_id = _global_environment_id
        _global_environment_id += 1
    return current_id


@dataclass
class EnvironmentMessage(MessageBase):
    """
    Message that the Environment.status setter passes to notifier.notify
    whenever the status of an environment changes.
    """

    type: str = "Environment"
    name: str = ""
    # Use default_factory so that every message gets its own runbook object
    # instead of sharing one instance created at class definition time.
    # Since Python 3.11, dataclasses also reject unhashable instances as
    # field defaults (this applies if schema.Environment is unhashable).
    runbook: schema.Environment = field(default_factory=schema.Environment)
    status: EnvironmentStatus = EnvironmentStatus.New


@dataclass_json()
@dataclass
class EnvironmentSpace(search_space.RequirementMixin):
    """
    Search space of an environment. It is used to
    1. Specify test suite requirement, see TestCaseRequirement
    2. Describe capability of an environment, see Environment.capability
    """

    topology: str = field(
        default=constants.ENVIRONMENTS_SUBNET,
        metadata=field_metadata(
            validate=validate.OneOf([constants.ENVIRONMENTS_SUBNET])
        ),
    )
    nodes: List[schema.NodeSpace] = field(default_factory=list)

    def __post_init__(self, *args: Any, **kwargs: Any) -> None:
        self._expand_node_space()

    def __eq__(self, o: object) -> bool:
        """
        Two spaces are equal when the topology matches and
        search_space.equal_list reports the node lists as equal.
        """
        assert isinstance(o, EnvironmentSpace), f"actual: {type(o)}"
        return self.topology == o.topology and search_space.equal_list(
            self.nodes, o.nodes
        )

    def check(self, capability: Any) -> search_space.ResultReason:
        """
        Check whether the capability can satisfy this requirement.

        Requirement nodes are compared with capability nodes by position,
        and the comparison stops at the first node that doesn't match.
        """
        assert isinstance(capability, EnvironmentSpace), f"actual: {type(capability)}"
        result = search_space.ResultReason()
        if not capability.nodes:
            result.add_reason("no node instance found")
        elif len(self.nodes) > len(capability.nodes):
            result.add_reason(
                f"no enough nodes, "
                f"requirement: {len(self.nodes)}, "
                f"capability: {len(capability.nodes)}."
            )
        else:
            # an empty requirement list simply skips the loop.
            for index, current_req in enumerate(self.nodes):
                current_cap = capability.nodes[index]
                result.merge(
                    search_space.check(current_req, current_cap),
                    str(index),
                )
                if not result.result:
                    break
        return result

    def _generate_min_capability(self, capability: Any) -> Any:
        """
        Build a new EnvironmentSpace that holds, for each required node, the
        result of generate_min_capability against the matching capability
        node.
        """
        assert isinstance(capability, EnvironmentSpace), f"actual: {type(capability)}"
        assert capability.nodes

        env = EnvironmentSpace(topology=self.topology)
        # a single capability node is reused for every required node,
        # otherwise nodes are matched by position.
        use_first_only = len(capability.nodes) == 1
        for index, current_req in enumerate(self.nodes):
            current_cap = capability.nodes[0 if use_first_only else index]
            env.nodes.append(current_req.generate_min_capability(current_cap))
        return env

    def _expand_node_space(self) -> None:
        """Replace self.nodes with the entries from expand_by_node_count."""
        if self.nodes:
            expanded_requirements: List[schema.NodeSpace] = []
            for node in self.nodes:
                expanded_requirements.extend(node.expand_by_node_count())
            self.nodes = expanded_requirements


class Environment(ContextMixin, InitializableMixin):
    """
    A group of nodes built from an environment runbook, together with its
    status, log path and working path.
    """

    def __init__(
        self,
        is_predefined: bool,
        warn_as_error: bool,
        id_: int,
        runbook: schema.Environment,
    ) -> None:
        super().__init__()

        # assigned in _reset, which runs when the status is set to New below.
        self.nodes: Nodes
        self.runbook = runbook
        self.name = runbook.name
        self.is_predefined: bool = is_predefined
        self.is_new: bool = True
        self.id: str = str(id_)
        self.warn_as_error = warn_as_error
        self.platform: Optional[Platform] = None
        self.log = get_logger("env", self.name)
        self.source_test_result: Optional[TestResult] = None

        self._default_node: Optional[Node] = None
        self._is_dirty: bool = False

        # track retried times to provide a unique name.
        self._raw_id = id_
        self._retries: int = 0
        # cost is used to plan the order of environments.
        # a cheaper env can fit cases earlier, to run more cases on it.
        # 1. smaller is higher priority, it can be index of candidate environment
        # 2. 0 means no cost.
        self.cost: float = 0
        # indicates whether this environment is deploying, preparing,
        # testing or not.
        self.is_in_use: bool = False

        # The paths are not created until they are first used, see log_path
        # and working_path.
        self._log_path: Optional[Path] = None
        self._working_path: Optional[Path] = None
        # it's used to compose a consistent path for both log and working path.
        self.environment_part_path = Path(
            f"environments/{get_datetime_path()}-{self.name}"
        )

        self._status: Optional[EnvironmentStatus] = None
        # goes through the setter: resets the nodes and sends a notification.
        self.status = EnvironmentStatus.New

    def __repr__(self) -> str:
        return self.name

    @property
    def status(self) -> EnvironmentStatus:
        # check that the status has been set, not that it is truthy.
        assert self._status is not None
        return self._status

    @status.setter
    def status(self, value: EnvironmentStatus) -> None:
        # sometimes there are duplicated messages, ignore if no change.
        if self._status != value:
            if value == EnvironmentStatus.New:
                self._reset()
            self._status = value
            environment_message = EnvironmentMessage(
                name=self.name, status=self._status, runbook=self.runbook
            )
            notifier.notify(environment_message)

    @property
    def is_alive(self) -> bool:
        return self._status in [
            EnvironmentStatus.New,
            EnvironmentStatus.Prepared,
            EnvironmentStatus.Deployed,
            EnvironmentStatus.Connected,
        ]

    @property
    def default_node(self) -> Node:
        return self.nodes.default

    @property
    def log_path(self) -> Path:
        # avoid to create path for UT. There may be path conflict in UT.
        if is_unittest():
            return Path()

        if not self._log_path:
            self._log_path = constants.RUN_LOCAL_LOG_PATH / self.environment_part_path
            self._log_path.mkdir(parents=True, exist_ok=True)
        return self._log_path

    @property
    def working_path(self) -> Path:
        # same as log_path: no folder is created in unit tests.
        if is_unittest():
            return Path()

        if not self._working_path:
            self._working_path = (
                constants.RUN_LOCAL_WORKING_PATH / self.environment_part_path
            )
            self._working_path.mkdir(parents=True, exist_ok=True)
        return self._working_path

    @property
    def capability(self) -> EnvironmentSpace:
        """
        Capability of current nodes. While the status is New or Prepared,
        the node requirements of the runbook are appended as well.
        """
        result = EnvironmentSpace(topology=self.runbook.topology)
        for node in self.nodes.list():
            result.nodes.append(node.capability)
        if (
            self.status in [EnvironmentStatus.Prepared, EnvironmentStatus.New]
            and self.runbook.nodes_requirement
        ):
            result.nodes.extend(self.runbook.nodes_requirement)
        return result

    @property
    def is_dirty(self) -> bool:
        return self._is_dirty or any(x.is_dirty for x in self.nodes.list())

    def cleanup(self) -> None:
        self.nodes.cleanup()
        # the handler exists only after _initialize has run.
        log_handler = getattr(self, "_log_handler", None)
        if log_handler:
            remove_handler(log_handler, self.log)
            log_handler.close()

    def close(self) -> None:
        self.nodes.close()

    def create_node_from_exists(
        self,
        node_runbook: schema.Node,
    ) -> Node:
        """Create a node from the node runbook and append it to self.nodes."""
        node = Node.create(
            index=len(self.nodes),
            runbook=node_runbook,
            base_part_path=self.environment_part_path,
            parent_logger=self.log,
        )
        self.nodes.append(node)
        return node

    def create_node_from_requirement(
        self,
        node_requirement: schema.NodeSpace,
    ) -> Node:
        """
        Create a remote node whose capability is the minimum capability of
        the requirement, and append it to self.nodes.
        """
        min_requirement = cast(
            schema.Capability,
            node_requirement.generate_min_capability(node_requirement),
        )
        assert isinstance(min_requirement.node_count, int), (
            f"must be int after generate_min_capability, "
            f"actual: {min_requirement.node_count}"
        )
        # node count should be expanded in platform already
        assert min_requirement.node_count == 1, f"actual: {min_requirement.node_count}"
        mock_runbook = schema.RemoteNode(
            type=constants.ENVIRONMENTS_NODES_REMOTE,
            capability=min_requirement,
            is_default=node_requirement.is_default,
        )
        node = Node.create(
            index=len(self.nodes),
            runbook=mock_runbook,
            base_part_path=self.environment_part_path,
            parent_logger=self.log,
        )
        self.nodes.append(node)
        return node

    def get_information(self) -> Dict[str, str]:
        """Merge the dicts returned by the get_environment_information hook."""
        final_information: Dict[str, str] = {}
        informations: List[
            Dict[str, str]
        ] = plugin_manager.hook.get_environment_information(environment=self)
        # reverse it, since it's FILO order,
        # try basic earlier, and they are allowed to be overwritten
        informations.reverse()
        for current_information in informations:
            final_information.update(current_information)
        return final_information

    def mark_dirty(self) -> None:
        self.log.debug("mark environment to dirty")
        self._is_dirty = True

    def _initialize(self, *args: Any, **kwargs: Any) -> None:
        if self.status != EnvironmentStatus.Deployed:
            raise LisaException("environment is not deployed, cannot be initialized")

        # create the file handler once, even if initialized multiple times.
        if not hasattr(self, "_log_handler"):
            self._log_handler = create_file_handler(
                self.log_path / "environment.log", self.log
            )
        self.nodes.initialize()
        self.status = EnvironmentStatus.Connected

    def _reset(self) -> None:
        """
        Rebuild the nodes from the runbook. From the second reset on, the id
        gets a retry suffix to keep it unique.
        """
        self.nodes = Nodes()
        self.runbook.reload_requirements()
        self.is_new = True

        if not self.runbook.nodes_requirement and not self.runbook.nodes:
            raise LisaException("not found any node or requirement in environment")

        has_default_node = False
        for node_runbook in self.runbook.nodes:
            self.create_node_from_exists(
                node_runbook=node_runbook,
            )
            has_default_node = self._validate_single_default(
                has_default_node, node_runbook.is_default
            )

        if self._retries > 0:
            # provide a unique id.
            self.id = f"{self._raw_id}-{self._retries}"
        self.remove_context()
        self._retries += 1

    def _validate_single_default(
        self, has_default: bool, is_default: Optional[bool]
    ) -> bool:
        """
        Return True once a default node has been seen. Raise LisaException
        if a second node is marked as default.
        """
        if is_default:
            if has_default:
                raise LisaException("only one node can set isDefault to True")
            has_default = True
        return has_default


if TYPE_CHECKING:
    EnvironmentsDict = UserDict[str, Environment]
else:
    EnvironmentsDict = UserDict


class Environments(EnvironmentsDict):
    """Dictionary of environments, keyed by environment name."""

    def __init__(
        self,
        warn_as_error: bool = False,
    ) -> None:
        super().__init__()
        self.warn_as_error = warn_as_error

    def get_or_create(self, requirement: EnvironmentSpace) -> Optional[Environment]:
        for environment in self.values():
            # find exact match, or create a new one.
            if requirement == environment.capability:
                return environment
        return self.from_requirement(requirement)

    def from_requirement(self, requirement: EnvironmentSpace) -> Optional[Environment]:
        runbook = schema.Environment(
            topology=requirement.topology,
            nodes_requirement=requirement.nodes,
        )
        id_ = _get_environment_id()
        return self.from_runbook(
            runbook=runbook,
            name=f"generated_{id_}",
            is_predefined_runbook=False,
            id_=id_,
        )

    def from_runbook(
        self,
        runbook: schema.Environment,
        name: str,
        is_predefined_runbook: bool,
        id_: int,
    ) -> Optional[Environment]:
        """
        Create an environment from a copy of the runbook and store it in this
        dictionary under the given name.
        """
        assert runbook
        assert name

        # make a copy, so that modification on env won't impact test case
        copied_runbook = copy.copy(runbook)
        copied_runbook.name = name
        env = Environment(
            is_predefined=is_predefined_runbook,
            warn_as_error=self.warn_as_error,
            id_=id_,
            runbook=copied_runbook,
        )
        self[name] = env
        log = _get_init_logger()
        log.debug(f"created {env.name}: {env.runbook}")
        return env


def load_environments(
    root_runbook: Optional[schema.EnvironmentRoot],
) -> Environments:
    """
    Create environments from the root runbook. Return an empty Environments
    if there is no root runbook.
    """
    if not root_runbook:
        return Environments()

    environments = Environments(
        warn_as_error=root_runbook.warn_as_error,
    )
    for environment_runbook in root_runbook.environments:
        # an id is taken for every runbook, named or not.
        id_ = _get_environment_id()
        env = environments.from_runbook(
            runbook=environment_runbook,
            name=environment_runbook.name or f"customized_{id_}",
            is_predefined_runbook=True,
            id_=id_,
        )
        assert env, "created from runbook shouldn't be None"
    return environments


class EnvironmentHookSpec:
    @hookspec
    def get_environment_information(self, environment: Environment) -> Dict[str, str]:
        ...


# ... (excerpt ends here)
test_registration.py
Source:test_registration.py  
...115            self.stub_cp_provider.basic_auth_cp = mock_uep116            rc = RegisterCommand()117            rc.options = Mock()118            rc.options.activation_keys = None119            env_id = rc._get_environment_id(mock_uep, 'owner', None)120            expected = None121            self.assertEqual(expected, env_id)122    def test_get_environment_id_one_available(self):123        def env_list(*args, **kwargs):124            return [{"id": "1234", "name": "somename"}]125        with patch('rhsm.connection.UEPConnection', new_callable=StubUEP) as mock_uep:126            mock_uep.getEnvironmentList = env_list127            mock_uep.supports_resource = Mock(return_value=True)128            self.stub_cp_provider.basic_auth_cp = mock_uep129            rc = RegisterCommand()130            rc.options = Mock()131            rc.options.activation_keys = None132            env_id = rc._get_environment_id(mock_uep, 'owner', None)133            expected = "1234"134            self.assertEqual(expected, env_id)135    def test_get_environment_id_multi_available(self):136        def env_list(*args, **kwargs):137            return [{"id": "1234", "name": "somename"},138                    {"id": "5678", "name": "othername"}]139        with patch('rhsm.connection.UEPConnection', new_callable=StubUEP) as mock_uep:140            mock_uep.getEnvironmentList = env_list141            mock_uep.supports_resource = Mock(return_value=True)142            self.stub_cp_provider.basic_auth_cp = mock_uep143            rc = RegisterCommand()144            rc.options = Mock()145            rc.options.activation_keys = None146            rc._prompt_for_environment = Mock(return_value="othername")147            env_id = rc._get_environment_id(mock_uep, 'owner', None)148            expected = "5678"149            self.assertEqual(expected, env_id)150    def test_get_environment_id_multi_available_bad_name(self):151        def env_list(*args, **kwargs):152            return [{"id": 
"1234", "name": "somename"},153                    {"id": "5678", "name": "othername"}]154        with patch('rhsm.connection.UEPConnection', new_callable=StubUEP) as mock_uep:155            mock_uep.getEnvironmentList = env_list156            mock_uep.supports_resource = Mock(return_value=True)157            self.stub_cp_provider.basic_auth_cp = mock_uep158            rc = RegisterCommand()159            rc.options = Mock()160            rc.options.activation_keys = None161            rc._prompt_for_environment = Mock(return_value="not_an_env")162            with Capture(silent=True):163                with self.assertRaises(SystemExit):164                    rc._get_environment_id(mock_uep, 'owner', None)165    def test_registration_with_failed_profile_upload(self):166        with patch('rhsm.connection.UEPConnection', new_callable=StubUEP) as mock_uep:167            profile_mgr = inj.require(inj.PROFILE_MANAGER)168            def raise_remote_server_exception(*args, **kwargs):169                """Raise remote server exception (uploading of profile failed)"""170                from rhsm.connection import RemoteServerException171                raise RemoteServerException(172                    502,173                    request_type="PUT",174                    handler="/subscription/consumers/xxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx/profiles"175                )176            profile_mgr.update_check = Mock(side_effect=raise_remote_server_exception)177            self.stub_cp_provider.basic_auth_cp = mock_uep178            cmd = RegisterCommand()...base.py
Source:base.py  
...53        self._api = self._th.teachable(self.teachable)54        self._deployment_id = None55        self._paths_to_be_deleted = []56        self._archived_model_path = None57        self._environment_id = self._get_environment_id(environment)58        self._deployment_data = {59            "framework": self.framework,60            "environment": self._environment_id61        }62        version = kwargs.get('version', None)63        if version:64            self._version = int(version)65            self._retrieve_existing_deployment()66        # this is used where we can get the schema from the model67        # like in the ludwig for example68        self._auto_schema = True69    def _get_environment_id(self, environment_name):70        all_envs = self._api.get_all_environments()71        for env in all_envs:72            if env['name'] == environment_name:73                return env['id']74    def _retrieve_existing_deployment(self):75        r = self._api.get_deployment_by_env_id_and_version(self._environment_id, self._version)76        data = r.json()77        if len(data) == 1:78            self._deployment_data = data[0]79            self._deployment_id = data[0]['uuid']80            self._version = data[0]['version']81            return self._deployment_data82        else:83            raise AssertionError("The deployment with this version and environment should be only one.")...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
