How to use store_delivery_log method in localstack

Best Python code snippet using localstack_python

sns_listener.py

Source:sns_listener.py Github

copy

Full Screen

...446 "mcc": 310,447 "providerResponse": "Message has been accepted by phone carrier",448 "dwellTimeMsUntilDeviceAck": 200,449 }450 store_delivery_log(subscriber, True, message, message_id, delivery)451 return452 elif subscriber["Protocol"] == "sqs":453 queue_url = None454 try:455 endpoint = subscriber["Endpoint"]456 if "sqs_queue_url" in subscriber:457 queue_url = subscriber.get("sqs_queue_url")458 elif "://" in endpoint:459 queue_url = endpoint460 else:461 queue_name = endpoint.split(":")[5]462 queue_url = aws_stack.get_sqs_queue_url(queue_name)463 subscriber["sqs_queue_url"] = queue_url464 message_group_id = (465 req_data.get("MessageGroupId") if req_data.get("MessageGroupId") else ""466 )467 if isinstance(message_group_id, list):468 message_group_id = message_group_id[0]469 message_deduplication_id = (470 req_data.get("MessageDeduplicationId")[0]471 if req_data.get("MessageDeduplicationId")472 else ""473 )474 sqs_client = aws_stack.connect_to_service("sqs")475 kwargs = {}476 if message_group_id:477 kwargs["MessageGroupId"] = message_group_id478 if message_deduplication_id:479 kwargs["MessageDeduplicationId"] = message_deduplication_id480 sqs_client.send_message(481 QueueUrl=queue_url,482 MessageBody=create_sns_message_body(subscriber, req_data, message_id),483 MessageAttributes=create_sqs_message_attributes(subscriber, message_attributes),484 MessageSystemAttributes=create_sqs_system_attributes(headers),485 **kwargs,486 )487 store_delivery_log(subscriber, True, message, message_id)488 except Exception as exc:489 LOG.info("Unable to forward SNS message to SQS: %s %s", exc, traceback.format_exc())490 store_delivery_log(subscriber, False, message, message_id)491 sns_error_to_dead_letter_queue(subscriber["SubscriptionArn"], req_data, str(exc))492 if "NonExistentQueue" in str(exc):493 LOG.info(494 'Removing non-existent queue "%s" subscribed to topic "%s"',495 queue_url,496 topic_arn,497 )498 subscriptions.remove(subscriber)499 return500 elif subscriber["Protocol"] == "lambda":501 try:502 external_url = external_service_url("sns")503 unsubscribe_url = "%s/?Action=Unsubscribe&SubscriptionArn=%s" % (504 external_url,505 subscriber["SubscriptionArn"],506 )507 response = lambda_api.process_sns_notification(508 subscriber["Endpoint"],509 topic_arn,510 subscriber["SubscriptionArn"],511 message,512 message_id,513 message_attributes,514 unsubscribe_url,515 subject=req_data.get("Subject", [None])[0],516 )517 if response is not None:518 delivery = {519 "statusCode": response.status_code,520 "providerResponse": response.get_data(),521 }522 store_delivery_log(subscriber, True, message, message_id, delivery)523 if isinstance(response, Response):524 response.raise_for_status()525 elif isinstance(response, FlaskResponse):526 if response.status_code >= 400:527 raise Exception(528 "Error response (code %s): %s" % (response.status_code, response.data)529 )530 except Exception as exc:531 LOG.info(532 "Unable to run Lambda function on SNS message: %s %s", exc, traceback.format_exc()533 )534 store_delivery_log(subscriber, False, message, message_id)535 sns_error_to_dead_letter_queue(subscriber["SubscriptionArn"], req_data, str(exc))536 return537 elif subscriber["Protocol"] in ["http", "https"]:538 msg_type = (req_data.get("Type") or ["Notification"])[0]539 try:540 message_body = create_sns_message_body(subscriber, req_data, message_id)541 except Exception:542 return543 try:544 response = requests.post(545 subscriber["Endpoint"],546 headers={547 "Content-Type": "text/plain",548 # AWS headers according to549 # https://docs.aws.amazon.com/sns/latest/dg/sns-message-and-json-formats.html#http-header550 "x-amz-sns-message-type": msg_type,551 "x-amz-sns-topic-arn": subscriber["TopicArn"],552 "x-amz-sns-subscription-arn": subscriber["SubscriptionArn"],553 "User-Agent": "Amazon Simple Notification Service Agent",554 },555 data=message_body,556 verify=False,557 )558 delivery = {559 "statusCode": response.status_code,560 "providerResponse": response.content.decode("utf-8"),561 }562 store_delivery_log(subscriber, True, message, message_id, delivery)563 response.raise_for_status()564 except Exception as exc:565 LOG.info(566 "Received error on sending SNS message, putting to DLQ (if configured): %s", exc567 )568 store_delivery_log(subscriber, False, message, message_id)569 sns_error_to_dead_letter_queue(subscriber["SubscriptionArn"], req_data, str(exc))570 return571 elif subscriber["Protocol"] == "application":572 try:573 sns_client = aws_stack.connect_to_service("sns")574 sns_client.publish(TargetArn=subscriber["Endpoint"], Message=message)575 store_delivery_log(subscriber, True, message, message_id)576 except Exception as exc:577 LOG.warning(578 "Unable to forward SNS message to SNS platform app: %s %s",579 exc,580 traceback.format_exc(),581 )582 store_delivery_log(subscriber, False, message, message_id)583 sns_error_to_dead_letter_queue(subscriber["SubscriptionArn"], req_data, str(exc))584 return585 elif subscriber["Protocol"] in ["email", "email-json"]:586 ses_client = aws_stack.connect_to_service("ses")587 if subscriber.get("Endpoint"):588 ses_client.verify_email_address(EmailAddress=subscriber.get("Endpoint"))589 ses_client.verify_email_address(EmailAddress="admin@localstack.com")590 ses_client.send_email(591 Source="admin@localstack.com",592 Message={593 "Body": {594 "Text": {595 "Data": create_sns_message_body(596 subscriber=subscriber, req_data=req_data, message_id=message_id597 )598 if subscriber["Protocol"] == "email-json"599 else message600 }601 },602 "Subject": {"Data": "SNS-Subscriber-Endpoint"},603 },604 Destination={"ToAddresses": [subscriber.get("Endpoint")]},605 )606 store_delivery_log(subscriber, True, message, message_id)607 else:608 LOG.warning('Unexpected protocol "%s" for SNS subscription', subscriber["Protocol"])609def publish_message(topic_arn, req_data, headers, subscription_arn=None, skip_checks=False):610 sns_backend = SNSBackend.get()611 message = req_data["Message"][0]612 message_id = str(uuid.uuid4())613 if topic_arn and ":endpoint/" in topic_arn:614 # cache messages published to platform endpoints615 cache = sns_backend.platform_endpoint_messages[topic_arn] = (616 sns_backend.platform_endpoint_messages.get(topic_arn) or []617 )618 cache.append(req_data)619 LOG.debug("Publishing message to TopicArn: %s | Message: %s", topic_arn, message)620 start_thread(621 lambda _: message_to_subscribers(622 message_id,623 message,624 topic_arn,625 req_data,626 headers,627 subscription_arn,628 skip_checks,629 )630 )631 return message_id632def publish_batch(topic_arn, messages, headers):633 response = {"Successful": [], "Failed": []}634 for message in messages:635 message_id = str(uuid.uuid4())636 data = {}637 data["TopicArn"] = [topic_arn]638 data["Message"] = [message["Message"]]639 data["Subject"] = [message.get("Subject")]640 if ".fifo" in topic_arn:641 data["MessageGroupId"] = message.get("MessageGroupId")642 # TODO: add MessageDeduplication checks once ASF-SQS implementation becomes default643 message_attributes = prepare_message_attributes(message.get("MessageAttributes", []))644 try:645 message_to_subscribers(646 message_id,647 message["Message"],648 topic_arn,649 data,650 headers,651 message_attributes=message_attributes,652 )653 response["Successful"].append({"Id": message["Id"], "MessageId": message_id})654 except Exception:655 response["Failed"].append({"Id": message["Id"]})656 return response657def do_delete_topic(topic_arn):658 sns_backend = SNSBackend.get()659 sns_backend.sns_subscriptions.pop(topic_arn, None)660 sns_backend.sns_tags.pop(topic_arn, None)661def do_confirm_subscription(topic_arn, token):662 sns_backend = SNSBackend.get()663 for k, v in sns_backend.subscription_status.items():664 if v["Token"] == token and v["TopicArn"] == topic_arn:665 v["Status"] = "Subscribed"666def do_subscribe(topic_arn, endpoint, protocol, subscription_arn, attributes, filter_policy=None):667 sns_backend = SNSBackend.get()668 topic_subs = sns_backend.sns_subscriptions[topic_arn] = (669 sns_backend.sns_subscriptions.get(topic_arn) or []670 )671 # An endpoint may only be subscribed to a topic once. Subsequent672 # subscribe calls do nothing (subscribe is idempotent).673 for existing_topic_subscription in topic_subs:674 if existing_topic_subscription.get("Endpoint") == endpoint:675 return676 subscription = {677 # http://docs.aws.amazon.com/cli/latest/reference/sns/get-subscription-attributes.html678 "TopicArn": topic_arn,679 "Endpoint": endpoint,680 "Protocol": protocol,681 "SubscriptionArn": subscription_arn,682 "FilterPolicy": filter_policy,683 }684 subscription.update(attributes)685 topic_subs.append(subscription)686 if subscription_arn not in sns_backend.subscription_status:687 sns_backend.subscription_status[subscription_arn] = {}688 sns_backend.subscription_status[subscription_arn].update(689 {"TopicArn": topic_arn, "Token": short_uid(), "Status": "Not Subscribed"}690 )691 # Send out confirmation message for HTTP(S), fix for https://github.com/localstack/localstack/issues/881692 if protocol in ["http", "https"]:693 token = short_uid()694 external_url = external_service_url("sns")695 subscription["UnsubscribeURL"] = "%s/?Action=Unsubscribe&SubscriptionArn=%s" % (696 external_url,697 subscription_arn,698 )699 confirmation = {700 "Type": ["SubscriptionConfirmation"],701 "Token": [token],702 "Message": [703 ("You have chosen to subscribe to the topic %s.\n" % topic_arn)704 + "To confirm the subscription, visit the SubscribeURL included in this message."705 ],706 "SubscribeURL": [707 "%s/?Action=ConfirmSubscription&TopicArn=%s&Token=%s"708 % (external_url, topic_arn, token)709 ],710 }711 publish_message(topic_arn, confirmation, {}, subscription_arn, skip_checks=True)712def do_unsubscribe(subscription_arn):713 sns_backend = SNSBackend.get()714 for topic_arn, existing_subs in sns_backend.sns_subscriptions.items():715 sns_backend.sns_subscriptions[topic_arn] = [716 sub for sub in existing_subs if sub["SubscriptionArn"] != subscription_arn717 ]718def _get_tags(topic_arn):719 sns_backend = SNSBackend.get()720 if topic_arn not in sns_backend.sns_tags:721 sns_backend.sns_tags[topic_arn] = []722 return sns_backend.sns_tags[topic_arn]723def do_list_tags_for_resource(topic_arn):724 return _get_tags(topic_arn)725def do_tag_resource(topic_arn, tags):726 sns_backend = SNSBackend.get()727 existing_tags = sns_backend.sns_tags.get(topic_arn, [])728 tags = [tag for idx, tag in enumerate(tags) if tag not in tags[:idx]]729 def existing_tag_index(item):730 for idx, tag in enumerate(existing_tags):731 if item["Key"] == tag["Key"]:732 return idx733 return None734 for item in tags:735 existing_index = existing_tag_index(item)736 if existing_index is None:737 existing_tags.append(item)738 else:739 existing_tags[existing_index] = item740 sns_backend.sns_tags[topic_arn] = existing_tags741def do_untag_resource(topic_arn, tag_keys):742 sns_backend = SNSBackend.get()743 sns_backend.sns_tags[topic_arn] = [t for t in _get_tags(topic_arn) if t["Key"] not in tag_keys]744# ---------------745# HELPER METHODS746# ---------------747def get_subscription_by_arn(sub_arn):748 sns_backend = SNSBackend.get()749 # TODO maintain separate map instead of traversing all items750 for key, subscriptions in sns_backend.sns_subscriptions.items():751 for sub in subscriptions:752 if sub["SubscriptionArn"] == sub_arn:753 return sub754def make_response(op_name, content="", message_id=None):755 response = Response()756 if not content:757 message_id = message_id or str(uuid.uuid4())758 content = "<MessageId>%s</MessageId>" % message_id759 response._content = """<{op_name}Response xmlns="http://sns.amazonaws.com/doc/2010-03-31/">760 <{op_name}Result>761 {content}762 </{op_name}Result>763 <ResponseMetadata><RequestId>{req_id}</RequestId></ResponseMetadata>764 </{op_name}Response>""".format(765 op_name=op_name, content=content, req_id=short_uid()766 )767 response.status_code = 200768 return response769# TODO move to utils!770def make_error(message, code=400, code_string="InvalidParameter"):771 response = Response()772 response._content = """<ErrorResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/"><Error>773 <Type>Sender</Type>774 <Code>{code_string}</Code>775 <Message>{message}</Message>776 </Error><RequestId>{req_id}</RequestId>777 </ErrorResponse>""".format(778 message=message, code_string=code_string, req_id=short_uid()779 )780 response.status_code = code781 return response782def create_sns_message_body(subscriber, req_data, message_id=None):783 message = req_data["Message"][0]784 protocol = subscriber["Protocol"]785 if req_data.get("MessageStructure") == ["json"]:786 message = json.loads(message)787 try:788 message = message.get(protocol, message["default"])789 except KeyError:790 raise Exception("Unable to find 'default' key in message payload")791 if is_raw_message_delivery(subscriber):792 return message793 data = {794 "Type": req_data.get("Type", ["Notification"])[0],795 "MessageId": message_id,796 "TopicArn": subscriber["TopicArn"],797 "Message": message,798 "Timestamp": timestamp_millis(),799 "SignatureVersion": "1",800 # TODO Add a more sophisticated solution with an actual signature801 # Hardcoded802 "Signature": "EXAMPLEpH+..",803 "SigningCertURL": "https://sns.us-east-1.amazonaws.com/SimpleNotificationService-0000000000000000000000.pem",804 }805 for key in ["Subject", "SubscribeURL", "Token"]:806 if req_data.get(key):807 data[key] = req_data[key][0]808 for key in HTTP_SUBSCRIPTION_ATTRIBUTES:809 if key in subscriber:810 data[key] = subscriber[key]811 attributes = get_message_attributes(req_data)812 if attributes:813 data["MessageAttributes"] = attributes814 return json.dumps(data)815def create_sqs_message_attributes(subscriber, attributes):816 if not is_raw_message_delivery(subscriber):817 return {}818 message_attributes = {}819 for key, value in attributes.items():820 if value.get("Type"):821 attribute = {"DataType": value["Type"]}822 if value["Type"] == "Binary":823 attribute["BinaryValue"] = base64.decodebytes(to_bytes(value["Value"]))824 else:825 attribute["StringValue"] = str(value.get("Value", ""))826 message_attributes[key] = attribute827 return message_attributes828def prepare_message_attributes(message_attributes):829 attributes = {}830 for attr in message_attributes:831 attributes[attr["Name"]] = {832 "Type": attr["Value"]["DataType"],833 "Value": attr["Value"]["StringValue"]834 if attr["Value"].get("StringValue", None)835 else attr["Value"]["BinaryValue"],836 }837 return attributes838def get_message_attributes(req_data):839 extracted_msg_attrs = parse_urlencoded_data(req_data, "MessageAttributes.entry")840 return prepare_message_attributes(extracted_msg_attrs)841def get_subscribe_attributes(req_data):842 attributes = {}843 for key in req_data.keys():844 if ".key" in key:845 attributes[req_data[key][0]] = req_data[key.replace("key", "value")][0]846 defaults = {847 # TODO: this is required to get TF "aws_sns_topic_subscription" working, but this should848 # be revisited (e.g., cross-account subscriptions should not be confirmed automatically)849 "PendingConfirmation": "false"850 }851 for key, value in defaults.items():852 attributes[key] = attributes.get(key, value)853 return attributes854def is_number(x):855 try:856 float(x)857 return True858 except ValueError:859 return False860def evaluate_numeric_condition(conditions, value):861 if not is_number(value):862 return False863 for i in range(0, len(conditions), 2):864 value = float(value)865 operator = conditions[i]866 operand = float(conditions[i + 1])867 if operator == "=":868 if value != operand:869 return False870 elif operator == ">":871 if value <= operand:872 return False873 elif operator == "<":874 if value >= operand:875 return False876 elif operator == ">=":877 if value < operand:878 return False879 elif operator == "<=":880 if value > operand:881 return False882 return True883def evaluate_exists_condition(conditions, message_attributes, criteria):884 # support for exists: false was added in april 2021885 # https://aws.amazon.com/about-aws/whats-new/2021/04/amazon-sns-grows-the-set-of-message-filtering-operators/886 if conditions:887 return message_attributes.get(criteria) is not None888 else:889 return message_attributes.get(criteria) is None890def evaluate_condition(value, condition, message_attributes, criteria):891 if type(condition) is not dict:892 return value == condition893 elif condition.get("exists") is not None:894 return evaluate_exists_condition(condition.get("exists"), message_attributes, criteria)895 elif value is None:896 # the remaining conditions require the value to not be None897 return False898 elif condition.get("anything-but"):899 return value not in condition.get("anything-but")900 elif condition.get("prefix"):901 prefix = condition.get("prefix")902 return value.startswith(prefix)903 elif condition.get("numeric"):904 return evaluate_numeric_condition(condition.get("numeric"), value)905 return False906def evaluate_filter_policy_conditions(conditions, attribute, message_attributes, criteria):907 if type(conditions) is not list:908 conditions = [conditions]909 if attribute is not None and attribute["Type"] == "String.Array":910 values = ast.literal_eval(attribute["Value"])911 for value in values:912 for condition in conditions:913 if evaluate_condition(value, condition, message_attributes, criteria):914 return True915 else:916 for condition in conditions:917 value = attribute["Value"] if attribute is not None else None918 if evaluate_condition(value, condition, message_attributes, criteria):919 return True920 return False921def check_filter_policy(filter_policy, message_attributes):922 if not filter_policy:923 return True924 for criteria in filter_policy:925 conditions = filter_policy.get(criteria)926 attribute = message_attributes.get(criteria)927 if (928 evaluate_filter_policy_conditions(conditions, attribute, message_attributes, criteria)929 is False930 ):931 return False932 return True933def is_raw_message_delivery(susbcriber):934 return susbcriber.get("RawMessageDelivery") in ("true", True, "True")935def store_delivery_log(936 subscriber: dict, success: bool, message: str, message_id: str, delivery: dict = None937):938 log_group_name = subscriber.get("TopicArn", "").replace("arn:aws:", "").replace(":", "/")939 log_stream_name = long_uid()940 invocation_time = int(time.time() * 1000)941 delivery = not_none_or(delivery, {})942 delivery["deliveryId"] = (long_uid(),)943 delivery["destination"] = (subscriber.get("Endpoint", ""),)944 delivery["dwellTimeMs"] = 200945 if not success:946 delivery["attemps"] = 1947 delivery_log = {948 "notification": {949 "messageMD5Sum": md5(message),...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful