How to use create_unsubscribe_url method in localstack

Best Python code snippet using localstack_python

provider.py

Source:provider.py Github

copy

Full Screen

...680 )681 # Send out confirmation message for HTTP(S), fix for https://github.com/localstack/localstack/issues/881682 if protocol in ["http", "https"]:683 external_url = external_service_url("sns")684 subscription["UnsubscribeURL"] = create_unsubscribe_url(external_url, subscription_arn)685 confirmation = {686 "Type": ["SubscriptionConfirmation"],687 "Token": [subscription_token],688 "Message": [689 f"You have chosen to subscribe to the topic {topic_arn}.\n"690 + "To confirm the subscription, visit the SubscribeURL included in this message."691 ],692 "SubscribeURL": [create_subscribe_url(external_url, topic_arn, subscription_token)],693 }694 publish_message(topic_arn, confirmation, {}, subscription_arn, skip_checks=True)695 elif protocol in ["sqs", "lambda"]:696 # Auto-confirm sqs and lambda subscriptions for now697 # TODO: revisit for multi-account698 self.confirm_subscription(context, topic_arn, subscription_token)699 return SubscribeResponse(SubscriptionArn=subscription_arn)700 def tag_resource(701 self, context: RequestContext, resource_arn: AmazonResourceName, tags: TagList702 ) -> TagResourceResponse:703 # TODO: can this be used to tag any resource when using AWS?704 # each tag key must be unique705 # https://docs.aws.amazon.com/general/latest/gr/aws_tagging.html#tag-best-practices706 unique_tag_keys = {tag["Key"] for tag in tags}707 if len(unique_tag_keys) < len(tags):708 raise InvalidParameterException("Invalid parameter: Duplicated keys are not allowed.")709 call_moto(context)710 sns_backend = SNSBackend.get()711 existing_tags = sns_backend.sns_tags.get(resource_arn, [])712 def existing_tag_index(item):713 for idx, tag in enumerate(existing_tags):714 if item["Key"] == tag["Key"]:715 return idx716 return None717 for item in tags:718 existing_index = existing_tag_index(item)719 if existing_index is None:720 existing_tags.append(item)721 else:722 existing_tags[existing_index] = item723 sns_backend.sns_tags[resource_arn] = existing_tags724 return TagResourceResponse()725 def delete_topic(self, context: RequestContext, topic_arn: topicARN) -> None:726 call_moto(context)727 sns_backend = SNSBackend.get()728 sns_backend.sns_subscriptions.pop(topic_arn, None)729 sns_backend.sns_tags.pop(topic_arn, None)730 event_publisher.fire_event(731 event_publisher.EVENT_SNS_DELETE_TOPIC,732 payload={"t": event_publisher.get_hash(topic_arn)},733 )734 def create_topic(735 self,736 context: RequestContext,737 name: topicName,738 attributes: TopicAttributesMap = None,739 tags: TagList = None,740 ) -> CreateTopicResponse:741 moto_response = call_moto(context)742 sns_backend = SNSBackend.get()743 topic_arn = moto_response["TopicArn"]744 tag_resource_success = extract_tags(topic_arn, tags, True, sns_backend)745 if not tag_resource_success:746 raise InvalidParameterException(747 "Invalid parameter: Tags Reason: Topic already exists with different tags"748 )749 if tags:750 self.tag_resource(context=context, resource_arn=topic_arn, tags=tags)751 sns_backend.sns_subscriptions[topic_arn] = (752 sns_backend.sns_subscriptions.get(topic_arn) or []753 )754 # publish event755 event_publisher.fire_event(756 event_publisher.EVENT_SNS_CREATE_TOPIC,757 payload={"t": event_publisher.get_hash(topic_arn)},758 )759 return CreateTopicResponse(TopicArn=topic_arn)760def message_to_subscribers(761 message_id,762 message,763 topic_arn,764 req_data,765 headers,766 subscription_arn=None,767 skip_checks=False,768 message_attributes=None,769):770 # AWS allows using TargetArn to publish to a topic, for backward compatibility771 if not topic_arn:772 topic_arn = req_data.get("TargetArn")773 sns_backend = SNSBackend.get()774 subscriptions = sns_backend.sns_subscriptions.get(topic_arn, [])775 async def wait_for_messages_sent():776 subs = [777 message_to_subscriber(778 message_id,779 message,780 topic_arn,781 req_data,782 headers,783 subscription_arn,784 skip_checks,785 sns_backend,786 subscriber,787 subscriptions,788 message_attributes,789 )790 for subscriber in list(subscriptions)791 ]792 if subs:793 await asyncio.wait(subs)794 asyncio.run(wait_for_messages_sent())795async def message_to_subscriber(796 message_id,797 message,798 topic_arn,799 req_data,800 headers,801 subscription_arn,802 skip_checks,803 sns_backend,804 subscriber,805 subscriptions,806 message_attributes,807):808 if subscription_arn not in [None, subscriber["SubscriptionArn"]]:809 return810 filter_policy = json.loads(subscriber.get("FilterPolicy") or "{}")811 if not skip_checks and not check_filter_policy(filter_policy, message_attributes):812 LOG.info(813 "SNS filter policy %s does not match attributes %s", filter_policy, message_attributes814 )815 return816 # todo: Message attributes are sent only when the message structure is String, not JSON.817 if subscriber["Protocol"] == "sms":818 event = {819 "topic_arn": topic_arn,820 "endpoint": subscriber["Endpoint"],821 "message_content": req_data["Message"][0],822 }823 sns_backend.sms_messages.append(event)824 LOG.info(825 "Delivering SMS message to %s: %s",826 subscriber["Endpoint"],827 req_data["Message"][0],828 )829 # MOCK DATA830 delivery = {831 "phoneCarrier": "Mock Carrier",832 "mnc": 270,833 "priceInUSD": 0.00645,834 "smsType": "Transactional",835 "mcc": 310,836 "providerResponse": "Message has been accepted by phone carrier",837 "dwellTimeMsUntilDeviceAck": 200,838 }839 store_delivery_log(subscriber, True, message, message_id, delivery)840 return841 elif subscriber["Protocol"] == "sqs":842 queue_url = None843 message_body = create_sns_message_body(subscriber, req_data, message_id, message_attributes)844 try:845 endpoint = subscriber["Endpoint"]846 if "sqs_queue_url" in subscriber:847 queue_url = subscriber.get("sqs_queue_url")848 elif "://" in endpoint:849 queue_url = endpoint850 else:851 queue_url = aws_stack.get_sqs_queue_url(endpoint)852 subscriber["sqs_queue_url"] = queue_url853 message_group_id = req_data.get("MessageGroupId", [""])[0]854 message_deduplication_id = req_data.get("MessageDeduplicationId", [""])[0]855 sqs_client = aws_stack.connect_to_service("sqs")856 kwargs = {}857 if message_group_id:858 kwargs["MessageGroupId"] = message_group_id859 if message_deduplication_id:860 kwargs["MessageDeduplicationId"] = message_deduplication_id861 sqs_client.send_message(862 QueueUrl=queue_url,863 MessageBody=message_body,864 MessageAttributes=create_sqs_message_attributes(subscriber, message_attributes),865 MessageSystemAttributes=create_sqs_system_attributes(headers),866 **kwargs,867 )868 store_delivery_log(subscriber, True, message, message_id)869 except Exception as exc:870 LOG.info("Unable to forward SNS message to SQS: %s %s", exc, traceback.format_exc())871 store_delivery_log(subscriber, False, message, message_id)872 sns_error_to_dead_letter_queue(subscriber, message_body, str(exc))873 if "NonExistentQueue" in str(exc):874 LOG.info(875 'Removing non-existent queue "%s" subscribed to topic "%s"',876 queue_url,877 topic_arn,878 )879 subscriptions.remove(subscriber)880 return881 elif subscriber["Protocol"] == "lambda":882 try:883 external_url = external_service_url("sns")884 unsubscribe_url = create_unsubscribe_url(external_url, subscriber["SubscriptionArn"])885 response = lambda_api.process_sns_notification(886 subscriber["Endpoint"],887 topic_arn,888 subscriber["SubscriptionArn"],889 message,890 message_id,891 # see the format here892 # https://docs.aws.amazon.com/lambda/latest/dg/with-sns.html893 # issue with sdk to serialize the attribute inside lambda894 prepare_message_attributes(message_attributes),895 unsubscribe_url,896 subject=req_data.get("Subject")[0],897 )898 if response is not None:899 delivery = {900 "statusCode": response.status_code,901 "providerResponse": response.get_data(),902 }903 store_delivery_log(subscriber, True, message, message_id, delivery)904 # TODO: Check if it can be removed905 if isinstance(response, Response):906 response.raise_for_status()907 elif isinstance(response, FlaskResponse):908 if response.status_code >= 400:909 raise Exception(910 "Error response (code %s): %s" % (response.status_code, response.data)911 )912 except Exception as exc:913 LOG.info(914 "Unable to run Lambda function on SNS message: %s %s", exc, traceback.format_exc()915 )916 store_delivery_log(subscriber, False, message, message_id)917 message_body = create_sns_message_body(918 subscriber, req_data, message_id, message_attributes919 )920 sns_error_to_dead_letter_queue(subscriber, message_body, str(exc))921 return922 elif subscriber["Protocol"] in ["http", "https"]:923 msg_type = (req_data.get("Type") or ["Notification"])[0]924 try:925 message_body = create_sns_message_body(926 subscriber, req_data, message_id, message_attributes927 )928 except Exception:929 return930 try:931 message_headers = {932 "Content-Type": "text/plain",933 # AWS headers according to934 # https://docs.aws.amazon.com/sns/latest/dg/sns-message-and-json-formats.html#http-header935 "x-amz-sns-message-type": msg_type,936 "x-amz-sns-message-id": message_id,937 "x-amz-sns-topic-arn": subscriber["TopicArn"],938 "User-Agent": "Amazon Simple Notification Service Agent",939 }940 if msg_type != "SubscriptionConfirmation":941 # while testing, never had those from AWS but the docs above states it should be there942 message_headers["x-amz-sns-subscription-arn"] = subscriber["SubscriptionArn"]943 # When raw message delivery is enabled, x-amz-sns-rawdelivery needs to be set to 'true'944 # indicating that the message has been published without JSON formatting.945 # https://docs.aws.amazon.com/sns/latest/dg/sns-large-payload-raw-message-delivery.html946 elif msg_type == "Notification" and is_raw_message_delivery(subscriber):947 message_headers["x-amz-sns-rawdelivery"] = "true"948 response = requests.post(949 subscriber["Endpoint"],950 headers=message_headers,951 data=message_body,952 verify=False,953 )954 delivery = {955 "statusCode": response.status_code,956 "providerResponse": response.content.decode("utf-8"),957 }958 store_delivery_log(subscriber, True, message, message_id, delivery)959 response.raise_for_status()960 except Exception as exc:961 LOG.info(962 "Received error on sending SNS message, putting to DLQ (if configured): %s", exc963 )964 store_delivery_log(subscriber, False, message, message_id)965 # AWS doesnt send to the DLQ if there's an error trying to deliver a UnsubscribeConfirmation msg966 if msg_type != "UnsubscribeConfirmation":967 sns_error_to_dead_letter_queue(subscriber, message_body, str(exc))968 return969 elif subscriber["Protocol"] == "application":970 try:971 sns_client = aws_stack.connect_to_service("sns")972 sns_client.publish(TargetArn=subscriber["Endpoint"], Message=message)973 store_delivery_log(subscriber, True, message, message_id)974 except Exception as exc:975 LOG.warning(976 "Unable to forward SNS message to SNS platform app: %s %s",977 exc,978 traceback.format_exc(),979 )980 store_delivery_log(subscriber, False, message, message_id)981 message_body = create_sns_message_body(subscriber, req_data, message_id)982 sns_error_to_dead_letter_queue(subscriber, message_body, str(exc))983 return984 elif subscriber["Protocol"] in ["email", "email-json"]:985 ses_client = aws_stack.connect_to_service("ses")986 if subscriber.get("Endpoint"):987 ses_client.verify_email_address(EmailAddress=subscriber.get("Endpoint"))988 ses_client.verify_email_address(EmailAddress="admin@localstack.com")989 ses_client.send_email(990 Source="admin@localstack.com",991 Message={992 "Body": {993 "Text": {994 "Data": create_sns_message_body(995 subscriber=subscriber, req_data=req_data, message_id=message_id996 )997 if subscriber["Protocol"] == "email-json"998 else message999 }1000 },1001 "Subject": {"Data": "SNS-Subscriber-Endpoint"},1002 },1003 Destination={"ToAddresses": [subscriber.get("Endpoint")]},1004 )1005 store_delivery_log(subscriber, True, message, message_id)1006 else:1007 LOG.warning('Unexpected protocol "%s" for SNS subscription', subscriber["Protocol"])1008def get_subscription_by_arn(sub_arn):1009 sns_backend = SNSBackend.get()1010 # TODO maintain separate map instead of traversing all items1011 for key, subscriptions in sns_backend.sns_subscriptions.items():1012 for sub in subscriptions:1013 if sub["SubscriptionArn"] == sub_arn:1014 return sub1015def create_sns_message_body(1016 subscriber, req_data, message_id=None, message_attributes: MessageAttributeMap = None1017) -> str:1018 message = req_data["Message"][0]1019 message_type = req_data.get("Type", ["Notification"])[0]1020 protocol = subscriber["Protocol"]1021 if req_data.get("MessageStructure") == ["json"]:1022 message = json.loads(message)1023 try:1024 message = message.get(protocol, message["default"])1025 except KeyError:1026 raise Exception("Unable to find 'default' key in message payload")1027 if message_type == "Notification" and is_raw_message_delivery(subscriber):1028 return message1029 external_url = external_service_url("sns")1030 data = {1031 "Type": message_type,1032 "MessageId": message_id,1033 "TopicArn": subscriber["TopicArn"],1034 "Message": message,1035 "Timestamp": timestamp_millis(),1036 "SignatureVersion": "1",1037 # TODO Add a more sophisticated solution with an actual signature1038 # Hardcoded1039 "Signature": "EXAMPLEpH+..",1040 "SigningCertURL": "https://sns.us-east-1.amazonaws.com/SimpleNotificationService-0000000000000000000000.pem",1041 }1042 if message_type == "Notification":1043 unsubscribe_url = create_unsubscribe_url(external_url, subscriber["SubscriptionArn"])1044 data["UnsubscribeURL"] = unsubscribe_url1045 for key in ["Subject", "SubscribeURL", "Token"]:1046 if req_data.get(key) and req_data[key][0]:1047 data[key] = req_data[key][0]1048 if message_attributes:1049 data["MessageAttributes"] = prepare_message_attributes(message_attributes)1050 return json.dumps(data)1051def _get_tags(topic_arn):1052 sns_backend = SNSBackend.get()1053 if topic_arn not in sns_backend.sns_tags:1054 sns_backend.sns_tags[topic_arn] = []1055 return sns_backend.sns_tags[topic_arn]1056def is_raw_message_delivery(susbcriber):1057 return susbcriber.get("RawMessageDelivery") in ("true", True, "True")1058def create_sqs_message_attributes(subscriber, attributes):1059 message_attributes = {}1060 if not is_raw_message_delivery(subscriber):1061 return message_attributes1062 for key, value in attributes.items():1063 # TODO: check if naming differs between ASF and QueryParameters, if not remove .get("Type") and .get("Value")1064 if value.get("Type") or value.get("DataType"):1065 tpe = value.get("Type") or value.get("DataType")1066 attribute = {"DataType": tpe}1067 if tpe == "Binary":1068 val = value.get("BinaryValue") or value.get("Value")1069 attribute["BinaryValue"] = base64.b64decode(to_bytes(val))1070 # base64 decoding might already have happened, in which decode fails.1071 # If decode fails, fallback to whatever is in there.1072 if not attribute["BinaryValue"]:1073 attribute["BinaryValue"] = val1074 else:1075 val = value.get("StringValue") or value.get("Value", "")1076 attribute["StringValue"] = str(val)1077 message_attributes[key] = attribute1078 return message_attributes1079def prepare_message_attributes(message_attributes: MessageAttributeMap):1080 attributes = {}1081 if not message_attributes:1082 return attributes1083 # todo: Number type is not supported for Lambda subscriptions, passed as String1084 # do conversion here1085 for attr_name, attr in message_attributes.items():1086 if attr.get("StringValue", None):1087 val = attr["StringValue"]1088 else:1089 # binary payload in base64 encoded by AWS, UTF-8 for JSON1090 # https://docs.aws.amazon.com/sns/latest/api/API_MessageAttributeValue.html1091 val = base64.b64encode(attr["BinaryValue"]).decode()1092 attributes[attr_name] = {1093 "Type": attr["DataType"],1094 "Value": val,1095 }1096 return attributes1097def create_subscribe_url(external_url, topic_arn, subscription_token):1098 return f"{external_url}/?Action=ConfirmSubscription&TopicArn={topic_arn}&Token={subscription_token}"1099def create_unsubscribe_url(external_url, subscription_arn):1100 return f"{external_url}/?Action=Unsubscribe&SubscriptionArn={subscription_arn}"1101def is_number(x):1102 try:1103 float(x)1104 return True1105 except ValueError:1106 return False1107def evaluate_numeric_condition(conditions, value):1108 if not is_number(value):1109 return False1110 for i in range(0, len(conditions), 2):1111 value = float(value)1112 operator = conditions[i]1113 operand = float(conditions[i + 1])...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful