Temporal

Bauplan exposes a Python SDK that lets you do everything with your lakehouse in code — from running pipelines to managing branches and tables. Temporal is just Python too: you write Workflows and Activities as regular Python functions using the Temporal Python SDK decorators. Integrating the two is straightforward: import the Bauplan SDK inside a Temporal Activity and call Bauplan methods as you would in a standalone script. Temporal handles orchestration, durability, logging, and scheduling, while Bauplan executes the data work. The result is a clean split: Temporal runs the workflow; Bauplan runs the pipelines and queries and manages the lakehouse infrastructure.

Quickstart: run a Bauplan pipeline from a Temporal Workflow

Goal: Call a Bauplan project from a Temporal Workflow in the smallest possible way: authenticate, run, check status, and fail the Workflow if the Bauplan job fails.

Prerequisites

  • Python 3.10-3.12 (Bauplan supports 3.10+; the Temporal Python SDK supports modern Python 3.x and is distributed on PyPI as temporalio).
  • A Bauplan project (a folder containing your pipeline code and bauplan_project.yml).
  • A Bauplan API key (via environment or passed to the client).
  • Temporal installed for local development:
    • Install the Python SDK: pip install temporalio.
    • Install the Temporal CLI and start the dev server with temporal server start-dev. This also launches the Web UI at http://localhost:8233.
note

If your pipeline writes data, run it on a branch (not on main). Branch names must be prefixed with your Bauplan username, for example alice.temporal_docs. Learn more.
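
If the branch does not exist yet, you can create it from Python before the run. A minimal sketch, assuming the Bauplan SDK's has_branch and create_branch helpers and a hypothetical temporal_docs suffix:

import bauplan

client = bauplan.Client()
username = client.info().user.username
branch = f"{username}.temporal_docs"  # example branch name for this guide

# Create the branch off main if it is not there yet
# (has_branch/create_branch are assumed from the Bauplan SDK)
if not client.has_branch(branch):
    client.create_branch(branch, from_ref="main")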

Minimal workflow

Define a Temporal Activity that runs your Bauplan project, a Workflow that calls it, and fail fast if the Bauplan job fails. Temporal requires a Worker polling a Task Queue for your Workflow and Activities. In the Workflow, specify an Activity timeout (Temporal requires at least start_to_close_timeout or schedule_to_close_timeout).

# workflow_and_worker.py
import asyncio
from datetime import timedelta
from uuid import uuid4
from concurrent.futures import ThreadPoolExecutor

from temporalio import workflow
from temporalio.client import Client
from temporalio.worker import Worker

# Import activities as pass-through so the workflow sandbox doesn't re-import bauplan
with workflow.unsafe.imports_passed_through():
    from activities import run_bauplan_activity, report_success_activity


@workflow.defn
class BauplanWorkflow:
    @workflow.run
    async def run(self, pipeline_dir: str, bauplan_branch_suffix: str) -> dict:
        """
        Minimal Temporal → Bauplan integration: run a project and fail fast on error.
        """
        result = await workflow.execute_activity(
            run_bauplan_activity,
            args=[pipeline_dir, bauplan_branch_suffix],
            schedule_to_close_timeout=timedelta(minutes=60),  # required by Temporal
        )
        await workflow.execute_activity(
            report_success_activity,
            args=[result],
            schedule_to_close_timeout=timedelta(seconds=30),
        )
        return result


async def main():
    # Connect a client to the local Temporal dev server
    client = await Client.connect("localhost:7233")
    task_queue = "bauplan-pipeline-run"

    # Start a worker that hosts the workflow and activities
    worker = Worker(
        client,
        task_queue=task_queue,
        workflows=[BauplanWorkflow],
        activities=[run_bauplan_activity, report_success_activity],
        # Activities are sync (they call Bauplan); run them in a thread pool
        activity_executor=ThreadPoolExecutor(max_workers=8),
    )

    # Run the worker and start a workflow in the same process for fast local debugging
    async with worker:
        handle = await client.start_workflow(
            BauplanWorkflow.run,
            id=f"bauplan-{uuid4()}",
            task_queue=task_queue,
            args=[
                "your_bauplan_project",  # replace with the path to your Bauplan project
                "temporal_docs",         # replace with your branch suffix
            ],
        )
        print("Workflow result:", await handle.result())


if __name__ == "__main__":
    asyncio.run(main())

# activities.py
from temporalio import activity
from pathlib import Path


@activity.defn
def run_bauplan_activity(project_dir: str, bauplan_branch_suffix: str) -> dict:
    # Import inside the activity so it never touches the workflow sandbox
    import bauplan

    project_path = Path(project_dir).expanduser().resolve()
    if not project_path.exists():
        raise RuntimeError(f"Project path does not exist: {project_path}")

    client = bauplan.Client()  # credentials: BAUPLAN_API_KEY / BAUPLAN_PROFILE / ~/.bauplan/config.yml
    username = client.info().user.username
    branch = f"{username}.{bauplan_branch_suffix}"

    state = client.run(project_dir=str(project_path), ref=branch)
    if str(state.job_status).lower() != "success":
        raise RuntimeError(f"Bauplan job {state.job_id} ended with status='{state.job_status}'")

    return {"job_id": state.job_id, "job_status": state.job_status}


@activity.defn
def report_success_activity(result: dict) -> None:
    print(f"Bauplan run succeeded (job_id={result['job_id']}, status={result['job_status']})")

This uses Temporal’s Python SDK to define Workflows and Activities as plain Python functions, runs a Worker, and starts the Workflow in-process for fast local debugging. The Activity timeout on execute_activity is required and ensures correct retry/failure semantics. The imports_passed_through() block prevents the workflow sandbox from importing bauplan (which would violate determinism).

How to run it locally

Install and start Temporal dev server in one terminal (install both the Python SDK and the CLI):

python -m venv .venv && source .venv/bin/activate
pip install temporalio bauplan
brew install temporal
temporal server start-dev

This gives you the Temporal Web UI at http://localhost:8233 and a local gRPC endpoint at localhost:7233.

Run the worker and workflow in another terminal (same venv):

python workflow_and_worker.py

This starts a Worker and immediately kicks off a BauplanWorkflow run with your arguments. Use an absolute project path. If you change code, restart this Python process and start a new workflow execution.
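
If you prefer to keep the Worker process running and trigger executions from a separate script, a minimal starter might look like this. It is a sketch: the file name start_workflow.py is illustrative, and it assumes the Worker from workflow_and_worker.py is already polling the bauplan-pipeline-run task queue:

# start_workflow.py (illustrative helper, not part of the snippets above)
import asyncio
from uuid import uuid4

from temporalio.client import Client


async def main():
    # Connect to the same dev server the worker uses
    client = await Client.connect("localhost:7233")
    result = await client.execute_workflow(
        "BauplanWorkflow",                  # the workflow can be referenced by name
        args=["/abs/path/to/your_bauplan_project", "temporal_docs"],
        id=f"bauplan-{uuid4()}",
        task_queue="bauplan-pipeline-run",  # must match the worker's task queue
    )
    print("Workflow result:", result)


if __name__ == "__main__":
    asyncio.run(main())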

Small tips

Retries in dev. To avoid noisy auto-retries while iterating, set a no-retry policy in the workflow call:

from temporalio.common import RetryPolicy

result = await workflow.execute_activity(
    run_bauplan_activity,
    args=[pipeline_dir, bauplan_branch_suffix],
    schedule_to_close_timeout=timedelta(minutes=10),
    retry_policy=RetryPolicy(maximum_attempts=1),
)
  • Sandbox rule of thumb. Never import bauplan at workflow module top-level. Keep it inside activities (or in an activities.py imported under imports_passed_through()).
  • Dev loop. Leave the dev server running. Restart only your Python worker when code changes. Start a fresh workflow execution each run (the snippet uses a new UUID each time).

Bauplan authentication

Bauplan looks for credentials in a clear precedence order:

  1. Environment variable: BAUPLAN_API_KEY
  2. Profile name: BAUPLAN_PROFILE (points to a saved profile)
  3. Config file: ~/.bauplan/config.yml

You can set a key directly in code, export it as an environment variable for scripts and CI/CD, or manage multiple profiles via the config file. Passing api_key explicitly will always override other sources.

import bauplan
client = bauplan.Client(api_key="YOUR_KEY")
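
For scripts and CI/CD, the same client picks the key up from the environment, so nothing needs to be hard-coded. A sketch:

import os
import bauplan

# Assumes BAUPLAN_API_KEY was exported in the shell or set by your CI system,
# e.g. export BAUPLAN_API_KEY=YOUR_KEY
assert "BAUPLAN_API_KEY" in os.environ, "set BAUPLAN_API_KEY before running"
client = bauplan.Client()  # no api_key argument: resolved from the environment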

Passing parameters (optional)

Parameters let you control your pipeline logic in Bauplan without changing the code. They show up in your SQL or Python models as templated values. Typical uses: date filters, limits, or toggling options.

Learn how to set up parameters in Bauplan.

Suppose you have a Bauplan model that filters rows by a parameter start_time:

# bauplan_project.yml
project:
    id: 2b74eb5c-548f-4192-bf18-fcfa33a0746f
    name: test-pipeline
parameters:
    start_time:
        type: str
        default: "2022-12-15T00:00:00-05:00"
# models.py
import bauplan


@bauplan.model(materialization_strategy='REPLACE')
@bauplan.python('3.11', pip={'pandas': '2.2.0'})
def filtered_taxi_rides_parameters(
    data=bauplan.Model(
        "taxi_fhvhv",
        columns=['pickup_datetime'],
        filter="pickup_datetime >= $start_time"
    ),
):
    data = data.to_pandas()

    print(f"\nEarliest pickup in result: {data['pickup_datetime'].min()}")
    print(f"\nRows returned: {len(data)}\n")

    return data

You can set the start_time when running the pipeline through Temporal.

# activities_with_param.py
from temporalio import activity
from typing import Dict, Any


@activity.defn
def run_with_parameters_activity(
    project_dir: str,
    bauplan_branch_suffix: str,
    parameters: Dict[str, Any],
) -> dict:
    # Import inside the activity so it never touches the workflow sandbox
    import bauplan

    client = bauplan.Client()
    username = client.info().user.username
    branch = f"{username}.{bauplan_branch_suffix}"

    state = client.run(project_dir=project_dir, ref=branch, parameters=parameters)

    if str(state.job_status).lower() != "success":
        raise RuntimeError(
            f"Bauplan job {state.job_id} ended with status='{state.job_status}'"
        )

    return {"job_id": state.job_id, "job_status": state.job_status}

# workflow_and_worker_with_param.py
import asyncio
from datetime import timedelta
from uuid import uuid4
from typing import Dict, Any

from temporalio import workflow
from temporalio.client import Client
from temporalio.worker import Worker
from concurrent.futures import ThreadPoolExecutor

# Import the activity via passthrough so the workflow sandbox doesn't re-import bauplan
with workflow.unsafe.imports_passed_through():
    from activities_with_param import run_with_parameters_activity


@workflow.defn
class RunWithParametersWorkflow:
    @workflow.run
    async def run(
        self,
        project_dir: str,
        bauplan_branch_suffix: str,
        parameters: Dict[str, Any],
    ) -> dict:
        return await workflow.execute_activity(
            run_with_parameters_activity,
            args=[project_dir, bauplan_branch_suffix, parameters],
            schedule_to_close_timeout=timedelta(minutes=60),
        )


async def main():
    client = await Client.connect("localhost:7233")
    task_queue = "bauplan-parameters"

    worker = Worker(
        client,
        task_queue=task_queue,
        workflows=[RunWithParametersWorkflow],
        activities=[run_with_parameters_activity],
        activity_executor=ThreadPoolExecutor(max_workers=8),  # needed since the activity is sync
    )

    async with worker:
        handle = await client.start_workflow(
            RunWithParametersWorkflow.run,
            id=f"bauplan-params-{uuid4()}",
            task_queue=task_queue,
            args=[
                "your_bauplan_project",  # replace with the path to your Bauplan project
                "temporal_docs",         # replace with your branch suffix
                {"start_time": "2023-01-01T00:00:00-05:00"},
            ],
        )
        result = await handle.result()
        print("Workflow result:", result)


if __name__ == "__main__":
    asyncio.run(main())

This will run the Bauplan project on the given branch, applying start_time="2023-01-01T00:00:00-05:00" inside the model so only rides on or after 2023-01-01 are processed.
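
To confirm the parameter was applied, you can query the materialized table on the same branch after the workflow completes. A sketch, assuming the Bauplan SDK's query method and the model name from the example above:

import bauplan

client = bauplan.Client()
username = client.info().user.username
branch = f"{username}.temporal_docs"  # same branch suffix used in the workflow

# query() is assumed to return an Arrow table we can inspect
table = client.query(
    "SELECT MIN(pickup_datetime) AS earliest_pickup FROM filtered_taxi_rides_parameters",
    ref=branch,
)
print(table.to_pandas())  # earliest_pickup should be on or after 2023-01-01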