Best Python code snippet using localstack_python
provider.py
Source:provider.py  
...339    region = OpenSearchServiceBackend.get(domain_key.region)340    domain_status = region.opensearch_domains.get(domain_key.domain_name)341    if domain_status is None:342        raise ValidationException("Invalid ARN. Domain not found.")343def _update_domain_config_request_to_status(request: UpdateDomainConfigRequest) -> DomainStatus:344    request: Dict345    request.pop("DryRun", None)346    request.pop("DomainName", None)347    return request348class OpensearchProvider(OpensearchApi):349    def create_domain(350        self,351        context: RequestContext,352        domain_name: DomainName,353        engine_version: VersionString = None,354        cluster_config: ClusterConfig = None,355        ebs_options: EBSOptions = None,356        access_policies: PolicyDocument = None,357        snapshot_options: SnapshotOptions = None,358        vpc_options: VPCOptions = None,359        cognito_options: CognitoOptions = None,360        encryption_at_rest_options: EncryptionAtRestOptions = None,361        node_to_node_encryption_options: NodeToNodeEncryptionOptions = None,362        advanced_options: AdvancedOptions = None,363        log_publishing_options: LogPublishingOptions = None,364        domain_endpoint_options: DomainEndpointOptions = None,365        advanced_security_options: AdvancedSecurityOptionsInput = None,366        tag_list: TagList = None,367        auto_tune_options: AutoTuneOptionsInput = None,368    ) -> CreateDomainResponse:369        region = OpenSearchServiceBackend.get()370        with _domain_mutex:371            if domain_name in region.opensearch_domains:372                raise ResourceAlreadyExistsException(373                    f"domain {domain_name} already exists in region {region.name}"374                )375            domain_key = DomainKey(376                domain_name=domain_name,377                region=context.region,378                account=context.account_id,379            )380            # "create" domain data381    
        region.opensearch_domains[domain_name] = get_domain_status(domain_key)382            # lazy-init the cluster (sets the Endpoint and Processing flag of the domain status)383            # TODO handle additional parameters (cluster config,...)384            create_cluster(domain_key, engine_version, domain_endpoint_options)385            # set the tags386            self.add_tags(context, domain_key.arn, tag_list)387            # get the (updated) status388            status = get_domain_status(domain_key)389        # record event390        event_publisher.fire_event(391            event_publisher.EVENT_OPENSEARCH_CREATE_DOMAIN,392            payload={"n": event_publisher.get_hash(domain_name)},393        )394        return CreateDomainResponse(DomainStatus=status)395    def delete_domain(396        self, context: RequestContext, domain_name: DomainName397    ) -> DeleteDomainResponse:398        domain_key = DomainKey(399            domain_name=domain_name,400            region=context.region,401            account=context.account_id,402        )403        region = OpenSearchServiceBackend.get(domain_key.region)404        with _domain_mutex:405            if domain_name not in region.opensearch_domains:406                raise ResourceNotFoundException(f"Domain not found: {domain_name}")407            status = get_domain_status(domain_key, deleted=True)408            _remove_cluster(domain_key)409        # record event410        event_publisher.fire_event(411            event_publisher.EVENT_OPENSEARCH_DELETE_DOMAIN,412            payload={"n": event_publisher.get_hash(domain_name)},413        )414        return DeleteDomainResponse(DomainStatus=status)415    def describe_domain(416        self, context: RequestContext, domain_name: DomainName417    ) -> DescribeDomainResponse:418        domain_key = DomainKey(419            domain_name=domain_name,420            region=context.region,421            account=context.account_id,422        )423        region = 
OpenSearchServiceBackend.get(domain_key.region)424        with _domain_mutex:425            if domain_name not in region.opensearch_domains:426                raise ResourceNotFoundException(f"Domain not found: {domain_name}")427            status = get_domain_status(domain_key)428        return DescribeDomainResponse(DomainStatus=status)429    @handler("UpdateDomainConfig", expand=False)430    def update_domain_config(431        self, context: RequestContext, payload: UpdateDomainConfigRequest432    ) -> UpdateDomainConfigResponse:433        domain_key = DomainKey(434            domain_name=payload["DomainName"],435            region=context.region,436            account=context.account_id,437        )438        region = OpenSearchServiceBackend.get(domain_key.region)439        with _domain_mutex:440            domain_status = region.opensearch_domains.get(domain_key.domain_name, None)441            if domain_status is None:442                raise ResourceNotFoundException(f"Domain not found: {domain_key.domain_name}")443            status_update: Dict = _update_domain_config_request_to_status(payload)444            domain_status.update(status_update)445        return UpdateDomainConfigResponse(DomainConfig=_status_to_config(domain_status))446    def describe_domains(447        self, context: RequestContext, domain_names: DomainNameList448    ) -> DescribeDomainsResponse:449        status_list = []450        with _domain_mutex:451            for domain_name in domain_names:452                try:453                    domain_status = self.describe_domain(context, domain_name)["DomainStatus"]454                    status_list.append(domain_status)455                except ResourceNotFoundException:456                    # ResourceNotFoundExceptions are ignored, we just look for the next domain.457                    # If no domain can be found, the result will just be empty....Learn to execute automation testing from scratch with LambdaTest Learning Hub. 
It takes you from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
