How to use _get_description_or_raise_not_found method in localstack

Best Python code snippet using localstack_python

provider.py

Source:provider.py Github

copy

Full Screen

...112 # static tagging service instance113 TAGS = TaggingService()114 def __init__(self):115 self.delivery_streams = {}116def _get_description_or_raise_not_found(117 context, delivery_stream_name: str118) -> DeliveryStreamDescription:119 region = FirehoseBackend.get()120 delivery_stream_description = region.delivery_streams.get(delivery_stream_name)121 if not delivery_stream_description:122 raise ResourceNotFoundException(123 f"Firehose {delivery_stream_name} under account {context.account_id} " f"not found."124 )125 return delivery_stream_description126class FirehoseProvider(FirehoseApi):127 def create_delivery_stream(128 self,129 context: RequestContext,130 delivery_stream_name: DeliveryStreamName,131 delivery_stream_type: DeliveryStreamType = DeliveryStreamType.DirectPut,132 kinesis_stream_source_configuration: KinesisStreamSourceConfiguration = None,133 delivery_stream_encryption_configuration_input: DeliveryStreamEncryptionConfigurationInput = None,134 s3_destination_configuration: S3DestinationConfiguration = None,135 extended_s3_destination_configuration: ExtendedS3DestinationConfiguration = None,136 redshift_destination_configuration: RedshiftDestinationConfiguration = None,137 elasticsearch_destination_configuration: ElasticsearchDestinationConfiguration = None,138 amazonopensearchservice_destination_configuration: AmazonopensearchserviceDestinationConfiguration = None,139 splunk_destination_configuration: SplunkDestinationConfiguration = None,140 http_endpoint_destination_configuration: HttpEndpointDestinationConfiguration = None,141 tags: TagDeliveryStreamInputTagList = None,142 ) -> CreateDeliveryStreamOutput:143 region = FirehoseBackend.get()144 destinations: DestinationDescriptionList = []145 if elasticsearch_destination_configuration:146 destinations.append(147 DestinationDescription(148 DestinationId=short_uid(),149 ElasticsearchDestinationDescription=convert_es_config_to_desc(150 elasticsearch_destination_configuration151 ),152 )153 )154 if 
amazonopensearchservice_destination_configuration:155 destinations.append(156 DestinationDescription(157 DestinationId=short_uid(),158 AmazonopensearchserviceDestinationDescription=convert_opensearch_config_to_desc(159 amazonopensearchservice_destination_configuration160 ),161 )162 )163 if s3_destination_configuration or extended_s3_destination_configuration:164 destinations.append(165 DestinationDescription(166 DestinationId=short_uid(),167 S3DestinationDescription=convert_s3_config_to_desc(168 s3_destination_configuration169 ),170 ExtendedS3DestinationDescription=convert_extended_s3_config_to_desc(171 extended_s3_destination_configuration172 ),173 )174 )175 if http_endpoint_destination_configuration:176 destinations.append(177 DestinationDescription(178 DestinationId=short_uid(),179 HttpEndpointDestinationDescription=convert_http_config_to_desc(180 http_endpoint_destination_configuration181 ),182 )183 )184 if splunk_destination_configuration:185 LOG.warning(186 "Delivery stream contains a splunk destination (which is currently not supported)."187 )188 if redshift_destination_configuration:189 LOG.warning(190 "Delivery stream contains a redshift destination (which is currently not supported)."191 )192 stream = DeliveryStreamDescription(193 DeliveryStreamName=delivery_stream_name,194 DeliveryStreamARN=firehose_stream_arn(195 stream_name=delivery_stream_name,196 account_id=context.account_id,197 region_name=context.region,198 ),199 DeliveryStreamStatus=DeliveryStreamStatus.ACTIVE,200 DeliveryStreamType=delivery_stream_type,201 HasMoreDestinations=False,202 VersionId="1",203 CreateTimestamp=datetime.now(),204 LastUpdateTimestamp=datetime.now(),205 Destinations=destinations,206 Source=convert_source_config_to_desc(kinesis_stream_source_configuration),207 )208 FirehoseBackend.TAGS.tag_resource(stream["DeliveryStreamARN"], tags)209 region.delivery_streams[delivery_stream_name] = stream210 # record event211 event_publisher.fire_event(212 
event_publisher.EVENT_FIREHOSE_CREATE_STREAM,213 payload={"n": event_publisher.get_hash(delivery_stream_name)},214 )215 if delivery_stream_type == DeliveryStreamType.KinesisStreamAsSource:216 if not kinesis_stream_source_configuration:217 raise InvalidArgumentException("Missing delivery stream configuration")218 kinesis_stream_name = kinesis_stream_source_configuration["KinesisStreamARN"].split(219 "/"220 )[1]221 kinesis_connector.listen_to_kinesis(222 stream_name=kinesis_stream_name,223 fh_d_stream=delivery_stream_name,224 listener_func=self._process_records,225 wait_until_started=True,226 ddb_lease_table_suffix="-firehose",227 )228 return CreateDeliveryStreamOutput(DeliveryStreamARN=stream["DeliveryStreamARN"])229 def delete_delivery_stream(230 self,231 context: RequestContext,232 delivery_stream_name: DeliveryStreamName,233 allow_force_delete: BooleanObject = None,234 ) -> DeleteDeliveryStreamOutput:235 region = FirehoseBackend.get()236 delivery_stream_description = region.delivery_streams.pop(delivery_stream_name, {})237 if not delivery_stream_description:238 raise ResourceNotFoundException(239 f"Firehose {delivery_stream_name} under account {context.account_id} " f"not found."240 )241 # record event242 event_publisher.fire_event(243 event_publisher.EVENT_FIREHOSE_DELETE_STREAM,244 payload={"n": event_publisher.get_hash(delivery_stream_name)},245 )246 return DeleteDeliveryStreamOutput()247 def describe_delivery_stream(248 self,249 context: RequestContext,250 delivery_stream_name: DeliveryStreamName,251 limit: DescribeDeliveryStreamInputLimit = None,252 exclusive_start_destination_id: DestinationId = None,253 ) -> DescribeDeliveryStreamOutput:254 delivery_stream_description = _get_description_or_raise_not_found(255 context, delivery_stream_name256 )257 return DescribeDeliveryStreamOutput(DeliveryStreamDescription=delivery_stream_description)258 def list_delivery_streams(259 self,260 context: RequestContext,261 limit: ListDeliveryStreamsInputLimit = None,262 
delivery_stream_type: DeliveryStreamType = None,263 exclusive_start_delivery_stream_name: DeliveryStreamName = None,264 ) -> ListDeliveryStreamsOutput:265 region = FirehoseBackend.get()266 delivery_stream_names = []267 for name, stream in region.delivery_streams.items():268 delivery_stream_names.append(stream["DeliveryStreamName"])269 return ListDeliveryStreamsOutput(270 DeliveryStreamNames=delivery_stream_names, HasMoreDeliveryStreams=False271 )272 def put_record(273 self,274 context: RequestContext,275 delivery_stream_name: DeliveryStreamName,276 record: Record,277 ) -> PutRecordOutput:278 record = self._reencode_record(record)279 return self._put_record(delivery_stream_name, record)280 def put_record_batch(281 self,282 context: RequestContext,283 delivery_stream_name: DeliveryStreamName,284 records: PutRecordBatchRequestEntryList,285 ) -> PutRecordBatchOutput:286 records = self._reencode_records(records)287 return PutRecordBatchOutput(288 FailedPutCount=0, RequestResponses=self._put_records(delivery_stream_name, records)289 )290 def tag_delivery_stream(291 self,292 context: RequestContext,293 delivery_stream_name: DeliveryStreamName,294 tags: TagDeliveryStreamInputTagList,295 ) -> TagDeliveryStreamOutput:296 delivery_stream_description = _get_description_or_raise_not_found(297 context, delivery_stream_name298 )299 FirehoseBackend.TAGS.tag_resource(delivery_stream_description["DeliveryStreamARN"], tags)300 return ListTagsForDeliveryStreamOutput()301 def list_tags_for_delivery_stream(302 self,303 context: RequestContext,304 delivery_stream_name: DeliveryStreamName,305 exclusive_start_tag_key: TagKey = None,306 limit: ListTagsForDeliveryStreamInputLimit = None,307 ) -> ListTagsForDeliveryStreamOutput:308 delivery_stream_description = _get_description_or_raise_not_found(309 context, delivery_stream_name310 )311 # The tagging service returns a dictionary with the given root name312 tags = FirehoseBackend.TAGS.list_tags_for_resource(313 
arn=delivery_stream_description["DeliveryStreamARN"], root_name="root"314 )315 # Extract the actual list of tags for the typed response316 tag_list: ListTagsForDeliveryStreamOutputTagList = tags["root"]317 return ListTagsForDeliveryStreamOutput(Tags=tag_list, HasMoreTags=False)318 def untag_delivery_stream(319 self,320 context: RequestContext,321 delivery_stream_name: DeliveryStreamName,322 tag_keys: TagKeyList,323 ) -> UntagDeliveryStreamOutput:324 delivery_stream_description = _get_description_or_raise_not_found(325 context, delivery_stream_name326 )327 # The tagging service returns a dictionary with the given root name328 FirehoseBackend.TAGS.untag_resource(329 arn=delivery_stream_description["DeliveryStreamARN"], tag_names=tag_keys330 )331 return UntagDeliveryStreamOutput()332 def update_destination(333 self,334 context: RequestContext,335 delivery_stream_name: DeliveryStreamName,336 current_delivery_stream_version_id: DeliveryStreamVersionId,337 destination_id: DestinationId,338 s3_destination_update: S3DestinationUpdate = None,339 extended_s3_destination_update: ExtendedS3DestinationUpdate = None,340 redshift_destination_update: RedshiftDestinationUpdate = None,341 elasticsearch_destination_update: ElasticsearchDestinationUpdate = None,342 amazonopensearchservice_destination_update: AmazonopensearchserviceDestinationUpdate = None,343 splunk_destination_update: SplunkDestinationUpdate = None,344 http_endpoint_destination_update: HttpEndpointDestinationUpdate = None,345 ) -> UpdateDestinationOutput:346 delivery_stream_description = _get_description_or_raise_not_found(347 context, delivery_stream_name348 )349 destinations = delivery_stream_description["Destinations"]350 try:351 destination = next(filter(lambda d: d["DestinationId"] == destination_id, destinations))352 except StopIteration:353 destination = DestinationDescription(DestinationId=destination_id)354 delivery_stream_description["Destinations"].append(destination)355 if 
elasticsearch_destination_update:356 destination["ElasticsearchDestinationDescription"] = convert_es_update_to_desc(357 elasticsearch_destination_update358 )359 if amazonopensearchservice_destination_update:360 destination[...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks such as Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now!!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful