# Source file: test_streams.py — Scylla Alternator test suite for DynamoDB
# Streams operations (ListStreams, DescribeStream, GetShardIterator,
# GetRecords).
# Copyright 2020 ScyllaDB
#
# This file is part of Scylla.
#
# Scylla is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Scylla is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with Scylla.  If not, see <http://www.gnu.org/licenses/>.

# Tests for stream operations: ListStreams, DescribeStream, GetShardIterator,
# GetRecords.
import pytest
import time
import urllib.request
from botocore.exceptions import ClientError
from util import list_tables, test_table_name, create_test_table, random_string, freeze
from contextlib import contextmanager
from urllib.error import URLError
from boto3.dynamodb.types import TypeDeserializer

# All StreamViewType settings allowed by the DynamoDB API.
stream_types = ['OLD_IMAGE', 'NEW_IMAGE', 'KEYS_ONLY', 'NEW_AND_OLD_IMAGES']

def disable_stream(dynamodbstreams, table):
    """Disable the stream on the given table and wait (up to 60 seconds)
    until every stream associated with the table reports DISABLED.

    A table may have multiple historic streams - we need all of them to
    become DISABLED. One of them (the current one) may remain DISABLING
    for some time.
    """
    table.update(StreamSpecification={'StreamEnabled': False})
    # BUGFIX: use wall-clock time for the deadline. The original code used
    # time.process_time(), which measures CPU time only - it barely advances
    # while we sleep() below, so the 60-second timeout would effectively
    # never expire.
    exp = time.time() + 60
    while time.time() < exp:
        streams = dynamodbstreams.list_streams(TableName=table.name)
        disabled = True
        for stream in streams['Streams']:
            desc = dynamodbstreams.describe_stream(StreamArn=stream['StreamArn'])['StreamDescription']
            if desc['StreamStatus'] != 'DISABLED':
                disabled = False
                break
        if disabled:
            print('disabled stream on {}'.format(table.name))
            return
        time.sleep(0.5)
    pytest.fail("timed out")

# Cannot use fixtures. Because real dynamodb cannot _remove_ a stream
# once created. It can only expire 24h later. So reusing test_table for
# example works great for scylla/local testing, but once we run against
# actual aws instances, we get lingering streams, and worse: cannot
# create new ones to replace it, because we have overlapping types etc.
#
# So we have to create and delete a table per test. And not run this
# test too often against aws.
@contextmanager
def create_stream_test_table(dynamodb, StreamViewType=None):
    """Context manager that creates a test table (with an optional stream)
    and deletes it on exit.

    If StreamViewType is None the table is created with streams disabled;
    otherwise a stream of the given view type is enabled.
    """
    spec = {'StreamEnabled': False}
    if StreamViewType is not None:
        spec = {'StreamEnabled': True, 'StreamViewType': StreamViewType}
    table = create_test_table(dynamodb, StreamSpecification=spec,
        KeySchema=[{'AttributeName': 'p', 'KeyType': 'HASH'},
                   {'AttributeName': 'c', 'KeyType': 'RANGE'}],
        AttributeDefinitions=[
                   {'AttributeName': 'p', 'AttributeType': 'S'},
                   {'AttributeName': 'c', 'AttributeType': 'S'},
        ])
    yield table
    while True:
        try:
            table.delete()
            return
        except ClientError as ce:
            # if the table has a stream currently being created we cannot
            # delete the table immediately. Again, only with real dynamo
            if ce.response['Error']['Code'] == 'ResourceInUseException':
                print('Could not delete table yet. Sleeping 5s.')
                time.sleep(5)
                continue
            raise

def wait_for_active_stream(dynamodbstreams, table, timeout=60):
    """Wait until the table has a usable (ENABLED) stream and return a
    (StreamArn, StreamLabel) tuple for it. Fails the test on timeout.
    """
    # BUGFIX: the deadline must be wall-clock based. time.process_time()
    # counts CPU time only and does not advance during the sleep() calls
    # below, so the timeout would effectively never fire.
    exp = time.time() + timeout
    while time.time() < exp:
        streams = dynamodbstreams.list_streams(TableName=table.name)
        for stream in streams['Streams']:
            arn = stream['StreamArn']
            if arn is None:
                continue
            desc = dynamodbstreams.describe_stream(StreamArn=arn)['StreamDescription']
            if 'StreamStatus' not in desc or desc.get('StreamStatus') == 'ENABLED':
                return (arn, stream['StreamLabel'])
        # real dynamo takes some time until a stream is usable
        print("Stream not available. Sleep 5s...")
        time.sleep(5)
    pytest.fail("timed out waiting for an active stream")

# Local java dynamodb server version behaves differently from
# the "real" one.
# Most importantly, it does not verify a number of
# parameters, and consequently does not throw when called with borked
# args. This will try to check if we are in fact running against
# this test server, and if so, just raise the error here and be done
# with it. All this just so we can run through the tests on
# aws, scylla and local.
def is_local_java(dynamodbstreams):
    """Best-effort detection of the local java DynamoDB test server."""
    # no good way to check, but local server runs on a Jetty,
    # so check for that.
    url = dynamodbstreams.meta.endpoint_url
    try:
        urllib.request.urlopen(url)
    except URLError as e:
        if hasattr(e, 'info'):
            return e.info()['Server'].startswith('Jetty')
    return False

def ensure_java_server(dynamodbstreams, error='ValidationException'):
    """If running against the local java server, raise the given ClientError
    (or return if error is None); otherwise fail the test."""
    if is_local_java(dynamodbstreams):
        if error is not None:
            raise ClientError({'Error': {'Code': error}}, '')
        return
    pytest.fail('not running against local java server')

def test_list_streams_create(dynamodb, dynamodbstreams):
    for view_type in stream_types:
        with create_stream_test_table(dynamodb, StreamViewType=view_type) as table:
            wait_for_active_stream(dynamodbstreams, table)

def test_list_streams_alter(dynamodb, dynamodbstreams):
    for view_type in stream_types:
        with create_stream_test_table(dynamodb, StreamViewType=None) as table:
            table.update(StreamSpecification={'StreamEnabled': True, 'StreamViewType': view_type})
            wait_for_active_stream(dynamodbstreams, table)

def test_list_streams_paged(dynamodb, dynamodbstreams):
    for view_type in stream_types:
        with create_stream_test_table(dynamodb, StreamViewType=view_type) as table1:
            with create_stream_test_table(dynamodb, StreamViewType=view_type) as table2:
                wait_for_active_stream(dynamodbstreams, table1)
                wait_for_active_stream(dynamodbstreams, table2)
                streams = dynamodbstreams.list_streams(Limit=1)
                assert streams
                assert streams.get('Streams')
                assert streams.get('LastEvaluatedStreamArn')
                tables = [table1.name, table2.name]
                while True:
                    for s in streams['Streams']:
                        name = s['TableName']
                        if name in tables:
                            tables.remove(name)
                    if not tables:
                        break
                    streams = dynamodbstreams.list_streams(Limit=1, ExclusiveStartStreamArn=streams['LastEvaluatedStreamArn'])

@pytest.mark.skip(reason="Python driver validates Limit, so trying to test it is pointless")
def test_list_streams_zero_limit(dynamodb, dynamodbstreams):
    with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table:
        with pytest.raises(ClientError, match='ValidationException'):
            wait_for_active_stream(dynamodbstreams, table)
            dynamodbstreams.list_streams(Limit=0)

def test_create_streams_wrong_type(dynamodb, dynamodbstreams, test_table):
    with pytest.raises(ClientError, match='ValidationException'):
        # should throw
        test_table.update(StreamSpecification={'StreamEnabled': True, 'StreamViewType': 'Fisk'})
        # just in case the test fails, disable stream again
        test_table.update(StreamSpecification={'StreamEnabled': False})

def test_list_streams_empty(dynamodb, dynamodbstreams, test_table):
    streams = dynamodbstreams.list_streams(TableName=test_table.name)
    assert 'Streams' in streams
    assert not streams['Streams'] # empty

def test_list_streams_with_nonexistent_last_stream(dynamodb, dynamodbstreams):
    with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table:
        with pytest.raises(ClientError, match='ValidationException'):
            streams = dynamodbstreams.list_streams(TableName=table.name, ExclusiveStartStreamArn='kossaapaaasdafsdaasdasdasdasasdasfadfadfasdasdas')
            assert 'Streams' in streams
            assert not streams['Streams'] # empty
            # local java dynamodb does _not_ raise validation error for
            # malformed stream arn here. verify
            ensure_java_server(dynamodbstreams)

def test_describe_stream(dynamodb, dynamodbstreams):
    with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table:
        streams = dynamodbstreams.list_streams(TableName=table.name)
        arn = streams['Streams'][0]['StreamArn']
        desc = dynamodbstreams.describe_stream(StreamArn=arn)
        assert desc
        assert desc.get('StreamDescription')
        assert desc['StreamDescription']['StreamArn'] == arn
        assert desc['StreamDescription']['StreamStatus'] != 'DISABLED'
        assert desc['StreamDescription']['StreamViewType'] == 'KEYS_ONLY'
        assert desc['StreamDescription']['TableName'] == table.name
        assert desc['StreamDescription'].get('Shards')
        assert desc['StreamDescription']['Shards'][0].get('ShardId')
        assert desc['StreamDescription']['Shards'][0].get('SequenceNumberRange')
        assert desc['StreamDescription']['Shards'][0]['SequenceNumberRange'].get('StartingSequenceNumber')
        # We don't know what the sequence number is supposed to be, but the
        # DynamoDB documentation requires that it contains only numeric
        # characters and some libraries rely on this. Reproduces issue #7158:
        assert desc['StreamDescription']['Shards'][0]['SequenceNumberRange']['StartingSequenceNumber'].isdecimal()

@pytest.mark.xfail(reason="alternator does not have creation time on streams")
def test_describe_stream_create_time(dynamodb, dynamodbstreams):
    with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table:
        streams = dynamodbstreams.list_streams(TableName=table.name)
        arn = streams['Streams'][0]['StreamArn']
        desc = dynamodbstreams.describe_stream(StreamArn=arn)
        assert desc
        assert desc.get('StreamDescription')
        # note these are non-required attributes
        assert 'CreationRequestDateTime' in desc['StreamDescription']

def test_describe_nonexistent_stream(dynamodb, dynamodbstreams):
    with pytest.raises(ClientError, match='ResourceNotFoundException' if is_local_java(dynamodbstreams) else 'ValidationException'):
        dynamodbstreams.describe_stream(StreamArn='sdfadfsdfnlfkajakfgjalksfgklasjklasdjfklasdfasdfgasf')

def test_describe_stream_with_nonexistent_last_shard(dynamodb, dynamodbstreams):
    with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table:
        streams = dynamodbstreams.list_streams(TableName=table.name)
        arn = streams['Streams'][0]['StreamArn']
        try:
            desc = dynamodbstreams.describe_stream(StreamArn=arn, ExclusiveStartShardId='zzzzzzzzzzzzzzzzzzzzzzzzsfasdgagadfadfgagkjsdfsfsdjfjks')
            assert not desc['StreamDescription']['Shards']
        # BUGFIX: catch only ClientError instead of a bare "except:" - a
        # failing assert above must surface as an AssertionError, not be
        # swallowed and converted into the java-server check below.
        except ClientError:
            # local java throws here. real does not.
            ensure_java_server(dynamodbstreams, error=None)

def test_get_shard_iterator(dynamodb, dynamodbstreams):
    with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table:
        streams = dynamodbstreams.list_streams(TableName=table.name)
        arn = streams['Streams'][0]['StreamArn']
        desc = dynamodbstreams.describe_stream(StreamArn=arn)
        shard_id = desc['StreamDescription']['Shards'][0]['ShardId']
        seq = desc['StreamDescription']['Shards'][0]['SequenceNumberRange']['StartingSequenceNumber']
        # spec says shard_id must be 65 chars or less
        assert len(shard_id) <= 65
        for itype in ['AT_SEQUENCE_NUMBER', 'AFTER_SEQUENCE_NUMBER']:
            response = dynamodbstreams.get_shard_iterator(
                StreamArn=arn, ShardId=shard_id, ShardIteratorType=itype, SequenceNumber=seq
            )
            assert response.get('ShardIterator')
        for itype in ['TRIM_HORIZON', 'LATEST']:
            response = dynamodbstreams.get_shard_iterator(
                StreamArn=arn, ShardId=shard_id, ShardIteratorType=itype
            )
            assert response.get('ShardIterator')
        for itype in ['AT_SEQUENCE_NUMBER', 'AFTER_SEQUENCE_NUMBER']:
            # must have seq in these modes
            with pytest.raises(ClientError, match='ValidationException'):
                dynamodbstreams.get_shard_iterator(
                    StreamArn=arn, ShardId=shard_id, ShardIteratorType=itype
                )
        for itype in ['TRIM_HORIZON', 'LATEST']:
            # should not set "seq" in these modes
            with pytest.raises(ClientError, match='ValidationException'):
                dynamodbstreams.get_shard_iterator(
                    StreamArn=arn, ShardId=shard_id, ShardIteratorType=itype, SequenceNumber=seq
                )
        # bad arn. NOTE: itype is 'LATEST' here, left over from the loop
        # above - preserved from the original code's behavior.
        with pytest.raises(ClientError, match='ValidationException'):
            dynamodbstreams.get_shard_iterator(
                StreamArn='sdfadsfsdfsdgdfsgsfdabadfbabdadsfsdfsdfsdfsdfsdfsdfdfdssdffbdfdf', ShardId=shard_id, ShardIteratorType=itype, SequenceNumber=seq
            )
        # bad shard id
        with pytest.raises(ClientError, match='ResourceNotFoundException'):
            dynamodbstreams.get_shard_iterator(StreamArn=arn, ShardId='semprinidfaasdasfsdvacsdcfsdsvsdvsdvsdvsdvsdv',
                ShardIteratorType='LATEST')
        # bad iter type
        with pytest.raises(ClientError, match='ValidationException'):
            dynamodbstreams.get_shard_iterator(StreamArn=arn, ShardId=shard_id,
                ShardIteratorType='bulle', SequenceNumber=seq)
        # bad seq
        with pytest.raises(ClientError, match='ValidationException'):
            dynamodbstreams.get_shard_iterator(StreamArn=arn, ShardId=shard_id,
                ShardIteratorType='LATEST', SequenceNumber='sdfsafglldfngjdafnasdflgnaldklkafdsgklnasdlv')

def test_get_shard_iterator_for_nonexistent_stream(dynamodb, dynamodbstreams):
    with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table:
        (arn, label) = wait_for_active_stream(dynamodbstreams, table)
        desc = dynamodbstreams.describe_stream(StreamArn=arn)
        shards = desc['StreamDescription']['Shards']
        with pytest.raises(ClientError, match='ResourceNotFoundException' if is_local_java(dynamodbstreams) else 'ValidationException'):
            dynamodbstreams.get_shard_iterator(
                    StreamArn='sdfadfsddafgdafsgjnadflkgnalngalsdfnlkasnlkasdfasdfasf', ShardId=shards[0]['ShardId'], ShardIteratorType='LATEST'
                )

def test_get_shard_iterator_for_nonexistent_shard(dynamodb, dynamodbstreams):
    with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table:
        streams = dynamodbstreams.list_streams(TableName=table.name)
        arn = streams['Streams'][0]['StreamArn']
        with pytest.raises(ClientError, match='ResourceNotFoundException'):
            dynamodbstreams.get_shard_iterator(
                    StreamArn=arn, ShardId='adfasdasdasdasdasdasdasdasdasasdasd', ShardIteratorType='LATEST'
                )

def test_get_records(dynamodb, dynamodbstreams):
    # TODO: add tests for storage/transactionable variations and global/local index
    with create_stream_test_table(dynamodb, StreamViewType='NEW_AND_OLD_IMAGES') as table:
        (arn, label) = wait_for_active_stream(dynamodbstreams, table)
        p = 'piglet'
        c = 'ninja'
        val = 'lucifers'
        val2 = 'flowers'
        table.put_item(Item={'p': p, 'c': c, 'a1': val, 'a2': val2})
        nval = 'semprini'
        nval2 = 'nudge'
        table.update_item(Key={'p': p, 'c': c},
            AttributeUpdates={'a1': {'Value': nval, 'Action': 'PUT'},
                'a2': {'Value': nval2, 'Action': 'PUT'}
            })
        has_insert = False
        # in truth, we should sleep already here, since at least scylla
        # will not be able to produce any stream content until
        # ~30s after insert/update (confidence interval)
        # but it is useful to see a working null-iteration as well, so
        # lets go already.
        while True:
            desc = dynamodbstreams.describe_stream(StreamArn=arn)
            iterators = []
            while True:
                shards = desc['StreamDescription']['Shards']
                for shard in shards:
                    shard_id = shard['ShardId']
                    start = shard['SequenceNumberRange']['StartingSequenceNumber']
                    shard_iter = dynamodbstreams.get_shard_iterator(StreamArn=arn, ShardId=shard_id, ShardIteratorType='AT_SEQUENCE_NUMBER', SequenceNumber=start)['ShardIterator']
                    iterators.append(shard_iter)
                last_shard = desc["StreamDescription"].get("LastEvaluatedShardId")
                if not last_shard:
                    break
                desc = dynamodbstreams.describe_stream(StreamArn=arn, ExclusiveStartShardId=last_shard)
            next_iterators = []
            while iterators:
                shard_iter = iterators.pop(0)
                response = dynamodbstreams.get_records(ShardIterator=shard_iter, Limit=1000)
                if 'NextShardIterator' in response:
                    next_iterators.append(response['NextShardIterator'])
                records = response.get('Records')
                if records:
                    for record in records:
                        event_type = record['eventName']
                        # NOTE: the original code rebound the name
                        # 'dynamodb' here, shadowing the fixture parameter;
                        # use a distinct local name instead.
                        rec = record['dynamodb']
                        keys = rec['Keys']
                        assert keys.get('p')
                        assert keys.get('c')
                        assert keys['p'].get('S')
                        assert keys['p']['S'] == p
                        assert keys['c'].get('S')
                        assert keys['c']['S'] == c
                        if event_type == 'MODIFY' or event_type == 'INSERT':
                            assert rec.get('NewImage')
                            newimage = rec['NewImage']
                            assert newimage.get('a1')
                            assert newimage.get('a2')
                        if event_type == 'INSERT' or (event_type == 'MODIFY' and not has_insert):
                            assert newimage['a1']['S'] == val
                            assert newimage['a2']['S'] == val2
                            has_insert = True
                            continue
                        if event_type == 'MODIFY':
                            assert has_insert
                            assert newimage['a1']['S'] == nval
                            assert newimage['a2']['S'] == nval2
                            assert rec.get('OldImage')
                            oldimage = rec['OldImage']
                            assert oldimage.get('a1')
                            assert oldimage.get('a2')
                            assert oldimage['a1']['S'] == val
                            assert oldimage['a2']['S'] == val2
                            return
            print("Not done. Sleep 10s...")
            time.sleep(10)
            # NOTE: preserved from the original code, although this value is
            # immediately overwritten at the top of the outer loop.
            iterators = next_iterators

def test_get_records_nonexistent_iterator(dynamodbstreams):
    with pytest.raises(ClientError, match='ValidationException'):
        dynamodbstreams.get_records(ShardIterator='sdfsdfsgagaddafgagasgasgasdfasdfasdfasdfasdgasdasdgasdg', Limit=1000)

##############################################################################
# Fixtures for creating a table with a stream enabled with one of the allowed
# StreamViewType settings (KEYS_ONLY, NEW_IMAGE, OLD_IMAGE, NEW_AND_OLD_IMAGES).
# Unfortunately changing the StreamViewType setting of an existing stream is
# not allowed (see test_streams_change_type), and while removing and re-adding
# a stream is possible, it is very slow. So we create four different fixtures
# with the four different StreamViewType settings for these four fixtures.
#
# It turns out that DynamoDB makes reusing the same table in different tests
# very difficult, because when we request a "LATEST" iterator we sometimes
# miss the immediately following write (this issue doesn't happen in
# Alternator, just in DynamoDB - presumably LATEST adds some time slack?)
# So all the fixtures we create below have scope="function", meaning that a
# separate table is created for each of the tests using these fixtures.
This381# slows the tests down a bit, but not by much (about 0.05 seconds per test).382# It is still worthwhile to use a fixture rather than to create a table383# explicitly - it is convenient, safe (the table gets deleted automatically)384# and if in the future we can work around the DynamoDB problem, we can return385# these fixtures to session scope.386def create_table_ss(dynamodb, dynamodbstreams, type):387    table = create_test_table(dynamodb,388        KeySchema=[{ 'AttributeName': 'p', 'KeyType': 'HASH' }, { 'AttributeName': 'c', 'KeyType': 'RANGE' }],389        AttributeDefinitions=[{ 'AttributeName': 'p', 'AttributeType': 'S' }, { 'AttributeName': 'c', 'AttributeType': 'S' }],390        StreamSpecification={ 'StreamEnabled': True, 'StreamViewType': type })391    (arn, label) = wait_for_active_stream(dynamodbstreams, table, timeout=60)392    yield table, arn393    table.delete()394@pytest.fixture(scope="function")395def test_table_ss_keys_only(dynamodb, dynamodbstreams):396    yield from create_table_ss(dynamodb, dynamodbstreams, 'KEYS_ONLY')397@pytest.fixture(scope="function")398def test_table_ss_new_image(dynamodb, dynamodbstreams):399    yield from create_table_ss(dynamodb, dynamodbstreams, 'NEW_IMAGE')400@pytest.fixture(scope="function")401def test_table_ss_old_image(dynamodb, dynamodbstreams):402    yield from create_table_ss(dynamodb, dynamodbstreams, 'OLD_IMAGE')403@pytest.fixture(scope="function")404def test_table_ss_new_and_old_images(dynamodb, dynamodbstreams):405    yield from create_table_ss(dynamodb, dynamodbstreams, 'NEW_AND_OLD_IMAGES')406# Test that it is, sadly, not allowed to use UpdateTable on a table which407# already has a stream enabled to change that stream's StreamViewType.408# Currently, Alternator does allow this (see issue #6939), so the test is409# marked xfail.410@pytest.mark.xfail(reason="Alternator allows changing StreamViewType - see issue #6939")411def test_streams_change_type(test_table_ss_keys_only):412    table, arn = 
test_table_ss_keys_only413    with pytest.raises(ClientError, match='ValidationException.*already'):414        table.update(StreamSpecification={'StreamEnabled': True, 'StreamViewType': 'OLD_IMAGE'});415        # If the above change succeeded (because of issue #6939), switch it back :-)416        table.update(StreamSpecification={'StreamEnabled': True, 'StreamViewType': 'KEYS_ONLY'});417# Utility function for listing all the shards of the given stream arn.418# Implemented by multiple calls to DescribeStream, possibly several pages419# until all the shards are returned. The return of this function should be420# cached - it is potentially slow, and DynamoDB documentation even states421# DescribeStream may only be called at a maximum rate of 10 times per second.422# list_shards() only returns the shard IDs, not the information about the423# shards' sequence number range, which is also returned by DescribeStream.424def list_shards(dynamodbstreams, arn):425    # By default DescribeStream lists a limit of 100 shards. 
For faster426    # tests we reduced the number of shards in the testing setup to427    # 32 (16 vnodes x 2 cpus), see issue #6979, so to still exercise this428    # paging feature, lets use a limit of 10.429    limit = 10430    response = dynamodbstreams.describe_stream(StreamArn=arn, Limit=limit)['StreamDescription']431    assert len(response['Shards']) <= limit432    shards = [x['ShardId'] for x in response['Shards']]433    while 'LastEvaluatedShardId' in response:434        # 7409 kinesis ignores LastEvaluatedShardId and just looks at last shard435        assert shards[-1] == response['LastEvaluatedShardId']436        response = dynamodbstreams.describe_stream(StreamArn=arn, Limit=limit,437            ExclusiveStartShardId=response['LastEvaluatedShardId'])['StreamDescription']438        assert len(response['Shards']) <= limit439        shards.extend([x['ShardId'] for x in response['Shards']])440    print('Number of shards in stream: {}'.format(len(shards)))441    assert len(set(shards)) == len(shards)442    # 7409 - kinesis required shards to be in lexical order.443    # verify.444    assert shards == sorted(shards)445    # special test: ensure we get nothing more if we ask for starting at the last446    # of the last447    response = dynamodbstreams.describe_stream(StreamArn=arn,448        ExclusiveStartShardId=shards[-1])['StreamDescription']449    assert len(response['Shards']) == 0450    return shards451# Utility function for getting shard iterators starting at "LATEST" for452# all the shards of the given stream arn.453def latest_iterators(dynamodbstreams, arn):454    iterators = []455    for shard_id in list_shards(dynamodbstreams, arn):456        iterators.append(dynamodbstreams.get_shard_iterator(StreamArn=arn,457            ShardId=shard_id, ShardIteratorType='LATEST')['ShardIterator'])458    assert len(set(iterators)) == len(iterators)459    return iterators460# Utility function for fetching more content from the stream (given its461# array of 
# iterators) into an "output" array. Call repeatedly to get more
# content - the function returns a new array of iterators which should be
# used to replace the input list of iterators.
# Note that the order of the updates is guaranteed for the same partition,
# but cannot be trusted for *different* partitions.
def fetch_more(dynamodbstreams, iterators, output):
    # For each shard iterator, read one batch of records into "output"
    # (mutated in place) and collect the follow-up iterator, dropping
    # shards that report no NextShardIterator (i.e. are exhausted).
    advanced = []
    for shard_iter in iterators:
        response = dynamodbstreams.get_records(ShardIterator=shard_iter)
        if 'NextShardIterator' in response:
            advanced.append(response['NextShardIterator'])
        output.extend(response['Records'])
    assert len(set(advanced)) == len(advanced)
    return advanced

# Utility function for comparing "output" as fetched by fetch_more(), to a list
# expected_events, each of which looks like:
#   [type, keys, old_image, new_image]
# where type is REMOVE, INSERT or MODIFY.
# The "mode" parameter specifies which StreamViewType mode (KEYS_ONLY,
# OLD_IMAGE, NEW_IMAGE, NEW_AND_OLD_IMAGES) was supposedly used to generate
# "output".
# This mode dictates what we can compare - e.g., in KEYS_ONLY mode
# the compare_events() function ignores the old and new image in
# expected_events.
# compare_events() throws an exception immediately if it sees an unexpected
# event, but if some of the expected events are just missing in the "output",
# it only returns false - suggesting maybe the caller needs to try again
# later - maybe more output is coming.
# Note that the order of events is only guaranteed (and therefore compared)
# inside a single partition.
def compare_events(expected_events, output, mode):
    # The order of expected_events is only meaningful inside a partition, so
    # let's convert it into a map indexed by partition key.
    expected_events_map = {}
    for event in expected_events:
        expected_type, expected_key, expected_old_image, expected_new_image = event
        # For simplicity, we actually use the entire key, not just the partition
        # key. We only lose a bit of testing power we didn't plan to test anyway
        # (that events for different items in the same partition are ordered).
        key = freeze(expected_key)
        if key not in expected_events_map:
            expected_events_map[key] = []
        expected_events_map[key].append(event)
    # One deserializer is enough for all records; the original code created
    # a new TypeDeserializer on every loop iteration, which is wasteful.
    deserializer = TypeDeserializer()
    # Iterate over the events in output. An event for a certain key needs to
    # be the *first* remaining event for this key in expected_events_map (and
    # then we remove this matched event from expected_events_map)
    for event in output:
        # In DynamoDB, eventSource is 'aws:dynamodb'. We decided to set it to
        # a *different* value - 'scylladb:alternator'. Issue #6931.
        assert 'eventSource' in event
        # Alternator is missing "awsRegion", which makes little sense for it
        # (although maybe we should have provided the DC name). Issue #6931.
        #assert 'awsRegion' in event
        # Alternator is also missing the "eventVersion" entry. Issue #6931.
        #assert 'eventVersion' in event
        # Check that eventID appears, but can't check much on what it is.
        assert 'eventID' in event
        op = event['eventName']
        record = event['dynamodb']
        # record['Keys'] is "serialized" JSON, ({'S', 'thestring'}), so we
        # want to deserialize it to match our expected_events content.
        key = {x: deserializer.deserialize(y) for (x, y) in record['Keys'].items()}
        expected_type, expected_key, expected_old_image, expected_new_image = expected_events_map[freeze(key)].pop(0)
        if expected_type != '*': # hack to allow a caller to not test op, to bypass issue #6918.
            assert op == expected_type
        assert record['StreamViewType'] == mode
        # We don't know what ApproximateCreationDateTime should be, but we do
        # know it needs to be a timestamp - there is conflicting documentation
        # in what format (ISO 8601?). In any case, boto3 parses this timestamp
        # for us, so we can't check it here, beyond checking it exists.
        assert 'ApproximateCreationDateTime' in record
        # We don't know what SequenceNumber is supposed to be, but the DynamoDB
        # documentation requires that it contains only numeric characters and
        # some libraries rely on this. This reproduces issue #7158:
        assert 'SequenceNumber' in record
        assert record['SequenceNumber'].isdecimal()
        # Alternator doesn't set the SizeBytes member. Issue #6931.
        #assert 'SizeBytes' in record
        if mode == 'KEYS_ONLY':
            assert 'NewImage' not in record
            assert 'OldImage' not in record
        elif mode == 'NEW_IMAGE':
            assert 'OldImage' not in record
            if expected_new_image is None:
                assert 'NewImage' not in record
            else:
                new_image = {x: deserializer.deserialize(y) for (x, y) in record['NewImage'].items()}
                assert expected_new_image == new_image
        elif mode == 'OLD_IMAGE':
            assert 'NewImage' not in record
            if expected_old_image is None:
                assert 'OldImage' not in record
            else:
                old_image = {x: deserializer.deserialize(y) for (x, y) in record['OldImage'].items()}
                assert expected_old_image == old_image
        elif mode == 'NEW_AND_OLD_IMAGES':
            if expected_new_image is None:
                assert 'NewImage' not in record
            else:
                new_image = {x: deserializer.deserialize(y) for (x, y) in record['NewImage'].items()}
                assert expected_new_image == new_image
            if expected_old_image is None:
                assert 'OldImage' not in record
            else:
                old_image = {x: deserializer.deserialize(y) for (x, y) in record['OldImage'].items()}
                assert expected_old_image == old_image
        else:
            pytest.fail('cannot happen')
    # After the above loop, expected_events_map should remain empty arrays.
    # If it isn't, one of the expected events did not yet happen. Return False.
    for entry in expected_events_map.values():
        if len(entry) > 0:
            return False
    return True

# Convenience function used to implement several tests below.
It runs a given577# function "updatefunc" which is supposed to do some updates to the table578# and also return an expected_events list. do_test() then fetches the streams579# data and compares it to the expected_events using compare_events().580def do_test(test_table_ss_stream, dynamodbstreams, updatefunc, mode, p = random_string(), c = random_string()):581    table, arn = test_table_ss_stream582    iterators = latest_iterators(dynamodbstreams, arn)583    expected_events = updatefunc(table, p, c)584    # Updating the stream is asynchronous. Moreover, it may even be delayed585    # artificially (see Alternator's alternator_streams_time_window_s config).586    # So if compare_events() reports the stream data is missing some of the587    # expected events (by returning False), we need to retry it for some time.588    # Note that compare_events() throws if the stream data contains *wrong*589    # (unexpected) data, so even failing tests usually finish reasonably590    # fast - depending on the alternator_streams_time_window_s parameter.591    # This is optimization is important to keep *failing* tests reasonably592    # fast and not have to wait until the following arbitrary timeout.593    timeout = time.time() + 20594    output = []595    while time.time() < timeout:596        iterators = fetch_more(dynamodbstreams, iterators, output)597        print("after fetch_more number expected_events={}, output={}".format(len(expected_events), len(output)))598        if compare_events(expected_events, output, mode):599            # success!600            return601        time.sleep(0.5)602    # If we're still here, the last compare_events returned false.603    pytest.fail('missing events in output: {}'.format(output))604# Test a single PutItem of a new item. Should result in a single INSERT605# event. 
Currently fails because in Alternator, PutItem - which generates a606# tombstone to *replace* an item - generates REMOVE+MODIFY (issue #6930).607@pytest.mark.xfail(reason="Currently fails - see issue #6930")608def test_streams_putitem_keys_only(test_table_ss_keys_only, dynamodbstreams):609    def do_updates(table, p, c):610        events = []611        table.put_item(Item={'p': p, 'c': c, 'x': 2})612        events.append(['INSERT', {'p': p, 'c': c}, None, {'p': p, 'c': c, 'x': 2}])613        return events614    do_test(test_table_ss_keys_only, dynamodbstreams, do_updates, 'KEYS_ONLY')615# Test a single UpdateItem. Should result in a single INSERT event.616# Currently fails because Alternator generates a MODIFY event even though617# this is a new item (issue #6918).618@pytest.mark.xfail(reason="Currently fails - see issue #6918")619def test_streams_updateitem_keys_only(test_table_ss_keys_only, dynamodbstreams):620    def do_updates(table, p, c):621        events = []622        table.update_item(Key={'p': p, 'c': c},623            UpdateExpression='SET x = :val1', ExpressionAttributeValues={':val1': 2})624        events.append(['INSERT', {'p': p, 'c': c}, None, {'p': p, 'c': c, 'x': 2}])625        return events626    do_test(test_table_ss_keys_only, dynamodbstreams, do_updates, 'KEYS_ONLY')627# This is exactly the same test as test_streams_updateitem_keys_only except628# we don't verify the type of even we find (MODIFY or INSERT). 
It allows us629# to have at least one good GetRecords test until solving issue #6918.630# When we do solve that issue, this test should be removed.631def test_streams_updateitem_keys_only_2(test_table_ss_keys_only, dynamodbstreams):632    def do_updates(table, p, c):633        events = []634        table.update_item(Key={'p': p, 'c': c},635            UpdateExpression='SET x = :val1', ExpressionAttributeValues={':val1': 2})636        events.append(['*', {'p': p, 'c': c}, None, {'p': p, 'c': c, 'x': 2}])637        return events638    do_test(test_table_ss_keys_only, dynamodbstreams, do_updates, 'KEYS_ONLY')639# Test OLD_IMAGE using UpdateItem. Verify that the OLD_IMAGE indeed includes,640# as needed, the entire old item and not just the modified columns.641# Reproduces issue #6935642def test_streams_updateitem_old_image(test_table_ss_old_image, dynamodbstreams):643    def do_updates(table, p, c):644        events = []645        table.update_item(Key={'p': p, 'c': c},646            UpdateExpression='SET x = :val1', ExpressionAttributeValues={':val1': 2})647        # We use here '*' instead of the really expected 'INSERT' to avoid648        # checking again the same Alternator bug already checked by649        # test_streams_updateitem_keys_only (issue #6918).650        # Note: The "None" as OldImage here tests that the OldImage should be651        # missing because the item didn't exist. 
This reproduces issue #6933.652        events.append(['*', {'p': p, 'c': c}, None, {'p': p, 'c': c, 'x': 2}])653        table.update_item(Key={'p': p, 'c': c},654            UpdateExpression='SET y = :val1', ExpressionAttributeValues={':val1': 3})655        events.append(['MODIFY', {'p': p, 'c': c}, {'p': p, 'c': c, 'x': 2}, {'p': p, 'c': c, 'x': 2, 'y': 3}])656        return events657    do_test(test_table_ss_old_image, dynamodbstreams, do_updates, 'OLD_IMAGE')658# Above we verified that if an item did not previously exist, the OLD_IMAGE659# would be missing, but if the item did previously exist, OLD_IMAGE should660# be present and must include the key. Here we confirm the special case the661# latter case - the case of a pre-existing *empty* item, which has just the662# key - in this case since the item did exist, OLD_IMAGE should be returned -663# and include just the key. This is a special case of reproducing #6935 -664# the first patch for this issue failed in this special case.665def test_streams_updateitem_old_image_empty_item(test_table_ss_old_image, dynamodbstreams):666    def do_updates(table, p, c):667        events = []668        # Create an *empty* item, with nothing except a key:669        table.update_item(Key={'p': p, 'c': c})670        events.append(['*', {'p': p, 'c': c}, None, {'p': p, 'c': c}])671        table.update_item(Key={'p': p, 'c': c},672            UpdateExpression='SET y = :val1', ExpressionAttributeValues={':val1': 3})673        # Note that OLD_IMAGE should be present and be the empty item,674        # with just a key, not entirely missing.675        events.append(['MODIFY', {'p': p, 'c': c}, {'p': p, 'c': c}, {'p': p, 'c': c, 'y': 3}])676        return events677    do_test(test_table_ss_old_image, dynamodbstreams, do_updates, 'OLD_IMAGE')678# Test that OLD_IMAGE indeed includes the entire old item and not just the679# modified attributes, in the special case of attributes which are a key of680# a secondary index.681# The unique thing 
about this case is that as currently implemented,
# secondary-index key attributes are real Scylla columns, contrasting with
# other attributes which are just elements of a map. And our CDC
# implementation treats those cases differently - when a map is modified
# the preimage includes the entire content of the map, but for regular
# columns they are only included in the preimage if they change.
# Currently fails in Alternator because the item's key is missing in
# OldImage (#6935) and the LSI key is also missing (#7030).
# Fixture: a table with an OLD_IMAGE stream *and* an LSI whose range key
# is the regular attribute 'k'. Function-scoped because streams cannot be
# reused across tests (see the comment at the top of this file).
@pytest.fixture(scope="function")
def test_table_ss_old_image_and_lsi(dynamodb, dynamodbstreams):
    table = create_test_table(dynamodb,
        KeySchema=[
            {'AttributeName': 'p', 'KeyType': 'HASH'},
            {'AttributeName': 'c', 'KeyType': 'RANGE'}],
        AttributeDefinitions=[
            { 'AttributeName': 'p', 'AttributeType': 'S' },
            { 'AttributeName': 'c', 'AttributeType': 'S' },
            { 'AttributeName': 'k', 'AttributeType': 'S' }],
        LocalSecondaryIndexes=[{
            'IndexName': 'index',
            'KeySchema': [
                {'AttributeName': 'p', 'KeyType': 'HASH'},
                {'AttributeName': 'k', 'KeyType': 'RANGE'}],
            'Projection': { 'ProjectionType': 'ALL' }
        }],
        StreamSpecification={ 'StreamEnabled': True, 'StreamViewType': 'OLD_IMAGE' })
    (arn, label) = wait_for_active_stream(dynamodbstreams, table, timeout=60)
    # Hand (table, stream ARN) to the test; the table is deleted afterwards.
    yield table, arn
    table.delete()
def test_streams_updateitem_old_image_lsi(test_table_ss_old_image_and_lsi, dynamodbstreams):
    def do_updates(table, p, c):
        events = []
        table.update_item(Key={'p': p, 'c': c},
            UpdateExpression='SET x = :x, k = :k',
            ExpressionAttributeValues={':x': 2, ':k': 'dog'})
        # We use here '*' instead of the really expected 'INSERT' to avoid
        # checking again the same Alternator bug already checked by
        # test_streams_updateitem_keys_only (issue #6918).
        events.append(['*', {'p': p, 'c': c}, None, {'p': p, 'c': c, 'x': 2, 'k': 'dog'}])
        table.update_item(Key={'p': p, 'c': c},
            UpdateExpression='SET y = :y', ExpressionAttributeValues={':y': 3})
        # In issue #7030, the 'k' value was missing from the OldImage.
        events.append(['MODIFY', {'p': p, 'c': c}, {'p': p, 'c': c, 'x': 2, 'k': 'dog'}, {'p': p, 'c': c, 'x': 2, 'k': 'dog', 'y': 3}])
        return events
    do_test(test_table_ss_old_image_and_lsi, dynamodbstreams, do_updates, 'OLD_IMAGE')
# Tests similar to the above tests for OLD_IMAGE, just for NEW_IMAGE mode.
# Verify that the NEW_IMAGE includes the entire old item (including the key),
# that deleting the item results in a missing NEW_IMAGE, and that setting the
# item to be empty has a different result - a NEW_IMAGE with just a key.
# Reproduces issue #7107.
def test_streams_new_image(test_table_ss_new_image, dynamodbstreams):
    def do_updates(table, p, c):
        events = []
        table.update_item(Key={'p': p, 'c': c},
            UpdateExpression='SET x = :val1', ExpressionAttributeValues={':val1': 2})
        # Confirm that when adding one attribute "x", the NewImage contains both
        # the new x, and the key columns (p and c).
        # We use here '*' instead of 'INSERT' to avoid testing issue #6918 here.
        events.append(['*', {'p': p, 'c': c}, None, {'p': p, 'c': c, 'x': 2}])
        # Confirm that when adding just attribute "y", the NewImage will contain
        # all the attributes, including the old "x":
        table.update_item(Key={'p': p, 'c': c},
            UpdateExpression='SET y = :val1', ExpressionAttributeValues={':val1': 3})
        events.append(['MODIFY', {'p': p, 'c': c}, {'p': p, 'c': c, 'x': 2}, {'p': p, 'c': c, 'x': 2, 'y': 3}])
        # Confirm that when deleting the columns x and y, the NewImage becomes
        # empty - but still exists and contains the key columns,
        table.update_item(Key={'p': p, 'c': c},
            UpdateExpression='REMOVE x, y')
        events.append(['MODIFY', {'p': p, 'c': c}, {'p': p, 'c': c, 'x': 2, 'y': 3}, {'p': p, 'c': c}])
        # Confirm that deleting the item results in a missing NewImage:
        table.delete_item(Key={'p': p, 'c': c})
        events.append(['REMOVE', {'p': p, 'c': c}, {'p': p, 'c': c}, None])
        return events
    do_test(test_table_ss_new_image, dynamodbstreams, do_updates, 'NEW_IMAGE')
# Test similar to the above test for NEW_IMAGE corner cases, but here for
# NEW_AND_OLD_IMAGES mode.
# Although it is likely that if both OLD_IMAGE and NEW_IMAGE work correctly then
# so will the combined NEW_AND_OLD_IMAGES mode, it is still possible that the
# implementation of the combined mode has unique bugs, so it is worth testing
# it separately.
# Reproduces issue #7107.
def test_streams_new_and_old_images(test_table_ss_new_and_old_images, dynamodbstreams):
    def do_updates(table, p, c):
        events = []
        table.update_item(Key={'p': p, 'c': c},
            UpdateExpression='SET x = :val1', ExpressionAttributeValues={':val1': 2})
        # The item doesn't yet exist, so OldImage is missing.
        # We use here '*' instead of 'INSERT' to avoid testing issue #6918 here.
        events.append(['*', {'p': p, 'c': c}, None, {'p': p, 'c': c, 'x': 2}])
        # Confirm that when adding just attribute "y", the NewImage will contain
        # all the attributes, including the old "x". Also, OldImage contains the
        # key attributes, not just "x":
        table.update_item(Key={'p': p, 'c': c},
            UpdateExpression='SET y = :val1', ExpressionAttributeValues={':val1': 3})
        events.append(['MODIFY', {'p': p, 'c': c}, {'p': p, 'c': c, 'x': 2}, {'p': p, 'c': c, 'x': 2, 'y': 3}])
        # Confirm that when deleting the attributes x and y, the NewImage becomes
        # empty - but still exists and contains the key attributes:
        table.update_item(Key={'p': p, 'c': c},
            UpdateExpression='REMOVE x, y')
        events.append(['MODIFY', {'p': p, 'c': c}, {'p': p, 'c': c, 'x': 2, 'y': 3}, {'p': p, 'c': c}])
        # Confirm that when adding an attribute z to the empty item, OldItem is
        # not missing - it just contains only the key attributes:
        table.update_item(Key={'p': p, 'c': c},
            UpdateExpression='SET z = :val1', ExpressionAttributeValues={':val1': 4})
        events.append(['MODIFY', {'p': p, 'c': c}, {'p': p, 'c': c}, {'p': p, 'c': c, 'z': 4}])
        # Confirm that deleting the item results in a missing NewImage:
        table.delete_item(Key={'p': p, 'c': c})
        events.append(['REMOVE', {'p': p, 'c': c}, {'p': p, 'c': c, 'z': 4}, None])
        return events
    do_test(test_table_ss_new_and_old_images, dynamodbstreams, do_updates, 'NEW_AND_OLD_IMAGES')
# Test that when a stream shard has no data to read, GetRecords returns an
# empty Records array - not a missing one. Reproduces issue #6926.
def test_streams_no_records(test_table_ss_keys_only, dynamodbstreams):
    table, arn = test_table_ss_keys_only
    # Get just one shard - any shard - and its LATEST iterator. Because it's
    # LATEST, there will be no data to read from this iterator.
    shard = dynamodbstreams.describe_stream(StreamArn=arn, Limit=1)['StreamDescription']['Shards'][0]
    shard_id = shard['ShardId']
    iter = dynamodbstreams.get_shard_iterator(StreamArn=arn, ShardId=shard_id, ShardIteratorType='LATEST')['ShardIterator']
    response = dynamodbstreams.get_records(ShardIterator=iter)
    # An open shard must return a NextShardIterator; a closed one has an
    # EndingSequenceNumber instead - accept either.
    assert 'NextShardIterator' in response or 'EndingSequenceNumber' in shard['SequenceNumberRange']
    assert 'Records' in response
    # We expect Records to be empty - there is no data at the LATEST iterator.
    assert response['Records'] == []
# Test that after fetching the last result from a shard, we don't get it
# yet again. Reproduces issue #6942.
def test_streams_last_result(test_table_ss_keys_only, dynamodbstreams):
    table, arn = test_table_ss_keys_only
    iterators = latest_iterators(dynamodbstreams, arn)
    # Do an UpdateItem operation that is expected to leave one event in the
    # stream.
    table.update_item(Key={'p': random_string(), 'c': random_string()},
        UpdateExpression='SET x = :val1', ExpressionAttributeValues={':val1': 5})
    # Eventually (we may need to retry this for a while), *one* of the
    # stream shards will return one event:
    timeout = time.time() + 15
    while time.time() < timeout:
        for iter in iterators:
            response = dynamodbstreams.get_records(ShardIterator=iter)
            if 'Records' in response and response['Records'] != []:
                # Found the shard with the data! Test that it only has
                # one event and that if we try to read again, we don't
                # get more data (this was issue #6942).
                assert len(response['Records']) == 1
                assert 'NextShardIterator' in response
                response = dynamodbstreams.get_records(ShardIterator=response['NextShardIterator'])
                assert response['Records'] == []
                return
        time.sleep(0.5)
    pytest.fail("timed out")
# In test_streams_last_result above we tested that there is no further events
# after reading the only one. In this test we verify that if we *do* perform
# another change on the same key, we do get another event and it happens on the
# *same* shard.
def test_streams_another_result(test_table_ss_keys_only, dynamodbstreams):
    table, arn = test_table_ss_keys_only
    iterators = latest_iterators(dynamodbstreams, arn)
    # Do an UpdateItem operation that is expected to leave one event in the
    # stream.
    p = random_string()
    c = random_string()
    table.update_item(Key={'p': p, 'c': c},
        UpdateExpression='SET x = :val1', ExpressionAttributeValues={':val1': 5})
    # Eventually, *one* of the stream shards will return one event:
    timeout = time.time() + 15
    while time.time() < timeout:
        for iter in iterators:
            response = dynamodbstreams.get_records(ShardIterator=iter)
            if 'Records' in response and response['Records'] != []:
                # Finally found the shard reporting the changes to this key
                assert len(response['Records']) == 1
                assert response['Records'][0]['dynamodb']['Keys']== {'p': {'S': p}, 'c': {'S': c}}
                assert 'NextShardIterator' in response
                iter = response['NextShardIterator']
                # Found the shard with the data. It only has one event so if
                # we try to read again, we find nothing (this is the same as
                # what test_streams_last_result tests).
                response = dynamodbstreams.get_records(ShardIterator=iter)
                assert response['Records'] == []
                assert 'NextShardIterator' in response
                iter = response['NextShardIterator']
                # Do another UpdateItem operation to the same key, so it is
                # expected to write to the *same* shard:
                table.update_item(Key={'p': p, 'c': c},
                    UpdateExpression='SET x = :val2', ExpressionAttributeValues={':val2': 7})
                # Again we may need to wait for the event to appear on the stream:
                # (note: this deliberately rebinds the outer 'timeout')
                timeout = time.time() + 15
                while time.time() < timeout:
                    response = dynamodbstreams.get_records(ShardIterator=iter)
                    if 'Records' in response and response['Records'] != []:
                        assert len(response['Records']) == 1
                        assert response['Records'][0]['dynamodb']['Keys']== {'p': {'S': p}, 'c': {'S': c}}
                        assert 'NextShardIterator' in response
                        # The test is done, successfully.
                        return
                    time.sleep(0.5)
                pytest.fail("timed out")
        time.sleep(0.5)
    pytest.fail("timed out")
# Test the SequenceNumber attribute returned for stream events, and the
# "AT_SEQUENCE_NUMBER" iterator that can be used to re-read from the same
# event again given its saved "sequence number".
def test_streams_at_sequence_number(test_table_ss_keys_only, dynamodbstreams):
    table, arn = test_table_ss_keys_only
    # List all the shards and their LATEST iterators:
    shards_and_iterators = []
    for shard_id in list_shards(dynamodbstreams, arn):
        shards_and_iterators.append((shard_id, dynamodbstreams.get_shard_iterator(StreamArn=arn,
            ShardId=shard_id, ShardIteratorType='LATEST')['ShardIterator']))
    # Do an UpdateItem operation that is expected to leave one event in the
    # stream.
    p = random_string()
    c = random_string()
    table.update_item(Key={'p': p, 'c': c},
        UpdateExpression='SET x = :val1', ExpressionAttributeValues={':val1': 5})
    # Eventually, *one* of the stream shards will return the one event:
    timeout = time.time() + 15
    while time.time() < timeout:
        for (shard_id, iter) in shards_and_iterators:
            response = dynamodbstreams.get_records(ShardIterator=iter)
            if 'Records' in response and response['Records'] != []:
                # Finally found the shard reporting the changes to this key:
                assert len(response['Records']) == 1
                assert response['Records'][0]['dynamodb']['Keys'] == {'p': {'S': p}, 'c': {'S': c}}
                assert 'NextShardIterator' in response
                sequence_number = response['Records'][0]['dynamodb']['SequenceNumber']
                # Found the shard with the data. It only has one event so if
                # we try to read again, we find nothing (this is the same as
                # what test_streams_last_result tests).
                iter = response['NextShardIterator']
                response = dynamodbstreams.get_records(ShardIterator=iter)
                assert response['Records'] == []
                assert 'NextShardIterator' in response
                # If we use the SequenceNumber of the first event to create an
                # AT_SEQUENCE_NUMBER iterator, we can read the same event again.
                # We don't need a loop and a timeout, because this event is already
                # available.
                iter = dynamodbstreams.get_shard_iterator(StreamArn=arn,
                    ShardId=shard_id, ShardIteratorType='AT_SEQUENCE_NUMBER',
                    SequenceNumber=sequence_number)['ShardIterator']
                response = dynamodbstreams.get_records(ShardIterator=iter)
                assert 'Records' in response
                assert len(response['Records']) == 1
                assert response['Records'][0]['dynamodb']['Keys'] == {'p': {'S': p}, 'c': {'S': c}}
                assert response['Records'][0]['dynamodb']['SequenceNumber'] == sequence_number
                return
        time.sleep(0.5)
    pytest.fail("timed out")
# Test the SequenceNumber attribute returned for stream events, and the
# "AFTER_SEQUENCE_NUMBER" iterator that can be used to re-read *after* the same
# event again given its saved "sequence number".
def test_streams_after_sequence_number(test_table_ss_keys_only, dynamodbstreams):
    table, arn = test_table_ss_keys_only
    # List all the shards and their LATEST iterators:
    shards_and_iterators = []
    for shard_id in list_shards(dynamodbstreams, arn):
        shards_and_iterators.append((shard_id, dynamodbstreams.get_shard_iterator(StreamArn=arn,
            ShardId=shard_id, ShardIteratorType='LATEST')['ShardIterator']))
    # Do two UpdateItem operations to the same key, that are expected to leave
    # two events in the stream.
    p = random_string()
    c = random_string()
    table.update_item(Key={'p': p, 'c': c},
        UpdateExpression='SET x = :val1', ExpressionAttributeValues={':val1': 3})
    table.update_item(Key={'p': p, 'c': c},
        UpdateExpression='SET x = :val1', ExpressionAttributeValues={':val1': 5})
    # Eventually, *one* of the stream shards will return the two events:
    timeout = time.time() + 15
    while time.time() < timeout:
        for (shard_id, iter) in shards_and_iterators:
            response = dynamodbstreams.get_records(ShardIterator=iter)
            if 'Records' in response and len(response['Records']) == 2:
                assert response['Records'][0]['dynamodb']['Keys'] == {'p': {'S': p}, 'c': {'S': c}}
                assert response['Records'][1]['dynamodb']['Keys'] == {'p': {'S': p}, 'c': {'S': c}}
                sequence_number_1 = response['Records'][0]['dynamodb']['SequenceNumber']
                sequence_number_2 = response['Records'][1]['dynamodb']['SequenceNumber']
                # #7424 - AWS sdk assumes sequence numbers can be compared
                # as bigints, and are monotonically growing.
                assert int(sequence_number_1) < int(sequence_number_2)
                # If we use the SequenceNumber of the first event to create an
                # AFTER_SEQUENCE_NUMBER iterator, we can read the second event
                # (only) again. We don't need a loop and a timeout, because this
                # event is already available.
                iter = dynamodbstreams.get_shard_iterator(StreamArn=arn,
                    ShardId=shard_id, ShardIteratorType='AFTER_SEQUENCE_NUMBER',
                    SequenceNumber=sequence_number_1)['ShardIterator']
                response = dynamodbstreams.get_records(ShardIterator=iter)
                assert 'Records' in response
                assert len(response['Records']) == 1
                assert response['Records'][0]['dynamodb']['Keys'] == {'p': {'S': p}, 'c': {'S': c}}
                assert response['Records'][0]['dynamodb']['SequenceNumber'] == sequence_number_2
                return
        time.sleep(0.5)
    pytest.fail("timed out")
# Test the "TRIM_HORIZON" iterator, which can be used to re-read *all* the
# previously-read events of the stream shard again.
# NOTE: This test relies on the test_table_ss_keys_only fixture giving us a
# brand new stream, with no old events saved from other tests. 
If we ever
# change this, we should change this test to use a different fixture.
def test_streams_trim_horizon(test_table_ss_keys_only, dynamodbstreams):
    table, arn = test_table_ss_keys_only
    # List all the shards and their LATEST iterators:
    shards_and_iterators = []
    for shard_id in list_shards(dynamodbstreams, arn):
        shards_and_iterators.append((shard_id, dynamodbstreams.get_shard_iterator(StreamArn=arn,
            ShardId=shard_id, ShardIteratorType='LATEST')['ShardIterator']))
    # Do two UpdateItem operations to the same key, that are expected to leave
    # two events in the stream.
    p = random_string()
    c = random_string()
    table.update_item(Key={'p': p, 'c': c},
        UpdateExpression='SET x = :val1', ExpressionAttributeValues={':val1': 3})
    table.update_item(Key={'p': p, 'c': c},
        UpdateExpression='SET x = :val1', ExpressionAttributeValues={':val1': 5})
    # Eventually, *one* of the stream shards will return the two events:
    timeout = time.time() + 15
    while time.time() < timeout:
        for (shard_id, iter) in shards_and_iterators:
            response = dynamodbstreams.get_records(ShardIterator=iter)
            if 'Records' in response and len(response['Records']) == 2:
                assert response['Records'][0]['dynamodb']['Keys'] == {'p': {'S': p}, 'c': {'S': c}}
                assert response['Records'][1]['dynamodb']['Keys'] == {'p': {'S': p}, 'c': {'S': c}}
                sequence_number_1 = response['Records'][0]['dynamodb']['SequenceNumber']
                sequence_number_2 = response['Records'][1]['dynamodb']['SequenceNumber']
                # If we use the TRIM_HORIZON iterator, we should receive the
                # same two events again, in the same order.
                # Note that we assume that the fixture gave us a brand new
                # stream, with no old events saved from other tests. If we
                # couldn't assume this, this test would need to become much
                # more complex, and would need to read from this shard until
                # we find the two events we are looking for.
                iter = dynamodbstreams.get_shard_iterator(StreamArn=arn,
                    ShardId=shard_id, ShardIteratorType='TRIM_HORIZON')['ShardIterator']
                response = dynamodbstreams.get_records(ShardIterator=iter)
                assert 'Records' in response
                assert len(response['Records']) == 2
                assert response['Records'][0]['dynamodb']['Keys'] == {'p': {'S': p}, 'c': {'S': c}}
                assert response['Records'][1]['dynamodb']['Keys'] == {'p': {'S': p}, 'c': {'S': c}}
                assert response['Records'][0]['dynamodb']['SequenceNumber'] == sequence_number_1
                assert response['Records'][1]['dynamodb']['SequenceNumber'] == sequence_number_2
                return
        time.sleep(0.5)
    pytest.fail("timed out")
# Above we tested some specific operations in small tests aimed to reproduce
# a specific bug, in the following tests we do a all the different operations,
# PutItem, DeleteItem, BatchWriteItem and UpdateItem, and check the resulting
# stream for correctness.
# The following tests focus on mulitple operations on the *same* item. Those
# should appear in the stream in the correct order.
# Shared "updatefunc" for do_test(): performs a representative mix of write
# operations on one item and returns the list of stream events they are
# expected to generate, in order.
def do_updates_1(table, p, c):
    events = []
    # a first put_item appears as an INSERT event. Note also empty old_image.
    table.put_item(Item={'p': p, 'c': c, 'x': 2})
    events.append(['INSERT', {'p': p, 'c': c}, None, {'p': p, 'c': c, 'x': 2}])
    # a second put_item of the *same* key and same value, doesn't appear in the log at all!
    table.put_item(Item={'p': p, 'c': c, 'x': 2})
    # a second put_item of the *same* key and different value, appears as a MODIFY event
    table.put_item(Item={'p': p, 'c': c, 'y': 3})
    events.append(['MODIFY', {'p': p, 'c': c}, {'p': p, 'c': c, 'x': 2}, {'p': p, 'c': c, 'y': 3}])
    # deleting an item appears as a REMOVE event. Note no new_image at all, but there is an old_image.
    table.delete_item(Key={'p': p, 'c': c})
    events.append(['REMOVE', {'p': p, 'c': c}, {'p': p, 'c': c, 'y': 3}, None])
    # deleting a non-existant item doesn't appear in the log at all.
    table.delete_item(Key={'p': p, 'c': c})
    # If update_item creates an item, the event is INSERT as well.
    table.update_item(Key={'p': p, 'c': c},
        UpdateExpression='SET b = :val1',
        ExpressionAttributeValues={':val1': 4})
    events.append(['INSERT', {'p': p, 'c': c}, None, {'p': p, 'c': c, 'b': 4}])
    # If update_item modifies the item, note how old and new image includes both old and new columns
    table.update_item(Key={'p': p, 'c': c},
        UpdateExpression='SET x = :val1',
        ExpressionAttributeValues={':val1': 5})
    events.append(['MODIFY', {'p': p, 'c': c}, {'p': p, 'c': c, 'b': 4}, {'p': p, 'c': c, 'b': 4, 'x': 5}])
    # TODO: incredibly, if we uncomment the "REMOVE b" update below, it will be
    # completely missing from the DynamoDB stream - the test continues to
    # pass even though we didn't add another expected event, and even though
    # the preimage in the following expected event includes this "b" we will
    # remove. I couldn't reproduce this apparant DynamoDB bug in a smaller test.
    #table.update_item(Key={'p': p, 'c': c}, UpdateExpression='REMOVE b')
    # Test BatchWriteItem as well. This modifies the item, so will be a MODIFY event.
    table.meta.client.batch_write_item(RequestItems = {table.name: [{'PutRequest': {'Item': {'p': p, 'c': c, 'x': 5}}}]})
    events.append(['MODIFY', {'p': p, 'c': c}, {'p': p, 'c': c, 'b': 4, 'x': 5}, {'p': p, 'c': c, 'x': 5}])
    return events
# Run do_updates_1 against each of the four stream view types:
@pytest.mark.xfail(reason="Currently fails - because of multiple issues listed above")
def test_streams_1_keys_only(test_table_ss_keys_only, dynamodbstreams):
    do_test(test_table_ss_keys_only, dynamodbstreams, do_updates_1, 'KEYS_ONLY')
@pytest.mark.xfail(reason="Currently fails - because of multiple issues listed above")
def test_streams_1_new_image(test_table_ss_new_image, dynamodbstreams):
    do_test(test_table_ss_new_image, dynamodbstreams, do_updates_1, 'NEW_IMAGE')
@pytest.mark.xfail(reason="Currently fails - because of multiple issues listed above")
def test_streams_1_old_image(test_table_ss_old_image, dynamodbstreams):
    do_test(test_table_ss_old_image, dynamodbstreams, do_updates_1, 'OLD_IMAGE')
@pytest.mark.xfail(reason="Currently fails - because of multiple issues listed above")
def test_streams_1_new_and_old_images(test_table_ss_new_and_old_images, dynamodbstreams):
    do_test(test_table_ss_new_and_old_images, dynamodbstreams, do_updates_1, 'NEW_AND_OLD_IMAGES')
# A fixture which creates a test table with a stream enabled, and returns a
# bunch of interesting information collected from the CreateTable response.
# This fixture is session-scoped - it can be shared by multiple tests below,
# because we are not going to actually use or change this stream, we will
# just do multiple tests on its setup.
@pytest.fixture(scope="session")
def test_table_stream_with_result(dynamodb, dynamodbstreams):
    tablename = test_table_name()
    result = dynamodb.meta.client.create_table(TableName=tablename,
        BillingMode='PAY_PER_REQUEST',
        StreamSpecification={'StreamEnabled': True, 'StreamViewType': 'KEYS_ONLY'},
        KeySchema=[ { 'AttributeName': 'p', 'KeyType': 'HASH' },
                    { 'AttributeName': 'c', 'KeyType': 'RANGE' }
        ],
        AttributeDefinitions=[
                    { 'AttributeName': 'p', 'AttributeType': 'S' },
                    { 'AttributeName': 'c', 'AttributeType': 'S' },
        ])
    waiter = dynamodb.meta.client.get_waiter('table_exists')
    waiter.config.delay = 1
    waiter.config.max_attempts = 200
    waiter.wait(TableName=tablename)
    table = dynamodb.Table(tablename)
    # Hand both the raw CreateTable response and the table object to tests.
    yield result, table
    while True:
        try:
            table.delete()
            return
        except ClientError as ce:
            # if the table has a stream currently being created we cannot
            # delete the table immediately. Again, only with real dynamo
            if ce.response['Error']['Code'] == 'ResourceInUseException':
                print('Could not delete table yet. Sleeping 5s.')
                time.sleep(5)
                continue;
            raise
# Test that in a table with Streams enabled, LatestStreamArn is returned
# by CreateTable, DescribeTable and UpdateTable, and is the same ARN as
# returned by ListStreams. 
# Reproduces issue #7157.
def test_latest_stream_arn(test_table_stream_with_result, dynamodbstreams):
    """Verify CreateTable, ListStreams, DescribeTable and UpdateTable all
    agree on the table's LatestStreamArn."""
    (result, table) = test_table_stream_with_result
    # CreateTable must have reported the stream's ARN.
    create_desc = result['TableDescription']
    assert 'LatestStreamArn' in create_desc
    expected_arn = create_desc['LatestStreamArn']
    # ListStreams (via wait_for_active_stream) must return the same ARN as
    # the original CreateTable did.
    listed_arn, _ = wait_for_active_stream(dynamodbstreams, table)
    assert expected_arn == listed_arn
    # DescribeTable must also include the same LatestStreamArn:
    described = table.meta.client.describe_table(TableName=table.name)['Table']
    assert 'LatestStreamArn' in described
    assert described['LatestStreamArn'] == expected_arn
    # UpdateTable must also include the same LatestStreamArn. The "update"
    # changes nothing (it just sets BillingMode to what it was).
    updated = table.meta.client.update_table(TableName=table.name,
            BillingMode='PAY_PER_REQUEST')['TableDescription']
    assert updated['LatestStreamArn'] == expected_arn
# Test that in a table with Streams enabled, LatestStreamLabel is returned
# by CreateTable, DescribeTable and UpdateTable, and is the same "label" as
# returned by ListStreams.
# Reproduces issue #7162.
def test_latest_stream_label(test_table_stream_with_result, dynamodbstreams):
    """Verify CreateTable, ListStreams, DescribeTable and UpdateTable all
    agree on the table's LatestStreamLabel."""
    (result, table) = test_table_stream_with_result
    # CreateTable must have reported the stream's label.
    create_desc = result['TableDescription']
    assert 'LatestStreamLabel' in create_desc
    expected_label = create_desc['LatestStreamLabel']
    # ListStreams (via wait_for_active_stream) must return the same label
    # as the original CreateTable did.
    _, listed_label = wait_for_active_stream(dynamodbstreams, table)
    assert expected_label == listed_label
    # DescribeTable must also include the same LatestStreamLabel:
    described = table.meta.client.describe_table(TableName=table.name)['Table']
    assert 'LatestStreamLabel' in described
    assert described['LatestStreamLabel'] == expected_label
    # UpdateTable must also include the same LatestStreamLabel. The "update"
    # changes nothing (it just sets BillingMode to what it was).
    updated = table.meta.client.update_table(TableName=table.name,
            BillingMode='PAY_PER_REQUEST')['TableDescription']
    assert updated['LatestStreamLabel'] == expected_label
# Test that in a table with Streams enabled, StreamSpecification is returned
# by CreateTable, DescribeTable and UpdateTable.
# Reproduces issue #7163.
def test_stream_specification(test_table_stream_with_result, dynamodbstreams):
    """Verify CreateTable, DescribeTable and UpdateTable all return the
    StreamSpecification the table was created with."""
    # StreamSpecification as set in test_table_stream_with_result:
    expected_spec = {'StreamEnabled': True, 'StreamViewType': 'KEYS_ONLY'}
    (result, table) = test_table_stream_with_result
    # CreateTable must have echoed the specification back.
    create_desc = result['TableDescription']
    assert 'StreamSpecification' in create_desc
    assert create_desc['StreamSpecification'] == expected_spec
    # DescribeTable must also include the same StreamSpecification:
    described = table.meta.client.describe_table(TableName=table.name)['Table']
    assert 'StreamSpecification' in described
    assert described['StreamSpecification'] == expected_spec
    # UpdateTable must also include the same StreamSpecification. The
    # "update" changes nothing (it just sets BillingMode to what it was).
    updated = table.meta.client.update_table(TableName=table.name,
            BillingMode='PAY_PER_REQUEST')['TableDescription']
    assert updated['StreamSpecification'] == expected_spec
# The following test checks the behavior of *closed* shards.
# We achieve a closed shard by disabling the stream - the DynamoDB
# documentation states that "If you disable a stream, any shards that are
# still open will be closed. The data in the stream will continue to be
# readable for 24 hours". In the test we verify that indeed, after a shard
# is closed, it is still readable with GetRecords (reproduces issue #7239).
# Moreover, upon reaching the end of data in the shard, the NextShardIterator
# attribute should say that the end was reached. The DynamoDB documentation
# says that NextShardIterator should be "set to null" in this case - but it
# is not clear what "null" means in this context: Should NextShardIterator
# be missing? Or a "null" JSON type? Or an empty string?
# This test verifies
# that the right answer is that NextShardIterator should be *missing*
# (reproduces issue #7237).
@pytest.mark.xfail(reason="disabled stream is deleted - issue #7239")
def test_streams_closed_read(test_table_ss_keys_only, dynamodbstreams):
    """Write one event, disable the stream (closing its shards), then check
    the closed shards are still readable and terminate without a
    NextShardIterator."""
    table, arn = test_table_ss_keys_only
    iterators = latest_iterators(dynamodbstreams, arn)
    # Do an UpdateItem operation that is expected to leave one event in the
    # stream.
    table.update_item(Key={'p': random_string(), 'c': random_string()},
        UpdateExpression='SET x = :val1', ExpressionAttributeValues={':val1': 5})
    # Disable streaming for this table. Note that the test_table_ss_keys_only
    # fixture has "function" scope so it is fine to ruin table, it will not
    # be used in other tests.
    disable_stream(dynamodbstreams, table)
    # Even after streaming is disabled for the table, we can still read
    # from the earlier stream (it is guaranteed to work for 24 hours).
    # The iterators we got earlier should still be fully usable, and
    # eventually *one* of the stream shards will return one event:
    timeout = time.time() + 15
    while time.time() < timeout:
        # NOTE: loop variable renamed from "iter" - the original name
        # shadowed the Python builtin of the same name.
        for shard_iterator in iterators:
            response = dynamodbstreams.get_records(ShardIterator=shard_iterator)
            if response.get('Records'):
                # Found the shard with the data! Test that it only has
                # one event. NextShardIterator should either be missing now,
                # indicating that it is a closed shard (DynamoDB does this),
                # or, it may (and currently does in Alternator) return another
                # and reading from *that* iterator should then tell us that
                # we reached the end of the shard (i.e., zero results and
                # missing NextShardIterator).
                assert len(response['Records']) == 1
                if 'NextShardIterator' in response:
                    response = dynamodbstreams.get_records(ShardIterator=response['NextShardIterator'])
                    assert len(response['Records']) == 0
                    assert 'NextShardIterator' not in response
                return
        time.sleep(0.5)
    pytest.fail("timed out")

# TODO: tests on multiple partitions
# TODO: write a test that disabling the stream and re-enabling it works, but
#   requires the user to wait for the first stream to become DISABLED before
#   creating the new one. Then ListStreams should return the two streams,
#   one DISABLED and one ENABLED? I'm not sure we want or can do this in
#   Alternator.
# TODO: Can we test shard splitting? (shard splitting
#   requires the user to - periodically or following shards ending - to call
Source:__init__.py  
1from __future__ import unicode_literals2from .models import dynamodbstreams_backends3from ..core.models import base_decorator4dynamodbstreams_backend = dynamodbstreams_backends["us-east-1"]...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel to get step-by-step demonstrations from industry experts.
Get 100 minutes of automation test minutes FREE!!
