Walkthrough: refs, commits, branches, and tags
When developing in Bauplan, you always start by creating a new development branch from a source branch, such as main:
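import bauplan
client = bauplan.Client()
# Keep the branch name in a variable: the rest of the walkthrough refers to it as my_branch
my_branch = 'ciro.feature_xyz'
client.create_branch(my_branch, from_ref='main')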
Since nothing has changed in your development branch yet, both branches point to the same commit. You can verify this with the get_commits API:
my_branch_last_commit = client.get_commits(my_branch, limit=1)[0]
source_branch_last_commit = client.get_commits('main', limit=1)[0]
assert my_branch_last_commit.ref.hash == source_branch_last_commit.ref.hash
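The same check can be wrapped in a small reusable helper. The sketch below assumes only the call shape shown above: get_commits(ref, limit=1) returns a non-empty list whose items expose ref.hash. The names head_hash and same_head are hypothetical and are not part of the Bauplan SDK.

```python
def head_hash(client, ref):
    """Return the hash of the latest commit on `ref`.

    Assumes client.get_commits(ref, limit=1) returns a non-empty list of
    objects exposing `.ref.hash`, as in the snippet above.
    """
    return client.get_commits(ref, limit=1)[0].ref.hash


def same_head(client, ref_a, ref_b):
    """Return True when both refs currently point to the same commit."""
    return head_hash(client, ref_a) == head_hash(client, ref_b)
```

With the client and branch from this walkthrough, same_head(client, my_branch, 'main') would be expected to return True until the first run lands on the branch.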
Now let’s run a pipeline on our branch. This pipeline materializes a table based on its parameter (so with run_id=1 the table will contain that value), and therefore generates a new commit:
run_1 = client.run(..., ref=my_branch, parameters={'run_id': 1})
The branch head has now changed:
my_branch_run_1_commit = client.get_commits(my_branch, limit=1)[0]
assert my_branch_run_1_commit.ref.hash != source_branch_last_commit.ref.hash
Crucially, each commit records the job that generated it. This means you can always trace back which run created what data:
job_id_in_the_commit = my_branch_run_1_commit.properties['bpln_job_id']
assert job_id_in_the_commit == run_1.job_id
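Going the other way, from a job to the commit it produced, is a matter of scanning a commit list for that property. This is a minimal sketch: find_commit_for_job is a hypothetical helper, and it assumes that properties behaves like a dict (supports .get), which the indexing above suggests but does not guarantee.

```python
def find_commit_for_job(commits, job_id, key='bpln_job_id'):
    """Return the first commit whose properties record `job_id`, else None.

    `commits` is any iterable of objects with a dict-like `.properties`.
    """
    for commit in commits:
        # .get() tolerates commits that carry no job id under `key`
        if commit.properties.get(key) == job_id:
            return commit
    return None
```

For example, find_commit_for_job(client.get_commits(my_branch, limit=20), run_1.job_id) would be expected to return the same commit as my_branch_run_1_commit.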
Let’s run the pipeline again with a different DAG parameter. This creates a new commit, since run_2 comes after run_1 on our branch:
run_2 = client.run(..., ref=my_branch, parameters={'run_id': 2})
Now if you query the materialized table, it will reflect the latest value:
rows = client.query(run_id_query, ref=my_branch).to_pylist()
assert rows == [{'run_id': 2}]
But you can just as easily query the previous state by passing the older commit (the my_branch_run_1_commit object we got just after run 1):
rows = client.query(run_id_query, ref=my_branch_run_1_commit.ref).to_pylist()
assert rows == [{'run_id': 1}]
Tags: giving names to important commits
To simplify navigation, you can tag specific commits. For instance, we might want to mark a dataset version that passed compliance checks:
tag_1_ref = client.create_tag(my_compliance_dataset_tag, my_branch_run_1_commit.ref)
Now you can use the tag as a permanent reference in your operations, for example when querying:
target_tag = client.get_tag(my_compliance_dataset_tag)
rows = client.query(run_id_query, ref=target_tag).to_pylist()
assert rows == [{'run_id': 1}]
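To see why a branch name, a tag, and a commit can all be passed as ref, it can help to think of branches as moving pointers and tags as fixed pointers to immutable commits. The toy model below is purely illustrative: ToyCatalog and its methods are hypothetical, hold everything in memory, and say nothing about how Bauplan is actually implemented.

```python
class ToyCatalog:
    """In-memory illustration of refs; not Bauplan's implementation."""

    def __init__(self):
        self.commits = {'c0': {}}       # commit hash -> table snapshot
        self.branches = {'main': 'c0'}  # branch name -> head commit (moves)
        self.tags = {}                  # tag name -> commit (stays fixed)

    def resolve(self, ref):
        """Map a branch name, tag name, or commit hash to a commit hash."""
        if ref in self.branches:
            return self.branches[ref]
        if ref in self.tags:
            return self.tags[ref]
        if ref in self.commits:
            return ref
        raise KeyError(ref)

    def create_branch(self, name, from_ref):
        self.branches[name] = self.resolve(from_ref)

    def create_tag(self, name, ref):
        self.tags[name] = self.resolve(ref)

    def write(self, branch, table, rows):
        """Add a commit on `branch` in which `table` is replaced by `rows`."""
        snapshot = dict(self.commits[self.branches[branch]])
        snapshot[table] = rows
        new_hash = f'c{len(self.commits)}'
        self.commits[new_hash] = snapshot
        self.branches[branch] = new_hash  # only the branch pointer moves
        return new_hash

    def read(self, table, ref):
        return self.commits[self.resolve(ref)][table]


catalog = ToyCatalog()
catalog.create_branch('dev', from_ref='main')
first = catalog.write('dev', 't', [{'run_id': 1}])
catalog.create_tag('compliant', first)
catalog.write('dev', 't', [{'run_id': 2}])

print(catalog.read('t', 'dev'))        # [{'run_id': 2}]  (branch head)
print(catalog.read('t', 'compliant'))  # [{'run_id': 1}]  (tagged commit)
print(catalog.read('t', first))        # [{'run_id': 1}]  (commit hash)
```

The second write moves the dev pointer but leaves the tag where it was, which mirrors the behavior of the tagged query above.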
Who did what?
Since every commit is tracked, you can also filter by author to audit recent changes:
my_author_commit_history = client.get_commits(my_branch, filter_by_author_name=full_name, limit=5)
Inspecting failed runs
All runs are transactional by default: a pipeline run either succeeds and its results land on the branch, or fails and the branch is left untouched. In practice, a temporary branch is created from the current branch and all the intermediate artifacts are materialized there. We can easily check that the target branch was not modified with our APIs:
failed_run = client.run(..., ref=my_branch, parameters={'something_that_fails_the_DAG': ...})
# This confirms the current branch still reflects the last successful run
assert client.query(run_id_query, ref=my_branch).to_pylist() == [{'run_id': 2}]
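The temporary-branch pattern described above can be sketched in a few lines of plain Python. This is a toy model under stated assumptions, not Bauplan's implementation: run_transactionally is a hypothetical function, a branch is represented as a dict of tables, and the "merge" is a simple swap.

```python
import copy


def run_transactionally(branches, branch, steps):
    """Apply `steps` to a temporary copy of `branch`; publish only on success.

    `branches` maps branch name -> {table name: rows}. Each step is a
    callable that mutates the dict it receives. Returns (ok, temp_state),
    so the artifacts of a failed run can still be inspected.
    """
    temp = copy.deepcopy(branches[branch])  # stands in for the temporary branch
    try:
        for step in steps:
            step(temp)
    except Exception:
        return False, temp                  # target branch is left untouched
    branches[branch] = temp                 # "merge" by swapping in the new state
    return True, temp


def write_run_2(tables):
    tables['t'] = [{'run_id': 2}]


def write_partial(tables):
    tables['staging'] = [{'partial': True}]


def explode(tables):
    raise RuntimeError('simulated failure')


branches = {'dev': {'t': [{'run_id': 1}]}}
ok, _ = run_transactionally(branches, 'dev', [write_run_2])
failed_ok, leftovers = run_transactionally(branches, 'dev', [write_partial, explode])

print(ok, failed_ok)             # True False
print(branches['dev'])           # {'t': [{'run_id': 2}]}
print('staging' in leftovers)    # True
```

The failed run leaves its intermediate table in the temporary state only, so it remains available for debugging without ever becoming visible on the target branch.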
You can now inspect the failed run’s job metadata, logs, and any intermediate tables that were materialized:
# failed_run.job_id identifies the failed run, just as run_1.job_id did earlier
logs = client.get_job_logs(failed_run.job_id)
for log_line in logs:
    print(f'[{log_line.stream.name}] {log_line.message}')
Reverting back in time
Suppose we now decide that the original version, the one tagged as “compliant”, is the one we want. We can revert the table to that state with a single API call:
revert_ref = client.revert_table(
    table=my_test_table_name,
    source_ref=target_tag,
    into_branch=my_branch,
    # this text will be added to the standard commit body for clarity
    commit_body=f'Revert to tag {my_compliance_dataset_tag}',
    replace=True,
)
You can confirm it worked by querying the table again and checking that we got the table from the first run back:
rows = client.query(run_id_query, ref=my_branch).to_pylist()
assert rows == [{'run_id': 1}]
Just like that, we’ve reverted to a known-good state!