Apache Pinot 101: Lesson 3 - Batch Ingest

This is the third lesson in the Apache Pinot 101 Tutorial Series:
- Lesson 1: Up and Running with Docker
- Lesson 2: Ingesting Data with Kafka
- Lesson 3: Ingesting Batch Data
- Lesson 4: Indexes for Faster Queries
- Lesson 5: Queries with SQL, Joins
- Lesson 6: Visualization with Superset
In the last lesson you learned how to ingest data from a Kafka topic. In addition to streaming ingestion, Apache Pinot supports batch ingestion from files stored in the local file system or in cloud storage. In this lesson, you’ll explore how to ingest a CSV file into Pinot.
Let’s ingest a file which contains information about Bitcoin being bought and sold. The rows of the file are shown below.
| id | side | time_ms | size |
|----|------|---------------|------|
| 1  | buy  | 1742205882000 | 1   |
| 2  | sell | 1742205910000 | 0.5 |
| 3  | buy  | 1742205945000 | 1   |
There are three rows, showing that Bitcoin is bought twice and sold once, leaving a net holding of 1.5 coins. We’ll ingest these rows into a table and write a query that gives us the same result.
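For reference, the raw CSV file for these rows contains a header line matching the column names and looks like the following (the exact file name, for example portfolio.csv, is up to you):

id,side,time_ms,size
1,buy,1742205882000,1
2,sell,1742205910000,0.5
3,buy,1742205945000,1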
To ingest data from a file, we need to create an offline table, in contrast to the real-time table that we created in the previous lesson.
Tables in Pinot come in three different types – realtime, offline, and hybrid. Realtime tables ingest data from streaming sources like Kafka, Kinesis, or Pulsar. Offline tables ingest data from batch sources like the local filesystem or a blob store like S3. A hybrid table ingests data from both realtime and batch sources.
Like we did last time, start by creating the schema for the table.
{
  "schemaName": "portfolio",
  "enableColumnBasedNullHandling": true,
  "dimensionFieldSpecs": [
    {
      "name": "id",
      "dataType": "LONG"
    },
    {
      "name": "side",
      "dataType": "STRING"
    },
    {
      "name": "size",
      "dataType": "FLOAT"
    }
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "time_ms",
      "dataType": "LONG",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS",
      "notNull": true
    }
  ],
  "primaryKeyColumns": [
    "id"
  ],
  "metricFieldSpecs": []
}
The schema is named “portfolio” and describes the columns that the table will contain. Next, create the table configuration.
{
  "tableName": "portfolio",
  "tableType": "OFFLINE",
  "segmentsConfig": {
    "replication": 1,
    "schemaName": "portfolio",
    "timeColumnName": "time_ms"
  },
  "tenants": {},
  "tableIndexConfig": {},
  "ingestionConfig": {},
  "metadata": {}
}
Notice how the type of the table is OFFLINE. This indicates that the table will store information from batch sources.
Finally, write a job specification file in YAML which will tell Pinot to ingest the CSV file. The content of the YAML file is given below.
executionFrameworkSpec:
  name: 'standalone'
  segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
  segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
jobType: SegmentCreationAndTarPush
inputDirURI: '/tmp/files'
includeFileNamePattern: 'glob:**/*.csv'
outputDirURI: '/tmp/files'
overwriteOutput: true
pinotFSSpecs:
  - scheme: file
    className: org.apache.pinot.spi.filesystem.LocalPinotFS
recordReaderSpec:
  dataFormat: 'csv'
  className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
  configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
tableSpec:
  tableName: 'portfolio'
pinotClusterSpecs:
  - controllerURI: 'http://localhost:9000'
pushJobSpec:
  pushParallelism: 2
  pushAttempts: 2
Let’s look at the various sections of the file.
- jobType is specified as SegmentCreationAndTarPush. This tells Pinot to generate a segment from the CSV file, package it as a TAR file, and push it to the cluster via the controller.
- inputDirURI specifies where to read the CSV file from. In our Docker Compose file, we’ve mounted a volume for the Pinot controller at the /tmp/files path (see the sketch after this list). This directory contains the job specification YAML and the CSV file.
- outputDirURI specifies where the TAR file for the segment will be written. We’re writing it back to the /tmp/files directory.
- includeFileNamePattern specifies what files we’d like to read. It’s written as a glob which matches all the CSV files in the folder.
- In the recordReaderSpec section we’ve specified that the file format is CSV.
- In the tableSpec section, we’ve specified that the file will be ingested into the portfolio table.
- Finally, the pinotClusterSpecs section specifies the URL of the Pinot controller, which is where the generated segment will be pushed. Since we launch the ingestion job from inside the controller container, localhost works as the hostname.
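For context, here is a minimal sketch of how the controller service in the Docker Compose file could mount such a volume. The host path tables/002-portfolio and the omitted settings are illustrative; only the mapping to /tmp/files matters here.

services:
  pinot-controller:
    # image, ports, and other settings omitted for brevity
    volumes:
      # host directory holding the CSV file and job_spec.yaml, mounted at /tmp/files
      - ./tables/002-portfolio:/tmp/files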
The documentation on batch ingestion covers how to run ingestion jobs in more detail, and the documentation on the ingestion job spec describes each of its sections.
Now let’s create the schema in Pinot by sending the contents of the schema JSON file to the controller. Run the following curl command.
curl -F schemaName=@tables/002-portfolio/portfolio_schema.json localhost:9000/schemas
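To double-check that the schema was registered, you can fetch it back from the controller’s REST API:

curl localhost:9000/schemas/portfolio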
Similarly, create the table by sending the contents of the table configuration JSON file to the controller.
curl -XPOST -H 'Content-Type: application/json' -d @tables/002-portfolio/portfolio_table.json localhost:9000/tables
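As with the schema, you can confirm that the table was created by retrieving its configuration from the controller:

curl localhost:9000/tables/portfolio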
Now that the schema and table have been created, launch the ingestion job.
docker compose exec pinot-controller bin/pinot-admin.sh LaunchDataIngestionJob -jobSpecFile /tmp/files/job_spec.yaml
The command above launches a job on the controller using the Pinot CLI. Point it at the job spec file by passing the file’s path to the -jobSpecFile option. Wait for the ingestion job to finish, then query the newly populated portfolio table by navigating to the query console.
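For example, to reproduce the 1.5-coin figure from the start of the lesson, a query along these lines, which adds buys and subtracts sells, should return 1.5:

SELECT SUM(CASE WHEN side = 'buy' THEN size ELSE -size END) AS total_coins
FROM portfolio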
And it’s as easy as that. Next up, we’ll look at how to create indexes to speed up the queries.
Next Lesson: Indexes for Faster Queries →