Best Python code snippet using localstack_python
test_lambda_integration_sqs.py
Source: test_lambda_integration_sqs.py
# Paths to the Lambda handler files used by the SQS event-source integration tests below.
LAMBDA_SQS_INTEGRATION_FILE = os.path.join(THIS_FOLDER, "functions", "lambda_sqs_integration.py")
LAMBDA_SQS_BATCH_ITEM_FAILURE_FILE = os.path.join(
    THIS_FOLDER, "functions", "lambda_sqs_batch_item_failure.py"
)


def _await_queue_size(sqs_client, queue_url: str, qsize: int, retries=10, sleep=1):
    """Poll the queue until it reports at least ``qsize`` (approximate) visible messages.

    Raises (via ``retry``) if the queue does not reach the expected size within
    ``retries * sleep`` seconds.
    """

    def _verify_event_queue_size():
        attr = "ApproximateNumberOfMessages"
        _approx = int(
            sqs_client.get_queue_attributes(QueueUrl=queue_url, AttributeNames=[attr])[
                "Attributes"
            ][attr]
        )
        assert _approx >= qsize

    retry(_verify_event_queue_size, retries=retries, sleep=sleep)


@pytest.fixture(autouse=True)
def _snapshot_transformers(snapshot):
    # manual transformers since we are passing SQS attributes through lambdas and back again
    snapshot.add_transformer(snapshot.transform.key_value("QueueUrl"))
    snapshot.add_transformer(snapshot.transform.key_value("ReceiptHandle"))
    snapshot.add_transformer(snapshot.transform.key_value("SenderId", reference_replacement=False))
    snapshot.add_transformer(snapshot.transform.key_value("SequenceNumber"))
    snapshot.add_transformer(snapshot.transform.resource_name())
    # body contains dynamic attributes so md5 hash changes
    snapshot.add_transformer(snapshot.transform.key_value("MD5OfBody"))
    # lower-case for when messages are rendered in lambdas
    snapshot.add_transformer(snapshot.transform.key_value("receiptHandle"))
    snapshot.add_transformer(snapshot.transform.key_value("md5OfBody"))


@pytest.mark.skip_snapshot_verify(
    paths=[
        # FIXME: this is most of the event source mapping unfortunately
        "$..ParallelizationFactor",
        "$..LastProcessingResult",
        "$..Topics",
        "$..MaximumRetryAttempts",
        "$..MaximumBatchingWindowInSeconds",
        "$..FunctionResponseTypes",
        "$..StartingPosition",
        "$..StateTransitionReason",
    ]
)
@pytest.mark.aws_validated
def test_failing_lambda_retries_after_visibility_timeout(
    create_lambda_function,
    lambda_client,
    sqs_client,
    sqs_create_queue,
    sqs_queue_arn,
    lambda_su_role,
    snapshot,
    cleanups,
):
    """This test verifies a basic SQS retry scenario. The lambda uses an SQS queue as event source, and we are
    testing whether the lambda automatically retries after the visibility timeout expires, and, after the retry,
    properly deletes the message from the queue."""
    # create queue used in the lambda to send events to (to verify lambda was invoked)
    destination_queue_name = f"destination-queue-{short_uid()}"
    destination_url = sqs_create_queue(QueueName=destination_queue_name)
    snapshot.match(
        "get_destination_queue_url", sqs_client.get_queue_url(QueueName=destination_queue_name)
    )

    # timeout in seconds, used for both the lambda and the queue visibility timeout
    retry_timeout = 5

    # set up lambda function
    function_name = f"failing-lambda-{short_uid()}"
    create_lambda_function(
        func_name=function_name,
        handler_file=LAMBDA_SQS_INTEGRATION_FILE,
        runtime=LAMBDA_RUNTIME_PYTHON38,
        role=lambda_su_role,
        timeout=retry_timeout,  # timeout needs to be <= than visibility timeout
    )

    # create event source queue
    event_source_url = sqs_create_queue(
        QueueName=f"source-queue-{short_uid()}",
        Attributes={
            # the visibility timeout is implicitly also the time between retries
            "VisibilityTimeout": str(retry_timeout),
        },
    )
    event_source_arn = sqs_queue_arn(event_source_url)

    # wire everything with the event source mapping
    response = lambda_client.create_event_source_mapping(
        EventSourceArn=event_source_arn,
        FunctionName=function_name,
        BatchSize=1,
    )
    mapping_uuid = response["UUID"]
    cleanups.append(lambda: lambda_client.delete_event_source_mapping(UUID=mapping_uuid))
    _await_event_source_mapping_enabled(lambda_client, mapping_uuid)
    response = lambda_client.get_event_source_mapping(UUID=mapping_uuid)
    snapshot.match("event_source_mapping", response)

    # trigger lambda with a message and pass the result destination url. the event format is expected by the
    # lambda_sqs_integration.py lambda.
    event = {"destination": destination_url, "fail_attempts": 1}
    sqs_client.send_message(
        QueueUrl=event_source_url,
        MessageBody=json.dumps(event),
    )

    # now wait for the first invocation result which is expected to fail
    then = time.time()
    first_response = sqs_client.receive_message(
        QueueUrl=destination_url, WaitTimeSeconds=15, MaxNumberOfMessages=1
    )
    assert "Messages" in first_response
    snapshot.match("first_attempt", first_response)

    # and then after a few seconds (at least the visibility timeout), we expect the second, successful attempt
    second_response = sqs_client.receive_message(
        QueueUrl=destination_url, WaitTimeSeconds=15, MaxNumberOfMessages=1
    )
    assert "Messages" in second_response
    snapshot.match("second_attempt", second_response)

    # check that it took at least the retry timeout between the first and second attempt
    assert time.time() >= then + retry_timeout

    # assert message is removed from the queue
    assert "Messages" not in sqs_client.receive_message(
        QueueUrl=destination_url, WaitTimeSeconds=retry_timeout + 1, MaxNumberOfMessages=1
    )
@pytest.mark.skip_snapshot_verify(
    paths=[
        "$..ParallelizationFactor",
        "$..LastProcessingResult",
        "$..Topics",
        "$..MaximumRetryAttempts",
        "$..MaximumBatchingWindowInSeconds",
        "$..FunctionResponseTypes",
        "$..StartingPosition",
        "$..StateTransitionReason",
    ]
)
@pytest.mark.aws_validated
def test_redrive_policy_with_failing_lambda(
    create_lambda_function,
    lambda_client,
    sqs_client,
    sqs_create_queue,
    sqs_queue_arn,
    lambda_su_role,
    snapshot,
    cleanups,
):
    """This test verifies that SQS moves a message that is passed to a failing lambda to a DLQ according to the
    redrive policy, and the lambda is invoked the correct number of times. The test retries twice and the event
    source mapping should then automatically move the message to the DLQ, but not earlier (see
    https://github.com/localstack/localstack/issues/5283)"""
    # create queue used in the lambda to send events to (to verify lambda was invoked)
    destination_queue_name = f"destination-queue-{short_uid()}"
    destination_url = sqs_create_queue(QueueName=destination_queue_name)
    snapshot.match(
        "get_destination_queue_url", sqs_client.get_queue_url(QueueName=destination_queue_name)
    )

    # timeout in seconds, used for both the lambda and the queue visibility timeout
    retry_timeout = 5
    retries = 2

    # set up lambda function
    function_name = f"failing-lambda-{short_uid()}"
    create_lambda_function(
        func_name=function_name,
        handler_file=LAMBDA_SQS_INTEGRATION_FILE,
        runtime=LAMBDA_RUNTIME_PYTHON38,
        role=lambda_su_role,
        timeout=retry_timeout,  # timeout needs to be <= than visibility timeout
    )

    # create dlq for event source queue
    event_dlq_url = sqs_create_queue(QueueName=f"event-dlq-{short_uid()}")
    event_dlq_arn = sqs_queue_arn(event_dlq_url)

    # create event source queue
    event_source_url = sqs_create_queue(
        QueueName=f"source-queue-{short_uid()}",
        Attributes={
            # the visibility timeout is implicitly also the time between retries
            "VisibilityTimeout": str(retry_timeout),
            "RedrivePolicy": json.dumps(
                {"deadLetterTargetArn": event_dlq_arn, "maxReceiveCount": retries}
            ),
        },
    )
    event_source_arn = sqs_queue_arn(event_source_url)

    # wire everything with the event source mapping
    mapping_uuid = lambda_client.create_event_source_mapping(
        EventSourceArn=event_source_arn,
        FunctionName=function_name,
        BatchSize=1,
    )["UUID"]
    cleanups.append(lambda: lambda_client.delete_event_source_mapping(UUID=mapping_uuid))
    _await_event_source_mapping_enabled(lambda_client, mapping_uuid)

    # trigger lambda with a message and pass the result destination url. the event format is expected by the
    # lambda_sqs_integration.py lambda.
    event = {"destination": destination_url, "fail_attempts": retries}
    sqs_client.send_message(
        QueueUrl=event_source_url,
        MessageBody=json.dumps(event),
    )

    # now wait for the first invocation result which is expected to fail
    first_response = sqs_client.receive_message(
        QueueUrl=destination_url, WaitTimeSeconds=15, MaxNumberOfMessages=1
    )
    assert "Messages" in first_response
    snapshot.match("first_attempt", first_response)

    # check that the DLQ is empty
    assert "Messages" not in sqs_client.receive_message(QueueUrl=event_dlq_url, WaitTimeSeconds=1)

    # the second is also expected to fail, and then the message moves into the DLQ
    second_response = sqs_client.receive_message(
        QueueUrl=destination_url, WaitTimeSeconds=15, MaxNumberOfMessages=1
    )
    assert "Messages" in second_response
    snapshot.match("second_attempt", second_response)

    # now check that the event messages was placed in the DLQ
    dlq_response = sqs_client.receive_message(QueueUrl=event_dlq_url, WaitTimeSeconds=15)
    assert "Messages" in dlq_response
    snapshot.match("dlq_response", dlq_response)
@pytest.mark.aws_validated
def test_sqs_queue_as_lambda_dead_letter_queue(
    sqs_client,
    lambda_client,
    lambda_su_role,
    create_lambda_function,
    sqs_create_queue,
    sqs_queue_arn,
    snapshot,
):
    """Verifies that an asynchronously invoked lambda that raises an error forwards the failed event to the
    SQS queue configured as the function's dead-letter queue."""
    snapshot.add_transformer(
        [
            # MessageAttributes contain the request id, messes the hash
            snapshot.transform.key_value(
                "MD5OfMessageAttributes",
                value_replacement="<md5-hash>",
                reference_replacement=False,
            ),
            snapshot.transform.jsonpath(
                "$..Messages..MessageAttributes.RequestID.StringValue", "request-id"
            ),
        ]
    )

    dlq_queue_url = sqs_create_queue()
    dlq_queue_arn = sqs_queue_arn(dlq_queue_url)

    function_name = f"lambda-fn-{short_uid()}"
    lambda_creation_response = create_lambda_function(
        func_name=function_name,
        handler_file=TEST_LAMBDA_PYTHON,
        runtime=LAMBDA_RUNTIME_PYTHON37,
        role=lambda_su_role,
        DeadLetterConfig={"TargetArn": dlq_queue_arn},
    )
    snapshot.match(
        "lambda-response-dlq-config",
        lambda_creation_response["CreateFunctionResponse"]["DeadLetterConfig"],
    )

    # invoke Lambda, triggering an error
    payload = {lambda_integration.MSG_BODY_RAISE_ERROR_FLAG: 1}
    lambda_client.invoke(
        FunctionName=function_name,
        Payload=json.dumps(payload),
        InvocationType="Event",
    )

    def receive_dlq():
        result = sqs_client.receive_message(
            QueueUrl=dlq_queue_url, MessageAttributeNames=["All"], VisibilityTimeout=0
        )
        assert len(result["Messages"]) > 0
        return result

    # check that the SQS queue used as DLQ received the error from the lambda
    # on AWS, event retries can be quite delayed, so we have to wait up to 6 minutes here
    # reduced retries when using localstack to avoid tests flaking
    retries = 120 if is_aws_cloud() else 3
    messages = retry(receive_dlq, retries=retries, sleep=3)
    snapshot.match("messages", messages)


# TODO: flaky against AWS
@pytest.mark.skip_snapshot_verify(
    paths=[
        # FIXME: we don't seem to be returning SQS FIFO sequence numbers correctly
        "$..SequenceNumber",
        # no idea why this one fails
        "$..receiptHandle",
        # matching these attributes doesn't work well because of the dynamic nature of messages
        "$..md5OfBody",
        "$..MD5OfMessageBody",
        # FIXME: this is most of the event source mapping unfortunately
        "$..create_event_source_mapping.ParallelizationFactor",
        "$..create_event_source_mapping.LastProcessingResult",
        "$..create_event_source_mapping.Topics",
        "$..create_event_source_mapping.MaximumRetryAttempts",
        "$..create_event_source_mapping.MaximumBatchingWindowInSeconds",
        "$..create_event_source_mapping.FunctionResponseTypes",
        "$..create_event_source_mapping.StartingPosition",
        "$..create_event_source_mapping.StateTransitionReason",
        "$..create_event_source_mapping.State",
        "$..create_event_source_mapping.ResponseMetadata",
    ]
)
@pytest.mark.aws_validated
def test_report_batch_item_failures(
    create_lambda_function,
    lambda_client,
    sqs_client,
    sqs_create_queue,
    sqs_queue_arn,
    lambda_su_role,
    snapshot,
    cleanups,
):
    """This test verifies the SQS Lambda integration feature "Reporting batch item failures": the lambda
    reports individual failed records of a batch, so only those records are retried, and after the configured
    number of retries the still-failing record is moved to the DLQ, but not earlier (see
    https://github.com/localstack/localstack/issues/5283)"""
    # create queue used in the lambda to send invocation results to (to verify lambda was invoked)
    destination_queue_name = f"destination-queue-{short_uid()}"
    destination_url = sqs_create_queue(QueueName=destination_queue_name)
    snapshot.match(
        "get_destination_queue_url", sqs_client.get_queue_url(QueueName=destination_queue_name)
    )

    # timeout in seconds, used for both the lambda and the queue visibility timeout.
    # increase to 10 if testing against AWS fails.
    retry_timeout = 8
    retries = 2

    # set up lambda function
    function_name = f"failing-lambda-{short_uid()}"
    create_lambda_function(
        func_name=function_name,
        handler_file=LAMBDA_SQS_BATCH_ITEM_FAILURE_FILE,
        runtime=LAMBDA_RUNTIME_PYTHON38,
        role=lambda_su_role,
        timeout=retry_timeout,  # timeout needs to be <= than visibility timeout
        envvars={"DESTINATION_QUEUE_URL": destination_url},
    )

    # create dlq for event source queue
    event_dlq_url = sqs_create_queue(
        QueueName=f"event-dlq-{short_uid()}.fifo", Attributes={"FifoQueue": "true"}
    )
    event_dlq_arn = sqs_queue_arn(event_dlq_url)

    # create event source queue
    # we use a FIFO queue to be sure the lambda is invoked in a deterministic way
    event_source_url = sqs_create_queue(
        QueueName=f"source-queue-{short_uid()}.fifo",
        Attributes={
            "FifoQueue": "true",
            # the visibility timeout is implicitly also the time between retries
            "VisibilityTimeout": str(retry_timeout),
            "RedrivePolicy": json.dumps(
                {"deadLetterTargetArn": event_dlq_arn, "maxReceiveCount": retries}
            ),
        },
    )
    event_source_arn = sqs_queue_arn(event_source_url)

    # put a batch in the queue. the event format is expected by the lambda_sqs_batch_item_failure.py lambda.
    # we add the batch before the event_source_mapping to be sure that the entire batch is sent to the first invocation.
    # message 1 succeeds immediately
    # message 2 and 3 succeeds after one retry
    # message 4 fails after 2 retries and lands in the DLQ
    response = sqs_client.send_message_batch(
        QueueUrl=event_source_url,
        Entries=[
            {
                "Id": "message-1",
                "MessageBody": json.dumps({"message": 1, "fail_attempts": 0}),
                "MessageGroupId": "1",
                "MessageDeduplicationId": "dedup-1",
            },
            {
                "Id": "message-2",
                "MessageBody": json.dumps({"message": 2, "fail_attempts": 1}),
                "MessageGroupId": "1",
                "MessageDeduplicationId": "dedup-2",
            },
            {
                "Id": "message-3",
                "MessageBody": json.dumps({"message": 3, "fail_attempts": 1}),
                "MessageGroupId": "1",
                "MessageDeduplicationId": "dedup-3",
            },
            {
                "Id": "message-4",
                "MessageBody": json.dumps({"message": 4, "fail_attempts": retries}),
                "MessageGroupId": "1",
                "MessageDeduplicationId": "dedup-4",
            },
        ],
    )

    # sort so snapshotting works
    response["Successful"].sort(key=lambda r: r["Id"])
    snapshot.match("send_message_batch", response)

    # wait for all items to appear in the queue
    _await_queue_size(sqs_client, event_source_url, qsize=4, retries=30)

    # wire everything with the event source mapping
    response = lambda_client.create_event_source_mapping(
        EventSourceArn=event_source_arn,
        FunctionName=function_name,
        BatchSize=10,
        MaximumBatchingWindowInSeconds=0,
        FunctionResponseTypes=["ReportBatchItemFailures"],
    )
    snapshot.match("create_event_source_mapping", response)
    mapping_uuid = response["UUID"]
    cleanups.append(lambda: lambda_client.delete_event_source_mapping(UUID=mapping_uuid))
    _await_event_source_mapping_enabled(lambda_client, mapping_uuid)

    # now wait for the first invocation result which is expected to have processed message 1 we wait half the retry
    # interval to wait long enough for the message to appear, but short enough to check that the DLQ is empty after
    # the first attempt.
    first_invocation = sqs_client.receive_message(
        QueueUrl=destination_url, WaitTimeSeconds=int(retry_timeout / 2), MaxNumberOfMessages=1
    )
    assert "Messages" in first_invocation
    # hack to make snapshot work
    first_invocation["Messages"][0]["Body"] = json.loads(first_invocation["Messages"][0]["Body"])
    first_invocation["Messages"][0]["Body"]["event"]["Records"].sort(
        key=lambda record: json.loads(record["body"])["message"]
    )
    snapshot.match("first_invocation", first_invocation)

    # check that the DLQ is empty
    assert "Messages" not in sqs_client.receive_message(QueueUrl=event_dlq_url)

    # now wait for the second invocation result which is expected to have processed message 2 and 3
    second_invocation = sqs_client.receive_message(
        QueueUrl=destination_url, WaitTimeSeconds=retry_timeout + 2, MaxNumberOfMessages=1
    )
    assert "Messages" in second_invocation
    # hack to make snapshot work
    second_invocation["Messages"][0]["Body"] = json.loads(second_invocation["Messages"][0]["Body"])
    second_invocation["Messages"][0]["Body"]["event"]["Records"].sort(
        key=lambda record: json.loads(record["body"])["message"]
    )
    snapshot.match("second_invocation", second_invocation)

    # here we make sure there's actually not a third attempt, since our retries = 2
    third_attempt = sqs_client.receive_message(
        QueueUrl=destination_url, WaitTimeSeconds=1, MaxNumberOfMessages=1
    )
    assert "Messages" not in third_attempt

    # now check that message 4 was placed in the DLQ
    dlq_response = sqs_client.receive_message(QueueUrl=event_dlq_url, WaitTimeSeconds=15)
    assert "Messages" in dlq_response
    snapshot.match("dlq_response", dlq_response)
queue that provokes a lambda failure (the lambda tries to parse the body as492    # JSON, but if it's not a json document, it fails). consequently, the entire batch should be discarded493    sqs_client.send_message_batch(494        QueueUrl=event_source_url,495        Entries=[496            {497                "Id": "message-1",498                "MessageBody": "{not a json body",499            },500            {501                # this one's ok, but will be sent to the DLQ nonetheless because it's part of this bad batch.502                "Id": "message-2",503                "MessageBody": json.dumps({"message": 2, "fail_attempts": 0}),504            },505        ],506    )507    _await_queue_size(sqs_client, event_source_url, qsize=2)508    # wire everything with the event source mapping509    mapping_uuid = lambda_client.create_event_source_mapping(510        EventSourceArn=event_source_arn,511        FunctionName=function_name,512        FunctionResponseTypes=["ReportBatchItemFailures"],513    )["UUID"]514    cleanups.append(lambda: lambda_client.delete_event_source_mapping(UUID=mapping_uuid))515    _await_event_source_mapping_enabled(lambda_client, mapping_uuid)516    # the message should arrive in the DLQ after 2 retries + some time for processing517    messages = []518    def _collect_message():519        dlq_response = sqs_client.receive_message(QueueUrl=event_dlq_url)520        messages.extend(dlq_response.get("Messages", []))521        assert len(messages) >= 2...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to video tutorials on the LambdaTest YouTube channel to get step-by-step demonstrations from industry experts.
Get 100 minutes of automation test minutes FREE!!
