How to use sns_topic_arn method in localstack

Best Python code snippet using localstack_python

func.py

Source:func.py Github

copy

Full Screen

1###This script works with API gateway, SNS, SQS, DynamoDB2import json3import decimal4from boto3.dynamodb.conditions import Attr5import botocore6import json7import os8import logging9import time10import boto311import re12from collections import namedtuple13from boto3.dynamodb.conditions import Key as Key_boto314from botocore.exceptions import ClientError15import sys16from assets_utils.assets_utils import deserialize_to_json17from assets_utils.assets_utils import *18from assets_utils import assets_utils19import assets_utils.simplejson as simplejson20from decimal import Decimal21def lambda_handler(event, context):22 print('## ENVIRONMENT VARIABLES')23 print(os.environ)24 print('## EVENT')25 print(event)26 print('###CONTEXT')27 print(context)28 try:29 api_id = event["requestContext"]["apiId"]30 api = True31 except:32 api = False33 34 # Defining variables35 logger = logging.getLogger()36 AWS_REGION = "eu-central-1"37 sns_client = boto3.client('sns', region_name=AWS_REGION)38 dynamodb = boto3.client('dynamodb')39 sqs = boto3.client('sqs')40 ###ARN of the SNS to send notifications to41 sns_topic_arn = os.environ['sns_topic_arn']42 ### DB table to insert data into43 db_table_name = 'testing-assets-table-tf'44 ### ARN of the SQS to send messages to45 queue_url = os.environ['sqs_url']46 path = event['path']47 message = "Error"48 subject = "API" + event['path'] 49 50 if api: #payload from api gateway, insert in dynamo db51 print(event)52 53 try:54 events=json.loads(event["body"], parse_float=Decimal)55 except:56 pass57 if event['resource'] == '/companies' and event['httpMethod'] == "GET":58 pk = "null"59 sk = "/companies/"60 61 subject = "Lambda queried data from DynamoDb table"62 try:63 dbresponse = dynamodb.query(64 TableName=db_table_name,65 KeyConditionExpression='parent = :parent AND begins_with ( uri , :uri )',66 ExpressionAttributeValues={67 ':parent': {'S': pk},68 ':uri': {'S': sk}69 }70 )71 if not dbresponse['Items']:72 assets_utils.publish_message(sns_topic_arn, assets_utils.answer(404, message, path), subject)73 return assets_utils.answer(404, message, path)74 except BaseException as e:75 print(e) 76 return assets_utils.answer(400, e, path)77 78 #We setup the subject of the message for sns topic79 message=deserialize_to_json(dbresponse['Items'])80 81 #We publish message to sns topic 82 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(200, "message"), subject)83 return assets_utils.answer(200, message)84 elif event['resource'] == '/companies/{id}' and event['httpMethod'] == "GET":85 86 try:87 pk = "null"88 sk = event["path"]89 subject = f"Lambda queried data from DynamoDb {db_table_name} table"90 key_item={91 "parent": {92 'S': pk93 },94 "uri": {95 'S': sk96 }97 }98 dbresponse = dynamodb.get_item(TableName=db_table_name,Key=key_item)99 if not dbresponse['Item']:100 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(404, message, path), subject)101 return assets_utils.answer(404, message)102 except KeyError as e:103 print(e) 104 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(404, message, path), subject)105 return assets_utils.answer(400, message, path)106 message=deserialize_to_json(dbresponse['Item'])107 return assets_utils.answer(200,message)108 elif event['resource'] == '/companies/{id}' and event['httpMethod'] == "PUT":109 try:110 events=json.loads(event["body"], parse_float=Decimal)111 pk = "null"112 sk = event["path"]113 subject = f"Lambda queried data from DynamoDb {db_table_name} table"114 key_item={115 "parent": {116 'S': pk117 },118 "uri": {119 'S': sk120 }121 }122 dbresponse = dynamodb.get_item(TableName=db_table_name,Key=key_item)123 if not dbresponse['Item']:124 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(404, message, path), subject)125 return assets_utils.answer(404, message, path)126 except KeyError as e:127 print(e) 128 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(404, message, path), subject)129 return assets_utils.answer(404, message, path)130 try:131 key_item={132 "parent": "null",133 "uri": "/companies/" + event["pathParameters"]["id"]134 }135 #new_data= ddb_data(event["body"])136 if type(json.loads(event["body"])) is dict:137 dbresponse = {}138 for key in events:139 print(key)140 if type(events['parent']) is str or type(events[key]) is bool:141 if key == "parent" or key == "uri":142 pass143 else:144 value = events[key]145 database_response = assets_utils.dynamo_update_item(db_table_name, AWS_REGION, key_item, key, value)146 print(database_response)147 dbresponse.update(database_response)148 elif type(events[key]) is dict:149 for key2 in events[key]:150 if type(events[key][key2]) is str:151 value = events[key][key2]152 database_response=assets_utils.update_map(db_table_name, AWS_REGION, key_item, key, key2, value)153 dbresponse.update(database_response)154 elif type(events[key][key2]) is dict:155 print(key2)156 for key3 in events[key][key2]:157 value = events[key][key2][key3]158 if isinstance(value, float):159 #replace_num_to_decimal(value)160 print(value)161 database_response=update_maps(db_table_name, AWS_REGION, key_item, key, key2, key3, value)162 dbresponse.update(database_response)163 else:164 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(400, message, path), subject)165 return assets_utils.answer(400, message, path)166 else:167 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(400, message, path), subject)168 return assets_utils.answer(400, message, path)169 else:170 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(400, message, path), subject)171 return assets_utils.answer(400, message, path)172 except KeyError as e:173 print(e) 174 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(400, message, path), subject)175 return assets_utils.answer(400, message, path)176 except botocore.exceptions.ClientError as error:177 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(400, message, path), subject)178 return assets_utils.answer(400, message, path)179 except BaseException as e:180 print(e)181 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(400, message, path), subject)182 return assets_utils.answer(400, message, path)183 184 185 message=" "186 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(200, message), subject)187 return assets_utils.answer(201, message)188 189 190 elif event['resource'] == '/companies/{id}' and event['httpMethod'] == "DELETE":191 subject = event['path']192 try:193 if json.loads(event['body'])['deleted'] == True or json.loads(event['body'])['deleted'] == 'true': 194 choice = "true"195 pass196 elif json.loads(event['body'])['deleted'] == False or json.loads(event['body'])['deleted'] == 'false': 197 choice = "false"198 pass199 else:200 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(400, message, path), subject)201 return assets_utils.answer(400, message, path)202 sk = event["path"]203 response = dynamodb.get_item(204 TableName=db_table_name,205 Key={206 'parent': {'S': 'null'},207 'uri': {'S': sk}208 }209 )210 211 print(response) 212 if not response['Item']:213 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(404, message, path), subject)214 return assets_utils.answer(404, message, path)215 216 except BaseException as e:217 print(e)218 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(400, message, path), subject)219 return assets_utils.answer(400,message, path)220 221 subject = f"Lambda updated data in DynamoDb {db_table_name} table"222 223 dynamodb = boto3.resource('dynamodb', region_name=AWS_REGION)224 table = dynamodb.Table(db_table_name)225 try: 226 dbresponse = table.update_item(227 Key={228 "parent": "null",229 "uri": event["path"]230 },231 UpdateExpression="set #deleted = :d",232 ExpressionAttributeNames = {233 '#deleted': 'deleted'234 },235 ExpressionAttributeValues={236 ':d': choice237 },238 ReturnValues="UPDATED_NEW"239 )240 241 message = " "242 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(204, message), subject)243 return assets_utils.answer(204, message)244 except TypeError as e:245 print(e)246 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(400, message, path), subject)247 return assets_utils.answer(400, message, path)248 except KeyError as e:249 print(e) 250 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(400, message, path), subject)251 return assets_utils.answer(400, message, path)252 except BaseException as e:253 print(e) 254 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(400, message, path), subject)255 return assets_utils.answer(400,message, path)256 257 elif event['resource'] == '/companies' and event['httpMethod'] == "POST":258 #We setup the subject of the message for sns topic259 subject = f"Lambda posted data to DynamoDb {db_table_name} table"260 #This will instert in the database 261 try:262 add_id = {"uri": event['resource'] + "/" + event["requestContext"]["requestId"]}263 body=json.loads(event['body'])264 body.update(add_id)265 key_item = serialize_to_dynamodb(body)266 dbresponse = dynamodb.put_item(TableName=db_table_name, Item=key_item)267 print(dbresponse)268 message = deserialize_to_json(key_item)269 assets_utils.publish_message(sns_topic_arn, assets_utils.answer(200, message), subject)270 return assets_utils.answer(201, " ")271 except botocore.exceptions.ClientError as e:272 print(f"1: {e}")273 assets_utils.publish_message(sns_topic_arn, message, subject)274 return assets_utils.answer(400, message, path) 275 except botocore.exceptions.ParamValidationError as e:276 print(e)277 assets_utils.publish_message(sns_topic_arn, message, subject)278 return assets_utils.answer(400, message, path)279 except BaseException as e:280 print(e)281 assets_utils.publish_message(sns_topic_arn, message, subject)282 return assets_utils.answer(400, message, path)283 284 elif event['resource'] == '/locations' and event['httpMethod'] == "POST":285 try:286 pk=json.loads(event["body"])["parent"]287 for key in json.loads(event["body"]):288 if type(events[key]) is str:289 if key == "parent" or key == "uri":290 response = dynamodb.get_item(291 TableName=db_table_name,292 Key={293 'parent': {'S': 'null'},294 'uri': {'S': json.loads(event["body"])["parent"]}295 }296 )297 try:298 if not response['Item']:299 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(404, message, path), subject)300 return assets_utils.answer(404, message, path)301 else:302 break303 except KeyError as e:304 print(e) 305 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(404, message, path), subject)306 return assets_utils.answer(404, message, path)307 308 elif key != "parent" or key != "uri": 309 break310 else:311 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(404, message, path), subject)312 return assets_utils.answer(404, message, path)313 except BaseException as e:314 print(e)315 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(400, message, path), subject)316 return assets_utils.answer(400, message, path)317 318 #We setup the subject of the message for sns topic319 subject = f"Lambda posted data to DynamoDb {db_table_name} table"320 try:321 #This will instert in the database 322 add_id = {"uri": event['resource'] + "/" + event["requestContext"]["requestId"]}323 body=json.loads(event['body'])324 body.update(add_id)325 key_item = serialize_to_dynamodb(body)326 dbresponse = dynamodb.put_item(TableName=db_table_name, Item=key_item)327 print(dbresponse)328 message = deserialize_to_json(key_item)329 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(200, message, path), subject)330 return assets_utils.answer(201, " ")331 except BaseException as e:332 print(e) 333 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(400, message, path), subject)334 return assets_utils.answer(400, message, path)335 336 337 if event['resource'] == '/locations' and event['httpMethod'] == "GET":338 #dynamodb_resource = boto3.resource('dynamodb', region_name=AWS_REGION)339 #table = dynamodb_resource.Table(db_table_name)340 #We setup the subject of the message for sns topic341 try:342 subject = "Lambda queried data from DynamoDb table"343 pattern = event['resource']344 pk = "companies"345 sk = "/locations/"346 dbresponse = assets_utils.get_companies(db_table_name, pattern, pk, sk)347 if not dbresponse:348 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(400, message, path), subject)349 return assets_utils.answer(404, message, path)350 except BaseException:351 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(400, message, path), subject)352 return assets_utils.answer(400, message, path)353 message=deserialize_to_json(dbresponse)354 #We publish message to sns topic 355 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(200, message), subject)356 return assets_utils.answer(200, message)357 358 elif event['resource'] == '/locations/{id}' and event['httpMethod'] == "DELETE":359 try:360 if json.loads(event['body'])['deleted'] == True or json.loads(event['body'])['deleted'] == 'true': 361 choice = "true"362 pass363 elif json.loads(event['body'])['deleted'] == False or json.loads(event['body'])['deleted'] == 'false': 364 choice = "false"365 pass366 else:367 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(400, message, path), subject)368 return assets_utils.answer(400, message, path)369 for key in json.loads(event["body"]):370 if type(events["parent"]) is str:371 if key == "parent" or key == "uri":372 response = dynamodb.get_item(373 TableName=db_table_name,374 Key={375 'parent': {'S': json.loads(event["body"])["parent"]},376 'uri': {'S': event["path"]}377 }378 )379 try:380 if not response['Item']:381 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(404, message, path), subject)382 return assets_utils.answer(404, message, path)383 except BaseException:384 print(e)385 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(404, message, path), subject)386 return assets_utils.answer(404, message, path)387 388 elif key != "parent" or key != "uri": 389 break390 else:391 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(400, message, path), subject)392 return assets_utils.answer(400, message, path)393 except KeyError as e:394 print(e)395 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(400, message, path), subject)396 return assets_utils.answer(400, message, path)397 except BaseException as e:398 print(e) 399 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(400, message, path), subject)400 return assets_utils.answer(400, message, path)401 subject = f"Lambda updated data in DynamoDb {db_table_name} table"402 dynamodb = boto3.resource('dynamodb', region_name=AWS_REGION)403 table = dynamodb.Table(db_table_name)404 dbresponse = table.update_item(405 Key={406 "parent": "/companies/" + json.loads(event["body"])["parent"].split('/')[2] ,407 "uri": event["path"]408 },409 UpdateExpression="set #deleted = :d",410 ExpressionAttributeNames = {411 '#deleted': 'deleted'412 },413 ExpressionAttributeValues={414 ':d': json.loads(event["body"])["deleted"]415 },416 ReturnValues="UPDATED_NEW"417 )418 419 message = " "420 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(204, message), subject)421 return assets_utils.answer(204, message)422 423 elif event['resource'] == '/locations/{id}' and event['httpMethod'] == "PUT":424 try:425 events=json.loads(event["body"], parse_float=Decimal)426 print(events)427 for key in events:428 if type(events['parent']) is str:429 response = dynamodb.get_item(430 TableName=db_table_name,431 Key={432 'parent': {'S': json.loads(event["body"])["parent"]},433 'uri': {'S': event["path"]}434 }435 )436 print(response)437 try:438 if not response['Item']:439 print("Missing Item")440 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(404, message, path), subject)441 return assets_utils.answer(404, message, path)442 except KeyError as e:443 print(e)444 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(404, message, path), subject)445 return assets_utils.answer(404, message, path)446 except KeyError as e:447 print(e)448 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(404, message, path), subject)449 return assets_utils.answer(404, message, path)450 except BaseException as e:451 print(e)452 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(400, message, path), subject)453 return assets_utils.answer(404, message, path)454 455 subject = f"Lambda updated data in DynamoDb {db_table_name} table"456 key_item={457 "parent": '/companies/' + json.loads(event["body"])["parent"].split("/")[2],458 "uri": "/locations/" + event["path"].split("/")[2]459 }460 if type(events) is dict:461 dbresponse = {}462 try:463 for key in events:464 if type(events[key]) is str or type(events[key]) is bool:465 if key == "parent" or key == "uri":466 pass467 else:468 value = events[key]469 database_response = assets_utils.dynamo_update_item(db_table_name, AWS_REGION, key_item, key, value)470 dbresponse.update(database_response)471 elif type(events[key]) is dict:472 for key2 in events[key]:473 if type(events[key][key2]) is str:474 value = events[key][key2]475 database_response=assets_utils.update_map(db_table_name, AWS_REGION, key_item, key, key2, value)476 dbresponse.update(database_response)477 elif type(events[key][key2]) is dict:478 for key3 in events[key][key2]:479 value = events[key][key2][key3]480 database_response=assets_utils.update_maps(db_table_name, AWS_REGION, key_item, key, key2, key3, value)481 dbresponse.update(database_response)482 else:483 print("wrong structure1")484 print(type(events[key][key2]))485 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(404, message, path), subject)486 return assets_utils.answer(400, message, path)487 else:488 print("wrong structure2")489 print(type(events[key]))490 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(404, message, path), subject)491 return assets_utils.answer(400, message, path)492 except KeyError as e:493 print(e)494 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(400, message, path), subject)495 return assets_utils.answer(400, message, path)496 message = " "497 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(201, message, path), subject)498 return assets_utils.answer(201, message)499 500 elif event['resource'] == '/locations/{id}' and event['httpMethod'] == "GET":501 try:502 subject = f"Lambda queried data from DynamoDb {db_table_name} table"503 pk = "null"504 sk = "/locations/" + event["pathParameters"]["id"]505 dbresponse = assets_utils.getitem(db_table_name, pk, sk)506 if not dbresponse:507 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(404, message, path), subject)508 return assets_utils.answer(404, message, path)509 except KeyError as e:510 print(e) 511 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(400, message, path), subject)512 return assets_utils.answer(400, message, path)513 except BaseException as e:514 print(e) 515 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(400, message, path), subject)516 return assets_utils.answer(400, message, path)517 message = deserialize_to_json(dbresponse)518 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(200, message), subject)519 return assets_utils.answer(200, message)520 521 elif event['resource'] == '/halls' and event['httpMethod'] == "POST":522 try:523 for key in json.loads(event["body"]):524 if type(events['parent']) is str:525 pattern = event['resource']526 pk = "null"527 sk = json.loads(event["body"])["parent"]528 data = assets_utils.getitem(db_table_name, pk, sk)529 print(data)530 try:531 if not data:532 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(404, message, path), subject)533 return assets_utils.answer(404, message, path)534 except BaseException as e:535 print(e)536 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(404, message, path), subject)537 return assets_utils.answer(404, message, path)538 539 else:540 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(400, message, path), subject)541 return assets_utils.answer(400, message, path)542 except KeyError as e:543 print(e) 544 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(400, message, path), subject)545 return assets_utils.answer(400, message, path)546 try: 547 #We setup the subject of the message for sns topic548 subject = f"Lambda posted data to DynamoDb {db_table_name} table"549 #This will instert in the database 550 add_id = {"uri": event['resource'] + "/" + event["requestContext"]["requestId"]}551 body=json.loads(event['body'])552 body.update(add_id)553 key_item = serialize_to_dynamodb(body)554 dbresponse = dynamodb.put_item(TableName=db_table_name, Item=key_item)555 print(dbresponse)556 message = deserialize_to_json(key_item)557 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(200, message, path), subject)558 return assets_utils.answer(201, " ")559 except BaseException as e:560 print(e) 561 return assets_utils.answer(400, message, path)562 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(400, message, path), subject)563 if event['resource'] == '/halls' and event['httpMethod'] == "GET":564 #dynamodb_resource = boto3.resource('dynamodb', region_name=AWS_REGION)565 #table = dynamodb_resource.Table(db_table_name)566 #We setup the subject of the message for sns topic567 try:568 subject = "Lambda queried data from DynamoDb table"569 pattern = event['resource']570 pk = "null"571 sk = "/halls/" 572 dbresponse = assets_utils.get_companies(db_table_name, pattern, pk, sk)573 if not dbresponse:574 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(404, message, path), subject)575 return assets_utils.answer(404, message, path)576 except BaseException as e:577 print(e)578 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(400, message, path), subject)579 return assets_utils.answer(400, message, path)580 581 message = deserialize_to_json(dbresponse)582 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(200, message), subject)583 return assets_utils.answer(200, message)584 585 elif event['resource'] == '/halls/{id}' and event['httpMethod'] == "DELETE":586 try:587 if json.loads(event['body'])['deleted'] == True or json.loads(event['body'])['deleted'] == 'true': 588 choice = "true"589 pass590 elif json.loads(event['body'])['deleted'] == False or json.loads(event['body'])['deleted'] == 'false': 591 choice = "false"592 pass593 else:594 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(400, message, path), subject)595 return assets_utils.answer(400, message, path)596 for key in json.loads(event["body"]):597 if type(events['parent']) is str:598 response = dynamodb.get_item(599 TableName=db_table_name,600 Key={601 'parent': {'S': json.loads(event["body"])["parent"]},602 'uri': {'S': event["path"]}603 }604 )605 try:606 if not response['Item']:607 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(404, message, path), subject)608 return assets_utils.answer(404, message, path)609 except BaseException:610 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(404, message, path), subject)611 return assets_utils.answer(404, message, path)612 613 else:614 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(400, message, path), subject)615 return assets_utils.answer(400, message, path)616 except BaseException as e:617 print(e) 618 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(400, message, path), subject)619 return assets_utils.answer(400, message, path)620 subject = f"Lambda updated data in DynamoDb {db_table_name} table"621 dynamodb = boto3.resource('dynamodb', region_name=AWS_REGION)622 table = dynamodb.Table(db_table_name)623 dbresponse = table.update_item(624 Key={625 "parent": "/locations/" + json.loads(event["body"])["parent"].split('/')[2] ,626 "uri": event["path"]627 },628 UpdateExpression="set #deleted = :d",629 ExpressionAttributeNames = {630 '#deleted': 'deleted'631 },632 ExpressionAttributeValues={633 ':d': json.loads(event["body"])["deleted"]634 },635 ReturnValues="UPDATED_NEW"636 )637 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(204, message), subject)638 return assets_utils.answer(204, message)639 640 elif event['resource'] == '/halls/{id}' and event['httpMethod'] == "GET":641 try:642 subject = f"Lambda queried data from DynamoDb {db_table_name} table"643 pk = "null"644 sk = "/halls/" + event["pathParameters"]["id"]645 dbresponse = assets_utils.getitem(db_table_name, pk, sk)646 if not dbresponse:647 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(404, message, path), subject)648 return assets_utils.answer(404, message, path)649 except BaseException as e:650 print(e)651 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(404, message, path), subject)652 return assets_utils.answer(404, message, path)653 message = deserialize_to_json(dbresponse)654 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(200, message), subject)655 return assets_utils.answer(200, message)656 elif event['resource'] == '/halls/{id}' and event['httpMethod'] == "PUT":657 try: 658 events=json.loads(event["body"], parse_float=Decimal)659 print(events)660 pk = events["parent"]661 sk = event["path"]662 for key in events:663 if type(events['parent']) is str:664 response = dynamodb.get_item(665 TableName=db_table_name,666 Key={667 'parent': {'S': pk},668 'uri': {'S': sk}669 }670 )671 print(response)672 try:673 if not response['Item']:674 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(404, message, path), subject)675 return assets_utils.answer(404, message, path)676 else: 677 break678 except BaseException:679 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(400, message, path), subject)680 return assets_utils.answer(400, message, path)681 else:682 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(400, message, path), subject)683 return assets_utils.answer(400, message, path)684 except BaseException as e:685 print(e) 686 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(400, message, path), subject)687 assets_utils.answer(400, message, path)688 try: 689 subject = f"Lambda got items from DynamoDb {db_table_name} table"690 key_item={691 "parent": '/locations/' + json.loads(event["body"])["parent"].split("/")[2],692 "uri": "/halls/" + event["path"].split("/")[2]693 }694 if type(events) is dict:695 dbresponse = {}696 print(events)697 for key in events:698 if type(events[key]) is str or type(events[key]) is bool:699 if key == "parent" or key == "uri":700 pass701 else:702 value = events[key]703 database_response = assets_utils.dynamo_update_item(db_table_name, AWS_REGION, key_item, key, value)704 dbresponse.update(database_response)705 elif type(events[key]) is dict:706 for key2 in events[key]:707 if type(events[key][key2]) is str or type(events[key][key2]) is bool or type(events[key][key2]) is Decimal:708 print(events[key][key2])709 print(key2)710 value = events[key][key2]711 database_response=assets_utils.update_map(db_table_name, AWS_REGION, key_item, key, key2, value)712 dbresponse.update(database_response)713 elif type(events[key][key2]) is dict:714 for key3 in events[key][key2]:715 value = events[key][key2][key3]716 database_response=update_maps(db_table_name, AWS_REGION, key_item, key, key2, key3, value)717 dbresponse.update(database_response)718 else:719 print()720 print("#####Key2")721 print(type(events[key][key2]))722 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(400, message, path), subject)723 return assets_utils.answer(400, message)724 else:725 print("#####Key1")726 print(type(events[key])) 727 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(400, message, path), subject)728 return assets_utils.answer(400, message)729 else:730 print("#####events")731 print(type(events)) 732 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(400, message, path), subject)733 return assets_utils.answer(400, message)734 message = " "735 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(201, message), subject)736 return assets_utils.answer(201, message)737 except BaseException as e:738 print(e) 739 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(400, message, path), subject)740 return assets_utils.answer(400, message, path)741 elif event['resource'] == '/lines' and event['httpMethod'] == "POST":742 try: 743 for key in json.loads(event["body"]):744 if type(events['parent']) is str:745 pattern = event['resource']746 pk = "halls"747 sk = json.loads(event["body"])["parent"]748 data = assets_utils.getitem(db_table_name, pk, sk)749 print(type(data))750 try:751 if not data:752 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(404, message, path), subject)753 return assets_utils.answer(404, message, path)754 except:755 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(404, message, path), subject)756 return assets_utils.answer(404, message, path)757 else:758 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(400, message, path), subject)759 return assets_utils.answer(400, message, path)760 except BaseException as e:761 print(e)762 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(400, message, path), subject)763 return assets_utils.answer(400, message, path)764 765 try:766 #We setup the subject of the message for sns topic767 subject = f"Lambda posted data to DynamoDb {db_table_name} table"768 #This will instert in the database 769 add_id = {"uri": event['resource'] + "/" + event["requestContext"]["requestId"]}770 body=json.loads(event['body'])771 body.update(add_id)772 key_item = serialize_to_dynamodb(body)773 dbresponse = dynamodb.put_item(TableName=db_table_name, Item=key_item)774 print(dbresponse)775 message = deserialize_to_json(key_item)776 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(200, message, path), subject)777 return assets_utils.answer(201, " ")778 except BaseException as e:779 print(e) 780 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(400, message, path), subject)781 return assets_utils.answer(400, message, path)782 if event['resource'] == '/lines' and event['httpMethod'] == "GET":783 #We setup the subject of the message for sns topic784 try:785 subject = "Lambda queried data from DynamoDb table"786 pattern = event['resource']787 pk = "null"788 sk = "/lines/"789 dbresponse = assets_utils.get_companies(db_table_name, pattern, pk, sk)790 if not dbresponse:791 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(404, message, path), subject)792 return assets_utils.answer(404, message, path)793 except BaseException as e:794 print(e)795 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(404, message, path), subject)796 assets_utils.answer(404, message, path)797 message = deserialize_to_json(dbresponse)798 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(200, message), subject)799 return assets_utils.answer(200, message)800 elif event['resource'] == '/lines/{id}' and event['httpMethod'] == "DELETE":801 try:802 if json.loads(event['body'])['deleted'] == True or json.loads(event['body'])['deleted'] == 'true': 803 choice = "true"804 pass805 elif json.loads(event['body'])['deleted'] == False or json.loads(event['body'])['deleted'] == 'false': 806 choice = "false"807 pass808 else:809 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(400, message, path), subject)810 return assets_utils.answer(400, message, path)811 for key in json.loads(event["body"]):812 if type(events['parent']) is str:813 response = dynamodb.get_item(814 TableName=db_table_name,815 Key={816 'parent': {'S': json.loads(event["body"])["parent"]},817 'uri': {'S': event["path"]}818 }819 )820 try:821 if not response['Item']:822 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(404, message, path), subject)823 return assets_utils.answer(404, message, path)824 except BaseException:825 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(404, message, path), subject)826 return assets_utils.answer(404, message, path)827 828 else:829 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(400, message, path), subject)830 return assets_utils.answer(400, message, path)831 except BaseException as e:832 print(e) 833 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(400, message, path), subject)834 return assets_utils.answer(400, message, path)835 subject = f"Lambda updated data in DynamoDb {db_table_name} table"836 dynamodb = boto3.resource('dynamodb', region_name=AWS_REGION)837 table = dynamodb.Table(db_table_name)838 dbresponse = table.update_item(839 Key={840 "parent": "/halls/" + json.loads(event["body"])["parent"].split('/')[2] ,841 "uri": event["path"]842 },843 UpdateExpression="set #deleted = :d",844 ExpressionAttributeNames = {845 '#deleted': 'deleted'846 },847 ExpressionAttributeValues={848 ':d': json.loads(event["body"])["deleted"]849 },850 ReturnValues="UPDATED_NEW"851 )852 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(204, message), subject)853 return assets_utils.answer(204, message)854 855 elif event['resource'] == '/lines/{id}' and event['httpMethod'] == "GET":856 try:857 subject = f"Lambda queried data from DynamoDb {db_table_name} table"858 pk = "null"859 sk = "/lines/" + event["pathParameters"]["id"]860 dbresponse = assets_utils.getitem(db_table_name, pk, sk)861 if not dbresponse:862 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(404, message, path), subject)863 return assets_utils.answer(404, message, path)864 except BaseException as e:865 print(e)866 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(404, message, path), subject)867 return assets_utils.answer(404, message, path)868 message = deserialize_to_json(dbresponse)869 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(200, message), subject)870 return assets_utils.answer(200, message)871 elif event['resource'] == '/lines/{id}' and event['httpMethod'] == "PUT":872 try:873 events=json.loads(event["body"], parse_float=Decimal)874 pk = events["parent"]875 sk = event["path"]876 for key in json.loads(event["body"]):877 if type(events['parent']) is str:878 response = dynamodb.get_item(879 TableName=db_table_name,880 Key={881 'parent': {'S': pk},882 'uri': {'S': sk}883 }884 )885 print(response)886 try:887 if not response['Item']:888 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(500, message, path), subject)889 return assets_utils.answer(400, message, path)890 except KeyError as e:891 print(e)892 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(404, message, path), subject)893 return assets_utils.answer(404, message, path)894 else:895 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(400, message, path), subject)896 return assets_utils.answer(400, message, path)897 except KeyError as e:898 print(e)899 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(404, message, path), subject)900 return assets_utils.answer(404, message, path)901 except BaseException as e:902 print(e)903 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(400, message, path), subject)904 return assets_utils.answer(400, message, path)905 try:906 subject = f"Lambda got items from DynamoDb {db_table_name} table"907 key_item={908 "parent": '/halls/' + json.loads(event["body"])["parent"].split("/")[2],909 "uri": "/lines/" + event["path"].split("/")[2]910 }911 912 if type(json.loads(event["body"])) is dict:913 dbresponse = {}914 for key in json.loads(event["body"]):915 if type(events[key]) is str or type(events[key]) is bool:916 if key == "parent" or key == "uri":917 pass918 else:919 value = events[key]920 database_response = assets_utils.dynamo_update_item(db_table_name, AWS_REGION, key_item, key, value)921 dbresponse.update(database_response)922 elif type(events[key]) is dict:923 for key2 in events[key]:924 if type(events[key][key2]) is str:925 value = events[key][key2]926 database_response=assets_utils.update_map(db_table_name, AWS_REGION, key_item, key, key2, value)927 dbresponse.update(database_response)928 elif type(events[key][key2]) is dict:929 for key3 in events[key][key2]:930 value = events[key][key2][key3]931 database_response=update_maps(db_table_name, AWS_REGION, key_item, key, key2, key3, value)932 dbresponse.update(database_response)933 else:934 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(400, message, path), subject)935 return assets_utils.answer(400, message, path)936 else:937 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(400, message, path), subject)938 return assets_utils.answer(400, message, path)939 else:940 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(400, message, path), subject)941 return assets_utils.answer(400, message, path)942 message = " "943 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(200, message), subject)944 return assets_utils.answer(201, message)945 except BaseException as e:946 print(e)947 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(400, message, path), subject)948 return assets_utils.answer(400, message, path)949 elif event['resource'] == '/machines' and event['httpMethod'] == "POST":950 try: 951 for key in json.loads(event["body"]):952 if type(events['parent']) is str:953 pattern = event['resource']954 pk = "halls"955 sk = json.loads(event["body"])["parent"]956 data = assets_utils.getitem(db_table_name, pk, sk)957 print(type(data))958 try:959 if not data:960 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(404, message, path), subject)961 return assets_utils.answer(404, message, path)962 except:963 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(404, message, path), subject)964 return assets_utils.answer(404, message, path)965 else:966 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(400, message, path), subject)967 return assets_utils.answer(400, message, path)968 except BaseException as e:969 print(e)970 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(400, message, path), subject)971 return assets_utils.answer(400, message, path)972 try:973 #We setup the subject of the message for sns topic974 subject = f"Lambda posted data to DynamoDb {db_table_name} table"975 #This will instert in the database 976 add_id = {"uri": event['resource'] + "/" + event["requestContext"]["requestId"]}977 body=json.loads(event['body'])978 body.update(add_id)979 key_item = serialize_to_dynamodb(body)980 dbresponse = dynamodb.put_item(TableName=db_table_name, Item=key_item)981 print(dbresponse)982 message = deserialize_to_json(key_item)983 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(200, message, path), subject)984 return assets_utils.answer(201, " ")985 except BaseException as e:986 print(e) 987 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(400, message, path), subject)988 return assets_utils.answer(400, message, path)989 if event['resource'] == '/machines' and event['httpMethod'] == "GET":990 try:991 #We setup the subject of the message for sns topic992 subject = "Lambda queried data from DynamoDb table"993 pattern = event['resource']994 pk = "null"995 sk = "/machines/"996 dbresponse = assets_utils.get_companies(db_table_name, pattern, pk, sk)997 if not dbresponse:998 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(404, message, path), subject)999 return assets_utils.answer(404, message, path)1000 except BaseException as e:1001 print(e)1002 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(404, message, path), subject)1003 assets_utils.answer(404, message, path)1004 message = deserialize_to_json(dbresponse)1005 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(200, message), subject)1006 return assets_utils.answer(200, message)1007 1008 elif event['resource'] == '/machines/{id}' and event['httpMethod'] == "DELETE":1009 try:1010 if json.loads(event['body'])['deleted'] == True or json.loads(event['body'])['deleted'] == 'true': 1011 choice = "true"1012 pass1013 elif json.loads(event['body'])['deleted'] == False or json.loads(event['body'])['deleted'] == 'false': 1014 choice = "false"1015 pass1016 else:1017 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(400, message, path), subject)1018 return assets_utils.answer(400, message, path)1019 for key in json.loads(event["body"]):1020 if type(events['parent']) is str:1021 response = dynamodb.get_item(1022 TableName=db_table_name,1023 Key={1024 'parent': {'S': json.loads(event["body"])["parent"]},1025 'uri': {'S': event["path"]}1026 }1027 )1028 try:1029 if not response['Item']:1030 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(404, message, path), subject)1031 return assets_utils.answer(404, message, path)1032 except BaseException:1033 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(404, message, path), subject)1034 return assets_utils.answer(404, message, path)1035 1036 else:1037 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(400, message, path), subject)1038 return assets_utils.answer(400, message, path)1039 except BaseException as e:1040 print(e) 1041 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(400, message, path), subject)1042 return assets_utils.answer(400, message, path)1043 subject = f"Lambda updated data in DynamoDb {db_table_name} table"1044 dynamodb = boto3.resource('dynamodb', region_name=AWS_REGION)1045 table = dynamodb.Table(db_table_name)1046 dbresponse = table.update_item(1047 Key={1048 "parent": "/lines/" + json.loads(event["body"])["parent"].split('/')[2] ,1049 "uri": event["path"]1050 },1051 UpdateExpression="set #deleted = :d",1052 ExpressionAttributeNames = {1053 '#deleted': 'deleted'1054 },1055 ExpressionAttributeValues={1056 ':d': json.loads(event["body"])["deleted"]1057 },1058 ReturnValues="UPDATED_NEW"1059 )1060 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(204, message), subject)1061 return assets_utils.answer(204, message)1062 1063 elif event['resource'] == '/machines/{id}' and event['httpMethod'] == "GET":1064 try:1065 subject = f"Lambda queried data from DynamoDb {db_table_name} table"1066 pk = "null"1067 sk = "/machines/" + event["pathParameters"]["id"]1068 dbresponse = assets_utils.getitem(db_table_name, pk, sk)1069 if not dbresponse:1070 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(404, message, path), subject)1071 return assets_utils.answer(404, message, path)1072 except BaseException as e:1073 print(e)1074 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(404, message, path), subject)1075 return assets_utils.answer(404, message, path)1076 message = deserialize_to_json(dbresponse)1077 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(200, message), subject)1078 return assets_utils.answer(200, message)1079 elif event['resource'] == '/machines/{id}' and event['httpMethod'] == "PUT":1080 try:1081 events=json.loads(event["body"], parse_float=Decimal)1082 pk = events["parent"]1083 sk = event["path"]1084 for key in json.loads(event["body"]):1085 if type(events[key]) is str:1086 if key == "parent" or key == "uri":1087 response = dynamodb.get_item(1088 TableName=db_table_name,1089 Key={1090 'parent': {'S': pk},1091 'uri': {'S': sk}1092 }1093 )1094 try:1095 if not response['Item']:1096 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(404, message, path), subject)1097 return assets_utils.answer(404, message, path)1098 else: 1099 break1100 except BaseException as e:1101 print(e)1102 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(400, message, path), subject)1103 return assets_utils.answer(400, message, path)1104 else:1105 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(400, message, path), subject)1106 return assets_utils.answer(400, message, path)1107 except BaseException as e:1108 print(e) 1109 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(400, message, path), subject)1110 assets_utils.answer(400, message, path)1111 try:1112 subject = f"Lambda got items from DynamoDb {db_table_name} table"1113 key_item={1114 "parent": '/lines/' + json.loads(event["body"])["parent"].split("/")[2],1115 "uri": "/machines/" + event["path"].split("/")[2]1116 }1117 1118 if type(json.loads(event["body"])) is dict:1119 dbresponse = {}1120 for key in json.loads(event["body"]):1121 if type(events[key]) is str or type(events[key]) is bool:1122 if key == "parent" or key == "uri":1123 pass1124 else:1125 print(key)1126 value = events[key]1127 database_response = assets_utils.dynamo_update_item(db_table_name, AWS_REGION, key_item, key, value)1128 dbresponse.update(database_response)1129 elif type(events[key]) is dict:1130 for key2 in events[key]:1131 if type(events[key][key2]) is str:1132 print(key2)1133 value = events[key][key2]1134 database_response=assets_utils.update_map(db_table_name, AWS_REGION, key_item, key, key2, value)1135 dbresponse.update(database_response)1136 elif type(events[key][key2]) is dict:1137 for key3 in events[key][key2]:1138 if events[key][key2][key3] is str:1139 value = events[key][key2][key3]1140 database_response=update_maps(db_table_name, AWS_REGION, key_item, key, key2, key3, value)1141 dbresponse.update(database_response)1142 else:1143 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(400, message, path), subject)1144 assets_utils.answer(400, message, path)1145 elif type(events[key][key2]) is list:1146 for x in range(len(events[key][key2])):1147 for key3 in events[key][key2][x]:1148 print(f"set {key}.{key2}[{x}].#{key3} = :d")1149 value = events[key][key2][x][key3]1150 database_response=assets_utils.update_list_maps(db_table_name, AWS_REGION, key_item, key, key2, key3, x, value)1151 dbresponse.update(database_response)1152 else:1153 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(400, message, path), subject)1154 return assets_utils.answer(400, message, path)1155 else:1156 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(400, message, path), subject)1157 return assets_utils.answer(400, message, path)1158 else:1159 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(400, message, path), subject)1160 return assets_utils.answer(400, message, path)1161 message = " "1162 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(200, message), subject)1163 return assets_utils.answer(201, message)1164 except BaseException as e:1165 print(e) 1166 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(400, message, path), subject)1167 return assets_utils.answer(400, message, path)1168 else:1169 #This lambda suports just 2 paths from lambda1170 response = "Please call this lambda from known paths in order to access it"1171 #Send Mesage to SNS1172 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(400, response), subject)1173 return assets_utils.answer(400, response)1174 else:1175 #If we do not call this lambda from RestAPI1176 response = {1177 '@@ERROR@': {1178 '########Error:':'Please call this lambda from an RestAPI'1179 }1180 }1181 subject = 'Error, Wrong Call, check lambda'1182 #Send Mesage to SNS1183 assets_utils.publish_message(sns_topic_arn,assets_utils.answer(400, response), subject)...

Full Screen

Full Screen

setup-s3-notify-through-sns-publish-to-sqs.py

Source:setup-s3-notify-through-sns-publish-to-sqs.py Github

copy

Full Screen

1#!/usr/bin/python2# -*- coding: utf-8 -*-3# S3 Bucket Notification to SQS/SNS on Object Creation4import boto3, json5REGION_NAME = "ap-south-1"6s3BucketName="trawler-urls-bucket"7ami_id=""8EC2_URL = "https://" + REGION_NAME + ".ec2.amazonaws.com"9AWS_AUTO_SCALING_URL = "https://autoscaling." + REGION_NAME +".amazonaws.com"10# email_address = user@example.com11sns_topic_name = "s3-object-created-" + s3BucketName.replace (" ", "_")12sqs_queue_name = "trawlerDataProcessor"13## Create the `S3` bucket14s3_client = boto3.client('s3',REGION_NAME)15s3_client.create_bucket(Bucket= s3BucketName , CreateBucketConfiguration = { 'LocationConstraint': REGION_NAME })16## Lets create the SNS Topic17sns_client = boto3.client('sns', REGION_NAME)18sns_topic_arn = sns_client.create_topic( Name = sns_topic_name)['TopicArn']19### Allow S3 to publish to the SNS topic for activity in the specific S3 bucket.20#### Policy to allow S3 to publish to SNS Topic21s3PubToSnsPolicy = { "Version": "2008-10-17",22 "Id": "Policy-S3-publish-to-sns",23 "Statement": [{24 "Effect": "Allow",25 "Principal": { "AWS" : "*" },26 "Action": [ "sns:Publish" ],27 "Resource": sns_topic_arn,28 "Condition": {29 "ArnLike": {30 "aws:SourceArn": "arn:aws:s3:*:*:"+ s3BucketName + ""31 }32 }33 }]34}35sns_client.set_topic_attributes( TopicArn = sns_topic_arn , 36 AttributeName = "Policy" , 37 AttributeValue = json.dumps(s3PubToSnsPolicy)38 )39### Set a nice little diplay name for the topic40sns_client.set_topic_attributes( TopicArn = sns_topic_arn , 41 AttributeName = "DisplayName" , 42 AttributeValue = 'Urls2Crawl' 43 )44#### Add a notification to the S3 bucket so that it sends messages to the SNS topic when objects are created (or updated)45bucket_notifications_configuration = { 46 "TopicConfiguration" : { 47 "Events" : [ "s3:ObjectCreated:*" ], 48 "Topic" : sns_topic_arn 49 }50 }51s3_client.put_bucket_notification( Bucket= s3BucketName,52 NotificationConfiguration = bucket_notifications_configuration 53 )54### Subscribe to the SNS Topic for EMail Notification55sns_client.subscribe( TopicArn = sns_topic_arn , Protocol = "email", Endpoint="SOMEUSER@gmail.com" )56aws sns subscribe \57 --topic-arn "$sns_topic_arn" \58 --protocol email \59 --notification-endpoint "$email_address"60##### The above example connects an SNS topic to the S3 bucket notification configuration. Amazon also supports having the bucket notifications go directly to an SQS queue, but I do not recommend it.61##### Instead, send the S3 bucket notification to SNS and have SNS forward it to SQS. 62##### This way, you can easily add other listeners to the SNS topic as desired. You can even have multiple SQS queues subscribed, which is not possible when using a direct notification configuration.63### Create the SQS queue 64sqs_client = boto3.client('sqs', REGION_NAME)65#### Set permissions to allow SNS topic to post to the SQS queue66sqs_policy = {67 "Version":"2012-10-17",68 "Statement":[69 {70 "Effect" : "Allow",71 "Principal" : { "AWS": "*" },72 "Action" : "sqs:SendMessage",73 "Resource" : sqs_queue_arn,74 "Condition" : {75 "ArnEquals" : {76 "aws:SourceArn" : sns_topic_arn77 }78 }79 }80 ]81}82sqs_attributes = { 'DelaySeconds':'3',83 'ReceiveMessageWaitTimeSeconds' : '20',84 'VisibilityTimeout' : '300',85 'MessageRetentionPeriod':'86400',86 'Policy' : json.dumps(sqs_policy)87 }88resp = sqs_client.create_queue( QueueName = sqs_queue_name, Attributes = sqs_attributes )89sqs_queue_url = resp['QueueUrl']90#### Get SQS Queue Attributes91resp = sqs_client.get_queue_attributes( QueueUrl = sqs_queue_url, AttributeNames=[ 'QueueArn' ] )92sqs_queue_arn = resp['Attributes']['QueueArn']93## Subscribe the SQS queue to the SNS topic.94sns_client.subscribe(95 TopicArn = sns_topic_arn,96 Protocol = "sqs",97 Endpoint = sqs_queue_arn98)99### Test SNS publishing to SQS100##### Upload test file to the S3 bucket, which will now generate both the email and a message to the SQS queue.101# aws s3 cp [SOMEFILE] s3://$s3_bucket_name/testfile-02102s3Up = boto3.resource('s3')103s3Up.meta.client.upload_file('SOME_FILE.TXT', s3BucketName, 'hello.txt')104 """105 Function to clean up all the resources106 """107 def cleanAll(resourcesDict=None):108 # Delete S3 Bucket 109 ### All of the keys in a bucket must be deleted before the bucket itself can be deleted:110 111 bucket = s3.Bucket( s3BucketName )112 113 for key in bucket.objects.all():114 key.delete()115 bucket.delete()116 117 # Delete SNS Topic118 sns_client.delete_topic( TopicArn = sns_topic_arn)119 # Delete SQS Topic120 sqs_client.delete_queue( QueueUrl = sqs_queue_url)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful