Best Python code snippet using localstack_python
test_sqs.py
Source:test_sqs.py  
...770            tags=TEST_LAMBDA_TAGS,771        )772        self.assertEqual(200, response["ResponseMetadata"]["HTTPStatusCode"])773        self.check_tagged_queue(response["QueueUrl"])774    def test_tag_untag_queue(self):775        queue_name = "queue-{}".format(short_uid())776        queue_url = self.client.create_queue(QueueName=queue_name)["QueueUrl"]777        response = self.client.tag_queue(778            QueueUrl=queue_url,779            Tags=TEST_LAMBDA_TAGS,780        )781        self.assertEqual(200, response["ResponseMetadata"]["HTTPStatusCode"])782        self.check_tagged_queue(queue_url)783    def test_posting_to_queue_with_trailing_slash(self):784        queue_name = "queue-{}".format(short_uid())785        queue_url = self.client.create_queue(QueueName=queue_name)["QueueUrl"]786        base_url = "{}://{}:{}".format(787            get_service_protocol(), config.LOCALSTACK_HOSTNAME, config.PORT_SQS788        )789        encoded_url = urlencode(790            {791                "Action": "SendMessage",792                "Version": "2012-11-05",793                "QueueUrl": "{}/{}/{}/".format(base_url, TEST_AWS_ACCOUNT_ID, queue_name),794                "MessageBody": "test body",795            }796        )797        r = requests.post(url=base_url, data=encoded_url)798        self.assertEqual(r.status_code, 200)799        # We can get the message back800        resp = self.client.receive_message(QueueUrl=queue_url)801        self.assertEqual(resp["Messages"][0]["Body"], "test body")802        # clean up803        self.client.delete_queue(QueueUrl=queue_url)804    def test_create_queue_with_slashes(self):805        queue_name = "queue/%s" % short_uid()806        queue_url = self.client.create_queue(QueueName=queue_name)807        result = self.client.list_queues()808        self.assertIn(queue_url.get("QueueUrl"), result.get("QueueUrls"))809        # clean up810        self.client.delete_queue(QueueUrl=queue_url.get("QueueUrl"))811        result = self.client.list_queues()812        self.assertNotIn(queue_url.get("QueueUrl"), result.get("QueueUrls", []))813    def list_queues_with_auth_in_presigned_url(self, method):814        base_url = "{}://{}:{}".format(815            get_service_protocol(), config.LOCALSTACK_HOSTNAME, config.PORT_SQS816        )817        req = AWSRequest(818            method=method,819            url=base_url,820            data={"Action": "ListQueues", "Version": "2012-11-05"},821        )822        # boto doesn't support querystring-style auth, so we have to do some823        # weird logic to use boto's signing functions, to understand what's824        # going on here look at the internals of the SigV4Auth.add_auth825        # method.826        datetime_now = datetime.datetime.utcnow()827        req.context["timestamp"] = datetime_now.strftime(SIGV4_TIMESTAMP)828        signer = SigV4Auth(829            Credentials(TEST_AWS_ACCESS_KEY_ID, TEST_AWS_SECRET_ACCESS_KEY),830            "sqs",831            aws_stack.get_region(),832        )833        canonical_request = signer.canonical_request(req)834        string_to_sign = signer.string_to_sign(req, canonical_request)835        payload = {836            "Action": "ListQueues",837            "Version": "2012-11-05",838            "X-Amz-Algorithm": "AWS4-HMAC-SHA256",839            "X-Amz-Credential": signer.scope(req),840            "X-Amz-SignedHeaders": ";".join(signer.headers_to_sign(req).keys()),841            "X-Amz-Signature": signer.signature(string_to_sign, req),842        }843        if method == "GET":844            return requests.get(url=base_url, params=payload)845        else:846            return requests.post(url=base_url, data=urlencode(payload))847    def test_post_list_queues_with_auth_in_presigned_url(self):848        res = self.list_queues_with_auth_in_presigned_url(method="POST")849        self.assertEqual(res.status_code, 200)850        self.assertTrue(b"<ListQueuesResponse>" in res.content)851    def test_get_list_queues_with_auth_in_presigned_url(self):852        res = self.list_queues_with_auth_in_presigned_url(method="GET")853        self.assertEqual(res.status_code, 200)854        self.assertTrue(b"<ListQueuesResponse>" in res.content)855    # ---------------856    # HELPER METHODS857    # ---------------858    def receive_dlq(self, queue_url, assert_error_details=False, assert_receive_count=None):859        """Assert that a message has been received on the given DLQ"""860        result = self.client.receive_message(QueueUrl=queue_url, MessageAttributeNames=["All"])861        self.assertGreater(len(result["Messages"]), 0)862        msg = result["Messages"][0]863        msg_attrs = msg.get("MessageAttributes") or msg.get("Attributes")864        if assert_error_details:865            self.assertIn("RequestID", msg_attrs)866            self.assertIn("ErrorCode", msg_attrs)867            self.assertIn("ErrorMessage", msg_attrs)868        else:869            if assert_receive_count is not None:870                pass871                # TODO: this started failing with latest moto upgrade,872                # probably in or around this commit:873                # https://github.com/spulec/moto/commit/6da4905da940e25e317db60b7657ea632f58ef1d874                # self.assertEqual(str(assert_receive_count), msg_attrs.get('ApproximateReceiveCount'))875    def check_tagged_queue(self, queue_url):876        response = self.client.list_queue_tags(QueueUrl=queue_url)877        self.assertEqual(200, response["ResponseMetadata"]["HTTPStatusCode"])878        tags_keys_as_list = list(TEST_LAMBDA_TAGS.keys())879        for key in tags_keys_as_list:880            self.assertIn(key, response["Tags"])881        random_tag_key = random.choice(tags_keys_as_list)882        # Pop random chosen tag key from list883        tags_keys_as_list.pop(tags_keys_as_list.index(random_tag_key))884        response = self.client.untag_queue(QueueUrl=queue_url, TagKeys=[random_tag_key])885        self.assertEqual(200, response["ResponseMetadata"]["HTTPStatusCode"])886        response = self.client.list_queue_tags(QueueUrl=queue_url)887        self.assertEqual(200, response["ResponseMetadata"]["HTTPStatusCode"])888        self.assertNotIn(random_tag_key, response["Tags"])889        for key in tags_keys_as_list:890            self.assertIn(key, response["Tags"])891        # Clean up892        self.client.untag_queue(893            QueueUrl=queue_url,894            TagKeys=tags_keys_as_list,895        )896        self.client.delete_queue(QueueUrl=queue_url)897    def check_msg_attributes(self, queue_url, attrs):898        messages_all_attributes = self.client.receive_message(899            QueueUrl=queue_url, MessageAttributeNames=["All"]900        )901        self.assertEqual(messages_all_attributes.get("Messages")[0]["MessageAttributes"], attrs)902class TestSqsProvider:903    def test_list_queues(self, sqs_client, sqs_create_queue):904        queue_names = [905            "a-test-queue-" + short_uid(),906            "a-test-queue-" + short_uid(),907            "b-test-queue-" + short_uid(),908        ]909        # create three queues with prefixes and collect their urls910        queue_urls = []911        for name in queue_names:912            sqs_create_queue(QueueName=name)913            queue_url = sqs_client.get_queue_url(QueueName=name)["QueueUrl"]914            assert queue_url.endswith(name)915            queue_urls.append(queue_url)916        # list queues with first prefix917        result = sqs_client.list_queues(QueueNamePrefix="a-test-queue-")918        assert "QueueUrls" in result919        assert len(result["QueueUrls"]) == 2920        assert queue_urls[0] in result["QueueUrls"]921        assert queue_urls[1] in result["QueueUrls"]922        assert queue_urls[2] not in result["QueueUrls"]923        # list queues with second prefix924        result = sqs_client.list_queues(QueueNamePrefix="b-test-queue-")925        assert "QueueUrls" in result926        assert len(result["QueueUrls"]) == 1927        assert queue_urls[0] not in result["QueueUrls"]928        assert queue_urls[1] not in result["QueueUrls"]929        assert queue_urls[2] in result["QueueUrls"]930    def test_create_queue_and_get_attributes(self, sqs_client, sqs_queue):931        result = sqs_client.get_queue_attributes(932            QueueUrl=sqs_queue, AttributeNames=["QueueArn", "CreatedTimestamp", "VisibilityTimeout"]933        )934        assert "Attributes" in result935        attrs = result["Attributes"]936        assert len(attrs) == 3937        assert "test-queue-" in attrs["QueueArn"]938        assert int(float(attrs["CreatedTimestamp"])) == pytest.approx(int(time.time()), 30)939        assert int(attrs["VisibilityTimeout"]) == 30, "visibility timeout is not the default value"940    def test_send_receive_message(self, sqs_client, sqs_queue):941        send_result = sqs_client.send_message(QueueUrl=sqs_queue, MessageBody="message")942        assert send_result["MessageId"]943        assert send_result["MD5OfMessageBody"] == "78e731027d8fd50ed642340b7c9a63b3"944        # TODO: other attributes945        receive_result = sqs_client.receive_message(QueueUrl=sqs_queue)946        assert len(receive_result["Messages"]) == 1947        message = receive_result["Messages"][0]948        assert message["ReceiptHandle"]949        assert message["Body"] == "message"950        assert message["MessageId"] == send_result["MessageId"]951        assert message["MD5OfBody"] == send_result["MD5OfMessageBody"]952    def test_send_receive_message_multiple_queues(self, sqs_client, sqs_create_queue):953        queue0 = sqs_create_queue()954        queue1 = sqs_create_queue()955        sqs_client.send_message(QueueUrl=queue0, MessageBody="message")956        result = sqs_client.receive_message(QueueUrl=queue1)957        assert "Messages" not in result958        result = sqs_client.receive_message(QueueUrl=queue0)959        assert len(result["Messages"]) == 1960        assert result["Messages"][0]["Body"] == "message"961    def test_send_message_batch(self, sqs_client, sqs_queue):962        sqs_client.send_message_batch(963            QueueUrl=sqs_queue,964            Entries=[965                {"Id": "1", "MessageBody": "message-0"},966                {"Id": "2", "MessageBody": "message-1"},967            ],968        )969        response0 = sqs_client.receive_message(QueueUrl=sqs_queue)970        response1 = sqs_client.receive_message(QueueUrl=sqs_queue)971        response2 = sqs_client.receive_message(QueueUrl=sqs_queue)972        assert len(response0.get("Messages", [])) == 1973        assert len(response1.get("Messages", [])) == 1974        assert len(response2.get("Messages", [])) == 0975        message0 = response0["Messages"][0]976        message1 = response1["Messages"][0]977        assert message0["Body"] == "message-0"978        assert message1["Body"] == "message-1"979    def test_send_batch_receive_multiple(self, sqs_client, sqs_queue):980        # send a batch, then a single message, receive them a981        sqs_client.send_message_batch(982            QueueUrl=sqs_queue,983            Entries=[984                {"Id": "1", "MessageBody": "message-0"},985                {"Id": "2", "MessageBody": "message-1"},986            ],987        )988        sqs_client.send_message(QueueUrl=sqs_queue, MessageBody="message-2")989        response = sqs_client.receive_message(QueueUrl=sqs_queue, MaxNumberOfMessages=3)990        assert len(response["Messages"]) == 3991        assert response["Messages"][0]["Body"] == "message-0"992        assert response["Messages"][1]["Body"] == "message-1"993        assert response["Messages"][2]["Body"] == "message-2"994    def test_send_message_batch_with_empty_list(self, sqs_client, sqs_create_queue):995        queue_url = sqs_create_queue()996        try:997            sqs_client.send_message_batch(QueueUrl=queue_url, Entries=[])998        except ClientError as e:999            assert "EmptyBatchRequest" in e.response["Error"]["Code"]1000            assert e.response["ResponseMetadata"]["HTTPStatusCode"] in [400, 404]1001    def test_tag_untag_queue(self, sqs_client, sqs_create_queue):1002        queue_url = sqs_create_queue()1003        # tag queue1004        tags = {"tag1": "value1", "tag2": "value2", "tag3": ""}1005        sqs_client.tag_queue(QueueUrl=queue_url, Tags=tags)1006        # check queue tags1007        response = sqs_client.list_queue_tags(QueueUrl=queue_url)1008        assert response["Tags"] == tags1009        # remove tag1 and tag31010        sqs_client.untag_queue(QueueUrl=queue_url, TagKeys=["tag1", "tag3"])1011        response = sqs_client.list_queue_tags(QueueUrl=queue_url)1012        assert response["Tags"] == {"tag2": "value2"}1013        # remove tag21014        sqs_client.untag_queue(QueueUrl=queue_url, TagKeys=["tag2"])1015        response = sqs_client.list_queue_tags(QueueUrl=queue_url)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
