How to use firehose_name method in localstack

Best Python code snippet using localstack_python

firehose_to_s3.py

Source:firehose_to_s3.py Github

copy

Full Screen

import itertools
import json
import logging
import sys
import time

import boto3
from botocore.exceptions import ClientError


def get_firehose_arn(firehose_name):
    """Retrieve the ARN of the specified Firehose

    Any ClientError raised by the describe call (not only "stream not found")
    is logged and reported as None.

    :param firehose_name: Firehose stream name
    :return: If the Firehose stream exists, return ARN, else None
    """
    # Try to get the description of the Firehose
    firehose_client = boto3.client('firehose')
    try:
        result = firehose_client.describe_delivery_stream(
            DeliveryStreamName=firehose_name)
    except ClientError as e:
        logging.error(e)
        return None
    return result['DeliveryStreamDescription']['DeliveryStreamARN']


def firehose_exists(firehose_name):
    """Check if the specified Firehose exists

    :param firehose_name: Firehose stream name
    :return: True if Firehose exists, else False
    """
    return get_firehose_arn(firehose_name) is not None


def get_iam_role_arn(iam_role_name):
    """Retrieve the ARN of the specified IAM role

    Any ClientError raised by the lookup is logged and reported as None.

    :param iam_role_name: IAM role name
    :return: If the IAM role exists, return ARN, else None
    """
    # Try to retrieve information about the role
    iam_client = boto3.client('iam')
    try:
        result = iam_client.get_role(RoleName=iam_role_name)
    except ClientError as e:
        logging.error(e)
        return None
    return result['Role']['Arn']


def iam_role_exists(iam_role_name):
    """Check if the specified IAM role exists

    :param iam_role_name: IAM role name
    :return: True if IAM role exists, else False
    """
    return get_iam_role_arn(iam_role_name) is not None


def create_iam_role_for_firehose_to_s3(iam_role_name, s3_bucket,
                                       firehose_src_stream=None):
    """Create an IAM role for a Firehose delivery system to S3

    The role is created first and the inline policies are attached afterwards.
    If attaching a policy fails, None is returned but the already-created role
    is not removed.

    :param iam_role_name: Name of IAM role
    :param s3_bucket: ARN of S3 bucket
    :param firehose_src_stream: ARN of source Kinesis Data Stream. If
        Firehose data source is via direct puts then arg should be None.
    :return: ARN of IAM role. If error, returns None.
    """
    # Firehose trusted relationship policy document
    firehose_assume_role = {
        'Version': '2012-10-17',
        'Statement': [
            {
                'Sid': '',
                'Effect': 'Allow',
                'Principal': {
                    'Service': 'firehose.amazonaws.com'
                },
                'Action': 'sts:AssumeRole'
            }
        ]
    }
    iam_client = boto3.client('iam')
    try:
        result = iam_client.create_role(
            RoleName=iam_role_name,
            AssumeRolePolicyDocument=json.dumps(firehose_assume_role))
    except ClientError as e:
        logging.error(e)
        return None
    firehose_role_arn = result['Role']['Arn']

    # Define and attach a policy that grants sufficient S3 permissions
    policy_name = 'firehose_s3_access'
    s3_access = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "",
                "Effect": "Allow",
                "Action": [
                    "s3:AbortMultipartUpload",
                    "s3:GetBucketLocation",
                    "s3:GetObject",
                    "s3:ListBucket",
                    "s3:ListBucketMultipartUploads",
                    "s3:PutObject"
                ],
                "Resource": [
                    f"{s3_bucket}/*",
                    f"{s3_bucket}"
                ]
            }
        ]
    }
    try:
        iam_client.put_role_policy(RoleName=iam_role_name,
                                   PolicyName=policy_name,
                                   PolicyDocument=json.dumps(s3_access))
    except ClientError as e:
        logging.error(e)
        return None

    # If the Firehose source is a Kinesis data stream then access to the
    # stream must be allowed.
    if firehose_src_stream is not None:
        policy_name = 'firehose_kinesis_access'
        kinesis_access = {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Sid": "",
                    "Effect": "Allow",
                    "Action": [
                        "kinesis:DescribeStream",
                        "kinesis:GetShardIterator",
                        "kinesis:GetRecords"
                    ],
                    "Resource": [
                        f"{firehose_src_stream}"
                    ]
                }
            ]
        }
        try:
            iam_client.put_role_policy(RoleName=iam_role_name,
                                       PolicyName=policy_name,
                                       PolicyDocument=json.dumps(kinesis_access))
        except ClientError as e:
            logging.error(e)
            return None

    # Return the ARN of the created IAM role
    return firehose_role_arn


