Handle casting programmatically¶
If you’d rather manage schema conflicts without editing YAML by hand, you can handle the entire flow programmatically using the Bauplan Python SDK.
import bauplan
from typing import Any, Dict


def generate_plan(
    client: bauplan.Client,
    table: str,
    search_uri: str,
    branch: str
) -> Dict[str, Any]:
    """
    Generate a schema plan for importing data into Bauplan.
    This plan will include inferred column types and any detected schema conflicts.

    Returns:
        A dictionary containing the table creation plan.
    """
    response = client.plan_table_creation(
        table=table,
        search_uri=search_uri,
        branch=branch
    )
    return response.plan  # Extract the actual plan dictionary from the response
import bauplan

client = bauplan.Client()
plan = generate_plan(
    client=client,
    table='your_table',
    search_uri='s3://your-bucket/*.parquet',
    branch='import_branch'
)
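Before resolving anything, you can inspect the plan that came back. The sketch below assumes the schema_info structure shown later in this section (detected_schemas and conflicts); the exact plan layout may differ depending on your SDK version.
# Print each detected column with its source and destination types
for col in plan['schema_info']['detected_schemas']:
    print(col['column_name'], col['src_datatypes'], col['dst_datatype'])

# Any unresolved conflicts between files show up here
if plan['schema_info']['conflicts']:
    print('Schema conflicts detected:', plan['schema_info']['conflicts'])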
Overriding Column Types in a Schema Plan¶
Sometimes you know more about your data than Bauplan’s automatic inference — for example, when a column should be interpreted as a timestamp rather than a string. You can override inferred types by modifying the dst_datatype field in the schema plan.
This pattern is especially useful when:
You want schema logic to live in code and version control
You have known transformations (e.g., parsing timestamp strings)
You want repeatable, auditable data type enforcement
Here’s a reusable helper that applies type overrides based on a type_map: a Python dictionary where each key is a column name and the value is a list of type definitions using the same structure Bauplan uses in dst_datatype.
type_map = {
    "EventTime": [
        {
            "datatype": "timestamp",
            "unit": "us",
            "parse_format": "%Y%m%d%H%M%S"
        }
    ],
    "UserID": [
        {
            "datatype": "int"
        }
    ]
}
This allows you to override inferred types programmatically, in a format that’s fully compatible with Bauplan schema plans.
def override_column_types(plan: dict, type_map: dict) -> dict:
    """
    Update destination types for specific columns in a Bauplan schema plan.
    Clears the conflict list after applying overrides.
    """
    for col in plan['schema_info']['detected_schemas']:
        if col['column_name'] in type_map:
            col['dst_datatype'] = type_map[col['column_name']]
    # Mark the plan as resolved
    plan['schema_info']['conflicts'] = []
    return plan
This function clears schema_info.conflicts after applying overrides. This makes the procedure compatible with both clean plans (where you’re programmatically casting columns) and plans with conflicts (where inferred types are ambiguous).
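Putting the pieces together is a single call. This is a minimal sketch that reuses the plan returned by generate_plan and the type_map defined above:
# Apply the type overrides to the generated plan and clear its conflicts
plan = override_column_types(plan, type_map)

# Verify that the overridden columns now carry the destination types we specified
for col in plan['schema_info']['detected_schemas']:
    if col['column_name'] in type_map:
        print(col['column_name'], '->', col['dst_datatype'])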
Because Bauplan plans include a list of detected columns and their inferred types, you can modify dst_datatype to control how each column will be interpreted during import, regardless of what was inferred from the files:
plan['schema_info']['detected_schemas'] = [
    {
        "column_name": "EventTime",
        "src_datatypes": [{"datatype": "string"}],
        "dst_datatype": [{"datatype": "string"}]
    },
    ...
]
Once you’ve updated the plan, you can apply it and import the data:
import bauplan

client = bauplan.Client()
client.apply_table_creation_plan(plan=plan)
client.import_data(
    table='your_table',
    search_uri='s3://your-bucket/*.parquet',
    branch='import_branch'
)