How to use lambda_function_name method in localstack

Best Python code snippet using localstack_python

lambda.py

Source:lambda.py Github

copy

Full Screen

import io
import json
import time
import zipfile

import boto3
from botocore.exceptions import ClientError


class LambdaAPI(object):
    """
    Convenience wrapper around a Boto3 Lambda client, plus helpers that create
    the IAM role and the Amazon EventBridge schedule used by the demo script at
    the bottom of this file.
    """

    def __init__(self, client) -> None:
        """
        :param client: The Boto3 Lambda client used for every Lambda API call.
        """
        self.client = client

    def exponential_retry(self, func, error_code, *func_args, **func_kwargs):
        """
        Retries the specified function with a simple exponential backoff algorithm.
        This is necessary when AWS is not yet ready to perform an action because all
        resources have not been fully deployed.

        Credit: https://github.com/awsdocs/aws-doc-sdk-examples/blob/main/python/example_code/lambda/boto_client_examples/lambda_handler_basic.py

        :param func: The function to retry.
        :param error_code: The error code to retry. Other errors are raised again.
        :param func_args: The positional arguments to pass to the function.
        :param func_kwargs: The keyword arguments to pass to the function.
        :return: The return value of the retried function, or None when every
                 attempt failed with `error_code`.
        """
        sleepy_time = 1
        # Sleeps of 1, 2, 4, 8, 16 and 32 seconds: at most six attempts.
        while sleepy_time < 33:
            try:
                func_return = func(*func_args, **func_kwargs)
            except ClientError as error:
                if error.response['Error']['Code'] != error_code:
                    raise
                print(f'Sleeping for {sleepy_time} to give AWS time to connect resources.')
                time.sleep(sleepy_time)
                sleepy_time = sleepy_time*2
            else:
                # Stop after the first successful call, even when it returns None.
                # Looping on "result is None" would spin forever without sleeping.
                print(f'Ran {func.__name__}, got {func_return}.')
                return func_return
        return None

    def create_zip_package(self, func_file_name):
        """
        Create an in-memory zip archive containing the lambda function file.

        :param func_file_name: The name of the file containing lambda handler function
        :return: The bytes of the zip archive.
        :raises OSError: When the file cannot be read. The error is printed and
                         raised again so that an empty package is never deployed.
        """
        buffer = io.BytesIO()
        try:
            with zipfile.ZipFile(buffer, 'w') as ptr:
                ptr.write(func_file_name)
        except OSError as e:
            print(e)
            raise
        return buffer.getvalue()

    def list_functions(self, **kwargs):
        """
        Returns a list of Lambda functions, with the version-specific configuration
        of each. Lambda returns up to 50 functions per call.

        :param kwargs: Extra keyword arguments forwarded to the client's
                       `list_functions` call. `FunctionVersion` defaults to 'ALL'
                       unless it is supplied here.
        :return: The response of the `list_functions` call, or None when the call
                 fails with a ClientError (the error is printed).
        """
        params = {'FunctionVersion': 'ALL', **kwargs}
        try:
            return self.client.list_functions(**params)
        except ClientError as e:
            print(e)
            return None

    def create_iam_role_for_lambda(self, iam_resource, iam_role_name):
        """
        Creates an AWS Identity and Access Management (IAM) role that grants the
        AWS Lambda function basic permission to run. If a role with the specified
        name already exists, it is used for the demo.

        :param iam_resource: The Boto3 IAM resource object.
        :param iam_role_name: The name of the role to create.
        :return: The newly created role, or the existing role of that name.
        :raises ClientError: For any failure other than 'EntityAlreadyExists'.
        """
        lambda_assume_role_policy = {
            'Version': '2012-10-17',
            'Statement': [
                {
                    'Effect': 'Allow',
                    'Principal': {
                        'Service': 'lambda.amazonaws.com'
                    },
                    'Action': 'sts:AssumeRole'
                }
            ]
        }
        policy_arn_basic = 'arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole'
        policy_arn_ec2 = 'arn:aws:iam::aws:policy/AmazonEC2FullAccess'

        try:
            role = iam_resource.create_role(
                RoleName=iam_role_name,
                AssumeRolePolicyDocument=json.dumps(lambda_assume_role_policy)
            )
            # Block until IAM reports the role exists before attaching policies.
            iam_resource.meta.client.get_waiter(
                'role_exists').wait(RoleName=iam_role_name)
            print(f'Created role {role.name}')
            role.attach_policy(PolicyArn=policy_arn_basic)
            role.attach_policy(PolicyArn=policy_arn_ec2)
            print(f'Attached basic execution policy to role {role.name}')
        except ClientError as error:
            if error.response['Error']['Code'] == 'EntityAlreadyExists':
                role = iam_resource.Role(iam_role_name)
                print(f'The role {iam_role_name} already exists. Using it.')
            else:
                print(f'Could not create role {iam_role_name} or attach policy')
                raise
        return role

    def deploy_lambda_function(self, function_name, description,
                               handler_name, iam_role, deployment_package, runtime='python3.9'):
        """
        Deploys the AWS Lambda function.

        :param function_name: The name of the AWS Lambda function.
        :param description: The description of the Lambda function.
        :param handler_name: The fully qualified name of the handler function. This
                             must include the file name and the function name.
        :param iam_role: The IAM role to use for the function.
        :param deployment_package: The deployment package that contains the function
                                   code in ZIP format.
        :param runtime: The language specification to use during runtime.
        :return: The Amazon Resource Name (ARN) of the newly created function.
        :raises ClientError: When the function cannot be created. The error is
                             printed and raised again so that a caller such as
                             `exponential_retry` can inspect its error code.
        """
        try:
            response = self.client.create_function(
                FunctionName=function_name,
                Description=description,
                Runtime=runtime,
                Role=iam_role.arn,
                Handler=handler_name,
                Code={'ZipFile': deployment_package},
                Publish=True
            )
        except ClientError as e:
            print(e)
            raise
        function_arn = response['FunctionArn']
        print(f"Created function '{function_name}' with ARN: '{function_arn}'.")
        return function_arn

    def delete_lambda_function(self, function_name):
        """
        Deletes an AWS Lambda function.

        :param function_name: The name of the function to delete.
        :raises ClientError: When the function cannot be deleted.
        """
        try:
            self.client.delete_function(FunctionName=function_name)
        except ClientError:
            print(f'Could not delete function {function_name}.')
            raise

    def invoke_lambda_function(self, function_name, function_params):
        """
        Invokes an AWS Lambda function.

        :param function_name: The name of the function to invoke.
        :param function_params: The parameters of the function as a dict. This dict
                                is serialized to JSON before it is sent to AWS Lambda.
        :return: The response from the function invocation.
        :raises ClientError: When the function cannot be invoked.
        """
        try:
            response = self.client.invoke(
                FunctionName=function_name,
                Payload=json.dumps(function_params).encode())
            print(f'Invoked function {function_name}.')
            return response
        except ClientError:
            print(f'Could not invoke function {function_name}.')
            raise

    def schedule_lambda_function(
            self, event_rule_name, event_schedule, lambda_function_name, lambda_function_arn):
        """
        Creates a schedule rule with Amazon EventBridge and registers an AWS Lambda
        function to be invoked according to the specified schedule.

        :param event_rule_name: The name of the scheduled event rule.
        :param event_schedule: The specified schedule in either cron or rate format.
        :param lambda_function_name: The name of the AWS Lambda function to invoke.
        :param lambda_function_arn: The Amazon Resource Name (ARN) of the function.
        :return: The ARN of the EventBridge rule.
        :raises ClientError: When any of the three steps (put rule, add permission,
                             put targets) fails.
        """
        eventbridge_client = boto3.client('events')

        # Step 1: create (or update) the schedule rule.
        try:
            response = eventbridge_client.put_rule(
                Name=event_rule_name, ScheduleExpression=event_schedule)
            event_rule_arn = response['RuleArn']
            print(f'Put rule {event_rule_name} with ARN {event_rule_arn}.')
        except ClientError:
            print(f'Could not put rule {event_rule_name}.')
            raise

        # Step 2: allow EventBridge to invoke the function on behalf of the rule.
        try:
            self.client.add_permission(
                FunctionName=lambda_function_name,
                StatementId=f'{lambda_function_name}-invoke',
                Action='lambda:InvokeFunction',
                Principal='events.amazonaws.com',
                SourceArn=event_rule_arn)
            print(f'Granted permission to let Amazon EventBridge call function {lambda_function_name}')
        except ClientError:
            print(f'Could not add permission to let Amazon EventBridge call function {lambda_function_name}.')
            raise

        # Step 3: register the function as the target of the rule.
        try:
            response = eventbridge_client.put_targets(
                Rule=event_rule_name,
                Targets=[{'Id': lambda_function_name, 'Arn': lambda_function_arn}])
            if response['FailedEntryCount'] > 0:
                print(f'Could not set {lambda_function_name} as the target for {event_rule_name}.')
            else:
                print(f'Set {lambda_function_name} as the target of {event_rule_name}.')
        except ClientError:
            print(f'Could not set {lambda_function_name} as the target of {event_rule_name}.')
            raise
        return event_rule_arn

    def update_event_rule(self, event_rule_name, enable):
        """
        Updates the schedule event rule by enabling or disabling it.

        :param event_rule_name: The name of the rule to update.
        :param enable: When True, the rule is enabled. Otherwise, it is disabled.
        :raises ClientError: When the rule cannot be enabled or disabled.
        """
        try:
            eventbridge_client = boto3.client('events')
            if enable:
                eventbridge_client.enable_rule(Name=event_rule_name)
            else:
                eventbridge_client.disable_rule(Name=event_rule_name)
            print(f'{event_rule_name} is now {"enabled" if enable else "disabled"}.')
        except ClientError:
            print(f'Could not {"enable" if enable else "disable"} {event_rule_name}.')
            raise

    def get_event_rule_enabled(self, event_rule_name):
        """
        Indicates whether the specified rule is enabled or disabled.

        :param event_rule_name: The name of the rule query.
        :return: True when the rule is enabled. Otherwise, False.
        :raises ClientError: When the rule cannot be described.
        """
        try:
            eventbridge_client = boto3.client('events')
            response = eventbridge_client.describe_rule(Name=event_rule_name)
            enabled = response['State'] == 'ENABLED'
            print(f'{event_rule_name} is {enabled}.')
        except ClientError:
            print(f'Could not get state of {event_rule_name}.')
            raise
        else:
            return enabled

    def delete_event_rule(self, event_rule_name, lambda_function_name):
        """
        Removes the specified targets from the event rule and deletes the rule.

        :param event_rule_name: The name of the rule to delete.
        :param lambda_function_name: The name of the AWS Lambda function to remove
                                     as a target.
        :raises ClientError: When the target or the rule cannot be removed.
        """
        try:
            eventbridge_client = boto3.client('events')
            # The target is removed before the rule itself is deleted.
            eventbridge_client.remove_targets(
                Rule=event_rule_name, Ids=[lambda_function_name])
            eventbridge_client.delete_rule(Name=event_rule_name)
            print(f'Removed rule {event_rule_name}.')
        except ClientError:
            print(f'Could not remove rule {event_rule_name}.')
            raise


# --- Demo script: deploy, schedule, wait, then tear everything down. ---
client = boto3.client('lambda')
iam_resource = boto3.resource('iam')
l = LambdaAPI(client=client)

lambda_function_filename = 'lambda_handler_scheduled.py'
lambda_handler_name = 'lambda_handler_scheduled.lambda_handler'
lambda_role_name = 'demo-lambda-role'
lambda_function_name = 'demo-lambda-scheduled'
event_rule_name = 'demo-event-scheduled'
event_schedule = 'cron(38 11 * * ? *)'

print(f"Creating AWS Lambda function {lambda_function_name} from the "
      f"{lambda_handler_name} function in {lambda_function_filename}...")
deployment_package = l.create_zip_package(lambda_function_filename)
iam_role = l.create_iam_role_for_lambda(iam_resource, lambda_role_name)
# Function creation is retried while Lambda answers with
# InvalidParameterValueException.
lambda_function_arn = l.exponential_retry(
    l.deploy_lambda_function, 'InvalidParameterValueException',
    lambda_function_name, 'Demo Lambda to Stop EC2 instance', lambda_handler_name, iam_role,
    deployment_package
)

print(f"Scheduling {lambda_function_name} to run as per cron job")
l.schedule_lambda_function(
    event_rule_name, event_schedule,
    lambda_function_name, lambda_function_arn
)

print("Sleeping for 3 minutes to let our function trigger...")
time.sleep(3*60)

print(f"Disabling event {event_rule_name}...")
l.update_event_rule(event_rule_name, False)
l.get_event_rule_enabled(event_rule_name)

print("Cleaning up all resources created for the demo...")
l.delete_event_rule(event_rule_name, lambda_function_name)
l.delete_lambda_function(lambda_function_name)
print(f"Deleted {lambda_function_name}.")
# Attached policies are detached before the role is deleted.
for policy in iam_role.attached_policies.all():
    policy.detach_role(RoleName=iam_role.name)
iam_role.delete()
# NOTE: the original listing is truncated after this point.

Full Screen

Full Screen

scheduled_lambda.py

Source:scheduled_lambda.py Github

copy

Full Screen

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

"""
Purpose

Shows how to use the AWS SDK for Python (Boto3) to register an AWS Lambda function
that is invoked by Amazon EventBridge on a regular schedule.

Instead of using the low-level Boto3 client APIs shown in this example, you can use
AWS Chalice to more easily create a scheduled AWS Lambda function. For more
information on AWS Chalice, see https://github.com/aws/chalice.
"""

import logging
import time
import boto3
from botocore.exceptions import ClientError
import lambda_basics

logger = logging.getLogger(__name__)


def schedule_lambda_function(
        eventbridge_client, event_rule_name, event_schedule,
        lambda_client, lambda_function_name, lambda_function_arn):
    """
    Creates a schedule rule with Amazon EventBridge and registers an AWS Lambda
    function to be invoked according to the specified schedule.

    :param eventbridge_client: The Boto3 EventBridge client.
    :param event_rule_name: The name of the scheduled event rule.
    :param event_schedule: The specified schedule in either cron or rate format.
    :param lambda_client: The Boto3 Lambda client.
    :param lambda_function_name: The name of the AWS Lambda function to invoke.
    :param lambda_function_arn: The Amazon Resource Name (ARN) of the function.
    :return: The ARN of the EventBridge rule.
    :raises ClientError: When putting the rule, adding the permission, or putting
                         the target fails.
    """
    # Step 1: create (or update) the schedule rule.
    try:
        response = eventbridge_client.put_rule(
            Name=event_rule_name, ScheduleExpression=event_schedule)
        event_rule_arn = response['RuleArn']
        logger.info("Put rule %s with ARN %s.", event_rule_name, event_rule_arn)
    except ClientError:
        logger.exception("Couldn't put rule %s.", event_rule_name)
        raise

    # Step 2: allow EventBridge to invoke the function on behalf of the rule.
    try:
        lambda_client.add_permission(
            FunctionName=lambda_function_name,
            StatementId=f'{lambda_function_name}-invoke',
            Action='lambda:InvokeFunction',
            Principal='events.amazonaws.com',
            SourceArn=event_rule_arn)
        logger.info(
            "Granted permission to let Amazon EventBridge call function %s",
            lambda_function_name)
    except ClientError:
        logger.exception(
            "Couldn't add permission to let Amazon EventBridge call function %s.",
            lambda_function_name)
        raise

    # Step 3: register the function as the target of the rule.
    try:
        response = eventbridge_client.put_targets(
            Rule=event_rule_name,
            Targets=[{'Id': lambda_function_name, 'Arn': lambda_function_arn}])
        if response['FailedEntryCount'] > 0:
            logger.error(
                "Couldn't set %s as the target for %s.",
                lambda_function_name, event_rule_name)
        else:
            logger.info(
                "Set %s as the target of %s.", lambda_function_name, event_rule_name)
    except ClientError:
        logger.exception(
            "Couldn't set %s as the target of %s.", lambda_function_name,
            event_rule_name)
        raise

    return event_rule_arn


def update_event_rule(eventbridge_client, event_rule_name, enable):
    """
    Updates the schedule event rule by enabling or disabling it.

    :param eventbridge_client: The Boto3 EventBridge client.
    :param event_rule_name: The name of the rule to update.
    :param enable: When True, the rule is enabled. Otherwise, it is disabled.
    :raises ClientError: When the rule cannot be enabled or disabled.
    """
    try:
        if enable:
            eventbridge_client.enable_rule(Name=event_rule_name)
        else:
            eventbridge_client.disable_rule(Name=event_rule_name)
        logger.info(
            "%s is now %s.", event_rule_name, 'enabled' if enable else 'disabled')
    except ClientError:
        logger.exception(
            "Couldn't %s %s.", 'enable' if enable else 'disable', event_rule_name)
        raise


def get_event_rule_enabled(eventbridge_client, event_rule_name):
    """
    Indicates whether the specified rule is enabled or disabled.

    :param eventbridge_client: The Boto3 EventBridge client.
    :param event_rule_name: The name of the rule query.
    :return: True when the rule is enabled. Otherwise, False.
    :raises ClientError: When the rule cannot be described.
    """
    try:
        response = eventbridge_client.describe_rule(Name=event_rule_name)
        enabled = response['State'] == 'ENABLED'
        logger.info("%s is %s.", event_rule_name, response['State'])
    except ClientError:
        logger.exception("Couldn't get state of %s.", event_rule_name)
        raise
    else:
        return enabled


def delete_event_rule(eventbridge_client, event_rule_name, lambda_function_name):
    """
    Removes the specified targets from the event rule and deletes the rule.

    :param eventbridge_client: The Boto3 EventBridge client.
    :param event_rule_name: The name of the rule to delete.
    :param lambda_function_name: The name of the AWS Lambda function to remove
                                 as a target.
    :raises ClientError: When the target or the rule cannot be removed.
    """
    try:
        # The target is removed before the rule itself is deleted.
        eventbridge_client.remove_targets(
            Rule=event_rule_name, Ids=[lambda_function_name])
        eventbridge_client.delete_rule(Name=event_rule_name)
        logger.info("Removed rule %s.", event_rule_name)
    except ClientError:
        logger.exception("Couldn't remove rule %s.", event_rule_name)
        raise


def usage_demo():
    """
    Shows how to deploy an AWS Lambda function, create an Amazon EventBridge schedule
    rule that invokes the function, and how to clean up the resources after the demo
    completes.

    A failure to read the CloudWatch log events is logged but does not stop the
    demo, so the cleanup steps still run.
    """
    logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s')
    print('-'*88)
    print("Welcome to the AWS Lambda basics demo.")
    print('-'*88)

    lambda_function_filename = 'lambda_handler_scheduled.py'
    lambda_handler_name = 'lambda_handler_scheduled.lambda_handler'
    lambda_role_name = 'demo-lambda-role'
    lambda_function_name = 'demo-lambda-scheduled'
    event_rule_name = 'demo-event-scheduled'
    event_schedule = 'rate(1 minute)'

    iam_resource = boto3.resource('iam')
    lambda_client = boto3.client('lambda')
    eventbridge_client = boto3.client('events')
    logs_client = boto3.client('logs')

    print(f"Creating AWS Lambda function {lambda_function_name} from the "
          f"{lambda_handler_name} function in {lambda_function_filename}...")
    deployment_package = lambda_basics.create_lambda_deployment_package(
        lambda_function_filename)
    iam_role = lambda_basics.create_iam_role_for_lambda(iam_resource, lambda_role_name)
    lambda_function_arn = lambda_basics.exponential_retry(
        lambda_basics.deploy_lambda_function, 'InvalidParameterValueException',
        lambda_client, lambda_function_name, lambda_handler_name, iam_role,
        deployment_package)

    print(f"Scheduling {lambda_function_name} to run once per minute...")
    schedule_lambda_function(
        eventbridge_client, event_rule_name, event_schedule, lambda_client,
        lambda_function_name, lambda_function_arn)

    print("Sleeping for 3 minutes to let our function trigger a few times...")
    time.sleep(3*60)

    print(f"Getting last 20 Amazon CloudWatch log events for {lambda_function_name}...")
    log_group_name = f'/aws/lambda/{lambda_function_name}'
    try:
        log_streams = logs_client.describe_log_streams(
            logGroupName=log_group_name, orderBy='LastEventTime', descending=True, limit=1)
        streams = log_streams['logStreams']
        if streams:
            log_events = logs_client.get_log_events(
                logGroupName=log_group_name,
                logStreamName=streams[0]['logStreamName'],
                limit=20)
            print(*[evt['message'] for evt in log_events['events']])
        else:
            # An empty stream list would otherwise raise IndexError on [0].
            print(f"No log streams found in {log_group_name} yet.")
    except ClientError:
        # Reading logs is informational only; keep going so cleanup still runs.
        logger.exception("Couldn't get log events for %s.", lambda_function_name)

    print(f"Disabling event {event_rule_name}...")
    update_event_rule(eventbridge_client, event_rule_name, False)
    get_event_rule_enabled(eventbridge_client, event_rule_name)

    print("Cleaning up all resources created for the demo...")
    delete_event_rule(eventbridge_client, event_rule_name, lambda_function_name)
    lambda_basics.delete_lambda_function(lambda_client, lambda_function_name)
    print(f"Deleted {lambda_function_name}.")
    # Attached policies are detached before the role is deleted.
    for policy in iam_role.attached_policies.all():
        policy.detach_role(RoleName=iam_role.name)
    iam_role.delete()
    print(f"Deleted {iam_role.name}.")
    print("Thanks for watching!")


if __name__ == '__main__':
    ...  # NOTE: the original listing is truncated here; the body is not shown.

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful