How to use the update_elasticsearch_domain_config method in LocalStack

Best Python code snippet using localstack_python

test_es_utils.py

Source:test_es_utils.py Github

copy

Full Screen

1import pytest2from dcicutils.es_utils import (3 create_es_client, execute_lucene_query_on_es, get_bulk_uuids_embedded, ElasticSearchServiceClient,4)5from dcicutils.ff_utils import get_es_metadata6from dcicutils.misc_utils import ignored7from dcicutils.qa_utils import timed8from unittest import mock9class TestElasticSearchServiceClient:10 @staticmethod11 def mock_update_es_success(DomainName, ElasticsearchClusterConfig): # noQA - mixed-case params chosen by AWS12 ignored(DomainName, ElasticsearchClusterConfig)13 return {14 'ResponseMetadata': {15 'HTTPStatusCode': 20016 }17 }18 @staticmethod19 def mock_update_es_fail(DomainName, ElasticsearchClusterConfig): # noQA - mixed-case params chosen by AWS20 return {21 'ResponseMetadata': {22 'HTTPStatusCode': 40323 }24 }25 @staticmethod26 def mock_update_es_bad_response(DomainName, ElasticsearchClusterConfig): # noQA - mixed-case params chosen by AWS27 return {28 'something_else': {29 'blah': 40330 }31 }32 @staticmethod33 def mock_update_es_unknown(DomainName, ElasticsearchClusterConfig): # noQA - mixed-case params chosen by AWS34 ignored(DomainName, ElasticsearchClusterConfig)35 raise Exception('Literally anything')36 def test_elasticsearch_service_client_resize_accepted(self):37 """ Tests handling of a success response. """38 client = ElasticSearchServiceClient()39 with mock.patch.object(client.client, 'update_elasticsearch_domain_config',40 self.mock_update_es_success):41 success = client.resize_elasticsearch_cluster(42 domain_name='fourfront-newtest',43 master_node_type='t2.medium.elasticsearch',44 master_node_count=3,45 data_node_type='c5.large.elasticsearch',46 data_node_count=247 )48 assert success49 def test_elasticsearch_service_client_resize_fail(self):50 """ Tests handling of a 403 response. 
"""51 client = ElasticSearchServiceClient()52 with mock.patch.object(client.client, 'update_elasticsearch_domain_config',53 self.mock_update_es_fail):54 success = client.resize_elasticsearch_cluster(55 domain_name='fourfront-newtest',56 master_node_type='t2.medium.elasticsearch',57 master_node_count=3,58 data_node_type='c5.large.elasticsearch',59 data_node_count=260 )61 assert not success62 def test_elasticsearch_service_client_resize_bad_response(self):63 """ Tests handling of a badly formatted response. """64 client = ElasticSearchServiceClient()65 with mock.patch.object(client.client, 'update_elasticsearch_domain_config',66 self.mock_update_es_bad_response):67 success = client.resize_elasticsearch_cluster(68 domain_name='fourfront-newtest',69 master_node_type='t2.medium.elasticsearch',70 master_node_count=3,71 data_node_type='c5.large.elasticsearch',72 data_node_count=273 )74 assert not success75 def test_elasticsearch_service_client_resize_unknown(self):76 """ Tests handling of a unknown error. 
"""77 client = ElasticSearchServiceClient()78 with mock.patch.object(client.client, 'update_elasticsearch_domain_config',79 self.mock_update_es_unknown):80 success = client.resize_elasticsearch_cluster(81 domain_name='fourfront-newtest',82 master_node_type='t2.medium.elasticsearch',83 master_node_count=3,84 data_node_type='c5.large.elasticsearch',85 data_node_count=286 )87 assert not success88@pytest.fixture89def es_client_fixture(integrated_ff):90 """ Fixture that creates an es client to mastertest """91 return create_es_client(integrated_ff['es_url'])92@pytest.mark.integrated93def test_lucene_query_basic(es_client_fixture):94 """ Tests basic lucene queries via the underlying endpoint on mastertest """95 results = execute_lucene_query_on_es(client=es_client_fixture, index='fourfront-mastertestuser', query={})96 assert len(results) == 1097 test_query = {98 'query': {99 'bool': {100 'must': [ # search for will's user insert101 {'terms': {'_id': ['1a12362f-4eb6-4a9c-8173-776667226988']}}102 ],103 'must_not': []104 }105 },106 'sort': [{'_uid': {'order': 'desc'}}]107 }108 results = execute_lucene_query_on_es(client=es_client_fixture, index='fourfront-mastertestuser', query=test_query)109 assert len(results) == 1110@pytest.mark.integrated111def test_get_bulk_uuids_embedded(es_client_fixture):112 """ Tests getting some bulk uuids acquired from search. 
"""113 uuids = ['1a12362f-4eb6-4a9c-8173-776667226988'] # only one uuid first114 result1 = get_bulk_uuids_embedded(es_client_fixture, 'fourfront-mastertestuser', uuids, is_generator=False)115 assert len(result1) == 1116 assert result1[0]['uuid'] == uuids[0] # check uuid117 assert result1[0]['lab']['awards'] is not None # check embedding118 assert result1[0]['lab']['awards'][0]['project'] is not None119 # one page of results, should give us 10120 users = execute_lucene_query_on_es(client=es_client_fixture, index='fourfront-mastertestuser', query={})121 uuids = [doc['_id'] for doc in users]122 result2 = get_bulk_uuids_embedded(es_client_fixture, 'fourfront-mastertestuser', uuids)123 assert len(result2) == 10124 for doc in result2:125 assert doc['uuid'] in uuids # check uuids126 result3 = get_bulk_uuids_embedded(es_client_fixture, 'fourfront-mastertestuser', uuids, is_generator=True)127 for doc in result3:128 assert doc['uuid'] in uuids # check uuids from gen129@pytest.mark.integrated130def test_get_bulk_uuids_outperforms_get_es_metadata(integrated_ff, es_client_fixture):131 """ Tests that the new method out performs the new one. """132 users = execute_lucene_query_on_es(client=es_client_fixture, index='fourfront-mastertestuser', query={})133 uuids = [doc['_id'] for doc in users]134 times = []135 def set_current_time(start, end): # noqa I want this default argument to be mutable136 times.append(end - start)137 with timed(reporter=set_current_time):138 get_bulk_uuids_embedded(es_client_fixture, 'fourfront-mastertestuser', uuids, is_generator=False)139 assert len(times) == 1140 with timed(reporter=set_current_time):141 get_es_metadata(uuids, key=integrated_ff['ff_key'], ff_env=integrated_ff['ff_env'])142 assert len(times) == 2...

Full Screen

Full Screen

elasticsearch.py

Source:elasticsearch.py Github

copy

Full Screen

...54 new_policy = json.dumps(evil_policy)55 logger.debug("Setting resource policy for %s" % self.arn)56 try:57 # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/es.html#ElasticsearchService.Client.update_elasticsearch_domain_config58 self.client.update_elasticsearch_domain_config(DomainName=self.name, AccessPolicies=new_policy)59 message = "success"60 success = True61 except botocore.exceptions.ClientError as error:62 message = str(error)63 success = False64 response_message = ResponseMessage(message=message, operation="set_rbp", success=success, evil_principal="",65 victim_resource_arn=self.arn, original_policy=self.original_policy,66 updated_policy=evil_policy, resource_type=self.resource_type,67 resource_name=self.name, service=self.service)68 return response_message69class ElasticSearchDomains(ResourceTypes):70 def __init__(self, client: boto3.Session.client, current_account_id: str, region: str):71 super().__init__(client, current_account_id, region)72 self.service = "elasticsearch"...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful