Best Python code snippet using localstack_python
message_action.py
Source:message_action.py  
# Standard Library
import json
import os

# Third Party
import boto3

# First Party
from smdebug.core.logger import get_logger


# action :
# {name:'sms' or 'email', 'endpoint':'phone or emailid'}
class MessageAction:
    """Rule action that notifies one SMS or email endpoint through an SNS topic.

    On construction the action creates (or looks up) the ``SMDebugRules`` topic
    and subscribes the endpoint to it, unless a matching subscription already
    exists. ``_send_message`` then publishes to that topic, tagging the message
    with an attribute whose key mirrors the FilterPolicy key used at subscribe
    time (``phone_num`` for sms, ``email`` otherwise).

    Every SNS call is best effort: exceptions are logged and swallowed.
    """

    _SUPPORTED_PROTOCOLS = ("sms", "email")

    def __init__(self, rule_name, message_type, message_endpoint):
        """Register the endpoint for notifications about ``rule_name``.

        Args:
            rule_name: Rule name embedded in the message body and subject.
            message_type: ``"sms"`` or ``"email"``. Any other value is logged
                and leaves the action disabled (``_protocol`` is ``None``).
            message_endpoint: Phone number or email address to subscribe.
        """
        self._topic_name = "SMDebugRules"
        self._logger = get_logger()

        # Set every attribute before the early return below, so an action
        # built with an unsupported message type is fully formed and later
        # calls cannot fail with AttributeError.
        self._rule_name = rule_name
        self._message_endpoint = message_endpoint
        self._sns_client = None
        self._topic_arn = None
        # The two fields below exist to help tests inspect the last responses.
        self._last_send_mesg_response = None
        self._last_subscription_response = None

        if message_type not in self._SUPPORTED_PROTOCOLS:
            self._protocol = None
            self._logger.info(
                f"Unsupported message type:{message_type} in MessageAction. Returning"
            )
            return
        self._protocol = message_type

        env_region_name = os.getenv("AWS_REGION", "us-east-1")
        self._sns_client = boto3.client("sns", region_name=env_region_name)
        self._topic_arn = self._create_sns_topic_if_not_exists()
        self._subscribe_mesgtype_endpoint()
        self._logger.info(
            f"Registering messageAction with protocol:{self._protocol} endpoint:{self._message_endpoint} and topic_arn:{self._topic_arn} region:{env_region_name}"
        )

    def _create_sns_topic_if_not_exists(self):
        """Call ``create_topic`` for ``_topic_name`` and return its ARN.

        Returns:
            The ``TopicArn`` from the response, or ``None`` when the call
            raised or returned an empty response.
        """
        try:
            topic = self._sns_client.create_topic(Name=self._topic_name)
            self._logger.info(
                f"topic_name: {self._topic_name} , creating topic returned response:{topic}"
            )
            if topic:
                return topic["TopicArn"]
        except Exception as e:
            self._logger.info(
                f"Caught exception while creating topic:{self._topic_name} exception is: \n {e}"
            )
        return None

    def _check_subscriptions(self, topic_arn, protocol, endpoint):
        """Report whether a matching subscription is already listed.

        Walks every page returned by ``list_subscriptions`` and compares each
        entry's ``Protocol``, ``TopicArn`` and ``Endpoint`` with the arguments.

        Returns:
            ``True`` on the first match; ``False`` when none is found or when
            listing raised (the exception is logged).
        """
        try:
            page_kwargs = {}
            while True:
                subs = self._sns_client.list_subscriptions(**page_kwargs)
                for sub_dict in subs["Subscriptions"]:
                    if (
                        sub_dict["Protocol"] == protocol
                        and sub_dict["TopicArn"] == topic_arn
                        and sub_dict["Endpoint"] == endpoint
                    ):
                        self._logger.info(f"Existing Subscription found: {sub_dict}")
                        return True
                # A response without NextToken is the last page.
                next_token = subs.get("NextToken")
                if not next_token:
                    break
                page_kwargs = {"NextToken": next_token}
        except Exception as e:
            self._logger.info(
                f"Caught exception while list subscription topic:{self._topic_name} exception is: \n {e}"
            )
        return False

    def _subscribe_mesgtype_endpoint(self):
        """Subscribe the endpoint to the topic unless it is already subscribed.

        Does nothing (response stays ``None``) when the topic ARN, protocol or
        endpoint is missing. The outcome is stored in
        ``_last_subscription_response``: the ``subscribe`` response, a
        "Subscription exists ..." string, or ``None``.
        """
        response = None
        try:
            if self._topic_arn and self._protocol and self._message_endpoint:
                if self._check_subscriptions(
                    self._topic_arn, self._protocol, self._message_endpoint
                ):
                    response = f"Subscription exists for topic:{self._topic_arn}, protocol:{self._protocol}, endpoint:{self._message_endpoint}"
                else:
                    # The filter key must match the message attribute key that
                    # _send_message attaches when publishing.
                    filter_key = "phone_num" if self._protocol == "sms" else "email"
                    filter_policy = {filter_key: [self._message_endpoint]}
                    response = self._sns_client.subscribe(
                        TopicArn=self._topic_arn,
                        Protocol=self._protocol,  # sms or email
                        Endpoint=self._message_endpoint,  # phone number or email address
                        Attributes={"FilterPolicy": json.dumps(filter_policy)},
                        ReturnSubscriptionArn=False,  # True means always return ARN
                    )
        except Exception as e:
            self._logger.info(
                f"Caught exception while subscribing endpoint on topic:{self._topic_arn} exception is: \n {e}"
            )
        self._logger.info(f"response for sns subscribe is {response} ")
        self._last_subscription_response = response

    def _send_message(self, message):
        """Publish ``message`` for this rule to the topic.

        Args:
            message: Text appended to the "SMDebugRule:<rule> fired." prefix.

        Returns:
            The ``publish`` response, or ``None`` when the action is disabled
            (unsupported message type) or the publish call raised. The same
            value is stored in ``_last_send_mesg_response``.
        """
        response = None
        if self._protocol is None:
            # The action was built with an unsupported message type and was
            # never registered with SNS, so there is nothing to publish to.
            self._logger.info(f"Response of send message:{response}")
            self._last_send_mesg_response = response
            return response

        message = f"SMDebugRule:{self._rule_name} fired. {message}"
        try:
            attribute_key = "phone_num" if self._protocol == "sms" else "email"
            msg_attributes = {
                attribute_key: {"DataType": "String", "StringValue": self._message_endpoint}
            }
            response = self._sns_client.publish(
                TopicArn=self._topic_arn,
                Message=message,
                Subject=f"SMDebugRule:{self._rule_name} fired",
                MessageAttributes=msg_attributes,
            )
        except Exception as e:
            self._logger.info(
                f"Caught exception while publishing message on topic:{self._topic_arn} exception is: \n {e}"
            )
        self._logger.info(f"Response of send message:{response}")
        self._last_send_mesg_response = response
        return response

    # NOTE: the listing is cut off here on the page; the body of invoke() is not shown.
    def invoke(self, message):...
sns.py
Source:sns.py  
1class SNSTopic(object):2    """ Helper for SNS Topic creation/management.3    Working with SNS topics is kind of annoying by default.  This provides a4    more simple interface for working with them.5    """6    def __init__(self, connection, topic_name):7        self._conn = connection8        self._topic_name = topic_name9        self._topic_arn = None10    def _setup_topic(self):11        if self._topic_arn:12            return13        conn = self._conn14        response = conn.create_topic(self._topic_name)['CreateTopicResponse']15        self._topic_arn = response['CreateTopicResult']['TopicArn']16    def publish(self, *args, **kwargs):17        self._setup_topic()18        return self._conn.publish(self._topic_arn, *args, **kwargs)19    def subscribe_sqs_queue(self, *args, **kwargs):20        self._setup_topic()21        response = self._conn.subscribe_sqs_queue(self._topic_arn,22                                                  *args, **kwargs)23        response = response['SubscribeResponse']24        result = response['SubscribeResult']...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
