Best Python code snippets using localstack_python
Source: firehose_to_s3.py
# snippet-comment:[These are tags for the AWS doc team's sample catalog. Do not remove.]
# snippet-sourcedescription:[firehose_to_s3.py demonstrates how to create and use an Amazon Kinesis Data Firehose delivery stream to Amazon S3.]
# snippet-service:[firehose]
# snippet-keyword:[Amazon Kinesis Data Firehose]
# snippet-keyword:[Python]
# snippet-sourcesyntax:[python]
# snippet-keyword:[Code Sample]
# snippet-sourcetype:[snippet]
# snippet-sourcedate:[2019-05-15]
# snippet-sourceauthor:[AWS]
# Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.

import json
import logging
import time
import boto3
from botocore.exceptions import ClientError


def get_firehose_arn(firehose_name):
    """Retrieve the ARN of the specified Firehose
    :param firehose_name: Firehose stream name
    :return: If the Firehose stream exists, return ARN, else None
    """
    # Try to get the description of the Firehose
    firehose_client = boto3.client('firehose')
    try:
        result = firehose_client.describe_delivery_stream(DeliveryStreamName=firehose_name)
    except ClientError as e:
        logging.error(e)
        return None
    return result['DeliveryStreamDescription']['DeliveryStreamARN']


def firehose_exists(firehose_name):
    """Check if the specified Firehose exists
    :param firehose_name: Firehose stream name
    :return: True if Firehose exists, else False
    """
    # Try to get the description of the Firehose
    if get_firehose_arn(firehose_name) is None:
        return False
    return True


def get_iam_role_arn(iam_role_name):
    """Retrieve the ARN of the specified IAM role
    :param iam_role_name: IAM role name
    :return: If the IAM role exists, return ARN, else None
    """
    # Try to retrieve information about the role
    iam_client = boto3.client('iam')
    try:
        result = iam_client.get_role(RoleName=iam_role_name)
    except ClientError as e:
        logging.error(e)
        return None
    return result['Role']['Arn']


def iam_role_exists(iam_role_name):
    """Check if the specified IAM role exists
    :param iam_role_name: IAM role name
    :return: True if IAM role exists, else False
    """
    # Try to retrieve information about the role
    if get_iam_role_arn(iam_role_name) is None:
        return False
    return True


def create_iam_role_for_firehose_to_s3(iam_role_name, s3_bucket,
                                       firehose_src_stream=None):
    """Create an IAM role for a Firehose delivery system to S3
    :param iam_role_name: Name of IAM role
    :param s3_bucket: ARN of S3 bucket
    :param firehose_src_stream: ARN of source Kinesis Data Stream. If
        Firehose data source is via direct puts then arg should be None.
    :return: ARN of IAM role. If error, returns None.
    """
    # Firehose trusted relationship policy document
    firehose_assume_role = {
        'Version': '2012-10-17',
        'Statement': [
            {
                'Sid': '',
                'Effect': 'Allow',
                'Principal': {
                    'Service': 'firehose.amazonaws.com'
                },
                'Action': 'sts:AssumeRole'
            }
        ]
    }
    iam_client = boto3.client('iam')
    try:
        result = iam_client.create_role(RoleName=iam_role_name,
                                        AssumeRolePolicyDocument=json.dumps(firehose_assume_role))
    except ClientError as e:
        logging.error(e)
        return None
    firehose_role_arn = result['Role']['Arn']

    # Define and attach a policy that grants sufficient S3 permissions
    policy_name = 'firehose_s3_access'
    s3_access = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "",
                "Effect": "Allow",
                "Action": [
                    "s3:AbortMultipartUpload",
                    "s3:GetBucketLocation",
                    "s3:GetObject",
                    "s3:ListBucket",
                    "s3:ListBucketMultipartUploads",
                    "s3:PutObject"
                ],
                "Resource": [
                    f"{s3_bucket}/*",
                    f"{s3_bucket}"
                ]
            }
        ]
    }
    try:
        iam_client.put_role_policy(RoleName=iam_role_name,
                                   PolicyName=policy_name,
                                   PolicyDocument=json.dumps(s3_access))
    except ClientError as e:
        logging.error(e)
        return None

    # If the Firehose source is a Kinesis data stream then access to the
    # stream must be allowed.
    if firehose_src_stream is not None:
        policy_name = 'firehose_kinesis_access'
        kinesis_access = {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Sid": "",
                    "Effect": "Allow",
                    "Action": [
                        "kinesis:DescribeStream",
                        "kinesis:GetShardIterator",
                        "kinesis:GetRecords"
                    ],
                    "Resource": [
                        f"{firehose_src_stream}"
                    ]
                }
            ]
        }
        try:
            iam_client.put_role_policy(RoleName=iam_role_name,
                                       PolicyName=policy_name,
                                       PolicyDocument=json.dumps(kinesis_access))
        except ClientError as e:
            logging.error(e)
            return None

    # Return the ARN of the created IAM role
    return firehose_role_arn


def create_firehose_to_s3(firehose_name, s3_bucket_arn, iam_role_name,
                          firehose_src_type='DirectPut',
                          firehose_src_stream=None):
    """Create a Kinesis Firehose delivery stream to S3
    The data source can be either a Kinesis Data Stream or puts sent directly
    to the Firehose stream.
    :param firehose_name: Delivery stream name
    :param s3_bucket_arn: ARN of S3 bucket
    :param iam_role_name: Name of Firehose-to-S3 IAM role. If the role doesn't
        exist, it is created.
    :param firehose_src_type: 'DirectPut' or 'KinesisStreamAsSource'
    :param firehose_src_stream: ARN of source Kinesis Data Stream. Required if
        firehose_src_type is 'KinesisStreamAsSource'
    :return: ARN of Firehose delivery stream. If error, returns None.
    """
    # Create Firehose-to-S3 IAM role if necessary
    if iam_role_exists(iam_role_name):
        # Retrieve its ARN
        iam_role = get_iam_role_arn(iam_role_name)
    else:
        iam_role = create_iam_role_for_firehose_to_s3(iam_role_name,
                                                      s3_bucket_arn,
                                                      firehose_src_stream)
        if iam_role is None:
            # Error creating IAM role
            return None

    # Create the S3 configuration dictionary
    # Both BucketARN and RoleARN are required
    # Set the buffer interval=60 seconds (Default=300 seconds)
    s3_config = {
        'BucketARN': s3_bucket_arn,
        'RoleARN': iam_role,
        'BufferingHints': {
            'IntervalInSeconds': 60,
        },
    }

    # Create the delivery stream
    # By default, the DeliveryStreamType='DirectPut'
    firehose_client = boto3.client('firehose')
    try:
        if firehose_src_type == 'KinesisStreamAsSource':
            # Define the Kinesis Data Stream configuration
            stream_config = {
                'KinesisStreamARN': firehose_src_stream,
                'RoleARN': iam_role,
            }
            result = firehose_client.create_delivery_stream(
                DeliveryStreamName=firehose_name,
                DeliveryStreamType=firehose_src_type,
                KinesisStreamSourceConfiguration=stream_config,
                ExtendedS3DestinationConfiguration=s3_config)
        else:
            result = firehose_client.create_delivery_stream(
                DeliveryStreamName=firehose_name,
                DeliveryStreamType=firehose_src_type,
                ExtendedS3DestinationConfiguration=s3_config)
    except ClientError as e:
        logging.error(e)
        return None
    return result['DeliveryStreamARN']


def wait_for_active_firehose(firehose_name):
    """Wait until the Firehose delivery stream is active
    :param firehose_name: Name of Firehose delivery stream
    :return: True if delivery stream is active. Otherwise, False.
    """
    # Wait until the stream is active
    firehose_client = boto3.client('firehose')
    while True:
        try:
            # Get the stream's current status
            result = firehose_client.describe_delivery_stream(DeliveryStreamName=firehose_name)
        except ClientError as e:
            logging.error(e)
            return False
        status = result['DeliveryStreamDescription']['DeliveryStreamStatus']
        if status == 'ACTIVE':
            return True
        if status == 'DELETING':
            logging.error(f'Firehose delivery stream {firehose_name} is being deleted.')
            return False
        time.sleep(2)


def main():
    """Exercise Kinesis Firehose methods"""
    # Assign these values before running the program
    # If the specified IAM role does not exist, it will be created
    firehose_name = 'firehose_to_s3_stream'
    bucket_arn = 'arn:aws:s3:::BUCKET_NAME'
    iam_role_name = 'firehose_to_s3'

    # Set up logging
    logging.basicConfig(level=logging.DEBUG,
                        format='%(levelname)s: %(asctime)s: %(message)s')

    # If Firehose doesn't exist, create it
    if not firehose_exists(firehose_name):
        # Create a Firehose delivery stream to S3. The Firehose will receive
        # data from direct puts.
        firehose_arn = create_firehose_to_s3(firehose_name, bucket_arn, iam_role_name)
        if firehose_arn is None:
            exit(1)
        logging.info(f'Created Firehose delivery stream to S3: {firehose_arn}')

        # Wait for the stream to become active
        if not wait_for_active_firehose(firehose_name):
            exit(1)
        logging.info('Firehose stream is active')

    # Put records into the Firehose stream
    test_data_file = 'kinesis_test_data.txt'
    firehose_client = boto3.client('firehose')
    with open(test_data_file, 'r') as f:
        logging.info('Putting 20 records into the Firehose one at a time')
        for i in range(20):
            # Read a record of test data
            line = next(f)

            # Put the record into the Firehose stream
            try:
                firehose_client.put_record(DeliveryStreamName=firehose_name,
                                           Record={'Data': line})
            except ClientError as e:
                logging.error(e)
                exit(1)

        # Put 200 records in a batch
        logging.info('Putting 200 records into the Firehose in a batch')
        batch = [{'Data': next(f)} for x in range(200)]  # Read 200 records

        # Put the batch into the Firehose stream
        try:
            result = firehose_client.put_record_batch(DeliveryStreamName=firehose_name,
                                                      Records=batch)
        except ClientError as e:
            logging.error(e)
            exit(1)

        # Did any records in the batch not get processed?
        num_failures = result['FailedPutCount']
        '''
        # Test: Simulate a failed record
        num_failures = 1
        failed_rec_index = 3
        result['RequestResponses'][failed_rec_index]['ErrorCode'] = 404
        '''
        if num_failures:
            # Resend failed records
            logging.info(f'Resending {num_failures} failed records')
            rec_index = 0
            for record in result['RequestResponses']:
                if 'ErrorCode' in record:
                    # Resend the record
                    firehose_client.put_record(DeliveryStreamName=firehose_name,
                                               Record=batch[rec_index])

                    # Stop if all failed records have been resent
                    num_failures -= 1
                    if not num_failures:
                        break
                rec_index += 1
    logging.info('Test data sent to Firehose stream')


if __name__ == '__main__':
    main()
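The sample above talks to live AWS endpoints. Since this page collects snippets in the context of localstack_python, the following is a minimal sketch of pointing the same kind of boto3 clients at a local LocalStack instance instead; the endpoint URL, region, and dummy credentials are assumptions about a typical local setup, not part of the AWS sample.

import boto3

# Assumed LocalStack edge endpoint; adjust if your instance listens elsewhere.
LOCALSTACK_ENDPOINT = 'http://localhost:4566'

def localstack_client(service_name):
    """Return a boto3 client wired to LocalStack instead of real AWS."""
    return boto3.client(service_name,
                        endpoint_url=LOCALSTACK_ENDPOINT,
                        region_name='us-east-1',
                        aws_access_key_id='test',       # dummy credentials are
                        aws_secret_access_key='test')   # sufficient for LocalStack

# Example: inspect the stream that main() above creates.
# firehose = localstack_client('firehose')
# firehose.describe_delivery_stream(DeliveryStreamName='firehose_to_s3_stream')

Note that the functions in firehose_to_s3.py construct their own clients with boto3.client(...), so running them against LocalStack would also require passing endpoint_url in those calls.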
Source: kinesis_stream.py
from secret import aws_key_id, aws_secret_key
import boto3
from data_fetch import Data_Fetch


class Fire_Hose:
    def __init__(self):
        # Create a Firehose client using the credentials kept in secret.py
        firehose_client = boto3.client('firehose',
                                       region_name='us-east-1',
                                       aws_access_key_id=aws_key_id,
                                       aws_secret_access_key=aws_secret_key)
        self.firehose_client = firehose_client

    def add_to_stream(self, json_entry):
        # Put a single record onto the delivery stream
        client = self.firehose_client
        client.put_record(DeliveryStreamName='nasa-datahose-02',
                          Record={'Data': json_entry})

    def check_streams(self):
        # Print the delivery stream's description as a quick sanity check
        client = self.firehose_client
        print(client.describe_delivery_stream(
            DeliveryStreamName='nasa-datahose-02'))
        # print(client.list_delivery_streams())


def main():
    fetch = Data_Fetch()
    hose = Fire_Hose()
    json_entry = fetch.access_api()
    print(json_entry)
    hose.add_to_stream(json_entry)
    # hose.check_streams()


if __name__ == '__main__':
    main()
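put_record sends whatever text or bytes it is given, and Kinesis Data Firehose concatenates records as-is when writing to the destination, so callers typically serialize each record themselves and append a newline to keep the delivered objects line-delimited. A minimal usage sketch follows, assuming the snippet above is saved as kinesis_stream.py and that access_api() returns a Python dict; if it already returns a JSON string, only the newline step applies.

import json
from kinesis_stream import Fire_Hose   # assumes the snippet above is kinesis_stream.py
from data_fetch import Data_Fetch

hose = Fire_Hose()

# Serialize the payload and terminate it with a newline; Firehose does not
# insert separators between records on its own.
payload = json.dumps(Data_Fetch().access_api()) + '\n'
hose.add_to_stream(payload)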
