Question
I want to use Ververica Platform's Flink SQL development functionalities [1] with Apache Pulsar. How can I integrate Apache Pulsar metadata with Ververica Platform so that I can directly run Flink SQL queries that use Apache Pulsar as a source/sink without having to register each topic manually?
Answer
Note: This section applies to Flink 1.15 with Ververica Platform 2.8.
Overview
Apache Pulsar [2] is a distributed, open-source pub-sub messaging and streaming platform for real-time workloads. Apache Flink's Apache Pulsar DataStream API connector [3] is maintained by the Apache Flink community. However, the Apache Pulsar Table API connector as well as the Apache Pulsar Catalog are still released as part of StreamNative's fork [4] of Apache Flink.
Currently, the Apache Pulsar Catalog implements two separate mechanisms: native tables, which automatically infer the metadata of the Apache Pulsar tenants, namespaces, and topics that already exist in the cluster, and explicit tables, which persist the metadata of tables registered via Flink SQL/Table API in an Apache Pulsar topic.
For the purpose of this KB article, we only need a version of the Apache Pulsar Catalog that covers the native tables integration, since Ververica Platform already ships its built-in VVP catalog [5] for persisting the metadata of tables registered via Flink SQL. Because no such separation exists yet within the Apache Pulsar Catalog, we use a custom implementation of the Apache Pulsar Catalog, which is essentially a read-only version restricted to native tables. The current implementation is available in a dedicated fork of Apache Flink [6] based on the 1.15.2 release.
Setup
Once you have cloned the fork [6] locally, build the project by running
./mvnw clean package -DskipTests
in the root folder (flink) of the project. Once the build completes, the artifact containing both the Pulsar SQL Connector and the Pulsar Catalog (flink-sql-connector-pulsar-1.15.2.jar) is available at the following path in the project:
flink/flink-connectors/flink-sql-connector-pulsar/target/flink-sql-connector-pulsar-1.15.2.jar
The next step is to add the connector and the catalog definitions to Ververica Platform's Gateway container by building a custom Docker image using the following resources:
#tabs
## catalog-meta.yaml
catalog:
  type: pulsar
  packaged: true
  readOnly: true
  properties:
    - key: type
      required: true
    - key: catalog-admin-url
      required: true
    - key: catalog-service-url
      required: true
## connector-meta.yaml
connector:
  type: pulsar
  packaged: true
  source: true
  sink: true
  lookup: false
  supportedFormats:
    - json
## Dockerfile
FROM registry.ververica.com/v2.8/vvp-gateway:2.8.1
USER root:root
RUN mkdir -p /vvp/sql/opt/catalogs/pulsar
COPY catalog-meta.yaml /vvp/sql/opt/catalogs/pulsar/catalog-meta.yaml
COPY flink-sql-connector-pulsar-1.15.2.jar /vvp/sql/opt/catalogs/pulsar/flink-sql-connector-pulsar-1.15.2.jar
RUN mkdir -p /vvp/sql/opt/connectors/pulsar
COPY connector-meta.yaml /vvp/sql/opt/connectors/pulsar/connector-meta.yaml
COPY flink-sql-connector-pulsar-1.15.2.jar /vvp/sql/opt/connectors/pulsar/flink-sql-connector-pulsar-1.15.2.jar
USER 999:999
#--
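With the two metadata files, the Dockerfile, and the connector jar placed in the same build context, the custom image can be built and pushed roughly as follows. This is only a sketch: the cp path assumes the Flink fork was cloned next to the build context, and <YOUR_REGISTRY> as well as the vvp-gateway-pulsar image name are placeholders for your own registry and naming conventions.

# Copy the jar produced by the Maven build into the Docker build context
cp ../flink/flink-connectors/flink-sql-connector-pulsar/target/flink-sql-connector-pulsar-1.15.2.jar .

# Build and push the customized gateway image (placeholder registry and tag)
docker build -t <YOUR_REGISTRY>/vvp-gateway-pulsar:2.8.1 .
docker push <YOUR_REGISTRY>/vvp-gateway-pulsar:2.8.1

Then point your Ververica Platform installation at this custom gateway image (e.g. via your Helm values) so that the packaged connector and catalog are picked up.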
Once you have Ververica Platform up and running with the packaged Pulsar SQL Connector and Pulsar Catalog, you can create a catalog, run queries in the SQL Editor, and create SQL Deployments that use Apache Pulsar topics as a source/sink without having to register them manually.
Bear in mind that, at the moment, the ReadOnlyPulsarCatalog implementation only supports the json format. Also, both the current ReadOnlyPulsarCatalog implementation and the PulsarCatalog expect the schema information to be available for the given topic in the Apache Pulsar cluster [7].
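If you are unsure whether a topic already carries a schema, you can check (and, if needed, register) it with the Pulsar admin CLI, as sketched below. The tenant, namespace, and topic names are placeholders, user-schema.json is a hypothetical schema definition file, and the sketch assumes pulsar-admin is configured against your cluster.

# Show the schema currently registered for the topic (fails if none exists)
pulsar-admin schemas get persistent://<PULSAR_TENANT>/<PULSAR_NAMESPACE>/<PULSAR_TOPIC>

# Register a JSON schema from a schema definition file,
# or let a schema-aware producer register it on first connect
pulsar-admin schemas upload --filename user-schema.json \
  persistent://<PULSAR_TENANT>/<PULSAR_NAMESPACE>/<PULSAR_TOPIC>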
Example
#tabs
## CREATE CATALOG
CREATE CATALOG pulsar
  WITH (
    'type' = 'pulsar',
    'catalog-admin-url' = 'http://<PULSAR_HOST>:<ADMIN_PORT>',
    'catalog-service-url' = 'pulsar://<PULSAR_HOST>:<DATA_PORT>'
  );
## SELECT FROM
SELECT *
FROM `pulsar`.`<PULSAR_TENANT>/<PULSAR_NAMESPACE>`.`<PULSAR_TOPIC>`;
#--
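Note that in this naming scheme the catalog's databases map to Pulsar <tenant>/<namespace> pairs and its tables map to topics, so you can use the Pulsar admin CLI to see what the catalog will expose. The names below are placeholders:

# Namespaces of a tenant (each appears as a database in the catalog)
pulsar-admin namespaces list <PULSAR_TENANT>

# Topics in a namespace (each appears as a table in the catalog)
pulsar-admin topics list <PULSAR_TENANT>/<PULSAR_NAMESPACE>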