Best Python code snippet using localstack_python
provider.py
Source:provider.py  
...274        context: RequestContext,275        delivery_stream_name: DeliveryStreamName,276        record: Record,277    ) -> PutRecordOutput:278        record = self._reencode_record(record)279        return self._put_record(delivery_stream_name, record)280    def put_record_batch(281        self,282        context: RequestContext,283        delivery_stream_name: DeliveryStreamName,284        records: PutRecordBatchRequestEntryList,285    ) -> PutRecordBatchOutput:286        records = self._reencode_records(records)287        return PutRecordBatchOutput(288            FailedPutCount=0, RequestResponses=self._put_records(delivery_stream_name, records)289        )290    def tag_delivery_stream(291        self,292        context: RequestContext,293        delivery_stream_name: DeliveryStreamName,294        tags: TagDeliveryStreamInputTagList,295    ) -> TagDeliveryStreamOutput:296        delivery_stream_description = _get_description_or_raise_not_found(297            context, delivery_stream_name298        )299        FirehoseBackend.TAGS.tag_resource(delivery_stream_description["DeliveryStreamARN"], tags)300        return ListTagsForDeliveryStreamOutput()301    def list_tags_for_delivery_stream(302        self,303        context: RequestContext,304        delivery_stream_name: DeliveryStreamName,305        exclusive_start_tag_key: TagKey = None,306        limit: ListTagsForDeliveryStreamInputLimit = None,307    ) -> ListTagsForDeliveryStreamOutput:308        delivery_stream_description = _get_description_or_raise_not_found(309            context, delivery_stream_name310        )311        # The tagging service returns a dictionary with the given root name312        tags = FirehoseBackend.TAGS.list_tags_for_resource(313            arn=delivery_stream_description["DeliveryStreamARN"], root_name="root"314        )315        # Extract the actual list of tags for the typed response316        tag_list: ListTagsForDeliveryStreamOutputTagList = tags["root"]317        
return ListTagsForDeliveryStreamOutput(Tags=tag_list, HasMoreTags=False)318    def untag_delivery_stream(319        self,320        context: RequestContext,321        delivery_stream_name: DeliveryStreamName,322        tag_keys: TagKeyList,323    ) -> UntagDeliveryStreamOutput:324        delivery_stream_description = _get_description_or_raise_not_found(325            context, delivery_stream_name326        )327        # The tagging service returns a dictionary with the given root name328        FirehoseBackend.TAGS.untag_resource(329            arn=delivery_stream_description["DeliveryStreamARN"], tag_names=tag_keys330        )331        return UntagDeliveryStreamOutput()332    def update_destination(333        self,334        context: RequestContext,335        delivery_stream_name: DeliveryStreamName,336        current_delivery_stream_version_id: DeliveryStreamVersionId,337        destination_id: DestinationId,338        s3_destination_update: S3DestinationUpdate = None,339        extended_s3_destination_update: ExtendedS3DestinationUpdate = None,340        redshift_destination_update: RedshiftDestinationUpdate = None,341        elasticsearch_destination_update: ElasticsearchDestinationUpdate = None,342        amazonopensearchservice_destination_update: AmazonopensearchserviceDestinationUpdate = None,343        splunk_destination_update: SplunkDestinationUpdate = None,344        http_endpoint_destination_update: HttpEndpointDestinationUpdate = None,345    ) -> UpdateDestinationOutput:346        delivery_stream_description = _get_description_or_raise_not_found(347            context, delivery_stream_name348        )349        destinations = delivery_stream_description["Destinations"]350        try:351            destination = next(filter(lambda d: d["DestinationId"] == destination_id, destinations))352        except StopIteration:353            destination = DestinationDescription(DestinationId=destination_id)354            
delivery_stream_description["Destinations"].append(destination)355        if elasticsearch_destination_update:356            destination["ElasticsearchDestinationDescription"] = convert_es_update_to_desc(357                elasticsearch_destination_update358            )359        if amazonopensearchservice_destination_update:360            destination[361                "AmazonopensearchserviceDestinationDescription"362            ] = convert_opensearch_update_to_desc(amazonopensearchservice_destination_update)363        if s3_destination_update:364            destination["S3DestinationDescription"] = convert_s3_update_to_desc(365                s3_destination_update366            )367        if extended_s3_destination_update:368            destination["ExtendedS3DestinationDescription"] = convert_extended_s3_update_to_desc(369                extended_s3_destination_update370            )371        if http_endpoint_destination_update:372            destination["HttpEndpointDestinationDescription"] = convert_http_update_to_desc(373                http_endpoint_destination_update374            )375        return UpdateDestinationOutput()376    def _reencode_record(self, record: Record) -> Record:377        """378        The ASF decodes the record's data automatically. 
But most of the service integrations (kinesis, lambda, http)379        are working with the base64 encoded data.380        """381        if "Data" in record:382            record["Data"] = base64.b64encode(record["Data"])383        return record384    def _reencode_records(self, records: List[Record]) -> List[Record]:385        return [self._reencode_record(r) for r in records]386    def _process_records(self, records: List[Record], shard_id: str, fh_d_stream: str):387        """Process the given records from the underlying Kinesis stream"""388        return self._put_records(fh_d_stream, records)389    def _put_record(self, delivery_stream_name: str, record: Record) -> PutRecordOutput:390        """Put a record to the firehose stream from a PutRecord API call"""391        result = self._put_records(delivery_stream_name, [record])392        return PutRecordOutput(RecordId=result[0]["RecordId"])393    def _put_records(394        self, delivery_stream_name: str, unprocessed_records: List[Record]395    ) -> List[PutRecordBatchResponseEntry]:396        """Put a list of records to the firehose stream - either directly from a PutRecord API call, or397        received from an underlying Kinesis stream (if 'KinesisStreamAsSource' is configured)"""398        region = FirehoseBackend.get()399        delivery_stream_description = region.delivery_streams.get(delivery_stream_name)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
