Best Python code snippet using localstack_python
fixtures.py
Source:fixtures.py  
...764    run_safe(iam_client.delete_policy(PolicyArn=policy_arn))765@pytest.fixture766def create_iam_role_with_policy(iam_client):767    roles = {}768    def _create_role_and_policy(**kwargs):769        role = kwargs["RoleName"]770        policy = kwargs["PolicyName"]771        role_policy = json.dumps(kwargs["RoleDefinition"])772        result = iam_client.create_role(RoleName=role, AssumeRolePolicyDocument=role_policy)773        role_arn = result["Role"]["Arn"]774        policy_document = json.dumps(kwargs["PolicyDefinition"])775        iam_client.put_role_policy(RoleName=role, PolicyName=policy, PolicyDocument=policy_document)776        roles[role] = policy777        return role_arn778    yield _create_role_and_policy779    for role_name, policy_name in roles.items():780        iam_client.delete_role_policy(RoleName=role_name, PolicyName=policy_name)781        iam_client.delete_role(RoleName=role_name)782@pytest.fixture...test_logs.py
Source:test_logs.py  
...48    logs_client.create_log_stream(logGroupName=logs_log_group, logStreamName=name)49    yield name50    logs_client.delete_log_stream(logStreamName=name, logGroupName=logs_log_group)51@pytest.fixture52def iam_create_role_and_policy(iam_client):53    roles = {}54    def _create_role_and_policy(**kwargs):55        role = kwargs["RoleName"]56        policy = kwargs["PolicyName"]57        role_policy = json.dumps(kwargs["RoleDefinition"])58        result = iam_client.create_role(RoleName=role, AssumeRolePolicyDocument=role_policy)59        role_arn = result["Role"]["Arn"]60        policy_document = json.dumps(kwargs["PolicyDefinition"])61        iam_client.put_role_policy(RoleName=role, PolicyName=policy, PolicyDocument=policy_document)62        roles[role] = policy63        return role_arn64    yield _create_role_and_policy65    for role_name, policy_name in roles.items():66        iam_client.delete_role_policy(RoleName=role_name, PolicyName=policy_name)67        iam_client.delete_role(RoleName=role_name)68class TestCloudWatchLogs:69    # TODO make creation and description atomic to avoid possible flake?70    def test_create_and_delete_log_group(self, logs_client):71        test_name = f"test-log-group-{short_uid()}"72        log_groups_before = logs_client.describe_log_groups(73            logGroupNamePrefix="test-log-group-"74        ).get("logGroups", [])75        logs_client.create_log_group(logGroupName=test_name)76        log_groups_between = logs_client.describe_log_groups(77            logGroupNamePrefix="test-log-group-"78        ).get("logGroups", [])79        assert poll_condition(80            lambda: len(log_groups_between) == len(log_groups_before) + 1, timeout=5.0, interval=0.581        )82        logs_client.delete_log_group(logGroupName=test_name)83        log_groups_after = logs_client.describe_log_groups(84            logGroupNamePrefix="test-log-group-"85        ).get("logGroups", [])86        assert poll_condition(87            lambda: 
len(log_groups_after) == len(log_groups_between) - 1, timeout=5.0, interval=0.588        )89        assert len(log_groups_after) == len(log_groups_before)90    def test_list_tags_log_group(self, logs_client):91        test_name = f"test-log-group-{short_uid()}"92        logs_client.create_log_group(logGroupName=test_name, tags={"env": "testing1"})93        response = logs_client.list_tags_log_group(logGroupName=test_name)94        assert response["ResponseMetadata"]["HTTPStatusCode"] == 20095        assert "tags" in response96        assert response["tags"]["env"] == "testing1"97        # clean up98        logs_client.delete_log_group(logGroupName=test_name)99    def test_create_and_delete_log_stream(self, logs_client, logs_log_group):100        test_name = f"test-log-stream-{short_uid()}"101        log_streams_before = logs_client.describe_log_streams(logGroupName=logs_log_group).get(102            "logStreams", []103        )104        logs_client.create_log_stream(logGroupName=logs_log_group, logStreamName=test_name)105        log_streams_between = logs_client.describe_log_streams(logGroupName=logs_log_group).get(106            "logStreams", []107        )108        assert poll_condition(109            lambda: len(log_streams_between) == len(log_streams_before) + 1,110            timeout=5.0,111            interval=0.5,112        )113        logs_client.delete_log_stream(logGroupName=logs_log_group, logStreamName=test_name)114        log_streams_after = logs_client.describe_log_streams(logGroupName=logs_log_group).get(115            "logStreams", []116        )117        assert poll_condition(118            lambda: len(log_streams_between) - 1 == len(log_streams_after),119            timeout=5.0,120            interval=0.5,121        )122        assert len(log_streams_after) == len(log_streams_before)123    def test_put_events_multi_bytes_msg(self, logs_client, logs_log_group, logs_log_stream):124        body_msg = "ð - åã - æ¥æ¬èª"125        events = 
[{"timestamp": now_utc(millis=True), "message": body_msg}]126        response = logs_client.put_log_events(127            logGroupName=logs_log_group, logStreamName=logs_log_stream, logEvents=events128        )129        assert response["ResponseMetadata"]["HTTPStatusCode"] == 200130        events = logs_client.get_log_events(131            logGroupName=logs_log_group, logStreamName=logs_log_stream132        )["events"]133        assert events[0]["message"] == body_msg134    def test_filter_log_events_response_header(self, logs_client, logs_log_group, logs_log_stream):135        events = [136            {"timestamp": now_utc(millis=True), "message": "log message 1"},137            {"timestamp": now_utc(millis=True), "message": "log message 2"},138        ]139        response = logs_client.put_log_events(140            logGroupName=logs_log_group, logStreamName=logs_log_stream, logEvents=events141        )142        assert response["ResponseMetadata"]["HTTPStatusCode"] == 200143        response = logs_client.filter_log_events(logGroupName=logs_log_group)144        assert response["ResponseMetadata"]["HTTPStatusCode"] == 200145        assert (146            response["ResponseMetadata"]["HTTPHeaders"]["content-type"] == APPLICATION_AMZ_JSON_1_1147        )148    def test_put_subscription_filter_lambda(149        self,150        lambda_client,151        logs_client,152        logs_log_group,153        logs_log_stream,154        create_lambda_function,155        sts_client,156    ):157        test_lambda_name = f"test-lambda-function-{short_uid()}"158        create_lambda_function(159            handler_file=TEST_LAMBDA_PYTHON3,160            libs=TEST_LAMBDA_LIBS,161            func_name=test_lambda_name,162            runtime=LAMBDA_RUNTIME_PYTHON36,163        )164        try:165            lambda_client.invoke(FunctionName=test_lambda_name, Payload=b"{}")166            # get account-id to set the correct policy167            account_id = 
sts_client.get_caller_identity()["Account"]168            lambda_client.add_permission(169                FunctionName=test_lambda_name,170                StatementId=test_lambda_name,171                Principal=f"logs.{config.DEFAULT_REGION}.amazonaws.com",172                Action="lambda:InvokeFunction",173                SourceArn=f"arn:aws:logs:{config.DEFAULT_REGION}:{account_id}:log-group:{logs_log_group}:*",174                SourceAccount=account_id,175            )176            logs_client.put_subscription_filter(177                logGroupName=logs_log_group,178                filterName="test",179                filterPattern="",180                destinationArn=aws_stack.lambda_function_arn(181                    test_lambda_name, account_id=account_id, region_name=config.DEFAULT_REGION182                ),183            )184            logs_client.put_log_events(185                logGroupName=logs_log_group,186                logStreamName=logs_log_stream,187                logEvents=[188                    {"timestamp": now_utc(millis=True), "message": "test"},189                    {"timestamp": now_utc(millis=True), "message": "test 2"},190                ],191            )192            response = logs_client.describe_subscription_filters(logGroupName=logs_log_group)193            assert len(response["subscriptionFilters"]) == 1194            def check_invocation():195                events = testutil.get_lambda_log_events(196                    test_lambda_name, log_group=logs_log_group, logs_client=logs_client197                )198                assert len(events) == 2199                assert "test" in events200                assert "test 2" in events201            retry(check_invocation, retries=6, sleep=3.0)202        finally:203            # clean up lambda log group204            log_group_name = f"/aws/lambda/{test_lambda_name}"205            logs_client.delete_log_group(logGroupName=log_group_name)206    def 
test_put_subscription_filter_firehose(207        self,208        logs_client,209        logs_log_group,210        logs_log_stream,211        s3_bucket,212        s3_client,213        firehose_client,214        iam_client,215        iam_create_role_and_policy,216    ):217        try:218            firehose_name = f"test-firehose-{short_uid()}"219            s3_bucket_arn = f"arn:aws:s3:::{s3_bucket}"220            role = f"test-firehose-s3-role-{short_uid()}"221            policy_name = f"test-firehose-s3-role-policy-{short_uid()}"222            role_arn = iam_create_role_and_policy(223                RoleName=role,224                PolicyName=policy_name,225                RoleDefinition=s3_firehose_role,226                PolicyDefinition=s3_firehose_permission,227            )228            # TODO AWS has troubles creating the delivery stream the first time229            # policy is not accepted at first, so we try again230            def create_delivery_stream():231                firehose_client.create_delivery_stream(232                    DeliveryStreamName=firehose_name,233                    S3DestinationConfiguration={234                        "BucketARN": s3_bucket_arn,235                        "RoleARN": role_arn,236                        "BufferingHints": {"SizeInMBs": 1, "IntervalInSeconds": 60},237                    },238                )239            retry(create_delivery_stream, retries=5, sleep=10.0)240            response = firehose_client.describe_delivery_stream(DeliveryStreamName=firehose_name)241            firehose_arn = response["DeliveryStreamDescription"]["DeliveryStreamARN"]242            role = f"test-firehose-role-{short_uid()}"243            policy_name = f"test-firehose-role-policy-{short_uid()}"244            role_arn_logs = iam_create_role_and_policy(245                RoleName=role,246                PolicyName=policy_name,247                RoleDefinition=logs_role,248                PolicyDefinition=firehose_permission,249  
          )250            def check_stream_active():251                state = firehose_client.describe_delivery_stream(DeliveryStreamName=firehose_name)[252                    "DeliveryStreamDescription"253                ]["DeliveryStreamStatus"]254                if state != "ACTIVE":255                    raise Exception(f"DeliveryStreamStatus is {state}")256            retry(check_stream_active, retries=60, sleep=30.0)257            logs_client.put_subscription_filter(258                logGroupName=logs_log_group,259                filterName="Destination",260                filterPattern="",261                destinationArn=firehose_arn,262                roleArn=role_arn_logs,263            )264            logs_client.put_log_events(265                logGroupName=logs_log_group,266                logStreamName=logs_log_stream,267                logEvents=[268                    {"timestamp": now_utc(millis=True), "message": "test"},269                    {"timestamp": now_utc(millis=True), "message": "test 2"},270                ],271            )272            def list_objects():273                response = s3_client.list_objects(Bucket=s3_bucket)274                assert len(response["Contents"]) >= 1275            retry(list_objects, retries=60, sleep=30.0)276            response = s3_client.list_objects(Bucket=s3_bucket)277            key = response["Contents"][-1]["Key"]278            response = s3_client.get_object(Bucket=s3_bucket, Key=key)279            content = gzip.decompress(response["Body"].read()).decode("utf-8")280            assert "DATA_MESSAGE" in content281            assert "test" in content282            assert "test 2" in content283        finally:284            # clean up285            firehose_client.delete_delivery_stream(286                DeliveryStreamName=firehose_name, AllowForceDelete=True287            )288    def test_put_subscription_filter_kinesis(289        self,290        logs_client,291        logs_log_group,292       
 logs_log_stream,293        kinesis_client,294        iam_client,295        iam_create_role_and_policy,296    ):297        kinesis_name = f"test-kinesis-{short_uid()}"298        filter_name = "Destination"299        kinesis_client.create_stream(StreamName=kinesis_name, ShardCount=1)300        try:301            result = kinesis_client.describe_stream(StreamName=kinesis_name)["StreamDescription"]302            kinesis_arn = result["StreamARN"]303            role = f"test-kinesis-role-{short_uid()}"304            policy_name = f"test-kinesis-role-policy-{short_uid()}"305            role_arn = iam_create_role_and_policy(306                RoleName=role,307                PolicyName=policy_name,308                RoleDefinition=logs_role,309                PolicyDefinition=kinesis_permission,310            )311            # wait for stream-status "ACTIVE"312            status = result["StreamStatus"]313            if status != "ACTIVE":314                def check_stream_active():315                    state = kinesis_client.describe_stream(StreamName=kinesis_name)[316                        "StreamDescription"317                    ]["StreamStatus"]318                    if state != "ACTIVE":319                        raise Exception(f"StreamStatus is {state}")...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!
