Import data
In this example, we will illustrate how to embed bauplan's data import API into an orchestration flow. The goal is to go quickly from local parquet files to an Iceberg table in the data catalog that can be queried and processed. While doing this, we will show how simple guardrails can be programmed into the flow to make sure that the data import does not fail silently and that faulty data is not imported into the catalog.
The script uses the Python SDK to implement a flow to:
- take data in parquet format from the local folder taxi-2024 and load it into a public S3 bucket
- create a temporary upload branch, separated from the main branch, with the SDK method create_branch from bauplan.Client
- create an import plan for the data as a YAML file, using the method plan_import from bauplan.Client
  - if the plan has no schema conflicts, import the data into an Iceberg table using apply_import from bauplan.Client, merge the import branch into main using merge_branch from bauplan.Client, and finally delete the import branch upon success using delete_branch from bauplan.Client
  - if the import plan does have schema conflicts that require resolving manually in the YAML file, the process will stop and an assert error will be thrown
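Putting these steps together, the core of the import logic looks roughly like the sketch below. The method names (create_branch, plan_import, apply_import, merge_branch, delete_branch) are the ones used in this example; the keyword arguments, branch and table names, and the attribute used for the conflict check are illustrative assumptions and may differ from the exact SDK signatures.

import bauplan

client = bauplan.Client()

# 1. create a temporary import branch, separated from main (branch name is an example)
import_branch = 'my_username.import_taxi_2024'
client.create_branch(import_branch, from_ref='main')

# 2. generate the import plan (a YAML file) for the parquet files uploaded to S3
plan = client.plan_import(
    search_uri='s3://my-bucket/upload-test/*.parquet',  # assumption: where the files were uploaded
    table_name='taxi_jan_feb_2024',
    branch=import_branch,
)

# 3. guardrail: fail loudly if the plan has schema conflicts to resolve manually
# (assumption: the plan object exposes a way to detect conflicts)
assert not plan.has_conflicts, 'Schema conflicts found: fix the YAML plan and retry'

# 4. apply the plan to create the Iceberg table on the import branch
client.apply_import(plan, branch=import_branch)

# 5. merge the import branch into main and clean up
client.merge_branch(import_branch, into_branch='main')
client.delete_branch(import_branch)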
This flow is a somewhat simplified version of a real-world ETL flow, and it can of course be extended with arbitrarily complex logic (e.g. using a scheduler or event-based triggers).
To run the project, we use Prefect as an orchestrator, but nothing hinges on this choice: the same logic can easily be ported to other orchestrators, like Airflow or Kestra.
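The orchestration layer itself stays thin: each step is wrapped as a Prefect task and composed into a flow. A minimal sketch is shown below; the task breakdown and task names are illustrative, and the actual orchestrator.py may structure them differently.

from prefect import flow, task

@task
def upload_to_s3(data_directory: str, bucket: str, s3_directory: str):
    # upload the local parquet files to the public S3 bucket
    pass

@task
def import_into_catalog(user_name: str, table_name: str, bucket: str, s3_directory: str):
    # branch, plan, apply, merge and delete with bauplan.Client (see the sketch above)
    pass

@flow
def import_data_with_bauplan(user_name, table_name, data_directory, bucket, s3_directory):
    upload_to_s3(data_directory, bucket, s3_directory)
    import_into_catalog(user_name, table_name, bucket, s3_directory)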
Setup
First, create a .env file and add your AWS credentials with the appropriate permissions to upload the data to an S3 bucket. Your S3 bucket will have to be public and have the right access policy to allow the data import with bauplan.
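For instance, the .env file can look like the following. S3_BUCKET_NAME is the variable read by the orchestrator script; the AWS credential variable names below are the standard ones and are an assumption about how the project reads them.

AWS_ACCESS_KEY_ID=<your-access-key-id>
AWS_SECRET_ACCESS_KEY=<your-secret-access-key>
S3_BUCKET_NAME=<your-public-bucket-name>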
To set up the right permissions in your AWS console, navigate to your S3 bucket, click on Permissions and make sure the bucket does not block public access. Then navigate to the section Bucket Policy, click on Edit and paste this policy, replacing alpha-hello-bauplan with the name of your bucket:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AddPerm",
            "Effect": "Allow",
            "Principal": {
                "AWS": "*"
            },
            "Action": "s3:GetObject",
            "Resource": "arn:aws:s3:::alpha-hello-bauplan/*"
        },
        {
            "Sid": "AddPerm",
            "Effect": "Allow",
            "Principal": {
                "AWS": "*"
            },
            "Action": "s3:ListBucket",
            "Resource": "arn:aws:s3:::alpha-hello-bauplan"
        }
    ]
}
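If you prefer to script this step instead of using the console, the same policy can be applied with the AWS CLI by saving the JSON above to a local file (here policy.json) and running:

aws s3api put-bucket-policy --bucket <your-bucket-name> --policy file://policy.json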
Then, prepare a Python virtual environment and install the dependencies in the file requirements.txt:
python -m venv venv
source venv/bin/activate
pip install -r requirements.txt
Once you have your environment set up, you can run the project by running the file orchestrator.py and passing your username and the name of the table as arguments in the terminal:
python orchestrator.py <YOUR_USERNAME> taxi_jan_feb_2024
You should see the flow executing in your terminal one task at a time. If the flow runs to completion, it means that the import plan generated by bauplan had no schema conflicts. If you inspect your data catalog now, you will find your new table taxi_jan_feb_2024 in the main branch.
The table can now be inspected and queried as usual.
bauplan branch checkout main
bauplan table get taxi_jan_feb_2024
bauplan query "SELECT * FROM taxi_jan_feb_2024 LIMIT 10"
To see the process fail, you can try to upload files that generate a conflict in the schema of the Iceberg table. To make this simple, we provide such data in the folder taxi-2023. Go to the file orchestrator.py and change the variable data_directory to taxi-2023 in the main method.
In this way, the flow will upload the parquet files in the folder taxi-2023, which raises a conflict in the import plan generated before the upload. This will make the flow crash with an AssertionError. For reference, this is the __main__ entry point of orchestrator.py where the variable is set:
if __name__ == '__main__':
    import sys
    import os

    # check if the correct number of arguments is passed
    if len(sys.argv) != 3:
        print("Missing args! Usage: python orchestrator.py <username> <table_name>")
        sys.exit(1)

    # assign variables from command-line arguments
    username = sys.argv[1]
    table_name = sys.argv[2]
    # this is the folder containing the data, modify as needed
    data_directory = 'taxi-2023'
    # this is the S3 folder where the data will be uploaded, modify as needed
    s3_directory = 'upload-test'
    # make sure the environment variables are set before running the flow
    setup_env()
    # call the function with the command-line arguments
    import_data_with_bauplan(
        user_name=username,
        table_name=table_name,
        data_directory=data_directory,
        bucket=os.getenv('S3_BUCKET_NAME'),
        s3_directory=s3_directory
    )
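The snippet above relies on two functions defined earlier in orchestrator.py: setup_env, which loads the variables from the .env file created during setup, and import_data_with_bauplan, the Prefect flow itself. As a rough sketch (assuming the python-dotenv package is used; the actual implementation may differ), setup_env could be as simple as:

import os
from dotenv import load_dotenv

def setup_env():
    # load AWS credentials and the bucket name from the local .env file
    load_dotenv()
    # fail fast if the bucket the flow needs is not configured
    assert os.getenv('S3_BUCKET_NAME'), 'S3_BUCKET_NAME is not set in .env'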
Summary
With this example we have demonstrated how:
to use bauplan to create and run an import flow that takes data in parquet format and imports it into an Iceberg table in the cloud catalog.
to leverage the branching capabilities of the system to create temporary import branches in the process to protect the main branch of the data lake from errors and faulty uploads.