Best Python code snippet using localstack_python
client.pyi
Source:client.pyi  
...354        shards.355        [Show boto3 documentation](https://boto3.amazonaws.com/v1/documentation/api/1.24.58/reference/services/kinesis.html#Kinesis.Client.update_shard_count)356        [Show boto3-stubs documentation](https://vemel.github.io/boto3_stubs_docs/mypy_boto3_kinesis/client.html#update_shard_count)357        """358    def update_stream_mode(359        self, *, StreamARN: str, StreamModeDetails: "StreamModeDetailsTypeDef"360    ) -> None:361        """362        Updates the capacity mode of the data stream.363        [Show boto3 documentation](https://boto3.amazonaws.com/v1/documentation/api/1.24.58/reference/services/kinesis.html#Kinesis.Client.update_stream_mode)364        [Show boto3-stubs documentation](https://vemel.github.io/boto3_stubs_docs/mypy_boto3_kinesis/client.html#update_stream_mode)365        """366    @overload367    def get_paginator(self, operation_name: Literal["describe_stream"]) -> DescribeStreamPaginator:368        """369        [Show boto3 documentation](https://boto3.amazonaws.com/v1/documentation/api/1.24.58/reference/services/kinesis.html#Kinesis.Paginator.DescribeStream)370        [Show boto3-stubs documentation](https://vemel.github.io/boto3_stubs_docs/mypy_boto3_kinesis/paginators.html#describestreampaginator)371        """372    @overload...responses.py
Source:responses.py  
...238    def stop_stream_encryption(self):239        stream_name = self.parameters.get("StreamName")240        self.kinesis_backend.stop_stream_encryption(stream_name=stream_name)241        return json.dumps(dict())242    def update_stream_mode(self):243        stream_arn = self.parameters.get("StreamARN")244        stream_mode = self.parameters.get("StreamModeDetails")245        self.kinesis_backend.update_stream_mode(stream_arn, stream_mode)...ffr_run_dof.py
Source:ffr_run_dof.py  
import cPickle
import multiprocessing
import os
import sys
import time
import arcpy
import numpy as np
import pandas as pd
import indices.dof
import tools.helper as tool
from config import config

fd = config.var


def run_dof(stamp, para, paths, pooled=True, processes=8):
    """
    Set up for multiprocessing. Creates a isolated processing environment,
    where each hydrological river basin is processed separately, then merges
    the per-basin results into a single "dof" table in the geodatabase.

    :param stamp: timestamp (string); used to name the results folder and
        the per-basin scratch folders
    :param para: parameters (dict); the keys read here are "dams_fc",
        "streams_fc", "update_mode", "drf_upstream", "drf_downstream",
        "barrier_inc_field", "dof_field", "dof_mode", "use_dam_level_df"
        and "output_folder"
    :param paths: pathnames (dict); only "gdb_full_path" is read
    :param pooled: if True (default) basins are processed in a
        multiprocessing pool, otherwise sequentially in this process
    :param processes: number of worker processes when ``pooled`` is True
    :return: None
    """
    dam_fc = para["dams_fc"]
    streams_fc = para["streams_fc"]
    update_stream_mode = para["update_mode"]
    drf_upstream = para["drf_upstream"]
    drf_downstream = para["drf_downstream"]
    barrier_inc_field = para["barrier_inc_field"]
    dof_field = para["dof_field"]
    dof_mode = para["dof_mode"]
    use_dam_level_df = para["use_dam_level_df"]

    gdb_full_path = paths["gdb_full_path"]

    output_folder = os.path.join(para["output_folder"], "Results_" + stamp)

    # Start from an empty scratch workspace
    scratch_ws = os.path.join(output_folder, "Scratch")
    tool.delete_path(scratch_ws)
    tool.create_path(scratch_ws)

    print("Discharge range factor used (upstream): %s" % drf_upstream)
    print("Discharge range factor used (downstream): %s" % drf_downstream)

    in_basins = list(get_unique(dam_fc, barrier_inc_field))

    print("Loading {}".format(str(streams_fc)))
    streams = load_streams(streams_fc, dof_field)
    dams_temp = load_dams(dam_fc, barrier_inc_field, use_dam_level_df)

    # Lazily yields one argument tuple per basin, so the pooled and the
    # sequential path share exactly the same job definition.
    basin_args = _iter_basin_args(in_basins, streams, dams_temp, stamp,
                                  scratch_ws, dof_field, drf_upstream,
                                  drf_downstream, dof_mode, use_dam_level_df)

    if pooled:
        print("Starting analysis pooled")
        pool = multiprocessing.Pool(processes)
        try:
            jobs = [pool.apply_async(run_basin, args) for args in basin_args]
            pool.close()
            pool.join()
        finally:
            # No-op after a clean close()/join(); stops the workers if
            # submitting the jobs failed half-way.
            pool.terminate()
        # get() re-raises any exception that occurred inside a worker
        out_basin = [job.get() for job in jobs]
    else:
        print("Starting analysis unpooled")
        out_basin = [run_basin(*args) for args in basin_args]

    # Merge the temporary outputs
    print("Merging temporary outputs into output table %s ..." % gdb_full_path)

    # The pickles were written by run_basin() in this very run; do not point
    # this loader at files from an untrusted source.
    tables = []
    for bas in out_basin:
        with open(bas, 'rb') as fp:
            tables.append(cPickle.load(fp))

    # The per-basin tables are 1-D record arrays (rows selected by a boolean
    # mask), so they are stacked along axis 0. The original passed axis=1,
    # which is out of bounds for 1-D input.
    merged = np.concatenate(tables)
    df = pd.DataFrame(merged)

    # Turn panda to numpy
    # https://my.usgs.gov/confluence/display/cdi/pandas.DataFrame+to+ArcGIS+Table
    x = np.array(np.rec.fromrecords(df.values))
    names = df.dtypes.index.tolist()
    x.dtype.names = tuple(names)

    output_table_location = os.path.join(gdb_full_path, "dof")
    arcpy.da.NumPyArrayToTable(x, output_table_location)

    # NOTE(review): `merged` is the in-memory array, not the table that was
    # just written; presumably `output_table_location` was intended here.
    # Left unchanged because tool.add_index is not visible - confirm.
    tool.add_index(lyr=merged, field_name="GOID")

    # Update automatically
    if update_stream_mode.lower() == "yes":
        print("Updating dof values in database {} ".format(streams_fc))
        tool.copy_between(to_join_fc=streams_fc,
                          to_join_field="GOID",
                          IntoJoinField=dof_field,
                          FromJoinFC=output_table_location,
                          FromJoinField="GOID",
                          FromValueField=dof_field,
                          over_mode=True,
                          over_value=0)

    tool.delete_path(scratch_ws)


def _iter_basin_args(in_basins, streams, dams, stamp, scratch_ws, dof_field,
                     drf_upstream, drf_downstream, dof_mode,
                     use_dam_level_df):
    """
    Yield the positional argument tuple for run_basin(), one per basin.

    :param in_basins: iterable of basin ids to process
    :param streams: array with the river reaches of all basins
    :param dams: array with the dams of all basins
    :return: generator of argument tuples in run_basin() parameter order
    """
    for i, basin in enumerate(in_basins, 1):
        # https://stackoverflow.com/a/8533626/344647
        # Much faster than querying the global dataset on disk is
        # to load the global dataset into memory first and then query it
        # using numpy methods
        streams_sel = np.copy(streams[streams[fd.BAS_ID] == basin])

        # Also faster than querying the feature dataset on disk,
        # it is better to load it once and then
        # use numpy indexing to get the dams we want
        dams_sel = np.copy(dams[dams[fd.BAS_ID] == basin])

        # stamp + counter gives every basin its own scratch folder name
        yield (streams_sel, dams_sel, basin, stamp + str(i), scratch_ws,
               dof_field, drf_upstream, drf_downstream, dof_mode,
               use_dam_level_df)


def run_basin(streams, dams, basin, stamp, scratchws, dof_field, drf_upstream,
              drf_downstream, mode, use_dam_level_df):
    """
    Calculate DOF for all barriers in a specified river basin

    :param streams: river reaches of this basin
    :param dams: dams of this basin
    :param basin: basin id
    :param stamp: unique stamp used in the name of the basin's output folder
    :param scratchws: scratch workspace the output folder is created in
    :param dof_field: name of the field receiving the DOF values
    :param drf_upstream: discharge range factor (upstream)
    :param drf_downstream: discharge range factor (downstream)
    :param mode: DOF mode, handed on to indices.dof.calculate_DOF
    :param use_dam_level_df: handed on to indices.dof.calculate_DOF
    :return: path of the pickled result table of this basin
    """
    # Setup isolated path and environment
    temp_out_folder = set_environment(scratchws, basin, stamp)

    # Update network ids for rivers and dams
    tool.update_stream_routing_index(streams)
    tool.update_dam_routing_index(dams, streams)

    # Calculate and write DOF into designated field
    indices.dof.calculate_DOF(dams, streams, mode, dof_field,
                              drf_upstream, drf_downstream, use_dam_level_df)

    # Export table to temporary folder and
    # return the path to the table for later merging
    return export(streams, basin, temp_out_folder)


def get_unique(dam_table, inc_field):
    """
    Calculates a list of unique river basins that need to be processed based
    on the barriers to be considered.

    Exits the interpreter (sys.exit) if any selected dam has a basin id of 0.

    :param dam_table: dam table / feature class readable by
        arcpy.da.TableToNumPyArray
    :param inc_field: field to determine dams to include (rows with value 1)
    :return: array of unique basin ids
    """
    flds = [fd.BAS_ID, inc_field]
    where_clause = (inc_field + ' = 1').replace("'", "")

    dams = arcpy.da.TableToNumPyArray(
        dam_table, flds, where_clause, null_value=-1)

    in_basins = np.unique(dams[fd.BAS_ID])

    if 0 in in_basins:
        sys.exit(
            "One of the dams has a basin ID of 0. BAS_ID cannot be zero. "
            "Please provide a BAS_ID other than 0 for "
            "all dams ")

    return in_basins


def set_environment(scratch_ws, basin, stamp):
    """
    Creates isolated output paths for specific basin using timestamp in
    scratch workspace.

    :param scratch_ws: scratch workspace
    :param basin:  river basin to process
    :param stamp: timestamp
    :return: fully specified pathname
    """
    out_folder = "basin_" + str(basin) + "_" + str(stamp)
    fullpath = os.path.join(scratch_ws, out_folder)
    tool.create_path(fullpath)
    return fullpath


def load_dams(dam_table, inc_field, use_dam_level_df):
    """
    This function loads the dams from the database

    :param dam_table: dam table / feature class readable by
        arcpy.da.TableToNumPyArray
    :param inc_field: field to determine dams to include (value > 0)
    :param use_dam_level_df: "yes" (case-insensitive) to also load the
        fd.DFU and fd.DFD fields
    :return: numpy array with dams
    """
    flds = [fd.BAS_ID, fd.GOID, fd.STOR_MCM]
    if use_dam_level_df.lower() == "yes":
        flds += [fd.DFU, fd.DFD]
    flds.append(inc_field)

    tool.check_fields(dam_table, flds)

    # NOTE(review): get_unique() selects dams with `inc_field = 1`, while
    # this filter uses `> 0` and additionally requires fd.INC > 0 (a field
    # that is not part of the check_fields() call above) - confirm intended.
    where_clause = inc_field + ' > 0' + ' AND ' + fd.INC + ' > 0'
    where_clause = where_clause.replace("'", "")

    dams = arcpy.da.TableToNumPyArray(
        dam_table, flds, where_clause, null_value=0)

    return dams


def load_streams(stream_table, dof_field):
    """
    Loads the streams and adds a field for holding the DOF values

    :param stream_table: stream table / feature class with the river reaches
    :param dof_field: field name to store DOF results
    :return: numpy array with the reaches and a zero-filled DOF field
    """
    flds = [fd.BAS_ID, fd.GOID, fd.NOID, fd.NDOID, fd.NUOID, fd.RIV_ORD,
            fd.DIS_AV_CMS, fd.HYFALL]

    tool.check_fields(stream_table, flds)

    arr = arcpy.da.TableToNumPyArray(stream_table, flds, null_value=0)
    arr = tool.add_fields(arr, [(str(dof_field), 'f4')])
    arr[dof_field] = 0
    return arr


def export(streams, basin, folder):
    """
    Pickles the stream table of a basin into its output folder.

    :param streams: stream array to save
    :param basin: basin id, used in the file name
    :param folder: folder the file is written to
    :return: full path of the written file ("out_<basin>.bas" in folder)
    """
    suffix = ".bas"
    name = "out_" + str(basin)
    fullpath = os.path.join(folder, name + suffix)

    tool.save_as_cpickle(pickle_object=streams,
                         folder=folder,
                         name=name,
                         file_extension=suffix)
    return fullpath


def create_gdb_workspace(gdb_folder, gdb_name):
    """
    Creates a path and a geodatabase with timestamp as name

    :param stamp:
    :param gdb_folder:
    :param gdb_name:
    :return: full path and geodatabase name
    """
    if not os.path.exists(gdb_folder):
        os.makedirs(gdb_folder)
    gdb_file_name = gdb_name + ".gdb"
    gdb_full_path = gdb_folder + "\\" + gdb_file_name
    arcpy.CreateFileGDB_management(gdb_folder, gdb_file_name)
    # ... (the source excerpt is truncated here)

# Learn to execute automation testing from scratch with LambdaTest Learning
# Hub. Right from setting up the prerequisites to run your first automation
# test, to following best practices and diving deeper into advanced test
# scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides
# to help you be proficient with different test automation frameworks i.e.
# Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
