Best Python code snippet using localstack_python
firehose_api.py
Source:firehose_api.py  
...
# NOTE: the listing starts in the middle of a function whose header is not visible.
# Its last four statements are preserved here as comments so that they are not lost:
#     dest = {}
#     dest["DestinationId"] = destination_id
#     destinations.append(dest)
#     return dest


def update_destination(
    stream_name,
    destination_id,
    s3_update=None,
    elasticsearch_update=None,
    http_update=None,
    version_id=None,
):
    """Merge destination updates into one destination of a delivery stream.

    The destination dict is obtained from ``get_destination`` (defined outside the
    visible listing). Every non-empty update is merged into the matching
    ``*DestinationDescription`` sub-dict, which is created on demand.

    :param stream_name: name of the delivery stream
    :param destination_id: ID of the destination to update
    :param s3_update: dict merged into ``S3DestinationDescription``
    :param elasticsearch_update: dict merged into ``ESDestinationDescription``
    :param http_update: dict merged into ``HttpEndpointDestinationDescription``
    :param version_id: accepted for interface compatibility; not used here
    :return: the updated destination dict
    """
    dest = get_destination(stream_name, destination_id)
    updates = (
        ("ESDestinationDescription", elasticsearch_update),
        ("S3DestinationDescription", s3_update),
        ("HttpEndpointDestinationDescription", http_update),
    )
    for description_key, update in updates:
        if update:
            # Fix: the Elasticsearch update used to be merged into ``dest`` itself,
            # leaving ``ESDestinationDescription`` empty. All three kinds of update
            # now go into their own description sub-dict.
            dest.setdefault(description_key, {}).update(update)
    return dest


def process_records(records, shard_id, fh_d_stream):
    """Forward ``records`` to the delivery stream ``fh_d_stream`` via ``put_records``.

    ``shard_id`` is part of the listener callback signature but is not used.
    """
    return put_records(fh_d_stream, records)


def create_stream(
    stream_name,
    delivery_stream_type="DirectPut",
    delivery_stream_type_configuration=None,
    s3_destination=None,
    elasticsearch_destination=None,
    http_destination=None,
    tags=None,
    region_name=None,
):
    """Create a delivery stream, register it in the backend and return its description.

    :param stream_name: name of the new delivery stream
    :param delivery_stream_type: stored as ``DeliveryStreamType``; the value
        ``"KinesisStreamAsSource"`` additionally starts a Kinesis listener
    :param delivery_stream_type_configuration: stored as
        ``KinesisStreamSourceConfiguration``; must contain ``KinesisStreamARN``
        when the stream type is ``"KinesisStreamAsSource"``
    :param s3_destination: optional S3 destination configuration
    :param elasticsearch_destination: optional Elasticsearch destination configuration
    :param http_destination: optional HTTP endpoint destination configuration
    :param tags: tags stored under ``Tags``; a falsy value is replaced by ``{}``
    :param region_name: region passed on to the Kinesis listener
    :return: the stream description dict stored in the backend
    """
    region = FirehoseBackend.get()
    tags = tags or {}
    stream = {
        "DeliveryStreamType": delivery_stream_type,
        "KinesisStreamSourceConfiguration": delivery_stream_type_configuration,
        "HasMoreDestinations": False,
        "VersionId": "1",
        "CreateTimestamp": time.time(),
        "DeliveryStreamARN": firehose_stream_arn(stream_name),
        "DeliveryStreamStatus": "ACTIVE",
        "DeliveryStreamName": stream_name,
        "Destinations": [],
        "Tags": tags,
    }
    # The stream has to be registered before the destinations are added, because
    # update_destination() is given only the stream name.
    region.delivery_streams[stream_name] = stream
    if elasticsearch_destination:
        update_destination(
            stream_name=stream_name,
            destination_id=short_uid(),
            elasticsearch_update=elasticsearch_destination,
        )
    if s3_destination:
        update_destination(
            stream_name=stream_name,
            destination_id=short_uid(),
            s3_update=s3_destination,
        )
    if http_destination:
        update_destination(
            stream_name=stream_name,
            destination_id=short_uid(),
            http_update=http_destination,
        )

    # record event
    event_publisher.fire_event(
        event_publisher.EVENT_FIREHOSE_CREATE_STREAM,
        payload={"n": event_publisher.get_hash(stream_name)},
    )

    if delivery_stream_type == "KinesisStreamAsSource":
        # The Kinesis stream name is the part of the ARN after the first "/".
        kinesis_stream_arn = delivery_stream_type_configuration.get("KinesisStreamARN")
        kinesis_stream_name = kinesis_stream_arn.split("/")[1]
        kinesis_connector.listen_to_kinesis(
            stream_name=kinesis_stream_name,
            fh_d_stream=stream_name,
            listener_func=process_records,
            wait_until_started=True,
            ddb_lease_table_suffix="-firehose",
            region_name=region_name,
        )
    return stream


def delete_stream(stream_name):
    """Remove a delivery stream from the backend.

    :return: ``{}`` on success, or the ``error_not_found`` response if no stream
        with that name is registered
    """
    region = FirehoseBackend.get()
    stream = region.delivery_streams.pop(stream_name, {})
    if not stream:
        return error_not_found(stream_name)

    # record event
    event_publisher.fire_event(
        event_publisher.EVENT_FIREHOSE_DELETE_STREAM,
        payload={"n": event_publisher.get_hash(stream_name)},
    )
    return {}


def get_stream(stream_name: str, format_s3_dest: bool = False):
    """Return the stored description of a delivery stream, or ``None`` if unknown.

    With ``format_s3_dest=True`` a clone of the description is returned in which
    every destination whose ``S3DestinationDescription`` has at least one of the
    extended attributes set carries that description under the key
    ``ExtendedS3DestinationDescription`` instead. The stored description is not
    modified in that case.
    """
    region = FirehoseBackend.get()
    result = region.delivery_streams.get(stream_name)
    if result and format_s3_dest:
        extended_attrs = [
            "ProcessingConfiguration",
            "S3BackupMode",
            "S3BackupDescription",
            "DataFormatConversionConfiguration",
        ]
        result = clone(result)
        for dest in result.get("Destinations", []):
            s3_dest = dest.get("S3DestinationDescription") or {}
            if any(s3_dest.get(attr) is not None for attr in extended_attrs):
                dest["ExtendedS3DestinationDescription"] = dest.pop("S3DestinationDescription")
    return result


def bucket_name(bucket_arn):
    """Return the part of ``bucket_arn`` that follows the last ``":::"``."""
    return bucket_arn.split(":::")[-1]


def role_arn(stream_name):
    """Return an IAM role ARN in the test account whose role name is ``stream_name``."""
    return "arn:aws:iam::%s:role/%s" % (TEST_AWS_ACCOUNT_ID, stream_name)


def error_not_found(stream_name):
    """Return a ``ResourceNotFoundException`` error response (code 400) for a stream."""
    msg = "Firehose %s under account %s not found." % (stream_name, TEST_AWS_ACCOUNT_ID)
    return error_response(msg, code=400, error_type="ResourceNotFoundException")


def error_response(msg, code=500, error_type="InternalFailure"):
    """Return a JSON error response built by ``aws_responses.flask_error_response_json``."""
    return aws_responses.flask_error_response_json(msg, code=code, error_type=error_type)


@app.route("/", methods=["POST"])
def post_request():
    """Handle a POST to ``/`` by dispatching on the ``x-amz-target`` header.

    The action is the text after the last ``"."`` of the header value; the request
    body is parsed as JSON. The end of this function is not visible in the listing.
    """
    action = request.headers.get("x-amz-target", "")
    action = action.split(".")[-1]
    data = json.loads(to_str(request.data))
    response = None
    if action == "ListDeliveryStreams":
        response = {
            "DeliveryStreamNames": get_delivery_stream_names(),
            "HasMoreDeliveryStreams": False,
        }
    elif action == "CreateDeliveryStream":
        stream_name = data["DeliveryStreamName"]
        region_name = extract_region_from_auth_header(request.headers)
        # A plain S3 configuration takes precedence over the extended one.
        _s3_destination = data.get("S3DestinationConfiguration") or data.get(
            "ExtendedS3DestinationConfiguration"
        )
        response = create_stream(
            stream_name,
            delivery_stream_type=data.get("DeliveryStreamType"),
            delivery_stream_type_configuration=data.get("KinesisStreamSourceConfiguration"),
            s3_destination=_s3_destination,
            elasticsearch_destination=data.get("ElasticsearchDestinationConfiguration"),
            http_destination=data.get("HttpEndpointDestinationConfiguration"),
            tags=data.get("Tags"),
            region_name=region_name,
        )
    elif action == "DeleteDeliveryStream":
        stream_name = data["DeliveryStreamName"]
        response = delete_stream(stream_name)
    elif action == "DescribeDeliveryStream":
        stream_name = data["DeliveryStreamName"]
        response = get_stream(stream_name, format_s3_dest=True)
        if not response:
            return error_not_found(stream_name)
        response = {"DeliveryStreamDescription": response}
    elif action == "PutRecord":
        stream_name = data["DeliveryStreamName"]
        record = data["Record"]
        response = put_record(stream_name, record)
    elif action == "PutRecordBatch":
        stream_name = data["DeliveryStreamName"]
        records = data["Records"]
        put_records(stream_name, records)
        # One freshly generated record ID per submitted record.
        request_responses = [{"RecordId": str(uuid.uuid4())} for _ in records]
        response = {"FailedPutCount": 0, "RequestResponses": request_responses}
    elif action == "UpdateDestination":
        stream_name = data["DeliveryStreamName"]
        version_id = data["CurrentDeliveryStreamVersionId"]
        destination_id = data["DestinationId"]
        s3_update = data.get("S3DestinationUpdate")
        update_destination(
            stream_name=stream_name,
            destination_id=destination_id,
            s3_update=s3_update,
            version_id=version_id,
        )
        es_update = data.get("ESDestinationUpdate")
        update_destination(
            stream_name=stream_name,
            destination_id=destination_id,
            elasticsearch_update=es_update,
            version_id=version_id,
        )
        http_update = data.get("HttpEndpointDestinationUpdate")
        update_destination(
            stream_name=stream_name,
            destination_id=destination_id,
            http_update=http_update,
            version_id=version_id,
        )
        response = {}
    elif action == "ListTagsForDeliveryStream":
        response = get_delivery_stream_tags(
            data["DeliveryStreamName"],
            data.get("ExclusiveStartTagKey"),
            data.get("Limit", 50),
        )
    else:
        # NOTE: the listing is cut off inside this call. Its arguments and the rest of
        # the handler are not visible, so ``...`` stands in for the missing text.
        response = error_response(...)
App.py
Source:App.py  
...42    """ Returns vaoid43    a# Event handler to update destination column of table44    a# eg after textChanged of txtCopyDir, dropEvent of tblFiles, valueChanged of bxLevelDir45    """46    def update_destination(self, event=None):47        if not self.ui.txtCopyToDir.text():48            return49        # @todo: Create new handler function for text value changed event50        stats = AtUtils.get_free_space_mb(self.ui.txtCopyToDir.text())51        self.set_target_status_value(stats["free"], stats["total"])52        basepath = self.ui.txtCopyToDir.text()53        if basepath and os.path.isdir(basepath) and self.ui.tblFiles.rowCount() > 0:54            level = self.ui.bxDirLevel.value()55            level = level if level else 056            level += 157            rows = self.ui.tblFiles.rowCount()58            counter = 059            while counter < rows:60                srcpath = self.ui.tblFiles.item(counter, 0).text()...urls.py
Source:urls.py  
1from django.urls import path2from destinations.views import *3urlpatterns = [4   	path('', index, name='index'),5   	path('home/', home, name='home'),6   	path('message/', message, name='message'),7   	path('contact/', contact, name='contact'),8   	path('explore/', explore, name='explore'),9   	path('listing/', listing, name='listing'),10   	path('admin_dashboard/', admin_dashboard, name='admin_dashboard'),11   	path('destinations_place/', destinations_place, name='destinations_place'),12   	path('destination_list/', destination_list, name='destination_list'),13   	path('<uuid:destination_id>/destination_detail', destination_detail, name='destination_detail'),14   	path('<uuid:destination_id>/bookings', bookings, name='bookings'),15   	path('religious_tour/', religious_tour, name='religious_tour'),16   	path('hilly_tour/', hilly_tour, name='hilly_tour'),17   	path('mountain_tour/', mountain_tour, name='mountain_tour'),18   	path('terai_tour/', terai_tour, name='terai_tour'),19   	path('pokhara_tour/', pokhara_tour, name='pokhara_tour'),20   	path('jungle_safari/', jungle_safari, name='jungle_safari'),21   	path('admin_destination_list/', admin_destination_list, name='admin_destination_list'),22   	path('<uuid:destination_id>/admin_destination_detail', admin_destination_detail, name='admin_destination_detail'),23   	path('<uuid:destination_id>/delete_destination', delete_destination, name='delete_destination'),24   	path('<uuid:destination_id>/update_destination', update_destination, name='update_destination'),25   	path('messages/', mesge, name='mesge'),26   	path('admin_messages/', admin_messages, name='admin_messages'),27   	path('admin_bookings/', admin_bookings, name='admin_bookings'),28   	path('category_destinations_religious/', category_destinations_religious, name='category_destinations_religious'),29   	path('category_destinations_hilly/', category_destinations_hilly, name='category_destinations_hilly'),30   	path('category_destinations_mountain/', 
category_destinations_mountain, name='category_destinations_mountain'),31   	path('category_destinations_terai/', category_destinations_terai, name='category_destinations_terai'),32   	path('category_destinations_famous/', category_destinations_famous, name='category_destinations_famous'),33   	path('category_destinations_jungle/', category_destinations_jungle, name='category_destinations_jungle'),...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
