How to use test_lambda_streams_batch_and_transactions method in localstack

Best Python code snippet using localstack_python

test_integration.py

Source: test_integration.py (GitHub)

copy

Full Screen

        time.sleep(3)
        # check records in target bucket
        all_objects = testutil.list_all_s3_objects()
        testutil.assert_objects(json.loads(to_str(test_data)), all_objects)

    # TODO fix duplication with test_lambda_streams_batch_and_transactions(..)!
    # @profiled()
    def test_kinesis_lambda_sns_ddb_sqs_streams(self):
        """End-to-end LocalStack integration test across Kinesis, Lambda, SNS,
        DynamoDB (+ DynamoDB Streams) and SQS.

        Flow exercised here:
          1. Create a Kinesis target stream and a Kinesis source stream, and a
             DynamoDB table with NEW_AND_OLD_IMAGES stream forwarding.
          2. Start a KCL consumer on the target stream that collects records
             into ``events``.
          3. Deploy three Lambdas fed by the DDB stream, the Kinesis source
             stream, and an SQS queue respectively (all forwarding into the
             target stream via the shared test Lambda code).
          4. Generate traffic: puts/updates/batch-writes on the table, Kinesis
             records (including one that makes the Lambda raise), SNS publishes
             and SQS messages.
          5. Assert (with retries) that the consumer saw the expected number of
             events and the expected INSERT/MODIFY split, then check CloudWatch
             invocation/error metrics and clean up.
        """
        ddb_lease_table_suffix = '-kclapp'
        table_name = TEST_TABLE_NAME + 'klsdss' + ddb_lease_table_suffix
        stream_name = TEST_STREAM_NAME
        dynamodb = aws_stack.connect_to_resource('dynamodb')
        dynamodb_service = aws_stack.connect_to_service('dynamodb')
        dynamodbstreams = aws_stack.connect_to_service('dynamodbstreams')
        kinesis = aws_stack.connect_to_service('kinesis')
        sns = aws_stack.connect_to_service('sns')
        sqs = aws_stack.connect_to_service('sqs')

        LOGGER.info('Creating test streams...')
        # best-effort removal of a stale KCL lease table from a previous run
        run_safe(lambda: dynamodb_service.delete_table(
            TableName=stream_name + ddb_lease_table_suffix), print_error=False)
        aws_stack.create_kinesis_stream(stream_name, delete=True)
        aws_stack.create_kinesis_stream(TEST_LAMBDA_SOURCE_STREAM_NAME)
        events = []

        # subscribe to inbound Kinesis stream; records accumulate in `events`
        # (closure shared with check_events below)
        def process_records(records, shard_id):
            events.extend(records)

        # start the KCL client process in the background
        kinesis_connector.listen_to_kinesis(stream_name, listener_func=process_records,
            wait_until_started=True, ddb_lease_table_suffix=ddb_lease_table_suffix)
        LOGGER.info('Kinesis consumer initialized.')

        # create table with stream forwarding config
        aws_stack.create_dynamodb_table(table_name, partition_key=PARTITION_KEY,
            stream_view_type='NEW_AND_OLD_IMAGES')

        # list DDB streams and make sure the table stream is there
        streams = dynamodbstreams.list_streams()
        ddb_event_source_arn = None
        for stream in streams['Streams']:
            if stream['TableName'] == table_name:
                ddb_event_source_arn = stream['StreamArn']
        self.assertTrue(ddb_event_source_arn)

        # deploy test lambda connected to DynamoDB Stream
        zip_file = testutil.create_lambda_archive(load_file(TEST_LAMBDA_PYTHON), get_content=True,
            libs=TEST_LAMBDA_LIBS, runtime=LAMBDA_RUNTIME_PYTHON27)
        testutil.create_lambda_function(func_name=TEST_LAMBDA_NAME_DDB,
            zip_file=zip_file, event_source_arn=ddb_event_source_arn, runtime=LAMBDA_RUNTIME_PYTHON27, delete=True)
        # make sure we cannot create Lambda with same name twice
        assert_raises(Exception, testutil.create_lambda_function, func_name=TEST_LAMBDA_NAME_DDB,
            zip_file=zip_file, event_source_arn=ddb_event_source_arn, runtime=LAMBDA_RUNTIME_PYTHON27)

        # deploy test lambda connected to Kinesis Stream
        kinesis_event_source_arn = kinesis.describe_stream(
            StreamName=TEST_LAMBDA_SOURCE_STREAM_NAME)['StreamDescription']['StreamARN']
        testutil.create_lambda_function(func_name=TEST_LAMBDA_NAME_STREAM,
            zip_file=zip_file, event_source_arn=kinesis_event_source_arn, runtime=LAMBDA_RUNTIME_PYTHON27)

        # deploy test lambda connected to SQS queue
        sqs_queue_info = testutil.create_sqs_queue(TEST_LAMBDA_NAME_QUEUE)
        testutil.create_lambda_function(func_name=TEST_LAMBDA_NAME_QUEUE,
            zip_file=zip_file, event_source_arn=sqs_queue_info['QueueArn'], runtime=LAMBDA_RUNTIME_PYTHON27)

        # set number of items to update/put to table
        num_events_ddb = 15
        num_put_new_items = 5
        num_put_existing_items = 2
        num_batch_items = 3
        # remainder of the DDB event budget is spent on update_item calls
        num_updates_ddb = num_events_ddb - num_put_new_items - num_put_existing_items - num_batch_items

        LOGGER.info('Putting %s items to table...' % num_events_ddb)
        table = dynamodb.Table(table_name)
        for i in range(0, num_put_new_items):
            table.put_item(Item={
                PARTITION_KEY: 'testId%s' % i,
                'data': 'foobar123'
            })
        # Put items with an already existing ID (fix https://github.com/localstack/localstack/issues/522)
        for i in range(0, num_put_existing_items):
            table.put_item(Item={
                PARTITION_KEY: 'testId%s' % i,
                'data': 'foobar123_put_existing'
            })
        # batch write some items containing non-ASCII characters
        dynamodb.batch_write_item(RequestItems={table_name: [
            {'PutRequest': {'Item': {PARTITION_KEY: short_uid(), 'data': 'foobar123 ✓'}}},
            {'PutRequest': {'Item': {PARTITION_KEY: short_uid(), 'data': 'foobar123 £'}}},
            {'PutRequest': {'Item': {PARTITION_KEY: short_uid(), 'data': 'foobar123 ¢'}}}
        ]})
        # update some items, which also triggers notification events
        for i in range(0, num_updates_ddb):
            dynamodb_service.update_item(TableName=table_name,
                Key={PARTITION_KEY: {'S': 'testId%s' % i}},
                AttributeUpdates={'data': {
                    'Action': 'PUT',
                    'Value': {'S': 'foobar123_updated'}
                }})

        # put items to stream
        num_events_kinesis = 10
        LOGGER.info('Putting %s items to stream...' % num_events_kinesis)
        kinesis.put_records(
            Records=[
                {
                    'Data': '{}',
                    'PartitionKey': 'testId%s' % i
                } for i in range(0, num_events_kinesis)
            ], StreamName=TEST_LAMBDA_SOURCE_STREAM_NAME
        )
        # put 1 item to stream that will trigger an error in the Lambda
        kinesis.put_record(Data='{"%s": 1}' % lambda_integration.MSG_BODY_RAISE_ERROR_FLAG,
            PartitionKey='testIderror', StreamName=TEST_LAMBDA_SOURCE_STREAM_NAME)

        # create SNS topic, connect it to the Lambda, publish test messages
        num_events_sns = 3
        response = sns.create_topic(Name=TEST_TOPIC_NAME)
        sns.subscribe(TopicArn=response['TopicArn'], Protocol='lambda',
            Endpoint=aws_stack.lambda_function_arn(TEST_LAMBDA_NAME_STREAM))
        for i in range(0, num_events_sns):
            sns.publish(TopicArn=response['TopicArn'], Message='test message %s' % i)

        # get latest records
        latest = aws_stack.kinesis_get_latest_records(TEST_LAMBDA_SOURCE_STREAM_NAME,
            shard_id='shardId-000000000000', count=10)
        self.assertEqual(len(latest), 10)

        # send messages to SQS queue
        num_events_sqs = 4
        for i in range(num_events_sqs):
            sqs.send_message(QueueUrl=sqs_queue_info['QueueUrl'], MessageBody=str(i))

        LOGGER.info('Waiting some time before finishing test.')
        time.sleep(2)

        # every DDB/SNS/SQS event is forwarded once to the target stream;
        # the raw Kinesis records also end up there, hence the sum below
        num_events_lambda = num_events_ddb + num_events_sns + num_events_sqs
        num_events = num_events_lambda + num_events_kinesis

        def check_events():
            # verify the KCL consumer collected exactly the expected events
            if len(events) != num_events:
                LOGGER.warning(('DynamoDB and Kinesis updates retrieved (actual/expected): %s/%s') %
                    (len(events), num_events))
            self.assertEqual(len(events), num_events)
            event_items = [json.loads(base64.b64decode(e['data'])) for e in events]
            # make sure we have the right amount of INSERT/MODIFY event types
            inserts = [e for e in event_items if e.get('__action_type') == 'INSERT']
            modifies = [e for e in event_items if e.get('__action_type') == 'MODIFY']
            self.assertEqual(len(inserts), num_put_new_items + num_batch_items)
            self.assertEqual(len(modifies), num_put_existing_items + num_updates_ddb)

        # this can take a long time in CI, make sure we give it enough time/retries
        retry(check_events, retries=9, sleep=3)

        # check cloudwatch notifications
        num_invocations = get_lambda_invocations_count(TEST_LAMBDA_NAME_STREAM)
        # TODO: It seems that CloudWatch is currently reporting an incorrect number of
        # invocations, namely the sum over *all* lambdas, not the single one we're asking for.
        # Also, we need to bear in mind that Kinesis may perform batch updates, i.e., a single
        # Lambda invocation may happen with a set of Kinesis records, hence we cannot simply
        # add num_events_ddb to num_events_lambda above!
        # self.assertEqual(num_invocations, 2 + num_events_lambda)
        self.assertGreater(num_invocations, num_events_sns + num_events_sqs)
        # exactly one record was crafted to raise in the Lambda (see put_record above)
        num_error_invocations = get_lambda_invocations_count(TEST_LAMBDA_NAME_STREAM, 'Errors')
        self.assertEqual(num_error_invocations, 1)

        # clean up
        testutil.delete_lambda_function(TEST_LAMBDA_NAME_STREAM)
        testutil.delete_lambda_function(TEST_LAMBDA_NAME_DDB)

    def test_lambda_streams_batch_and_transactions(self):
        """Exercise DynamoDB batch writes and transactions feeding a Lambda.

        NOTE(review): this method is truncated in the excerpt shown here —
        only the setup portion is visible; the remainder must be taken from
        the full upstream test_integration.py.
        """
        ddb_lease_table_suffix = '-kclapp2'
        table_name = TEST_TABLE_NAME + 'lsbat' + ddb_lease_table_suffix
        stream_name = TEST_STREAM_NAME
        dynamodb = aws_stack.connect_to_service('dynamodb', client=True)
        dynamodb_service = aws_stack.connect_to_service('dynamodb')
        dynamodbstreams = aws_stack.connect_to_service('dynamodbstreams')

        LOGGER.info('Creating test streams...')
        # best-effort removal of a stale KCL lease table from a previous run
        run_safe(lambda: dynamodb_service.delete_table(
            TableName=stream_name + ddb_lease_table_suffix), print_error=False)
        aws_stack.create_kinesis_stream(stream_name, delete=True)
        events = []

        # subscribe to inbound Kinesis stream
        def process_records(records, shard_id):
            events.extend(records)

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub — right from setting up the prerequisites to run your first automation test, through following best practices, to diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles a list of step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful