Best Python code snippet using localstack_python
firehose_to_s3.py
Source:firehose_to_s3.py  
import itertools
import json
import logging
import sys
import time

import boto3
from botocore.exceptions import ClientError

# Delivery stream states that can never transition to ACTIVE on their own.
_FIREHOSE_FAILED_STATES = ('CREATING_FAILED', 'DELETING_FAILED')


def get_firehose_arn(firehose_name):
    """Retrieve the ARN of the specified Firehose

    :param firehose_name: Firehose stream name
    :return: If the Firehose stream exists, return ARN, else None
    """

    # Try to get the description of the Firehose
    firehose_client = boto3.client('firehose')
    try:
        result = firehose_client.describe_delivery_stream(DeliveryStreamName=firehose_name)
    except ClientError as e:
        logging.error(e)
        return None
    return result['DeliveryStreamDescription']['DeliveryStreamARN']


def firehose_exists(firehose_name):
    """Check if the specified Firehose exists

    :param firehose_name: Firehose stream name
    :return: True if Firehose exists, else False
    """

    return get_firehose_arn(firehose_name) is not None


def get_iam_role_arn(iam_role_name):
    """Retrieve the ARN of the specified IAM role

    :param iam_role_name: IAM role name
    :return: If the IAM role exists, return ARN, else None
    """

    # Try to retrieve information about the role
    iam_client = boto3.client('iam')
    try:
        result = iam_client.get_role(RoleName=iam_role_name)
    except ClientError as e:
        logging.error(e)
        return None
    return result['Role']['Arn']


def iam_role_exists(iam_role_name):
    """Check if the specified IAM role exists

    :param iam_role_name: IAM role name
    :return: True if IAM role exists, else False
    """

    return get_iam_role_arn(iam_role_name) is not None


def _put_inline_role_policy(iam_client, iam_role_name, policy_name, policy_document):
    """Attach an inline policy to an IAM role

    :param iam_client: boto3 IAM client
    :param iam_role_name: Name of IAM role
    :param policy_name: Name of the inline policy
    :param policy_document: Policy document as a dict (serialized to JSON here)
    :return: True if the policy was attached, else False (error is logged)
    """

    try:
        iam_client.put_role_policy(RoleName=iam_role_name,
                                   PolicyName=policy_name,
                                   PolicyDocument=json.dumps(policy_document))
    except ClientError as e:
        logging.error(e)
        return False
    return True


def create_iam_role_for_firehose_to_s3(iam_role_name, s3_bucket,
                                       firehose_src_stream=None):
    """Create an IAM role for a Firehose delivery system to S3

    :param iam_role_name: Name of IAM role
    :param s3_bucket: ARN of S3 bucket
    :param firehose_src_stream: ARN of source Kinesis Data Stream. If
        Firehose data source is via direct puts then arg should be None.
    :return: ARN of IAM role. If error, returns None.
    """

    # Firehose trusted relationship policy document
    firehose_assume_role = {
        'Version': '2012-10-17',
        'Statement': [
            {
                'Sid': '',
                'Effect': 'Allow',
                'Principal': {
                    'Service': 'firehose.amazonaws.com'
                },
                'Action': 'sts:AssumeRole'
            }
        ]
    }
    iam_client = boto3.client('iam')
    try:
        result = iam_client.create_role(RoleName=iam_role_name,
                                        AssumeRolePolicyDocument=json.dumps(firehose_assume_role))
    except ClientError as e:
        logging.error(e)
        return None
    firehose_role_arn = result['Role']['Arn']

    # Define and attach a policy that grants sufficient S3 permissions
    s3_access = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "",
                "Effect": "Allow",
                "Action": [
                    "s3:AbortMultipartUpload",
                    "s3:GetBucketLocation",
                    "s3:GetObject",
                    "s3:ListBucket",
                    "s3:ListBucketMultipartUploads",
                    "s3:PutObject"
                ],
                "Resource": [
                    f"{s3_bucket}/*",
                    f"{s3_bucket}"
                ]
            }
        ]
    }
    if not _put_inline_role_policy(iam_client, iam_role_name,
                                   'firehose_s3_access', s3_access):
        return None

    # If the Firehose source is a Kinesis data stream then access to the
    # stream must be allowed.
    if firehose_src_stream is not None:
        kinesis_access = {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Sid": "",
                    "Effect": "Allow",
                    "Action": [
                        "kinesis:DescribeStream",
                        "kinesis:GetShardIterator",
                        "kinesis:GetRecords"
                    ],
                    "Resource": [
                        f"{firehose_src_stream}"
                    ]
                }
            ]
        }
        if not _put_inline_role_policy(iam_client, iam_role_name,
                                       'firehose_kinesis_access', kinesis_access):
            return None

    # Return the ARN of the created IAM role
    return firehose_role_arn


