Question
How can I develop an Apache Flink application with Stateful Functions API using remote functions and deploy it to Ververica Platform with Application Modules?
Answer
Note: This section applies to Flink 1.14 with Ververica Platform 2.6.
There are several ways to manage deployment of the Stateful Function applications, such as using the DataStream Integration. In this article, we will go through a different method, using the Application Modules, for deploying a Stateful Function Application that utilises remote functions to generate a simple Greeter application in Ververica Platform.
1) Application
The application logic is a basic Greeter application that consumes events from an Apache Kafka topic and just echoes it back to another topic. Here, we are using module.yaml file to let Apache Flink runtime to discover the ingress, stateful functions and egress information, as well as the initial routing of the messages from the ingress.
#tabs
##module.yaml
kind: io.statefun.endpoints.v2/http
spec:
functions: com.ververica.function.remote/greeter
urlPathTemplate: http://greeter:8000
---
kind: io.statefun.kafka.v1/ingress
spec:
id: com.ververica.ingress/greeter-request
address: kafka:9092
consumerGroupId: greeter
startupPosition:
type: latest
topics:
- topic: greeter-request
valueType: com.ververica.type/greeter-request
targets:
- com.ververica.function.remote/greeter
---
kind: io.statefun.kafka.v1/egress
spec:
id: com.ververica.egress/greeter-response
address: kafka:9092
#--
2) Stateful Functions
The next step is to have the Stateful Function implementation and have it deployed and accessible. For the specific use case, we will use Stateful Function API's JavaScript SDK to implement the logic and we will deploy it to the kubernetes after we build the docker image.
#tabs
##greeter-function.js
const http = require("http");
const {StateFun, kafkaEgressMessage} = require("apache-flink-statefun");
const GreeterRequest = StateFun.jsonType("com.ververica.type/greeter-request");
let statefun = new StateFun();
statefun.bind({
typename: "com.ververica.function.remote/greeter",
fn(context, message) {
let greeterRequest = message.as(GreeterRequest);
context.send(
kafkaEgressMessage({
typename: "com.ververica.egress/greeter-response",
topic: "greeter-response",
key: greeterRequest.name,
value: JSON.stringify(greeterRequest)
})
);
}
});
http.createServer(statefun.handler()).listen(8000);
##package.json
{
"name": "greeter",
"version": "3.2.0",
"description": "This is a simple example of a stateful functions application implemented in `JavaScript`.",
"main": "functions.js",
"scripts": {
"start": "node functions.js"
},
"dependencies": {
"apache-flink-statefun": "3.2.0"
}
}
##Dockerfile
FROM node:16
WORKDIR /usr/src/app
COPY package.json ./
RUN npm install apache-flink-statefun@3.2.0
COPY greeter-function.js ./functions.js
EXPOSE 8000
CMD ["npm", "start"]
##deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: greeter
spec:
replicas: 1
selector:
matchLabels:
app: function
component: greeter
template:
metadata:
labels:
app: function
component: greeter
spec:
containers:
- name: greeter
image: greeter:1.0.0
ports:
- name: endpoint
containerPort: 8000
livenessProbe:
tcpSocket:
port: 8000
initialDelaySeconds: 30
periodSeconds: 60
##service.yaml
apiVersion: v1
kind: Service
metadata:
name: greeter
spec:
type: ClusterIP
ports:
- name: endpoint
port: 8000
selector:
app: function
component: greeter
#--
3) Deployment
The last step is to deploy the Stateful Functions Application to Ververica Platform. The first thing to consider is what Apache Flink image to use for creating the deployment. Since Ververica does not have official images for Stateful Functions, we will build our own Docker image with the necessary dependencies added to the image.
Warning: The internal structure of the official images is not considered a public, stable API and may change in the future.
Warning: For custom images, we cannot guarantee compatibility between all possible base images, custom dependencies and Ververica Platform components. Therefore, we are only able to support custom images on a best-effort basis. Application Manager and Apache Flink themselves, as well as their integration with Kubernetes, remain supported.
Since we now will use a different entry point to run a Stateful Functions Application, we will also change the cluster entrypoint. Finally, we will also bundle the module.yaml file to be included in the Docker image so that we will not need to create a separate configmap and attach it to our deployment. When creating the deployment on Ververica Platform, choose the Apache Flink image to the newly built image, while setting the jarUri to 'file:///flink/lib/statefun-flink-distribution-3.2.0.jar'.
#tabs
##Dockerfile
FROM alpine/curl:3.14 AS dependency-downloader
RUN curl -o /tmp/statefun-flink-core.jar https://repo1.maven.org/maven2/org/apache/flink/statefun-flink-core/3.2.0/statefun-flink-core-3.2.0.jar
RUN curl -o /tmp/statefun-flink-distribution-3.2.0.jar https://repo1.maven.org/maven2/org/apache/flink/statefun-flink-distribution/3.2.0/statefun-flink-distribution-3.2.0.jar
#----
FROM registry.ververica.com/v2.6/flink:1.14.4-stream1-scala_2.12-java8
COPY --from=dependency-downloader /tmp/statefun-flink-core.jar /opt/flink/lib/statefun-flink-core.jar
COPY --from=dependency-downloader /tmp/statefun-flink-distribution-3.2.0.jar /flink/lib/statefun-flink-distribution-3.2.0.jar
RUN sed -i "s/org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint/org.apache.flink.statefun.flink.launcher.StatefulFunctionsClusterEntryPoint/" /flink/bin/flink-console.sh
COPY module.yaml /opt/statefun/modules/greeter/module.yaml
##Artifact Configuration
artifact:
flinkImageRegistry: flink
flinkImageRepository: greeter
flinkImageTag: 1.0.0
flinkVersion: '1.14'
jarUri: 'file:///flink/lib/statefun-flink-distribution-3.2.0.jar'
kind: JAR
#--