How to use _await_queue_size method in localstack

Best Python code snippet using localstack_python

test_lambda_integration_sqs.py

Source:test_lambda_integration_sqs.py Github

copy

Full Screen

...23LAMBDA_SQS_INTEGRATION_FILE = os.path.join(THIS_FOLDER, "functions", "lambda_sqs_integration.py")24LAMBDA_SQS_BATCH_ITEM_FAILURE_FILE = os.path.join(25 THIS_FOLDER, "functions", "lambda_sqs_batch_item_failure.py"26)27def _await_queue_size(sqs_client, queue_url: str, qsize: int, retries=10, sleep=1):28 # wait for all items to appear in the queue29 def _verify_event_queue_size():30 attr = "ApproximateNumberOfMessages"31 _approx = int(32 sqs_client.get_queue_attributes(QueueUrl=queue_url, AttributeNames=[attr])[33 "Attributes"34 ][attr]35 )36 assert _approx >= qsize37 retry(_verify_event_queue_size, retries=retries, sleep=sleep)38@pytest.fixture(autouse=True)39def _snapshot_transformers(snapshot):40 # manual transformers since we are passing SQS attributes through lambdas and back again41 snapshot.add_transformer(snapshot.transform.key_value("QueueUrl"))42 snapshot.add_transformer(snapshot.transform.key_value("ReceiptHandle"))43 snapshot.add_transformer(snapshot.transform.key_value("SenderId", reference_replacement=False))44 snapshot.add_transformer(snapshot.transform.key_value("SequenceNumber"))45 snapshot.add_transformer(snapshot.transform.resource_name())46 # body contains dynamic attributes so md5 hash changes47 snapshot.add_transformer(snapshot.transform.key_value("MD5OfBody"))48 # lower-case for when messages are rendered in lambdas49 snapshot.add_transformer(snapshot.transform.key_value("receiptHandle"))50 snapshot.add_transformer(snapshot.transform.key_value("md5OfBody"))51@pytest.mark.skip_snapshot_verify(52 paths=[53 # FIXME: this is most of the event source mapping unfortunately54 "$..ParallelizationFactor",55 "$..LastProcessingResult",56 "$..Topics",57 "$..MaximumRetryAttempts",58 "$..MaximumBatchingWindowInSeconds",59 "$..FunctionResponseTypes",60 "$..StartingPosition",61 "$..StateTransitionReason",62 ]63)64@pytest.mark.aws_validated65def test_failing_lambda_retries_after_visibility_timeout(66 create_lambda_function,67 lambda_client,68 sqs_client,69 sqs_create_queue,70 sqs_queue_arn,71 lambda_su_role,72 snapshot,73 cleanups,74):75 """This test verifies a basic SQS retry scenario. The lambda uses an SQS queue as event source, and we are76 testing whether the lambda automatically retries after the visibility timeout expires, and, after the retry,77 properly deletes the message from the queue."""78 # create queue used in the lambda to send events to (to verify lambda was invoked)79 destination_queue_name = f"destination-queue-{short_uid()}"80 destination_url = sqs_create_queue(QueueName=destination_queue_name)81 snapshot.match(82 "get_destination_queue_url", sqs_client.get_queue_url(QueueName=destination_queue_name)83 )84 # timeout in seconds, used for both the lambda and the queue visibility timeout85 retry_timeout = 586 # set up lambda function87 function_name = f"failing-lambda-{short_uid()}"88 create_lambda_function(89 func_name=function_name,90 handler_file=LAMBDA_SQS_INTEGRATION_FILE,91 runtime=LAMBDA_RUNTIME_PYTHON38,92 role=lambda_su_role,93 timeout=retry_timeout, # timeout needs to be <= than visibility timeout94 )95 # create event source queue96 event_source_url = sqs_create_queue(97 QueueName=f"source-queue-{short_uid()}",98 Attributes={99 # the visibility timeout is implicitly also the time between retries100 "VisibilityTimeout": str(retry_timeout),101 },102 )103 event_source_arn = sqs_queue_arn(event_source_url)104 # wire everything with the event source mapping105 response = lambda_client.create_event_source_mapping(106 EventSourceArn=event_source_arn,107 FunctionName=function_name,108 BatchSize=1,109 )110 mapping_uuid = response["UUID"]111 cleanups.append(lambda: lambda_client.delete_event_source_mapping(UUID=mapping_uuid))112 _await_event_source_mapping_enabled(lambda_client, mapping_uuid)113 response = lambda_client.get_event_source_mapping(UUID=mapping_uuid)114 snapshot.match("event_source_mapping", response)115 # trigger lambda with a message and pass the result destination url. the event format is expected by the116 # lambda_sqs_integration.py lambda.117 event = {"destination": destination_url, "fail_attempts": 1}118 sqs_client.send_message(119 QueueUrl=event_source_url,120 MessageBody=json.dumps(event),121 )122 # now wait for the first invocation result which is expected to fail123 then = time.time()124 first_response = sqs_client.receive_message(125 QueueUrl=destination_url, WaitTimeSeconds=15, MaxNumberOfMessages=1126 )127 assert "Messages" in first_response128 snapshot.match("first_attempt", first_response)129 # and then after a few seconds (at least the visibility timeout), we expect the130 second_response = sqs_client.receive_message(131 QueueUrl=destination_url, WaitTimeSeconds=15, MaxNumberOfMessages=1132 )133 assert "Messages" in second_response134 snapshot.match("second_attempt", second_response)135 # check that it took at least the retry timeout between the first and second attempt136 assert time.time() >= then + retry_timeout137 # assert message is removed from the queue138 assert "Messages" not in sqs_client.receive_message(139 QueueUrl=destination_url, WaitTimeSeconds=retry_timeout + 1, MaxNumberOfMessages=1140 )141@pytest.mark.skip_snapshot_verify(142 paths=[143 "$..ParallelizationFactor",144 "$..LastProcessingResult",145 "$..Topics",146 "$..MaximumRetryAttempts",147 "$..MaximumBatchingWindowInSeconds",148 "$..FunctionResponseTypes",149 "$..StartingPosition",150 "$..StateTransitionReason",151 ]152)153@pytest.mark.aws_validated154def test_redrive_policy_with_failing_lambda(155 create_lambda_function,156 lambda_client,157 sqs_client,158 sqs_create_queue,159 sqs_queue_arn,160 lambda_su_role,161 snapshot,162 cleanups,163):164 """This test verifies that SQS moves a message that is passed to a failing lambda to a DLQ according to the165 redrive policy, and the lambda is invoked the correct number of times. The test retries twice and the event166 source mapping should then automatically move the message to the DLQ, but not earlier (see167 https://github.com/localstack/localstack/issues/5283)"""168 # create queue used in the lambda to send events to (to verify lambda was invoked)169 destination_queue_name = f"destination-queue-{short_uid()}"170 destination_url = sqs_create_queue(QueueName=destination_queue_name)171 snapshot.match(172 "get_destination_queue_url", sqs_client.get_queue_url(QueueName=destination_queue_name)173 )174 # timeout in seconds, used for both the lambda and the queue visibility timeout175 retry_timeout = 5176 retries = 2177 # set up lambda function178 function_name = f"failing-lambda-{short_uid()}"179 create_lambda_function(180 func_name=function_name,181 handler_file=LAMBDA_SQS_INTEGRATION_FILE,182 runtime=LAMBDA_RUNTIME_PYTHON38,183 role=lambda_su_role,184 timeout=retry_timeout, # timeout needs to be <= than visibility timeout185 )186 # create dlq for event source queue187 event_dlq_url = sqs_create_queue(QueueName=f"event-dlq-{short_uid()}")188 event_dlq_arn = sqs_queue_arn(event_dlq_url)189 # create event source queue190 event_source_url = sqs_create_queue(191 QueueName=f"source-queue-{short_uid()}",192 Attributes={193 # the visibility timeout is implicitly also the time between retries194 "VisibilityTimeout": str(retry_timeout),195 "RedrivePolicy": json.dumps(196 {"deadLetterTargetArn": event_dlq_arn, "maxReceiveCount": retries}197 ),198 },199 )200 event_source_arn = sqs_queue_arn(event_source_url)201 # wire everything with the event source mapping202 mapping_uuid = lambda_client.create_event_source_mapping(203 EventSourceArn=event_source_arn,204 FunctionName=function_name,205 BatchSize=1,206 )["UUID"]207 cleanups.append(lambda: lambda_client.delete_event_source_mapping(UUID=mapping_uuid))208 _await_event_source_mapping_enabled(lambda_client, mapping_uuid)209 # trigger lambda with a message and pass the result destination url. the event format is expected by the210 # lambda_sqs_integration.py lambda.211 event = {"destination": destination_url, "fail_attempts": retries}212 sqs_client.send_message(213 QueueUrl=event_source_url,214 MessageBody=json.dumps(event),215 )216 # now wait for the first invocation result which is expected to fail217 first_response = sqs_client.receive_message(218 QueueUrl=destination_url, WaitTimeSeconds=15, MaxNumberOfMessages=1219 )220 assert "Messages" in first_response221 snapshot.match("first_attempt", first_response)222 # check that the DLQ is empty223 assert "Messages" not in sqs_client.receive_message(QueueUrl=event_dlq_url, WaitTimeSeconds=1)224 # the second is also expected to fail, and then the message moves into the DLQ225 second_response = sqs_client.receive_message(226 QueueUrl=destination_url, WaitTimeSeconds=15, MaxNumberOfMessages=1227 )228 assert "Messages" in second_response229 snapshot.match("second_attempt", second_response)230 # now check that the event messages was placed in the DLQ231 dlq_response = sqs_client.receive_message(QueueUrl=event_dlq_url, WaitTimeSeconds=15)232 assert "Messages" in dlq_response233 snapshot.match("dlq_response", dlq_response)234@pytest.mark.aws_validated235def test_sqs_queue_as_lambda_dead_letter_queue(236 sqs_client,237 lambda_client,238 lambda_su_role,239 create_lambda_function,240 sqs_create_queue,241 sqs_queue_arn,242 snapshot,243):244 snapshot.add_transformer(245 [246 # MessageAttributes contain the request id, messes the hash247 snapshot.transform.key_value(248 "MD5OfMessageAttributes",249 value_replacement="<md5-hash>",250 reference_replacement=False,251 ),252 snapshot.transform.jsonpath(253 "$..Messages..MessageAttributes.RequestID.StringValue", "request-id"254 ),255 ]256 )257 dlq_queue_url = sqs_create_queue()258 dlq_queue_arn = sqs_queue_arn(dlq_queue_url)259 function_name = f"lambda-fn-{short_uid()}"260 lambda_creation_response = create_lambda_function(261 func_name=function_name,262 handler_file=TEST_LAMBDA_PYTHON,263 runtime=LAMBDA_RUNTIME_PYTHON37,264 role=lambda_su_role,265 DeadLetterConfig={"TargetArn": dlq_queue_arn},266 )267 snapshot.match(268 "lambda-response-dlq-config",269 lambda_creation_response["CreateFunctionResponse"]["DeadLetterConfig"],270 )271 # invoke Lambda, triggering an error272 payload = {lambda_integration.MSG_BODY_RAISE_ERROR_FLAG: 1}273 lambda_client.invoke(274 FunctionName=function_name,275 Payload=json.dumps(payload),276 InvocationType="Event",277 )278 def receive_dlq():279 result = sqs_client.receive_message(280 QueueUrl=dlq_queue_url, MessageAttributeNames=["All"], VisibilityTimeout=0281 )282 assert len(result["Messages"]) > 0283 return result284 # check that the SQS queue used as DLQ received the error from the lambda285 # on AWS, event retries can be quite delayed, so we have to wait up to 6 minutes here286 # reduced retries when using localstack to avoid tests flaking287 retries = 120 if is_aws_cloud() else 3288 messages = retry(receive_dlq, retries=retries, sleep=3)289 snapshot.match("messages", messages)290# TODO: flaky against AWS291@pytest.mark.skip_snapshot_verify(292 paths=[293 # FIXME: we don't seem to be returning SQS FIFO sequence numbers correctly294 "$..SequenceNumber",295 # no idea why this one fails296 "$..receiptHandle",297 # matching these attributes doesn't work well because of the dynamic nature of messages298 "$..md5OfBody",299 "$..MD5OfMessageBody",300 # FIXME: this is most of the event source mapping unfortunately301 "$..create_event_source_mapping.ParallelizationFactor",302 "$..create_event_source_mapping.LastProcessingResult",303 "$..create_event_source_mapping.Topics",304 "$..create_event_source_mapping.MaximumRetryAttempts",305 "$..create_event_source_mapping.MaximumBatchingWindowInSeconds",306 "$..create_event_source_mapping.FunctionResponseTypes",307 "$..create_event_source_mapping.StartingPosition",308 "$..create_event_source_mapping.StateTransitionReason",309 "$..create_event_source_mapping.State",310 "$..create_event_source_mapping.ResponseMetadata",311 ]312)313@pytest.mark.aws_validated314def test_report_batch_item_failures(315 create_lambda_function,316 lambda_client,317 sqs_client,318 sqs_create_queue,319 sqs_queue_arn,320 lambda_su_role,321 snapshot,322 cleanups,323):324 """This test verifies the SQS Lambda integration feature Reporting batch item failures325 redrive policy, and the lambda is invoked the correct number of times. The test retries twice and the event326 source mapping should then automatically move the message to the DQL, but not earlier (see327 https://github.com/localstack/localstack/issues/5283)"""328 # create queue used in the lambda to send invocation results to (to verify lambda was invoked)329 destination_queue_name = f"destination-queue-{short_uid()}"330 destination_url = sqs_create_queue(QueueName=destination_queue_name)331 snapshot.match(332 "get_destination_queue_url", sqs_client.get_queue_url(QueueName=destination_queue_name)333 )334 # timeout in seconds, used for both the lambda and the queue visibility timeout.335 # increase to 10 if testing against AWS fails.336 retry_timeout = 8337 retries = 2338 # set up lambda function339 function_name = f"failing-lambda-{short_uid()}"340 create_lambda_function(341 func_name=function_name,342 handler_file=LAMBDA_SQS_BATCH_ITEM_FAILURE_FILE,343 runtime=LAMBDA_RUNTIME_PYTHON38,344 role=lambda_su_role,345 timeout=retry_timeout, # timeout needs to be <= than visibility timeout346 envvars={"DESTINATION_QUEUE_URL": destination_url},347 )348 # create dlq for event source queue349 event_dlq_url = sqs_create_queue(350 QueueName=f"event-dlq-{short_uid()}.fifo", Attributes={"FifoQueue": "true"}351 )352 event_dlq_arn = sqs_queue_arn(event_dlq_url)353 # create event source queue354 # we use a FIFO queue to be sure the lambda is invoked in a deterministic way355 event_source_url = sqs_create_queue(356 QueueName=f"source-queue-{short_uid()}.fifo",357 Attributes={358 "FifoQueue": "true",359 # the visibility timeout is implicitly also the time between retries360 "VisibilityTimeout": str(retry_timeout),361 "RedrivePolicy": json.dumps(362 {"deadLetterTargetArn": event_dlq_arn, "maxReceiveCount": retries}363 ),364 },365 )366 event_source_arn = sqs_queue_arn(event_source_url)367 # put a batch in the queue. the event format is expected by the lambda_sqs_batch_item_failure.py lambda.368 # we add the batch before the event_source_mapping to be sure that the entire batch is sent to the first invocation.369 # message 1 succeeds immediately370 # message 2 and 3 succeeds after one retry371 # message 4 fails after 2 retries and lands in the DLQ372 response = sqs_client.send_message_batch(373 QueueUrl=event_source_url,374 Entries=[375 {376 "Id": "message-1",377 "MessageBody": json.dumps({"message": 1, "fail_attempts": 0}),378 "MessageGroupId": "1",379 "MessageDeduplicationId": "dedup-1",380 },381 {382 "Id": "message-2",383 "MessageBody": json.dumps({"message": 2, "fail_attempts": 1}),384 "MessageGroupId": "1",385 "MessageDeduplicationId": "dedup-2",386 },387 {388 "Id": "message-3",389 "MessageBody": json.dumps({"message": 3, "fail_attempts": 1}),390 "MessageGroupId": "1",391 "MessageDeduplicationId": "dedup-3",392 },393 {394 "Id": "message-4",395 "MessageBody": json.dumps({"message": 4, "fail_attempts": retries}),396 "MessageGroupId": "1",397 "MessageDeduplicationId": "dedup-4",398 },399 ],400 )401 # sort so snapshotting works402 response["Successful"].sort(key=lambda r: r["Id"])403 snapshot.match("send_message_batch", response)404 # wait for all items to appear in the queue405 _await_queue_size(sqs_client, event_source_url, qsize=4, retries=30)406 # wire everything with the event source mapping407 response = lambda_client.create_event_source_mapping(408 EventSourceArn=event_source_arn,409 FunctionName=function_name,410 BatchSize=10,411 MaximumBatchingWindowInSeconds=0,412 FunctionResponseTypes=["ReportBatchItemFailures"],413 )414 snapshot.match("create_event_source_mapping", response)415 mapping_uuid = response["UUID"]416 cleanups.append(lambda: lambda_client.delete_event_source_mapping(UUID=mapping_uuid))417 _await_event_source_mapping_enabled(lambda_client, mapping_uuid)418 # now wait for the first invocation result which is expected to have processed message 1 we wait half the retry419 # interval to wait long enough for the message to appear, but short enough to check that the DLQ is empty after420 # the first attempt.421 first_invocation = sqs_client.receive_message(422 QueueUrl=destination_url, WaitTimeSeconds=int(retry_timeout / 2), MaxNumberOfMessages=1423 )424 assert "Messages" in first_invocation425 # hack to make snapshot work426 first_invocation["Messages"][0]["Body"] = json.loads(first_invocation["Messages"][0]["Body"])427 first_invocation["Messages"][0]["Body"]["event"]["Records"].sort(428 key=lambda record: json.loads(record["body"])["message"]429 )430 snapshot.match("first_invocation", first_invocation)431 # check that the DQL is empty432 assert "Messages" not in sqs_client.receive_message(QueueUrl=event_dlq_url)433 # now wait for the second invocation result which is expected to have processed message 2 and 3434 second_invocation = sqs_client.receive_message(435 QueueUrl=destination_url, WaitTimeSeconds=retry_timeout + 2, MaxNumberOfMessages=1436 )437 assert "Messages" in second_invocation438 # hack to make snapshot work439 second_invocation["Messages"][0]["Body"] = json.loads(second_invocation["Messages"][0]["Body"])440 second_invocation["Messages"][0]["Body"]["event"]["Records"].sort(441 key=lambda record: json.loads(record["body"])["message"]442 )443 snapshot.match("second_invocation", second_invocation)444 # here we make sure there's actually not a third attempt, since our retries = 2445 third_attempt = sqs_client.receive_message(446 QueueUrl=destination_url, WaitTimeSeconds=1, MaxNumberOfMessages=1447 )448 assert "Messages" not in third_attempt449 # now check that message 4 was placed in the DLQ450 dlq_response = sqs_client.receive_message(QueueUrl=event_dlq_url, WaitTimeSeconds=15)451 assert "Messages" in dlq_response452 snapshot.match("dlq_response", dlq_response)453@pytest.mark.aws_validated454def test_report_batch_item_failures_on_lambda_error(455 create_lambda_function,456 lambda_client,457 sqs_client,458 sqs_create_queue,459 sqs_queue_arn,460 lambda_su_role,461 snapshot,462 cleanups,463):464 # timeout in seconds, used for both the lambda and the queue visibility timeout465 retry_timeout = 2466 retries = 2467 # set up lambda function468 function_name = f"failing-lambda-{short_uid()}"469 create_lambda_function(470 func_name=function_name,471 handler_file=LAMBDA_SQS_INTEGRATION_FILE,472 runtime=LAMBDA_RUNTIME_PYTHON38,473 role=lambda_su_role,474 timeout=retry_timeout, # timeout needs to be <= than visibility timeout475 )476 # create dlq for event source queue477 event_dlq_url = sqs_create_queue(QueueName=f"event-dlq-{short_uid()}")478 event_dlq_arn = sqs_queue_arn(event_dlq_url)479 # create event source queue480 event_source_url = sqs_create_queue(481 QueueName=f"source-queue-{short_uid()}",482 Attributes={483 # the visibility timeout is implicitly also the time between retries484 "VisibilityTimeout": str(retry_timeout),485 "RedrivePolicy": json.dumps(486 {"deadLetterTargetArn": event_dlq_arn, "maxReceiveCount": retries}487 ),488 },489 )490 event_source_arn = sqs_queue_arn(event_source_url)491 # send a batch with a message to the queue that provokes a lambda failure (the lambda tries to parse the body as492 # JSON, but if it's not a json document, it fails). consequently, the entire batch should be discarded493 sqs_client.send_message_batch(494 QueueUrl=event_source_url,495 Entries=[496 {497 "Id": "message-1",498 "MessageBody": "{not a json body",499 },500 {501 # this one's ok, but will be sent to the DLQ nonetheless because it's part of this bad batch.502 "Id": "message-2",503 "MessageBody": json.dumps({"message": 2, "fail_attempts": 0}),504 },505 ],506 )507 _await_queue_size(sqs_client, event_source_url, qsize=2)508 # wire everything with the event source mapping509 mapping_uuid = lambda_client.create_event_source_mapping(510 EventSourceArn=event_source_arn,511 FunctionName=function_name,512 FunctionResponseTypes=["ReportBatchItemFailures"],513 )["UUID"]514 cleanups.append(lambda: lambda_client.delete_event_source_mapping(UUID=mapping_uuid))515 _await_event_source_mapping_enabled(lambda_client, mapping_uuid)516 # the message should arrive in the DLQ after 2 retries + some time for processing517 messages = []518 def _collect_message():519 dlq_response = sqs_client.receive_message(QueueUrl=event_dlq_url)520 messages.extend(dlq_response.get("Messages", []))521 assert len(messages) >= 2...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful