How to use update_destination method in localstack

Best Python code snippet using localstack_python

firehose_api.py

Source:firehose_api.py Github

copy

Full Screen

...137 dest = {}138 dest["DestinationId"] = destination_id139 destinations.append(dest)140 return dest141def update_destination(142 stream_name,143 destination_id,144 s3_update=None,145 elasticsearch_update=None,146 http_update=None,147 version_id=None,148):149 dest = get_destination(stream_name, destination_id)150 if elasticsearch_update:151 details = dest.setdefault("ESDestinationDescription", {})152 dest.update(elasticsearch_update)153 if s3_update:154 details = dest.setdefault("S3DestinationDescription", {})155 details.update(s3_update)156 if http_update:157 details = dest.setdefault("HttpEndpointDestinationDescription", {})158 details.update(http_update)159 return dest160def process_records(records, shard_id, fh_d_stream):161 return put_records(fh_d_stream, records)162def create_stream(163 stream_name,164 delivery_stream_type="DirectPut",165 delivery_stream_type_configuration=None,166 s3_destination=None,167 elasticsearch_destination=None,168 http_destination=None,169 tags=None,170 region_name=None,171):172 region = FirehoseBackend.get()173 tags = tags or {}174 stream = {175 "DeliveryStreamType": delivery_stream_type,176 "KinesisStreamSourceConfiguration": delivery_stream_type_configuration,177 "HasMoreDestinations": False,178 "VersionId": "1",179 "CreateTimestamp": time.time(),180 "DeliveryStreamARN": firehose_stream_arn(stream_name),181 "DeliveryStreamStatus": "ACTIVE",182 "DeliveryStreamName": stream_name,183 "Destinations": [],184 "Tags": tags,185 }186 region.delivery_streams[stream_name] = stream187 if elasticsearch_destination:188 update_destination(189 stream_name=stream_name,190 destination_id=short_uid(),191 elasticsearch_update=elasticsearch_destination,192 )193 if s3_destination:194 update_destination(195 stream_name=stream_name,196 destination_id=short_uid(),197 s3_update=s3_destination,198 )199 if http_destination:200 update_destination(201 stream_name=stream_name,202 destination_id=short_uid(),203 http_update=http_destination,204 )205 # record event206 event_publisher.fire_event(207 event_publisher.EVENT_FIREHOSE_CREATE_STREAM,208 payload={"n": event_publisher.get_hash(stream_name)},209 )210 if delivery_stream_type == "KinesisStreamAsSource":211 kinesis_stream_name = delivery_stream_type_configuration.get("KinesisStreamARN").split("/")[212 1213 ]214 kinesis_connector.listen_to_kinesis(215 stream_name=kinesis_stream_name,216 fh_d_stream=stream_name,217 listener_func=process_records,218 wait_until_started=True,219 ddb_lease_table_suffix="-firehose",220 region_name=region_name,221 )222 return stream223def delete_stream(stream_name):224 region = FirehoseBackend.get()225 stream = region.delivery_streams.pop(stream_name, {})226 if not stream:227 return error_not_found(stream_name)228 # record event229 event_publisher.fire_event(230 event_publisher.EVENT_FIREHOSE_DELETE_STREAM,231 payload={"n": event_publisher.get_hash(stream_name)},232 )233 return {}234def get_stream(stream_name: str, format_s3_dest: bool = False):235 region = FirehoseBackend.get()236 result = region.delivery_streams.get(stream_name)237 if result and format_s3_dest:238 extended_attrs = [239 "ProcessingConfiguration",240 "S3BackupMode",241 "S3BackupDescription",242 "DataFormatConversionConfiguration",243 ]244 result = clone(result)245 for dest in result.get("Destinations", []):246 s3_dest = dest.get("S3DestinationDescription") or {}247 if any([s3_dest.get(attr) is not None for attr in extended_attrs]):248 dest["ExtendedS3DestinationDescription"] = dest.pop("S3DestinationDescription")249 return result250def bucket_name(bucket_arn):251 return bucket_arn.split(":::")[-1]252def role_arn(stream_name):253 return "arn:aws:iam::%s:role/%s" % (TEST_AWS_ACCOUNT_ID, stream_name)254def error_not_found(stream_name):255 msg = "Firehose %s under account %s not found." % (stream_name, TEST_AWS_ACCOUNT_ID)256 return error_response(msg, code=400, error_type="ResourceNotFoundException")257def error_response(msg, code=500, error_type="InternalFailure"):258 return aws_responses.flask_error_response_json(msg, code=code, error_type=error_type)259@app.route("/", methods=["POST"])260def post_request():261 action = request.headers.get("x-amz-target", "")262 action = action.split(".")[-1]263 data = json.loads(to_str(request.data))264 response = None265 if action == "ListDeliveryStreams":266 response = {267 "DeliveryStreamNames": get_delivery_stream_names(),268 "HasMoreDeliveryStreams": False,269 }270 elif action == "CreateDeliveryStream":271 stream_name = data["DeliveryStreamName"]272 region_name = extract_region_from_auth_header(request.headers)273 _s3_destination = data.get("S3DestinationConfiguration") or data.get(274 "ExtendedS3DestinationConfiguration"275 )276 response = create_stream(277 stream_name,278 delivery_stream_type=data.get("DeliveryStreamType"),279 delivery_stream_type_configuration=data.get("KinesisStreamSourceConfiguration"),280 s3_destination=_s3_destination,281 elasticsearch_destination=data.get("ElasticsearchDestinationConfiguration"),282 http_destination=data.get("HttpEndpointDestinationConfiguration"),283 tags=data.get("Tags"),284 region_name=region_name,285 )286 elif action == "DeleteDeliveryStream":287 stream_name = data["DeliveryStreamName"]288 response = delete_stream(stream_name)289 elif action == "DescribeDeliveryStream":290 stream_name = data["DeliveryStreamName"]291 response = get_stream(stream_name, format_s3_dest=True)292 if not response:293 return error_not_found(stream_name)294 response = {"DeliveryStreamDescription": response}295 elif action == "PutRecord":296 stream_name = data["DeliveryStreamName"]297 record = data["Record"]298 response = put_record(stream_name, record)299 elif action == "PutRecordBatch":300 stream_name = data["DeliveryStreamName"]301 records = data["Records"]302 put_records(stream_name, records)303 request_responses = []304 for i in records:305 request_responses.append({"RecordId": str(uuid.uuid4())})306 response = {"FailedPutCount": 0, "RequestResponses": request_responses}307 elif action == "UpdateDestination":308 stream_name = data["DeliveryStreamName"]309 version_id = data["CurrentDeliveryStreamVersionId"]310 destination_id = data["DestinationId"]311 s3_update = data["S3DestinationUpdate"] if "S3DestinationUpdate" in data else None312 update_destination(313 stream_name=stream_name,314 destination_id=destination_id,315 s3_update=s3_update,316 version_id=version_id,317 )318 es_update = data["ESDestinationUpdate"] if "ESDestinationUpdate" in data else None319 update_destination(320 stream_name=stream_name,321 destination_id=destination_id,322 elasticsearch_update=es_update,323 version_id=version_id,324 )325 http_update = data.get("HttpEndpointDestinationUpdate")326 update_destination(327 stream_name=stream_name,328 destination_id=destination_id,329 http_update=http_update,330 version_id=version_id,331 )332 response = {}333 elif action == "ListTagsForDeliveryStream":334 response = get_delivery_stream_tags(335 data["DeliveryStreamName"],336 data.get("ExclusiveStartTagKey"),337 data.get("Limit", 50),338 )339 else:340 response = error_response(...

Full Screen

Full Screen

App.py

Source:App.py Github

copy

Full Screen

...42 """ Returns vaoid43 a# Event handler to update destination column of table44 a# eg after textChanged of txtCopyDir, dropEvent of tblFiles, valueChanged of bxLevelDir45 """46 def update_destination(self, event=None):47 if not self.ui.txtCopyToDir.text():48 return49 # @todo: Create new handler function for text value changed event50 stats = AtUtils.get_free_space_mb(self.ui.txtCopyToDir.text())51 self.set_target_status_value(stats["free"], stats["total"])52 basepath = self.ui.txtCopyToDir.text()53 if basepath and os.path.isdir(basepath) and self.ui.tblFiles.rowCount() > 0:54 level = self.ui.bxDirLevel.value()55 level = level if level else 056 level += 157 rows = self.ui.tblFiles.rowCount()58 counter = 059 while counter < rows:60 srcpath = self.ui.tblFiles.item(counter, 0).text()...

Full Screen

Full Screen

urls.py

Source:urls.py Github

copy

Full Screen

1from django.urls import path2from destinations.views import *3urlpatterns = [4 path('', index, name='index'),5 path('home/', home, name='home'),6 path('message/', message, name='message'),7 path('contact/', contact, name='contact'),8 path('explore/', explore, name='explore'),9 path('listing/', listing, name='listing'),10 path('admin_dashboard/', admin_dashboard, name='admin_dashboard'),11 path('destinations_place/', destinations_place, name='destinations_place'),12 path('destination_list/', destination_list, name='destination_list'),13 path('<uuid:destination_id>/destination_detail', destination_detail, name='destination_detail'),14 path('<uuid:destination_id>/bookings', bookings, name='bookings'),15 path('religious_tour/', religious_tour, name='religious_tour'),16 path('hilly_tour/', hilly_tour, name='hilly_tour'),17 path('mountain_tour/', mountain_tour, name='mountain_tour'),18 path('terai_tour/', terai_tour, name='terai_tour'),19 path('pokhara_tour/', pokhara_tour, name='pokhara_tour'),20 path('jungle_safari/', jungle_safari, name='jungle_safari'),21 path('admin_destination_list/', admin_destination_list, name='admin_destination_list'),22 path('<uuid:destination_id>/admin_destination_detail', admin_destination_detail, name='admin_destination_detail'),23 path('<uuid:destination_id>/delete_destination', delete_destination, name='delete_destination'),24 path('<uuid:destination_id>/update_destination', update_destination, name='update_destination'),25 path('messages/', mesge, name='mesge'),26 path('admin_messages/', admin_messages, name='admin_messages'),27 path('admin_bookings/', admin_bookings, name='admin_bookings'),28 path('category_destinations_religious/', category_destinations_religious, name='category_destinations_religious'),29 path('category_destinations_hilly/', category_destinations_hilly, name='category_destinations_hilly'),30 path('category_destinations_mountain/', category_destinations_mountain, name='category_destinations_mountain'),31 path('category_destinations_terai/', category_destinations_terai, name='category_destinations_terai'),32 path('category_destinations_famous/', category_destinations_famous, name='category_destinations_famous'),33 path('category_destinations_jungle/', category_destinations_jungle, name='category_destinations_jungle'),...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful