My Flink job requires many properties as input. How can I inject these with Ververica Platform?
These properties could, for example, be names of Kafka topics the job consumes from or produces to, the Elasticsearch cluster the job uses, locations of the key/trust stores and their passwords, prefixes/suffixes of operator names to be set in the job, etc.
Note: This applies to Ververica Platform 2.0 - 2.3
Before we describe the approaches to inject properties, let us first have a look at how you can configure Flink jobs on Ververica Platform. Flink jobs are encapsulated in Ververica Platform Deployments (deployment for short). To run a Flink job, you create a deployment and start it. Ververica Platform provides three editors to create a deployment: Standard, Advanced and YAML:
While the Standard editor offers the possibility to specify basic information like a job jar URI, an entry point class and a parallelism, with the Advanced editor, you can further specify a restore strategy, resource requirements, pod labels, log levels, and more. The YAML editor provides the full flexibility to specify everything which is supported by Ververica Platform like environment variables and volume mounts of job pods. In the following section, we will demonstrate four approaches to inject properties using these editors as well as their pros and cons.
Just like any Java program, you can provide properties as arguments to the `main()` method of your job jar’s entry point class. You specify those arguments as `mainArgs` via the Standard/Advanced/YAML editor. The deployment specification looks like the following in YAML:
spec: template: spec: artifact: jarUri: https://artifacts/flink-job.jar mainArgs: --arg1 value1 --arg2 value2
To retrieve your properties in your job, you can use the Flink class ParameterTool:
ParameterTool params = ParameterTool.fromArgs(args); String arg1 = params.get("arg1");
String arg2 = params.get("arg2");
Using the `mainArgs` is the simplest way to inject properties.
Important: If you have sensitive property values which should not appear in clear text in the deployment specification or the process command line, check the section on Environment Variables below.
JVM System Properties
You can also specify properties as Java System Properties. To do so, you set `env.java.opts` in the additional Flink configuration section of the Advanced/YAML editor. The deployment specification looks like the following in YAML:
spec: template: spec: flinkConfiguration: env.java.opts: '-Dsysprop1=value1 -Dsysprop2=value2'
Tip: You also have the option to use `env.java.opts.jobmanager` and `env.java.opts.taskmanager` if you want to set it differently for `jobmanager` and `taskmanager`.
You can retrieve system properties with the Java class System:
Note: Even though system properties defined like this are set for the whole Flink cluster, they would nonetheless be specific to a single job since Ververica Platform deploys a seperate Flink Job Cluster for each deployment.
Important: Like `mainArgs`, the properties set here are also shown in clear text in the deployment specification.
Another approach to specify properties is to provide them as environment variables. In Ververica Platform, you can set the environment variables in job pods using `envVars` via the YAML editor:
spec: template: spec: kubernetes: pods: envVars: - name: envvar1 value: value1 - name: envvar2 valueFrom: configMapKeyRef: key: envvar2 name: jobprops - name: envvar3 valueFrom: secretKeyRef: key: password name: jobsecret
Environment variable values can be set directly in YAML (`envvar1`), from a Kubernetes ConfigMap (`envvar2`), or from a Kubernetes Secret (`envvar3`). The referenced ConfigMap and Kubernetes Secrets must reside in the same Kubernetes namespace as your deployment.
As with any other Java program, values of environment variables can be retrieved via the Java class System:
Map<String, String> env = System.getenv(); String envvar1 = env.get("envvar1");
String envvar2 = env.get("envvar2");
String envvar3 = env.get("envvar3");
Specifying properties as environment variables in this way is a bit verbose. The advantage of this approach is that it can hide sensitive information by referencing Kubernetes Secrets. On the other hand, creating ConfigMaps and Secrets requires that users who submit jobs have direct access to the Kubernetes cluster. This might be a problem in an organization where the Kubernetes cluster is managed by a dedicated team and no other team has direct access.
You can also provide (property) files to your job's pods by using a Kubernetes volume mount. This can be done via the YAML editor as shown here for an NFS volume:
spec: template: spec: kubernetes: pods: volumeMounts: - name: jobprops volume: name: jobprops nfs: server: 10.1.1.1 path: /myjob/props volumeMount: name: jobprops mountPath: /job/props
Suppose you have a file `job.properties` in the NFS volume, then you can simply load its properties into your job by reading the content of the file `/job/props/job.properties` using the Java class Properties.
Volume mounts provide the most flexible way to inject not only key-value based properties but also any file to your Flink Jobs. The volume can be any volume which is supported by Kubernetes. The only thing is that you need a few lines in your job code to read those files.
Tip: You can also mount Kubernetes secrets this way. How to set up application-specific resource access in Ververica Platform has a section with details in this approach.
So far, we have demonstrated four approaches to inject properties into Flink jobs running on Ververica Platform. You can combine these approaches as needed: you can, for example, hide sensitive information behind environment variables while using `mainArgs` for frequently changed properties.
Tip: If you have some properties which are static for all jobs in one or all Ververica Platform Namespace(s), you can also set them in the Deployment Defaults (available in Ververica Platform 2.1 and later) at the namespace level or the global level, thus you do not have to specify those static properties for each individual deployment.
There are, of course, further custom techniques to retrieve properties for your job, for example by letting your job's `main` method fetch a network resource itself. These are, however, out of the scope of this article.
Warning: You may also be tempted using the Flink configuration for making your properties available to your job. We actually discourage this option because it relies on an internal API to retrieve the `Configuration` object that is created from `flink-conf.yaml` and because your custom options may eventually clash with Flink-internal options at future Flink upgrades.