Best Python code snippet using localstack_python
fixtures.py
Source:fixtures.py  
...573        except Exception as e:574            LOG.debug("error cleaning up kinesis stream %s: %s", stream_name, e)575@pytest.fixture576def wait_for_stream_ready(kinesis_client):577    def _wait_for_stream_ready(stream_name: str):578        def is_stream_ready():579            describe_stream_response = kinesis_client.describe_stream(StreamName=stream_name)580            return describe_stream_response["StreamDescription"]["StreamStatus"] in [581                "ACTIVE",582                "UPDATING",583            ]584        poll_condition(is_stream_ready)585    return _wait_for_stream_ready586@pytest.fixture587def wait_for_delivery_stream_ready(firehose_client):588    def _wait_for_stream_ready(delivery_stream_name: str):589        def is_stream_ready():590            describe_stream_response = firehose_client.describe_delivery_stream(591                DeliveryStreamName=delivery_stream_name592            )593            return (594                describe_stream_response["DeliveryStreamDescription"]["DeliveryStreamStatus"]595                == "ACTIVE"596            )597        poll_condition(is_stream_ready)598    return _wait_for_stream_ready599@pytest.fixture600def wait_for_dynamodb_stream_ready(dynamodbstreams_client):601    def _wait_for_stream_ready(stream_arn: str):602        def is_stream_ready():603            describe_stream_response = dynamodbstreams_client.describe_stream(StreamArn=stream_arn)604            return describe_stream_response["StreamDescription"]["StreamStatus"] == "ENABLED"605        poll_condition(is_stream_ready)606    return _wait_for_stream_ready607@pytest.fixture()608def kms_create_key(kms_client):609    key_ids = []610    def _create_key(**kwargs):611        if "Description" not in kwargs:612            kwargs["Description"] = f"test description - {short_uid()}"613        if "KeyUsage" not in kwargs:614            kwargs["KeyUsage"] = "ENCRYPT_DECRYPT"615        key_metadata = 
kms_client.create_key(**kwargs)["KeyMetadata"]...
test_kinesis.py
Source:test_kinesis.py  
...225        )226        return response.get("ShardIterator")227@pytest.fixture228def wait_for_stream_ready(kinesis_client):229    def _wait_for_stream_ready(stream_name: str):230        def is_stream_ready():231            describe_stream_response = kinesis_client.describe_stream(StreamName=stream_name)232            return describe_stream_response["StreamDescription"]["StreamStatus"] in [233                "ACTIVE",234                "UPDATING",235            ]236        poll_condition(is_stream_ready)237    return _wait_for_stream_ready238def test_get_records_next_shard_iterator(239    kinesis_client, kinesis_create_stream, wait_for_stream_ready240):241    stream_name = kinesis_create_stream()242    wait_for_stream_ready(stream_name)243    first_stream_shard_data = kinesis_client.describe_stream(StreamName=stream_name)[...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
