How to use _validate_filter_rules method in localstack

Best Python code snippet using localstack_python

s3_listener.py

Source: s3_listener.py (GitHub)


...
    result += xmltodict.unparse(dest_dict, full_document=False)
    result += "</NotificationConfiguration>"
    response._content = result
    return response

def _validate_filter_rules(filter_doc):
    rules = filter_doc.get("FilterRule")
    if not rules:
        return
    for rule in rules:
        name = rule.get("Name", "")
        if name.lower() not in ["suffix", "prefix"]:
            raise InvalidFilterRuleName(name)
        # TODO: check what other rules there are

def _sanitize_notification_filter_rules(filter_doc):
    rules = filter_doc.get("FilterRule")
    if not rules:
        return
    for rule in rules:
        name = rule.get("Name", "")
...
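
The excerpt above only shows the validator itself. Below is a minimal, self-contained sketch of how it behaves; InvalidFilterRuleName is stubbed here as a plain Exception subclass standing in for localstack's own exception class, so the example runs outside localstack.

class InvalidFilterRuleName(Exception):
    # Stand-in for localstack's exception of the same name.
    pass

def _validate_filter_rules(filter_doc):
    # Same logic as the localstack snippet above: only "prefix"/"suffix"
    # filter rule names are accepted.
    rules = filter_doc.get("FilterRule")
    if not rules:
        return
    for rule in rules:
        name = rule.get("Name", "")
        if name.lower() not in ["suffix", "prefix"]:
            raise InvalidFilterRuleName(name)

# A prefix rule passes validation silently ...
_validate_filter_rules({"FilterRule": [{"Name": "prefix", "Value": "uploads/"}]})

# ... while an unsupported rule name is rejected.
try:
    _validate_filter_rules({"FilterRule": [{"Name": "regex", "Value": ".*"}]})
except InvalidFilterRuleName as err:
    print(f"Rejected filter rule name: {err}")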


fetch_parq.py

Source: fetch_parq.py (GitHub)


...
    partitioned parquet.
    Returns:
        A pandas dataframe of the filtered results from S3
    """
    _validate_filter_rules(filters)
    S3NamingHelper().validate_bucket_name(bucket)
    all_files = get_all_files_list(bucket, key)
    if not all_files:
        logger.debug(f"No files present under : {key} :, returning empty DataFrame")
        return pd.DataFrame()
    partition_metadata = _get_partitions_and_types(all_files[0], bucket)
    if partition_metadata is None:
        if accept_not_s3parq:
            logger.info("Parquet files do not have S3Parq metadata, fetching anyways.")
            return _get_filtered_data(bucket=bucket, paths=all_files, partition_metadata={},
                                      parallel=parallel)
        else:
            raise MissingS3ParqMetadata("Parquet files are missing s3parq metadata, enable 'accept_not_s3parq' if you'd like this to pass.")
    _validate_matching_filter_data_type(partition_metadata, filters)
    # strip out the filenames (which we need)
    partition_values = _parse_partitions_and_values(all_files, key)
    typed_values = _get_partition_value_data_types(
        partition_values, partition_metadata)
    # filtered_paths is a list of the S3 prefixes from which we want to load data
    filtered_paths = _get_filtered_key_list(typed_values, filters, key)
    files_to_load = []
    for file in all_files:
        for prefix in filtered_paths:
            if file.startswith(prefix):
                files_to_load.append(file)
    # if there is no data matching the filters, return an empty DataFrame
    # with correct headers and type
    if len(files_to_load) < 1:
        logger.debug(f"Dataset filters left no matching values, returning empty dataframe with matching headers")
        sacrifical_files = [all_files[0]]
        sacrifical_frame = _get_filtered_data(
            bucket=bucket, paths=sacrifical_files, partition_metadata=partition_metadata, parallel=parallel)
        return sacrifical_frame.head(0)
    return _get_filtered_data(bucket=bucket, paths=files_to_load, partition_metadata=partition_metadata,
                              parallel=parallel)

def fetch_diff(input_bucket: str, input_key: str, comparison_bucket: str, comparison_key: str, partition: str, reverse: bool = False, parallel: bool = True) -> pd.DataFrame:
    """ Returns a dataframe of whats in the input dataset but not the comparison
    dataset by the specified partition.
    Args:
        input_bucket (str): The bucket of the dataset to start from
        input_key (str): The key to the dataset to start from
        comparison_bucket (str): The bucket of the dataset to compare against
        comparison_key (str): The key to the dataset to compare against
        partition (str): The partition in the dataset to compare the values of
        reverse (bool, Optional): Determines if the operation should be inversed,
            if True it will look for the values in comparison that are not
            in the input (basically backwards). Defaults to False
        parallel (bool, Optional):
            Determines if multiprocessing should be used, defaults to True.
            Whether this is more or less efficient is dataset dependent,
            smaller datasets run much quicker without parallel
    Returns:
        A dataframe of all values in the input but not the comparison,
        if reverse=True then vice-versa
    """
    S3NamingHelper().validate_bucket_name(input_bucket)
    S3NamingHelper().validate_bucket_name(comparison_bucket)
    comparison_values = get_all_partition_values(
        bucket=comparison_bucket, key=comparison_key, partition=partition)
    diff_values = get_diff_partition_values(
        bucket=input_bucket,
        key=input_key,
        partition=partition,
        values_to_diff=comparison_values,
        reverse=reverse
    )
    filters = [{
        "partition": partition,
        "comparison": "==",
        "values": diff_values
    }]
    logger.debug(f"Fetching difference, looking for values : {diff_values} : under partition : {partition} ")
    if reverse:
        return fetch(bucket=comparison_bucket, key=comparison_key, filters=filters, parallel=parallel)
    else:
        return fetch(bucket=input_bucket, key=input_key, filters=filters, parallel=parallel)

def convert_type(val: Any, dtype: str) -> Any:
    """ Converts the given value to the given datatype
    Args:
        val (Any): The value to convert
        dtype (str): The type to attempt to convert to
    Returns:
        The value parsed into the new dtype
    """
    if dtype == 'string' or dtype == 'str':
        return str(val)
    elif dtype == 'integer' or dtype == 'int':
        return int(val)
    elif dtype == 'float':
        return float(val)
    elif dtype == 'datetime':
        return datetime.datetime.strptime(
            val, '%Y-%m-%d %H:%M:%S')
    elif dtype == 'category':
        return pd.Category(val)
    elif dtype == 'bool' or dtype == 'boolean':
        return bool(strtobool(val))

def dtype_to_pandas_dtype(dtype: str) -> str:
    """ Matches the given dtype to its pandas equivalent, if one exists
    Args:
        dtype (str): The type to attempt to match
    Returns:
        The pandas version of the dtype if one exists, otherwise the original
    """
    if dtype == "integer":
        dtype = "int"
    elif dtype == "string":
        dtype = "str"
    elif dtype == "boolean":
        dtype = "bool"
    return dtype

def get_all_files_list(bucket: str, key: str) -> list:
    """ Get a list of all files to get all partitions values.
    Necesarry to catch all partition values for non-filtered partiions.
    NOTE: This paginates across as many as are matching keys in the bucket;
    be mindful of any costs in extra large buckets without a specific key.
    Args:
        bucket (str): S3 bucket to search in
        key (str): S3 key to the dataset to check
    Returns:
        A list of the keys of all objects in the bucket/key that end with .parquet
    """
    objects_in_bucket = []
    s3_client = boto3.client('s3')
    paginator = s3_client.get_paginator('list_objects')
    operation_parameters = {'Bucket': bucket,
                            'Prefix': key}
    page_iterator = paginator.paginate(**operation_parameters)
    for page in page_iterator:
        if not "Contents" in page.keys():
            break
        for item in page['Contents']:
            if item['Key'].endswith('.parquet'):
                objects_in_bucket.append(item['Key'])
    return objects_in_bucket

def _get_partitions_and_types(first_file_key: str, bucket: str) -> dict:
    """ Fetch a list of all the partitions actually there and their
    datatypes. List may be different than passed list if not being used
    for filtering on.
    NOTE: This is pulled from the metadata. It is assumed that this package
    is being used with its own publish method.
    Args:
        first_file_key (str): The key to the first file in the dataset, to get
            the metadata off of
        bucket (str): The S3 bucket to run in
    Returns:
        The parsed metadata from heading the first file
    TODO
    """
    s3_client = boto3.client('s3')
    first_file = s3_client.head_object(
        Bucket=bucket,
        Key=first_file_key
    )
    metadata = first_file['Metadata']
    partition_metadata_raw = metadata.get("partition_data_types", None)
    if partition_metadata_raw is None:
        return None
    partition_metadata = ast.literal_eval(partition_metadata_raw)
    return partition_metadata

def _parse_partitions_and_values(file_paths: List[str], key: str) -> dict:
    """ Parses the string keys into a usable dict of the parts and values
    Args:
        file_paths (List[str]): The list of all files to parse out
        key (str): S3 key to the root of the dataset of the files
    Returns:
        A dictionary of all partitions with their values
    """
    # TODO: find more neat/efficient way to do this
    parts = OrderedDict()
    key_len = len(key)
    for file_path in file_paths:
        # Delete key, split the parts out, delete the file name
        file_path = file_path[key_len:]
        unparsed_parts = file_path.split("/")
        del unparsed_parts[-1]
        for part in unparsed_parts:
            if "=" in part:
                key, value = part.split("=")
                if key not in parts:
                    parts.update({key: set([value])})
                else:
                    parts[key].add(value)
    return parts

def _get_partition_value_data_types(parsed_parts: dict, part_types: dict) -> dict:
    """ Uses the partitions with their known types to parse them out
    Args:
        parsed_parts (dict): A dictionary of all partitions with their values
        part_types (dict): A dictionary of all partitions to their datatypes
    Returns:
        A dictionary of all partitions with their values parsed into the correct datatype
    """
    for part, values in parsed_parts.items():
        part_type = part_types[part]
        if (part_type == 'string') or (part_type == 'category'):
            continue
        elif (part_type == 'int') or (part_type == 'integer'):
            parsed_parts[part] = set(map(int, values))
        elif part_type == 'float':
            parsed_parts[part] = set(map(float, values))
        elif part_type == 'datetime':
            parsed_parts[part] = set(
                map(lambda s: datetime.datetime.strptime(s, "%Y-%m-%d %H:%M:%S"), values))
        elif (part_type == 'bool') or (part_type == 'boolean'):
            parsed_parts[part] = set(map(bool, values))
        else:
            logger.debug(f"Unknown partition type : {part_type} :, leaving as a string")
    return parsed_parts

def _get_filtered_key_list(typed_parts: dict, filters: List[type(Filter)], key: str) -> List[str]:
    """ Create list of all "paths" to files after the filtered partitions
    are set ie all non-matching partitions are excluded.
    Args:
        typed_parts (dict): A dictionary of all partitions with their values
        filters (List[type(Filter)]): A dictionary of all partitions to their datatypes
        key (str): S3 key to the base root of the dataset
    Returns:
        A list of object keys that have been filtered by partition values
    """
    filter_keys = []
    matched_parts = OrderedDict()
    matched_parts.keys = typed_parts.keys()
    matched_part_vals = set()
    for part, part_values in typed_parts.items():
        matched_part_vals.clear()
        fil = next((f for f in filters if f['partition'] == part), False)
        if fil:
            comparison = OPS[fil['comparison']]
            for v in fil['values']:
                for x in part_values:
                    if comparison(x, v):
                        matched_part_vals.add(x)
            matched_parts[part] = matched_part_vals.copy()
        else:
            matched_parts[part] = part_values

    def construct_paths(matched_parts, previous_fil_keys: List[str]) -> None:
        if len(matched_parts) > 0:
            part = matched_parts.popitem(last=False)
            new_filter_keys = list()
            for value in part[1]:
                mapped_keys = list(map(
                    (lambda x: str(x) +
                     str(part[0]) + "=" + str(value) + "/"),
                    previous_fil_keys
                ))
                new_filter_keys = new_filter_keys + mapped_keys
            construct_paths(matched_parts, new_filter_keys)
        else:
            filter_keys.append(previous_fil_keys)
    construct_paths(matched_parts, [f"{key}/"])
    # TODO: fix the below mess with random array
    return filter_keys[0]

def _get_filtered_data(bucket: str, paths: List[str], partition_metadata: dict, parallel: bool = True) -> pd.DataFrame:
    """ Gets the data based on the filtered object key list. Concatenates all
    the separate parquet files.
    Args:
        bucket (str): S3 bucket to fetch from
        paths (List[str]): A list of all the object keys of the parquet files
        partition_metadata (dict): A dictionary of all partitions to their datatypes
        parallel (bool, Optional):
            Determines if multiprocessing should be used, defaults to True.
            Whether this is more or less efficient is dataset dependent,
            smaller datasets run much quicker without parallel
    Returns:
        The dataframe of all the parquet files from the given keys
    """
    temp_frames = []

    def append_to_temp(frame: pd.DataFrame):
        temp_frames.append(frame)
    if parallel:
        with get_context("spawn").Pool() as pool:
            for path in paths:
                logger.debug(f"[Parallel] Fetching parquet file to append from : {path} ")
                append_to_temp(
                    pool.apply_async(_s3_parquet_to_dataframe, args=(bucket, path, partition_metadata)).get())
            pool.close()
            pool.join()
    else:
        for path in paths:
            logger.debug(f"Fetching parquet file to append from : {path} ")
            append_to_temp(_s3_parquet_to_dataframe(
                bucket, path, partition_metadata))
    return pd.concat(temp_frames, ignore_index=True)

def _s3_parquet_to_dataframe(bucket: str, key: str, partition_metadata: dict) -> pd.DataFrame:
    """ Grab a parquet file from s3 and convert to pandas df, add it to the destination
    Args:
        bucket (str): S3 bucket to fetch from
        key (key): The full object path of the parquet file to read
        partition_metadata (dict): A dictionary of all partitions to their datatypes
    Returns:
        Dataframe from the key parquet, with partitions repopulated
    """
    s3 = s3fs.S3FileSystem()
    uri = f"{bucket}/{key}"
    table = pq.ParquetDataset(uri, filesystem=s3)
    frame = table.read_pandas().to_pandas()
    if partition_metadata != {}:
        partitions = _repopulate_partitions(uri, partition_metadata)
        for k, v in partitions.items():
            frame[k] = v
    return frame

def _repopulate_partitions(partition_string: str, partition_metadata: dict) -> tuple:
    """ For each val in the partition string creates a list that can be added back into the dataframe
    Args:
        partition_string (str): S3 bucket to fetch from
        partition_metadata (dict): A dictionary of all partitions to their datatypes
    Returns:
        Dataframe from the key parquet, with partitions repopulated
    """
    raw = partition_string.split('/')
    partitions = {}
    for string in raw:
        if '=' in string:
            k, v = string.split('=')
            partitions[k] = v
    if partition_metadata:
        for key, val in partitions.items():
            partitions[key] = convert_type(val, partition_metadata[key])

    return partitions

def _validate_filter_rules(filters: List[type(Filter)]) -> None:
    """ Validate that the filters are the correct format and follow basic
    comparison rules, otherwise throw a ValueError
    Args:
        filters (List[type(Filter)]): List of filters to validate
    Returns:
        None
    """
    single_value_comparisons = [
        ">",
        "<",
        "<=",
        ">="
    ]
    for f in filters:
...
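
As this excerpt shows, _validate_filter_rules here works on plain filter dicts with "partition", "comparison", and "values" keys, and the test file below uses the same shape. A hedged usage sketch; the import path is an assumption based on the file name, and the partition name is made up for illustration:

from s3parq import fetch_parq  # import path assumed from the file name above

# Hypothetical filter: keep only rows whose "order_date" partition equals "2021-01-01".
filters = [{
    "partition": "order_date",
    "comparison": "==",
    "values": ["2021-01-01"]
}]

# Returns None for well-formed filters; raises ValueError otherwise
# (per the docstring in the excerpt above).
fetch_parq._validate_filter_rules(filters)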


test_fetch_parq_internal.py

Source: test_fetch_parq_internal.py (GitHub)


...
        }]
    ]
    for inv_fil in invalid_filters_list:
        with pytest.raises(ValueError):
            fetch_parq._validate_filter_rules(inv_fil)
    valid_filters_list = [
        [{
            "partition": "fake-part",
            "comparison": "==",
            "values": ["some-string"]
        }],
        [{
            "partition": "fake-part",
            "comparison": ">",
            "values": [1]
        }]
    ]
    for val_fil in valid_filters_list:
        fetch_parq._validate_filter_rules(val_fil)

# Test that all valid partitions are correctly parsed
def test_get_partitions():
    parts = [
        "fake-key/fil-1=1/fil-2=2/fil-3=str/rngf1.parquet",
        "fake-key/fil-1=1/fil-2=2/fil-3=rng-str/rngf2.parquet",
        "fake-key/fil-1=1/fil-2=4/fil-3=str_rng/rngf3.parquet",
        "fake-key/fil-1=5/fil-2=2/fil-3=str/rngf4.parquet",
        "fake-key/fil-1=5/fil-2=4/fil-3=99/rngf5.parquet"
    ]
    parsed_parts = OrderedDict({
        "fil-1": {"1", "5"},
        "fil-2": {"2", "4"},
        "fil-3": {"str", "rng-str", "str_rng", "99"}
    })
...
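
The invalid cases are truncated in this excerpt, but the single_value_comparisons list at the end of the fetch_parq.py snippet suggests one rule they likely exercise: comparisons such as ">" are presumably only valid with a single value. A hedged sketch of such a case (the exact check and error message are not visible in the excerpts above):

import pytest
from s3parq import fetch_parq  # import path assumed from this test file

# Presumably invalid: a single-value comparison (">") paired with two values.
bad_filters = [{
    "partition": "fake-part",
    "comparison": ">",
    "values": [1, 2]
}]

with pytest.raises(ValueError):
    fetch_parq._validate_filter_rules(bad_filters)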


