Question
How can I develop an Apache Flink application with Stateful Functions API using embedded modules and deploy it to Ververica Platform?
Answer
Note: This section applies to Flink 1.14 with Ververica Platform 2.6.
There are several ways to deploy Stateful Functions applications, such as using Application Modules. In this article we go through a different method: using an Embedded Module with embedded functions to build a simple Greeter application and deploy it to Ververica Platform.
1) Application
The overall pipeline consumes events from an Apache Kafka topic, converts them into Greeter requests, routes them to an embedded Greeter stateful function written in Java, and finally prints the result to the standard output.
The following classes define the ingress identifier and function type, the deserialisation schema for the Kafka ingress, and the data class that holds the request payload.
#tabs
##Identifiers.java
import com.google.protobuf.StringValue;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.io.IngressIdentifier;
public class Identifiers {

    public static final IngressIdentifier<StringValue> INGRESS_IDENTIFIER =
            new IngressIdentifier<>(StringValue.class, "com.ververica.stateful-functions", "greeter-request");

    public static final FunctionType FUNCTION_TYPE =
            new FunctionType("com.ververica.stateful-functions", "greeter");
}
##GreeterRequest.java
public class GreeterRequest {

    public String name;

    public GreeterRequest() {}

    public GreeterRequest(String name) {
        this.name = name;
    }
}
##GreeterRequestDeserializer.java
import com.google.protobuf.StringValue;
import org.apache.flink.statefun.sdk.kafka.KafkaIngressDeserializer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.nio.charset.StandardCharsets;
public class GreeterRequestDeserializer implements KafkaIngressDeserializer<StringValue> {

    @Override
    public StringValue deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) {
        return StringValue.of(new String(consumerRecord.value(), StandardCharsets.UTF_8));
    }
}
#--
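The router defined in step 3 parses each record value as JSON into a GreeterRequest, so the greeter-request topic is expected to carry payloads such as {"name": "Ada"}. For a quick smoke test you could publish such a record with a plain Kafka producer. The sketch below is illustrative: the class name GreeterRequestProducer is an assumption, and it assumes the broker is reachable at kafka:9092 and that kafka-clients is on the classpath (as the ConsumerRecord import above suggests).
#tabs
##GreeterRequestProducer.java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class GreeterRequestProducer {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092");
        props.setProperty("key.serializer", StringSerializer.class.getName());
        props.setProperty("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // The value is plain JSON; the router in step 3 extracts the "name" field as the routing key.
            producer.send(new ProducerRecord<>("greeter-request", "{\"name\": \"Ada\"}")).get();
        }
    }
}
#--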
2) Stateful Function
Next, we define our GreeterFunction implementation.
#tabs
##GreeterFunction.java
import org.apache.flink.statefun.sdk.Context;
import org.apache.flink.statefun.sdk.StatefulFunction;
public class GreeterFunction implements StatefulFunction {

    @Override
    public void invoke(Context context, Object greeterRequest) {
        System.out.println("Hello there, " + context.self().id() + "!");
    }
}
#--
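The GreeterFunction above is stateless. If the greeter should, for example, remember how many times each name has been greeted, the embedded SDK provides persisted state primitives that are checkpointed by Flink. The following is a minimal sketch; the class name and the state name seen-count are illustrative.
#tabs
##StatefulGreeterFunction.java
import org.apache.flink.statefun.sdk.Context;
import org.apache.flink.statefun.sdk.StatefulFunction;
import org.apache.flink.statefun.sdk.annotations.Persisted;
import org.apache.flink.statefun.sdk.state.PersistedValue;
public class StatefulGreeterFunction implements StatefulFunction {

    // Per-id counter managed and checkpointed by the Stateful Functions runtime.
    @Persisted
    private final PersistedValue<Integer> seenCount = PersistedValue.of("seen-count", Integer.class);

    @Override
    public void invoke(Context context, Object greeterRequest) {
        int seen = seenCount.getOrDefault(0) + 1;
        seenCount.set(seen);
        System.out.println("Hello there, " + context.self().id() + "! You have been greeted " + seen + " time(s).");
    }
}
#--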
3) Embedded Module
The next step is to assemble the routing by defining the Embedded Module. It binds the Kafka ingress, routes each incoming request to the greeter function keyed by the parsed name, and registers the function provider.
#tabs
##EmbeddedModule.java
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.statefun.sdk.kafka.KafkaIngressStartupPosition;
import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
import org.apache.flink.statefun.sdk.kafka.KafkaIngressBuilder;
import org.myorg.quickstart.data.GreeterRequest;
import org.myorg.quickstart.deserializer.GreeterRequestDeserializer;
import org.myorg.quickstart.function.GreeterFunction;
import org.myorg.quickstart.identifiers.Identifiers;
import java.util.Map;
public class EmbeddedModule implements StatefulFunctionModule {

    @Override
    public void configure(Map<String, String> map, Binder binder) {
        // Kafka ingress: consume the greeter-request topic as StringValue records.
        binder.bindIngress(
                KafkaIngressBuilder.forIdentifier(Identifiers.INGRESS_IDENTIFIER)
                        .withKafkaAddress("kafka:9092")
                        .withTopic("greeter-request")
                        .withConsumerGroupId("greeter-request")
                        .withDeserializer(GreeterRequestDeserializer.class)
                        .withStartupPosition(KafkaIngressStartupPosition.fromLatest())
                        .build());

        // Router: parse the JSON payload and use the "name" field as the target function id.
        binder.bindIngressRouter(
                Identifiers.INGRESS_IDENTIFIER,
                (greeterRequest, downstream) -> {
                    String key;
                    try {
                        key = new ObjectMapper().readValue(greeterRequest.getValue(), GreeterRequest.class).name;
                    } catch (JsonProcessingException e) {
                        key = "NOT_PARSABLE";
                    }
                    downstream.forward(Identifiers.FUNCTION_TYPE, key, greeterRequest);
                });

        // Function provider: create a GreeterFunction instance for the greeter function type.
        binder.bindFunctionProvider(Identifiers.FUNCTION_TYPE, x -> new GreeterFunction());
    }
}
#--
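Embedded modules are discovered at runtime via Java's ServiceLoader mechanism, so the module class must be registered as a service provider inside the application jar. The provider-configuration file below goes under src/main/resources; the package name is illustrative and must match wherever your EmbeddedModule class actually lives.
#tabs
##META-INF/services/org.apache.flink.statefun.sdk.spi.StatefulFunctionModule
# Fully qualified name of the module implementation (adjust the package to your project).
org.myorg.quickstart.module.EmbeddedModule
#--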
4) Deployment
Finally, we build a custom Flink Docker image that bundles the Stateful Functions runtime together with our application jar and deploy it to Ververica Platform. The deployment artifact points at the Stateful Functions distribution jar, and the image replaces Flink's default entrypoint class with the Stateful Functions cluster entrypoint.
#tabs
##Artifact Configuration
artifact:
  flinkImageRegistry: flink
  flinkImageRepository: greeter
  flinkImageTag: 1.0.0
  flinkVersion: '1.14'
  jarUri: 'file:///flink/lib/statefun-flink-distribution-3.2.0.jar'
  kind: JAR
##Dockerfile
FROM alpine/curl:3.14 AS dependency-downloader
RUN curl -o /tmp/statefun-flink-core.jar https://repo1.maven.org/maven2/org/apache/flink/statefun-flink-core/3.2.0/statefun-flink-core-3.2.0.jar
RUN curl -o /tmp/statefun-flink-distribution-3.2.0.jar https://repo1.maven.org/maven2/org/apache/flink/statefun-flink-distribution/3.2.0/statefun-flink-distribution-3.2.0.jar
# ----
FROM registry.ververica.com/v2.6/flink:1.14.4-stream1-scala_2.12-java8
# Place the Stateful Functions runtime jars on the Flink classpath.
COPY --from=dependency-downloader /tmp/statefun-flink-core.jar /flink/lib/statefun-flink-core.jar
COPY --from=dependency-downloader /tmp/statefun-flink-distribution-3.2.0.jar /flink/lib/statefun-flink-distribution-3.2.0.jar
# Replace the default entrypoint class with the Stateful Functions cluster entrypoint.
RUN sed -i "s/org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint/org.apache.flink.statefun.flink.launcher.StatefulFunctionsClusterEntryPoint/" /flink/bin/flink-console.sh
# Ship the application jar as a Stateful Functions module.
COPY target/quickstart-0.1.jar /opt/statefun/modules/greeter/quickstart-0.1.jar
#--
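With the artifact configuration above, Ververica Platform resolves the image as flink/greeter:1.0.0, so the custom image has to be built and pushed to a registry the platform can pull from. A minimal sketch; prefix the tag with your registry host if you are not pushing to Docker Hub.
#tabs
##Build and Push
docker build -t flink/greeter:1.0.0 .
docker push flink/greeter:1.0.0
#--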