How to use sns_client method in localstack

Best Python code snippet using localstack_python

test_sns.py

Source:test_sns.py Github

copy

Full Screen

1# -*- coding: utf-8 -*-2import json3import random4import time5import pytest6import requests7from botocore.exceptions import ClientError8from localstack import config9from localstack.config import external_service_url10from localstack.constants import TEST_AWS_ACCOUNT_ID11from localstack.services.generic_proxy import ProxyListener12from localstack.services.infra import start_proxy13from localstack.services.install import SQS_BACKEND_IMPL14from localstack.services.sns.sns_listener import SNSBackend15from localstack.utils import testutil16from localstack.utils.aws import aws_stack17from localstack.utils.common import (18 get_free_tcp_port,19 get_service_protocol,20 retry,21 short_uid,22 to_str,23 wait_for_port_open,24)25from localstack.utils.testutil import check_expected_lambda_log_events_length26from .awslambda.functions import lambda_integration27from .awslambda.test_lambda import (28 LAMBDA_RUNTIME_PYTHON36,29 TEST_LAMBDA_LIBS,30 TEST_LAMBDA_PYTHON,31 TEST_LAMBDA_PYTHON_ECHO,32)33TEST_TOPIC_NAME = "TestTopic_snsTest"34TEST_QUEUE_NAME = "TestQueue_snsTest"35TEST_QUEUE_DLQ_NAME = "TestQueue_DLQ_snsTest"36TEST_TOPIC_NAME_2 = "topic-test-2"37PUBLICATION_TIMEOUT = 0.50038PUBLICATION_RETRIES = 439@pytest.fixture(scope="class")40def setup(request):41 request.cls.sqs_client = aws_stack.create_external_boto_client("sqs")42 request.cls.sns_client = aws_stack.create_external_boto_client("sns")43 request.cls.topic_arn = request.cls.sns_client.create_topic(Name=TEST_TOPIC_NAME)["TopicArn"]44 request.cls.queue_url = request.cls.sqs_client.create_queue(QueueName=TEST_QUEUE_NAME)[45 "QueueUrl"46 ]47 request.cls.dlq_url = request.cls.sqs_client.create_queue(QueueName=TEST_QUEUE_DLQ_NAME)[48 "QueueUrl"49 ]50 yield51 request.cls.sqs_client.delete_queue(QueueUrl=request.cls.queue_url)52 request.cls.sqs_client.delete_queue(QueueUrl=request.cls.dlq_url)53 request.cls.sns_client.delete_topic(TopicArn=request.cls.topic_arn)54@pytest.mark.usefixtures("setup")55class TestSNS:56 def test_publish_unicode_chars(self):57 # connect an SNS topic to a new SQS queue58 _, queue_arn, queue_url = self._create_queue()59 self.sns_client.subscribe(TopicArn=self.topic_arn, Protocol="sqs", Endpoint=queue_arn)60 # publish message to SNS, receive it from SQS, assert that messages are equal61 message = 'ö§a1"_!?,. £$-'62 self.sns_client.publish(TopicArn=self.topic_arn, Message=message)63 def check_message():64 msgs = self.sqs_client.receive_message(QueueUrl=queue_url)65 msg_received = msgs["Messages"][0]66 msg_received = json.loads(to_str(msg_received["Body"]))67 msg_received = msg_received["Message"]68 assert message == msg_received69 retry(check_message, retries=PUBLICATION_RETRIES, sleep=PUBLICATION_TIMEOUT)70 # clean up71 self.sqs_client.delete_queue(QueueUrl=queue_url)72 def test_subscribe_http_endpoint(self):73 # create HTTP endpoint and connect it to SNS topic74 class MyUpdateListener(ProxyListener):75 def forward_request(self, method, path, data, headers):76 records.append((json.loads(to_str(data)), headers))77 return 20078 records = []79 local_port = get_free_tcp_port()80 proxy = start_proxy(local_port, backend_url=None, update_listener=MyUpdateListener())81 wait_for_port_open(local_port)82 queue_arn = "%s://localhost:%s" % (get_service_protocol(), local_port)83 self.sns_client.subscribe(TopicArn=self.topic_arn, Protocol="http", Endpoint=queue_arn)84 def received():85 assert records[0][0]["Type"] == "SubscriptionConfirmation"86 assert records[0][1]["x-amz-sns-message-type"] == "SubscriptionConfirmation"87 token = records[0][0]["Token"]88 subscribe_url = records[0][0]["SubscribeURL"]89 assert subscribe_url == (90 "%s/?Action=ConfirmSubscription&TopicArn=%s&Token=%s"91 % (external_service_url("sns"), self.topic_arn, token)92 )93 assert "Signature" in records[0][0]94 assert "SigningCertURL" in records[0][0]95 retry(received, retries=5, sleep=1)96 proxy.stop()97 def test_subscribe_with_invalid_protocol(self):98 topic_arn = self.sns_client.create_topic(Name=TEST_TOPIC_NAME_2)["TopicArn"]99 with pytest.raises(ClientError) as e:100 self.sns_client.subscribe(101 TopicArn=topic_arn, Protocol="test-protocol", Endpoint="localstack@yopmail.com"102 )103 assert e.value.response["ResponseMetadata"]["HTTPStatusCode"] == 400104 assert e.value.response["Error"]["Code"] == "InvalidParameter"105 # clean up106 self.sns_client.delete_topic(TopicArn=topic_arn)107 def test_attribute_raw_subscribe(self):108 # create SNS topic and connect it to an SQS queue109 queue_arn = aws_stack.sqs_queue_arn(TEST_QUEUE_NAME)110 self.sns_client.subscribe(111 TopicArn=self.topic_arn,112 Protocol="sqs",113 Endpoint=queue_arn,114 Attributes={"RawMessageDelivery": "true"},115 )116 # fetch subscription information117 subscription_list = self.sns_client.list_subscriptions()118 subscription_arn = ""119 for subscription in subscription_list["Subscriptions"]:120 if subscription["TopicArn"] == self.topic_arn:121 subscription_arn = subscription["SubscriptionArn"]122 actual_attributes = self.sns_client.get_subscription_attributes(123 SubscriptionArn=subscription_arn124 )["Attributes"]125 # assert the attributes are well set126 assert actual_attributes["RawMessageDelivery"]127 # publish message to SNS, receive it from SQS, assert that messages are equal and that they are Raw128 message = "This is a test message"129 binary_attribute = b"\x02\x03\x04"130 # extending this test case to test support for binary message attribute data131 # https://github.com/localstack/localstack/issues/2432132 self.sns_client.publish(133 TopicArn=self.topic_arn,134 Message=message,135 MessageAttributes={"store": {"DataType": "Binary", "BinaryValue": binary_attribute}},136 )137 def check_message():138 msgs = self.sqs_client.receive_message(139 QueueUrl=self.queue_url, MessageAttributeNames=["All"]140 )141 msg_received = msgs["Messages"][0]142 assert message == msg_received["Body"]143 assert binary_attribute == msg_received["MessageAttributes"]["store"]["BinaryValue"]144 retry(check_message, retries=PUBLICATION_RETRIES, sleep=PUBLICATION_TIMEOUT)145 def test_filter_policy(self):146 # connect SNS topic to an SQS queue147 queue_name, queue_arn, queue_url = self._create_queue()148 filter_policy = {"attr1": [{"numeric": [">", 0, "<=", 100]}]}149 self.sns_client.subscribe(150 TopicArn=self.topic_arn,151 Protocol="sqs",152 Endpoint=queue_arn,153 Attributes={"FilterPolicy": json.dumps(filter_policy)},154 )155 # get number of messages156 num_msgs_0 = len(self.sqs_client.receive_message(QueueUrl=queue_url).get("Messages", []))157 # publish message that satisfies the filter policy, assert that message is received158 message = "This is a test message"159 self.sns_client.publish(160 TopicArn=self.topic_arn,161 Message=message,162 MessageAttributes={"attr1": {"DataType": "Number", "StringValue": "99"}},163 )164 def check_message():165 num_msgs_1 = len(166 self.sqs_client.receive_message(QueueUrl=queue_url, VisibilityTimeout=0)["Messages"]167 )168 assert num_msgs_1 == (num_msgs_0 + 1)169 return num_msgs_1170 num_msgs_1 = retry(check_message, retries=PUBLICATION_RETRIES, sleep=PUBLICATION_TIMEOUT)171 # publish message that does not satisfy the filter policy, assert that message is not received172 message = "This is a test message"173 self.sns_client.publish(174 TopicArn=self.topic_arn,175 Message=message,176 MessageAttributes={"attr1": {"DataType": "Number", "StringValue": "111"}},177 )178 def check_message2():179 num_msgs_2 = len(180 self.sqs_client.receive_message(QueueUrl=queue_url, VisibilityTimeout=0)["Messages"]181 )182 assert num_msgs_2 == num_msgs_1183 return num_msgs_2184 retry(check_message2, retries=PUBLICATION_RETRIES, sleep=PUBLICATION_TIMEOUT)185 # clean up186 self.sqs_client.delete_queue(QueueUrl=queue_url)187 def test_exists_filter_policy(self):188 # connect SNS topic to an SQS queue189 queue_name, queue_arn, queue_url = self._create_queue()190 filter_policy = {"store": [{"exists": True}]}191 def do_subscribe(self, filter_policy, queue_arn):192 self.sns_client.subscribe(193 TopicArn=self.topic_arn,194 Protocol="sqs",195 Endpoint=queue_arn,196 Attributes={"FilterPolicy": json.dumps(filter_policy)},197 )198 do_subscribe(self, filter_policy, queue_arn)199 # get number of messages200 num_msgs_0 = len(self.sqs_client.receive_message(QueueUrl=queue_url).get("Messages", []))201 # publish message that satisfies the filter policy, assert that message is received202 message = "This is a test message"203 self.sns_client.publish(204 TopicArn=self.topic_arn,205 Message=message,206 MessageAttributes={207 "store": {"DataType": "Number", "StringValue": "99"},208 "def": {"DataType": "Number", "StringValue": "99"},209 },210 )211 def check_message1():212 num_msgs_1 = len(213 self.sqs_client.receive_message(QueueUrl=queue_url, VisibilityTimeout=0)["Messages"]214 )215 assert num_msgs_1 == (num_msgs_0 + 1)216 return num_msgs_1217 num_msgs_1 = retry(check_message1, retries=PUBLICATION_RETRIES, sleep=PUBLICATION_TIMEOUT)218 # publish message that does not satisfy the filter policy, assert that message is not received219 message = "This is a test message"220 self.sns_client.publish(221 TopicArn=self.topic_arn,222 Message=message,223 MessageAttributes={"attr1": {"DataType": "Number", "StringValue": "111"}},224 )225 def check_message2():226 num_msgs_2 = len(227 self.sqs_client.receive_message(QueueUrl=queue_url, VisibilityTimeout=0)["Messages"]228 )229 assert num_msgs_2 == num_msgs_1230 return num_msgs_2231 retry(check_message2, retries=PUBLICATION_RETRIES, sleep=PUBLICATION_TIMEOUT)232 # test with exist operator set to false.233 queue_arn = aws_stack.sqs_queue_arn(TEST_QUEUE_NAME)234 filter_policy = {"store": [{"exists": False}]}235 do_subscribe(self, filter_policy, queue_arn)236 # get number of messages237 num_msgs_0 = len(238 self.sqs_client.receive_message(QueueUrl=self.queue_url).get("Messages", [])239 )240 # publish message with the attribute and see if its getting filtered.241 message = "This is a test message"242 self.sns_client.publish(243 TopicArn=self.topic_arn,244 Message=message,245 MessageAttributes={246 "store": {"DataType": "Number", "StringValue": "99"},247 "def": {"DataType": "Number", "StringValue": "99"},248 },249 )250 def check_message():251 num_msgs_1 = len(252 self.sqs_client.receive_message(QueueUrl=self.queue_url, VisibilityTimeout=0).get(253 "Messages", []254 )255 )256 assert num_msgs_1 == num_msgs_0257 return num_msgs_1258 num_msgs_1 = retry(check_message, retries=PUBLICATION_RETRIES, sleep=PUBLICATION_TIMEOUT)259 # publish message that without the attribute and see if its getting filtered.260 message = "This is a test message"261 self.sns_client.publish(262 TopicArn=self.topic_arn,263 Message=message,264 MessageAttributes={"attr1": {"DataType": "Number", "StringValue": "111"}},265 )266 def check_message3():267 num_msgs_2 = len(268 self.sqs_client.receive_message(QueueUrl=self.queue_url, VisibilityTimeout=0).get(269 "Messages", []270 )271 )272 assert num_msgs_2 == num_msgs_1273 return num_msgs_2274 retry(check_message3, retries=PUBLICATION_RETRIES, sleep=PUBLICATION_TIMEOUT)275 # clean up276 self.sqs_client.delete_queue(QueueUrl=queue_url)277 @pytest.mark.parametrize("external_sqs_port", [None, 12345])278 def test_subscribe_sqs_queue(self, monkeypatch, external_sqs_port):279 _, queue_arn, queue_url = self._create_queue()280 if external_sqs_port:281 monkeypatch.setattr(config, "SQS_PORT_EXTERNAL", external_sqs_port)282 # publish message283 subscription = self._publish_sns_message_with_attrs(queue_arn, "sqs")284 # assert that message is received285 def check_message():286 messages = self.sqs_client.receive_message(QueueUrl=queue_url, VisibilityTimeout=0)[287 "Messages"288 ]289 assert json.loads(messages[0]["Body"])["MessageAttributes"]["attr1"]["Value"] == "99.12"290 retry(check_message, retries=PUBLICATION_RETRIES, sleep=PUBLICATION_TIMEOUT)291 # clean up292 self.sqs_client.delete_queue(QueueUrl=queue_url)293 self.sns_client.unsubscribe(SubscriptionArn=subscription["SubscriptionArn"])294 def test_subscribe_platform_endpoint(self):295 sns = self.sns_client296 sns_backend = SNSBackend.get()297 app_arn = sns.create_platform_application(Name="app1", Platform="p1", Attributes={})[298 "PlatformApplicationArn"299 ]300 platform_arn = sns.create_platform_endpoint(301 PlatformApplicationArn=app_arn, Token="token_1"302 )["EndpointArn"]303 subscription = self._publish_sns_message_with_attrs(platform_arn, "application")304 # assert that message has been received305 def check_message():306 assert len(sns_backend.platform_endpoint_messages[platform_arn]) > 0307 retry(check_message, retries=PUBLICATION_RETRIES, sleep=PUBLICATION_TIMEOUT)308 # clean up309 sns.unsubscribe(SubscriptionArn=subscription["SubscriptionArn"])310 sns.delete_endpoint(EndpointArn=platform_arn)311 sns.delete_platform_application(PlatformApplicationArn=app_arn)312 def _publish_sns_message_with_attrs(self, endpoint_arn, protocol):313 # create subscription with filter policy314 filter_policy = {"attr1": [{"numeric": [">", 0, "<=", 100]}]}315 subscription = self.sns_client.subscribe(316 TopicArn=self.topic_arn,317 Protocol=protocol,318 Endpoint=endpoint_arn,319 Attributes={"FilterPolicy": json.dumps(filter_policy)},320 )321 # publish message that satisfies the filter policy322 message = "This is a test message"323 self.sns_client.publish(324 TopicArn=self.topic_arn,325 Message=message,326 MessageAttributes={"attr1": {"DataType": "Number", "StringValue": "99.12"}},327 )328 time.sleep(PUBLICATION_TIMEOUT)329 return subscription330 def test_unknown_topic_publish(self):331 fake_arn = "arn:aws:sns:us-east-1:123456789012:i_dont_exist"332 message = "This is a test message"333 with pytest.raises(ClientError) as e:334 self.sns_client.publish(TopicArn=fake_arn, Message=message)335 assert e.value.response["Error"]["Code"] == "NotFound"336 assert e.value.response["Error"]["Message"] == "Topic does not exist"337 assert e.value.response["ResponseMetadata"]["HTTPStatusCode"] == 404338 def test_publish_sms(self):339 response = self.sns_client.publish(PhoneNumber="+33000000000", Message="This is a SMS")340 assert "MessageId" in response341 assert response["ResponseMetadata"]["HTTPStatusCode"] == 200342 def test_publish_target(self):343 response = self.sns_client.publish(344 TargetArn="arn:aws:sns:us-east-1:000000000000:endpoint/APNS/abcdef/0f7d5971-aa8b-4bd5-b585-0826e9f93a66",345 Message="This is a push notification",346 )347 assert "MessageId" in response348 assert response["ResponseMetadata"]["HTTPStatusCode"] == 200349 def test_tags(self):350 self.sns_client.tag_resource(351 ResourceArn=self.topic_arn,352 Tags=[353 {"Key": "123", "Value": "abc"},354 {"Key": "456", "Value": "def"},355 {"Key": "456", "Value": "def"},356 ],357 )358 tags = self.sns_client.list_tags_for_resource(ResourceArn=self.topic_arn)359 distinct_tags = [360 tag for idx, tag in enumerate(tags["Tags"]) if tag not in tags["Tags"][:idx]361 ]362 # test for duplicate tags363 assert len(tags["Tags"]) == len(distinct_tags)364 assert len(tags["Tags"]) == 2365 assert tags["Tags"][0]["Key"] == "123"366 assert tags["Tags"][0]["Value"] == "abc"367 assert tags["Tags"][1]["Key"] == "456"368 assert tags["Tags"][1]["Value"] == "def"369 self.sns_client.untag_resource(ResourceArn=self.topic_arn, TagKeys=["123"])370 tags = self.sns_client.list_tags_for_resource(ResourceArn=self.topic_arn)371 assert len(tags["Tags"]) == 1372 assert tags["Tags"][0]["Key"] == "456"373 assert tags["Tags"][0]["Value"] == "def"374 self.sns_client.tag_resource(375 ResourceArn=self.topic_arn, Tags=[{"Key": "456", "Value": "pqr"}]376 )377 tags = self.sns_client.list_tags_for_resource(ResourceArn=self.topic_arn)378 assert len(tags["Tags"]) == 1379 assert tags["Tags"][0]["Key"] == "456"380 assert tags["Tags"][0]["Value"] == "pqr"381 def test_topic_subscription(self):382 subscription = self.sns_client.subscribe(383 TopicArn=self.topic_arn, Protocol="email", Endpoint="localstack@yopmail.com"384 )385 sns_backend = SNSBackend.get()386 def check_subscription():387 subscription_arn = subscription["SubscriptionArn"]388 subscription_obj = sns_backend.subscription_status[subscription_arn]389 assert subscription_obj["Status"] == "Not Subscribed"390 _token = subscription_obj["Token"]391 self.sns_client.confirm_subscription(TopicArn=self.topic_arn, Token=_token)392 assert subscription_obj["Status"] == "Subscribed"393 retry(check_subscription, retries=PUBLICATION_RETRIES, sleep=PUBLICATION_TIMEOUT)394 def test_dead_letter_queue(self):395 lambda_name = "test-%s" % short_uid()396 lambda_arn = aws_stack.lambda_function_arn(lambda_name)397 topic_name = "test-%s" % short_uid()398 topic_arn = self.sns_client.create_topic(Name=topic_name)["TopicArn"]399 queue_name = "test-%s" % short_uid()400 queue_url = self.sqs_client.create_queue(QueueName=queue_name)["QueueUrl"]401 queue_arn = aws_stack.sqs_queue_arn(queue_name)402 testutil.create_lambda_function(403 func_name=lambda_name,404 handler_file=TEST_LAMBDA_PYTHON,405 libs=TEST_LAMBDA_LIBS,406 runtime=LAMBDA_RUNTIME_PYTHON36,407 DeadLetterConfig={"TargetArn": queue_arn},408 )409 self.sns_client.subscribe(TopicArn=topic_arn, Protocol="lambda", Endpoint=lambda_arn)410 payload = {411 lambda_integration.MSG_BODY_RAISE_ERROR_FLAG: 1,412 }413 self.sns_client.publish(TopicArn=topic_arn, Message=json.dumps(payload))414 def receive_dlq():415 result = self.sqs_client.receive_message(416 QueueUrl=queue_url, MessageAttributeNames=["All"]417 )418 msg_attrs = result["Messages"][0]["MessageAttributes"]419 assert len(result["Messages"]) > 0420 assert "RequestID" in msg_attrs421 assert "ErrorCode" in msg_attrs422 assert "ErrorMessage" in msg_attrs423 retry(receive_dlq, retries=8, sleep=2)424 def unsubscribe_all_from_sns(self):425 for subscription_arn in self.sns_client.list_subscriptions()["Subscriptions"]:426 self.sns_client.unsubscribe(SubscriptionArn=subscription_arn["SubscriptionArn"])427 def test_redrive_policy_http_subscription(self):428 self.unsubscribe_all_from_sns()429 # create HTTP endpoint and connect it to SNS topic430 class MyUpdateListener(ProxyListener):431 def forward_request(self, method, path, data, headers):432 records.append((json.loads(to_str(data)), headers))433 return 200434 records = []435 local_port = get_free_tcp_port()436 proxy = start_proxy(local_port, backend_url=None, update_listener=MyUpdateListener())437 wait_for_port_open(local_port)438 http_endpoint = "%s://localhost:%s" % (get_service_protocol(), local_port)439 subscription = self.sns_client.subscribe(440 TopicArn=self.topic_arn, Protocol="http", Endpoint=http_endpoint441 )442 self.sns_client.set_subscription_attributes(443 SubscriptionArn=subscription["SubscriptionArn"],444 AttributeName="RedrivePolicy",445 AttributeValue=json.dumps(446 {"deadLetterTargetArn": aws_stack.sqs_queue_arn(TEST_QUEUE_DLQ_NAME)}447 ),448 )449 proxy.stop()450 # for some reason, it takes a long time to stop the proxy thread -> TODO investigate451 time.sleep(5)452 self.sns_client.publish(453 TopicArn=self.topic_arn,454 Message=json.dumps({"message": "test_redrive_policy"}),455 )456 def receive_dlq():457 result = self.sqs_client.receive_message(458 QueueUrl=self.dlq_url, MessageAttributeNames=["All"]459 )460 assert len(result["Messages"]) > 0461 assert (462 json.loads(json.loads(result["Messages"][0]["Body"])["Message"][0])["message"]463 == "test_redrive_policy"464 )465 retry(receive_dlq, retries=7, sleep=2.5)466 def test_redrive_policy_lambda_subscription(self):467 self.unsubscribe_all_from_sns()468 lambda_name = "test-%s" % short_uid()469 lambda_arn = aws_stack.lambda_function_arn(lambda_name)470 testutil.create_lambda_function(471 func_name=lambda_name,472 libs=TEST_LAMBDA_LIBS,473 handler_file=TEST_LAMBDA_PYTHON,474 runtime=LAMBDA_RUNTIME_PYTHON36,475 )476 subscription = self.sns_client.subscribe(477 TopicArn=self.topic_arn, Protocol="lambda", Endpoint=lambda_arn478 )479 self.sns_client.set_subscription_attributes(480 SubscriptionArn=subscription["SubscriptionArn"],481 AttributeName="RedrivePolicy",482 AttributeValue=json.dumps(483 {"deadLetterTargetArn": aws_stack.sqs_queue_arn(TEST_QUEUE_DLQ_NAME)}484 ),485 )486 testutil.delete_lambda_function(lambda_name)487 self.sns_client.publish(488 TopicArn=self.topic_arn,489 Message=json.dumps({"message": "test_redrive_policy"}),490 )491 def receive_dlq():492 result = self.sqs_client.receive_message(493 QueueUrl=self.dlq_url, MessageAttributeNames=["All"]494 )495 assert len(result["Messages"]) > 0496 assert (497 json.loads(json.loads(result["Messages"][0]["Body"])["Message"][0])["message"]498 == "test_redrive_policy"499 )500 retry(receive_dlq, retries=10, sleep=2)501 def test_redrive_policy_queue_subscription(self):502 self.unsubscribe_all_from_sns()503 topic_arn = self.sns_client.create_topic(Name="topic-%s" % short_uid())["TopicArn"]504 invalid_queue_arn = aws_stack.sqs_queue_arn("invalid_queue")505 # subscribe with an invalid queue ARN, to trigger event on DLQ below506 subscription = self.sns_client.subscribe(507 TopicArn=topic_arn, Protocol="sqs", Endpoint=invalid_queue_arn508 )509 self.sns_client.set_subscription_attributes(510 SubscriptionArn=subscription["SubscriptionArn"],511 AttributeName="RedrivePolicy",512 AttributeValue=json.dumps(513 {"deadLetterTargetArn": aws_stack.sqs_queue_arn(TEST_QUEUE_DLQ_NAME)}514 ),515 )516 self.sns_client.publish(517 TopicArn=topic_arn, Message=json.dumps({"message": "test_redrive_policy"})518 )519 def receive_dlq():520 result = self.sqs_client.receive_message(521 QueueUrl=self.dlq_url, MessageAttributeNames=["All"]522 )523 assert len(result["Messages"]) > 0524 assert (525 json.loads(json.loads(result["Messages"][0]["Body"])["Message"][0])["message"]526 == "test_redrive_policy"527 )528 retry(receive_dlq, retries=10, sleep=2)529 def test_publish_with_empty_subject(self):530 topic_arn = self.sns_client.create_topic(Name=TEST_TOPIC_NAME_2)["TopicArn"]531 # Publish without subject532 rs = self.sns_client.publish(533 TopicArn=topic_arn, Message=json.dumps({"message": "test_publish"})534 )535 assert rs["ResponseMetadata"]["HTTPStatusCode"] == 200536 with pytest.raises(ClientError) as e:537 self.sns_client.publish(538 TopicArn=topic_arn,539 Subject="",540 Message=json.dumps({"message": "test_publish"}),541 )542 assert e.value.response["Error"]["Code"] == "InvalidParameter"543 # clean up544 self.sns_client.delete_topic(TopicArn=topic_arn)545 def test_create_topic_test_arn(self):546 response = self.sns_client.create_topic(Name=TEST_TOPIC_NAME)547 topic_arn_params = response["TopicArn"].split(":")548 testutil.response_arn_matches_partition(self.sns_client, response["TopicArn"])549 assert topic_arn_params[4] == TEST_AWS_ACCOUNT_ID550 assert topic_arn_params[5] == TEST_TOPIC_NAME551 def test_publish_message_by_target_arn(self):552 self.unsubscribe_all_from_sns()553 topic_name = "queue-{}".format(short_uid())554 func_name = "lambda-%s" % short_uid()555 topic_arn = self.sns_client.create_topic(Name=topic_name)["TopicArn"]556 testutil.create_lambda_function(557 handler_file=TEST_LAMBDA_PYTHON_ECHO,558 func_name=func_name,559 runtime=LAMBDA_RUNTIME_PYTHON36,560 )561 lambda_arn = aws_stack.lambda_function_arn(func_name)562 subscription_arn = self.sns_client.subscribe(563 TopicArn=topic_arn, Protocol="lambda", Endpoint=lambda_arn564 )["SubscriptionArn"]565 self.sns_client.publish(566 TopicArn=topic_arn, Message="test_message_1", Subject="test subject"567 )568 # Lambda invoked 1 time569 events = retry(570 check_expected_lambda_log_events_length,571 retries=3,572 sleep=1,573 function_name=func_name,574 expected_length=1,575 )576 message = events[0]["Records"][0]577 assert message["EventSubscriptionArn"] == subscription_arn578 self.sns_client.publish(579 TargetArn=topic_arn, Message="test_message_2", Subject="test subject"580 )581 events = retry(582 check_expected_lambda_log_events_length,583 retries=3,584 sleep=1,585 function_name=func_name,586 expected_length=2,587 )588 # Lambda invoked 1 more time589 assert len(events) == 2590 for event in events:591 message = event["Records"][0]592 assert message["EventSubscriptionArn"] == subscription_arn593 # clean up594 self.sns_client.delete_topic(TopicArn=topic_arn)595 lambda_client = aws_stack.create_external_boto_client("lambda")596 lambda_client.delete_function(FunctionName=func_name)597 def test_publish_message_after_subscribe_topic(self):598 self.unsubscribe_all_from_sns()599 topic_name = "queue-{}".format(short_uid())600 queue_name = "test-%s" % short_uid()601 topic_arn = self.sns_client.create_topic(Name=topic_name)["TopicArn"]602 queue_url = self.sqs_client.create_queue(QueueName=queue_name)["QueueUrl"]603 queue_arn = aws_stack.sqs_queue_arn(queue_name)604 rs = self.sns_client.publish(605 TopicArn=topic_arn, Subject="test subject", Message="test_message_1"606 )607 assert rs["ResponseMetadata"]["HTTPStatusCode"] == 200608 self.sns_client.subscribe(TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_arn)609 message_subject = "sqs subject"610 message_body = "test_message_2"611 rs = self.sns_client.publish(612 TopicArn=topic_arn, Subject=message_subject, Message=message_body613 )614 assert rs["ResponseMetadata"]["HTTPStatusCode"] == 200615 message_id = rs["MessageId"]616 def get_message(q_url):617 resp = self.sqs_client.receive_message(QueueUrl=q_url)618 return json.loads(resp["Messages"][0]["Body"])619 message = retry(get_message, retries=3, sleep=2, q_url=queue_url)620 assert message["MessageId"] == message_id621 assert message["Subject"] == message_subject622 assert message["Message"] == message_body623 # clean up624 self.sns_client.delete_topic(TopicArn=topic_arn)625 self.sqs_client.delete_queue(QueueUrl=queue_url)626 def test_create_duplicate_topic_with_different_tags(self):627 topic_name = "test-%s" % short_uid()628 topic_arn = self.sns_client.create_topic(Name=topic_name)["TopicArn"]629 with pytest.raises(ClientError) as e:630 self.sns_client.create_topic(Name=topic_name, Tags=[{"Key": "456", "Value": "pqr"}])631 assert e.value.response["Error"]["Code"] == "InvalidParameter"632 assert e.value.response["Error"]["Message"] == "Topic already exists with different tags"633 assert e.value.response["ResponseMetadata"]["HTTPStatusCode"] == 400634 # clean up635 self.sns_client.delete_topic(TopicArn=topic_arn)636 def test_create_duplicate_topic_check_idempotentness(self):637 topic_name = "test-%s" % short_uid()638 tags = [{"Key": "a", "Value": "1"}, {"Key": "b", "Value": "2"}]639 kwargs = [640 {"Tags": tags}, # to create topic with two tags641 {"Tags": tags}, # to create the same topic again with same tags642 {"Tags": [tags[0]]}, # to create the same topic again with one of the tags from above643 {"Tags": []}, # to create the same topic again with no tags644 ]645 responses = []646 for arg in kwargs:647 responses.append(self.sns_client.create_topic(Name=topic_name, **arg))648 # assert TopicArn is returned by all the above create_topic calls649 for i in range(len(responses)):650 assert "TopicArn" in responses[i]651 # clean up652 self.sns_client.delete_topic(TopicArn=responses[0]["TopicArn"])653 def test_create_platform_endpoint_check_idempotentness(self):654 response = self.sns_client.create_platform_application(655 Name="test-%s" % short_uid(),656 Platform="GCM",657 Attributes={"PlatformCredential": "123"},658 )659 kwargs_list = [660 {"Token": "test1", "CustomUserData": "test-data"},661 {"Token": "test1", "CustomUserData": "test-data"},662 {"Token": "test1"},663 {"Token": "test1"},664 ]665 platform_arn = response["PlatformApplicationArn"]666 responses = []667 for kwargs in kwargs_list:668 responses.append(669 self.sns_client.create_platform_endpoint(670 PlatformApplicationArn=platform_arn, **kwargs671 )672 )673 # Assert endpointarn is returned in every call create platform call674 for i in range(len(responses)):675 assert "EndpointArn" in responses[i]676 endpoint_arn = responses[0]["EndpointArn"]677 # clean up678 self.sns_client.delete_endpoint(EndpointArn=endpoint_arn)679 self.sns_client.delete_platform_application(PlatformApplicationArn=platform_arn)680 def test_publish_by_path_parameters(self):681 topic_name = "topic-{}".format(short_uid())682 queue_name = "queue-{}".format(short_uid())683 message = "test message {}".format(short_uid())684 topic_arn = self.sns_client.create_topic(Name=topic_name)["TopicArn"]685 base_url = (686 f"{get_service_protocol()}://{config.LOCALSTACK_HOSTNAME}:{config.service_port('sns')}"687 )688 path = "Action=Publish&Version=2010-03-31&TopicArn={}&Message={}".format(topic_arn, message)689 queue_url = self.sqs_client.create_queue(QueueName=queue_name)["QueueUrl"]690 queue_arn = aws_stack.sqs_queue_arn(queue_name)691 self.sns_client.subscribe(TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_arn)692 r = requests.post(693 url="{}/?{}".format(base_url, path),694 headers=aws_stack.mock_aws_request_headers("sns"),695 )696 assert r.status_code == 200697 def get_notification(q_url):698 resp = self.sqs_client.receive_message(QueueUrl=q_url)699 return json.loads(resp["Messages"][0]["Body"])700 notification = retry(get_notification, retries=3, sleep=2, q_url=queue_url)701 assert notification["TopicArn"] == topic_arn702 assert notification["Message"] == message703 # clean up704 self.sns_client.delete_topic(TopicArn=topic_arn)705 self.sqs_client.delete_queue(QueueUrl=queue_url)706 def test_multiple_subscriptions_http_endpoint(self):707 self.unsubscribe_all_from_sns()708 # create HTTP endpoint and connect it to SNS topic709 class MyUpdateListener(ProxyListener):710 def forward_request(self, method, path, data, headers):711 records.append((json.loads(to_str(data)), headers))712 return 429713 number_of_subscriptions = 4714 records = []715 proxies = []716 for _ in range(number_of_subscriptions):717 local_port = get_free_tcp_port()718 proxies.append(719 start_proxy(local_port, backend_url=None, update_listener=MyUpdateListener())720 )721 wait_for_port_open(local_port)722 http_endpoint = "%s://localhost:%s" % (get_service_protocol(), local_port)723 self.sns_client.subscribe(724 TopicArn=self.topic_arn, Protocol="http", Endpoint=http_endpoint725 )726 # fetch subscription information727 subscription_list = self.sns_client.list_subscriptions()728 assert subscription_list["ResponseMetadata"]["HTTPStatusCode"] == 200729 assert len(subscription_list["Subscriptions"]) == number_of_subscriptions730 assert number_of_subscriptions == len(records)731 for proxy in proxies:732 proxy.stop()733 def _create_queue(self):734 queue_name = "queue-%s" % short_uid()735 queue_arn = aws_stack.sqs_queue_arn(queue_name)736 queue_url = self.sqs_client.create_queue(QueueName=queue_name)["QueueUrl"]737 return queue_name, queue_arn, queue_url738 def test_publish_sms_endpoint(self):739 list_of_contacts = [740 "+%d" % random.randint(100000000, 9999999999),741 "+%d" % random.randint(100000000, 9999999999),742 "+%d" % random.randint(100000000, 9999999999),743 ]744 message = "Good news everyone!"745 for number in list_of_contacts:746 self.sns_client.subscribe(TopicArn=self.topic_arn, Protocol="sms", Endpoint=number)747 self.sns_client.publish(Message=message, TopicArn=self.topic_arn)748 sns_backend = SNSBackend.get()749 def check_messages():750 sms_messages = sns_backend.sms_messages751 for contact in list_of_contacts:752 sms_was_found = False753 for message in sms_messages:754 if message["endpoint"] == contact:755 sms_was_found = True756 break757 assert sms_was_found758 retry(check_messages, sleep=0.5)759 def test_publish_sqs_from_sns(self):760 topic = self.sns_client.create_topic(Name="test_topic3")761 topic_arn = topic["TopicArn"]762 test_queue = self.sqs_client.create_queue(QueueName="test_queue3")763 queue_url = test_queue["QueueUrl"]764 subscription_arn = self.sns_client.subscribe(765 TopicArn=topic_arn,766 Protocol="sqs",767 Endpoint=queue_url,768 Attributes={"RawMessageDelivery": "true"},769 )["SubscriptionArn"]770 self.sns_client.publish(771 TargetArn=topic_arn,772 Message="Test msg",773 MessageAttributes={"attr1": {"DataType": "Number", "StringValue": "99.12"}},774 )775 def get_message_with_attributes(queue_url):776 response = self.sqs_client.receive_message(777 QueueUrl=queue_url, MessageAttributeNames=["All"]778 )779 assert response["Messages"][0]["MessageAttributes"] == {780 "attr1": {"DataType": "Number", "StringValue": "99.12"}781 }782 self.sqs_client.delete_message(783 QueueUrl=queue_url, ReceiptHandle=response["Messages"][0]["ReceiptHandle"]784 )785 retry(get_message_with_attributes, retries=3, sleep=10, queue_url=queue_url)786 self.sns_client.set_subscription_attributes(787 SubscriptionArn=subscription_arn,788 AttributeName="RawMessageDelivery",789 AttributeValue="false",790 )791 self.sns_client.publish(792 TargetArn=topic_arn,793 Message="Test msg",794 MessageAttributes={"attr1": {"DataType": "Number", "StringValue": "100.12"}},795 )796 def get_message_without_attributes(queue_url):797 response = self.sqs_client.receive_message(798 QueueUrl=queue_url, MessageAttributeNames=["All"]799 )800 assert response["Messages"][0].get("MessageAttributes") is None801 assert "100.12" in response["Messages"][0]["Body"]802 self.sqs_client.delete_message(803 QueueUrl=queue_url, ReceiptHandle=response["Messages"][0]["ReceiptHandle"]804 )805 retry(get_message_without_attributes, retries=3, sleep=10, queue_url=queue_url)806 def test_publish_batch_messages_from_sns_to_sqs(self):807 topic = self.sns_client.create_topic(Name="test_topic3")808 topic_arn = topic["TopicArn"]809 test_queue = self.sqs_client.create_queue(QueueName="test_queue3")810 queue_url = test_queue["QueueUrl"]811 self.sns_client.subscribe(812 TopicArn=topic_arn,813 Protocol="sqs",814 Endpoint=queue_url,815 Attributes={"RawMessageDelivery": "true"},816 )817 publish_batch_response = self.sns_client.publish_batch(818 TopicArn=topic_arn,819 PublishBatchRequestEntries=[820 {821 "Id": "1",822 "Message": "Test Message with two attributes",823 "Subject": "Subject",824 "MessageAttributes": {825 "attr1": {"DataType": "Number", "StringValue": "99.12"},826 "attr2": {"DataType": "Number", "StringValue": "109.12"},827 },828 },829 {830 "Id": "2",831 "Message": "Test Message with one attribute",832 "Subject": "Subject",833 "MessageAttributes": {"attr1": {"DataType": "Number", "StringValue": "19.12"}},834 },835 {836 "Id": "3",837 "Message": "Test Message without attribute",838 "Subject": "Subject",839 },840 {841 "Id": "4",842 "Message": "Test Message without subject",843 },844 ],845 )846 assert "Successful" in publish_batch_response847 assert "Failed" in publish_batch_response848 for successful_resp in publish_batch_response["Successful"]:849 assert "Id" in successful_resp850 assert "MessageId" in successful_resp851 def get_messages(queue_url):852 response = self.sqs_client.receive_message(853 QueueUrl=queue_url, MessageAttributeNames=["All"], MaxNumberOfMessages=10854 )855 assert len(response["Messages"]) == 4856 for message in response["Messages"]:857 assert "Body" in message858 if message["Body"] == "Test Message with two attributes":859 assert len(message["MessageAttributes"]) == 2860 assert message["MessageAttributes"]["attr1"] == {861 "StringValue": "99.12",862 "DataType": "Number",863 }864 assert message["MessageAttributes"]["attr2"] == {865 "StringValue": "109.12",866 "DataType": "Number",867 }868 elif message["Body"] == "Test Message with one attribute":869 assert len(message["MessageAttributes"]) == 1870 assert message["MessageAttributes"]["attr1"] == {871 "StringValue": "19.12",872 "DataType": "Number",873 }874 elif message["Body"] == "Test Message without attribute":875 assert message.get("MessageAttributes") is None876 retry(get_messages, retries=5, sleep=1, queue_url=queue_url)877 def test_publish_batch_messages_from_fifo_topic_to_fifo_queue(self):878 topic_name = f"topic-{short_uid()}.fifo"879 queue_name = f"queue-{short_uid()}.fifo"880 topic_arn = self.sns_client.create_topic(Name=topic_name, Attributes={"FifoTopic": "true"})[881 "TopicArn"882 ]883 queue = self.sqs_client.create_queue(884 QueueName=queue_name,885 Attributes={"FifoQueue": "true"},886 )887 queue_url = queue["QueueUrl"]888 self.sns_client.subscribe(889 TopicArn=topic_arn,890 Protocol="sqs",891 Endpoint=queue_url,892 Attributes={"RawMessageDelivery": "true"},893 )894 message_group_id = "complexMessageGroupId"895 publish_batch_response = self.sns_client.publish_batch(896 TopicArn=topic_arn,897 PublishBatchRequestEntries=[898 {899 "Id": "1",900 "MessageGroupId": message_group_id,901 "Message": "Test Message with two attributes",902 "Subject": "Subject",903 "MessageAttributes": {904 "attr1": {"DataType": "Number", "StringValue": "99.12"},905 "attr2": {"DataType": "Number", "StringValue": "109.12"},906 },907 },908 {909 "Id": "2",910 "MessageGroupId": message_group_id,911 "Message": "Test Message with one attribute",912 "Subject": "Subject",913 "MessageAttributes": {"attr1": {"DataType": "Number", "StringValue": "19.12"}},914 },915 {916 "Id": "3",917 "MessageGroupId": message_group_id,918 "Message": "Test Message without attribute",919 "Subject": "Subject",920 },921 ],922 )923 assert "Successful" in publish_batch_response924 assert "Failed" in publish_batch_response925 for successful_resp in publish_batch_response["Successful"]:926 assert "Id" in successful_resp927 assert "MessageId" in successful_resp928 def get_messages(queue_url):929 response = self.sqs_client.receive_message(930 QueueUrl=queue_url,931 MessageAttributeNames=["All"],932 AttributeNames=["All"],933 MaxNumberOfMessages=10,934 )935 assert len(response["Messages"]) == 3936 for message in response["Messages"]:937 assert "Body" in message938 assert message["Attributes"]["MessageGroupId"] == message_group_id939 if message["Body"] == "Test Message with two attributes":940 assert len(message["MessageAttributes"]) == 2941 assert message["MessageAttributes"]["attr1"] == {942 "StringValue": "99.12",943 "DataType": "Number",944 }945 assert message["MessageAttributes"]["attr2"] == {946 "StringValue": "109.12",947 "DataType": "Number",948 }949 elif message["Body"] == "Test Message with one attribute":950 assert len(message["MessageAttributes"]) == 1951 assert message["MessageAttributes"]["attr1"] == {952 "StringValue": "19.12",953 "DataType": "Number",954 }955 elif message["Body"] == "Test Message without attribute":956 assert message.get("MessageAttributes") is None957 retry(get_messages, retries=5, sleep=1, queue_url=queue_url)958 def test_publish_batch_exceptions(self):959 topic_name = f"topic-{short_uid()}.fifo"960 queue_name = f"queue-{short_uid()}.fifo"961 topic_arn = self.sns_client.create_topic(Name=topic_name, Attributes={"FifoTopic": "true"})[962 "TopicArn"963 ]964 queue = self.sqs_client.create_queue(965 QueueName=queue_name,966 Attributes={"FifoQueue": "true"},967 )968 queue_url = queue["QueueUrl"]969 queue_arn = aws_stack.sqs_queue_arn(queue_name)970 self.sns_client.subscribe(971 TopicArn=topic_arn,972 Protocol="sqs",973 Endpoint=queue_arn,974 Attributes={"RawMessageDelivery": "true"},975 )976 with pytest.raises(ClientError) as e:977 self.sns_client.publish_batch(978 TopicArn=topic_arn,979 PublishBatchRequestEntries=[980 {981 "Id": "1",982 "Message": "Test Message with two attributes",983 }984 ],985 )986 assert e.value.response["Error"]["Code"] == "InvalidParameter"987 assert e.value.response["ResponseMetadata"]["HTTPStatusCode"] == 400988 with pytest.raises(ClientError) as e:989 self.sns_client.publish_batch(990 TopicArn=topic_arn,991 PublishBatchRequestEntries=[992 {"Id": f"Id_{i}", "Message": f"message_{i}"} for i in range(11)993 ],994 )995 assert e.value.response["Error"]["Code"] == "TooManyEntriesInBatchRequest"996 assert e.value.response["ResponseMetadata"]["HTTPStatusCode"] == 400997 with pytest.raises(ClientError) as e:998 self.sns_client.publish_batch(999 TopicArn=topic_arn,1000 PublishBatchRequestEntries=[1001 {"Id": "1", "Message": f"message_{i}"} for i in range(2)1002 ],1003 )1004 assert e.value.response["Error"]["Code"] == "BatchEntryIdsNotDistinct"1005 assert e.value.response["ResponseMetadata"]["HTTPStatusCode"] == 4001006 # cleanup1007 self.sns_client.delete_topic(TopicArn=topic_arn)1008 self.sqs_client.delete_queue(QueueUrl=queue_url)1009 def add_xray_header(self, request, **kwargs):1010 request.headers[1011 "X-Amzn-Trace-Id"1012 ] = "Root=1-3152b799-8954dae64eda91bc9a23a7e8;Parent=7fa8c0f79203be72;Sampled=1"1013 def test_publish_sqs_from_sns_with_xray_propagation(self):1014 if SQS_BACKEND_IMPL != "elasticmq":1015 pytest.skip("not using elasticmq as SQS backend")1016 self.sns_client.meta.events.register("before-send.sns.Publish", self.add_xray_header)1017 topic = self.sns_client.create_topic(Name="test_topic4")1018 topic_arn = topic["TopicArn"]1019 test_queue = self.sqs_client.create_queue(QueueName="test_queue4")1020 queue_url = test_queue["QueueUrl"]1021 self.sns_client.subscribe(TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_url)1022 self.sns_client.publish(TargetArn=topic_arn, Message="X-Ray propagation test msg")1023 response = self.sqs_client.receive_message(1024 QueueUrl=queue_url,1025 AttributeNames=["SentTimestamp", "AWSTraceHeader"],1026 MaxNumberOfMessages=1,1027 MessageAttributeNames=["All"],1028 VisibilityTimeout=2,1029 WaitTimeSeconds=2,1030 )1031 assert len(response["Messages"]) == 11032 message = response["Messages"][0]1033 assert "Attributes" in message1034 assert "AWSTraceHeader" in message["Attributes"]1035 assert (1036 message["Attributes"]["AWSTraceHeader"]1037 == "Root=1-3152b799-8954dae64eda91bc9a23a7e8;Parent=7fa8c0f79203be72;Sampled=1"1038 )1039 def test_create_topic_after_delete_with_new_tags(self):1040 topic_name = "test-%s" % short_uid()1041 topic = self.sns_client.create_topic(1042 Name=topic_name, Tags=[{"Key": "Name", "Value": "pqr"}]1043 )1044 self.sns_client.delete_topic(TopicArn=topic["TopicArn"])1045 topic1 = self.sns_client.create_topic(1046 Name=topic_name, Tags=[{"Key": "Name", "Value": "abc"}]1047 )1048 assert topic["TopicArn"] == topic1["TopicArn"]1049 # cleanup1050 self.sns_client.delete_topic(TopicArn=topic1["TopicArn"])1051 def test_not_found_error_on_get_subscription_attributes(self):1052 topic_name = "queue-{}".format(short_uid())1053 queue_name = "test-%s" % short_uid()1054 topic_arn = self.sns_client.create_topic(Name=topic_name)["TopicArn"]1055 queue = self.sqs_client.create_queue(QueueName=queue_name)1056 queue_url = queue["QueueUrl"]1057 queue_arn = aws_stack.sqs_queue_arn(queue_name)1058 subscription = self.sns_client.subscribe(1059 TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_arn1060 )1061 subscription_attributes = self.sns_client.get_subscription_attributes(1062 SubscriptionArn=subscription["SubscriptionArn"]1063 )1064 assert (1065 subscription_attributes.get("Attributes").get("SubscriptionArn")1066 == subscription["SubscriptionArn"]1067 )1068 self.sns_client.unsubscribe(SubscriptionArn=subscription["SubscriptionArn"])1069 with pytest.raises(ClientError) as e:1070 self.sns_client.get_subscription_attributes(1071 SubscriptionArn=subscription["SubscriptionArn"]1072 )1073 assert e.value.response["Error"]["Code"] == "NotFound"1074 assert e.value.response["ResponseMetadata"]["HTTPStatusCode"] == 4041075 # cleanup1076 self.sns_client.delete_topic(TopicArn=topic_arn)1077 self.sqs_client.delete_queue(QueueUrl=queue_url)1078 def test_message_to_fifo_sqs(self):1079 topic_name = "topic-{}.fifo".format(short_uid())1080 queue_name = "queue-%s.fifo" % short_uid()1081 topic_arn = self.sns_client.create_topic(Name=topic_name, Attributes={"FifoTopic": "true"})[1082 "TopicArn"1083 ]1084 queue = self.sqs_client.create_queue(1085 QueueName=queue_name,1086 Attributes={"FifoQueue": "true"},1087 )1088 queue_url = queue["QueueUrl"]1089 queue_arn = aws_stack.sqs_queue_arn(queue_name)1090 self.sns_client.subscribe(TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_arn)1091 message = "Test"1092 self.sns_client.publish(TopicArn=topic_arn, Message=message, MessageGroupId=short_uid())1093 def get_message():1094 received = self.sqs_client.receive_message(QueueUrl=queue_url)["Messages"][0]["Body"]1095 assert json.loads(received)["Message"] == message1096 retry(get_message, retries=5, sleep=2)1097 # cleanup1098 self.sns_client.delete_topic(TopicArn=topic_arn)1099 self.sqs_client.delete_queue(QueueUrl=queue_url)1100 def test_validations_for_fifo(self):1101 topic_name = "topic-{}".format(short_uid())1102 fifo_topic_name = "topic-{}.fifo".format(short_uid())1103 fifo_queue_name = "queue-%s.fifo" % short_uid()1104 topic_arn = self.sns_client.create_topic(Name=topic_name)["TopicArn"]1105 fifo_topic_arn = self.sns_client.create_topic(1106 Name=fifo_topic_name, Attributes={"FifoTopic": "true"}1107 )["TopicArn"]1108 fifo_queue_url = self.sqs_client.create_queue(1109 QueueName=fifo_queue_name, Attributes={"FifoQueue": "true"}1110 )["QueueUrl"]1111 fifo_queue_arn = aws_stack.sqs_queue_arn(fifo_queue_name)1112 with pytest.raises(ClientError) as e:1113 self.sns_client.subscribe(TopicArn=topic_arn, Protocol="sqs", Endpoint=fifo_queue_arn)1114 assert e.match("standard SNS topic")1115 with pytest.raises(ClientError) as e:1116 self.sns_client.publish(TopicArn=fifo_topic_arn, Message="test")1117 assert e.match("MessageGroupId")1118 self.sns_client.delete_topic(TopicArn=topic_arn)1119 self.sns_client.delete_topic(TopicArn=fifo_topic_arn)1120 self.sqs_client.delete_queue(QueueUrl=fifo_queue_url)1121def test_empty_sns_message(sns_client, sqs_client, sns_topic, sqs_queue):1122 topic_arn = sns_topic["Attributes"]["TopicArn"]1123 queue_arn = sqs_client.get_queue_attributes(QueueUrl=sqs_queue, AttributeNames=["QueueArn"])[1124 "Attributes"1125 ]["QueueArn"]1126 sns_client.subscribe(TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_arn)1127 with pytest.raises(ClientError) as e:1128 sns_client.publish(Message="", TopicArn=topic_arn)1129 assert e.match("Empty message")1130 assert (1131 sqs_client.get_queue_attributes(1132 QueueUrl=sqs_queue, AttributeNames=["ApproximateNumberOfMessages"]1133 )["Attributes"]["ApproximateNumberOfMessages"]1134 == "0"...

Full Screen

Full Screen

aws_sns.py

Source:aws_sns.py Github

copy

Full Screen

1import boto32TOPIC_NAME = 'MySubscriptionTopic'3TOPIC_ARN = 'TOPIC_ARN'4QUEUE_ARN = 'SQS_QUEUE_ARN'5def sns_client():6 sns = boto3.client('sns', region_name='eu-west-1')7 """ :type : pyboto3.sns """8 return sns9def create_topic():10 sns_client().create_topic(11 Name=TOPIC_NAME12 )13def get_topics():14 return sns_client().list_topics()15def get_topic_attributes(topic_arn):16 return sns_client().get_topic_attributes(17 TopicArn=topic_arn18 )19def update_topic_attributes(topic_arn):20 return sns_client().set_topic_attributes(21 TopicArn=topic_arn,22 AttributeName='DisplayName',23 AttributeValue=TOPIC_NAME + '-UPDATED'24 )25def delete_topic(topic_arn):26 return sns_client().delete_topic(27 TopicArn=topic_arn28 )29def create_email_subscription(topic_arn, email_address):30 return sns_client().subscribe(31 TopicArn=topic_arn,32 Protocol='email',33 Endpoint=email_address34 )35def create_sms_subscription(topic_arn, phone_number):36 return sns_client().subscribe(37 TopicArn=topic_arn,38 Protocol='sms',39 Endpoint=phone_number40 )41def create_sqs_queue_subscription(topic_arn, queue_arn):42 return sns_client().subscribe(43 TopicArn=topic_arn,44 Protocol='sqs',45 Endpoint=queue_arn46 )47def get_topic_subscriptions(topic_arn):48 return sns_client().list_subscriptions_by_topic(49 TopicArn=topic_arn50 )51def list_all_subscriptions():52 return sns_client()53 .list_subscriptions()54def check_if_phone_number_opted_out(phone_number):55 return sns_client().check_if_phone_number_is_opted_out(56 phoneNumber=phone_number57 )58def list_opted_out_phone_numbers():59 return sns_client().list_phone_numbers_opted_out()60def opt_out_of_email_subscription(email_address):61 subscriptions = get_topic_subscriptions(TOPIC_ARN)62 for subscription in subscriptions['Subscriptions']:63 if subscription['Protocol'] == 'email' and subscription['endpoint'] == email_address:64 print("Unsubscribing " + subscription['Endpoint'])65 subscription_arn = subscription['SubscriptionArn']66 sns_client().unsubscribe(67 SubscriptionArn=subscription_arn68 )69 print("Unsubscribed " + subscription['Endpoint'])70def opt_out_of_sms_subscription(phone_number):71 subscriptions = get_topic_subscriptions(TOPIC_ARN)72 for subscription in subscriptions['Subscriptions']:73 if subscription['Protocol'] == 'sms' and subscription['Endpoint'] == phone_number:74 print("Unsubscribing " + subscription['Endpoint'])75 subscription_arn = subscription['SubscriptionArn']76 sns_client().unsubscribe(77 SubscriptionArn=subscription_arn78 )79 print("Unsubscribed " + subscription['Endpoint'])80def opt_in_phone_number(phone_number):81 return sns_client().opt_in_phone_number(82 phoneNumber=phone_number83 )84def publish_message(topic_arn):85 return sns_client().publish(86 TopicArn=topic_arn,87 Message="Hello, you're receiving this because you've subscribed!"88 )89if __name__ == '__main__':90 print(create_topic())91 # print(get_topics())92 # print(get_topic_attributes(TOPIC_ARN))93 # update_topic_attributes(TOPIC_ARN)94 # delete_topic(TOPIC_ARN)95 # print(create_topic())96 # create_email_subscription(TOPIC_ARN, 'YOUR_EMAIL_ADDRESS')97 # create_sms_subscription(TOPIC_ARN, 'YOUR_PHONE_NUMBER')98 # create_sqs_queue_subscription(TOPIC_ARN, QUEUE_ARN)99 # print(get_topic_subscriptions(TOPIC_ARN))...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful