Tables¶
bauplan uses Apache Iceberg tables to provide database-like features in the data lake, including versioning, schema evolution, and efficient querying. This approach helps resolve common data lake issues:
Complex schema management
Limited versioning
Inefficient querying
Metadata consistency
Complex partitioning
Table Creation and Import¶
You can create tables by importing data from Parquet files into the data catalog using a two-step process:
Create an Empty Table: Define the schema and create an empty table from it.
Import Data: Import data into the empty table.
Note
Currently, only S3 is supported as a data source, and only Parquet and CSV as file formats.
Data engineers often import large datasets from various sources into the data catalog. To prevent unintentional imports, such as faulty data or sensitive information, bauplan requires data to be imported into branches before merging into main. Branching is a zero-copy operation, making it instantaneous and cost-efficient.
# Create a new import branch
bauplan branch create <your_import_branch>
# Create a new table in the data catalog from a source location in S3
bauplan table create --name <your_table_name> --search-uri 's3://your/data/to/import/*.parquet'
# Import data into the newly created table from the same source location in S3
bauplan table import --name <your_table_name> --search-uri 's3://your/data/to/import/*.parquet'
Plan Generation and Conflict Resolution¶
When Iceberg tables are built from multiple Parquet files, schema conflicts can occur, such as a column defined as string in one file and long in another.
Because Iceberg requires a coherent schema for successful imports, bauplan provides an import plan feature to simplify and automate conflict resolution.
An import plan captures the schema details of each contributing file and allows conflicts to be resolved manually or automatically before importing data. The import plan is either saved as a YAML file or returned as an iterable Python object when using the bauplan client.
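For instance, here is a minimal sketch of generating a plan programmatically instead of saving it to YAML. The plan_table_creation method and its keyword arguments are assumptions modeled on the client calls shown later on this page; verify them against your SDK version.

import bauplan

client = bauplan.Client()

# Generate an import plan on an import branch; the method name and
# keyword arguments are assumptions to check against your SDK version.
plan = client.plan_table_creation(
    table='your_table_name',
    search_uri='s3://your/data/to/import/*.parquet',
    branch='username.your_import_branch',
)
print(plan)  # inspect detected schemas and any conflicts before applying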
Generating an Import Plan¶
To generate an import plan:
# Create a new import branch
bauplan branch create <your_import_branch>
# Create an import plan from a source location in S3 and save it in a YAML file
bauplan table create-plan --name <your_table> --search-uri 's3://your/data/to/import/*.parquet' --save-plan <your_table_creation_plan>.yml
This command creates and saves an import plan file (<your_table_creation_plan>.yml) with details about the inferred schemas. The plan includes:
Detected Schemas: Each column lists its source data types (src_datatypes) and the destination data type (dst_datatype), which you can adjust as needed. For example, if a column is detected as int but should be imported as string, simply set dst_datatype to string.
Conflicts: Any detected schema conflicts are listed under a conflicts section, detailing incompatible definitions across files. Resolving conflicts requires modifying the conflicting columns so that each one has a consistent dst_datatype. Once resolved, remove the entries from the conflicts field to indicate they have been addressed.
The example below shows a complete, conflict-free plan:
plan_state:
  plan_id: to-be-filled
  branch_name: <your_import_branch>
  table_identifier:
  - <namespace>
  - <table_name>
  table_replace: false
schema_info:
  conflicts: []
  detected_schemas:
  - column_name: col1
    src_datatypes:
    - datatype: str
    dst_datatype:
    - datatype: str
  - column_name: col2
    src_datatypes:
    - datatype: timestamp
      unit: us
    dst_datatype:
    - datatype: timestamp
      unit: us
      timezone: UTC
  partitions: []
plan_metadata:
  cloud_provider_info:
    provider: s3
    bucket_region: <your_s3_region>
  internal_scan_summary:
  - column_to_type:
      col1:
        datatype: str
      col2:
        datatype: timestamp
        unit: us
    path: 's3://your/data/to/import/*.parquet'
plan_schema_version: 0.3.0
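Because the plan is plain YAML, you can also lint it in code before applying it. Below is a minimal sketch using PyYAML and the layout shown above; the helper name and the assumption that schema_info sits at the top level of the file are illustrative, not part of the bauplan API.

import yaml  # PyYAML

def unresolved_conflicts(plan_path: str) -> list:
    # Return any entries still listed under schema_info.conflicts;
    # the key layout follows the example plan above.
    with open(plan_path) as f:
        plan = yaml.safe_load(f)
    return plan.get('schema_info', {}).get('conflicts', [])

if unresolved_conflicts('your_table_creation_plan.yml'):
    raise SystemExit('Resolve the schema conflicts before applying the plan.')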
With a fully defined, conflict-free import plan, bauplan can apply the schema across the dataset, ensuring smooth table creation and data import. To do this, simply apply the import plan, which creates a new table, and then import the data into the newly created table as usual.
# Apply the import plan - this operation will create a new table in the data catalog
bauplan table create-plan-apply --plan <your_table_creation_plan>.yml
# Import data into the newly created table from a source location in S3
bauplan table import --name <your_table_name> --search-uri 's3://your/data/to/import/*.parquet'
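The same two steps can also be scripted with the client (the next section covers the full SDK workflow). This is a sketch assuming apply_table_creation_plan mirrors bauplan table create-plan-apply; verify the method and argument names against your SDK version.

import bauplan

client = bauplan.Client()

# Generate, apply, and import in one script; method and keyword names
# are assumptions mirroring the CLI commands above.
plan = client.plan_table_creation(
    table='your_table_name',
    search_uri='s3://your/data/to/import/*.parquet',
    branch='username.your_import_branch',
)
client.apply_table_creation_plan(plan=plan)  # creates the table
client.import_data(
    table='your_table_name',
    search_uri='s3://your/data/to/import/*.parquet',
    branch='username.your_import_branch',
)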
Scripting Ingestion Flows¶
The bauplan Python SDK allows you to automate import workflows and implement custom logic. Below is an example that handles the end-to-end import process using the bauplan client:
import bauplan


def create_import_branch(
    client: bauplan.Client,
    import_branch: str,
    from_branch: str = 'main'
):
    # Assert the creation of a new import branch
    assert client.create_branch(import_branch, from_branch)


def create_new_table(
    client: bauplan.Client,
    table_name: str,
    source_s3_location: str,
    import_branch: str
):
    # Try to create the new table in the import branch of the data catalog
    try:
        return client.create_table(table_name, source_s3_location, import_branch)
    except bauplan.exceptions.BauplanError as e:
        print(f"Error: {e}")
        return None
    except Exception as e:
        print(f"Unexpected Error: {e}")
        return None


def import_data(
    client: bauplan.Client,
    table_name: str,
    import_branch: str,
    source_s3_location: str,
):
    # Try to import data into the newly created table in the import branch of the data catalog
    try:
        return client.import_data(table_name, source_s3_location, import_branch)
    except bauplan.exceptions.BauplanError as e:
        print(f"Error: {e}")
        return None
    except Exception as e:
        print(f"Unexpected Error: {e}")
        return None


def merge_import_branch(
    client: bauplan.Client,
    import_branch: str,
    into_branch: str = 'main'
):
    # Assert the merge of the import branch into the main branch
    assert client.merge_branch(source_ref=import_branch, into_branch=into_branch)


def main():
    # Instantiate the bauplan client
    client = bauplan.Client()

    # Populate variables for the workflow
    my_table = 'your_table_name'
    import_branch = 'username.your_import_branch'
    main_branch = 'main'
    s3_source_location = 's3://your/data/to/import/*.parquet'

    # Create an import branch
    print(f'Info: creating import branch {import_branch}.')
    create_import_branch(
        client=client,
        import_branch=import_branch,
        from_branch=main_branch,
    )
    print(f'Success: import branch {import_branch} created.')

    # Create the new table in the import branch
    print(f'Info: creating new table {my_table}.')
    create_new_table(
        client=client,
        table_name=my_table,
        source_s3_location=s3_source_location,
        import_branch=import_branch,
    )
    print(f'Success: new table {my_table} created.')

    # Import data into the newly created table in the import branch
    print(f'Info: importing data into {my_table}.')
    import_data(
        client=client,
        table_name=my_table,
        import_branch=import_branch,
        source_s3_location=s3_source_location,
    )
    print(f'Success: data imported into {my_table}.')

    # Merge the import branch into the main branch
    print(f'Info: merging branch {import_branch} into {main_branch}.')
    merge_import_branch(
        client=client,
        import_branch=import_branch,
        into_branch=main_branch,
    )
    print(f'Success: branch {import_branch} merged into {main_branch}.')
    print('So long, and thanks for all the fish')


if __name__ == '__main__':
    main()
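Before merging, you may also want to sanity-check the imported table on the branch, since main remains untouched until the merge. Here is a minimal sketch, assuming client.query accepts a SQL string plus a ref and returns a PyArrow table; the helper and the row-count threshold are illustrative, not part of the SDK.

def validate_import(
    client: bauplan.Client,
    table_name: str,
    import_branch: str,
    min_rows: int = 1,
):
    # Count the rows of the new table on the import branch only;
    # main is untouched until the merge
    result = client.query(
        f'SELECT COUNT(*) AS n FROM {table_name}',
        ref=import_branch,
    )
    n = result['n'][0].as_py()  # client.query returns a PyArrow table
    assert n >= min_rows, f'{table_name} has only {n} rows on {import_branch}'

Calling a check like this between import_data and merge_import_branch in main() keeps faulty imports from ever reaching main.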