Prefect

Prefect is a modern workflow orchestrator built entirely in Python. Instead of managing YAML or DSLs, you define flows and tasks as plain Python functions, and Prefect takes care of scheduling, retries, logging, and observability. This makes it easy to embed external libraries and SDKs directly into your workflows.

Bauplan exposes a Python SDK that lets you do everything with your lakehouse in code — from running pipelines to managing branches and tables. Integrating the two is straightforward: you import the Bauplan SDK inside your Prefect tasks and call Bauplan methods as you would in a standalone script. Prefect handles orchestration, logging, and scheduling, while Bauplan executes the data work. The result is a clean split: Prefect runs the workflow, Bauplan runs the pipelines, the queries, and the lakehouse infrastructure.

Quickstart: run a Bauplan pipeline from a Prefect flow

Goal: Call a Bauplan project from a Prefect flow in the smallest possible way: authenticate, run, check status, and fail the flow if the Bauplan job fails.

Prerequisites

  • Python 3.10+ installed.
  • A Bauplan project (a folder containing your pipeline code and bauplan_project.yml).
  • A Bauplan API key (via environment or passed to the client).
  • Prefect 3 installed.

Note: If your pipeline writes data, run it on a data branch (not on main). Branch names must be prefixed with your Bauplan username, for example alice.prefect_docs.

Learn more about branches.
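
The naming rule above can be captured in a small helper. This is a minimal sketch: the function names data_branch_name and is_user_branch are hypothetical (they are not part of the Bauplan SDK), and the only rule they encode is the one stated here, namely that a data branch is prefixed with your username.

```python
def data_branch_name(username: str, suffix: str) -> str:
    """Build '<username>.<suffix>' (hypothetical helper, not a Bauplan API)."""
    username = username.strip()
    suffix = suffix.strip()
    if not username or not suffix:
        raise ValueError("both username and suffix must be non-empty")
    return f"{username}.{suffix}"


def is_user_branch(branch: str, username: str) -> bool:
    """Return True if the branch carries the '<username>.' prefix."""
    return branch.startswith(f"{username}.")


branch = data_branch_name("alice", "prefect_docs")
print(branch)
print(is_user_branch("main", "alice"))
```

A guard like is_user_branch can be called before any write, so that a flow refuses to run against main by mistake.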

Minimal flow

Define a Prefect flow that runs your Bauplan project and fails the flow if the Bauplan job fails.

# bauplan_prefect_flow.py
from prefect import flow, task, get_run_logger
import bauplan

@task(name="run-bauplan-pipeline")
def run_pipeline(
    project_dir: str,
    bauplan_branch_suffix: str,
) -> dict:
    """
    Execute a Bauplan project and return a minimal, serializable summary.

    Returns:
        dict with 'job_id' and 'job_status'.
    Raises:
        RuntimeError if the job did not finish with status 'success'.
    """
    client = bauplan.Client()  # build client inside the task

    username = client.info().user.username  # get username
    branch = f"{username}.{bauplan_branch_suffix}"  # construct branch name

    state = client.run(project_dir=project_dir, ref=branch)

    # compare case-insensitively so 'SUCCESS' and 'success' both pass
    if state.job_status.lower() != "success":
        raise RuntimeError(f"Bauplan job {state.job_id} ended with status='{state.job_status}'")

    # return only simple types so Prefect can store results safely
    return {"job_id": state.job_id, "job_status": state.job_status}


@flow(name="bauplan-pipeline-run")
def main(
    pipeline_dir: str,
    bauplan_branch_suffix: str,
) -> dict:
    """
    Minimal Prefect → Bauplan integration: run a project and fail fast on error.
    """
    logger = get_run_logger()

    result = run_pipeline(project_dir=pipeline_dir, bauplan_branch_suffix=bauplan_branch_suffix)

    logger.info(
        f"Bauplan run succeeded (job_id={result['job_id']}, status={result['job_status']})"
    )
    return result

if __name__ == "__main__":
    pipeline_path = 'your_bauplan_project'  # change this to the path of your Bauplan project
    branch_suffix = 'prefect_docs'  # change this to your branch name
    main(pipeline_path, branch_suffix)
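
The status check inside the task can also be pulled into a plain function, so that the fail-fast behavior can be unit-tested without calling Bauplan. The following is a minimal sketch that assumes only what the flow above already relies on: the object returned by client.run exposes job_id and job_status. The helper name summarize_or_raise and the SimpleNamespace stand-in are hypothetical.

```python
from types import SimpleNamespace


def summarize_or_raise(state) -> dict:
    """Return a serializable summary, or raise if the job did not succeed."""
    status = str(state.job_status)
    if status.lower() != "success":
        raise RuntimeError(
            f"Bauplan job {state.job_id} ended with status='{status}'"
        )
    return {"job_id": state.job_id, "job_status": status}


# Stand-in objects that mimic the two attributes the flow reads.
fake_ok = SimpleNamespace(job_id="job-123", job_status="SUCCESS")
fake_failed = SimpleNamespace(job_id="job-456", job_status="FAILED")

summary = summarize_or_raise(fake_ok)
print(summary)

try:
    summarize_or_raise(fake_failed)
except RuntimeError as exc:
    print(f"flow would fail: {exc}")
```

Because the exception propagates out of the task, Prefect marks the task run, and therefore the flow run, as failed.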

How to run it

By default, this code runs fully locally with Prefect: you do not need to run a Prefect API or server for this example. If you prefer to use Prefect Server or Prefect Cloud for observability and scheduling, you can add that later without changing the Bauplan code.

The commands below use uv, a Python package manager (to install it, see the uv Installation Guide). Run them in your terminal. The last command, prefect cloud login, opens your browser and prompts you to create or log in to your free Prefect Cloud account. For up-to-date instructions on running Prefect locally, see the Prefect documentation.

curl -LsSf https://astral.sh/uv/install.sh | sh
uv venv .venv
source .venv/bin/activate
uv pip install prefect bauplan
prefect cloud login

Then create a Python file named bauplan_prefect_flow.py, paste the code above into it, and run it in another terminal window:

python bauplan_prefect_flow.py

Bauplan authentication

Bauplan looks for credentials in a clear precedence order:

  1. Environment variable: BAUPLAN_API_KEY
  2. Profile name: BAUPLAN_PROFILE (points to a saved profile)
  3. Config file: ~/.bauplan/config.yml

This means you can pass a key directly in your code, export it as an environment variable for scripts and CI/CD, or manage multiple profiles via the config file. Passing api_key explicitly always overrides the other sources.

import bauplan
client = bauplan.Client(api_key="YOUR_KEY") # overrides environment/profile
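
The precedence described above can be written down as a small decision function. This is only an illustration of the documented order, not the SDK's actual resolution code: the function name credential_source is hypothetical, and the sketch does not check whether the config file exists.

```python
import os
from typing import Mapping, Optional


def credential_source(
    api_key: Optional[str] = None,
    env: Optional[Mapping[str, str]] = None,
) -> str:
    """Name the source that would win under the documented precedence."""
    env = os.environ if env is None else env
    if api_key:
        return "explicit api_key argument"
    if env.get("BAUPLAN_API_KEY"):
        return "BAUPLAN_API_KEY environment variable"
    if env.get("BAUPLAN_PROFILE"):
        return "profile named by BAUPLAN_PROFILE"
    return "~/.bauplan/config.yml"


# Both the key and a profile are set: the environment key wins.
print(credential_source(env={"BAUPLAN_API_KEY": "k", "BAUPLAN_PROFILE": "ci"}))
```

In an orchestrated deployment, the environment variable is usually the most convenient option, because the key can be injected by the runtime and never appears in the flow code.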

Passing parameters (optional)

When running Bauplan with an orchestrator, parameters are the key to making your flows reusable and dynamic.

Instead of hardcoding values, parameters let you control your pipeline logic in Bauplan without changing the code. They show up in your SQL or Python models as templated values. Typical uses are date filters and row limits. This is especially important in scheduled environments, where an orchestrator triggers the same job many times: with parameters, the same flow can run daily with different inputs. To learn how to set up parameters in Bauplan, see this page.

Suppose you have a Bauplan model that filters rows by a parameter start_time:

# bauplan_project.yml
project:
  id: 2b74eb5c-548f-4192-bf18-fcfa33a0746f
  name: test-pipeline
parameters:
  start_time:
    type: str
    default: "2022-12-15T00:00:00-05:00"

# models.py
import bauplan

@bauplan.model(materialization_strategy='REPLACE')
@bauplan.python('3.11', pip={'pandas': '2.2.0'})
def filtered_taxi_rides_parameters(
    data=bauplan.Model(
        "taxi_fhvhv",
        columns=['pickup_datetime'],
        # $start_time is replaced with the parameter value at run time
        filter="pickup_datetime >= $start_time"
    ),
):
    data = data.to_pandas()

    print(f"\nEarliest pickup in result: {data['pickup_datetime'].min()}")
    print(f"\nRows returned: {len(data)}\n")

    return data

You can set the start_time when running the pipeline through Prefect:

# bauplan_prefect_with_params.py
from prefect import flow
import bauplan

@flow
def run_with_parameters(
    project_dir: str,
    bauplan_branch_suffix: str,  # your own branch
    parameters: dict,  # your arbitrary parameters
):
    client = bauplan.Client()  # build client inside the flow

    username = client.info().user.username  # get username
    branch = f"{username}.{bauplan_branch_suffix}"  # construct branch name

    state = client.run(project_dir=project_dir, ref=branch, parameters=parameters)

    # compare case-insensitively so 'SUCCESS' and 'success' both pass
    if state.job_status.lower() != "success":
        raise RuntimeError(f"Bauplan job {state.job_id} ended with status='{state.job_status}'")

    # return only simple types so Prefect can store results safely
    return {"job_id": state.job_id, "job_status": state.job_status}

if __name__ == "__main__":
    pipeline_path = 'your_bauplan_project'  # change this to the path of your Bauplan project
    branch_suffix = 'prefect_docs'  # change this to the name of your branch
    run_with_parameters(
        project_dir=pipeline_path,
        bauplan_branch_suffix=branch_suffix,
        parameters={"start_time": "2023-01-01T00:00:00-05:00"},
    )

This runs the Bauplan project on the given branch, applying start_time="2023-01-01T00:00:00-05:00" inside the model, so only rides picked up on or after January 1, 2023 are processed.
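
For a scheduled flow, the start_time value would typically be computed per run instead of being written by hand. The sketch below builds the same ISO 8601 string format used above with the standard library only. The helper names start_time_for and daily_parameters are hypothetical, and the fixed -05:00 offset is an assumption copied from the example value; it does not account for daylight saving time.

```python
from datetime import date, datetime, timedelta, timezone


def start_time_for(day: date, utc_offset_hours: int = -5) -> str:
    """Midnight of `day` at a fixed UTC offset, as an ISO 8601 string."""
    tz = timezone(timedelta(hours=utc_offset_hours))
    return datetime(day.year, day.month, day.day, tzinfo=tz).isoformat()


def daily_parameters(run_day: date) -> dict:
    """Parameters for a run that processes rides from the previous day onward."""
    return {"start_time": start_time_for(run_day - timedelta(days=1))}


params = daily_parameters(date(2023, 1, 2))
print(params)  # {'start_time': '2023-01-01T00:00:00-05:00'}
```

The resulting dictionary can be passed as the parameters argument of run_with_parameters, so each scheduled run filters on its own date without any change to the Bauplan project.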