Airflow 3
Airflow is one of the most widely adopted orchestrators in data engineering. It provides a central way to define, schedule, and monitor workflows, making it the backbone for many production data platforms. With the TaskFlow API, which Airflow 3 exposes through its public Task SDK (airflow.sdk), pipelines can be written as plain Python functions, keeping code simple and maintainable.
Bauplan exposes a Python SDK that lets you do everything with your lakehouse in code - from running pipelines to managing branches and tables. Airflow 3 is just Python: with the TaskFlow API, tasks are regular Python functions and DAGs are defined in code. Integrating the two is straightforward: import the Bauplan SDK inside Airflow tasks and call Bauplan methods as you would in a standalone script. Airflow handles orchestration, logging, and scheduling, while Bauplan executes the data work. The result is a clean split: Airflow runs the workflow, Bauplan runs the pipelines, the queries, and the lakehouse infrastructure.
Quickstart: run a Bauplan pipeline from an Airflow DAG
Goal: Call a Bauplan project from an Airflow DAG in the smallest possible way: authenticate, run, check status, and fail the DAG if the Bauplan job fails.
Prerequisites
- Python 3.10–3.12 (Airflow 3 supports these; Bauplan supports 3.10 and above).
- A Bauplan project (a folder containing your pipeline code and bauplan_project.yml).
- A Bauplan API key (via the environment or passed to the client).
- Airflow 3 installed. See “Installation from PyPI” and “Quick Start.”
If your pipeline writes data, run it on a branch (not on main). Branch names must be prefixed with your Bauplan username, for example alice.airflow_docs. Learn more about branches.
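If the branch does not exist yet, you can create it from main before the first run. A minimal sketch, assuming the has_branch and create_branch methods of the Bauplan client behave as in the current SDK:
import bauplan

client = bauplan.Client()
username = client.info().user.username
branch = f"{username}.airflow_docs"

# Create the branch from main only if it is not already there
# (has_branch / create_branch are assumed to match the current Bauplan SDK)
if not client.has_branch(branch):
    client.create_branch(branch, from_ref="main")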
Minimal DAG
Define an Airflow DAG that runs your Bauplan project and fails the DAG if the Bauplan job fails.
# bauplan_airflow3_flow.py
from airflow.sdk import dag, task
from airflow.exceptions import AirflowException
import pendulum
import bauplan


@task(task_id="run-bauplan-pipeline")
def run_pipeline(
    project_dir: str,
    bauplan_branch_suffix: str,
) -> dict:
    """
    Execute a Bauplan project and return a minimal, serializable summary.

    Returns:
        dict with 'job_id' and 'job_status'.

    Raises:
        AirflowException if the job did not finish with status 'success'.
    """
    client = bauplan.Client()  # build client inside the task
    username = client.info().user.username  # get username
    branch = f"{username}.{bauplan_branch_suffix}"  # construct branch name
    state = client.run(project_dir=project_dir, ref=branch)
    if str(state.job_status).lower() != "success":
        raise AirflowException(
            f"Bauplan job {state.job_id} ended with status='{state.job_status}'"
        )
    # return only simple types so Airflow can store results safely via XCom
    return {"job_id": state.job_id, "job_status": state.job_status}


@task(task_id="report-success")
def report_success(result: dict) -> None:
    print(f"Bauplan run succeeded (job_id={result['job_id']}, status={result['job_status']})")


@dag(
    dag_id="bauplan-pipeline-run",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["bauplan", "integration"],
)
def main(
    pipeline_dir: str,
    bauplan_branch_suffix: str,
):
    """
    Minimal Airflow → Bauplan integration: run a project and fail fast on error.
    """
    result = run_pipeline(project_dir=pipeline_dir, bauplan_branch_suffix=bauplan_branch_suffix)
    report_success(result)


# Instantiate the DAG with your arguments
dag = main(
    pipeline_dir="your_bauplan_project",  # change this with the path to your bauplan project
    bauplan_branch_suffix="airflow_docs",  # change this with the name of your branch
)

if __name__ == "__main__":
    dag.test()
This uses Airflow 3's public Task SDK (airflow.sdk) to define DAGs and tasks as plain Python functions. dag.test() lets you execute the DAG in a single process for fast local debugging.
How to run it locally
To run the Airflow components explicitly, start a local API server (which serves the UI) and a scheduler, backed by the default SQLite metadata database.
In one terminal, run these commands (the environment setup and the database migration are needed only the first time):
python -m venv .venv && source .venv/bin/activate
pip install apache-airflow bauplan pendulum
python -m airflow db migrate
python -m airflow api-server
Then, in separate terminals (same virtual environment), run the scheduler and the DAG processor (in Airflow 3, DAG files are parsed by a dedicated component):
python -m airflow scheduler
python -m airflow dag-processor
This gives you the Airflow UI at http://localhost:8080 and picks up the DAGs placed under $AIRFLOW_HOME/dags (by default ~/airflow/dags). If you prefer a single command, python -m airflow standalone starts all components at once.
To run in-process with dag.test() for fast iteration, create a Python file named bauplan_airflow3_flow.py, paste the code above into it, and run it from another terminal window:
python bauplan_airflow3_flow.py
These instructions are accurate at the time of writing; see the Airflow quickstart instructions and the Airflow debugging guide for up-to-date details.
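Before testing the DAG, it can also help to confirm that Bauplan credentials resolve in the same environment Airflow will use (see the authentication section below). A quick check, reusing only calls already shown in the DAG:
import bauplan

# Fails fast if no API key, profile, or config file is found in this environment
client = bauplan.Client()
print(f"Authenticated as: {client.info().user.username}")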
Bauplan authentication
Bauplan looks for credentials in a clear precedence order:
- Environment variable - BAUPLAN_API_KEY
- Profile name - BAUPLAN_PROFILE (points to a saved profile)
- Config file - ~/.bauplan/config.yml
You can set a key directly in code, export it as an environment variable for scripts and CI/CD, or manage multiple profiles via the config file. Passing api_key explicitly will always override other sources.
import bauplan
client = bauplan.Client(api_key="YOUR_KEY")
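For Airflow deployments it is usually cleaner to rely on the environment variable, so no secret ends up in DAG code. A sketch of both options, assuming BAUPLAN_API_KEY is exported in the environment that runs the Airflow scheduler and workers:
import os
import bauplan

# Picks up BAUPLAN_API_KEY (or a profile / config file) from the environment
client = bauplan.Client()

# Equivalent but explicit: a key passed here overrides every other source
client = bauplan.Client(api_key=os.environ["BAUPLAN_API_KEY"])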
Passing parameters (optional)
When running Bauplan with an orchestrator, parameters are the key to making your flows reusable and dynamic.
Instead of hardcoding values, parameters let you control your pipeline logic in Bauplan without changing the code. They show up in your SQL or Python models as templated values; typical uses are date filters and row limits. This is especially important in scheduled environments, where orchestrators trigger the same job many times, so the same flow can run daily with different inputs. Learn how to set up parameters in Bauplan.
Suppose you have a Bauplan model that filters rows by a parameter start_time:
# bauplan_project.yml
project:
  id: 2b74eb5c-548f-4192-bf18-fcfa33a0746f
  name: test-pipeline

parameters:
  start_time:
    type: str
    default: "2022-12-15T00:00:00-05:00"
# models.py
import bauplan


@bauplan.model(materialization_strategy='REPLACE')
@bauplan.python('3.11', pip={'pandas': '2.2.0'})
def filtered_taxi_rides_parameters(
    data=bauplan.Model(
        "taxi_fhvhv",
        columns=['pickup_datetime'],
        filter="pickup_datetime >= $start_time",
    ),
):
    data = data.to_pandas()
    print(f"\nEarliest pickup in result: {data['pickup_datetime'].min()}")
    print(f"\nRows returned: {len(data)}\n")
    return data
You can set the start_time when running the pipeline through Airflow:
# bauplan_airflow3_with_params.py
from airflow.sdk import dag, task
from airflow.exceptions import AirflowException
import pendulum
import bauplan


@task
def run_with_parameters_task(
    project_dir: str,
    bauplan_branch_suffix: str,
    parameters: dict,
) -> dict:
    client = bauplan.Client()  # build client inside the task
    username = client.info().user.username
    branch = f"{username}.{bauplan_branch_suffix}"
    state = client.run(project_dir=project_dir, ref=branch, parameters=parameters)
    if str(state.job_status).lower() != "success":
        raise AirflowException(
            f"Bauplan job {state.job_id} ended with status='{state.job_status}'"
        )
    return {"job_id": state.job_id, "job_status": state.job_status}


@dag(
    dag_id="bauplan-pipeline-run-with-parameters",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["bauplan", "parameters"],
)
def run_with_parameters(
    project_dir: str,
    bauplan_branch_suffix: str,  # your own branch
    parameters: dict,  # your arbitrary parameters
):
    run_with_parameters_task(
        project_dir=project_dir,
        bauplan_branch_suffix=bauplan_branch_suffix,
        parameters=parameters,
    )


dag = run_with_parameters(
    project_dir="your_bauplan_project",  # change this with the path to your bauplan project
    bauplan_branch_suffix="your_branch",  # change this with the name of your branch
    parameters={"start_time": "2023-01-01T00:00:00-05:00"},
)

if __name__ == "__main__":
    dag.test()
This will run the Bauplan project on the given branch, applying start_time="2023-01-01T00:00:00-05:00" inside the model so only rides on or after 2023-01-01 are processed.
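If you want the DAG to verify the output, you could add a downstream task that queries the materialized table on the same branch. A minimal sketch, assuming the query method of the Bauplan client returns an Arrow table and that the model above is materialized as filtered_taxi_rides_parameters:
from airflow.sdk import task
import bauplan


@task
def check_output(bauplan_branch_suffix: str) -> int:
    client = bauplan.Client()
    username = client.info().user.username
    branch = f"{username}.{bauplan_branch_suffix}"
    # query() returning a PyArrow table is an assumption about the current Bauplan SDK
    result = client.query(
        "SELECT COUNT(*) AS n FROM filtered_taxi_rides_parameters",
        ref=branch,
    )
    n = result.column("n")[0].as_py()
    print(f"filtered_taxi_rides_parameters has {n} rows on {branch}")
    return n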