Best Python code snippet using localstack_python
test_lambda_integration.py
Source:test_lambda_integration.py  
...96                EventSourceArn=queue_arn_1, FunctionName=function_name97            )98            uuid = rs["UUID"]99            assert BATCH_SIZE_RANGES["sqs"][0] == rs["BatchSize"]100            _await_event_source_mapping_enabled(lambda_client, uuid)101            with pytest.raises(ClientError) as e:102                # Update batch size with invalid value103                lambda_client.update_event_source_mapping(104                    UUID=uuid,105                    FunctionName=function_name,106                    BatchSize=BATCH_SIZE_RANGES["sqs"][1] + 1,107                )108            e.match(INVALID_PARAMETER_VALUE_EXCEPTION)109            queue_url_2 = sqs_create_queue(QueueName=queue_name_2)110            queue_arn_2 = sqs_queue_arn(queue_url_2)111            with pytest.raises(ClientError) as e:112                # Create event source mapping with invalid batch size value113                lambda_client.create_event_source_mapping(114                    EventSourceArn=queue_arn_2,115                    FunctionName=function_name,116                    BatchSize=BATCH_SIZE_RANGES["sqs"][1] + 1,117                )118            e.match(INVALID_PARAMETER_VALUE_EXCEPTION)119        finally:120            lambda_client.delete_event_source_mapping(UUID=uuid)121    def test_sqs_event_source_mapping(122        self,123        create_lambda_function,124        lambda_client,125        sqs_client,126        sqs_create_queue,127        sqs_queue_arn,128        logs_client,129        lambda_su_role,130    ):131        function_name = f"lambda_func-{short_uid()}"132        queue_name_1 = f"queue-{short_uid()}-1"133        try:134            create_lambda_function(135                func_name=function_name,136                handler_file=TEST_LAMBDA_PYTHON_ECHO,137                runtime=LAMBDA_RUNTIME_PYTHON36,138                role=lambda_su_role,139            )140            queue_url_1 = sqs_create_queue(QueueName=queue_name_1)141            queue_arn_1 = sqs_queue_arn(queue_url_1)142            mapping_uuid = lambda_client.create_event_source_mapping(143                EventSourceArn=queue_arn_1,144                FunctionName=function_name,145                MaximumBatchingWindowInSeconds=1,146            )["UUID"]147            _await_event_source_mapping_enabled(lambda_client, mapping_uuid)148            sqs_client.send_message(QueueUrl=queue_url_1, MessageBody=json.dumps({"foo": "bar"}))149            retry(150                check_expected_lambda_log_events_length,151                retries=10,152                sleep=1,153                function_name=function_name,154                expected_length=1,155                logs_client=logs_client,156            )157            rs = sqs_client.receive_message(QueueUrl=queue_url_1)158            assert rs.get("Messages") is None159        finally:160            lambda_client.delete_event_source_mapping(UUID=mapping_uuid)161class TestDynamoDBEventSourceMapping:162    def test_dynamodb_event_source_mapping(163        self,164        lambda_client,165        create_lambda_function,166        create_iam_role_with_policy,167        dynamodb_client,168        dynamodb_create_table,169        logs_client,170        check_lambda_logs,171    ):172        def check_logs():173            expected = [174                r'.*"Records":.*',175                r'.*"dynamodb": {(.*)}.*',176                r'.*"eventSource": ("aws:dynamodb").*',177                r'.*"eventName": ("INSERT").*',178                r'.*"Keys": {0}.*'.format(json.dumps(db_item)),179            ]180            check_lambda_logs(function_name, expected_lines=expected)181        function_name = f"lambda_func-{short_uid()}"182        role = f"test-lambda-role-{short_uid()}"183        policy_name = f"test-lambda-policy-{short_uid()}"184        table_name = f"test-table-{short_uid()}"185        partition_key = "my_partition_key"186        db_item = {partition_key: {"S": "hello world"}}187        try:188            role_arn = create_iam_role_with_policy(189                RoleName=role,190                PolicyName=policy_name,191                RoleDefinition=lambda_role,192                PolicyDefinition=s3_lambda_permission,193            )194            create_lambda_function(195                handler_file=TEST_LAMBDA_PYTHON_ECHO,196                func_name=function_name,197                runtime=LAMBDA_RUNTIME_PYTHON37,198                role=role_arn,199            )200            dynamodb_create_table(table_name=table_name, partition_key=partition_key)201            _await_dynamodb_table_active(dynamodb_client, table_name)202            stream_arn = dynamodb_client.update_table(203                TableName=table_name,204                StreamSpecification={"StreamEnabled": True, "StreamViewType": "NEW_IMAGE"},205            )["TableDescription"]["LatestStreamArn"]206            event_source_uuid = lambda_client.create_event_source_mapping(207                FunctionName=function_name,208                BatchSize=1,209                StartingPosition="LATEST",210                EventSourceArn=stream_arn,211                MaximumBatchingWindowInSeconds=1,212                MaximumRetryAttempts=1,213            )["UUID"]214            _await_event_source_mapping_enabled(lambda_client, event_source_uuid)215            dynamodb_client.put_item(TableName=table_name, Item=db_item)216            retry(check_logs, retries=50, sleep=2)217        finally:218            lambda_client.delete_event_source_mapping(UUID=event_source_uuid)219    def test_disabled_dynamodb_event_source_mapping(220        self,221        create_lambda_function,222        lambda_client,223        dynamodb_resource,224        dynamodb_client,225        dynamodb_create_table,226        logs_client,227        dynamodbstreams_client,228        lambda_su_role,229    ):230        def is_stream_enabled():231            return (232                dynamodbstreams_client.describe_stream(StreamArn=latest_stream_arn)[233                    "StreamDescription"234                ]["StreamStatus"]235                == "ENABLED"236            )237        function_name = f"lambda_func-{short_uid()}"238        ddb_table = f"ddb_table-{short_uid()}"239        items = [240            {"id": short_uid(), "data": "data1"},241            {"id": short_uid(), "data": "data2"},242        ]243        try:244            create_lambda_function(245                func_name=function_name,246                handler_file=TEST_LAMBDA_PYTHON_ECHO,247                runtime=LAMBDA_RUNTIME_PYTHON36,248                role=lambda_su_role,249            )250            latest_stream_arn = dynamodb_create_table(251                table_name=ddb_table, partition_key="id", stream_view_type="NEW_IMAGE"252            )["TableDescription"]["LatestStreamArn"]253            rs = lambda_client.create_event_source_mapping(254                FunctionName=function_name,255                EventSourceArn=latest_stream_arn,256                StartingPosition="TRIM_HORIZON",257                MaximumBatchingWindowInSeconds=1,258            )259            uuid = rs["UUID"]260            _await_event_source_mapping_enabled(lambda_client, uuid)261            assert poll_condition(is_stream_enabled, timeout=30)262            table = dynamodb_resource.Table(ddb_table)263            table.put_item(Item=items[0])264            # Lambda should be invoked 1 time265            retry(266                check_expected_lambda_log_events_length,267                retries=10,268                sleep=3,269                function_name=function_name,270                expected_length=1,271                logs_client=logs_client,272            )273            # disable event source mapping274            lambda_client.update_event_source_mapping(UUID=uuid, Enabled=False)275            time.sleep(2)276            table.put_item(Item=items[1])277            # lambda no longer invoked, still have 1 event278            check_expected_lambda_log_events_length(279                expected_length=1, function_name=function_name, logs_client=logs_client280            )281        finally:282            lambda_client.delete_event_source_mapping(UUID=uuid)283    # TODO invalid test against AWS, this behavior just is not correct284    def test_deletion_event_source_mapping_with_dynamodb(285        self, create_lambda_function, lambda_client, dynamodb_client, lambda_su_role286    ):287        function_name = f"lambda_func-{short_uid()}"288        ddb_table = f"ddb_table-{short_uid()}"289        create_lambda_function(290            func_name=function_name,291            handler_file=TEST_LAMBDA_PYTHON_ECHO,292            runtime=LAMBDA_RUNTIME_PYTHON36,293            role=lambda_su_role,294        )295        latest_stream_arn = aws_stack.create_dynamodb_table(296            table_name=ddb_table,297            partition_key="id",298            client=dynamodb_client,299            stream_view_type="NEW_IMAGE",300        )["TableDescription"]["LatestStreamArn"]301        lambda_client.create_event_source_mapping(302            FunctionName=function_name,303            EventSourceArn=latest_stream_arn,304            StartingPosition="TRIM_HORIZON",305        )306        _await_dynamodb_table_active(dynamodb_client, ddb_table)307        dynamodb_client.delete_table(TableName=ddb_table)308        result = lambda_client.list_event_source_mappings(EventSourceArn=latest_stream_arn)309        assert 1 == len(result["EventSourceMappings"])310    def test_dynamodb_event_source_mapping_with_on_failure_destination_config(311        self,312        lambda_client,313        create_lambda_function,314        sqs_client,315        sqs_queue_arn,316        sqs_create_queue,317        create_iam_role_with_policy,318        dynamodb_client,319        dynamodb_create_table,320    ):321        function_name = f"lambda_func-{short_uid()}"322        role = f"test-lambda-role-{short_uid()}"323        policy_name = f"test-lambda-policy-{short_uid()}"324        table_name = f"test-table-{short_uid()}"325        partition_key = "my_partition_key"326        item = {partition_key: {"S": "hello world"}}327        try:328            role_arn = create_iam_role_with_policy(329                RoleName=role,330                PolicyName=policy_name,331                RoleDefinition=lambda_role,332                PolicyDefinition=s3_lambda_permission,333            )334            create_lambda_function(335                handler_file=TEST_LAMBDA_PYTHON_UNHANDLED_ERROR,336                func_name=function_name,337                runtime=LAMBDA_RUNTIME_PYTHON37,338                role=role_arn,339            )340            dynamodb_create_table(table_name=table_name, partition_key=partition_key)341            _await_dynamodb_table_active(dynamodb_client, table_name)342            result = dynamodb_client.update_table(343                TableName=table_name,344                StreamSpecification={"StreamEnabled": True, "StreamViewType": "NEW_IMAGE"},345            )346            stream_arn = result["TableDescription"]["LatestStreamArn"]347            destination_queue = sqs_create_queue()348            queue_failure_event_source_mapping_arn = sqs_queue_arn(destination_queue)349            destination_config = {350                "OnFailure": {"Destination": queue_failure_event_source_mapping_arn}351            }352            result = lambda_client.create_event_source_mapping(353                FunctionName=function_name,354                BatchSize=1,355                StartingPosition="LATEST",356                EventSourceArn=stream_arn,357                MaximumBatchingWindowInSeconds=1,358                MaximumRetryAttempts=1,359                DestinationConfig=destination_config,360            )361            event_source_mapping_uuid = result["UUID"]362            _await_event_source_mapping_enabled(lambda_client, event_source_mapping_uuid)363            dynamodb_client.put_item(TableName=table_name, Item=item)364            def verify_failure_received():365                res = sqs_client.receive_message(QueueUrl=destination_queue)366                msg = res["Messages"][0]367                body = json.loads(msg["Body"])368                assert body["requestContext"]["condition"] == "RetryAttemptsExhausted"369                assert body["DDBStreamBatchInfo"]["batchSize"] == 1370                assert body["DDBStreamBatchInfo"]["streamArn"] in stream_arn371            retry(verify_failure_received, retries=5, sleep=5, sleep_before=5)372        finally:373            lambda_client.delete_event_source_mapping(UUID=event_source_mapping_uuid)374class TestLambdaHttpInvocation:375    def test_http_invocation_with_apigw_proxy(self, create_lambda_function):376        lambda_name = f"test_lambda_{short_uid()}"377        lambda_resource = "/api/v1/{proxy+}"378        lambda_path = "/api/v1/hello/world"379        lambda_request_context_path = "/" + TEST_STAGE_NAME + lambda_path380        lambda_request_context_resource_path = lambda_resource381        create_lambda_function(382            func_name=lambda_name,383            handler_file=TEST_LAMBDA_PYTHON,384            libs=TEST_LAMBDA_LIBS,385        )386        # create API Gateway and connect it to the Lambda proxy backend387        lambda_uri = aws_stack.lambda_function_arn(lambda_name)388        target_uri = f"arn:aws:apigateway:{aws_stack.get_region()}:lambda:path/2015-03-31/functions/{lambda_uri}/invocations"389        result = testutil.connect_api_gateway_to_http_with_lambda_proxy(390            "test_gateway2",391            target_uri,392            path=lambda_resource,393            stage_name=TEST_STAGE_NAME,394        )395        api_id = result["id"]396        url = path_based_url(api_id=api_id, stage_name=TEST_STAGE_NAME, path=lambda_path)397        result = safe_requests.post(398            url, data=b"{}", headers={"User-Agent": "python-requests/testing"}399        )400        content = json.loads(result.content)401        assert lambda_path == content["path"]402        assert lambda_resource == content["resource"]403        assert lambda_request_context_path == content["requestContext"]["path"]404        assert lambda_request_context_resource_path == content["requestContext"]["resourcePath"]405class TestKinesisSource:406    def test_create_kinesis_event_source_mapping(407        self,408        create_lambda_function,409        lambda_client,410        kinesis_client,411        kinesis_create_stream,412        lambda_su_role,413        wait_for_stream_ready,414        logs_client,415    ):416        function_name = f"lambda_func-{short_uid()}"417        stream_name = f"test-foobar-{short_uid()}"418        record_data = "hello"419        num_events_kinesis = 10420        try:421            create_lambda_function(422                func_name=function_name,423                handler_file=TEST_LAMBDA_PYTHON_ECHO,424                runtime=LAMBDA_RUNTIME_PYTHON36,425                role=lambda_su_role,426            )427            kinesis_create_stream(StreamName=stream_name, ShardCount=1)428            wait_for_stream_ready(stream_name=stream_name)429            stream_summary = kinesis_client.describe_stream_summary(StreamName=stream_name)430            assert stream_summary["StreamDescriptionSummary"]["OpenShardCount"] == 1431            stream_arn = kinesis_client.describe_stream(StreamName=stream_name)[432                "StreamDescription"433            ]["StreamARN"]434            uuid = lambda_client.create_event_source_mapping(435                EventSourceArn=stream_arn, FunctionName=function_name, StartingPosition="LATEST"436            )["UUID"]437            _await_event_source_mapping_enabled(lambda_client, uuid)438            kinesis_client.put_records(439                Records=[440                    {"Data": record_data, "PartitionKey": f"test_{i}"}441                    for i in range(0, num_events_kinesis)442                ],443                StreamName=stream_name,444            )445            events = _get_lambda_invocation_events(446                logs_client, function_name, expected_num_events=1447            )448            records = events[0]["Records"]449            assert len(records) == num_events_kinesis450            for record in records:451                assert "eventID" in record452                assert "eventSourceARN" in record453                assert "eventSource" in record454                assert "eventVersion" in record455                assert "eventName" in record456                assert "invokeIdentityArn" in record457                assert "awsRegion" in record458                assert "kinesis" in record459                actual_record_data = base64.b64decode(record["kinesis"]["data"]).decode("utf-8")460                assert actual_record_data == record_data461        finally:462            lambda_client.delete_event_source_mapping(UUID=uuid)463    @patch.object(config, "SYNCHRONOUS_KINESIS_EVENTS", False)464    def test_kinesis_event_source_mapping_with_async_invocation(465        self,466        lambda_client,467        kinesis_client,468        create_lambda_function,469        kinesis_create_stream,470        wait_for_stream_ready,471        logs_client,472        lambda_su_role,473    ):474        # TODO: this test will fail if `log_cli=true` is set and `LAMBDA_EXECUTOR=local`!475        # apparently this particular configuration prevents lambda logs from being extracted properly, giving the476        # appearance that the function was never invoked.477        try:478            function_name = f"lambda_func-{short_uid()}"479            stream_name = f"test-foobar-{short_uid()}"480            num_records_per_batch = 10481            num_batches = 2482            create_lambda_function(483                handler_file=TEST_LAMBDA_PARALLEL_FILE,484                func_name=function_name,485                runtime=LAMBDA_RUNTIME_PYTHON36,486                role=lambda_su_role,487            )488            kinesis_create_stream(StreamName=stream_name, ShardCount=1)489            stream_arn = kinesis_client.describe_stream(StreamName=stream_name)[490                "StreamDescription"491            ]["StreamARN"]492            wait_for_stream_ready(stream_name=stream_name)493            stream_summary = kinesis_client.describe_stream_summary(StreamName=stream_name)494            assert stream_summary["StreamDescriptionSummary"]["OpenShardCount"] == 1495            uuid = lambda_client.create_event_source_mapping(496                EventSourceArn=stream_arn,497                FunctionName=function_name,498                StartingPosition="LATEST",499                BatchSize=num_records_per_batch,500            )["UUID"]501            _await_event_source_mapping_enabled(lambda_client, uuid)502            for i in range(num_batches):503                start = time.perf_counter()504                kinesis_client.put_records(505                    Records=[506                        {"Data": json.dumps({"record_id": j}), "PartitionKey": f"test_{i}"}507                        for j in range(0, num_records_per_batch)508                    ],509                    StreamName=stream_name,510                )511                assert (time.perf_counter() - start) < 1  # this should not take more than a second512            invocation_events = _get_lambda_invocation_events(513                logs_client, function_name, expected_num_events=num_batches514            )515            for i in range(num_batches):516                event = invocation_events[i]517                assert len(event["event"]["Records"]) == num_records_per_batch518                actual_record_ids = []519                for record in event["event"]["Records"]:520                    assert "eventID" in record521                    assert "eventSourceARN" in record522                    assert "eventSource" in record523                    assert "eventVersion" in record524                    assert "eventName" in record525                    assert "invokeIdentityArn" in record526                    assert "awsRegion" in record527                    assert "kinesis" in record528                    record_data = base64.b64decode(record["kinesis"]["data"]).decode("utf-8")529                    actual_record_id = json.loads(record_data)["record_id"]530                    actual_record_ids.append(actual_record_id)531                actual_record_ids.sort()532                assert actual_record_ids == [i for i in range(num_records_per_batch)]533            assert (534                invocation_events[1]["executionStart"] - invocation_events[0]["executionStart"]535            ) > 5536        finally:537            lambda_client.delete_event_source_mapping(UUID=uuid)538    def test_kinesis_event_source_trim_horizon(539        self,540        lambda_client,541        kinesis_client,542        create_lambda_function,543        kinesis_create_stream,544        wait_for_stream_ready,545        logs_client,546        lambda_su_role,547    ):548        function_name = f"lambda_func-{short_uid()}"549        stream_name = f"test-foobar-{short_uid()}"550        num_records_per_batch = 10551        num_batches = 3552        try:553            create_lambda_function(554                handler_file=TEST_LAMBDA_PARALLEL_FILE,555                func_name=function_name,556                runtime=LAMBDA_RUNTIME_PYTHON36,557                role=lambda_su_role,558            )559            kinesis_create_stream(StreamName=stream_name, ShardCount=1)560            stream_arn = kinesis_client.describe_stream(StreamName=stream_name)[561                "StreamDescription"562            ]["StreamARN"]563            wait_for_stream_ready(stream_name=stream_name)564            stream_summary = kinesis_client.describe_stream_summary(StreamName=stream_name)565            assert stream_summary["StreamDescriptionSummary"]["OpenShardCount"] == 1566            # insert some records before event source mapping created567            for i in range(num_batches - 1):568                kinesis_client.put_records(569                    Records=[570                        {"Data": json.dumps({"record_id": j}), "PartitionKey": f"test_{i}"}571                        for j in range(0, num_records_per_batch)572                    ],573                    StreamName=stream_name,574                )575            uuid = lambda_client.create_event_source_mapping(576                EventSourceArn=stream_arn,577                FunctionName=function_name,578                StartingPosition="TRIM_HORIZON",579                BatchSize=num_records_per_batch,580            )["UUID"]581            # insert some more records582            kinesis_client.put_records(583                Records=[584                    {"Data": json.dumps({"record_id": i}), "PartitionKey": f"test_{num_batches}"}585                    for i in range(0, num_records_per_batch)586                ],587                StreamName=stream_name,588            )589            invocation_events = _get_lambda_invocation_events(590                logs_client, function_name, expected_num_events=num_batches591            )592            for i in range(num_batches):593                event = invocation_events[i]594                assert len(event["event"]["Records"]) == num_records_per_batch595                actual_record_ids = []596                for record in event["event"]["Records"]:597                    assert "eventID" in record598                    assert "eventSourceARN" in record599                    assert "eventSource" in record600                    assert "eventVersion" in record601                    assert "eventName" in record602                    assert "invokeIdentityArn" in record603                    assert "awsRegion" in record604                    assert "kinesis" in record605                    record_data = base64.b64decode(record["kinesis"]["data"]).decode("utf-8")606                    actual_record_id = json.loads(record_data)["record_id"]607                    actual_record_ids.append(actual_record_id)608                actual_record_ids.sort()609                assert actual_record_ids == [i for i in range(num_records_per_batch)]610        finally:611            lambda_client.delete_event_source_mapping(UUID=uuid)612    def test_disable_kinesis_event_source_mapping(613        self,614        lambda_client,615        kinesis_client,616        create_lambda_function,617        kinesis_create_stream,618        wait_for_stream_ready,619        logs_client,620        lambda_su_role,621    ):622        function_name = f"lambda_func-{short_uid()}"623        stream_name = f"test-foobar-{short_uid()}"624        num_records_per_batch = 10625        try:626            create_lambda_function(627                handler_file=TEST_LAMBDA_PYTHON_ECHO,628                func_name=function_name,629                runtime=LAMBDA_RUNTIME_PYTHON36,630                role=lambda_su_role,631            )632            kinesis_create_stream(StreamName=stream_name, ShardCount=1)633            stream_arn = kinesis_client.describe_stream(StreamName=stream_name)[634                "StreamDescription"635            ]["StreamARN"]636            wait_for_stream_ready(stream_name=stream_name)637            event_source_uuid = lambda_client.create_event_source_mapping(638                EventSourceArn=stream_arn,639                FunctionName=function_name,640                StartingPosition="LATEST",641                BatchSize=num_records_per_batch,642            )["UUID"]643            _await_event_source_mapping_enabled(lambda_client, event_source_uuid)644            kinesis_client.put_records(645                Records=[646                    {"Data": json.dumps({"record_id": i}), "PartitionKey": "test"}647                    for i in range(0, num_records_per_batch)648                ],649                StreamName=stream_name,650            )651            events = _get_lambda_invocation_events(652                logs_client, function_name, expected_num_events=1653            )654            assert len(events) == 1655            lambda_client.update_event_source_mapping(UUID=event_source_uuid, Enabled=False)656            time.sleep(2)657            kinesis_client.put_records(658                Records=[659                    {"Data": json.dumps({"record_id": i}), "PartitionKey": "test"}660                    for i in range(0, num_records_per_batch)661                ],662                StreamName=stream_name,663            )664            time.sleep(7)  # wait for records to pass through stream665            # should still only get the first batch from before mapping was disabled666            events = _get_lambda_invocation_events(667                logs_client, function_name, expected_num_events=1668            )669            assert len(events) == 1670        finally:671            lambda_client.delete_event_source_mapping(UUID=event_source_uuid)672    def test_kinesis_event_source_mapping_with_on_failure_destination_config(673        self,674        lambda_client,675        create_lambda_function,676        sqs_client,677        sqs_queue_arn,678        sqs_create_queue,679        create_iam_role_with_policy,680        kinesis_client,681        wait_for_stream_ready,682    ):683        try:684            function_name = f"lambda_func-{short_uid()}"685            role = f"test-lambda-role-{short_uid()}"686            policy_name = f"test-lambda-policy-{short_uid()}"687            kinesis_name = f"test-kinesis-{short_uid()}"688            role_arn = create_iam_role_with_policy(689                RoleName=role,690                PolicyName=policy_name,691                RoleDefinition=lambda_role,692                PolicyDefinition=s3_lambda_permission,693            )694            create_lambda_function(695                handler_file=TEST_LAMBDA_PYTHON,696                func_name=function_name,697                runtime=LAMBDA_RUNTIME_PYTHON37,698                role=role_arn,699            )700            kinesis_client.create_stream(StreamName=kinesis_name, ShardCount=1)701            result = kinesis_client.describe_stream(StreamName=kinesis_name)["StreamDescription"]702            kinesis_arn = result["StreamARN"]703            wait_for_stream_ready(stream_name=kinesis_name)704            queue_event_source_mapping = sqs_create_queue()705            destination_queue = sqs_queue_arn(queue_event_source_mapping)706            destination_config = {"OnFailure": {"Destination": destination_queue}}707            message = {708                "input": "hello",709                "value": "world",710                lambda_integration.MSG_BODY_RAISE_ERROR_FLAG: 1,711            }712            result = lambda_client.create_event_source_mapping(713                FunctionName=function_name,714                BatchSize=1,715                StartingPosition="LATEST",716                EventSourceArn=kinesis_arn,717                MaximumBatchingWindowInSeconds=1,718                MaximumRetryAttempts=1,719                DestinationConfig=destination_config,720            )721            event_source_mapping_uuid = result["UUID"]722            _await_event_source_mapping_enabled(lambda_client, event_source_mapping_uuid)723            kinesis_client.put_record(724                StreamName=kinesis_name, Data=to_bytes(json.dumps(message)), PartitionKey="custom"725            )726            def verify_failure_received():727                result = sqs_client.receive_message(QueueUrl=queue_event_source_mapping)728                msg = result["Messages"][0]729                body = json.loads(msg["Body"])730                assert body["requestContext"]["condition"] == "RetryAttemptsExhausted"731                assert body["KinesisBatchInfo"]["batchSize"] == 1732                assert body["KinesisBatchInfo"]["streamArn"] == kinesis_arn733            retry(verify_failure_received, retries=50, sleep=5, sleep_before=5)734        finally:735            kinesis_client.delete_stream(StreamName=kinesis_name, EnforceConsumerDeletion=True)736            lambda_client.delete_event_source_mapping(UUID=event_source_mapping_uuid)737def _await_event_source_mapping_enabled(lambda_client, uuid, retries=30):738    def assert_mapping_enabled():739        assert lambda_client.get_event_source_mapping(UUID=uuid)["State"] == "Enabled"740    retry(assert_mapping_enabled, sleep_before=2, retries=retries)741def _await_dynamodb_table_active(dynamodb_client, table_name, retries=6):742    def assert_table_active():743        assert (744            dynamodb_client.describe_table(TableName=table_name)["Table"]["TableStatus"] == "ACTIVE"745        )746    retry(assert_table_active, retries=retries, sleep_before=2)747def _get_lambda_invocation_events(logs_client, function_name, expected_num_events, retries=30):748    def get_events():749        events = get_lambda_log_events(function_name, logs_client=logs_client)750        assert len(events) == expected_num_events751        return events...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
