Best Python code snippet using localstack_python
test_kinesis.py
Source:test_kinesis.py  
...75    stream_name = "my_stream"76    conn.create_stream(stream_name, 1)77    response = conn.describe_stream(stream_name)78    shard_id = response["StreamDescription"]["Shards"][0]["ShardId"]79    response = conn.get_shard_iterator(stream_name, shard_id, "TRIM_HORIZON")80    shard_iterator = response["ShardIterator"]81    response = conn.get_records(shard_iterator)82    shard_iterator = response["NextShardIterator"]83    response["Records"].should.equal([])84    response["MillisBehindLatest"].should.equal(0)85@mock_kinesis_deprecated86def test_get_invalid_shard_iterator():87    conn = boto.kinesis.connect_to_region("us-west-2")88    stream_name = "my_stream"89    conn.create_stream(stream_name, 1)90    conn.get_shard_iterator.when.called_with(91        stream_name, "123", "TRIM_HORIZON"92    ).should.throw(ResourceNotFoundException)93@mock_kinesis_deprecated94def test_put_records():95    conn = boto.kinesis.connect_to_region("us-west-2")96    stream_name = "my_stream"97    conn.create_stream(stream_name, 1)98    data = "hello world"99    partition_key = "1234"100    conn.put_record.when.called_with(stream_name, data, 1234).should.throw(101        InvalidArgumentException102    )103    conn.put_record(stream_name, data, partition_key)104    response = conn.describe_stream(stream_name)105    shard_id = response["StreamDescription"]["Shards"][0]["ShardId"]106    response = conn.get_shard_iterator(stream_name, shard_id, "TRIM_HORIZON")107    shard_iterator = response["ShardIterator"]108    response = conn.get_records(shard_iterator)109    shard_iterator = response["NextShardIterator"]110    response["Records"].should.have.length_of(1)111    record = response["Records"][0]112    record["Data"].should.equal("hello world")113    record["PartitionKey"].should.equal("1234")114    record["SequenceNumber"].should.equal("1")115@mock_kinesis_deprecated116def test_get_records_limit():117    conn = boto.kinesis.connect_to_region("us-west-2")118    stream_name = "my_stream"119    conn.create_stream(stream_name, 1)120    # Create some data121    data = "hello world"122    for index in range(5):123        conn.put_record(stream_name, data, str(index))124    # Get a shard iterator125    response = conn.describe_stream(stream_name)126    shard_id = response["StreamDescription"]["Shards"][0]["ShardId"]127    response = conn.get_shard_iterator(stream_name, shard_id, "TRIM_HORIZON")128    shard_iterator = response["ShardIterator"]129    # Retrieve only 3 records130    response = conn.get_records(shard_iterator, limit=3)131    response["Records"].should.have.length_of(3)132    # Then get the rest of the results133    next_shard_iterator = response["NextShardIterator"]134    response = conn.get_records(next_shard_iterator)135    response["Records"].should.have.length_of(2)136@mock_kinesis_deprecated137def test_get_records_at_sequence_number():138    # AT_SEQUENCE_NUMBER - Start reading exactly from the position denoted by139    # a specific sequence number.140    conn = boto.kinesis.connect_to_region("us-west-2")141    stream_name = "my_stream"142    conn.create_stream(stream_name, 1)143    # Create some data144    for index in range(1, 5):145        conn.put_record(stream_name, str(index), str(index))146    # Get a shard iterator147    response = conn.describe_stream(stream_name)148    shard_id = response["StreamDescription"]["Shards"][0]["ShardId"]149    response = conn.get_shard_iterator(stream_name, shard_id, "TRIM_HORIZON")150    shard_iterator = response["ShardIterator"]151    # Get the second record152    response = conn.get_records(shard_iterator, limit=2)153    second_sequence_id = response["Records"][1]["SequenceNumber"]154    # Then get a new iterator starting at that id155    response = conn.get_shard_iterator(156        stream_name, shard_id, "AT_SEQUENCE_NUMBER", second_sequence_id157    )158    shard_iterator = response["ShardIterator"]159    response = conn.get_records(shard_iterator)160    # And the first result returned should be the second item161    response["Records"][0]["SequenceNumber"].should.equal(second_sequence_id)162    response["Records"][0]["Data"].should.equal("2")163@mock_kinesis_deprecated164def test_get_records_after_sequence_number():165    # AFTER_SEQUENCE_NUMBER - Start reading right after the position denoted166    # by a specific sequence number.167    conn = boto.kinesis.connect_to_region("us-west-2")168    stream_name = "my_stream"169    conn.create_stream(stream_name, 1)170    # Create some data171    for index in range(1, 5):172        conn.put_record(stream_name, str(index), str(index))173    # Get a shard iterator174    response = conn.describe_stream(stream_name)175    shard_id = response["StreamDescription"]["Shards"][0]["ShardId"]176    response = conn.get_shard_iterator(stream_name, shard_id, "TRIM_HORIZON")177    shard_iterator = response["ShardIterator"]178    # Get the second record179    response = conn.get_records(shard_iterator, limit=2)180    second_sequence_id = response["Records"][1]["SequenceNumber"]181    # Then get a new iterator starting after that id182    response = conn.get_shard_iterator(183        stream_name, shard_id, "AFTER_SEQUENCE_NUMBER", second_sequence_id184    )185    shard_iterator = response["ShardIterator"]186    response = conn.get_records(shard_iterator)187    # And the first result returned should be the third item188    response["Records"][0]["Data"].should.equal("3")189    response["MillisBehindLatest"].should.equal(0)190@mock_kinesis_deprecated191def test_get_records_latest():192    # LATEST - Start reading just after the most recent record in the shard,193    # so that you always read the most recent data in the shard.194    conn = boto.kinesis.connect_to_region("us-west-2")195    stream_name = "my_stream"196    conn.create_stream(stream_name, 1)197    # Create some data198    for index in range(1, 5):199        conn.put_record(stream_name, str(index), str(index))200    # Get a shard iterator201    response = conn.describe_stream(stream_name)202    shard_id = response["StreamDescription"]["Shards"][0]["ShardId"]203    response = conn.get_shard_iterator(stream_name, shard_id, "TRIM_HORIZON")204    shard_iterator = response["ShardIterator"]205    # Get the second record206    response = conn.get_records(shard_iterator, limit=2)207    second_sequence_id = response["Records"][1]["SequenceNumber"]208    # Then get a new iterator starting after that id209    response = conn.get_shard_iterator(210        stream_name, shard_id, "LATEST", second_sequence_id211    )212    shard_iterator = response["ShardIterator"]213    # Write some more data214    conn.put_record(stream_name, "last_record", "last_record")215    response = conn.get_records(shard_iterator)216    # And the only result returned should be the new item217    response["Records"].should.have.length_of(1)218    response["Records"][0]["PartitionKey"].should.equal("last_record")219    response["Records"][0]["Data"].should.equal("last_record")220    response["MillisBehindLatest"].should.equal(0)221@mock_kinesis222def test_get_records_at_timestamp():223    # AT_TIMESTAMP - Read the first record at or after the specified timestamp224    conn = boto3.client("kinesis", region_name="us-west-2")225    stream_name = "my_stream"226    conn.create_stream(StreamName=stream_name, ShardCount=1)227    # Create some data228    for index in range(1, 5):229        conn.put_record(230            StreamName=stream_name, Data=str(index), PartitionKey=str(index)231        )232    # When boto3 floors the timestamp that we pass to get_shard_iterator to233    # second precision even though AWS supports ms precision:234    # http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html235    # To test around this limitation we wait until we well into the next second236    # before capturing the time and storing the records we expect to retrieve.237    time.sleep(1.0)238    timestamp = datetime.datetime.utcnow()239    keys = [str(i) for i in range(5, 10)]240    for k in keys:241        conn.put_record(StreamName=stream_name, Data=k, PartitionKey=k)242    # Get a shard iterator243    response = conn.describe_stream(StreamName=stream_name)244    shard_id = response["StreamDescription"]["Shards"][0]["ShardId"]245    response = conn.get_shard_iterator(246        StreamName=stream_name,247        ShardId=shard_id,248        ShardIteratorType="AT_TIMESTAMP",249        Timestamp=timestamp,250    )251    shard_iterator = response["ShardIterator"]252    response = conn.get_records(ShardIterator=shard_iterator)253    response["Records"].should.have.length_of(len(keys))254    partition_keys = [r["PartitionKey"] for r in response["Records"]]255    partition_keys.should.equal(keys)256    response["MillisBehindLatest"].should.equal(0)257@mock_kinesis258def test_get_records_at_very_old_timestamp():259    conn = boto3.client("kinesis", region_name="us-west-2")260    stream_name = "my_stream"261    conn.create_stream(StreamName=stream_name, ShardCount=1)262    # Create some data263    keys = [str(i) for i in range(1, 5)]264    for k in keys:265        conn.put_record(StreamName=stream_name, Data=k, PartitionKey=k)266    # Get a shard iterator267    response = conn.describe_stream(StreamName=stream_name)268    shard_id = response["StreamDescription"]["Shards"][0]["ShardId"]269    response = conn.get_shard_iterator(270        StreamName=stream_name,271        ShardId=shard_id,272        ShardIteratorType="AT_TIMESTAMP",273        Timestamp=1,274    )275    shard_iterator = response["ShardIterator"]276    response = conn.get_records(ShardIterator=shard_iterator)277    response["Records"].should.have.length_of(len(keys))278    partition_keys = [r["PartitionKey"] for r in response["Records"]]279    partition_keys.should.equal(keys)280    response["MillisBehindLatest"].should.equal(0)281@mock_kinesis282def test_get_records_timestamp_filtering():283    conn = boto3.client("kinesis", region_name="us-west-2")284    stream_name = "my_stream"285    conn.create_stream(StreamName=stream_name, ShardCount=1)286    conn.put_record(StreamName=stream_name, Data="0", PartitionKey="0")287    time.sleep(1.0)288    timestamp = datetime.datetime.now(tz=tzlocal())289    conn.put_record(StreamName=stream_name, Data="1", PartitionKey="1")290    response = conn.describe_stream(StreamName=stream_name)291    shard_id = response["StreamDescription"]["Shards"][0]["ShardId"]292    response = conn.get_shard_iterator(293        StreamName=stream_name,294        ShardId=shard_id,295        ShardIteratorType="AT_TIMESTAMP",296        Timestamp=timestamp,297    )298    shard_iterator = response["ShardIterator"]299    response = conn.get_records(ShardIterator=shard_iterator)300    response["Records"].should.have.length_of(1)301    response["Records"][0]["PartitionKey"].should.equal("1")302    response["Records"][0]["ApproximateArrivalTimestamp"].should.be.greater_than(303        timestamp304    )305    response["MillisBehindLatest"].should.equal(0)306@mock_kinesis307def test_get_records_millis_behind_latest():308    conn = boto3.client("kinesis", region_name="us-west-2")309    stream_name = "my_stream"310    conn.create_stream(StreamName=stream_name, ShardCount=1)311    conn.put_record(StreamName=stream_name, Data="0", PartitionKey="0")312    time.sleep(1.0)313    conn.put_record(StreamName=stream_name, Data="1", PartitionKey="1")314    response = conn.describe_stream(StreamName=stream_name)315    shard_id = response["StreamDescription"]["Shards"][0]["ShardId"]316    response = conn.get_shard_iterator(317        StreamName=stream_name, ShardId=shard_id, ShardIteratorType="TRIM_HORIZON"318    )319    shard_iterator = response["ShardIterator"]320    response = conn.get_records(ShardIterator=shard_iterator, Limit=1)321    response["Records"].should.have.length_of(1)322    response["MillisBehindLatest"].should.be.greater_than(0)323@mock_kinesis324def test_get_records_at_very_new_timestamp():325    conn = boto3.client("kinesis", region_name="us-west-2")326    stream_name = "my_stream"327    conn.create_stream(StreamName=stream_name, ShardCount=1)328    # Create some data329    keys = [str(i) for i in range(1, 5)]330    for k in keys:331        conn.put_record(StreamName=stream_name, Data=k, PartitionKey=k)332    timestamp = datetime.datetime.utcnow() + datetime.timedelta(seconds=1)333    # Get a shard iterator334    response = conn.describe_stream(StreamName=stream_name)335    shard_id = response["StreamDescription"]["Shards"][0]["ShardId"]336    response = conn.get_shard_iterator(337        StreamName=stream_name,338        ShardId=shard_id,339        ShardIteratorType="AT_TIMESTAMP",340        Timestamp=timestamp,341    )342    shard_iterator = response["ShardIterator"]343    response = conn.get_records(ShardIterator=shard_iterator)344    response["Records"].should.have.length_of(0)345    response["MillisBehindLatest"].should.equal(0)346@mock_kinesis347def test_get_records_from_empty_stream_at_timestamp():348    conn = boto3.client("kinesis", region_name="us-west-2")349    stream_name = "my_stream"350    conn.create_stream(StreamName=stream_name, ShardCount=1)351    timestamp = datetime.datetime.utcnow()352    # Get a shard iterator353    response = conn.describe_stream(StreamName=stream_name)354    shard_id = response["StreamDescription"]["Shards"][0]["ShardId"]355    response = conn.get_shard_iterator(356        StreamName=stream_name,357        ShardId=shard_id,358        ShardIteratorType="AT_TIMESTAMP",359        Timestamp=timestamp,360    )361    shard_iterator = response["ShardIterator"]362    response = conn.get_records(ShardIterator=shard_iterator)363    response["Records"].should.have.length_of(0)364    response["MillisBehindLatest"].should.equal(0)365@mock_kinesis_deprecated366def test_invalid_shard_iterator_type():367    conn = boto.kinesis.connect_to_region("us-west-2")368    stream_name = "my_stream"369    conn.create_stream(stream_name, 1)...test_runner.py
Source:test_runner.py  
1import pytest2from datetime import datetime3from unittest.mock import Mock, patch, call4from lambda_kinesis.runner import (5    get_records,6    run_handler_on_stream_records,7    format_kinesis_timestamp,8    IteratorType,9)10def test_get_records_parses_response():11    kinesis_client = Mock()12    arrival_time = datetime(2015, 1, 1)13    kinesis_client.get_records.return_value = {14        "Records": [15            {"ApproximateArrivalTimestamp": arrival_time, "Data": b"data", "PartitionKey": "xxx"}16        ],17        "NextShardIterator": "456",18    }19    records, shard_iterator = get_records(20        kinesis_client, stream_name="stream", shard_iterator="123"21    )22    assert records == (23        {24            "kinesis": {25                "data": "ZGF0YQ==",26                "approximateArrivalTimestamp": arrival_time,27                "partitionKey": "xxx",28            },29            "eventSource": "aws:kinesis",30            "eventName": "aws:kinesis:record",31        },32    )33    assert shard_iterator == "456"34@patch("lambda_kinesis.runner.get_records")35def test_run_handler_polls_stream(get_records):36    kinesis_client = Mock()37    handler = Mock()38    kinesis_client.list_shards.return_value = {"Shards": [{"ShardId": "shard"}]}39    kinesis_client.get_shard_iterator.return_value = {"ShardIterator": "iterator"}40    class PleaseStop(Exception):41        pass42    iterator_type = IteratorType.TrimHorizon43    records = [str(i) for i in range(30)]44    record_batches = [records[i : i + 10] for i in range(0, 30, 10)]  # NOQA45    shard_iterators = [str(i) for i in range(3)]46    get_records.side_effect = list(zip(record_batches, shard_iterators)) + [PleaseStop]47    with pytest.raises(PleaseStop):48        run_handler_on_stream_records(49            stream_name="stream",50            kinesis_client=kinesis_client,51            shard_iterator_type=iterator_type,52            handler=handler,53            wait_seconds=0,54        )55    kinesis_client.get_shard_iterator.assert_called_once_with(56        StreamName="stream", ShardId="shard", ShardIteratorType="TRIM_HORIZON"57    )58    assert get_records.call_args_list == [59        call(kinesis_client, "stream", "iterator"),60        call(kinesis_client, "stream", "0"),61        call(kinesis_client, "stream", "1"),62        call(kinesis_client, "stream", "2"),63    ]64    assert handler.call_args_list == [65        call({"Records": record_batch}, None) for record_batch in record_batches66    ]67@patch("lambda_kinesis.runner.get_records")68def test_run_handler_supports_timestamp_iterator(get_records):69    kinesis_client = Mock()70    handler = Mock()71    kinesis_client.list_shards.return_value = {"Shards": [{"ShardId": "shard"}]}72    kinesis_client.get_shard_iterator.return_value = {"ShardIterator": "iterator"}73    class PleaseStop(Exception):74        pass75    iterator_type = IteratorType.AtTimestamp76    get_records.side_effect = [PleaseStop]77    with pytest.raises(PleaseStop):78        run_handler_on_stream_records(79            stream_name="stream",80            kinesis_client=kinesis_client,81            shard_iterator_type=iterator_type,82            timestamp=datetime(2019, 2, 5),83            handler=handler,84            wait_seconds=0,85        )86    kinesis_client.get_shard_iterator.assert_called_once_with(87        StreamName="stream",88        ShardId="shard",89        ShardIteratorType="AT_TIMESTAMP",90        Timestamp="2019-02-05T00:00:00.000+00:00",91    )92def test_format_kinesis_timestamp_returns_millis_and_tz():93    assert format_kinesis_timestamp(datetime(2019, 2, 5)) == "2019-02-05T00:00:00.000+00:00"94    assert (95        format_kinesis_timestamp(datetime(2019, 2, 5, 10, microsecond=605020))96        == "2019-02-05T10:00:00.605+00:00"...consumer.py
Source:consumer.py  
...8timestamp           = datetime(2016, 12, 15)9##############################################10client = boto3.client('kinesis')11pp = pprint.PrettyPrinter(indent=4)12def get_shard_iterator(shard_iterator):13	return client.get_records(14    	ShardIterator=shard_iterator,15    	Limit=12316)17shard_iterator = client.get_shard_iterator(18    StreamName        = stream_name,19    ShardId           = shard_id,20    ShardIteratorType = shard_iterator_type,21    Timestamp         = timestamp22)23shard_iterator = shard_iterator['ShardIterator']24flag = True25while (flag):26	new_shard_iterator = get_shard_iterator(shard_iterator)27	if len(new_shard_iterator['NextShardIterator']) > 15 and new_shard_iterator['MillisBehindLatest'] > 0:28		if len(new_shard_iterator['Records']) is 0:29			shard_iterator = get_shard_iterator(shard_iterator)['NextShardIterator']30			print pp.pprint(new_shard_iterator)31		else: 32			print "::::: RECEIVED DATA FROM THE STREAM ::::::"33			print pp.pprint(new_shard_iterator['Records'])34			flag = False35	else:36		print "::::::::::::::::::::::::::: NO MORE SHARDS TO LOOK INTO :::::::::::::::::::::::::::::::::"37		print "::::: THE NEXT SHARD ITERATOR IS %s ::::::" % str(new_shard_iterator['NextShardIterator'])38		print "::::: THE NEXT MILLIS-BEHIND-LATEST IS %s ::::::" % str(new_shard_iterator['MillisBehindLatest'])...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
