[SPARK-35788][SS] Metrics support for RocksDB instance

### What changes were proposed in this pull request?
Add more metrics for the RocksDB instance. We expose the native statistics from RocksDB (DB properties and get/put latency histograms) together with the checkpoint file-sync metrics.

### Why are the changes needed?
Improve the usability of the RocksDB state store by exposing more metrics for the RocksDB instance.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing tests.

Closes #32934 from xuanyuanking/SPARK-35788.

Authored-by: Yuanjian Li <yuanjian.li@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
(cherry picked from commit 9544277b0a)
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Yuanjian Li authored on 2021-07-06 11:12:21 +09:00, committed by Jungtaek Lim
parent 0df89d8999
commit 22b303a648
2 changed files with 118 additions and 1 deletion

File: RocksDB.scala

@@ -26,6 +26,8 @@ import scala.ref.WeakReference
 import scala.util.Try

 import org.apache.hadoop.conf.Configuration
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
 import org.rocksdb.{RocksDB => NativeRocksDB, _}

 import org.apache.spark.TaskContext
@@ -72,6 +74,7 @@ class RocksDB(
   dbOptions.setTableFormatConfig(tableFormatConfig)
   private val dbLogger = createLogger() // for forwarding RocksDB native logs to log4j
   dbOptions.setStatistics(new Statistics())
+  private val nativeStats = dbOptions.statistics()

   private val workingDir = createTempDir("workingDir")
   private val fileManager = new RocksDBFileManager(
@@ -84,6 +87,7 @@ class RocksDB(
   @volatile private var loadedVersion = -1L // -1 = nothing valid is loaded
   @volatile private var numKeysOnLoadedVersion = 0L
   @volatile private var numKeysOnWritingVersion = 0L
+  @volatile private var fileManagerMetrics = RocksDBFileManagerMetrics.EMPTY_METRICS

   @GuardedBy("acquireLock")
   @volatile private var acquiredThreadInfo: AcquiredThreadInfo = _
@@ -105,6 +109,7 @@ class RocksDB(
         numKeysOnWritingVersion = metadata.numKeys
         numKeysOnLoadedVersion = metadata.numKeys
         loadedVersion = version
+        fileManagerMetrics = fileManager.latestLoadCheckpointMetrics
       }
       writeBatch.clear()
       logInfo(s"Loaded $version")
@@ -223,6 +228,7 @@ class RocksDB(
       }
       numKeysOnLoadedVersion = numKeysOnWritingVersion
       loadedVersion = newVersion
+      fileManagerMetrics = fileManager.latestSaveCheckpointMetrics
       commitLatencyMs ++= Map(
         "writeBatch" -> writeTimeMs,
         "flush" -> flushTimeMs,
@@ -231,6 +237,7 @@ class RocksDB(
         "checkpoint" -> checkpointTimeMs,
         "fileSync" -> fileSyncTimeMs
       )
+      logInfo(s"Committed $newVersion, stats = ${metrics.json}")
       loadedVersion
     } catch {
       case t: Throwable =>
@@ -283,6 +290,30 @@ class RocksDB(
   /** Get the latest version available in the DFS */
   def getLatestVersion(): Long = fileManager.getLatestVersion()

+  /** Get current instantaneous statistics */
+  def metrics: RocksDBMetrics = {
+    import HistogramType._
+    val totalSSTFilesBytes = getDBProperty("rocksdb.total-sst-files-size")
+    val readerMemUsage = getDBProperty("rocksdb.estimate-table-readers-mem")
+    val memTableMemUsage = getDBProperty("rocksdb.size-all-mem-tables")
+    val nativeOps = Seq("get" -> DB_GET, "put" -> DB_WRITE).toMap
+    val nativeOpsLatencyMicros = nativeOps.mapValues { typ =>
+      RocksDBNativeHistogram(nativeStats.getHistogramData(typ))
+    }
+
+    RocksDBMetrics(
+      numKeysOnLoadedVersion,
+      numKeysOnWritingVersion,
+      readerMemUsage + memTableMemUsage,
+      totalSSTFilesBytes,
+      nativeOpsLatencyMicros.toMap,
+      commitLatencyMs,
+      bytesCopied = fileManagerMetrics.bytesCopied,
+      filesCopied = fileManagerMetrics.filesCopied,
+      filesReused = fileManagerMetrics.filesReused,
+      zipFileBytesUncompressed = fileManagerMetrics.zipFileBytesUncompressed)
+  }
+
   private def acquire(): Unit = acquireLock.synchronized {
     val newAcquiredThreadInfo = AcquiredThreadInfo()
     val waitStartTime = System.currentTimeMillis
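As background, the native statistics that the new `metrics` method reads can be exercised against the RocksDB Java API on its own. A minimal sketch, assuming a throwaway local DB (the path, keys, and option values are illustrative, not part of this patch):

```scala
import org.rocksdb.{HistogramType, Options, RocksDB, Statistics}

object NativeStatsDemo {
  def main(args: Array[String]): Unit = {
    RocksDB.loadLibrary()
    val options = new Options().setCreateIfMissing(true)
    options.setStatistics(new Statistics()) // must be set before open, as in the patch
    val db = RocksDB.open(options, "/tmp/rocksdb-stats-demo") // illustrative path

    db.put("key".getBytes, "value".getBytes)
    db.get("key".getBytes)

    // Same property read the new getDBProperty helper performs
    val memTableBytes = db.getProperty("rocksdb.size-all-mem-tables").toLong
    // Same histogram read the new metrics method performs
    val getHist = options.statistics().getHistogramData(HistogramType.DB_GET)
    println(s"memtables=$memTableBytes bytes, get p95=${getHist.getPercentile95} us")
    db.close()
  }
}
```

Note that statistics collection has to be enabled before the DB is opened, which is why the patch wires `nativeStats` right after `dbOptions.setStatistics(new Statistics())`.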
@@ -314,6 +345,10 @@ class RocksDB(
     acquireLock.notifyAll()
   }

+  private def getDBProperty(property: String): Long = {
+    db.getProperty(property).toLong
+  }
+
   private def openDB(): Unit = {
     assert(db == null)
     db = NativeRocksDB.open(dbOptions, workingDir.toString)
@@ -388,7 +423,6 @@ class ByteArrayPair(var key: Array[Byte] = null, var value: Array[Byte] = null)

 /**
  * Configurations for optimizing RocksDB
- *
  * @param compactOnCommit Whether to compact RocksDB data before commit / checkpointing
  */
 case class RocksDBConf(
@@ -442,6 +476,42 @@ object RocksDBConf {
   def apply(): RocksDBConf = apply(new StateStoreConf())
 }

+/** Class to represent stats from each commit. */
+case class RocksDBMetrics(
+    numCommittedKeys: Long,
+    numUncommittedKeys: Long,
+    memUsageBytes: Long,
+    totalSSTFilesBytes: Long,
+    nativeOpsLatencyMicros: Map[String, RocksDBNativeHistogram],
+    lastCommitLatencyMs: Map[String, Long],
+    filesCopied: Long,
+    bytesCopied: Long,
+    filesReused: Long,
+    zipFileBytesUncompressed: Option[Long]) {
+  def json: String = Serialization.write(this)(RocksDBMetrics.format)
+}
+
+object RocksDBMetrics {
+  val format = Serialization.formats(NoTypeHints)
+}
+
+/** Class to wrap RocksDB's native histogram */
+case class RocksDBNativeHistogram(
+    avg: Double, stddev: Double, median: Double, p95: Double, p99: Double) {
+  def json: String = Serialization.write(this)(RocksDBMetrics.format)
+}
+
+object RocksDBNativeHistogram {
+  def apply(nativeHist: HistogramData): RocksDBNativeHistogram = {
+    RocksDBNativeHistogram(
+      nativeHist.getAverage,
+      nativeHist.getStandardDeviation,
+      nativeHist.getMedian,
+      nativeHist.getPercentile95,
+      nativeHist.getPercentile99)
+  }
+}
+
 case class AcquiredThreadInfo() {
   val threadRef: WeakReference[Thread] = new WeakReference[Thread](Thread.currentThread())
   val tc: TaskContext = TaskContext.get()
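The `json` methods above rely on json4s `Serialization.write` with `NoTypeHints`, which renders a case class as a flat JSON object. A minimal sketch with a hypothetical stand-in class mirroring `RocksDBNativeHistogram`'s shape:

```scala
import org.json4s.{Formats, NoTypeHints}
import org.json4s.jackson.Serialization

// Hypothetical stand-in, not from this patch
case class Hist(avg: Double, stddev: Double, median: Double, p95: Double, p99: Double)

object JsonShapeDemo {
  def main(args: Array[String]): Unit = {
    implicit val format: Formats = Serialization.formats(NoTypeHints)
    println(Serialization.write(Hist(12.5, 3.1, 11.0, 20.0, 25.0)))
    // => {"avg":12.5,"stddev":3.1,"median":11.0,"p95":20.0,"p99":25.0}
  }
}
```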

File: RocksDBFileManager.scala

@@ -29,6 +29,7 @@ import scala.collection.mutable

 import com.fasterxml.jackson.annotation.JsonInclude.Include
 import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize
 import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
 import org.apache.commons.io.{FilenameUtils, IOUtils}
 import org.apache.hadoop.conf.Configuration
@@ -134,6 +135,22 @@ class RocksDBFileManager(
     override def accept(path: Path): Boolean = path.toString.endsWith(".zip")
   }

+  /**
+   * Metrics for loading checkpoint from DFS. Every loadCheckpointFromDFS call updates these
+   * metrics, so this effectively records the latest metrics.
+   */
+  @volatile private var loadCheckpointMetrics = RocksDBFileManagerMetrics.EMPTY_METRICS
+
+  /**
+   * Metrics for saving checkpoint to DFS. Every saveCheckpointToDFS call updates these
+   * metrics, so this effectively records the latest metrics.
+   */
+  @volatile private var saveCheckpointMetrics = RocksDBFileManagerMetrics.EMPTY_METRICS
+
+  def latestLoadCheckpointMetrics: RocksDBFileManagerMetrics = loadCheckpointMetrics
+
+  def latestSaveCheckpointMetrics: RocksDBFileManagerMetrics = saveCheckpointMetrics
+
   /** Save all the files in given local checkpoint directory as a committed version in DFS */
   def saveCheckpointToDfs(checkpointDir: File, version: Long, numKeys: Long): Unit = {
     logFilesInDir(checkpointDir, s"Saving checkpoint files for version $version")
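A note on the handoff here: the file manager publishes its latest counters through `@volatile` fields, and `RocksDB` snapshots them into `fileManagerMetrics` right after each load/save instead of taking a lock. The same idiom in isolation (all names are illustrative, not from this patch):

```scala
// Lock-free "latest value" handoff: one writer publishes, readers snapshot.
final class Latest[T](initial: T) {
  @volatile private var value: T = initial
  def publish(v: T): Unit = { value = v } // called at the end of each operation
  def snapshot: T = value                 // cheap read, no locking
}

object LatestDemo {
  def main(args: Array[String]): Unit = {
    val copied = new Latest[(Long, Long)]((0L, 0L)) // (filesCopied, bytesCopied)
    copied.publish((5L, 1024L))
    println(copied.snapshot) // (5,1024)
  }
}
```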
@@ -336,6 +353,11 @@ class RocksDBFileManager(
       s" DFS for version $version. $filesReused files reused without copying.")
     versionToRocksDBFiles.put(version, immutableFiles)

+    saveCheckpointMetrics = RocksDBFileManagerMetrics(
+      bytesCopied = bytesCopied,
+      filesCopied = filesCopied,
+      filesReused = filesReused)
+
     immutableFiles
   }
@@ -387,6 +409,11 @@ class RocksDBFileManager(
     }
     logInfo(s"Copied $filesCopied files ($bytesCopied bytes) from DFS to local with " +
       s"$filesReused files reused.")
+
+    loadCheckpointMetrics = RocksDBFileManagerMetrics(
+      bytesCopied = bytesCopied,
+      filesCopied = filesCopied,
+      filesReused = filesReused)
   }

   /** Get the SST files required for a version from the version zip file in DFS */
@@ -420,6 +447,9 @@ class RocksDBFileManager(
       }
       zout.close()  // so that any error in closing also cancels the output stream
       logInfo(s"Zipped $totalBytes bytes (before compression) to $filesStr")
+      // The other fields of saveCheckpointMetrics should have been filled
+      saveCheckpointMetrics =
+        saveCheckpointMetrics.copy(zipFileBytesUncompressed = Some(totalBytes))
     } catch {
       case e: Exception =>
         // Cancel the actual output stream first, so that zout.close() does not write the file
@@ -486,6 +516,23 @@ class RocksDBFileManager(
   }
 }

+/**
+ * Metrics regarding RocksDB file sync between local and DFS.
+ */
+case class RocksDBFileManagerMetrics(
+    filesCopied: Long,
+    bytesCopied: Long,
+    filesReused: Long,
+    @JsonDeserialize(contentAs = classOf[java.lang.Long])
+    zipFileBytesUncompressed: Option[Long] = None)
+
+/**
+ * Metrics to return when requested but no operation has been performed.
+ */
+object RocksDBFileManagerMetrics {
+  val EMPTY_METRICS = RocksDBFileManagerMetrics(0L, 0L, 0L, None)
+}
+
 /**
  * Classes to represent metadata of checkpoints saved to DFS. Since this is converted to JSON, any
  * changes to this MUST be backward-compatible.
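The `@JsonDeserialize(contentAs = classOf[java.lang.Long])` annotation matters because, with type erasure, Jackson's Scala module materializes a JSON number inside `Option[Long]` as a boxed `Integer`, which fails at unboxing time. A minimal reproduction of the fix (class and field names are hypothetical):

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import com.fasterxml.jackson.module.scala.DefaultScalaModule

// Without the contentAs hint, Jackson would materialize n as Option[Integer]
// under erasure, and unboxing it as Long would throw ClassCastException.
case class Wrapped(@JsonDeserialize(contentAs = classOf[java.lang.Long]) n: Option[Long])

object OptionLongDemo {
  def main(args: Array[String]): Unit = {
    val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
    val w = mapper.readValue("""{"n": 42}""", classOf[Wrapped])
    println(w.n.map(_ + 1)) // Some(43)
  }
}
```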