Best Python code snippet using localstack_python
provider.py
Source:provider.py  
# ... (the excerpt starts mid-function: these are the last lines of the preceding
# helper, whose `def` line is not part of this listing)
#         if version.startswith("OpenSearch_"):
#             return version
#         else:
#             return f"Elasticsearch_{version}"


# Cluster config keys whose values are instance type names and need renaming
_INSTANCE_TYPE_KEYS = ("InstanceType", "DedicatedMasterType", "WarmType")


def _drop_none(params: dict) -> dict:
    """Return a copy of ``params`` without the entries whose value is ``None``."""
    return {key: value for key, value in params.items() if value is not None}


def _version_from_opensearch(
    version: Optional[VersionString],
) -> Optional[ElasticsearchVersionString]:
    """Convert an OpenSearch engine version string to an Elasticsearch version string.

    A string starting with ``Elasticsearch_`` is reduced to the part after the
    prefix (``Elasticsearch_7.10`` -> ``7.10``); any other string is returned
    unchanged. Returns ``None`` if no version is given.
    """
    if version is None:
        return None
    if version.startswith("Elasticsearch_"):
        return version.split("_")[1]
    return version


def _instancetype_to_opensearch(instance_type: Optional[str]) -> Optional[str]:
    """Replace ``elasticsearch`` with ``search`` in an instance type name.

    Example: ``m5.large.elasticsearch`` -> ``m5.large.search``.
    Returns ``None`` if no instance type is given.
    """
    if instance_type is None:
        return None
    return instance_type.replace("elasticsearch", "search")


def _instancetype_from_opensearch(instance_type: Optional[str]) -> Optional[str]:
    """Replace ``search`` with ``elasticsearch`` in an instance type name.

    Example: ``m5.large.search`` -> ``m5.large.elasticsearch``.
    Returns ``None`` if no instance type is given.
    """
    if instance_type is None:
        return None
    # "elasticsearch" itself contains "search": a name that is already converted
    # must be left alone, otherwise it would become "...elasticelasticsearch".
    if "elasticsearch" in instance_type:
        return instance_type
    return instance_type.replace("search", "elasticsearch")


def _clusterconfig_from_opensearch(
    cluster_config: Optional[ClusterConfig],
) -> Optional[ElasticsearchClusterConfig]:
    """Convert an OpenSearch cluster config into an Elasticsearch cluster config.

    All keys are taken over as they are; only the instance type names are
    renamed. The input dict is not modified. Returns ``None`` if no cluster
    config is given.
    """
    if cluster_config is None:
        return None
    # Take a shallow copy of the whole typed dict and typecast it to our target type
    result = cast(ElasticsearchClusterConfig, dict(cluster_config))
    # Adjust the instance type names, but only for the keys which are actually set
    for key in _INSTANCE_TYPE_KEYS:
        instance_type = result.get(key)
        if instance_type is not None:
            result[key] = _instancetype_from_opensearch(instance_type)
    return result


def _domainstatus_from_opensearch(
    domain_status: Optional[DomainStatus],
) -> Optional[ElasticsearchDomainStatus]:
    """Convert an OpenSearch domain status into an Elasticsearch domain status.

    ``EngineVersion`` is replaced by ``ElasticsearchVersion`` and ``ClusterConfig``
    by ``ElasticsearchClusterConfig`` (both are set to ``None`` if the source key
    is missing); all other keys are taken over unchanged. The input dict is not
    modified. Returns ``None`` if no domain status is given.
    """
    if domain_status is None:
        return None
    # Take a shallow copy of the whole typed dict and typecast it to our target type
    result = cast(ElasticsearchDomainStatus, dict(domain_status))
    # Only specifically handle keys which are named differently or whose values
    # differ (version and cluster config)
    result["ElasticsearchVersion"] = _version_from_opensearch(result.pop("EngineVersion", None))
    result["ElasticsearchClusterConfig"] = _clusterconfig_from_opensearch(
        result.pop("ClusterConfig", None)
    )
    return result


def _clusterconfig_to_opensearch(
    elasticsearch_cluster_config: Optional[ElasticsearchClusterConfig],
) -> Optional[ClusterConfig]:
    """Convert an Elasticsearch cluster config into an OpenSearch cluster config.

    All keys are taken over as they are; only the instance type names are
    renamed. The input dict is not modified. Returns ``None`` if no cluster
    config is given.
    """
    if elasticsearch_cluster_config is None:
        return None
    result = cast(ClusterConfig, dict(elasticsearch_cluster_config))
    # Never introduce keys with a None value: the result is handed to boto, which
    # doesn't like None values.
    for key in _INSTANCE_TYPE_KEYS:
        instance_type = result.get(key)
        if instance_type is not None:
            result[key] = _instancetype_to_opensearch(instance_type)
    return result


def _domainconfig_from_opensearch(
    domain_config: Optional[DomainConfig],
) -> Optional[ElasticsearchDomainConfig]:
    """Convert an OpenSearch domain config into an Elasticsearch domain config.

    ``EngineVersion`` and ``ClusterConfig`` are replaced by ``ElasticsearchVersion``
    and ``ElasticsearchClusterConfig``, each wrapping the converted ``Options``
    together with the original ``Status``. The input dict is not modified.
    Returns ``None`` if no domain config is given.
    """
    if domain_config is None:
        return None
    result = cast(ElasticsearchDomainConfig, dict(domain_config))
    # "or {}" also covers a key which is present but explicitly set to None
    engine_version = result.pop("EngineVersion", None) or {}
    result["ElasticsearchVersion"] = ElasticsearchVersionStatus(
        Options=_version_from_opensearch(engine_version.get("Options")),
        Status=cast(OptionStatus, engine_version.get("Status")),
    )
    cluster_config = result.pop("ClusterConfig", None) or {}
    result["ElasticsearchClusterConfig"] = ElasticsearchClusterConfigStatus(
        Options=_clusterconfig_from_opensearch(cluster_config.get("Options")),
        Status=cluster_config.get("Status"),
    )
    return result


def _compatible_version_list_from_opensearch(
    compatible_version_list: Optional[CompatibleVersionsList],
) -> Optional[CompatibleElasticsearchVersionsList]:
    """Convert the source and target versions of every entry to Elasticsearch version strings.

    Returns ``None`` if no list is given.
    """
    if compatible_version_list is None:
        return None
    return [
        CompatibleVersionsMap(
            SourceVersion=_version_from_opensearch(version_map["SourceVersion"]),
            TargetVersions=[
                _version_from_opensearch(target_version)
                for target_version in version_map["TargetVersions"]
            ],
        )
        for version_map in compatible_version_list
    ]


@contextmanager
def exception_mapper():
    """Maps an exception thrown by the OpenSearch client to an exception thrown by the ElasticSearch API.

    Error codes without an explicit mapping are raised as ``EsBaseException``.
    """
    try:
        yield
    except ClientError as err:
        exception_types = {
            "AccessDeniedException": AccessDeniedException,
            "BaseException": EsBaseException,
            "ConflictException": ConflictException,
            "DisabledOperationException": DisabledOperationException,
            "InternalException": InternalException,
            "InvalidPaginationTokenException": InvalidPaginationTokenException,
            "InvalidTypeException": InvalidTypeException,
            "LimitExceededException": LimitExceededException,
            "ResourceAlreadyExistsException": ResourceAlreadyExistsException,
            "ResourceNotFoundException": ResourceNotFoundException,
            "ValidationException": ValidationException,
        }
        mapped_exception_type = exception_types.get(err.response["Error"]["Code"], EsBaseException)
        # Keep the original client error attached as the explicit cause
        raise mapped_exception_type(err.response["Error"]["Message"]) from err


class EsProvider(EsApi):
    """Elasticsearch API provider.

    The operations shown here forward the call to the ``opensearch`` client of the
    request's region and translate parameters and results between the two APIs.
    """

    def create_elasticsearch_domain(
        self,
        context: RequestContext,
        domain_name: DomainName,
        elasticsearch_version: ElasticsearchVersionString = None,
        elasticsearch_cluster_config: ElasticsearchClusterConfig = None,
        ebs_options: EBSOptions = None,
        access_policies: PolicyDocument = None,
        snapshot_options: SnapshotOptions = None,
        vpc_options: VPCOptions = None,
        cognito_options: CognitoOptions = None,
        encryption_at_rest_options: EncryptionAtRestOptions = None,
        node_to_node_encryption_options: NodeToNodeEncryptionOptions = None,
        advanced_options: AdvancedOptions = None,
        log_publishing_options: LogPublishingOptions = None,
        domain_endpoint_options: DomainEndpointOptions = None,
        advanced_security_options: AdvancedSecurityOptionsInput = None,
        auto_tune_options: AutoTuneOptionsInput = None,
        tag_list: TagList = None,
    ) -> CreateElasticsearchDomainResponse:
        """Create a domain through the OpenSearch client and return its converted status.

        Also fires the ``EVENT_ES_CREATE_DOMAIN`` event with a hash of the domain name.
        """
        opensearch_client = aws_stack.connect_to_service("opensearch", region_name=context.region)
        # If no version is given, we set our default elasticsearch version
        engine_version = (
            _version_to_opensearch(elasticsearch_version)
            if elasticsearch_version
            else constants.ELASTICSEARCH_DEFAULT_VERSION
        )
        # Filter the kwargs to not set None values at all (boto doesn't like that)
        kwargs = _drop_none(
            {
                "DomainName": domain_name,
                "EngineVersion": engine_version,
                "ClusterConfig": _clusterconfig_to_opensearch(elasticsearch_cluster_config),
                "EBSOptions": ebs_options,
                "AccessPolicies": access_policies,
                "SnapshotOptions": snapshot_options,
                "VPCOptions": vpc_options,
                "CognitoOptions": cognito_options,
                "EncryptionAtRestOptions": encryption_at_rest_options,
                "NodeToNodeEncryptionOptions": node_to_node_encryption_options,
                "AdvancedOptions": advanced_options,
                "LogPublishingOptions": log_publishing_options,
                "DomainEndpointOptions": domain_endpoint_options,
                "AdvancedSecurityOptions": advanced_security_options,
                "AutoTuneOptions": auto_tune_options,
                "TagList": tag_list,
            }
        )
        with exception_mapper():
            domain_status = opensearch_client.create_domain(**kwargs)["DomainStatus"]
        # record event
        event_publisher.fire_event(
            event_publisher.EVENT_ES_CREATE_DOMAIN,
            payload={"n": event_publisher.get_hash(domain_name)},
        )
        status = _domainstatus_from_opensearch(domain_status)
        return CreateElasticsearchDomainResponse(DomainStatus=status)

    def delete_elasticsearch_domain(
        self, context: RequestContext, domain_name: DomainName
    ) -> DeleteElasticsearchDomainResponse:
        """Delete a domain through the OpenSearch client and return its converted status.

        Also fires the ``EVENT_ES_DELETE_DOMAIN`` event with a hash of the domain name.
        """
        opensearch_client = aws_stack.connect_to_service("opensearch", region_name=context.region)
        with exception_mapper():
            domain_status = opensearch_client.delete_domain(
                DomainName=domain_name,
            )["DomainStatus"]
        # record event
        event_publisher.fire_event(
            event_publisher.EVENT_ES_DELETE_DOMAIN,
            payload={"n": event_publisher.get_hash(domain_name)},
        )
        status = _domainstatus_from_opensearch(domain_status)
        return DeleteElasticsearchDomainResponse(DomainStatus=status)

    def describe_elasticsearch_domain(
        self, context: RequestContext, domain_name: DomainName
    ) -> DescribeElasticsearchDomainResponse:
        """Return the converted status of a single domain."""
        opensearch_client = aws_stack.connect_to_service("opensearch", region_name=context.region)
        with exception_mapper():
            opensearch_status = opensearch_client.describe_domain(
                DomainName=domain_name,
            )["DomainStatus"]
        status = _domainstatus_from_opensearch(opensearch_status)
        return DescribeElasticsearchDomainResponse(DomainStatus=status)

    def describe_elasticsearch_domains(
        self, context: RequestContext, domain_names: DomainNameList
    ) -> DescribeElasticsearchDomainsResponse:
        """Return the converted statuses of the given domains."""
        opensearch_client = aws_stack.connect_to_service("opensearch", region_name=context.region)
        with exception_mapper():
            opensearch_status_list = opensearch_client.describe_domains(
                DomainNames=domain_names,
            )["DomainStatusList"]
        status_list = [_domainstatus_from_opensearch(s) for s in opensearch_status_list]
        return DescribeElasticsearchDomainsResponse(DomainStatusList=status_list)

    def list_domain_names(
        self, context: RequestContext, engine_type: EngineType = None
    ) -> ListDomainNamesResponse:
        """Return the domain names reported by the OpenSearch client, optionally filtered by engine type."""
        opensearch_client = aws_stack.connect_to_service("opensearch", region_name=context.region)
        # Only hand the EngineType param to boto if it's set
        kwargs = {}
        if engine_type:
            kwargs["EngineType"] = engine_type
        with exception_mapper():
            domain_names = opensearch_client.list_domain_names(**kwargs)["DomainNames"]
        return ListDomainNamesResponse(DomainNames=cast(Optional[DomainInfoList], domain_names))

    def list_elasticsearch_versions(
        self,
        context: RequestContext,
        max_results: MaxResults = None,
        next_token: NextToken = None,
    ) -> ListElasticsearchVersionsResponse:
        """Return one page of the versions reported by the OpenSearch client, converted to Elasticsearch versions."""
        opensearch_client = aws_stack.connect_to_service("opensearch", region_name=context.region)
        # Construct the arguments as kwargs to not set None values at all (boto doesn't like that)
        kwargs = _drop_none({"MaxResults": max_results, "NextToken": next_token})
        with exception_mapper():
            versions = opensearch_client.list_versions(**kwargs)
        return ListElasticsearchVersionsResponse(
            ElasticsearchVersions=[
                _version_from_opensearch(version) for version in versions["Versions"]
            ],
            # Hand back the token of the *response* so the caller can request the
            # next page (the request's own token is not a key of the response)
            NextToken=versions.get("NextToken"),
        )

    def get_compatible_elasticsearch_versions(
        self, context: RequestContext, domain_name: DomainName = None
    ) -> GetCompatibleElasticsearchVersionsResponse:
        opensearch_client = aws_stack.connect_to_service("opensearch", region_name=context.region)
        # Only hand the DomainName param to boto if it's set
        kwargs = {}
        if domain_name:
            kwargs["DomainName"] = domain_name
        with exception_mapper():
            compatible_versions_response = opensearch_client.get_compatible_versions(**kwargs)
        compatible_versions = compatible_versions_response.get("CompatibleVersions")
        # ... (the listing is truncated here; the rest of this method is not shown)


# Learn to execute automation testing from scratch with LambdaTest Learning Hub.
# Right from setting up the prerequisites to run your first automation test, to
# following best practices and diving deeper into advanced test scenarios.
# LambdaTest Learning Hubs compile a list of step-by-step guides to help you be
# proficient with different test automation frameworks i.e. Selenium, Cypress,
# TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
