Best Python code snippet using localstack_python
test_s3_bucket_notification.py
Source:test_s3_bucket_notification.py  
# Make coding more python3-ish
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type

import pytest

try:
    from botocore.exceptions import ClientError
except ImportError:
    # Without botocore the whole module is skipped via ``pytestmark`` below.
    pass

from ansible_collections.amazon.aws.plugins.module_utils.ec2 import HAS_BOTO3
from ansible_collections.community.aws.tests.unit.compat.mock import MagicMock
from ansible_collections.community.aws.tests.unit.compat.mock import patch
from ansible_collections.community.aws.tests.unit.plugins.modules.utils import AnsibleExitJson
from ansible_collections.community.aws.tests.unit.plugins.modules.utils import AnsibleFailJson
from ansible_collections.community.aws.tests.unit.plugins.modules.utils import ModuleTestCase
from ansible_collections.community.aws.tests.unit.plugins.modules.utils import set_module_args
from ansible_collections.community.aws.plugins.modules.s3_bucket_notification import AmazonBucket
from ansible_collections.community.aws.plugins.modules.s3_bucket_notification import Config
from ansible_collections.community.aws.plugins.modules import s3_bucket_notification

if not HAS_BOTO3:
    pytestmark = pytest.mark.skip("s3_bucket_notification.py requires the `boto3` and `botocore` modules")


def _lambda_config(config_id, arn, events, prefix='', suffix=''):
    """Build one ``LambdaFunctionConfigurations`` entry in the shape these tests use."""
    filter_rules = [
        {'Name': 'Prefix', 'Value': prefix},
        {'Name': 'Suffix', 'Value': suffix},
    ]
    return {
        'Id': config_id,
        'LambdaFunctionArn': arn,
        'Events': events,
        'Filter': {'Key': {'FilterRules': filter_rules}},
    }


def _stub_client(existing_configs):
    """Return a mock client whose ``get_bucket_notification_configuration`` reports ``existing_configs``."""
    client = MagicMock()
    client.get_bucket_notification_configuration.return_value = {
        'LambdaFunctionConfigurations': existing_configs
    }
    return client


class TestAmazonBucketOperations:
    """Exercise ``AmazonBucket`` against a mocked S3 client."""

    def test_current_config(self):
        """An entry with the requested id is returned with its raw API dict intact."""
        api_config = _lambda_config('test-id', 'test-arn', [])
        client = _stub_client([api_config])

        found = AmazonBucket(client, 'test-bucket').current_config('test-id')

        assert found.raw == api_config
        assert client.get_bucket_notification_configuration.call_count == 1

    def test_current_config_empty(self):
        """With no configurations on the bucket, ``current_config`` yields ``None``."""
        client = _stub_client([])

        found = AmazonBucket(client, 'test-bucket').current_config('test-id')

        assert found is None
        assert client.get_bucket_notification_configuration.call_count == 1

    def test_apply_invalid_config(self):
        """A ``ClientError`` raised by the put call surfaces from ``apply_config``."""
        client = _stub_client([])
        client.put_bucket_notification_configuration.side_effect = ClientError({}, '')
        bucket = AmazonBucket(client, 'test-bucket')
        config = Config.from_params(
            event_name='test_event',
            lambda_function_arn='lambda_arn',
            lambda_version=1,
            events=['s3:ObjectRemoved:*', 's3:ObjectCreated:*'],
            prefix='',
            suffix='',
        )

        with pytest.raises(ClientError):
            bucket.apply_config(config)

    def test_apply_config(self):
        """Applying a new config performs exactly one get and one put."""
        client = _stub_client([])
        bucket = AmazonBucket(client, 'test-bucket')
        config = Config.from_params(
            event_name='test_event',
            lambda_function_arn='lambda_arn',
            lambda_version=1,
            events=['s3:ObjectRemoved:*', 's3:ObjectCreated:*'],
            prefix='',
            suffix='',
        )

        bucket.apply_config(config)

        assert client.get_bucket_notification_configuration.call_count == 1
        assert client.put_bucket_notification_configuration.call_count == 1

    def test_apply_config_add_event(self):
        """Re-applying an existing id with an extra event puts the updated entry."""
        client = _stub_client([
            _lambda_config('test-id', 'test-arn', ['s3:ObjectRemoved:*']),
        ])
        bucket = AmazonBucket(client, 'test-bucket')
        config = Config.from_params(
            event_name='test-id',
            lambda_function_arn='test-arn',
            lambda_version=1,
            events=['s3:ObjectRemoved:*', 's3:ObjectCreated:*'],
            prefix='',
            suffix='',
        )

        bucket.apply_config(config)

        assert client.get_bucket_notification_configuration.call_count == 1
        assert client.put_bucket_notification_configuration.call_count == 1
        # The expected ARN carries the version suffix (":1") and the events
        # appear as Created, Removed even though they were passed reversed.
        expected_entry = _lambda_config(
            'test-id',
            'test-arn:1',
            ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'],
        )
        client.put_bucket_notification_configuration.assert_called_with(
            Bucket='test-bucket',
            NotificationConfiguration={
                'LambdaFunctionConfigurations': [expected_entry]
            }
        )

    def test_delete_config(self):
        """Deleting the only matching entry puts an empty configuration list."""
        client = _stub_client([
            _lambda_config('test-id', 'test-arn', []),
        ])
        bucket = AmazonBucket(client, 'test-bucket')
        config = Config.from_params(
            event_name='test-id',
            lambda_function_arn='lambda_arn',
            lambda_version=1,
            events=[],
            prefix='',
            suffix='',
        )

        bucket.delete_config(config)

        assert client.get_bucket_notification_configuration.call_count == 1
        assert client.put_bucket_notification_configuration.call_count == 1
        client.put_bucket_notification_configuration.assert_called_with(
            Bucket='test-bucket',
            NotificationConfiguration={'LambdaFunctionConfigurations': []}
        )


class TestConfig:
    """Check that ``Config.from_params`` matches a ``Config`` built from a raw API dict."""

    def test_config_from_params(self):
        """``lambda_version=10`` on 'test-arn' must equal a raw ARN of 'test-arn:10'."""
        config = Config(_lambda_config('test-id', 'test-arn:10', []))
        config_from_params = Config.from_params(
            event_name='test-id',
            lambda_function_arn='test-arn',
            lambda_version=10,
            events=[],
            prefix='',
            suffix='',
        )

        assert config.raw == config_from_params.raw
        assert config == config_from_params


class TestModule(ModuleTestCase):
    """Drive the module's ``main()`` entry point with a patched AWS client."""

    def test_module_fail_when_required_args_missing(self):
        """Running the module with no arguments fails with ``AnsibleFailJson``."""
        with pytest.raises(AnsibleFailJson):
            set_module_args({})
            s3_bucket_notification.main()

    @patch('ansible_collections.community.aws.plugins.modules.s3_bucket_notification.AnsibleAWSModule.client')
    def test_add_s3_bucket_notification(self, aws_client):
        """``state=present`` on a bucket without notifications reports a change and puts the new entry."""
        mocked_s3 = aws_client.return_value
        mocked_s3.get_bucket_notification_configuration.return_value = {
            'LambdaFunctionConfigurations': []
        }
        module_args = {
            'region': 'us-east-2',
            'lambda_function_arn': 'test-lambda-arn',
            'bucket_name': 'test-lambda',
            'event_name': 'test-id',
            'events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'],
            'state': 'present',
            'prefix': '/images',
            'suffix': '.jpg'
        }
        set_module_args(module_args)

        with pytest.raises(AnsibleExitJson) as context:
            s3_bucket_notification.main()

        result = context.value.args[0]
        assert result['changed'] is True
        assert mocked_s3.get_bucket_notification_configuration.call_count == 1
        expected_entry = _lambda_config(
            'test-id',
            'test-lambda-arn',
            ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'],
            prefix='/images',
            suffix='.jpg',
        )
        mocked_s3.put_bucket_notification_configuration.assert_called_with(
            Bucket='test-lambda',
            NotificationConfiguration={
                'LambdaFunctionConfigurations': [expected_entry]
            }
        )
        # NOTE: the source listing is truncated at this point ("..."); only the
        # closing brackets of the call above were added to keep the snippet
        # well-formed. The next listing on the page is index.py.
Source:index.py  
...14    def id(self):15        return "{}".format(self._bucket)16    def default_deletion_policies(self):17        return ['Delete']18    def put_bucket_notification_configuration(self):19        self._s3.put_bucket_notification_configuration(20            Bucket=self._bucket,21            NotificationConfiguration=self._notification_configuration,22        )23    def delete_bucket_notification_configuration(self):24        self._s3.put_bucket_notification_configuration(25            Bucket=self._bucket,26            NotificationConfiguration={},27        )28    def create(self, policies):29        self.put_bucket_notification_configuration()30    def update(self, policies):31        self.put_bucket_notification_configuration()32    def delete(self, policies):33        try:34            self.delete_bucket_notification_configuration()35        except Exception as e:36            if policies.has('IgnoreError'):37                return38            raise e39def handler(event, context):40    c = S3BucketNotificationConfiguration(event, context)...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks such as Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