def create_firehose_to_s3(firehose_name, s3_bucket_arn, iam_role_name,
                          firehose_src_type='DirectPut',
                          firehose_src_stream=None):
    """Create a Kinesis Firehose delivery stream to S3

    The data source can be either a Kinesis Data Stream or puts sent directly
    to the Firehose stream.

    :param firehose_name: Delivery stream name
    :param s3_bucket_arn: ARN of S3 bucket
    :param iam_role_name: Name of Firehose-to-S3 IAM role. If the role doesn't
        exist, it is created.
    :param firehose_src_type: 'DirectPut' or 'KinesisStreamAsSource'
    :param firehose_src_stream: ARN of source Kinesis Data Stream. Required if
        firehose_src_type is 'KinesisStreamAsSource'
    :return: ARN of Firehose delivery stream. If error, returns None.
    """
    uses_kinesis_source = firehose_src_type == 'KinesisStreamAsSource'

    # Validate before touching IAM so a bad call has no side effects. Without
    # this check a None ARN reaches boto3, which rejects it with an exception
    # that is not a ClientError and would escape the handler below.
    if uses_kinesis_source and firehose_src_stream is None:
        logging.error('firehose_src_stream is required when firehose_src_type '
                      'is KinesisStreamAsSource')
        return None

    # Look the role up once; create it only if the lookup found nothing.
    iam_role = get_iam_role_arn(iam_role_name)
    if iam_role is None:
        iam_role = create_iam_role_for_firehose_to_s3(iam_role_name,
                                                      s3_bucket_arn,
                                                      firehose_src_stream)
        if iam_role is None:
            # Error creating IAM role
            return None
        # NOTE(review): a freshly created role may not be usable by Firehose
        # immediately; confirm whether callers need to retry on failure.

    # Create the S3 configuration dictionary
    # Both BucketARN and RoleARN are required
    # Set the buffer interval=60 seconds (Default=300 seconds)
    s3_config = {
        'BucketARN': s3_bucket_arn,
        'RoleARN': iam_role,
        'BufferingHints': {
            'IntervalInSeconds': 60,
        },
    }

    # Arguments shared by both source types
    stream_args = {
        'DeliveryStreamName': firehose_name,
        'DeliveryStreamType': firehose_src_type,
        'ExtendedS3DestinationConfiguration': s3_config,
    }
    if uses_kinesis_source:
        # Define the Kinesis Data Stream configuration
        stream_args['KinesisStreamSourceConfiguration'] = {
            'KinesisStreamARN': firehose_src_stream,
            'RoleARN': iam_role,
        }

    # Create the delivery stream
    firehose_client = boto3.client('firehose')
    try:
        result = firehose_client.create_delivery_stream(**stream_args)
    except ClientError as e:
        logging.error(e)
        return None
    return result['DeliveryStreamARN']


def wait_for_active_firehose(firehose_name):
    """Wait until the Firehose delivery stream is active

    Polls the stream status every 2 seconds. Polling stops as soon as the
    stream is active, is being deleted, reports a failed state, or the
    describe call raises a ClientError.

    :param firehose_name: Name of Firehose delivery stream
    :return: True if delivery stream is active. Otherwise, False.
    """
    # Statuses from which the stream will never become ACTIVE on its own;
    # without this check the loop below would poll forever.
    failed_statuses = ('CREATING_FAILED', 'DELETING_FAILED')

    firehose_client = boto3.client('firehose')
    while True:
        try:
            # Get the stream's current status
            result = firehose_client.describe_delivery_stream(
                DeliveryStreamName=firehose_name)
        except ClientError as e:
            logging.error(e)
            return False
        status = result['DeliveryStreamDescription']['DeliveryStreamStatus']
        if status == 'ACTIVE':
            return True
        if status == 'DELETING':
            logging.error(f'Firehose delivery stream {firehose_name} is being deleted.')
            return False
        if status in failed_statuses:
            logging.error(f'Firehose delivery stream {firehose_name} is in state {status}.')
            return False
        time.sleep(2)


def main():
    """Exercise Kinesis Firehose methods"""
    # Assign these values before running the program
    # If the specified IAM role does not exist, it will be created
    firehose_name = 'firehose_to_s3_stream'
    bucket_arn = 'arn:aws:s3:::BUCKET_NAME'
    iam_role_name = 'firehose_to_s3'

    # Set up logging
    logging.basicConfig(level=logging.DEBUG,
                        format='%(levelname)s: %(asctime)s: %(message)s')

    # If Firehose doesn't exist, create it
    if not firehose_exists(firehose_name):
        # Create a Firehose delivery stream to S3. The Firehose will receive
        # data from direct puts.
        firehose_arn = create_firehose_to_s3(firehose_name, bucket_arn, iam_role_name)
        if firehose_arn is None:
            sys.exit(1)
        logging.info(f'Created Firehose delivery stream to S3: {firehose_arn}')

    # Wait for the stream to become active. This also covers a stream that
    # already existed but is still being created from an earlier run.
    if not wait_for_active_firehose(firehose_name):
        sys.exit(1)
    logging.info('Firehose stream is active')

    # Read the test data: 20 records to send one at a time, then up to 200
    # more to send as a single batch. islice stops quietly at end of file,
    # so a short test file no longer raises StopIteration.
    test_data_file = 'kinesis_test_data.txt'
    with open(test_data_file, 'r') as f:
        single_lines = list(itertools.islice(f, 20))
        batch = [{'Data': line} for line in itertools.islice(f, 200)]
    if len(single_lines) < 20 or len(batch) < 200:
        logging.warning(f'{test_data_file} holds fewer than 220 records; '
                        'sending only what is available')

    firehose_client = boto3.client('firehose')

    # Put records into the Firehose stream
    logging.info('Putting 20 records into the Firehose one at a time')
    for line in single_lines:
        try:
            firehose_client.put_record(DeliveryStreamName=firehose_name,
                                       Record={'Data': line})
        except ClientError as e:
            logging.error(e)
            sys.exit(1)

    # Put 200 records in a batch (skipped when no records are left to send)
    if batch:
        logging.info('Putting 200 records into the Firehose in a batch')
        try:
            result = firehose_client.put_record_batch(DeliveryStreamName=firehose_name,
                                                      Records=batch)
        except ClientError as e:
            logging.error(e)
            sys.exit(1)

        # Did any records in the batch not get processed?
        # To simulate a failed record while testing, set num_failures = 1 and
        # add an 'ErrorCode' entry (e.g. 404) to one of
        # result['RequestResponses'] before the check below.
        num_failures = result['FailedPutCount']
        if num_failures:
            # Resend failed records. Responses are matched to the submitted
            # records by position in the batch.
            logging.info(f'Resending {num_failures} failed records')
            failed_records = [
                record
                for record, response in zip(batch, result['RequestResponses'])
                if 'ErrorCode' in response
            ]
            for record in failed_records:
                try:
                    firehose_client.put_record(DeliveryStreamName=firehose_name,
                                               Record=record)
                except ClientError as e:
                    logging.error(e)
                    sys.exit(1)

    logging.info('Test data sent to Firehose stream')


if __name__ == '__main__':
    # NOTE(review): the listing is truncated right after this guard; calling
    # main() is the presumed body -- confirm against the full file.
    main()

Full Screen

Full Screen

firehose_helpers.py

Source:firehose_helpers.py Github

copy

Full Screen

import boto3
# NOTE(review): the star import and the import order are kept as in the
# listing because the rest of this file is not visible; LocalTime() below is
# the only name seen to come from it.
from LocalTime import *
import json


def stream_firehose_event(firehose_name, event_data):
    """Add timestamps to an event and send it to a Firehose stream as JSON.

    :param firehose_name: Name of the Firehose delivery stream
    :param event_data: Event mapping; missing timestamp keys are added to it
        in place before it is serialized with json.dumps
    :return: Response returned by the Firehose put_record call
    """
    event_data = add_timestamps_to_event(event_data)
    response = stream_firehose_string(firehose_name, json.dumps(event_data))
    return response


def stream_firehose_string(firehose_name, string_data):
    """Send one string as a single record to a Firehose stream.

    Prints the stream name, the record and the response to stdout.

    :param firehose_name: Name of the Firehose delivery stream
    :param string_data: Payload sent as the record's 'Data'
    :return: Response returned by the Firehose put_record call
    """
    print("About to stream into firehose: " + firehose_name)
    firehose = boto3.client("firehose")
    record = {"Data": string_data}
    print("record=")
    print(record)
    response = firehose.put_record(DeliveryStreamName=firehose_name, Record=record)
    print(response)
    return response


def add_timestamps_to_event(event_data):
    """Fill in "@timestamp" and "@timestamp_local" if the event lacks them.

    Existing values are left untouched. The mapping is modified in place.

    :param event_data: Event mapping to update
    :return: The same mapping that was passed in
    """
    local_time = LocalTime()
    if "@timestamp" not in event_data:
        event_data["@timestamp"] = local_time.get_utc_timestamp()
    if "@timestamp_local" not in event_data:
        event_data["@timestamp_local"] = local_time.get_local_timestamp()
    # NOTE(review): the listing is truncated at this point. The return is
    # restored because stream_firehose_event uses this function's result;
    # confirm against the full file.
    return event_data

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful