Best Python code snippet using localstack_python
kinesis_listener.py
Source:kinesis_listener.py  
...248    iterator = kinesis.get_shard_iterator(249        StreamName=stream_name, ShardId=data["ShardId"], ShardIteratorType=iter_type, **kwargs250    )["ShardIterator"]251    def send_events():252        yield convert_to_binary_event_payload("", event_type="initial-response")253        iter = iterator254        last_sequence_number = starting_sequence_number255        # TODO: find better way to run loop up to max 5 minutes (until connection terminates)!256        for i in range(5 * 60):257            result = None258            try:259                result = kinesis.get_records(ShardIterator=iter)260            except Exception as e:261                if "ResourceNotFoundException" in str(e):262                    LOG.debug(263                        'Kinesis stream "%s" has been deleted, closing shard subscriber',264                        stream_name,265                    )266                    return267                raise268            iter = result.get("NextShardIterator")269            records = result.get("Records", [])270            for record in records:271                record["ApproximateArrivalTimestamp"] = record[272                    "ApproximateArrivalTimestamp"273                ].timestamp()274                # boto3 automatically decodes records in get_records(), so we must re-encode275                record["Data"] = to_str(base64.b64encode(record["Data"]))276                last_sequence_number = record["SequenceNumber"]277            if not records:278                time.sleep(1)279                continue280            response = {281                "ChildShards": [],282                "ContinuationSequenceNumber": last_sequence_number,283                "MillisBehindLatest": 0,284                "Records": json_safe(records),285            }286            result = json.dumps(response)287            yield convert_to_binary_event_payload(result, event_type="SubscribeToShardEvent")288    headers = {}289    return send_events(), 
headers290def find_consumer(consumer_arn="", consumer_name="", stream_arn=""):291    stream_consumers = KinesisBackend.get().stream_consumers292    for consumer in stream_consumers:293        if consumer_arn and consumer_arn == consumer.get("ConsumerARN"):294            return consumer295        elif consumer_name == consumer.get("ConsumerName") and stream_arn == consumer.get(296            "StreamARN"297        ):298            return consumer299def find_stream_for_consumer(consumer_arn):300    kinesis = aws_stack.connect_to_service("kinesis")301    for stream_name in kinesis.list_streams()["StreamNames"]:...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
