bauplan package¶
Submodules¶
- bauplan.exceptions module
- bauplan.exceptions.AccessDeniedError
- bauplan.exceptions.ApiMethodError
- bauplan.exceptions.ApiRouteError
- bauplan.exceptions.BauplanError
- bauplan.exceptions.BauplanInternalError
- bauplan.exceptions.InvalidDataError
- bauplan.exceptions.InvalidPlanError
- bauplan.exceptions.JobError
- bauplan.exceptions.MismatchedPythonVersionsError
- bauplan.exceptions.MissingMagicCellError
- bauplan.exceptions.MissingPandasError
- bauplan.exceptions.NoResultsFoundError
- bauplan.exceptions.ObjectCannotBeSerializedError
- bauplan.exceptions.ObjectTooBigError
- bauplan.exceptions.ResourceNotFoundError
- bauplan.exceptions.TableCreatePlanApplyStatusError
- bauplan.exceptions.TableCreatePlanError
- bauplan.exceptions.TableCreatePlanStatusError
- bauplan.exceptions.TooManyRequestsError
- bauplan.exceptions.UnauthorizedError
- bauplan.exceptions.UnhandledRuntimeError
- bauplan.exceptions.UpdateConflictError
- bauplan.exceptions.UserObjectKeyNotExistsError
- bauplan.exceptions.UserObjectWithKeyExistsError
- bauplan.helpers module
- bauplan.schema module
- bauplan.standard_expectations module
- bauplan.standard_expectations.expect_column_accepted_values
- bauplan.standard_expectations.expect_column_all_null
- bauplan.standard_expectations.expect_column_all_unique
- bauplan.standard_expectations.expect_column_equal_concatenation
- bauplan.standard_expectations.expect_column_mean_greater_or_equal_than
- bauplan.standard_expectations.expect_column_mean_greater_than
- bauplan.standard_expectations.expect_column_mean_smaller_or_equal_than
- bauplan.standard_expectations.expect_column_mean_smaller_than
- bauplan.standard_expectations.expect_column_no_nulls
- bauplan.standard_expectations.expect_column_not_unique
- bauplan.standard_expectations.expect_column_some_null
- bauplan.state module
- bauplan.state.ApplyPlanState
- bauplan.state.CommonRunState
- bauplan.state.PlanImportState
- bauplan.state.ReRunExecutionContext
- bauplan.state.ReRunState
- bauplan.state.RunExecutionContext
- bauplan.state.RunState
- bauplan.state.TableCreatePlanApplyContext
- bauplan.state.TableCreatePlanApplyState
- bauplan.state.TableCreatePlanContext
- bauplan.state.TableCreatePlanState
- bauplan.state.TableDataImportContext
- bauplan.state.TableDataImportState
- bauplan.store module
Module contents¶
-
class bauplan.Client(profile: str | None =
None
, api_key: str | None =None
, branch: str | None =None
, namespace: str | None =None
, cache: 'on' | 'off' | None =None
, debug: bool | None =None
, verbose: bool | None =None
, args: dict[str, str] | None =None
, api_endpoint: str | None =None
, catalog_endpoint: str | None =None
, client_timeout: int | None =None
, env: str | None =None
, config_file_path: str | Path | None =None
, user_session_token: str | None =None
, feature_flags: dict[str, Any] | None =None
)¶ Bases:
_OperationContainer
A consistent interface to access Bauplan operations.
Using the client
    import bauplan

    client = bauplan.Client()

    # query the table and return result set as an arrow Table
    my_table = client.query('SELECT sum(trips) trips FROM travel_table', ref='main')

    # efficiently cast the table to a pandas DataFrame
    df = my_table.to_pandas()
Notes on authentication
    import os
    import bauplan

    # by default, authenticate from BAUPLAN_API_KEY >> BAUPLAN_PROFILE >> ~/.bauplan/config.yml
    client = bauplan.Client()
    # >> client uses the 'default' profile from ~/.bauplan/config.yml

    os.environ['BAUPLAN_PROFILE'] = "someprofile"
    client = bauplan.Client()
    # >> client now uses profile 'someprofile'

    os.environ['BAUPLAN_API_KEY'] = "mykey"
    client = bauplan.Client()
    # >> client now authenticates with api_key value "mykey", because api key > profile

    # specify authentication directly - this supersedes BAUPLAN_API_KEY in the environment
    client = bauplan.Client(api_key='MY_KEY')

    # specify a profile from ~/.bauplan/config.yml - this supersedes BAUPLAN_PROFILE in the environment
    client = bauplan.Client(profile='default')
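The precedence described in the comments above can be written out as a small resolver. This is only an illustrative sketch, not Bauplan's implementation: `resolve_auth_source` is a hypothetical helper, and ranking an explicit `profile` argument above `BAUPLAN_API_KEY` in the environment is an assumption that the text does not confirm.

```python
import os


def resolve_auth_source(api_key=None, profile=None, environ=None):
    """Return a (kind, value) pair naming where credentials would come from.

    Hypothetical helper that mirrors the documented precedence; it does not
    read any config file and does not call Bauplan.
    """
    env = os.environ if environ is None else environ
    if api_key is not None and profile is not None:
        # The parameter docs describe api_key and profile as mutually exclusive.
        raise ValueError("pass either api_key or profile, not both")
    if api_key is not None:
        return ("api_key", api_key)  # explicit key supersedes BAUPLAN_API_KEY
    if profile is not None:
        # Assumption: an explicit profile also wins over BAUPLAN_API_KEY.
        return ("profile", profile)
    if env.get("BAUPLAN_API_KEY"):
        return ("api_key", env["BAUPLAN_API_KEY"])  # api key > profile
    if env.get("BAUPLAN_PROFILE"):
        return ("profile", env["BAUPLAN_PROFILE"])
    return ("profile", "default")  # 'default' profile in ~/.bauplan/config.yml
```

Passing `environ` explicitly keeps the function easy to exercise without touching the real process environment.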
Handling Exceptions
Catalog operations (branch/table methods) raise a subclass of bauplan.exceptions.BauplanError that mirrors HTTP status codes.
- 400: InvalidDataError
- 401: UnauthorizedError
- 403: AccessDeniedError
- 404: ResourceNotFoundError, e.g. the ID doesn’t match any records
- 404: ApiRouteError, e.g. the given route doesn’t exist
- 405: ApiMethodError, e.g. POST on a route with only GET defined
- 409: UpdateConflictError, e.g. creating a record with a name that already exists
- 429: TooManyRequestsError
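For quick reference when reading logs, the status-code list above can be transcribed into a lookup table. This is a sketch for illustration only: `STATUS_TO_ERRORS` and both helper functions are hypothetical names, and the table holds class names as strings rather than the real exception classes.

```python
# HTTP status code -> documented exception class names (as plain strings).
STATUS_TO_ERRORS = {
    400: ("InvalidDataError",),
    401: ("UnauthorizedError",),
    403: ("AccessDeniedError",),
    404: ("ResourceNotFoundError", "ApiRouteError"),
    405: ("ApiMethodError",),
    409: ("UpdateConflictError",),
    429: ("TooManyRequestsError",),
}


def errors_for_status(status_code):
    """Return the documented exception names for a status code (empty if none)."""
    return STATUS_TO_ERRORS.get(status_code, ())


def status_for_error(error_name):
    """Return the status code an exception name is listed under, or None."""
    for code, names in STATUS_TO_ERRORS.items():
        if error_name in names:
            return code
    return None
```

Note that 404 maps to two classes, so code that needs to tell a missing record from a missing route has to catch the specific subclass.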
Run/Query/Scan/Import operations raise a subclass of bauplan.exceptions.BauplanError that represents the failure, and also return a RunState object containing details and logs:
- JobError, e.g. something went wrong in a run/query/import/scan; includes error details
Run/import operations also return a state object that includes a job_status and other details. There are two ways to check status for run/import operations:
- try/except the JobError exception
- check the state.job_status attribute
Examples:
    try:
        state = client.run(...)
        state = client.query(...)
        state = client.scan(...)
        state = client.plan_table_creation(...)
    except bauplan.exceptions.JobError as e:
        ...

    state = client.run(...)
    if state.job_status != "success":
        ...
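The two status checks shown above can be combined in one wrapper. The following is a minimal sketch under stated assumptions: `run_checked` is a hypothetical helper, the exception class is passed in by the caller (for example `bauplan.exceptions.JobError`), and the only status value taken from the text is the string `"success"`.

```python
def run_checked(operation, job_error_cls):
    """Call operation() and return (state, error_message).

    error_message is None only when no job error was raised and the returned
    state reports job_status == "success".
    """
    try:
        state = operation()
    except job_error_cls as exc:
        # Way 1: the operation raised the job error.
        return None, f"job raised: {exc}"
    # Way 2: the operation returned, so inspect the state object.
    status = getattr(state, "job_status", None)
    if status != "success":
        return state, f"job finished with status {status!r}"
    return state, None
```

A call might look like `state, err = run_checked(lambda: client.run(project_dir='.'), bauplan.exceptions.JobError)`, after which the caller only has to test `err`.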
- Parameters:¶
- profile: str | None =
None
¶ (optional) The Bauplan config profile name to use to determine api_key.
- api_key: str | None =
None
¶ (optional) Your unique Bauplan API key; mutually exclusive with
profile
. If not provided, fetch precedence is 1) environment BAUPLAN_API_KEY 2) .bauplan/config.yml
- branch: str | None =
None
¶ (optional) The default branch to use for queries and runs. If not provided active_branch from the profile is used.
- namespace: str | None =
None
¶ (optional) The default namespace to use for queries and runs.
- cache: 'on' | 'off' | None =
None
¶ (optional) Whether to enable or disable caching for all the requests.
- debug: bool | None =
None
¶ (optional) Whether to enable or disable debug mode for all the requests.
- verbose: bool | None =
None
¶ (optional) Whether to enable or disable verbose mode for all the requests.
- args: dict[str, str] | None =
None
¶ (optional) Additional arguments to pass to all the requests.
- api_endpoint: str | None =
None
¶ (optional) The Bauplan API endpoint to use. If not provided, fetch precedence is 1) environment BAUPLAN_API_ENDPOINT 2) .bauplan/config.yml
- catalog_endpoint: str | None =
None
¶ (optional) The Bauplan catalog endpoint to use. If not provided, fetch precedence is 1) environment BAUPLAN_CATALOG_ENDPOINT 2) .bauplan/config.yml
- client_timeout: int | None =
None
¶ (optional) The client timeout in seconds for all the requests.
- env: str | None =
None
¶ (optional) The environment to use for all the requests. Default: ‘prod’.
- config_file_path: str | Path | None =
None
¶ (optional) The path to the Bauplan config file to use. If not provided, fetch precedence is 1) environment BAUPLAN_CONFIG_PATH 2) ~/.bauplan/config.yml
- user_session_token: str | None =
None
¶ (optional) Your unique Bauplan user session token.
-
apply_table_creation_plan(plan: dict | TableCreatePlanState, debug: bool | None =
None
, args: dict[str, str] | None =None
, verbose: bool | None =None
, client_timeout: int | float | None =None
) TableCreatePlanApplyState ¶ Apply a plan for creating a table. This is done automatically during table plan creation if no schema conflicts exist. Otherwise, this function applies the plan after the schema conflicts are resolved. The most common schema conflict is two parquet files with the same column name but different data types.
- Parameters:¶
- plan: dict | TableCreatePlanState¶
The plan to apply.
- debug: bool | None =
None
¶ Whether to enable or disable debug mode for the query.
- args: dict[str, str] | None =
None
¶ dict of arbitrary args to pass to the backend.
- verbose: bool | None =
None
¶ Whether to enable or disable verbose mode.
- client_timeout: int | float | None =
None
¶ seconds to timeout; this also cancels the remote job execution.
- Raises:¶
TableCreatePlanApplyStatusError – if the table creation plan apply fails.
- Returns:¶
The plan state.
- create_branch(branch: str | Branch, from_ref: str | Branch | Ref) Branch ¶
Create a new branch at a given ref.
Upon failure, raises
bauplan.exceptions.BauplanError
    import bauplan

    client = bauplan.Client()
    assert client.create_branch(
        branch='my_branch_name',
        from_ref='my_ref_or_branch_name',
    )
- create_namespace(namespace: str | Namespace, branch: str | Branch) Namespace ¶
Create a new namespace at a given branch.
Upon failure, raises
bauplan.exceptions.BauplanError
    import bauplan

    client = bauplan.Client()
    assert client.create_namespace(
        namespace='my_namespace_name',
        branch='my_branch_name',
    )
-
create_table(table: str | Table, search_uri: str, branch: str | Branch | None =
None
, namespace: str | Namespace | None =None
, partitioned_by: str | None =None
, replace: bool | None =None
, debug: bool | None =None
, args: dict[str, str] | None =None
, verbose: bool | None =None
, client_timeout: int | float | None =None
) Table ¶ Create a table from an S3 location.
This operation will attempt to create a table based on the schemas of N parquet files found by a given search URI.
    import bauplan

    client = bauplan.Client()
    plan_state = client.create_table(
        table='my_table_name',
        search_uri='s3://path/to/my/files/*.parquet',
        branch='my_branch_name',
    )
    if plan_state.error:
        plan_error_action(...)
    success_action(plan_state.plan)
- Parameters:¶
- table: str | Table¶
The table which will be created.
- search_uri: str¶
The location of the files to scan for schema.
- branch: str | Branch | None =
None
¶ The name of the branch in which to create the table.
- namespace: str | Namespace | None =
None
¶ Optional argument specifying the namespace. If not specified, it will be inferred based on table location or the default namespace.
- partitioned_by: str | None =
None
¶ Optional argument specifying the table partitioning.
- replace: bool | None =
None
¶ Replace the table if it already exists.
- debug: bool | None =
None
¶ Whether to enable or disable debug mode for the query.
- args: dict[str, str] | None =
None
¶ dict of arbitrary args to pass to the backend.
- verbose: bool | None =
None
¶ Whether to enable or disable verbose mode.
- client_timeout: int | float | None =
None
¶ seconds to timeout; this also cancels the remote job execution.
- Raises:¶
TableCreatePlanStatusError – if the table creation plan fails.
TableCreatePlanApplyStatusError – if the table creation plan apply fails.
- Returns:¶
The plan state.
- delete_branch(branch: str | Branch) bool ¶
Delete a branch.
Upon failure, raises
bauplan.exceptions.BauplanError
    import bauplan

    client = bauplan.Client()
    assert client.delete_branch('my_branch_name')
- delete_namespace(namespace: str | Namespace, branch: str | Branch) bool ¶
Delete a namespace.
Upon failure, raises
bauplan.exceptions.BauplanError
    import bauplan

    client = bauplan.Client()
    assert client.delete_namespace(
        namespace='my_namespace_name',
        branch='my_branch_name',
    )
- delete_table(table: str | Table, branch: str | Branch) bool ¶
Drop a table.
Upon failure, raises
bauplan.exceptions.BauplanError
    import bauplan

    client = bauplan.Client()
    assert client.delete_table(
        table='my_table_name',
        branch='my_branch_name',
    )
- get_branch(branch: str | Branch) Branch ¶
Get the branch.
Upon failure, raises
bauplan.exceptions.BauplanError
    import bauplan

    client = bauplan.Client()

    # retrieve the branch and print its hash
    branch = client.get_branch('my_branch_name')
    print(branch.hash)
-
get_branches(name: str | None =
None
, user: str | None =None
, limit: int | None =None
, itersize: int | None =None
) Generator[Branch, None, None] ¶ Get the available data branches in the Bauplan catalog.
Upon failure, raises
bauplan.exceptions.BauplanError
    import bauplan

    client = bauplan.Client()
    for branch in client.get_branches():
        print(branch.name, branch.hash)
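Because get_branches returns a generator, a caller can stop early without materializing every branch. The sketch below shows one way to do that with itertools.islice; `first_branch_names` is a hypothetical helper, and it deliberately caps results on the client side because the exact semantics of the `limit` and `itersize` parameters are not described here.

```python
from itertools import islice


def first_branch_names(client, n):
    """Return the names of at most n branches.

    `client` is any object exposing get_branches() as documented above;
    islice stops pulling from the generator once n items have been read.
    """
    return [branch.name for branch in islice(client.get_branches(), n)]
```

The same pattern applies to the other generator-returning methods, such as get_namespaces and get_tables.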
- get_namespace(namespace: str | Namespace, ref: str | Branch | Ref) Namespace ¶
Get a namespace.
Upon failure, raises
bauplan.exceptions.BauplanError
    import bauplan

    client = bauplan.Client()
    namespace = client.get_namespace(
        namespace='my_namespace_name',
        ref='my_ref_or_branch_name',
    )
-
get_namespaces(ref: str | Branch | Ref, filter_by_name: str | None =
None
, itersize: int | None =None
, limit: int | None =None
) Generator[Namespace, None, None] ¶ Get the available data namespaces in the Bauplan catalog branch.
Upon failure, raises
bauplan.exceptions.BauplanError
    import bauplan

    client = bauplan.Client()
    for namespace in client.get_namespaces('my_ref_or_branch_name'):
        print(namespace.name)
-
get_table(table: str | Table, ref: str | Ref | Branch, include_raw: bool =
False
) TableWithMetadata ¶ Get the table data and metadata for a table in the target branch.
Upon failure, raises
bauplan.exceptions.BauplanError
    import bauplan

    client = bauplan.Client()

    # get the fields and metadata for a table
    table = client.get_table(
        table='my_table_name',
        ref='my_ref_or_branch_name',
    )

    # loop through the fields and print their name, required, and type
    for c in table.fields:
        print(c.name, c.required, c.type)

    # show the number of records in the table
    print(table.records)
-
get_tables(ref: str | Branch | Ref, namespace: str | Namespace | None =
None
, limit: int | None =None
, itersize: int | None =None
) Generator[Table, None, None] ¶ Get the tables and views in the target branch.
Upon failure, raises
bauplan.exceptions.BauplanError
    import bauplan

    client = bauplan.Client()

    # list the tables and views in the ref and print their name and kind
    for table in client.get_tables('my_ref_or_branch_name'):
        print(table.name, table.kind)
- has_branch(branch: str | Branch) bool ¶
Check if a branch exists.
Upon failure, raises
bauplan.exceptions.BauplanError
    import bauplan

    client = bauplan.Client()
    assert client.has_branch('my_branch_name')
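has_branch and create_branch can be combined into a "create if missing" step. This is a minimal sketch, not part of the Bauplan API: `ensure_branch` is a hypothetical helper, and defaulting `from_ref` to `'main'` is an assumption borrowed from the examples in this page.

```python
def ensure_branch(client, branch, from_ref="main"):
    """Create `branch` from `from_ref` unless it already exists.

    Returns True if the branch was created and False if it was already there.
    `client` is any object exposing has_branch and create_branch as documented.
    """
    if client.has_branch(branch):
        return False
    client.create_branch(branch=branch, from_ref=from_ref)
    return True
```

The check and the creation are two separate calls, so another client could create the same branch in between; in that case create_branch would be expected to raise a BauplanError subclass, which the caller should be prepared to handle.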
- has_namespace(namespace: str | Namespace, ref: str | Branch | Ref) bool ¶
Check if a namespace exists.
Upon failure, raises
bauplan.exceptions.BauplanError
    import bauplan

    client = bauplan.Client()
    assert client.has_namespace(
        namespace='my_namespace_name',
        ref='my_ref_or_branch_name',
    )
- has_table(table: str | Table, ref: str | Ref | Branch) bool ¶
Check if a table exists.
Upon failure, raises
bauplan.exceptions.BauplanError
    import bauplan

    client = bauplan.Client()
    assert client.has_table(
        table='my_table_name',
        ref='my_ref_or_branch_name',
    )
-
import_data(table: str | Table, search_uri: str, branch: str | Branch | None =
None
, namespace: str | Namespace | None =None
, continue_on_error: bool =False
, import_duplicate_files: bool =False
, best_effort: bool =False
, preview: 'on' | 'off' | 'head' | 'tail' | str | None =None
, debug: bool | None =None
, args: dict[str, str] | None =None
, verbose: bool | None =None
, client_timeout: int | float | None =None
) TableDataImportState ¶ Imports data into an already existing table.
    import bauplan

    client = bauplan.Client()
    plan_state = client.import_data(
        table='my_table_name',
        search_uri='s3://path/to/my/files/*.parquet',
        branch='my_branch_name',
    )
    if plan_state.error:
        plan_error_action(...)
    success_action(plan_state.plan)
- Parameters:¶
- table: str | Table¶
The previously created table into which data will be imported.
- search_uri: str¶
The URI to scan for files to import.
- branch: str | Branch | None =
None
¶ Branch in which to import the table.
- namespace: str | Namespace | None =
None
¶ Namespace of the table. If not specified, the namespace will be inferred from the table name or default settings.
- continue_on_error: bool =
False
¶ Do not fail the import even if 1 data import fails.
- import_duplicate_files: bool =
False
¶ Import S3 files even if they were already imported (disables the duplicate-file check).
- best_effort: bool =
False
¶ Don’t fail if schema of table does not match.
- transformation_query
Optional duckdb compliant query applied on each parquet file. Use original_table as the table in the query.
- preview: 'on' | 'off' | 'head' | 'tail' | str | None =
None
¶ Whether to enable or disable preview mode for the import.
- debug: bool | None =
None
¶ Whether to enable or disable debug mode for the import.
- args: dict[str, str] | None =
None
¶ dict of arbitrary args to pass to the backend.
- verbose: bool | None =
None
¶ Whether to enable or disable verbose mode.
- client_timeout: int | float | None =
None
¶ seconds to timeout; this also cancels the remote job execution.
- Returns:¶
The plan state.
- merge_branch(source_ref: str | Branch | Ref, into_branch: str | Branch) bool ¶
Merge one branch into another.
Upon failure, raises
bauplan.exceptions.BauplanError
    import bauplan

    client = bauplan.Client()
    assert client.merge_branch(
        source_ref='my_ref_or_branch_name',
        into_branch='main',
    )
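The branch methods can be combined into a "work on a temporary branch, then merge" flow. The following is only a sketch of that pattern under stated assumptions: `run_on_temp_branch` and the `work` callback are hypothetical, the defaults of `'main'` are borrowed from the examples, and deleting the temporary branch after a failure is a design choice of this sketch rather than documented behavior.

```python
def run_on_temp_branch(client, temp_branch, work, from_ref="main", into_branch="main"):
    """Create temp_branch, run work(temp_branch), merge on success, always delete.

    `client` is any object exposing create_branch, merge_branch and
    delete_branch as documented. If work() raises, nothing is merged and the
    exception propagates after the temporary branch is deleted.
    """
    client.create_branch(branch=temp_branch, from_ref=from_ref)
    try:
        result = work(temp_branch)
        client.merge_branch(source_ref=temp_branch, into_branch=into_branch)
        return result
    finally:
        client.delete_branch(temp_branch)
```

A `work` callback would typically call something like `client.run(project_dir='.', ref=branch)` and raise if the returned state is not successful, so that a failed run never reaches the merge.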
-
plan_table_creation(table: str | Table, search_uri: str, branch: str | Branch | None =
None
, namespace: str | Namespace | None =None
, partitioned_by: str | None =None
, replace: bool | None =None
, debug: bool | None =None
, args: dict[str, str] | None =None
, verbose: bool | None =None
, client_timeout: int | float | None =None
) TableCreatePlanState ¶ Create a table import plan from an S3 location.
This operation will attempt to create a table based on the schemas of N parquet files found by a given search URI. A YAML file containing the schema and plan is returned and, if there are no conflicts, it is automatically applied.
    import bauplan

    client = bauplan.Client()
    plan_state = client.plan_table_creation(
        table='my_table_name',
        search_uri='s3://path/to/my/files/*.parquet',
        branch='my_branch_name',
    )
    if plan_state.error:
        plan_error_action(...)
    success_action(plan_state.plan)
- Parameters:¶
- table: str | Table¶
The table which will be created.
- search_uri: str¶
The location of the files to scan for schema.
- branch: str | Branch | None =
None
¶ The name of the branch in which to create the table.
- namespace: str | Namespace | None =
None
¶ Optional argument specifying the namespace. If not specified, it will be inferred based on table location or the default namespace.
- partitioned_by: str | None =
None
¶ Optional argument specifying the table partitioning.
- replace: bool | None =
None
¶ Replace the table if it already exists.
- debug: bool | None =
None
¶ Whether to enable or disable debug mode.
- args: dict[str, str] | None =
None
¶ dict of arbitrary args to pass to the backend.
- verbose: bool | None =
None
¶ Whether to enable or disable verbose mode.
- client_timeout: int | float | None =
None
¶ seconds to timeout; this also cancels the remote job execution.
- Raises:¶
TableCreatePlanStatusError – if the table creation plan fails.
- Returns:¶
The plan state.
-
query(query: str, ref: str | Branch | Ref | None =
None
, max_rows: int | None =None
, cache: 'on' | 'off' | None =None
, connector: str | None =None
, connector_config_key: str | None =None
, connector_config_uri: str | None =None
, namespace: str | Namespace | None =None
, debug: bool | None =None
, args: dict[str, str] | None =None
, verbose: bool | None =None
, client_timeout: int | float | None =None
) Table ¶ Execute a SQL query and return the results as a pyarrow.Table. Note that this function also uses Arrow internally, resulting in fast data transfer.
If you prefer to return the results as a pandas DataFrame, use the to_pandas function of pyarrow.Table.
    import bauplan

    client = bauplan.Client()

    # query the table and return result set as an arrow Table
    my_table = client.query(
        query='SELECT c1 FROM my_table',
        ref='my_ref_or_branch_name',
    )

    # efficiently cast the table to a pandas DataFrame
    df = my_table.to_pandas()
- Parameters:¶
- query: str¶
The Bauplan query to execute.
- ref: str | Branch | Ref | None =
None
¶ The ref or branch name to read data from.
- max_rows: int | None =
None
¶ The maximum number of rows to return; default:
None
(no limit).
- cache: 'on' | 'off' | None =
None
¶ Whether to enable or disable caching for the query.
- connector: str | None =
None
¶ The connector type for the model (defaults to Bauplan). Allowed values are ‘snowflake’ and ‘dremio’.
- connector_config_key: str | None =
None
¶ The key name if the SSM key is custom with the pattern bauplan/connectors/<connector_type>/<key>.
- connector_config_uri: str | None =
None
¶ Full SSM uri if completely custom path, e.g. ssm://us-west-2/123456789012/baubau/dremio.
- namespace: str | Namespace | None =
None
¶ The Namespace to run the query in. If not set, the query will be run in the default namespace for your account.
- debug: bool | None =
None
¶ Whether to enable or disable debug mode for the query.
- args: dict[str, str] | None =
None
¶ Additional arguments to pass to the query (default: None).
- verbose: bool | None =
None
¶ Whether to enable or disable verbose mode for the query.
- client_timeout: int | float | None =
None
¶ seconds to timeout; this also cancels the remote job execution.
- Returns:¶
The query results as a
pyarrow.Table
.
-
query_to_csv_file(path: str | Path, query: str, ref: str | Branch | Ref | None =
None
, max_rows: int | None =None
, cache: 'on' | 'off' | None =None
, connector: str | None =None
, connector_config_key: str | None =None
, connector_config_uri: str | None =None
, namespace: str | Namespace | None =None
, debug: bool | None =None
, args: dict[str, str] | None =None
, verbose: bool | None =None
, client_timeout: int | float | None =None
, **kwargs: Any) Path ¶ Export the results of a SQL query to a file in CSV format.
    import bauplan

    client = bauplan.Client()

    # query the table and write the results to a CSV file
    client.query_to_csv_file(
        path='./my.csv',
        query='SELECT c1 FROM my_table',
        ref='my_ref_or_branch_name',
    )
- Parameters:¶
- path: str | Path¶
The name or path of the CSV file to write the results to.
- query: str¶
The Bauplan query to execute.
- ref: str | Branch | Ref | None =
None
¶ The ref or branch name to read data from.
- max_rows: int | None =
None
¶ The maximum number of rows to return; default:
None
(no limit).
- cache: 'on' | 'off' | None =
None
¶ Whether to enable or disable caching for the query.
- connector: str | None =
None
¶ The connector type for the model (defaults to Bauplan). Allowed values are ‘snowflake’ and ‘dremio’.
- connector_config_key: str | None =
None
¶ The key name if the SSM key is custom with the pattern bauplan/connectors/<connector_type>/<key>.
- connector_config_uri: str | None =
None
¶ Full SSM uri if completely custom path, e.g. ssm://us-west-2/123456789012/baubau/dremio.
- namespace: str | Namespace | None =
None
¶ The Namespace to run the query in. If not set, the query will be run in the default namespace for your account.
- debug: bool | None =
None
¶ Whether to enable or disable debug mode for the query.
- args: dict[str, str] | None =
None
¶ Additional arguments to pass to the query (default: None).
- verbose: bool | None =
None
¶ Whether to enable or disable verbose mode for the query.
- client_timeout: int | float | None =
None
¶ seconds to timeout; this also cancels the remote job execution.
- Returns:¶
The path of the file written.
-
query_to_generator(query: str, ref: str | Branch | Ref | None =
None
, max_rows: int | None =None
, cache: 'on' | 'off' | None =None
, connector: str | None =None
, connector_config_key: str | None =None
, connector_config_uri: str | None =None
, namespace: str | Namespace | None =None
, debug: bool | None =None
, as_json: bool | None =False
, args: dict[str, str] | None =None
, verbose: bool | None =None
, client_timeout: int | float | None =None
) Generator[dict[str, Any], None, None] ¶ Execute a SQL query and return the results as a generator, where each row is a Python dictionary.
    import bauplan

    client = bauplan.Client()

    # query the table and iterate through the results one row at a time
    res = client.query_to_generator(
        query='SELECT c1 FROM my_table',
        ref='my_ref_or_branch_name',
    )
    for row in res:
        # do logic
        ...
- Parameters:¶
- query: str¶
The Bauplan query to execute.
- ref: str | Branch | Ref | None =
None
¶ The ref or branch name to read data from.
- max_rows: int | None =
None
¶ The maximum number of rows to return; default:
None
(no limit).
- cache: 'on' | 'off' | None =
None
¶ Whether to enable or disable caching for the query.
- connector: str | None =
None
¶ The connector type for the model (defaults to Bauplan). Allowed values are ‘snowflake’ and ‘dremio’.
- connector_config_key: str | None =
None
¶ The key name if the SSM key is custom with the pattern bauplan/connectors/<connector_type>/<key>.
- connector_config_uri: str | None =
None
¶ Full SSM uri if completely custom path, e.g. ssm://us-west-2/123456789012/baubau/dremio.
- namespace: str | Namespace | None =
None
¶ The Namespace to run the query in. If not set, the query will be run in the default namespace for your account.
- debug: bool | None =
None
¶ Whether to enable or disable debug mode for the query.
- as_json: bool | None =
False
¶ Whether to return the results as a JSON-compatible string (default:
False
).
- args: dict[str, str] | None =
None
¶ Additional arguments to pass to the query (default:
None
).
- verbose: bool | None =
None
¶ Whether to enable or disable verbose mode for the query.
- client_timeout: int | float | None =
None
¶ seconds to timeout; this also cancels the remote job execution.
- Yield:¶
A dictionary representing a row of query results.
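Since each yielded row is a plain dictionary, results can be aggregated while streaming, without holding the whole result set in memory. The sketch below assumes nothing about Bauplan beyond that row shape; `count_by` is a hypothetical helper that works on any iterable of dictionaries.

```python
from collections import Counter


def count_by(rows, column):
    """Count rows per distinct value of `column`, consuming `rows` once."""
    counts = Counter()
    for row in rows:
        counts[row[column]] += 1
    return counts
```

With a client at hand, the input would be something like `client.query_to_generator(query='SELECT c1 FROM my_table', ref='my_ref_or_branch_name')`, and the call `count_by(rows, 'c1')`.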
-
query_to_json_file(path: str | Path, query: str, file_format: 'json' | 'jsonl' | None =
'json'
, ref: str | Branch | Ref | None =None
, max_rows: int | None =None
, cache: 'on' | 'off' | None =None
, connector: str | None =None
, connector_config_key: str | None =None
, connector_config_uri: str | None =None
, namespace: str | Namespace | None =None
, debug: bool | None =None
, args: dict[str, str] | None =None
, verbose: bool | None =None
, client_timeout: int | float | None =None
) Path ¶ Export the results of a SQL query to a file in JSON format.
    import bauplan

    client = bauplan.Client()

    # query the table and write the results to a JSON file
    client.query_to_json_file(
        path='./my.json',
        query='SELECT c1 FROM my_table',
        ref='my_ref_or_branch_name',
    )
- Parameters:¶
- path: str | Path¶
The name or path of the JSON file to write the results to.
- query: str¶
The Bauplan query to execute.
- file_format: 'json' | 'jsonl' | None =
'json'
¶ The format to write the results in; default:
json
. Allowed values are ‘json’ and ‘jsonl’.
- ref: str | Branch | Ref | None =
None
¶ The ref or branch name to read data from.
- max_rows: int | None =
None
¶ The maximum number of rows to return; default:
None
(no limit).
- cache: 'on' | 'off' | None =
None
¶ Whether to enable or disable caching for the query.
- connector: str | None =
None
¶ The connector type for the model (defaults to Bauplan). Allowed values are ‘snowflake’ and ‘dremio’.
- connector_config_key: str | None =
None
¶ The key name if the SSM key is custom with the pattern bauplan/connectors/<connector_type>/<key>.
- connector_config_uri: str | None =
None
¶ Full SSM uri if completely custom path, e.g. ssm://us-west-2/123456789012/baubau/dremio.
- namespace: str | Namespace | None =
None
¶ The Namespace to run the query in. If not set, the query will be run in the default namespace for your account.
- debug: bool | None =
None
¶ Whether to enable or disable debug mode for the query.
- args: dict[str, str] | None =
None
¶ Additional arguments to pass to the query (default: None).
- verbose: bool | None =
None
¶ Whether to enable or disable verbose mode for the query.
- client_timeout: int | float | None =
None
¶ seconds to timeout; this also cancels the remote job execution.
- Returns:¶
The path of the file written.
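The two file_format values usually correspond to different on-disk layouts, which matters when reading the file back. The sketch below uses only the standard library and rests on an assumption that this page does not confirm: that 'json' produces a single JSON document (such as an array of row objects) and 'jsonl' produces one JSON object per line. `read_rows` is a hypothetical helper.

```python
import json
from pathlib import Path


def read_rows(path, file_format="json"):
    """Load rows written in either layout.

    Assumes 'json' is one JSON document and 'jsonl' is JSON Lines
    (one object per non-empty line).
    """
    text = Path(path).read_text(encoding="utf-8")
    if file_format == "json":
        return json.loads(text)
    if file_format == "jsonl":
        return [json.loads(line) for line in text.splitlines() if line.strip()]
    raise ValueError(f"unsupported file_format: {file_format!r}")
```

Because the method returns the path of the file written, that return value can be passed straight to a reader like this one.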
-
query_to_parquet_file(path: str | Path, query: str, ref: str | Branch | Ref | None =
None
, max_rows: int | None =None
, cache: 'on' | 'off' | None =None
, connector: str | None =None
, connector_config_key: str | None =None
, connector_config_uri: str | None =None
, namespace: str | Namespace | None =None
, debug: bool | None =None
, args: dict[str, str] | None =None
, verbose: bool | None =None
, client_timeout: int | float | None =None
, **kwargs: Any) Path ¶ Export the results of a SQL query to a file in Parquet format.
    import bauplan

    client = bauplan.Client()

    # query the table and write the results to a Parquet file
    client.query_to_parquet_file(
        path='./my.parquet',
        query='SELECT c1 FROM my_table',
        ref='my_ref_or_branch_name',
    )
- Parameters:¶
- path: str | Path¶
The name or path of the Parquet file to write the results to.
- query: str¶
The Bauplan query to execute.
- ref: str | Branch | Ref | None =
None
¶ The ref or branch name to read data from.
- max_rows: int | None =
None
¶ The maximum number of rows to return; default:
None
(no limit).
- cache: 'on' | 'off' | None =
None
¶ Whether to enable or disable caching for the query.
- connector: str | None =
None
¶ The connector type for the model (defaults to Bauplan). Allowed values are ‘snowflake’ and ‘dremio’.
- connector_config_key: str | None =
None
¶ The key name if the SSM key is custom with the pattern bauplan/connectors/<connector_type>/<key>.
- connector_config_uri: str | None =
None
¶ Full SSM uri if completely custom path, e.g. ssm://us-west-2/123456789012/baubau/dremio.
- namespace: str | Namespace | None =
None
¶ The Namespace to run the query in. If not set, the query will be run in the default namespace for your account.
- debug: bool | None =
None
¶ Whether to enable or disable debug mode for the query.
- args: dict[str, str] | None =
None
¶ Additional arguments to pass to the query (default: None).
- verbose: bool | None =
None
¶ Whether to enable or disable verbose mode for the query.
- client_timeout: int | float | None =
None
¶ seconds to timeout; this also cancels the remote job execution.
- Returns:¶
The path of the file written.
-
rerun(job_id: str, ref: str | Branch | Ref | None =
None
, namespace: str | Namespace | None =None
, cache: 'on' | 'off' | None =None
, transaction: 'on' | 'off' | None =None
, dry_run: bool | None =None
, strict: 'on' | 'off' | None =None
, preview: 'on' | 'off' | 'head' | 'tail' | str | None =None
, debug: bool | None =None
, args: dict[str, str] | None =None
, verbose: bool | None =None
, client_timeout: int | float | None =None
) ReRunState ¶ Re-run a Bauplan project by its job ID and return the state of the run. This is equivalent to running the
bauplan rerun
command through the CLI.
- Parameters:¶
- job_id: str¶
The Job ID of the previous run. This can be used to re-run a previous run, e.g., on a different branch.
- ref: str | Branch | Ref | None =
None
¶ The ref or branch name to read.
- namespace: str | Namespace | None =
None
¶ The Namespace to run the job in. If not set, the job will be run in the default namespace.
- cache: 'on' | 'off' | None =
None
¶ Whether to enable or disable caching for the run.
- transaction: 'on' | 'off' | None =
None
¶ Whether to enable or disable transaction mode for the run.
- dry_run: bool | None =
None
¶ Whether to enable or disable dry-run mode for the run; models are not materialized.
- strict: 'on' | 'off' | None =
None
¶ Whether to enable or disable strict schema validation.
- preview: 'on' | 'off' | 'head' | 'tail' | str | None =
None
¶ Whether to enable or disable preview mode for the run.
- debug: bool | None =
None
¶ Whether to enable or disable debug mode for the run.
- args: dict[str, str] | None =
None
¶ Additional arguments (optional).
- verbose: bool | None =
None
¶ Whether to enable or disable verbose mode for the run.
- client_timeout: int | float | None =
None
¶ seconds to timeout; this also cancels the remote job execution.
- Returns:¶
The state of the run.
-
run(project_dir: str | None =
None
, ref: str | Branch | Ref | None =None
, namespace: str | Namespace | None =None
, parameters: dict[str, str | int | float | bool] | None =None
, cache: 'on' | 'off' | None =None
, transaction: 'on' | 'off' | None =None
, dry_run: bool | None =None
, strict: 'on' | 'off' | None =None
, preview: 'on' | 'off' | 'head' | 'tail' | str | None =None
, debug: bool | None =None
, args: dict[str, str] | None =None
, verbose: bool | None =None
, client_timeout: int | float | None =None
) RunState ¶ Run a Bauplan project and return the state of the run. This is equivalent to running the bauplan run command through the CLI.
- Parameters:¶
- project_dir: str | None =
None
¶ The directory of the project (where the bauplan_project.yml file is located).
- ref: str | Branch | Ref | None =
None
¶ The ref or branch name to read.
- namespace: str | Namespace | None =
None
¶ The Namespace to run the job in. If not set, the job will be run in the default namespace.
- parameters: dict[str, str | int | float | bool] | None =
None
¶ Parameters for templating into SQL or Python models.
- cache: 'on' | 'off' | None =
None
¶ Whether to enable or disable caching for the run.
- transaction: 'on' | 'off' | None =
None
¶ Whether to enable or disable transaction mode for the run.
- dry_run: bool | None =
None
¶ Whether to enable or disable dry-run mode for the run; models are not materialized.
- strict: 'on' | 'off' | None =
None
¶ Whether to enable or disable strict schema validation.
- preview: 'on' | 'off' | 'head' | 'tail' | str | None =
None
¶ Whether to enable or disable preview mode for the run.
- debug: bool | None =
None
¶ Whether to enable or disable debug mode for the run.
- args: dict[str, str] | None =
None
¶ Additional arguments (optional).
- verbose: bool | None =
None
¶ Whether to enable or disable verbose mode for the run.
- client_timeout: int | float | None =
None
¶ Seconds before the client times out; this also cancels the remote job execution.
- Returns:¶
The state of the run.
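The `parameters` argument is documented as accepting only `str`, `int`, `float`, or `bool` values. The sketch below checks that constraint on the client side before calling `run`. The helper names `check_parameters` and `run_with_parameters` are hypothetical and not part of bauplan, and `client` is assumed to be any object exposing `run` as documented above.

```python
from typing import Any, Dict, Union

ParamValue = Union[str, int, float, bool]


def check_parameters(parameters: Dict[str, Any]) -> Dict[str, ParamValue]:
    """Raise TypeError if a name or value falls outside the documented type of `parameters`."""
    for key, value in parameters.items():
        if not isinstance(key, str):
            raise TypeError(f"parameter name {key!r} is not a str")
        if not isinstance(value, (str, int, float, bool)):
            raise TypeError(
                f"parameter {key!r} has unsupported type {type(value).__name__}"
            )
    return parameters


def run_with_parameters(
    client: Any, project_dir: str, ref: str, parameters: Dict[str, Any]
) -> Any:
    """Validate `parameters`, then run the project on `ref` and return the run state."""
    return client.run(
        project_dir=project_dir,
        ref=ref,
        parameters=check_parameters(parameters),
    )
```

Failing early with a `TypeError` keeps an unsupported value (a list, say) from reaching the remote job at all.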
-
scan(table: str | Table, ref: str | Branch | Ref | None =
None
, columns: list[str] | None =None
, filters: str | None =None
, limit: int | None =None
, cache: 'on' | 'off' | None =None
, connector: str | None =None
, connector_config_key: str | None =None
, connector_config_uri: str | None =None
, namespace: str | Namespace | None =None
, debug: bool | None =None
, args: dict[str, str] | None =None
, client_timeout: int | float | None =None
, **kwargs: Any) Table ¶ Execute a table scan (with optional filters) and return the results as an arrow Table.
Note that this function uses SQLGlot to compose a safe SQL query, and then internally defers to the query_to_arrow function for the actual scan.
import bauplan
client = bauplan.Client()

# run a table scan over the data lake
# filters are passed as a string
my_table = client.scan(
    table='my_table_name',
    ref='my_ref_or_branch_name',
    columns=['c1'],
    filters='c2 > 10',
)
- Parameters:¶
- table: str | Table¶
The table to scan.
- ref: str | Branch | Ref | None =
None
¶ The ref or branch name to read data from.
- columns: list[str] | None =
None
¶ The columns to return (default: None).
- filters: str | None =
None
¶ The filters to apply (default: None).
- limit: int | None =
None
¶ The maximum number of rows to return (default: None).
- cache: 'on' | 'off' | None =
None
¶ Whether to enable or disable caching for the query.
- connector: str | None =
None
¶ The connector type for the model (defaults to Bauplan). Allowed values are ‘snowflake’ and ‘dremio’.
- connector_config_key: str | None =
None
¶ The key name if the SSM key is custom with the pattern bauplan/connectors/<connector_type>/<key>.
- connector_config_uri: str | None =
None
¶ The full SSM URI, if the path is completely custom, e.g. ssm://us-west-2/123456789012/baubau/dremio.
- namespace: str | Namespace | None =
None
¶ The Namespace to run the scan in. If not set, the scan will be run in the default namespace for your account.
- debug: bool | None =
None
¶ Whether to enable or disable debug mode for the query.
- args: dict[str, str] | None =
None
¶ A dict of arbitrary arguments to pass to the backend.
- client_timeout: int | float | None =
None
¶ Seconds before the client times out; this also cancels the remote job execution.
- Returns:¶
The scan results as a pyarrow.Table.
-
class bauplan.JobStatus(canceled: str =
'CANCELLED'
, cancelled: str ='CANCELLED'
, failed: str ='FAILED'
, rejected: str ='REJECTED'
, success: str ='SUCCESS'
, timeout: str ='TIMEOUT'
, unknown: str ='UNKNOWN'
)¶ Bases:
object
-
canceled : str =
'CANCELLED'
¶
-
cancelled : str =
'CANCELLED'
¶
-
failed : str =
'FAILED'
¶
-
rejected : str =
'REJECTED'
¶
-
success : str =
'SUCCESS'
¶
-
timeout : str =
'TIMEOUT'
¶
-
unknown : str =
'UNKNOWN'
¶
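Because `canceled` and `cancelled` both hold the same string `'CANCELLED'`, code that branches on a job status only needs to compare against the values. The following is a minimal sketch that classifies a status string. The constants are copied from the attribute listing above rather than imported, and the function name `classify_status` and the grouping of statuses into "not successful" are assumptions made for illustration. How the status string is obtained from a run state is not covered in this section.

```python
# Values copied from the bauplan.JobStatus attribute listing above.
SUCCESS = 'SUCCESS'
NOT_SUCCESSFUL = frozenset({'CANCELLED', 'FAILED', 'REJECTED', 'TIMEOUT'})


def classify_status(status: str) -> str:
    """Map a job status string to 'ok', 'not_ok', or 'unknown'."""
    if status == SUCCESS:
        return 'ok'
    if status in NOT_SUCCESSFUL:
        return 'not_ok'
    # Covers 'UNKNOWN' and any value not present in the listing.
    return 'unknown'
```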
-
class bauplan.Model(name: str, columns: list[str] | None =
None
, filter: str | None =None
, ref: str | None =None
, connector: str | None =None
, connector_config_key: str | None =None
, connector_config_uri: str | None =None
, **kwargs: Any)¶ Bases:
object
Represents a model (dataframe/table representing a DAG step) as an input to a function.
e.g.
import bauplan
import pandas as pd
import pyarrow

@bauplan.model()
def some_parent_model():
    return pyarrow.Table.from_pydict({'bar': [1, 2, 3]})

@bauplan.model()
def your_cool_model(
    # parent models are passed as inputs, using the bauplan.Model class
    parent_0=bauplan.Model(
        'some_parent_model',
        columns=['bar'],
        filter='bar > 1',
    )
):
    # Can return a pandas dataframe or a pyarrow table
    return pyarrow.Table.from_pandas(
        pd.DataFrame({
            'foo': parent_0['bar'] * 2,
        })
    )
Bauplan can wrap other engines for the processing of some models, exposing a common interface and unified API for the user while dispatching the relevant operations to the underlying engine.
Authentication and authorization happen securely and transparently through SSM; the user is asked to specify a connector type and the credentials through the relevant keywords:
import bauplan
import pandas as pd
import pyarrow

@bauplan.model()
def your_cool_model(
    parent_0=bauplan.Model(
        'some_parent_model',
        columns=['bar'],
        filter='bar > 1',
        connector='dremio',
        connector_config_key='bauplan',
    )
):
    # parent_0 inside the function body
    # will still be an Arrow table: the user code
    # should still be the same as the data is moved
    # transparently by Bauplan from an engine to the function.
    return pyarrow.Table.from_pandas(
        pd.DataFrame({
            'foo': parent_0['bar'] * 2,
        })
    )
- Parameters:¶
- name: str¶
The name of the model.
- columns: list[str] | None =
None
¶ The list of columns in the model. If the arg is not provided, the model will load all columns.
- filter: str | None =
None
¶ The optional filter for the model. Defaults to None.
- ref: str | None =
None
¶ The optional reference to the model. Defaults to None.
- connector: str | None =
None
¶ The connector type for the model (defaults to Bauplan SQL). Allowed values are ‘snowflake’ and ‘dremio’.
- connector_config_key: str | None =
None
¶ The key name if the SSM key is custom with the pattern bauplan/connectors/<connector_type>/<key>.
- connector_config_uri: str | None =
None
¶ The full SSM URI, if the path is completely custom, e.g. ssm://us-west-2/123456789012/baubau/dremio.
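To make the `connector_config_key` pattern concrete, the sketch below composes the SSM key name that a given connector type and key would correspond to, reading the pattern `bauplan/connectors/<connector_type>/<key>` literally. The helper `connector_key_path` is hypothetical and not part of bauplan; how Bauplan resolves the key internally is not described in this section.

```python
ALLOWED_CONNECTORS = ('snowflake', 'dremio')  # allowed values listed above


def connector_key_path(connector_type: str, key: str) -> str:
    """Compose a key name following bauplan/connectors/<connector_type>/<key>."""
    if connector_type not in ALLOWED_CONNECTORS:
        raise ValueError(f"unsupported connector type: {connector_type!r}")
    return f"bauplan/connectors/{connector_type}/{key}"
```

Under this reading, `connector='dremio'` with `connector_config_key='bauplan'` would point at `bauplan/connectors/dremio/bauplan`.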
-
class bauplan.Profile(name: 'Optional[str]' =
None
, api_key: 'Optional[str]' =None
, user_session_token: 'Optional[str]' =None
, project_dir: 'Optional[Union[str, Path]]' =None
, branch: 'Optional[str]' =None
, namespace: 'Optional[str]' =None
, cache: "Optional[Literal['on', 'off']]" =None
, debug: 'Optional[bool]' =None
, verbose: 'Optional[bool]' =None
, args: 'Optional[Dict[str, str]]' =None
, api_endpoint: 'Optional[str]' =None
, catalog_endpoint: 'Optional[str]' =None
, client_timeout: 'Optional[int]' =None
, env: 'Optional[str]' =None
, config_file_path: 'Optional[Union[str, Path]]' =None
, feature_flags: 'Optional[Dict[str, Any]]' =None
)¶ Bases:
object
- api_endpoint : str¶
- api_key : str | None¶
- args : Dict[str, str] | None¶
- branch : str | None¶
- cache : str | None¶
- catalog_endpoint : str¶
- client_timeout : int | None¶
- config_file_path : str | Path | None¶
- debug : bool | None¶
- env : str | None¶
- feature_flags : Dict[str, str]¶
-
classmethod load_profile(profile: str | None =
None
, api_key: str | None =None
, user_session_token: str | None =None
, project_dir: str | Path | None =None
, branch: str | None =None
, namespace: str | None =None
, cache: 'on' | 'off' | None =None
, debug: bool | None =None
, verbose: bool | None =None
, args: dict[str, str] | None =None
, api_endpoint: str | None =None
, catalog_endpoint: str | None =None
, client_timeout: int | None =None
, env: str | None =None
, config_file_path: str | Path | None =None
, feature_flags: dict[str, Any] | None =None
) Profile ¶ Load a profile from a profile file.
- name : str | None¶
- namespace : str | None¶
- project_dir : str | Path | None¶
- user_session_token : str | None¶
- verbose : bool | None¶
- bauplan.expectation(**kwargs: Any) Callable ¶
Decorator that defines a Bauplan expectation.
An expectation is a function from one (or more) dataframe-like object(s) to a boolean: it is commonly used to perform data validation and data quality checks when running a pipeline. Expectations take as input the table(s) they are validating and return a boolean indicating whether the expectation is met. A Python expectation needs a Python environment to run, which is defined using the python decorator, e.g.:
import bauplan
from bauplan.standard_expectations import expect_column_no_nulls

@bauplan.expectation()
@bauplan.python('3.10')
def test_joined_dataset(
    data=bauplan.Model(
        'join_dataset',
        columns=['anomaly']
    )
):
    # your data validation code here
    return expect_column_no_nulls(data, 'anomaly')
- Parameters:¶
- f
The function to decorate.
-
bauplan.model(name: str | None =
None
, columns: list[str] | None =None
, partitioned_by: str | list[str] | tuple[str, ...] | None =None
, materialization_strategy: 'NONE' | 'REPLACE' | 'APPEND' | None =None
, cache_strategy: 'NONE' | 'DEFAULT' | None =None
, internet_access: bool | None =None
, **kwargs: Any) Callable ¶ Decorator that specifies a Bauplan model.
A model is a function from one (or more) dataframe-like object(s) to another dataframe-like object: it is used to define a transformation in a pipeline. Models are chained together implicitly by using them as inputs to their children. A Python model needs a Python environment to run, which is defined using the python decorator, e.g.:
import bauplan

@bauplan.model(
    columns=['*'],
    materialization_strategy='NONE'
)
@bauplan.python('3.11')
def source_scan(
    data=bauplan.Model(
        'iot_kaggle',
        columns=['*'],
        filter="motion='false'"
    )
):
    # your code here
    return data
- Parameters:¶
- name: str | None =
None
¶ the name of the model (e.g. ‘users’); if missing the function name is used.
- columns: list[str] | None =
None
¶ the columns of the output dataframe after the model runs (e.g. [‘id’, ‘name’, ‘email’]). Use [‘*’] as a wildcard.
- internet_access: bool | None =
None
¶ whether the model requires internet access.
- partitioned_by: str | list[str] | tuple[str, ...] | None =
None
¶ the columns to partition the data by.
- materialization_strategy: 'NONE' | 'REPLACE' | 'APPEND' | None =
None
¶ the materialization strategy to use.
- cache_strategy: 'NONE' | 'DEFAULT' | None =
None
¶ the cache strategy to use.
-
bauplan.pyspark(version: str | None =
None
, conf: dict[str, str] | None =None
, **kwargs: Any) Callable ¶ Decorator that makes a pyspark session available to a Bauplan function (a model or an expectation). Add a spark=None parameter to the function's arguments.
-
bauplan.python(version: str | None =
None
, pip: dict[str, str] | None =None
, **kwargs: Any) Callable ¶ Decorator that defines a Python environment for a Bauplan function (e.g. a model or expectation). It is used to specify directly in code the configuration of the Python environment required to run the function, i.e. the Python version and the Python packages required.
-
bauplan.resources(cpus: int | float | None =
None
, memory: int | str | None =None
, memory_swap: int | str | None =None
, timeout: int | None =None
, **kwargs: Any) Callable ¶ Decorator that defines the resources required by a Bauplan function (e.g. a model or expectation). It is used to specify directly in code the configuration of the resources required to run the function.
- Parameters:¶
- cpus: int | float | None =
None
¶ The number of CPUs required by the function (e.g. `0.5`).
- memory: int | str | None =
None
¶ The amount of memory required by the function (e.g. `1G`, `1000`).
- memory_swap: int | str | None =
None
¶ The amount of swap memory required by the function (e.g. `1G`, `1000`).
- timeout: int | None =
None
¶ The maximum time the function is allowed to run (e.g. `60`).