Best Python code snippet using localstack_python
test_dynamodb.py
Source:test_dynamodb.py  
...314                ],315                ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},316                Tags=TEST_DDB_TAGS,317            )318            dynamodb_wait_for_table_active(table_name)319            item = {"SK": {"S": "hello"}, "LSI1SK": {"N": "123"}, "PK": {"S": "test one"}}320            dynamodb_client.put_item(TableName=table_name, Item=item)321            result = dynamodb_client.query(322                TableName=table_name,323                IndexName="LSI1",324                KeyConditionExpression="PK = :v1",325                ExpressionAttributeValues={":v1": {"S": "test one"}},326                Select="ALL_ATTRIBUTES",327            )328            assert result["Items"] == [item]329        finally:330            dynamodb_client.delete_table(TableName=table_name)331    def test_more_than_20_global_secondary_indexes(self, dynamodb, dynamodb_client):332        table_name = f"test-table-{short_uid()}"333        num_gsis = 25334        attrs = [{"AttributeName": f"a{i}", "AttributeType": "S"} for i in range(num_gsis)]335        gsis = [336            {337                "IndexName": f"gsi_{i}",338                "KeySchema": [{"AttributeName": f"a{i}", "KeyType": "HASH"}],339                "Projection": {"ProjectionType": "ALL"},340            }341            for i in range(num_gsis)342        ]343        dynamodb.create_table(344            TableName=table_name,345            KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],346            AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}, *attrs],347            GlobalSecondaryIndexes=gsis,348            BillingMode="PAY_PER_REQUEST",349        )350        table = dynamodb_client.describe_table(TableName=table_name)351        assert len(table["Table"]["GlobalSecondaryIndexes"]) == num_gsis352        # clean up353        delete_table(table_name)354    @pytest.mark.aws_validated355    def test_return_values_in_put_item(self, dynamodb, 
dynamodb_client):356        aws_stack.create_dynamodb_table(357            TEST_DDB_TABLE_NAME, partition_key=PARTITION_KEY, client=dynamodb_client358        )359        table = dynamodb.Table(TEST_DDB_TABLE_NAME)360        def _validate_response(response, expected: dict = {}):361            """362            Validates the response against the optionally expected one.363            It checks that the response doesn't contain `Attributes`,364            `ConsumedCapacity` and `ItemCollectionMetrics` unless they are expected.365            """366            should_not_contain = {367                "Attributes",368                "ConsumedCapacity",369                "ItemCollectionMetrics",370            } - expected.keys()371            assert response["ResponseMetadata"]["HTTPStatusCode"] == 200372            assert expected.items() <= response.items()373            assert response.keys().isdisjoint(should_not_contain)374        # items which are being used to put in the table375        item1 = {PARTITION_KEY: "id1", "data": "foobar"}376        item1b = {PARTITION_KEY: "id1", "data": "barfoo"}377        item2 = {PARTITION_KEY: "id2", "data": "foobar"}378        response = table.put_item(Item=item1, ReturnValues="ALL_OLD")379        # there is no data present in the table already so even if return values380        # is set to 'ALL_OLD' as there is no data it will not return any data.381        _validate_response(response)382        # now the same data is present so when we pass return values as 'ALL_OLD'383        # it should give us attributes384        response = table.put_item(Item=item1, ReturnValues="ALL_OLD")385        _validate_response(response, expected={"Attributes": item1})386        # now a previous version of data is present, so when we pass return387        # values as 'ALL_OLD' it should give us the old attributes388        response = table.put_item(Item=item1b, ReturnValues="ALL_OLD")389        _validate_response(response, expected={"Attributes": 
item1})390        response = table.put_item(Item=item2)391        # we do not have any same item as item2 already so when we add this by default392        # return values is set to None so no Attribute values should be returned393        _validate_response(response)394        response = table.put_item(Item=item2)395        # in this case we already have item2 in the table so on this request396        # it should not return any data as return values is set to None so no397        # Attribute values should be returned398        _validate_response(response)399        # cleanup400        table.delete()401    @pytest.mark.aws_validated402    def test_empty_and_binary_values(self, dynamodb, dynamodb_client):403        aws_stack.create_dynamodb_table(404            TEST_DDB_TABLE_NAME, partition_key=PARTITION_KEY, client=dynamodb_client405        )406        table = dynamodb.Table(TEST_DDB_TABLE_NAME)407        # items which are being used to put in the table408        item1 = {PARTITION_KEY: "id1", "data": ""}409        item2 = {PARTITION_KEY: "id2", "data": b"\x90"}410        response = table.put_item(Item=item1)411        assert response["ResponseMetadata"]["HTTPStatusCode"] == 200412        response = table.put_item(Item=item2)413        assert response["ResponseMetadata"]["HTTPStatusCode"] == 200414        # clean up415        table.delete()416    def test_batch_write_binary(self, dynamodb_client):417        table_name = "table_batch_binary_%s" % short_uid()418        dynamodb_client.create_table(419            TableName=table_name,420            AttributeDefinitions=[421                {"AttributeName": "PK", "AttributeType": "S"},422                {"AttributeName": "SK", "AttributeType": "S"},423            ],424            KeySchema=[425                {"AttributeName": "PK", "KeyType": "HASH"},426                {"AttributeName": "SK", "KeyType": "RANGE"},427            ],428            BillingMode="PAY_PER_REQUEST",429        )430        
dynamodb_client.put_item(431            TableName=table_name,432            Item={"PK": {"S": "hello"}, "SK": {"S": "user"}, "data": {"B": b"test"}},433        )434        item = {435            "Item": {436                "PK": {"S": "hello-1"},437                "SK": {"S": "user-1"},438                "data": {"B": b"test-1"},439            }440        }441        item_non_decodable = {442            "Item": {443                "PK": {"S": "hello-2"},444                "SK": {"S": "user-2"},445                "data": {"B": b"test \xc0 \xed"},446            }447        }448        response = dynamodb_client.batch_write_item(449            RequestItems={table_name: [{"PutRequest": item}, {"PutRequest": item_non_decodable}]}450        )451        assert response["ResponseMetadata"]["HTTPStatusCode"] == 200452        dynamodb_client.delete_table(TableName=table_name)453    def test_binary_data_with_stream(454        self,455        wait_for_stream_ready,456        dynamodb_create_table_with_parameters,457        dynamodb_client,458        kinesis_client,459    ):460        table_name = f"table-{short_uid()}"461        dynamodb_create_table_with_parameters(462            TableName=table_name,463            KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],464            AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}],465            ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},466            StreamSpecification={467                "StreamEnabled": True,468                "StreamViewType": "NEW_AND_OLD_IMAGES",469            },470        )471        stream_name = get_kinesis_stream_name(table_name)472        wait_for_stream_ready(stream_name)473        response = dynamodb_client.put_item(474            TableName=table_name, Item={"id": {"S": "id1"}, "data": {"B": b"\x90"}}475        )476        assert response["ResponseMetadata"]["HTTPStatusCode"] == 200477        iterator = get_shard_iterator(stream_name, 
kinesis_client)478        response = kinesis_client.get_records(ShardIterator=iterator)479        json_records = response.get("Records")480        assert 1 == len(json_records)481        assert "Data" in json_records[0]482    def test_dynamodb_stream_shard_iterator(self, wait_for_stream_ready):483        dynamodb = aws_stack.create_external_boto_client("dynamodb")484        ddbstreams = aws_stack.create_external_boto_client("dynamodbstreams")485        table_name = "table_with_stream-%s" % short_uid()486        table = dynamodb.create_table(487            TableName=table_name,488            KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],489            AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}],490            StreamSpecification={491                "StreamEnabled": True,492                "StreamViewType": "NEW_IMAGE",493            },494            ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},495        )496        stream_name = get_kinesis_stream_name(table_name)497        wait_for_stream_ready(stream_name)498        stream_arn = table["TableDescription"]["LatestStreamArn"]499        result = ddbstreams.describe_stream(StreamArn=stream_arn)500        response = ddbstreams.get_shard_iterator(501            StreamArn=stream_arn,502            ShardId=result["StreamDescription"]["Shards"][0]["ShardId"],503            ShardIteratorType="LATEST",504        )505        assert "ShardIterator" in response506        response = ddbstreams.get_shard_iterator(507            StreamArn=stream_arn,508            ShardId=result["StreamDescription"]["Shards"][0]["ShardId"],509            ShardIteratorType="AT_SEQUENCE_NUMBER",510            SequenceNumber=result["StreamDescription"]["Shards"][0]511            .get("SequenceNumberRange")512            .get("StartingSequenceNumber"),513        )514        assert "ShardIterator" in response515    def test_dynamodb_create_table_with_class(self, dynamodb_client):516        
table_name = "table_with_class_%s" % short_uid()517        # create table518        result = dynamodb_client.create_table(519            TableName=table_name,520            KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],521            AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}],522            ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},523            TableClass="STANDARD",524        )525        assert result["TableDescription"]["TableClassSummary"]["TableClass"] == "STANDARD"526        result = dynamodb_client.describe_table(TableName=table_name)527        assert result["Table"]["TableClassSummary"]["TableClass"] == "STANDARD"528        result = dynamodb_client.update_table(529            TableName=table_name, TableClass="STANDARD_INFREQUENT_ACCESS"530        )531        assert (532            result["TableDescription"]["TableClassSummary"]["TableClass"]533            == "STANDARD_INFREQUENT_ACCESS"534        )535        result = dynamodb_client.describe_table(TableName=table_name)536        assert result["Table"]["TableClassSummary"]["TableClass"] == "STANDARD_INFREQUENT_ACCESS"537        # clean resources538        dynamodb_client.delete_table(TableName=table_name)539    def test_dynamodb_execute_transaction(self, dynamodb_client):540        table_name = "table_%s" % short_uid()541        dynamodb_client.create_table(542            TableName=table_name,543            KeySchema=[{"AttributeName": "Username", "KeyType": "HASH"}],544            AttributeDefinitions=[{"AttributeName": "Username", "AttributeType": "S"}],545            ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},546        )547        statements = [548            {"Statement": f"INSERT INTO {table_name} VALUE {{'Username': 'user01'}}"},549            {"Statement": f"INSERT INTO {table_name} VALUE {{'Username': 'user02'}}"},550        ]551        result = 
dynamodb_client.execute_transaction(TransactStatements=statements)552        assert result["ResponseMetadata"]["HTTPStatusCode"] == 200553        result = dynamodb_client.scan(TableName=table_name)554        assert result["ScannedCount"] == 2555        dynamodb_client.delete_table(TableName=table_name)556    def test_dynamodb_batch_execute_statement(self, dynamodb_client):557        table_name = "table_%s" % short_uid()558        dynamodb_client.create_table(559            TableName=table_name,560            KeySchema=[{"AttributeName": "Username", "KeyType": "HASH"}],561            AttributeDefinitions=[{"AttributeName": "Username", "AttributeType": "S"}],562            ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},563        )564        dynamodb_client.put_item(TableName=table_name, Item={"Username": {"S": "user02"}})565        statements = [566            {"Statement": f"INSERT INTO {table_name} VALUE {{'Username': 'user01'}}"},567            {"Statement": f"UPDATE {table_name} SET Age=20 WHERE Username='user02'"},568        ]569        result = dynamodb_client.batch_execute_statement(Statements=statements)570        # actions always succeeds571        assert not any("Error" in r for r in result["Responses"])572        item = dynamodb_client.get_item(TableName=table_name, Key={"Username": {"S": "user02"}})[573            "Item"574        ]575        assert item["Age"]["N"] == "20"576        item = dynamodb_client.get_item(TableName=table_name, Key={"Username": {"S": "user01"}})[577            "Item"578        ]579        assert item580        dynamodb_client.delete_table(TableName=table_name)581    def test_dynamodb_partiql_missing(self, dynamodb_client):582        table_name = "table_with_stream_%s" % short_uid()583        # create table584        dynamodb_client.create_table(585            TableName=table_name,586            KeySchema=[{"AttributeName": "Username", "KeyType": "HASH"}],587            
AttributeDefinitions=[{"AttributeName": "Username", "AttributeType": "S"}],588            ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},589        )590        # create items with FirstName attribute591        dynamodb_client.execute_statement(592            Statement=f"INSERT INTO {table_name} VALUE {{'Username': 'Alice123', 'FirstName':'Alice'}}"593        )594        items = dynamodb_client.execute_statement(595            Statement=f"SELECT * FROM {table_name} WHERE FirstName IS NOT MISSING"596        )["Items"]597        assert len(items) == 1598        items = dynamodb_client.execute_statement(599            Statement=f"SELECT * FROM {table_name} WHERE FirstName IS MISSING"600        )["Items"]601        assert len(items) == 0602        dynamodb_client.delete_table(TableName=table_name)603    def test_dynamodb_stream_stream_view_type(self):604        dynamodb = aws_stack.create_external_boto_client("dynamodb")605        ddbstreams = aws_stack.create_external_boto_client("dynamodbstreams")606        table_name = "table_with_stream_%s" % short_uid()607        # create table608        table = dynamodb.create_table(609            TableName=table_name,610            KeySchema=[{"AttributeName": "Username", "KeyType": "HASH"}],611            AttributeDefinitions=[{"AttributeName": "Username", "AttributeType": "S"}],612            StreamSpecification={613                "StreamEnabled": True,614                "StreamViewType": "KEYS_ONLY",615            },616            ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},617        )618        stream_arn = table["TableDescription"]["LatestStreamArn"]619        # wait for stream to be created620        sleep(1)621        # put item in table - INSERT event622        dynamodb.put_item(TableName=table_name, Item={"Username": {"S": "Fred"}})623        # update item in table - MODIFY event624        dynamodb.update_item(625            TableName=table_name,626            
Key={"Username": {"S": "Fred"}},627            UpdateExpression="set S=:r",628            ExpressionAttributeValues={":r": {"S": "Fred_Modified"}},629            ReturnValues="UPDATED_NEW",630        )631        # delete item in table - REMOVE event632        dynamodb.delete_item(TableName=table_name, Key={"Username": {"S": "Fred"}})633        result = ddbstreams.describe_stream(StreamArn=stream_arn)634        # assert stream_view_type of the table635        assert result["StreamDescription"]["StreamViewType"] == "KEYS_ONLY"636        # add item via PartiQL query - INSERT event637        dynamodb.execute_statement(638            Statement=f"INSERT INTO {table_name} VALUE {{'Username': 'Alice'}}"639        )640        # run update via PartiQL query - MODIFY event641        dynamodb.execute_statement(642            Statement=f"UPDATE {table_name} SET partiql=1 WHERE Username='Alice'"643        )644        # run update via PartiQL query - REMOVE event645        dynamodb.execute_statement(Statement=f"DELETE FROM {table_name} WHERE Username='Alice'")646        # get shard iterator647        response = ddbstreams.get_shard_iterator(648            StreamArn=stream_arn,649            ShardId=result["StreamDescription"]["Shards"][0]["ShardId"],650            ShardIteratorType="AT_SEQUENCE_NUMBER",651            SequenceNumber=result["StreamDescription"]["Shards"][0]652            .get("SequenceNumberRange")653            .get("StartingSequenceNumber"),654        )655        # get stream records656        records = ddbstreams.get_records(ShardIterator=response["ShardIterator"])["Records"]657        assert len(records) == 6658        events = [rec["eventName"] for rec in records]659        assert events == ["INSERT", "MODIFY", "REMOVE"] * 2660        # assert that all records contain proper event IDs661        event_ids = [rec.get("eventID") for rec in records]662        assert all(event_ids)663        # assert that updates have been received from regular table operations and 
PartiQL query operations664        for idx, record in enumerate(records):665            assert "SequenceNumber" in record["dynamodb"]666            assert record["dynamodb"]["StreamViewType"] == "KEYS_ONLY"667            assert record["dynamodb"]["Keys"] == {"Username": {"S": "Fred" if idx < 3 else "Alice"}}668            assert "OldImage" not in record["dynamodb"]669            assert "NewImage" not in record["dynamodb"]670        # clean up671        delete_table(table_name)672    def test_dynamodb_with_kinesis_stream(self):673        dynamodb = aws_stack.create_external_boto_client("dynamodb")674        kinesis = aws_stack.create_external_boto_client("kinesis")675        # create kinesis datastream676        stream_name = "kinesis_dest_stream"677        kinesis.create_stream(StreamName=stream_name, ShardCount=1)678        # wait for the stream to be created679        sleep(1)680        # Get stream description681        stream_description = kinesis.describe_stream(StreamName=stream_name)["StreamDescription"]682        table_name = "table_with_kinesis_stream-%s" % short_uid()683        # create table684        dynamodb.create_table(685            TableName=table_name,686            KeySchema=[{"AttributeName": "Username", "KeyType": "HASH"}],687            AttributeDefinitions=[{"AttributeName": "Username", "AttributeType": "S"}],688            ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},689        )690        # Enable kinesis destination for the table691        dynamodb.enable_kinesis_streaming_destination(692            TableName=table_name, StreamArn=stream_description["StreamARN"]693        )694        # put item into table695        dynamodb.put_item(TableName=table_name, Item={"Username": {"S": "Fred"}})696        # update item in table697        dynamodb.update_item(698            TableName=table_name,699            Key={"Username": {"S": "Fred"}},700            UpdateExpression="set S=:r",701            
ExpressionAttributeValues={":r": {"S": "Fred_Modified"}},702            ReturnValues="UPDATED_NEW",703        )704        # delete item in table705        dynamodb.delete_item(TableName=table_name, Key={"Username": {"S": "Fred"}})706        def _fetch_records():707            records = aws_stack.kinesis_get_latest_records(708                stream_name, shard_id=stream_description["Shards"][0]["ShardId"]709            )710            assert len(records) == 3711            return records712        # get records from the stream713        records = retry(_fetch_records)714        for record in records:715            record = json.loads(record["Data"])716            assert record["tableName"] == table_name717            # check eventSourceARN not exists in the stream record718            assert "eventSourceARN" not in record719            if record["eventName"] == "INSERT":720                assert "OldImage" not in record["dynamodb"]721                assert "NewImage" in record["dynamodb"]722            elif record["eventName"] == "MODIFY":723                assert "NewImage" in record["dynamodb"]724                assert "OldImage" in record["dynamodb"]725            elif record["eventName"] == "REMOVE":726                assert "NewImage" not in record["dynamodb"]727                assert "OldImage" in record["dynamodb"]728        # describe kinesis streaming destination of the table729        destinations = dynamodb.describe_kinesis_streaming_destination(TableName=table_name)730        destination = destinations["KinesisDataStreamDestinations"][0]731        # assert kinesis streaming destination status732        assert stream_description["StreamARN"] == destination["StreamArn"]733        assert destination["DestinationStatus"] == "ACTIVE"734        # Disable kinesis destination735        dynamodb.disable_kinesis_streaming_destination(736            TableName=table_name, StreamArn=stream_description["StreamARN"]737        )738        # describe kinesis streaming 
destination of the table739        result = dynamodb.describe_kinesis_streaming_destination(TableName=table_name)740        destination = result["KinesisDataStreamDestinations"][0]741        # assert kinesis streaming destination status742        assert stream_description["StreamARN"] == destination["StreamArn"]743        assert destination["DestinationStatus"] == "DISABLED"744        # clean up745        delete_table(table_name)746        kinesis.delete_stream(StreamName="kinesis_dest_stream")747    def test_global_tables(self):748        aws_stack.create_dynamodb_table(TEST_DDB_TABLE_NAME, partition_key=PARTITION_KEY)749        dynamodb = aws_stack.create_external_boto_client("dynamodb")750        # create global table751        regions = [752            {"RegionName": "us-east-1"},753            {"RegionName": "us-west-1"},754            {"RegionName": "eu-central-1"},755        ]756        response = dynamodb.create_global_table(757            GlobalTableName=TEST_DDB_TABLE_NAME, ReplicationGroup=regions758        )["GlobalTableDescription"]759        assert "ReplicationGroup" in response760        assert len(response["ReplicationGroup"]) == len(regions)761        # describe global table762        response = dynamodb.describe_global_table(GlobalTableName=TEST_DDB_TABLE_NAME)[763            "GlobalTableDescription"764        ]765        assert "ReplicationGroup" in response766        assert len(regions) == len(response["ReplicationGroup"])767        # update global table768        updates = [769            {"Create": {"RegionName": "us-east-2"}},770            {"Create": {"RegionName": "us-west-2"}},771            {"Delete": {"RegionName": "us-west-1"}},772        ]773        response = dynamodb.update_global_table(774            GlobalTableName=TEST_DDB_TABLE_NAME, ReplicaUpdates=updates775        )["GlobalTableDescription"]776        assert "ReplicationGroup" in response777        assert len(response["ReplicationGroup"]) == len(regions) + 1778        # assert 
exceptions for invalid requests779        with pytest.raises(Exception) as ctx:780            dynamodb.create_global_table(781                GlobalTableName=TEST_DDB_TABLE_NAME, ReplicationGroup=regions782            )783        assert ctx.match("GlobalTableAlreadyExistsException")784        with pytest.raises(Exception) as ctx:785            dynamodb.describe_global_table(GlobalTableName="invalid-table-name")786        assert ctx.match("GlobalTableNotFoundException")787    def test_create_duplicate_table(self, dynamodb_create_table_with_parameters):788        table_name = "duplicateTable"789        dynamodb_create_table_with_parameters(790            TableName=table_name,791            KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],792            AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}],793            ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},794            Tags=TEST_DDB_TAGS,795        )796        with pytest.raises(Exception) as ctx:797            dynamodb_create_table_with_parameters(798                TableName=table_name,799                KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],800                AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}],801                ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},802                Tags=TEST_DDB_TAGS,803            )804        ctx.match("ResourceInUseException")805    def test_delete_table(self, dynamodb_client, dynamodb_create_table):806        table_name = "test-ddb-table-%s" % short_uid()807        tables_before = len(dynamodb_client.list_tables()["TableNames"])808        dynamodb_create_table(809            table_name=table_name,810            partition_key=PARTITION_KEY,811        )812        table_list = dynamodb_client.list_tables()813        # TODO: fix assertion, to enable parallel test execution!814        assert tables_before + 1 == len(table_list["TableNames"])815        
assert table_name in table_list["TableNames"]816        dynamodb_client.delete_table(TableName=table_name)817        table_list = dynamodb_client.list_tables()818        assert tables_before == len(table_list["TableNames"])819        with pytest.raises(Exception) as ctx:820            dynamodb_client.delete_table(TableName=table_name)821        assert ctx.match("ResourceNotFoundException")822    def test_transaction_write_items(self, dynamodb_client, dynamodb_create_table_with_parameters):823        table_name = "test-ddb-table-%s" % short_uid()824        dynamodb_create_table_with_parameters(825            TableName=table_name,826            KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],827            AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}],828            ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},829            Tags=TEST_DDB_TAGS,830        )831        response = dynamodb_client.transact_write_items(832            TransactItems=[833                {834                    "ConditionCheck": {835                        "TableName": table_name,836                        "ConditionExpression": "attribute_not_exists(id)",837                        "Key": {"id": {"S": "test1"}},838                    }839                },840                {"Put": {"TableName": table_name, "Item": {"id": {"S": "test2"}}}},841                {842                    "Update": {843                        "TableName": table_name,844                        "Key": {"id": {"S": "test3"}},845                        "UpdateExpression": "SET attr1 = :v1, attr2 = :v2",846                        "ExpressionAttributeValues": {847                            ":v1": {"S": "value1"},848                            ":v2": {"S": "value2"},849                        },850                    }851                },852                {"Delete": {"TableName": table_name, "Key": {"id": {"S": "test4"}}}},853            ]854        )855        
assert response["ResponseMetadata"]["HTTPStatusCode"] == 200856    @pytest.mark.aws_validated857    def test_transaction_write_canceled(858        self, dynamodb_create_table_with_parameters, dynamodb_wait_for_table_active, dynamodb_client859    ):860        table_name = "table_%s" % short_uid()861        # create table862        dynamodb_create_table_with_parameters(863            TableName=table_name,864            KeySchema=[{"AttributeName": "Username", "KeyType": "HASH"}],865            AttributeDefinitions=[{"AttributeName": "Username", "AttributeType": "S"}],866            ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},867        )868        dynamodb_wait_for_table_active(table_name)869        # put item in table - INSERT event870        dynamodb_client.put_item(TableName=table_name, Item={"Username": {"S": "Fred"}})871        # provoke a TransactionCanceledException by adding a condition which is not met872        with pytest.raises(Exception) as ctx:873            dynamodb_client.transact_write_items(874                TransactItems=[875                    {876                        "ConditionCheck": {877                            "TableName": table_name,878                            "ConditionExpression": "attribute_not_exists(Username)",879                            "Key": {"Username": {"S": "Fred"}},880                        }881                    },882                    {"Delete": {"TableName": table_name, "Key": {"Username": {"S": "Bert"}}}},883                ]884            )885        # Make sure the exception contains the cancellation reasons886        assert ctx.match("TransactionCanceledException")887        assert (888            str(ctx.value)889            == "An error occurred (TransactionCanceledException) when calling the TransactWriteItems operation: "890            "Transaction cancelled, please refer cancellation reasons for specific reasons "891            "[ConditionalCheckFailed, None]"892        )893  
      assert hasattr(ctx.value, "response")894        assert "CancellationReasons" in ctx.value.response895        conditional_check_failed = [896            reason897            for reason in ctx.value.response["CancellationReasons"]898            if reason.get("Code") == "ConditionalCheckFailed"899        ]900        assert len(conditional_check_failed) == 1901        assert "Message" in conditional_check_failed[0]902        # dynamodb-local adds a trailing "." to the message, AWS does not903        assert re.match(904            r"^The conditional request failed\.?$", conditional_check_failed[0]["Message"]905        )906    def test_transaction_write_binary_data(907        self, dynamodb_client, dynamodb_create_table_with_parameters908    ):909        table_name = "test-ddb-table-%s" % short_uid()910        dynamodb_create_table_with_parameters(911            TableName=table_name,912            KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],913            AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}],914            ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},915            Tags=TEST_DDB_TAGS,916        )917        binary_item = {"B": b"foobar"}918        response = dynamodb_client.transact_write_items(919            TransactItems=[920                {921                    "Put": {922                        "TableName": table_name,923                        "Item": {924                            "id": {"S": "someUser"},925                            "binaryData": binary_item,926                        },927                    }928                }929            ]930        )931        item = dynamodb_client.get_item(TableName=table_name, Key={"id": {"S": "someUser"}})["Item"]932        assert response["ResponseMetadata"]["HTTPStatusCode"] == 200933        assert item["binaryData"]934        assert item["binaryData"] == binary_item935    def test_transact_get_items(self, dynamodb_client, 
