How to use firehose_client method in localstack

Best Python code snippet using localstack_python

firehose_to_s3.py

Source:firehose_to_s3.py Github

copy

Full Screen

# snippet-comment:[These are tags for the AWS doc team's sample catalog. Do not remove.]
# snippet-sourcedescription:[firehose_to_s3.py demonstrates how to create and use an Amazon Kinesis Data Firehose delivery stream to Amazon S3.]
# snippet-service:[firehose]
# snippet-keyword:[Amazon Kinesis Data Firehose]
# snippet-keyword:[Python]
# snippet-sourcesyntax:[python]
# snippet-sourcesyntax:[python]
# snippet-keyword:[Code Sample]
# snippet-sourcetype:[snippet]
# snippet-sourcedate:[2019-05-15]
# snippet-sourceauthor:[AWS]

# Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.

import json
import logging
import sys
import time
from itertools import islice

import boto3
from botocore.exceptions import ClientError


def get_firehose_arn(firehose_name):
    """Retrieve the ARN of the specified Firehose

    :param firehose_name: Firehose stream name
    :return: If the Firehose stream exists, return ARN, else None
    """

    # Try to get the description of the Firehose
    firehose_client = boto3.client('firehose')
    try:
        result = firehose_client.describe_delivery_stream(DeliveryStreamName=firehose_name)
    except ClientError as e:
        logging.error(e)
        return None
    return result['DeliveryStreamDescription']['DeliveryStreamARN']


def firehose_exists(firehose_name):
    """Check if the specified Firehose exists

    :param firehose_name: Firehose stream name
    :return: True if Firehose exists, else False
    """

    # The stream exists exactly when its description (and ARN) can be fetched
    return get_firehose_arn(firehose_name) is not None


def get_iam_role_arn(iam_role_name):
    """Retrieve the ARN of the specified IAM role

    :param iam_role_name: IAM role name
    :return: If the IAM role exists, return ARN, else None
    """

    # Try to retrieve information about the role
    iam_client = boto3.client('iam')
    try:
        result = iam_client.get_role(RoleName=iam_role_name)
    except ClientError as e:
        logging.error(e)
        return None
    return result['Role']['Arn']


def iam_role_exists(iam_role_name):
    """Check if the specified IAM role exists

    :param iam_role_name: IAM role name
    :return: True if IAM role exists, else False
    """

    # The role exists exactly when its ARN can be retrieved
    return get_iam_role_arn(iam_role_name) is not None


def create_iam_role_for_firehose_to_s3(iam_role_name, s3_bucket,
                                       firehose_src_stream=None):
    """Create an IAM role for a Firehose delivery system to S3

    The role trusts the Firehose service, gets an inline policy granting
    access to the S3 bucket and, when a source stream is given, a second
    inline policy granting read access to that Kinesis data stream.

    :param iam_role_name: Name of IAM role
    :param s3_bucket: ARN of S3 bucket
    :param firehose_src_stream: ARN of source Kinesis Data Stream. If
        Firehose data source is via direct puts then arg should be None.
    :return: ARN of IAM role. If error, returns None.
    """

    # Firehose trusted relationship policy document
    firehose_assume_role = {
        'Version': '2012-10-17',
        'Statement': [
            {
                'Sid': '',
                'Effect': 'Allow',
                'Principal': {
                    'Service': 'firehose.amazonaws.com'
                },
                'Action': 'sts:AssumeRole'
            }
        ]
    }
    iam_client = boto3.client('iam')
    try:
        result = iam_client.create_role(RoleName=iam_role_name,
                                        AssumeRolePolicyDocument=json.dumps(firehose_assume_role))
    except ClientError as e:
        logging.error(e)
        return None
    firehose_role_arn = result['Role']['Arn']

    # Define and attach a policy that grants sufficient S3 permissions
    policy_name = 'firehose_s3_access'
    s3_access = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "",
                "Effect": "Allow",
                "Action": [
                    "s3:AbortMultipartUpload",
                    "s3:GetBucketLocation",
                    "s3:GetObject",
                    "s3:ListBucket",
                    "s3:ListBucketMultipartUploads",
                    "s3:PutObject"
                ],
                "Resource": [
                    f"{s3_bucket}/*",
                    f"{s3_bucket}"
                ]
            }
        ]
    }
    try:
        iam_client.put_role_policy(RoleName=iam_role_name,
                                   PolicyName=policy_name,
                                   PolicyDocument=json.dumps(s3_access))
    except ClientError as e:
        logging.error(e)
        return None

    # If the Firehose source is a Kinesis data stream then access to the
    # stream must be allowed.
    if firehose_src_stream is not None:
        policy_name = 'firehose_kinesis_access'
        kinesis_access = {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Sid": "",
                    "Effect": "Allow",
                    "Action": [
                        "kinesis:DescribeStream",
                        "kinesis:GetShardIterator",
                        "kinesis:GetRecords"
                    ],
                    "Resource": [
                        f"{firehose_src_stream}"
                    ]
                }
            ]
        }
        try:
            iam_client.put_role_policy(RoleName=iam_role_name,
                                       PolicyName=policy_name,
                                       PolicyDocument=json.dumps(kinesis_access))
        except ClientError as e:
            logging.error(e)
            return None

    # Return the ARN of the created IAM role
    return firehose_role_arn


def create_firehose_to_s3(firehose_name, s3_bucket_arn, iam_role_name,
                          firehose_src_type='DirectPut',
                          firehose_src_stream=None):
    """Create a Kinesis Firehose delivery stream to S3

    The data source can be either a Kinesis Data Stream or puts sent directly
    to the Firehose stream.

    :param firehose_name: Delivery stream name
    :param s3_bucket_arn: ARN of S3 bucket
    :param iam_role_name: Name of Firehose-to-S3 IAM role. If the role doesn't
        exist, it is created.
    :param firehose_src_type: 'DirectPut' or 'KinesisStreamAsSource'
    :param firehose_src_stream: ARN of source Kinesis Data Stream. Required if
        firehose_src_type is 'KinesisStreamAsSource'
    :return: ARN of Firehose delivery stream. If error, returns None.
    """

    use_kinesis_source = firehose_src_type == 'KinesisStreamAsSource'

    # Reject the documented-as-invalid combination up front so it is reported
    # through the usual "log and return None" path instead of reaching boto3
    # with a None stream ARN.
    if use_kinesis_source and firehose_src_stream is None:
        logging.error("firehose_src_stream is required when firehose_src_type "
                      "is 'KinesisStreamAsSource'")
        return None

    # Look the role up once; create it only if the lookup found nothing
    iam_role = get_iam_role_arn(iam_role_name)
    if iam_role is None:
        iam_role = create_iam_role_for_firehose_to_s3(iam_role_name,
                                                      s3_bucket_arn,
                                                      firehose_src_stream)
        if iam_role is None:
            # Error creating IAM role
            return None

    # Create the S3 configuration dictionary
    # Both BucketARN and RoleARN are required
    # Set the buffer interval=60 seconds (Default=300 seconds)
    s3_config = {
        'BucketARN': s3_bucket_arn,
        'RoleARN': iam_role,
        'BufferingHints': {
            'IntervalInSeconds': 60,
        },
    }

    # Arguments shared by both source types
    create_args = {
        'DeliveryStreamName': firehose_name,
        'DeliveryStreamType': firehose_src_type,
        'ExtendedS3DestinationConfiguration': s3_config,
    }
    if use_kinesis_source:
        # Define the Kinesis Data Stream configuration
        create_args['KinesisStreamSourceConfiguration'] = {
            'KinesisStreamARN': firehose_src_stream,
            'RoleARN': iam_role,
        }

    # Create the delivery stream
    firehose_client = boto3.client('firehose')
    try:
        result = firehose_client.create_delivery_stream(**create_args)
    except ClientError as e:
        logging.error(e)
        return None
    return result['DeliveryStreamARN']


def wait_for_active_firehose(firehose_name):
    """Wait until the Firehose delivery stream is active

    Polls the stream description every two seconds. Polling stops as soon as
    the stream is ACTIVE, is being deleted, reports a failed create/delete,
    or the describe call itself fails.

    :param firehose_name: Name of Firehose delivery stream
    :return: True if delivery stream is active. Otherwise, False.
    """

    firehose_client = boto3.client('firehose')
    while True:
        try:
            # Get the stream's current status
            result = firehose_client.describe_delivery_stream(DeliveryStreamName=firehose_name)
        except ClientError as e:
            logging.error(e)
            return False
        status = result['DeliveryStreamDescription']['DeliveryStreamStatus']
        if status == 'ACTIVE':
            return True
        if status == 'DELETING':
            logging.error(f'Firehose delivery stream {firehose_name} is being deleted.')
            return False
        if status in ('CREATING_FAILED', 'DELETING_FAILED'):
            # Terminal failure states: the stream will never become ACTIVE,
            # so polling further would loop forever.
            logging.error(f'Firehose delivery stream {firehose_name} is in state {status}.')
            return False
        time.sleep(2)


def _resend_failed_records(firehose_client, firehose_name, batch, responses):
    """Resend, one at a time, every batch record whose response has an ErrorCode

    :param firehose_client: boto3 Firehose client
    :param firehose_name: Name of Firehose delivery stream
    :param batch: Records that were passed to put_record_batch
    :param responses: 'RequestResponses' list returned by put_record_batch;
        entry i is paired with batch[i]
    :return: True if every failed record was resent. Otherwise, False.
    """

    for record, response in zip(batch, responses):
        if 'ErrorCode' not in response:
            continue
        try:
            firehose_client.put_record(DeliveryStreamName=firehose_name,
                                       Record=record)
        except ClientError as e:
            logging.error(e)
            return False
    return True


def main():
    """Exercise Kinesis Firehose methods"""

    # Assign these values before running the program
    # If the specified IAM role does not exist, it will be created
    firehose_name = 'firehose_to_s3_stream'
    bucket_arn = 'arn:aws:s3:::BUCKET_NAME'
    iam_role_name = 'firehose_to_s3'

    # Set up logging
    logging.basicConfig(level=logging.DEBUG,
                        format='%(levelname)s: %(asctime)s: %(message)s')

    # If Firehose doesn't exist, create it
    if not firehose_exists(firehose_name):
        # Create a Firehose delivery stream to S3. The Firehose will receive
        # data from direct puts.
        firehose_arn = create_firehose_to_s3(firehose_name, bucket_arn, iam_role_name)
        if firehose_arn is None:
            sys.exit(1)
        logging.info(f'Created Firehose delivery stream to S3: {firehose_arn}')

        # Wait for the stream to become active
        if not wait_for_active_firehose(firehose_name):
            sys.exit(1)
        logging.info('Firehose stream is active')

    # Put records into the Firehose stream
    test_data_file = 'kinesis_test_data.txt'
    firehose_client = boto3.client('firehose')
    with open(test_data_file, 'r') as f:
        logging.info('Putting 20 records into the Firehose one at a time')
        # islice stops quietly at end-of-file instead of raising StopIteration
        # when the test file is shorter than expected
        for line in islice(f, 20):
            # Put the record into the Firehose stream
            try:
                firehose_client.put_record(DeliveryStreamName=firehose_name,
                                           Record={'Data': line})
            except ClientError as e:
                logging.error(e)
                sys.exit(1)

        # Put 200 records in a batch
        logging.info('Putting 200 records into the Firehose in a batch')
        batch = [{'Data': line} for line in islice(f, 200)]  # Read up to 200 records

    if batch:
        # Put the batch into the Firehose stream
        try:
            result = firehose_client.put_record_batch(DeliveryStreamName=firehose_name,
                                                      Records=batch)
        except ClientError as e:
            logging.error(e)
            sys.exit(1)

        # Did any records in the batch not get processed?
        num_failures = result['FailedPutCount']
        # Test: to simulate a failed record, uncomment the following lines
        # num_failures = 1
        # failed_rec_index = 3
        # result['RequestResponses'][failed_rec_index]['ErrorCode'] = 404
        if num_failures:
            # Resend failed records
            logging.info(f'Resending {num_failures} failed records')
            if not _resend_failed_records(firehose_client, firehose_name,
                                          batch, result['RequestResponses']):
                sys.exit(1)
    else:
        # An empty Records list cannot be sent, so skip the batch call
        logging.warning(f'No records left in {test_data_file} for the batch put')
    logging.info('Test data sent to Firehose stream')


if __name__ == '__main__':
    main()

Full Screen

Full Screen

kinesis_stream.py

Source:kinesis_stream.py Github

copy

Full Screen

import boto3

from data_fetch import Data_Fetch
from secret import aws_key_id, aws_secret_key


class Fire_Hose:
    """Thin wrapper around a boto3 Firehose client bound to one delivery stream."""

    def __init__(self, stream_name='nasa-datahose-02', region_name='us-east-1'):
        """Create the Firehose client.

        :param stream_name: Name of the delivery stream used by
            add_to_stream() and check_streams()
        :param region_name: AWS region the client connects to
        """
        self.stream_name = stream_name
        # Credentials come from the project-local `secret` module
        self.firehose_client = boto3.client(
            'firehose',
            region_name=region_name,
            aws_access_key_id=aws_key_id,
            aws_secret_access_key=aws_secret_key,
        )

    def add_to_stream(self, json_entry):
        """Put a single record into the delivery stream.

        :param json_entry: Record payload, passed unchanged as the record's
            'Data' (presumably a JSON string from Data_Fetch.access_api() --
            verify against that module)
        """
        self.firehose_client.put_record(
            DeliveryStreamName=self.stream_name,
            Record={'Data': json_entry},
        )

    def check_streams(self):
        """Print the description of the delivery stream."""
        print(self.firehose_client.describe_delivery_stream(
            DeliveryStreamName=self.stream_name
        ))
        # print(self.firehose_client.list_delivery_streams())


def main():
    """Fetch one entry through Data_Fetch, print it, and put it on the stream."""
    fetch = Data_Fetch()
    hose = Fire_Hose()
    json_entry = fetch.access_api()
    print(json_entry)
    hose.add_to_stream(json_entry)
    # hose.check_streams()


if __name__ == '__main__':
    main()

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful