Best Python code snippet using localstack_python
test_kinesis.py
Source:test_kinesis.py  
# ... (the preceding part of the file, including its imports, is elided in this excerpt)


def _post_get_records_cbor(shard_iterator):
    """POST a Kinesis GetRecords request with a CBOR-encoded body to the edge URL.

    :param shard_iterator: shard iterator to send as the ``ShardIterator`` field
    :return: the ``requests`` response object; callers check the status code and
        decode ``response.content`` with ``cbor2.loads``
    """
    url = config.get_edge_url()
    headers = aws_stack.mock_aws_request_headers("kinesis")
    headers["Content-Type"] = constants.APPLICATION_AMZ_CBOR_1_1
    headers["X-Amz-Target"] = "Kinesis_20131202.GetRecords"
    data = cbor2.dumps({"ShardIterator": shard_iterator})
    return requests.post(url, data, headers=headers)


# NOTE(review): the enclosing class header is not part of this excerpt; the class
# name below is assumed from the file name -- confirm against the full file.
class TestKinesis:
    @pytest.mark.aws_validated
    def test_create_stream_without_shard_count(
        self, kinesis_client, kinesis_create_stream, wait_for_stream_ready
    ):
        """Create a stream without explicit arguments and expect four shards."""
        stream_name = kinesis_create_stream()
        wait_for_stream_ready(stream_name)
        describe_stream = kinesis_client.describe_stream(StreamName=stream_name)
        assert describe_stream
        assert "StreamDescription" in describe_stream
        assert "Shards" in describe_stream["StreamDescription"]
        # By default, new streams have a shard count of 4
        assert len(describe_stream["StreamDescription"]["Shards"]) == 4

    def test_stream_consumers(
        self, kinesis_client, kinesis_create_stream, wait_for_stream_ready, wait_for_consumer_ready
    ):
        """Register, list, describe and deregister a stream consumer."""
        stream_name = "test-%s" % short_uid()

        def assert_consumers(**kwargs):
            # `stream_arn` is bound further down in the enclosing test, before the
            # first call of this helper.
            consumer_list = kinesis_client.list_stream_consumers(StreamARN=stream_arn).get(
                "Consumers"
            )
            assert kwargs["count"] == len(consumer_list)
            return consumer_list

        # create stream and assert 0 consumers
        kinesis_create_stream(StreamName=stream_name, ShardCount=1)
        stream_arn = kinesis_client.describe_stream(StreamName=stream_name)["StreamDescription"][
            "StreamARN"
        ]
        wait_for_stream_ready(stream_name)
        assert_consumers(count=0)

        # create consumer and assert 1 consumer
        consumer_name = "cons1"
        response = kinesis_client.register_stream_consumer(
            StreamARN=stream_arn, ConsumerName=consumer_name
        )
        consumer_arn = response["Consumer"]["ConsumerARN"]
        wait_for_consumer_ready(consumer_arn=consumer_arn)
        assert consumer_name == response["Consumer"]["ConsumerName"]
        # boto3 converts the timestamp to datetime
        assert isinstance(response["Consumer"]["ConsumerCreationTimestamp"], datetime)
        consumers = assert_consumers(count=1)
        consumer_arn = consumers[0]["ConsumerARN"]
        assert consumer_name == consumers[0]["ConsumerName"]
        assert "/%s" % consumer_name in consumer_arn
        assert isinstance(consumers[0]["ConsumerCreationTimestamp"], datetime)

        # lookup stream consumer by describe calls, assert response
        consumer_description_by_arn = kinesis_client.describe_stream_consumer(
            StreamARN=stream_arn, ConsumerARN=consumer_arn
        )["ConsumerDescription"]
        assert consumer_name == consumer_description_by_arn["ConsumerName"]
        assert consumer_arn == consumer_description_by_arn["ConsumerARN"]
        assert stream_arn == consumer_description_by_arn["StreamARN"]
        # Compare with `==`: the previous `assert "ACTIVE", <value>` form treated the
        # status as the assertion *message* and therefore could never fail.
        assert "ACTIVE" == consumer_description_by_arn["ConsumerStatus"]
        assert isinstance(consumer_description_by_arn["ConsumerCreationTimestamp"], datetime)
        consumer_description_by_name = kinesis_client.describe_stream_consumer(
            StreamARN=stream_arn, ConsumerName=consumer_name
        )["ConsumerDescription"]
        assert consumer_description_by_arn == consumer_description_by_name

        # delete existing consumer and assert 0 remaining consumers
        kinesis_client.deregister_stream_consumer(StreamARN=stream_arn, ConsumerName=consumer_name)
        retry(assert_consumers, count=0, retries=6, sleep=3.0)

    def test_subscribe_to_shard(
        self, kinesis_client, kinesis_create_stream, wait_for_stream_ready, wait_for_consumer_ready
    ):
        """Subscribe to a shard from TRIM_HORIZON and read back the records put afterwards."""
        stream_name = "test-%s" % short_uid()

        # create stream and consumer
        kinesis_create_stream(StreamName=stream_name, ShardCount=1)
        stream_arn = kinesis_client.describe_stream(StreamName=stream_name)["StreamDescription"][
            "StreamARN"
        ]
        wait_for_stream_ready(stream_name)
        result = kinesis_client.register_stream_consumer(StreamARN=stream_arn, ConsumerName="c1")[
            "Consumer"
        ]
        consumer_arn = result["ConsumerARN"]

        # try/finally so the consumer is deregistered even when an assertion fails
        try:
            wait_for_consumer_ready(consumer_arn=consumer_arn)

            # subscribe to shard
            response = kinesis_client.describe_stream(StreamName=stream_name)
            shard_id = response.get("StreamDescription").get("Shards")[0].get("ShardId")
            result = kinesis_client.subscribe_to_shard(
                ConsumerARN=consumer_arn,
                ShardId=shard_id,
                StartingPosition={"Type": "TRIM_HORIZON"},
            )
            stream = result["EventStream"]

            # put records
            num_records = 5
            msg = b"Hello world"
            for _ in range(num_records):
                kinesis_client.put_records(
                    StreamName=stream_name, Records=[{"Data": msg, "PartitionKey": "1"}]
                )

            # collect events until the expected number of records has arrived
            results = []
            for entry in stream:
                records = entry["SubscribeToShardEvent"]["Records"]
                continuation_sequence_number = entry["SubscribeToShardEvent"][
                    "ContinuationSequenceNumber"
                ]
                # https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShardEvent.html
                assert re.fullmatch(r"^0|([1-9][0-9]{0,128})$", continuation_sequence_number)
                results.extend(records)
                if len(results) >= num_records:
                    break

            # assert results
            assert num_records == len(results)
            for record in results:
                assert msg == record["Data"]
        finally:
            # clean up
            kinesis_client.deregister_stream_consumer(StreamARN=stream_arn, ConsumerName="c1")

    def test_subscribe_to_shard_with_sequence_number_as_iterator(
        self, kinesis_client, kinesis_create_stream, wait_for_stream_ready, wait_for_consumer_ready
    ):
        """Subscribe to a shard with AT_SEQUENCE_NUMBER, starting at the shard's first sequence number."""
        stream_name = "test-%s" % short_uid()
        record_data = "Hello world"

        # create stream and consumer
        kinesis_create_stream(StreamName=stream_name, ShardCount=1)
        stream_arn = kinesis_client.describe_stream(StreamName=stream_name)["StreamDescription"][
            "StreamARN"
        ]
        wait_for_stream_ready(stream_name)
        result = kinesis_client.register_stream_consumer(StreamARN=stream_arn, ConsumerName="c1")[
            "Consumer"
        ]
        consumer_arn = result["ConsumerARN"]

        # try/finally so the consumer is deregistered even when an assertion fails
        try:
            wait_for_consumer_ready(consumer_arn=consumer_arn)

            # one describe call provides both the starting sequence number and the shard id
            response = kinesis_client.describe_stream(StreamName=stream_name)
            first_shard = response.get("StreamDescription").get("Shards")[0]
            sequence_number = first_shard.get("SequenceNumberRange").get("StartingSequenceNumber")
            shard_id = first_shard.get("ShardId")

            # subscribe to shard with iterator type as AT_SEQUENCE_NUMBER
            result = kinesis_client.subscribe_to_shard(
                ConsumerARN=consumer_arn,
                ShardId=shard_id,
                StartingPosition={
                    "Type": "AT_SEQUENCE_NUMBER",
                    "SequenceNumber": sequence_number,
                },
            )
            stream = result["EventStream"]

            # put records
            num_records = 5
            for _ in range(num_records):
                kinesis_client.put_records(
                    StreamName=stream_name,
                    Records=[{"Data": record_data, "PartitionKey": "1"}],
                )

            results = []
            for entry in stream:
                records = entry["SubscribeToShardEvent"]["Records"]
                results.extend(records)
                if len(results) >= num_records:
                    break

            # assert results
            assert num_records == len(results)
            for record in results:
                assert str.encode(record_data) == record["Data"]
        finally:
            # clean up
            kinesis_client.deregister_stream_consumer(StreamARN=stream_arn, ConsumerName="c1")

    def test_get_records(self, kinesis_client, kinesis_create_stream, wait_for_stream_ready):
        """Fetch the same record via the boto3 client and via a raw CBOR request and compare them."""
        stream_name = "test-%s" % short_uid()
        kinesis_create_stream(StreamName=stream_name, ShardCount=1)
        wait_for_stream_ready(stream_name)
        kinesis_client.put_records(
            StreamName=stream_name,
            Records=[{"Data": "SGVsbG8gd29ybGQ=", "PartitionKey": "1"}],
        )

        # get records with JSON encoding
        iterator = get_shard_iterator(stream_name, kinesis_client)
        response = kinesis_client.get_records(ShardIterator=iterator)
        json_records = response.get("Records")
        assert 1 == len(json_records)
        assert "Data" in json_records[0]

        # get records with CBOR encoding
        iterator = get_shard_iterator(stream_name, kinesis_client)
        result = _post_get_records_cbor(iterator)
        assert 200 == result.status_code
        result = cbor2.loads(result.content)
        attrs = ("Data", "EncryptionType", "PartitionKey", "SequenceNumber")
        assert select_attributes(json_records[0], attrs) == select_attributes(
            result["Records"][0], attrs
        )

    def test_get_records_empty_stream(
        self, kinesis_client, kinesis_create_stream, wait_for_stream_ready
    ):
        """GetRecords on a stream without records returns an empty list for JSON and CBOR."""
        stream_name = "test-%s" % short_uid()
        kinesis_create_stream(StreamName=stream_name, ShardCount=1)
        wait_for_stream_ready(stream_name)

        # empty get records with JSON encoding
        iterator = get_shard_iterator(stream_name, kinesis_client)
        json_response = kinesis_client.get_records(ShardIterator=iterator)
        json_records = json_response.get("Records")
        assert 0 == len(json_records)

        # empty get records with CBOR encoding (same iterator as the JSON call above)
        cbor_response = _post_get_records_cbor(iterator)
        assert 200 == cbor_response.status_code
        cbor_records_content = cbor2.loads(cbor_response.content)
        cbor_records = cbor_records_content.get("Records")
        assert 0 == len(cbor_records)

    def test_record_lifecycle_data_integrity(
        self, kinesis_client, kinesis_create_stream, wait_for_stream_ready
    ):
        """
        Kinesis records should contain the same data from when they are sent to when they are received.
        """
        stream_name = "test-%s" % short_uid()
        # NOTE(review): the non-ASCII literal below looks mis-encoded in this copy of
        # the file -- confirm the intended text against the upstream source.
        records_data = {"test", "ünicödé ç»ä¸ç  ð£ð»ð¥", "a" * 1000, ""}
        kinesis_create_stream(StreamName=stream_name, ShardCount=1)
        wait_for_stream_ready(stream_name)

        # the iterator is obtained before the records are put
        iterator = get_shard_iterator(stream_name, kinesis_client)
        for record_data in records_data:
            kinesis_client.put_record(
                StreamName=stream_name,
                Data=record_data,
                PartitionKey="1",
            )

        response = kinesis_client.get_records(ShardIterator=iterator)
        response_records = response.get("Records")
        assert len(records_data) == len(response_records)
        for response_record in response_records:
            assert response_record.get("Data").decode("utf-8") in records_data


@pytest.fixture
def wait_for_consumer_ready(kinesis_client):
    """Provide a callable that polls until the given stream consumer reports status ACTIVE."""

    def _wait_for_consumer_ready(consumer_arn: str):
        def is_consumer_ready():
            describe_response = kinesis_client.describe_stream_consumer(ConsumerARN=consumer_arn)
            return describe_response["ConsumerDescription"]["ConsumerStatus"] == "ACTIVE"

        poll_condition(is_consumer_ready)

    return _wait_for_consumer_ready


def test_get_records_next_shard_iterator(
    kinesis_client, kinesis_create_stream, wait_for_stream_ready
):
    """Each GetRecords call must hand back a NextShardIterator that differs from earlier ones."""
    stream_name = kinesis_create_stream()
    wait_for_stream_ready(stream_name)
    first_stream_shard_data = kinesis_client.describe_stream(StreamName=stream_name)[
        "StreamDescription"
    ]["Shards"][0]
    shard_id = first_stream_shard_data["ShardId"]
    shard_iterator = kinesis_client.get_shard_iterator(
        StreamName=stream_name, ShardIteratorType="LATEST", ShardId=shard_id
    )["ShardIterator"]

    get_records_response = kinesis_client.get_records(ShardIterator=shard_iterator)
    new_shard_iterator = get_records_response["NextShardIterator"]
    assert shard_iterator != new_shard_iterator

    get_records_response = kinesis_client.get_records(ShardIterator=new_shard_iterator)
    assert shard_iterator != get_records_response["NextShardIterator"]
    assert new_shard_iterator != get_records_response["NextShardIterator"]


# ... (the remainder of the file is elided in this excerpt)
#
# Learn to execute automation testing from scratch with LambdaTest Learning Hub.
# Right from setting up the prerequisites to run your first automation test, to
# following best practices and diving deeper into advanced test scenarios.
# LambdaTest Learning Hubs compile a list of step-by-step guides to help you be
# proficient with different test automation frameworks i.e. Selenium, Cypress,
# TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
