Best Python code snippet using localstack_python
test_logs.py
Source:test_logs.py  
...201            firehose_name = f"test-firehose-{short_uid()}"202            s3_bucket_arn = f"arn:aws:s3:::{s3_bucket}"203            role = f"test-firehose-s3-role-{short_uid()}"204            policy_name = f"test-firehose-s3-role-policy-{short_uid()}"205            role_arn = create_iam_role_with_policy(206                RoleName=role,207                PolicyName=policy_name,208                RoleDefinition=s3_firehose_role,209                PolicyDefinition=s3_firehose_permission,210            )211            # TODO AWS has troubles creating the delivery stream the first time212            # policy is not accepted at first, so we try again213            def create_delivery_stream():214                firehose_client.create_delivery_stream(215                    DeliveryStreamName=firehose_name,216                    S3DestinationConfiguration={217                        "BucketARN": s3_bucket_arn,218                        "RoleARN": role_arn,219                        "BufferingHints": {"SizeInMBs": 1, "IntervalInSeconds": 60},220                    },221                )222            retry(create_delivery_stream, retries=5, sleep=10.0)223            response = firehose_client.describe_delivery_stream(DeliveryStreamName=firehose_name)224            firehose_arn = response["DeliveryStreamDescription"]["DeliveryStreamARN"]225            role = f"test-firehose-role-{short_uid()}"226            policy_name = f"test-firehose-role-policy-{short_uid()}"227            role_arn_logs = create_iam_role_with_policy(228                RoleName=role,229                PolicyName=policy_name,230                RoleDefinition=logs_role,231                PolicyDefinition=firehose_permission,232            )233            def check_stream_active():234                state = firehose_client.describe_delivery_stream(DeliveryStreamName=firehose_name)[235                    "DeliveryStreamDescription"236                ]["DeliveryStreamStatus"]237                if state != "ACTIVE":238                    raise Exception(f"DeliveryStreamStatus is {state}")239            retry(check_stream_active, retries=60, sleep=30.0)240            logs_client.put_subscription_filter(241                logGroupName=logs_log_group,242                filterName="Destination",243                filterPattern="",244                destinationArn=firehose_arn,245                roleArn=role_arn_logs,246            )247            logs_client.put_log_events(248                logGroupName=logs_log_group,249                logStreamName=logs_log_stream,250                logEvents=[251                    {"timestamp": now_utc(millis=True), "message": "test"},252                    {"timestamp": now_utc(millis=True), "message": "test 2"},253                ],254            )255            def list_objects():256                response = s3_client.list_objects(Bucket=s3_bucket)257                assert len(response["Contents"]) >= 1258            retry(list_objects, retries=60, sleep=30.0)259            response = s3_client.list_objects(Bucket=s3_bucket)260            key = response["Contents"][-1]["Key"]261            response = s3_client.get_object(Bucket=s3_bucket, Key=key)262            content = gzip.decompress(response["Body"].read()).decode("utf-8")263            assert "DATA_MESSAGE" in content264            assert "test" in content265            assert "test 2" in content266        finally:267            # clean up268            firehose_client.delete_delivery_stream(269                DeliveryStreamName=firehose_name, AllowForceDelete=True270            )271    def test_put_subscription_filter_kinesis(272        self,273        logs_client,274        logs_log_group,275        logs_log_stream,276        kinesis_client,277        iam_client,278        create_iam_role_with_policy,279    ):280        kinesis_name = f"test-kinesis-{short_uid()}"281        filter_name = "Destination"282        kinesis_client.create_stream(StreamName=kinesis_name, ShardCount=1)283        try:284            result = kinesis_client.describe_stream(StreamName=kinesis_name)["StreamDescription"]285            kinesis_arn = result["StreamARN"]286            role = f"test-kinesis-role-{short_uid()}"287            policy_name = f"test-kinesis-role-policy-{short_uid()}"288            role_arn = create_iam_role_with_policy(289                RoleName=role,290                PolicyName=policy_name,291                RoleDefinition=logs_role,292                PolicyDefinition=kinesis_permission,293            )294            # wait for stream-status "ACTIVE"295            status = result["StreamStatus"]296            if status != "ACTIVE":297                def check_stream_active():298                    state = kinesis_client.describe_stream(StreamName=kinesis_name)[299                        "StreamDescription"300                    ]["StreamStatus"]301                    if state != "ACTIVE":302                        raise Exception(f"StreamStatus is {state}")...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
