Import data

In this example, we illustrate how to embed bauplan's data import API into an orchestration flow. The goal is to go quickly from local parquet files to an Iceberg table in the data catalog that can be queried and processed. Along the way, we show how simple guardrails can be programmed into the flow to make sure that the data import does not fail silently and that faulty data is not imported into the catalog.

The script uses the Python SDK to implement a flow to (see the code sketch after this list):

  • take the parquet files in the local folder taxi-2024 and upload them to a public S3 bucket,

  • create a temporary import branch, separate from the main branch, with the SDK method create_branch from bauplan.Client,

  • create an import plan for the data as a YAML file, using the method plan_import from bauplan.Client

  • if the plan has no schema conflicts,
    • import the data into an Iceberg table using apply_import from bauplan.Client

    • merge the import branch into main using merge_branch from bauplan.Client,

    • finally delete the import branch upon success using delete_branch from bauplan.Client.

  • if the import plan does have schema conflicts that must be resolved manually in the YAML file,
    • the process will stop and an AssertionError will be thrown.

data-import-image.png
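Stripped of the orchestration scaffolding, the core of the flow is just the sequence of SDK calls listed above. The snippet below is a minimal sketch: the method names are the ones mentioned in the list, but the keyword arguments, the branch name and the way schema conflicts are surfaced on the plan are assumptions for illustration only, so check the SDK reference (or orchestrator.py) for the exact signatures.

import bauplan

client = bauplan.Client()

# hypothetical names, for illustration only
import_branch = 'my_user.import_taxi_jan_feb_2024'
source_uri = 's3://alpha-hello-bauplan/upload-test/*.parquet'

# 1. create a temporary import branch off main
client.create_branch(import_branch, from_ref='main')

# 2. build the import plan (a YAML document) for the parquet files in S3
plan = client.plan_import(
    search_uri=source_uri,           # argument names are assumptions
    table_name='taxi_jan_feb_2024',
    branch=import_branch,
)

# 3. guardrail: stop here if the plan contains schema conflicts
#    (how conflicts are exposed on the plan object is an assumption)
assert not plan.has_conflicts, 'Schema conflicts found: resolve them in the YAML plan'

# 4. create the Iceberg table in the import branch, promote it to main, clean up
client.apply_import(plan=plan, branch=import_branch)
client.merge_branch(import_branch, into_branch='main')
client.delete_branch(import_branch)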

This flow is a somewhat simplified version of a real-world ETL flow, and it can of course be extended with arbitrarily complex logic (e.g. a scheduler or event-based triggers).

To run the project, we are going to use the following components:

  • Cloud Storage: AWS S3

  • Data runtime: bauplan

  • Orchestrator: Prefect

We use Prefect as the orchestrator, but nothing hinges on this choice: the same logic can easily be ported to other orchestrators, such as Airflow or Kestra.
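The orchestration layer is intentionally thin: each step of the import is a plain Python function wrapped with Prefect's task decorator and composed inside a flow, which is why swapping orchestrators is mostly a matter of changing decorators. The sketch below only shows the pattern; the real import_data_with_bauplan flow in orchestrator.py takes more parameters (see the main block later on this page) and calls the bauplan SDK as described above.

from prefect import flow, task

@task
def upload_to_s3(data_directory: str, bucket: str) -> str:
    # upload the local parquet files and return their S3 prefix (body omitted here)
    return f's3://{bucket}/upload-test'

@task
def run_import(s3_uri: str, table_name: str) -> None:
    # wrap the bauplan branch / plan / apply / merge sequence sketched above
    pass

@flow
def import_data_with_bauplan(table_name: str, data_directory: str, bucket: str) -> None:
    # compose the tasks; Prefect takes care of logging, retries and observability
    s3_uri = upload_to_s3(data_directory, bucket)
    run_import(s3_uri, table_name)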

Setup

First, create a .env file and add your AWS credentials with the appropriate permissions to upload the data to an S3 bucket. The S3 bucket will have to be public and have the right access policy to allow the data import with bauplan. To set up the right permissions in your AWS console, navigate to your S3 bucket, click on Permissions and make sure the bucket does not block public access.

s3_image.png

Then navigate to the Bucket Policy section, click on Edit and paste the following policy, replacing alpha-hello-bauplan with the name of your own bucket:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AddPerm",
            "Effect": "Allow",
            "Principal": {
                "AWS": "*"
            },
            "Action": "s3:GetObject",
            "Resource": "arn:aws:s3:::alpha-hello-bauplan/*"
        },
        {
            "Sid": "AddPerm",
            "Effect": "Allow",
            "Principal": {
                "AWS": "*"
            },
            "Action": "s3:ListBucket",
            "Resource": "arn:aws:s3:::alpha-hello-bauplan"
        }
    ]
}
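As for the .env file itself, something along these lines is enough. S3_BUCKET_NAME is read by the main block of orchestrator.py (shown later); the exact names of the AWS credential variables depend on how setup_env() loads them, so treat this as an assumption:

AWS_ACCESS_KEY_ID=<your-access-key-id>
AWS_SECRET_ACCESS_KEY=<your-secret-access-key>
AWS_DEFAULT_REGION=<your-bucket-region>
S3_BUCKET_NAME=<your-bucket-name>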

Then, prepare a Python virtual environment and install the dependencies in the file requirements.txt:

python -m venv venv
source venv/bin/activate
pip install -r requirements.txt

Once your environment is set up, you can run the project by executing the file orchestrator.py and passing your username and the name of the table as arguments in the terminal.

python orchestrator.py <YOUR_USERNAME> taxi_jan_feb_2024

You should see the flow executing in your terminal one task at a time. If the flow runs to completion, it means that the import plan generated by bauplan had no schema conflicts. If you inspect your data catalog now, you will find the new table taxi_jan_feb_2024 in the main branch. The table can now be inspected and queried as usual.

bauplan branch checkout main
bauplan table get taxi_jan_feb_2024
bauplan query "SELECT * FROM taxi_jan_feb_2024 LIMIT 10"
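The same check can be run from Python with the SDK's query method; a minimal sketch (the ref argument name and the exact return type are assumptions, check the SDK reference):

import bauplan

client = bauplan.Client()
# query the new table on the main branch and print the result
rows = client.query('SELECT * FROM taxi_jan_feb_2024 LIMIT 10', ref='main')
print(rows)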

To see the process fail, you can try to upload files that generate a schema conflict for the Iceberg table. To make this simple, we provide such data in the folder taxi-2023. Go to the file orchestrator.py and change the variable data_directory to taxi-2023 in the main block. This way, the flow will upload the parquet files in the folder taxi-2023, which raise a conflict in the import plan generated before the upload, and the flow will crash with an AssertionError.

if __name__ == '__main__':
    import sys
    import os
    # check if the correct number of arguments is passed
    if len(sys.argv) != 3:
        print("Missing args! Usage: python orchestrator.py <username> <table_name>")
        sys.exit(1)

    # assign variables from command-line arguments
    username = sys.argv[1]
    table_name = sys.argv[2]
    # this is the folder containing the data, modify as needed
    data_directory = 'taxi-2023'
    # this is the S3 folder where the data will be uploaded, modify as needed
    s3_directory = 'upload-test'
    # make sure the environment variables are set before running the flow
    setup_env()

    # call the function with the command-line arguments
    import_data_with_bauplan(
        user_name=username,
        table_name=table_name,
        data_directory=data_directory,
        bucket=os.getenv('S3_BUCKET_NAME'),
        s3_directory=s3_directory
    )

Summary

With this example we have demonstrated how:

  • to use bauplan to create and run an import flow that takes data in parquet format and imports it into an Iceberg table in the cloud catalog.

  • to leverage the branching capabilities of the system to create temporary import branches that protect the main branch of the data lake from errors and faulty uploads.