Best Python code snippet using pandera_python
Source: schemas.py
...
                )
            except errors.SchemaError as err:
                error_handler.collect_error("dataframe_check", err)

        if self.unique:
            keep_setting = convert_uniquesettings(self._report_duplicates)
            # NOTE: fix this pylint error
            # pylint: disable=not-an-iterable
            temp_unique: List[List] = (
                [self.unique]
                if all(isinstance(x, str) for x in self.unique)
                else self.unique
            )
            for lst in temp_unique:
                duplicates = df_to_validate.duplicated(
                    subset=lst, keep=keep_setting
                )
                if duplicates.any():
                    # NOTE: this is a hack to support pyspark.pandas, need to
                    # figure out a workaround to error: "Cannot combine the
                    # series or dataframe because it comes from a different
                    # dataframe."
                    if type(duplicates).__module__.startswith(
                        "pyspark.pandas"
                    ):
                        # pylint: disable=import-outside-toplevel
                        import pyspark.pandas as ps

                        with ps.option_context(
                            "compute.ops_on_diff_frames", True
                        ):
                            failure_cases = df_to_validate.loc[duplicates, lst]
                    else:
                        failure_cases = df_to_validate.loc[duplicates, lst]
                    failure_cases = reshape_failure_cases(failure_cases)
                    error_handler.collect_error(
                        "duplicates",
                        errors.SchemaError(
                            self,
                            check_obj,
                            f"columns '{*lst,}' not unique:\n{failure_cases}",
                            failure_cases=failure_cases,
                            check="multiple_fields_uniqueness",
                        ),
                    )

        if lazy and error_handler.collected_errors:
            raise errors.SchemaErrors(
                self, error_handler.collected_errors, check_obj
            )

        assert all(check_results), "all check results must be True."
        return check_obj

    def __call__(
        self,
        dataframe: pd.DataFrame,
        head: Optional[int] = None,
        tail: Optional[int] = None,
        sample: Optional[int] = None,
        random_state: Optional[int] = None,
        lazy: bool = False,
        inplace: bool = False,
    ):
        """Alias for :func:`DataFrameSchema.validate` method.

        :param pd.DataFrame dataframe: the dataframe to be validated.
        :param head: validate the first n rows. Rows overlapping with `tail` or
            `sample` are de-duplicated.
        :type head: int
        :param tail: validate the last n rows. Rows overlapping with `head` or
            `sample` are de-duplicated.
        :type tail: int
        :param sample: validate a random sample of n rows. Rows overlapping
            with `head` or `tail` are de-duplicated.
        :param random_state: random seed for the ``sample`` argument.
        :param lazy: if True, lazily evaluates dataframe against all validation
            checks and raises a ``SchemaErrors``. Otherwise, raise
            ``SchemaError`` as soon as one occurs.
        :param inplace: if True, applies coercion to the object of validation,
            otherwise creates a copy of the data.
        """
        return self.validate(
            dataframe, head, tail, sample, random_state, lazy, inplace
        )
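
# --- usage sketch: not part of schemas.py ----------------------------------
# The `unique` branch above enforces uniqueness over a set of columns, and
# `lazy=True` collects every failure into a single SchemaErrors instead of
# raising on the first one. A minimal sketch, assuming a pandera release that
# supports the `unique` argument shown above:
import pandas as pd
import pandera as pa

schema = pa.DataFrameSchema(
    {"a": pa.Column(int), "b": pa.Column(int)},
    unique=["a", "b"],  # rows must be unique over the (a, b) pair
)
df = pd.DataFrame({"a": [1, 1, 2], "b": [1, 1, 3]})
try:
    schema.validate(df, lazy=True)  # equivalent to schema(df, lazy=True)
except pa.errors.SchemaErrors as exc:
    print(exc.failure_cases)  # all collected duplicate rows
# ----------------------------------------------------------------------------
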
    def __repr__(self) -> str:
        """Represent string for logging."""
        return (
            f"<Schema {self.__class__.__name__}("
            f"columns={self.columns}, "
            f"checks={self.checks}, "
            f"index={self.index.__repr__()}, "
            f"coerce={self.coerce}, "
            f"dtype={self._dtype}, "
            f"strict={self.strict}, "
            f"name={self.name}, "
            f"ordered={self.ordered}, "
            f"unique_column_names={self.unique_column_names}"
            ")>"
        )

    def __str__(self) -> str:
        """Represent string for user inspection."""

        def _format_multiline(json_str, arg):
            return "\n".join(
                f"{indent}{line}" if i != 0 else f"{indent}{arg}={line}"
                for i, line in enumerate(json_str.split("\n"))
            )

        indent = " " * N_INDENT_SPACES
        if self.columns:
            columns_str = f"{indent}columns={{\n"
            for colname, col in self.columns.items():
                columns_str += f"{indent * 2}'{colname}': {col}\n"
            columns_str += f"{indent}}}"
        else:
            columns_str = f"{indent}columns={{}}"
        if self.checks:
            checks_str = f"{indent}checks=[\n"
            for check in self.checks:
                checks_str += f"{indent * 2}{check}\n"
            checks_str += f"{indent}]"
        else:
            checks_str = f"{indent}checks=[]"
        # add additional indents
        index_ = str(self.index).split("\n")
        if len(index_) == 1:
            index = str(self.index)
        else:
            index = "\n".join(
                x if i == 0 else f"{indent}{x}" for i, x in enumerate(index_)
            )
        return (
            f"<Schema {self.__class__.__name__}(\n"
            f"{columns_str},\n"
            f"{checks_str},\n"
            f"{indent}coerce={self.coerce},\n"
            f"{indent}dtype={self._dtype},\n"
            f"{indent}index={index},\n"
            f"{indent}strict={self.strict}\n"
            f"{indent}name={self.name},\n"
            f"{indent}ordered={self.ordered},\n"
            f"{indent}unique_column_names={self.unique_column_names}\n"
            ")>"
        )

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, type(self)):
            return NotImplemented

        def _compare_dict(obj):
            return {
                k: v for k, v in obj.__dict__.items() if k != "_IS_INFERRED"
            }

        return _compare_dict(self) == _compare_dict(other)

    @st.strategy_import_error
    def strategy(
        self, *, size: Optional[int] = None, n_regex_columns: int = 1
    ):
        """Create a ``hypothesis`` strategy for generating a DataFrame.

        :param size: number of elements to generate
        :param n_regex_columns: number of regex columns to generate.
        :returns: a strategy that generates pandas DataFrame objects.
        """
        return st.dataframe_strategy(
            self.dtype,
            columns=self.columns,
            checks=self.checks,
            unique=self.unique,
            index=self.index,
            size=size,
            n_regex_columns=n_regex_columns,
        )

    def example(
        self, size: Optional[int] = None, n_regex_columns: int = 1
    ) -> pd.DataFrame:
        """Generate an example of a particular size.

        :param size: number of elements in the generated DataFrame.
        :returns: pandas DataFrame object.
        """
        # pylint: disable=import-outside-toplevel,cyclic-import,import-error
        import hypothesis

        with warnings.catch_warnings():
            warnings.simplefilter(
                "ignore",
                category=hypothesis.errors.NonInteractiveExampleWarning,
            )
            return self.strategy(
                size=size, n_regex_columns=n_regex_columns
            ).example()
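
# --- usage sketch: not part of schemas.py ----------------------------------
# `strategy()` and `example()` synthesize data that satisfies the schema.
# A minimal sketch, assuming the hypothesis-based strategies extra is
# installed (pip install pandera[strategies]):
import pandera as pa

schema = pa.DataFrameSchema({"x": pa.Column(float, pa.Check.ge(0))})
print(schema.example(size=3))  # DataFrame with 3 rows satisfying the checks
# ----------------------------------------------------------------------------
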
    @_inferred_schema_guard
    def add_columns(self, extra_schema_cols: Dict[str, Any]) -> Self:
        """Create a copy of the :class:`DataFrameSchema` with extra columns.

        :param extra_schema_cols: additional columns of the format
            ``{column_name: Column}``.
        :returns: a new :class:`DataFrameSchema` with the extra_schema_cols
            added.

        :example:

        To add columns to the schema, pass a dictionary with column name and
        ``Column`` instance key-value pairs.

        >>> import pandera as pa
        >>>
        >>> example_schema = pa.DataFrameSchema(
        ...    {
        ...        "category": pa.Column(str),
        ...        "probability": pa.Column(float),
        ...    }
        ... )
        >>> print(
        ...     example_schema.add_columns({"even_number": pa.Column(pa.Bool)})
        ... )
        <Schema DataFrameSchema(
            columns={
                'category': <Schema Column(name=category, type=DataType(str))>
                'probability': <Schema Column(name=probability, type=DataType(float64))>
                'even_number': <Schema Column(name=even_number, type=DataType(bool))>
            },
            checks=[],
            coerce=False,
            dtype=None,
            index=None,
            strict=False
            name=None,
            ordered=False,
            unique_column_names=False
        )>

        .. seealso:: :func:`remove_columns`
        """
        schema_copy = copy.deepcopy(self)
        schema_copy.columns = {
            **schema_copy.columns,
            **self.__class__(extra_schema_cols).columns,
        }
        return schema_copy

    @_inferred_schema_guard
    def remove_columns(self, cols_to_remove: List[str]) -> Self:
        """Removes columns from a :class:`DataFrameSchema` and returns a new
        copy.

        :param cols_to_remove: Columns to be removed from the
            ``DataFrameSchema``
        :type cols_to_remove: List
        :returns: a new :class:`DataFrameSchema` without the cols_to_remove
        :raises: :class:`~pandera.errors.SchemaInitError`: if column not in
            schema.

        :example:

        To remove a column or set of columns from a schema, pass a list of
        columns to be removed:

        >>> import pandera as pa
        >>>
        >>> example_schema = pa.DataFrameSchema(
        ...     {
        ...         "category" : pa.Column(str),
        ...         "probability": pa.Column(float)
        ...     }
        ... )
        >>>
        >>> print(example_schema.remove_columns(["category"]))
        <Schema DataFrameSchema(
            columns={
                'probability': <Schema Column(name=probability, type=DataType(float64))>
            },
            checks=[],
            coerce=False,
            dtype=None,
            index=None,
            strict=False
            name=None,
            ordered=False,
            unique_column_names=False
        )>

        .. seealso:: :func:`add_columns`
        """
        schema_copy = copy.deepcopy(self)
        # ensure all specified keys are present in the columns
        not_in_cols: List[str] = [
            x for x in cols_to_remove if x not in schema_copy.columns.keys()
        ]
        if not_in_cols:
            raise errors.SchemaInitError(
                f"Keys {not_in_cols} not found in schema columns!"
            )
        for col in cols_to_remove:
            schema_copy.columns.pop(col)
        return schema_copy
}895        ... )896        >>>897        >>> print(example_schema.remove_columns(["category"]))898        <Schema DataFrameSchema(899            columns={900                'probability': <Schema Column(name=probability, type=DataType(float64))>901            },902            checks=[],903            coerce=False,904            dtype=None,905            index=None,906            strict=False907            name=None,908            ordered=False,909            unique_column_names=False910        )>911        .. seealso:: :func:`add_columns`912        """913        schema_copy = copy.deepcopy(self)914        # ensure all specified keys are present in the columns915        not_in_cols: List[str] = [916            x for x in cols_to_remove if x not in schema_copy.columns.keys()917        ]918        if not_in_cols:919            raise errors.SchemaInitError(920                f"Keys {not_in_cols} not found in schema columns!"921            )922        for col in cols_to_remove:923            schema_copy.columns.pop(col)924        return schema_copy925    @_inferred_schema_guard926    def update_column(self, column_name: str, **kwargs) -> Self:927        """Create copy of a :class:`DataFrameSchema` with updated column928        properties.929        :param column_name:930        :param kwargs: key-word arguments supplied to931            :class:`~pandera.schema_components.Column`932        :returns: a new :class:`DataFrameSchema` with updated column933        :raises: :class:`~pandera.errors.SchemaInitError`: if column not in934            schema or you try to change the name.935        :example:936        Calling ``schema.1`` returns the :class:`DataFrameSchema`937        with the updated column.938        >>> import pandera as pa939        >>>940        >>> example_schema = pa.DataFrameSchema({941        ...     "category" : pa.Column(str),942        ...     "probability": pa.Column(float)943        ... })944        >>> print(945        ...     example_schema.update_column(946        ...         'category', dtype=pa.Category947        ...     )948        ... )949        <Schema DataFrameSchema(950            columns={951                'category': <Schema Column(name=category, type=DataType(category))>952                'probability': <Schema Column(name=probability, type=DataType(float64))>953            },954            checks=[],955            coerce=False,956            dtype=None,957            index=None,958            strict=False959            name=None,960            ordered=False,961            unique_column_names=False962        )>963        .. 
    def update_columns(self, update_dict: Dict[str, Dict[str, Any]]) -> Self:
        """
        Create copy of a :class:`DataFrameSchema` with updated column
        properties.

        :param update_dict: dictionary of column names mapped to the
            properties to update.
        :return: a new :class:`DataFrameSchema` with updated columns
        :raises: :class:`~pandera.errors.SchemaInitError`: if column not in
            schema or you try to change the name.

        :example:

        Calling ``schema.update_columns`` returns the :class:`DataFrameSchema`
        with the updated columns.

        >>> import pandera as pa
        >>>
        >>> example_schema = pa.DataFrameSchema({
        ...     "category" : pa.Column(str),
        ...     "probability": pa.Column(float)
        ... })
        >>>
        >>> print(
        ...     example_schema.update_columns(
        ...         {"category": {"dtype":pa.Category}}
        ...     )
        ... )
        <Schema DataFrameSchema(
            columns={
                'category': <Schema Column(name=category, type=DataType(category))>
                'probability': <Schema Column(name=probability, type=DataType(float64))>
            },
            checks=[],
            coerce=False,
            dtype=None,
            index=None,
            strict=False
            name=None,
            ordered=False,
            unique_column_names=False
        )>
        """
        new_schema = copy.deepcopy(self)
        # ensure all specified keys are present in the columns
        not_in_cols: List[str] = [
            x for x in update_dict.keys() if x not in new_schema.columns.keys()
        ]
        if not_in_cols:
            raise errors.SchemaInitError(
                f"Keys {not_in_cols} not found in schema columns!"
            )
        new_columns: Dict[str, Column] = {}
        for col in new_schema.columns:
            # disallow renaming a column via the update dict
            if update_dict.get(col):
                if update_dict[col].get("name"):
                    raise errors.SchemaInitError(
                        "cannot update 'name' property of the column."
                    )
            original_properties = new_schema.columns[col].properties
            if update_dict.get(col):
                new_properties = copy.deepcopy(original_properties)
                new_properties.update(update_dict[col])
                new_columns[col] = new_schema.columns[col].__class__(
                    **new_properties
                )
            else:
                new_columns[col] = new_schema.columns[col].__class__(
                    **original_properties
                )
        new_schema.columns = new_columns
        return new_schema
    def rename_columns(self, rename_dict: Dict[str, str]) -> Self:
        """Rename columns using a dictionary of key-value pairs.

        :param rename_dict: dictionary of 'old_name': 'new_name' key-value
            pairs.
        :returns: :class:`DataFrameSchema` (copy of original)
        :raises: :class:`~pandera.errors.SchemaInitError` if column not in the
            schema.

        :example:

        To rename a column or set of columns, pass a dictionary of old column
        names and new column names, similar to the pandas DataFrame method.

        >>> import pandera as pa
        >>>
        >>> example_schema = pa.DataFrameSchema({
        ...     "category" : pa.Column(str),
        ...     "probability": pa.Column(float)
        ... })
        >>>
        >>> print(
        ...     example_schema.rename_columns({
        ...         "category": "categories",
        ...         "probability": "probabilities"
        ...     })
        ... )
        <Schema DataFrameSchema(
            columns={
                'categories': <Schema Column(name=categories, type=DataType(str))>
                'probabilities': <Schema Column(name=probabilities, type=DataType(float64))>
            },
            checks=[],
            coerce=False,
            dtype=None,
            index=None,
            strict=False
            name=None,
            ordered=False,
            unique_column_names=False
        )>

        .. seealso:: :func:`update_column`
        """
        new_schema = copy.deepcopy(self)
        # ensure all specified keys are present in the columns
        not_in_cols: List[str] = [
            x for x in rename_dict.keys() if x not in new_schema.columns.keys()
        ]
        if not_in_cols:
            raise errors.SchemaInitError(
                f"Keys {not_in_cols} not found in schema columns!"
            )
        # remove any mapping to itself as this is a no-op
        rename_dict = {k: v for k, v in rename_dict.items() if k != v}
        # ensure all new keys are not present in the current column names
        already_in_columns: List[str] = [
            x for x in rename_dict.values() if x in new_schema.columns.keys()
        ]
        if already_in_columns:
            raise errors.SchemaInitError(
                f"Keys {already_in_columns} already found in schema columns!"
            )
        # We iterate over the existing columns dict and replace those keys
        # that exist in the rename_dict
        new_columns = {
            (rename_dict[col_name] if col_name in rename_dict else col_name): (
                col_attrs.set_name(rename_dict[col_name])
                if col_name in rename_dict
                else col_attrs
            )
            for col_name, col_attrs in new_schema.columns.items()
        }
        new_schema.columns = new_columns
        return new_schema
    def select_columns(self, columns: List[Any]) -> Self:
        """Select subset of columns in the schema.

        *New in version 0.4.5*

        :param columns: list of column names to select.
        :returns: :class:`DataFrameSchema` (copy of original) with only
            the selected columns.
        :raises: :class:`~pandera.errors.SchemaInitError` if column not in the
            schema.

        :example:

        To subset a schema by column, and return a new schema:

        >>> import pandera as pa
        >>>
        >>> example_schema = pa.DataFrameSchema({
        ...     "category" : pa.Column(str),
        ...     "probability": pa.Column(float)
        ... })
        >>>
        >>> print(example_schema.select_columns(['category']))
        <Schema DataFrameSchema(
            columns={
                'category': <Schema Column(name=category, type=DataType(str))>
            },
            checks=[],
            coerce=False,
            dtype=None,
            index=None,
            strict=False
            name=None,
            ordered=False,
            unique_column_names=False
        )>

        .. note:: If an index is present in the schema, it will also be
            included in the new schema.
        """
        new_schema = copy.deepcopy(self)
        # ensure all specified keys are present in the columns
        not_in_cols: List[str] = [
            x for x in columns if x not in new_schema.columns.keys()
        ]
        if not_in_cols:
            raise errors.SchemaInitError(
                f"Keys {not_in_cols} not found in schema columns!"
            )
        new_columns = {
            col_name: column
            for col_name, column in self.columns.items()
            if col_name in columns
        }
        new_schema.columns = new_columns
        return new_schema

    def to_script(self, fp: Union[str, Path] = None) -> "DataFrameSchema":
        """Write DataFrameSchema to a python script.

        :param fp: str or Path to write the script to.
        :returns: dataframe schema as a python script.
        """
        # pylint: disable=import-outside-toplevel,cyclic-import
        import pandera.io

        return pandera.io.to_script(self, fp)

    @classmethod
    def from_yaml(cls, yaml_schema) -> "DataFrameSchema":
        """Create DataFrameSchema from yaml file.

        :param yaml_schema: str, Path to yaml schema, or serialized yaml
            string.
        :returns: dataframe schema.
        """
        # pylint: disable=import-outside-toplevel,cyclic-import
        import pandera.io

        return pandera.io.from_yaml(yaml_schema)
    @overload
    def to_yaml(self, stream: None = None) -> str:  # pragma: no cover
        ...

    @overload
    def to_yaml(self, stream: os.PathLike) -> None:  # pragma: no cover
        ...

    def to_yaml(self, stream: Optional[os.PathLike] = None) -> Optional[str]:
        """Write DataFrameSchema to yaml file.

        :param stream: file path or stream to write to. If None, dumps
            to string.
        :returns: yaml string if stream is None, otherwise returns None.
        """
        # pylint: disable=import-outside-toplevel,cyclic-import
        import pandera.io

        return pandera.io.to_yaml(self, stream)

    @classmethod
    def from_json(cls, source) -> "DataFrameSchema":
        """Create DataFrameSchema from json file.

        :param source: str, Path to json schema, or serialized json
            string.
        :returns: dataframe schema.
        """
        # pylint: disable=import-outside-toplevel,cyclic-import
        import pandera.io

        return pandera.io.from_json(source)

    @overload
    def to_json(
        self, target: None = None, **kwargs
    ) -> str:  # pragma: no cover
        ...

    @overload
    def to_json(
        self, target: os.PathLike, **kwargs
    ) -> None:  # pragma: no cover
        ...

    def to_json(
        self, target: Optional[os.PathLike] = None, **kwargs
    ) -> Optional[str]:
        """Write DataFrameSchema to json file.

        :param target: file target to write to. If None, dumps to string.
        :returns: json string if target is None, otherwise returns None.
        """
        # pylint: disable=import-outside-toplevel,cyclic-import
        import pandera.io

        return pandera.io.to_json(self, target, **kwargs)
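
# --- usage sketch: not part of schemas.py ----------------------------------
# Round-tripping a schema through its yaml/json serialization, assuming the
# optional io dependencies (the `pandera[io]` extra) are installed. For a
# simple schema like this, the deserialized schema should compare equal.
import pandera as pa

schema = pa.DataFrameSchema({"col": pa.Column(int)})
yaml_str = schema.to_yaml()  # stream=None dumps to a string
print(pa.DataFrameSchema.from_yaml(yaml_str) == schema)  # expected: True
json_str = schema.to_json()  # target=None dumps to a string
print(pa.DataFrameSchema.from_json(json_str) == schema)  # expected: True
# ----------------------------------------------------------------------------
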
    def set_index(
        self, keys: List[str], drop: bool = True, append: bool = False
    ) -> Self:
        """
        A method for setting the :class:`Index` of a :class:`DataFrameSchema`,
        via an existing :class:`Column` or list of columns.

        :param keys: list of labels
        :param drop: bool, default True
        :param append: bool, default False
        :return: a new :class:`DataFrameSchema` with specified column(s) in the
            index.
        :raises: :class:`~pandera.errors.SchemaInitError` if column not in the
            schema.

        :examples:

        Just as you would set the index in a ``pandas`` DataFrame from an
        existing column, you can set an index within the schema from an
        existing column in the schema.

        >>> import pandera as pa
        >>>
        >>> example_schema = pa.DataFrameSchema({
        ...     "category" : pa.Column(str),
        ...     "probability": pa.Column(float)})
        >>>
        >>> print(example_schema.set_index(['category']))
        <Schema DataFrameSchema(
            columns={
                'probability': <Schema Column(name=probability, type=DataType(float64))>
            },
            checks=[],
            coerce=False,
            dtype=None,
            index=<Schema Index(name=category, type=DataType(str))>,
            strict=False
            name=None,
            ordered=False,
            unique_column_names=False
        )>

        If you have an existing index in your schema, and you would like to
        append a new column as an index to it (yielding a :class:`MultiIndex`),
        just use set_index as you would in pandas.

        >>> example_schema = pa.DataFrameSchema(
        ...     {
        ...         "column1": pa.Column(str),
        ...         "column2": pa.Column(int)
        ...     },
        ...     index=pa.Index(name = "column3", dtype = int)
        ... )
        >>>
        >>> print(example_schema.set_index(["column2"], append = True))
        <Schema DataFrameSchema(
            columns={
                'column1': <Schema Column(name=column1, type=DataType(str))>
            },
            checks=[],
            coerce=False,
            dtype=None,
            index=<Schema MultiIndex(
                indexes=[
                    <Schema Index(name=column3, type=DataType(int64))>
                    <Schema Index(name=column2, type=DataType(int64))>
                ]
                coerce=False,
                strict=False,
                name=None,
                ordered=True
            )>,
            strict=False
            name=None,
            ordered=False,
            unique_column_names=False
        )>

        .. seealso:: :func:`reset_index`
        """
        # pylint: disable=import-outside-toplevel,cyclic-import
        from pandera.schema_components import Index, MultiIndex

        new_schema = copy.deepcopy(self)
        keys_temp: List = (
            list(set(keys)) if not isinstance(keys, list) else keys
        )
        # ensure all specified keys are present in the columns
        not_in_cols: List[str] = [
            x for x in keys_temp if x not in new_schema.columns.keys()
        ]
        if not_in_cols:
            raise errors.SchemaInitError(
                f"Keys {not_in_cols} not found in schema columns!"
            )
        # if there is already an index, append or replace according to
        # parameters
        ind_list: List = (
            []
            if new_schema.index is None or not append
            else list(new_schema.index.indexes)
            if isinstance(new_schema.index, MultiIndex) and append
            else [new_schema.index]
        )
        for col in keys_temp:
            ind_list.append(
                Index(
                    dtype=new_schema.columns[col].dtype,
                    name=col,
                    checks=new_schema.columns[col].checks,
                    nullable=new_schema.columns[col].nullable,
                    unique=new_schema.columns[col].unique,
                    coerce=new_schema.columns[col].coerce,
                )
            )
        new_schema.index = (
            ind_list[0] if len(ind_list) == 1 else MultiIndex(ind_list)
        )
        # if drop is True as defaulted, drop the columns moved into the index
        if drop:
            new_schema = new_schema.remove_columns(keys_temp)
        return new_schema
    def reset_index(self, level: List[str] = None, drop: bool = False) -> Self:
        """
        A method for resetting the :class:`Index` of a :class:`DataFrameSchema`

        :param level: list of labels
        :param drop: bool, default False
        :return: a new :class:`DataFrameSchema` with specified column(s) in the
            index.
        :raises: :class:`~pandera.errors.SchemaInitError` if no index set in
            schema.

        :examples:

        Similar to the ``pandas`` reset_index method on a pandas DataFrame,
        this method can be used to fully or partially reset indices of a
        schema.

        To remove the entire index from the schema, just call the reset_index
        method with default parameters.

        >>> import pandera as pa
        >>>
        >>> example_schema = pa.DataFrameSchema(
        ...     {"probability" : pa.Column(float)},
        ...     index = pa.Index(name="unique_id", dtype=int)
        ... )
        >>>
        >>> print(example_schema.reset_index())
        <Schema DataFrameSchema(
            columns={
                'probability': <Schema Column(name=probability, type=DataType(float64))>
                'unique_id': <Schema Column(name=unique_id, type=DataType(int64))>
            },
            checks=[],
            coerce=False,
            dtype=None,
            index=None,
            strict=False
            name=None,
            ordered=False,
            unique_column_names=False
        )>

        This reclassifies an index (or indices) as a column (or columns).

        Similarly, to partially alter the index, pass the name of the column
        you would like to be removed to the ``level`` parameter, and you may
        also decide whether to drop the levels with the ``drop`` parameter.

        >>> example_schema = pa.DataFrameSchema({
        ...     "category" : pa.Column(str)},
        ...     index = pa.MultiIndex([
        ...         pa.Index(name="unique_id1", dtype=int),
        ...         pa.Index(name="unique_id2", dtype=str)
        ...         ]
        ...     )
        ... )
        >>> print(example_schema.reset_index(level = ["unique_id1"]))
        <Schema DataFrameSchema(
            columns={
                'category': <Schema Column(name=category, type=DataType(str))>
                'unique_id1': <Schema Column(name=unique_id1, type=DataType(int64))>
            },
            checks=[],
            coerce=False,
            dtype=None,
            index=<Schema Index(name=unique_id2, type=DataType(str))>,
            strict=False
            name=None,
            ordered=False,
            unique_column_names=False
        )>
        .. seealso:: :func:`set_index`
        """
        # pylint: disable=import-outside-toplevel,cyclic-import
        from pandera.schema_components import Column, Index, MultiIndex

        # explicit check for an empty list
        if level == []:
            return self

        new_schema = copy.deepcopy(self)
        if new_schema.index is None:
            raise errors.SchemaInitError(
                "There is currently no index set for this schema."
            )
        # ensure no duplicates
        level_temp: Union[List[Any], List[str]] = (
            new_schema.index.names if level is None else list(set(level))
        )
        # ensure all specified keys are present in the index
        level_not_in_index: Union[List[Any], List[str], None] = (
            [x for x in level_temp if x not in new_schema.index.names]
            if isinstance(new_schema.index, MultiIndex) and level_temp
            else []
            if isinstance(new_schema.index, Index)
            and (level_temp == [new_schema.index.name])
            else level_temp
        )
        if level_not_in_index:
            raise errors.SchemaInitError(
                f"Keys {level_not_in_index} not found in schema columns!"
            )
        new_index = (
            None
            if not level_temp or isinstance(new_schema.index, Index)
            else new_schema.index.remove_columns(level_temp)
        )
        new_index = (
            new_index
            if new_index is None
            else Index(
                dtype=new_index.columns[list(new_index.columns)[0]].dtype,
                checks=new_index.columns[list(new_index.columns)[0]].checks,
                nullable=new_index.columns[
                    list(new_index.columns)[0]
                ].nullable,
                unique=new_index.columns[list(new_index.columns)[0]].unique,
                coerce=new_index.columns[list(new_index.columns)[0]].coerce,
                name=new_index.columns[list(new_index.columns)[0]].name,
            )
            if (len(list(new_index.columns)) == 1) and (new_index is not None)
            else None
            if (len(list(new_index.columns)) == 0) and (new_index is not None)
            else new_index
        )
        if not drop:
            additional_columns: Dict[str, Any] = (
                {col: new_schema.index.columns.get(col) for col in level_temp}
                if isinstance(new_schema.index, MultiIndex)
                else {new_schema.index.name: new_schema.index}
            )
            new_schema = new_schema.add_columns(
                {
                    k: Column(
                        dtype=v.dtype,
                        checks=v.checks,
                        nullable=v.nullable,
                        unique=v.unique,
                        coerce=v.coerce,
                        name=v.name,
                    )
                    for (k, v) in additional_columns.items()
                }
            )
        new_schema.index = new_index
        return new_schema

    @classmethod
    def __get_validators__(cls):
        yield cls._pydantic_validate

    @classmethod
    def _pydantic_validate(cls, schema: Any) -> "DataFrameSchema":
        """Verify that the input is a compatible DataFrameSchema."""
        if not isinstance(schema, cls):  # type: ignore
            raise TypeError(f"{schema} is not a {cls}.")
        return cast("DataFrameSchema", schema)
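
# --- usage sketch: not part of schemas.py ----------------------------------
# `__get_validators__` / `_pydantic_validate` hook the schema into pydantic
# v1 field validation. A minimal sketch, assuming pydantic<2 is installed;
# `AppConfig` is a hypothetical model:
import pandera as pa
import pydantic  # v1 API assumed

class AppConfig(pydantic.BaseModel):
    df_schema: pa.DataFrameSchema  # accepted because of __get_validators__

AppConfig(df_schema=pa.DataFrameSchema({"x": pa.Column(int)}))  # ok
# AppConfig(df_schema="not a schema")  # raises pydantic.ValidationError
# ----------------------------------------------------------------------------
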
class SeriesSchemaBase:
    """Base series validator object."""

    def __init__(
        self,
        dtype: PandasDtypeInputTypes = None,
        checks: CheckList = None,
        nullable: bool = False,
        unique: bool = False,
        report_duplicates: UniqueSettings = "all",
        coerce: bool = False,
        name: Any = None,
        title: Optional[str] = None,
        description: Optional[str] = None,
    ) -> None:
        """Initialize series schema base object.

        :param dtype: datatype of the column. If a string is specified,
            then assumes one of the valid pandas string values:
            http://pandas.pydata.org/pandas-docs/stable/basics.html#dtypes
        :param checks: If element_wise is True, then callable signature should
            be: ``Callable[Any, bool]`` where the ``Any`` input is a scalar
            element in the column. Otherwise, the input is assumed to be a
            pandas.Series object.
        :param nullable: Whether or not column can contain null values.
        :param unique: whether column values should be unique.
        :param report_duplicates: how to report unique errors
            - `exclude_first`: report all duplicates except first occurrence
            - `exclude_last`: report all duplicates except last occurrence
            - `all`: (default) report all duplicates
        :param coerce: If True, when schema.validate is called the column will
            be coerced into the specified dtype. This has no effect on columns
            where ``dtype=None``.
        :param name: column name in dataframe to validate.
        :param title: A human-readable label for the series.
        :param description: An arbitrary textual description of the series.
        :type nullable: bool
        """
        if checks is None:
            checks = []
        if isinstance(checks, (Check, Hypothesis)):
            checks = [checks]
        self.dtype = dtype  # type: ignore
        self._nullable = nullable
        self._coerce = coerce
        self._checks = checks
        self._name = name
        self._unique = unique
        self._report_duplicates = report_duplicates
        self._title = title
        self._description = description

        for check in self.checks:
            if check.groupby is not None and not self._allow_groupby:
                raise errors.SchemaInitError(
                    f"Cannot use groupby checks with type {type(self)}"
                )

        # make sure pandas dtype is valid
        self.dtype  # pylint: disable=pointless-statement

        # this attribute is not meant to be accessed by users and is explicitly
        # set to True in the case that a schema is created by infer_schema.
        self._IS_INFERRED = False

        if isinstance(self.dtype, pandas_engine.PydanticModel):
            raise errors.SchemaInitError(
                "PydanticModel dtype can only be specified as a "
                "DataFrameSchema dtype."
            )

    # the _is_inferred getter and setter methods are not public
    @property
    def _is_inferred(self):
        return self._IS_INFERRED

    @_is_inferred.setter
    def _is_inferred(self, value: bool):
        self._IS_INFERRED = value

    @property
    def checks(self):
        """Return list of checks or hypotheses."""
        return self._checks

    @checks.setter
    def checks(self, checks):
        self._checks = checks

    @_inferred_schema_guard
    def set_checks(self, checks: CheckList):
        """Create a new SeriesSchema with a new set of Checks

        :param checks: checks to set on the new schema
        :returns: a new SeriesSchema with a new set of checks
        """
        schema_copy = copy.deepcopy(self)
        schema_copy.checks = checks
        return schema_copy
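
# --- usage sketch: not part of schemas.py ----------------------------------
# `set_checks` returns a modified copy and is the recommended way to refine a
# schema produced by inference (see the warning emitted in `validate` below).
# A sketch, assuming `infer_schema` is exposed at the pandera top level:
import pandas as pd
import pandera as pa

inferred = pa.infer_schema(pd.DataFrame({"x": [1.0, 2.0]}))
refined_col = inferred.columns["x"].set_checks([pa.Check.gt(0)])
print(refined_col.checks)
# ----------------------------------------------------------------------------
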
    @property
    def nullable(self) -> bool:
        """Whether the series is nullable."""
        return self._nullable

    @property
    def unique(self) -> bool:
        """Whether to check for duplicates in check object"""
        return self._unique

    @unique.setter
    def unique(self, value: bool) -> None:
        """Set unique attribute"""
        self._unique = value

    @property
    def coerce(self) -> bool:
        """Whether to coerce series to specified type."""
        return self._coerce

    @coerce.setter
    def coerce(self, value: bool) -> None:
        """Set coerce attribute."""
        self._coerce = value

    @property
    def name(self) -> Union[str, None]:
        """Get SeriesSchema name."""
        return self._name

    @property
    def title(self):
        """A human-readable label for the series."""
        return self._title

    @property
    def description(self):
        """An arbitrary textual description of the series."""
        return self._description

    @property
    def dtype(
        self,
    ) -> DataType:
        """Get the pandas dtype"""
        return self._dtype  # type: ignore

    @dtype.setter
    def dtype(self, value: PandasDtypeInputTypes) -> None:
        """Set the pandas dtype"""
        self._dtype = pandas_engine.Engine.dtype(value) if value else None

    def coerce_dtype(self, obj: Union[pd.Series, pd.Index]) -> pd.Series:
        """Coerce type of a pd.Series by type specified in dtype.

        :param pd.Series series: One-dimensional ndarray with axis labels
            (including time series).
        :returns: ``Series`` with coerced data type
        """
        if self.dtype is None:
            return obj
        try:
            return self.dtype.try_coerce(obj)
        except errors.ParserError as exc:
            msg = (
                f"Error while coercing '{self.name}' to type "
                f"{self.dtype}: {exc}:\n{exc.failure_cases}"
            )
            raise errors.SchemaError(
                self,
                obj,
                msg,
                failure_cases=exc.failure_cases,
                check=f"coerce_dtype('{self.dtype}')",
            ) from exc

    @property
    def _allow_groupby(self):
        """Whether the schema or schema component allows groupby operations."""
        raise NotImplementedError(  # pragma: no cover
            "The _allow_groupby property must be implemented by subclasses "
            "of SeriesSchemaBase"
        )
    def validate(
        self,
        check_obj: Union[pd.DataFrame, pd.Series],
        head: Optional[int] = None,
        tail: Optional[int] = None,
        sample: Optional[int] = None,
        random_state: Optional[int] = None,
        lazy: bool = False,
        inplace: bool = False,
    ) -> Union[pd.DataFrame, pd.Series]:
        # pylint: disable=too-many-locals,too-many-branches,too-many-statements
        """Validate a series or specific column in dataframe.

        :param check_obj: pandas DataFrame or Series to validate.
        :param head: validate the first n rows. Rows overlapping with `tail` or
            `sample` are de-duplicated.
        :param tail: validate the last n rows. Rows overlapping with `head` or
            `sample` are de-duplicated.
        :param sample: validate a random sample of n rows. Rows overlapping
            with `head` or `tail` are de-duplicated.
        :param random_state: random seed for the ``sample`` argument.
        :param lazy: if True, lazily evaluates dataframe against all validation
            checks and raises a ``SchemaErrors``. Otherwise, raise
            ``SchemaError`` as soon as one occurs.
        :param inplace: if True, applies coercion to the object of validation,
            otherwise creates a copy of the data.
        :returns: validated DataFrame or Series.
        """
        if self._is_inferred:
            warnings.warn(
                f"This {type(self)} is an inferred schema that hasn't been "
                "modified. It's recommended that you refine the schema "
                "by calling `set_checks` before using it to validate data.",
                UserWarning,
            )

        error_handler = SchemaErrorHandler(lazy)

        if not inplace:
            check_obj = check_obj.copy()

        series = (
            check_obj
            if check_utils.is_field(check_obj)
            else check_obj[self.name]
        )

        series = _pandas_obj_to_validate(
            series, head, tail, sample, random_state
        )

        check_obj = _pandas_obj_to_validate(
            check_obj, head, tail, sample, random_state
        )

        if self.name is not None and series.name != self._name:
            msg = (
                f"Expected {type(self)} to have name '{self._name}', found "
                f"'{series.name}'"
            )
            error_handler.collect_error(
                "wrong_field_name",
                errors.SchemaError(
                    self,
                    check_obj,
                    msg,
                    failure_cases=scalar_failure_case(series.name),
                    check=f"field_name('{self._name}')",
                ),
            )

        if not self._nullable:
            nulls = series.isna()
            if nulls.sum() > 0:
                failed = series[nulls]
                msg = (
                    f"non-nullable series '{series.name}' contains null "
                    f"values:\n{failed}"
                )
                error_handler.collect_error(
                    "series_contains_nulls",
                    errors.SchemaError(
                        self,
                        check_obj,
                        msg,
                        failure_cases=reshape_failure_cases(
                            series[nulls], ignore_na=False
                        ),
                        check="not_nullable",
                    ),
                )
        # Check if the series contains duplicate values
        if self._unique:
            keep_argument = convert_uniquesettings(self._report_duplicates)

            if type(series).__module__.startswith("pyspark.pandas"):
                duplicates = (
                    series.to_frame()
                    .duplicated(keep=keep_argument)
                    .reindex(series.index)
                )
                # pylint: disable=import-outside-toplevel
                import pyspark.pandas as ps

                with ps.option_context("compute.ops_on_diff_frames", True):
                    failed = series[duplicates]
            else:
                duplicates = series.duplicated(keep=keep_argument)
                failed = series[duplicates]

            if duplicates.any():
                msg = (
                    f"series '{series.name}' contains duplicate values:\n"
                    f"{failed}"
                )
                error_handler.collect_error(
                    "series_contains_duplicates",
                    errors.SchemaError(
                        self,
                        check_obj,
                        msg,
                        failure_cases=reshape_failure_cases(failed),
                        check="field_uniqueness",
                    ),
                )

        if self._dtype is not None:
            failure_cases = None
            check_output = self._dtype.check(
                pandas_engine.Engine.dtype(series.dtype), series
            )
            if check_output is False:
                failure_cases = scalar_failure_case(str(series.dtype))
                msg = (
                    f"expected series '{series.name}' to have type "
                    f"{self._dtype}, got {series.dtype}"
                )
            elif not isinstance(check_output, bool):
                _, failure_cases = check_utils.prepare_series_check_output(
                    series,
                    pd.Series(list(check_output))
                    if not isinstance(check_output, pd.Series)
                    else check_output,
                )
                failure_cases = reshape_failure_cases(failure_cases)
                msg = (
                    f"expected series '{series.name}' to have type "
                    f"{self._dtype}:\nfailure cases:\n{failure_cases}"
                )
            if failure_cases is not None and not failure_cases.empty:
                error_handler.collect_error(
                    "wrong_dtype",
                    errors.SchemaError(
                        self,
                        check_obj,
                        msg,
                        failure_cases=failure_cases,
                        check=f"dtype('{self.dtype}')",
                    ),
                )

        check_results = []
        if check_utils.is_field(check_obj):
            check_obj, check_args = series, [None]
        else:
            check_args = [self.name]  # type: ignore
        for check_index, check in enumerate(self.checks):
            try:
                check_results.append(
                    _handle_check_results(
                        self, check_index, check, check_obj, *check_args
                    )
                )
            except errors.SchemaError as err:
                error_handler.collect_error("dataframe_check", err)
            except Exception as err:  # pylint: disable=broad-except
                # catch other exceptions that may occur when executing the
                # Check
                err_msg = f'"{err.args[0]}"' if len(err.args) > 0 else ""
                err_str = f"{err.__class__.__name__}({err_msg})"
                msg = (
                    f"Error while executing check function: {err_str}\n"
                    + traceback.format_exc()
                )
                error_handler.collect_error(
                    "check_error",
                    errors.SchemaError(
                        self,
                        check_obj,
                        msg,
                        failure_cases=scalar_failure_case(err_str),
                        check=check,
                        check_index=check_index,
                    ),
                    original_exc=err,
                )

        if lazy and error_handler.collected_errors:
            raise errors.SchemaErrors(
                self, error_handler.collected_errors, check_obj
            )

        assert all(check_results)
        return check_obj

    def __call__(
        self,
        check_obj: Union[pd.DataFrame, pd.Series],
        head: Optional[int] = None,
        tail: Optional[int] = None,
        sample: Optional[int] = None,
        random_state: Optional[int] = None,
        lazy: bool = False,
        inplace: bool = False,
    ) -> Union[pd.DataFrame, pd.Series]:
        """Alias for ``validate`` method."""
        return self.validate(
            check_obj, head, tail, sample, random_state, lazy, inplace
        )

    def __eq__(self, other):
        return self.__dict__ == other.__dict__

    @st.strategy_import_error
    def strategy(self, *, size=None):
        """Create a ``hypothesis`` strategy for generating a Series.

        :param size: number of elements to generate
        :returns: a strategy that generates pandas Series objects.
        """
        return st.series_strategy(
            self.dtype,
            checks=self.checks,
            nullable=self.nullable,
            unique=self.unique,
            name=self.name,
            size=size,
        )

    def example(self, size=None) -> pd.Series:
        """Generate an example of a particular size.

        :param size: number of elements in the generated Series.
        :returns: pandas Series object.
        """
        # pylint: disable=import-outside-toplevel,cyclic-import,import-error
        import hypothesis

        with warnings.catch_warnings():
            warnings.simplefilter(
                "ignore",
                category=hypothesis.errors.NonInteractiveExampleWarning,
            )
            return self.strategy(size=size).example()

    def __repr__(self):
        return (
            f"<Schema {self.__class__.__name__}"
            f"(name={self._name}, type={self.dtype!r})>"
        )

    @classmethod
    def __get_validators__(cls):
        yield cls._pydantic_validate

    @classmethod
    def _pydantic_validate(  # type: ignore
        cls: TSeriesSchemaBase, schema: Any
    ) -> TSeriesSchemaBase:
        """Verify that the input is a compatible schema type."""
        if not isinstance(schema, cls):  # type: ignore
            raise TypeError(f"{schema} is not a {cls}.")
        return cast(TSeriesSchemaBase, schema)
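
# --- usage sketch: not part of schemas.py ----------------------------------
# How `report_duplicates` changes what the uniqueness branch above reports,
# per the settings documented in `__init__`:
import pandas as pd
import pandera as pa

schema = pa.SeriesSchema(int, unique=True, report_duplicates="exclude_first")
try:
    schema.validate(pd.Series([1, 1, 2]))
except pa.errors.SchemaError as exc:
    # with "exclude_first" only the second 1 is reported; with the default
    # "all", both rows holding 1 would appear in failure_cases
    print(exc.failure_cases)
# ----------------------------------------------------------------------------
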
class SeriesSchema(SeriesSchemaBase):
    """Series validator."""

    def __init__(
        self,
        dtype: PandasDtypeInputTypes = None,
        checks: CheckList = None,
        index=None,
        nullable: bool = False,
        unique: bool = False,
        report_duplicates: UniqueSettings = "all",
        coerce: bool = False,
        name: str = None,
        title: Optional[str] = None,
        description: Optional[str] = None,
    ) -> None:
        """Initialize series schema base object.

        :param dtype: datatype of the column. If a string is specified,
            then assumes one of the valid pandas string values:
            http://pandas.pydata.org/pandas-docs/stable/basics.html#dtypes
        :param checks: If element_wise is True, then callable signature should
            be: ``Callable[Any, bool]`` where the ``Any`` input is a scalar
            element in the column. Otherwise, the input is assumed to be a
            pandas.Series object.
        :param index: specify the datatypes and properties of the index.
        :param nullable: Whether or not column can contain null values.
        :param unique: whether column values should be unique.
        :param report_duplicates: how to report unique errors
            - `exclude_first`: report all duplicates except first occurrence
            - `exclude_last`: report all duplicates except last occurrence
            - `all`: (default) report all duplicates
        :param coerce: If True, when schema.validate is called the column will
            be coerced into the specified dtype. This has no effect on columns
            where ``dtype=None``.
        :param name: series name.
        :param title: A human-readable label for the series.
        :param description: An arbitrary textual description of the series.
        """
        super().__init__(
            dtype,
            checks,
            nullable,
            unique,
            report_duplicates,
            coerce,
            name,
            title,
            description,
        )
        self.index = index

    @property
    def _allow_groupby(self) -> bool:
        """Whether the schema or schema component allows groupby operations."""
        return False
    def validate(
        self,
        check_obj: pd.Series,
        head: Optional[int] = None,
        tail: Optional[int] = None,
        sample: Optional[int] = None,
        random_state: Optional[int] = None,
        lazy: bool = False,
        inplace: bool = False,
    ) -> pd.Series:
        """Validate a Series object.

        :param check_obj: One-dimensional ndarray with axis labels
            (including time series).
        :param head: validate the first n rows. Rows overlapping with `tail` or
            `sample` are de-duplicated.
        :param tail: validate the last n rows. Rows overlapping with `head` or
            `sample` are de-duplicated.
        :param sample: validate a random sample of n rows. Rows overlapping
            with `head` or `tail` are de-duplicated.
        :param random_state: random seed for the ``sample`` argument.
        :param lazy: if True, lazily evaluates dataframe against all validation
            checks and raises a ``SchemaErrors``. Otherwise, raise
            ``SchemaError`` as soon as one occurs.
        :param inplace: if True, applies coercion to the object of validation,
            otherwise creates a copy of the data.
        :returns: validated Series.
        :raises SchemaError: when ``DataFrame`` violates built-in or custom
            checks.

        :example:

        >>> import pandas as pd
        >>> import pandera as pa
        >>>
        >>> series_schema = pa.SeriesSchema(
        ...     float, [
        ...         pa.Check(lambda s: s > 0),
        ...         pa.Check(lambda s: s < 1000),
        ...         pa.Check(lambda s: s.mean() > 300),
        ...     ])
        >>> series = pd.Series([1, 100, 800, 900, 999], dtype=float)
        >>> print(series_schema.validate(series))
        0      1.0
        1    100.0
        2    800.0
        3    900.0
        4    999.0
        dtype: float64
        """
        if not check_utils.is_field(check_obj):
            raise TypeError(f"expected pd.Series, got {type(check_obj)}")

        if hasattr(check_obj, "dask"):
            # special case for dask series
            if inplace:
                check_obj = check_obj.pandera.add_schema(self)
            else:
                check_obj = check_obj.copy()

            check_obj = check_obj.map_partitions(
                self._validate,
                head=head,
                tail=tail,
                sample=sample,
                random_state=random_state,
                lazy=lazy,
                inplace=inplace,
                meta=check_obj,
            )
            return check_obj.pandera.add_schema(self)

        return self._validate(
            check_obj=check_obj,
            head=head,
            tail=tail,
            sample=sample,
            random_state=random_state,
            lazy=lazy,
            inplace=inplace,
        )
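
# --- usage sketch: not part of schemas.py ----------------------------------
# The `hasattr(check_obj, "dask")` branch above defers validation to each
# dask partition via `map_partitions`, so errors surface lazily on compute.
# A sketch, assuming dask is installed and pandera's accessors register on
# import:
import dask.dataframe as dd
import pandas as pd
import pandera as pa

schema = pa.SeriesSchema(float, pa.Check.gt(0))
dask_series = dd.from_pandas(pd.Series([1.0, 2.0, 3.0]), npartitions=2)
validated = schema.validate(dask_series)  # lazy; per-partition validation
print(validated.compute())
# ----------------------------------------------------------------------------
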
    def _validate(
        self,
        check_obj: pd.Series,
        head: Optional[int] = None,
        tail: Optional[int] = None,
        sample: Optional[int] = None,
        random_state: Optional[int] = None,
        lazy: bool = False,
        inplace: bool = False,
    ) -> pd.Series:

        if not inplace:
            check_obj = check_obj.copy()

        if hasattr(check_obj, "pandera"):
            check_obj = check_obj.pandera.add_schema(self)

        error_handler = SchemaErrorHandler(lazy=lazy)

        if self.coerce:
            try:
                check_obj = self.coerce_dtype(check_obj)
                if hasattr(check_obj, "pandera"):
                    check_obj = check_obj.pandera.add_schema(self)
            except errors.SchemaError as exc:
                error_handler.collect_error("dtype_coercion_error", exc)

        # validate index
        if self.index:
            # coerce data type using index schema copy to prevent mutation
            # of original index schema attribute.
            _index = copy.deepcopy(self.index)
            _index.coerce = _index.coerce or self.coerce
            try:
                check_obj = _index(
                    check_obj, head, tail, sample, random_state, lazy, inplace
                )
            except errors.SchemaError as exc:
                error_handler.collect_error("dtype_coercion_error", exc)
            except errors.SchemaErrors as err:
                for schema_error_dict in err.schema_errors:
                    error_handler.collect_error(
                        "index_check", schema_error_dict["error"]
                    )

        # validate series
        try:
            super().validate(
                check_obj, head, tail, sample, random_state, lazy, inplace
            )
        except errors.SchemaErrors as err:
            for schema_error_dict in err.schema_errors:
                error_handler.collect_error(
                    "series_check", schema_error_dict["error"]
                )

        if error_handler.collected_errors:
            raise errors.SchemaErrors(
                self, error_handler.collected_errors, check_obj
            )

        return check_obj

    def __call__(
        self,
        check_obj: pd.Series,
        head: Optional[int] = None,
        tail: Optional[int] = None,
        sample: Optional[int] = None,
        random_state: Optional[int] = None,
        lazy: bool = False,
        inplace: bool = False,
    ) -> pd.Series:
        """Alias for :func:`SeriesSchema.validate` method."""
        return self.validate(
            check_obj, head, tail, sample, random_state, lazy, inplace
        )

    def __eq__(self, other):
        return self.__dict__ == other.__dict__


def _pandas_obj_to_validate(
    dataframe_or_series: Union[pd.DataFrame, pd.Series],
    head: Optional[int],
    tail: Optional[int],
    sample: Optional[int],
    random_state: Optional[int],
) -> Union[pd.DataFrame, pd.Series]:
    pandas_obj_subsample = []
    if head is not None:
        pandas_obj_subsample.append(dataframe_or_series.head(head))
    if tail is not None:
        pandas_obj_subsample.append(dataframe_or_series.tail(tail))
    if sample is not None:
        pandas_obj_subsample.append(
            dataframe_or_series.sample(sample, random_state=random_state)
        )
    return (
        dataframe_or_series
        if not pandas_obj_subsample
        else pd.concat(pandas_obj_subsample).pipe(
            lambda x: x[~x.index.duplicated()]
        )
    )
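`_pandas_obj_to_validate` concatenates the requested head/tail/sample slices and then keeps only the first occurrence of each index label, which is what "de-duplicated" means in the docstrings above. A standalone sketch of that behavior (the series here is illustrative, not part of the library):

import pandas as pd

s = pd.Series(range(10))

# head=5 and tail=8 overlap on index labels 2-4, so the concatenated
# subsample contains duplicate labels ...
subsample = pd.concat([s.head(5), s.tail(8)])
assert len(subsample) == 13

# ... and ~index.duplicated() keeps the first occurrence of each label
deduped = subsample[~subsample.index.duplicated()]
assert len(deduped) == 10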
def _handle_check_results(
    schema: Union[DataFrameSchema, SeriesSchemaBase],
    check_index: int,
    check: Union[Check, Hypothesis],
    check_obj: Union[pd.DataFrame, pd.Series],
    *check_args,
) -> bool:
    """Handle check results, raising SchemaError on check failure.

    :param check_index: index of check in the schema component check list.
    :param check: Check object used to validate pandas object.
    :param check_args: arguments to pass into check object.
    :returns: True if check results pass or check.raise_warning=True, otherwise
        False.
    """
    check_result = check(check_obj, *check_args)
    if not check_result.check_passed:
        if check_result.failure_cases is None:
            # encode scalar False values explicitly
            failure_cases = scalar_failure_case(check_result.check_passed)
            error_msg = format_generic_error_message(
                schema, check, check_index
            )
        else:
            failure_cases = reshape_failure_cases(
                check_result.failure_cases, check.ignore_na
            )
            error_msg = format_vectorized_error_message(
                schema, check, check_index, failure_cases
            )

        # raise a warning without exiting if the check is specified to do so
        if check.raise_warning:
            warnings.warn(error_msg, UserWarning)
            return True

        raise errors.SchemaError(
            schema,
            check_obj,
            error_msg,
            failure_cases=failure_cases,
            check=check,
            check_index=check_index,
            check_output=check_result.check_output,
        )
    return check_result.check_passed


def convert_uniquesettings(unique: UniqueSettings) -> Union[bool, str]:
    """Convert a UniqueSettings value to the `keep` argument that can be
    passed to pandas' .duplicated() call.
    """
    # Default `keep` argument for pandas .duplicated() function
    keep_argument: Union[bool, str]
    if unique == "exclude_first":
        keep_argument = "first"
    elif unique == "exclude_last":
        keep_argument = "last"
    elif unique == "all":
        keep_argument = False
    else:
        raise ValueError(
            str(unique) + " is not a recognized report_duplicates value"
        )

    return keep_argument
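The three recognized settings map directly onto pandas' `keep` parameter; this illustrative snippet (the frame is made up) shows what each translation reports as a duplicate:

import pandas as pd

df = pd.DataFrame({"a": [1, 1, 2]})

# "exclude_first" -> keep="first": report all but the first occurrence
print(df.duplicated(subset=["a"], keep="first").tolist())  # [False, True, False]

# "exclude_last" -> keep="last": report all but the last occurrence
print(df.duplicated(subset=["a"], keep="last").tolist())   # [True, False, False]

# "all" -> keep=False: report every member of a duplicate group
print(df.duplicated(subset=["a"], keep=False).tolist())    # [True, True, False]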
