DBOS
DBOS is a durable execution library for Python workflows: it gives you durable, exactly-once execution of your functions, with built-in scheduling, retries, and observability. You write plain Python functions, annotate them with @DBOS.workflow() and @DBOS.step(), and DBOS takes care of orchestration, crash recovery, and state management under the hood.
Bauplan exposes a Python SDK that lets you do everything with your lakehouse in code — from running pipelines to managing branches and tables. DBOS is just Python too: workflows are annotated Python functions and steps are where you perform I/O. Integrating the two is straightforward: import the Bauplan SDK inside DBOS steps and call Bauplan methods as you would in a standalone script. DBOS handles orchestration, durability, retries, and scheduling, while Bauplan executes the data work. The result is a clean split: DBOS runs the workflow, Bauplan runs the pipelines, the queries, and the lakehouse infrastructure.
Quickstart: run a Bauplan pipeline from a DBOS workflow
Goal: Call a Bauplan project from a DBOS workflow in the smallest possible way: authenticate, run, check status, and fail the workflow if the Bauplan job fails.
Prerequisites
- Python 3.10-3.12. Bauplan supports 3.10+ and DBOS supports Python 3.9+, so 3.10-3.12 is a good overlap.
- A Bauplan project (a folder containing your pipeline code and bauplan_project.yml).
- A Bauplan API key (via environment or passed to the client).
- DBOS installed.
If your pipeline writes data, run it on a branch (not on main). Branch names must be prefixed with your Bauplan username, for example alice.dbos_docs. Learn more about branches.
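If the branch does not exist yet, you can create it from Python before the first run. Here is a minimal sketch, assuming your bauplan SDK version exposes has_branch and create_branch (check the SDK reference for your version):
# create_branch.py (sketch)
import bauplan

client = bauplan.Client()  # picks up BAUPLAN_API_KEY from the environment
username = client.info().user.username
branch = f"{username}.dbos_docs"  # the branch name used throughout this guide

# Create the branch from main only if it does not already exist.
if not client.has_branch(branch):
    client.create_branch(branch, from_ref="main")
print(f"Using branch: {branch}")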
Minimal DBOS app
Define a DBOS step that runs your Bauplan project and a workflow that calls the step; the workflow fails if the Bauplan job fails. DBOS requires a one-time initializer when your app starts.
# dbos_bauplan_flow.py
import os
from typing import Any, Dict, Optional

import bauplan
from dbos import DBOS, DBOSConfig

# 1) Configure DBOS (uses SQLite by default; see "How to run it locally")
config: DBOSConfig = {
    "name": "bauplan-dbos-example",
    # For production, point to Postgres with DBOS_SYSTEM_DATABASE_URL
    "system_database_url": os.environ.get("DBOS_SYSTEM_DATABASE_URL"),
}
DBOS(config=config)

# 2) Define a DBOS step that performs I/O (calls Bauplan)
@DBOS.step()
def run_bauplan_step(
    project_dir: str,
    bauplan_branch_suffix: str,
    parameters: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """
    Execute a Bauplan project and return a minimal, serializable summary.

    Returns:
        dict with 'job_id' and 'job_status'.
    Raises:
        RuntimeError if the job did not finish with status 'success'.
    """
    client = bauplan.Client()  # build the client inside the step (I/O)
    username = client.info().user.username  # get username
    branch = f"{username}.{bauplan_branch_suffix}"  # construct branch name
    state = client.run(project_dir=project_dir, ref=branch, parameters=parameters or {})
    if str(state.job_status).lower() != "success":
        raise RuntimeError(
            f"Bauplan job {state.job_id} ended with status='{state.job_status}'"
        )
    return {"job_id": state.job_id, "job_status": state.job_status}

# 3) (Optional) A tiny reporting step
@DBOS.step()
def report_success(result: Dict[str, Any]) -> None:
    print(f"Bauplan run succeeded (job_id={result['job_id']}, status={result['job_status']})")

# 4) Define a DBOS workflow that calls the steps
@DBOS.workflow()
def bauplan_pipeline_workflow(
    pipeline_dir: str,
    bauplan_branch_suffix: str,
) -> Dict[str, Any]:
    result = run_bauplan_step(pipeline_dir, bauplan_branch_suffix)
    report_success(result)
    return result

if __name__ == "__main__":
    # Launch DBOS after all workflows and steps are registered.
    DBOS.launch()
    # Call the workflow directly for fast local debugging (runs in-process, durably).
    # You can also start it in the background with DBOS.start_workflow(...); see docs.
    bauplan_pipeline_workflow(
        pipeline_dir="your_bauplan_project",  # replace with the path to your Bauplan project
        bauplan_branch_suffix="dbos_docs",    # replace with your Bauplan branch suffix
    )
This uses DBOS’s Python API: define steps and workflows as plain Python functions with decorators, and run them in-process. If a workflow raises an exception, DBOS marks it as ERROR; if the process crashes, DBOS resumes from the last completed step.
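If you need to inspect a workflow after the fact, you can retrieve a handle by workflow ID. Below is a minimal sketch, assuming DBOS.retrieve_workflow and the WorkflowStatus fields from the DBOS Python API; the ID is a placeholder:
# Sketch: look up a workflow by ID, after DBOS(config=...) and DBOS.launch().
# "my-workflow-id" is a placeholder; real IDs come from start_workflow handles
# or from the DBOS CLI.
from dbos import DBOS

handle = DBOS.retrieve_workflow("my-workflow-id")
status = handle.get_status()
print(status.status)  # e.g. PENDING, SUCCESS, or ERROR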
How to run it locally
Install DBOS and the Bauplan SDK in a virtual environment:
python -m venv .venv && source .venv/bin/activate
pip install dbos bauplan
DBOS runs in-process and needs a database to store workflow state. By default it uses SQLite and requires no configuration. For production or multi-process execution, configure Postgres by setting DBOS_SYSTEM_DATABASE_URL.
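For example, a production-style configuration might look like the sketch below; the fallback connection string is a placeholder, not a real deployment value:
# Sketch: point DBOS at Postgres instead of the SQLite default.
import os
from dbos import DBOS, DBOSConfig

config: DBOSConfig = {
    "name": "bauplan-dbos-example",
    # Prefer the environment variable; the literal URL is only a placeholder.
    "system_database_url": os.environ.get(
        "DBOS_SYSTEM_DATABASE_URL",
        "postgresql://postgres:dbos@localhost:5432/dbos",
    ),
}
DBOS(config=config)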
Run your script in-process for fast iteration:
python dbos_bauplan_flow.py
This executes the DBOS workflow in a single process without any external scheduler or webserver, which is ideal for quick debugging. You can also start workflows in the background with DBOS.start_workflow(...) and query them later via the DBOS CLI or client.
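For example, here is a minimal sketch of the background pattern, reusing bauplan_pipeline_workflow from the app above:
# Sketch: start the workflow without blocking, then wait for its result.
# Run this after DBOS.launch(), inside the same app that defines the workflow.
handle = DBOS.start_workflow(
    bauplan_pipeline_workflow,
    "your_bauplan_project",  # pipeline_dir
    "dbos_docs",             # bauplan_branch_suffix
)
result = handle.get_result()  # blocks until the workflow finishes
print(result)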
Bauplan authentication
Bauplan looks for credentials in a clear precedence order:
- Environment variable: BAUPLAN_API_KEY
- Profile name: BAUPLAN_PROFILE (points to a saved profile)
- Config file: ~/.bauplan/config.yml
You can set a key directly in code, export it as an environment variable for scripts and CI/CD, or manage multiple profiles via the config file. Passing api_key explicitly will always override other sources.
import bauplan
client = bauplan.Client(api_key="YOUR_KEY")
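For scripts and CI/CD, a common pattern is to export the key and construct the client with no arguments; a minimal sketch:
# Sketch: rely on BAUPLAN_API_KEY from the environment (e.g. set in CI secrets).
import os
import bauplan

assert "BAUPLAN_API_KEY" in os.environ, "export BAUPLAN_API_KEY first"
client = bauplan.Client()  # resolves credentials via the precedence order above
print(client.info().user.username)  # quick sanity check that auth works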
Passing parameters (optional)
When running Bauplan with an orchestrator, parameters are the key to making your flows reusable and dynamic.
Instead of hardcoding values, parameters let you control your pipeline logic in Bauplan without changing the code: they show up in your SQL or Python models as templated values. Typical uses include date filters and row limits. This matters especially in scheduled environments, where orchestrators trigger the same job many times, so the same flow can run daily with different inputs. To learn how to set up parameters in Bauplan, see this page.
Suppose you have a Bauplan model that filters rows by a parameter start_time:
# bauplan_project.yml
project:
    id: 2b74eb5c-548f-4192-bf18-fcfa33a0746f
    name: test-pipeline
parameters:
    start_time:
        type: str
        default: "2022-12-15T00:00:00-05:00"
# models.py
import bauplan

@bauplan.model(materialization_strategy='REPLACE')
@bauplan.python('3.11', pip={'pandas': '2.2.0'})
def filtered_taxi_rides_parameters(
    data=bauplan.Model(
        "taxi_fhvhv",
        columns=['pickup_datetime'],
        filter="pickup_datetime >= $start_time"
    ),
):
    data = data.to_pandas()
    print(f"\nEarliest pickup in result: {data['pickup_datetime'].min()}")
    print(f"\nRows returned: {len(data)}\n")
    return data
You can set start_time when running the pipeline through DBOS:
# dbos_bauplan_with_params.py
import os
from typing import Any, Dict, Optional

import bauplan
from dbos import DBOS, DBOSConfig

config: DBOSConfig = {
    "name": "bauplan-dbos-params",
    "system_database_url": os.environ.get("DBOS_SYSTEM_DATABASE_URL"),
}
DBOS(config=config)

@DBOS.step()
def run_with_parameters_step(
    project_dir: str,
    bauplan_branch_suffix: str,
    parameters: Dict[str, Any],
) -> Dict[str, Any]:
    client = bauplan.Client()  # build the client inside the step
    username = client.info().user.username
    branch = f"{username}.{bauplan_branch_suffix}"
    state = client.run(project_dir=project_dir, ref=branch, parameters=parameters)
    if str(state.job_status).lower() != "success":
        raise RuntimeError(
            f"Bauplan job {state.job_id} ended with status='{state.job_status}'"
        )
    return {"job_id": state.job_id, "job_status": state.job_status}

@DBOS.workflow()
def run_with_parameters_workflow(
    project_dir: str,
    bauplan_branch_suffix: str,             # your own branch suffix
    parameters: Optional[Dict[str, Any]],   # your arbitrary parameters
) -> Dict[str, Any]:
    return run_with_parameters_step(project_dir, bauplan_branch_suffix, parameters or {})

if __name__ == "__main__":
    DBOS.launch()  # launch after all workflows and steps are registered
    run_with_parameters_workflow(
        project_dir="your_bauplan_project",   # replace with the path to your Bauplan project
        bauplan_branch_suffix="dbos_docs",    # replace with your Bauplan branch suffix
        parameters={"start_time": "2023-01-01T00:00:00-05:00"},
    )
This runs the Bauplan project on the given branch, applying start_time="2023-01-01T00:00:00-05:00" inside the model so only rides on or after 2023-01-01 are processed.
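To double-check, you can query the materialized table on the branch with the Bauplan SDK. Here is a minimal sketch, assuming the model above materialized filtered_taxi_rides_parameters on your branch (adjust the branch and SQL to your project):
# verify_run.py (sketch)
import bauplan

client = bauplan.Client()
username = client.info().user.username
branch = f"{username}.dbos_docs"  # the branch used in this guide

# client.query returns an Arrow table; the SQL here is illustrative.
table = client.query(
    "SELECT MIN(pickup_datetime) AS earliest_pickup FROM filtered_taxi_rides_parameters",
    ref=branch,
)
print(table.to_pandas())  # earliest pickup should be on or after 2023-01-01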