def create_firehose_to_s3(firehose_name, s3_bucket_arn, iam_role_name,
                          firehose_src_type='DirectPut',
                          firehose_src_stream=None):
    """Create a Kinesis Firehose delivery stream to S3

    The data source can be either a Kinesis Data Stream or puts sent directly
    to the Firehose stream.

    :param firehose_name: Delivery stream name
    :param s3_bucket_arn: ARN of S3 bucket
    :param iam_role_name: Name of Firehose-to-S3 IAM role. If the role doesn't
        exist, it is created.
    :param firehose_src_type: 'DirectPut' or 'KinesisStreamAsSource'
    :param firehose_src_stream: ARN of source Kinesis Data Stream. Required if
        firehose_src_type is 'KinesisStreamAsSource'
    :return: ARN of Firehose delivery stream. If error, returns None.
    """

    # A Kinesis-sourced delivery stream cannot be configured without the
    # source stream's ARN; fail early instead of sending an invalid request.
    if firehose_src_type == 'KinesisStreamAsSource' and firehose_src_stream is None:
        logging.error('firehose_src_stream is required when firehose_src_type '
                      'is KinesisStreamAsSource')
        return None

    # Look the role up once; create it only if the lookup found nothing.
    # (A separate exists-check followed by a second lookup could hand a None
    # RoleARN to create_delivery_stream if the second call failed.)
    iam_role = get_iam_role_arn(iam_role_name)
    if iam_role is None:
        iam_role = create_iam_role_for_firehose_to_s3(iam_role_name,
                                                      s3_bucket_arn,
                                                      firehose_src_stream)
        if iam_role is None:
            # Error creating IAM role
            return None

    # Create the S3 configuration dictionary
    # Both BucketARN and RoleARN are required
    # Set the buffer interval=60 seconds (Default=300 seconds)
    s3_config = {
        'BucketARN': s3_bucket_arn,
        'RoleARN': iam_role,
        'BufferingHints': {
            'IntervalInSeconds': 60,
        },
    }

    request = {
        'DeliveryStreamName': firehose_name,
        'DeliveryStreamType': firehose_src_type,
        'ExtendedS3DestinationConfiguration': s3_config,
    }
    if firehose_src_type == 'KinesisStreamAsSource':
        # Define the Kinesis Data Stream configuration
        request['KinesisStreamSourceConfiguration'] = {
            'KinesisStreamARN': firehose_src_stream,
            'RoleARN': iam_role,
        }

    # Create the delivery stream
    firehose_client = boto3.client('firehose')
    try:
        result = firehose_client.create_delivery_stream(**request)
    except ClientError as e:
        logging.error(e)
        return None
    return result['DeliveryStreamARN']


def wait_for_active_firehose(firehose_name, timeout=None, poll_interval=2):
    """Wait until the Firehose delivery stream is active

    :param firehose_name: Name of Firehose delivery stream
    :param timeout: Maximum number of seconds to wait. None (the default)
        waits until the stream reaches a final state.
    :param poll_interval: Seconds to sleep between status checks
    :return: True if delivery stream is active. Otherwise, False.
    """

    firehose_client = boto3.client('firehose')
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        try:
            # Get the stream's current status
            result = firehose_client.describe_delivery_stream(DeliveryStreamName=firehose_name)
        except ClientError as e:
            logging.error(e)
            return False
        status = result['DeliveryStreamDescription']['DeliveryStreamStatus']
        if status == 'ACTIVE':
            return True
        if status == 'DELETING':
            logging.error(f'Firehose delivery stream {firehose_name} is being deleted.')
            return False
        if status in _FIREHOSE_FAILED_STATES:
            # These states never become ACTIVE; polling further would hang.
            logging.error(f'Firehose delivery stream {firehose_name} is in state {status}.')
            return False
        if deadline is not None and time.monotonic() >= deadline:
            logging.error(f'Timed out waiting for Firehose delivery stream {firehose_name}.')
            return False
        time.sleep(poll_interval)


def _resend_failed_records(firehose_client, firehose_name, batch, responses):
    """Resend, one at a time, every batch record whose response has an ErrorCode

    :param firehose_client: boto3 Firehose client
    :param firehose_name: Name of Firehose delivery stream
    :param batch: Records passed to put_record_batch
    :param responses: 'RequestResponses' list from put_record_batch; entry i
        corresponds to batch[i]
    :return: True if every failed record was resent, else False
    """

    for record, response in zip(batch, responses):
        if 'ErrorCode' not in response:
            continue
        try:
            firehose_client.put_record(DeliveryStreamName=firehose_name,
                                       Record=record)
        except ClientError as e:
            logging.error(e)
            return False
    return True


def main():
    """Exercise Kinesis Firehose methods"""

    # Assign these values before running the program
    # If the specified IAM role does not exist, it will be created
    firehose_name = 'firehose_to_s3_stream'
    bucket_arn = 'arn:aws:s3:::BUCKET_NAME'
    iam_role_name = 'firehose_to_s3'

    # Set up logging
    logging.basicConfig(level=logging.DEBUG,
                        format='%(levelname)s: %(asctime)s: %(message)s')

    # If Firehose doesn't exist, create it
    if not firehose_exists(firehose_name):
        # Create a Firehose delivery stream to S3. The Firehose will receive
        # data from direct puts.
        firehose_arn = create_firehose_to_s3(firehose_name, bucket_arn, iam_role_name)
        if firehose_arn is None:
            sys.exit(1)
        logging.info(f'Created Firehose delivery stream to S3: {firehose_arn}')

        # Wait for the stream to become active
        if not wait_for_active_firehose(firehose_name):
            sys.exit(1)
        logging.info('Firehose stream is active')

    # Read the test data up front. islice stops quietly at end of file, so a
    # short file yields fewer records instead of raising StopIteration.
    test_data_file = 'kinesis_test_data.txt'
    with open(test_data_file, 'r') as f:
        single_records = list(itertools.islice(f, 20))
        batch = [{'Data': line} for line in itertools.islice(f, 200)]

    # Put records into the Firehose stream
    firehose_client = boto3.client('firehose')
    logging.info(f'Putting {len(single_records)} records into the Firehose one at a time')
    for line in single_records:
        try:
            firehose_client.put_record(DeliveryStreamName=firehose_name,
                                       Record={'Data': line})
        except ClientError as e:
            logging.error(e)
            sys.exit(1)

    # Put the remaining records in a batch (skipped when the file ran out,
    # because put_record_batch requires at least one record)
    if batch:
        logging.info(f'Putting {len(batch)} records into the Firehose in a batch')
        try:
            result = firehose_client.put_record_batch(DeliveryStreamName=firehose_name,
                                                      Records=batch)
        except ClientError as e:
            logging.error(e)
            sys.exit(1)

        # Did any records in the batch not get processed?
        # To exercise the resend path without a real failure, set
        # num_failures = 1 and add an 'ErrorCode' key to one entry of
        # result['RequestResponses'] before the check below.
        num_failures = result['FailedPutCount']
        if num_failures:
            logging.info(f'Resending {num_failures} failed records')
            if not _resend_failed_records(firehose_client, firehose_name,
                                          batch, result['RequestResponses']):
                sys.exit(1)

    logging.info('Test data sent to Firehose stream')


if __name__ == '__main__':
    # NOTE(review): the snippet is truncated right after this guard in the
    # source page; calling main() is the conventional body and is assumed.
    main()
Source:firehose_helpers.py  
# Import order is kept as in the original snippet: the star import could
# rebind names, so moving it relative to the other imports might change
# which objects `boto3` and `json` refer to.
import boto3
from LocalTime import *
import json


def stream_firehose_event(firehose_name, event_data):
    """Timestamp an event dict and send it to a Firehose stream as JSON

    :param firehose_name: Name of Firehose delivery stream
    :param event_data: Event dict; passed through add_timestamps_to_event
        before being serialized with json.dumps
    :return: Response returned by stream_firehose_string
    """
    event_data = add_timestamps_to_event(event_data)
    response = stream_firehose_string(firehose_name, json.dumps(event_data))
    return response


def stream_firehose_string(firehose_name, string_data):
    """Put a single string record into a Firehose stream

    The record and the service response are printed to stdout.

    :param firehose_name: Name of Firehose delivery stream
    :param string_data: Payload placed in the record's 'Data' field
    :return: Response from the Firehose put_record call
    """
    print("About to stream into firehose: " + firehose_name)
    firehose = boto3.client("firehose")
    record = {"Data": string_data}
    print("record=")
    print(record)
    response = firehose.put_record(DeliveryStreamName=firehose_name, Record=record)
    print(response)
    return response


def add_timestamps_to_event(event_data):
    """Add '@timestamp' and '@timestamp_local' to an event if they are missing

    The dict is modified in place; keys that are already present are left
    untouched.

    :param event_data: Event dict to update
    :return: The same dict (see the review note on the return statement)
    """
    local_time = LocalTime()
    if "@timestamp" not in event_data:
        event_data["@timestamp"] = local_time.get_utc_timestamp()
    if "@timestamp_local" not in event_data:
        event_data["@timestamp_local"] = local_time.get_local_timestamp()
    # NOTE(review): the source page truncates the snippet at this point.
    # stream_firehose_event assigns this function's result and serializes it,
    # so returning the dict is assumed - confirm against the full file.
    return event_data


# Learn to execute automation testing from scratch with LambdaTest Learning
# Hub. Right from setting up the prerequisites to run your first automation
# test, to following best practices and diving deeper into advanced test
# scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to
# help you be proficient with different test automation frameworks i.e.
# Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
