Apache Pinot™ 0.12: Consumer Record Lag

Mark Needham
March 30, 2023 · 4 minute read

The Apache Pinot community recently released version 0.12.0, which has lots of goodies for you to play with. I’ve been exploring and writing about those features in a series of blog posts.

This post will explore a new API endpoint that lets you check how much Pinot is lagging when ingesting from Apache Kafka.

Why do we need this?

A common question in the Pinot community is how to work out the consumption status of real-time tables. 

This used to be tricky to answer, but Pinot 0.12 adds a new API endpoint that shows us exactly what’s going on.

Worked Example

Let’s have a look at how it works with help from a worked example. 

First, we’re going to create a Kafka topic with 5 partitions:

docker exec -it kafka-lag-blog kafka-topics.sh \
--bootstrap-server localhost:9092 \
--partitions 5 \
--topic events \
--create 

We’re going to populate this topic with data from a data generator, which is shown below:

import datetime, uuid, random, json, click, time

@click.command()
@click.option('--sleep', default=0.0, help='Sleep between each message')
def generate(sleep):
    # Emit an infinite stream of JSON events, one per line
    while True:
        ts = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%fZ")
        id = str(uuid.uuid4())
        count = random.randint(0, 1000)
        print(json.dumps({"tsString": ts, "uuid": id, "count": count}))
        time.sleep(sleep)

if __name__ == '__main__':
    generate()

We can see an example of the messages generated by this script by running the following:

python datagen.py --sleep 0.01 2>/dev/null | head -n3 | jq -c

You should see something like this:

{"tsString":"2023-03-17T12:10:03.854680Z","uuid":"f3b7b5d3-b352-4cfb-a5e3-527f2c663143","count":690}
{"tsString":"2023-03-17T12:10:03.864815Z","uuid":"eac57622-4b58-4456-bb38-96d1ef5a1ed5","count":522}
{"tsString":"2023-03-17T12:10:03.875723Z","uuid":"65926a80-208a-408b-90d0-36cf74c8923a","count":154}

So far, so good. Let’s now ingest this data into Kafka:

python datagen.py --sleep 0.01 2>/dev/null |
jq -cr --arg sep ø '[.uuid, tostring] | join($sep)' |
kcat -P -b localhost:9092 -t events -Kø
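The jq step joins the uuid (used as the message key) and the full JSON document (the value) with a ø separator, which kcat then splits on via -Kø. If you don’t have kcat to hand, here’s a rough Python equivalent, assuming the kafka-python package (not used elsewhere in this post) and the same broker on localhost:9092:

# A minimal sketch of the same producer pipeline using kafka-python (an assumption,
# not something the post itself relies on): the uuid is the message key and the full
# JSON document is the value, mirroring the jq/kcat command above.
import datetime, json, random, time, uuid
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers="localhost:9092")

while True:
    event = {
        "tsString": datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
        "uuid": str(uuid.uuid4()),
        "count": random.randint(0, 1000),
    }
    producer.send("events", key=event["uuid"].encode(), value=json.dumps(event).encode())
    time.sleep(0.01)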

Next we’re going to create a Pinot schema and table. First, the schema config:

{
    "schemaName": "events",
    "dimensionFieldSpecs": [{"name": "uuid", "dataType": "STRING"}],
    "metricFieldSpecs": [{"name": "count", "dataType": "INT"}],
    "dateTimeFieldSpecs": [
      {
        "name": "ts",
        "dataType": "TIMESTAMP",
        "format": "1:MILLISECONDS:EPOCH",
        "granularity": "1:MILLISECONDS"
      }
    ]
  }

And now, the table config:

{
    "tableName": "events",
    "tableType": "REALTIME",
    "segmentsConfig": {
      "timeColumnName": "ts",
      "schemaName": "events",
      "replication": "1",
      "replicasPerPartition": "1"
    },
    "tableIndexConfig": {
      "loadMode": "MMAP",
      "streamConfigs": {
        "streamType": "kafka",
        "stream.kafka.topic.name": "events",
        "stream.kafka.broker.list": "kafka-lag-blog:9093",
        "stream.kafka.consumer.type": "lowlevel",
        "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
        "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
        "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
        "realtime.segment.flush.threshold.rows": "10000000"
      }
    },
    "ingestionConfig": {
      "transformConfigs": [
        {
          "columnName": "ts",
          "transformFunction": "FromDateTime(tsString, 'YYYY-MM-dd''T''HH:mm:ss.SSSSSS''Z''')"
        }
      ]
    },
    "tenants": {},
    "metadata": {}
  }
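The transformConfigs block is what turns the generator’s tsString into the ts TIMESTAMP column: FromDateTime parses the string with that format and produces epoch milliseconds. As a rough sanity check (plain Python, not Pinot’s parser, and assuming the timestamps are UTC), you can confirm the sample values parse with an equivalent format string:

# A rough sanity check (not Pinot code) that the generator's tsString values
# match the format the FromDateTime transform expects; Pinot's own timezone
# handling is glossed over here and the values are assumed to be UTC.
import datetime

ts_string = "2023-03-17T12:10:03.854680Z"
parsed = datetime.datetime.strptime(ts_string, "%Y-%m-%dT%H:%M:%S.%fZ")
print(int(parsed.replace(tzinfo=datetime.timezone.utc).timestamp() * 1000))
# prints 1679055003854 (epoch millis, i.e. what ends up in the ts column)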

We can create both the table and schema using the AddTable command:

docker run \
  --network lag_blog \
  -v $PWD/config:/config \
  apachepinot/pinot:0.12.0-arm64 AddTable \
  -schemaFile /config/schema.json \
  -tableConfigFile /config/table.json \
  -controllerHost "pinot-controller-lag-blog" \
  -exec
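Before looking at consuming segments, it’s worth a quick check that the table actually got created. A small sketch against the controller’s table listing endpoint (assuming, as elsewhere in this post, that the controller is exposed on localhost:9000):

# A small sketch to confirm the table was created; assumes the controller's
# REST API is reachable on localhost:9000.
import json, urllib.request

with urllib.request.urlopen("http://localhost:9000/tables") as response:
    print(json.load(response))  # the list of tables should now include "events"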

Now let’s call the /consumingSegmentsInfo endpoint to see what’s going on:

curl "http://localhost:9000/tables/events/consumingSegmentsInfo" 2>/dev/null | jq

The output of calling this endpoint is shown below (truncated to the first of the five consuming segments):

{
  "_segmentToConsumingInfoMap": {
    "events__0__0__20230317T1133Z": [
      {
        "serverName": "Server_172.29.0.4_8098",
        "consumerState": "CONSUMING",
        "lastConsumedTimestamp": 1679052823350,
        "partitionToOffsetMap": {
          "0": "969"
        },
        "partitionOffsetInfo": {
          "currentOffsetsMap": {
            "0": "969"
          },
          "latestUpstreamOffsetMap": {
            "0": "969"
          },
          "recordsLagMap": {
            "0": "0"
          },
          "availabilityLagMsMap": {
            "0": "26"
          }
        }
      }
    ],
}

If we look under partitionOffsetInfo, we can see what’s going on:

  • currentOffsetsMap is the offset that Pinot has consumed up to

  • latestUpstreamOffsetMap is the latest offset available in Kafka

  • recordsLagMap is the record lag, i.e. how far the first is behind the second (see the sketch after this list)

  • availabilityLagMsMap is the time lag in milliseconds
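To make that concrete, here’s a minimal Python sketch (assuming the same controller on localhost:9000 and the events table) that pulls the endpoint and recomputes the record lag by hand as the difference between the two offset maps:

# A minimal sketch that recomputes the record lag per partition from the
# consumingSegmentsInfo endpoint; assumes the controller runs on localhost:9000.
import json, urllib.request

url = "http://localhost:9000/tables/events/consumingSegmentsInfo"
with urllib.request.urlopen(url) as response:
    info = json.load(response)

for segment, consumers in info["_segmentToConsumingInfoMap"].items():
    for consumer in consumers:
        offsets = consumer["partitionOffsetInfo"]
        for partition, current in offsets["currentOffsetsMap"].items():
            latest = offsets["latestUpstreamOffsetMap"][partition]
            lag = int(latest) - int(current)  # should match recordsLagMap
            print(f"{segment} partition {partition}: lag={lag}")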

This output is a bit unwieldy, so let’s create a bash function to tidy it up into something that’s easier to read:

function consuming_info() {
  curl "http://localhost:9000/tables/events/consumingSegmentsInfo" 2>/dev/null |
  jq -rc '[._segmentToConsumingInfoMap | keys[] as $k | (.[$k] | .[] | {
    segment: $k,
    pinot: (.partitionOffsetInfo.currentOffsetsMap | keys[] as $k | (.[$k])),
    kafka: (.partitionOffsetInfo.latestUpstreamOffsetMap | keys[] as $k | (.[$k])),
    recordLag: (.partitionOffsetInfo.recordsLagMap | keys[] as $k | (.[$k])),
    timeLagMs: (.partitionOffsetInfo.availabilityLagMsMap | keys[] as $k | (.[$k]))
})] | (.[0] | keys_unsorted | @tsv), (.[] | map(.) | @tsv)' | column -t
  printf "\n"
}

Let’s call the function:

consuming_info

We’ll see the following output:

[Image: consumer record lag output]

Now let’s put it in a script and call the watch command so that it will be refreshed every couple of seconds:

#!/bin/bash

function consuming_info() {
  curl "http://localhost:9000/tables/events/consumingSegmentsInfo" 2>/dev/null |
  jq -rc '[._segmentToConsumingInfoMap | keys[] as $k | (.[$k] | .[] | {
    segment: $k,
    pinot: (.partitionOffsetInfo.currentOffsetsMap | keys[] as $k | (.[$k])),
    kafka: (.partitionOffsetInfo.latestUpstreamOffsetMap | keys[] as $k | (.[$k])),
    recordLag: (.partitionOffsetInfo.recordsLagMap | keys[] as $k | (.[$k])),
    timeLagMs: (.partitionOffsetInfo.availabilityLagMsMap | keys[] as $k | (.[$k]))
})] | (.[0] | keys_unsorted | @tsv), (.[] | map(.) | @tsv)' | column -t
  printf "\n"
}

export -f consuming_info
watch bash -c consuming_info

Give it execute permission so it can be run as a script:

chmod u+x watch_consuming_info.sh

And finally, run it:

./watch_consuming_info.sh

This will print out a new table every two seconds. Let’s now make things more interesting by removing the sleep from our ingestion command:

python datagen.py  2>/dev/null |
jq -cr --arg sep ø '[.uuid, tostring] | join($sep)' |
kcat -P -b localhost:9092 -t events -Kø

And now if we look at the watch output:

[Image: Apache Pinot consumer record lag]

We get some transitory lag, but it generally goes away by the next time the command is run. 
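If you wanted to alert only on lag that sticks around rather than on these transient spikes, a rough poller along the following lines would do it. The threshold, streak length, and poll interval are made-up values, and it assumes the same endpoint as before:

# A rough polling sketch (hypothetical thresholds) that only reports partitions
# whose record lag stays above LAG_THRESHOLD for several consecutive checks.
import json, time, urllib.request
from collections import defaultdict

URL = "http://localhost:9000/tables/events/consumingSegmentsInfo"
LAG_THRESHOLD = 1000   # records
CONSECUTIVE = 3        # polls in a row before we report
POLL_SECONDS = 2

streak = defaultdict(int)

while True:
    with urllib.request.urlopen(URL) as response:
        info = json.load(response)
    for segment, consumers in info["_segmentToConsumingInfoMap"].items():
        for consumer in consumers:
            lag_map = consumer["partitionOffsetInfo"]["recordsLagMap"]
            for partition, lag in lag_map.items():
                key = (segment, partition)
                streak[key] = streak[key] + 1 if int(lag) > LAG_THRESHOLD else 0
                if streak[key] >= CONSECUTIVE:
                    print(f"{segment} partition {partition} lagging by {lag} records")
    time.sleep(POLL_SECONDS)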

Summary

I love this feature; it solves a problem I’ve struggled with when ingesting my own datasets. I hope you’ll find it just as useful.

Give it a try, and let us know how you get on. If you have any questions about this feature, feel free to join us on Slack, where we’ll be happy to help you out.