How to use _get_port_from_url method in localstack

Best Python code snippet using localstack_python

cluster_manager.py

Source:cluster_manager.py Github

copy

Full Screen

...176 return super().health() and self.cluster.health()177 def do_shutdown(self):178 super(FakeEndpointProxyServer, self).do_shutdown()179 self.cluster.shutdown()180def _get_port_from_url(url: str) -> int:181 return int(url.split(":")[2])182class MultiplexingClusterManager(ClusterManager):183 """184 Multiplexes multiple endpoints to a single backend cluster (not managed by LocalStack).185 Using this, we lie to the client about the opensearch domain version.186 It only works with a single endpoint.187 Assumes the config:188 - OPENSEARCH_MULTI_CLUSTER = False189 - OPENSEARCH_ENDPOINT_STRATEGY = domain / path190 """191 cluster: Optional[Server]192 endpoints: Dict[str, ClusterEndpoint]193 def __init__(self) -> None:194 super().__init__()195 self.cluster = None196 self.endpoints = {}197 self.mutex = threading.RLock()198 def _create_cluster(self, arn, url, version) -> Server:199 with self.mutex:200 if not self.cluster:201 engine_type = versions.get_engine_type(version)202 # startup routine for the singleton cluster instance203 if engine_type == EngineType.OpenSearch:204 self.cluster = OpensearchCluster(205 port=get_free_tcp_port(), directories=resolve_directories(version, arn)206 )207 else:208 self.cluster = ElasticsearchCluster(209 port=get_free_tcp_port(), directories=resolve_directories(version, arn)210 )211 def _start_async(*_):212 LOG.info("starting %s on %s", type(self.cluster), self.cluster.url)213 self.cluster.start() # start may block during install214 start_thread(_start_async)215 cluster_endpoint = ClusterEndpoint(self.cluster, EndpointProxy(url, self.cluster.url))216 self.clusters[arn] = cluster_endpoint217 return cluster_endpoint218 def remove(self, arn: str):219 super().remove(arn) # removes the fake server220 if not self.endpoints:221 # if there are no endpoints left, remove the cluster222 with self.mutex:223 if not self.cluster:224 return225 LOG.debug("shutting down multiplexed cluster for %s: %s", arn, self.cluster.url)226 self.cluster.shutdown()227 
self.cluster = None228class MultiClusterManager(ClusterManager):229 """230 Manages one cluster and endpoint per domain.231 """232 def _create_cluster(self, arn, url, version) -> Server:233 engine_type = versions.get_engine_type(version)234 if config.OPENSEARCH_ENDPOINT_STRATEGY != "port":235 if engine_type == EngineType.OpenSearch:236 return EdgeProxiedOpensearchCluster(237 url, version, directories=resolve_directories(version, arn)238 )239 else:240 return EdgeProxiedElasticsearchCluster(241 url, version, directories=resolve_directories(version, arn)242 )243 else:244 port = _get_port_from_url(url)245 if engine_type == EngineType.OpenSearch:246 return OpensearchCluster(247 port=port,248 host=LOCALHOST,249 version=version,250 directories=resolve_directories(version, arn),251 )252 else:253 return ElasticsearchCluster(254 port=port,255 host=LOCALHOST,256 version=version,257 directories=resolve_directories(version, arn),258 )259class SingletonClusterManager(ClusterManager):260 """261 Manages a single cluster and always returns that cluster. Using this, we lie to the client about the262 elasticsearch domain version. The first call to create_domain with a specific version will create the cluster263 with that version. Subsequent calls will believe they created a cluster with the version they specified. It keeps264 the cluster running until the last domain was removed. 
It only works with a single endpoint.265 Assumes the config:266 - ES_ENDPOINT_STRATEGY == "port"267 - ES_MULTI_CLUSTER == False268 """269 cluster: Optional[Server]270 def __init__(self) -> None:271 super().__init__()272 self.server = None273 self.mutex = threading.RLock()274 self.cluster = None275 def create(276 self,277 arn: str,278 version: str,279 endpoint_options: Optional[DomainEndpointOptions] = None,280 preferred_port: int = None,281 ) -> Server:282 with self.mutex:283 return super().create(arn, version, endpoint_options, preferred_port)284 def _create_cluster(self, arn, url, version) -> Server:285 if not self.cluster:286 port = _get_port_from_url(url)287 engine_type = versions.get_engine_type(version)288 if engine_type == EngineType.OpenSearch:289 self.cluster = OpensearchCluster(290 port=port,291 host=LOCALHOST,292 version=version,293 directories=resolve_directories(version, arn),294 )295 else:296 self.cluster = ElasticsearchCluster(297 port=port,298 host=LOCALHOST,299 version=version,300 directories=resolve_directories(version, arn),...

Full Screen

Full Screen

runner.py

Source:runner.py Github

copy

Full Screen

...66 if url is not None:67 if port is not None:68 raise RmockParamsError("url must be only param")69 70 port = self._get_port_from_url(url)71 72 # random port support73 port = find_port(port)74 75 self.port = port76 self.run_params.update(port=port)77 78 def _get_port_from_url(self, url):79 hosts = url.split(',')80 if len(hosts) > 1:81 raise RmockParamsError("only one host mode supported for memcache")82 83 host, port = hosts[0].split(':')84 85 if host not in ("localhost", "127.0.0.1"):86 raise RmockParamsError("invalid url: %s; host must be localhost" % url)87 88 try:89 return int(port)90 except ValueError:91 raise RmockParamsError("port must be integer")92 ...

Full Screen

Full Screen

ngrok.py

Source:ngrok.py Github

copy

Full Screen

...11}12HOST = 'localhost'13def _get_addr_from_url(url):14 return url.split("//")[-1].split(":")[0]15def _get_port_from_url(url):16 return url.split("//")[-1].split(":")[1]17class TunnelAlredyOpenError(Exception):18 pass19class Tunnel:20 @staticmethod21 def _is_already_open(tunnel_type):22 PORT = str(_type_description[tunnel_type]['port'])23 if _type_description[tunnel_type]['proto'] == 'tcp':24 this_tunnel_config = f'{HOST}:{PORT}'25 elif _type_description[tunnel_type]['proto'] == 'http':26 this_tunnel_config = f'http://{HOST}:{PORT}'27 else:28 raise ValueError('Unexpected tunnel type')29 tunnels = ngrok.get_tunnels()30 if tunnels is None:31 return False32 for tunnel in tunnels:33 if tunnel.config['addr'] == this_tunnel_config:34 return True35 return False36 def __init__(self, tunnel_type):37 ngrok.get_tunnels()38 if Tunnel._is_already_open(tunnel_type):39 raise TunnelAlredyOpenError(40 "Such tunnel is already open"41 )42 self.ngrok_tunnel = ngrok.connect(43 _type_description[tunnel_type]['port'],44 _type_description[tunnel_type]['proto'],45 )46 self.address = _get_addr_from_url(self.ngrok_tunnel.public_url)47 if tunnel_type == 'ssh':48 self.port = _get_port_from_url(self.ngrok_tunnel.public_url)49 def is_still_open(self):50 tunnels = ngrok.get_tunnels()51 for tunnel in tunnels:52 if tunnel.public_url == self.ngrok_tunnel.public_url:53 return True54 return False55 def close(self):56 ngrok.disconnect(self.ngrok_tunnel.public_url)57 if self.ngrok_tunnel.proto == 'http':58 url = _get_addr_from_url(self.ngrok_tunnel.public_url)59 https_pub_url = 'https://' + url60 ngrok.disconnect(https_pub_url)61def authenticate(token):62 ngrok.set_auth_token(token)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful