How to use the sns_create_sqs_subscription method in LocalStack

Best Python code snippet using localstack_python

test_sns.py

Source: test_sns.py (GitHub)

copy

Full Screen

...98 sns_create_sqs_subscription,99 ):100 topic_arn = sns_create_topic()["TopicArn"]101 queue_url = sqs_create_queue()102 sns_create_sqs_subscription(topic_arn=topic_arn, queue_url=queue_url)103 # publish message to SNS, receive it from SQS, assert that messages are equal104 message = 'ö§a1"_!?,. £$-'105 sns_client.publish(TopicArn=topic_arn, Message=message)106 response = sqs_client.receive_message(107 QueueUrl=queue_url, VisibilityTimeout=0, WaitTimeSeconds=4108 )109 msg_received = response["Messages"][0]110 msg_received = json.loads(to_str(msg_received["Body"]))111 msg_received = msg_received["Message"]112 assert message == msg_received113 @pytest.mark.aws_validated114 def test_subscribe_with_invalid_protocol(self, sns_client, sns_create_topic, sns_subscription):115 topic_arn = sns_create_topic()["TopicArn"]116 with pytest.raises(ClientError) as e:117 sns_subscription(118 TopicArn=topic_arn, Protocol="test-protocol", Endpoint="localstack@yopmail.com"119 )120 assert e.value.response["ResponseMetadata"]["HTTPStatusCode"] == 400121 assert e.value.response["Error"]["Code"] == "InvalidParameter"122 @pytest.mark.aws_validated123 def test_attribute_raw_subscribe(124 self,125 sqs_client,126 sns_client,127 sns_create_topic,128 sqs_create_queue,129 sns_create_sqs_subscription,130 ):131 topic_arn = sns_create_topic()["TopicArn"]132 queue_url = sqs_create_queue()133 subscription = sns_create_sqs_subscription(topic_arn=topic_arn, queue_url=queue_url)134 subscription_arn = subscription["SubscriptionArn"]135 sns_client.set_subscription_attributes(136 SubscriptionArn=subscription_arn,137 AttributeName="RawMessageDelivery",138 AttributeValue="true",139 )140 actual_attributes = sns_client.get_subscription_attributes(141 SubscriptionArn=subscription_arn142 )["Attributes"]143 # assert the attributes are well set144 assert actual_attributes["RawMessageDelivery"]145 # publish message to SNS, receive it from SQS, assert that messages are equal and that they are Raw146 message = "This 
is a test message"147 binary_attribute = b"\x02\x03\x04"148 # extending this test case to test support for binary message attribute data149 # https://github.com/localstack/localstack/issues/2432150 sns_client.publish(151 TopicArn=topic_arn,152 Message=message,153 MessageAttributes={"store": {"DataType": "Binary", "BinaryValue": binary_attribute}},154 )155 response = sqs_client.receive_message(156 QueueUrl=queue_url,157 MessageAttributeNames=["All"],158 VisibilityTimeout=0,159 WaitTimeSeconds=4,160 )161 msg_received = response["Messages"][0]162 assert message == msg_received["Body"]163 # MessageAttributes are attached to the message when RawDelivery is true164 assert binary_attribute == msg_received["MessageAttributes"]["store"]["BinaryValue"]165 @pytest.mark.aws_validated166 def test_filter_policy(167 self,168 sns_client,169 sqs_client,170 sqs_create_queue,171 sns_create_topic,172 sns_create_sqs_subscription,173 ):174 topic_arn = sns_create_topic()["TopicArn"]175 queue_url = sqs_create_queue()176 subscription = sns_create_sqs_subscription(topic_arn=topic_arn, queue_url=queue_url)177 subscription_arn = subscription["SubscriptionArn"]178 filter_policy = {"attr1": [{"numeric": [">", 0, "<=", 100]}]}179 sns_client.set_subscription_attributes(180 SubscriptionArn=subscription_arn,181 AttributeName="FilterPolicy",182 AttributeValue=json.dumps(filter_policy),183 )184 # get number of messages185 num_msgs_0 = len(186 sqs_client.receive_message(QueueUrl=queue_url, VisibilityTimeout=0).get("Messages", [])187 )188 # publish message that satisfies the filter policy, assert that message is received189 message = "This is a test message"190 message_attributes = {"attr1": {"DataType": "Number", "StringValue": "99"}}191 sns_client.publish(192 TopicArn=topic_arn,193 Message=message,194 MessageAttributes=message_attributes,195 )196 msgs_1 = sqs_client.receive_message(197 QueueUrl=queue_url, VisibilityTimeout=0, WaitTimeSeconds=4198 )["Messages"]199 num_msgs_1 = len(msgs_1)200 assert 
num_msgs_1 == (num_msgs_0 + 1)201 # publish message that does not satisfy the filter policy, assert that message is not received202 message = "This is another test message"203 sns_client.publish(204 TopicArn=topic_arn,205 Message=message,206 MessageAttributes={"attr1": {"DataType": "Number", "StringValue": "111"}},207 )208 num_msgs_2 = len(209 sqs_client.receive_message(QueueUrl=queue_url, VisibilityTimeout=0, WaitTimeSeconds=4)[210 "Messages"211 ]212 )213 assert num_msgs_2 == num_msgs_1214 @pytest.mark.aws_validated215 def test_exists_filter_policy(216 self,217 sns_client,218 sqs_client,219 sqs_create_queue,220 sns_create_topic,221 sns_create_sqs_subscription,222 ):223 topic_arn = sns_create_topic()["TopicArn"]224 queue_url = sqs_create_queue()225 subscription = sns_create_sqs_subscription(topic_arn=topic_arn, queue_url=queue_url)226 subscription_arn = subscription["SubscriptionArn"]227 filter_policy = {"store": [{"exists": True}]}228 sns_client.set_subscription_attributes(229 SubscriptionArn=subscription_arn,230 AttributeName="FilterPolicy",231 AttributeValue=json.dumps(filter_policy),232 )233 # get number of messages234 num_msgs_0 = len(235 sqs_client.receive_message(QueueUrl=queue_url, VisibilityTimeout=0).get("Messages", [])236 )237 # publish message that satisfies the filter policy, assert that message is received238 message_1 = f"message-{short_uid()}"239 sns_client.publish(240 TopicArn=topic_arn,241 Message=message_1,242 MessageAttributes={243 "store": {"DataType": "Number", "StringValue": "99"},244 "def": {"DataType": "Number", "StringValue": "99"},245 },246 )247 msgs_1 = sqs_client.receive_message(248 QueueUrl=queue_url, VisibilityTimeout=0, WaitTimeSeconds=4249 )["Messages"]250 num_msgs_1 = len(msgs_1)251 assert message_1 == json.loads(msgs_1[0]["Body"])["Message"]252 assert num_msgs_1 == (num_msgs_0 + 1)253 # publish message that does not satisfy the filter policy, assert that message is not received254 message_2 = f"message-{short_uid()}"255 
sns_client.publish(256 TopicArn=topic_arn,257 Message=message_2,258 MessageAttributes={"attr1": {"DataType": "Number", "StringValue": "111"}},259 )260 msgs_2 = sqs_client.receive_message(261 QueueUrl=queue_url, VisibilityTimeout=0, WaitTimeSeconds=4262 )["Messages"]263 num_msgs_2 = len(msgs_2)264 # assert that it's still the same message that #1265 assert json.loads(msgs_2[0]["Body"])["Message"] == message_1266 assert num_msgs_2 == num_msgs_1267 # delete first message268 sqs_client.delete_message(QueueUrl=queue_url, ReceiptHandle=msgs_1[0]["ReceiptHandle"])269 # test with exist operator set to false.270 filter_policy = json.dumps({"store": [{"exists": False}]})271 sns_client.set_subscription_attributes(272 SubscriptionArn=subscription_arn,273 AttributeName="FilterPolicy",274 AttributeValue=filter_policy,275 )276 def get_filter_policy():277 subscription_attrs = sns_client.get_subscription_attributes(278 SubscriptionArn=subscription_arn279 )280 return subscription_attrs["Attributes"]["FilterPolicy"] == filter_policy281 # wait for the new filter policy to be in effect282 poll_condition(lambda: get_filter_policy() == filter_policy, timeout=4)283 # publish message that satisfies the filter policy, assert that message is received284 message_3 = f"message-{short_uid()}"285 sns_client.publish(286 TopicArn=topic_arn,287 Message=message_3,288 MessageAttributes={"def": {"DataType": "Number", "StringValue": "99"}},289 )290 msgs_3 = sqs_client.receive_message(291 QueueUrl=queue_url, VisibilityTimeout=0, WaitTimeSeconds=4292 )["Messages"]293 num_msgs_3 = len(msgs_3)294 # assert that it is not the the same message that #1295 assert json.loads(msgs_3[0]["Body"])["Message"] == message_3296 assert num_msgs_3 == num_msgs_1297 # publish message that does not satisfy the filter policy, assert that message is not received298 message_4 = f"message-{short_uid()}"299 sns_client.publish(300 TopicArn=topic_arn,301 Message=message_4,302 MessageAttributes={303 "store": {"DataType": "Number", 
"StringValue": "99"},304 "def": {"DataType": "Number", "StringValue": "99"},305 },306 )307 msgs_4 = sqs_client.receive_message(308 QueueUrl=queue_url, VisibilityTimeout=0, WaitTimeSeconds=4309 )["Messages"]310 num_msgs_4 = len(msgs_4)311 # assert that it's still the same message that #3312 assert json.loads(msgs_4[0]["Body"])["Message"] == message_3313 assert num_msgs_4 == num_msgs_3314 @pytest.mark.aws_validated315 def test_subscribe_sqs_queue(316 self,317 sns_client,318 sqs_client,319 sqs_create_queue,320 sns_create_topic,321 sns_create_sqs_subscription,322 ):323 # TODO: check with non default external port324 topic_arn = sns_create_topic()["TopicArn"]325 queue_url = sqs_create_queue()326 # create subscription with filter policy327 subscription = sns_create_sqs_subscription(topic_arn=topic_arn, queue_url=queue_url)328 filter_policy = {"attr1": [{"numeric": [">", 0, "<=", 100]}]}329 sns_client.set_subscription_attributes(330 SubscriptionArn=subscription["SubscriptionArn"],331 AttributeName="FilterPolicy",332 AttributeValue=json.dumps(filter_policy),333 )334 # publish message that satisfies the filter policy335 message = "This is a test message"336 sns_client.publish(337 TopicArn=topic_arn,338 Message=message,339 MessageAttributes={"attr1": {"DataType": "Number", "StringValue": "99.12"}},340 )341 # assert that message is received342 response = sqs_client.receive_message(343 QueueUrl=queue_url,344 VisibilityTimeout=0,345 MessageAttributeNames=["All"],346 WaitTimeSeconds=4,347 )348 message = response["Messages"][0]349 message_body = json.loads(message["Body"])350 assert message_body["MessageAttributes"]["attr1"]["Value"] == "99.12"351 @pytest.mark.only_localstack352 def test_subscribe_platform_endpoint(353 self, sns_client, sqs_create_queue, sns_create_topic, sns_subscription354 ):355 sns_backend = SNSBackend.get()356 topic_arn = sns_create_topic()["TopicArn"]357 app_arn = sns_client.create_platform_application(Name="app1", Platform="p1", Attributes={})[358 
"PlatformApplicationArn"359 ]360 platform_arn = sns_client.create_platform_endpoint(361 PlatformApplicationArn=app_arn, Token="token_1"362 )["EndpointArn"]363 # create subscription with filter policy364 filter_policy = {"attr1": [{"numeric": [">", 0, "<=", 100]}]}365 subscription = sns_subscription(366 TopicArn=topic_arn,367 Protocol="application",368 Endpoint=platform_arn,369 Attributes={"FilterPolicy": json.dumps(filter_policy)},370 )371 # publish message that satisfies the filter policy372 message = "This is a test message"373 sns_client.publish(374 TopicArn=topic_arn,375 Message=message,376 MessageAttributes={"attr1": {"DataType": "Number", "StringValue": "99.12"}},377 )378 # assert that message has been received379 def check_message():380 assert len(sns_backend.platform_endpoint_messages[platform_arn]) > 0381 retry(check_message, retries=PUBLICATION_RETRIES, sleep=PUBLICATION_TIMEOUT)382 # clean up383 sns_client.unsubscribe(SubscriptionArn=subscription["SubscriptionArn"])384 sns_client.delete_endpoint(EndpointArn=platform_arn)385 sns_client.delete_platform_application(PlatformApplicationArn=app_arn)386 @pytest.mark.aws_validated387 def test_unknown_topic_publish(self, sns_client, sns_create_topic):388 # create topic to get the basic arn structure389 # otherwise you get InvalidClientTokenId exception because of account id390 topic_arn = sns_create_topic()["TopicArn"]391 # append to get an unknown topic392 fake_arn = f"{topic_arn}-fake"393 message = "This is a test message"394 with pytest.raises(ClientError) as e:395 sns_client.publish(TopicArn=fake_arn, Message=message)396 assert e.value.response["Error"]["Code"] == "NotFound"397 assert e.value.response["Error"]["Message"] == "Topic does not exist"398 assert e.value.response["ResponseMetadata"]["HTTPStatusCode"] == 404399 @pytest.mark.only_localstack400 def test_publish_sms(self, sns_client):401 response = sns_client.publish(PhoneNumber="+33000000000", Message="This is a SMS")402 assert "MessageId" in 
response403 assert response["ResponseMetadata"]["HTTPStatusCode"] == 200404 def test_publish_non_existent_target(self, sns_client):405 with pytest.raises(ClientError) as ex:406 sns_client.publish(407 TargetArn="arn:aws:sns:us-east-1:000000000000:endpoint/APNS/abcdef/0f7d5971-aa8b-4bd5-b585-0826e9f93a66",408 Message="This is a push notification",409 )410 assert ex.value.response["Error"]["Code"] == "InvalidClientTokenId"411 @pytest.mark.aws_validated412 def test_tags(self, sns_client, sns_create_topic, snapshot):413 topic_arn = sns_create_topic()["TopicArn"]414 with pytest.raises(ClientError) as exc:415 sns_client.tag_resource(416 ResourceArn=topic_arn,417 Tags=[418 {"Key": "k1", "Value": "v1"},419 {"Key": "k2", "Value": "v2"},420 {"Key": "k2", "Value": "v2"},421 ],422 )423 snapshot.match("duplicate-key-error", exc.value.response)424 sns_client.tag_resource(425 ResourceArn=topic_arn,426 Tags=[427 {"Key": "k1", "Value": "v1"},428 {"Key": "k2", "Value": "v2"},429 ],430 )431 tags = sns_client.list_tags_for_resource(ResourceArn=topic_arn)432 # could not figure out the logic for tag order in AWS, so resorting to sorting it manually in place433 tags["Tags"].sort(key=itemgetter("Key"))434 snapshot.match("list-created-tags", tags)435 sns_client.untag_resource(ResourceArn=topic_arn, TagKeys=["k1"])436 tags = sns_client.list_tags_for_resource(ResourceArn=topic_arn)437 snapshot.match("list-after-delete-tags", tags)438 # test update tag439 sns_client.tag_resource(ResourceArn=topic_arn, Tags=[{"Key": "k2", "Value": "v2b"}])440 tags = sns_client.list_tags_for_resource(ResourceArn=topic_arn)441 snapshot.match("list-after-update-tags", tags)442 @pytest.mark.only_localstack443 def test_topic_subscription(self, sns_client, sns_create_topic, sns_subscription):444 topic_arn = sns_create_topic()["TopicArn"]445 subscription = sns_subscription(446 TopicArn=topic_arn,447 Protocol="email",448 Endpoint="localstack@yopmail.com",449 )450 sns_backend = SNSBackend.get()451 def 
check_subscription():452 subscription_arn = subscription["SubscriptionArn"]453 subscription_obj = sns_backend.subscription_status[subscription_arn]454 assert subscription_obj["Status"] == "Not Subscribed"455 _token = subscription_obj["Token"]456 sns_client.confirm_subscription(TopicArn=topic_arn, Token=_token)457 assert subscription_obj["Status"] == "Subscribed"458 retry(check_subscription, retries=PUBLICATION_RETRIES, sleep=PUBLICATION_TIMEOUT)459 @pytest.mark.aws_validated460 def test_sqs_topic_subscription_confirmation(461 self, sns_client, sns_create_topic, sqs_create_queue, sns_create_sqs_subscription462 ):463 topic_arn = sns_create_topic()["TopicArn"]464 queue_url = sqs_create_queue()465 subscription_attrs = sns_create_sqs_subscription(topic_arn=topic_arn, queue_url=queue_url)466 def check_subscription():467 nonlocal subscription_attrs468 if not subscription_attrs["PendingConfirmation"] == "false":469 subscription_arn = subscription_attrs["SubscriptionArn"]470 subscription_attrs = sns_client.get_subscription_attributes(471 SubscriptionArn=subscription_arn472 )["Attributes"]473 return subscription_attrs["PendingConfirmation"] == "false"474 # SQS subscriptions are auto confirmed if they are from the user and in the same region475 assert poll_condition(check_subscription, timeout=5)476 @pytest.mark.aws_validated477 def test_sns_topic_as_lambda_dead_letter_queue(478 self,479 sns_client,480 sqs_client,481 lambda_client,482 lambda_su_role,483 create_lambda_function,484 sns_create_topic,485 sqs_create_queue,486 sns_subscription,487 sns_create_sqs_subscription,488 ):489 # create an SNS topic that will be used as a DLQ by the lambda490 dlq_topic_arn = sns_create_topic()["TopicArn"]491 queue_url = sqs_create_queue()492 # sqs_subscription493 sns_create_sqs_subscription(topic_arn=dlq_topic_arn, queue_url=queue_url)494 # create an SNS topic that will be used to invoke the lambda495 lambda_topic_arn = sns_create_topic()["TopicArn"]496 function_name = 
f"{TEST_LAMBDA_FUNCTION_PREFIX}-{short_uid()}"497 lambda_creation_response = create_lambda_function(498 func_name=function_name,499 handler_file=TEST_LAMBDA_PYTHON,500 runtime=LAMBDA_RUNTIME_PYTHON37,501 role=lambda_su_role,502 DeadLetterConfig={"TargetArn": dlq_topic_arn},503 )504 lambda_arn = lambda_creation_response["CreateFunctionResponse"]["FunctionArn"]505 # allow the SNS topic to invoke the lambda506 permission_id = f"test-statement-{short_uid()}"507 lambda_client.add_permission(508 FunctionName=function_name,509 StatementId=permission_id,510 Action="lambda:InvokeFunction",511 Principal="sns.amazonaws.com",512 SourceArn=lambda_topic_arn,513 )514 # subscribe the lambda to the SNS topic: lambda_subscription515 sns_subscription(516 TopicArn=lambda_topic_arn,517 Protocol="lambda",518 Endpoint=lambda_arn,519 )520 payload = {521 lambda_integration.MSG_BODY_RAISE_ERROR_FLAG: 1,522 }523 sns_client.publish(TopicArn=lambda_topic_arn, Message=json.dumps(payload))524 def receive_dlq():525 result = sqs_client.receive_message(526 QueueUrl=queue_url, MessageAttributeNames=["All"], VisibilityTimeout=0527 )528 assert len(result["Messages"]) > 0529 msg_body = json.loads(result["Messages"][0]["Body"])530 msg_attrs = msg_body["MessageAttributes"]531 assert "RequestID" in msg_attrs532 assert "ErrorCode" in msg_attrs533 assert "ErrorMessage" in msg_attrs534 # check that the SQS queue subscribed to the SNS topic used as DLQ received the error from the lambda535 # on AWS, event retries can be quite delayed, so we have to wait up to 6 minutes here536 # reduced retries when using localstack to avoid tests flaking537 retries = 120 if is_aws_cloud() else 3538 retry(receive_dlq, retries=retries, sleep=3)539 @pytest.mark.only_localstack540 def test_redrive_policy_http_subscription(541 self,542 sns_client,543 sns_create_topic,544 sqs_client,545 sqs_create_queue,546 sqs_queue_arn,547 sns_subscription,548 ):549 dlq_name = f"dlq-{short_uid()}"550 dlq_url = 
sqs_create_queue(QueueName=dlq_name)551 dlq_arn = sqs_queue_arn(dlq_url)552 topic_arn = sns_create_topic()["TopicArn"]553 # create HTTP endpoint and connect it to SNS topic554 with HTTPServer() as server:555 server.expect_request("/subscription").respond_with_data(b"", 200)556 http_endpoint = server.url_for("/subscription")557 wait_for_port_open(server.port)558 subscription = sns_subscription(559 TopicArn=topic_arn, Protocol="http", Endpoint=http_endpoint560 )561 sns_client.set_subscription_attributes(562 SubscriptionArn=subscription["SubscriptionArn"],563 AttributeName="RedrivePolicy",564 AttributeValue=json.dumps({"deadLetterTargetArn": dlq_arn}),565 )566 # wait for subscription notification to arrive at http endpoint567 poll_condition(lambda: len(server.log) >= 1, timeout=10)568 request, _ = server.log[0]569 event = request.get_json(force=True)570 assert request.path.endswith("/subscription")571 assert event["Type"] == "SubscriptionConfirmation"572 assert event["TopicArn"] == topic_arn573 wait_for_port_closed(server.port)574 sns_client.publish(575 TopicArn=topic_arn,576 Message=json.dumps({"message": "test_redrive_policy"}),577 )578 response = sqs_client.receive_message(QueueUrl=dlq_url, WaitTimeSeconds=10)579 assert (580 len(response["Messages"]) == 1581 ), f"invalid number of messages in DLQ response {response}"582 message = json.loads(response["Messages"][0]["Body"])583 assert message["Type"] == "Notification"584 assert json.loads(message["Message"])["message"] == "test_redrive_policy"585 @pytest.mark.aws_validated586 def test_redrive_policy_lambda_subscription(587 self,588 sns_client,589 sns_create_topic,590 sqs_create_queue,591 sqs_queue_arn,592 lambda_client,593 create_lambda_function,594 lambda_su_role,595 sqs_client,596 sns_subscription,597 sns_allow_topic_sqs_queue,598 ):599 dlq_url = sqs_create_queue()600 dlq_arn = sqs_queue_arn(dlq_url)601 topic_arn = sns_create_topic()["TopicArn"]602 sns_allow_topic_sqs_queue(603 sqs_queue_url=dlq_url, 
sqs_queue_arn=dlq_arn, sns_topic_arn=topic_arn604 )605 lambda_name = f"test-{short_uid()}"606 lambda_arn = create_lambda_function(607 func_name=lambda_name,608 libs=TEST_LAMBDA_LIBS,609 handler_file=TEST_LAMBDA_PYTHON,610 runtime=LAMBDA_RUNTIME_PYTHON37,611 role=lambda_su_role,612 )["CreateFunctionResponse"]["FunctionArn"]613 subscription = sns_subscription(TopicArn=topic_arn, Protocol="lambda", Endpoint=lambda_arn)614 sns_client.set_subscription_attributes(615 SubscriptionArn=subscription["SubscriptionArn"],616 AttributeName="RedrivePolicy",617 AttributeValue=json.dumps({"deadLetterTargetArn": dlq_arn}),618 )619 lambda_client.delete_function(FunctionName=lambda_name)620 sns_client.publish(621 TopicArn=topic_arn,622 Message=json.dumps({"message": "test_redrive_policy"}),623 )624 response = sqs_client.receive_message(QueueUrl=dlq_url, WaitTimeSeconds=10)625 assert (626 len(response["Messages"]) == 1627 ), f"invalid number of messages in DLQ response {response}"628 message = json.loads(response["Messages"][0]["Body"])629 assert message["Type"] == "Notification"630 assert json.loads(message["Message"])["message"] == "test_redrive_policy"631 @pytest.mark.aws_validated632 def test_publish_with_empty_subject(self, sns_client, sns_create_topic):633 topic_arn = sns_create_topic()["TopicArn"]634 # Publish without subject635 rs = sns_client.publish(TopicArn=topic_arn, Message=json.dumps({"message": "test_publish"}))636 assert rs["ResponseMetadata"]["HTTPStatusCode"] == 200637 with pytest.raises(ClientError) as e:638 sns_client.publish(639 TopicArn=topic_arn,640 Subject="",641 Message=json.dumps({"message": "test_publish"}),642 )643 assert e.value.response["Error"]["Code"] == "InvalidParameter"644 assert e.value.response["Error"]["Message"] == "Invalid parameter: Subject"645 # todo test with snapshot to aws validate it646 def test_create_topic_test_arn(self, sns_create_topic, sns_client):647 topic_name = f"topic-{short_uid()}"648 response = 
sns_create_topic(Name=topic_name)649 topic_arn_params = response["TopicArn"].split(":")650 testutil.response_arn_matches_partition(sns_client, response["TopicArn"])651 assert topic_arn_params[4] == get_aws_account_id()652 assert topic_arn_params[5] == topic_name653 @pytest.mark.aws_validated654 def test_publish_message_by_target_arn(655 self,656 sns_client,657 sqs_client,658 sns_create_topic,659 sqs_create_queue,660 sns_create_sqs_subscription,661 ):662 # using an SQS subscription to test TopicArn/TargetArn as it is easier to check against AWS663 topic_arn = sns_create_topic()["TopicArn"]664 queue_url = sqs_create_queue()665 sns_create_sqs_subscription(topic_arn=topic_arn, queue_url=queue_url)666 sns_client.publish(TopicArn=topic_arn, Message="test-msg-1")667 response = sqs_client.receive_message(668 QueueUrl=queue_url,669 MessageAttributeNames=["All"],670 VisibilityTimeout=0,671 WaitTimeSeconds=4,672 )673 assert len(response["Messages"]) == 1674 message = response["Messages"][0]675 msg_body = json.loads(message["Body"])676 assert msg_body["Message"] == "test-msg-1"677 sqs_client.delete_message(QueueUrl=queue_url, ReceiptHandle=message["ReceiptHandle"])678 # publish with TargetArn instead of TopicArn679 sns_client.publish(TargetArn=topic_arn, Message="test-msg-2")680 response = sqs_client.receive_message(681 QueueUrl=queue_url,682 MessageAttributeNames=["All"],683 VisibilityTimeout=0,684 WaitTimeSeconds=4,685 )686 assert len(response["Messages"]) == 1687 message = response["Messages"][0]688 msg_body = json.loads(message["Body"])689 assert msg_body["Message"] == "test-msg-2"690 @pytest.mark.aws_validated691 def test_publish_message_before_subscribe_topic(692 self,693 sns_client,694 sns_create_topic,695 sqs_client,696 sqs_create_queue,697 sns_subscription,698 sns_create_sqs_subscription,699 ):700 topic_arn = sns_create_topic()["TopicArn"]701 queue_url = sqs_create_queue()702 rs = sns_client.publish(703 TopicArn=topic_arn, Subject="test subject", 
Message="test_message_1"704 )705 assert rs["ResponseMetadata"]["HTTPStatusCode"] == 200706 sns_create_sqs_subscription(topic_arn=topic_arn, queue_url=queue_url)707 message_subject = "sqs subject"708 message_body = "test_message_2"709 rs = sns_client.publish(TopicArn=topic_arn, Subject=message_subject, Message=message_body)710 assert rs["ResponseMetadata"]["HTTPStatusCode"] == 200711 message_id = rs["MessageId"]712 response = sqs_client.receive_message(713 QueueUrl=queue_url, VisibilityTimeout=0, WaitTimeSeconds=5714 )715 # nothing was subscribing to the topic, so the first message is lost716 assert len(response["Messages"]) == 1717 message = json.loads(response["Messages"][0]["Body"])718 assert message["MessageId"] == message_id719 assert message["Subject"] == message_subject720 assert message["Message"] == message_body721 @pytest.mark.aws_validated722 def test_create_duplicate_topic_with_more_tags(self, sns_client, sns_create_topic):723 topic_name = f"test-{short_uid()}"724 sns_create_topic(Name=topic_name)725 with pytest.raises(ClientError) as e:726 sns_client.create_topic(Name=topic_name, Tags=[{"Key": "key1", "Value": "value1"}])727 assert e.value.response["Error"]["Code"] == "InvalidParameter"728 assert (729 e.value.response["Error"]["Message"]730 == "Invalid parameter: Tags Reason: Topic already exists with different tags"731 )732 assert e.value.response["ResponseMetadata"]["HTTPStatusCode"] == 400733 @pytest.mark.aws_validated734 def test_create_duplicate_topic_check_idempotency(self, sns_create_topic):735 topic_name = f"test-{short_uid()}"736 tags = [{"Key": "a", "Value": "1"}, {"Key": "b", "Value": "2"}]737 kwargs = [738 {"Tags": tags}, # to create the same topic again with same tags739 {"Tags": [tags[0]]}, # to create the same topic again with one of the tags from above740 {"Tags": []}, # to create the same topic again with no tags741 ]742 # create topic with two tags743 response = sns_create_topic(Name=topic_name, Tags=tags)744 topic_arn = 
response["TopicArn"]745 for arg in kwargs:746 response = sns_create_topic(Name=topic_name, **arg)747 # assert TopicArn returned by all the above create_topic calls is the same as the original748 assert response["TopicArn"] == topic_arn749 @pytest.mark.only_localstack750 @pytest.mark.skip(751 reason="Idempotency not supported in Moto backend. See bug https://github.com/spulec/moto/issues/2333"752 )753 def test_create_platform_endpoint_check_idempotency(self, sns_client):754 response = sns_client.create_platform_application(755 Name=f"test-{short_uid()}",756 Platform="GCM",757 Attributes={"PlatformCredential": "123"},758 )759 kwargs_list = [760 {"Token": "test1", "CustomUserData": "test-data"},761 {"Token": "test1", "CustomUserData": "test-data"},762 {"Token": "test1"},763 {"Token": "test1"},764 ]765 platform_arn = response["PlatformApplicationArn"]766 responses = []767 for kwargs in kwargs_list:768 responses.append(769 sns_client.create_platform_endpoint(PlatformApplicationArn=platform_arn, **kwargs)770 )771 # Assert endpointarn is returned in every call create platform call772 for i in range(len(responses)):773 assert "EndpointArn" in responses[i]774 endpoint_arn = responses[0]["EndpointArn"]775 # clean up776 sns_client.delete_endpoint(EndpointArn=endpoint_arn)777 sns_client.delete_platform_application(PlatformApplicationArn=platform_arn)778 @pytest.mark.aws_validated779 def test_publish_by_path_parameters(780 self,781 sns_create_topic,782 sns_client,783 sqs_client,784 sqs_create_queue,785 sns_create_sqs_subscription,786 aws_http_client_factory,787 ):788 message = f"test message {short_uid()}"789 topic_arn = sns_create_topic()["TopicArn"]790 queue_url = sqs_create_queue()791 sns_create_sqs_subscription(topic_arn=topic_arn, queue_url=queue_url)792 client = aws_http_client_factory("sns", region="us-east-1")793 if is_aws_cloud():794 endpoint_url = "https://sns.us-east-1.amazonaws.com"795 else:796 endpoint_url = config.get_edge_url()797 response = client.post(798 
endpoint_url,799 params={800 "Action": "Publish",801 "Version": "2010-03-31",802 "TopicArn": topic_arn,803 "Message": message,804 },805 )806 assert response.status_code == 200807 assert b"<PublishResponse" in response.content808 messages = sqs_client.receive_message(809 QueueUrl=queue_url, VisibilityTimeout=0, WaitTimeSeconds=5810 )["Messages"]811 msg_body = json.loads(messages[0]["Body"])812 assert msg_body["TopicArn"] == topic_arn813 assert msg_body["Message"] == message814 @pytest.mark.only_localstack815 def test_multiple_subscriptions_http_endpoint(816 self, sns_client, sns_create_topic, sns_subscription817 ):818 # create a topic819 topic_arn = sns_create_topic()["TopicArn"]820 # build fake http server endpoints821 _requests = queue.Queue()822 # create HTTP endpoint and connect it to SNS topic823 def handler(request):824 _requests.put(request)825 return Response(status=429)826 number_of_endpoints = 4827 servers = []828 for _ in range(number_of_endpoints):829 server = HTTPServer()830 server.start()831 servers.append(server)832 server.expect_request("/").respond_with_handler(handler)833 http_endpoint = server.url_for("/")834 wait_for_port_open(http_endpoint)835 sns_subscription(TopicArn=topic_arn, Protocol="http", Endpoint=http_endpoint)836 # fetch subscription information837 subscription_list = sns_client.list_subscriptions_by_topic(TopicArn=topic_arn)838 assert subscription_list["ResponseMetadata"]["HTTPStatusCode"] == 200839 assert (840 len(subscription_list["Subscriptions"]) == number_of_endpoints841 ), f"unexpected number of subscriptions {subscription_list}"842 for _ in range(number_of_endpoints):843 request = _requests.get(timeout=2)844 assert request.get_json(True)["TopicArn"] == topic_arn845 with pytest.raises(queue.Empty):846 # make sure only four requests are received847 _requests.get(timeout=1)848 for server in servers:849 server.stop()850 @pytest.mark.only_localstack851 def test_publish_sms_endpoint(self, sns_client, sns_create_topic, 
sns_subscription):852 list_of_contacts = [853 f"+{random.randint(100000000, 9999999999)}",854 f"+{random.randint(100000000, 9999999999)}",855 f"+{random.randint(100000000, 9999999999)}",856 ]857 message = "Good news everyone!"858 topic_arn = sns_create_topic()["TopicArn"]859 for number in list_of_contacts:860 sns_subscription(TopicArn=topic_arn, Protocol="sms", Endpoint=number)861 sns_client.publish(Message=message, TopicArn=topic_arn)862 sns_backend = SNSBackend.get()863 def check_messages():864 sms_messages = sns_backend.sms_messages865 for contact in list_of_contacts:866 sms_was_found = False867 for message in sms_messages:868 if message["endpoint"] == contact:869 sms_was_found = True870 break871 assert sms_was_found872 retry(check_messages, sleep=0.5)873 @pytest.mark.aws_validated874 def test_publish_sqs_from_sns(875 self,876 sns_client,877 sqs_client,878 sns_create_topic,879 sqs_create_queue,880 sns_create_sqs_subscription,881 ):882 topic_arn = sns_create_topic()["TopicArn"]883 queue_url = sqs_create_queue()884 subscription = sns_create_sqs_subscription(topic_arn=topic_arn, queue_url=queue_url)885 subscription_arn = subscription["SubscriptionArn"]886 sns_client.set_subscription_attributes(887 SubscriptionArn=subscription_arn,888 AttributeName="RawMessageDelivery",889 AttributeValue="true",890 )891 string_value = "99.12"892 sns_client.publish(893 TopicArn=topic_arn,894 Message="Test msg",895 MessageAttributes={"attr1": {"DataType": "Number", "StringValue": string_value}},896 )897 response = sqs_client.receive_message(898 QueueUrl=queue_url,899 MessageAttributeNames=["All"],900 VisibilityTimeout=0,901 WaitTimeSeconds=4,902 )903 # format is of SQS MessageAttributes when RawDelivery is set to "true"904 assert response["Messages"][0]["MessageAttributes"] == {905 "attr1": {"DataType": "Number", "StringValue": string_value}906 }907 sqs_client.delete_message(908 QueueUrl=queue_url, ReceiptHandle=response["Messages"][0]["ReceiptHandle"]909 )910 
sns_client.set_subscription_attributes(911 SubscriptionArn=subscription_arn,912 AttributeName="RawMessageDelivery",913 AttributeValue="false",914 )915 string_value = "100.12"916 sns_client.publish(917 TargetArn=topic_arn,918 Message="Test msg",919 MessageAttributes={"attr1": {"DataType": "Number", "StringValue": string_value}},920 )921 response = sqs_client.receive_message(922 QueueUrl=queue_url,923 MessageAttributeNames=["All"],924 VisibilityTimeout=0,925 WaitTimeSeconds=4,926 )927 message_body = json.loads(response["Messages"][0]["Body"])928 # format is SNS MessageAttributes when RawDelivery is "false"929 assert message_body["MessageAttributes"] == {930 "attr1": {"Type": "Number", "Value": string_value}931 }932 @pytest.mark.aws_validated933 def test_publish_batch_messages_from_sns_to_sqs(934 self,935 sns_client,936 sqs_client,937 sns_create_topic,938 sqs_create_queue,939 sns_create_sqs_subscription,940 ):941 topic_arn = sns_create_topic()["TopicArn"]942 queue_url = sqs_create_queue()943 subscription = sns_create_sqs_subscription(topic_arn=topic_arn, queue_url=queue_url)944 subscription_arn = subscription["SubscriptionArn"]945 sns_client.set_subscription_attributes(946 SubscriptionArn=subscription_arn,947 AttributeName="RawMessageDelivery",948 AttributeValue="true",949 )950 publish_batch_response = sns_client.publish_batch(951 TopicArn=topic_arn,952 PublishBatchRequestEntries=[953 {954 "Id": "1",955 "Message": "Test Message with two attributes",956 "Subject": "Subject",957 "MessageAttributes": {958 "attr1": {"DataType": "Number", "StringValue": "99.12"},959 "attr2": {"DataType": "Number", "StringValue": "109.12"},960 },961 },962 {963 "Id": "2",964 "Message": "Test Message with one attribute",965 "Subject": "Subject",966 "MessageAttributes": {"attr1": {"DataType": "Number", "StringValue": "19.12"}},967 },968 {969 "Id": "3",970 "Message": "Test Message without attribute",971 "Subject": "Subject",972 },973 {974 "Id": "4",975 "Message": "Test Message without 
subject",976 },977 ],978 )979 assert "Successful" in publish_batch_response980 assert len(publish_batch_response["Successful"]) == 4981 assert "Failed" in publish_batch_response982 for successful_resp in publish_batch_response["Successful"]:983 assert "Id" in successful_resp984 assert "MessageId" in successful_resp985 message_received = set()986 def get_messages():987 response = sqs_client.receive_message(988 QueueUrl=queue_url, MessageAttributeNames=["All"], WaitTimeSeconds=1989 )990 for message in response["Messages"]:991 message_received.add(message["MessageId"])992 assert "Body" in message993 if message["Body"] == "Test Message with two attributes":994 assert len(message["MessageAttributes"]) == 2995 assert message["MessageAttributes"]["attr1"] == {996 "StringValue": "99.12",997 "DataType": "Number",998 }999 assert message["MessageAttributes"]["attr2"] == {1000 "StringValue": "109.12",1001 "DataType": "Number",1002 }1003 elif message["Body"] == "Test Message with one attribute":1004 assert len(message["MessageAttributes"]) == 11005 assert message["MessageAttributes"]["attr1"] == {1006 "StringValue": "19.12",1007 "DataType": "Number",1008 }1009 elif message["Body"] == "Test Message without attribute":1010 assert message.get("MessageAttributes") is None1011 assert len(message_received) == 41012 retry(get_messages, retries=3, sleep=1)1013 @pytest.mark.aws_validated1014 def test_publish_batch_messages_from_fifo_topic_to_fifo_queue(1015 self,1016 sns_client,1017 sns_create_topic,1018 sqs_client,1019 sqs_create_queue,1020 sns_create_sqs_subscription,1021 ):1022 topic_name = f"topic-{short_uid()}.fifo"1023 queue_name = f"queue-{short_uid()}.fifo"1024 topic_arn = sns_create_topic(1025 Name=topic_name,1026 Attributes={1027 "FifoTopic": "true",1028 "ContentBasedDeduplication": "true",1029 },1030 )["TopicArn"]1031 queue_url = sqs_create_queue(1032 QueueName=queue_name,1033 Attributes={1034 "FifoQueue": "true",1035 "ContentBasedDeduplication": "true",1036 },1037 )1038 
subscription = sns_create_sqs_subscription(topic_arn=topic_arn, queue_url=queue_url)1039 subscription_arn = subscription["SubscriptionArn"]1040 sns_client.set_subscription_attributes(1041 SubscriptionArn=subscription_arn,1042 AttributeName="RawMessageDelivery",1043 AttributeValue="true",1044 )1045 message_group_id = "complexMessageGroupId"1046 publish_batch_response = sns_client.publish_batch(1047 TopicArn=topic_arn,1048 PublishBatchRequestEntries=[1049 {1050 "Id": "1",1051 "MessageGroupId": message_group_id,1052 "Message": "Test Message with two attributes",1053 "Subject": "Subject",1054 "MessageAttributes": {1055 "attr1": {"DataType": "Number", "StringValue": "99.12"},1056 "attr2": {"DataType": "Number", "StringValue": "109.12"},1057 },1058 },1059 {1060 "Id": "2",1061 "MessageGroupId": message_group_id,1062 "Message": "Test Message with one attribute",1063 "Subject": "Subject",1064 "MessageAttributes": {"attr1": {"DataType": "Number", "StringValue": "19.12"}},1065 },1066 {1067 "Id": "3",1068 "MessageGroupId": message_group_id,1069 "Message": "Test Message without attribute",1070 "Subject": "Subject",1071 },1072 ],1073 )1074 assert "Successful" in publish_batch_response1075 assert "Failed" in publish_batch_response1076 for successful_resp in publish_batch_response["Successful"]:1077 assert "Id" in successful_resp1078 assert "MessageId" in successful_resp1079 def get_messages(queue_url):1080 response = sqs_client.receive_message(1081 QueueUrl=queue_url,1082 MessageAttributeNames=["All"],1083 AttributeNames=["All"],1084 MaxNumberOfMessages=10,1085 )1086 assert len(response["Messages"]) == 31087 for msg_index, message in enumerate(response["Messages"]):1088 assert "Body" in message1089 assert message["Attributes"]["MessageGroupId"] == message_group_id1090 if message["Body"] == "Test Message with two attributes":1091 assert msg_index == 01092 assert len(message["MessageAttributes"]) == 21093 assert message["MessageAttributes"]["attr1"] == {1094 "StringValue": 
"99.12",1095 "DataType": "Number",1096 }1097 assert message["MessageAttributes"]["attr2"] == {1098 "StringValue": "109.12",1099 "DataType": "Number",1100 }1101 elif message["Body"] == "Test Message with one attribute":1102 assert msg_index == 11103 assert len(message["MessageAttributes"]) == 11104 assert message["MessageAttributes"]["attr1"] == {1105 "StringValue": "19.12",1106 "DataType": "Number",1107 }1108 elif message["Body"] == "Test Message without attribute":1109 assert msg_index == 21110 assert message.get("MessageAttributes") is None1111 retry(get_messages, retries=5, sleep=1, queue_url=queue_url)1112 # todo add test for deduplication1113 # https://docs.aws.amazon.com/cli/latest/reference/sns/publish-batch.html1114 # https://docs.aws.amazon.com/sns/latest/dg/fifo-message-dedup.html1115 @pytest.mark.aws_validated1116 def test_publish_batch_exceptions(1117 self,1118 sns_client,1119 sqs_client,1120 sns_create_topic,1121 sqs_create_queue,1122 sns_create_sqs_subscription,1123 ):1124 topic_name = f"topic-{short_uid()}.fifo"1125 queue_name = f"queue-{short_uid()}.fifo"1126 topic_arn = sns_create_topic(Name=topic_name, Attributes={"FifoTopic": "true"})["TopicArn"]1127 queue_url = sqs_create_queue(1128 QueueName=queue_name,1129 Attributes={"FifoQueue": "true"},1130 )1131 subscription = sns_create_sqs_subscription(topic_arn=topic_arn, queue_url=queue_url)1132 subscription_arn = subscription["SubscriptionArn"]1133 sns_client.set_subscription_attributes(1134 SubscriptionArn=subscription_arn,1135 AttributeName="RawMessageDelivery",1136 AttributeValue="true",1137 )1138 with pytest.raises(ClientError) as e:1139 sns_client.publish_batch(1140 TopicArn=topic_arn,1141 PublishBatchRequestEntries=[1142 {1143 "Id": "1",1144 "Message": "Test message without Group ID",1145 }1146 ],1147 )1148 assert e.value.response["Error"]["Code"] == "InvalidParameter"1149 assert (1150 e.value.response["Error"]["Message"]1151 == "Invalid parameter: The MessageGroupId parameter is required for 
FIFO topics"1152 )1153 assert e.value.response["ResponseMetadata"]["HTTPStatusCode"] == 4001154 with pytest.raises(ClientError) as e:1155 sns_client.publish_batch(1156 TopicArn=topic_arn,1157 PublishBatchRequestEntries=[1158 {"Id": f"Id_{i}", "Message": "Too many messages"} for i in range(11)1159 ],1160 )1161 assert e.value.response["Error"]["Code"] == "TooManyEntriesInBatchRequest"1162 assert (1163 e.value.response["Error"]["Message"]1164 == "The batch request contains more entries than permissible."1165 )1166 assert e.value.response["ResponseMetadata"]["HTTPStatusCode"] == 4001167 with pytest.raises(ClientError) as e:1168 sns_client.publish_batch(1169 TopicArn=topic_arn,1170 PublishBatchRequestEntries=[1171 {"Id": "1", "Message": "Messages with the same ID"} for i in range(2)1172 ],1173 )1174 assert e.value.response["Error"]["Code"] == "BatchEntryIdsNotDistinct"1175 assert (1176 e.value.response["Error"]["Message"]1177 == "Two or more batch entries in the request have the same Id."1178 )1179 assert e.value.response["ResponseMetadata"]["HTTPStatusCode"] == 4001180 # todo add test and implement behaviour for ContentBasedDeduplication or MessageDeduplicationId1181 def add_xray_header(self, request, **kwargs):1182 request.headers[1183 "X-Amzn-Trace-Id"1184 ] = "Root=1-3152b799-8954dae64eda91bc9a23a7e8;Parent=7fa8c0f79203be72;Sampled=1"1185 def test_publish_sqs_from_sns_with_xray_propagation(1186 self, sns_client, sns_create_topic, sqs_client, sqs_create_queue, sns_subscription1187 ):1188 # TODO: remove or adapt for asf1189 if SQS_BACKEND_IMPL != "elasticmq":1190 pytest.skip("not using elasticmq as SQS backend")1191 sns_client.meta.events.register("before-send.sns.Publish", self.add_xray_header)1192 topic = sns_create_topic()1193 topic_arn = topic["TopicArn"]1194 queue_url = sqs_create_queue()1195 sns_subscription(TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_url)1196 sns_client.publish(TargetArn=topic_arn, Message="X-Ray propagation test msg")1197 response = 
sqs_client.receive_message(1198 QueueUrl=queue_url,1199 AttributeNames=["SentTimestamp", "AWSTraceHeader"],1200 MaxNumberOfMessages=1,1201 MessageAttributeNames=["All"],1202 VisibilityTimeout=2,1203 WaitTimeSeconds=2,1204 )1205 assert len(response["Messages"]) == 11206 message = response["Messages"][0]1207 assert "Attributes" in message1208 assert "AWSTraceHeader" in message["Attributes"]1209 assert (1210 message["Attributes"]["AWSTraceHeader"]1211 == "Root=1-3152b799-8954dae64eda91bc9a23a7e8;Parent=7fa8c0f79203be72;Sampled=1"1212 )1213 @pytest.mark.aws_validated1214 def test_create_topic_after_delete_with_new_tags(self, sns_create_topic, sns_client):1215 topic_name = f"test-{short_uid()}"1216 topic = sns_create_topic(Name=topic_name, Tags=[{"Key": "Name", "Value": "pqr"}])1217 sns_client.delete_topic(TopicArn=topic["TopicArn"])1218 topic1 = sns_create_topic(Name=topic_name, Tags=[{"Key": "Name", "Value": "abc"}])1219 assert topic["TopicArn"] == topic1["TopicArn"]1220 @pytest.mark.aws_validated1221 def test_not_found_error_on_set_subscription_attributes(1222 self, sns_client, sns_create_topic, sqs_create_queue, sqs_queue_arn, sns_subscription1223 ):1224 topic_arn = sns_create_topic()["TopicArn"]1225 queue_url = sqs_create_queue()1226 queue_arn = sqs_queue_arn(queue_url)1227 subscription = sns_subscription(TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_arn)1228 subscription_arn = subscription["SubscriptionArn"]1229 subscription_attributes = sns_client.get_subscription_attributes(1230 SubscriptionArn=subscription_arn1231 )["Attributes"]1232 assert subscription_attributes["SubscriptionArn"] == subscription_arn1233 subscriptions_by_topic = sns_client.list_subscriptions_by_topic(TopicArn=topic_arn)1234 assert len(subscriptions_by_topic["Subscriptions"]) == 11235 sns_client.unsubscribe(SubscriptionArn=subscription_arn)1236 def check_subscription_deleted():1237 try:1238 # AWS doesn't give NotFound error on GetSubscriptionAttributes for a while, might be cached1239 
sns_client.set_subscription_attributes(1240 SubscriptionArn=subscription_arn,1241 AttributeName="RawMessageDelivery",1242 AttributeValue="true",1243 )1244 raise Exception("Subscription is not deleted")1245 except ClientError as e:1246 assert e.response["Error"]["Code"] == "NotFound"1247 assert e.response["ResponseMetadata"]["HTTPStatusCode"] == 4041248 retry(check_subscription_deleted, retries=10, sleep_before=0.2, sleep=3)1249 subscriptions_by_topic = sns_client.list_subscriptions_by_topic(TopicArn=topic_arn)1250 assert len(subscriptions_by_topic["Subscriptions"]) == 01251 @pytest.mark.aws_validated1252 def test_message_to_fifo_sqs(1253 self,1254 sns_client,1255 sqs_client,1256 sns_create_topic,1257 sqs_create_queue,1258 sns_create_sqs_subscription,1259 ):1260 topic_name = f"topic-{short_uid()}.fifo"1261 queue_name = f"queue-{short_uid()}.fifo"1262 topic_arn = sns_create_topic(1263 Name=topic_name,1264 Attributes={1265 "FifoTopic": "true",1266 "ContentBasedDeduplication": "true", # todo: not enforced yet1267 },1268 )["TopicArn"]1269 queue_url = sqs_create_queue(1270 QueueName=queue_name,1271 Attributes={1272 "FifoQueue": "true",1273 "ContentBasedDeduplication": "true",1274 },1275 )1276 # todo check both ContentBasedDeduplication and MessageDeduplicationId when implemented1277 # https://docs.aws.amazon.com/sns/latest/dg/fifo-message-dedup.html1278 sns_create_sqs_subscription(topic_arn=topic_arn, queue_url=queue_url)1279 message = "Test"1280 sns_client.publish(TopicArn=topic_arn, Message=message, MessageGroupId=short_uid())1281 messages = sqs_client.receive_message(1282 QueueUrl=queue_url, VisibilityTimeout=0, WaitTimeSeconds=101283 )["Messages"]1284 msg_body = messages[0]["Body"]1285 assert json.loads(msg_body)["Message"] == message1286 @pytest.mark.aws_validated1287 def test_validations_for_fifo(1288 self,1289 sns_client,1290 sqs_client,1291 sns_create_topic,1292 sqs_create_queue,1293 sns_create_sqs_subscription,1294 ):1295 topic_name = f"topic-{short_uid()}"1296 
fifo_topic_name = f"topic-{short_uid()}.fifo"1297 fifo_queue_name = f"queue-{short_uid()}.fifo"1298 topic_arn = sns_create_topic(Name=topic_name)["TopicArn"]1299 fifo_topic_arn = sns_create_topic(Name=fifo_topic_name, Attributes={"FifoTopic": "true"})[1300 "TopicArn"1301 ]1302 fifo_queue_url = sqs_create_queue(1303 QueueName=fifo_queue_name, Attributes={"FifoQueue": "true"}1304 )1305 with pytest.raises(ClientError) as e:1306 sns_create_sqs_subscription(topic_arn=topic_arn, queue_url=fifo_queue_url)1307 assert e.match("standard SNS topic")1308 with pytest.raises(ClientError) as e:1309 sns_client.publish(TopicArn=fifo_topic_arn, Message="test")1310 assert e.match("MessageGroupId")1311 with pytest.raises(ClientError) as e:1312 sns_client.publish(TopicArn=fifo_topic_arn, Message="test", MessageGroupId=short_uid())1313 # if ContentBasedDeduplication is not set at the topic level, it needs MessageDeduplicationId for each msg1314 assert e.match("MessageDeduplicationId")1315 assert e.match("ContentBasedDeduplication")1316 with pytest.raises(ClientError) as e:1317 sns_client.publish(1318 TopicArn=topic_arn, Message="test", MessageDeduplicationId=short_uid()1319 )1320 assert e.match("MessageDeduplicationId")1321 with pytest.raises(ClientError) as e:1322 sns_client.publish(TopicArn=topic_arn, Message="test", MessageGroupId=short_uid())1323 assert e.match("MessageGroupId")1324 @pytest.mark.aws_validated1325 def test_empty_sns_message(1326 self,1327 sns_client,1328 sqs_client,1329 sns_create_topic,1330 sqs_create_queue,1331 sns_create_sqs_subscription,1332 ):1333 topic_arn = sns_create_topic()["TopicArn"]1334 queue_url = sqs_create_queue()1335 sns_create_sqs_subscription(topic_arn=topic_arn, queue_url=queue_url)1336 with pytest.raises(ClientError) as e:1337 sns_client.publish(Message="", TopicArn=topic_arn)1338 assert e.match("Empty message")1339 assert (1340 sqs_client.get_queue_attributes(1341 QueueUrl=queue_url, AttributeNames=["ApproximateNumberOfMessages"]1342 
)["Attributes"]["ApproximateNumberOfMessages"]1343 == "0"1344 )1345 @pytest.mark.parametrize("raw_message_delivery", [True, False])1346 @pytest.mark.aws_validated1347 def test_redrive_policy_sqs_queue_subscription(1348 self,1349 sns_client,1350 sqs_client,1351 sns_create_topic,1352 sqs_create_queue,1353 sqs_queue_arn,1354 sqs_queue_exists,1355 sns_create_sqs_subscription,1356 sns_allow_topic_sqs_queue,1357 raw_message_delivery,1358 snapshot,1359 ):1360 snapshot.add_transformer(snapshot.transform.sqs_api())1361 # Need to skip the MD5OfBody/Signature, because it contains a timestamp1362 snapshot.add_transformer(1363 snapshot.transform.key_value(1364 "Signature",1365 "<signature>",1366 reference_replacement=False,1367 )1368 )1369 snapshot.add_transformer(1370 snapshot.transform.key_value("MD5OfBody", "<md5-hash>", reference_replacement=False)1371 )1372 topic_arn = sns_create_topic()["TopicArn"]1373 queue_url = sqs_create_queue()1374 subscription = sns_create_sqs_subscription(topic_arn=topic_arn, queue_url=queue_url)1375 dlq_url = sqs_create_queue()1376 dlq_arn = sqs_queue_arn(dlq_url)1377 sns_client.set_subscription_attributes(1378 SubscriptionArn=subscription["SubscriptionArn"],1379 AttributeName="RedrivePolicy",1380 AttributeValue=json.dumps({"deadLetterTargetArn": dlq_arn}),1381 )1382 if raw_message_delivery:1383 sns_client.set_subscription_attributes(1384 SubscriptionArn=subscription["SubscriptionArn"],1385 AttributeName="RawMessageDelivery",1386 AttributeValue="true",1387 )1388 sns_allow_topic_sqs_queue(1389 sqs_queue_url=dlq_url,1390 sqs_queue_arn=dlq_arn,1391 sns_topic_arn=topic_arn,1392 )1393 sqs_client.delete_queue(QueueUrl=queue_url)1394 # AWS takes some time to delete the queue, which make the test fails as it delivers the message correctly1395 assert poll_condition(lambda: not sqs_queue_exists(queue_url), timeout=5)1396 message = "test_dlq_after_sqs_endpoint_deleted"1397 message_attr = {1398 "attr1": {1399 "DataType": "Number",1400 "StringValue": 
"111",1401 },1402 "attr2": {1403 "DataType": "Binary",1404 "BinaryValue": b"\x02\x03\x04",1405 },1406 }1407 sns_client.publish(TopicArn=topic_arn, Message=message, MessageAttributes=message_attr)1408 response = sqs_client.receive_message(QueueUrl=dlq_url, WaitTimeSeconds=10)1409 assert (1410 len(response["Messages"]) == 11411 ), f"invalid number of messages in DLQ response {response}"1412 if raw_message_delivery:1413 assert response["Messages"][0]["Body"] == message1414 # MessageAttributes are lost with RawDelivery in AWS1415 assert "MessageAttributes" not in response["Messages"][0]1416 snapshot.match("raw_message_delivery", response)1417 else:1418 received_message = json.loads(response["Messages"][0]["Body"])1419 assert received_message["Type"] == "Notification"1420 assert received_message["Message"] == message1421 # Set the decoded JSON Body to be able to skip keys directly1422 response["Messages"][0]["Body"] = received_message1423 snapshot.match("json_encoded_delivery", response)1424 @pytest.mark.aws_validated1425 def test_message_attributes_not_missing(1426 self,1427 sns_client,1428 sqs_client,1429 sns_create_sqs_subscription,1430 sns_create_topic,1431 sqs_create_queue,1432 ):1433 topic_arn = sns_create_topic()["TopicArn"]1434 queue_url = sqs_create_queue()1435 subscription = sns_create_sqs_subscription(topic_arn=topic_arn, queue_url=queue_url)1436 assert subscription["SubscriptionArn"]1437 sns_client.set_subscription_attributes(1438 SubscriptionArn=subscription["SubscriptionArn"],1439 AttributeName="RawMessageDelivery",1440 AttributeValue="true",1441 )1442 attributes = {1443 "an-attribute-key": {"DataType": "String", "StringValue": "an-attribute-value"},1444 "binary-attribute": {"DataType": "Binary", "BinaryValue": b"\x02\x03\x04"},1445 }1446 publish_response = sns_client.publish(1447 TopicArn=topic_arn,1448 Message="text",1449 MessageAttributes=attributes,...

Full Screen

Full Screen

test_s3_notifications_sns.py

Source:test_s3_notifications_sns.py Github

copy

Full Screen

...91 topic_arn = sns_create_topic()["TopicArn"]92 queue_url = sqs_create_queue()93 key_name = "bucket-key"94 # connect topic to queue95 sns_create_sqs_subscription(topic_arn, queue_url)96 create_sns_bucket_notification(97 s3_client, sns_client, bucket_name, topic_arn, ["s3:ObjectCreated:*"]98 )99 # trigger the events100 s3_client.put_object(Bucket=bucket_name, Key=key_name, Body="first event")101 s3_client.put_object(Bucket=bucket_name, Key=key_name, Body="second event")102 # collect messages103 messages = sqs_collect_sns_messages(sqs_client, queue_url, 2)104 # asserts105 # first event106 message = messages[0]107 assert message["Type"] == "Notification"108 assert message["TopicArn"] == topic_arn109 assert message["Subject"] == "Amazon S3 Notification"...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, such as Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful