How to use the describe_outbound_cross_cluster_search_connections method in localstack

Best Python code snippet using localstack_python

test_elasticsearch.py

Source:test_elasticsearch.py Github

copy

Full Screen

...308 es = session_factory().client('es')309 search_inbound_connections = es.describe_inbound_cross_cluster_search_connections()310 self.assertEqual(search_inbound_connections['CrossClusterSearchConnections'][0]311 ['SourceDomainInfo']['OwnerId'], '644160558196')312 search_outbound_connections = es.describe_outbound_cross_cluster_search_connections()313 self.assertEqual(search_outbound_connections['CrossClusterSearchConnections'][0]314 ['SourceDomainInfo']['OwnerId'], '644160558196')315 def test_elasticsearch_cross_cluster_search_connections_inbound(self):316 session_factory = self.replay_flight_data(317 'test_elasticsearch_cross_cluster_search_connections')318 p = self.load_policy(319 {320 'name': 'test-elasticsearch-cross-cluster-search-connections',321 'resource': 'aws.elasticsearch',322 'filters': [323 {324 'type': 'cross-cluster',325 'inbound':326 {327 'key': 'SourceDomainInfo.OwnerId',328 'value': '644160558196',329 'op': 'eq'330 },331 }332 ]333 },334 session_factory=session_factory335 )336 resources = p.run()337 self.assertEqual(len(resources), 1)338 self.assertEqual(339 resources[0]['DomainName'],340 self.terraform_cross_cluster['aws_elasticsearch_domain.inbound_connection.domain_name']341 )342 es = session_factory().client('es')343 search_inbound_connections = es.describe_inbound_cross_cluster_search_connections()344 self.assertEqual(search_inbound_connections['CrossClusterSearchConnections'][0]345 ['SourceDomainInfo']['OwnerId'], '644160558196')346 def test_elasticsearch_cross_cluster_search_connections_outbound(self):347 session_factory = self.replay_flight_data(348 'test_elasticsearch_cross_cluster_search_connections')349 p = self.load_policy(350 {351 'name': 'test-elasticsearch-cross-cluster-search-connections',352 'resource': 'aws.elasticsearch',353 'filters': [354 {355 'type': 'cross-cluster',356 'outbound':357 {358 'key': 'SourceDomainInfo.OwnerId',359 'value': '644160558196',360 'op': 'eq'361 }362 }363 ]364 },365 session_factory=session_factory366 
)367 resources = p.run()368 self.assertEqual(len(resources), 1)369 self.assertEqual(370 resources[0]['DomainName'],371 self.terraform_cross_cluster['aws_elasticsearch_domain.outbound_connection.domain_name']372 )373 es = session_factory().client('es')374 search_outbound_connections = es.describe_outbound_cross_cluster_search_connections()375 self.assertEqual(search_outbound_connections['CrossClusterSearchConnections'][0]376 ['SourceDomainInfo']['OwnerId'], '644160558196')377 def test_elasticsearch_cross_cluster_search_connections_not_found(self):378 session_factory = self.replay_flight_data(379 'test_elasticsearch_cross_cluster_search_connections_not_found')380 p = self.load_policy(381 {382 'name': 'test-elasticsearch-cross-cluster-search-connections',383 'resource': 'aws.elasticsearch',384 'filters': [385 {386 'type': 'cross-cluster',387 'outbound':388 {389 'key': 'DestinationDomainInfo.DomainName',390 'value': 'test',391 'op': 'eq'392 }393 }394 ]395 },396 session_factory=session_factory397 )398 resources = p.run()399 self.assertEqual(len(resources), 0)400 es = session_factory().client('es')401 search_outbound_connections = es.describe_outbound_cross_cluster_search_connections(402 Filters=[403 {404 'Name': 'destination-domain-info.domain-name',405 'Values': [406 'test',407 ]408 },409 ],)410 self.assertEqual(len(search_outbound_connections["CrossClusterSearchConnections"]), 0)411 def test_elasticsearch_cross_account(self):412 session_factory = self.replay_flight_data("test_elasticsearch_cross_account")413 p = self.load_policy(414 {415 "name": "elasticsearch-cross-account",...

Full Screen

Full Screen

elasticsearch.py

Source:elasticsearch.py Github

copy

Full Screen

# Copyright The Cloud Custodian Authors.
# SPDX-License-Identifier: Apache-2.0
import jmespath
import json

from c7n.actions import Action, ModifyVpcSecurityGroupsAction, RemovePolicyBase
from c7n.filters import MetricsFilter, CrossAccountAccessFilter, ValueFilter
from c7n.exceptions import PolicyValidationError
from c7n.filters.vpc import SecurityGroupFilter, SubnetFilter, VpcFilter, Filter
from c7n.manager import resources
from c7n.query import ConfigSource, DescribeSource, QueryResourceManager, TypeInfo
from c7n.utils import chunks, local_session, type_schema
from c7n.tags import Tag, RemoveTag, TagActionFilter, TagDelayedAction
from c7n.filters.kms import KmsRelatedFilter
from .securityhub import PostFinding


class DescribeDomain(DescribeSource):
    """Describe source that expands domain names into full domain records."""

    def get_resources(self, resource_ids):
        """Return ``resource_ids`` unchanged."""
        # augment will turn these into resource dictionaries
        return resource_ids

    def augment(self, domains):
        """Describe each domain name in ``domains`` and attach its tags.

        :param domains: iterable of Elasticsearch domain names.
        :returns: list of domain status dictionaries, each with a ``Tags``
            entry holding the domain's tag list (empty list when none).
        """
        client = local_session(self.manager.session_factory).client('es')
        model = self.manager.get_model()
        results = []

        def _augment(resource_set):
            # Named ``described`` so the local does not shadow the
            # module-level ``resources`` registry imported above.
            described = self.manager.retry(
                client.describe_elasticsearch_domains,
                DomainNames=resource_set)['DomainStatusList']
            for r in described:
                rarn = self.manager.generate_arn(r[model.id])
                r['Tags'] = self.manager.retry(
                    client.list_tags, ARN=rarn).get('TagList', [])
            return described

        # Domains are described five at a time -- presumably the per-call
        # limit of describe_elasticsearch_domains; confirm before changing.
        for resource_set in chunks(domains, 5):
            results.extend(_augment(resource_set))

        return results


@resources.register('elasticsearch')
class ElasticSearchDomain(QueryResourceManager):
    """Resource manager for Elasticsearch domains (``es`` service)."""

    class resource_type(TypeInfo):
        service = 'es'
        arn = 'ARN'
        arn_type = 'domain'
        enum_spec = (
            'list_domain_names', 'DomainNames[].DomainName', None)
        id = 'DomainName'
        name = 'Name'
        dimension = "DomainName"
        cfn_type = config_type = 'AWS::Elasticsearch::Domain'

    source_mapping = {
        'describe': DescribeDomain,
        'config': ConfigSource
    }


ElasticSearchDomain.filter_registry.register('marked-for-op', TagActionFilter)


@ElasticSearchDomain.filter_registry.register('subnet')
class Subnet(SubnetFilter):
    """Subnet filter keyed on the domain's ``VPCOptions.SubnetIds``."""

    RelatedIdsExpression = "VPCOptions.SubnetIds[]"


@ElasticSearchDomain.filter_registry.register('security-group')
class SecurityGroup(SecurityGroupFilter):
    """Security group filter keyed on ``VPCOptions.SecurityGroupIds``."""

    RelatedIdsExpression = "VPCOptions.SecurityGroupIds[]"


@ElasticSearchDomain.filter_registry.register('vpc')
class Vpc(VpcFilter):
    """VPC filter keyed on the domain's ``VPCOptions.VPCId``."""

    RelatedIdsExpression = "VPCOptions.VPCId"


@ElasticSearchDomain.filter_registry.register('metrics')
class Metrics(MetricsFilter):
    """Metrics filter for Elasticsearch domains."""

    def get_dimensions(self, resource):
        """Return the metric dimensions for ``resource``.

        Two dimensions are produced: ``ClientId`` (the manager's account id)
        and ``DomainName`` (the resource's domain name).
        """
        return [{'Name': 'ClientId',
                 'Value': self.manager.account_id},
                {'Name': 'DomainName',
                 'Value': resource['DomainName']}]


@ElasticSearchDomain.filter_registry.register('kms-key')
class KmsFilter(KmsRelatedFilter):
    """KMS key filter keyed on ``EncryptionAtRestOptions.KmsKeyId``."""

    RelatedIdsExpression = 'EncryptionAtRestOptions.KmsKeyId'


@ElasticSearchDomain.filter_registry.register('cross-account')
class ElasticSearchCrossAccountAccessFilter(CrossAccountAccessFilter):
    """
    Filter to return all elasticsearch domains with cross account access permissions

    :example:

    .. code-block:: yaml

        policies:
          - name: check-elasticsearch-cross-account
            resource: aws.elasticsearch
            filters:
              - type: cross-account
    """
    policy_attribute = 'c7n:Policy'
    permissions = ('es:DescribeElasticsearchDomainConfig',)

    def process(self, resources, event=None):
        """Annotate each domain with its access policy, then run the base filter.

        For every resource that does not yet carry ``c7n:Policy``, the domain
        config is fetched and the JSON in ``AccessPolicies.Options`` is parsed
        and stored under that key. ``ResourceNotFoundException`` is passed to
        the retry helper as an ignorable error code, and a falsy result leaves
        the resource unannotated.
        """
        client = local_session(self.manager.session_factory).client('es')
        for r in resources:
            if self.policy_attribute not in r:
                result = self.manager.retry(
                    client.describe_elasticsearch_domain_config,
                    DomainName=r['DomainName'],
                    ignore_err_codes=('ResourceNotFoundException',))
                if result:
                    r[self.policy_attribute] = json.loads(
                        result.get('DomainConfig').get('AccessPolicies').get('Options')
                    )
        return super().process(resources)


@ElasticSearchDomain.filter_registry.register('cross-cluster')
class ElasticSearchCrossClusterFilter(Filter):
    """
    Filter to return all elasticsearch domains with inbound and/or outbound
    cross-cluster search connections matching the given info

    :example:

    .. code-block:: yaml

        policies:
          - name: check-elasticsearch-cross-cluster
            resource: aws.elasticsearch
            filters:
              - type: cross-cluster
                inbound:
                    key: SourceDomainInfo.OwnerId
                    op: eq
                    value: '123456789'
                outbound:
                    key: SourceDomainInfo.OwnerId
                    op: eq
                    value: '123456789'
    """
    schema = type_schema(type_name="cross-cluster",
                         inbound=type_schema(type_name='inbound',
                                             required=('key', 'value'),
                                             rinherit=ValueFilter.schema),
                         outbound=type_schema(type_name='outbound',
                                              required=('key', 'value'),
                                              rinherit=ValueFilter.schema),)
    schema_alias = False
    # Resource key under which the raw connection listings are stored.
    annotation_key = "c7n:SearchConnections"
    # Resource key under which the connections that matched are stored.
    matched_key = "c7n:MatchedConnections"
    annotate = False
    permissions = ('es:ESCrossClusterGet',)

    def process(self, resources, event=None):
        """Return the domains having at least one matching search connection.

        For each resource not yet annotated, the inbound and/or outbound
        connections (whichever directions the policy configures) are fetched
        for the domain and stored under ``annotation_key``. Each connection is
        then tested with a ``ValueFilter`` built from that direction's
        settings; matches are stored per direction under ``matched_key``.

        :param resources: list of domain dictionaries.
        :param event: unused here.
        :returns: the resources with one or more matched connections.
        """
        client = local_session(self.manager.session_factory).client('es')
        results = []
        for r in resources:
            if self.annotation_key not in r:
                r[self.annotation_key] = {}
                try:
                    if "inbound" in self.data:
                        inbound = self.manager.retry(
                            client.describe_inbound_cross_cluster_search_connections,
                            Filters=[{'Name': 'destination-domain-info.domain-name',
                                      'Values': [r['DomainName']]}])
                        # Default avoids a KeyError when the metadata is absent.
                        inbound.pop('ResponseMetadata', None)
                        r[self.annotation_key]["inbound"] = inbound
                    if "outbound" in self.data:
                        outbound = self.manager.retry(
                            client.describe_outbound_cross_cluster_search_connections,
                            Filters=[{'Name': 'source-domain-info.domain-name',
                                      'Values': [r['DomainName']]}])
                        outbound.pop('ResponseMetadata', None)
                        r[self.annotation_key]["outbound"] = outbound
                except client.exceptions.ResourceNotFoundException:
                    # The misspelled name used previously does not exist on
                    # the client, so the handler itself would have failed.
                    continue
            match_found = False
            r[self.matched_key] = {}
            for direction, connections in r[self.annotation_key].items():
                matcher = self.data.get(direction)
                if matcher is None:
                    # A previously cached annotation may hold a direction this
                    # filter instance was not configured for; nothing to test.
                    continue
                value_filter = ValueFilter(matcher)
                value_filter.annotate = False
                matched = [
                    conn for conn in connections['CrossClusterSearchConnections']
                    if value_filter(conn)]
                if matched:
                    match_found = True
                r[self.matched_key][direction] = matched
            if match_found:
                results.append(r)
        return results


@ElasticSearchDomain.action_registry.register('remove-statements')
class RemovePolicyStatement(RemovePolicyBase):
    """
    Action to remove policy statements from elasticsearch

    :example:

    .. code-block:: yaml

        policies:
          - name: elasticsearch-cross-account
            resource: aws.elasticsearch
            filters:
              - type: cross-account
            actions:
              - type: remove-statements
                statement_ids: matched
    """

    permissions = ('es:DescribeElasticsearchDomainConfig', 'es:UpdateElasticsearchDomainConfig',)

    def validate(self):
        """Require a ``cross-account`` filter on the same policy.

        :returns: ``self`` when such a filter is present.
        :raises PolicyValidationError: when no ``cross-account`` filter is found.
        """
        for f in self.manager.iter_filters():
            if isinstance(f, ElasticSearchCrossAccountAccessFilter):
                return self
        raise PolicyValidationError(
            '`remove-statements` may only be used in '
            'conjunction with `cross-account` filter on %s' % (self.manager.data,))

    def process(self, resources):
        """Process every resource, logging (not raising) per-resource failures."""
        client = local_session(self.manager.session_factory).client('es')
        for r in resources:
            try:
                self.process_resource(client, r)
            except Exception:
                # Best effort: one failing domain must not stop the others.
                self.log.exception("Error processing es:%s", r['ARN'])

    def process_resource(self, client, resource):
        """Update the domain's access policy when statements were matched.

        Does nothing when the resource carries no ``c7n:Policy`` annotation.
        The policy object ``p`` is what gets serialized and sent, so the
        update relies on ``process_policy`` having modified it in place --
        NOTE(review): confirm against ``RemovePolicyBase``.
        """
        p = resource.get('c7n:Policy')

        if p is None:
            return

        statements, found = self.process_policy(
            p, resource, CrossAccountAccessFilter.annotation_key)

        if found:
            client.update_elasticsearch_domain_config(
                DomainName=resource['DomainName'],
                AccessPolicies=json.dumps(p)
            )

        return


@ElasticSearchDomain.action_registry.register('post-finding')
class ElasticSearchPostFinding(PostFinding):
    """Post-finding action for Elasticsearch domains."""

    resource_type = 'AwsElasticsearchDomain'

    def format_resource(self, r):
        """Build the finding envelope for domain ``r``.

        The payload is updated with the domain's identifying fields, its
        endpoint, encryption and VPC settings, each group passed through
        ``filter_empty``. ``DomainId``, ``DomainName`` and
        ``ElasticsearchVersion`` are read with ``[]`` and must be present.

        :returns: the envelope produced by ``format_envelope``.
        """
        envelope, payload = self.format_envelope(r)
        payload.update(self.filter_empty({
            'AccessPolicies': r.get('AccessPolicies'),
            'DomainId': r['DomainId'],
            'DomainName': r['DomainName'],
            'Endpoint': r.get('Endpoint'),
            'Endpoints': r.get('Endpoints'),
            'DomainEndpointOptions': self.filter_empty({
                'EnforceHTTPS': jmespath.search(
                    'DomainEndpointOptions.EnforceHTTPS', r),
                'TLSSecurityPolicy': jmespath.search(
                    'DomainEndpointOptions.TLSSecurityPolicy', r)
            }),
            'ElasticsearchVersion': r['ElasticsearchVersion'],
            'EncryptionAtRestOptions': self.filter_empty({
                'Enabled': jmespath.search(
                    'EncryptionAtRestOptions.Enabled', r),
                'KmsKeyId': jmespath.search(
                    'EncryptionAtRestOptions.KmsKeyId', r)
            }),
            'NodeToNodeEncryptionOptions': self.filter_empty({
                'Enabled': jmespath.search(
                    'NodeToNodeEncryptionOptions.Enabled', r)
            }),
            'VPCOptions': self.filter_empty({
                'AvailabilityZones': jmespath.search(
                    'VPCOptions.AvailabilityZones', r),
                'SecurityGroupIds': jmespath.search(
                    'VPCOptions.SecurityGroupIds', r),
                'SubnetIds': jmespath.search('VPCOptions.SubnetIds', r),
                'VPCId': jmespath.search('VPCOptions.VPCId', r)
            })
        }))
        return envelope


@ElasticSearchDomain.action_registry.register('modify-security-groups')
class ElasticSearchModifySG(ModifyVpcSecurityGroupsAction):
    """Modify security groups on an Elasticsearch domain"""

    permissions = ('es:UpdateElasticsearchDomainConfig',)

    def process(self, domains):
        """Apply the resolved security group ids to each domain.

        ``get_groups`` is indexed by each domain's position in ``domains``.
        """
        groups = super().get_groups(domains)
        client = local_session(self.manager.session_factory).client('es')

        for dx, d in enumerate(domains):
            client.update_elasticsearch_domain_config(
                DomainName=d['DomainName'],
                VPCOptions={
                    'SecurityGroupIds': groups[dx]})


@ElasticSearchDomain.action_registry.register('delete')
class Delete(Action):
    """Delete the matched Elasticsearch domains."""

    schema = type_schema('delete')
    permissions = ('es:DeleteElasticsearchDomain',)

    def process(self, resources):
        """Issue a delete call for every domain in ``resources``."""
        client = local_session(self.manager.session_factory).client('es')
        for r in resources:
            client.delete_elasticsearch_domain(DomainName=r['DomainName'])


@ElasticSearchDomain.action_registry.register('tag')
class ElasticSearchAddTag(Tag):
    """Action to create tag(s) on an existing elasticsearch domain

    :example:

    .. code-block:: yaml

        policies:
          - name: es-add-tag
            resource: elasticsearch
            filters:
              - "tag:DesiredTag": absent
            actions:
              - type: tag
                key: DesiredTag
                value: DesiredValue
    """
    permissions = ('es:AddTags',)

    def process_resource_set(self, client, domains, tags):
        """Add ``tags`` to each domain, skipping domains that no longer exist."""
        for d in domains:
            try:
                client.add_tags(ARN=d['ARN'], TagList=tags)
            except client.exceptions.ResourceNotFoundException:
                continue


@ElasticSearchDomain.action_registry.register('remove-tag')
class ElasticSearchRemoveTag(RemoveTag):
    """Removes tag(s) on an existing elasticsearch domain

    :example:

    .. code-block:: yaml

        policies:
          - name: es-remove-tag
            resource: elasticsearch
            filters:
              - "tag:ExpiredTag": present
            actions:
              - type: remove-tag
                tags: ['ExpiredTag']
    """
    permissions = ('es:RemoveTags',)

    def process_resource_set(self, client, domains, tags):
        """Remove tag keys ``tags`` from each domain, skipping missing domains."""
        for d in domains:
            try:
                client.remove_tags(ARN=d['ARN'], TagKeys=tags)
            except client.exceptions.ResourceNotFoundException:
                continue


@ElasticSearchDomain.action_registry.register('mark-for-op')
class ElasticSearchMarkForOp(TagDelayedAction):
    """Tag an elasticsearch domain for action later

    :example:

    .. code-block:: yaml

        policies:
          - name: es-delete-missing
            resource: elasticsearch
            filters:
              - "tag:DesiredTag": absent
            actions:
              - type: mark-for-op
                days: 7
                op: delete
                tag: c7n_es_delete
    """


@resources.register('elasticsearch-reserved')
class ReservedInstances(QueryResourceManager):

    class resource_type(TypeInfo):
        service = 'es'
        name = id = 'ReservedElasticsearchInstanceId'
        date = 'StartTime'
        enum_spec = (
            'describe_reserved_elasticsearch_instances', 'ReservedElasticsearchInstances', None)
        filter_name = 'ReservedElasticsearchInstances'
        filter_type = 'list'
        arn_type = "reserved-instances"
        ...  # the excerpt is truncated here; the rest of the file is not shown

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful