How to use put_bucket_notification_configuration method in localstack

Best Python code snippet using localstack_python

test_s3_bucket_notification.py

Source:test_s3_bucket_notification.py Github

copy

Full Screen

1# Make coding more python3-ish2from __future__ import (absolute_import, division, print_function)3__metaclass__ = type4import pytest5try:6 from botocore.exceptions import ClientError7except ImportError:8 pass9from ansible_collections.amazon.aws.plugins.module_utils.ec2 import HAS_BOTO310from ansible_collections.community.aws.tests.unit.compat.mock import MagicMock11from ansible_collections.community.aws.tests.unit.compat.mock import patch12from ansible_collections.community.aws.tests.unit.plugins.modules.utils import AnsibleExitJson13from ansible_collections.community.aws.tests.unit.plugins.modules.utils import AnsibleFailJson14from ansible_collections.community.aws.tests.unit.plugins.modules.utils import ModuleTestCase15from ansible_collections.community.aws.tests.unit.plugins.modules.utils import set_module_args16from ansible_collections.community.aws.plugins.modules.s3_bucket_notification import AmazonBucket17from ansible_collections.community.aws.plugins.modules.s3_bucket_notification import Config18from ansible_collections.community.aws.plugins.modules import s3_bucket_notification19if not HAS_BOTO3:20 pytestmark = pytest.mark.skip("s3_bucket_notification.py requires the `boto3` and `botocore` modules")21class TestAmazonBucketOperations:22 def test_current_config(self):23 api_config = {24 'Id': 'test-id',25 'LambdaFunctionArn': 'test-arn',26 'Events': [],27 'Filter': {28 'Key': {29 'FilterRules': [{30 'Name': 'Prefix',31 'Value': ''32 }, {33 'Name': 'Suffix',34 'Value': ''35 }]36 }37 }38 }39 client = MagicMock()40 client.get_bucket_notification_configuration.return_value = {41 'LambdaFunctionConfigurations': [api_config]42 }43 bucket = AmazonBucket(client, 'test-bucket')44 current = bucket.current_config('test-id')45 assert current.raw == api_config46 assert client.get_bucket_notification_configuration.call_count == 147 def test_current_config_empty(self):48 client = MagicMock()49 client.get_bucket_notification_configuration.return_value = {50 'LambdaFunctionConfigurations': []51 }52 bucket = AmazonBucket(client, 'test-bucket')53 current = bucket.current_config('test-id')54 assert current is None55 assert client.get_bucket_notification_configuration.call_count == 156 def test_apply_invalid_config(self):57 client = MagicMock()58 client.get_bucket_notification_configuration.return_value = {59 'LambdaFunctionConfigurations': []60 }61 client.put_bucket_notification_configuration.side_effect = ClientError({}, '')62 bucket = AmazonBucket(client, 'test-bucket')63 config = Config.from_params(**{64 'event_name': 'test_event',65 'lambda_function_arn': 'lambda_arn',66 'lambda_version': 1,67 'events': ['s3:ObjectRemoved:*', 's3:ObjectCreated:*'],68 'prefix': '',69 'suffix': ''70 })71 with pytest.raises(ClientError):72 bucket.apply_config(config)73 def test_apply_config(self):74 client = MagicMock()75 client.get_bucket_notification_configuration.return_value = {76 'LambdaFunctionConfigurations': []77 }78 bucket = AmazonBucket(client, 'test-bucket')79 config = Config.from_params(**{80 'event_name': 'test_event',81 'lambda_function_arn': 'lambda_arn',82 'lambda_version': 1,83 'events': ['s3:ObjectRemoved:*', 's3:ObjectCreated:*'],84 'prefix': '',85 'suffix': ''86 })87 bucket.apply_config(config)88 assert client.get_bucket_notification_configuration.call_count == 189 assert client.put_bucket_notification_configuration.call_count == 190 def test_apply_config_add_event(self):91 api_config = {92 'Id': 'test-id',93 'LambdaFunctionArn': 'test-arn',94 'Events': ['s3:ObjectRemoved:*'],95 'Filter': {96 'Key': {97 'FilterRules': [{98 'Name': 'Prefix',99 'Value': ''100 }, {101 'Name': 'Suffix',102 'Value': ''103 }]104 }105 }106 }107 client = MagicMock()108 client.get_bucket_notification_configuration.return_value = {109 'LambdaFunctionConfigurations': [api_config]110 }111 bucket = AmazonBucket(client, 'test-bucket')112 config = Config.from_params(**{113 'event_name': 'test-id',114 'lambda_function_arn': 'test-arn',115 'lambda_version': 1,116 'events': ['s3:ObjectRemoved:*', 's3:ObjectCreated:*'],117 'prefix': '',118 'suffix': ''119 })120 bucket.apply_config(config)121 assert client.get_bucket_notification_configuration.call_count == 1122 assert client.put_bucket_notification_configuration.call_count == 1123 client.put_bucket_notification_configuration.assert_called_with(124 Bucket='test-bucket',125 NotificationConfiguration={126 'LambdaFunctionConfigurations': [{127 'Id': 'test-id',128 'LambdaFunctionArn': 'test-arn:1',129 'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'],130 'Filter': {131 'Key': {132 'FilterRules': [{133 'Name': 'Prefix',134 'Value': ''135 }, {136 'Name': 'Suffix',137 'Value': ''138 }]139 }140 }141 }]142 }143 )144 def test_delete_config(self):145 api_config = {146 'Id': 'test-id',147 'LambdaFunctionArn': 'test-arn',148 'Events': [],149 'Filter': {150 'Key': {151 'FilterRules': [{152 'Name': 'Prefix',153 'Value': ''154 }, {155 'Name': 'Suffix',156 'Value': ''157 }]158 }159 }160 }161 client = MagicMock()162 client.get_bucket_notification_configuration.return_value = {163 'LambdaFunctionConfigurations': [api_config]164 }165 bucket = AmazonBucket(client, 'test-bucket')166 config = Config.from_params(**{167 'event_name': 'test-id',168 'lambda_function_arn': 'lambda_arn',169 'lambda_version': 1,170 'events': [],171 'prefix': '',172 'suffix': ''173 })174 bucket.delete_config(config)175 assert client.get_bucket_notification_configuration.call_count == 1176 assert client.put_bucket_notification_configuration.call_count == 1177 client.put_bucket_notification_configuration.assert_called_with(178 Bucket='test-bucket',179 NotificationConfiguration={'LambdaFunctionConfigurations': []}180 )181class TestConfig:182 def test_config_from_params(self):183 config = Config({184 'Id': 'test-id',185 'LambdaFunctionArn': 'test-arn:10',186 'Events': [],187 'Filter': {188 'Key': {189 'FilterRules': [{190 'Name': 'Prefix',191 'Value': ''192 }, {193 'Name': 'Suffix',194 'Value': ''195 }]196 }197 }198 })199 config_from_params = Config.from_params(**{200 'event_name': 'test-id',201 'lambda_function_arn': 'test-arn',202 'lambda_version': 10,203 'events': [],204 'prefix': '',205 'suffix': ''206 })207 assert config.raw == config_from_params.raw208 assert config == config_from_params209class TestModule(ModuleTestCase):210 def test_module_fail_when_required_args_missing(self):211 with pytest.raises(AnsibleFailJson):212 set_module_args({})213 s3_bucket_notification.main()214 @patch('ansible_collections.community.aws.plugins.modules.s3_bucket_notification.AnsibleAWSModule.client')215 def test_add_s3_bucket_notification(self, aws_client):216 aws_client.return_value.get_bucket_notification_configuration.return_value = {217 'LambdaFunctionConfigurations': []218 }219 set_module_args({220 'region': 'us-east-2',221 'lambda_function_arn': 'test-lambda-arn',222 'bucket_name': 'test-lambda',223 'event_name': 'test-id',224 'events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'],225 'state': 'present',226 'prefix': '/images',227 'suffix': '.jpg'228 })229 with pytest.raises(AnsibleExitJson) as context:230 s3_bucket_notification.main()231 result = context.value.args[0]232 assert result['changed'] is True233 assert aws_client.return_value.get_bucket_notification_configuration.call_count == 1234 aws_client.return_value.put_bucket_notification_configuration.assert_called_with(235 Bucket='test-lambda',236 NotificationConfiguration={237 'LambdaFunctionConfigurations': [{238 'Id': 'test-id',239 'LambdaFunctionArn': 'test-lambda-arn',240 'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'],241 'Filter': {242 'Key': {243 'FilterRules': [{244 'Name': 'Prefix',245 'Value': '/images'246 }, {247 'Name': 'Suffix',248 'Value': '.jpg'249 }]250 }251 }252 }]...

Full Screen

Full Screen

index.py

Source:index.py Github

copy

Full Screen

...14 def id(self):15 return "{}".format(self._bucket)16 def default_deletion_policies(self):17 return ['Delete']18 def put_bucket_notification_configuration(self):19 self._s3.put_bucket_notification_configuration(20 Bucket=self._bucket,21 NotificationConfiguration=self._notification_configuration,22 )23 def delete_bucket_notification_configuration(self):24 self._s3.put_bucket_notification_configuration(25 Bucket=self._bucket,26 NotificationConfiguration={},27 )28 def create(self, policies):29 self.put_bucket_notification_configuration()30 def update(self, policies):31 self.put_bucket_notification_configuration()32 def delete(self, policies):33 try:34 self.delete_bucket_notification_configuration()35 except Exception as e:36 if policies.has('IgnoreError'):37 return38 raise e39def handler(event, context):40 c = S3BucketNotificationConfiguration(event, context)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful