bauplan
class Client
A consistent interface to access Bauplan operations.
Using the client
import bauplan
client = bauplan.Client()
# query the table and return result set as an arrow Table
my_table = client.query('SELECT avg(age) AS average_age FROM bauplan.titanic limit 1', ref='main')
# efficiently cast the table to a pandas DataFrame
df = my_table.to_pandas()
Notes on authentication
import os
import bauplan
# by default, authenticate from BAUPLAN_API_KEY >> BAUPLAN_PROFILE >> ~/.bauplan/config.yml
client = bauplan.Client()
# >> client uses the 'default' profile from ~/.bauplan/config.yml
os.environ['BAUPLAN_PROFILE'] = "someprofile"
client = bauplan.Client()
# >> client now uses profile 'someprofile'
os.environ['BAUPLAN_API_KEY'] = "mykey"
client = bauplan.Client()
# >> client now authenticates with api_key value "mykey", because api key > profile
# specify authentication directly - this supersedes BAUPLAN_API_KEY in the environment
client = bauplan.Client(api_key='MY_KEY')
# specify a profile from ~/.bauplan/config.yml - this supersedes BAUPLAN_PROFILE in the environment
client = bauplan.Client(profile='default')
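The default precedence described in the comments above can be sketched in plain Python. The helper `default_auth_source` is hypothetical and is not part of the bauplan package; it only mirrors the documented order (BAUPLAN_API_KEY, then BAUPLAN_PROFILE, then the 'default' profile) and does not model explicit `api_key` or `profile` arguments.

```python
from typing import Mapping, Tuple


def default_auth_source(env: Mapping[str, str]) -> Tuple[str, str]:
    """Return (kind, value) for the credential source that wins by default.

    Hypothetical helper: it mirrors the documented precedence only and is
    not how the bauplan client is implemented.
    """
    # An API key in the environment takes priority over any profile.
    if env.get("BAUPLAN_API_KEY"):
        return ("api_key", env["BAUPLAN_API_KEY"])
    # Otherwise a profile named in the environment is used.
    if env.get("BAUPLAN_PROFILE"):
        return ("profile", env["BAUPLAN_PROFILE"])
    # Fall back to the 'default' profile in ~/.bauplan/config.yml.
    return ("profile", "default")


print(default_auth_source({}))
print(default_auth_source({"BAUPLAN_PROFILE": "someprofile"}))
print(default_auth_source({"BAUPLAN_PROFILE": "someprofile", "BAUPLAN_API_KEY": "mykey"}))
```

Passing the environment as a mapping keeps the sketch easy to check without touching `os.environ`.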
Handling Exceptions
Catalog operations (branch/table methods) raise a subclass of BauplanError that mirrors HTTP status codes.
- 400: InvalidDataError
- 401: UnauthorizedError
- 403: AccessDeniedError
- 404: ResourceNotFoundError, e.g. the ID doesn't match any records
- 404: ApiRouteError, e.g. the given route doesn't exist
- 405: ApiMethodError, e.g. POST on a route with only GET defined
- 409: UpdateConflictError, e.g. creating a record with a name that already exists
- 429: TooManyRequestsError
Run/Query/Scan/Import operations raise a subclass of BauplanError and also return a RunState object containing details and logs:
- JobError, e.g. something went wrong in a run/query/import/scan; includes error details
Run/import operations also return a state object that includes a job_status and other details.
There are two ways to check status for run/import operations:
- try/except JobError
- check the state.job_status attribute
Examples
state = client.run(...)
if state.job_status != "SUCCESS":
    ...
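The status check can be wrapped in a small helper. This is a minimal sketch: `ensure_success` is a hypothetical name, the `SimpleNamespace` objects are stand-ins for a real state returned by `client.run(...)`, and the "FAILED" value is only an illustrative non-success status. The case-insensitive comparison follows the pattern used in the run and rerun examples of this reference.

```python
from types import SimpleNamespace


def ensure_success(state):
    """Return the state unchanged, or raise if job_status is not a success.

    Hypothetical helper: it relies only on the documented job_status attribute.
    """
    status = str(state.job_status)
    if status.lower() != "success":
        raise RuntimeError(f"job finished with status {status!r}")
    return state


# Stand-in objects; a real state would come from client.run(...).
ok = SimpleNamespace(job_status="SUCCESS")
failed = SimpleNamespace(job_status="FAILED")

ensure_success(ok)
try:
    ensure_success(failed)
except RuntimeError as exc:
    print(exc)
```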
def apply_table_creation_plan
Apply a plan for creating a table. This is done automatically during table plan creation if no schema conflicts exist. Otherwise, this function is used to apply the plan after the schema conflicts are resolved. The most common schema conflict is two parquet files with the same column name but different datatypes.
Returns: The plan state.
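To make the notion of a schema conflict concrete, here is a minimal sketch in plain Python. It is not how Bauplan detects conflicts; `find_schema_conflicts` and the dictionaries standing in for parquet file schemas are assumptions made for illustration.

```python
from typing import Dict, List, Set


def find_schema_conflicts(schemas: List[Dict[str, str]]) -> Dict[str, Set[str]]:
    """Return the columns that appear with more than one datatype.

    Hypothetical helper: each schema maps a column name to a datatype name.
    """
    seen: Dict[str, Set[str]] = {}
    for schema in schemas:
        for column, dtype in schema.items():
            seen.setdefault(column, set()).add(dtype)
    # Keep only the columns whose datatype differs between files.
    return {column: dtypes for column, dtypes in seen.items() if len(dtypes) > 1}


# Stand-in schemas for two parquet files that disagree on the type of 'age'.
file_a = {"id": "int64", "age": "int64"}
file_b = {"id": "int64", "age": "double"}
conflicts = find_schema_conflicts([file_a, file_b])
print(conflicts)
```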
Parameters
def cancel_job
EXPERIMENTAL: Cancel a job by ID.
Parameters
def create_branch
Create a new branch at a given ref.
The branch name should follow the convention of username.branch_name,
otherwise non-admin users won't be able to complete the operation.
Upon failure, raises BauplanError
import bauplan
client = bauplan.Client()
username = client.info().user.username
branch = client.create_branch(
    branch=username + '.feature_branch',
    from_ref='branch_name@hash',
    if_not_exists=True,
)
Returns: The created branch object.
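The username.branch_name convention can be applied with a small helper before calling create_branch. This is a sketch; `qualified_branch_name` is a hypothetical function and not part of the bauplan package.

```python
def qualified_branch_name(username: str, name: str) -> str:
    """Prefix a branch name with 'username.' unless it already has that prefix.

    Hypothetical helper illustrating the username.branch_name convention.
    """
    prefix = f"{username}."
    return name if name.startswith(prefix) else prefix + name


print(qualified_branch_name("alice", "feature_branch"))
print(qualified_branch_name("alice", "alice.feature_branch"))
```

The result could then be passed as the `branch` argument, with the username taken from `client.info().user.username` as in the example above.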
Parameters
def create_external_table_from_metadata
Create an external table from an Iceberg metadata.json file.
This operation creates an external table by pointing to an existing Iceberg table's metadata.json file. This is useful for importing external Iceberg tables into Bauplan without copying the data.
import bauplan
client = bauplan.Client()
# Create an external table from metadata
result = client.create_external_table_from_metadata(
    table='my_external_table',
    metadata_json_uri='s3://my-bucket/path/to/metadata/00001-abc123.metadata.json',
    namespace='my_namespace',
    branch='my_branch_name',
)
Parameters
def create_external_table_from_parquet
Create an external table from S3 files.
import bauplan
client = bauplan.Client()
# Create from S3 files
state = client.create_external_table_from_parquet(
    table='my_external_table',
    search_patterns=['s3://path1/to/my/files/*.parquet', 's3://path2/to/my/file/f1.parquet'],
    branch='my_branch_name',
)
if state.error:
    handle_error(state.error)
else:
    print(f"External table created: {state.ctx.table_name}")
Parameters
def create_namespace
Create a new namespace at a given branch.
Upon failure, raises BauplanError
import bauplan
client = bauplan.Client()
assert client.create_namespace(
    namespace='my_namespace_name',
    branch='my_branch_name',
    properties={'k1': 'v1', 'k2': 'v2'},
    if_not_exists=True,
)
Returns:
The created Namespace object.
Parameters
def create_table
Create a table from an S3 location.
This operation will attempt to create a table based on the schemas of N
parquet files found by a given search URI. This is a two-step operation using
plan_table_creation and apply_table_creation_plan.
import bauplan
client = bauplan.Client()
table = client.create_table(
    table='my_table_name',
    search_uri='s3://path/to/my/files/*.parquet',
    branch='my_branch_name',
)
Returns: Table
Parameters
def create_tag
Create a new tag at a given ref.
Upon failure, raises BauplanError
import bauplan
client = bauplan.Client()
assert client.create_tag(
    tag='my_tag',
    from_ref='my_ref_or_branch_name',
)
Returns:
The created Tag object.
Parameters
def delete_branch
Delete a branch.
Upon failure, raises BauplanError
import bauplan
client = bauplan.Client()
if client.delete_branch('my_branch_name'):
    ...  # do something
Returns: A boolean for if the branch was deleted.
Parameters
def delete_namespace
Delete a namespace.
Upon failure, raises BauplanError
import bauplan
client = bauplan.Client()
assert client.delete_namespace(
    namespace='my_namespace_name',
    branch='my_branch_name',
)
Returns:
A Branch object pointing to head.
Parameters
def delete_table
Drop a table.
Upon failure, raises BauplanError
import bauplan
client = bauplan.Client()
assert client.delete_table(
    table='my_table_name',
    branch='my_branch_name',
    namespace='my_namespace',
)
Returns:
The deleted Table object.
Parameters
def delete_tag
Delete a tag.
Upon failure, raises BauplanError
import bauplan
client = bauplan.Client()
assert client.delete_tag('my_tag_name')
Returns: A boolean for if the tag was deleted.
Parameters
def get_branch
Get the branch.
Upon failure, raises BauplanError
import bauplan
client = bauplan.Client()
# retrieve the branch by name
branch = client.get_branch('my_branch_name')
Returns:
A Branch object.
Parameters
def get_branches
Get the available data branches in the Bauplan catalog.
Upon failure, raises BauplanError
import bauplan
client = bauplan.Client()
for branch in client.get_branches():
    ...
Returns:
A GetBranchesResponse object.
Parameters
def get_commits
Get the commits for the target branch or ref.
Upon failure, raises BauplanError
Returns:
A GetCommitsResponse object.
Parameters
def get_job
EXPERIMENTAL: Get a job by ID or ID prefix.
Parameters
def get_job_context
EXPERIMENTAL: Get logs for a job by ID prefix or from a specified Job.
Parameters
def get_job_contexts
EXPERIMENTAL: Get logs for a job by ID prefix or from a specified Job.
Parameters
def get_job_logs
EXPERIMENTAL: Get logs for a job by ID prefix or from a specified Job.
Parameters
def get_namespace
Get a namespace.
Upon failure, raises BauplanError
import bauplan
client = bauplan.Client()
namespace = client.get_namespace(
    namespace='my_namespace_name',
    ref='my_branch_name',
)
Returns:
A Namespace object.
Parameters
def get_namespaces
Get the available data namespaces in the Bauplan catalog branch.
Upon failure, raises BauplanError
import bauplan
client = bauplan.Client()
for namespace in client.get_namespaces('my_ref_or_branch_name'):
    ...
Parameters
def get_table
Get the table data and metadata for a table in the target branch.
Upon failure, raises BauplanError
import bauplan
client = bauplan.Client()
# get the fields and metadata for a table
table = client.get_table(
    table='titanic',
    ref='my_ref_or_branch_name',
    namespace='bauplan',
)
# You can get the total number of rows this way.
num_records = table.records
# Or access the schema.
for c in table.fields:
    ...
Returns:
a TableWithMetadata object, optionally including the raw metadata.json object.
Parameters
def get_tables
Get the tables and views in the target branch.
Upon failure, raises BauplanError
import bauplan
client = bauplan.Client()
for table in client.get_tables('my_branch_name'):
    ...
Returns:
A GetTablesResponse object.
Parameters
def get_tag
Get the tag.
Upon failure, raises BauplanError
import bauplan
client = bauplan.Client()
# retrieve the tag by name
tag = client.get_tag('my_tag_name')
Returns:
A Tag object.
Parameters
def get_tags
Get all the tags.
Upon failure, raises BauplanError
Returns:
A GetTagsResponse object.
Parameters
def has_branch
Check if a branch exists.
Upon failure, raises BauplanError
import bauplan
client = bauplan.Client()
if client.has_branch('my_branch_name'):
    ...  # do something
Returns: A boolean for if the branch exists.
Parameters
def has_namespace
Check if a namespace exists.
Upon failure, raises BauplanError
import bauplan
client = bauplan.Client()
assert client.has_namespace(
    namespace='my_namespace_name',
    ref='my_branch_name',
)
Parameters
def has_table
Check if a table exists.
Upon failure, raises BauplanError
import bauplan
client = bauplan.Client()
assert client.has_table(
    table='titanic',
    ref='my_ref_or_branch_name',
    namespace='bauplan',
)
Returns: A boolean for if the table exists.
Parameters
def has_tag
Check if a tag exists.
Upon failure, raises BauplanError
import bauplan
client = bauplan.Client()
assert client.has_tag(
    tag='my_tag_name',
)
Returns: A boolean for if the tag exists.
Parameters
def import_data
Import data into an already existing table.
import bauplan
client = bauplan.Client()
plan_state = client.import_data(
    table='my_table_name',
    search_uri='s3://path/to/my/files/*.parquet',
    branch='my_branch_name',
)
if plan_state.error:
    plan_error_action(...)
success_action(plan_state.plan)
Returns:
A TableDataImportState object.
Parameters
def info
Fetch organization & account information.
Parameters
def list_jobs
EXPERIMENTAL: List all jobs.
A DateRange is an alias for tuple[Optional[datetime], Optional[datetime]], where the
first element is an "after" (start) filter and the second element is a "before" (end)
filter.
The filter_by_finish_time parameter takes a DateRange and allows jobs with a finish time
later than "after" (if specified) and a finish time earlier than "before" (if specified),
or between both. If neither is specified, for example (None, None), then the behavior is
the same as not specifying the filter itself, for example filter_by_finish_time=None.
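The DateRange semantics described above can be sketched as a local predicate. This is an illustration of the documented filter behavior, not the server-side implementation; `finish_time_matches` is a hypothetical name, and the strict comparisons follow the "later than" and "earlier than" wording.

```python
from datetime import datetime
from typing import Optional, Tuple

# Alias matching the documented shape: (after, before), either may be None.
DateRange = Tuple[Optional[datetime], Optional[datetime]]


def finish_time_matches(finish_time: datetime, date_range: Optional[DateRange]) -> bool:
    """Return True if finish_time passes the (after, before) filter.

    Hypothetical helper mirroring the documented filter_by_finish_time rules.
    """
    if date_range is None:
        return True  # no filter specified
    after, before = date_range
    if after is not None and not finish_time > after:
        return False
    if before is not None and not finish_time < before:
        return False
    return True


finished = datetime(2024, 1, 15, 12, 0)
january: DateRange = (datetime(2024, 1, 1), datetime(2024, 2, 1))
print(finish_time_matches(finished, january))
print(finish_time_matches(finished, (datetime(2024, 2, 1), None)))
print(finish_time_matches(finished, (None, None)))
```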
Parameters
def merge_branch
Merge one branch into another.
Upon failure, raises BauplanError
import bauplan
client = bauplan.Client()
assert client.merge_branch(
    source_ref='my_ref_or_branch_name',
    into_branch='main',
)
Returns:
the Branch where the merge was made.
Parameters
def plan_table_creation
Create a table import plan from an S3 location.
This operation will attempt to create a table based on the schemas of N parquet files found by a given search URI. A YAML file containing the schema and plan is returned, and if there are no conflicts, the plan is automatically applied.
import bauplan
client = bauplan.Client()
plan_state = client.plan_table_creation(
    table='my_table_name',
    search_uri='s3://path/to/my/files/*.parquet',
    branch='my_branch_name',
)
if plan_state.error:
    plan_error_action(...)
success_action(plan_state.plan)
Parameters
def query
Execute a SQL query and return the results as a pyarrow.Table. Note that this function also uses Arrow internally, resulting in a fast data transfer.
If you prefer to return the results as a pandas DataFrame, use
the to_pandas function of pyarrow.Table.
import bauplan
client = bauplan.Client()
# query the table and return result set as an arrow Table
my_table = client.query(
    query='SELECT avg(age) as average_age FROM bauplan.titanic',
    ref='my_ref_or_branch_name',
)
# efficiently cast the table to a pandas DataFrame
df = my_table.to_pandas()
Returns:
The query results as a pyarrow.Table.
Parameters
def query_to_csv_file
Export the results of a SQL query to a file in CSV format.
import bauplan
client = bauplan.Client()
# query the table and write the results to a CSV file
client.query_to_csv_file(
    path='/tmp/out.csv',
    query='SELECT name, age FROM bauplan.titanic LIMIT 100',
    ref='my_ref_or_branch_name',
)
Returns: The path of the file written.
Parameters
def query_to_generator
Execute a SQL query and return the results as a generator, where each row is a Python dictionary.
import bauplan
client = bauplan.Client()
# query the table and iterate through the results one row at a time
res = client.query_to_generator(
    query='SELECT name, age FROM bauplan.titanic LIMIT 100',
    ref='my_ref_or_branch_name',
)
for row in res:
    ...  # handle results
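Because each row arrives as a dictionary, results can be aggregated without loading the whole result set into memory. The sketch below uses a stand-in generator with made-up rows in place of `client.query_to_generator(...)`; `fake_rows` and `average_age` are hypothetical names.

```python
from typing import Dict, Iterable, Iterator, Optional


def fake_rows() -> Iterator[Dict[str, object]]:
    """Stand-in for the generator returned by query_to_generator (made-up rows)."""
    yield {"name": "passenger_a", "age": 20}
    yield {"name": "passenger_b", "age": None}
    yield {"name": "passenger_c", "age": 40}


def average_age(rows: Iterable[Dict[str, object]]) -> Optional[float]:
    """Stream over dictionary rows and average the non-null 'age' values."""
    total = 0.0
    count = 0
    for row in rows:
        age = row.get("age")
        if age is None:
            continue  # skip rows without an age
        total += float(age)
        count += 1
    return total / count if count else None


print(average_age(fake_rows()))
```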
Parameters
def query_to_json_file
Export the results of a SQL query to a file in JSON format.
import bauplan
client = bauplan.Client()
# query the table and write the results to a JSON file
client.query_to_json_file(
    path='/tmp/out.json',
    query='SELECT name, age FROM bauplan.titanic LIMIT 100',
    ref='my_ref_or_branch_name',
)
Returns: The path of the file written.
Parameters
def query_to_parquet_file
Export the results of a SQL query to a file in Parquet format.
import bauplan
client = bauplan.Client()
# query the table and write the results to a Parquet file
client.query_to_parquet_file(
    path='/tmp/out.parquet',
    query='SELECT name, age FROM bauplan.titanic LIMIT 100',
    ref='my_ref_or_branch_name',
)
Returns: The path of the file written.
Parameters
def rename_branch
Rename an existing branch. The branch name should follow the convention of "username.branch_name", otherwise non-admin users won't be able to complete the operation.
Upon failure, raises BauplanError
import bauplan
client = bauplan.Client()
assert client.rename_branch(
    branch='username.old_name',
    new_branch='username.new_name',
)
Returns:
The renamed Branch object.
Parameters
def rename_tag
Rename an existing tag.
Upon failure, raises BauplanError
import bauplan
client = bauplan.Client()
assert client.rename_tag(
    tag='old_tag_name',
    new_tag='new_tag_name',
)
Returns: The renamed tag object.
Parameters
def rerun
Rerun a Bauplan job using its ID and return the state of the run. All on/off and boolean parameters default to 'off' unless otherwise specified.
Examples
rerun_state = client.rerun(
    job_id=prod_job_id,
    ref='feature-branch',
    cache='off',
)
# Check if rerun succeeded (best practice)
if str(rerun_state.job_status).lower() != "success":
    raise Exception(f"Rerun failed with status: {rerun_state.job_status}")
Returns:
ReRunState: The state of the run.
Parameters
def revert_table
Revert a table to a previous state.
Upon failure, raises BauplanError
import bauplan
client = bauplan.Client()
assert client.revert_table(
    table='my_table_name',
    namespace='my_namespace',
    source_ref='my_ref_or_branch_name',
    into_branch='main',
)
Returns:
The Branch where the revert was made.
Parameters
def run
Run a Bauplan project and return the state of the run. This is the equivalent of
running the bauplan run command through the CLI. All parameters default to 'off'/false unless otherwise specified.
Examples
# Run a daily sales pipeline on a dev branch and fail loudly if the job does not succeed
run_state = client.run(
    project_dir='./etl_pipelines/daily_sales',
    ref="username.dev_branch",
    namespace='sales_analytics',
)
if str(run_state.job_status).lower() != "success":
    raise Exception(f"{run_state.job_id} failed: {run_state.job_status}")
Returns:
RunState: The state of the run.
Parameters
def scan
Execute a table scan (with optional filters) and return the results as an arrow Table.
Note that this function uses SQLGlot to compose a safe SQL query, and then internally defers to the query_to_arrow function for the actual scan.
import bauplan
client = bauplan.Client()
# run a table scan over the data lake
# filters are passed as a string
my_table = client.scan(
    table='titanic',
    ref='my_ref_or_branch_name',
    namespace='bauplan',
    columns=['name'],
    filters='age < 30',
)
Returns:
The scan results as a pyarrow.Table.
Parameters
class InfoState
bauplan._info.InfoState(
    *,
    client_version: Optional[str] = None,
    organization: Optional[bauplan._info.OrganizationInfo] = None,
    user: Optional[bauplan._info.UserInfo] = None,
    runners: Optional[List[bauplan._info.RunnerNodeInfo]] = None
) -> None
class JobState
class JobStatus
The status of a submitted job.
class Model
Represents a model (dataframe/table representing a DAG step) as an input to a function.
e.g.
import bauplan
import pandas as pd
import pyarrow
@bauplan.model()
def some_parent_model():
    return pyarrow.Table.from_pydict({'bar': [1, 2, 3]})
@bauplan.model()
def your_cool_model(
    # parent models are passed as inputs, using the bauplan.Model class
    parent_0=bauplan.Model(
        'some_parent_model',
        columns=['bar'],
        filter='bar > 1',
    )
):
    # Can return a pandas dataframe or a pyarrow table
    return pyarrow.Table.from_pandas(
        pd.DataFrame({
            # convert the Arrow column to pandas before doing arithmetic
            'foo': parent_0['bar'].to_pandas() * 2,
        })
    )
Bauplan can wrap other engines for the processing of some models, exposing a common interface and unified API for the user while dispatching the relevant operations to the underlying engine.
Authentication and authorization happen securely and transparently through ssm; the user is asked to specify a connector type and the credentials through the relevant keywords:
import bauplan
import pandas as pd
import pyarrow
@bauplan.model()
def your_cool_model(
    parent_0=bauplan.Model(
        'some_parent_model',
        columns=['bar'],
        filter='bar > 1',
        connector='dremio',
        connector_config_key='bauplan',
    )
):
    # parent_0 inside the function body
    # will still be an Arrow table: the user code
    # should still be the same as the data is moved
    # transparently by Bauplan from an engine to the function.
    return pyarrow.Table.from_pandas(
        pd.DataFrame({
            # convert the Arrow column to pandas before doing arithmetic
            'foo': parent_0['bar'].to_pandas() * 2,
        })
    )
class OrganizationInfo
bauplan._info.OrganizationInfo(
    *,
    id: Optional[str] = None,
    name: Optional[str] = None,
    slug: Optional[str] = None,
    default_parameter_secret_key: Optional[str] = None,
    default_parameter_secret_public_key: Optional[str] = None
) -> None
def Parameter
_ParameterKwargTracker is a callable object that is used to track the parameters that are used in a model. Because default function args are evaluated once and only once, we can build a set of the args for kwargs.
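The Python behavior this relies on (default arguments are evaluated once, when the function is defined) can be shown with a small stand-in. `KwargTracker` below is a hypothetical class written for illustration; it is not bauplan's _ParameterKwargTracker and makes no claim about its internals.

```python
class KwargTracker:
    """Hypothetical callable that records every name it is called with."""

    def __init__(self):
        self.seen = []

    def __call__(self, name):
        self.seen.append(name)
        return name


track = KwargTracker()


# The default expressions run once, when the def statement executes.
def my_model(start_date=track("start_date"), limit=track("limit")):
    return start_date, limit


# Calling the function does not evaluate the defaults again.
my_model()
my_model()
print(track.seen)
```

After two calls the tracker still holds each name exactly once, which is what makes it possible to collect the set of parameters at definition time.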
Parameters
class Profile
A configuration profile.
def load_profile
Load a profile from a profile file.
Parameters
class RunnerNodeInfo
bauplan._info.RunnerNodeInfo(
    *,
    hostname: Optional[str] = None
) -> None
class UserInfo
bauplan._info.UserInfo(
    *,
    id: Optional[str] = None,
    username: Optional[str] = None,
    first_name: Optional[str] = None,
    last_name: Optional[str] = None
) -> None
def expectation
Decorator that defines a Bauplan expectation.
An expectation is a function from one (or more) dataframe-like object(s) to a boolean: it
is commonly used to perform data validation and data quality checks when running a pipeline.
Expectations take as input the table(s) they are validating and return a boolean indicating
whether the expectation is met or not. A Python expectation needs a Python environment to run,
which is defined using the python decorator, e.g.:
import bauplan
from bauplan.standard_expectations import expect_column_no_nulls
@bauplan.expectation()
@bauplan.python('3.10')
def test_joined_dataset(
    data=bauplan.Model(
        'join_dataset',
        columns=['anomaly']
    )
):
    # your data validation code here
    return expect_column_no_nulls(data, 'anomaly')
Parameters
def model
Decorator that specifies a Bauplan model.
A model is a function from one (or more) dataframe-like object(s)
to another dataframe-like object: it is used to define a transformation in a
pipeline. Models are chained together implicitly by using them as inputs to
their children. A Python model needs a Python environment to run, which is defined
using the python decorator, e.g.:
import bauplan
@bauplan.model(
    columns=['*'],
    materialization_strategy='NONE'
)
@bauplan.python('3.11')
def source_scan(
    data=bauplan.Model(
        'iot_kaggle',
        columns=['*'],
        filter="motion='false'"
    )
):
    # your code here
    return data
Parameters
def pyspark
Decorator that makes a pyspark session available to a Bauplan function (a model or an expectation). Add a spark=None parameter to the function's arguments.
Parameters
def python
Decorator that defines a Python environment for a Bauplan function (e.g. a model or expectation). It is used to specify directly in code the configuration of the Python environment required to run the function, i.e. the Python version and the Python packages required.
Parameters
def resources
Decorator that defines the resources required by a Bauplan function (e.g. a model or expectation). It is used to specify directly in code the configuration of the resources required to run the function.
Parameters
def synthetic_model
Decorator that defines a Bauplan Synthetic Model.