Best Python code snippet using localstack_python
test_lambda_integration.py
Source: test_lambda_integration.py
...360            )361            event_source_mapping_uuid = result["UUID"]362            _await_event_source_mapping_enabled(lambda_client, event_source_mapping_uuid)363            dynamodb_client.put_item(TableName=table_name, Item=item)364            def verify_failure_received():365                res = sqs_client.receive_message(QueueUrl=destination_queue)366                msg = res["Messages"][0]367                body = json.loads(msg["Body"])368                assert body["requestContext"]["condition"] == "RetryAttemptsExhausted"369                assert body["DDBStreamBatchInfo"]["batchSize"] == 1370                assert body["DDBStreamBatchInfo"]["streamArn"] in stream_arn371            retry(verify_failure_received, retries=5, sleep=5, sleep_before=5)372        finally:373            lambda_client.delete_event_source_mapping(UUID=event_source_mapping_uuid)374class TestLambdaHttpInvocation:375    def test_http_invocation_with_apigw_proxy(self, create_lambda_function):376        lambda_name = f"test_lambda_{short_uid()}"377        lambda_resource = "/api/v1/{proxy+}"378        lambda_path = "/api/v1/hello/world"379        lambda_request_context_path = "/" + TEST_STAGE_NAME + lambda_path380        lambda_request_context_resource_path = lambda_resource381        create_lambda_function(382            func_name=lambda_name,383            handler_file=TEST_LAMBDA_PYTHON,384            libs=TEST_LAMBDA_LIBS,385        )386        # create API Gateway and connect it to the Lambda proxy backend387        lambda_uri = aws_stack.lambda_function_arn(lambda_name)388        target_uri = f"arn:aws:apigateway:{aws_stack.get_region()}:lambda:path/2015-03-31/functions/{lambda_uri}/invocations"389        result = testutil.connect_api_gateway_to_http_with_lambda_proxy(390            "test_gateway2",391            target_uri,392            path=lambda_resource,393            stage_name=TEST_STAGE_NAME,394        )395        api_id = result["id"]396        url = 
path_based_url(api_id=api_id, stage_name=TEST_STAGE_NAME, path=lambda_path)397        result = safe_requests.post(398            url, data=b"{}", headers={"User-Agent": "python-requests/testing"}399        )400        content = json.loads(result.content)401        assert lambda_path == content["path"]402        assert lambda_resource == content["resource"]403        assert lambda_request_context_path == content["requestContext"]["path"]404        assert lambda_request_context_resource_path == content["requestContext"]["resourcePath"]405class TestKinesisSource:406    def test_create_kinesis_event_source_mapping(407        self,408        create_lambda_function,409        lambda_client,410        kinesis_client,411        kinesis_create_stream,412        lambda_su_role,413        wait_for_stream_ready,414        logs_client,415    ):416        function_name = f"lambda_func-{short_uid()}"417        stream_name = f"test-foobar-{short_uid()}"418        record_data = "hello"419        num_events_kinesis = 10420        try:421            create_lambda_function(422                func_name=function_name,423                handler_file=TEST_LAMBDA_PYTHON_ECHO,424                runtime=LAMBDA_RUNTIME_PYTHON36,425                role=lambda_su_role,426            )427            kinesis_create_stream(StreamName=stream_name, ShardCount=1)428            wait_for_stream_ready(stream_name=stream_name)429            stream_summary = kinesis_client.describe_stream_summary(StreamName=stream_name)430            assert stream_summary["StreamDescriptionSummary"]["OpenShardCount"] == 1431            stream_arn = kinesis_client.describe_stream(StreamName=stream_name)[432                "StreamDescription"433            ]["StreamARN"]434            uuid = lambda_client.create_event_source_mapping(435                EventSourceArn=stream_arn, FunctionName=function_name, StartingPosition="LATEST"436            )["UUID"]437            _await_event_source_mapping_enabled(lambda_client, 
uuid)438            kinesis_client.put_records(439                Records=[440                    {"Data": record_data, "PartitionKey": f"test_{i}"}441                    for i in range(0, num_events_kinesis)442                ],443                StreamName=stream_name,444            )445            events = _get_lambda_invocation_events(446                logs_client, function_name, expected_num_events=1447            )448            records = events[0]["Records"]449            assert len(records) == num_events_kinesis450            for record in records:451                assert "eventID" in record452                assert "eventSourceARN" in record453                assert "eventSource" in record454                assert "eventVersion" in record455                assert "eventName" in record456                assert "invokeIdentityArn" in record457                assert "awsRegion" in record458                assert "kinesis" in record459                actual_record_data = base64.b64decode(record["kinesis"]["data"]).decode("utf-8")460                assert actual_record_data == record_data461        finally:462            lambda_client.delete_event_source_mapping(UUID=uuid)463    @patch.object(config, "SYNCHRONOUS_KINESIS_EVENTS", False)464    def test_kinesis_event_source_mapping_with_async_invocation(465        self,466        lambda_client,467        kinesis_client,468        create_lambda_function,469        kinesis_create_stream,470        wait_for_stream_ready,471        logs_client,472        lambda_su_role,473    ):474        # TODO: this test will fail if `log_cli=true` is set and `LAMBDA_EXECUTOR=local`!475        # apparently this particular configuration prevents lambda logs from being extracted properly, giving the476        # appearance that the function was never invoked.477        try:478            function_name = f"lambda_func-{short_uid()}"479            stream_name = f"test-foobar-{short_uid()}"480            num_records_per_batch = 10481   
         num_batches = 2482            create_lambda_function(483                handler_file=TEST_LAMBDA_PARALLEL_FILE,484                func_name=function_name,485                runtime=LAMBDA_RUNTIME_PYTHON36,486                role=lambda_su_role,487            )488            kinesis_create_stream(StreamName=stream_name, ShardCount=1)489            stream_arn = kinesis_client.describe_stream(StreamName=stream_name)[490                "StreamDescription"491            ]["StreamARN"]492            wait_for_stream_ready(stream_name=stream_name)493            stream_summary = kinesis_client.describe_stream_summary(StreamName=stream_name)494            assert stream_summary["StreamDescriptionSummary"]["OpenShardCount"] == 1495            uuid = lambda_client.create_event_source_mapping(496                EventSourceArn=stream_arn,497                FunctionName=function_name,498                StartingPosition="LATEST",499                BatchSize=num_records_per_batch,500            )["UUID"]501            _await_event_source_mapping_enabled(lambda_client, uuid)502            for i in range(num_batches):503                start = time.perf_counter()504                kinesis_client.put_records(505                    Records=[506                        {"Data": json.dumps({"record_id": j}), "PartitionKey": f"test_{i}"}507                        for j in range(0, num_records_per_batch)508                    ],509                    StreamName=stream_name,510                )511                assert (time.perf_counter() - start) < 1  # this should not take more than a second512            invocation_events = _get_lambda_invocation_events(513                logs_client, function_name, expected_num_events=num_batches514            )515            for i in range(num_batches):516                event = invocation_events[i]517                assert len(event["event"]["Records"]) == num_records_per_batch518                actual_record_ids = []519                for 
