Best Python code snippet using localstack_python
test_logs.py
Source:test_logs.py  
...246                PolicyName=policy_name,247                RoleDefinition=logs_role,248                PolicyDefinition=firehose_permission,249            )250            def check_stream_active():251                state = firehose_client.describe_delivery_stream(DeliveryStreamName=firehose_name)[252                    "DeliveryStreamDescription"253                ]["DeliveryStreamStatus"]254                if state != "ACTIVE":255                    raise Exception(f"DeliveryStreamStatus is {state}")256            retry(check_stream_active, retries=60, sleep=30.0)257            logs_client.put_subscription_filter(258                logGroupName=logs_log_group,259                filterName="Destination",260                filterPattern="",261                destinationArn=firehose_arn,262                roleArn=role_arn_logs,263            )264            logs_client.put_log_events(265                logGroupName=logs_log_group,266                logStreamName=logs_log_stream,267                logEvents=[268                    {"timestamp": now_utc(millis=True), "message": "test"},269                    {"timestamp": now_utc(millis=True), "message": "test 2"},270                ],271            )272            def list_objects():273                response = s3_client.list_objects(Bucket=s3_bucket)274                assert len(response["Contents"]) >= 1275            retry(list_objects, retries=60, sleep=30.0)276            response = s3_client.list_objects(Bucket=s3_bucket)277            key = response["Contents"][-1]["Key"]278            response = s3_client.get_object(Bucket=s3_bucket, Key=key)279            content = gzip.decompress(response["Body"].read()).decode("utf-8")280            assert "DATA_MESSAGE" in content281            assert "test" in content282            assert "test 2" in content283        finally:284            # clean up285            firehose_client.delete_delivery_stream(286                DeliveryStreamName=firehose_name, 
AllowForceDelete=True287            )288    def test_put_subscription_filter_kinesis(289        self,290        logs_client,291        logs_log_group,292        logs_log_stream,293        kinesis_client,294        iam_client,295        iam_create_role_and_policy,296    ):297        kinesis_name = f"test-kinesis-{short_uid()}"298        filter_name = "Destination"299        kinesis_client.create_stream(StreamName=kinesis_name, ShardCount=1)300        try:301            result = kinesis_client.describe_stream(StreamName=kinesis_name)["StreamDescription"]302            kinesis_arn = result["StreamARN"]303            role = f"test-kinesis-role-{short_uid()}"304            policy_name = f"test-kinesis-role-policy-{short_uid()}"305            role_arn = iam_create_role_and_policy(306                RoleName=role,307                PolicyName=policy_name,308                RoleDefinition=logs_role,309                PolicyDefinition=kinesis_permission,310            )311            # wait for stream-status "ACTIVE"312            status = result["StreamStatus"]313            if status != "ACTIVE":314                def check_stream_active():315                    state = kinesis_client.describe_stream(StreamName=kinesis_name)[316                        "StreamDescription"317                    ]["StreamStatus"]318                    if state != "ACTIVE":319                        raise Exception(f"StreamStatus is {state}")320                retry(check_stream_active, retries=6, sleep=1.0, sleep_before=2.0)321            def put_subscription_filter():322                logs_client.put_subscription_filter(323                    logGroupName=logs_log_group,324                    filterName=filter_name,325                    filterPattern="",326                    destinationArn=kinesis_arn,327                    roleArn=role_arn,328                )...Learn to execute automation testing from scratch with LambdaTest Learning Hub. 
It covers everything from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
