Best Python code snippet using localstack_python
provider.py
Source:provider.py  
...130            result.get("DedicatedMasterType")131        )132        result["WarmType"] = _instancetype_to_opensearch(result.get("WarmType"))133        return result134def _domainconfig_from_opensearch(135    domain_config: Optional[DomainConfig],136) -> Optional[ElasticsearchDomainConfig]:137    if domain_config is not None:138        result = cast(ElasticsearchDomainConfig, domain_config)139        engine_version = domain_config.get("EngineVersion", {})140        result["ElasticsearchVersion"] = ElasticsearchVersionStatus(141            Options=_version_from_opensearch(engine_version.get("Options")),142            Status=cast(OptionStatus, engine_version.get("Status")),143        )144        cluster_config = domain_config.get("ClusterConfig", {})145        result["ElasticsearchClusterConfig"] = ElasticsearchClusterConfigStatus(146            Options=_clusterconfig_from_opensearch(cluster_config.get("Options")),147            Status=cluster_config.get("Status"),148        )149        result.pop("EngineVersion", None)150        result.pop("ClusterConfig", None)151        return result152def _compatible_version_list_from_opensearch(153    compatible_version_list: Optional[CompatibleVersionsList],154) -> Optional[CompatibleElasticsearchVersionsList]:155    if compatible_version_list is not None:156        return [157            CompatibleVersionsMap(158                SourceVersion=_version_from_opensearch(version_map["SourceVersion"]),159                TargetVersions=[160                    _version_from_opensearch(target_version)161                    for target_version in version_map["TargetVersions"]162                ],163            )164            for version_map in compatible_version_list165        ]166@contextmanager167def exception_mapper():168    """Maps an exception thrown by the OpenSearch client to an exception thrown by the ElasticSearch API."""169    try:170        yield171    except ClientError as err:172        exception_types = {173           
 "AccessDeniedException": AccessDeniedException,174            "BaseException": EsBaseException,175            "ConflictException": ConflictException,176            "DisabledOperationException": DisabledOperationException,177            "InternalException": InternalException,178            "InvalidPaginationTokenException": InvalidPaginationTokenException,179            "InvalidTypeException": InvalidTypeException,180            "LimitExceededException": LimitExceededException,181            "ResourceAlreadyExistsException": ResourceAlreadyExistsException,182            "ResourceNotFoundException": ResourceNotFoundException,183            "ValidationException": ValidationException,184        }185        mapped_exception_type = exception_types.get(err.response["Error"]["Code"], EsBaseException)186        raise mapped_exception_type(err.response["Error"]["Message"])187class EsProvider(EsApi):188    def create_elasticsearch_domain(189        self,190        context: RequestContext,191        domain_name: DomainName,192        elasticsearch_version: ElasticsearchVersionString = None,193        elasticsearch_cluster_config: ElasticsearchClusterConfig = None,194        ebs_options: EBSOptions = None,195        access_policies: PolicyDocument = None,196        snapshot_options: SnapshotOptions = None,197        vpc_options: VPCOptions = None,198        cognito_options: CognitoOptions = None,199        encryption_at_rest_options: EncryptionAtRestOptions = None,200        node_to_node_encryption_options: NodeToNodeEncryptionOptions = None,201        advanced_options: AdvancedOptions = None,202        log_publishing_options: LogPublishingOptions = None,203        domain_endpoint_options: DomainEndpointOptions = None,204        advanced_security_options: AdvancedSecurityOptionsInput = None,205        auto_tune_options: AutoTuneOptionsInput = None,206        tag_list: TagList = None,207    ) -> CreateElasticsearchDomainResponse:208        opensearch_client = 
