In part 1 of this blog series, we highlighted the wide spectrum of requirements imposed by real-time analytics use cases and data sources on the underlying OLAP (OnLine Analytical Processing) platform. For example, supporting all those use cases means we need a system that can support extremely high throughput with low query latency and at the same time provide high query accuracy – in the presence of data duplication and real-time updates. Similarly, the same system must be able to ingest data from all kinds of data sources, handle unstructured data and real-time upserts. While there are different ways of solving each such problem scenario, ideally we want one unified platform that can be easily customized.
Enter Apache Pinot – a system that was purpose-built for supporting analytical use cases at scale and which has evolved to a highly customizable platform. In this blog, we will understand at a high level the different capabilities of Apache Pinot that satisfy all of the requirements mentioned in part 1 of the blog.
Apache Pinot is a distributed analytics data store that is rapidly becoming the go-to solution for building real-time analytical applications at scale. Pinot stands out due to its ability to deliver low-latency performance for high-throughput analytical queries. This makes it an ideal fit for powering internal dashboards as well as mission-critical, user-facing applications such as LinkedIn’s user profile analytics and Uber’s Restaurant Manager.
Below is a diagram of Pinot’s distributed architecture. A Pinot cluster can ingest data from streaming and batch sources and makes it available for query processing. The different components of Pinot are as follows:
Pinot segment: Unit of partitioning, replication, and query processing that represents a subset of the input data along with the specified indices.
Pinot Server: A node that is responsible for a set of Pinot segments. It stores the segments locally and processes them at query-time. This can be real-time or offline (batch) depending on the data source.
Pinot Controller: The brain of the cluster. Uses Apache Helix to coordinate segment assignment, replication, cluster membership, and other cluster management tasks.
Pinot Broker: Used to serve user queries. It uses the segment routing information managed by the controller to scatter queries to the different servers. Each server in turn locally executes the query and returns an intermediate response to the broker. The broker will then do a final aggregation and return the results to the user.
Minion: Runs computationally intensive background tasks such as segment creation, segment merge, data purge, index generation.
Below, we summarize the different layers and capabilities of Pinot that make it an ideal OLAP platform. We will do a deep dive into each of these components in subsequent blog posts.
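To make the broker and server roles described above more concrete, here is a minimal Python sketch of the scatter-gather flow. It is only an illustration: the in-memory `SERVERS` routing table, the function names, and the sample rows are hypothetical and are not part of Pinot's API.

```python
from collections import Counter

# Hypothetical routing table: server name -> list of segments (each a list of rows).
SERVERS = {
    "server-1": [[{"country": "US", "clicks": 3}, {"country": "IN", "clicks": 5}]],
    "server-2": [[{"country": "US", "clicks": 2}], [{"country": "DE", "clicks": 7}]],
}

def server_execute(segments):
    """Run SUM(clicks) GROUP BY country locally over one server's segments."""
    partial = Counter()
    for segment in segments:
        for row in segment:
            partial[row["country"]] += row["clicks"]
    return partial  # intermediate response returned to the broker

def broker_execute(routing_table):
    """Scatter the query to every server, then merge the partial results."""
    final = Counter()
    for segments in routing_table.values():
        final.update(server_execute(segments))  # Counter.update adds the counts
    return dict(final)

result = broker_execute(SERVERS)
print(result)
```

In a real cluster the scatter step happens over the network and in parallel; the sketch runs the servers sequentially to keep the merge logic easy to follow.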
As mentioned before, Pinot provides a convenient way to ingest data from various streaming sources (Apache Kafka, Amazon Kinesis) and batch sources (HDFS, GCS, S3). A special case of the streaming source is to ingest Change Data Capture (CDC) events from transactional data stores like MySQL or PostgreSQL using standard technologies such as Debezium (see here for more details). Pinot’s distributed architecture enables ingesting large volumes of data quickly and makes it available for querying with millisecond level freshness guarantees.
Here are some of the salient features of data ingestion in Pinot:
Pinot has built-in support for handling data upserts in a realtime table, based on a primary key. This can be accomplished in 2 ways:
Full row upsert: Replace the entire row with the new one for the same primary key
Partial upsert: Replace a subset of the columns for the same primary key
This is very useful to handle column updates appearing in a CDC stream. It also provides a way to resolve duplicates or updates to data in real-time and hence provide accurate query results. As mentioned in part 1, this ability is crucial for certain use cases like Business metrics. You can find more details in this eng wiki.
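The difference between the two upsert modes can be sketched in a few lines of Python. This only illustrates the semantics described above, using a plain dictionary keyed by primary key; the function names and the sample order records are hypothetical, and the sketch does not reflect how Pinot implements upserts internally.

```python
def full_upsert(table, record, key="id"):
    """Replace the entire row stored under the record's primary key."""
    table[record[key]] = dict(record)

def partial_upsert(table, record, key="id"):
    """Overwrite only the columns present in the record; keep the others."""
    existing = table.setdefault(record[key], {})
    existing.update(record)

table = {}
full_upsert(table, {"id": 1, "status": "placed", "amount": 20})
partial_upsert(table, {"id": 1, "status": "delivered"})  # amount is preserved
full_upsert(table, {"id": 2, "status": "placed", "amount": 15})
full_upsert(table, {"id": 2, "status": "cancelled"})     # amount is dropped
print(table)
```

A query that counts orders now sees exactly one row per primary key, which is what makes the results accurate in the presence of duplicates and updates.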
Pinot can transform records on the fly during ingestion. This removes the need for external, pre-processing workflows for common scenarios, such as filtering records, derived columns, or flattening complex fields. Pinot offers a plethora of transform functions including JSON, date-time functions, array/string/math functions, and even an ability to write Groovy scripts for custom transform logic. It’s also very easy to plug in your own UDF (User Defined Function). See here for more details.
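As a rough illustration of the three common scenarios mentioned above (filtering, derived columns, flattening), the following Python sketch applies them to a single incoming record. The field names (`event_type`, `ts_millis`, `device`) and the `transform` function are assumptions made for this example; in Pinot, such transforms are declared in the table config rather than written as application code.

```python
from datetime import datetime, timezone

def transform(record):
    """Return the transformed row, or None if the record is filtered out."""
    if record.get("event_type") == "heartbeat":  # filter unwanted records
        return None
    row = {"user_id": record["user_id"]}
    # Derived column: epoch milliseconds -> calendar day in UTC.
    ts = datetime.fromtimestamp(record["ts_millis"] / 1000, tz=timezone.utc)
    row["event_day"] = ts.strftime("%Y-%m-%d")
    # Flatten a nested field into top-level columns.
    for name, value in record.get("device", {}).items():
        row[f"device_{name}"] = value
    return row

events = [
    {"event_type": "click", "user_id": 7, "ts_millis": 0,
     "device": {"os": "ios", "model": "x"}},
    {"event_type": "heartbeat", "user_id": 7, "ts_millis": 1000},
]
rows = [r for r in map(transform, events) if r is not None]
print(rows)
```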
In the case of large data sets hosted on the likes of HDFS or S3, Pinot provides a framework known as Segment Build And Push to ingest data quickly. As the name implies, the build phase generates segments from the input data, which are then pushed to the Pinot cluster. Such build and push jobs can be executed on the Pinot Minions at scheduled intervals. Naturally, this frees up the Pinot servers from the overhead of segment creation. Thus, bulk loading of terabytes of data has a negligible impact on the scarce resources in the Pinot cluster and minimizes impact on the critical serving performance.
Pinot provides out-of-the-box support for handling unstructured data. Arbitrarily nested JSON data can be stored in a ‘JSON’ type column without the need for complex pre-processing (eg: flattening). In addition, we can specify a JSON index for this column to accelerate nested data structure queries, using the JSON_MATCH function. The JSON index is highly performant and enables users to execute such queries within milliseconds. Similarly, Pinot also makes it easy to query columns containing large text blobs using the TEXT index, which allows users to run term, phrase, prefix, or regex-based text search queries quickly.
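One way to picture why an index over nested JSON speeds up such queries is an inverted index from flattened (path, value) pairs to document IDs. The Python sketch below is a simplified illustration under that assumption; the path notation, function names, and sample documents are made up for this example and do not describe Pinot's actual on-disk index format.

```python
from collections import defaultdict

def flatten(value, path="$"):
    """Yield (json_path, leaf_value) pairs for an arbitrarily nested document."""
    if isinstance(value, dict):
        for key, child in value.items():
            yield from flatten(child, f"{path}.{key}")
    elif isinstance(value, list):
        for item in value:
            yield from flatten(item, f"{path}[*]")
    else:
        yield path, value

def build_json_index(docs):
    """Map each (path, value) pair to the set of document IDs containing it."""
    index = defaultdict(set)
    for doc_id, doc in enumerate(docs):
        for path, leaf in flatten(doc):
            index[(path, leaf)].add(doc_id)
    return index

docs = [
    {"name": "adam", "addresses": [{"country": "us"}, {"country": "ca"}]},
    {"name": "bob", "addresses": [{"country": "us"}]},
    {"name": "carol", "addresses": []},
]
index = build_json_index(docs)
# Lookup instead of scanning and parsing every document at query time.
us_docs = sorted(index[("$.addresses[*].country", "us")])
print(us_docs)
```

With the index in place, a nested-field predicate becomes a dictionary lookup rather than a parse of every stored document.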
Pinot is built around a highly optimized and flexible storage layer within each Pinot server. Following are the highlights of Pinot’s storage capabilities.
Data within a Pinot segment is organized in a columnar format which helps in reducing the amount of data that needs to be processed and stored in memory. This helps in speeding up analytical queries that focus on a few columns within a high-dimensional dataset. In addition, Pinot employs the following optimization techniques for further reducing in-memory and on-disk overhead:
Dictionary encoding: By default, each unique value for a given column is assigned a unique ID which in turn is stored as the column value. Thus, for most columns, this leads to a compact representation of all the column values.
Bit compaction: Bit compression or bit packing allows us to represent a list of numbers using far fewer bits than their original representation. In the case of dictionary encoding, the dictionary IDs can be bit compressed to further reduce the size of a dictionary encoded column.
Sorting: One of the columns in a given Pinot table can be sorted and then stored using run-length encoding which drastically reduces the total column size.
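The three techniques above can be sketched together in Python. This is a toy model, assuming a sorted dictionary and a fixed bit width per ID; the function names and the sample column are hypothetical, and real segment formats are considerably more involved.

```python
from itertools import groupby
from math import ceil, log2

def dictionary_encode(values):
    """Return (sorted dictionary, list of dictionary IDs, one per value)."""
    dictionary = sorted(set(values))
    ids = {value: i for i, value in enumerate(dictionary)}
    return dictionary, [ids[value] for value in values]

def bits_per_id(dictionary):
    """Minimum fixed width in bits needed to store any dictionary ID."""
    return max(1, ceil(log2(len(dictionary))))

def run_length_encode(sorted_ids):
    """Collapse a sorted column into (id, run_length) pairs."""
    return [(key, len(list(group))) for key, group in groupby(sorted_ids)]

column = ["US", "IN", "US", "DE", "US", "IN"]
dictionary, ids = dictionary_encode(column)
width = bits_per_id(dictionary)       # 3 distinct values fit in 2 bits each
runs = run_length_encode(sorted(ids))  # sorted column -> a few long runs
print(dictionary, ids, width, runs)
```

Here six strings shrink to six 2-bit IDs plus a three-entry dictionary, and once the column is sorted the six IDs shrink further to three runs.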
Pinot provides a rich set of indexes for accelerating a variety of use cases. Each such index can be defined for any of the columns within the table config. As data is being ingested, Pinot will automatically generate the specified indexes and store them as part of the segment. What’s more, we can also add indexes on the fly to the existing segments. Here’s a quick summary of the indexes available in Pinot.
These indexes provide several orders of magnitude improvement in query performance (in some cases query latency goes from 30+ seconds without index to about 50 milliseconds with the index). For more details about each index and its benchmark, please refer to What makes Apache Pinot fast – Chapter 2.
This layer deals with processing user queries in a highly optimized manner across all the Pinot servers. At the same time, Pinot also provides a way to run highly complex queries (ANSI SQL compliant) for greater flexibility.
Once Pinot receives a query, the query goes through several rounds of optimization:
Broker level Pruning: Data is usually partitioned across the Pinot servers based on a particular column (time column by default). During query time, Pinot brokers will perform partition-aware query routing: a query with a filter predicate on the partitioned column will be routed only to the segments that can contain matching values for that column. This effectively minimizes the number of segments that need to be processed across the cluster, thus lowering the per-query latency and boosting the throughput.
Server level Pruning: When a Pinot server receives a query request from the broker, it will first prune (remove) the segments that don’t need to be processed. This is done by utilizing column metadata such as min/max values and bloom filters. Similar to broker pruning, this reduces the set of segments that need to be processed locally and hence improves overall throughput and query latency.
Filter and aggregation optimizations: Pinot offers a rich set of indexes as described above to speed up filtering and aggregation operations as part of query processing.
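As an illustration of metadata-based pruning, the Python sketch below keeps only the segments whose min/max range can overlap a range filter. The `SegmentMeta` class, the `ts` column, and the segment names are assumptions for this example only; bloom-filter pruning for equality predicates follows the same idea but is not shown.

```python
from dataclasses import dataclass

@dataclass
class SegmentMeta:
    name: str
    min_ts: int  # smallest value of the `ts` column in the segment
    max_ts: int  # largest value of the `ts` column in the segment

def prune(segments, lo, hi):
    """Keep segments whose [min_ts, max_ts] range overlaps the filter [lo, hi]."""
    return [s for s in segments if s.max_ts >= lo and s.min_ts <= hi]

segments = [
    SegmentMeta("seg_0", 0, 99),
    SegmentMeta("seg_1", 100, 199),
    SegmentMeta("seg_2", 200, 299),
]
# Equivalent to: WHERE ts BETWEEN 150 AND 220
selected = [s.name for s in prune(segments, 150, 220)]
print(selected)
```

Only two of the three segments are touched, and the decision is made without reading any column data.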
Although Pinot has a lot of advanced indexing techniques, one of the challenges is to pick the right strategy depending on the query. To achieve this, unlike other database systems that optimize only the high-level logical query plan, Pinot uses a per-segment query plan. This allows optimizing query execution for certain scenarios, such as a predicate matching all the (column) values of a given segment. In other cases, Pinot can generate a query plan that relies solely on the segment metadata for answering things like count, min, or max aggregation without any predicates. This allows segments to have different indexes and physical layouts and still obtain optimum query latency during execution.
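The idea of choosing a plan per segment can be sketched for a simple COUNT(*) query with an optional range filter. This is a hypothetical model: the plan names, the dictionary-based segment layout, and the `ts` column are assumptions for illustration and do not mirror Pinot's planner classes.

```python
def plan_segment(segment, predicate_range=None):
    """Pick a plan for COUNT(*) on one segment, given an optional (lo, hi) filter on ts."""
    if predicate_range is None:
        return "metadata_only"  # answer from the stored document count
    lo, hi = predicate_range
    if hi < segment["min_ts"] or lo > segment["max_ts"]:
        return "skip"           # no row in this segment can match
    if lo <= segment["min_ts"] and segment["max_ts"] <= hi:
        return "match_all"      # every row matches, so no filter evaluation
    return "filter_scan"        # fall back to evaluating the predicate

def count(segment, predicate_range=None):
    """Execute COUNT(*) on one segment using the plan chosen for it."""
    plan = plan_segment(segment, predicate_range)
    if plan == "skip":
        return 0
    if plan in ("metadata_only", "match_all"):
        return segment["num_docs"]
    lo, hi = predicate_range
    return sum(lo <= ts <= hi for ts in segment["ts"])

seg = {"min_ts": 10, "max_ts": 14, "num_docs": 5, "ts": [10, 11, 12, 13, 14]}
print(plan_segment(seg), plan_segment(seg, (0, 100)), plan_segment(seg, (12, 13)))
print(count(seg, (12, 13)))
```

The same query therefore runs as a metadata lookup on one segment and as a filtered scan on another, depending on what each segment contains.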
Pinot natively supports Calcite SQL with rich semantics including:
Standard OLAP functionality (filtering, projection, aggregation, group-by, order-by)
Lookup join using the concept of dimension table
A rich library of built-in functions and the ability to plug in UDFs
Approximate or probabilistic query support
Ability to do text, JSON, geo-spatial queries powered by the built-in indexes
The ability to perform nested queries or joins across multiple tables and external datasets is provided by a highly optimized PrestoDB or Trino connector. This gives us the best of both worlds: full SQL support from Presto and low-latency queries from Pinot. The connector pushes as many operations as it can from the Presto query to Pinot. It can push down everything Pinot supports, including predicates, aggregation functions, group by, UDFs, etc. For a certain set of queries, wherein the aggregation function or filter predicate cannot be pushed down, Pinot provides a gRPC server for streaming data from the segments to Presto in a memory-efficient manner.
Thanks to all the above mentioned capabilities, Pinot can meet the critical requirements of all the use cases mentioned in part 1.
Pinot also offers a rich capability to ingest data from various data sources and formats. This allows for easy integration into the existing Data ecosystem.
Overall, Apache Pinot is indeed a strong contender for the ideal OLAP platform. In the next phase of Pinot’s evolution, the community plans to add significant improvements as shown in the diagram below. These will help bridge the gap in query flexibility, support even more use cases, and make it the cheapest and fastest OLAP store to operate in the cloud. Stay tuned!
If you want to try out Apache Pinot, the following resources will help you get started:
Download page: https://pinot.apache.org/download/
Getting started: https://docs.pinot.apache.org/getting-started
Join our Slack channel: https://communityinviter.com/apps/apache-pinot/apache-pinot
See our upcoming events: https://www.meetup.com/apache-pinot
Follow us on Twitter: https://twitter.com/startreedata
Subscribe to our YouTube channel: https://www.youtube.com/startreedata