How to use the sqs_collect_s3_events method in LocalStack

Best Python code snippet using localstack_python

test_s3_notifications_sqs.py

Source:test_s3_notifications_sqs.py Github

copy

Full Screen

...79 """80 def factory(bucket_name: str, queue_url: str, events: List["EventType"]):81 return create_sqs_bucket_notification(s3_client, sqs_client, bucket_name, queue_url, events)82 return factory83def sqs_collect_s3_events(84 sqs_client: "SQSClient", queue_url: str, min_events: int, timeout: int = 1085) -> List[Dict]:86 """87 Polls the given queue for the given amount of time and extracts and flattens from the received messages all88 events (messages that have a "Records" field in their body, and where the records can be json-deserialized).89 :param sqs_client: the boto3 client to use90 :param queue_url: the queue URL to listen from91 :param min_events: the minimum number of events to receive to wait for92 :param timeout: the number of seconds to wait before raising an assert error93 :return: a list with the deserialized records from the SQS messages94 """95 events = []96 def collect_events() -> int:97 _response = sqs_client.receive_message(98 QueueUrl=queue_url, WaitTimeSeconds=timeout, MaxNumberOfMessages=199 )100 messages = _response.get("Messages", [])101 if not messages:102 LOG.info("no messages received from %s after %d seconds", queue_url, timeout)103 for m in messages:104 body = m["Body"]105 # see https://www.mikulskibartosz.name/what-is-s3-test-event/106 if "s3:TestEvent" in body:107 continue108 assert "Records" in body, "Unexpected event received"109 doc = json.loads(body)110 events.extend(doc["Records"])111 return len(events)112 assert poll_condition(lambda: collect_events() >= min_events, timeout=timeout)113 return events114class TestS3NotificationsToSQS:115 @pytest.mark.aws_validated116 @pytest.mark.skip_snapshot_verify(paths=["$..s3.object.eTag"])117 def test_object_created_put(118 self,119 s3_client,120 sqs_client,121 s3_create_bucket,122 sqs_create_queue,123 s3_create_sqs_bucket_notification,124 snapshot,125 ):126 snapshot.add_transformer(snapshot.transform.sqs_api())127 snapshot.add_transformer(snapshot.transform.s3_api())128 # setup fixture129 
bucket_name = s3_create_bucket()130 queue_url = sqs_create_queue()131 s3_create_sqs_bucket_notification(bucket_name, queue_url, ["s3:ObjectCreated:Put"])132 s3_client.put_bucket_versioning(133 Bucket=bucket_name, VersioningConfiguration={"Status": "Enabled"}134 )135 obj0 = s3_client.put_object(Bucket=bucket_name, Key="my_key_0", Body="something")136 obj1 = s3_client.put_object(Bucket=bucket_name, Key="my_key_1", Body="something else")137 # collect s3 events from SQS queue138 events = sqs_collect_s3_events(sqs_client, queue_url, min_events=2)139 assert len(events) == 2, f"unexpected number of events in {events}"140 # order seems not be guaranteed - sort so we can rely on the order141 events.sort(key=lambda x: x["s3"]["object"]["size"])142 snapshot.match("receive_messages", {"messages": events})143 # assert144 assert events[0]["eventSource"] == "aws:s3"145 assert events[0]["eventName"] == "ObjectCreated:Put"146 assert events[0]["s3"]["bucket"]["name"] == bucket_name147 assert events[0]["s3"]["object"]["key"] == "my_key_0"148 assert events[0]["s3"]["object"]["size"] == 9149 assert events[0]["s3"]["object"]["versionId"]150 assert obj0["VersionId"] == events[0]["s3"]["object"]["versionId"]151 assert events[1]["eventSource"] == "aws:s3"152 assert events[0]["eventName"] == "ObjectCreated:Put"153 assert events[1]["s3"]["bucket"]["name"] == bucket_name154 assert events[1]["s3"]["object"]["key"] == "my_key_1"155 assert events[1]["s3"]["object"]["size"] == 14156 assert events[1]["s3"]["object"]["versionId"]157 assert obj1["VersionId"] == events[1]["s3"]["object"]["versionId"]158 @pytest.mark.aws_validated159 @pytest.mark.skip_snapshot_verify(paths=["$..s3.object.eTag", "$..s3.object.versionId"])160 def test_object_created_copy(161 self,162 s3_client,163 sqs_client,164 s3_create_bucket,165 sqs_create_queue,166 s3_create_sqs_bucket_notification,167 snapshot,168 ):169 snapshot.add_transformer(snapshot.transform.sqs_api())170 
snapshot.add_transformer(snapshot.transform.s3_api())171 snapshot.add_transformer(snapshot.transform.jsonpath("$..s3.object.key", "object-key"))172 # setup fixture173 bucket_name = s3_create_bucket()174 queue_url = sqs_create_queue()175 s3_create_sqs_bucket_notification(bucket_name, queue_url, ["s3:ObjectCreated:Copy"])176 src_key = "src-dest-%s" % short_uid()177 dest_key = "key-dest-%s" % short_uid()178 s3_client.put_object(Bucket=bucket_name, Key=src_key, Body="something")179 assert not sqs_collect_s3_events(180 sqs_client, queue_url, 0, timeout=1181 ), "unexpected event triggered for put_object"182 s3_client.copy_object(183 Bucket=bucket_name,184 CopySource={"Bucket": bucket_name, "Key": src_key},185 Key=dest_key,186 )187 events = sqs_collect_s3_events(sqs_client, queue_url, 1)188 assert len(events) == 1, f"unexpected number of events in {events}"189 snapshot.match("receive_messages", {"messages": events})190 assert events[0]["eventSource"] == "aws:s3"191 assert events[0]["eventName"] == "ObjectCreated:Copy"192 assert events[0]["s3"]["bucket"]["name"] == bucket_name193 assert events[0]["s3"]["object"]["key"] == dest_key194 @pytest.mark.aws_validated195 @pytest.mark.skip_snapshot_verify(196 paths=["$..s3.object.eTag", "$..s3.object.versionId", "$..s3.object.size"]197 )198 def test_object_created_and_object_removed(199 self,200 s3_client,201 sqs_client,202 s3_create_bucket,203 sqs_create_queue,204 s3_create_sqs_bucket_notification,205 snapshot,206 ):207 snapshot.add_transformer(snapshot.transform.sqs_api())208 snapshot.add_transformer(snapshot.transform.s3_api())209 snapshot.add_transformer(snapshot.transform.jsonpath("$..s3.object.key", "object-key"))210 # setup fixture211 bucket_name = s3_create_bucket()212 queue_url = sqs_create_queue()213 s3_create_sqs_bucket_notification(214 bucket_name, queue_url, ["s3:ObjectCreated:*", "s3:ObjectRemoved:*"]215 )216 src_key = "src-dest-%s" % short_uid()217 dest_key = "key-dest-%s" % short_uid()218 # event0 = PutObject219 
s3_client.put_object(Bucket=bucket_name, Key=src_key, Body="something")220 # event1 = CopyObject221 s3_client.copy_object(222 Bucket=bucket_name,223 CopySource={"Bucket": bucket_name, "Key": src_key},224 Key=dest_key,225 )226 # event3 = DeleteObject227 s3_client.delete_object(Bucket=bucket_name, Key=src_key)228 # collect events229 events = sqs_collect_s3_events(sqs_client, queue_url, 3)230 assert len(events) == 3, f"unexpected number of events in {events}"231 # order seems not be guaranteed - sort so we can rely on the order232 events.sort(key=lambda x: x["eventName"])233 snapshot.match("receive_messages", {"messages": events})234 assert events[1]["eventName"] == "ObjectCreated:Put"235 assert events[1]["s3"]["bucket"]["name"] == bucket_name236 assert events[1]["s3"]["object"]["key"] == src_key237 assert events[0]["eventName"] == "ObjectCreated:Copy"238 assert events[0]["s3"]["bucket"]["name"] == bucket_name239 assert events[0]["s3"]["object"]["key"] == dest_key240 assert events[2]["eventName"] == "ObjectRemoved:Delete"241 assert events[2]["s3"]["bucket"]["name"] == bucket_name242 assert events[2]["s3"]["object"]["key"] == src_key243 @pytest.mark.aws_validated244 @pytest.mark.skip_snapshot_verify(paths=["$..s3.object.eTag", "$..s3.object.versionId"])245 def test_object_created_complete_multipart_upload(246 self,247 s3_client,248 sqs_client,249 s3_create_bucket,250 sqs_create_queue,251 s3_create_sqs_bucket_notification,252 tmpdir,253 snapshot,254 ):255 snapshot.add_transformer(snapshot.transform.sqs_api())256 snapshot.add_transformer(snapshot.transform.s3_api())257 # setup fixture258 bucket_name = s3_create_bucket()259 queue_url = sqs_create_queue()260 key = "test-key"261 s3_create_sqs_bucket_notification(bucket_name, queue_url, ["s3:ObjectCreated:*"])262 # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/s3.html#multipart-transfers263 config = TransferConfig(multipart_threshold=5 * KB, multipart_chunksize=1 * KB)264 file = tmpdir / "test-file.bin"265 
data = b"1" * (6 * KB) # create 6 kilobytes of ones266 file.write(data=data, mode="w")267 s3_client.upload_file(268 Bucket=bucket_name, Key=key, Filename=str(file.realpath()), Config=config269 )270 events = sqs_collect_s3_events(sqs_client, queue_url, 1)271 snapshot.match("receive_messages", {"messages": events})272 assert events[0]["eventName"] == "ObjectCreated:CompleteMultipartUpload"273 assert events[0]["s3"]["bucket"]["name"] == bucket_name274 assert events[0]["s3"]["object"]["key"] == key275 assert events[0]["s3"]["object"]["size"] == file.size()276 @pytest.mark.aws_validated277 @pytest.mark.skip_snapshot_verify(paths=["$..s3.object.eTag", "$..s3.object.versionId"])278 def test_key_encoding(279 self,280 s3_client,281 sqs_client,282 s3_create_bucket,283 sqs_create_queue,284 s3_create_sqs_bucket_notification,285 snapshot,286 ):287 snapshot.add_transformer(snapshot.transform.sqs_api())288 snapshot.add_transformer(snapshot.transform.s3_api())289 # test for https://github.com/localstack/localstack/issues/2741290 bucket_name = s3_create_bucket()291 queue_url = sqs_create_queue()292 s3_create_sqs_bucket_notification(bucket_name, queue_url, ["s3:ObjectCreated:*"])293 key = "a@b"294 key_encoded = "a%40b"295 s3_client.put_object(Bucket=bucket_name, Key=key, Body="something")296 events = sqs_collect_s3_events(sqs_client, queue_url, min_events=1)297 snapshot.match("receive_messages", {"messages": events})298 assert events[0]["eventName"] == "ObjectCreated:Put"299 assert events[0]["s3"]["object"]["key"] == key_encoded300 @pytest.mark.aws_validated301 @pytest.mark.skip_snapshot_verify(paths=["$..s3.object.eTag", "$..s3.object.versionId"])302 def test_object_created_put_with_presigned_url_upload(303 self,304 s3_client,305 sqs_client,306 s3_create_bucket,307 sqs_create_queue,308 s3_create_sqs_bucket_notification,309 snapshot,310 ):311 snapshot.add_transformer(snapshot.transform.sqs_api())312 snapshot.add_transformer(snapshot.transform.s3_api())313 bucket_name = 
s3_create_bucket()314 queue_url = sqs_create_queue()315 key = "key-by-hostname"316 s3_create_sqs_bucket_notification(bucket_name, queue_url, ["s3:ObjectCreated:*"])317 url = s3_client.generate_presigned_url(318 "put_object", Params={"Bucket": bucket_name, "Key": key}319 )320 requests.put(url, data="something", verify=False)321 events = sqs_collect_s3_events(sqs_client, queue_url, 1)322 snapshot.match("receive_messages", {"messages": events})323 assert events[0]["eventName"] == "ObjectCreated:Put"324 assert events[0]["s3"]["object"]["key"] == key325 @pytest.mark.aws_validated326 @pytest.mark.skip_snapshot_verify(327 paths=[328 "$..s3.object.eTag",329 "$..s3.object.versionId",330 "$..s3.object.size",331 "$..s3.object.sequencer",332 "$..eventVersion",333 ]334 )335 def test_object_tagging_put_event(336 self,337 s3_client,338 sqs_client,339 s3_create_bucket,340 sqs_create_queue,341 s3_create_sqs_bucket_notification,342 snapshot,343 ):344 snapshot.add_transformer(snapshot.transform.sqs_api())345 snapshot.add_transformer(snapshot.transform.s3_api())346 snapshot.add_transformer(snapshot.transform.jsonpath("$..s3.object.key", "object-key"))347 # setup fixture348 bucket_name = s3_create_bucket()349 queue_url = sqs_create_queue()350 s3_create_sqs_bucket_notification(bucket_name, queue_url, ["s3:ObjectTagging:Put"])351 dest_key = "key-dest-%s" % short_uid()352 s3_client.put_object(Bucket=bucket_name, Key=dest_key, Body="FooBarBlitz")353 assert not sqs_collect_s3_events(354 sqs_client, queue_url, 0, timeout=1355 ), "unexpected event triggered for put_object"356 s3_client.put_object_tagging(357 Bucket=bucket_name,358 Key=dest_key,359 Tagging={360 "TagSet": [361 {"Key": "swallow_type", "Value": "african"},362 ]363 },364 )365 events = sqs_collect_s3_events(sqs_client, queue_url, 1)366 assert len(events) == 1, f"unexpected number of events in {events}"367 snapshot.match("receive_messages", {"messages": events})368 assert events[0]["eventSource"] == "aws:s3"369 assert 
events[0]["eventName"] == "ObjectTagging:Put"370 assert events[0]["s3"]["bucket"]["name"] == bucket_name371 assert events[0]["s3"]["object"]["key"] == dest_key372 @pytest.mark.aws_validated373 @pytest.mark.skip_snapshot_verify(374 paths=[375 "$..s3.object.eTag",376 "$..s3.object.versionId",377 "$..s3.object.size",378 "$..s3.object.sequencer",379 "$..eventVersion",380 ]381 )382 def test_object_tagging_delete_event(383 self,384 s3_client,385 sqs_client,386 s3_create_bucket,387 sqs_create_queue,388 s3_create_sqs_bucket_notification,389 snapshot,390 ):391 snapshot.add_transformer(snapshot.transform.sqs_api())392 snapshot.add_transformer(snapshot.transform.s3_api())393 snapshot.add_transformer(snapshot.transform.jsonpath("$..s3.object.key", "object-key"))394 # setup fixture395 bucket_name = s3_create_bucket()396 queue_url = sqs_create_queue()397 s3_create_sqs_bucket_notification(bucket_name, queue_url, ["s3:ObjectTagging:Delete"])398 dest_key = "key-dest-%s" % short_uid()399 s3_client.put_object(Bucket=bucket_name, Key=dest_key, Body="FooBarBlitz")400 assert not sqs_collect_s3_events(401 sqs_client, queue_url, 0, timeout=1402 ), "unexpected event triggered for put_object"403 s3_client.put_object_tagging(404 Bucket=bucket_name,405 Key=dest_key,406 Tagging={407 "TagSet": [408 {"Key": "swallow_type", "Value": "african"},409 ]410 },411 )412 s3_client.delete_object_tagging(413 Bucket=bucket_name,414 Key=dest_key,415 )416 events = sqs_collect_s3_events(sqs_client, queue_url, 1)417 assert len(events) == 1, f"unexpected number of events in {events}"418 snapshot.match("receive_messages", {"messages": events})419 assert events[0]["eventSource"] == "aws:s3"420 assert events[0]["eventName"] == "ObjectTagging:Delete"421 assert events[0]["s3"]["bucket"]["name"] == bucket_name422 assert events[0]["s3"]["object"]["key"] == dest_key423 @pytest.mark.aws_validated424 @pytest.mark.skip_snapshot_verify(paths=["$..s3.object.eTag", "$..s3.object.versionId"])425 def test_xray_header(426 
self,427 s3_client,428 sqs_client,429 s3_create_bucket,430 sqs_create_queue,...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub. Right from setting up the prerequisites to running your first automation test, to following best practices and diving deeper into advanced test scenarios, the LambdaTest Learning Hub compiles a list of step-by-step guides to help you become proficient with different test automation frameworks, e.g., Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel to get step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful