Detached Job Runs
In Bauplan you can run jobs in a detached mode. This means that the job will be submitted to the system and the client will return immediately, then the job can be followed up on later.
When to use:
- Production / scheduled pipelines - a scheduler (for example Airflow, Prefect) triggers a pipeline on a recurring cadence and does not need to hold a connection open for the entire run.
- Long-running jobs - large data imports or complex DAGs that may take minutes or hours to complete.
- Unreliable connectivity - you cannot guarantee a stable connection between your client and the Bauplan server for the duration of the job.
During interactive development, attached mode (bauplan run) gives you
real-time feedback. Once a pipeline moves to production, detached mode
is the recommended approach.
Running in Detached Mode
For regular runs:
- CLI
- SDK
$ bauplan run --detach
The job ID is printed to the console.
run_state = client.run(
project_dir='path/to/project',
ref='yourusername.your_branch',
detach=True,
)
job_id = run_state.job_id
The job ID is available on the returned state object via .job_id.
For data imports:
- CLI
- SDK
$ bauplan table import --search-uri s3://my-bucket/path --detach my_table
The job ID is printed to the console.
import_state = client.import_data(
table='my_table',
search_uri='s3://my-bucket/path',
branch='my_branch',
detach=True,
)
job_id = import_state.job_id
The job ID is available on the returned state object via .job_id.
Checking Job Status
- CLI
- SDK
Use job get to check a single job:
$ bauplan job get <YOUR_JOB_ID>
Job ID: <YOUR_JOB_ID>
Status: Running
Kind: CodeSnapshotRun
User: yourusername
Runner: bauplan-runner-dev
Created: 23 seconds ago
Use job ls to see recent jobs:
$ bauplan job ls
ID TYPE USER STATUS CREATED FINISHED
9f4b6b03 CodeSnapshotRun yourusername Completed 3 minutes ago 2 minutes ago
eeef1b40 CodeSnapshotRun yourusername Completed 3 minutes ago 2 minutes ago
0dd03881 CodeSnapshotRun yourusername Completed 3 minutes ago 2 minutes ago
72d0703e CodeSnapshotRun yourusername Aborted 2 hours ago 2 hours ago
1d0205ef CodeSnapshotRun yourusername Aborted 2 hours ago 2 hours ago
By default, this returns the 10 most recent jobs. Use --limit to
fetch more, or --created-after / --created-before to filter by date.
Use get_job to check a single job:
job = client.get_job(job_id)
print(f'Status: {job.status}')
Use get_jobs to list recent jobs with optional filters:
from datetime import datetime, timezone
for job in client.get_jobs(limit=10):
print(f'{job.id} {job.status}')
# Filter by date range
for job in client.get_jobs(
filter_by_created_after=datetime(2025, 1, 1, tzinfo=timezone.utc),
limit=5,
):
print(f'{job.id} {job.status}')
Retrieving Job Output
Logs are persisted for 24 hours as a way to troubleshoot jobs, after which they are deleted.
- CLI
- SDK
$ bauplan job logs <YOUR_JOB_ID>
This is the 0th time I'm very sleepy, and I've become exceedingly efficient at it
This is the 1st time I'm very sleepy, and I've become exceedingly efficient at it
This is the 2nd time I'm very sleepy, and I've become exceedingly efficient at it
logs = client.get_job_logs(job_id)
for log_line in logs:
print(f'[{log_line.stream}] {log_line.message}')
Cancel a Job
To cancel a running job, use the CLI or SDK. When a job is
successfully cancelled, its status changes to Aborted.
- CLI
- SDK
$ bauplan job stop <YOUR_JOB_ID>
Job <YOUR_JOB_ID> stopped
cancel_job polls
until the job reaches
JobState.ABORT
and then returns.
client.cancel_job(job_id)
Using the Python SDK
Both run() and import_data() methods support the detach parameter. The following snippet shows how the primitives above - launching a job, polling for status, and retrieving logs - can be combined programmatically in a single script.
import time
import bauplan
from bauplan.schema import JobState
def run_detached_job_and_poll() -> None:
client = bauplan.Client()
run_state = client.run(
project_dir='path/to/longrunning_project',
ref='yourusername.run_detached_job',
detach=True,
)
job_id = run_state.job_id
assert job_id is not None, 'Expected a job ID from detached run'
print(f'Job submitted in detach mode with ID={job_id}')
max_retries = 10
retry_count = 0
while retry_count < max_retries:
job = client.get_job(job_id)
print(f'Polling job={job.id}, state={job.status}')
if job.status in (JobState.COMPLETE, JobState.FAIL, JobState.ABORT):
print(f'Job finished with state={job.status}')
break
time.sleep(1)
retry_count += 1
else:
raise TimeoutError(f'Job polling exceeded {max_retries} retries')
logs = client.get_job_logs(job_id)
for log_line in logs:
print(f'[{log_line.stream}] {log_line.message}')
if __name__ == '__main__':
run_detached_job_and_poll()