How to use sns_topic method in localstack

Best Python code snippet using localstack_python

cdk_glue_db_table_infra_stack.py

Source:cdk_glue_db_table_infra_stack.py Github

copy

Full Screen

...38 policy_doc=kms_pol_doc39 )40 41 # SNS infra setup42 sns_topic = CdkGlueDbTableInfraStack.setup_sns_topic(config, env, stack)43 44 # Actuals Lambda Infra Setup45 actuals_lambda = CdkGlueDbTableInfraStack.create_actuals_lambda_functions(46 stack=stack,47 config=config,48 env=env,49 kms_key=kms_key,50 sns_topic=sns_topic51 )52 53 # Actuals Step Function Infra54 CdkGlueDbTableInfraStack.create_actuals_step_function(55 stack=stack,56 config=config,57 env=env,58 sns_topic=sns_topic,59 state_machine_name=f"{config['global']['app-name']}-actuals-stateMachine",60 actuals_lambda=actuals_lambda61 )62 63 64 @staticmethod65 def setup_sns_topic(66 config: dict,67 env: str,68 stack: core.Stack) -> sns.Topic:69 """Setup the SNS topic and returns an SNS Topic Object"""70 sns_topic = SNSConstruct.create_sns_topic(71 stack=stack,72 env=env,73 config=config74 )75 SNSConstruct.subscribe_email(env=env, config=config, topic=sns_topic)76 return sns_topic77 78 @staticmethod79 def create_actuals_lambda_functions(80 stack: core.Stack,81 config: dict,82 env: str,83 kms_key: kms.Key,84 sns_topic: sns.Topic) -> Dict[str, aws_lambda.Function]:...

Full Screen

Full Screen

insights.py

Source:insights.py Github

copy

Full Screen

import time
import os
import json
import logging
from queryHandler import QueryHandler
from datetime import datetime, timedelta

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

# Query states that mean "results are not final yet, poll again".
_PENDING_QUERY_STATES = ('Scheduled', 'Running')
# Pause between polls so the results endpoint is not called in a tight loop.
_POLL_INTERVAL_SECONDS = 1


def add_to_dynamo(query_handler, query_name, user, total, threshold, sns_topic):
    """
    Build the table item for a user and store it, alerting when needed.

    The item is built with 'query_handler.get_message'. If
    'query_handler.threshold_check' is falsy for the total, the item is
    stored as is. Otherwise the item is flagged with 'alert_sent' and
    an SNS notification is sent first, unless the 'user' record already
    carries a truthy 'alert_sent' value, in which case only the item
    is stored.

    Args:
        query_handler: Handler object providing 'get_message',
            'threshold_check', 'put_item', 'send_sns', 'log_group'
            and 'log_stream'.
        query_name: Name of the query, passed to 'get_message'.
        user: Mapping with at least a 'username' key and optionally
            an 'alert_sent' key.
        total: Total for the user (converted with int() for the check).
        threshold: Threshold for the query, passed to 'get_message'.
        sns_topic: SNS topic passed through to 'send_sns'.
    """
    message = query_handler.get_message(
        query_name, user['username'], total, threshold
    )
    if not query_handler.threshold_check(int(total)):
        query_handler.put_item(message)
        return

    try:
        message['alert_sent'] = True
        # A missing 'alert_sent' key means no alert has been sent yet.
        # Using .get() instead of catching KeyError keeps a KeyError raised
        # inside send_sns/put_item from triggering a second notification.
        if not user.get('alert_sent'):
            query_handler.send_sns(
                sns_topic, user['username'],
                total, query_handler.log_group,
                query_handler.log_stream
            )
        query_handler.put_item(message)
    except Exception as err:
        logger.error(f"Error adding item to Dynamo: {err}")


def insights_query(query_handler):
    """
    Run the handler's query over the last hour and collect the results.

    The query is started with 'client_logs.start_query' and polled with
    'client_logs.get_query_results' until its status is no longer
    'Scheduled' or 'Running'.

    Args:
        query_handler: Handler object providing 'client_logs',
            'log_group', 'log_stream' and 'query'.

    Returns:
        'usercount_list', a list with one dict per result row built
        from the first two fields of the row. On any error a dict with
        'statusCode' 500 and an 'error' message is returned instead.
    """
    now = datetime.now()
    start_time = int((now - timedelta(hours=1)).timestamp())
    end_time = int(now.timestamp())
    try:
        client_logs = query_handler.client_logs
        start_query_response = client_logs.start_query(
            logGroupName=query_handler.log_group,
            startTime=start_time,
            endTime=end_time,
            queryString=query_handler.query
        )
        query_id = start_query_response['queryId']

        while True:
            response = client_logs.get_query_results(queryId=query_id)
            if response['status'] not in _PENDING_QUERY_STATES:
                break
            time.sleep(_POLL_INTERVAL_SECONDS)
        logger.info(f"Query completed. Query ID: {query_id}")

        usercount_list = []
        for row in response['results']:
            usercount_list.append({
                row[0]['field']: row[0]['value'],
                row[1]['field']: row[1]['value'],
            })
        logger.info(
            f'Query Total from {query_handler.log_stream}: {usercount_list}'
        )
        return usercount_list
    except Exception as err:
        logger.error(f"Error occurred while running query: {err}")
        return {
            'statusCode': 500,
            'error': f"Error occurred while running query: {err}"
        }


def query_to_stream(query_handler, sns_topic):
    """
    Pass every user returned by 'insights_query' to 'add_to_dynamo'.

    For a user with no existing table entry the total from the query is
    used; otherwise the query total is added to the stored total and
    the stored record is passed on.

    Args:
        query_handler: Handler object providing 'query_table',
            'query_name' and 'threshold'.
        sns_topic: SNS topic passed through to 'add_to_dynamo'.

    Returns:
        None on success, or a dict with 'statusCode' 500 and an
        'error' message when the query or the processing failed.
    """
    usercount_list = insights_query(query_handler)
    # insights_query reports failure by returning an error dict; hand it
    # back unchanged instead of iterating over its keys.
    if not isinstance(usercount_list, list):
        return usercount_list

    try:
        for user in usercount_list:
            response = query_handler.query_table(user['username'])
            if not response:
                add_to_dynamo(
                    query_handler, query_handler.query_name, user,
                    int(user['total']), query_handler.threshold,
                    sns_topic
                )
            else:
                dynamo_user = response[0]
                new_total = int(user['total']) + int(dynamo_user['total'])
                add_to_dynamo(
                    query_handler, query_handler.query_name, dynamo_user,
                    new_total, query_handler.threshold, sns_topic
                )
    except Exception as err:
        logger.error(f"Error moving information to 'add_to_dynamo': {err}")
        return {
            "statusCode": 500,
            "error": f"Error moving information to 'add_to_dynamo': {err}"
        }


def main(event, context):
    """
    Entry point: run every configured query through 'query_to_stream'.

    Reads the SNS topic from the 'SNS_TOPIC' environment variable and
    the query definitions from 'config/personal.json'.

    Args:
        event: Invocation event (not used in the visible code).
        context: Invocation context (not used in the visible code).

    Returns:
        A dict with 'statusCode' 500 and an 'error' message if building
        or running a query raised.
    """
    sns_topic = os.environ['SNS_TOPIC']
    with open('config/personal.json', 'r') as f:
        # JSON with all the Log Groups, Log Streams and queries
        data = json.load(f)
    for queries in data.values():  # Retrieves the information required for the handler
        try:
            for query in queries:
                query_handler = QueryHandler(
                    query['query'], query['query_name'],
                    query['threshold'], query['logGroup'],
                    query['logStream']
                )
                query_to_stream(query_handler, sns_topic)
        except Exception as err:
            logger.error(
                f"Error moving information to 'query_to_stream': {err}")
            return {
                "statusCode": 500,
                "error": f"Error moving information to 'query_to_stream': {err}"
            }
    # ... (the remainder of this snippet is truncated in the source excerpt)

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful