Best Python code snippet using localstack_python
provider.py
Source:provider.py  
...112    # static tagging service instance113    TAGS = TaggingService()114    def __init__(self):115        self.delivery_streams = {}116def _get_description_or_raise_not_found(117    context, delivery_stream_name: str118) -> DeliveryStreamDescription:119    region = FirehoseBackend.get()120    delivery_stream_description = region.delivery_streams.get(delivery_stream_name)121    if not delivery_stream_description:122        raise ResourceNotFoundException(123            f"Firehose {delivery_stream_name} under account {context.account_id} " f"not found."124        )125    return delivery_stream_description126class FirehoseProvider(FirehoseApi):127    def create_delivery_stream(128        self,129        context: RequestContext,130        delivery_stream_name: DeliveryStreamName,131        delivery_stream_type: DeliveryStreamType = DeliveryStreamType.DirectPut,132        kinesis_stream_source_configuration: KinesisStreamSourceConfiguration = None,133        delivery_stream_encryption_configuration_input: DeliveryStreamEncryptionConfigurationInput = None,134        s3_destination_configuration: S3DestinationConfiguration = None,135        extended_s3_destination_configuration: ExtendedS3DestinationConfiguration = None,136        redshift_destination_configuration: RedshiftDestinationConfiguration = None,137        elasticsearch_destination_configuration: ElasticsearchDestinationConfiguration = None,138        amazonopensearchservice_destination_configuration: AmazonopensearchserviceDestinationConfiguration = None,139        splunk_destination_configuration: SplunkDestinationConfiguration = None,140        http_endpoint_destination_configuration: HttpEndpointDestinationConfiguration = None,141        tags: TagDeliveryStreamInputTagList = None,142    ) -> CreateDeliveryStreamOutput:143        region = FirehoseBackend.get()144        destinations: DestinationDescriptionList = []145        if elasticsearch_destination_configuration:146            destinations.append(147 
               DestinationDescription(148                    DestinationId=short_uid(),149                    ElasticsearchDestinationDescription=convert_es_config_to_desc(150                        elasticsearch_destination_configuration151                    ),152                )153            )154        if amazonopensearchservice_destination_configuration:155            destinations.append(156                DestinationDescription(157                    DestinationId=short_uid(),158                    AmazonopensearchserviceDestinationDescription=convert_opensearch_config_to_desc(159                        amazonopensearchservice_destination_configuration160                    ),161                )162            )163        if s3_destination_configuration or extended_s3_destination_configuration:164            destinations.append(165                DestinationDescription(166                    DestinationId=short_uid(),167                    S3DestinationDescription=convert_s3_config_to_desc(168                        s3_destination_configuration169                    ),170                    ExtendedS3DestinationDescription=convert_extended_s3_config_to_desc(171                        extended_s3_destination_configuration172                    ),173                )174            )175        if http_endpoint_destination_configuration:176            destinations.append(177                DestinationDescription(178                    DestinationId=short_uid(),179                    HttpEndpointDestinationDescription=convert_http_config_to_desc(180                        http_endpoint_destination_configuration181                    ),182                )183            )184        if splunk_destination_configuration:185            LOG.warning(186                "Delivery stream contains a splunk destination (which is currently not supported)."187            )188        if redshift_destination_configuration:189            LOG.warning(190                "Delivery 
stream contains a redshift destination (which is currently not supported)."191            )192        stream = DeliveryStreamDescription(193            DeliveryStreamName=delivery_stream_name,194            DeliveryStreamARN=firehose_stream_arn(195                stream_name=delivery_stream_name,196                account_id=context.account_id,197                region_name=context.region,198            ),199            DeliveryStreamStatus=DeliveryStreamStatus.ACTIVE,200            DeliveryStreamType=delivery_stream_type,201            HasMoreDestinations=False,202            VersionId="1",203            CreateTimestamp=datetime.now(),204            LastUpdateTimestamp=datetime.now(),205            Destinations=destinations,206            Source=convert_source_config_to_desc(kinesis_stream_source_configuration),207        )208        FirehoseBackend.TAGS.tag_resource(stream["DeliveryStreamARN"], tags)209        region.delivery_streams[delivery_stream_name] = stream210        # record event211        event_publisher.fire_event(212            event_publisher.EVENT_FIREHOSE_CREATE_STREAM,213            payload={"n": event_publisher.get_hash(delivery_stream_name)},214        )215        if delivery_stream_type == DeliveryStreamType.KinesisStreamAsSource:216            if not kinesis_stream_source_configuration:217                raise InvalidArgumentException("Missing delivery stream configuration")218            kinesis_stream_name = kinesis_stream_source_configuration["KinesisStreamARN"].split(219                "/"220            )[1]221            kinesis_connector.listen_to_kinesis(222                stream_name=kinesis_stream_name,223                fh_d_stream=delivery_stream_name,224                listener_func=self._process_records,225                wait_until_started=True,226                ddb_lease_table_suffix="-firehose",227            )228        return CreateDeliveryStreamOutput(DeliveryStreamARN=stream["DeliveryStreamARN"])229    def 
delete_delivery_stream(230        self,231        context: RequestContext,232        delivery_stream_name: DeliveryStreamName,233        allow_force_delete: BooleanObject = None,234    ) -> DeleteDeliveryStreamOutput:235        region = FirehoseBackend.get()236        delivery_stream_description = region.delivery_streams.pop(delivery_stream_name, {})237        if not delivery_stream_description:238            raise ResourceNotFoundException(239                f"Firehose {delivery_stream_name} under account {context.account_id} " f"not found."240            )241        # record event242        event_publisher.fire_event(243            event_publisher.EVENT_FIREHOSE_DELETE_STREAM,244            payload={"n": event_publisher.get_hash(delivery_stream_name)},245        )246        return DeleteDeliveryStreamOutput()247    def describe_delivery_stream(248        self,249        context: RequestContext,250        delivery_stream_name: DeliveryStreamName,251        limit: DescribeDeliveryStreamInputLimit = None,252        exclusive_start_destination_id: DestinationId = None,253    ) -> DescribeDeliveryStreamOutput:254        delivery_stream_description = _get_description_or_raise_not_found(255            context, delivery_stream_name256        )257        return DescribeDeliveryStreamOutput(DeliveryStreamDescription=delivery_stream_description)258    def list_delivery_streams(259        self,260        context: RequestContext,261        limit: ListDeliveryStreamsInputLimit = None,262        delivery_stream_type: DeliveryStreamType = None,263        exclusive_start_delivery_stream_name: DeliveryStreamName = None,264    ) -> ListDeliveryStreamsOutput:265        region = FirehoseBackend.get()266        delivery_stream_names = []267        for name, stream in region.delivery_streams.items():268            delivery_stream_names.append(stream["DeliveryStreamName"])269        return ListDeliveryStreamsOutput(270            DeliveryStreamNames=delivery_stream_names, 
HasMoreDeliveryStreams=False271        )272    def put_record(273        self,274        context: RequestContext,275        delivery_stream_name: DeliveryStreamName,276        record: Record,277    ) -> PutRecordOutput:278        record = self._reencode_record(record)279        return self._put_record(delivery_stream_name, record)280    def put_record_batch(281        self,282        context: RequestContext,283        delivery_stream_name: DeliveryStreamName,284        records: PutRecordBatchRequestEntryList,285    ) -> PutRecordBatchOutput:286        records = self._reencode_records(records)287        return PutRecordBatchOutput(288            FailedPutCount=0, RequestResponses=self._put_records(delivery_stream_name, records)289        )290    def tag_delivery_stream(291        self,292        context: RequestContext,293        delivery_stream_name: DeliveryStreamName,294        tags: TagDeliveryStreamInputTagList,295    ) -> TagDeliveryStreamOutput:296        delivery_stream_description = _get_description_or_raise_not_found(297            context, delivery_stream_name298        )299        FirehoseBackend.TAGS.tag_resource(delivery_stream_description["DeliveryStreamARN"], tags)300        return ListTagsForDeliveryStreamOutput()301    def list_tags_for_delivery_stream(302        self,303        context: RequestContext,304        delivery_stream_name: DeliveryStreamName,305        exclusive_start_tag_key: TagKey = None,306        limit: ListTagsForDeliveryStreamInputLimit = None,307    ) -> ListTagsForDeliveryStreamOutput:308        delivery_stream_description = _get_description_or_raise_not_found(309            context, delivery_stream_name310        )311        # The tagging service returns a dictionary with the given root name312        tags = FirehoseBackend.TAGS.list_tags_for_resource(313            arn=delivery_stream_description["DeliveryStreamARN"], root_name="root"314        )315        # Extract the actual list of tags for the typed response316        
tag_list: ListTagsForDeliveryStreamOutputTagList = tags["root"]317        return ListTagsForDeliveryStreamOutput(Tags=tag_list, HasMoreTags=False)318    def untag_delivery_stream(319        self,320        context: RequestContext,321        delivery_stream_name: DeliveryStreamName,322        tag_keys: TagKeyList,323    ) -> UntagDeliveryStreamOutput:324        delivery_stream_description = _get_description_or_raise_not_found(325            context, delivery_stream_name326        )327        # The tagging service returns a dictionary with the given root name328        FirehoseBackend.TAGS.untag_resource(329            arn=delivery_stream_description["DeliveryStreamARN"], tag_names=tag_keys330        )331        return UntagDeliveryStreamOutput()332    def update_destination(333        self,334        context: RequestContext,335        delivery_stream_name: DeliveryStreamName,336        current_delivery_stream_version_id: DeliveryStreamVersionId,337        destination_id: DestinationId,338        s3_destination_update: S3DestinationUpdate = None,339        extended_s3_destination_update: ExtendedS3DestinationUpdate = None,340        redshift_destination_update: RedshiftDestinationUpdate = None,341        elasticsearch_destination_update: ElasticsearchDestinationUpdate = None,342        amazonopensearchservice_destination_update: AmazonopensearchserviceDestinationUpdate = None,343        splunk_destination_update: SplunkDestinationUpdate = None,344        http_endpoint_destination_update: HttpEndpointDestinationUpdate = None,345    ) -> UpdateDestinationOutput:346        delivery_stream_description = _get_description_or_raise_not_found(347            context, delivery_stream_name348        )349        destinations = delivery_stream_description["Destinations"]350        try:351            destination = next(filter(lambda d: d["DestinationId"] == destination_id, destinations))352        except StopIteration:353            destination = 
DestinationDescription(DestinationId=destination_id)354            delivery_stream_description["Destinations"].append(destination)355        if elasticsearch_destination_update:356            destination["ElasticsearchDestinationDescription"] = convert_es_update_to_desc(357                elasticsearch_destination_update358            )359        if amazonopensearchservice_destination_update:360            destination[...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel to get step-by-step demonstrations from industry experts.
Get 100 minutes of automation test minutes FREE!!
