Best Python code snippet using localstack_python
test_opensearch.py
Source:test_opensearch.py  
# NOTE: this excerpt begins in the middle of a test whose ``def`` line (and the
# definition of ``cluster_id``) lies outside the excerpt. The visible tail is
# kept verbatim below, commented out, because it cannot stand alone as valid
# Python without its enclosing function.
#
#         cluster_url = f"http://localhost:{config.EDGE_PORT}/{cluster_id}"
#         cluster = EdgeProxiedOpensearchCluster(cluster_url)
#         try:
#             cluster.start()
#             assert cluster.wait_is_up(240), "gave up waiting for server"
#             response = requests.get(cluster_url)
#             assert response.ok, f"cluster endpoint returned an error: {response.text}"
#             assert response.json()["version"]["number"] == "1.1.0"
#             response = requests.get(f"{cluster_url}/_cluster/health")
#             assert response.ok, f"cluster health endpoint returned an error: {response.text}"
#             assert response.json()["status"] in [
#                 "red",
#                 "orange",
#                 "yellow",
#                 "green",
#             ], "expected cluster state to be in a valid state"
#         finally:
#             cluster.shutdown()
#         assert poll_condition(
#             lambda: not cluster.is_up(), timeout=240
#         ), "gave up waiting for cluster to shut down"


def _new_domain_key() -> "DomainKey":
    """Return a ``DomainKey`` with a random domain name in us-east-1 for the test account."""
    return DomainKey(
        domain_name=f"domain-{short_uid()}", region="us-east-1", account=TEST_AWS_ACCOUNT_ID
    )


def _start_clusters_and_put_index(cluster_0, cluster_1) -> str:
    """Wait for both clusters, then create ``my-index`` through ``cluster_0``.

    Both clusters must report up within 240 seconds and pass
    ``try_cluster_health`` (retried 12 times, 10 seconds apart). The index is
    then PUT against ``cluster_0`` and polled for up to 10 seconds until a
    HEAD request on it succeeds.

    :param cluster_0: cluster the index is created through
    :param cluster_1: second cluster; only waited for and health-checked here
    :return: the URL of ``my-index`` as addressed through ``cluster_1``, so the
        caller can assert whether the index is visible there
    """
    assert cluster_0.wait_is_up(240)
    assert cluster_1.wait_is_up(240)
    retry(lambda: try_cluster_health(cluster_0.url), retries=12, sleep=10)
    retry(lambda: try_cluster_health(cluster_1.url), retries=12, sleep=10)

    index_url_0 = cluster_0.url + "/my-index?pretty"
    index_url_1 = cluster_1.url + "/my-index?pretty"

    response = requests.put(index_url_0)
    assert response.ok, f"failed to put index into cluster {cluster_0.url}: {response.text}"
    assert poll_condition(
        lambda: requests.head(index_url_0).ok, timeout=10
    ), "gave up waiting for index"
    return index_url_1


class TestMultiClusterManager:
    @pytest.mark.skip_offline
    def test_multi_cluster(self, monkeypatch):
        """An index created in one cluster of a ``MultiClusterManager`` must not show up in the other."""
        monkeypatch.setattr(config, "OPENSEARCH_ENDPOINT_STRATEGY", "domain")
        monkeypatch.setattr(config, "OPENSEARCH_MULTI_CLUSTER", True)
        manager = MultiClusterManager()

        # create two opensearch domains
        cluster_0 = manager.create(_new_domain_key().arn, OPENSEARCH_DEFAULT_VERSION)
        cluster_1 = manager.create(_new_domain_key().arn, OPENSEARCH_DEFAULT_VERSION)

        try:
            # spawn the two clusters and create an index in cluster_0
            index_url_1 = _start_clusters_and_put_index(cluster_0, cluster_1)
            # make sure the index is not in cluster_1
            assert not requests.head(index_url_1).ok, "index should not appear in second cluster"
        finally:
            call_safe(cluster_0.shutdown)
            call_safe(cluster_1.shutdown)


class TestMultiplexingClusterManager:
    @pytest.mark.skip_offline
    def test_multiplexing_cluster(self, monkeypatch):
        """An index created through one cluster of a ``MultiplexingClusterManager`` is visible through the other."""
        monkeypatch.setattr(config, "OPENSEARCH_ENDPOINT_STRATEGY", "domain")
        monkeypatch.setattr(config, "OPENSEARCH_MULTI_CLUSTER", False)
        manager = MultiplexingClusterManager()

        # create two opensearch domains
        cluster_0 = manager.create(_new_domain_key().arn, OPENSEARCH_DEFAULT_VERSION)
        cluster_1 = manager.create(_new_domain_key().arn, OPENSEARCH_DEFAULT_VERSION)

        try:
            # spawn the two clusters and create an index in cluster_0
            index_url_1 = _start_clusters_and_put_index(cluster_0, cluster_1)
            # make sure the index is in cluster_1, too
            assert requests.head(index_url_1).ok, "index should appear in second cluster"
        finally:
            call_safe(cluster_0.shutdown)
            call_safe(cluster_1.shutdown)


class TestSingletonClusterManager:
    def test_endpoint_strategy_port_singleton_cluster(self, monkeypatch):
        """With the ``port`` strategy, a ``SingletonClusterManager`` hands out the same URL for every domain."""
        monkeypatch.setattr(config, "OPENSEARCH_ENDPOINT_STRATEGY", "port")
        monkeypatch.setattr(config, "OPENSEARCH_MULTI_CLUSTER", False)
        manager = SingletonClusterManager()

        # create two opensearch domains
        cluster_0 = manager.create(_new_domain_key().arn, OPENSEARCH_DEFAULT_VERSION)
        cluster_1 = manager.create(_new_domain_key().arn, OPENSEARCH_DEFAULT_VERSION)

        # The URL checks run inside the try block so that a failing assertion
        # still reaches the shutdown calls instead of leaking the clusters.
        try:
            # check if the first port url matches the port range
            parts = cluster_0.url.split(":")
            assert parts[0] == "http"
            assert parts[1] == "//localhost"
            assert int(parts[2]) in range(
                config.EXTERNAL_SERVICE_PORTS_START, config.EXTERNAL_SERVICE_PORTS_END
            )

            # check if the second url matches the first one
            assert cluster_0.url == cluster_1.url

            # wait for the two clusters
            assert cluster_0.wait_is_up(240)
            # make sure cluster_0 (which is equal to cluster_1) is reachable
            retry(lambda: try_cluster_health(cluster_0.url), retries=3, sleep=5)
        finally:
            call_safe(cluster_0.shutdown)
            call_safe(cluster_1.shutdown)


class TestCustomBackendManager:
    def test_custom_backend(self, httpserver, monkeypatch):
        """With ``OPENSEARCH_CUSTOM_BACKEND`` set, ``create_cluster_manager`` yields a ``CustomBackendManager``."""
        monkeypatch.setattr(config, "OPENSEARCH_ENDPOINT_STRATEGY", "domain")
        monkeypatch.setattr(config, "OPENSEARCH_CUSTOM_BACKEND", httpserver.url_for("/"))

        # create fake elasticsearch cluster
        httpserver.expect_request("/").respond_with_json(
            {
                "name": "om",
                "cluster_name": "opensearch",
                "cluster_uuid": "gREewvVZR0mIswR-8-6VRQ",
                "version": {
                    "number": "7.10.0",
                    "build_flavor": "default",
                    "build_type": "tar",
                    "build_hash": "51e9d6f22758d0374a0f3f5c6e8f3a7997850f96",
                    "build_date": "2020-11-09T21:30:33.964949Z",
                    "build_snapshot": False,
                    "lucene_version": "8.7.0",
                    "minimum_wire_compatibility_version": "6.8.0",
                    "minimum_index_compatibility_version": "6.0.0-beta1",
                },
                "tagline": "You Know, for Search",
            }
        )
        httpserver.expect_request("/_cluster/health").respond_with_json(
            {
                "cluster_name": "opensearch",
                "status": "green",
                "timed_out": False,
                "number_of_nodes": 1,
                "number_of_data_nodes": 1,
                "active_primary_shards": 0,
                "active_shards": 0,
                "relocating_shards": 0,
                "initializing_shards": 0,
                "unassigned_shards": 0,
                "delayed_unassigned_shards": 0,
                "number_of_pending_tasks": 0,
                "number_of_in_flight_fetch": 0,
                "task_max_waiting_in_queue_millis": 0,
                "active_shards_percent_as_number": 100,
            }
        )

        manager = create_cluster_manager()
        assert isinstance(manager, CustomBackendManager)

        domain_key = _new_domain_key()
        cluster = manager.create(domain_key.arn, OPENSEARCH_DEFAULT_VERSION)

        # check that we're using the domain endpoint strategy
        assert f"{domain_key.domain_name}." in cluster.url

        try:
            assert cluster.wait_is_up(10)
            retry(lambda: try_cluster_health(cluster.url), retries=3, sleep=5)
        finally:
            call_safe(cluster.shutdown)


# ...
# test_localstack_container.py
Source:test_localstack_container.py  
...10        server.container.ports.add(config.EDGE_PORT)11        assert not server.is_up()12        try:13            server.start()14            assert server.wait_is_up(60)15            response = requests.get("http://localhost:4566/health")16            assert response.ok, "expected health check to return OK: %s" % response.text17        finally:18            server.shutdown()19        server.join(30)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!
