How to use opensearch_document_path method in localstack

Best Python code snippet using localstack_python

test_opensearch.py

Source: test_opensearch.py (GitHub)

copy

Full Screen

import json
import logging
import threading

import botocore.exceptions
import pytest

from localstack import config
from localstack.constants import OPENSEARCH_DEFAULT_VERSION, TEST_AWS_ACCOUNT_ID
from localstack.services.install import install_opensearch
from localstack.services.opensearch.cluster import EdgeProxiedOpensearchCluster
from localstack.services.opensearch.cluster_manager import (
    CustomBackendManager,
    DomainKey,
    MultiClusterManager,
    MultiplexingClusterManager,
    SingletonClusterManager,
    create_cluster_manager,
)
from localstack.utils.common import call_safe, poll_condition, retry
from localstack.utils.common import safe_requests as requests
from localstack.utils.common import short_uid, start_worker_thread

LOG = logging.getLogger(__name__)

# Common headers used when sending requests to OpenSearch
COMMON_HEADERS = {"content-type": "application/json", "Accept-encoding": "identity"}

# Lock and event to ensure that the installation is executed before the tests
INIT_LOCK = threading.Lock()
installed = threading.Event()


def install_async():
    """
    Installs the default opensearch version in a worker thread. Used by conftest.py to make
    sure opensearch is downloaded once the tests arrive here.
    """
    if installed.is_set():
        return

    def run_install(*args):
        with INIT_LOCK:
            # re-check under the lock: another worker may have finished in the meantime
            if installed.is_set():
                return
            LOG.info("installing opensearch default version")
            install_opensearch()
            LOG.info("done installing opensearch default version")
            LOG.info("installing opensearch 1.0")
            install_opensearch("OpenSearch_1.0")
            LOG.info("done installing opensearch 1.0")
            installed.set()

    start_worker_thread(run_install)


@pytest.fixture(autouse=True)
def opensearch():
    """Autouse fixture that blocks each test until the `installed` event is set (max. 5 minutes)."""
    if not installed.is_set():
        install_async()

    assert installed.wait(timeout=5 * 60), "gave up waiting for opensearch to install"
    yield


def try_cluster_health(cluster_url: str) -> None:
    """
    Asserts that both the root endpoint and the ``_cluster/health`` endpoint below ``cluster_url``
    respond successfully, and that the reported status is one of the accepted values.

    :param cluster_url: base URL of the cluster to probe
    :raises AssertionError: if a request fails or the status is not accepted
    """
    response = requests.get(cluster_url)
    assert response.ok, f"cluster endpoint returned an error: {response.text}"

    response = requests.get(f"{cluster_url}/_cluster/health")
    assert response.ok, f"cluster health endpoint returned an error: {response.text}"
    # NOTE(review): "orange" is not a status documented for the cluster health API
    # (green / yellow / red); it is kept here to preserve the original check.
    assert response.json()["status"] in [
        "orange",
        "yellow",
        "green",
    ], "expected cluster state to be in a valid state"


class TestOpensearchProvider:
    """
    Because this test reuses the localstack instance for each test, all tests are performed with
    OPENSEARCH_MULTI_CLUSTER=True, regardless of changes in the config value.
    """

    def test_list_versions(self, opensearch_client):
        """Every expected engine version is contained in the ListVersions response."""
        response = opensearch_client.list_versions()

        assert "Versions" in response
        versions = response["Versions"]

        expected_versions = [
            "OpenSearch_1.1",
            "OpenSearch_1.0",
            "Elasticsearch_7.10",
            "Elasticsearch_7.9",
            "Elasticsearch_7.8",
            "Elasticsearch_7.7",
            "Elasticsearch_7.4",
            "Elasticsearch_7.1",
            "Elasticsearch_6.8",
            "Elasticsearch_6.7",
            "Elasticsearch_6.5",
            "Elasticsearch_6.4",
            "Elasticsearch_6.3",
            "Elasticsearch_6.2",
            "Elasticsearch_6.0",
            "Elasticsearch_5.6",
            "Elasticsearch_5.5",
            "Elasticsearch_5.3",
            "Elasticsearch_5.1",
            "Elasticsearch_5.0",
        ]
        # We iterate over the expected versions to avoid breaking the test if new versions are supported
        for expected_version in expected_versions:
            assert expected_version in versions

    def test_get_compatible_versions(self, opensearch_client):
        """Every expected source/target compatibility entry is contained in the response."""
        response = opensearch_client.get_compatible_versions()

        assert "CompatibleVersions" in response

        compatible_versions = response["CompatibleVersions"]

        assert len(compatible_versions) >= 18
        expected_compatible_versions = [
            {"SourceVersion": "OpenSearch_1.0", "TargetVersions": ["OpenSearch_1.1"]},
            {
                "SourceVersion": "Elasticsearch_7.10",
                "TargetVersions": ["OpenSearch_1.0", "OpenSearch_1.1"],
            },
            {
                "SourceVersion": "Elasticsearch_7.9",
                "TargetVersions": ["Elasticsearch_7.10", "OpenSearch_1.0", "OpenSearch_1.1"],
            },
            {
                "SourceVersion": "Elasticsearch_7.8",
                "TargetVersions": [
                    "Elasticsearch_7.9",
                    "Elasticsearch_7.10",
                    "OpenSearch_1.1",
                    "OpenSearch_1.0",
                ],
            },
            {
                "SourceVersion": "Elasticsearch_7.7",
                "TargetVersions": [
                    "Elasticsearch_7.8",
                    "Elasticsearch_7.9",
                    "Elasticsearch_7.10",
                    "OpenSearch_1.0",
                    "OpenSearch_1.1",
                ],
            },
            {
                "SourceVersion": "Elasticsearch_7.4",
                "TargetVersions": [
                    "Elasticsearch_7.7",
                    "Elasticsearch_7.8",
                    "Elasticsearch_7.9",
                    "Elasticsearch_7.10",
                    "OpenSearch_1.0",
                    "OpenSearch_1.1",
                ],
            },
            {
                "SourceVersion": "Elasticsearch_7.1",
                "TargetVersions": [
                    "Elasticsearch_7.4",
                    "Elasticsearch_7.7",
                    "Elasticsearch_7.8",
                    "Elasticsearch_7.9",
                    "Elasticsearch_7.10",
                    "OpenSearch_1.0",
                    "OpenSearch_1.1",
                ],
            },
            {
                "SourceVersion": "Elasticsearch_6.8",
                "TargetVersions": [
                    "Elasticsearch_7.1",
                    "Elasticsearch_7.4",
                    "Elasticsearch_7.7",
                    "Elasticsearch_7.8",
                    "Elasticsearch_7.9",
                    "Elasticsearch_7.10",
                    "OpenSearch_1.0",
                    "OpenSearch_1.1",
                ],
            },
            {"SourceVersion": "Elasticsearch_6.7", "TargetVersions": ["Elasticsearch_6.8"]},
            {
                "SourceVersion": "Elasticsearch_6.5",
                "TargetVersions": ["Elasticsearch_6.7", "Elasticsearch_6.8"],
            },
            {
                "SourceVersion": "Elasticsearch_6.4",
                "TargetVersions": [
                    "Elasticsearch_6.5",
                    "Elasticsearch_6.7",
                    "Elasticsearch_6.8",
                ],
            },
            {
                "SourceVersion": "Elasticsearch_6.3",
                "TargetVersions": [
                    "Elasticsearch_6.4",
                    "Elasticsearch_6.5",
                    "Elasticsearch_6.7",
                    "Elasticsearch_6.8",
                ],
            },
            {
                "SourceVersion": "Elasticsearch_6.2",
                "TargetVersions": [
                    "Elasticsearch_6.3",
                    "Elasticsearch_6.4",
                    "Elasticsearch_6.5",
                    "Elasticsearch_6.7",
                    "Elasticsearch_6.8",
                ],
            },
            {
                "SourceVersion": "Elasticsearch_6.0",
                "TargetVersions": [
                    "Elasticsearch_6.3",
                    "Elasticsearch_6.4",
                    "Elasticsearch_6.5",
                    "Elasticsearch_6.7",
                    "Elasticsearch_6.8",
                ],
            },
            {
                "SourceVersion": "Elasticsearch_5.6",
                "TargetVersions": [
                    "Elasticsearch_6.3",
                    "Elasticsearch_6.4",
                    "Elasticsearch_6.5",
                    "Elasticsearch_6.7",
                    "Elasticsearch_6.8",
                ],
            },
            {"SourceVersion": "Elasticsearch_5.5", "TargetVersions": ["Elasticsearch_5.6"]},
            {"SourceVersion": "Elasticsearch_5.3", "TargetVersions": ["Elasticsearch_5.6"]},
            {"SourceVersion": "Elasticsearch_5.1", "TargetVersions": ["Elasticsearch_5.6"]},
        ]
        # Iterate over the expected compatible versions to avoid breaking the test if new versions are supported
        for expected_compatible_version in expected_compatible_versions:
            assert expected_compatible_version in compatible_versions

    def test_get_compatible_version_for_domain(self, opensearch_client, opensearch_create_domain):
        """A domain created with OpenSearch_1.0 reports exactly one compatibility entry."""
        opensearch_domain = opensearch_create_domain(EngineVersion="OpenSearch_1.0")
        response = opensearch_client.get_compatible_versions(DomainName=opensearch_domain)
        assert "CompatibleVersions" in response
        compatible_versions = response["CompatibleVersions"]

        assert len(compatible_versions) == 1
        compatibility = compatible_versions[0]
        assert compatibility["SourceVersion"] == "OpenSearch_1.0"
        # Just check if 1.1 is contained (not equality) to avoid breaking the test if new versions are supported
        assert "OpenSearch_1.1" in compatibility["TargetVersions"]

    def test_create_domain(self, opensearch_client, opensearch_wait_for_cluster):
        """A newly created domain shows up in ListDomainNames; the domain is deleted afterwards."""
        domain_name = f"opensearch-domain-{short_uid()}"
        try:
            opensearch_client.create_domain(DomainName=domain_name)
            response = opensearch_client.list_domain_names(EngineType="OpenSearch")
            domain_names = [domain["DomainName"] for domain in response["DomainNames"]]
            assert domain_name in domain_names
            # wait for the cluster
            opensearch_wait_for_cluster(domain_name=domain_name)
        finally:
            opensearch_client.delete_domain(DomainName=domain_name)

    def test_create_existing_domain_causes_exception(
        self, opensearch_client, opensearch_wait_for_cluster
    ):
        """Creating a domain twice under the same name raises a client error."""
        domain_name = f"opensearch-domain-{short_uid()}"
        try:
            opensearch_client.create_domain(DomainName=domain_name)
            with pytest.raises(botocore.exceptions.ClientError) as exc_info:
                opensearch_client.create_domain(DomainName=domain_name)
            # checked after the `with` block: statements placed behind the raising call
            # inside the block would never be executed
            assert exc_info.type.__name__ == "ResourceAlreadyExistsException"
            opensearch_wait_for_cluster(domain_name=domain_name)
        finally:
            opensearch_client.delete_domain(DomainName=domain_name)

    def test_describe_domains(self, opensearch_client, opensearch_domain):
        """DescribeDomains for a single domain name returns exactly that domain."""
        response = opensearch_client.describe_domains(DomainNames=[opensearch_domain])
        assert len(response["DomainStatusList"]) == 1
        assert response["DomainStatusList"][0]["DomainName"] == opensearch_domain

    def test_domain_version(self, opensearch_client, opensearch_domain, opensearch_create_domain):
        """The fixture domain reports the default engine version, an explicit one reports its own."""
        response = opensearch_client.describe_domain(DomainName=opensearch_domain)
        assert "DomainStatus" in response
        status = response["DomainStatus"]
        assert "EngineVersion" in status
        assert status["EngineVersion"] == OPENSEARCH_DEFAULT_VERSION

        domain_name = opensearch_create_domain(EngineVersion="OpenSearch_1.0")
        response = opensearch_client.describe_domain(DomainName=domain_name)
        assert "DomainStatus" in response
        status = response["DomainStatus"]
        assert "EngineVersion" in status
        assert status["EngineVersion"] == "OpenSearch_1.0"

    def test_create_indices(self, opensearch_endpoint):
        """Indices created via PUT are listed by ``_cat/indices`` with green or yellow health."""
        indices = ["index1", "index2"]
        for index_name in indices:
            index_path = f"{opensearch_endpoint}/{index_name}"
            requests.put(index_path, headers=COMMON_HEADERS)
            endpoint = f"{opensearch_endpoint}/_cat/indices/{index_name}?format=json&pretty"
            req = requests.get(endpoint)
            assert req.status_code == 200
            req_result = json.loads(req.text)
            assert req_result[0]["health"] in ["green", "yellow"]
            assert req_result[0]["index"] in indices

    def test_get_document(self, opensearch_document_path):
        """The document behind ``opensearch_document_path`` can be fetched with a plain GET."""
        response = requests.get(opensearch_document_path)
        assert (
            "I'm just a simple man" in response.text
        ), f"document not found({response.status_code}): {response.text}"

    def test_search(self, opensearch_endpoint, opensearch_document_path):
        """The fixture document is found by a match query on its index."""
        # drop the last two path segments of the document URL to get the index URL
        index = "/".join(opensearch_document_path.split("/")[:-2])
        # force the refresh of the index after the document was added, so it can appear in search
        response = requests.post(f"{opensearch_endpoint}/_refresh", headers=COMMON_HEADERS)
        assert response.ok

        search = {"query": {"match": {"last_name": "Fett"}}}
        response = requests.get(f"{index}/_search", data=json.dumps(search), headers=COMMON_HEADERS)

        assert (
            "I'm just a simple man" in response.text
        ), f"search unsuccessful({response.status_code}): {response.text}"

    def test_endpoint_strategy_path(self, monkeypatch, opensearch_create_domain, opensearch_client):
        """With the "path" strategy the domain endpoint ends with ``/<domain name>``."""
        monkeypatch.setattr(config, "OPENSEARCH_ENDPOINT_STRATEGY", "path")

        domain_name = f"opensearch-domain-{short_uid()}"

        opensearch_create_domain(DomainName=domain_name)
        status = opensearch_client.describe_domain(DomainName=domain_name)["DomainStatus"]

        assert "Endpoint" in status
        endpoint = status["Endpoint"]
        assert endpoint.endswith(f"/{domain_name}")

    def test_endpoint_strategy_port(self, monkeypatch, opensearch_create_domain, opensearch_client):
        """With the "port" strategy the endpoint is ``localhost:<port>`` within the external port range."""
        monkeypatch.setattr(config, "OPENSEARCH_ENDPOINT_STRATEGY", "port")

        domain_name = f"opensearch-domain-{short_uid()}"

        opensearch_create_domain(DomainName=domain_name)
        status = opensearch_client.describe_domain(DomainName=domain_name)["DomainStatus"]

        assert "Endpoint" in status
        endpoint = status["Endpoint"]
        parts = endpoint.split(":")
        assert parts[0] == "localhost"
        assert int(parts[1]) in range(
            config.EXTERNAL_SERVICE_PORTS_START, config.EXTERNAL_SERVICE_PORTS_END
        )


class TestEdgeProxiedOpensearchCluster:
    """Tests for a cluster that is reached through the edge port."""

    def test_route_through_edge(self):
        """A started cluster answers under its edge URL and stops answering after shutdown."""
        cluster_id = f"domain-{short_uid()}"
        cluster_url = f"http://localhost:{config.EDGE_PORT}/{cluster_id}"
        cluster = EdgeProxiedOpensearchCluster(cluster_url)

        try:
            cluster.start()
            assert cluster.wait_is_up(240), "gave up waiting for server"

            response = requests.get(cluster_url)
            assert response.ok, f"cluster endpoint returned an error: {response.text}"
            assert response.json()["version"]["number"] == "1.1.0"

            response = requests.get(f"{cluster_url}/_cluster/health")
            assert response.ok, f"cluster health endpoint returned an error: {response.text}"
            assert response.json()["status"] in [
                "red",
                "orange",
                "yellow",
                "green",
            ], "expected cluster state to be in a valid state"
        finally:
            cluster.shutdown()

        assert poll_condition(
            lambda: not cluster.is_up(), timeout=240
        ), "gave up waiting for cluster to shut down"


class TestMultiClusterManager:
    """Tests for ``MultiClusterManager``."""

    @pytest.mark.skip_offline
    def test_multi_cluster(self, monkeypatch):
        """An index created in one cluster must not be visible in the other cluster."""
        monkeypatch.setattr(config, "OPENSEARCH_ENDPOINT_STRATEGY", "domain")
        monkeypatch.setattr(config, "OPENSEARCH_MULTI_CLUSTER", True)

        manager = MultiClusterManager()

        # create two opensearch domains
        domain_key_0 = DomainKey(
            domain_name=f"domain-{short_uid()}", region="us-east-1", account=TEST_AWS_ACCOUNT_ID
        )
        domain_key_1 = DomainKey(
            domain_name=f"domain-{short_uid()}", region="us-east-1", account=TEST_AWS_ACCOUNT_ID
        )
        cluster_0 = manager.create(domain_key_0.arn, OPENSEARCH_DEFAULT_VERSION)
        cluster_1 = manager.create(domain_key_1.arn, OPENSEARCH_DEFAULT_VERSION)

        try:
            # spawn the two clusters
            assert cluster_0.wait_is_up(240)
            assert cluster_1.wait_is_up(240)

            retry(lambda: try_cluster_health(cluster_0.url), retries=12, sleep=10)
            retry(lambda: try_cluster_health(cluster_1.url), retries=12, sleep=10)

            # create an index in cluster_0, wait for it to appear, make sure it's not in cluster_1
            index_url_0 = cluster_0.url + "/my-index?pretty"
            index_url_1 = cluster_1.url + "/my-index?pretty"

            response = requests.put(index_url_0)
            assert response.ok, f"failed to put index into cluster {cluster_0.url}: {response.text}"
            assert poll_condition(
                lambda: requests.head(index_url_0).ok, timeout=10
            ), "gave up waiting for index"

            assert not requests.head(index_url_1).ok, "index should not appear in second cluster"
        finally:
            call_safe(cluster_0.shutdown)
            call_safe(cluster_1.shutdown)


