Best Python code snippet using localstack_python
lambda_function.py
Source:lambda_function.py  
...66                    else:67                        for serversideenc in encryption['ServerSideEncryptionConfiguration']['Rules']:68                            if ((serversideenc['ApplyServerSideEncryptionByDefault']['SSEAlgorithm'] != 'AES256') or (serversideenc['ApplyServerSideEncryptionByDefault']['SSEAlgorithm'] != 'aws:kms')):69                                print("Here comes aes")70                                client.put_bucket_encryption(71    Bucket=entry['requestParameters']['bucketName'],72    ServerSideEncryptionConfiguration={73        'Rules': [74            {75                'ApplyServerSideEncryptionByDefault': {76                    'SSEAlgorithm': 'AES256'77                }78            },79        ]80    }81)82                    if ((encryption=="none")):83                                print("Here comes aes")84                                client.put_bucket_encryption(85    Bucket=entry['requestParameters']['bucketName'],86    ServerSideEncryptionConfiguration={87        'Rules': [88            {89                'ApplyServerSideEncryptionByDefault': {90                    'SSEAlgorithm': 'AES256'91                }92            },93        ]94    }95)96                    result = client.get_bucket_acl(Bucket=entry['requestParameters']['bucketName'])97                    for grants in result['Grants']:98                        if((grants['Grantee']['Type']=='Group') and (grants['Grantee']['URI']=='http://acs.amazonaws.com/groups/global/AllUsers')):99                            client.put_bucket_acl(ACL='private',Bucket=entry['requestParameters']['bucketName'])100                #tag is confidential101                elif ((dpc.lower() == 'confidential')):102                    try:103                        encryption = client.get_bucket_encryption(104    Bucket=event['detail']['requestParameters']['bucketName']105)106                    except ClientError as e:107                        if e.response['Error']['Code'] == 
'ServerSideEncryptionConfigurationNotFoundError':108                            encryption = "none"109                    else:110                        print(encryption)111                        for serversideenc in encryption['ServerSideEncryptionConfiguration']['Rules']:112                            if serversideenc['ApplyServerSideEncryptionByDefault']['SSEAlgorithm'] != 'aws:kms':113                                print("Here comes aes")114                                client.put_bucket_encryption(115    Bucket=entry['requestParameters']['bucketName'],116    ServerSideEncryptionConfiguration={117        'Rules': [118            {119                'ApplyServerSideEncryptionByDefault': {120                    'SSEAlgorithm': 'aws:kms',121                    'KMSMasterKeyID': confidential_kms_key_arn122                }123            },124        ]125    }126)127                    if ((encryption=="none")):128                                print("Here comes aes")129                                client.put_bucket_encryption(130    Bucket=entry['requestParameters']['bucketName'],131    ServerSideEncryptionConfiguration={132        'Rules': [133            {134                'ApplyServerSideEncryptionByDefault': {135                    'SSEAlgorithm': 'aws:kms',136                    'KMSMasterKeyID': confidential_kms_key_arn137                }138            },139        ]140    }141)142                    result = client.get_bucket_acl(Bucket=entry['requestParameters']['bucketName'])143                    for grants in result['Grants']:144                        if((grants['Grantee']['Type']=='Group') and (grants['Grantee']['URI']=='http://acs.amazonaws.com/groups/global/AllUsers')):145                            client.put_bucket_acl(ACL='private',Bucket=event['detail']['requestParameters']['bucketName'])146                #tag is open so can be public147                elif (dpc.lower() == 'public'):148                    result = 
client.get_bucket_acl(Bucket=entry['requestParameters']['bucketName'])149                    for grants in result['Grants']:150                        if((grants['Grantee']['Type']=='Group') and (grants['Grantee']['URI']=='http://acs.amazonaws.com/groups/global/AllUsers')):151                            print("THIS IS FULLY OPEN!")152                elif (dpc == 'open'):153                    result = client.get_bucket_acl(Bucket=entry['requestParameters']['bucketName'])154                    try:155                            encryption = client.get_bucket_encryption(156    Bucket=entry['requestParameters']['bucketName']157)158                    except ClientError as e:159                            #print(e.response)160                            print("No encryption found and it's okay")161                    else:162                            print("It's encrypted, leaving as is it")163                    for grants in result['Grants']:164                        if((grants['Grantee']['Type']=='Group') and (grants['Grantee']['URI']=='http://acs.amazonaws.com/groups/global/AllUsers')):165                            client.put_bucket_acl(ACL='private',Bucket=entry['requestParameters']['bucketName'])166                #dpc is not internal confidential or open167                else:168                    print("neither")169                    encryption = client.get_bucket_encryption(170    Bucket=entry['requestParameters']['bucketName']171)172                    for rules in encryption['ServerSideEncryptionConfiguration']['Rules']:173                        if rules['ApplyServerSideEncryptionByDefault']['SSEAlgorithm'] != "AES256":174                            client.put_bucket_encryption(175    Bucket=entry['requestParameters']['bucketName'],176    ServerSideEncryptionConfiguration={177        'Rules': [178            {179                'ApplyServerSideEncryptionByDefault': {180                    'SSEAlgorithm': 'AES256'181                }182            
},183        ]184    }185)186                    client.put_bucket_acl(ACL='private',Bucket=entry['requestParameters']['bucketName'])187                    buckettags = client.get_bucket_tagging(Bucket=entry['requestParameters']['bucketName'])188                    for idx, tagitem in enumerate(buckettags['TagSet']):189                        if tagitem['Key'] == 'DPC':190                            buckettags['TagSet'][idx]['Value'] = 'Internal'191                    client.put_bucket_tagging(Bucket=entry['requestParameters']['bucketName'],192    Tagging={ 'TagSet' : buckettags['TagSet'] }193)194        if (not dpcexists):195            print("No DPC tagset adding private acl, and dpc tagset to the bucket")196            client.put_bucket_encryption(197    Bucket=entry['requestParameters']['bucketName'],198    ServerSideEncryptionConfiguration={199        'Rules': [200            {201                'ApplyServerSideEncryptionByDefault': {202                    'SSEAlgorithm': 'AES256'203                }204            },205        ]206    }207)208            client.put_bucket_acl(ACL='private',Bucket=entry['requestParameters']['bucketName'])209            buckettags = client.get_bucket_tagging(Bucket=entry['requestParameters']['bucketName'])210            #print(buckettags)211            buckettags['TagSet'].append({212                'Key': 'DPC',213                'Value': 'Internal'214            })215            client.put_bucket_tagging(Bucket=entry['requestParameters']['bucketName'],216    Tagging= { 'TagSet' : buckettags['TagSet'] }217)218    else:219        print("No tagset adding one to the bucket")220        client.put_bucket_encryption(221    Bucket=entry['requestParameters']['bucketName'],222    ServerSideEncryptionConfiguration={223        'Rules': [224            {225                'ApplyServerSideEncryptionByDefault': {226                    'SSEAlgorithm': 'AES256'227                }228            },229        ]230    }231)232        
client.put_bucket_acl(ACL='private',Bucket=entry['requestParameters']['bucketName'])233        client.put_bucket_tagging(Bucket=entry['requestParameters']['bucketName'],234    Tagging={...enables3encryptionfunction.py
Source:enables3encryptionfunction.py  
...39        if e.response['Error']['Code'] == 'ServerSideEncryptionConfigurationNotFoundError':40            if os.environ['KMS_ENCRYPTION_KEY'] != '':41                print(f"Enabling Encryption for {bucket_name} with CMK")42                try:43                    response = s3client.put_bucket_encryption(44                        Bucket=bucket_name,45                        ServerSideEncryptionConfiguration={46                            'Rules': [{47                                'ApplyServerSideEncryptionByDefault': {48                                    'SSEAlgorithm': 'aws:kms',49                                    'KMSMasterKeyID': os.environ['KMS_ENCRYPTION_KEY']50                                },51                                'BucketKeyEnabled': True52                            }]53                        }54                    )55                except Exception as e:56                    logger.exception(f"failed to to update {bucket_name} - " + str(e)) 57                    58    59            else:60                print(f"Enabling Default Encryption for {bucket_name} with AWS SSE-S3")61                try:62                    response = s3client.put_bucket_encryption(63                        Bucket=bucket_name,64                        ServerSideEncryptionConfiguration={65                            'Rules': [{66                                'ApplyServerSideEncryptionByDefault': {67                                    'SSEAlgorithm': 'AES256'68                                }69                            }]70                        }71                    )72                except Exception as e:73                    logger.exception(f"failed to to update {bucket_name} - " + str(e)) 74    75        76        else:77          print("Bucket: %s, unexpected error: %s" % (bucket['Name'], e))78          pass79    80def on_create(event):81    logger.info("Initial Run")82    s3client = boto3.client('s3')83    s3 = boto3.resource('s3')84    
logger.info("Getting Checking each bucket's Encryption")85    for bucket in s3.buckets.all():86        try:87            enc = s3client.get_bucket_encryption(Bucket=bucket.name)88            rules = enc['ServerSideEncryptionConfiguration']['Rules']89            print('Bucket: %s, Encryption: %s' % (bucket.name, rules))90        except botocore.exceptions.ClientError as e:91            if e.response['Error']['Code'] == 'ServerSideEncryptionConfigurationNotFoundError':92                if os.environ['KMS_ENCRYPTION_KEY'] != '':93                    print(f"Enabling Encryption for {bucket.name} with CMK")94                    try:95                        response = s3client.put_bucket_encryption(96                            Bucket=bucket.name,97                            ServerSideEncryptionConfiguration={98                                'Rules': [{99                                    'ApplyServerSideEncryptionByDefault': {100                                        'SSEAlgorithm': 'aws:kms',101                                        'KMSMasterKeyID': os.environ['KMS_ENCRYPTION_KEY']102                                    },103                                    'BucketKeyEnabled': True104                                }]105                            }106                        )107                    except Exception as e:108                        logger.exception(f"failed to to update {bucket.name} - " + str(e)) 109                        110        111                else:112                    print(f"Enabling Default Encryption for {bucket.name} with AWS SSE-S3")113                    try:114                        response = s3client.put_bucket_encryption(115                            Bucket=bucket.name,116                            ServerSideEncryptionConfiguration={117                                'Rules': [{118                                    'ApplyServerSideEncryptionByDefault': {119                                        'SSEAlgorithm': 
'AES256'120                                    }121                                }]122                            }123                        )124                    except Exception as e:125                        logger.exception(f"failed to to update {bucket.name} - " + str(e)) 126        127            128            else:129                print("Bucket: %s, unexpected error: %s" % (bucket.name, e))130                pass131        132        #logger.info(f"Checking AWS Region {region}")133        #status = remote_client.get_ebs_encryption_by_default()134        #print ("===="*10)135        #result = status["EbsEncryptionByDefault"]136        #if result == True:137        #    logger.info(f"Activated, nothing to do")138        #else:139        #    logger.info(f"Not activated, activation in progress")140        #    remote_client.enable_ebs_encryption_by_default()141        #logger.info(f"Default EBS Encryption Enabled for {account_id}")142        143def on_update(event):144    logger.info("Initial Run")145    s3client = boto3.client('s3')146    s3 = boto3.resource('s3')147    logger.info("Getting Checking each bucket's logging")148    for bucket in s3.buckets.all():149        try:150            enc = s3client.get_bucket_encryption(Bucket=bucket.name)151            rules = enc['ServerSideEncryptionConfiguration']['Rules']152            print('Bucket: %s, Encryption: %s' % (bucket.name, rules))153        except botocore.exceptions.ClientError as e:154            if e.response['Error']['Code'] == 'ServerSideEncryptionConfigurationNotFoundError':155                if os.environ['KMS_ENCRYPTION_KEY'] != '':156                    print(f"Enabling Encryption for {bucket.name} with CMK")157                    try:158                        response = s3client.put_bucket_encryption(159                            Bucket=bucket.name,160                            ServerSideEncryptionConfiguration={161                                'Rules': [{162                       
             'ApplyServerSideEncryptionByDefault': {163                                        'SSEAlgorithm': 'aws:kms',164                                        'KMSMasterKeyID': os.environ['KMS_ENCRYPTION_KEY']165                                    },166                                    'BucketKeyEnabled': True167                                }]168                            }169                        )170                    except Exception as e:171                        logger.exception(f"failed to to update {bucket.name} - " + str(e)) 172                        173        174                else:175                    print(f"Enabling Default Encryption for {bucket.name} with AWS SSE-S3")176                    try:177                        response = s3client.put_bucket_encryption(178                            Bucket=bucket.name,179                            ServerSideEncryptionConfiguration={180                                'Rules': [{181                                    'ApplyServerSideEncryptionByDefault': {182                                        'SSEAlgorithm': 'AES256'183                                    }184                                }]185                            }186                        )187                    except Exception as e:188                        logger.exception(f"failed to to update {bucket.name} - " + str(e)) 189        190            191            else:192                print("Bucket: %s, unexpected error: %s" % (bucket.name, e))193                pass194    195def on_delete(event):196    logger.info("Deletion Run - Not implemented")197    s3client = boto3.client('s3')198    s3 = boto3.resource('s3')199    logger.info("Getting Checking each bucket's logging")200    for bucket in s3.buckets.all():201    202        try:203            enc = s3client.get_bucket_encryption(Bucket=bucket.name)204            rules = enc['ServerSideEncryptionConfiguration']['Rules']205            print('Bucket: %s, Encryption: 
%s' % (bucket.name, rules))206        except botocore.exceptions.ClientError as e:207            if e.response['Error']['Code'] == 'ServerSideEncryptionConfigurationNotFoundError':208                if os.environ['KMS_ENCRYPTION_KEY'] != '':209                    print(f"Enabling Encryption for {bucket.name} with CMK")210                    try:211                        response = s3client.put_bucket_encryption(212                            Bucket=bucket.name,213                            ServerSideEncryptionConfiguration={214                                'Rules': [{215                                    'ApplyServerSideEncryptionByDefault': {216                                        'SSEAlgorithm': 'aws:kms',217                                        'KMSMasterKeyID': os.environ['KMS_ENCRYPTION_KEY']218                                    },219                                    'BucketKeyEnabled': True220                                }]221                            }222                        )223                    except Exception as e:224                        logger.exception(f"failed to to update {bucket.name} - " + str(e)) 225                        226        227                else:228                    print(f"Enabling Default Encryption for {bucket.name} with AWS SSE-S3")229                    try:230                        response = s3client.put_bucket_encryption(231                            Bucket=bucket.name,232                            ServerSideEncryptionConfiguration={233                                'Rules': [{234                                    'ApplyServerSideEncryptionByDefault': {235                                        'SSEAlgorithm': 'AES256'236                                    }237                                }]238                            }239                        )240                    except Exception as e:241                        logger.exception(f"failed to to update {bucket.name} - " + str(e)) 242    
    243            244            else:...aws-compliance-s3-encryption.py
Source:aws-compliance-s3-encryption.py  
# NOTE(review): lines 1-4 of this snippet are elided in the source; they
# presumably import boto3, logging and botocore's ClientError -- confirm.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
client = boto3.client('s3')
bucket_list = []


def put_bucket_encryption(bucket, client):
    """Set SSE-S3 (AES256) as the default encryption of *bucket*.

    Args:
        bucket: Name of the bucket to configure.
        client: S3 client object exposing ``put_bucket_encryption``.

    Returns:
        Whatever ``client.put_bucket_encryption`` returns.
    """
    return client.put_bucket_encryption(
        Bucket=bucket,
        ServerSideEncryptionConfiguration={
            'Rules': [
                {
                    'ApplyServerSideEncryptionByDefault': {
                        'SSEAlgorithm': 'AES256',
                    },
                },
            ],
        },
    )


def get_bucket_list(client):
    """Return the names of every bucket reported by ``client.list_buckets``.

    The module-level ``bucket_list`` is refilled in place and returned, so
    existing references to it stay valid.  It is replaced rather than
    appended to: module globals survive between invocations of a warm
    Lambda container, and appending made the list grow with duplicates on
    every call.

    Args:
        client: S3 client object exposing ``list_buckets``.

    Returns:
        The module-level ``bucket_list`` holding the current bucket names.
    """
    response = client.list_buckets()
    bucket_list[:] = [item['Name'] for item in response['Buckets']]
    return bucket_list


def lambda_handler(event, context):
    """Apply default AES256 encryption to buckets that have none configured.

    Args:
        event: Lambda event payload (not read by this handler).
        context: Lambda context object (not read by this handler).
    """
    for bucket in get_bucket_list(client):
        try:
            client.get_bucket_encryption(Bucket=bucket)
        except ClientError as error:
            code = error.response.get('Error', {}).get('Code')
            # Only a missing configuration means "not encrypted".  Any other
            # failure (access denied, bucket deleted meanwhile, throttling)
            # must not lead to overwriting the bucket's settings.
            if code != 'ServerSideEncryptionConfigurationNotFoundError':
                logger.warning(
                    "Could not read encryption for %s, skipping: %s",
                    bucket, error,
                )
                continue
            put_bucket_encryption(bucket, client)
            logger.info("%s does not have encryption. Adding", bucket)
        except TypeError as error:
            # Still tolerated as in the original, but no longer silent.
            logger.warning("Skipping bucket %r: %s", bucket, error)

# ...
# Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
