File Ingestion
The FileIngestionTask is responsible for converting files into segments. These files can originate from various sources, such as Amazon S3, Google Cloud Storage (GCS), Azure Data Lake Storage (ADLS), or a local file system. Additionally, other file systems are supported by implementing the PinotFS interface.
The FileIngestionTask is designed primarily for ingesting into OFFLINE tables. However, it can also be configured to backfill data into REALTIME tables when necessary.
This task can be customized to suit various use cases. For instance, you can configure it to allow the addition of new files or enable updates to existing files, depending on your specific requirements.
Keep an eye on the data retention value set in the table config when ingesting old datasets. If the value is set too low, segments will be removed by the data retention task soon after they are ingested.
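For example, when bootstrapping a dataset that goes back several years, make sure segmentsConfig in the table config retains the data long enough. A minimal sketch, with illustrative values:
"segmentsConfig": {
  "retentionTimeUnit": "DAYS",
  "retentionTimeValue": "3650",
  ...
}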
Pinot Table Configuration
The following sections show a few common use cases and how to customize the FileIngestionTask for them.
Bootstrap
Given a folder containing a very large number of files, ingest all of them into segments with indices and sizes as configured. During proofs of concept, it’s common to bootstrap Pinot tables with very large datasets. For those cases we can assume the input files won’t change during ingestion.
The task configurations are described below.
Access The Input Folder
| Property | Required | Description |
| --- | --- | --- |
| inputDirURI | Yes | The input folder. |
| inputFormat | Yes | The input file format. |
| includeFileNamePattern | No | Filters the target input files, e.g. to exclude marker files such as _SUCCESS. Empty by default, which allows all files. The syntax is defined by FileSystem.getPathMatcher. |
| input.fs.className | No | The className used to ingest the files. It’s inferred from the URI by default. |
| input.fs.prop.<> | No | These props depend on which fs.className is picked. |
| hasDirectories | No | Enables a sanity check on whether a file URI is a directory. DEPRECATED: the check has been optimized and is now always enabled. |
| skipFileIfFailToInitReader | No | Skips an input file if the record reader fails to initialize for it. Enabled by default; the names and count of skipped files are tracked in task metadata for debugging. |
| push.mode | No | Tar (default) or Metadata. Use Metadata push mode if the controller becomes a bottleneck when tasks upload segments. |
The OSS Pinot docs explain the push modes in detail. In short, with Tar mode, segments are uploaded from tasks to the controller directly; with Metadata mode, tasks extract metadata from segments, then upload the segments to the deep store and the segment metadata to the controller separately. If only a few minion tasks run in parallel, Tar mode should work fine. If many tens or hundreds of tasks run in parallel, it’s better to use Metadata mode.
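For example, to switch a table’s tasks to Metadata push mode, set push.mode in the FileIngestionTask config (a minimal sketch; all other properties are omitted):
"FileIngestionTask": {
  ...
  "push.mode": "metadata"
}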
Control The Ingestion Parallelism
| Property | Required | Description |
| --- | --- | --- |
| tableMaxNumTasks | No | The max number of parallel tasks a table can run at any time. 10 by default. If set to -1 explicitly, as many tasks as needed are generated to ingest all files in one round. |
| taskMaxDataSize | No | The max number of bytes one task can process, used to spread the workload among parallel tasks. 1G by default. If set, taskMaxNumFiles is ignored. |
| taskMaxNumFiles | No | The max number of files one task can process, used to spread the workload among parallel tasks. 32 by default. Ignored if taskMaxDataSize is set explicitly. |
| maxNumRecordsPerSegment | No | The number of records to put in an output segment, used to control the size of output segments. 5M by default. |
| desiredSegmentSize | No | The desired segment size. Default is 500M (K for kilobytes, M for megabytes, G for gigabytes). With this configuration, neither maxNumRecordsPerSegment nor taskMaxDataSize needs to be configured. |
| schedule | No | A CRON expression, per Quartz cron syntax, for when the job is routinely triggered. If not set, the task is not cron scheduled but can still be triggered via the /tasks/schedule endpoint. |
If tableMaxNumTasks is set to -1, as many tasks as needed are generated to ingest all files in one round, with taskMaxDataSize controlling the workload of each subtask. In this one-shot mode, users should still check subtask completion states to decide whether another round is needed to retry failed subtasks, which can be done by simply triggering another task generation.
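For example, a one-shot bootstrap could be configured as follows, letting taskMaxDataSize split the workload (a minimal sketch; input-related properties are omitted):
"task": {
  "taskTypeConfigsMap": {
    "FileIngestionTask": {
      ...
      "tableMaxNumTasks": "-1",
      "taskMaxDataSize": "1G"
    }
  }
}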
Running the task every 10 minutes or more is a good starting point. You can then adjust the schedule and task parallelism to finish ingestion faster. We don’t recommend running tasks very frequently (for example, every few seconds), because task schedules across all tables are handled by a single cron scheduler in the Pinot Controller, and frequent schedules may delay other tasks.
"task": {
"taskTypeConfigsMap": {
"FileIngestionTask": {
"input.fs.className": "org.apache.pinot.plugin.filesystem.S3PinotFS",
"input.fs.prop.region": "...",
"input.fs.prop.secretKey": "...",
"input.fs.prop.accessKey": "...",
"inputDirURI": "s3://..../",
"inputFormat": "parquet",
"includeFileNamePattern": "glob:**/*.parquet",
"taskMaxNumFiles": "32",
"tableMaxNumTasks": "10",
"maxNumRecordsPerSegment": "5000000",
"schedule": "0 */10 * * * ?"
}
}
},
"tableIndexConfig": {
"segmentNameGeneratorType": "normalizedDate"
}
The segmentNameGeneratorType parameter is useful when the time column has date-formatted string values instead of numeric epoch values. By default the ‘simple’ generator is used, and it only works with numeric epoch values.
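For example, ‘normalizedDate’ can derive segment names from a time column declared as a date-formatted string like the one below (a minimal schema sketch; the format pattern is illustrative):
"dateTimeFieldSpecs": [
  {
    "name": "date",
    "dataType": "STRING",
    "format": "1:DAYS:SIMPLE_DATE_FORMAT:yyyy-MM-dd"
  }
]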
"task": {
"taskTypeConfigsMap": {
"FileIngestionTask": {
"input.fs.className": "org.apache.pinot.plugin.filesystem.GcsPinotFS",
"input.fs.prop.projectId": "<project>",
"input.fs.prop.gcpKey": "/home/pinot/gcp/credentials.json",
"inputDirURI": "gs://<BucketName>/<Some prefix>/",
"includeFileNamePattern": "glob:**/*.parquet",
"schedule": "0 */10 * * * ?",
"inputFormat": "parquet",
"tableMaxNumTasks": "400",
"taskMaxDataSize": "1G",
"maxNumRecordsPerSegment": "2147483647",
"push.mode": "metadata"
}
}
},
In the following subsections, we’ll discuss some advanced features and configs for processing the input data via the FileIngestionTask.
Derive Columns from The Source File Paths
This is useful when the source file directory is partitioned on dimensions present in the file path (e.g., time, with day as the smallest bucket). Those dimensions can be extracted as Pinot table columns.
Note that <seq> in the pathColumn.<seq>.* properties is an identifier; properties with the same <seq> form a group and are used together to extract or transform a value.
To help readers understand how it works, here is an example. Say the file path is s3://bucket/year=2022/month=05/day=20/file.csv, and the table has the following schema and configurations:
{
  "schemaName": "...",
  "dateTimeFieldSpecs": [
    {
      "name": "date",
      "dataType": "STRING",
      "format": "1:DAYS:SIMPLE_DATE_FORMAT:yyyy/MM/dd"
    }
  ],
  ...
}
{
  ...
  "task": {
    "taskTypeConfigsMap": {
      "FileIngestionTask": {
        ...
        "pathColumn.0.name": "year",
        "pathColumn.0.output": "false",
        "pathColumn.0.pathComponentStartIndex": "1",
        "pathColumn.0.numPathComponentsToExtract": "1",
        "pathColumn.0.charStartIndex": "5",
        "pathColumn.0.numCharsToExtract": "4",
        "pathColumn.1.name": "month",
        "pathColumn.1.output": "false",
        "pathColumn.1.pathComponentStartIndex": "2",
        "pathColumn.1.numPathComponentsToExtract": "1",
        "pathColumn.1.regex": "^month=(.*)$",
        "pathColumn.2.name": "day",
        "pathColumn.2.output": "false",
        "pathColumn.2.pathComponentStartIndex": "3",
        "pathColumn.2.numPathComponentsToExtract": "1",
        "pathColumn.2.charStartIndex": "4",
        "pathColumn.2.numCharsToExtract": "2",
        "pathColumn.3.name": "date",
        "pathColumn.3.output": "true",
        "pathColumn.3.transform": "${year}/${month}/${day}",
        ...
      }
    }
  },
These four groups of pathColumn properties (indicated by 0 through 3) work as follows:
- The first group extracts 2022 as “year”
- The second group extracts 05 as “month”
- The third group extracts 20 as “day”
- The fourth group combines “year”, “month”, and “day” as “date”, and marks “date” as a column in the Pinot table
Partition Data by Sub-directory Level in The Source File Paths
This feature is useful for grouping data from different folders and files into the same segment or set of segments.