aws_stack.connect_to_service("opensearch", region_name=context.region)209        # If no version is given, we set our default elasticsearch version210        engine_version = (211            _version_to_opensearch(elasticsearch_version)212            if elasticsearch_version213            else constants.ELASTICSEARCH_DEFAULT_VERSION214        )215        kwargs = {216            "DomainName": domain_name,217            "EngineVersion": engine_version,218            "ClusterConfig": _clusterconfig_to_opensearch(elasticsearch_cluster_config),219            "EBSOptions": ebs_options,220            "AccessPolicies": access_policies,221            "SnapshotOptions": snapshot_options,222            "VPCOptions": vpc_options,223            "CognitoOptions": cognito_options,224            "EncryptionAtRestOptions": encryption_at_rest_options,225            "NodeToNodeEncryptionOptions": node_to_node_encryption_options,226            "AdvancedOptions": advanced_options,227            "LogPublishingOptions": log_publishing_options,228            "DomainEndpointOptions": domain_endpoint_options,229            "AdvancedSecurityOptions": advanced_security_options,230            "AutoTuneOptions": auto_tune_options,231            "TagList": tag_list,232        }233        # Filter the kwargs to not set None values at all (boto doesn't like that)234        kwargs = {key: value for key, value in kwargs.items() if value is not None}235        with exception_mapper():236            domain_status = opensearch_client.create_domain(**kwargs)["DomainStatus"]237        # record event238        event_publisher.fire_event(239            event_publisher.EVENT_ES_CREATE_DOMAIN,240            payload={"n": event_publisher.get_hash(domain_name)},241        )242        status = _domainstatus_from_opensearch(domain_status)243        return CreateElasticsearchDomainResponse(DomainStatus=status)244    def delete_elasticsearch_domain(245        self, context: RequestContext, domain_name: 
DomainName246    ) -> DeleteElasticsearchDomainResponse:247        opensearch_client = aws_stack.connect_to_service("opensearch", region_name=context.region)248        with exception_mapper():249            domain_status = opensearch_client.delete_domain(250                DomainName=domain_name,251            )["DomainStatus"]252        # record event253        event_publisher.fire_event(254            event_publisher.EVENT_ES_DELETE_DOMAIN,255            payload={"n": event_publisher.get_hash(domain_name)},256        )257        status = _domainstatus_from_opensearch(domain_status)258        return DeleteElasticsearchDomainResponse(DomainStatus=status)259    def describe_elasticsearch_domain(260        self, context: RequestContext, domain_name: DomainName261    ) -> DescribeElasticsearchDomainResponse:262        opensearch_client = aws_stack.connect_to_service("opensearch", region_name=context.region)263        with exception_mapper():264            opensearch_status = opensearch_client.describe_domain(265                DomainName=domain_name,266            )["DomainStatus"]267        status = _domainstatus_from_opensearch(opensearch_status)268        return DescribeElasticsearchDomainResponse(DomainStatus=status)269    @handler("UpdateElasticsearchDomainConfig", expand=False)270    def update_elasticsearch_domain_config(271        self, context: RequestContext, payload: UpdateElasticsearchDomainConfigRequest272    ) -> UpdateElasticsearchDomainConfigResponse:273        opensearch_client = aws_stack.connect_to_service("opensearch", region_name=context.region)274        payload: Dict275        if "ElasticsearchClusterConfig" in payload:276            payload["ClusterConfig"] = payload["ElasticsearchClusterConfig"]277            payload["ClusterConfig"]["InstanceType"] = _instancetype_to_opensearch(278                payload["ClusterConfig"]["InstanceType"]279            )280            payload.pop("ElasticsearchClusterConfig")281        with 
exception_mapper():282            opensearch_config = opensearch_client.update_domain_config(**payload)["DomainConfig"]283        config = _domainconfig_from_opensearch(opensearch_config)284        return UpdateElasticsearchDomainConfigResponse(DomainConfig=config)285    def describe_elasticsearch_domains(286        self, context: RequestContext, domain_names: DomainNameList287    ) -> DescribeElasticsearchDomainsResponse:288        opensearch_client = aws_stack.connect_to_service("opensearch", region_name=context.region)289        with exception_mapper():290            opensearch_status_list = opensearch_client.describe_domains(291                DomainNames=domain_names,292            )["DomainStatusList"]293        status_list = [_domainstatus_from_opensearch(s) for s in opensearch_status_list]294        return DescribeElasticsearchDomainsResponse(DomainStatusList=status_list)295    def list_domain_names(296        self, context: RequestContext, engine_type: EngineType = None297    ) -> ListDomainNamesResponse:298        opensearch_client = aws_stack.connect_to_service("opensearch", region_name=context.region)299        # Only hand the EngineType param to boto if it's set300        kwargs = {}301        if engine_type:302            kwargs["EngineType"] = engine_type303        with exception_mapper():304            domain_names = opensearch_client.list_domain_names(**kwargs)["DomainNames"]305        return ListDomainNamesResponse(DomainNames=cast(Optional[DomainInfoList], domain_names))306    def list_elasticsearch_versions(307        self,308        context: RequestContext,309        max_results: MaxResults = None,310        next_token: NextToken = None,311    ) -> ListElasticsearchVersionsResponse:312        opensearch_client = aws_stack.connect_to_service("opensearch", region_name=context.region)313        # Construct the arguments as kwargs to not set None values at all (boto doesn't like that)314        kwargs = {315            key: value316            for 
key, value in {"MaxResults": max_results, "NextToken": next_token}.items()317            if value is not None318        }319        with exception_mapper():320            versions = opensearch_client.list_versions(**kwargs)321        return ListElasticsearchVersionsResponse(322            ElasticsearchVersions=[323                _version_from_opensearch(version) for version in versions["Versions"]324            ],325            NextToken=versions.get(next_token),326        )327    def get_compatible_elasticsearch_versions(328        self, context: RequestContext, domain_name: DomainName = None329    ) -> GetCompatibleElasticsearchVersionsResponse:330        opensearch_client = aws_stack.connect_to_service("opensearch", region_name=context.region)331        # Only hand the DomainName param to boto if it's set332        kwargs = {}333        if domain_name:334            kwargs["DomainName"] = domain_name335        with exception_mapper():336            compatible_versions_response = opensearch_client.get_compatible_versions(**kwargs)337        compatible_versions = compatible_versions_response.get("CompatibleVersions")338        return GetCompatibleElasticsearchVersionsResponse(339            CompatibleElasticsearchVersions=_compatible_version_list_from_opensearch(340                compatible_versions341            )342        )343    def describe_elasticsearch_domain_config(344        self, context: RequestContext, domain_name: DomainName345    ) -> DescribeElasticsearchDomainConfigResponse:346        opensearch_client = aws_stack.connect_to_service("opensearch", region_name=context.region)347        with exception_mapper():348            domain_config = opensearch_client.describe_domain_config(DomainName=domain_name).get(349                "DomainConfig"350            )351        return DescribeElasticsearchDomainConfigResponse(352            DomainConfig=_domainconfig_from_opensearch(domain_config)353        )354    def add_tags(self, context: RequestContext, 
arn: ARN, tag_list: TagList) -> None:355        opensearch_client = aws_stack.connect_to_service("opensearch", region_name=context.region)356        with exception_mapper():357            opensearch_client.add_tags(ARN=arn, TagList=tag_list)358    def list_tags(self, context: RequestContext, arn: ARN) -> ListTagsResponse:359        opensearch_client = aws_stack.connect_to_service("opensearch", region_name=context.region)360        with exception_mapper():361            response = opensearch_client.list_tags(ARN=arn)362        return ListTagsResponse(TagList=response.get("TagList"))363    def remove_tags(self, context: RequestContext, arn: ARN, tag_keys: StringList) -> None:364        opensearch_client = aws_stack.connect_to_service("opensearch", region_name=context.region)365        with exception_mapper():366            opensearch_client.remove_tags(ARN=arn, TagKeys=tag_keys)Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
