Issue
When loading/restoring a Flink state snapshot (checkpoint/savepoint), you may encounter the following exception:
java.lang.Exception: Exception while creating StreamOperatorStateContext.
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:222)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$1(StreamTask.java:506)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:526)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for StreamMap_0a448493b4782967b150582570326227_(1/1) from any of the 1 provided restore options.
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:335)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:148)
... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
at org.apache.flink.runtime.state.memory.MemoryStateBackend.createKeyedStateBackend(MemoryStateBackend.java:347)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:319)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
... 11 more
Caused by: org.apache.flink.util.StateMigrationException: The new key serializer must be compatible.
at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:141)
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
... 15 more
The exact exception stack trace may be different depending on the configured state backend and Flink version.
Environment
Flink 1.8 - 1.19
Resolution
As stated in the stack trace above, the reason for this error is that the key serializer is not compatible.
In the specific example above, the incompatibility occurs at the `keyBy()` used for the operator `StreamMap_0a448493b4782967b150582570326227`. Since `StreamMap` is Flink's operator class wrapping a `MapFunction`, you should check the `keyBy()` call before the `map()`. Besides `StreamMap`, you may also see other operator classes such as `StreamFilter`, `StreamFlatMap`, `ProcessOperator`, etc., which wrap the functions passed to `filter()`, `flatMap()`, and `process()` calls. Make sure that the key used in your job is compatible with the one stored in your checkpoint/savepoint by checking its serializer.
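To locate the right `keyBy()`, look at the pipeline that produced the named operator. A minimal sketch of such a pipeline (with hypothetical `MyEvent`/`getId()` names, not taken from the original job):

```java
// Hedged sketch: the keyBy() immediately before map() determines the key
// serializer stored in the snapshot. Flink wraps the MapFunction below into
// a StreamMap operator, which is where the "StreamMap_..." operator name in
// the stack trace comes from.
DataStream<MyEvent> events = ...;

events
    .keyBy(e -> e.getId())      // this key type must stay compatible across restores
    .map(new MyMapFunction());  // wrapped as a StreamMap operator
```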
One situation which often causes confusion is that you may call `keyBy()` in different ways using, for example:
- a field name, e.g., `keyBy("id")`, or
- a field position, e.g., `keyBy(0)`
In these cases the compiler cannot determine the exact type of the field, so the key type used is actually `Tuple`. If you then try to load/restore the state snapshot with
- a lambda, e.g., `keyBy(e -> e.f0)`, or `keyBy(e -> e.getId())`, or
- an explicitly defined `KeySelector`, e.g.,
.keyBy(new KeySelector<MyClass, Integer>() {
    @Override
    public Integer getKey(MyClass value) throws Exception {
        return value.getId();
    }
})
you will get a `StateMigrationException: The new key serializer must be compatible`. Similarly, if you use a lambda or a `KeySelector` to store the snapshot but load/restore it with a field name/position, you will get the same exception, as the key serializer is not compatible (`Tuple` vs. the concrete type).
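The mismatch can be sketched as two versions of the same job (hypothetical names, not from the original job):

```java
// Job version that wrote the snapshot: field-name key, key type is Tuple
events.keyBy("id").map(new MyMapFunction());

// Job version that restores it: lambda key, key type is Integer
// -> StateMigrationException: The new key serializer must be compatible
events.keyBy(e -> e.getId()).map(new MyMapFunction());
```

To restore successfully, both versions must derive the same key type, and thus the same key serializer.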
A similar situation may also occur with Flink's State Processor API. For example, assuming field 0 (or the field `id`) is an `Integer`, if `keyBy(0)` or `keyBy("id")` was used to create the savepoint, then when reading from the savepoint your `KeyedStateReaderFunction` must use `Tuple1<Integer>` as the key type:
class MyStateReader extends KeyedStateReaderFunction<Tuple1<Integer>, ...>
{...}
Similarly, if you bootstrap state for a savepoint with:
OperatorTransformation
.bootstrapWith(wordMap)
.keyBy(0) //or .keyBy("id")
.transform(new MyBootstrapper());
then your `KeyedStateBootstrapFunction` (`MyBootstrapper` in this case) must be:
class MyBootstrapper extends KeyedStateBootstrapFunction<Tuple, ...>
{...}
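One way to avoid the raw `Tuple` key type altogether is to key the transformation with a lambda, so the concrete field type can be used directly. A hedged sketch reusing the names above (the `getId()` accessor is an assumption):

```java
OperatorTransformation
    .bootstrapWith(wordMap)
    .keyBy(e -> e.getId())                 // key type: Integer, not Tuple
    .transform(new MyBootstrapper());

class MyBootstrapper extends KeyedStateBootstrapFunction<Integer, ...>
{...}
```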
Because of this confusion, `keyBy()` with field names or positions has been deprecated since Flink 1.11. You should use Java lambdas or a `KeySelector` instead.
Flink 1.12+ includes an improvement that prints the exact new and previous type serializers in the stack trace to help you find the root cause of this exception. For example:
org.apache.flink.util.StateMigrationException: The new key serializer
(org.apache.flink.api.common.typeutils.base.IntSerializer@355e1d62)
must be compatible with the previous key serializer
(org.apache.flink.api.java.typeutils.runtime.TupleSerializer@daebabfd).
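The shape of that check can be illustrated with a small, self-contained mimic. This is a conceptual sketch only, not Flink's actual restore code; the serializer names are stand-ins:

```java
import java.util.Objects;

// Simplified mimic of the restore-time compatibility check: the serializer
// recorded in the snapshot is compared against the one derived from the new
// job's key type, and restore is aborted when they do not match.
public class KeyCompatibilityDemo {

    // Returns null when compatible, otherwise an error message.
    static String checkKeySerializer(String previousSerializer, String newSerializer) {
        if (Objects.equals(previousSerializer, newSerializer)) {
            return null; // compatible: restore proceeds
        }
        return "The new key serializer (" + newSerializer
                + ") must be compatible with the previous key serializer ("
                + previousSerializer + ").";
    }

    public static void main(String[] args) {
        // keyBy(0)/keyBy("id") stored a Tuple-based key; the new job keys by a lambda.
        System.out.println(checkKeySerializer("TupleSerializer", "IntSerializer"));
    }
}
```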
Cause
The key type used when reading a state snapshot is not compatible with the one used to store the snapshot. A common mistake is using `keyBy()` with a field name (e.g., `keyBy("id")`) or a field position (e.g., `keyBy(0)`) when storing a state snapshot, while using the type of that field directly, without a `Tuple` wrapper, when reading from the snapshot, or vice versa.
Related Information
FLINK-17074: Deprecate DataStream.keyBy() that use tuple/expression keys
FLINK-19972: Provide more details when type serializers are not compatible