Connecting to Kafka with SASL authentication

In this guide we’ll learn how to ingest data into Apache Pinot from an Apache Kafka cluster configured with SASL authentication.

Prerequisites

To follow the code examples in this guide, you must have Docker installed locally and download the recipes repository.

Navigate to recipe

  1. If you haven’t already, download the recipes.
  2. In a terminal, go to the recipe by running the following command:
cd pinot-recipes/recipes/kafka-sasl

Launch Pinot and Kafka Clusters

You can spin up Pinot and Kafka clusters by running the following command:

docker-compose up

This command will run a single instance of the Pinot Controller, Pinot Server, Pinot Broker, Kafka, and Zookeeper. You can find the docker-compose.yml file on GitHub.
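Inside that file, the Kafka broker is configured to require SASL on its listeners. The exact settings depend on the Kafka image being used, but a minimal sketch of the SASL-related service configuration might look something like the following (illustrative only; the environment variable names assume a wurstmeister-style image and may differ from the recipe’s actual file):

kafka-sasl:
  environment:
    KAFKA_LISTENERS: "INSIDE://:9093,OUTSIDE://:9092"
    KAFKA_ADVERTISED_LISTENERS: "INSIDE://kafka-sasl:9093,OUTSIDE://localhost:9092"
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INSIDE:SASL_PLAINTEXT,OUTSIDE:SASL_PLAINTEXT"
    KAFKA_INTER_BROKER_LISTENER_NAME: "INSIDE"
    KAFKA_SASL_ENABLED_MECHANISMS: "PLAIN"
    KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf"

The KAFKA_OPTS entry is what points the broker at the JAAS file described next.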

The Kafka cluster is launched with the following JAAS (Java Authentication and Authorization Service) configuration for SASL:

KafkaServer {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin"
  password="admin-secret"
  user_admin="admin-secret"
  user_alice="alice-secret";
};
Client{};

We have two users:

  • admin with the password admin-secret
  • alice with the password alice-secret
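Before wiring up Pinot, it’s worth confirming that these credentials actually authenticate. One quick sanity check, using the SASL client config file that ships with the recipe (shown later in this guide), is to list the topics on the broker:

docker exec -it kafka-sasl /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 \
  --command-config /etc/kafka/kafka_client.conf \
  --list

If the credentials are wrong, this command fails with an authentication error instead of printing the topic list.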

Pinot Schema and Table

Let’s create a Pinot Schema and Table.

The schema is defined below:

{
  "schemaName":"events",
  "dimensionFieldSpecs":[
    {
      "name":"uuid",
      "dataType":"STRING"
    }
  ],
  "metricFieldSpecs":[
    {
      "name":"count",
      "dataType":"INT"
    }
  ],
  "dateTimeFieldSpecs":[
    {
      "name":"ts",
      "dataType":"TIMESTAMP",
      "format":"1:MILLISECONDS:EPOCH",
      "granularity":"1:MILLISECONDS"
    }
  ]
}

config/schema.json
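A message matching this schema would look something like the following (hypothetical values; this is the shape of the events we’ll produce later in the guide):

{"ts": "1707749200000", "uuid": "9f2a4c1e0b7d4f6aa3c85e1b2d904f7c", "count": 42}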

And the table config is defined below:

{
  "tableName": "events",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "ts",
    "schemaName": "events",
    "replication": "1",
    "replicasPerPartition": "1"
  },
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.topic.name": "events",
      "stream.kafka.broker.list": "kafka-sasl:9093",
      "security.protocol": "SASL_PLAINTEXT",
      "sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"admin\" password=\"admin-secret\";",
      "sasl.mechanism": "PLAIN",
      "stream.kafka.consumer.type": "lowlevel",
      "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder"
    }
  },
  "ingestionConfig": {
    "batchIngestionConfig": {
      "segmentIngestionType": "APPEND",
      "segmentIngestionFrequency": "DAILY"
    }
  },
  "tenants": {},
  "metadata": {}
}

config/table.json

The parts of this configuration that we’re interested in are the security.protocol, sasl.jaas.config, and sasl.mechanism entries under streamConfigs. The credentials that we want to use are specified in the sasl.jaas.config property.

💡

If our Kafka cluster has SSL enabled, we would need to set security.protocol to SASL_SSL instead of SASL_PLAINTEXT. For an example of using SSL and SASL together, see Connecting to Kafka with SSL and SASL authentication.
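In that case the streamConfigs would also need to point at a truststore containing the broker’s certificate. A minimal sketch of the changed entries, assuming a hypothetical truststore path and password:

"security.protocol": "SASL_SSL",
"ssl.truststore.location": "/opt/pinot/certs/kafka.truststore.jks",
"ssl.truststore.password": "changeit",

The rest of the streamConfigs stay the same.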

Create the table and schema by running the following command:

docker exec -it pinot-controller-sasl bin/pinot-admin.sh AddTable \
  -tableConfigFile /config/table.json \
  -schemaFile /config/schema.json \
  -exec
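You can sanity-check that the table was created by asking the Pinot Controller’s REST API for its config:

curl localhost:9000/tables/events

This should return the table config that we submitted above.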

Ingesting Data

Next, we’re going to ingest some data into Kafka:

while true; do
  ts=$(date +%s%N | cut -b1-13)                           # current epoch time in milliseconds
  uuid=$(cat /proc/sys/kernel/random/uuid | sed 's/-//g') # random UUID with the dashes stripped
  count=$(( RANDOM % 1000 ))                              # random integer between 0 and 999
  echo "{\"ts\": \"${ts}\", \"uuid\": \"${uuid}\", \"count\": ${count}}"
done | docker exec -i kafka-sasl /opt/kafka/bin/kafka-console-producer.sh \
    --bootstrap-server localhost:9092 \
    --producer.config /etc/kafka/kafka_client.conf \
    --topic events
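If you want to confirm that events are actually reaching Kafka before checking Pinot, you can consume a few messages with the same SASL client config:

docker exec -it kafka-sasl /opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --consumer.config /etc/kafka/kafka_client.conf \
  --topic events \
  --from-beginning \
  --max-messages 5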

We need to pass in a configuration file with SASL credentials when producing events as well. The contents of the configuration file are shown below:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
        username="alice" \
        password="alice-secret";

/etc/kafka/kafka_client.conf

Querying

Now let’s navigate to localhost:9000/#/query and copy/paste the following query:

select count(*), sum(count) 
from events 

You will see the following output:

[Image: Query Results]
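If you prefer the command line, you can run the same query against the controller’s SQL endpoint, which proxies it to a broker (assuming the default port mapping from the recipe):

curl -X POST localhost:9000/sql \
  -H "Content-Type: application/json" \
  -d '{"sql": "select count(*), sum(count) from events"}'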