Apache Pinot

Apache Pinot™ 0.11: Deduplication on Real-Time Tables

Mark Needham
Written by Mark NeedhamJanuary 09, 20237 minutes read

Last fall, the Apache Pinot community released version 0.11.0, which has lots of goodies for you to play with.

In this post, we’re going to learn about the deduplication for the real-time tables feature

Why do we need deduplication on real-time tables?

This feature was built to deal with duplicate data in the streaming platform. 

Users have previously used the upsert feature to de-duplicate data, but this has the following limitations:

  • It forces us to keep redundant records that we don’t want to keep, which increases overall storage costs.

  • We can’t yet use the StarTree index with upserts, so the speed benefits we get from using that indexing technique are lost.

How does dedup differ from upserts?

Both upserts and dedup keep track of multiple documents that have the same primary key. They differ as follows:

  • Upserts are used when we want to get the latest copy of a document for a given primary key. It’s likely that some or all of the other fields will be different. Pinot stores all documents it receives, but when querying it will only return the latest document for each primary key.

  • Dedup is used when we know that multiple documents with the same primary key are identical. Only the first event received for a given primary key is stored in Pinot—any future events with the same primary key are thrown away.

Let’s see how to use this functionality with help from a worked example.

Setting up Apache Kafka and Apache Pinot

We’re going to spin up Kafka and Pinot using the following Docker Compose config:

version: "3"
services:
  zookeeper:
    image: zookeeper:3.8.0
    hostname: zookeeper
    container_name: zookeeper-dedup-blog
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    networks: 
      - dedup_blog
  kafka:
    image: wurstmeister/kafka:latest
    restart: unless-stopped
    container_name: "kafka-dedup-blog"
    ports:
      - "9092:9092"
    expose:
      - "9093"
    depends_on:
     - zookeeper
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-dedup-blog:2181/kafka
      KAFKA_BROKER_ID: 0
      KAFKA_ADVERTISED_HOST_NAME: kafka-dedup-blog
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-dedup-blog:9093,OUTSIDE://localhost:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
    networks: 
      - dedup_blog
  pinot-controller:
    image: apachepinot/pinot:0.11.0-arm64
    command: "QuickStart -type EMPTY"
    container_name: "pinot-controller-dedup-blog"
    volumes:
      - ./config:/config
    restart: unless-stopped
    ports:
      - "9000:9000"
    networks: 
      - dedup_blog
networks:
  dedup_blog:
    name: dedup_blog
Copy

We can spin up our infrastructure using the following command:

docker-compose up

Data Generation

Let’s imagine that we want to ingest events generated by the following Python script:

import datetime
import uuid
import random
import json

while True:
    ts = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%fZ")
    id = str(uuid.uuid4())
    count = random.randint(0, 1000)
    print(
        json.dumps({"tsString": ts, "uuid": id[:3], "count": count})
    )
Copy

We can view the data generated by this script by pasting the above code into a file called datagen.py and then running the following command:

python datagen.py 2>/dev/null | head -n3 | jq

We’ll see the following output:

{
  "tsString": "2023-01-03T10:59:17.355081Z",
  "uuid": "f94",
  "count": 541
}
{
  "tsString": "2023-01-03T10:59:17.355125Z",
  "uuid": "057",
  "count": 96
}
{
  "tsString": "2023-01-03T10:59:17.355141Z",
  "uuid": "d7b",
  "count": 288
}
Copy

If we generate only 25,000 events, we’ll get some duplicates, which we can see by running the following command:

python datagen.py 2>/dev/null  | 
jq -r '.uuid' | head -n25000 | uniq -cd
Copy

The results of running that command are shown below:

2 3a2
2 a04
2 433
2 291
2 d73
Copy

We’re going to pipe this data into a Kafka stream called events, like this:

python datagen.py 2>/dev/null | 
jq -cr --arg sep 😊 '[.uuid, tostring] | join($sep)' | 
kcat -P -b localhost:9092 -t events -K😊
Copy

The construction of the key/value structure comes from Robin Moffatt’s excellent blog post. Since that blog post was written, kcat has started supporting multi byte separators, which is why we can use a smiley face to separate our key and value.

Pinot Schema/Table Config

Next, we’re going to create a Pinot table and schema with the same name. Let’s first define a schema:

{
  "schemaName": "events",
  "dimensionFieldSpecs": [{"name": "uuid", "dataType": "STRING"}],
  "metricFieldSpecs": [{"name": "count", "dataType": "INT"}],
  "dateTimeFieldSpecs": [
    {
      "name": "ts",
      "dataType": "TIMESTAMP",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }
  ]
}
Copy

Note that the timestamp field is called ts and not tsString, as it is in the Kafka stream. We’re going to transform the DateTime string value held in that field into a proper timestamp using a transformation function. 

Our table config is described below:

{
  "tableName": "events",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "ts",
    "schemaName": "events",
    "replication": "1",
    "replicasPerPartition": "1"
  },
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.topic.name": "events",
      "stream.kafka.broker.list": "kafka-dedup-blog:9093",
      "stream.kafka.consumer.type": "lowlevel",
      "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder"
    }
  },
  "ingestionConfig": {
    "transformConfigs": [
      {
        "columnName": "ts",
        "transformFunction": "FromDateTime(tsString, 'YYYY-MM-dd''T''HH:mm:ss.SSSSSS''Z''')"
      }
    ]
  },
  "tenants": {},
  "metadata": {}
}
Copy

Let’s create the table using the following command:

docker run \
   --network dedup_blog \
   -v $PWD/pinot/config:/config \
   apachepinot/pinot:0.11.0-arm64 AddTable \
     -schemaFile /config/schema.json \
     -tableConfigFile /config/table.json \
     -controllerHost "pinot-controller-dedup-blog" \
    -exec 
Copy

Now we can navigate to http://localhost:9000 and run a query that will return a count of the number of each uuid:

select uuid, count(*)
from events 
group by uuid
order by count(*)
limit 10
Copy

The results of this query are shown below:

We can see loads of duplicates! 

Now let’s add a table and schema that uses the de-duplicate feature, starting with the schema:

{
  "schemaName": "events_dedup",
  "primaryKeyColumns": ["uuid"],
  "dimensionFieldSpecs": [{"name": "uuid", "dataType": "STRING"}],
  "metricFieldSpecs": [{"name": "count", "dataType": "INT"}],
  "dateTimeFieldSpecs": [
    {
      "name": "ts",
      "dataType": "TIMESTAMP",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }
  ]
}
Copy

The main difference between this schema and the events schema is that we need to specify a primary key. This key can be any number of fields, but in this case, we’re only using the uuid field.

Next, the table config:

{
  "tableName": "events_dedup",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "ts",
    "schemaName": "events_dedup",
    "replication": "1",
    "replicasPerPartition": "1"
  },
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.topic.name": "events",
      "stream.kafka.broker.list": "kafka-dedup-blog:9093",
      "stream.kafka.consumer.type": "lowlevel",
      "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder"
    }
  },
  "routing": {"instanceSelectorType": "strictReplicaGroup"},
  "dedupConfig": {"dedupEnabled": true, "hashFunction": "NONE"},
  "ingestionConfig": {
    "transformConfigs": [
      {
        "columnName": "ts",
        "transformFunction": "FromDateTime(tsString, 'YYYY-MM-dd''T''HH:mm:ss.SSSSSS''Z''')"
      }
    ]
  },
  "tenants": {},
  "metadata": {}
}
Copy

The changes to notice here are:

  • "dedupConfig": {"dedupEnabled": true, "hashFunction": "NONE"} - This enables the feature and indicates that we won’t use a hash function on our primary key.

  • "routing": {"instanceSelectorType": "strictReplicaGroup"} - This makes sure that all segments of the same partition are served from the same server to ensure data consistency across the segments. 

docker run \
   --network dedup_blog \
   -v $PWD/pinot/config:/config \
   apachepinot/pinot:0.11.0-arm64 AddTable \
     -schemaFile /config/schema-dedup.json \
     -tableConfigFile /config/table-dedup.json \
     -controllerHost "pinot-controller-dedup-blog" \
    -exec

select uuid, count(*)
from events_dedup
group by uuid
order by count(*)
limit 10
Copy

We have every combination of hex values (16^3=4096) and no duplicates! Pinot’s de-duplication feature has done its job.

How does it work? 

When we’re not using the deduplication feature, events are ingested from our streaming platform into Pinot, as shown in the diagram below:

When de-dup is enabled, we have to check whether records can be ingested, as shown in the diagram below:

De-dup works out whether a primary key has already been ingested by using an in memory map of (primary key -> corresponding segment reference).

We need to take that into account when using this feature, otherwise, we’ll end up using all the available memory on the Pinot Server. Below are some tips for using this feature:

  • Try to use a simple primary key type and avoid composite keys. If you don’t have a simple primary key, consider using one of the available hash functions to reduce the space taken up.

  • Create more partitions in the streaming platform than you might otherwise create. The number of partitions determines the partition numbers of the Pinot table. The more partitions you have in the streaming platform, the more Pinot servers you can distribute the Pinot table to, and the more horizontally scalable the table will be.

Summary

This feature makes it easier to ensure that we don’t end up with duplicate data in our Apache Pinot estate. 

So give it a try and let us know how you get on. If you have any questions about this feature, feel free to join us on Slack, where we’ll be happy to help you out.

And if you’re interested in how this feature was implemented, you can look at the pull request on GitHub.

Apache Pinot