ML Model Training and Deployment Pipeline


In this example, we demonstrate how to organize and run a simple machine learning project using Bauplan. We’ll create and execute a pipeline that:

  1. Ingests raw data from the TLC NY taxi dataset.

  2. Transforms the data into a training dataset with the appropriate features.

  3. Trains a Linear Regression model to predict the tip amount for taxi rides.

  4. Writes the predictions to an Iceberg table for further analysis.

Additionally, we’ll leverage the Bauplan SDK within notebooks to explore both the dataset and the generated predictions interactively. This project highlights how Bauplan simplifies building end-to-end machine learning workflows.

Set up

In this example, we will use Jupyter Notebooks, so we will need to install the right dependencies. Go into the example folder, and create a virtual environment.

python -m venv venv
source venv/bin/activate
pip install -r requirements.txt

Make sure you have a bauplan_project.yml file with a unique project id and a project name.

project:
    id: a8133e8c-5abd-490a-b12e-223abaf60981
    name: machine-learning-regression

Exploratory analysis

The first step is to explore the dataset using one of our notebooks to identify the best features for training the model. This process is straightforward with the help of our SDK. Launch Jupyter Lab and open the notebook feature_exploration.ipynb. The code in the notebook is well-commented, so you can follow along and understand the feature selection process directly from the code.

cd 03-ml-regression-model
venv/bin/jupyter lab

In the notebook, we extract a sample of data from the taxi_fhvhv table and compute a correlation matrix. We then display a heatmap to identify which features have the strongest correlation with our target variable, tips. The top features identified are base_passenger_fare, trip_miles, and trip_time, which we will use as input columns to train the Linear Regression model.

ml1.png
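If you want to reproduce the gist of the exploration outside the notebook, here is a minimal sketch using the bauplan SDK. It assumes the client exposes a query method that accepts a SQL string and a branch ref and returns an Arrow table (check the SDK reference for the exact signature), and that seaborn and matplotlib are available in your environment.

import bauplan
import seaborn as sns
import matplotlib.pyplot as plt

client = bauplan.Client()

# pull a sample of rows from the taxi_fhvhv table on your branch
# (the query method signature and the ref argument are assumptions to verify against the SDK docs)
rows = client.query(
    "SELECT base_passenger_fare, trip_miles, trip_time, tolls, sales_tax, tips "
    "FROM taxi_fhvhv LIMIT 100000",
    ref="<YOUR_BRANCH>",
)
df = rows.to_pandas()

# correlation matrix and heatmap to spot the features most correlated with tips
corr = df.corr(numeric_only=True)
sns.heatmap(corr, annot=True, cmap="coolwarm")
plt.show()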

The pipeline

This pipeline retrieves raw data from the data lake and processes it to train a Linear Regression model that predicts the tip amount of taxi rides from trip distance, trip time, and base fare.

flowchart LR
    id0[(taxi_fhvhv)] --> id1[clean_taxi_trips]
    id1[clean_taxi_trips] --> id2[training_dataset]
    id2[training_dataset] --> id3[train_regression_model]
    id3[train_regression_model] --> id4[tip_predictions]

Run the pipeline

First, create a new branch and switch to it.

bauplan branch create <YOUR_BRANCH>
bauplan branch checkout <YOUR_BRANCH>

With the new branch checked out, make sure you are in the right folder and run the pipeline to materialize our predictions.

cd pipeline
bauplan run

Prepare the training dataset

Your pipeline consists of two main operations that work together to prepare taxi trip data for machine learning. The first operation uses the clean_taxi_trips function to handle data retrieval. This function connects to your data source through a Python S3 scan, accessing the taxi_fhvhv table to extract ten essential columns. It filters the data based on pickup_datetime, pulling three months of historical data; the debug print in the function reports the exact size of the resulting table when you run the pipeline. As you work with this substantial dataset, you’ll get a clear sense of how bauplan manages data retrieval at scale.

Note

For more rapid development cycles, you can always use the bauplan run --dry-run command to test your workload in-memory.

The second operation focuses on transforming this raw data into a format suitable for machine learning models. Rather than working with all retrieved columns, this function processes only the specific columns declared in the input model definition. Using a combination of Pandas and Scikit-Learn, it creates a normalized dataset ready for model training.

Important

One particular transformation worth noting involves the trip_miles feature, which undergoes log-normalization to achieve a more balanced distribution. Additionally, all features are rescaled to ensure they contribute equally to the model’s learning process, preventing any unintended bias that might arise from differing measurement scales.

import bauplan

@bauplan.model()
# for this function we specify one dependency, Pandas 2.2.0
@bauplan.python('3.11', pip={'pandas': '2.2.0'})
def clean_taxi_trips(
        data=bauplan.Model(
            'taxi_fhvhv',
            # this function performs an S3 scan directly in Python, so we can specify the columns and the filter pushdown
            # by pushing the filters down to S3 we make the system considerably more performant
            columns=[
                'pickup_datetime',
                'dropoff_datetime',
                'PULocationID',
                'DOLocationID',
                'trip_miles',
                'trip_time',
                'base_passenger_fare',
                'tolls',
                'sales_tax',
                'tips'],
            filter="pickup_datetime >= '2023-01-01T00:00:00-05:00' AND pickup_datetime < '2023-03-31T00:00:00-05:00'"
        )
):
    import math
    import pandas as pd

    # debugging lines: print the size of the table and its number of rows
    size_in_gb = data.nbytes / math.pow(1024, 3)
    print(f"This table is {size_in_gb} GB and has {data.num_rows} rows")

    # input data is always an Arrow table, so if you wish to use pandas, you need an explicit conversion
    df = data.to_pandas()

    # exclude rows based on multiple conditions
    df = df[(df['trip_miles'] > 1.0) & (df['tips'] > 0.0) & (df['base_passenger_fare'] > 1.0)]

    # output the data as a Pandas dataframe
    return df

@bauplan.model()
@bauplan.python('3.10', pip={'pandas': '1.5.3', 'scikit-learn': '1.3.2'})
def training_dataset(
        data=bauplan.Model(
            'clean_taxi_trips',
        )
):
    import pandas as pd
    import numpy as np
    from sklearn.preprocessing import StandardScaler

    # convert data from Arrow to Pandas
    df = data.to_pandas()

    # drop all the rows with NaN values
    df = df.dropna()
    # add a new column with log transformed trip_miles to deal with skewed distributions
    df['log_trip_miles'] = np.log10(df['trip_miles'])
    # define training and target features
    features = df[['log_trip_miles', 'base_passenger_fare', 'trip_time']]
    target = df['tips']
    pickup_dates = df['pickup_datetime']

    # scale the features to ensure that they have similar scales
    # compute the mean and standard deviation for each feature in the training set
    # then use the computed mean and standard deviation to scale the features.
    scaler = StandardScaler()
    scaled_features = scaler.fit_transform(features)
    scaled_df = pd.DataFrame(scaled_features, columns=features.columns)
    scaled_df['tips'] = target.values # Add the target column back to the DataFrame
    scaled_df['pickup_datetime'] = pickup_dates.values  # Add the date column back to the DataFrame

    # print the size of the training dataset
    print(f"The training dataset has {len(scaled_df)} rows")

    # The result is a new array where each feature will have a mean of 0 and a standard deviation of 1
    return scaled_df

Train the model

Training a model can be seamlessly integrated into the pipeline, just like any other step, by executing arbitrary code within a bauplan model. The train_regression_model function uses Scikit-Learn to prepare the train, validation, and test sets, separating the training features from the target feature. Once the dataset is split, we train the model using Scikit-Learn’s built-in LinearRegression method, ensuring a straightforward and efficient training process.

Passing ML models across functions

bauplan models are functions from and to tabular artifacts, like tables and dataframes. The general structure of a data pipeline in bauplan consists of a sequence of functions that take a table-like object as input and return a table-like object as output.

For users, this is the only key constraint: as long as each function adheres to this structure, everything else is handled by the system. Developers can implement bauplan models using Python functions or SQL queries without worrying about how individual functions are executed or how data is passed across them. As long as the output is a table-like object (such as an Arrow Table, a Pandas DataFrame, or a list of dictionaries), the pipeline will run smoothly.

In addition to handling tables, machine learning workflows often involve models, which must be passed between functions. bauplan provides a built-in utility, bauplan.store (see the reference documentation), to persist and retrieve objects from a key-value store. This feature makes it easy to pass trained models between functions in a pipeline.

The entire code for training and testing a Linear Regression model is the following. It is heavily commented, so you can read through it to get a clear understanding of how it works. This is not the only possible design pattern for machine learning: for instance, we could serialize the model and save it as a file to retrieve later in the pipeline, or even use a more sophisticated system like a feature store. Ultimately, the framework is powerful enough to allow for more complex designs. For now, it is enough to know that bauplan.store provides a simple way to pass an ML model across bauplan functions in a DAG.

@bauplan.model()
# for this function we specify two dependencies, Pandas 2.2.0 and Scikit-Learn 1.3.2
@bauplan.python('3.11', pip={'pandas': '2.2.0', 'scikit-learn': '1.3.2'})
def train_regression_model(
        data=bauplan.Model(
            'training_dataset',
        )
):
    from sklearn.model_selection import train_test_split
    from sklearn.linear_model import LinearRegression

    # convert arrow input into a Pandas DataFrame
    df = data.to_pandas()

    # Define the training and validation set sizes
    training_threshold = 0.8
    validation_threshold = 0.1  # This will implicitly define the test size as the remaining percentage
    # Split the dataset into training and remaining sets first
    train_set, remaining_set = train_test_split(df, train_size=training_threshold, random_state=42)
    # Split the remaining set into validation and test sets
    validation_threshold_adjusted = validation_threshold / (1 - training_threshold)
    validation_set, test_set = train_test_split(remaining_set, test_size=validation_threshold_adjusted, random_state=42)
    #print(f"The training dataset has {len(train_set)} rows")
    print(f"The validation set has {len(validation_set)} rows")
    print(f"The test set has {len(test_set)} rows (remaining)")

    # prepare the feature matrix (X) and target vector (y) for training
    X_train = train_set[['log_trip_miles', 'base_passenger_fare', 'trip_time']]
    y_train = train_set['tips']
    # Train the linear regression model
    reg = LinearRegression().fit(X_train, y_train)

    # persist the model in a key, value store so we can use it later in the DAG
    from bauplan.store import save_obj
    save_obj("regression", reg)

    # Prepare the feature matrix (X) and target vector (y) for validation
    X_val = validation_set[['log_trip_miles', 'base_passenger_fare', 'trip_time']]
    y_val = validation_set['tips']

    # Make predictions on the validation set
    y_hat = reg.predict(X_val)
    # Print the model's R^2 score on the validation set
    print("R^2 score on the validation set: {}".format(reg.score(X_val, y_val)))

    # Prepare the output table with predictions (copy to avoid mutating a view of validation_set)
    validation_df = validation_set[['log_trip_miles', 'base_passenger_fare', 'trip_time', 'tips']].copy()
    validation_df['predictions'] = y_hat

    # Display the validation set with predictions
    print(validation_df.head())

    return test_set

@bauplan.model(materialization_strategy='REPLACE')
# for this function we specify two dependencies, Scikit-Learn 1.3.2 and Pandas 2.1.0
@bauplan.python('3.11', pip={'scikit-learn': '1.3.2', 'pandas': '2.1.0'})
def tip_predictions(
        data=bauplan.Model(
            'train_regression_model',
        )

):

    # retrieve the model trained in the previous step of the DAG from the key, value store
    from bauplan.store import load_obj
    reg = load_obj("regression")
    print(type(reg))

    # convert the test set from an Arrow table to a Pandas DataFrame
    test_set = data.to_pandas()

    # Prepare the feature matrix (X) and target vector (y) for test
    X_test = test_set[['log_trip_miles', 'base_passenger_fare', 'trip_time']]
    y_test = test_set['tips']

    # Make predictions on the test set
    y_hat = reg.predict(X_test)
    # Print the model's R^2 score on the test set
    print("R^2 score on the test set: {}".format(reg.score(X_test, y_test)))

    # Prepare the final output table with the predictions (copy to avoid mutating a view of test_set)
    prediction_df = test_set[['log_trip_miles', 'base_passenger_fare', 'trip_time', 'tips']].copy()
    prediction_df['predictions'] = y_hat

    # return the prediction dataset
    return prediction_df

Explore the results

Once the predictions generated by the model are materialized in the data catalog as an Iceberg table, we can leverage the bauplan SDK to integrate the table into exploration tools, such as notebooks, or into a visualization app. To assess the accuracy of our predictions, we use various types of plots in the notebook, employing Matplotlib and Seaborn for data visualization. As usual, we interact with the data directly from our branch of the data catalog using the bauplan SDK’s query method. Launch Jupyter Lab again, and open the prediction_visualization.ipynb notebook to explore the prediction results.

cd 03-ml-regression-model
venv/bin/jupyter lab
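
Inside the notebook, the predictions are read straight from the tip_predictions table on your branch. A minimal sketch of that first step, with the same caveat as above about the exact query signature:

import bauplan

client = bauplan.Client()

# read the materialized predictions from your branch as a Pandas DataFrame
prediction_df = client.query(
    "SELECT * FROM tip_predictions",
    ref="<YOUR_BRANCH>",
).to_pandas()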

We will use three different plots to evaluate the performance of the Linear Regression model we just trained.

Actual vs. Predicted Values Plot: this scatter plot compares the predicted values with the actual values. Ideally, the points should align closely along the line y = x, indicating accurate predictions.

ml2.png

Residual Plot: This plot shows the residuals (differences between actual and predicted values) against the predicted values. Ideally, the residuals should be randomly distributed around zero, indicating that the model captures the underlying patterns in the data.

ml3.png

Distribution of Residuals: This histogram shows the distribution of the residuals. Ideally, the residuals should be normally distributed around zero.

ml4.png
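
As a rough sketch of how these three plots can be produced with Matplotlib and Seaborn, assuming the prediction_df DataFrame obtained above, with its tips and predictions columns:

import matplotlib.pyplot as plt
import seaborn as sns

# residuals: difference between actual and predicted tip amounts
residuals = prediction_df['tips'] - prediction_df['predictions']

fig, axes = plt.subplots(1, 3, figsize=(15, 4))

# 1. actual vs. predicted values
axes[0].scatter(prediction_df['tips'], prediction_df['predictions'], alpha=0.3)
axes[0].set_xlabel('actual tips')
axes[0].set_ylabel('predicted tips')

# 2. residuals against predicted values
axes[1].scatter(prediction_df['predictions'], residuals, alpha=0.3)
axes[1].axhline(0, color='red')
axes[1].set_xlabel('predicted tips')
axes[1].set_ylabel('residuals')

# 3. distribution of the residuals
sns.histplot(residuals, kde=True, ax=axes[2])
axes[2].set_xlabel('residuals')

plt.tight_layout()
plt.show()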

The code in the notebook is heavily commented, so you can explore it directly.

Summary

In this chapter:

  • We demonstrated how to use Bauplan to build a machine learning pipeline that predicts the tip amount for taxi rides.

  • We explored the dataset, prepared the training data, trained a Linear Regression model, and visualized the predictions.

This example showcases bauplan’s flexibility and ease of use in integrating machine learning workflows seamlessly into data pipelines.