Handle casting programmatically

If you’d rather manage schema conflicts without editing YAML by hand, you can handle the entire flow programmatically with the Bauplan Python SDK. Start by generating a schema plan:

from typing import Any, Dict

import bauplan


def generate_plan(
    client: bauplan.Client,
    table: str,
    search_uri: str,
    branch: str,
) -> Dict[str, Any]:
    """
    Generate a schema plan for importing data into Bauplan.

    This plan will include inferred column types and any detected schema conflicts.

    Returns:
        A dictionary containing the table creation plan.
    """
    response = client.plan_table_creation(
        table=table,
        search_uri=search_uri,
        branch=branch,
    )
    return response.plan  # Extract the actual plan dictionary from the response


client = bauplan.Client()
plan = generate_plan(
    client=client,
    table='your_table',
    search_uri='s3://your-bucket/*.parquet',
    branch='import_branch',
)

Overriding Column Types in a Schema Plan

Sometimes you know more about your data than Bauplan’s automatic inference can detect, for example when a column should be interpreted as a timestamp rather than a string. You can override inferred types by modifying the dst_datatype field in the schema plan.

This pattern is especially useful when:

  • You want schema logic to live in code and version control

  • You have known transformations (e.g., parsing timestamp strings)

  • You want repeatable, auditable data type enforcement

The override_column_types helper defined further down applies overrides from a type_map: a Python dictionary where each key is a column name and each value is a list of type definitions, using the same structure Bauplan uses in dst_datatype. For example:

type_map = {
    "EventTime": [
        {
            "datatype": "timestamp",
            "unit": "us",
            "parse_format": "%Y%m%d%H%M%S"
        }
    ],
    "UserID": [
        {
            "datatype": "int"
        }
    ]
}

This allows you to override inferred types programmatically, in a format that’s fully compatible with Bauplan schema plans.
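Before importing, it can help to try a parse_format against a sample value in plain Python. The sketch below assumes that parse_format follows the strptime-style directives of Python's datetime module, which this section does not confirm; the sample value is made up for illustration.

```python
from datetime import datetime

# Assumption: parse_format uses strptime-style directives (not confirmed here).
PARSE_FORMAT = "%Y%m%d%H%M%S"

# Hypothetical raw value, as it might appear in a string column such as EventTime.
sample = "20240131235959"

# strptime raises ValueError if the sample does not match the format.
parsed = datetime.strptime(sample, PARSE_FORMAT)
print(parsed.isoformat())  # 2024-01-31T23:59:59
```

If the sample fails to parse here, the format string probably needs adjusting before it goes into type_map.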

def override_column_types(plan: dict, type_map: dict) -> dict:
    """
    Update destination types for specific columns in a Bauplan schema plan.
    Clears the conflict list after applying overrides.
    """
    for col in plan['schema_info']['detected_schemas']:
        if col['column_name'] in type_map:
            col['dst_datatype'] = type_map[col['column_name']]

    # Mark the plan as resolved
    plan['schema_info']['conflicts'] = []

    return plan
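The helper above modifies the plan dictionary in place. If you want to keep the original plan for comparison or auditing, a non-mutating variant is one option. This is a minimal sketch: override_column_types_copy is a hypothetical name, and the sample plan only mirrors the structure shown in this section (the conflict entry is a placeholder, not a real Bauplan conflict record).

```python
import copy
from typing import Any, Dict, List

TypeMap = Dict[str, List[Dict[str, Any]]]


def override_column_types_copy(plan: Dict[str, Any], type_map: TypeMap) -> Dict[str, Any]:
    """Return an updated copy of the plan, leaving the input untouched."""
    updated = copy.deepcopy(plan)
    for col in updated["schema_info"]["detected_schemas"]:
        if col["column_name"] in type_map:
            # Copy the override too, so later edits to type_map don't leak into the plan.
            col["dst_datatype"] = copy.deepcopy(type_map[col["column_name"]])
    updated["schema_info"]["conflicts"] = []
    return updated


# Made-up plan for illustration only.
sample_plan = {
    "schema_info": {
        "conflicts": ["placeholder conflict entry"],
        "detected_schemas": [
            {
                "column_name": "EventTime",
                "src_datatypes": [{"datatype": "string"}],
                "dst_datatype": [{"datatype": "string"}],
            }
        ],
    }
}
sample_type_map = {
    "EventTime": [
        {"datatype": "timestamp", "unit": "us", "parse_format": "%Y%m%d%H%M%S"}
    ]
}

new_plan = override_column_types_copy(sample_plan, sample_type_map)
```

Here sample_plan still holds the inferred string type, while new_plan carries the timestamp override and an empty conflict list.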

Note that this function clears schema_info.conflicts after applying overrides, which makes it usable with both clean plans (where you’re programmatically casting columns) and plans with conflicts (where inferred types are ambiguous). The list is cleared unconditionally, so make sure type_map covers every column that was actually in conflict.

Because Bauplan plans include a list of detected columns and their inferred types, you can modify dst_datatype to control how each column is interpreted during import, regardless of what was inferred from the files. Each entry in detected_schemas looks like this:

plan['schema_info']['detected_schemas'] = [
    {
        "column_name": "EventTime",
        "src_datatypes": [{"datatype": "string"}],
        "dst_datatype": [{"datatype": "string"}]
    },
    ...
]
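Overrides are matched by column name, so a misspelled key in type_map is silently skipped. A small check against detected_schemas can catch that early. This is a sketch: find_unknown_columns is a hypothetical helper, and the plan and type_map below are made-up samples that follow the structure shown above.

```python
from typing import Any, Dict, List


def find_unknown_columns(plan: Dict[str, Any], type_map: Dict[str, Any]) -> List[str]:
    """Return type_map keys that match no column in the plan's detected_schemas."""
    detected = {col["column_name"] for col in plan["schema_info"]["detected_schemas"]}
    return sorted(name for name in type_map if name not in detected)


# Made-up plan for illustration only.
sample_plan = {
    "schema_info": {
        "conflicts": [],
        "detected_schemas": [
            {"column_name": "EventTime", "dst_datatype": [{"datatype": "string"}]},
            {"column_name": "UserID", "dst_datatype": [{"datatype": "string"}]},
        ],
    }
}

# "UserId" is deliberately misspelled (the column is "UserID").
sample_type_map = {
    "EventTime": [{"datatype": "timestamp", "unit": "us"}],
    "UserId": [{"datatype": "int"}],
}

unknown = find_unknown_columns(sample_plan, sample_type_map)
print(unknown)  # ['UserId']
```

Running a check like this before applying overrides makes it obvious when an override would have no effect.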

Once you’ve updated the plan, apply it and import the data:

import bauplan

client = bauplan.Client()

client.apply_table_creation_plan(plan=plan)
client.import_data(
    table='your_table',
    search_uri='s3://your-bucket/*.parquet',
    branch='import_branch'
)
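The steps in this section can be tied together in a single function. This is a sketch, not part of the SDK: import_with_overrides is a hypothetical name, and it relies only on the three client calls shown above, with the same keyword arguments. The client is typed loosely so the function can be exercised with a stub in tests.

```python
from typing import Any, Dict, List

TypeMap = Dict[str, List[Dict[str, Any]]]


def import_with_overrides(
    client: Any,  # expected to be a bauplan.Client (or a test stub with the same methods)
    table: str,
    search_uri: str,
    branch: str,
    type_map: TypeMap,
) -> Any:
    """Plan the table, apply type overrides, then create the table and import data."""
    # 1. Generate the schema plan and extract the plan dictionary.
    plan = client.plan_table_creation(
        table=table, search_uri=search_uri, branch=branch
    ).plan

    # 2. Override destination types and mark the plan as resolved.
    for col in plan["schema_info"]["detected_schemas"]:
        if col["column_name"] in type_map:
            col["dst_datatype"] = type_map[col["column_name"]]
    plan["schema_info"]["conflicts"] = []

    # 3. Apply the plan, then import the data; return whatever import_data returns.
    client.apply_table_creation_plan(plan=plan)
    return client.import_data(table=table, search_uri=search_uri, branch=branch)
```

Calling it with client=bauplan.Client() and the same table, search_uri, and branch values used earlier would run the whole flow in one step.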