How to use the dynamodbstreams method in LocalStack

Best Python code snippet using localstack_python

test_streams.py

Source: test_streams.py (GitHub)

copy

Full Screen

# Copyright 2020 ScyllaDB
#
# This file is part of Scylla.
#
# Scylla is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Scylla is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with Scylla. If not, see <http://www.gnu.org/licenses/>.

# Tests for stream operations: ListStreams, DescribeStream, GetShardIterator,
# GetRecords.
import pytest
import time
import urllib.request
from botocore.exceptions import ClientError
from util import list_tables, test_table_name, create_test_table, random_string, freeze
from contextlib import contextmanager
from urllib.error import URLError
from boto3.dynamodb.types import TypeDeserializer

# All StreamViewType settings allowed by the DynamoDB API.
stream_types = ['OLD_IMAGE', 'NEW_IMAGE', 'KEYS_ONLY', 'NEW_AND_OLD_IMAGES']

def disable_stream(dynamodbstreams, table):
    """Disable the stream on the given table and wait (up to 60 seconds of
    wall-clock time) until every stream of the table reports DISABLED.

    Fails the current test if the streams do not all become DISABLED in time.
    """
    table.update(StreamSpecification={'StreamEnabled': False})
    # Wait for the stream to really be disabled. A table may have multiple
    # historic streams - we need all of them to become DISABLED. One of
    # them (the current one) may remain DISABLING for some time.
    #
    # BUG FIX: the original used time.process_time(), which measures CPU
    # time and does not advance while sleeping - so the intended 60-second
    # timeout could take arbitrarily long in wall-clock time. Use a
    # wall-clock deadline instead.
    exp = time.time() + 60
    while time.time() < exp:
        streams = dynamodbstreams.list_streams(TableName=table.name)
        disabled = True
        for stream in streams['Streams']:
            desc = dynamodbstreams.describe_stream(StreamArn=stream['StreamArn'])['StreamDescription']
            if desc['StreamStatus'] != 'DISABLED':
                disabled = False
                break
        if disabled:
            print('disabled stream on {}'.format(table.name))
            return
        time.sleep(0.5)
    pytest.fail("timed out")

# Cannot use fixtures.
# Because real dynamodb cannot _remove_ a stream
# once created. It can only expire 24h later. So reusing test_table for
# example works great for scylla/local testing, but once we run against
# actual aws instances, we get lingering streams, and worse: cannot
# create new ones to replace it, because we have overlapping types etc.
#
# So we have to create and delete a table per test. And not run this
# test too often against aws.
@contextmanager
def create_stream_test_table(dynamodb, StreamViewType=None):
    """Context manager creating a test table (hash key 'p', range key 'c'),
    optionally with a stream of the given StreamViewType enabled, and
    deleting the table on exit - even if the body raised."""
    spec = {'StreamEnabled': False}
    if StreamViewType is not None:
        spec = {'StreamEnabled': True, 'StreamViewType': StreamViewType}
    table = create_test_table(dynamodb, StreamSpecification=spec,
        KeySchema=[{'AttributeName': 'p', 'KeyType': 'HASH'},
                   {'AttributeName': 'c', 'KeyType': 'RANGE'}
                   ],
        AttributeDefinitions=[
            {'AttributeName': 'p', 'AttributeType': 'S'},
            {'AttributeName': 'c', 'AttributeType': 'S'},
        ])
    # BUG FIX: without try/finally, a test failing inside the "with" body
    # would skip the deletion below and leak the table (and its stream) on
    # real AWS.
    try:
        yield table
    finally:
        while True:
            try:
                table.delete()
                break
            except ClientError as ce:
                # if the table has a stream currently being created we cannot
                # delete the table immediately. Again, only with real dynamo
                if ce.response['Error']['Code'] == 'ResourceInUseException':
                    print('Could not delete table yet. Sleeping 5s.')
                    time.sleep(5)
                    continue
                raise

def wait_for_active_stream(dynamodbstreams, table, timeout=60):
    """Wait up to `timeout` wall-clock seconds for the table to have an
    ENABLED stream; return (arn, label) for it, or fail the test."""
    # BUG FIX: was time.process_time() (CPU time), which does not advance
    # during time.sleep(), making the timeout effectively unbounded.
    exp = time.time() + timeout
    while time.time() < exp:
        streams = dynamodbstreams.list_streams(TableName=table.name)
        for stream in streams['Streams']:
            arn = stream['StreamArn']
            if arn is None:
                continue
            desc = dynamodbstreams.describe_stream(StreamArn=arn)['StreamDescription']
            if 'StreamStatus' not in desc or desc.get('StreamStatus') == 'ENABLED':
                return (arn, stream['StreamLabel'])
        # real dynamo takes some time until a stream is usable
        print("Stream not available. Sleep 5s...")
        time.sleep(5)
    pytest.fail("timed out waiting for an active stream")

# Local java dynamodb server version behaves differently from
# the "real" one. Most importantly, it does not verify a number of
# parameters, and consequently does not throw when called with borked
# args. This will try to check if we are in fact running against
# this test server, and if so, just raise the error here and be done
# with it. All this just so we can run through the tests on
# aws, scylla and local.
def is_local_java(dynamodbstreams):
    # no good way to check, but local server runs on a Jetty,
    # so check for that.
    url = dynamodbstreams.meta.endpoint_url
    try:
        # BUG FIX: close the response instead of leaking the connection.
        with urllib.request.urlopen(url):
            pass
    except URLError as e:
        if hasattr(e, 'info'):
            return e.info()['Server'].startswith('Jetty')
    return False

def ensure_java_server(dynamodbstreams, error='ValidationException'):
    """If running against the local java server, raise the given ClientError
    (or return if error is None); otherwise fail the test."""
    if is_local_java(dynamodbstreams):
        if error is not None:
            raise ClientError({'Error': {'Code': error}}, '')
        return
    pytest.fail('expected local java dynamodb server')

def test_list_streams_create(dynamodb, dynamodbstreams):
    # 'view_type' instead of the original 'type', which shadowed the builtin.
    for view_type in stream_types:
        with create_stream_test_table(dynamodb, StreamViewType=view_type) as table:
            wait_for_active_stream(dynamodbstreams, table)

def test_list_streams_alter(dynamodb, dynamodbstreams):
    for view_type in stream_types:
        with create_stream_test_table(dynamodb, StreamViewType=None) as table:
            # (the unused 'res =' assignment from the original was dropped)
            table.update(StreamSpecification={'StreamEnabled': True, 'StreamViewType': view_type})
            wait_for_active_stream(dynamodbstreams, table)

def test_list_streams_paged(dynamodb, dynamodbstreams):
    for view_type in stream_types:
        with create_stream_test_table(dynamodb, StreamViewType=view_type) as table1:
            with create_stream_test_table(dynamodb, StreamViewType=view_type) as table2:
                wait_for_active_stream(dynamodbstreams, table1)
                wait_for_active_stream(dynamodbstreams, table2)
                streams = dynamodbstreams.list_streams(Limit=1)
                assert streams
                assert streams.get('Streams')
                assert streams.get('LastEvaluatedStreamArn')
                tables = [table1.name, table2.name]
                while True:
                    for s in streams['Streams']:
                        name = s['TableName']
                        if name in tables:
                            tables.remove(name)
                    if not tables:
                        break
                    streams = dynamodbstreams.list_streams(Limit=1, ExclusiveStartStreamArn=streams['LastEvaluatedStreamArn'])

@pytest.mark.skip(reason="Python driver validates Limit, so trying to test it is pointless")
def test_list_streams_zero_limit(dynamodb, dynamodbstreams):
    with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table:
        with pytest.raises(ClientError, match='ValidationException'):
            wait_for_active_stream(dynamodbstreams, table)
            dynamodbstreams.list_streams(Limit=0)

def test_create_streams_wrong_type(dynamodb, dynamodbstreams, test_table):
    with pytest.raises(ClientError, match='ValidationException'):
        # should throw
        test_table.update(StreamSpecification={'StreamEnabled': True, 'StreamViewType': 'Fisk'})
        # just in case the test fails, disable stream again
        test_table.update(StreamSpecification={'StreamEnabled': False})

def test_list_streams_empty(dynamodb, dynamodbstreams, test_table):
    streams = dynamodbstreams.list_streams(TableName=test_table.name)
    assert 'Streams' in streams
    assert not streams['Streams']  # empty

def test_list_streams_with_nonexistent_last_stream(dynamodb, dynamodbstreams):
    with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table:
        with pytest.raises(ClientError, match='ValidationException'):
            streams = dynamodbstreams.list_streams(TableName=table.name, ExclusiveStartStreamArn='kossaapaaasdafsdaasdasdasdasasdasfadfadfasdasdas')
            assert 'Streams' in streams
            assert not streams['Streams']  # empty
            # local java dynamodb does _not_ raise validation error for
            # malformed stream arn here. verify
            ensure_java_server(dynamodbstreams)

def test_describe_stream(dynamodb, dynamodbstreams):
    with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table:
        streams = dynamodbstreams.list_streams(TableName=table.name)
        arn = streams['Streams'][0]['StreamArn']
        desc = dynamodbstreams.describe_stream(StreamArn=arn)
        assert desc
        assert desc.get('StreamDescription')
        assert desc['StreamDescription']['StreamArn'] == arn
        assert desc['StreamDescription']['StreamStatus'] != 'DISABLED'
        assert desc['StreamDescription']['StreamViewType'] == 'KEYS_ONLY'
        assert desc['StreamDescription']['TableName'] == table.name
        assert desc['StreamDescription'].get('Shards')
        assert desc['StreamDescription']['Shards'][0].get('ShardId')
        assert desc['StreamDescription']['Shards'][0].get('SequenceNumberRange')
        assert desc['StreamDescription']['Shards'][0]['SequenceNumberRange'].get('StartingSequenceNumber')
        # We don't know what the sequence number is supposed to be, but the
        # DynamoDB documentation requires that it contains only numeric
        # characters and some libraries rely on this. Reproduces issue #7158:
        assert desc['StreamDescription']['Shards'][0]['SequenceNumberRange']['StartingSequenceNumber'].isdecimal()

@pytest.mark.xfail(reason="alternator does not have creation time on streams")
def test_describe_stream_create_time(dynamodb, dynamodbstreams):
    with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table:
        streams = dynamodbstreams.list_streams(TableName=table.name)
        arn = streams['Streams'][0]['StreamArn']
        desc = dynamodbstreams.describe_stream(StreamArn=arn)
        assert desc
        assert desc.get('StreamDescription')
        # note these are non-required attributes
        assert 'CreationRequestDateTime' in desc['StreamDescription']

def test_describe_nonexistent_stream(dynamodb, dynamodbstreams):
    with pytest.raises(ClientError, match='ResourceNotFoundException' if is_local_java(dynamodbstreams) else 'ValidationException'):
        dynamodbstreams.describe_stream(StreamArn='sdfadfsdfnlfkajakfgjalksfgklasjklasdjfklasdfasdfgasf')

def test_describe_stream_with_nonexistent_last_shard(dynamodb, dynamodbstreams):
    with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table:
        streams = dynamodbstreams.list_streams(TableName=table.name)
        arn = streams['Streams'][0]['StreamArn']
        try:
            desc = dynamodbstreams.describe_stream(StreamArn=arn, ExclusiveStartShardId='zzzzzzzzzzzzzzzzzzzzzzzzsfasdgagadfadfgagkjsdfsfsdjfjks')
            assert not desc['StreamDescription']['Shards']
        # BUG FIX: was a bare "except:", which also swallowed the assertion
        # failure above. Only the ClientError path is the java-server quirk.
        except ClientError:
            # local java throws here. real does not.
            ensure_java_server(dynamodbstreams, error=None)

def test_get_shard_iterator(dynamodb, dynamodbstreams):
    with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table:
        streams = dynamodbstreams.list_streams(TableName=table.name)
        arn = streams['Streams'][0]['StreamArn']
        desc = dynamodbstreams.describe_stream(StreamArn=arn)
        shard_id = desc['StreamDescription']['Shards'][0]['ShardId']
        seq = desc['StreamDescription']['Shards'][0]['SequenceNumberRange']['StartingSequenceNumber']
        # spec says shard_id must be 65 chars or less
        assert len(shard_id) <= 65
        # 'iter_type' / 'response' replace the original 'type' / 'iter',
        # which shadowed builtins.
        for iter_type in ['AT_SEQUENCE_NUMBER', 'AFTER_SEQUENCE_NUMBER']:
            response = dynamodbstreams.get_shard_iterator(
                StreamArn=arn, ShardId=shard_id, ShardIteratorType=iter_type, SequenceNumber=seq
            )
            assert response.get('ShardIterator')
        for iter_type in ['TRIM_HORIZON', 'LATEST']:
            response = dynamodbstreams.get_shard_iterator(
                StreamArn=arn, ShardId=shard_id, ShardIteratorType=iter_type
            )
            assert response.get('ShardIterator')
        for iter_type in ['AT_SEQUENCE_NUMBER', 'AFTER_SEQUENCE_NUMBER']:
            # must have seq in these modes
            with pytest.raises(ClientError, match='ValidationException'):
                dynamodbstreams.get_shard_iterator(
                    StreamArn=arn, ShardId=shard_id, ShardIteratorType=iter_type
                )
        for iter_type in ['TRIM_HORIZON', 'LATEST']:
            # should not set "seq" in these modes
            with pytest.raises(ClientError, match='ValidationException'):
                dynamodbstreams.get_shard_iterator(
                    StreamArn=arn, ShardId=shard_id, ShardIteratorType=iter_type, SequenceNumber=seq
                )
        # bad arn
        # NOTE: iter_type here is the leftover 'LATEST' from the loop above,
        # exactly as in the original code.
        with pytest.raises(ClientError, match='ValidationException'):
            dynamodbstreams.get_shard_iterator(
                StreamArn='sdfadsfsdfsdgdfsgsfdabadfbabdadsfsdfsdfsdfsdfsdfsdfdfdssdffbdfdf', ShardId=shard_id, ShardIteratorType=iter_type, SequenceNumber=seq
            )
        # bad shard id
        with pytest.raises(ClientError, match='ResourceNotFoundException'):
            dynamodbstreams.get_shard_iterator(StreamArn=arn,
                ShardId='semprinidfaasdasfsdvacsdcfsdsvsdvsdvsdvsdvsdv',
                ShardIteratorType='LATEST'
            )
        # bad iter type
        with pytest.raises(ClientError, match='ValidationException'):
            dynamodbstreams.get_shard_iterator(StreamArn=arn, ShardId=shard_id,
                ShardIteratorType='bulle', SequenceNumber=seq
            )
        # bad seq
        with pytest.raises(ClientError, match='ValidationException'):
            dynamodbstreams.get_shard_iterator(StreamArn=arn, ShardId=shard_id,
                ShardIteratorType='LATEST', SequenceNumber='sdfsafglldfngjdafnasdflgnaldklkafdsgklnasdlv'
            )

def test_get_shard_iterator_for_nonexistent_stream(dynamodb, dynamodbstreams):
    with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table:
        (arn, label) = wait_for_active_stream(dynamodbstreams, table)
        desc = dynamodbstreams.describe_stream(StreamArn=arn)
        shards = desc['StreamDescription']['Shards']
        with pytest.raises(ClientError, match='ResourceNotFoundException' if is_local_java(dynamodbstreams) else 'ValidationException'):
            dynamodbstreams.get_shard_iterator(
                StreamArn='sdfadfsddafgdafsgjnadflkgnalngalsdfnlkasnlkasdfasdfasf', ShardId=shards[0]['ShardId'], ShardIteratorType='LATEST'
            )

def test_get_shard_iterator_for_nonexistent_shard(dynamodb, dynamodbstreams):
    with create_stream_test_table(dynamodb, StreamViewType='KEYS_ONLY') as table:
        streams = dynamodbstreams.list_streams(TableName=table.name)
        arn = streams['Streams'][0]['StreamArn']
        with pytest.raises(ClientError, match='ResourceNotFoundException'):
            dynamodbstreams.get_shard_iterator(
                StreamArn=arn, ShardId='adfasdasdasdasdasdasdasdasdasasdasd', ShardIteratorType='LATEST'
            )

def test_get_records(dynamodb, dynamodbstreams):
    # TODO: add tests for storage/transactionable variations and global/local index
    with create_stream_test_table(dynamodb, StreamViewType='NEW_AND_OLD_IMAGES') as table:
        (arn, label) = wait_for_active_stream(dynamodbstreams, table)
        p = 'piglet'
        c = 'ninja'
        val = 'lucifers'
        val2 = 'flowers'
        table.put_item(Item={'p': p, 'c': c, 'a1': val, 'a2': val2})
        nval = 'semprini'
        nval2 = 'nudge'
        table.update_item(Key={'p': p, 'c': c},
            AttributeUpdates={'a1': {'Value': nval, 'Action': 'PUT'},
                              'a2': {'Value': nval2, 'Action': 'PUT'}
                              })
        has_insert = False
        # in truth, we should sleep already here, since at least scylla
        # will not be able to produce any stream content until
        # ~30s after insert/update (confidence interval)
        # but it is useful to see a working null-iteration as well, so
        # lets go already.
        while True:
            desc = dynamodbstreams.describe_stream(StreamArn=arn)
            iterators = []
            while True:
                shards = desc['StreamDescription']['Shards']
                for shard in shards:
                    shard_id = shard['ShardId']
                    start = shard['SequenceNumberRange']['StartingSequenceNumber']
                    shard_iter = dynamodbstreams.get_shard_iterator(StreamArn=arn, ShardId=shard_id, ShardIteratorType='AT_SEQUENCE_NUMBER', SequenceNumber=start)['ShardIterator']
                    iterators.append(shard_iter)
                last_shard = desc["StreamDescription"].get("LastEvaluatedShardId")
                if not last_shard:
                    break
                desc = dynamodbstreams.describe_stream(StreamArn=arn, ExclusiveStartShardId=last_shard)
            next_iterators = []
            while iterators:
                shard_iter = iterators.pop(0)
                response = dynamodbstreams.get_records(ShardIterator=shard_iter, Limit=1000)
                if 'NextShardIterator' in response:
                    next_iterators.append(response['NextShardIterator'])
                records = response.get('Records')
                # print("Query {} -> {}".format(shard_iter, records))
                if records:
                    for record in records:
                        # print("Record: {}".format(record))
                        # 'event_type'/'stream_record' replace the original
                        # 'type' (builtin) and 'dynamodb' (shadowed the
                        # fixture parameter of this test!).
                        event_type = record['eventName']
                        stream_record = record['dynamodb']
                        keys = stream_record['Keys']
                        assert keys.get('p')
                        assert keys.get('c')
                        assert keys['p'].get('S')
                        assert keys['p']['S'] == p
                        assert keys['c'].get('S')
                        assert keys['c']['S'] == c
                        if event_type == 'MODIFY' or event_type == 'INSERT':
                            assert stream_record.get('NewImage')
                            newimage = stream_record['NewImage']
                            assert newimage.get('a1')
                            assert newimage.get('a2')
                            if event_type == 'INSERT' or (event_type == 'MODIFY' and not has_insert):
                                assert newimage['a1']['S'] == val
                                assert newimage['a2']['S'] == val2
                                has_insert = True
                                continue
                            if event_type == 'MODIFY':
                                assert has_insert
                                assert newimage['a1']['S'] == nval
                                assert newimage['a2']['S'] == nval2
                                assert stream_record.get('OldImage')
                                oldimage = stream_record['OldImage']
                                assert oldimage.get('a1')
                                assert oldimage.get('a2')
                                assert oldimage['a1']['S'] == val
                                assert oldimage['a2']['S'] == val2
                                return
            print("Not done. Sleep 10s...")
            time.sleep(10)
            # NOTE: this assignment is immediately overwritten at the top of
            # the outer loop (kept as in the original code).
            iterators = next_iterators

def test_get_records_nonexistent_iterator(dynamodbstreams):
    with pytest.raises(ClientError, match='ValidationException'):
        dynamodbstreams.get_records(ShardIterator='sdfsdfsgagaddafgagasgasgasdfasdfasdfasdfasdgasdasdgasdg', Limit=1000)

##############################################################################
# Fixtures for creating a table with a stream enabled with one of the allowed
# StreamViewType settings (KEYS_ONLY, NEW_IMAGE, OLD_IMAGE, NEW_AND_OLD_IMAGES).
# Unfortunately changing the StreamViewType setting of an existing stream is
# not allowed (see test_streams_change_type), and while removing and re-adding
# a stream is possible, it is very slow. So we create four different fixtures
# with the four different StreamViewType settings for these four fixtures.
#
# It turns out that DynamoDB makes reusing the same table in different tests
# very difficult, because when we request a "LATEST" iterator we sometimes
# miss the immediately following write (this issue doesn't happen in
# Alternator, just in DynamoDB - presumably LATEST adds some time slack?)
# So all the fixtures we create below have scope="function", meaning that a
# separate table is created for each of the tests using these fixtures.
This381# slows the tests down a bit, but not by much (about 0.05 seconds per test).382# It is still worthwhile to use a fixture rather than to create a table383# explicitly - it is convenient, safe (the table gets deleted automatically)384# and if in the future we can work around the DynamoDB problem, we can return385# these fixtures to session scope.386def create_table_ss(dynamodb, dynamodbstreams, type):387 table = create_test_table(dynamodb,388 KeySchema=[{ 'AttributeName': 'p', 'KeyType': 'HASH' }, { 'AttributeName': 'c', 'KeyType': 'RANGE' }],389 AttributeDefinitions=[{ 'AttributeName': 'p', 'AttributeType': 'S' }, { 'AttributeName': 'c', 'AttributeType': 'S' }],390 StreamSpecification={ 'StreamEnabled': True, 'StreamViewType': type })391 (arn, label) = wait_for_active_stream(dynamodbstreams, table, timeout=60)392 yield table, arn393 table.delete()394@pytest.fixture(scope="function")395def test_table_ss_keys_only(dynamodb, dynamodbstreams):396 yield from create_table_ss(dynamodb, dynamodbstreams, 'KEYS_ONLY')397@pytest.fixture(scope="function")398def test_table_ss_new_image(dynamodb, dynamodbstreams):399 yield from create_table_ss(dynamodb, dynamodbstreams, 'NEW_IMAGE')400@pytest.fixture(scope="function")401def test_table_ss_old_image(dynamodb, dynamodbstreams):402 yield from create_table_ss(dynamodb, dynamodbstreams, 'OLD_IMAGE')403@pytest.fixture(scope="function")404def test_table_ss_new_and_old_images(dynamodb, dynamodbstreams):405 yield from create_table_ss(dynamodb, dynamodbstreams, 'NEW_AND_OLD_IMAGES')406# Test that it is, sadly, not allowed to use UpdateTable on a table which407# already has a stream enabled to change that stream's StreamViewType.408# Currently, Alternator does allow this (see issue #6939), so the test is409# marked xfail.410@pytest.mark.xfail(reason="Alternator allows changing StreamViewType - see issue #6939")411def test_streams_change_type(test_table_ss_keys_only):412 table, arn = test_table_ss_keys_only413 with 
pytest.raises(ClientError, match='ValidationException.*already'):414 table.update(StreamSpecification={'StreamEnabled': True, 'StreamViewType': 'OLD_IMAGE'});415 # If the above change succeeded (because of issue #6939), switch it back :-)416 table.update(StreamSpecification={'StreamEnabled': True, 'StreamViewType': 'KEYS_ONLY'});417# Utility function for listing all the shards of the given stream arn.418# Implemented by multiple calls to DescribeStream, possibly several pages419# until all the shards are returned. The return of this function should be420# cached - it is potentially slow, and DynamoDB documentation even states421# DescribeStream may only be called at a maximum rate of 10 times per second.422# list_shards() only returns the shard IDs, not the information about the423# shards' sequence number range, which is also returned by DescribeStream.424def list_shards(dynamodbstreams, arn):425 # By default DescribeStream lists a limit of 100 shards. For faster426 # tests we reduced the number of shards in the testing setup to427 # 32 (16 vnodes x 2 cpus), see issue #6979, so to still exercise this428 # paging feature, lets use a limit of 10.429 limit = 10430 response = dynamodbstreams.describe_stream(StreamArn=arn, Limit=limit)['StreamDescription']431 assert len(response['Shards']) <= limit432 shards = [x['ShardId'] for x in response['Shards']]433 while 'LastEvaluatedShardId' in response:434 # 7409 kinesis ignores LastEvaluatedShardId and just looks at last shard435 assert shards[-1] == response['LastEvaluatedShardId']436 response = dynamodbstreams.describe_stream(StreamArn=arn, Limit=limit,437 ExclusiveStartShardId=response['LastEvaluatedShardId'])['StreamDescription']438 assert len(response['Shards']) <= limit439 shards.extend([x['ShardId'] for x in response['Shards']])440 print('Number of shards in stream: {}'.format(len(shards)))441 assert len(set(shards)) == len(shards)442 # 7409 - kinesis required shards to be in lexical order.443 # verify.444 assert 
shards == sorted(shards)445 # special test: ensure we get nothing more if we ask for starting at the last446 # of the last447 response = dynamodbstreams.describe_stream(StreamArn=arn,448 ExclusiveStartShardId=shards[-1])['StreamDescription']449 assert len(response['Shards']) == 0450 return shards451# Utility function for getting shard iterators starting at "LATEST" for452# all the shards of the given stream arn.453def latest_iterators(dynamodbstreams, arn):454 iterators = []455 for shard_id in list_shards(dynamodbstreams, arn):456 iterators.append(dynamodbstreams.get_shard_iterator(StreamArn=arn,457 ShardId=shard_id, ShardIteratorType='LATEST')['ShardIterator'])458 assert len(set(iterators)) == len(iterators)459 return iterators460# Utility function for fetching more content from the stream (given its461# array of iterators) into an "output" array. Call repeatedly to get more462# content - the function returns a new array of iterators which should be463# used to replace the input list of iterators.464# Note that the order of the updates is guaranteed for the same partition,465# but cannot be trusted for *different* partitions.466def fetch_more(dynamodbstreams, iterators, output):467 new_iterators = []468 for iter in iterators:469 response = dynamodbstreams.get_records(ShardIterator=iter)470 if 'NextShardIterator' in response:471 new_iterators.append(response['NextShardIterator'])472 output.extend(response['Records'])473 assert len(set(new_iterators)) == len(new_iterators)474 return new_iterators475# Utility function for comparing "output" as fetched by fetch_more(), to a list476# expected_events, each of which looks like:477# [type, keys, old_image, new_image]478# where type is REMOVE, INSERT or MODIFY.479# The "mode" parameter specifies which StreamViewType mode (KEYS_ONLY,480# OLD_IMAGE, NEW_IMAGE, NEW_AND_OLD_IMAGES) was supposedly used to generate481# "output". 
# This mode dictates what we can compare - e.g., in KEYS_ONLY mode
# the compare_events() function ignores the old and new image in
# expected_events.
# compare_events() throws an exception immediately if it sees an unexpected
# event, but if some of the expected events are just missing in the "output",
# it only returns false - suggesting maybe the caller needs to try again
# later - maybe more output is coming.
# Note that the order of events is only guaranteed (and therefore compared)
# inside a single partition.
def compare_events(expected_events, output, mode):
    # The order of expected_events is only meaningful inside a partition, so
    # let's convert it into a map indexed by partition key.
    expected_events_map = {}
    for event in expected_events:
        expected_type, expected_key, expected_old_image, expected_new_image = event
        # For simplicity, we actually use the entire key, not just the partition
        # key. We only lose a bit of testing power we didn't plan to test anyway
        # (that events for different items in the same partition are ordered).
        key = freeze(expected_key)
        expected_events_map.setdefault(key, []).append(event)
    # Hoisted out of the per-event loop below (it was re-created for every
    # record in the original code); one deserializer serves all records.
    deserializer = TypeDeserializer()
    # Iterate over the events in output. An event for a certain key needs to
    # be the *first* remaining event for this key in expected_events_map (and
    # then we remove this matched event from expected_events_map)
    for event in output:
        # In DynamoDB, eventSource is 'aws:dynamodb'. We decided to set it to
        # a *different* value - 'scylladb:alternator'. Issue #6931.
        assert 'eventSource' in event
        # Alternator is missing "awsRegion", which makes little sense for it
        # (although maybe we should have provided the DC name). Issue #6931.
        #assert 'awsRegion' in event
        # Alternator is also missing the "eventVersion" entry. Issue #6931.
        #assert 'eventVersion' in event
        # Check that eventID appears, but can't check much on what it is.
        assert 'eventID' in event
        op = event['eventName']
        record = event['dynamodb']
        # record['Keys'] is "serialized" JSON, ({'S', 'thestring'}), so we
        # want to deserialize it to match our expected_events content.
        key = {x: deserializer.deserialize(y) for (x, y) in record['Keys'].items()}
        expected_type, expected_key, expected_old_image, expected_new_image = expected_events_map[freeze(key)].pop(0)
        if expected_type != '*':  # hack to allow a caller to not test op, to bypass issue #6918.
            assert op == expected_type
        assert record['StreamViewType'] == mode
        # We don't know what ApproximateCreationDateTime should be, but we do
        # know it needs to be a timestamp - there is conflicting documentation
        # in what format (ISO 8601?). In any case, boto3 parses this timestamp
        # for us, so we can't check it here, beyond checking it exists.
        assert 'ApproximateCreationDateTime' in record
        # We don't know what SequenceNumber is supposed to be, but the DynamoDB
        # documentation requires that it contains only numeric characters and
        # some libraries rely on this. This reproduces issue #7158:
        assert 'SequenceNumber' in record
        assert record['SequenceNumber'].isdecimal()
        # Alternator doesn't set the SizeBytes member. Issue #6931.
        #assert 'SizeBytes' in record
        if mode == 'KEYS_ONLY':
            assert 'NewImage' not in record
            assert 'OldImage' not in record
        elif mode == 'NEW_IMAGE':
            assert 'OldImage' not in record
            if expected_new_image is None:
                assert 'NewImage' not in record
            else:
                new_image = {x: deserializer.deserialize(y) for (x, y) in record['NewImage'].items()}
                assert expected_new_image == new_image
        elif mode == 'OLD_IMAGE':
            assert 'NewImage' not in record
            if expected_old_image is None:
                assert 'OldImage' not in record
            else:
                old_image = {x: deserializer.deserialize(y) for (x, y) in record['OldImage'].items()}
                assert expected_old_image == old_image
        elif mode == 'NEW_AND_OLD_IMAGES':
            if expected_new_image is None:
                assert 'NewImage' not in record
            else:
                new_image = {x: deserializer.deserialize(y) for (x, y) in record['NewImage'].items()}
                assert expected_new_image == new_image
            if expected_old_image is None:
                assert 'OldImage' not in record
            else:
                old_image = {x: deserializer.deserialize(y) for (x, y) in record['OldImage'].items()}
                assert expected_old_image == old_image
        else:
            pytest.fail('cannot happen')
    # After the above loop, expected_events_map should remain empty arrays.
    # If it isn't, one of the expected events did not yet happen. Return False.
    for entry in expected_events_map.values():
        if len(entry) > 0:
            return False
    return True

# Convenience function used to implement several tests below. It runs a given
# function "updatefunc" which is supposed to do some updates to the table
# and also return an expected_events list.
# do_test() then fetches the streams
# data and compares it to the expected_events using compare_events().
def do_test(test_table_ss_stream, dynamodbstreams, updatefunc, mode, p=None, c=None):
    """Run updatefunc(table, p, c) against the fixture's table and verify the
    stream eventually contains exactly the events updatefunc returned.

    p and c default to fresh random strings per call. BUG FIX: the original
    used "p = random_string()" as a default *argument*, which is evaluated
    only once at import time - so every defaulted call shared the same keys.
    """
    if p is None:
        p = random_string()
    if c is None:
        c = random_string()
    table, arn = test_table_ss_stream
    iterators = latest_iterators(dynamodbstreams, arn)
    expected_events = updatefunc(table, p, c)
    # Updating the stream is asynchronous. Moreover, it may even be delayed
    # artificially (see Alternator's alternator_streams_time_window_s config).
    # So if compare_events() reports the stream data is missing some of the
    # expected events (by returning False), we need to retry it for some time.
    # Note that compare_events() throws if the stream data contains *wrong*
    # (unexpected) data, so even failing tests usually finish reasonably
    # fast - depending on the alternator_streams_time_window_s parameter.
    # This optimization is important to keep *failing* tests reasonably
    # fast and not have to wait until the following arbitrary timeout.
    timeout = time.time() + 20
    output = []
    while time.time() < timeout:
        iterators = fetch_more(dynamodbstreams, iterators, output)
        print("after fetch_more number expected_events={}, output={}".format(len(expected_events), len(output)))
        if compare_events(expected_events, output, mode):
            # success!
            return
        time.sleep(0.5)
    # If we're still here, the last compare_events returned false.
    pytest.fail('missing events in output: {}'.format(output))

# Test a single PutItem of a new item. Should result in a single INSERT
# event.
# Currently fails because in Alternator, PutItem - which generates a
# tombstone to *replace* an item - generates REMOVE+MODIFY (issue #6930).
@pytest.mark.xfail(reason="Currently fails - see issue #6930")
def test_streams_putitem_keys_only(test_table_ss_keys_only, dynamodbstreams):
    def do_updates(table, p, c):
        table.put_item(Item={'p': p, 'c': c, 'x': 2})
        # One PutItem of a brand-new item => one INSERT event.
        return [['INSERT', {'p': p, 'c': c}, None, {'p': p, 'c': c, 'x': 2}]]
    do_test(test_table_ss_keys_only, dynamodbstreams, do_updates, 'KEYS_ONLY')

# Test a single UpdateItem. Should result in a single INSERT event.
# Currently fails because Alternator generates a MODIFY event even though
# this is a new item (issue #6918).
@pytest.mark.xfail(reason="Currently fails - see issue #6918")
def test_streams_updateitem_keys_only(test_table_ss_keys_only, dynamodbstreams):
    def do_updates(table, p, c):
        table.update_item(Key={'p': p, 'c': c},
            UpdateExpression='SET x = :val1', ExpressionAttributeValues={':val1': 2})
        # One UpdateItem creating a new item => one INSERT event.
        return [['INSERT', {'p': p, 'c': c}, None, {'p': p, 'c': c, 'x': 2}]]
    do_test(test_table_ss_keys_only, dynamodbstreams, do_updates, 'KEYS_ONLY')

# This is exactly the same test as test_streams_updateitem_keys_only except
# we don't verify the type of event we find (MODIFY or INSERT). It allows us
# to have at least one good GetRecords test until solving issue #6918.
# When we do solve that issue, this test should be removed.
def test_streams_updateitem_keys_only_2(test_table_ss_keys_only, dynamodbstreams):
    def do_updates(table, p, c):
        table.update_item(Key={'p': p, 'c': c},
            UpdateExpression='SET x = :val1', ExpressionAttributeValues={':val1': 2})
        # '*' = accept any event type, sidestepping issue #6918.
        return [['*', {'p': p, 'c': c}, None, {'p': p, 'c': c, 'x': 2}]]
    do_test(test_table_ss_keys_only, dynamodbstreams, do_updates, 'KEYS_ONLY')

# Test OLD_IMAGE using UpdateItem.
# Verify that the OLD_IMAGE indeed includes,
# as needed, the entire old item and not just the modified columns.
# Reproduces issue #6935
def test_streams_updateitem_old_image(test_table_ss_old_image, dynamodbstreams):
    def do_updates(table, p, c):
        events = []
        table.update_item(Key={'p': p, 'c': c},
            UpdateExpression='SET x = :val1', ExpressionAttributeValues={':val1': 2})
        # We use here '*' instead of the really expected 'INSERT' to avoid
        # checking again the same Alternator bug already checked by
        # test_streams_updateitem_keys_only (issue #6918).
        # Note: The "None" as OldImage here tests that the OldImage should be
        # missing because the item didn't exist. This reproduces issue #6933.
        events.append(['*', {'p': p, 'c': c}, None, {'p': p, 'c': c, 'x': 2}])
        table.update_item(Key={'p': p, 'c': c},
            UpdateExpression='SET y = :val1', ExpressionAttributeValues={':val1': 3})
        events.append(['MODIFY', {'p': p, 'c': c}, {'p': p, 'c': c, 'x': 2}, {'p': p, 'c': c, 'x': 2, 'y': 3}])
        return events
    do_test(test_table_ss_old_image, dynamodbstreams, do_updates, 'OLD_IMAGE')

# Above we verified that if an item did not previously exist, the OLD_IMAGE
# would be missing, but if the item did previously exist, OLD_IMAGE should
# be present and must include the key. Here we confirm the special case of
# the latter case - the case of a pre-existing *empty* item, which has just
# the key - in this case since the item did exist, OLD_IMAGE should be
# returned - and include just the key.
# This is a special case of reproducing #6935 -
# the first patch for this issue failed in this special case.
def test_streams_updateitem_old_image_empty_item(test_table_ss_old_image, dynamodbstreams):
    def do_updates(table, p, c):
        events = []
        # Create an *empty* item, with nothing except a key:
        table.update_item(Key={'p': p, 'c': c})
        events.append(['*', {'p': p, 'c': c}, None, {'p': p, 'c': c}])
        table.update_item(Key={'p': p, 'c': c},
            UpdateExpression='SET y = :val1', ExpressionAttributeValues={':val1': 3})
        # Note that OLD_IMAGE should be present and be the empty item,
        # with just a key, not entirely missing.
        events.append(['MODIFY', {'p': p, 'c': c}, {'p': p, 'c': c}, {'p': p, 'c': c, 'y': 3}])
        return events
    do_test(test_table_ss_old_image, dynamodbstreams, do_updates, 'OLD_IMAGE')

# Test that OLD_IMAGE indeed includes the entire old item and not just the
# modified attributes, in the special case of attributes which are a key of
# a secondary index.
# The unique thing about this case is that as currently implemented,
# secondary-index key attributes are real Scylla columns, contrasting with
# other attributes which are just elements of a map.
# And our CDC
# implementation treats those cases differently - when a map is modified
# the preimage includes the entire content of the map, but for regular
# columns they are only included in the preimage if they change.
# Currently fails in Alternator because the item's key is missing in
# OldImage (#6935) and the LSI key is also missing (#7030).
@pytest.fixture(scope="function")
def test_table_ss_old_image_and_lsi(dynamodb, dynamodbstreams):
    # A table with a hash+range key, one LSI on attribute 'k', and an
    # OLD_IMAGE stream. Yields (table, stream_arn); deletes the table on
    # teardown.
    table = create_test_table(dynamodb,
        KeySchema=[
            {'AttributeName': 'p', 'KeyType': 'HASH'},
            {'AttributeName': 'c', 'KeyType': 'RANGE'}],
        AttributeDefinitions=[
            { 'AttributeName': 'p', 'AttributeType': 'S' },
            { 'AttributeName': 'c', 'AttributeType': 'S' },
            { 'AttributeName': 'k', 'AttributeType': 'S' }],
        LocalSecondaryIndexes=[{
            'IndexName': 'index',
            'KeySchema': [
                {'AttributeName': 'p', 'KeyType': 'HASH'},
                {'AttributeName': 'k', 'KeyType': 'RANGE'}],
            'Projection': { 'ProjectionType': 'ALL' }
        }],
        StreamSpecification={ 'StreamEnabled': True, 'StreamViewType': 'OLD_IMAGE' })
    (arn, label) = wait_for_active_stream(dynamodbstreams, table, timeout=60)
    yield table, arn
    table.delete()

def test_streams_updateitem_old_image_lsi(test_table_ss_old_image_and_lsi, dynamodbstreams):
    def do_updates(table, p, c):
        events = []
        table.update_item(Key={'p': p, 'c': c},
            UpdateExpression='SET x = :x, k = :k',
            ExpressionAttributeValues={':x': 2, ':k': 'dog'})
        # We use here '*' instead of the really expected 'INSERT' to avoid
        # checking again the same Alternator bug already checked by
        # test_streams_updateitem_keys_only (issue #6918).
        events.append(['*', {'p': p, 'c': c}, None, {'p': p, 'c': c, 'x': 2, 'k': 'dog'}])
        table.update_item(Key={'p': p, 'c': c},
            UpdateExpression='SET y = :y', ExpressionAttributeValues={':y': 3})
        # In issue #7030, the 'k' value was missing from the OldImage.
        events.append(['MODIFY', {'p': p, 'c': c}, {'p': p, 'c': c, 'x': 2, 'k': 'dog'}, {'p': p, 'c': c, 'x': 2, 'k': 'dog', 'y': 3}])
        return events
    do_test(test_table_ss_old_image_and_lsi, dynamodbstreams, do_updates, 'OLD_IMAGE')

# Tests similar to the above tests for OLD_IMAGE, just for NEW_IMAGE mode.
# Verify that the NEW_IMAGE includes the entire old item (including the key),
# that deleting the item results in a missing NEW_IMAGE, and that setting the
# item to be empty has a different result - a NEW_IMAGE with just a key.
# Reproduces issue #7107.
def test_streams_new_image(test_table_ss_new_image, dynamodbstreams):
    def do_updates(table, p, c):
        events = []
        table.update_item(Key={'p': p, 'c': c},
            UpdateExpression='SET x = :val1', ExpressionAttributeValues={':val1': 2})
        # Confirm that when adding one attribute "x", the NewImage contains both
        # the new x, and the key columns (p and c).
        # We use here '*' instead of 'INSERT' to avoid testing issue #6918 here.
        events.append(['*', {'p': p, 'c': c}, None, {'p': p, 'c': c, 'x': 2}])
        # Confirm that when adding just attribute "y", the NewImage will contain
        # all the attributes, including the old "x":
        table.update_item(Key={'p': p, 'c': c},
            UpdateExpression='SET y = :val1', ExpressionAttributeValues={':val1': 3})
        events.append(['MODIFY', {'p': p, 'c': c}, {'p': p, 'c': c, 'x': 2}, {'p': p, 'c': c, 'x': 2, 'y': 3}])
        # Confirm that when deleting the columns x and y, the NewImage becomes
        # empty - but still exists and contains the key columns,
        table.update_item(Key={'p': p, 'c': c},
            UpdateExpression='REMOVE x, y')
        events.append(['MODIFY', {'p': p, 'c': c}, {'p': p, 'c': c, 'x': 2, 'y': 3}, {'p': p, 'c': c}])
        # Confirm that deleting the item results in a missing NewImage:
        table.delete_item(Key={'p': p, 'c': c})
        events.append(['REMOVE', {'p': p, 'c': c}, {'p': p, 'c': c}, None])
        return events
    do_test(test_table_ss_new_image, dynamodbstreams, do_updates, 'NEW_IMAGE')
# Test similar to the above test for
# NEW_IMAGE corner cases, but here for
# NEW_AND_OLD_IMAGES mode.
# Although it is likely that if both OLD_IMAGE and NEW_IMAGE work correctly then
# so will the combined NEW_AND_OLD_IMAGES mode, it is still possible that the
# implementation of the combined mode has unique bugs, so it is worth testing
# it separately.
# Reproduces issue #7107.
def test_streams_new_and_old_images(test_table_ss_new_and_old_images, dynamodbstreams):
    def do_updates(table, p, c):
        events = []
        table.update_item(Key={'p': p, 'c': c},
            UpdateExpression='SET x = :val1', ExpressionAttributeValues={':val1': 2})
        # The item doesn't yet exist, so OldImage is missing.
        # We use here '*' instead of 'INSERT' to avoid testing issue #6918 here.
        events.append(['*', {'p': p, 'c': c}, None, {'p': p, 'c': c, 'x': 2}])
        # Confirm that when adding just attribute "y", the NewImage will contain
        # all the attributes, including the old "x". Also, OldImage contains the
        # key attributes, not just "x":
        table.update_item(Key={'p': p, 'c': c},
            UpdateExpression='SET y = :val1', ExpressionAttributeValues={':val1': 3})
        events.append(['MODIFY', {'p': p, 'c': c}, {'p': p, 'c': c, 'x': 2}, {'p': p, 'c': c, 'x': 2, 'y': 3}])
        # Confirm that when deleting the attributes x and y, the NewImage becomes
        # empty - but still exists and contains the key attributes:
        table.update_item(Key={'p': p, 'c': c},
            UpdateExpression='REMOVE x, y')
        events.append(['MODIFY', {'p': p, 'c': c}, {'p': p, 'c': c, 'x': 2, 'y': 3}, {'p': p, 'c': c}])
        # Confirm that when adding an attribute z to the empty item, OldItem is
        # not missing - it just contains only the key attributes:
        table.update_item(Key={'p': p, 'c': c},
            UpdateExpression='SET z = :val1', ExpressionAttributeValues={':val1': 4})
        events.append(['MODIFY', {'p': p, 'c': c}, {'p': p, 'c': c}, {'p': p, 'c': c, 'z': 4}])
        # Confirm that deleting the item results in a missing NewImage:
        table.delete_item(Key={'p': p, 'c': c})
        events.append(['REMOVE', {'p': p, 'c': c}, {'p': p, 'c': c, 'z': 4}, None])
        return events
    do_test(test_table_ss_new_and_old_images, dynamodbstreams, do_updates, 'NEW_AND_OLD_IMAGES')

# Test that when a stream shard has no data to read, GetRecords returns an
# empty Records array - not a missing one. Reproduces issue #6926.
def test_streams_no_records(test_table_ss_keys_only, dynamodbstreams):
    table, arn = test_table_ss_keys_only
    # Get just one shard - any shard - and its LATEST iterator. Because it's
    # LATEST, there will be no data to read from this iterator.
    shard = dynamodbstreams.describe_stream(StreamArn=arn, Limit=1)['StreamDescription']['Shards'][0]
    shard_id = shard['ShardId']
    iter = dynamodbstreams.get_shard_iterator(StreamArn=arn, ShardId=shard_id, ShardIteratorType='LATEST')['ShardIterator']
    response = dynamodbstreams.get_records(ShardIterator=iter)
    assert 'NextShardIterator' in response or 'EndingSequenceNumber' in shard['SequenceNumberRange']
    assert 'Records' in response
    # We expect Records to be empty - there is no data at the LATEST iterator.
    assert response['Records'] == []

# Test that after fetching the last result from a shard, we don't get it
# yet again.
# Reproduces issue #6942.
def test_streams_last_result(test_table_ss_keys_only, dynamodbstreams):
    table, arn = test_table_ss_keys_only
    iterators = latest_iterators(dynamodbstreams, arn)
    # Do an UpdateItem operation that is expected to leave one event in the
    # stream.
    table.update_item(Key={'p': random_string(), 'c': random_string()},
        UpdateExpression='SET x = :val1', ExpressionAttributeValues={':val1': 5})
    # Eventually (we may need to retry this for a while), *one* of the
    # stream shards will return one event:
    timeout = time.time() + 15
    while time.time() < timeout:
        for iter in iterators:
            response = dynamodbstreams.get_records(ShardIterator=iter)
            if 'Records' in response and response['Records'] != []:
                # Found the shard with the data! Test that it only has
                # one event and that if we try to read again, we don't
                # get more data (this was issue #6942).
                assert len(response['Records']) == 1
                assert 'NextShardIterator' in response
                response = dynamodbstreams.get_records(ShardIterator=response['NextShardIterator'])
                assert response['Records'] == []
                return
        time.sleep(0.5)
    pytest.fail("timed out")
# In test_streams_last_result above we tested that there are no further events
# after reading the only one.
# In this test we verify that if we *do* perform
# another change on the same key, we do get another event and it happens on the
# *same* shard.
def test_streams_another_result(test_table_ss_keys_only, dynamodbstreams):
    table, arn = test_table_ss_keys_only
    iterators = latest_iterators(dynamodbstreams, arn)
    # Do an UpdateItem operation that is expected to leave one event in the
    # stream.
    p = random_string()
    c = random_string()
    table.update_item(Key={'p': p, 'c': c},
        UpdateExpression='SET x = :val1', ExpressionAttributeValues={':val1': 5})
    # Eventually, *one* of the stream shards will return one event:
    timeout = time.time() + 15
    while time.time() < timeout:
        for iter in iterators:
            response = dynamodbstreams.get_records(ShardIterator=iter)
            if 'Records' in response and response['Records'] != []:
                # Finally found the shard reporting the changes to this key
                assert len(response['Records']) == 1
                assert response['Records'][0]['dynamodb']['Keys'] == {'p': {'S': p}, 'c': {'S': c}}
                assert 'NextShardIterator' in response
                iter = response['NextShardIterator']
                # Found the shard with the data. It only has one event so if
                # we try to read again, we find nothing (this is the same as
                # what test_streams_last_result tests).
                response = dynamodbstreams.get_records(ShardIterator=iter)
                assert response['Records'] == []
                assert 'NextShardIterator' in response
                iter = response['NextShardIterator']
                # Do another UpdateItem operation to the same key, so it is
                # expected to write to the *same* shard:
                table.update_item(Key={'p': p, 'c': c},
                    UpdateExpression='SET x = :val2', ExpressionAttributeValues={':val2': 7})
                # Again we may need to wait for the event to appear on the stream:
                timeout = time.time() + 15
                while time.time() < timeout:
                    response = dynamodbstreams.get_records(ShardIterator=iter)
                    if 'Records' in response and response['Records'] != []:
                        assert len(response['Records']) == 1
                        assert response['Records'][0]['dynamodb']['Keys'] == {'p': {'S': p}, 'c': {'S': c}}
                        assert 'NextShardIterator' in response
                        # The test is done, successfully.
                        return
                    time.sleep(0.5)
                pytest.fail("timed out")
        time.sleep(0.5)
    pytest.fail("timed out")

# Test the SequenceNumber attribute returned for stream events, and the
# "AT_SEQUENCE_NUMBER" iterator that can be used to re-read from the same
# event again given its saved "sequence number".
def test_streams_at_sequence_number(test_table_ss_keys_only, dynamodbstreams):
    table, arn = test_table_ss_keys_only
    # List all the shards and their LATEST iterators:
    shards_and_iterators = []
    for shard_id in list_shards(dynamodbstreams, arn):
        shards_and_iterators.append((shard_id, dynamodbstreams.get_shard_iterator(StreamArn=arn,
            ShardId=shard_id, ShardIteratorType='LATEST')['ShardIterator']))
    # Do an UpdateItem operation that is expected to leave one event in the
    # stream.
    p = random_string()
    c = random_string()
    table.update_item(Key={'p': p, 'c': c},
        UpdateExpression='SET x = :val1', ExpressionAttributeValues={':val1': 5})
    # Eventually, *one* of the stream shards will return the one event:
    timeout = time.time() + 15
    while time.time() < timeout:
        for (shard_id, iter) in shards_and_iterators:
            response = dynamodbstreams.get_records(ShardIterator=iter)
            if 'Records' in response and response['Records'] != []:
                # Finally found the shard reporting the changes to this key:
                assert len(response['Records']) == 1
                assert response['Records'][0]['dynamodb']['Keys'] == {'p': {'S': p}, 'c': {'S': c}}
                assert 'NextShardIterator' in response
                sequence_number = response['Records'][0]['dynamodb']['SequenceNumber']
                # Found the shard with the data. It only has one event so if
                # we try to read again, we find nothing (this is the same as
                # what test_streams_last_result tests).
                iter = response['NextShardIterator']
                response = dynamodbstreams.get_records(ShardIterator=iter)
                assert response['Records'] == []
                assert 'NextShardIterator' in response
                # If we use the SequenceNumber of the first event to create an
                # AT_SEQUENCE_NUMBER iterator, we can read the same event again.
                # We don't need a loop and a timeout, because this event is already
                # available.
                iter = dynamodbstreams.get_shard_iterator(StreamArn=arn,
                    ShardId=shard_id, ShardIteratorType='AT_SEQUENCE_NUMBER',
                    SequenceNumber=sequence_number)['ShardIterator']
                response = dynamodbstreams.get_records(ShardIterator=iter)
                assert 'Records' in response
                assert len(response['Records']) == 1
                assert response['Records'][0]['dynamodb']['Keys'] == {'p': {'S': p}, 'c': {'S': c}}
                assert response['Records'][0]['dynamodb']['SequenceNumber'] == sequence_number
                return
        time.sleep(0.5)
    pytest.fail("timed out")

# Test the SequenceNumber attribute returned for stream events, and the
# "AFTER_SEQUENCE_NUMBER" iterator that can be used to re-read *after* the same
# event again given its saved "sequence number".
def test_streams_after_sequence_number(test_table_ss_keys_only, dynamodbstreams):
    table, arn = test_table_ss_keys_only
    # List all the shards and their LATEST iterators:
    shards_and_iterators = []
    for shard_id in list_shards(dynamodbstreams, arn):
        shards_and_iterators.append((shard_id, dynamodbstreams.get_shard_iterator(StreamArn=arn,
            ShardId=shard_id, ShardIteratorType='LATEST')['ShardIterator']))
    # Do two UpdateItem operations to the same key, that are expected to leave
    # two events in the stream.
    p = random_string()
    c = random_string()
    table.update_item(Key={'p': p, 'c': c},
        UpdateExpression='SET x = :val1', ExpressionAttributeValues={':val1': 3})
    table.update_item(Key={'p': p, 'c': c},
        UpdateExpression='SET x = :val1', ExpressionAttributeValues={':val1': 5})
    # Eventually, *one* of the stream shards will return the two events:
    timeout = time.time() + 15
    while time.time() < timeout:
        for (shard_id, iter) in shards_and_iterators:
            response = dynamodbstreams.get_records(ShardIterator=iter)
            if 'Records' in response and len(response['Records']) == 2:
                assert response['Records'][0]['dynamodb']['Keys'] == {'p': {'S': p}, 'c': {'S': c}}
                assert response['Records'][1]['dynamodb']['Keys'] == {'p': {'S': p}, 'c': {'S': c}}
                sequence_number_1 = response['Records'][0]['dynamodb']['SequenceNumber']
                sequence_number_2 = response['Records'][1]['dynamodb']['SequenceNumber']
                # #7424 - AWS sdk assumes sequence numbers can be compared
                # as bigints, and are monotonically growing.
                assert int(sequence_number_1) < int(sequence_number_2)
                # If we use the SequenceNumber of the first event to create an
                # AFTER_SEQUENCE_NUMBER iterator, we can read the second event
                # (only) again. We don't need a loop and a timeout, because this
                # event is already available.
                iter = dynamodbstreams.get_shard_iterator(StreamArn=arn,
                    ShardId=shard_id, ShardIteratorType='AFTER_SEQUENCE_NUMBER',
                    SequenceNumber=sequence_number_1)['ShardIterator']
                response = dynamodbstreams.get_records(ShardIterator=iter)
                assert 'Records' in response
                assert len(response['Records']) == 1
                assert response['Records'][0]['dynamodb']['Keys'] == {'p': {'S': p}, 'c': {'S': c}}
                assert response['Records'][0]['dynamodb']['SequenceNumber'] == sequence_number_2
                return
        time.sleep(0.5)
    pytest.fail("timed out")

# Test the "TRIM_HORIZON" iterator, which can be used to re-read *all* the
# previously-read events of the stream shard again.
# NOTE: This test relies on the test_table_ss_keys_only fixture giving us a
# brand new stream, with no old events saved from other tests. If we ever
# change this, we should change this test to use a different fixture.
def test_streams_trim_horizon(test_table_ss_keys_only, dynamodbstreams):
    table, arn = test_table_ss_keys_only
    # List all the shards and their LATEST iterators:
    shards_and_iterators = []
    for shard_id in list_shards(dynamodbstreams, arn):
        shards_and_iterators.append((shard_id, dynamodbstreams.get_shard_iterator(StreamArn=arn,
            ShardId=shard_id, ShardIteratorType='LATEST')['ShardIterator']))
    # Do two UpdateItem operations to the same key, that are expected to leave
    # two events in the stream.
    p = random_string()
    c = random_string()
    table.update_item(Key={'p': p, 'c': c},
        UpdateExpression='SET x = :val1', ExpressionAttributeValues={':val1': 3})
    table.update_item(Key={'p': p, 'c': c},
        UpdateExpression='SET x = :val1', ExpressionAttributeValues={':val1': 5})
    # Eventually, *one* of the stream shards will return the two events:
    timeout = time.time() + 15
    while time.time() < timeout:
        for (shard_id, iter) in shards_and_iterators:
            response = dynamodbstreams.get_records(ShardIterator=iter)
            if 'Records' in response and len(response['Records']) == 2:
                assert response['Records'][0]['dynamodb']['Keys'] == {'p': {'S': p}, 'c': {'S': c}}
                assert response['Records'][1]['dynamodb']['Keys'] == {'p': {'S': p}, 'c': {'S': c}}
                sequence_number_1 = response['Records'][0]['dynamodb']['SequenceNumber']
                sequence_number_2 = response['Records'][1]['dynamodb']['SequenceNumber']
                # If we use the TRIM_HORIZON iterator, we should receive the
                # same two events again, in the same order.
                # Note that we assume that the fixture gave us a brand new
                # stream, with no old events saved from other tests. If we
                # couldn't assume this, this test would need to become much
                # more complex, and would need to read from this shard until
                # we find the two events we are looking for.
                iter = dynamodbstreams.get_shard_iterator(StreamArn=arn,
                    ShardId=shard_id, ShardIteratorType='TRIM_HORIZON')['ShardIterator']
                response = dynamodbstreams.get_records(ShardIterator=iter)
                assert 'Records' in response
                assert len(response['Records']) == 2
                assert response['Records'][0]['dynamodb']['Keys'] == {'p': {'S': p}, 'c': {'S': c}}
                assert response['Records'][1]['dynamodb']['Keys'] == {'p': {'S': p}, 'c': {'S': c}}
                assert response['Records'][0]['dynamodb']['SequenceNumber'] == sequence_number_1
                assert response['Records'][1]['dynamodb']['SequenceNumber'] == sequence_number_2
                return
        time.sleep(0.5)
    pytest.fail("timed out")

# Above we tested some specific operations in small tests aimed to reproduce
# a specific bug, in the following tests we do all the different operations,
# PutItem, DeleteItem, BatchWriteItem and UpdateItem, and check the resulting
# stream for correctness.
# The following tests focus on multiple operations on the *same* item.
# Those
# should appear in the stream in the correct order.
def do_updates_1(table, p, c):
    events = []
    # a first put_item appears as an INSERT event. Note also empty old_image.
    table.put_item(Item={'p': p, 'c': c, 'x': 2})
    events.append(['INSERT', {'p': p, 'c': c}, None, {'p': p, 'c': c, 'x': 2}])
    # a second put_item of the *same* key and same value, doesn't appear in the log at all!
    table.put_item(Item={'p': p, 'c': c, 'x': 2})
    # a second put_item of the *same* key and different value, appears as a MODIFY event
    table.put_item(Item={'p': p, 'c': c, 'y': 3})
    events.append(['MODIFY', {'p': p, 'c': c}, {'p': p, 'c': c, 'x': 2}, {'p': p, 'c': c, 'y': 3}])
    # deleting an item appears as a REMOVE event. Note no new_image at all, but there is an old_image.
    table.delete_item(Key={'p': p, 'c': c})
    events.append(['REMOVE', {'p': p, 'c': c}, {'p': p, 'c': c, 'y': 3}, None])
    # deleting a non-existent item doesn't appear in the log at all.
    table.delete_item(Key={'p': p, 'c': c})
    # If update_item creates an item, the event is INSERT as well.
    table.update_item(Key={'p': p, 'c': c},
        UpdateExpression='SET b = :val1',
        ExpressionAttributeValues={':val1': 4})
    events.append(['INSERT', {'p': p, 'c': c}, None, {'p': p, 'c': c, 'b': 4}])
    # If update_item modifies the item, note how old and new image includes both old and new columns
    table.update_item(Key={'p': p, 'c': c},
        UpdateExpression='SET x = :val1',
        ExpressionAttributeValues={':val1': 5})
    events.append(['MODIFY', {'p': p, 'c': c}, {'p': p, 'c': c, 'b': 4}, {'p': p, 'c': c, 'b': 4, 'x': 5}])
    # TODO: incredibly, if we uncomment the "REMOVE b" update below, it will be
    # completely missing from the DynamoDB stream - the test continues to
    # pass even though we didn't add another expected event, and even though
    # the preimage in the following expected event includes this "b" we will
    # remove. I couldn't reproduce this apparent DynamoDB bug in a smaller test.
    #table.update_item(Key={'p': p, 'c': c}, UpdateExpression='REMOVE b')
    # Test BatchWriteItem as well. This modifies the item, so will be a MODIFY event.
    table.meta.client.batch_write_item(RequestItems = {table.name: [{'PutRequest': {'Item': {'p': p, 'c': c, 'x': 5}}}]})
    events.append(['MODIFY', {'p': p, 'c': c}, {'p': p, 'c': c, 'b': 4, 'x': 5}, {'p': p, 'c': c, 'x': 5}])
    return events

@pytest.mark.xfail(reason="Currently fails - because of multiple issues listed above")
def test_streams_1_keys_only(test_table_ss_keys_only, dynamodbstreams):
    do_test(test_table_ss_keys_only, dynamodbstreams, do_updates_1, 'KEYS_ONLY')

@pytest.mark.xfail(reason="Currently fails - because of multiple issues listed above")
def test_streams_1_new_image(test_table_ss_new_image, dynamodbstreams):
    do_test(test_table_ss_new_image, dynamodbstreams, do_updates_1, 'NEW_IMAGE')

@pytest.mark.xfail(reason="Currently fails - because of multiple issues listed above")
def test_streams_1_old_image(test_table_ss_old_image, dynamodbstreams):
    do_test(test_table_ss_old_image, dynamodbstreams, do_updates_1, 'OLD_IMAGE')

@pytest.mark.xfail(reason="Currently fails - because of multiple issues listed above")
def test_streams_1_new_and_old_images(test_table_ss_new_and_old_images, dynamodbstreams):
    do_test(test_table_ss_new_and_old_images, dynamodbstreams, do_updates_1, 'NEW_AND_OLD_IMAGES')

# A fixture which creates a test table with a stream enabled, and returns a
# bunch of interesting information collected from the CreateTable response.
# This fixture is session-scoped - it can be shared by multiple tests below,
# because we are not going to actually use or change this stream, we will
# just do multiple tests on its setup.
@pytest.fixture(scope="session")
def test_table_stream_with_result(dynamodb, dynamodbstreams):
    tablename = test_table_name()
    result = dynamodb.meta.client.create_table(TableName=tablename,
        BillingMode='PAY_PER_REQUEST',
        StreamSpecification={'StreamEnabled': True, 'StreamViewType': 'KEYS_ONLY'},
        KeySchema=[ { 'AttributeName': 'p', 'KeyType': 'HASH' },
            { 'AttributeName': 'c', 'KeyType': 'RANGE' }
        ],
        AttributeDefinitions=[
            { 'AttributeName': 'p', 'AttributeType': 'S' },
            { 'AttributeName': 'c', 'AttributeType': 'S' },
        ])
    waiter = dynamodb.meta.client.get_waiter('table_exists')
    waiter.config.delay = 1
    waiter.config.max_attempts = 200
    waiter.wait(TableName=tablename)
    table = dynamodb.Table(tablename)
    yield result, table
    while True:
        try:
            table.delete()
            return
        except ClientError as ce:
            # if the table has a stream currently being created we cannot
            # delete the table immediately. Again, only with real dynamo
            if ce.response['Error']['Code'] == 'ResourceInUseException':
                print('Could not delete table yet. Sleeping 5s.')
                time.sleep(5)
                continue
            raise

# Test that in a table with Streams enabled, LatestStreamArn is returned
# by CreateTable, DescribeTable and UpdateTable, and is the same ARN as
# returned by ListStreams.
# Reproduces issue #7157.
def test_latest_stream_arn(test_table_stream_with_result, dynamodbstreams):
    (result, table) = test_table_stream_with_result
    assert 'LatestStreamArn' in result['TableDescription']
    arn_in_create_table = result['TableDescription']['LatestStreamArn']
    # Check that ListStreams returns the same stream ARN as returned
    # by the original CreateTable
    (arn_in_list_streams, label) = wait_for_active_stream(dynamodbstreams, table)
    assert arn_in_create_table == arn_in_list_streams
    # Check that DescribeTable also includes the same LatestStreamArn:
    desc = table.meta.client.describe_table(TableName=table.name)['Table']
    assert 'LatestStreamArn' in desc
    assert desc['LatestStreamArn'] == arn_in_create_table
    # Check that UpdateTable also includes the same LatestStreamArn.
    # The "update" changes nothing (it just sets BillingMode to what it was).
    desc = table.meta.client.update_table(TableName=table.name,
        BillingMode='PAY_PER_REQUEST')['TableDescription']
    assert desc['LatestStreamArn'] == arn_in_create_table

# Test that in a table with Streams enabled, LatestStreamLabel is returned
# by CreateTable, DescribeTable and UpdateTable, and is the same "label" as
# returned by ListStreams.
# Reproduces issue #7162.
def test_latest_stream_label(test_table_stream_with_result, dynamodbstreams):
    (result, table) = test_table_stream_with_result
    assert 'LatestStreamLabel' in result['TableDescription']
    label_in_create_table = result['TableDescription']['LatestStreamLabel']
    # Check that ListStreams returns the same stream label as returned
    # by the original CreateTable
    (arn, label) = wait_for_active_stream(dynamodbstreams, table)
    assert label_in_create_table == label
    # Check that DescribeTable also includes the same LatestStreamLabel:
    desc = table.meta.client.describe_table(TableName=table.name)['Table']
    assert 'LatestStreamLabel' in desc
    assert desc['LatestStreamLabel'] == label_in_create_table
    # Check that UpdateTable also includes the same LatestStreamLabel.
    # The "update" changes nothing (it just sets BillingMode to what it was).
    desc = table.meta.client.update_table(TableName=table.name,
        BillingMode='PAY_PER_REQUEST')['TableDescription']
    assert desc['LatestStreamLabel'] == label_in_create_table

# Test that in a table with Streams enabled, StreamSpecification is returned
# by CreateTable, DescribeTable and UpdateTable.
# Reproduces issue #7163.
def test_stream_specification(test_table_stream_with_result, dynamodbstreams):
    # StreamSpecification as set in test_table_stream_with_result:
    stream_specification = {'StreamEnabled': True, 'StreamViewType': 'KEYS_ONLY'}
    (result, table) = test_table_stream_with_result
    assert 'StreamSpecification' in result['TableDescription']
    assert stream_specification == result['TableDescription']['StreamSpecification']
    # Check that DescribeTable also includes the same StreamSpecification:
    desc = table.meta.client.describe_table(TableName=table.name)['Table']
    assert 'StreamSpecification' in desc
    assert stream_specification == desc['StreamSpecification']
    # Check that UpdateTable also includes the same StreamSpecification.
    # The "update" changes nothing (it just sets BillingMode to what it was).
    desc = table.meta.client.update_table(TableName=table.name,
        BillingMode='PAY_PER_REQUEST')['TableDescription']
    assert stream_specification == desc['StreamSpecification']

# The following test checks the behavior of *closed* shards.
# We achieve a closed shard by disabling the stream - the DynamoDB
# documentation states that "If you disable a stream, any shards that are
# still open will be closed. The data in the stream will continue to be
# readable for 24 hours". In the test we verify that indeed, after a shard
# is closed, it is still readable with GetRecords (reproduces issue #7239).
# Moreover, upon reaching the end of data in the shard, the NextShardIterator
# attribute should say that the end was reached. The DynamoDB documentation
# says that NextShardIterator should be "set to null" in this case - but it
# is not clear what "null" means in this context: Should NextShardIterator
# be missing? Or a "null" JSON type? Or an empty string?
# This test verifies
# that the right answer is that NextShardIterator should be *missing*
# (reproduces issue #7237).
@pytest.mark.xfail(reason="disabled stream is deleted - issue #7239")
def test_streams_closed_read(test_table_ss_keys_only, dynamodbstreams):
    table, arn = test_table_ss_keys_only
    iterators = latest_iterators(dynamodbstreams, arn)
    # Do an UpdateItem operation that is expected to leave one event in the
    # stream.
    table.update_item(Key={'p': random_string(), 'c': random_string()},
        UpdateExpression='SET x = :val1', ExpressionAttributeValues={':val1': 5})
    # Disable streaming for this table. Note that the test_table_ss_keys_only
    # fixture has "function" scope so it is fine to ruin table, it will not
    # be used in other tests.
    disable_stream(dynamodbstreams, table)
    # Even after streaming is disabled for the table, we can still read
    # from the earlier stream (it is guaranteed to work for 24 hours).
    # The iterators we got earlier should still be fully usable, and
    # eventually *one* of the stream shards will return one event:
    timeout = time.time() + 15
    while time.time() < timeout:
        for iter in iterators:
            response = dynamodbstreams.get_records(ShardIterator=iter)
            if 'Records' in response and response['Records'] != []:
                # Found the shard with the data! Test that it only has
                # one event. NextShardIterator should either be missing now,
                # indicating that it is a closed shard (DynamoDB does this),
                # or, it may (and currently does in Alternator) return another
                # and reading from *that* iterator should then tell us that
                # we reached the end of the shard (i.e., zero results and
                # missing NextShardIterator).
                assert len(response['Records']) == 1
                if 'NextShardIterator' in response:
                    response = dynamodbstreams.get_records(ShardIterator=response['NextShardIterator'])
                    assert len(response['Records']) == 0
                    assert not 'NextShardIterator' in response
                return
        time.sleep(0.5)
    pytest.fail("timed out")

# TODO: tests on multiple partitions
# TODO: write a test that disabling the stream and re-enabling it works, but
#   requires the user to wait for the first stream to become DISABLED before
#   creating the new one. Then ListStreams should return the two streams,
#   one DISABLED and one ENABLED? I'm not sure we want or can do this in
#   Alternator.
# TODO: Can we test shard splitting? (shard splitting
#   requires the user to - periodically or following shards ending - to call

Full Screen

Full Screen

__init__.py

Source:__init__.py Github

copy

Full Screen

1from __future__ import unicode_literals2from .models import dynamodbstreams_backends3from ..core.models import base_decorator4dynamodbstreams_backend = dynamodbstreams_backends["us-east-1"]...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub. Right from setting up the prerequisites to running your first automation test, to following best practices and diving deeper into advanced test scenarios, LambdaTest Learning Hubs compile a list of step-by-step guides to help you become proficient with different test automation frameworks, i.e., Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful