Question
What are common best practices for using Kafka Connectors in Flink?
Answer
Note: This applies to Flink 1.9 and later.
Starting from Flink 1.14, `KafkaSource` and `KafkaSink`, developed based on the new source API (FLIP-27) and the new sink API (FLIP-143), are the recommended Kafka connectors. `FlinkKafkaConsumer` and `FlinkKafkaProducer` are deprecated. Unless stated otherwise, we use Flink Kafka consumer/producer to refer to both the old and the new connectors.
1. Configure Applicable Kafka Transaction Timeouts With End-To-End Exactly-Once Delivery
If you configure your Flink Kafka producer with end-to-end exactly-once semantics, it is strongly recommended to configure the Kafka transaction timeout to a duration longer than the maximum checkpoint duration plus the maximum expected Flink job downtime. Please also take into account that you may want to recover your Flink job after failures at night or on weekends.
To configure Kafka transaction timeouts, set:
- `transaction.max.timeout.ms` at the Kafka broker. The default value is 15 minutes.
- `transaction.timeout.ms` in the Flink Kafka producer. See below. Although the default value here is 1 hour, it must not exceed the `transaction.max.timeout.ms` configured in the Kafka brokers; otherwise the brokers will reject transactions with a larger timeout.
#tabs
##KafkaSink
Properties properties = new Properties();
properties.setProperty("transaction.timeout.ms", "7200000"); // e.g., 2 hours

KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers("localhost:9092")
        .setKafkaProducerConfig(properties)
        .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        .setTransactionalIdPrefix("my-trx-id-prefix")
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("topic-name")
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .build();
##FlinkKafkaProducer
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("transaction.timeout.ms", "7200000"); // e.g., 2 hours

FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(
        "topic-name",                              // target topic
        new SimpleStringSchema(),                  // serialization schema
        properties,                                // producer config
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // fault-tolerance
#--
Explanation
When you recover a job from a checkpoint/savepoint which contains Kafka transactions, Flink will try to re-commit those transactions upon recovery. There are four scenarios here:
- The re-commit succeeds if the transactions are successfully committed upon recovery.
- You get a warning in the logs if a transaction has already been successfully committed before.
- The re-commit fails if its transaction has timed out.
- The re-commit fails if its transactional Ids have expired (FLINK-16419).
While scenarios 1 and 2 are fine, the job recovery behaves differently in scenarios 3 and 4:
#tabs
##KafkaSink
The recovery continues, but an ERROR message like the following is logged:
Unable to commit transaction...recovery took longer...signals data loss...
##FlinkKafkaProducer
The recovery fails unless your producer is explicitly configured to ignore this kind of error:
producer.ignoreFailuresAfterTransactionTimeout();
#--
Warning: Ignoring a transaction timeout error may lead to data loss because the messages in the transaction may have been successfully processed by Flink (and therefore will not be reprocessed) but are not reflected in Kafka because the transaction was never committed.
2. Use Unique Transactional Ids Across Flink Jobs with End-To-End Exactly-Once Delivery
If you configure your Flink Kafka producer with end-to-end exactly-once semantics, you need to use unique transactional Ids for all Kafka producers in all jobs that are running against the same Kafka cluster. Otherwise, you may run into a `transactional.id` clash issue. The way to build the transactional id in `KafkaSink` and `FlinkKafkaProducer` is different.
#tabs
##KafkaSink
`KafkaSink` in Flink 1.14 or later generates the `transactional.id` based on the following information (see Flink code):
- transactionalId prefix
- subtaskId
- checkpointOffset
So you are required to set a unique transactionalId prefix for all `KafkaSink`s in all of your jobs that are running against the same Kafka cluster.
##FlinkKafkaProducer
`FlinkKafkaProducer` generates the `transactional.id` based on the following information (see Flink code):
- Task Name & Operator UID, or transactionalIdPrefix if specified
- Subtask Index
- Counter
If you have multiple Flink jobs writing to the same Kafka cluster, please make sure that Task names and Operator UIDs of the Kafka sinks are unique across these jobs. The same requirement needs to be met if you are running multiple incarnations of an application at the same time.
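For example, here is a minimal sketch of pinning a job-specific operator name and UID on the sink so it cannot clash with sinks of other jobs (the name `orders-sink-job-a` is made up for illustration):
DataStream<String> stream = ...;

// Give the Kafka sink an operator name and UID that no other job
// writing to the same Kafka cluster uses.
stream.addSink(myProducer)
        .name("orders-sink-job-a")
        .uid("orders-sink-job-a");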
#--
Tip: You can, for example, use prefixes/names that you set via your main arguments during job submission to achieve the uniqueness of the transactional Ids.
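As a minimal sketch of that tip for `KafkaSink`, the prefix can be read from the program arguments with Flink's `ParameterTool` utility (the argument name `transactional-id-prefix` is only an example):
// Sketch: derive a unique transactional id prefix from the job arguments.
ParameterTool params = ParameterTool.fromArgs(args);
String prefix = params.getRequired("transactional-id-prefix"); // e.g., "orders-job-a"

KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers("localhost:9092")
        .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        .setTransactionalIdPrefix(prefix) // must be unique per job / incarnation
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("topic-name")
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .build();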
Explanation
Kafka uses `transactional.id` to identify the same producer instance across process restarts. See this blog post from Confluent for more info about transactional Ids. Using the same task name and operator UID or the same transactionalIdPrefix would result in a clash on the `transactional.id` which is manifested by the following exception:
org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
3. Checkpoint Interval with End-To-End Exactly-Once Delivery
If you configure your Flink Kafka producer with end-to-end exactly-once semantics, Flink uses Kafka transactions to ensure exactly-once delivery. These transactions are committed only when a checkpoint completes. However, checkpoints may finish late for many different reasons, so the Kafka transaction timeout must be significantly larger than the configured checkpoint interval; otherwise checkpointing may fail because the Kafka transactions time out.
You can address this in either of the following ways:
- Configuring the checkpoint interval:
StreamExecutionEnvironment env = ...;
env.enableCheckpointing(1000); // unit is millisecond
- Configuring the Kafka transaction timeout.
See 1. Configure Applicable Kafka Transaction Timeouts With End-To-End Exactly-Once Delivery above.
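As a minimal sketch combining both settings (the concrete values are only illustrative), keep the transaction timeout well above the checkpoint interval:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000); // checkpoint every 60 seconds

Properties properties = new Properties();
// 15 minutes: well above the checkpoint interval, and within the broker's transaction.max.timeout.ms
properties.setProperty("transaction.timeout.ms", "900000");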
4. Concurrent Checkpoints
#tabs
##KafkaSink
`KafkaSink` in Flink 1.14 or later has no limitations on the number of concurrent checkpoints.
##FlinkKafkaProducer
By default, `FlinkKafkaProducer` uses a pool of 5 Kafka producers to support 4 concurrent checkpoints. More than 4 concurrent checkpoints will result in job failure. To support more concurrent checkpoints, pass the desired `kafkaProducersPoolSize` to the constructor of `FlinkKafkaProducer`:
public FlinkKafkaProducer(
String topicId,
SerializationSchema<IN> serializationSchema,
Properties producerConfig,
@Nullable FlinkKafkaPartitioner<IN> customPartitioner,
FlinkKafkaProducer.Semantic semantic,
int kafkaProducersPoolSize)
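Using that constructor, a minimal sketch with a larger pool could look like this (the pool size of 10 is only an example):
FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(
        "topic-name",                              // target topic
        new SimpleStringSchema(),                  // serialization schema
        properties,                                // producer config
        null,                                      // no custom partitioner
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE,  // fault-tolerance
        10);                                       // kafkaProducersPoolSize (default is 5)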
#--
5. Kafka Connector Version
Due to potential resource leaks in the Kafka Producer when the Kafka cluster is unavailable (FLINK-17327), we recommend upgrading to Flink 1.11 or later.
6. Avoid Log Flooding when Kafka clusters are unavailable
When the Kafka cluster has fewer available brokers than the configured minimum number of in-sync replicas (`min.insync.replicas`), you will see messages like this in the logs for every completed checkpoint:
2020-03-24 16:05:08.971Z WARN [ctionEventSink (1/1)] f.s.c.k.FlinkKafkaConsumerBase : Consumer subtask 0 failed async Kafka commit.
org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.
Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available.
Similarly, when the Kafka cluster is unavailable, the following lines show up and flood the logs:
2020-03-24 16:16:33.488Z WARN [tryRequestSink (1/1)] o.a.k.clients.NetworkClient : [Consumer clientId=consumer-73, groupId=elastic-gateway-ci1] Connection to node 4 (hostName/hostIp:49092) could not be established. Broker may not be available.
To avoid the logs being flooded with these messages, set `reconnect.backoff.max.ms` and `reconnect.backoff.ms` to larger values in the `FlinkKafkaConsumer` or `KafkaSource` properties.
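A minimal sketch for `KafkaSource` (the backoff values, topic, and group id are only examples):
// Sketch: slow down reconnect attempts so an unavailable cluster does not flood the logs.
Properties properties = new Properties();
properties.setProperty("reconnect.backoff.ms", "5000");      // initial wait between reconnect attempts
properties.setProperty("reconnect.backoff.max.ms", "60000"); // upper bound for the exponential backoff

KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("localhost:9092")
        .setTopics("topic-name")
        .setGroupId("my-group")
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .setProperties(properties)
        .build();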
Related Information
- Flink Kafka Connector Caveats
- Flink Checkpointing
- Kafka Broker Config: transaction.max.timeout.ms
- Kafka Producer Config: transaction.timeout.ms
- Kafka Consumer Config: reconnect backoff setting
- Flink 1.14: TransactionalId Prefix
- FLINK-16419: Avoid to recommit succeeded transactions upon recovery
- Transactional Id and ProducerID
- FLIP-27: Refactor Source Interface
- FLIP-143: Unified Sink API