How to use is_extension_dtype method in pandera

Best Python code snippets using pandera's is_extension_dtype
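pandera's is_extension_dtype helper lives in the pandera.engines.pandas_engine module (shown in the pandas_engine.py snippet below) and reports whether a value is a pandas extension dtype instance or an extension dtype class. A minimal sketch of calling it directly, assuming that internal import path is available in your pandera release:

import numpy as np
import pandas as pd

# Internal helper: the import path follows the pandas_engine.py snippet below
# and may change between pandera releases.
from pandera.engines.pandas_engine import is_extension_dtype

print(is_extension_dtype(pd.Int64Dtype()))      # True: extension dtype instance
print(is_extension_dtype(pd.CategoricalDtype))  # True: extension dtype class
print(is_extension_dtype(np.dtype("int64")))    # False: plain NumPy dtype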

utils.py

Source: utils.py (pyspark.pandas) on GitHub


...
def combine_frames(
    this: "DataFrame",
    *args: DataFrameOrSeries,
    how: str = "full",
    preserve_order_column: bool = False
) -> "DataFrame":
    """
    This method combines `this` DataFrame with a different `that` DataFrame or
    Series from a different DataFrame.
    It returns a DataFrame that has prefix `this_` and `that_` to distinct
    the columns names from both DataFrames
    It internally performs a join operation which can be expensive in general.
    So, if `compute.ops_on_diff_frames` option is False,
    this method throws an exception.
    """
    ...
        join_scols = []
        merged_index_scols = []
        # Note that the order of each element in index_map is guaranteed according to the index
        # level.
        this_and_that_index_map = list(zip(this_index_map, that_index_map))
        this_sdf = this_internal.spark_frame.alias("this")
        that_sdf = that_internal.spark_frame.alias("that")
        # If the same named index is found, that's used.
        index_column_names = []
        index_use_extension_dtypes = []
        for (
            i,
            ((this_column, this_name, this_field), (that_column, that_name, that_field)),
        ) in enumerate(this_and_that_index_map):
            if this_name == that_name:
                # We should merge the Spark columns into one
                # to mimic pandas' behavior.
                this_scol = scol_for(this_sdf, this_column)
                that_scol = scol_for(that_sdf, that_column)
                join_scol = this_scol == that_scol
                join_scols.append(join_scol)
                column_name = SPARK_INDEX_NAME_FORMAT(i)
                index_column_names.append(column_name)
                index_use_extension_dtypes.append(
                    any(field.is_extension_dtype for field in [this_field, that_field])
                )
                merged_index_scols.append(
                    F.when(this_scol.isNotNull(), this_scol).otherwise(that_scol).alias(column_name)
                )
            else:
                raise ValueError("Index names must be exactly matched currently.")
        assert len(join_scols) > 0, "cannot join with no overlapping index names"
        joined_df = this_sdf.join(that_sdf, on=join_scols, how=how)
        if preserve_order_column:
            order_column = [scol_for(this_sdf, NATURAL_ORDER_COLUMN_NAME)]
        else:
            order_column = []
        joined_df = joined_df.select(
            *merged_index_scols,
            *(
                scol_for(this_sdf, this_internal.spark_column_name_for(label))
                for label in this_internal.column_labels
            ),
            *(
                scol_for(that_sdf, that_internal.spark_column_name_for(label))
                for label in that_internal.column_labels
            ),
            *order_column
        )
        index_spark_columns = [scol_for(joined_df, col) for col in index_column_names]
        index_columns = set(index_column_names)
        new_data_columns = [
            col
            for col in joined_df.columns
            if col not in index_columns and col != NATURAL_ORDER_COLUMN_NAME
        ]
        schema = joined_df.select(*index_spark_columns, *new_data_columns).schema
        index_fields = [
            InternalField.from_struct_field(struct_field, use_extension_dtypes=use_extension_dtypes)
            for struct_field, use_extension_dtypes in zip(
                schema.fields[: len(index_spark_columns)], index_use_extension_dtypes
            )
        ]
        data_fields = [
            InternalField.from_struct_field(
                struct_field, use_extension_dtypes=field.is_extension_dtype
            )
            for struct_field, field in zip(
                schema.fields[len(index_spark_columns) :],
                this_internal.data_fields + that_internal.data_fields,
            )
        ]
        ...
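In this snippet, field.is_extension_dtype is a flag on pandas-on-Spark's InternalField that records whether a column was backed by a pandas extension dtype (for example nullable Int64), so the fields of the joined frame can be rebuilt with use_extension_dtypes and keep those dtypes. The flag matters because an outer join can introduce missing values; a small, Spark-free sketch of the distinction it tracks (the example values are made up):

import pandas as pd

nullable = pd.Series([1, 2, 3], dtype="Int64")  # pandas extension (nullable) dtype
plain = pd.Series([1, 2, 3], dtype="int64")     # NumPy-backed dtype

print(pd.api.types.is_extension_array_dtype(nullable.dtype))  # True
print(pd.api.types.is_extension_array_dtype(plain.dtype))     # False

# Reindexing, like an outer join, introduces missing values:
print(plain.reindex(range(4)).dtype)     # float64: int64 is upcast to hold NaN
print(nullable.reindex(range(4)).dtype)  # Int64: pd.NA fits the extension dtype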


pandas_engine.py

Source: pandas_engine.py (pandera) on GitHub


...
try:
    from typing import Literal  # type: ignore
except ImportError:
    from typing_extensions import Literal  # type: ignore


def is_extension_dtype(pd_dtype: PandasDataType) -> bool:
    """Check if a value is a pandas extension type or instance of one."""
    return isinstance(pd_dtype, PandasExtensionType) or (
        isinstance(pd_dtype, type)
        and issubclass(pd_dtype, PandasExtensionType)
    )


@immutable(init=True)
class DataType(dtypes.DataType):
    """Base `DataType` for boxing Pandas data types."""

    type: Any = dataclasses.field(repr=False, init=False)
    """Native pandas dtype boxed by the data type."""

    def __init__(self, dtype: Any):
        super().__init__()
        object.__setattr__(self, "type", pd.api.types.pandas_dtype(dtype))
        dtype_cls = dtype if inspect.isclass(dtype) else dtype.__class__
        warnings.warn(
            f"'{dtype_cls}' support is not guaranteed.\n"
            + "Usage Tip: Consider writing a custom "
            + "pandera.dtypes.DataType or opening an issue at "
            + "https://github.com/pandera-dev/pandera"
        )

    def __post_init__(self):
        # this method isn't called if __init__ is defined
        object.__setattr__(
            self, "type", pd.api.types.pandas_dtype(self.type)
        )  # pragma: no cover

    def coerce(self, data_container: PandasObject) -> PandasObject:
        """Pure coerce without catching exceptions."""
        coerced = data_container.astype(self.type)
        if type(data_container).__module__.startswith("modin.pandas"):
            # NOTE: this is a hack to enable catching of errors in modin
            coerced.__str__()
        return coerced

    def try_coerce(self, data_container: PandasObject) -> PandasObject:
        try:
            return self.coerce(data_container)
        except Exception as exc:  # pylint:disable=broad-except
            raise errors.ParserError(
                f"Could not coerce {type(data_container)} data_container "
                f"into type {self.type}",
                failure_cases=utils.numpy_pandas_coerce_failure_cases(
                    data_container, self
                ),
            ) from exc

    def check(self, pandera_dtype: dtypes.DataType) -> bool:
        try:
            pandera_dtype = Engine.dtype(pandera_dtype)
        except TypeError:
            return False
        # attempts to compare pandas native type if possible
        # to let subclass inherit check
        # (super will compare that DataType classes are exactly the same)
        try:
            return self.type == pandera_dtype.type or super().check(
                pandera_dtype
            )
        except TypeError:
            return super().check(pandera_dtype)

    def __str__(self) -> str:
        return str(self.type)

    def __repr__(self) -> str:
        return f"DataType({self})"


class Engine(  # pylint:disable=too-few-public-methods
    metaclass=engine.Engine,
    base_pandera_dtypes=(DataType, numpy_engine.DataType),
):
    """Pandas data type engine."""

    @classmethod
    def dtype(cls, data_type: Any) -> "DataType":
        """Convert input into a pandas-compatible
        Pandera :class:`~pandera.dtypes.DataType` object."""
        try:
            return engine.Engine.dtype(cls, data_type)
        except TypeError:
            if is_extension_dtype(data_type) and isinstance(data_type, type):
                try:
                    np_or_pd_dtype = data_type()
                    # Convert to str here because some pandas dtypes allow
                    # an empty constructor for compatibility but fail on
                    # str(). e.g: PeriodDtype
                    str(np_or_pd_dtype.name)
                except (TypeError, AttributeError) as err:
                    raise TypeError(
                        f" dtype {data_type} cannot be instantiated: {err}\n"
                        "Usage Tip: Use an instance or a string "
                        "representation."
                    ) from None
            else:
                # let pandas transform any acceptable value
                ...
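The Engine.dtype fallback above uses is_extension_dtype to detect when it was handed an extension dtype class rather than an instance, and tries to instantiate the class before resolving it. A hedged sketch of that code path from user code, assuming Engine is importable from pandera.engines.pandas_engine as in the snippet (an internal API whose behaviour may differ between releases):

import pandas as pd
from pandera.engines.pandas_engine import Engine

# An extension dtype *instance* resolves directly to a pandera DataType.
print(Engine.dtype(pd.CategoricalDtype()))

# An extension dtype *class* is instantiated first (CategoricalDtype() accepts
# no arguments), so it resolves to the same pandera data type.
print(Engine.dtype(pd.CategoricalDtype))

# Classes that cannot be instantiated without arguments (the PeriodDtype case
# mentioned in the comments above) raise a TypeError that asks for an instance
# or a string representation instead.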


data_manipulation.py

Source: data_manipulation.py on GitHub


...
    is_int_dtype = pd.api.types.is_integer_dtype
    is_float_dtype = pd.api.types.is_float_dtype
    is_string_dtype = pd.api.types.is_string_dtype
    is_extension_dtype = pd.api.types.is_extension_array_dtype
    if is_extension_dtype(values):
        if is_int_dtype(values):
            downcast_values = values.fillna(-1).astype(int)
        elif is_bool_dtype(values):
            downcast_values = values.fillna(False).astype(np.bool_, errors='raise')
        elif is_float_dtype(values):
            downcast_values = pd.to_numeric(values, errors='coerce')
        elif is_string_dtype(values):
            downcast_values = values.astype(np.object_, errors='raise')
        else:
            raise TypeError('unsupported extended datatype {TYPE} provided'.format(TYPE=values.dtype))
    else:
        downcast_values = values
    return downcast_values


def evaluate_condition(data, expression):
    """
    Evaluate a boolean expression for a set of data.
    Arguments:
        data: dictionary, Series, or DataFrame of data potentially containing one or more variables used in the
            operation.
        expression: string or list of expression components describing a conditional statement to evaluate on the
            provided data.
    Returns:
        results (pd.Series): results of the evaluation for each row of data provided.
    """
    is_bool_dtype = pd.api.types.is_bool_dtype
    is_extension_dtype = pd.api.types.is_extension_array_dtype
    reserved_chars = ('and', 'or', 'in', 'not', '+', '-', '/', '//', '*', '**', '%', '>', '>=', '<', '<=', '==', '!=',
                      '~', ',', '(', ')', '[', ']', '{', '}')
    if isinstance(data, pd.Series):  # single data entry
        df = data.to_frame().T
    elif isinstance(data, pd.DataFrame):  # one or more entries
        df = data.copy()
    elif isinstance(data, dict):  # one or more entries
        try:
            df = pd.DataFrame(data)
        except ValueError:  # single entry was given
            df = pd.DataFrame(data, index=[0])
    else:
        raise ValueError('data must be either a pandas DataFrame or Series')
    header = df.columns.tolist()
    if isinstance(expression, str):
        components = parse_expression(expression)
    elif isinstance(expression, list):
        components = expression
    else:
        raise ValueError('expression {} must be provided as either a string or list'.format(expression))
    if len(components) > 1:
        # Quote non-numeric, static expression variables
        exp_comps = []
        for component in components:
            if component in header:
                col_values = df[component]
                if is_extension_dtype(col_values):
                    try:
                        df.loc[:, component] = downcast_extended(col_values)
                    except TypeError:
                        logger.exception('failed to downcast component {COMP} values'.format(COMP=component))
                        raise
                exp_comp = '`{}`'.format(component)
            elif is_numeric(component) or component in reserved_chars:
                exp_comp = component
            else:  # component is a string
                exp_comp = '"{}"'.format(component)
            exp_comps.append(exp_comp)
        expression = ' '.join(exp_comps)
        try:
            df_match = df.eval(expression)
        except ValueError:
            print(expression)
            print(df)
            print(df.dtypes)
            raise
    else:  # component is a single static value or column values
        expression = ' '.join(components)
        if expression in header:
            values = df[expression]
            if is_bool_dtype(values.dtype):
                df_match = values
            else:
                df_match = ~ values.isna()
        else:
            df_match = df.eval(expression)
    if not isinstance(df_match, pd.Series):
        df_match = pd.Series(df_match, df.index)
    return df_match


def evaluate_operation(data, expression):
    """
    Evaluate a mathematical expression for a set of data.
    Arguments:
        data: dictionary, Series, or DataFrame of data potentially containing one or more variables used in the
            operation.
        expression: string or list of expression components describing an operation to evaluate on the provided data.
    Returns:
        results (pd.Series): results of the evaluation for each row of data provided.
    """
    is_extension_dtype = pd.api.types.is_extension_array_dtype
    if isinstance(data, pd.Series):
        df = data.to_frame().T
    elif isinstance(data, pd.DataFrame):
        df = data.copy()
    elif isinstance(data, dict):
        try:
            df = pd.DataFrame(data)
        except ValueError:  # single entry was given
            df = pd.DataFrame(data, index=[0])
    else:
        raise ValueError('data must be either a pandas DataFrame, Series, or a dictionary')
    header = df.columns.tolist()
    if isinstance(expression, str):
        components = parse_expression(expression)
    elif isinstance(expression, list):
        components = expression
    else:
        components = parse_expression(str(expression))
    if len(components) > 1:
        exp_comps = []
        for component in components:
            if component in header:
                col_values = df[component]
                if is_extension_dtype(col_values):
                    try:
                        df.loc[:, component] = downcast_extended(col_values)
                    except TypeError:
                        logger.exception('failed to downcast component {COMP} values'.format(COMP=component))
                        raise
                exp_comp = '`{}`'.format(component)
            else:
                exp_comp = component
            exp_comps.append(exp_comp)
        expression = ' '.join(exp_comps)
        results = df.eval(expression).squeeze()
    else:  # results are a single static value or the values of a column in the dataframe
        expression = ' '.join(components)
        if expression in header:
            ...
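Here extension-typed columns are downcast to NumPy-backed equivalents before DataFrame.eval is called, using pd.api.types.is_extension_array_dtype to detect them. A standalone sketch of that fillna-and-astype step on a nullable integer column (the column names and values are invented; downcast_extended and parse_expression are helpers from the same module):

import pandas as pd

df = pd.DataFrame({
    "qty": pd.array([1, 2, None], dtype="Int64"),  # extension (nullable) dtype
    "price": [1.5, 2.0, 3.5],
})

if pd.api.types.is_extension_array_dtype(df["qty"]):
    # Same strategy as the snippet: fill missing integers with a sentinel
    # value, then convert to a plain NumPy integer column.
    df["qty"] = df["qty"].fillna(-1).astype(int)

print(df.dtypes)               # qty is now int64
print(df.eval("qty * price"))  # evaluates on the downcast column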


Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, right from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks such as Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run pandera automation tests on the LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now!

