bauplan
class
Client
A consistent interface to access Bauplan operations.
Using the client
import bauplan
client = bauplan.Client()
# query the table and return result set as an arrow Table
my_table = client.query('SELECT avg(age) AS average_age FROM bauplan.titanic limit 1', ref='main')
# efficiently cast the table to a pandas DataFrame
df = my_table.to_pandas()
Notes on authentication
# by default, authenticate from BAUPLAN_API_KEY >> BAUPLAN_PROFILE >> ~/.bauplan/config.yml
client = bauplan.Client()
# client used ~/.bauplan/config.yml profile 'default'
os.environ['BAUPLAN_PROFILE'] = "someprofile"
client = bauplan.Client()
# >> client now uses profile 'someprofile'
os.environ['BAUPLAN_API_KEY'] = "mykey"
client = bauplan.Client()
# >> client now authenticates with api_key value "mykey", because api key > profile
# specify authentication directly - this supersedes BAUPLAN_API_KEY in the environment
client = bauplan.Client(api_key='MY_KEY')
# specify a profile from ~/.bauplan/config.yml - this supersedes BAUPLAN_PROFILE in the environment
client = bauplan.Client(profile='default')
Handling Exceptions
Catalog operations (branch/table methods) raise a subclass of bauplan.exceptions.BauplanError
that mirror HTTP status codes.
- 400: InvalidDataError
- 401: UnauthorizedError
- 403: AccessDeniedError
- 404: ResourceNotFoundError e.g. the ID doesn't match any records
- 404: ApiRouteError e.g. the given route doesn't exist
- 405: ApiMethodError e.g. POST on a route with only GET defined
- 409: UpdateConflictError e.g. creating a record with a name that already exists
- 429: TooManyRequestsError
Run/Query/Scan/Import operations raise a subclass of bauplan.exceptions.BauplanError
and also return a RunState object containing details and logs:
JobError
e.g. something went wrong in a run/query/import/scan; includes error details
Run/import operations also return a state object that includes a job_status
and other details.
There are two ways to check status for run/import operations:
- try/except the JobError exception
- check the state.job_status attribute
Examples
state = client.run(...)
if state.job_status != "success":
...
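As a sketch of the try/except approach (the project directory is illustrative):

```python
import bauplan
from bauplan.exceptions import JobError

client = bauplan.Client()
try:
    state = client.run(project_dir='./my_project', ref='main')
except JobError as e:
    # the exception carries the error details and logs
    print(f'run failed: {e}')
```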
Parameters
profile: Optional[str]
The Bauplan config profile name to use to determine api_key.
api_key: Optional[str]
Your unique Bauplan API key; mutually exclusive with profile. If not provided, fetch precedence is 1) environment BAUPLAN_API_KEY 2) ~/.bauplan/config.yml.
branch: Optional[str]
The default branch to use for queries and runs. If not provided, active_branch from the profile is used.
namespace: Optional[str]
The default namespace to use for queries and runs.
cache: Optional[Literal[on, off]]
Whether to enable or disable caching for all the requests.
debug: Optional[bool]
Whether to enable or disable debug mode for all the requests.
verbose: Optional[bool]
Whether to enable or disable verbose mode for all the requests.
api_endpoint: Optional[str]
The Bauplan API endpoint to use. If not provided, fetch precedence is 1) environment BAUPLAN_API_ENDPOINT 2) ~/.bauplan/config.yml.
catalog_endpoint: Optional[str]
The Bauplan catalog endpoint to use. If not provided, fetch precedence is 1) environment BAUPLAN_CATALOG_ENDPOINT 2) ~/.bauplan/config.yml.
catalog_max_records: Optional[int]
The maximum number of records to fetch, per page, from the catalog.
client_timeout: Optional[int]
The client timeout in seconds for all the requests.
env: Optional[str]
The environment to use for all the requests. Default: 'prod'.
config_file_path: Optional[Union[str, Path]]
The path to the Bauplan config file to use. If not provided, fetch precedence is 1) environment BAUPLAN_CONFIG_PATH 2) ~/.bauplan/config.yml.
user_session_token: Optional[str]
Your unique Bauplan user session token.
feature_flags: Optional[Dict[str, Any]]
def
apply_table_creation_plan
Apply a plan for creating a table. This is done automatically during table plan creation if no schema conflicts exist; otherwise, this function is used to apply the plan after the schema conflicts are resolved. The most common schema conflict is two Parquet files with the same column name but different data types.
Parameters
plan: Union[Dict, TableCreatePlanState]
The plan to apply.
debug: Optional[bool]
Whether to enable or disable debug mode for the operation.
priority: Optional[int]
Optional job priority (1-10, where 10 is highest priority).
verbose: Optional[bool]
Whether to enable or disable verbose mode.
client_timeout: Optional[Union[int, float]]
Seconds to timeout; this also cancels the remote job execution.
def
cancel_job
EXPERIMENTAL: Cancel a job by ID.
Parameters
job_id: str
The ID of the job to cancel.
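A minimal sketch (the job ID is illustrative):

```python
import bauplan

client = bauplan.Client()
# cancel a previously submitted job by its ID
client.cancel_job(job_id='my_job_id')
```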
def
create_branch
Create a new branch at a given ref. The branch name should follow the convention of "username.branch_name", otherwise non-admin users won't be able to complete the operation.
Upon failure, raises bauplan.exceptions.BauplanError
import bauplan
client = bauplan.Client()
assert client.create_branch(
branch='my_branch_name',
from_ref='my_ref_or_branch_name',
if_not_exists=True,
)
Parameters
branch: Union[str, Branch]
The name of the new branch.
from_ref: Union[str, Branch, Tag]
The name of the base branch; either a branch like "main" or a ref like "main@[sha]".
if_not_exists: bool
If set to True, the branch will not be created if it already exists.
def
create_namespace
Create a new namespace at a given branch.
Upon failure, raises bauplan.exceptions.BauplanError
import bauplan
client = bauplan.Client()
assert client.create_namespace(
namespace='my_namespace_name',
branch='my_branch_name',
properties={'k1': 'v1', 'k2': 'v2'},
if_not_exists=True,
)
Parameters
namespace: Union[str, Namespace]
The name of the namespace.
branch: Union[str, Branch]
The name of the branch to create the namespace on.
commit_body: Optional[str]
Optional, the commit body to attach to the operation.
commit_properties: Optional[Dict[str, str]]
Optional, a dictionary of properties to attach to the commit.
if_not_exists: bool
If set to True, the namespace will not be created if it already exists.
properties: Optional[Dict[str, str]]
def
create_table
Create a table from an S3 location.
This operation will attempt to create a table based on the schemas of the N
Parquet files found at a given search URI. This is a two-step operation using
plan_table_creation and apply_table_creation_plan.
import bauplan
client = bauplan.Client()
table = client.create_table(
table='my_table_name',
search_uri='s3://path/to/my/files/*.parquet',
branch='my_branch_name',
)
Parameters
table: Union[str, Table]
The table which will be created.
search_uri: str
The location of the files to scan for schema.
branch: Optional[Union[str, Branch]]
The branch in which to create the table.
namespace: Optional[Union[str, Namespace]]
Optional argument specifying the namespace. If not specified, it will be inferred based on table location or the default.
partitioned_by: Optional[str]
Optional argument specifying the table partitioning.
replace: Optional[bool]
Replace the table if it already exists.
debug: Optional[bool]
Whether to enable or disable debug mode.
priority: Optional[int]
Optional job priority (1-10, where 10 is highest priority).
verbose: Optional[bool]
Whether to enable or disable verbose mode.
client_timeout: Optional[Union[int, float]]
Seconds to timeout; this also cancels the remote job execution.
def
create_tag
Create a new tag at a given ref.
Upon failure, raises bauplan.exceptions.BauplanError
import bauplan
client = bauplan.Client()
assert client.create_tag(
tag='my_tag',
from_ref='my_ref_or_branch_name',
)
Parameters
tag: Union[str, Tag]
The name of the new tag.
from_ref: Union[str, Branch, Ref]
The name of the base branch; either a branch like "main" or a ref like "main@[sha]".
if_not_exists: bool
If set to True, the tag will not be created if it already exists.
def
delete_branch
Delete a branch.
Upon failure, raises bauplan.exceptions.BauplanError
import bauplan
client = bauplan.Client()
assert client.delete_branch('my_branch_name')
Returns: a boolean indicating whether the branch was deleted.
Parameters
branch: Union[str, Branch]
The name of the branch to delete.
if_exists: bool
If set to True, no error is raised if the branch does not exist.
def
delete_namespace
Delete a namespace.
Upon failure, raises bauplan.exceptions.BauplanError
import bauplan
client = bauplan.Client()
assert client.delete_namespace(
namespace='my_namespace_name',
branch='my_branch_name',
)
Parameters
namespace: Union[str, Namespace]
The name of the namespace to delete.
branch: Union[str, Branch]
The name of the branch to delete the namespace from.
if_exists: bool
If set to True, no error is raised if the namespace does not exist.
commit_body: Optional[str]
Optional, the commit body to attach to the operation.
commit_properties: Optional[Dict[str, str]]
Optional, a dictionary of properties to attach to the commit.
properties: Optional[Dict[str, str]]
def
delete_table
Drop a table.
Upon failure, raises bauplan.exceptions.BauplanError
import bauplan
client = bauplan.Client()
assert client.delete_table(
table='my_table_name',
branch='my_branch_name',
namespace='my_namespace',
)
Parameters
table: Union[str, Table]
The table to delete.
branch: Union[str, Branch]
The branch on which the table is stored.
namespace: Optional[Union[str, Namespace]]
The namespace of the table to delete.
if_exists: bool
If set to True, no error is raised if the table does not exist.
commit_body: Optional[str]
Optional, the commit body message to attach to the commit.
commit_properties: Optional[Dict[str, str]]
Optional, a dictionary of properties to attach to the commit.
properties: Optional[Dict[str, str]]
def
delete_tag
Delete a tag.
Upon failure, raises bauplan.exceptions.BauplanError
import bauplan
client = bauplan.Client()
assert client.delete_tag('my_tag_name')
Parameters
tag: Union[str, Tag]
The name of the tag to delete.
if_exists: bool
If set to True, no error is raised if the tag does not exist.
def
get_branch
Get the branch.
Upon failure, raises bauplan.exceptions.BauplanError
import bauplan
client = bauplan.Client()
# retrieve the branch by name
branch = client.get_branch('my_branch_name')
Parameters
branch: Union[str, Branch]
The name of the branch to retrieve.
def
get_branches
Get the available data branches in the Bauplan catalog.
Upon failure, raises bauplan.exceptions.BauplanError
import bauplan
client = bauplan.Client()
for branch in client.get_branches():
...
Parameters
name: Optional[str]
Filter the branches by name.
user: Optional[str]
Filter the branches by user.
limit: Optional[int]
Optional, max number of branches to get.
itersize: Optional[int]
Optional, overwrites profile.catalog_max_records, the max number of objects per HTTP request.
def
get_commits
Get the commits for the target branch or ref.
Upon failure, raises bauplan.exceptions.BauplanError
Parameters
ref: Union[str, Branch, Tag, Ref]
The ref or branch to get the commits from.
filter_by_message: Optional[str]
Optional, filter the commits by message (can be a string or a regex like '^abc.*$').
filter_by_author_username: Optional[str]
Optional, filter the commits by author username (can be a string or a regex like '^abc.*$').
filter_by_author_name: Optional[str]
Optional, filter the commits by author name (can be a string or a regex like '^abc.*$').
filter_by_author_email: Optional[str]
Optional, filter the commits by author email (can be a string or a regex like '^abc.*$').
filter_by_authored_date: Optional[Union[str, datetime]]
Optional, filter the commits by the exact authored date.
filter_by_authored_date_start_at: Optional[Union[str, datetime]]
Optional, filter the commits by authored date (start of range).
filter_by_authored_date_end_at: Optional[Union[str, datetime]]
Optional, filter the commits by authored date (end of range).
filter_by_parent_hash: Optional[str]
Optional, filter the commits by parent hash.
filter_by_properties: Optional[Dict[str, str]]
Optional, filter the commits by commit properties.
filter: Optional[str]
Optional, a CEL filter expression to filter the commits.
limit: Optional[int]
Optional, max number of commits to get.
itersize: Optional[int]
Optional, overwrites profile.catalog_max_records, the max number of objects per HTTP request.
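For example, a sketch that lists recent commits on a branch by one author (the filter values are illustrative):

```python
import bauplan

client = bauplan.Client()
# iterate over the ten most recent commits on main by a given author
for commit in client.get_commits(
    ref='main',
    filter_by_author_username='someuser',
    limit=10,
):
    ...  # inspect each commit
```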
def
get_job
EXPERIMENTAL: Get a job by ID or ID prefix.
Parameters
job_id: str
The job ID or ID prefix to look up.
def
get_job_context
EXPERIMENTAL: Get the context for one or more jobs, each specified by ID, ID prefix, or a Job instance.
Parameters
jobs: List[Union[str, Job]]
def
get_job_logs
EXPERIMENTAL: Get logs for a job by ID prefix or from a specified Job
.
Parameters
job_id_prefix: str
The prefix of a Job ID (deprecated in favor of job).
job: Union[str, Job]
A job ID, a prefix of a job ID, or a Job instance.
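A sketch combining the job lookups above (the ID prefix is illustrative):

```python
import bauplan

client = bauplan.Client()
# look up a job by ID (or ID prefix), then fetch its logs
job = client.get_job('my_job_id_prefix')
logs = client.get_job_logs(job=job)
```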
def
get_namespace
Get a namespace.
Upon failure, raises bauplan.exceptions.BauplanError
import bauplan
client = bauplan.Client()
namespace = client.get_namespace(
namespace='my_namespace_name',
ref='my_branch_name',
)
Parameters
namespace: Union[str, Namespace]
The name of the namespace to get.
ref: Union[str, Branch, Tag, Ref]
The ref, branch name or tag name to check the namespace on.
def
get_namespaces
Get the available data namespaces in the Bauplan catalog branch.
Upon failure, raises bauplan.exceptions.BauplanError
import bauplan
client = bauplan.Client()
for namespace in client.get_namespaces('my_ref_or_branch_name'):
...
Parameters
ref: Union[str, Branch, Tag, Ref]
The ref, branch name or tag name to retrieve the namespaces from.
filter_by_name: Optional[str]
Optional, filter the namespaces by name.
limit: Optional[int]
Optional, max number of namespaces to get.
itersize: Optional[int]
Optional, overwrites profile.catalog_max_records, the max number of objects per HTTP request.
def
get_table
Get the table data and metadata for a table in the target branch.
Upon failure, raises bauplan.exceptions.BauplanError
import bauplan
client = bauplan.Client()
# get the fields and metadata for a table
table = client.get_table(
table='titanic',
ref='my_ref_or_branch_name',
namespace='bauplan',
)
# You can get the total number of rows this way.
num_records = table.records
# Or access the schema.
for c in table.fields:
...
Parameters
table: Union[str, Table]
The table to retrieve.
ref: Union[str, Branch, Tag, Ref]
The ref, branch name or tag name to get the table from.
namespace: Optional[Union[str, Namespace]]
The namespace of the table to retrieve.
include_raw: bool
Whether or not to include the raw metadata.json object as a nested dict.
def
get_tables
Get the tables and views in the target branch.
Upon failure, raises bauplan.exceptions.BauplanError
import bauplan
client = bauplan.Client()
for table in client.get_tables('my_branch_name'):
...
Parameters
ref: Union[str, Branch, Tag, Ref]
The ref or branch to get the tables from.
filter_by_name: Optional[str]
Optional, the table name to filter by.
filter_by_namespace: Optional[str]
Optional, the namespace to get filtered tables from.
namespace: Optional[Union[str, Namespace]]
DEPRECATED: Optional, the namespace to get filtered tables from.
include_raw: bool
Whether or not to include the raw metadata.json object as a nested dict.
limit: Optional[int]
Optional, max number of tables to get.
itersize: Optional[int]
Optional, overwrites profile.catalog_max_records, the max number of objects per HTTP request.
def
get_tag
Get the tag.
Upon failure, raises bauplan.exceptions.BauplanError
import bauplan
client = bauplan.Client()
# retrieve the tag by name
tag = client.get_tag('my_tag_name')
Parameters
tag: Union[str, Tag]
The name of the tag to retrieve.
def
get_tags
Get all the tags.
Upon failure, raises bauplan.exceptions.BauplanError
Parameters
filter_by_name: Optional[str]
Optional, filter the tags by name.
limit: Optional[int]
Optional, max number of tags to get.
itersize: Optional[int]
Optional, overwrites profile.catalog_max_records, the max number of objects per HTTP request.
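A minimal sketch of iterating over tags:

```python
import bauplan

client = bauplan.Client()
# iterate over up to ten tags in the catalog
for tag in client.get_tags(limit=10):
    ...  # inspect each tag
```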
def
has_branch
Check if a branch exists.
Upon failure, raises bauplan.exceptions.BauplanError
import bauplan
client = bauplan.Client()
assert client.has_branch('my_branch_name')
Parameters
branch: Union[str, Branch]
The name of the branch to check.
def
has_namespace
Check if a namespace exists.
Upon failure, raises bauplan.exceptions.BauplanError
import bauplan
client = bauplan.Client()
assert client.has_namespace(
namespace='my_namespace_name',
ref='my_branch_name',
)
Parameters
namespace: Union[str, Namespace]
The name of the namespace to check.
ref: Union[str, Branch, Tag, Ref]
The ref, branch name or tag name to check the namespace on.
def
has_table
Check if a table exists.
Upon failure, raises bauplan.exceptions.BauplanError
import bauplan
client = bauplan.Client()
assert client.has_table(
table='titanic',
ref='my_ref_or_branch_name',
namespace='bauplan',
)
Parameters
table: Union[str, Table]
The table to check.
ref: Union[str, Branch, Tag, Ref]
The ref, branch name or tag name to check the table on.
namespace: Optional[Union[str, Namespace]]
The namespace of the table to check.
def
has_tag
Check if a tag exists.
Upon failure, raises bauplan.exceptions.BauplanError
import bauplan
client = bauplan.Client()
assert client.has_tag(
tag='my_tag_name',
)
Parameters
tag: Union[str, Tag]
The tag to check.
def
import_data
Imports data into an already existing table.
import bauplan
client = bauplan.Client()
plan_state = client.import_data(
table='my_table_name',
search_uri='s3://path/to/my/files/*.parquet',
branch='my_branch_name',
)
if plan_state.error:
plan_error_action(...)
success_action(plan_state.plan)
Parameters
table: Union[str, Table]
The previously created table into which data will be imported.
search_uri: str
The URI to scan for files to import.
branch: Optional[Union[str, Branch]]
The branch in which to import the table.
namespace: Optional[Union[str, Namespace]]
The namespace of the table. If not specified, the namespace will be inferred from the table name or default settings.
continue_on_error: bool
Do not fail the import even if a single file import fails.
import_duplicate_files: bool
Ignore prevention of importing S3 files that were already imported.
best_effort: bool
Don't fail if the schema of the table does not match.
preview: Optional[Union[Literal[on, off, head, tail], str]]
Whether to enable or disable preview mode for the import.
debug: Optional[bool]
Whether to enable or disable debug mode for the import.
priority: Optional[int]
Optional job priority (1-10, where 10 is highest priority).
verbose: Optional[bool]
Whether to enable or disable verbose mode.
client_timeout: Optional[Union[int, float]]
Seconds to timeout; this also cancels the remote job execution.
detach: bool
Whether to detach the job and return immediately without waiting for the job to finish.
def
info
Fetch organization & account information.
Parameters
debug: Optional[bool]
verbose: Optional[bool]
client_timeout: Optional[Union[int, float]]
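As a minimal sketch:

```python
import bauplan

client = bauplan.Client()
# fetch organization and account information for the authenticated user
info = client.info()
```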
def
list_jobs
EXPERIMENTAL: List all jobs.
A DateRange is an alias for tuple[Optional[datetime], Optional[datetime]], where the
first element is an "after" (start) filter and the second element is a "before" (end)
filter.
The filter_by_finish_time parameter takes a DateRange and allows jobs with a finish time
later than "after" (if specified) and a finish time earlier than "before" (if specified),
or between both. If neither is specified, for example (None, None), then the behavior is
the same as not specifying the filter itself, for example filter_by_finish_time=None.
Parameters
all_users: Optional[bool]
Optional, list jobs for all users rather than only the current user. (Default value = None)
filter_by_id: Optional[str]
Optional, filter the jobs by ID. (Default value = None)
filter_by_status: Optional[Union[str, JobState]]
Optional, filter the jobs by status. (Default value = None)
filter_by_finish_time: Optional[DateRange]
Optional, filter the jobs by finish time; see the DateRange note above. (Default value = None)
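For example, a sketch using a DateRange with an open "before" bound to list jobs that finished in the last seven days:

```python
from datetime import datetime, timedelta

import bauplan

client = bauplan.Client()
# (after, before): only the "after" bound is set here
last_week = (datetime.now() - timedelta(days=7), None)
for job in client.list_jobs(filter_by_finish_time=last_week):
    ...  # inspect each job
```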
def
merge_branch
Merge one branch into another.
Upon failure, raises bauplan.exceptions.BauplanError
import bauplan
client = bauplan.Client()
assert client.merge_branch(
source_ref='my_ref_or_branch_name',
into_branch='main',
)
Parameters
source_ref: Union[str, Branch, Tag]
The name of the merge source; either a branch like "main" or a ref like "main@[sha]".
into_branch: Union[str, Branch]
The name of the merge target.
commit_message: Optional[str]
Optional, the commit message.
commit_body: Optional[str]
Optional, the commit body.
commit_properties: Optional[Dict[str, str]]
Optional, a dictionary of properties to attach to the merge.
message: Optional[str]
properties: Optional[Dict[str, str]]
def
plan_table_creation
Create a table import plan from an S3 location.
This operation will attempt to create a table based on the schemas of the N Parquet files found at a given search URI. A YAML file containing the schema and plan is returned, and if there are no conflicts, it is automatically applied.
import bauplan
client = bauplan.Client()
plan_state = client.plan_table_creation(
table='my_table_name',
search_uri='s3://path/to/my/files/*.parquet',
branch='my_branch_name',
)
if plan_state.error:
plan_error_action(...)
success_action(plan_state.plan)
Parameters
table: Union[str, Table]
The table which will be created.
search_uri: str
The location of the files to scan for schema.
branch: Optional[Union[str, Branch]]
The branch in which to create the table.
namespace: Optional[Union[str, Namespace]]
Optional argument specifying the namespace. If not specified, it will be inferred based on table location or the default.
partitioned_by: Optional[str]
Optional argument specifying the table partitioning.
replace: Optional[bool]
Replace the table if it already exists.
debug: Optional[bool]
Whether to enable or disable debug mode.
priority: Optional[int]
Optional job priority (1-10, where 10 is highest priority).
verbose: Optional[bool]
Whether to enable or disable verbose mode.
client_timeout: Optional[Union[int, float]]
Seconds to timeout; this also cancels the remote job execution.
def
query
Execute a SQL query and return the results as a pyarrow.Table. Note that this function also uses Arrow internally, resulting in fast data transfer.
If you prefer the results as a pandas DataFrame, use the to_pandas method of pyarrow.Table.
import bauplan
client = bauplan.Client()
# query the table and return result set as an arrow Table
my_table = client.query(
query='SELECT avg(age) as average_age FROM bauplan.titanic',
ref='my_ref_or_branch_name',
)
# efficiently cast the table to a pandas DataFrame
df = my_table.to_pandas()
Parameters
query: str
The Bauplan query to execute.
ref: Optional[Union[str, Branch, Tag, Ref]]
The ref, branch name or tag name to query from.
max_rows: Optional[int]
The maximum number of rows to return; default: None (no limit).
cache: Optional[Literal[on, off]]
Whether to enable or disable caching for the query.
connector: Optional[str]
The connector type for the model (defaults to Bauplan). Allowed values are 'snowflake' and 'dremio'.
connector_config_key: Optional[str]
The key name if the SSM key is custom, with the pattern bauplan/connectors/<connector_type>/<key>.
connector_config_uri: Optional[str]
Full SSM URI if a completely custom path, e.g. ssm://us-west-2/123456789012/baubau/dremio.
namespace: Optional[Union[str, Namespace]]
The namespace to run the query in. If not set, the query will be run in the default namespace for your account.
debug: Optional[bool]
Whether to enable or disable debug mode for the query.
priority: Optional[int]
Optional job priority (1-10, where 10 is highest priority).
verbose: Optional[bool]
Whether to enable or disable verbose mode for the query.
client_timeout: Optional[Union[int, float]]
Seconds to timeout; this also cancels the remote job execution.
def
query_to_csv_file
Export the results of a SQL query to a file in CSV format.
import bauplan
client = bauplan.Client()
# export the query results to a CSV file
client.query_to_csv_file(
path='/tmp/out.csv',
query='SELECT name, age FROM bauplan.titanic LIMIT 100',
ref='my_ref_or_branch_name',
)
Parameters
path: Union[str, Path]
The name or path of the CSV file to write the results to.
query: str
The Bauplan query to execute.
ref: Optional[Union[str, Branch, Tag, Ref]]
The ref, branch name or tag name to query from.
max_rows: Optional[int]
The maximum number of rows to return; default: None (no limit).
cache: Optional[Literal[on, off]]
Whether to enable or disable caching for the query.
connector: Optional[str]
The connector type for the model (defaults to Bauplan). Allowed values are 'snowflake' and 'dremio'.
connector_config_key: Optional[str]
The key name if the SSM key is custom, with the pattern bauplan/connectors/<connector_type>/<key>.
connector_config_uri: Optional[str]
Full SSM URI if a completely custom path, e.g. ssm://us-west-2/123456789012/baubau/dremio.
namespace: Optional[Union[str, Namespace]]
The namespace to run the query in. If not set, the query will be run in the default namespace for your account.
debug: Optional[bool]
Whether to enable or disable debug mode for the query.
verbose: Optional[bool]
Whether to enable or disable verbose mode for the query.
client_timeout: Optional[Union[int, float]]
Seconds to timeout; this also cancels the remote job execution.
def
query_to_generator
Execute a SQL query and return the results as a generator, where each row is a Python dictionary.
import bauplan
client = bauplan.Client()
# query the table and iterate through the results one row at a time
res = client.query_to_generator(
query='SELECT name, age FROM bauplan.titanic LIMIT 100',
ref='my_ref_or_branch_name',
)
for row in res:
... # handle results
Parameters
query: str
The Bauplan query to execute.
ref: Optional[Union[str, Branch, Tag, Ref]]
The ref, branch name or tag name to query from.
max_rows: Optional[int]
The maximum number of rows to return; default: None (no limit).
cache: Optional[Literal[on, off]]
Whether to enable or disable caching for the query.
connector: Optional[str]
The connector type for the model (defaults to Bauplan). Allowed values are 'snowflake' and 'dremio'.
connector_config_key: Optional[str]
The key name if the SSM key is custom, with the pattern bauplan/connectors/<connector_type>/<key>.
connector_config_uri: Optional[str]
Full SSM URI if a completely custom path, e.g. ssm://us-west-2/123456789012/baubau/dremio.
namespace: Optional[Union[str, Namespace]]
The namespace to run the query in. If not set, the query will be run in the default namespace for your account.
debug: Optional[bool]
Whether to enable or disable debug mode for the query.
as_json: Optional[bool]
Whether to return the results as a JSON-compatible string (default: False).
priority: Optional[int]
Optional job priority (1-10, where 10 is highest priority).
verbose: Optional[bool]
Whether to enable or disable verbose mode for the query.
client_timeout: Optional[Union[int, float]]
Seconds to timeout; this also cancels the remote job execution.
def
query_to_json_file
Export the results of a SQL query to a file in JSON format.
import bauplan
client = bauplan.Client()
# export the query results to a JSON file
client.query_to_json_file(
path='/tmp/out.json',
query='SELECT name, age FROM bauplan.titanic LIMIT 100',
ref='my_ref_or_branch_name',
)
Parameters
path: Union[str, Path]
The name or path of the JSON file to write the results to.
query: str
The Bauplan query to execute.
file_format: Optional[Literal[json, jsonl]]
The format to write the results in; default: json. Allowed values are 'json' and 'jsonl'.
ref: Optional[Union[str, Branch, Tag, Ref]]
The ref, branch name or tag name to query from.
max_rows: Optional[int]
The maximum number of rows to return; default: None (no limit).
cache: Optional[Literal[on, off]]
Whether to enable or disable caching for the query.
connector: Optional[str]
The connector type for the model (defaults to Bauplan). Allowed values are 'snowflake' and 'dremio'.
connector_config_key: Optional[str]
The key name if the SSM key is custom, with the pattern bauplan/connectors/<connector_type>/<key>.
connector_config_uri: Optional[str]
Full SSM URI if a completely custom path, e.g. ssm://us-west-2/123456789012/baubau/dremio.
namespace: Optional[Union[str, Namespace]]
The namespace to run the query in. If not set, the query will be run in the default namespace for your account.
debug: Optional[bool]
Whether to enable or disable debug mode for the query.
verbose: Optional[bool]
Whether to enable or disable verbose mode for the query.
client_timeout: Optional[Union[int, float]]
Seconds to timeout; this also cancels the remote job execution.
def
query_to_parquet_file
Export the results of a SQL query to a file in Parquet format.
import bauplan
client = bauplan.Client()
# export the query results to a Parquet file
client.query_to_parquet_file(
path='/tmp/out.parquet',
query='SELECT name, age FROM bauplan.titanic LIMIT 100',
ref='my_ref_or_branch_name',
)
Parameters
path: Union[str, Path]
The name or path of the Parquet file to write the results to.
query: str
The Bauplan query to execute.
ref: Optional[Union[str, Branch, Tag, Ref]]
The ref, branch name or tag name to query from.
max_rows: Optional[int]
The maximum number of rows to return; default: None (no limit).
cache: Optional[Literal[on, off]]
Whether to enable or disable caching for the query.
connector: Optional[str]
The connector type for the model (defaults to Bauplan). Allowed values are 'snowflake' and 'dremio'.
connector_config_key: Optional[str]
The key name if the SSM key is custom, with the pattern bauplan/connectors/<connector_type>/<key>.
connector_config_uri: Optional[str]
Full SSM URI if a completely custom path, e.g. ssm://us-west-2/123456789012/baubau/dremio.
namespace: Optional[Union[str, Namespace]]
The namespace to run the query in. If not set, the query will be run in the default namespace for your account.
debug: Optional[bool]
Whether to enable or disable debug mode for the query.
verbose: Optional[bool]
Whether to enable or disable verbose mode for the query.
client_timeout: Optional[Union[int, float]]
Seconds to timeout; this also cancels the remote job execution.
def
rename_branch
Rename an existing branch. The branch name should follow the convention of "username.branch_name", otherwise non-admin users won't be able to complete the operation.
Upon failure, raises bauplan.exceptions.BauplanError
import bauplan
client = bauplan.Client()
assert client.rename_branch(
branch='username.old_name',
new_branch='username.new_name',
)
Parameters
branch: Union[str, Branch]
The name of the branch to rename.
new_branch: Union[str, Branch]
The new name for the branch.
def
rename_tag
Rename an existing tag.
Upon failure, raises bauplan.exceptions.BauplanError
import bauplan
client = bauplan.Client()
assert client.rename_tag(
tag='old_tag_name',
new_tag='new_tag_name',
)
Parameters
tag: Union[str, Tag]
The name of the tag to rename.
new_tag: Union[str, Tag]
The new name for the tag.
def
rerun
Re-run a Bauplan project by its job ID and return the state of the run. This is the equivalent of
running the bauplan rerun command through the CLI.
Parameters
job_id: str
The job ID of the previous run. This can be used to re-run a previous run, e.g., on a different branch.
ref: Optional[Union[str, Branch, Tag, Ref]]
The ref, branch name or tag name from which to rerun the project.
namespace: Optional[Union[str, Namespace]]
The namespace to run the job in. If not set, the job will be run in the default namespace.
cache: Optional[Literal[on, off]]
Whether to enable or disable caching for the run.
transaction: Optional[Literal[on, off]]
Whether to enable or disable transaction mode for the run.
dry_run: Optional[bool]
Whether to enable or disable dry-run mode for the run; models are not materialized.
strict: Optional[Literal[on, off]]
Whether to enable or disable strict schema validation.
preview: Optional[Union[Literal[on, off, head, tail], str]]
Whether to enable or disable preview mode for the run.
debug: Optional[bool]
Whether to enable or disable debug mode for the run.
priority: Optional[int]
Optional job priority (1-10, where 10 is highest priority).
verbose: Optional[bool]
Whether to enable or disable verbose mode for the run.
client_timeout: Optional[Union[int, float]]
Seconds to timeout; this also cancels the remote job execution.
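A sketch of re-running a previous job on a different branch (the job ID and branch name are illustrative):

```python
import bauplan

client = bauplan.Client()
# re-run a previous job, targeting a different branch
state = client.rerun(
    job_id='my_previous_job_id',
    ref='my_branch_name',
)
if state.job_status != "success":
    ...  # handle failure
```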
def
revert_table
Revert a table to a previous state.
Upon failure, raises bauplan.exceptions.BauplanError
import bauplan
client = bauplan.Client()
assert client.revert_table(
table='my_table_name',
namespace='my_namespace',
source_ref='my_ref_or_branch_name',
into_branch='main',
)
Parameters
table: Union[str, Table]
The table to revert.
namespace: Optional[Union[str, Namespace]]
The namespace of the table to revert.
source_ref: Union[str, Branch, Tag, Ref]
The name of the source ref; either a branch like "main" or a ref like "main@[sha]".
into_branch: Union[str, Branch]
The name of the target branch where the table will be reverted.
replace: Optional[bool]
Optional, whether to replace the table if it already exists.
commit_body: Optional[str]
Optional, the commit body message to attach to the operation.
commit_properties: Optional[Dict[str, str]]
Optional, a dictionary of properties to attach to the operation.
def
run
Run a Bauplan project and return the state of the run. This is the equivalent of running the bauplan run command through the CLI.
Parameters
project_dir: Optional[str]
The directory of the project (where the bauplan_project.yml or bauplan_project.yaml file is located).
ref: Optional[Union[str, Branch, Tag, Ref]]
The ref, branch name, or tag name from which to run the project.
namespace: Optional[Union[str, Namespace]]
The namespace to run the job in. If not set, the job will be run in the default namespace.
parameters: Optional[Dict[str, Optional[Union[str, int, float, bool]]]]
Parameters for templating into SQL or Python models.
cache: Optional[Literal['on', 'off']]
Whether to enable or disable caching for the run.
transaction: Optional[Literal['on', 'off']]
Whether to enable or disable transaction mode for the run.
dry_run: Optional[bool]
Whether to enable or disable dry-run mode for the run; models are not materialized.
strict: Optional[Literal['on', 'off']]
Whether to enable or disable strict schema validation.
preview: Optional[Union[Literal['on', 'off', 'head', 'tail'], str]]
Whether to enable or disable preview mode for the run.
debug: Optional[bool]
Whether to enable or disable debug mode for the run.
priority: Optional[int]
Optional job priority (1-10, where 10 is the highest priority).
verbose: Optional[bool]
Whether to enable or disable verbose mode for the run.
client_timeout: Optional[Union[int, float]]
Seconds before timeout; this also cancels the remote job execution.
detach: bool
Whether to detach the run and return immediately instead of blocking on log streaming.
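A minimal usage sketch of run; the branch name and parameter values are placeholders, and the job_status attribute on the returned state object is the one described in the notes on handling exceptions above:

```python
import bauplan

client = bauplan.Client()

# Run the project in the current directory against a development branch,
# templating a parameter into the models; names and values are placeholders.
run_state = client.run(
    project_dir='.',
    ref='username.my_feature_branch',
    parameters={'start_date': '2024-01-01'},
    dry_run=True,
)
# Inspect the final status of the job (see the JobStatus class for the
# possible values).
print(run_state.job_status)
```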
def
scan
Execute a table scan (with optional filters) and return the results as an Arrow Table.
Note that this function uses SQLGlot to compose a safe SQL query, and then internally defers to the query_to_arrow function for the actual scan.
import bauplan
client = bauplan.Client()
# run a table scan over the data lake
# filters are passed as a string
my_table = client.scan(
table='titanic',
ref='my_ref_or_branch_name',
namespace='bauplan',
columns=['name'],
filters='age < 30',
)
Parameters
table: Union[str, Table]
The table to scan.
ref: Optional[Union[str, Branch, Tag, Ref]]
The ref, branch name, or tag name to scan from.
columns: Optional[List[str]]
The columns to return (default: None).
filters: Optional[str]
The filters to apply (default: None).
limit: Optional[int]
The maximum number of rows to return (default: None).
cache: Optional[Literal['on', 'off']]
Whether to enable or disable caching for the query.
connector: Optional[str]
The connector type for the model (defaults to Bauplan). Allowed values are 'snowflake' and 'dremio'.
connector_config_key: Optional[str]
The key name if the SSM key is custom, following the pattern bauplan/connectors/<connector_type>/<key>.
connector_config_uri: Optional[str]
The full SSM URI if the path is completely custom, e.g. ssm://us-west-2/123456789012/baubau/dremio.
namespace: Optional[Union[str, Namespace]]
The namespace to run the scan in. If not set, the scan will be run in the default namespace for your account.
debug: Optional[bool]
Whether to enable or disable debug mode for the query.
priority: Optional[int]
Optional job priority (1-10, where 10 is the highest priority).
client_timeout: Optional[Union[int, float]]
Seconds before timeout; this also cancels the remote job execution.
class
InfoState
class
JobState
class
JobStatus
The status of a submitted job.
Parameters
canceled: str
cancelled: str
failed: str
rejected: str
success: str
timeout: str
heartbeat_failure: str
unknown: str
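The fields above name the possible status strings of a submitted job. A small, self-contained sketch of how client code might branch on them; the helper functions and the grouping into terminal-failure states are illustrative, not part of the SDK:

```python
# Illustrative helpers, not part of the bauplan SDK: classify a job_status
# string using the field names listed on JobStatus above.
FAILURE_STATES = {
    'canceled', 'cancelled', 'failed', 'rejected',
    'timeout', 'heartbeat_failure',
}


def run_succeeded(job_status: str) -> bool:
    """Return True only when the job reports success."""
    return job_status.lower() == 'success'


def run_failed(job_status: str) -> bool:
    """Return True for any of the terminal failure states."""
    return job_status.lower() in FAILURE_STATES
```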
class
Model
Represents a model (dataframe/table representing a DAG step) as an input to a function.
e.g.
import bauplan
@bauplan.model()
def some_parent_model():
return pyarrow.Table.from_pydict({'bar': [1, 2, 3]})
@bauplan.model()
def your_cool_model(
# parent models are passed as inputs, using bauplan.Model
# class
parent_0=bauplan.Model(
'some_parent_model',
columns=['bar'],
filter='bar > 1',
)
):
# Can return a pandas dataframe or a pyarrow table
return pyarrow.Table.from_pandas(
pd.DataFrame({
'foo': parent_0['bar'] * 2,
})
)
Bauplan can wrap other engines for the processing of some models, exposing a common interface and unified API to the user while dispatching the relevant operations to the underlying engine.
Authentication and authorization happen securely and transparently through SSM; the user specifies a connector type and the credentials through the relevant keywords:
import bauplan
@bauplan.model()
def your_cool_model(
parent_0=bauplan.Model(
'some_parent_model',
columns=['bar'],
filter='bar > 1',
connector='dremio',
connector_config_key='bauplan',
)
):
# parent_0 inside the function body
# will still be an Arrow table: the user code
# should still be the same as the data is moved
# transparently by Bauplan from an engine to the function.
return pyarrow.Table.from_pandas(
pd.DataFrame({
'foo': parent_0['bar'] * 2,
})
)
Parameters
name: str
The name of the model.
columns: typing.Optional[typing.List[str]]
The list of columns in the model. If the arg is not provided, the model will load all columns.
filter: typing.Optional[str]
The optional filter for the model. Defaults to None.
ref: typing.Optional[str]
The optional reference to the model. Defaults to None.
connector: typing.Optional[str]
The connector type for the model (defaults to Bauplan SQL). Allowed values are 'snowflake' and 'dremio'.
connector_config_key: typing.Optional[str]
The key name if the SSM key is custom, following the pattern bauplan/connectors/<connector_type>/<key>.
connector_config_uri: typing.Optional[str]
The full SSM URI if the path is completely custom, e.g. ssm://us-west-2/123456789012/baubau/dremio.
class
OrganizationInfo
def
Parameter
_ParameterKwargTracker is a callable object used to track the parameters used in a model. Because default function args are evaluated once and only once, we can build a set of the args for kwargs.
Parameters
param_name: str
class
Profile
A configuration profile.
Parameters
name: Optional[str]
api_key: Optional[str]
user_session_token: Optional[str]
project_dir: Optional[Union[str, Path]]
branch: Optional[str]
namespace: Optional[str]
cache: Optional[Literal['on', 'off']]
debug: Optional[bool]
verbose: Optional[bool]
api_endpoint: Optional[str]
catalog_endpoint: Optional[str]
catalog_max_records: Optional[int]
client_timeout: Optional[int]
env: Optional[str]
config_file_path: Optional[Union[str, Path]]
feature_flags: Optional[Dict[str, Any]]
def
load_profile
Load a profile from a profile file.
Parameters
profile: Optional[str]
api_key: Optional[str]
user_session_token: Optional[str]
project_dir: Optional[Union[str, Path]]
branch: Optional[str]
namespace: Optional[str]
cache: Optional[Literal['on', 'off']]
debug: Optional[bool]
verbose: Optional[bool]
api_endpoint: Optional[str]
catalog_endpoint: Optional[str]
catalog_max_records: Optional[int]
client_timeout: Optional[int]
env: Optional[str]
config_file_path: Optional[Union[str, Path]]
feature_flags: Optional[Dict[str, Any]]
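A minimal usage sketch; it assumes load_profile is exposed at the package level (as the flat listing above suggests), and the profile name is a placeholder:

```python
import bauplan

# Load a named profile from ~/.bauplan/config.yml, overriding one field at
# call time; 'someprofile' is a placeholder and the top-level location of
# load_profile is an assumption.
profile = bauplan.load_profile(
    profile='someprofile',
    debug=True,
)

# The same profile name can also be passed straight to the client.
client = bauplan.Client(profile='someprofile')
```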
class
RunnerNodeInfo
class
UserInfo
def
expectation
Decorator that defines a Bauplan expectation.
An expectation is a function from one (or more) dataframe-like object(s) to a boolean: it
is commonly used to perform data validation and data quality checks when running a pipeline.
Expectations take as input the table(s) they are validating and return a boolean indicating
whether the expectation is met. A Python expectation needs a Python environment to run,
which is defined using the python decorator, e.g.:
import bauplan
from bauplan.standard_expectations import expect_column_no_nulls
@bauplan.expectation()
@bauplan.python('3.10')
def test_joined_dataset(
data=bauplan.Model(
'join_dataset',
columns=['anomaly']
)
):
# your data validation code here
return expect_column_no_nulls(data, 'anomaly')
Parameters
def
model
Decorator that specifies a Bauplan model.
A model is a function from one (or more) dataframe-like object(s)
to another dataframe-like object: it is used to define a transformation in a
pipeline. Models are chained together implicitly by using them as inputs to
their children. A Python model needs a Python environment to run, which is defined
using the python decorator, e.g.:
import bauplan
@bauplan.model(
columns=['*'],
materialization_strategy='NONE'
)
@bauplan.python('3.11')
def source_scan(
data=bauplan.Model(
'iot_kaggle',
columns=['*'],
filter="motion='false'"
)
):
# your code here
return data
Parameters
name: typing.Optional[str]
The name of the model (e.g. 'users'); if missing, the function name is used.
columns: typing.Optional[typing.List[str]]
The columns of the output dataframe after the model runs (e.g. ['id', 'name', 'email']). Use ['*'] as a wildcard.
partitioned_by: typing.Union[str, typing.List[str], typing.Tuple[str, ...], NoneType]
The columns to partition the data by.
materialization_strategy: typing.Optional[typing.Literal['NONE', 'REPLACE', 'APPEND']]
The materialization strategy to use.
cache_strategy: typing.Optional[typing.Literal['NONE', 'DEFAULT']]
The cache strategy to use.
internet_access: typing.Optional[bool]
Whether the model requires internet access.
def
pyspark
Decorator that makes a PySpark session available to a Bauplan function (a model or an expectation). Add a spark=None parameter to the function's arguments to receive the session.
Parameters
version: typing.Optional[str]
The version string of PySpark.
conf: typing.Optional[typing.Dict[str, str]]
A dict containing the PySpark configuration.
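A usage sketch modeled on the model examples above; the PySpark version, the configuration key, and the model and table names are all placeholders:

```python
import bauplan


# Sketch: request a PySpark session for a model; the spark=None parameter
# receives the session at run time. Version, conf, and names are placeholders.
@bauplan.model()
@bauplan.pyspark('3.5.1', conf={'spark.sql.shuffle.partitions': '8'})
def my_spark_model(
    data=bauplan.Model('some_parent_model'),
    spark=None,
):
    # 'data' arrives as an Arrow table; hand it to Spark for processing,
    # then return a dataframe-like object as with any other model.
    df = spark.createDataFrame(data.to_pandas())
    return df.toPandas()
```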
def
python
Decorator that defines a Python environment for a Bauplan function (e.g. a model or expectation). It is used to specify directly in code the configuration of the Python environment required to run the function, i.e. the Python version and the Python packages required.
Parameters
version: typing.Optional[str]
The Python version for the interpreter (e.g. '3.11').
pip: typing.Optional[typing.Dict[str, str]]
A dictionary of dependencies (and versions) required by the function (e.g. {'requests': '2.26.0'}).
def
resources
Decorator that defines the resources required by a Bauplan function (e.g. a model or expectation). It is used to specify directly in code the configuration of the resources required to run the function.
Parameters
cpus: typing.Union[int, float, NoneType]
The number of CPUs required by the function (e.g. 0.5).
memory: typing.Union[int, str, NoneType]
The amount of memory required by the function (e.g. 1G, 1000).
memory_swap: typing.Union[int, str, NoneType]
The amount of swap memory required by the function (e.g. 1G, 1000).
timeout: typing.Optional[int]
The maximum time the function is allowed to run (e.g. 60).
def
synthetic_model
Decorator that defines a Bauplan Synthetic Model.
Parameters
name: str
The name of the model. Defaults to the function name.
columns: typing.List[str]
The columns of the synthetic model (e.g. ['id', 'name', 'email']).