Apache Pinot

Delta Lake Managed Ingestion with StarTree’s DeltaConnector for Apache Pinot

Written by Mark Needham and Saurabh Dubey, December 15, 2022 (5 minutes read)

Delta Lake is a popular open source data storage framework that lets users build and manage data lakes. It interfaces seamlessly with a wide variety of compute engines including Apache Spark, Apache Flink, Presto, and more. 

While data lakes are great for large-scale data storage and querying, we can get even more out of that data by analyzing it in Apache Pinot. We built the Pinot Delta Lake connector to help you do exactly that.

The Pinot Delta Lake connector is available with StarTree Cloud. You can register for a 30-day free trial to try it out on your own data.

How does Delta Lake store data? 

Delta Lake stores data in tables, which contain two types of files:

  • Apache Parquet files for storing the actual data.

  • Transaction logs that capture the added and removed files in each table version. 

A typical Delta Lake table directory would look like this:

In this example, we have three Parquet files (f1.parquet, f2.parquet, f3.parquet) and their associated transaction log files in the _delta_log directory.

Below is a sample of the content in a delta table transaction log file:

{"add":{"path":"f3.parquet","size":561,.......}}
{"add":{"path":"f4.parquet","size":561,.......}}
{"remove":{"path":"f2.parquet","deletionTimestamp".....}}
{"commitInfo":{"timestamp":1663674437685,"operation":"WRITE".........}}

Each DML operation performed on the delta table generates a new table version and therefore a new transaction log file. More details about how Delta Lake operates can be found here.
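To make the log format concrete, here is a minimal Python sketch that reads one transaction log file and lists the files it adds and removes. The sample content is a simplified, hypothetical stand-in for a real commit file (real entries carry more fields than shown), and the helper name summarize_commit is ours, not part of any Delta Lake library.

```python
import json

# Hypothetical, simplified commit file content. Real log entries carry more
# fields, so this sketch only ever reads the "path" of each action.
SAMPLE_COMMIT = """\
{"add": {"path": "f3.parquet", "size": 561}}
{"add": {"path": "f4.parquet", "size": 561}}
{"remove": {"path": "f2.parquet"}}
{"commitInfo": {"timestamp": 1663674437685, "operation": "WRITE"}}
"""


def summarize_commit(log_text):
    """Return (added_paths, removed_paths) for one transaction log file."""
    added, removed = [], []
    for line in log_text.splitlines():
        if not line.strip():
            continue  # skip blank lines
        action = json.loads(line)  # each line is one JSON action
        if "add" in action:
            added.append(action["add"]["path"])
        elif "remove" in action:
            removed.append(action["remove"]["path"])
        # Other actions, such as commitInfo, do not change the file list.
    return added, removed


added, removed = summarize_commit(SAMPLE_COMMIT)
print("added:", added)
print("removed:", removed)
```

Because every line is an independent JSON object, the file can be processed one line at a time without loading any Parquet data.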

Now that we understand Delta Lake’s data format, let’s learn how to ingest that data into Pinot.

How do I use the Delta Lake connector?

The Delta Lake connector is configured under the "task" section of the table config. An example configuration is shown below:

"task": {
  "taskTypeConfigsMap": {
    "DeltaTableIngestionTask": {
      "delta.ingestion.table.fs": "S3",
      "delta.ingestion.table.s3.accessKey": "<key>",
      "delta.ingestion.table.s3.secretKey": "<secret>",
      "delta.ingestion.table.s3.region": "<region>",
      "delta.ingestion.table.uri": "s3a://<deltaTablePath>",
      "tableMaxNumTasks": "1",
      "schedule": "0 20 * ? * * *"
    }
  }
},

Let’s go through the parameters, starting with the ones specific to the Delta Lake connector:

  • delta.ingestion.table.fs: Type of the delta table storage location (only S3 is supported as of now).

  • delta.ingestion.table.s3.accessKey, delta.ingestion.table.s3.secretKey, delta.ingestion.table.s3.region, delta.ingestion.table.uri: Connection details of the delta table storage location.

We also have a couple of parameters that apply to all jobs that use the Minion framework:

  • schedule: A cron string describing the schedule of the task execution. The minion task framework periodically checks the delta table for new versions at this interval.

  • tableMaxNumTasks: Maximum number of minion tasks for ingesting data. 
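The schedule value in the example config has seven space-separated fields, which matches the Quartz cron layout (seconds, minutes, hours, day of month, month, day of week, optional year). Assuming that layout applies here, the sketch below labels each field so the value is easier to read. The helper name describe_quartz_cron is hypothetical, and the sketch only names the fields; it does not validate or evaluate the expression.

```python
# Field order assumed here is the Quartz cron layout; the seventh field
# (year) is optional in that format.
QUARTZ_FIELDS = ["seconds", "minutes", "hours", "day_of_month",
                 "month", "day_of_week", "year"]


def describe_quartz_cron(expression):
    """Map each field of a 6- or 7-field cron string to its name."""
    parts = expression.split()
    if len(parts) not in (6, 7):
        raise ValueError(f"expected 6 or 7 fields, got {len(parts)}")
    return dict(zip(QUARTZ_FIELDS, parts))


# The schedule string from the example table config.
fields = describe_quartz_cron("0 20 * ? * * *")
for name, value in fields.items():
    print(f"{name:>12}: {value}")
```

Read this way, the example schedule has seconds set to 0, minutes set to 20, and wildcards elsewhere, so the task would be triggered at minute 20 of every hour.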

How does it work?

The Delta Lake connector uses Apache Pinot’s minion-based task execution framework to manage and execute delta table ingestion tasks.

The minion is a Pinot component that is primarily used for ad hoc, computationally intensive tasks such as batch ingestion and segment merges or rollups. Since the Delta Lake format stores data in Parquet files, the connector can be modeled similarly to the FileIngestionTask, with the delta transaction logs acting as the source of truth for files to be added and removed.

Each run of the DeltaTableIngestionTask looks at the current table version, generates a list of files to be added and removed, and triggers a set of tasks that read the files and update the Pinot table. The diagram below shows how it works:

Let’s go into a bit more detail.

Task generation

As soon as the table is created, the minion task framework will start generating ingestion tasks at a cadence based on the “schedule” config. In each run, the task generator connects to the delta table, using the Delta Standalone connector and the connection details from the table config, and gets the latest table version.

Using the connector library, it’ll determine a list of files to be added and removed. It’ll store these details as part of the task metadata, and trigger a number of minion tasks (based on the value of the tableMaxNumTasks config), distributing the files to be processed among them.

The diagram below demonstrates how tasks are generated:

Set tableMaxNumTasks to 1 if you need atomicity between Delta Lake versions, keeping in mind that it will take longer to ingest the data.
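As a rough illustration of how files might be spread across tasks, here is a minimal Python sketch. The post does not say which distribution strategy the connector uses, so the round-robin split and the function name distribute_files are assumptions made purely for illustration.

```python
def distribute_files(files, table_max_num_tasks):
    """Split files round-robin into at most table_max_num_tasks groups.

    Round-robin is an assumed strategy for this sketch, not necessarily
    what the connector does internally.
    """
    if table_max_num_tasks < 1:
        raise ValueError("table_max_num_tasks must be at least 1")
    # Never create more tasks than there are files to process.
    num_tasks = min(table_max_num_tasks, len(files))
    groups = [[] for _ in range(num_tasks)]
    for index, path in enumerate(files):
        groups[index % num_tasks].append(path)
    return groups


# Hypothetical file names, in the style of the earlier example.
files = ["f1.parquet", "f2.parquet", "f3.parquet", "f4.parquet", "f5.parquet"]
two_tasks = distribute_files(files, 2)
one_task = distribute_files(files, 1)
print(two_tasks)
print(one_task)
```

With a limit of 1, every file lands in a single group, which is the setting the post recommends when all changes for a version should be applied together.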

Task execution

The minion tasks are executed in parallel by the minions. Each task executor looks at the list of files to be added and removed and downloads the files to be added from the delta table path. 

It then generates segments from these files that include any indexes specified by the table and schema configs. 

Once the segments are generated, the minion uses the segment replace protocol to atomically delete the segments corresponding to files being removed and upload the new segments corresponding to files being added. This ensures a consistent view of the Pinot table, even during updates.

Once all minion tasks have finished successfully, the task metadata will be updated to reflect the delta table version to which the Pinot table has successfully been synced. This helps the task generator work out which files are to be added and removed the next time that the task is run.
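The role of the synced version can be sketched in plain Python. This is a simplified model under our own assumptions, not the connector's code: the real implementation relies on the Delta Standalone library, while here each commit is a hypothetical list of (action, path) pairs and files_to_sync is an illustrative name.

```python
def files_to_sync(commits, synced_version):
    """Net files to add and remove for versions after synced_version.

    commits maps a version number to a list of (action, path) tuples,
    where action is "add" or "remove". Returns (to_add, to_remove, latest).
    """
    to_add, to_remove = set(), set()
    for version in sorted(v for v in commits if v > synced_version):
        for action, path in commits[version]:
            if action == "add":
                to_add.add(path)
            elif action == "remove":
                if path in to_add:
                    # Added and removed since the last sync: never ingest it.
                    to_add.discard(path)
                else:
                    to_remove.add(path)
    latest = max([synced_version, *commits])
    return sorted(to_add), sorted(to_remove), latest


# Hypothetical commit history; Pinot was last synced to version 1.
commits = {
    0: [("add", "f1.parquet"), ("add", "f2.parquet")],
    1: [("add", "f3.parquet")],
    2: [("add", "f4.parquet"), ("remove", "f2.parquet")],
    3: [("add", "f5.parquet"), ("remove", "f4.parquet")],
}
to_add, to_remove, latest = files_to_sync(commits, synced_version=1)
print(to_add, to_remove, latest)
```

In this example only f5.parquet needs ingesting and only f2.parquet needs removing, because f4.parquet was both added and removed after the last sync. After a successful run, the stored version would move to 3.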

These steps run periodically as per the schedule config, thereby ensuring that the Pinot table is always kept in sync with the delta table it replicates.

Building the connector (for the curious)

Delta Lake has a comprehensive ecosystem of connectors for interacting with your Delta Lake tables (https://delta.io/integrations/), including a standalone Java connector. We use that connector to process the ever-updating Delta Lake tables.

The connector is built with the following requirements in mind:

  1. Easy table-level configuration to ingest Delta Lake tables into Apache Pinot.

  2. Ability to sync any changes made to the delta table, so that the corresponding Pinot table is always in sync with the delta table.

  3. Atomicity of the sync operation: the Apache Pinot table must always be in sync with a complete Delta Lake table version. Updates to the next version must happen atomically; we should never be in a state where Pinot has data between versions.

Summary

If you have your data in Delta Lake, it’s now possible to run blazing fast analytics on that data in Apache Pinot.

We’d like to take this opportunity to thank Nick Karpov for helping with and validating the overall design, especially with understanding the delta table format and the delta-connector library integration.

We’d love for you to give the Delta Lake connector a try and let us know how you get on in the StarTree Community Slack Channel.
