How to use the disable_insight_rules method in localstack

Best Python code snippet using localstack_python

client.pyi

Source:client.pyi Github

copy

Full Screen

...234 Disables the actions for the specified alarms.235 [Show boto3 documentation](https://boto3.amazonaws.com/v1/documentation/api/1.24.58/reference/services/cloudwatch.html#CloudWatch.Client.disable_alarm_actions)236 [Show boto3-stubs documentation](https://vemel.github.io/boto3_stubs_docs/mypy_boto3_cloudwatch/client.html#disable_alarm_actions)237 """238 def disable_insight_rules(self, *, RuleNames: List[str]) -> DisableInsightRulesOutputTypeDef:239 """240 Disables the specified Contributor Insights rules.241 [Show boto3 documentation](https://boto3.amazonaws.com/v1/documentation/api/1.24.58/reference/services/cloudwatch.html#CloudWatch.Client.disable_insight_rules)242 [Show boto3-stubs documentation](https://vemel.github.io/boto3_stubs_docs/mypy_boto3_cloudwatch/client.html#disable_insight_rules)243 """244 def enable_alarm_actions(self, *, AlarmNames: List[str]) -> None:245 """246 Enables the actions for the specified alarms.247 [Show boto3 documentation](https://boto3.amazonaws.com/v1/documentation/api/1.24.58/reference/services/cloudwatch.html#CloudWatch.Client.enable_alarm_actions)248 [Show boto3-stubs documentation](https://vemel.github.io/boto3_stubs_docs/mypy_boto3_cloudwatch/client.html#enable_alarm_actions)249 """250 def enable_insight_rules(self, *, RuleNames: List[str]) -> EnableInsightRulesOutputTypeDef:251 """252 Enables the specified Contributor Insights rules....

Full Screen

Full Screen

cw.py

Source:cw.py Github

copy

Full Screen

1# Copyright The Cloud Custodian Authors.2# SPDX-License-Identifier: Apache-2.03from concurrent.futures import as_completed4from datetime import datetime, timedelta5from c7n.actions import BaseAction6from c7n.exceptions import PolicyValidationError7from c7n.filters import Filter, MetricsFilter8from c7n.filters.core import parse_date9from c7n.filters.iamaccess import CrossAccountAccessFilter10from c7n.query import QueryResourceManager, ChildResourceManager, TypeInfo11from c7n.manager import resources12from c7n.resolver import ValuesFrom13from c7n.tags import universal_augment14from c7n.utils import type_schema, local_session, chunks, get_retry15@resources.register('alarm')16class Alarm(QueryResourceManager):17 class resource_type(TypeInfo):18 service = 'cloudwatch'19 arn_type = 'alarm'20 enum_spec = ('describe_alarms', 'MetricAlarms', None)21 id = 'AlarmArn'22 filter_name = 'AlarmNames'23 filter_type = 'list'24 name = 'AlarmName'25 date = 'AlarmConfigurationUpdatedTimestamp'26 cfn_type = config_type = 'AWS::CloudWatch::Alarm'27 retry = staticmethod(get_retry(('Throttled',)))28@Alarm.action_registry.register('delete')29class AlarmDelete(BaseAction):30 """Delete a cloudwatch alarm.31 :example:32 .. 
code-block:: yaml33 policies:34 - name: cloudwatch-delete-stale-alarms35 resource: alarm36 filters:37 - type: value38 value_type: age39 key: StateUpdatedTimestamp40 value: 3041 op: ge42 - StateValue: INSUFFICIENT_DATA43 actions:44 - delete45 """46 schema = type_schema('delete')47 permissions = ('cloudwatch:DeleteAlarms',)48 def process(self, resources):49 client = local_session(50 self.manager.session_factory).client('cloudwatch')51 for resource_set in chunks(resources, size=100):52 self.manager.retry(53 client.delete_alarms,54 AlarmNames=[r['AlarmName'] for r in resource_set])55@resources.register('event-rule')56class EventRule(QueryResourceManager):57 class resource_type(TypeInfo):58 service = 'events'59 arn_type = 'rule'60 enum_spec = ('list_rules', 'Rules', None)61 name = "Name"62 id = "Name"63 filter_name = "NamePrefix"64 filter_type = "scalar"65 cfn_type = 'AWS::Events::Rule'66 universal_taggable = object()67 augment = universal_augment68@EventRule.filter_registry.register('metrics')69class EventRuleMetrics(MetricsFilter):70 def get_dimensions(self, resource):71 return [{'Name': 'RuleName', 'Value': resource['Name']}]72@resources.register('event-rule-target')73class EventRuleTarget(ChildResourceManager):74 class resource_type(TypeInfo):75 service = 'events'76 arn = False77 arn_type = 'event-rule-target'78 enum_spec = ('list_targets_by_rule', 'Targets', None)79 parent_spec = ('event-rule', 'Rule', True)80 name = id = 'Id'81@EventRuleTarget.filter_registry.register('cross-account')82class CrossAccountFilter(CrossAccountAccessFilter):83 schema = type_schema(84 'cross-account',85 # white list accounts86 whitelist_from=ValuesFrom.schema,87 whitelist={'type': 'array', 'items': {'type': 'string'}})88 # dummy permission89 permissions = ('events:ListTargetsByRule',)90 def __call__(self, r):91 account_id = r['Arn'].split(':', 5)[4]92 return account_id not in self.accounts93@EventRuleTarget.action_registry.register('delete')94class DeleteTarget(BaseAction):95 schema = 
type_schema('delete')96 permissions = ('events:RemoveTargets',)97 def process(self, resources):98 client = local_session(self.manager.session_factory).client('events')99 rule_targets = {}100 for r in resources:101 rule_targets.setdefault(r['c7n:parent-id'], []).append(r['Id'])102 for rule_id, target_ids in rule_targets.items():103 client.remove_targets(104 Ids=target_ids,105 Rule=rule_id)106@resources.register('log-group')107class LogGroup(QueryResourceManager):108 class resource_type(TypeInfo):109 service = 'logs'110 arn_type = 'log-group'111 enum_spec = ('describe_log_groups', 'logGroups', None)112 name = 'logGroupName'113 id = 'arn'114 filter_name = 'logGroupNamePrefix'115 filter_type = 'scalar'116 dimension = 'LogGroupName'117 date = 'creationTime'118 universal_taggable = True119 cfn_type = 'AWS::Logs::LogGroup'120 def get_arns(self, resources):121 # log group arn in resource describe has ':*' suffix, not all122 # apis can use that form, so normalize to standard arn.123 return [r['arn'][:-2] for r in resources]124@resources.register('insight-rule')125class InsightRule(QueryResourceManager):126 class resource_type(TypeInfo):127 service = 'cloudwatch'128 arn_type = 'insight-rule'129 enum_spec = ('describe_insight_rules', 'InsightRules', None)130 name = id = 'Name'131 universal_taggable = object()132 permission_augment = ('cloudWatch::ListTagsForResource',)133 cfn_type = 'AWS::CloudWatch::InsightRule'134 def augment(self, rules):135 client = local_session(self.session_factory).client('cloudwatch')136 def _add_tags(r):137 arn = self.generate_arn(r['Name'])138 r['Tags'] = client.list_tags_for_resource(139 ResourceARN=arn).get('Tags', [])140 return r141 return list(map(_add_tags, rules))142@InsightRule.action_registry.register('disable')143class InsightRuleDisable(BaseAction):144 """Disable a cloudwatch contributor insight rule.145 :example:146 .. 
code-block:: yaml147 policies:148 - name: cloudwatch-disable-insight-rule149 resource: insight-rule150 filters:151 - type: value152 key: State153 value: ENABLED154 op: eq155 actions:156 - disable157 """158 schema = type_schema('disable')159 permissions = ('cloudwatch:DisableInsightRules',)160 def process(self, resources):161 client = local_session(162 self.manager.session_factory).client('cloudwatch')163 for resource_set in chunks(resources, size=100):164 self.manager.retry(165 client.disable_insight_rules,166 RuleNames=[r['Name'] for r in resource_set])167@InsightRule.action_registry.register('delete')168class InsightRuleDelete(BaseAction):169 """Delete a cloudwatch contributor insight rule170 :example:171 .. code-block:: yaml172 policies:173 - name: cloudwatch-delete-insight-rule174 resource: insight-rule175 filters:176 - type: value177 key: State178 value: ENABLED179 op: eq180 actions:181 - delete182 """183 schema = type_schema('delete')184 permissions = ('cloudwatch:DeleteInsightRules',)185 def process(self, resources):186 client = local_session(187 self.manager.session_factory).client('cloudwatch')188 for resource_set in chunks(resources, size=100):189 self.manager.retry(190 client.delete_insight_rules,191 RuleNames=[r['Name'] for r in resource_set])192@LogGroup.filter_registry.register('metrics')193class LogGroupMetrics(MetricsFilter):194 def get_dimensions(self, resource):195 return [{'Name': 'LogGroupName', 'Value': resource['logGroupName']}]196@LogGroup.action_registry.register('retention')197class Retention(BaseAction):198 """Action to set the retention period (in days) for CloudWatch log groups199 :example:200 .. 
code-block:: yaml201 policies:202 - name: cloudwatch-set-log-group-retention203 resource: log-group204 actions:205 - type: retention206 days: 200207 """208 schema = type_schema('retention', days={'type': 'integer'})209 permissions = ('logs:PutRetentionPolicy',)210 def process(self, resources):211 client = local_session(self.manager.session_factory).client('logs')212 days = self.data['days']213 for r in resources:214 self.manager.retry(215 client.put_retention_policy,216 logGroupName=r['logGroupName'],217 retentionInDays=days)218@LogGroup.action_registry.register('delete')219class Delete(BaseAction):220 """221 :example:222 .. code-block:: yaml223 policies:224 - name: cloudwatch-delete-stale-log-group225 resource: log-group226 filters:227 - type: last-write228 days: 182.5229 actions:230 - delete231 """232 schema = type_schema('delete')233 permissions = ('logs:DeleteLogGroup',)234 def process(self, resources):235 client = local_session(self.manager.session_factory).client('logs')236 for r in resources:237 try:238 self.manager.retry(239 client.delete_log_group, logGroupName=r['logGroupName'])240 except client.exceptions.ResourceNotFoundException:241 continue242@LogGroup.filter_registry.register('last-write')243class LastWriteDays(Filter):244 """Filters CloudWatch log groups by last write245 :example:246 .. 
code-block:: yaml247 policies:248 - name: cloudwatch-stale-groups249 resource: log-group250 filters:251 - type: last-write252 days: 60253 """254 schema = type_schema(255 'last-write', days={'type': 'number'})256 permissions = ('logs:DescribeLogStreams',)257 def process(self, resources, event=None):258 client = local_session(self.manager.session_factory).client('logs')259 self.date_threshold = parse_date(datetime.utcnow()) - timedelta(260 days=self.data['days'])261 return [r for r in resources if self.check_group(client, r)]262 def check_group(self, client, group):263 streams = self.manager.retry(264 client.describe_log_streams,265 logGroupName=group['logGroupName'],266 orderBy='LastEventTime',267 descending=True,268 limit=3).get('logStreams')269 group['streams'] = streams270 if not streams:271 last_timestamp = group['creationTime']272 elif 'lastIngestionTime' in streams[0]:273 last_timestamp = streams[0]['lastIngestionTime']274 else:275 last_timestamp = streams[0]['creationTime']276 last_write = parse_date(last_timestamp)277 group['lastWrite'] = last_write278 return self.date_threshold > last_write279@LogGroup.filter_registry.register('cross-account')280class LogCrossAccountFilter(CrossAccountAccessFilter):281 schema = type_schema(282 'cross-account',283 # white list accounts284 whitelist_from=ValuesFrom.schema,285 whitelist={'type': 'array', 'items': {'type': 'string'}})286 permissions = ('logs:DescribeSubscriptionFilters',)287 def process(self, resources, event=None):288 client = local_session(self.manager.session_factory).client('logs')289 accounts = self.get_accounts()290 results = []291 with self.executor_factory(max_workers=1) as w:292 futures = []293 for rset in chunks(resources, 50):294 futures.append(295 w.submit(296 self.process_resource_set, client, accounts, rset))297 for f in as_completed(futures):298 if f.exception():299 self.log.error(300 "Error checking log groups cross-account %s",301 f.exception())302 continue303 results.extend(f.result())304 
return results305 def process_resource_set(self, client, accounts, resources):306 results = []307 for r in resources:308 found = False309 filters = self.manager.retry(310 client.describe_subscription_filters,311 logGroupName=r['logGroupName']).get('subscriptionFilters', ())312 for f in filters:313 if 'destinationArn' not in f:314 continue315 account_id = f['destinationArn'].split(':', 5)[4]316 if account_id not in accounts:317 r.setdefault('c7n:CrossAccountViolations', []).append(318 account_id)319 found = True320 if found:321 results.append(r)322 return results323@LogGroup.action_registry.register('set-encryption')324class EncryptLogGroup(BaseAction):325 """Encrypt/Decrypt a log group326 :example:327 .. code-block:: yaml328 policies:329 - name: encrypt-log-group330 resource: log-group331 filters:332 - kmsKeyId: absent333 actions:334 - type: set-encryption335 kms-key: alias/mylogkey336 state: True337 - name: decrypt-log-group338 resource: log-group339 filters:340 - kmsKeyId: kms:key:arn341 actions:342 - type: set-encryption343 state: False344 """345 schema = type_schema(346 'set-encryption',347 **{'kms-key': {'type': 'string'},348 'state': {'type': 'boolean'}})349 permissions = (350 'logs:AssociateKmsKey', 'logs:DisassociateKmsKey', 'kms:DescribeKey')351 def validate(self):352 if not self.data.get('state', True):353 return self354 key = self.data.get('kms-key', '')355 if not key:356 raise ValueError('Must specify either a KMS key ARN or Alias')357 if 'alias/' not in key and ':key/' not in key:358 raise PolicyValidationError(359 "Invalid kms key format %s" % key)360 return self361 def resolve_key(self, key):362 if not key:363 return364 # Qualified arn for key365 if key.startswith('arn:') and ':key/' in key:366 return key367 # Alias368 key = local_session(369 self.manager.session_factory).client(370 'kms').describe_key(371 KeyId=key)['KeyMetadata']['Arn']372 return key373 def process(self, resources):374 session = local_session(self.manager.session_factory)375 client 
= session.client('logs')376 state = self.data.get('state', True)377 key = self.resolve_key(self.data.get('kms-key'))378 for r in resources:379 try:380 if state:381 client.associate_kms_key(382 logGroupName=r['logGroupName'], kmsKeyId=key)383 else:384 client.disassociate_kms_key(logGroupName=r['logGroupName'])385 except client.exceptions.ResourceNotFoundException:...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful