Best Python code snippet using localstack_python
test_kinesis_stream.py
Source:test_kinesis_stream.py  
1import pytest2import unittest3boto3 = pytest.importorskip("boto3")4botocore = pytest.importorskip("botocore")5import ansible.modules.cloud.amazon.kinesis_stream as kinesis_stream6aws_region = 'us-west-2'7class AnsibleKinesisStreamFunctions(unittest.TestCase):8    def test_convert_to_lower(self):9        example = {10            'HasMoreShards': True,11            'RetentionPeriodHours': 24,12            'StreamName': 'test',13            'StreamARN': 'arn:aws:kinesis:east-side:123456789:stream/test',14            'StreamStatus': 'ACTIVE'15        }16        converted_example = kinesis_stream.convert_to_lower(example)17        keys = list(converted_example.keys())18        keys.sort()19        for i in range(len(keys)):20            if i == 0:21                self.assertEqual(keys[i], 'has_more_shards')22            if i == 1:23                self.assertEqual(keys[i], 'retention_period_hours')24            if i == 2:25                self.assertEqual(keys[i], 'stream_arn')26            if i == 3:27                self.assertEqual(keys[i], 'stream_name')28            if i == 4:29                self.assertEqual(keys[i], 'stream_status')30    def test_make_tags_in_aws_format(self):31        example = {32            'env': 'development'33        }34        should_return = [35            {36                'Key': 'env',37                'Value': 'development'38            }39        ]40        aws_tags = kinesis_stream.make_tags_in_aws_format(example)41        self.assertEqual(aws_tags, should_return)42    def test_make_tags_in_proper_format(self):43        example = [44            {45                'Key': 'env',46                'Value': 'development'47            },48            {49                'Key': 'service',50                'Value': 'web'51            }52        ]53        should_return = {54            'env': 'development',55            'service': 'web'56        }57        proper_tags = kinesis_stream.make_tags_in_proper_format(example)58        self.assertEqual(proper_tags, should_return)59    def test_recreate_tags_from_list(self):60        example = [('environment', 'development'), ('service', 'web')]61        should_return = [62            {63                'Key': 'environment',64                'Value': 'development'65            },66            {67                'Key': 'service',68                'Value': 'web'69            }70        ]71        aws_tags = kinesis_stream.recreate_tags_from_list(example)72        self.assertEqual(aws_tags, should_return)73    def test_get_tags(self):74        client = boto3.client('kinesis', region_name=aws_region)75        success, err_msg, tags = kinesis_stream.get_tags(client, 'test', check_mode=True)76        self.assertTrue(success)77        should_return = [78            {79                'Key': 'DryRunMode',80                'Value': 'true'81            }82        ]83        self.assertEqual(tags, should_return)84    def test_find_stream(self):85        client = boto3.client('kinesis', region_name=aws_region)86        success, err_msg, stream = (87            kinesis_stream.find_stream(client, 'test', check_mode=True)88        )89        should_return = {90            'OpenShardsCount': 5,91            'ClosedShardsCount': 0,92            'ShardsCount': 5,93            'HasMoreShards': True,94            'RetentionPeriodHours': 24,95            'StreamName': 'test',96            'StreamARN': 'arn:aws:kinesis:east-side:123456789:stream/test',97            'StreamStatus': 'ACTIVE',98            'EncryptionType': 'NONE'99        }100        self.assertTrue(success)101        self.assertEqual(stream, should_return)102    def test_wait_for_status(self):103        client = boto3.client('kinesis', region_name=aws_region)104        success, err_msg, stream = (105            kinesis_stream.wait_for_status(106                client, 'test', 'ACTIVE', check_mode=True107            )108        )109        should_return = {110            'OpenShardsCount': 5,111            'ClosedShardsCount': 0,112            'ShardsCount': 5,113            'HasMoreShards': True,114            'RetentionPeriodHours': 24,115            'StreamName': 'test',116            'StreamARN': 'arn:aws:kinesis:east-side:123456789:stream/test',117            'StreamStatus': 'ACTIVE',118            'EncryptionType': 'NONE'119        }120        self.assertTrue(success)121        self.assertEqual(stream, should_return)122    def test_tags_action_create(self):123        client = boto3.client('kinesis', region_name=aws_region)124        tags = {125            'env': 'development',126            'service': 'web'127        }128        success, err_msg = (129            kinesis_stream.tags_action(130                client, 'test', tags, 'create', check_mode=True131            )132        )133        self.assertTrue(success)134    def test_tags_action_delete(self):135        client = boto3.client('kinesis', region_name=aws_region)136        tags = {137            'env': 'development',138            'service': 'web'139        }140        success, err_msg = (141            kinesis_stream.tags_action(142                client, 'test', tags, 'delete', check_mode=True143            )144        )145        self.assertTrue(success)146    def test_tags_action_invalid(self):147        client = boto3.client('kinesis', region_name=aws_region)148        tags = {149            'env': 'development',150            'service': 'web'151        }152        success, err_msg = (153            kinesis_stream.tags_action(154                client, 'test', tags, 'append', check_mode=True155            )156        )157        self.assertFalse(success)158    def test_update_tags(self):159        client = boto3.client('kinesis', region_name=aws_region)160        tags = {161            'env': 'development',162            'service': 'web'163        }164        success, changed, err_msg = (165            kinesis_stream.update_tags(166                client, 'test', tags, check_mode=True167            )168        )169        self.assertTrue(success)170    def test_stream_action_create(self):171        client = boto3.client('kinesis', region_name=aws_region)172        success, err_msg = (173            kinesis_stream.stream_action(174                client, 'test', 10, 'create', check_mode=True175            )176        )177        self.assertTrue(success)178    def test_stream_action_delete(self):179        client = boto3.client('kinesis', region_name=aws_region)180        success, err_msg = (181            kinesis_stream.stream_action(182                client, 'test', 10, 'delete', check_mode=True183            )184        )185        self.assertTrue(success)186    def test_stream_action_invalid(self):187        client = boto3.client('kinesis', region_name=aws_region)188        success, err_msg = (189            kinesis_stream.stream_action(190                client, 'test', 10, 'append', check_mode=True191            )192        )193        self.assertFalse(success)194    def test_retention_action_increase(self):195        client = boto3.client('kinesis', region_name=aws_region)196        success, err_msg = (197            kinesis_stream.retention_action(198                client, 'test', 48, 'increase', check_mode=True199            )200        )201        self.assertTrue(success)202    def test_retention_action_decrease(self):203        client = boto3.client('kinesis', region_name=aws_region)204        success, err_msg = (205            kinesis_stream.retention_action(206                client, 'test', 24, 'decrease', check_mode=True207            )208        )209        self.assertTrue(success)210    def test_retention_action_invalid(self):211        client = boto3.client('kinesis', region_name=aws_region)212        success, err_msg = (213            kinesis_stream.retention_action(214                client, 'test', 24, 'create', check_mode=True215            )216        )217        self.assertFalse(success)218    def test_update_shard_count(self):219        client = boto3.client('kinesis', region_name=aws_region)220        success, err_msg = (221            kinesis_stream.update_shard_count(222                client, 'test', 5, check_mode=True223            )224        )225        self.assertTrue(success)226    def test_update(self):227        client = boto3.client('kinesis', region_name=aws_region)228        current_stream = {229            'OpenShardsCount': 5,230            'ClosedShardsCount': 0,231            'ShardsCount': 1,232            'HasMoreShards': True,233            'RetentionPeriodHours': 24,234            'StreamName': 'test',235            'StreamARN': 'arn:aws:kinesis:east-side:123456789:stream/test',236            'StreamStatus': 'ACTIVE',237            'EncryptionType': 'NONE'238        }239        tags = {240            'env': 'development',241            'service': 'web'242        }243        success, changed, err_msg = (244            kinesis_stream.update(245                client, current_stream, 'test', number_of_shards=2, retention_period=48,246                tags=tags, check_mode=True247            )248        )249        self.assertTrue(success)250        self.assertTrue(changed)251        self.assertEqual(err_msg, 'Kinesis Stream test updated successfully.')252    def test_create_stream(self):253        client = boto3.client('kinesis', region_name=aws_region)254        tags = {255            'env': 'development',256            'service': 'web'257        }258        success, changed, err_msg, results = (259            kinesis_stream.create_stream(260                client, 'test', number_of_shards=10, retention_period=48,261                tags=tags, check_mode=True262            )263        )264        should_return = {265            'open_shards_count': 5,266            'closed_shards_count': 0,267            'shards_count': 5,268            'has_more_shards': True,269            'retention_period_hours': 24,270            'stream_name': 'test',271            'stream_arn': 'arn:aws:kinesis:east-side:123456789:stream/test',272            'stream_status': 'ACTIVE',273            'encryption_type': 'NONE',274            'tags': tags,275        }276        self.assertTrue(success)277        self.assertTrue(changed)278        self.assertEqual(results, should_return)279        self.assertEqual(err_msg, 'Kinesis Stream test updated successfully.')280    def test_enable_stream_encription(self):281        client = boto3.client('kinesis', region_name=aws_region)282        success, changed, err_msg, results = (283            kinesis_stream.start_stream_encryption(284                client, 'test', encryption_type='KMS', key_id='', wait=True, wait_timeout=60, check_mode=True285            )286        )287        self.assertTrue(success)288        self.assertTrue(changed)289        self.assertEqual(err_msg, 'Kinesis Stream test encryption started successfully.')290    def test_dsbale_stream_encryption(self):291        client = boto3.client('kinesis', region_name=aws_region)292        success, changed, err_msg, results = (293            kinesis_stream.stop_stream_encryption(294                client, 'test', encryption_type='KMS', key_id='', wait=True, wait_timeout=60, check_mode=True295            )296        )297        self.assertTrue(success)298        self.assertTrue(changed)...test_kinesis_transport.py
Source:test_kinesis_transport.py  
1# -*- coding: utf-8 -*-2import sys3if sys.version_info < (2, 7):4    import unittest2 as unittest5else:6    import unittest7import mock8import tempfile9import logging10import beaver11from beaver.config import BeaverConfig12from beaver.transports import create_transport13from beaver.unicode_dammit import unicode_dammit14from fixtures import Fixture15from moto import mock_kinesis16import boto.kinesis17class KinesisTests(unittest.TestCase):18    @mock_kinesis19    def _create_streams(self):20        conn = boto.kinesis.connect_to_region("us-east-1")21        conn.create_stream("stream1", 1)22        conn.create_stream("stream2", 1)23    @classmethod24    def setUpClass(cls):25        cls.logger = logging.getLogger(__name__)26        empty_conf = tempfile.NamedTemporaryFile(delete=True)27        cls.beaver_config = BeaverConfig(mock.Mock(config=empty_conf.name))28        cls.beaver_config.set('transport', 'kinesis')29        cls.beaver_config.set('logstash_version', 1)30        output_file = Fixture.download_official_distribution()31        Fixture.extract_distribution(output_file)32    @mock_kinesis33    def test_kinesis_default_auth_profile(self):34        self._create_streams()35        self.beaver_config.set('kinesis_aws_profile_name', None)36        self.beaver_config.set('kinesis_aws_access_key', None)37        self.beaver_config.set('kinesis_aws_secret_key', None)38        self.beaver_config.set('kinesis_aws_stream', 'stream1')39        transport = create_transport(self.beaver_config, logger=self.logger)40        self.assertIsInstance(transport, beaver.transports.kinesis_transport.KinesisTransport)41        transport.interrupt()42    @mock_kinesis43    def test_kinesis_auth_profile(self):44        self._create_streams()45        self.beaver_config.set('kinesis_aws_profile_name', 'beaver_stream')46        self.beaver_config.set('kinesis_aws_access_key', None)47        self.beaver_config.set('kinesis_aws_secret_key', None)48        self.beaver_config.set('kinesis_aws_stream', 'stream1')49        transport = create_transport(self.beaver_config, logger=self.logger)50        self.assertIsInstance(transport, beaver.transports.kinesis_transport.KinesisTransport)51    @mock_kinesis52    def test_kinesis_auth_key(self):53        self._create_streams()54        self.beaver_config.set('kinesis_aws_profile_name', None)55        self.beaver_config.set('kinesis_aws_access_key', 'beaver_test_key')56        self.beaver_config.set('kinesis_aws_secret_key', 'beaver_test_secret')57        self.beaver_config.set('kinesis_aws_stream', 'stream1')58        transport = create_transport(self.beaver_config, logger=self.logger)59        self.assertIsInstance(transport, beaver.transports.kinesis_transport.KinesisTransport)60        transport.interrupt()61    @mock_kinesis62    def test_kinesis_auth_account_id(self):63        self._create_streams()64        self.beaver_config.set('kinesis_aws_stream_owner_acct_id', 'abc123')65        self.beaver_config.set('kinesis_aws_profile_name', None)66        self.beaver_config.set('kinesis_aws_access_key', 'beaver_test_key')67        self.beaver_config.set('kinesis_aws_secret_key', 'beaver_test_secret')68        self.beaver_config.set('kinesis_aws_stream', 'stream1')69        transport = create_transport(self.beaver_config, logger=self.logger)70        self.assertIsInstance(transport, beaver.transports.kinesis_transport.KinesisTransport)71        transport.interrupt()72    @mock_kinesis73    def test_kinesis_send_stream(self):74        self._create_streams()75        self.beaver_config.set('kinesis_aws_stream', 'stream1')76        self.beaver_config.set('kinesis_aws_profile_name', None)77        self.beaver_config.set('kinesis_aws_access_key', None)78        self.beaver_config.set('kinesis_aws_secret_key', None)79        self.beaver_config.set('kinesis_bulk_lines', False)80        transport = create_transport(self.beaver_config, logger=self.logger)81        mock_send_batch = mock.Mock()82        transport._send_message_batch = mock_send_batch83        self.assertIsInstance(transport, beaver.transports.kinesis_transport.KinesisTransport)84        data = {}85        lines = []86        n=50087        for i in range(n):88            lines.append('log' + str(i) + '\n')89        new_lines = []90        for line in lines:91            message = unicode_dammit(line)92            if len(message) == 0:93                continue94            new_lines.append(message)95        data['lines'] = new_lines96        data['fields'] = []97        self.assertTrue(transport.callback("test.log", **data))98        self.assertEqual(1, mock_send_batch.call_count)99    @mock_kinesis100    def test_kinesis_send_stream_with_record_count_cutoff(self):101        self._create_streams()102        self.beaver_config.set('kinesis_aws_stream', 'stream1')103        self.beaver_config.set('kinesis_aws_profile_name', None)104        self.beaver_config.set('kinesis_aws_access_key', None)105        self.beaver_config.set('kinesis_aws_secret_key', None)106        self.beaver_config.set('kinesis_bulk_lines', False)107        transport = create_transport(self.beaver_config, logger=self.logger)108        mock_send_batch = mock.Mock()109        transport._send_message_batch = mock_send_batch110        self.assertIsInstance(transport, beaver.transports.kinesis_transport.KinesisTransport)111        data = {}112        lines = []113        n = 501114        for i in range(n):115            lines.append('log' + str(i) + '\n')116        new_lines = []117        for line in lines:118            message = unicode_dammit(line)119            if len(message) == 0:120                continue121            new_lines.append(message)122        data['lines'] = new_lines123        data['fields'] = []124        self.assertTrue(transport.callback("test.log", **data))...test_kinesis.py
Source:test_kinesis.py  
...5from aws_lambda_powertools.utilities.typing import LambdaContext6from tests.functional.parser.schemas import MyKinesisBusiness7from tests.functional.utils import load_event8@event_parser(model=MyKinesisBusiness, envelope=envelopes.KinesisDataStreamEnvelope)9def handle_kinesis(event: List[MyKinesisBusiness], _: LambdaContext):10    assert len(event) == 111    record: KinesisDataStreamModel = event[0]12    assert record.message == "test message"13    assert record.username == "test"14@event_parser(model=KinesisDataStreamModel)15def handle_kinesis_no_envelope(event: KinesisDataStreamModel, _: LambdaContext):16    records = event.Records17    assert len(records) == 218    record: KinesisDataStreamModel = records[0]19    assert record.awsRegion == "us-east-2"20    assert record.eventID == "shardId-000000000006:49590338271490256608559692538361571095921575989136588898"21    assert record.eventName == "aws:kinesis:record"22    assert record.eventSource == "aws:kinesis"23    assert record.eventSourceARN == "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"24    assert record.eventVersion == "1.0"25    assert record.invokeIdentityArn == "arn:aws:iam::123456789012:role/lambda-role"26    kinesis: KinesisDataStreamRecordPayload = record.kinesis27    assert kinesis.approximateArrivalTimestamp == 1545084650.98728    assert kinesis.kinesisSchemaVersion == "1.0"29    assert kinesis.partitionKey == "1"30    assert kinesis.sequenceNumber == "49590338271490256608559692538361571095921575989136588898"31    assert kinesis.data == b"Hello, this is a test."32def test_kinesis_trigger_event():33    event_dict = {34        "Records": [35            {36                "kinesis": {37                    "kinesisSchemaVersion": "1.0",38                    "partitionKey": "1",39                    "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",40                    "data": "eyJtZXNzYWdlIjogInRlc3QgbWVzc2FnZSIsICJ1c2VybmFtZSI6ICJ0ZXN0In0=",41                    "approximateArrivalTimestamp": 1545084650.987,42                },43                "eventSource": "aws:kinesis",44                "eventVersion": "1.0",45                "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",46                "eventName": "aws:kinesis:record",47                "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",48                "awsRegion": "us-east-2",49                "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream",50            }51        ]52    }53    handle_kinesis(event_dict, LambdaContext())54def test_kinesis_trigger_bad_base64_event():55    event_dict = {56        "Records": [57            {58                "kinesis": {59                    "kinesisSchemaVersion": "1.0",60                    "partitionKey": "1",61                    "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",62                    "data": "bad",63                    "approximateArrivalTimestamp": 1545084650.987,64                },65                "eventSource": "aws:kinesis",66                "eventVersion": "1.0",67                "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",68                "eventName": "aws:kinesis:record",69                "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",70                "awsRegion": "us-east-2",71                "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream",72            }73        ]74    }75    with pytest.raises(ValidationError):76        handle_kinesis_no_envelope(event_dict, LambdaContext())77def test_kinesis_trigger_event_no_envelope():78    event_dict = load_event("kinesisStreamEvent.json")79    handle_kinesis_no_envelope(event_dict, LambdaContext())80def test_validate_event_does_not_conform_with_model_no_envelope():81    event_dict: Any = {"hello": "s"}82    with pytest.raises(ValidationError):83        handle_kinesis_no_envelope(event_dict, LambdaContext())84def test_validate_event_does_not_conform_with_model():85    event_dict: Any = {"hello": "s"}86    with pytest.raises(ValidationError):...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
