Best Python code snippet using localstack_python
test_dynamodb.py
Source:test_dynamodb.py  
# -*- coding: utf-8 -*-
import json
import re
from datetime import datetime
from time import sleep
from typing import Dict, Optional

import pytest
from boto3.dynamodb.conditions import Key
from boto3.dynamodb.types import STRING
from localstack.services.awslambda.lambda_utils import LAMBDA_RUNTIME_PYTHON36
from localstack.services.dynamodbstreams.dynamodbstreams_api import get_kinesis_stream_name
from localstack.utils import testutil
from localstack.utils.aws import aws_stack
from localstack.utils.common import json_safe, long_uid, retry, short_uid
from localstack.utils.testutil import check_expected_lambda_log_events_length

from .awslambda.test_lambda import TEST_LAMBDA_PYTHON_ECHO
from .test_kinesis import get_shard_iterator

PARTITION_KEY = "id"
TEST_DDB_TABLE_NAME = "test-ddb-table-1"
TEST_DDB_TABLE_NAME_2 = "test-ddb-table-2"
TEST_DDB_TABLE_NAME_3 = "test-ddb-table-3"
TEST_DDB_TAGS = [
    {"Key": "Name", "Value": "test-table"},
    {"Key": "TestKey", "Value": "true"},
]


@pytest.fixture()
def dynamodb(dynamodb_resource):
    """Expose the ``dynamodb_resource`` fixture under the shorter name ``dynamodb``."""
    return dynamodb_resource


class TestDynamoDB:
    """Tests for DynamoDB tables, tags, indexes, TTL, PartiQL and stream handling.

    NOTE(review): several tests call ``delete_table``, which is not defined in the
    visible part of this module - presumably a module-level helper further down.
    """

    def test_non_ascii_chars(self, dynamodb):
        """Items holding non-ASCII characters must round-trip through put/get unchanged."""
        aws_stack.create_dynamodb_table(TEST_DDB_TABLE_NAME, partition_key=PARTITION_KEY)
        table = dynamodb.Table(TEST_DDB_TABLE_NAME)

        # write some items containing non-ASCII characters
        items = {
            "id1": {PARTITION_KEY: "id1", "data": "foobar123 â"},
            "id2": {PARTITION_KEY: "id2", "data": "foobar123 £"},
            "id3": {PARTITION_KEY: "id3", "data": "foobar123 ¢"},
        }
        for item in items.values():
            table.put_item(Item=item)

        for item_id, expected_item in items.items():
            stored_item = table.get_item(Key={PARTITION_KEY: item_id})["Item"]
            # pass both sides through json_safe before comparing (this was originally
            # introduced to even out str/unicode differences on Python 2)
            assert json_safe(stored_item) == json_safe(expected_item)

        # clean up
        delete_table(TEST_DDB_TABLE_NAME)

    def test_large_data_download(self, dynamodb):
        """A scan over a table holding many large items must return every item."""
        aws_stack.create_dynamodb_table(TEST_DDB_TABLE_NAME_2, partition_key=PARTITION_KEY)
        table = dynamodb.Table(TEST_DDB_TABLE_NAME_2)

        # Create a large amount of items
        num_items = 20
        for i in range(num_items):
            item = {PARTITION_KEY: f"id{i}", "data1": "foobar123 " * 1000}
            table.put_item(Item=item)

        # Retrieve the items. The data will be transmitted to the client with
        # chunked transfer encoding
        result = table.scan(TableName=TEST_DDB_TABLE_NAME_2)
        assert len(result["Items"]) == num_items

        # clean up
        delete_table(TEST_DDB_TABLE_NAME_2)

    def test_time_to_live(self, dynamodb):
        """Toggle TTL on a table and check the described status after every change."""
        aws_stack.create_dynamodb_table(TEST_DDB_TABLE_NAME_3, partition_key=PARTITION_KEY)
        table = dynamodb.Table(TEST_DDB_TABLE_NAME_3)

        def _assert_ttl_status(expected_status):
            # describe the TTL settings and compare the reported status
            response = testutil.send_describe_dynamodb_ttl_request(TEST_DDB_TABLE_NAME_3)
            assert response.status_code == 200
            description = json.loads(response._content)["TimeToLiveDescription"]
            assert description["TimeToLiveStatus"] == expected_status

        def _set_ttl(enabled):
            # update the TTL settings and check that the response echoes the new flag
            response = testutil.send_update_dynamodb_ttl_request(TEST_DDB_TABLE_NAME_3, enabled)
            assert response.status_code == 200
            specification = json.loads(response._content)["TimeToLiveSpecification"]
            assert bool(specification["Enabled"]) is enabled

        # Insert some items to the table
        items = {
            "id1": {PARTITION_KEY: "id1", "data": "IT IS"},
            "id2": {PARTITION_KEY: "id2", "data": "TIME"},
            "id3": {PARTITION_KEY: "id3", "data": "TO LIVE!"},
        }
        for item in items.values():
            table.put_item(Item=item)

        # Describe TTL when still unset
        _assert_ttl_status("DISABLED")

        # Enable TTL for given table and describe the status afterwards
        _set_ttl(True)
        _assert_ttl_status("ENABLED")

        # Disable TTL for given table and describe the status afterwards
        _set_ttl(False)
        _assert_ttl_status("DISABLED")

        # Enable TTL for given table again and describe the status afterwards
        _set_ttl(True)
        _assert_ttl_status("ENABLED")

        # clean up
        delete_table(TEST_DDB_TABLE_NAME_3)

    def test_list_tags_of_resource(self, dynamodb):
        """Tags given at table creation can be listed, extended and removed."""
        table_name = f"ddb-table-{short_uid()}"
        # use a low-level client here; a distinct name avoids shadowing the fixture argument
        dynamodb_client = aws_stack.create_external_boto_client("dynamodb")

        rs = dynamodb_client.create_table(
            TableName=table_name,
            KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],
            AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}],
            ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},
            Tags=TEST_DDB_TAGS,
        )
        table_arn = rs["TableDescription"]["TableArn"]

        rs = dynamodb_client.list_tags_of_resource(ResourceArn=table_arn)
        assert rs["Tags"] == TEST_DDB_TAGS

        dynamodb_client.tag_resource(
            ResourceArn=table_arn, Tags=[{"Key": "NewKey", "Value": "TestValue"}]
        )
        rs = dynamodb_client.list_tags_of_resource(ResourceArn=table_arn)
        assert len(rs["Tags"]) == len(TEST_DDB_TAGS) + 1
        tags = {tag["Key"]: tag["Value"] for tag in rs["Tags"]}
        assert "NewKey" in tags
        assert tags["NewKey"] == "TestValue"

        dynamodb_client.untag_resource(ResourceArn=table_arn, TagKeys=["Name", "NewKey"])
        rs = dynamodb_client.list_tags_of_resource(ResourceArn=table_arn)
        tags = {tag["Key"]: tag["Value"] for tag in rs["Tags"]}
        assert "Name" not in tags
        assert "NewKey" not in tags

        delete_table(table_name)

    def test_stream_spec_and_region_replacement(self, dynamodb):
        """Check ARN prefixes, stream creation, shard ID format and stream removal."""
        ddbstreams = aws_stack.create_external_boto_client("dynamodbstreams")
        kinesis = aws_stack.create_external_boto_client("kinesis")
        table_name = f"ddb-{short_uid()}"
        aws_stack.create_dynamodb_table(
            table_name,
            partition_key=PARTITION_KEY,
            stream_view_type="NEW_AND_OLD_IMAGES",
        )
        table = dynamodb.Table(table_name)

        # assert ARN formats
        expected_arn_prefix = "arn:aws:dynamodb:" + aws_stack.get_local_region()
        assert table.table_arn.startswith(expected_arn_prefix)
        assert table.latest_stream_arn.startswith(expected_arn_prefix)

        # assert stream has been created
        stream_tables = [s["TableName"] for s in ddbstreams.list_streams()["Streams"]]
        assert table_name in stream_tables
        stream_name = get_kinesis_stream_name(table_name)
        assert stream_name in kinesis.list_streams()["StreamNames"]

        # assert shard ID formats
        result = ddbstreams.describe_stream(StreamArn=table.latest_stream_arn)["StreamDescription"]
        assert "Shards" in result
        for shard in result["Shards"]:
            assert re.match(r"^shardId-[0-9]{20}-[a-zA-Z0-9]{1,36}$", shard["ShardId"])

        # clean up
        delete_table(table_name)

        def _assert_stream_deleted():
            stream_tables = [s["TableName"] for s in ddbstreams.list_streams()["Streams"]]
            assert table_name not in stream_tables
            assert stream_name not in kinesis.list_streams()["StreamNames"]

        # assert stream has been deleted
        retry(_assert_stream_deleted, sleep=0.4, retries=5)

    def test_multiple_update_expressions(self, dynamodb):
        """Set two attributes in one update expression, then query a new INCLUDE index.

        The final query requests ``ALL_ATTRIBUTES`` on an index created with an
        ``INCLUDE`` projection and is expected to raise a ``ValidationException``.
        """
        dynamodb_client = aws_stack.create_external_boto_client("dynamodb")
        aws_stack.create_dynamodb_table(TEST_DDB_TABLE_NAME, partition_key=PARTITION_KEY)
        table = dynamodb.Table(TEST_DDB_TABLE_NAME)

        item_id = short_uid()
        table.put_item(Item={PARTITION_KEY: item_id, "data": "foobar123 â"})
        response = dynamodb_client.update_item(
            TableName=TEST_DDB_TABLE_NAME,
            Key={PARTITION_KEY: {"S": item_id}},
            UpdateExpression="SET attr1 = :v1, attr2 = :v2",
            ExpressionAttributeValues={":v1": {"S": "value1"}, ":v2": {"S": "value2"}},
        )
        assert response["ResponseMetadata"]["HTTPStatusCode"] == 200

        item = table.get_item(Key={PARTITION_KEY: item_id})["Item"]
        assert item["attr1"] == "value1"
        assert item["attr2"] == "value2"

        attributes = [{"AttributeName": "id", "AttributeType": STRING}]
        user_id_idx = [
            {
                "Create": {
                    "IndexName": "id-index",
                    "KeySchema": [{"AttributeName": "id", "KeyType": "HASH"}],
                    "Projection": {
                        "ProjectionType": "INCLUDE",
                        "NonKeyAttributes": ["data"],
                    },
                    "ProvisionedThroughput": {
                        "ReadCapacityUnits": 5,
                        "WriteCapacityUnits": 5,
                    },
                }
            },
        ]

        # for each index
        table.update(AttributeDefinitions=attributes, GlobalSecondaryIndexUpdates=user_id_idx)

        with pytest.raises(Exception) as ctx:
            table.query(
                TableName=TEST_DDB_TABLE_NAME,
                IndexName="id-index",
                KeyConditionExpression=Key(PARTITION_KEY).eq(item_id),
                Select="ALL_ATTRIBUTES",
            )
        assert ctx.match("ValidationException")

    def test_invalid_query_index(self, dynamodb):
        """Raises an exception when a query requests ALL_ATTRIBUTES,
        but the index does not have a ProjectionType of ALL"""
        table_name = f"test-table-{short_uid()}"
        table = dynamodb.create_table(
            TableName=table_name,
            KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],
            AttributeDefinitions=[
                {"AttributeName": "id", "AttributeType": "S"},
                {"AttributeName": "field_a", "AttributeType": "S"},
            ],
            ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},
            Tags=TEST_DDB_TAGS,
            GlobalSecondaryIndexes=[
                {
                    "IndexName": "field_a_index",
                    "KeySchema": [{"AttributeName": "field_a", "KeyType": "HASH"}],
                    "Projection": {"ProjectionType": "KEYS_ONLY"},
                    "ProvisionedThroughput": {
                        "ReadCapacityUnits": 1,
                        "WriteCapacityUnits": 1,
                    },
                },
            ],
        )

        with pytest.raises(Exception) as ctx:
            table.query(
                TableName=table_name,
                IndexName="field_a_index",
                KeyConditionExpression=Key("field_a").eq("xyz"),
                Select="ALL_ATTRIBUTES",
            )
        assert ctx.match("ValidationException")

        # clean up
        delete_table(table_name)

    def test_valid_query_index(self, dynamodb):
        """Query requests ALL_ATTRIBUTES and the named index has a ProjectionType of ALL,
        no exception should be raised."""
        table_name = f"test-table-{short_uid()}"
        table = dynamodb.create_table(
            TableName=table_name,
            KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],
            AttributeDefinitions=[
                {"AttributeName": "id", "AttributeType": "S"},
                {"AttributeName": "field_a", "AttributeType": "S"},
                {"AttributeName": "field_b", "AttributeType": "S"},
            ],
            ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},
            Tags=TEST_DDB_TAGS,
            GlobalSecondaryIndexes=[
                {
                    "IndexName": "field_a_index",
                    "KeySchema": [{"AttributeName": "field_a", "KeyType": "HASH"}],
                    "Projection": {"ProjectionType": "KEYS_ONLY"},
                    "ProvisionedThroughput": {
                        "ReadCapacityUnits": 1,
                        "WriteCapacityUnits": 1,
                    },
                },
                {
                    "IndexName": "field_b_index",
                    "KeySchema": [{"AttributeName": "field_b", "KeyType": "HASH"}],
                    "Projection": {"ProjectionType": "ALL"},
                    "ProvisionedThroughput": {
                        "ReadCapacityUnits": 1,
                        "WriteCapacityUnits": 1,
                    },
                },
            ],
        )

        table.query(
            TableName=table_name,
            IndexName="field_b_index",
            KeyConditionExpression=Key("field_b").eq("xyz"),
            Select="ALL_ATTRIBUTES",
        )

        # clean up
        delete_table(table_name)

    def test_valid_local_secondary_index(
        self, dynamodb_client, dynamodb_create_table_with_parameters, dynamodb_wait_for_table_active
    ):
        """An item can be queried back through a local secondary index projecting ALL."""
        # assigned before the ``try`` so that the ``finally`` clause can always rely on it
        table_name = f"test-table-{short_uid()}"
        try:
            dynamodb_create_table_with_parameters(
                TableName=table_name,
                KeySchema=[
                    {"AttributeName": "PK", "KeyType": "HASH"},
                    {"AttributeName": "SK", "KeyType": "RANGE"},
                ],
                AttributeDefinitions=[
                    {"AttributeName": "PK", "AttributeType": "S"},
                    {"AttributeName": "SK", "AttributeType": "S"},
                    {"AttributeName": "LSI1SK", "AttributeType": "N"},
                ],
                LocalSecondaryIndexes=[
                    {
                        "IndexName": "LSI1",
                        "KeySchema": [
                            {"AttributeName": "PK", "KeyType": "HASH"},
                            {"AttributeName": "LSI1SK", "KeyType": "RANGE"},
                        ],
                        "Projection": {"ProjectionType": "ALL"},
                    }
                ],
                ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},
                Tags=TEST_DDB_TAGS,
            )
            dynamodb_wait_for_table_active(table_name)

            item = {"SK": {"S": "hello"}, "LSI1SK": {"N": "123"}, "PK": {"S": "test one"}}
            dynamodb_client.put_item(TableName=table_name, Item=item)

            result = dynamodb_client.query(
                TableName=table_name,
                IndexName="LSI1",
                KeyConditionExpression="PK = :v1",
                ExpressionAttributeValues={":v1": {"S": "test one"}},
                Select="ALL_ATTRIBUTES",
            )
            assert result["Items"] == [item]
        finally:
            dynamodb_client.delete_table(TableName=table_name)

    def test_more_than_20_global_secondary_indexes(self, dynamodb, dynamodb_client):
        """A table can be created with 25 global secondary indexes, all of them described."""
        table_name = f"test-table-{short_uid()}"
        num_gsis = 25
        attrs = [{"AttributeName": f"a{i}", "AttributeType": "S"} for i in range(num_gsis)]
        gsis = [
            {
                "IndexName": f"gsi_{i}",
                "KeySchema": [{"AttributeName": f"a{i}", "KeyType": "HASH"}],
                "Projection": {"ProjectionType": "ALL"},
            }
            for i in range(num_gsis)
        ]
        dynamodb.create_table(
            TableName=table_name,
            KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],
            AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}, *attrs],
            GlobalSecondaryIndexes=gsis,
            BillingMode="PAY_PER_REQUEST",
        )

        table = dynamodb_client.describe_table(TableName=table_name)
        assert len(table["Table"]["GlobalSecondaryIndexes"]) == num_gsis

        # clean up
        delete_table(table_name)

    @pytest.mark.aws_validated
    def test_return_values_in_put_item(self, dynamodb, dynamodb_client):
        """``put_item`` returns ``Attributes`` only for ``ALL_OLD`` on an existing item."""
        aws_stack.create_dynamodb_table(
            TEST_DDB_TABLE_NAME, partition_key=PARTITION_KEY, client=dynamodb_client
        )
        table = dynamodb.Table(TEST_DDB_TABLE_NAME)

        def _validate_response(response, expected: Optional[dict] = None):
            """
            Validates the response against the optionally expected one.
            It checks that the response doesn't contain `Attributes`,
            `ConsumedCapacity` and `ItemCollectionMetrics` unless they are expected.
            """
            # avoid a mutable default argument; ``None`` means "nothing expected"
            expected = {} if expected is None else expected
            should_not_contain = {
                "Attributes",
                "ConsumedCapacity",
                "ItemCollectionMetrics",
            } - expected.keys()
            assert response["ResponseMetadata"]["HTTPStatusCode"] == 200
            assert expected.items() <= response.items()
            assert response.keys().isdisjoint(should_not_contain)

        # items which are being used to put in the table
        item1 = {PARTITION_KEY: "id1", "data": "foobar"}
        item1b = {PARTITION_KEY: "id1", "data": "barfoo"}
        item2 = {PARTITION_KEY: "id2", "data": "foobar"}

        response = table.put_item(Item=item1, ReturnValues="ALL_OLD")
        # there is no data present in the table already so even if return values
        # is set to 'ALL_OLD' as there is no data it will not return any data.
        _validate_response(response)

        # now the same data is present so when we pass return values as 'ALL_OLD'
        # it should give us attributes
        response = table.put_item(Item=item1, ReturnValues="ALL_OLD")
        _validate_response(response, expected={"Attributes": item1})

        # now a previous version of data is present, so when we pass return
        # values as 'ALL_OLD' it should give us the old attributes
        response = table.put_item(Item=item1b, ReturnValues="ALL_OLD")
        _validate_response(response, expected={"Attributes": item1})

        response = table.put_item(Item=item2)
        # we do not have any same item as item2 already so when we add this by default
        # return values is set to None so no Attribute values should be returned
        _validate_response(response)

        response = table.put_item(Item=item2)
        # in this case we already have item2 in the table so on this request
        # it should not return any data as return values is set to None so no
        # Attribute values should be returned
        _validate_response(response)

        # cleanup
        table.delete()

    @pytest.mark.aws_validated
    def test_empty_and_binary_values(self, dynamodb, dynamodb_client):
        """Items with an empty string value or a binary value can be stored."""
        aws_stack.create_dynamodb_table(
            TEST_DDB_TABLE_NAME, partition_key=PARTITION_KEY, client=dynamodb_client
        )
        table = dynamodb.Table(TEST_DDB_TABLE_NAME)

        # items which are being used to put in the table
        item1 = {PARTITION_KEY: "id1", "data": ""}
        item2 = {PARTITION_KEY: "id2", "data": b"\x90"}

        response = table.put_item(Item=item1)
        assert response["ResponseMetadata"]["HTTPStatusCode"] == 200

        response = table.put_item(Item=item2)
        assert response["ResponseMetadata"]["HTTPStatusCode"] == 200

        # clean up
        table.delete()

    def test_batch_write_binary(self, dynamodb_client):
        """``batch_write_item`` accepts binary values, including non-UTF-8-decodable bytes."""
        table_name = f"table_batch_binary_{short_uid()}"
        dynamodb_client.create_table(
            TableName=table_name,
            AttributeDefinitions=[
                {"AttributeName": "PK", "AttributeType": "S"},
                {"AttributeName": "SK", "AttributeType": "S"},
            ],
            KeySchema=[
                {"AttributeName": "PK", "KeyType": "HASH"},
                {"AttributeName": "SK", "KeyType": "RANGE"},
            ],
            BillingMode="PAY_PER_REQUEST",
        )
        dynamodb_client.put_item(
            TableName=table_name,
            Item={"PK": {"S": "hello"}, "SK": {"S": "user"}, "data": {"B": b"test"}},
        )

        item = {
            "Item": {
                "PK": {"S": "hello-1"},
                "SK": {"S": "user-1"},
                "data": {"B": b"test-1"},
            }
        }
        item_non_decodable = {
            "Item": {
                "PK": {"S": "hello-2"},
                "SK": {"S": "user-2"},
                "data": {"B": b"test \xc0 \xed"},
            }
        }
        response = dynamodb_client.batch_write_item(
            RequestItems={table_name: [{"PutRequest": item}, {"PutRequest": item_non_decodable}]}
        )
        assert response["ResponseMetadata"]["HTTPStatusCode"] == 200

        dynamodb_client.delete_table(TableName=table_name)

    def test_binary_data_with_stream(
        self,
        wait_for_stream_ready,
        dynamodb_create_table_with_parameters,
        dynamodb_client,
        kinesis_client,
    ):
        """Putting a binary item into a stream-enabled table yields one Kinesis record."""
        table_name = f"table-{short_uid()}"
        dynamodb_create_table_with_parameters(
            TableName=table_name,
            KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],
            AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}],
            ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},
            StreamSpecification={
                "StreamEnabled": True,
                "StreamViewType": "NEW_AND_OLD_IMAGES",
            },
        )
        stream_name = get_kinesis_stream_name(table_name)
        wait_for_stream_ready(stream_name)

        response = dynamodb_client.put_item(
            TableName=table_name, Item={"id": {"S": "id1"}, "data": {"B": b"\x90"}}
        )
        assert response["ResponseMetadata"]["HTTPStatusCode"] == 200

        iterator = get_shard_iterator(stream_name, kinesis_client)
        response = kinesis_client.get_records(ShardIterator=iterator)
        json_records = response.get("Records")
        assert len(json_records) == 1
        assert "Data" in json_records[0]

    def test_dynamodb_stream_shard_iterator(self, wait_for_stream_ready):
        """Shard iterators can be requested with LATEST and AT_SEQUENCE_NUMBER types."""
        dynamodb = aws_stack.create_external_boto_client("dynamodb")
        ddbstreams = aws_stack.create_external_boto_client("dynamodbstreams")

        table_name = f"table_with_stream-{short_uid()}"
        table = dynamodb.create_table(
            TableName=table_name,
            KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],
            AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}],
            StreamSpecification={
                "StreamEnabled": True,
                "StreamViewType": "NEW_IMAGE",
            },
            ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},
        )
        stream_name = get_kinesis_stream_name(table_name)
        wait_for_stream_ready(stream_name)

        stream_arn = table["TableDescription"]["LatestStreamArn"]
        result = ddbstreams.describe_stream(StreamArn=stream_arn)
        # both iterator requests below target the first shard of the stream
        shard = result["StreamDescription"]["Shards"][0]

        response = ddbstreams.get_shard_iterator(
            StreamArn=stream_arn,
            ShardId=shard["ShardId"],
            ShardIteratorType="LATEST",
        )
        assert "ShardIterator" in response

        response = ddbstreams.get_shard_iterator(
            StreamArn=stream_arn,
            ShardId=shard["ShardId"],
            ShardIteratorType="AT_SEQUENCE_NUMBER",
            SequenceNumber=shard["SequenceNumberRange"]["StartingSequenceNumber"],
        )
        assert "ShardIterator" in response

    def test_dynamodb_create_table_with_class(self, dynamodb_client):
        """The table class is reported on create/describe and can be changed via update."""
        table_name = f"table_with_class_{short_uid()}"
        # create table
        result = dynamodb_client.create_table(
            TableName=table_name,
            KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],
            AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}],
            ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},
            TableClass="STANDARD",
        )
        assert result["TableDescription"]["TableClassSummary"]["TableClass"] == "STANDARD"

        result = dynamodb_client.describe_table(TableName=table_name)
        assert result["Table"]["TableClassSummary"]["TableClass"] == "STANDARD"

        result = dynamodb_client.update_table(
            TableName=table_name, TableClass="STANDARD_INFREQUENT_ACCESS"
        )
        assert (
            result["TableDescription"]["TableClassSummary"]["TableClass"]
            == "STANDARD_INFREQUENT_ACCESS"
        )

        result = dynamodb_client.describe_table(TableName=table_name)
        assert result["Table"]["TableClassSummary"]["TableClass"] == "STANDARD_INFREQUENT_ACCESS"

        # clean resources
        dynamodb_client.delete_table(TableName=table_name)

    def test_dynamodb_execute_transaction(self, dynamodb_client):
        """Two PartiQL inserts run via ``execute_transaction`` leave two items in the table."""
        table_name = f"table_{short_uid()}"
        dynamodb_client.create_table(
            TableName=table_name,
            KeySchema=[{"AttributeName": "Username", "KeyType": "HASH"}],
            AttributeDefinitions=[{"AttributeName": "Username", "AttributeType": "S"}],
            ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},
        )

        statements = [
            {"Statement": f"INSERT INTO {table_name} VALUE {{'Username': 'user01'}}"},
            {"Statement": f"INSERT INTO {table_name} VALUE {{'Username': 'user02'}}"},
        ]
        result = dynamodb_client.execute_transaction(TransactStatements=statements)
        assert result["ResponseMetadata"]["HTTPStatusCode"] == 200

        result = dynamodb_client.scan(TableName=table_name)
        assert result["ScannedCount"] == 2

        dynamodb_client.delete_table(TableName=table_name)

    def test_dynamodb_batch_execute_statement(self, dynamodb_client):
        """A PartiQL batch with one INSERT and one UPDATE applies both statements."""
        table_name = f"table_{short_uid()}"
        dynamodb_client.create_table(
            TableName=table_name,
            KeySchema=[{"AttributeName": "Username", "KeyType": "HASH"}],
            AttributeDefinitions=[{"AttributeName": "Username", "AttributeType": "S"}],
            ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},
        )
        dynamodb_client.put_item(TableName=table_name, Item={"Username": {"S": "user02"}})

        statements = [
            {"Statement": f"INSERT INTO {table_name} VALUE {{'Username': 'user01'}}"},
            {"Statement": f"UPDATE {table_name} SET Age=20 WHERE Username='user02'"},
        ]
        result = dynamodb_client.batch_execute_statement(Statements=statements)
        # actions always succeeds
        assert not any("Error" in r for r in result["Responses"])

        item = dynamodb_client.get_item(TableName=table_name, Key={"Username": {"S": "user02"}})[
            "Item"
        ]
        assert item["Age"]["N"] == "20"

        item = dynamodb_client.get_item(TableName=table_name, Key={"Username": {"S": "user01"}})[
            "Item"
        ]
        assert item

        dynamodb_client.delete_table(TableName=table_name)

    def test_dynamodb_partiql_missing(self, dynamodb_client):
        """PartiQL ``IS MISSING`` / ``IS NOT MISSING`` filter on attribute presence."""
        table_name = f"table_with_stream_{short_uid()}"
        # create table
        dynamodb_client.create_table(
            TableName=table_name,
            KeySchema=[{"AttributeName": "Username", "KeyType": "HASH"}],
            AttributeDefinitions=[{"AttributeName": "Username", "AttributeType": "S"}],
            ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},
        )

        # create items with FirstName attribute
        dynamodb_client.execute_statement(
            Statement=f"INSERT INTO {table_name} VALUE {{'Username': 'Alice123', 'FirstName':'Alice'}}"
        )

        items = dynamodb_client.execute_statement(
            Statement=f"SELECT * FROM {table_name} WHERE FirstName IS NOT MISSING"
        )["Items"]
        assert len(items) == 1

        items = dynamodb_client.execute_statement(
            Statement=f"SELECT * FROM {table_name} WHERE FirstName IS MISSING"
        )["Items"]
        assert len(items) == 0

        dynamodb_client.delete_table(TableName=table_name)

    def test_dynamodb_stream_stream_view_type(self):
        """A ``KEYS_ONLY`` stream records six events carrying keys but no item images.

        Three events come from regular item operations and three from PartiQL
        statements; each triple is expected as INSERT, MODIFY, REMOVE.
        """
        dynamodb = aws_stack.create_external_boto_client("dynamodb")
        ddbstreams = aws_stack.create_external_boto_client("dynamodbstreams")
        table_name = f"table_with_stream_{short_uid()}"

        # create table
        table = dynamodb.create_table(
            TableName=table_name,
            KeySchema=[{"AttributeName": "Username", "KeyType": "HASH"}],
            AttributeDefinitions=[{"AttributeName": "Username", "AttributeType": "S"}],
            StreamSpecification={
                "StreamEnabled": True,
                "StreamViewType": "KEYS_ONLY",
            },
            ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},
        )
        stream_arn = table["TableDescription"]["LatestStreamArn"]

        # wait for stream to be created
        sleep(1)

        # put item in table - INSERT event
        dynamodb.put_item(TableName=table_name, Item={"Username": {"S": "Fred"}})

        # update item in table - MODIFY event
        dynamodb.update_item(
            TableName=table_name,
            Key={"Username": {"S": "Fred"}},
            UpdateExpression="set S=:r",
            ExpressionAttributeValues={":r": {"S": "Fred_Modified"}},
            ReturnValues="UPDATED_NEW",
        )

        # delete item in table - REMOVE event
        dynamodb.delete_item(TableName=table_name, Key={"Username": {"S": "Fred"}})

        result = ddbstreams.describe_stream(StreamArn=stream_arn)
        # assert stream_view_type of the table
        assert result["StreamDescription"]["StreamViewType"] == "KEYS_ONLY"

        # add item via PartiQL query - INSERT event
        dynamodb.execute_statement(
            Statement=f"INSERT INTO {table_name} VALUE {{'Username': 'Alice'}}"
        )
        # run update via PartiQL query - MODIFY event
        dynamodb.execute_statement(
            Statement=f"UPDATE {table_name} SET partiql=1 WHERE Username='Alice'"
        )
        # run delete via PartiQL query - REMOVE event
        dynamodb.execute_statement(Statement=f"DELETE FROM {table_name} WHERE Username='Alice'")

        # get shard iterator
        shard = result["StreamDescription"]["Shards"][0]
        response = ddbstreams.get_shard_iterator(
            StreamArn=stream_arn,
            ShardId=shard["ShardId"],
            ShardIteratorType="AT_SEQUENCE_NUMBER",
            SequenceNumber=shard["SequenceNumberRange"]["StartingSequenceNumber"],
        )

        # get stream records
        records = ddbstreams.get_records(ShardIterator=response["ShardIterator"])["Records"]
        assert len(records) == 6
        events = [rec["eventName"] for rec in records]
        assert events == ["INSERT", "MODIFY", "REMOVE"] * 2

        # assert that all records contain proper event IDs
        event_ids = [rec.get("eventID") for rec in records]
        assert all(event_ids)

        # assert that updates have been received from regular table operations
        # and PartiQL query operations
        for idx, record in enumerate(records):
            assert "SequenceNumber" in record["dynamodb"]
            assert record["dynamodb"]["StreamViewType"] == "KEYS_ONLY"
            assert record["dynamodb"]["Keys"] == {"Username": {"S": "Fred" if idx < 3 else "Alice"}}
            assert "OldImage" not in record["dynamodb"]
            assert "NewImage" not in record["dynamodb"]

        # clean up
        delete_table(table_name)

    def test_dynamodb_with_kinesis_stream(self):
        dynamodb = aws_stack.create_external_boto_client("dynamodb")
        kinesis = aws_stack.create_external_boto_client("kinesis")

        # create kinesis datastream
        stream_name = "kinesis_dest_stream"
        kinesis.create_stream(StreamName=stream_name, ShardCount=1)
        # wait for the stream to be created
        sleep(1)
        # Get stream description
        stream_description = kinesis.describe_stream(StreamName=stream_name)["StreamDescription"]
        table_name = "table_with_kinesis_stream-%s" % short_uid()
        # create table
        dynamodb.create_table(
            TableName=table_name,
            KeySchema=[{"AttributeName": "Username", "KeyType": "HASH"}],
            AttributeDefinitions=[{"AttributeName": "Username", "AttributeType": "S"}],
            ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},
        )

        # Enable kinesis destination for the table
        dynamodb.enable_kinesis_streaming_destination(
            TableName=table_name, StreamArn=stream_description["StreamARN"]
        )

        # put item into table
        dynamodb.put_item(TableName=table_name, Item={"Username": {"S": "Fred"}})

        # update item in table
        dynamodb.update_item(
            TableName=table_name,
            Key={"Username": {"S": "Fred"}},
            UpdateExpression="set S=:r",
            ExpressionAttributeValues={":r": {"S": "Fred_Modified"}},
            ReturnValues="UPDATED_NEW",
        )

        # delete item in table
        dynamodb.delete_item(TableName=table_name, Key={"Username": {"S": "Fred"}})

        def _fetch_records():
            records = aws_stack.kinesis_get_latest_records(
                stream_name, shard_id=stream_description["Shards"][0]["ShardId"]
            )
            assert len(records) == 3
            return records

        # get records from the stream
        records = retry(_fetch_records)
     for record in records:715            record = json.loads(record["Data"])716            assert record["tableName"] == table_name717            # check eventSourceARN not exists in the stream record718            assert "eventSourceARN" not in record719            if record["eventName"] == "INSERT":720                assert "OldImage" not in record["dynamodb"]721                assert "NewImage" in record["dynamodb"]722            elif record["eventName"] == "MODIFY":723                assert "NewImage" in record["dynamodb"]724                assert "OldImage" in record["dynamodb"]725            elif record["eventName"] == "REMOVE":726                assert "NewImage" not in record["dynamodb"]727                assert "OldImage" in record["dynamodb"]728        # describe kinesis streaming destination of the table729        destinations = dynamodb.describe_kinesis_streaming_destination(TableName=table_name)730        destination = destinations["KinesisDataStreamDestinations"][0]731        # assert kinesis streaming destination status732        assert stream_description["StreamARN"] == destination["StreamArn"]733        assert destination["DestinationStatus"] == "ACTIVE"734        # Disable kinesis destination735        dynamodb.disable_kinesis_streaming_destination(736            TableName=table_name, StreamArn=stream_description["StreamARN"]737        )738        # describe kinesis streaming destination of the table739        result = dynamodb.describe_kinesis_streaming_destination(TableName=table_name)740        destination = result["KinesisDataStreamDestinations"][0]741        # assert kinesis streaming destination status742        assert stream_description["StreamARN"] == destination["StreamArn"]743        assert destination["DestinationStatus"] == "DISABLED"744        # clean up745        delete_table(table_name)746        kinesis.delete_stream(StreamName="kinesis_dest_stream")747    def test_global_tables(self):748        
