Question
I have two streaming applications. One of them (unbounded-source-blackhole-sink) has an unbounded source which lets the application run continuously and never finish. The other one (bounded-source-blackhole-sink) has a bounded source and comes to an end after processing all the input. I would like to automate the coordination of these two deployments so that every X hours:
- Stop the unbounded-source-blackhole-sink
- Start the bounded-source-blackhole-sink
- Wait till the bounded-source-blackhole-sink is finished
- Start the unbounded-source-blackhole-sink without losing state
Answer
Note: This section applies to Flink 1.15 with Ververica Platform 2.8.
To showcase how this feature can be implemented as an addition to Ververica Platform, we will create a Node.js application that continuously monitors the Ververica Platform. This application will be deployed as a sidecar to the existing Ververica Platform containers. Initially, the directory structure of the application looks like this:
scheduler
|
|_ _ coordinator
| |
| |_ _ function.js
| |
| |_ _ package.json
|
|_ _ deployment
| |
| |_ _ function.js
| |
| |_ _ package.json
|
|_ _ Dockerfile
Deployment
This is the module for setting up the transition rules as well as the business logic for interpreting the transition rules.
#tabs
## function.js
// Deployment state names. The rules below use them both as the state to wait
// for (compared against status.state of the fetched deployment) and as the
// state to request (sent as spec.state in the PATCH body).
const STATE_RUNNING = "RUNNING";
const STATE_FINISHED = "FINISHED";
const STATE_CANCELLED = "CANCELLED";
const STATE_SUSPENDED = "SUSPENDED";
/**
 * Builds the request options addressing one deployment of the "default"
 * namespace on the API served at localhost:8080.
 *
 * @param {string} deploymentName - Name appended to the deployments path.
 * @param {string} requestMethod - HTTP method to put into the options.
 * @returns {object} Options with host, port, path, method and a JSON
 *   Content-Type header.
 */
const options = (deploymentName, requestMethod) => ({
  host: "localhost",
  port: "8080",
  path: `/api/v1/namespaces/default/deployments/${deploymentName}`,
  method: requestMethod,
  headers: { "Content-Type": "application/json" },
});
/**
 * The coordination cycle as a ring of rules, keyed by rule name.
 *
 * Each rule carries:
 *  - nextRule: key of the rule to move to once this rule has fired
 *  - monitorDuration: wait, in milliseconds, before the monitored deployment
 *    is checked (5 minutes for every rule here)
 *  - monitorDeployment / monitorState: deployment to watch and the state it
 *    must have reached for the rule to fire
 *  - triggerDeployment / triggerState: deployment to change and the state to
 *    request for it when the rule fires
 */
const TRANSITION_RULES = {
  // Once the unbounded job is running, suspend it.
  "RULE_FIRST": {
    nextRule: "RULE_SECOND",
    monitorDuration: 5 * 60 * 1000,
    monitorDeployment: "unbounded-source-blackhole-sink",
    monitorState: STATE_RUNNING,
    triggerDeployment: "unbounded-source-blackhole-sink",
    triggerState: STATE_SUSPENDED
  },
  // Once the unbounded job is suspended, start the bounded job.
  "RULE_SECOND": {
    nextRule: "RULE_THIRD",
    monitorDuration: 5 * 60 * 1000,
    monitorDeployment: "unbounded-source-blackhole-sink",
    monitorState: STATE_SUSPENDED,
    triggerDeployment: "bounded-source-blackhole-sink",
    triggerState: STATE_RUNNING
  },
  // Once the bounded job has finished, cancel it.
  "RULE_THIRD": {
    nextRule: "RULE_FOURTH",
    monitorDuration: 5 * 60 * 1000,
    monitorDeployment: "bounded-source-blackhole-sink",
    monitorState: STATE_FINISHED,
    triggerDeployment: "bounded-source-blackhole-sink",
    triggerState: STATE_CANCELLED
  },
  // Once the bounded job is cancelled (requested by RULE_THIRD), restart the
  // unbounded job and loop back to the first rule. The monitored deployment
  // must be the bounded job: watching any other name would never observe the
  // cancellation and the cycle would stall here.
  "RULE_FOURTH": {
    nextRule: "RULE_FIRST",
    monitorDuration: 5 * 60 * 1000,
    monitorDeployment: "bounded-source-blackhole-sink",
    monitorState: STATE_CANCELLED,
    triggerDeployment: "unbounded-source-blackhole-sink",
    triggerState: STATE_RUNNING
  }
};
exports.shouldTransition = (deploymentManifest, transitionRule) => {
return transitionRule.monitorState === JSON.parse(deploymentManifest).status.state;
}
exports.getMonitorDuration = (transitionRule) => {
return transitionRule.monitorDuration;
}
exports.getMonitorOption = (transitionRule) => {
return options(transitionRule.monitorDeployment, "GET");
}
exports.getTriggerOption = (transitionRule) => {
return options(transitionRule.triggerDeployment, "PATCH");
}
exports.getTriggerState = (transitionRule) => {
return JSON.stringify({ spec: { state:transitionRule.triggerState } });
}
exports.getNextRule = (transitionRule) => {
return TRANSITION_RULES[transitionRule.nextRule]
}
/**
 * Gives the rule the cycle begins with.
 *
 * @returns {object} The TRANSITION_RULES entry stored under "RULE_FIRST".
 */
exports.getStartRule = () => TRANSITION_RULES.RULE_FIRST;
## package.json
{
"name": "deployment",
"version": "1.0.0",
"main": "function.js"
}
#--
Coordinator
This is the application that coordinates the monitoring and transitioning functionalities by using the simple http module of NodeJS.
#tabs
## function.js
const http = require("http");
const deployment = require("../deployment");
// Module-level buffer for the body of the monitoring response being read:
// callback() empties it for each response, onData() appends every chunk and
// onEnd() hands the accumulated text to deployment.shouldTransition().
let deploymentManifest = "";
/**
 * Arms a timer that runs monitor() for the given rule after the rule's
 * monitor duration has elapsed.
 *
 * @param {object} transitionRule - Rule to check when the timer fires.
 */
const schedule = (transitionRule) => {
  const delay = deployment.getMonitorDuration(transitionRule);
  setTimeout(monitor, delay, transitionRule);
};
/**
 * Sends the monitoring request for the deployment a rule watches; the response
 * is handled by callback(transitionRule).
 *
 * If the request itself fails (connection refused, reset, ...), the failure is
 * logged and the same rule is scheduled again, so the coordination cycle keeps
 * its position instead of the process dying.
 *
 * @param {object} transitionRule - Rule whose monitored deployment is fetched.
 */
const monitor = (transitionRule) => {
  const request = http.request(
    deployment.getMonitorOption(transitionRule),
    callback(transitionRule));
  // Without an "error" listener, a failed request is thrown as an uncaught
  // exception and terminates the scheduler.
  request.on("error", (error) => {
    console.error(`Monitoring request failed: ${error.message}`);
    schedule(transitionRule);
  });
  request.end();
};
/**
 * Sends the request that asks for a rule's target state on its trigger
 * deployment. No response handler is registered, so the reply is not inspected.
 *
 * @param {object} transitionRule - Rule whose trigger is executed.
 */
const transition = (transitionRule) => {
  const triggerOptions = deployment.getTriggerOption(transitionRule);
  const request = http.request(triggerOptions);
  const desiredState = deployment.getTriggerState(transitionRule);
  request.write(desiredState);
  request.end();
};
/**
 * Creates the response handler for one monitoring request.
 *
 * The handler empties the shared manifest buffer, collects the response body
 * through onData() and evaluates the rule through onEnd() once the body is
 * complete.
 *
 * @param {object} transitionRule - Rule being monitored.
 * @returns {function} Handler taking the http response object.
 */
const callback = (transitionRule) => (response) => {
  deploymentManifest = "";
  // Decode at the stream level: chunks are otherwise Buffers that get
  // stringified one by one, which corrupts a multi-byte UTF-8 character that
  // happens to be split across two chunks.
  response.setEncoding("utf8");
  response.on("data", onData);
  response.on("end", onEnd(transitionRule));
};
/**
 * Creates the end-of-response handler for one monitoring request.
 *
 * When the collected manifest shows the state the rule waits for, the rule's
 * trigger is sent and the next rule is scheduled; otherwise the same rule is
 * scheduled again.
 *
 * @param {object} transitionRule - Rule being monitored.
 * @returns {function} Handler taking no arguments.
 */
const onEnd = (transitionRule) => () => {
  const reached = deployment.shouldTransition(deploymentManifest, transitionRule);
  if (!reached) {
    schedule(transitionRule);
    return;
  }
  transition(transitionRule);
  schedule(deployment.getNextRule(transitionRule));
};
/**
 * Appends one chunk of the response body to the shared manifest buffer.
 *
 * @param {*} chunk - Piece of the response body as delivered by the stream.
 */
const onData = (chunk) => {
  deploymentManifest = deploymentManifest + chunk;
};
// Entry point: start the cycle by scheduling the start rule of the deployment module.
schedule(deployment.getStartRule());
## package.json
{
"name": "coordinator",
"version": "1.0.0",
"main": "function.js",
"scripts": {
"start": "node function.js"
}
}
#--
Dockerfile
This is the Dockerfile we use to build our image.
# Base image: official Node.js 16 image.
FROM node:16
WORKDIR /usr/src/app
# Both modules are needed in the image: the coordinator requires "../deployment".
COPY deployment deployment
COPY coordinator coordinator
# Relative path, resolved against the previous WORKDIR (/usr/src/app/coordinator).
WORKDIR coordinator
# NOTE(review): the coordinator code shown only makes outgoing HTTP requests and
# opens no listening port; confirm whether this EXPOSE is actually needed.
EXPOSE 8000
# Runs the "start" script of coordinator/package.json (node function.js).
CMD ["npm", "start"]
One can build the image containing the scheduler application by running the following command, after replacing <CONTAINER_REGISTRY> and <IMAGE_TAG> with appropriate registry address and your desired image tag.
docker build --tag <CONTAINER_REGISTRY>/scheduler:<IMAGE_TAG> .
Ververica Platform
Once we have the Docker image, we can add it to the list of containers in the Ververica Platform deployment manifest, after templating the Helm charts[2] for deployment.
....
containers:
- name: scheduler
image: <CONTAINER_REGISTRY>/scheduler:<IMAGE_TAG>
- name: appmanager
image: registry.ververica.com/v2.8/vvp-appmanager:2.8.0
....