bauplan
class Client
A consistent interface to access Bauplan operations.
Using the client
import bauplan
client = bauplan.Client()
# query the table and return result set as an arrow Table
my_table = client.query('SELECT avg(age) AS average_age FROM bauplan.titanic limit 1', ref='main')
# efficiently cast the table to a pandas DataFrame
df = my_table.to_pandas()
Notes on authentication
import os
import bauplan
# by default, authenticate from BAUPLAN_API_KEY >> BAUPLAN_PROFILE >> ~/.bauplan/config.yml
client = bauplan.Client()
# >> client uses the 'default' profile in ~/.bauplan/config.yml
os.environ['BAUPLAN_PROFILE'] = "someprofile"
client = bauplan.Client()
# >> client now uses profile 'someprofile'
os.environ['BAUPLAN_API_KEY'] = "mykey"
client = bauplan.Client()
# >> client now authenticates with api_key value "mykey", because the API key takes precedence over the profile
# specify authentication directly - this supersedes BAUPLAN_API_KEY in the environment
client = bauplan.Client(api_key='MY_KEY')
# specify a profile from ~/.bauplan/config.yml - this supersedes BAUPLAN_PROFILE in the environment
client = bauplan.Client(profile='default')
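The precedence rules in the examples above can be condensed into a small pure-Python sketch. The helper resolve_credentials is hypothetical and not part of the bauplan package; it only models which credential source would win. One ordering is an assumption: the examples do not say whether an explicit profile argument also beats BAUPLAN_API_KEY in the environment, and this sketch assumes explicit arguments always beat environment variables.

```python
from typing import Mapping, Optional, Tuple


def resolve_credentials(
    env: Mapping[str, str],
    api_key: Optional[str] = None,
    profile: Optional[str] = None,
) -> Tuple[str, str]:
    """Return (source, value) for the credential that would win.

    Hypothetical helper mirroring the documented precedence; not a bauplan API.
    """
    # An explicit api_key argument supersedes BAUPLAN_API_KEY.
    if api_key is not None:
        return ("api_key", api_key)
    # ASSUMPTION: an explicit profile argument is checked before any env var.
    if profile is not None:
        return ("profile", profile)
    # In the environment, BAUPLAN_API_KEY takes precedence over BAUPLAN_PROFILE.
    if env.get("BAUPLAN_API_KEY"):
        return ("api_key", env["BAUPLAN_API_KEY"])
    if env.get("BAUPLAN_PROFILE"):
        return ("profile", env["BAUPLAN_PROFILE"])
    # Fall back to the 'default' profile in ~/.bauplan/config.yml.
    return ("profile", "default")
```

Passing os.environ as env shows which source a plain bauplan.Client() call would pick up under these assumptions.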
Handling Exceptions
Catalog operations (branch/table methods) raise a subclass of BauplanError that mirrors HTTP status codes.
- 400: InvalidDataError
- 401: UnauthorizedError
- 403: AccessDeniedError
- 404: ResourceNotFoundError, e.g. the ID doesn't match any records
- 404: ApiRouteError, e.g. the given route doesn't exist
- 405: ApiMethodError, e.g. POST on a route with only GET defined
- 409: UpdateConflictError, e.g. creating a record with a name that already exists
- 429: TooManyRequestsError
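Because every catalog error subclasses BauplanError, you can catch a specific subclass first and fall back to the base class. The sketch below uses local stand-in classes with the documented names so it runs without bauplan installed; STATUS_TO_ERROR, error_for_status and handle_catalog_failure are hypothetical helpers, and the real classes ship with the bauplan package (their import path is not given here).

```python
from typing import Dict, Type


# Local stand-ins mirroring the documented hierarchy (illustration only).
class BauplanError(Exception):
    """Base class: an `except BauplanError` clause catches every subclass."""


class InvalidDataError(BauplanError): ...
class UnauthorizedError(BauplanError): ...
class AccessDeniedError(BauplanError): ...
class ResourceNotFoundError(BauplanError): ...
class ApiRouteError(BauplanError): ...
class ApiMethodError(BauplanError): ...
class UpdateConflictError(BauplanError): ...
class TooManyRequestsError(BauplanError): ...


# 404 maps to two classes in the list; this sketch picks ResourceNotFoundError.
STATUS_TO_ERROR: Dict[int, Type[BauplanError]] = {
    400: InvalidDataError,
    401: UnauthorizedError,
    403: AccessDeniedError,
    404: ResourceNotFoundError,
    405: ApiMethodError,
    409: UpdateConflictError,
    429: TooManyRequestsError,
}


def error_for_status(status: int, message: str) -> BauplanError:
    """Build the error for an HTTP status, falling back to the base class."""
    return STATUS_TO_ERROR.get(status, BauplanError)(message)


def handle_catalog_failure(status: int) -> str:
    """Simulate a catalog call failing with `status`, then handle it."""
    try:
        raise error_for_status(status, f"HTTP {status}")
    except UpdateConflictError:
        return "already exists"  # e.g. a branch name that is already taken
    except BauplanError as exc:
        return f"other error: {type(exc).__name__}"
```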
Run/Query/Scan/Import operations raise a subclass of BauplanError and also return a RunState object containing details and logs:
- JobError, e.g. something went wrong in a run/query/import/scan; includes error details
Run/import operations also return a state object that includes a job_status and other details.
There are two ways to check status for run/import operations:
- try/except JobError
- check the state.job_status attribute
Examples
state = client.run(...)
if str(state.job_status).lower() != "success":
    ...
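The status check can be wrapped in a small helper that normalizes the value with str(...).lower(), the same comparison the run and rerun examples use. FakeRunState and raise_unless_success are hypothetical names; the stand-in only exposes the job_id and job_status attributes that appear in this page, so the sketch runs without a live client.

```python
from dataclasses import dataclass


@dataclass
class FakeRunState:
    """Hypothetical stand-in exposing the attributes used in the examples."""
    job_id: str
    job_status: str


def raise_unless_success(state) -> None:
    """Raise if a run/import state did not finish successfully.

    str(...).lower() lets "SUCCESS" and "success" compare equal.
    """
    if str(state.job_status).lower() != "success":
        raise RuntimeError(f"{state.job_id} failed: {state.job_status}")
```

With a real client you would pass the object returned by client.run(...) instead of the stand-in.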
def apply_table_creation_plan
Apply a plan for creating a table. This is done automatically during table plan creation if no schema conflicts exist. If schema conflicts exist, use this function to apply the plan after the conflicts are resolved. The most common schema conflict is two Parquet files with the same column name but different data types.
Returns: The plan state.
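To make the most common conflict concrete, here is a conceptual sketch that flags columns whose data type differs across files. It is not bauplan's planner: schemas are plain dicts of column name to type name rather than real Parquet metadata, and find_schema_conflicts is a hypothetical helper.

```python
from typing import Dict, List, Tuple

Schema = Dict[str, str]  # column name -> data type name (simplified)


def find_schema_conflicts(
    schemas: Dict[str, Schema],
) -> List[Tuple[str, Dict[str, str]]]:
    """Return (column, {file: type}) pairs for columns with mismatched types."""
    seen: Dict[str, Dict[str, str]] = {}
    for file_name, schema in schemas.items():
        for column, dtype in schema.items():
            seen.setdefault(column, {})[file_name] = dtype
    # Keep only columns that appear with more than one distinct type.
    return [
        (column, types)
        for column, types in sorted(seen.items())
        if len(set(types.values())) > 1
    ]
```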
Parameters
def cancel_job
EXPERIMENTAL: Cancel a job by ID.
Parameters
def create_branch
Create a new branch at a given ref.
The branch name should follow the convention username.branch_name; otherwise, non-admin users won't be able to complete the operation.
Upon failure, raises BauplanError
import bauplan
client = bauplan.Client()
username = client.info().user.username
branch = client.create_branch(
branch = username+'.feature_branch',
from_ref = 'branch_name@hash',
if_not_exists = True,
)
Returns: The created branch object.
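A minimal sketch for building and checking names that follow the username.branch_name convention before calling create_branch. Both helpers are hypothetical and only check the username prefix; the server may enforce additional naming rules not described here.

```python
def user_branch(username: str, branch_name: str) -> str:
    """Build a branch name following the username.branch_name convention."""
    if not username or not branch_name:
        raise ValueError("username and branch_name must be non-empty")
    return f"{username}.{branch_name}"


def follows_convention(branch: str, username: str) -> bool:
    """True if `branch` is namespaced under `username` (prefix check only)."""
    prefix = f"{username}."
    return branch.startswith(prefix) and len(branch) > len(prefix)
```

In the example above, user_branch(client.info().user.username, 'feature_branch') would produce the same name as the string concatenation.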
Parameters
def create_namespace
Create a new namespace at a given branch.
Upon failure, raises BauplanError
import bauplan
client = bauplan.Client()
assert client.create_namespace(
namespace='my_namespace_name',
branch='my_branch_name',
properties={'k1': 'v1', 'k2': 'v2'},
if_not_exists=True,
)
Returns: The created Namespace object.
Parameters
def create_table
Create a table from an S3 location.
This operation will attempt to create a table based on the schemas of N Parquet files found at a given search URI. This is a two-step operation using plan_table_creation and apply_table_creation_plan.
import bauplan
client = bauplan.Client()
table = client.create_table(
table='my_table_name',
search_uri='s3://path/to/my/files/*.parquet',
branch='my_branch_name',
)
Returns: Table
Parameters
def create_tag
Create a new tag at a given ref.
Upon failure, raises BauplanError
import bauplan
client = bauplan.Client()
assert client.create_tag(
tag='my_tag',
from_ref='my_ref_or_branch_name',
)
Returns: The created Tag object.
Parameters
def delete_branch
Delete a branch.
Upon failure, raises BauplanError
import bauplan
client = bauplan.Client()
if client.delete_branch('my_branch_name'):
    # do something
    ...
Returns: A boolean for if the branch was deleted.
Parameters
def delete_namespace
Delete a namespace.
Upon failure, raises BauplanError
import bauplan
client = bauplan.Client()
assert client.delete_namespace(
namespace='my_namespace_name',
branch='my_branch_name',
)
Returns: A Branch object pointing to head.
Parameters
def delete_table
Drop a table.
Upon failure, raises BauplanError
import bauplan
client = bauplan.Client()
assert client.delete_table(
table='my_table_name',
branch='my_branch_name',
namespace='my_namespace',
)
Returns: The deleted Table object.
Parameters
def delete_tag
Delete a tag.
Upon failure, raises BauplanError
import bauplan
client = bauplan.Client()
assert client.delete_tag('my_tag_name')
Returns: A boolean for if the tag was deleted.
Parameters
def get_branch
Get the branch.
Upon failure, raises BauplanError
import bauplan
client = bauplan.Client()
# retrieve a branch by name
branch = client.get_branch('my_branch_name')
Returns: A Branch object.
Parameters
def get_branches
Get the available data branches in the Bauplan catalog.
Upon failure, raises BauplanError
import bauplan
client = bauplan.Client()
for branch in client.get_branches():
    ...
Returns: A GetBranchesResponse object.
Parameters
def get_commits
Get the commits for the target branch or ref.
Upon failure, raises BauplanError
Returns: A GetCommitsResponse object.
Parameters
def get_job
EXPERIMENTAL: Get a job by ID or ID prefix.
Parameters
def get_job_context
EXPERIMENTAL: Get logs for a job by ID prefix or from a specified Job.
Parameters
def get_job_contexts
EXPERIMENTAL: Get logs for a job by ID prefix or from a specified Job.
Parameters
def get_job_logs
EXPERIMENTAL: Get logs for a job by ID prefix or from a specified Job.
Parameters
def get_namespace
Get a namespace.
Upon failure, raises BauplanError
import bauplan
client = bauplan.Client()
namespace = client.get_namespace(
namespace='my_namespace_name',
ref='my_branch_name',
)
Returns: A Namespace object.
Parameters
def get_namespaces
Get the available data namespaces in the Bauplan catalog branch.
Upon failure, raises BauplanError
import bauplan
client = bauplan.Client()
for namespace in client.get_namespaces('my_ref_or_branch_name'):
    ...
Parameters
def get_table
Get the table data and metadata for a table in the target branch.
Upon failure, raises BauplanError
import bauplan
client = bauplan.Client()
# get the fields and metadata for a table
table = client.get_table(
table='titanic',
ref='my_ref_or_branch_name',
namespace='bauplan',
)
# You can get the total number of rows this way.
num_records = table.records
# Or access the schema.
for c in table.fields:
    ...
Returns: A TableWithMetadata object, optionally including the raw metadata.json object.
Parameters
def get_tables
Get the tables and views in the target branch.
Upon failure, raises BauplanError
import bauplan
client = bauplan.Client()
for table in client.get_tables('my_branch_name'):
    ...
Returns: A GetTablesResponse object.
Parameters
def get_tag
Get the tag.
Upon failure, raises BauplanError
import bauplan
client = bauplan.Client()
# retrieve a tag by name
tag = client.get_tag('my_tag_name')
Returns: A Tag object.
Parameters
def get_tags
Get all the tags.
Upon failure, raises BauplanError
Returns: A GetTagsResponse object.
Parameters
def has_branch
Check if a branch exists.
Upon failure, raises BauplanError
import bauplan
client = bauplan.Client()
if client.has_branch('my_branch_name'):
    # do something
    ...
Returns: A boolean for if the branch exists.
Parameters
def has_namespace
Check if a namespace exists.
Upon failure, raises BauplanError
import bauplan
client = bauplan.Client()
assert client.has_namespace(
namespace='my_namespace_name',
ref='my_branch_name',
)
Parameters
def has_table
Check if a table exists.
Upon failure, raises BauplanError
import bauplan
client = bauplan.Client()
assert client.has_table(
table='titanic',
ref='my_ref_or_branch_name',
namespace='bauplan',
)
Returns: A boolean for if the table exists.
Parameters
def has_tag
Check if a tag exists.
Upon failure, raises BauplanError
import bauplan
client = bauplan.Client()
assert client.has_tag(
tag='my_tag_name',
)
Returns: A boolean for if the tag exists.
Parameters
def import_data
Import data into an already existing table.
import bauplan
client = bauplan.Client()
plan_state = client.import_data(
table='my_table_name',
search_uri='s3://path/to/my/files/*.parquet',
branch='my_branch_name',
)
if plan_state.error:
    # surface the error instead of continuing
    raise RuntimeError(f"import failed: {plan_state.error}")
else:
    print(plan_state.plan)
Returns: A TableDataImportState object.
Parameters
def info
Fetch organization & account information.
Parameters
def list_jobs
EXPERIMENTAL: List all jobs.
A DateRange is an alias for tuple[Optional[datetime], Optional[datetime]], where the first element is an "after" (start) filter and the second element is a "before" (end) filter.
The filter_by_finish_time parameter takes a DateRange and keeps jobs whose finish time is later than "after" (if specified), earlier than "before" (if specified), or between both when both are given. If neither is specified, for example (None, None), the behavior is the same as not specifying the filter at all, for example filter_by_finish_time=None.
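The filter semantics above can be expressed as a small predicate. matches_finish_time is a hypothetical helper that models the described behavior client-side; it is not how list_jobs is implemented. The description says "later than" and "earlier than", so this sketch uses strict comparisons; whether the real endpoints are inclusive is an assumption not settled by the text.

```python
from datetime import datetime
from typing import Optional, Tuple

DateRange = Tuple[Optional[datetime], Optional[datetime]]


def matches_finish_time(finish_time: datetime, date_range: Optional[DateRange]) -> bool:
    """Return True if finish_time passes an (after, before) filter."""
    if date_range is None:
        return True  # no filter at all
    after, before = date_range
    # ASSUMPTION: both bounds are exclusive ("later than" / "earlier than").
    if after is not None and not finish_time > after:
        return False
    if before is not None and not finish_time < before:
        return False
    return True  # (None, None) reaches here, same as no filter
```

A call such as client.list_jobs(filter_by_finish_time=(datetime(2024, 5, 1), None)) would, under this reading, keep only jobs that finished after that date.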
Parameters
def merge_branch
Merge one branch into another.
Upon failure, raises BauplanError
import bauplan
client = bauplan.Client()
assert client.merge_branch(
source_ref='my_ref_or_branch_name',
into_branch='main',
)
Returns: The Branch where the merge was made.
Parameters
def plan_table_creation
Create a table import plan from an S3 location.
This operation will attempt to create a table based on the schemas of N Parquet files found at a given search URI. A YAML file containing the schema and plan is returned, and if there are no conflicts, the plan is applied automatically.
import bauplan
client = bauplan.Client()
plan_state = client.plan_table_creation(
table='my_table_name',
search_uri='s3://path/to/my/files/*.parquet',
branch='my_branch_name',
)
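if plan_state.error:
    # e.g. a schema conflict that must be resolved before apply_table_creation_plan
    raise RuntimeError(f"table creation plan failed: {plan_state.error}")
else:
    print(plan_state.plan)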
Parameters
def query
Execute a SQL query and return the results as a pyarrow.Table. Note that this function also uses Arrow internally, resulting in fast data transfer.
If you prefer to return the results as a pandas DataFrame, use the to_pandas function of pyarrow.Table.
import bauplan
client = bauplan.Client()
# query the table and return result set as an arrow Table
my_table = client.query(
query='SELECT avg(age) as average_age FROM bauplan.titanic',
ref='my_ref_or_branch_name',
)
# efficiently cast the table to a pandas DataFrame
df = my_table.to_pandas()
Returns: The query results as a pyarrow.Table.
Parameters
def query_to_csv_file
Export the results of a SQL query to a file in CSV format.
import bauplan
client = bauplan.Client()
# run the query and export the results to a CSV file
client.query_to_csv_file(
path='/tmp/out.csv',
query='SELECT name, age FROM bauplan.titanic LIMIT 100',
ref='my_ref_or_branch_name',
)
Returns: The path of the file written.
Parameters
def query_to_generator
Execute a SQL query and return the results as a generator, where each row is a Python dictionary.
import bauplan
client = bauplan.Client()
# query the table and iterate through the results one row at a time
res = client.query_to_generator(
query='SELECT name, age FROM bauplan.titanic LIMIT 100',
ref='my_ref_or_branch_name',
)
for row in res:
    ...  # handle results
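Since rows arrive one at a time as dictionaries, you can aggregate without holding the whole result in memory. running_average is a hypothetical helper that works on any iterable of dict rows; the test feeds it a fake generator instead of query_to_generator output. Treating SQL NULL as Python None is an assumption about how the rows are decoded.

```python
from typing import Any, Dict, Iterable, Optional


def running_average(rows: Iterable[Dict[str, Any]], column: str) -> Optional[float]:
    """Average a numeric column over dict rows, consuming them lazily.

    Rows whose value is None are skipped, like SQL avg() skips NULLs.
    """
    total = 0.0
    count = 0
    for row in rows:
        value = row.get(column)
        if value is None:
            continue
        total += float(value)
        count += 1
    return total / count if count else None
```

For example, running_average(res, 'age') would consume the generator from the example above in a single pass.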
Parameters
def query_to_json_file
Export the results of a SQL query to a file in JSON format.
import bauplan
client = bauplan.Client()
# run the query and export the results to a JSON file
client.query_to_json_file(
path='/tmp/out.json',
query='SELECT name, age FROM bauplan.titanic LIMIT 100',
ref='my_ref_or_branch_name',
)
Returns: The path of the file written.
Parameters
def query_to_parquet_file
Export the results of a SQL query to a file in Parquet format.
import bauplan
client = bauplan.Client()
# run the query and export the results to a Parquet file
client.query_to_parquet_file(
path='/tmp/out.parquet',
query='SELECT name, age FROM bauplan.titanic LIMIT 100',
ref='my_ref_or_branch_name',
)
Returns: The path of the file written.
Parameters
def rename_branch
Rename an existing branch. The branch name should follow the convention "username.branch_name"; otherwise, non-admin users won't be able to complete the operation.
Upon failure, raises BauplanError
import bauplan
client = bauplan.Client()
assert client.rename_branch(
branch='username.old_name',
new_branch='username.new_name',
)
Returns: The renamed Branch object.
Parameters
def rename_tag
Rename an existing tag.
Upon failure, raises BauplanError
import bauplan
client = bauplan.Client()
assert client.rename_tag(
tag='old_tag_name',
new_tag='new_tag_name',
)
Returns: The renamed tag object.
Parameters
def rerun
Re-run a Bauplan job using its ID and return the state of the run. All on/off and boolean parameters default to 'off' unless otherwise specified.
Examples
rerun_state = client.rerun(
job_id=prod_job_id,
ref='feature-branch',
cache='off'
)
# Check if rerun succeeded (best practice)
if str(rerun_state.job_status).lower() != "success":
    raise Exception(f"Rerun failed with status: {rerun_state.job_status}")
Returns: ReRunState: The state of the run.
Parameters
def revert_table
Revert a table to a previous state.
Upon failure, raises BauplanError
import bauplan
client = bauplan.Client()
assert client.revert_table(
table='my_table_name',
namespace='my_namespace',
source_ref='my_ref_or_branch_name',
into_branch='main',
)
Returns: The Branch where the revert was made.
Parameters
def run
Run a Bauplan project and return the state of the run. This is equivalent to running the bauplan run command through the CLI. All parameters default to 'off'/false unless otherwise specified.
Examples
# Run a daily sales pipeline on a dev branch and, if the run succeeds, merge it into main
run_state = client.run(
    project_dir='./etl_pipelines/daily_sales',
    ref="username.dev_branch",
    namespace='sales_analytics',
)
if str(run_state.job_status).lower() != "success":
    raise Exception(f"{run_state.job_id} failed: {run_state.job_status}")
client.merge_branch(
    source_ref="username.dev_branch",
    into_branch='main',
)
Returns: RunState: The state of the run.
Parameters
def scan
Execute a table scan (with optional filters) and return the results as an Arrow Table.
Note that this function uses SQLGlot to compose a safe SQL query, and then internally defers to the query_to_arrow function for the actual scan.
import bauplan
client = bauplan.Client()
# run a table scan over the data lake
# filters are passed as a string
my_table = client.scan(
table='titanic',
ref='my_ref_or_branch_name',
namespace='bauplan',
columns=['name'],
filters='age < 30',
)
Returns: The scan results as a pyarrow.Table.
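To show roughly what the scan example corresponds to in SQL, here is a hypothetical scan_to_sql helper. It is neither SQLGlot nor bauplan's actual query builder: it only quotes identifiers ANSI-style and passes the filter string through verbatim, whereas a real parser such as SQLGlot would parse and validate that expression.

```python
from typing import List, Optional


def quote_ident(name: str) -> str:
    """Double-quote an identifier, escaping embedded quotes (ANSI SQL)."""
    return '"' + name.replace('"', '""') + '"'


def scan_to_sql(
    table: str,
    namespace: str,
    columns: Optional[List[str]] = None,
    filters: Optional[str] = None,
) -> str:
    """Build the SELECT statement a scan() call roughly corresponds to."""
    cols = ", ".join(quote_ident(c) for c in columns) if columns else "*"
    sql = f"SELECT {cols} FROM {quote_ident(namespace)}.{quote_ident(table)}"
    if filters:
        # Passed through verbatim here; not validated like a SQL parser would.
        sql += f" WHERE {filters}"
    return sql
```

For the example above, scan_to_sql('titanic', 'bauplan', ['name'], 'age < 30') yields SELECT "name" FROM "bauplan"."titanic" WHERE age < 30.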
Parameters
class InfoState
bauplan._info.InfoState(
*,
client_version: Optional[str] = None,
organization: Optional[bauplan._info.OrganizationInfo] = None,
user: Optional[bauplan._info.UserInfo] = None,
runners: Optional[List[bauplan._info.RunnerNodeInfo]] = None
)-> None
class JobState
class JobStatus
The status of a submitted job.
class Model
Represents a model (dataframe/table representing a DAG step) as an input to a function.
e.g.
import bauplan
import pandas as pd
import pyarrow
@bauplan.model()
def some_parent_model():
    return pyarrow.Table.from_pydict({'bar': [1, 2, 3]})
@bauplan.model()
def your_cool_model(
    # parent models are passed as inputs, using the bauplan.Model class
    parent_0=bauplan.Model(
        'some_parent_model',
        columns=['bar'],
        filter='bar > 1',
    )
):
    # Can return a pandas DataFrame or a pyarrow Table;
    # convert the Arrow column to pandas before doing arithmetic on it
    return pyarrow.Table.from_pandas(
        pd.DataFrame({
            'foo': parent_0['bar'].to_pandas() * 2,
        })
    )
Bauplan can wrap other engines for the processing of some models, exposing a common interface and unified API for the user while dispatching the relevant operations to the underlying engine.
Authentication and authorization happen securely and transparently through SSM; the user specifies a connector type and the credentials through the relevant keywords:
import bauplan
import pandas as pd
import pyarrow
@bauplan.model()
def your_cool_model(
    parent_0=bauplan.Model(
        'some_parent_model',
        columns=['bar'],
        filter='bar > 1',
        connector='dremio',
        connector_config_key='bauplan',
    )
):
    # parent_0 inside the function body is still an Arrow table:
    # the user code stays the same, as Bauplan moves the data
    # transparently from the engine to the function.
    return pyarrow.Table.from_pandas(
        pd.DataFrame({
            'foo': parent_0['bar'].to_pandas() * 2,
        })
    )
class OrganizationInfo
bauplan._info.OrganizationInfo(
*,
id: Optional[str] = None,
name: Optional[str] = None,
slug: Optional[str] = None,
default_parameter_secret_key: Optional[str] = None,
default_parameter_secret_public_key: Optional[str] = None
)-> None
def Parameter
_ParameterKwargTracker is a callable object used to track the parameters that a model uses. Because default function arguments are evaluated once and only once, it can build the set of parameter names from the keyword arguments.
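The "evaluated once and only once" property is plain Python behavior: default argument expressions run when the def statement executes, not on each call. The sketch below uses a hypothetical ParameterTracker stand-in (not bauplan's _ParameterKwargTracker) to show how a callable used in defaults can collect parameter names. Returning the name itself is a placeholder, since the real return value is not documented here.

```python
from typing import Set


class ParameterTracker:
    """Hypothetical stand-in for a callable that records parameter names."""

    def __init__(self) -> None:
        self.names: Set[str] = set()

    def __call__(self, name: str) -> str:
        # Runs while the `def` below is executed: once per default value,
        # never again on later calls to the function.
        self.names.add(name)
        return name  # placeholder return value


Parameter = ParameterTracker()


def my_model(threshold=Parameter("threshold"), label=Parameter("label")):
    return threshold, label
```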
Parameters
class Profile
A configuration profile.
def load_profile
Load a profile from a profile file.
Parameters
class RunnerNodeInfo
bauplan._info.RunnerNodeInfo(
*,
hostname: Optional[str] = None
)-> None
class UserInfo
bauplan._info.UserInfo(
*,
id: Optional[str] = None,
username: Optional[str] = None,
first_name: Optional[str] = None,
last_name: Optional[str] = None
)-> None
def expectation
Decorator that defines a Bauplan expectation.
An expectation is a function from one (or more) dataframe-like object(s) to a boolean: it is commonly used to perform data validation and data quality checks when running a pipeline. Expectations take as input the table(s) they are validating and return a boolean indicating whether the expectation is met. A Python expectation needs a Python environment to run, which is defined using the python decorator, e.g.:
import bauplan
from bauplan.standard_expectations import expect_column_no_nulls
@bauplan.expectation()
@bauplan.python('3.10')
def test_joined_dataset(
    data=bauplan.Model(
        'join_dataset',
        columns=['anomaly']
    )
):
    # your data validation code here
    return expect_column_no_nulls(data, 'anomaly')
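Stripped of the decorators, an expectation is just a function that returns a boolean. The plain-Python analogue below is hypothetical: expect_no_nulls is not bauplan's expect_column_no_nulls, and a table is modeled as a dict of column name to list of values rather than an Arrow table.

```python
from typing import Dict, Sequence

Table = Dict[str, Sequence[object]]  # column name -> values (simplified stand-in)


def expect_no_nulls(table: Table, column: str) -> bool:
    """Return True if `column` exists and contains no None values."""
    if column not in table:
        return False
    return all(value is not None for value in table[column])
```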
Parameters
def model
Decorator that specifies a Bauplan model.
A model is a function from one (or more) dataframe-like object(s) to another dataframe-like object: it is used to define a transformation in a pipeline. Models are chained together implicitly by using them as inputs to their children. A Python model needs a Python environment to run, which is defined using the python decorator, e.g.:
import bauplan
@bauplan.model(
    columns=['*'],
    materialization_strategy='NONE'
)
@bauplan.python('3.11')
def source_scan(
    data=bauplan.Model(
        'iot_kaggle',
        columns=['*'],
        filter="motion='false'"
    )
):
    # your code here
    return data
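Because models are chained implicitly through their inputs, the pipeline forms a DAG whose valid execution orders are topological orders. The sketch below is conceptual and is not Bauplan's scheduler: the PARENTS mapping and all model names except iot_kaggle and source_scan are hypothetical, and it uses the standard library's graphlib (Python 3.9+).

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set

# Each node maps to the inputs it declares via bauplan.Model (hypothetical graph).
PARENTS: Dict[str, Set[str]] = {
    "source_scan": {"iot_kaggle"},
    "cleaned": {"source_scan"},
    "daily_stats": {"cleaned"},
    "report": {"cleaned", "daily_stats"},
}


def execution_order(parents: Dict[str, Set[str]]) -> List[str]:
    """Return an order in which every node comes after all of its inputs.

    Raises graphlib.CycleError if the inputs form a cycle.
    """
    return list(TopologicalSorter(parents).static_order())
```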
Parameters
def pyspark
Decorator that makes a PySpark session available to a Bauplan function (a model or an expectation). To use it, add a spark=None parameter to the function's arguments.
Parameters
def python
Decorator that defines a Python environment for a Bauplan function (e.g. a model or expectation). It is used to specify directly in code the configuration of the Python environment required to run the function, i.e. the Python version and the Python packages required.
Parameters
def resources
Decorator that defines the resources required by a Bauplan function (e.g. a model or expectation). It is used to specify directly in code the configuration of the resources required to run the function.
Parameters
def synthetic_model
Decorator that defines a Bauplan Synthetic Model.