Tables

bauplan uses Apache Iceberg tables to provide database-like features in the data lake, including versioning, schema evolution, and efficient querying. This approach helps resolve common data lake issues:

  • Complex schema management

  • Limited versioning

  • Inefficient querying

  • Inconsistent metadata

  • Complex partitioning

Table Creation and Import

You can create tables by importing data from Parquet files into the data catalog using a two-step process:

  1. Create an Empty Table: Define a schema and use it to create an empty table.

  2. Import Data: Import data into the empty table.

Note

Currently, only S3 is supported as a data source, and only Parquet and CSV as file formats.

Data engineers often import large datasets from various sources into the data catalog. To prevent unintentional imports, such as faulty data or sensitive information, bauplan requires data to be imported into a branch before it is merged into main. Branching is a zero-copy operation, making it instantaneous and cost-efficient.

# Create a new import branch
bauplan branch create <your_import_branch>

# Create a new table in the data catalog from a source location in S3
bauplan table create --name <your_table_name> --search-uri 's3://your/data/to/import/*.parquet'

# Import data into the newly created table from the same source location in S3
bauplan table import --name <your_table_name> --search-uri 's3://your/data/to/import/*.parquet'
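
Before merging, it is good practice to validate the imported table directly on the import branch. Here is a minimal sketch using the Python SDK; the query method, its ref parameter, and the Arrow return type follow the SDK reference, but verify them against your installed version:

import bauplan

client = bauplan.Client()

# Sanity-check the imported table on the import branch before merging.
# Reading with ref set to the import branch leaves main untouched.
rows = client.query(
    'SELECT COUNT(*) AS n FROM <your_table_name>',
    ref='<your_import_branch>',
)
print(rows.to_pydict())  # the query returns an Arrow table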

Plan Generation and Conflict Resolution

When Iceberg tables are built from multiple Parquet files, schema conflicts can occur—such as a column defined as string in one file and long in another.

Because Iceberg requires a coherent schema for successful imports, bauplan provides an import plan feature to simplify and automate conflict resolution.

An import plan captures the schema details of each contributing file and allows conflicts to be resolved manually or automatically before importing data. The import plan is either saved as a YAML file or returned as an iterable Python object when using the bauplan client.

Generating an Import Plan

To generate an import plan:

# Create a new import branch
bauplan branch create <your_import_branch>

# Create an import plan from a source location in S3 and save it in a YAML file
bauplan table create-plan --name <your_table> --search-uri 's3://your/data/to/import/*.parquet' --save-plan <your_table_creation_plan>.yml

This command creates and saves an import plan file (<your_table_creation_plan>.yml) with details about inferred schemas. The plan includes:

  • Detected Schemas: Each column lists its source data types (src_datatypes) and the destination data type (dst_datatype), which you can adjust as needed. For example, if a column is detected as int but should be imported as string, simply set dst_datatype to string.

  • Conflicts: Any detected schema conflicts are listed under a conflicts section, detailing the incompatible definitions across files. To resolve a conflict, modify the conflicting column so that it has a single, consistent dst_datatype, then remove its entry from the conflicts field to mark it as addressed (see the scripted example after the plan below).

An example of a generated plan file:

plan_state:
  plan_id: to-be-filled
  branch_name: <your_import_branch>
  table_identifier:
  - <namespace>
  - <table_name>
  table_replace: false
schema_info:
  conflicts: []
  detected_schemas:
  - column_name: col1
    src_datatypes:
    - datatype: str
    dst_datatype:
    - datatype: str
  - column_name: col2
    src_datatypes:
    - datatype: timestamp
      unit: us
    dst_datatype:
    - datatype: timestamp
      unit: us
      timezone: UTC
  partitions: []
plan_metadata:
  cloud_provider_info:
    provider: s3
    bucket_region: <your_s3_region>
internal_scan_summary:
- column_to_type:
    col1:
      datatype: str
    col2:
      datatype: timestamp
      unit: us
  path: 's3://your/data/to/import/*.parquet'
plan_schema_version: 0.3.0
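
If the plan does report conflicts, you can either edit the YAML by hand or script the fix. Below is a minimal sketch using PyYAML that forces one destination type for every conflicting column and then clears the conflicts section; the exact shape of a conflict entry (here assumed to carry a column_name) may differ, so check it against your generated plan:

import yaml

PLAN_PATH = 'your_table_creation_plan.yml'  # hypothetical file name

# Load the plan generated by `bauplan table create-plan`
with open(PLAN_PATH) as f:
    plan = yaml.safe_load(f)

schema_info = plan['schema_info']

# Collect the names of the conflicting columns. The column_name key on
# conflict entries is an assumption; verify it against your plan file.
conflicting = {c['column_name'] for c in schema_info.get('conflicts', [])}

# Force a single, consistent destination type per conflicting column.
# String is used here as a safe common type; adjust per column as needed.
for column in schema_info['detected_schemas']:
    if column['column_name'] in conflicting:
        column['dst_datatype'] = [{'datatype': 'str'}]

# Remove the resolved entries from the conflicts field
schema_info['conflicts'] = []

with open(PLAN_PATH, 'w') as f:
    yaml.safe_dump(plan, f, sort_keys=False)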

With a fully defined and conflict-free import plan, bauplan can apply the schema consistently across the dataset, ensuring smooth table creation and data import. To do this, apply the import plan to create the new table, then import the data into it as usual.

# Apply the import plan - this operation will create a new table in the data catalog
bauplan table create-plan-apply --plan <your_table_creation_plan>.yml

# Import data into the newly created table from a source location in S3
bauplan table import --name <your_table_name> --search-uri 's3://your/data/to/import/*.parquet'
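
The same plan-based flow can be scripted with the Python SDK. The sketch below assumes client methods named plan_table_creation and apply_table_creation_plan that mirror the two CLI commands above; check your SDK version for the exact names and signatures:

import bauplan

client = bauplan.Client()

# Generate an import plan, mirroring `bauplan table create-plan`.
# The method name and signature are assumptions based on the SDK reference.
plan_state = client.plan_table_creation(
    '<your_table_name>',
    's3://your/data/to/import/*.parquet',
    '<your_import_branch>',
)

# Resolve any conflicts in plan_state here, then apply the plan to create
# the table, mirroring `bauplan table create-plan-apply`.
client.apply_table_creation_plan(plan_state)

# Import the data into the newly created table as usual
client.import_data(
    '<your_table_name>',
    's3://your/data/to/import/*.parquet',
    '<your_import_branch>',
)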

Scripting Ingestion Flows

The bauplan Python SDK allows you to automate import workflows and implement custom logic. Below is an example script that handles the end-to-end import process using the bauplan client:

import bauplan

def create_import_branch(
        client: bauplan.Client,
        import_branch: str,
        from_branch: str = 'main'
):
    # Assert the creation of a new import branch
    assert client.create_branch(import_branch, from_branch)

def create_new_table(
        client: bauplan.Client,
        table_name: str,
        source_s3_location: str,
        import_branch: str
):
    # Try to create the new table in the import branch of the data catalog
    try:
        return client.create_table(table_name, source_s3_location, import_branch)

    except bauplan.exceptions.BauplanError as e:
        print(f"Error: {e}")
        return None
    except Exception as e:
        print(f"Unexpected Error: {e}")
        return None

def import_data(
        client: bauplan.Client,
        table_name: str,
        import_branch: str,
        source_s3_location: str,
):
    # Try to import data into the newly created table in the import branch of the data catalog
    try:
        return client.import_data(table_name, source_s3_location, import_branch)

    except bauplan.exceptions.BauplanError as e:
        print(f"Error: {e}")
        return None
    except Exception as e:
        print(f"Unexpected Error: {e}")
        return None

def merge_import_branch(
        client: bauplan.Client,
        import_branch: str,
        into_branch: str = 'main'
):
    # Assert the merge of the import branch into the main branch
    assert client.merge_branch(source_ref=import_branch, into_branch=into_branch)

def main():
    # Instantiate the bauplan client
    client = bauplan.Client()

    # Populate variables for the workflow
    my_table = 'your_table_name'
    import_branch = 'username.your_import_branch'
    main_branch = 'main'
    s3_source_location = 's3://your/data/to/import/*.parquet'

    # Create an import branch
    print(f'Info: creating import branch {import_branch}.')
    create_import_branch(
        client=client,
        import_branch=import_branch,
        from_branch=main_branch,
    )
    print(f'Success: import branch {import_branch} created.')

    # Create the new table in the import branch
    print(f'Info: creating new table {my_table}.')
    table = create_new_table(
        client=client,
        table_name=my_table,
        source_s3_location=s3_source_location,
        import_branch=import_branch,
    )
    if table is None:
        # The error has already been printed; stop before importing
        return
    print(f'Success: new table {my_table} created.')

    # Import data into the newly created table in the import branch
    print(f'Info: importing data into {my_table}.')
    imported = import_data(
        client=client,
        table_name=my_table,
        import_branch=import_branch,
        source_s3_location=s3_source_location,
    )
    if imported is None:
        # The error has already been printed; stop before merging
        return
    print(f'Success: data imported into {my_table}.')

    # Merge the import branch into the main branch
    print(f'Info: merging branch {import_branch} into {main_branch}.')
    merge_import_branch(
        client=client,
        import_branch=import_branch,
        into_branch=main_branch,
    )
    print(f'Success: branch {import_branch} merged into {main_branch}.')
    print('So long, and thanks for all the fish')

if __name__ == '__main__':
    main()