How to use get_service_container method in localstack

Best Python code snippet using localstack_python

plugins.py

Source:plugins.py Github

copy

Full Screen

...194 def __init__(self) -> None:195 super().__init__()196 self._services = {}197 self._mutex = threading.RLock()198 def get_service_container(self, name: str) -> Optional[ServiceContainer]:199 return self._services.get(name)200 def get_service(self, name: str) -> Optional[Service]:201 container = self.get_service_container(name)202 return container.service if container else None203 def add_service(self, service: Service) -> bool:204 state = ServiceState.AVAILABLE if service.is_enabled() else ServiceState.DISABLED205 self._services[service.name()] = ServiceContainer(service, state)206 return True207 def list_available(self) -> List[str]:208 return list(self._services.keys())209 def exists(self, name: str) -> bool:210 return name in self._services211 def is_running(self, name: str) -> bool:212 return self.get_state(name) == ServiceState.RUNNING213 def check(self, name: str) -> bool:214 if self.get_state(name) in [ServiceState.RUNNING, ServiceState.ERROR]:215 return self.get_service_container(name).check()216 def check_all(self):217 return any(self.check(service_name) for service_name in self.list_available())218 def get_state(self, name: str) -> Optional[ServiceState]:219 container = self.get_service_container(name)220 return container.state if container else None221 def get_states(self) -> Dict[str, ServiceState]:222 return {name: self.get_state(name) for name in self.list_available()}223 @log_duration()224 def require(self, name: str) -> Service:225 """226 High level function that always returns a running service, or raises an error. If the service is in a state227 that it could be transitioned into a running state, then invoking this function will attempt that transition,228 e.g., by starting the service if it is available.229 """230 container = self.get_service_container(name)231 if not container:232 raise ValueError("no such service %s" % name)233 if container.state == ServiceState.STARTING:234 if not poll_condition(lambda: container.state != ServiceState.STARTING, timeout=30):235 raise TimeoutError("gave up waiting for service %s to start" % name)236 if container.state == ServiceState.STOPPING:237 if not poll_condition(lambda: container.state == ServiceState.STOPPED, timeout=30):238 raise TimeoutError("gave up waiting for service %s to stop" % name)239 with container.lock:240 if container.state == ServiceState.DISABLED:241 raise ServiceDisabled("service %s is disabled" % name)242 if container.state == ServiceState.RUNNING:243 return container.service244 if container.state == ServiceState.ERROR:245 # raise any capture error246 raise container.errors[-1]247 if container.state == ServiceState.AVAILABLE or container.state == ServiceState.STOPPED:248 if container.start():249 return container.service250 else:251 raise container.errors[-1]252 raise ServiceStateException(253 "service %s is not ready (%s) and could not be started" % (name, container.state)254 )255 # legacy map compatibility256 def items(self):257 return {258 container.service.name(): container.service for container in self._services.values()259 }.items()260 def keys(self):261 return self._services.keys()262 def values(self):263 return [container.service for container in self._services.values()]264 def get(self, key):265 return self.get_service(key)266 def __iter__(self):267 return self._services268class ServicePlugin(Plugin):269 service: Service270 api: str271 @abc.abstractmethod272 def create_service(self) -> Service:273 raise NotImplementedError274 def load(self):275 self.service = self.create_service()276 return self.service277class ServicePluginAdapter(ServicePlugin):278 def __init__(279 self,280 api: str,281 create_service: Callable[[], Service],282 should_load: Callable[[], bool] = None,283 ) -> None:284 super().__init__()285 self.api = api286 self._create_service = create_service287 self._should_load = should_load288 def should_load(self) -> bool:289 if self._should_load:290 return self._should_load()291 return True292 def create_service(self) -> Service:293 return self._create_service()294def aws_provider(api: str = None, name="default", should_load: Callable[[], bool] = None):295 """296 Decorator for marking methods that create a Service instance as a ServicePlugin. Methods marked with this297 decorator are discoverable as a PluginSpec within the namespace "localstack.aws.provider", with the name298 "<api>:<name>". If api is not explicitly specified, then the method name is used as api name.299 """300 def wrapper(fn):301 # sugar for being able to name the function like the api302 _api = api or fn.__name__303 # this causes the plugin framework into pointing the entrypoint to the original function rather than the304 # nested factory function305 @functools.wraps(fn)306 def factory() -> ServicePluginAdapter:307 return ServicePluginAdapter(api=_api, should_load=should_load, create_service=fn)308 return PluginSpec(PLUGIN_NAMESPACE, f"{_api}:{name}", factory=factory)309 return wrapper310class ServicePluginErrorCollector(PluginLifecycleListener):311 """312 A PluginLifecycleListener that collects errors related to service plugins.313 """314 errors: Dict[Tuple[str, str], Exception] # keys are: (api, provider)315 def __init__(self, errors: Dict[str, Exception] = None) -> None:316 super().__init__()317 self.errors = errors or {}318 def get_key(self, plugin_name) -> Tuple[str, str]:319 # the convention is <api>:<provider>, currently we don't really expose the provider320 # TODO: faulty plugin names would break this321 return tuple(plugin_name.split(":", maxsplit=1))322 def on_resolve_exception(self, namespace: str, entrypoint, exception: Exception):323 self.errors[self.get_key(entrypoint.name)] = exception324 def on_init_exception(self, plugin_spec: PluginSpec, exception: Exception):325 self.errors[self.get_key(plugin_spec.name)] = exception326 def on_load_exception(self, plugin_spec: PluginSpec, plugin: Plugin, exception: Exception):327 self.errors[self.get_key(plugin_spec.name)] = exception328 def has_errors(self, api: str, provider: str = None) -> bool:329 for e_api, e_provider in self.errors.keys():330 if api == e_api:331 if not provider:332 return True333 else:334 return e_provider == provider335 return False336class ServicePluginManager(ServiceManager):337 plugin_manager: PluginManager[ServicePlugin]338 plugin_errors: ServicePluginErrorCollector339 def __init__(340 self,341 plugin_manager: PluginManager[ServicePlugin] = None,342 provider_config: ServiceProviderConfig = None,343 ) -> None:344 super().__init__()345 self.plugin_errors = ServicePluginErrorCollector()346 self.plugin_manager = plugin_manager or PluginManager(347 PLUGIN_NAMESPACE, listener=self.plugin_errors348 )349 self._api_provider_specs = None350 self.provider_config = provider_config or config.SERVICE_PROVIDER_CONFIG351 def get_active_provider(self, service: str) -> str:352 return self.provider_config.get_provider(service)353 # TODO make the abstraction clearer, to provide better information if service is available versus discoverable354 # especially important when considering pro services355 def list_available(self) -> List[str]:356 """357 List all available services, which have an available, configured provider358 :return: List of service names359 """360 return [361 service362 for service, providers in self.api_provider_specs.items()363 if self.get_active_provider(service) in providers364 ]365 def exists(self, name: str) -> bool:366 return name in self.list_available()367 def get_state(self, name: str) -> Optional[ServiceState]:368 if name in self._services:369 # ServiceContainer exists, which means the plugin has been loaded370 return super().get_state(name)371 if not self.exists(name):372 # there's definitely no service with this name373 return None374 # if a PluginSpec exists, then we can get the container and check whether there was an error loading the plugin375 provider = self.get_active_provider(name)376 if self.plugin_errors.has_errors(name, provider):377 return ServiceState.ERROR378 return ServiceState.AVAILABLE379 def get_service_container(self, name: str) -> Optional[ServiceContainer]:380 container = super().get_service_container(name)381 if container:382 return container383 if not self.exists(name):384 return None385 # this is where we start lazy loading. we now know the PluginSpec for the API exists,386 # but the ServiceContainer has not been created387 plugin = self._load_service_plugin(name)388 if not plugin or not plugin.service:389 return None390 with self._mutex:391 if plugin.service.name() not in self._services:392 super().add_service(plugin.service)393 return super().get_service_container(name)394 @property395 def api_provider_specs(self) -> Dict[str, List[str]]:396 """397 Returns all provider names within the service plugin namespace and parses their name according to the convention,398 that is "<api>:<provider>". The result is a dictionary that maps api => List[str (name of a provider)].399 """400 if self._api_provider_specs is not None:401 return self._api_provider_specs402 with self._mutex:403 if self._api_provider_specs is None:404 self._api_provider_specs = self._resolve_api_provider_specs()405 return self._api_provider_specs406 @log_duration()407 def _load_service_plugin(self, name: str) -> Optional[ServicePlugin]:408 providers = self.api_provider_specs.get(name)409 if not providers:410 # no providers for this api411 return None412 preferred_provider = self.get_active_provider(name)413 if preferred_provider in providers:414 provider = preferred_provider415 else:416 LOG.warning(417 "Configured provider (%s) does not exist for service (%s). Available options are: %s",418 preferred_provider,419 name,420 providers,421 )422 return None423 plugin_name = f"{name}:{provider}"424 plugin = self.plugin_manager.load(plugin_name)425 plugin.name = plugin_name426 return plugin427 @log_duration()428 def _resolve_api_provider_specs(self) -> Dict[str, List[str]]:429 result = defaultdict(list)430 for spec in self.plugin_manager.list_plugin_specs():431 api, provider = spec.name.split(432 ":"433 ) # TODO: error handling, faulty plugins could break the runtime434 result[api].append(provider)435 return result436 def apis_with_provider(self, provider: str) -> List[str]:437 """438 Lists all apis where a given provider exists for.439 :param provider: Name of the provider440 :return: List of apis the given provider provides441 """442 apis = []443 for api, providers in self.api_provider_specs.items():444 if provider in providers:445 apis.append(api)446 return apis447 def stop_services(self, services: List[str] = None):448 """449 Stops services for this service manager, if they are currently active.450 Will not stop services not already started or in and error state.451 :param services: Service names to stop. If not provided, all services for this manager will be stopped.452 """453 for service_name in services:454 if self.get_state(service_name) in [ServiceState.STARTING, ServiceState.RUNNING]:455 service_container = self.get_service_container(service_name)456 service_container.stop()457 def stop_all_services(self) -> None:458 """459 Stops all services for this service manager, if they are currently active.460 Will not stop services not already started or in and error state.461 """462 services = self.list_available()463 self.stop_services(services)464# map of service plugins, mapping from service name to plugin details465SERVICE_PLUGINS: ServicePluginManager = ServicePluginManager()466# -----------------------------467# INFRASTRUCTURE HEALTH CHECKS468# -----------------------------469def wait_for_infra_shutdown():...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful