diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/sstmerge/Compactor.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/sstmerge/Compactor.java index 2a4d7b26c7316..e6e0e65d46e54 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/sstmerge/Compactor.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/sstmerge/Compactor.java @@ -19,6 +19,7 @@ package org.apache.flink.state.rocksdb.sstmerge; import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.CompactionJobInfo; import org.rocksdb.CompactionOptions; import org.rocksdb.RocksDB; @@ -51,7 +52,13 @@ public Compactor(CompactionTarget target, long targetOutputFileSize) { } void compact(ColumnFamilyHandle cfName, int level, List files) throws RocksDBException { - int outputLevel = Math.min(level + 1, cfName.getDescriptor().getOptions().numLevels() - 1); + // FLINK-39753: getDescriptor() allocates a new native ColumnFamilyOptions on every call. + // Closing it is required to release the native object and its reference to the shared + // block cache; leaking it keeps the LRUCache alive and grows native memory until OOM. + final int outputLevel; + try (ColumnFamilyOptions cfOptions = cfName.getDescriptor().getOptions()) { + outputLevel = Math.min(level + 1, cfOptions.numLevels() - 1); + } LOG.debug( "Manually compacting {} files from level {} to {}: {}", files.size(),