Best Python code snippet using localstack_python
s3_listener.py
Source:s3_listener.py  
...868                    result += xmltodict.unparse(dest_dict, full_document=False)869    result += "</NotificationConfiguration>"870    response._content = result871    return response872def _validate_filter_rules(filter_doc):873    rules = filter_doc.get("FilterRule")874    if not rules:875        return876    for rule in rules:877        name = rule.get("Name", "")878        if name.lower() not in ["suffix", "prefix"]:879            raise InvalidFilterRuleName(name)880        # TODO: check what other rules there are881def _sanitize_notification_filter_rules(filter_doc):882    rules = filter_doc.get("FilterRule")883    if not rules:884        return885    for rule in rules:886        name = rule.get("Name", "")...fetch_parq.py
Source:fetch_parq.py  
...142                partitioned parquet.143    Returns:144        A pandas dataframe of the filtered results from S3145    """146    _validate_filter_rules(filters)147    S3NamingHelper().validate_bucket_name(bucket)148    all_files = get_all_files_list(bucket, key)149    if not all_files:150        logger.debug(f"No files present under : {key} :, returning empty DataFrame")151        return pd.DataFrame()152    partition_metadata = _get_partitions_and_types(all_files[0], bucket)153    if partition_metadata is None:154        if accept_not_s3parq:155            logger.info("Parquet files do not have S3Parq metadata, fetching anyways.")156            return _get_filtered_data(bucket=bucket, paths=all_files, partition_metadata={},157                              parallel=parallel)158        else:159            raise MissingS3ParqMetadata("Parquet files are missing s3parq metadata, enable 'accept_not_s3parq' if you'd like this to pass.")160    _validate_matching_filter_data_type(partition_metadata, filters)161    # strip out the filenames (which we need)162    partition_values = _parse_partitions_and_values(all_files, key)163    typed_values = _get_partition_value_data_types(164        partition_values, partition_metadata)165    # filtered_paths is a list of the S3 prefixes from which we want to load data166    filtered_paths = _get_filtered_key_list(typed_values, filters, key)167    files_to_load = []168    for file in all_files:169        for prefix in filtered_paths:170            if file.startswith(prefix):171                files_to_load.append(file)172    # if there is no data matching the filters, return an empty DataFrame173    # with correct headers and type174    if len(files_to_load) < 1:175        logger.debug(f"Dataset filters left no matching values, returning empty dataframe with matching headers")176        sacrifical_files = [all_files[0]]177        sacrifical_frame = _get_filtered_data(178            bucket=bucket, paths=sacrifical_files, 
partition_metadata=partition_metadata, parallel=parallel)179        return sacrifical_frame.head(0)180    return _get_filtered_data(bucket=bucket, paths=files_to_load, partition_metadata=partition_metadata,181                              parallel=parallel)182def fetch_diff(input_bucket: str, input_key: str, comparison_bucket: str, comparison_key: str, partition: str, reverse: bool = False, parallel: bool = True) -> pd.DataFrame:183    """ Returns a dataframe of whats in the input dataset but not the comparison 184    dataset by the specified partition.185    Args:186        input_bucket (str): The bucket of the dataset to start from187        input_key (str): The key to the dataset to start from188        comparison_bucket (str): The bucket of the dataset to compare against189        comparison_key (str): The key to the dataset to compare against190        partition (str): The partition in the dataset to compare the values of191        reverse (bool, Optional): Determines if the operation should be inversed,192            if True it will look for the values in comparison that are not 193            in the input (basically backwards). 
Defaults to False194        parallel (bool, Optional):195            Determines if multiprocessing should be used, defaults to True.196            Whether this is more or less efficient is dataset dependent,197            smaller datasets run much quicker without parallel198    Returns:199        A dataframe of all values in the input but not the comparison,200            if reverse=True then vice-versa201    """202    S3NamingHelper().validate_bucket_name(input_bucket)203    S3NamingHelper().validate_bucket_name(comparison_bucket)204    comparison_values = get_all_partition_values(205        bucket=comparison_bucket, key=comparison_key, partition=partition)206    diff_values = get_diff_partition_values(207        bucket=input_bucket,208        key=input_key,209        partition=partition,210        values_to_diff=comparison_values,211        reverse=reverse212    )213    filters = [{214        "partition": partition,215        "comparison": "==",216        "values": diff_values217    }]218    logger.debug(f"Fetching difference, looking for values : {diff_values} : under partition : {partition} ")219    if reverse:220        return fetch(bucket=comparison_bucket, key=comparison_key, filters=filters, parallel=parallel)221    else:222        return fetch(bucket=input_bucket, key=input_key, filters=filters, parallel=parallel)223def convert_type(val: Any, dtype: str) -> Any:224    """ Converts the given value to the given datatype225    Args:226        val (Any): The value to convert227        dtype (str): The type to attempt to convert to228    Returns:229        The value parsed into the new dtype230    """231    if dtype == 'string' or dtype == 'str':232        return str(val)233    elif dtype == 'integer' or dtype == 'int':234        return int(val)235    elif dtype == 'float':236        return float(val)237    elif dtype == 'datetime':238        return datetime.datetime.strptime(239            val, '%Y-%m-%d %H:%M:%S')240    elif dtype == 'category':241        
return pd.Category(val)242    elif dtype == 'bool' or dtype == 'boolean':243        return bool(strtobool(val))244def dtype_to_pandas_dtype(dtype: str) -> str:245    """ Matches the given dtype to its pandas equivalent, if one exists246    Args:247        dtype (str): The type to attempt to match248    Returns:249        The pandas version of the dtype if one exists, otherwise the original250    """251    if dtype == "integer":252        dtype = "int"253    elif dtype == "string":254        dtype = "str"255    elif dtype == "boolean":256        dtype = "bool"257    return dtype258def get_all_files_list(bucket: str, key: str) -> list:259    """ Get a list of all files to get all partitions values.260    Necesarry to catch all partition values for non-filtered partiions.261    NOTE: This paginates across as many as are matching keys in the bucket;262        be mindful of any costs in extra large buckets without a specific key.263    Args:264        bucket (str): S3 bucket to search in265        key (str): S3 key to the dataset to check266    Returns:267        A list of the keys of all objects in the bucket/key that end with .parquet268    """269    objects_in_bucket = []270    s3_client = boto3.client('s3')271    paginator = s3_client.get_paginator('list_objects')272    operation_parameters = {'Bucket': bucket,273                            'Prefix': key}274    page_iterator = paginator.paginate(**operation_parameters)275    for page in page_iterator:276        if not "Contents" in page.keys():277            break278        for item in page['Contents']:279            if item['Key'].endswith('.parquet'):280                objects_in_bucket.append(item['Key'])281    return objects_in_bucket282def _get_partitions_and_types(first_file_key: str, bucket: str) -> dict:283    """ Fetch a list of all the partitions actually there and their 284    datatypes. 
List may be different than passed list if not being used285    for filtering on.286    NOTE: This is pulled from the metadata. It is assumed that this package287        is being used with its own publish method.288    Args:289        first_file_key (str): The key to the first file in the dataset, to get290            the metadata off of291        bucket (str): The S3 bucket to run in292    Returns:293        The parsed metadata from heading the first file294    TODO295    """296    s3_client = boto3.client('s3')297    first_file = s3_client.head_object(298        Bucket=bucket,299        Key=first_file_key300    )301    metadata = first_file['Metadata']302    partition_metadata_raw = metadata.get("partition_data_types", None)303    if partition_metadata_raw is None:304        return None305    partition_metadata = ast.literal_eval(partition_metadata_raw)306    return partition_metadata307def _parse_partitions_and_values(file_paths: List[str], key: str) -> dict:308    """ Parses the string keys into a usable dict of the parts and values309    Args:310        file_paths (List[str]): The list of all files to parse out311        key (str): S3 key to the root of the dataset of the files312    Returns:313        A dictionary of all partitions with their values314    """315    # TODO: find more neat/efficient way to do this316    parts = OrderedDict()317    key_len = len(key)318    for file_path in file_paths:319        # Delete key, split the parts out, delete the file name320        file_path = file_path[key_len:]321        unparsed_parts = file_path.split("/")322        del unparsed_parts[-1]323        for part in unparsed_parts:324            if "=" in part:325                key, value = part.split("=")326                if key not in parts:327                    parts.update({key: set([value])})328                else:329                    parts[key].add(value)330    return parts331def _get_partition_value_data_types(parsed_parts: dict, part_types: dict) -> 
dict:332    """ Uses the partitions with their known types to parse them out333    Args:334        parsed_parts (dict): A dictionary of all partitions with their values335        part_types (dict): A dictionary of all partitions to their datatypes336    Returns:337        A dictionary of all partitions with their values parsed into the correct datatype338    """339    for part, values in parsed_parts.items():340        part_type = part_types[part]341        if (part_type == 'string') or (part_type == 'category'):342            continue343        elif (part_type == 'int') or (part_type == 'integer'):344            parsed_parts[part] = set(map(int, values))345        elif part_type == 'float':346            parsed_parts[part] = set(map(float, values))347        elif part_type == 'datetime':348            parsed_parts[part] = set(349                map(lambda s: datetime.datetime.strptime(s, "%Y-%m-%d %H:%M:%S"), values))350        elif (part_type == 'bool') or (part_type == 'boolean'):351            parsed_parts[part] = set(map(bool, values))352        else:353            logger.debug(f"Unknown partition type : {part_type} :, leaving as a string")354    return parsed_parts355def _get_filtered_key_list(typed_parts: dict, filters: List[type(Filter)], key: str) -> List[str]:356    """ Create list of all "paths" to files after the filtered partitions357    are set ie all non-matching partitions are excluded.358    Args:359        typed_parts (dict): A dictionary of all partitions with their values360        filters (List[type(Filter)]): A dictionary of all partitions to their datatypes361        key (str): S3 key to the base root of the dataset362    Returns:363        A list of object keys that have been filtered by partition values364    """365    filter_keys = []366    matched_parts = OrderedDict()367    matched_parts.keys = typed_parts.keys()368    matched_part_vals = set()369    for part, part_values in typed_parts.items():370        matched_part_vals.clear()371     
   fil = next((f for f in filters if f['partition'] == part), False)372        if fil:373            comparison = OPS[fil['comparison']]374            for v in fil['values']:375                for x in part_values:376                    if comparison(x, v):377                        matched_part_vals.add(x)378            matched_parts[part] = matched_part_vals.copy()379        else:380            matched_parts[part] = part_values381    def construct_paths(matched_parts, previous_fil_keys: List[str]) -> None:382        if len(matched_parts) > 0:383            part = matched_parts.popitem(last=False)384            new_filter_keys = list()385            for value in part[1]:386                mapped_keys = list(map(387                    (lambda x: str(x) +388                     str(part[0]) + "=" + str(value) + "/"),389                    previous_fil_keys390                ))391                new_filter_keys = new_filter_keys + mapped_keys392            construct_paths(matched_parts, new_filter_keys)393        else:394            filter_keys.append(previous_fil_keys)395    construct_paths(matched_parts, [f"{key}/"])396    # TODO: fix the below mess with random array397    return filter_keys[0]398def _get_filtered_data(bucket: str, paths: List[str], partition_metadata: dict, parallel: bool = True) -> pd.DataFrame:399    """ Gets the data based on the filtered object key list. 
Concatenates all 400    the separate parquet files.401    Args:402        bucket (str): S3 bucket to fetch from403        paths (List[str]): A list of all the object keys of the parquet files404        partition_metadata (dict): A dictionary of all partitions to their datatypes405        parallel (bool, Optional):406            Determines if multiprocessing should be used, defaults to True.407            Whether this is more or less efficient is dataset dependent,408            smaller datasets run much quicker without parallel409    Returns:410        The dataframe of all the parquet files from the given keys411    """412    temp_frames = []413    def append_to_temp(frame: pd.DataFrame):414        temp_frames.append(frame)415    if parallel:416        with get_context("spawn").Pool() as pool:417            for path in paths:418                logger.debug(f"[Parallel] Fetching parquet file to append from : {path} ")419                append_to_temp(420                    pool.apply_async(_s3_parquet_to_dataframe, args=(bucket, path, partition_metadata)).get())421            pool.close()422            pool.join()423    else:424        for path in paths:425            logger.debug(f"Fetching parquet file to append from : {path} ")426            append_to_temp(_s3_parquet_to_dataframe(427                bucket, path, partition_metadata))428    return pd.concat(temp_frames, ignore_index=True)429def _s3_parquet_to_dataframe(bucket: str, key: str, partition_metadata: dict) -> pd.DataFrame:430    """ Grab a parquet file from s3 and convert to pandas df, add it to the destination431    Args:432        bucket (str): S3 bucket to fetch from433        key (key): The full object path of the parquet file to read434        partition_metadata (dict): A dictionary of all partitions to their datatypes435    Returns:436        Dataframe from the key parquet, with partitions repopulated437    """438    s3 = s3fs.S3FileSystem()439    uri = f"{bucket}/{key}"440    table = 
pq.ParquetDataset(uri, filesystem=s3)441    frame = table.read_pandas().to_pandas()442    if partition_metadata != {}:443        partitions = _repopulate_partitions(uri, partition_metadata)444        for k, v in partitions.items():445            frame[k] = v446    return frame447def _repopulate_partitions(partition_string: str, partition_metadata: dict) -> tuple:448    """ For each val in the partition string creates a list that can be added back into the dataframe449    Args:450        partition_string (str): S3 bucket to fetch from451        partition_metadata (dict): A dictionary of all partitions to their datatypes452    Returns:453        Dataframe from the key parquet, with partitions repopulated454    """455    raw = partition_string.split('/')456    partitions = {}457    for string in raw:458        if '=' in string:459            k, v = string.split('=')460            partitions[k] = v461    if partition_metadata:462        for key, val in partitions.items():463            partitions[key] = convert_type(val, partition_metadata[key])464    465    return partitions466def _validate_filter_rules(filters: List[type(Filter)]) -> None:467    """ Validate that the filters are the correct format and follow basic468    comparison rules, otherwise throw a ValueError469    Args:470        filters (List[type(Filter)]): List of filters to validate471    Returns:472        None473    """474    single_value_comparisons = [475        ">",476        "<",477        "<=",478        ">="479    ]480    for f in filters:...test_fetch_parq_internal.py
Source:test_fetch_parq_internal.py  
...38        }]39    ]40    for inv_fil in invalid_filters_list:41        with pytest.raises(ValueError):42            fetch_parq._validate_filter_rules(inv_fil)43    valid_filters_list = [44        [{45            "partition": "fake-part",46            "comparison": "==",47            "values": ["some-string"]48        }],49        [{50            "partition": "fake-part",51            "comparison": ">",52            "values": [1]53        }]54    ]55    for val_fil in valid_filters_list:56        fetch_parq._validate_filter_rules(val_fil)57# Test that all valid partitions are correctly parsed58def test_get_partitions():59    parts = [60        "fake-key/fil-1=1/fil-2=2/fil-3=str/rngf1.parquet",61        "fake-key/fil-1=1/fil-2=2/fil-3=rng-str/rngf2.parquet",62        "fake-key/fil-1=1/fil-2=4/fil-3=str_rng/rngf3.parquet",63        "fake-key/fil-1=5/fil-2=2/fil-3=str/rngf4.parquet",64        "fake-key/fil-1=5/fil-2=4/fil-3=99/rngf5.parquet"65    ]66    parsed_parts = OrderedDict({67        "fil-1": {"1", "5"},68        "fil-2": {"2", "4"},69        "fil-3": {"str", "rng-str", "str_rng", "99"}70    })...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
