Question
We run Flink jobs in Ververica Platform and process Kafka messages. Our Kafka cluster has authentication enabled via SASL. How can I avoid showing the user credentials in plain text in deployment specs and table DDLs?
Answer
Note: This section applies to Flink 1.11-1.14 with Ververica Platform 2.3-2.8.
When connecting to an authentication-enabled Kafka cluster, your Flink job, as a Kafka client, needs to be configured with user credentials in order to be authenticated to the Kafka cluster. While Kafka supports different authentication mechanisms, this article covers only authentication with SASL where the user credentials are supplied via JAAS configuration.
The first way to supply the JAAS configuration in a Flink job is via the consumer/producer property `sasl.jaas.config`. To avoid having user credentials in plain text, you can add logic to the job that reads the credentials from an external source, e.g., from environment variables backed by Kubernetes Secrets (for other ways to inject properties into a Flink job, see How to inject many properties into Flink Jobs running on Ververica Platform), and builds the `sasl.jaas.config` property internally before the job connects to Kafka. Obviously, this applies only to Flink jobs that are packaged as jar files; Flink SQL jobs do not have such flexibility.
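A minimal sketch of this first approach is shown below. The environment variable names `KAFKA_USERNAME` and `KAFKA_PASSWORD` are hypothetical; in a real deployment they would be populated from a Kubernetes Secret.

```java
import java.util.Properties;

public class KafkaJaasConfig {

    // Builds the value of the sasl.jaas.config client property from
    // credentials, so that they never appear in the job code or spec.
    static String buildJaasConfig(String username, String password) {
        return String.format(
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"%s\" password=\"%s\";",
            username, password);
    }

    public static void main(String[] args) {
        // KAFKA_USERNAME / KAFKA_PASSWORD are hypothetical variable names;
        // in Kubernetes they would be backed by a Secret.
        String user = System.getenv().getOrDefault("KAFKA_USERNAME", "demo-user");
        String pass = System.getenv().getOrDefault("KAFKA_PASSWORD", "demo-pass");

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("security.protocol", "SASL_PLAINTEXT");
        kafkaProps.setProperty("sasl.mechanism", "PLAIN");
        kafkaProps.setProperty("sasl.jaas.config", buildJaasConfig(user, pass));
        System.out.println(kafkaProps.getProperty("sasl.jaas.config"));
    }
}
```

The resulting `Properties` object would then be passed to the Kafka consumer/producer constructor as usual.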
The second way is to supply JAAS configuration via the JVM System property `java.security.auth.login.config`. To secure the JAAS configuration, you can use Kubernetes secrets. Here are the detailed steps that apply to both jobs packaged as jars and SQL jobs.
(1) Have the JAAS config in a temporary file called `kafka_jaas.conf`. For example:
#tabs
## Jobs Packaged as Jars
KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="your-username"
  password="your-password";
};
## SQL Jobs
KafkaClient {
  org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required
  username="your-username"
  password="your-password";
};
Note: For SQL jobs, you need to use the shaded package name.
#--
(2) Create a Kubernetes secret with the JAAS file in the Kubernetes namespace where your job will run:
kubectl create secret generic kafka-jaas-config \
  -n your-job-namespace \
  --from-file=kafka_jaas.conf
(3) Mount the secret in the JobManager and TaskManager pods and set the JVM system property by adding the following to your deployment spec:
spec:
  template:
    spec:
      flinkConfiguration:
        env.java.opts: '-Djava.security.auth.login.config=/etc/kafka/secrets/kafka_jaas.conf'
      kubernetes:
        jobManagerPodTemplate:
          spec:
            containers:
              - name: flink-jobmanager
                volumeMounts:
                  - mountPath: /etc/kafka/secrets
                    name: jaas-config
            volumes:
              - name: jaas-config
                secret:
                  secretName: kafka-jaas-config
        taskManagerPodTemplate:
          spec:
            containers:
              - name: flink-taskmanager
                volumeMounts:
                  - mountPath: /etc/kafka/secrets
                    name: jaas-config
            volumes:
              - name: jaas-config
                secret:
                  secretName: kafka-jaas-config
(4) Connect to your Kafka cluster by specifying the consumer/producer properties. For example:
#tabs
## Jobs Packaged as Jars
kafkaProducerProps.setProperty("bootstrap.servers", ...);
kafkaProducerProps.setProperty("security.protocol", "SASL_PLAINTEXT");
kafkaProducerProps.setProperty("sasl.mechanism", "PLAIN");
## SQL Jobs
CREATE TABLE mytable (
  ...
) WITH (
  'connector' = 'kafka',
  'topic' = '...',
  'properties.bootstrap.servers' = '...:9092',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.sasl.mechanism' = 'PLAIN'
  ...
);
INSERT ...;
#--
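For jobs packaged as jars, the property assembly for this second approach can be sketched as below. Note that `sasl.jaas.config` is intentionally not set: the Kafka client reads the JAAS file referenced by `java.security.auth.login.config` from the deployment spec. The class name and broker address are illustrative.

```java
import java.util.Properties;

public class KafkaClientProps {

    // Assembles Kafka client properties for a SASL-enabled cluster.
    // sasl.jaas.config is NOT set here: the JAAS configuration is picked up
    // from the file referenced by the JVM system property
    // java.security.auth.login.config, set via env.java.opts in the spec.
    static Properties saslProperties(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("security.protocol", "SASL_PLAINTEXT");
        props.setProperty("sasl.mechanism", "PLAIN");
        return props;
    }

    public static void main(String[] args) {
        Properties props = saslProperties("kafka-broker:9092"); // placeholder address
        // These properties would then be passed to the Kafka connector, e.g.:
        // new FlinkKafkaProducer<>("my-topic", serializationSchema, props);
        System.out.println(props.getProperty("sasl.mechanism"));
    }
}
```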
Related Information
Kafka: Overview of Authentication Methods
KB: How to inject many properties into Flink Jobs running on Ververica Platform