Best Python code snippet using pandera_python
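The listings below were collected for pandera-style schema inference. For reference, here is a minimal sketch of pandera's own infer_schema helper, assuming a recent pandera release that exposes it (the DataFrame contents are made up for illustration):

import pandas as pd
import pandera as pa

# Toy frame; the column names and values are illustrative only.
df = pd.DataFrame({"col1": [1, 2, 3], "col2": ["a", "b", "c"]})

schema = pa.infer_schema(df)   # returns a DataFrameSchema inferred from the data
print(schema)
schema.validate(df)            # the inferred schema validates the frame it was built from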
pipeline_utils.py
Source:pipeline_utils.py  
import os
from enum import Enum

PipelineStep = Enum("PipelineStep", "DATA_VALIDATION PREPROCESSING MODELING DEPLOY")

def create_pipeline_file(step, project, train_steps=1000, eval_steps=500, columns_for_slicing=None):

    base_file = """
import os
import logging
import datetime
from tfx.orchestration.airflow.airflow_runner import AirflowDAGRunner
from tfx.orchestration.pipeline import PipelineDecorator
from tfx.utils.dsl_utils import csv_input
from tfx.proto import trainer_pb2, evaluator_pb2, pusher_pb2
from tfx.components.example_gen.csv_example_gen.component import CsvExampleGen
from tfx.components.statistics_gen.component import StatisticsGen
from tfx.components.schema_gen.component import SchemaGen
from tfx.components.example_validator.component import ExampleValidator
from tfx.components.transform.component import Transform
from tfx.components.trainer.component import Trainer
from tfx.components.evaluator.component import Evaluator
from tfx.components.model_validator.component import ModelValidator
from tfx.components.pusher.component import Pusher
data_dir = os.path.join(os.environ['DATA_DIR'], '{project}')
log_dir = os.path.join(os.environ['TFX_DIR'], 'logs')
serving_model_dir = os.path.join(os.environ['SERVING_DIR'], 'serving_model', '{project}')
project_preprocessing_file = os.path.join(os.environ['DAGS_DIR'], '{project}_preprocessing.py')
project_training_file = os.path.join(os.environ['DAGS_DIR'], '{project}_modeling.py')
logger_overrides = dict([
    ('log_root', log_dir),
    ('log_level', logging.INFO)
])
airflow_config = dict([
    ('schedule_interval', None),
    ('start_date', datetime.datetime(2019, 1, 1))
])
@PipelineDecorator(
    pipeline_name='{project}',
    enable_cache=True,
    metadata_db_root=os.environ['METADATA_DB_DIR'],
    additional_pipeline_args=dict([('logger_args', logger_overrides)]),
    pipeline_root=os.environ['PIPELINE_DIR']
)
def create_pipeline():

    pipeline = []

    {components}

    return pipeline
pipeline = AirflowDAGRunner(airflow_config).run(create_pipeline())
"""

    pipeline_file = None
    if step == PipelineStep.DATA_VALIDATION:

        pipeline_file = base_file.format(
            project=project,
            components="""
    examples = csv_input(data_dir)
    # Brings data into the pipeline
    example_gen = CsvExampleGen(input_base=examples)
    pipeline.append(example_gen)

    # Computes statistics over data for visualization and example validation.
    statistics_gen = StatisticsGen(input_data=example_gen.outputs.examples)
    pipeline.append(statistics_gen)
    # Generates schema based on statistics files.
    infer_schema = SchemaGen(stats=statistics_gen.outputs.output)
    pipeline.append(infer_schema)
    # Performs anomaly detection based on statistics and data schema.
    validate_stats = ExampleValidator(
        stats=statistics_gen.outputs.output,
        schema=infer_schema.outputs.output
    )
    pipeline.append(validate_stats)
    """
        )
    elif step == PipelineStep.PREPROCESSING:

        pipeline_file = base_file.format(
            project=project,
            components="""
    examples = csv_input(data_dir)
    # Brings data into the pipeline
    example_gen = CsvExampleGen(input_base=examples)
    pipeline.append(example_gen)

    # Computes statistics over data for visualization and example validation.
    statistics_gen = StatisticsGen(input_data=example_gen.outputs.examples)
    pipeline.append(statistics_gen)
    # Generates schema based on statistics files.
    infer_schema = SchemaGen(stats=statistics_gen.outputs.output)
    pipeline.append(infer_schema)
    # Performs anomaly detection based on statistics and data schema.
    validate_stats = ExampleValidator(
        stats=statistics_gen.outputs.output,
        schema=infer_schema.outputs.output
    )
    pipeline.append(validate_stats)
    # Performs transformations and feature engineering in training and serving.
    transform = Transform(
        input_data=example_gen.outputs.examples,
        schema=infer_schema.outputs.output,
        module_file=project_preprocessing_file
    )
    pipeline.append(transform)
    """
        )
    elif step == PipelineStep.MODELING:

        pipeline_file = base_file.format(
            project=project,
            components="""
    examples = csv_input(data_dir)
    # Brings data into the pipeline
    example_gen = CsvExampleGen(input_base=examples)
    pipeline.append(example_gen)

    # Computes statistics over data for visualization and example validation.
    statistics_gen = StatisticsGen(input_data=example_gen.outputs.examples)
    pipeline.append(statistics_gen)
    # Generates schema based on statistics files.
    infer_schema = SchemaGen(stats=statistics_gen.outputs.output)
    pipeline.append(infer_schema)
    # Performs anomaly detection based on statistics and data schema.
    validate_stats = ExampleValidator(
        stats=statistics_gen.outputs.output,
        schema=infer_schema.outputs.output
    )
    pipeline.append(validate_stats)
    # Performs transformations and feature engineering in training and serving.
    transform = Transform(
        input_data=example_gen.outputs.examples,
        schema=infer_schema.outputs.output,
        module_file=project_preprocessing_file
    )
    pipeline.append(transform)
    # Uses user-provided Python function that implements a model.
    trainer = Trainer(
        module_file=project_training_file,
        transformed_examples=transform.outputs.transformed_examples,
        schema=infer_schema.outputs.output,
        transform_output=transform.outputs.transform_output,
        train_args=trainer_pb2.TrainArgs(num_steps={train_steps}),
        eval_args=trainer_pb2.EvalArgs(num_steps={eval_steps})
    )
    pipeline.append(trainer)
    # Uses TFMA to compute a evaluation statistics over features of a model.
    model_analyzer = Evaluator(
        examples=example_gen.outputs.examples,
        model_exports=trainer.outputs.output,
        feature_slicing_spec=evaluator_pb2.FeatureSlicingSpec(specs=[{specs}])
    )
    pipeline.append(model_analyzer)
    """.format(
            train_steps=train_steps,
            eval_steps=eval_steps,
            specs=",".join(["evaluator_pb2.SingleSlicingSpec(column_for_slicing=['%s'])" % col for col in columns_for_slicing])
        ))
    elif step == PipelineStep.DEPLOY:

        pipeline_file = base_file.format(
            project=project,
            components="""
    examples = csv_input(data_dir)
    # Brings data into the pipeline
    example_gen = CsvExampleGen(input_base=examples)
    pipeline.append(example_gen)

    # Computes statistics over data for visualization and example validation.
    statistics_gen = StatisticsGen(input_data=example_gen.outputs.examples)
    pipeline.append(statistics_gen)
    # Generates schema based on statistics files.
    infer_schema = SchemaGen(stats=statistics_gen.outputs.output)
    pipeline.append(infer_schema)
    # Performs anomaly detection based on statistics and data schema.
    validate_stats = ExampleValidator(
        stats=statistics_gen.outputs.output,
        schema=infer_schema.outputs.output
    )
    pipeline.append(validate_stats)
    # Performs transformations and feature engineering in training and serving.
    transform = Transform(
        input_data=example_gen.outputs.examples,
        schema=infer_schema.outputs.output,
        module_file=project_preprocessing_file
    )
    pipeline.append(transform)
    # Uses user-provided Python function that implements a model.
    trainer = Trainer(
        module_file=project_training_file,
        transformed_examples=transform.outputs.transformed_examples,
        schema=infer_schema.outputs.output,
        transform_output=transform.outputs.transform_output,
        train_args=trainer_pb2.TrainArgs(num_steps={train_steps}),
        eval_args=trainer_pb2.EvalArgs(num_steps={eval_steps})
    )
    pipeline.append(trainer)
    # Uses TFMA to compute a evaluation statistics over features of a model.
    model_analyzer = Evaluator(
        examples=example_gen.outputs.examples,
        model_exports=trainer.outputs.output,
        feature_slicing_spec=evaluator_pb2.FeatureSlicingSpec(specs=[{specs}])
    )
    pipeline.append(model_analyzer)
    # Performs quality validation of a candidate model (compared to a baseline).
    model_validator = ModelValidator(
        examples=example_gen.outputs.examples, model=trainer.outputs.output
    )
    pipeline.append(model_validator)
    # Checks whether the model passed the validation steps and pushes the model to a file destination if check passed.
    pusher = Pusher(
        model_export=trainer.outputs.output,
        model_blessing=model_validator.outputs.blessing,
        push_destination=pusher_pb2.PushDestination(
            filesystem=pusher_pb2.PushDestination.Filesystem(base_directory=serving_model_dir)
        )
    )
    pipeline.append(pusher)
    """.format(
            train_steps=train_steps,
            eval_steps=eval_steps,
            specs=",".join(["evaluator_pb2.SingleSlicingSpec(column_for_slicing=['%s'])" % col for col in columns_for_slicing])
        ))

    return pipeline_file

def write_pipeline_to_dags(pipeline_file, name):
    with open(os.path.join(os.environ['DAGS_DIR'], name + ".py"), "w") as f:
        f.write(pipeline_file)
...
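A usage sketch for the helpers above; the project name, slicing column, and step counts are hypothetical, and the DATA_DIR/TFX_DIR/SERVING_DIR/DAGS_DIR/METADATA_DB_DIR/PIPELINE_DIR environment variables the module reads are assumed to be set before the generated DAG runs:

# Hypothetical driver script for pipeline_utils.py; "chicago_taxi" and
# "trip_start_hour" are placeholder names, not values from the source.
from pipeline_utils import PipelineStep, create_pipeline_file, write_pipeline_to_dags

dag_source = create_pipeline_file(
    PipelineStep.MODELING,
    project="chicago_taxi",
    train_steps=10000,
    eval_steps=5000,
    columns_for_slicing=["trip_start_hour"],
)
# Writes $DAGS_DIR/chicago_taxi_modeling.py for Airflow to pick up.
write_pipeline_to_dags(dag_source, "chicago_taxi_modeling")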
test_infer_schema.py
Source:test_infer_schema.py
...
    col1 = pd.Series(["1", "4", "9", "20", "8"], dtype="string", name="col1")
    col2 = pd.Series(["1", "4", "9", "20", "8"], name="col2")
    col3 = pd.Series(["1", "4", "9", "20", "8"], dtype="category", name="col3")
    col4 = pd.Series([True, False, False, True], name="col4")
    assert "col1" in infer_schema(col1.to_frame()).categorical_columns
    assert "col2" in infer_schema(col2.to_frame()).categorical_columns
    assert "col3" in infer_schema(col3.to_frame()).categorical_columns
    assert "col4" in infer_schema(col4.to_frame()).categorical_columns
def test_valid_numerical_column():
    col1 = pd.Series([1, 2, 3, 4], name="col1")
    assert "col1" in infer_schema(col1.to_frame()).numerical_columns
def test_valid_text_column():
    col1 = pd.Series([f"{x}" for x in range(21)], dtype="string", name="col1")
    col2 = pd.Series([f"{x}" for x in range(21)], name="col2")
    assert "col1" in infer_schema(col1.to_frame()).text_columns
    assert "col2" in infer_schema(col2.to_frame()).text_columns
def test_valid_time_column():
    col1 = pd.to_datetime(
        pd.Series(["3/11/2000", "3/12/2000", "3/13/2000"], name="col1")
    )
    col2 = pd.Series(
        [datetime(2022, 9, 13, 9), pd.Timestamp(1513393355.5, unit="s")], name="col2"
    )
    assert "col1" in infer_schema(col1.to_frame()).time_columns
    assert "col2" in infer_schema(col2.to_frame()).time_columns
def test_non_supported_mixed_column():
    col1 = pd.Series([1, 2, 3, 4, "a"], name="col1")
    col2 = pd.Series([1, 2, 3, 4, datetime(2013, 1, 1)], name="col1")
    with pytest.raises(
        Exception, match=r"Mixed data type mixed-integer is not supported"
    ):
        infer_schema(col1.to_frame())
        infer_schema(col2.to_frame())
def test_non_supported_other_column():
    col1 = pd.Series(pd.Timedelta(timedelta(days=1, seconds=1)))
    col2 = pd.Series(pd.Interval(left=0, right=5))
    col3 = pd.Series(pd.Period("2017-01-01"))
    with pytest.raises(
        Exception, match=r"Data type timedelta64\[ns\] is not supported"
    ):
        infer_schema(col1.to_frame())
    with pytest.raises(Exception, match=r"Data type interval.* is not supported"):
        infer_schema(col2.to_frame())
    with pytest.raises(Exception, match=r"Data type period.* is not supported"):
...
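These tests assume an infer_schema helper (a project-local one, not pandera.infer_schema) that buckets a DataFrame's columns into categorical, numerical, text, and time groups and rejects mixed or otherwise unsupported dtypes. A minimal hypothetical sketch of that interface, with the categorical-versus-text cutoff chosen here as an assumption:

# Hypothetical sketch of the interface exercised above; the heuristics (e.g. the
# unique-value cutoff) are assumptions for illustration, not the tested library's code.
from dataclasses import dataclass, field
from typing import List

import pandas as pd

@dataclass
class InferredSchema:
    categorical_columns: List[str] = field(default_factory=list)
    numerical_columns: List[str] = field(default_factory=list)
    text_columns: List[str] = field(default_factory=list)
    time_columns: List[str] = field(default_factory=list)

def infer_schema(df: pd.DataFrame, max_categories: int = 20) -> InferredSchema:
    schema = InferredSchema()
    for name, col in df.items():
        kind = pd.api.types.infer_dtype(col, skipna=True)
        if kind in ("integer", "floating", "decimal"):
            schema.numerical_columns.append(name)
        elif kind in ("datetime", "datetime64", "date"):
            schema.time_columns.append(name)
        elif kind in ("string", "categorical", "boolean"):
            # Assumed heuristic: few unique values means categorical, otherwise free text.
            bucket = (
                schema.categorical_columns
                if col.nunique() <= max_categories
                else schema.text_columns
            )
            bucket.append(name)
        elif kind.startswith("mixed"):
            raise Exception(f"Mixed data type {kind} is not supported")
        else:
            raise Exception(f"Data type {col.dtype} is not supported")
    return schema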
python-dict-list-to-dataframe.py
Source:python-dict-list-to-dataframe.py
...
        {"Category": 'Category C', 'ItemID': 3, 'Amount': 100.01},
        {"Category": 'Category A', 'ItemID': 4, 'Amount': 110.01},
        {"Category": 'Category B', 'ItemID': 5, 'Amount': 70.85}
        ]
def infer_schema():
    # Create data frame
    df = spark.createDataFrame(data)
    print(df.schema)
    df.show()
def explicit_schema():
    # Create a schema for the dataframe
    schema = StructType([
        StructField('Category', StringType(), False),
        StructField('ItemID', IntegerType(), False),
        StructField('Amount', FloatType(), True)
    ])
    # Create data frame
    df = spark.createDataFrame(data, schema)
    print(df.schema)
    df.show()
if __name__ == "__main__":
    infer_schema()
...
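The listing is truncated ("..."), so its imports and SparkSession setup are not shown. A sketch of the kind of header such a script needs, with the names chosen here as assumptions rather than the original file's code:

# Assumed setup for the truncated header: PySpark imports and a local SparkSession.
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

spark = SparkSession.builder.appName("python-dict-list-to-dataframe").getOrCreate()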
