Question
My Flink job state contains Scala types stored via the Apache Flink Scala API. How can I migrate/rewrite my Flink job state so that I can restore the job without the Flink Scala API dependency? I still want to use Scala as my main language for Flink, but I need to stop using the Apache Flink Scala API, because it still depends on Scala 2.12. My goal is to start using Scala 2.13 or Scala 3 for my existing Flink jobs.
Answer
Note: This section assumes Flink 1.14 as the version to migrate from and Flink 1.15 as the version to migrate to. However, the same approach may also work with versions newer than Flink 1.15 as the target.
If you try to upgrade your Scala Flink job from Flink 1.14.x to Flink 1.15.x, you might hit an exception similar to this:
Caused by: org.apache.flink.util.StateMigrationException:
For heap backends, the new state serializer
(org.apache.flink.api.common.typeutils.base.MapSerializer@db1bc2cb)
must not be incompatible with the old state serializer
(org.apache.flink.api.common.typeutils.base.MapSerializer@583ad08c).
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.tryRegisterStateTable(HeapKeyedStateBackend.java:211)
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.createInternalState(HeapKeyedStateBackend.java:276)
at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
Your exception may also refer to other serializers, such as a `Key` or `ValueSerializer`, i.e. not necessarily to `MapSerializer`. The problem is caused by a mismatch between the Flink serializers currently available in your job runtime and those stored in the savepoint file. Flink compares them and throws an exception if they differ in terms of Java object semantics. You can already see that the string representations of the objects in the stack trace above differ: MapSerializer@db1bc2cb vs. MapSerializer@583ad08c.
Migration from Flink 1.14.x
First, you will need to change your project dependencies and compile against a newer Flink version. This can be Flink 1.15.x or any later version.
"org.apache.flink" %% "flink-streaming-scala" % "1.15.2" // "1.14.6"
While migrating to the newer Flink version, you should continue to use the Flink Scala API's createTypeInformation macro function, which knows how to de/serialize Scala types. For example:
import org.apache.flink.streaming.api.scala.createTypeInformation
....
.keyBy(_.number, createTypeInformation[Int])
Rewriting Old Savepoint
You will need to create a one-time Flink job that rewrites the existing savepoint into a new one. The new savepoint will use different state serializers than those of the Flink Scala API, which we want to migrate away from. Apache Flink provides the State Processor API to do this work.
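The State Processor API comes as a separate dependency. As a sketch (assuming sbt and the Flink version used above; verify the exact artifact name and version for your setup), the migration job additionally needs:
// State Processor API, used to read and rewrite savepoints
"org.apache.flink" % "flink-state-processor-api" % "1.15.2"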
Below is an example of such a migration job, for an original Flink job that stores word counters in its state variable.
Tip: see the full source code in the following repository: https://github.com/ververica/lab-flink-scala-savepoint
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend
import org.apache.flink.state.api.{OperatorTransformation, SavepointReader, SavepointWriter}
import org.apache.flink.state.api.functions.{KeyedStateBootstrapFunction, KeyedStateReaderFunction}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.createTypeInformation
import org.apache.flink.util.Collector
// common variables
object WordCounter {
  val stateDescriptor = new MapStateDescriptor(
    "wordCounter",
    createTypeInformation[Int],
    createTypeInformation[WordCountState]
  )
}
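The WordCountState case class is defined in the repository linked above; a minimal sketch of its shape, with field names and types assumed from how the reader function below uses them, could look like:
// assumed shape of the state value; see the repository for the actual definition
case class WordCountState(key: Int, count: Long, timestamp: Long, lastCount: Long)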
Below is an implementation of KeyedStateReaderFunction, which is required by the State Processor API to read the existing state:
class ReaderFunction extends KeyedStateReaderFunction[Int, WordCountState] {
  var countState: MapState[Int, WordCountState] = _

  override def open(parameters: Configuration): Unit =
    countState = getRuntimeContext.getMapState(WordCounter.stateDescriptor)

  override def readKey(
      key: Int,
      ctx: KeyedStateReaderFunction.Context,
      out: Collector[WordCountState]): Unit = {
    val state = countState.get(key)
    out.collect(
      WordCountState(
        key,
        state.count,
        System.currentTimeMillis,
        state.lastCount
      )
    )
  }
}
Main code of the Savepoint Migration Job:
object ReadState extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment

  // specify your path. This example is using local filesystem
  val oldSavepointPath = "/tmp/flink-savepoints/savepoint-7fb950-384cc7627885"

  val savepoint = SavepointReader.read(
    env,
    oldSavepointPath,
    // set your/required state backend
    new HashMapStateBackend()
  )

  val keyedState = savepoint.readKeyedState(
    "word-count",
    new ReaderFunction(),
    createTypeInformation[Int],           // comes from flink-scala-api
    createTypeInformation[WordCountState] // comes from flink-scala-api
  )
...
Note: we use Scala's createTypeInformation function to read the data types from the state, because they were originally written by the Scala serializers that are part of the Flink Scala API. If you use different TypeInformation instances, the reader function and the Flink job will fail with an exception.
The next step is to implement the writer part with an instance of KeyedStateBootstrapFunction.
val transformation = OperatorTransformation
  .bootstrapWith(keyedState)
  .keyBy(
    (value: WordCountState) => value.key,
    TypeInformation.of(classOf[Int])
  )
  .transform(new KeyedStateBootstrapFunction[Int, WordCountState] {
    private var countState: MapState[Int, WordCountState] = _

    override def open(parameters: Configuration): Unit = {
      // this is the target state descriptor, which is used to apply different serializers
      val descriptor = new MapStateDescriptor(
        "wordCounter",
        // TypeInformation.of will fall back to the Kryo serializer.
        // Set your own serializer, if Kryo is not what you want
        TypeInformation.of(classOf[Int]),
        TypeInformation.of(classOf[WordCountState])
      )
      countState = getRuntimeContext.getMapState(descriptor)
    }

    override def processElement(
        value: WordCountState,
        ctx: KeyedStateBootstrapFunction[Int, WordCountState]#Context
    ): Unit =
      countState.put(value.key, value)
  })
In the above KeyedStateBootstrapFunction implementation, we create a state descriptor with different TypeInformation instances. This way we change the serializers that will be used for a new savepoint file. You need to decide which serializers you want to use to store Scala types in the savepoint.
Tip: Keep in mind that if you migrate away from the Flink Scala API, then by default the Flink Java API will use Kryo to serialize Scala types. The Java API does not know how to serialize Scala types on its own; that is why Flink offers separate Scala API serializers. It is recommended to supply your own serializers for Scala types in this case, if using Kryo is not desirable.
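If silently falling back to Kryo is not acceptable, one option (a sketch, not part of the original example) is to make Flink fail fast whenever a type would be serialized generically, or to register a dedicated Kryo serializer for the state class. Note that MyWordCountStateKryoSerializer below is a hypothetical class you would have to implement yourself:
// fail fast at job submission time if any type would fall back to Kryo
env.getConfig.disableGenericTypes()

// or: keep Kryo but register an explicit serializer for the state class
// (MyWordCountStateKryoSerializer is hypothetical and not part of Flink)
env.getConfig.registerTypeWithKryoSerializer(
  classOf[WordCountState],
  classOf[MyWordCountStateKryoSerializer]
)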
Finally, we construct a SavepointWriter to connect all the pieces and produce a new savepoint:
SavepointWriter
  .fromExistingSavepoint(oldSavepointPath)
  .removeOperator("word-count")
  .withOperator("word-count", transformation)
  .write(oldSavepointPath.replaceAll("savepoint-", "new-savepoint-"))

env.execute()
The old `word-count` operator is replaced with the new one; the rest of the job graph structure remains unchanged.
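Once the rewritten savepoint has been produced, you can restore the job (now compiled without the Flink Scala API) from it in the usual way, for example with something like `flink run -s <new-savepoint-path> your-job.jar`, or the equivalent savepoint option of your deployment tooling.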