class TestMultiplexingClusterManager:
    """Tests for ``MultiplexingClusterManager``."""

    @pytest.mark.skip_offline
    def test_multiplexing_cluster(self, monkeypatch):
        """An index created through one cluster URL must also be visible through the other."""
        monkeypatch.setattr(config, "OPENSEARCH_ENDPOINT_STRATEGY", "domain")
        monkeypatch.setattr(config, "OPENSEARCH_MULTI_CLUSTER", False)

        manager = MultiplexingClusterManager()

        # create two opensearch domains
        domain_key_0 = DomainKey(
            domain_name=f"domain-{short_uid()}", region="us-east-1", account=TEST_AWS_ACCOUNT_ID
        )
        domain_key_1 = DomainKey(
            domain_name=f"domain-{short_uid()}", region="us-east-1", account=TEST_AWS_ACCOUNT_ID
        )
        cluster_0 = manager.create(domain_key_0.arn, OPENSEARCH_DEFAULT_VERSION)
        cluster_1 = manager.create(domain_key_1.arn, OPENSEARCH_DEFAULT_VERSION)

        try:
            # spawn the two clusters
            assert cluster_0.wait_is_up(240)
            assert cluster_1.wait_is_up(240)

            retry(lambda: try_cluster_health(cluster_0.url), retries=12, sleep=10)
            retry(lambda: try_cluster_health(cluster_1.url), retries=12, sleep=10)

            # create an index in cluster_0, wait for it to appear, make sure it's in cluster_1, too
            index_url_0 = cluster_0.url + "/my-index?pretty"
            index_url_1 = cluster_1.url + "/my-index?pretty"

            response = requests.put(index_url_0)
            assert response.ok, f"failed to put index into cluster {cluster_0.url}: {response.text}"
            assert poll_condition(
                lambda: requests.head(index_url_0).ok, timeout=10
            ), "gave up waiting for index"

            assert requests.head(index_url_1).ok, "index should appear in second cluster"
        finally:
            call_safe(cluster_0.shutdown)
            call_safe(cluster_1.shutdown)


