Best Python code snippet using localstack_python
test_sqs.py
Source:test_sqs.py  
...2170            },2171        )2172        assert response.ok2173        assert "<DeleteQueueResponse " in response.text2174        assert poll_condition(lambda: not sqs_queue_exists(queue_url), timeout=5)2175    @pytest.mark.aws_validated2176    def test_get_send_and_receive_messages(self, sqs_create_queue, sqs_http_client):2177        queue1_url = sqs_create_queue()2178        queue2_url = sqs_create_queue()2179        # items in queue 12180        response = sqs_http_client.get(2181            queue1_url,2182            params={2183                "Action": "SendMessage",2184                "MessageBody": "foobar",2185            },2186        )2187        assert response.ok2188        # no items in queue 22189        response = sqs_http_client.get(2190            queue2_url,2191            params={2192                "Action": "ReceiveMessage",2193            },2194        )2195        assert response.ok2196        assert "foobar" not in response.text2197        assert "<ReceiveMessageResult/>" in response.text.replace(2198            " />", "/>"2199        )  # expect response to be empty2200        # get items from queue 12201        response = sqs_http_client.get(2202            queue1_url,2203            params={2204                "Action": "ReceiveMessage",2205            },2206        )2207        assert response.ok2208        assert "<Body>foobar</Body>" in response.text2209        assert "<MD5OfBody>" in response.text2210    @pytest.mark.aws_validated2211    def test_get_on_deleted_queue_fails(2212        self, sqs_client, sqs_create_queue, sqs_http_client, sqs_queue_exists2213    ):2214        queue_url = sqs_create_queue()2215        sqs_client.delete_queue(QueueUrl=queue_url)2216        assert poll_condition(lambda: not sqs_queue_exists(queue_url), timeout=5)2217        response = sqs_http_client.get(2218            queue_url,2219            params={2220                "Action": "GetQueueAttributes",2221                "AttributeName.1": 
"QueueArn",2222            },2223        )2224        assert "<Code>AWS.SimpleQueueService.NonExistentQueue</Code>" in response.text2225        assert "<Message>The specified queue does not exist for this wsdl version" in response.text2226        assert response.status_code == 4002227    @pytest.mark.aws_validated2228    def test_get_without_query_returns_unknown_operation(self, sqs_create_queue, sqs_http_client):2229        queue_name = f"test-queue-{short_uid()}"2230        queue_url = sqs_create_queue(QueueName=queue_name)...test_sns.py
Source:test_sns.py  
...1561            sns_topic_arn=topic_arn,1562        )1563        sqs_client.delete_queue(QueueUrl=queue_url)1564        # AWS takes some time to delete the queue, which make the test fails as it delivers the message correctly1565        assert poll_condition(lambda: not sqs_queue_exists(queue_url), timeout=5)1566        message = "test_dlq_after_sqs_endpoint_deleted"1567        message_attr = {1568            "attr1": {1569                "DataType": "Number",1570                "StringValue": "111",1571            },1572            "attr2": {1573                "DataType": "Binary",1574                "BinaryValue": b"\x02\x03\x04",1575            },1576        }1577        sns_client.publish(TopicArn=topic_arn, Message=message, MessageAttributes=message_attr)1578        response = sqs_client.receive_message(1579            QueueUrl=dlq_url,1580            WaitTimeSeconds=10,1581            AttributeNames=["All"],1582            MessageAttributeNames=["All"],1583        )1584        snapshot.match("messages", response)1585    @pytest.mark.aws_validated1586    def test_message_attributes_not_missing(1587        self,1588        sns_client,1589        sqs_client,1590        sns_create_sqs_subscription,1591        sns_create_topic,1592        sqs_create_queue,1593        snapshot,1594    ):1595        # the hash isn't the same because of the Binary attributes (maybe decoding order?)1596        snapshot.add_transformer(1597            snapshot.transform.key_value(1598                "MD5OfMessageAttributes",1599                value_replacement="<md5-hash>",1600                reference_replacement=False,1601            )1602        )1603        topic_arn = sns_create_topic()["TopicArn"]1604        queue_url = sqs_create_queue()1605        subscription = sns_create_sqs_subscription(topic_arn=topic_arn, queue_url=queue_url)1606        sns_client.set_subscription_attributes(1607            SubscriptionArn=subscription["SubscriptionArn"],1608            
AttributeName="RawMessageDelivery",1609            AttributeValue="true",1610        )1611        attributes = {1612            "an-attribute-key": {"DataType": "String", "StringValue": "an-attribute-value"},1613            "binary-attribute": {"DataType": "Binary", "BinaryValue": b"\x02\x03\x04"},1614        }1615        publish_response = sns_client.publish(1616            TopicArn=topic_arn,1617            Message="text",1618            MessageAttributes=attributes,1619        )1620        snapshot.match("publish-msg-raw", publish_response)1621        msg = sqs_client.receive_message(1622            QueueUrl=queue_url,1623            AttributeNames=["All"],1624            MessageAttributeNames=["All"],1625            WaitTimeSeconds=3,1626        )1627        # as SNS piggybacks on SQS MessageAttributes when RawDelivery is true1628        # BinaryValue depends on SQS implementation, and is decoded automatically1629        snapshot.match("raw-delivery-msg-attrs", msg)1630        sqs_client.delete_message(1631            QueueUrl=queue_url, ReceiptHandle=msg["Messages"][0]["ReceiptHandle"]1632        )1633        sns_client.set_subscription_attributes(1634            SubscriptionArn=subscription["SubscriptionArn"],1635            AttributeName="RawMessageDelivery",1636            AttributeValue="false",1637        )1638        publish_response = sns_client.publish(1639            TopicArn=topic_arn,1640            Message="text",1641            MessageAttributes=attributes,1642        )1643        snapshot.match("publish-msg-json", publish_response)1644        msg = sqs_client.receive_message(1645            QueueUrl=queue_url,1646            AttributeNames=["All"],1647            MessageAttributeNames=["All"],1648            WaitTimeSeconds=3,1649        )1650        snapshot.match("json-delivery-msg-attrs", msg)1651        # binary payload in base64 encoded by AWS, UTF-8 for JSON1652        # 
https://docs.aws.amazon.com/sns/latest/api/API_MessageAttributeValue.html1653    @pytest.mark.only_localstack1654    @pytest.mark.aws_validated1655    @pytest.mark.parametrize("raw_message_delivery", [True, False])1656    def test_subscribe_external_http_endpoint(1657        self,1658        sns_client,1659        sns_create_http_endpoint,1660        raw_message_delivery,1661    ):1662        # Necessitate manual set up to allow external access to endpoint, only in local testing1663        topic_arn, subscription_arn, endpoint_url, server = sns_create_http_endpoint(1664            raw_message_delivery1665        )1666        assert poll_condition(1667            lambda: len(server.log) >= 1,1668            timeout=5,1669        )1670        sub_request, _ = server.log[0]1671        payload = sub_request.get_json(force=True)1672        assert payload["Type"] == "SubscriptionConfirmation"1673        assert sub_request.headers["x-amz-sns-message-type"] == "SubscriptionConfirmation"1674        assert "Signature" in payload1675        assert "SigningCertURL" in payload1676        token = payload["Token"]1677        subscribe_url = payload["SubscribeURL"]1678        service_url, subscribe_url_path = payload["SubscribeURL"].rsplit("/", maxsplit=1)1679        assert subscribe_url == (1680            f"{service_url}/?Action=ConfirmSubscription" f"&TopicArn={topic_arn}&Token={token}"1681        )1682        confirm_subscribe_request = requests.get(subscribe_url)1683        confirm_subscribe = xmltodict.parse(confirm_subscribe_request.content)1684        assert (1685            confirm_subscribe["ConfirmSubscriptionResponse"]["ConfirmSubscriptionResult"][1686                "SubscriptionArn"1687            ]1688            == subscription_arn1689        )1690        subscription_attributes = sns_client.get_subscription_attributes(1691            SubscriptionArn=subscription_arn1692        )1693        assert subscription_attributes["Attributes"]["PendingConfirmation"] == 
"false"1694        message = "test_external_http_endpoint"1695        sns_client.publish(TopicArn=topic_arn, Message=message)1696        assert poll_condition(1697            lambda: len(server.log) >= 2,1698            timeout=5,1699        )1700        notification_request, _ = server.log[1]1701        assert notification_request.headers["x-amz-sns-message-type"] == "Notification"1702        expected_unsubscribe_url = (1703            f"{service_url}/?Action=Unsubscribe&SubscriptionArn={subscription_arn}"1704        )1705        if raw_message_delivery:1706            payload = notification_request.data.decode()1707            assert payload == message1708        else:1709            payload = notification_request.get_json(force=True)1710            assert payload["Type"] == "Notification"1711            assert "Signature" in payload1712            assert "SigningCertURL" in payload1713            assert payload["Message"] == message1714            assert payload["UnsubscribeURL"] == expected_unsubscribe_url1715        unsub_request = requests.get(expected_unsubscribe_url)1716        unsubscribe_confirmation = xmltodict.parse(unsub_request.content)1717        assert "UnsubscribeResponse" in unsubscribe_confirmation1718        assert poll_condition(1719            lambda: len(server.log) >= 3,1720            timeout=5,1721        )1722        unsub_request, _ = server.log[2]1723        payload = unsub_request.get_json(force=True)1724        assert payload["Type"] == "UnsubscribeConfirmation"1725        assert unsub_request.headers["x-amz-sns-message-type"] == "UnsubscribeConfirmation"1726        assert "Signature" in payload1727        assert "SigningCertURL" in payload1728        token = payload["Token"]1729        assert payload["SubscribeURL"] == (1730            f"{service_url}/?" 
f"Action=ConfirmSubscription&TopicArn={topic_arn}&Token={token}"1731        )1732    @pytest.mark.only_localstack1733    @pytest.mark.parametrize("raw_message_delivery", [True, False])1734    def test_dlq_external_http_endpoint(1735        self,1736        sns_client,1737        sqs_client,1738        sns_create_topic,1739        sqs_create_queue,1740        sqs_queue_arn,1741        sns_subscription,1742        sns_create_http_endpoint,1743        sns_create_sqs_subscription,1744        sns_allow_topic_sqs_queue,1745        raw_message_delivery,1746    ):1747        # Necessitate manual set up to allow external access to endpoint, only in local testing1748        topic_arn, http_subscription_arn, endpoint_url, server = sns_create_http_endpoint(1749            raw_message_delivery1750        )1751        dlq_url = sqs_create_queue()1752        dlq_arn = sqs_queue_arn(dlq_url)1753        sns_allow_topic_sqs_queue(1754            sqs_queue_url=dlq_url, sqs_queue_arn=dlq_arn, sns_topic_arn=topic_arn1755        )1756        sns_client.set_subscription_attributes(1757            SubscriptionArn=http_subscription_arn,1758            AttributeName="RedrivePolicy",1759            AttributeValue=json.dumps({"deadLetterTargetArn": dlq_arn}),1760        )1761        assert poll_condition(1762            lambda: len(server.log) >= 1,1763            timeout=5,1764        )1765        sub_request, _ = server.log[0]1766        payload = sub_request.get_json(force=True)1767        assert payload["Type"] == "SubscriptionConfirmation"1768        assert sub_request.headers["x-amz-sns-message-type"] == "SubscriptionConfirmation"1769        subscribe_url = payload["SubscribeURL"]1770        service_url, subscribe_url_path = payload["SubscribeURL"].rsplit("/", maxsplit=1)1771        confirm_subscribe_request = requests.get(subscribe_url)1772        confirm_subscribe = xmltodict.parse(confirm_subscribe_request.content)1773        assert (1774            
confirm_subscribe["ConfirmSubscriptionResponse"]["ConfirmSubscriptionResult"][1775                "SubscriptionArn"1776            ]1777            == http_subscription_arn1778        )1779        subscription_attributes = sns_client.get_subscription_attributes(1780            SubscriptionArn=http_subscription_arn1781        )1782        assert subscription_attributes["Attributes"]["PendingConfirmation"] == "false"1783        server.stop()1784        wait_for_port_closed(server.port)1785        message = "test_dlq_external_http_endpoint"1786        sns_client.publish(TopicArn=topic_arn, Message=message)1787        response = sqs_client.receive_message(QueueUrl=dlq_url, WaitTimeSeconds=3)1788        assert (1789            len(response["Messages"]) == 11790        ), f"invalid number of messages in DLQ response {response}"1791        if raw_message_delivery:1792            assert response["Messages"][0]["Body"] == message1793        else:1794            received_message = json.loads(response["Messages"][0]["Body"])1795            assert received_message["Type"] == "Notification"1796            assert received_message["Message"] == message1797        receipt_handle = response["Messages"][0]["ReceiptHandle"]1798        sqs_client.delete_message(QueueUrl=dlq_url, ReceiptHandle=receipt_handle)1799        expected_unsubscribe_url = (1800            f"{service_url}/?Action=Unsubscribe&SubscriptionArn={http_subscription_arn}"1801        )1802        unsub_request = requests.get(expected_unsubscribe_url)1803        unsubscribe_confirmation = xmltodict.parse(unsub_request.content)1804        assert "UnsubscribeResponse" in unsubscribe_confirmation1805        response = sqs_client.receive_message(QueueUrl=dlq_url, WaitTimeSeconds=2)1806        # AWS doesn't send to the DLQ if the UnsubscribeConfirmation fails to be delivered1807        assert "Messages" not in response1808    @pytest.mark.aws_validated1809    def test_publish_too_long_message(self, sns_client, 
sns_create_topic, snapshot):1810        topic_arn = sns_create_topic()["TopicArn"]1811        # simulate payload over 256kb1812        message = "This is a test message" * 120001813        with pytest.raises(ClientError) as e:1814            sns_client.publish(TopicArn=topic_arn, Message=message)1815        snapshot.match("error", e.value.response)1816        assert e.value.response["Error"]["Code"] == "InvalidParameter"1817        assert e.value.response["Error"]["Message"] == "Invalid parameter: Message too long"1818        assert e.value.response["ResponseMetadata"]["HTTPStatusCode"] == 4001819    @pytest.mark.only_localstack  # needs real credentials for GCM/FCM1820    def test_publish_to_gcm(self, sns_client):1821        key = "mock_server_key"1822        token = "mock_token"1823        platform_app_arn = sns_client.create_platform_application(1824            Name="firebase", Platform="GCM", Attributes={"PlatformCredential": key}1825        )["PlatformApplicationArn"]1826        endpoint_arn = sns_client.create_platform_endpoint(1827            PlatformApplicationArn=platform_app_arn,1828            Token=token,1829        )["EndpointArn"]1830        message = {1831            "GCM": '{ "notification": {"title": "Title of notification", "body": "It works" } }'1832        }1833        with pytest.raises(ClientError) as ex:1834            sns_client.publish(1835                TargetArn=endpoint_arn, MessageStructure="json", Message=json.dumps(message)1836            )1837        assert ex.value.response["Error"]["Code"] == "InvalidParameter"1838        sns_client.delete_endpoint(EndpointArn=endpoint_arn)1839        sns_client.delete_platform_application(PlatformApplicationArn=platform_app_arn)1840    @pytest.mark.aws_validated1841    @pytest.mark.skip_snapshot_verify(1842        paths=[1843            "$..Attributes.Owner",1844            "$..Attributes.ConfirmationWasAuthenticated",1845            "$..Attributes.RawMessageDelivery",1846            
"$..Attributes.sqs_queue_url",1847            "$..Subscriptions..Owner",1848        ]1849    )1850    def test_subscription_after_failure_to_deliver(1851        self,1852        sns_client,1853        sqs_client,1854        sns_create_topic,1855        sqs_create_queue,1856        sqs_queue_arn,1857        sqs_queue_exists,1858        sns_create_sqs_subscription,1859        sns_allow_topic_sqs_queue,1860        snapshot,1861    ):1862        topic_arn = sns_create_topic()["TopicArn"]1863        queue_name = f"test-queue-{short_uid()}"1864        queue_url = sqs_create_queue(QueueName=queue_name)1865        subscription = sns_create_sqs_subscription(topic_arn=topic_arn, queue_url=queue_url)1866        subscription_arn = subscription["SubscriptionArn"]1867        dlq_url = sqs_create_queue()1868        dlq_arn = sqs_queue_arn(dlq_url)1869        sns_allow_topic_sqs_queue(1870            sqs_queue_url=dlq_url,1871            sqs_queue_arn=dlq_arn,1872            sns_topic_arn=topic_arn,1873        )1874        sub_attrs = sns_client.get_subscription_attributes(SubscriptionArn=subscription_arn)1875        snapshot.match("subscriptions-attrs", sub_attrs)1876        message = "test_dlq_before_sqs_endpoint_deleted"1877        sns_client.publish(TopicArn=topic_arn, Message=message)1878        response = sqs_client.receive_message(1879            QueueUrl=queue_url, WaitTimeSeconds=10, MaxNumberOfMessages=41880        )1881        snapshot.match("messages-before-delete", response)1882        sqs_client.delete_message(1883            QueueUrl=queue_url, ReceiptHandle=response["Messages"][0]["ReceiptHandle"]1884        )1885        sqs_client.delete_queue(QueueUrl=queue_url)1886        # try to send a message before setting a DLQ1887        message = "test_dlq_after_sqs_endpoint_deleted"1888        sns_client.publish(TopicArn=topic_arn, Message=message)1889        # to avoid race condition, publish is async and the redrive policy can be in effect before the actual publish1890 
       time.sleep(1)1891        # check the subscription is still there after we deleted the queue1892        subscriptions = sns_client.list_subscriptions_by_topic(TopicArn=topic_arn)1893        snapshot.match("subscriptions", subscriptions)1894        sns_client.set_subscription_attributes(1895            SubscriptionArn=subscription_arn,1896            AttributeName="RedrivePolicy",1897            AttributeValue=json.dumps({"deadLetterTargetArn": dlq_arn}),1898        )1899        sub_attrs = sns_client.get_subscription_attributes(SubscriptionArn=subscription_arn)1900        snapshot.match("subscriptions-attrs-with-redrive", sub_attrs)1901        # AWS takes some time to delete the queue, which make the test fails as it delivers the message correctly1902        assert poll_condition(lambda: not sqs_queue_exists(queue_url), timeout=5)1903        # test sending and receiving multiple messages1904        for i in range(2):1905            message = f"test_dlq_after_sqs_endpoint_deleted_{i}"1906            sns_client.publish(TopicArn=topic_arn, Message=message)1907            response = sqs_client.receive_message(1908                QueueUrl=dlq_url, WaitTimeSeconds=10, MaxNumberOfMessages=41909            )1910            sqs_client.delete_message(1911                QueueUrl=dlq_url, ReceiptHandle=response["Messages"][0]["ReceiptHandle"]1912            )1913            snapshot.match(f"message-{i}-after-delete", response)1914    @pytest.mark.aws_validated1915    def test_publish_to_firehose_with_s3(1916        self,...find_dangling_cloudwatch_alarms.py
Source:find_dangling_cloudwatch_alarms.py  
...39            return False40        else:41            raise42@functools.cache43def sqs_queue_exists(sess, *, queue_name):44    """45    Returns True if this SQS queue exists, False otherwise.46    """47    client = sess.client("sqs")48    try:49        client.get_queue_url(QueueName=queue_name)50        return True51    except ClientError as err:  # pragma: no cover52        if err.response["Error"]["Code"] == "AWS.SimpleQueueService.NonExistentQueue":53            return False54        else:55            raise56@functools.cache57def lambda_function_exists(sess, *, function_name):58    """59    Returns True if this Lambda function exists, False otherwise.60    """61    client = sess.client("lambda")62    try:63        client.get_function(FunctionName=function_name)64        return True65    except ClientError as err:  # pragma: no cover66        if err.response["Error"]["Code"] == "ResourceNotFoundException":67            return False68        else:69            raise70if __name__ == "__main__":71    sess = boto3.Session()72    for alarm in get_metric_alarm_descriptions(sess):73        dimensions = {dim["Name"]: dim["Value"] for dim in alarm["Dimensions"]}74        # Is this alarm based on a non-existent table?75        if alarm.get("Namespace") == "AWS/DynamoDB":76            table_name = dimensions["TableName"]77            if not dynamodb_table_exists(sess, table_name=table_name):78                print(79                    f"!!! Alarm {alarm['AlarmArn']} is based on non-existent table {table_name}"80                )81            continue82        # Is this alarm based on a non-existent queue?83        if alarm.get("Namespace") == "AWS/SQS":84            queue_name = dimensions["QueueName"]85            if not sqs_queue_exists(sess, queue_name=queue_name):86                print(87                    f"!!! 
Alarm {alarm['AlarmArn']} is based on non-existent SQS queue {queue_name}"88                )89            continue90        # Is this alarm based on a non-existent Lambda function?91        if alarm.get("Namespace") == "AWS/Lambda":92            function_name = dimensions["FunctionName"]93            if not lambda_function_exists(sess, function_name=function_name):94                print(95                    f"!!! Alarm {alarm['AlarmArn']} is based on non-existent Lambda function {function_name}"96                )...Learn to execute automation testing from scratch with LambdaTest Learning Hub. From setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios, LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks such as Selenium, Cypress, and TestNG.
You could also refer to the video tutorials on the LambdaTest YouTube channel to get step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
