Question
How to integrate Iceberg connector for Flink with Ververica Platform (VVP)?
Answer
Note: This section applies to Flink 1.18.0 (or higher) with Ververica Platform 2.12.0.
Ververica Platform does not support the Flink Iceberg connector out of the box, but the connector can be integrated with VVP by using custom Docker images. This article shows the required steps. At the end, you will be able to read from and write to Iceberg tables.
This tutorial consists of two parts:
- Integration with Flink SQL (SQL deployments)
- Integration with the Flink DataStream API (JAR-based deployments)
Since VVP includes an advanced SQL editing, validation and debugging environment, several steps are needed to integrate Iceberg into Flink SQL deployments, while the integration for JAR-based deployments is relatively simple and straightforward.
Note: This article covers the Iceberg catalog type 'hadoop'. The catalog type 'hive' is out of scope.
Integration with Flink SQL
1. Add Iceberg as a custom connector and catalog to the VVP gateway by building your own Gateway Docker image.
Before doing that, download the following JAR files from the Maven Central registry or your internal package registry and store them in a local folder named `jars`:
- https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-flink-runtime-1.17/1.4.3/iceberg-flink-runtime-1.17-1.4.3.jar
- https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.4/hadoop-aws-3.3.4.jar
- https://repo1.maven.org/maven2/joda-time/joda-time/2.12.7/joda-time-2.12.7.jar
- https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-core/1.12.262/aws-java-sdk-core-1.12.262.jar
- https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-s3/1.12.262/aws-java-sdk-s3-1.12.262.jar
- https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-dynamodb/1.12.262/aws-java-sdk-dynamodb-1.12.262.jar
- https://repo1.maven.org/maven2/org/apache/flink/flink-s3-fs-hadoop/1.18.0/flink-s3-fs-hadoop-1.18.0.jar
- https://repo1.maven.org/maven2/org/apache/flink/flink-hadoop-fs/1.18.0/flink-hadoop-fs-1.18.0.jar
Note: We use iceberg-flink-runtime 1.17 because no newer runtime version was available at the time of writing. However, this runtime has been verified to work with Flink 1.18 clusters as well.
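For convenience, the listed JARs can be downloaded into the `jars` folder with a small shell loop (the URLs are the same as above; point them at your internal package registry if needed):
mkdir -p jars
for url in \
  https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-flink-runtime-1.17/1.4.3/iceberg-flink-runtime-1.17-1.4.3.jar \
  https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.4/hadoop-aws-3.3.4.jar \
  https://repo1.maven.org/maven2/joda-time/joda-time/2.12.7/joda-time-2.12.7.jar \
  https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-core/1.12.262/aws-java-sdk-core-1.12.262.jar \
  https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-s3/1.12.262/aws-java-sdk-s3-1.12.262.jar \
  https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-dynamodb/1.12.262/aws-java-sdk-dynamodb-1.12.262.jar \
  https://repo1.maven.org/maven2/org/apache/flink/flink-s3-fs-hadoop/1.18.0/flink-s3-fs-hadoop-1.18.0.jar \
  https://repo1.maven.org/maven2/org/apache/flink/flink-hadoop-fs/1.18.0/flink-hadoop-fs-1.18.0.jar
do
  # Download each JAR into the local jars/ folder
  curl -fL "$url" -o "jars/$(basename "$url")"
done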
Additionally, download the Hadoop libraries which are needed for the Iceberg 'hadoop' catalog type. Using the shell script below, download the Hadoop distribution and unpack it into the `jars` folder as well:
APACHE_HADOOP_URL=https://archive.apache.org/dist/hadoop/
HADOOP_VERSION=3.3.4
curl ${APACHE_HADOOP_URL}/common/hadoop-${HADOOP_VERSION}/hadoop-${HADOOP_VERSION}.tar.gz -o hadoop-${HADOOP_VERSION}.tar.gz
mkdir -p jars
# Unpack the distribution into the jars/ folder (the Dockerfiles below reference jars/hadoop-3.3.4)
tar xzvf hadoop-${HADOOP_VERSION}.tar.gz -C jars
Create YAML file `catalog-meta.yaml` for the Iceberg SQL catalog with the following content:
# catalog-meta.yaml
catalog:
  type: iceberg
  packaged: true
  properties:
    - key: type
      required: true
      description: Must be set to 'iceberg' to configure this catalog.
    - key: catalog-type
      required: false
      description: hive, hadoop or rest for built-in catalogs, or left unset for custom catalog implementations using catalog-impl
    - key: catalog-database
      required: false
      description: default database name
    - key: catalog-impl
      required: false
      description: The fully-qualified class name of a custom catalog implementation. Must be set if catalog-type is unset
    - key: property-version
      required: false
      description: Version number to describe the property version. This property can be used for backwards compatibility in case the property format changes. The current property version is 1
    - key: cache-enable
      required: false
      description: Whether to enable catalog cache, default value is true
    - key: cache.expiration-interval-ms
      required: false
      description: How long catalog entries are locally cached, in milliseconds; negative values like -1 will disable expiration, value 0 is not allowed to set. Default value is -1
    - key: warehouse
      required: false
      description: The Hive warehouse location, users should specify this path if neither set the hive-conf-dir to specify a location containing a hive-site.xml configuration file nor add a correct hive-site.xml to classpath
Create another YAML file `connector-meta.yaml` for the Iceberg connector with the following content:
# connector-meta.yaml
connector:
  type: iceberg
  packaged: true
  source: true
  sink: true
  lookup: false
  properties:
    - key: connector
      required: true
      description: Must be set to 'iceberg' to configure this connector.
      defaultValue: iceberg
    - key: catalog-database
      required: false
      description: default database name
      defaultValue: default
    - key: catalog-name
      required: true
      description: User-specified catalog name. It is required because the connector does not have a default value for it.
    - key: catalog-table
      required: false
      description: The iceberg table name in the backend catalog. Defaults to the table name used in the Flink CREATE TABLE statement.
    - key: catalog-type
      required: false
      description: hive, hadoop or rest for built-in catalogs, or left unset for custom catalog implementations using catalog-impl
Create a Dockerfile that customises the VVP Gateway container with the following content:
# Put your desired version of VVP and Flink in the following commands
FROM registry.ververica.com/v2.12/vvp-gateway:2.12.0
# Iceberg Catalog
COPY catalog-meta.yaml /vvp/sql/opt/catalogs/iceberg/catalog-meta.yaml
COPY jars/iceberg-flink-runtime-1.17-1.4.3.jar /vvp/sql/opt/catalogs/iceberg/
COPY jars/hadoop-aws-3.3.4.jar /vvp/sql/opt/catalogs/iceberg/
COPY jars/flink-s3-fs-hadoop-1.18.0.jar /vvp/sql/opt/catalogs/iceberg/
# Iceberg Connector
COPY connector-meta.yaml /vvp/sql/opt/connectors/iceberg/connector-meta.yaml
COPY jars/iceberg-flink-runtime-1.17-1.4.3.jar /vvp/sql/opt/connectors/iceberg/
COPY jars/hadoop-aws-3.3.4.jar /vvp/sql/opt/connectors/iceberg/
COPY jars/flink-s3-fs-hadoop-1.18.0.jar /vvp/sql/opt/connectors/iceberg/
# VVP SQL Editor libraries for Iceberg
COPY jars/flink-hadoop-fs-1.18.0.jar /vvp/app/lib/
Ensure that the two YAML files and the `jars` folder with all the files mentioned above are present in the working directory:
├── catalog-meta.yaml
├── connector-meta.yaml
└── jars/
├── aws-java-sdk-core-1.12.262.jar
├── aws-java-sdk-dynamodb-1.12.262.jar
├── aws-java-sdk-s3-1.12.262.jar
├── flink-hadoop-fs-1.18.0.jar
├── flink-s3-fs-hadoop-1.18.0.jar
├── hadoop-3.3.4
├── hadoop-aws-3.3.4.jar
├── iceberg-flink-runtime-1.17-1.4.3.jar
└── joda-time-2.12.7.jar
Now build and push your Gateway image:
docker build -t <put your registry here>/vvp-gateway:2.12.0-iceberg -f Dockerfile .
docker push <put your registry here>/vvp-gateway:2.12.0-iceberg
2. Build a custom Flink image for SQL session clusters and JAR-based deployments.
Create file named `Dockerfile-Flink` with the following content:
FROM registry.ververica.com/v2.12/flink:1.18.0-stream1-scala_2.12-java11
# For Flink SQL and VVP SQL Editor support
COPY jars/hadoop-3.3.4/share/hadoop/common/lib/* ../opt/hadoop/
COPY jars/hadoop-3.3.4/share/hadoop/common/hadoop-common-3.3.4.jar ../opt/hadoop/
COPY jars/hadoop-3.3.4/share/hadoop/hdfs/hadoop-hdfs-client-3.3.4.jar ../opt/hadoop/
COPY jars/hadoop-3.3.4/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.3.4.jar ../opt/hadoop/
# See more at https://nightlies.apache.org/flink/flink-docs-master/docs/dev/configuration/advanced/#hadoop-dependencies
ENV HADOOP_CLASSPATH="/flink/opt/hadoop/*"
# For JAR-based Deployments support
COPY jars/hadoop-aws-3.3.4.jar ../lib/
COPY jars/aws-java-sdk-core-1.12.262.jar ../lib/
COPY jars/aws-java-sdk-s3-1.12.262.jar ../lib/
COPY jars/aws-java-sdk-dynamodb-1.12.262.jar ../lib/
COPY jars/joda-time-2.12.7.jar ../lib/
Then build and push the Flink image by running the following commands:
docker build -t <put your registry here>/flink:1.18.0-iceberg -f Dockerfile-Flink .
docker push <put your registry here>/flink:1.18.0-iceberg
3. Use the newly built images in your VVP installation. To do so, set the image repositories and tags in the Helm `values.yaml` file and configure the remaining values as shown below.
# values.yaml
gateway:
  image:
    repository: <your registry>/<your repository>
    tag: 2.12.0-iceberg
vvp:
  flinkVersionMetadata:
    - flinkImageRegistry: <put your registry here>
      flinkImageRepository: flink
      flinkVersion: 1.18
      imageTag: 1.18.0-iceberg
      useForSqlDeployments: true
      defaultFor:
        - 1.18
  blobStorage:
    baseUri: s3://<set bucket name here ...>
    s3:
      endpoint: <optionally set S3 endpoint address (it is needed for MinIO)>
  globalSessionClusterDefaults: |
    spec:
      kubernetes:
        jobManagerPodTemplate:
          spec:
            containers:
              - name: flink-jobmanager
                env:
                  - name: HADOOP_CONF_DIR
                    value: /etc/hadoop/conf
                volumeMounts:
                  - name: hadoop-s3-creds
                    mountPath: /etc/hadoop/conf
            volumes:
              - name: hadoop-s3-creds
                secret:
                  secretName: s3-creds-secret
        taskManagerPodTemplate:
          spec:
            containers:
              - name: flink-taskmanager
                env:
                  - name: HADOOP_CONF_DIR
                    value: /etc/hadoop/conf
                volumeMounts:
                  - name: hadoop-s3-creds
                    mountPath: /etc/hadoop/conf
            volumes:
              - name: hadoop-s3-creds
                secret:
                  secretName: s3-creds-secret
blobStorageCredentials:
  s3:
    accessKeyId: <set value ...>
    secretAccessKey: <set value ...>
env:
  - name: HADOOP_CONF_DIR
    value: /etc/hadoop/conf
volumes:
  - name: hadoop-s3-creds
    secret:
      secretName: s3-creds-secret
volumeMounts:
  - name: hadoop-s3-creds
    mountPath: /etc/hadoop/conf
In summary, we set the following values in this file:
- the Flink image which includes the Iceberg dependencies
- the Gateway image which includes the Iceberg dependencies
- for S3: the bucket name, the endpoint (optional) and the credentials
- `globalSessionClusterDefaults`: an environment variable and a volume mount that propagate the S3 credentials to the `hadoop-aws` library in session clusters
- a mounted secret: the Hadoop XML file with the S3 access configuration (created in the next step)
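Optionally, you can render the chart locally first to catch indentation or syntax mistakes in `values.yaml` (a quick sanity check; it uses the same release name, chart and namespace as the Helm command in step 5):
helm template vvp ververica/ververica-platform \
  --namespace vvp \
  -f values.yaml > /dev/null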
4. Create a Kubernetes secret in your VVP namespace with the following content:
Important: Put your credentials and other values into the XML configuration below.
apiVersion: v1
kind: Secret
metadata:
  name: s3-creds-secret
type: Opaque
stringData:
  core-site.xml: |-
    <configuration>
      <property>
        <name>fs.s3.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
      </property>
      <property>
        <name>fs.s3a.buffer.dir</name>
        <value>/tmp</value>
      </property>
      <property>
        <name>fs.s3a.access.key</name>
        <value> put right value here !!! </value>
      </property>
      <property>
        <name>fs.s3a.secret.key</name>
        <value> put right value here !!! </value>
      </property>
      <property>
        <name>fs.s3a.connection.ssl.enabled</name>
        <value>false</value>
        <description>Enables or disables SSL connections to S3.</description>
      </property>
      <!--
        Remove the property below if you use AWS S3.
        It is set for a MinIO service running in the vvp namespace.
      -->
      <property>
        <name>fs.s3a.endpoint</name>
        <value>http://minio.vvp.svc:9000</value>
      </property>
      <property>
        <name>fs.s3a.path.style.access</name>
        <value>true</value>
      </property>
      <property>
        <name>fs.s3a.aws.credentials.provider</name>
        <value>org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider</value>
      </property>
    </configuration>
Apply the secret to the `vvp` namespace and to your jobs namespace, e.g. `vvp-jobs`:
kubectl apply -f <your secret file name>.yaml -n vvp
kubectl apply -f <your secret file name>.yaml -n vvp-jobs
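As a quick check, confirm that the secret is present in both namespaces:
kubectl get secret s3-creds-secret -n vvp
kubectl get secret s3-creds-secret -n vvp-jobs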
5. Install/upgrade your VVP instance with the configured values via Helm.
helm upgrade --install vvp ververica/ververica-platform \
--namespace vvp \
-f values.yaml
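After the upgrade, wait until the VVP pods have been recreated with the new Gateway image and are ready (pod names vary by release name):
kubectl get pods -n vvp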
6. Now test the Iceberg catalog integration by creating a catalog, a database and a table in the VVP SQL Editor. Use the script below for the test:
CREATE CATALOG icebergcatalog WITH (
'type'='iceberg',
'catalog-type'='hadoop',
'catalog-database'='default',
'property-version'='1',
'warehouse'='s3a://<your bucket from the values.yaml>/icebergcatalog'
);
USE CATALOG icebergcatalog;
CREATE DATABASE `default`;
USE CATALOG icebergcatalog;
CREATE TABLE sample (
id BIGINT COMMENT 'unique id',
data STRING
);
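To confirm that the catalog, database and table were registered correctly, you can list them in the SQL Editor with standard Flink SQL statements:
USE CATALOG icebergcatalog;
USE `default`;
SHOW TABLES;
DESCRIBE sample;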
7. Insert data into the Iceberg table through a new VVP Deployment:
USE CATALOG icebergcatalog;
INSERT INTO sample VALUES (1, 'a');
8. Read the table data in the SQL Editor using a Flink session cluster:
USE CATALOG icebergcatalog;
SELECT * FROM sample;
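The query above performs a bounded (batch-style) read of the current table snapshot. The Iceberg Flink source also supports continuous reads of newly committed snapshots via SQL hints, for example:
USE CATALOG icebergcatalog;
SELECT * FROM sample /*+ OPTIONS('streaming'='true', 'monitor-interval'='10s') */;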
Integration with Flink DataStream API
Since we have already installed the Iceberg connector dependencies into the Flink Docker image and configured the defaults in Ververica Platform, only the following steps are needed:
1. Build your job JAR file and use it in your VVP Deployment.
2. Before starting your VVP Deployment, add `iceberg-flink-runtime-1.17-1.4.3.jar` dependency to the list of Additional Dependencies of your VVP Deployment.
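If your job is built with Maven (an assumption; adjust accordingly for Gradle or sbt), the Iceberg runtime is typically declared with `provided` scope so that it is not bundled into the job JAR but supplied at runtime through the Additional Dependencies entry above:
<!-- pom.xml (snippet) -->
<dependency>
  <groupId>org.apache.iceberg</groupId>
  <artifactId>iceberg-flink-runtime-1.17</artifactId>
  <version>1.4.3</version>
  <!-- provided: supplied at runtime via VVP Additional Dependencies -->
  <scope>provided</scope>
</dependency>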