[SPARK-35788][SS] Metrics support for RocksDB instance

### What changes were proposed in this pull request?
Add more metrics for the RocksDB instance. We transform the native states from RocksDB.

### Why are the changes needed?
Improve the usability with more metrics for RocksDB instance.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing tests.

Closes #32934 from xuanyuanking/SPARK-35788.

Authored-by: Yuanjian Li <yuanjian.li@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
This commit is contained in:
Yuanjian Li 2021-07-06 11:12:21 +09:00 committed by Jungtaek Lim
parent 8b46e26fc6
commit 9544277b0a
2 changed files with 118 additions and 1 deletions

View file

@ -26,6 +26,8 @@ import scala.ref.WeakReference
import scala.util.Try
import org.apache.hadoop.conf.Configuration
import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization
import org.rocksdb.{RocksDB => NativeRocksDB, _}
import org.apache.spark.TaskContext
@ -72,6 +74,7 @@ class RocksDB(
dbOptions.setTableFormatConfig(tableFormatConfig)
private val dbLogger = createLogger() // for forwarding RocksDB native logs to log4j
dbOptions.setStatistics(new Statistics())
private val nativeStats = dbOptions.statistics()
private val workingDir = createTempDir("workingDir")
private val fileManager = new RocksDBFileManager(
@ -84,6 +87,7 @@ class RocksDB(
@volatile private var loadedVersion = -1L // -1 = nothing valid is loaded
@volatile private var numKeysOnLoadedVersion = 0L
@volatile private var numKeysOnWritingVersion = 0L
@volatile private var fileManagerMetrics = RocksDBFileManagerMetrics.EMPTY_METRICS
@GuardedBy("acquireLock")
@volatile private var acquiredThreadInfo: AcquiredThreadInfo = _
@ -105,6 +109,7 @@ class RocksDB(
numKeysOnWritingVersion = metadata.numKeys
numKeysOnLoadedVersion = metadata.numKeys
loadedVersion = version
fileManagerMetrics = fileManager.latestLoadCheckpointMetrics
}
writeBatch.clear()
logInfo(s"Loaded $version")
@ -223,6 +228,7 @@ class RocksDB(
}
numKeysOnLoadedVersion = numKeysOnWritingVersion
loadedVersion = newVersion
fileManagerMetrics = fileManager.latestSaveCheckpointMetrics
commitLatencyMs ++= Map(
"writeBatch" -> writeTimeMs,
"flush" -> flushTimeMs,
@ -231,6 +237,7 @@ class RocksDB(
"checkpoint" -> checkpointTimeMs,
"fileSync" -> fileSyncTimeMs
)
logInfo(s"Committed $newVersion, stats = ${metrics.json}")
loadedVersion
} catch {
case t: Throwable =>
@ -283,6 +290,30 @@ class RocksDB(
/** Get the latest version available in the DFS */
def getLatestVersion(): Long = fileManager.getLatestVersion()
/** Get current instantaneous statistics */
def metrics: RocksDBMetrics = {
import HistogramType._
val totalSSTFilesBytes = getDBProperty("rocksdb.total-sst-files-size")
val readerMemUsage = getDBProperty("rocksdb.estimate-table-readers-mem")
val memTableMemUsage = getDBProperty("rocksdb.size-all-mem-tables")
val nativeOps = Seq("get" -> DB_GET, "put" -> DB_WRITE).toMap
val nativeOpsLatencyMicros = nativeOps.mapValues { typ =>
RocksDBNativeHistogram(nativeStats.getHistogramData(typ))
}
RocksDBMetrics(
numKeysOnLoadedVersion,
numKeysOnWritingVersion,
readerMemUsage + memTableMemUsage,
totalSSTFilesBytes,
nativeOpsLatencyMicros.toMap,
commitLatencyMs,
bytesCopied = fileManagerMetrics.bytesCopied,
filesCopied = fileManagerMetrics.filesCopied,
filesReused = fileManagerMetrics.filesReused,
zipFileBytesUncompressed = fileManagerMetrics.zipFileBytesUncompressed)
}
private def acquire(): Unit = acquireLock.synchronized {
val newAcquiredThreadInfo = AcquiredThreadInfo()
val waitStartTime = System.currentTimeMillis
@ -314,6 +345,10 @@ class RocksDB(
acquireLock.notifyAll()
}
private def getDBProperty(property: String): Long = {
db.getProperty(property).toLong
}
private def openDB(): Unit = {
assert(db == null)
db = NativeRocksDB.open(dbOptions, workingDir.toString)
@ -388,7 +423,6 @@ class ByteArrayPair(var key: Array[Byte] = null, var value: Array[Byte] = null)
/**
* Configurations for optimizing RocksDB
*
* @param compactOnCommit Whether to compact RocksDB data before commit / checkpointing
*/
case class RocksDBConf(
@ -442,6 +476,42 @@ object RocksDBConf {
def apply(): RocksDBConf = apply(new StateStoreConf())
}
/** Class to represent stats from each commit. */
case class RocksDBMetrics(
numCommittedKeys: Long,
numUncommittedKeys: Long,
memUsageBytes: Long,
totalSSTFilesBytes: Long,
nativeOpsLatencyMicros: Map[String, RocksDBNativeHistogram],
lastCommitLatencyMs: Map[String, Long],
filesCopied: Long,
bytesCopied: Long,
filesReused: Long,
zipFileBytesUncompressed: Option[Long]) {
def json: String = Serialization.write(this)(RocksDBMetrics.format)
}
object RocksDBMetrics {
val format = Serialization.formats(NoTypeHints)
}
/** Class to wrap RocksDB's native histogram */
case class RocksDBNativeHistogram(
avg: Double, stddev: Double, median: Double, p95: Double, p99: Double) {
def json: String = Serialization.write(this)(RocksDBMetrics.format)
}
object RocksDBNativeHistogram {
def apply(nativeHist: HistogramData): RocksDBNativeHistogram = {
RocksDBNativeHistogram(
nativeHist.getAverage,
nativeHist.getStandardDeviation,
nativeHist.getMedian,
nativeHist.getPercentile95,
nativeHist.getPercentile99)
}
}
case class AcquiredThreadInfo() {
val threadRef: WeakReference[Thread] = new WeakReference[Thread](Thread.currentThread())
val tc: TaskContext = TaskContext.get()

View file

@ -29,6 +29,7 @@ import scala.collection.mutable
import com.fasterxml.jackson.annotation.JsonInclude.Include
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
import org.apache.commons.io.{FilenameUtils, IOUtils}
import org.apache.hadoop.conf.Configuration
@ -134,6 +135,22 @@ class RocksDBFileManager(
override def accept(path: Path): Boolean = path.toString.endsWith(".zip")
}
/**
* Metrics for loading checkpoint from DFS. Every loadCheckpointFromDFS call will update this
* metrics, so this effectively records the latest metrics.
*/
@volatile private var loadCheckpointMetrics = RocksDBFileManagerMetrics.EMPTY_METRICS
/**
* Metrics for saving checkpoint to DFS. Every saveCheckpointToDFS call will update this
* metrics, so this effectively records the latest metrics.
*/
@volatile private var saveCheckpointMetrics = RocksDBFileManagerMetrics.EMPTY_METRICS
def latestLoadCheckpointMetrics: RocksDBFileManagerMetrics = loadCheckpointMetrics
def latestSaveCheckpointMetrics: RocksDBFileManagerMetrics = saveCheckpointMetrics
/** Save all the files in given local checkpoint directory as a committed version in DFS */
def saveCheckpointToDfs(checkpointDir: File, version: Long, numKeys: Long): Unit = {
logFilesInDir(checkpointDir, s"Saving checkpoint files for version $version")
@ -336,6 +353,11 @@ class RocksDBFileManager(
s" DFS for version $version. $filesReused files reused without copying.")
versionToRocksDBFiles.put(version, immutableFiles)
saveCheckpointMetrics = RocksDBFileManagerMetrics(
bytesCopied = bytesCopied,
filesCopied = filesCopied,
filesReused = filesReused)
immutableFiles
}
@ -387,6 +409,11 @@ class RocksDBFileManager(
}
logInfo(s"Copied $filesCopied files ($bytesCopied bytes) from DFS to local with " +
s"$filesReused files reused.")
loadCheckpointMetrics = RocksDBFileManagerMetrics(
bytesCopied = bytesCopied,
filesCopied = filesCopied,
filesReused = filesReused)
}
/** Get the SST files required for a version from the version zip file in DFS */
@ -420,6 +447,9 @@ class RocksDBFileManager(
}
zout.close() // so that any error in closing also cancels the output stream
logInfo(s"Zipped $totalBytes bytes (before compression) to $filesStr")
// The other fields saveCheckpointMetrics should have been filled
saveCheckpointMetrics =
saveCheckpointMetrics.copy(zipFileBytesUncompressed = Some(totalBytes))
} catch {
case e: Exception =>
// Cancel the actual output stream first, so that zout.close() does not write the file
@ -486,6 +516,23 @@ class RocksDBFileManager(
}
}
/**
* Metrics regarding RocksDB file sync between local and DFS.
*/
case class RocksDBFileManagerMetrics(
filesCopied: Long,
bytesCopied: Long,
filesReused: Long,
@JsonDeserialize(contentAs = classOf[java.lang.Long])
zipFileBytesUncompressed: Option[Long] = None)
/**
* Metrics to return when requested but no operation has been performed.
*/
object RocksDBFileManagerMetrics {
val EMPTY_METRICS = RocksDBFileManagerMetrics(0L, 0L, 0L, None)
}
/**
* Classes to represent metadata of checkpoints saved to DFS. Since this is converted to JSON, any
* changes to this MUST be backward-compatible.