Resources
Blog

Apache Pinot 101: Lesson 2 - Streaming Ingest from Kafka


Fasih
Fasih Khatib
Developer Advocate at StarTree
released on
April 22, 2025
READ TIME
10 mins

This is the second lesson in the Apache Pinot 101 Tutorial Series:

Real-time data ingestion is the process of continuously capturing and processing data as it’s generated, enabling immediate analysis and insights. Apache Pinot supports reading from streaming sources like Kinesis, Pulsar, and Kafka. The ability to read from streaming sources enables Pinot to construct tables that reflect the data as it is right now.

The lesson has two parts. First, you’ll write a script to send a stream of data to Kafka. Second, you’ll load this data into Pinot. You should have already completed lesson 1 and cloned the GitHub repository for the pinot-btc directory.

Without further ado, let’s get started:

The data that will be streamed into Kafka is a live ticker feed of Bitcoin. This allows viewing the changes in price of the coin as it gets traded along with some more information. It is available for free through Coinbase via a websocket. Let’s write a python script to consume this information. This can be found as ticker.py in the repo.

def consume():
   producer = Producer({"bootstrap.servers": "kafka:9092"})
   product_ids = ["BTC-USD"]

   with connect("wss://ws-feed.exchange.coinbase.com") as websocket:
       subscription_message = {
           "type": "subscribe",
           "channels": [
               {
                   "name": "ticker",
                   "product_ids": product_ids,
               }
           ],
       }
       websocket.send(json.dumps(subscription_message))

       while True:
           message = json.loads(websocket.recv())

           if message["type"] == "ticker":
               _format_date(message)
               producer.produce(
                   "ticker",
                   json.dumps(message).encode("utf-8"),
               )

The consume function establishes a websocket connection to Coinbase by sending a subscription message. This message specifies that we want to receive ticker information for Bitcoin, in USD, as it is traded. Once the connection is established, a loop retrieves messages from the websocket. As each message is retrieved, a simple transformation adds a new field that converts the time received in the message from timestamp to milliseconds. Finally, this message is written to Kafka.

Here is what a message in Kafka looks like:

{
    "type": "ticker",
    "sequence": 101211706667,
    "product_id": "BTC-USD",
    "price": "80190.12",
    "open_24h": "82302.71",
    "volume_24h": "22769.67727029",
    "low_24h": "76555",
    "high_24h": "84033.66",
    "volume_30d": "376526.50113208",
    "best_bid": "80190.11",
    "best_bid_size": "0.00100000",
    "best_ask": "80190.12",
    "best_ask_size": "0.00003130",
    "side": "buy",
    "time": "2025-03-11T05:47:22.078239Z",
    "trade_id": 795931922,
    "last_size": "0.00001247",
    "time_ms": 1741672042078.239
}

Let’s write the schema for the real-time table which will store these messages. A schema defines the structure of the table. It contains information about the various columns, their data types, and other information such as whether the columns can contain null values.

While manually crafting the schema is feasible, Pinot offers the capability to infer the schema automatically. By supplying a sample message, Pinot can analyze its structure and content to deduce the appropriate schema. This automated inference can save time and effort, especially when dealing with complex or larger payloads. More details about Infering Schema.

The schema example for this lesson is available in the repo at tables/001-ticker/ticker_schema.json

{
 "schemaName": "ticker",
 "enableColumnBasedNullHandling": true,
 "dimensionFieldSpecs": [
   {
     "name": "type",
     "dataType": "STRING"
   },
   {
     "name": "product_id",
     "dataType": "STRING"
   },
   {
     "name": "sequence",
     "dataType": "LONG"
   }
 ],
 "dateTimeFieldSpecs": [
   {
     "name": "time_ms",
     "dataType": "LONG",
     "format": "1:MILLISECONDS:EPOCH",
     "granularity": "1:MILLISECONDS",
     "notNull": true
   }
 ],
 "primaryKeyColumns": [
   "sequence"
 ],
 "metricFieldSpecs": [
   {
     "name": "price",
     "dataType": "FLOAT"
   },
   {
     "name": "open_24h",
     "dataType": "FLOAT"
   },
   {
     "name": "volume_24h",
     "dataType": "FLOAT"
   },
   {
     "name": "low_24h",
     "dataType": "FLOAT"
   },
   {
     "name": "high_24h",
     "dataType": "FLOAT"
   },
   {
     "name": "volume_30d",
     "dataType": "FLOAT"
   },
   {
     "name": "best_bid",
     "dataType": "FLOAT"
   },
   {
     "name": "best_bid_size",
     "dataType": "FLOAT"
   },
   {
     "name": "best_ask",
     "dataType": "FLOAT"
   },
   {
     "name": "best_ask_size",
     "dataType": "FLOAT"
   },
   {
     "name": "side",
     "dataType": "STRING"
   },
   {
     "name": "trade_id",
     "dataType": "LONG"
   },
   {
     "name": "last_size",
     "dataType": "FLOAT"
   }
 ]
}

The breakdown of the schema is as follows:

  • This creates a table named ticker.
  • The product_id, type, and sequence fields are dimensions. These are the ones we can slice and dice by.
  • The time_ms field is a datetime representing the time columns in the table.
  • The rest of the fields are metrics. These are the quantitative information.

Next, let’s create the table. A table is conceptually similar to a relational table with rows and columns. The table configuration contains information such as which indexes apply to which columns and which Kafka topic to stream data from.

The table config is another JSON object viewable at tables/001-ticker/ticker_table.json in the repo. Let’s look at it piece by piece. You’re encouraged to open the ticker_table.json file in the repository in your favorite editor and follow along.

Let’s start with the segmentConfig object.

{
 "timeColumnName": "time_ms",
 "timeType": "MILLISECONDS",
 "schemaName": "ticker",
 "replicasPerPartition": "1",
 "retentionTimeValue": "365",
 "retentionTimeUnit": "DAYS"
}

The segmentsConfig object contains information about how the segments, which are shards, for this table will be created and replicated across the cluster. The schemaName defines the schema associated with the segments of this table. The combination of retentionTimeValue and retentionTimeUnit define how long we’d like to keep the segment. Beyond this value, the segment is purged from the cluster. The combination of timeColumnName and timeType is used to define whether to retain the segment or not.

Next, let’s look at the ingestionConfig.

{
 "streamIngestionConfig": {
   "streamConfigMaps": [
     {
       "realtime.segment.flush.threshold.rows": "0",
       "stream.kafka.decoder.prop.format": "JSON",
       "key.serializer": "org.apache.kafka.common.serialization.ByteArraySerializer",
       "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
       "streamType": "kafka",
       "value.serializer": "org.apache.kafka.common.serialization.ByteArraySerializer",
       "stream.kafka.consumer.type": "LOWLEVEL",
       "realtime.segment.flush.threshold.segment.rows": "50000",
       "stream.kafka.broker.list": "kafka:9092",
       "realtime.segment.flush.threshold.time": "3600000",
       "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
       "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
       "stream.kafka.topic.name": "ticker"
     }
   ]
 },
 "transformConfigs": []
}

The ingestionConfig contains sections for how to ingest data and apply any ingestion-time transformations to the messages. streamConfigMaps defines the Kafka topic we’d like to read from, and the key and value serializers. It is here we specify that we’d like to read from the ticker topic.

To learn more about each of these sections, head over to the ingestion configuration reference.

You can now create the table and the schema using the Pinot controller’s REST API.

curl -F schemaName=@tables/001-ticker/ticker_schema.json localhost:9000/schemas

and

curl -XPOST -H 'Content-Type: application/json' -d @tables/001-ticker/ticker_table.json localhost:9000/tables

Wait for a few seconds for the events to get ingested into Kafka. After that, head over to the query console on localhost:9000. You should now see the ticker table and be able to view the events.

That’s it. You’ve learned how to ingest data from Kafka into Pinot. In the next post you’ll discover how to ingest data from a CSV file into Pinot. You’ll ingest a file that contains information about Bitcoin that we’ve bought and sold.

Coming Wednesday: 3. Ingesting Batch Data →


Ready to deploy real-time analytics?

We’re here to help!