class TestSingletonClusterManager:
    """Tests for ``SingletonClusterManager``."""

    def test_endpoint_strategy_port_singleton_cluster(self, monkeypatch):
        """Both created clusters share one ``http://localhost:<port>`` URL in the external port range."""
        monkeypatch.setattr(config, "OPENSEARCH_ENDPOINT_STRATEGY", "port")
        monkeypatch.setattr(config, "OPENSEARCH_MULTI_CLUSTER", False)

        manager = SingletonClusterManager()

        # create two opensearch domains
        domain_key_0 = DomainKey(
            domain_name=f"domain-{short_uid()}", region="us-east-1", account=TEST_AWS_ACCOUNT_ID
        )
        domain_key_1 = DomainKey(
            domain_name=f"domain-{short_uid()}", region="us-east-1", account=TEST_AWS_ACCOUNT_ID
        )
        cluster_0 = manager.create(domain_key_0.arn, OPENSEARCH_DEFAULT_VERSION)
        cluster_1 = manager.create(domain_key_1.arn, OPENSEARCH_DEFAULT_VERSION)

        # check if the first port url matches the port range
        parts = cluster_0.url.split(":")
        assert parts[0] == "http"
        assert parts[1] == "//localhost"
        assert int(parts[2]) in range(
            config.EXTERNAL_SERVICE_PORTS_START, config.EXTERNAL_SERVICE_PORTS_END
        )

        # check if the second url matches the first one
        assert cluster_0.url == cluster_1.url

        try:
            # wait for the two clusters
            assert cluster_0.wait_is_up(240)
            # make sure cluster_0 (which is equal to cluster_1) is reachable
            retry(lambda: try_cluster_health(cluster_0.url), retries=3, sleep=5)
        finally:
            call_safe(cluster_0.shutdown)
            call_safe(cluster_1.shutdown)


class TestCustomBackendManager:
    """Tests for ``CustomBackendManager``, using a stub HTTP server as the backend."""

    def test_custom_backend(self, httpserver, monkeypatch):
        """With a custom backend configured, the created cluster passes the health probe."""
        monkeypatch.setattr(config, "OPENSEARCH_ENDPOINT_STRATEGY", "domain")
        monkeypatch.setattr(config, "OPENSEARCH_CUSTOM_BACKEND", httpserver.url_for("/"))

        # create fake elasticsearch cluster
        httpserver.expect_request("/").respond_with_json(
            {
                "name": "om",
                "cluster_name": "opensearch",
                "cluster_uuid": "gREewvVZR0mIswR-8-6VRQ",
                "version": {
                    "number": "7.10.0",
                    "build_flavor": "default",
                    "build_type": "tar",
                    "build_hash": "51e9d6f22758d0374a0f3f5c6e8f3a7997850f96",
                    "build_date": "2020-11-09T21:30:33.964949Z",
                    "build_snapshot": False,
                    "lucene_version": "8.7.0",
                    "minimum_wire_compatibility_version": "6.8.0",
                    "minimum_index_compatibility_version": "6.0.0-beta1",
                },
                "tagline": "You Know, for Search",
            }
        )
        httpserver.expect_request("/_cluster/health").respond_with_json(
            {
                "cluster_name": "opensearch",
                "status": "green",
                "timed_out": False,
                "number_of_nodes": 1,
                "number_of_data_nodes": 1,
                "active_primary_shards": 0,
                "active_shards": 0,
                "relocating_shards": 0,
                "initializing_shards": 0,
                "unassigned_shards": 0,
                "delayed_unassigned_shards": 0,
                "number_of_pending_tasks": 0,
                "number_of_in_flight_fetch": 0,
                "task_max_waiting_in_queue_millis": 0,
                "active_shards_percent_as_number": 100,
            }
        )

        manager = create_cluster_manager()
        assert isinstance(manager, CustomBackendManager)

        domain_key = DomainKey(
            domain_name=f"domain-{short_uid()}", region="us-east-1", account=TEST_AWS_ACCOUNT_ID
        )
        cluster = manager.create(domain_key.arn, OPENSEARCH_DEFAULT_VERSION)
        # check that we're using the domain endpoint strategy
        assert f"{domain_key.domain_name}." in cluster.url

        try:
            assert cluster.wait_is_up(10)
            retry(lambda: try_cluster_health(cluster.url), retries=3, sleep=5)
        finally:
            call_safe(cluster.shutdown)

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful