bauplan package

Submodules

Module contents

class bauplan.Client(api_key: str | None = None, profile: str | None = None, namespace: str | None = None, **kwargs)

Bases: object

A consistent interface to access Bauplan operations.

Using the client

import bauplan
client = bauplan.Client()

# query the table and return result set as an arrow Table
my_table = client.query('SELECT sum(trips) trips FROM travel_table', branch_name='main')

# efficiently cast the table to a pandas DataFrame
df = my_table.to_pandas()

Notes on authentication

# by default, authenticate from BAUPLAN_API_KEY >> BAUPLAN_PROFILE >> ~/.bauplan/config.yml
client = bauplan.Client()
# client used ~/.bauplan/config.yml profile 'default'

os.environ['BAUPLAN_PROFILE'] = "someprofile"
client = bauplan.Client()
# >> client now uses profile 'someprofile'

os.environ['BAUPLAN_API_KEY'] = "mykey"
client = bauplan.Client()
# >> client now authenticates with api_key value "mykey", because api key > profile

# specify authentication directly - this supersedes BAUPLAN_API_KEY in the environment
client = bauplan.Client(api_key='MY_KEY')

# specify a profile from ~/.bauplan/config.yml - this supersedes BAUPLAN_PROFILE in the environment
client = bauplan.Client(profile='default')

Handling Exceptions

Catalog operations (branch/table methods) raise a subclass of bauplan.exceptions.BauplanError that mirrors HTTP status codes.

  • 400: InvalidDataError

  • 401: UnauthorizedError

  • 403: AccessDeniedError

  • 404: ResourceNotFoundError e.g. the ID doesn’t match any records

  • 404: ApiRouteError e.g. the given route doesn’t exist

  • 405: ApiMethodError e.g. POST on a route with only GET defined

  • 409: UpdateConflictError e.g. creating a record with a name that already exists

  • 429: TooManyRequestsError

Run/Query/Scan/Import operations raise a subclass of bauplan.exceptions.BauplanError that represents the failure, and also return a RunState object containing details and logs:

  • JobError e.g. something went wrong in a run/query/import/scan; includes error details

Run/import operations also return a state object that includes a job_status and other details. There are two ways to check status for run/import operations:

  1. try/except the JobError exception

  2. check the state.job_status attribute

Examples:

try:
    state = client.run(...)
    state = client.scan(...)
    state = client.plan_import(...)
    state = client.apply_import(...)
    state = client.query(...)
except bauplan.exceptions.JobError as e:
    ...

state = client.run(...)
if state.job_status != "success":
    ...
Parameters:
  • api_key – (optional) Your unique Bauplan API key; mutually exclusive with profile. If not provided, fetch precedence is 1) environment BAUPLAN_API_KEY 2) ~/.bauplan/config.yml

  • profile – (optional) The Bauplan config profile name to use to determine api_key; mutually exclusive with api_key

  • namespace – (optional) The default namespace to use for queries and runs.

apply_import(plan: Dict, onto_branch: str, args: Dict | None = None, client_timeout: int | float | None = None) ApplyPlanState

Apply a Bauplan table import plan for a given branch and table.

An import is an operation to create a table in Bauplan from a file in the cloud. This is the equivalent of running the bauplan import apply command through the CLI.

import bauplan
client = bauplan.Client()

# get the object representing the table import plan
s3_path = 's3://path/to/my/files/*.parquet'
plan_state = client.plan_import(
    from_ref='main',
    table_name='newtablename',
    search_string=s3_path
)
if plan_state.error:
    plan_error_action(...)

# apply the table import plan to create/replace a table on this branch
apply_state = client.apply_import(
    plan=plan_state.plan,
    onto_branch='myname.mybranch',
)
if apply_state.error:
    apply_error_action(...)
Parameters:
  • plan – dict representation of an import plan, generated by client.plan_import

  • onto_branch – name of the branch on which to apply the plan

  • args – dict of arbitrary args to pass to the backend

  • client_timeout – seconds to timeout; this also cancels the remote job execution.

create_branch(branch_name: str, from_ref: str) APIBranch

Create a new branch at a given ref.

Upon failure, raises bauplan.exceptions.BauplanError

import bauplan
client = bauplan.Client()

assert client.create_branch(
    branch_name='myzone.newbranch',
    from_ref='main'
)
Parameters:
  • branch_name – The name of the new branch

  • from_ref – The name of the base branch; either a branch like “main” or a ref like “main@[sha]”

Returns:

the APIBranch object for the newly created branch

create_namespace(branch_name: str, namespace_name: str) Namespace

Create a new namespace at a given branch.

Upon failure, raises bauplan.exceptions.BauplanError

import bauplan
client = bauplan.Client()

assert client.create_namespace(
    branch_name='myzone.newbranch',
    namespace_name='main'
)
Parameters:
  • branch_name – The name of the branch to create the namespace on.

  • namespace_name – The name of the namespace to create.

Returns:

the newly created Namespace object

delete_branch(branch_name: str) bool

Delete a branch.

Upon failure, raises bauplan.exceptions.BauplanError

import bauplan
client = bauplan.Client()

assert client.delete_branch(branch_name='mybranch')
Parameters:

branch_name – The name of the branch to delete.

Returns:

A boolean indicating whether the branch was deleted

delete_namespace(branch_name: str, namespace_name: str) bool

Delete a namespace.

Upon failure, raises bauplan.exceptions.BauplanError

import bauplan
client = bauplan.Client()

assert client.delete_namespace(
    branch_name='mybranch',
    namespace_name='mynamespace',
)
Parameters:
  • branch_name – The name of the branch to delete the namespace from.

  • namespace_name – The name of the namespace to delete.

Returns:

A boolean indicating whether the namespace was deleted

drop_table(table_name: str, branch_name: str) bool

Drop a table.

Upon failure, raises bauplan.exceptions.BauplanError

import bauplan
client = bauplan.Client()

assert client.drop_table(table_name='mytable', branch_name='mybranch')
Parameters:
  • table_name – The name of the table to delete

  • branch_name – The name of the branch on which the table is stored

Returns:

A boolean indicating whether the table was deleted

get_branch(branch_name: str, limit: int | None = None, itersize: int | None = None) Generator[Table, None, None]

Get the tables and views in the target branch.

Upon failure, raises bauplan.exceptions.BauplanError

import bauplan
client = bauplan.Client()

# retrieve only the tables as tuples of (name, kind)
tables = [(b.name, b.kind) for b in client.get_branch('main')]
Parameters:

branch_name – The name of the branch to retrieve.

Returns:

A generator of Table objects, each having “name” and “kind” (e.g. TABLE)

get_branch_metadata(branch_name: str) Ref

Get the data and metadata for a branch.

import bauplan
client = bauplan.Client()

data = client.get_branch_metadata('main')
# print the number of total commits on the branch
print(data.num_total_commits)
Parameters:

branch_name – The name of the branch to retrieve.

Returns:

A Ref object containing the branch metadata

get_branches(itersize: int | None = None, limit: int | None = None, name: str | None = None, user: str | None = None) Generator[APIBranch, None, None]

Get the available data branches in the Bauplan catalog.

Upon failure, raises bauplan.exceptions.BauplanError

import bauplan
client = bauplan.Client()

for branch in client.get_branches():
    print(branch.name, branch.hash)
Parameters:
  • itersize – int 1-500

  • limit – int > 0

Returns:

a generator of APIBranch objects, each having attributes “name” and “hash”

get_namespaces(branch_name: str, itersize: int | None = None, limit: int | None = None, in_namespace: str | None = None) Generator[Namespace, None, None]

Get the available data namespaces in the Bauplan catalog branch.

Upon failure, raises bauplan.exceptions.BauplanError

import bauplan
client = bauplan.Client()

for namespace in client.get_namespaces('main'):
    print(namespace.name)
Parameters:
  • branch_name – The name of the branch to retrieve namespaces from.

  • itersize – int 1-500

  • limit – int > 0

  • in_namespace – The namespace to filter by.

Returns:

a generator of Namespace objects, each having the attribute “name”

get_table(branch_name: str, table_name: str) List[TableField]

Get the fields metadata for a table in the target branch.

Upon failure, raises bauplan.exceptions.BauplanError

import bauplan
client = bauplan.Client()

# get the fields and metadata for the taxi_zones table in the main branch
fields = client.get_table(branch_name='main', table_name='taxi_zones')

# loop through the fields and print their name, required, and type
for c in fields:
    print(c.name, c.required, c.type)
Parameters:
  • branch_name – The name of the branch to get the table from.

  • table_name – The name of the table to retrieve.

Returns:

a list of fields, each having “name”, “required”, “type”

get_table_with_metadata(branch_name: str, table_name: str, include_raw: bool = False) TableWithMetadata

Get the table data and metadata for a table in the target branch.

Upon failure, raises bauplan.exceptions.BauplanError

import bauplan
client = bauplan.Client()

# get the fields and metadata for the taxi_zones table in the main branch
table = client.get_table_with_metadata(branch_name='main', table_name='taxi_zones')

# loop through the fields and print their name, required, and type
for c in table.fields:
    print(c.name, c.required, c.type)

# show the number of records in the table
print(table.records)
Parameters:
  • branch_name – The name of the branch to get the table from.

  • table_name – The name of the table to retrieve.

  • include_raw – Whether or not to include the raw metadata.json object as a nested dict

Returns:

a TableWithMetadata object, optionally including the raw metadata.json object

get_tables(branch_name: str, limit: int | None = None, itersize: int | None = None) Generator[Table, None, None]

Get the tables and views in the target branch.

Upon failure, raises bauplan.exceptions.BauplanError

import bauplan
client = bauplan.Client()

# retrieve only the tables as tuples of (name, kind)
tables = client.get_tables('main')
for table in tables:
    print(table.name, table.kind)
Parameters:

branch_name – The name of the branch to retrieve.

Returns:

A generator of Table objects, each having “name” and “kind” (e.g. TABLE)

merge_branch(onto_branch: str, from_ref: str) bool

Merge one branch into another.

Upon failure, raises bauplan.exceptions.BauplanError

import bauplan
client = bauplan.Client()

assert client.merge_branch(
    onto_branch='myzone.somebranch',
    from_ref='myzone.oldbranch'
)
Parameters:
  • onto_branch – The name of the merge target

  • from_ref – The name of the merge source; either a branch like “main” or ref like “main@[sha]”

Returns:

a boolean indicating whether the merge succeeded

plan_import(table_name: str, search_string: str, from_ref: str = 'main', append: bool = False, replace: bool = False, args: Dict | None = None, client_timeout: int | float | None = None) PlanImportState

Create a table import plan from an S3 location.

An import is an operation to create a table in Bauplan from a file in the cloud. This is the equivalent of running the bauplan import plan command through the CLI.

import bauplan
client = bauplan.Client()

s3_path = 's3://path/to/my/files/*.parquet'
plan_state = client.plan_import(
    from_ref='main', # optional
    table_name='newtablename',
    search_string=s3_path,
)
if plan_state.error:
    plan_error_action(...)
success_action(plan_state.plan)

If you want to save the plan object output for record-keeping or future processing, you can use the plan object attribute to do something like:

plan_state = client.plan_import(...)

import yaml
plan_dict = plan_state.plan
yaml.safe_dump(plan_dict, open('path/to/file.yaml','w'))
Parameters:
  • search_string – The S3 search path of the files to import (e.g. a glob like 's3://bucket/*.parquet').

  • table_name – The name of the table to import into.

  • append – Append the data to an existing table. Mutually exclusive with replace.

  • replace – Replace the data in an existing table. Mutually exclusive with append.

  • from_ref – The name of the branch to import from.

  • args – dict of arbitrary args to pass to the backend

  • client_timeout – seconds to timeout; this also cancels the remote job execution.

query(query: str, branch_name: str = 'main', max_rows: int | None = None, no_cache: bool | None = False, connector: str | None = None, connector_config_key: str | None = None, connector_config_uri: str | None = None, namespace: str | None = None, args: Dict[str, Any] | None = None, client_timeout: int | float | None = None) Table

Execute a SQL query and return the results as a pyarrow.Table. Note that this function also uses Arrow internally, resulting in a fast data transfer.

If you prefer the results as a pandas DataFrame, use the to_pandas method of pyarrow.Table.

import bauplan

client = bauplan.Client()

# query the table and return result set as an arrow Table
my_table = client.query('SELECT c1 FROM my_table', branch_name='main')

# efficiently cast the table to a pandas DataFrame
df = my_table.to_pandas()
Parameters:
  • query – The Bauplan query to execute.

  • branch_name – The branch to read from and write to (default: your local active branch, else ‘main’).

  • max_rows – The maximum number of rows to return; default: None (no limit).

  • no_cache – Whether to disable caching for the query (default: False).

  • connector – The connector type for the model (defaults to Bauplan). Allowed values are ‘snowflake’ and ‘dremio’.

  • connector_config_key – The key name if the SSM key is custom with the pattern bauplan/connectors/<connector_type>/<key>.

  • connector_config_uri – Full SSM uri if completely custom path, e.g. ssm://us-west-2/123456789012/baubau/dremio.

  • namespace – The Namespace to run the query in. If not set, the query will be run in the default namespace for your account.

  • args – Additional arguments to pass to the query (default: None).

  • client_timeout – seconds to timeout; this also cancels the remote job execution.

Returns:

The query results as a pyarrow.Table.
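The connector_config_key pattern mentioned above can be spelled out explicitly. A minimal sketch, with a hypothetical connector type and key name (placeholders, not real credentials):

```python
# Sketch of the documented SSM key pattern:
#   bauplan/connectors/<connector_type>/<key>
connector_type = 'dremio'  # one of the allowed connector types
key = 'bauplan'            # hypothetical custom key name
ssm_key = f'bauplan/connectors/{connector_type}/{key}'
print(ssm_key)  # bauplan/connectors/dremio/bauplan
```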

query_to_file(filename: str, query: str, branch_name: str = 'main', max_rows: int | None = None, no_cache: bool | None = False, connector: str | None = None, connector_config_key: str | None = None, connector_config_uri: str | None = None, namespace: str | None = None, args: Dict[str, Any] | None = None, client_timeout: int | float | None = None) None

Execute a SQL query and write the results to a file.

import bauplan
client = bauplan.Client()

# query the table and write the results to a file
# (the filename here is illustrative)
client.query_to_file(
    filename='./my_results.json',
    query='SELECT c1 FROM my_table',
    branch_name='main',
)
Parameters:
  • filename – The name of the file to write the results to.

  • query – The Bauplan query to execute.

  • max_rows – The maximum number of rows to return; default: None (no limit).

  • no_cache – Whether to disable caching for the query (default: False).

  • branch_name – The branch to read from and write to (default: your local active branch, else ‘main’).

  • connector – The connector type for the model (defaults to Bauplan). Allowed values are ‘snowflake’ and ‘dremio’.

  • connector_config_key – The key name if the SSM key is custom with the pattern bauplan/connectors/<connector_type>/<key>.

  • connector_config_uri – Full SSM uri if completely custom path, e.g. ssm://us-west-2/123456789012/baubau/dremio.

  • namespace – The Namespace to run the query in. If not set, the query will be run in the default namespace for your account.

  • args – Additional arguments to pass to the query (default: None).

  • client_timeout – seconds to timeout; this also cancels the remote job execution.

query_to_generator(query: str, branch_name: str | None = None, max_rows: int | None = None, no_cache: bool | None = False, connector: str | None = None, connector_config_key: str | None = None, connector_config_uri: str | None = None, namespace: str | None = None, args: Dict[str, Any] | None = None, client_timeout: int | float | None = None) Generator[Dict[str, Any], None, None]

Execute a SQL query and return the results as a generator, where each row is a Python dictionary.

import bauplan
client = bauplan.Client()

# query the table and iterate through the results one row at a time
for row in client.query_to_generator('SELECT c1 FROM my_table', branch_name='main'):
    print(row)  # each row is a dict
Parameters:
  • query – The Bauplan query to execute.

  • branch_name – The branch to read from and write to (default: your local active branch, else ‘main’).

  • max_rows – The maximum number of rows to return; default: None (no limit).

  • no_cache – Whether to disable caching for the query (default: False).

  • connector – The connector type for the model (defaults to Bauplan). Allowed values are ‘snowflake’ and ‘dremio’.

  • connector_config_key – The key name if the SSM key is custom with the pattern bauplan/connectors/<connector_type>/<key>.

  • connector_config_uri – Full SSM uri if completely custom path, e.g. ssm://us-west-2/123456789012/baubau/dremio.

  • namespace – The Namespace to run the query in. If not set, the query will be run in the default namespace for your account.

  • args – Additional arguments to pass to the query (default: None).

  • client_timeout – seconds to timeout; this also cancels the remote job execution.

Yield:

A dictionary representing a row of query results.

run(project_dir: str = '.', branch_name: str | None = None, id: str | None = None, parameters: Dict[str, str | int | float | bool] | None = None, namespace: str | None = None, args: Dict[str, Any] | None = None, client_timeout: int | float | None = None) RunState

Run a Bauplan project and return the state of the run. This is the equivalent of running the bauplan run command through the CLI.

Parameters:
  • project_dir – The directory of the project (where the bauplan_project.yml file is located).

  • branch_name – The branch to read from and write to (default: your local active branch, else ‘main’).

  • id – The ID of the run (optional). This can be used to re-run a previous run, e.g., on a different branch.

  • parameters – Parameters for templating into SQL or Python models.

  • namespace – The Namespace to run the job in. If not set, the job will be run in the default namespace for the project.

  • args – Additional arguments (optional).

  • client_timeout – seconds to timeout; this also cancels the remote job execution.

Returns:

The state of the run.
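Per the signature above, parameters is a flat mapping of names to scalar values (str, int, float, or bool). A sketch with hypothetical parameter names (they must match parameters declared in your project's models):

```python
# Hypothetical run parameters for templating into SQL or Python models.
parameters = {
    'start_date': '2023-01-01',  # str
    'max_rows': 1000,            # int
    'dry_run': False,            # bool
}
# the run() signature restricts values to scalars
assert all(isinstance(v, (str, int, float, bool)) for v in parameters.values())
```

Such a dict would then be passed as client.run(project_dir='.', parameters=parameters).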

scan(table_name: str, branch_name: str | None = None, columns: list | None = None, filters: str | None = None, limit: int | None = None, connector: str | None = None, connector_config_key: str | None = None, connector_config_uri: str | None = None, namespace: str | None = None, args: Dict[str, str] | None = None, client_timeout: int | float | None = None, **kwargs: Any) Table

Execute a table scan (with optional filters) and return the results as an arrow Table.

Note that this function uses SQLGlot to compose a safe SQL query, then internally defers to the query_to_arrow function for the actual scan.

import bauplan
client = bauplan.Client()

# run a table scan over the data lake
# filters are passed as a string
my_table = client.scan(
    table_name='my_table',
    columns=['c1'],
    filters='c2 > 10',
    branch_name='main',
)
Parameters:
  • table_name – The table to scan.

  • branch_name – The branch to read from and write to (default: your local active branch, else ‘main’).

  • columns – The columns to return (default: None).

  • filters – The filters to apply (default: None).

  • namespace – The Namespace to run the scan in. If not set, the scan will be run in the default namespace for your account.

  • args – dict of arbitrary args to pass to the backend.

  • client_timeout – seconds to timeout; this also cancels the remote job execution.

Returns:

The scan results as a pyarrow.Table.

table_create_plan(table_name: str, search_uri: str, branch: str, namespace: str | None = None, replace: bool = False, args: Dict | None = None, client_timeout: int | float | None = None) TableCreatePlanState

Create a table import plan from an S3 location.

This operation attempts to create a table based on the schemas of the N parquet files found at a given search URI. A YAML file containing the schema and plan is returned and, if there are no conflicts, it is automatically applied.

import bauplan
client = bauplan.Client()

s3_path = 's3://path/to/my/files/*.parquet'
plan_state = client.table_create_plan(
    branch='main',
    table_name='newtablename',
    search_uri=s3_path,
)
if plan_state.error:
    plan_error_action(...)
success_action(plan_state.plan)
Parameters:
  • search_uri – The URI (e.g. an S3 path with a wildcard) to scan for files to import.

  • table_name – The name of the table which will be created

  • branch – The branch in which to create the table

  • namespace – Optional argument specifying the namespace. If not specified, it will be inferred based on table location or the default namespace

table_create_plan_apply(plan: Dict, args: Dict | None = None, client_timeout: int | float | None = None) TableCreatePlanState

Apply a plan for creating a table. This is done automatically during table plan creation if no schema conflicts exist. If conflicts do exist, resolve them first and then use this function to apply the plan. The most common schema conflict is two parquet files with the same column name but different data types.

Parameters:
  • plan – dict representation of the plan, generated by client.table_create_plan

  • args – dict of arbitrary args to pass to the backend

  • client_timeout – seconds to timeout; this also cancels the remote job execution.

table_data_import(table_name: str, branch: str, search_uri: str, namespace: str | None = None, continue_on_error: bool = False, import_duplicate_files: bool = False, best_effort: bool = False, transformation_query: str | None = None, args: Dict | None = None, client_timeout: int | float | None = None) TableDataImportState

Import data into an existing table.

import bauplan
client = bauplan.Client()

s3_path = 's3://path/to/my/files/*.parquet'
plan_state = client.table_data_import(
    table_name='newtablename',
    search_uri=s3_path,
    branch='main',
)
if plan_state.error:
    plan_error_action(...)
success_action(plan_state.plan)
Parameters:
  • table_name – The previously created table into which data will be imported

  • branch – The branch in which to import the data

  • search_uri – The URI to scan for files to import

  • namespace – The namespace of the table. If not specified, it will be inferred from the table name or default settings

  • continue_on_error – Do not fail the import even if a single file fails to import

  • import_duplicate_files – Ignore the prevention of importing S3 files that were already imported

  • best_effort – Don’t fail if the schema of the table does not match.

  • transformation_query – Optional DuckDB-compliant query applied to each parquet file. Use original_table as the table name in the query

  • args – dict of arbitrary args to pass to the backend

  • client_timeout – seconds to timeout; this also cancels the remote job execution.
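A transformation_query is plain DuckDB SQL evaluated against each parquet file before the data lands in the table, with original_table as the required placeholder table name. A sketch with hypothetical column names:

```python
# Hypothetical DuckDB transformation applied to each parquet file before import;
# 'original_table' is the placeholder table name expected by table_data_import.
transformation_query = """
SELECT
    CAST(pickup_ts AS TIMESTAMP) AS pickup_at,
    trips
FROM original_table
"""
```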

class bauplan.JobStatus(canceled: 'str' = 'CANCELLED', cancelled: 'str' = 'CANCELLED', failed: 'str' = 'FAILED', rejected: 'str' = 'REJECTED', success: 'str' = 'SUCCESS', timeout: 'str' = 'TIMEOUT', unknown: 'str' = 'UNKNOWN')

Bases: object

canceled: str = 'CANCELLED'
cancelled: str = 'CANCELLED'
failed: str = 'FAILED'
rejected: str = 'REJECTED'
success: str = 'SUCCESS'
timeout: str = 'TIMEOUT'
unknown: str = 'UNKNOWN'
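Because these statuses are plain strings, a stand-in mapping of the defaults above can be handy for checks without instantiating the class (the values mirror the listing above):

```python
# Mirrors the documented bauplan.JobStatus defaults; note that both spellings
# of canceled/cancelled map to the same value.
JOB_STATUSES = {
    'canceled': 'CANCELLED',
    'cancelled': 'CANCELLED',
    'failed': 'FAILED',
    'rejected': 'REJECTED',
    'success': 'SUCCESS',
    'timeout': 'TIMEOUT',
    'unknown': 'UNKNOWN',
}
print(JOB_STATUSES['success'])  # SUCCESS
```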
class bauplan.Model(name: str, columns: List[str] | None = None, filter: str | None = None, ref: str | None = None, connector: str | None = None, connector_config_key: str | None = None, connector_config_uri: str | None = None, **kwargs: Any)

Bases: object

Represents a model (dataframe/table representing a DAG step) as an input to a function.

e.g.

@bauplan.model()
def some_parent_model():
    return pyarrow.Table.from_pydict({'bar': [1, 2, 3]})

@bauplan.model()
def your_cool_model(
    # parent models are passed as inputs, using bauplan.Model
    # class
    parent_0=bauplan.Model(
        'some_parent_model',
        columns=['bar'],
        filter='bar > 1',
    )
):
    # Can return a pandas dataframe or a pyarrow table
    return pyarrow.Table.from_pandas(
        pd.DataFrame({
            'foo': parent_0['bar'] * 2,
        })
    )

Bauplan can wrap other engines for the processing of some models, exposing a common interface and unified API for the user while dispatching the relevant operations to the underlying engine.

The authentication and authorization happen securely and transparently through SSM; the user specifies a connector type and the credentials through the relevant keywords:

@bauplan.model()
def your_cool_model(
    parent_0=bauplan.Model(
        'some_parent_model',
        columns=['bar'],
        filter='bar > 1',
        connector='dremio',
        connector_config_key='bauplan',
    )
):
    # parent_0 inside the function body
    # will still be an Arrow table: the user code
    # should still be the same as the data is moved
    # transparently by Bauplan from an engine to the function.
    return pyarrow.Table.from_pandas(
        pd.DataFrame({
            'foo': parent_0['bar'] * 2,
        })
    )
Parameters:
  • name – The name of the model.

  • columns – The list of columns in the model. If the arg is not provided, the model will load all columns.

  • filter – The optional filter for the model. Defaults to None.

  • ref – The optional reference to the model. Defaults to None.

  • connector – The connector type for the model (defaults to Bauplan SQL). Allowed values are ‘snowflake’ and ‘dremio’.

  • connector_config_key – The key name if the SSM key is custom with the pattern bauplan/connectors/<connector_type>/<key>.

  • connector_config_uri – Full SSM uri if completely custom path, e.g. ssm://us-west-2/123456789012/baubau/dremio.

bauplan.expectation(**kwargs: Any) Callable

Decorator that defines a Bauplan expectation.

An expectation is a function from one (or more) dataframe-like object(s) to a boolean: it is commonly used to perform data validation and data quality checks when running a pipeline. An expectation takes as input the table(s) it is validating and returns a boolean indicating whether the expectation is met. A Python expectation needs a Python environment to run, which is defined using the python decorator, e.g.:

@bauplan.expectation()
@bauplan.python('3.10')
def test_joined_dataset(
    data=bauplan.Model(
        'join_dataset',
        columns=['anomaly']
    )
):
    # your data validation code here
    return expect_column_no_nulls(data, 'anomaly')
Parameters:

f – The function to decorate.

bauplan.model(name: str | None = None, columns: List[str] | None = None, materialize: bool | None = None, internet_access: bool | None = None, partitioned_by: str | List[str] | Tuple[str, ...] | None = None, materialization_strategy: Literal['NONE', 'REPLACE', 'APPEND'] | None = None, cache_strategy: Literal['NONE', 'DEFAULT'] | None = None, **kwargs: Any) Callable

Decorator that specifies a Bauplan model.

A model is a function from one (or more) dataframe-like object(s) to another dataframe-like object: it is used to define a transformation in a pipeline. Models are chained together implicitly by using them as inputs to their children. A Python model needs a Python environment to run, which is defined using the python decorator, e.g.:

@bauplan.model(
    columns=['*'],
    materialize=False
)
@bauplan.python('3.11')
def source_scan(
    data=bauplan.Model(
        'iot_kaggle',
        columns=['*'],
        filter="motion='false'"
    )
):
    # your code here
    return data
Parameters:
  • name – the name of the model (e.g. ‘users’); if missing the function name is used.

  • columns – the columns of the output dataframe after the model runs (e.g. [‘id’, ‘name’, ‘email’]). Use [‘*’] as a wildcard.

  • materialize – whether the model should be materialized.

  • internet_access – whether the model requires internet access.

  • partitioned_by – the columns to partition the data by.

  • materialization_strategy – the materialization strategy to use.

  • cache_strategy – the cache strategy to use.

bauplan.pyspark(version: str | None = None, conf: Dict[str, str] | None = None, **kwargs: Any) Callable

Decorator that makes a PySpark session available to a Bauplan function (a model or an expectation). Add a spark=None parameter to the function signature to receive the session.

Parameters:
  • version – the version string of pyspark

  • conf – A dict containing the pyspark config

bauplan.python(version: str | None = None, pip: Dict[str, str] | None = None, **kwargs: Any) Callable

Decorator that defines a Python environment for a Bauplan function (e.g. a model or expectation). It is used to specify directly in code the configuration of the Python environment required to run the function, i.e. the Python version and the Python packages required.

Parameters:
  • version – The python version for the interpreter (e.g. '3.11').

  • pip – A dictionary of dependencies (and versions) required by the function (e.g. {'requests': '2.26.0'}).
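For instance, the decorator arguments can be prepared as plain Python values first; the packages pinned below are hypothetical:

```python
# Hypothetical environment spec mirroring the @bauplan.python decorator arguments.
python_version = '3.11'    # interpreter version string
pip = {
    'requests': '2.26.0',  # package name -> pinned version
    'pandas': '2.1.0',
}
```

which would then be used as @bauplan.python(python_version, pip=pip).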

bauplan.resources(cpus: int | float | None = None, memory: int | str | None = None, memory_swap: int | str | None = None, timeout: int | None = None, **kwargs: Any) Callable

Decorator that defines the resources required by a Bauplan function (e.g. a model or expectation). It is used to specify directly in code the configuration of the resources required to run the function.

Parameters:
  • cpus – The number of CPUs required by the function (e.g. `0.5`)

  • memory – The amount of memory required by the function (e.g. `1G`, `1000`)

  • memory_swap – The amount of swap memory required by the function (e.g. `1G`, `1000`)

  • timeout – The maximum time the function is allowed to run (e.g. `60`)