Best Python code snippet using localstack_python
test_lambda_integration.py
Source:test_lambda_integration.py  
...441                    for i in range(0, num_events_kinesis)442                ],443                StreamName=stream_name,444            )445            events = _get_lambda_invocation_events(446                logs_client, function_name, expected_num_events=1447            )448            records = events[0]["Records"]449            assert len(records) == num_events_kinesis450            for record in records:451                assert "eventID" in record452                assert "eventSourceARN" in record453                assert "eventSource" in record454                assert "eventVersion" in record455                assert "eventName" in record456                assert "invokeIdentityArn" in record457                assert "awsRegion" in record458                assert "kinesis" in record459                actual_record_data = base64.b64decode(record["kinesis"]["data"]).decode("utf-8")460                assert actual_record_data == record_data461        finally:462            lambda_client.delete_event_source_mapping(UUID=uuid)463    @patch.object(config, "SYNCHRONOUS_KINESIS_EVENTS", False)464    def test_kinesis_event_source_mapping_with_async_invocation(465        self,466        lambda_client,467        kinesis_client,468        create_lambda_function,469        kinesis_create_stream,470        wait_for_stream_ready,471        logs_client,472        lambda_su_role,473    ):474        # TODO: this test will fail if `log_cli=true` is set and `LAMBDA_EXECUTOR=local`!475        # apparently this particular configuration prevents lambda logs from being extracted properly, giving the476        # appearance that the function was never invoked.477        try:478            function_name = f"lambda_func-{short_uid()}"479            stream_name = f"test-foobar-{short_uid()}"480            num_records_per_batch = 10481            num_batches = 2482            create_lambda_function(483                handler_file=TEST_LAMBDA_PARALLEL_FILE,484                
func_name=function_name,485                runtime=LAMBDA_RUNTIME_PYTHON36,486                role=lambda_su_role,487            )488            kinesis_create_stream(StreamName=stream_name, ShardCount=1)489            stream_arn = kinesis_client.describe_stream(StreamName=stream_name)[490                "StreamDescription"491            ]["StreamARN"]492            wait_for_stream_ready(stream_name=stream_name)493            stream_summary = kinesis_client.describe_stream_summary(StreamName=stream_name)494            assert stream_summary["StreamDescriptionSummary"]["OpenShardCount"] == 1495            uuid = lambda_client.create_event_source_mapping(496                EventSourceArn=stream_arn,497                FunctionName=function_name,498                StartingPosition="LATEST",499                BatchSize=num_records_per_batch,500            )["UUID"]501            _await_event_source_mapping_enabled(lambda_client, uuid)502            for i in range(num_batches):503                start = time.perf_counter()504                kinesis_client.put_records(505                    Records=[506                        {"Data": json.dumps({"record_id": j}), "PartitionKey": f"test_{i}"}507                        for j in range(0, num_records_per_batch)508                    ],509                    StreamName=stream_name,510                )511                assert (time.perf_counter() - start) < 1  # this should not take more than a second512            invocation_events = _get_lambda_invocation_events(513                logs_client, function_name, expected_num_events=num_batches514            )515            for i in range(num_batches):516                event = invocation_events[i]517                assert len(event["event"]["Records"]) == num_records_per_batch518                actual_record_ids = []519                for record in event["event"]["Records"]:520                    assert "eventID" in record521                    assert "eventSourceARN" in 
record522                    assert "eventSource" in record523                    assert "eventVersion" in record524                    assert "eventName" in record525                    assert "invokeIdentityArn" in record526                    assert "awsRegion" in record527                    assert "kinesis" in record528                    record_data = base64.b64decode(record["kinesis"]["data"]).decode("utf-8")529                    actual_record_id = json.loads(record_data)["record_id"]530                    actual_record_ids.append(actual_record_id)531                actual_record_ids.sort()532                assert actual_record_ids == [i for i in range(num_records_per_batch)]533            assert (534                invocation_events[1]["executionStart"] - invocation_events[0]["executionStart"]535            ) > 5536        finally:537            lambda_client.delete_event_source_mapping(UUID=uuid)538    def test_kinesis_event_source_trim_horizon(539        self,540        lambda_client,541        kinesis_client,542        create_lambda_function,543        kinesis_create_stream,544        wait_for_stream_ready,545        logs_client,546        lambda_su_role,547    ):548        function_name = f"lambda_func-{short_uid()}"549        stream_name = f"test-foobar-{short_uid()}"550        num_records_per_batch = 10551        num_batches = 3552        try:553            create_lambda_function(554                handler_file=TEST_LAMBDA_PARALLEL_FILE,555                func_name=function_name,556                runtime=LAMBDA_RUNTIME_PYTHON36,557                role=lambda_su_role,558            )559            kinesis_create_stream(StreamName=stream_name, ShardCount=1)560            stream_arn = kinesis_client.describe_stream(StreamName=stream_name)[561                "StreamDescription"562            ]["StreamARN"]563            wait_for_stream_ready(stream_name=stream_name)564            stream_summary = 
kinesis_client.describe_stream_summary(StreamName=stream_name)565            assert stream_summary["StreamDescriptionSummary"]["OpenShardCount"] == 1566            # insert some records before event source mapping created567            for i in range(num_batches - 1):568                kinesis_client.put_records(569                    Records=[570                        {"Data": json.dumps({"record_id": j}), "PartitionKey": f"test_{i}"}571                        for j in range(0, num_records_per_batch)572                    ],573                    StreamName=stream_name,574                )575            uuid = lambda_client.create_event_source_mapping(576                EventSourceArn=stream_arn,577                FunctionName=function_name,578                StartingPosition="TRIM_HORIZON",579                BatchSize=num_records_per_batch,580            )["UUID"]581            # insert some more records582            kinesis_client.put_records(583                Records=[584                    {"Data": json.dumps({"record_id": i}), "PartitionKey": f"test_{num_batches}"}585                    for i in range(0, num_records_per_batch)586                ],587                StreamName=stream_name,588            )589            invocation_events = _get_lambda_invocation_events(590                logs_client, function_name, expected_num_events=num_batches591            )592            for i in range(num_batches):593                event = invocation_events[i]594                assert len(event["event"]["Records"]) == num_records_per_batch595                actual_record_ids = []596                for record in event["event"]["Records"]:597                    assert "eventID" in record598                    assert "eventSourceARN" in record599                    assert "eventSource" in record600                    assert "eventVersion" in record601                    assert "eventName" in record602                    assert "invokeIdentityArn" in record603             
       assert "awsRegion" in record604                    assert "kinesis" in record605                    record_data = base64.b64decode(record["kinesis"]["data"]).decode("utf-8")606                    actual_record_id = json.loads(record_data)["record_id"]607                    actual_record_ids.append(actual_record_id)608                actual_record_ids.sort()609                assert actual_record_ids == [i for i in range(num_records_per_batch)]610        finally:611            lambda_client.delete_event_source_mapping(UUID=uuid)612    def test_disable_kinesis_event_source_mapping(613        self,614        lambda_client,615        kinesis_client,616        create_lambda_function,617        kinesis_create_stream,618        wait_for_stream_ready,619        logs_client,620        lambda_su_role,621    ):622        function_name = f"lambda_func-{short_uid()}"623        stream_name = f"test-foobar-{short_uid()}"624        num_records_per_batch = 10625        try:626            create_lambda_function(627                handler_file=TEST_LAMBDA_PYTHON_ECHO,628                func_name=function_name,629                runtime=LAMBDA_RUNTIME_PYTHON36,630                role=lambda_su_role,631            )632            kinesis_create_stream(StreamName=stream_name, ShardCount=1)633            stream_arn = kinesis_client.describe_stream(StreamName=stream_name)[634                "StreamDescription"635            ]["StreamARN"]636            wait_for_stream_ready(stream_name=stream_name)637            event_source_uuid = lambda_client.create_event_source_mapping(638                EventSourceArn=stream_arn,639                FunctionName=function_name,640                StartingPosition="LATEST",641                BatchSize=num_records_per_batch,642            )["UUID"]643            _await_event_source_mapping_enabled(lambda_client, event_source_uuid)644            kinesis_client.put_records(645                Records=[646                    {"Data": 
json.dumps({"record_id": i}), "PartitionKey": "test"}647                    for i in range(0, num_records_per_batch)648                ],649                StreamName=stream_name,650            )651            events = _get_lambda_invocation_events(652                logs_client, function_name, expected_num_events=1653            )654            assert len(events) == 1655            lambda_client.update_event_source_mapping(UUID=event_source_uuid, Enabled=False)656            time.sleep(2)657            kinesis_client.put_records(658                Records=[659                    {"Data": json.dumps({"record_id": i}), "PartitionKey": "test"}660                    for i in range(0, num_records_per_batch)661                ],662                StreamName=stream_name,663            )664            time.sleep(7)  # wait for records to pass through stream665            # should still only get the first batch from before mapping was disabled666            events = _get_lambda_invocation_events(667                logs_client, function_name, expected_num_events=1668            )669            assert len(events) == 1670        finally:671            lambda_client.delete_event_source_mapping(UUID=event_source_uuid)672    def test_kinesis_event_source_mapping_with_on_failure_destination_config(673        self,674        lambda_client,675        create_lambda_function,676        sqs_client,677        sqs_queue_arn,678        sqs_create_queue,679        create_iam_role_with_policy,680        kinesis_client,681        wait_for_stream_ready,682    ):683        try:684            function_name = f"lambda_func-{short_uid()}"685            role = f"test-lambda-role-{short_uid()}"686            policy_name = f"test-lambda-policy-{short_uid()}"687            kinesis_name = f"test-kinesis-{short_uid()}"688            role_arn = create_iam_role_with_policy(689                RoleName=role,690                PolicyName=policy_name,691                RoleDefinition=lambda_role,692         
       PolicyDefinition=s3_lambda_permission,693            )694            create_lambda_function(695                handler_file=TEST_LAMBDA_PYTHON,696                func_name=function_name,697                runtime=LAMBDA_RUNTIME_PYTHON37,698                role=role_arn,699            )700            kinesis_client.create_stream(StreamName=kinesis_name, ShardCount=1)701            result = kinesis_client.describe_stream(StreamName=kinesis_name)["StreamDescription"]702            kinesis_arn = result["StreamARN"]703            wait_for_stream_ready(stream_name=kinesis_name)704            queue_event_source_mapping = sqs_create_queue()705            destination_queue = sqs_queue_arn(queue_event_source_mapping)706            destination_config = {"OnFailure": {"Destination": destination_queue}}707            message = {708                "input": "hello",709                "value": "world",710                lambda_integration.MSG_BODY_RAISE_ERROR_FLAG: 1,711            }712            result = lambda_client.create_event_source_mapping(713                FunctionName=function_name,714                BatchSize=1,715                StartingPosition="LATEST",716                EventSourceArn=kinesis_arn,717                MaximumBatchingWindowInSeconds=1,718                MaximumRetryAttempts=1,719                DestinationConfig=destination_config,720            )721            event_source_mapping_uuid = result["UUID"]722            _await_event_source_mapping_enabled(lambda_client, event_source_mapping_uuid)723            kinesis_client.put_record(724                StreamName=kinesis_name, Data=to_bytes(json.dumps(message)), PartitionKey="custom"725            )726            def verify_failure_received():727                result = sqs_client.receive_message(QueueUrl=queue_event_source_mapping)728                msg = result["Messages"][0]729                body = json.loads(msg["Body"])730                assert body["requestContext"]["condition"] == 
"RetryAttemptsExhausted"731                assert body["KinesisBatchInfo"]["batchSize"] == 1732                assert body["KinesisBatchInfo"]["streamArn"] == kinesis_arn733            retry(verify_failure_received, retries=50, sleep=5, sleep_before=5)734        finally:735            kinesis_client.delete_stream(StreamName=kinesis_name, EnforceConsumerDeletion=True)736            lambda_client.delete_event_source_mapping(UUID=event_source_mapping_uuid)737def _await_event_source_mapping_enabled(lambda_client, uuid, retries=30):738    def assert_mapping_enabled():739        assert lambda_client.get_event_source_mapping(UUID=uuid)["State"] == "Enabled"740    retry(assert_mapping_enabled, sleep_before=2, retries=retries)741def _await_dynamodb_table_active(dynamodb_client, table_name, retries=6):742    def assert_table_active():743        assert (744            dynamodb_client.describe_table(TableName=table_name)["Table"]["TableStatus"] == "ACTIVE"745        )746    retry(assert_table_active, retries=retries, sleep_before=2)747def _get_lambda_invocation_events(logs_client, function_name, expected_num_events, retries=30):748    def get_events():749        events = get_lambda_log_events(function_name, logs_client=logs_client)750        assert len(events) == expected_num_events751        return events...test_lambda_integration_kinesis.py
Source:test_lambda_integration_kinesis.py  
...100                    for i in range(0, num_events_kinesis)101                ],102                StreamName=stream_name,103            )104            return _get_lambda_invocation_events(105                logs_client, function_name, expected_num_events=1, retries=5106            )107        # need to retry here in case the LATEST StartingPosition of the event source mapping does not catch records108        events = retry(_send_and_receive_messages, retries=3)109        records = events[0]110        snapshot.match("kinesis_records", records)111    @patch.object(config, "SYNCHRONOUS_KINESIS_EVENTS", False)112    @pytest.mark.aws_validated113    def test_kinesis_event_source_mapping_with_async_invocation(114        self,115        lambda_client,116        kinesis_client,117        create_lambda_function,118        kinesis_create_stream,119        wait_for_stream_ready,120        logs_client,121        lambda_su_role,122        cleanups,123        snapshot,124    ):125        # TODO: this test will fail if `log_cli=true` is set and `LAMBDA_EXECUTOR=local`!126        # apparently this particular configuration prevents lambda logs from being extracted properly, giving the127        # appearance that the function was never invoked.128        function_name = f"lambda_func-{short_uid()}"129        stream_name = f"test-foobar-{short_uid()}"130        num_records_per_batch = 10131        num_batches = 2132        create_lambda_function(133            handler_file=TEST_LAMBDA_PARALLEL_FILE,134            func_name=function_name,135            runtime=LAMBDA_RUNTIME_PYTHON39,136            role=lambda_su_role,137        )138        kinesis_create_stream(StreamName=stream_name, ShardCount=1)139        stream_arn = kinesis_client.describe_stream(StreamName=stream_name)["StreamDescription"][140            "StreamARN"141        ]142        wait_for_stream_ready(stream_name=stream_name)143        stream_summary = 
kinesis_client.describe_stream_summary(StreamName=stream_name)144        assert stream_summary["StreamDescriptionSummary"]["OpenShardCount"] == 1145        create_event_source_mapping_response = lambda_client.create_event_source_mapping(146            EventSourceArn=stream_arn,147            FunctionName=function_name,148            StartingPosition="LATEST",149            BatchSize=num_records_per_batch,150        )151        snapshot.match("create_event_source_mapping_response", create_event_source_mapping_response)152        uuid = create_event_source_mapping_response["UUID"]153        cleanups.append(lambda: lambda_client.delete_event_source_mapping(UUID=uuid))154        _await_event_source_mapping_enabled(lambda_client, uuid)155        def _send_and_receive_messages():156            for i in range(num_batches):157                start = time.perf_counter()158                kinesis_client.put_records(159                    Records=[160                        {"Data": json.dumps({"record_id": j}), "PartitionKey": f"test_{i}"}161                        for j in range(0, num_records_per_batch)162                    ],163                    StreamName=stream_name,164                )165                assert (time.perf_counter() - start) < 1  # this should not take more than a second166            return _get_lambda_invocation_events(167                logs_client, function_name, expected_num_events=num_batches, retries=5168            )169        # need to retry here in case the LATEST StartingPosition of the event source mapping does not catch records170        invocation_events = retry(_send_and_receive_messages, retries=3)171        snapshot.match("invocation_events", invocation_events)172        assert (invocation_events[1]["executionStart"] - invocation_events[0]["executionStart"]) > 5173    @pytest.mark.aws_validated174    def test_kinesis_event_source_trim_horizon(175        self,176        lambda_client,177        kinesis_client,178        
create_lambda_function,179        kinesis_create_stream,180        wait_for_stream_ready,181        logs_client,182        lambda_su_role,183        cleanups,184        snapshot,185    ):186        function_name = f"lambda_func-{short_uid()}"187        stream_name = f"test-foobar-{short_uid()}"188        num_records_per_batch = 10189        num_batches = 3190        create_lambda_function(191            handler_file=TEST_LAMBDA_PARALLEL_FILE,192            func_name=function_name,193            runtime=LAMBDA_RUNTIME_PYTHON39,194            role=lambda_su_role,195        )196        kinesis_create_stream(StreamName=stream_name, ShardCount=1)197        stream_arn = kinesis_client.describe_stream(StreamName=stream_name)["StreamDescription"][198            "StreamARN"199        ]200        wait_for_stream_ready(stream_name=stream_name)201        stream_summary = kinesis_client.describe_stream_summary(StreamName=stream_name)202        assert stream_summary["StreamDescriptionSummary"]["OpenShardCount"] == 1203        # insert some records before event source mapping created204        for i in range(num_batches - 1):205            kinesis_client.put_records(206                Records=[207                    {"Data": json.dumps({"record_id": j}), "PartitionKey": f"test_{i}"}208                    for j in range(0, num_records_per_batch)209                ],210                StreamName=stream_name,211            )212        create_event_source_mapping_response = lambda_client.create_event_source_mapping(213            EventSourceArn=stream_arn,214            FunctionName=function_name,215            StartingPosition="TRIM_HORIZON",216            BatchSize=num_records_per_batch,217        )218        snapshot.match("create_event_source_mapping_response", create_event_source_mapping_response)219        uuid = create_event_source_mapping_response["UUID"]220        cleanups.append(lambda: lambda_client.delete_event_source_mapping(UUID=uuid))221        # insert some more 
records222        kinesis_client.put_records(223            Records=[224                {"Data": json.dumps({"record_id": i}), "PartitionKey": f"test_{num_batches}"}225                for i in range(0, num_records_per_batch)226            ],227            StreamName=stream_name,228        )229        invocation_events = _get_lambda_invocation_events(230            logs_client, function_name, expected_num_events=num_batches231        )232        snapshot.match("invocation_events", invocation_events)233    @pytest.mark.aws_validated234    def test_disable_kinesis_event_source_mapping(235        self,236        lambda_client,237        kinesis_client,238        create_lambda_function,239        kinesis_create_stream,240        wait_for_stream_ready,241        logs_client,242        lambda_su_role,243        cleanups,244        snapshot,245    ):246        function_name = f"lambda_func-{short_uid()}"247        stream_name = f"test-foobar-{short_uid()}"248        num_records_per_batch = 10249        create_lambda_function(250            handler_file=TEST_LAMBDA_PYTHON_ECHO,251            func_name=function_name,252            runtime=LAMBDA_RUNTIME_PYTHON39,253            role=lambda_su_role,254        )255        kinesis_create_stream(StreamName=stream_name, ShardCount=1)256        stream_arn = kinesis_client.describe_stream(StreamName=stream_name)["StreamDescription"][257            "StreamARN"258        ]259        wait_for_stream_ready(stream_name=stream_name)260        create_event_source_mapping_response = lambda_client.create_event_source_mapping(261            EventSourceArn=stream_arn,262            FunctionName=function_name,263            StartingPosition="LATEST",264            BatchSize=num_records_per_batch,265        )266        snapshot.match("create_event_source_mapping_response", create_event_source_mapping_response)267        event_source_uuid = create_event_source_mapping_response["UUID"]268        cleanups.append(lambda: 
lambda_client.delete_event_source_mapping(UUID=event_source_uuid))269        _await_event_source_mapping_enabled(lambda_client, event_source_uuid)270        def _send_and_receive_messages():271            kinesis_client.put_records(272                Records=[273                    {"Data": json.dumps({"record_id": i}), "PartitionKey": "test"}274                    for i in range(0, num_records_per_batch)275                ],276                StreamName=stream_name,277            )278            return _get_lambda_invocation_events(279                logs_client, function_name, expected_num_events=1, retries=10280            )281        invocation_events = retry(_send_and_receive_messages, retries=3)282        snapshot.match("invocation_events", invocation_events)283        lambda_client.update_event_source_mapping(UUID=event_source_uuid, Enabled=False)284        _await_event_source_mapping_state(lambda_client, event_source_uuid, state="Disabled")285        # we need to wait here, so the event source mapping is for sure disabled, sadly the state is no real indication286        if is_aws_cloud():287            time.sleep(60)288        kinesis_client.put_records(289            Records=[290                {"Data": json.dumps({"record_id_disabled": i}), "PartitionKey": "test"}291                for i in range(0, num_records_per_batch)292            ],293            StreamName=stream_name,294        )295        time.sleep(7)  # wait for records to pass through stream296        # should still only get the first batch from before mapping was disabled297        _get_lambda_invocation_events(logs_client, function_name, expected_num_events=1, retries=10)298    @pytest.mark.skip_snapshot_verify(299        paths=[300            "$..Messages..Body.KinesisBatchInfo.approximateArrivalOfFirstRecord",301            "$..Messages..Body.KinesisBatchInfo.approximateArrivalOfLastRecord",302            "$..Messages..Body.KinesisBatchInfo.shardId",303            
"$..Messages..Body.KinesisBatchInfo.streamArn",304            "$..Messages..Body.requestContext.approximateInvokeCount",305            "$..Messages..Body.requestContext.functionArn",306            "$..Messages..Body.responseContext.statusCode",307            # destination config arn missing, which leads to those having wrong resource ids308            "$..EventSourceArn",309            "$..FunctionArn",310        ],311        condition=is_old_provider,...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
