Pillars of Apache Kafka security
Overview
In a distributed architecture, a message broker is a key component of the model: we need asynchronous tools to decouple the different components and avoid cascading failures.
Apache Kafka is the most mature message broker; it was developed by LinkedIn and later donated to the Apache community.
Installing and managing an Apache Kafka cluster comes with specific challenges, one of them being the security of the different components involved in the Kafka ecosystem.
A production Kafka ecosystem is usually composed of:
- ZooKeeper cluster: stores Kafka metadata such as topics, broker configuration, and partitions. ZooKeeper runs on more than one node, usually three or five (a quorum).
- Brokers: a cluster of brokers that stores data within topics. A production environment uses at least three brokers, to ensure the resiliency and fault tolerance of the system.
- Producer applications: client applications responsible for producing data to a given topic.
- Consumer applications: client applications responsible for consuming data from topics.
- Administration tools: tools such as the Kafka CLI that allow an administrator to manage the cluster.
As you can see, the Kafka ecosystem is a distributed system: data is exchanged between the different components over the network, and each component has its own responsibilities. By default, data is exchanged in clear text, without encryption, authentication, or authorization, which can introduce security issues into the system.
In this post we will see together how to efficiently secure the traffic between the different components by implementing encryption, authentication, and authorization.
You can download the source code of this post from GitHub.
Set up your own PKI
Let's start by creating our PKI (public key infrastructure).
For this post I am using a self-signed certificate as the certificate authority; in a production environment you should use your company's CA. We need to create a server certificate for each node in the Kafka infrastructure, in our example three ZooKeeper nodes and three Apache Kafka brokers.
We also need to create a client certificate for the Kafka admin tools; this certificate will be used later to authenticate the administrator to the Apache Kafka cluster.
Kafka is a Java application, so we need to package the server certificates and the CA certificate in JKS (Java KeyStore) format.
- Trust stores contain the CA certificate
- Key stores contain the server certificate and its private key
You can use kafka-pki/generate-ca.sh to generate the self-signed CA certificate:
#CA Certificate
openssl genrsa -out rootCA.key 4096
openssl req -x509 -new -nodes -key rootCA.key \
-sha256 -days 1024 \
-subj "/C=CA/ST=QC/L=Montreal/O=Digital Infornation Inc CA Root Authority/OU=IT" \
-out rootCA.crt
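You can quickly sanity-check the generated CA before moving on:
# Inspect the CA certificate subject and validity period
openssl x509 -in rootCA.crt -noout -subject -dates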
Once it's done, you can use generate-truststore.sh to create a trust store for each node. The first argument is the FQDN of your node and the second argument is the password used to secure the JKS.
INSTANCE=$1
PASSWORD=$2
CA_ALIAS="rootCA"
CA_CERT_FILE="RootCA.crt"
#### Generate Truststore and import ROOT CA certificate ####
keytool -keystore truststore/$INSTANCE.truststore.jks \
-import -alias $CA_ALIAS -file $CA_CERT_FILE \
-storepass $PASSWORD -noprompt
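As a usage sketch, assuming the script is run from the kafka-pki folder, you can create the trust store for the first ZooKeeper node and confirm the root CA was imported:
./generate-truststore.sh zookeeper-1 password
# The rootCA alias should be listed as a trustedCertEntry
keytool -list -keystore truststore/zookeeper-1.truststore.jks -storepass password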
The last step is to create the key stores. A key store contains the server certificate signed by the CA, its private key, and the CA certificate. Use the generate-keystore.sh script to create a key store for each node; the first argument is the FQDN of your node and the second argument is the password used to secure the JKS.
COMMON_NAME=$1
PASSWORD=$2
ORGANIZATIONAL_UNIT="IT"
ORGANIZATION="Digital Infornation Inc CA Root Authority"
CITY="Montreal"
STATE="QC"
COUNTRY="CA"
CA_ALIAS="ca-root"
CA_CERT_FILE="rootCa.crt"
CA_KEY_FILE="rootCa.key"
VALIDITY_DAYS=365
# Generate Keystore with Private Key
keytool -keystore keystore/$COMMON_NAME.keystore.jks \
-alias $COMMON_NAME -validity $VALIDITY_DAYS \
-genkey -keyalg RSA \
-storepass $PASSWORD \
-dname "CN=$COMMON_NAME, OU=$ORGANIZATIONAL_UNIT, O=$ORGANIZATION, L=$CITY, ST=$STATE, C=$COUNTRY"
# Generate Certificate Signing Request (CSR) using the newly created KeyStore
keytool -keystore keystore/$COMMON_NAME.keystore.jks \
-storepass $PASSWORD \
-alias $COMMON_NAME -certreq -file $COMMON_NAME.csr
# Sign the CSR using the custom CA
openssl x509 -req -CA $CA_CERT_FILE -CAkey $CA_KEY_FILE \
-in $COMMON_NAME.csr -out $COMMON_NAME.signed \
-days $VALIDITY_DAYS -CAcreateserial
# Import ROOT CA certificate into Keystore
keytool -keystore keystore/$COMMON_NAME.keystore.jks \
-storepass $PASSWORD \
-alias $CA_ALIAS -importcert -file $CA_CERT_FILE -noprompt
# Import newly signed certificate into Keystore
keytool -keystore keystore/$COMMON_NAME.keystore.jks \
-storepass $PASSWORD \
-alias $COMMON_NAME -importcert -file $COMMON_NAME.signed
# Clean-up
rm $COMMON_NAME.csr
rm $COMMON_NAME.signed
rm rootCA.srl
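A minimal sketch to generate a key store for every node of our example cluster (node names assumed to match the docker-compose hostnames, plus the admin client), then verify that a signed certificate chains back to the root CA:
for NODE in zookeeper-1 zookeeper-2 zookeeper-3 broker-1 broker-2 broker-3 admin; do
./generate-keystore.sh $NODE password
done
# The certificate issuer should be our root CA subject
keytool -list -v -keystore keystore/broker-1.keystore.jks -storepass password | grep Issuer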
Encryption
Now that we have a PKI set up, we can start encrypting the data exchanged between the different components. I am using a docker-compose file to simulate the Kafka ecosystem, with a three-node ZooKeeper quorum and three Kafka brokers.
We start by sharing the JKS files with each Docker container, mounting them as local volumes into the containers.
Then we configure ZooKeeper to use SSL when replicating data between the nodes of the quorum; we also configure a secure client port, to allow the Apache Kafka brokers, which are ZooKeeper clients, to use a secure endpoint.
The same configuration is duplicated on the two other nodes of the ZooKeeper quorum.
zookeeper-1:
  image: zookeeper:3.7.0
  hostname: zookeeper-1
  container_name: zookeeper-1
  volumes:
    - ../kafka-pki/keystore/zookeeper-1.keystore.jks:/security/zookeeper-1.keystore.jks
    - ../kafka-pki/truststore/zookeeper-1.truststore.jks:/security/zookeeper-1.truststore.jks
  environment:
    ZOO_MY_ID: 1
    ZOO_SERVERS: server.1=zookeeper-1:2888:3888;2181 server.2=zookeeper-2:2888:3888;2181 server.3=zookeeper-3:2888:3888;2181
    ZOO_CFG_EXTRA: "sslQuorum=true
      portUnification=false
      serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory
      ssl.quorum.hostnameVerification=false
      ssl.quorum.keyStore.location=/security/zookeeper-1.keystore.jks
      ssl.quorum.keyStore.password=password
      ssl.quorum.trustStore.location=/security/zookeeper-1.truststore.jks
      ssl.quorum.trustStore.password=password
      secureClientPort=2281
      ssl.hostnameVerification=false
      ssl.keyStore.location=/security/zookeeper-1.keystore.jks
      ssl.keyStore.password=password
      ssl.trustStore.location=/security/zookeeper-1.truststore.jks
      ssl.trustStore.password=password"
From the broker perspective, we need to configure the Kafka broker to connect to the ZooKeeper secure port and to replicate topic data between brokers using SSL.
We need to add a listener using the SSL protocol, then configure the broker with its key store and trust store, and enforce the SSL protocol for inter-broker communication.
The same configuration is duplicated on all Kafka cluster nodes.
broker-1:
  image: confluentinc/cp-kafka:7.0.1
  hostname: broker-1
  container_name: broker-1
  depends_on:
    - zookeeper-1
    - zookeeper-2
    - zookeeper-3
  ports:
    - "9191:9191"
  volumes:
    - ../kafka-pki/keystore/broker-1.keystore.jks:/etc/kafka/secrets/broker-1.keystore.jks
    - ../kafka-pki/truststore/broker-1.truststore.jks:/etc/kafka/secrets/broker-1.truststore.jks
    - ../kafka-pki/password.txt:/etc/kafka/secrets/password.txt
  environment:
    KAFKA_BROKER_ID: 1
    KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2281,zookeeper-2:2281,zookeeper-3:2281
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: SSL:SSL
    KAFKA_ADVERTISED_LISTENERS: SSL://broker-1:9191
    KAFKA_LISTENERS: SSL://broker-1:9191
    KAFKA_DEFAULT_REPLICATION_FACTOR: 3
    KAFKA_MIN_INSYNC_REPLICAS: 2
    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
    KAFKA_ZOOKEEPER_SSL_CLIENT_ENABLE: "true"
    KAFKA_ZOOKEEPER_CLIENT_CNXN_SOCKET: org.apache.zookeeper.ClientCnxnSocketNetty
    KAFKA_ZOOKEEPER_SSL_KEYSTORE_LOCATION: /etc/kafka/secrets/broker-1.keystore.jks
    KAFKA_ZOOKEEPER_SSL_KEYSTORE_PASSWORD: password
    KAFKA_ZOOKEEPER_SSL_TRUSTSTORE_LOCATION: /etc/kafka/secrets/broker-1.truststore.jks
    KAFKA_ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD: password
    KAFKA_SSL_CLIENT_AUTH: none
    KAFKA_SSL_KEYSTORE_LOCATION: /etc/kafka/secrets/broker-1.keystore.jks
    KAFKA_SSL_KEYSTORE_PASSWORD: password
    KAFKA_SSL_KEY_PASSWORD: password
    KAFKA_SSL_TRUSTSTORE_LOCATION: /etc/kafka/secrets/broker-1.truststore.jks
    KAFKA_SSL_TRUSTSTORE_PASSWORD: password
    KAFKA_SSL_KEYSTORE_FILENAME: broker-1.keystore.jks
    KAFKA_SSL_KEYSTORE_CREDENTIALS: password.txt
    KAFKA_SSL_KEY_CREDENTIALS: password.txt
    KAFKA_SSL_TRUSTSTORE_FILENAME: broker-1.truststore.jks
    KAFKA_SSL_TRUSTSTORE_CREDENTIALS: password.txt
    KAFKA_SECURITY_INTER_BROKER_PROTOCOL: SSL
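To confirm the broker listener is now TLS-enabled, you can run the same kind of handshake test against the published port 9191:
# A successful handshake prints the broker certificate chain signed by our root CA
openssl s_client -connect localhost:9191 -CAfile rootCA.crt </dev/null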
From the consumer and producer perspective, we need to adjust the configuration to use the SSL protocol, by pointing the applications at the CA public certificate so they can validate the broker certificates.
Producer
bootstrap.servers=broker-1:9191,broker-2:9192,broker-3:9193
security.protocol=SSL
ssl.ca.location=~/Documents/work/kafka-security/kafka-pki/rootCA.crt
Consumer
bootstrap.servers=broker-1:9191,broker-2:9192,broker-3:9193
security.protocol=SSL
ssl.ca.location=~/Documents/work/kafka-security/kafka-pki/rootCA.crt
group.id=consumer.group-1
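These properties follow the librdkafka format, so one way to smoke-test them is kcat (formerly kafkacat), which reads the same configuration file with -F; the topic name below is just an assumption for the test:
# Produce a test message over TLS using the producer configuration
echo "hello over TLS" | kcat -F producer.config -P -t myTestTopic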
Authentication
Now that the traffic is encrypted, the next step is to enforce authentication. Apache Kafka offers different protocols to implement authentication. In this post we are going to use mutual TLS (mTLS) to authenticate brokers to each other, since brokers need to communicate among themselves to replicate data; mTLS will also be used to authenticate the administration tools.
For client applications (producers and consumers) we are going to use SASL/SCRAM, which allows client applications to authenticate with a username and secret.
We need to add another Kafka listener with the SASL_SSL security protocol, adapt the configuration to enforce Kafka client authentication, and enable the SASL mechanism SCRAM-SHA-512.
To enable SCRAM we need to add a JAAS configuration to the Java security login configuration:
KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required;
};
The same configuration must be applied on each broker of the Apache Kafka cluster.
broker-1:
  image: confluentinc/cp-kafka:7.0.1
  hostname: broker-1
  container_name: broker-1
  depends_on:
    - zookeeper-1
    - zookeeper-2
    - zookeeper-3
  ports:
    - "9191:9191"
    - "9291:9291"
  volumes:
    - ../kafka-pki/keystore/broker-1.keystore.jks:/etc/kafka/secrets/broker-1.keystore.jks
    - ../kafka-pki/truststore/broker-1.truststore.jks:/etc/kafka/secrets/broker-1.truststore.jks
    - ../kafka-pki/password.txt:/etc/kafka/secrets/password.txt
    - ./jaas/sasl-scram/ssl-scram-jaas-broker.conf:/etc/kafka/secrets/ssl-scram-jaas-broker.conf
  environment:
    KAFKA_BROKER_ID: 1
    KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2281,zookeeper-2:2281,zookeeper-3:2281
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: SASL_SSL:SASL_SSL,SSL:SSL
    KAFKA_ADVERTISED_LISTENERS: SASL_SSL://broker-1:9191,SSL://broker-1:9291
    KAFKA_LISTENERS: SASL_SSL://broker-1:9191,SSL://broker-1:9291
    KAFKA_DEFAULT_REPLICATION_FACTOR: 3
    KAFKA_MIN_INSYNC_REPLICAS: 2
    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
    KAFKA_ZOOKEEPER_SSL_CLIENT_ENABLE: "true"
    KAFKA_ZOOKEEPER_CLIENT_CNXN_SOCKET: org.apache.zookeeper.ClientCnxnSocketNetty
    KAFKA_ZOOKEEPER_SSL_KEYSTORE_LOCATION: /etc/kafka/secrets/broker-1.keystore.jks
    KAFKA_ZOOKEEPER_SSL_KEYSTORE_PASSWORD: password
    KAFKA_ZOOKEEPER_SSL_TRUSTSTORE_LOCATION: /etc/kafka/secrets/broker-1.truststore.jks
    KAFKA_ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD: password
    KAFKA_SSL_CLIENT_AUTH: required
    KAFKA_SSL_KEYSTORE_LOCATION: /etc/kafka/secrets/broker-1.keystore.jks
    KAFKA_SSL_KEYSTORE_PASSWORD: password
    KAFKA_SSL_KEY_PASSWORD: password
    KAFKA_SSL_TRUSTSTORE_LOCATION: /etc/kafka/secrets/broker-1.truststore.jks
    KAFKA_SSL_TRUSTSTORE_PASSWORD: password
    KAFKA_SSL_KEYSTORE_FILENAME: broker-1.keystore.jks
    KAFKA_SSL_KEYSTORE_CREDENTIALS: password.txt
    KAFKA_SSL_KEY_CREDENTIALS: password.txt
    KAFKA_SSL_TRUSTSTORE_FILENAME: broker-1.truststore.jks
    KAFKA_SSL_TRUSTSTORE_CREDENTIALS: password.txt
    KAFKA_SECURITY_INTER_BROKER_PROTOCOL: SSL
    KAFKA_SASL_ENABLED_MECHANISMS: SCRAM-SHA-512
    KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/secrets/ssl-scram-jaas-broker.conf"
To add the client application usernames and secrets, we use the Kafka client tools; SASL/SCRAM stores users and secrets as Kafka configuration in ZooKeeper. The script below adds two users, producer and consumer, as Kafka application users.
PRODUCER_USER="producer"
PRODUCER_PASSWORD="P@ssMordProducerScram"
CONSUMER_USER="consumer"
CONSUMER_PASSWORD="P@ssMordConsumerScram"
#Add Path to kafka bin folder
KAFKA_BIN_FOLDER=~/Documents/work/kafka_2.13-3.1.0/bin
chmod +x ${KAFKA_BIN_FOLDER}/kafka-configs.sh
${KAFKA_BIN_FOLDER}/kafka-configs.sh --zookeeper localhost:12181 --alter \
--add-config 'SCRAM-SHA-512=[password='${PRODUCER_PASSWORD}']' \
--entity-type users \
--entity-name $PRODUCER_USER
${KAFKA_BIN_FOLDER}/kafka-configs.sh --zookeeper localhost:12181 --alter \
--add-config 'SCRAM-SHA-512=[password='${CONSUMER_PASSWORD}']' \
--entity-type users \
--entity-name $CONSUMER_USER
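You can confirm that the SCRAM credentials were stored in ZooKeeper by describing the user entities:
# Describe the SCRAM configuration of the producer user
${KAFKA_BIN_FOLDER}/kafka-configs.sh --zookeeper localhost:12181 --describe \
--entity-type users \
--entity-name $PRODUCER_USER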
The last modification we need to make is to adapt the client applications' configuration to use the SASL_SSL protocol.
producer.config
bootstrap.servers=broker-1:9191,broker-2:9192,broker-3:9193
security.protocol=SASL_SSL
sasl.mechanisms=SCRAM-SHA-512
sasl.username=producer
sasl.password=P@ssMordProducerScram
ssl.ca.location=~/Documents/work/kafka-security/kafka-pki/rootCA.crt
consumer.config
bootstrap.servers=broker-1:9191,broker-2:9192,broker-3:9193
security.protocol=SASL_SSL
sasl.mechanisms=SCRAM-SHA-512
sasl.username=consumer
sasl.password=P@ssMordConsumerScram
ssl.ca.location=~/Documents/work/kafka-security/kafka-pki/rootCA.crt
group.id=consumer.group-1
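As with the producer, you can smoke-test the consumer configuration with kcat; in balanced consumer mode (-G) it joins the given consumer group (group and topic names taken from the examples above):
# Consume from the topic as part of consumer.group-1, authenticating with SCRAM over TLS
kcat -F consumer.config -G consumer.group-1 myTestTopic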
Authorization
Access control lists (ACLs) let us grant each Kafka principal specific rights on specific Kafka resources.
Apache Kafka ships with a pluggable, out-of-the-box authorizer implementation that uses Apache ZooKeeper to store all the ACLs. Setting ACLs is important, because once an authorizer is configured, access to resources is otherwise limited to super users.
The default behavior is that if a resource has no associated ACLs, then no one is allowed to access it, including the brokers, except super users.
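This deny-by-default behavior is controlled by the allow.everyone.if.no.acl.found broker property; as a sketch (not part of the original setup), you can make it explicit in each broker's environment:
# Deny access to resources that have no ACLs (this is already the default)
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "false"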
We need to configure the brokers as super users, to allow them to communicate with each other; we also need to configure the admin user (used by the admin client tools) as a super user. As a best practice, you should restrict the super user list to users configured with mTLS authentication.
We need to add the configuration below to each Kafka broker; note that the super user list contains the subject name of each broker's server certificate and the subject name of the admin tools client certificate.
KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
KAFKA_SUPER_USERS: "User:CN=broker-1,OU=IT,O=Digital Infornation Inc CA Root Authority,L=Montreal,ST=QC,C=CA;
  User:CN=broker-2,OU=IT,O=Digital Infornation Inc CA Root Authority,L=Montreal,ST=QC,C=CA;
  User:CN=broker-3,OU=IT,O=Digital Infornation Inc CA Root Authority,L=Montreal,ST=QC,C=CA;
  User:CN=admin,OU=IT,O=Digital Infornation Inc CA Root Authority,L=Montreal,ST=QC,C=CA"
To configure the ACLs, we use the Kafka client tools: we create a topic, then grant the producer and consumer users specific access rules on it.
- Grant the producer the right to publish events to the topic
- Grant the consumer the right to consume events from the same topic with a specific consumer group
The script below creates the desired configuration.
#!/bin/bash
PRODUCER_USER="producer"
PRODUCER_PASSWORD="P@ssMordProducerScram"
CONSUMER_USER="consumer"
CONSUMER_PASSWORD="P@ssMordConsumerScram"
TOPIC_TEST="myTestTopic"
CONSUMER_GROUP="consumer.group-1"
#Add Path to kafka bin folder
KAFKA_BIN_FOLDER=~/Documents/work/kafka_2.13-3.1.0/bin
chmod +x ${KAFKA_BIN_FOLDER}/kafka-configs.sh
chmod +x ${KAFKA_BIN_FOLDER}/kafka-acls.sh
${KAFKA_BIN_FOLDER}/kafka-configs.sh --zookeeper localhost:12181 --alter \
--add-config 'SCRAM-SHA-512=[password='${PRODUCER_PASSWORD}']' \
--entity-type users \
--entity-name $PRODUCER_USER
${KAFKA_BIN_FOLDER}/kafka-configs.sh --zookeeper localhost:12181 --alter \
--add-config 'SCRAM-SHA-512=[password='${CONSUMER_PASSWORD}']' \
--entity-type users \
--entity-name $CONSUMER_USER
${KAFKA_BIN_FOLDER}/kafka-acls.sh \
--bootstrap-server broker-1:9291 \
--list \
--command-config ./admin.properties
${KAFKA_BIN_FOLDER}/kafka-topics.sh \
--bootstrap-server broker-1:9291 \
--create \
--topic $TOPIC_TEST \
--partitions 3 \
--command-config ./admin.properties
${KAFKA_BIN_FOLDER}/kafka-acls.sh \
--bootstrap-server broker-1:9291 \
--add \
--allow-principal User:$PRODUCER_USER \
--operation WRITE \
--topic $TOPIC_TEST \
--command-config ./admin.properties
${KAFKA_BIN_FOLDER}/kafka-acls.sh \
--bootstrap-server broker-1:9291 \
--add \
--allow-principal User:$CONSUMER_USER \
--operation READ \
--topic $TOPIC_TEST \
--command-config ./admin.properties
${KAFKA_BIN_FOLDER}/kafka-acls.sh \
--bootstrap-server broker-1:9291 \
--add \
--allow-principal User:$CONSUMER_USER \
--operation All \
--group $CONSUMER_GROUP \
--command-config ./admin.properties
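After running the script, you can list the ACLs attached to the test topic to confirm the rules were created:
# List the ACLs of the test topic, authenticating as the mTLS admin user
${KAFKA_BIN_FOLDER}/kafka-acls.sh \
--bootstrap-server broker-1:9291 \
--list \
--topic $TOPIC_TEST \
--command-config ./admin.properties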
To run these commands, we need to authenticate using the admin client certificate, which is configured as a super user.
admin.properties
security.protocol=SSL
ssl.truststore.location=~/Documents/work/kafka-security/kafka-pki/truststore/admin.truststore.jks
ssl.truststore.password=password
ssl.keystore.location=~/Documents/work/kafka-security/kafka-pki/keystore/admin.keystore.jks
ssl.keystore.password=password
ssl.key.password=password
There is no need to change the Kafka client configuration!