Best Python code snippet using localstack_python
provider.py
Source:provider.py  
...888    elif subscriber["Protocol"] == "lambda":889        try:890            external_url = external_service_url("sns")891            unsubscribe_url = create_unsubscribe_url(external_url, subscriber["SubscriptionArn"])892            response = process_sns_notification_to_lambda(893                subscriber["Endpoint"],894                topic_arn,895                subscriber["SubscriptionArn"],896                message,897                message_id,898                # see the format here899                # https://docs.aws.amazon.com/lambda/latest/dg/with-sns.html900                # issue with sdk to serialize the attribute inside lambda901                prepare_message_attributes(message_attributes),902                unsubscribe_url,903                subject=req_data.get("Subject")[0],904            )905            if response is not None:906                delivery = {907                    "statusCode": response[0],908                    "providerResponse": response[1],909                }910                store_delivery_log(subscriber, True, message, message_id, delivery)911            # TODO: Check if it can be removed912            if isinstance(response, Response):913                response.raise_for_status()914            elif isinstance(response, FlaskResponse):915                if response.status_code >= 400:916                    raise Exception(917                        "Error response (code %s): %s" % (response.status_code, response.data)918                    )919        except Exception as exc:920            LOG.info(921                "Unable to run Lambda function on SNS message: %s %s", exc, traceback.format_exc()922            )923            store_delivery_log(subscriber, False, message, message_id)924            message_body = create_sns_message_body(925                subscriber, req_data, message_id, message_attributes926            )927            sns_error_to_dead_letter_queue(subscriber, message_body, str(exc))928        
return929    elif subscriber["Protocol"] in ["http", "https"]:930        msg_type = (req_data.get("Type") or ["Notification"])[0]931        try:932            message_body = create_sns_message_body(933                subscriber, req_data, message_id, message_attributes934            )935        except Exception:936            return937        try:938            message_headers = {939                "Content-Type": "text/plain",940                # AWS headers according to941                # https://docs.aws.amazon.com/sns/latest/dg/sns-message-and-json-formats.html#http-header942                "x-amz-sns-message-type": msg_type,943                "x-amz-sns-message-id": message_id,944                "x-amz-sns-topic-arn": subscriber["TopicArn"],945                "User-Agent": "Amazon Simple Notification Service Agent",946            }947            if msg_type != "SubscriptionConfirmation":948                # while testing, never had those from AWS but the docs above states it should be there949                message_headers["x-amz-sns-subscription-arn"] = subscriber["SubscriptionArn"]950            # When raw message delivery is enabled, x-amz-sns-rawdelivery needs to be set to 'true'951            # indicating that the message has been published without JSON formatting.952            # https://docs.aws.amazon.com/sns/latest/dg/sns-large-payload-raw-message-delivery.html953            elif msg_type == "Notification" and is_raw_message_delivery(subscriber):954                message_headers["x-amz-sns-rawdelivery"] = "true"955            response = requests.post(956                subscriber["Endpoint"],957                headers=message_headers,958                data=message_body,959                verify=False,960            )961            delivery = {962                "statusCode": response.status_code,963                "providerResponse": response.content.decode("utf-8"),964            }965            store_delivery_log(subscriber, True, message, 
message_id, delivery)966            response.raise_for_status()967        except Exception as exc:968            LOG.info(969                "Received error on sending SNS message, putting to DLQ (if configured): %s", exc970            )971            store_delivery_log(subscriber, False, message, message_id)972            # AWS doesn't send to the DLQ if there's an error trying to deliver a UnsubscribeConfirmation msg973            if msg_type != "UnsubscribeConfirmation":974                sns_error_to_dead_letter_queue(subscriber, message_body, str(exc))975        return976    elif subscriber["Protocol"] == "application":977        try:978            sns_client = aws_stack.connect_to_service("sns")979            sns_client.publish(TargetArn=subscriber["Endpoint"], Message=message)980            store_delivery_log(subscriber, True, message, message_id)981        except Exception as exc:982            LOG.warning(983                "Unable to forward SNS message to SNS platform app: %s %s",984                exc,985                traceback.format_exc(),986            )987            store_delivery_log(subscriber, False, message, message_id)988            message_body = create_sns_message_body(subscriber, req_data, message_id)989            sns_error_to_dead_letter_queue(subscriber, message_body, str(exc))990        return991    elif subscriber["Protocol"] in ["email", "email-json"]:992        ses_client = aws_stack.connect_to_service("ses")993        if subscriber.get("Endpoint"):994            ses_client.verify_email_address(EmailAddress=subscriber.get("Endpoint"))995            ses_client.verify_email_address(EmailAddress="admin@localstack.com")996            ses_client.send_email(997                Source="admin@localstack.com",998                Message={999                    "Body": {1000                        "Text": {1001                            "Data": create_sns_message_body(1002                                subscriber=subscriber, 
req_data=req_data, message_id=message_id1003                            )1004                            if subscriber["Protocol"] == "email-json"1005                            else message1006                        }1007                    },1008                    "Subject": {"Data": "SNS-Subscriber-Endpoint"},1009                },1010                Destination={"ToAddresses": [subscriber.get("Endpoint")]},1011            )1012            store_delivery_log(subscriber, True, message, message_id)1013    elif subscriber["Protocol"] == "firehose":1014        firehose_client = aws_stack.connect_to_service("firehose")1015        endpoint = subscriber["Endpoint"]1016        sns_body = create_sns_message_body(1017            subscriber=subscriber, req_data=req_data, message_id=message_id1018        )1019        if endpoint:1020            delivery_stream = aws_stack.extract_resource_from_arn(endpoint).split("/")[1]1021            firehose_client.put_record(1022                DeliveryStreamName=delivery_stream, Record={"Data": to_bytes(sns_body)}1023            )1024            store_delivery_log(subscriber, True, message, message_id)1025        return1026    else:1027        LOG.warning('Unexpected protocol "%s" for SNS subscription', subscriber["Protocol"])1028def process_sns_notification_to_lambda(1029    func_arn,1030    topic_arn,1031    subscription_arn,1032    message,1033    message_id,1034    message_attributes,1035    unsubscribe_url,1036    subject="",1037) -> tuple[int, bytes] | None:1038    """1039    Process the SNS notification to lambda1040    :param func_arn: Arn of the target function1041    :param topic_arn: Arn of the topic invoking the function1042    :param subscription_arn: Arn of the subscription...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. 
LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
