Best Python code snippet using localstack_python
test_elasticsearch.py
Source:test_elasticsearch.py  
...305        )306        resources = p.run()307        self.assertEqual(len(resources), 2)308        es = session_factory().client('es')309        search_inbound_connections = es.describe_inbound_cross_cluster_search_connections()310        self.assertEqual(search_inbound_connections['CrossClusterSearchConnections'][0]311        ['SourceDomainInfo']['OwnerId'], '644160558196')312        search_outbound_connections = es.describe_outbound_cross_cluster_search_connections()313        self.assertEqual(search_outbound_connections['CrossClusterSearchConnections'][0]314        ['SourceDomainInfo']['OwnerId'], '644160558196')315    def test_elasticsearch_cross_cluster_search_connections_inbound(self):316        session_factory = self.replay_flight_data(317            'test_elasticsearch_cross_cluster_search_connections')318        p = self.load_policy(319            {320                'name': 'test-elasticsearch-cross-cluster-search-connections',321                'resource': 'aws.elasticsearch',322                'filters': [323                    {324                        'type': 'cross-cluster',325                        'inbound':326                        {327                            'key': 'SourceDomainInfo.OwnerId',328                            'value': '644160558196',329                            'op': 'eq'330                        },331                    }332                ]333            },334            session_factory=session_factory335        )336        resources = p.run()337        self.assertEqual(len(resources), 1)338        self.assertEqual(339            resources[0]['DomainName'],340            self.terraform_cross_cluster['aws_elasticsearch_domain.inbound_connection.domain_name']341        )342        es = session_factory().client('es')343        search_inbound_connections = es.describe_inbound_cross_cluster_search_connections()344        self.assertEqual(search_inbound_connections['CrossClusterSearchConnections'][0]345        
['SourceDomainInfo']['OwnerId'], '644160558196')346    def test_elasticsearch_cross_cluster_search_connections_outbound(self):347        session_factory = self.replay_flight_data(348            'test_elasticsearch_cross_cluster_search_connections')349        p = self.load_policy(350            {351                'name': 'test-elasticsearch-cross-cluster-search-connections',352                'resource': 'aws.elasticsearch',353                'filters': [354                    {355                        'type': 'cross-cluster',356                        'outbound':357                        {...elasticsearch.py
Source:elasticsearch.py  
# Copyright The Cloud Custodian Authors.
# SPDX-License-Identifier: Apache-2.0
import jmespath
import json

from c7n.actions import Action, ModifyVpcSecurityGroupsAction, RemovePolicyBase
from c7n.filters import MetricsFilter, CrossAccountAccessFilter, ValueFilter
from c7n.exceptions import PolicyValidationError
from c7n.filters.vpc import SecurityGroupFilter, SubnetFilter, VpcFilter, Filter
from c7n.manager import resources
from c7n.query import ConfigSource, DescribeSource, QueryResourceManager, TypeInfo
from c7n.utils import chunks, local_session, type_schema
from c7n.tags import Tag, RemoveTag, TagActionFilter, TagDelayedAction
from c7n.filters.kms import KmsRelatedFilter
from .securityhub import PostFinding


class DescribeDomain(DescribeSource):
    """Describe-based source that expands domain names into full records."""

    def get_resources(self, resource_ids):
        """Return the ids untouched; ``augment`` performs the real lookup."""
        # augment will turn these into resource dictionaries
        return resource_ids

    def augment(self, domains):
        """Turn a list of domain names into described, tagged domains.

        :param domains: iterable of Elasticsearch domain names.
        :returns: list of ``DomainStatus`` dicts, each with a ``Tags`` list.
        """
        client = local_session(self.manager.session_factory).client('es')
        model = self.manager.get_model()
        results = []

        def _augment(resource_set):
            described = self.manager.retry(
                client.describe_elasticsearch_domains,
                DomainNames=resource_set)['DomainStatusList']
            for r in described:
                rarn = self.manager.generate_arn(r[model.id])
                r['Tags'] = self.manager.retry(
                    client.list_tags, ARN=rarn).get('TagList', [])
            return described

        # Domains are described five at a time
        # (presumably the per-call limit of the API -- TODO confirm).
        for resource_set in chunks(domains, 5):
            results.extend(_augment(resource_set))

        return results


@resources.register('elasticsearch')
class ElasticSearchDomain(QueryResourceManager):
    """Resource manager for Amazon Elasticsearch Service domains."""

    class resource_type(TypeInfo):
        service = 'es'
        arn = 'ARN'
        arn_type = 'domain'
        enum_spec = (
            'list_domain_names', 'DomainNames[].DomainName', None)
        id = 'DomainName'
        name = 'Name'
        dimension = "DomainName"
        cfn_type = config_type = 'AWS::Elasticsearch::Domain'

    source_mapping = {
        'describe': DescribeDomain,
        'config': ConfigSource
    }


ElasticSearchDomain.filter_registry.register('marked-for-op', TagActionFilter)


@ElasticSearchDomain.filter_registry.register('subnet')
class Subnet(SubnetFilter):

    RelatedIdsExpression = "VPCOptions.SubnetIds[]"


@ElasticSearchDomain.filter_registry.register('security-group')
class SecurityGroup(SecurityGroupFilter):

    RelatedIdsExpression = "VPCOptions.SecurityGroupIds[]"


@ElasticSearchDomain.filter_registry.register('vpc')
class Vpc(VpcFilter):

    RelatedIdsExpression = "VPCOptions.VPCId"


@ElasticSearchDomain.filter_registry.register('metrics')
class Metrics(MetricsFilter):

    def get_dimensions(self, resource):
        """Return the ``ClientId``/``DomainName`` dimensions for *resource*."""
        return [{'Name': 'ClientId',
                 'Value': self.manager.account_id},
                {'Name': 'DomainName',
                 'Value': resource['DomainName']}]


@ElasticSearchDomain.filter_registry.register('kms-key')
class KmsFilter(KmsRelatedFilter):

    RelatedIdsExpression = 'EncryptionAtRestOptions.KmsKeyId'


@ElasticSearchDomain.filter_registry.register('cross-account')
class ElasticSearchCrossAccountAccessFilter(CrossAccountAccessFilter):
    """
    Filter to return all elasticsearch domains with cross account access permissions

    :example:

    .. code-block:: yaml

        policies:
          - name: check-elasticsearch-cross-account
            resource: aws.elasticsearch
            filters:
              - type: cross-account
    """
    policy_attribute = 'c7n:Policy'
    permissions = ('es:DescribeElasticsearchDomainConfig',)

    def process(self, resources, event=None):
        """Annotate each domain with its parsed access policy, then filter.

        The policy is fetched only for resources that do not already carry
        the ``c7n:Policy`` annotation.
        """
        client = local_session(self.manager.session_factory).client('es')
        for r in resources:
            if self.policy_attribute not in r:
                result = self.manager.retry(
                    client.describe_elasticsearch_domain_config,
                    DomainName=r['DomainName'],
                    ignore_err_codes=('ResourceNotFoundException',))
                if result:
                    options = result.get(
                        'DomainConfig').get('AccessPolicies').get('Options')
                    # Options may be empty (e.g. no access policy is
                    # configured); json.loads raises on an empty string,
                    # so record "no policy" instead of failing the run.
                    r[self.policy_attribute] = (
                        json.loads(options) if options else None)
        return super().process(resources)


@ElasticSearchDomain.filter_registry.register('cross-cluster')
class ElasticSearchCrossClusterFilter(Filter):
    """
    Filter to return all elasticsearch domains with inbound cross-cluster with the given info

    :example:

    .. code-block:: yaml

        policies:
          - name: check-elasticsearch-cross-cluster
            resource: aws.elasticsearch
            filters:
              - type: cross-cluster
                inbound:
                    key: SourceDomainInfo.OwnerId
                    op: eq
                    value: '123456789'
                outbound:
                    key: SourceDomainInfo.OwnerId
                    op: eq
                    value: '123456789'
    """
    schema = type_schema(type_name="cross-cluster",
                         inbound=type_schema(type_name='inbound',
                                             required=('key', 'value'),
                                             rinherit=ValueFilter.schema),
                         outbound=type_schema(type_name='outbound',
                                              required=('key', 'value'),
                                              rinherit=ValueFilter.schema),)
    schema_alias = False
    annotation_key = "c7n:SearchConnections"
    matched_key = "c7n:MatchedConnections"
    annotate = False
    permissions = ('es:ESCrossClusterGet',)

    def process(self, resources, event=None):
        """Return domains having a search connection matching the filter.

        For every configured direction (``inbound`` / ``outbound``) the
        domain's cross-cluster search connections are fetched, cached under
        ``c7n:SearchConnections``, and each connection is tested with a
        ``ValueFilter`` built from that direction's config. Matching
        connections are recorded under ``c7n:MatchedConnections``.
        """
        client = local_session(self.manager.session_factory).client('es')
        results = []

        for r in resources:
            if self.annotation_key not in r:
                r[self.annotation_key] = {}
                try:
                    if "inbound" in self.data:
                        inbound = self.manager.retry(
                            client.describe_inbound_cross_cluster_search_connections,
                            Filters=[{'Name': 'destination-domain-info.domain-name',
                                      'Values': [r['DomainName']]}])
                        # Tolerate responses without metadata.
                        inbound.pop('ResponseMetadata', None)
                        r[self.annotation_key]["inbound"] = inbound
                    if "outbound" in self.data:
                        outbound = self.manager.retry(
                            client.describe_outbound_cross_cluster_search_connections,
                            Filters=[{'Name': 'source-domain-info.domain-name',
                                      'Values': [r['DomainName']]}])
                        outbound.pop('ResponseMetadata', None)
                        r[self.annotation_key]["outbound"] = outbound
                except client.exceptions.ResourceNotFoundException:
                    # The domain disappeared between listing and lookup.
                    continue

            match_found = False
            r[self.matched_key] = {}
            for direction in r[self.annotation_key]:
                matcher = self.data.get(direction)
                if matcher is None:
                    # A cached annotation may hold a direction this filter
                    # instance was not configured for; nothing to match.
                    continue
                value_filter = ValueFilter(matcher)
                value_filter.annotate = False
                matched = []
                for conn in r[self.annotation_key][direction]['CrossClusterSearchConnections']:
                    if value_filter(conn):
                        matched.append(conn)
                        match_found = True
                r[self.matched_key][direction] = matched
            if match_found:
                results.append(r)

        return results


@ElasticSearchDomain.action_registry.register('remove-statements')
class RemovePolicyStatement(RemovePolicyBase):
    """
    Action to remove policy statements from elasticsearch

    :example:

    .. code-block:: yaml

        policies:
          - name: elasticsearch-cross-account
            resource: aws.elasticsearch
            filters:
              - type: cross-account
            actions:
              - type: remove-statements
                statement_ids: matched
    """

    permissions = ('es:DescribeElasticsearchDomainConfig', 'es:UpdateElasticsearchDomainConfig',)

    def validate(self):
        """Require a ``cross-account`` filter in the same policy.

        :raises PolicyValidationError: when no such filter is present.
        """
        for f in self.manager.iter_filters():
            if isinstance(f, ElasticSearchCrossAccountAccessFilter):
                return self
        raise PolicyValidationError(
            '`remove-statements` may only be used in '
            'conjunction with `cross-account` filter on %s' % (self.manager.data,))

    def process(self, resources):
        """Process each domain independently, logging (not raising) failures."""
        client = local_session(self.manager.session_factory).client('es')
        for r in resources:
            try:
                self.process_resource(client, r)
            except Exception:
                # Deliberate best-effort: one failing domain must not stop
                # the remaining ones from being processed.
                self.log.exception("Error processing es:%s", r['ARN'])

    def process_resource(self, client, resource):
        """Push the updated access policy when statements were removed."""
        p = resource.get('c7n:Policy')
        if p is None:
            return

        statements, found = self.process_policy(
            p, resource, CrossAccountAccessFilter.annotation_key)

        if found:
            client.update_elasticsearch_domain_config(
                DomainName=resource['DomainName'],
                AccessPolicies=json.dumps(p)
            )

        return


@ElasticSearchDomain.action_registry.register('post-finding')
class ElasticSearchPostFinding(PostFinding):

    resource_type = 'AwsElasticsearchDomain'

    def format_resource(self, r):
        """Build the finding envelope for domain *r*.

        Empty values are dropped via ``filter_empty`` before being merged
        into the payload.
        """
        envelope, payload = self.format_envelope(r)
        payload.update(self.filter_empty({
            'AccessPolicies': r.get('AccessPolicies'),
            'DomainId': r['DomainId'],
            'DomainName': r['DomainName'],
            'Endpoint': r.get('Endpoint'),
            'Endpoints': r.get('Endpoints'),
            'DomainEndpointOptions': self.filter_empty({
                'EnforceHTTPS': jmespath.search(
                    'DomainEndpointOptions.EnforceHTTPS', r),
                'TLSSecurityPolicy': jmespath.search(
                    'DomainEndpointOptions.TLSSecurityPolicy', r)
            }),
            'ElasticsearchVersion': r['ElasticsearchVersion'],
            'EncryptionAtRestOptions': self.filter_empty({
                'Enabled': jmespath.search(
                    'EncryptionAtRestOptions.Enabled', r),
                'KmsKeyId': jmespath.search(
                    'EncryptionAtRestOptions.KmsKeyId', r)
            }),
            'NodeToNodeEncryptionOptions': self.filter_empty({
                'Enabled': jmespath.search(
                    'NodeToNodeEncryptionOptions.Enabled', r)
            }),
            'VPCOptions': self.filter_empty({
                'AvailabilityZones': jmespath.search(
                    'VPCOptions.AvailabilityZones', r),
                'SecurityGroupIds': jmespath.search(
                    'VPCOptions.SecurityGroupIds', r),
                'SubnetIds': jmespath.search('VPCOptions.SubnetIds', r),
                'VPCId': jmespath.search('VPCOptions.VPCId', r)
            })
        }))
        return envelope


@ElasticSearchDomain.action_registry.register('modify-security-groups')
class ElasticSearchModifySG(ModifyVpcSecurityGroupsAction):
    """Modify security groups on an Elasticsearch domain"""

    permissions = ('es:UpdateElasticsearchDomainConfig',)

    def process(self, domains):
        """Apply the resolved security group list to each domain."""
        groups = super().get_groups(domains)
        client = local_session(self.manager.session_factory).client('es')

        # groups is indexed in step with domains.
        for dx, d in enumerate(domains):
            client.update_elasticsearch_domain_config(
                DomainName=d['DomainName'],
                VPCOptions={
                    'SecurityGroupIds': groups[dx]})


@ElasticSearchDomain.action_registry.register('delete')
class Delete(Action):
    """Delete the matched Elasticsearch domains."""

    schema = type_schema('delete')
    permissions = ('es:DeleteElasticsearchDomain',)

    def process(self, resources):
        client = local_session(self.manager.session_factory).client('es')
        for r in resources:
            client.delete_elasticsearch_domain(DomainName=r['DomainName'])


@ElasticSearchDomain.action_registry.register('tag')
class ElasticSearchAddTag(Tag):
    """Action to create tag(s) on an existing elasticsearch domain

    :example:

    .. code-block:: yaml

                policies:
                  - name: es-add-tag
                    resource: elasticsearch
                    filters:
                      - "tag:DesiredTag": absent
                    actions:
                      - type: tag
                        key: DesiredTag
                        value: DesiredValue
    """
    permissions = ('es:AddTags',)

    def process_resource_set(self, client, domains, tags):
        """Add *tags* to every domain, skipping domains that no longer exist."""
        for d in domains:
            try:
                client.add_tags(ARN=d['ARN'], TagList=tags)
            except client.exceptions.ResourceNotFoundException:
                continue


@ElasticSearchDomain.action_registry.register('remove-tag')
class ElasticSearchRemoveTag(RemoveTag):
    """Removes tag(s) on an existing elasticsearch domain

    :example:

    .. code-block:: yaml

        policies:
          - name: es-remove-tag
            resource: elasticsearch
            filters:
              - "tag:ExpiredTag": present
            actions:
              - type: remove-tag
                tags: ['ExpiredTag']
    """
    permissions = ('es:RemoveTags',)

    def process_resource_set(self, client, domains, tags):
        """Remove tag keys from every domain, skipping domains that are gone."""
        for d in domains:
            try:
                client.remove_tags(ARN=d['ARN'], TagKeys=tags)
            except client.exceptions.ResourceNotFoundException:
                continue


@ElasticSearchDomain.action_registry.register('mark-for-op')
class ElasticSearchMarkForOp(TagDelayedAction):
    """Tag an elasticsearch domain for action later

    :example:

    .. code-block:: yaml

                policies:
                  - name: es-delete-missing
                    resource: elasticsearch
                    filters:
                      - "tag:DesiredTag": absent
                    actions:
                      - type: mark-for-op
                        days: 7
                        op: delete
                        tag: c7n_es_delete
    """


@resources.register('elasticsearch-reserved')
class ReservedInstances(QueryResourceManager):

    class resource_type(TypeInfo):
        service = 'es'
        name = id = 'ReservedElasticsearchInstanceId'
        date = 'StartTime'
        enum_spec = (
            'describe_reserved_elasticsearch_instances', 'ReservedElasticsearchInstances', None)
        filter_name = 'ReservedElasticsearchInstances'
        filter_type = 'list'
        arn_type = "reserved-instances"
        # ... (the source listing is truncated at this point)


# Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios.
LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