aws_stack.create_dynamodb_table(TEST_DDB_TABLE_NAME, partition_key=PARTITION_KEY)749        dynamodb = aws_stack.create_external_boto_client("dynamodb")750        # create global table751        regions = [752            {"RegionName": "us-east-1"},753            {"RegionName": "us-west-1"},754            {"RegionName": "eu-central-1"},755        ]756        response = dynamodb.create_global_table(757            GlobalTableName=TEST_DDB_TABLE_NAME, ReplicationGroup=regions758        )["GlobalTableDescription"]759        assert "ReplicationGroup" in response760        assert len(response["ReplicationGroup"]) == len(regions)761        # describe global table762        response = dynamodb.describe_global_table(GlobalTableName=TEST_DDB_TABLE_NAME)[763            "GlobalTableDescription"764        ]765        assert "ReplicationGroup" in response766        assert len(regions) == len(response["ReplicationGroup"])767        # update global table768        updates = [769            {"Create": {"RegionName": "us-east-2"}},770            {"Create": {"RegionName": "us-west-2"}},771            {"Delete": {"RegionName": "us-west-1"}},772        ]773        response = dynamodb.update_global_table(774            GlobalTableName=TEST_DDB_TABLE_NAME, ReplicaUpdates=updates775        )["GlobalTableDescription"]776        assert "ReplicationGroup" in response777        assert len(response["ReplicationGroup"]) == len(regions) + 1778        # assert exceptions for invalid requests779        with pytest.raises(Exception) as ctx:780            dynamodb.create_global_table(781                GlobalTableName=TEST_DDB_TABLE_NAME, ReplicationGroup=regions782            )783        assert ctx.match("GlobalTableAlreadyExistsException")784        with pytest.raises(Exception) as ctx:785            dynamodb.describe_global_table(GlobalTableName="invalid-table-name")786        assert ctx.match("GlobalTableNotFoundException")787    def test_create_duplicate_table(self, 
dynamodb_create_table_with_parameters):788        table_name = "duplicateTable"789        dynamodb_create_table_with_parameters(790            TableName=table_name,791            KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],792            AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}],793            ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},794            Tags=TEST_DDB_TAGS,795        )796        with pytest.raises(Exception) as ctx:797            dynamodb_create_table_with_parameters(798                TableName=table_name,799                KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],800                AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}],801                ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},802                Tags=TEST_DDB_TAGS,803            )804        ctx.match("ResourceInUseException")805    def test_delete_table(self, dynamodb_client, dynamodb_create_table):806        table_name = "test-ddb-table-%s" % short_uid()807        tables_before = len(dynamodb_client.list_tables()["TableNames"])808        dynamodb_create_table(809            table_name=table_name,810            partition_key=PARTITION_KEY,811        )812        table_list = dynamodb_client.list_tables()813        # TODO: fix assertion, to enable parallel test execution!814        assert tables_before + 1 == len(table_list["TableNames"])815        assert table_name in table_list["TableNames"]816        dynamodb_client.delete_table(TableName=table_name)817        table_list = dynamodb_client.list_tables()818        assert tables_before == len(table_list["TableNames"])819        with pytest.raises(Exception) as ctx:820            dynamodb_client.delete_table(TableName=table_name)821        assert ctx.match("ResourceNotFoundException")822    def test_transaction_write_items(self, dynamodb_client, dynamodb_create_table_with_parameters):823        table_name = 
"test-ddb-table-%s" % short_uid()824        dynamodb_create_table_with_parameters(825            TableName=table_name,826            KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],827            AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}],828            ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},829            Tags=TEST_DDB_TAGS,830        )831        response = dynamodb_client.transact_write_items(832            TransactItems=[833                {834                    "ConditionCheck": {835                        "TableName": table_name,836                        "ConditionExpression": "attribute_not_exists(id)",837                        "Key": {"id": {"S": "test1"}},838                    }839                },840                {"Put": {"TableName": table_name, "Item": {"id": {"S": "test2"}}}},841                {842                    "Update": {843                        "TableName": table_name,844                        "Key": {"id": {"S": "test3"}},845                        "UpdateExpression": "SET attr1 = :v1, attr2 = :v2",846                        "ExpressionAttributeValues": {847                            ":v1": {"S": "value1"},848                            ":v2": {"S": "value2"},849                        },850                    }851                },852                {"Delete": {"TableName": table_name, "Key": {"id": {"S": "test4"}}}},853            ]854        )855        assert response["ResponseMetadata"]["HTTPStatusCode"] == 200856    @pytest.mark.aws_validated857    def test_transaction_write_canceled(858        self, dynamodb_create_table_with_parameters, dynamodb_wait_for_table_active, dynamodb_client859    ):860        table_name = "table_%s" % short_uid()861        # create table862        dynamodb_create_table_with_parameters(863            TableName=table_name,864            KeySchema=[{"AttributeName": "Username", "KeyType": "HASH"}],865            
AttributeDefinitions=[{"AttributeName": "Username", "AttributeType": "S"}],866            ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},867        )868        dynamodb_wait_for_table_active(table_name)869        # put item in table - INSERT event870        dynamodb_client.put_item(TableName=table_name, Item={"Username": {"S": "Fred"}})871        # provoke a TransactionCanceledException by adding a condition which is not met872        with pytest.raises(Exception) as ctx:873            dynamodb_client.transact_write_items(874                TransactItems=[875                    {876                        "ConditionCheck": {877                            "TableName": table_name,878                            "ConditionExpression": "attribute_not_exists(Username)",879                            "Key": {"Username": {"S": "Fred"}},880                        }881                    },882                    {"Delete": {"TableName": table_name, "Key": {"Username": {"S": "Bert"}}}},883                ]884            )885        # Make sure the exception contains the cancellation reasons886        assert ctx.match("TransactionCanceledException")887        assert (888            str(ctx.value)889            == "An error occurred (TransactionCanceledException) when calling the TransactWriteItems operation: "890            "Transaction cancelled, please refer cancellation reasons for specific reasons "891            "[ConditionalCheckFailed, None]"892        )893        assert hasattr(ctx.value, "response")894        assert "CancellationReasons" in ctx.value.response895        conditional_check_failed = [896            reason897            for reason in ctx.value.response["CancellationReasons"]898            if reason.get("Code") == "ConditionalCheckFailed"899        ]900        assert len(conditional_check_failed) == 1901        assert "Message" in conditional_check_failed[0]902        # dynamodb-local adds a trailing "." 
to the message, AWS does not903        assert re.match(904            r"^The conditional request failed\.?$", conditional_check_failed[0]["Message"]905        )906    def test_transaction_write_binary_data(907        self, dynamodb_client, dynamodb_create_table_with_parameters908    ):909        table_name = "test-ddb-table-%s" % short_uid()910        dynamodb_create_table_with_parameters(911            TableName=table_name,912            KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],913            AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}],914            ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},915            Tags=TEST_DDB_TAGS,916        )917        binary_item = {"B": b"foobar"}918        response = dynamodb_client.transact_write_items(919            TransactItems=[920                {921                    "Put": {922                        "TableName": table_name,923                        "Item": {924                            "id": {"S": "someUser"},925                            "binaryData": binary_item,926                        },927                    }928                }929            ]930        )931        item = dynamodb_client.get_item(TableName=table_name, Key={"id": {"S": "someUser"}})["Item"]932        assert response["ResponseMetadata"]["HTTPStatusCode"] == 200933        assert item["binaryData"]934        assert item["binaryData"] == binary_item935    def test_transact_get_items(self, dynamodb_client, dynamodb_create_table):936        table_name = "test-ddb-table-%s" % short_uid()937        dynamodb_create_table(938            table_name=table_name,939            partition_key=PARTITION_KEY,940        )941        dynamodb_client.put_item(TableName=table_name, Item={"id": {"S": "John"}})942        result = dynamodb_client.transact_get_items(943            TransactItems=[{"Get": {"Key": {"id": {"S": "John"}}, "TableName": table_name}}]944        )945        assert 
result["ResponseMetadata"]["HTTPStatusCode"] == 200946    def test_batch_write_items(self, dynamodb_client, dynamodb_create_table_with_parameters):947        table_name = "test-ddb-table-%s" % short_uid()948        dynamodb_create_table_with_parameters(949            TableName=table_name,950            KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],951            AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}],952            ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},953            Tags=TEST_DDB_TAGS,954        )955        dynamodb_client.put_item(TableName=table_name, Item={"id": {"S": "Fred"}})956        response = dynamodb_client.batch_write_item(957            RequestItems={958                table_name: [959                    {"DeleteRequest": {"Key": {"id": {"S": "Fred"}}}},960                    {"PutRequest": {"Item": {"id": {"S": "Bob"}}}},961                ]962            }963        )964        assert response["ResponseMetadata"]["HTTPStatusCode"] == 200965    @pytest.mark.xfail(reason="this test flakes regularly in CI")966    def test_dynamodb_stream_records_with_update_item(967        self,968        dynamodb_client,969        dynamodbstreams_client,970        dynamodb_resource,971        dynamodb_create_table,972        wait_for_stream_ready,973    ):974        table_name = f"test-ddb-table-{short_uid()}"975        dynamodb_create_table(976            table_name=table_name,977            partition_key=PARTITION_KEY,978            stream_view_type="NEW_AND_OLD_IMAGES",979        )980        table = dynamodb_resource.Table(table_name)981        stream_name = get_kinesis_stream_name(table_name)982        wait_for_stream_ready(stream_name)983        response = dynamodbstreams_client.describe_stream(StreamArn=table.latest_stream_arn)984        assert response["ResponseMetadata"]["HTTPStatusCode"] == 200985        assert len(response["StreamDescription"]["Shards"]) == 1986        shard_id = 
response["StreamDescription"]["Shards"][0]["ShardId"]987        starting_sequence_number = int(988            response["StreamDescription"]["Shards"][0]989            .get("SequenceNumberRange")990            .get("StartingSequenceNumber")991        )992        response = dynamodbstreams_client.get_shard_iterator(993            StreamArn=table.latest_stream_arn,994            ShardId=shard_id,995            ShardIteratorType="LATEST",996        )997        assert response["ResponseMetadata"]["HTTPStatusCode"] == 200998        assert "ShardIterator" in response999        iterator_id = response["ShardIterator"]1000        item_id = short_uid()1001        for _ in range(2):1002            dynamodb_client.update_item(1003                TableName=table_name,1004                Key={PARTITION_KEY: {"S": item_id}},1005                UpdateExpression="SET attr1 = :v1, attr2 = :v2",1006                ExpressionAttributeValues={1007                    ":v1": {"S": "value1"},1008                    ":v2": {"S": "value2"},1009                },1010                ReturnValues="ALL_NEW",1011                ReturnConsumedCapacity="INDEXES",1012            )1013        def check_expected_records():1014            records = dynamodbstreams_client.get_records(ShardIterator=iterator_id)1015            assert records["ResponseMetadata"]["HTTPStatusCode"] == 2001016            assert len(records["Records"]) == 21017            assert isinstance(1018                records["Records"][0]["dynamodb"]["ApproximateCreationDateTime"],1019                datetime,1020            )1021            assert records["Records"][0]["dynamodb"]["ApproximateCreationDateTime"].microsecond == 01022            assert records["Records"][0]["eventVersion"] == "1.1"1023            assert records["Records"][0]["eventName"] == "INSERT"1024            assert "OldImage" not in records["Records"][0]["dynamodb"]1025            assert (1026                int(records["Records"][0]["dynamodb"]["SequenceNumber"]) 
> starting_sequence_number1027            )1028            assert isinstance(1029                records["Records"][1]["dynamodb"]["ApproximateCreationDateTime"],1030                datetime,1031            )1032            assert records["Records"][1]["dynamodb"]["ApproximateCreationDateTime"].microsecond == 01033            assert records["Records"][1]["eventVersion"] == "1.1"1034            assert records["Records"][1]["eventName"] == "MODIFY"1035            assert "OldImage" in records["Records"][1]["dynamodb"]1036            assert (1037                int(records["Records"][1]["dynamodb"]["SequenceNumber"]) > starting_sequence_number1038            )1039        retry(check_expected_records, retries=5, sleep=1, sleep_before=2)1040    def test_query_on_deleted_resource(self, dynamodb_client, dynamodb_create_table):1041        table_name = "ddb-table-%s" % short_uid()1042        partition_key = "username"1043        dynamodb_create_table(table_name=table_name, partition_key=partition_key)1044        rs = dynamodb_client.query(1045            TableName=table_name,1046            KeyConditionExpression="{} = :username".format(partition_key),1047            ExpressionAttributeValues={":username": {"S": "test"}},1048        )1049        assert rs["ResponseMetadata"]["HTTPStatusCode"] == 2001050        dynamodb_client.delete_table(TableName=table_name)1051        with pytest.raises(Exception) as ctx:1052            dynamodb_client.query(1053                TableName=table_name,1054                KeyConditionExpression="{} = :username".format(partition_key),1055                ExpressionAttributeValues={":username": {"S": "test"}},1056            )1057        assert ctx.match("ResourceNotFoundException")1058    def test_dynamodb_stream_to_lambda(1059        self, lambda_client, dynamodb_resource, dynamodb_create_table, wait_for_stream_ready1060    ):1061        table_name = "ddb-table-%s" % short_uid()1062        function_name = "func-%s" % short_uid()1063        
partition_key = "SK"1064        dynamodb_create_table(1065            table_name=table_name,1066            partition_key=partition_key,1067            stream_view_type="NEW_AND_OLD_IMAGES",1068        )1069        table = dynamodb_resource.Table(table_name)1070        latest_stream_arn = table.latest_stream_arn1071        stream_name = get_kinesis_stream_name(table_name)1072        wait_for_stream_ready(stream_name)1073        testutil.create_lambda_function(1074            handler_file=TEST_LAMBDA_PYTHON_ECHO,1075            func_name=function_name,1076            runtime=LAMBDA_RUNTIME_PYTHON36,1077        )1078        mapping_uuid = lambda_client.create_event_source_mapping(1079            EventSourceArn=latest_stream_arn,1080            FunctionName=function_name,1081            StartingPosition="TRIM_HORIZON",1082        )["UUID"]1083        item = {"SK": short_uid(), "Name": "name-{}".format(short_uid())}1084        table.put_item(Item=item)1085        events = retry(1086            check_expected_lambda_log_events_length,1087            retries=10,1088            sleep=1,1089            function_name=function_name,1090            expected_length=1,1091            regex_filter=r"Records",1092        )1093        assert len(events) == 11094        assert len(events[0]["Records"]) == 11095        dynamodb_event = events[0]["Records"][0]["dynamodb"]1096        assert dynamodb_event["StreamViewType"] == "NEW_AND_OLD_IMAGES"1097        assert dynamodb_event["Keys"] == {"SK": {"S": item["SK"]}}1098        assert dynamodb_event["NewImage"]["Name"] == {"S": item["Name"]}1099        assert "SequenceNumber" in dynamodb_event1100        lambda_client.delete_event_source_mapping(UUID=mapping_uuid)1101    def test_dynamodb_batch_write_item(1102        self, dynamodb_client, dynamodb_create_table_with_parameters1103    ):1104        table_name = "ddb-table-%s" % short_uid()1105        dynamodb_create_table_with_parameters(1106            TableName=table_name,1107          
  KeySchema=[{"AttributeName": PARTITION_KEY, "KeyType": "HASH"}],1108            AttributeDefinitions=[{"AttributeName": PARTITION_KEY, "AttributeType": "S"}],1109            ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},1110            Tags=TEST_DDB_TAGS,1111        )1112        result = dynamodb_client.batch_write_item(1113            RequestItems={1114                table_name: [1115                    {"PutRequest": {"Item": {PARTITION_KEY: {"S": "Test1"}}}},1116                    {"PutRequest": {"Item": {PARTITION_KEY: {"S": "Test2"}}}},1117                    {"PutRequest": {"Item": {PARTITION_KEY: {"S": "Test3"}}}},1118                ]1119            }1120        )1121        assert result.get("UnprocessedItems") == {}1122    def test_dynamodb_pay_per_request(self, dynamodb_create_table_with_parameters):1123        table_name = "ddb-table-%s" % short_uid()1124        with pytest.raises(Exception) as e:1125            dynamodb_create_table_with_parameters(1126                TableName=table_name,1127                KeySchema=[{"AttributeName": PARTITION_KEY, "KeyType": "HASH"}],1128                AttributeDefinitions=[{"AttributeName": PARTITION_KEY, "AttributeType": "S"}],1129                ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},1130                BillingMode="PAY_PER_REQUEST",1131            )1132        assert e.match("ValidationException")1133    def test_dynamodb_create_table_with_sse_specification(1134        self, dynamodb_create_table_with_parameters1135    ):1136        table_name = "ddb-table-%s" % short_uid()1137        kms_master_key_id = long_uid()1138        sse_specification = {"Enabled": True, "SSEType": "KMS", "KMSMasterKeyId": kms_master_key_id}1139        kms_master_key_arn = aws_stack.kms_key_arn(kms_master_key_id)1140        result = dynamodb_create_table_with_parameters(1141            TableName=table_name,1142            KeySchema=[{"AttributeName": PARTITION_KEY, 
"KeyType": "HASH"}],1143            AttributeDefinitions=[{"AttributeName": PARTITION_KEY, "AttributeType": "S"}],1144            ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},1145            SSESpecification=sse_specification,1146            Tags=TEST_DDB_TAGS,1147        )1148        assert result["TableDescription"]["SSEDescription"]1149        assert result["TableDescription"]["SSEDescription"]["Status"] == "ENABLED"1150        assert result["TableDescription"]["SSEDescription"]["KMSMasterKeyArn"] == kms_master_key_arn1151    def test_dynamodb_create_table_with_partial_sse_specification(1152        self, dynamodb_create_table_with_parameters, kms_client1153    ):1154        table_name = "ddb-table-%s" % short_uid()1155        sse_specification = {"Enabled": True}1156        result = dynamodb_create_table_with_parameters(1157            TableName=table_name,1158            KeySchema=[{"AttributeName": PARTITION_KEY, "KeyType": "HASH"}],1159            AttributeDefinitions=[{"AttributeName": PARTITION_KEY, "AttributeType": "S"}],1160            ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},1161            SSESpecification=sse_specification,1162            Tags=TEST_DDB_TAGS,1163        )1164        assert result["TableDescription"]["SSEDescription"]1165        assert result["TableDescription"]["SSEDescription"]["Status"] == "ENABLED"1166        assert result["TableDescription"]["SSEDescription"]["SSEType"] == "KMS"1167        assert "KMSMasterKeyArn" in result["TableDescription"]["SSEDescription"]1168        kms_master_key_arn = result["TableDescription"]["SSEDescription"]["KMSMasterKeyArn"]1169        result = kms_client.describe_key(KeyId=kms_master_key_arn)1170        assert result["KeyMetadata"]["KeyManager"] == "AWS"1171    def test_dynamodb_get_batch_items(self, dynamodb_client, dynamodb_create_table_with_parameters):1172        table_name = "ddb-table-%s" % short_uid()1173        
dynamodb_create_table_with_parameters(1174            TableName=table_name,1175            KeySchema=[{"AttributeName": "PK", "KeyType": "HASH"}],1176            AttributeDefinitions=[{"AttributeName": "PK", "AttributeType": "S"}],1177            ProvisionedThroughput={"ReadCapacityUnits": 1, "WriteCapacityUnits": 1},1178        )1179        result = dynamodb_client.batch_get_item(1180            RequestItems={table_name: {"Keys": [{"PK": {"S": "test-key"}}]}}1181        )1182        assert list(result["Responses"])[0] == table_name1183    def test_dynamodb_streams_describe_with_exclusive_start_shard_id(1184        self, dynamodb_resource, dynamodb_create_table1185    ):1186        table_name = f"test-ddb-table-{short_uid()}"1187        ddbstreams = aws_stack.create_external_boto_client("dynamodbstreams")1188        dynamodb_create_table(1189            table_name=table_name,1190            partition_key=PARTITION_KEY,1191            stream_view_type="NEW_AND_OLD_IMAGES",1192        )1193        table = dynamodb_resource.Table(table_name)1194        response = ddbstreams.describe_stream(StreamArn=table.latest_stream_arn)1195        assert response["ResponseMetadata"]["HTTPStatusCode"] == 2001196        assert len(response["StreamDescription"]["Shards"]) == 11197        shard_id = response["StreamDescription"]["Shards"][0]["ShardId"]1198        response = ddbstreams.describe_stream(1199            StreamArn=table.latest_stream_arn, ExclusiveStartShardId=shard_id1200        )1201        assert response["ResponseMetadata"]["HTTPStatusCode"] == 2001202        assert len(response["StreamDescription"]["Shards"]) == 01203    @pytest.mark.aws_validated1204    def test_dynamodb_idempotent_writing(1205        self, dynamodb_create_table_with_parameters, dynamodb_client, dynamodb_wait_for_table_active1206    ):1207        table_name = f"ddb-table-{short_uid()}"1208        dynamodb_create_table_with_parameters(1209            TableName=table_name,1210            
KeySchema=[1211                {"AttributeName": "id", "KeyType": "HASH"},1212                {"AttributeName": "name", "KeyType": "RANGE"},1213            ],1214            AttributeDefinitions=[1215                {"AttributeName": "id", "AttributeType": "S"},1216                {"AttributeName": "name", "AttributeType": "S"},1217            ],1218            ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},1219        )1220        dynamodb_wait_for_table_active(table_name)1221        def _transact_write(_d: Dict):1222            response = dynamodb_client.transact_write_items(1223                ClientRequestToken="dedupe_token",1224                TransactItems=[1225                    {1226                        "Put": {1227                            "TableName": table_name,1228                            "Item": _d,1229                        }1230                    },1231                ],1232            )1233            assert response["ResponseMetadata"]["HTTPStatusCode"] == 2001234        _transact_write({"id": {"S": "id1"}, "name": {"S": "name1"}})1235        _transact_write({"name": {"S": "name1"}, "id": {"S": "id1"}})1236def delete_table(name):1237    dynamodb_client = aws_stack.create_external_boto_client("dynamodb")...test_lambda_integration.py
Source:test_lambda_integration.py  
import base64
import json
import os
import time
from unittest.mock import patch

