How to use the sns_allow_topic_sqs_queue method in LocalStack

Best Python code snippet using localstack_python

test_sns.py

Source: test_sns.py (GitHub)

copy

Full Screen

...598 ):599 dlq_url = sqs_create_queue()600 dlq_arn = sqs_queue_arn(dlq_url)601 topic_arn = sns_create_topic()["TopicArn"]602 sns_allow_topic_sqs_queue(603 sqs_queue_url=dlq_url, sqs_queue_arn=dlq_arn, sns_topic_arn=topic_arn604 )605 lambda_name = f"test-{short_uid()}"606 lambda_arn = create_lambda_function(607 func_name=lambda_name,608 libs=TEST_LAMBDA_LIBS,609 handler_file=TEST_LAMBDA_PYTHON,610 runtime=LAMBDA_RUNTIME_PYTHON37,611 role=lambda_su_role,612 )["CreateFunctionResponse"]["FunctionArn"]613 subscription = sns_subscription(TopicArn=topic_arn, Protocol="lambda", Endpoint=lambda_arn)614 sns_client.set_subscription_attributes(615 SubscriptionArn=subscription["SubscriptionArn"],616 AttributeName="RedrivePolicy",617 AttributeValue=json.dumps({"deadLetterTargetArn": dlq_arn}),618 )619 lambda_client.delete_function(FunctionName=lambda_name)620 sns_client.publish(621 TopicArn=topic_arn,622 Message=json.dumps({"message": "test_redrive_policy"}),623 )624 response = sqs_client.receive_message(QueueUrl=dlq_url, WaitTimeSeconds=10)625 assert (626 len(response["Messages"]) == 1627 ), f"invalid number of messages in DLQ response {response}"628 message = json.loads(response["Messages"][0]["Body"])629 assert message["Type"] == "Notification"630 assert json.loads(message["Message"])["message"] == "test_redrive_policy"631 @pytest.mark.aws_validated632 def test_publish_with_empty_subject(self, sns_client, sns_create_topic):633 topic_arn = sns_create_topic()["TopicArn"]634 # Publish without subject635 rs = sns_client.publish(TopicArn=topic_arn, Message=json.dumps({"message": "test_publish"}))636 assert rs["ResponseMetadata"]["HTTPStatusCode"] == 200637 with pytest.raises(ClientError) as e:638 sns_client.publish(639 TopicArn=topic_arn,640 Subject="",641 Message=json.dumps({"message": "test_publish"}),642 )643 assert e.value.response["Error"]["Code"] == "InvalidParameter"644 assert e.value.response["Error"]["Message"] == "Invalid parameter: Subject"645 # todo test with 
snapshot to aws validate it646 def test_create_topic_test_arn(self, sns_create_topic, sns_client):647 topic_name = f"topic-{short_uid()}"648 response = sns_create_topic(Name=topic_name)649 topic_arn_params = response["TopicArn"].split(":")650 testutil.response_arn_matches_partition(sns_client, response["TopicArn"])651 assert topic_arn_params[4] == get_aws_account_id()652 assert topic_arn_params[5] == topic_name653 @pytest.mark.aws_validated654 def test_publish_message_by_target_arn(655 self,656 sns_client,657 sqs_client,658 sns_create_topic,659 sqs_create_queue,660 sns_create_sqs_subscription,661 ):662 # using an SQS subscription to test TopicArn/TargetArn as it is easier to check against AWS663 topic_arn = sns_create_topic()["TopicArn"]664 queue_url = sqs_create_queue()665 sns_create_sqs_subscription(topic_arn=topic_arn, queue_url=queue_url)666 sns_client.publish(TopicArn=topic_arn, Message="test-msg-1")667 response = sqs_client.receive_message(668 QueueUrl=queue_url,669 MessageAttributeNames=["All"],670 VisibilityTimeout=0,671 WaitTimeSeconds=4,672 )673 assert len(response["Messages"]) == 1674 message = response["Messages"][0]675 msg_body = json.loads(message["Body"])676 assert msg_body["Message"] == "test-msg-1"677 sqs_client.delete_message(QueueUrl=queue_url, ReceiptHandle=message["ReceiptHandle"])678 # publish with TargetArn instead of TopicArn679 sns_client.publish(TargetArn=topic_arn, Message="test-msg-2")680 response = sqs_client.receive_message(681 QueueUrl=queue_url,682 MessageAttributeNames=["All"],683 VisibilityTimeout=0,684 WaitTimeSeconds=4,685 )686 assert len(response["Messages"]) == 1687 message = response["Messages"][0]688 msg_body = json.loads(message["Body"])689 assert msg_body["Message"] == "test-msg-2"690 @pytest.mark.aws_validated691 def test_publish_message_before_subscribe_topic(692 self,693 sns_client,694 sns_create_topic,695 sqs_client,696 sqs_create_queue,697 sns_subscription,698 sns_create_sqs_subscription,699 ):700 topic_arn = 
sns_create_topic()["TopicArn"]701 queue_url = sqs_create_queue()702 rs = sns_client.publish(703 TopicArn=topic_arn, Subject="test subject", Message="test_message_1"704 )705 assert rs["ResponseMetadata"]["HTTPStatusCode"] == 200706 sns_create_sqs_subscription(topic_arn=topic_arn, queue_url=queue_url)707 message_subject = "sqs subject"708 message_body = "test_message_2"709 rs = sns_client.publish(TopicArn=topic_arn, Subject=message_subject, Message=message_body)710 assert rs["ResponseMetadata"]["HTTPStatusCode"] == 200711 message_id = rs["MessageId"]712 response = sqs_client.receive_message(713 QueueUrl=queue_url, VisibilityTimeout=0, WaitTimeSeconds=5714 )715 # nothing was subscribing to the topic, so the first message is lost716 assert len(response["Messages"]) == 1717 message = json.loads(response["Messages"][0]["Body"])718 assert message["MessageId"] == message_id719 assert message["Subject"] == message_subject720 assert message["Message"] == message_body721 @pytest.mark.aws_validated722 def test_create_duplicate_topic_with_more_tags(self, sns_client, sns_create_topic):723 topic_name = f"test-{short_uid()}"724 sns_create_topic(Name=topic_name)725 with pytest.raises(ClientError) as e:726 sns_client.create_topic(Name=topic_name, Tags=[{"Key": "key1", "Value": "value1"}])727 assert e.value.response["Error"]["Code"] == "InvalidParameter"728 assert (729 e.value.response["Error"]["Message"]730 == "Invalid parameter: Tags Reason: Topic already exists with different tags"731 )732 assert e.value.response["ResponseMetadata"]["HTTPStatusCode"] == 400733 @pytest.mark.aws_validated734 def test_create_duplicate_topic_check_idempotency(self, sns_create_topic):735 topic_name = f"test-{short_uid()}"736 tags = [{"Key": "a", "Value": "1"}, {"Key": "b", "Value": "2"}]737 kwargs = [738 {"Tags": tags}, # to create the same topic again with same tags739 {"Tags": [tags[0]]}, # to create the same topic again with one of the tags from above740 {"Tags": []}, # to create the same topic 
again with no tags741 ]742 # create topic with two tags743 response = sns_create_topic(Name=topic_name, Tags=tags)744 topic_arn = response["TopicArn"]745 for arg in kwargs:746 response = sns_create_topic(Name=topic_name, **arg)747 # assert TopicArn returned by all the above create_topic calls is the same as the original748 assert response["TopicArn"] == topic_arn749 @pytest.mark.only_localstack750 @pytest.mark.skip(751 reason="Idempotency not supported in Moto backend. See bug https://github.com/spulec/moto/issues/2333"752 )753 def test_create_platform_endpoint_check_idempotency(self, sns_client):754 response = sns_client.create_platform_application(755 Name=f"test-{short_uid()}",756 Platform="GCM",757 Attributes={"PlatformCredential": "123"},758 )759 kwargs_list = [760 {"Token": "test1", "CustomUserData": "test-data"},761 {"Token": "test1", "CustomUserData": "test-data"},762 {"Token": "test1"},763 {"Token": "test1"},764 ]765 platform_arn = response["PlatformApplicationArn"]766 responses = []767 for kwargs in kwargs_list:768 responses.append(769 sns_client.create_platform_endpoint(PlatformApplicationArn=platform_arn, **kwargs)770 )771 # Assert endpointarn is returned in every call create platform call772 for i in range(len(responses)):773 assert "EndpointArn" in responses[i]774 endpoint_arn = responses[0]["EndpointArn"]775 # clean up776 sns_client.delete_endpoint(EndpointArn=endpoint_arn)777 sns_client.delete_platform_application(PlatformApplicationArn=platform_arn)778 @pytest.mark.aws_validated779 def test_publish_by_path_parameters(780 self,781 sns_create_topic,782 sns_client,783 sqs_client,784 sqs_create_queue,785 sns_create_sqs_subscription,786 aws_http_client_factory,787 ):788 message = f"test message {short_uid()}"789 topic_arn = sns_create_topic()["TopicArn"]790 queue_url = sqs_create_queue()791 sns_create_sqs_subscription(topic_arn=topic_arn, queue_url=queue_url)792 client = aws_http_client_factory("sns", region="us-east-1")793 if is_aws_cloud():794 
endpoint_url = "https://sns.us-east-1.amazonaws.com"795 else:796 endpoint_url = config.get_edge_url()797 response = client.post(798 endpoint_url,799 params={800 "Action": "Publish",801 "Version": "2010-03-31",802 "TopicArn": topic_arn,803 "Message": message,804 },805 )806 assert response.status_code == 200807 assert b"<PublishResponse" in response.content808 messages = sqs_client.receive_message(809 QueueUrl=queue_url, VisibilityTimeout=0, WaitTimeSeconds=5810 )["Messages"]811 msg_body = json.loads(messages[0]["Body"])812 assert msg_body["TopicArn"] == topic_arn813 assert msg_body["Message"] == message814 @pytest.mark.only_localstack815 def test_multiple_subscriptions_http_endpoint(816 self, sns_client, sns_create_topic, sns_subscription817 ):818 # create a topic819 topic_arn = sns_create_topic()["TopicArn"]820 # build fake http server endpoints821 _requests = queue.Queue()822 # create HTTP endpoint and connect it to SNS topic823 def handler(request):824 _requests.put(request)825 return Response(status=429)826 number_of_endpoints = 4827 servers = []828 for _ in range(number_of_endpoints):829 server = HTTPServer()830 server.start()831 servers.append(server)832 server.expect_request("/").respond_with_handler(handler)833 http_endpoint = server.url_for("/")834 wait_for_port_open(http_endpoint)835 sns_subscription(TopicArn=topic_arn, Protocol="http", Endpoint=http_endpoint)836 # fetch subscription information837 subscription_list = sns_client.list_subscriptions_by_topic(TopicArn=topic_arn)838 assert subscription_list["ResponseMetadata"]["HTTPStatusCode"] == 200839 assert (840 len(subscription_list["Subscriptions"]) == number_of_endpoints841 ), f"unexpected number of subscriptions {subscription_list}"842 for _ in range(number_of_endpoints):843 request = _requests.get(timeout=2)844 assert request.get_json(True)["TopicArn"] == topic_arn845 with pytest.raises(queue.Empty):846 # make sure only four requests are received847 _requests.get(timeout=1)848 for server in 
servers:849 server.stop()850 @pytest.mark.only_localstack851 def test_publish_sms_endpoint(self, sns_client, sns_create_topic, sns_subscription):852 list_of_contacts = [853 f"+{random.randint(100000000, 9999999999)}",854 f"+{random.randint(100000000, 9999999999)}",855 f"+{random.randint(100000000, 9999999999)}",856 ]857 message = "Good news everyone!"858 topic_arn = sns_create_topic()["TopicArn"]859 for number in list_of_contacts:860 sns_subscription(TopicArn=topic_arn, Protocol="sms", Endpoint=number)861 sns_client.publish(Message=message, TopicArn=topic_arn)862 sns_backend = SNSBackend.get()863 def check_messages():864 sms_messages = sns_backend.sms_messages865 for contact in list_of_contacts:866 sms_was_found = False867 for message in sms_messages:868 if message["endpoint"] == contact:869 sms_was_found = True870 break871 assert sms_was_found872 retry(check_messages, sleep=0.5)873 @pytest.mark.aws_validated874 def test_publish_sqs_from_sns(875 self,876 sns_client,877 sqs_client,878 sns_create_topic,879 sqs_create_queue,880 sns_create_sqs_subscription,881 ):882 topic_arn = sns_create_topic()["TopicArn"]883 queue_url = sqs_create_queue()884 subscription = sns_create_sqs_subscription(topic_arn=topic_arn, queue_url=queue_url)885 subscription_arn = subscription["SubscriptionArn"]886 sns_client.set_subscription_attributes(887 SubscriptionArn=subscription_arn,888 AttributeName="RawMessageDelivery",889 AttributeValue="true",890 )891 string_value = "99.12"892 sns_client.publish(893 TopicArn=topic_arn,894 Message="Test msg",895 MessageAttributes={"attr1": {"DataType": "Number", "StringValue": string_value}},896 )897 response = sqs_client.receive_message(898 QueueUrl=queue_url,899 MessageAttributeNames=["All"],900 VisibilityTimeout=0,901 WaitTimeSeconds=4,902 )903 # format is of SQS MessageAttributes when RawDelivery is set to "true"904 assert response["Messages"][0]["MessageAttributes"] == {905 "attr1": {"DataType": "Number", "StringValue": string_value}906 }907 
sqs_client.delete_message(908 QueueUrl=queue_url, ReceiptHandle=response["Messages"][0]["ReceiptHandle"]909 )910 sns_client.set_subscription_attributes(911 SubscriptionArn=subscription_arn,912 AttributeName="RawMessageDelivery",913 AttributeValue="false",914 )915 string_value = "100.12"916 sns_client.publish(917 TargetArn=topic_arn,918 Message="Test msg",919 MessageAttributes={"attr1": {"DataType": "Number", "StringValue": string_value}},920 )921 response = sqs_client.receive_message(922 QueueUrl=queue_url,923 MessageAttributeNames=["All"],924 VisibilityTimeout=0,925 WaitTimeSeconds=4,926 )927 message_body = json.loads(response["Messages"][0]["Body"])928 # format is SNS MessageAttributes when RawDelivery is "false"929 assert message_body["MessageAttributes"] == {930 "attr1": {"Type": "Number", "Value": string_value}931 }932 @pytest.mark.aws_validated933 def test_publish_batch_messages_from_sns_to_sqs(934 self,935 sns_client,936 sqs_client,937 sns_create_topic,938 sqs_create_queue,939 sns_create_sqs_subscription,940 ):941 topic_arn = sns_create_topic()["TopicArn"]942 queue_url = sqs_create_queue()943 subscription = sns_create_sqs_subscription(topic_arn=topic_arn, queue_url=queue_url)944 subscription_arn = subscription["SubscriptionArn"]945 sns_client.set_subscription_attributes(946 SubscriptionArn=subscription_arn,947 AttributeName="RawMessageDelivery",948 AttributeValue="true",949 )950 publish_batch_response = sns_client.publish_batch(951 TopicArn=topic_arn,952 PublishBatchRequestEntries=[953 {954 "Id": "1",955 "Message": "Test Message with two attributes",956 "Subject": "Subject",957 "MessageAttributes": {958 "attr1": {"DataType": "Number", "StringValue": "99.12"},959 "attr2": {"DataType": "Number", "StringValue": "109.12"},960 },961 },962 {963 "Id": "2",964 "Message": "Test Message with one attribute",965 "Subject": "Subject",966 "MessageAttributes": {"attr1": {"DataType": "Number", "StringValue": "19.12"}},967 },968 {969 "Id": "3",970 "Message": "Test Message 
without attribute",971 "Subject": "Subject",972 },973 {974 "Id": "4",975 "Message": "Test Message without subject",976 },977 ],978 )979 assert "Successful" in publish_batch_response980 assert len(publish_batch_response["Successful"]) == 4981 assert "Failed" in publish_batch_response982 for successful_resp in publish_batch_response["Successful"]:983 assert "Id" in successful_resp984 assert "MessageId" in successful_resp985 message_received = set()986 def get_messages():987 response = sqs_client.receive_message(988 QueueUrl=queue_url, MessageAttributeNames=["All"], WaitTimeSeconds=1989 )990 for message in response["Messages"]:991 message_received.add(message["MessageId"])992 assert "Body" in message993 if message["Body"] == "Test Message with two attributes":994 assert len(message["MessageAttributes"]) == 2995 assert message["MessageAttributes"]["attr1"] == {996 "StringValue": "99.12",997 "DataType": "Number",998 }999 assert message["MessageAttributes"]["attr2"] == {1000 "StringValue": "109.12",1001 "DataType": "Number",1002 }1003 elif message["Body"] == "Test Message with one attribute":1004 assert len(message["MessageAttributes"]) == 11005 assert message["MessageAttributes"]["attr1"] == {1006 "StringValue": "19.12",1007 "DataType": "Number",1008 }1009 elif message["Body"] == "Test Message without attribute":1010 assert message.get("MessageAttributes") is None1011 assert len(message_received) == 41012 retry(get_messages, retries=3, sleep=1)1013 @pytest.mark.aws_validated1014 def test_publish_batch_messages_from_fifo_topic_to_fifo_queue(1015 self,1016 sns_client,1017 sns_create_topic,1018 sqs_client,1019 sqs_create_queue,1020 sns_create_sqs_subscription,1021 ):1022 topic_name = f"topic-{short_uid()}.fifo"1023 queue_name = f"queue-{short_uid()}.fifo"1024 topic_arn = sns_create_topic(1025 Name=topic_name,1026 Attributes={1027 "FifoTopic": "true",1028 "ContentBasedDeduplication": "true",1029 },1030 )["TopicArn"]1031 queue_url = sqs_create_queue(1032 
QueueName=queue_name,1033 Attributes={1034 "FifoQueue": "true",1035 "ContentBasedDeduplication": "true",1036 },1037 )1038 subscription = sns_create_sqs_subscription(topic_arn=topic_arn, queue_url=queue_url)1039 subscription_arn = subscription["SubscriptionArn"]1040 sns_client.set_subscription_attributes(1041 SubscriptionArn=subscription_arn,1042 AttributeName="RawMessageDelivery",1043 AttributeValue="true",1044 )1045 message_group_id = "complexMessageGroupId"1046 publish_batch_response = sns_client.publish_batch(1047 TopicArn=topic_arn,1048 PublishBatchRequestEntries=[1049 {1050 "Id": "1",1051 "MessageGroupId": message_group_id,1052 "Message": "Test Message with two attributes",1053 "Subject": "Subject",1054 "MessageAttributes": {1055 "attr1": {"DataType": "Number", "StringValue": "99.12"},1056 "attr2": {"DataType": "Number", "StringValue": "109.12"},1057 },1058 },1059 {1060 "Id": "2",1061 "MessageGroupId": message_group_id,1062 "Message": "Test Message with one attribute",1063 "Subject": "Subject",1064 "MessageAttributes": {"attr1": {"DataType": "Number", "StringValue": "19.12"}},1065 },1066 {1067 "Id": "3",1068 "MessageGroupId": message_group_id,1069 "Message": "Test Message without attribute",1070 "Subject": "Subject",1071 },1072 ],1073 )1074 assert "Successful" in publish_batch_response1075 assert "Failed" in publish_batch_response1076 for successful_resp in publish_batch_response["Successful"]:1077 assert "Id" in successful_resp1078 assert "MessageId" in successful_resp1079 def get_messages(queue_url):1080 response = sqs_client.receive_message(1081 QueueUrl=queue_url,1082 MessageAttributeNames=["All"],1083 AttributeNames=["All"],1084 MaxNumberOfMessages=10,1085 )1086 assert len(response["Messages"]) == 31087 for msg_index, message in enumerate(response["Messages"]):1088 assert "Body" in message1089 assert message["Attributes"]["MessageGroupId"] == message_group_id1090 if message["Body"] == "Test Message with two attributes":1091 assert msg_index == 01092 
assert len(message["MessageAttributes"]) == 21093 assert message["MessageAttributes"]["attr1"] == {1094 "StringValue": "99.12",1095 "DataType": "Number",1096 }1097 assert message["MessageAttributes"]["attr2"] == {1098 "StringValue": "109.12",1099 "DataType": "Number",1100 }1101 elif message["Body"] == "Test Message with one attribute":1102 assert msg_index == 11103 assert len(message["MessageAttributes"]) == 11104 assert message["MessageAttributes"]["attr1"] == {1105 "StringValue": "19.12",1106 "DataType": "Number",1107 }1108 elif message["Body"] == "Test Message without attribute":1109 assert msg_index == 21110 assert message.get("MessageAttributes") is None1111 retry(get_messages, retries=5, sleep=1, queue_url=queue_url)1112 # todo add test for deduplication1113 # https://docs.aws.amazon.com/cli/latest/reference/sns/publish-batch.html1114 # https://docs.aws.amazon.com/sns/latest/dg/fifo-message-dedup.html1115 @pytest.mark.aws_validated1116 def test_publish_batch_exceptions(1117 self,1118 sns_client,1119 sqs_client,1120 sns_create_topic,1121 sqs_create_queue,1122 sns_create_sqs_subscription,1123 ):1124 topic_name = f"topic-{short_uid()}.fifo"1125 queue_name = f"queue-{short_uid()}.fifo"1126 topic_arn = sns_create_topic(Name=topic_name, Attributes={"FifoTopic": "true"})["TopicArn"]1127 queue_url = sqs_create_queue(1128 QueueName=queue_name,1129 Attributes={"FifoQueue": "true"},1130 )1131 subscription = sns_create_sqs_subscription(topic_arn=topic_arn, queue_url=queue_url)1132 subscription_arn = subscription["SubscriptionArn"]1133 sns_client.set_subscription_attributes(1134 SubscriptionArn=subscription_arn,1135 AttributeName="RawMessageDelivery",1136 AttributeValue="true",1137 )1138 with pytest.raises(ClientError) as e:1139 sns_client.publish_batch(1140 TopicArn=topic_arn,1141 PublishBatchRequestEntries=[1142 {1143 "Id": "1",1144 "Message": "Test message without Group ID",1145 }1146 ],1147 )1148 assert e.value.response["Error"]["Code"] == "InvalidParameter"1149 
assert (1150 e.value.response["Error"]["Message"]1151 == "Invalid parameter: The MessageGroupId parameter is required for FIFO topics"1152 )1153 assert e.value.response["ResponseMetadata"]["HTTPStatusCode"] == 4001154 with pytest.raises(ClientError) as e:1155 sns_client.publish_batch(1156 TopicArn=topic_arn,1157 PublishBatchRequestEntries=[1158 {"Id": f"Id_{i}", "Message": "Too many messages"} for i in range(11)1159 ],1160 )1161 assert e.value.response["Error"]["Code"] == "TooManyEntriesInBatchRequest"1162 assert (1163 e.value.response["Error"]["Message"]1164 == "The batch request contains more entries than permissible."1165 )1166 assert e.value.response["ResponseMetadata"]["HTTPStatusCode"] == 4001167 with pytest.raises(ClientError) as e:1168 sns_client.publish_batch(1169 TopicArn=topic_arn,1170 PublishBatchRequestEntries=[1171 {"Id": "1", "Message": "Messages with the same ID"} for i in range(2)1172 ],1173 )1174 assert e.value.response["Error"]["Code"] == "BatchEntryIdsNotDistinct"1175 assert (1176 e.value.response["Error"]["Message"]1177 == "Two or more batch entries in the request have the same Id."1178 )1179 assert e.value.response["ResponseMetadata"]["HTTPStatusCode"] == 4001180 # todo add test and implement behaviour for ContentBasedDeduplication or MessageDeduplicationId1181 def add_xray_header(self, request, **kwargs):1182 request.headers[1183 "X-Amzn-Trace-Id"1184 ] = "Root=1-3152b799-8954dae64eda91bc9a23a7e8;Parent=7fa8c0f79203be72;Sampled=1"1185 def test_publish_sqs_from_sns_with_xray_propagation(1186 self, sns_client, sns_create_topic, sqs_client, sqs_create_queue, sns_subscription1187 ):1188 # TODO: remove or adapt for asf1189 if SQS_BACKEND_IMPL != "elasticmq":1190 pytest.skip("not using elasticmq as SQS backend")1191 sns_client.meta.events.register("before-send.sns.Publish", self.add_xray_header)1192 topic = sns_create_topic()1193 topic_arn = topic["TopicArn"]1194 queue_url = sqs_create_queue()1195 sns_subscription(TopicArn=topic_arn, 
Protocol="sqs", Endpoint=queue_url)1196 sns_client.publish(TargetArn=topic_arn, Message="X-Ray propagation test msg")1197 response = sqs_client.receive_message(1198 QueueUrl=queue_url,1199 AttributeNames=["SentTimestamp", "AWSTraceHeader"],1200 MaxNumberOfMessages=1,1201 MessageAttributeNames=["All"],1202 VisibilityTimeout=2,1203 WaitTimeSeconds=2,1204 )1205 assert len(response["Messages"]) == 11206 message = response["Messages"][0]1207 assert "Attributes" in message1208 assert "AWSTraceHeader" in message["Attributes"]1209 assert (1210 message["Attributes"]["AWSTraceHeader"]1211 == "Root=1-3152b799-8954dae64eda91bc9a23a7e8;Parent=7fa8c0f79203be72;Sampled=1"1212 )1213 @pytest.mark.aws_validated1214 def test_create_topic_after_delete_with_new_tags(self, sns_create_topic, sns_client):1215 topic_name = f"test-{short_uid()}"1216 topic = sns_create_topic(Name=topic_name, Tags=[{"Key": "Name", "Value": "pqr"}])1217 sns_client.delete_topic(TopicArn=topic["TopicArn"])1218 topic1 = sns_create_topic(Name=topic_name, Tags=[{"Key": "Name", "Value": "abc"}])1219 assert topic["TopicArn"] == topic1["TopicArn"]1220 @pytest.mark.aws_validated1221 def test_not_found_error_on_set_subscription_attributes(1222 self, sns_client, sns_create_topic, sqs_create_queue, sqs_queue_arn, sns_subscription1223 ):1224 topic_arn = sns_create_topic()["TopicArn"]1225 queue_url = sqs_create_queue()1226 queue_arn = sqs_queue_arn(queue_url)1227 subscription = sns_subscription(TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_arn)1228 subscription_arn = subscription["SubscriptionArn"]1229 subscription_attributes = sns_client.get_subscription_attributes(1230 SubscriptionArn=subscription_arn1231 )["Attributes"]1232 assert subscription_attributes["SubscriptionArn"] == subscription_arn1233 subscriptions_by_topic = sns_client.list_subscriptions_by_topic(TopicArn=topic_arn)1234 assert len(subscriptions_by_topic["Subscriptions"]) == 11235 sns_client.unsubscribe(SubscriptionArn=subscription_arn)1236 def 
check_subscription_deleted():1237 try:1238 # AWS doesn't give NotFound error on GetSubscriptionAttributes for a while, might be cached1239 sns_client.set_subscription_attributes(1240 SubscriptionArn=subscription_arn,1241 AttributeName="RawMessageDelivery",1242 AttributeValue="true",1243 )1244 raise Exception("Subscription is not deleted")1245 except ClientError as e:1246 assert e.response["Error"]["Code"] == "NotFound"1247 assert e.response["ResponseMetadata"]["HTTPStatusCode"] == 4041248 retry(check_subscription_deleted, retries=10, sleep_before=0.2, sleep=3)1249 subscriptions_by_topic = sns_client.list_subscriptions_by_topic(TopicArn=topic_arn)1250 assert len(subscriptions_by_topic["Subscriptions"]) == 01251 @pytest.mark.aws_validated1252 def test_message_to_fifo_sqs(1253 self,1254 sns_client,1255 sqs_client,1256 sns_create_topic,1257 sqs_create_queue,1258 sns_create_sqs_subscription,1259 ):1260 topic_name = f"topic-{short_uid()}.fifo"1261 queue_name = f"queue-{short_uid()}.fifo"1262 topic_arn = sns_create_topic(1263 Name=topic_name,1264 Attributes={1265 "FifoTopic": "true",1266 "ContentBasedDeduplication": "true", # todo: not enforced yet1267 },1268 )["TopicArn"]1269 queue_url = sqs_create_queue(1270 QueueName=queue_name,1271 Attributes={1272 "FifoQueue": "true",1273 "ContentBasedDeduplication": "true",1274 },1275 )1276 # todo check both ContentBasedDeduplication and MessageDeduplicationId when implemented1277 # https://docs.aws.amazon.com/sns/latest/dg/fifo-message-dedup.html1278 sns_create_sqs_subscription(topic_arn=topic_arn, queue_url=queue_url)1279 message = "Test"1280 sns_client.publish(TopicArn=topic_arn, Message=message, MessageGroupId=short_uid())1281 messages = sqs_client.receive_message(1282 QueueUrl=queue_url, VisibilityTimeout=0, WaitTimeSeconds=101283 )["Messages"]1284 msg_body = messages[0]["Body"]1285 assert json.loads(msg_body)["Message"] == message1286 @pytest.mark.aws_validated1287 def test_validations_for_fifo(1288 self,1289 sns_client,1290 
sqs_client,1291 sns_create_topic,1292 sqs_create_queue,1293 sns_create_sqs_subscription,1294 ):1295 topic_name = f"topic-{short_uid()}"1296 fifo_topic_name = f"topic-{short_uid()}.fifo"1297 fifo_queue_name = f"queue-{short_uid()}.fifo"1298 topic_arn = sns_create_topic(Name=topic_name)["TopicArn"]1299 fifo_topic_arn = sns_create_topic(Name=fifo_topic_name, Attributes={"FifoTopic": "true"})[1300 "TopicArn"1301 ]1302 fifo_queue_url = sqs_create_queue(1303 QueueName=fifo_queue_name, Attributes={"FifoQueue": "true"}1304 )1305 with pytest.raises(ClientError) as e:1306 sns_create_sqs_subscription(topic_arn=topic_arn, queue_url=fifo_queue_url)1307 assert e.match("standard SNS topic")1308 with pytest.raises(ClientError) as e:1309 sns_client.publish(TopicArn=fifo_topic_arn, Message="test")1310 assert e.match("MessageGroupId")1311 with pytest.raises(ClientError) as e:1312 sns_client.publish(TopicArn=fifo_topic_arn, Message="test", MessageGroupId=short_uid())1313 # if ContentBasedDeduplication is not set at the topic level, it needs MessageDeduplicationId for each msg1314 assert e.match("MessageDeduplicationId")1315 assert e.match("ContentBasedDeduplication")1316 with pytest.raises(ClientError) as e:1317 sns_client.publish(1318 TopicArn=topic_arn, Message="test", MessageDeduplicationId=short_uid()1319 )1320 assert e.match("MessageDeduplicationId")1321 with pytest.raises(ClientError) as e:1322 sns_client.publish(TopicArn=topic_arn, Message="test", MessageGroupId=short_uid())1323 assert e.match("MessageGroupId")1324 @pytest.mark.aws_validated1325 def test_empty_sns_message(1326 self,1327 sns_client,1328 sqs_client,1329 sns_create_topic,1330 sqs_create_queue,1331 sns_create_sqs_subscription,1332 ):1333 topic_arn = sns_create_topic()["TopicArn"]1334 queue_url = sqs_create_queue()1335 sns_create_sqs_subscription(topic_arn=topic_arn, queue_url=queue_url)1336 with pytest.raises(ClientError) as e:1337 sns_client.publish(Message="", TopicArn=topic_arn)1338 assert e.match("Empty 
message")1339 assert (1340 sqs_client.get_queue_attributes(1341 QueueUrl=queue_url, AttributeNames=["ApproximateNumberOfMessages"]1342 )["Attributes"]["ApproximateNumberOfMessages"]1343 == "0"1344 )1345 @pytest.mark.parametrize("raw_message_delivery", [True, False])1346 @pytest.mark.aws_validated1347 def test_redrive_policy_sqs_queue_subscription(1348 self,1349 sns_client,1350 sqs_client,1351 sns_create_topic,1352 sqs_create_queue,1353 sqs_queue_arn,1354 sqs_queue_exists,1355 sns_create_sqs_subscription,1356 sns_allow_topic_sqs_queue,1357 raw_message_delivery,1358 snapshot,1359 ):1360 snapshot.add_transformer(snapshot.transform.sqs_api())1361 # Need to skip the MD5OfBody/Signature, because it contains a timestamp1362 snapshot.add_transformer(1363 snapshot.transform.key_value(1364 "Signature",1365 "<signature>",1366 reference_replacement=False,1367 )1368 )1369 snapshot.add_transformer(1370 snapshot.transform.key_value("MD5OfBody", "<md5-hash>", reference_replacement=False)1371 )1372 topic_arn = sns_create_topic()["TopicArn"]1373 queue_url = sqs_create_queue()1374 subscription = sns_create_sqs_subscription(topic_arn=topic_arn, queue_url=queue_url)1375 dlq_url = sqs_create_queue()1376 dlq_arn = sqs_queue_arn(dlq_url)1377 sns_client.set_subscription_attributes(1378 SubscriptionArn=subscription["SubscriptionArn"],1379 AttributeName="RedrivePolicy",1380 AttributeValue=json.dumps({"deadLetterTargetArn": dlq_arn}),1381 )1382 if raw_message_delivery:1383 sns_client.set_subscription_attributes(1384 SubscriptionArn=subscription["SubscriptionArn"],1385 AttributeName="RawMessageDelivery",1386 AttributeValue="true",1387 )1388 sns_allow_topic_sqs_queue(1389 sqs_queue_url=dlq_url,1390 sqs_queue_arn=dlq_arn,1391 sns_topic_arn=topic_arn,1392 )1393 sqs_client.delete_queue(QueueUrl=queue_url)1394 # AWS takes some time to delete the queue, which make the test fails as it delivers the message correctly1395 assert poll_condition(lambda: not sqs_queue_exists(queue_url), timeout=5)1396 
message = "test_dlq_after_sqs_endpoint_deleted"1397 message_attr = {1398 "attr1": {1399 "DataType": "Number",1400 "StringValue": "111",1401 },1402 "attr2": {1403 "DataType": "Binary",1404 "BinaryValue": b"\x02\x03\x04",1405 },1406 }1407 sns_client.publish(TopicArn=topic_arn, Message=message, MessageAttributes=message_attr)1408 response = sqs_client.receive_message(QueueUrl=dlq_url, WaitTimeSeconds=10)1409 assert (1410 len(response["Messages"]) == 11411 ), f"invalid number of messages in DLQ response {response}"1412 if raw_message_delivery:1413 assert response["Messages"][0]["Body"] == message1414 # MessageAttributes are lost with RawDelivery in AWS1415 assert "MessageAttributes" not in response["Messages"][0]1416 snapshot.match("raw_message_delivery", response)1417 else:1418 received_message = json.loads(response["Messages"][0]["Body"])1419 assert received_message["Type"] == "Notification"1420 assert received_message["Message"] == message1421 # Set the decoded JSON Body to be able to skip keys directly1422 response["Messages"][0]["Body"] = received_message1423 snapshot.match("json_encoded_delivery", response)1424 @pytest.mark.aws_validated1425 def test_message_attributes_not_missing(1426 self,1427 sns_client,1428 sqs_client,1429 sns_create_sqs_subscription,1430 sns_create_topic,1431 sqs_create_queue,1432 ):1433 topic_arn = sns_create_topic()["TopicArn"]1434 queue_url = sqs_create_queue()1435 subscription = sns_create_sqs_subscription(topic_arn=topic_arn, queue_url=queue_url)1436 assert subscription["SubscriptionArn"]1437 sns_client.set_subscription_attributes(1438 SubscriptionArn=subscription["SubscriptionArn"],1439 AttributeName="RawMessageDelivery",1440 AttributeValue="true",1441 )1442 attributes = {1443 "an-attribute-key": {"DataType": "String", "StringValue": "an-attribute-value"},1444 "binary-attribute": {"DataType": "Binary", "BinaryValue": b"\x02\x03\x04"},1445 }1446 publish_response = sns_client.publish(1447 TopicArn=topic_arn,1448 Message="text",1449 
MessageAttributes=attributes,1450 )1451 assert publish_response["MessageId"]1452 msg = sqs_client.receive_message(1453 QueueUrl=queue_url,1454 AttributeNames=["All"],1455 MessageAttributeNames=["All"],1456 WaitTimeSeconds=3,1457 )1458 # as SNS piggybacks on SQS MessageAttributes when RawDelivery is true1459 # BinaryValue depends on SQS implementation, and is decoded automatically1460 assert msg["Messages"][0]["MessageAttributes"] == attributes1461 sqs_client.delete_message(1462 QueueUrl=queue_url, ReceiptHandle=msg["Messages"][0]["ReceiptHandle"]1463 )1464 sns_client.set_subscription_attributes(1465 SubscriptionArn=subscription["SubscriptionArn"],1466 AttributeName="RawMessageDelivery",1467 AttributeValue="false",1468 )1469 publish_response = sns_client.publish(1470 TopicArn=topic_arn,1471 Message="text",1472 MessageAttributes=attributes,1473 )1474 assert publish_response["MessageId"]1475 msg = sqs_client.receive_message(1476 QueueUrl=queue_url,1477 AttributeNames=["All"],1478 MessageAttributeNames=["All"],1479 WaitTimeSeconds=3,1480 )1481 assert json.loads(msg["Messages"][0]["Body"])["MessageAttributes"] == {1482 "an-attribute-key": {"Type": "String", "Value": "an-attribute-value"},1483 "binary-attribute": {1484 # binary payload in base64 encoded by AWS, UTF-8 for JSON1485 # https://docs.aws.amazon.com/sns/latest/api/API_MessageAttributeValue.html1486 # need to be decoded manually as it's part of the message Body1487 "Type": "Binary",1488 "Value": b64encode(b"\x02\x03\x04").decode("utf-8"),1489 },1490 }1491 @pytest.mark.only_localstack1492 @pytest.mark.aws_validated1493 @pytest.mark.parametrize("raw_message_delivery", [True, False])1494 def test_subscribe_external_http_endpoint(1495 self,1496 sns_client,1497 sns_create_http_endpoint,1498 raw_message_delivery,1499 ):1500 # Necessitate manual set up to allow external access to endpoint, only in local testing1501 topic_arn, subscription_arn, endpoint_url, server = sns_create_http_endpoint(1502 
raw_message_delivery1503 )1504 assert poll_condition(1505 lambda: len(server.log) >= 1,1506 timeout=5,1507 )1508 sub_request, _ = server.log[0]1509 payload = sub_request.get_json(force=True)1510 assert payload["Type"] == "SubscriptionConfirmation"1511 assert sub_request.headers["x-amz-sns-message-type"] == "SubscriptionConfirmation"1512 assert "Signature" in payload1513 assert "SigningCertURL" in payload1514 token = payload["Token"]1515 subscribe_url = payload["SubscribeURL"]1516 service_url, subscribe_url_path = payload["SubscribeURL"].rsplit("/", maxsplit=1)1517 assert subscribe_url == (1518 f"{service_url}/?Action=ConfirmSubscription" f"&TopicArn={topic_arn}&Token={token}"1519 )1520 confirm_subscribe_request = requests.get(subscribe_url)1521 confirm_subscribe = xmltodict.parse(confirm_subscribe_request.content)1522 assert (1523 confirm_subscribe["ConfirmSubscriptionResponse"]["ConfirmSubscriptionResult"][1524 "SubscriptionArn"1525 ]1526 == subscription_arn1527 )1528 subscription_attributes = sns_client.get_subscription_attributes(1529 SubscriptionArn=subscription_arn1530 )1531 assert subscription_attributes["Attributes"]["PendingConfirmation"] == "false"1532 message = "test_external_http_endpoint"1533 sns_client.publish(TopicArn=topic_arn, Message=message)1534 assert poll_condition(1535 lambda: len(server.log) >= 2,1536 timeout=5,1537 )1538 notification_request, _ = server.log[1]1539 assert notification_request.headers["x-amz-sns-message-type"] == "Notification"1540 expected_unsubscribe_url = (1541 f"{service_url}/?Action=Unsubscribe&SubscriptionArn={subscription_arn}"1542 )1543 if raw_message_delivery:1544 payload = notification_request.data.decode()1545 assert payload == message1546 else:1547 payload = notification_request.get_json(force=True)1548 assert payload["Type"] == "Notification"1549 assert "Signature" in payload1550 assert "SigningCertURL" in payload1551 assert payload["Message"] == message1552 assert payload["UnsubscribeURL"] == 
expected_unsubscribe_url1553 unsub_request = requests.get(expected_unsubscribe_url)1554 unsubscribe_confirmation = xmltodict.parse(unsub_request.content)1555 assert "UnsubscribeResponse" in unsubscribe_confirmation1556 assert poll_condition(1557 lambda: len(server.log) >= 3,1558 timeout=5,1559 )1560 unsub_request, _ = server.log[2]1561 payload = unsub_request.get_json(force=True)1562 assert payload["Type"] == "UnsubscribeConfirmation"1563 assert unsub_request.headers["x-amz-sns-message-type"] == "UnsubscribeConfirmation"1564 assert "Signature" in payload1565 assert "SigningCertURL" in payload1566 token = payload["Token"]1567 assert payload["SubscribeURL"] == (1568 f"{service_url}/?" f"Action=ConfirmSubscription&TopicArn={topic_arn}&Token={token}"1569 )1570 @pytest.mark.only_localstack1571 @pytest.mark.parametrize("raw_message_delivery", [True, False])1572 def test_dlq_external_http_endpoint(1573 self,1574 sns_client,1575 sqs_client,1576 sns_create_topic,1577 sqs_create_queue,1578 sqs_queue_arn,1579 sns_subscription,1580 sns_create_http_endpoint,1581 sns_create_sqs_subscription,1582 sns_allow_topic_sqs_queue,1583 raw_message_delivery,1584 ):1585 # Necessitate manual set up to allow external access to endpoint, only in local testing1586 topic_arn, http_subscription_arn, endpoint_url, server = sns_create_http_endpoint(1587 raw_message_delivery1588 )1589 dlq_url = sqs_create_queue()1590 dlq_arn = sqs_queue_arn(dlq_url)1591 sns_allow_topic_sqs_queue(1592 sqs_queue_url=dlq_url, sqs_queue_arn=dlq_arn, sns_topic_arn=topic_arn1593 )1594 sns_client.set_subscription_attributes(1595 SubscriptionArn=http_subscription_arn,1596 AttributeName="RedrivePolicy",1597 AttributeValue=json.dumps({"deadLetterTargetArn": dlq_arn}),1598 )1599 assert poll_condition(1600 lambda: len(server.log) >= 1,1601 timeout=5,1602 )1603 sub_request, _ = server.log[0]1604 payload = sub_request.get_json(force=True)1605 assert payload["Type"] == "SubscriptionConfirmation"...

Full Screen

Full Screen

fixtures.py

Source:fixtures.py Github

copy

Full Screen

# ... (earlier part of fixtures.py is elided in this excerpt)


# NOTE(review): any decorator on sns_topic sits above the visible excerpt -- confirm.
def sns_topic(sns_client, sns_create_topic) -> "GetTopicAttributesResponseTypeDef":
    """Create a topic with ``sns_create_topic`` and return its topic attributes."""
    created_topic = sns_create_topic()
    return sns_client.get_topic_attributes(TopicArn=created_topic["TopicArn"])


@pytest.fixture
def sns_allow_topic_sqs_queue(sqs_client):
    """Provide a callable that sets an SQS queue policy letting an SNS topic send to it."""

    def _allow_sns_topic(sqs_queue_url, sqs_queue_arn, sns_topic_arn) -> None:
        # Single statement: the SNS service principal may call sqs:SendMessage
        # on this queue, but only when the request originates from the given topic.
        send_message_statement = {
            "Effect": "Allow",
            "Principal": {"Service": "sns.amazonaws.com"},
            "Action": "sqs:SendMessage",
            "Resource": sqs_queue_arn,
            "Condition": {"ArnEquals": {"aws:SourceArn": sns_topic_arn}},
        }
        policy_document = json.dumps({"Statement": [send_message_statement]})
        sqs_client.set_queue_attributes(
            QueueUrl=sqs_queue_url,
            Attributes={"Policy": policy_document},
        )

    return _allow_sns_topic


@pytest.fixture
def sns_create_sqs_subscription(sns_client, sqs_client, sns_allow_topic_sqs_queue):
    """Yield a factory that subscribes an SQS queue to an SNS topic.

    The factory returns the subscription's attributes. Every subscription it
    creates is unsubscribed after the ``yield``; failures during that cleanup
    are logged rather than raised.
    """
    created_subscription_arns = []

    def _factory(topic_arn: str, queue_url: str) -> Dict[str, str]:
        # The subscribe call needs the queue ARN, so resolve it from the URL first.
        queue_attributes = sqs_client.get_queue_attributes(
            QueueUrl=queue_url, AttributeNames=["QueueArn"]
        )
        queue_arn = queue_attributes["Attributes"]["QueueArn"]

        # Connect the SNS topic to the SQS queue.
        subscribe_response = sns_client.subscribe(
            TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_arn
        )
        subscription_arn = subscribe_response["SubscriptionArn"]

        # Allow the topic to write to the queue.
        sns_allow_topic_sqs_queue(
            sqs_queue_url=queue_url,
            sqs_queue_arn=queue_arn,
            sns_topic_arn=topic_arn,
        )

        # Remember the subscription so the teardown below can remove it.
        created_subscription_arns.append(subscription_arn)
        attributes_response = sns_client.get_subscription_attributes(
            SubscriptionArn=subscription_arn
        )
        return attributes_response["Attributes"]

    yield _factory

    # Best-effort teardown: one failed unsubscribe must not block the others.
    for arn in created_subscription_arns:
        try:
            sns_client.unsubscribe(SubscriptionArn=arn)
        except Exception as exc:
            LOG.error("error cleaning up subscription %s: %s", arn, exc)


@pytest.fixture
def sns_create_http_endpoint(sns_client, sns_create_topic, sns_subscription):
    ...  # body is elided in this excerpt

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. From setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios, the LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g., Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful