How to use the describe_aggregate_compliance_by_config_rules method in localstack

Best Python code snippet using localstack_python

load_compliance_data.py

Source:load_compliance_data.py Github

copy

Full Screen

#!/usr/bin/env python
"""Export aggregated AWS Config rule compliance data to an S3 bucket.

The script looks up the first configuration aggregator visible in the
aggregation account, pages through
``describe_aggregate_compliance_by_config_rules``, keeps the rules named in
the spec file and uploads one JSON document per line to S3.
"""
import io
import sys
from datetime import datetime, timezone
import yaml
import json
import boto3
import botocore
from botocore.exceptions import ClientError
import click
import orgcrawler
from orgcrawler.utils import jsonfmt, yamlfmt
from orgcrawler.cli.utils import (
    setup_crawler,
    format_responses,
)

DEFAULT_REGION = 'us-west-2'


def paginate(client, method, **kwargs):
    """Yield every result item of a paginated boto3 client call.

    ``method`` is the bound client method; only its ``__name__`` is used to
    look up the matching paginator. ``kwargs`` are passed to ``paginate()``.
    """
    paginator = client.get_paginator(method.__name__)
    # result_key_iters() hands back one iterator per result key of the
    # operation; drain each of them in turn.
    for result_iter in paginator.paginate(**kwargs).result_key_iters():
        yield from result_iter


def truncate_sechub_rule_name(rule_name):
    """Strip the trailing ``-<suffix>`` from a ``securityhub...`` rule name.

    Names that do not start with ``securityhub`` are returned unchanged, as
    are ``securityhub`` names without any ``-`` (previously these collapsed
    to an empty string).
    """
    if not rule_name.startswith('securityhub'):
        return rule_name
    head, sep, _suffix = rule_name.rpartition('-')
    return head if sep else rule_name


def get_resource_count(item):
    """Return the capped contributor count of a compliance item, or ``None``.

    ``None`` is returned when the item's ``Compliance`` entry carries no
    ``ComplianceContributorCount``; a missing ``CappedCount`` counts as 0.
    """
    contributors = item['Compliance'].get('ComplianceContributorCount')
    if contributors is None:
        return None
    return int(contributors.get('CappedCount', 0))


def is_in_scope(spec, rule_name):
    """Tell whether ``rule_name`` is listed under ``config_rules`` in ``spec``.

    An empty spec (``None``) or a spec without ``config_rules`` means nothing
    is in scope instead of raising.
    """
    rules = (spec or {}).get('config_rules') or ()
    return rule_name in rules


def timestamp():
    """Return the current UTC time as a naive ISO-8601 string."""
    # Same text as the deprecated datetime.utcnow().isoformat(): the tzinfo is
    # dropped so that no "+00:00" suffix is appended.
    return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()


def _ensure_bucket(s3_client, bucket_name, region):
    """Create the private bucket ``bucket_name`` unless this account owns it already."""
    params = dict(ACL='private', Bucket=bucket_name)
    # us-east-1 is the default location and is rejected when sent as an
    # explicit LocationConstraint.
    if region != 'us-east-1':
        params['CreateBucketConfiguration'] = {'LocationConstraint': region}
    try:
        s3_client.create_bucket(**params)
    except s3_client.exceptions.BucketAlreadyOwnedByYou:
        # Deliberate best effort: an existing bucket of ours is what we want.
        pass


# NOTE(review): the default bucket name contains an underscore, which S3
# bucket naming rules do not allow -- confirm and pick a valid default.
@click.command(context_settings=dict(help_option_names=['-h', '--help']))
@click.option('--master-role', '-r',
    required=True,
    help='IAM role to assume for accessing AWS Organization Master account.'
)
@click.option('--aggregation-account', '-a',
    required=True,
    help='Name or Id of config rule aggregation account.',
)
@click.option('--reporting-account',
    default='',
    help='Name or Id of account where s3 bucket lives. '
         'defaults to "aggregation-account"',
)
@click.option('--bucket-name', '-b',
    default='compliance_data',
    help='Name of the s3 bucket where to upload config rule compliance data.'
)
@click.option('--spec-file', '-f',
    default='./spec.yaml',
    show_default=True,
    type=click.File('r'),
    help='Path to file containing config rule names.'
)
def main(master_role, aggregation_account, reporting_account, bucket_name, spec_file):
    """Upload aggregated config rule compliance data to S3."""
    if not reporting_account:
        reporting_account = aggregation_account
    print(master_role, aggregation_account, reporting_account, bucket_name, spec_file)

    # parse spec file
    spec = yaml.safe_load(spec_file.read())

    # get account names and alias using orgcrawler
    crawler = setup_crawler(
        master_role,
        regions=DEFAULT_REGION,
    )

    # get aggregation name: the first aggregator returned is used
    account = crawler.org.get_account(aggregation_account)
    boto_config = botocore.client.Config(
        connect_timeout=2,
        read_timeout=10,
        retries={"max_attempts": 2},
    )
    client = boto3.client(
        'config',
        config=boto_config,
        region_name=DEFAULT_REGION,
        **account.credentials
    )
    response = client.describe_configuration_aggregators()
    aggregator_name = next(
        (agg['ConfigurationAggregatorName'] for agg in response['ConfigurationAggregators']),
        None,
    )
    if aggregator_name is None:
        sys.exit('could not determine ConfigurationAggregatorName')

    # get compliance data
    compliance_generator = paginate(
        client,
        client.describe_aggregate_compliance_by_config_rules,
        ConfigurationAggregatorName=aggregator_name,
    )

    # assemble config rule compliance data, one JSON document per line
    text_stream = io.StringIO()
    for item in compliance_generator:
        rule_name = truncate_sechub_rule_name(item['ConfigRuleName'])
        if not is_in_scope(spec, rule_name):
            continue
        compliance_data = dict(
            config_rule_name=rule_name,
            compliance_type=item['Compliance']['ComplianceType'],
            non_compliant_resource_count=get_resource_count(item),
            account_id=item['AccountId'],
            account_name=crawler.org.get_account_name_by_id(item['AccountId']),
            region=item['AwsRegion'],
            timestamp=timestamp(),
        )
        text_stream.write(json.dumps(compliance_data) + '\n')

    # upload to s3
    # Read the clock once so year/month/day cannot straddle midnight.
    now = datetime.now()
    obj_path = 'aggregate_compliance_by_config_rules/{}/{}/{}/compliance_data.json'.format(
        now.year, now.month, now.day
    )
    print(obj_path)
    account = crawler.org.get_account(reporting_account)
    bucket_name = bucket_name + '-' + account.id
    print(bucket_name)
    s3_client = boto3.client('s3', region_name=DEFAULT_REGION, **account.credentials)
    _ensure_bucket(s3_client, bucket_name, DEFAULT_REGION)
    s3_client.put_object(
        Bucket=bucket_name,
        Key=obj_path,
        Body=text_stream.getvalue(),
    )


if __name__ == '__main__':
    ...  # body elided in the published snippet -- presumably calls main(); confirm against the original file

Full Screen

Full Screen

config-aggregators-repot-email.py

Source:config-aggregators-repot-email.py Github

copy

Full Screen

...  # earlier lines elided in the published snippet; the enclosing scope and its indentation are not visible here
# Collect the NON_COMPLIANT rule results of one aggregator into a dict.
aggregator_rule_info = {}
aggregator_rule_info['AggregatorName'] = aggregator_name
# Manual pagination: start with an empty token and keep following NextToken
# until a response no longer contains one.
next_token = ''
while True:
    config_rules_resp = config.describe_aggregate_compliance_by_config_rules(
        ConfigurationAggregatorName = aggregator_name,
        Filters={
            'ComplianceType': 'NON_COMPLIANT'
        },
        NextToken = next_token
    )
    # config_rules is defined outside this excerpt; each page is appended to it.
    config_rules += config_rules_resp['AggregateComplianceByConfigRules']
    if 'NextToken' in config_rules_resp:
        next_token = config_rules_resp['NextToken']
    else:
        break
aggregator_rule_info['AggregatorRules'] = config_rules
aggregator_rule_list.append(aggregator_rule_info)
...  # remaining lines elided in the published snippet

Full Screen

Full Screen

config-compliance-stats.py

Source:config-compliance-stats.py Github

copy

Full Screen

...  # earlier lines elided in the published snippet (client and organization_aggregator_name are defined there)
# resource_counts = client.get_discovered_resource_counts()
# comp = client.describe_compliance_by_config_rule()
# Page through the aggregate compliance results with the client's paginator.
paginator = client.get_paginator('describe_aggregate_compliance_by_config_rules')
response_iterator = paginator.paginate(ConfigurationAggregatorName=organization_aggregator_name)
# comps = client.describe_aggregate_compliance_by_config_rules(ConfigurationAggregatorName=organization_aggregator_name)
# Each element yielded by the paginator is one response page.
for iterator in response_iterator:
    for comp in iterator['AggregateComplianceByConfigRules']:
        config_rule_name = comp['ConfigRuleName']
        compliance = comp['Compliance']['ComplianceType']
        account_id = comp['AccountId']
        region = comp['AwsRegion']
        # One line per rule result: account, region, compliance type, rule name.
        print(f'{account_id} ({region}): {compliance} - {config_rule_name}')
...  # remaining lines elided in the published snippet

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful