How to use _topic_arn method in localstack

Best Python code snippet using localstack_python

message_action.py

Source:message_action.py Github

copy

Full Screen

1# Standard Library2import json3import os4# Third Party5import boto36# First Party7from smdebug.core.logger import get_logger8# action :9# {name:'sms' or 'email', 'endpoint':'phone or emailid'}10class MessageAction:11 def __init__(self, rule_name, message_type, message_endpoint):12 self._topic_name = "SMDebugRules"13 self._logger = get_logger()14 if message_type == "sms" or message_type == "email":15 self._protocol = message_type16 else:17 self._protocol = None18 self._logger.info(19 f"Unsupported message type:{message_type} in MessageAction. Returning"20 )21 return22 self._message_endpoint = message_endpoint23 # Below 2 is to help in tests24 self._last_send_mesg_response = None25 self._last_subscription_response = None26 env_region_name = os.getenv("AWS_REGION", "us-east-1")27 self._sns_client = boto3.client("sns", region_name=env_region_name)28 self._topic_arn = self._create_sns_topic_if_not_exists()29 self._subscribe_mesgtype_endpoint()30 self._logger.info(31 f"Registering messageAction with protocol:{self._protocol} endpoint:{self._message_endpoint} and topic_arn:{self._topic_arn} region:{env_region_name}"32 )33 self._rule_name = rule_name34 def _create_sns_topic_if_not_exists(self):35 try:36 topic = self._sns_client.create_topic(Name=self._topic_name)37 self._logger.info(38 f"topic_name: {self._topic_name} , creating topic returned response:{topic}"39 )40 if topic:41 return topic["TopicArn"]42 except Exception as e:43 self._logger.info(44 f"Caught exception while creating topic:{self._topic_name} exception is: \n {e}"45 )46 return None47 def _check_subscriptions(self, topic_arn, protocol, endpoint):48 try:49 next_token = "random"50 subs = self._sns_client.list_subscriptions()51 sub_array = subs["Subscriptions"]52 while next_token is not None:53 for sub_dict in sub_array:54 proto = sub_dict["Protocol"]55 ep = sub_dict["Endpoint"]56 topic = sub_dict["TopicArn"]57 if proto == protocol and topic == topic_arn and ep == endpoint:58 self._logger.info(f"Existing Subscription found: {sub_dict}")59 return True60 if "NextToken" in subs:61 next_token = subs["NextToken"]62 subs = self._sns_client.list_subscriptions(NextToken=next_token)63 sub_array = subs["Subscriptions"]64 continue65 else:66 next_token = None67 except Exception as e:68 self._logger.info(69 f"Caught exception while list subscription topic:{self._topic_name} exception is: \n {e}"70 )71 return False72 def _subscribe_mesgtype_endpoint(self):73 response = None74 try:75 if self._topic_arn and self._protocol and self._message_endpoint:76 filter_policy = {}77 if self._protocol == "sms":78 filter_policy["phone_num"] = [self._message_endpoint]79 else:80 filter_policy["email"] = [self._message_endpoint]81 if not self._check_subscriptions(82 self._topic_arn, self._protocol, self._message_endpoint83 ):84 response = self._sns_client.subscribe(85 TopicArn=self._topic_arn,86 Protocol=self._protocol, # sms or email87 Endpoint=self._message_endpoint, # phone number or email addresss88 Attributes={"FilterPolicy": json.dumps(filter_policy)},89 ReturnSubscriptionArn=False, # True means always return ARN90 )91 else:92 response = f"Subscription exists for topic:{self._topic_arn}, protocol:{self._protocol}, endpoint:{self._message_endpoint}"93 except Exception as e:94 self._logger.info(95 f"Caught exception while subscribing endpoint on topic:{self._topic_arn} exception is: \n {e}"96 )97 self._logger.info(f"response for sns subscribe is {response} ")98 self._last_subscription_response = response99 def _send_message(self, message):100 response = None101 message = f"SMDebugRule:{self._rule_name} fired. {message}"102 try:103 if self._protocol == "sms":104 msg_attributes = {105 "phone_num": {"DataType": "String", "StringValue": self._message_endpoint}106 }107 else:108 msg_attributes = {109 "email": {"DataType": "String", "StringValue": self._message_endpoint}110 }111 response = self._sns_client.publish(112 TopicArn=self._topic_arn,113 Message=message,114 Subject=f"SMDebugRule:{self._rule_name} fired",115 # MessageStructure='json',116 MessageAttributes=msg_attributes,117 )118 except Exception as e:119 self._logger.info(120 f"Caught exception while publishing message on topic:{self._topic_arn} exception is: \n {e}"121 )122 self._logger.info(f"Response of send message:{response}")123 self._last_send_mesg_response = response124 return response125 def invoke(self, message):...

Full Screen

Full Screen

sns.py

Source:sns.py Github

copy

Full Screen

1class SNSTopic(object):2 """ Helper for SNS Topic creation/management.3 Working with SNS topics is kind of annoying by default. This provides a4 more simple interface for working with them.5 """6 def __init__(self, connection, topic_name):7 self._conn = connection8 self._topic_name = topic_name9 self._topic_arn = None10 def _setup_topic(self):11 if self._topic_arn:12 return13 conn = self._conn14 response = conn.create_topic(self._topic_name)['CreateTopicResponse']15 self._topic_arn = response['CreateTopicResult']['TopicArn']16 def publish(self, *args, **kwargs):17 self._setup_topic()18 return self._conn.publish(self._topic_arn, *args, **kwargs)19 def subscribe_sqs_queue(self, *args, **kwargs):20 self._setup_topic()21 response = self._conn.subscribe_sqs_queue(self._topic_arn,22 *args, **kwargs)23 response = response['SubscribeResponse']24 result = response['SubscribeResult']...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful