What are common best practices for using Kafka Sources and Sinks in Flink?
Note: This applies to Flink 1.9 - 1.11
1. Configure Kafka Transaction Timeouts with End-To-End Exactly-Once Delivery
If you configure your Flink Kafka producer with end-to-end exactly-once semantics (`FlinkKafkaProducer.Semantic#EXACTLY_ONCE`), it is strongly recommended to configure the Kafka transaction timeout to a duration longer than the maximum expected Flink job downtime. Keep in mind that you may also need to recover a Flink job after failures at night or on weekends.
To configure Kafka transaction timeouts, set:
- `transaction.max.timeout.ms` at the Kafka broker. The default value is 15 minutes.
- `transaction.timeout.ms` in `FlinkKafkaProducer`. Although the default value here is 1 hour, it is effectively capped by `transaction.max.timeout.ms` configured in the Kafka brokers.
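For example, the producer-side timeout can be raised via the `Properties` that are later passed to the `FlinkKafkaProducer` constructor. A minimal sketch (the class name and broker address are placeholders):

```java
import java.util.Properties;

public class TxnTimeoutConfig {
    public static Properties producerProperties() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092"); // placeholder address
        // Allow transactions to stay open for up to 1 hour. Note that this
        // value is effectively capped by transaction.max.timeout.ms on the
        // Kafka brokers, so raise the broker setting first if needed.
        props.setProperty("transaction.timeout.ms", String.valueOf(60 * 60 * 1000));
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProperties().getProperty("transaction.timeout.ms"));
    }
}
```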
When you recover a job from a checkpoint/savepoint which contains Kafka transactions, Flink will try to re-commit those transactions upon recovery. There are three scenarios here:
- The re-commit succeeds if the transactions are successfully committed upon recovery.
- You get a warning in the logs if a transaction has already been successfully committed before.
- The re-commit fails if its transaction has timed out.
While scenarios 1 and 2 are harmless, job recovery will fail in scenario 3 unless your producer explicitly ignores such errors.
Warning: Ignoring a transaction timeout error may lead to data loss: the messages in the transaction may have been successfully processed by Flink (so Flink will not reprocess them), but they are not reflected in Kafka because the transaction was never committed.
2. Set Unique Operator UIDs Across Flink Jobs with End-To-End Exactly-Once Delivery
If you configure your Flink Kafka producer with end-to-end exactly-once semantics (`FlinkKafkaProducer.Semantic#EXACTLY_ONCE`), and you have multiple Flink jobs writing to the same Kafka cluster, please make sure that Task names and Operator UIDs of the Kafka sinks are unique across these jobs. Otherwise you may run into a `transactional.id` clash. This can result from running multiple incarnations of the same job code or even from different jobs if the Task names and Operator UIDs are not unique.
Tip: You can, for example, use prefixes or suffixes for task names and operator UIDs that you set via your main arguments provided during job submission if you are running the same code in multiple jobs.
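One way to apply this tip is to derive the sink's name and UID from a job-specific argument supplied at submission time. A minimal sketch with a hypothetical helper (the `SinkIds` class, the `pricing-job` name, and the `-kafka-sink` suffix are all illustrative):

```java
public class SinkIds {
    // Derive a per-job sink identifier from a job name passed via the
    // main arguments, e.g. args[0] = "pricing-job" at submission time.
    public static String sinkUid(String jobName) {
        return jobName + "-kafka-sink";
    }

    public static void main(String[] args) {
        // In the Flink job, this identifier would then be applied as:
        //   stream.addSink(producer).name(sinkUid(jobName)).uid(sinkUid(jobName));
        System.out.println(sinkUid("pricing-job"));
    }
}
```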
Kafka uses `transactional.id` to identify the same producer instance across process restarts. When communicating with Kafka, `FlinkKafkaProducer` generates the `transactional.id` based on the following information (see also the Flink source code):
- Task Name
- Operator UID
- Subtask Index
Using the same task name and operator UID results in a clash on the `transactional.id`, which manifests as the following exception:
org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
3. Checkpoint Interval with End-To-End Exactly-Once Delivery
If you configure your Flink Kafka producer with end-to-end exactly-once semantics (`FlinkKafkaProducer.Semantic#EXACTLY_ONCE`), Flink will use Kafka transactions to ensure exactly-once delivery. These transactions will be committed only when checkpoints complete. However, checkpoints may finish late due to many different reasons and thus the Kafka transaction timeout must be much larger than the configured checkpoint interval or the job may fail due to Kafka transaction timeouts.
You can solve it in either of the following ways:
- Configuring the checkpoint interval:
StreamExecutionEnvironment env = ...;
env.enableCheckpointing(1000); // checkpoint interval in milliseconds
- Configuring the Kafka transaction timeout:
see 1. Configure Kafka Transaction Timeouts with End-To-End Exactly-Once Delivery above
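Whichever knob you adjust, the relationship to maintain is that `transaction.timeout.ms` stays much larger than the checkpoint interval. A minimal sanity-check sketch (the `checkFits` helper and the 10x safety factor are illustrative, not a Flink API):

```java
public class CheckpointTxnCheck {
    // Hypothetical sanity check: require the Kafka transaction timeout to be
    // at least some comfortable multiple of the checkpoint interval, so that
    // a few late checkpoints do not push transactions past their timeout.
    public static boolean checkFits(long checkpointIntervalMs, long txnTimeoutMs) {
        return txnTimeoutMs >= 10 * checkpointIntervalMs;
    }

    public static void main(String[] args) {
        long checkpointIntervalMs = 60_000L;    // env.enableCheckpointing(60_000)
        long txnTimeoutMs = 60 * 60 * 1000L;    // transaction.timeout.ms = 1 hour
        System.out.println(checkFits(checkpointIntervalMs, txnTimeoutMs));
    }
}
```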
4. Concurrent Checkpoints
By default, each `FlinkKafkaProducer` has a pool of 5 `KafkaProducer` instances, which supports up to 4 concurrent checkpoints. More than 4 concurrent checkpoints will result in job failure. To support more concurrent checkpoints, pass the desired `kafkaProducersPoolSize` to one of the `FlinkKafkaProducer` constructors that accepts it, for example:
FlinkKafkaProducer(
    String defaultTopic,
    KafkaSerializationSchema<IN> serializationSchema,
    Properties producerConfig,
    FlinkKafkaProducer.Semantic semantic,
    int kafkaProducersPoolSize)
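The sizing rule can be sketched as a small helper (the `requiredPoolSize` name is illustrative; in a real job you would pass its result as `kafkaProducersPoolSize` and align it with `env.getCheckpointConfig().setMaxConcurrentCheckpoints(...)`):

```java
public class ProducerPoolSizing {
    // The producer pool must be larger than the number of concurrent
    // checkpoints: a pool of N KafkaProducers supports N - 1 concurrent
    // checkpoints (the default pool of 5 supports 4).
    public static int requiredPoolSize(int maxConcurrentCheckpoints) {
        return maxConcurrentCheckpoints + 1;
    }

    public static void main(String[] args) {
        System.out.println(requiredPoolSize(4)); // the default setup
    }
}
```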
5. Kafka Connector Version
Use the universal Kafka connector (`flink-connector-kafka`) unless you have a specific reason not to: it tracks the latest Kafka client version and works with Kafka brokers from version 1.0.0 onwards. For older brokers, use the version-specific connectors (`flink-connector-kafka-0.10`, `flink-connector-kafka-0.11`).
6. Avoid Log Flooding when Kafka clusters are unavailable
When the Kafka cluster has fewer available brokers than the configured minimum number of in-sync replicas (`min.insync.replicas`), you will see messages like the following in the log for every completed checkpoint:
2020-03-24 16:05:08.971Z WARN [ctionEventSink (1/1)] f.s.c.k.FlinkKafkaConsumerBase : Consumer subtask 0 failed async Kafka commit.
org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.
Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available.
Similarly, when the Kafka cluster is unavailable, the following lines show up and flood the log:
2020-03-24 16:16:33.488Z WARN [tryRequestSink (1/1)] o.a.k.clients.NetworkClient : [Consumer clientId=consumer-73, groupId=elastic-gateway-ci1] Connection to node 4 (hostName/hostIp:49092) could not be established. Broker may not be available.
To keep these messages from flooding the log, set larger values for `reconnect.backoff.max.ms` and `reconnect.backoff.ms` in the properties of the `FlinkKafkaConsumer`.
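For example, the backoff settings can be raised in the consumer `Properties` passed to `FlinkKafkaConsumer`. A minimal sketch (the class name, broker address, and the specific backoff values are illustrative; Kafka's defaults are 50 ms and 1 s):

```java
import java.util.Properties;

public class BackoffConfig {
    public static Properties consumerProperties() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092"); // placeholder address
        // Start reconnect attempts after 10 s and back off exponentially up
        // to 2 min, instead of Kafka's defaults of 50 ms / 1 s, so an
        // unavailable cluster produces far fewer warning lines.
        props.setProperty("reconnect.backoff.ms", "10000");
        props.setProperty("reconnect.backoff.max.ms", "120000");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProperties().getProperty("reconnect.backoff.max.ms"));
    }
}
```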
- Flink Kafka Connector Caveats
- Flink Checkpointing
- Kafka Broker Config: transaction.max.timeout.ms
- Kafka Producer Config: transaction.timeout.ms
- Kafka Consumer Config: reconnect backoff setting