
Best Practices for Designing Tables in Apache Pinot™

Written by Sandeep Dabade and Kulbir Nijjer | October 07, 2022 | 10 minute read

Designing a table and schema is one of the critical activities that has a direct impact on query performance.

In this blog, we will define a systematic framework for designing a Pinot table. Apache Pinot™ is an extremely versatile, high-performance OLAP datastore, and one of the key reasons is the variety of indexes it supports out of the box, powering a diverse set of real-time analytics use cases. Defining the right indexes is therefore one of the most important steps in table design, and we will cover it in this post. We will use a hypothetical sales order use case and design a hybrid Pinot table.

Use Case: Sales Orders

Consider an omni-channel e-commerce platform that captures sales orders from multiple channels such as web browsers (desktop, mobile), native apps on smartphones, and traditional retail POS systems. All orders are stored in an ACID-compliant transactional datastore. The platform collects events around users’ browsing behavior across all channels in real time and publishes them to a Kafka topic. All data (including transactional data) eventually lands in the analytical system through some ETL transformations.

Choosing the right analytical system for a use case is a topic of discussion for another day. But at a high level, the key decision factors are the three analytics KPIs: latency, throughput, and freshness.



The business use case is to proactively monitor the daily order rate across different dimensions such as country, state, etc. Every new order triggers a “create order” event, and every order status update triggers an “update order” event; both are published to a Kafka topic. The use case also requires the ability to update the order status as an order transitions from one state to another (upsert), and a backfill job that bootstraps the historical data into the Pinot table (one time or on an ongoing basis) so that the entire dataset is queryable in a single place. Lastly, the use case demands purging data that is older than 60 days (alternatively, in StarTree Cloud, the Tiered Storage feature can be used to move older data to S3 and query it directly from Pinot).
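
For illustration, a hypothetical “create order” event published to the Kafka topic might look like the following; the field names match the schema designed below, and the values are made up:

{
  "orderDateSinceEpochMillis": 1659916800000,
  "customerName": "Jane Doe",
  "category": "Furniture",
  "subCategory": "Chairs",
  "productName": "Ergonomic Office Chair",
  "segment": "Consumer",
  "country": "United States",
  "region": "West",
  "state": "California",
  "city": "Los Angeles",
  "quantity": 2,
  "sales": 399.98,
  "discount": 0.1,
  "profit": 56.75
}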

Pinot Table Design Framework:

In this section, we will outline the framework before moving to design the table.

1. Schema Design

  • DateTime Fields

  • Metric Fields

  • Dimension Fields

2. Table Design

  • Table Type

  • Segment Config

    • Time Column

    • Replication

    • Retention Period

  • Configure Indexes

    • Sorted Index

    • Inverted Index

    • Range Index

    • NoDictionary Columns

    • StarTree Index

  • Stream Configuration (Real-time Tables only)

  • Ingestion Configuration

    • Filter Configs

    • Derived Columns / Transform Configs

  • Minion Tasks

Schema Design:

1. Determine DateTime Field(s) and Granularity

Determine the DateTime fields and the data type, format, and granularity for each of them.

For this use case, we have three DateTime Fields: 

  1. orderDateSinceEpochMillis:

    This is an input date column that has the date in epoch milliseconds format.

    • Data Type = LONG

    • Format = 1:MILLISECONDS:EPOCH

    • Granularity = 1:MILLISECONDS

  2. orderDateSinceEpochSeconds:

    This is a derived column that stores the date in epoch seconds format.

    • Data Type = LONG

    • Format = 1:SECONDS:EPOCH

    • Granularity = 1:SECONDS - all values within 1 SECOND timeframe will be added to 1 bucket.

  3. orderDateStr:

    This is a derived column to store the date in yyyy-MM-dd format. 

    • Data Type = STRING

    • Format = 1:DAYS:SIMPLE_DATE_FORMAT:yyyy-MM-dd

    • Granularity = 1:DAYS - all values within 1 DAY timeframe will be added to 1 bucket.

"dateTimeFieldSpecs": [
    {
      "name": "orderDateSinceEpochMills",
      "dataType": "LONG",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    },
    {
      "name": "orderDateSinceEpochSeconds",
      "dataType": "LONG",
      "format": "1:SECONDS:EPOCH",
      "granularity": "1:SECONDS"
    },
    {
      "name": "orderDateStr",
      "dataType": "STRING",
      "format": "1:DAYS:SIMPLE_DATE_FORMAT:yyyy-MM-dd",
      "granularity": "1:DAYS"
    }
]

2. Determine Metric Fields

Metric fields are the fields that you intend to apply aggregate functions on; they usually hold numeric values.

"metricFieldSpecs": [
    {
      "name": "discount",
      "dataType": "DOUBLE"
    },
    {
      "name": "profit",
      "dataType": "DOUBLE"
    },
    {
      "name": "quantity",
      "dataType": "INT"
    },
    {
      "name": "sales",
      "dataType": "DOUBLE"
    }
]

3. Determine Dimension Fields

Dimension fields are the fields you will use to slice and dice the data (filter predicates) and group aggregated metrics by.

"dimensionFieldSpecs": [
    {
      "name": "category",
      "dataType": "STRING"
    },
    {
      "name": "city",
      "dataType": "STRING"
    },
    {
      "name": "country",
      "dataType": "STRING"
    },
    {
      "name": "customerName",
      "dataType": "STRING"
    },
    {
      "name": "productName",
      "dataType": "STRING"
    },
    {
      "name": "region",
      "dataType": "STRING"
    },
    {
      "name": "segment",
      "dataType": "STRING"
    },
    {
      "name": "state",
      "dataType": "STRING"
    },
    {
      "name": "subCategory",
      "dataType": "STRING"
    }
]
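
Putting it all together, the three field spec blocks above go into a single Pinot schema file. A minimal sketch (the ellipses stand for the specs listed above):

{
  "schemaName": "SalesOrderData",
  "dimensionFieldSpecs": [ ... ],
  "metricFieldSpecs": [ ... ],
  "dateTimeFieldSpecs": [ ... ]
}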

Table Design:

1. Table >> Determine Table Type

Now that the schema design is ready, let’s move on to the table configuration. First, determine the table type for your table: REALTIME, OFFLINE, or HYBRID (both).

For this use case, we will create a hybrid table, i.e., both a real-time and an offline table.
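
The segment, index, ingestion, and task configs in the following sections are all snippets of a top-level table config. A minimal skeleton for the real-time table might look like the sketch below (the table name is illustrative; the offline table config has the same shape with "tableType": "OFFLINE"):

{
  "tableName": "SalesOrder",
  "tableType": "REALTIME",
  "segmentsConfig": { ... },
  "tenants": {},
  "tableIndexConfig": { ... },
  "ingestionConfig": { ... },
  "metadata": {}
}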

2. Segment Config:

  1. Determine the Time Column

    The time column is used for determining the time boundary (for hybrid tables) and for managing the life cycle of the data stored in Pinot segments.

    Note: There can only be ONE time column for a given Pinot Table.

    For this use case, we will use orderDateSinceEpochMillis as the time column.

  2. Determine the replication factor for your Pinot segments

    Configure the replication factor for your segments. For production / high-availability environments, the recommendation is to use a replication of at least 3 so that the system can still serve data even if one or more copies are unavailable (the sample config below uses 1 for simplicity).

  3. Determine the retention period for your Pinot segments

    If your use case requires infinite retention, you can skip this step.

    Configure the retention period for your Pinot segments. For real-time tables, this number is typically low (a few days); for offline tables, it is relatively high.

    For this use case, we will configure a retention period of 3 days for the real-time table and 60 days for the offline table. Note that you can also consider using Managed Offline Flows (using minions) to automatically move data from the real-time table to the offline table instead of using an offline push job.

Segment Config:

"segmentsConfig": {
      "replication": "1",
      "schemaName": "SalesOrderData",
      "retentionTimeUnit": "DAYS",
      "retentionTimeValue": "60",
      "replicasPerPartition": "1",
      "timeColumnName": "orderDateSinceEpochMills"
}

3. Determine Indexes

Indexes are a key aspect of table design. Choosing the right index for each column is one of the most important decisions you will make; done correctly, it can improve query performance significantly.

Let’s start with inverted indexes. Frequently used predicate columns are great candidates for inverted indexes.

  • Determine the sort column - Sorted Inverted Index

    First, identify the sort column for your Pinot table. 

    A Pinot table can have only ONE sort column.

    The most frequently used predicate column with low cardinality should be chosen for the sorted inverted index.

    One of the key use cases for the Sales Orders example is to find orders by a specific date. So let’s use orderDateStr as the sorted column so that we can take advantage of compression as well as data locality on this column (see the sample query after the config below).

"tableIndexConfig": {
      "sortedColumn":["orderDateStr"],
}
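
For example, a hypothetical point-lookup query that benefits from sorting on orderDateStr (values are illustrative):

SELECT customerName, productName, quantity, sales
FROM SalesOrder
WHERE orderDateStr = '2022-08-15'
LIMIT 100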
  • Determine inverted index columns - Bitmap Inverted Index

    Next, identify any low-cardinality columns. A bitmap inverted index on such columns compresses well and speeds up queries that filter on them.

"tableIndexConfig": {
      "invertedIndexColumns": [
        "category",
        "city",
        "country",
        "customerName",
        "productName",
        "region",
        "segment",
        "state",
        "subCategory"
      ],
}

More details can be found in Pinot official docs here.

  • Determine range index columns - Range Index

    Range indexes are good for scenarios where you want to apply range predicates on metric/timestamp columns with high cardinality (i.e., a large number of unique values).

    For this use case, date range predicates will be applied on the orderDateStr field.

"tableIndexConfig": {
     "rangeIndexVersion": 2,
      "rangeIndexColumns": [
        "orderDateStr"
      ],
}
  • Raw Value Forward Index

    Now, let’s identify any remaining high-cardinality columns that might be adversely affected by the default index (dictionary-encoded forward index). For such columns, the raw value forward index is the way to go.

    For this use case, the metric columns (discount, profit, quantity, and sales) have very high cardinality, so we configure them as noDictionaryColumns.

"tableIndexConfig": {
     "noDictionaryColumns": [
        "discount",
        "profit",
        "quantity",
        "sales"
      ],
}
  • Determine StarTree index - StarTree Index

    The StarTree index builds pre-aggregates at ingestion time, thereby helping you avoid expensive aggregation operations at query time. Queries that compute aggregate metrics over a large dataset by slicing and dicing across different dimensions will see a significant performance improvement with a StarTree index.

    Note: StarTree index generation incurs a performance/memory/storage overhead in the system, so the general recommendation is to first use the regular indexing strategies to optimize query performance and only add a StarTree index if those don’t help.

    Sample Query:

SELECT
  country, state, city, COUNT(*) AS total_orders, SUM(sales) AS total_revenue
FROM
  SalesOrder
WHERE
  orderDateStr >= '2022-08-01' AND orderDateStr <= '2022-08-31'
GROUP BY
  country, state, city
ORDER BY
  total_orders DESC

StarTree Index:

"tableIndexConfig": {
     "starTreeIndexConfigs": [{
        "dimensionsSplitOrder": [
          "orderDateStr",
          "city",
          "state",
          "country"
        ],
        "skipStarNodeCreationForDimensions": [],
        "functionColumnPairs": [
          "COUNT__*", 
          "SUM__sales"
        ],
        "maxLeafRecords": 10000
      }],
}

Note: Dimensions specified in dimensionsSplitOrder should be in descending order of cardinality to optimize index performance.

  • Dictionary Encoded Forward Index (Default)

    All other columns that are not listed in the specialized indexes above will automatically get the default indexing scheme which is a dictionary-encoded forward index with bit compression.

4. Table >> Define the Stream Configuration (applicable to Real-time Tables only)

For real-time tables, we need to specify the Kafka topic details in the stream config. Apart from the standard Kafka configs, a few additional Pinot-specific configs can be set. These provide knobs to control the number and size of Pinot segments.

  • Configure Segment Flush Threshold:

    This config provides knobs to control the number and size of the segments being generated. By default, a consuming segment keeps ingesting data until the configured segment flush threshold time (default 6h) or segment flush threshold size (default 100 MB) is reached, whichever occurs first. At that point, the segment is flushed from memory and persisted to the deep store.

  • Configure Kafka consumer offset reset

    This config defines whether to start consumption from the smallest offset (meaning consume all the data starting from the oldest available) or the largest offset (meaning consume only the latest/fresh data that is arriving).

    Stream config:

"streamConfigs":{
  ...
    "realtime.segment.flush.threshold.rows": "0",
    "realtime.segment.flush.threshold.time": "24h",
    "realtime.segment.flush.threshold.segment.size": "200M",
    "stream.kafka.consumer.prop.auto.offset.reset":"smallest",
    ...
}
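
For context, a more complete streamConfigs block with the standard Kafka properties spelled out might look like the sketch below; the topic name and broker list are hypothetical placeholders for this use case:

"streamConfigs": {
    "streamType": "kafka",
    "stream.kafka.topic.name": "sales-orders",
    "stream.kafka.broker.list": "kafka-broker-1:9092",
    "stream.kafka.consumer.type": "lowlevel",
    "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
    "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
    "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
    "realtime.segment.flush.threshold.rows": "0",
    "realtime.segment.flush.threshold.time": "24h",
    "realtime.segment.flush.threshold.segment.size": "200M"
}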

5. Table >> Ingestion Configuration (optional):

This is where you can configure filters and transformations to apply to the input data before it gets into your Pinot cluster.

  • Filter:

    Pinot, as your OLAP datastore, should host only gold-quality data. It is generally a good practice to eliminate unwanted or bad data values (such as null or empty records) before they reach Pinot. However, there could be scenarios where you don’t have control over the source. In such cases, you can apply filter conditions in the filter config to eliminate records that meet certain criteria.

    More details can be found here.

    The input source could potentially send the orderDateSinceEpochMillis field with a value of 0. Since an epoch timestamp of 0 does not make sense, we can add a filter function to prevent such events from entering Pinot:

    Filter config:

"ingestionConfig": {
     "filterConfig": {
          "filterFunction": "Groovy({orderDateSinceEpochMillis == 0}, orderDateSinceEpochMillis)"
      },
    }
  • Column Transformation:

    You can also apply ingestion-time transformations to individual columns, or create derived columns that store data in a transformed format that benefits performance at query time.

    More details can be found here.

    In this example, we create the two derived DateTime columns using their respective transform functions, which are self-explanatory.

    Transform config:

"ingestionConfig": {
      "transformConfigs": [{
          "columnName": "orderDateSinceEpochSeconds",
          "transformFunction": "toEpochSeconds(orderDateSinceEpochMillis)"
        },
        {
          "columnName": "orderDateStr",
          "transformFunction": "toDateTime(orderDateSinceEpochMillis, 'yyyy-MM-dd')"
        }]
}

6. Minion Task (Optional)

  • File Ingestion Task: (Applicable to Offline tables only)

    Note: This is a StarTree proprietary feature.

    You can configure a FileIngestionTask to set up ingestion from a batch source (such as an Amazon S3 bucket) at a configurable periodic frequency. More details can be found here.

    FileIngestionTask config:

"task": {
      "taskTypeConfigsMap": {
        "FileIngestionTask": {
          "schedule": "0 20 * ? * * *",
          "input.fs.className": "org.apache.pinot.plugin.filesystem.S3PinotFS",
          "input.fs.prop.region": "us-west-2",
          "inputDirURI": "s3://<BUKET_PATH>/",
          "inputFormat": "PARQUET"
          "taskMaxNumFiles": "16" 
        }
      }
}
  • Minion Merge Rollup Task: 

    This is a segment optimization task that merges smaller segments into a larger one.

    More details here.

    Note: As of Jan 2022, this is applicable to offline tables only. 

    MergeRollupTask config:

"task": {
  "taskTypeConfigsMap": {
    "MergeRollupTask": {
          "1day.mergeType": "concat",
          "1day.bucketTimePeriod": "1d",
          "1day.bufferTimePeriod": "1d"  
     }
   }
}
  • Managed Offline Flow > RealTime to Offline Segments Task: 

    If you are creating a hybrid table, you can configure the following automated task - RealtimeToOfflineSegmentsTask - which runs periodically to move all eligible segments from the real-time table to the offline table.

    More details here.

    RealtimeToOfflineSegmentsTask config:

"task": {
  "taskTypeConfigsMap": {
    "RealtimeToOfflineSegmentsTask": {
      "bucketTimePeriod": "1h",
      "bufferTimePeriod": "1h",
      "schedule": "0 * * * * ?"
    }
  }
}

Important Note: Minion tasks will not run unless the controller is configured to schedule them (the task scheduler must be enabled and a task frequency configured). More details in this doc. A minimal sketch of the relevant controller setting follows.
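
As a minimal sketch, enabling the periodic task scheduler in the controller configuration might look like this (verify the exact property names against the Pinot docs for your version):

# Pinot controller config: enable the periodic task scheduler so minion tasks get scheduled
controller.task.scheduler.enabled=true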

In conclusion, the above framework can easily be expanded to cover additional use case scenarios and to leverage other Apache Pinot features. Further, depending on query throughput/SLA requirements, users might also consider advanced performance tuning strategies in Apache Pinot.
