Replicating DynamoDB to Apache Pinot
Hey there, data enthusiast! Ready to embark on an exciting journey of real-time data replication? Buckle up, because we’re about to turn your DynamoDB data into a high-speed, analytics-ready powerhouse with Apache Pinot!
Introduction
Imagine having the robust storage capabilities of DynamoDB combined with the lightning-fast analytics of Apache Pinot. Sounds like a data dream team, right? Well, you’re about to make that dream a reality!
What You’ll Need
- AWS account (with DynamoDB and Kinesis access)
- Apache Pinot cluster
- Your favorite code editor
- A cup of coffee (or tea, we don’t judge!)
Setting Up the Replication Pipeline
Step 1: Create a DynamoDB Table
Let’s start by creating our source of truth – a DynamoDB table.
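If you prefer code over console clicks, here is a minimal sketch using boto3. The table name pinot-ingestion-demo and the key attributes meetup_id and mtime come from the sample events shown later in this post. Treating meetup_id as the partition key and mtime as the sort key, the on-demand billing mode, the us-east-1 region, and the helper names are all assumptions for illustration.

```python
def build_create_table_request(table_name="pinot-ingestion-demo"):
    """Return keyword arguments for DynamoDB's CreateTable call."""
    return {
        "TableName": table_name,
        # Only key attributes are declared; other attributes are schemaless.
        "AttributeDefinitions": [
            {"AttributeName": "meetup_id", "AttributeType": "S"},
            {"AttributeName": "mtime", "AttributeType": "N"},
        ],
        # Assumption: meetup_id is the partition key, mtime the sort key.
        "KeySchema": [
            {"AttributeName": "meetup_id", "KeyType": "HASH"},
            {"AttributeName": "mtime", "KeyType": "RANGE"},
        ],
        "BillingMode": "PAY_PER_REQUEST",
    }


def create_table(region="us-east-1"):
    """Create the table and block until it is ACTIVE (needs AWS credentials)."""
    import boto3  # imported lazily so the request builder works without boto3

    request = build_create_table_request()
    client = boto3.client("dynamodb", region_name=region)
    client.create_table(**request)
    client.get_waiter("table_exists").wait(TableName=request["TableName"])
```

Calling create_table() talks to AWS, so run it only with credentials configured. The request builder is kept separate so you can inspect the payload first.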

Step 2: Create a Kinesis Data Stream
Time to create a highway for our data: a Kinesis data stream where DynamoDB will push its change data capture (CDC) events.
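A minimal boto3 sketch for this step follows. The stream name pinot-ingestion-stream matches the table config used later in this post; the single shard, the region, and the helper names are assumptions.

```python
def build_create_stream_request(stream_name="pinot-ingestion-stream", shard_count=1):
    """Return keyword arguments for Kinesis' CreateStream call."""
    if shard_count < 1:
        raise ValueError("shard_count must be at least 1")
    return {"StreamName": stream_name, "ShardCount": shard_count}


def create_stream(region="us-east-1"):
    """Create the stream, wait until it exists, and return its ARN."""
    import boto3  # imported lazily so the request builder works without boto3

    request = build_create_stream_request()
    client = boto3.client("kinesis", region_name=region)
    client.create_stream(**request)
    client.get_waiter("stream_exists").wait(StreamName=request["StreamName"])
    summary = client.describe_stream_summary(StreamName=request["StreamName"])
    return summary["StreamDescriptionSummary"]["StreamARN"]
```

The returned ARN is what DynamoDB needs in the next step.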

Step 3: Enable the DynamoDB-to-Kinesis Stream
Now, let’s turn on the data faucet by connecting DynamoDB to Kinesis.
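Here is a hedged boto3 sketch of that connection. It assumes you already have the Kinesis stream ARN; the helper names and the default region are illustrative.

```python
def build_enable_streaming_request(table_name, stream_arn):
    """Return keyword arguments for EnableKinesisStreamingDestination."""
    # Light sanity check so a table ARN is not passed by mistake.
    if ":kinesis:" not in stream_arn:
        raise ValueError(f"not a Kinesis stream ARN: {stream_arn}")
    return {"TableName": table_name, "StreamArn": stream_arn}


def enable_streaming(table_name, stream_arn, region="us-east-1"):
    """Point the table's change stream at Kinesis and return the status."""
    import boto3  # imported lazily so the request builder works without boto3

    client = boto3.client("dynamodb", region_name=region)
    response = client.enable_kinesis_streaming_destination(
        **build_enable_streaming_request(table_name, stream_arn)
    )
    return response["DestinationStatus"]
```

From this point on, every write to the table should show up as an event on the stream.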

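Step 4: Create the Pinot Schema and Table
With the stream flowing, Pinot needs a schema and a table config. First, the schema (schema.json):
{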
"schemaName": "pinot_dynamo_ingestion",
"dimensionFieldSpecs": [
{
"name": "eventName",
"dataType": "STRING",
"notNull": false
},
{
"name": "venue_name",
"dataType": "STRING",
"notNull": false
},
{
"name": "meetup_name",
"dataType": "STRING",
"notNull": false
},
{
"name": "meetup_id",
"dataType": "STRING",
"notNull": false
},
{
"name": "group_city",
"dataType": "STRING",
"notNull": false
},
{
"name": "group_country",
"dataType": "STRING",
"notNull": false
},
{
"name": "group_id",
"dataType": "LONG",
"notNull": false
},
{
"name": "group_name",
"dataType": "STRING",
"notNull": false
},
{
"name": "group_lat",
"dataType": "DOUBLE",
"notNull": false
},
{
"name": "group_lon",
"dataType": "DOUBLE",
"notNull": false
},
{
"name": "location",
"dataType": "BYTES",
"transformFunction": "toSphericalGeography(stPoint(group_lon,group_lat))",
"notNull": false
},
{
"name": "is_delete",
"dataType": "BOOLEAN",
"transformFunction": "strcmp(eventName, 'REMOVE') = 0",
"notNull": false
}
],
"metricFieldSpecs": [
{
"name": "rsvp_count",
"dataType": "INT",
"notNull": false
}
],
"dateTimeFieldSpecs": [
{
"name": "mtime",
"dataType": "TIMESTAMP",
"notNull": false,
"format": "TIMESTAMP",
"granularity": "1:MILLISECONDS"
},
{
"name": "ApproximateCreationDateTime",
"dataType": "TIMESTAMP",
"notNull": false,
"format": "TIMESTAMP",
"granularity": "1:MILLISECONDS"
}
],
"primaryKeyColumns": ["meetup_id"]
}
Next, the table config (table_config.json):
{
"tableName": "pinot_dynamo_ingestion",
"tableType": "REALTIME",
"segmentsConfig": {
"timeColumnName": "mtime",
"timeType": "MILLISECONDS",
"retentionTimeUnit": "DAYS",
"retentionTimeValue": "7",
"schemaName": "pinot_dynamo_ingestion",
"replication": "1"
},
"tenants": {},
"tableIndexConfig": {
"loadMode": "MMAP",
"streamConfigs": {
"streamType": "kinesis",
"stream.kinesis.topic.name": "pinot-ingestion-stream",
"region": "us-east-1",
"accessKey": "XXXX",
"secretKey": "XXXX",
"shardIteratorType": "AFTER_SEQUENCE_NUMBER",
"stream.kinesis.consumer.type": "lowlevel",
"stream.kinesis.fetch.timeout.millis": "120000",
"stream.kinesis.decoder.class.name": "ai.startree.pinot.plugin.inputformat.dynamodb.DynamoDbMessageDecoder",
"stream.kinesis.decoder.prop.dynamodb.timeColumnName": "created_at_timestamp",
"stream.kinesis.decoder.prop.dynamodb.deleteColumnName": "is_deleted",
"stream.kinesis.decoder.prop.dynamodb.envelope.decoder.class.name": "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder",
"stream.kinesis.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kinesis.KinesisConsumerFactory",
"realtime.segment.flush.threshold.rows": "1000000",
"realtime.segment.flush.threshold.time": "6h"
},
"nullHandlingEnabled": true
},
"routing": {
"segmentPrunerTypes": ["time"],
"instanceSelectorType": "strictReplicaGroup"
},
"ingestionConfig": {
"transformConfigs": [
]
},
"upsertConfig": {
"mode": "PARTIAL",
"deleteRecordColumn": "is_delete",
"partialUpsertStrategies":{},
"comparisonColumns": ["ApproximateCreationDateTime"]
},
"metadata": {
"customConfigs": {}
}
}
Why do we have so many configurations?
Let’s try to understand which of these configs are necessary. When you enable CDC on a DynamoDB table, it starts sending records to Kinesis in the following format. The three samples below show an INSERT, a MODIFY, and a REMOVE event:
{
"awsRegion": "us-east-2",
"eventID": "e5c2d473-43e0-4dc3-bf28-cdcb575a83aa",
"eventName": "INSERT",
"userIdentity": null,
"recordFormat": "application/json",
"tableName": "pinot-ingestion-demo",
"dynamodb": {
"ApproximateCreationDateTime": 1719039879869591,
"Keys": {
"event_id": {
"S": "badbe259-82d3-4869-b197-e935b0e942f2"
},
"mtime": {
"N": "1719039879651"
}
},
"NewImage": {
"group_city": {
"S": "New York"
},
"location": {
"B": "Y0d4aFkyVm9iMnhrWlhKZllubDBaWE09"
},
"group_lon": {
"N": "-97.169068"
},
"event_time": {
"S": "2024-07-10T12:34:39.651036"
},
"group_id": {
"N": "8032"
},
"mtime": {
"N": "1719039879651"
},
"rsvp_count": {
"N": "45"
},
"group_country": {
"S": "UK"
},
"group_lat": {
"N": "-41.13523"
},
"event_id": {
"S": "badbe259-82d3-4869-b197-e935b0e942f2"
},
"venue_name": {
"S": "Venue 1"
},
"group_name": {
"S": "Group 10"
},
"event_name": {
"S": "Event 3"
}
},
"SizeBytes": 313,
"ApproximateCreationDateTimePrecision": "MICROSECOND"
},
"eventSource": "aws:dynamodb"
}
{
"awsRegion": "us-east-1",
"eventID": "1020b6ed-fc46-4767-8819-55c440041264",
"eventName": "MODIFY",
"userIdentity": null,
"recordFormat": "application/json",
"tableName": "pinot-ingestion-demo",
"dynamodb": {
"ApproximateCreationDateTime": 1719082124026,
"Keys": {
"mtime": {
"N": "1719080907442"
},
"meetup_id": {
"S": "4a60c849-d475-4ad5-adbc-776cbf7ca5df"
}
},
"NewImage": {
"group_city": {
"S": "New York"
},
"mtime": {
"N": "1719080907442"
},
"group_lon": {
"N": "74.891677"
},
"rsvp_count": {
"N": "23"
},
"group_lat": {
"N": "-59.493753"
},
"group_country": {
"S": "UK"
},
"meetup_id": {
"S": "4a60c849-d475-4ad5-adbc-776cbf7ca5df"
},
"venue_name": {
"S": "Updated Venue 3"
},
"group_id": {
"N": "4673"
},
"meetup_name": {
"S": "Meetup 6"
},
"group_name": {
"S": "Group 8"
}
},
"OldImage": {
"group_city": {
"S": "New York"
},
"mtime": {
"N": "1719080907442"
},
"group_lon": {
"N": "-130.035739"
},
"rsvp_count": {
"N": "22"
},
"group_lat": {
"N": "66.888498"
},
"group_country": {
"S": "UK"
},
"meetup_id": {
"S": "4a60c849-d475-4ad5-adbc-776cbf7ca5df"
},
"venue_name": {
"S": "Updated Venue 2"
},
"group_id": {
"N": "4673"
},
"meetup_name": {
"S": "Meetup 6"
},
"group_name": {
"S": "Group 8"
}
},
"SizeBytes": 467
},
"eventSource": "aws:dynamodb"
}
{
"awsRegion": "us-east-1",
"eventID": "78b55ff0-82ab-4fec-9f03-6312c5d2f6eb",
"eventName": "REMOVE",
"userIdentity": null,
"recordFormat": "application/json",
"tableName": "pinot-ingestion-demo",
"dynamodb": {
"ApproximateCreationDateTime": 1719081613486,
"Keys": {
"mtime": {
"N": "1719080908912"
},
"meetup_id": {
"S": "735fd994-2071-4145-8f35-7e69e537aaba"
}
},
"OldImage": {
"group_city": {
"S": "London"
},
"mtime": {
"N": "1719080908912"
},
"rsvp_count": {
"N": "26"
},
"group_lon": {
"N": "-19.773637"
},
"group_country": {
"S": "Germany"
},
"group_lat": {
"N": "23.60754"
},
"meetup_id": {
"S": "735fd994-2071-4145-8f35-7e69e537aaba"
},
"venue_name": {
"S": "Venue 2"
},
"group_id": {
"N": "8524"
},
"meetup_name": {
"S": "Meetup 2"
},
"group_name": {
"S": "Group 6"
}
},
"SizeBytes": 257
},
"eventSource": "aws:dynamodb"
}
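To see why a dedicated decoder is needed, it helps to flatten one of these events by hand. The sketch below is a rough illustration of the kind of unwrapping involved, not the plugin's actual implementation. The function names, the fallback to OldImage for REMOVE events, and the microsecond-to-millisecond normalization are assumptions.

```python
def unwrap(attr):
    """Convert one DynamoDB-typed attribute, e.g. {"N": "45"}, to a plain value."""
    (type_tag, value), = attr.items()
    if type_tag == "N":
        # DynamoDB sends numbers as strings; prefer int, fall back to float.
        try:
            return int(value)
        except ValueError:
            return float(value)
    if type_tag == "NULL":
        return None
    return value  # "S", "B", "BOOL" and other types are passed through


def flatten(record):
    """Turn a DynamoDB CDC event into a flat row keyed by column name."""
    body = record["dynamodb"]
    # REMOVE events carry no NewImage, so fall back to the old one.
    image = body.get("NewImage") or body.get("OldImage") or {}
    row = {name: unwrap(attr) for name, attr in image.items()}
    row["eventName"] = record["eventName"]
    created = body["ApproximateCreationDateTime"]
    if body.get("ApproximateCreationDateTimePrecision") == "MICROSECOND":
        created //= 1000  # normalize to milliseconds
    row["ApproximateCreationDateTime"] = created
    return row
```

Feeding the REMOVE sample above through flatten yields a row with meetup_id, rsvp_count, and the other attributes as plain values, plus eventName set to REMOVE.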
Decoder Configuration
To help Pinot understand the DynamoDB data format, we need to add decoder configs to our table:
"stream.kinesis.decoder.class.name": "ai.startree.pinot.plugin.inputformat.dynamodb.DynamoDbMessageDecoder",
"stream.kinesis.decoder.prop.dynamodb.timeColumnName": "created_at_timestamp",
"stream.kinesis.decoder.prop.dynamodb.deleteColumnName": "is_deleted",
"stream.kinesis.decoder.prop.dynamodb.envelope.decoder.class.name": "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder"
The decoder.class.name property specifies our primary decoder. The timeColumnName property specifies the column that should be filled with the ApproximateCreationDateTime from the DynamoDB JSON record. The deleteColumnName property specifies the column that should be set to true when we receive a REMOVE record from DynamoDB. Finally, envelope.decoder.class.name specifies the vanilla decoder used to parse the raw message. Since DynamoDB messages arrive in JSON format, we specify the JSONMessageDecoder here.
Upserts Configuration
To handle updates properly, you need to enable upserts in Pinot. This is done in the upsertConfig section of the table configuration:
"upsertConfig": {
"mode": "PARTIAL",
"deleteRecordColumn": "is_delete",
"partialUpsertStrategies": {},
"comparisonColumns": ["ApproximateCreationDateTime"]
}
Key points:
- mode: Set to “PARTIAL” for partial updates.
- deleteRecordColumn: Specifies the column that indicates whether a record should be deleted.
- comparisonColumns: Uses ApproximateCreationDateTime to determine the order of changes.
Derived Column for Deletions
A new derived column, is_delete, is created in the schema to signify whether a key needs to be removed from the upsert metadata:
{
"name": "is_delete",
"dataType": "BOOLEAN",
"transformFunction": "strcmp(eventName, 'REMOVE') = 0",
"notNull": false
}
This column is set to true when the eventName in the DynamoDB stream event is “REMOVE”.
Handling Different Event Types
The configuration handles different event types as follows:
- INSERT: New records are added to Pinot.
- MODIFY: Existing records are updated using the upsert configuration.
- REMOVE: Records are marked for deletion using the is_delete column.
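The event handling above can be sketched as a tiny in-memory model. This is a simplification for intuition only, not Pinot's implementation: it assumes full-row replacement (ignoring partial-merge strategies), keys rows by meetup_id, and lets an event win when its ApproximateCreationDateTime is greater than or equal to the current one. The function name is hypothetical.

```python
def apply_events(events):
    """Replay flattened CDC rows and return the rows a default query would see."""
    latest = {}  # primary key -> row with the highest comparison value so far
    for row in events:
        key = row["meetup_id"]
        current = latest.get(key)
        newer = (
            current is None
            or row["ApproximateCreationDateTime"]
            >= current["ApproximateCreationDateTime"]
        )
        if newer:
            latest[key] = row  # late-arriving older events are ignored
    # Keys whose latest event is a REMOVE are hidden from query results.
    return {k: r for k, r in latest.items() if r["eventName"] != "REMOVE"}


events = [
    {"meetup_id": "a", "eventName": "INSERT", "rsvp_count": 22, "ApproximateCreationDateTime": 1},
    {"meetup_id": "a", "eventName": "MODIFY", "rsvp_count": 23, "ApproximateCreationDateTime": 3},
    {"meetup_id": "a", "eventName": "MODIFY", "rsvp_count": 99, "ApproximateCreationDateTime": 2},
    {"meetup_id": "b", "eventName": "INSERT", "rsvp_count": 26, "ApproximateCreationDateTime": 1},
    {"meetup_id": "b", "eventName": "REMOVE", "rsvp_count": 26, "ApproximateCreationDateTime": 2},
]
visible = apply_events(events)
```

In this run, key a resolves to the rsvp_count of 23 even though an older MODIFY arrived afterwards, and key b disappears because its latest event is a REMOVE.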
ApproximateCreationDateTime Usage
The ApproximateCreationDateTime from the DynamoDB payload is used in the comparisonColumns of the upsert configuration. This ensures that changes are applied in the correct order, as it reflects when each change was made in DynamoDB.
"comparisonColumns": ["ApproximateCreationDateTime"]
A corresponding column is added to the schema:
{
"name": "ApproximateCreationDateTime",
"dataType": "TIMESTAMP",
"notNull": false,
"format": "TIMESTAMP",
"granularity": "1:MILLISECONDS"
}
With schema.json and table_config.json ready, create the table in Pinot:
./pinot-admin.sh AddTable \
-schemaFile schema.json \
-tableConfigFile table_config.json \
-exec
Insert, Update, Delete!
Now that we’ve set the stage, let’s watch our data perform!
Insert
Let’s add some data to our DynamoDB table:
python publish_data.py
Publish item with meetup_id: cc3fbaca-445c-4ebb-8f60-7b2a5eab3530 and mtime = 1719124016204
Publish item with meetup_id: c1731e4b-6726-40f5-8c8f-b34bc34a0744 and mtime = 1719124017046
Publish item with meetup_id: 9c4b139b-26aa-4d45-8efe-5d1ad2d4e1e3 and mtime = 1719124017265
Publish item with meetup_id: cfcb025b-7547-4002-9b52-95ad0b0a4ac8 and mtime = 1719124017488
Publish item with meetup_id: be08dc25-42a4-4fd3-8b6f-0bdaaeb4caf7 and mtime = 1719124017787
Publish item with meetup_id: bdc86183-a63a-4bc2-9901-22be89eb9774 and mtime = 1719124018010
Publish item with meetup_id: 0b27b577-dce5-43d1-b23f-8372b1882571 and mtime = 1719124018232
Publish item with meetup_id: 4d959048-796a-464f-ab6c-5afe174ec015 and mtime = 1719124018506
Publish item with meetup_id: 8a2e679c-3044-4526-bd9d-a2de908dbc62 and mtime = 1719124018723
Publish item with meetup_id: 98b3c314-75c2-420b-a154-d891a43e6efa and mtime = 1719124018945
DynamoDB insertion completed.
Voila! Check your Pinot table, and you’ll see these rows magically appear!
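The publish_data.py script itself is not shown in this post, so here is a hypothetical sketch of what such a script could look like. The attribute names follow the sample events above; the value ranges, function names, table name default, and region are assumptions.

```python
import random
import time
import uuid


def build_item(now_ms=None):
    """Build one meetup item in DynamoDB's typed attribute format."""
    now_ms = int(time.time() * 1000) if now_ms is None else now_ms
    return {
        "meetup_id": {"S": str(uuid.uuid4())},
        "mtime": {"N": str(now_ms)},
        "meetup_name": {"S": f"Meetup {random.randint(1, 10)}"},
        "venue_name": {"S": f"Venue {random.randint(1, 5)}"},
        "group_id": {"N": str(random.randint(1000, 9999))},
        "group_name": {"S": f"Group {random.randint(1, 10)}"},
        "group_city": {"S": random.choice(["New York", "London"])},
        "group_country": {"S": random.choice(["UK", "Germany"])},
        "group_lat": {"N": str(round(random.uniform(-90, 90), 6))},
        "group_lon": {"N": str(round(random.uniform(-180, 180), 6))},
        "rsvp_count": {"N": str(random.randint(1, 100))},
    }


def publish(count=10, table_name="pinot-ingestion-demo", region="us-east-1"):
    """Write `count` random items to DynamoDB (needs AWS credentials)."""
    import boto3  # imported lazily so build_item works without boto3

    client = boto3.client("dynamodb", region_name=region)
    for _ in range(count):
        item = build_item()
        client.put_item(TableName=table_name, Item=item)
        print(
            f"Publish item with meetup_id: {item['meetup_id']['S']} "
            f"and mtime = {item['mtime']['N']}"
        )
    print("DynamoDB insertion completed.")
```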

Update
Next, let’s change a few attributes on the first item:
python update_data.py --meetup_id cc3fbaca-445c-4ebb-8f60-7b2a5eab3530 --mtime 1719124016204
Updated item in DynamoDB with meetup_id: cc3fbaca-445c-4ebb-8f60-7b2a5eab3530 and mtime: 1719124016204
UpdatedAttributes: {'group_lon': Decimal('28.454082'), 'rsvp_count': Decimal('84'), 'venue_name': 'Updated Venue 1', 'group_lat': Decimal('11.194998')}


Delete
Finally, let’s remove that item:
python delete_data.py --meetup_id cc3fbaca-445c-4ebb-8f60-7b2a5eab3530 --mtime 1719124016204
Deleted item with meetup_id: cc3fbaca-445c-4ebb-8f60-7b2a5eab3530 and mtime: 1719124016204
Check Pinot – the row has vanished into thin air!
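The update and delete scripts are not listed either. As a hypothetical sketch, both boil down to one DynamoDB call addressed by the full key (meetup_id plus mtime). The updated attributes, function names, and table name default below are assumptions.

```python
def key_for(meetup_id, mtime):
    """Full primary key in DynamoDB's typed attribute format."""
    return {"meetup_id": {"S": meetup_id}, "mtime": {"N": str(mtime)}}


def build_update_request(meetup_id, mtime, rsvp_count, venue_name,
                         table_name="pinot-ingestion-demo"):
    """Keyword arguments for UpdateItem; emits a MODIFY event on the stream."""
    return {
        "TableName": table_name,
        "Key": key_for(meetup_id, mtime),
        "UpdateExpression": "SET rsvp_count = :r, venue_name = :v",
        "ExpressionAttributeValues": {
            ":r": {"N": str(rsvp_count)},
            ":v": {"S": venue_name},
        },
        "ReturnValues": "UPDATED_NEW",
    }


def build_delete_request(meetup_id, mtime, table_name="pinot-ingestion-demo"):
    """Keyword arguments for DeleteItem; emits a REMOVE event on the stream."""
    return {"TableName": table_name, "Key": key_for(meetup_id, mtime)}


def run(request, operation, region="us-east-1"):
    """Send a request; `operation` is "update_item" or "delete_item"."""
    import boto3  # imported lazily so the builders work without boto3

    client = boto3.client("dynamodb", region_name=region)
    return getattr(client, operation)(**request)
```

For example, run(build_delete_request(meetup_id, mtime), "delete_item") would remove the item, given valid AWS credentials.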

Behind the Scenes: Viewing Operation Order
Want to see how the magic happens? Use this special incantation in your Pinot queries:
SELECT * FROM pinot_dynamo_ingestion
WHERE meetup_id = 'cc3fbaca-445c-4ebb-8f60-7b2a5eab3530'
ORDER BY ApproximateCreationDateTime DESC
OPTION(skipUpsert=True)
This will reveal the entire history of your data’s journey!
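If you would rather run that query from a script, the sketch below posts it to the Pinot broker's SQL endpoint using only the standard library. The broker address http://localhost:8099 and the helper names are assumptions; adjust them for your cluster.

```python
import json
import urllib.request


def history_query(meetup_id, skip_upsert=True):
    """Build the SQL for one key, optionally bypassing upsert resolution."""
    safe_id = meetup_id.replace("'", "''")  # escape single quotes
    sql = (
        "SELECT * FROM pinot_dynamo_ingestion "
        f"WHERE meetup_id = '{safe_id}' "
        "ORDER BY ApproximateCreationDateTime DESC"
    )
    if skip_upsert:
        sql += " OPTION(skipUpsert=True)"
    return sql


def run_query(sql, broker_url="http://localhost:8099"):
    """POST the SQL to the broker and return the result rows."""
    request = urllib.request.Request(
        f"{broker_url}/query/sql",
        data=json.dumps({"sql": sql}).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read())["resultTable"]["rows"]
```

Running the query once with skip_upsert=True and once with skip_upsert=False makes the difference visible: the first returns every ingested event for the key, the second only what upsert resolution leaves behind.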
