Prefect
Prefect is a modern workflow orchestrator built entirely in Python. Instead of managing YAML or DSLs, you define flows and tasks as plain Python functions, and Prefect takes care of scheduling, retries, logging, and observability. This makes it easy to embed external libraries and SDKs directly into your workflows.
Bauplan exposes a Python SDK that lets you work with your lakehouse entirely in code, from running pipelines to managing branches and tables. Integrating the two is straightforward: you import the Bauplan SDK inside your Prefect tasks and call Bauplan methods as you would in a standalone script. The result is a clean split: Prefect handles orchestration, scheduling, and logging, while Bauplan runs the pipelines, the queries, and the lakehouse infrastructure.
Quickstart: run a Bauplan pipeline from a Prefect flow
Goal: Call a Bauplan project from a Prefect flow in the smallest possible way: authenticate, run, check status, and fail the flow if the Bauplan job fails.
Prerequisites
- Python 3.10+ installed.
- A Bauplan project (a folder containing your pipeline code and bauplan_project.yml).
- A Bauplan API key (set as an environment variable or passed to the client).
- Prefect 3 installed.
If your pipeline writes data, run it on a data branch (not on main). Branch names must be prefixed with your Bauplan username, for example alice.prefect_docs.
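Because the flows on this page build the branch name by hand, a small guard can catch two common mistakes before any Bauplan call: an empty suffix, and a suffix that already includes the username (which would otherwise produce something like alice.alice.prefect_docs). This is a minimal sketch; make_branch_name and assert_writable_ref are hypothetical helpers, not part of the Bauplan SDK, and they only encode the "username.suffix" convention and the "not on main" advice described above.

```python
def make_branch_name(username: str, suffix: str) -> str:
    """Return '<username>.<suffix>' (hypothetical helper, not a Bauplan API)."""
    username, suffix = username.strip(), suffix.strip()
    if not username or not suffix:
        raise ValueError("username and branch suffix must both be non-empty")
    # If the caller already passed a full branch name, don't prefix it twice.
    if suffix.startswith(f"{username}."):
        return suffix
    return f"{username}.{suffix}"


def assert_writable_ref(ref: str) -> None:
    """Refuse to run a pipeline that writes data directly on 'main'."""
    if ref == "main":
        raise ValueError("use a data branch such as '<username>.<suffix>', not 'main'")


if __name__ == "__main__":
    branch = make_branch_name("alice", "prefect_docs")
    assert_writable_ref(branch)
    print(branch)  # alice.prefect_docs
```

Inside a task you would pass the username returned by client.info().user.username instead of the literal "alice".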
Minimal flow
Define a Prefect flow that runs your Bauplan project and fails the flow if the Bauplan job fails.
# bauplan_prefect_flow.py
from prefect import flow, task, get_run_logger
import bauplan


@task(name="run-bauplan-pipeline")
def run_pipeline(
    project_dir: str,
    bauplan_branch_suffix: str,
) -> dict:
    """
    Execute a Bauplan project and return a minimal, serializable summary.

    Returns:
        dict with 'job_id' and 'job_status'.
    Raises:
        RuntimeError if the job did not finish with status 'success'.
    """
    client = bauplan.Client()  # build the client inside the task
    username = client.info().user.username  # get your Bauplan username
    branch = f"{username}.{bauplan_branch_suffix}"  # branch names must start with the username
    state = client.run(project_dir=project_dir, ref=branch)
    # compare case-insensitively so both 'SUCCESS' and 'success' count as success
    if str(state.job_status).lower() != "success":
        raise RuntimeError(f"Bauplan job {state.job_id} ended with status='{state.job_status}'")
    # return only simple types so Prefect can store results safely
    return {"job_id": state.job_id, "job_status": state.job_status}


@flow(name="bauplan-pipeline-run")
def main(
    pipeline_dir: str,
    bauplan_branch_suffix: str,
) -> dict:
    """
    Minimal Prefect → Bauplan integration: run a project and fail fast on error.
    """
    logger = get_run_logger()
    result = run_pipeline(project_dir=pipeline_dir, bauplan_branch_suffix=bauplan_branch_suffix)
    logger.info(
        f"Bauplan run succeeded (job_id={result['job_id']}, status={result['job_status']})"
    )
    return result


if __name__ == "__main__":
    pipeline_path = "your_bauplan_project"  # change this to the path of your Bauplan project
    branch_suffix = "prefect_docs"  # change this to your branch name, without the username prefix
    main(pipeline_path, branch_suffix)
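To test the success and failure logic without network access or an API key, one option is to move the task body into a plain function that receives the client as an argument, and then exercise it with a stub. The sketch below assumes the client only needs info().user.username and run(project_dir=..., ref=...), as in the flow above. run_bauplan_project and FakeClient are hypothetical names, and the fake job_id is made-up data.

```python
from types import SimpleNamespace


def run_bauplan_project(client, project_dir: str, branch_suffix: str) -> dict:
    """Same logic as the run_pipeline task body, with the client injected."""
    username = client.info().user.username
    branch = f"{username}.{branch_suffix}"
    state = client.run(project_dir=project_dir, ref=branch)
    if str(state.job_status).lower() != "success":
        raise RuntimeError(f"Bauplan job {state.job_id} ended with status='{state.job_status}'")
    return {"job_id": state.job_id, "job_status": state.job_status}


class FakeClient:
    """Hypothetical stand-in for bauplan.Client exposing only info() and run()."""

    def __init__(self, username: str, job_status: str):
        self.username = username
        self.job_status = job_status
        self.calls = []  # records the arguments passed to run()

    def info(self):
        # Mimic the attribute path client.info().user.username used above.
        return SimpleNamespace(user=SimpleNamespace(username=self.username))

    def run(self, project_dir: str, ref: str):
        self.calls.append({"project_dir": project_dir, "ref": ref})
        return SimpleNamespace(job_id="job-123", job_status=self.job_status)


if __name__ == "__main__":
    client = FakeClient("alice", "SUCCESS")
    print(run_bauplan_project(client, "your_bauplan_project", "prefect_docs"))
    print(client.calls)
```

In the real task, the body would reduce to return run_bauplan_project(bauplan.Client(), project_dir, bauplan_branch_suffix). This keeps the Prefect-decorated function thin and leaves the Bauplan logic testable on its own.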
How to run it
By default, this code runs fully locally with Prefect, so you do not need to run a Prefect API or server for this example. If you prefer to use Prefect Server or Prefect Cloud for observability and scheduling, you can set that up later without changing the Bauplan code. The commands below use uv, a Python package manager (see the uv Installation Guide for other ways to install it). They install uv, create a virtual environment, install Prefect and Bauplan, and log in to Prefect Cloud. The last command, prefect cloud login, opens your browser so you can create or log in to a free Prefect Cloud account; skip it if you only want to run locally. For up-to-date instructions on running Prefect locally, see the Prefect documentation.
curl -LsSf https://astral.sh/uv/install.sh | sh
uv venv .venv
source .venv/bin/activate
uv pip install prefect bauplan
prefect cloud login
Then create a Python file named bauplan_prefect_flow.py, paste the code above into it, and run it from your terminal (with the virtual environment activated):
python bauplan_prefect_flow.py
Bauplan authentication
Bauplan looks for credentials in the following order of precedence:
- Environment variable: BAUPLAN_API_KEY
- Profile name: BAUPLAN_PROFILE (points to a saved profile)
- Config file: ~/.bauplan/config.yml
This means you can set a key directly in your code, export it as an environment variable for scripts and CI/CD, or manage multiple profiles via the config file. Passing api_key explicitly always overrides the other sources.
import bauplan
client = bauplan.Client(api_key="YOUR_KEY") # overrides environment/profile
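When a Prefect run fails to authenticate, it helps to know which source the client would pick up on that machine. The sketch below only mirrors the precedence order listed above. It is not Bauplan's own resolution code, and describe_credential_source is a hypothetical diagnostic helper. It inspects environment variables and checks whether the config file exists, but never reads or prints the key itself.

```python
import os
from collections.abc import Mapping
from pathlib import Path
from typing import Optional


def describe_credential_source(
    api_key: Optional[str] = None,
    env: Optional[Mapping[str, str]] = None,
    config_path: Path = Path.home() / ".bauplan" / "config.yml",
) -> str:
    """Name the credential source that the documented precedence would select."""
    env = os.environ if env is None else env
    if api_key:  # an explicit api_key overrides every other source
        return "explicit api_key argument"
    if env.get("BAUPLAN_API_KEY"):
        return "BAUPLAN_API_KEY environment variable"
    if env.get("BAUPLAN_PROFILE"):
        return f"profile '{env['BAUPLAN_PROFILE']}' (BAUPLAN_PROFILE)"
    if config_path.exists():
        return f"config file {config_path}"
    return "no credentials found"


if __name__ == "__main__":
    # Log this at the start of a flow to see where credentials come from.
    print(describe_credential_source())
```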
Passing parameters (optional)
When running Bauplan with an orchestrator, parameters are the key to making your flows reusable and dynamic.
Instead of hardcoding values, parameters let you control your pipeline logic in Bauplan without changing the code. They show up in your SQL or Python models as templated values. Typical uses are date filters and row limits. This matters most in scheduled environments, where an orchestrator triggers the same job many times: the same flow can run daily with different inputs. To learn how to set up parameters in Bauplan, see the parameters page in the Bauplan documentation.
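In a scheduled run, the flow usually derives parameter values from the run date instead of hardcoding them. This is a minimal sketch that assumes a start_time parameter formatted as an ISO-8601 timestamp with a UTC-5 offset, the same format as the default in the bauplan_project.yml example on this page. daily_parameters is a hypothetical helper, and the one-day look-back is an arbitrary choice.

```python
from datetime import date, datetime, time, timedelta, timezone

# Assumption: a fixed UTC-5 offset, matching the "-05:00" values used on this page.
UTC_MINUS_5 = timezone(timedelta(hours=-5))


def daily_parameters(run_date: date, days_back: int = 1, tz: timezone = UTC_MINUS_5) -> dict:
    """Return {'start_time': midnight of (run_date - days_back) in tz, as ISO-8601}."""
    start_day = run_date - timedelta(days=days_back)
    start = datetime.combine(start_day, time(0, 0), tzinfo=tz)
    return {"start_time": start.isoformat()}


if __name__ == "__main__":
    print(daily_parameters(date(2023, 1, 2)))
```

The returned dict has the shape expected by the parameters argument of client.run in the flow further down. A fixed offset ignores daylight saving time; if the data is tied to a real time zone, zoneinfo.ZoneInfo is the safer choice.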
Suppose you have a Bauplan model that filters rows by a parameter, start_time:
# bauplan_project.yml
project:
  id: 2b74eb5c-548f-4192-bf18-fcfa33a0746f
  name: test-pipeline
parameters:
  start_time:
    type: str
    default: "2022-12-15T00:00:00-05:00"
# models.py
import bauplan


@bauplan.model(materialization_strategy='REPLACE')
@bauplan.python('3.11', pip={'pandas': '2.2.0'})
def filtered_taxi_rides_parameters(
    data=bauplan.Model(
        "taxi_fhvhv",
        columns=['pickup_datetime'],
        # $start_time is replaced with the value of the start_time parameter
        filter="pickup_datetime >= $start_time"
    ),
):
    data = data.to_pandas()
    print(f"\nEarliest pickup in result: {data['pickup_datetime'].min()}")
    print(f"\nRows returned: {len(data)}\n")
    return data
You can set start_time when running the pipeline through Prefect:
# bauplan_prefect_with_params.py
from prefect import flow
import bauplan


@flow
def run_with_parameters(
    project_dir: str,
    bauplan_branch_suffix: str,  # your own branch, without the username prefix
    parameters: dict,  # names and values of parameters declared in bauplan_project.yml
) -> dict:
    client = bauplan.Client()  # build the client inside the flow
    username = client.info().user.username  # get your Bauplan username
    branch = f"{username}.{bauplan_branch_suffix}"  # construct the branch name
    state = client.run(project_dir=project_dir, ref=branch, parameters=parameters)
    # compare case-insensitively, as in the minimal flow above
    if str(state.job_status).lower() != "success":
        raise RuntimeError(f"Bauplan job {state.job_id} ended with status='{state.job_status}'")
    # return only simple types so Prefect can store results safely
    return {"job_id": state.job_id, "job_status": state.job_status}


if __name__ == "__main__":
    pipeline_path = "your_bauplan_project"  # change this to the path of your Bauplan project
    branch_suffix = "prefect_docs"  # change this to the name of your branch
    run_with_parameters(
        project_dir=pipeline_path,
        bauplan_branch_suffix=branch_suffix,
        parameters={"start_time": "2023-01-01T00:00:00-05:00"},
    )
This runs the Bauplan project on your branch with start_time="2023-01-01T00:00:00-05:00" applied inside the model, so only rides picked up on or after midnight on January 1, 2023 (UTC-5) are processed.
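A typo in a parameter name (for example start_tme) is easy to miss in a scheduled flow. A cheap guard is to check the run-time values against the parameters: section of bauplan_project.yml before calling client.run. This is a minimal sketch under two assumptions: the declared parameters are available as a plain dict (you could load them from the YAML file with a library such as PyYAML), and values passed at run time take precedence over the declared defaults. merge_parameters is a hypothetical helper, and its type map covers only a few basic type names.

```python
from typing import Any


def merge_parameters(declared: dict[str, dict[str, Any]], overrides: dict[str, Any]) -> dict[str, Any]:
    """Validate run-time values against declared parameters and fill in defaults.

    `declared` mirrors the `parameters:` section of bauplan_project.yml, e.g.
    {"start_time": {"type": "str", "default": "2022-12-15T00:00:00-05:00"}}.
    """
    unknown = set(overrides) - set(declared)
    if unknown:
        raise ValueError(f"Unknown parameter(s): {sorted(unknown)}")
    # Assumed mapping from declared type names to Python types.
    type_map = {"str": str, "int": int, "float": float, "bool": bool}
    merged = {}
    for name, spec in declared.items():
        value = overrides.get(name, spec.get("default"))
        expected = type_map.get(spec.get("type"))
        if expected is not None and value is not None and not isinstance(value, expected):
            raise TypeError(f"Parameter '{name}' should be {spec['type']}, got {type(value).__name__}")
        merged[name] = value
    return merged


if __name__ == "__main__":
    declared = {"start_time": {"type": "str", "default": "2022-12-15T00:00:00-05:00"}}
    print(merge_parameters(declared, {"start_time": "2023-01-01T00:00:00-05:00"}))
```

In the flow above, you could call this helper first, log the merged values for traceability, and pass the validated overrides to client.run.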