Question
RocksDB's LOG file comes in handy when troubleshooting Flink with RocksDB. How can I configure RocksDB logging?
Answer
Note: This section applies to Flink 1.10 or later.
By default, Flink sets RocksDB's log level to `HEADER_LEVEL`. This essentially disables RocksDB logging: only the RocksDB configuration is printed to its log file. The main reason for this behavior is that, prior to Flink 1.14, there is no way to limit the size of this log file. You can change these defaults, but be aware that without a size limit the log file can keep growing and eventually fill up the disk.
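To see why `HEADER_LEVEL` effectively turns logging off, recall that RocksDB only writes messages whose level is at or above the configured threshold, and `HEADER_LEVEL` sits at the top of that scale. The stand-alone sketch below models this filter. The `LogLevel` enum is a hypothetical stand-in whose constant order follows `org.rocksdb.InfoLogLevel` (minus its `NUM_INFO_LOG_LEVELS` sentinel); it is not the RocksDB class itself.

```java
// Hypothetical model of RocksDB's info-log threshold. The constant order follows
// org.rocksdb.InfoLogLevel (without its NUM_INFO_LOG_LEVELS sentinel).
enum LogLevel {
    DEBUG_LEVEL, INFO_LEVEL, WARN_LEVEL, ERROR_LEVEL, FATAL_LEVEL, HEADER_LEVEL
}

class InfoLogFilter {

    // A message is written only if its level is at or above the configured threshold.
    static boolean isWritten(LogLevel message, LogLevel threshold) {
        return message.compareTo(threshold) >= 0;
    }

    public static void main(String[] args) {
        for (LogLevel threshold : new LogLevel[] {LogLevel.HEADER_LEVEL, LogLevel.INFO_LEVEL}) {
            StringBuilder written = new StringBuilder();
            for (LogLevel message : LogLevel.values()) {
                if (isWritten(message, threshold)) {
                    written.append(message).append(' ');
                }
            }
            System.out.println("threshold " + threshold + " -> written: " + written.toString().trim());
        }
    }
}
```

With `HEADER_LEVEL` as the threshold, only header messages (such as the configuration dump) get through; with `INFO_LEVEL`, everything except debug output is written.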
How to configure RocksDB logging depends on the Flink version you are using. Flink 1.13 or later supports changing the RocksDB log level via configuration. Flink 1.14 additionally supports specifying the logging directory, so you can, for example, put it onto a separate volume that is retained after container shutdown and can be used for debugging. Thanks to the newer RocksDB version bundled with Flink 1.14, you can also configure log rotation in Flink 1.14 or later. For example:
#tabs
##Flink 1.14 or later
state.backend.rocksdb.log.level: INFO_LEVEL
state.backend.rocksdb.log.max-file-size: 10MB
state.backend.rocksdb.log.file-num: 10
# This is set to Flink's log directory in Flink 1.15+ by default
state.backend.rocksdb.log.dir: /rocksdb/logs
##Flink 1.13
state.backend.rocksdb.log.level: INFO_LEVEL
#--
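Log rotation turns the size of RocksDB's logs into something you can budget for. As a rough sketch, assuming RocksDB keeps about `file-num` info log files per instance, each capped at `max-file-size`, and that every operator subtask with keyed state runs its own RocksDB instance, the worst-case disk usage on a TaskManager is the product of those three numbers. The instance count below is a made-up example.

```java
// Rough upper bound for RocksDB info-log disk usage once log rotation is on.
// Assumptions (not taken from Flink's code): about `fileNum` log files are kept
// per RocksDB instance, and there is one instance per keyed operator subtask.
class RocksDbLogFootprint {

    static long maxBytes(long maxFileSizeBytes, int fileNum, int rocksDbInstances) {
        return maxFileSizeBytes * fileNum * rocksDbInstances;
    }

    public static void main(String[] args) {
        long maxFileSize = 10L * 1024 * 1024; // "10MB": Flink's MemorySize uses binary units
        int fileNum = 10;                     // state.backend.rocksdb.log.file-num
        int instances = 8;                    // hypothetical: 4 slots x 2 keyed operators
        long bytes = maxBytes(maxFileSize, fileNum, instances);
        System.out.println("Worst case: " + bytes / (1024 * 1024) + " MiB per TaskManager");
    }
}
```

With the Flink 1.14 settings above and eight RocksDB instances, this comes to roughly 800 MiB per TaskManager, so it is worth checking against the size of the volume that holds the logs.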
For older Flink versions and more low-level control, you can define your own custom Options Factory. See details below.
Custom Options Factory
If you are using a Flink version older than 1.13, want to configure the logging directory in Flink 1.13, or want to dump RocksDB statistics into RocksDB's `LOG` file, you can create a custom Options Factory by extending Flink's `DefaultConfigurableOptionsFactory`. This lets you configure RocksDB logging while your jobs keep using any other RocksDB tuning options the same way as before.
Important: `DefaultConfigurableOptionsFactory` was not designed to be extended and may change between releases. If you plan to take this into production, write your own complete `ConfigurableRocksDBOptionsFactory` implementation and set all the options you need in there.
Extending `DefaultConfigurableOptionsFactory`
First, if you have not done so yet, add a dependency on Flink's RocksDB state backend. For example, add this to your Maven project's `pom.xml`:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
  <version>${flink.version}</version>
  <scope>provided</scope>
</dependency>
Then create your own Options Factory:
package com.ververica.troubleshooting;
import java.util.Collection;
import org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory;
import org.rocksdb.DBOptions;
import org.rocksdb.InfoLogLevel;
public class MyCustomRocksDBOptionsFactory extends DefaultConfigurableOptionsFactory {
    private static final long serialVersionUID = 1L;
    private String dbLogDir = "";
    @Override
    public DBOptions createDBOptions(
            DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        // Apply Flink's configurable options first, then the logging settings on top.
        currentOptions = super.createDBOptions(currentOptions, handlesToClose);
        currentOptions.setInfoLogLevel(InfoLogLevel.INFO_LEVEL);
        currentOptions.setStatsDumpPeriodSec(60);
        currentOptions.setDbLogDir(dbLogDir);
        return currentOptions;
    }
    @Override
    public String toString() {
        return this.getClass().toString() + "{" + super.toString() + '}';
    }
    /**
     * Set directory where RocksDB writes its info LOG file (empty = data dir,
     * otherwise the data directory's absolute path will be used as the log
     * file prefix).
     */
    public void setDbLogDir(String dbLogDir) {
        this.dbLogDir = dbLogDir;
    }
}
Three points in `createDBOptions` are important here:
- `setInfoLogLevel(InfoLogLevel.INFO_LEVEL)` sets the log level to INFO, which gives you a decent amount of logging data (use `DEBUG_LEVEL` if you need more detail).
- `setStatsDumpPeriodSec(60)` dumps various RocksDB statistics, including compaction statistics, into the `LOG` file every 60 seconds.
- `setDbLogDir(dbLogDir)` specifies the directory for the `LOG` file. Depending on what you are trying to troubleshoot, a local directory may be enough, or you may need a persistent volume or a locally mounted distributed file system so that the file survives node, pod, or job restarts.
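When several RocksDB instances (for example, multiple operator subtasks on one TaskManager) share the same `dbLogDir`, they cannot all write a file called `LOG`; this is why the data directory's absolute path is used as the log file prefix. The sketch below approximates how RocksDB builds that file name, based on our reading of RocksDB's `InfoLogPrefix` logic: characters other than letters, digits, `-`, `.` and `_` become `_` (a leading one is dropped) and `_LOG` is appended. It ignores RocksDB's length cap and path normalization and may not match every RocksDB version, so treat it only as a hint for finding the right file. The paths are made up.

```java
// Approximation of how RocksDB names the info log file (based on its
// InfoLogFileName/InfoLogPrefix logic). Ignores RocksDB's length cap and path
// normalization; treat the result as a hint, not a guarantee.
class InfoLogFileName {

    static String of(String dbAbsolutePath, String dbLogDir) {
        if (dbLogDir.isEmpty()) {
            // No log dir configured: the file is simply LOG inside the data directory.
            return dbAbsolutePath + "/LOG";
        }
        StringBuilder prefix = new StringBuilder();
        for (int i = 0; i < dbAbsolutePath.length(); i++) {
            char c = dbAbsolutePath.charAt(i);
            boolean keep = (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z')
                    || (c >= '0' && c <= '9') || c == '-' || c == '.' || c == '_';
            if (keep) {
                prefix.append(c);
            } else if (i > 0) {
                // Any other character (e.g. '/') becomes '_', except at position 0.
                prefix.append('_');
            }
        }
        return dbLogDir + "/" + prefix + "_LOG";
    }

    public static void main(String[] args) {
        // Hypothetical RocksDB data path of one operator subtask.
        System.out.println(of("/tmp/flink-io-1234/job_abc_op_xyz/db", "/rocksdb/logs"));
    }
}
```

For the hypothetical data path in `main`, this yields `/rocksdb/logs/tmp_flink-io-1234_job_abc_op_xyz_db_LOG`.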
Configuring Flink
With the custom Options Factory, you can configure Flink either programmatically or through its `flink-conf.yaml`.
(1) Programmatically
Note: The state backend API changed in Flink 1.13; we provide both versions below.
#tabs
##Flink 1.13 or later
StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();
EmbeddedRocksDBStateBackend stateBackend = new EmbeddedRocksDBStateBackend();
env.setStateBackend(stateBackend);
env.getCheckpointConfig().setCheckpointStorage("file:///path/to/checkpoints");
## Flink 1.12 or earlier
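StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();
// This constructor declares IOException; handle or declare it in the enclosing method.
RocksDBStateBackend stateBackend =
    new RocksDBStateBackend("file:///path/to/checkpoints");
env.setStateBackend(stateBackend);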
#--
Then, for either version, create the custom Options Factory and register it with the state backend:
MyCustomRocksDBOptionsFactory options = new MyCustomRocksDBOptionsFactory();
options.setDbLogDir("/path/to/rocksdb/logging/");
stateBackend.setRocksDBOptions(options);
(2) Via Flink configuration (`flink-conf.yaml`)
If you want to configure your options factory completely via `flink-conf.yaml`, you can extend the code above to update its settings from the configuration. The code below exemplifies how to do this for the log directory.
Note: The interface changed slightly in Flink 1.11; we provide both versions below.
#tabs
## Flink 1.11 or later
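// Add to MyCustomRocksDBOptionsFactory; requires these imports:
// import static org.apache.flink.configuration.ConfigOptions.key;
// import org.apache.flink.configuration.ConfigOption;
// import org.apache.flink.configuration.ReadableConfig;
public static final ConfigOption<String> LOG_DIR =
        key("state.backend.rocksdb.log.dir")
                .stringType()
                .noDefaultValue()
                .withDescription("Location of RocksDB's info LOG file " +
                        "(empty = data dir, otherwise the data directory's " +
                        "absolute path will be used as the log file prefix)");
@Override
public DefaultConfigurableOptionsFactory configure(ReadableConfig configuration) {
    DefaultConfigurableOptionsFactory optionsFactory = super.configure(configuration);
    // Use the configured log directory if set; otherwise keep the current value.
    this.dbLogDir = configuration.getOptional(LOG_DIR).orElse(this.dbLogDir);
    return optionsFactory;
}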
## Flink 1.10
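// Add to MyCustomRocksDBOptionsFactory; requires these imports:
// import static org.apache.flink.configuration.ConfigOptions.key;
// import org.apache.flink.configuration.ConfigOption;
// import org.apache.flink.configuration.Configuration;
public static final ConfigOption<String> LOG_DIR =
        key("state.backend.rocksdb.log.dir")
                .stringType()
                .noDefaultValue()
                .withDescription("Location of RocksDB's info LOG file " +
                        "(empty = data dir, otherwise the data directory's " +
                        "absolute path will be used as the log file prefix)");
@Override
public DefaultConfigurableOptionsFactory configure(Configuration configuration) {
    DefaultConfigurableOptionsFactory optionsFactory = super.configure(configuration);
    // Use the configured log directory if set; otherwise keep the current value.
    this.dbLogDir = configuration.getOptional(LOG_DIR).orElse(this.dbLogDir);
    return optionsFactory;
}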
#--
With the code additions from above, you can simply adapt your `flink-conf.yaml` and configure it like this:
state.backend.rocksdb.log.dir: /path/to/rocksdb/logging/
state.backend.rocksdb.options-factory: com.ververica.troubleshooting.MyCustomRocksDBOptionsFactory
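The precedence in `configure` is easy to miss: a value for `state.backend.rocksdb.log.dir` in the configuration overrides whatever `setDbLogDir` set before, and an absent key leaves the current value untouched. The stand-alone sketch below reproduces that `getOptional(...).orElse(...)` fallback with a plain `Map` instead of Flink's `ReadableConfig`; `LogDirResolution` is a hypothetical helper, not part of Flink, and the paths are made up.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for the configure() override: a plain Map plays
// the role of Flink's ReadableConfig, so this runs without Flink.
class LogDirResolution {

    static final String LOG_DIR_KEY = "state.backend.rocksdb.log.dir";

    // Same fallback as configuration.getOptional(LOG_DIR).orElse(this.dbLogDir):
    // a configured value wins; otherwise the current field value is kept.
    static String resolve(Map<String, String> configuration, String currentDbLogDir) {
        return Optional.ofNullable(configuration.get(LOG_DIR_KEY)).orElse(currentDbLogDir);
    }

    public static void main(String[] args) {
        String fromCode = "/path/to/rocksdb/logging/"; // as set via setDbLogDir(...)
        System.out.println(resolve(Map.of(LOG_DIR_KEY, "/mnt/rocksdb-logs"), fromCode));
        System.out.println(resolve(Map.of(), fromCode));
    }
}
```

One consequence: when the factory is set only through `state.backend.rocksdb.options-factory` and the log directory key is absent, `dbLogDir` simply keeps its field default of `""`.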
Note: Configuring the options factory via `flink-conf.yaml` applies it to all jobs started in the Flink cluster. Make sure that this class is available cluster-wide or in every job started on this cluster! For Ververica Platform deployments, this is not a problem since each deployment spawns its own Flink cluster.
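If a job fails because the options factory cannot be found, a quick pre-flight check with the same classpath as the TaskManager can help. The sketch below is a hypothetical helper, not a Flink API. It assumes that Flink loads the class named in `state.backend.rocksdb.options-factory` by name and creates it through a public no-argument constructor, and simply tries the same thing on the current classpath.

```java
// Hypothetical pre-flight check (not a Flink API): load a class by name and
// create it through its public no-argument constructor, which is roughly what
// we assume happens to the class named in state.backend.rocksdb.options-factory.
class FactoryClassCheck {

    static boolean canInstantiate(String className) {
        try {
            Class<?> clazz = Class.forName(className);
            clazz.getConstructor().newInstance();
            return true;
        } catch (ReflectiveOperationException | LinkageError e) {
            // Class missing, no public no-arg constructor, abstract, or failed to initialize.
            System.err.println("Cannot instantiate " + className + ": " + e);
            return false;
        }
    }

    public static void main(String[] args) {
        String name = args.length > 0 ? args[0]
                : "com.ververica.troubleshooting.MyCustomRocksDBOptionsFactory";
        System.out.println(name + " usable: " + canInstantiate(name));
    }
}
```

A `false` result accompanied by a `ClassNotFoundException` on stderr means the JAR containing the factory is not on that classpath.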
Related Information
- Flink docs: State Backends
- FLINK-15068 - Disable RocksDB's local LOG by default
- FLINK-15747 - Enable setting RocksDB log level from configuration
- FLINK-20911 - Support configuration of RocksDB log level
- FLINK-23812 - Support RocksDB log rotation via configuration
- RocksDB Tuning Guide: RocksDB Statistics