How to use the test_scheduled_lambda method in LocalStack

Python code snippet using localstack_python

test_integration.py

Source: test_integration.py (GitHub)

...45"""46class IntegrationTest(unittest.TestCase):47 @classmethod48 def setUpClass(cls):49 # Note: create scheduled Lambda here - assertions will be run in test_scheduled_lambda() below..50 # create test Lambda51 cls.scheduled_lambda_name = "scheduled-%s" % short_uid()52 handler_file = new_tmp_file()53 save_file(handler_file, TEST_HANDLER)54 resp = testutil.create_lambda_function(55 handler_file=handler_file, func_name=cls.scheduled_lambda_name56 )57 func_arn = resp["CreateFunctionResponse"]["FunctionArn"]58 # create scheduled Lambda function59 rule_name = "rule-%s" % short_uid()60 events = aws_stack.create_external_boto_client("events")61 events.put_rule(Name=rule_name, ScheduleExpression="rate(1 minutes)")62 events.put_targets(63 Rule=rule_name, Targets=[{"Id": "target-%s" % short_uid(), "Arn": func_arn}]64 )65 @classmethod66 def tearDownClass(cls):67 testutil.delete_lambda_function(cls.scheduled_lambda_name)68 def test_firehose_s3(self):69 s3_resource = aws_stack.connect_to_resource("s3")70 firehose = aws_stack.create_external_boto_client("firehose")71 s3_prefix = "/testdata"72 test_data = '{"test": "firehose_data_%s"}' % short_uid()73 # create Firehose stream74 stream = firehose.create_delivery_stream(75 DeliveryStreamName=TEST_FIREHOSE_NAME,76 S3DestinationConfiguration={77 "RoleARN": aws_stack.iam_resource_arn("firehose"),78 "BucketARN": aws_stack.s3_bucket_arn(TEST_BUCKET_NAME),79 "Prefix": s3_prefix,80 },81 Tags=TEST_TAGS,82 )83 self.assertTrue(stream)84 self.assertIn(TEST_FIREHOSE_NAME, firehose.list_delivery_streams()["DeliveryStreamNames"])85 tags = firehose.list_tags_for_delivery_stream(DeliveryStreamName=TEST_FIREHOSE_NAME)86 self.assertEqual(TEST_TAGS, tags["Tags"])87 # create target S3 bucket88 s3_resource.create_bucket(Bucket=TEST_BUCKET_NAME)89 # put records90 firehose.put_record(91 DeliveryStreamName=TEST_FIREHOSE_NAME, Record={"Data": to_bytes(test_data)}92 )93 # check records in target bucket94 all_objects = testutil.list_all_s3_objects()95 testutil.assert_objects(json.loads(to_str(test_data)), all_objects)96 # check file layout in target bucket97 all_objects = testutil.map_all_s3_objects(buckets=[TEST_BUCKET_NAME])98 for key in all_objects.keys():99 self.assertRegex(key, r".*/\d{4}/\d{2}/\d{2}/\d{2}/.*\-\d{4}\-\d{2}\-\d{2}\-\d{2}.*")100 def test_firehose_extended_s3(self):101 s3_resource = aws_stack.connect_to_resource("s3")102 firehose = aws_stack.create_external_boto_client("firehose")103 s3_prefix = "/testdata2"104 test_data = '{"test": "firehose_data_%s"}' % short_uid()105 # create Firehose stream106 stream = firehose.create_delivery_stream(107 DeliveryStreamName=TEST_FIREHOSE_NAME,108 ExtendedS3DestinationConfiguration={109 "RoleARN": aws_stack.iam_resource_arn("firehose"),110 "BucketARN": aws_stack.s3_bucket_arn(TEST_BUCKET_NAME),111 "Prefix": s3_prefix,112 },113 Tags=TEST_TAGS,114 )115 self.assertTrue(stream)116 self.assertIn(TEST_FIREHOSE_NAME, firehose.list_delivery_streams()["DeliveryStreamNames"])117 tags = firehose.list_tags_for_delivery_stream(DeliveryStreamName=TEST_FIREHOSE_NAME)118 self.assertEqual(TEST_TAGS, tags["Tags"])119 s3_resource.create_bucket(Bucket=TEST_BUCKET_NAME)120 # put records121 firehose.put_record(122 DeliveryStreamName=TEST_FIREHOSE_NAME, Record={"Data": to_bytes(test_data)}123 )124 # check records in target bucket125 all_objects = testutil.list_all_s3_objects()126 testutil.assert_objects(json.loads(to_str(test_data)), all_objects)127 # check file layout in target bucket128 all_objects = 
testutil.map_all_s3_objects(buckets=[TEST_BUCKET_NAME])129 for key in all_objects.keys():130 self.assertRegex(key, r".*/\d{4}/\d{2}/\d{2}/\d{2}/.*\-\d{4}\-\d{2}\-\d{2}\-\d{2}.*")131 def test_firehose_kinesis_to_s3(self):132 kinesis = aws_stack.create_external_boto_client("kinesis")133 s3_resource = aws_stack.connect_to_resource("s3")134 firehose = aws_stack.create_external_boto_client("firehose")135 aws_stack.create_kinesis_stream(TEST_STREAM_NAME, delete=True)136 s3_prefix = "/testdata"137 test_data = '{"test": "firehose_data_%s"}' % short_uid()138 # create Firehose stream139 stream = firehose.create_delivery_stream(140 DeliveryStreamType="KinesisStreamAsSource",141 KinesisStreamSourceConfiguration={142 "RoleARN": aws_stack.iam_resource_arn("firehose"),143 "KinesisStreamARN": aws_stack.kinesis_stream_arn(TEST_STREAM_NAME),144 },145 DeliveryStreamName=TEST_FIREHOSE_NAME,146 S3DestinationConfiguration={147 "RoleARN": aws_stack.iam_resource_arn("firehose"),148 "BucketARN": aws_stack.s3_bucket_arn(TEST_BUCKET_NAME),149 "Prefix": s3_prefix,150 },151 )152 self.assertTrue(stream)153 self.assertIn(TEST_FIREHOSE_NAME, firehose.list_delivery_streams()["DeliveryStreamNames"])154 # create target S3 bucket155 s3_resource.create_bucket(Bucket=TEST_BUCKET_NAME)156 # put records157 kinesis.put_record(158 Data=to_bytes(test_data), PartitionKey="testId", StreamName=TEST_STREAM_NAME159 )160 time.sleep(3)161 # check records in target bucket162 all_objects = testutil.list_all_s3_objects()163 testutil.assert_objects(json.loads(to_str(test_data)), all_objects)164 def test_lambda_streams_batch_and_transactions(self):165 ddb_lease_table_suffix = "-kclapp2"166 table_name = TEST_TABLE_NAME + "lsbat" + ddb_lease_table_suffix167 stream_name = TEST_STREAM_NAME168 lambda_ddb_name = "lambda-ddb-%s" % short_uid()169 dynamodb = aws_stack.create_external_boto_client("dynamodb", client=True)170 dynamodb_service = aws_stack.create_external_boto_client("dynamodb")171 dynamodbstreams = aws_stack.create_external_boto_client("dynamodbstreams")172 LOGGER.info("Creating test streams...")173 run_safe(174 lambda: dynamodb_service.delete_table(TableName=stream_name + ddb_lease_table_suffix),175 print_error=False,176 )177 aws_stack.create_kinesis_stream(stream_name, delete=True)178 events = []179 # subscribe to inbound Kinesis stream180 def process_records(records, shard_id):181 events.extend(records)182 # start the KCL client process in the background183 kinesis_connector.listen_to_kinesis(184 stream_name,185 listener_func=process_records,186 wait_until_started=True,187 ddb_lease_table_suffix=ddb_lease_table_suffix,188 )189 LOGGER.info("Kinesis consumer initialized.")190 # create table with stream forwarding config191 aws_stack.create_dynamodb_table(192 table_name,193 partition_key=PARTITION_KEY,194 stream_view_type="NEW_AND_OLD_IMAGES",195 )196 # list DDB streams and make sure the table stream is there197 streams = dynamodbstreams.list_streams()198 ddb_event_source_arn = None199 for stream in streams["Streams"]:200 if stream["TableName"] == table_name:201 ddb_event_source_arn = stream["StreamArn"]202 self.assertTrue(ddb_event_source_arn)203 # deploy test lambda connected to DynamoDB Stream204 testutil.create_lambda_function(205 handler_file=TEST_LAMBDA_PYTHON,206 libs=TEST_LAMBDA_LIBS,207 func_name=lambda_ddb_name,208 event_source_arn=ddb_event_source_arn,209 starting_position="TRIM_HORIZON",210 delete=True,211 )212 # submit a batch with writes213 dynamodb.batch_write_item(214 RequestItems={215 table_name: [216 {217 "PutRequest": 
{218 "Item": {219 PARTITION_KEY: {"S": "testId0"},220 "data": {"S": "foobar123"},221 }222 }223 },224 {225 "PutRequest": {226 "Item": {227 PARTITION_KEY: {"S": "testId1"},228 "data": {"S": "foobar123"},229 }230 }231 },232 {233 "PutRequest": {234 "Item": {235 PARTITION_KEY: {"S": "testId2"},236 "data": {"S": "foobar123"},237 }238 }239 },240 ]241 }242 )243 # submit a batch with writes and deletes244 dynamodb.batch_write_item(245 RequestItems={246 table_name: [247 {248 "PutRequest": {249 "Item": {250 PARTITION_KEY: {"S": "testId3"},251 "data": {"S": "foobar123"},252 }253 }254 },255 {256 "PutRequest": {257 "Item": {258 PARTITION_KEY: {"S": "testId4"},259 "data": {"S": "foobar123"},260 }261 }262 },263 {264 "PutRequest": {265 "Item": {266 PARTITION_KEY: {"S": "testId5"},267 "data": {"S": "foobar123"},268 }269 }270 },271 {"DeleteRequest": {"Key": {PARTITION_KEY: {"S": "testId0"}}}},272 {"DeleteRequest": {"Key": {PARTITION_KEY: {"S": "testId1"}}}},273 {"DeleteRequest": {"Key": {PARTITION_KEY: {"S": "testId2"}}}},274 ]275 }276 )277 # submit a transaction with writes and delete278 dynamodb.transact_write_items(279 TransactItems=[280 {281 "Put": {282 "TableName": table_name,283 "Item": {284 PARTITION_KEY: {"S": "testId6"},285 "data": {"S": "foobar123"},286 },287 }288 },289 {290 "Put": {291 "TableName": table_name,292 "Item": {293 PARTITION_KEY: {"S": "testId7"},294 "data": {"S": "foobar123"},295 },296 }297 },298 {299 "Put": {300 "TableName": table_name,301 "Item": {302 PARTITION_KEY: {"S": "testId8"},303 "data": {"S": "foobar123"},304 },305 }306 },307 {308 "Delete": {309 "TableName": table_name,310 "Key": {PARTITION_KEY: {"S": "testId3"}},311 }312 },313 {314 "Delete": {315 "TableName": table_name,316 "Key": {PARTITION_KEY: {"S": "testId4"}},317 }318 },319 {320 "Delete": {321 "TableName": table_name,322 "Key": {PARTITION_KEY: {"S": "testId5"}},323 }324 },325 ]326 )327 # submit a batch with a put over existing item328 dynamodb.transact_write_items(329 TransactItems=[330 {331 "Put": {332 "TableName": table_name,333 "Item": {334 PARTITION_KEY: {"S": "testId6"},335 "data": {"S": "foobar123_updated1"},336 },337 }338 },339 ]340 )341 # submit a transaction with a put over existing item342 dynamodb.transact_write_items(343 TransactItems=[344 {345 "Put": {346 "TableName": table_name,347 "Item": {348 PARTITION_KEY: {"S": "testId7"},349 "data": {"S": "foobar123_updated1"},350 },351 }352 },353 ]354 )355 # submit a transaction with updates356 dynamodb.transact_write_items(357 TransactItems=[358 {359 "Update": {360 "TableName": table_name,361 "Key": {PARTITION_KEY: {"S": "testId6"}},362 "UpdateExpression": "SET #0 = :0",363 "ExpressionAttributeNames": {"#0": "data"},364 "ExpressionAttributeValues": {":0": {"S": "foobar123_updated2"}},365 }366 },367 {368 "Update": {369 "TableName": table_name,370 "Key": {PARTITION_KEY: {"S": "testId7"}},371 "UpdateExpression": "SET #0 = :0",372 "ExpressionAttributeNames": {"#0": "data"},373 "ExpressionAttributeValues": {":0": {"S": "foobar123_updated2"}},374 }375 },376 {377 "Update": {378 "TableName": table_name,379 "Key": {PARTITION_KEY: {"S": "testId8"}},380 "UpdateExpression": "SET #0 = :0",381 "ExpressionAttributeNames": {"#0": "data"},382 "ExpressionAttributeValues": {":0": {"S": "foobar123_updated2"}},383 }384 },385 ]386 )387 LOGGER.info("Waiting some time before finishing test.")388 time.sleep(2)389 num_insert = 9390 num_modify = 5391 num_delete = 6392 num_events = num_insert + num_modify + num_delete393 def check_events():394 if len(events) != num_events:395 msg = "DynamoDB 
updates retrieved (actual/expected): %s/%s" % (396 len(events),397 num_events,398 )399 LOGGER.warning(msg)400 self.assertEqual(num_events, len(events))401 event_items = [json.loads(base64.b64decode(e["data"])) for e in events]402 # make sure the we have the right amount of expected event types403 inserts = [e for e in event_items if e.get("__action_type") == "INSERT"]404 modifies = [e for e in event_items if e.get("__action_type") == "MODIFY"]405 removes = [e for e in event_items if e.get("__action_type") == "REMOVE"]406 self.assertEqual(num_insert, len(inserts))407 self.assertEqual(num_modify, len(modifies))408 self.assertEqual(num_delete, len(removes))409 # assert that all inserts were received410 for i, event in enumerate(inserts):411 self.assertNotIn("old_image", event)412 item_id = "testId%d" % i413 matching = [i for i in inserts if i["new_image"]["id"] == item_id][0]414 self.assertEqual({"id": item_id, "data": "foobar123"}, matching["new_image"])415 # assert that all updates were received416 def assert_updates(expected_updates, modifies):417 def found(update):418 for modif in modifies:419 if modif["old_image"]["id"] == update["id"]:420 self.assertEqual(421 modif["old_image"],422 {"id": update["id"], "data": update["old"]},423 )424 self.assertEqual(425 modif["new_image"],426 {"id": update["id"], "data": update["new"]},427 )428 return True429 for update in expected_updates:430 self.assertTrue(found(update))431 updates1 = [432 {"id": "testId6", "old": "foobar123", "new": "foobar123_updated1"},433 {"id": "testId7", "old": "foobar123", "new": "foobar123_updated1"},434 ]435 updates2 = [436 {437 "id": "testId6",438 "old": "foobar123_updated1",439 "new": "foobar123_updated2",440 },441 {442 "id": "testId7",443 "old": "foobar123_updated1",444 "new": "foobar123_updated2",445 },446 {"id": "testId8", "old": "foobar123", "new": "foobar123_updated2"},447 ]448 assert_updates(updates1, modifies[:2])449 assert_updates(updates2, modifies[2:])450 # assert that all removes were received451 for i, event in enumerate(removes):452 self.assertNotIn("new_image", event)453 item_id = "testId%d" % i454 matching = [i for i in removes if i["old_image"]["id"] == item_id][0]455 self.assertEqual({"id": item_id, "data": "foobar123"}, matching["old_image"])456 # this can take a long time in CI, make sure we give it enough time/retries457 retry(check_events, retries=30, sleep=4)458 # clean up459 testutil.delete_lambda_function(lambda_ddb_name)460 def test_scheduled_lambda(self):461 def check_invocation(*args):462 log_events = get_lambda_logs(self.scheduled_lambda_name)463 self.assertGreater(len(log_events), 0)464 # wait for up to 1 min for invocations to get triggered465 retry(check_invocation, retries=14, sleep=5)466@pytest.mark.skip(reason="This test is notoriously flaky in CI environments") # FIXME467def test_sqs_batch_lambda_forward(lambda_client, sqs_client, create_lambda_function):468 lambda_name_queue_batch = "lambda_queue_batch-%s" % short_uid()469 # deploy test lambda connected to SQS queue470 sqs_queue_info = testutil.create_sqs_queue(lambda_name_queue_batch)471 queue_url = sqs_queue_info["QueueUrl"]472 resp = create_lambda_function(473 handler_file=TEST_LAMBDA_PYTHON_ECHO,474 func_name=lambda_name_queue_batch,...
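For context on what test_scheduled_lambda actually exercises: setUpClass wires an EventBridge rule with a one-minute rate schedule to a freshly created Lambda, and the test then polls the function's logs (via the internal get_lambda_logs helper) until at least one invocation is visible. The sketch below reproduces the same pattern with plain boto3 against LocalStack, without the internal aws_stack/testutil helpers. The endpoint URL, the dummy credentials, and the function name "my-func" are assumptions for illustration, not values taken from the test above.

# Minimal sketch: schedule an existing Lambda via an EventBridge rule, then poll its logs.
# Assumes LocalStack's default edge endpoint (http://localhost:4566) and a function
# named "my-func" that was created beforehand - both are assumptions, not from the test.
import time

import boto3

ENDPOINT = "http://localhost:4566"  # assumed default LocalStack edge URL
session = boto3.Session(
    aws_access_key_id="test", aws_secret_access_key="test", region_name="us-east-1"
)
events = session.client("events", endpoint_url=ENDPOINT)
awslambda = session.client("lambda", endpoint_url=ENDPOINT)
logs = session.client("logs", endpoint_url=ENDPOINT)

func_arn = awslambda.get_function(FunctionName="my-func")["Configuration"]["FunctionArn"]

# create the schedule rule and point it at the function
events.put_rule(Name="my-schedule", ScheduleExpression="rate(1 minute)")
events.put_targets(Rule="my-schedule", Targets=[{"Id": "t1", "Arn": func_arn}])

# poll the function's log group until at least one invocation shows up
for _ in range(14):
    try:
        result = logs.filter_log_events(logGroupName="/aws/lambda/my-func")
        if result["events"]:
            print("Lambda was invoked by the schedule")
            break
    except logs.exceptions.ResourceNotFoundException:
        pass  # the log group is not created until the first invocation
    time.sleep(5)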
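The test leans heavily on a retry(function, retries, sleep) helper from LocalStack's utilities to tolerate eventual consistency. Its observable contract in this file is: call the function repeatedly until it stops raising (assertion errors included), waiting sleep seconds between attempts. A minimal stand-in with those semantics might look like the following; the real implementation lives in LocalStack's internals and may differ in detail.

# Sketch of the retry(...) pattern used above: keep calling `function` until it
# returns without raising, for at most `retries` attempts, sleeping in between.
import time


def retry(function, retries=3, sleep=1.0, *args, **kwargs):
    last_error = None
    for _ in range(retries):
        try:
            return function(*args, **kwargs)
        except Exception as exc:  # unittest assertion failures are exceptions, too
            last_error = exc
            time.sleep(sleep)
    raise last_error


# usage, as in test_scheduled_lambda: poll for up to ~70 seconds
# retry(check_invocation, retries=14, sleep=5)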
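The Firehose tests verify delivery through testutil.list_all_s3_objects and testutil.assert_objects, which are LocalStack-internal helpers. To reproduce the check with plain boto3, a rough equivalent is to list the target bucket and read each object back, as sketched below. The bucket name "test-bucket" and the endpoint are placeholder assumptions.

# Sketch: verify Firehose-to-S3 delivery without the testutil helpers.
import boto3

# assumed LocalStack edge endpoint and dummy credentials (placeholders)
s3 = boto3.client(
    "s3",
    endpoint_url="http://localhost:4566",
    aws_access_key_id="test",
    aws_secret_access_key="test",
    region_name="us-east-1",
)


def read_all_objects(bucket):
    # return the decoded body of every object in the bucket
    bodies = []
    for obj in s3.list_objects_v2(Bucket=bucket).get("Contents", []):
        body = s3.get_object(Bucket=bucket, Key=obj["Key"])["Body"].read()
        bodies.append(body.decode("utf-8"))
    return bodies


# after firehose.put_record(...), the record should eventually land in the bucket;
# "test-bucket" stands in for the bucket behind the delivery stream
assert any("firehose_data_" in body for body in read_all_objects("test-bucket"))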

