Question
How can I use Apache Flink's Hive Catalog and Flink SQL on Ververica Platform to easily read/write data from/to Apache Hive external tables that use an Avro schema stored on S3?
Answer
Note: This section applies to Apache Flink 1.18.1 and Apache Hive 3.1.3 with Ververica Platform 2.12.1 or later.
Ververica Platform supports integrating additional catalogs, such as the Hive catalog, either by uploading them via the UI or by building a custom Docker image for the gateway container. The UI for uploading catalogs accepts only a single JAR file. Since external tables with an Avro schema require additional dependencies, specifically when Apache Hive uses S3 to store both the schema and the data, the UI-based approach cannot be used here. Therefore, this article follows the approach of building a custom Docker image for Ververica Platform's gateway container with the required dependencies.
1) Apache Hive
Throughout the article, we will use a database on Apache Hive that uses S3 as its location. We will create an `events` database with an external table named `login` that has two fields and uses an Avro schema stored on S3. The initial step is to create an S3 bucket to store both the data and the schema of the table, followed by uploading the schema to that bucket. We will also store the scripts to be run on the Apache Hive CLI on S3, so that we can set up our environment on Apache Hive easily.
We will use the AWS CLI's `s3api`, specifically the `create-bucket`[1] and `put-object`[2] commands, to create the bucket and upload the schema file - `login_event.avsc` - and the scripts - `create_database.hql` and `create_table.hql`. Finally, we will run the scripts from the Apache Hive CLI[3], which has access to your Apache Hive cluster, to create the database and the external table.
Following are the commands to run,
#tabs
## AWS CLI Commands
## Create Bucket
aws s3api create-bucket --bucket <BUCKET_NAME>
## Upload Schema
aws s3api put-object --bucket <BUCKET_NAME> --key schemas/login_event.avsc --body <LOCAL_PATH_TO_LOGIN_EVENT.AVSC>
## Upload Create Database Script
aws s3api put-object --bucket <BUCKET_NAME> --key scripts/create_database.hql --body <LOCAL_PATH_TO_CREATE_DATABASE.HQL>
## Upload Create Table Script
aws s3api put-object --bucket <BUCKET_NAME> --key scripts/create_table.hql --body <LOCAL_PATH_TO_CREATE_TABLE.HQL>
## Apache Hive CLI Commands
## Create Database
hive -f 's3a://<BUCKET_NAME>/scripts/create_database.hql'
## Create Table
hive -f 's3a://<BUCKET_NAME>/scripts/create_table.hql'
#--
followed by the actual contents of the files referenced in those commands.
#tabs
## login_event.avsc
{
  "type": "record",
  "name": "LoginEvent",
  "fields": [
    {
      "name": "user_name",
      "type": "string"
    },
    {
      "name": "login_time",
      "type": "long"
    }
  ]
}
## create_database.hql
CREATE DATABASE events LOCATION 's3a://<BUCKET_NAME>/warehouse';
## create_table.hql
CREATE EXTERNAL TABLE events.login (user_name STRING, login_time BIGINT)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
WITH SERDEPROPERTIES ('avro.schema.url' = 's3a://<BUCKET_NAME>/schemas/login_event.avsc')
STORED AS AVRO;
#--
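As an optional sanity check, you can inspect the newly created table from the Apache Hive CLI (e.g. interactively or via `hive -e`) before moving on. The following is a minimal sketch, assuming the scripts above completed successfully; `DESCRIBE FORMATTED` should show the Avro SerDe together with the `avro.schema.url` property pointing at the schema on S3.
#tabs
## Verify Table (Optional)
-- List the tables registered in the new database
SHOW TABLES IN events;
-- Inspect the table metadata, including the Avro SerDe and the avro.schema.url on S3
DESCRIBE FORMATTED events.login;
#--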
2) Ververica Platform
The next step is to build a custom image for the gateway[4] container with the required dependencies and to create a `ConfigMap` on Kubernetes that stores the `hive-site.xml` holding the connectivity information to Apache Hive for Ververica Platform. Finally, we will deploy Ververica Platform using Helm[5], with a `values.yaml` file that points the gateway container to the custom image and mounts the `ConfigMap` into it.
Note: When using a different version of Ververica Platform, please check the latest supported Apache Flink version for that Ververica Platform version[6] and adjust the Apache Flink version in the following files accordingly.
Following are the commands to run,
#tabs
## Kubernetes Commands
## Create the configmap
kubectl apply -f configmap.yaml
## Docker Commands
## Build the image
docker build --tag <CONTAINER_REGISTRY>/v2.12/vvp-gateway:2.12.1 --file Dockerfile .
## Push the image
docker push <CONTAINER_REGISTRY>/v2.12/vvp-gateway:2.12.1
## Helm Commands
## Deploy Ververica Platform
helm install vvp ververica-platform \
--repo https://charts.ververica.com \
--namespace <VVP_NAMESPACE> \
--version 5.8.1 \
--values values.yaml
#--
followed by the actual contents of the files used by those commands.
#tabs
## configmap.yaml
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: hive-metastore
  namespace: <VVP_NAMESPACE>
data:
  hive-site.xml: |
    <?xml version="1.0"?>
    <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
    <configuration>
      <property>
        <name>hive.metastore.uris</name>
        <value>thrift://<HIVE_METASTORE_IP>:9083</value>
      </property>
      <property>
        <name>fs.s3a.aws.credentials.provider</name>
        <value>org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider</value>
      </property>
      <property>
        <name>fs.s3a.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
      </property>
      <property>
        <name>fs.s3a.access.key</name>
        <value><S3_ACCESS_KEY></value>
      </property>
      <property>
        <name>fs.s3a.secret.key</name>
        <value><S3_SECRET_KEY></value>
      </property>
    </configuration>
## catalog-meta.yaml
catalog:
  type: hive
  packaged: true
  readOnly: true
  properties:
    - key: type
      required: true
    - key: default-database
      required: false
    - key: property-version
      required: false
    - key: hive-conf-dir
      required: false
    - key: hive-version
      required: false
    - key: hadoop-conf-dir
      required: false
## Dockerfile
FROM alpine/curl:3.14 AS dependency-downloader
RUN curl -o /tmp/flink-sql-connector-hive-3.1.3_2.12-1.18.1.jar https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-hive-3.1.3_2.12/1.18.1/flink-sql-connector-hive-3.1.3_2.12-1.18.1.jar
RUN curl -o /tmp/flink-connector-files-1.18.1.jar https://repo1.maven.org/maven2/org/apache/flink/flink-connector-files/1.18.1/flink-connector-files-1.18.1.jar
RUN curl -o /tmp/flink-s3-fs-hadoop-1.18.1.jar https://repo1.maven.org/maven2/org/apache/flink/flink-s3-fs-hadoop/1.18.1/flink-s3-fs-hadoop-1.18.1.jar
RUN curl -o /tmp/hadoop-mapreduce-client-core-3.3.6.jar https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-mapreduce-client-core/3.3.6/hadoop-mapreduce-client-core-3.3.6.jar
RUN curl -o /tmp/hadoop-aws-3.3.6.jar https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.6/hadoop-aws-3.3.6.jar
RUN curl -o /tmp/aws-java-sdk-bundle-1.12.664.jar https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.664/aws-java-sdk-bundle-1.12.664.jar
# ----
FROM registry.ververica.com/v2.12/vvp-gateway:2.12.1
COPY catalog-meta.yaml /vvp/sql/opt/catalogs/hive/catalog-meta.yaml
COPY --from=dependency-downloader /tmp/flink-sql-connector-hive-3.1.3_2.12-1.18.1.jar /vvp/sql/opt/catalogs/hive/flink-sql-connector-hive-3.1.3_2.12-1.18.1.jar
COPY --from=dependency-downloader /tmp/flink-connector-files-1.18.1.jar /vvp/sql/opt/catalogs/hive/flink-connector-files-1.18.1.jar
COPY --from=dependency-downloader /tmp/flink-s3-fs-hadoop-1.18.1.jar /vvp/sql/opt/catalogs/hive/flink-s3-fs-hadoop-1.18.1.jar
COPY --from=dependency-downloader /tmp/hadoop-mapreduce-client-core-3.3.6.jar /vvp/sql/opt/catalogs/hive/hadoop-mapreduce-client-core-3.3.6.jar
COPY --from=dependency-downloader /tmp/hadoop-aws-3.3.6.jar /vvp/sql/opt/catalogs/hive/hadoop-aws-3.3.6.jar
COPY --from=dependency-downloader /tmp/aws-java-sdk-bundle-1.12.664.jar /vvp/sql/opt/catalogs/hive/aws-java-sdk-bundle-1.12.664.jar
## values.yaml
vvp:
  ...Ververica Platform configuration...
gateway:
  image:
    repository: <CONTAINER_REGISTRY>/v2.12/vvp-gateway
  env:
    - name: JAVA_TOOL_OPTIONS
      value: "--add-opens=java.base/java.net=ALL-UNNAMED"
  volumeMounts:
    - name: hive-site
      mountPath: /etc/hive
  volumes:
    - name: hive-site
      configMap:
        name: hive-metastore
#--
3) Flink SQL
Finally, we will register the Hive catalog using Flink SQL, define a pipeline that populates sample data, and define an extract-transform-load pipeline that aggregates the data into Elasticsearch for visualisation purposes.
#tabs
## Create Catalog
CREATE CATALOG hive
WITH (
'type' = 'hive',
'hive-version' = '3.1.3',
'hive-conf-dir' = '/etc/hive'
);
## Populate Data
INSERT INTO hive.events.login
VALUES ('john_doe', 1708723294000), ('jane_doe', 1708723294000);
## Create Sink
CREATE DATABASE vvp.events_analytics;
CREATE VIEW vvp.events_analytics.login_events_year_month AS (
SELECT
user_name,
CONCAT(
CAST(YEAR(TO_TIMESTAMP_LTZ(login_time, 3)) AS STRING),
'_',
CAST(MONTH(TO_TIMESTAMP_LTZ(login_time, 3)) AS STRING)
) AS yyyy_mm
FROM hive.events.login
);
CREATE TABLE vvp.events_analytics.login_events_count_per_month (
user_name STRING,
yyyy_mm STRING,
login_count BIGINT,
PRIMARY KEY (user_name, yyyy_mm) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://<ELASTICSEARCH_HOST>:<ELASTICSEARCH_PORT>',
'index' = 'login_events_count_per_month'
);
## Extract Transform Load
INSERT INTO vvp.events_analytics.login_events_count_per_month
SELECT
user_name,
yyyy_mm,
COUNT(*) AS login_count
FROM vvp.events_analytics.login_events_year_month
GROUP BY user_name, yyyy_mm;
#--
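Before relying on the pipeline end to end, it can be helpful to verify that the Hive catalog is registered and that the rows written above can be read back through it. The following statements are a minimal sketch, assuming the `CREATE CATALOG` and `INSERT` statements above have already been executed.
#tabs
## Verify Catalog (Optional)
-- The hive catalog should be listed next to the built-in vvp catalog
SHOW CATALOGS;
-- The login table's schema should be resolved from the Avro schema stored on S3
DESCRIBE hive.events.login;
-- Read back the sample rows inserted earlier
SELECT user_name, login_time FROM hive.events.login;
#--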
4) Conclusion
Ververica Platform supports the integration of additional catalogs in a flexible way[7] for different use case scenarios. When Apache Hive's external tables use an Avro schema stored on S3, Apache Flink's Catalog API needs S3 access to fetch the schema in order to fully parse the table's metadata; otherwise, the catalog fails with a schema error.
Therefore, we have used both `hadoop-aws`[8] and `aws-java-sdk-bundle` in our gateway image, where the catalog access operations are executed. Since an external table's actual data resides on the filesystem and not in Apache Hive's warehouse, we have also used Apache Flink's filesystem connector.
We have also used the S3A `SimpleAWSCredentialsProvider` with an access key and a secret key for demo purposes. However, in a production environment, an IAM role-based permission policy is recommended for accessing both the schema and the data residing on S3.
If you would like to see the whole setup working end to end, feel free to check the sandbox project, which deploys Apache Hive on EMR and integrates it with Ververica Platform using Terraform[9].
5) References
[1]: https://docs.aws.amazon.com/cli/latest/reference/s3api/create-bucket.html
[2]: https://docs.aws.amazon.com/cli/latest/reference/s3api/put-object.html
[3]: https://cwiki.apache.org/confluence/display/Hive/GettingStarted#GettingStarted-RunningHiveCLI
[4]: https://ververica.zendesk.com/hc/en-us/articles/360013488139
[5]: https://docs.ververica.com/vvp/installation-and-upgrades/install-with-helm
[6]: https://docs.ververica.com/vvp/installation-and-upgrades/vvp-docker-images
[7]: https://docs.ververica.com/vvp/user-guide/sql-development/catalogs-databases
[8]: https://hadoop.apache.org/docs/r3.3.6/hadoop-aws/tools/hadoop-aws/index.html
[9]: https://github.com/alibahadirzeybek/sandbox/tree/vvp-hive-catalog