How to use _create_lambda_function method in localstack

Best Python code snippet using localstack_python

app.py

Source:app.py Github

copy

Full Screen

...10 aws_sqs11)12class ApigwDynamodbStepFunctionStack(core.Stack):13 LAMBDA_PYTHON_RUNTIME = lambda_.Runtime.PYTHON_3_814 def _create_lambda_function(15 self,16 function_name: str,17 environment: dict18 ):19 return lambda_.Function(20 scope=self,21 id=f"{function_name}_lambda_function",22 runtime=self.LAMBDA_PYTHON_RUNTIME,23 handler=f"lambda_handler.{function_name}",24 code=lambda_.Code.asset("./lambda_script"),25 environment=environment26 )27 def _dynamodb_update_in_sfn(28 self,29 table: aws_dynamodb.Table,30 status: str31 ):32 return aws_sfn_tasks.DynamoUpdateItem(33 scope=self,34 id=f"dynamodb_status_updated_as_{status}",35 # get id from StepFunctions state36 key={"id": aws_sfn_tasks.DynamoAttributeValue.from_string(aws_sfn.JsonPath.string_at("$.id"))},37 table=table,38 update_expression="set #status = :status",39 expression_attribute_names={40 "#status": "status"41 },42 expression_attribute_values={43 ":status": aws_sfn_tasks.DynamoAttributeValue.from_string(status)44 },45 result_path=f"$.status_{status}"46 )47 def __init__(self, scope: core.App, id_: str, stack_env: str, **kwargs) -> None:48 super().__init__(scope, id_, **kwargs)49 # create dynamo table50 demo_table = aws_dynamodb.Table(51 scope=self,52 id="demo_table",53 partition_key=aws_dynamodb.Attribute(54 name="id",55 type=aws_dynamodb.AttributeType.STRING56 ),57 write_capacity=3,58 read_capacity=3,59 removal_policy=core.RemovalPolicy.DESTROY60 )61 queue = aws_sqs.Queue(self, f"{id_}-SQSQueue")62 # create producer lambda function63 producer_lambda = self._create_lambda_function(64 function_name="producer",65 environment={66 "TABLE_NAME": demo_table.table_name,67 "QUEUE_URL": queue.queue_url68 }69 )70 queue.grant_send_messages(producer_lambda)71 # grant permission to lambda to write to demo table72 demo_table.grant_write_data(producer_lambda)73 # create consumer lambda function74 consumer_lambda = self._create_lambda_function(75 function_name="consumer",76 environment={"TABLE_NAME": demo_table.table_name}77 )78 # grant permission to lambda to read from demo table79 demo_table.grant_read_data(consumer_lambda)80 # api_gateway for root81 base_api = apigw_.RestApi(82 scope=self,83 id=f"{id_}-{stack_env}-apigw",84 rest_api_name=f"{id_}-{stack_env}-apigw",85 deploy_options=apigw_.StageOptions(stage_name=stack_env)86 )87 # /example entity88 api_entity = base_api.root.add_resource("example")89 # GET /example90 api_entity.add_method(91 http_method="GET",92 integration=apigw_.LambdaIntegration(93 handler=consumer_lambda,94 integration_responses=[95 apigw_.IntegrationResponse(96 status_code="200"97 )98 ]99 )100 )101 # POST /example102 api_entity.add_method(103 http_method="POST",104 integration=apigw_.LambdaIntegration(105 handler=producer_lambda,106 integration_responses=[107 apigw_.IntegrationResponse(108 status_code="200"109 )110 ]111 )112 )113 # ============= #114 # StepFunctions #115 # ============= #116 dynamodb_update_running_task = self._dynamodb_update_in_sfn(table=demo_table, status="running")117 wait_1_min = aws_sfn.Wait(118 scope=self,119 id="Wait one minutes as heavy task",120 time=aws_sfn.WaitTime.duration(core.Duration.minutes(1)),121 )122 dynamodb_update_complete_task = self._dynamodb_update_in_sfn(table=demo_table, status="complete")123 dynamodb_update_failure_task = self._dynamodb_update_in_sfn(table=demo_table, status="failure")124 check_task_status = aws_sfn.Choice(scope=self, id="Job Complete?")\125 .when(aws_sfn.Condition.string_equals("$.job_status", "success"), dynamodb_update_complete_task) \126 .otherwise(dynamodb_update_failure_task)127 # StepFunctions128 definition = dynamodb_update_running_task \129 .next(wait_1_min) \130 .next(check_task_status)131 sfn_process = aws_sfn.StateMachine(132 scope=self,133 id=f"{id_}-{stack_env}",134 definition=definition135 )136 # Lambda to invoke StepFunction137 sfn_invoke_lambda = self._create_lambda_function(138 function_name="invoke_step_function",139 environment={140 "STEP_FUNCTION_ARN": sfn_process.state_machine_arn,141 "QUEUE_URL": queue.queue_url142 }143 )144 # grant145 queue.grant_consume_messages(sfn_invoke_lambda)146 sfn_process.grant_start_execution(sfn_invoke_lambda)147 # ================ #148 # CloudWatch Event #149 # ================ #150 # Runs every 2 hour151 invoke_automatically = aws_events.Rule(...

Full Screen

Full Screen

test_lambda_api.py

Source:test_lambda_api.py Github

copy

Full Screen

...33def create_lambda_function_aws(34 lambda_client,35):36 lambda_arns = []37 def _create_lambda_function(**kwargs):38 def _create_function():39 resp = lambda_client.create_function(**kwargs)40 lambda_arns.append(resp["FunctionArn"])41 def _is_not_pending():42 try:43 result = (44 lambda_client.get_function(FunctionName=resp["FunctionName"])[45 "Configuration"46 ]["State"]47 != "Pending"48 )49 return result50 except Exception as e:51 LOG.error(e)...

Full Screen

Full Screen

_manager.py

Source:_manager.py Github

copy

Full Screen

...12 return True13 def create(self, name: str, zip_file_path: str, handler_name: str,14 role: Role) -> LambdaFunctionMetadata:15 zip_file = _read_zip_file_content(zip_file_path)16 self._create_lambda_function(name, role, handler_name, zip_file)17 function_url = self._create_lambda_function_public_url(name)18 return LambdaFunctionMetadata(name, function_url, role)19 def delete(self, name: str):20 self._client.delete_function_url_config(21 FunctionName=name,22 )23 self._client.delete_function(24 FunctionName=name,25 )26 def get_lambda_function(self, name: str) -> Optional[LambdaFunctionMetadata]:27 try:28 response = self._client.get_function(29 FunctionName=name,30 )31 except self._client.exceptions.ResourceNotFoundException:32 return None33 role = get_role_from_arn(arn=response["Configuration"]["Role"])34 response = self._client.get_function_url_config(FunctionName=name)35 url = response["FunctionUrl"]36 return LambdaFunctionMetadata(name, url, role)37 def _create_lambda_function_public_url(self, name):38 response = self._client.create_function_url_config(39 FunctionName=name,40 AuthType="NONE",41 )42 function_url = response['FunctionUrl']43 self._client.add_permission(44 Action="lambda:InvokeFunctionUrl",45 FunctionName=name,46 Principal="*",47 StatementId="FunctionURLAllowPublicAccess",48 FunctionUrlAuthType="NONE",49 )50 return function_url51 def _create_lambda_function(self, name, role, handler_name, zip_file):52 self._client.create_function(53 FunctionName=name,54 Runtime='python3.9',55 Role=role.arn,56 Handler=handler_name,57 Code=dict(ZipFile=zip_file),58 )59def _read_zip_file_content(zip_file_path: str):60 with open(zip_file_path, 'rb') as file:61 zip_file = file.read()...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful