Question
Flink 1.10 essentially removed RocksDB's `LOG` file, where all of its operations and statistics were logged for advanced troubleshooting, because the size of this file could not be controlled. When troubleshooting Flink with RocksDB, however, this file can come in handy. How can you re-populate this `LOG` file?
Answer
Note: This section applies to Flink 1.10 - 1.11
Flink's RocksDB configuration can be fine-tuned by selecting one of the PredefinedOptions (either in `flink-conf.yaml` or programmatically) or by providing your own OptionsFactory. To configure the `LOG` file, you currently have to create your own OptionsFactory, which is presented below. The following code extends Flink's `DefaultConfigurableOptionsFactory` so that your jobs keep using any RocksDB tuning options the way you used them before; it only adds populating the `LOG` file again and lets you specify its location.
Important: `DefaultConfigurableOptionsFactory` was not really meant to be extended and may change between releases. If you plan to take this into production, you should write your own complete `ConfigurableRocksDBOptionsFactory` and set all the options you need in there.
Custom Options Factory
First, if you have not done so yet, add a dependency to Flink's RocksDB state backend. For example, add this to your Maven project's `pom.xml`:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
Then add the following options factory class to your project:
package com.ververica.troubleshooting;

import org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory;
import org.rocksdb.DBOptions;
import org.rocksdb.InfoLogLevel;

import java.util.Collection;

public class DefaultConfigurableOptionsFactoryWithLog extends DefaultConfigurableOptionsFactory {
    private static final long serialVersionUID = 1L;

    private String dbLogDir = "";

    @Override
    public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        // Apply all options of the default factory first, then re-enable the LOG file.
        currentOptions = super.createDBOptions(currentOptions, handlesToClose);
        currentOptions.setInfoLogLevel(InfoLogLevel.INFO_LEVEL);
        currentOptions.setStatsDumpPeriodSec(60);
        currentOptions.setDbLogDir(dbLogDir);
        return currentOptions;
    }

    @Override
    public String toString() {
        return this.getClass().toString() + "{" + super.toString() + '}';
    }

    /**
     * Set directory where RocksDB writes its info LOG file (empty = data dir, otherwise the
     * data directory's absolute path will be used as the log file prefix).
     */
    public void setDbLogDir(String dbLogDir) {
        this.dbLogDir = dbLogDir;
    }
}
Three points in `createDBOptions` are important here:
- `setInfoLogLevel(InfoLogLevel.INFO_LEVEL)` sets the logging level to INFO, which yields a decent amount of logging data (switch to a more verbose level such as `DEBUG_LEVEL` if needed).
- `setStatsDumpPeriodSec(60)` dumps various RocksDB statistics, including compaction statistics, every 60 seconds.
- `setDbLogDir(dbLogDir)` specifies the directory for the `LOG` file: depending on what you are trying to troubleshoot, a local directory may be enough, or you may need a distributed file system (or persistent volume) so that the file survives node/pod/job restarts.
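To make it easier to find the `LOG` file once a log directory is set, the following sketch predicts its file name from the data directory. This is a minimal illustration under assumptions: `RocksDbLogLocation` and `expectedLogFile` are hypothetical names, not part of Flink or RocksDB, and the flattening rule (keep letters, digits, `-`, `.` and `_`, turn everything else into `_`, drop a leading separator, append `_LOG`) reflects our understanding of RocksDB's naming scheme, so verify it against the files your RocksDB version actually writes.

```java
/** Hypothetical helper (not part of Flink or RocksDB) that predicts where the info LOG ends up. */
public class RocksDbLogLocation {

    /**
     * Assumed naming scheme: an empty log dir keeps "LOG" inside the data directory;
     * otherwise the data directory's absolute path is flattened into a file name prefix.
     */
    public static String expectedLogFile(String dbLogDir, String absoluteDataDir) {
        if (dbLogDir == null || dbLogDir.isEmpty()) {
            return absoluteDataDir + "/LOG";
        }
        StringBuilder prefix = new StringBuilder();
        for (int i = 0; i < absoluteDataDir.length(); i++) {
            char c = absoluteDataDir.charAt(i);
            boolean keep = (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z')
                    || (c >= '0' && c <= '9') || c == '-' || c == '.' || c == '_';
            if (keep) {
                prefix.append(c);
            } else if (i > 0) {
                // Any other character (such as '/') becomes '_'; a leading one is dropped.
                prefix.append('_');
            }
        }
        // Avoid a double separator if the configured directory ends with '/'.
        String dir = dbLogDir.endsWith("/")
                ? dbLogDir.substring(0, dbLogDir.length() - 1)
                : dbLogDir;
        return dir + "/" + prefix + "_LOG";
    }

    public static void main(String[] args) {
        // Example paths are made up for illustration.
        System.out.println(expectedLogFile("", "/tmp/flink/db"));
        System.out.println(expectedLogFile("/path/to/rocksdb/logging/", "/tmp/flink/db"));
    }
}
```

If this assumption holds, every RocksDB instance with its own data directory would write a separately named file into the shared log directory, so expect several `*_LOG` files there rather than a single `LOG`.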
Configuring Flink
With this, you can configure Flink either programmatically or through its `flink-conf.yaml`.
Programmatically
RocksDBStateBackend stateBackend = new RocksDBStateBackend("file:///path/to/checkpoints");
DefaultConfigurableOptionsFactoryWithLog options = new DefaultConfigurableOptionsFactoryWithLog();
options.setDbLogDir("/path/to/rocksdb/logging/");
stateBackend.setRocksDBOptions(options);
Via Flink configuration (`flink-conf.yaml`)
If you want to configure your options factory completely via `flink-conf.yaml`, you can extend the class above so that it updates its settings from the configuration. The code below shows how to do this for the log directory, but you can extend it to make further settings configurable.
Note: The interface changed slightly between Flink 1.10 and 1.11; we provide both versions below.
Flink 1.11
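// Add to DefaultConfigurableOptionsFactoryWithLog. Needs these imports:
//   import static org.apache.flink.configuration.ConfigOptions.key;
//   import org.apache.flink.configuration.ConfigOption;
//   import org.apache.flink.configuration.ReadableConfig;
public static final ConfigOption<String> LOG_DIR =
    key("state.backend.rocksdb.log.dir")
        .stringType()
        .noDefaultValue()
        .withDescription("Location of RocksDB's info LOG file (empty = data dir, otherwise the " +
            "data directory's absolute path will be used as the log file prefix)");

@Override
public DefaultConfigurableOptionsFactory configure(ReadableConfig configuration) {
    DefaultConfigurableOptionsFactory optionsFactory =
        super.configure(configuration);
    // Use the configured directory if present; otherwise keep the current value.
    this.dbLogDir = configuration.getOptional(LOG_DIR).orElse(this.dbLogDir);
    return optionsFactory;
}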
Flink 1.10
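// Add to DefaultConfigurableOptionsFactoryWithLog. Needs these imports:
//   import static org.apache.flink.configuration.ConfigOptions.key;
//   import org.apache.flink.configuration.ConfigOption;
//   import org.apache.flink.configuration.Configuration;
public static final ConfigOption<String> LOG_DIR =
    key("state.backend.rocksdb.log.dir")
        .stringType()
        .noDefaultValue()
        .withDescription("Location of RocksDB's info LOG file (empty = data dir, otherwise the " +
            "data directory's absolute path will be used as the log file prefix)");

@Override
public DefaultConfigurableOptionsFactory configure(Configuration configuration) {
    DefaultConfigurableOptionsFactory optionsFactory =
        super.configure(configuration);
    // Use the configured directory if present; otherwise keep the current value.
    this.dbLogDir = configuration.getOptional(LOG_DIR).orElse(this.dbLogDir);
    return optionsFactory;
}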
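Both variants above rely on the same fallback expression, `getOptional(LOG_DIR).orElse(this.dbLogDir)`. The sketch below reproduces only that pattern with plain JDK types so that the precedence is easy to see: a value present in the configuration wins, and an absent key leaves the current value (for example one set via `setDbLogDir`) untouched. `LogDirPrecedence` is a hypothetical stand-in, and the `Map` merely plays the role of Flink's configuration object; it is not how Flink represents its configuration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical stand-in that mimics only the fallback logic of configure(). */
public class LogDirPrecedence {

    static final String LOG_DIR_KEY = "state.backend.rocksdb.log.dir";

    private String dbLogDir = "";

    public void setDbLogDir(String dbLogDir) {
        this.dbLogDir = dbLogDir;
    }

    public String getDbLogDir() {
        return dbLogDir;
    }

    /** A configured value replaces the current one; a missing key keeps it. */
    public LogDirPrecedence configure(Map<String, String> configuration) {
        this.dbLogDir =
                Optional.ofNullable(configuration.get(LOG_DIR_KEY)).orElse(this.dbLogDir);
        return this;
    }

    public static void main(String[] args) {
        LogDirPrecedence factory = new LogDirPrecedence();
        factory.setDbLogDir("/set/in/code");

        // Key absent: the programmatic value is kept.
        factory.configure(new HashMap<>());
        System.out.println(factory.getDbLogDir());

        // Key present: the configured value takes over.
        Map<String, String> conf = new HashMap<>();
        conf.put(LOG_DIR_KEY, "/set/in/conf");
        factory.configure(conf);
        System.out.println(factory.getDbLogDir());
    }
}
```

One consequence of this design is that omitting `state.backend.rocksdb.log.dir` does not reset the directory; with the class above, the default remains the empty string, which keeps the `LOG` file in the data directory.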
Flink configuration
With the code additions from above, you can simply adapt your `flink-conf.yaml` and configure it like this:
state.backend.rocksdb.log.dir: /path/to/rocksdb/logging/
state.backend.rocksdb.options-factory: com.ververica.troubleshooting.DefaultConfigurableOptionsFactoryWithLog
Note: Configuring the options factory via `flink-conf.yaml` will apply the options factory to all jobs started in the Flink cluster. Make sure that this class is available cluster-wide or in all jobs started on this cluster!
(For Ververica Platform deployments, this will not be a problem since each deployment spawns its own Flink cluster.)