Dagster

Dagster is a modern orchestrator designed specifically for data applications. Instead of just scheduling tasks, it treats pipelines as first-class objects, with strong typing, assets, and metadata built in. This makes it attractive for teams who want observability and reproducibility beyond traditional DAGs.

Bauplan exposes a Python SDK that lets you do everything with your lakehouse in code — from running pipelines to managing branches and tables. Dagster, like Bauplan, is just Python: jobs and ops are regular functions decorated with Dagster’s APIs. Integrating the two is straightforward: import the Bauplan SDK inside your Dagster ops and call Bauplan methods as you would in a standalone script. Dagster handles orchestration, logging, and scheduling, while Bauplan executes the data work. The result is a clean split: Dagster runs the workflow, Bauplan runs the pipelines, the queries, and the lakehouse infrastructure.

Quickstart: run a Bauplan pipeline from a Dagster job

Goal: Call a Bauplan project from a Dagster job in the smallest possible way: authenticate, run, check status, and fail the job if the Bauplan run fails.

Prerequisites

  • Python 3.10–3.13 (Dagster officially supports 3.9–3.13; Bauplan supports 3.10 and above).
  • A Bauplan project (a folder containing your pipeline code and bauplan_project.yml).
  • A Bauplan API key (via environment or passed to the client).
  • Dagster installed from PyPI. See Dagster’s install and quickstart docs for details.
note

If your pipeline writes data, run it on a branch (not on main). Branch names must be prefixed with your Bauplan username, for example alice.dev_branch. Learn more about branches.
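
If the branch does not exist yet, you can create it from Python before the run. A minimal sketch, assuming the SDK’s has_branch and create_branch methods and a hypothetical dev_branch suffix:

import bauplan

client = bauplan.Client()
username = client.info().user.username
branch = f"{username}.dev_branch"  # hypothetical branch name for illustration

# create the branch off main if it is missing (assumed SDK methods)
if not client.has_branch(branch):
    client.create_branch(branch, from_ref="main")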

Minimal job

Define a Dagster job that runs your Bauplan project and fails the run if the Bauplan job fails.

# bauplan_dagster_flow.py
import dagster as dg
import bauplan


@dg.op(
    name="run_bauplan_pipeline",
    config_schema={"project_dir": str, "bauplan_branch_suffix": str},
)
def run_pipeline(context) -> dict:
    """
    Execute a Bauplan project and return a minimal, serializable summary.

    Returns:
        dict with 'job_id' and 'job_status'.
    Raises:
        dg.Failure if the job did not finish with status 'success'.
    """
    client = bauplan.Client()  # build the client inside the op

    username = client.info().user.username  # get the Bauplan username
    cfg = context.op_config
    branch = f"{username}.{cfg['bauplan_branch_suffix']}"  # construct the branch name

    state = client.run(project_dir=cfg["project_dir"], ref=branch)

    if str(state.job_status).lower() != "success":
        raise dg.Failure(
            description=f"Bauplan job {state.job_id} ended with status='{state.job_status}'"
        )

    # return only simple types so Dagster can store results safely
    return {"job_id": state.job_id, "job_status": state.job_status}


@dg.op(name="report_success")
def report_success(context, result: dict) -> None:
    context.log.info(
        f"Bauplan run succeeded (job_id={result['job_id']}, status={result['job_status']})"
    )


@dg.job(name="bauplan_pipeline_run")
def main_job():
    report_success(run_pipeline())


# Optional: make jobs discoverable by `dagster dev -m bauplan_dagster_flow`
defs = dg.Definitions(jobs=[main_job])


if __name__ == "__main__":
    # Fast local debugging: run in a single process
    result = main_job.execute_in_process(
        run_config={
            "ops": {
                "run_bauplan_pipeline": {
                    "config": {
                        "project_dir": "your_bauplan_project",  # replace with the path to your Bauplan project
                        "bauplan_branch_suffix": "dagster_docs",  # replace with your branch suffix
                    }
                }
            }
        }
    )
    assert result.success

execute_in_process() runs the job synchronously in one process, which is ideal for quick debugging. The Dagster docs recommend this API for unit-style execution, while the UI/daemon-driven flow is used for orchestration.

How to run it locally

python -m venv .venv && source .venv/bin/activate
pip install dagster dagster-webserver bauplan

Run Dagster’s UI and daemon for a lightweight local environment:

dagster dev -m bauplan_dagster_flow

This launches the web UI and daemon together, usually on port 3000, and loads the jobs from the specified module.

Alternatively, run in-process with execute_in_process() for fast iteration. Create a Python file named bauplan_dagster_flow.py, paste the code above into it, and run it from another terminal window:

python bauplan_dagster_flow.py

This executes the job directly from Python without starting the daemon or webserver, which is ideal for quick debugging. See Dagster’s API and execution guides for details on programmatic runs and run configuration.
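
To move from manual runs to orchestration, the same job can be attached to a schedule and picked up by the daemon that dagster dev starts. A minimal sketch using Dagster’s ScheduleDefinition; the cron string and config values here are placeholders:

# sketch: run main_job on a daily cron via the Dagster daemon
daily_schedule = dg.ScheduleDefinition(
    job=main_job,
    cron_schedule="0 6 * * *",  # placeholder: every day at 06:00
    run_config={
        "ops": {
            "run_bauplan_pipeline": {
                "config": {
                    "project_dir": "your_bauplan_project",
                    "bauplan_branch_suffix": "dagster_docs",
                }
            }
        }
    },
)

# replace the Definitions line above so the schedule is discoverable too
defs = dg.Definitions(jobs=[main_job], schedules=[daily_schedule])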

Bauplan authentication

Bauplan looks for credentials in a clear precedence order:

  1. Environment variable - BAUPLAN_API_KEY
  2. Profile name - BAUPLAN_PROFILE (points to a saved profile)
  3. Config file - ~/.bauplan/config.yml

You can set a key directly in code, export it as an environment variable for scripts and CI/CD, or manage multiple profiles via the config file. Passing api_key explicitly will always override other sources.

import bauplan
client = bauplan.Client(api_key="YOUR_KEY")
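
For scripts and CI/CD, export the key instead so no secret appears in code; the client resolves it from the environment:

export BAUPLAN_API_KEY="YOUR_KEY"
python bauplan_dagster_flow.py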

Passing parameters (optional)

Parameters let you control your pipeline logic in Bauplan without changing the code. They show up in your SQL or Python models as templated values. Typical uses: date filters, limits, or toggling options. Learn how to set up parameters in Bauplan.

Suppose you have a Bauplan model that filters rows by a parameter start_time:

# bauplan_project.yml
project:
  id: 2b74eb5c-548f-4192-bf18-fcfa33a0746f
  name: test-pipeline
parameters:
  start_time:
    type: str
    default: "2022-12-15T00:00:00-05:00"
# models.py
import bauplan


@bauplan.model(materialization_strategy='REPLACE')
@bauplan.python('3.11', pip={'pandas': '2.2.0'})
def filtered_taxi_rides_parameters(
    data=bauplan.Model(
        "taxi_fhvhv",
        columns=['pickup_datetime'],
        filter="pickup_datetime >= $start_time",
    ),
):
    data = data.to_pandas()

    print(f"\nEarliest pickup in result: {data['pickup_datetime'].min()}")
    print(f"\nRows returned: {len(data)}\n")

    return data

Run the pipeline through Dagster with a parameterized op:

# bauplan_dagster_with_params.py
import dagster as dg
import bauplan


@dg.op(
    name="run_bauplan_with_parameters",
    config_schema={
        "project_dir": str,
        "bauplan_branch_suffix": str,
        "parameters": dict,  # arbitrary parameters for your Bauplan project
    },
)
def run_with_parameters(context) -> dict:
    client = bauplan.Client()  # build the client inside the op

    username = client.info().user.username
    cfg = context.op_config
    branch = f"{username}.{cfg['bauplan_branch_suffix']}"

    state = client.run(
        project_dir=cfg["project_dir"],
        ref=branch,
        parameters=cfg["parameters"],
    )

    if str(state.job_status).lower() != "success":
        raise dg.Failure(
            description=f"Bauplan job {state.job_id} ended with status='{state.job_status}'"
        )

    return {"job_id": state.job_id, "job_status": state.job_status}


@dg.job(name="bauplan_pipeline_run_with_parameters")
def run_with_parameters_job():
    run_with_parameters()


defs = dg.Definitions(jobs=[run_with_parameters_job])


if __name__ == "__main__":
    run_config = {
        "ops": {
            "run_bauplan_with_parameters": {
                "config": {
                    "project_dir": "your_bauplan_project",  # replace with the path to your Bauplan project
                    "bauplan_branch_suffix": "your_branch",  # replace with your branch suffix
                    "parameters": {"start_time": "2023-01-01T00:00:00-05:00"},
                }
            }
        }
    }
    result = run_with_parameters_job.execute_in_process(run_config=run_config)
    assert result.success

This runs the Bauplan project on the given branch, applying start_time="2023-01-01T00:00:00-05:00" inside the model so only rides on or after 2023-01-01 are processed. Use Dagster’s run config mechanism to supply op config programmatically or via the UI.
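
If you launch the job from the Dagster UI instead, the same op config can be supplied in the Launchpad as YAML; a sketch mirroring the run_config above:

ops:
  run_bauplan_with_parameters:
    config:
      project_dir: your_bauplan_project
      bauplan_branch_suffix: your_branch
      parameters:
        start_time: "2023-01-01T00:00:00-05:00"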