A simple data app

In this example, we’ll introduce you to Bauplan by writing a simple pipeline that powers a data visualization app. We’ll explain the foundations of a Bauplan pipeline step by step and introduce you to our Python SDK.

For this example, we are going to use the TLC NY taxi dataset and we are going to build a pipeline that joins data from the Yellow Taxi Record, joins it with a Taxi Zone Lookup Table, cleans the data and then creates one final table to visualize: a top_pickup_location table that shows the Boroughs and Zones in NY with the most taxi pickups.

The final shape of the pipeline we are going to build will look like this:

flowchart LR id0[(taxi_fhvhv)]-->id2[models.normalized_taxi_trips] id2[models.normalized_taxi_trips] --> id4[models.top_pickup_locations] id3[(taxi_zones)] --> id4[models.top_pickup_locations]

Set up

Go into the folder 01-simple-data-app/pipeline, and create a bauplan_project.yml file with a unique project id, a project name and a default Python interpreter.

project:
    id: bde138c0-0c48-4f37-a2be-cc55c8e8504a
    name: simple-data-app

defaults:
    python_version: 3.11

Pipeline

In the folder you can find a file called models.py with some basic transformation code in it to define the pipeline:

import bauplan


@bauplan.model(materialize=False)
# for this function we specify one dependency, namely Pandas 2.2.0
@bauplan.python('3.11', pip={'pandas': '2.2.0'})
def normalized_taxi_trips(
        data=bauplan.Model(
            'taxi_fhvhv',
            # this function performs an S3 scan directly in Python, so we can specify the columns and the filter pushdown
            # by pushing the filters down to S3 we make the system considerably more performant
            columns=[
                'pickup_datetime',
                'dropoff_datetime',
                'PULocationID',
                'DOLocationID',
                'trip_miles',
                'trip_time',
                'base_passenger_fare',
                'tolls',
                'sales_tax',
                'tips',
                ],
            filter="pickup_datetime >= '2022-12-01T00:00:00-05:00' AND pickup_datetime < '2023-01-01T00:00:00-05:00'"
        ),
):
    """

    this function takes the table 'taxi_fhvhv' in the data lake and does some simple data cleaning operations
    using Pandas https://pandas.pydata.org/docs/

    """

    import pandas as pd
    import math

    # print some debug info - you will see every print statement directly in your terminal
    size_in_gb = round(data.nbytes / math.pow(1024, 3), 3)
    print(f"\nThis table is {size_in_gb} GB and has {data.num_rows} rows\n")

    # the following code is Pandas
    # convert data from Arrow to Pandas
    df = data.to_pandas()
    # create time filter on datetime UTC
    time_filter = pd.to_datetime('2022-01-01')
    time_filter_utc = time_filter.tz_localize('UTC')
    # filter df by timestamp
    df = df[df['pickup_datetime'] >= time_filter_utc]
    # exclude rows with trip_miles = 0
    df = df[df['trip_miles'] > 0.0]
    # exclude rows with trip_miles > 200
    df = df[df['trip_miles'] < 200.0]

    # return a Pandas dataframe
    return df


# with materialize=True we materialize this table into our active branch of the data catalog as an Iceberg table
@bauplan.model(materialize=True)
# for this function we specify a different dependency, namely DuckDB 0.10.3
@bauplan.python('3.11', pip={'duckdb': '0.10.3'})
def top_pickup_locations(
        trips=bauplan.Model(
            'normalized_taxi_trips',
            columns=['*']
        ),
        zones=bauplan.Model(
            'taxi_zones',
            columns=['*'],
        ),
):
    """

    this function takes:
    - the parent output of the function 'normalized_taxi_trips',
    - the table 'taxi_zones' directly from the data catalog, and
    - joins them to do some grouping and reordering over the number of trips using DuckDB https://duckdb.org/docs/
    the output is a table with all the NY zones ordered by number of trips

    """
    import duckdb

    # the following code uses DuckDB
    # because DuckDB can query directly Arrow tables we do not need to do anything and can query directly the input tables
    # the input tables 'trips' and 'zones' are referenced directly in the SQL in the FROM and in the JOIN clauses
    sql_query = f"""
    SELECT
        COUNT(trips.pickup_datetime) AS number_of_trips,
        trips.PULocationID,
        zones.Borough,
        zones.Zone
    FROM trips
    JOIN zones
    ON trips.PULocationID = zones.LocationID
    GROUP BY
        trips.PULocationID,
        zones.Borough,
        zones.Zone
    ORDER BY
        number_of_trips DESC;
    """
    # run the query and return the results as an Arrow Table
    data = duckdb.sql(sql_query).arrow()

    # return an Arrow Table
    return data

To showcase the fact that all function are fully containerized, we used different libraries to compute each step: we are going to use Pandas for the first model normalized_taxi_trips and DuckDB for the second one top_pickup_locations. These are arbitrary implementation choices. You can refactor this pipeline using the dependencies you are more comfortable with.

