How to use _update_domain_config_request_to_status method in localstack

Best Python code snippet using localstack_python

provider.py

Source: provider.py (GitHub)

copy

Full Screen

...339 region = OpenSearchServiceBackend.get(domain_key.region)340 domain_status = region.opensearch_domains.get(domain_key.domain_name)341 if domain_status is None:342 raise ValidationException("Invalid ARN. Domain not found.")343def _update_domain_config_request_to_status(request: UpdateDomainConfigRequest) -> DomainStatus:344 request: Dict345 request.pop("DryRun", None)346 request.pop("DomainName", None)347 return request348class OpensearchProvider(OpensearchApi):349 def create_domain(350 self,351 context: RequestContext,352 domain_name: DomainName,353 engine_version: VersionString = None,354 cluster_config: ClusterConfig = None,355 ebs_options: EBSOptions = None,356 access_policies: PolicyDocument = None,357 snapshot_options: SnapshotOptions = None,358 vpc_options: VPCOptions = None,359 cognito_options: CognitoOptions = None,360 encryption_at_rest_options: EncryptionAtRestOptions = None,361 node_to_node_encryption_options: NodeToNodeEncryptionOptions = None,362 advanced_options: AdvancedOptions = None,363 log_publishing_options: LogPublishingOptions = None,364 domain_endpoint_options: DomainEndpointOptions = None,365 advanced_security_options: AdvancedSecurityOptionsInput = None,366 tag_list: TagList = None,367 auto_tune_options: AutoTuneOptionsInput = None,368 ) -> CreateDomainResponse:369 region = OpenSearchServiceBackend.get()370 with _domain_mutex:371 if domain_name in region.opensearch_domains:372 raise ResourceAlreadyExistsException(373 f"domain {domain_name} already exists in region {region.name}"374 )375 domain_key = DomainKey(376 domain_name=domain_name,377 region=context.region,378 account=context.account_id,379 )380 # "create" domain data381 region.opensearch_domains[domain_name] = get_domain_status(domain_key)382 # lazy-init the cluster (sets the Endpoint and Processing flag of the domain status)383 # TODO handle additional parameters (cluster config,...)384 create_cluster(domain_key, engine_version, domain_endpoint_options)385 # set the tags386 
self.add_tags(context, domain_key.arn, tag_list)387 # get the (updated) status388 status = get_domain_status(domain_key)389 # record event390 event_publisher.fire_event(391 event_publisher.EVENT_OPENSEARCH_CREATE_DOMAIN,392 payload={"n": event_publisher.get_hash(domain_name)},393 )394 return CreateDomainResponse(DomainStatus=status)395 def delete_domain(396 self, context: RequestContext, domain_name: DomainName397 ) -> DeleteDomainResponse:398 domain_key = DomainKey(399 domain_name=domain_name,400 region=context.region,401 account=context.account_id,402 )403 region = OpenSearchServiceBackend.get(domain_key.region)404 with _domain_mutex:405 if domain_name not in region.opensearch_domains:406 raise ResourceNotFoundException(f"Domain not found: {domain_name}")407 status = get_domain_status(domain_key, deleted=True)408 _remove_cluster(domain_key)409 # record event410 event_publisher.fire_event(411 event_publisher.EVENT_OPENSEARCH_DELETE_DOMAIN,412 payload={"n": event_publisher.get_hash(domain_name)},413 )414 return DeleteDomainResponse(DomainStatus=status)415 def describe_domain(416 self, context: RequestContext, domain_name: DomainName417 ) -> DescribeDomainResponse:418 domain_key = DomainKey(419 domain_name=domain_name,420 region=context.region,421 account=context.account_id,422 )423 region = OpenSearchServiceBackend.get(domain_key.region)424 with _domain_mutex:425 if domain_name not in region.opensearch_domains:426 raise ResourceNotFoundException(f"Domain not found: {domain_name}")427 status = get_domain_status(domain_key)428 return DescribeDomainResponse(DomainStatus=status)429 @handler("UpdateDomainConfig", expand=False)430 def update_domain_config(431 self, context: RequestContext, payload: UpdateDomainConfigRequest432 ) -> UpdateDomainConfigResponse:433 domain_key = DomainKey(434 domain_name=payload["DomainName"],435 region=context.region,436 account=context.account_id,437 )438 region = OpenSearchServiceBackend.get(domain_key.region)439 with _domain_mutex:440 
domain_status = region.opensearch_domains.get(domain_key.domain_name, None)441 if domain_status is None:442 raise ResourceNotFoundException(f"Domain not found: {domain_key.domain_name}")443 status_update: Dict = _update_domain_config_request_to_status(payload)444 domain_status.update(status_update)445 return UpdateDomainConfigResponse(DomainConfig=_status_to_config(domain_status))446 def describe_domains(447 self, context: RequestContext, domain_names: DomainNameList448 ) -> DescribeDomainsResponse:449 status_list = []450 with _domain_mutex:451 for domain_name in domain_names:452 try:453 domain_status = self.describe_domain(context, domain_name)["DomainStatus"]454 status_list.append(domain_status)455 except ResourceNotFoundException:456 # ResourceNotFoundExceptions are ignored, we just look for the next domain.457 # If no domain can be found, the result will just be empty....

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now!

Get 100 automation test minutes FREE!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful