Best Python code snippet using localstack_python
test_sns.py
Source:test_sns.py  
...1497        sns_create_http_endpoint,1498        raw_message_delivery,1499    ):1500        # Necessitate manual set up to allow external access to endpoint, only in local testing1501        topic_arn, subscription_arn, endpoint_url, server = sns_create_http_endpoint(1502            raw_message_delivery1503        )1504        assert poll_condition(1505            lambda: len(server.log) >= 1,1506            timeout=5,1507        )1508        sub_request, _ = server.log[0]1509        payload = sub_request.get_json(force=True)1510        assert payload["Type"] == "SubscriptionConfirmation"1511        assert sub_request.headers["x-amz-sns-message-type"] == "SubscriptionConfirmation"1512        assert "Signature" in payload1513        assert "SigningCertURL" in payload1514        token = payload["Token"]1515        subscribe_url = payload["SubscribeURL"]1516        service_url, subscribe_url_path = payload["SubscribeURL"].rsplit("/", maxsplit=1)1517        assert subscribe_url == (1518            f"{service_url}/?Action=ConfirmSubscription" f"&TopicArn={topic_arn}&Token={token}"1519        )1520        confirm_subscribe_request = requests.get(subscribe_url)1521        confirm_subscribe = xmltodict.parse(confirm_subscribe_request.content)1522        assert (1523            confirm_subscribe["ConfirmSubscriptionResponse"]["ConfirmSubscriptionResult"][1524                "SubscriptionArn"1525            ]1526            == subscription_arn1527        )1528        subscription_attributes = sns_client.get_subscription_attributes(1529            SubscriptionArn=subscription_arn1530        )1531        assert subscription_attributes["Attributes"]["PendingConfirmation"] == "false"1532        message = "test_external_http_endpoint"1533        sns_client.publish(TopicArn=topic_arn, Message=message)1534        assert poll_condition(1535            lambda: len(server.log) >= 2,1536            timeout=5,1537        )1538        notification_request, _ = server.log[1]1539        
assert notification_request.headers["x-amz-sns-message-type"] == "Notification"1540        expected_unsubscribe_url = (1541            f"{service_url}/?Action=Unsubscribe&SubscriptionArn={subscription_arn}"1542        )1543        if raw_message_delivery:1544            payload = notification_request.data.decode()1545            assert payload == message1546        else:1547            payload = notification_request.get_json(force=True)1548            assert payload["Type"] == "Notification"1549            assert "Signature" in payload1550            assert "SigningCertURL" in payload1551            assert payload["Message"] == message1552            assert payload["UnsubscribeURL"] == expected_unsubscribe_url1553        unsub_request = requests.get(expected_unsubscribe_url)1554        unsubscribe_confirmation = xmltodict.parse(unsub_request.content)1555        assert "UnsubscribeResponse" in unsubscribe_confirmation1556        assert poll_condition(1557            lambda: len(server.log) >= 3,1558            timeout=5,1559        )1560        unsub_request, _ = server.log[2]1561        payload = unsub_request.get_json(force=True)1562        assert payload["Type"] == "UnsubscribeConfirmation"1563        assert unsub_request.headers["x-amz-sns-message-type"] == "UnsubscribeConfirmation"1564        assert "Signature" in payload1565        assert "SigningCertURL" in payload1566        token = payload["Token"]1567        assert payload["SubscribeURL"] == (1568            f"{service_url}/?" 
f"Action=ConfirmSubscription&TopicArn={topic_arn}&Token={token}"1569        )1570    @pytest.mark.only_localstack1571    @pytest.mark.parametrize("raw_message_delivery", [True, False])1572    def test_dlq_external_http_endpoint(1573        self,1574        sns_client,1575        sqs_client,1576        sns_create_topic,1577        sqs_create_queue,1578        sqs_queue_arn,1579        sns_subscription,1580        sns_create_http_endpoint,1581        sns_create_sqs_subscription,1582        sns_allow_topic_sqs_queue,1583        raw_message_delivery,1584    ):1585        # Necessitate manual set up to allow external access to endpoint, only in local testing1586        topic_arn, http_subscription_arn, endpoint_url, server = sns_create_http_endpoint(1587            raw_message_delivery1588        )1589        dlq_url = sqs_create_queue()1590        dlq_arn = sqs_queue_arn(dlq_url)1591        sns_allow_topic_sqs_queue(1592            sqs_queue_url=dlq_url, sqs_queue_arn=dlq_arn, sns_topic_arn=topic_arn1593        )1594        sns_client.set_subscription_attributes(1595            SubscriptionArn=http_subscription_arn,1596            AttributeName="RedrivePolicy",1597            AttributeValue=json.dumps({"deadLetterTargetArn": dlq_arn}),1598        )1599        assert poll_condition(1600            lambda: len(server.log) >= 1,...fixtures.py
Source:fixtures.py  
...437            sns_client.unsubscribe(SubscriptionArn=arn)438        except Exception as e:439            LOG.error("error cleaning up subscription %s: %s", arn, e)440@pytest.fixture441def sns_create_http_endpoint(sns_client, sns_create_topic, sns_subscription):442    http_servers = []443    def _create_http_endpoint(444        raw_message_delivery: bool = False,445    ) -> Tuple[str, str, str, HTTPServer]:446        server = HTTPServer()447        server.start()448        http_servers.append(server)449        server.expect_request("/sns-endpoint").respond_with_data(status=200)450        endpoint_url = server.url_for("/sns-endpoint")451        wait_for_port_open(endpoint_url)452        topic_arn = sns_create_topic()["TopicArn"]453        subscription = sns_subscription(TopicArn=topic_arn, Protocol="http", Endpoint=endpoint_url)454        subscription_arn = subscription["SubscriptionArn"]455        delivery_policy = {...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
