05. Advanced topics¶
This chapter covers more advanced topics, including multi-language pipelines, data contracts and parameterized runs. The first thing to do is to create another branch, check it out, and run the pipeline:
bauplan branch create <YOUR_USERNAME>.advanced_topics
bauplan branch checkout <YOUR_USERNAME>.advanced_topics
bauplan run
The pipeline¶
In this example, you will run a DAG that will familiarize you with two important concepts: table joins and ML models. The first ETL step joins the two tables taxi_fhvhv and taxi_zones together, while the second part of the DAG trains and evaluates a linear regression model.
🔗 Join models 🔗¶
Python models in Bauplan accept multiple parents as inputs, so you can join two or more models into one. Take a look at the function etl.trips_and_zones in the file etl.py. The function joins the data about taxi trips from taxi_fhvhv with the geographical data from taxi_zones and creates a table where each trip has a borough, a zone and a service_zone for both pickup and drop-off locations. To have a model that takes multiple parents as inputs, you simply define them in the function signature, while everything else remains the same. This is the relevant function in the code: as you can see, the function simply takes two input arguments, trips and zones:
@bauplan.model(
    columns=[
        'pickup_datetime',
        'dropoff_datetime',
        'PULocationID',
        'DOLocationID',
        'trip_miles',
        'trip_time',
        'base_passenger_fare',
        'tolls',
        'sales_tax',
        'tips',
        'zonePU',
        'service_zonePU',
        'boroughPU',
        'zoneDO',
        'boroughDO',
        'service_zoneDO'
    ],
    materialize=False
)
@bauplan.python('3.11', pip={'pandas': '2.1.0'})
def trips_and_zones(
    # This is an S3 scan done directly in Python taking multiple tables as input.
    # To define multiple input tables, simply declare each of them as an instance of bauplan.Model;
    # everything else in the function syntax remains the same.
    trips=bauplan.Model(
        'taxi_fhvhv',
        columns=[
            'pickup_datetime',
            'dropoff_datetime',
            'PULocationID',
            'DOLocationID',
            'trip_miles',
            'trip_time',
            'base_passenger_fare',
            'tolls',
            'sales_tax',
            'tips',
        ],
        filter="pickup_datetime >= '2023-01-01T00:00:00-05:00' AND pickup_datetime < '2023-01-15T00:00:00-05:00'"
    ),
    # This is the second input table of the function
    zones=bauplan.Model(
        'taxi_zones',
        columns=[
            'LocationID',
            'Borough',
            'Zone',
            'service_zone',
        ]),
):
    # join 'trips' with 'zones' on the 'PULocationID' column
    pickup_location_table = trips.join(zones, 'PULocationID', 'LocationID').combine_chunks()
    # join 'pickup_location_table' with 'zones' again and add a suffix to the right table columns;
    # this gives us the geographical information also for the drop-off locations
    pickup_location_table = pickup_location_table.join(zones, 'DOLocationID', 'LocationID', right_suffix='DO').combine_chunks()
    return pickup_location_table
Warning
Bauplan does not support joins in SQL right now, so joining multiple tables must be done in Python. As you will see later in the section, this is relevant since the system supports interoperability between SQL and Python. It would be a perfectly legitimate design to join data in an SQL step, but for now you just can’t.
🧠 Machine Learning models 🤖¶
As we mentioned in the beginning, Bauplan models are functions from tabular artifacts to other tabular artifacts (except expectations which are functions from tabular artifacts to booleans). While this is very simple to reason about and makes the entire system intuitive, there are cases where we need to pass something other than a tabular artifact from one function to another. An obvious case is, of course, ML pipelines where ML models need to be persisted across functions to split train, eval and test efficiently. In this example, you will see how it is possible to persist an ML model in a key value store for subsequent functions in the DAG to use.
In the file regression_model.py, you will find the second part of the DAG, which focuses on training a linear regression model to forecast the tip amount based on the length of past trips in miles. This part of the DAG employs three functions: the first prepares the training dataset, the second trains and evaluates the regression model, and the third generates a table of predictions by applying the model to a held-out test set. Within the regression_model.train_linear_regression function, we employ Scikit-Learn to train the linear regression. To persist the model, you can use the Bauplan SDK and save the model into a key-value store:
# import save_obj from bauplan utils
from bauplan.store import save_obj
# persist the model in a key value store
save_obj("regression", reg)
The subsequent function regression_model.tip_predictions can then load the regression model back into memory and use it to generate predictions against the test set:
from bauplan.store import load_obj
reg = load_obj('regression')
Note that both functions regression_model.train_linear_regression and regression_model.tip_predictions still maintain the general semantics of the DAG and output tabular artifacts, but they are also able to store and retrieve a non-tabular artifact like an ML model.
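To see how the pieces fit together, here is a minimal sketch of the train-and-predict pattern. It is not the exact code in regression_model.py: the parent model names, column choices and package versions below are illustrative assumptions, and it assumes, as in the join example above, that inputs arrive as Arrow tables. Only the save_obj / load_obj calls come from the Bauplan SDK as shown above.

import bauplan


@bauplan.model(columns=['trip_miles', 'tips'], materialize=False)
@bauplan.python('3.11', pip={'scikit-learn': '1.3.2', 'pandas': '2.1.0'})
def train_linear_regression(
    # hypothetical parent model and columns, for illustration only
    data=bauplan.Model('trips_and_zones', columns=['trip_miles', 'tips']),
):
    from sklearn.linear_model import LinearRegression
    from bauplan.store import save_obj

    df = data.to_pandas()
    reg = LinearRegression().fit(df[['trip_miles']], df['tips'])
    # persist the fitted model in the key-value store for downstream functions
    save_obj('regression', reg)
    # the function still returns a tabular artifact, keeping the DAG semantics intact
    return data


@bauplan.model(columns=['trip_miles', 'tips', 'predicted_tips'], materialize=False)
@bauplan.python('3.11', pip={'scikit-learn': '1.3.2', 'pandas': '2.1.0'})
def tip_predictions(
    data=bauplan.Model('train_linear_regression', columns=['trip_miles', 'tips']),
):
    import pyarrow as pa
    from bauplan.store import load_obj

    # load the fitted model back into memory and score the held-out data
    reg = load_obj('regression')
    df = data.to_pandas()
    df['predicted_tips'] = reg.predict(df[['trip_miles']])
    return pa.Table.from_pandas(df)

The key point is that the fitted model travels through the key-value store, while the DAG edges themselves still carry only tabular data.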
🔄 Mix and match SQL and Python 🔄¶
Bauplan allows full interoperability between SQL and Python, so you can actually build multi-language pipelines. While the models showcased in this tutorial have primarily been Python functions thus far, it’s important to note that a model can be expressed either as a SQL query or as a Python function.
For instance, let’s say that we want to add a step after the ETL that computes the top pickup locations, using an SQL query instead of a Python function.
To build a multi-language pipeline, go into alpha/oas_pipeline/tutorial/05_advanced_topics, create a new file called top_pickup_locations.sql, and paste the following SQL code into it:
-- bauplan: materialize=False
SELECT
    COUNT(pickup_datetime) AS number_of_trips,
    Borough,
    Zone
FROM
    normalized_taxi_trips
GROUP BY
    Borough, Zone
ORDER BY
    COUNT(pickup_datetime) DESC
Then, run the pipeline as usual by executing bauplan run. As you will see, the resulting DAG will contain a new node expressed in SQL that takes as input the output of the Python function etl.normalized_taxi_trips.
By convention, Bauplan requires each SQL model to be expressed as a separate file (à la dbt). In contrast, Python models can be distributed across files in an arbitrary manner, as illustrated earlier. As a consequence, to build complex multi-language pipelines, you simply add as many .sql files as you have SQL models and as many Python functions as you have Python models.
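For instance, a multi-language version of this tutorial's pipeline could be laid out as follows (the file names match those mentioned in this chapter; the grouping of models into files is illustrative):

05_advanced_topics/
├── bauplan_project.yml        # project id, name and (optionally) parameters
├── etl.py                     # Python models, e.g. trips_and_zones, normalized_taxi_trips
├── regression_model.py        # Python models for the ML part of the DAG
└── top_pickup_locations.sql   # one SQL model per .sql file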
🧱 Materialize from SQL 🧱¶
To materialize models expressed in SQL, you can set the flag materialize to True, similarly to what you would do in a Python model. Go into top_pickup_locations.sql, look at the comment right above the SELECT statement, and set the flag to True.
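After the edit, the first line of the file should look like this (the rest of the query stays unchanged):

-- bauplan: materialize=True

Then run the pipeline again: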
bauplan run
🤝 Data contracts 🤝¶
Whether they are expressed in Python or SQL, Bauplan models are functions from tabular artifacts to tabular artifacts. The system allows you to define and enforce explicit data contracts between models, to make sure that columnar schemas stay consistent throughout the pipeline. Like all contracts, a data contract involves at least two parties, and all parties must agree on the terms. In the context of a Bauplan DAG, the parties are models: they take tabular artifacts as input, execute certain operations, and generate tabular artifacts as output.

Data contracts tell the system what input each model expects and what output it is expected to produce. Defining them is good practice even when the contracts are not enforced: since Bauplan uses a columnar engine, declaring the columns helps the system be as efficient as possible. If you want to enforce the data contracts between models, you can do so with a special flag:
bauplan run --strict
When executed with the --strict flag, the run will terminate with an error if there are discrepancies between the expected input and output and the actual input and output. For example, if the output of a parent node differs from the expected input of its child node, the run will fail. Data contracts in Bauplan are defined in the following way:
Expected output schemas¶
The expected schema of the output of a model is defined in the @bauplan.model decorator. These columns may be different from the input columns below, because columns can be dropped, renamed or added in the body of the model.
@bauplan.model(
    columns=[
        'expected_output_col1',
        'expected_output_col2'
    ],
    materialize=False
)
Expected input schemas¶
The expected schema of the input model is defined in the input arguments of a model - that is, in the signature of the function:
def my_bauplan_model(
    data=bauplan.Model(
        'input_model',
        columns=[
            'expected_input_col1',
            'expected_input_col2'
        ]
    )
):
    return
Go into the file etl.py and take a look at the first model in the DAG, trips_and_zones. Comment out the column base_passenger_fare from the list of columns in the @bauplan.model decorator, then run:
The system will throw an error and the run will fail:
ERR runtime task (model[3.11].trips_and_zones @ models.py:110) failed due to: Validation for the output columns on model [normalize_data] failed!
This is because the expected output of the first model is not compatible with its actual output, which in fact contains the column base_passenger_fare. If you now comment out the column base_passenger_fare in both the expected outputs and the expected inputs of all the models in the file etl.py and run again with the --strict flag, the run should complete successfully. That is because removing the column from all the expected inputs and outputs in the DAG makes the whole data contract consistent.
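To make the contract between a parent and a child concrete, here is a minimal, hypothetical sketch (the model and column names are illustrative and do not come from etl.py, and inputs are assumed to arrive as Arrow tables, as in the join example): the child declares as expected input exactly the columns the parent declares as expected output, so bauplan run --strict accepts the pipeline.

import bauplan


# parent model: declares that its output contains exactly these two columns
@bauplan.model(columns=['trip_miles', 'tips'], materialize=False)
@bauplan.python('3.11')
def parent_model(
    data=bauplan.Model('taxi_fhvhv', columns=['trip_miles', 'tips']),
):
    return data


# child model: its expected input matches the parent's declared output,
# and its own declared output reflects the column it keeps
@bauplan.model(columns=['trip_miles'], materialize=False)
@bauplan.python('3.11')
def child_model(
    data=bauplan.Model('parent_model', columns=['trip_miles', 'tips']),
):
    # dropping 'tips' here is fine, as long as the declared output above agrees
    return data.select(['trip_miles'])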
Parameterized Runs¶
For some use cases, it is useful to run the pipeline with different parameters. For example, you might want to:
- Run the pipeline with a particular date range
- Run the pipeline with a specific user ID
- Run the pipeline with a flag that changes the behavior of the pipeline (e.g., different classifiers)
To run the pipeline with different parameters, you can use the bauplan run --param flag. The flag can be repeated and allows you to pass parameters as key=value pairs to the pipeline. The parameters are then available in the pipeline as environment variables.
To use parameters, you must first declare them, along with their default values, in your bauplan_project.yml project configuration file. Types are inferred automatically from the default key, so make sure to define the default values carefully: for example, YAML would interpret an unquoted 2 as an integer, so you should use "2" if the parameter is intended to be a string.
Supported types are str, int, float, and bool.
Example:
project:
  id: 641d33d8-a6c2-4525-90d5-d54e7f27c26f
  name: parameters_example
parameters:
  customer_id:
    default: 1337
  golden_ratio:
    default: 1.6666
  use_random_forest:
    default: True
  start_datetime:
    default: "2023-01-01T00:00:00-05:00"
  end_datetime:
    default: "2023-01-30T00:00:00-05:00"
Parameters in SQL¶
To use a parameter in a SQL model, use the DuckDB prepared statement syntax.
For example:
-- page_view_counts.sql
SELECT
    page_view_counts AS x
FROM
    customer_metrics
WHERE
    start_datetime >= $start_datetime AND
    end_datetime < $end_datetime
Parameters in Python¶
To use a parameter in a Python model, pass it as a keyword argument, similar to how a bauplan.Model is passed. For example:
@bauplan.model(columns=['x', 'y'])
@bauplan.python('3.11')
def do_classifications(
    samples=bauplan.Model('page_view_counts', columns=['x']),
    use_random_forest=bauplan.Parameter('use_random_forest'),
):
    # assumes the scikit-learn classifiers are available in the model's environment,
    # e.g. from sklearn.ensemble import RandomForestClassifier
    #      from sklearn.linear_model import LogisticRegression
    clf = RandomForestClassifier() if use_random_forest else LogisticRegression()
    # ... do something with clf
Then, to use the parameters in your bauplan run, use the --param flag:
bauplan run \
    --param customer_id=1234 \
    --param use_random_forest=False
Parameters are saved with the run information, so re-running a particular pipeline with bauplan run --id will also re-run it with the same parameters.
🔥 Recap of chapter 5 🔥¶
- Join tables: you can join multiple tables using the very same syntax used for every other Python model. Remember that, for now, Bauplan does not support joins in SQL, so Python functions are the only way to do that.
- ML models: you can persist ML models in a key-value store and pass them along to different functions in the DAG. This pattern is useful to build ML pipelines where the train, eval and test functions are separate.
- Mix and match SQL and Python: you can express models in both SQL and Python. Both languages have pros and cons, and you can pick the best tool depending on what you are trying to accomplish. This allows you to organize projects quite freely and to easily port code from other tools like notebooks into your pipelines.
- Data contracts: you can define data contracts in your pipelines by simply declaring the expected input columns and the expected output columns of your models. You can enforce contracts by running Bauplan with the --strict flag.
- Parameterized runs: you can run the pipeline with different parameters by using the --param flag. This is useful to run the same pipeline with different configurations.