Google Cloud Pub/Sub Ingestion

Prerequisites

Configure the following settings so that Pinot can consume messages correctly from a Pub/Sub topic.

Pub/Sub Topic Settings

The Pub/Sub topic that holds the data to be consumed must be configured as follows:

  1. Topic message retention must be enabled. Otherwise, messages published before the subscriber is created are lost.
  2. The topic's message storage policy must be configured to use a single region. This is required for Pub/Sub ordering guarantees.
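Both topic prerequisites can be checked before the topic is wired to Pinot by inspecting the topic resource, for example the JSON printed by `gcloud pubsub topics describe <topic> --format=json`. The sketch below is a hypothetical helper (`check_topic` is not part of Pinot or any Google library) and assumes the Pub/Sub REST field names `messageRetentionDuration` and `messageStoragePolicy.allowedPersistenceRegions`.

```python
import json
from typing import List


def check_topic(topic: dict) -> List[str]:
    """Return the problems found in a Pub/Sub topic description (empty if none).

    Hypothetical helper; assumes the REST field names messageRetentionDuration
    and messageStoragePolicy.allowedPersistenceRegions.
    """
    problems = []
    # Prerequisite 1: an unset retention duration means retention is disabled.
    if not topic.get("messageRetentionDuration"):
        problems.append("topic message retention is not enabled")
    # Prerequisite 2: the storage policy must allow exactly one region.
    policy = topic.get("messageStoragePolicy", {})
    regions = policy.get("allowedPersistenceRegions", [])
    if len(regions) != 1:
        problems.append(
            f"message storage policy must allow exactly one region, found {len(regions)}"
        )
    return problems


if __name__ == "__main__":
    sample = json.loads("""
    {
      "name": "projects/my-project/topics/my-topic",
      "messageRetentionDuration": "604800s",
      "messageStoragePolicy": {"allowedPersistenceRegions": ["us-central1"]}
    }
    """)
    print(check_topic(sample))  # prints []
```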

Pub/Sub Publisher Settings

The publisher must set the following on each message for Pinot to consume it correctly.

  1. If the data is to be partitioned (see attributeFilterKey for more info), add a user attribute holding the partition ID to each Pub/Sub message, and specify this attribute key in the stream configuration.
  2. Set the message's orderingKey to the same value as the partition ID attribute above.
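The two publisher rules can be captured in one helper that derives a partition ID from a record key and uses it both as the partition attribute and as the ordering key. This is a minimal sketch: the attribute name `partition`, the CRC32 key-to-partition mapping, and the helper names are assumptions for illustration, not part of Pinot or the Pub/Sub client. With the official Python client, the resulting values would typically be passed as `publisher.publish(topic_path, data, ordering_key=..., **attributes)` on a `PublisherClient` created with message ordering enabled.

```python
import zlib
from dataclasses import dataclass, field
from typing import Dict


@dataclass(frozen=True)
class OutgoingMessage:
    data: bytes
    attributes: Dict[str, str] = field(default_factory=dict)
    ordering_key: str = ""


def partition_for(key: str, num_partitions: int) -> int:
    """Deterministically map a record key to a partition (CRC32 is an assumption)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def build_message(payload: bytes, key: str, num_partitions: int,
                  partition_attribute: str = "partition") -> OutgoingMessage:
    """Attach the partition ID as a user attribute and reuse it as the ordering key.

    `partition_attribute` is hypothetical; it must match the attribute key
    configured in the Pinot stream configuration.
    """
    partition_id = str(partition_for(key, num_partitions))
    return OutgoingMessage(
        data=payload,
        # Rule 1: Pub/Sub attribute values are strings.
        attributes={partition_attribute: partition_id},
        # Rule 2: the ordering key equals the partition ID attribute value.
        ordering_key=partition_id,
    )
```

Because the partition is computed from the key, every message for the same key lands in the same partition and shares one ordering key, which is what lets Pub/Sub preserve per-partition order.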

Pinot Table Configuration

In addition to the common real-time table configuration, you need to set up the following extra config values.

Segment Configs

Set the segment completion mode (completionConfig.completionMode under segmentsConfig) to DOWNLOAD to avoid errors during the segment completion protocol.

{
  "tableName": "<tableName>",
  ...,
  "segmentsConfig": {
    "completionConfig": {
      "completionMode": "DOWNLOAD"
    }, 
    ...
  }
}

Stream Configs

The following stream configs are available:

Type    | Required | Description
String  | Yes      | Class name of the Pub/Sub consumer factory. Allowed value: ai.startree.pinot.plugin.stream.pubsub.PubsubConsumerFactory

* Available since st-distribution 0.4.0
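As an illustration, a small script can emit the stream config fragment for the table config. The property name `stream.pubsub.consumer.factory.class.name` and the stream type `pubsub` are assumptions based on Pinot's usual `stream.<streamType>.consumer.factory.class.name` naming pattern, since the table above does not show the property name; verify both against the plugin before use. Only the factory class value comes from this page.

```python
import json
from typing import Dict, Optional

# ASSUMPTION: property name follows Pinot's usual
# "stream.<streamType>.consumer.factory.class.name" pattern with stream type
# "pubsub". The source table does not show the name; verify it before use.
FACTORY_KEY = "stream.pubsub.consumer.factory.class.name"
PUBSUB_FACTORY = "ai.startree.pinot.plugin.stream.pubsub.PubsubConsumerFactory"


def stream_configs(extra: Optional[Dict[str, str]] = None,
                   factory: str = PUBSUB_FACTORY) -> Dict[str, str]:
    """Build an illustrative streamConfigs map for a real-time table config."""
    configs = {
        "streamType": "pubsub",  # ASSUMPTION: stream type name
        FACTORY_KEY: factory,
    }
    # Other stream properties (topic, decoder, tuning configs) go in `extra`.
    configs.update(extra or {})
    return configs


if __name__ == "__main__":
    # streamConfigs sits under tableIndexConfig in the classic table-config layout.
    fragment = {"tableIndexConfig": {"streamConfigs": stream_configs()}}
    print(json.dumps(fragment, indent=2))
```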

Environment variables

PubsubConsumerFactory requires the environment variable GOOGLE_APPLICATION_CREDENTIALS to be set to the path of the GCP credentials file. Alternatively, you can set this path with the config property credentialsFile, but the environment variable approach is strongly recommended.
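Because a missing or mistyped credentials path only surfaces once consumption starts, a preflight check before starting the Pinot server can save a debugging round trip. The sketch below is a hypothetical helper (`credentials_problem` is not part of Pinot); it only checks that the variable is set and points to an existing file, and does not validate the file's contents.

```python
import os
from pathlib import Path
from typing import Mapping, Optional


def credentials_problem(env: Mapping[str, str] = os.environ) -> Optional[str]:
    """Describe what is wrong with GOOGLE_APPLICATION_CREDENTIALS, or return None.

    Hypothetical preflight helper; it checks presence and file existence only.
    """
    value = env.get("GOOGLE_APPLICATION_CREDENTIALS")
    if not value:
        return "GOOGLE_APPLICATION_CREDENTIALS is not set"
    if not Path(value).is_file():
        return f"GOOGLE_APPLICATION_CREDENTIALS points to a missing file: {value}"
    return None
```

A startup script could call `credentials_problem()` and abort with the returned message when it is not `None`.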

Google Pub/Sub emulator

The Google Cloud SDK provides a Pub/Sub emulator for local testing. To install it, see the "Testing apps locally with the emulator" documentation.

All configuration is the same as above, except the consumer factory class, which takes the following value:

ai.startree.pinot.plugin.stream.pubsub.EmulatorPubsubConsumerFactory

The environment variable GOOGLE_APPLICATION_CREDENTIALS is not required. Instead, set the environment variable PUBSUB_EMULATOR_HOST to the emulator endpoint, in the form <hostname>:<port>.
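A malformed PUBSUB_EMULATOR_HOST value is easy to miss during local testing. This hypothetical helper (`emulator_endpoint` is not part of Pinot or the Google SDK) parses the variable and rejects values that do not match the <hostname>:<port> form described above; splitting on the last colon keeps bracketed IPv6 hosts intact.

```python
import os
from typing import Mapping, Tuple


def emulator_endpoint(env: Mapping[str, str] = os.environ) -> Tuple[str, int]:
    """Parse PUBSUB_EMULATOR_HOST ("<hostname>:<port>") into (hostname, port)."""
    value = env.get("PUBSUB_EMULATOR_HOST", "")
    # Split on the last colon so "[::1]:8085" keeps its IPv6 host.
    host, sep, port = value.rpartition(":")
    if not sep or not host or not port.isdigit():
        raise ValueError(
            f"PUBSUB_EMULATOR_HOST must look like <hostname>:<port>, got {value!r}"
        )
    return host, int(port)
```

For example, `emulator_endpoint({"PUBSUB_EMULATOR_HOST": "localhost:8085"})` returns `("localhost", 8085)`.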

Config Tuning Recommendations

subscription.ackDeadlineSeconds

This config sets the acknowledgement deadline of the Pub/Sub subscription: a message that is not acknowledged within this deadline is redelivered to the Pub/Sub client. Because the Pinot consumer acknowledges messages in batches, use the maximum allowed value. The default is 600 seconds, the maximum the Pub/Sub client allows, so you should rarely need to tune this config.

subscription.maxOutstandingCount

The consumer uses an asynchronous pull-based subscription, and this config provides flow control for it by bounding the number of unacknowledged messages held in the subscriber's buffer. The more unacknowledged messages are buffered, the more likely Pub/Sub is to redeliver them. If you notice many log warnings about duplicate messages received, lower this number.
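Count-based flow control of this kind can be modelled as a bounded set of slots: a slot is taken when a message is delivered and freed when it is acknowledged, and delivery pauses while all slots are taken. The class below is an illustrative model only (`OutstandingLimiter` is hypothetical, not the plugin's implementation); the limit plays the role of subscription.maxOutstandingCount.

```python
import threading


class OutstandingLimiter:
    """At most `max_outstanding` delivered-but-unacknowledged messages.

    Illustrative model of the bound set by subscription.maxOutstandingCount;
    not the plugin's actual implementation.
    """

    def __init__(self, max_outstanding: int) -> None:
        # BoundedSemaphore raises ValueError if ack() is called more often
        # than messages were received, which catches double acks.
        self._slots = threading.BoundedSemaphore(max_outstanding)

    def try_receive(self, timeout: float = 0.0) -> bool:
        """Reserve a slot for a newly delivered message; False if the buffer is full."""
        return self._slots.acquire(timeout=timeout)

    def ack(self) -> None:
        """Free a slot once a message has been acknowledged."""
        self._slots.release()
```

A smaller limit keeps fewer messages waiting unacknowledged, which is why lowering it reduces redelivery at some cost in throughput.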

maxConsumedMessages

This config controls how many messages are processed (that is, records extracted and indexed into the in-memory Pinot segment) in one batch. Message flow in Pub/Sub is adaptive and varies over time, so tune this value when indexing takes long enough to cause Pub/Sub to redeliver messages.
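A back-of-the-envelope estimate helps pick a starting value. Under a simplified model (an assumption, not documented plugin behavior) in which a batch is acknowledged only after all of its messages are indexed, the batch size times the per-message indexing time should stay well inside the ack deadline. The function name and the safety factor are hypothetical.

```python
import math


def max_batch_within_deadline(ack_deadline_s: float, index_ms_per_message: float,
                              safety_factor: float = 0.5) -> int:
    """Estimate the largest batch that can be indexed well inside the ack deadline.

    Simplified model (assumption): batch_size * per-message indexing time must
    stay below safety_factor * ack deadline. Never returns less than 1.
    """
    if index_ms_per_message <= 0:
        raise ValueError("index_ms_per_message must be positive")
    budget_ms = ack_deadline_s * 1000 * safety_factor
    return max(1, math.floor(budget_ms / index_ms_per_message))
```

With the default 600-second deadline and 2 ms of indexing per message, `max_batch_within_deadline(600, 2)` gives 150000; if measured indexing time grows, the estimate shrinks proportionally.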

queueOperationTimeout

This timeout applies to operations on the internal queues shared by the Pub/Sub consumer. The default value should be sufficient for most cases, because little contention is expected on these queues. As long as maxConsumedMessages is configured correctly, the queues have sufficient capacity for buffering.
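To show what such a timeout guards against, here is a bounded queue whose puts wait at most a given timeout before giving up. The function names and the sizing rule (capacity equal to maxConsumedMessages) are assumptions for this sketch; the plugin's internal queues are not documented here.

```python
import queue
from typing import Any


def make_buffer(max_consumed_messages: int) -> queue.Queue:
    """Bounded buffer sized to one batch (sizing rule is an assumption)."""
    return queue.Queue(maxsize=max_consumed_messages)


def offer(buf: queue.Queue, item: Any, operation_timeout_s: float) -> bool:
    """Enqueue within the timeout; False means the queue stayed full the whole time."""
    try:
        buf.put(item, timeout=operation_timeout_s)
        return True
    except queue.Full:
        return False
```

When the queue has room for a full batch, puts succeed immediately and the timeout never comes into play, which matches the expectation above that the default rarely matters.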