Best Python code snippet using localstack_python
test_sns.py
Source: test_sns.py
...598    ):599        dlq_url = sqs_create_queue()600        dlq_arn = sqs_queue_arn(dlq_url)601        topic_arn = sns_create_topic()["TopicArn"]602        sns_allow_topic_sqs_queue(603            sqs_queue_url=dlq_url, sqs_queue_arn=dlq_arn, sns_topic_arn=topic_arn604        )605        lambda_name = f"test-{short_uid()}"606        lambda_arn = create_lambda_function(607            func_name=lambda_name,608            libs=TEST_LAMBDA_LIBS,609            handler_file=TEST_LAMBDA_PYTHON,610            runtime=LAMBDA_RUNTIME_PYTHON37,611            role=lambda_su_role,612        )["CreateFunctionResponse"]["FunctionArn"]613        subscription = sns_subscription(TopicArn=topic_arn, Protocol="lambda", Endpoint=lambda_arn)614        sns_client.set_subscription_attributes(615            SubscriptionArn=subscription["SubscriptionArn"],616            AttributeName="RedrivePolicy",617            AttributeValue=json.dumps({"deadLetterTargetArn": dlq_arn}),618        )619        lambda_client.delete_function(FunctionName=lambda_name)620        sns_client.publish(621            TopicArn=topic_arn,622            Message=json.dumps({"message": "test_redrive_policy"}),623        )624        response = sqs_client.receive_message(QueueUrl=dlq_url, WaitTimeSeconds=10)625        assert (626            len(response["Messages"]) == 1627        ), f"invalid number of messages in DLQ response {response}"628        message = json.loads(response["Messages"][0]["Body"])629        assert message["Type"] == "Notification"630        assert json.loads(message["Message"])["message"] == "test_redrive_policy"631    @pytest.mark.aws_validated632    def test_publish_with_empty_subject(self, sns_client, sns_create_topic):633        topic_arn = sns_create_topic()["TopicArn"]634        # Publish without subject635        rs = sns_client.publish(TopicArn=topic_arn, Message=json.dumps({"message": "test_publish"}))636        assert rs["ResponseMetadata"]["HTTPStatusCode"] == 200637        with 
pytest.raises(ClientError) as e:638            sns_client.publish(639                TopicArn=topic_arn,640                Subject="",641                Message=json.dumps({"message": "test_publish"}),642            )643        assert e.value.response["Error"]["Code"] == "InvalidParameter"644        assert e.value.response["Error"]["Message"] == "Invalid parameter: Subject"645    # todo test with snapshot to aws validate it646    def test_create_topic_test_arn(self, sns_create_topic, sns_client):647        topic_name = f"topic-{short_uid()}"648        response = sns_create_topic(Name=topic_name)649        topic_arn_params = response["TopicArn"].split(":")650        testutil.response_arn_matches_partition(sns_client, response["TopicArn"])651        assert topic_arn_params[4] == get_aws_account_id()652        assert topic_arn_params[5] == topic_name653    @pytest.mark.aws_validated654    def test_publish_message_by_target_arn(655        self,656        sns_client,657        sqs_client,658        sns_create_topic,659        sqs_create_queue,660        sns_create_sqs_subscription,661    ):662        # using an SQS subscription to test TopicArn/TargetArn as it is easier to check against AWS663        topic_arn = sns_create_topic()["TopicArn"]664        queue_url = sqs_create_queue()665        sns_create_sqs_subscription(topic_arn=topic_arn, queue_url=queue_url)666        sns_client.publish(TopicArn=topic_arn, Message="test-msg-1")667        response = sqs_client.receive_message(668            QueueUrl=queue_url,669            MessageAttributeNames=["All"],670            VisibilityTimeout=0,671            WaitTimeSeconds=4,672        )673        assert len(response["Messages"]) == 1674        message = response["Messages"][0]675        msg_body = json.loads(message["Body"])676        assert msg_body["Message"] == "test-msg-1"677        sqs_client.delete_message(QueueUrl=queue_url, ReceiptHandle=message["ReceiptHandle"])678        # publish with TargetArn instead of 
TopicArn679        sns_client.publish(TargetArn=topic_arn, Message="test-msg-2")680        response = sqs_client.receive_message(681            QueueUrl=queue_url,682            MessageAttributeNames=["All"],683            VisibilityTimeout=0,684            WaitTimeSeconds=4,685        )686        assert len(response["Messages"]) == 1687        message = response["Messages"][0]688        msg_body = json.loads(message["Body"])689        assert msg_body["Message"] == "test-msg-2"690    @pytest.mark.aws_validated691    def test_publish_message_before_subscribe_topic(692        self,693        sns_client,694        sns_create_topic,695        sqs_client,696        sqs_create_queue,697        sns_subscription,698        sns_create_sqs_subscription,699    ):700        topic_arn = sns_create_topic()["TopicArn"]701        queue_url = sqs_create_queue()702        rs = sns_client.publish(703            TopicArn=topic_arn, Subject="test subject", Message="test_message_1"704        )705        assert rs["ResponseMetadata"]["HTTPStatusCode"] == 200706        sns_create_sqs_subscription(topic_arn=topic_arn, queue_url=queue_url)707        message_subject = "sqs subject"708        message_body = "test_message_2"709        rs = sns_client.publish(TopicArn=topic_arn, Subject=message_subject, Message=message_body)710        assert rs["ResponseMetadata"]["HTTPStatusCode"] == 200711        message_id = rs["MessageId"]712        response = sqs_client.receive_message(713            QueueUrl=queue_url, VisibilityTimeout=0, WaitTimeSeconds=5714        )715        # nothing was subscribing to the topic, so the first message is lost716        assert len(response["Messages"]) == 1717        message = json.loads(response["Messages"][0]["Body"])718        assert message["MessageId"] == message_id719        assert message["Subject"] == message_subject720        assert message["Message"] == message_body721    @pytest.mark.aws_validated722    def test_create_duplicate_topic_with_more_tags(self, 
sns_client, sns_create_topic):723        topic_name = f"test-{short_uid()}"724        sns_create_topic(Name=topic_name)725        with pytest.raises(ClientError) as e:726            sns_client.create_topic(Name=topic_name, Tags=[{"Key": "key1", "Value": "value1"}])727        assert e.value.response["Error"]["Code"] == "InvalidParameter"728        assert (729            e.value.response["Error"]["Message"]730            == "Invalid parameter: Tags Reason: Topic already exists with different tags"731        )732        assert e.value.response["ResponseMetadata"]["HTTPStatusCode"] == 400733    @pytest.mark.aws_validated734    def test_create_duplicate_topic_check_idempotency(self, sns_create_topic):735        topic_name = f"test-{short_uid()}"736        tags = [{"Key": "a", "Value": "1"}, {"Key": "b", "Value": "2"}]737        kwargs = [738            {"Tags": tags},  # to create the same topic again with same tags739            {"Tags": [tags[0]]},  # to create the same topic again with one of the tags from above740            {"Tags": []},  # to create the same topic again with no tags741        ]742        # create topic with two tags743        response = sns_create_topic(Name=topic_name, Tags=tags)744        topic_arn = response["TopicArn"]745        for arg in kwargs:746            response = sns_create_topic(Name=topic_name, **arg)747            # assert TopicArn returned by all the above create_topic calls is the same as the original748            assert response["TopicArn"] == topic_arn749    @pytest.mark.only_localstack750    @pytest.mark.skip(751        reason="Idempotency not supported in Moto backend. 
See bug https://github.com/spulec/moto/issues/2333"752    )753    def test_create_platform_endpoint_check_idempotency(self, sns_client):754        response = sns_client.create_platform_application(755            Name=f"test-{short_uid()}",756            Platform="GCM",757            Attributes={"PlatformCredential": "123"},758        )759        kwargs_list = [760            {"Token": "test1", "CustomUserData": "test-data"},761            {"Token": "test1", "CustomUserData": "test-data"},762            {"Token": "test1"},763            {"Token": "test1"},764        ]765        platform_arn = response["PlatformApplicationArn"]766        responses = []767        for kwargs in kwargs_list:768            responses.append(769                sns_client.create_platform_endpoint(PlatformApplicationArn=platform_arn, **kwargs)770            )771        # Assert endpointarn is returned in every call create platform call772        for i in range(len(responses)):773            assert "EndpointArn" in responses[i]774        endpoint_arn = responses[0]["EndpointArn"]775        # clean up776        sns_client.delete_endpoint(EndpointArn=endpoint_arn)777        sns_client.delete_platform_application(PlatformApplicationArn=platform_arn)778    @pytest.mark.aws_validated779    def test_publish_by_path_parameters(780        self,781        sns_create_topic,782        sns_client,783        sqs_client,784        sqs_create_queue,785        sns_create_sqs_subscription,786        aws_http_client_factory,787    ):788        message = f"test message {short_uid()}"789        topic_arn = sns_create_topic()["TopicArn"]790        queue_url = sqs_create_queue()791        sns_create_sqs_subscription(topic_arn=topic_arn, queue_url=queue_url)792        client = aws_http_client_factory("sns", region="us-east-1")793        if is_aws_cloud():794            endpoint_url = "https://sns.us-east-1.amazonaws.com"795        else:796            endpoint_url = config.get_edge_url()797        response = 
client.post(798            endpoint_url,799            params={800                "Action": "Publish",801                "Version": "2010-03-31",802                "TopicArn": topic_arn,803                "Message": message,804            },805        )806        assert response.status_code == 200807        assert b"<PublishResponse" in response.content808        messages = sqs_client.receive_message(809            QueueUrl=queue_url, VisibilityTimeout=0, WaitTimeSeconds=5810        )["Messages"]811        msg_body = json.loads(messages[0]["Body"])812        assert msg_body["TopicArn"] == topic_arn813        assert msg_body["Message"] == message814    @pytest.mark.only_localstack815    def test_multiple_subscriptions_http_endpoint(816        self, sns_client, sns_create_topic, sns_subscription817    ):818        # create a topic819        topic_arn = sns_create_topic()["TopicArn"]820        # build fake http server endpoints821        _requests = queue.Queue()822        # create HTTP endpoint and connect it to SNS topic823        def handler(request):824            _requests.put(request)825            return Response(status=429)826        number_of_endpoints = 4827        servers = []828        for _ in range(number_of_endpoints):829            server = HTTPServer()830            server.start()831            servers.append(server)832            server.expect_request("/").respond_with_handler(handler)833            http_endpoint = server.url_for("/")834            wait_for_port_open(http_endpoint)835            sns_subscription(TopicArn=topic_arn, Protocol="http", Endpoint=http_endpoint)836        # fetch subscription information837        subscription_list = sns_client.list_subscriptions_by_topic(TopicArn=topic_arn)838        assert subscription_list["ResponseMetadata"]["HTTPStatusCode"] == 200839        assert (840            len(subscription_list["Subscriptions"]) == number_of_endpoints841        ), f"unexpected number of subscriptions {subscription_list}"842     
   for _ in range(number_of_endpoints):843            request = _requests.get(timeout=2)844            assert request.get_json(True)["TopicArn"] == topic_arn845        with pytest.raises(queue.Empty):846            # make sure only four requests are received847            _requests.get(timeout=1)848        for server in servers:849            server.stop()850    @pytest.mark.only_localstack851    def test_publish_sms_endpoint(self, sns_client, sns_create_topic, sns_subscription):852        list_of_contacts = [853            f"+{random.randint(100000000, 9999999999)}",854            f"+{random.randint(100000000, 9999999999)}",855            f"+{random.randint(100000000, 9999999999)}",856        ]857        message = "Good news everyone!"858        topic_arn = sns_create_topic()["TopicArn"]859        for number in list_of_contacts:860            sns_subscription(TopicArn=topic_arn, Protocol="sms", Endpoint=number)861        sns_client.publish(Message=message, TopicArn=topic_arn)862        sns_backend = SNSBackend.get()863        def check_messages():864            sms_messages = sns_backend.sms_messages865            for contact in list_of_contacts:866                sms_was_found = False867                for message in sms_messages:868                    if message["endpoint"] == contact:869                        sms_was_found = True870                        break871                assert sms_was_found872        retry(check_messages, sleep=0.5)873    @pytest.mark.aws_validated874    def test_publish_sqs_from_sns(875        self,876        sns_client,877        sqs_client,878        sns_create_topic,879        sqs_create_queue,880        sns_create_sqs_subscription,881    ):882        topic_arn = sns_create_topic()["TopicArn"]883        queue_url = sqs_create_queue()884        subscription = sns_create_sqs_subscription(topic_arn=topic_arn, queue_url=queue_url)885        subscription_arn = subscription["SubscriptionArn"]886        
sns_client.set_subscription_attributes(887            SubscriptionArn=subscription_arn,888            AttributeName="RawMessageDelivery",889            AttributeValue="true",890        )891        string_value = "99.12"892        sns_client.publish(893            TopicArn=topic_arn,894            Message="Test msg",895            MessageAttributes={"attr1": {"DataType": "Number", "StringValue": string_value}},896        )897        response = sqs_client.receive_message(898            QueueUrl=queue_url,899            MessageAttributeNames=["All"],900            VisibilityTimeout=0,901            WaitTimeSeconds=4,902        )903        # format is of SQS MessageAttributes when RawDelivery is set to "true"904        assert response["Messages"][0]["MessageAttributes"] == {905            "attr1": {"DataType": "Number", "StringValue": string_value}906        }907        sqs_client.delete_message(908            QueueUrl=queue_url, ReceiptHandle=response["Messages"][0]["ReceiptHandle"]909        )910        sns_client.set_subscription_attributes(911            SubscriptionArn=subscription_arn,912            AttributeName="RawMessageDelivery",913            AttributeValue="false",914        )915        string_value = "100.12"916        sns_client.publish(917            TargetArn=topic_arn,918            Message="Test msg",919            MessageAttributes={"attr1": {"DataType": "Number", "StringValue": string_value}},920        )921        response = sqs_client.receive_message(922            QueueUrl=queue_url,923            MessageAttributeNames=["All"],924            VisibilityTimeout=0,925            WaitTimeSeconds=4,926        )927        message_body = json.loads(response["Messages"][0]["Body"])928        # format is SNS MessageAttributes when RawDelivery is "false"929        assert message_body["MessageAttributes"] == {930            "attr1": {"Type": "Number", "Value": string_value}931        }932    @pytest.mark.aws_validated933    def 
test_publish_batch_messages_from_sns_to_sqs(934        self,935        sns_client,936        sqs_client,937        sns_create_topic,938        sqs_create_queue,939        sns_create_sqs_subscription,940    ):941        topic_arn = sns_create_topic()["TopicArn"]942        queue_url = sqs_create_queue()943        subscription = sns_create_sqs_subscription(topic_arn=topic_arn, queue_url=queue_url)944        subscription_arn = subscription["SubscriptionArn"]945        sns_client.set_subscription_attributes(946            SubscriptionArn=subscription_arn,947            AttributeName="RawMessageDelivery",948            AttributeValue="true",949        )950        publish_batch_response = sns_client.publish_batch(951            TopicArn=topic_arn,952            PublishBatchRequestEntries=[953                {954                    "Id": "1",955                    "Message": "Test Message with two attributes",956                    "Subject": "Subject",957                    "MessageAttributes": {958                        "attr1": {"DataType": "Number", "StringValue": "99.12"},959                        "attr2": {"DataType": "Number", "StringValue": "109.12"},960                    },961                },962                {963                    "Id": "2",964                    "Message": "Test Message with one attribute",965                    "Subject": "Subject",966                    "MessageAttributes": {"attr1": {"DataType": "Number", "StringValue": "19.12"}},967                },968                {969                    "Id": "3",970                    "Message": "Test Message without attribute",971                    "Subject": "Subject",972                },973                {974                    "Id": "4",975                    "Message": "Test Message without subject",976                },977            ],978        )979        assert "Successful" in publish_batch_response980        assert len(publish_batch_response["Successful"]) == 4981        assert 
"Failed" in publish_batch_response982        for successful_resp in publish_batch_response["Successful"]:983            assert "Id" in successful_resp984            assert "MessageId" in successful_resp985        message_received = set()986        def get_messages():987            response = sqs_client.receive_message(988                QueueUrl=queue_url, MessageAttributeNames=["All"], WaitTimeSeconds=1989            )990            for message in response["Messages"]:991                message_received.add(message["MessageId"])992                assert "Body" in message993                if message["Body"] == "Test Message with two attributes":994                    assert len(message["MessageAttributes"]) == 2995                    assert message["MessageAttributes"]["attr1"] == {996                        "StringValue": "99.12",997                        "DataType": "Number",998                    }999                    assert message["MessageAttributes"]["attr2"] == {1000                        "StringValue": "109.12",1001                        "DataType": "Number",1002                    }1003                elif message["Body"] == "Test Message with one attribute":1004                    assert len(message["MessageAttributes"]) == 11005                    assert message["MessageAttributes"]["attr1"] == {1006                        "StringValue": "19.12",1007                        "DataType": "Number",1008                    }1009                elif message["Body"] == "Test Message without attribute":1010                    assert message.get("MessageAttributes") is None1011            assert len(message_received) == 41012        retry(get_messages, retries=3, sleep=1)1013    @pytest.mark.aws_validated1014    def test_publish_batch_messages_from_fifo_topic_to_fifo_queue(1015        self,1016        sns_client,1017        sns_create_topic,1018        sqs_client,1019        sqs_create_queue,1020        sns_create_sqs_subscription,1021    ):1022        
topic_name = f"topic-{short_uid()}.fifo"1023        queue_name = f"queue-{short_uid()}.fifo"1024        topic_arn = sns_create_topic(1025            Name=topic_name,1026            Attributes={1027                "FifoTopic": "true",1028                "ContentBasedDeduplication": "true",1029            },1030        )["TopicArn"]1031        queue_url = sqs_create_queue(1032            QueueName=queue_name,1033            Attributes={1034                "FifoQueue": "true",1035                "ContentBasedDeduplication": "true",1036            },1037        )1038        subscription = sns_create_sqs_subscription(topic_arn=topic_arn, queue_url=queue_url)1039        subscription_arn = subscription["SubscriptionArn"]1040        sns_client.set_subscription_attributes(1041            SubscriptionArn=subscription_arn,1042            AttributeName="RawMessageDelivery",1043            AttributeValue="true",1044        )1045        message_group_id = "complexMessageGroupId"1046        publish_batch_response = sns_client.publish_batch(1047            TopicArn=topic_arn,1048            PublishBatchRequestEntries=[1049                {1050                    "Id": "1",1051                    "MessageGroupId": message_group_id,1052                    "Message": "Test Message with two attributes",1053                    "Subject": "Subject",1054                    "MessageAttributes": {1055                        "attr1": {"DataType": "Number", "StringValue": "99.12"},1056                        "attr2": {"DataType": "Number", "StringValue": "109.12"},1057                    },1058                },1059                {1060                    "Id": "2",1061                    "MessageGroupId": message_group_id,1062                    "Message": "Test Message with one attribute",1063                    "Subject": "Subject",1064                    "MessageAttributes": {"attr1": {"DataType": "Number", "StringValue": "19.12"}},1065                },1066                {1067          
          "Id": "3",1068                    "MessageGroupId": message_group_id,1069                    "Message": "Test Message without attribute",1070                    "Subject": "Subject",1071                },1072            ],1073        )1074        assert "Successful" in publish_batch_response1075        assert "Failed" in publish_batch_response1076        for successful_resp in publish_batch_response["Successful"]:1077            assert "Id" in successful_resp1078            assert "MessageId" in successful_resp1079        def get_messages(queue_url):1080            response = sqs_client.receive_message(1081                QueueUrl=queue_url,1082                MessageAttributeNames=["All"],1083                AttributeNames=["All"],1084                MaxNumberOfMessages=10,1085            )1086            assert len(response["Messages"]) == 31087            for msg_index, message in enumerate(response["Messages"]):1088                assert "Body" in message1089                assert message["Attributes"]["MessageGroupId"] == message_group_id1090                if message["Body"] == "Test Message with two attributes":1091                    assert msg_index == 01092                    assert len(message["MessageAttributes"]) == 21093                    assert message["MessageAttributes"]["attr1"] == {1094                        "StringValue": "99.12",1095                        "DataType": "Number",1096                    }1097                    assert message["MessageAttributes"]["attr2"] == {1098                        "StringValue": "109.12",1099                        "DataType": "Number",1100                    }1101                elif message["Body"] == "Test Message with one attribute":1102                    assert msg_index == 11103                    assert len(message["MessageAttributes"]) == 11104                    assert message["MessageAttributes"]["attr1"] == {1105                        "StringValue": "19.12",1106                       
 "DataType": "Number",1107                    }1108                elif message["Body"] == "Test Message without attribute":1109                    assert msg_index == 21110                    assert message.get("MessageAttributes") is None1111        retry(get_messages, retries=5, sleep=1, queue_url=queue_url)1112        # todo add test for deduplication1113        # https://docs.aws.amazon.com/cli/latest/reference/sns/publish-batch.html1114        # https://docs.aws.amazon.com/sns/latest/dg/fifo-message-dedup.html1115    @pytest.mark.aws_validated1116    def test_publish_batch_exceptions(1117        self,1118        sns_client,1119        sqs_client,1120        sns_create_topic,1121        sqs_create_queue,1122        sns_create_sqs_subscription,1123    ):1124        topic_name = f"topic-{short_uid()}.fifo"1125        queue_name = f"queue-{short_uid()}.fifo"1126        topic_arn = sns_create_topic(Name=topic_name, Attributes={"FifoTopic": "true"})["TopicArn"]1127        queue_url = sqs_create_queue(1128            QueueName=queue_name,1129            Attributes={"FifoQueue": "true"},1130        )1131        subscription = sns_create_sqs_subscription(topic_arn=topic_arn, queue_url=queue_url)1132        subscription_arn = subscription["SubscriptionArn"]1133        sns_client.set_subscription_attributes(1134            SubscriptionArn=subscription_arn,1135            AttributeName="RawMessageDelivery",1136            AttributeValue="true",1137        )1138        with pytest.raises(ClientError) as e:1139            sns_client.publish_batch(1140                TopicArn=topic_arn,1141                PublishBatchRequestEntries=[1142                    {1143                        "Id": "1",1144                        "Message": "Test message without Group ID",1145                    }1146                ],1147            )1148        assert e.value.response["Error"]["Code"] == "InvalidParameter"1149        assert (1150            
e.value.response["Error"]["Message"]1151            == "Invalid parameter: The MessageGroupId parameter is required for FIFO topics"1152        )1153        assert e.value.response["ResponseMetadata"]["HTTPStatusCode"] == 4001154        with pytest.raises(ClientError) as e:1155            sns_client.publish_batch(1156                TopicArn=topic_arn,1157                PublishBatchRequestEntries=[1158                    {"Id": f"Id_{i}", "Message": "Too many messages"} for i in range(11)1159                ],1160            )1161        assert e.value.response["Error"]["Code"] == "TooManyEntriesInBatchRequest"1162        assert (1163            e.value.response["Error"]["Message"]1164            == "The batch request contains more entries than permissible."1165        )1166        assert e.value.response["ResponseMetadata"]["HTTPStatusCode"] == 4001167        with pytest.raises(ClientError) as e:1168            sns_client.publish_batch(1169                TopicArn=topic_arn,1170                PublishBatchRequestEntries=[1171                    {"Id": "1", "Message": "Messages with the same ID"} for i in range(2)1172                ],1173            )1174        assert e.value.response["Error"]["Code"] == "BatchEntryIdsNotDistinct"1175        assert (1176            e.value.response["Error"]["Message"]1177            == "Two or more batch entries in the request have the same Id."1178        )1179        assert e.value.response["ResponseMetadata"]["HTTPStatusCode"] == 4001180        # todo add test and implement behaviour for ContentBasedDeduplication or MessageDeduplicationId1181    def add_xray_header(self, request, **kwargs):1182        request.headers[1183            "X-Amzn-Trace-Id"1184        ] = "Root=1-3152b799-8954dae64eda91bc9a23a7e8;Parent=7fa8c0f79203be72;Sampled=1"1185    def test_publish_sqs_from_sns_with_xray_propagation(1186        self, sns_client, sns_create_topic, sqs_client, sqs_create_queue, sns_subscription1187    ):1188        # TODO: 
remove or adapt for asf1189        if SQS_BACKEND_IMPL != "elasticmq":1190            pytest.skip("not using elasticmq as SQS backend")1191        sns_client.meta.events.register("before-send.sns.Publish", self.add_xray_header)1192        topic = sns_create_topic()1193        topic_arn = topic["TopicArn"]1194        queue_url = sqs_create_queue()1195        sns_subscription(TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_url)1196        sns_client.publish(TargetArn=topic_arn, Message="X-Ray propagation test msg")1197        response = sqs_client.receive_message(1198            QueueUrl=queue_url,1199            AttributeNames=["SentTimestamp", "AWSTraceHeader"],1200            MaxNumberOfMessages=1,1201            MessageAttributeNames=["All"],1202            VisibilityTimeout=2,1203            WaitTimeSeconds=2,1204        )1205        assert len(response["Messages"]) == 11206        message = response["Messages"][0]1207        assert "Attributes" in message1208        assert "AWSTraceHeader" in message["Attributes"]1209        assert (1210            message["Attributes"]["AWSTraceHeader"]1211            == "Root=1-3152b799-8954dae64eda91bc9a23a7e8;Parent=7fa8c0f79203be72;Sampled=1"1212        )1213    @pytest.mark.aws_validated1214    def test_create_topic_after_delete_with_new_tags(self, sns_create_topic, sns_client):1215        topic_name = f"test-{short_uid()}"1216        topic = sns_create_topic(Name=topic_name, Tags=[{"Key": "Name", "Value": "pqr"}])1217        sns_client.delete_topic(TopicArn=topic["TopicArn"])1218        topic1 = sns_create_topic(Name=topic_name, Tags=[{"Key": "Name", "Value": "abc"}])1219        assert topic["TopicArn"] == topic1["TopicArn"]1220    @pytest.mark.aws_validated1221    def test_not_found_error_on_set_subscription_attributes(1222        self, sns_client, sns_create_topic, sqs_create_queue, sqs_queue_arn, sns_subscription1223    ):1224        topic_arn = sns_create_topic()["TopicArn"]1225        queue_url = 
sqs_create_queue()1226        queue_arn = sqs_queue_arn(queue_url)1227        subscription = sns_subscription(TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_arn)1228        subscription_arn = subscription["SubscriptionArn"]1229        subscription_attributes = sns_client.get_subscription_attributes(1230            SubscriptionArn=subscription_arn1231        )["Attributes"]1232        assert subscription_attributes["SubscriptionArn"] == subscription_arn1233        subscriptions_by_topic = sns_client.list_subscriptions_by_topic(TopicArn=topic_arn)1234        assert len(subscriptions_by_topic["Subscriptions"]) == 11235        sns_client.unsubscribe(SubscriptionArn=subscription_arn)1236        def check_subscription_deleted():1237            try:1238                # AWS doesn't give NotFound error on GetSubscriptionAttributes for a while, might be cached1239                sns_client.set_subscription_attributes(1240                    SubscriptionArn=subscription_arn,1241                    AttributeName="RawMessageDelivery",1242                    AttributeValue="true",1243                )1244                raise Exception("Subscription is not deleted")1245            except ClientError as e:1246                assert e.response["Error"]["Code"] == "NotFound"1247                assert e.response["ResponseMetadata"]["HTTPStatusCode"] == 4041248        retry(check_subscription_deleted, retries=10, sleep_before=0.2, sleep=3)1249        subscriptions_by_topic = sns_client.list_subscriptions_by_topic(TopicArn=topic_arn)1250        assert len(subscriptions_by_topic["Subscriptions"]) == 01251    @pytest.mark.aws_validated1252    def test_message_to_fifo_sqs(1253        self,1254        sns_client,1255        sqs_client,1256        sns_create_topic,1257        sqs_create_queue,1258        sns_create_sqs_subscription,1259    ):1260        topic_name = f"topic-{short_uid()}.fifo"1261        queue_name = f"queue-{short_uid()}.fifo"1262        topic_arn = 
sns_create_topic(1263            Name=topic_name,1264            Attributes={1265                "FifoTopic": "true",1266                "ContentBasedDeduplication": "true",  # todo: not enforced yet1267            },1268        )["TopicArn"]1269        queue_url = sqs_create_queue(1270            QueueName=queue_name,1271            Attributes={1272                "FifoQueue": "true",1273                "ContentBasedDeduplication": "true",1274            },1275        )1276        # todo check both ContentBasedDeduplication and MessageDeduplicationId when implemented1277        # https://docs.aws.amazon.com/sns/latest/dg/fifo-message-dedup.html1278        sns_create_sqs_subscription(topic_arn=topic_arn, queue_url=queue_url)1279        message = "Test"1280        sns_client.publish(TopicArn=topic_arn, Message=message, MessageGroupId=short_uid())1281        messages = sqs_client.receive_message(1282            QueueUrl=queue_url, VisibilityTimeout=0, WaitTimeSeconds=101283        )["Messages"]1284        msg_body = messages[0]["Body"]1285        assert json.loads(msg_body)["Message"] == message1286    @pytest.mark.aws_validated1287    def test_validations_for_fifo(1288        self,1289        sns_client,1290        sqs_client,1291        sns_create_topic,1292        sqs_create_queue,1293        sns_create_sqs_subscription,1294    ):1295        topic_name = f"topic-{short_uid()}"1296        fifo_topic_name = f"topic-{short_uid()}.fifo"1297        fifo_queue_name = f"queue-{short_uid()}.fifo"1298        topic_arn = sns_create_topic(Name=topic_name)["TopicArn"]1299        fifo_topic_arn = sns_create_topic(Name=fifo_topic_name, Attributes={"FifoTopic": "true"})[1300            "TopicArn"1301        ]1302        fifo_queue_url = sqs_create_queue(1303            QueueName=fifo_queue_name, Attributes={"FifoQueue": "true"}1304        )1305        with pytest.raises(ClientError) as e:1306            sns_create_sqs_subscription(topic_arn=topic_arn, 
queue_url=fifo_queue_url)1307        assert e.match("standard SNS topic")1308        with pytest.raises(ClientError) as e:1309            sns_client.publish(TopicArn=fifo_topic_arn, Message="test")1310        assert e.match("MessageGroupId")1311        with pytest.raises(ClientError) as e:1312            sns_client.publish(TopicArn=fifo_topic_arn, Message="test", MessageGroupId=short_uid())1313        # if ContentBasedDeduplication is not set at the topic level, it needs MessageDeduplicationId for each msg1314        assert e.match("MessageDeduplicationId")1315        assert e.match("ContentBasedDeduplication")1316        with pytest.raises(ClientError) as e:1317            sns_client.publish(1318                TopicArn=topic_arn, Message="test", MessageDeduplicationId=short_uid()1319            )1320        assert e.match("MessageDeduplicationId")1321        with pytest.raises(ClientError) as e:1322            sns_client.publish(TopicArn=topic_arn, Message="test", MessageGroupId=short_uid())1323        assert e.match("MessageGroupId")1324    @pytest.mark.aws_validated1325    def test_empty_sns_message(1326        self,1327        sns_client,1328        sqs_client,1329        sns_create_topic,1330        sqs_create_queue,1331        sns_create_sqs_subscription,1332    ):1333        topic_arn = sns_create_topic()["TopicArn"]1334        queue_url = sqs_create_queue()1335        sns_create_sqs_subscription(topic_arn=topic_arn, queue_url=queue_url)1336        with pytest.raises(ClientError) as e:1337            sns_client.publish(Message="", TopicArn=topic_arn)1338        assert e.match("Empty message")1339        assert (1340            sqs_client.get_queue_attributes(1341                QueueUrl=queue_url, AttributeNames=["ApproximateNumberOfMessages"]1342            )["Attributes"]["ApproximateNumberOfMessages"]1343            == "0"1344        )1345    @pytest.mark.parametrize("raw_message_delivery", [True, False])1346    @pytest.mark.aws_validated1347    def 
test_redrive_policy_sqs_queue_subscription(1348        self,1349        sns_client,1350        sqs_client,1351        sns_create_topic,1352        sqs_create_queue,1353        sqs_queue_arn,1354        sqs_queue_exists,1355        sns_create_sqs_subscription,1356        sns_allow_topic_sqs_queue,1357        raw_message_delivery,1358        snapshot,1359    ):1360        snapshot.add_transformer(snapshot.transform.sqs_api())1361        # Need to skip the MD5OfBody/Signature, because it contains a timestamp1362        snapshot.add_transformer(1363            snapshot.transform.key_value(1364                "Signature",1365                "<signature>",1366                reference_replacement=False,1367            )1368        )1369        snapshot.add_transformer(1370            snapshot.transform.key_value("MD5OfBody", "<md5-hash>", reference_replacement=False)1371        )1372        topic_arn = sns_create_topic()["TopicArn"]1373        queue_url = sqs_create_queue()1374        subscription = sns_create_sqs_subscription(topic_arn=topic_arn, queue_url=queue_url)1375        dlq_url = sqs_create_queue()1376        dlq_arn = sqs_queue_arn(dlq_url)1377        sns_client.set_subscription_attributes(1378            SubscriptionArn=subscription["SubscriptionArn"],1379            AttributeName="RedrivePolicy",1380            AttributeValue=json.dumps({"deadLetterTargetArn": dlq_arn}),1381        )1382        if raw_message_delivery:1383            sns_client.set_subscription_attributes(1384                SubscriptionArn=subscription["SubscriptionArn"],1385                AttributeName="RawMessageDelivery",1386                AttributeValue="true",1387            )1388        sns_allow_topic_sqs_queue(1389            sqs_queue_url=dlq_url,1390            sqs_queue_arn=dlq_arn,1391            sns_topic_arn=topic_arn,1392        )1393        sqs_client.delete_queue(QueueUrl=queue_url)1394        # AWS takes some time to delete the queue, which make the test fails as it 
delivers the message correctly1395        assert poll_condition(lambda: not sqs_queue_exists(queue_url), timeout=5)1396        message = "test_dlq_after_sqs_endpoint_deleted"1397        message_attr = {1398            "attr1": {1399                "DataType": "Number",1400                "StringValue": "111",1401            },1402            "attr2": {1403                "DataType": "Binary",1404                "BinaryValue": b"\x02\x03\x04",1405            },1406        }1407        sns_client.publish(TopicArn=topic_arn, Message=message, MessageAttributes=message_attr)1408        response = sqs_client.receive_message(QueueUrl=dlq_url, WaitTimeSeconds=10)1409        assert (1410            len(response["Messages"]) == 11411        ), f"invalid number of messages in DLQ response {response}"1412        if raw_message_delivery:1413            assert response["Messages"][0]["Body"] == message1414            # MessageAttributes are lost with RawDelivery in AWS1415            assert "MessageAttributes" not in response["Messages"][0]1416            snapshot.match("raw_message_delivery", response)1417        else:1418            received_message = json.loads(response["Messages"][0]["Body"])1419            assert received_message["Type"] == "Notification"1420            assert received_message["Message"] == message1421            # Set the decoded JSON Body to be able to skip keys directly1422            response["Messages"][0]["Body"] = received_message1423            snapshot.match("json_encoded_delivery", response)1424    @pytest.mark.aws_validated1425    def test_message_attributes_not_missing(1426        self,1427        sns_client,1428        sqs_client,1429        sns_create_sqs_subscription,1430        sns_create_topic,1431        sqs_create_queue,1432    ):1433        topic_arn = sns_create_topic()["TopicArn"]1434        queue_url = sqs_create_queue()1435        subscription = sns_create_sqs_subscription(topic_arn=topic_arn, queue_url=queue_url)1436        assert 
subscription["SubscriptionArn"]1437        sns_client.set_subscription_attributes(1438            SubscriptionArn=subscription["SubscriptionArn"],1439            AttributeName="RawMessageDelivery",1440            AttributeValue="true",1441        )1442        attributes = {1443            "an-attribute-key": {"DataType": "String", "StringValue": "an-attribute-value"},1444            "binary-attribute": {"DataType": "Binary", "BinaryValue": b"\x02\x03\x04"},1445        }1446        publish_response = sns_client.publish(1447            TopicArn=topic_arn,1448            Message="text",1449            MessageAttributes=attributes,1450        )1451        assert publish_response["MessageId"]1452        msg = sqs_client.receive_message(1453            QueueUrl=queue_url,1454            AttributeNames=["All"],1455            MessageAttributeNames=["All"],1456            WaitTimeSeconds=3,1457        )1458        # as SNS piggybacks on SQS MessageAttributes when RawDelivery is true1459        # BinaryValue depends on SQS implementation, and is decoded automatically1460        assert msg["Messages"][0]["MessageAttributes"] == attributes1461        sqs_client.delete_message(1462            QueueUrl=queue_url, ReceiptHandle=msg["Messages"][0]["ReceiptHandle"]1463        )1464        sns_client.set_subscription_attributes(1465            SubscriptionArn=subscription["SubscriptionArn"],1466            AttributeName="RawMessageDelivery",1467            AttributeValue="false",1468        )1469        publish_response = sns_client.publish(1470            TopicArn=topic_arn,1471            Message="text",1472            MessageAttributes=attributes,1473        )1474        assert publish_response["MessageId"]1475        msg = sqs_client.receive_message(1476            QueueUrl=queue_url,1477            AttributeNames=["All"],1478            MessageAttributeNames=["All"],1479            WaitTimeSeconds=3,1480        )1481        assert 
json.loads(msg["Messages"][0]["Body"])["MessageAttributes"] == {1482            "an-attribute-key": {"Type": "String", "Value": "an-attribute-value"},1483            "binary-attribute": {1484                # binary payload in base64 encoded by AWS, UTF-8 for JSON1485                # https://docs.aws.amazon.com/sns/latest/api/API_MessageAttributeValue.html1486                # need to be decoded manually as it's part of the message Body1487                "Type": "Binary",1488                "Value": b64encode(b"\x02\x03\x04").decode("utf-8"),1489            },1490        }1491    @pytest.mark.only_localstack1492    @pytest.mark.aws_validated1493    @pytest.mark.parametrize("raw_message_delivery", [True, False])1494    def test_subscribe_external_http_endpoint(1495        self,1496        sns_client,1497        sns_create_http_endpoint,1498        raw_message_delivery,1499    ):1500        # Necessitate manual set up to allow external access to endpoint, only in local testing1501        topic_arn, subscription_arn, endpoint_url, server = sns_create_http_endpoint(1502            raw_message_delivery1503        )1504        assert poll_condition(1505            lambda: len(server.log) >= 1,1506            timeout=5,1507        )1508        sub_request, _ = server.log[0]1509        payload = sub_request.get_json(force=True)1510        assert payload["Type"] == "SubscriptionConfirmation"1511        assert sub_request.headers["x-amz-sns-message-type"] == "SubscriptionConfirmation"1512        assert "Signature" in payload1513        assert "SigningCertURL" in payload1514        token = payload["Token"]1515        subscribe_url = payload["SubscribeURL"]1516        service_url, subscribe_url_path = payload["SubscribeURL"].rsplit("/", maxsplit=1)1517        assert subscribe_url == (1518            f"{service_url}/?Action=ConfirmSubscription" f"&TopicArn={topic_arn}&Token={token}"1519        )1520        confirm_subscribe_request = requests.get(subscribe_url)1521        
confirm_subscribe = xmltodict.parse(confirm_subscribe_request.content)1522        assert (1523            confirm_subscribe["ConfirmSubscriptionResponse"]["ConfirmSubscriptionResult"][1524                "SubscriptionArn"1525            ]1526            == subscription_arn1527        )1528        subscription_attributes = sns_client.get_subscription_attributes(1529            SubscriptionArn=subscription_arn1530        )1531        assert subscription_attributes["Attributes"]["PendingConfirmation"] == "false"1532        message = "test_external_http_endpoint"1533        sns_client.publish(TopicArn=topic_arn, Message=message)1534        assert poll_condition(1535            lambda: len(server.log) >= 2,1536            timeout=5,1537        )1538        notification_request, _ = server.log[1]1539        assert notification_request.headers["x-amz-sns-message-type"] == "Notification"1540        expected_unsubscribe_url = (1541            f"{service_url}/?Action=Unsubscribe&SubscriptionArn={subscription_arn}"1542        )1543        if raw_message_delivery:1544            payload = notification_request.data.decode()1545            assert payload == message1546        else:1547            payload = notification_request.get_json(force=True)1548            assert payload["Type"] == "Notification"1549            assert "Signature" in payload1550            assert "SigningCertURL" in payload1551            assert payload["Message"] == message1552            assert payload["UnsubscribeURL"] == expected_unsubscribe_url1553        unsub_request = requests.get(expected_unsubscribe_url)1554        unsubscribe_confirmation = xmltodict.parse(unsub_request.content)1555        assert "UnsubscribeResponse" in unsubscribe_confirmation1556        assert poll_condition(1557            lambda: len(server.log) >= 3,1558            timeout=5,1559        )1560        unsub_request, _ = server.log[2]1561        payload = unsub_request.get_json(force=True)1562        assert payload["Type"] == 
"UnsubscribeConfirmation"1563        assert unsub_request.headers["x-amz-sns-message-type"] == "UnsubscribeConfirmation"1564        assert "Signature" in payload1565        assert "SigningCertURL" in payload1566        token = payload["Token"]1567        assert payload["SubscribeURL"] == (1568            f"{service_url}/?" f"Action=ConfirmSubscription&TopicArn={topic_arn}&Token={token}"1569        )1570    @pytest.mark.only_localstack1571    @pytest.mark.parametrize("raw_message_delivery", [True, False])1572    def test_dlq_external_http_endpoint(1573        self,1574        sns_client,1575        sqs_client,1576        sns_create_topic,1577        sqs_create_queue,1578        sqs_queue_arn,1579        sns_subscription,1580        sns_create_http_endpoint,1581        sns_create_sqs_subscription,1582        sns_allow_topic_sqs_queue,1583        raw_message_delivery,1584    ):1585        # Necessitate manual set up to allow external access to endpoint, only in local testing1586        topic_arn, http_subscription_arn, endpoint_url, server = sns_create_http_endpoint(1587            raw_message_delivery1588        )1589        dlq_url = sqs_create_queue()1590        dlq_arn = sqs_queue_arn(dlq_url)1591        sns_allow_topic_sqs_queue(1592            sqs_queue_url=dlq_url, sqs_queue_arn=dlq_arn, sns_topic_arn=topic_arn1593        )1594        sns_client.set_subscription_attributes(1595            SubscriptionArn=http_subscription_arn,1596            AttributeName="RedrivePolicy",1597            AttributeValue=json.dumps({"deadLetterTargetArn": dlq_arn}),1598        )1599        assert poll_condition(1600            lambda: len(server.log) >= 1,1601            timeout=5,1602        )1603        sub_request, _ = server.log[0]1604        payload = sub_request.get_json(force=True)1605        assert payload["Type"] == "SubscriptionConfirmation"...fixtures.py
Source:fixtures.py  
...386def sns_topic(sns_client, sns_create_topic) -> "GetTopicAttributesResponseTypeDef":387    topic_arn = sns_create_topic()["TopicArn"]388    return sns_client.get_topic_attributes(TopicArn=topic_arn)389@pytest.fixture390def sns_allow_topic_sqs_queue(sqs_client):391    def _allow_sns_topic(sqs_queue_url, sqs_queue_arn, sns_topic_arn) -> None:392        # allow topic to write to sqs queue393        sqs_client.set_queue_attributes(394            QueueUrl=sqs_queue_url,395            Attributes={396                "Policy": json.dumps(397                    {398                        "Statement": [399                            {400                                "Effect": "Allow",401                                "Principal": {"Service": "sns.amazonaws.com"},402                                "Action": "sqs:SendMessage",403                                "Resource": sqs_queue_arn,404                                "Condition": {"ArnEquals": {"aws:SourceArn": sns_topic_arn}},405                            }406                        ]407                    }408                )409            },410        )411    return _allow_sns_topic412@pytest.fixture413def sns_create_sqs_subscription(sns_client, sqs_client, sns_allow_topic_sqs_queue):414    subscriptions = []415    def _factory(topic_arn: str, queue_url: str) -> Dict[str, str]:416        queue_arn = sqs_client.get_queue_attributes(417            QueueUrl=queue_url, AttributeNames=["QueueArn"]418        )["Attributes"]["QueueArn"]419        # connect sns topic to sqs420        subscription = sns_client.subscribe(421            TopicArn=topic_arn,422            Protocol="sqs",423            Endpoint=queue_arn,424        )425        subscription_arn = subscription["SubscriptionArn"]426        # allow topic to write to sqs queue427        sns_allow_topic_sqs_queue(428            sqs_queue_url=queue_url, sqs_queue_arn=queue_arn, sns_topic_arn=topic_arn429        )430        subscriptions.append(subscription_arn)431 
       return sns_client.get_subscription_attributes(SubscriptionArn=subscription_arn)[432            "Attributes"433        ]434    yield _factory435    for arn in subscriptions:436        try:437            sns_client.unsubscribe(SubscriptionArn=arn)438        except Exception as e:439            LOG.error("error cleaning up subscription %s: %s", arn, e)440@pytest.fixture441def sns_create_http_endpoint(sns_client, sns_create_topic, sns_subscription):...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
