How to use sns_create_http_endpoint method in localstack

Best Python code snippet using localstack_python

test_sns.py

Source:test_sns.py Github

copy

Full Screen

...1497 sns_create_http_endpoint,1498 raw_message_delivery,1499 ):1500 # Necessitate manual set up to allow external access to endpoint, only in local testing1501 topic_arn, subscription_arn, endpoint_url, server = sns_create_http_endpoint(1502 raw_message_delivery1503 )1504 assert poll_condition(1505 lambda: len(server.log) >= 1,1506 timeout=5,1507 )1508 sub_request, _ = server.log[0]1509 payload = sub_request.get_json(force=True)1510 assert payload["Type"] == "SubscriptionConfirmation"1511 assert sub_request.headers["x-amz-sns-message-type"] == "SubscriptionConfirmation"1512 assert "Signature" in payload1513 assert "SigningCertURL" in payload1514 token = payload["Token"]1515 subscribe_url = payload["SubscribeURL"]1516 service_url, subscribe_url_path = payload["SubscribeURL"].rsplit("/", maxsplit=1)1517 assert subscribe_url == (1518 f"{service_url}/?Action=ConfirmSubscription" f"&TopicArn={topic_arn}&Token={token}"1519 )1520 confirm_subscribe_request = requests.get(subscribe_url)1521 confirm_subscribe = xmltodict.parse(confirm_subscribe_request.content)1522 assert (1523 confirm_subscribe["ConfirmSubscriptionResponse"]["ConfirmSubscriptionResult"][1524 "SubscriptionArn"1525 ]1526 == subscription_arn1527 )1528 subscription_attributes = sns_client.get_subscription_attributes(1529 SubscriptionArn=subscription_arn1530 )1531 assert subscription_attributes["Attributes"]["PendingConfirmation"] == "false"1532 message = "test_external_http_endpoint"1533 sns_client.publish(TopicArn=topic_arn, Message=message)1534 assert poll_condition(1535 lambda: len(server.log) >= 2,1536 timeout=5,1537 )1538 notification_request, _ = server.log[1]1539 assert notification_request.headers["x-amz-sns-message-type"] == "Notification"1540 expected_unsubscribe_url = (1541 f"{service_url}/?Action=Unsubscribe&SubscriptionArn={subscription_arn}"1542 )1543 if raw_message_delivery:1544 payload = notification_request.data.decode()1545 assert payload == message1546 else:1547 payload = 
notification_request.get_json(force=True)1548 assert payload["Type"] == "Notification"1549 assert "Signature" in payload1550 assert "SigningCertURL" in payload1551 assert payload["Message"] == message1552 assert payload["UnsubscribeURL"] == expected_unsubscribe_url1553 unsub_request = requests.get(expected_unsubscribe_url)1554 unsubscribe_confirmation = xmltodict.parse(unsub_request.content)1555 assert "UnsubscribeResponse" in unsubscribe_confirmation1556 assert poll_condition(1557 lambda: len(server.log) >= 3,1558 timeout=5,1559 )1560 unsub_request, _ = server.log[2]1561 payload = unsub_request.get_json(force=True)1562 assert payload["Type"] == "UnsubscribeConfirmation"1563 assert unsub_request.headers["x-amz-sns-message-type"] == "UnsubscribeConfirmation"1564 assert "Signature" in payload1565 assert "SigningCertURL" in payload1566 token = payload["Token"]1567 assert payload["SubscribeURL"] == (1568 f"{service_url}/?" f"Action=ConfirmSubscription&TopicArn={topic_arn}&Token={token}"1569 )1570 @pytest.mark.only_localstack1571 @pytest.mark.parametrize("raw_message_delivery", [True, False])1572 def test_dlq_external_http_endpoint(1573 self,1574 sns_client,1575 sqs_client,1576 sns_create_topic,1577 sqs_create_queue,1578 sqs_queue_arn,1579 sns_subscription,1580 sns_create_http_endpoint,1581 sns_create_sqs_subscription,1582 sns_allow_topic_sqs_queue,1583 raw_message_delivery,1584 ):1585 # Necessitate manual set up to allow external access to endpoint, only in local testing1586 topic_arn, http_subscription_arn, endpoint_url, server = sns_create_http_endpoint(1587 raw_message_delivery1588 )1589 dlq_url = sqs_create_queue()1590 dlq_arn = sqs_queue_arn(dlq_url)1591 sns_allow_topic_sqs_queue(1592 sqs_queue_url=dlq_url, sqs_queue_arn=dlq_arn, sns_topic_arn=topic_arn1593 )1594 sns_client.set_subscription_attributes(1595 SubscriptionArn=http_subscription_arn,1596 AttributeName="RedrivePolicy",1597 AttributeValue=json.dumps({"deadLetterTargetArn": dlq_arn}),1598 )1599 assert 
poll_condition(1600 lambda: len(server.log) >= 1,...

Full Screen

Full Screen

fixtures.py

Source:fixtures.py Github

copy

Full Screen

...437 sns_client.unsubscribe(SubscriptionArn=arn)438 except Exception as e:439 LOG.error("error cleaning up subscription %s: %s", arn, e)440@pytest.fixture441def sns_create_http_endpoint(sns_client, sns_create_topic, sns_subscription):442 http_servers = []443 def _create_http_endpoint(444 raw_message_delivery: bool = False,445 ) -> Tuple[str, str, str, HTTPServer]:446 server = HTTPServer()447 server.start()448 http_servers.append(server)449 server.expect_request("/sns-endpoint").respond_with_data(status=200)450 endpoint_url = server.url_for("/sns-endpoint")451 wait_for_port_open(endpoint_url)452 topic_arn = sns_create_topic()["TopicArn"]453 subscription = sns_subscription(TopicArn=topic_arn, Protocol="http", Endpoint=endpoint_url)454 subscription_arn = subscription["SubscriptionArn"]455 delivery_policy = {...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now!

Get 100 automation test minutes FREE!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful