A simple scheduler

In this example, we will see how to use our Python SDK to integrate with orchestrators and schedulers. The Python SDK makes it extremely easy to meta-program Bauplan from an orchestrator, calling different APIs such as run and query.

Set up

Since we are going to need a few extra libraries, create and activate a virtual environment with the necessary dependencies.

cd 05-schedule
python -m venv venv
source venv/bin/activate
pip install -r requirements.txt

The folder 05-schedule/pipeline contains the pipeline we are going to run on a schedule from Prefect. Make sure that folder has a bauplan_project.yml file with a unique project id, a project name, and a default Python interpreter.

project:
    id: 48ae05e3-6011-4d8e-84ab-ba62dde61ee7
    name: orchestration

defaults:
    python_version: 3.11

The pipeline code is, as usual, in models.py. The code roughly reproduces the pipeline we used in A simple data app: it takes data from the table taxi_fhvhv, joins it with the lookup table taxi_zones, and produces a table top_pickup_location_table.

flowchart LR
    id0[(taxi_fhvhv)] --> id2[models.normalized_taxi_trips]
    id2[models.normalized_taxi_trips] --> id4[models.top_pickup_locations]
    id3[(taxi_zones)] --> id4[models.top_pickup_locations]

Schedule

In this example, we are going to use Prefect, an orchestration and observability framework for handling arbitrarily complex workflows. We chose Prefect as our orchestrator because it is rather simple to use, at least as far as its core functionality is concerned.

However, nothing fundamental hinges on this choice: the material covered in this example can easily be ported to other alternatives like Airflow. For more information on Prefect, see https://docs.prefect.io/latest/.

Flows and tasks

The file scheduler.py contains the Prefect flow orchestrating different calls to the Bauplan runtime.

In this file you will find two functions decorated as @task and one function decorated as @flow. The flow maps out the graph of our orchestration process, providing a number of useful affordances like persistent states, recorded transitions between states, timeouts, and retries. Tasks break more complex flows down into smaller functions. When functions run as tasks, they can be executed concurrently and their return values can be cached.

In our case, the main flow is composed of two tasks:

  • The first task is the function run_pipeline: this function takes as inputs the directory path where the pipeline is specified and a branch name. The function uses our Python SDK to run the pipeline as a Prefect task. By running this task, we run the entire pipeline contained in the subfolder of the project, materializing any artifacts we want in the branch that we passed as a parameter.

  • The second task is the function generate_report, which takes a branch name as an input and uses the SDK method query(...).to_pandas to run a query and retrieve the results as a Pandas DataFrame. The DataFrame is then saved to a local CSV file that we can use as a report.

flowchart LR
    subgraph Data Catalog
        id0[(taxi_fhvhv)]
        id3[(taxi_zones)]
        id8[(top_pickup_location)]
    end
    subgraph Bauplan Run
        id0[(taxi_fhvhv)] --> id2[models.normalized_taxi_trips]
        id2[models.normalized_taxi_trips] --> id4[models.top_pickup_locations]
        id3[(taxi_zones)] --> id4[models.top_pickup_locations] ----> id8[(top_pickup_location)]
    end
    subgraph Prefect Flow
        id5(scheduler.run_pipeline) -----> id6(scheduler.generate_report)
        id5(scheduler.run_pipeline) --> id2[models.normalized_taxi_trips]
        id6(scheduler.generate_report) <--> id8[(top_pickup_location)]
    end
    id6(scheduler.generate_report) ---> id7(report.csv)

Schedule

To put a flow on a schedule we have to turn it into a deployment, which exposes a remote API for interacting with it. In our code, this is how we create a deployment that schedules a run every minute as a cron job.

run_pipeline_with_bauplan.serve(
    name="my-first-bauplan-deployment",
    cron="* * * * *",
    tags=["testing", "tutorial"],
    description="Run a pipeline with Bauplan.",
    version="tutorial/deployments",
    parameters={"bauplan_branch": bauplan_branch},
)

To demonstrate the ease of integration with an orchestrator, you can schedule this pipeline by declaring a local Prefect deployment. We can do that by simply running the scheduler.py script, passing our target branch as an argument.

python scheduler.py <YOUR_USERNAME>.<BRANCH_NAME>
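The script's argument handling can be sketched with a small helper like the one below. This helper (parse_branch_arg and its pattern) is hypothetical, not part of the tutorial's scheduler.py: it simply shows one way to validate the <YOUR_USERNAME>.<BRANCH_NAME> argument up front, so a malformed branch name fails fast instead of at run time.

```python
import re
import sys

# Assumed branch-name shape: <username>.<branch>, as used in this tutorial.
BRANCH_PATTERN = re.compile(r"^[A-Za-z0-9_-]+\.[A-Za-z0-9_./-]+$")


def parse_branch_arg(argv: list) -> str:
    # Expect exactly one positional argument: the target branch.
    if len(argv) != 2:
        raise SystemExit("usage: python scheduler.py <YOUR_USERNAME>.<BRANCH_NAME>")
    branch = argv[1]
    if not BRANCH_PATTERN.match(branch):
        raise ValueError(f"expected <username>.<branch>, got {branch!r}")
    return branch


if __name__ == "__main__":
    # The validated branch would then be passed to serve(...) as
    # parameters={"bauplan_branch": bauplan_branch}.
    bauplan_branch = parse_branch_arg(sys.argv)
    print(f"scheduling runs on branch {bauplan_branch}")
```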

👏👏 Congratulations! You just put a Bauplan pipeline on a schedule.

Note that combining scheduling with branching gives us a very powerful meta-language in which to express the complexity of our workloads.

Visualize the deployment

To see the job running, open another terminal tab, activate your venv, and run:

prefect server start

Then click on the URL displayed to open the Prefect dashboard. Once in the dashboard, go to Deployments and you will see your deployment my-first-bauplan-deployment; wait 60 seconds and your first scheduled run will start.

Check out the dashboard at http://127.0.0.1:4200

This creates a Prefect deployment that runs the pipeline on a cron schedule every minute while a Prefect server is running.

Summary

In this example we have shown how to use Bauplan to integrate with orchestrators and schedulers, using Prefect as an example. By leveraging Bauplan's Python SDK, we demonstrated how to schedule and run a data pipeline.