Best Python code snippet using localstack_python
provider.py
Source:provider.py  
# ... (earlier lines of provider.py are elided in this excerpt; the header of the class that
# owns the following fragment is not shown, so the fragment is kept verbatim as a comment)
#     def __init__(self):
#         self.opensearch_domains = {}


# ClusterConfig keys that fall back to DEFAULT_OPENSEARCH_CLUSTER_CONFIG when a domain has no
# stored value for them.
_DEFAULTED_CLUSTER_CONFIG_KEYS = (
    "DedicatedMasterCount",
    "DedicatedMasterEnabled",
    "DedicatedMasterType",
    "InstanceCount",
    "InstanceType",
    "ZoneAwarenessEnabled",
)


def _cluster_config_with_defaults(cluster_cfg: Dict) -> Dict:
    """
    Merges a (possibly partial) cluster config with the default cluster config.

    :param cluster_cfg: stored cluster config of a domain, may be empty
    :return: dict holding every key of _DEFAULTED_CLUSTER_CONFIG_KEYS, taken from cluster_cfg if
             present and from DEFAULT_OPENSEARCH_CLUSTER_CONFIG otherwise
    """
    default_cfg = DEFAULT_OPENSEARCH_CLUSTER_CONFIG
    return {key: cluster_cfg.get(key, default_cfg[key]) for key in _DEFAULTED_CLUSTER_CONFIG_KEYS}


def get_domain_config(domain_key: DomainKey) -> DomainConfig:
    """
    Returns the domain config derived from the current status of the given domain.

    :param domain_key: key identifying the domain
    :return: DomainConfig built from the result of get_domain_status
    """
    status = get_domain_status(domain_key)
    return _status_to_config(status)


def _status_to_config(status: DomainStatus) -> DomainConfig:
    """
    Converts a domain status into a domain config.

    Only ClusterConfig, EngineVersion, DomainEndpointOptions and AdvancedSecurityOptions are read
    from the given status, all other options are static values. Every option shares the same
    freshly generated OptionStatus.

    :param status: status of the domain
    :return: DomainConfig for the given status
    """
    cluster_cfg = status.get("ClusterConfig") or {}
    config_status = get_domain_config_status()
    return DomainConfig(
        AccessPolicies=AccessPoliciesStatus(
            Options=PolicyDocument(""),
            Status=config_status,
        ),
        AdvancedOptions=AdvancedOptionsStatus(
            Options={
                "override_main_response_version": "false",
                "rest.action.multi.allow_explicit_index": "true",
            },
            Status=config_status,
        ),
        EBSOptions=EBSOptionsStatus(
            Options=EBSOptions(
                EBSEnabled=True,
                VolumeSize=100,
                VolumeType=VolumeType.gp2,
            ),
            Status=config_status,
        ),
        ClusterConfig=ClusterConfigStatus(
            Options=ClusterConfig(**_cluster_config_with_defaults(cluster_cfg)),
            Status=config_status,
        ),
        CognitoOptions=CognitoOptionsStatus(
            Options=CognitoOptions(Enabled=False), Status=config_status
        ),
        EngineVersion=VersionStatus(Options=status.get("EngineVersion"), Status=config_status),
        EncryptionAtRestOptions=EncryptionAtRestOptionsStatus(
            Options=EncryptionAtRestOptions(Enabled=False),
            Status=config_status,
        ),
        LogPublishingOptions=LogPublishingOptionsStatus(
            Options={},
            Status=config_status,
        ),
        SnapshotOptions=SnapshotOptionsStatus(
            # the start hour is randomized on every call
            Options=SnapshotOptions(AutomatedSnapshotStartHour=randint(0, 23)),
            Status=config_status,
        ),
        VPCOptions=VPCDerivedInfoStatus(
            Options={},
            Status=config_status,
        ),
        DomainEndpointOptions=DomainEndpointOptionsStatus(
            Options=status.get("DomainEndpointOptions", {}),
            Status=config_status,
        ),
        NodeToNodeEncryptionOptions=NodeToNodeEncryptionOptionsStatus(
            Options=NodeToNodeEncryptionOptions(Enabled=False),
            Status=config_status,
        ),
        AdvancedSecurityOptions=AdvancedSecurityOptionsStatus(
            Options=status.get("AdvancedSecurityOptions", {}), Status=config_status
        ),
        AutoTuneOptions=AutoTuneOptionsStatus(
            Options=AutoTuneOptions(
                DesiredState=AutoTuneDesiredState.ENABLED,
                RollbackOnDisable=RollbackOnDisable.NO_ROLLBACK,
                MaintenanceSchedules=[],
            ),
            # AutoTune has its own status type, its fields are copied from the shared status
            Status=AutoTuneStatus(
                CreationDate=config_status.get("CreationDate"),
                UpdateDate=config_status.get("UpdateDate"),
                UpdateVersion=config_status.get("UpdateVersion"),
                State=AutoTuneState.ENABLED,
                PendingDeletion=config_status.get("PendingDeletion"),
            ),
        ),
    )


def get_domain_config_status() -> OptionStatus:
    """
    Creates a new option status marking an option as active.

    :return: OptionStatus with identical creation and update date (the current time) and a
             random update version between 1 and 100
    """
    # take the timestamp once, such that creation and update date of one status cannot differ
    now = datetime.now()
    return OptionStatus(
        CreationDate=now,
        PendingDeletion=False,
        State=OptionState.Active,
        UpdateDate=now,
        UpdateVersion=randint(1, 100),
    )


def get_domain_status(domain_key: DomainKey, deleted=False) -> DomainStatus:
    """
    Builds the status of the given domain from the stored domain data and static defaults.

    Only Processing, ClusterConfig, EngineVersion and Endpoint are read from the stored status
    (an empty status is used if nothing is stored for the domain).

    :param domain_key: key identifying the domain
    :param deleted: value to report in the Deleted field of the status
    :return: newly created DomainStatus, the stored status is not modified
    """
    region = OpenSearchServiceBackend.get(domain_key.region)
    stored_status: DomainStatus = (
        region.opensearch_domains.get(domain_key.domain_name) or DomainStatus()
    )
    cluster_cfg = stored_status.get("ClusterConfig") or {}
    return DomainStatus(
        ARN=domain_key.arn,
        Created=True,
        Deleted=deleted,
        Processing=stored_status.get("Processing", True),
        DomainId=f"{domain_key.account}/{domain_key.domain_name}",
        DomainName=domain_key.domain_name,
        ClusterConfig=ClusterConfig(
            **_cluster_config_with_defaults(cluster_cfg),
            WarmEnabled=False,
            ColdStorageOptions=ColdStorageOptions(Enabled=False),
        ),
        EngineVersion=stored_status.get("EngineVersion") or OPENSEARCH_DEFAULT_VERSION,
        # None as long as no endpoint is stored for the domain
        Endpoint=stored_status.get("Endpoint", None),
        EBSOptions=EBSOptions(EBSEnabled=True, VolumeType=VolumeType.gp2, VolumeSize=10, Iops=0),
        CognitoOptions=CognitoOptions(Enabled=False),
        UpgradeProcessing=False,
        AccessPolicies="",
        SnapshotOptions=SnapshotOptions(AutomatedSnapshotStartHour=0),
        EncryptionAtRestOptions=EncryptionAtRestOptions(Enabled=False),
        NodeToNodeEncryptionOptions=NodeToNodeEncryptionOptions(Enabled=False),
        AdvancedOptions={
            "override_main_response_version": "false",
            "rest.action.multi.allow_explicit_index": "true",
        },
        ServiceSoftwareOptions=ServiceSoftwareOptions(
            CurrentVersion="",
            NewVersion="",
            UpdateAvailable=False,
            Cancellable=False,
            UpdateStatus=DeploymentStatus.COMPLETED,
            Description="There is no software update available for this domain.",
            AutomatedUpdateDate=datetime.fromtimestamp(0, tz=timezone.utc),
            OptionalDeployment=True,
        ),
        DomainEndpointOptions=DomainEndpointOptions(
            EnforceHTTPS=False,
            TLSSecurityPolicy=TLSSecurityPolicy.Policy_Min_TLS_1_0_2019_07,
            CustomEndpointEnabled=False,
        ),
        AdvancedSecurityOptions=AdvancedSecurityOptions(
            Enabled=False, InternalUserDatabaseEnabled=False
        ),
        AutoTuneOptions=AutoTuneOptionsOutput(State=AutoTuneState.ENABLE_IN_PROGRESS),
    )


