Best Python code snippet using localstack_python
test_sns.py
Source:test_sns.py  
...472        for test in test_data:473            filter_policy = test[1]474            attributes = test[2]475            expected = test[3]476            assert expected == check_filter_policy(filter_policy, attributes)477    def test_is_raw_message_delivery(self, subscriber):478        valid_true_values = ["true", "True", True]479        for true_value in valid_true_values:480            subscriber["RawMessageDelivery"] = true_value481            assert is_raw_message_delivery(subscriber)482    def test_is_not_raw_message_delivery(self, subscriber):483        invalid_values = ["false", "False", False, "somevalue", ""]484        for invalid_values in invalid_values:485            subscriber["RawMessageDelivery"] = invalid_values486            assert not is_raw_message_delivery(subscriber)487        del subscriber["RawMessageDelivery"]...sns_listener.py
Source:sns_listener.py  
...57                sqs_client = aws_stack.connect_to_service('sqs')58                for subscriber in SNS_SUBSCRIPTIONS[topic_arn]:59                    filter_policy = json.loads(subscriber.get('FilterPolicy', '{}'))60                    message_attributes = get_message_attributes(req_data)61                    if check_filter_policy(filter_policy, message_attributes):62                        if subscriber['Protocol'] == 'sqs':63                            endpoint = subscriber['Endpoint']64                            if 'sqs_queue_url' in subscriber:65                                queue_url = subscriber.get('sqs_queue_url')66                            elif '://' in endpoint:67                                queue_url = endpoint68                            else:69                                queue_name = endpoint.split(':')[5]70                                queue_url = aws_stack.get_sqs_queue_url(queue_name)71                                subscriber['sqs_queue_url'] = queue_url72                            try:73                                sqs_client.send_message(74                                    QueueUrl=queue_url,75                                    MessageBody=create_sns_message_body(subscriber, req_data)76                                )77                            except Exception as exc:78                                return make_error(message=str(exc), code=400)79                        elif subscriber['Protocol'] == 'lambda':80                            lambda_api.process_sns_notification(81                                subscriber['Endpoint'],82                                topic_arn, message, subject=req_data.get('Subject', [None])[0]83                            )84                        elif subscriber['Protocol'] in ['http', 'https']:85                            try:86                                message_body = create_sns_message_body(subscriber, req_data)87                            except Exception as exc:88     
                           return make_error(message=str(exc), code=400)89                            requests.post(90                                subscriber['Endpoint'],91                                headers={92                                    'Content-Type': 'text/plain',93                                    'x-amz-sns-message-type': 'Notification'94                                },95                                data=message_body96                            )97                        else:98                            LOGGER.warning('Unexpected protocol "%s" for SNS subscription' % subscriber['Protocol'])99                # return response here because we do not want the request to be forwarded to SNS100                return make_response(req_action)101        return True102    def return_response(self, method, path, data, headers, response):103        # This method is executed by the proxy after we've already received a104        # response from the backend, hence we can utilize the "response" variable here105        if method == 'POST' and path == '/':106            req_data = urlparse.parse_qs(to_str(data))107            req_action = req_data['Action'][0]108            if req_action == 'Subscribe' and response.status_code < 400:109                response_data = xmltodict.parse(response.content)110                topic_arn = (req_data.get('TargetArn') or req_data.get('TopicArn'))[0]111                sub_arn = response_data['SubscribeResponse']['SubscribeResult']['SubscriptionArn']112                do_subscribe(topic_arn, req_data['Endpoint'][0], req_data['Protocol'][0], sub_arn)113# instantiate listener114UPDATE_SNS = ProxyListenerSNS()115def do_create_topic(topic_arn):116    if topic_arn not in SNS_SUBSCRIPTIONS:117        SNS_SUBSCRIPTIONS[topic_arn] = []118def do_subscribe(topic_arn, endpoint, protocol, subscription_arn):119    subscription = {120        # 
http://docs.aws.amazon.com/cli/latest/reference/sns/get-subscription-attributes.html121        'TopicArn': topic_arn,122        'Endpoint': endpoint,123        'Protocol': protocol,124        'SubscriptionArn': subscription_arn,125        'RawMessageDelivery': 'false'126    }127    SNS_SUBSCRIPTIONS[topic_arn].append(subscription)128def do_unsubscribe(subscription_arn):129    for topic_arn in SNS_SUBSCRIPTIONS:130        SNS_SUBSCRIPTIONS[topic_arn] = [131            sub for sub in SNS_SUBSCRIPTIONS[topic_arn]132            if sub['SubscriptionArn'] != subscription_arn133        ]134# ---------------135# HELPER METHODS136# ---------------137def get_topic_by_arn(topic_arn):138    if topic_arn in SNS_SUBSCRIPTIONS:139        return SNS_SUBSCRIPTIONS[topic_arn]140    else:141        return None142def get_subscription_by_arn(sub_arn):143    # TODO maintain separate map instead of traversing all items144    for key, subscriptions in SNS_SUBSCRIPTIONS.items():145        for sub in subscriptions:146            if sub['SubscriptionArn'] == sub_arn:147                return sub148def make_response(op_name, content=''):149    response = Response()150    if not content:151        content = '<MessageId>%s</MessageId>' % short_uid()152    response._content = """<{op_name}Response xmlns="http://sns.amazonaws.com/doc/2010-03-31/">153        <{op_name}Result>154            {content}155        </{op_name}Result>156        <ResponseMetadata><RequestId>{req_id}</RequestId></ResponseMetadata>157        </{op_name}Response>""".format(op_name=op_name, content=content, req_id=short_uid())158    response.status_code = 200159    return response160def make_error(message, code=400, code_string='InvalidParameter'):161    response = Response()162    response._content = """<ErrorResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/"><Error>163        <Type>Sender</Type>164        <Code>{code_string}</Code>165        <Message>{message}</Message>166        
</Error><RequestId>{req_id}</RequestId>167        </ErrorResponse>""".format(message=message, code_string=code_string, req_id=short_uid())168    response.status_code = code169    return response170def create_sns_message_body(subscriber, req_data):171    message = req_data['Message'][0]172    subject = req_data.get('Subject', [None])[0]173    protocol = subscriber['Protocol']174    if subscriber['RawMessageDelivery'] == 'true':175        return message176    if req_data.get('MessageStructure') == ['json']:177        message = json.loads(message)178        try:179            message = message.get(protocol, message['default'])180        except KeyError:181            raise Exception("Unable to find 'default' key in message payload")182    data = {}183    data['MessageId'] = str(uuid.uuid4())184    data['Type'] = 'Notification'185    data['Message'] = message186    data['TopicArn'] = subscriber['TopicArn']187    if subject is not None:188        data['Subject'] = subject189    attributes = get_message_attributes(req_data)190    if attributes:191        data['MessageAttributes'] = attributes192    return json.dumps(data)193def get_message_attributes(req_data):194    attributes = {}195    x = 1196    while True:197        name = req_data.get('MessageAttributes.entry.' + str(x) + '.Name', [None])[0]198        if name is not None:199            attribute = {}200            attribute['Type'] = req_data.get('MessageAttributes.entry.' + str(x) + '.Value.DataType', [None])[0]201            string_value = req_data.get('MessageAttributes.entry.' + str(x) + '.Value.StringValue', [None])[0]202            binary_value = req_data.get('MessageAttributes.entry.' 
+ str(x) + '.Value.BinaryValue', [None])[0]203            if string_value is not None:204                attribute['Value'] = string_value205            elif binary_value is not None:206                attribute['Value'] = binary_value207            attributes[name] = attribute208            x += 1209        else:210            break211    return attributes212def is_number(x):213    try:214        float(x)215        return True216    except ValueError:217        return False218def evaluate_numeric_condition(conditions, value):219    if not is_number(value):220        return False221    for i in range(0, len(conditions), 2):222        operator = conditions[i]223        operand = conditions[i + 1]224        if operator == '=':225            if value != operand:226                return False227        elif operator == '>':228            if value <= operand:229                return False230        elif operator == '<':231            if value >= operand:232                return False233        elif operator == '>=':234            if value < operand:235                return False236        elif operator == '<=':237            if value > operand:238                return False239    return True240def evaluate_condition(value, condition):241    if type(condition) is not dict:242        return value == condition243    elif condition.get('anything-but'):244        return value not in condition.get('anything-but')245    elif condition.get('prefix'):246        prefix = condition.get('prefix')247        return value.startswith(prefix)248    elif condition.get('numeric'):249        return evaluate_numeric_condition(condition.get('numeric'), value)250    return False251def evaluate_filter_policy_conditions(conditions, attribute):252    if type(conditions) is not list:253        conditions = [conditions]254    if attribute['Type'] == 'String.Array':255        values = ast.literal_eval(attribute['Value'])256        for value in values:257            for condition in 
conditions:258                if evaluate_condition(value, condition):259                    return True260    else:261        for condition in conditions:262            if evaluate_condition(attribute['Value'], condition):263                return True264    return False265def check_filter_policy(filter_policy, message_attributes):266    if not filter_policy:267        return True268    for criteria in filter_policy:269        conditions = filter_policy.get(criteria)270        attribute = message_attributes.get(criteria)271        if attribute is None:272            return False273        if evaluate_filter_policy_conditions(conditions, attribute) is False:274            return False...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
