Best Python code snippet using localstack_python
firehose_to_s3.py
Source:firehose_to_s3.py  
...220    except ClientError as e:221        logging.error(e)222        return None223    return result['DeliveryStreamARN']224def wait_for_active_firehose(firehose_name):225    """Wait until the Firehose delivery stream is active226    :param firehose_name: Name of Firehose delivery stream227    :return: True if delivery stream is active. Otherwise, False.228    """229    # Wait until the stream is active230    firehose_client = boto3.client('firehose')231    while True:232        try:233            # Get the stream's current status234            result = firehose_client.describe_delivery_stream(DeliveryStreamName=firehose_name)235        except ClientError as e:236            logging.error(e)237            return False238        status = result['DeliveryStreamDescription']['DeliveryStreamStatus']239        if status == 'ACTIVE':240            return True241        if status == 'DELETING':242            logging.error(f'Firehose delivery stream {firehose_name} is being deleted.')243            return False244        time.sleep(2)245def main():246    """Exercise Kinesis Firehose methods"""247    # Assign these values before running the program248    # If the specified IAM role does not exist, it will be created249    firehose_name = 'firehose_to_s3_stream'250    bucket_arn = 'arn:aws:s3:::BUCKET_NAME'251    iam_role_name = 'firehose_to_s3'252    # Set up logging253    logging.basicConfig(level=logging.DEBUG,254                        format='%(levelname)s: %(asctime)s: %(message)s')255    # If Firehose doesn't exist, create it256    if not firehose_exists(firehose_name):257        # Create a Firehose delivery stream to S3. The Firehose will receive258        # data from direct puts.259        firehose_arn = create_firehose_to_s3(firehose_name, bucket_arn, iam_role_name)260        if firehose_arn is None:261            exit(1)262        logging.info(f'Created Firehose delivery stream to S3: {firehose_arn}')263        # Wait for the stream to become active264        if not wait_for_active_firehose(firehose_name):265            exit(1)266        logging.info('Firehose stream is active')267    # Put records into the Firehose stream268    test_data_file = 'kinesis_test_data.txt'269    firehose_client = boto3.client('firehose')270    with open(test_data_file, 'r') as f:271        logging.info('Putting 20 records into the Firehose one at a time')272        for i in range(20):273            # Read a record of test data274            line = next(f)275            # Put the record into the Firehose stream276            try:277                firehose_client.put_record(DeliveryStreamName=firehose_name,278                                           Record={'Data': line})...firehose_stack.py
Source:firehose_stack.py  
...10        self._supported_in_region = self.is_service_supported_in_region()11        delivery_bucket = self.create_s3_delivery_bucket()12        self.create_log_group_and_stream()13        firehose_role_arn = self.create_firehose_role(delivery_bucket)14        firehose = self.create_firehose(delivery_bucket, firehose_role_arn)15        firehose_stream_name = firehose.ref16        self._parameters_to_save["firehose_stream_name"] = firehose_stream_name17        self.create_test_policies(common_stack)18        self.save_parameters_in_parameter_store(Platform.IOS)19    def create_s3_delivery_bucket(self) -> aws_s3.Bucket:20        delivery_bucket = aws_s3.Bucket(21            self, "integ_test_firehose_delivery_bucket", removal_policy=core.RemovalPolicy.DESTROY22        )23        return delivery_bucket24    def create_log_group_and_stream(self) -> aws_logs.LogGroup:25        log_group = aws_logs.LogGroup(26            self,27            "integ_test_firehose_delivery_log_group",28            log_group_name=FirehoseStack.LOG_GROUP_NAME,29            removal_policy=core.RemovalPolicy.DESTROY,30            retention=aws_logs.RetentionDays.FIVE_DAYS,31        )32        aws_logs.LogStream(33            self,34            "integ_test_firehose_delivery_log_stream",35            log_group=log_group,36            log_stream_name=FirehoseStack.LOG_STREAM_NAME,37            removal_policy=core.RemovalPolicy.DESTROY,38        )39        return log_group40    def create_firehose_role(self, delivery_bucket) -> str:41        """42        Creates an IAM role to allow Kinesis to deliver records to S3, per43        https://docs.aws.amazon.com/firehose/latest/dev/controlling-access.html44        :param delivery_bucket: The destination bucket45        :return: IAM Role ARN46        """47        firehose_role = aws_iam.Role(48            self,49            "integ_test_firehose_delivery_role",50            assumed_by=aws_iam.ServicePrincipal("firehose.amazonaws.com"),51        )52        firehose_role.add_to_policy(53            aws_iam.PolicyStatement(54                effect=aws_iam.Effect.ALLOW,55                actions=[56                    "s3:AbortMultipartUpload",57                    "s3:GetBucketLocation",58                    "s3:GetObject",59                    "s3:ListBucket",60                    "s3:ListBucketMultipartUploads",61                    "s3:PutObject",62                ],63                resources=[delivery_bucket.bucket_arn, f"{delivery_bucket.bucket_arn}/*"],64            )65        )66        firehose_role.add_to_policy(67            aws_iam.PolicyStatement(68                effect=aws_iam.Effect.ALLOW,69                actions=[70                    "kinesis:DescribeStream",71                    "kinesis:GetShardIterator",72                    "kinesis:GetRecords",73                    "kinesis:ListShards",74                ],75                resources=[f"arn:aws:kinesis:{self.region}:{self.account}:stream/*"],76            )77        )78        log_stream_arn = ":".join(79            [80                "arn:aws:logs",81                self.region,82                self.account,83                "log-group",84                FirehoseStack.LOG_GROUP_NAME,85                "log-stream",86                FirehoseStack.LOG_STREAM_NAME,87            ]88        )89        firehose_role.add_to_policy(90            aws_iam.PolicyStatement(91                effect=aws_iam.Effect.ALLOW,92                actions=["logs:PutLogEvents"],93                resources=[log_stream_arn],94            )95        )96        return firehose_role.role_arn97    def create_firehose(98        self, delivery_bucket, firehose_role_arn99    ) -> aws_kinesisfirehose.CfnDeliveryStream:100        """101        Creates a Firehose DeliveryStream configured to deliver to the S3 Bucket `delivery_bucket`,102        and log errors to a log stream named 'S3Delivery' in `log_group`. Firehose will adopt the103        role specified in `firehose_role_arn`.104        :param delivery_bucket: The delivery destination bucket for the Firehose105        :param firehose_role_arn: The role to adopt106        :return: a CfnDeliveryStream107        """108        firehose = aws_kinesisfirehose.CfnDeliveryStream(109            self,110            "integ_test_firehose",111            extended_s3_destination_configuration={...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
