Estuary via EMR

Stream data from any Estuary source into Bauplan Iceberg tables using the Apache Iceberg materialization connector. Estuary Flow orchestrates Spark jobs on AWS EMR Serverless to merge updates into Bauplan-managed tables in near real-time, including full CDC (Change Data Capture) support for relational databases.

The example walkthrough below uses PostgreSQL via CDC as the source, but the Bauplan destination configuration applies to any Estuary capture connector.

When to use this integration

  • You want to stream operational data (e.g., from a relational database) into your Bauplan lakehouse with low latency.
  • You need CDC (inserts, updates, deletes) reflected in Iceberg tables on Bauplan.
  • You want to decouple your ingestion pipeline from your transformation logic in Bauplan.

Prerequisites

  • A Bauplan account with the OAuth client credentials feature enabled (see Step 1 below — currently in beta).
  • An AWS account where you can create EMR Serverless applications, IAM users and roles, and SSM parameters.
  • An Estuary Flow account.
  • A PostgreSQL instance (RDS or self-managed) with logical replication enabled.

Step 1. Create a Bauplan OAuth client

Estuary authenticates against the Bauplan Iceberg REST catalog using OAuth 2.0 client credentials. This feature is currently in beta: contact us to enable it for your account.

Once enabled, navigate to your Bauplan account settings and create a new OAuth application. Specify a name, description, and token lifetime. The system will generate a Client ID and a Client Secret: keep these; you will need them when configuring the Estuary destination.

The Bauplan Iceberg REST endpoint is:

https://api.use1.aprod.bauplanlabs.com/iceberg

As with other catalog integrations, you can target a specific branch by appending the branch path to the endpoint:

https://api.use1.aprod.bauplanlabs.com/iceberg/<branch_name>

By default, writes go to main.
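
Before configuring Estuary, you can sanity-check the credentials by requesting a token directly. A minimal sketch, assuming the token path resolves relative to the catalog URL (the same api/v1/oauth/tokens path used in Step 5) and that <client_id> and <client_secret> are the values generated above:

```shell
# Request a bearer token via the OAuth 2.0 client credentials grant
curl -s -X POST \
  "https://api.use1.aprod.bauplanlabs.com/iceberg/api/v1/oauth/tokens" \
  -H "Content-Type: application/x-www-form-urlencoded" \
  -d "grant_type=client_credentials" \
  -d "client_id=<client_id>" \
  -d "client_secret=<client_secret>"
# A JSON response containing an access_token confirms the client is active
```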


Step 2. Prepare your PostgreSQL source

To enable CDC, Estuary requires a dedicated PostgreSQL user, a replication publication, and a watermarks table for backfill consistency. Run the following on your database:

-- Dedicated user
CREATE USER <estuary_user> WITH LOGIN PASSWORD '<your_password>';
GRANT rds_replication TO <estuary_user>; -- RDS only; omit for self-managed PG
GRANT CONNECT ON DATABASE <your_db> TO <estuary_user>;
GRANT USAGE ON SCHEMA public TO <estuary_user>;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO <estuary_user>;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO <estuary_user>;

-- Watermarks table (required for backfill)
CREATE TABLE IF NOT EXISTS public.flow_watermarks (slot TEXT PRIMARY KEY, watermark TEXT);
GRANT ALL PRIVILEGES ON TABLE public.flow_watermarks TO <estuary_user>;

-- Publication
CREATE PUBLICATION flow_publication;
ALTER PUBLICATION flow_publication SET (publish_via_partition_root = true);
ALTER PUBLICATION flow_publication ADD TABLE
  public.flow_watermarks,
  public.your_table_1,
  public.your_table_2;
-- add additional tables here

If your database is in a private subnet (e.g., RDS in an isolated VPC), you will need to expose it via an SSH bastion host with a static public IP. Estuary provides a list of source IPs to allowlist. Refer to the Estuary PostgreSQL connector docs for the SSH tunnel configuration.
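
Before moving on, it can be worth confirming that logical replication is actually available and that the publication covers the intended tables. A quick check using standard PostgreSQL catalogs (on RDS, wal_level is controlled by the rds.logical_replication parameter group setting):

```sql
-- Must return 'logical' for CDC to work
SHOW wal_level;

-- Confirm the publication covers the intended tables
SELECT * FROM pg_publication_tables WHERE pubname = 'flow_publication';
```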


Step 3. Configure AWS for EMR Serverless

Estuary uses AWS EMR Serverless to run the Spark jobs that merge data into your Iceberg tables. You need four AWS resources.

3a. EMR Serverless Application

Create an EMR Serverless application with the Spark runtime, release version emr-7.12.0, architecture x86_64. Enable Automatically start application on job submission. Configure VPC, subnets, and security groups so that EMR jobs can reach the Bauplan catalog endpoint over the internet. Note the Application ID: you will need it in Step 5.
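
The same application can also be created from the command line. A sketch using the AWS CLI, where the subnet IDs, security group IDs, and region are placeholders for your own networking:

```shell
aws emr-serverless create-application \
  --name estuary-bauplan \
  --type SPARK \
  --release-label emr-7.12.0 \
  --architecture X86_64 \
  --auto-start-configuration enabled=true \
  --network-configuration "subnetIds=<subnet-ids>,securityGroupIds=<sg-ids>" \
  --region <region>
# The response includes the applicationId needed in Step 5
```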

3b. Staging S3 bucket

Create (or designate) an S3 bucket in the same region as the EMR application. Estuary stages Parquet and CSV data files, as well as the Python scripts executed by Spark jobs, in this bucket before merging them into Iceberg tables. We recommend using a dedicated prefix within your Bauplan data lake bucket (e.g. estuary-staging/).
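
If you are creating a fresh bucket, a sketch with the AWS CLI (bucket name and region are placeholders):

```shell
aws s3 mb s3://<your-bucket> --region <region>
# The estuary-staging/ prefix does not need to be pre-created;
# Estuary and the Spark jobs will write objects under it directly
```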

3c. IAM execution role

This role is assumed by EMR Serverless at job runtime, referenced below as <execution-role>.

Permissions policy:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "S3ListBucket",
      "Effect": "Allow",
      "Action": "s3:ListBucket",
      "Resource": "arn:aws:s3:::<your-bucket>"
    },
    {
      "Sid": "S3ReadAll",
      "Effect": "Allow",
      "Action": "s3:GetObject",
      "Resource": "arn:aws:s3:::<your-bucket>/*"
    },
    {
      "Sid": "S3WriteIceberg",
      "Effect": "Allow",
      "Action": [
        "s3:PutObject",
        "s3:DeleteObject"
      ],
      "Resource": "arn:aws:s3:::<your-bucket>/iceberg/*"
    },
    {
      "Sid": "S3WriteStaging",
      "Effect": "Allow",
      "Action": [
        "s3:PutObject",
        "s3:DeleteObject"
      ],
      "Resource": "arn:aws:s3:::<your-bucket>/estuary-staging/*"
    },
    {
      "Sid": "SSMReadCatalogCredentials",
      "Effect": "Allow",
      "Action": [
        "ssm:GetParameter",
        "ssm:GetParameters"
      ],
      "Resource": "arn:aws:ssm:<region>:<account-id>:parameter<ssm-prefix>*"
    }
  ]
}

Trust policy:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "emr-serverless.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
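
With the two JSON documents above saved locally (the file names below are assumptions), the role can be created from the CLI like this:

```shell
# Create the role with the trust policy so EMR Serverless can assume it
aws iam create-role \
  --role-name <execution-role> \
  --assume-role-policy-document file://trust-policy.json

# Attach the permissions policy inline
aws iam put-role-policy \
  --role-name <execution-role> \
  --policy-name estuary-emr-permissions \
  --policy-document file://permissions-policy.json
```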

3d. IAM user

Estuary uses this user to submit jobs and manage SSM secrets, referenced below as <iam-user>.

Attach the following policy:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "EMRServerlessManageJobs",
      "Effect": "Allow",
      "Action": [
        "emr-serverless:ListApplications",
        "emr-serverless:StartJobRun",
        "emr-serverless:GetJobRun",
        "emr-serverless:ListJobRuns",
        "emr-serverless:CancelJobRun",
        "emr-serverless:GetApplication",
        "emr-serverless:StartApplication"
      ],
      "Resource": [
        "arn:aws:emr-serverless:<region>:<account-id>:/applications/<emr-application-id>",
        "arn:aws:emr-serverless:<region>:<account-id>:/applications/<emr-application-id>/jobruns/*"
      ]
    },
    {
      "Sid": "IAMPassExecutionRole",
      "Effect": "Allow",
      "Action": "iam:PassRole",
      "Resource": "arn:aws:iam::<account-id>:role/<execution-role>",
      "Condition": {
        "StringEquals": {
          "iam:PassedToService": "emr-serverless.amazonaws.com"
        }
      }
    },
    {
      "Sid": "S3ListBucket",
      "Effect": "Allow",
      "Action": "s3:ListBucket",
      "Resource": "arn:aws:s3:::<your-bucket>"
    },
    {
      "Sid": "S3ReadWriteStaging",
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:PutObject",
        "s3:DeleteObject"
      ],
      "Resource": "arn:aws:s3:::<your-bucket>/estuary-staging/*"
    },
    {
      "Sid": "S3ReadWriteIceberg",
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:PutObject",
        "s3:DeleteObject"
      ],
      "Resource": "arn:aws:s3:::<your-bucket>/iceberg/*"
    },
    {
      "Sid": "SSMManageCatalogCredentials",
      "Effect": "Allow",
      "Action": [
        "ssm:PutParameter",
        "ssm:GetParameter",
        "ssm:GetParameters",
        "ssm:DeleteParameter"
      ],
      "Resource": "arn:aws:ssm:<region>:<account-id>:parameter<ssm-prefix>*"
    }
  ]
}

Generate an Access Key ID (<aws-access-key-id>) and Secret Access Key (<aws-secret-access-key>) for this user; you will need them in Step 5.
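
Equivalently, from the CLI (the policy file name below is an assumption):

```shell
aws iam create-user --user-name <iam-user>

aws iam put-user-policy \
  --user-name <iam-user> \
  --policy-name estuary-emr-submit \
  --policy-document file://user-policy.json

# Returns the AccessKeyId and SecretAccessKey needed in Step 5
aws iam create-access-key --user-name <iam-user>
```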


Step 4. Configure the Estuary capture (PostgreSQL source)

In the Estuary Flow dashboard, create a new PostgreSQL capture connector. Supply the database hostname, port, database name, user credentials, the publication name (flow_publication), and the replication slot name. If using an SSH tunnel, configure it under Advanced Options with the bastion host address, port, and private key.

After a successful connection test, Estuary will enumerate your tables. Select the collections you want to materialize.

Refer to the Estuary PostgreSQL capture docs for the complete setup.
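
Once the capture is published, you can confirm on the database side that replication is active. A quick check against the standard PostgreSQL catalog:

```sql
-- An active replication slot should appear for the capture
SELECT slot_name, plugin, active FROM pg_replication_slots;
```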


Step 5. Configure the Estuary materialization (Bauplan / Iceberg destination)

In Estuary, create a new Apache Iceberg materialization and configure it as follows.

Endpoint (Bauplan catalog):

  • URL: https://api.use1.aprod.bauplanlabs.com/iceberg — to target a specific branch: https://api.use1.aprod.bauplanlabs.com/iceberg/<branch_name>
  • Warehouse: default
  • Namespace: your target namespace in Bauplan
  • Base Location: s3://<your-bauplan-bucket>/iceberg
  • Catalog Authentication: OAuth 2.0 Client Credentials
  • OAuth 2.0 Server URI: api/v1/oauth/tokens
  • Catalog Credential: <client_id>:<client_secret> (from Step 1)
  • Scope: (leave empty)

Compute (EMR Serverless):

  • Region: AWS region of your EMR application
  • Application ID: <emr-application-id>
  • Execution Role ARN: arn:aws:iam::<account-id>:role/<execution-role>
  • Bucket: <your-bauplan-bucket>
  • EMR Authentication: AWS Access Key — <aws-access-key-id> / <aws-secret-access-key>
  • Systems Manager Prefix: <ssm-prefix> (must start and end with /, e.g. /estuary/)

Add your collections as bindings, mapping each Estuary collection to its target Iceberg table name. Set the namespace to match your target Bauplan namespace if you want to avoid the default. Save and publish the connector.

If you previously wrote data under a different namespace and need to move collections, you will need to delete the old tables, update the namespace in the binding configuration, and trigger a backfill for the affected collections.
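
Once data is flowing, you can verify the materialized tables from Bauplan itself. A sketch assuming the Bauplan Python SDK with an authenticated local profile; the namespace and table names are placeholders:

```python
import bauplan

client = bauplan.Client()

# Inspect the materialized table on main (or the branch you targeted)
result = client.query(
    "SELECT COUNT(*) AS row_count FROM <namespace>.<your_table_1>",
    ref="main",
)
print(result.to_pandas())
```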