dynamodb_create_table):936        table_name = "test-ddb-table-%s" % short_uid()937        dynamodb_create_table(938            table_name=table_name,939            partition_key=PARTITION_KEY,940        )941        dynamodb_client.put_item(TableName=table_name, Item={"id": {"S": "John"}})942        result = dynamodb_client.transact_get_items(943            TransactItems=[{"Get": {"Key": {"id": {"S": "John"}}, "TableName": table_name}}]944        )945        assert result["ResponseMetadata"]["HTTPStatusCode"] == 200946    def test_batch_write_items(self, dynamodb_client, dynamodb_create_table_with_parameters):947        table_name = "test-ddb-table-%s" % short_uid()948        dynamodb_create_table_with_parameters(949            TableName=table_name,950            KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],951            AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}],952            ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},953            Tags=TEST_DDB_TAGS,954        )955        dynamodb_client.put_item(TableName=table_name, Item={"id": {"S": "Fred"}})956        response = dynamodb_client.batch_write_item(957            RequestItems={958                table_name: [959                    {"DeleteRequest": {"Key": {"id": {"S": "Fred"}}}},960                    {"PutRequest": {"Item": {"id": {"S": "Bob"}}}},961                ]962            }963        )964        assert response["ResponseMetadata"]["HTTPStatusCode"] == 200965    @pytest.mark.xfail(reason="this test flakes regularly in CI")966    def test_dynamodb_stream_records_with_update_item(967        self,968        dynamodb_client,969        dynamodbstreams_client,970        dynamodb_resource,971        dynamodb_create_table,972        wait_for_stream_ready,973    ):974        table_name = f"test-ddb-table-{short_uid()}"975        dynamodb_create_table(976            table_name=table_name,977            partition_key=PARTITION_KEY,978            
stream_view_type="NEW_AND_OLD_IMAGES",979        )980        table = dynamodb_resource.Table(table_name)981        stream_name = get_kinesis_stream_name(table_name)982        wait_for_stream_ready(stream_name)983        response = dynamodbstreams_client.describe_stream(StreamArn=table.latest_stream_arn)984        assert response["ResponseMetadata"]["HTTPStatusCode"] == 200985        assert len(response["StreamDescription"]["Shards"]) == 1986        shard_id = response["StreamDescription"]["Shards"][0]["ShardId"]987        starting_sequence_number = int(988            response["StreamDescription"]["Shards"][0]989            .get("SequenceNumberRange")990            .get("StartingSequenceNumber")991        )992        response = dynamodbstreams_client.get_shard_iterator(993            StreamArn=table.latest_stream_arn,994            ShardId=shard_id,995            ShardIteratorType="LATEST",996        )997        assert response["ResponseMetadata"]["HTTPStatusCode"] == 200998        assert "ShardIterator" in response999        iterator_id = response["ShardIterator"]1000        item_id = short_uid()1001        for _ in range(2):1002            dynamodb_client.update_item(1003                TableName=table_name,1004                Key={PARTITION_KEY: {"S": item_id}},1005                UpdateExpression="SET attr1 = :v1, attr2 = :v2",1006                ExpressionAttributeValues={1007                    ":v1": {"S": "value1"},1008                    ":v2": {"S": "value2"},1009                },1010                ReturnValues="ALL_NEW",1011                ReturnConsumedCapacity="INDEXES",1012            )1013        def check_expected_records():1014            records = dynamodbstreams_client.get_records(ShardIterator=iterator_id)1015            assert records["ResponseMetadata"]["HTTPStatusCode"] == 2001016            assert len(records["Records"]) == 21017            assert isinstance(1018                
records["Records"][0]["dynamodb"]["ApproximateCreationDateTime"],1019                datetime,1020            )1021            assert records["Records"][0]["dynamodb"]["ApproximateCreationDateTime"].microsecond == 01022            assert records["Records"][0]["eventVersion"] == "1.1"1023            assert records["Records"][0]["eventName"] == "INSERT"1024            assert "OldImage" not in records["Records"][0]["dynamodb"]1025            assert (1026                int(records["Records"][0]["dynamodb"]["SequenceNumber"]) > starting_sequence_number1027            )1028            assert isinstance(1029                records["Records"][1]["dynamodb"]["ApproximateCreationDateTime"],1030                datetime,1031            )1032            assert records["Records"][1]["dynamodb"]["ApproximateCreationDateTime"].microsecond == 01033            assert records["Records"][1]["eventVersion"] == "1.1"1034            assert records["Records"][1]["eventName"] == "MODIFY"1035            assert "OldImage" in records["Records"][1]["dynamodb"]1036            assert (1037                int(records["Records"][1]["dynamodb"]["SequenceNumber"]) > starting_sequence_number1038            )1039        retry(check_expected_records, retries=5, sleep=1, sleep_before=2)1040    def test_query_on_deleted_resource(self, dynamodb_client, dynamodb_create_table):1041        table_name = "ddb-table-%s" % short_uid()1042        partition_key = "username"1043        dynamodb_create_table(table_name=table_name, partition_key=partition_key)1044        rs = dynamodb_client.query(1045            TableName=table_name,1046            KeyConditionExpression="{} = :username".format(partition_key),1047            ExpressionAttributeValues={":username": {"S": "test"}},1048        )1049        assert rs["ResponseMetadata"]["HTTPStatusCode"] == 2001050        dynamodb_client.delete_table(TableName=table_name)1051        with pytest.raises(Exception) as ctx:1052            dynamodb_client.query(1053         
       TableName=table_name,1054                KeyConditionExpression="{} = :username".format(partition_key),1055                ExpressionAttributeValues={":username": {"S": "test"}},1056            )1057        assert ctx.match("ResourceNotFoundException")1058    def test_dynamodb_stream_to_lambda(1059        self, lambda_client, dynamodb_resource, dynamodb_create_table, wait_for_stream_ready1060    ):1061        table_name = "ddb-table-%s" % short_uid()1062        function_name = "func-%s" % short_uid()1063        partition_key = "SK"1064        dynamodb_create_table(1065            table_name=table_name,1066            partition_key=partition_key,1067            stream_view_type="NEW_AND_OLD_IMAGES",1068        )1069        table = dynamodb_resource.Table(table_name)1070        latest_stream_arn = table.latest_stream_arn1071        stream_name = get_kinesis_stream_name(table_name)1072        wait_for_stream_ready(stream_name)1073        testutil.create_lambda_function(1074            handler_file=TEST_LAMBDA_PYTHON_ECHO,1075            func_name=function_name,1076            runtime=LAMBDA_RUNTIME_PYTHON36,1077        )1078        mapping_uuid = lambda_client.create_event_source_mapping(1079            EventSourceArn=latest_stream_arn,1080            FunctionName=function_name,1081            StartingPosition="TRIM_HORIZON",1082        )["UUID"]1083        item = {"SK": short_uid(), "Name": "name-{}".format(short_uid())}1084        table.put_item(Item=item)1085        events = retry(1086            check_expected_lambda_log_events_length,1087            retries=10,1088            sleep=1,1089            function_name=function_name,1090            expected_length=1,1091            regex_filter=r"Records",1092        )1093        assert len(events) == 11094        assert len(events[0]["Records"]) == 11095        dynamodb_event = events[0]["Records"][0]["dynamodb"]1096        assert dynamodb_event["StreamViewType"] == "NEW_AND_OLD_IMAGES"1097        assert 
dynamodb_event["Keys"] == {"SK": {"S": item["SK"]}}1098        assert dynamodb_event["NewImage"]["Name"] == {"S": item["Name"]}1099        assert "SequenceNumber" in dynamodb_event1100        lambda_client.delete_event_source_mapping(UUID=mapping_uuid)1101    def test_dynamodb_batch_write_item(1102        self, dynamodb_client, dynamodb_create_table_with_parameters1103    ):1104        table_name = "ddb-table-%s" % short_uid()1105        dynamodb_create_table_with_parameters(1106            TableName=table_name,1107            KeySchema=[{"AttributeName": PARTITION_KEY, "KeyType": "HASH"}],1108            AttributeDefinitions=[{"AttributeName": PARTITION_KEY, "AttributeType": "S"}],1109            ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},1110            Tags=TEST_DDB_TAGS,1111        )1112        result = dynamodb_client.batch_write_item(1113            RequestItems={1114                table_name: [1115                    {"PutRequest": {"Item": {PARTITION_KEY: {"S": "Test1"}}}},1116                    {"PutRequest": {"Item": {PARTITION_KEY: {"S": "Test2"}}}},1117                    {"PutRequest": {"Item": {PARTITION_KEY: {"S": "Test3"}}}},1118                ]1119            }1120        )1121        assert result.get("UnprocessedItems") == {}1122    def test_dynamodb_pay_per_request(self, dynamodb_create_table_with_parameters):1123        table_name = "ddb-table-%s" % short_uid()1124        with pytest.raises(Exception) as e:1125            dynamodb_create_table_with_parameters(1126                TableName=table_name,1127                KeySchema=[{"AttributeName": PARTITION_KEY, "KeyType": "HASH"}],1128                AttributeDefinitions=[{"AttributeName": PARTITION_KEY, "AttributeType": "S"}],1129                ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},1130                BillingMode="PAY_PER_REQUEST",1131            )1132        assert e.match("ValidationException")1133    def 
test_dynamodb_create_table_with_sse_specification(1134        self, dynamodb_create_table_with_parameters1135    ):1136        table_name = "ddb-table-%s" % short_uid()1137        kms_master_key_id = long_uid()1138        sse_specification = {"Enabled": True, "SSEType": "KMS", "KMSMasterKeyId": kms_master_key_id}1139        kms_master_key_arn = aws_stack.kms_key_arn(kms_master_key_id)1140        result = dynamodb_create_table_with_parameters(1141            TableName=table_name,1142            KeySchema=[{"AttributeName": PARTITION_KEY, "KeyType": "HASH"}],1143            AttributeDefinitions=[{"AttributeName": PARTITION_KEY, "AttributeType": "S"}],1144            ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},1145            SSESpecification=sse_specification,1146            Tags=TEST_DDB_TAGS,1147        )1148        assert result["TableDescription"]["SSEDescription"]1149        assert result["TableDescription"]["SSEDescription"]["Status"] == "ENABLED"1150        assert result["TableDescription"]["SSEDescription"]["KMSMasterKeyArn"] == kms_master_key_arn1151    def test_dynamodb_create_table_with_partial_sse_specification(1152        self, dynamodb_create_table_with_parameters, kms_client1153    ):1154        table_name = "ddb-table-%s" % short_uid()1155        sse_specification = {"Enabled": True}1156        result = dynamodb_create_table_with_parameters(1157            TableName=table_name,1158            KeySchema=[{"AttributeName": PARTITION_KEY, "KeyType": "HASH"}],1159            AttributeDefinitions=[{"AttributeName": PARTITION_KEY, "AttributeType": "S"}],1160            ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},1161            SSESpecification=sse_specification,1162            Tags=TEST_DDB_TAGS,1163        )1164        assert result["TableDescription"]["SSEDescription"]1165        assert result["TableDescription"]["SSEDescription"]["Status"] == "ENABLED"1166        assert 
result["TableDescription"]["SSEDescription"]["SSEType"] == "KMS"1167        assert "KMSMasterKeyArn" in result["TableDescription"]["SSEDescription"]1168        kms_master_key_arn = result["TableDescription"]["SSEDescription"]["KMSMasterKeyArn"]1169        result = kms_client.describe_key(KeyId=kms_master_key_arn)1170        assert result["KeyMetadata"]["KeyManager"] == "AWS"1171    def test_dynamodb_get_batch_items(self, dynamodb_client, dynamodb_create_table_with_parameters):1172        table_name = "ddb-table-%s" % short_uid()1173        dynamodb_create_table_with_parameters(1174            TableName=table_name,1175            KeySchema=[{"AttributeName": "PK", "KeyType": "HASH"}],1176            AttributeDefinitions=[{"AttributeName": "PK", "AttributeType": "S"}],1177            ProvisionedThroughput={"ReadCapacityUnits": 1, "WriteCapacityUnits": 1},1178        )1179        result = dynamodb_client.batch_get_item(1180            RequestItems={table_name: {"Keys": [{"PK": {"S": "test-key"}}]}}1181        )1182        assert list(result["Responses"])[0] == table_name1183    def test_dynamodb_streams_describe_with_exclusive_start_shard_id(1184        self, dynamodb_resource, dynamodb_create_table1185    ):1186        table_name = f"test-ddb-table-{short_uid()}"1187        ddbstreams = aws_stack.create_external_boto_client("dynamodbstreams")1188        dynamodb_create_table(1189            table_name=table_name,1190            partition_key=PARTITION_KEY,1191            stream_view_type="NEW_AND_OLD_IMAGES",1192        )1193        table = dynamodb_resource.Table(table_name)1194        response = ddbstreams.describe_stream(StreamArn=table.latest_stream_arn)1195        assert response["ResponseMetadata"]["HTTPStatusCode"] == 2001196        assert len(response["StreamDescription"]["Shards"]) == 11197        shard_id = response["StreamDescription"]["Shards"][0]["ShardId"]1198        response = ddbstreams.describe_stream(1199            
StreamArn=table.latest_stream_arn, ExclusiveStartShardId=shard_id1200        )1201        assert response["ResponseMetadata"]["HTTPStatusCode"] == 2001202        assert len(response["StreamDescription"]["Shards"]) == 01203    @pytest.mark.aws_validated1204    def test_dynamodb_idempotent_writing(1205        self, dynamodb_create_table_with_parameters, dynamodb_client, dynamodb_wait_for_table_active1206    ):1207        table_name = f"ddb-table-{short_uid()}"1208        dynamodb_create_table_with_parameters(1209            TableName=table_name,1210            KeySchema=[1211                {"AttributeName": "id", "KeyType": "HASH"},1212                {"AttributeName": "name", "KeyType": "RANGE"},1213            ],1214            AttributeDefinitions=[1215                {"AttributeName": "id", "AttributeType": "S"},1216                {"AttributeName": "name", "AttributeType": "S"},1217            ],1218            ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},1219        )1220        dynamodb_wait_for_table_active(table_name)1221        def _transact_write(_d: Dict):1222            response = dynamodb_client.transact_write_items(1223                ClientRequestToken="dedupe_token",1224                TransactItems=[1225                    {1226                        "Put": {1227                            "TableName": table_name,1228                            "Item": _d,1229                        }1230                    },1231                ],1232            )1233            assert response["ResponseMetadata"]["HTTPStatusCode"] == 2001234        _transact_write({"id": {"S": "id1"}, "name": {"S": "name1"}})...fixtures.py
Source:fixtures.py  
...233@pytest.fixture(scope="class")234def route53_client() -> "Route53Client":235    return _client("route53")236@pytest.fixture237def dynamodb_wait_for_table_active(dynamodb_client):238    def wait_for_table_active(table_name: str):239        def wait():240            return (241                dynamodb_client.describe_table(TableName=table_name)["Table"]["TableStatus"]242                == "ACTIVE"243            )244        poll_condition(wait, timeout=30)245    return wait_for_table_active246@pytest.fixture247def dynamodb_create_table_with_parameters(dynamodb_client, dynamodb_wait_for_table_active):248    tables = []249    def factory(**kwargs):250        if "TableName" not in kwargs:251            kwargs["TableName"] = "test-table-%s" % short_uid()252        tables.append(kwargs["TableName"])253        return dynamodb_client.create_table(**kwargs)254    yield factory255    # cleanup256    for table in tables:257        try:258            # table has to be in ACTIVE state before deletion259            dynamodb_wait_for_table_active(table)260            dynamodb_client.delete_table(TableName=table)261        except Exception as e:262            LOG.debug("error cleaning up table %s: %s", table, e)263@pytest.fixture264def dynamodb_create_table(dynamodb_client, dynamodb_wait_for_table_active):265    # beware, this swallows exception in create_dynamodb_table utility function266    tables = []267    def factory(**kwargs):268        kwargs["client"] = dynamodb_client269        if "table_name" not in kwargs:270            kwargs["table_name"] = "test-table-%s" % short_uid()271        if "partition_key" not in kwargs:272            kwargs["partition_key"] = "id"273        tables.append(kwargs["table_name"])274        return create_dynamodb_table(**kwargs)275    yield factory276    # cleanup277    for table in tables:278        try:279            # table has to be in ACTIVE state before deletion280            dynamodb_wait_for_table_active(table)281            
dynamodb_client.delete_table(TableName=table)282        except Exception as e:283            LOG.debug("error cleaning up table %s: %s", table, e)284@pytest.fixture285def s3_create_bucket(s3_client, s3_resource):286    buckets = []287    def factory(**kwargs) -> str:288        if "Bucket" not in kwargs:289            kwargs["Bucket"] = "test-bucket-%s" % short_uid()290        s3_client.create_bucket(**kwargs)291        buckets.append(kwargs["Bucket"])292        return kwargs["Bucket"]293    yield factory294    # cleanup...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!
