Best Python code snippet using localstack_python
test_kinesis.py
Source:test_kinesis.py  
...31        response = kinesis_client.register_stream_consumer(32            StreamARN=stream_arn, ConsumerName=consumer_name33        )34        consumer_arn = response["Consumer"]["ConsumerARN"]35        wait_for_consumer_ready(consumer_arn=consumer_arn)36        assert consumer_name == response["Consumer"]["ConsumerName"]37        # boto3 converts the timestamp to datetime38        assert isinstance(response["Consumer"]["ConsumerCreationTimestamp"], datetime)39        consumers = assert_consumers(count=1)40        consumer_arn = consumers[0]["ConsumerARN"]41        assert consumer_name == consumers[0]["ConsumerName"]42        assert "/%s" % consumer_name in consumer_arn43        assert isinstance(consumers[0]["ConsumerCreationTimestamp"], datetime)44        # lookup stream consumer by describe calls, assert response45        consumer_description_by_arn = kinesis_client.describe_stream_consumer(46            StreamARN=stream_arn, ConsumerARN=consumer_arn47        )["ConsumerDescription"]48        assert consumer_name == consumer_description_by_arn["ConsumerName"]49        assert consumer_arn == consumer_description_by_arn["ConsumerARN"]50        assert stream_arn == consumer_description_by_arn["StreamARN"]51        assert "ACTIVE", consumer_description_by_arn["ConsumerStatus"]52        assert isinstance(consumer_description_by_arn["ConsumerCreationTimestamp"], datetime)53        consumer_description_by_name = kinesis_client.describe_stream_consumer(54            StreamARN=stream_arn, ConsumerName=consumer_name55        )["ConsumerDescription"]56        assert consumer_description_by_arn == consumer_description_by_name57        # delete existing consumer and assert 0 remaining consumers58        kinesis_client.deregister_stream_consumer(StreamARN=stream_arn, ConsumerName=consumer_name)59        retry(assert_consumers, count=0, retries=6, sleep=3.0)60    def test_subscribe_to_shard(61        self, kinesis_client, kinesis_create_stream, wait_for_stream_ready, wait_for_consumer_ready62    ):63        stream_name = "test-%s" % short_uid()64        # create stream and consumer65        kinesis_create_stream(StreamName=stream_name, ShardCount=1)66        stream_arn = kinesis_client.describe_stream(StreamName=stream_name)["StreamDescription"][67            "StreamARN"68        ]69        wait_for_stream_ready(stream_name)70        result = kinesis_client.register_stream_consumer(StreamARN=stream_arn, ConsumerName="c1")[71            "Consumer"72        ]73        consumer_arn = result["ConsumerARN"]74        wait_for_consumer_ready(consumer_arn=consumer_arn)75        # subscribe to shard76        response = kinesis_client.describe_stream(StreamName=stream_name)77        shard_id = response.get("StreamDescription").get("Shards")[0].get("ShardId")78        result = kinesis_client.subscribe_to_shard(79            ConsumerARN=result["ConsumerARN"],80            ShardId=shard_id,81            StartingPosition={"Type": "TRIM_HORIZON"},82        )83        stream = result["EventStream"]84        # put records85        num_records = 586        msg = b"Hello world"87        for i in range(num_records):88            kinesis_client.put_records(89                StreamName=stream_name, Records=[{"Data": msg, "PartitionKey": "1"}]90            )91        # assert results92        results = []93        for entry in stream:94            records = entry["SubscribeToShardEvent"]["Records"]95            continuation_sequence_number = entry["SubscribeToShardEvent"][96                "ContinuationSequenceNumber"97            ]98            # https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShardEvent.html99            assert re.fullmatch("^0|([1-9][0-9]{0,128})$", continuation_sequence_number)100            results.extend(records)101            if len(results) >= num_records:102                break103        # assert results104        assert num_records == len(results)105        for record in results:106            assert msg == record["Data"]107        # clean up108        kinesis_client.deregister_stream_consumer(StreamARN=stream_arn, ConsumerName="c1")109    def test_subscribe_to_shard_with_sequence_number_as_iterator(110        self, kinesis_client, kinesis_create_stream, wait_for_stream_ready, wait_for_consumer_ready111    ):112        stream_name = "test-%s" % short_uid()113        record_data = "Hello world"114        # create stream and consumer115        kinesis_create_stream(StreamName=stream_name, ShardCount=1)116        stream_arn = kinesis_client.describe_stream(StreamName=stream_name)["StreamDescription"][117            "StreamARN"118        ]119        wait_for_stream_ready(stream_name)120        result = kinesis_client.register_stream_consumer(StreamARN=stream_arn, ConsumerName="c1")[121            "Consumer"122        ]123        consumer_arn = result["ConsumerARN"]124        wait_for_consumer_ready(consumer_arn=consumer_arn)125        # get starting sequence number126        response = kinesis_client.describe_stream(StreamName=stream_name)127        sequence_number = (128            response.get("StreamDescription")129            .get("Shards")[0]130            .get("SequenceNumberRange")131            .get("StartingSequenceNumber")132        )133        # subscribe to shard with iterator type as AT_SEQUENCE_NUMBER134        response = kinesis_client.describe_stream(StreamName=stream_name)135        shard_id = response.get("StreamDescription").get("Shards")[0].get("ShardId")136        result = kinesis_client.subscribe_to_shard(137            ConsumerARN=result["ConsumerARN"],138            ShardId=shard_id,139            StartingPosition={140                "Type": "AT_SEQUENCE_NUMBER",141                "SequenceNumber": sequence_number,142            },143        )144        stream = result["EventStream"]145        # put records146        num_records = 5147        for i in range(num_records):148            kinesis_client.put_records(149                StreamName=stream_name,150                Records=[{"Data": record_data, "PartitionKey": "1"}],151            )152        results = []153        for entry in stream:154            records = entry["SubscribeToShardEvent"]["Records"]155            results.extend(records)156            if len(results) >= num_records:157                break158        # assert results159        assert num_records == len(results)160        for record in results:161            assert str.encode(record_data) == record["Data"]162        # clean up163        kinesis_client.deregister_stream_consumer(StreamARN=stream_arn, ConsumerName="c1")164    def test_get_records(self, kinesis_client, kinesis_create_stream, wait_for_stream_ready):165        stream_name = "test-%s" % short_uid()166        kinesis_create_stream(StreamName=stream_name, ShardCount=1)167        wait_for_stream_ready(stream_name)168        kinesis_client.put_records(169            StreamName=stream_name,170            Records=[{"Data": "SGVsbG8gd29ybGQ=", "PartitionKey": "1"}],171        )172        # get records with JSON encoding173        iterator = self._get_shard_iterator(stream_name, kinesis_client)174        response = kinesis_client.get_records(ShardIterator=iterator)175        json_records = response.get("Records")176        assert 1 == len(json_records)177        assert "Data" in json_records[0]178        # get records with CBOR encoding179        iterator = self._get_shard_iterator(stream_name, kinesis_client)180        url = config.get_edge_url()181        headers = aws_stack.mock_aws_request_headers("kinesis")182        headers["Content-Type"] = constants.APPLICATION_AMZ_CBOR_1_1183        headers["X-Amz-Target"] = "Kinesis_20131202.GetRecords"184        data = cbor2.dumps({"ShardIterator": iterator})185        result = requests.post(url, data, headers=headers)186        assert 200 == result.status_code187        result = cbor2.loads(result.content)188        attrs = ("Data", "EncryptionType", "PartitionKey", "SequenceNumber")189        assert select_attributes(json_records[0], attrs) == select_attributes(190            result["Records"][0], attrs191        )192    def test_record_lifecycle_data_integrity(193        self, kinesis_client, kinesis_create_stream, wait_for_stream_ready194    ):195        """196        kinesis records should contain the same data from when they are sent to when they are received197        """198        stream_name = "test-%s" % short_uid()199        records_data = {"test", "ünicödé ç»ä¸ç  ð£ð»ð¥", "a" * 1000, ""}200        kinesis_create_stream(StreamName=stream_name, ShardCount=1)201        wait_for_stream_ready(stream_name)202        iterator = self._get_shard_iterator(stream_name, kinesis_client)203        for record_data in records_data:204            kinesis_client.put_record(205                StreamName=stream_name,206                Data=record_data,207                PartitionKey="1",208            )209        response = kinesis_client.get_records(ShardIterator=iterator)210        response_records = response.get("Records")211        assert len(records_data) == len(response_records)212        for response_record in response_records:213            assert response_record.get("Data").decode("utf-8") in records_data214    def _get_shard_iterator(self, stream_name, kinesis_client):215        response = kinesis_client.describe_stream(StreamName=stream_name)216        sequence_number = (217            response.get("StreamDescription")218            .get("Shards")[0]219            .get("SequenceNumberRange")220            .get("StartingSequenceNumber")221        )222        shard_id = response.get("StreamDescription").get("Shards")[0].get("ShardId")223        response = kinesis_client.get_shard_iterator(224            StreamName=stream_name,225            ShardId=shard_id,226            ShardIteratorType="AT_SEQUENCE_NUMBER",227            StartingSequenceNumber=sequence_number,228        )229        return response.get("ShardIterator")230@pytest.fixture231def wait_for_consumer_ready(kinesis_client):232    def _wait_for_consumer_ready(consumer_arn: str):233        def is_consumer_ready():234            describe_response = kinesis_client.describe_stream_consumer(ConsumerARN=consumer_arn)235            return describe_response["ConsumerDescription"]["ConsumerStatus"] == "ACTIVE"236        poll_condition(is_consumer_ready)237    return _wait_for_consumer_ready238def test_get_records_next_shard_iterator(239    kinesis_client, kinesis_create_stream, wait_for_stream_ready240):241    stream_name = kinesis_create_stream()242    wait_for_stream_ready(stream_name)243    first_stream_shard_data = kinesis_client.describe_stream(StreamName=stream_name)[244        "StreamDescription"245    ]["Shards"][0]246    shard_id = first_stream_shard_data["ShardId"]...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
