Best Python code snippet using localstack_python
app.py
Source:app.py  
...10    aws_sqs11)12class ApigwDynamodbStepFunctionStack(core.Stack):13    LAMBDA_PYTHON_RUNTIME = lambda_.Runtime.PYTHON_3_814    def _create_lambda_function(15            self,16            function_name: str,17            environment: dict18    ):19        return lambda_.Function(20            scope=self,21            id=f"{function_name}_lambda_function",22            runtime=self.LAMBDA_PYTHON_RUNTIME,23            handler=f"lambda_handler.{function_name}",24            code=lambda_.Code.asset("./lambda_script"),25            environment=environment26        )27    def _dynamodb_update_in_sfn(28            self,29            table: aws_dynamodb.Table,30            status: str31    ):32        return aws_sfn_tasks.DynamoUpdateItem(33            scope=self,34            id=f"dynamodb_status_updated_as_{status}",35            # get id from StepFunctions state36            key={"id": aws_sfn_tasks.DynamoAttributeValue.from_string(aws_sfn.JsonPath.string_at("$.id"))},37            table=table,38            update_expression="set #status = :status",39            expression_attribute_names={40                "#status": "status"41            },42            expression_attribute_values={43                ":status": aws_sfn_tasks.DynamoAttributeValue.from_string(status)44            },45            result_path=f"$.status_{status}"46        )47    def __init__(self, scope: core.App, id_: str, stack_env: str, **kwargs) -> None:48        super().__init__(scope, id_, **kwargs)49        # create dynamo table50        demo_table = aws_dynamodb.Table(51            scope=self,52            id="demo_table",53            partition_key=aws_dynamodb.Attribute(54                name="id",55                type=aws_dynamodb.AttributeType.STRING56            ),57            write_capacity=3,58            read_capacity=3,59            removal_policy=core.RemovalPolicy.DESTROY60        )61        queue = aws_sqs.Queue(self, f"{id_}-SQSQueue")62        # create producer lambda 
function63        producer_lambda = self._create_lambda_function(64            function_name="producer",65            environment={66                "TABLE_NAME": demo_table.table_name,67                "QUEUE_URL": queue.queue_url68            }69        )70        queue.grant_send_messages(producer_lambda)71        # grant permission to lambda to write to demo table72        demo_table.grant_write_data(producer_lambda)73        # create consumer lambda function74        consumer_lambda = self._create_lambda_function(75            function_name="consumer",76            environment={"TABLE_NAME": demo_table.table_name}77        )78        # grant permission to lambda to read from demo table79        demo_table.grant_read_data(consumer_lambda)80        # api_gateway for root81        base_api = apigw_.RestApi(82            scope=self,83            id=f"{id_}-{stack_env}-apigw",84            rest_api_name=f"{id_}-{stack_env}-apigw",85            deploy_options=apigw_.StageOptions(stage_name=stack_env)86        )87        # /example entity88        api_entity = base_api.root.add_resource("example")89        # GET /example90        api_entity.add_method(91            http_method="GET",92            integration=apigw_.LambdaIntegration(93                handler=consumer_lambda,94                integration_responses=[95                    apigw_.IntegrationResponse(96                        status_code="200"97                    )98                ]99            )100        )101        # POST /example102        api_entity.add_method(103            http_method="POST",104            integration=apigw_.LambdaIntegration(105                handler=producer_lambda,106                integration_responses=[107                    apigw_.IntegrationResponse(108                        status_code="200"109                    )110                ]111            )112        )113        # ============= #114        # StepFunctions #115        # ============= #116        
dynamodb_update_running_task = self._dynamodb_update_in_sfn(table=demo_table, status="running")117        wait_1_min = aws_sfn.Wait(118            scope=self,119            id="Wait one minutes as heavy task",120            time=aws_sfn.WaitTime.duration(core.Duration.minutes(1)),121        )122        dynamodb_update_complete_task = self._dynamodb_update_in_sfn(table=demo_table, status="complete")123        dynamodb_update_failure_task = self._dynamodb_update_in_sfn(table=demo_table, status="failure")124        check_task_status = aws_sfn.Choice(scope=self, id="Job Complete?")\125            .when(aws_sfn.Condition.string_equals("$.job_status", "success"), dynamodb_update_complete_task) \126            .otherwise(dynamodb_update_failure_task)127        # StepFunctions128        definition = dynamodb_update_running_task \129            .next(wait_1_min) \130            .next(check_task_status)131        sfn_process = aws_sfn.StateMachine(132            scope=self,133            id=f"{id_}-{stack_env}",134            definition=definition135        )136        # Lambda to invoke StepFunction137        sfn_invoke_lambda = self._create_lambda_function(138            function_name="invoke_step_function",139            environment={140                "STEP_FUNCTION_ARN": sfn_process.state_machine_arn,141                "QUEUE_URL": queue.queue_url142            }143        )144        # grant145        queue.grant_consume_messages(sfn_invoke_lambda)146        sfn_process.grant_start_execution(sfn_invoke_lambda)147        # ================ #148        # CloudWatch Event #149        # ================ #150        # Runs every 2 hour151        invoke_automatically = aws_events.Rule(...test_lambda_api.py
Source:test_lambda_api.py  
...33def create_lambda_function_aws(34    lambda_client,35):36    lambda_arns = []37    def _create_lambda_function(**kwargs):38        def _create_function():39            resp = lambda_client.create_function(**kwargs)40            lambda_arns.append(resp["FunctionArn"])41            def _is_not_pending():42                try:43                    result = (44                        lambda_client.get_function(FunctionName=resp["FunctionName"])[45                            "Configuration"46                        ]["State"]47                        != "Pending"48                    )49                    return result50                except Exception as e:51                    LOG.error(e)..._manager.py
Source:_manager.py  
...12        return True13    def create(self, name: str, zip_file_path: str, handler_name: str,14               role: Role) -> LambdaFunctionMetadata:15        zip_file = _read_zip_file_content(zip_file_path)16        self._create_lambda_function(name, role, handler_name, zip_file)17        function_url = self._create_lambda_function_public_url(name)18        return LambdaFunctionMetadata(name, function_url, role)19    def delete(self, name: str):20        self._client.delete_function_url_config(21            FunctionName=name,22        )23        self._client.delete_function(24            FunctionName=name,25        )26    def get_lambda_function(self, name: str) -> Optional[LambdaFunctionMetadata]:27        try:28            response = self._client.get_function(29                FunctionName=name,30            )31        except self._client.exceptions.ResourceNotFoundException:32            return None33        role = get_role_from_arn(arn=response["Configuration"]["Role"])34        response = self._client.get_function_url_config(FunctionName=name)35        url = response["FunctionUrl"]36        return LambdaFunctionMetadata(name, url, role)37    def _create_lambda_function_public_url(self, name):38        response = self._client.create_function_url_config(39            FunctionName=name,40            AuthType="NONE",41        )42        function_url = response['FunctionUrl']43        self._client.add_permission(44            Action="lambda:InvokeFunctionUrl",45            FunctionName=name,46            Principal="*",47            StatementId="FunctionURLAllowPublicAccess",48            FunctionUrlAuthType="NONE",49        )50        return function_url51    def _create_lambda_function(self, name, role, handler_name, zip_file):52        self._client.create_function(53            FunctionName=name,54            Runtime='python3.9',55            Role=role.arn,56            Handler=handler_name,57            Code=dict(ZipFile=zip_file),58        )59def 
_read_zip_file_content(zip_file_path: str):60    with open(zip_file_path, 'rb') as file:61        zip_file = file.read()...Learn to execute automation testing from scratch with the LambdaTest Learning Hub, right from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you become proficient with different test automation frameworks, such as Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
