Best Python code snippet using localstack_python
test_lambda_integration.py
Source:test_lambda_integration.py  
1import base642import json3import os4import time5from unittest.mock import patch6import pytest7from botocore.exceptions import ClientError8from localstack import config9from localstack.services.apigateway.helpers import path_based_url10from localstack.services.awslambda.lambda_api import (11    BATCH_SIZE_RANGES,12    INVALID_PARAMETER_VALUE_EXCEPTION,13)14from localstack.services.awslambda.lambda_utils import LAMBDA_RUNTIME_PYTHON3615from localstack.utils import testutil16from localstack.utils.aws import aws_stack17from localstack.utils.common import retry, safe_requests, short_uid18from localstack.utils.sync import poll_condition19from localstack.utils.testutil import check_expected_lambda_log_events_length, get_lambda_log_events20from .test_lambda import (21    TEST_LAMBDA_FUNCTION_PREFIX,22    TEST_LAMBDA_LIBS,23    TEST_LAMBDA_PYTHON,24    TEST_LAMBDA_PYTHON_ECHO,25    is_old_provider,26)27TEST_STAGE_NAME = "testing"28TEST_SNS_TOPIC_NAME = "sns-topic-1"29THIS_FOLDER = os.path.dirname(os.path.realpath(__file__))30TEST_LAMBDA_PARALLEL_FILE = os.path.join(THIS_FOLDER, "functions", "lambda_parallel.py")31class TestLambdaEventSourceMappings:32    def test_event_source_mapping_default_batch_size(33        self,34        create_lambda_function,35        lambda_client,36        sqs_client,37        sqs_create_queue,38        sqs_queue_arn,39        dynamodb_client,40        dynamodb_create_table,41        lambda_su_role,42    ):43        function_name = f"lambda_func-{short_uid()}"44        queue_name_1 = f"queue-{short_uid()}-1"45        queue_name_2 = f"queue-{short_uid()}-2"46        ddb_table = f"ddb_table-{short_uid()}"47        create_lambda_function(48            func_name=function_name,49            handler_file=TEST_LAMBDA_PYTHON_ECHO,50            runtime=LAMBDA_RUNTIME_PYTHON36,51            role=lambda_su_role,52        )53        queue_url_1 = sqs_create_queue(QueueName=queue_name_1)54        queue_arn_1 = sqs_queue_arn(queue_url_1)55        rs = lambda_client.create_event_source_mapping(56            EventSourceArn=queue_arn_1, FunctionName=function_name57        )58        assert BATCH_SIZE_RANGES["sqs"][0] == rs["BatchSize"]59        uuid = rs["UUID"]60        def wait_for_event_source_mapping():61            return lambda_client.get_event_source_mapping(UUID=uuid)["State"] == "Enabled"62        assert poll_condition(wait_for_event_source_mapping, timeout=30)63        with pytest.raises(ClientError) as e:64            # Update batch size with invalid value65            lambda_client.update_event_source_mapping(66                UUID=uuid,67                FunctionName=function_name,68                BatchSize=BATCH_SIZE_RANGES["sqs"][1] + 1,69            )70        e.match(INVALID_PARAMETER_VALUE_EXCEPTION)71        queue_url_2 = sqs_create_queue(QueueName=queue_name_2)72        queue_arn_2 = sqs_queue_arn(queue_url_2)73        with pytest.raises(ClientError) as e:74            # Create event source mapping with invalid batch size value75            lambda_client.create_event_source_mapping(76                EventSourceArn=queue_arn_2,77                FunctionName=function_name,78                BatchSize=BATCH_SIZE_RANGES["sqs"][1] + 1,79            )80        e.match(INVALID_PARAMETER_VALUE_EXCEPTION)81        table_description = dynamodb_create_table(82            table_name=ddb_table,83            partition_key="id",84            stream_view_type="NEW_IMAGE",85        )["TableDescription"]86        # table ARNs are not sufficient as event source, needs to be a dynamodb stream arn87        if not is_old_provider():88            with pytest.raises(ClientError) as e:89                lambda_client.create_event_source_mapping(90                    EventSourceArn=table_description["TableArn"],91                    FunctionName=function_name,92                    StartingPosition="LATEST",93                )94            e.match(INVALID_PARAMETER_VALUE_EXCEPTION)95        # check if event source mapping can be created with latest stream ARN96        rs = lambda_client.create_event_source_mapping(97            EventSourceArn=table_description["LatestStreamArn"],98            FunctionName=function_name,99            StartingPosition="LATEST",100        )101        assert BATCH_SIZE_RANGES["dynamodb"][0] == rs["BatchSize"]102    def test_disabled_event_source_mapping_with_dynamodb(103        self,104        create_lambda_function,105        lambda_client,106        dynamodb_resource,107        dynamodb_client,108        dynamodb_create_table,109        logs_client,110        dynamodbstreams_client,111        lambda_su_role,112    ):113        function_name = f"lambda_func-{short_uid()}"114        ddb_table = f"ddb_table-{short_uid()}"115        create_lambda_function(116            func_name=function_name,117            handler_file=TEST_LAMBDA_PYTHON_ECHO,118            runtime=LAMBDA_RUNTIME_PYTHON36,119            role=lambda_su_role,120        )121        latest_stream_arn = dynamodb_create_table(122            table_name=ddb_table, partition_key="id", stream_view_type="NEW_IMAGE"123        )["TableDescription"]["LatestStreamArn"]124        rs = lambda_client.create_event_source_mapping(125            FunctionName=function_name,126            EventSourceArn=latest_stream_arn,127            StartingPosition="TRIM_HORIZON",128            MaximumBatchingWindowInSeconds=1,129        )130        uuid = rs["UUID"]131        def wait_for_table_created():132            return (133                dynamodb_client.describe_table(TableName=ddb_table)["Table"]["TableStatus"]134                == "ACTIVE"135            )136        assert poll_condition(wait_for_table_created, timeout=30)137        def wait_for_stream_created():138            return (139                dynamodbstreams_client.describe_stream(StreamArn=latest_stream_arn)[140                    "StreamDescription"141                ]["StreamStatus"]142                == "ENABLED"143            )144        assert poll_condition(wait_for_stream_created, timeout=30)145        table = dynamodb_resource.Table(ddb_table)146        items = [147            {"id": short_uid(), "data": "data1"},148            {"id": short_uid(), "data": "data2"},149        ]150        table.put_item(Item=items[0])151        def assert_events():152            events = get_lambda_log_events(function_name, logs_client=logs_client)153            # lambda was invoked 1 time154            assert 1 == len(events[0]["Records"])155        # might take some time against AWS156        retry(assert_events, sleep=3, retries=10)157        # disable event source mapping158        lambda_client.update_event_source_mapping(UUID=uuid, Enabled=False)159        table.put_item(Item=items[1])160        events = get_lambda_log_events(function_name, logs_client=logs_client)161        # lambda no longer invoked, still have 1 event162        assert 1 == len(events[0]["Records"])163    # TODO invalid test against AWS, this behavior just is not correct164    def test_deletion_event_source_mapping_with_dynamodb(165        self, create_lambda_function, lambda_client, dynamodb_client, lambda_su_role166    ):167        function_name = f"lambda_func-{short_uid()}"168        ddb_table = f"ddb_table-{short_uid()}"169        create_lambda_function(170            func_name=function_name,171            handler_file=TEST_LAMBDA_PYTHON_ECHO,172            runtime=LAMBDA_RUNTIME_PYTHON36,173            role=lambda_su_role,174        )175        latest_stream_arn = aws_stack.create_dynamodb_table(176            table_name=ddb_table,177            partition_key="id",178            client=dynamodb_client,179            stream_view_type="NEW_IMAGE",180        )["TableDescription"]["LatestStreamArn"]181        lambda_client.create_event_source_mapping(182            FunctionName=function_name,183            EventSourceArn=latest_stream_arn,184            StartingPosition="TRIM_HORIZON",185        )186        def wait_for_table_created():187            return (188                dynamodb_client.describe_table(TableName=ddb_table)["Table"]["TableStatus"]189                == "ACTIVE"190            )191        assert poll_condition(wait_for_table_created, timeout=30)192        dynamodb_client.delete_table(TableName=ddb_table)193        result = lambda_client.list_event_source_mappings(EventSourceArn=latest_stream_arn)194        assert 1 == len(result["EventSourceMappings"])195    def test_event_source_mapping_with_sqs(196        self,197        create_lambda_function,198        lambda_client,199        sqs_client,200        sqs_create_queue,201        sqs_queue_arn,202        logs_client,203        lambda_su_role,204    ):205        function_name = f"lambda_func-{short_uid()}"206        queue_name_1 = f"queue-{short_uid()}-1"207        create_lambda_function(208            func_name=function_name,209            handler_file=TEST_LAMBDA_PYTHON_ECHO,210            runtime=LAMBDA_RUNTIME_PYTHON36,211            role=lambda_su_role,212        )213        queue_url_1 = sqs_create_queue(QueueName=queue_name_1)214        queue_arn_1 = sqs_queue_arn(queue_url_1)215        lambda_client.create_event_source_mapping(216            EventSourceArn=queue_arn_1, FunctionName=function_name, MaximumBatchingWindowInSeconds=1217        )218        sqs_client.send_message(QueueUrl=queue_url_1, MessageBody=json.dumps({"foo": "bar"}))219        def assert_lambda_log_events():220            events = get_lambda_log_events(function_name=function_name, logs_client=logs_client)221            # lambda was invoked 1 time222            assert 1 == len(events[0]["Records"])223        retry(assert_lambda_log_events, sleep_before=3, retries=30)224        rs = sqs_client.receive_message(QueueUrl=queue_url_1)225        assert rs.get("Messages") is None226    def test_create_kinesis_event_source_mapping(227        self,228        create_lambda_function,229        lambda_client,230        kinesis_client,231        kinesis_create_stream,232        lambda_su_role,233        wait_for_stream_ready,234        logs_client,235    ):236        function_name = f"lambda_func-{short_uid()}"237        stream_name = f"test-foobar-{short_uid()}"238        create_lambda_function(239            func_name=function_name,240            handler_file=TEST_LAMBDA_PYTHON_ECHO,241            runtime=LAMBDA_RUNTIME_PYTHON36,242            role=lambda_su_role,243        )244        kinesis_create_stream(StreamName=stream_name, ShardCount=1)245        stream_arn = kinesis_client.describe_stream(StreamName=stream_name)["StreamDescription"][246            "StreamARN"247        ]248        # only valid against AWS / new provider (once implemented)249        if not is_old_provider():250            with pytest.raises(ClientError) as e:251                lambda_client.create_event_source_mapping(252                    EventSourceArn=stream_arn, FunctionName=function_name253                )254            e.match(INVALID_PARAMETER_VALUE_EXCEPTION)255        wait_for_stream_ready(stream_name=stream_name)256        lambda_client.create_event_source_mapping(257            EventSourceArn=stream_arn, FunctionName=function_name, StartingPosition="TRIM_HORIZON"258        )259        stream_summary = kinesis_client.describe_stream_summary(StreamName=stream_name)260        assert 1 == stream_summary["StreamDescriptionSummary"]["OpenShardCount"]261        num_events_kinesis = 10262        kinesis_client.put_records(263            Records=[264                {"Data": "{}", "PartitionKey": f"test_{i}"} for i in range(0, num_events_kinesis)265            ],266            StreamName=stream_name,267        )268        def get_lambda_events():269            events = get_lambda_log_events(function_name, logs_client=logs_client)270            assert events271            return events272        events = retry(get_lambda_events, retries=30)273        assert 10 == len(events[0]["Records"])274        assert "eventID" in events[0]["Records"][0]275        assert "eventSourceARN" in events[0]["Records"][0]276        assert "eventSource" in events[0]["Records"][0]277        assert "eventVersion" in events[0]["Records"][0]278        assert "eventName" in events[0]["Records"][0]279        assert "invokeIdentityArn" in events[0]["Records"][0]280        assert "awsRegion" in events[0]["Records"][0]281        assert "kinesis" in events[0]["Records"][0]282    def test_python_lambda_subscribe_sns_topic(283        self,284        create_lambda_function,285        sns_client,286        lambda_su_role,287        sns_topic,288        logs_client,289        lambda_client,290    ):291        function_name = f"{TEST_LAMBDA_FUNCTION_PREFIX}-{short_uid()}"292        permission_id = f"test-statement-{short_uid()}"293        lambda_creation_response = create_lambda_function(294            func_name=function_name,295            handler_file=TEST_LAMBDA_PYTHON_ECHO,296            runtime=LAMBDA_RUNTIME_PYTHON36,297            role=lambda_su_role,298        )299        lambda_arn = lambda_creation_response["CreateFunctionResponse"]["FunctionArn"]300        topic_arn = sns_topic["Attributes"]["TopicArn"]301        lambda_client.add_permission(302            FunctionName=function_name,303            StatementId=permission_id,304            Action="lambda:InvokeFunction",305            Principal="sns.amazonaws.com",306            SourceArn=topic_arn,307        )308        sns_client.subscribe(309            TopicArn=topic_arn,310            Protocol="lambda",311            Endpoint=lambda_arn,312        )313        subject = "[Subject] Test subject"314        message = "Hello world."315        sns_client.publish(TopicArn=topic_arn, Subject=subject, Message=message)316        events = retry(317            check_expected_lambda_log_events_length,318            retries=10,319            sleep=1,320            function_name=function_name,321            expected_length=1,322            regex_filter="Records.*Sns",323            logs_client=logs_client,324        )325        notification = events[0]["Records"][0]["Sns"]326        assert "Subject" in notification327        assert subject == notification["Subject"]328class TestLambdaHttpInvocation:329    def test_http_invocation_with_apigw_proxy(self, create_lambda_function):330        lambda_name = f"test_lambda_{short_uid()}"331        lambda_resource = "/api/v1/{proxy+}"332        lambda_path = "/api/v1/hello/world"333        lambda_request_context_path = "/" + TEST_STAGE_NAME + lambda_path334        lambda_request_context_resource_path = lambda_resource335        # create lambda function336        create_lambda_function(337            func_name=lambda_name,338            handler_file=TEST_LAMBDA_PYTHON,339            libs=TEST_LAMBDA_LIBS,340        )341        # create API Gateway and connect it to the Lambda proxy backend342        lambda_uri = aws_stack.lambda_function_arn(lambda_name)343        target_uri = f"arn:aws:apigateway:{aws_stack.get_region()}:lambda:path/2015-03-31/functions/{lambda_uri}/invocations"344        result = testutil.connect_api_gateway_to_http_with_lambda_proxy(345            "test_gateway2",346            target_uri,347            path=lambda_resource,348            stage_name=TEST_STAGE_NAME,349        )350        api_id = result["id"]351        url = path_based_url(api_id=api_id, stage_name=TEST_STAGE_NAME, path=lambda_path)352        result = safe_requests.post(353            url, data=b"{}", headers={"User-Agent": "python-requests/testing"}354        )355        content = json.loads(result.content)356        assert lambda_path == content["path"]357        assert lambda_resource == content["resource"]358        assert lambda_request_context_path == content["requestContext"]["path"]359        assert lambda_request_context_resource_path == content["requestContext"]["resourcePath"]360class TestKinesisSource:361    @patch.object(config, "SYNCHRONOUS_KINESIS_EVENTS", False)362    def test_kinesis_lambda_parallelism(363        self,364        lambda_client,365        kinesis_client,366        create_lambda_function,367        kinesis_create_stream,368        wait_for_stream_ready,369        logs_client,370        lambda_su_role,371    ):372        function_name = f"lambda_func-{short_uid()}"373        stream_name = f"test-foobar-{short_uid()}"374        create_lambda_function(375            handler_file=TEST_LAMBDA_PARALLEL_FILE,376            func_name=function_name,377            runtime=LAMBDA_RUNTIME_PYTHON36,378            role=lambda_su_role,379        )380        kinesis_create_stream(StreamName=stream_name, ShardCount=1)381        stream_arn = kinesis_client.describe_stream(StreamName=stream_name)["StreamDescription"][382            "StreamARN"383        ]384        wait_for_stream_ready(stream_name=stream_name)385        lambda_client.create_event_source_mapping(386            EventSourceArn=stream_arn,387            FunctionName=function_name,388            StartingPosition="TRIM_HORIZON",389            BatchSize=10,390        )391        stream_summary = kinesis_client.describe_stream_summary(StreamName=stream_name)392        assert 1 == stream_summary["StreamDescriptionSummary"]["OpenShardCount"]393        num_events_kinesis = 10394        # assure async call395        start = time.perf_counter()396        kinesis_client.put_records(397            Records=[398                {"Data": '{"batch": 0}', "PartitionKey": f"test_{i}"}399                for i in range(0, num_events_kinesis)400            ],401            StreamName=stream_name,402        )403        assert (time.perf_counter() - start) < 1  # this should not take more than a second404        kinesis_client.put_records(405            Records=[406                {"Data": '{"batch": 1}', "PartitionKey": f"test_{i}"}407                for i in range(0, num_events_kinesis)408            ],409            StreamName=stream_name,410        )411        def get_events():412            events = get_lambda_log_events(413                function_name, regex_filter=r"event.*Records", logs_client=logs_client414            )415            assert len(events) == 2416            return events417        events = retry(get_events, retries=30)418        def assertEvent(event, batch_no):419            assert 10 == len(event["event"]["Records"])420            assert "eventID" in event["event"]["Records"][0]421            assert "eventSourceARN" in event["event"]["Records"][0]422            assert "eventSource" in event["event"]["Records"][0]423            assert "eventVersion" in event["event"]["Records"][0]424            assert "eventName" in event["event"]["Records"][0]425            assert "invokeIdentityArn" in event["event"]["Records"][0]426            assert "awsRegion" in event["event"]["Records"][0]427            assert "kinesis" in event["event"]["Records"][0]428            assert {"batch": batch_no} == json.loads(429                base64.b64decode(event["event"]["Records"][0]["kinesis"]["data"]).decode(430                    config.DEFAULT_ENCODING431                )432            )433        assertEvent(events[0], 0)434        assertEvent(events[1], 1)...test_lambda_size_lims.py
Source:test_lambda_size_lims.py  
1import os2from io import BytesIO3import pytest4from botocore.exceptions import ClientError5from localstack.services.awslambda.lambda_api import LAMBDA_DEFAULT_HANDLER6from localstack.services.awslambda.lambda_utils import LAMBDA_RUNTIME_PYTHON377from localstack.utils import testutil8from localstack.utils.common import short_uid9THIS_FOLDER = os.path.dirname(os.path.realpath(__file__))10TEST_LAMBDA_PYTHON_ECHO = os.path.join(THIS_FOLDER, "functions", "lambda_echo.py")11FUNCTION_MAX_UNZIPPED_SIZE = 26214400012def generate_sized_python_str(size):13    """Generate a text file of the specified size."""14    with open(TEST_LAMBDA_PYTHON_ECHO, "r") as f:15        py_str = f.read()16    py_str += "#" * (size - len(py_str))17    return py_str18class TestLambdaSizeLimits:19    @pytest.mark.aws_validated20    def test_oversized_lambda(self, lambda_client, s3_client, s3_bucket, lambda_su_role):21        function_name = f"test_lambda_{short_uid()}"22        bucket_key = "test_lambda.zip"23        code_str = generate_sized_python_str(FUNCTION_MAX_UNZIPPED_SIZE)24        # upload zip file to S325        zip_file = testutil.create_lambda_archive(26            code_str, get_content=True, runtime=LAMBDA_RUNTIME_PYTHON3727        )28        s3_client.upload_fileobj(BytesIO(zip_file), s3_bucket, bucket_key)29        # create lambda function30        with pytest.raises(ClientError) as e:31            lambda_client.create_function(32                FunctionName=function_name,33                Runtime=LAMBDA_RUNTIME_PYTHON37,34                Handler=LAMBDA_DEFAULT_HANDLER,35                Role=lambda_su_role,36                Code={"S3Bucket": s3_bucket, "S3Key": bucket_key},37                Timeout=10,38            )39        assert e40        assert e.value.response["ResponseMetadata"]["HTTPStatusCode"] == 40041        e.match(42            r"An error occurred \(InvalidParameterValueException\) when calling the CreateFunction operation\: Unzipped size must be smaller than [0-9]* bytes"43        )44    @pytest.mark.aws_validated45    def test_large_lambda(self, lambda_client, s3_client, s3_bucket, lambda_su_role):46        function_name = f"test_lambda_{short_uid()}"47        bucket_key = "test_lambda.zip"48        code_str = generate_sized_python_str(FUNCTION_MAX_UNZIPPED_SIZE - 1000)49        # upload zip file to S350        zip_file = testutil.create_lambda_archive(51            code_str, get_content=True, runtime=LAMBDA_RUNTIME_PYTHON3752        )53        s3_client.upload_fileobj(BytesIO(zip_file), s3_bucket, bucket_key)54        # create lambda function55        try:56            result = lambda_client.create_function(57                FunctionName=function_name,58                Runtime=LAMBDA_RUNTIME_PYTHON37,59                Handler=LAMBDA_DEFAULT_HANDLER,60                Role=lambda_su_role,61                Code={"S3Bucket": s3_bucket, "S3Key": bucket_key},62                Timeout=10,63            )64            function_arn = result["FunctionArn"]65            assert testutil.response_arn_matches_partition(lambda_client, function_arn)66        finally:...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