def _ensure_domain_exists(arn: ARN) -> None:
    """
    Checks if the domain for the given ARN exists. Otherwise, a ValidationException is raised.

    :param arn: ARN string to lookup the domain for
    :return: None if the domain exists, otherwise raises an exception
    :raises: ValidationException if the domain for the given ARN cannot be found
    """
    domain_key = DomainKey.from_arn(arn)
    region = OpenSearchServiceBackend.get(domain_key.region)
    domain_status = region.opensearch_domains.get(domain_key.domain_name)
    if domain_status is None:
        raise ValidationException("Invalid ARN. Domain not found.")


def _update_domain_config_request_to_status(request: UpdateDomainConfigRequest) -> DomainStatus:
    """
    Extracts the status fields to update from an UpdateDomainConfig request.

    :param request: the request payload, it is not modified
    :return: new dict with all entries of the request except "DryRun" and "DomainName"
    """
    # build a copy instead of popping the keys, such that the caller's payload stays intact
    return {key: value for key, value in request.items() if key not in ("DryRun", "DomainName")}


class OpensearchProvider(OpensearchApi):
    """Provider implementing the domain operations of the OpenSearch API."""

    def create_domain(
        self,
        context: RequestContext,
        domain_name: DomainName,
        engine_version: VersionString = None,
        cluster_config: ClusterConfig = None,
        ebs_options: EBSOptions = None,
        access_policies: PolicyDocument = None,
        snapshot_options: SnapshotOptions = None,
        vpc_options: VPCOptions = None,
        cognito_options: CognitoOptions = None,
        encryption_at_rest_options: EncryptionAtRestOptions = None,
        node_to_node_encryption_options: NodeToNodeEncryptionOptions = None,
        advanced_options: AdvancedOptions = None,
        log_publishing_options: LogPublishingOptions = None,
        domain_endpoint_options: DomainEndpointOptions = None,
        advanced_security_options: AdvancedSecurityOptionsInput = None,
        tag_list: TagList = None,
        auto_tune_options: AutoTuneOptionsInput = None,
    ) -> CreateDomainResponse:
        """
        Creates a new domain and its cluster.

        Only domain_name, engine_version, domain_endpoint_options and tag_list are used, all
        other parameters are currently ignored.

        :return: response containing the status of the created domain
        :raises: ResourceAlreadyExistsException if a domain with the given name already exists
        """
        # NOTE(review): all other operations pass the request region explicitly; get() without
        # arguments presumably resolves the region of the current request - confirm
        region = OpenSearchServiceBackend.get()
        with _domain_mutex:
            if domain_name in region.opensearch_domains:
                raise ResourceAlreadyExistsException(
                    f"domain {domain_name} already exists in region {region.name}"
                )
            domain_key = DomainKey(
                domain_name=domain_name,
                region=context.region,
                account=context.account_id,
            )
            # "create" domain data
            region.opensearch_domains[domain_name] = get_domain_status(domain_key)
            # lazy-init the cluster (sets the Endpoint and Processing flag of the domain status)
            # TODO handle additional parameters (cluster config,...)
            create_cluster(domain_key, engine_version, domain_endpoint_options)
            # set the tags
            self.add_tags(context, domain_key.arn, tag_list)
            # get the (updated) status
            status = get_domain_status(domain_key)
        # record event
        event_publisher.fire_event(
            event_publisher.EVENT_OPENSEARCH_CREATE_DOMAIN,
            payload={"n": event_publisher.get_hash(domain_name)},
        )
        return CreateDomainResponse(DomainStatus=status)

    def delete_domain(
        self, context: RequestContext, domain_name: DomainName
    ) -> DeleteDomainResponse:
        """
        Deletes the domain with the given name and removes its cluster.

        :return: response containing the status of the domain, flagged as deleted
        :raises: ResourceNotFoundException if the domain does not exist
        """
        domain_key = DomainKey(
            domain_name=domain_name,
            region=context.region,
            account=context.account_id,
        )
        region = OpenSearchServiceBackend.get(domain_key.region)
        with _domain_mutex:
            if domain_name not in region.opensearch_domains:
                raise ResourceNotFoundException(f"Domain not found: {domain_name}")
            # the status has to be captured before the cluster is removed
            status = get_domain_status(domain_key, deleted=True)
            _remove_cluster(domain_key)
        # record event
        event_publisher.fire_event(
            event_publisher.EVENT_OPENSEARCH_DELETE_DOMAIN,
            payload={"n": event_publisher.get_hash(domain_name)},
        )
        return DeleteDomainResponse(DomainStatus=status)

    def describe_domain(
        self, context: RequestContext, domain_name: DomainName
    ) -> DescribeDomainResponse:
        """
        Returns the status of the domain with the given name.

        :raises: ResourceNotFoundException if the domain does not exist
        """
        domain_key = DomainKey(
            domain_name=domain_name,
            region=context.region,
            account=context.account_id,
        )
        region = OpenSearchServiceBackend.get(domain_key.region)
        with _domain_mutex:
            if domain_name not in region.opensearch_domains:
                raise ResourceNotFoundException(f"Domain not found: {domain_name}")
            status = get_domain_status(domain_key)
        return DescribeDomainResponse(DomainStatus=status)

    @handler("UpdateDomainConfig", expand=False)
    def update_domain_config(
        self, context: RequestContext, payload: UpdateDomainConfigRequest
    ) -> UpdateDomainConfigResponse:
        """
        Merges the fields of the request into the stored status of the domain.

        :param payload: the unexpanded request, has to contain the "DomainName"
        :return: response containing the domain config derived from the updated status
        :raises: ResourceNotFoundException if the domain does not exist
        """
        domain_key = DomainKey(
            domain_name=payload["DomainName"],
            region=context.region,
            account=context.account_id,
        )
        region = OpenSearchServiceBackend.get(domain_key.region)
        with _domain_mutex:
            domain_status = region.opensearch_domains.get(domain_key.domain_name, None)
            if domain_status is None:
                raise ResourceNotFoundException(f"Domain not found: {domain_key.domain_name}")
            status_update: Dict = _update_domain_config_request_to_status(payload)
            # the stored status is updated in place
            domain_status.update(status_update)
        return UpdateDomainConfigResponse(DomainConfig=_status_to_config(domain_status))

    def describe_domains(
        self, context: RequestContext, domain_names: DomainNameList
    ) -> DescribeDomainsResponse:
        """
        Returns the status of every existing domain out of the given domain names.

        :return: response containing the list of found domain statuses, may be empty
        """
        status_list = []
        # NOTE(review): describe_domain acquires _domain_mutex as well, so this relies on the
        # mutex being reentrant - confirm
        with _domain_mutex:
            for domain_name in domain_names:
                try:
                    domain_status = self.describe_domain(context, domain_name)["DomainStatus"]
                    status_list.append(domain_status)
                except ResourceNotFoundException:
                    # ResourceNotFoundExceptions are ignored, we just look for the next domain.
                    # If no domain can be found, the result will just be empty.
                    pass
        return DescribeDomainsResponse(DomainStatusList=status_list)


# ... (the remainder of provider.py is elided in this excerpt)
#
# Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from
# setting up the prerequisites to run your first automation test, to following best practices
# and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of
# step-by-step guides to help you be proficient with different test automation frameworks,
# e.g. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
