05. Advanced topics

This chapter covers more advanced topics: multi-language pipelines, data contracts, machine learning models and parameterized runs. The first thing to do is create another branch and run the pipeline:

bauplan branch create <YOUR_USERNAME>.advanced_topics
bauplan branch checkout <YOUR_USERNAME>.advanced_topics
bauplan run

The pipeline

In this example, you will run a DAG with the following structure, which will let you familiarize yourself with two important concepts: table joins and ML models. As you can see, the first ETL step joins the two tables taxi_fhvhv and taxi_zones together, while the second part of the DAG trains and evaluates a linear regression model.

flowchart LR
    subgraph Data Lake
        id0[(taxi_fhvhv)]
        id1[(taxi_zones)]
    end
    subgraph ETL
        id0[(taxi_fhvhv)] --> id2[etl.trips_and_zones]
        id1[(taxi_zones)] --> id2[etl.trips_and_zones]
        id2[etl.trips_and_zones] --> id3[etl.normalized_taxi_trips]
    end
    subgraph Linear Regression model
        id3[etl.normalized_taxi_trips] --> id4[regression_model.lr_training_dataset] --> id5[regression_model.train_linear_regression] --> id6[regression_model.tips_predictions]
    end

🔗 Join models 🔗

Python models in Bauplan accept multiple parents as inputs, so you can join two or more models into one. Take a look at the function etl.trips_and_zones in the file etl.py. The function joins the data about taxi trips from taxi_fhvhv with the geographical data from taxi_zones and creates a table where each trip has a borough, a zone and a service_zone for both pickup and drop-off locations. To have a model that takes multiple parents as inputs, you simply define them in the function signature; everything else remains the same. This is the relevant function in the code; as you can see, it simply takes two input arguments, trips and zones:

@bauplan.model(
    columns=[
        'pickup_datetime',
        'dropoff_datetime',
        'PULocationID',
        'DOLocationID',
        'trip_miles',
        'trip_time',
        'base_passenger_fare',
        'tolls',
        'sales_tax',
        'tips',
        'zonePU',
        'service_zonePU',
        'boroughPU',
        'zoneDO',
        'boroughDO',
        'service_zoneDO'
    ],
    materialize=False
)
@bauplan.python('3.11', pip={'pandas': '2.1.0'})
def trips_and_zones(
        # This is an S3 scan done directly in Python, taking multiple tables as input.
        # To define multiple input tables, simply declare each of them as an instance of bauplan.Model;
        # everything else in the function syntax remains the same.
        trips=bauplan.Model(
            'taxi_fhvhv',
            columns=[
                'pickup_datetime',
                'dropoff_datetime',
                'PULocationID',
                'DOLocationID',
                'trip_miles',
                'trip_time',
                'base_passenger_fare',
                'tolls',
                'sales_tax',
                'tips',
            ],
            filter="pickup_datetime >= '2023-01-01T00:00:00-05:00' AND pickup_datetime < '2023-01-15T00:00:00-05:00'"
        ),
        # This is the second input table of the function
        zones=bauplan.Model(
            'taxi_zones',
            columns=[
                'LocationID',
                'Borough',
                'Zone',
                'service_zone',
            ]),
):
    # join 'trips' with 'zones' on 'PULocationID' = 'LocationID' to enrich the pickup locations
    pickup_location_table = trips.join(zones, 'PULocationID', 'LocationID').combine_chunks()
    # join with 'zones' again on the drop-off location and add a suffix to the right table columns
    # this gives us the geographical information for the drop-off locations as well
    pickup_location_table = pickup_location_table.join(zones, 'DOLocationID', 'LocationID', right_suffix='DO').combine_chunks()

    return pickup_location_table

Warning

Bauplan does not support joins in SQL right now, so joining multiple tables must be done in Python. As you will see later in the section, this is relevant since the system supports interoperability between SQL and Python. It would be a perfectly legitimate design to join data in an SQL step, but for now you just can’t.

🧠 Machine Learning models 🤖

As we mentioned in the beginning, Bauplan models are functions from tabular artifacts to other tabular artifacts (except expectations, which are functions from tabular artifacts to booleans). While this makes the entire system simple and intuitive to reason about, there are cases where we need to pass something other than a tabular artifact from one function to another. An obvious case is ML pipelines, where a trained model needs to be persisted across functions so that training, evaluation and testing can be split into separate steps. In this example, you will see how to persist an ML model in a key-value store so that subsequent functions in the DAG can use it.

In the file regression_model.py, you'll find the second part of the DAG depicted above, which trains a linear regression model to forecast the tip amount from the length of past trips in miles. This part of the DAG employs three functions: the first prepares the training dataset, the second trains and evaluates the regression model, and the third generates a table of predictions by applying the model to a held-out test set. Within the regression_model.train_linear_regression function, we use Scikit-Learn to train the linear regression. To persist the model, you can use the Bauplan SDK and save it into a key-value store:

# import save_obj from bauplan utils
from bauplan.store import save_obj

# persist the model in a key value store
save_obj("regression", reg)
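
To make the pattern concrete, here is a minimal sketch of what a training model along these lines could look like. It is not the exact code in the tutorial file: the declared columns, the library versions and the returned table are illustrative assumptions, while the decorators, the bauplan.Model input and the save_obj call follow the patterns shown above.

import bauplan
from bauplan.store import save_obj


@bauplan.model(columns=['trip_miles', 'tips'], materialize=False)  # illustrative output contract
@bauplan.python('3.11', pip={'scikit-learn': '1.3.2', 'pandas': '2.1.0'})
def train_linear_regression(
    # illustrative input: the training dataset prepared by the upstream model
    data=bauplan.Model('lr_training_dataset', columns=['trip_miles', 'tips']),
):
    from sklearn.linear_model import LinearRegression

    # fit tips as a function of trip length in miles
    df = data.to_pandas()
    reg = LinearRegression().fit(df[['trip_miles']], df['tips'])

    # persist the fitted model in the key-value store for downstream functions
    save_obj("regression", reg)

    # the function still returns a tabular artifact, keeping the DAG semantics intact
    return data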

The subsequent function regression_model.tips_predictions can then load the regression model back into memory and use it to generate predictions against the test set:

from bauplan.store import load_obj
reg = load_obj('regression')

Note that both functions regression_model.train_linear_regression and regression_model.tips_predictions still maintain the general semantics of the DAG and output tabular artifacts, but they are also able to store and retrieve a non-tabular artifact like an ML model.
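
As a companion to the sketch above, a hedged version of the prediction model could look like the following. Again, the column names, the library versions and the exact parent model are assumptions made for illustration, not the tutorial's exact code; only load_obj and the decorator patterns come straight from this chapter.

import bauplan
import pyarrow as pa
from bauplan.store import load_obj


@bauplan.model(columns=['trip_miles', 'tips', 'predicted_tips'], materialize=False)  # illustrative
@bauplan.python('3.11', pip={'scikit-learn': '1.3.2', 'pandas': '2.1.0'})
def tips_predictions(
    # illustrative parent: the held-out data coming from the training step
    data=bauplan.Model('train_linear_regression', columns=['trip_miles', 'tips']),
):
    # load the fitted regression back from the key-value store
    reg = load_obj('regression')

    # generate predictions and return them as a tabular artifact
    df = data.to_pandas()
    df['predicted_tips'] = reg.predict(df[['trip_miles']])
    return pa.Table.from_pandas(df)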

🔄 Mix and match SQL and Python 🔄

Bauplan allows full interoperability between SQL and Python, so you can actually build multi-language pipelines. While the models showcased in this tutorial have primarily been Python functions thus far, it’s important to note that a model can be expressed either as a SQL query or as a Python function.

For instance, let’s say that we want to add a step after the ETL that computes the top pickup locations, using an SQL query instead of a Python function.

To build a multi-language pipeline, go into alpha/oas_pipeline/tutorial/05_advanced_topics, create a new file called top_pickup_locations.sql, and paste the following SQL code into it:

-- bauplan: materialize=False
SELECT
    COUNT(pickup_datetime) as number_of_trips,
    Borough,
    Zone
FROM
    normalized_taxi_trips
GROUP BY
    Borough, Zone
ORDER BY COUNT(pickup_datetime) DESC

Then, run the pipeline as usual by executing bauplan run. As you will see, the resulting DAG will contain a new node expressed in SQL that takes as input the output of the Python function etl.normalized_taxi_trips.

flowchart LR
    subgraph Data Lake
        id0[(taxi_fhvhv)]
        id1[(taxi_zones)]
    end
    subgraph ETL
        id0[(taxi_fhvhv)] --> id2[etl.trips_and_zones]
        id1[(taxi_zones)] --> id2[etl.trips_and_zones]
        id2[etl.trips_and_zones] --> id3[etl.normalized_taxi_trips]
    end
    subgraph Linear Regression model
        id3[etl.normalized_taxi_trips] --> id4[regression_model.lr_training_dataset] --> id5[regression_model.train_linear_regression] --> id6[regression_model.tips_predictions]
    end
    subgraph Reporting
        id3[etl.normalized_taxi_trips] --> id7[top_pickup_locations.sql]
    end

By convention, Bauplan requires each SQL model to be expressed as a separate file (à la dbt). In contrast, Python models can be distributed across files in an arbitrary manner, as illustrated earlier. As a consequence, to build complex multi-language pipelines, you simply add as many .sql files as you have SQL models and as many Python functions as you have Python models.
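
As a rough illustration of this convention, and assuming the Python models of this tutorial live in etl.py and regression_model.py, the project folder for a multi-language pipeline might look something like this:

05_advanced_topics/
├── bauplan_project.yml         # project configuration (and parameter declarations)
├── etl.py                      # Python models: trips_and_zones, normalized_taxi_trips
├── regression_model.py         # Python models: lr_training_dataset, train_linear_regression, tips_predictions
└── top_pickup_locations.sql    # one SQL model per .sql file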

🧱 Materialize from SQL 🧱

To materialize models expressed in SQL, set the materialize flag to True, just as you would in a Python model. Go into top_pickup_locations.sql, look at the comment right above the SELECT statement, and set the flag to True.
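
After the change, the comment at the top of the file should read:

-- bauplan: materialize=True

Then run the pipeline again: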

bauplan run

🤝 Data contracts 🤝

Whether they are expressed in Python or SQL, Bauplan models are functions from tabular artifacts to tabular artifacts. The system allows you to define and enforce explicit data contracts between models, to make sure that columnar schemas stay consistent throughout the pipeline. As with all contracts, there are at least two parties involved and all parties must agree on the terms. In the context of a Bauplan DAG, the parties are the models: they take tabular artifacts as input, execute certain operations, and generate tabular artifacts as output. Data contracts tell the system the expected input and the expected output of each model. Declaring them is good practice even when they are not enforced: since Bauplan uses a columnar engine, declaring the columns helps the system be as efficient as possible. If you want to enforce the data contracts between the models, you can do so with a special flag:

bauplan run --strict

When executed with the --strict flag, the run terminates with an error if there are discrepancies between the expected and the actual inputs and outputs. For example, if the output of a parent node differs from the expected input of its child node, the run will fail. Data contracts in Bauplan are defined in the following way:

Expected output schemas

The expected schema of the output of a model is defined in the @bauplan.model decorator. These may be different from the input columns below because columns can be dropped, renamed or added in the body of the model.

@bauplan.model(
    columns=[
        'expected_output_col1',
        'expected_output_col2'
    ],
    materialize=False
)

Expected input schemas

The expected schema of the input model is defined in the input arguments of a model - that is, in the signature of the function:

def my_bauplan_model(
    data=bauplan.Model(
        'input_model',
        columns=[
            'expected_input_col1',
            'expected_input_col2'
        ]
    )
):
  return

Go into the file etl.py and take a look at the first model in the DAG, trips_and_zones. Comment out the column base_passenger_fare from the list of columns in the @bauplan.model decorator, then run:

bauplan run --strict

The system will throw an error and the run will fail:

ERR runtime task (model[3.11].trips_and_zones @ models.py:110) failed due to: Validation for the output columns on model [normalize_data] failed!

This is because the expected output of the first model is no longer compatible with its actual output, which in fact still contains the column base_passenger_fare. If you now comment out the column base_passenger_fare in both the expected outputs and the expected inputs of all the models in the file etl.py and run again with the --strict flag, the run should complete successfully. That is because removing the column from all the expected inputs and outputs in the DAG makes the whole data contract consistent again.
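
For reference, the initial edit in the @bauplan.model decorator of trips_and_zones looks roughly like this (the column list is abbreviated here); making the whole contract consistent again means commenting the column out in the downstream input and output declarations as well:

@bauplan.model(
    columns=[
        'pickup_datetime',
        'dropoff_datetime',
        # ... other columns as before ...
        # 'base_passenger_fare',  # commented out: the declared output no longer matches the actual output
        'tolls',
        'sales_tax',
        'tips',
        # ... zone and borough columns as before ...
    ],
    materialize=False
)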

Parameterized Runs

For some use cases, it is useful to run the pipeline with different parameters. For example, you might want to:

  • Run the pipeline with a particular date range

  • Run the pipeline with a specific user ID

  • Run the pipeline with a flag that changes the behavior of the pipeline (e.g., different classifiers)

To run the pipeline with different parameters, you can use the bauplan run --param flag. This repeated flag allows you to pass parameters as key=value pairs to the pipeline. The parameters are then available in the pipeline as environment variables.

To use parameters, you must first declare them, along with their default values, in your bauplan_project.yml project configuration file. Types are inferred automatically from the default key, so define the default values carefully (e.g., YAML interprets an unquoted 2 as an integer, so use "2" if the parameter is meant to be a string).

Supported types are str, int, float, and bool.

Example:

project:
  id: 641d33d8-a6c2-4525-90d5-d54e7f27c26f
  name: parameters_example

parameters:
  customer_id:
    default: 1337                            # inferred as int
  golden_ratio:
    default: 1.6666                          # inferred as float
  use_random_forest:
    default: True                            # inferred as bool
  start_datetime:
    default: "2023-01-01T00:00:00-05:00"     # inferred as str
  end_datetime:
    default: "2023-01-30T00:00:00-05:00"     # inferred as str

Parameters in SQL

To use a parameter in a SQL model, use the DuckDB prepared statement syntax.

For example:

-- page_view_counts.sql
SELECT
    page_view_counts AS x
FROM
    customer_metrics
WHERE
    start_datetime >= $start_datetime AND
    end_datetime < $end_datetime

Parameters in Python

To use a parameter in a Python model, pass it as a keyword argument in the function signature, similar to how a bauplan.Model input is passed. For example:

@bauplan.model(columns=['x', 'y'])
@bauplan.python('3.11', pip={'scikit-learn': '1.3.2'})
def do_classifications(
    samples=bauplan.Model('page_view_counts', columns=['x']),
    # the parameter is passed as a keyword argument, just like an input model
    use_random_forest=bauplan.Parameter('use_random_forest'),
):
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.linear_model import LogisticRegression

    clf = RandomForestClassifier() if use_random_forest else LogisticRegression()
    # ... do something with clf

Then, to use the parameters in your bauplan run, use the --param flag:

bauplan run \
    --param customer_id=1234 \
    --param use_random_forest=False

Parameters are saved with the run information, so re-running a particular pipeline with bauplan run --id will also re-run it with the same parameters.

🔥 Recap of chapter 5 🔥

  • Join tables: you can join multiple tables using the very same syntax used for every other Python model. Remember that, for now, Bauplan does not support joins in SQL, so Python functions are the only way to do that.

  • ML Models: you can persist ML models in a key-value store and pass them along to different functions in the DAG. This pattern is useful for building ML pipelines where the train, eval and test functions are separate.

  • Mix and Match SQL and Python: you can express models in both SQL and Python. Both languages have pros and cons, and you can pick the best tool depending on what you are trying to accomplish. This allows you to organize projects quite freely and to easily port code from other tools like notebooks into your pipelines.

  • Data contracts: you can define data contracts in your pipelines by simply declaring the expected input columns and the expected output columns of your models. You can enforce contracts by running Bauplan with a special flag --strict.

  • Parameterized runs: you can run the pipeline with different parameters by using the --param flag. This is useful to run the same pipeline with different configurations.