record in event["event"]["Records"]:520                    assert "eventID" in record521                    assert "eventSourceARN" in record522                    assert "eventSource" in record523                    assert "eventVersion" in record524                    assert "eventName" in record525                    assert "invokeIdentityArn" in record526                    assert "awsRegion" in record527                    assert "kinesis" in record528                    record_data = base64.b64decode(record["kinesis"]["data"]).decode("utf-8")529                    actual_record_id = json.loads(record_data)["record_id"]530                    actual_record_ids.append(actual_record_id)531                actual_record_ids.sort()532                assert actual_record_ids == [i for i in range(num_records_per_batch)]533            assert (534                invocation_events[1]["executionStart"] - invocation_events[0]["executionStart"]535            ) > 5536        finally:537            lambda_client.delete_event_source_mapping(UUID=uuid)538    def test_kinesis_event_source_trim_horizon(539        self,540        lambda_client,541        kinesis_client,542        create_lambda_function,543        kinesis_create_stream,544        wait_for_stream_ready,545        logs_client,546        lambda_su_role,547    ):548        function_name = f"lambda_func-{short_uid()}"549        stream_name = f"test-foobar-{short_uid()}"550        num_records_per_batch = 10551        num_batches = 3552        try:553            create_lambda_function(554                handler_file=TEST_LAMBDA_PARALLEL_FILE,555                func_name=function_name,556                runtime=LAMBDA_RUNTIME_PYTHON36,557                role=lambda_su_role,558            )559            kinesis_create_stream(StreamName=stream_name, ShardCount=1)560            stream_arn = kinesis_client.describe_stream(StreamName=stream_name)[561                "StreamDescription"562            ]["StreamARN"]563            
wait_for_stream_ready(stream_name=stream_name)564            stream_summary = kinesis_client.describe_stream_summary(StreamName=stream_name)565            assert stream_summary["StreamDescriptionSummary"]["OpenShardCount"] == 1566            # insert some records before event source mapping created567            for i in range(num_batches - 1):568                kinesis_client.put_records(569                    Records=[570                        {"Data": json.dumps({"record_id": j}), "PartitionKey": f"test_{i}"}571                        for j in range(0, num_records_per_batch)572                    ],573                    StreamName=stream_name,574                )575            uuid = lambda_client.create_event_source_mapping(576                EventSourceArn=stream_arn,577                FunctionName=function_name,578                StartingPosition="TRIM_HORIZON",579                BatchSize=num_records_per_batch,580            )["UUID"]581            # insert some more records582            kinesis_client.put_records(583                Records=[584                    {"Data": json.dumps({"record_id": i}), "PartitionKey": f"test_{num_batches}"}585                    for i in range(0, num_records_per_batch)586                ],587                StreamName=stream_name,588            )589            invocation_events = _get_lambda_invocation_events(590                logs_client, function_name, expected_num_events=num_batches591            )592            for i in range(num_batches):593                event = invocation_events[i]594                assert len(event["event"]["Records"]) == num_records_per_batch595                actual_record_ids = []596                for record in event["event"]["Records"]:597                    assert "eventID" in record598                    assert "eventSourceARN" in record599                    assert "eventSource" in record600                    assert "eventVersion" in record601                    assert "eventName" in 
record602                    assert "invokeIdentityArn" in record603                    assert "awsRegion" in record604                    assert "kinesis" in record605                    record_data = base64.b64decode(record["kinesis"]["data"]).decode("utf-8")606                    actual_record_id = json.loads(record_data)["record_id"]607                    actual_record_ids.append(actual_record_id)608                actual_record_ids.sort()609                assert actual_record_ids == [i for i in range(num_records_per_batch)]610        finally:611            lambda_client.delete_event_source_mapping(UUID=uuid)612    def test_disable_kinesis_event_source_mapping(613        self,614        lambda_client,615        kinesis_client,616        create_lambda_function,617        kinesis_create_stream,618        wait_for_stream_ready,619        logs_client,620        lambda_su_role,621    ):622        function_name = f"lambda_func-{short_uid()}"623        stream_name = f"test-foobar-{short_uid()}"624        num_records_per_batch = 10625        try:626            create_lambda_function(627                handler_file=TEST_LAMBDA_PYTHON_ECHO,628                func_name=function_name,629                runtime=LAMBDA_RUNTIME_PYTHON36,630                role=lambda_su_role,631            )632            kinesis_create_stream(StreamName=stream_name, ShardCount=1)633            stream_arn = kinesis_client.describe_stream(StreamName=stream_name)[634                "StreamDescription"635            ]["StreamARN"]636            wait_for_stream_ready(stream_name=stream_name)637            event_source_uuid = lambda_client.create_event_source_mapping(638                EventSourceArn=stream_arn,639                FunctionName=function_name,640                StartingPosition="LATEST",641                BatchSize=num_records_per_batch,642            )["UUID"]643            _await_event_source_mapping_enabled(lambda_client, event_source_uuid)644            
kinesis_client.put_records(645                Records=[646                    {"Data": json.dumps({"record_id": i}), "PartitionKey": "test"}647                    for i in range(0, num_records_per_batch)648                ],649                StreamName=stream_name,650            )651            events = _get_lambda_invocation_events(652                logs_client, function_name, expected_num_events=1653            )654            assert len(events) == 1655            lambda_client.update_event_source_mapping(UUID=event_source_uuid, Enabled=False)656            time.sleep(2)657            kinesis_client.put_records(658                Records=[659                    {"Data": json.dumps({"record_id": i}), "PartitionKey": "test"}660                    for i in range(0, num_records_per_batch)661                ],662                StreamName=stream_name,663            )664            time.sleep(7)  # wait for records to pass through stream665            # should still only get the first batch from before mapping was disabled666            events = _get_lambda_invocation_events(667                logs_client, function_name, expected_num_events=1668            )669            assert len(events) == 1670        finally:671            lambda_client.delete_event_source_mapping(UUID=event_source_uuid)672    def test_kinesis_event_source_mapping_with_on_failure_destination_config(673        self,674        lambda_client,675        create_lambda_function,676        sqs_client,677        sqs_queue_arn,678        sqs_create_queue,679        create_iam_role_with_policy,680        kinesis_client,681        wait_for_stream_ready,682    ):683        try:684            function_name = f"lambda_func-{short_uid()}"685            role = f"test-lambda-role-{short_uid()}"686            policy_name = f"test-lambda-policy-{short_uid()}"687            kinesis_name = f"test-kinesis-{short_uid()}"688            role_arn = create_iam_role_with_policy(689                RoleName=role,690          
      PolicyName=policy_name,691                RoleDefinition=lambda_role,692                PolicyDefinition=s3_lambda_permission,693            )694            create_lambda_function(695                handler_file=TEST_LAMBDA_PYTHON,696                func_name=function_name,697                runtime=LAMBDA_RUNTIME_PYTHON37,698                role=role_arn,699            )700            kinesis_client.create_stream(StreamName=kinesis_name, ShardCount=1)701            result = kinesis_client.describe_stream(StreamName=kinesis_name)["StreamDescription"]702            kinesis_arn = result["StreamARN"]703            wait_for_stream_ready(stream_name=kinesis_name)704            queue_event_source_mapping = sqs_create_queue()705            destination_queue = sqs_queue_arn(queue_event_source_mapping)706            destination_config = {"OnFailure": {"Destination": destination_queue}}707            message = {708                "input": "hello",709                "value": "world",710                lambda_integration.MSG_BODY_RAISE_ERROR_FLAG: 1,711            }712            result = lambda_client.create_event_source_mapping(713                FunctionName=function_name,714                BatchSize=1,715                StartingPosition="LATEST",716                EventSourceArn=kinesis_arn,717                MaximumBatchingWindowInSeconds=1,718                MaximumRetryAttempts=1,719                DestinationConfig=destination_config,720            )721            event_source_mapping_uuid = result["UUID"]722            _await_event_source_mapping_enabled(lambda_client, event_source_mapping_uuid)723            kinesis_client.put_record(724                StreamName=kinesis_name, Data=to_bytes(json.dumps(message)), PartitionKey="custom"725            )726            def verify_failure_received():727                result = sqs_client.receive_message(QueueUrl=queue_event_source_mapping)728                msg = result["Messages"][0]729                body = 
json.loads(msg["Body"])730                assert body["requestContext"]["condition"] == "RetryAttemptsExhausted"731                assert body["KinesisBatchInfo"]["batchSize"] == 1732                assert body["KinesisBatchInfo"]["streamArn"] == kinesis_arn733            retry(verify_failure_received, retries=50, sleep=5, sleep_before=5)734        finally:735            kinesis_client.delete_stream(StreamName=kinesis_name, EnforceConsumerDeletion=True)736            lambda_client.delete_event_source_mapping(UUID=event_source_mapping_uuid)737def _await_event_source_mapping_enabled(lambda_client, uuid, retries=30):738    def assert_mapping_enabled():739        assert lambda_client.get_event_source_mapping(UUID=uuid)["State"] == "Enabled"740    retry(assert_mapping_enabled, sleep_before=2, retries=retries)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to the video tutorials on the LambdaTest YouTube channel to get step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!
