Best Python code snippet using localstack_python
client.pyi
Source:client.pyi  
...142        Deletes all dashboards that you specify.143        [Show boto3 documentation](https://boto3.amazonaws.com/v1/documentation/api/1.24.58/reference/services/cloudwatch.html#CloudWatch.Client.delete_dashboards)144        [Show boto3-stubs documentation](https://vemel.github.io/boto3_stubs_docs/mypy_boto3_cloudwatch/client.html#delete_dashboards)145        """146    def delete_insight_rules(self, *, RuleNames: List[str]) -> DeleteInsightRulesOutputTypeDef:147        """148        Permanently deletes the specified Contributor Insights rules.149        [Show boto3 documentation](https://boto3.amazonaws.com/v1/documentation/api/1.24.58/reference/services/cloudwatch.html#CloudWatch.Client.delete_insight_rules)150        [Show boto3-stubs documentation](https://vemel.github.io/boto3_stubs_docs/mypy_boto3_cloudwatch/client.html#delete_insight_rules)151        """152    def delete_metric_stream(self, *, Name: str) -> Dict[str, Any]:153        """154        Permanently deletes the metric stream that you specify.155        [Show boto3 documentation](https://boto3.amazonaws.com/v1/documentation/api/1.24.58/reference/services/cloudwatch.html#CloudWatch.Client.delete_metric_stream)156        [Show boto3-stubs documentation](https://vemel.github.io/boto3_stubs_docs/mypy_boto3_cloudwatch/client.html#delete_metric_stream)157        """158    def describe_alarm_history(159        self,160        *,...cw.py
Source:cw.py  
# Copyright The Cloud Custodian Authors.
# SPDX-License-Identifier: Apache-2.0
from concurrent.futures import as_completed
from datetime import datetime, timedelta

from c7n.actions import BaseAction
from c7n.exceptions import PolicyValidationError
from c7n.filters import Filter, MetricsFilter
from c7n.filters.core import parse_date
from c7n.filters.iamaccess import CrossAccountAccessFilter
from c7n.query import QueryResourceManager, ChildResourceManager, TypeInfo
from c7n.manager import resources
from c7n.resolver import ValuesFrom
from c7n.tags import universal_augment
from c7n.utils import type_schema, local_session, chunks, get_retry


@resources.register('alarm')
class Alarm(QueryResourceManager):
    """Resource manager registered as ``alarm`` (CloudWatch metric alarms)."""

    class resource_type(TypeInfo):
        service = 'cloudwatch'
        arn_type = 'alarm'
        enum_spec = ('describe_alarms', 'MetricAlarms', None)
        id = 'AlarmArn'
        filter_name = 'AlarmNames'
        filter_type = 'list'
        name = 'AlarmName'
        date = 'AlarmConfigurationUpdatedTimestamp'
        cfn_type = config_type = 'AWS::CloudWatch::Alarm'

    # Retry helper built by get_retry(('Throttled',)); presumably the tuple
    # lists the error codes that trigger a retry -- confirm in c7n.utils.
    retry = staticmethod(get_retry(('Throttled',)))


@Alarm.action_registry.register('delete')
class AlarmDelete(BaseAction):
    """Delete a cloudwatch alarm.

    :example:

    .. code-block:: yaml

            policies:
              - name: cloudwatch-delete-stale-alarms
                resource: alarm
                filters:
                  - type: value
                    value_type: age
                    key: StateUpdatedTimestamp
                    value: 30
                    op: ge
                  - StateValue: INSUFFICIENT_DATA
                actions:
                  - delete
    """

    schema = type_schema('delete')
    permissions = ('cloudwatch:DeleteAlarms',)

    def process(self, resources):
        """Delete the given alarms by name, 100 alarms per API call."""
        client = local_session(
            self.manager.session_factory).client('cloudwatch')

        for resource_set in chunks(resources, size=100):
            self.manager.retry(
                client.delete_alarms,
                AlarmNames=[r['AlarmName'] for r in resource_set])


@resources.register('event-rule')
class EventRule(QueryResourceManager):
    """Resource manager registered as ``event-rule`` (``events`` service)."""

    class resource_type(TypeInfo):
        service = 'events'
        arn_type = 'rule'
        enum_spec = ('list_rules', 'Rules', None)
        name = "Name"
        id = "Name"
        filter_name = "NamePrefix"
        filter_type = "scalar"
        cfn_type = 'AWS::Events::Rule'
        universal_taggable = object()

    augment = universal_augment


@EventRule.filter_registry.register('metrics')
class EventRuleMetrics(MetricsFilter):
    """``metrics`` filter for event rules."""

    def get_dimensions(self, resource):
        """Return the metric dimension list: ``RuleName`` = the rule's Name."""
        return [{'Name': 'RuleName', 'Value': resource['Name']}]


@resources.register('event-rule-target')
class EventRuleTarget(ChildResourceManager):
    """Child resource manager for the targets of an ``event-rule``."""

    class resource_type(TypeInfo):
        service = 'events'
        arn = False
        arn_type = 'event-rule-target'
        enum_spec = ('list_targets_by_rule', 'Targets', None)
        parent_spec = ('event-rule', 'Rule', True)
        name = id = 'Id'


@EventRuleTarget.filter_registry.register('cross-account')
class CrossAccountFilter(CrossAccountAccessFilter):
    """Match rule targets whose ARN account is not in ``self.accounts``."""

    schema = type_schema(
        'cross-account',
        # white list accounts
        whitelist_from=ValuesFrom.schema,
        whitelist={'type': 'array', 'items': {'type': 'string'}})

    # dummy permission
    permissions = ('events:ListTargetsByRule',)

    def __call__(self, r):
        # The account id is the fifth ':'-separated field of the target ARN.
        account_id = r['Arn'].split(':', 5)[4]
        return account_id not in self.accounts


@EventRuleTarget.action_registry.register('delete')
class DeleteTarget(BaseAction):
    """Remove targets from their parent event rules."""

    schema = type_schema('delete')
    permissions = ('events:RemoveTargets',)

    def process(self, resources):
        """Group target ids by parent rule and remove them in batches.

        Each resource supplies its rule via ``c7n:parent-id`` and its target
        id via ``Id``.
        """
        client = local_session(self.manager.session_factory).client('events')

        rule_targets = {}
        for r in resources:
            rule_targets.setdefault(r['c7n:parent-id'], []).append(r['Id'])

        for rule_id, target_ids in rule_targets.items():
            # RemoveTargets limits how many ids one call may carry, so send
            # them 100 at a time like the other batch actions in this module.
            for id_batch in chunks(target_ids, 100):
                client.remove_targets(
                    Ids=id_batch,
                    Rule=rule_id)


@resources.register('log-group')
class LogGroup(QueryResourceManager):
    """Resource manager registered as ``log-group`` (``logs`` service)."""

    class resource_type(TypeInfo):
        service = 'logs'
        arn_type = 'log-group'
        enum_spec = ('describe_log_groups', 'logGroups', None)
        name = 'logGroupName'
        id = 'arn'
        filter_name = 'logGroupNamePrefix'
        filter_type = 'scalar'
        dimension = 'LogGroupName'
        date = 'creationTime'
        universal_taggable = True
        cfn_type = 'AWS::Logs::LogGroup'

    def get_arns(self, resources):
        """Return each resource's ``arn`` with a trailing ``:*`` removed."""
        # log group arn in resource describe has ':*' suffix, not all
        # apis can use that form, so normalize to standard arn.
        # Only strip when the suffix is really there, so an already
        # normalized arn is not truncated.
        return [
            r['arn'][:-2] if r['arn'].endswith(':*') else r['arn']
            for r in resources]


@resources.register('insight-rule')
class InsightRule(QueryResourceManager):
    """Resource manager registered as ``insight-rule``."""

    class resource_type(TypeInfo):
        service = 'cloudwatch'
        arn_type = 'insight-rule'
        enum_spec = ('describe_insight_rules', 'InsightRules', None)
        name = id = 'Name'
        universal_taggable = object()
        # IAM actions are written '<service>:<Action>', matching the other
        # permissions declared in this module.
        permission_augment = ('cloudwatch:ListTagsForResource',)
        cfn_type = 'AWS::CloudWatch::InsightRule'

    def augment(self, rules):
        """Attach a ``Tags`` list to every rule via list_tags_for_resource."""
        client = local_session(self.session_factory).client('cloudwatch')

        def _add_tags(r):
            arn = self.generate_arn(r['Name'])
            r['Tags'] = client.list_tags_for_resource(
                ResourceARN=arn).get('Tags', [])
            return r

        return list(map(_add_tags, rules))


@InsightRule.action_registry.register('disable')
class InsightRuleDisable(BaseAction):
    """Disable a cloudwatch contributor insight rule.

    :example:

    .. code-block:: yaml

            policies:
              - name: cloudwatch-disable-insight-rule
                resource: insight-rule
                filters:
                  - type: value
                    key: State
                    value: ENABLED
                    op: eq
                actions:
                  - disable
    """

    schema = type_schema('disable')
    permissions = ('cloudwatch:DisableInsightRules',)

    def process(self, resources):
        """Disable the given rules by name, 100 rules per API call."""
        client = local_session(
            self.manager.session_factory).client('cloudwatch')

        for resource_set in chunks(resources, size=100):
            self.manager.retry(
                client.disable_insight_rules,
                RuleNames=[r['Name'] for r in resource_set])


@InsightRule.action_registry.register('delete')
class InsightRuleDelete(BaseAction):
    """Delete a cloudwatch contributor insight rule

    :example:

    .. code-block:: yaml

            policies:
              - name: cloudwatch-delete-insight-rule
                resource: insight-rule
                filters:
                  - type: value
                    key: State
                    value: ENABLED
                    op: eq
                actions:
                  - delete
    """

    schema = type_schema('delete')
    permissions = ('cloudwatch:DeleteInsightRules',)

    def process(self, resources):
        """Delete the given rules by name, 100 rules per API call."""
        client = local_session(
            self.manager.session_factory).client('cloudwatch')

        for resource_set in chunks(resources, size=100):
            self.manager.retry(
                client.delete_insight_rules,
                RuleNames=[r['Name'] for r in resource_set])


@LogGroup.filter_registry.register('metrics')
class LogGroupMetrics(MetricsFilter):
    """``metrics`` filter for log groups."""

    def get_dimensions(self, resource):
        """Return the dimension list: ``LogGroupName`` = the group's name."""
        return [{'Name': 'LogGroupName', 'Value': resource['logGroupName']}]


@LogGroup.action_registry.register('retention')
class Retention(BaseAction):
    """Action to set the retention period (in days) for CloudWatch log groups

    :example:

    .. code-block:: yaml

            policies:
              - name: cloudwatch-set-log-group-retention
                resource: log-group
                actions:
                  - type: retention
                    days: 200
    """

    schema = type_schema('retention', days={'type': 'integer'})
    permissions = ('logs:PutRetentionPolicy',)

    def process(self, resources):
        """Apply the configured ``days`` value to each log group in turn."""
        client = local_session(self.manager.session_factory).client('logs')
        days = self.data['days']
        for r in resources:
            self.manager.retry(
                client.put_retention_policy,
                logGroupName=r['logGroupName'],
                retentionInDays=days)


@LogGroup.action_registry.register('delete')
class Delete(BaseAction):
    """Delete CloudWatch log groups.

    :example:

    .. code-block:: yaml

            policies:
              - name: cloudwatch-delete-stale-log-group
                resource: log-group
                filters:
                  - type: last-write
                    days: 182.5
                actions:
                  - delete
    """

    schema = type_schema('delete')
    permissions = ('logs:DeleteLogGroup',)

    def process(self, resources):
        """Delete each log group; groups that no longer exist are skipped."""
        client = local_session(self.manager.session_factory).client('logs')
        for r in resources:
            try:
                self.manager.retry(
                    client.delete_log_group, logGroupName=r['logGroupName'])
            except client.exceptions.ResourceNotFoundException:
                continue


@LogGroup.filter_registry.register('last-write')
class LastWriteDays(Filter):
    """Filters CloudWatch log groups by last write

    :example:

    .. code-block:: yaml

            policies:
              - name: cloudwatch-stale-groups
                resource: log-group
                filters:
                  - type: last-write
                    days: 60
    """

    schema = type_schema(
        'last-write', days={'type': 'number'})
    permissions = ('logs:DescribeLogStreams',)

    def process(self, resources, event=None):
        """Return the log groups whose last write predates the threshold.

        The threshold is the current UTC time minus the configured ``days``.
        """
        client = local_session(self.manager.session_factory).client('logs')
        self.date_threshold = parse_date(datetime.utcnow()) - timedelta(
            days=self.data['days'])
        return [r for r in resources if self.check_group(client, r)]

    def check_group(self, client, group):
        """Annotate ``group`` with its last write time and test it.

        Stores the fetched streams under ``group['streams']`` and the parsed
        timestamp under ``group['lastWrite']``. Returns True when the
        threshold is later than the last write.
        """
        streams = self.manager.retry(
            client.describe_log_streams,
            logGroupName=group['logGroupName'],
            orderBy='LastEventTime',
            descending=True,
            limit=3).get('logStreams')
        group['streams'] = streams

        # Timestamp preference: the first returned stream's ingestion time,
        # then that stream's creation time; with no streams at all, the
        # group's own creation time.
        if not streams:
            last_timestamp = group['creationTime']
        elif 'lastIngestionTime' in streams[0]:
            last_timestamp = streams[0]['lastIngestionTime']
        else:
            last_timestamp = streams[0]['creationTime']

        last_write = parse_date(last_timestamp)
        group['lastWrite'] = last_write
        return self.date_threshold > last_write


@LogGroup.filter_registry.register('cross-account')
class LogCrossAccountFilter(CrossAccountAccessFilter):
    """Match log groups with subscription filters to non-listed accounts."""

    schema = type_schema(
        'cross-account',
        # white list accounts
        whitelist_from=ValuesFrom.schema,
        whitelist={'type': 'array', 'items': {'type': 'string'}})

    permissions = ('logs:DescribeSubscriptionFilters',)

    def process(self, resources, event=None):
        """Check resources in sets of 50 and return the violating groups.

        A set whose check raised is logged and left out of the result.
        """
        client = local_session(self.manager.session_factory).client('logs')
        accounts = self.get_accounts()
        results = []
        with self.executor_factory(max_workers=1) as w:
            futures = []
            for rset in chunks(resources, 50):
                futures.append(
                    w.submit(
                        self.process_resource_set, client, accounts, rset))
            for f in as_completed(futures):
                if f.exception():
                    self.log.error(
                        "Error checking log groups cross-account %s",
                        f.exception())
                    continue
                results.extend(f.result())
        return results

    def process_resource_set(self, client, accounts, resources):
        """Return the log groups that forward to an account not in ``accounts``.

        Offending account ids are appended to the resource under
        ``c7n:CrossAccountViolations``.
        """
        results = []
        for r in resources:
            found = False
            filters = self.manager.retry(
                client.describe_subscription_filters,
                logGroupName=r['logGroupName']).get('subscriptionFilters', ())
            for f in filters:
                if 'destinationArn' not in f:
                    continue
                # The account id is the fifth ':'-separated field of the ARN.
                account_id = f['destinationArn'].split(':', 5)[4]
                if account_id not in accounts:
                    r.setdefault('c7n:CrossAccountViolations', []).append(
                        account_id)
                    found = True
            if found:
                results.append(r)
        return results


@LogGroup.action_registry.register('set-encryption')
class EncryptLogGroup(BaseAction):
    """Encrypt/Decrypt a log group

    :example:

    .. code-block:: yaml

        policies:
          - name: encrypt-log-group
            resource: log-group
            filters:
              - kmsKeyId: absent
            actions:
              - type: set-encryption
                kms-key: alias/mylogkey
                state: True

          - name: decrypt-log-group
            resource: log-group
            filters:
              - kmsKeyId: kms:key:arn
            actions:
              - type: set-encryption
                state: False
    """

    schema = type_schema(
        'set-encryption',
        **{'kms-key': {'type': 'string'},
           'state': {'type': 'boolean'}})
    permissions = (
        'logs:AssociateKmsKey', 'logs:DisassociateKmsKey', 'kms:DescribeKey')

    def validate(self):
        """Require a plausible ``kms-key`` unless ``state`` is False.

        Raises ValueError when the key is missing and PolicyValidationError
        when it contains neither ``alias/`` nor ``:key/``.
        """
        if not self.data.get('state', True):
            return self
        key = self.data.get('kms-key', '')
        if not key:
            # NOTE(review): the malformed-key case below raises
            # PolicyValidationError; ValueError is kept here so existing
            # callers see the same exception type -- confirm before unifying.
            raise ValueError('Must specify either a KMS key ARN or Alias')
        if 'alias/' not in key and ':key/' not in key:
            raise PolicyValidationError(
                "Invalid kms key format %s" % key)
        return self

    def resolve_key(self, key):
        """Return the key ARN for ``key``; None when ``key`` is empty.

        A value that already is a key ARN is returned unchanged; anything
        else is looked up with kms ``describe_key``.
        """
        if not key:
            return

        # Qualified arn for key
        if key.startswith('arn:') and ':key/' in key:
            return key

        # Alias
        key = local_session(
            self.manager.session_factory).client(
                'kms').describe_key(
                    KeyId=key)['KeyMetadata']['Arn']
        return key

    def process(self, resources):
        """Associate (state true) or disassociate the KMS key per log group."""
        session = local_session(self.manager.session_factory)
        client = session.client('logs')

        state = self.data.get('state', True)
        key = self.resolve_key(self.data.get('kms-key'))

        for r in resources:
            try:
                if state:
                    client.associate_kms_key(
                        logGroupName=r['logGroupName'], kmsKeyId=key)
                else:
                    client.disassociate_kms_key(logGroupName=r['logGroupName'])
            except client.exceptions.ResourceNotFoundException:
                # The excerpt is cut off at this point; the handler body (and
                # anything after it) is not visible, so it is left as the
                # same placeholder the excerpt shows.
                ...


# ---------------------------------------------------------------------------
# Trailing page text from the scraped listing (not part of cw.py):
#
# Learn to execute automation testing from scratch with LambdaTest Learning
# Hub. Right from setting up the prerequisites to run your first automation
# test, to following best practices and diving deeper into advanced test
# scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to
# help you be proficient with different test automation frameworks i.e.
# Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
