How to use get_lambda_policy method in localstack

Best Python code snippet using localstack_python

import_lambda.py

Source:import_lambda.py Github

copy

Full Screen

1#External libs2import ast3import boto34import json5import sys6import os7from botocore.exceptions import ClientError8#Establish our boto resources9client = boto3.client('lambda')10session = boto3.session.Session()11region = session.region_name12ec2Client = boto3.client('ec2')13def import_config(lambda_name, alias=False):14 '''15 Uses the lambda name to grab existing lambda configuration16 and import it into a config file17 args:18 lambda_name: the name of the lambda you want to import19 alias: the alias, defaults to $LATEST if not present20 '''21 print('Attempting to import configuration')22 #Create an empty config dict, start with the bare minimum23 config_dict = { 24 "initializers": {25 "name": "",26 "description": "",27 "region": region,28 "handler": "",29 "role": ""30 },31 "provisioners": {32 "runtime": "",33 "timeout": 0,34 "mem_size": 035 }36 }37 #If the user didn't pass an alias we want to use $LATEST38 if not alias:39 alias_used = '$LATEST'40 else:41 alias_used = alias42 try:43 lambda_config = client.get_function(44 FunctionName=lambda_name,45 Qualifier=alias_used46 )47 #Set a variable to make things readable later48 config = lambda_config['Configuration']49 except ClientError as error:50 print(error.response)51 sys.exit(1)52 else:53 #Check to make sure there is a config there54 #print(json.dumps(config, indent=4))55 if 'FunctionName' in config:56 #Grab our initializers57 config_dict['initializers']['name'] = config['FunctionName']58 config_dict['initializers']['handler'] = handler = config['Handler']59 config_dict['initializers']['description'] = config['Description']60 config_dict['initializers']['role'] = config['Role'].split('/')[-1]61 62 #Grab our provisioners63 config_dict['provisioners']['timeout'] = config['Timeout']64 config_dict['provisioners']['mem_size'] = config['MemorySize']65 config_dict['provisioners']['runtime'] = config['Runtime']66 #VPC Work67 sg_ids = config['VpcConfig']['SecurityGroupIds']68 if len(sg_ids) != 0:69 vpc_id = config['VpcConfig']['VpcId']70 config_dict['vpc_setting'] = {"vpc_name": vpc_id, "security_group_ids": sg_ids} 71 else:72 pass73 74 #Tracing config75 trace_config = config['TracingConfig']76 if trace_config['Mode'] != "PassThrough":77 config_dict['initializers']['tracing_mode'] = trace_config['Mode'].lower()78 else:79 pass80 if 'DeadLetterConfig' in config:81 #Split DLQ config so we can grab things easily82 dead_letter_split = config['DeadLetterConfig']['TargetArn'].split(':')83 config_dict['dead_letter_config'] = {"type": dead_letter_split[2], "target_name": dead_letter_split[-1]}84 else:85 pass86 #Variables87 if 'Environment' in config:88 config_dict['variables'] = config['Environment']['Variables']89 else:90 pass91 #Write the alias if needed92 if alias_used != "$LATEST":93 config_dict['initializers']['alias'] = alias94 else:95 pass96 #Set a variable for the ARN97 true_arn = config['FunctionArn'].split(':')[:7]98 function_arn = ":".join(true_arn)99 else:100 print("No Lambda found! Please check your config")101 finally:102 return config_dict, function_arn103def import_triggers(lambda_name, alias=False):104 '''105 Uses the lambda name to grab triggers from existing lambda config106 and import it into a config file107 args:108 lambda_name: 109 alias: 110 '''111 print("Attempting to retrieve lambda triggers")112 try:113 if not alias:114 get_lambda_policy = client.get_policy(115 FunctionName=lambda_name116 )117 else:118 get_lambda_policy = client.get_policy(119 FunctionName=lambda_name,120 Qualifier=alias121 ) 122 except ClientError as error:123 print("No policy found")124 principal = ''125 resource = ''126 else:127 policy = ast.literal_eval(get_lambda_policy['Policy'])128 statement_dict = policy['Statement'][0]129 principal = statement_dict['Principal']['Service'].split('.')[0]130 resource = statement_dict['Condition']['ArnLike']['AWS:SourceArn'].split(":")[-1]131 finally:132 return principal, resource133def get_sg_name(sg_id):134 '''135 Grabs the NAME of the security groups, we want them because they are136 friendlier to read than the sg code137 args:138 sg_id: the id of the security group, returned from import_config139 '''140 print('Attempting to retrieve security group names')141 try:142 sg_info = ec2Client.describe_security_groups(143 GroupIds=sg_id144 )145 group_info = sg_info['SecurityGroups'][0]146 except ClientError as error:147 print(error.response)148 sys.exit(1)149 finally:150 print("Retrieved name for groups %s" % group_info['GroupName'])151 return group_info['GroupName']152def get_vpc_name(vpc_id):153 '''154 Grabs the NAME of the VPC155 args:156 vpc_id: the id the VPC, returned from import_config157 '''158 print('Attempting to retrieve VPC name')159 try:160 vpc_info = ec2Client.describe_vpcs(161 VpcIds=[vpc_id]162 )163 vpc_info = vpc_info['Vpcs'][0]164 except ClientError as error:165 print(error.response)166 sys.exit(1)167 else:168 tags = vpc_info['Tags'][0]169 170 if tags['Key'] == 'Name':171 name = tags['Value']172 else:173 print("No VPC found, make sure your VPC is tagged 'Key': 'Name', 'Value': 'Your-VPC'")174 sys.exit(1)175 finally:176 print('Retrieved VPC name %s' % name)177 return name178def get_tags(lambda_arn):179 '''180 Grabs a tags dict from the current lambda181 args:182 lambda_arn: the arn returned from the import config function183 '''184 print('Attempting to retrieve tags')185 try:186 tags = client.list_tags(Resource=lambda_arn)187 tags = tags['Tags']188 except ClientError as error:189 print(error.response)190 sys.exit(1)191 finally:192 return tags193########### Entrypoint ###########194def import_lambda(lambda_name, alias):195 '''196 The main entry point of the module197 args:198 lambda_name: the name of the lambda199 alias: alias of the lambda200 '''201 config_dict, lambda_arn = import_config(lambda_name=lambda_name, alias=alias)202 tag_dict = get_tags(lambda_arn)203 if len(tag_dict) != 0:204 config_dict['tags'] = tag_dict205 if 'vpc_setting' in config_dict:206 config_dict['vpc_setting']['vpc_name'] = get_vpc_name(config_dict['vpc_setting']['vpc_name'])207 config_dict['vpc_setting']['security_group_ids'] = get_sg_name(config_dict['vpc_setting']['security_group_ids'])208 trigger_method, trigger_source = import_triggers(lambda_name=lambda_name, alias=alias)209 210 if len(trigger_method) != 0:211 config_dict['trigger'] = {"method": trigger_method, "source": trigger_source}212 ...

Full Screen

Full Screen

monitoring_lambda.py

Source:monitoring_lambda.py Github

copy

Full Screen

...10 for statement in policy_statements_id_list:11 print(lambda_wrapper.remove_lambda_permission(12 logs_client, lambda_name, statement))13def get_lambda_policy_statements(logs_client, lambda_name):14 res = json.loads(lambda_wrapper.get_lambda_policy(15 logs_client, lambda_name)['Policy'])16 statements = []17 for r in res["Statement"]:18 statements.append(r["Sid"])19 return statements20def exist_subcription_filter(logs_client, log_group_name):21 subcription_filters = logs_wrapper.list_subcription_filters(logs_client)22 for sflist in subcription_filters:23 for sf in sflist:24 if sf['logGroupName'] == log_group_name:25 return True26 return False27def add_log_group_for_monitoring(lambda_client,28 logs_client,29 destination_lambda_name,30 log_group_name, pattern, region, account):31 if not logs_wrapper.exist_log_group(logs_client, log_group_name):32 raise Exception(f'The {log_group_name} does not exist.')33 response_lambda = lambda_wrapper.add_lambda_permission(34 lambda_client, destination_lambda_name, log_group_name, account, region)35 response_logs = logs_wrapper.create_subcription_filter(36 logs_client, destination_lambda_name, log_group_name, pattern, region, account)37 return {'response': [response_lambda, response_logs]}38def remove_log_group_of_monitoring(lambda_client, logs_client, destination_lambda_name, log_group_name):39 res = json.loads(lambda_wrapper.get_lambda_policy(40 lambda_client, destination_lambda_name)['Policy'])41 statements = res['Statement']42 for s in statements:43 ln = s['Sid'].split('_')[1]44 if ln == log_group_name.split('/')[-1]:45 lambda_wrapper.remove_lambda_permission(46 lambda_client, destination_lambda_name, s['Sid'])47 return logs_wrapper.remove_subcription_filter(logs_client, log_group_name)48def add_lambda_for_monitoring(lambda_client,49 logs_client,50 destination_lambda_name,51 lambda_name, pattern, region, account):52 log_group_name = "/aws/lambda/{}".format(lambda_name)53 if not logs_wrapper.exist_log_group(logs_client, log_group_name):54 logs_wrapper.create_lambda_log_group(logs_client, lambda_name)55 response_lambda = lambda_wrapper.add_lambda_permission(56 lambda_client, destination_lambda_name, log_group_name, account, region)57 response_logs = logs_wrapper.create_subcription_filter(58 logs_client, destination_lambda_name, log_group_name, pattern, region, account)59 return {'response': [response_lambda, response_logs]}60def remove_lambda_of_monitoring(lambda_client, logs_client, destination_lambda_name, lambda_name):61 res = json.loads(lambda_wrapper.get_lambda_policy(62 lambda_client, destination_lambda_name)['Policy'])63 statements = res['Statement']64 for s in statements:65 ln = s['Sid'].split('_')[1]66 if ln == lambda_name:67 lambda_wrapper.remove_lambda_permission(68 lambda_client, destination_lambda_name, s['Sid'])69 return logs_wrapper.remove_subcription_filter(logs_client, "/aws/lambda/{}".format(lambda_name))70if __name__ == "__main__":71 print(lambda_wrapper.invoke_lambda_function(72 lambda_client, "hello-world-lambda-py", {}))73 # print(json.dumps(logs_wrapper.list_logs_events(...

Full Screen

Full Screen

utils.py

Source:utils.py Github

copy

Full Screen

...15 region_name=os.environ.get('AWS_REGION') or 'us-east-1'16)17cloud_watch = session.client('logs')18lambda_func = session.client('lambda')19def get_lambda_policy():20 try:21 lambda_policy = lambda_func.get_policy(FunctionName=LOG_SHIPPER_FUNCTION).get("Policy")22 return json.loads(lambda_policy)23 except ClientError as error:24 raise error25def remove_lambda_permissions(sid):26 try:27 remove_permission = lambda_func.remove_permission(FunctionName=LOG_SHIPPER_FUNCTION, StatementId=sid)28 return remove_permission29 except ClientError as error:...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful