Best Python code snippet using localstack_python
test_sqs.py
Source:test_sqs.py  
...50    def test_get_queue_url_contains_request_host(self, sqs_client, sqs_create_queue):51        if config.SERVICE_PROVIDER_CONFIG.get_provider("sqs") != "asf":52            pytest.xfail("this test only works for the ASF provider")53        queue_name = "test-queue-" + short_uid()54        sqs_create_queue(QueueName=queue_name)55        queue_url = sqs_client.get_queue_url(QueueName=queue_name)["QueueUrl"]56        account_id = constants.TEST_AWS_ACCOUNT_ID57        host = f"http://localhost:{config.EDGE_PORT}"58        # our current queue pattern looks like this, but may change going forward, or may be configurable59        assert queue_url == f"{host}/{account_id}/{queue_name}"60        # attempt to connect through a different host and make sure the URL contains that host61        host = f"http://127.0.0.1:{config.EDGE_PORT}"62        client = aws_stack.connect_to_service("sqs", endpoint_url=host)63        queue_url = client.get_queue_url(QueueName=queue_name)["QueueUrl"]64        assert queue_url == f"{host}/{account_id}/{queue_name}"65    def test_list_queues(self, sqs_client, sqs_create_queue):66        queue_names = [67            "a-test-queue-" + short_uid(),68            "a-test-queue-" + short_uid(),69            "b-test-queue-" + short_uid(),70        ]71        # create three queues with prefixes and collect their urls72        queue_urls = []73        for name in queue_names:74            sqs_create_queue(QueueName=name)75            queue_url = sqs_client.get_queue_url(QueueName=name)["QueueUrl"]76            assert queue_url.endswith(name)77            queue_urls.append(queue_url)78        # list queues with first prefix79        result = sqs_client.list_queues(QueueNamePrefix="a-test-queue-")80        assert "QueueUrls" in result81        assert len(result["QueueUrls"]) == 282        assert queue_urls[0] in result["QueueUrls"]83        assert queue_urls[1] in result["QueueUrls"]84        assert queue_urls[2] not in result["QueueUrls"]85        # 
list queues with second prefix86        result = sqs_client.list_queues(QueueNamePrefix="b-test-queue-")87        assert "QueueUrls" in result88        assert len(result["QueueUrls"]) == 189        assert queue_urls[0] not in result["QueueUrls"]90        assert queue_urls[1] not in result["QueueUrls"]91        assert queue_urls[2] in result["QueueUrls"]92        # list queues regardless of prefix prefix93        result = sqs_client.list_queues()94        assert "QueueUrls" in result95        for url in queue_urls:96            assert url in result["QueueUrls"]97    def test_create_queue_and_get_attributes(self, sqs_client, sqs_queue):98        result = sqs_client.get_queue_attributes(99            QueueUrl=sqs_queue, AttributeNames=["QueueArn", "CreatedTimestamp", "VisibilityTimeout"]100        )101        assert "Attributes" in result102        attrs = result["Attributes"]103        assert len(attrs) == 3104        assert "test-queue-" in attrs["QueueArn"]105        assert int(float(attrs["CreatedTimestamp"])) == pytest.approx(int(time.time()), 30)106        assert int(attrs["VisibilityTimeout"]) == 30, "visibility timeout is not the default value"107    def test_send_receive_message(self, sqs_client, sqs_queue):108        send_result = sqs_client.send_message(QueueUrl=sqs_queue, MessageBody="message")109        assert send_result["MessageId"]110        assert send_result["MD5OfMessageBody"] == "78e731027d8fd50ed642340b7c9a63b3"111        # TODO: other attributes112        receive_result = sqs_client.receive_message(QueueUrl=sqs_queue)113        assert len(receive_result["Messages"]) == 1114        message = receive_result["Messages"][0]115        assert message["ReceiptHandle"]116        assert message["Body"] == "message"117        assert message["MessageId"] == send_result["MessageId"]118        assert message["MD5OfBody"] == send_result["MD5OfMessageBody"]119    def test_send_receive_message_multiple_queues(self, sqs_client, sqs_create_queue):120        queue0 
= sqs_create_queue()121        queue1 = sqs_create_queue()122        sqs_client.send_message(QueueUrl=queue0, MessageBody="message")123        result = sqs_client.receive_message(QueueUrl=queue1)124        assert "Messages" not in result125        result = sqs_client.receive_message(QueueUrl=queue0)126        assert len(result["Messages"]) == 1127        assert result["Messages"][0]["Body"] == "message"128    def test_send_message_batch(self, sqs_client, sqs_queue):129        sqs_client.send_message_batch(130            QueueUrl=sqs_queue,131            Entries=[132                {"Id": "1", "MessageBody": "message-0"},133                {"Id": "2", "MessageBody": "message-1"},134            ],135        )136        response0 = sqs_client.receive_message(QueueUrl=sqs_queue)137        response1 = sqs_client.receive_message(QueueUrl=sqs_queue)138        response2 = sqs_client.receive_message(QueueUrl=sqs_queue)139        assert len(response0.get("Messages", [])) == 1140        assert len(response1.get("Messages", [])) == 1141        assert len(response2.get("Messages", [])) == 0142        message0 = response0["Messages"][0]143        message1 = response1["Messages"][0]144        assert message0["Body"] == "message-0"145        assert message1["Body"] == "message-1"146    def test_send_batch_receive_multiple(self, sqs_client, sqs_queue):147        # send a batch, then a single message, then receive them148        # Important: AWS does not guarantee the order of messages, be it within the batch or between sends149        message_count = 3150        sqs_client.send_message_batch(151            QueueUrl=sqs_queue,152            Entries=[153                {"Id": "1", "MessageBody": "message-0"},154                {"Id": "2", "MessageBody": "message-1"},155            ],156        )157        sqs_client.send_message(QueueUrl=sqs_queue, MessageBody="message-2")158        i = 0159        result_recv = {"Messages": []}160        while len(result_recv["Messages"]) < 
message_count and i < message_count:161            result_recv["Messages"] = result_recv["Messages"] + (162                sqs_client.receive_message(163                    QueueUrl=sqs_queue, MaxNumberOfMessages=message_count164                ).get("Messages")165            )166            i += 1167        assert len(result_recv["Messages"]) == message_count168        assert set(result_recv["Messages"][b]["Body"] for b in range(message_count)) == set(169            f"message-{b}" for b in range(message_count)170        )171    def test_send_message_batch_with_empty_list(self, sqs_client, sqs_create_queue):172        queue_url = sqs_create_queue()173        try:174            sqs_client.send_message_batch(QueueUrl=queue_url, Entries=[])175        except ClientError as e:176            assert "EmptyBatchRequest" in e.response["Error"]["Code"]177            assert e.response["ResponseMetadata"]["HTTPStatusCode"] in [400, 404]178    def test_tag_untag_queue(self, sqs_client, sqs_create_queue):179        queue_url = sqs_create_queue()180        # tag queue181        tags = {"tag1": "value1", "tag2": "value2", "tag3": ""}182        sqs_client.tag_queue(QueueUrl=queue_url, Tags=tags)183        # check queue tags184        response = sqs_client.list_queue_tags(QueueUrl=queue_url)185        assert response["Tags"] == tags186        # remove tag1 and tag3187        sqs_client.untag_queue(QueueUrl=queue_url, TagKeys=["tag1", "tag3"])188        response = sqs_client.list_queue_tags(QueueUrl=queue_url)189        assert response["Tags"] == {"tag2": "value2"}190        # remove tag2191        sqs_client.untag_queue(QueueUrl=queue_url, TagKeys=["tag2"])192        response = sqs_client.list_queue_tags(QueueUrl=queue_url)193        assert "Tags" not in response194    def test_tags_case_sensitive(self, sqs_client, sqs_create_queue):195        queue_url = sqs_create_queue()196        # tag queue197        tags = {"MyTag": "value1", "mytag": "value2"}198        
sqs_client.tag_queue(QueueUrl=queue_url, Tags=tags)199        response = sqs_client.list_queue_tags(QueueUrl=queue_url)200        assert response["Tags"] == tags201    def test_untag_queue_ignores_non_existing_tag(self, sqs_client, sqs_create_queue):202        queue_url = sqs_create_queue()203        # tag queue204        tags = {"tag1": "value1", "tag2": "value2"}205        sqs_client.tag_queue(QueueUrl=queue_url, Tags=tags)206        # remove tags207        sqs_client.untag_queue(QueueUrl=queue_url, TagKeys=["tag1", "tag3"])208        response = sqs_client.list_queue_tags(QueueUrl=queue_url)209        assert response["Tags"] == {"tag2": "value2"}210    def test_tag_queue_overwrites_existing_tag(self, sqs_client, sqs_create_queue):211        queue_url = sqs_create_queue()212        # tag queue213        tags = {"tag1": "value1", "tag2": "value2"}214        sqs_client.tag_queue(QueueUrl=queue_url, Tags=tags)215        # overwrite tags216        tags = {"tag1": "VALUE1", "tag3": "value3"}217        sqs_client.tag_queue(QueueUrl=queue_url, Tags=tags)218        response = sqs_client.list_queue_tags(QueueUrl=queue_url)219        assert response["Tags"] == {"tag1": "VALUE1", "tag2": "value2", "tag3": "value3"}220    def test_create_queue_with_tags(self, sqs_client, sqs_create_queue):221        tags = {"tag1": "value1", "tag2": "value2"}222        queue_url = sqs_create_queue(tags=tags)223        response = sqs_client.list_queue_tags(QueueUrl=queue_url)224        assert response["Tags"] == tags225    def test_create_queue_with_attributes(self, sqs_client, sqs_create_queue):226        attributes = {227            "MessageRetentionPeriod": "604800",  # Unsupported by ElasticMq, should be saved in memory228            "ReceiveMessageWaitTimeSeconds": "10",229            "VisibilityTimeout": "20",230        }231        queue_url = sqs_create_queue(Attributes=attributes)232        attrs = sqs_client.get_queue_attributes(QueueUrl=queue_url, AttributeNames=["All"])[233          
  "Attributes"234        ]235        assert attrs["MessageRetentionPeriod"] == "604800"236        assert attrs["VisibilityTimeout"] == "20"237        assert attrs["ReceiveMessageWaitTimeSeconds"] == "10"238    def test_send_delay_and_wait_time(self, sqs_client, sqs_queue):239        sqs_client.send_message(QueueUrl=sqs_queue, MessageBody="foobar", DelaySeconds=1)240        result = sqs_client.receive_message(QueueUrl=sqs_queue)241        assert "Messages" not in result242        result = sqs_client.receive_message(QueueUrl=sqs_queue, WaitTimeSeconds=2)243        assert "Messages" in result244        assert len(result["Messages"]) == 1245    def test_receive_after_visibility_timeout(self, sqs_client, sqs_create_queue):246        queue_url = sqs_create_queue(Attributes={"VisibilityTimeout": "1"})247        sqs_client.send_message(QueueUrl=queue_url, MessageBody="foobar")248        # receive the message249        result = sqs_client.receive_message(QueueUrl=queue_url)250        assert "Messages" in result251        message_receipt_0 = result["Messages"][0]252        # message should be within the visibility timeout253        result = sqs_client.receive_message(QueueUrl=queue_url)254        assert "Messages" not in result255        # visibility timeout should have expired256        result = sqs_client.receive_message(QueueUrl=queue_url, WaitTimeSeconds=2)257        assert "Messages" in result258        message_receipt_1 = result["Messages"][0]259        assert (260            message_receipt_0["ReceiptHandle"] != message_receipt_1["ReceiptHandle"]261        ), "receipt handles should be different"262    def test_receive_terminate_visibility_timeout(self, sqs_client, sqs_queue):263        queue_url = sqs_queue264        sqs_client.send_message(QueueUrl=queue_url, MessageBody="foobar")265        result = sqs_client.receive_message(QueueUrl=queue_url, VisibilityTimeout=0)266        assert "Messages" in result267        message_receipt_0 = result["Messages"][0]268        
result = sqs_client.receive_message(QueueUrl=queue_url)269        assert "Messages" in result270        message_receipt_1 = result["Messages"][0]271        assert (272            message_receipt_0["ReceiptHandle"] != message_receipt_1["ReceiptHandle"]273        ), "receipt handles should be different"274        # TODO: check if this is correct (whether receive with VisibilityTimeout = 0 is permanent)275        result = sqs_client.receive_message(QueueUrl=queue_url)276        assert "Messages" not in result277    def test_delete_message_batch_from_lambda(278        self, sqs_client, sqs_create_queue, lambda_client, create_lambda_function279    ):280        # issue 3671 - not recreatable281        # TODO: lambda creation does not work when testing against AWS282        queue_name = f"queue-{short_uid()}"283        queue_url = sqs_create_queue(QueueName=queue_name)284        lambda_name = f"lambda-{short_uid()}"285        create_lambda_function(286            func_name=lambda_name,287            libs=TEST_LAMBDA_LIBS,288            handler_file=TEST_LAMBDA_PYTHON,289            runtime=LAMBDA_RUNTIME_PYTHON36,290        )291        delete_batch_payload = {lambda_integration.MSG_BODY_DELETE_BATCH: queue_url}292        batch = []293        for i in range(4):294            batch.append({"Id": str(i), "MessageBody": str(i)})295        sqs_client.send_message_batch(QueueUrl=queue_url, Entries=batch)296        lambda_client.invoke(297            FunctionName=lambda_name, Payload=json.dumps(delete_batch_payload), LogType="Tail"298        )299        receive_result = sqs_client.receive_message(QueueUrl=queue_url)300        assert "Messages" not in receive_result.keys()301    def test_invalid_receipt_handle_should_return_error_message(self, sqs_client, sqs_create_queue):302        # issue 3619303        queue_name = "queue_3619_" + short_uid()304        queue_url = sqs_create_queue(QueueName=queue_name)305        with pytest.raises(Exception) as e:306            
sqs_client.change_message_visibility(307                QueueUrl=queue_url, ReceiptHandle="INVALID", VisibilityTimeout=60308            )309        e.match("ReceiptHandleIsInvalid")310    def test_message_with_attributes_should_be_enqueued(self, sqs_client, sqs_create_queue):311        # issue 3737312        queue_name = "queue_3737_" + short_uid()313        queue_url = sqs_create_queue(QueueName=queue_name)314        assert queue_url.endswith(queue_name)315        message_body = "test"316        timestamp_attribute = {"DataType": "Number", "StringValue": "1614717034367"}317        message_attributes = {"timestamp": timestamp_attribute}318        response_send = sqs_client.send_message(319            QueueUrl=queue_url, MessageBody=message_body, MessageAttributes=message_attributes320        )321        response_receive = sqs_client.receive_message(QueueUrl=queue_url)322        assert response_receive["Messages"][0]["MessageId"] == response_send["MessageId"]323    @pytest.mark.xfail324    def test_batch_send_with_invalid_char_should_succeed(self, sqs_client, sqs_create_queue):325        # issue 4135326        queue_name = "queue_4135_" + short_uid()327        queue_url = sqs_create_queue(QueueName=queue_name)328        batch = []329        for i in range(0, 9):330            batch.append({"Id": str(i), "MessageBody": str(i)})331        batch.append({"Id": "9", "MessageBody": "\x01"})332        result_send = sqs_client.send_message_batch(QueueUrl=queue_url, Entries=batch)333        assert len(result_send["Failed"]) == 1334    @only_localstack335    def test_external_hostname(self, monkeypatch, sqs_client, sqs_create_queue):336        external_host = "external-host"337        external_port = "12345"338        monkeypatch.setattr(config, "SQS_PORT_EXTERNAL", external_port)339        monkeypatch.setattr(config, "HOSTNAME_EXTERNAL", external_host)340        # TODO: remove once the old provider is discontinued341        from localstack.services.sqs import sqs_listener as 
old_sqs_listener342        monkeypatch.setattr(old_sqs_listener, "SQS_PORT_EXTERNAL", external_port)343        queue_name = f"queue-{short_uid()}"344        queue_url = sqs_create_queue(QueueName=queue_name)345        assert f"{external_host}:{external_port}" in queue_url346        message_body = "external_host_test"347        sqs_client.send_message(QueueUrl=queue_url, MessageBody=message_body)348        receive_result = sqs_client.receive_message(QueueUrl=queue_url)349        assert receive_result["Messages"][0]["Body"] == message_body350    @only_localstack351    def test_external_hostname_via_host_header(self, sqs_create_queue):352        """test making a request with a different external hostname/port being returned"""353        queue_name = f"queue-{short_uid()}"354        sqs_create_queue(QueueName=queue_name)355        edge_url = config.get_edge_url()356        headers = aws_stack.mock_aws_request_headers("sqs")357        payload = f"Action=GetQueueUrl&QueueName={queue_name}"358        # assert regular/default queue URL is returned359        url = f"{edge_url}"360        result = requests.post(url, data=payload, headers=headers)361        assert result362        content = to_str(result.content)363        kwargs = {"flags": re.MULTILINE | re.DOTALL}364        assert re.match(rf".*<QueueUrl>\s*{edge_url}/[^<]+</QueueUrl>.*", content, **kwargs)365        # assert custom port is returned in queue URL366        port = 12345367        headers["Host"] = f"local-test-host:{port}"368        result = requests.post(url, data=payload, headers=headers)369        assert result370        content = to_str(result.content)371        # TODO: currently only asserting that the port matches - potentially should also return the custom hostname?372        assert re.match(rf".*<QueueUrl>\s*http://[^:]+:{port}[^<]+</QueueUrl>.*", content, **kwargs)373    @only_localstack374    @pytest.mark.xfail375    def test_external_host_via_header_complete_message_lifecycle(self, 
monkeypatch):376        queue_name = f"queue-{short_uid()}"377        edge_url = config.get_edge_url()378        headers = aws_stack.mock_aws_request_headers("sqs")379        port = 12345380        hostname = "aws-local"381        url = f"{hostname}:{port}"382        headers["Host"] = url383        payload = f"Action=CreateQueue&QueueName={queue_name}"384        result = requests.post(edge_url, data=payload, headers=headers)385        assert result.status_code == 200386        assert url in result.text387        queue_url = f"http://{url}/{constants.TEST_AWS_ACCOUNT_ID}/{queue_name}"388        message_body = f"test message {short_uid()}"389        payload = f"Action=SendMessage&QueueUrl={queue_url}&MessageBody={message_body}"390        result = requests.post(edge_url, data=payload, headers=headers)391        assert result.status_code == 200392        assert "MD5" in result.text393        payload = f"Action=ReceiveMessage&QueueUrl={queue_url}&VisibilityTimeout=0"394        result = requests.post(edge_url, data=payload, headers=headers)395        assert result.status_code == 200396        assert message_body in result.text397        # the customer said that he used to be able to access it via "127.0.0.1" instead of "aws-local" as well398        queue_url = f"http://127.0.0.1/{constants.TEST_AWS_ACCOUNT_ID}/{queue_name}"399        payload = f"Action=SendMessage&QueueUrl={queue_url}&MessageBody={message_body}"400        result = requests.post(edge_url, data=payload, headers=headers)401        assert result.status_code == 200402        assert "MD5" in result.text403        queue_url = f"http://127.0.0.1/{constants.TEST_AWS_ACCOUNT_ID}/{queue_name}"404        payload = f"Action=ReceiveMessage&QueueUrl={queue_url}&VisibilityTimeout=0"405        result = requests.post(edge_url, data=payload, headers=headers)406        assert result.status_code == 200407        assert message_body in result.text408    @pytest.mark.xfail409    def 
test_fifo_messages_in_order_after_timeout(self, sqs_client, sqs_create_queue):410        # issue 4287411        queue_name = f"queue-{short_uid()}.fifo"412        timeout = 1413        attributes = {"FifoQueue": "true", "VisibilityTimeout": f"{timeout}"}414        queue_url = sqs_create_queue(QueueName=queue_name, Attributes=attributes)415        for i in range(3):416            sqs_client.send_message(417                QueueUrl=queue_url,418                MessageBody=f"message-{i}",419                MessageGroupId="1",420                MessageDeduplicationId=f"{i}",421            )422        def receive_and_check_order():423            result_receive = sqs_client.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=10)424            for j in range(3):425                assert result_receive["Messages"][j]["Body"] == f"message-{j}"426        receive_and_check_order()427        time.sleep(timeout + 1)428        receive_and_check_order()429    def test_list_queue_tags(self, sqs_client, sqs_create_queue):430        queue_name = f"queue-{short_uid()}"431        queue_url = sqs_create_queue(QueueName=queue_name)432        tags = {"testTag1": "test1", "testTag2": "test2"}433        sqs_client.tag_queue(QueueUrl=queue_url, Tags=tags)434        tag_list = sqs_client.list_queue_tags(QueueUrl=queue_url)435        assert tags == tag_list["Tags"]436    def test_queue_list_nonexistent_tags(self, sqs_client, sqs_create_queue):437        queue_name = f"queue-{short_uid()}"438        queue_url = sqs_create_queue(QueueName=queue_name)439        tag_list = sqs_client.list_queue_tags(QueueUrl=queue_url)440        assert "Tags" not in tag_list["ResponseMetadata"].keys()441    def test_publish_get_delete_message(self, sqs_client, sqs_create_queue):442        # visibility part handled by test_receive_terminate_visibility_timeout443        queue_name = f"queue-{short_uid()}"444        queue_url = sqs_create_queue(QueueName=queue_name)445        message_body = "test message"446     
   result_send = sqs_client.send_message(QueueUrl=queue_url, MessageBody=message_body)447        result_recv = sqs_client.receive_message(QueueUrl=queue_url)448        assert result_recv["Messages"][0]["MessageId"] == result_send["MessageId"]449        sqs_client.delete_message(450            QueueUrl=queue_url, ReceiptHandle=result_recv["Messages"][0]["ReceiptHandle"]451        )452        result_recv = sqs_client.receive_message(QueueUrl=queue_url)453        assert "Messages" not in result_recv.keys()454    def test_delete_message_deletes_with_change_visibility_timeout(455        self, sqs_client, sqs_create_queue456    ):457        # Old name: test_delete_message_deletes_visibility_agnostic458        queue_name = f"queue-{short_uid()}"459        queue_url = sqs_create_queue(QueueName=queue_name)460        message_id = sqs_client.send_message(QueueUrl=queue_url, MessageBody="test")["MessageId"]461        result_recv = sqs_client.receive_message(QueueUrl=queue_url)462        result_follow_up = sqs_client.receive_message(QueueUrl=queue_url)463        assert result_recv["Messages"][0]["MessageId"] == message_id464        assert "Messages" not in result_follow_up.keys()465        receipt_handle = result_recv["Messages"][0]["ReceiptHandle"]466        sqs_client.change_message_visibility(467            QueueUrl=queue_url, ReceiptHandle=receipt_handle, VisibilityTimeout=0468        )469        # check if the new timeout enables instant re-receiving, to ensure the message was deleted470        result_recv = sqs_client.receive_message(QueueUrl=queue_url)471        assert result_recv["Messages"][0]["MessageId"] == message_id472        receipt_handle = result_recv["Messages"][0]["ReceiptHandle"]473        sqs_client.delete_message(QueueUrl=queue_url, ReceiptHandle=receipt_handle)474        result_follow_up = sqs_client.receive_message(QueueUrl=queue_url)475        assert "Messages" not in result_follow_up.keys()476    def test_publish_get_delete_message_batch(self, 
sqs_client, sqs_create_queue):477        message_count = 10478        queue_name = f"queue-{short_uid()}"479        queue_url = sqs_create_queue(QueueName=queue_name)480        message_batch = [481            {482                "Id": f"message-{i}",483                "MessageBody": f"messageBody-{i}",484            }485            for i in range(message_count)486        ]487        result_send_batch = sqs_client.send_message_batch(QueueUrl=queue_url, Entries=message_batch)488        successful = result_send_batch["Successful"]489        assert len(successful) == len(message_batch)490        result_recv = []491        i = 0492        while len(result_recv) < message_count and i < message_count:493            result_recv.extend(494                sqs_client.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=message_count)[495                    "Messages"496                ]497            )498            i += 1499        assert len(result_recv) == message_count500        ids_sent = set()501        ids_received = set()502        for i in range(message_count):503            ids_sent.add(successful[i]["MessageId"])504            ids_received.add((result_recv[i]["MessageId"]))505        assert ids_sent == ids_received506        delete_entries = [507            {"Id": message["MessageId"], "ReceiptHandle": message["ReceiptHandle"]}508            for message in result_recv509        ]510        sqs_client.delete_message_batch(QueueUrl=queue_url, Entries=delete_entries)511        confirmation = sqs_client.receive_message(512            QueueUrl=queue_url, MaxNumberOfMessages=message_count513        )514        assert "Messages" not in confirmation.keys()515    def test_create_and_send_to_fifo_queue(self, sqs_client, sqs_create_queue):516        # Old name: test_create_fifo_queue517        queue_name = f"queue-{short_uid()}.fifo"518        attributes = {"FifoQueue": "true"}519        queue_url = sqs_create_queue(QueueName=queue_name, Attributes=attributes)520        # 
it should preserve .fifo in the queue name521        assert queue_name in queue_url522        message_id = sqs_client.send_message(523            QueueUrl=queue_url,524            MessageBody="test",525            MessageDeduplicationId=f"dedup-{short_uid()}",526            MessageGroupId="test_group",527        )["MessageId"]528        result_recv = sqs_client.receive_message(QueueUrl=queue_url)529        assert result_recv["Messages"][0]["MessageId"] == message_id530    def test_fifo_queue_without_fifo_queue_attribute(self, sqs_create_queue):531        queue_name = f"invalid-{short_uid()}.fifo"532        with pytest.raises(Exception) as e:533            sqs_create_queue(QueueName=queue_name)534        e.match("InvalidParameterValue")535    def test_fifo_queue_requires_suffix(self, sqs_create_queue):536        queue_name = f"invalid-{short_uid()}"537        attributes = {"FifoQueue": "true"}538        with pytest.raises(Exception) as e:539            sqs_create_queue(QueueName=queue_name, Attributes=attributes)540        e.match("InvalidParameterValue")541    @pytest.mark.skipif(542        os.environ.get("PROVIDER_OVERRIDE_SQS") != "asf",543        reason="New provider test which isn't covered by old one",544    )545    def test_standard_queue_cannot_have_fifo_suffix(self, sqs_create_queue):546        queue_name = f"queue-{short_uid()}.fifo"547        with pytest.raises(Exception) as e:548            sqs_create_queue(QueueName=queue_name)549        e.match("InvalidParameterValue")550    @pytest.mark.xfail551    def test_redrive_policy_attribute_validity(self, sqs_create_queue, sqs_client):552        dl_queue_name = f"dl-queue-{short_uid()}"553        dl_queue_url = sqs_create_queue(QueueName=dl_queue_name)554        dl_target_arn = sqs_client.get_queue_attributes(555            QueueUrl=dl_queue_url, AttributeNames=["QueueArn"]556        )["Attributes"]["QueueArn"]557        queue_name = f"queue-{short_uid()}"558        queue_url = 
sqs_create_queue(QueueName=queue_name)559        valid_max_receive_count = "42"560        invalid_max_receive_count = "invalid"561        with pytest.raises(Exception) as e:562            sqs_client.set_queue_attributes(563                QueueUrl=queue_url,564                Attributes={"RedrivePolicy": json.dumps({"deadLetterTargetArn": dl_target_arn})},565            )566        e.match("InvalidParameterValue")567        with pytest.raises(Exception) as e:568            sqs_client.set_queue_attributes(569                QueueUrl=queue_url,570                Attributes={571                    "RedrivePolicy": json.dumps({"maxReceiveCount": valid_max_receive_count})572                },573            )574        e.match("InvalidParameterValue")575        _invalid_redrive_policy = {576            "deadLetterTargetArn": dl_target_arn,577            "maxReceiveCount": invalid_max_receive_count,578        }579        with pytest.raises(Exception) as e:580            sqs_client.set_queue_attributes(581                QueueUrl=queue_url,582                Attributes={"RedrivePolicy": json.dumps(_invalid_redrive_policy)},583            )584        e.match("InvalidParameterValue")585        _valid_redrive_policy = {586            "deadLetterTargetArn": dl_target_arn,587            "maxReceiveCount": valid_max_receive_count,588        }589        sqs_client.set_queue_attributes(590            QueueUrl=queue_url, Attributes={"RedrivePolicy": json.dumps(_valid_redrive_policy)}591        )592    @pytest.mark.skip593    def test_invalid_dead_letter_arn_rejected_before_lookup(self, sqs_create_queue):594        queue_name = f"queue-{short_uid()}"595        dl_dummy_arn = "dummy"596        max_receive_count = 42597        _redrive_policy = {598            "deadLetterTargetArn": dl_dummy_arn,599            "maxReceiveCount": max_receive_count,600        }601        with pytest.raises(Exception) as e:602            sqs_create_queue(603                QueueName=queue_name, 
Attributes={"RedrivePolicy": json.dumps(_redrive_policy)}604            )605        e.match("InvalidParameterValue")606    def test_set_queue_policy(self, sqs_client, sqs_create_queue):607        queue_name = f"queue-{short_uid()}"608        queue_url = sqs_create_queue(QueueName=queue_name)609        attributes = {"Policy": TEST_POLICY}610        sqs_client.set_queue_attributes(QueueUrl=queue_url, Attributes=attributes)611        # accessing the policy generally and specifically612        attributes = sqs_client.get_queue_attributes(QueueUrl=queue_url, AttributeNames=["All"])[613            "Attributes"614        ]615        policy = json.loads(attributes["Policy"])616        assert "sqs:SendMessage" == policy["Statement"][0]["Action"]617        attributes = sqs_client.get_queue_attributes(QueueUrl=queue_url, AttributeNames=["Policy"])[618            "Attributes"619        ]620        policy = json.loads(attributes["Policy"])621        assert "sqs:SendMessage" == policy["Statement"][0]["Action"]622    def test_set_empty_queue_policy(self, sqs_client, sqs_create_queue):623        queue_name = f"queue-{short_uid()}"624        queue_url = sqs_create_queue(QueueName=queue_name)625        attributes = {"Policy": ""}626        sqs_client.set_queue_attributes(QueueUrl=queue_url, Attributes=attributes)627        attributes = sqs_client.get_queue_attributes(QueueUrl=queue_url, AttributeNames=["All"])[628            "Attributes"629        ]630        assert "Policy" not in attributes.keys()631        # check if this behaviour holds on existing Policies as well632        attributes = {"Policy": TEST_POLICY}633        sqs_client.set_queue_attributes(QueueUrl=queue_url, Attributes=attributes)634        attributes = sqs_client.get_queue_attributes(QueueUrl=queue_url, AttributeNames=["All"])[635            "Attributes"636        ]637        assert "sqs:SendMessage" in attributes["Policy"]638        attributes = {"Policy": ""}639        
sqs_client.set_queue_attributes(QueueUrl=queue_url, Attributes=attributes)640        attributes = sqs_client.get_queue_attributes(QueueUrl=queue_url, AttributeNames=["All"])[641            "Attributes"642        ]643        assert "Policy" not in attributes.keys()644    def test_send_message_with_attributes(self, sqs_client, sqs_create_queue):645        # Old name: test_send_message_attributes646        queue_name = f"queue-{short_uid()}"647        queue_url = sqs_create_queue(QueueName=queue_name)648        attributes = {649            "attr1": {"StringValue": "test1", "DataType": "String"},650            "attr2": {"StringValue": "test2", "DataType": "String"},651        }652        result_send = sqs_client.send_message(653            QueueUrl=queue_url, MessageBody="test", MessageAttributes=attributes654        )655        result_receive = sqs_client.receive_message(656            QueueUrl=queue_url, MessageAttributeNames=["All"]657        )658        messages = result_receive["Messages"]659        assert messages[0]["MessageId"] == result_send["MessageId"]660        assert messages[0]["MessageAttributes"] == attributes661        assert messages[0]["MD5OfMessageAttributes"] == result_send["MD5OfMessageAttributes"]662    def test_sent_message_retains_attributes_after_receive(self, sqs_client, sqs_create_queue):663        # Old name: test_send_message_retains_attributes664        queue_name = f"queue-{short_uid()}"665        queue_url = sqs_create_queue(QueueName=queue_name)666        attributes = {"attr1": {"StringValue": "test1", "DataType": "String"}}667        sqs_client.send_message(668            QueueUrl=queue_url, MessageBody="test", MessageAttributes=attributes669        )670        # receive should not interfere with message attributes671        sqs_client.receive_message(672            QueueUrl=queue_url, VisibilityTimeout=0, MessageAttributeNames=["All"]673        )674        receive_result = sqs_client.receive_message(675            QueueUrl=queue_url, 
MessageAttributeNames=["All"]676        )677        assert receive_result["Messages"][0]["MessageAttributes"] == attributes678    @pytest.mark.xfail679    def test_send_message_with_invalid_string_attributes(self, sqs_client, sqs_create_queue):680        queue_name = f"queue-{short_uid()}"681        queue_url = sqs_create_queue(QueueName=queue_name)682        # base line against to detect general failure683        valid_attribute = {"attr.1øÃä": {"StringValue": "Valida", "DataType": "String"}}684        sqs_client.send_message(685            QueueUrl=queue_url, MessageBody="test", MessageAttributes=valid_attribute686        )687        def send_invalid(attribute):688            with pytest.raises(Exception) as e:689                sqs_client.send_message(690                    QueueUrl=queue_url, MessageBody="test", MessageAttributes=attribute691                )692            e.match("Invalid")693        # String Attributes must not contain non-printable characters694        # See: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html695        invalid_attribute = {696            "attr1": {"StringValue": f"Invalid-{chr(8)},{chr(11)}", "DataType": "String"}697        }698        send_invalid(invalid_attribute)699        invalid_name_prefixes = ["aWs.", "AMAZON.", "."]700        for prefix in invalid_name_prefixes:701            invalid_attribute = {702                f"{prefix}-Invalid-attr": {"StringValue": "Valid", "DataType": "String"}703            }704            send_invalid(invalid_attribute)705        # Some illegal characters706        invalid_name_characters = ["!", '"', "§", "(", "?"]707        for char in invalid_name_characters:708            invalid_attribute = {709                f"Invalid-{char}-attr": {"StringValue": "Valid", "DataType": "String"}710            }711            send_invalid(invalid_attribute)712        # limit is 256 chars713        too_long_name = "L" * 257714        invalid_attribute = 
{f"{too_long_name}": {"StringValue": "Valid", "DataType": "String"}}715        send_invalid(invalid_attribute)716        # FIXME: no double periods should be allowed717        # invalid_attribute = {718        #     "Invalid..Name": {"StringValue": "Valid", "DataType": "String"}719        # }720        # send_invalid(invalid_attribute)721        invalid_type = "Invalid"722        invalid_attribute = {723            "Attribute_name": {"StringValue": "Valid", "DataType": f"{invalid_type}"}724        }725        send_invalid(invalid_attribute)726        too_long_type = f"Number.{'L'*256}"727        invalid_attribute = {728            "Attribute_name": {"StringValue": "Valid", "DataType": f"{too_long_type}"}729        }730        send_invalid(invalid_attribute)731        ends_with_dot = "Invalid."732        invalid_attribute = {f"{ends_with_dot}": {"StringValue": "Valid", "DataType": "String"}}733        send_invalid(invalid_attribute)734    @pytest.mark.xfail735    def test_send_message_with_invalid_fifo_parameters(self, sqs_client, sqs_create_queue):736        fifo_queue_name = f"queue-{short_uid()}.fifo"737        queue_url = sqs_create_queue(738            QueueName=fifo_queue_name,739            Attributes={"FifoQueue": "true"},740        )741        with pytest.raises(Exception) as e:742            sqs_client.send_message(743                QueueUrl=queue_url,744                MessageBody="test",745                MessageDeduplicationId=f"Invalid-{chr(8)}",746                MessageGroupId="1",747            )748        e.match("InvalidParameterValue")749        with pytest.raises(Exception) as e:750            sqs_client.send_message(751                QueueUrl=queue_url,752                MessageBody="test",753                MessageDeduplicationId="1",754                MessageGroupId=f"Invalid-{chr(8)}",755            )756        e.match("InvalidParameterValue")757    def test_send_message_with_invalid_payload_characters(self, sqs_client, 
sqs_create_queue):758        queue_name = f"queue-{short_uid()}"759        queue_url = sqs_create_queue(QueueName=queue_name)760        invalid_message_body = f"Invalid-{chr(0)}-{chr(8)}-{chr(19)}-{chr(65535)}"761        with pytest.raises(Exception) as e:762            sqs_client.send_message(QueueUrl=queue_url, MessageBody=invalid_message_body)763        e.match("InvalidMessageContents")764    def test_dead_letter_queue_config(self, sqs_client, sqs_create_queue):765        queue_name = f"queue-{short_uid()}"766        dead_letter_queue_name = f"dead_letter_queue-{short_uid()}"767        dl_queue_url = sqs_create_queue(QueueName=dead_letter_queue_name)768        url_parts = dl_queue_url.split("/")769        region = get_region()770        dl_target_arn = "arn:aws:sqs:{}:{}:{}".format(771            region, url_parts[len(url_parts) - 2], url_parts[-1]772        )773        conf = {"deadLetterTargetArn": dl_target_arn, "maxReceiveCount": 50}774        attributes = {"RedrivePolicy": json.dumps(conf)}775        queue_url = sqs_create_queue(QueueName=queue_name, Attributes=attributes)776        assert queue_url777    def test_dead_letter_queue_execution(778        self, sqs_client, sqs_create_queue, lambda_client, create_lambda_function779    ):780        # TODO: lambda creation does not work when testing against AWS781        queue_name = f"queue-{short_uid()}"782        dead_letter_queue_name = f"dl-queue-{short_uid()}"783        dl_queue_url = sqs_create_queue(QueueName=dead_letter_queue_name)784        # create arn785        url_parts = dl_queue_url.split("/")786        region = os.environ.get("AWS_DEFAULT_REGION") or TEST_REGION787        dl_target_arn = "arn:aws:sqs:{}:{}:{}".format(788            region, url_parts[len(url_parts) - 2], url_parts[-1]789        )790        policy = {"deadLetterTargetArn": dl_target_arn, "maxReceiveCount": 1}791        queue_url = sqs_create_queue(792            QueueName=queue_name, Attributes={"RedrivePolicy": 
json.dumps(policy)}793        )794        lambda_name = f"lambda-{short_uid()}"795        create_lambda_function(796            func_name=lambda_name,797            libs=TEST_LAMBDA_LIBS,798            handler_file=TEST_LAMBDA_PYTHON,799            runtime=LAMBDA_RUNTIME_PYTHON36,800        )801        # create arn802        url_parts = queue_url.split("/")803        queue_arn = "arn:aws:sqs:{}:{}:{}".format(804            region, url_parts[len(url_parts) - 2], url_parts[-1]805        )806        lambda_client.create_event_source_mapping(807            EventSourceArn=queue_arn, FunctionName=lambda_name808        )809        # add message to SQS, which will trigger the Lambda, resulting in an error810        payload = {lambda_integration.MSG_BODY_RAISE_ERROR_FLAG: 1}811        sqs_client.send_message(QueueUrl=queue_url, MessageBody=json.dumps(payload))812        assert poll_condition(813            lambda: "Messages"814            in sqs_client.receive_message(QueueUrl=dl_queue_url, VisibilityTimeout=0),815            10.0,816            1.0,817        )818        result_recv = sqs_client.receive_message(QueueUrl=dl_queue_url, VisibilityTimeout=0)819        assert result_recv["Messages"][0]["Body"] == json.dumps(payload)820    def test_dead_letter_queue_max_receive_count(self, sqs_client, sqs_create_queue):821        queue_name = f"queue-{short_uid()}"822        dead_letter_queue_name = f"dl-queue-{short_uid()}"823        dl_queue_url = sqs_create_queue(824            QueueName=dead_letter_queue_name, Attributes={"VisibilityTimeout": "0"}825        )826        # create arn827        url_parts = dl_queue_url.split("/")828        dl_target_arn = aws_stack.sqs_queue_arn(829            url_parts[-1], account_id=url_parts[len(url_parts) - 2]830        )831        policy = {"deadLetterTargetArn": dl_target_arn, "maxReceiveCount": 1}832        queue_url = sqs_create_queue(833            QueueName=queue_name,834            Attributes={"RedrivePolicy": json.dumps(policy), 
"VisibilityTimeout": "0"},835        )836        result_send = sqs_client.send_message(QueueUrl=queue_url, MessageBody="test")837        result_recv1_messages = sqs_client.receive_message(QueueUrl=queue_url).get("Messages")838        result_recv2_messages = sqs_client.receive_message(QueueUrl=queue_url).get("Messages")839        # only one request received a message840        assert (result_recv1_messages is None) != (result_recv2_messages is None)841        assert poll_condition(842            lambda: "Messages" in sqs_client.receive_message(QueueUrl=dl_queue_url), 5.0, 1.0843        )844        assert (845            sqs_client.receive_message(QueueUrl=dl_queue_url)["Messages"][0]["MessageId"]846            == result_send["MessageId"]847        )848    @pytest.mark.skipif(849        os.environ.get("PROVIDER_OVERRIDE_SQS") != "asf", reason="Currently fails for moto provider"850    )851    def test_dead_letter_queue_chain(self, sqs_client, sqs_create_queue):852        # test a chain of 3 queues, with DLQ flow q1 -> q2 -> q3853        # create queues854        queue_names = [f"q-{short_uid()}", f"q-{short_uid()}", f"q-{short_uid()}"]855        for queue_name in queue_names:856            sqs_create_queue(QueueName=queue_name, Attributes={"VisibilityTimeout": "0"})857        queue_urls = [aws_stack.get_sqs_queue_url(queue_name) for queue_name in queue_names]858        # set redrive policies859        for idx, queue_name in enumerate(queue_names[:2]):860            policy = {861                "deadLetterTargetArn": aws_stack.sqs_queue_arn(queue_names[idx + 1]),862                "maxReceiveCount": 1,863            }864            sqs_client.set_queue_attributes(865                QueueUrl=queue_urls[idx],866                Attributes={"RedrivePolicy": json.dumps(policy), "VisibilityTimeout": "0"},867            )868        def _retry_receive(q_url):869            def _receive():870                _result = sqs_client.receive_message(QueueUrl=q_url)871                
assert _result.get("Messages")872                return _result873            return retry(_receive, sleep=1, retries=5)874        # send message875        result = sqs_client.send_message(QueueUrl=queue_urls[0], MessageBody="test")876        # retrieve message from q1877        result = _retry_receive(queue_urls[0])878        assert len(result.get("Messages")) == 1879        # Wait for VisibilityTimeout to expire880        time.sleep(1.1)881        # retrieve message from q1 again -> no message, should go to DLQ q2882        result = sqs_client.receive_message(QueueUrl=queue_urls[0])883        assert not result.get("Messages")884        # retrieve message from q2885        result = _retry_receive(queue_urls[1])886        assert len(result.get("Messages")) == 1887        # retrieve message from q2 again -> no message, should go to DLQ q3888        result = sqs_client.receive_message(QueueUrl=queue_urls[1])889        assert not result.get("Messages")890        # retrieve message from q3891        result = _retry_receive(queue_urls[2])892        assert len(result.get("Messages")) == 1893    # TODO: check if test_set_queue_attribute_at_creation == test_create_queue_with_attributes894    def test_get_specific_queue_attribute_response(self, sqs_client, sqs_create_queue):895        queue_name = f"queue-{short_uid()}"896        dead_letter_queue_name = f"dead_letter_queue-{short_uid()}"897        dl_queue_url = sqs_create_queue(QueueName=dead_letter_queue_name)898        region = get_region()899        dl_result = sqs_client.get_queue_attributes(900            QueueUrl=dl_queue_url, AttributeNames=["QueueArn"]901        )902        dl_queue_arn = dl_result["Attributes"]["QueueArn"]903        max_receive_count = 10904        _redrive_policy = {905            "deadLetterTargetArn": dl_queue_arn,906            "maxReceiveCount": max_receive_count,907        }908        message_retention_period = "604800"909        attributes = {910            "MessageRetentionPeriod": 
message_retention_period,911            "DelaySeconds": "10",912            "RedrivePolicy": json.dumps(_redrive_policy),913        }914        queue_url = sqs_create_queue(QueueName=queue_name, Attributes=attributes)915        url_parts = queue_url.split("/")916        get_two_attributes = sqs_client.get_queue_attributes(917            QueueUrl=queue_url,918            AttributeNames=["MessageRetentionPeriod", "RedrivePolicy"],919        )920        get_single_attribute = sqs_client.get_queue_attributes(921            QueueUrl=queue_url,922            AttributeNames=["QueueArn"],923        )924        # asserts925        constructed_arn = "arn:aws:sqs:{}:{}:{}".format(926            region, url_parts[len(url_parts) - 2], url_parts[-1]927        )928        redrive_policy = json.loads(get_two_attributes.get("Attributes").get("RedrivePolicy"))929        assert message_retention_period == get_two_attributes.get("Attributes").get(930            "MessageRetentionPeriod"931        )932        assert constructed_arn == get_single_attribute.get("Attributes").get("QueueArn")933        assert max_receive_count == redrive_policy.get("maxReceiveCount")934    @pytest.mark.xfail935    def test_set_unsupported_attribute_fifo(self, sqs_client, sqs_create_queue):936        # TODO: behaviour diverges from AWS937        queue_name = f"queue-{short_uid()}"938        queue_url = sqs_create_queue(QueueName=queue_name)939        with pytest.raises(Exception) as e:940            sqs_client.set_queue_attributes(QueueUrl=queue_url, Attributes={"FifoQueue": "true"})941        e.match("InvalidAttributeName")942        fifo_queue_name = f"queue-{short_uid()}.fifo"943        fifo_queue_url = sqs_create_queue(944            QueueName=fifo_queue_name, Attributes={"FifoQueue": "true"}945        )946        sqs_client.set_queue_attributes(QueueUrl=fifo_queue_url, Attributes={"FifoQueue": "true"})947        with pytest.raises(Exception) as e:948            sqs_client.set_queue_attributes(949        
        QueueUrl=fifo_queue_url, Attributes={"FifoQueue": "false"}950            )951        e.match("InvalidAttributeValue")952    def test_fifo_queue_send_multiple_messages_multiple_single_receives(953        self, sqs_client, sqs_create_queue954    ):955        fifo_queue_name = f"queue-{short_uid()}.fifo"956        queue_url = sqs_create_queue(957            QueueName=fifo_queue_name,958            Attributes={"FifoQueue": "true"},959        )960        message_count = 4961        group_id = f"fifo_group-{short_uid()}"962        sent_messages = []963        for i in range(message_count):964            result = sqs_client.send_message(965                QueueUrl=queue_url,966                MessageBody=f"message{i}",967                MessageDeduplicationId=f"deduplication{i}",968                MessageGroupId=group_id,969            )970            sent_messages.append(result)971        for i in range(message_count):972            result = sqs_client.receive_message(QueueUrl=queue_url)973            message = result["Messages"][0]974            assert message["Body"] == f"message{i}"975            assert message["MD5OfBody"] == sent_messages[i]["MD5OfMessageBody"]976            assert message["MessageId"] == sent_messages[i]["MessageId"]977            sqs_client.delete_message(QueueUrl=queue_url, ReceiptHandle=message["ReceiptHandle"])978    @pytest.mark.xfail979    def test_disallow_queue_name_with_slashes(self, sqs_client, sqs_create_queue):980        queue_name = f"queue/{short_uid()}/"981        with pytest.raises(Exception) as e:982            sqs_create_queue(QueueName=queue_name)983        e.match("InvalidParameterValue")984    def test_post_list_queues_with_auth_in_presigned_url(self):985        # TODO: does not work when testing against AWS986        method = "post"987        protocol = get_service_protocol()988        # CI might not set EDGE_PORT variables properly989        port = 4566990        if protocol == "https":991            port = 443992     
   base_url = "{}://{}:{}".format(get_service_protocol(), config.LOCALSTACK_HOSTNAME, port)993        req = AWSRequest(994            method=method,995            url=base_url,996            data={"Action": "ListQueues", "Version": "2012-11-05"},997        )998        # boto doesn't support querystring-style auth, so we have to do some999        # weird logic to use boto's signing functions, to understand what's1000        # going on here look at the internals of the SigV4Auth.add_auth1001        # method.1002        datetime_now = datetime.datetime.utcnow()1003        req.context["timestamp"] = datetime_now.strftime(SIGV4_TIMESTAMP)1004        signer = SigV4Auth(1005            Credentials(TEST_AWS_ACCESS_KEY_ID, TEST_AWS_SECRET_ACCESS_KEY),1006            "sqs",1007            os.environ.get("AWS_DEFAULT_REGION") or TEST_REGION,1008        )1009        canonical_request = signer.canonical_request(req)1010        string_to_sign = signer.string_to_sign(req, canonical_request)1011        payload = {1012            "Action": "ListQueues",1013            "Version": "2012-11-05",1014            "X-Amz-Algorithm": "AWS4-HMAC-SHA256",1015            "X-Amz-Credential": signer.scope(req),1016            "X-Amz-SignedHeaders": ";".join(signer.headers_to_sign(req).keys()),1017            "X-Amz-Signature": signer.signature(string_to_sign, req),1018        }1019        response = requests.post(url=base_url, data=urlencode(payload))1020        assert response.status_code == 2001021        assert b"<ListQueuesResponse" in response.content1022    # FIXME: make this testcase work against the new provider1023    @pytest.mark.xfail1024    def test_get_list_queues_with_auth_in_presigned_url(self):1025        # TODO: does not work when testing against AWS1026        method = "get"1027        protocol = get_service_protocol()1028        port = config.EDGE_PORT_HTTP1029        if protocol == "https":1030            port = config.EDGE_PORT1031        base_url = 
"{}://{}:{}".format(get_service_protocol(), config.LOCALSTACK_HOSTNAME, port)1032        req = AWSRequest(1033            method=method,1034            url=base_url,1035            data={"Action": "ListQueues", "Version": "2012-11-05"},1036        )1037        # boto doesn't support querystring-style auth, so we have to do some1038        # weird logic to use boto's signing functions, to understand what's1039        # going on here look at the internals of the SigV4Auth.add_auth1040        # method.1041        datetime_now = datetime.datetime.utcnow()1042        req.context["timestamp"] = datetime_now.strftime(SIGV4_TIMESTAMP)1043        signer = SigV4Auth(1044            Credentials(TEST_AWS_ACCESS_KEY_ID, TEST_AWS_SECRET_ACCESS_KEY),1045            "sqs",1046            os.environ.get("AWS_DEFAULT_REGION") or TEST_REGION,1047        )1048        canonical_request = signer.canonical_request(req)1049        string_to_sign = signer.string_to_sign(req, canonical_request)1050        payload = {1051            "Action": "ListQueues",1052            "Version": "2012-11-05",1053            "X-Amz-Algorithm": "AWS4-HMAC-SHA256",1054            "X-Amz-Credential": signer.scope(req),1055            "X-Amz-SignedHeaders": ";".join(signer.headers_to_sign(req).keys()),1056            "X-Amz-Signature": signer.signature(string_to_sign, req),1057        }1058        response = requests.get(base_url, params=payload)1059        assert response.status_code == 2001060        assert b"<ListQueuesResponse" in response.content1061    @pytest.mark.xfail1062    def test_system_attributes_have_no_effect_on_attr_md5(self, sqs_create_queue, sqs_client):1063        queue_name = f"queue-{short_uid()}"1064        queue_url = sqs_create_queue(QueueName=queue_name)1065        msg_attrs_provider = {"timestamp": {"StringValue": "1493147359900", "DataType": "Number"}}1066        aws_trace_header = {1067            "AWSTraceHeader": {1068                "StringValue": 
"Root=1-5759e988-bd862e3fe1be46a994272793;Parent=53995c3f42cd8ad8;Sampled=1",1069                "DataType": "String",1070            }1071        }1072        response_send = sqs_client.send_message(1073            QueueUrl=queue_url, MessageBody="test", MessageAttributes=msg_attrs_provider1074        )1075        response_send_system_attr = sqs_client.send_message(1076            QueueUrl=queue_url,1077            MessageBody="test",1078            MessageAttributes=msg_attrs_provider,1079            MessageSystemAttributes=aws_trace_header,1080        )1081        assert (1082            response_send["MD5OfMessageAttributes"]1083            == response_send_system_attr["MD5OfMessageAttributes"]1084        )1085        assert response_send.get("MD5OfMessageSystemAttributes") is None1086        assert (1087            response_send_system_attr.get("MD5OfMessageSystemAttributes")1088            == "5ae4d5d7636402d80f4eb6d213245a88"1089        )1090    def test_inflight_message_requeue(self, sqs_client, sqs_create_queue):1091        visibility_timeout = 3 if os.environ.get("TEST_TARGET") == "AWS_CLOUD" else 21092        queue_name = f"queue-{short_uid()}"1093        queue_url = sqs_create_queue(1094            QueueName=queue_name1095        )  # , Attributes={"VisibilityTimeout": str(visibility_timeout)})1096        sqs_client.send_message(QueueUrl=queue_url, MessageBody="test1")1097        result_receive1 = sqs_client.receive_message(1098            QueueUrl=queue_url, VisibilityTimeout=visibility_timeout1099        )1100        time.sleep(visibility_timeout / 2)1101        sqs_client.send_message(QueueUrl=queue_url, MessageBody="test2")1102        time.sleep(visibility_timeout)1103        result_receive2 = sqs_client.receive_message(1104            QueueUrl=queue_url, VisibilityTimeout=visibility_timeout1105        )1106        assert result_receive1["Messages"][0]["Body"] == result_receive2["Messages"][0]["Body"]1107    @pytest.mark.xfail1108    def 
test_sequence_number(self, sqs_client, sqs_create_queue):1109        fifo_queue_name = f"queue-{short_uid()}.fifo"1110        fifo_queue_url = sqs_create_queue(1111            QueueName=fifo_queue_name, Attributes={"FifoQueue": "true"}1112        )1113        message_content = f"test{short_uid()}"1114        dedup_id = f"fifo_dedup-{short_uid()}"1115        group_id = f"fifo_group-{short_uid()}"1116        send_result_fifo = sqs_client.send_message(1117            QueueUrl=fifo_queue_url,1118            MessageBody=message_content,1119            MessageGroupId=group_id,1120            MessageDeduplicationId=dedup_id,1121        )1122        assert "SequenceNumber" in send_result_fifo.keys()1123        queue_name = f"queue-{short_uid()}"1124        queue_url = sqs_create_queue(QueueName=queue_name)1125        send_result = sqs_client.send_message(QueueUrl=queue_url, MessageBody=message_content)1126        assert "SequenceNumber" not in send_result1127    # Tests of diverging behaviour that was discovered during rewrite1128    @pytest.mark.xfail1129    def test_posting_to_fifo_requires_deduplicationid_group_id(self, sqs_client, sqs_create_queue):1130        fifo_queue_name = f"queue-{short_uid()}.fifo"1131        queue_url = sqs_create_queue(QueueName=fifo_queue_name, Attributes={"FifoQueue": "true"})1132        message_content = f"test{short_uid()}"1133        dedup_id = f"fifo_dedup-{short_uid()}"1134        group_id = f"fifo_group-{short_uid()}"1135        with pytest.raises(Exception) as e:1136            sqs_client.send_message(1137                QueueUrl=queue_url, MessageBody=message_content, MessageGroupId=group_id1138            )1139        e.match("InvalidParameterValue")1140        with pytest.raises(Exception) as e:1141            sqs_client.send_message(1142                QueueUrl=queue_url, MessageBody=message_content, MessageDeduplicationId=dedup_id1143            )1144        e.match("MissingParameter")1145    # TODO: test 
approximateNumberOfMessages once delayed Messages are properly counted1146    def test_approximate_number_of_messages_delayed(self):1147        pass1148    @pytest.mark.xfail1149    def test_posting_to_queue_via_queue_name(self, sqs_client, sqs_create_queue):1150        # TODO: behaviour diverges from AWS1151        queue_name = f"queue-{short_uid()}"1152        sqs_create_queue(QueueName=queue_name)1153        result_send = sqs_client.send_message(1154            QueueUrl=queue_name, MessageBody="Using name instead of URL"1155        )1156        assert result_send["MD5OfMessageBody"] == "86a83f96652a1bfad3891e7d523750cb"1157        assert result_send["ResponseMetadata"]["HTTPStatusCode"] == 2001158    @pytest.mark.xfail1159    def test_invalid_string_attributes_cause_invalid_parameter_value_error(1160        self, sqs_client, sqs_create_queue1161    ):1162        queue_name = f"queue-{short_uid()}"1163        queue_url = sqs_create_queue(QueueName=queue_name)1164        invalid_attribute = {1165            "attr1": {"StringValue": f"Invalid-{chr(8)},{chr(11)}", "DataType": "String"}1166        }1167        with pytest.raises(Exception) as e:1168            sqs_client.send_message(1169                QueueUrl=queue_url, MessageBody="test", MessageAttributes=invalid_attribute1170            )1171        e.match("InvalidParameterValue")1172    def test_change_message_visibility_not_permanent(self, sqs_client, sqs_create_queue):1173        queue_name = f"queue-{short_uid()}"1174        queue_url = sqs_create_queue(QueueName=queue_name)1175        sqs_client.send_message(QueueUrl=queue_url, MessageBody="test")1176        result_receive = sqs_client.receive_message(QueueUrl=queue_url)1177        receipt_handle = result_receive.get("Messages")[0]["ReceiptHandle"]1178        sqs_client.change_message_visibility(1179            QueueUrl=queue_url, ReceiptHandle=receipt_handle, VisibilityTimeout=01180        )1181        result_recv_1 = 
sqs_client.receive_message(QueueUrl=queue_url)1182        result_recv_2 = sqs_client.receive_message(QueueUrl=queue_url)1183        assert (1184            result_recv_1.get("Messages")[0]["MessageId"]1185            == result_receive.get("Messages")[0]["MessageId"]1186        )1187        assert "Messages" not in result_recv_2.keys()1188    @pytest.mark.skip1189    def test_dead_letter_queue_execution_lambda_mapping_preserves_id(1190        self, sqs_client, sqs_create_queue, lambda_client, create_lambda_function1191    ):1192        # TODO: lambda triggered dead letter delivery does not preserve the message id1193        # https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-dead-letter-queues.html1194        queue_name = f"queue-{short_uid()}"1195        dead_letter_queue_name = "dl-queue-{}".format(short_uid())1196        dl_queue_url = sqs_create_queue(QueueName=dead_letter_queue_name)1197        # create arn1198        url_parts = dl_queue_url.split("/")1199        region = get_region()1200        dl_target_arn = "arn:aws:sqs:{}:{}:{}".format(1201            region, url_parts[len(url_parts) - 2], url_parts[-1]1202        )1203        policy = {"deadLetterTargetArn": dl_target_arn, "maxReceiveCount": 1}1204        queue_url = sqs_create_queue(1205            QueueName=queue_name, Attributes={"RedrivePolicy": json.dumps(policy)}1206        )1207        lambda_name = "lambda-{}".format(short_uid())1208        create_lambda_function(1209            func_name=lambda_name,1210            libs=TEST_LAMBDA_LIBS,1211            handler_file=TEST_LAMBDA_PYTHON,1212            runtime=LAMBDA_RUNTIME_PYTHON36,1213        )1214        # create arn1215        url_parts = queue_url.split("/")1216        queue_arn = "arn:aws:sqs:{}:{}:{}".format(1217            region, url_parts[len(url_parts) - 2], url_parts[-1]1218        )1219        lambda_client.create_event_source_mapping(1220            EventSourceArn=queue_arn, FunctionName=lambda_name1221   
     )1222        # add message to SQS, which will trigger the Lambda, resulting in an error1223        payload = {lambda_integration.MSG_BODY_RAISE_ERROR_FLAG: 1}1224        result_send = sqs_client.send_message(QueueUrl=queue_url, MessageBody=json.dumps(payload))1225        assert poll_condition(1226            lambda: "Messages"1227            in sqs_client.receive_message(QueueUrl=dl_queue_url, VisibilityTimeout=0),1228            5.0,1229            1.0,1230        )1231        result_recv = sqs_client.receive_message(QueueUrl=dl_queue_url, VisibilityTimeout=0)1232        assert result_recv["Messages"][0]["MessageId"] == result_send["MessageId"]1233    # verification of community posted issue1234    # FIXME: \r gets lost1235    @pytest.mark.skip1236    def test_message_with_carriage_return(self, sqs_client, sqs_create_queue):1237        queue_name = f"queue-{short_uid()}"1238        queue_url = sqs_create_queue(QueueName=queue_name)1239        message_content = "{\r\n" + '"machineID" : "d357006e26ff47439e1ef894225d4307"' + "}"1240        result_send = sqs_client.send_message(QueueUrl=queue_url, MessageBody=message_content)1241        result_receive = sqs_client.receive_message(QueueUrl=queue_url)1242        assert result_send["MD5OfMessageBody"] == result_receive["Messages"][0]["MD5OfBody"]1243        assert message_content == result_receive["Messages"][0]["Body"]1244    def test_purge_queue(self, sqs_client, sqs_create_queue):1245        queue_name = f"queue-{short_uid()}"1246        queue_url = sqs_create_queue(QueueName=queue_name)1247        for i in range(3):1248            message_content = f"test-{i}"1249            sqs_client.send_message(QueueUrl=queue_url, MessageBody=message_content)1250        approx_nr_of_messages = sqs_client.get_queue_attributes(1251            QueueUrl=queue_url, AttributeNames=["ApproximateNumberOfMessages"]1252        )1253        assert int(approx_nr_of_messages["Attributes"]["ApproximateNumberOfMessages"]) > 11254        
sqs_client.purge_queue(QueueUrl=queue_url)1255        receive_result = sqs_client.receive_message(QueueUrl=queue_url)1256        assert "Messages" not in receive_result.keys()1257    def test_remove_message_with_old_receipt_handle(self, sqs_client, sqs_create_queue):1258        queue_name = f"queue-{short_uid()}"1259        queue_url = sqs_create_queue(QueueName=queue_name)1260        sqs_client.send_message(QueueUrl=queue_url, MessageBody="test")1261        result_receive = sqs_client.receive_message(QueueUrl=queue_url, VisibilityTimeout=1)1262        time.sleep(2)1263        receipt_handle = result_receive["Messages"][0]["ReceiptHandle"]1264        sqs_client.delete_message(QueueUrl=queue_url, ReceiptHandle=receipt_handle)1265        # This is more suited to the check than receiving because it simply1266        # returns the number of elements in the queue, without further logic1267        approx_nr_of_messages = sqs_client.get_queue_attributes(1268            QueueUrl=queue_url, AttributeNames=["ApproximateNumberOfMessages"]1269        )1270        assert int(approx_nr_of_messages["Attributes"]["ApproximateNumberOfMessages"]) == 01271    @pytest.mark.skip(1272        reason="this is an AWS behaviour test that requires 5 minutes to run. 
Only execute manually"1273    )1274    def test_deduplication_interval(self, sqs_client, sqs_create_queue):1275        # TODO: AWS behaviour here "seems" inconsistent -> current code might need adaption1276        fifo_queue_name = f"queue-{short_uid()}.fifo"1277        queue_url = sqs_create_queue(QueueName=fifo_queue_name, Attributes={"FifoQueue": "true"})1278        message_content = f"test{short_uid()}"1279        message_content_duplicate = f"{message_content}-duplicate"1280        message_content_half_time = f"{message_content}-half_time"1281        dedup_id = f"fifo_dedup-{short_uid()}"1282        group_id = f"fifo_group-{short_uid()}"1283        result_send = sqs_client.send_message(1284            QueueUrl=queue_url,1285            MessageBody=message_content,1286            MessageGroupId=group_id,1287            MessageDeduplicationId=dedup_id,1288        )1289        time.sleep(3)1290        sqs_client.send_message(1291            QueueUrl=queue_url,1292            MessageBody=message_content_duplicate,1293            MessageGroupId=group_id,1294            MessageDeduplicationId=dedup_id,1295        )1296        result_receive = sqs_client.receive_message(QueueUrl=queue_url)1297        sqs_client.delete_message(1298            QueueUrl=queue_url, ReceiptHandle=result_receive["Messages"][0]["ReceiptHandle"]1299        )1300        result_receive_duplicate = sqs_client.receive_message(QueueUrl=queue_url)1301        assert result_send.get("MessageId") == result_receive.get("Messages")[0].get("MessageId")1302        assert result_send.get("MD5OfMessageBody") == result_receive.get("Messages")[0].get(1303            "MD5OfBody"1304        )1305        assert "Messages" not in result_receive_duplicate.keys()1306        result_send = sqs_client.send_message(1307            QueueUrl=queue_url,1308            MessageBody=message_content,1309            MessageGroupId=group_id,1310            MessageDeduplicationId=dedup_id,1311        )1312        # 
ZZZZzzz...1313        # Fifo Deduplication Interval is 5 minutes at minimum, + there seems no way to change it.1314        # We give it a bit of leeway to avoid timing issues1315        # https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/using-messagededuplicationid-property.html1316        time.sleep(2)1317        sqs_client.send_message(1318            QueueUrl=queue_url,1319            MessageBody=message_content_half_time,1320            MessageGroupId=group_id,1321            MessageDeduplicationId=dedup_id,1322        )1323        time.sleep(6 * 60)1324        result_send_duplicate = sqs_client.send_message(1325            QueueUrl=queue_url,1326            MessageBody=message_content_duplicate,1327            MessageGroupId=group_id,1328            MessageDeduplicationId=dedup_id,1329        )1330        result_receive = sqs_client.receive_message(QueueUrl=queue_url)1331        sqs_client.delete_message(1332            QueueUrl=queue_url, ReceiptHandle=result_receive["Messages"][0]["ReceiptHandle"]1333        )1334        result_receive_duplicate = sqs_client.receive_message(QueueUrl=queue_url)1335        assert result_send.get("MessageId") == result_receive.get("Messages")[0].get("MessageId")1336        assert result_send.get("MD5OfMessageBody") == result_receive.get("Messages")[0].get(1337            "MD5OfBody"1338        )1339        assert result_send_duplicate.get("MessageId") == result_receive_duplicate.get("Messages")[1340            01341        ].get("MessageId")1342        assert result_send_duplicate.get("MD5OfMessageBody") == result_receive_duplicate.get(1343            "Messages"1344        )[0].get("MD5OfBody")1345    @pytest.mark.skipif(1346        os.environ.get("PROVIDER_OVERRIDE_SQS") != "asf",1347        reason="New provider test which isn't covered by old one",1348    )1349    def test_sse_attributes_are_accepted(self, sqs_client, sqs_create_queue):1350        queue_name = f"queue-{short_uid()}"1351        
queue_url = sqs_create_queue(QueueName=queue_name)1352        attributes = {1353            "KmsMasterKeyId": "testKeyId",1354            "KmsDataKeyReusePeriodSeconds": "6000",1355            "SqsManagedSseEnabled": "true",1356        }1357        sqs_client.set_queue_attributes(QueueUrl=queue_url, Attributes=attributes)1358        result_attributes = sqs_client.get_queue_attributes(1359            QueueUrl=queue_url, AttributeNames=["All"]1360        )["Attributes"]1361        keys = result_attributes.keys()1362        for k in attributes.keys():1363            assert k in keys1364            assert attributes[k] == result_attributes[k]1365def get_region():...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
