Pillars of Apache Kafka security

Overview

In a distributed architecture, a message broker is a key component of the architecture model: we need asynchronous tools to decouple the different components and avoid cascading failures.

Apache Kafka is one of the most mature message brokers; it was developed at LinkedIn and later donated to the Apache Software Foundation.

Installing and managing an Apache Kafka cluster comes with specific challenges; one of them is securing the different components involved in the Kafka ecosystem.

A production Kafka ecosystem is usually composed of:

  • ZooKeeper cluster: stores Kafka metadata such as topics, broker configuration, and partitions. ZooKeeper is composed of more than one node, usually three or five (a quorum).
  • Brokers: a cluster of brokers that stores data within topics. In a production environment, at least three brokers are used to ensure the resiliency and fault tolerance of the system.
  • Producer apps: client applications responsible for producing data to given topics.
  • Consumer apps: client applications responsible for consuming data from topics.
  • Administration tools: tools like the Kafka CLI that allow an administrator to manage the cluster.

As you can see, the Kafka ecosystem is a distributed system: data is exchanged between the different components over the network, and each component has its own responsibilities. By default, data is exchanged in clear text, without encryption, authentication, or authorization, which can introduce security issues into the system.

In this post we will see together how to efficiently secure the traffic between the different components by implementing encryption, authentication, and authorization.

You can download the source code of this post from GitHub.


Set up your own PKI

Let’s start by creating our PKI (public key infrastructure).

For this post I am using a self-signed certificate as a certificate authority; in a production environment you should use your company CA. We need to create a server certificate for each node in the Kafka infrastructure: in our example, three ZooKeeper nodes and three Apache Kafka brokers.

We also need to create a client certificate for the Kafka admin tools; this certificate will be used later to authenticate administrators to the Apache Kafka cluster.

Kafka is a Java application, so we need to package the server certificate and the CA certificate in JKS (Java KeyStore) format.

  • Trust stores contain the CA certificate
  • Key stores contain the server certificate and its private key
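
Once the stores are generated by the scripts below, a quick way to sanity-check their contents is keytool's list command; the path and password here follow the conventions used in this post's scripts:

# List the entries of a JKS to verify the expected certificates are present
keytool -list -v \
    -keystore keystore/broker-1.keystore.jks \
    -storepass password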

You can use kafka-pki/generate-ca.sh to generate a self-signed CA certificate:

#CA Certificate
openssl genrsa  -out rootCA.key 4096

openssl req -x509 -new -nodes -key rootCA.key \
        -sha256 -days 1024 \
        -subj "/C=CA/ST=QC/L=Montreal/O=Digital Infornation Inc CA Root Authority/OU=IT" \
        -out rootCA.crt
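
To sanity-check the generated CA certificate, you can print its subject and validity period:

# Inspect the CA certificate
openssl x509 -in rootCA.crt -noout -subject -dates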

Once it’s done, you can use generate-truststore.sh to create a trust store for each node. The first argument is the FQDN of your node and the second argument is the password used to secure the JKS.

INSTANCE=$1
PASSWORD=$2
CA_ALIAS="rootCA"
CA_CERT_FILE="RootCA.crt" 

#### Generate Truststore and import ROOT CA certificate ####
keytool -keystore truststore/$INSTANCE.truststore.jks \
 -import -alias $CA_ALIAS -file $CA_CERT_FILE \
 -storepass $PASSWORD
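
For example, assuming the node names used in the docker-compose files later in this post and "password" as the store password (keytool will prompt you to confirm trusting the CA certificate for each store):

# Generate one trust store per ZooKeeper and Kafka node
for NODE in zookeeper-1 zookeeper-2 zookeeper-3 broker-1 broker-2 broker-3; do
    ./generate-truststore.sh $NODE password
done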

The last step is to create the key stores. A key store contains a server certificate signed by the CA, the private key, and the CA certificate. Use the generate-keystore.sh script to create a key store for each node; the first argument is the FQDN of your node and the second argument is the password used to secure the JKS.

COMMON_NAME=$1
PASSWORD=$2
ORGANIZATIONAL_UNIT="IT"
ORGANIZATION="Digital Infornation Inc CA Root Authority"
CITY="Montreal"
STATE="QC"
COUNTRY="CA"

CA_ALIAS="ca-root"
CA_CERT_FILE="rootCa.crt" 
CA_KEY_FILE="rootCa.key" 

VALIDITY_DAYS=365

# Generate Keystore with Private Key
keytool -keystore keystore/$COMMON_NAME.keystore.jks \
            -alias $COMMON_NAME -validity $VALIDITY_DAYS \
            -genkey -keyalg RSA \
            -storepass $PASSWORD \
            -dname "CN=$COMMON_NAME, OU=$ORGANIZATIONAL_UNIT, O=$ORGANIZATION, L=$CITY, ST=$STATE, C=$COUNTRY"

# Generate Certificate Signing Request (CSR) using the newly created KeyStore
keytool -keystore keystore/$COMMON_NAME.keystore.jks \
         -storepass $PASSWORD \
         -alias $COMMON_NAME -certreq -file $COMMON_NAME.csr 

# Sign the CSR using the custom CA
openssl x509 -req -CA $CA_CERT_FILE -CAkey $CA_KEY_FILE \
         -in $COMMON_NAME.csr -out $COMMON_NAME.signed \
         -days $VALIDITY_DAYS -CAcreateserial

# Import ROOT CA certificate into Keystore
keytool -keystore keystore/$COMMON_NAME.keystore.jks \
    -storepass $PASSWORD \
    -alias $CA_ALIAS -importcert -file $CA_CERT_FILE

# Import newly signed certificate into Keystore
keytool -keystore keystore/$COMMON_NAME.keystore.jks \
    -storepass $PASSWORD \
    -alias $COMMON_NAME -importcert -file $COMMON_NAME.signed

# Clean-up 
rm $COMMON_NAME.csr
rm $COMMON_NAME.signed
rm rootCA.srl
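
As a usage sketch, the same loop works for the key stores; the extra admin entry produces the client certificate used later by the administration tools:

# Generate one key store per node, plus one for the admin client
for NODE in zookeeper-1 zookeeper-2 zookeeper-3 broker-1 broker-2 broker-3 admin; do
    ./generate-keystore.sh $NODE password
done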

Encryption

Now that we have a PKI set up, we can start encrypting the data exchanged between the different components. I am using a docker-compose file to simulate the Kafka ecosystem, with a three-node ZooKeeper quorum and three Kafka brokers.

We are going to start by sharing the JKS files with each Docker container, mounting them as volumes from the host into the containers.

Then we configure ZooKeeper to use SSL when replicating data between the nodes of the quorum. We also configure a secure client port, to allow the Apache Kafka brokers, which are ZooKeeper clients, to use a secure endpoint.

The same configuration is duplicated on the two other nodes of the ZooKeeper quorum:

  zookeeper-1:
    image: zookeeper:3.7.0
    hostname: zookeeper-1
    container_name: zookeeper-1
    volumes:
        - ../kafka-pki/keystore/zookeeper-1.keystore.jks:/security/zookeeper-1.keystore.jks
        - ../kafka-pki/truststore/zookeeper-1.truststore.jks:/security/zookeeper-1.truststore.jks
    environment:
      ZOO_MY_ID: 1
      ZOO_SERVERS: server.1=zookeeper-1:2888:3888;2181 server.2=zookeeper-2:2888:3888;2181 server.3=zookeeper-3:2888:3888;2181
      ZOO_CFG_EXTRA: "sslQuorum=true
                portUnification=false
                serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory

                ssl.quorum.hostnameVerification=false
                ssl.quorum.keyStore.location=/security/zookeeper-1.keystore.jks
                ssl.quorum.keyStore.password=password
                ssl.quorum.trustStore.location=/security/zookeeper-1.truststore.jks
                ssl.quorum.trustStore.password=password

                secureClientPort=2281
                ssl.hostnameVerification=false
                ssl.keyStore.location=/security/zookeeper-1.keystore.jks
                ssl.keyStore.password=password
                ssl.trustStore.location=/security/zookeeper-1.truststore.jks
                ssl.trustStore.password=password"

From the broker perspective, we need to configure the Kafka broker to connect to the ZooKeeper secure port and to replicate topic data between brokers using SSL.

We need to add a listener using the SSL protocol, then configure the broker with a key store and a trust store, and enforce inter-broker communication over SSL.

The same configuration is duplicated on all Kafka cluster nodes:


  broker-1:
    image: confluentinc/cp-kafka:7.0.1
    hostname: broker-1
    container_name: broker-1
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    ports:
      - "9191:9191"
    volumes:
        - ../kafka-pki/keystore/broker-1.keystore.jks:/etc/kafka/secrets/broker-1.keystore.jks
        - ../kafka-pki/truststore/broker-1.truststore.jks:/etc/kafka/secrets/broker-1.truststore.jks
        - ../kafka-pki/password.txt:/etc/kafka/secrets/password.txt
 
    environment:
        KAFKA_BROKER_ID: 1
        KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2281,zookeeper-2:2281,zookeeper-3:2281
        KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: SSL:SSL
        KAFKA_ADVERTISED_LISTENERS: SSL://broker-1:9191
        KAFKA_LISTENERS: SSL://broker-1:9191
        KAFKA_DEFAULT_REPLICATION_FACTOR: 3
        KAFKA_MIN_INSYNC_REPLICAS: 2
        KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
        
        KAFKA_ZOOKEEPER_SSL_CLIENT_ENABLE: "true"
        KAFKA_ZOOKEEPER_CLIENT_CNXN_SOCKET: org.apache.zookeeper.ClientCnxnSocketNetty
        KAFKA_ZOOKEEPER_SSL_KEYSTORE_LOCATION: /etc/kafka/secrets/broker-1.keystore.jks
        KAFKA_ZOOKEEPER_SSL_KEYSTORE_PASSWORD: password
        KAFKA_ZOOKEEPER_SSL_TRUSTSTORE_LOCATION: /etc/kafka/secrets/broker-1.truststore.jks
        KAFKA_ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD: password

        KAFKA_SSL_CLIENT_AUTH: none
        KAFKA_SSL_KEYSTORE_LOCATION: /etc/kafka/secrets/broker-1.keystore.jks
        KAFKA_SSL_KEYSTORE_PASSWORD: password
        KAFKA_SSL_KEY_PASSWORD: password
        KAFKA_SSL_TRUSTSTORE_LOCATION: /etc/kafka/secrets/broker-1.truststore.jks
        KAFKA_SSL_TRUSTSTORE_PASSWORD: password

        KAFKA_SSL_KEYSTORE_FILENAME: broker-1.keystore.jks
        KAFKA_SSL_KEYSTORE_CREDENTIALS: password.txt
        KAFKA_SSL_KEY_CREDENTIALS: password.txt
        KAFKA_SSL_TRUSTSTORE_FILENAME: broker-1.truststore.jks
        KAFKA_SSL_TRUSTSTORE_CREDENTIALS: password.txt
        KAFKA_SECURITY_INTER_BROKER_PROTOCOL: SSL
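
Since port 9191 is published to the host, you can confirm that the broker listener is serving TLS with a certificate signed by our CA:

# Check the broker's SSL listener
openssl s_client -connect localhost:9191 \
    -CAfile kafka-pki/rootCA.crt </dev/null | grep -E "subject=|Verif"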

From the consumer and producer perspective, we need to adjust the configuration to use the SSL protocol by providing the CA public certificate, so clients can verify the brokers' certificates. (The properties below use librdkafka-style keys, as found in clients like kcat or confluent-kafka.)

producer.config

bootstrap.servers=broker-1:9192,broker-2:9193,broker-3:9194
security.protocol=SSL
ssl.ca.location=~/Documents/work/kafka-security/kafka-pki/rootCA.crt

consumer.config

bootstrap.servers=broker-1:9192,broker-2:9193,broker-3:9194
security.protocol=SSL
ssl.ca.location=~/Documents/work/kafka-security/kafka-pki/rootCA.crt
group.id=consumer.group-1
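
To smoke-test the setup end to end, a librdkafka-based CLI such as kcat can load these property files directly; kcat and the topic name are assumptions here, not part of the setup above:

# Produce one message over SSL, then read it back (assumes the topic exists)
echo "hello-ssl" | kcat -F producer.config -P -t myTestTopic
kcat -F consumer.config -C -t myTestTopic -o beginning -e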

Authentication

Now that traffic is encrypted, the next step is to enforce authentication. Apache Kafka offers different protocols to implement authentication. In this post we are going to use mutual TLS (mTLS) to authenticate brokers to each other, since brokers need to communicate among themselves to replicate data; mTLS will also be used to authenticate the administration tools.

For client applications (producers and consumers) we are going to use SASL/SCRAM, which allows client applications to authenticate with a username and a secret.

We need to add another Kafka listener with the SASL_SSL security protocol, adapt the configuration to enforce Kafka client authentication, and enable the SASL mechanism SCRAM-SHA-512.

To enable SCRAM we need to add a JAAS configuration to the Java security login configuration:

KafkaServer {
    org.apache.kafka.common.security.scram.ScramLoginModule required 
    ;
};

The same configuration must be applied to each broker of the Apache Kafka cluster:

broker-1:
    image: confluentinc/cp-kafka:7.0.1
    hostname: broker-1
    container_name: broker-1
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    ports:
      - "9191:9191"
      - "9291:9291"
    volumes:
        - ../kafka-pki/keystore/broker-1.keystore.jks:/etc/kafka/secrets/broker-1.keystore.jks
        - ../kafka-pki/truststore/broker-1.truststore.jks:/etc/kafka/secrets/broker-1.truststore.jks
        - ../kafka-pki/password.txt:/etc/kafka/secrets/password.txt
        - ./jaas/sasl-scram/ssl-scram-jaas-broker.conf:/etc/kafka/secrets/ssl-scram-jaas-broker.conf
 
    environment:
          KAFKA_BROKER_ID: 1
          KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2281,zookeeper-2:2281,zookeeper-3:2281
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: SASL_SSL:SASL_SSL,SSL:SSL
          KAFKA_ADVERTISED_LISTENERS: SASL_SSL://broker-1:9191,SSL://broker-1:9291
          KAFKA_LISTENERS: SASL_SSL://broker-1:9191,SSL://broker-1:9291
          KAFKA_DEFAULT_REPLICATION_FACTOR: 3
          KAFKA_MIN_INSYNC_REPLICAS: 2
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
          
          KAFKA_ZOOKEEPER_SSL_CLIENT_ENABLE: "true"
          KAFKA_ZOOKEEPER_CLIENT_CNXN_SOCKET: org.apache.zookeeper.ClientCnxnSocketNetty
          KAFKA_ZOOKEEPER_SSL_KEYSTORE_LOCATION: /etc/kafka/secrets/broker-1.keystore.jks
          KAFKA_ZOOKEEPER_SSL_KEYSTORE_PASSWORD: password
          KAFKA_ZOOKEEPER_SSL_TRUSTSTORE_LOCATION: /etc/kafka/secrets/broker-1.truststore.jks
          KAFKA_ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD: password

          KAFKA_SSL_CLIENT_AUTH: required
          KAFKA_SSL_KEYSTORE_LOCATION: /etc/kafka/secrets/broker-1.keystore.jks
          KAFKA_SSL_KEYSTORE_PASSWORD: password
          KAFKA_SSL_KEY_PASSWORD: password
          KAFKA_SSL_TRUSTSTORE_LOCATION: /etc/kafka/secrets/broker-1.truststore.jks
          KAFKA_SSL_TRUSTSTORE_PASSWORD: password

          KAFKA_SSL_KEYSTORE_FILENAME: broker-1.keystore.jks
          KAFKA_SSL_KEYSTORE_CREDENTIALS: password.txt
          KAFKA_SSL_KEY_CREDENTIALS: password.txt
          KAFKA_SSL_TRUSTSTORE_FILENAME: broker-1.truststore.jks
          KAFKA_SSL_TRUSTSTORE_CREDENTIALS: password.txt

          KAFKA_SECURITY_INTER_BROKER_PROTOCOL: SSL
          KAFKA_SASL_ENABLED_MECHANISMS: SCRAM-SHA-512          

          KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/secrets/ssl-scram-jaas-broker.conf"

To add the client application usernames and secrets, we are going to use the Kafka client tools. SASL/SCRAM stores users and secrets as Kafka configuration in ZooKeeper. The script below adds two users, producer and consumer, as Kafka application users:

PRODUCER_USER="producer"
PRODUCER_PASSWORD="P@ssMordProducerScram"

CONSUMER_USER="consumer"
CONSUMER_PASSWORD="P@ssMordConsumerScram"

#Add Path to kafka bin folder
KAFKA_BIN_FOLDER=~/Documents/work/kafka_2.13-3.1.0/bin
chmod +x ${KAFKA_BIN_FOLDER}/kafka-configs.sh


${KAFKA_BIN_FOLDER}/kafka-configs.sh --zookeeper localhost:12181 --alter \
        --add-config 'SCRAM-SHA-512=[password='${PRODUCER_PASSWORD}']' \
        --entity-type users \
        --entity-name $PRODUCER_USER

${KAFKA_BIN_FOLDER}/kafka-configs.sh --zookeeper localhost:12181 --alter \
        --add-config 'SCRAM-SHA-512=[password='${CONSUMER_PASSWORD}']' \
        --entity-type users \
        --entity-name $CONSUMER_USER
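
You can verify that the credentials were written by describing the user entity:

# Confirm the SCRAM credentials are stored in ZooKeeper
${KAFKA_BIN_FOLDER}/kafka-configs.sh --zookeeper localhost:12181 --describe \
        --entity-type users \
        --entity-name $PRODUCER_USER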

The last modification we need to make is to adapt the client applications' configuration to use the SASL_SSL protocol.

producer.config

bootstrap.servers=broker-1:9192,broker-2:9193,broker-3:9194
security.protocol=SASL_SSL
sasl.mechanisms=SCRAM-SHA-512
sasl.username=producer
sasl.password=P@ssMordProducerScram
ssl.ca.location=~/Documents/work/kafka-security/kafka-pki/rootCA.crt

consumer.config

bootstrap.servers=broker-1:9192,broker-2:9193,broker-3:9194
security.protocol=SASL_SSL
sasl.mechanisms=SCRAM-SHA-512
sasl.username=consumer
sasl.password=P@ssMordConsumerScram
ssl.ca.location=~/Documents/work/kafka-security/kafka-pki/rootCA.crt
group.id=consumer.group-1
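
As in the encryption section, a librdkafka-based client such as kcat (an assumption, not part of the setup) can exercise the SASL_SSL listener with these files; with a wrong password, the broker should now reject the connection with an authentication error:

# Produce and consume through the SASL_SSL listener using the SCRAM credentials
echo "hello-scram" | kcat -F producer.config -P -t myTestTopic
kcat -F consumer.config -C -t myTestTopic -o beginning -e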

Authorization

Access Control Lists (ACLs) provide a way to grant each Apache Kafka principal specific rights on specific Kafka resources.

Apache Kafka ships with a pluggable, out-of-the-box Authorizer implementation that uses Apache ZooKeeper to store all the ACLs. Setting ACLs is important, because once an Authorizer is configured, the default behavior is that a resource with no associated ACLs can be accessed by no one, including brokers, except super users.

We need to configure the brokers as super users to allow them to communicate with each other, and we also need to configure the admin user (the user used by the admin client tools) as a super user. As a best practice, you should restrict the super user list to users authenticated with mTLS.

We need to add the configuration below to each Kafka broker. Note that the super user list contains the subject name of each broker's server certificate and the subject name of the admin tools client certificate:

    KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer

    KAFKA_SUPER_USERS: "User:CN=broker-1,OU=IT,O=Digital Infornation Inc CA Root Authority,L=Montreal,ST=QC,C=CA;
                        User:CN=broker-2,OU=IT,O=Digital Infornation Inc CA Root Authority,L=Montreal,ST=QC,C=CA;
                        User:CN=broker-3,OU=IT,O=Digital Infornation Inc CA Root Authority,L=Montreal,ST=QC,C=CA;
                        User:CN=admin,OU=IT,O=Digital Infornation Inc CA Root Authority,L=Montreal,ST=QC,C=CA"

To configure the ACLs, we use the Kafka client tools. We need to create a topic, then grant the producer and consumer users specific access rules on it:

  • Grant the producer the right to publish events to the topic
  • Grant the consumer the right to consume events from the topic with a specific consumer group

The script below creates the desired configuration:

#!/bin/bash

PRODUCER_USER="producer"
PRODUCER_PASSWORD="P@ssMordProducerScram"

CONSUMER_USER="consumer"
CONSUMER_PASSWORD="P@ssMordConsumerScram"

TOPIC_TEST="myTestTopic"
CONSUMER_GROUP="consumer.group-1"

#Add Path to kafka bin folder
KAFKA_BIN_FOLDER=~/Documents/work/kafka_2.13-3.1.0/bin
chmod +x ${KAFKA_BIN_FOLDER}/kafka-configs.sh
chmod +x ${KAFKA_BIN_FOLDER}/kafka-acls.sh


${KAFKA_BIN_FOLDER}/kafka-configs.sh --zookeeper localhost:12181 --alter \
        --add-config 'SCRAM-SHA-512=[password='${PRODUCER_PASSWORD}']' \
        --entity-type users \
        --entity-name $PRODUCER_USER

${KAFKA_BIN_FOLDER}/kafka-configs.sh --zookeeper localhost:12181 --alter \
        --add-config 'SCRAM-SHA-512=[password='${CONSUMER_PASSWORD}']' \
        --entity-type users \
        --entity-name $CONSUMER_USER

${KAFKA_BIN_FOLDER}/kafka-acls.sh \
  --bootstrap-server broker-1:9291 \
  --list \
  --command-config ./admin.properties

${KAFKA_BIN_FOLDER}/kafka-topics.sh \
  --bootstrap-server broker-1:9291 \
  --create \
  --topic $TOPIC_TEST \
  --partitions 3 \
  --command-config ./admin.properties

${KAFKA_BIN_FOLDER}/kafka-acls.sh \
  --bootstrap-server broker-1:9291 \
  --add \
  --allow-principal User:$PRODUCER_USER \
  --operation WRITE \
  --topic $TOPIC_TEST \
  --command-config ./admin.properties

${KAFKA_BIN_FOLDER}/kafka-acls.sh \
  --bootstrap-server broker-1:9291 \
  --add \
  --allow-principal User:$CONSUMER_USER \
  --operation READ \
  --topic $TOPIC_TEST \
  --command-config ./admin.properties

${KAFKA_BIN_FOLDER}/kafka-acls.sh \
  --bootstrap-server broker-1:9291 \
  --add \
  --allow-principal User:$CONSUMER_USER \
  --operation All \
  --group $CONSUMER_GROUP \
  --command-config ./admin.properties
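
You can review the ACLs that were just created with the --list option scoped to the topic:

${KAFKA_BIN_FOLDER}/kafka-acls.sh \
  --bootstrap-server broker-1:9291 \
  --list \
  --topic $TOPIC_TEST \
  --command-config ./admin.properties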

To run these commands, we need to authenticate using the admin client certificate, which is configured as a super user.

admin.properties

security.protocol=SSL
ssl.truststore.location=~/Documents/work/kafka-security/kafka-pki/truststore/admin.truststore.jks
ssl.truststore.password=password
ssl.keystore.location=~/Documents/work/kafka-security/kafka-pki/keystore/admin.keystore.jks
ssl.keystore.password=password
ssl.key.password=password
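
Any admin CLI call can then authenticate with this file; for example, listing topics over the mTLS listener:

# List topics as the admin super user over mTLS
~/Documents/work/kafka_2.13-3.1.0/bin/kafka-topics.sh \
  --bootstrap-server broker-1:9291 \
  --list \
  --command-config ./admin.properties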

Note that there is no need to change the Kafka client configuration: the producer and consumer keep the same SASL_SSL settings as before.