Best Python code snippet using localstack_python
configureCMK.py
Source:configureCMK.py  
# ... excerpt starts mid-function. The truncated tail of the preceding
# key-policy update is kept verbatim below (commented out because the start of
# the definition is not part of this excerpt):
#
#                 "Resource": "*"
#             }
#         )
#     print(f"Updating policy for key  {args.cmk_arn}")
#     kms_client.put_key_policy(
#         KeyId = args.cmk_arn,
#         PolicyName = "default",
#         Policy = json.dumps(policy)
#     )
#     print(f"Policy for key {args.cmk_arn} updated.")


def _resources_of_type(response, resource_type):
    """Return the resource summaries on one result page with the given type."""
    return [
        summary
        for summary in response["StackResourceSummaries"]
        if summary["ResourceType"] == resource_type
    ]


def process_stacks(stackname):
    """Point the resources of one CloudFormation stack at ``args.cmk_arn``.

    Walks every page of ``list_stack_resources`` for ``stackname`` and, per
    resource type:

    * Lambda functions: sets ``KMSKeyArn`` and passes the function's role name
      to ``assign_role``.
    * SSM parameters: rewrites the parameter as a ``SecureString`` under the key.
    * S3 buckets: sets default ``aws:kms`` encryption with the key and enables
      access logging to ``args.target_s3_bucket``.
    * DynamoDB tables: switches SSE to the key unless already configured.
    * Kinesis Firehose delivery streams: starts encryption with the key unless
      the stream already uses it.
    * Roles whose logical id contains ``LambdaRole`` or is listed in
      ``cmk_roles_logical_ids``: recorded in ``cmk_roles_physical_ids`` and
      passed to ``assign_role``.

    Nested stacks are NOT followed here; the driver code below this function
    handles one level of nesting.

    The AWS clients, ``args``, ``assign_role`` and the ``cmk_roles_*`` lists are
    module-level names defined outside this excerpt.

    Args:
        stackname: Stack name or ARN accepted by ``list_stack_resources``.
    """
    paginator = cloudformation_client.get_paginator('list_stack_resources')
    response_iterator = paginator.paginate(
        StackName=stackname,
        PaginationConfig={
            'MaxItems': 10000,
        }
    )

    for response in response_iterator:
        for lambda_func in _resources_of_type(response, "AWS::Lambda::Function"):
            function_name = lambda_func["PhysicalResourceId"]
            # The update call already returns the function configuration, so
            # no separate get_function_configuration round trip is needed.
            lambda_configuration = lambda_client.update_function_configuration(
                FunctionName=function_name,
                KMSKeyArn=args.cmk_arn,
            )
            print(f"Updated function {function_name} in stack {stackname}")

            # The role is an ARN; the role name is its last path segment.
            role_name = lambda_configuration["Role"].split("/")[-1]
            assign_role(role_name)

        for parameter in _resources_of_type(response, "AWS::SSM::Parameter"):
            parameter_name = parameter["PhysicalResourceId"]
            parameter_info = ssm_client.get_parameter(
                Name=parameter_name,
                WithDecryption=True
            )['Parameter']
            # Fixed: the membership test used the misspelt key "Decription",
            # so the description lookup could never succeed.
            # NOTE(review): the documented GetParameter response does not list
            # a Description field, so this is normally ""; DescribeParameters
            # may be needed to carry the description over -- confirm.
            description = parameter_info.get("Description", "")
            ssm_client.put_parameter(
                Name=parameter_name,
                Description=description,
                Value=parameter_info['Value'],
                Type='SecureString',
                KeyId=args.cmk_arn,
                Overwrite=True,
            )

        for bucket in _resources_of_type(response, "AWS::S3::Bucket"):
            bucket_name = bucket["PhysicalResourceId"]
            s3_client.put_bucket_encryption(
                Bucket=bucket_name,
                ServerSideEncryptionConfiguration={
                    'Rules': [
                        {
                            'ApplyServerSideEncryptionByDefault': {
                                'SSEAlgorithm': 'aws:kms',
                                'KMSMasterKeyID': args.cmk_arn
                            }
                        },
                    ]
                }
            )
            print(f"Encryption set for {bucket_name}")
            s3_client.put_bucket_logging(
                Bucket=bucket_name,
                BucketLoggingStatus={
                    'LoggingEnabled': {
                        'TargetBucket': args.target_s3_bucket,
                        'TargetPrefix': bucket_name + '/'
                    }
                }
            )
            print(f"Access Logs set for {bucket_name}")

        for table in _resources_of_type(response, "AWS::DynamoDB::Table"):
            table_name = table["PhysicalResourceId"]
            table_description = ddb_client.describe_table(TableName=table_name)
            sse_description = table_description["Table"].get('SSEDescription', {})
            # Only touch the table when it is not already encrypted with our key.
            if sse_description.get('KMSMasterKeyArn') != args.cmk_arn:
                ddb_client.update_table(
                    TableName=table_name,
                    SSESpecification={
                        'Enabled': True,
                        'SSEType': 'KMS',
                        'KMSMasterKeyId': args.cmk_arn
                    }
                )

        for stream in _resources_of_type(response, "AWS::KinesisFirehose::DeliveryStream"):
            stream_name = stream["PhysicalResourceId"]
            stream_response = kinesis_client.describe_delivery_stream(
                DeliveryStreamName=stream_name)
            encryption = stream_response['DeliveryStreamDescription'].get(
                'DeliveryStreamEncryptionConfiguration', {})
            # Fixed: the old test combined the key-type and key-ARN checks with
            # `and`, and indexed 'KeyARN' directly. A stream on a different
            # customer key was skipped, and a configuration without 'KeyARN'
            # raised KeyError. Skip only when the stream is already on this
            # exact customer managed key.
            already_on_cmk = (
                encryption.get('KeyType') == "CUSTOMER_MANAGED_CMK"
                and encryption.get('KeyARN') == args.cmk_arn
            )
            if not already_on_cmk:
                kinesis_client.start_delivery_stream_encryption(
                    DeliveryStreamName=stream_name,
                    DeliveryStreamEncryptionConfigurationInput={
                        'KeyARN': args.cmk_arn,
                        'KeyType': 'CUSTOMER_MANAGED_CMK'})

        for role_resource in response["StackResourceSummaries"]:
            logical_id = role_resource["LogicalResourceId"]
            if 'LambdaRole' in logical_id or logical_id in cmk_roles_logical_ids:
                print(f"role_resource: {role_resource['PhysicalResourceId']}")
                cmk_roles_physical_ids.append(role_resource["PhysicalResourceId"])
                assign_role(role_resource["PhysicalResourceId"])


# Driver: process the root stack, then every stack nested directly inside it.
process_stacks(args.stack_arn)

paginator = cloudformation_client.get_paginator('list_stack_resources')
response_iterator = paginator.paginate(
    StackName=args.stack_arn,
    PaginationConfig={
        'MaxItems': 10000,
    }
)
for response in response_iterator:
    stacks = _resources_of_type(response, "AWS::CloudFormation::Stack")
    for stack in stacks:
        print(f"Processing stack {stack['PhysicalResourceId']}")
        process_stacks(stack["PhysicalResourceId"])
# ... (excerpt truncated here)
# util_kms.py
Source:util_kms.py  
# ... excerpt starts mid-function. The truncated tail of the preceding wrapper
# (it reports failures as 'create_alias') is kept verbatim below, commented out
# because the start of the definition is not part of this excerpt:
#
#         )
#     except botocore.exceptions.ClientError as e:
#         erm = _fail(e, 'create_alias', aliasName)
#         raise Exception(erm)


def put_key_policy(ctx, cmkArn, policyJson):
    """Replace the 'default' key policy of ``cmkArn`` with ``policyJson``.

    Raises:
        Exception: carrying the message built by ``_fail`` when KMS raises a
            ``ClientError`` (the original error is chained as the cause).
    """
    try:
        client = ctx.client('kms')
        client.put_key_policy(
            KeyId=cmkArn,
            PolicyName='default',
            Policy=policyJson
        )
    except botocore.exceptions.ClientError as e:
        erm = _fail(e, 'put_key_policy', cmkArn)
        raise Exception(erm) from e


def get_key_rotation_status(ctx, cmkArn):
    """Return the ``KeyRotationEnabled`` flag KMS reports for ``cmkArn``.

    Raises:
        Exception: carrying the message built by ``_fail`` on ``ClientError``.
    """
    try:
        client = ctx.client('kms')
        response = client.get_key_rotation_status(
            KeyId=cmkArn
        )
        return response['KeyRotationEnabled']
    except botocore.exceptions.ClientError as e:
        erm = _fail(e, 'get_key_rotation_status', cmkArn)
        raise Exception(erm) from e


def get_key_policy(ctx, cmkArn):
    """Return the 'default' key policy document of ``cmkArn`` as returned by KMS.

    Raises:
        Exception: carrying the message built by ``_fail`` on ``ClientError``.
    """
    try:
        client = ctx.client('kms')
        response = client.get_key_policy(
            KeyId=cmkArn,
            PolicyName='default'
        )
        return response['Policy']
    except botocore.exceptions.ClientError as e:
        erm = _fail(e, 'get_key_policy', cmkArn)
        raise Exception(erm) from e


def update_key_description(ctx, cmkArn, description):
    """Set the description of ``cmkArn`` to ``description``.

    Raises:
        Exception: carrying the message built by ``_fail`` on ``ClientError``.
    """
    try:
        client = ctx.client('kms')
        client.update_key_description(
            KeyId=cmkArn,
            Description=description
        )
    except botocore.exceptions.ClientError as e:
        erm = _fail(e, 'update_key_description', cmkArn)
        raise Exception(erm) from e


def enable_key_rotation(ctx, cmkArn):
    """Enable key rotation for ``cmkArn``.

    Raises:
        Exception: carrying the message built by ``_fail`` on ``ClientError``.
    """
    try:
        client = ctx.client('kms')
        client.enable_key_rotation(
            KeyId=cmkArn
        )
    except botocore.exceptions.ClientError as e:
        erm = _fail(e, 'enable_key_rotation', cmkArn)
        raise Exception(erm) from e


def schedule_key_deletion(ctx, cmkArn, pendingWindowInDays):
    """Schedule ``cmkArn`` for deletion after ``pendingWindowInDays`` days.

    Returns ``None``; an error that ``_is_resource_not_found`` accepts is
    ignored.

    Raises:
        Exception: carrying the message built by ``_fail`` on any other
            ``ClientError``.
    """
    try:
        client = ctx.client('kms')
        client.schedule_key_deletion(
            KeyId=cmkArn,
            PendingWindowInDays=pendingWindowInDays
        )
    except botocore.exceptions.ClientError as e:
        if _is_resource_not_found(e):
            return None
        erm = _fail(e, 'schedule_key_deletion', cmkArn)
        raise Exception(erm) from e


def delete_alias(ctx, canonAlias):
    """Delete the KMS alias ``canonAlias``.

    Returns ``None``; an error that ``_is_resource_not_found`` accepts is
    ignored.

    Raises:
        Exception: carrying the message built by ``_fail`` on any other
            ``ClientError``.
    """
    try:
        client = ctx.client('kms')
        client.delete_alias(
            AliasName=canonAlias
        )
    except botocore.exceptions.ClientError as e:
        if _is_resource_not_found(e):
            return None
        erm = _fail(e, 'delete_alias', canonAlias)
        raise Exception(erm) from e


def getCMKMeta(ctx, keyId):
    """Return the ``KeyMetadata`` from ``describe_key`` for ``keyId``.

    Returns ``None`` when the call fails with an error that
    ``_is_resource_not_found`` accepts.

    Raises:
        Exception: carrying the message built by ``_fail`` on any other
            ``ClientError``.
    """
    try:
        client = ctx.client('kms')
        response = client.describe_key(
            KeyId=keyId
        )
        return response['KeyMetadata']
    except botocore.exceptions.ClientError as e:
        if _is_resource_not_found(e):
            return None
        erm = _fail(e, 'describe_key', keyId)
        raise Exception(erm) from e


def policy_json_equal(leftJson, rightJson):
    """Return True when two JSON policy documents describe the same data.

    The documents are parsed before comparison, so whitespace and the order of
    object keys do not matter. The order of array elements still does.
    """
    return json.loads(leftJson) == json.loads(rightJson)


def declareCMK(ctx, description, alias, policyStatements):
    """Ensure a CMK exists for ``alias`` with the given policy and description.

    The required policy is ``policy_statement_default(ctx)`` followed by
    ``policyStatements``, passed through ``policy_map``.

    * No key behind the alias, or the key is ``PendingDeletion``: create a new
      key, alias it, enable rotation, and return the new ARN.
    * Key is ``Enabled``: bring policy, description and rotation in line with
      the request (each only if it differs) and return the existing ARN.

    Raises:
        Exception: if the existing key is in any other state, or if one of the
            underlying KMS calls fails.
    """
    statements = [policy_statement_default(ctx)]
    statements.extend(policyStatements)
    policyMap = policy_map(statements)
    reqdPolicyJson = json.dumps(policyMap)
    canonAlias = canon_alias(alias)
    exMeta = getCMKMeta(ctx, canonAlias)

    if exMeta:
        keyState = exMeta['KeyState']
        if keyState not in ('PendingDeletion', 'Enabled'):
            erm = 'KMS CMK {} in unexpected state {}'.format(alias, keyState)
            raise Exception(erm)
        # A key pending deletion is treated as absent: a fresh key is created.
        createReqd = keyState == 'PendingDeletion'
    else:
        createReqd = True

    if createReqd:
        newArn = create_key_arn(ctx, description, reqdPolicyJson)
        create_alias(ctx, canonAlias, newArn)
        enable_key_rotation(ctx, newArn)
        return newArn

    exArn = exMeta['Arn']
    exDescription = exMeta['Description']
    exPolicyJson = get_key_policy(ctx, exArn)
    # Compare parsed documents rather than serialised strings, so a policy
    # that differs only in key order does not trigger a needless rewrite.
    if not policy_json_equal(exPolicyJson, reqdPolicyJson):
        put_key_policy(ctx, exArn, reqdPolicyJson)
    if exDescription != description:
        update_key_description(ctx, exArn, description)
    isRotationEnabled = get_key_rotation_status(ctx, exArn)
    if not isRotationEnabled:
        enable_key_rotation(ctx, exArn)
    return exArn


def deleteCMK(ctx, alias, pendingWindowInDays=7):
    canonAlias = canon_alias(alias)
    exMeta = getCMKMeta(ctx, canonAlias)
    if exMeta:
        exArn = exMeta['Arn']
        delete_alias(ctx, canonAlias)
        # ... (excerpt truncated here; the rest of deleteCMK is not shown)
# kms.py
Source:kms.py  
...73            message = {'FILE': __file__.split('/')[-1], 'CLASS': self.__class__.__name__,74                       'METHOD': inspect.stack()[0][3], 'EXCEPTION': str(e)}75            self.logger.exception(message)76            raise77    def put_key_policy(self, key_id, policy):78        try:79            response = kms_client.put_key_policy(80                KeyId=key_id,81                Policy=policy,82                PolicyName = 'default', # Per API docs, the only valid value is default.83                BypassPolicyLockoutSafetyCheck=True84            )85            return response86        except Exception as e:87            message = {'FILE': __file__.split('/')[-1], 'CLASS': self.__class__.__name__,88                       'METHOD': inspect.stack()[0][3], 'EXCEPTION': str(e)}89            self.logger.exception(message)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
