A simple data app
In this example, we’ll introduce you to Bauplan by writing a simple pipeline that powers a data visualization app. We’ll explain the foundations of a Bauplan pipeline step by step and introduce you to our Python SDK.
For this example, we are going to use the TLC NY taxi dataset. We will build a pipeline that takes the taxi trip records, joins them with the Taxi Zone Lookup Table, cleans the data, and creates one final table to visualize: a top_pickup_locations table that shows the Boroughs and Zones in NY with the most taxi pickups.
The final shape of the pipeline we are going to build will look like this:
Set up
Go into the folder 01-simple-data-app/pipeline and create a bauplan_project.yml file with a unique project id, a project name and a default Python interpreter.

```yaml
project:
  id: bde138c0-0c48-4f37-a2be-cc55c8e8504a
  name: simple-data-app
```
Pipeline
In the folder you can find a file called models.py with some basic transformation code in it that defines the pipeline:
```python
import bauplan


@bauplan.model(materialize=False)
# for this function we specify one dependency, namely Pandas 2.2.0
@bauplan.python('3.11', pip={'pandas': '2.2.0'})
def normalized_taxi_trips(
    data=bauplan.Model(
        'taxi_fhvhv',
        # this function performs an S3 scan directly in Python, so we can
        # specify the columns and the filter pushdown; by pushing the filters
        # down to S3 we make the system considerably more performant
        columns=[
            'pickup_datetime',
            'dropoff_datetime',
            'PULocationID',
            'DOLocationID',
            'trip_miles',
            'trip_time',
            'base_passenger_fare',
            'tolls',
            'sales_tax',
            'tips',
        ],
        filter="pickup_datetime >= '2022-12-01T00:00:00-05:00' AND pickup_datetime < '2023-01-01T00:00:00-05:00'"
    ),
):
    """
    this function takes the table 'taxi_fhvhv' in the data lake and does some
    simple data cleaning operations using Pandas https://pandas.pydata.org/docs/
    """
    import math

    import pandas as pd

    # print some debug info - you will see every print statement directly in your terminal
    size_in_gb = round(data.nbytes / math.pow(1024, 3), 3)
    print(f"\nThis table is {size_in_gb} GB and has {data.num_rows} rows\n")
    # the following code is Pandas: convert the data from Arrow to Pandas
    df = data.to_pandas()
    # create a UTC-aware time filter
    time_filter = pd.to_datetime('2022-01-01')
    time_filter_utc = time_filter.tz_localize('UTC')
    # filter df by timestamp
    df = df[df['pickup_datetime'] >= time_filter_utc]
    # exclude rows with trip_miles = 0
    df = df[df['trip_miles'] > 0.0]
    # exclude rows with trip_miles > 200
    df = df[df['trip_miles'] < 200.0]
    # return a Pandas DataFrame
    return df


# with materialize=True we materialize this table into our active branch
# of the data catalog as an Iceberg table
@bauplan.model(materialize=True)
# for this function we specify a different dependency, namely DuckDB 0.10.3
@bauplan.python('3.11', pip={'duckdb': '0.10.3'})
def top_pickup_locations(
    trips=bauplan.Model(
        'normalized_taxi_trips',
        columns=['*'],
    ),
    zones=bauplan.Model(
        'taxi_zones',
        columns=['*'],
    ),
):
    """
    this function takes:
    - the parent output of the function 'normalized_taxi_trips',
    - the table 'taxi_zones' directly from the data catalog, and
    - joins them to do some grouping and reordering over the number of trips
      using DuckDB https://duckdb.org/docs/
    the output is a table with all the NY zones ordered by number of trips
    """
    import duckdb

    # the following code uses DuckDB: because DuckDB can query Arrow tables
    # directly, the input tables 'trips' and 'zones' can be referenced as-is
    # in the FROM and JOIN clauses of the SQL
    sql_query = """
    SELECT
        COUNT(trips.pickup_datetime) AS number_of_trips,
        trips.PULocationID,
        zones.Borough,
        zones.Zone
    FROM trips
    JOIN zones
        ON trips.PULocationID = zones.LocationID
    GROUP BY
        trips.PULocationID,
        zones.Borough,
        zones.Zone
    ORDER BY
        number_of_trips DESC;
    """
    # run the query and return the results as an Arrow Table
    data = duckdb.sql(sql_query).arrow()
    return data
```
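The cleaning steps in normalized_taxi_trips (a timezone-aware timestamp filter plus two range filters on trip_miles) can be tried in isolation on a toy DataFrame; the data below is made up for illustration:

```python
import pandas as pd

# a hypothetical sample of trip records: one pre-2022 row, one zero-mile trip,
# one normal trip, and one implausibly long trip
df = pd.DataFrame({
    'pickup_datetime': pd.to_datetime(
        ['2021-12-31 23:00', '2022-12-01 08:00', '2022-12-02 09:00', '2022-12-03 10:00']
    ).tz_localize('UTC'),
    'trip_miles': [5.0, 0.0, 3.2, 250.0],
})

# the same three filters as the model
time_filter_utc = pd.to_datetime('2022-01-01').tz_localize('UTC')
df = df[df['pickup_datetime'] >= time_filter_utc]
df = df[df['trip_miles'] > 0.0]
df = df[df['trip_miles'] < 200.0]
print(df)  # only the 2022-12-02 row survives
```

Note that the comparison only works because both sides are timezone-aware: comparing a tz-naive timestamp against a tz-aware column raises a TypeError in Pandas.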
To showcase the fact that all functions are fully containerized, we used a different library for each step: Pandas for the first model, normalized_taxi_trips, and DuckDB for the second one, top_pickup_locations. These are arbitrary implementation choices: you can refactor this pipeline using the dependencies you are more comfortable with.

Note

For instance, you could rewrite the pipeline using only Pandas, or only DuckDB. What would the code of the pipeline look like in either case?
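As a rough sketch of one such refactoring (our own, not part of the example code), the join-and-aggregate logic of top_pickup_locations could be expressed in pure Pandas with merge and groupby; the toy data below is hypothetical:

```python
import pandas as pd

def top_pickup_locations_pandas(trips: pd.DataFrame, zones: pd.DataFrame) -> pd.DataFrame:
    # join trips with the zone lookup table on the pickup location id
    joined = trips.merge(zones, left_on='PULocationID', right_on='LocationID')
    # count trips per (PULocationID, Borough, Zone) and order by the count, descending
    return (
        joined.groupby(['PULocationID', 'Borough', 'Zone'], as_index=False)
        .agg(number_of_trips=('pickup_datetime', 'count'))
        .sort_values('number_of_trips', ascending=False, ignore_index=True)
    )

# tiny made-up example
trips = pd.DataFrame({
    'pickup_datetime': pd.to_datetime(['2022-12-01', '2022-12-02', '2022-12-03']),
    'PULocationID': [1, 1, 2],
})
zones = pd.DataFrame({
    'LocationID': [1, 2],
    'Borough': ['Queens', 'Bronx'],
    'Zone': ['Astoria', 'Fordham'],
})
result = top_pickup_locations_pandas(trips, zones)
print(result)  # Astoria (2 trips) first, Fordham (1 trip) second
```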
Create a branch and run the pipeline in it, so you will end up with a table named top_pickup_locations in your branch:

```shell
bauplan branch create <YOUR_BRANCH>
bauplan branch checkout <YOUR_BRANCH>
bauplan run
```

To make sure everything went as expected, inspect the table from your branch:

```shell
bauplan table get bauplan.top_pickup_locations
```
Streamlit app
To visualize the data we will use Streamlit, a powerful Python framework to build web apps in pure Python. Go into the 01-simple-data-app/app folder, create a virtual environment, install the necessary requirements and then run the Streamlit app:

```shell
python3 -m venv venv
source venv/bin/activate
pip install -r requirements.txt
streamlit run viz_app.py
```
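For reference, judging from the imports in viz_app.py the requirements file needs at least the packages below. This list is inferred from the code, not copied from the repository, and exact version pins are up to you:

```text
streamlit
bauplan
pandas
matplotlib
grpcio
```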
This app will simply visualize the final table of our pipeline, nothing
fancy. Note that you will need to pick a branch that contains the table
top_pickup_locations
. This is what it looks like.
The programmable Lakehouse in action: Bauplan Python SDK
One of the fundamental features of Bauplan is that every core operation of the platform can always be embedded in the code of another application: that’s why we call it The Programmable Lakehouse. All the main concepts illustrated in the tutorial - running, querying, branching, merging - can be accessed with a simple Python SDK. This makes your data stack infinitely programmable and incredibly easy to embed in any existing workflow.
This Streamlit app is a very simple example of how to use Bauplan inside another application through our Python SDK. In this example, we showcase query, a method that allows us to use Bauplan as a query engine: we can run a SQL query and easily cast the results as a Pandas DataFrame. We can use this method to run interactive queries against any table in any branch of the data catalog that we have access to.
```python
import bauplan

client = bauplan.Client()

# run a query against an arbitrary branch and cast the results to a DataFrame
df = client.query('SELECT c1 FROM my_table', branch_name='my_branch').to_pandas()
```
We use this method to fetch the data from the table top_pickup_locations in a branch and visualize it in the Streamlit app. The entire code of the app is below; it is heavily commented, so check it out to learn more about how we used the SDK.
```python
# General Streamlit / visualization imports
import sys

import grpc
import matplotlib.pyplot as plt
import pandas as pd
import streamlit as st

# we import the bauplan Python SDK, with pre-built functions:
# Client.query allows us to use bauplan as an interactive query engine -
# we can run a query against a table in the data catalog and put the
# results in a Pandas DataFrame
import bauplan

client = bauplan.Client()


@st.cache_data()
def query_as_dataframe(sql, branch, top):
    """
    This function uses the query method to query a table in the data catalog
    and cast the result as a DataFrame
    """
    try:
        return client.query(sql, branch_name=branch, args={'preview': 'True'}).to_pandas().head(top)
    except bauplan.exceptions.BauplanError:
        return None


def plot_bar_chart(df):
    """
    This function plots a bar chart from the table top_pickup_locations
    """
    plt.figure(figsize=(11, 11))
    plt.barh(df['Zone'], df['number_of_trips'], color='skyblue', edgecolor='white')
    plt.ylabel('Zone')
    plt.xlabel('Number of Trips')
    plt.title('Number of Trips per Zone')
    plt.tight_layout()
    st.pyplot(plt)


### THE STREAMLIT APP BEGINS HERE
def main():
    st.title('A simple data app to visualize taxi rides and locations in NY')
    # debug line to ensure we run on the correct Python interpreter
    print(sys.executable)
    # define a text input field where the user can indicate their active branch
    selected_branch = st.text_input("What branch are you looking for?", " ")
    if selected_branch.strip():
        # define the target table and the query to run
        table_name = 'top_pickup_locations'
        sql_query = f"SELECT * FROM {table_name}"
        # use the bauplan SDK to retrieve the data from the data catalog as a Pandas DataFrame
        df = query_as_dataframe(sql_query, selected_branch, 50)
        if df is not None and not df.empty:
            st.dataframe(df, width=1200)
            plot_bar_chart(df)
        else:
            st.write('No data available for the selected branch or table.')
    else:
        st.write('Please input your branch')


if __name__ == "__main__":
    main()
```
Summary
We have shown how you can use Bauplan to develop a data pipeline that powers a simple data visualization app.
Through very simple abstractions we were able to:

- create a branch of our data catalog,
- build a transformation pipeline that uses several dependencies,
- materialize the results of the pipeline in the data lake as Iceberg tables.
At no point did we have to worry about the configuration of the compute at runtime, the containerization and management of the Python environments, and the operations needed to persist the data in the data lake and the data catalog.
In addition, with a simple import statement we were able to embed the capabilities of the Bauplan runtime into our Streamlit app. Thanks to the Bauplan Python SDK, it is simple to use the capabilities of the platform inside other applications.