Best Python code snippet using localstack_python
sns_listener.py
Source:sns_listener.py  
...74                            try:75                                sqs_client.send_message(76                                    QueueUrl=queue_url,77                                    MessageBody=create_sns_message_body(subscriber, req_data),78                                    MessageAttributes=create_sqs_message_attributes(subscriber, message_attributes)79                                )80                            except Exception as exc:81                                return make_error(message=str(exc), code=400)82                        elif subscriber['Protocol'] == 'lambda':83                            lambda_api.process_sns_notification(84                                subscriber['Endpoint'],85                                topic_arn, message, subject=req_data.get('Subject', [None])[0]86                            )87                        elif subscriber['Protocol'] in ['http', 'https']:88                            try:89                                message_body = create_sns_message_body(subscriber, req_data)90                            except Exception as exc:91                                return make_error(message=str(exc), code=400)92                            requests.post(93                                subscriber['Endpoint'],94                                headers={95                                    'Content-Type': 'text/plain',96                                    'x-amz-sns-message-type': 'Notification'97                                },98                                data=message_body99                            )100                        else:101                            LOGGER.warning('Unexpected protocol "%s" for SNS subscription' % subscriber['Protocol'])102                # return response here because we do not want the request to be forwarded to SNS103                return make_response(req_action)104        return True105    def return_response(self, method, path, data, headers, response):106        # This method is executed by the proxy after we've already received a107        # response from the backend, hence we can utilize the "response" variable here108        if method == 'POST' and path == '/':109            req_data = urlparse.parse_qs(to_str(data))110            req_action = req_data['Action'][0]111            if req_action == 'Subscribe' and response.status_code < 400:112                response_data = xmltodict.parse(response.content)113                topic_arn = (req_data.get('TargetArn') or req_data.get('TopicArn'))[0]114                sub_arn = response_data['SubscribeResponse']['SubscribeResult']['SubscriptionArn']115                do_subscribe(topic_arn, req_data['Endpoint'][0], req_data['Protocol'][0], sub_arn)116# instantiate listener117UPDATE_SNS = ProxyListenerSNS()118def do_create_topic(topic_arn):119    if topic_arn not in SNS_SUBSCRIPTIONS:120        SNS_SUBSCRIPTIONS[topic_arn] = []121def do_delete_topic(topic_arn):122    if topic_arn in SNS_SUBSCRIPTIONS:123        del SNS_SUBSCRIPTIONS[topic_arn]124def do_subscribe(topic_arn, endpoint, protocol, subscription_arn):125    subscription = {126        # http://docs.aws.amazon.com/cli/latest/reference/sns/get-subscription-attributes.html127        'TopicArn': topic_arn,128        'Endpoint': endpoint,129        'Protocol': protocol,130        'SubscriptionArn': subscription_arn,131        'RawMessageDelivery': 'false'132    }133    SNS_SUBSCRIPTIONS[topic_arn].append(subscription)134def do_unsubscribe(subscription_arn):135    for topic_arn in SNS_SUBSCRIPTIONS:136        SNS_SUBSCRIPTIONS[topic_arn] = [137            sub for sub in SNS_SUBSCRIPTIONS[topic_arn]138            if sub['SubscriptionArn'] != subscription_arn139        ]140# ---------------141# HELPER METHODS142# ---------------143def get_topic_by_arn(topic_arn):144    if topic_arn in SNS_SUBSCRIPTIONS:145        return SNS_SUBSCRIPTIONS[topic_arn]146    else:147        return None148def get_subscription_by_arn(sub_arn):149    # TODO maintain separate map instead of traversing all items150    for key, subscriptions in SNS_SUBSCRIPTIONS.items():151        for sub in subscriptions:152            if sub['SubscriptionArn'] == sub_arn:153                return sub154def make_response(op_name, content=''):155    response = Response()156    if not content:157        content = '<MessageId>%s</MessageId>' % short_uid()158    response._content = """<{op_name}Response xmlns="http://sns.amazonaws.com/doc/2010-03-31/">159        <{op_name}Result>160            {content}161        </{op_name}Result>162        <ResponseMetadata><RequestId>{req_id}</RequestId></ResponseMetadata>163        </{op_name}Response>""".format(op_name=op_name, content=content, req_id=short_uid())164    response.status_code = 200165    return response166def make_error(message, code=400, code_string='InvalidParameter'):167    response = Response()168    response._content = """<ErrorResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/"><Error>169        <Type>Sender</Type>170        <Code>{code_string}</Code>171        <Message>{message}</Message>172        </Error><RequestId>{req_id}</RequestId>173        </ErrorResponse>""".format(message=message, code_string=code_string, req_id=short_uid())174    response.status_code = code175    return response176def create_sns_message_body(subscriber, req_data):177    message = req_data['Message'][0]178    subject = req_data.get('Subject', [None])[0]179    protocol = subscriber['Protocol']180    if subscriber['RawMessageDelivery'] == 'true':181        return message182    if req_data.get('MessageStructure') == ['json']:183        message = json.loads(message)184        try:185            message = message.get(protocol, message['default'])186        except KeyError:187            raise Exception("Unable to find 'default' key in message payload")188    data = {}189    data['MessageId'] = str(uuid.uuid4())190    data['Type'] = 'Notification'191    data['Message'] = message192    data['TopicArn'] = subscriber['TopicArn']193    if subject is not None:194        data['Subject'] = subject195    attributes = get_message_attributes(req_data)196    if attributes:197        data['MessageAttributes'] = attributes198    return json.dumps(data)199def create_sqs_message_attributes(subscriber, attributes):200    if subscriber.get('RawMessageDelivery') not in ('true', True):201        return {}202    message_attributes = {}203    for key, value in attributes.items():204        attribute = {}205        attribute['DataType'] = value['Type']206        if value['Type'] == 'Binary':207            attribute['BinaryValue'] = value['Value']208        else:209            attribute['StringValue'] = value['Value']210        message_attributes[key] = attribute211    return message_attributes212def get_message_attributes(req_data):213    attributes = {}...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
