How to query a table by segment

In this recipe we’ll learn how to query a table to find out which records are in a particular segment.

Prerequisites

To follow the code examples in this guide, you must install Docker locally and download recipes.

Navigate to recipe

  1. If you haven’t already, download recipes.
  2. In a terminal, go to the recipe by running the following command:
cd pinot-recipes/recipes/query-by-segment

Launch Pinot Cluster

You can spin up a Pinot Cluster by running the following command:

docker-compose up

This command will run a single instance of the Pinot Controller, Pinot Server, Pinot Broker, Kafka, and Zookeeper. You can find the docker-compose.yml file on GitHub.

Data generator

This recipe contains a data generator that creates events with a timestamp, count, and UUID. You can generate data by running the following command:

python datagen.py 2>/dev/null | head -n1 | jq

Output is shown below:

{
  "ts": 1680171022742,
  "uuid": "5328f9b8-83ff-4e4c-8ea1-d09524866841",
  "count": 260
}
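
The recipe’s datagen.py isn’t reproduced in this guide, but a generator that emits events of this shape could look roughly like the sketch below. The function name make_event and the 0 to 999 range for count are assumptions made for illustration, not details taken from the recipe.

```python
import random
import time
import uuid


def make_event(now_ms=None):
    """Build one event with the same three fields as the sample output."""
    if now_ms is None:
        now_ms = int(time.time() * 1000)  # epoch milliseconds, like "ts"
    return {
        "ts": now_ms,
        "uuid": str(uuid.uuid4()),
        # Assumed range: the real generator's distribution is not shown here.
        "count": random.randint(0, 999),
    }
```

Passing a fixed now_ms makes the timestamp reproducible, which is handy when comparing generated events with what later shows up in Pinot.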

Kafka ingestion

We’re going to ingest this data into an Apache Kafka topic using the kcat command line tool. We’ll also use jq to format each event as a key (the uuid) followed by the JSON payload, separated by ø, which is the key delimiter that we pass to kcat with the -K flag:

python datagen.py --sleep 0.0001 2>/dev/null |
jq -cr --arg sep ø '[.uuid, tostring] | join($sep)' |
kcat -P -b localhost:9092 -t events -Kø
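
To make the key and payload layout concrete, here is a small Python sketch of what the jq filter above produces for a single event. The helper name to_kcat_line is hypothetical; the ø separator is the one used in the command.

```python
import json

SEP = "ø"  # same delimiter passed to jq (--arg sep) and kcat (-K)


def to_kcat_line(event, sep=SEP):
    """Return '<uuid><sep><compact JSON>', mirroring the jq filter."""
    # Compact separators match jq's tostring output (no spaces).
    payload = json.dumps(event, separators=(",", ":"), ensure_ascii=False)
    return f"{event['uuid']}{sep}{payload}"
```

kcat splits each input line on the first occurrence of the delimiter, so the part before ø becomes the message key and the rest becomes the message value.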

We can check that Kafka has some data by running the following command:

docker exec -it kafka-querysegment kafka-run-class.sh \
  kafka.tools.GetOffsetShell \
  --broker-list localhost:9092 \
  --topic events

We’ll see something like the following:

events:0:19960902
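
Each output line has the form topic:partition:offset. If we want to use these numbers in a script, a minimal parser might look like the following sketch (parse_offsets is a hypothetical helper, not part of Kafka or the recipe):

```python
def parse_offsets(output):
    """Map (topic, partition) -> latest offset from 'topic:partition:offset' lines."""
    offsets = {}
    for line in output.splitlines():
        line = line.strip()
        if not line:
            continue  # skip blank lines
        # Split from the right so a topic name containing ':' stays intact.
        topic, partition, offset = line.rsplit(":", 2)
        offsets[(topic, int(partition))] = int(offset)
    return offsets
```

For the output above, this returns a single entry for partition 0 of the events topic.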

Pinot Schema and Table

Now let’s create a Pinot Schema and Table.

First, the schema:

{
  "schemaName": "events",
  "dimensionFieldSpecs": [{"name": "uuid", "dataType": "STRING"}],
  "metricFieldSpecs": [{"name": "count", "dataType": "INT"}],
  "dateTimeFieldSpecs": [
    {
      "name": "ts",
      "dataType": "TIMESTAMP",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }
  ]
}

Now for the table config:

{
    "tableName": "events",
    "tableType": "REALTIME",
    "segmentsConfig": {
      "timeColumnName": "ts",
      "schemaName": "events",
      "replication": "1",
      "replicasPerPartition": "1"
    },
    "tableIndexConfig": {
      "loadMode": "MMAP",
      "streamConfigs": {
        "realtime.segment.flush.threshold.rows":"100000",
        "streamType": "kafka",
        "stream.kafka.topic.name": "events",
        "stream.kafka.broker.list": "kafka-querysegment:9093",
        "stream.kafka.consumer.type": "lowlevel",
        "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
        "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
        "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
        "realtime.segment.flush.threshold.time":"1h"
      }
    },
    "ingestionConfig": {},
    "tenants": {},
    "metadata": {}
}

The realtime.segment.flush.threshold.rows setting in streamConfigs indicates that we’re going to create a new segment after every 100,000 rows.
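
As a rough sanity check, we can estimate how many segments to expect from the Kafka offset seen earlier. The sketch below assumes a single partition, that every message has been consumed, and that the row threshold is always reached before the 1h time threshold; expected_segments is a hypothetical helper.

```python
def expected_segments(total_rows, flush_rows=100_000):
    """Return (completed_segments, rows_in_consuming_segment) for one partition."""
    return divmod(total_rows, flush_rows)
```

With the offset of 19960902 shown earlier, this gives 199 completed segments and 60,902 rows left in the segment that is still consuming.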

We’ll create the table by running the following:

docker run \
   --network querysegment \
   -v $PWD/config:/config \
   apachepinot/pinot:1.0.0 AddTable \
     -schemaFile /config/schema.json \
     -tableConfigFile /config/table.json \
     -controllerHost "pinot-controller-querysegment" \
    -exec

Querying by segment

Once that’s been created, we can head over to the Pinot UI and run some queries.

Pinot has several built-in virtual columns inside every schema that can be used for debugging purposes:

Column Name    Column Type   Data Type   Description
$hostName      Dimension     STRING      Name of the server hosting the data
$segmentName   Dimension     STRING      Name of the segment containing the record
$docId         Dimension     INT         Document id of the record within the segment

The one that’s useful for us is $segmentName, which we can use like this to count the number of records in each segment:

select $segmentName, count(*)
from events
group by $segmentName
limit 10

$segmentName                      count(*)
events__0__142__20230330T1004Z    100000
events__0__27__20230330T1003Z     100000
events__0__123__20230330T1004Z    100000
events__0__15__20230330T1003Z     100000
events__0__185__20230330T1004Z    100000
events__0__96__20230330T1003Z     100000
events__0__169__20230330T1004Z    100000
events__0__160__20230330T1004Z    100000
events__0__77__20230330T1003Z     100000
events__0__71__20230330T1003Z     100000
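
The segment names in these results consist of four parts separated by double underscores. The sketch below reads them as table name, partition, sequence number, and creation time in UTC, which matches our understanding of how Pinot names realtime segments; treat that interpretation, and the helper name parse_segment_name, as assumptions.

```python
from datetime import datetime, timezone
from typing import NamedTuple


class SegmentName(NamedTuple):
    table: str
    partition: int
    sequence: int
    created: datetime


def parse_segment_name(name):
    """Split 'table__partition__sequence__yyyyMMddTHHmmZ' into its parts."""
    # Split from the right so a table name containing '__' stays intact.
    table, partition, sequence, created = name.rsplit("__", 3)
    created_at = datetime.strptime(created, "%Y%m%dT%H%MZ").replace(tzinfo=timezone.utc)
    return SegmentName(table, int(partition), int(sequence), created_at)
```

Sorting the parsed names by sequence would put the segments in the order in which they were created, which the query above does not guarantee.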

We can then pick one of those segments and see what records are stored in that segment:

select *
from events
-- Change the segment name to match one that's returned by the previous query
WHERE $segmentName = 'events__0__142__20230330T1004Z'
limit 10

count   ts                        uuid
666     2023-03-30 10:03:04.498   8e4f5bb3-ad5c-45ec-89cb-946acee52625
298     2023-03-30 10:03:04.498   f439c3ad-86a1-4043-895e-681d5497cda0
128     2023-03-30 10:03:04.498   9e29317f-c8fd-44d8-8313-aa28a80f1e5e
40      2023-03-30 10:03:04.498   697e2dbc-aee5-466e-b24e-763b18aff003
659     2023-03-30 10:03:04.498   8d981789-7e61-4a25-81d1-b62616e877e8
436     2023-03-30 10:03:04.498   f4a883f3-3e6a-4169-a7ca-2469a5100ef2
408     2023-03-30 10:03:04.498   dc21499e-cdb3-4fae-93c1-0b9bd0233ae0
457     2023-03-30 10:03:04.498   0de9a860-5af3-4155-93c9-d396df3cd0b3
197     2023-03-30 10:03:04.498   c4ddd15a-814f-4036-9eee-3121f59783f8
362     2023-03-30 10:03:04.498   11b00463-2587-4838-9f05-089c05ad9fea
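
The same query can also be run outside the UI by posting it to the Pinot Broker’s SQL endpoint. The sketch below uses only the Python standard library. The broker address http://localhost:8099 is an assumption about how the recipe’s Docker setup exposes the broker, and segment_query and run_query are hypothetical helper names.

```python
import json
import urllib.request

BROKER_URL = "http://localhost:8099/query/sql"  # assumed broker address


def segment_query(table, segment_name, limit=10):
    """Build the SQL used above for a given segment name."""
    # Escape single quotes so the name is safe inside a SQL string literal.
    safe_name = segment_name.replace("'", "''")
    return f"select * from {table} where $segmentName = '{safe_name}' limit {int(limit)}"


def run_query(sql, url=BROKER_URL):
    """POST the query to the broker and return the decoded JSON response."""
    body = json.dumps({"sql": sql}).encode("utf-8")
    request = urllib.request.Request(
        url, data=body, headers={"Content-Type": "application/json"}
    )
    with urllib.request.urlopen(request) as response:
        return json.load(response)
```

With the cluster running, run_query(segment_query("events", "events__0__142__20230330T1004Z")) should return a JSON document; we would expect the rows under resultTable, although the exact response layout can vary between Pinot versions.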