When restoring a Flink job from a state snapshot (checkpoint or savepoint), you may encounter the following exception:
```
java.lang.Exception: Exception while creating StreamOperatorStateContext.
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$1(
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
at org.apache.flink.runtime.taskmanager.Task.doRun(
at java.base/
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for StreamMap_0a448493b4782967b150582570326227_(1/1) from any of the 1 provided restore options.
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(
... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
at org.apache.flink.runtime.state.memory.MemoryStateBackend.createKeyedStateBackend(
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(
... 11 more
Caused by: org.apache.flink.util.StateMigrationException: The new key serializer must be compatible.
at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(
... 15 more
```
The exact stack trace may differ depending on the configured state backend and the Flink version.
Applies to: Flink 1.8 - 1.19
As the stack trace states, this error occurs because the new key serializer is not compatible with the one stored in the snapshot.
In the example above, the incompatibility occurs at the `keyBy()` used for the operator `StreamMap_0a448493b4782967b150582570326227`. Since `StreamMap` is the Flink operator class that wraps a `MapFunction`, check the `keyBy()` call that precedes the `map()`. Besides `StreamMap`, you may see other operator classes such as `StreamFilter`, `StreamFlatMap`, and `ProcessOperator`, which wrap the functions passed to `filter()`, `flatMap()`, and `process()`, respectively. Check the serializer of the key used in your job to make sure it is compatible with the one stored in your checkpoint/savepoint.
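If a job has many operators, a small helper can extract the operator class and ID from the exception message and point you to the call whose `keyBy()` to inspect. The following is a minimal sketch of such a hypothetical helper in plain Java; `OperatorNameHint`, `OperatorRef`, `find`, and `wrappedCall` are illustrative names, not Flink APIs. It assumes the operator name has the shape shown in the example above, `<OperatorClass>_<operator ID>_(<subtask>/<parallelism>)`, and it only recognizes the operator classes listed in this article.

```java
import java.util.Map;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: finds the operator named in a "Could not restore keyed state backend" message. */
final class OperatorNameHint {

    // Operator classes mentioned in this article and the DataStream calls they wrap.
    private static final Map<String, String> WRAPPED_CALL = Map.of(
            "StreamMap", "map()",
            "StreamFilter", "filter()",
            "StreamFlatMap", "flatMap()",
            "ProcessOperator", "process()");

    // Assumed name shape, based on the example stack trace:
    // <OperatorClass>_<32-hex-digit operator ID>_(<subtask>/<parallelism>)
    private static final Pattern OPERATOR_NAME =
            Pattern.compile("\\b(\\w+?)_([0-9a-f]{32})_\\((\\d+)/(\\d+)\\)");

    record OperatorRef(String operatorClass, String operatorId, int subtask, int parallelism) {
        /** The call whose preceding keyBy() should be checked, or "unknown" for other operator classes. */
        String wrappedCall() {
            return WRAPPED_CALL.getOrDefault(operatorClass, "unknown");
        }
    }

    /** Returns the first operator name found in the message, if any. */
    static Optional<OperatorRef> find(String message) {
        Matcher m = OPERATOR_NAME.matcher(message);
        if (!m.find()) {
            return Optional.empty();
        }
        return Optional.of(new OperatorRef(
                m.group(1), m.group(2),
                Integer.parseInt(m.group(3)), Integer.parseInt(m.group(4))));
    }

    public static void main(String[] args) {
        String msg = "Could not restore keyed state backend for "
                + "StreamMap_0a448493b4782967b150582570326227_(1/1) from any of the 1 provided restore options.";
        find(msg).ifPresent(ref -> System.out.println(
                "Check the keyBy() before " + ref.wrappedCall() + " (operator ID " + ref.operatorId() + ")"));
    }
}
```

Matching on message text is brittle across Flink versions, so treat the result as a hint for where to look rather than as a diagnosis.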
A common source of confusion is that `keyBy()` can be called in different ways. Suppose the snapshot was created using:
- a field name, e.g., `keyBy("id")`, or
- a field position, e.g., `keyBy(0)`.
In this case, the type of the key field cannot be known at compile time, so the key type of the resulting `KeyedStream` is actually `Tuple`. If you then try to restore the snapshot with
- a lambda, e.g., `keyBy(e -> e.f0)`, or `keyBy(e -> e.getId())`, or
- an explicitly defined `KeySelector`, e.g.,
```java
.keyBy(new KeySelector<MyClass, Integer>() {
    @Override
    public Integer getKey(MyClass value) throws Exception {
        return value.getId();
    }
})
```
you will get a `StateMigrationException: The new key serializer must be compatible`. Similarly, if you create the snapshot with a lambda or a `KeySelector` but restore it with a field name or position, you will get the same exception, because the key serializers are not compatible (`Tuple` vs. the concrete key type).
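The type mismatch can be illustrated with a toy model in plain Java. This is not Flink code and does not reproduce Flink's serializer compatibility checks: `Tuple1`, `MyClass`, `keyByFieldName`, `keyBySelector`, and `restore` are hypothetical stand-ins. The sketch assumes the key field is an `Integer` and models the restore-time check as a simple comparison of key classes.

```java
/**
 * Toy model in plain Java, NOT Flink's implementation: it only illustrates that the two keyBy()
 * styles produce keys of different types, so a snapshot written with one cannot be read with the other.
 */
final class KeyTypeToyModel {

    /** Hypothetical stand-in for Flink's Tuple1: keys from keyBy("id") or keyBy(0) are wrapped in a tuple. */
    record Tuple1<T>(T f0) {}

    /** Hypothetical event type whose "id" field is an Integer. */
    record MyClass(Integer id) {}

    // Models keyBy("id") / keyBy(0): the key is the field value wrapped in a one-field tuple.
    static Object keyByFieldName(MyClass value) {
        return new Tuple1<>(value.id());
    }

    // Models keyBy(e -> e.getId()) or a KeySelector<MyClass, Integer>: the key is the value itself.
    static Object keyBySelector(MyClass value) {
        return value.id();
    }

    // Models the restore-time check: the key type recorded in the snapshot must match the job's key type.
    static void restore(Class<?> snapshotKeyType, Class<?> jobKeyType) {
        if (!snapshotKeyType.equals(jobKeyType)) {
            throw new IllegalStateException("The new key serializer must be compatible: snapshot key type "
                    + snapshotKeyType.getSimpleName() + ", job key type " + jobKeyType.getSimpleName());
        }
    }

    public static void main(String[] args) {
        MyClass event = new MyClass(42);
        Class<?> stored = keyByFieldName(event).getClass(); // snapshot written with keyBy("id")
        Class<?> current = keyBySelector(event).getClass(); // job now keys with a lambda
        try {
            restore(stored, current);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Swapping the two arguments to `restore` fails the same way, which mirrors the "vice versa" case: the check fails whenever the two sides disagree on whether the key is wrapped in a tuple.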
A similar situation can occur with Flink's State Processor API. For example, assume field 0 (or field `id`) is an `Integer`. If `keyBy(0)` or `keyBy("id")` was used to create the savepoint, your `KeyedStateReaderFunction` must use `Tuple1<Integer>` as the key type when reading from the savepoint:
```java
class MyStateReader extends KeyedStateReaderFunction<Tuple1<Integer>, ...>
```
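Likewise, if you bootstrap a savepoint with the State Processor API using a field position or name, e.g.,
```java
.keyBy(0) // or .keyBy("id")
.transform(new MyBootstrapper());
```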
then your `KeyedStateBootstrapFunction` (`MyBootstrapper` in this case) must declare `Tuple` as its key type:
```java
class MyBootstrapper extends KeyedStateBootstrapFunction<Tuple, ...>
```
Because of this confusion, `keyBy()` with field names or positions has been deprecated since Flink 1.11 (FLINK-17074). Use Java lambdas or a `KeySelector` instead.
Flink 1.12+ includes an improvement that prints the new and previous type serializers in the stack trace to help you find the root cause of this exception. For example:
```
org.apache.flink.util.StateMigrationException: The new key serializer
must be compatible with the previous key serializer
```
In short, the key type used when restoring a state snapshot is not compatible with the one used to create it. A common mistake is to use `keyBy()` with a field name (e.g., `keyBy("id")`) or a field position (e.g., `keyBy(0)`) when creating a state snapshot, but to use the field's type directly, without a `Tuple` wrapper, when reading from it, or vice versa.
Related Information
- FLINK-17074: Deprecate DataStream.keyBy() that use tuple/expression keys
- FLINK-19972: Provide more details when type serializers are not compatible