bauplan


class Client

A consistent interface to access Bauplan operations.

Using the client

import bauplan
client = bauplan.Client()

# query the table and return result set as an arrow Table
my_table = client.query('SELECT avg(age) AS average_age FROM bauplan.titanic limit 1', ref='main')

# efficiently cast the table to a pandas DataFrame
df = my_table.to_pandas()

Notes on authentication

# by default, authenticate from BAUPLAN_API_KEY >> BAUPLAN_PROFILE >> ~/.bauplan/config.yml
client = bauplan.Client()
# client used ~/.bauplan/config.yml profile 'default'

os.environ['BAUPLAN_PROFILE'] = "someprofile"
client = bauplan.Client()
# >> client now uses profile 'someprofile'

os.environ['BAUPLAN_API_KEY'] = "mykey"
client = bauplan.Client()
# >> client now authenticates with api_key value "mykey", because api key > profile

# specify authentication directly - this supersedes BAUPLAN_API_KEY in the environment
client = bauplan.Client(api_key='MY_KEY')

# specify a profile from ~/.bauplan/config.yml - this supersedes BAUPLAN_PROFILE in the environment
client = bauplan.Client(profile='default')

Handling Exceptions

Catalog operations (branch/table methods) raise subclasses of bauplan.exceptions.BauplanError that mirror HTTP status codes.

  • 400: InvalidDataError
  • 401: UnauthorizedError
  • 403: AccessDeniedError
  • 404: ResourceNotFoundError e.g. the ID doesn't match any records
  • 404: ApiRouteError e.g. the given route doesn't exist
  • 405: ApiMethodError e.g. POST on a route with only GET defined
  • 409: UpdateConflictError e.g. creating a record with a name that already exists
  • 429: TooManyRequestsError
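These classes can be caught like any Python exception. Below is a minimal sketch, assuming only the exception names listed above (stubbed locally in case the SDK is not installed); the helper name `safe_get_table` is our own invention, not part of the SDK:

```python
try:
    from bauplan.exceptions import ResourceNotFoundError
except ImportError:  # stub so the pattern runs without the SDK installed
    class ResourceNotFoundError(Exception):
        pass


def safe_get_table(client, table, ref="main"):
    """Return table metadata, or None when the table does not exist (404)."""
    try:
        return client.get_table(table=table, ref=ref)
    except ResourceNotFoundError:
        return None
```

The same pattern applies to the other status-code classes, e.g. catching UpdateConflictError around a create call.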

Run/Query/Scan/Import operations raise a subclass of bauplan.exceptions.BauplanError that represents the failure, and also return a RunState object containing details and logs:

  • JobError e.g. something went wrong in a run/query/import/scan; includes error details

Run/import operations also return a state object that includes a job_status and other details. There are two ways to check status for run/import operations:

  1. try/except the JobError exception
  2. check the state.job_status attribute

Examples

state = client.run(...)
if state.job_status != "success":
    ...
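The two checks can also be combined in one place. A sketch, assuming the JobError exception above (stubbed when the SDK is absent); the wrapper name `run_succeeded` is our own invention:

```python
try:
    from bauplan.exceptions import JobError
except ImportError:  # stub so the pattern runs without the SDK installed
    class JobError(Exception):
        pass


def run_succeeded(client, **run_kwargs):
    """Return True when a run completes successfully.

    Combines both checks described above: a JobError raised by the
    client, and a non-"success" job_status on the returned state.
    """
    try:
        state = client.run(**run_kwargs)
    except JobError:
        return False
    return state.job_status == "success"
```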

Parameters

profile Optional[str]

The Bauplan config profile name to use to determine api_key.

api_key Optional[str]

Your unique Bauplan API key; mutually exclusive with profile. If not provided, fetch precedence is 1) environment BAUPLAN_API_KEY 2) .bauplan/config.yml

branch Optional[str]

The default branch to use for queries and runs. If not provided active_branch from the profile is used.

namespace Optional[str]

The default namespace to use for queries and runs.

cache Optional[Literal[on, off]]

Whether to enable or disable caching for all the requests.

debug Optional[bool]

Whether to enable or disable debug mode for all the requests.

verbose Optional[bool]

Whether to enable or disable verbose mode for all the requests.

api_endpoint Optional[str]

The Bauplan API endpoint to use. If not provided, fetch precedence is 1) environment BAUPLAN_API_ENDPOINT 2) .bauplan/config.yml

catalog_endpoint Optional[str]

The Bauplan catalog endpoint to use. If not provided, fetch precedence is 1) environment BAUPLAN_CATALOG_ENDPOINT 2) .bauplan/config.yml

catalog_max_records Optional[int]

The maximum number of records to fetch, per page, from the catalog.

client_timeout Optional[int]

The client timeout in seconds for all the requests.

env Optional[str]

The environment to use for all the requests. Default: 'prod'.

config_file_path Optional[Union[str, Path]]

The path to the Bauplan config file to use. If not provided, fetch precedence is 1) environment BAUPLAN_CONFIG_PATH 2) ~/.bauplan/config.yml

user_session_token Optional[str]

Your unique Bauplan user session token.

feature_flags Optional[Dict[str, Any]]

def apply_table_creation_plan

Apply a plan for creating a table. This is done automatically during table plan creation if no schema conflicts exist. Otherwise, this function is used to apply the plan after the schema conflicts are resolved. The most common schema conflict is two Parquet files with the same column name but different data types.

Parameters

plan Union[Dict, TableCreatePlanState]

The plan to apply.

debug Optional[bool]

Whether to enable or disable debug mode for the query.

priority Optional[int]

Optional job priority (1-10, where 10 is highest priority).

verbose Optional[bool]

Whether to enable or disable verbose mode.

client_timeout Optional[Union[int, float]]

seconds to timeout; this also cancels the remote job execution.
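The plan/apply flow described above can be sketched as follows. The names `create_table_resolving_conflicts` and `resolve_conflicts` are hypothetical, and the `plan_state.error` / `plan_state.plan` attributes follow the plan_table_creation example later in this document:

```python
def create_table_resolving_conflicts(client, table, search_uri, branch, resolve_conflicts):
    """Plan a table creation, resolving schema conflicts when they occur.

    `resolve_conflicts` is a caller-supplied callback that takes the
    conflicting plan and returns an amended one ready to apply.
    """
    plan_state = client.plan_table_creation(
        table=table,
        search_uri=search_uri,
        branch=branch,
    )
    if plan_state.error:  # schema conflicts: amend the plan, then apply it
        fixed_plan = resolve_conflicts(plan_state.plan)
        plan_state = client.apply_table_creation_plan(plan=fixed_plan)
    return plan_state
```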

def cancel_job

EXPERIMENTAL: Cancel a job by ID.

Parameters

job_id str

The ID of the job to cancel.

def create_branch

Create a new branch at a given ref. The branch name should follow the convention of "username.branch_name", otherwise non-admin users won't be able to complete the operation.

Upon failure, raises bauplan.exceptions.BauplanError

import bauplan
client = bauplan.Client()

assert client.create_branch(
    branch='my_branch_name',
    from_ref='my_ref_or_branch_name',
    if_not_exists=True,
)

Parameters

branch Union[str, Branch]

The name of the new branch.

from_ref Union[str, Branch, Tag]

The name of the base branch; either a branch like "main" or ref like "main@[sha]".

if_not_exists bool

If set to True, the branch will not be created if it already exists.

def create_namespace

Create a new namespace at a given branch.

Upon failure, raises bauplan.exceptions.BauplanError

import bauplan
client = bauplan.Client()

assert client.create_namespace(
    namespace='my_namespace_name',
    branch='my_branch_name',
    properties={'k1': 'v1', 'k2': 'v2'},
    if_not_exists=True,
)

Parameters

namespace Union[str, Namespace]

The name of the namespace.

branch Union[str, Branch]

The name of the branch to create the namespace on.

commit_body Optional[str]

Optional, the commit body to attach to the operation.

commit_properties Optional[Dict[str, str]]

Optional, a list of properties to attach to the commit.

if_not_exists bool

If set to True, the namespace will not be created if it already exists.

properties Optional[Dict[str, str]]

def create_table

Create a table from an S3 location.

This operation will attempt to create a table based on the schemas of the Parquet files found at the given search URI. It is a two-step operation combining plan_table_creation and apply_table_creation_plan.

import bauplan
client = bauplan.Client()

table = client.create_table(
    table='my_table_name',
    search_uri='s3://path/to/my/files/*.parquet',
    branch='my_branch_name',
)

Parameters

table Union[str, Table]

The table which will be created.

search_uri str

The location of the files to scan for schema.

branch Optional[Union[str, Branch]]

The branch in which to create the table.

namespace Optional[Union[str, Namespace]]

Optional argument specifying the namespace. If not specified, it will be inferred based on table location or the default.

partitioned_by Optional[str]

Optional argument specifying the table partitioning.

replace Optional[bool]

Replace the table if it already exists.

debug Optional[bool]

Whether to enable or disable debug mode for the query.

priority Optional[int]

Optional job priority (1-10, where 10 is highest priority).

verbose Optional[bool]

Whether to enable or disable verbose mode.

client_timeout Optional[Union[int, float]]

seconds to timeout; this also cancels the remote job execution.

def create_tag

Create a new tag at a given ref.

Upon failure, raises bauplan.exceptions.BauplanError

import bauplan
client = bauplan.Client()

assert client.create_tag(
    tag='my_tag',
    from_ref='my_ref_or_branch_name',
)

Parameters

tag Union[str, Tag]

The name of the new tag.

from_ref Union[str, Branch, Ref]

The name of the base branch; either a branch like "main" or ref like "main@[sha]".

if_not_exists bool

If set to True, the tag will not be created if it already exists.

def delete_branch

Delete a branch.

Upon failure, raises bauplan.exceptions.BauplanError

import bauplan
client = bauplan.Client()

assert client.delete_branch('my_branch_name')

Returns: a boolean indicating whether the branch was deleted.

Parameters

branch Union[str, Branch]

The name of the branch to delete.

if_exists bool

If set to True, no error will be raised if the branch does not exist.

def delete_namespace

Delete a namespace.

Upon failure, raises bauplan.exceptions.BauplanError

import bauplan
client = bauplan.Client()

assert client.delete_namespace(
    namespace='my_namespace_name',
    branch='my_branch_name',
)

Parameters

namespace Union[str, Namespace]

The name of the namespace to delete.

branch Union[str, Branch]

The name of the branch to delete the namespace from.

if_exists bool

If set to True, no error will be raised if the namespace does not exist.

commit_body Optional[str]

Optional, the commit body to attach to the operation.

commit_properties Optional[Dict[str, str]]

Optional, a list of properties to attach to the commit.

properties Optional[Dict[str, str]]

def delete_table

Drop a table.

Upon failure, raises bauplan.exceptions.BauplanError

import bauplan
client = bauplan.Client()

assert client.delete_table(
    table='my_table_name',
    branch='my_branch_name',
    namespace='my_namespace',
)

Parameters

table Union[str, Table]

The table to delete.

branch Union[str, Branch]

The branch on which the table is stored.

namespace Optional[Union[str, Namespace]]

The namespace of the table to delete.

if_exists bool

If set to True, no error will be raised if the table does not exist.

commit_body Optional[str]

Optional, the commit body message to attach to the commit.

commit_properties Optional[Dict[str, str]]

Optional, a list of properties to attach to the commit.

properties Optional[Dict[str, str]]

def delete_tag

Delete a tag.

Upon failure, raises bauplan.exceptions.BauplanError

import bauplan
client = bauplan.Client()

assert client.delete_tag('my_tag_name')

Parameters

tag Union[str, Tag]

The name of the tag to delete.

if_exists bool

If set to True, no error will be raised if the tag does not exist.

def get_branch

Get the branch.

Upon failure, raises bauplan.exceptions.BauplanError

import bauplan
client = bauplan.Client()

# retrieve a branch by name
branch = client.get_branch('my_branch_name')

Parameters

branch Union[str, Branch]

The name of the branch to retrieve.

def get_branches

Get the available data branches in the Bauplan catalog.

Upon failure, raises bauplan.exceptions.BauplanError

import bauplan
client = bauplan.Client()

for branch in client.get_branches():
    ...

Parameters

name Optional[str]

Filter the branches by name.

user Optional[str]

Filter the branches by user.

limit Optional[int]

Optional, max number of branches to get.

itersize Optional[int]

Optional, overrides profile.catalog_max_records, the max number of objects per HTTP request.

def get_commits

Get the commits for the target branch or ref.

Upon failure, raises bauplan.exceptions.BauplanError

Parameters

ref Union[str, Branch, Tag, Ref]

The ref or branch to get the commits from.

filter_by_message Optional[str]

Optional, filter the commits by message (can be a string or a regex like '^abc.*$')

filter_by_author_username Optional[str]

Optional, filter the commits by author username (can be a string or a regex like '^abc.*$')

filter_by_author_name Optional[str]

Optional, filter the commits by author name (can be a string or a regex like '^abc.*$')

filter_by_author_email Optional[str]

Optional, filter the commits by author email (can be a string or a regex like '^abc.*$')

filter_by_authored_date Optional[Union[str, datetime]]

Optional, filter the commits by the exact authored date.

filter_by_authored_date_start_at Optional[Union[str, datetime]]

Optional, filter the commits by authored date start at.

filter_by_authored_date_end_at Optional[Union[str, datetime]]

Optional, filter the commits by authored date end at.

filter_by_parent_hash Optional[str]

Optional, filter the commits by parent hash.

filter_by_properties Optional[Dict[str, str]]

Optional, filter the commits by commit properties.

filter Optional[str]

Optional, a CEL filter expression to filter the commits.

limit Optional[int]

Optional, max number of commits to get.

itersize Optional[int]

Optional, overrides profile.catalog_max_records, the max number of objects per HTTP request.
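The filters above compose as ordinary keyword arguments. A minimal sketch with a helper name of our own (`recent_commits_by`) that materializes the lazily paged results into a list:

```python
def recent_commits_by(client, ref, username, limit=10):
    """Collect up to `limit` commits on `ref` authored by `username`.

    `client` is a bauplan.Client; get_commits returns an iterable that
    pages through the catalog, so we collect it into a list here.
    """
    return list(
        client.get_commits(
            ref=ref,
            filter_by_author_username=username,
            limit=limit,
        )
    )
```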

def get_job

EXPERIMENTAL: Get a job by ID or ID prefix.

Parameters

job_id str

The ID, or ID prefix, of the job to retrieve.

def get_job_context

EXPERIMENTAL: Get the context for a job by ID prefix or from a specified Job.

Parameters

job Union[str, Job]

A job ID, a prefix of a job ID, or a Job instance.

include_logs bool

include_snapshot bool

def get_job_contexts

EXPERIMENTAL: Get the contexts for multiple jobs, by ID prefix or from specified Jobs.

Parameters

jobs Union[List[str], List[Job]]

A list of job IDs or Job instances.

include_logs bool

include_snapshot bool

def get_job_logs

EXPERIMENTAL: Get logs for a job by ID prefix or from a specified Job.

Parameters

job_id_prefix str

The prefix of a job ID (deprecated in favor of job).

job Union[str, Job]

A job ID, a prefix of a job ID, or a Job instance.
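Together with list_jobs below, a typical pattern is to look up a job and pull its logs. This is a sketch only: the job APIs are experimental, and the shape of the listed jobs and returned logs is an assumption.

```python
def logs_for_latest_job(client):
    """Fetch logs for the most recently listed job, if any."""
    jobs = list(client.list_jobs())
    if not jobs:
        return None
    # get_job_logs accepts a job ID, an ID prefix, or a Job instance
    return client.get_job_logs(job=jobs[0])
```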

def get_namespace

Get a namespace.

Upon failure, raises bauplan.exceptions.BauplanError

import bauplan
client = bauplan.Client()

namespace = client.get_namespace(
    namespace='my_namespace_name',
    ref='my_branch_name',
)

Parameters

namespace Union[str, Namespace]

The name of the namespace to get.

ref Union[str, Branch, Tag, Ref]

The ref, branch name or tag name to check the namespace on.

def get_namespaces

Get the available data namespaces in the Bauplan catalog branch.

Upon failure, raises bauplan.exceptions.BauplanError

import bauplan
client = bauplan.Client()

for namespace in client.get_namespaces('my_ref_or_branch_name'):
    ...

Parameters

ref Union[str, Branch, Tag, Ref]

The ref, branch name or tag name to retrieve the namespaces from.

filter_by_name Optional[str]

Optional, filter the namespaces by name.

limit Optional[int]

Optional, max number of namespaces to get.

itersize Optional[int]

Optional, overrides profile.catalog_max_records, the max number of objects per HTTP request.

def get_table

Get the table data and metadata for a table in the target branch.

Upon failure, raises bauplan.exceptions.BauplanError

import bauplan
client = bauplan.Client()

# get the fields and metadata for a table
table = client.get_table(
    table='titanic',
    ref='my_ref_or_branch_name',
    namespace='bauplan',
)

# You can get the total number of rows this way.
num_records = table.records

# Or access the schema.
for c in table.fields:
    ...

Parameters

table Union[str, Table]

The table to retrieve.

ref Union[str, Branch, Tag, Ref]

The ref, branch name or tag name to get the table from.

namespace Optional[Union[str, Namespace]]

The namespace of the table to retrieve.

include_raw bool

Whether or not to include the raw metadata.json object as a nested dict.

def get_tables

Get the tables and views in the target branch.

Upon failure, raises bauplan.exceptions.BauplanError

import bauplan
client = bauplan.Client()

for table in client.get_tables('my_branch_name'):
    ...

Parameters

ref Union[str, Branch, Tag, Ref]

The ref or branch to get the tables from.

filter_by_name Optional[str]

Optional, the table name to filter by.

filter_by_namespace Optional[str]

Optional, the namespace to get filtered tables from.

namespace Optional[Union[str, Namespace]]

DEPRECATED: Optional, the namespace to get filtered tables from.

include_raw bool

Whether or not to include the raw metadata.json object as a nested dict.

limit Optional[int]

Optional, max number of tables to get.

itersize Optional[int]

Optional, overrides profile.catalog_max_records, the max number of objects per HTTP request.

def get_tag

Get the tag.

Upon failure, raises bauplan.exceptions.BauplanError

import bauplan
client = bauplan.Client()

# retrieve a tag by name
tag = client.get_tag('my_tag_name')

Parameters

tag Union[str, Tag]

The name of the tag to retrieve.

def get_tags

Get all the tags.

Upon failure, raises bauplan.exceptions.BauplanError

Parameters

filter_by_name Optional[str]

Optional, filter the tags by name.

limit Optional[int]

Optional, max number of tags to get.

itersize Optional[int]

Optional, overrides profile.catalog_max_records, the max number of objects per HTTP request.
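A short sketch of collecting tags; the helper name `matching_tags` is our own invention, and the fake client in testing stands in for a live connection:

```python
def matching_tags(client, name_filter=None, limit=None):
    """Collect tags from the catalog, optionally filtered by name."""
    return list(client.get_tags(filter_by_name=name_filter, limit=limit))
```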

def has_branch

Check if a branch exists.

Upon failure, raises bauplan.exceptions.BauplanError

import bauplan
client = bauplan.Client()

assert client.has_branch('my_branch_name')

Parameters

branch Union[str, Branch]

The name of the branch to check.

def has_namespace

Check if a namespace exists.

Upon failure, raises bauplan.exceptions.BauplanError

import bauplan
client = bauplan.Client()

assert client.has_namespace(
    namespace='my_namespace_name',
    ref='my_branch_name',
)

Parameters

namespace Union[str, Namespace]

The name of the namespace to check.

ref Union[str, Branch, Tag, Ref]

The ref, branch name or tag name to check the namespace on.

def has_table

Check if a table exists.

Upon failure, raises bauplan.exceptions.BauplanError

import bauplan
client = bauplan.Client()

assert client.has_table(
    table='titanic',
    ref='my_ref_or_branch_name',
    namespace='bauplan',
)

Parameters

table Union[str, Table]

The table to check.

ref Union[str, Branch, Tag, Ref]

The ref, branch name or tag name to get the table from.

namespace Optional[Union[str, Namespace]]

The namespace of the table to check.

def has_tag

Check if a tag exists.

Upon failure, raises bauplan.exceptions.BauplanError

import bauplan
client = bauplan.Client()

assert client.has_tag(
    tag='my_tag_name',
)

Parameters

tag Union[str, Tag]

The tag to check.

def import_data

Imports data into an already existing table.

import bauplan
client = bauplan.Client()

plan_state = client.import_data(
    table='my_table_name',
    search_uri='s3://path/to/my/files/*.parquet',
    branch='my_branch_name',
)
if plan_state.error:
    plan_error_action(...)
success_action(plan_state.plan)

Parameters

table Union[str, Table]

The previously created table into which data will be imported.

search_uri str

The URI to scan for files to import.

branch Optional[Union[str, Branch]]

Branch in which to import the table.

namespace Optional[Union[str, Namespace]]

Namespace of the table. If not specified, the namespace will be inferred from the table name or default settings.

continue_on_error bool

Do not fail the import even if a single file import fails.

import_duplicate_files bool

Allow importing S3 files that were already imported (bypasses duplicate-file prevention).

best_effort bool

Do not fail if the schema of the table does not match.

preview Optional[Union[Literal[on, off, head, tail], str]]

Whether to enable or disable preview mode for the import.

debug Optional[bool]

Whether to enable or disable debug mode for the import.

priority Optional[int]

Optional job priority (1-10, where 10 is highest priority).

verbose Optional[bool]

Whether to enable or disable verbose mode.

client_timeout Optional[Union[int, float]]

seconds to timeout; this also cancels the remote job execution.

detach bool

Whether to detach the job and return immediately without waiting for the job to finish.

def info

Fetch organization & account information.

Parameters

debug Optional[bool]

verbose Optional[bool]

client_timeout Optional[Union[int, float]]

def list_jobs

EXPERIMENTAL: List all jobs

A DateRange is an alias for tuple[Optional[datetime], Optional[datetime]], where the first element is an "after" (start) filter and the second element is a "before" (end) filter.

The filter_by_finish_time parameter takes a DateRange and allows jobs with a finish time later than "after" (if specified) and a finish time earlier than "before" (if specified), or between both. If neither is specified, for example (None, None), then the behavior is the same as not specifying the filter itself, for example filter_by_finish_time=None.

Parameters

all_users Optional[bool]

Optional, include jobs from all users, not just the current one. Default: None.

filter_by_id Optional[str]

Optional, filter jobs by job ID. Default: None.

filter_by_status Optional[Union[str, JobState]]

Optional, filter jobs by status. Default: None.

filter_by_finish_time Optional[DateRange]

Optional, filter jobs by finish time using a DateRange, as described above. Default: None.
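Building a DateRange is just building a 2-tuple of datetimes, either side of which may be None. A small helper of our own invention:

```python
from datetime import datetime, timedelta


def finished_within(hours):
    """DateRange matching jobs that finished in the last `hours` hours.

    The first element is the "after" bound; the open "before" bound
    (None) means no upper limit, per the DateRange semantics above.
    """
    return (datetime.now() - timedelta(hours=hours), None)

# usage sketch: client.list_jobs(filter_by_finish_time=finished_within(24))
```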

def merge_branch

Merge one branch into another.

Upon failure, raises bauplan.exceptions.BauplanError

import bauplan
client = bauplan.Client()

assert client.merge_branch(
    source_ref='my_ref_or_branch_name',
    into_branch='main',
)

Parameters

source_ref Union[str, Branch, Tag]

The name of the merge source; either a branch like "main" or ref like "main@[sha]".

into_branch Union[str, Branch]

The name of the merge target.

commit_message Optional[str]

Optional, the commit message.

commit_body Optional[str]

Optional, the commit body.

commit_properties Optional[Dict[str, str]]

Optional, a list of properties to attach to the merge.

message Optional[str]

properties Optional[Dict[str, str]]

def plan_table_creation

Create a table import plan from an S3 location.

This operation will attempt to create a table based on the schemas of the Parquet files found at the given search URI. A YAML file containing the schema and plan is returned, and if there are no conflicts it is automatically applied.

import bauplan
client = bauplan.Client()

plan_state = client.plan_table_creation(
    table='my_table_name',
    search_uri='s3://path/to/my/files/*.parquet',
    branch='my_branch_name',
)
if plan_state.error:
    plan_error_action(...)
success_action(plan_state.plan)

Parameters

table Union[str, Table]

The table which will be created.

search_uri str

The location of the files to scan for schema.

branch Optional[Union[str, Branch]]

The branch in which to create the table.

namespace Optional[Union[str, Namespace]]

Optional argument specifying the namespace. If not specified, it will be inferred based on table location or the default.

partitioned_by Optional[str]

Optional argument specifying the table partitioning.

replace Optional[bool]

Replace the table if it already exists.

debug Optional[bool]

Whether to enable or disable debug mode.

priority Optional[int]

Optional job priority (1-10, where 10 is highest priority).

verbose Optional[bool]

Whether to enable or disable verbose mode.

client_timeout Optional[Union[int, float]]

seconds to timeout; this also cancels the remote job execution.

def query

Execute a SQL query and return the results as a pyarrow.Table. Note that this function also uses Arrow internally, resulting in a fast data transfer.

If you prefer the results as a pandas DataFrame, use the to_pandas method of pyarrow.Table.

import bauplan

client = bauplan.Client()

# query the table and return result set as an arrow Table
my_table = client.query(
    query='SELECT avg(age) as average_age FROM bauplan.titanic',
    ref='my_ref_or_branch_name',
)

# efficiently cast the table to a pandas DataFrame
df = my_table.to_pandas()

Parameters

query str

The Bauplan query to execute.

ref Optional[Union[str, Branch, Tag, Ref]]

The ref, branch name or tag name to query from.

max_rows Optional[int]

The maximum number of rows to return; default: None (no limit).

cache Optional[Literal[on, off]]

Whether to enable or disable caching for the query.

connector Optional[str]

The connector type for the model (defaults to Bauplan). Allowed values are 'snowflake' and 'dremio'.

connector_config_key Optional[str]

The key name if the SSM key is custom with the pattern bauplan/connectors/<connector_type>/<key>.

connector_config_uri Optional[str]

The full SSM URI for a completely custom path, e.g. ssm://us-west-2/123456789012/baubau/dremio.

namespace Optional[Union[str, Namespace]]

The Namespace to run the query in. If not set, the query will be run in the default namespace for your account.

debug Optional[bool]

Whether to enable or disable debug mode for the query.

priority Optional[int]

Optional job priority (1-10, where 10 is highest priority).

verbose Optional[bool]

Whether to enable or disable verbose mode for the query.

client_timeout Optional[Union[int, float]]

seconds to timeout; this also cancels the remote job execution.

def query_to_csv_file

Export the results of a SQL query to a file in CSV format.

import bauplan
client = bauplan.Client()

# query the table and write the results to a CSV file
client.query_to_csv_file(
    path='/tmp/out.csv',
    query='SELECT name, age FROM bauplan.titanic LIMIT 100',
    ref='my_ref_or_branch_name',
)

Parameters

path Union[str, Path]

The name or path of the CSV file to write the results to.

query str

The Bauplan query to execute.

ref Optional[Union[str, Branch, Tag, Ref]]

The ref, branch name or tag name to query from.

max_rows Optional[int]

The maximum number of rows to return; default: None (no limit).

cache Optional[Literal[on, off]]

Whether to enable or disable caching for the query.

connector Optional[str]

The connector type for the model (defaults to Bauplan). Allowed values are 'snowflake' and 'dremio'.

connector_config_key Optional[str]

The key name if the SSM key is custom with the pattern bauplan/connectors/<connector_type>/<key>.

connector_config_uri Optional[str]

The full SSM URI for a completely custom path, e.g. ssm://us-west-2/123456789012/baubau/dremio.

namespace Optional[Union[str, Namespace]]

The Namespace to run the query in. If not set, the query will be run in the default namespace for your account.

debug Optional[bool]

Whether to enable or disable debug mode for the query.

verbose Optional[bool]

Whether to enable or disable verbose mode for the query.

client_timeout Optional[Union[int, float]]

seconds to timeout; this also cancels the remote job execution.

def query_to_generator

Execute a SQL query and return the results as a generator, where each row is a Python dictionary.

import bauplan
client = bauplan.Client()

# query the table and iterate through the results one row at a time
res = client.query_to_generator(
    query='SELECT name, age FROM bauplan.titanic LIMIT 100',
    ref='my_ref_or_branch_name',
)

for row in res:
    ...  # handle results

Parameters

query str

The Bauplan query to execute.

ref Optional[Union[str, Branch, Tag, Ref]]

The ref, branch name or tag name to query from.

max_rows Optional[int]

The maximum number of rows to return; default: None (no limit).

cache Optional[Literal[on, off]]

Whether to enable or disable caching for the query.

connector Optional[str]

The connector type for the model (defaults to Bauplan). Allowed values are 'snowflake' and 'dremio'.

connector_config_key Optional[str]

The key name if the SSM key is custom with the pattern bauplan/connectors/<connector_type>/<key>.

connector_config_uri Optional[str]

The full SSM URI for a completely custom path, e.g. ssm://us-west-2/123456789012/baubau/dremio.

namespace Optional[Union[str, Namespace]]

The Namespace to run the query in. If not set, the query will be run in the default namespace for your account.

debug Optional[bool]

Whether to enable or disable debug mode for the query.

as_json Optional[bool]

Whether to return the results as a JSON-compatible string (default: False).

priority Optional[int]

Optional job priority (1-10, where 10 is highest priority).

verbose Optional[bool]

Whether to enable or disable verbose mode for the query.

client_timeout Optional[Union[int, float]]

seconds to timeout; this also cancels the remote job execution.

def query_to_json_file

Export the results of a SQL query to a file in JSON format.

import bauplan
client = bauplan.Client()

# query the table and write the results to a JSON file
client.query_to_json_file(
    path='/tmp/out.json',
    query='SELECT name, age FROM bauplan.titanic LIMIT 100',
    ref='my_ref_or_branch_name',
)

Parameters

path Union[str, Path]

The name or path of the JSON file to write the results to.

query str

The Bauplan query to execute.

file_format Optional[Literal[json, jsonl]]

The format to write the results in; default: json. Allowed values are 'json' and 'jsonl'.

ref Optional[Union[str, Branch, Tag, Ref]]

The ref, branch name or tag name to query from.

max_rows Optional[int]

The maximum number of rows to return; default: None (no limit).

cache Optional[Literal[on, off]]

Whether to enable or disable caching for the query.

connector Optional[str]

The connector type for the model (defaults to Bauplan). Allowed values are 'snowflake' and 'dremio'.

connector_config_key Optional[str]

The key name if the SSM key is custom with the pattern bauplan/connectors/<connector_type>/<key>.

connector_config_uri Optional[str]

The full SSM URI for a completely custom path, e.g. ssm://us-west-2/123456789012/baubau/dremio.

namespace Optional[Union[str, Namespace]]

The Namespace to run the query in. If not set, the query will be run in the default namespace for your account.

debug Optional[bool]

Whether to enable or disable debug mode for the query.

verbose Optional[bool]

Whether to enable or disable verbose mode for the query.

client_timeout Optional[Union[int, float]]

seconds to timeout; this also cancels the remote job execution.

def query_to_parquet_file

Export the results of a SQL query to a file in Parquet format.

import bauplan
client = bauplan.Client()

# query the table and write the results to a Parquet file
client.query_to_parquet_file(
    path='/tmp/out.parquet',
    query='SELECT name, age FROM bauplan.titanic LIMIT 100',
    ref='my_ref_or_branch_name',
)

Parameters

path Union[str, Path]

The name or path of the Parquet file to write the results to.

query str

The Bauplan query to execute.

ref Optional[Union[str, Branch, Tag, Ref]]

The ref, branch name or tag name to query from.

max_rows Optional[int]

The maximum number of rows to return; default: None (no limit).

cache Optional[Literal[on, off]]

Whether to enable or disable caching for the query.

connector Optional[str]

The connector type for the model (defaults to Bauplan). Allowed values are 'snowflake' and 'dremio'.

connector_config_key Optional[str]

The key name if the SSM key is custom with the pattern bauplan/connectors/<connector_type>/<key>.

connector_config_uri Optional[str]

The full SSM URI for a completely custom path, e.g. ssm://us-west-2/123456789012/baubau/dremio.

namespace Optional[Union[str, Namespace]]

The Namespace to run the query in. If not set, the query will be run in the default namespace for your account.

debug Optional[bool]

Whether to enable or disable debug mode for the query.

verbose Optional[bool]

Whether to enable or disable verbose mode for the query.

client_timeout Optional[Union[int, float]]

seconds to timeout; this also cancels the remote job execution.

def rename_branch

Rename an existing branch. The branch name should follow the convention of "username.branch_name", otherwise non-admin users won't be able to complete the operation.

Upon failure, raises bauplan.exceptions.BauplanError

import bauplan
client = bauplan.Client()

assert client.rename_branch(
    branch='username.old_name',
    new_branch='username.new_name',
)

Parameters

branch Union[str, Branch]

The name of the branch to rename.

new_branch Union[str, Branch]

The name of the new branch.

def rename_tag

Rename an existing tag.

Upon failure, raises bauplan.exceptions.BauplanError

import bauplan
client = bauplan.Client()

assert client.rename_tag(
    tag='old_tag_name',
    new_tag='new_tag_name',
)

Parameters

tag Union[str, Tag]

The name of the tag to rename.

new_tag Union[str, Tag]

The name of the new tag.

def rerun

Re-run a Bauplan project by its job ID and return the state of the run. This is the equivalent of running the bauplan rerun command through the CLI.

Parameters

job_id str

The Job ID of the previous run. This can be used to re-run a previous run, e.g., on a different branch.

ref Optional[Union[str, Branch, Tag, Ref]]

The ref, branch name or tag name from which to rerun the project.

namespace Optional[Union[str, Namespace]]

The Namespace to run the job in. If not set, the job will be run in the default namespace.

cache Optional[Literal['on', 'off']]

Whether to enable or disable caching for the run.

transaction Optional[Literal['on', 'off']]

Whether to enable or disable transaction mode for the run.

dry_run Optional[bool]

Whether to enable or disable dry-run mode for the run; models are not materialized.

strict Optional[Literal['on', 'off']]

Whether to enable or disable strict schema validation.

preview Optional[Union[Literal['on', 'off', 'head', 'tail'], str]]

Whether to enable or disable preview mode for the run.

debug Optional[bool]

Whether to enable or disable debug mode for the run.

priority Optional[int]

Optional job priority (1-10, where 10 is highest priority).

verbose Optional[bool]

Whether to enable or disable verbose mode for the run.

client_timeout Optional[Union[int, float]]

Seconds to wait before timing out; a timeout also cancels the remote job execution.

def revert_table

Revert a table to a previous state.

Upon failure, raises bauplan.exceptions.BauplanError

import bauplan
client = bauplan.Client()

assert client.revert_table(
    table='my_table_name',
    namespace='my_namespace',
    source_ref='my_ref_or_branch_name',
    into_branch='main',
)

Parameters

table Union[str, Table]

The table to revert.

namespace Optional[Union[str, Namespace]]

The namespace of the table to revert.

source_ref Union[str, Branch, Tag, Ref]

The name of the source ref: either a branch like "main" or a ref like "main@[sha]".

into_branch Union[str, Branch]

The name of the target branch where the table will be reverted.

replace Optional[bool]

Optional, whether to replace the table if it already exists.

commit_body Optional[str]

Optional, the commit body message to attach to the operation.

commit_properties Optional[Dict[str, str]]

Optional, a dictionary of properties to attach to the operation.

def run

Run a Bauplan project and return the state of the run. This is the equivalent of running the bauplan run command through the CLI.

Parameters

project_dir Optional[str]

The directory of the project (where the bauplan_project.yml or bauplan_project.yaml file is located).

ref Optional[Union[str, Branch, Tag, Ref]]

The ref, branch name or tag name from which to run the project.

namespace Optional[Union[str, Namespace]]

The Namespace to run the job in. If not set, the job will be run in the default namespace.

parameters Optional[Dict[str, Optional[Union[str, int, float, bool]]]]

Parameters for templating into SQL or Python models.

cache Optional[Literal['on', 'off']]

Whether to enable or disable caching for the run.

transaction Optional[Literal['on', 'off']]

Whether to enable or disable transaction mode for the run.

dry_run Optional[bool]

Whether to enable or disable dry-run mode for the run; models are not materialized.

strict Optional[Literal['on', 'off']]

Whether to enable or disable strict schema validation.

preview Optional[Union[Literal['on', 'off', 'head', 'tail'], str]]

Whether to enable or disable preview mode for the run.

debug Optional[bool]

Whether to enable or disable debug mode for the run.

priority Optional[int]

Optional job priority (1-10, where 10 is highest priority).

verbose Optional[bool]

Whether to enable or disable verbose mode for the run.

client_timeout Optional[Union[int, float]]

Seconds to wait before timing out; a timeout also cancels the remote job execution.

detach bool

Whether to detach the run and return immediately instead of blocking on log streaming.

def scan

Execute a table scan (with optional filters) and return the results as an arrow Table.

Note that this function uses SQLGlot to compose a safe SQL query, then internally defers to the query_to_arrow function for the actual scan.

import bauplan
client = bauplan.Client()

# run a table scan over the data lake
# filters are passed as a string
my_table = client.scan(
    table='titanic',
    ref='my_ref_or_branch_name',
    namespace='bauplan',
    columns=['name'],
    filters='age < 30',
)

Parameters

table Union[str, Table]

The table to scan.

ref Optional[Union[str, Branch, Tag, Ref]]

The ref, branch name or tag name to scan from.

columns Optional[List[str]]

The columns to return (default: None).

filters Optional[str]

The filters to apply (default: None).

limit Optional[int]

The maximum number of rows to return (default: None).

cache Optional[Literal['on', 'off']]

Whether to enable or disable caching for the query.

connector Optional[str]

The connector type for the model (defaults to Bauplan). Allowed values are 'snowflake' and 'dremio'.

connector_config_key Optional[str]

The key name if the SSM key is custom with the pattern bauplan/connectors/<connector_type>/<key>.

connector_config_uri Optional[str]

The full SSM URI if using a completely custom path, e.g. ssm://us-west-2/123456789012/baubau/dremio.

namespace Optional[Union[str, Namespace]]

The Namespace to run the scan in. If not set, the scan will be run in the default namespace for your account.

debug Optional[bool]

Whether to enable or disable debug mode for the query.

priority Optional[int]

Optional job priority (1-10, where 10 is highest priority).

client_timeout Optional[Union[int, float]]

Seconds to wait before timing out; a timeout also cancels the remote job execution.


class InfoState


class JobState


class JobStatus

The status of a submitted job.

Parameters

canceled str

cancelled str

failed str

rejected str

success str

timeout str

heartbeat_failure str

unknown str


class Model

Represents a model (dataframe/table representing a DAG step) as an input to a function.

e.g.

import bauplan
import pandas as pd
import pyarrow

@bauplan.model()
def some_parent_model():
    return pyarrow.Table.from_pydict({'bar': [1, 2, 3]})

@bauplan.model()
def your_cool_model(
    # parent models are passed as inputs, using the
    # bauplan.Model class
    parent_0=bauplan.Model(
        'some_parent_model',
        columns=['bar'],
        filter='bar > 1',
    )
):
    # can return a pandas DataFrame or a pyarrow Table
    return pyarrow.Table.from_pandas(
        pd.DataFrame({
            'foo': parent_0['bar'] * 2,
        })
    )

Bauplan can wrap other engines for the processing of some models, exposing a common interface and unified API for the user while dispatching the relevant operations to the underlying engine.

Authentication and authorization happen securely and transparently through SSM; the user specifies a connector type and credentials through the relevant keywords:

import bauplan
import pandas as pd
import pyarrow

@bauplan.model()
def your_cool_model(
    parent_0=bauplan.Model(
        'some_parent_model',
        columns=['bar'],
        filter='bar > 1',
        connector='dremio',
        connector_config_key='bauplan',
    )
):
    # parent_0 inside the function body is still an Arrow table:
    # the user code stays the same, as the data is moved
    # transparently by Bauplan from the engine to the function.
    return pyarrow.Table.from_pandas(
        pd.DataFrame({
            'foo': parent_0['bar'] * 2,
        })
    )

Parameters

name str

The name of the model.

columns typing.Optional[typing.List[str]]

The list of columns in the model. If the arg is not provided, the model will load all columns.

filter typing.Optional[str]

The optional filter for the model. Defaults to None.

ref typing.Optional[str]

The optional reference to the model. Defaults to None.

connector typing.Optional[str]

The connector type for the model (defaults to Bauplan SQL). Allowed values are 'snowflake' and 'dremio'.

connector_config_key typing.Optional[str]

The key name if the SSM key is custom with the pattern bauplan/connectors/<connector_type>/<key>.

connector_config_uri typing.Optional[str]

The full SSM URI if using a completely custom path, e.g. ssm://us-west-2/123456789012/baubau/dremio.


class OrganizationInfo


def Parameter

_ParameterKwargTracker is a callable object used to track the parameters that are used in a model. Because default function args are evaluated once and only once, a set of the kwargs can be built from them.

Parameters

param_name str


class Profile

A configuration profile.

Parameters

name Optional[str]

api_key Optional[str]

user_session_token Optional[str]

project_dir Optional[Union[str, Path]]

branch Optional[str]

namespace Optional[str]

cache Optional[Literal['on', 'off']]

debug Optional[bool]

verbose Optional[bool]

api_endpoint Optional[str]

catalog_endpoint Optional[str]

catalog_max_records Optional[int]

client_timeout Optional[int]

env Optional[str]

config_file_path Optional[Union[str, Path]]

feature_flags Optional[Dict[str, Any]]

def load_profile

Load a profile from a profile file.

Parameters

profile Optional[str]

api_key Optional[str]

user_session_token Optional[str]

project_dir Optional[Union[str, Path]]

branch Optional[str]

namespace Optional[str]

cache Optional[Literal['on', 'off']]

debug Optional[bool]

verbose Optional[bool]

api_endpoint Optional[str]

catalog_endpoint Optional[str]

catalog_max_records Optional[int]

client_timeout Optional[int]

env Optional[str]

config_file_path Optional[Union[str, Path]]

feature_flags Optional[Dict[str, Any]]


class RunnerNodeInfo


class UserInfo


def expectation

Decorator that defines a Bauplan expectation.

An expectation is a function from one (or more) dataframe-like object(s) to a boolean: it is commonly used to perform data validation and data quality checks when running a pipeline. Expectations take as input the table(s) they validate and return a boolean indicating whether the expectation is met. A Python expectation needs a Python environment to run, which is defined using the python decorator, e.g.:

import bauplan
from bauplan.standard_expectations import expect_column_no_nulls

@bauplan.expectation()
@bauplan.python('3.10')
def test_joined_dataset(
    data=bauplan.Model(
        'join_dataset',
        columns=['anomaly']
    )
):
    # your data validation code here
    return expect_column_no_nulls(data, 'anomaly')

Parameters


def model

Decorator that specifies a Bauplan model.

A model is a function from one (or more) dataframe-like object(s) to another dataframe-like object: it is used to define a transformation in a pipeline. Models are chained together implicitly by using them as inputs to their children. A Python model needs a Python environment to run, which is defined using the python decorator, e.g.:

import bauplan

@bauplan.model(
    columns=['*'],
    materialization_strategy='NONE'
)
@bauplan.python('3.11')
def source_scan(
    data=bauplan.Model(
        'iot_kaggle',
        columns=['*'],
        filter="motion='false'"
    )
):
    # your code here
    return data

Parameters

name typing.Optional[str]

the name of the model (e.g. 'users'); if missing, the function name is used.

columns typing.Optional[typing.List[str]]

the columns of the output dataframe after the model runs (e.g. ['id', 'name', 'email']). Use ['*'] as a wildcard.

partitioned_by typing.Union[str, typing.List[str], typing.Tuple[str, ...], NoneType]

the columns to partition the data by.

materialization_strategy typing.Optional[typing.Literal['NONE', 'REPLACE', 'APPEND', 'OVERWRITE_PARTITIONS']]

the materialization strategy to use.

cache_strategy typing.Optional[typing.Literal['NONE', 'DEFAULT']]

the cache strategy to use.

internet_access typing.Optional[bool]

whether the model requires internet access.

overwrite_filter typing.Optional[str]

the overwrite filter expression.


def pyspark

Decorator that makes a pyspark session available to a Bauplan function (a model or an expectation). Add a spark=None parameter to the function's arguments to receive the session.

Parameters

version typing.Optional[str]

the version string of pyspark

conf typing.Optional[typing.Dict[str, str]]

A dict containing the pyspark config


def python

Decorator that defines a Python environment for a Bauplan function (e.g. a model or expectation). It is used to specify directly in code the configuration of the Python environment required to run the function, i.e. the Python version and the Python packages required.

Parameters

version typing.Optional[str]

The python version for the interpreter (e.g. '3.11').

pip typing.Optional[typing.Dict[str, str]]

A dictionary of dependencies (and versions) required by the function (e.g. {'requests': '2.26.0'}).


def resources

Decorator that defines the resources required by a Bauplan function (e.g. a model or expectation). It is used to specify directly in code the configuration of the resources required to run the function.

Parameters

cpus typing.Union[int, float, NoneType]

The number of CPUs required by the function (e.g. 0.5).

memory typing.Union[int, str, NoneType]

The amount of memory required by the function (e.g. 1G, 1000).

memory_swap typing.Union[int, str, NoneType]

The amount of swap memory required by the function (e.g. 1G, 1000).

timeout typing.Optional[int]

The maximum time the function is allowed to run (e.g. 60).


def synthetic_model

Decorator that defines a Bauplan Synthetic Model.

Parameters

name str

The name of the model. Defaults to the function name.

columns typing.List[str]

The columns of the synthetic model (e.g. ['id', 'name', 'email']).