Question
How can I develop an Apache Flink application with Stateful Functions API using both embedded & remote functions and deploy it to Ververica Platform?
Answer
Note: This section applies to Flink 1.14 and later, with Ververica Platform 2.6 and later.
There are several ways to manage the deployment of Stateful Functions applications, such as using Application Modules. In this article we use a different method, the DataStream Integration, and walk step by step through mixing and matching different Apache Flink APIs to build an end-to-end pipeline, deployed on Ververica Platform, that uses both embedded and remote functions to implement a simple Greeter application.
1) Application
The pipeline consumes events from an Apache Kafka topic, converts each event into a Greeting, passes it through a remote and an embedded Greeter Stateful Function implementation, written in JavaScript and Java respectively, and finally prints the result to standard output. At each stage of the pipeline, a stamp for the current stage (INGRESS, REMOTE, EMBEDDED, EGRESS) is appended to the message's greeters list.
The events are consumed from the topic using the Table API, converted into a DataStream, and finally handed to Stateful Functions via the Stateful Functions DataStream API integration. To that end, the identifiers, types, and the egress for the Stateful Functions are defined below, together with the data class that holds the message payload and its (de)serialization helpers.
#tabs
##Identifiers.java
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.io.EgressIdentifier;
public class Identifiers {

    public static final String TYPE_GREETING = "com.ververica.type/greeting";

    public static final FunctionType FUNCTION_EMBEDDED =
            new FunctionType("com.ververica.function.embedded", "greeter");

    public static final FunctionType FUNCTION_REMOTE =
            new FunctionType("com.ververica.function.remote", "greeter");

    public static final EgressIdentifier<Greeting> EGRESS_DATASTREAM =
            new EgressIdentifier<>("com.ververica.egress.datastream", "greeter", Greeting.class);
}
##Greeting.java
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.ByteString;
import java.nio.charset.StandardCharsets;
import java.util.List;
public class Greeting {

    public String name;
    public List<String> greeters;

    public Greeting() {}

    public Greeting(String name, List<String> greeters) {
        this.name = name;
        this.greeters = greeters;
    }

    public static Greeting withGreeter(Greeting greeting, String greeter) {
        greeting.greeters.add(greeter);
        return greeting;
    }

    public static String toString(Greeting greeting) throws JsonProcessingException {
        return new ObjectMapper().writeValueAsString(greeting);
    }

    public static Greeting fromString(String value) throws JsonProcessingException {
        return new ObjectMapper().readValue(value, Greeting.class);
    }

    public static ByteString toValue(Greeting greeting) throws JsonProcessingException {
        return ByteString.copyFrom(toString(greeting), StandardCharsets.UTF_8);
    }

    public static Greeting fromValue(ByteString value) throws JsonProcessingException {
        return fromString(value.toString(StandardCharsets.UTF_8));
    }
}
#--
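The Greeting helpers above are what move the payload between plain JSON text, the ByteString-backed value used by the embedded SDK, and Java objects. A quick, hedged usage sketch (the values are illustrative only):
// Round-trip a Greeting through its JSON helpers.
Greeting greeting = new Greeting("Alice", new ArrayList<>(Collections.singletonList("INGRESS")));
String json = Greeting.toString(greeting);      // {"name":"Alice","greeters":["INGRESS"]}
Greeting parsed = Greeting.fromString(json);    // back to a Greeting instance
ByteString bytes = Greeting.toValue(parsed);    // UTF-8 bytes, ready to be used as a TypedValue payload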
2) Ingress
To consume the events, the following code reads from the Kafka topic as a Table using the Table API, converts it into a DataStream, and puts the initial stamp into each Greeting event. Note that the greeters list is created as a mutable ArrayList, because later stages append their own stamps to it.
final Schema schema = Schema.newBuilder()
        .column("name", DataTypes.STRING())
        .build();

final FormatDescriptor formatDescriptor = FormatDescriptor.forFormat("json")
        .option("fail-on-missing-field", "false")
        .option("ignore-parse-errors", "true")
        .build();

final TableDescriptor tableDescriptor = TableDescriptor.forConnector("kafka")
        .schema(schema)
        .format(formatDescriptor)
        .option(KafkaConnectorOptions.KEY_FIELDS, Collections.singletonList("name"))
        .option(KafkaConnectorOptions.KEY_FORMAT, "json")
        .option(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS, "kafka:9092")
        .option(KafkaConnectorOptions.PROPS_GROUP_ID, "greeter-request")
        .option(KafkaConnectorOptions.SCAN_STARTUP_MODE, KafkaConnectorOptions.ScanStartupMode.LATEST_OFFSET)
        .option(KafkaConnectorOptions.TOPIC, Collections.singletonList("greeter-request"))
        .build();

final Table table = streamTableEnvironment.from(tableDescriptor).select($("name"));

// The greeters list must be mutable, since every later stage appends its stamp to it.
final DataStream<Greeting> greetings = streamTableEnvironment.toDataStream(table)
        .map(row -> new Greeting(row.getFieldAs("name"), new ArrayList<>(Collections.singletonList("INGRESS"))));
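The snippet above assumes that the streamTableEnvironment (and the underlying streamExecutionEnvironment used later in step 3) has already been created. A minimal sketch of that setup, with variable names matching the other snippets in this article:
// org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
// org.apache.flink.table.api.bridge.java.StreamTableEnvironment
final StreamExecutionEnvironment streamExecutionEnvironment =
        StreamExecutionEnvironment.getExecutionEnvironment();
final StreamTableEnvironment streamTableEnvironment =
        StreamTableEnvironment.create(streamExecutionEnvironment);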
3) Stateful Functions
We first define the two Greeter implementations and then set up the routing of messages between them.
#tabs
##greeter-function.js
const http = require("http");
const {messageBuilder, StateFun} = require("apache-flink-statefun");

const Greeting = StateFun.jsonType("com.ververica.type/greeting");

let statefun = new StateFun();

statefun.bind({
    typename: "com.ververica.function.remote/greeter",
    fn(context, message) {
        let greeting = message.as(Greeting);
        greeting.greeters.push("REMOTE");
        context.send(
            messageBuilder({
                typename: 'com.ververica.function.embedded/greeter',
                id: greeting.name,
                value: greeting,
                valueType: Greeting})
        );
    }
});

http.createServer(statefun.handler()).listen(8000);
##GreeterFunction.java
import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.statefun.sdk.Context;
import org.apache.flink.statefun.sdk.StatefulFunction;
import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
public class GreeterFunction implements StatefulFunction {

    @Override
    public void invoke(Context context, Object message) {
        // Messages coming from the remote function arrive as a TypedValue
        // holding the JSON-serialized Greeting.
        final TypedValue typedValue = (TypedValue) message;
        try {
            final Greeting greeting = Greeting.fromValue(typedValue.getValue());
            context.send(Identifiers.EGRESS_DATASTREAM, Greeting.withGreeter(greeting, "EMBEDDED"));
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
    }
}
##Message Routing
final StatefulFunctionEgressStreams statefulFunctionEgressStreams =
        StatefulFunctionDataStreamBuilder.builder("greeter")
                .withDataStreamAsIngress(dataStream)
                .withRequestReplyRemoteFunction(
                        RequestReplyFunctionBuilder.requestReplyFunctionBuilder(
                                Identifiers.FUNCTION_REMOTE,
                                URI.create("http://greeter:8000")))
                .withFunctionProvider(Identifiers.FUNCTION_EMBEDDED, functionType -> new GreeterFunction())
                .withEgressId(Identifiers.EGRESS_DATASTREAM)
                .build(streamExecutionEnvironment);
#--
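The routing snippet references a dataStream variable that is not shown above. The withDataStreamAsIngress(...) call expects a DataStream of RoutableMessage, so the greetings stream from step 2 typically needs to be wrapped first. The following is a hedged sketch of one way to do this, based on the TYPE_GREETING constant and the Greeting.toValue helper from step 1; the exact payload wiring may differ in your setup:
// Wrap each Greeting into a message addressed to the remote greeter function,
// keyed by the greeting's name. The payload is a TypedValue carrying the
// JSON-serialized Greeting, matching the com.ververica.type/greeting type
// bound in the JavaScript function.
final DataStream<RoutableMessage> dataStream =
        greetings.map(
                greeting ->
                        RoutableMessageBuilder.builder()
                                .withTargetAddress(Identifiers.FUNCTION_REMOTE, greeting.name)
                                .withMessageBody(
                                        TypedValue.newBuilder()
                                                .setTypename(Identifiers.TYPE_GREETING)
                                                .setHasValue(true)
                                                .setValue(Greeting.toValue(greeting))
                                                .build())
                                .build());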
4) Egress
To obtain the final output and put the last stamp into the message, we collect the egress as a DataStream and print it.
statefulFunctionEgressStreams.getDataStreamForEgressId(Identifiers.EGRESS_DATASTREAM)
        .map(greeting -> Greeting.withGreeter(greeting, "EGRESS"))
        .map(greeting -> Greeting.toString(greeting))
        .print();
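Note that none of the snippets above actually submits the job; the main method still needs to end with a call to execute() on the StreamExecutionEnvironment, for example (the job name here is an assumption):
// Submit the pipeline; without this call nothing runs in the cluster.
// Per the stages described in section 1, an input record such as {"name": "Alice"}
// should then be printed roughly as:
// {"name":"Alice","greeters":["INGRESS","REMOTE","EMBEDDED","EGRESS"]}
streamExecutionEnvironment.execute("statefun-greeter");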
5) Deployment
Finally, we build and deploy the remote function manually on Kubernetes using the files below, and deploy the built Apache Flink application in Ververica Platform, passing the class that contains the main method as the entry point.
#tabs
##package.json
{
  "name": "greeter",
  "version": "3.2.0",
  "description": "This is a simple example of a stateful functions application implemented in `JavaScript`.",
  "main": "functions.js",
  "scripts": {
    "start": "node functions.js"
  },
  "dependencies": {
    "apache-flink-statefun": "3.2.0"
  }
}
##Dockerfile
FROM node:16
WORKDIR /usr/src/app
COPY package.json ./
RUN npm install apache-flink-statefun@3.2.0
COPY greeter-function.js ./functions.js
EXPOSE 8000
CMD ["npm", "start"]
##deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: greeter
spec:
  replicas: 1
  selector:
    matchLabels:
      app: function
      component: greeter
  template:
    metadata:
      labels:
        app: function
        component: greeter
    spec:
      containers:
        - name: greeter
          image: greeter:1.0.0
          ports:
            - containerPort: 8000
              name: endpoint
          livenessProbe:
            tcpSocket:
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 60
##service.yaml
apiVersion: v1
kind: Service
metadata:
  name: greeter
spec:
  type: ClusterIP
  ports:
    - name: endpoint
      port: 8000
  selector:
    app: function
    component: greeter
#--
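After applying the Kubernetes manifests above for the remote function, the Flink application itself is created as a Deployment in Ververica Platform, pointing at the built JAR and the main class. The following is a hedged sketch of such a Deployment spec; the artifact location, entry class name, and Flink version are assumptions and need to be adapted to your environment and Platform version:
apiVersion: v1
kind: Deployment
metadata:
  name: statefun-greeter
spec:
  state: RUNNING
  template:
    spec:
      artifact:
        kind: JAR
        jarUri: s3://my-bucket/artifacts/statefun-greeter-1.0.0.jar  # assumption
        entryClass: com.ververica.GreeterPipeline                    # assumption
        flinkVersion: "1.14"
      parallelism: 1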