Best Python code snippet using localstack_python
test_integration.py
Source: test_integration.py
...45"""46class IntegrationTest(unittest.TestCase):47    @classmethod48    def setUpClass(cls):49        # Note: create scheduled Lambda here - assertions will be run in test_scheduled_lambda() below..50        # create test Lambda51        cls.scheduled_lambda_name = "scheduled-%s" % short_uid()52        handler_file = new_tmp_file()53        save_file(handler_file, TEST_HANDLER)54        resp = testutil.create_lambda_function(55            handler_file=handler_file, func_name=cls.scheduled_lambda_name56        )57        func_arn = resp["CreateFunctionResponse"]["FunctionArn"]58        # create scheduled Lambda function59        rule_name = "rule-%s" % short_uid()60        events = aws_stack.create_external_boto_client("events")61        events.put_rule(Name=rule_name, ScheduleExpression="rate(1 minutes)")62        events.put_targets(63            Rule=rule_name, Targets=[{"Id": "target-%s" % short_uid(), "Arn": func_arn}]64        )65    @classmethod66    def tearDownClass(cls):67        testutil.delete_lambda_function(cls.scheduled_lambda_name)68    def test_firehose_s3(self):69        s3_resource = aws_stack.connect_to_resource("s3")70        firehose = aws_stack.create_external_boto_client("firehose")71        s3_prefix = "/testdata"72        test_data = '{"test": "firehose_data_%s"}' % short_uid()73        # create Firehose stream74        stream = firehose.create_delivery_stream(75            DeliveryStreamName=TEST_FIREHOSE_NAME,76            S3DestinationConfiguration={77                "RoleARN": aws_stack.iam_resource_arn("firehose"),78                "BucketARN": aws_stack.s3_bucket_arn(TEST_BUCKET_NAME),79                "Prefix": s3_prefix,80            },81            Tags=TEST_TAGS,82        )83        self.assertTrue(stream)84        self.assertIn(TEST_FIREHOSE_NAME, firehose.list_delivery_streams()["DeliveryStreamNames"])85        tags = firehose.list_tags_for_delivery_stream(DeliveryStreamName=TEST_FIREHOSE_NAME)86        
self.assertEqual(TEST_TAGS, tags["Tags"])87        # create target S3 bucket88        s3_resource.create_bucket(Bucket=TEST_BUCKET_NAME)89        # put records90        firehose.put_record(91            DeliveryStreamName=TEST_FIREHOSE_NAME, Record={"Data": to_bytes(test_data)}92        )93        # check records in target bucket94        all_objects = testutil.list_all_s3_objects()95        testutil.assert_objects(json.loads(to_str(test_data)), all_objects)96        # check file layout in target bucket97        all_objects = testutil.map_all_s3_objects(buckets=[TEST_BUCKET_NAME])98        for key in all_objects.keys():99            self.assertRegex(key, r".*/\d{4}/\d{2}/\d{2}/\d{2}/.*\-\d{4}\-\d{2}\-\d{2}\-\d{2}.*")100    def test_firehose_extended_s3(self):101        s3_resource = aws_stack.connect_to_resource("s3")102        firehose = aws_stack.create_external_boto_client("firehose")103        s3_prefix = "/testdata2"104        test_data = '{"test": "firehose_data_%s"}' % short_uid()105        # create Firehose stream106        stream = firehose.create_delivery_stream(107            DeliveryStreamName=TEST_FIREHOSE_NAME,108            ExtendedS3DestinationConfiguration={109                "RoleARN": aws_stack.iam_resource_arn("firehose"),110                "BucketARN": aws_stack.s3_bucket_arn(TEST_BUCKET_NAME),111                "Prefix": s3_prefix,112            },113            Tags=TEST_TAGS,114        )115        self.assertTrue(stream)116        self.assertIn(TEST_FIREHOSE_NAME, firehose.list_delivery_streams()["DeliveryStreamNames"])117        tags = firehose.list_tags_for_delivery_stream(DeliveryStreamName=TEST_FIREHOSE_NAME)118        self.assertEqual(TEST_TAGS, tags["Tags"])119        s3_resource.create_bucket(Bucket=TEST_BUCKET_NAME)120        # put records121        firehose.put_record(122            DeliveryStreamName=TEST_FIREHOSE_NAME, Record={"Data": to_bytes(test_data)}123        )124        # check records in target bucket125        all_objects 
= testutil.list_all_s3_objects()126        testutil.assert_objects(json.loads(to_str(test_data)), all_objects)127        # check file layout in target bucket128        all_objects = testutil.map_all_s3_objects(buckets=[TEST_BUCKET_NAME])129        for key in all_objects.keys():130            self.assertRegex(key, r".*/\d{4}/\d{2}/\d{2}/\d{2}/.*\-\d{4}\-\d{2}\-\d{2}\-\d{2}.*")131    def test_firehose_kinesis_to_s3(self):132        kinesis = aws_stack.create_external_boto_client("kinesis")133        s3_resource = aws_stack.connect_to_resource("s3")134        firehose = aws_stack.create_external_boto_client("firehose")135        aws_stack.create_kinesis_stream(TEST_STREAM_NAME, delete=True)136        s3_prefix = "/testdata"137        test_data = '{"test": "firehose_data_%s"}' % short_uid()138        # create Firehose stream139        stream = firehose.create_delivery_stream(140            DeliveryStreamType="KinesisStreamAsSource",141            KinesisStreamSourceConfiguration={142                "RoleARN": aws_stack.iam_resource_arn("firehose"),143                "KinesisStreamARN": aws_stack.kinesis_stream_arn(TEST_STREAM_NAME),144            },145            DeliveryStreamName=TEST_FIREHOSE_NAME,146            S3DestinationConfiguration={147                "RoleARN": aws_stack.iam_resource_arn("firehose"),148                "BucketARN": aws_stack.s3_bucket_arn(TEST_BUCKET_NAME),149                "Prefix": s3_prefix,150            },151        )152        self.assertTrue(stream)153        self.assertIn(TEST_FIREHOSE_NAME, firehose.list_delivery_streams()["DeliveryStreamNames"])154        # create target S3 bucket155        s3_resource.create_bucket(Bucket=TEST_BUCKET_NAME)156        # put records157        kinesis.put_record(158            Data=to_bytes(test_data), PartitionKey="testId", StreamName=TEST_STREAM_NAME159        )160        time.sleep(3)161        # check records in target bucket162        all_objects = testutil.list_all_s3_objects()163        
testutil.assert_objects(json.loads(to_str(test_data)), all_objects)164    def test_lambda_streams_batch_and_transactions(self):165        ddb_lease_table_suffix = "-kclapp2"166        table_name = TEST_TABLE_NAME + "lsbat" + ddb_lease_table_suffix167        stream_name = TEST_STREAM_NAME168        lambda_ddb_name = "lambda-ddb-%s" % short_uid()169        dynamodb = aws_stack.create_external_boto_client("dynamodb", client=True)170        dynamodb_service = aws_stack.create_external_boto_client("dynamodb")171        dynamodbstreams = aws_stack.create_external_boto_client("dynamodbstreams")172        LOGGER.info("Creating test streams...")173        run_safe(174            lambda: dynamodb_service.delete_table(TableName=stream_name + ddb_lease_table_suffix),175            print_error=False,176        )177        aws_stack.create_kinesis_stream(stream_name, delete=True)178        events = []179        # subscribe to inbound Kinesis stream180        def process_records(records, shard_id):181            events.extend(records)182        # start the KCL client process in the background183        kinesis_connector.listen_to_kinesis(184            stream_name,185            listener_func=process_records,186            wait_until_started=True,187            ddb_lease_table_suffix=ddb_lease_table_suffix,188        )189        LOGGER.info("Kinesis consumer initialized.")190        # create table with stream forwarding config191        aws_stack.create_dynamodb_table(192            table_name,193            partition_key=PARTITION_KEY,194            stream_view_type="NEW_AND_OLD_IMAGES",195        )196        # list DDB streams and make sure the table stream is there197        streams = dynamodbstreams.list_streams()198        ddb_event_source_arn = None199        for stream in streams["Streams"]:200            if stream["TableName"] == table_name:201                ddb_event_source_arn = stream["StreamArn"]202        self.assertTrue(ddb_event_source_arn)203        # deploy test 
lambda connected to DynamoDB Stream204        testutil.create_lambda_function(205            handler_file=TEST_LAMBDA_PYTHON,206            libs=TEST_LAMBDA_LIBS,207            func_name=lambda_ddb_name,208            event_source_arn=ddb_event_source_arn,209            starting_position="TRIM_HORIZON",210            delete=True,211        )212        # submit a batch with writes213        dynamodb.batch_write_item(214            RequestItems={215                table_name: [216                    {217                        "PutRequest": {218                            "Item": {219                                PARTITION_KEY: {"S": "testId0"},220                                "data": {"S": "foobar123"},221                            }222                        }223                    },224                    {225                        "PutRequest": {226                            "Item": {227                                PARTITION_KEY: {"S": "testId1"},228                                "data": {"S": "foobar123"},229                            }230                        }231                    },232                    {233                        "PutRequest": {234                            "Item": {235                                PARTITION_KEY: {"S": "testId2"},236                                "data": {"S": "foobar123"},237                            }238                        }239                    },240                ]241            }242        )243        # submit a batch with writes and deletes244        dynamodb.batch_write_item(245            RequestItems={246                table_name: [247                    {248                        "PutRequest": {249                            "Item": {250                                PARTITION_KEY: {"S": "testId3"},251                                "data": {"S": "foobar123"},252                            }253                        }254                    },255                    {256                
        "PutRequest": {257                            "Item": {258                                PARTITION_KEY: {"S": "testId4"},259                                "data": {"S": "foobar123"},260                            }261                        }262                    },263                    {264                        "PutRequest": {265                            "Item": {266                                PARTITION_KEY: {"S": "testId5"},267                                "data": {"S": "foobar123"},268                            }269                        }270                    },271                    {"DeleteRequest": {"Key": {PARTITION_KEY: {"S": "testId0"}}}},272                    {"DeleteRequest": {"Key": {PARTITION_KEY: {"S": "testId1"}}}},273                    {"DeleteRequest": {"Key": {PARTITION_KEY: {"S": "testId2"}}}},274                ]275            }276        )277        # submit a transaction with writes and delete278        dynamodb.transact_write_items(279            TransactItems=[280                {281                    "Put": {282                        "TableName": table_name,283                        "Item": {284                            PARTITION_KEY: {"S": "testId6"},285                            "data": {"S": "foobar123"},286                        },287                    }288                },289                {290                    "Put": {291                        "TableName": table_name,292                        "Item": {293                            PARTITION_KEY: {"S": "testId7"},294                            "data": {"S": "foobar123"},295                        },296                    }297                },298                {299                    "Put": {300                        "TableName": table_name,301                        "Item": {302                            PARTITION_KEY: {"S": "testId8"},303                            "data": {"S": "foobar123"},304                        },305                
    }306                },307                {308                    "Delete": {309                        "TableName": table_name,310                        "Key": {PARTITION_KEY: {"S": "testId3"}},311                    }312                },313                {314                    "Delete": {315                        "TableName": table_name,316                        "Key": {PARTITION_KEY: {"S": "testId4"}},317                    }318                },319                {320                    "Delete": {321                        "TableName": table_name,322                        "Key": {PARTITION_KEY: {"S": "testId5"}},323                    }324                },325            ]326        )327        # submit a batch with a put over existing item328        dynamodb.transact_write_items(329            TransactItems=[330                {331                    "Put": {332                        "TableName": table_name,333                        "Item": {334                            PARTITION_KEY: {"S": "testId6"},335                            "data": {"S": "foobar123_updated1"},336                        },337                    }338                },339            ]340        )341        # submit a transaction with a put over existing item342        dynamodb.transact_write_items(343            TransactItems=[344                {345                    "Put": {346                        "TableName": table_name,347                        "Item": {348                            PARTITION_KEY: {"S": "testId7"},349                            "data": {"S": "foobar123_updated1"},350                        },351                    }352                },353            ]354        )355        # submit a transaction with updates356        dynamodb.transact_write_items(357            TransactItems=[358                {359                    "Update": {360                        "TableName": table_name,361                        "Key": {PARTITION_KEY: {"S": 
"testId6"}},362                        "UpdateExpression": "SET #0 = :0",363                        "ExpressionAttributeNames": {"#0": "data"},364                        "ExpressionAttributeValues": {":0": {"S": "foobar123_updated2"}},365                    }366                },367                {368                    "Update": {369                        "TableName": table_name,370                        "Key": {PARTITION_KEY: {"S": "testId7"}},371                        "UpdateExpression": "SET #0 = :0",372                        "ExpressionAttributeNames": {"#0": "data"},373                        "ExpressionAttributeValues": {":0": {"S": "foobar123_updated2"}},374                    }375                },376                {377                    "Update": {378                        "TableName": table_name,379                        "Key": {PARTITION_KEY: {"S": "testId8"}},380                        "UpdateExpression": "SET #0 = :0",381                        "ExpressionAttributeNames": {"#0": "data"},382                        "ExpressionAttributeValues": {":0": {"S": "foobar123_updated2"}},383                    }384                },385            ]386        )387        LOGGER.info("Waiting some time before finishing test.")388        time.sleep(2)389        num_insert = 9390        num_modify = 5391        num_delete = 6392        num_events = num_insert + num_modify + num_delete393        def check_events():394            if len(events) != num_events:395                msg = "DynamoDB updates retrieved (actual/expected): %s/%s" % (396                    len(events),397                    num_events,398                )399                LOGGER.warning(msg)400            self.assertEqual(num_events, len(events))401            event_items = [json.loads(base64.b64decode(e["data"])) for e in events]402            # make sure the we have the right amount of expected event types403            inserts = [e for e in event_items if e.get("__action_type") == 
"INSERT"]404            modifies = [e for e in event_items if e.get("__action_type") == "MODIFY"]405            removes = [e for e in event_items if e.get("__action_type") == "REMOVE"]406            self.assertEqual(num_insert, len(inserts))407            self.assertEqual(num_modify, len(modifies))408            self.assertEqual(num_delete, len(removes))409            # assert that all inserts were received410            for i, event in enumerate(inserts):411                self.assertNotIn("old_image", event)412                item_id = "testId%d" % i413                matching = [i for i in inserts if i["new_image"]["id"] == item_id][0]414                self.assertEqual({"id": item_id, "data": "foobar123"}, matching["new_image"])415            # assert that all updates were received416            def assert_updates(expected_updates, modifies):417                def found(update):418                    for modif in modifies:419                        if modif["old_image"]["id"] == update["id"]:420                            self.assertEqual(421                                modif["old_image"],422                                {"id": update["id"], "data": update["old"]},423                            )424                            self.assertEqual(425                                modif["new_image"],426                                {"id": update["id"], "data": update["new"]},427                            )428                            return True429                for update in expected_updates:430                    self.assertTrue(found(update))431            updates1 = [432                {"id": "testId6", "old": "foobar123", "new": "foobar123_updated1"},433                {"id": "testId7", "old": "foobar123", "new": "foobar123_updated1"},434            ]435            updates2 = [436                {437                    "id": "testId6",438                    "old": "foobar123_updated1",439                    "new": "foobar123_updated2",440            
    },441                {442                    "id": "testId7",443                    "old": "foobar123_updated1",444                    "new": "foobar123_updated2",445                },446                {"id": "testId8", "old": "foobar123", "new": "foobar123_updated2"},447            ]448            assert_updates(updates1, modifies[:2])449            assert_updates(updates2, modifies[2:])450            # assert that all removes were received451            for i, event in enumerate(removes):452                self.assertNotIn("new_image", event)453                item_id = "testId%d" % i454                matching = [i for i in removes if i["old_image"]["id"] == item_id][0]455                self.assertEqual({"id": item_id, "data": "foobar123"}, matching["old_image"])456        # this can take a long time in CI, make sure we give it enough time/retries457        retry(check_events, retries=30, sleep=4)458        # clean up459        testutil.delete_lambda_function(lambda_ddb_name)460    def test_scheduled_lambda(self):461        def check_invocation(*args):462            log_events = get_lambda_logs(self.scheduled_lambda_name)463            self.assertGreater(len(log_events), 0)464        # wait for up to 1 min for invocations to get triggered465        retry(check_invocation, retries=14, sleep=5)466@pytest.mark.skip(reason="This test is notoriously flaky in CI environments")  # FIXME467def test_sqs_batch_lambda_forward(lambda_client, sqs_client, create_lambda_function):468    lambda_name_queue_batch = "lambda_queue_batch-%s" % short_uid()469    # deploy test lambda connected to SQS queue470    sqs_queue_info = testutil.create_sqs_queue(lambda_name_queue_batch)471    queue_url = sqs_queue_info["QueueUrl"]472    resp = create_lambda_function(473        handler_file=TEST_LAMBDA_PYTHON_ECHO,474        func_name=lambda_name_queue_batch,...Learn to execute automation testing from scratch with LambdaTest Learning Hub. 
It covers everything from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
