How to use test_sns_event method in localstack

Best Python code snippet using localstack_python

blog_trans_bot.py

Source:blog_trans_bot.py Github

copy

Full Screen

#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
# vim: tabstop=2 shiftwidth=2 softtabstop=2 expandtab
"""AWS Lambda handler that translates blog posts announced through SNS.

For every SNS record the handler downloads the linked article, translates its
title and body with Amazon Translate, renders the result as an HTML table and,
unless ``DRY_RUN`` is enabled, e-mails the page through SES and stores it in S3.
"""

from datetime import datetime
import time
import logging
import io
import os
import json
from html import escape

import boto3
import arrow
from newspaper import Article

LOGGER = logging.getLogger()
if LOGGER.handlers:
  # The Lambda environment pre-configures a handler logging to stderr.
  # If a handler is already configured, `.basicConfig` does not execute.
  # Thus we set the level directly.
  LOGGER.setLevel(logging.INFO)
else:
  logging.basicConfig(level=logging.INFO)

# Dry-run is the default: only the exact string 'true' keeps it enabled.
DRY_RUN = os.getenv('DRY_RUN', 'true') == 'true'

AWS_REGION = os.getenv('REGION_NAME', 'us-east-1')

S3_BUCKET_NAME = os.getenv('S3_BUCKET_NAME')
S3_OBJ_KEY_PREFIX = os.getenv('S3_OBJ_KEY_PREFIX', 'posts')

EMAIL_FROM_ADDRESS = os.getenv('EMAIL_FROM_ADDRESS')
# Default to '' so that an unset variable yields an empty recipient list
# instead of crashing at import time on `None.split`.
EMAIL_TO_ADDRESSES = os.getenv('EMAIL_TO_ADDRESSES', '')
EMAIL_TO_ADDRESSES = [e.strip() for e in EMAIL_TO_ADDRESSES.split(',') if e.strip()]

TRANS_DEST_LANG = os.getenv('TRANS_DEST_LANG', 'ko')

# NOTE(review): 1204 looks like a typo for 1024 (i.e. a 15 KiB limit) - confirm
# before changing; the value is kept as published.
MAX_SINGLE_TEXT_SIZE = 15*1204

# Lazily created Amazon Translate client, shared across invocations.
TRANS_CLIENT = None


def fwrite_s3(s3_client, doc, s3_bucket, s3_obj_key):
  """Upload the text `doc` to `s3_bucket` under the key `s3_obj_key`.

  Returns True when the `put_object` response reports HTTP 200, False when the
  response reports another status or lacks the status field. Errors raised by
  `put_object` itself propagate to the caller.
  """
  ret = s3_client.put_object(Body=doc.encode('utf-8'),
    Bucket=s3_bucket,
    Key=s3_obj_key)

  try:
    return 200 == ret['ResponseMetadata']['HTTPStatusCode']
  except (KeyError, TypeError):
    # Malformed response: report failure rather than raising.
    return False


def _html_text(value):
  """Return `value` as HTML-escaped text; None becomes an empty string."""
  return '' if value is None else escape(str(value))


def gen_html(elem):
  """Render the translated document `elem` as a standalone HTML page.

  `elem` is a dict with the keys 'title', 'doc_id', 'link', 'pub_date',
  'section', 'title_trans', 'body_trans' (a list of translated paragraphs),
  'tags' and 'lang'. Every value is HTML-escaped because it originates from a
  fetched web page; paragraphs that are None (failed translations) are skipped.
  """
  HTML_FORMAT = '''<!DOCTYPE html>
<html>
<head>
<style>
table {{
  font-family: arial, sans-serif;
  border-collapse: collapse;
  width: 100%;
}}
td, th {{
  border: 1px solid #dddddd;
  text-align: left;
  padding: 8px;
}}
tr:nth-child(even) {{
  background-color: #dddddd;
}}
</style>
</head>
<body>
<h2>{title}</h2>
<table>
  <tr>
    <th>key</th>
    <th>value</th>
  </tr>
  <tr>
    <td>doc_id</td>
    <td>{doc_id}</td>
  </tr>
  <tr>
    <td>link</td>
    <td>{link}</td>
  </tr>
  <tr>
    <td>pub_date</td>
    <td>{pub_date}</td>
  </tr>
  <tr>
    <td>section</td>
    <td>{section}</td>
  </tr>
  <tr>
    <td>title_{lang}</td>
    <td>{title_trans}</td>
  </tr>
  <tr>
    <td>body_{lang}</td>
    <td>{body_trans}</td>
  </tr>
  <tr>
    <td>tags</td>
    <td>{tags}</td>
  </tr>
</table>
</body>
</html>'''

  # Escape each paragraph separately so the <br/> separators stay real markup.
  body_trans = '<br/>'.join(
    _html_text(e) for e in elem['body_trans'] if e is not None)

  html_doc = HTML_FORMAT.format(title=_html_text(elem['title']),
    doc_id=_html_text(elem['doc_id']),
    link=_html_text(elem['link']),
    pub_date=_html_text(elem['pub_date']),
    section=_html_text(elem['section']),
    title_trans=_html_text(elem['title_trans']),
    body_trans=body_trans,
    tags=_html_text(elem['tags']),
    lang=_html_text(elem['lang']))

  return html_doc


def send_email(ses_client, from_addr, to_addrs, subject, html_body):
  """Send `html_body` as a UTF-8 HTML e-mail and return the SES response."""
  ret = ses_client.send_email(Destination={'ToAddresses': to_addrs},
    Message={'Body': {
        'Html': {
          'Charset': 'UTF-8',
          'Data': html_body
        }
      },
      'Subject': {
        'Charset': 'UTF-8',
        'Data': subject
      }
    },
    Source=from_addr
  )
  return ret


def get_or_create_translator(region_name):
  """Return the module-wide Amazon Translate client, creating it on first use."""
  global TRANS_CLIENT

  if TRANS_CLIENT is None:
    TRANS_CLIENT = boto3.client('translate', region_name=region_name)
  return TRANS_CLIENT


def translate(translator, text, src='en', dest='ko'):
  """Translate `text` from `src` to `dest` with the `translator` client.

  Returns the translated text, or None when the response status is not
  HTTP 200. Empty or whitespace-only text is returned unchanged without
  calling the service.
  """
  if not text or not text.strip():
    return text

  trans_res = translator.translate_text(Text=text,
    SourceLanguageCode=src, TargetLanguageCode=dest)
  if 200 != trans_res['ResponseMetadata']['HTTPStatusCode']:
    return None
  return trans_res['TranslatedText']


def lambda_handler(event, context):
  """Translate every blog post referenced by the SNS records in `event`.

  Each record's `Sns.Message` is a JSON document with at least the keys 'id'
  and 'link'. Raises ValueError when an article body reaches
  MAX_SINGLE_TEXT_SIZE characters.
  """
  LOGGER.debug('receive SNS message')

  s3_client = boto3.client('s3', region_name=AWS_REGION)
  ses_client = boto3.client('ses', region_name=AWS_REGION)
  translator = get_or_create_translator(region_name=AWS_REGION)

  # NOTE(review): the published listing lost its indentation; the extent of the
  # loop body and of the `if not DRY_RUN:` branch below is reconstructed on the
  # assumption that a dry run performs no e-mail or S3 side effects - confirm.
  for record in event['Records']:
    msg = json.loads(record['Sns']['Message'])
    LOGGER.debug('message: %s', json.dumps(msg))

    doc_id = msg['id']
    url = msg['link']

    article = Article(url)
    article.download()
    article.parse()

    # Pages without `article:*` meta tags should not abort the whole batch.
    article_meta = article.meta_data.get('article') or {}
    section = article_meta.get('section')
    tag = article_meta.get('tag')
    # Fall back to the publication date carried by the message, if any.
    published_time = article_meta.get('published_time') or msg.get('pub_date')

    title, body_text = article.title, article.text

    #XX: https://py-googletrans.readthedocs.io/en/latest/
    # Explicit check instead of `assert`, which is stripped under `python -O`.
    if len(body_text) >= MAX_SINGLE_TEXT_SIZE:
      raise ValueError('article body too large to translate: {} >= {}'.format(
        len(body_text), MAX_SINGLE_TEXT_SIZE))

    trans_title = translate(translator, title, dest=TRANS_DEST_LANG)

    sentences = [e for e in body_text.split('\n') if e]
    trans_body_texts = []
    for sentence in sentences:
      trans_sentence = translate(translator, sentence, dest=TRANS_DEST_LANG)
      if trans_sentence is None:
        LOGGER.warning('translation failed for a paragraph of doc %s', doc_id)
      trans_body_texts.append(trans_sentence)

    doc = {
      'doc_id': doc_id,
      'link': url,
      'lang': TRANS_DEST_LANG,
      'pub_date': published_time,
      'section': section,
      'title': title,
      'title_trans': trans_title,
      'body_trans': trans_body_texts,
      'tags': tag
    }
    html_doc = gen_html(doc)

    if not DRY_RUN:
      subject = '''[translated] {title}'''.format(title=doc['title'])
      send_email(ses_client, EMAIL_FROM_ADDRESS, EMAIL_TO_ADDRESSES, subject, html_doc)

      s3_obj_key = '{}/{}-{}.html'.format(S3_OBJ_KEY_PREFIX,
        arrow.get(published_time).format('YYYYMMDD'), doc['doc_id'])
      if not fwrite_s3(s3_client, html_doc, S3_BUCKET_NAME, s3_obj_key):
        LOGGER.warning('failed to write s3://%s/%s', S3_BUCKET_NAME, s3_obj_key)
  LOGGER.debug('done')


if __name__ == '__main__':
  test_sns_event = {
    "Records": [
      {
        "EventSource": "aws:sns",
        "EventVersion": "1.0",
        "EventSubscriptionArn": "arn:aws:sns:us-east-1:{{{accountId}}}:ExampleTopic",
        "Sns": {
          "Type": "Notification",
          "MessageId": "95df01b4-ee98-5cb9-9903-4c221d41eb5e",
          "TopicArn": "arn:aws:sns:us-east-1:123456789012:ExampleTopic",
          "Subject": "example subject",
          "Message": "example message",
          "Timestamp": datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%fZ'),
          "SignatureVersion": "1",
          "Signature": "EXAMPLE",
          "SigningCertUrl": "EXAMPLE",
          "UnsubscribeUrl": "EXAMPLE",
          "MessageAttributes": {
            "Test": {
              "Type": "String",
              "Value": "TestString"
            },
            "TestBinary": {
              "Type": "Binary",
              "Value": "TestBinary"
            }
          }
        }
      }
    ]
  }

  # The handler expects the SNS message payload to be a JSON document.
  msg_body = {
    "id": "6da2a3be3378d3f1",
    "link": "https://aws.amazon.com/blogs/aws/new-redis-6-compatibility-for-amazon-elasticache/",
    "pub_date": "2020-10-07T14:50:59-07:00"
  }
  message = json.dumps(msg_body, ensure_ascii=False)

  test_sns_event['Records'][0]['Sns']['Subject'] = 'blog posts from {topic}'.format(topic='AWS')
  test_sns_event['Records'][0]['Sns']['Message'] = message
  LOGGER.debug(json.dumps(test_sns_event))

  start_t = time.time()
  lambda_handler(test_sns_event, {})
  end_t = time.time()
  # NOTE(review): the published listing ends with "..." at this point; the
  # remainder of the script is not visible.

Full Screen

Full Screen

test_event_unpack.py

Source:test_event_unpack.py Github

copy

Full Screen

...43 # THEN there should be the correct values44 assert len(s3_artifacts) == elementCount45 for orig, extracted in zip(events['Records'], s3_artifacts):46 assert orig == extracted47def test_sns_event():48 # Given that there is only 1 sns event with 1 s3 event49 s3_events = build_S3_event('bogus', 'bogus')50 events = build_SNS_event(s3_events)51 # WHEN the events are separated52 s3_artifacts = list(unpacked_s3_events(events))53 # THEN there should be just one value54 assert len(s3_artifacts) == 155 assert s3_events['Records'][0] == s3_artifacts[0]56def test_sns_events():57 # Given that there is 1 sns event with multiple s3 events58 elementCount = 359 s3_events = build_S3_event('bogus', 'bogus', elementCount)60 events = build_SNS_event(s3_events)61 # WHEN the events are separated...

Full Screen

Full Screen

test_sns.py

Source:test_sns.py Github

copy

Full Screen

1from aws_lambda_typing.events import SNSEvent2def test_sns_event() -> None:3 event: SNSEvent = {4 "Records": [5 {6 "EventVersion": "1.0",7 "EventSubscriptionArn": "arn:aws:sns:us-east-2:123456789012:sns-lambda:21be56ed-a058-49f5-8c98-aedd2564c486", # noqa: E5018 "EventSource": "aws:sns",9 "Sns": {10 "SignatureVersion": "1",11 "Timestamp": "2019-01-02T12:45:07.000Z",12 "Signature": "tcc6faL2yUC6dgZdmrwh1Y4cGa/ebXEkAi6RibDsvpi+tE/1+82j...65r==", # noqa: E50113 "SigningCertUrl": "https://sns.us-east-2.amazonaws.com/SimpleNotificationService-ac565b8b1a6c5d002d285f9598aa1d9b.pem", # noqa: E50114 "MessageId": "95df01b4-ee98-5cb9-9903-4c221d41eb5e",15 "Message": "Hello from SNS!",16 "MessageAttributes": {...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks such as Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful