Best Python code snippet using localstack_python
manage.py
Source:manage.py  
1#!/usr/bin/env python2# -*- coding: utf-8 -*-3import base644import boto35import json6import os7import re8import sys9from elasticsearch import Elasticsearch, RequestsHttpConnection, TransportError10from manager import Manager11from aws_requests_auth.boto_utils import BotoAWSRequestsAuth12manager = Manager()13boto_session = boto3.Session()14es = {}15def get_es_conn(es_domain_arn):16    if es_domain_arn not in es:17        es_domain = re.sub(18            r'^arn:aws:es:[a-z]*-[a-z]*-[0-9]*:[0-9]*:domain/(.*)$',19            r'\1',20            es_domain_arn)21        aws_es = boto_session.client('es')22        es_config = aws_es.describe_elasticsearch_domain(DomainName=es_domain)23        awsauth = BotoAWSRequestsAuth(24            aws_host=es_config['DomainStatus']['Endpoint'],25            aws_region=boto_session.region_name,26            aws_service='es'27        )28        es.update({es_domain_arn:29            Elasticsearch(30                hosts=[{'host': es_config['DomainStatus']['Endpoint'], 'port': 443}],31                http_auth=awsauth,32                use_ssl=True,33                verify_certs=True,34                connection_class=RequestsHttpConnection,35                retry_on_timeout=True36            )37        })38    return es[es_domain_arn]39def get_firehose_config(stream=None):40    if stream is None and not os.environ.get('STREAM_NAME', False):41        print('No stream name passed and STREAM_NAME not set in environment')42        sys.exit(1)43    firehose = boto_session.client('firehose')44    # Intentionally not catching the ResourceNotFoundException45    try:46        exists_resp = firehose.describe_delivery_stream(DeliveryStreamName=stream)47        if 'DeliveryStreamDescription' in exists_resp:48            return exists_resp['DeliveryStreamDescription']49    except firehose.exceptions.ResourceNotFoundException as e:50        print(e.message)51    except Exception:52        print(e.message)53    print('Something went wrong in retrieving the Firehose Stream configuration')54    sys.exit(1)55def get_s3info_from_config(config):56    info = {}57    if 'S3BackupMode' not in config:58        return info59    # S3 and Redshift destinations use this key60    if config['S3BackupMode'] == 'enabled':61        info.update({62            'ARN': config['S3BackupDescription']['BucketARN'],63            'prefix': config['S3BackupDescription']['Prefix'],64            'bucket': config['S3BackupDescription']['BucketARN'].replace('arn:aws:s3:::','')65        })66    # ES destinations use this key67    elif config['S3BackupMode'] == 'FailedDocumentsOnly':68        info.update({69            'ARN': config['S3DestinationDescription']['BucketARN'],70            'prefix': config['S3DestinationDescription']['Prefix'],71            'bucket': config['S3DestinationDescription']['BucketARN'].replace('arn:aws:s3:::','')72        })73    return info74@manager.command75def review(stream, pattern=None, year=None, month=None, day=None):76    """77    Review the list of failures from a Kinesis Firehose Stream78    """79    stream_config = get_firehose_config(stream)80    buckets = _extract_s3_info(stream_config)81    if len(buckets) < 1:82        print('Nothing to review, no stream destinations have S3 backups enabled.')83        sys.exit(1)84    elif len(buckets) > 1:85        print('WARNING: Review is possible, but cannot resubmit to firehose for delivery since there are multiple destinations defined on the stream.')86        print('Some destinations may have already accepted the records.')87    for destination in buckets:88        print('Keys in {}'.format(destination['bucket']))89        prefix = _build_prefix(destination, year=year, month=month, day=day)90        print('Prefix: {}'.format(prefix))91        bucket = boto_session.resource('s3').Bucket(destination['bucket'])92        bucketlist = bucket.objects.filter(Prefix=prefix)93        for subobj in sorted(bucketlist, key=lambda k: k.last_modified):94            display_key = subobj.key.replace(prefix, '')95            if pattern:96                if subobj.key.contains(pattern):97                    print('  {}'.format(display_key))98            else:99                print('  {}'.format(display_key))100def _build_prefix(s3_destination, year=None, month=None, day=None):101    prefix = s3_destination['prefix']102    if year:103        if prefix.endswith('/'):104            prefix = prefix[:-1]105        prefix = '{}/elasticsearch-failed/{}'.format(prefix, year)106        if month:107            if int(month) < 10:108                month = '0{}'.format(int(month))109            prefix = '{}/{}'.format(prefix, month)110            if day:111                if int(day) < 10:112                    day = '0{}'.format(int(day))113                prefix = '{}/{}'.format(prefix, day)114        prefix = '{}/'.format(prefix)115    return prefix116def _extract_s3_info(stream_config, es_only=False):117    buckets = []118    for destination in stream_config['Destinations']:119        if 'ExtendedS3DestinationDescription' in destination:120            info = get_s3info_from_config(destination['ExtendedS3DestinationDescription'])121            if info:122                buckets.append(info)123        if 'RedshiftDestinationDescription' in destination:124            info = get_s3info_from_config(destination['RedshiftDestinationDescription'])125            if info:126                buckets.append(info)127        if 'ElasticsearchDestinationDescription' in destination:128            info = get_s3info_from_config(destination['ElasticsearchDestinationDescription'])129            if info:130                buckets.append(info)131    return buckets132@manager.command133def show(stream, key, year=None, month=None, day=None):134    """135    Show the contents of a single failure report. Needs S3 key as provided by the review command136    """137    stream_config = get_firehose_config(stream)138    try:139        s3_info = _extract_s3_info(stream_config, es_only=True)[0]140    except IndexError:141        print('Cannot resubmit to ES, no ES destination defined in stream config.')142        sys.exit(1)143    prefix = _build_prefix(s3_info, year=year, month=month, day=day)144    print(json.dumps(get_failure_report(s3_info['bucket'], '{}{}'.format(prefix, key)), indent=4))145def get_failure_report(bucket, key):146    """147    Retrieves and decodes a failure report for display or resubmission148    """149    obj = boto_session.resource('s3').Object(bucket, key).get()150    body = obj['Body'].read()151    lines = []152    for line in body.split('\n'):153        if not line:154            continue155        try:156            newline = json.loads(line.replace('\r',''))157            newline['rawData'] = json.loads(base64.b64decode(newline['rawData']))158            lines.append(newline)159        except ValueError as e:160            print(line)161            raise e162        except TypeError as e1:163            print(line)164            raise e1165    return lines166@manager.command167def resubmit_to_es(stream, year=None, month=None, day=None):168    """169    Resubmit a day of failed records to ElasticSearch170    """171    stream_config = get_firehose_config(stream)172    try:173        s3_info = _extract_s3_info(stream_config, es_only=True)[0]174    except IndexError:175        print('Cannot resubmit to ES, no ES destination defined in stream config.')176        sys.exit(1)177    prefix = _build_prefix(s3_info, year=year, month=month, day=day)178    print('Prefix: {}'.format(prefix))179    bucket = boto_session.resource('s3').Bucket(s3_info['bucket'])180    bucketlist = bucket.objects.filter(Prefix=prefix)181    # Get list in reverse, because typically when processing a full month, we182    # want the most recent entries populated fastest183    for subobj in sorted(bucketlist, key=lambda k: k.last_modified, reverse=True):184        print('Attempting to resubmit: {}'.format(subobj.key))185        _resubmit_to_es(s3_info['bucket'], subobj.key, stream_config)186def _resubmit_to_es(bucket, key, stream_config):187    """188    Resubmit a failed record to ElasticSearch189    """190    es_destination_arn = None191    for destination in stream_config['Destinations']:192        if 'ElasticsearchDestinationDescription' in destination:193            es_destination_arn = destination['ElasticsearchDestinationDescription']['DomainARN']194            break195    if not es_destination_arn:196        print('There is no elasticsearch destination configured for stream {}'.format(stream))197        sys.exit(1)198    report = get_failure_report(bucket, key)199    es_conn = get_es_conn(es_destination_arn)200    all_resubmitted = True201    for failures in report:202        try:203            if 'esIndexName' not in failures:204                continue205            exception_message = failures['rawData'].get('response', {}).get('exception', {}).get('message', {})206            if exception_message:207                if not isinstance(exception_message, str):208                    failures['rawData']['response']['exception']['raw_message'] = json.dumps(exception_message)209                else:210                    failures['rawData']['response']['exception']['raw_message'] = exception_message211                failures['rawData']['response']['exception'].pop('message')212            request_data = failures['rawData'].get('request', {}).get('data', {})213            if request_data:214                if not isinstance(request_data, str):215                    failures['rawData']['request']['raw_data'] = json.dumps(request_data)216                else:217                    failures['rawData']['request']['raw_data'] = request_data218                failures['rawData']['request'].pop('data')219            print('Resubmitting document id: {}'.format(failures['esDocumentId']))220            response = es_conn.create(221                index=failures['esIndexName'],222                doc_type=failures['esTypeName'],223                body=failures['rawData'],224                id=failures['esDocumentId']225            )226        except TransportError as e:227            if e.status_code == 409 and e.error == 'version_conflict_engine_exception':228                pass229            else:230                all_resubmitted = False231                print('An transport error occurred resubmitting the data for {}: {}'.format(failures['esDocumentId'], e))232        except Exception as e:233            print('An error occurred resubmitting the data for {}: {}'.format(failures['esDocumentId'], e))234            all_resubmitted = False235    if all_resubmitted:236        boto_session.resource('s3').Object(bucket, key).delete()237        print('Removed {}'.format(key))238    else:239        print('Did not remove {}'.format(key))240if __name__ == '__main__':...kinesis_firehose_stream.py
Source:kinesis_firehose_stream.py  
1from typing import List, Optional2from cloudrail.knowledge.context.aws.resources.networking_config.network_configuration import NetworkConfiguration3from cloudrail.knowledge.context.aws.resources.networking_config.network_entity import NetworkEntity4from cloudrail.knowledge.context.aws.resources.service_name import AwsServiceName5from cloudrail.knowledge.utils.tags_utils import filter_tags6class KinesisFirehoseStream(NetworkEntity):7    """8        Attributes:9            stream_name: The name of the Kinesis Firehose Stream.10            stream_arn: The ARN of the Kinesis Firehose Stream.11            encrypted_at_rest: True if the stream is set to be encrypted.12            es_domain_arn: The ARN of the related ElasticSearch Domain, if any.13            es_vpc_config: The VPC configuration of the ElasticSearch Domain related14                to this stream, if any.15    """16    def __init__(self,17                 stream_name: str,18                 stream_arn: str,19                 encrypted_at_rest: bool,20                 account: str,21                 region: str,22                 es_domain_arn: Optional[str],23                 es_vpc_config: Optional[NetworkConfiguration]):24        super().__init__(stream_name, account, region, AwsServiceName.AWS_KINESIS_FIREHOSE_DELIVERY_STREAM)25        self.stream_name: str = stream_name26        self.stream_arn: str = stream_arn27        self.encrypted_at_rest: bool = encrypted_at_rest28        self.es_domain_arn: Optional[str] = es_domain_arn29        self.es_vpc_config: Optional[NetworkConfiguration] = es_vpc_config30    def get_keys(self) -> List[str]:31        return [self.stream_arn]32    def get_name(self) -> str:33        return self.stream_name34    def get_arn(self) -> str:35        return self.stream_arn36    def get_type(self, is_plural: bool = False) -> str:37        if not is_plural:38            return 'Kinesis Data Firehose'39        else:40            return 'Kinesis Data Firehoses'41    def get_all_network_configurations(self) -> Optional[List[NetworkConfiguration]]:42        if self.es_vpc_config:43            return [NetworkConfiguration(self.es_vpc_config.assign_public_ip,44                                         self.es_vpc_config.security_groups_ids,45                                         self.es_vpc_config.subnet_list_ids)]46        else:47            return None48    def get_cloud_resource_url(self) -> str:49        return '{0}firehose/home?region={1}#/details/{2}' \50            .format(self.AWS_CONSOLE_URL, self.region, self.stream_name)51    @property52    def is_tagable(self) -> bool:53        return True54    def to_drift_detection_object(self) -> dict:55        return {'tags': filter_tags(self.tags), 'stream_name': self.stream_name,56                'encrypted_at_rest': self.encrypted_at_rest,57                'es_domain_arn': self.es_domain_arn,58                'assign_public_ip': self.es_vpc_config and self.es_vpc_config.assign_public_ip,59                'security_groups_ids': self.es_vpc_config and self.es_vpc_config.security_groups_ids,...app.py
Source:app.py  
1#!/usr/bin/env python32from aws_cdk import core3from aws_es_recommended_cw_alarms.aws_es_recommended_cw_alarms_stack import (4    AwsEsRecommendedCwAlarmsStack,5)6import os7import sys8app = core.App()9CFN_STACK_NAME, ES_DOMAIN_ARN, AWS_PROFILE, CW_TRIGGER_SNS_ARN_LIST, ENABLE_ES_API_OUTPUT, ES_API_OUTPUT_SNS_ARN = (10    sys.argv[1],11    sys.argv[2],12    sys.argv[3],13    None if sys.argv[4] == 'None' else sys.argv[4].split(","),14    sys.argv[5] == "True",15    None if sys.argv[6] == 'None' else sys.argv[6],16)17AwsEsRecommendedCwAlarmsStack(18    app,19    CFN_STACK_NAME,20    ES_DOMAIN_ARN,21    AWS_PROFILE,22    CW_TRIGGER_SNS_ARN_LIST,23    ENABLE_ES_API_OUTPUT,24    ES_API_OUTPUT_SNS_ARN,25    env={"account": ES_DOMAIN_ARN.split(":")[4], "region": ES_DOMAIN_ARN.split(":")[3]},26)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
