How to use the prepare_message_attributes method in localstack

Best Python code snippet using localstack_python

sns_listener.py

Source: sns_listener.py on GitHub

copy

Full Screen

# NOTE: this excerpt starts mid-function. The tail of the preceding (truncated)
# function is preserved below as a comment because it cannot stand on its own;
# its indentation is reconstructed, the statements are verbatim.
#
#             data["Subject"] = [message.get("Subject")]
#         if ".fifo" in topic_arn:
#             data["MessageGroupId"] = message.get("MessageGroupId")
#         # TODO: add MessageDeduplication checks once ASF-SQS implementation becomes default
#         message_attributes = prepare_message_attributes(message.get("MessageAttributes", []))
#         try:
#             message_to_subscribers(
#                 message_id,
#                 message["Message"],
#                 topic_arn,
#                 data,
#                 headers,
#                 message_attributes=message_attributes,
#             )
#             response["Successful"].append({"Id": message["Id"], "MessageId": message_id})
#         except Exception:
#             response["Failed"].append({"Id": message["Id"]})
#     return response


def do_delete_topic(topic_arn):
    """Drop the subscriptions and tags stored for ``topic_arn`` (no-op if absent)."""
    sns_backend = SNSBackend.get()
    sns_backend.sns_subscriptions.pop(topic_arn, None)
    sns_backend.sns_tags.pop(topic_arn, None)


def do_confirm_subscription(topic_arn, token):
    """Mark every subscription status matching ``topic_arn`` and ``token`` as subscribed."""
    sns_backend = SNSBackend.get()
    for status in sns_backend.subscription_status.values():
        if status["Token"] == token and status["TopicArn"] == topic_arn:
            status["Status"] = "Subscribed"


def do_subscribe(topic_arn, endpoint, protocol, subscription_arn, attributes, filter_policy=None):
    """Register ``endpoint`` as a subscriber of ``topic_arn``.

    The subscription record is appended to the backend's per-topic list and a
    status entry (``Not Subscribed`` plus a confirmation token) is stored under
    ``subscription_arn``. For ``http``/``https`` protocols a
    ``SubscriptionConfirmation`` message is published to the new subscription.

    Returns ``None``; calling it again for an endpoint that is already
    subscribed to the topic does nothing.
    """
    sns_backend = SNSBackend.get()
    topic_subs = sns_backend.sns_subscriptions[topic_arn] = (
        sns_backend.sns_subscriptions.get(topic_arn) or []
    )
    # An endpoint may only be subscribed to a topic once. Subsequent
    # subscribe calls do nothing (subscribe is idempotent).
    for existing_topic_subscription in topic_subs:
        if existing_topic_subscription.get("Endpoint") == endpoint:
            return

    subscription = {
        # http://docs.aws.amazon.com/cli/latest/reference/sns/get-subscription-attributes.html
        "TopicArn": topic_arn,
        "Endpoint": endpoint,
        "Protocol": protocol,
        "SubscriptionArn": subscription_arn,
        "FilterPolicy": filter_policy,
    }
    subscription.update(attributes)
    topic_subs.append(subscription)

    # Generate the token exactly once: do_confirm_subscription() compares the
    # token it receives with the one stored here, so the token advertised in
    # the confirmation message below must be the stored one. (Previously a
    # second, unrelated token was sent out and could never be confirmed.)
    token = short_uid()
    status = sns_backend.subscription_status.setdefault(subscription_arn, {})
    status.update({"TopicArn": topic_arn, "Token": token, "Status": "Not Subscribed"})

    # Send out confirmation message for HTTP(S), fix for https://github.com/localstack/localstack/issues/881
    if protocol in ["http", "https"]:
        external_url = external_service_url("sns")
        subscription["UnsubscribeURL"] = "%s/?Action=Unsubscribe&SubscriptionArn=%s" % (
            external_url,
            subscription_arn,
        )
        confirmation = {
            "Type": ["SubscriptionConfirmation"],
            "Token": [token],
            "Message": [
                ("You have chosen to subscribe to the topic %s.\n" % topic_arn)
                + "To confirm the subscription, visit the SubscribeURL included in this message."
            ],
            "SubscribeURL": [
                "%s/?Action=ConfirmSubscription&TopicArn=%s&Token=%s"
                % (external_url, topic_arn, token)
            ],
        }
        publish_message(topic_arn, confirmation, {}, subscription_arn, skip_checks=True)


def do_unsubscribe(subscription_arn):
    """Remove the subscription with ``subscription_arn`` from every topic's list."""
    sns_backend = SNSBackend.get()
    # Only existing keys are reassigned, so iterating the dict meanwhile is safe.
    for topic_arn in sns_backend.sns_subscriptions:
        sns_backend.sns_subscriptions[topic_arn] = [
            sub
            for sub in sns_backend.sns_subscriptions[topic_arn]
            if sub["SubscriptionArn"] != subscription_arn
        ]


def _get_tags(topic_arn):
    """Return the tag list stored for ``topic_arn``, creating an empty one if missing."""
    sns_backend = SNSBackend.get()
    if topic_arn not in sns_backend.sns_tags:
        sns_backend.sns_tags[topic_arn] = []
    return sns_backend.sns_tags[topic_arn]


def do_list_tags_for_resource(topic_arn):
    """Return the tags stored for ``topic_arn``."""
    return _get_tags(topic_arn)


def do_tag_resource(topic_arn, tags):
    """Add ``tags`` to ``topic_arn``; a tag whose ``Key`` already exists is replaced."""
    sns_backend = SNSBackend.get()
    existing_tags = sns_backend.sns_tags.get(topic_arn, [])
    # Drop exact duplicates within the request, keeping the first occurrence
    # (tags are dicts and thus unhashable, so a set cannot be used here).
    tags = [tag for idx, tag in enumerate(tags) if tag not in tags[:idx]]

    def existing_tag_index(item):
        for idx, tag in enumerate(existing_tags):
            if item["Key"] == tag["Key"]:
                return idx
        return None

    for item in tags:
        existing_index = existing_tag_index(item)
        if existing_index is None:
            existing_tags.append(item)
        else:
            existing_tags[existing_index] = item
    sns_backend.sns_tags[topic_arn] = existing_tags


def do_untag_resource(topic_arn, tag_keys):
    """Remove every tag of ``topic_arn`` whose ``Key`` is listed in ``tag_keys``."""
    sns_backend = SNSBackend.get()
    sns_backend.sns_tags[topic_arn] = [t for t in _get_tags(topic_arn) if t["Key"] not in tag_keys]


# ---------------
# HELPER METHODS
# ---------------


def get_subscription_by_arn(sub_arn):
    """Return the subscription dict with ARN ``sub_arn``, or ``None`` if there is none."""
    sns_backend = SNSBackend.get()
    # TODO maintain separate map instead of traversing all items
    for subscriptions in sns_backend.sns_subscriptions.values():
        for sub in subscriptions:
            if sub["SubscriptionArn"] == sub_arn:
                return sub
    return None


def make_response(op_name, content="", message_id=None):
    """Build a 200 ``Response`` wrapping ``content`` in the ``<op_name>Response`` XML envelope.

    When ``content`` is empty, a ``<MessageId>`` element is used instead, with
    ``message_id`` or a freshly generated UUID.
    """
    response = Response()
    if not content:
        message_id = message_id or str(uuid.uuid4())
        content = "<MessageId>%s</MessageId>" % message_id
    response._content = """<{op_name}Response xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
        <{op_name}Result>
            {content}
        </{op_name}Result>
        <ResponseMetadata><RequestId>{req_id}</RequestId></ResponseMetadata>
        </{op_name}Response>""".format(
        op_name=op_name, content=content, req_id=short_uid()
    )
    response.status_code = 200
    return response


# TODO move to utils!
def make_error(message, code=400, code_string="InvalidParameter"):
    """Build an ``ErrorResponse`` XML ``Response`` with HTTP status ``code``."""
    response = Response()
    response._content = """<ErrorResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/"><Error>
        <Type>Sender</Type>
        <Code>{code_string}</Code>
        <Message>{message}</Message>
        </Error><RequestId>{req_id}</RequestId>
        </ErrorResponse>""".format(
        message=message, code_string=code_string, req_id=short_uid()
    )
    response.status_code = code
    return response


def create_sns_message_body(subscriber, req_data, message_id=None):
    """Build the message body to deliver to ``subscriber`` for the request ``req_data``.

    With ``MessageStructure == ["json"]`` the message is parsed as JSON and the
    entry for the subscriber's protocol is used, falling back to ``"default"``
    (which must be present). If raw message delivery is enabled for the
    subscriber, the bare message is returned; otherwise a JSON-encoded
    notification envelope is returned.

    Raises:
        Exception: if a JSON-structured message has no ``"default"`` key.
    """
    message = req_data["Message"][0]
    protocol = subscriber["Protocol"]

    if req_data.get("MessageStructure") == ["json"]:
        message = json.loads(message)
        try:
            # message["default"] is evaluated eagerly, so the "default" key is
            # required even when a protocol-specific entry exists.
            message = message.get(protocol, message["default"])
        except KeyError:
            raise Exception("Unable to find 'default' key in message payload")

    if is_raw_message_delivery(subscriber):
        return message

    data = {
        "Type": req_data.get("Type", ["Notification"])[0],
        "MessageId": message_id,
        "TopicArn": subscriber["TopicArn"],
        "Message": message,
        "Timestamp": timestamp_millis(),
        "SignatureVersion": "1",
        # TODO Add a more sophisticated solution with an actual signature
        # Hardcoded
        "Signature": "EXAMPLEpH+..",
        "SigningCertURL": "https://sns.us-east-1.amazonaws.com/SimpleNotificationService-0000000000000000000000.pem",
    }

    for key in ["Subject", "SubscribeURL", "Token"]:
        if req_data.get(key):
            data[key] = req_data[key][0]

    for key in HTTP_SUBSCRIPTION_ATTRIBUTES:
        if key in subscriber:
            data[key] = subscriber[key]

    attributes = get_message_attributes(req_data)
    if attributes:
        data["MessageAttributes"] = attributes

    return json.dumps(data)


def create_sqs_message_attributes(subscriber, attributes):
    """Convert SNS-style ``attributes`` into SQS message attributes.

    Returns an empty dict unless raw message delivery is enabled for
    ``subscriber``. Attributes without a ``Type`` are skipped; ``Binary``
    values are base64-decoded, everything else is stringified.
    """
    if not is_raw_message_delivery(subscriber):
        return {}

    message_attributes = {}
    for key, value in attributes.items():
        if value.get("Type"):
            attribute = {"DataType": value["Type"]}
            if value["Type"] == "Binary":
                attribute["BinaryValue"] = base64.decodebytes(to_bytes(value["Value"]))
            else:
                attribute["StringValue"] = str(value.get("Value", ""))
            message_attributes[key] = attribute

    return message_attributes


def prepare_message_attributes(message_attributes):
    """Flatten a list of message attribute entries into a dict keyed by name.

    Args:
        message_attributes: iterable of entries shaped like
            ``{"Name": ..., "Value": {"DataType": ..., "StringValue"/"BinaryValue": ...}}``.

    Returns:
        ``{name: {"Type": <DataType>, "Value": <string or binary value>}}``.

    Raises:
        KeyError: if an entry lacks ``Name``, ``Value`` or ``DataType``, or
            provides neither ``StringValue`` nor ``BinaryValue``.
    """
    attributes = {}
    for attr in message_attributes:
        value = attr["Value"]
        text = value.get("StringValue")
        if text:
            payload = text
        elif text is not None and "BinaryValue" not in value:
            # An empty StringValue with no binary alternative used to crash
            # with KeyError('BinaryValue'); pass the empty string through.
            payload = text
        else:
            payload = value["BinaryValue"]
        attributes[attr["Name"]] = {"Type": value["DataType"], "Value": payload}
    return attributes


def get_message_attributes(req_data):
    """Extract the ``MessageAttributes.entry`` items of ``req_data`` as a name-keyed dict."""
    extracted_msg_attrs = parse_urlencoded_data(req_data, "MessageAttributes.entry")
    return prepare_message_attributes(extracted_msg_attrs)


def get_subscribe_attributes(req_data):
    """Collect the ``*.key`` / ``*.value`` pairs of ``req_data`` into an attribute dict.

    ``PendingConfirmation`` defaults to ``"false"`` when the request does not set it.
    """
    attributes = {}
    for key in req_data:
        if ".key" in key:
            attributes[req_data[key][0]] = req_data[key.replace("key", "value")][0]

    defaults = {
        # TODO: this is required to get TF "aws_sns_topic_subscription" working, but this should
        # be revisited (e.g., cross-account subscriptions should not be confirmed automatically)
        "PendingConfirmation": "false"
    }
    for key, value in defaults.items():
        attributes.setdefault(key, value)

    return attributes


# NOTE: the listing is truncated here; the next definition is cut off right
# after its signature and is therefore preserved only as a comment:
#
# def is_number(x):...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. From setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios, LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful