How to use the sqs_create_queue method in localstack

Best Python code snippet using localstack_python

test_sqs.py

Source: test_sqs.py (GitHub)

copy

Full Screen

# NOTE: this excerpt starts at line 50 of test_sqs.py; lines 1-49 (imports, fixtures and,
# presumably, the header of the test class these ``self``-taking methods belong to) are elided.
# Re-indent the functions below under that class when merging them back.


def test_get_queue_url_contains_request_host(self, sqs_client, sqs_create_queue):
    """GetQueueUrl should return a URL built from the host the request was sent to."""
    if config.SERVICE_PROVIDER_CONFIG.get_provider("sqs") != "asf":
        pytest.xfail("this test only works for the ASF provider")

    queue_name = "test-queue-" + short_uid()
    sqs_create_queue(QueueName=queue_name)

    queue_url = sqs_client.get_queue_url(QueueName=queue_name)["QueueUrl"]
    account_id = constants.TEST_AWS_ACCOUNT_ID

    host = f"http://localhost:{config.EDGE_PORT}"
    # our current queue pattern looks like this, but may change going forward, or may be configurable
    assert queue_url == f"{host}/{account_id}/{queue_name}"

    # attempt to connect through a different host and make sure the URL contains that host
    host = f"http://127.0.0.1:{config.EDGE_PORT}"
    client = aws_stack.connect_to_service("sqs", endpoint_url=host)
    queue_url = client.get_queue_url(QueueName=queue_name)["QueueUrl"]
    assert queue_url == f"{host}/{account_id}/{queue_name}"


def test_list_queues(self, sqs_client, sqs_create_queue):
    """ListQueues should honour QueueNamePrefix and list everything without a prefix."""
    queue_names = [
        "a-test-queue-" + short_uid(),
        "a-test-queue-" + short_uid(),
        "b-test-queue-" + short_uid(),
    ]

    # create three queues with prefixes and collect their urls
    queue_urls = []
    for name in queue_names:
        sqs_create_queue(QueueName=name)
        queue_url = sqs_client.get_queue_url(QueueName=name)["QueueUrl"]
        assert queue_url.endswith(name)
        queue_urls.append(queue_url)

    # list queues with first prefix
    result = sqs_client.list_queues(QueueNamePrefix="a-test-queue-")
    assert "QueueUrls" in result
    assert len(result["QueueUrls"]) == 2
    assert queue_urls[0] in result["QueueUrls"]
    assert queue_urls[1] in result["QueueUrls"]
    assert queue_urls[2] not in result["QueueUrls"]

    # list queues with second prefix
    result = sqs_client.list_queues(QueueNamePrefix="b-test-queue-")
    assert "QueueUrls" in result
    assert len(result["QueueUrls"]) == 1
    assert queue_urls[0] not in result["QueueUrls"]
    assert queue_urls[1] not in result["QueueUrls"]
    assert queue_urls[2] in result["QueueUrls"]

    # list queues regardless of prefix
    result = sqs_client.list_queues()
    assert "QueueUrls" in result
    for url in queue_urls:
        assert url in result["QueueUrls"]


def test_create_queue_and_get_attributes(self, sqs_client, sqs_queue):
    """A freshly created queue should expose its ARN, creation time and default timeout."""
    result = sqs_client.get_queue_attributes(
        QueueUrl=sqs_queue, AttributeNames=["QueueArn", "CreatedTimestamp", "VisibilityTimeout"]
    )
    assert "Attributes" in result

    attrs = result["Attributes"]
    assert len(attrs) == 3
    assert "test-queue-" in attrs["QueueArn"]
    # The tolerance must be passed as ``abs``: the second positional argument of
    # pytest.approx is ``rel``, which would turn 30 into a 3000 % relative tolerance.
    assert int(float(attrs["CreatedTimestamp"])) == pytest.approx(int(time.time()), abs=30)
    assert int(attrs["VisibilityTimeout"]) == 30, "visibility timeout is not the default value"


def test_send_receive_message(self, sqs_client, sqs_queue):
    """A sent message should be received with matching id, body and MD5."""
    send_result = sqs_client.send_message(QueueUrl=sqs_queue, MessageBody="message")

    assert send_result["MessageId"]
    assert send_result["MD5OfMessageBody"] == "78e731027d8fd50ed642340b7c9a63b3"
    # TODO: other attributes

    receive_result = sqs_client.receive_message(QueueUrl=sqs_queue)

    assert len(receive_result["Messages"]) == 1
    message = receive_result["Messages"][0]

    assert message["ReceiptHandle"]
    assert message["Body"] == "message"
    assert message["MessageId"] == send_result["MessageId"]
    assert message["MD5OfBody"] == send_result["MD5OfMessageBody"]


def test_send_receive_message_multiple_queues(self, sqs_client, sqs_create_queue):
    """A message sent to one queue must not show up in another queue."""
    queue0 = sqs_create_queue()
    queue1 = sqs_create_queue()

    sqs_client.send_message(QueueUrl=queue0, MessageBody="message")

    result = sqs_client.receive_message(QueueUrl=queue1)
    assert "Messages" not in result

    result = sqs_client.receive_message(QueueUrl=queue0)
    assert len(result["Messages"]) == 1
    assert result["Messages"][0]["Body"] == "message"


def test_send_message_batch(self, sqs_client, sqs_queue):
    """Two messages sent as a batch should be received one per receive call."""
    sqs_client.send_message_batch(
        QueueUrl=sqs_queue,
        Entries=[
            {"Id": "1", "MessageBody": "message-0"},
            {"Id": "2", "MessageBody": "message-1"},
        ],
    )

    response0 = sqs_client.receive_message(QueueUrl=sqs_queue)
    response1 = sqs_client.receive_message(QueueUrl=sqs_queue)
    response2 = sqs_client.receive_message(QueueUrl=sqs_queue)

    assert len(response0.get("Messages", [])) == 1
    assert len(response1.get("Messages", [])) == 1
    assert len(response2.get("Messages", [])) == 0

    message0 = response0["Messages"][0]
    message1 = response1["Messages"][0]

    assert message0["Body"] == "message-0"
    assert message1["Body"] == "message-1"


def test_send_batch_receive_multiple(self, sqs_client, sqs_queue):
    """A batch plus a single send should all be receivable within a few receive calls."""
    # send a batch, then a single message, then receive them
    # Important: AWS does not guarantee the order of messages, be it within the batch or between sends
    message_count = 3
    sqs_client.send_message_batch(
        QueueUrl=sqs_queue,
        Entries=[
            {"Id": "1", "MessageBody": "message-0"},
            {"Id": "2", "MessageBody": "message-1"},
        ],
    )
    sqs_client.send_message(QueueUrl=sqs_queue, MessageBody="message-2")

    received = []
    # at most ``message_count`` attempts, stopping early once everything has arrived
    for _ in range(message_count):
        if len(received) >= message_count:
            break
        response = sqs_client.receive_message(
            QueueUrl=sqs_queue, MaxNumberOfMessages=message_count
        )
        # an empty receive has no "Messages" key; default to [] so the assertion
        # below reports the shortfall instead of a TypeError
        received.extend(response.get("Messages", []))

    assert len(received) == message_count
    assert {message["Body"] for message in received} == {
        f"message-{b}" for b in range(message_count)
    }


def test_send_message_batch_with_empty_list(self, sqs_client, sqs_create_queue):
    """Sending an empty batch, if rejected, should fail with EmptyBatchRequest."""
    queue_url = sqs_create_queue()

    # NOTE(review): this test passes vacuously when no error is raised at all.
    # Confirm whether every provider rejects empty batches before tightening it
    # into ``pytest.raises(ClientError)``.
    try:
        sqs_client.send_message_batch(QueueUrl=queue_url, Entries=[])
    except ClientError as e:
        assert "EmptyBatchRequest" in e.response["Error"]["Code"]
        assert e.response["ResponseMetadata"]["HTTPStatusCode"] in [400, 404]


def test_tag_untag_queue(self, sqs_client, sqs_create_queue):
    """Tags can be added, partially removed, and fully removed from a queue."""
    queue_url = sqs_create_queue()

    # tag queue
    tags = {"tag1": "value1", "tag2": "value2", "tag3": ""}
    sqs_client.tag_queue(QueueUrl=queue_url, Tags=tags)

    # check queue tags
    response = sqs_client.list_queue_tags(QueueUrl=queue_url)
    assert response["Tags"] == tags

    # remove tag1 and tag3
    sqs_client.untag_queue(QueueUrl=queue_url, TagKeys=["tag1", "tag3"])
    response = sqs_client.list_queue_tags(QueueUrl=queue_url)
    assert response["Tags"] == {"tag2": "value2"}

    # remove tag2
    sqs_client.untag_queue(QueueUrl=queue_url, TagKeys=["tag2"])
    response = sqs_client.list_queue_tags(QueueUrl=queue_url)
    assert "Tags" not in response


def test_tags_case_sensitive(self, sqs_client, sqs_create_queue):
    """Tag keys differing only in case should be stored as distinct tags."""
    queue_url = sqs_create_queue()

    # tag queue
    tags = {"MyTag": "value1", "mytag": "value2"}
    sqs_client.tag_queue(QueueUrl=queue_url, Tags=tags)

    response = sqs_client.list_queue_tags(QueueUrl=queue_url)
    assert response["Tags"] == tags


def test_untag_queue_ignores_non_existing_tag(self, sqs_client, sqs_create_queue):
    """Untagging a key that does not exist should not fail or affect other tags."""
    queue_url = sqs_create_queue()

    # tag queue
    tags = {"tag1": "value1", "tag2": "value2"}
    sqs_client.tag_queue(QueueUrl=queue_url, Tags=tags)

    # remove tags
    sqs_client.untag_queue(QueueUrl=queue_url, TagKeys=["tag1", "tag3"])

    response = sqs_client.list_queue_tags(QueueUrl=queue_url)
    assert response["Tags"] == {"tag2": "value2"}


def test_tag_queue_overwrites_existing_tag(self, sqs_client, sqs_create_queue):
    """Re-tagging should overwrite existing keys and keep untouched ones."""
    queue_url = sqs_create_queue()

    # tag queue
    tags = {"tag1": "value1", "tag2": "value2"}
    sqs_client.tag_queue(QueueUrl=queue_url, Tags=tags)

    # overwrite tags
    tags = {"tag1": "VALUE1", "tag3": "value3"}
    sqs_client.tag_queue(QueueUrl=queue_url, Tags=tags)

    response = sqs_client.list_queue_tags(QueueUrl=queue_url)
    assert response["Tags"] == {"tag1": "VALUE1", "tag2": "value2", "tag3": "value3"}


def test_create_queue_with_tags(self, sqs_client, sqs_create_queue):
    """Tags passed at queue creation should be listed afterwards."""
    tags = {"tag1": "value1", "tag2": "value2"}
    queue_url = sqs_create_queue(tags=tags)

    response = sqs_client.list_queue_tags(QueueUrl=queue_url)
    assert response["Tags"] == tags


def test_create_queue_with_attributes(self, sqs_client, sqs_create_queue):
    """Attributes passed at queue creation should be returned by GetQueueAttributes."""
    attributes = {
        "MessageRetentionPeriod": "604800",  # Unsupported by ElasticMq, should be saved in memory
        "ReceiveMessageWaitTimeSeconds": "10",
        "VisibilityTimeout": "20",
    }

    queue_url = sqs_create_queue(Attributes=attributes)

    attrs = sqs_client.get_queue_attributes(QueueUrl=queue_url, AttributeNames=["All"])[
        "Attributes"
    ]

    assert attrs["MessageRetentionPeriod"] == "604800"
    assert attrs["VisibilityTimeout"] == "20"
    assert attrs["ReceiveMessageWaitTimeSeconds"] == "10"


def test_send_delay_and_wait_time(self, sqs_client, sqs_queue):
    """A delayed message is invisible at first and arrives when long-polling."""
    sqs_client.send_message(QueueUrl=sqs_queue, MessageBody="foobar", DelaySeconds=1)

    result = sqs_client.receive_message(QueueUrl=sqs_queue)
    assert "Messages" not in result

    result = sqs_client.receive_message(QueueUrl=sqs_queue, WaitTimeSeconds=2)
    assert "Messages" in result
    assert len(result["Messages"]) == 1


def test_receive_after_visibility_timeout(self, sqs_client, sqs_create_queue):
    """A message becomes receivable again, with a new handle, after its timeout."""
    queue_url = sqs_create_queue(Attributes={"VisibilityTimeout": "1"})

    sqs_client.send_message(QueueUrl=queue_url, MessageBody="foobar")

    # receive the message
    result = sqs_client.receive_message(QueueUrl=queue_url)
    assert "Messages" in result
    message_receipt_0 = result["Messages"][0]

    # message should be within the visibility timeout
    result = sqs_client.receive_message(QueueUrl=queue_url)
    assert "Messages" not in result

    # visibility timeout should have expired
    result = sqs_client.receive_message(QueueUrl=queue_url, WaitTimeSeconds=2)
    assert "Messages" in result
    message_receipt_1 = result["Messages"][0]

    assert (
        message_receipt_0["ReceiptHandle"] != message_receipt_1["ReceiptHandle"]
    ), "receipt handles should be different"


def test_receive_terminate_visibility_timeout(self, sqs_client, sqs_queue):
    """Receiving with VisibilityTimeout=0 should leave the message immediately receivable."""
    queue_url = sqs_queue

    sqs_client.send_message(QueueUrl=queue_url, MessageBody="foobar")

    result = sqs_client.receive_message(QueueUrl=queue_url, VisibilityTimeout=0)
    assert "Messages" in result
    message_receipt_0 = result["Messages"][0]

    result = sqs_client.receive_message(QueueUrl=queue_url)
    assert "Messages" in result
    message_receipt_1 = result["Messages"][0]

    assert (
        message_receipt_0["ReceiptHandle"] != message_receipt_1["ReceiptHandle"]
    ), "receipt handles should be different"

    # TODO: check if this is correct (whether receive with VisibilityTimeout = 0 is permanent)
    result = sqs_client.receive_message(QueueUrl=queue_url)
    assert "Messages" not in result


def test_delete_message_batch_from_lambda(
    self, sqs_client, sqs_create_queue, lambda_client, create_lambda_function
):
    """A Lambda invoked with the delete-batch payload should leave the queue empty."""
    # issue 3671 - not recreatable
    # TODO: lambda creation does not work when testing against AWS
    queue_name = f"queue-{short_uid()}"
    queue_url = sqs_create_queue(QueueName=queue_name)

    lambda_name = f"lambda-{short_uid()}"
    create_lambda_function(
        func_name=lambda_name,
        libs=TEST_LAMBDA_LIBS,
        handler_file=TEST_LAMBDA_PYTHON,
        runtime=LAMBDA_RUNTIME_PYTHON36,
    )
    delete_batch_payload = {lambda_integration.MSG_BODY_DELETE_BATCH: queue_url}
    batch = [{"Id": str(i), "MessageBody": str(i)} for i in range(4)]
    sqs_client.send_message_batch(QueueUrl=queue_url, Entries=batch)

    lambda_client.invoke(
        FunctionName=lambda_name, Payload=json.dumps(delete_batch_payload), LogType="Tail"
    )

    receive_result = sqs_client.receive_message(QueueUrl=queue_url)
    assert "Messages" not in receive_result


def test_invalid_receipt_handle_should_return_error_message(self, sqs_client, sqs_create_queue):
    """ChangeMessageVisibility with a bogus handle should raise ReceiptHandleIsInvalid."""
    # issue 3619
    queue_name = "queue_3619_" + short_uid()
    queue_url = sqs_create_queue(QueueName=queue_name)
    with pytest.raises(Exception) as e:
        sqs_client.change_message_visibility(
            QueueUrl=queue_url, ReceiptHandle="INVALID", VisibilityTimeout=60
        )
    # checked outside the ``with`` block: statements after the raising call inside it never run
    e.match("ReceiptHandleIsInvalid")


def test_message_with_attributes_should_be_enqueued(self, sqs_client, sqs_create_queue):
    """A message carrying a Number attribute should be enqueued and received."""
    # issue 3737
    queue_name = "queue_3737_" + short_uid()
    queue_url = sqs_create_queue(QueueName=queue_name)
    assert queue_url.endswith(queue_name)

    message_body = "test"
    timestamp_attribute = {"DataType": "Number", "StringValue": "1614717034367"}
    message_attributes = {"timestamp": timestamp_attribute}
    response_send = sqs_client.send_message(
        QueueUrl=queue_url, MessageBody=message_body, MessageAttributes=message_attributes
    )
    response_receive = sqs_client.receive_message(QueueUrl=queue_url)
    assert response_receive["Messages"][0]["MessageId"] == response_send["MessageId"]


@pytest.mark.xfail
def test_batch_send_with_invalid_char_should_succeed(self, sqs_client, sqs_create_queue):
    """A batch with one invalid body should report exactly one failed entry."""
    # issue 4135
    queue_name = "queue_4135_" + short_uid()
    queue_url = sqs_create_queue(QueueName=queue_name)

    batch = [{"Id": str(i), "MessageBody": str(i)} for i in range(9)]
    batch.append({"Id": "9", "MessageBody": "\x01"})
    result_send = sqs_client.send_message_batch(QueueUrl=queue_url, Entries=batch)
    assert len(result_send["Failed"]) == 1


@only_localstack
def test_external_hostname(self, monkeypatch, sqs_client, sqs_create_queue):
    """Queue URLs should use the configured external host/port and remain usable."""
    external_host = "external-host"
    external_port = "12345"

    monkeypatch.setattr(config, "SQS_PORT_EXTERNAL", external_port)
    monkeypatch.setattr(config, "HOSTNAME_EXTERNAL", external_host)
    # TODO: remove once the old provider is discontinued
    from localstack.services.sqs import sqs_listener as old_sqs_listener

    monkeypatch.setattr(old_sqs_listener, "SQS_PORT_EXTERNAL", external_port)

    queue_name = f"queue-{short_uid()}"
    queue_url = sqs_create_queue(QueueName=queue_name)

    assert f"{external_host}:{external_port}" in queue_url

    message_body = "external_host_test"
    sqs_client.send_message(QueueUrl=queue_url, MessageBody=message_body)

    receive_result = sqs_client.receive_message(QueueUrl=queue_url)
    assert receive_result["Messages"][0]["Body"] == message_body


@only_localstack
def test_external_hostname_via_host_header(self, sqs_create_queue):
    """test making a request with a different external hostname/port being returned"""
    queue_name = f"queue-{short_uid()}"
    sqs_create_queue(QueueName=queue_name)

    edge_url = config.get_edge_url()
    headers = aws_stack.mock_aws_request_headers("sqs")
    payload = f"Action=GetQueueUrl&QueueName={queue_name}"

    # assert regular/default queue URL is returned
    url = f"{edge_url}"
    result = requests.post(url, data=payload, headers=headers)
    assert result
    content = to_str(result.content)
    kwargs = {"flags": re.MULTILINE | re.DOTALL}
    assert re.match(rf".*<QueueUrl>\s*{edge_url}/[^<]+</QueueUrl>.*", content, **kwargs)

    # assert custom port is returned in queue URL
    port = 12345
    headers["Host"] = f"local-test-host:{port}"
    result = requests.post(url, data=payload, headers=headers)
    assert result
    content = to_str(result.content)
    # TODO: currently only asserting that the port matches - potentially should also return the custom hostname?
    assert re.match(rf".*<QueueUrl>\s*http://[^:]+:{port}[^<]+</QueueUrl>.*", content, **kwargs)


@only_localstack
@pytest.mark.xfail
def test_external_host_via_header_complete_message_lifecycle(self, monkeypatch):
    """Create, send and receive via raw requests carrying a custom Host header."""
    queue_name = f"queue-{short_uid()}"

    edge_url = config.get_edge_url()
    headers = aws_stack.mock_aws_request_headers("sqs")
    port = 12345
    hostname = "aws-local"

    url = f"{hostname}:{port}"
    headers["Host"] = url
    payload = f"Action=CreateQueue&QueueName={queue_name}"
    result = requests.post(edge_url, data=payload, headers=headers)
    assert result.status_code == 200
    assert url in result.text

    queue_url = f"http://{url}/{constants.TEST_AWS_ACCOUNT_ID}/{queue_name}"
    message_body = f"test message {short_uid()}"
    payload = f"Action=SendMessage&QueueUrl={queue_url}&MessageBody={message_body}"
    result = requests.post(edge_url, data=payload, headers=headers)
    assert result.status_code == 200
    assert "MD5" in result.text

    payload = f"Action=ReceiveMessage&QueueUrl={queue_url}&VisibilityTimeout=0"
    result = requests.post(edge_url, data=payload, headers=headers)
    assert result.status_code == 200
    assert message_body in result.text

    # the customer said that he used to be able to access it via "127.0.0.1" instead of "aws-local" as well
    queue_url = f"http://127.0.0.1/{constants.TEST_AWS_ACCOUNT_ID}/{queue_name}"

    payload = f"Action=SendMessage&QueueUrl={queue_url}&MessageBody={message_body}"
    result = requests.post(edge_url, data=payload, headers=headers)
    assert result.status_code == 200
    assert "MD5" in result.text

    payload = f"Action=ReceiveMessage&QueueUrl={queue_url}&VisibilityTimeout=0"
    result = requests.post(edge_url, data=payload, headers=headers)
    assert result.status_code == 200
    assert message_body in result.text


@pytest.mark.xfail
def test_fifo_messages_in_order_after_timeout(self, sqs_client, sqs_create_queue):
    """FIFO messages should keep their order when re-received after the timeout."""
    # issue 4287
    queue_name = f"queue-{short_uid()}.fifo"
    timeout = 1
    attributes = {"FifoQueue": "true", "VisibilityTimeout": f"{timeout}"}
    queue_url = sqs_create_queue(QueueName=queue_name, Attributes=attributes)

    for i in range(3):
        sqs_client.send_message(
            QueueUrl=queue_url,
            MessageBody=f"message-{i}",
            MessageGroupId="1",
            MessageDeduplicationId=f"{i}",
        )

    def receive_and_check_order():
        result_receive = sqs_client.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=10)
        for j in range(3):
            assert result_receive["Messages"][j]["Body"] == f"message-{j}"

    receive_and_check_order()
    time.sleep(timeout + 1)
    receive_and_check_order()


def test_list_queue_tags(self, sqs_client, sqs_create_queue):
    """Tags set with TagQueue should be returned by ListQueueTags."""
    queue_name = f"queue-{short_uid()}"
    queue_url = sqs_create_queue(QueueName=queue_name)
    tags = {"testTag1": "test1", "testTag2": "test2"}

    sqs_client.tag_queue(QueueUrl=queue_url, Tags=tags)
    tag_list = sqs_client.list_queue_tags(QueueUrl=queue_url)
    assert tags == tag_list["Tags"]


def test_queue_list_nonexistent_tags(self, sqs_client, sqs_create_queue):
    """ListQueueTags on an untagged queue should not include a "Tags" entry."""
    queue_name = f"queue-{short_uid()}"
    queue_url = sqs_create_queue(QueueName=queue_name)
    tag_list = sqs_client.list_queue_tags(QueueUrl=queue_url)

    # check the response itself; "ResponseMetadata" never holds "Tags", so asserting
    # on it could not fail (same expectation as at the end of test_tag_untag_queue)
    assert "Tags" not in tag_list


def test_publish_get_delete_message(self, sqs_client, sqs_create_queue):
    """A received message should be gone from the queue once it is deleted."""
    # visibility part handled by test_receive_terminate_visibility_timeout
    queue_name = f"queue-{short_uid()}"
    queue_url = sqs_create_queue(QueueName=queue_name)

    message_body = "test message"
    result_send = sqs_client.send_message(QueueUrl=queue_url, MessageBody=message_body)

    result_recv = sqs_client.receive_message(QueueUrl=queue_url)
    assert result_recv["Messages"][0]["MessageId"] == result_send["MessageId"]

    sqs_client.delete_message(
        QueueUrl=queue_url, ReceiptHandle=result_recv["Messages"][0]["ReceiptHandle"]
    )
    result_recv = sqs_client.receive_message(QueueUrl=queue_url)
    assert "Messages" not in result_recv


def test_delete_message_deletes_with_change_visibility_timeout(
    self, sqs_client, sqs_create_queue
):
    """Deleting should work after the visibility timeout of a message was reset to 0."""
    # Old name: test_delete_message_deletes_visibility_agnostic
    queue_name = f"queue-{short_uid()}"
    queue_url = sqs_create_queue(QueueName=queue_name)

    message_id = sqs_client.send_message(QueueUrl=queue_url, MessageBody="test")["MessageId"]
    result_recv = sqs_client.receive_message(QueueUrl=queue_url)
    result_follow_up = sqs_client.receive_message(QueueUrl=queue_url)
    assert result_recv["Messages"][0]["MessageId"] == message_id
    assert "Messages" not in result_follow_up

    receipt_handle = result_recv["Messages"][0]["ReceiptHandle"]
    sqs_client.change_message_visibility(
        QueueUrl=queue_url, ReceiptHandle=receipt_handle, VisibilityTimeout=0
    )

    # check if the new timeout enables instant re-receiving, to ensure the message was deleted
    result_recv = sqs_client.receive_message(QueueUrl=queue_url)
    assert result_recv["Messages"][0]["MessageId"] == message_id

    receipt_handle = result_recv["Messages"][0]["ReceiptHandle"]
    sqs_client.delete_message(QueueUrl=queue_url, ReceiptHandle=receipt_handle)
    result_follow_up = sqs_client.receive_message(QueueUrl=queue_url)
    assert "Messages" not in result_follow_up


def test_publish_get_delete_message_batch(self, sqs_client, sqs_create_queue):
    """A batch of messages can be sent, fully received and deleted as a batch."""
    message_count = 10
    queue_name = f"queue-{short_uid()}"
    queue_url = sqs_create_queue(QueueName=queue_name)

    message_batch = [
        {
            "Id": f"message-{i}",
            "MessageBody": f"messageBody-{i}",
        }
        for i in range(message_count)
    ]

    result_send_batch = sqs_client.send_message_batch(QueueUrl=queue_url, Entries=message_batch)
    successful = result_send_batch["Successful"]
    assert len(successful) == len(message_batch)

    result_recv = []
    # at most ``message_count`` attempts, stopping early once everything has arrived
    for _ in range(message_count):
        if len(result_recv) >= message_count:
            break
        response = sqs_client.receive_message(
            QueueUrl=queue_url, MaxNumberOfMessages=message_count
        )
        # an empty receive has no "Messages" key; default to [] so the assertion
        # below reports the shortfall instead of a KeyError
        result_recv.extend(response.get("Messages", []))
    assert len(result_recv) == message_count

    ids_sent = {entry["MessageId"] for entry in successful}
    ids_received = {message["MessageId"] for message in result_recv}
    assert ids_sent == ids_received

    delete_entries = [
        {"Id": message["MessageId"], "ReceiptHandle": message["ReceiptHandle"]}
        for message in result_recv
    ]
    sqs_client.delete_message_batch(QueueUrl=queue_url, Entries=delete_entries)
    confirmation = sqs_client.receive_message(
        QueueUrl=queue_url, MaxNumberOfMessages=message_count
    )
    assert "Messages" not in confirmation


def test_create_and_send_to_fifo_queue(self, sqs_client, sqs_create_queue):
    """A FIFO queue keeps its .fifo suffix and accepts and delivers a message."""
    # Old name: test_create_fifo_queue
    queue_name = f"queue-{short_uid()}.fifo"
    attributes = {"FifoQueue": "true"}
    queue_url = sqs_create_queue(QueueName=queue_name, Attributes=attributes)

    # it should preserve .fifo in the queue name
    assert queue_name in queue_url

    message_id = sqs_client.send_message(
        QueueUrl=queue_url,
        MessageBody="test",
        MessageDeduplicationId=f"dedup-{short_uid()}",
        MessageGroupId="test_group",
    )["MessageId"]

    result_recv = sqs_client.receive_message(QueueUrl=queue_url)
    assert result_recv["Messages"][0]["MessageId"] == message_id


def test_fifo_queue_without_fifo_queue_attribute(self, sqs_create_queue):
    """A .fifo queue name without the FifoQueue attribute should be rejected."""
    queue_name = f"invalid-{short_uid()}.fifo"

    with pytest.raises(Exception) as e:
        sqs_create_queue(QueueName=queue_name)
    e.match("InvalidParameterValue")


def test_fifo_queue_requires_suffix(self, sqs_create_queue):
    """The FifoQueue attribute without a .fifo name suffix should be rejected."""
    queue_name = f"invalid-{short_uid()}"
    attributes = {"FifoQueue": "true"}

    with pytest.raises(Exception) as e:
        sqs_create_queue(QueueName=queue_name, Attributes=attributes)
    e.match("InvalidParameterValue")


@pytest.mark.skipif(
    os.environ.get("PROVIDER_OVERRIDE_SQS") != "asf",
    reason="New provider test which isn't covered by old one",
)
def test_standard_queue_cannot_have_fifo_suffix(self, sqs_create_queue):
    """A standard queue whose name ends in .fifo should be rejected."""
    queue_name = f"queue-{short_uid()}.fifo"
    with pytest.raises(Exception) as e:
        sqs_create_queue(QueueName=queue_name)
    e.match("InvalidParameterValue")


@pytest.mark.xfail
def test_redrive_policy_attribute_validity(self, sqs_create_queue, sqs_client):
    """Incomplete or invalid RedrivePolicy values are rejected; a valid one is accepted."""
    dl_queue_name = f"dl-queue-{short_uid()}"
    dl_queue_url = sqs_create_queue(QueueName=dl_queue_name)
    dl_target_arn = sqs_client.get_queue_attributes(
        QueueUrl=dl_queue_url, AttributeNames=["QueueArn"]
    )["Attributes"]["QueueArn"]
    queue_name = f"queue-{short_uid()}"
    queue_url = sqs_create_queue(QueueName=queue_name)
    valid_max_receive_count = "42"
    invalid_max_receive_count = "invalid"

    # policy without maxReceiveCount
    with pytest.raises(Exception) as e:
        sqs_client.set_queue_attributes(
            QueueUrl=queue_url,
            Attributes={"RedrivePolicy": json.dumps({"deadLetterTargetArn": dl_target_arn})},
        )
    e.match("InvalidParameterValue")

    # policy without deadLetterTargetArn
    with pytest.raises(Exception) as e:
        sqs_client.set_queue_attributes(
            QueueUrl=queue_url,
            Attributes={
                "RedrivePolicy": json.dumps({"maxReceiveCount": valid_max_receive_count})
            },
        )
    e.match("InvalidParameterValue")

    _invalid_redrive_policy = {
        "deadLetterTargetArn": dl_target_arn,
        "maxReceiveCount": invalid_max_receive_count,
    }

    with pytest.raises(Exception) as e:
        sqs_client.set_queue_attributes(
            QueueUrl=queue_url,
            Attributes={"RedrivePolicy": json.dumps(_invalid_redrive_policy)},
        )
    e.match("InvalidParameterValue")

    _valid_redrive_policy = {
        "deadLetterTargetArn": dl_target_arn,
        "maxReceiveCount": valid_max_receive_count,
    }

    sqs_client.set_queue_attributes(
        QueueUrl=queue_url, Attributes={"RedrivePolicy": json.dumps(_valid_redrive_policy)}
    )


@pytest.mark.skip
def test_invalid_dead_letter_arn_rejected_before_lookup(self, sqs_create_queue):
    """Creating a queue with a malformed dead-letter ARN should be rejected."""
    queue_name = f"queue-{short_uid()}"
    dl_dummy_arn = "dummy"
    max_receive_count = 42
    _redrive_policy = {
        "deadLetterTargetArn": dl_dummy_arn,
        "maxReceiveCount": max_receive_count,
    }
    with pytest.raises(Exception) as e:
        sqs_create_queue(
            QueueName=queue_name, Attributes={"RedrivePolicy": json.dumps(_redrive_policy)}
        )
    e.match("InvalidParameterValue")


def test_set_queue_policy(self, sqs_client, sqs_create_queue):
    """A policy set on a queue is readable via both "All" and "Policy" attribute names."""
    queue_name = f"queue-{short_uid()}"
    queue_url = sqs_create_queue(QueueName=queue_name)

    attributes = {"Policy": TEST_POLICY}
    sqs_client.set_queue_attributes(QueueUrl=queue_url, Attributes=attributes)

    # accessing the policy generally and specifically
    attributes = sqs_client.get_queue_attributes(QueueUrl=queue_url, AttributeNames=["All"])[
        "Attributes"
    ]
    policy = json.loads(attributes["Policy"])
    assert "sqs:SendMessage" == policy["Statement"][0]["Action"]
    attributes = sqs_client.get_queue_attributes(QueueUrl=queue_url, AttributeNames=["Policy"])[
        "Attributes"
    ]
    policy = json.loads(attributes["Policy"])
    assert "sqs:SendMessage" == policy["Statement"][0]["Action"]


def test_set_empty_queue_policy(self, sqs_client, sqs_create_queue):
    """Setting an empty policy should leave the queue without a Policy attribute."""
    queue_name = f"queue-{short_uid()}"
    queue_url = sqs_create_queue(QueueName=queue_name)

    attributes = {"Policy": ""}
    sqs_client.set_queue_attributes(QueueUrl=queue_url, Attributes=attributes)

    attributes = sqs_client.get_queue_attributes(QueueUrl=queue_url, AttributeNames=["All"])[
        "Attributes"
    ]
    assert "Policy" not in attributes

    # check if this behaviour holds on existing Policies as well
    attributes = {"Policy": TEST_POLICY}
    sqs_client.set_queue_attributes(QueueUrl=queue_url, Attributes=attributes)
    attributes = sqs_client.get_queue_attributes(QueueUrl=queue_url, AttributeNames=["All"])[
        "Attributes"
    ]
    assert "sqs:SendMessage" in attributes["Policy"]

    attributes = {"Policy": ""}
    sqs_client.set_queue_attributes(QueueUrl=queue_url, Attributes=attributes)
    attributes = sqs_client.get_queue_attributes(QueueUrl=queue_url, AttributeNames=["All"])[
        "Attributes"
    ]
    assert "Policy" not in attributes


def test_send_message_with_attributes(self, sqs_client, sqs_create_queue):
    """Message attributes and their MD5 should round-trip through send and receive."""
    # Old name: test_send_message_attributes
    queue_name = f"queue-{short_uid()}"
    queue_url = sqs_create_queue(QueueName=queue_name)

    attributes = {
        "attr1": {"StringValue": "test1", "DataType": "String"},
        "attr2": {"StringValue": "test2", "DataType": "String"},
    }
    result_send = sqs_client.send_message(
        QueueUrl=queue_url, MessageBody="test", MessageAttributes=attributes
    )

    result_receive = sqs_client.receive_message(
        QueueUrl=queue_url, MessageAttributeNames=["All"]
    )
    messages = result_receive["Messages"]

    assert messages[0]["MessageId"] == result_send["MessageId"]
    assert messages[0]["MessageAttributes"] == attributes
    assert messages[0]["MD5OfMessageAttributes"] == result_send["MD5OfMessageAttributes"]


def test_sent_message_retains_attributes_after_receive(self, sqs_client, sqs_create_queue):
    """Message attributes should still be present on a second receive."""
    # Old name: test_send_message_retains_attributes
    queue_name = f"queue-{short_uid()}"
    queue_url = sqs_create_queue(QueueName=queue_name)

    attributes = {"attr1": {"StringValue": "test1", "DataType": "String"}}
    sqs_client.send_message(
        QueueUrl=queue_url, MessageBody="test", MessageAttributes=attributes
    )

    # receive should not interfere with message attributes
    sqs_client.receive_message(
        QueueUrl=queue_url, VisibilityTimeout=0, MessageAttributeNames=["All"]
    )
    receive_result = sqs_client.receive_message(
        QueueUrl=queue_url, MessageAttributeNames=["All"]
    )
    assert receive_result["Messages"][0]["MessageAttributes"] == attributes


@pytest.mark.xfail
def test_send_message_with_invalid_string_attributes(self, sqs_client, sqs_create_queue):
    """Invalid attribute values, names and data types should each be rejected."""
    queue_name = f"queue-{short_uid()}"
    queue_url = sqs_create_queue(QueueName=queue_name)

    # base line against to detect general failure
    valid_attribute = {"attr.1øßä": {"StringValue": "Valida", "DataType": "String"}}
    sqs_client.send_message(
        QueueUrl=queue_url, MessageBody="test", MessageAttributes=valid_attribute
    )

    def send_invalid(attribute):
        with pytest.raises(Exception) as e:
            sqs_client.send_message(
                QueueUrl=queue_url, MessageBody="test", MessageAttributes=attribute
            )
        e.match("Invalid")

    # String Attributes must not contain non-printable characters
    # See: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html
    invalid_attribute = {
        "attr1": {"StringValue": f"Invalid-{chr(8)},{chr(11)}", "DataType": "String"}
    }
    send_invalid(invalid_attribute)

    invalid_name_prefixes = ["aWs.", "AMAZON.", "."]
    for prefix in invalid_name_prefixes:
        invalid_attribute = {
            f"{prefix}-Invalid-attr": {"StringValue": "Valid", "DataType": "String"}
        }
        send_invalid(invalid_attribute)

    # Some illegal characters
    invalid_name_characters = ["!", '"', "§", "(", "?"]
    for char in invalid_name_characters:
        invalid_attribute = {
            f"Invalid-{char}-attr": {"StringValue": "Valid", "DataType": "String"}
        }
        send_invalid(invalid_attribute)

    # limit is 256 chars
    too_long_name = "L" * 257
    invalid_attribute = {f"{too_long_name}": {"StringValue": "Valid", "DataType": "String"}}
    send_invalid(invalid_attribute)

    # FIXME: no double periods should be allowed
    # invalid_attribute = {
    #     "Invalid..Name": {"StringValue": "Valid", "DataType": "String"}
    # }
    # send_invalid(invalid_attribute)

    invalid_type = "Invalid"
    invalid_attribute = {
        "Attribute_name": {"StringValue": "Valid", "DataType": f"{invalid_type}"}
    }
    send_invalid(invalid_attribute)

    too_long_type = f"Number.{'L'*256}"
    invalid_attribute = {
        "Attribute_name": {"StringValue": "Valid", "DataType": f"{too_long_type}"}
    }
    send_invalid(invalid_attribute)

    ends_with_dot = "Invalid."
    invalid_attribute = {f"{ends_with_dot}": {"StringValue": "Valid", "DataType": "String"}}
    send_invalid(invalid_attribute)


@pytest.mark.xfail
def test_send_message_with_invalid_fifo_parameters(self, sqs_client, sqs_create_queue):
    """Non-printable characters in FIFO dedup or group ids should be rejected."""
    fifo_queue_name = f"queue-{short_uid()}.fifo"
    queue_url = sqs_create_queue(
        QueueName=fifo_queue_name,
        Attributes={"FifoQueue": "true"},
    )
    with pytest.raises(Exception) as e:
        sqs_client.send_message(
            QueueUrl=queue_url,
            MessageBody="test",
            MessageDeduplicationId=f"Invalid-{chr(8)}",
            MessageGroupId="1",
        )
    e.match("InvalidParameterValue")

    with pytest.raises(Exception) as e:
        sqs_client.send_message(
            QueueUrl=queue_url,
            MessageBody="test",
            MessageDeduplicationId="1",
            MessageGroupId=f"Invalid-{chr(8)}",
        )
    e.match("InvalidParameterValue")


def test_send_message_with_invalid_payload_characters(self, sqs_client, sqs_create_queue):
    """A message body with disallowed characters should raise InvalidMessageContents."""
    queue_name = f"queue-{short_uid()}"
    queue_url = sqs_create_queue(QueueName=queue_name)
    invalid_message_body = f"Invalid-{chr(0)}-{chr(8)}-{chr(19)}-{chr(65535)}"

    with pytest.raises(Exception) as e:
        sqs_client.send_message(QueueUrl=queue_url, MessageBody=invalid_message_body)
    e.match("InvalidMessageContents")


def test_dead_letter_queue_config(self, sqs_client, sqs_create_queue):
    """A queue can be created with a RedrivePolicy pointing at an existing queue."""
    queue_name = f"queue-{short_uid()}"
    dead_letter_queue_name = f"dead_letter_queue-{short_uid()}"

    dl_queue_url = sqs_create_queue(QueueName=dead_letter_queue_name)
    # the ARN is built from the last two URL path segments (account id, queue name)
    url_parts = dl_queue_url.split("/")
    region = get_region()
    dl_target_arn = "arn:aws:sqs:{}:{}:{}".format(region, url_parts[-2], url_parts[-1])

    conf = {"deadLetterTargetArn": dl_target_arn, "maxReceiveCount": 50}
    attributes = {"RedrivePolicy": json.dumps(conf)}

    queue_url = sqs_create_queue(QueueName=queue_name, Attributes=attributes)

    assert queue_url


def test_dead_letter_queue_execution(
    self, sqs_client, sqs_create_queue, lambda_client, create_lambda_function
):
    """A message whose Lambda consumer raises should end up in the dead-letter queue."""
    # TODO: lambda creation does not work when testing against AWS
    queue_name = f"queue-{short_uid()}"
    dead_letter_queue_name = f"dl-queue-{short_uid()}"
    dl_queue_url = sqs_create_queue(QueueName=dead_letter_queue_name)

    # create arn
    url_parts = dl_queue_url.split("/")
    region = os.environ.get("AWS_DEFAULT_REGION") or TEST_REGION
    dl_target_arn = "arn:aws:sqs:{}:{}:{}".format(region, url_parts[-2], url_parts[-1])

    policy = {"deadLetterTargetArn": dl_target_arn, "maxReceiveCount": 1}
    queue_url = sqs_create_queue(
        QueueName=queue_name, Attributes={"RedrivePolicy": json.dumps(policy)}
    )

    lambda_name = f"lambda-{short_uid()}"
    create_lambda_function(
        func_name=lambda_name,
        libs=TEST_LAMBDA_LIBS,
        handler_file=TEST_LAMBDA_PYTHON,
        runtime=LAMBDA_RUNTIME_PYTHON36,
    )
    # create arn
    url_parts = queue_url.split("/")
    queue_arn = "arn:aws:sqs:{}:{}:{}".format(region, url_parts[-2], url_parts[-1])
    lambda_client.create_event_source_mapping(
        EventSourceArn=queue_arn, FunctionName=lambda_name
    )

    # add message to SQS, which will trigger the Lambda, resulting in an error
    payload = {lambda_integration.MSG_BODY_RAISE_ERROR_FLAG: 1}
    sqs_client.send_message(QueueUrl=queue_url, MessageBody=json.dumps(payload))

    assert poll_condition(
        lambda: "Messages"
        in sqs_client.receive_message(QueueUrl=dl_queue_url, VisibilityTimeout=0),
        10.0,
        1.0,
    )
    result_recv = sqs_client.receive_message(QueueUrl=dl_queue_url, VisibilityTimeout=0)
    assert result_recv["Messages"][0]["Body"] == json.dumps(payload)


def test_dead_letter_queue_max_receive_count(self, sqs_client, sqs_create_queue):
    """With maxReceiveCount=1 a message moves to the DLQ after its first receive."""
    queue_name = f"queue-{short_uid()}"
    dead_letter_queue_name = f"dl-queue-{short_uid()}"
    dl_queue_url = sqs_create_queue(
        QueueName=dead_letter_queue_name, Attributes={"VisibilityTimeout": "0"}
    )

    # create arn
    url_parts = dl_queue_url.split("/")
    dl_target_arn = aws_stack.sqs_queue_arn(url_parts[-1], account_id=url_parts[-2])

    policy = {"deadLetterTargetArn": dl_target_arn, "maxReceiveCount": 1}
    queue_url = sqs_create_queue(
        QueueName=queue_name,
        Attributes={"RedrivePolicy": json.dumps(policy), "VisibilityTimeout": "0"},
    )
    result_send = sqs_client.send_message(QueueUrl=queue_url, MessageBody="test")

    result_recv1_messages = sqs_client.receive_message(QueueUrl=queue_url).get("Messages")
    result_recv2_messages = sqs_client.receive_message(QueueUrl=queue_url).get("Messages")
    # only one request received a message
    assert (result_recv1_messages is None) != (result_recv2_messages is None)

    assert poll_condition(
        lambda: "Messages" in sqs_client.receive_message(QueueUrl=dl_queue_url), 5.0, 1.0
    )
    assert (
        sqs_client.receive_message(QueueUrl=dl_queue_url)["Messages"][0]["MessageId"]
        == result_send["MessageId"]
    )


@pytest.mark.skipif(
    os.environ.get("PROVIDER_OVERRIDE_SQS") != "asf", reason="Currently fails for moto provider"
)
def test_dead_letter_queue_chain(self, sqs_client, sqs_create_queue):
    # test a chain of 3 queues, with DLQ flow q1 -> q2 -> q3

    # create queues
    queue_names = [f"q-{short_uid()}", f"q-{short_uid()}", f"q-{short_uid()}"]
    for queue_name in queue_names:
        sqs_create_queue(QueueName=queue_name, Attributes={"VisibilityTimeout": "0"})
    queue_urls = [aws_stack.get_sqs_queue_url(queue_name) for queue_name in queue_names]

    # set redrive policies
    for idx, queue_name in enumerate(queue_names[:2]):
        policy = {
            "deadLetterTargetArn": aws_stack.sqs_queue_arn(queue_names[idx + 1]),
            "maxReceiveCount": 1,
        }
        sqs_client.set_queue_attributes(
            QueueUrl=queue_urls[idx],
            Attributes={"RedrivePolicy": json.dumps(policy), "VisibilityTimeout": "0"},
        )

    def _retry_receive(q_url):
        def _receive():
            _result = sqs_client.receive_message(QueueUrl=q_url)
            assert _result.get("Messages")
            return _result

        return retry(_receive, sleep=1, retries=5)

    # send message
    result = sqs_client.send_message(QueueUrl=queue_urls[0], MessageBody="test")
    # retrieve message from q1
    result = _retry_receive(queue_urls[0])
    assert len(result.get("Messages")) == 1
    # Wait for VisibilityTimeout to expire
    time.sleep(1.1)
    # retrieve message from q1 again -> no message, should go to DLQ q2
    # NOTE: the excerpt ends here in the middle of a statement (``result = ...``);
    # the remainder of this test is outside the visible source and is left untouched.
sqs_client.receive_message(QueueUrl=queue_urls[0])883 assert not result.get("Messages")884 # retrieve message from q2885 result = _retry_receive(queue_urls[1])886 assert len(result.get("Messages")) == 1887 # retrieve message from q2 again -> no message, should go to DLQ q3888 result = sqs_client.receive_message(QueueUrl=queue_urls[1])889 assert not result.get("Messages")890 # retrieve message from q3891 result = _retry_receive(queue_urls[2])892 assert len(result.get("Messages")) == 1893 # TODO: check if test_set_queue_attribute_at_creation == test_create_queue_with_attributes894 def test_get_specific_queue_attribute_response(self, sqs_client, sqs_create_queue):895 queue_name = f"queue-{short_uid()}"896 dead_letter_queue_name = f"dead_letter_queue-{short_uid()}"897 dl_queue_url = sqs_create_queue(QueueName=dead_letter_queue_name)898 region = get_region()899 dl_result = sqs_client.get_queue_attributes(900 QueueUrl=dl_queue_url, AttributeNames=["QueueArn"]901 )902 dl_queue_arn = dl_result["Attributes"]["QueueArn"]903 max_receive_count = 10904 _redrive_policy = {905 "deadLetterTargetArn": dl_queue_arn,906 "maxReceiveCount": max_receive_count,907 }908 message_retention_period = "604800"909 attributes = {910 "MessageRetentionPeriod": message_retention_period,911 "DelaySeconds": "10",912 "RedrivePolicy": json.dumps(_redrive_policy),913 }914 queue_url = sqs_create_queue(QueueName=queue_name, Attributes=attributes)915 url_parts = queue_url.split("/")916 get_two_attributes = sqs_client.get_queue_attributes(917 QueueUrl=queue_url,918 AttributeNames=["MessageRetentionPeriod", "RedrivePolicy"],919 )920 get_single_attribute = sqs_client.get_queue_attributes(921 QueueUrl=queue_url,922 AttributeNames=["QueueArn"],923 )924 # asserts925 constructed_arn = "arn:aws:sqs:{}:{}:{}".format(926 region, url_parts[len(url_parts) - 2], url_parts[-1]927 )928 redrive_policy = json.loads(get_two_attributes.get("Attributes").get("RedrivePolicy"))929 assert message_retention_period == 
get_two_attributes.get("Attributes").get(930 "MessageRetentionPeriod"931 )932 assert constructed_arn == get_single_attribute.get("Attributes").get("QueueArn")933 assert max_receive_count == redrive_policy.get("maxReceiveCount")934 @pytest.mark.xfail935 def test_set_unsupported_attribute_fifo(self, sqs_client, sqs_create_queue):936 # TODO: behaviour diverges from AWS937 queue_name = f"queue-{short_uid()}"938 queue_url = sqs_create_queue(QueueName=queue_name)939 with pytest.raises(Exception) as e:940 sqs_client.set_queue_attributes(QueueUrl=queue_url, Attributes={"FifoQueue": "true"})941 e.match("InvalidAttributeName")942 fifo_queue_name = f"queue-{short_uid()}.fifo"943 fifo_queue_url = sqs_create_queue(944 QueueName=fifo_queue_name, Attributes={"FifoQueue": "true"}945 )946 sqs_client.set_queue_attributes(QueueUrl=fifo_queue_url, Attributes={"FifoQueue": "true"})947 with pytest.raises(Exception) as e:948 sqs_client.set_queue_attributes(949 QueueUrl=fifo_queue_url, Attributes={"FifoQueue": "false"}950 )951 e.match("InvalidAttributeValue")952 def test_fifo_queue_send_multiple_messages_multiple_single_receives(953 self, sqs_client, sqs_create_queue954 ):955 fifo_queue_name = f"queue-{short_uid()}.fifo"956 queue_url = sqs_create_queue(957 QueueName=fifo_queue_name,958 Attributes={"FifoQueue": "true"},959 )960 message_count = 4961 group_id = f"fifo_group-{short_uid()}"962 sent_messages = []963 for i in range(message_count):964 result = sqs_client.send_message(965 QueueUrl=queue_url,966 MessageBody=f"message{i}",967 MessageDeduplicationId=f"deduplication{i}",968 MessageGroupId=group_id,969 )970 sent_messages.append(result)971 for i in range(message_count):972 result = sqs_client.receive_message(QueueUrl=queue_url)973 message = result["Messages"][0]974 assert message["Body"] == f"message{i}"975 assert message["MD5OfBody"] == sent_messages[i]["MD5OfMessageBody"]976 assert message["MessageId"] == sent_messages[i]["MessageId"]977 sqs_client.delete_message(QueueUrl=queue_url, 
ReceiptHandle=message["ReceiptHandle"])978 @pytest.mark.xfail979 def test_disallow_queue_name_with_slashes(self, sqs_client, sqs_create_queue):980 queue_name = f"queue/{short_uid()}/"981 with pytest.raises(Exception) as e:982 sqs_create_queue(QueueName=queue_name)983 e.match("InvalidParameterValue")984 def test_post_list_queues_with_auth_in_presigned_url(self):985 # TODO: does not work when testing against AWS986 method = "post"987 protocol = get_service_protocol()988 # CI might not set EDGE_PORT variables properly989 port = 4566990 if protocol == "https":991 port = 443992 base_url = "{}://{}:{}".format(get_service_protocol(), config.LOCALSTACK_HOSTNAME, port)993 req = AWSRequest(994 method=method,995 url=base_url,996 data={"Action": "ListQueues", "Version": "2012-11-05"},997 )998 # boto doesn't support querystring-style auth, so we have to do some999 # weird logic to use boto's signing functions, to understand what's1000 # going on here look at the internals of the SigV4Auth.add_auth1001 # method.1002 datetime_now = datetime.datetime.utcnow()1003 req.context["timestamp"] = datetime_now.strftime(SIGV4_TIMESTAMP)1004 signer = SigV4Auth(1005 Credentials(TEST_AWS_ACCESS_KEY_ID, TEST_AWS_SECRET_ACCESS_KEY),1006 "sqs",1007 os.environ.get("AWS_DEFAULT_REGION") or TEST_REGION,1008 )1009 canonical_request = signer.canonical_request(req)1010 string_to_sign = signer.string_to_sign(req, canonical_request)1011 payload = {1012 "Action": "ListQueues",1013 "Version": "2012-11-05",1014 "X-Amz-Algorithm": "AWS4-HMAC-SHA256",1015 "X-Amz-Credential": signer.scope(req),1016 "X-Amz-SignedHeaders": ";".join(signer.headers_to_sign(req).keys()),1017 "X-Amz-Signature": signer.signature(string_to_sign, req),1018 }1019 response = requests.post(url=base_url, data=urlencode(payload))1020 assert response.status_code == 2001021 assert b"<ListQueuesResponse" in response.content1022 # FIXME: make this testcase work against the new provider1023 @pytest.mark.xfail1024 def 
test_get_list_queues_with_auth_in_presigned_url(self):1025 # TODO: does not work when testing against AWS1026 method = "get"1027 protocol = get_service_protocol()1028 port = config.EDGE_PORT_HTTP1029 if protocol == "https":1030 port = config.EDGE_PORT1031 base_url = "{}://{}:{}".format(get_service_protocol(), config.LOCALSTACK_HOSTNAME, port)1032 req = AWSRequest(1033 method=method,1034 url=base_url,1035 data={"Action": "ListQueues", "Version": "2012-11-05"},1036 )1037 # boto doesn't support querystring-style auth, so we have to do some1038 # weird logic to use boto's signing functions, to understand what's1039 # going on here look at the internals of the SigV4Auth.add_auth1040 # method.1041 datetime_now = datetime.datetime.utcnow()1042 req.context["timestamp"] = datetime_now.strftime(SIGV4_TIMESTAMP)1043 signer = SigV4Auth(1044 Credentials(TEST_AWS_ACCESS_KEY_ID, TEST_AWS_SECRET_ACCESS_KEY),1045 "sqs",1046 os.environ.get("AWS_DEFAULT_REGION") or TEST_REGION,1047 )1048 canonical_request = signer.canonical_request(req)1049 string_to_sign = signer.string_to_sign(req, canonical_request)1050 payload = {1051 "Action": "ListQueues",1052 "Version": "2012-11-05",1053 "X-Amz-Algorithm": "AWS4-HMAC-SHA256",1054 "X-Amz-Credential": signer.scope(req),1055 "X-Amz-SignedHeaders": ";".join(signer.headers_to_sign(req).keys()),1056 "X-Amz-Signature": signer.signature(string_to_sign, req),1057 }1058 response = requests.get(base_url, params=payload)1059 assert response.status_code == 2001060 assert b"<ListQueuesResponse" in response.content1061 @pytest.mark.xfail1062 def test_system_attributes_have_no_effect_on_attr_md5(self, sqs_create_queue, sqs_client):1063 queue_name = f"queue-{short_uid()}"1064 queue_url = sqs_create_queue(QueueName=queue_name)1065 msg_attrs_provider = {"timestamp": {"StringValue": "1493147359900", "DataType": "Number"}}1066 aws_trace_header = {1067 "AWSTraceHeader": {1068 "StringValue": 
"Root=1-5759e988-bd862e3fe1be46a994272793;Parent=53995c3f42cd8ad8;Sampled=1",1069 "DataType": "String",1070 }1071 }1072 response_send = sqs_client.send_message(1073 QueueUrl=queue_url, MessageBody="test", MessageAttributes=msg_attrs_provider1074 )1075 response_send_system_attr = sqs_client.send_message(1076 QueueUrl=queue_url,1077 MessageBody="test",1078 MessageAttributes=msg_attrs_provider,1079 MessageSystemAttributes=aws_trace_header,1080 )1081 assert (1082 response_send["MD5OfMessageAttributes"]1083 == response_send_system_attr["MD5OfMessageAttributes"]1084 )1085 assert response_send.get("MD5OfMessageSystemAttributes") is None1086 assert (1087 response_send_system_attr.get("MD5OfMessageSystemAttributes")1088 == "5ae4d5d7636402d80f4eb6d213245a88"1089 )1090 def test_inflight_message_requeue(self, sqs_client, sqs_create_queue):1091 visibility_timeout = 3 if os.environ.get("TEST_TARGET") == "AWS_CLOUD" else 21092 queue_name = f"queue-{short_uid()}"1093 queue_url = sqs_create_queue(1094 QueueName=queue_name1095 ) # , Attributes={"VisibilityTimeout": str(visibility_timeout)})1096 sqs_client.send_message(QueueUrl=queue_url, MessageBody="test1")1097 result_receive1 = sqs_client.receive_message(1098 QueueUrl=queue_url, VisibilityTimeout=visibility_timeout1099 )1100 time.sleep(visibility_timeout / 2)1101 sqs_client.send_message(QueueUrl=queue_url, MessageBody="test2")1102 time.sleep(visibility_timeout)1103 result_receive2 = sqs_client.receive_message(1104 QueueUrl=queue_url, VisibilityTimeout=visibility_timeout1105 )1106 assert result_receive1["Messages"][0]["Body"] == result_receive2["Messages"][0]["Body"]1107 @pytest.mark.xfail1108 def test_sequence_number(self, sqs_client, sqs_create_queue):1109 fifo_queue_name = f"queue-{short_uid()}.fifo"1110 fifo_queue_url = sqs_create_queue(1111 QueueName=fifo_queue_name, Attributes={"FifoQueue": "true"}1112 )1113 message_content = f"test{short_uid()}"1114 dedup_id = f"fifo_dedup-{short_uid()}"1115 group_id = 
f"fifo_group-{short_uid()}"1116 send_result_fifo = sqs_client.send_message(1117 QueueUrl=fifo_queue_url,1118 MessageBody=message_content,1119 MessageGroupId=group_id,1120 MessageDeduplicationId=dedup_id,1121 )1122 assert "SequenceNumber" in send_result_fifo.keys()1123 queue_name = f"queue-{short_uid()}"1124 queue_url = sqs_create_queue(QueueName=queue_name)1125 send_result = sqs_client.send_message(QueueUrl=queue_url, MessageBody=message_content)1126 assert "SequenceNumber" not in send_result1127 # Tests of diverging behaviour that was discovered during rewrite1128 @pytest.mark.xfail1129 def test_posting_to_fifo_requires_deduplicationid_group_id(self, sqs_client, sqs_create_queue):1130 fifo_queue_name = f"queue-{short_uid()}.fifo"1131 queue_url = sqs_create_queue(QueueName=fifo_queue_name, Attributes={"FifoQueue": "true"})1132 message_content = f"test{short_uid()}"1133 dedup_id = f"fifo_dedup-{short_uid()}"1134 group_id = f"fifo_group-{short_uid()}"1135 with pytest.raises(Exception) as e:1136 sqs_client.send_message(1137 QueueUrl=queue_url, MessageBody=message_content, MessageGroupId=group_id1138 )1139 e.match("InvalidParameterValue")1140 with pytest.raises(Exception) as e:1141 sqs_client.send_message(1142 QueueUrl=queue_url, MessageBody=message_content, MessageDeduplicationId=dedup_id1143 )1144 e.match("MissingParameter")1145 # TODO: test approximateNumberOfMessages once delayed Messages are properly counted1146 def test_approximate_number_of_messages_delayed(self):1147 pass1148 @pytest.mark.xfail1149 def test_posting_to_queue_via_queue_name(self, sqs_client, sqs_create_queue):1150 # TODO: behaviour diverges from AWS1151 queue_name = f"queue-{short_uid()}"1152 sqs_create_queue(QueueName=queue_name)1153 result_send = sqs_client.send_message(1154 QueueUrl=queue_name, MessageBody="Using name instead of URL"1155 )1156 assert result_send["MD5OfMessageBody"] == "86a83f96652a1bfad3891e7d523750cb"1157 assert result_send["ResponseMetadata"]["HTTPStatusCode"] == 2001158 
@pytest.mark.xfail1159 def test_invalid_string_attributes_cause_invalid_parameter_value_error(1160 self, sqs_client, sqs_create_queue1161 ):1162 queue_name = f"queue-{short_uid()}"1163 queue_url = sqs_create_queue(QueueName=queue_name)1164 invalid_attribute = {1165 "attr1": {"StringValue": f"Invalid-{chr(8)},{chr(11)}", "DataType": "String"}1166 }1167 with pytest.raises(Exception) as e:1168 sqs_client.send_message(1169 QueueUrl=queue_url, MessageBody="test", MessageAttributes=invalid_attribute1170 )1171 e.match("InvalidParameterValue")1172 def test_change_message_visibility_not_permanent(self, sqs_client, sqs_create_queue):1173 queue_name = f"queue-{short_uid()}"1174 queue_url = sqs_create_queue(QueueName=queue_name)1175 sqs_client.send_message(QueueUrl=queue_url, MessageBody="test")1176 result_receive = sqs_client.receive_message(QueueUrl=queue_url)1177 receipt_handle = result_receive.get("Messages")[0]["ReceiptHandle"]1178 sqs_client.change_message_visibility(1179 QueueUrl=queue_url, ReceiptHandle=receipt_handle, VisibilityTimeout=01180 )1181 result_recv_1 = sqs_client.receive_message(QueueUrl=queue_url)1182 result_recv_2 = sqs_client.receive_message(QueueUrl=queue_url)1183 assert (1184 result_recv_1.get("Messages")[0]["MessageId"]1185 == result_receive.get("Messages")[0]["MessageId"]1186 )1187 assert "Messages" not in result_recv_2.keys()1188 @pytest.mark.skip1189 def test_dead_letter_queue_execution_lambda_mapping_preserves_id(1190 self, sqs_client, sqs_create_queue, lambda_client, create_lambda_function1191 ):1192 # TODO: lambda triggered dead letter delivery does not preserve the message id1193 # https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-dead-letter-queues.html1194 queue_name = f"queue-{short_uid()}"1195 dead_letter_queue_name = "dl-queue-{}".format(short_uid())1196 dl_queue_url = sqs_create_queue(QueueName=dead_letter_queue_name)1197 # create arn1198 url_parts = dl_queue_url.split("/")1199 region = get_region()1200 
dl_target_arn = "arn:aws:sqs:{}:{}:{}".format(1201 region, url_parts[len(url_parts) - 2], url_parts[-1]1202 )1203 policy = {"deadLetterTargetArn": dl_target_arn, "maxReceiveCount": 1}1204 queue_url = sqs_create_queue(1205 QueueName=queue_name, Attributes={"RedrivePolicy": json.dumps(policy)}1206 )1207 lambda_name = "lambda-{}".format(short_uid())1208 create_lambda_function(1209 func_name=lambda_name,1210 libs=TEST_LAMBDA_LIBS,1211 handler_file=TEST_LAMBDA_PYTHON,1212 runtime=LAMBDA_RUNTIME_PYTHON36,1213 )1214 # create arn1215 url_parts = queue_url.split("/")1216 queue_arn = "arn:aws:sqs:{}:{}:{}".format(1217 region, url_parts[len(url_parts) - 2], url_parts[-1]1218 )1219 lambda_client.create_event_source_mapping(1220 EventSourceArn=queue_arn, FunctionName=lambda_name1221 )1222 # add message to SQS, which will trigger the Lambda, resulting in an error1223 payload = {lambda_integration.MSG_BODY_RAISE_ERROR_FLAG: 1}1224 result_send = sqs_client.send_message(QueueUrl=queue_url, MessageBody=json.dumps(payload))1225 assert poll_condition(1226 lambda: "Messages"1227 in sqs_client.receive_message(QueueUrl=dl_queue_url, VisibilityTimeout=0),1228 5.0,1229 1.0,1230 )1231 result_recv = sqs_client.receive_message(QueueUrl=dl_queue_url, VisibilityTimeout=0)1232 assert result_recv["Messages"][0]["MessageId"] == result_send["MessageId"]1233 # verification of community posted issue1234 # FIXME: \r gets lost1235 @pytest.mark.skip1236 def test_message_with_carriage_return(self, sqs_client, sqs_create_queue):1237 queue_name = f"queue-{short_uid()}"1238 queue_url = sqs_create_queue(QueueName=queue_name)1239 message_content = "{\r\n" + '"machineID" : "d357006e26ff47439e1ef894225d4307"' + "}"1240 result_send = sqs_client.send_message(QueueUrl=queue_url, MessageBody=message_content)1241 result_receive = sqs_client.receive_message(QueueUrl=queue_url)1242 assert result_send["MD5OfMessageBody"] == result_receive["Messages"][0]["MD5OfBody"]1243 assert message_content == 
result_receive["Messages"][0]["Body"]1244 def test_purge_queue(self, sqs_client, sqs_create_queue):1245 queue_name = f"queue-{short_uid()}"1246 queue_url = sqs_create_queue(QueueName=queue_name)1247 for i in range(3):1248 message_content = f"test-{i}"1249 sqs_client.send_message(QueueUrl=queue_url, MessageBody=message_content)1250 approx_nr_of_messages = sqs_client.get_queue_attributes(1251 QueueUrl=queue_url, AttributeNames=["ApproximateNumberOfMessages"]1252 )1253 assert int(approx_nr_of_messages["Attributes"]["ApproximateNumberOfMessages"]) > 11254 sqs_client.purge_queue(QueueUrl=queue_url)1255 receive_result = sqs_client.receive_message(QueueUrl=queue_url)1256 assert "Messages" not in receive_result.keys()1257 def test_remove_message_with_old_receipt_handle(self, sqs_client, sqs_create_queue):1258 queue_name = f"queue-{short_uid()}"1259 queue_url = sqs_create_queue(QueueName=queue_name)1260 sqs_client.send_message(QueueUrl=queue_url, MessageBody="test")1261 result_receive = sqs_client.receive_message(QueueUrl=queue_url, VisibilityTimeout=1)1262 time.sleep(2)1263 receipt_handle = result_receive["Messages"][0]["ReceiptHandle"]1264 sqs_client.delete_message(QueueUrl=queue_url, ReceiptHandle=receipt_handle)1265 # This is more suited to the check than receiving because it simply1266 # returns the number of elements in the queue, without further logic1267 approx_nr_of_messages = sqs_client.get_queue_attributes(1268 QueueUrl=queue_url, AttributeNames=["ApproximateNumberOfMessages"]1269 )1270 assert int(approx_nr_of_messages["Attributes"]["ApproximateNumberOfMessages"]) == 01271 @pytest.mark.skip(1272 reason="this is an AWS behaviour test that requires 5 minutes to run. 
Only execute manually"1273 )1274 def test_deduplication_interval(self, sqs_client, sqs_create_queue):1275 # TODO: AWS behaviour here "seems" inconsistent -> current code might need adaption1276 fifo_queue_name = f"queue-{short_uid()}.fifo"1277 queue_url = sqs_create_queue(QueueName=fifo_queue_name, Attributes={"FifoQueue": "true"})1278 message_content = f"test{short_uid()}"1279 message_content_duplicate = f"{message_content}-duplicate"1280 message_content_half_time = f"{message_content}-half_time"1281 dedup_id = f"fifo_dedup-{short_uid()}"1282 group_id = f"fifo_group-{short_uid()}"1283 result_send = sqs_client.send_message(1284 QueueUrl=queue_url,1285 MessageBody=message_content,1286 MessageGroupId=group_id,1287 MessageDeduplicationId=dedup_id,1288 )1289 time.sleep(3)1290 sqs_client.send_message(1291 QueueUrl=queue_url,1292 MessageBody=message_content_duplicate,1293 MessageGroupId=group_id,1294 MessageDeduplicationId=dedup_id,1295 )1296 result_receive = sqs_client.receive_message(QueueUrl=queue_url)1297 sqs_client.delete_message(1298 QueueUrl=queue_url, ReceiptHandle=result_receive["Messages"][0]["ReceiptHandle"]1299 )1300 result_receive_duplicate = sqs_client.receive_message(QueueUrl=queue_url)1301 assert result_send.get("MessageId") == result_receive.get("Messages")[0].get("MessageId")1302 assert result_send.get("MD5OfMessageBody") == result_receive.get("Messages")[0].get(1303 "MD5OfBody"1304 )1305 assert "Messages" not in result_receive_duplicate.keys()1306 result_send = sqs_client.send_message(1307 QueueUrl=queue_url,1308 MessageBody=message_content,1309 MessageGroupId=group_id,1310 MessageDeduplicationId=dedup_id,1311 )1312 # ZZZZzzz...1313 # Fifo Deduplication Interval is 5 minutes at minimum, + there seems no way to change it.1314 # We give it a bit of leeway to avoid timing issues1315 # https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/using-messagededuplicationid-property.html1316 time.sleep(2)1317 sqs_client.send_message(1318 
QueueUrl=queue_url,1319 MessageBody=message_content_half_time,1320 MessageGroupId=group_id,1321 MessageDeduplicationId=dedup_id,1322 )1323 time.sleep(6 * 60)1324 result_send_duplicate = sqs_client.send_message(1325 QueueUrl=queue_url,1326 MessageBody=message_content_duplicate,1327 MessageGroupId=group_id,1328 MessageDeduplicationId=dedup_id,1329 )1330 result_receive = sqs_client.receive_message(QueueUrl=queue_url)1331 sqs_client.delete_message(1332 QueueUrl=queue_url, ReceiptHandle=result_receive["Messages"][0]["ReceiptHandle"]1333 )1334 result_receive_duplicate = sqs_client.receive_message(QueueUrl=queue_url)1335 assert result_send.get("MessageId") == result_receive.get("Messages")[0].get("MessageId")1336 assert result_send.get("MD5OfMessageBody") == result_receive.get("Messages")[0].get(1337 "MD5OfBody"1338 )1339 assert result_send_duplicate.get("MessageId") == result_receive_duplicate.get("Messages")[1340 01341 ].get("MessageId")1342 assert result_send_duplicate.get("MD5OfMessageBody") == result_receive_duplicate.get(1343 "Messages"1344 )[0].get("MD5OfBody")1345 @pytest.mark.skipif(1346 os.environ.get("PROVIDER_OVERRIDE_SQS") != "asf",1347 reason="New provider test which isn't covered by old one",1348 )1349 def test_sse_attributes_are_accepted(self, sqs_client, sqs_create_queue):1350 queue_name = f"queue-{short_uid()}"1351 queue_url = sqs_create_queue(QueueName=queue_name)1352 attributes = {1353 "KmsMasterKeyId": "testKeyId",1354 "KmsDataKeyReusePeriodSeconds": "6000",1355 "SqsManagedSseEnabled": "true",1356 }1357 sqs_client.set_queue_attributes(QueueUrl=queue_url, Attributes=attributes)1358 result_attributes = sqs_client.get_queue_attributes(1359 QueueUrl=queue_url, AttributeNames=["All"]1360 )["Attributes"]1361 keys = result_attributes.keys()1362 for k in attributes.keys():1363 assert k in keys1364 assert attributes[k] == result_attributes[k]1365def get_region():...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful