Best Python code snippet using localstack_python
provider.py
Source:provider.py  
...680        )681        # Send out confirmation message for HTTP(S), fix for https://github.com/localstack/localstack/issues/881682        if protocol in ["http", "https"]:683            external_url = external_service_url("sns")684            subscription["UnsubscribeURL"] = create_unsubscribe_url(external_url, subscription_arn)685            confirmation = {686                "Type": ["SubscriptionConfirmation"],687                "Token": [subscription_token],688                "Message": [689                    f"You have chosen to subscribe to the topic {topic_arn}.\n"690                    + "To confirm the subscription, visit the SubscribeURL included in this message."691                ],692                "SubscribeURL": [create_subscribe_url(external_url, topic_arn, subscription_token)],693            }694            publish_message(topic_arn, confirmation, {}, subscription_arn, skip_checks=True)695        elif protocol in ["sqs", "lambda"]:696            # Auto-confirm sqs and lambda subscriptions for now697            # TODO: revisit for multi-account698            self.confirm_subscription(context, topic_arn, subscription_token)699        return SubscribeResponse(SubscriptionArn=subscription_arn)700    def tag_resource(701        self, context: RequestContext, resource_arn: AmazonResourceName, tags: TagList702    ) -> TagResourceResponse:703        # TODO: can this be used to tag any resource when using AWS?704        # each tag key must be unique705        # https://docs.aws.amazon.com/general/latest/gr/aws_tagging.html#tag-best-practices706        unique_tag_keys = {tag["Key"] for tag in tags}707        if len(unique_tag_keys) < len(tags):708            raise InvalidParameterException("Invalid parameter: Duplicated keys are not allowed.")709        call_moto(context)710        sns_backend = SNSBackend.get()711        existing_tags = sns_backend.sns_tags.get(resource_arn, [])712        def existing_tag_index(item):713            for idx, tag in enumerate(existing_tags):714                if item["Key"] == tag["Key"]:715                    return idx716            return None717        for item in tags:718            existing_index = existing_tag_index(item)719            if existing_index is None:720                existing_tags.append(item)721            else:722                existing_tags[existing_index] = item723        sns_backend.sns_tags[resource_arn] = existing_tags724        return TagResourceResponse()725    def delete_topic(self, context: RequestContext, topic_arn: topicARN) -> None:726        call_moto(context)727        sns_backend = SNSBackend.get()728        sns_backend.sns_subscriptions.pop(topic_arn, None)729        sns_backend.sns_tags.pop(topic_arn, None)730        event_publisher.fire_event(731            event_publisher.EVENT_SNS_DELETE_TOPIC,732            payload={"t": event_publisher.get_hash(topic_arn)},733        )734    def create_topic(735        self,736        context: RequestContext,737        name: topicName,738        attributes: TopicAttributesMap = None,739        tags: TagList = None,740    ) -> CreateTopicResponse:741        moto_response = call_moto(context)742        sns_backend = SNSBackend.get()743        topic_arn = moto_response["TopicArn"]744        tag_resource_success = extract_tags(topic_arn, tags, True, sns_backend)745        if not tag_resource_success:746            raise InvalidParameterException(747                "Invalid parameter: Tags Reason: Topic already exists with different tags"748            )749        if tags:750            self.tag_resource(context=context, resource_arn=topic_arn, tags=tags)751        sns_backend.sns_subscriptions[topic_arn] = (752            sns_backend.sns_subscriptions.get(topic_arn) or []753        )754        # publish event755        event_publisher.fire_event(756            event_publisher.EVENT_SNS_CREATE_TOPIC,757            payload={"t": event_publisher.get_hash(topic_arn)},758        )759        return CreateTopicResponse(TopicArn=topic_arn)760def message_to_subscribers(761    message_id,762    message,763    topic_arn,764    req_data,765    headers,766    subscription_arn=None,767    skip_checks=False,768    message_attributes=None,769):770    # AWS allows using TargetArn to publish to a topic, for backward compatibility771    if not topic_arn:772        topic_arn = req_data.get("TargetArn")773    sns_backend = SNSBackend.get()774    subscriptions = sns_backend.sns_subscriptions.get(topic_arn, [])775    async def wait_for_messages_sent():776        subs = [777            message_to_subscriber(778                message_id,779                message,780                topic_arn,781                req_data,782                headers,783                subscription_arn,784                skip_checks,785                sns_backend,786                subscriber,787                subscriptions,788                message_attributes,789            )790            for subscriber in list(subscriptions)791        ]792        if subs:793            await asyncio.wait(subs)794    asyncio.run(wait_for_messages_sent())795async def message_to_subscriber(796    message_id,797    message,798    topic_arn,799    req_data,800    headers,801    subscription_arn,802    skip_checks,803    sns_backend,804    subscriber,805    subscriptions,806    message_attributes,807):808    if subscription_arn not in [None, subscriber["SubscriptionArn"]]:809        return810    filter_policy = json.loads(subscriber.get("FilterPolicy") or "{}")811    if not skip_checks and not check_filter_policy(filter_policy, message_attributes):812        LOG.info(813            "SNS filter policy %s does not match attributes %s", filter_policy, message_attributes814        )815        return816    # todo: Message attributes are sent only when the message structure is String, not JSON.817    if subscriber["Protocol"] == "sms":818        event = {819            "topic_arn": topic_arn,820            "endpoint": subscriber["Endpoint"],821            "message_content": req_data["Message"][0],822        }823        sns_backend.sms_messages.append(event)824        LOG.info(825            "Delivering SMS message to %s: %s",826            subscriber["Endpoint"],827            req_data["Message"][0],828        )829        # MOCK DATA830        delivery = {831            "phoneCarrier": "Mock Carrier",832            "mnc": 270,833            "priceInUSD": 0.00645,834            "smsType": "Transactional",835            "mcc": 310,836            "providerResponse": "Message has been accepted by phone carrier",837            "dwellTimeMsUntilDeviceAck": 200,838        }839        store_delivery_log(subscriber, True, message, message_id, delivery)840        return841    elif subscriber["Protocol"] == "sqs":842        queue_url = None843        message_body = create_sns_message_body(subscriber, req_data, message_id, message_attributes)844        try:845            endpoint = subscriber["Endpoint"]846            if "sqs_queue_url" in subscriber:847                queue_url = subscriber.get("sqs_queue_url")848            elif "://" in endpoint:849                queue_url = endpoint850            else:851                queue_url = aws_stack.get_sqs_queue_url(endpoint)852                subscriber["sqs_queue_url"] = queue_url853            message_group_id = req_data.get("MessageGroupId", [""])[0]854            message_deduplication_id = req_data.get("MessageDeduplicationId", [""])[0]855            sqs_client = aws_stack.connect_to_service("sqs")856            kwargs = {}857            if message_group_id:858                kwargs["MessageGroupId"] = message_group_id859            if message_deduplication_id:860                kwargs["MessageDeduplicationId"] = message_deduplication_id861            sqs_client.send_message(862                QueueUrl=queue_url,863                MessageBody=message_body,864                MessageAttributes=create_sqs_message_attributes(subscriber, message_attributes),865                MessageSystemAttributes=create_sqs_system_attributes(headers),866                **kwargs,867            )868            store_delivery_log(subscriber, True, message, message_id)869        except Exception as exc:870            LOG.info("Unable to forward SNS message to SQS: %s %s", exc, traceback.format_exc())871            store_delivery_log(subscriber, False, message, message_id)872            sns_error_to_dead_letter_queue(subscriber, message_body, str(exc))873            if "NonExistentQueue" in str(exc):874                LOG.info(875                    'Removing non-existent queue "%s" subscribed to topic "%s"',876                    queue_url,877                    topic_arn,878                )879                subscriptions.remove(subscriber)880        return881    elif subscriber["Protocol"] == "lambda":882        try:883            external_url = external_service_url("sns")884            unsubscribe_url = create_unsubscribe_url(external_url, subscriber["SubscriptionArn"])885            response = lambda_api.process_sns_notification(886                subscriber["Endpoint"],887                topic_arn,888                subscriber["SubscriptionArn"],889                message,890                message_id,891                # see the format here892                # https://docs.aws.amazon.com/lambda/latest/dg/with-sns.html893                # issue with sdk to serialize the attribute inside lambda894                prepare_message_attributes(message_attributes),895                unsubscribe_url,896                subject=req_data.get("Subject")[0],897            )898            if response is not None:899                delivery = {900                    "statusCode": response.status_code,901                    "providerResponse": response.get_data(),902                }903                store_delivery_log(subscriber, True, message, message_id, delivery)904            # TODO: Check if it can be removed905            if isinstance(response, Response):906                response.raise_for_status()907            elif isinstance(response, FlaskResponse):908                if response.status_code >= 400:909                    raise Exception(910                        "Error response (code %s): %s" % (response.status_code, response.data)911                    )912        except Exception as exc:913            LOG.info(914                "Unable to run Lambda function on SNS message: %s %s", exc, traceback.format_exc()915            )916            store_delivery_log(subscriber, False, message, message_id)917            message_body = create_sns_message_body(918                subscriber, req_data, message_id, message_attributes919            )920            sns_error_to_dead_letter_queue(subscriber, message_body, str(exc))921        return922    elif subscriber["Protocol"] in ["http", "https"]:923        msg_type = (req_data.get("Type") or ["Notification"])[0]924        try:925            message_body = create_sns_message_body(926                subscriber, req_data, message_id, message_attributes927            )928        except Exception:929            return930        try:931            message_headers = {932                "Content-Type": "text/plain",933                # AWS headers according to934                # https://docs.aws.amazon.com/sns/latest/dg/sns-message-and-json-formats.html#http-header935                "x-amz-sns-message-type": msg_type,936                "x-amz-sns-message-id": message_id,937                "x-amz-sns-topic-arn": subscriber["TopicArn"],938                "User-Agent": "Amazon Simple Notification Service Agent",939            }940            if msg_type != "SubscriptionConfirmation":941                # while testing, never had those from AWS but the docs above states it should be there942                message_headers["x-amz-sns-subscription-arn"] = subscriber["SubscriptionArn"]943            # When raw message delivery is enabled, x-amz-sns-rawdelivery needs to be set to 'true'944            # indicating that the message has been published without JSON formatting.945            # https://docs.aws.amazon.com/sns/latest/dg/sns-large-payload-raw-message-delivery.html946            elif msg_type == "Notification" and is_raw_message_delivery(subscriber):947                message_headers["x-amz-sns-rawdelivery"] = "true"948            response = requests.post(949                subscriber["Endpoint"],950                headers=message_headers,951                data=message_body,952                verify=False,953            )954            delivery = {955                "statusCode": response.status_code,956                "providerResponse": response.content.decode("utf-8"),957            }958            store_delivery_log(subscriber, True, message, message_id, delivery)959            response.raise_for_status()960        except Exception as exc:961            LOG.info(962                "Received error on sending SNS message, putting to DLQ (if configured): %s", exc963            )964            store_delivery_log(subscriber, False, message, message_id)965            # AWS doesnt send to the DLQ if there's an error trying to deliver a UnsubscribeConfirmation msg966            if msg_type != "UnsubscribeConfirmation":967                sns_error_to_dead_letter_queue(subscriber, message_body, str(exc))968        return969    elif subscriber["Protocol"] == "application":970        try:971            sns_client = aws_stack.connect_to_service("sns")972            sns_client.publish(TargetArn=subscriber["Endpoint"], Message=message)973            store_delivery_log(subscriber, True, message, message_id)974        except Exception as exc:975            LOG.warning(976                "Unable to forward SNS message to SNS platform app: %s %s",977                exc,978                traceback.format_exc(),979            )980            store_delivery_log(subscriber, False, message, message_id)981            message_body = create_sns_message_body(subscriber, req_data, message_id)982            sns_error_to_dead_letter_queue(subscriber, message_body, str(exc))983        return984    elif subscriber["Protocol"] in ["email", "email-json"]:985        ses_client = aws_stack.connect_to_service("ses")986        if subscriber.get("Endpoint"):987            ses_client.verify_email_address(EmailAddress=subscriber.get("Endpoint"))988            ses_client.verify_email_address(EmailAddress="admin@localstack.com")989            ses_client.send_email(990                Source="admin@localstack.com",991                Message={992                    "Body": {993                        "Text": {994                            "Data": create_sns_message_body(995                                subscriber=subscriber, req_data=req_data, message_id=message_id996                            )997                            if subscriber["Protocol"] == "email-json"998                            else message999                        }1000                    },1001                    "Subject": {"Data": "SNS-Subscriber-Endpoint"},1002                },1003                Destination={"ToAddresses": [subscriber.get("Endpoint")]},1004            )1005            store_delivery_log(subscriber, True, message, message_id)1006    else:1007        LOG.warning('Unexpected protocol "%s" for SNS subscription', subscriber["Protocol"])1008def get_subscription_by_arn(sub_arn):1009    sns_backend = SNSBackend.get()1010    # TODO maintain separate map instead of traversing all items1011    for key, subscriptions in sns_backend.sns_subscriptions.items():1012        for sub in subscriptions:1013            if sub["SubscriptionArn"] == sub_arn:1014                return sub1015def create_sns_message_body(1016    subscriber, req_data, message_id=None, message_attributes: MessageAttributeMap = None1017) -> str:1018    message = req_data["Message"][0]1019    message_type = req_data.get("Type", ["Notification"])[0]1020    protocol = subscriber["Protocol"]1021    if req_data.get("MessageStructure") == ["json"]:1022        message = json.loads(message)1023        try:1024            message = message.get(protocol, message["default"])1025        except KeyError:1026            raise Exception("Unable to find 'default' key in message payload")1027    if message_type == "Notification" and is_raw_message_delivery(subscriber):1028        return message1029    external_url = external_service_url("sns")1030    data = {1031        "Type": message_type,1032        "MessageId": message_id,1033        "TopicArn": subscriber["TopicArn"],1034        "Message": message,1035        "Timestamp": timestamp_millis(),1036        "SignatureVersion": "1",1037        # TODO Add a more sophisticated solution with an actual signature1038        # Hardcoded1039        "Signature": "EXAMPLEpH+..",1040        "SigningCertURL": "https://sns.us-east-1.amazonaws.com/SimpleNotificationService-0000000000000000000000.pem",1041    }1042    if message_type == "Notification":1043        unsubscribe_url = create_unsubscribe_url(external_url, subscriber["SubscriptionArn"])1044        data["UnsubscribeURL"] = unsubscribe_url1045    for key in ["Subject", "SubscribeURL", "Token"]:1046        if req_data.get(key) and req_data[key][0]:1047            data[key] = req_data[key][0]1048    if message_attributes:1049        data["MessageAttributes"] = prepare_message_attributes(message_attributes)1050    return json.dumps(data)1051def _get_tags(topic_arn):1052    sns_backend = SNSBackend.get()1053    if topic_arn not in sns_backend.sns_tags:1054        sns_backend.sns_tags[topic_arn] = []1055    return sns_backend.sns_tags[topic_arn]1056def is_raw_message_delivery(susbcriber):1057    return susbcriber.get("RawMessageDelivery") in ("true", True, "True")1058def create_sqs_message_attributes(subscriber, attributes):1059    message_attributes = {}1060    if not is_raw_message_delivery(subscriber):1061        return message_attributes1062    for key, value in attributes.items():1063        # TODO: check if naming differs between ASF and QueryParameters, if not remove .get("Type") and .get("Value")1064        if value.get("Type") or value.get("DataType"):1065            tpe = value.get("Type") or value.get("DataType")1066            attribute = {"DataType": tpe}1067            if tpe == "Binary":1068                val = value.get("BinaryValue") or value.get("Value")1069                attribute["BinaryValue"] = base64.b64decode(to_bytes(val))1070                # base64 decoding might already have happened, in which decode fails.1071                # If decode fails, fallback to whatever is in there.1072                if not attribute["BinaryValue"]:1073                    attribute["BinaryValue"] = val1074            else:1075                val = value.get("StringValue") or value.get("Value", "")1076                attribute["StringValue"] = str(val)1077            message_attributes[key] = attribute1078    return message_attributes1079def prepare_message_attributes(message_attributes: MessageAttributeMap):1080    attributes = {}1081    if not message_attributes:1082        return attributes1083    # todo: Number type is not supported for Lambda subscriptions, passed as String1084    #  do conversion here1085    for attr_name, attr in message_attributes.items():1086        if attr.get("StringValue", None):1087            val = attr["StringValue"]1088        else:1089            # binary payload in base64 encoded by AWS, UTF-8 for JSON1090            # https://docs.aws.amazon.com/sns/latest/api/API_MessageAttributeValue.html1091            val = base64.b64encode(attr["BinaryValue"]).decode()1092        attributes[attr_name] = {1093            "Type": attr["DataType"],1094            "Value": val,1095        }1096    return attributes1097def create_subscribe_url(external_url, topic_arn, subscription_token):1098    return f"{external_url}/?Action=ConfirmSubscription&TopicArn={topic_arn}&Token={subscription_token}"1099def create_unsubscribe_url(external_url, subscription_arn):1100    return f"{external_url}/?Action=Unsubscribe&SubscriptionArn={subscription_arn}"1101def is_number(x):1102    try:1103        float(x)1104        return True1105    except ValueError:1106        return False1107def evaluate_numeric_condition(conditions, value):1108    if not is_number(value):1109        return False1110    for i in range(0, len(conditions), 2):1111        value = float(value)1112        operator = conditions[i]1113        operand = float(conditions[i + 1])...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
