Best Python code snippet using localstack_python
test_aws_s3_cloudtrail_public_access.py
Source:test_aws_s3_cloudtrail_public_access.py  
1# Copyright (c) 2020 VMware Inc.2#3# Licensed under the Apache License, Version 2.0 (the "License");4# you may not use this file except in compliance with the License.5# You may obtain a copy of the License at6#7#     http://www.apache.org/licenses/LICENSE-2.08#9# Unless required by applicable law or agreed to in writing, software10# distributed under the License is distributed on an "AS IS" BASIS,11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.12# See the License for the specific language governing permissions and13# limitations under the License.14import pytest15from mock import Mock16from botocore.exceptions import ClientError17from remediation_worker.jobs.aws_s3_cloudtrail_public_access.aws_s3_cloudtrail_public_access import (18    CloudtrailS3RemovePublicAccess,19)20@pytest.fixture21def valid_payload():22    return """23{24    "notificationInfo": {25        "RuleId": "5c6cc5e103dcc90f363146cd",26        "Service": "CloudTrail",27        "FindingInfo": {28            "FindingId": "d0431afd-b82e-4021-8aa6-ba3cf5c60ef7",29            "ObjectId": "CloudTrail_name",30            "ObjectChain": "{\\"cloudAccountId\\":\\"cloud_account_id\\",\\"entityId\\":\\"AWS.CloudTrail.159636093902.us-west-2.Trail.test-remediation\\",\\"entityName\\":\\"remediation-cloudtrail\\",\\"entityType\\":\\"AWS.CloudTrail.Trail\\",\\"lastUpdateTime\\":\\"2020-09-09T00:36:35.000Z\\",\\"partitionKey\\":\\"153894897389\\",\\"provider\\":\\"AWS\\",\\"region\\":\\"us-west-2\\",\\"service\\":\\"CloudTrail\\", \\"properties\\":[{\\"name\\":\\"S3BucketName\\",\\"stringV\\":\\"remediation-cloudtrail\\",\\"type\\":\\"string\\"}]}",31            "Region": "region"32            }33        }34}35"""36class TestCloudtrailS3PublicAccess(object):37    def test_parse_payload(self, valid_payload):38        params = CloudtrailS3RemovePublicAccess().parse(valid_payload)39        assert params["region"] == "region"40        assert params["cloudtrail_name"] == "CloudTrail_name"41        assert params["cloud_account_id"] == "cloud_account_id"42    def test_remediate_success_with_bucket_policy_public(self):43        client = Mock()44        cloudtrail_client = Mock()45        action = CloudtrailS3RemovePublicAccess()46        trail = {47            "Trail": {48                "Name": "CloudTrail_name",49                "S3BucketName": "remediation-cloudtrail",50            }51        }52        cloudtrail_client.get_trail.return_value = trail53        bucket_status = {54            "ResponseMetadata": {55                "RequestId": "9B28R8BGSR67A459",56                "HostId": "aS/3JTmp+hjghfjxfhc4VznkMTTkjhbjkKMCs93cfTCcC6R2rE3SIVziHRDFg=",57                "HTTPStatusCode": 200,58                "HTTPHeaders": {59                    "x-amz-id-2": "aS/3JTmp+hjghfjxfhc4VznkMTTkjhbjkKMCs93cfTCcC6R2rE3SIVziHRDFg=",60                    "x-amz-request-id": "9B28R8BGSR67A459",61                    "date": "Wed, 27 Jan 2021 14:51:32 GMT",62                    "transfer-encoding": "chunked",63                    "server": "AmazonS3",64                },65                "RetryAttempts": 0,66            },67            "PolicyStatus": {"IsPublic": True},68        }69        client.get_bucket_policy_status.return_value = bucket_status70        client.get_bucket_policy.return_value = {71            "ResponseMetadata": {72                "RequestId": "EPFRBXATAM2JCGDP",73                "HostId": "M4bxrGZTQykEqOjq0WZ9cQKDhdatiPqCHV8GsZCdRSFn8bOXF4441q9vPzR/33ca9xePha+zhCw=",74                "HTTPStatusCode": 200,75                "HTTPHeaders": {76                    "x-amz-id-2": "M4bxrGZTQykEqOjq0WZ9cQKDhdatiPqCHV8GsZCdRSFn8bOXF4441q9vPzR/33ca9xePha+zhCw=",77                    "x-amz-request-id": "EPFRBXATAM2JCGDP",78                    "date": "Wed, 27 Jan 2021 14:51:32 GMT",79                    "content-type": "application/json",80                    "content-length": "637",81                    "server": "AmazonS3",82                },83                "RetryAttempts": 0,84            },85            "Policy": '{"Version":"2012-10-17","Statement":[{"Sid":"AWSCloudTrailAclCheck20150319","Effect":"Allow","Principal":{"Service":"cloudtrail.amazonaws.com"},"Action":"s3:GetBucketAcl","Resource":"arn:aws:s3:::remediation-cloudtrail"},{"Sid":"AllowPublicReadAccess","Effect":"Allow","Principal":"*","Action":"s3:GetObject","Resource":"arn:aws:s3:::remediation-cloudtrail/*"},{"Sid":"AWSCloudTrailWrite20150319","Effect":"Allow","Principal":{"Service":"cloudtrail.amazonaws.com"},"Action":"s3:PutObject","Resource":"arn:aws:s3:::remediation-cloudtrail/AWSLogs/159636093902/*","Condition":{"StringEquals":{"s3:x-amz-acl":"bucket-owner-full-control"}}},{"Sid":"PublicRead","Effect":"Allow","Principal":{"AWS":"*"},"Action":["s3:GetObject","s3:GetObjectVersion"],"Resource":"arn:aws:s3:::remediation-cloudtrail/*"}]}',86        }87        assert (88            action.remediate(89                cloudtrail_client,90                client,91                "cloudtrail_name",92                "region",93                "cloud_account_id",94            )95            == 096        )97        assert client.put_public_access_block.call_count == 198        assert client.get_bucket_policy_status.call_count == 199        assert client.get_bucket_policy.call_count == 1100        assert client.put_bucket_policy.call_count == 1101        call_args = client.put_public_access_block.call_args102        updated_public_access_config = call_args[1]["PublicAccessBlockConfiguration"]103        assert updated_public_access_config == {104            "BlockPublicAcls": True,105            "IgnorePublicAcls": True,106            "BlockPublicPolicy": True,107            "RestrictPublicBuckets": True,108        }109        call_args_bucket_policy = client.put_bucket_policy.call_args110        updated_bucket_policy = call_args_bucket_policy[1]["Policy"]111        print(updated_bucket_policy)112        assert (113            updated_bucket_policy114            == '{"Version": "2012-10-17", "Statement": [{"Sid": "AWSCloudTrailAclCheck20150319", "Effect": "Allow", "Principal": {"Service": "cloudtrail.amazonaws.com"}, "Action": "s3:GetBucketAcl", "Resource": "arn:aws:s3:::remediation-cloudtrail"}, {"Sid": "AWSCloudTrailWrite20150319", "Effect": "Allow", "Principal": {"Service": "cloudtrail.amazonaws.com"}, "Action": "s3:PutObject", "Resource": "arn:aws:s3:::remediation-cloudtrail/AWSLogs/159636093902/*", "Condition": {"StringEquals": {"s3:x-amz-acl": "bucket-owner-full-control"}}}]}'115        )116    def test_remediate_success_without_bucket_policy_public(self):117        client = Mock()118        cloudtrail_client = Mock()119        action = CloudtrailS3RemovePublicAccess()120        trail = {121            "Trail": {122                "Name": "CloudTrail_name",123                "S3BucketName": "remediation-cloudtrail",124            }125        }126        cloudtrail_client.get_trail.return_value = trail127        bucket_status = {128            "ResponseMetadata": {129                "RequestId": "9B28R8BGSR67A459",130                "HostId": "aS/3JTmp+hjghfjxfhc4VznkMTTkjhbjkKMCs93cfTCcC6R2rE3SIVziHRDFg=",131                "HTTPStatusCode": 200,132                "HTTPHeaders": {133                    "x-amz-id-2": "aS/3JTmp+hjghfjxfhc4VznkMTTkjhbjkKMCs93cfTCcC6R2rE3SIVziHRDFg=",134                    "x-amz-request-id": "9B28R8BGSR67A459",135                    "date": "Wed, 27 Jan 2021 14:51:32 GMT",136                    "transfer-encoding": "chunked",137                    "server": "AmazonS3",138                },139                "RetryAttempts": 0,140            },141            "PolicyStatus": {"IsPublic": False},142        }143        client.get_bucket_policy_status.return_value = bucket_status144        assert (145            action.remediate(146                cloudtrail_client,147                client,148                "cloudtrail_name",149                "region",150                "cloud_account_id",151            )152            == 0153        )154        assert client.put_public_access_block.call_count == 1155        assert client.get_bucket_policy_status.call_count == 1156        call_args = client.put_public_access_block.call_args157        updated_public_access_config = call_args[1]["PublicAccessBlockConfiguration"]158        assert updated_public_access_config == {159            "BlockPublicAcls": True,160            "IgnorePublicAcls": True,161            "BlockPublicPolicy": True,162            "RestrictPublicBuckets": True,163        }164    def test_remediate_with_exception(self):165        class TestClient(object):166            def put_public_access_block(self, **kwargs):167                raise ClientError(168                    {169                        "Error": {170                            "Code": "NotFound",171                            "Message": "InvalidPermission.NotFound",172                        }173                    },174                    "TestCloudtrailS3PublicAccess",175                )176        client = TestClient()177        action = CloudtrailS3RemovePublicAccess()178        with pytest.raises(Exception):...list_s3_buckets.py
Source:list_s3_buckets.py  
...27                    "location": client.get_bucket_location(Bucket=bucket)["LocationConstraint"],28                    "logging": rm_meta(client.get_bucket_logging(Bucket=bucket)),29                    "notification_configuration": rm_meta(client.get_bucket_notification_configuration(Bucket=bucket)),30                    "policy": self.get_bucket_policy(client, bucket),31                    "policy_status": self.get_bucket_policy_status(client, bucket),32                    "replication": self.get_bucket_replication(client, bucket),33                    "request_payment": client.get_bucket_request_payment(Bucket=bucket)["Payer"],34                    "tagging": self.get_bucket_tagging(client, bucket),35                    "versioning": rm_meta(client.get_bucket_versioning(Bucket=bucket)),36                    "website": self.get_bucket_website(client, bucket),37                }38            }39            print(json.dumps(data, indent=2, sort_keys=True))40            if kwargs.get("Bucket"):41                return True42    def get_bucket_cors(self, client, bucket):43        try:44            return client.get_bucket_cors(Bucket=bucket)["CORSRules"]45        except ClientError as e:46            if e.response["Error"]["Code"] == "NoSuchCORSConfiguration":47                pass48    def get_bucket_encryption(self, client, bucket):49        try:50            return client.get_bucket_encryption(Bucket=bucket)["ServerSideEncryptionConfiguration"]51        except ClientError as e:52            if e.response["Error"]["Code"] == "ServerSideEncryptionConfigurationNotFoundError":53                pass54    def get_bucket_lifecycle_configuration(self, client, bucket):55        try:56            return client.get_bucket_lifecycle_configuration(Bucket=bucket)["Rules"]57        except ClientError as e:58            if e.response["Error"]["Code"] == "NoSuchLifecycleConfiguration":59                pass60    def get_bucket_replication(self, client, bucket):61        try:62            return client.get_bucket_replication(Bucket=bucket)["ReplicationConfiguration"],63        except ClientError as e:64            if e.response["Error"]["Code"] == "ReplicationConfigurationNotFoundError":65                pass66    def get_bucket_policy(self, client, bucket):67        try:68            return client.get_bucket_policy(Bucket=bucket)["Policy"],69        except ClientError as e:70            if e.response["Error"]["Code"] == "NoSuchBucketPolicy":71                pass72    def get_bucket_policy_status(self, client, bucket):73        try:74            return client.get_bucket_policy_status(Bucket=bucket)["PolicyStatus"],75        except ClientError as e:76            if e.response["Error"]["Code"] == "NoSuchBucketPolicy":77                pass78    def get_bucket_tagging(self, client, bucket):79        try:80            return client.get_bucket_tagging(Bucket=bucket)["TagSet"],81        except ClientError as e:82            if e.response["Error"]["Code"] == "NoSuchTagSet":83                pass84    def get_bucket_website(self, client, bucket):85        try:86            return client.get_bucket_website(Bucket=bucket),87        except ClientError as e:88            if e.response["Error"]["Code"] == "NoSuchWebsiteConfiguration":...s3.py
Source:s3.py  
...29                get_bucket_policy = self.client.get_bucket_policy(30                    Bucket=bucket['Name'],31                )32                get_bucket_policy = get_bucket_policy['Policy']33                get_bucket_policy_status = self.client.get_bucket_policy_status(34                                    Bucket=bucket['Name'],35                                ) 36                get_bucket_policy_status = get_bucket_policy_status['PolicyStatus']37            except ClientError as e:38                get_bucket_policy = []39                get_bucket_policy_status = []40                if e.response['Error']['Code'] == 'NoSuchBucketPolicy':41                    pass42                    #print('\t NoSuchBucketPolicy')43                else:44                    pass45                    #print("unexpected error: %s" % (e.response))46            bucket['get_bucket_policy'] = get_bucket_policy47            bucket['get_bucket_policy_status'] = get_bucket_policy_status...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
