Question
I have two streaming applications. One of them (unbounded-source-blackhole-sink) has an unbounded source, which lets the application run continuously and never finish. The other one (bounded-source-blackhole-sink) has a bounded source and comes to an end after processing all of its input. I would like to automate the coordination of these two deployments so that every X hours:
1. Stop the unbounded-source-blackhole-sink
2. Start the bounded-source-blackhole-sink
3. Wait until the bounded-source-blackhole-sink is finished
4. Start the unbounded-source-blackhole-sink without losing state
Answer
Note: This section applies to Flink 1.15 with Ververica Platform 2.8.
Overview
Previously, in How to Add Scheduling Capabilities to Ververica Platform: Sidecar Container, we tackled the exact same problem by developing a Node.js application and deploying it as a sidecar container alongside Ververica Platform. In that article, all the logic to monitor, schedule, and trigger the state transitions of Apache Flink deployments for all the steps above (1-4) lived in a single application. However, that approach is quite naive: it offers no advanced scheduling capabilities such as keeping state across different runs, and it lacks templating options because the transition rules are hard-coded.
Apache Airflow is a platform for programmatically authoring, scheduling, and monitoring workflows that are designed to be finite batch workflows. It was not built for infinitely running, event-based workflows, and it is not a streaming solution [1]. This makes the two a perfect match: Ververica Platform handles the unified stream and batch processing workloads, while Apache Airflow handles the authoring, scheduling, and monitoring of the workflows around them.
Introduction
In this article, we will take the same Node.js application and evolve it so that it is responsible for executing exactly one step of the transitioning logic explained above (1-4). Furthermore, we will read the parameters of the transitioning logic from environment variables so that we can reuse the application for different strategies. The next step is then to use Apache Airflow and create a DAG that defines a separate task for each of the steps (1-4), with each task depending on the previous one. Each task uses Apache Airflow's KubernetesPodOperator to run our Node.js application, as sketched below.
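As a sketch, this is the parameterization one task would pass to the application for step 1 (suspending the unbounded job). The variable names are the ones consumed by the Node.js application shown later in this article, and the values mirror the first task of the Airflow DAG:
# Sketch: environment variables for step 1 (suspend the unbounded job once it
# is RUNNING). The names are consumed by the Node.js scheduler application.
STEP_1_ENVIRONMENT = {
    "VVP_HOST": "vvp-ververica-platform.vvp",  # Ververica Platform service
    "VVP_PORT": "80",
    "VVP_NAMESPACE": "default",
    "VVP_MONITOR_COUNT": "3",          # give up after 3 unsuccessful checks
    "VVP_MONITOR_DURATION": "300000",  # check every 5 minutes (milliseconds)
    "VVP_MONITOR_DEPLOYMENT": "unbounded-source-blackhole-sink",
    "VVP_MONITOR_STATE": "RUNNING",    # transition once this state is observed
    "VVP_TRIGGER_DEPLOYMENT": "unbounded-source-blackhole-sink",
    "VVP_TRIGGER_STATE": "SUSPENDED",  # desired state to request
}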
Setup
Ververica Platform
Note: This setup uses the Ververica Platform namespace named default that comes with the installation. It also assumes that port forwarding is in place so that Ververica Platform is accessible via localhost.
Here we will run the following scripts to create the example Apache Flink deployments:
#tabs
## First Step
Note: This setup assumes that a Kubernetes namespace named vvp-jobs has been created and that Ververica Platform has enough permissions to manage Apache Flink deployments in this namespace.
Run the following to register the vvp-jobs deployment target:
curl -X POST \
"http://localhost:8080/api/v1/namespaces/default/deployment-targets" \
-H "accept: application/json" \
-H "Content-Type: application/json" \
--data @vvp-deployment-target-vvp-jobs.json
where vvp-deployment-target-vvp-jobs.json is as follows:
{
  "metadata": { "name": "vvp-jobs" },
  "spec": { "kubernetes": { "namespace": "vvp-jobs" } }
}
## Second Step
Run the following to register the unbounded_source table in the catalog:
curl -X POST \
"http://localhost:8080/sql/v1beta1/namespaces/default/sqlscripts:execute?catalog=vvp&database=default" \
-H "accept: application/json" \
-H "Content-Type: application/json" \
--data @vvp-catalog-unbounded-source.json
where vvp-catalog-unbounded-source.json is as follows:
{
"statement": "CREATE TABLE `vvp`.`default`.`unbounded_source` (`name` STRING) WITH ('connector' = 'datagen', 'rows-per-second' = '10');"
}
## Third Step
Run the following to register the bounded_source table in the catalog:
curl -X POST \
"http://localhost:8080/sql/v1beta1/namespaces/default/sqlscripts:execute?catalog=vvp&database=default" \
-H "accept: application/json" \
-H "Content-Type: application/json" \
--data @vvp-catalog-bounded-source.json
where vvp-catalog-bounded-source.json is as follows:
{
"statement": "CREATE TABLE `vvp`.`default`.`bounded_source` (`name` STRING) WITH ('connector' = 'datagen', 'number-of-rows' = '10');"
}
## Fourth Step
Run the following to register the blackhole_sink table in the catalog:
curl -X POST \
"http://localhost:8080/sql/v1beta1/namespaces/default/sqlscripts:execute?catalog=vvp&database=default" \
-H "accept: application/json" \
-H "Content-Type: application/json" \
--data @vvp-catalog-blackhole-sink.json
where vvp-catalog-blackhole-sink.json is as follows:
{
"statement": "CREATE TABLE `vvp`.`default`.`blackhole_sink` (`name` STRING) WITH ('connector' = 'blackhole');"
}
## Fifth Step
Run the following to create the unbounded-source-blackhole-sink deployment:
curl -X POST \
"http://localhost:8080/api/v1/namespaces/default/deployments" \
-H "accept: application/json" \
-H "Content-Type: application/json" \
--data @vvp-deployment-unbounded-source-blackhole-sink.json
where vvp-deployment-unbounded-source-blackhole-sink.json is as follows:
{
"apiVersion": "v1",
"kind": "Deployment",
"metadata": {
"name": "unbounded-source-blackhole-sink"
},
"spec": {
"deploymentTargetName": "vvp-jobs",
"state": "RUNNING",
"template": {
"spec": {
"artifact": {
"kind": "SQLSCRIPT",
"sqlScript": "INSERT INTO blackhole_sink SELECT * FROM unbounded_source;"
}
}
}
}
}
## Sixth Step
Run the following to create the bounded-source-blackhole-sink deployment:
curl -X POST \
"http://localhost:8080/api/v1/namespaces/default/deployments" \
-H "accept: application/json" \
-H "Content-Type: application/json" \
--data @vvp-deployment-bounded-source-blackhole-sink.json
where vvp-deployment-bounded-source-blackhole-sink.json is as follows:
{
"apiVersion": "v1",
"kind": "Deployment",
"metadata": {
"name": "bounded-source-blackhole-sink"
},
"spec": {
"deploymentTargetName": "vvp-jobs",
"state": "CANCELLED",
"template": {
"spec": {
"artifact": {
"kind": "SQLSCRIPT",
"sqlScript": "INSERT INTO blackhole_sink SELECT * FROM bounded_source;"
}
}
}
}
}
#--
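To verify the setup, you can query the state of both deployments through the same REST endpoint that the scheduler application will poll later. Here is a minimal Python sketch, assuming the port forwarding from the note above:
import json
import urllib.request

# Base URL assumes Ververica Platform is port-forwarded to localhost:8080.
BASE_URL = "http://localhost:8080/api/v1/namespaces/default/deployments"

def deployment_state(name):
    # Return the current status.state of a Ververica Platform deployment.
    with urllib.request.urlopen(BASE_URL + "/" + name) as response:
        return json.loads(response.read())["status"]["state"]

# Right after the setup we expect RUNNING and CANCELLED, respectively.
for name in ["unbounded-source-blackhole-sink", "bounded-source-blackhole-sink"]:
    print(name, deployment_state(name))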
Scheduler
For the scheduler application, we will run the following commands to build and publish our Docker image to our registry. Initially, the directory structure of the application looks like this:
scheduler
|
|__ coordinator
|   |
|   |__ function.js
|   |
|   |__ package.json
|
|__ deployment
|   |
|   |__ function.js
|   |
|   |__ package.json
|
|__ Dockerfile
We execute all of the following commands from inside the scheduler directory.
#tabs
## First Step
docker build \
--tag <CONTAINER_REGISTRY>/scheduler:1.0.0 \
--file Dockerfile \
.
## Second Step
docker push <CONTAINER_REGISTRY>/scheduler:1.0.0
#--
The content of the files is as follows:
#tabs
##Dockerfile
FROM node:16
WORKDIR /usr/src/app
COPY deployment deployment
COPY coordinator coordinator
WORKDIR coordinator
CMD ["npm", "start"]
##coordinator/function.js
const http = require("http");
const deployment = require("../deployment");

let deploymentManifest = "";

// Fetch the manifest of the monitored deployment and decide in the callback
// whether to transition.
const monitor = (transitionRule, monitorCount) => {
    const request = http.request(deployment.getMonitorOption(transitionRule), callback(transitionRule, monitorCount));
    request.end();
}

const callback = (transitionRule, monitorCount) => (response) => {
    deploymentManifest = "";
    response.on("data", onData);
    response.on("end", onEnd(transitionRule, monitorCount));
}

const onData = (deploymentManifestChunk) => {
    deploymentManifest += deploymentManifestChunk;
}

// Once the manifest is fully received: transition if the expected state is
// observed, otherwise schedule another check.
const onEnd = (transitionRule, monitorCount) => () => {
    if (deployment.shouldTransition(deploymentManifest, transitionRule)) {
        transition(transitionRule);
    } else {
        schedule(transitionRule, monitorCount + 1);
    }
}

// Trigger the transition by patching the desired state of the deployment.
const transition = (transitionRule) => {
    const request = http.request(deployment.getTriggerOption(transitionRule));
    request.write(deployment.getTriggerState(transitionRule));
    request.end();
}

const schedule = (transitionRule, monitorCount) => {
    assertSchedule(transitionRule, monitorCount);
    setTimeout(monitor, deployment.getMonitorDuration(transitionRule), transitionRule, monitorCount);
}

// Fail the task if the expected state has not been observed after the
// configured number of checks.
const assertSchedule = (transitionRule, monitorCount) => {
    if (monitorCount === deployment.getMonitorCount(transitionRule)) {
        throw new Error("Deployment state is not triggering the transition!");
    }
}

monitor(deployment.getStartRule(), 0);
##coordinator/package.json
{
"name": "coordinator",
"version": "1.0.0",
"description": "This is a simple example of a coordinator application for Ververica Platform implemented in `JavaScript`.",
"main": "function.js",
"scripts": {
"start": "node function.js"
}
}
##deployment/function.js
// Builds the Ververica Platform REST API request options for a deployment.
const options = (deploymentName, requestMethod) => {
    return {
        host: process.env.VVP_HOST,
        port: process.env.VVP_PORT,
        path: "/api/v1/namespaces/" + process.env.VVP_NAMESPACE + "/deployments/" + deploymentName,
        method: requestMethod,
        headers: { "Content-Type": "application/json" }
    }
}

// The transition rule is fully parameterized through environment variables so
// that the same image can be reused for every step of the workflow.
const TRANSITION_RULE = {
    monitorCount: parseInt(process.env.VVP_MONITOR_COUNT),
    monitorDuration: parseInt(process.env.VVP_MONITOR_DURATION),
    monitorDeployment: process.env.VVP_MONITOR_DEPLOYMENT,
    monitorState: process.env.VVP_MONITOR_STATE,
    triggerDeployment: process.env.VVP_TRIGGER_DEPLOYMENT,
    triggerState: process.env.VVP_TRIGGER_STATE
}

exports.shouldTransition = (deploymentManifest, transitionRule) => {
    return transitionRule.monitorState === JSON.parse(deploymentManifest).status.state;
}

exports.getMonitorCount = (transitionRule) => {
    return transitionRule.monitorCount;
}

exports.getMonitorDuration = (transitionRule) => {
    return transitionRule.monitorDuration;
}

exports.getMonitorOption = (transitionRule) => {
    return options(transitionRule.monitorDeployment, "GET");
}

exports.getTriggerOption = (transitionRule) => {
    return options(transitionRule.triggerDeployment, "PATCH");
}

exports.getTriggerState = (transitionRule) => {
    return JSON.stringify({ spec: { state: transitionRule.triggerState } });
}

exports.getStartRule = () => {
    return TRANSITION_RULE;
}
##deployment/package.json
{
"name": "deployment",
"version": "1.0.0",
"description": "This is a simple example of a deployment application for Ververica Platform implemented in `JavaScript`.",
"main": "function.js"
}
#--
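Note that a transition is nothing more than a PATCH of the deployment's desired state. For reference, the following Python sketch does the equivalent of the coordinator's trigger request against a port-forwarded Ververica Platform:
import json
import urllib.request

def trigger_state(name, state):
    # Patch spec.state of a deployment, mirroring the coordinator's request.
    request = urllib.request.Request(
        "http://localhost:8080/api/v1/namespaces/default/deployments/" + name,
        data=json.dumps({"spec": {"state": state}}).encode(),
        method="PATCH",
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(request) as response:
        print(response.status)

# Example: suspend the unbounded job manually.
trigger_state("unbounded-source-blackhole-sink", "SUSPENDED")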
Airflow
Finally, we will bundle the DAG with the Airflow image and use that image when deploying Airflow. You can check other ways to manage Airflow DAGs in the Airflow documentation [2].
The steps are as follows:
#tabs
## First Step
docker build \
--tag <CONTAINER_REGISTRY>/airflow:1.0.0 \
--file Dockerfile \
.
where the Dockerfile content is
FROM apache/airflow
COPY scheduler.py /opt/airflow/dags/scheduler.py
and the scheduler.py content is
from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from datetime import datetime
from kubernetes.client import models as k8s

with DAG(
    dag_id="blackhole-sink-scheduler",
    start_date=datetime(1970, 1, 1),
    schedule="0 * * * *",
    max_active_runs=1,
    catchup=False
) as dag:
    # Step 1: suspend the unbounded job once it is RUNNING. Suspending takes a
    # savepoint, so its state is preserved for step 4.
    suspend_unbounded_source_blackhole_sink = KubernetesPodOperator(
        name="suspend-unbounded-source-blackhole-sink",
        namespace="airflow",
        image="<CONTAINER_REGISTRY>/scheduler:1.0.0",
        image_pull_policy="Always",
        env_vars=[
            k8s.V1EnvVar(name="VVP_HOST", value="vvp-ververica-platform.vvp"),
            k8s.V1EnvVar(name="VVP_PORT", value="80"),
            k8s.V1EnvVar(name="VVP_NAMESPACE", value="default"),
            k8s.V1EnvVar(name="VVP_MONITOR_COUNT", value="3"),
            k8s.V1EnvVar(name="VVP_MONITOR_DURATION", value="300000"),
            k8s.V1EnvVar(name="VVP_MONITOR_DEPLOYMENT", value="unbounded-source-blackhole-sink"),
            k8s.V1EnvVar(name="VVP_MONITOR_STATE", value="RUNNING"),
            k8s.V1EnvVar(name="VVP_TRIGGER_DEPLOYMENT", value="unbounded-source-blackhole-sink"),
            k8s.V1EnvVar(name="VVP_TRIGGER_STATE", value="SUSPENDED")
        ],
        task_id="suspend_unbounded_source_blackhole_sink"
    )
    # Step 2: once the unbounded job is SUSPENDED, start the bounded job.
    run_bounded_source_blackhole_sink = KubernetesPodOperator(
        name="run-bounded-source-blackhole-sink",
        namespace="airflow",
        image="<CONTAINER_REGISTRY>/scheduler:1.0.0",
        image_pull_policy="Always",
        env_vars=[
            k8s.V1EnvVar(name="VVP_HOST", value="vvp-ververica-platform.vvp"),
            k8s.V1EnvVar(name="VVP_PORT", value="80"),
            k8s.V1EnvVar(name="VVP_NAMESPACE", value="default"),
            k8s.V1EnvVar(name="VVP_MONITOR_COUNT", value="3"),
            k8s.V1EnvVar(name="VVP_MONITOR_DURATION", value="300000"),
            k8s.V1EnvVar(name="VVP_MONITOR_DEPLOYMENT", value="unbounded-source-blackhole-sink"),
            k8s.V1EnvVar(name="VVP_MONITOR_STATE", value="SUSPENDED"),
            k8s.V1EnvVar(name="VVP_TRIGGER_DEPLOYMENT", value="bounded-source-blackhole-sink"),
            k8s.V1EnvVar(name="VVP_TRIGGER_STATE", value="RUNNING")
        ],
        task_id="run_bounded_source_blackhole_sink"
    )
    # Step 3: once the bounded job is FINISHED, move it back to CANCELLED so
    # the next run starts from a clean slate.
    cancel_bounded_source_blackhole_sink = KubernetesPodOperator(
        name="cancel-bounded-source-blackhole-sink",
        namespace="airflow",
        image="<CONTAINER_REGISTRY>/scheduler:1.0.0",
        image_pull_policy="Always",
        env_vars=[
            k8s.V1EnvVar(name="VVP_HOST", value="vvp-ververica-platform.vvp"),
            k8s.V1EnvVar(name="VVP_PORT", value="80"),
            k8s.V1EnvVar(name="VVP_NAMESPACE", value="default"),
            k8s.V1EnvVar(name="VVP_MONITOR_COUNT", value="3"),
            k8s.V1EnvVar(name="VVP_MONITOR_DURATION", value="300000"),
            k8s.V1EnvVar(name="VVP_MONITOR_DEPLOYMENT", value="bounded-source-blackhole-sink"),
            k8s.V1EnvVar(name="VVP_MONITOR_STATE", value="FINISHED"),
            k8s.V1EnvVar(name="VVP_TRIGGER_DEPLOYMENT", value="bounded-source-blackhole-sink"),
            k8s.V1EnvVar(name="VVP_TRIGGER_STATE", value="CANCELLED")
        ],
        task_id="cancel_bounded_source_blackhole_sink"
    )
    # Step 4: resume the unbounded job; it restarts from the savepoint taken
    # in step 1, so no state is lost.
    run_unbounded_source_blackhole_sink = KubernetesPodOperator(
        name="run-unbounded-source-blackhole-sink",
        namespace="airflow",
        image="<CONTAINER_REGISTRY>/scheduler:1.0.0",
        image_pull_policy="Always",
        env_vars=[
            k8s.V1EnvVar(name="VVP_HOST", value="vvp-ververica-platform.vvp"),
            k8s.V1EnvVar(name="VVP_PORT", value="80"),
            k8s.V1EnvVar(name="VVP_NAMESPACE", value="default"),
            k8s.V1EnvVar(name="VVP_MONITOR_COUNT", value="3"),
            k8s.V1EnvVar(name="VVP_MONITOR_DURATION", value="300000"),
            k8s.V1EnvVar(name="VVP_MONITOR_DEPLOYMENT", value="bounded-source-blackhole-sink"),
            k8s.V1EnvVar(name="VVP_MONITOR_STATE", value="CANCELLED"),
            k8s.V1EnvVar(name="VVP_TRIGGER_DEPLOYMENT", value="unbounded-source-blackhole-sink"),
            k8s.V1EnvVar(name="VVP_TRIGGER_STATE", value="RUNNING")
        ],
        task_id="run_unbounded_source_blackhole_sink"
    )
    suspend_unbounded_source_blackhole_sink >> run_bounded_source_blackhole_sink >> cancel_bounded_source_blackhole_sink >> run_unbounded_source_blackhole_sink
## Second Step
docker push <CONTAINER_REGISTRY>/airflow:1.0.0
## Third Step
helm repo add apache-airflow https://airflow.apache.org
helm repo update
helm install airflow apache-airflow/airflow \
--namespace airflow \
--create-namespace \
--set images.airflow.pullPolicy=Always \
--set images.airflow.repository=<CONTAINER_REGISTRY>/airflow \
--set images.airflow.tag=1.0.0
#--
Finally, you can port-forward the Airflow webserver service and use its user interface to enable and trigger the DAG.
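Alternatively, you can trigger the DAG through Airflow's stable REST API. A minimal Python sketch, assuming the webserver is port-forwarded to localhost:18080 (a hypothetical local port chosen to avoid clashing with the Ververica Platform forwarding on 8080) and that the Helm chart's default admin/admin user and basic-auth API backend are in place:
import base64
import json
import urllib.request

# Assumption: default admin/admin credentials from the Airflow Helm chart.
credentials = base64.b64encode(b"admin:admin").decode()
request = urllib.request.Request(
    "http://localhost:18080/api/v1/dags/blackhole-sink-scheduler/dagRuns",
    data=json.dumps({}).encode(),
    method="POST",
    headers={
        "Content-Type": "application/json",
        "Authorization": "Basic " + credentials,
    },
)
with urllib.request.urlopen(request) as response:
    print(response.read().decode())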