bauplan


class Client

A client for the Bauplan API.

bauplan.Client(
    profile: str | None = None,
    api_key: str | None = None,
    client_timeout: int | None = None,
    config_file_path: str | None = None,
)

Parameters

Using the client

import bauplan
client = bauplan.Client()

# query the table and return result set as an arrow Table
my_table = client.query('SELECT avg(Age) AS average_age FROM bauplan.titanic limit 1', ref='main')

# efficiently cast the table to a pandas DataFrame
df = my_table.to_pandas()

Notes on authentication

# by default, authenticate from BAUPLAN_API_KEY >> BAUPLAN_PROFILE >> ~/.bauplan/config.yml
client = bauplan.Client()
# client used ~/.bauplan/config.yml profile 'default'

import os
os.environ['BAUPLAN_PROFILE'] = "someprofile"
client = bauplan.Client()
# >> client now uses profile 'someprofile'

os.environ['BAUPLAN_API_KEY'] = "mykey"
client = bauplan.Client()
# >> client now authenticates with api_key value "mykey", because api key > profile

# specify authentication directly - this supersedes BAUPLAN_API_KEY in the environment
client = bauplan.Client(api_key='MY_KEY')

# specify a profile from ~/.bauplan/config.yml - this supersedes BAUPLAN_PROFILE in the environment
client = bauplan.Client(profile='default')
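
The precedence described in the comments above can be modeled as a small pure function. This is only a toy model for reasoning about the rules, not the client's actual code: `resolve_auth` is a hypothetical helper, and the way an explicit `profile` argument interacts with a `BAUPLAN_API_KEY` in the environment is not stated in the text, so the sketch assumes an API key from any source still wins.

```python
from typing import Mapping, Optional, Tuple


def resolve_auth(
    api_key: Optional[str] = None,
    profile: Optional[str] = None,
    env: Optional[Mapping[str, str]] = None,
) -> Tuple[str, str]:
    """Return ("api_key", value) or ("profile", name) (hypothetical helper)."""
    env = env if env is not None else {}
    # Explicit arguments supersede the matching environment variables.
    effective_key = api_key or env.get("BAUPLAN_API_KEY")
    effective_profile = profile or env.get("BAUPLAN_PROFILE")
    # Assumption: an API key from any source wins over a profile.
    if effective_key:
        return ("api_key", effective_key)
    # With nothing else set, fall back to the 'default' profile.
    return ("profile", effective_profile or "default")
```

Passing the environment in as a plain mapping keeps the function easy to test without touching `os.environ`.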

Handling Exceptions

Catalog operations (branch/table methods) raise a subclass of bauplan.exceptions.BauplanError that mirrors HTTP status codes.

Run/Query/Scan/Import operations raise a subclass of bauplan.exceptions.BauplanError that represents the error, and also return a bauplan.state.RunState object containing details and logs.

Run/import operations also return a state object that includes a job_status and other details. There are two ways to check status for run/import operations:

  1. try/except bauplan.exceptions.BauplanJobError
  2. check the state.job_status attribute
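
The second option can be wrapped in a small helper that turns a non-successful state into an exception. This is a minimal sketch under stated assumptions: `raise_if_failed` and `JobFailed` are hypothetical names that are not part of the bauplan package, the state is any object exposing `job_status`, `job_id` and `error`, and success is assumed to compare equal to "success" case-insensitively.

```python
from types import SimpleNamespace


class JobFailed(RuntimeError):
    """Hypothetical local error type; not part of the bauplan package."""


def raise_if_failed(state) -> None:
    """Raise JobFailed unless the state's job_status reads as success."""
    # Assumption: str(job_status) equals "success" (any case) on success.
    status = str(getattr(state, "job_status", None)).lower()
    if status != "success":
        job_id = getattr(state, "job_id", "?")
        error = getattr(state, "error", None)
        raise JobFailed(f"job {job_id} ended as {status}: {error}")


# A stand-in for a returned state object, used here only for illustration.
ok_state = SimpleNamespace(job_id="j1", job_status="SUCCESS", error=None)
raise_if_failed(ok_state)  # returns without raising
```

With a real client, the same helper would be called on the object returned by a run or import operation.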

Logging

You can use Python's standard logging module to capture logs from the client:

import logging

import bauplan

# Enable debug logs from the Bauplan client.
logging.basicConfig()
logging.getLogger("bauplan").setLevel(logging.DEBUG)

client = bauplan.Client()
client.query("SELECT count(*) FROM titanic", ref="main")
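
If debug output is only wanted around a few calls, the level change can be scoped with a context manager. This is a sketch using only the standard library; `bauplan_debug_logs` is a hypothetical helper name, and the only thing taken from the text above is the "bauplan" logger name.

```python
import logging
from contextlib import contextmanager


@contextmanager
def bauplan_debug_logs(logger_name: str = "bauplan"):
    """Temporarily set the named logger to DEBUG, then restore its level."""
    logger = logging.getLogger(logger_name)
    previous_level = logger.level
    logger.setLevel(logging.DEBUG)
    try:
        yield logger
    finally:
        # Restore the previous level even if the wrapped code raises.
        logger.setLevel(previous_level)
```

A handler still has to be configured (for example with `logging.basicConfig()`) for the records to be printed; client calls would then go inside a `with bauplan_debug_logs():` block.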

Examples

state = client.run(...)
if state.job_status != "SUCCESS":
    ...

def apply_table_creation_plan (...)

Apply a plan for creating a table. The plan is applied automatically during table plan creation if no schema conflicts exist. If schema conflicts exist, use this function to apply the plan after the conflicts have been resolved. The most common schema conflict is two Parquet files with the same column name but different data types.

Returns:

A bauplan.state.TableCreatePlanApplyState object.

def apply_table_creation_plan(
    plan: TableCreatePlanState | str,
    *
    args: dict[str, str] | None = None,
    priority: int | None = None,
    client_timeout: int | None = None,
) -> TableCreatePlanApplyState: ...

Parameters

Raises

Example

import bauplan
client = bauplan.Client()

plan_state = client.plan_table_creation(
    table='my_table',
    search_uri='s3://my-bucket/data/*.parquet',
    branch='main',
    namespace='my_namespace',
)
if plan_state.error:
    raise Exception(f"Planning failed: {plan_state.error}")

if plan_state.can_auto_apply:
    # No schema conflicts: the table was already created automatically
    print("Table created automatically (no conflicts)")
else:
    # Schema conflicts detected (e.g. same column name, different types across files).
    # Inspect and resolve the plan YAML, then apply manually.
    print(plan_state.plan)  # review/edit the schema plan
    apply_state = client.apply_table_creation_plan(
        plan=plan_state,
        priority=5,
        client_timeout=30,
    )
    if apply_state.error:
        raise Exception(f"Apply failed: {apply_state.error}")
    print(f"Table created after conflict resolution: {apply_state.job_status}")
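
To make the notion of a schema conflict concrete, the following sketch finds columns that appear with more than one data type across several file schemas. It is an illustration only and not how Bauplan detects conflicts: `find_schema_conflicts` is a hypothetical helper, and each schema is assumed to be a plain mapping from column name to a type name.

```python
from collections import defaultdict
from typing import Dict, List, Set


def find_schema_conflicts(schemas: List[Dict[str, str]]) -> Dict[str, Set[str]]:
    """Return the columns that occur with more than one data type."""
    types_by_column: Dict[str, Set[str]] = defaultdict(set)
    for schema in schemas:
        for column, dtype in schema.items():
            types_by_column[column].add(dtype)
    # Keep only the columns whose type differs between files.
    return {col: types for col, types in types_by_column.items() if len(types) > 1}


# Two hypothetical Parquet file schemas that disagree on the type of "price".
file_a = {"id": "int64", "price": "double"}
file_b = {"id": "int64", "price": "string"}
conflicts = find_schema_conflicts([file_a, file_b])
```

Here `conflicts` contains only the `price` column, which is the kind of mismatch that has to be resolved in the plan before it can be applied.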

def cancel_job (...)

EXPERIMENTAL: Cancel a job by ID.

def cancel_job(
    job_id: str,
) -> None: ...

Parameters

Example

import bauplan
client = bauplan.Client()

# Kick off a pipeline without blocking, then cancel it if it's taking too long
state = client.run(
    project_dir='./my_pipeline',
    ref='username.dev_branch',
    detach=True,
)
if state.job_id:
    client.cancel_job(state.job_id)

def create_branch (...)

Create a new branch at a given ref. The branch name should follow the convention of username.branch_name, otherwise non-admin users won't be able to complete the operation.

Returns:

The created bauplan.schema.Branch object.

def create_branch(
    branch: str | Branch,
    from_ref: str | Ref,
    *
    if_not_exists: bool = False,
) -> Branch: ...

Parameters

Raises

Example

import bauplan

client = bauplan.Client()
user = client.info().user
assert user is not None
username = user.username

branch = client.create_branch(
    branch=username + '.feature_branch',
    from_ref='branch_name@abcd1234',
    if_not_exists=True,
)
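
The username.branch_name convention can be enforced with a small helper before calling create_branch. This is a sketch; `qualified_branch_name` is a hypothetical function, not part of the SDK.

```python
def qualified_branch_name(username: str, name: str) -> str:
    """Return the branch name prefixed with 'username.' unless already prefixed."""
    if not username or not name:
        raise ValueError("both username and branch name are required")
    prefix = f"{username}."
    # Leave names that already follow the convention untouched.
    return name if name.startswith(prefix) else prefix + name
```

For example, `qualified_branch_name('alice', 'feature_branch')` gives `'alice.feature_branch'`, which can then be passed as the `branch` argument.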

def create_external_table_from_metadata (...)

Create an external table from an Iceberg metadata.json file.

Returns:

The registered bauplan.schema.Table with full metadata.

def create_external_table_from_metadata(
    table: str | Table,
    metadata_json_uri: str,
    *
    namespace: str | Namespace,
    branch: str | Branch | None = None,
    overwrite: bool = False,
) -> Table: ...

Parameters

Raises

This operation creates an external table by pointing to an existing Iceberg table's metadata.json file. This is useful for importing external Iceberg tables into Bauplan without copying the data.

Example

import bauplan
client = bauplan.Client()

# Create an external table from metadata
result = client.create_external_table_from_metadata(
    table='my_external_table',
    metadata_json_uri='s3://my-bucket/path/to/metadata/00001-abc123.metadata.json',
    namespace='my_namespace',
    branch='my_branch_name',
)

def create_external_table_from_parquet (...)

Creates an external table from S3 files.

Returns:

A bauplan.state.ExternalTableCreateState object.

def create_external_table_from_parquet(
    table: str | Table,
    search_patterns: list[str],
    *
    branch: str | Branch | None = None,
    namespace: str | Namespace | None = None,
    overwrite: bool = False,
    args: dict[str, str] | None = None,
    priority: int | None = None,
    client_timeout: int | None = None,
    detach: bool = False,
) -> ExternalTableCreateState: ...

Parameters

Example

import bauplan
client = bauplan.Client()

# Create from S3 files
state = client.create_external_table_from_parquet(
    table='my_external_table',
    search_patterns=['s3://path1/to/my/files/*.parquet', 's3://path2/to/my/file/f1.parquet'],
    branch='my_branch_name',
)

if state.error:
    print(f"Error: {state.error}")
else:
    print(f"External table created: {state.ctx.table_name}")

def create_namespace (...)

Create a new namespace at a given branch.

Returns:

The created bauplan.schema.Namespace object.

def create_namespace(
    namespace: str | Namespace,
    branch: str | Branch,
    *
    commit_body: str | None = None,
    commit_properties: dict[str, str] | None = None,
    if_not_exists: bool = False,
) -> Namespace: ...

Parameters

Raises

Example

import bauplan
client = bauplan.Client()

assert client.create_namespace(
    namespace='my_namespace_name',
    branch='my_branch_name',
    if_not_exists=True,
)

def create_table (...)

Create a table from an S3 location.

Returns:

The created bauplan.schema.Table.

def create_table(
    table: str | Table,
    search_uri: str,
    *
    branch: str | Branch | None = None,
    namespace: str | Namespace | None = None,
    partitioned_by: str | None = None,
    replace: bool | None = None,
    args: dict[str, str] | None = None,
    priority: int | None = None,
    client_timeout: int | None = None,
) -> Table: ...

Parameters

Raises

This operation attempts to create a table based on the schemas of N Parquet files found by a given search URI. It is a two-step operation that uses Client.plan_table_creation and Client.apply_table_creation_plan.

Example

import bauplan
client = bauplan.Client()

table = client.create_table(
    table='my_table_name',
    search_uri='s3://path/to/my/files/*.parquet',
    branch='my_branch_name',
)

def create_tag (...)

Create a new tag at a given ref.

Returns:

The created bauplan.schema.Tag object.

def create_tag(
    tag: str | Tag,
    from_ref: str | Ref,
    *
    if_not_exists: bool = False,
) -> Tag: ...

Parameters

Raises

Example

import bauplan
client = bauplan.Client()

assert client.create_tag(
    tag='my_tag',
    from_ref='my_ref_or_branch_name',
)

def delete_branch (...)

Delete a branch.

Returns:

A boolean indicating whether the branch was deleted.

def delete_branch(
    branch: str | Branch,
    *
    if_exists: bool = False,
) -> bool: ...

Parameters

Raises

Example

import bauplan
client = bauplan.Client()

if client.delete_branch('my_branch_name'):
    ...

def delete_namespace (...)

Delete a namespace.

Returns:

A bauplan.schema.Branch object pointing to head.

def delete_namespace(
    namespace: str | Namespace,
    branch: str | Branch,
    *
    if_exists: bool = False,
    commit_body: str | None = None,
    commit_properties: dict[str, str] | None = None,
) -> Branch: ...

Parameters

Raises

Example

import bauplan
client = bauplan.Client()

assert client.delete_namespace(
    namespace='my_namespace_name',
    branch='my_branch_name',
)

def delete_table (...)

Drop a table.

Returns:

A bauplan.schema.Branch object pointing to the new head.

def delete_table(
    table: str | Table,
    branch: str | Branch,
    *
    namespace: str | Namespace | None = None,
    if_exists: bool = False,
    commit_body: str | None = None,
    commit_properties: dict[str, str] | None = None,
) -> Branch: ...

Parameters

Raises

Example

import bauplan
client = bauplan.Client()

assert client.delete_table(
    table='my_table_name',
    branch='my_branch_name',
    namespace='my_namespace',
)

def delete_tag (...)

Delete a tag.

Returns:

A boolean indicating whether the tag was deleted.

def delete_tag(
    tag: str | Tag,
    *
    if_exists: bool = False,
) -> bool: ...

Parameters

Raises

Example

import bauplan
client = bauplan.Client()

assert client.delete_tag('my_tag_name')

def get_branch (...)

Get the branch.

Returns:

A bauplan.schema.Branch object.

def get_branch(
    branch: str | Branch,
) -> Branch: ...

Parameters

Raises

Example

import bauplan
client = bauplan.Client()

branch = client.get_branch('my_branch_name')

def get_branches (...)

Get the available data branches in the Bauplan catalog.

Returns:

An iterator over bauplan.schema.Branch objects.

def get_branches(
    *
    name: str | None = None,
    user: str | None = None,
    limit: int | None = None,
) -> Iterator[Branch]: ...

Parameters

Example

import bauplan
client = bauplan.Client()

for branch in client.get_branches():
    ...

def get_commits (...)

Get the commits for the target branch or ref.

Returns:

An iterator over bauplan.schema.Commit objects.

def get_commits(
    ref: str | Ref,
    *
    filter_by_message: str | None = None,
    filter_by_author_username: str | None = None,
    filter_by_author_name: str | None = None,
    filter_by_author_email: str | None = None,
    filter_by_authored_date: str | datetime | None = None,
    filter_by_authored_date_start_at: str | datetime | None = None,
    filter_by_authored_date_end_at: str | datetime | None = None,
    filter_by_parent_hash: str | None = None,
    filter_by_properties: dict[str, str] | None = None,
    filter: str | None = None,
    limit: int | None = None,
) -> Iterator[Commit]: ...

Parameters

Raises

Example

import bauplan
client = bauplan.Client()

for commit in client.get_commits('my_ref_or_branch_name', limit=50):
    ...

def get_job (...)

EXPERIMENTAL: Get a job by ID.

Returns:

A bauplan.schema.Job object.

def get_job(
    job_id: str,
) -> Job: ...

Parameters

Example

import bauplan
client = bauplan.Client()

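# my_job is assumed to be a bauplan.schema.Job obtained earlier, for example from client.get_jobs()
job = client.get_job(my_job.id)
print(f"{job.human_readable_status} ({job.kind})")

def get_job_context (...)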

EXPERIMENTAL: Get context for a job by ID.

Returns:

A bauplan.schema.JobContext object containing the job details, and optionally logs and snapshot.

def get_job_context(
    job: str | Job,
    *
    include_logs: bool = False,
    include_snapshot: bool = False,
) -> JobContext: ...

Parameters

Example

import bauplan
client = bauplan.Client()

ctx = client.get_job_context(my_job.id, include_logs=True)
print(f"ref: {ctx.ref}, project: {ctx.project_name}")
for log in ctx.logs:
    print(f"[{log.level}] {log.message}")

def get_job_contexts (...)

EXPERIMENTAL: Get context for multiple jobs.

Returns:

A list of bauplan.schema.JobContext objects containing the job details, and optionally logs and snapshot.

def get_job_contexts(
    jobs: str | list[str] | list[Job],
    *
    include_logs: bool = False,
    include_snapshot: bool = False,
) -> list[JobContext]: ...

Parameters

Example

import bauplan
client = bauplan.Client()

contexts = client.get_job_contexts([my_job], include_logs=True)
for ctx in contexts:
    print(f"{ctx.id}: {ctx.project_name} on {ctx.ref}")

def get_job_logs (...)

EXPERIMENTAL: Get logs for a job.

Returns:

A list of bauplan.schema.JobLogEvent objects representing the log events for the job.

def get_job_logs(
    job: str | Job,
) -> list[JobLogEvent]: ...

Parameters

Example

import bauplan
client = bauplan.Client()

for log in client.get_job_logs(my_job.id):
    print(f"[{log.level}] {log.message}")

def get_jobs (...)

Get jobs with optional filtering.

Returns:

An iterator over bauplan.schema.Job objects.

def get_jobs(
    *
    all_users: bool = False,
    filter_by_ids: str | list[str] | list[Job] | None = None,
    filter_by_users: str | list[str] | None = None,
    filter_by_kinds: str | JobKind | list[str] | list[JobKind] | None = None,
    filter_by_statuses: str | JobState | list[str] | list[JobState] | None = None,
    filter_by_created_after: datetime | None = None,
    filter_by_created_before: datetime | None = None,
    limit: int | None = None,
) -> Iterator[Job]: ...

Parameters

Example

import bauplan
client = bauplan.Client()

for job in client.get_jobs():
    print(f"{job.id}: {job.status} ({job.kind})")
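
The filter_by_created_after and filter_by_created_before arguments in the signature above take datetime values. A minimal sketch for building a "last N days" window follows; `created_window` is a hypothetical helper, and the use of timezone-aware UTC datetimes is an assumption, since the text does not say which timezone handling the API expects.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, Optional


def created_window(days: int, now: Optional[datetime] = None) -> Dict[str, datetime]:
    """Build keyword arguments covering the last `days` days (hypothetical helper)."""
    # Assumption: timezone-aware UTC datetimes are acceptable to the API.
    end = now or datetime.now(timezone.utc)
    return {
        "filter_by_created_after": end - timedelta(days=days),
        "filter_by_created_before": end,
    }
```

The resulting dictionary can be unpacked into the call, as in `client.get_jobs(**created_window(7))`.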

def get_namespace (...)

Get a namespace.

Returns:

A bauplan.schema.Namespace object.

def get_namespace(
    namespace: str | Namespace,
    ref: str | Ref,
) -> Namespace: ...

Parameters

Raises

Example

import bauplan
client = bauplan.Client()

namespace = client.get_namespace(
    namespace='my_namespace_name',
    ref='my_branch_name',
)

def get_namespaces (...)

Get the available data namespaces in the Bauplan catalog branch.

Returns:

An iterator over bauplan.schema.Namespace objects.

def get_namespaces(
    ref: str | Ref,
    *
    filter_by_name: str | None = None,
    limit: int | None = None,
) -> Iterator[Namespace]: ...

Parameters

Raises

Example

import bauplan
client = bauplan.Client()

for namespace in client.get_namespaces('my_ref_or_branch_name'):
    ...

def get_table (...)

Get the table data and metadata for a table in the target branch.

Returns:

A bauplan.schema.Table object.

def get_table(
    table: str | Table,
    ref: str | Ref,
    *
    namespace: str | Namespace | None = None,
) -> Table: ...

Parameters

Raises

Example

import bauplan
client = bauplan.Client()

# get the fields and metadata for a table
table = client.get_table(
    table='titanic',
    ref='my_ref_or_branch_name',
    namespace='bauplan',
)

# You can get the total number of rows this way.
num_records = table.records

# Or access the schema.
for c in table.fields:
    ...

def get_tables (...)

Get the tables and views in the target branch.

Returns:

An iterator over bauplan.schema.Table objects.

def get_tables(
    ref: str | Ref,
    *
    filter_by_name: str | None = None,
    filter_by_namespace: str | Namespace | None = None,
    limit: int | None = None,
) -> Iterator[Table]: ...

Parameters

Raises

Example

import bauplan
client = bauplan.Client()

for table in client.get_tables('my_branch_name'):
    ...

def get_tag (...)

Get the tag.

Returns:

A bauplan.schema.Tag object.

def get_tag(
    tag: str | Tag,
) -> Tag: ...

Parameters

Raises

Example

import bauplan
client = bauplan.Client()

tag = client.get_tag('my_tag_name')

def get_tags (...)

Get all the tags.

Returns:

An iterator over bauplan.schema.Tag objects.

def get_tags(
    *
    filter_by_name: str | None = None,
    limit: int | None = None,
) -> Iterator[Tag]: ...

Parameters

Raises

Example

import bauplan
client = bauplan.Client()

tags = client.get_tags()
for tag in tags:
    print(tag.name)

def has_branch (...)

Check if a branch exists.

Returns:

A boolean indicating whether the branch exists.

def has_branch(
    branch: str | Branch,
) -> bool: ...

Parameters

Raises

Example

import bauplan
client = bauplan.Client()

if client.has_branch('my_branch_name'):
    ...

def has_namespace (...)

Check if a namespace exists.

Returns:

A boolean indicating whether the namespace exists.

def has_namespace(
    namespace: str | Namespace,
    ref: str | Ref,
) -> bool: ...

Parameters

Raises

Example

import bauplan
client = bauplan.Client()

assert client.has_namespace(
    namespace='my_namespace_name',
    ref='my_branch_name',
)

def has_table (...)

Check if a table exists.

Returns:

A boolean indicating whether the table exists.

def has_table(
    table: str | Table,
    ref: str | Ref,
    *
    namespace: str | Namespace | None = None,
) -> bool: ...

Parameters

Raises

Example

import bauplan
client = bauplan.Client()

assert client.has_table(
    table='titanic',
    ref='my_ref_or_branch_name',
    namespace='bauplan',
)

def has_tag (...)

Check if a tag exists.

Returns:

A boolean indicating whether the tag exists.

def has_tag(
    tag: str | Tag,
) -> bool: ...

Parameters

Raises

Example

import bauplan
client = bauplan.Client()

assert client.has_tag(
    tag='my_tag_name',
)

def import_data (...)

Imports data into an already existing table.

Returns:

A bauplan.state.TableDataImportState object.

def import_data(
    table: str | Table,
    search_uri: str,
    *
    branch: str | Branch | None = None,
    namespace: str | Namespace | None = None,
    continue_on_error: bool = False,
    import_duplicate_files: bool = False,
    best_effort: bool = False,
    preview: str | None = None,
    args: dict[str, str] | None = None,
    priority: int | None = None,
    client_timeout: int | None = None,
    detach: bool = False,
) -> TableDataImportState: ...

Parameters

Example

import bauplan
client = bauplan.Client()

state = client.import_data(
    table='my_table_name',
    search_uri='s3://path/to/my/files/*.parquet',
    branch='my_branch_name',
)
if state.error:
    print(f"Import failed: {state.error}")
else:
    print(f"Import succeeded: {state.job_status}")

def info (...)

Fetch organization & account information.

Returns:

A bauplan.InfoState object containing organization, user, and runner information.

def info(
    *
    client_timeout: int | None = None,
) -> InfoState: ...

Parameters

Example

import bauplan
client = bauplan.Client()

info = client.info()
if info.user:
    print(info.user.username)
if info.organization:
    print(info.organization.name)

def merge_branch (...)

Merge one branch into another.

Returns:

The bauplan.schema.Branch where the merge was made.

def merge_branch(
    source_ref: str | Ref,
    into_branch: str | Branch,
    *
    commit_message: str | None = None,
    commit_body: str | None = None,
    commit_properties: dict[str, str] | None = None,
) -> Branch: ...

Parameters

Raises

Example

import bauplan
client = bauplan.Client()

assert client.merge_branch(
    source_ref='my_ref_or_branch_name',
    into_branch='main',
)

def plan_table_creation (...)

Create a table import plan from an S3 location.

Returns:

A bauplan.state.TableCreatePlanState object.

def plan_table_creation(
    table: str | Table,
    search_uri: str,
    *
    branch: str | Branch | None = None,
    namespace: str | Namespace | None = None,
    partitioned_by: str | None = None,
    replace: bool | None = None,
    args: dict[str, str] | None = None,
    priority: int | None = None,
    client_timeout: int | None = None,
) -> TableCreatePlanState: ...

Parameters

Raises

This operation will attempt to create a table based on schemas of N parquet files found by a given search uri. A YAML file containing the schema and plan is returned and if there are no conflicts, it is automatically applied.

Example

import bauplan
client = bauplan.Client()

plan_state = client.plan_table_creation(
    table='my_table_name',
    search_uri='s3://path/to/my/files/*.parquet',
    branch='my_branch_name',
)
if plan_state.error:
    print(f"Plan failed: {plan_state.error}")
else:
    print(plan_state.plan)

def query (...)

Execute a SQL query and return the results as a pyarrow.Table. Note that this function also uses Arrow internally, which makes data transfer fast.

Returns:

The query results as a pyarrow.Table.

def query(
    query: str,
    *
    ref: str | Ref | None = None,
    max_rows: int | None = None,
    cache: Literal['on', 'off'] | None = None,
    namespace: str | Namespace | None = None,
    args: dict[str, str] | None = None,
    priority: int | None = None,
    client_timeout: int | None = None,
) -> pyarrow.Table: ...

Parameters

If you prefer to return the results as a pandas DataFrame, use the to_pandas function of pyarrow.Table.

Example

import bauplan

client = bauplan.Client()

# query the table and return result set as an arrow Table
my_table = client.query(
    query='SELECT avg(Age) as average_age FROM bauplan.titanic',
    ref='my_ref_or_branch_name',
)

# efficiently cast the table to a pandas DataFrame
df = my_table.to_pandas()

def query_to_csv_file (...)

Export the results of a SQL query to a file in CSV format.

Returns:

The path of the file written.

def query_to_csv_file(
    path: str | pathlib.Path,
    query: str,
    *
    ref: str | Ref | None = None,
    max_rows: int | None = None,
    cache: Literal['on', 'off'] | None = None,
    namespace: str | Namespace | None = None,
    args: dict[str, str] | None = None,
    priority: int | None = None,
    client_timeout: int | None = None,
) -> pathlib.Path: ...

Parameters

Example

import bauplan
client = bauplan.Client()

# run the query and write the results to a CSV file
client.query_to_csv_file(
    path='/tmp/out.csv',
    query='SELECT Name, Age FROM bauplan.titanic LIMIT 100',
    ref='my_ref_or_branch_name',
)

def query_to_generator (...)

Execute a SQL query and return the results as a generator, where each row is a Python dictionary.

Returns:

A generator that yields one dictionary per row of query results.

def query_to_generator(
    query: str,
    *
    ref: str | Ref | None = None,
    max_rows: int | None = None,
    cache: Literal['on', 'off'] | None = None,
    namespace: str | Namespace | None = None,
    args: dict[str, str] | None = None,
    priority: int | None = None,
    client_timeout: int | None = None,
) -> Iterator[dict[str, Any]]: ...

Parameters

Example

import bauplan
client = bauplan.Client()

# query the table and iterate through the results one row at a time
res = client.query_to_generator(
    query='SELECT Name, Age FROM bauplan.titanic LIMIT 100',
    ref='my_ref_or_branch_name',
)

for row in res:
    ...  # handle results
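
Because rows arrive one dictionary at a time, they can be grouped into fixed-size batches without loading the whole result into memory. The sketch below uses only the standard library; `in_batches` is a hypothetical helper, and the sample rows are made-up stand-ins for what query_to_generator would yield.

```python
from itertools import islice
from typing import Any, Dict, Iterable, Iterator, List


def in_batches(rows: Iterable[Dict[str, Any]], size: int) -> Iterator[List[Dict[str, Any]]]:
    """Yield lists of up to `size` rows from any iterable of row dictionaries."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(rows)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


# Made-up rows standing in for the generator returned by the client.
sample_rows = ({"Name": f"passenger_{i}", "Age": 20 + i} for i in range(5))
batches = list(in_batches(sample_rows, 2))
```

With five sample rows and a batch size of two, the batches have two, two and one rows respectively.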

def query_to_json_file (...)

Export the results of a SQL query to a file in JSON format.

Returns:

The path of the file written.

def query_to_json_file(
    path: str | pathlib.Path,
    query: str,
    *
    file_format: Literal['json', 'jsonl'] = 'json',
    ref: str | Ref | None = None,
    max_rows: int | None = None,
    cache: Literal['on', 'off'] | None = None,
    namespace: str | Namespace | None = None,
    args: dict[str, str] | None = None,
    priority: int | None = None,
    client_timeout: int | None = None,
) -> pathlib.Path: ...

Parameters

Example

import bauplan
client = bauplan.Client()

# run the query and write the results to a JSON file
client.query_to_json_file(
    path='/tmp/out.json',
    query='SELECT Name, Age FROM bauplan.titanic LIMIT 100',
    ref='my_ref_or_branch_name',
)
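
The file_format parameter accepts 'json' or 'jsonl'. The sketch below shows the general difference between the two formats using the standard json module: a single JSON array versus one JSON object per line. The exact layout that the client writes is not stated here, so treat this as an illustration of the formats rather than of the client's output; `render_rows` is a hypothetical helper.

```python
import json
from typing import Any, Dict, List


def render_rows(rows: List[Dict[str, Any]], file_format: str = "json") -> str:
    """Serialize rows as a JSON array ('json') or as JSON Lines ('jsonl')."""
    if file_format == "json":
        return json.dumps(rows)
    if file_format == "jsonl":
        # One self-contained JSON document per line.
        return "\n".join(json.dumps(row) for row in rows)
    raise ValueError(f"unsupported file_format: {file_format!r}")


rows = [{"Name": "Ann", "Age": 31}, {"Name": "Bob", "Age": 27}]
as_json = render_rows(rows, "json")
as_jsonl = render_rows(rows, "jsonl")
```

JSON Lines is usually the easier format to stream or append to, because each line can be parsed on its own.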

def query_to_parquet_file (...)

Export the results of a SQL query to a file in Parquet format.

Returns:

The path of the file written.

def query_to_parquet_file(
    path: str | pathlib.Path,
    query: str,
    *
    ref: str | Ref | None = None,
    max_rows: int | None = None,
    cache: Literal['on', 'off'] | None = None,
    namespace: str | Namespace | None = None,
    args: dict[str, str] | None = None,
    priority: int | None = None,
    client_timeout: int | None = None,
) -> pathlib.Path: ...

Parameters

Example

import bauplan
client = bauplan.Client()

# run the query and write the results to a Parquet file
client.query_to_parquet_file(
    path='/tmp/out.parquet',
    query='SELECT Name, Age FROM bauplan.titanic LIMIT 100',
    ref='my_ref_or_branch_name',
)

def rename_branch (...)

Rename an existing branch. The branch name should follow the convention of "username.branch_name", otherwise non-admin users won't be able to complete the operation.

Returns:

The renamed bauplan.schema.Branch object.

def rename_branch(
    branch: str | Branch,
    new_branch: str | Branch,
) -> Branch: ...

Parameters

Raises

Example

import bauplan
client = bauplan.Client()

assert client.rename_branch(
    branch='username.old_name',
    new_branch='username.new_name',
)

def rename_tag (...)

Rename an existing tag.

Returns:

The renamed bauplan.schema.Tag object.

def rename_tag(
    tag: str | Tag,
    new_tag: str | Tag,
) -> Tag: ...

Parameters

Raises

Example

import bauplan
client = bauplan.Client()

assert client.rename_tag(
    tag='old_tag_name',
    new_tag='new_tag_name',
)

def revert_table (...)

Revert a table to a previous state.

Returns:

The bauplan.schema.Branch where the revert was made.

def revert_table(
    table: str | Table,
    *
    namespace: str | Namespace | None = None,
    source_ref: str | Ref,
    into_branch: str | Branch,
    replace: bool | None = None,
    commit_body: str | None = None,
    commit_properties: dict[str, str] | None = None,
) -> Branch: ...

Parameters

Raises

Example

import bauplan
client = bauplan.Client()

assert client.revert_table(
    table='my_table_name',
    namespace='my_namespace',
    source_ref='my_ref_or_branch_name',
    into_branch='main',
)

def run (...)

Run a Bauplan project and return the state of the run. This is equivalent to running the bauplan run command through the CLI. All parameters default to 'off'/false unless otherwise specified.

Returns:

bauplan.state.RunState: The state of the run.

def run(
    project_dir: str,
    *
    ref: str | Ref | None = None,
    namespace: str | Namespace | None = None,
    parameters: dict[str, str | int | float | bool | None] | None = None,
    cache: Literal['on', 'off'] | None = None,
    transaction: Literal['on', 'off'] | None = None,
    dry_run: bool | None = None,
    strict: Literal['on', 'off'] | None = None,
    preview: str | None = None,
    args: dict[str, str] | None = None,
    priority: int | None = None,
    client_timeout: int | None = None,
    detach: bool = False,
) -> RunState: ...

Parameters

Example

import bauplan
client = bauplan.Client()

# Run a daily sales pipeline on a dev branch; on success (and once the data is validated) it can be merged to main
run_state = client.run(
    project_dir='./etl_pipelines/daily_sales',
    ref="username.dev_branch",
    namespace='sales_analytics',
)

if str(run_state.job_status).lower() != "success":
    raise Exception(f"{run_state.job_id} failed: {run_state.job_status} {run_state.error}")

def scan (...)

Execute a table scan (with optional filters) and return the results as an arrow Table.

Returns:

The scan results as a pyarrow.Table.

def scan(
    table: str | Table,
    *
    ref: str | Ref | None = None,
    columns: list[str] | None = None,
    filters: str | None = None,
    limit: int | None = None,
    cache: Literal['on', 'off'] | None = None,
    namespace: str | Namespace | None = None,
    args: dict[str, str] | None = None,
    priority: int | None = None,
    client_timeout: int | None = None,
) -> pyarrow.Table: ...

Parameters

Note that this function uses SQLGlot to compose a safe SQL query, and then internally defers to the query function for the actual scan.

Example

import bauplan
client = bauplan.Client()

# run a table scan over the data lake
# filters are passed as a string
my_table = client.scan(
    table='titanic',
    ref='my_ref_or_branch_name',
    namespace='bauplan',
    columns=['Name'],
    filters='Age < 30',
)

class InfoState (non-instantiable)

Attributes

client_version: str
organization: OrganizationInfo | None
runners: list[RunnerNodeInfo]
user: UserInfo | None


class JobKind (non-instantiable)

The kind/type of a job.

Attributes

IMPORT_PLAN_APPLY: Final[JobKind]
IMPORT_PLAN_CREATE: Final[JobKind]
QUERY: Final[JobKind]
RUN: Final[JobKind]
TABLE_IMPORT: Final[JobKind]
TABLE_PLAN_CREATE: Final[JobKind]
TABLE_PLAN_CREATE_APPLY: Final[JobKind]
UNSPECIFIED: Final[JobKind]


class JobState (non-instantiable)

The execution state of a job.

Attributes

ABORT: Final[JobState]
COMPLETE: Final[JobState]
FAIL: Final[JobState]
NOT_STARTED: Final[JobState]
OTHER: Final[JobState]
RUNNING: Final[JobState]
UNSPECIFIED: Final[JobState]


class Model

Represents a model (dataframe/table representing a DAG step) as an input to a function.

bauplan.Model(
    name: str,
    columns: Optional[List[str]] = None,
    filter: Optional[str] = None,
    ref: Optional[str] = None,
    connector: Optional[str] = None,
    connector_config_key: Optional[str] = None,
    connector_config_uri: Optional[str] = None,
)

e.g.

import bauplan
import pandas as pd
import pyarrow

@bauplan.model()
def some_parent_model():
    return pyarrow.Table.from_pydict({'bar': [1, 2, 3]})

@bauplan.model()
def your_cool_model(
    # parent models are passed as inputs, using bauplan.Model
    # class
    parent_0=bauplan.Model(
        'some_parent_model',
        columns=['bar'],
        filter='bar > 1',
    )
):
    # Can return a pandas dataframe or a pyarrow table
    return pyarrow.Table.from_pandas(
        pd.DataFrame({
            # convert the Arrow column to pandas before doing arithmetic on it
            'foo': parent_0['bar'].to_pandas() * 2,
        })
    )

Bauplan can wrap other engines for the processing of some models, exposing a common interface and unified API for the user while dispatching the relevant operations to the underlying engine.

Authentication and authorization happen securely and transparently through SSM; the user specifies a connector type and the credentials through the relevant keywords:

import bauplan
import pandas as pd
import pyarrow

@bauplan.model()
def your_cool_model(
    parent_0=bauplan.Model(
        'some_parent_model',
        columns=['bar'],
        filter='bar > 1',
        connector='dremio',
        connector_config_key='bauplan',
    )
):
    # parent_0 inside the function body
    # will still be an Arrow table: the user code
    # should still be the same as the data is moved
    # transparently by Bauplan from an engine to the function.
    return pyarrow.Table.from_pandas(
        pd.DataFrame({
            # convert the Arrow column to pandas before doing arithmetic on it
            'foo': parent_0['bar'].to_pandas() * 2,
        })
    )

Attributes

name:
columns:
filter:
ref:
connector:
connector_config_key:
connector_config_uri:

class OrganizationInfo (non-instantiable)

Attributes

default_parameter_secret_key: str | None
default_parameter_secret_public_key: str | None
id: str
name: str
slug: str


class Parameter

Represents a parameter that can be used to "template" values passed to a model during a run or query with, e.g., bauplan run --parameter interest_rate=2.0.

bauplan.Parameter(
    param_name: str,
)

Parameters

Parameters must be defined with a default value under the top-level parameters key in the bauplan.yml project file.

e.g.

project:
  id: xyzxyz
  name: eggs

parameters:
  interest_rate:
    default: 5.5
  loan_amount:
    default: 100000
  customer_name:
    default: "John MacDonald"

Then, to use them in a model, use bauplan.Parameter:

def a_model_using_params(
    # parameters are passed as inputs, using bauplan.Parameter
    interest_rate=bauplan.Parameter('interest_rate'),
    loan_amount=bauplan.Parameter('loan_amount'),
    customer_name=bauplan.Parameter('customer_name'),
):
    print(f"Calculating interest for {customer_name}")
    return pyarrow.Table.from_pydict({'interest': [loan_amount * interest_rate]})
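
As a mental model of how a run-time value such as interest_rate=2.0 relates to the defaults declared in bauplan.yml, the sketch below merges key=value overrides into a dictionary of defaults. It is not how Bauplan resolves parameters internally: `apply_overrides` is a hypothetical helper, and coercing each override to the type of its default is an assumption made for the example.

```python
from typing import Any, Dict, List


def apply_overrides(defaults: Dict[str, Any], overrides: List[str]) -> Dict[str, Any]:
    """Merge 'key=value' overrides into a copy of the declared defaults."""
    resolved = dict(defaults)
    for item in overrides:
        key, sep, raw = item.partition("=")
        if not sep or key not in defaults:
            raise ValueError(f"unknown or malformed parameter: {item!r}")
        default = defaults[key]
        # Assumption: an override takes the type of the declared default.
        if isinstance(default, bool):
            resolved[key] = raw.strip().lower() in ("true", "1", "yes")
        else:
            resolved[key] = type(default)(raw)
    return resolved


defaults = {"interest_rate": 5.5, "loan_amount": 100000, "customer_name": "John MacDonald"}
resolved = apply_overrides(defaults, ["interest_rate=2.0"])
```

Only interest_rate changes in `resolved`; the other parameters keep the defaults from the project file, and `defaults` itself is left unmodified.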

class RefType (non-instantiable)

The type of a ref.

Attributes

BRANCH: Final[RefType]
DETACHED: Final[RefType]
TAG: Final[RefType]


class RunnerNodeInfo (Non-instantiable)

Attributes

hostname:

str


class UserInfo (Non-instantiable)

Attributes

first_name:

str

full_name:

str

id:

str

last_name:

str

username:

str


def expectation (...)

Decorator that defines a Bauplan expectation.

def expectation(
    **kwargs: Any,
) -> Callable: ...

Parameters

An expectation is a function from one (or more) dataframe-like object(s) to a boolean: it is commonly used to perform data validation and data quality checks when running a pipeline. Expectations take as input the table(s) they are validating and return a boolean indicating whether the expectation is met or not. A Python expectation needs a Python environment to run, which is defined using the python decorator, e.g.:

Example

import bauplan
from bauplan.standard_expectations import expect_column_no_nulls

@bauplan.expectation()
@bauplan.python('3.10')
def test_joined_dataset(
    data=bauplan.Model(
        'join_dataset',
        columns=['anomaly']
    )
):
    # your data validation code here
    return expect_column_no_nulls(data, 'anomaly')
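
Stripped of the decorators, the core of an expectation is just a function from data to a boolean. The sketch below shows that shape for a "no nulls" check. To stay dependency-free it uses a plain dict of column lists as a stand-in for the Arrow table, and column_has_no_nulls is a hypothetical helper, not the implementation of expect_column_no_nulls.

```python
def column_has_no_nulls(table, column_name):
    """Return True when `column_name` in `table` contains no None values.

    `table` is a dict mapping column names to lists of values: a stand-in
    for the Arrow table that an expectation would receive.
    """
    return all(value is not None for value in table[column_name])


# One table that meets the expectation and one that does not.
clean = {"anomaly": [0, 1, 0]}
dirty = {"anomaly": [0, None, 1]}

clean_ok = column_has_no_nulls(clean, "anomaly")
dirty_ok = column_has_no_nulls(dirty, "anomaly")
```

The boolean returned by such a function is what tells the pipeline whether the expectation is met.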

def extras (...)

Decorator that defines the bauplan package extras to install.

def extras(
    *args,
) -> Callable: ...

Parameters

This decorator allows specifying which optional feature sets (extras) of the bauplan package are required by the decorated function.

For example, @bauplan.extras('ai') requests the installation of the AI-specific functionality, ensuring that the right dependencies are installed.


def model (...)

Decorator that specifies a Bauplan model.

def model(
    name: Optional[str] = None,
    columns: Optional[List[str]] = None,
    partitioned_by: Optional[Union[str, List[str], Tuple[str, ...]]] = None,
    materialization_strategy: Optional[ModelMaterializationStrategy] = None,
    cache_strategy: Optional[ModelCacheStrategy] = None,
    internet_access: Optional[bool] = None,
    overwrite_filter: Optional[str] = None,
    **kwargs: Any,
) -> Callable: ...

Parameters

A model is a function from one (or more) dataframe-like object(s) to another dataframe-like object: it is used to define a transformation in a pipeline. Models are chained together implicitly by using them as inputs to their children. A Python model needs a Python environment to run, which is defined using the python decorator, e.g.:

Example

import bauplan

@bauplan.model(
    columns=['*'],
    materialization_strategy='NONE'
)
@bauplan.python('3.11')
def source_scan(
    data=bauplan.Model(
        'iot_kaggle',
        columns=['*'],
        filter="motion='false'"
    )
):
    # your code here
    return data
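
The idea that models are chained implicitly through their inputs can be illustrated without Bauplan at all. The sketch below infers a dependency graph from default arguments and runs parents before children. Everything in it is an assumption for illustration: ModelRef is a hypothetical stand-in for bauplan.Model, run_pipeline is a hypothetical helper, and the real planner may work quite differently.

```python
import inspect
from dataclasses import dataclass
from graphlib import TopologicalSorter


@dataclass(frozen=True)
class ModelRef:
    """Hypothetical stand-in for bauplan.Model: names a parent model."""
    name: str


def raw_events():
    return [1, 2, 3]


def cleaned_events(data=ModelRef("raw_events")):
    return [x for x in data if x > 1]


def event_count(data=ModelRef("cleaned_events")):
    return len(data)


def run_pipeline(functions):
    """Infer the DAG from default arguments and run parents before children."""
    by_name = {fn.__name__: fn for fn in functions}
    # For each function, map argument name -> name of the parent model.
    parents = {
        name: {
            p.name: p.default.name
            for p in inspect.signature(fn).parameters.values()
            if isinstance(p.default, ModelRef)
        }
        for name, fn in by_name.items()
    }
    # graphlib expects node -> set of predecessors.
    graph = {name: set(deps.values()) for name, deps in parents.items()}
    results = {}
    for name in TopologicalSorter(graph).static_order():
        kwargs = {arg: results[parent] for arg, parent in parents[name].items()}
        results[name] = by_name[name](**kwargs)
    return results


# The order in which the functions are listed does not matter.
results = run_pipeline([event_count, cleaned_events, raw_events])
```

No function names its children; the order of execution follows entirely from which parent each function declares as an input.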

def pyspark (...)

Decorator that makes a pyspark session available to a Bauplan function (a model or an expectation). Add a spark=None parameter to the function's arguments to receive the session.

def pyspark(
    version: Optional[str] = None,
    conf: Optional[Dict[str, str]] = None,
    **kwargs: Any,
) -> Callable: ...

Parameters


def python (...)

Decorator that defines a Python environment for a Bauplan function (e.g. a model or expectation). It is used to specify directly in code the configuration of the Python environment required to run the function, i.e. the Python version and the Python packages required.

def python(
    version: Optional[str] = None,
    pip: Optional[Dict[str, str]] = None,
    **kwargs: Any,
) -> Callable: ...

Parameters


def resources (...)

Decorator that defines the resources required by a Bauplan function (e.g. a model or expectation). It is used to specify directly in code the configuration of the resources required to run the function.

def resources(
    cpus: Optional[Union[int, float]] = None,
    memory: Optional[Union[int, str]] = None,
    memory_swap: Optional[Union[int, str]] = None,
    timeout: Optional[int] = None,
    **kwargs: Any,
) -> Callable: ...

Parameters


def synthetic_model (...)

Decorator that defines a Bauplan Synthetic Model.

def synthetic_model(
    name: str,
    columns: List[str],
    **kwargs: Any,
) -> Callable: ...

Parameters