Best Python code snippet using localstack_python
test_sns.py
Source:test_sns.py  
...97        sqs_client,98        sqs_queue_arn,99        sns_subscription,100    ):101        topic_arn = sns_create_topic()["TopicArn"]102        queue_url = sqs_create_queue()103        queue_arn = sqs_queue_arn(queue_url)104        sns_subscription(TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_arn)105        # publish message to SNS, receive it from SQS, assert that messages are equal106        message = 'ö§a1"_!?,. £$-'107        sns_client.publish(TopicArn=topic_arn, Message=message)108        def check_message():109            msgs = sqs_client.receive_message(QueueUrl=queue_url, VisibilityTimeout=0)110            msg_received = msgs["Messages"][0]111            msg_received = json.loads(to_str(msg_received["Body"]))112            msg_received = msg_received["Message"]113            assert message == msg_received114        retry(check_message, retries=PUBLICATION_RETRIES, sleep=PUBLICATION_TIMEOUT)115    def test_subscribe_http_endpoint(self, sns_client, sns_create_topic, sns_subscription):116        topic_arn = sns_create_topic()["TopicArn"]117        # create HTTP endpoint and connect it to SNS topic118        class MyUpdateListener(ProxyListener):119            def forward_request(self, method, path, data, headers):120                records.append((json.loads(to_str(data)), headers))121                return 200122        records = []123        local_port = get_free_tcp_port()124        proxy = start_proxy(local_port, backend_url=None, update_listener=MyUpdateListener())125        wait_for_port_open(local_port)126        queue_arn = "%s://localhost:%s" % (get_service_protocol(), local_port)127        sns_subscription(TopicArn=topic_arn, Protocol="http", Endpoint=queue_arn)128        def received():129            assert records[0][0]["Type"] == "SubscriptionConfirmation"130            assert records[0][1]["x-amz-sns-message-type"] == "SubscriptionConfirmation"131            token = records[0][0]["Token"]132            subscribe_url = records[0][0]["SubscribeURL"]133            assert subscribe_url == (134                f"{external_service_url('sns')}/?Action=ConfirmSubscription&TopicArn={topic_arn}&Token={token}"135            )136            assert "Signature" in records[0][0]137            assert "SigningCertURL" in records[0][0]138        retry(received, retries=5, sleep=1)139        proxy.stop()140    def test_subscribe_with_invalid_protocol(self, sns_client, sns_create_topic, sns_subscription):141        topic_arn = sns_create_topic(Name=TEST_TOPIC_NAME_2)["TopicArn"]142        with pytest.raises(ClientError) as e:143            sns_subscription(144                TopicArn=topic_arn, Protocol="test-protocol", Endpoint="localstack@yopmail.com"145            )146        assert e.value.response["ResponseMetadata"]["HTTPStatusCode"] == 400147        assert e.value.response["Error"]["Code"] == "InvalidParameter"148    def test_attribute_raw_subscribe(149        self, sqs_client, sns_client, sns_create_topic, sqs_queue, sqs_queue_arn, sns_subscription150    ):151        topic_arn = sns_create_topic()["TopicArn"]152        # create SNS topic and connect it to an SQS queue153        queue_url = sqs_queue154        queue_arn = sqs_queue_arn(queue_url)155        attributes = {"RawMessageDelivery": "True"}156        sns_subscription(157            TopicArn=topic_arn,158            Protocol="sqs",159            Endpoint=queue_arn,160            Attributes=attributes,161        )162        # fetch subscription information163        subscription_list = sns_client.list_subscriptions()164        subscription_arn = ""165        for subscription in subscription_list["Subscriptions"]:166            if subscription["TopicArn"] == topic_arn:167                subscription_arn = subscription["SubscriptionArn"]168        actual_attributes = sns_client.get_subscription_attributes(169            SubscriptionArn=subscription_arn170        )["Attributes"]171        # assert the attributes are well set172        assert actual_attributes["RawMessageDelivery"]173        # publish message to SNS, receive it from SQS, assert that messages are equal and that they are Raw174        message = "This is a test message"175        binary_attribute = b"\x02\x03\x04"176        # extending this test case to test support for binary message attribute data177        # https://github.com/localstack/localstack/issues/2432178        sns_client.publish(179            TopicArn=topic_arn,180            Message=message,181            MessageAttributes={"store": {"DataType": "Binary", "BinaryValue": binary_attribute}},182        )183        def check_message():184            msgs = sqs_client.receive_message(185                QueueUrl=queue_url, MessageAttributeNames=["All"], VisibilityTimeout=0186            )187            msg_received = msgs["Messages"][0]188            assert message == msg_received["Body"]189            assert binary_attribute == msg_received["MessageAttributes"]["store"]["BinaryValue"]190        retry(check_message, retries=PUBLICATION_RETRIES, sleep=PUBLICATION_TIMEOUT)191        sns_client.unsubscribe(SubscriptionArn=subscription_arn)192    def test_filter_policy(193        self,194        sqs_create_queue,195        sqs_queue_arn,196        sns_client,197        sns_create_topic,198        sqs_client,199        sns_subscription,200    ):201        # connect SNS topic to an SQS queue202        queue_name = f"queue-{short_uid()}"203        queue_url = sqs_create_queue(QueueName=queue_name)204        queue_arn = sqs_queue_arn(queue_url)205        topic_arn = sns_create_topic()["TopicArn"]206        filter_policy = {"attr1": [{"numeric": [">", 0, "<=", 100]}]}207        sns_subscription(208            TopicArn=topic_arn,209            Protocol="sqs",210            Endpoint=queue_arn,211            Attributes={"FilterPolicy": json.dumps(filter_policy)},212        )213        # get number of messages214        num_msgs_0 = len(215            sqs_client.receive_message(QueueUrl=queue_url, VisibilityTimeout=0).get("Messages", [])216        )217        # publish message that satisfies the filter policy, assert that message is received218        message = "This is a test message"219        message_attributes = {"attr1": {"DataType": "Number", "StringValue": "99"}}220        sns_client.publish(221            TopicArn=topic_arn,222            Message=message,223            MessageAttributes=message_attributes,224        )225        def check_message():226            msgs_1 = sqs_client.receive_message(QueueUrl=queue_url, VisibilityTimeout=0)["Messages"]227            num_msgs_1 = len(msgs_1)228            assert num_msgs_1 == (num_msgs_0 + 1)229            return num_msgs_1230        num_msgs_1 = retry(check_message, retries=PUBLICATION_RETRIES, sleep=PUBLICATION_TIMEOUT)231        # publish message that does not satisfy the filter policy, assert that message is not received232        message = "This is another test message"233        sns_client.publish(234            TopicArn=topic_arn,235            Message=message,236            MessageAttributes={"attr1": {"DataType": "Number", "StringValue": "111"}},237        )238        def check_message2():239            num_msgs_2 = len(240                sqs_client.receive_message(QueueUrl=queue_url, VisibilityTimeout=0)["Messages"]241            )242            assert num_msgs_2 == num_msgs_1243            return num_msgs_2244        retry(check_message2, retries=PUBLICATION_RETRIES, sleep=PUBLICATION_TIMEOUT)245    def test_exists_filter_policy(246        self,247        sqs_create_queue,248        sqs_queue_arn,249        sns_create_topic,250        sns_client,251        sqs_client,252        sns_subscription,253    ):254        # connect SNS topic to an SQS queue255        queue_name = f"queue-{short_uid()}"256        queue_url = sqs_create_queue(QueueName=queue_name)257        queue_arn = sqs_queue_arn(queue_url)258        topic_arn = sns_create_topic()["TopicArn"]259        filter_policy = {"store": [{"exists": True}]}260        def do_subscribe(filter_policy, queue_arn):261            sns_subscription(262                TopicArn=topic_arn,263                Protocol="sqs",264                Endpoint=queue_arn,265                Attributes={"FilterPolicy": json.dumps(filter_policy)},266            )267        do_subscribe(filter_policy, queue_arn)268        # get number of messages269        num_msgs_0 = len(270            sqs_client.receive_message(QueueUrl=queue_url, VisibilityTimeout=0).get("Messages", [])271        )272        # publish message that satisfies the filter policy, assert that message is received273        message = f"message-{short_uid()}"274        sns_client.publish(275            TopicArn=topic_arn,276            Message=message,277            MessageAttributes={278                "store": {"DataType": "Number", "StringValue": "99"},279                "def": {"DataType": "Number", "StringValue": "99"},280            },281        )282        def check_message1():283            num_msgs_1 = len(284                sqs_client.receive_message(QueueUrl=queue_url, VisibilityTimeout=0)["Messages"]285            )286            assert num_msgs_1 == (num_msgs_0 + 1)287            return num_msgs_1288        num_msgs_1 = retry(check_message1, retries=PUBLICATION_RETRIES, sleep=PUBLICATION_TIMEOUT)289        # publish message that does not satisfy the filter policy, assert that message is not received290        message = f"message-{short_uid()}"291        sns_client.publish(292            TopicArn=topic_arn,293            Message=message,294            MessageAttributes={"attr1": {"DataType": "Number", "StringValue": "111"}},295        )296        def check_message2():297            num_msgs_2 = len(298                sqs_client.receive_message(QueueUrl=queue_url, VisibilityTimeout=0)["Messages"]299            )300            assert num_msgs_2 == num_msgs_1301            return num_msgs_2302        retry(check_message2, retries=PUBLICATION_RETRIES, sleep=PUBLICATION_TIMEOUT)303        # test with exist operator set to false.304        queue_arn = aws_stack.sqs_queue_arn(TEST_QUEUE_NAME)305        filter_policy = {"store": [{"exists": False}]}306        do_subscribe(filter_policy, queue_arn)307        # get number of messages308        num_msgs_0 = len(sqs_client.receive_message(QueueUrl=queue_url).get("Messages", []))309        # publish message with the attribute and see if its getting filtered.310        message = f"message-{short_uid()}"311        sns_client.publish(312            TopicArn=topic_arn,313            Message=message,314            MessageAttributes={315                "store": {"DataType": "Number", "StringValue": "99"},316                "def": {"DataType": "Number", "StringValue": "99"},317            },318        )319        def check_message():320            num_msgs_1 = len(321                sqs_client.receive_message(QueueUrl=queue_url, VisibilityTimeout=0).get(322                    "Messages", []323                )324            )325            assert num_msgs_1 == num_msgs_0326            return num_msgs_1327        num_msgs_1 = retry(check_message, retries=PUBLICATION_RETRIES, sleep=PUBLICATION_TIMEOUT)328        # publish message that without the attribute and see if its getting filtered.329        message = f"message-{short_uid()}"330        sns_client.publish(331            TopicArn=topic_arn,332            Message=message,333            MessageAttributes={"attr1": {"DataType": "Number", "StringValue": "111"}},334        )335        def check_message3():336            num_msgs_2 = len(337                sqs_client.receive_message(QueueUrl=queue_url, VisibilityTimeout=0).get(338                    "Messages", []339                )340            )341            assert num_msgs_2 == num_msgs_1342            return num_msgs_2343        retry(check_message3, retries=PUBLICATION_RETRIES, sleep=PUBLICATION_TIMEOUT)344    def test_subscribe_sqs_queue(345        self,346        sqs_create_queue,347        sqs_queue_arn,348        sns_create_topic,349        sns_client,350        sqs_client,351        sns_subscription,352    ):353        # TODO: check with non default external port354        # connect SNS topic to an SQS queue355        queue_name = f"queue-{short_uid()}"356        queue_url = sqs_create_queue(QueueName=queue_name)357        queue_arn = sqs_queue_arn(queue_url)358        topic_arn = sns_create_topic()["TopicArn"]359        # create subscription with filter policy360        filter_policy = {"attr1": [{"numeric": [">", 0, "<=", 100]}]}361        subscription = sns_subscription(362            TopicArn=topic_arn,363            Protocol="sqs",364            Endpoint=queue_arn,365            Attributes={"FilterPolicy": json.dumps(filter_policy)},366        )367        # publish message that satisfies the filter policy368        message = "This is a test message"369        sns_client.publish(370            TopicArn=topic_arn,371            Message=message,372            MessageAttributes={"attr1": {"DataType": "Number", "StringValue": "99.12"}},373        )374        # assert that message is received375        def check_message():376            messages = sqs_client.receive_message(377                QueueUrl=queue_url, VisibilityTimeout=0, MessageAttributeNames=["All"]378            )["Messages"]379            message = messages[0]380            assert message["MessageAttributes"]["attr1"]["StringValue"] == "99.12"381        retry(check_message, retries=PUBLICATION_RETRIES, sleep=PUBLICATION_TIMEOUT)382        # clean up383        sns_client.unsubscribe(SubscriptionArn=subscription["SubscriptionArn"])384    def test_subscribe_platform_endpoint(385        self, sns_client, sqs_create_queue, sns_create_topic, sns_subscription386    ):387        sns_backend = SNSBackend.get()388        topic_arn = sns_create_topic()["TopicArn"]389        app_arn = sns_client.create_platform_application(Name="app1", Platform="p1", Attributes={})[390            "PlatformApplicationArn"391        ]392        platform_arn = sns_client.create_platform_endpoint(393            PlatformApplicationArn=app_arn, Token="token_1"394        )["EndpointArn"]395        # create subscription with filter policy396        filter_policy = {"attr1": [{"numeric": [">", 0, "<=", 100]}]}397        subscription = sns_subscription(398            TopicArn=topic_arn,399            Protocol="application",400            Endpoint=platform_arn,401            Attributes={"FilterPolicy": json.dumps(filter_policy)},402        )403        # publish message that satisfies the filter policy404        message = "This is a test message"405        sns_client.publish(406            TopicArn=topic_arn,407            Message=message,408            MessageAttributes={"attr1": {"DataType": "Number", "StringValue": "99.12"}},409        )410        # assert that message has been received411        def check_message():412            assert len(sns_backend.platform_endpoint_messages[platform_arn]) > 0413        retry(check_message, retries=PUBLICATION_RETRIES, sleep=PUBLICATION_TIMEOUT)414        # clean up415        sns_client.unsubscribe(SubscriptionArn=subscription["SubscriptionArn"])416        sns_client.delete_endpoint(EndpointArn=platform_arn)417        sns_client.delete_platform_application(PlatformApplicationArn=app_arn)418    def test_unknown_topic_publish(self, sns_client):419        fake_arn = "arn:aws:sns:us-east-1:123456789012:i_dont_exist"420        message = "This is a test message"421        with pytest.raises(ClientError) as e:422            sns_client.publish(TopicArn=fake_arn, Message=message)423        assert e.value.response["Error"]["Code"] == "NotFound"424        assert e.value.response["Error"]["Message"] == "Topic does not exist"425        assert e.value.response["ResponseMetadata"]["HTTPStatusCode"] == 404426    def test_publish_sms(self, sns_client):427        response = sns_client.publish(PhoneNumber="+33000000000", Message="This is a SMS")428        assert "MessageId" in response429        assert response["ResponseMetadata"]["HTTPStatusCode"] == 200430    def test_publish_target(self, sns_client):431        response = sns_client.publish(432            TargetArn="arn:aws:sns:us-east-1:000000000000:endpoint/APNS/abcdef/0f7d5971-aa8b-4bd5-b585-0826e9f93a66",433            Message="This is a push notification",434        )435        assert "MessageId" in response436        assert response["ResponseMetadata"]["HTTPStatusCode"] == 200437    def test_tags(self, sns_client, sns_create_topic):438        topic_arn = sns_create_topic()["TopicArn"]439        sns_client.tag_resource(440            ResourceArn=topic_arn,441            Tags=[442                {"Key": "123", "Value": "abc"},443                {"Key": "456", "Value": "def"},444                {"Key": "456", "Value": "def"},445            ],446        )447        tags = sns_client.list_tags_for_resource(ResourceArn=topic_arn)448        distinct_tags = [449            tag for idx, tag in enumerate(tags["Tags"]) if tag not in tags["Tags"][:idx]450        ]451        # test for duplicate tags452        assert len(tags["Tags"]) == len(distinct_tags)453        assert len(tags["Tags"]) == 2454        assert tags["Tags"][0]["Key"] == "123"455        assert tags["Tags"][0]["Value"] == "abc"456        assert tags["Tags"][1]["Key"] == "456"457        assert tags["Tags"][1]["Value"] == "def"458        sns_client.untag_resource(ResourceArn=topic_arn, TagKeys=["123"])459        tags = sns_client.list_tags_for_resource(ResourceArn=topic_arn)460        assert len(tags["Tags"]) == 1461        assert tags["Tags"][0]["Key"] == "456"462        assert tags["Tags"][0]["Value"] == "def"463        sns_client.tag_resource(ResourceArn=topic_arn, Tags=[{"Key": "456", "Value": "pqr"}])464        tags = sns_client.list_tags_for_resource(ResourceArn=topic_arn)465        assert len(tags["Tags"]) == 1466        assert tags["Tags"][0]["Key"] == "456"467        assert tags["Tags"][0]["Value"] == "pqr"468    def test_topic_subscription(self, sns_client, sns_create_topic, sns_subscription):469        topic_arn = sns_create_topic()["TopicArn"]470        subscription = sns_subscription(471            TopicArn=topic_arn,472            Protocol="email",473            Endpoint="localstack@yopmail.com",474        )475        sns_backend = SNSBackend.get()476        def check_subscription():477            subscription_arn = subscription["SubscriptionArn"]478            subscription_obj = sns_backend.subscription_status[subscription_arn]479            assert subscription_obj["Status"] == "Not Subscribed"480            _token = subscription_obj["Token"]481            sns_client.confirm_subscription(TopicArn=topic_arn, Token=_token)482            assert subscription_obj["Status"] == "Subscribed"483        retry(check_subscription, retries=PUBLICATION_RETRIES, sleep=PUBLICATION_TIMEOUT)484    def test_sqs_topic_subscription_confirmation(485        self, sns_client, sns_create_topic, sqs_create_queue, sqs_queue_arn, sns_subscription486    ):487        topic_arn = sns_create_topic()["TopicArn"]488        queue_arn = sqs_queue_arn(sqs_create_queue())489        subscription = sns_subscription(490            TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_arn, ReturnSubscriptionArn=True491        )492        def check_subscription():493            subscription_arn = subscription["SubscriptionArn"]494            subscription_attrs = sns_client.get_subscription_attributes(495                SubscriptionArn=subscription_arn496            )497            assert subscription_attrs["Attributes"]["PendingConfirmation"] == "false"498        retry(check_subscription, retries=PUBLICATION_RETRIES, sleep=PUBLICATION_TIMEOUT)499    def test_dead_letter_queue(500        self,501        sns_client,502        sqs_client,503        sns_create_topic,504        sqs_create_queue,505        sqs_queue_arn,506        create_lambda_function,507        sns_subscription,508    ):509        lambda_name = f"test-{short_uid()}"510        lambda_arn = aws_stack.lambda_function_arn(lambda_name)511        topic_arn = sns_create_topic()["TopicArn"]512        queue_name = f"test-{short_uid()}"513        queue_url = sqs_create_queue(QueueName=queue_name)514        queue_arn = sqs_queue_arn(queue_url)515        create_lambda_function(516            func_name=lambda_name,517            handler_file=TEST_LAMBDA_PYTHON,518            libs=TEST_LAMBDA_LIBS,519            runtime=LAMBDA_RUNTIME_PYTHON36,520            DeadLetterConfig={"TargetArn": queue_arn},521        )522        sns_subscription(TopicArn=topic_arn, Protocol="lambda", Endpoint=lambda_arn)523        payload = {524            lambda_integration.MSG_BODY_RAISE_ERROR_FLAG: 1,525        }526        sns_client.publish(TopicArn=topic_arn, Message=json.dumps(payload))527        def receive_dlq():528            result = sqs_client.receive_message(529                QueueUrl=queue_url, MessageAttributeNames=["All"], VisibilityTimeout=0530            )531            msg_attrs = result["Messages"][0]["MessageAttributes"]532            assert len(result["Messages"]) > 0533            assert "RequestID" in msg_attrs534            assert "ErrorCode" in msg_attrs535            assert "ErrorMessage" in msg_attrs536        retry(receive_dlq, retries=8, sleep=2)537    def test_redrive_policy_http_subscription(538        self,539        sns_client,540        sns_create_topic,541        sqs_client,542        sqs_create_queue,543        sqs_queue_arn,544        sns_subscription,545    ):546        # self.unsubscribe_all_from_sns()547        dlq_name = f"dlq-{short_uid()}"548        dlq_url = sqs_create_queue(QueueName=dlq_name)549        dlq_arn = sqs_queue_arn(dlq_url)550        topic_arn = sns_create_topic()["TopicArn"]551        # create HTTP endpoint and connect it to SNS topic552        class MyUpdateListener(ProxyListener):553            def forward_request(self, method, path, data, headers):554                records.append((json.loads(to_str(data)), headers))555                return 200556        records = []557        local_port = get_free_tcp_port()558        proxy = start_proxy(local_port, backend_url=None, update_listener=MyUpdateListener())559        wait_for_port_open(local_port)560        http_endpoint = f"{get_service_protocol()}://localhost:{local_port}"561        subscription = sns_subscription(TopicArn=topic_arn, Protocol="http", Endpoint=http_endpoint)562        sns_client.set_subscription_attributes(563            SubscriptionArn=subscription["SubscriptionArn"],564            AttributeName="RedrivePolicy",565            AttributeValue=json.dumps({"deadLetterTargetArn": dlq_arn}),566        )567        proxy.stop()568        # for some reason, it takes a long time to stop the proxy thread -> TODO investigate569        time.sleep(5)570        sns_client.publish(571            TopicArn=topic_arn,572            Message=json.dumps({"message": "test_redrive_policy"}),573        )574        def receive_dlq():575            result = sqs_client.receive_message(QueueUrl=dlq_url, MessageAttributeNames=["All"])576            assert len(result["Messages"]) > 0577            assert (578                json.loads(json.loads(result["Messages"][0]["Body"])["Message"][0])["message"]579                == "test_redrive_policy"580            )581        retry(receive_dlq, retries=7, sleep=2.5)582    def test_redrive_policy_lambda_subscription(583        self,584        sns_client,585        sns_create_topic,586        sqs_create_queue,587        sqs_queue_arn,588        create_lambda_function,589        sqs_client,590        sns_subscription,591    ):592        # self.unsubscribe_all_from_sns()593        dlq_name = f"dlq-{short_uid()}"594        dlq_url = sqs_create_queue(QueueName=dlq_name)595        dlq_arn = sqs_queue_arn(dlq_url)596        topic_arn = sns_create_topic()["TopicArn"]597        lambda_name = f"test-{short_uid()}"598        lambda_arn = create_lambda_function(599            func_name=lambda_name,600            libs=TEST_LAMBDA_LIBS,601            handler_file=TEST_LAMBDA_PYTHON,602            runtime=LAMBDA_RUNTIME_PYTHON36,603        )["CreateFunctionResponse"]["FunctionArn"]604        subscription = sns_subscription(TopicArn=topic_arn, Protocol="lambda", Endpoint=lambda_arn)605        sns_client.set_subscription_attributes(606            SubscriptionArn=subscription["SubscriptionArn"],607            AttributeName="RedrivePolicy",608            AttributeValue=json.dumps({"deadLetterTargetArn": dlq_arn}),609        )610        testutil.delete_lambda_function(lambda_name)611        sns_client.publish(612            TopicArn=topic_arn,613            Message=json.dumps({"message": "test_redrive_policy"}),614        )615        def receive_dlq():616            result = sqs_client.receive_message(QueueUrl=dlq_url, MessageAttributeNames=["All"])617            assert len(result["Messages"]) > 0618            assert (619                json.loads(json.loads(result["Messages"][0]["Body"])["Message"][0])["message"]620                == "test_redrive_policy"621            )622        retry(receive_dlq, retries=10, sleep=2)623    def test_redrive_policy_queue_subscription(624        self,625        sns_client,626        sns_create_topic,627        sqs_create_queue,628        sqs_queue_arn,629        sqs_client,630        sns_subscription,631    ):632        # self.unsubscribe_all_from_sns()633        dlq_name = f"dlq-{short_uid()}"634        dlq_url = sqs_create_queue(QueueName=dlq_name)635        dlq_arn = sqs_queue_arn(dlq_url)636        topic_arn = sns_create_topic()["TopicArn"]637        invalid_queue_arn = aws_stack.sqs_queue_arn("invalid_queue")638        # subscribe with an invalid queue ARN, to trigger event on DLQ below639        subscription = sns_subscription(640            TopicArn=topic_arn, Protocol="sqs", Endpoint=invalid_queue_arn641        )642        sns_client.set_subscription_attributes(643            SubscriptionArn=subscription["SubscriptionArn"],644            AttributeName="RedrivePolicy",645            AttributeValue=json.dumps({"deadLetterTargetArn": dlq_arn}),646        )647        sns_client.publish(648            TopicArn=topic_arn, Message=json.dumps({"message": "test_redrive_policy"})649        )650        def receive_dlq():651            result = sqs_client.receive_message(QueueUrl=dlq_url, MessageAttributeNames=["All"])652            assert len(result["Messages"]) > 0653            assert (654                json.loads(json.loads(result["Messages"][0]["Body"])["Message"][0])["message"]655                == "test_redrive_policy"656            )657        retry(receive_dlq, retries=10, sleep=2)658    def test_publish_with_empty_subject(self, sns_client, sns_create_topic):659        topic_arn = sns_create_topic()["TopicArn"]660        # Publish without subject661        rs = sns_client.publish(TopicArn=topic_arn, Message=json.dumps({"message": "test_publish"}))662        assert rs["ResponseMetadata"]["HTTPStatusCode"] == 200663        with pytest.raises(ClientError) as e:664            sns_client.publish(665                TopicArn=topic_arn,666                Subject="",667                Message=json.dumps({"message": "test_publish"}),668            )669        assert e.value.response["Error"]["Code"] == "InvalidParameter"670    def test_create_topic_test_arn(self, sns_create_topic, sns_client):671        topic_name = f"topic-{short_uid()}"672        response = sns_create_topic(Name=topic_name)673        topic_arn_params = response["TopicArn"].split(":")674        testutil.response_arn_matches_partition(sns_client, response["TopicArn"])675        assert topic_arn_params[4] == TEST_AWS_ACCOUNT_ID676        assert topic_arn_params[5] == topic_name677    def test_publish_message_by_target_arn(678        self, sns_client, sns_create_topic, create_lambda_function, sns_subscription679    ):680        # self.unsubscribe_all_from_sns()681        func_name = f"lambda-{short_uid()}"682        topic_arn = sns_create_topic()["TopicArn"]683        lambda_arn = create_lambda_function(684            handler_file=TEST_LAMBDA_PYTHON_ECHO,685            func_name=func_name,686            runtime=LAMBDA_RUNTIME_PYTHON36,687        )["CreateFunctionResponse"]["FunctionArn"]688        subscription_arn = sns_subscription(689            TopicArn=topic_arn, Protocol="lambda", Endpoint=lambda_arn690        )["SubscriptionArn"]691        sns_client.publish(TopicArn=topic_arn, Message="test_message_1", Subject="test subject")692        # Lambda invoked 1 time693        events = retry(694            check_expected_lambda_log_events_length,695            retries=3,696            sleep=1,697            function_name=func_name,698            expected_length=1,699        )700        message = events[0]["Records"][0]701        assert message["EventSubscriptionArn"] == subscription_arn702        sns_client.publish(TargetArn=topic_arn, Message="test_message_2", Subject="test subject")703        events = retry(704            check_expected_lambda_log_events_length,705            retries=3,706            sleep=1,707            function_name=func_name,708            expected_length=2,709        )710        # Lambda invoked 1 more time711        assert len(events) == 2712        for event in events:713            message = event["Records"][0]714            assert message["EventSubscriptionArn"] == subscription_arn715    def test_publish_message_after_subscribe_topic(716        self,717        sns_client,718        sns_create_topic,719        sqs_client,720        sqs_create_queue,721        sqs_queue_arn,722        sns_subscription,723    ):724        # self.unsubscribe_all_from_sns()725        topic_arn = sns_create_topic()["TopicArn"]726        queue_url = sqs_create_queue()727        queue_arn = sqs_queue_arn(queue_url)728        rs = sns_client.publish(729            TopicArn=topic_arn, Subject="test subject", Message="test_message_1"730        )731        assert rs["ResponseMetadata"]["HTTPStatusCode"] == 200732        sns_subscription(TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_arn)733        message_subject = "sqs subject"734        message_body = "test_message_2"735        rs = sns_client.publish(TopicArn=topic_arn, Subject=message_subject, Message=message_body)736        # time.sleep(100)737        assert rs["ResponseMetadata"]["HTTPStatusCode"] == 200738        message_id = rs["MessageId"]739        def get_message(q_url):740            resp = sqs_client.receive_message(QueueUrl=q_url, VisibilityTimeout=0)741            return json.loads(resp["Messages"][0]["Body"])742        message = retry(get_message, retries=3, sleep=2, q_url=queue_url)743        assert message["MessageId"] == message_id744        assert message["Subject"] == message_subject745        assert message["Message"] == message_body746    def test_create_duplicate_topic_with_more_tags(self, sns_client, sns_create_topic):747        topic_name = f"test-{short_uid()}"748        sns_create_topic(Name=topic_name)749        with pytest.raises(ClientError) as e:750            sns_client.create_topic(Name=topic_name, Tags=[{"Key": "456", "Value": "pqr"}])751        assert e.value.response["Error"]["Code"] == "InvalidParameter"752        assert e.value.response["Error"]["Message"] == "Topic already exists with different tags"753        assert e.value.response["ResponseMetadata"]["HTTPStatusCode"] == 400754    def test_create_duplicate_topic_check_idempotentness(self, sns_create_topic):755        topic_name = f"test-{short_uid()}"756        tags = [{"Key": "a", "Value": "1"}, {"Key": "b", "Value": "2"}]757        kwargs = [758            {"Tags": tags},  # to create topic with two tags759            {"Tags": tags},  # to create the same topic again with same tags760            {"Tags": [tags[0]]},  # to create the same topic again with one of the tags from above761            {"Tags": []},  # to create the same topic again with no tags762        ]763        responses = []764        for arg in kwargs:765            responses.append(sns_create_topic(Name=topic_name, **arg))766        # assert TopicArn is returned by all the above create_topic calls767        for i in range(len(responses)):768            assert "TopicArn" in responses[i]769    def test_create_platform_endpoint_check_idempotentness(self, sns_client):770        response = sns_client.create_platform_application(771            Name=f"test-{short_uid()}",772            Platform="GCM",773            Attributes={"PlatformCredential": "123"},774        )775        kwargs_list = [776            {"Token": "test1", "CustomUserData": "test-data"},777            {"Token": "test1", "CustomUserData": "test-data"},778            {"Token": "test1"},779            {"Token": "test1"},780        ]781        platform_arn = response["PlatformApplicationArn"]782        responses = []783        for kwargs in kwargs_list:784            responses.append(785                sns_client.create_platform_endpoint(PlatformApplicationArn=platform_arn, **kwargs)786            )787        # Assert endpointarn is returned in every call create platform call788        for i in range(len(responses)):789            assert "EndpointArn" in responses[i]790        endpoint_arn = responses[0]["EndpointArn"]791        # clean up792        sns_client.delete_endpoint(EndpointArn=endpoint_arn)793        sns_client.delete_platform_application(PlatformApplicationArn=platform_arn)794    def test_publish_by_path_parameters(795        self,796        sns_create_topic,797        sns_client,798        sqs_client,799        sqs_create_queue,800        sqs_queue_arn,801        sns_subscription,802    ):803        topic_name = f"topic-{short_uid()}"804        queue_name = f"queue-{short_uid()}"805        message = f"test message {short_uid()}"806        topic_arn = sns_create_topic(Name=topic_name)["TopicArn"]807        base_url = (808            f"{get_service_protocol()}://{config.LOCALSTACK_HOSTNAME}:{config.service_port('sns')}"809        )810        path = "Action=Publish&Version=2010-03-31&TopicArn={}&Message={}".format(topic_arn, message)811        queue_url = sqs_create_queue(QueueName=queue_name)812        queue_arn = sqs_queue_arn(queue_url)813        subscription_arn = sns_subscription(TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_arn)[814            "SubscriptionArn"815        ]816        r = requests.post(817            url="{}/?{}".format(base_url, path),818            headers=aws_stack.mock_aws_request_headers("sns"),819        )820        assert r.status_code == 200821        def get_notification(q_url):822            resp = sqs_client.receive_message(QueueUrl=q_url)823            return json.loads(resp["Messages"][0]["Body"])824        notification = retry(get_notification, retries=3, sleep=2, q_url=queue_url)825        assert notification["TopicArn"] == topic_arn826        assert notification["Message"] == message827        sns_client.unsubscribe(SubscriptionArn=subscription_arn)828    def test_multiple_subscriptions_http_endpoint(829        self, sns_client, sns_create_topic, sns_subscription830    ):831        # create a topic832        topic_arn = sns_create_topic()["TopicArn"]833        # build fake http server endpoints834        _requests = queue.Queue()835        # create HTTP endpoint and connect it to SNS topic836        class MyUpdateListener(ProxyListener):837            def forward_request(self, method, path, data, headers):838                _requests.put(Request(method, path, headers=headers, body=data))839                return 429840        number_of_endpoints = 4841        proxies = []842        for _ in range(number_of_endpoints):843            local_port = get_free_tcp_port()844            proxies.append(845                start_proxy(local_port, backend_url=None, update_listener=MyUpdateListener())846            )847            wait_for_port_open(local_port)848            http_endpoint = f"{get_service_protocol()}://localhost:{local_port}"849            sns_subscription(TopicArn=topic_arn, Protocol="http", Endpoint=http_endpoint)850        # fetch subscription information851        subscription_list = sns_client.list_subscriptions_by_topic(TopicArn=topic_arn)852        assert subscription_list["ResponseMetadata"]["HTTPStatusCode"] == 200853        assert (854            len(subscription_list["Subscriptions"]) == number_of_endpoints855        ), f"unexpected number of subscriptions {subscription_list}"856        for _ in range(number_of_endpoints):857            request = _requests.get(timeout=2)858            assert request.get_json(True)["TopicArn"] == topic_arn859        with pytest.raises(queue.Empty):860            # make sure only four requests are received861            _requests.get(timeout=1)862        for proxy in proxies:863            proxy.stop()864    def test_publish_sms_endpoint(self, sns_client, sns_create_topic, sns_subscription):865        list_of_contacts = [866            f"+{random.randint(100000000, 9999999999)}",867            f"+{random.randint(100000000, 9999999999)}",868            f"+{random.randint(100000000, 9999999999)}",869        ]870        message = "Good news everyone!"871        topic_arn = sns_create_topic()["TopicArn"]872        for number in list_of_contacts:873            sns_subscription(TopicArn=topic_arn, Protocol="sms", Endpoint=number)874        sns_client.publish(Message=message, TopicArn=topic_arn)875        sns_backend = SNSBackend.get()876        def check_messages():877            sms_messages = sns_backend.sms_messages878            for contact in list_of_contacts:879                sms_was_found = False880                for message in sms_messages:881                    if message["endpoint"] == contact:882                        sms_was_found = True883                        break884                assert sms_was_found885        retry(check_messages, sleep=0.5)886    def test_publish_sqs_from_sns(887        self,888        sns_client,889        sns_create_topic,890        sqs_client,891        sqs_create_queue,892        sqs_queue_arn,893        sns_subscription,894    ):895        topic_arn = sns_create_topic()["TopicArn"]896        queue_url = sqs_create_queue()897        queue_arn = sqs_queue_arn(queue_url)898        subscription_arn = sns_subscription(899            TopicArn=topic_arn,900            Protocol="sqs",901            Endpoint=queue_arn,902            Attributes={"RawMessageDelivery": "true"},903        )["SubscriptionArn"]904        string_value = "99.12"905        sns_client.publish(906            TopicArn=topic_arn,907            Message="Test msg",908            MessageAttributes={"attr1": {"DataType": "Number", "StringValue": string_value}},909        )910        def get_message_with_attributes(queue_url):911            response = sqs_client.receive_message(912                QueueUrl=queue_url, MessageAttributeNames=["All"], VisibilityTimeout=0913            )914            assert response["Messages"][0]["MessageAttributes"] == {915                "attr1": {"DataType": "Number", "StringValue": string_value}916            }917            sqs_client.delete_message(918                QueueUrl=queue_url, ReceiptHandle=response["Messages"][0]["ReceiptHandle"]919            )920        retry(get_message_with_attributes, retries=3, sleep=3, queue_url=queue_url)921        sns_client.set_subscription_attributes(922            SubscriptionArn=subscription_arn,923            AttributeName="RawMessageDelivery",924            AttributeValue="false",925        )926        string_value = "100.12"927        sns_client.publish(928            TargetArn=topic_arn,929            Message="Test msg",930            MessageAttributes={"attr1": {"DataType": "Number", "StringValue": string_value}},931        )932        retry(get_message_with_attributes, retries=3, sleep=3, queue_url=queue_url)933    def test_publish_batch_messages_from_sns_to_sqs(934        self,935        sns_client,936        sns_create_topic,937        sqs_create_queue,938        sqs_queue_arn,939        sqs_client,940        sns_subscription,941    ):942        topic_arn = sns_create_topic()["TopicArn"]943        queue_url = sqs_create_queue()944        queue_arn = sqs_queue_arn(queue_url)945        sns_subscription(946            TopicArn=topic_arn,947            Protocol="sqs",948            Endpoint=queue_arn,949            Attributes={"RawMessageDelivery": "true"},950        )951        publish_batch_response = sns_client.publish_batch(952            TopicArn=topic_arn,953            PublishBatchRequestEntries=[954                {955                    "Id": "1",956                    "Message": "Test Message with two attributes",957                    "Subject": "Subject",958                    "MessageAttributes": {959                        "attr1": {"DataType": "Number", "StringValue": "99.12"},960                        "attr2": {"DataType": "Number", "StringValue": "109.12"},961                    },962                },963                {964                    "Id": "2",965                    "Message": "Test Message with one attribute",966                    "Subject": "Subject",967                    "MessageAttributes": {"attr1": {"DataType": "Number", "StringValue": "19.12"}},968                },969                {970                    "Id": "3",971                    "Message": "Test Message without attribute",972                    "Subject": "Subject",973                },974                {975                    "Id": "4",976                    "Message": "Test Message without subject",977                },978            ],979        )980        assert "Successful" in publish_batch_response981        assert "Failed" in publish_batch_response982        for successful_resp in publish_batch_response["Successful"]:983            assert "Id" in successful_resp984            assert "MessageId" in successful_resp985        def get_messages(queue_url):986            response = sqs_client.receive_message(987                QueueUrl=queue_url,988                MessageAttributeNames=["All"],989                MaxNumberOfMessages=10,990                VisibilityTimeout=0,991            )992            assert len(response["Messages"]) == 4993            for message in response["Messages"]:994                assert "Body" in message995                if message["Body"] == "Test Message with two attributes":996                    assert len(message["MessageAttributes"]) == 2997                    assert message["MessageAttributes"]["attr1"] == {998                        "StringValue": "99.12",999                        "DataType": "Number",1000                    }1001                    assert message["MessageAttributes"]["attr2"] == {1002                        "StringValue": "109.12",1003                        "DataType": "Number",1004                    }1005                elif message["Body"] == "Test Message with one attribute":1006                    assert len(message["MessageAttributes"]) == 11007                    assert message["MessageAttributes"]["attr1"] == {1008                        "StringValue": "19.12",1009                        "DataType": "Number",1010                    }1011                elif message["Body"] == "Test Message without attribute":1012                    assert message.get("MessageAttributes") is None1013        retry(get_messages, retries=5, sleep=1, queue_url=queue_url)1014    def test_publish_batch_messages_from_fifo_topic_to_fifo_queue(1015        self, sns_client, sns_create_topic, sqs_client, sqs_create_queue, sns_subscription1016    ):1017        topic_name = f"topic-{short_uid()}.fifo"1018        queue_name = f"queue-{short_uid()}.fifo"1019        topic_arn = sns_create_topic(Name=topic_name, Attributes={"FifoTopic": "true"})["TopicArn"]1020        queue_url = sqs_create_queue(1021            QueueName=queue_name,1022            Attributes={"FifoQueue": "true"},1023        )1024        sns_subscription(1025            TopicArn=topic_arn,1026            Protocol="sqs",1027            Endpoint=queue_url,1028            Attributes={"RawMessageDelivery": "true"},1029        )1030        message_group_id = "complexMessageGroupId"1031        publish_batch_response = sns_client.publish_batch(1032            TopicArn=topic_arn,1033            PublishBatchRequestEntries=[1034                {1035                    "Id": "1",1036                    "MessageGroupId": message_group_id,1037                    "Message": "Test Message with two attributes",1038                    "Subject": "Subject",1039                    "MessageAttributes": {1040                        "attr1": {"DataType": "Number", "StringValue": "99.12"},1041                        "attr2": {"DataType": "Number", "StringValue": "109.12"},1042                    },1043                },1044                {1045                    "Id": "2",1046                    "MessageGroupId": message_group_id,1047                    "Message": "Test Message with one attribute",1048                    "Subject": "Subject",1049                    "MessageAttributes": {"attr1": {"DataType": "Number", "StringValue": "19.12"}},1050                },1051                {1052                    "Id": "3",1053                    "MessageGroupId": message_group_id,1054                    "Message": "Test Message without attribute",1055                    "Subject": "Subject",1056                },1057            ],1058        )1059        assert "Successful" in publish_batch_response1060        assert "Failed" in publish_batch_response1061        for successful_resp in publish_batch_response["Successful"]:1062            assert "Id" in successful_resp1063            assert "MessageId" in successful_resp1064        def get_messages(queue_url):1065            response = sqs_client.receive_message(1066                QueueUrl=queue_url,1067                MessageAttributeNames=["All"],1068                AttributeNames=["All"],1069                MaxNumberOfMessages=10,1070                VisibilityTimeout=0,1071            )1072            assert len(response["Messages"]) == 31073            for message in response["Messages"]:1074                assert "Body" in message1075                assert message["Attributes"]["MessageGroupId"] == message_group_id1076                if message["Body"] == "Test Message with two attributes":1077                    assert len(message["MessageAttributes"]) == 21078                    assert message["MessageAttributes"]["attr1"] == {1079                        "StringValue": "99.12",1080                        "DataType": "Number",1081                    }1082                    assert message["MessageAttributes"]["attr2"] == {1083                        "StringValue": "109.12",1084                        "DataType": "Number",1085                    }1086                elif message["Body"] == "Test Message with one attribute":1087                    assert len(message["MessageAttributes"]) == 11088                    assert message["MessageAttributes"]["attr1"] == {1089                        "StringValue": "19.12",1090                        "DataType": "Number",1091                    }1092                elif message["Body"] == "Test Message without attribute":1093                    assert message.get("MessageAttributes") is None1094        retry(get_messages, retries=5, sleep=1, queue_url=queue_url)1095    def test_publish_batch_exceptions(1096        self, sns_client, sqs_client, sns_create_topic, sqs_create_queue, sns_subscription1097    ):1098        topic_name = f"topic-{short_uid()}.fifo"1099        queue_name = f"queue-{short_uid()}.fifo"1100        topic_arn = sns_create_topic(Name=topic_name, Attributes={"FifoTopic": "true"})["TopicArn"]1101        queue_url = sqs_create_queue(1102            QueueName=queue_name,1103            Attributes={"FifoQueue": "true"},1104        )1105        queue_arn = aws_stack.sqs_queue_arn(queue_url)1106        sns_subscription(1107            TopicArn=topic_arn,1108            Protocol="sqs",1109            Endpoint=queue_arn,1110            Attributes={"RawMessageDelivery": "true"},1111        )1112        with pytest.raises(ClientError) as e:1113            sns_client.publish_batch(1114                TopicArn=topic_arn,1115                PublishBatchRequestEntries=[1116                    {1117                        "Id": "1",1118                        "Message": "Test Message with two attributes",1119                    }1120                ],1121            )1122        assert e.value.response["Error"]["Code"] == "InvalidParameter"1123        assert e.value.response["ResponseMetadata"]["HTTPStatusCode"] == 4001124        with pytest.raises(ClientError) as e:1125            sns_client.publish_batch(1126                TopicArn=topic_arn,1127                PublishBatchRequestEntries=[1128                    {"Id": f"Id_{i}", "Message": f"message_{i}"} for i in range(11)1129                ],1130            )1131        assert e.value.response["Error"]["Code"] == "TooManyEntriesInBatchRequest"1132        assert e.value.response["ResponseMetadata"]["HTTPStatusCode"] == 4001133        with pytest.raises(ClientError) as e:1134            sns_client.publish_batch(1135                TopicArn=topic_arn,1136                PublishBatchRequestEntries=[1137                    {"Id": "1", "Message": f"message_{i}"} for i in range(2)1138                ],1139            )1140        assert e.value.response["Error"]["Code"] == "BatchEntryIdsNotDistinct"1141        assert e.value.response["ResponseMetadata"]["HTTPStatusCode"] == 4001142    def add_xray_header(self, request, **kwargs):1143        request.headers[1144            "X-Amzn-Trace-Id"1145        ] = "Root=1-3152b799-8954dae64eda91bc9a23a7e8;Parent=7fa8c0f79203be72;Sampled=1"1146    def test_publish_sqs_from_sns_with_xray_propagation(1147        self, sns_client, sns_create_topic, sqs_client, sqs_create_queue, sns_subscription1148    ):1149        # TODO: remove or adapt for asf1150        if SQS_BACKEND_IMPL != "elasticmq":1151            pytest.skip("not using elasticmq as SQS backend")1152        sns_client.meta.events.register("before-send.sns.Publish", self.add_xray_header)1153        topic = sns_create_topic()1154        topic_arn = topic["TopicArn"]1155        queue_url = sqs_create_queue()1156        sns_subscription(TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_url)1157        sns_client.publish(TargetArn=topic_arn, Message="X-Ray propagation test msg")1158        response = sqs_client.receive_message(1159            QueueUrl=queue_url,1160            AttributeNames=["SentTimestamp", "AWSTraceHeader"],1161            MaxNumberOfMessages=1,1162            MessageAttributeNames=["All"],1163            VisibilityTimeout=2,1164            WaitTimeSeconds=2,1165        )1166        assert len(response["Messages"]) == 11167        message = response["Messages"][0]1168        assert "Attributes" in message1169        assert "AWSTraceHeader" in message["Attributes"]1170        assert (1171            message["Attributes"]["AWSTraceHeader"]1172            == "Root=1-3152b799-8954dae64eda91bc9a23a7e8;Parent=7fa8c0f79203be72;Sampled=1"1173        )1174    def test_create_topic_after_delete_with_new_tags(self, sns_create_topic, sns_client):1175        topic_name = f"test-{short_uid()}"1176        topic = sns_create_topic(Name=topic_name, Tags=[{"Key": "Name", "Value": "pqr"}])1177        sns_client.delete_topic(TopicArn=topic["TopicArn"])1178        topic1 = sns_create_topic(Name=topic_name, Tags=[{"Key": "Name", "Value": "abc"}])1179        assert topic["TopicArn"] == topic1["TopicArn"]1180    def test_not_found_error_on_get_subscription_attributes(1181        self, sns_client, sns_create_topic, sqs_create_queue, sqs_queue_arn, sns_subscription1182    ):1183        topic_arn = sns_create_topic()["TopicArn"]1184        queue_url = sqs_create_queue()1185        queue_arn = sqs_queue_arn(queue_url)1186        subscription = sns_subscription(TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_arn)1187        subscription_attributes = sns_client.get_subscription_attributes(1188            SubscriptionArn=subscription["SubscriptionArn"]1189        )1190        assert (1191            subscription_attributes.get("Attributes").get("SubscriptionArn")1192            == subscription["SubscriptionArn"]1193        )1194        sns_client.unsubscribe(SubscriptionArn=subscription["SubscriptionArn"])1195        with pytest.raises(ClientError) as e:1196            sns_client.get_subscription_attributes(SubscriptionArn=subscription["SubscriptionArn"])1197        assert e.value.response["Error"]["Code"] == "NotFound"1198        assert e.value.response["ResponseMetadata"]["HTTPStatusCode"] == 4041199    def test_message_to_fifo_sqs(1200        self,1201        sns_client,1202        sqs_client,1203        sns_create_topic,1204        sqs_create_queue,1205        sqs_queue_arn,1206        sns_subscription,1207    ):1208        topic_name = f"topic-{short_uid()}.fifo"1209        queue_name = f"queue-{short_uid()}.fifo"1210        topic_arn = sns_create_topic(Name=topic_name, Attributes={"FifoTopic": "true"})["TopicArn"]1211        queue_url = sqs_create_queue(1212            QueueName=queue_name,1213            Attributes={"FifoQueue": "true"},1214        )1215        queue_arn = sqs_queue_arn(queue_url)1216        sns_subscription(TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_arn)1217        message = "Test"1218        sns_client.publish(TopicArn=topic_arn, Message=message, MessageGroupId=short_uid())1219        def get_message():1220            received = sqs_client.receive_message(QueueUrl=queue_url, VisibilityTimeout=0)[1221                "Messages"1222            ][0]["Body"]1223            assert json.loads(received)["Message"] == message1224        retry(get_message, retries=10, sleep_before=0.15, sleep=1)1225    def test_validations_for_fifo(1226        self,1227        sns_client,1228        sqs_client,1229        sns_create_topic,1230        sqs_create_queue,1231        sqs_queue_arn,1232        sns_subscription,1233    ):1234        topic_name = f"topic-{short_uid()}"1235        fifo_topic_name = f"topic-{short_uid()}.fifo"1236        fifo_queue_name = f"queue-{short_uid()}.fifo"1237        topic_arn = sns_create_topic(Name=topic_name)["TopicArn"]1238        fifo_topic_arn = sns_create_topic(Name=fifo_topic_name, Attributes={"FifoTopic": "true"})[1239            "TopicArn"1240        ]1241        fifo_queue_url = sqs_create_queue(1242            QueueName=fifo_queue_name, Attributes={"FifoQueue": "true"}1243        )1244        fifo_queue_arn = sqs_queue_arn(fifo_queue_url)1245        with pytest.raises(ClientError) as e:1246            sns_subscription(TopicArn=topic_arn, Protocol="sqs", Endpoint=fifo_queue_arn)1247        assert e.match("standard SNS topic")1248        with pytest.raises(ClientError) as e:1249            sns_client.publish(TopicArn=fifo_topic_arn, Message="test")1250        assert e.match("MessageGroupId")1251    def test_empty_sns_message(1252        self, sns_client, sqs_client, sns_topic, sqs_queue, sqs_queue_arn, sns_subscription...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