import pytest
from botocore.exceptions import ClientError

from localstack import config
from localstack.services.apigateway.helpers import path_based_url
from localstack.services.awslambda.lambda_api import (
    BATCH_SIZE_RANGES,
    INVALID_PARAMETER_VALUE_EXCEPTION,
)
from localstack.services.awslambda.lambda_utils import LAMBDA_RUNTIME_PYTHON36
from localstack.utils import testutil
from localstack.utils.aws import aws_stack
from localstack.utils.common import retry, safe_requests, short_uid
from localstack.utils.sync import poll_condition
from localstack.utils.testutil import check_expected_lambda_log_events_length, get_lambda_log_events

from .test_lambda import (
    TEST_LAMBDA_FUNCTION_PREFIX,
    TEST_LAMBDA_LIBS,
    TEST_LAMBDA_PYTHON,
    TEST_LAMBDA_PYTHON_ECHO,
    is_old_provider,
)

TEST_STAGE_NAME = "testing"
TEST_SNS_TOPIC_NAME = "sns-topic-1"
# directory containing this test module
THIS_FOLDER = os.path.dirname(os.path.realpath(__file__))
TEST_LAMBDA_PARALLEL_FILE = os.path.join(THIS_FOLDER, "functions", "lambda_parallel.py")


class TestLambdaEventSourceMappings:
    """Tests for Lambda event source mappings backed by SQS queues and DynamoDB streams."""

    def test_event_source_mapping_default_batch_size(
        self,
        create_lambda_function,
        lambda_client,
        sqs_client,
        sqs_create_queue,
        sqs_queue_arn,
        dynamodb_client,
        dynamodb_create_table,
        lambda_su_role,
    ):
        """Check default batch sizes and the rejection of invalid mapping parameters.

        Covers the default batch size for SQS and DynamoDB sources, a batch size
        one above ``BATCH_SIZE_RANGES["sqs"][1]`` on update and on create, and
        (for the non-legacy provider) a table ARN used as event source.
        """
        function_name = f"lambda_func-{short_uid()}"
        queue_name_1 = f"queue-{short_uid()}-1"
        queue_name_2 = f"queue-{short_uid()}-2"
        ddb_table = f"ddb_table-{short_uid()}"
        create_lambda_function(
            func_name=function_name,
            handler_file=TEST_LAMBDA_PYTHON_ECHO,
            runtime=LAMBDA_RUNTIME_PYTHON36,
            role=lambda_su_role,
        )
        queue_url_1 = sqs_create_queue(QueueName=queue_name_1)
        queue_arn_1 = sqs_queue_arn(queue_url_1)
        # no BatchSize given: the response is expected to carry the first entry of the SQS range
        rs = lambda_client.create_event_source_mapping(
            EventSourceArn=queue_arn_1, FunctionName=function_name
        )
        assert BATCH_SIZE_RANGES["sqs"][0] == rs["BatchSize"]
        uuid = rs["UUID"]

        def wait_for_event_source_mapping():
            return lambda_client.get_event_source_mapping(UUID=uuid)["State"] == "Enabled"

        assert poll_condition(wait_for_event_source_mapping, timeout=30)
        with pytest.raises(ClientError) as e:
            # Update batch size with invalid value
            lambda_client.update_event_source_mapping(
                UUID=uuid,
                FunctionName=function_name,
                BatchSize=BATCH_SIZE_RANGES["sqs"][1] + 1,
            )
        e.match(INVALID_PARAMETER_VALUE_EXCEPTION)
        queue_url_2 = sqs_create_queue(QueueName=queue_name_2)
        queue_arn_2 = sqs_queue_arn(queue_url_2)
        with pytest.raises(ClientError) as e:
            # Create event source mapping with invalid batch size value
            lambda_client.create_event_source_mapping(
                EventSourceArn=queue_arn_2,
                FunctionName=function_name,
                BatchSize=BATCH_SIZE_RANGES["sqs"][1] + 1,
            )
        e.match(INVALID_PARAMETER_VALUE_EXCEPTION)
        table_description = dynamodb_create_table(
            table_name=ddb_table,
            partition_key="id",
            stream_view_type="NEW_IMAGE",
        )["TableDescription"]
        # table ARNs are not sufficient as event source, needs to be a dynamodb stream arn
        if not is_old_provider():
            with pytest.raises(ClientError) as e:
                lambda_client.create_event_source_mapping(
                    EventSourceArn=table_description["TableArn"],
                    FunctionName=function_name,
                    StartingPosition="LATEST",
                )
            e.match(INVALID_PARAMETER_VALUE_EXCEPTION)
        # check if event source mapping can be created with latest stream ARN
        rs = lambda_client.create_event_source_mapping(
            EventSourceArn=table_description["LatestStreamArn"],
            FunctionName=function_name,
            StartingPosition="LATEST",
        )
        assert BATCH_SIZE_RANGES["dynamodb"][0] == rs["BatchSize"]

    def test_disabled_event_source_mapping_with_dynamodb(
        self,
        create_lambda_function,
        lambda_client,
        dynamodb_resource,
        dynamodb_client,
        dynamodb_create_table,
        logs_client,
        dynamodbstreams_client,
        lambda_su_role,
    ):
        """Check the Lambda log events before and after disabling an event source mapping.

        One item is written while the mapping is enabled and a second one after
        it has been disabled; the first log event is expected to hold exactly
        one record in both cases.
        """
        function_name = f"lambda_func-{short_uid()}"
        ddb_table = f"ddb_table-{short_uid()}"
        create_lambda_function(
            func_name=function_name,
            handler_file=TEST_LAMBDA_PYTHON_ECHO,
            runtime=LAMBDA_RUNTIME_PYTHON36,
            role=lambda_su_role,
        )
        latest_stream_arn = dynamodb_create_table(
            table_name=ddb_table, partition_key="id", stream_view_type="NEW_IMAGE"
        )["TableDescription"]["LatestStreamArn"]
        rs = lambda_client.create_event_source_mapping(
            FunctionName=function_name,
            EventSourceArn=latest_stream_arn,
            StartingPosition="TRIM_HORIZON",
            MaximumBatchingWindowInSeconds=1,
        )
        uuid = rs["UUID"]

        def wait_for_table_created():
            return (
                dynamodb_client.describe_table(TableName=ddb_table)["Table"]["TableStatus"]
                == "ACTIVE"
            )

        assert poll_condition(wait_for_table_created, timeout=30)

        def wait_for_stream_created():
            return (
                dynamodbstreams_client.describe_stream(StreamArn=latest_stream_arn)[
                    "StreamDescription"
                ]["StreamStatus"]
                == "ENABLED"
            )

        assert poll_condition(wait_for_stream_created, timeout=30)
        table = dynamodb_resource.Table(ddb_table)
        items = [
            {"id": short_uid(), "data": "data1"},
            {"id": short_uid(), "data": "data2"},
        ]
        table.put_item(Item=items[0])

        def assert_events():
            events = get_lambda_log_events(function_name, logs_client=logs_client)
            # lambda was invoked 1 time
            assert 1 == len(events[0]["Records"])

        # might take some time against AWS
        retry(assert_events, sleep=3, retries=10)
        # disable event source mapping
        lambda_client.update_event_source_mapping(UUID=uuid, Enabled=False)
        table.put_item(Item=items[1])
        events = get_lambda_log_events(function_name, logs_client=logs_client)
        # lambda no longer invoked, still have 1 event
        assert 1 == len(events[0]["Records"])

    # TODO invalid test against AWS, this behavior just is not correct
    def test_deletion_event_source_mapping_with_dynamodb(
        self, create_lambda_function, lambda_client, dynamodb_client, lambda_su_role
    ):
        function_name = f"lambda_func-{short_uid()}"
        ddb_table = f"ddb_table-{short_uid()}"
        create_lambda_function(
            func_name=function_name,
            handler_file=TEST_LAMBDA_PYTHON_ECHO,
            runtime=LAMBDA_RUNTIME_PYTHON36,
            role=lambda_su_role,
        )
        latest_stream_arn = aws_stack.create_dynamodb_table(
            table_name=ddb_table,
            partition_key="id",
            client=dynamodb_client,
            stream_view_type="NEW_IMAGE",
        )["TableDescription"]["LatestStreamArn"]
        lambda_client.create_event_source_mapping(
            FunctionName=function_name,
            EventSourceArn=latest_stream_arn,
            StartingPosition="TRIM_HORIZON",
        )

        def wait_for_table_created():
       return (188                dynamodb_client.describe_table(TableName=ddb_table)["Table"]["TableStatus"]189                == "ACTIVE"190            )191        assert poll_condition(wait_for_table_created, timeout=30)192        dynamodb_client.delete_table(TableName=ddb_table)193        result = lambda_client.list_event_source_mappings(EventSourceArn=latest_stream_arn)194        assert 1 == len(result["EventSourceMappings"])195    def test_event_source_mapping_with_sqs(196        self,197        create_lambda_function,198        lambda_client,199        sqs_client,200        sqs_create_queue,201        sqs_queue_arn,202        logs_client,203        lambda_su_role,204    ):205        function_name = f"lambda_func-{short_uid()}"206        queue_name_1 = f"queue-{short_uid()}-1"207        create_lambda_function(208            func_name=function_name,209            handler_file=TEST_LAMBDA_PYTHON_ECHO,210            runtime=LAMBDA_RUNTIME_PYTHON36,211            role=lambda_su_role,212        )213        queue_url_1 = sqs_create_queue(QueueName=queue_name_1)214        queue_arn_1 = sqs_queue_arn(queue_url_1)215        lambda_client.create_event_source_mapping(216            EventSourceArn=queue_arn_1, FunctionName=function_name, MaximumBatchingWindowInSeconds=1217        )218        sqs_client.send_message(QueueUrl=queue_url_1, MessageBody=json.dumps({"foo": "bar"}))219        def assert_lambda_log_events():220            events = get_lambda_log_events(function_name=function_name, logs_client=logs_client)221            # lambda was invoked 1 time222            assert 1 == len(events[0]["Records"])223        retry(assert_lambda_log_events, sleep_before=3, retries=30)224        rs = sqs_client.receive_message(QueueUrl=queue_url_1)225        assert rs.get("Messages") is None226    def test_create_kinesis_event_source_mapping(227        self,228        create_lambda_function,229        lambda_client,230        kinesis_client,231        kinesis_create_stream,232      
  lambda_su_role,233        wait_for_stream_ready,234        logs_client,235    ):236        function_name = f"lambda_func-{short_uid()}"237        stream_name = f"test-foobar-{short_uid()}"238        create_lambda_function(239            func_name=function_name,240            handler_file=TEST_LAMBDA_PYTHON_ECHO,241            runtime=LAMBDA_RUNTIME_PYTHON36,242            role=lambda_su_role,243        )244        kinesis_create_stream(StreamName=stream_name, ShardCount=1)245        stream_arn = kinesis_client.describe_stream(StreamName=stream_name)["StreamDescription"][246            "StreamARN"247        ]248        # only valid against AWS / new provider (once implemented)249        if not is_old_provider():250            with pytest.raises(ClientError) as e:251                lambda_client.create_event_source_mapping(252                    EventSourceArn=stream_arn, FunctionName=function_name253                )254            e.match(INVALID_PARAMETER_VALUE_EXCEPTION)255        wait_for_stream_ready(stream_name=stream_name)256        lambda_client.create_event_source_mapping(257            EventSourceArn=stream_arn, FunctionName=function_name, StartingPosition="TRIM_HORIZON"258        )259        stream_summary = kinesis_client.describe_stream_summary(StreamName=stream_name)260        assert 1 == stream_summary["StreamDescriptionSummary"]["OpenShardCount"]261        num_events_kinesis = 10262        kinesis_client.put_records(263            Records=[264                {"Data": "{}", "PartitionKey": f"test_{i}"} for i in range(0, num_events_kinesis)265            ],266            StreamName=stream_name,267        )268        def get_lambda_events():269            events = get_lambda_log_events(function_name, logs_client=logs_client)270            assert events271            return events272        events = retry(get_lambda_events, retries=30)273        assert 10 == len(events[0]["Records"])274        assert "eventID" in events[0]["Records"][0]275        assert 
"eventSourceARN" in events[0]["Records"][0]276        assert "eventSource" in events[0]["Records"][0]277        assert "eventVersion" in events[0]["Records"][0]278        assert "eventName" in events[0]["Records"][0]279        assert "invokeIdentityArn" in events[0]["Records"][0]280        assert "awsRegion" in events[0]["Records"][0]281        assert "kinesis" in events[0]["Records"][0]282    def test_python_lambda_subscribe_sns_topic(283        self,284        create_lambda_function,285        sns_client,286        lambda_su_role,287        sns_topic,288        logs_client,289        lambda_client,290    ):291        function_name = f"{TEST_LAMBDA_FUNCTION_PREFIX}-{short_uid()}"292        permission_id = f"test-statement-{short_uid()}"293        lambda_creation_response = create_lambda_function(294            func_name=function_name,295            handler_file=TEST_LAMBDA_PYTHON_ECHO,296            runtime=LAMBDA_RUNTIME_PYTHON36,297            role=lambda_su_role,298        )299        lambda_arn = lambda_creation_response["CreateFunctionResponse"]["FunctionArn"]300        topic_arn = sns_topic["Attributes"]["TopicArn"]301        lambda_client.add_permission(302            FunctionName=function_name,303            StatementId=permission_id,304            Action="lambda:InvokeFunction",305            Principal="sns.amazonaws.com",306            SourceArn=topic_arn,307        )308        sns_client.subscribe(309            TopicArn=topic_arn,310            Protocol="lambda",311            Endpoint=lambda_arn,312        )313        subject = "[Subject] Test subject"314        message = "Hello world."315        sns_client.publish(TopicArn=topic_arn, Subject=subject, Message=message)316        events = retry(317            check_expected_lambda_log_events_length,318            retries=10,319            sleep=1,320            function_name=function_name,321            expected_length=1,322            regex_filter="Records.*Sns",323            
logs_client=logs_client,324        )325        notification = events[0]["Records"][0]["Sns"]326        assert "Subject" in notification327        assert subject == notification["Subject"]328class TestLambdaHttpInvocation:329    def test_http_invocation_with_apigw_proxy(self, create_lambda_function):330        lambda_name = f"test_lambda_{short_uid()}"331        lambda_resource = "/api/v1/{proxy+}"332        lambda_path = "/api/v1/hello/world"333        lambda_request_context_path = "/" + TEST_STAGE_NAME + lambda_path334        lambda_request_context_resource_path = lambda_resource335        # create lambda function336        create_lambda_function(337            func_name=lambda_name,338            handler_file=TEST_LAMBDA_PYTHON,339            libs=TEST_LAMBDA_LIBS,340        )341        # create API Gateway and connect it to the Lambda proxy backend342        lambda_uri = aws_stack.lambda_function_arn(lambda_name)343        target_uri = f"arn:aws:apigateway:{aws_stack.get_region()}:lambda:path/2015-03-31/functions/{lambda_uri}/invocations"344        result = testutil.connect_api_gateway_to_http_with_lambda_proxy(345            "test_gateway2",346            target_uri,347            path=lambda_resource,348            stage_name=TEST_STAGE_NAME,349        )350        api_id = result["id"]351        url = path_based_url(api_id=api_id, stage_name=TEST_STAGE_NAME, path=lambda_path)352        result = safe_requests.post(353            url, data=b"{}", headers={"User-Agent": "python-requests/testing"}354        )355        content = json.loads(result.content)356        assert lambda_path == content["path"]357        assert lambda_resource == content["resource"]358        assert lambda_request_context_path == content["requestContext"]["path"]359        assert lambda_request_context_resource_path == content["requestContext"]["resourcePath"]360class TestKinesisSource:361    @patch.object(config, "SYNCHRONOUS_KINESIS_EVENTS", False)362    def 
test_kinesis_lambda_parallelism(363        self,364        lambda_client,365        kinesis_client,366        create_lambda_function,367        kinesis_create_stream,368        wait_for_stream_ready,369        logs_client,370        lambda_su_role,371    ):372        function_name = f"lambda_func-{short_uid()}"373        stream_name = f"test-foobar-{short_uid()}"374        create_lambda_function(375            handler_file=TEST_LAMBDA_PARALLEL_FILE,376            func_name=function_name,377            runtime=LAMBDA_RUNTIME_PYTHON36,378            role=lambda_su_role,379        )380        kinesis_create_stream(StreamName=stream_name, ShardCount=1)381        stream_arn = kinesis_client.describe_stream(StreamName=stream_name)["StreamDescription"][382            "StreamARN"383        ]384        wait_for_stream_ready(stream_name=stream_name)385        lambda_client.create_event_source_mapping(386            EventSourceArn=stream_arn,387            FunctionName=function_name,388            StartingPosition="TRIM_HORIZON",389            BatchSize=10,390        )391        stream_summary = kinesis_client.describe_stream_summary(StreamName=stream_name)392        assert 1 == stream_summary["StreamDescriptionSummary"]["OpenShardCount"]393        num_events_kinesis = 10394        # assure async call395        start = time.perf_counter()396        kinesis_client.put_records(397            Records=[398                {"Data": '{"batch": 0}', "PartitionKey": f"test_{i}"}399                for i in range(0, num_events_kinesis)400            ],401            StreamName=stream_name,402        )403        assert (time.perf_counter() - start) < 1  # this should not take more than a second404        kinesis_client.put_records(405            Records=[406                {"Data": '{"batch": 1}', "PartitionKey": f"test_{i}"}407                for i in range(0, num_events_kinesis)408            ],409            StreamName=stream_name,410        )411        def get_events():412            
events = get_lambda_log_events(413                function_name, regex_filter=r"event.*Records", logs_client=logs_client414            )415            assert len(events) == 2416            return events417        events = retry(get_events, retries=30)418        def assertEvent(event, batch_no):419            assert 10 == len(event["event"]["Records"])420            assert "eventID" in event["event"]["Records"][0]421            assert "eventSourceARN" in event["event"]["Records"][0]422            assert "eventSource" in event["event"]["Records"][0]423            assert "eventVersion" in event["event"]["Records"][0]424            assert "eventName" in event["event"]["Records"][0]425            assert "invokeIdentityArn" in event["event"]["Records"][0]426            assert "awsRegion" in event["event"]["Records"][0]427            assert "kinesis" in event["event"]["Records"][0]428            assert {"batch": batch_no} == json.loads(429                base64.b64decode(event["event"]["Records"][0]["kinesis"]["data"]).decode(430                    config.DEFAULT_ENCODING431                )432            )433        assertEvent(events[0], 0)434        assertEvent(events[1], 1)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
