Introducing ASOF JOIN in Apache Pinot: The Perfect Match for Your Time-Series Data

Announcing support for ASOF JOIN in Apache Pinot, a powerful feature designed to simplify and streamline complex temporal analysis.

Joining time-series data from different sources is a common but challenging task. When timestamps in two datasets don’t align exactly, a standard equi-join fails to produce meaningful results. Subselect-based queries can work around this, but they are complex to write and typically inefficient to execute. To address this, we are pleased to introduce support for ASOF JOIN in Apache Pinot, a powerful feature designed to simplify and streamline complex temporal analysis.

An ASOF JOIN is a specialized join that matches rows from two tables based on a proximity condition, typically a timestamp. It allows users to join an event stream with a state stream, finding the state of the system “as of” the time the event occurred.

Understanding ASOF JOIN

Unlike a standard join, which requires an exact match on a key, an ASOF JOIN finds the closest match, if one exists. For each row in the left table, it considers the right-table rows that satisfy the ON conditions and picks the one whose timestamp is closest to the left row’s timestamp. The direction of “closest” is set by the comparison operator in the match condition: <, >, <=, or >=.

With a >= condition, this is the familiar “last observation carried forward” logic, which is essential for scenarios where data is not perfectly synchronized but is related in time.
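
To make the four operators concrete, here is a minimal Python sketch of the matching rule for a single left-hand timestamp. The function name `closest_match` and the integer timestamps are illustrative assumptions, not part of Pinot. The sketch uses a plain linear scan for clarity and is not meant to reflect how Pinot executes the join.

```python
import operator

# Map each match-condition operator (written as left OP right) to its
# comparison function and to whether the closest qualifying right-hand
# value is the largest or the smallest one.
_OPS = {
    ">=": (operator.ge, max),  # latest right value at or before left
    ">": (operator.gt, max),   # latest right value strictly before left
    "<=": (operator.le, min),  # earliest right value at or after left
    "<": (operator.lt, min),   # earliest right value strictly after left
}


def closest_match(left_time, right_times, op):
    """Return the right-hand time closest to left_time under `op`, or None."""
    compare, pick = _OPS[op]
    candidates = [r for r in right_times if compare(left_time, r)]
    return pick(candidates) if candidates else None


right_times = [1, 4, 14]  # hypothetical right-table timestamps
print(closest_match(5, right_times, ">="))
print(closest_match(4, right_times, ">"))
print(closest_match(5, right_times, "<="))
print(closest_match(14, right_times, "<"))
```

When no right-hand value satisfies the comparison, as in the last call, there is simply no match for that left row.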

How ASOF JOIN Works

Let’s illustrate with a common financial use case: joining a stream of trades with a stream of market quotes.

Consider a trades table and a quotes table:

trades

trade_id  symbol  trade_time
1         PNOT    10:00:05
2         STAR    10:00:10
3         PNOT    10:00:12

quotes

symbol  quote_time  price
PNOT    10:00:01    150.00
PNOT    10:00:04    150.05
STAR    10:00:08    2800.00
PNOT    10:00:14    150.10

To determine the market price at the time of each trade, we can use the following ASOF JOIN:

SELECT
  t.trade_id,
  t.symbol,
  t.trade_time,
  q.price AS price_at_trade_time
FROM trades AS t
ASOF JOIN quotes AS q
MATCH_CONDITION(t.trade_time >= q.quote_time)
ON t.symbol = q.symbol;

Here’s how Pinot processes this query:

  1. For trade_id = 1 (PNOT @ 10:00:05): The join first finds all quotes for PNOT based on the ON clause. The MATCH_CONDITION (trade_time >= quote_time) then filters these to quotes at or before 10:00:05. Of the valid quotes (10:00:01, 10:00:04), it selects the most recent one: the quote at 10:00:04 with a price of $150.05.
  2. For trade_id = 2 (STAR @ 10:00:10): It finds the most recent quote for STAR at or before 10:00:10. The match is the quote at 10:00:08 with a price of $2800.00.
  3. For trade_id = 3 (PNOT @ 10:00:12): It searches for PNOT quotes at or before 10:00:12. The most recent valid quote is still the one at 10:00:04. The quote at 10:00:14 is ignored as it occurred after the trade.

The resulting table provides a precise, point-in-time view of the market for each trade:

trade_id  symbol  trade_time  price_at_trade_time
1         PNOT    10:00:05    150.05
2         STAR    10:00:10    2800.00
3         PNOT    10:00:12    150.05
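
The walk-through above can be reproduced with a few lines of Python. This is only a sketch that mirrors the three steps (filter on the ON key, filter on the MATCH_CONDITION, keep the most recent quote). The function name `asof_join` is made up for illustration, and the zero-padded HH:MM:SS strings are compared as text, which agrees with chronological order for this data.

```python
trades = [
    (1, "PNOT", "10:00:05"),
    (2, "STAR", "10:00:10"),
    (3, "PNOT", "10:00:12"),
]
quotes = [
    ("PNOT", "10:00:01", 150.00),
    ("PNOT", "10:00:04", 150.05),
    ("STAR", "10:00:08", 2800.00),
    ("PNOT", "10:00:14", 150.10),
]


def asof_join(trades, quotes):
    """Inner ASOF join with MATCH_CONDITION(trade_time >= quote_time)."""
    result = []
    for trade_id, symbol, trade_time in trades:
        # ON t.symbol = q.symbol
        same_symbol = [q for q in quotes if q[0] == symbol]
        # MATCH_CONDITION(t.trade_time >= q.quote_time)
        eligible = [q for q in same_symbol if trade_time >= q[1]]
        if eligible:
            # Closest match: the most recent eligible quote.
            _, _, price = max(eligible, key=lambda q: q[1])
            result.append((trade_id, symbol, trade_time, price))
    return result


for row in asof_join(trades, quotes):
    print(row)
```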

Under the Hood: Implementation and Design

The ASOF JOIN implementation in Pinot is designed for performance and scalability: it combines the existing distributed join framework with a new, efficient in-memory matching algorithm. The process can be understood in two main phases.

Phase 1: Distributed Matching with the ON Clause

The first phase of the join handles the equality conditions specified in the ON clause. This part of the execution leverages Pinot’s mature distributed join infrastructure. Depending on table sizes and data distribution, one of the following strategies (selectable through query hints) brings rows with matching keys together from across the cluster:

  • Partitioned (Hash) Join: For joins between two large tables, Pinot can use a hash-based distribution strategy. Rows from both tables are shuffled across servers based on the hash of their join keys, ensuring that matching keys are co-located on the same processing node. If the two tables are already partitioned and colocated, data shuffle can be eliminated altogether (see colocated join strategy).
  • Broadcast Join: When one of the tables is small, Pinot can broadcast its entire contents to all servers processing the larger table. This avoids expensive data shuffling for the larger table.

By building on this foundation, the ASOF JOIN ensures that the initial step of finding all potential row matches is fully distributed and horizontally scalable.
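
As a rough illustration of the partitioned strategy, the sketch below routes rows from both tables to workers by hashing the join key, so that rows sharing a key always land on the same worker. The worker count, the `worker_for` and `partition` helpers, and the use of CRC32 as the hash are all assumptions made for the example. Pinot's actual hash function and exchange mechanics are not described here.

```python
import zlib
from collections import defaultdict


def worker_for(key, num_workers):
    """Pick a worker from a stable hash of the join key (CRC32 here)."""
    return zlib.crc32(key.encode("utf-8")) % num_workers


def partition(rows, key_index, num_workers):
    """Group rows by the worker that owns their join key."""
    buckets = defaultdict(list)
    for row in rows:
        buckets[worker_for(row[key_index], num_workers)].append(row)
    return buckets


# Hypothetical rows; timestamps are seconds after 10:00:00.
trades = [(1, "PNOT", 5), (2, "STAR", 10), (3, "PNOT", 12)]
quotes = [("PNOT", 1, 150.00), ("PNOT", 4, 150.05), ("STAR", 8, 2800.00)]

trade_parts = partition(trades, key_index=1, num_workers=4)
quote_parts = partition(quotes, key_index=0, num_workers=4)

# Every trade finds the quotes for its symbol on its own worker.
for worker, rows in trade_parts.items():
    for _, symbol, _ in rows:
        assert symbol in {q[0] for q in quote_parts[worker]}
```

A broadcast join would skip the hashing step for the larger table and instead hand every worker a full copy of the smaller one.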

Phase 2: In-Memory Search with the MATCH_CONDITION

Once the matching rows are co-located on a server, the second phase begins. This is where the ASOF logic is applied. For each group of rows matching on the ON key, the rows from the right table are organized into a sorted tree data structure in memory, ordered by the column used in the MATCH_CONDITION.

Instead of scanning through all potential matches for each left-table row, Pinot performs a binary search on this sorted structure. This finds the closest right-table row that satisfies the MATCH_CONDITION (e.g., t.trade_time >= q.quote_time) in time logarithmic in the number of candidate rows. The join therefore remains fast even when a single key in the left table matches a large number of rows in the right table.

This two-phase design effectively delegates the heavy lifting of data shuffling to our battle-tested distributed join engine, while handling the unique temporal matching logic with a specialized and optimized in-memory search.

This process is analogous to a traditional hash join algorithm. The right table can be thought of as the build side, where instead of building a simple hash table, we construct a more sophisticated sorted data structure for each join key. The left table then acts as the probe side. For each row, we first look up the corresponding key and then perform a binary search within its sorted data structure to find the closest match, rather than probing for a single exact value. This hybrid approach allows Pinot to combine the scalability of hash-based distributed joins with the efficiency of ordered searches for complex temporal lookups.
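
The build/probe analogy can be sketched in Python as follows. This is not Pinot's code: the `build` and `probe` names are hypothetical, a sorted list searched with the standard `bisect` module stands in for the sorted tree, and only the >= form of the match condition is handled.

```python
from bisect import bisect_right
from collections import defaultdict


def build(right_rows):
    """Build side: for each join key, keep right rows sorted by timestamp."""
    grouped = defaultdict(list)
    for key, ts, payload in right_rows:
        grouped[key].append((ts, payload))
    index = {}
    for key, entries in grouped.items():
        entries.sort(key=lambda e: e[0])
        # Parallel lists so the probe can binary-search the timestamps.
        index[key] = ([ts for ts, _ in entries], [p for _, p in entries])
    return index


def probe(index, key, left_ts):
    """Probe side for MATCH_CONDITION(left_ts >= right_ts).

    Hash lookup on the key, then binary search for the last right row
    whose timestamp is <= left_ts. Returns its payload, or None.
    """
    if key not in index:
        return None
    timestamps, payloads = index[key]
    pos = bisect_right(timestamps, left_ts)
    return payloads[pos - 1] if pos > 0 else None


# Hypothetical quotes: (symbol, seconds after 10:00:00, price).
quotes = [
    ("PNOT", 1, 150.00),
    ("PNOT", 4, 150.05),
    ("STAR", 8, 2800.00),
    ("PNOT", 14, 150.10),
]
index = build(quotes)
print(probe(index, "PNOT", 5))
print(probe(index, "PNOT", 12))
```

The sort happens once per key on the build side, so each probe costs one dictionary lookup plus one binary search rather than a scan of every row for that key.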

Syntax

The syntax is designed to be clear and expressive:

SELECT ...
FROM table1
ASOF JOIN table2
MATCH_CONDITION(table1.time_col <comparison_operator> table2.time_col)
ON table1.join_key = table2.join_key;

  • MATCH_CONDITION(…): This defines the temporal logic for the “closest match.” The comparison operator can be <, >, <=, or >=.
  • ON: This clause provides the exact-match conditions, partitioning the data before the ASOF logic is applied. For example, it ensures that quotes are only matched with trades of the same stock symbol. The join condition in ON is mandatory and must be a conjunction of equality comparisons (i.e., non-equi join conditions and clauses joined with OR aren’t allowed). ON true can be used when the join should be performed using only the MATCH_CONDITION.

Pinot also supports LEFT ASOF JOIN, which guarantees that all rows from the left table are included in the result set. If no match is found in the right table, the corresponding columns will be populated with NULL.
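
The difference between the two variants comes down to what happens to unmatched left rows. The sketch below is a hypothetical illustration in Python, with `None` standing in for NULL. The `emit` and `latest_temperature` helpers and the sensor data are invented for the example.

```python
def emit(left_rows, find_match, keep_unmatched):
    """Join each left row with find_match(row), which returns a value or None."""
    out = []
    for row in left_rows:
        match = find_match(row)
        if match is not None:
            out.append(row + (match,))
        elif keep_unmatched:
            # LEFT ASOF JOIN: keep the left row, fill the right column with NULL.
            out.append(row + (None,))
    return out


# sensor_id -> (timestamp, temperature) pairs, sorted by timestamp
temperature = {"s1": [(10, 21.5), (20, 22.0)]}
# (sensor_id, timestamp, moisture_level)
moisture = [("s1", 5, 0.31), ("s1", 15, 0.29), ("s2", 12, 0.40)]


def latest_temperature(row):
    """Most recent temperature at or before the moisture reading, if any."""
    sensor_id, ts, _ = row
    earlier = [temp for t, temp in temperature.get(sensor_id, []) if t <= ts]
    return earlier[-1] if earlier else None


inner_rows = emit(moisture, latest_temperature, keep_unmatched=False)
left_rows = emit(moisture, latest_temperature, keep_unmatched=True)
print(inner_rows)
print(left_rows)
```

The first reading predates every temperature sample and the third comes from a sensor with no temperature data, so both survive only in the left variant.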

Other Practical Use Cases

The utility of ASOF JOIN extends to many domains beyond finance.

IoT and Sensor Data Analytics

In IoT analytics, data from different sensors often arrives at unsynchronized intervals. An ASOF JOIN can correlate these disparate streams.

Query: For every soil moisture reading, find the most recently recorded air temperature.

SELECT
  m.timestamp AS moisture_reading_time,
  m.moisture_level,
  t.temperature
FROM moisture_readings AS m
LEFT ASOF JOIN temperature_readings AS t
MATCH_CONDITION(m.timestamp >= t.timestamp)
ON m.sensor_id = t.sensor_id;

E-commerce User Behavior Analysis

E-commerce platforms can use ASOF JOIN to understand user behavior in the context of changing data, like product prices.

Query: What price did a user see for a product at the moment they clicked on it?

SELECT
  c.user_id,
  c.product_id,
  c.click_timestamp,
  p.price AS price_at_click
FROM user_clicks AS c
ASOF JOIN price_history AS p
MATCH_CONDITION(c.click_timestamp >= p.effective_timestamp)
ON c.product_id = p.product_id;

Conclusion

The ASOF JOIN implementation in Apache Pinot provides a robust and efficient solution for a common class of time-series analysis problems. By simplifying the process of joining asynchronous data streams, it allows developers and analysts to focus on deriving insights rather than on complex and inefficient data manipulation.

To learn more, you can review the technical details in the original pull request for the feature. We encourage you to try this new feature and provide feedback through our community channels.

Connect with us

Millions of concurrent users. Tens of thousands of queries per second. Hundreds of thousands of events flowing every second. If these are the kinds of numbers you operate with, aspire to handle, or are simply curious about, let’s connect! We’d love to share our lessons in serving at scale, hear your use cases, and geek out over low-latency analytics.