Note

For instance, you could rewrite the pipeline using only Pandas. What would the code of the pipeline look like in either cases?

Create a branch and run the pipeline in it so you will have a table in it named top_pickup_locations,

bauplan branch create <YOUR_BRANCH>
bauplan branch checkout <YOUR_BRANCH>
bauplan run

To make sure everything went as expected inspect the table from your branch.

bauplan table get top_pickup_locations

Streamlit app

To visualize the data we will use Streamlit, a powerful Python framework to build web apps in pure Python. Go into the 01-simple-data-app/app folder,

Create a virtual environment, install the necessary requirements and then run the Streamlit app.

python3 -m venv venv
source venv/bin/activate
pip install -r requirements.txt
streamlit run viz_app.py

This app will simply visualize the final table of our pipeline, nothing fancy. Note that you will need to pick a branch that contains the table top_pickup_locations. This is what it looks like.

simple_data_app.png

A simple Data App

The programmable Lakehouse in action: Bauplan Python SDK

One of the fundamental features of Bauplan is that every core operation of the platform can always be embedded in the code of another application: that’s why we call it The Programmable Lakehouse. All the main concepts illustrated in the tutorial - running, querying, branching, merging - can be accessed with a simple Python SDK. This makes your data stack infinitely programmable and incredibly easy to embed in any existing workflow.

This Streamlit app is a very simple example to illustrate how we can use Bauplan inside another application by simply using our Python SDK. In this example, we showcase query, a method that allows us to use Bauplan as a query engine and run a SQL query and easily cast the results as a Pandas DataFrame. We can use this method to run an interactive queries against any table in any branch of the data catalog that we have access to.

import bauplan

client = bauplan.Client()

# run a query against an arbtrary branch and cast the results to a DataFrame
df = client.query('SELECT c1 FROM my_table', branch_name='my_branch').to_pandas()

We use this method to fetch the data from the table top_pickup_location from a branch and visualize the table in the app Streamlit app.

The entire code of the Streamlit app is the following. The code is heavily commented, check it out to know more about how we used the SDK in the app.

# General Streamlit / Visualization imports
import streamlit as st
import sys
import grpc
import pandas as pd
import matplotlib.pyplot as plt
# we import the bauplan Python SDK, with pre-built functions
# Client.query allows us to use bauplan as an interactive query engine
# we can run query against a table in the data catalog and put the data in a Pandas dataframe
import bauplan

client = bauplan.Client()

@st.cache_data()
def query_as_dataframe(sql, branch, top):
    """
    This function uses query method to query a table in the data catalog
    and cast it as DataFrame
    """
    try:
        return client.query(sql, branch_name=branch, args={'preview': 'True'}).to_pandas().head(top)
    except bauplan.exceptions.BauplanError:
        return None

def plot_bar_chart(df):
    """
        This function plots a bar chart from the table top_pickup_location
    """
    plt.figure(figsize=(11, 11))
    plt.barh(df['Zone'], df['number_of_trips'], color='skyblue', edgecolor='white')
    plt.ylabel('Zone')
    plt.xlabel('Number of Trips')
    plt.title('Number of Trips per Zone')
    plt.tight_layout()
    st.pyplot(plt)

### THE STREAMLIT APP BEGINS HERE
def main():

    st.title('A simple data app to visualize taxi rides and locations in NY')
    # Debug line to ensure correct Python interpreter
    print(sys.executable)
    # define a text input field where the user can indicate her active branch
    selected_branch = st.text_input("What branch are you looking for?", " ")

    if selected_branch.strip():
        # define the target table and the query to run
        table_name = 'top_pickup_locations'
        sql_query = f"SELECT * FROM {table_name}"
        # use bauplan sdk to retrieve the data from the data catalog as a Pandas DataFrame
        df = query_as_dataframe(sql_query, selected_branch, 50)
        if df is not None and not df.empty:
            st.dataframe(df, width=1200)
            plot_bar_chart(df)
        else:
            st.write('No data available for the selected branch or table.')
    else:
        st.write('Please input your branch')

if __name__ == "__main__":
    main()

Summary

We have shown how you can use Bauplan to develop a data pipeline that powers a simple data visualization app.

Through very simple abstractions we were able to:

  • create a branch of our data catalog,

  • build a transformation pipeline that uses several dependencies,

  • materialize the results of the pipeline in the data lake as Iceberg tables.

At no point did we have to worry about the configuration of the compute at runtime, the containerization and management of the Python environments, and the operations needed to persist the data in the data lake and the data catalog.

In addition, with a simple import statement we were able to embed the capabilities of the Bauplan runtime into our Streamlit app. Thanks to Bauplan Python SDK it is simple to use the capabilities of the platform inside other applications.