Question
My Flink job requires many properties as input. How can I inject them with Ververica Platform?
The properties could, for example, be names of Kafka topics the job consumes from or produces to, the Elasticsearch cluster the job uses, locations of the key/trust stores and their passwords, prefixes/suffixes of operator names to be set in the job, etc.
Answer
Note: This applies to Ververica Platform 2.0 - 2.8
Before we describe the approaches to injecting properties, let us first look at how Flink jobs are configured on Ververica Platform. Flink jobs are encapsulated in Ververica Platform Deployments (deployments for short). To run a Flink job, you create a deployment and start it. Ververica Platform provides three editors for creating a deployment: Standard, Advanced, and YAML.
The Standard editor lets you specify basic information such as the job jar URI, the entry point class, and the parallelism. The Advanced editor additionally lets you specify a restore strategy, resource requirements, pod labels, log levels, and more. The YAML editor offers full flexibility to specify everything supported by Ververica Platform, such as environment variables and volume mounts of job pods. In the following sections, we demonstrate five approaches to injecting properties using these editors, along with their pros and cons.
mainArgs
Just like any Java program, you can provide properties as arguments to the `main()` method of your job jar’s entry point class. You specify those arguments as `mainArgs` via the Standard/Advanced/YAML editor. The deployment specification looks like the following in YAML:
spec:
  template:
    spec:
      artifact:
        jarUri: https://artifacts/flink-job.jar
        mainArgs: --arg1 value1 --arg2 value2
To retrieve your properties in your job, you can use the Flink class ParameterTool:
import org.apache.flink.api.java.utils.ParameterTool;

ParameterTool params = ParameterTool.fromArgs(args);
String arg1 = params.get("arg1");
String arg2 = params.get("arg2");
Using `mainArgs` is the simplest way to inject properties.
Important: If you have sensitive property values which must not appear in clear text in the deployment specification or the process command line, check the section on Environment Variables below.
JVM System Properties
You can also specify properties as Java System Properties. To do so, you set `env.java.opts` in the additional Flink configuration section of the Advanced/YAML editor. The deployment specification looks like the following in YAML:
spec:
  template:
    spec:
      flinkConfiguration:
        env.java.opts: '-Dsysprop1=value1 -Dsysprop2=value2'
Tip: You can also use `env.java.opts.jobmanager` and `env.java.opts.taskmanager` if you want to set the options differently for the jobmanager and the taskmanager.
You can retrieve system properties with the Java class System:
String sysprop1 = System.getProperty("sysprop1");
String sysprop2 = System.getProperty("sysprop2");
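Alternatively, Flink's ParameterTool (used above for `mainArgs`) can also wrap all JVM system properties; a minimal sketch:

import org.apache.flink.api.java.utils.ParameterTool;

// wraps System.getProperties() in a ParameterTool
ParameterTool sysProps = ParameterTool.fromSystemProperties();
String sysprop1 = sysProps.get("sysprop1");
String sysprop2 = sysProps.get("sysprop2");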
Note: System properties defined like this are set for the whole Flink (session) cluster. If you use a dedicated Flink Job Cluster for your deployment, however, they are set for your job only.
Important: Like `mainArgs`, the properties set here are also shown in clear text in the deployment specification.
Environment Variables
Another approach to specifying properties is to provide them as environment variables. In Ververica Platform, you can set them using the recommended Flink Pod Template via the YAML editor. The following shows various ways to specify an environment variable:
spec:
  template:
    spec:
      kubernetes:
        taskManagerPodTemplate:  # for jobmanager, use "jobManagerPodTemplate"
          spec:
            containers:  # for init containers, use "initContainers"
              - name: flink-taskmanager  # for jobmanager, use "flink-jobmanager"
                env:
                  - name: envvar1
                    value: value1  # the value is provided directly
                  - name: envvar2  # from the value of the key 'envvar2' in 'jobprops'
                    valueFrom:
                      configMapKeyRef:
                        key: envvar2
                        name: jobprops
                  - name: envvar3  # from the value of the key 'password' in 'jobsecret'
                    valueFrom:
                      secretKeyRef:
                        key: password
                        name: jobsecret
                envFrom:  # all key/value pairs in the ConfigMap 'another-config'
                          # are exposed as environment variables
                  - configMapRef:
                      name: another-config
Environment variable values can be set directly in the YAML (`envvar1`), from a Kubernetes ConfigMap (`envvar2`), or from a Kubernetes Secret (`envvar3`). Using `envFrom`, you can also expose all key/value pairs of a ConfigMap as environment variables. The referenced ConfigMaps and Secrets must reside in the same Kubernetes namespace as your deployment.
Warning: You can also set environment variables in job pods using the legacy `envVars` setting, which applies to both jobmanager and taskmanager pods. Note, however, that you cannot use both `kubernetes.pods` and `kubernetes.taskManagerPodTemplate` (or `jobManagerPodTemplate`) at the same time. We recommend the pod templates for maximum flexibility and standard compatibility.
As with any other Java program, values of environment variables can be retrieved via the Java class System:
import java.util.Map;

Map<String, String> env = System.getenv();
String envvar1 = env.get("envvar1");
String envvar2 = env.get("envvar2");
String envvar3 = env.get("envvar3");
The advantage of this approach is that it can hide sensitive information by referencing Kubernetes Secrets. On the other hand, creating ConfigMaps and Secrets requires that users who submit jobs have direct access to the Kubernetes cluster, which can be a problem in organizations where the Kubernetes cluster is managed by a dedicated team and no other team has direct access.
Additional Dependencies
Since Ververica Platform 2.3, you can provide additional dependencies to your job alongside the job jar, such as a properties file uploaded to the universal blob storage or stored at an HTTP location.
Once started, your job can read the properties from the additional dependency files, which are made available under `/flink/usrlib`, e.g., using the Java class Properties.
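For illustration, assuming you attached a properties file named job.properties as an additional dependency (the file name and the property key below are illustrative), a minimal sketch to load it:

import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;

// Additional dependencies are made available under /flink/usrlib at runtime;
// "job.properties" and the key "kafka.topic" are illustrative names.
Properties props = new Properties();
try (InputStream in = Files.newInputStream(Paths.get("/flink/usrlib/job.properties"))) {
    props.load(in);
}
String topic = props.getProperty("kafka.topic");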
Tip: Additional dependencies are not limited to properties files; they can be any files your job needs.
Volume Mounts
You can also provide (property) files to your job's pods, either using the recommended Flink Pod Template via the YAML editor:
spec:
  template:
    spec:
      kubernetes:
        taskManagerPodTemplate:
          spec:
            containers:
              - name: flink-taskmanager
                volumeMounts:
                  - name: volume-config
                    mountPath: /volumeConfig
                  - name: volume-secret
                    mountPath: /volumeSecret
            volumes:
              - name: volume-config
                configMap:
                  name: custom-config
              - name: volume-secret
                secret:
                  secretName: userpass
or via the legacy approach. You can then simply read the files under the configured `mountPath` in your job code.
Volume mounts are the most flexible way to inject not only key/value-based properties but any file into your Flink jobs. The volume can be of any type supported by Kubernetes. The only requirement is a few lines of code in your job to read those files, as in the sketch below.
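For illustration, assuming the ConfigMap custom-config from the example above contains a key job.properties and the Secret userpass contains a key password (both key names are hypothetical), the job code could read the mounted files like this:

import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;

// Each key of a mounted ConfigMap/Secret appears as a file under its mountPath;
// "job.properties" and "password" are hypothetical key names.
Properties props = new Properties();
try (InputStream in = Files.newInputStream(Paths.get("/volumeConfig/job.properties"))) {
    props.load(in);
}
String password = new String(
        Files.readAllBytes(Paths.get("/volumeSecret/password")), StandardCharsets.UTF_8).trim();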
Tip: You can also mount Kubernetes Secrets this way. The article How to set up application-specific resource access in Ververica Platform has a section with details on this approach.
Final Remarks
So far, we have demonstrated five approaches to injecting properties into Flink jobs running on Ververica Platform. You can combine these approaches as needed: for example, hide sensitive information behind environment variables while using `mainArgs` for frequently changed properties.
Tip: If some properties are static for all jobs in one or all Ververica Platform namespaces, you can also set them in the Deployment Defaults (available in Ververica Platform 2.1 and later) at the namespace or global level, so that you do not have to specify those static properties for each individual deployment.
There are, of course, further custom techniques to retrieve properties for your job, for example by letting your job's `main` method fetch a network resource itself. These are, however, out of the scope of this article.
Warning: You may also be tempted to use the Flink configuration to make your properties available to your job. We discourage this option because it relies on an internal API to retrieve the `Configuration` object created from `flink-conf.yaml`, and because your custom options may clash with Flink-internal options in future Flink upgrades.