File Ingestion

The FileIngestionTask is responsible for converting files into segments. These files can originate from various sources, such as Amazon S3, Google Cloud Storage (GCS), Azure Data Lake Storage (ADLS), or a local file system. Additionally, other file systems are supported by implementing the PinotFS interface.

By default, the FileIngestionTask is designed for OFFLINE tables during the ingestion stage. However, it can also be configured to backfill data into REALTIME tables when necessary.

This task can be customized to suit various use cases. For instance, you can configure it to allow the addition of new files or enable updates to existing files, depending on your specific requirements.

💡

Keep an eye on the data retention value set in the table config when ingesting old datasets. If the value is set too low, segments will be removed by the data retention task soon after they are ingested.
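
For example, retention is controlled by the segmentsConfig section of the table config. A minimal sketch, with an illustrative 10-year retention:

"segmentsConfig": {
  "retentionTimeUnit": "DAYS",
  "retentionTimeValue": "3650",
  ...
}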

Pinot Table Configuration

The following sections show a few common use cases and how to customize the FileIngestionTask for them.

Bootstrap

Given a folder containing many files, ingest all of them into segments with the configured indexes and sizes. During proofs of concept, it’s common to bootstrap Pinot tables with very large datasets. In those cases, we can assume the input files won’t change during ingestion.

The task configurations are described below.

Access The Input Folder

| Property    | Required | Description       |
| ----------- | -------- | ----------------- |
| inputDirURI | Yes      | The input folder. |

💡

The OSS Pinot docs explain the push modes clearly. In short, with Tar mode, the tasks upload segments directly to the controller; with Metadata mode, the tasks extract metadata from the segments, upload the segments to the deep store, and upload the segment metadata to the controller separately. If only a few minion tasks run in parallel, Tar mode works fine. If tens or hundreds of tasks run in parallel, use Metadata mode instead.
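
For example, to use Metadata push mode (as in the GCS sample below), set push.mode in the task configs:

"FileIngestionTask": {
  ...
  "push.mode": "metadata"
}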

Control The Ingestion Parallelism

| Property         | Required | Description |
| ---------------- | -------- | ----------- |
| tableMaxNumTasks | No       | The maximum number of parallel tasks a table can run at any time. Defaults to 10. If set to -1 explicitly, as many tasks as needed are generated to ingest all files in one round. |

💡

If tableMaxNumTasks is set to -1, as many tasks as needed are generated to ingest all files in one round, with taskMaxDataSize controlling the workload of each subtask. In this one-shot mode, you should still check the subtask completion states to decide whether another round is needed to retry failed subtasks, simply by triggering another task generation.
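
A minimal sketch of this one-shot setup, with an illustrative taskMaxDataSize value:

"FileIngestionTask": {
  ...
  "tableMaxNumTasks": "-1",
  "taskMaxDataSize": "1G",
  ...
}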

💡

Running the task every 10 minutes or more is a good starting point. You can then adjust the schedule and task parallelism to finish ingestion faster. We don’t recommend running tasks very frequently (for example, every few seconds), because the schedules of tasks across all tables are handled by a single cron scheduler in the Pinot Controller, and frequent schedules may delay other tasks.

A sample task configuration for S3

 "task": {
      "taskTypeConfigsMap": {
        "FileIngestionTask": {
          "input.fs.className": "org.apache.pinot.plugin.filesystem.S3PinotFS",
          "input.fs.prop.region": "...",
          "input.fs.prop.secretKey": "...",
          "input.fs.prop.accessKey": "...",
 
          "inputDirURI": "s3://..../",
          "inputFormat": "parquet",
          "includeFileNamePattern": "glob:**/*.parquet",
 
          "taskMaxNumFiles": "32",
          "tableMaxNumTasks": "10",
          "maxNumRecordsPerSegment": "5000000",
          "schedule": "0 */10 * * * ?"
        }
      }
    },
 
    "tableIndexConfig": {
      "segmentNameGeneratorType": "normalizedDate"
    }
💡

The segmentNameGeneratorType parameter is useful when the time column has date-formatted string values instead of numeric epoch values. By default, the ‘simple’ generator is used, and it only works with numeric epoch values.
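
For example, a date-formatted time column that the ‘simple’ generator cannot handle, but normalizedDate can, would be declared in the schema like this:

"dateTimeFieldSpecs": [
  {
    "name": "date",
    "dataType": "STRING",
    "format": "1:DAYS:SIMPLE_DATE_FORMAT:yyyy/MM/dd"
  }
]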

A sample task configuration for GCS

     "task": {
      "taskTypeConfigsMap": {
        "FileIngestionTask": {
          "input.fs.className": "org.apache.pinot.plugin.filesystem.GcsPinotFS",
          "input.fs.prop.projectId": "<project>",
          "input.fs.prop.gcpKey": "/home/pinot/gcp/credentials.json",
          "inputDirURI": "gs://<BucketName>/<Some prefix>/",
          "includeFileNamePattern": "glob:**/*.parquet",
          "schedule": "0 */10 * * * ?",
          "inputFormat": "parquet",
          "tableMaxNumTasks": "400",
          "taskMaxDataSize": "1G",
          "maxNumRecordsPerSegment": "2147483647",
          "push.mode": "metadata"
        }
      }
    },

In the following subsections, we’ll discuss some advanced features and configs for processing the input data via the FileIngestionTask.

Derive Columns from The Source File Paths

This is useful when the source file directory is partitioned on dimensions present in the file path (e.g., time, with day as the smallest bucket). Those dimensions can be extracted as Pinot table columns.

Note that in the pathColumn.<seq>.* properties below, <seq> is an identifier; properties with the same <seq> form a group and are used together to extract or transform a value.

To help readers understand how this works, here is an example. Say the file path is s3://bucket/year=2022/month=05/day=20/file.csv, and the table has the following schema and task configurations:

{
  "schemaName": "...",
  "dateTimeFieldSpec": [
    {
      "name": "date",
      "dataType": "STRING",
      "format": "1:DAYS:SIMPLE_DATE_FORMAT:yyyy/MM/dd"
    }
  ],
  ...
}
{
  ...
  "task": {
    "taskTypeConfigsMap": {
      "FileIngestionTask": {
        ...
        "pathColumn.0.name": "year",
        "pathColumn.0.output": "false",
        "pathColumn.0.pathComponentStartIndex": "1",
        "pathColumn.0.numPathComponentsToExtract": "1",
        "pathColumn.0.charStartIndex": "5",
        "pathColumn.0.numCharsToExtract": "4",
        "pathColumn.1.name": "month",
        "pathColumn.1.output": "false",
        "pathColumn.1.pathComponentStartIndex": "2",
        "pathColumn.1.numPathComponentsToExtract": "1",
        "pathColumn.1.regex": "^month=(.*)$")
        "pathColumn.2.name": "day",
        "pathColumn.2.output": "false",
        "pathColumn.2.pathComponentStartIndex": "3",
        "pathColumn.2.numPathComponentsToExtract": "1",
        "pathColumn.2.charStartIndex": "4",
        "pathColumn.2.numCharsToExtract": "2"
        "pathColumn.3.name": "date",
        "pathColumn.3.output": "true",
        "pathColumn.3.transform": "${year}/${month}/${day}",
        ...
      }
    }
  },

Those four groups of pathColumn properties (indicated by <seq> values 0 to 3) work as follows:

  1. The first group extracts 2022 as “year”
  2. The second group extracts 05 as “month”
  3. The third group extracts 20 as “day”
  4. The fourth group combines “year”, “month”, and “day” into “date”, and marks “date” as an output column in the Pinot table

Partition Data by Sub-directory Level in The Source File Paths

This feature is useful for grouping data from different folders and files into the same segment or set of segments.