How to use process_sns_notification_to_lambda method in localstack

Best Python code snippet using localstack_python

provider.py

Source:provider.py Github

copy

Full Screen

...888 elif subscriber["Protocol"] == "lambda":889 try:890 external_url = external_service_url("sns")891 unsubscribe_url = create_unsubscribe_url(external_url, subscriber["SubscriptionArn"])892 response = process_sns_notification_to_lambda(893 subscriber["Endpoint"],894 topic_arn,895 subscriber["SubscriptionArn"],896 message,897 message_id,898 # see the format here899 # https://docs.aws.amazon.com/lambda/latest/dg/with-sns.html900 # issue with sdk to serialize the attribute inside lambda901 prepare_message_attributes(message_attributes),902 unsubscribe_url,903 subject=req_data.get("Subject")[0],904 )905 if response is not None:906 delivery = {907 "statusCode": response[0],908 "providerResponse": response[1],909 }910 store_delivery_log(subscriber, True, message, message_id, delivery)911 # TODO: Check if it can be removed912 if isinstance(response, Response):913 response.raise_for_status()914 elif isinstance(response, FlaskResponse):915 if response.status_code >= 400:916 raise Exception(917 "Error response (code %s): %s" % (response.status_code, response.data)918 )919 except Exception as exc:920 LOG.info(921 "Unable to run Lambda function on SNS message: %s %s", exc, traceback.format_exc()922 )923 store_delivery_log(subscriber, False, message, message_id)924 message_body = create_sns_message_body(925 subscriber, req_data, message_id, message_attributes926 )927 sns_error_to_dead_letter_queue(subscriber, message_body, str(exc))928 return929 elif subscriber["Protocol"] in ["http", "https"]:930 msg_type = (req_data.get("Type") or ["Notification"])[0]931 try:932 message_body = create_sns_message_body(933 subscriber, req_data, message_id, message_attributes934 )935 except Exception:936 return937 try:938 message_headers = {939 "Content-Type": "text/plain",940 # AWS headers according to941 # https://docs.aws.amazon.com/sns/latest/dg/sns-message-and-json-formats.html#http-header942 "x-amz-sns-message-type": msg_type,943 "x-amz-sns-message-id": message_id,944 
"x-amz-sns-topic-arn": subscriber["TopicArn"],945 "User-Agent": "Amazon Simple Notification Service Agent",946 }947 if msg_type != "SubscriptionConfirmation":948 # while testing, never had those from AWS but the docs above states it should be there949 message_headers["x-amz-sns-subscription-arn"] = subscriber["SubscriptionArn"]950 # When raw message delivery is enabled, x-amz-sns-rawdelivery needs to be set to 'true'951 # indicating that the message has been published without JSON formatting.952 # https://docs.aws.amazon.com/sns/latest/dg/sns-large-payload-raw-message-delivery.html953 elif msg_type == "Notification" and is_raw_message_delivery(subscriber):954 message_headers["x-amz-sns-rawdelivery"] = "true"955 response = requests.post(956 subscriber["Endpoint"],957 headers=message_headers,958 data=message_body,959 verify=False,960 )961 delivery = {962 "statusCode": response.status_code,963 "providerResponse": response.content.decode("utf-8"),964 }965 store_delivery_log(subscriber, True, message, message_id, delivery)966 response.raise_for_status()967 except Exception as exc:968 LOG.info(969 "Received error on sending SNS message, putting to DLQ (if configured): %s", exc970 )971 store_delivery_log(subscriber, False, message, message_id)972 # AWS doesn't send to the DLQ if there's an error trying to deliver a UnsubscribeConfirmation msg973 if msg_type != "UnsubscribeConfirmation":974 sns_error_to_dead_letter_queue(subscriber, message_body, str(exc))975 return976 elif subscriber["Protocol"] == "application":977 try:978 sns_client = aws_stack.connect_to_service("sns")979 sns_client.publish(TargetArn=subscriber["Endpoint"], Message=message)980 store_delivery_log(subscriber, True, message, message_id)981 except Exception as exc:982 LOG.warning(983 "Unable to forward SNS message to SNS platform app: %s %s",984 exc,985 traceback.format_exc(),986 )987 store_delivery_log(subscriber, False, message, message_id)988 message_body = create_sns_message_body(subscriber, req_data, 
message_id)989 sns_error_to_dead_letter_queue(subscriber, message_body, str(exc))990 return991 elif subscriber["Protocol"] in ["email", "email-json"]:992 ses_client = aws_stack.connect_to_service("ses")993 if subscriber.get("Endpoint"):994 ses_client.verify_email_address(EmailAddress=subscriber.get("Endpoint"))995 ses_client.verify_email_address(EmailAddress="admin@localstack.com")996 ses_client.send_email(997 Source="admin@localstack.com",998 Message={999 "Body": {1000 "Text": {1001 "Data": create_sns_message_body(1002 subscriber=subscriber, req_data=req_data, message_id=message_id1003 )1004 if subscriber["Protocol"] == "email-json"1005 else message1006 }1007 },1008 "Subject": {"Data": "SNS-Subscriber-Endpoint"},1009 },1010 Destination={"ToAddresses": [subscriber.get("Endpoint")]},1011 )1012 store_delivery_log(subscriber, True, message, message_id)1013 elif subscriber["Protocol"] == "firehose":1014 firehose_client = aws_stack.connect_to_service("firehose")1015 endpoint = subscriber["Endpoint"]1016 sns_body = create_sns_message_body(1017 subscriber=subscriber, req_data=req_data, message_id=message_id1018 )1019 if endpoint:1020 delivery_stream = aws_stack.extract_resource_from_arn(endpoint).split("/")[1]1021 firehose_client.put_record(1022 DeliveryStreamName=delivery_stream, Record={"Data": to_bytes(sns_body)}1023 )1024 store_delivery_log(subscriber, True, message, message_id)1025 return1026 else:1027 LOG.warning('Unexpected protocol "%s" for SNS subscription', subscriber["Protocol"])1028def process_sns_notification_to_lambda(1029 func_arn,1030 topic_arn,1031 subscription_arn,1032 message,1033 message_id,1034 message_attributes,1035 unsubscribe_url,1036 subject="",1037) -> tuple[int, bytes] | None:1038 """1039 Process the SNS notification to lambda1040 :param func_arn: Arn of the target function1041 :param topic_arn: Arn of the topic invoking the function1042 :param subscription_arn: Arn of the subscription...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful