bauplan package

Submodules

Module contents

class bauplan.Client(profile: str | None = None, api_key: str | None = None, branch: str | None = None, namespace: str | None = None, cache: 'on' | 'off' | None = None, debug: bool | None = None, verbose: bool | None = None, args: dict[str, str] | None = None, api_endpoint: str | None = None, catalog_endpoint: str | None = None, catalog_max_records: int | None = None, client_timeout: int | None = None, env: str | None = None, config_file_path: str | Path | None = None, user_session_token: str | None = None, feature_flags: dict[str, Any] | None = None)

Bases: _OperationContainer

A consistent interface to access Bauplan operations.

Using the client

import bauplan
client = bauplan.Client()

# query the table and return result set as an arrow Table
my_table = client.query('SELECT sum(trips) trips FROM travel_table', branch_name='main')

# efficiently cast the table to a pandas DataFrame
df = my_table.to_pandas()

Notes on authentication

# by default, authenticate from BAUPLAN_API_KEY >> BAUPLAN_PROFILE >> ~/.bauplan/config.yml
client = bauplan.Client()
# client used ~/.bauplan/config.yml profile 'default'

os.environ['BAUPLAN_PROFILE'] = "someprofile"
client = bauplan.Client()
# >> client now uses profile 'someprofile'

os.environ['BAUPLAN_API_KEY'] = "mykey"
client = bauplan.Client()
# >> client now authenticates with api_key value "mykey", because api key > profile

# specify authentication directly - this supercedes BAUPLAN_API_KEY in the environment
client = bauplan.Client(api_key='MY_KEY')

# specify a profile from ~/.bauplan/config.yml - this supercedes BAUPLAN_PROFILE in the environment
client = bauplan.Client(profile='default')

Handling Exceptions

Catalog operations (branch/table methods) raise a subclass of bauplan.exceptions.BauplanError that mirror HTTP status codes.

  • 400: InvalidDataError

  • 401: UnauthorizedError

  • 403: AccessDeniedError

  • 404: ResourceNotFoundError e.g .ID doesn’t match any records

  • 404: ApiRouteError e.g. the given route doesn’t exist

  • 405: ApiMethodError e.g. POST on a route with only GET defined

  • 409: UpdateConflictError e.g. creating a record with a name that already exists

  • 429: TooManyRequestsError

Run/Query/Scan/Import operations raise a subclass of bauplan.exceptions.BauplanError that represents, and also return a RunState object containing details and logs:

  • JobError e.g. something went wrong in a run/query/import/scan; includes error details

Run/import operations also return a state object that includes a job_status and other details. There are two ways to check status for run/import operations:

  1. try/except the JobError exception

  2. check the state.job_status attribute

Examples:

try:
    state = client.run(...)
    state = client.query(...)
    state = client.scan(...)
    state = client.plan_table_creation(...)
except bauplan.exceptions.JobError as e:
    ...

state = client.run(...)
if state.job_status != "success":
    ...
Parameters:
profile: str | None = None

(optional) The Bauplan config profile name to use to determine api_key.

api_key: str | None = None

(optional) Your unique Bauplan API key; mutually exclusive with profile. If not provided, fetch precedence is 1) environment BAUPLAN_API_KEY 2) .bauplan/config.yml

branch: str | None = None

(optional) The default branch to use for queries and runs. If not provided active_branch from the profile is used.

namespace: str | None = None

(optional) The default namespace to use for queries and runs.

cache: 'on' | 'off' | None = None

(optional) Whether to enable or disable caching for all the requests.

debug: bool | None = None

(optional) Whether to enable or disable debug mode for all the requests.

verbose: bool | None = None

(optional) Whether to enable or disable verbose mode for all the requests.

args: dict[str, str] | None = None

(optional) Additional arguments to pass to all the requests.

api_endpoint: str | None = None

(optional) The Bauplan API endpoint to use. If not provided, fetch precedence is 1) environment BAUPLAN_API_ENDPOINT 2) .bauplan/config.yml

catalog_endpoint: str | None = None

(optional) The Bauplan catalog endpoint to use. If not provided, fetch precedence is 1) environment BAUPLAN_CATALOG_ENDPOINT 2) .bauplan/config.yml

catalog_max_records: int | None = None

(optional) The maximum number of records to fetch, per page, from the catalog.

client_timeout: int | None = None

(optional) The client timeout in seconds for all the requests.

env: str | None = None

(optional) The environment to use for all the requests. Default: ‘prod’.

config_file_path: str | Path | None = None

(optional) The path to the Bauplan config file to use. If not provided, fetch precedence is 1) environment BAUPLAN_CONFIG_PATH 2) ~/.bauplan/config.yml

user_session_token: str | None = None

(optional) Your unique Bauplan user session token.

apply_table_creation_plan(plan: dict | TableCreatePlanState, debug: bool | None = None, args: dict[str, str] | None = None, verbose: bool | None = None, client_timeout: int | float | None = None) TableCreatePlanApplyState

Apply a plan for creating a table. It is done automaticaly during th table plan creation if no schema conflicts exist. Otherwise, if schema conflicts exist, then this function is used to apply them after the schema conflicts are resolved. Most common schema conflict is a two parquet files with the same column name but different datatype

Parameters:
plan: dict | TableCreatePlanState

The plan to apply.

debug: bool | None = None

Whether to enable or disable debug mode for the query.

args: dict[str, str] | None = None

dict of arbitrary args to pass to the backend.

verbose: bool | None = None

Whether to enable or disable verbose mode.

client_timeout: int | float | None = None

seconds to timeout; this also cancels the remote job execution.

Raises:

TableCreatePlanApplyStatusError – if the table creation plan apply fails.

:return The plan state.

create_branch(branch: str | Branch, from_ref: str | Branch | Ref) Branch

Create a new branch at a given ref.

Upon failure, raises bauplan.exceptions.BauplanError

import bauplan
client = bauplan.Client()

assert client.create_branch(
    branch='my_branch_name',
    from_ref='my_ref_or_branch_name',
)
Parameters:
branch: str | Branch

The name of the new branch.

from_ref: str | Branch | Ref

The name of the base branch; either a branch like “main” or ref like “main@[sha]”.

Returns:

The created branch object.

create_namespace(namespace: str | Namespace, branch: str | Branch) Namespace

Create a new namespace at a given branch.

Upon failure, raises bauplan.exceptions.BauplanError

import bauplan
client = bauplan.Client()

assert client.create_namespace(
    namespace='my_namespace_name'
    branch='my_branch_name',
)
Parameters:
namespace: str | Namespace

The name of the namespace.

branch: str | Branch

The name of the branch to create the namespace on.

Returns:

The created namespace.

create_table(table: str | Table, search_uri: str, branch: str | Branch | None = None, namespace: str | Namespace | None = None, partitioned_by: str | None = None, replace: bool | None = None, debug: bool | None = None, args: dict[str, str] | None = None, verbose: bool | None = None, client_timeout: int | float | None = None) Table

Create a table from an S3 location.

This operation will attempt to create a table based of schemas of N parquet files found by a given search uri. This is a two step operation using plan_table_creation `` and  ``apply_table_creation_plan.

import bauplan
client = bauplan.Client()

table = client.create_table(
    table='my_table_name',
    search_uri='s3://path/to/my/files/*.parquet',
    ref='my_ref_or_branch_name',
)
Parameters:
table: str | Table

The table which will be created.

search_uri: str

The location of the files to scan for schema.

branch: str | Branch | None = None

The branch name in which to create the table in.

namespace: str | Namespace | None = None

Optional argument specifying the namespace. If not. specified, it will be inferred based on table location or the default. namespace.

partitioned_by: str | None = None

Optional argument specifying the table partitioning.

replace: bool | None = None

Replace the table if it already exists.

debug: bool | None = None

Whether to enable or disable debug mode for the query.

args: dict[str, str] | None = None

dict of arbitrary args to pass to the backend.

verbose: bool | None = None

Whether to enable or disable verbose mode.

client_timeout: int | float | None = None

seconds to timeout; this also cancels the remote job execution.

Raises:
Returns:

Table

delete_branch(branch: str | Branch) bool

Delete a branch.

Upon failure, raises bauplan.exceptions.BauplanError

import bauplan
client = bauplan.Client()

assert client.delete_branch('my_branch_name')
Parameters:
branch: str | Branch

The name of the branch to delete.

Returns:

A boolean for if the branch was deleted.

delete_namespace(namespace: str | Namespace, branch: str | Branch) bool

Delete a namespace.

Upon failure, raises bauplan.exceptions.BauplanError

import bauplan
client = bauplan.Client()

assert client.delete_namespace(
    namespace='my_namespace_name',
    branch='my_branch_name',
)
Parameters:
namespace: str | Namespace

The name of the namespace to delete.

form_branch

The name of the branch to delete the namespace from.

Returns:

A boolean for if the namespace was deleted.

delete_table(table: str | Table, branch: str | Branch) bool

Drop a table.

Upon failure, raises bauplan.exceptions.BauplanError

import bauplan
client = bauplan.Client()

assert client.delete_table(
    table='my_table_name',
    branch='my_branch_name',
)
Parameters:
table: str | Table

The table to delete.

branch: str | Branch

The branch on which the table is stored.

Returns:

A boolean for if the table was deleted.

get_branch(branch: str | Branch) Branch

Get the branch.

Upon failure, raises bauplan.exceptions.BauplanError

import bauplan
client = bauplan.Client()

# retrieve only the tables as tuples of (name, kind)
branch = client.get_branch('my_branch_name')
print(branch.hash)
Parameters:
branch: str | Branch

The name of the branch to retrieve.

Returns:

A Branch object.

get_branches(name: str | None = None, user: str | None = None, limit: int | None = None, itersize: int | None = None) GetBranchesResponse

Get the available data branches in the Bauplan catalog.

Upon failure, raises bauplan.exceptions.BauplanError

import bauplan
client = bauplan.Client()

for branch in client.get_branches():
    print(branch.name, branch.hash)
Parameters:
name: str | None = None

Filter the branches by name.

user: str | None = None

Filter the branches by user.

limit: int | None = None

Optional, max number of branches to get.

itersize: int | None = None

Optional, overwrites profile.catalog_max_records, the max number of objects per HTTP request.

Returns:

A BranchesIterableResponse object.

get_commits(ref: str | Branch | Ref, *, filter_by_message: str | None = None, filter_by_author_name: str | None = None, filter_by_author_email: str | None = None, filter_by_authored_date: str | datetime | None = None, filter_by_authored_date_start_at: str | datetime | None = None, filter_by_authored_date_end_at: str | datetime | None = None, filter_by_committer_name: str | None = None, filter_by_committer_email: str | None = None, filter_by_committed_date: str | datetime | None = None, filter_by_committed_date_start_at: str | datetime | None = None, filter_by_committed_date_end_at: str | datetime | None = None, filter_by_parent_hash: str | None = None, filter: str | None = None, limit: int | None = None, itersize: int | None = None) GetCommitsResponse

Get the commits for the target branch or ref.

Upon failure, raises bauplan.exceptions.BauplanError

Parameters:
ref: str | Branch | Ref

The ref or branch to get the commits from.

filter_by_message: str | None = None

Optional, filter the commits by message.

filter_by_author_name: str | None = None

Optional, filter the commits by author name.

filter_by_author_email: str | None = None

Optional, filter the commits by author email.

filter_by_authored_date: str | datetime | None = None

Optional, filter the commits by the exact authored date.

filter_by_authored_date_start_at: str | datetime | None = None

Optional, filter the commits by authored date start at.

filter_by_authored_date_end_at: str | datetime | None = None

Optional, filter the commits by authored date end at.

filter_by_committer_name: str | None = None

Optional, filter the commits by committer name.

filter_by_committer_email: str | None = None

Optional, filter the commits by committer email.

filter_by_committed_date: str | datetime | None = None

Optional, filter the commits by the exact committed date.

filter_by_committed_date_start_at: str | datetime | None = None

Optional, filter the commits by committed date start at.

filter_by_committed_date_end_at: str | datetime | None = None

Optional, filter the commits by committed date end at.

filter_by_parent_hash: str | None = None

Optional, filter the commits by parent hash.

filter: str | None = None

Optional, a CEL filter expression to filter the commits.

limit: int | None = None

Optional, max number of commits to get.

itersize: int | None = None

Optional, overwrites profile.catalog_max_records, the max number of objects per HTTP request.

Returns:

A CommitsIterableResponse object.

get_job(job_id: str) Job

EXPERIMENTAL: Get a job by ID.

get_job_logs(job_id_prefix: str) list[JobLog]

EXPERIMENTAL: Get logs for a job by ID prefix.

get_namespace(namespace: str | Namespace, ref: str | Branch | Ref) Namespace

Get a namespace.

Upon failure, raises bauplan.exceptions.BauplanError

import bauplan
client = bauplan.Client()

namespace =  client.get_namespace(
    namespace='my_namespace_name',
    ref='my_ref_or_branch_name',
)
Parameters:
namespace: str | Namespace

The name of the namespace to get.

ref: str | Branch | Ref

The ref or branch to check the namespace on.

Returns:

A Namespace object.

get_namespaces(ref: str | Branch | Ref, *, filter_by_name: str | None = None, limit: int | None = None, itersize: int | None = None) GetNamespacesResponse

Get the available data namespaces in the Bauplan catalog branch.

Upon failure, raises bauplan.exceptions.BauplanError

import bauplan
client = bauplan.Client()

for namespace in client.get_namespaces('my_namespace_name'):
    print(namespace.name)
Parameters:
ref: str | Branch | Ref

The ref or branch to retrieve the namespaces from.

filter_by_name: str | None = None

Optional, filter the namespaces by name.

limit: int | None = None

Optional, max number of namespaces to get.

itersize: int | None = None

Optional, overwrites profile.catalog_max_records, the max number of objects per HTTP request.

Yield:

A Namespace object.

get_table(table: str | Table, ref: str | Ref | Branch, include_raw: bool = False) TableWithMetadata

Get the table data and metadata for a table in the target branch.

Upon failure, raises bauplan.exceptions.BauplanError

import bauplan
client = bauplan.Client()

# get the fields and metadata for a table
table = client.get_table(
    table='my_table_name',
    ref='my_ref_or_branch_name',
)

# loop through the fields and print their name, required, and type
for c in table.fields:
    print(c.name, c.required, c.type)

# show the number of records in the table
print(table.records)
Parameters:
ref: str | Ref | Branch

The ref or branch to get the table from.

table: str | Table

The table to retrieve.

include_raw: bool = False

Whether or not to include the raw metadata.json object as a nested dict.

Returns:

a TableWithMetadata object, optionally including the raw metadata.json object.

get_tables(ref: str | Branch | Ref, *, filter_by_name: str | None = None, filter_by_namespace: str | None = None, namespace: str | Namespace | None = None, limit: int | None = None, itersize: int | None = None) GetTablesResponse

Get the tables and views in the target branch.

Upon failure, raises bauplan.exceptions.BauplanError

import bauplan
client = bauplan.Client()

# retrieve only the tables as tuples of (name, kind)
for table in client.get_tables('my_ref_or_branch_name'):
    print(table.name, table.kind)
Parameters:
ref: str | Branch | Ref

The ref or branch to get the tables from.

filter_by_name: str | None = None

Optional, the table name to filter by.

filter_by_namespace: str | None = None

Optional, the namespace to get filtered tables from.

namespace: str | Namespace | None = None

DEPRECATED: Optional, the namespace to get filtered tables from.

limit: int | None = None

Optional, max number of tables to get.

itersize: int | None = None

Optional, overwrites profile.catalog_max_records, the max number of objects per HTTP request.

Returns:

A TableTablesIterableResponse object.

has_branch(branch: str | Branch) bool

Check if a branch exists.

Upon failure, raises bauplan.exceptions.BauplanError

import bauplan
client = bauplan.Client()

assert client.has_branch('my_branch_name')
Parameters:
branch: str | Branch

The name of the branch to check.

Returns:

A boolean for if the branch exists.

has_namespace(namespace: str | Namespace, ref: str | Branch | Ref) bool

Check if a namespace exists.

Upon failure, raises bauplan.exceptions.BauplanError

import bauplan
client = bauplan.Client()

assert client.has_namespace(
    namespace='my_namespace_name',
    ref='my_ref_or_branch_name',
)
Parameters:
namespace: str | Namespace

The name of the namespace to check.

ref: str | Branch | Ref

The ref or branch to check the namespace on.

Returns:

A boolean for if the namespace exists.

has_table(table: str | Table, ref: str | Ref | Branch) bool

Check if a table exists.

Upon failure, raises bauplan.exceptions.BauplanError

import bauplan
client = bauplan.Client()

assert client.has_table(
    table='my_table_name',
    ref='my_ref_or_branch_name',
)
Parameters:
ref: str | Ref | Branch

The ref or branch to get the table from.

table: str | Table

The table to retrieve.

Returns:

A boolean for if the table exists.

import_data(table: str | Table, search_uri: str, branch: str | Branch | None = None, namespace: str | Namespace | None = None, continue_on_error: bool = False, import_duplicate_files: bool = False, best_effort: bool = False, preview: 'on' | 'off' | 'head' | 'tail' | str | None = None, debug: bool | None = None, args: dict[str, str] | None = None, verbose: bool | None = None, client_timeout: int | float | None = None) TableDataImportState

Imports data into an already existing table.

import bauplan
client = bauplan.Client()

plan_state = client.import_data(
    table='my_table_name',
    search_uri='s3://path/to/my/files/*.parquet',
    branch='my_branch_name',
)
if plan_state.error:
    plan_error_action(...)
success_action(plan_state.plan)
Parameters:
table: str | Table

Previously created table in into which data will be imported.

search_uri: str

Uri which to scan for files to import.

branch: str | Branch | None = None

Branch in which to import the table.

namespace: str | Namespace | None = None

Namespace of the table. If not specified, namespace will be infered from table name or default settings.

continue_on_error: bool = False

Do not fail the import even if 1 data import fails.

import_duplicate_files: bool = False

Ignore prevention of importing s3 files that were already imported.

best_effort: bool = False

Don’t fail if schema of table does not match.

transformation_query

Optional duckdb compliant query applied on each parquet file. Use original_table as the table in the query.

preview: 'on' | 'off' | 'head' | 'tail' | str | None = None

Whether to enable or disable preview mode for the import.

debug: bool | None = None

Whether to enable or disable debug mode for the import.

args: dict[str, str] | None = None

dict of arbitrary args to pass to the backend.

verbose: bool | None = None

Whether to enable or disable verbose mode.

client_timeout: int | float | None = None

seconds to timeout; this also cancels the remote job execution.

Returns:

The plan state.

info(debug: bool | None = None, verbose: bool | None = None, client_timeout: int | float | None = None, **kwargs: Any) InfoState

Fetch organization & account information.

list_jobs() list[Job]

EXPERIMENTAL: List all jobs

merge_branch(source_ref: str | Branch | Ref, into_branch: str | Branch) bool

Merge one branch into another.

Upon failure, raises bauplan.exceptions.BauplanError

import bauplan
client = bauplan.Client()

assert merge_branch(
    source_ref='my_ref_or_branch_name',
    into_branch='main',
)
Parameters:
source_ref: str | Branch | Ref

The name of the merge source; either a branch like “main” or ref like “main@[sha]”.

into_branch: str | Branch

The name of the merge target.

Returns:

a boolean for whether the merge worked.

plan_table_creation(table: str | Table, search_uri: str, branch: str | Branch | None = None, namespace: str | Namespace | None = None, partitioned_by: str | None = None, replace: bool | None = None, debug: bool | None = None, args: dict[str, str] | None = None, verbose: bool | None = None, client_timeout: int | float | None = None) TableCreatePlanState

Create a table import plan from an S3 location.

This operation will attempt to create a table based of schemas of N parquet files found by a given search uri. A YAML file containing the schema and plan is returns and if there are no conflicts, it is automatically applied.

import bauplan
client = bauplan.Client()

plan_state = client.plan_table_creation(
    table='my_table_name',
    search_uri='s3://path/to/my/files/*.parquet',
    ref='my_ref_or_branch_name',
)
if plan_state.error:
    plan_error_action(...)
success_action(plan_state.plan)
Parameters:
table: str | Table

The table which will be created.

search_uri: str

The location of the files to scan for schema.

branch: str | Branch | None = None

The branch name in which to create the table in.

namespace: str | Namespace | None = None

Optional argument specifying the namespace. If not. specified, it will be inferred based on table location or the default. namespace.

partitioned_by: str | None = None

Optional argument specifying the table partitioning.

replace: bool | None = None

Replace the table if it already exists.

debug: bool | None = None

Whether to enable or disable debug mode.

args: dict[str, str] | None = None

dict of arbitrary args to pass to the backend.

verbose: bool | None = None

Whether to enable or disable verbose mode.

client_timeout: int | float | None = None

seconds to timeout; this also cancels the remote job execution.

Raises:

TableCreatePlanStatusError – if the table creation plan fails.

Returns:

The plan state.

query(query: str, ref: str | Branch | Ref | None = None, max_rows: int | None = None, cache: 'on' | 'off' | None = None, connector: str | None = None, connector_config_key: str | None = None, connector_config_uri: str | None = None, namespace: str | Namespace | None = None, debug: bool | None = None, args: dict[str, str] | None = None, verbose: bool | None = None, client_timeout: int | float | None = None) Table

Execute a SQL query and return the results as a pyarrow.Table. Note that this function uses Arrow also internally, resulting in a fast data transfer.

If you prefer to return the results as a pandas DataFrame, use the to_pandas function of pyarrow.Table.

import bauplan

client = bauplan.Client()

# query the table and return result set as an arrow Table
my_table = client.query(
    query='SELECT c1 FROM my_table',
    ref='my_ref_or_branch_name',
)

# efficiently cast the table to a pandas DataFrame
df = my_table.to_pandas()
Parameters:
query: str

The Bauplan query to execute.

ref: str | Branch | Ref | None = None

The ref or branch name to read data from.

max_rows: int | None = None

The maximum number of rows to return; default: None (no limit).

cache: 'on' | 'off' | None = None

Whether to enable or disable caching for the query.

connector: str | None = None

The connector type for the model (defaults to Bauplan). Allowed values are ‘snowflake’ and ‘dremio’.

connector_config_key: str | None = None

The key name if the SSM key is custom with the pattern bauplan/connectors/<connector_type>/<key>.

connector_config_uri: str | None = None

Full SSM uri if completely custom path, e.g. ssm://us-west-2/123456789012/baubau/dremio.

namespace: str | Namespace | None = None

The Namespace to run the query in. If not set, the query will be run in the default namespace for your account.

debug: bool | None = None

Whether to enable or disable debug mode for the query.

args: dict[str, str] | None = None

Additional arguments to pass to the query (default: None).

verbose: bool | None = None

Whether to enable or disable verbose mode for the query.

client_timeout: int | float | None = None

seconds to timeout; this also cancels the remote job execution.

Returns:

The query results as a pyarrow.Table.

query_to_csv_file(path: str | Path, query: str, ref: str | Branch | Ref | None = None, max_rows: int | None = None, cache: 'on' | 'off' | None = None, connector: str | None = None, connector_config_key: str | None = None, connector_config_uri: str | None = None, namespace: str | Namespace | None = None, debug: bool | None = None, args: dict[str, str] | None = None, verbose: bool | None = None, client_timeout: int | float | None = None, **kwargs: Any) Path

Export the results of a SQL query to a file in CSV format.

import bauplan
client = bauplan.Client()

# query the table and iterate through the results one row at a time
client.query_to_csv_file(
    path='./my.csv',
    query='SELECT c1 FROM my_table',
    ref='my_ref_or_branch_name',
):
Parameters:
path: str | Path

The name or path of the file csv to write the results to.

query: str

The Bauplan query to execute.

ref: str | Branch | Ref | None = None

The ref or branch name to read data from.

max_rows: int | None = None

The maximum number of rows to return; default: None (no limit).

cache: 'on' | 'off' | None = None

Whether to enable or disable caching for the query.

connector: str | None = None

The connector type for the model (defaults to Bauplan). Allowed values are ‘snowflake’ and ‘dremio’.

connector_config_key: str | None = None

The key name if the SSM key is custom with the pattern bauplan/connectors/<connector_type>/<key>.

connector_config_uri: str | None = None

Full SSM uri if completely custom path, e.g. ssm://us-west-2/123456789012/baubau/dremio.

namespace: str | Namespace | None = None

The Namespace to run the query in. If not set, the query will be run in the default namespace for your account.

debug: bool | None = None

Whether to enable or disable debug mode for the query.

args: dict[str, str] | None = None

Additional arguments to pass to the query (default: None).

verbose: bool | None = None

Whether to enable or disable verbose mode for the query.

client_timeout: int | float | None = None

seconds to timeout; this also cancels the remote job execution.

Returns:

The path of the file written.

query_to_generator(query: str, ref: str | Branch | Ref | None = None, max_rows: int | None = None, cache: 'on' | 'off' | None = None, connector: str | None = None, connector_config_key: str | None = None, connector_config_uri: str | None = None, namespace: str | Namespace | None = None, debug: bool | None = None, as_json: bool | None = False, args: dict[str, str] | None = None, verbose: bool | None = None, client_timeout: int | float | None = None) Generator[dict[str, Any], None, None]

Execute a SQL query and return the results as a generator, where each row is a Python dictionary.

import bauplan
client = bauplan.Client()

# query the table and iterate through the results one row at a time
res = client.query_to_generator(
    query='SELECT c1 FROM my_table',
    ref='my_ref_or_branch_name',
)
for row in res:
    # do logic
Parameters:
query: str

The Bauplan query to execute.

ref: str | Branch | Ref | None = None

The ref or branch name to read data from.

max_rows: int | None = None

The maximum number of rows to return; default: None (no limit).

cache: 'on' | 'off' | None = None

Whether to enable or disable caching for the query.

connector: str | None = None

The connector type for the model (defaults to Bauplan). Allowed values are ‘snowflake’ and ‘dremio’.

connector_config_key: str | None = None

The key name if the SSM key is custom with the pattern bauplan/connectors/<connector_type>/<key>.

connector_config_uri: str | None = None

Full SSM uri if completely custom path, e.g. ssm://us-west-2/123456789012/baubau/dremio.

namespace: str | Namespace | None = None

The Namespace to run the query in. If not set, the query will be run in the default namespace for your account.

debug: bool | None = None

Whether to enable or disable debug mode for the query.

as_json: bool | None = False

Whether to return the results as a JSON-compatible string (default: False).

args: dict[str, str] | None = None

Additional arguments to pass to the query (default: None).

verbose: bool | None = None

Whether to enable or disable verbose mode for the query.

client_timeout: int | float | None = None

seconds to timeout; this also cancels the remote job execution.

Yield:

A dictionary representing a row of query results.

query_to_json_file(path: str | Path, query: str, file_format: 'json' | 'jsonl' | None = 'json', ref: str | Branch | Ref | None = None, max_rows: int | None = None, cache: 'on' | 'off' | None = None, connector: str | None = None, connector_config_key: str | None = None, connector_config_uri: str | None = None, namespace: str | Namespace | None = None, debug: bool | None = None, args: dict[str, str] | None = None, verbose: bool | None = None, client_timeout: int | float | None = None) Path

Export the results of a SQL query to a file in JSON format.

import bauplan
client = bauplan.Client()

# query the table and iterate through the results one row at a time
client.query_to_json_file(
    path='./my.json',
    query='SELECT c1 FROM my_table',
    ref='my_ref_or_branch_name',
):
Parameters:
path: str | Path

The name or path of the file json to write the results to.

query: str

The Bauplan query to execute.

file_format: 'json' | 'jsonl' | None = 'json'

The format to write the results in; default: json. Allowed values are ‘json’ and ‘jsonl’.

ref: str | Branch | Ref | None = None

The ref or branch name to read data from.

max_rows: int | None = None

The maximum number of rows to return; default: None (no limit).

cache: 'on' | 'off' | None = None

Whether to enable or disable caching for the query.

connector: str | None = None

The connector type for the model (defaults to Bauplan). Allowed values are ‘snowflake’ and ‘dremio’.

connector_config_key: str | None = None

The key name if the SSM key is custom with the pattern bauplan/connectors/<connector_type>/<key>.

connector_config_uri: str | None = None

Full SSM uri if completely custom path, e.g. ssm://us-west-2/123456789012/baubau/dremio.

namespace: str | Namespace | None = None

The Namespace to run the query in. If not set, the query will be run in the default namespace for your account.

debug: bool | None = None

Whether to enable or disable debug mode for the query.

args: dict[str, str] | None = None

Additional arguments to pass to the query (default: None).

verbose: bool | None = None

Whether to enable or disable verbose mode for the query.

client_timeout: int | float | None = None

seconds to timeout; this also cancels the remote job execution.

Returns:

The path of the file written.

query_to_parquet_file(path: str | Path, query: str, ref: str | Branch | Ref | None = None, max_rows: int | None = None, cache: 'on' | 'off' | None = None, connector: str | None = None, connector_config_key: str | None = None, connector_config_uri: str | None = None, namespace: str | Namespace | None = None, debug: bool | None = None, args: dict[str, str] | None = None, verbose: bool | None = None, client_timeout: int | float | None = None, **kwargs: Any) Path

Export the results of a SQL query to a file in Parquet format.

import bauplan
client = bauplan.Client()

# query the table and iterate through the results one row at a time
client.query_to_parquet_file(
    path='./my.parquet',
    query='SELECT c1 FROM my_table',
    ref='my_ref_or_branch_name',
):
Parameters:
path: str | Path

The name or path of the file parquet to write the results to.

query: str

The Bauplan query to execute.

ref: str | Branch | Ref | None = None

The ref or branch name to read data from.

max_rows: int | None = None

The maximum number of rows to return; default: None (no limit).

cache: 'on' | 'off' | None = None

Whether to enable or disable caching for the query.

connector: str | None = None

The connector type for the model (defaults to Bauplan). Allowed values are ‘snowflake’ and ‘dremio’.

connector_config_key: str | None = None

The key name if the SSM key is custom with the pattern bauplan/connectors/<connector_type>/<key>.

connector_config_uri: str | None = None

Full SSM uri if completely custom path, e.g. ssm://us-west-2/123456789012/baubau/dremio.

namespace: str | Namespace | None = None

The Namespace to run the query in. If not set, the query will be run in the default namespace for your account.

debug: bool | None = None

Whether to enable or disable debug mode for the query.

args: dict[str, str] | None = None

Additional arguments to pass to the query (default: None).

verbose: bool | None = None

Whether to enable or disable verbose mode for the query.

client_timeout: int | float | None = None

seconds to timeout; this also cancels the remote job execution.

Returns:

The path of the file written.

rerun(job_id: str, ref: str | Branch | Ref | None = None, namespace: str | Namespace | None = None, cache: 'on' | 'off' | None = None, transaction: 'on' | 'off' | None = None, dry_run: bool | None = None, strict: 'on' | 'off' | None = None, preview: 'on' | 'off' | 'head' | 'tail' | str | None = None, debug: bool | None = None, args: dict[str, str] | None = None, verbose: bool | None = None, client_timeout: int | float | None = None) ReRunState

Re run a Bauplan project by its ID and return the state of the run. This is the equivalent of running through the CLI the bauplan rerun command.

Parameters:
job_id: str

The Job ID of the previous run. This can be used to re-run a previous run, e.g., on a different branch.

ref: str | Branch | Ref | None = None

The ref or branch name to read.

namespace: str | Namespace | None = None

The Namespace to run the job in. If not set, the job will be run in the default namespace.

cache: 'on' | 'off' | None = None

Whether to enable or disable caching for the run.

transaction: 'on' | 'off' | None = None

Whether to enable or disable transaction mode for the run.

dry_run: bool | None = None

Whether to enable or disable dry-run mode for the run; models are not materialized.

strict: 'on' | 'off' | None = None

Whether to enable or disable strict schema validation.

preview: 'on' | 'off' | 'head' | 'tail' | str | None = None

Whether to enable or disable preview mode for the run.

debug: bool | None = None

Whether to enable or disable debug mode for the run.

args: dict[str, str] | None = None

Additional arguments (optional).

verbose: bool | None = None

Whether to enable or disable verbose mode for the run.

client_timeout: int | float | None = None

seconds to timeout; this also cancels the remote job execution.

Returns:

The state of the run.

run(project_dir: str | None = None, ref: str | Branch | Ref | None = None, namespace: str | Namespace | None = None, parameters: dict[str, str | int | float | bool | None] | None = None, cache: 'on' | 'off' | None = None, transaction: 'on' | 'off' | None = None, dry_run: bool | None = None, strict: 'on' | 'off' | None = None, preview: 'on' | 'off' | 'head' | 'tail' | str | None = None, debug: bool | None = None, args: dict[str, str] | None = None, verbose: bool | None = None, client_timeout: int | float | None = None) RunState

Run a Bauplan project and return the state of the run. This is the equivalent of running through the CLI the bauplan run command.

Parameters:
project_dir: str | None = None

The directory of the project (where the bauplan_project.yml file is located).

ref: str | Branch | Ref | None = None

The ref or branch name to read.

namespace: str | Namespace | None = None

The Namespace to run the job in. If not set, the job will be run in the default namespace.

parameters: dict[str, str | int | float | bool | None] | None = None

Parameters for templating into SQL or Python models.

cache: 'on' | 'off' | None = None

Whether to enable or disable caching for the run.

transaction: 'on' | 'off' | None = None

Whether to enable or disable transaction mode for the run.

dry_run: bool | None = None

Whether to enable or disable dry-run mode for the run; models are not materialized.

strict: 'on' | 'off' | None = None

Whether to enable or disable strict schema validation.

preview: 'on' | 'off' | 'head' | 'tail' | str | None = None

Whether to enable or disable preview mode for the run.

debug: bool | None = None

Whether to enable or disable debug mode for the run.

args: dict[str, str] | None = None

Additional arguments (optional).

verbose: bool | None = None

Whether to enable or disable verbose mode for the run.

client_timeout: int | float | None = None

seconds to timeout; this also cancels the remote job execution.

Returns:

The state of the run.

scan(table: str | Table, ref: str | Branch | Ref | None = None, columns: list[str] | None = None, filters: str | None = None, limit: int | None = None, cache: 'on' | 'off' | None = None, connector: str | None = None, connector_config_key: str | None = None, connector_config_uri: str | None = None, namespace: str | Namespace | None = None, debug: bool | None = None, args: dict[str, str] | None = None, client_timeout: int | float | None = None, **kwargs: Any) Table

Execute a table scan (with optional filters) and return the results as an arrow Table.

Note that this function uses SQLGlot to compose a safe SQL query, and then internally defer to the query_to_arrow function for the actual scan.

import bauplan
client = bauplan.Client()

# run a table scan over the data lake
# filters are passed as a string
my_table = client.scan(
    table='my_table_name',
    ref='my_ref_or_branch_name',
    columns=['c1'],
    filters='c2 > 10',
)
Parameters:
table: str | Table

The table to scan.

ref: str | Branch | Ref | None = None

The ref or branch name to read data from.

columns: list[str] | None = None

The columns to return (default: None).

filters: str | None = None

The filters to apply (default: None).

limit: int | None = None

The maximum number of rows to return (default: None).

cache: 'on' | 'off' | None = None

Whether to enable or disable caching for the query.

connector: str | None = None

The connector type for the model (defaults to Bauplan). Allowed values are ‘snowflake’ and ‘dremio’.

connector_config_key: str | None = None

The key name if the SSM key is custom with the pattern bauplan/connectors/<connector_type>/<key>.

connector_config_uri: str | None = None

Full SSM uri if completely custom path, e.g. ssm://us-west-2/123456789012/baubau/dremio.

namespace: str | Namespace | None = None

The Namespace to run the scan in. If not set, the scan will be run in the default namespace for your account.

debug: bool | None = None

Whether to enable or disable debug mode for the query.

args: dict[str, str] | None = None

dict of arbitrary args to pass to the backend.

client_timeout: int | float | None = None

seconds to timeout; this also cancels the remote job execution.

Returns:

The scan results as a pyarrow.Table.

class bauplan.InfoState(*, client_version: str | None = None, organization: OrganizationInfo | None = None, user: UserInfo | None = None, runners: list[RunnerNodeInfo] | None = None)

Bases: _BauplanData

client_version : str | None
model_computed_fields : ClassVar[Dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config : ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields : ClassVar[Dict[str, FieldInfo]] = {'client_version': FieldInfo(annotation=Union[str, NoneType], required=False, default=None), 'organization': FieldInfo(annotation=Union[OrganizationInfo, NoneType], required=False, default=None), 'runners': FieldInfo(annotation=Union[List[RunnerNodeInfo], NoneType], required=False, default=None), 'user': FieldInfo(annotation=Union[UserInfo, NoneType], required=False, default=None)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo] objects.

This replaces Model.__fields__ from Pydantic V1.

organization : OrganizationInfo | None
runners : List[RunnerNodeInfo] | None
user : UserInfo | None
class bauplan.JobStatus(canceled: str = 'CANCELLED', cancelled: str = 'CANCELLED', failed: str = 'FAILED', rejected: str = 'REJECTED', success: str = 'SUCCESS', timeout: str = 'TIMEOUT', unknown: str = 'UNKNOWN')

Bases: object

canceled : str = 'CANCELLED'
cancelled : str = 'CANCELLED'
failed : str = 'FAILED'
rejected : str = 'REJECTED'
success : str = 'SUCCESS'
timeout : str = 'TIMEOUT'
unknown : str = 'UNKNOWN'
class bauplan.Model(name: str, columns: list[str] | None = None, filter: str | None = None, ref: str | None = None, connector: str | None = None, connector_config_key: str | None = None, connector_config_uri: str | None = None, **kwargs: Any)

Bases: object

Represents a model (dataframe/table representing a DAG step) as an input to a function.

e.g.

@bauplan.model()
def some_parent_model():
    return pyarrow.Table.from_pydict({'bar': [1, 2, 3]})

@bauplan.model()
def your_cool_model(
    # parent models are passed as inputs, using bauplan.Model
    # class
    parent_0=bauplan.Model(
        'some_parent_model',
        columns=['bar'],
        filter='bar > 1',
    )
):
    # Can return a pandas dataframe or a pyarrow table
    return pyarrow.Table.from_pandas(
        pd.DataFrame({
            'foo': parent_0['bar'] * 2,
        })
    )

Bauplan can wrap other engines for the processing of some models, exposing a common interface and unified API for the user while dispatching the relevant operations to the underlying engine.

The authentication and authorization happens securely and transparently through ssm; the user is asked to specify a connector type and the credentials through the relevant keywords:

@bauplan.model()
def your_cool_model(
    parent_0=bauplan.Model(
        'some_parent_model',
        columns=['bar'],
        filter='bar > 1',
        connector='dremio',
        connector_config_key='bauplan',
    )
):
    # parent_0 inside the function body
    # will still be an Arrow table: the user code
    # should still be the same as the data is moved
    # transparently by Bauplan from an engine to the function.
    return pyarrow.Table.from_pandas(
        pd.DataFrame({
            'foo': parent_0['bar'] * 2,
        })
    )
Parameters:
name: str

The name of the model.

columns: list[str] | None = None

The list of columns in the model. If the arg is not provided, the model will load all columns.

filter: str | None = None

The optional filter for the model. Defaults to None.

ref: str | None = None

The optional reference to the model. Defaults to None.

connector: str | None = None

The connector type for the model (defaults to Bauplan SQL). Allowed values are ‘snowflake’ and ‘dremio’.

connector_config_key: str | None = None

The key name if the SSM key is custom with the pattern bauplan/connectors/<connector_type>/<key>.

connector_config_uri: str | None = None

Full SSM uri if completely custom path, e.g. ssm://us-west-2/123456789012/baubau/dremio.

class bauplan.OrganizationInfo(*, id: str | None = None, name: str | None = None, slug: str | None = None, default_parameter_secret_key: str | None = None, default_parameter_secret_public_key: str | None = None)

Bases: _BauplanData

default_parameter_secret_key : str | None
default_parameter_secret_public_key : str | None
id : str | None
model_computed_fields : ClassVar[Dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config : ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields : ClassVar[Dict[str, FieldInfo]] = {'default_parameter_secret_key': FieldInfo(annotation=Union[str, NoneType], required=False, default=None), 'default_parameter_secret_public_key': FieldInfo(annotation=Union[str, NoneType], required=False, default=None), 'id': FieldInfo(annotation=Union[str, NoneType], required=False, default=None), 'name': FieldInfo(annotation=Union[str, NoneType], required=False, default=None), 'slug': FieldInfo(annotation=Union[str, NoneType], required=False, default=None)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo] objects.

This replaces Model.__fields__ from Pydantic V1.

name : str | None
slug : str | None
class bauplan.Profile(name: 'Optional[str]' = None, api_key: 'Optional[str]' = None, user_session_token: 'Optional[str]' = None, project_dir: 'Optional[Union[str, Path]]' = None, branch: 'Optional[str]' = None, namespace: 'Optional[str]' = None, cache: "Optional[Literal['on', 'off']]" = None, debug: 'Optional[bool]' = None, verbose: 'Optional[bool]' = None, args: 'Optional[Dict[str, str]]' = None, api_endpoint: 'Optional[str]' = None, catalog_endpoint: 'Optional[str]' = None, catalog_max_records: 'Optional[int]' = None, client_timeout: 'Optional[int]' = None, env: 'Optional[str]' = None, config_file_path: 'Optional[Union[str, Path]]' = None, feature_flags: 'Optional[Dict[str, Any]]' = None)

Bases: object

api_endpoint : str
api_key : str | None
args : Dict[str, str] | None
branch : str | None
cache : str | None
catalog_endpoint : str
catalog_max_records : int
client_timeout : int | None
config_file_path : str | Path | None
debug : bool | None
env : str | None
feature_flags : Dict[str, str]
classmethod load_profile(profile: str | None = None, api_key: str | None = None, user_session_token: str | None = None, project_dir: str | Path | None = None, branch: str | None = None, namespace: str | None = None, cache: 'on' | 'off' | None = None, debug: bool | None = None, verbose: bool | None = None, args: dict[str, str] | None = None, api_endpoint: str | None = None, catalog_endpoint: str | None = None, catalog_max_records: int | None = None, client_timeout: int | None = None, env: str | None = None, config_file_path: str | Path | None = None, feature_flags: dict[str, Any] | None = None) Profile

Load a profile from a profile file.

name : str | None
namespace : str | None
project_dir : str | Path | None
user_session_token : str | None
verbose : bool | None
class bauplan.RunnerNodeInfo(*, hostname: str | None = None)

Bases: _BauplanData

hostname : str | None
model_computed_fields : ClassVar[Dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config : ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields : ClassVar[Dict[str, FieldInfo]] = {'hostname': FieldInfo(annotation=Union[str, NoneType], required=False, default=None)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo] objects.

This replaces Model.__fields__ from Pydantic V1.

class bauplan.UserInfo(*, id: str | None = None, username: str | None = None, first_name: str | None = None, last_name: str | None = None)

Bases: _BauplanData

first_name : str | None
property full_name : str | None
id : str | None
last_name : str | None
model_computed_fields : ClassVar[Dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config : ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields : ClassVar[Dict[str, FieldInfo]] = {'first_name': FieldInfo(annotation=Union[str, NoneType], required=False, default=None), 'id': FieldInfo(annotation=Union[str, NoneType], required=False, default=None), 'last_name': FieldInfo(annotation=Union[str, NoneType], required=False, default=None), 'username': FieldInfo(annotation=Union[str, NoneType], required=False, default=None)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo] objects.

This replaces Model.__fields__ from Pydantic V1.

username : str | None
bauplan.expectation(**kwargs: Any) Callable

Decorator that defines a Bauplan expectation.

An expectation is a function from one (or more) dataframe-like object(s) to a boolean: it is commonly used to perform data validation and data quality checks when running a pipeline. Expectations takes as input the table(s) they are validating and return a boolean indicating whether the expectation is met or not. A Python expectation needs a Python environment to run, which is defined using the python decorator, e.g.:

@bauplan.expectation()
@bauplan.python('3.10')
def test_joined_dataset(
    data=bauplan.Model(
        'join_dataset',
        columns=['anomaly']
    )
):
    # your data validation code here
    return expect_column_no_nulls(data, 'anomaly')
Parameters:
f

The function to decorate.

bauplan.model(name: str | None = None, columns: list[str] | None = None, partitioned_by: str | list[str] | tuple[str, ...] | None = None, materialization_strategy: 'NONE' | 'REPLACE' | 'APPEND' | None = None, cache_strategy: 'NONE' | 'DEFAULT' | None = None, internet_access: bool | None = None, **kwargs: Any) Callable

Decorator that specifies a Bauplan model.

A model is a function from one (or more) dataframe-like object(s) to another dataframe-like object: it is used to define a transformation in a pipeline. Models are chained together implicitly by using them as inputs to their children. A Python model needs a Python environment to run, which is defined using the python decorator, e.g.:

@bauplan.model(
    columns=['*'],
    materialization_strategy='NONE'
)
@bauplan.python('3.11')
def source_scan(
    data=bauplan.Model(
        'iot_kaggle',
        columns=['*'],
        filter="motion='false'"
    )
):
    # your code here
    return data
Parameters:
name: str | None = None

the name of the model (e.g. ‘users’); if missing the function name is used.

columns: list[str] | None = None

the columns of the output dataframe after the model runs (e.g. [‘id’, ‘name’, ‘email’]). Use [‘*’] as a wildcard.

internet_access: bool | None = None

whether the model requires internet access.

partitioned_by: str | list[str] | tuple[str, ...] | None = None

the columns to partition the data by.

materialization_strategy: 'NONE' | 'REPLACE' | 'APPEND' | None = None

the materialization strategy to use.

cache_strategy: 'NONE' | 'DEFAULT' | None = None

the cache strategy to use.

bauplan.pyspark(version: str | None = None, conf: dict[str, str] | None = None, **kwargs: Any) Callable

Decorator that makes a pyspark session available to a Bauplan function (a model or an expectation). Add a spark=None parameter to the function model args

Parameters:
version: str | None = None

the version string of pyspark

conf: dict[str, str] | None = None

A dict containing the pyspark config

bauplan.python(version: str | None = None, pip: dict[str, str] | None = None, **kwargs: Any) Callable

Decorator that defines a Python environment for a Bauplan function (e.g. a model or expectation). It is used to specify directly in code the configuration of the Python environment required to run the function, i.e. the Python version and the Python packages required.

Parameters:
version: str | None = None

The python version for the interpreter (e.g. '3.11').

pip: dict[str, str] | None = None

A dictionary of dependencies (and versions) required by the function (e.g. {'requests': '2.26.0'}).

bauplan.resources(cpus: int | float | None = None, memory: int | str | None = None, memory_swap: int | str | None = None, timeout: int | None = None, **kwargs: Any) Callable

Decorator that defines the resources required by a Bauplan function (e.g. a model or expectation). It is used to specify directly in code the configuration of the resources required to run the function.

Parameters:
cpus: int | float | None = None

The number of CPUs required by the function (e.g: `0.5`)

memory: int | str | None = None

The amount of memory required by the function (e.g: `1G`, `1000`)

memory_swap: int | str | None = None

The amount of swap memory required by the function (e.g: `1G`, `1000`)

timeout: int | None = None

The maximum time the function is allowed to run (e.g: `60`)