Best Python code snippet using localstack_python
cluster_manager.py
Source:cluster_manager.py  
...201                engine_type = versions.get_engine_type(version)202                # startup routine for the singleton cluster instance203                if engine_type == EngineType.OpenSearch:204                    self.cluster = OpensearchCluster(205                        port=get_free_tcp_port(), directories=resolve_directories(version, arn)206                    )207                else:208                    self.cluster = ElasticsearchCluster(209                        port=get_free_tcp_port(), directories=resolve_directories(version, arn)210                    )211                def _start_async(*_):212                    LOG.info("starting %s on %s", type(self.cluster), self.cluster.url)213                    self.cluster.start()  # start may block during install214                start_thread(_start_async)215            cluster_endpoint = ClusterEndpoint(self.cluster, EndpointProxy(url, self.cluster.url))216            self.clusters[arn] = cluster_endpoint217            return cluster_endpoint218    def remove(self, arn: str):219        super().remove(arn)  # removes the fake server220        if not self.endpoints:221            # if there are no endpoints left, remove the cluster222            with self.mutex:223                if not self.cluster:224                    return225                LOG.debug("shutting down multiplexed cluster for %s: %s", arn, self.cluster.url)226                self.cluster.shutdown()227                self.cluster = None228class MultiClusterManager(ClusterManager):229    """230    Manages one cluster and endpoint per domain.231    """232    def _create_cluster(self, arn, url, version) -> Server:233        engine_type = versions.get_engine_type(version)234        if config.OPENSEARCH_ENDPOINT_STRATEGY != "port":235            if engine_type == EngineType.OpenSearch:236                return EdgeProxiedOpensearchCluster(237                    url, version, directories=resolve_directories(version, arn)238               
 )239            else:240                return EdgeProxiedElasticsearchCluster(241                    url, version, directories=resolve_directories(version, arn)242                )243        else:244            port = _get_port_from_url(url)245            if engine_type == EngineType.OpenSearch:246                return OpensearchCluster(247                    port=port,248                    host=LOCALHOST,249                    version=version,250                    directories=resolve_directories(version, arn),251                )252            else:253                return ElasticsearchCluster(254                    port=port,255                    host=LOCALHOST,256                    version=version,257                    directories=resolve_directories(version, arn),258                )259class SingletonClusterManager(ClusterManager):260    """261    Manages a single cluster and always returns that cluster. Using this, we lie to the client about the262    elasticsearch domain version. The first call to create_domain with a specific version will create the cluster263    with that version. Subsequent calls will believe they created a cluster with the version they specified. It keeps264    the cluster running until the last domain was removed. 
It only works with a single endpoint.265    Assumes the config:266    - ES_ENDPOINT_STRATEGY == "port"267    - ES_MULTI_CLUSTER == False268    """269    cluster: Optional[Server]270    def __init__(self) -> None:271        super().__init__()272        self.server = None273        self.mutex = threading.RLock()274        self.cluster = None275    def create(276        self,277        arn: str,278        version: str,279        endpoint_options: Optional[DomainEndpointOptions] = None,280        preferred_port: int = None,281    ) -> Server:282        with self.mutex:283            return super().create(arn, version, endpoint_options, preferred_port)284    def _create_cluster(self, arn, url, version) -> Server:285        if not self.cluster:286            port = _get_port_from_url(url)287            engine_type = versions.get_engine_type(version)288            if engine_type == EngineType.OpenSearch:289                self.cluster = OpensearchCluster(290                    port=port,291                    host=LOCALHOST,292                    version=version,293                    directories=resolve_directories(version, arn),294                )295            else:296                self.cluster = ElasticsearchCluster(297                    port=port,298                    host=LOCALHOST,299                    version=version,300                    directories=resolve_directories(version, arn),301                )302        return self.cluster303    def remove(self, arn: str):304        with self.mutex:305            try:306                cluster = self.clusters.pop(arn)307            except KeyError:308                return309            LOG.debug("removing cluster for %s, %s remaining", arn, len(self.clusters))310            if not self.clusters:311                # shutdown the cluster if it is312                cluster.shutdown()313                self.cluster = None314class CustomBackendManager(ClusterManager):...Learn to execute automation testing from 
scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
