Best Python code snippet using localstack_python
test_opensearch.py
Source:test_opensearch.py  
1import json2import logging3import threading4import botocore.exceptions5import pytest6from localstack import config7from localstack.constants import OPENSEARCH_DEFAULT_VERSION, TEST_AWS_ACCOUNT_ID8from localstack.services.install import install_opensearch9from localstack.services.opensearch.cluster import EdgeProxiedOpensearchCluster10from localstack.services.opensearch.cluster_manager import (11    CustomBackendManager,12    DomainKey,13    MultiClusterManager,14    MultiplexingClusterManager,15    SingletonClusterManager,16    create_cluster_manager,17)18from localstack.utils.common import call_safe, poll_condition, retry19from localstack.utils.common import safe_requests as requests20from localstack.utils.common import short_uid, start_worker_thread21LOG = logging.getLogger(__name__)22# Common headers used when sending requests to OpenSearch23COMMON_HEADERS = {"content-type": "application/json", "Accept-encoding": "identity"}24# Lock and event to ensure that the installation is executed before the tests25INIT_LOCK = threading.Lock()26installed = threading.Event()27def install_async():28    """29    Installs the default opensearch version in a worker thread. Used by conftest.py to make30    sure opensearch is downloaded once the tests arrive here.31    """32    if installed.is_set():33        return34    def run_install(*args):35        with INIT_LOCK:36            if installed.is_set():37                return38            LOG.info("installing opensearch default version")39            install_opensearch()40            LOG.info("done installing opensearch default version")41            LOG.info("installing opensearch 1.0")42            install_opensearch("OpenSearch_1.0")43            LOG.info("done installing opensearch 1.0")44            installed.set()45    start_worker_thread(run_install)46@pytest.fixture(autouse=True)47def opensearch():48    if not installed.is_set():49        install_async()50    assert installed.wait(timeout=5 * 60), "gave up waiting for opensearch to install"51    yield52def try_cluster_health(cluster_url: str):53    response = requests.get(cluster_url)54    assert response.ok, f"cluster endpoint returned an error: {response.text}"55    response = requests.get(f"{cluster_url}/_cluster/health")56    assert response.ok, f"cluster health endpoint returned an error: {response.text}"57    assert response.json()["status"] in [58        "orange",59        "yellow",60        "green",61    ], "expected cluster state to be in a valid state"62class TestOpensearchProvider:63    """64    Because this test reuses the localstack instance for each test, all tests are performed with65    OPENSEARCH_MULTI_CLUSTER=True, regardless of changes in the config value.66    """67    def test_list_versions(self, opensearch_client):68        response = opensearch_client.list_versions()69        assert "Versions" in response70        versions = response["Versions"]71        expected_versions = [72            "OpenSearch_1.1",73            "OpenSearch_1.0",74            "Elasticsearch_7.10",75            "Elasticsearch_7.9",76            "Elasticsearch_7.8",77            "Elasticsearch_7.7",78            "Elasticsearch_7.4",79            "Elasticsearch_7.1",80            "Elasticsearch_6.8",81            "Elasticsearch_6.7",82            "Elasticsearch_6.5",83            "Elasticsearch_6.4",84            "Elasticsearch_6.3",85            "Elasticsearch_6.2",86            "Elasticsearch_6.0",87            "Elasticsearch_5.6",88            "Elasticsearch_5.5",89            "Elasticsearch_5.3",90            "Elasticsearch_5.1",91            "Elasticsearch_5.0",92        ]93        # We iterate over the expected versions to avoid breaking the test if new versions are supported94        for expected_version in expected_versions:95            assert expected_version in versions96    def test_get_compatible_versions(self, opensearch_client):97        response = opensearch_client.get_compatible_versions()98        assert "CompatibleVersions" in response99        compatible_versions = response["CompatibleVersions"]100        assert len(compatible_versions) >= 18101        expected_compatible_versions = [102            {"SourceVersion": "OpenSearch_1.0", "TargetVersions": ["OpenSearch_1.1"]},103            {104                "SourceVersion": "Elasticsearch_7.10",105                "TargetVersions": ["OpenSearch_1.0", "OpenSearch_1.1"],106            },107            {108                "SourceVersion": "Elasticsearch_7.9",109                "TargetVersions": ["Elasticsearch_7.10", "OpenSearch_1.0", "OpenSearch_1.1"],110            },111            {112                "SourceVersion": "Elasticsearch_7.8",113                "TargetVersions": [114                    "Elasticsearch_7.9",115                    "Elasticsearch_7.10",116                    "OpenSearch_1.1",117                    "OpenSearch_1.0",118                ],119            },120            {121                "SourceVersion": "Elasticsearch_7.7",122                "TargetVersions": [123                    "Elasticsearch_7.8",124                    "Elasticsearch_7.9",125                    "Elasticsearch_7.10",126                    "OpenSearch_1.0",127                    "OpenSearch_1.1",128                ],129            },130            {131                "SourceVersion": "Elasticsearch_7.4",132                "TargetVersions": [133                    "Elasticsearch_7.7",134                    "Elasticsearch_7.8",135                    "Elasticsearch_7.9",136                    "Elasticsearch_7.10",137                    "OpenSearch_1.0",138                    "OpenSearch_1.1",139                ],140            },141            {142                "SourceVersion": "Elasticsearch_7.1",143                "TargetVersions": [144                    "Elasticsearch_7.4",145                    "Elasticsearch_7.7",146                    "Elasticsearch_7.8",147                    "Elasticsearch_7.9",148                    "Elasticsearch_7.10",149                    "OpenSearch_1.0",150                    "OpenSearch_1.1",151                ],152            },153            {154                "SourceVersion": "Elasticsearch_6.8",155                "TargetVersions": [156                    "Elasticsearch_7.1",157                    "Elasticsearch_7.4",158                    "Elasticsearch_7.7",159                    "Elasticsearch_7.8",160                    "Elasticsearch_7.9",161                    "Elasticsearch_7.10",162                    "OpenSearch_1.0",163                    "OpenSearch_1.1",164                ],165            },166            {"SourceVersion": "Elasticsearch_6.7", "TargetVersions": ["Elasticsearch_6.8"]},167            {168                "SourceVersion": "Elasticsearch_6.5",169                "TargetVersions": ["Elasticsearch_6.7", "Elasticsearch_6.8"],170            },171            {172                "SourceVersion": "Elasticsearch_6.4",173                "TargetVersions": [174                    "Elasticsearch_6.5",175                    "Elasticsearch_6.7",176                    "Elasticsearch_6.8",177                ],178            },179            {180                "SourceVersion": "Elasticsearch_6.3",181                "TargetVersions": [182                    "Elasticsearch_6.4",183                    "Elasticsearch_6.5",184                    "Elasticsearch_6.7",185                    "Elasticsearch_6.8",186                ],187            },188            {189                "SourceVersion": "Elasticsearch_6.2",190                "TargetVersions": [191                    "Elasticsearch_6.3",192                    "Elasticsearch_6.4",193                    "Elasticsearch_6.5",194                    "Elasticsearch_6.7",195                    "Elasticsearch_6.8",196                ],197            },198            {199                "SourceVersion": "Elasticsearch_6.0",200                "TargetVersions": [201                    "Elasticsearch_6.3",202                    "Elasticsearch_6.4",203                    "Elasticsearch_6.5",204                    "Elasticsearch_6.7",205                    "Elasticsearch_6.8",206                ],207            },208            {209                "SourceVersion": "Elasticsearch_5.6",210                "TargetVersions": [211                    "Elasticsearch_6.3",212                    "Elasticsearch_6.4",213                    "Elasticsearch_6.5",214                    "Elasticsearch_6.7",215                    "Elasticsearch_6.8",216                ],217            },218            {"SourceVersion": "Elasticsearch_5.5", "TargetVersions": ["Elasticsearch_5.6"]},219            {"SourceVersion": "Elasticsearch_5.3", "TargetVersions": ["Elasticsearch_5.6"]},220            {"SourceVersion": "Elasticsearch_5.1", "TargetVersions": ["Elasticsearch_5.6"]},221        ]222        # Iterate over the expected compatible versions to avoid breaking the test if new versions are supported223        for expected_compatible_version in expected_compatible_versions:224            assert expected_compatible_version in compatible_versions225    def test_get_compatible_version_for_domain(self, opensearch_client, opensearch_create_domain):226        opensearch_domain = opensearch_create_domain(EngineVersion="OpenSearch_1.0")227        response = opensearch_client.get_compatible_versions(DomainName=opensearch_domain)228        assert "CompatibleVersions" in response229        compatible_versions = response["CompatibleVersions"]230        assert len(compatible_versions) == 1231        compatibility = compatible_versions[0]232        assert compatibility["SourceVersion"] == "OpenSearch_1.0"233        # Just check if 1.1 is contained (not equality) to avoid breaking the test if new versions are supported234        assert "OpenSearch_1.1" in compatibility["TargetVersions"]235    def test_create_domain(self, opensearch_client, opensearch_wait_for_cluster):236        domain_name = f"opensearch-domain-{short_uid()}"237        try:238            opensearch_client.create_domain(DomainName=domain_name)239            response = opensearch_client.list_domain_names(EngineType="OpenSearch")240            domain_names = [domain["DomainName"] for domain in response["DomainNames"]]241            assert domain_name in domain_names242            # wait for the cluster243            opensearch_wait_for_cluster(domain_name=domain_name)244        finally:245            opensearch_client.delete_domain(DomainName=domain_name)246    def test_create_existing_domain_causes_exception(247        self, opensearch_client, opensearch_wait_for_cluster248    ):249        domain_name = f"opensearch-domain-{short_uid()}"250        try:251            opensearch_client.create_domain(DomainName=domain_name)252            with pytest.raises(botocore.exceptions.ClientError) as exc_info:253                opensearch_client.create_domain(DomainName=domain_name)254            assert exc_info.type.__name__ == "ResourceAlreadyExistsException"255            opensearch_wait_for_cluster(domain_name=domain_name)256        finally:257            opensearch_client.delete_domain(DomainName=domain_name)258    def test_describe_domains(self, opensearch_client, opensearch_domain):259        response = opensearch_client.describe_domains(DomainNames=[opensearch_domain])260        assert len(response["DomainStatusList"]) == 1261        assert response["DomainStatusList"][0]["DomainName"] == opensearch_domain262    def test_domain_version(self, opensearch_client, opensearch_domain, opensearch_create_domain):263        response = opensearch_client.describe_domain(DomainName=opensearch_domain)264        assert "DomainStatus" in response265        status = response["DomainStatus"]266        assert "EngineVersion" in status267        assert status["EngineVersion"] == OPENSEARCH_DEFAULT_VERSION268        domain_name = opensearch_create_domain(EngineVersion="OpenSearch_1.0")269        response = opensearch_client.describe_domain(DomainName=domain_name)270        assert "DomainStatus" in response271        status = response["DomainStatus"]272        assert "EngineVersion" in status273        assert status["EngineVersion"] == "OpenSearch_1.0"274    def test_create_indices(self, opensearch_endpoint):275        indices = ["index1", "index2"]276        for index_name in indices:277            index_path = f"{opensearch_endpoint}/{index_name}"278            requests.put(index_path, headers=COMMON_HEADERS)279            endpoint = f"{opensearch_endpoint}/_cat/indices/{index_name}?format=json&pretty"280            req = requests.get(endpoint)281            assert req.status_code == 200282            req_result = json.loads(req.text)283            assert req_result[0]["health"] in ["green", "yellow"]284            assert req_result[0]["index"] in indices285    def test_get_document(self, opensearch_document_path):286        response = requests.get(opensearch_document_path)287        assert (288            "I'm just a simple man" in response.text289        ), f"document not found({response.status_code}): {response.text}"290    def test_search(self, opensearch_endpoint, opensearch_document_path):291        index = "/".join(opensearch_document_path.split("/")[:-2])292        # force the refresh of the index after the document was added, so it can appear in search293        response = requests.post(f"{opensearch_endpoint}/_refresh", headers=COMMON_HEADERS)294        assert response.ok295        search = {"query": {"match": {"last_name": "Fett"}}}296        response = requests.get(f"{index}/_search", data=json.dumps(search), headers=COMMON_HEADERS)297        assert (298            "I'm just a simple man" in response.text299        ), f"search unsuccessful({response.status_code}): {response.text}"300    def test_endpoint_strategy_path(self, monkeypatch, opensearch_create_domain, opensearch_client):301        monkeypatch.setattr(config, "OPENSEARCH_ENDPOINT_STRATEGY", "path")302        domain_name = f"opensearch-domain-{short_uid()}"303        opensearch_create_domain(DomainName=domain_name)304        status = opensearch_client.describe_domain(DomainName=domain_name)["DomainStatus"]305        assert "Endpoint" in status306        endpoint = status["Endpoint"]307        assert endpoint.endswith(f"/{domain_name}")308    def test_endpoint_strategy_port(self, monkeypatch, opensearch_create_domain, opensearch_client):309        monkeypatch.setattr(config, "OPENSEARCH_ENDPOINT_STRATEGY", "port")310        domain_name = f"opensearch-domain-{short_uid()}"311        opensearch_create_domain(DomainName=domain_name)312        status = opensearch_client.describe_domain(DomainName=domain_name)["DomainStatus"]313        assert "Endpoint" in status314        endpoint = status["Endpoint"]315        parts = endpoint.split(":")316        assert parts[0] == "localhost"317        assert int(parts[1]) in range(318            config.EXTERNAL_SERVICE_PORTS_START, config.EXTERNAL_SERVICE_PORTS_END319        )320class TestEdgeProxiedOpensearchCluster:321    def test_route_through_edge(self):322        cluster_id = f"domain-{short_uid()}"323        cluster_url = f"http://localhost:{config.EDGE_PORT}/{cluster_id}"324        cluster = EdgeProxiedOpensearchCluster(cluster_url)325        try:326            cluster.start()327            assert cluster.wait_is_up(240), "gave up waiting for server"328            response = requests.get(cluster_url)329            assert response.ok, f"cluster endpoint returned an error: {response.text}"330            assert response.json()["version"]["number"] == "1.1.0"331            response = requests.get(f"{cluster_url}/_cluster/health")332            assert response.ok, f"cluster health endpoint returned an error: {response.text}"333            assert response.json()["status"] in [334                "red",335                "orange",336                "yellow",337                "green",338            ], "expected cluster state to be in a valid state"339        finally:340            cluster.shutdown()341        assert poll_condition(342            lambda: not cluster.is_up(), timeout=240343        ), "gave up waiting for cluster to shut down"344class TestMultiClusterManager:345    @pytest.mark.skip_offline346    def test_multi_cluster(self, monkeypatch):347        monkeypatch.setattr(config, "OPENSEARCH_ENDPOINT_STRATEGY", "domain")348        monkeypatch.setattr(config, "OPENSEARCH_MULTI_CLUSTER", True)349        manager = MultiClusterManager()350        # create two opensearch domains351        domain_key_0 = DomainKey(352            domain_name=f"domain-{short_uid()}", region="us-east-1", account=TEST_AWS_ACCOUNT_ID353        )354        domain_key_1 = DomainKey(355            domain_name=f"domain-{short_uid()}", region="us-east-1", account=TEST_AWS_ACCOUNT_ID356        )357        cluster_0 = manager.create(domain_key_0.arn, OPENSEARCH_DEFAULT_VERSION)358        cluster_1 = manager.create(domain_key_1.arn, OPENSEARCH_DEFAULT_VERSION)359        try:360            # spawn the two clusters361            assert cluster_0.wait_is_up(240)362            assert cluster_1.wait_is_up(240)363            retry(lambda: try_cluster_health(cluster_0.url), retries=12, sleep=10)364            retry(lambda: try_cluster_health(cluster_1.url), retries=12, sleep=10)365            # create an index in cluster_0, wait for it to appear, make sure it's not in cluster_1366            index_url_0 = cluster_0.url + "/my-index?pretty"367            index_url_1 = cluster_1.url + "/my-index?pretty"368            response = requests.put(index_url_0)369            assert response.ok, f"failed to put index into cluster {cluster_0.url}: {response.text}"370            assert poll_condition(371                lambda: requests.head(index_url_0).ok, timeout=10372            ), "gave up waiting for index"373            assert not requests.head(index_url_1).ok, "index should not appear in second cluster"374        finally:375            call_safe(cluster_0.shutdown)376            call_safe(cluster_1.shutdown)377class TestMultiplexingClusterManager:378    @pytest.mark.skip_offline379    def test_multiplexing_cluster(self, monkeypatch):380        monkeypatch.setattr(config, "OPENSEARCH_ENDPOINT_STRATEGY", "domain")381        monkeypatch.setattr(config, "OPENSEARCH_MULTI_CLUSTER", False)382        manager = MultiplexingClusterManager()383        # create two opensearch domains384        domain_key_0 = DomainKey(385            domain_name=f"domain-{short_uid()}", region="us-east-1", account=TEST_AWS_ACCOUNT_ID386        )387        domain_key_1 = DomainKey(388            domain_name=f"domain-{short_uid()}", region="us-east-1", account=TEST_AWS_ACCOUNT_ID389        )390        cluster_0 = manager.create(domain_key_0.arn, OPENSEARCH_DEFAULT_VERSION)391        cluster_1 = manager.create(domain_key_1.arn, OPENSEARCH_DEFAULT_VERSION)392        try:393            # spawn the two clusters394            assert cluster_0.wait_is_up(240)395            assert cluster_1.wait_is_up(240)396            retry(lambda: try_cluster_health(cluster_0.url), retries=12, sleep=10)397            retry(lambda: try_cluster_health(cluster_1.url), retries=12, sleep=10)398            # create an index in cluster_0, wait for it to appear, make sure it's in cluster_1, too399            index_url_0 = cluster_0.url + "/my-index?pretty"400            index_url_1 = cluster_1.url + "/my-index?pretty"401            response = requests.put(index_url_0)402            assert response.ok, f"failed to put index into cluster {cluster_0.url}: {response.text}"403            assert poll_condition(404                lambda: requests.head(index_url_0).ok, timeout=10405            ), "gave up waiting for index"406            assert requests.head(index_url_1).ok, "index should appear in second cluster"407        finally:408            call_safe(cluster_0.shutdown)409            call_safe(cluster_1.shutdown)410class TestSingletonClusterManager:411    def test_endpoint_strategy_port_singleton_cluster(self, monkeypatch):412        monkeypatch.setattr(config, "OPENSEARCH_ENDPOINT_STRATEGY", "port")413        monkeypatch.setattr(config, "OPENSEARCH_MULTI_CLUSTER", False)414        manager = SingletonClusterManager()415        # create two opensearch domains416        domain_key_0 = DomainKey(417            domain_name=f"domain-{short_uid()}", region="us-east-1", account=TEST_AWS_ACCOUNT_ID418        )419        domain_key_1 = DomainKey(420            domain_name=f"domain-{short_uid()}", region="us-east-1", account=TEST_AWS_ACCOUNT_ID421        )422        cluster_0 = manager.create(domain_key_0.arn, OPENSEARCH_DEFAULT_VERSION)423        cluster_1 = manager.create(domain_key_1.arn, OPENSEARCH_DEFAULT_VERSION)424        # check if the first port url matches the port range425        parts = cluster_0.url.split(":")426        assert parts[0] == "http"427        assert parts[1] == "//localhost"428        assert int(parts[2]) in range(429            config.EXTERNAL_SERVICE_PORTS_START, config.EXTERNAL_SERVICE_PORTS_END430        )431        # check if the second url matches the first one432        assert cluster_0.url == cluster_1.url433        try:434            # wait for the two clusters435            assert cluster_0.wait_is_up(240)436            # make sure cluster_0 (which is equal to cluster_1) is reachable437            retry(lambda: try_cluster_health(cluster_0.url), retries=3, sleep=5)438        finally:439            call_safe(cluster_0.shutdown)440            call_safe(cluster_1.shutdown)441class TestCustomBackendManager:442    def test_custom_backend(self, httpserver, monkeypatch):443        monkeypatch.setattr(config, "OPENSEARCH_ENDPOINT_STRATEGY", "domain")444        monkeypatch.setattr(config, "OPENSEARCH_CUSTOM_BACKEND", httpserver.url_for("/"))445        # create fake elasticsearch cluster446        httpserver.expect_request("/").respond_with_json(447            {448                "name": "om",449                "cluster_name": "opensearch",450                "cluster_uuid": "gREewvVZR0mIswR-8-6VRQ",451                "version": {452                    "number": "7.10.0",453                    "build_flavor": "default",454                    "build_type": "tar",455                    "build_hash": "51e9d6f22758d0374a0f3f5c6e8f3a7997850f96",456                    "build_date": "2020-11-09T21:30:33.964949Z",457                    "build_snapshot": False,458                    "lucene_version": "8.7.0",459                    "minimum_wire_compatibility_version": "6.8.0",460                    "minimum_index_compatibility_version": "6.0.0-beta1",461                },462                "tagline": "You Know, for Search",463            }464        )465        httpserver.expect_request("/_cluster/health").respond_with_json(466            {467                "cluster_name": "opensearch",468                "status": "green",469                "timed_out": False,470                "number_of_nodes": 1,471                "number_of_data_nodes": 1,472                "active_primary_shards": 0,473                "active_shards": 0,474                "relocating_shards": 0,475                "initializing_shards": 0,476                "unassigned_shards": 0,477                "delayed_unassigned_shards": 0,478                "number_of_pending_tasks": 0,479                "number_of_in_flight_fetch": 0,480                "task_max_waiting_in_queue_millis": 0,481                "active_shards_percent_as_number": 100,482            }483        )484        manager = create_cluster_manager()485        assert isinstance(manager, CustomBackendManager)486        domain_key = DomainKey(487            domain_name=f"domain-{short_uid()}", region="us-east-1", account=TEST_AWS_ACCOUNT_ID488        )489        cluster = manager.create(domain_key.arn, OPENSEARCH_DEFAULT_VERSION)490        # check that we're using the domain endpoint strategy491        assert f"{domain_key.domain_name}." in cluster.url492        try:493            assert cluster.wait_is_up(10)494            retry(lambda: try_cluster_health(cluster.url), retries=3, sleep=5)495        finally:496            call_safe(cluster.shutdown)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
