[SPARK-36236][SS] Additional metrics for RocksDB based state store implementation
### What changes were proposed in this pull request? Proposing adding new metrics to `customMetrics` under the `stateOperators` in `StreamingQueryProgress` event These metrics help have better visibility into the RocksDB based state store in streaming jobs. For full details of metrics, refer to https://issues.apache.org/jira/browse/SPARK-36236. ### Why are the changes needed? Current metrics available for the RockDB state store, do not provide observability into many operations such as how much time is spent by the RocksDB in compaction and what is the cache hit ratio. These metrics help compare performance differences in state store operations between slow and fast microbatches . ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Unittests Closes #33455 from vkorukanti/rocksdb-metrics. Authored-by: Venki Korukanti <venki.korukanti@gmail.com> Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
This commit is contained in:
parent
01213095e2
commit
eb4d1c0332
|
@ -30,6 +30,7 @@ import org.apache.hadoop.conf.Configuration
|
|||
import org.json4s.NoTypeHints
|
||||
import org.json4s.jackson.Serialization
|
||||
import org.rocksdb.{RocksDB => NativeRocksDB, _}
|
||||
import org.rocksdb.TickerType._
|
||||
|
||||
import org.apache.spark.TaskContext
|
||||
import org.apache.spark.internal.Logging
|
||||
|
@ -115,6 +116,9 @@ class RocksDB(
|
|||
loadedVersion = version
|
||||
fileManagerMetrics = fileManager.latestLoadCheckpointMetrics
|
||||
}
|
||||
if (conf.resetStatsOnLoad) {
|
||||
nativeStats.reset
|
||||
}
|
||||
writeBatch.clear()
|
||||
logInfo(s"Loaded $version")
|
||||
} catch {
|
||||
|
@ -241,8 +245,8 @@ class RocksDB(
|
|||
logInfo("Compacting")
|
||||
timeTakenMs { db.compactRange() }
|
||||
} else 0
|
||||
logInfo("Pausing background work")
|
||||
|
||||
logInfo("Pausing background work")
|
||||
val pauseTimeMs = timeTakenMs {
|
||||
db.pauseBackgroundWork() // To avoid files being changed while committing
|
||||
}
|
||||
|
@ -331,10 +335,35 @@ class RocksDB(
|
|||
val totalSSTFilesBytes = getDBProperty("rocksdb.total-sst-files-size")
|
||||
val readerMemUsage = getDBProperty("rocksdb.estimate-table-readers-mem")
|
||||
val memTableMemUsage = getDBProperty("rocksdb.size-all-mem-tables")
|
||||
val nativeOps = Seq("get" -> DB_GET, "put" -> DB_WRITE).toMap
|
||||
val nativeOpsLatencyMicros = nativeOps.mapValues { typ =>
|
||||
val nativeOpsHistograms = Seq(
|
||||
"get" -> DB_GET,
|
||||
"put" -> DB_WRITE,
|
||||
"compaction" -> COMPACTION_TIME
|
||||
).toMap
|
||||
val nativeOpsLatencyMicros = nativeOpsHistograms.mapValues { typ =>
|
||||
RocksDBNativeHistogram(nativeStats.getHistogramData(typ))
|
||||
}
|
||||
val nativeOpsMetricTickers = Seq(
|
||||
/** Number of cache misses that required reading from local disk */
|
||||
"readBlockCacheMissCount" -> BLOCK_CACHE_MISS,
|
||||
/** Number of cache hits that read data from RocksDB block cache avoiding local disk read */
|
||||
"readBlockCacheHitCount" -> BLOCK_CACHE_HIT,
|
||||
/** Number of uncompressed bytes read (from memtables/cache/sst) from DB::Get() */
|
||||
"totalBytesRead" -> BYTES_READ,
|
||||
/** Number of uncompressed bytes issued by DB::{Put(), Delete(), Merge(), Write()} */
|
||||
"totalBytesWritten" -> BYTES_WRITTEN,
|
||||
/** The number of uncompressed bytes read from an iterator. */
|
||||
"totalBytesReadThroughIterator" -> ITER_BYTES_READ,
|
||||
/** Duration of writer requiring to wait for compaction or flush to finish. */
|
||||
"writerStallDuration" -> STALL_MICROS,
|
||||
/** Number of bytes read during compaction */
|
||||
"totalBytesReadByCompaction" -> COMPACT_READ_BYTES,
|
||||
/** Number of bytes written during compaction */
|
||||
"totalBytesWrittenByCompaction" -> COMPACT_WRITE_BYTES
|
||||
).toMap
|
||||
val nativeOpsMetrics = nativeOpsMetricTickers.mapValues { typ =>
|
||||
nativeStats.getTickerCount(typ)
|
||||
}
|
||||
|
||||
RocksDBMetrics(
|
||||
numKeysOnLoadedVersion,
|
||||
|
@ -346,7 +375,8 @@ class RocksDB(
|
|||
bytesCopied = fileManagerMetrics.bytesCopied,
|
||||
filesCopied = fileManagerMetrics.filesCopied,
|
||||
filesReused = fileManagerMetrics.filesReused,
|
||||
zipFileBytesUncompressed = fileManagerMetrics.zipFileBytesUncompressed)
|
||||
zipFileBytesUncompressed = fileManagerMetrics.zipFileBytesUncompressed,
|
||||
nativeOpsMetrics = nativeOpsMetrics.toMap)
|
||||
}
|
||||
|
||||
private def acquire(): Unit = acquireLock.synchronized {
|
||||
|
@ -466,7 +496,8 @@ case class RocksDBConf(
|
|||
pauseBackgroundWorkForCommit: Boolean,
|
||||
blockSizeKB: Long,
|
||||
blockCacheSizeMB: Long,
|
||||
lockAcquireTimeoutMs: Long)
|
||||
lockAcquireTimeoutMs: Long,
|
||||
resetStatsOnLoad : Boolean)
|
||||
|
||||
object RocksDBConf {
|
||||
/** Common prefix of all confs in SQLConf that affects RocksDB */
|
||||
|
@ -482,6 +513,7 @@ object RocksDBConf {
|
|||
private val BLOCK_SIZE_KB_CONF = ConfEntry("blockSizeKB", "4")
|
||||
private val BLOCK_CACHE_SIZE_MB_CONF = ConfEntry("blockCacheSizeMB", "8")
|
||||
private val LOCK_ACQUIRE_TIMEOUT_MS_CONF = ConfEntry("lockAcquireTimeoutMs", "60000")
|
||||
private val RESET_STATS_ON_LOAD = ConfEntry("resetStatsOnLoad", "true")
|
||||
|
||||
def apply(storeConf: StateStoreConf): RocksDBConf = {
|
||||
val confs = CaseInsensitiveMap[String](storeConf.confs)
|
||||
|
@ -505,7 +537,8 @@ object RocksDBConf {
|
|||
getBooleanConf(PAUSE_BG_WORK_FOR_COMMIT_CONF),
|
||||
getPositiveLongConf(BLOCK_SIZE_KB_CONF),
|
||||
getPositiveLongConf(BLOCK_CACHE_SIZE_MB_CONF),
|
||||
getPositiveLongConf(LOCK_ACQUIRE_TIMEOUT_MS_CONF))
|
||||
getPositiveLongConf(LOCK_ACQUIRE_TIMEOUT_MS_CONF),
|
||||
getBooleanConf(RESET_STATS_ON_LOAD))
|
||||
}
|
||||
|
||||
def apply(): RocksDBConf = apply(new StateStoreConf())
|
||||
|
@ -517,12 +550,13 @@ case class RocksDBMetrics(
|
|||
numUncommittedKeys: Long,
|
||||
memUsageBytes: Long,
|
||||
totalSSTFilesBytes: Long,
|
||||
nativeOpsLatencyMicros: Map[String, RocksDBNativeHistogram],
|
||||
nativeOpsHistograms: Map[String, RocksDBNativeHistogram],
|
||||
lastCommitLatencyMs: Map[String, Long],
|
||||
filesCopied: Long,
|
||||
bytesCopied: Long,
|
||||
filesReused: Long,
|
||||
zipFileBytesUncompressed: Option[Long]) {
|
||||
zipFileBytesUncompressed: Option[Long],
|
||||
nativeOpsMetrics: Map[String, Long]) {
|
||||
def json: String = Serialization.write(this)(RocksDBMetrics.format)
|
||||
}
|
||||
|
||||
|
@ -532,18 +566,20 @@ object RocksDBMetrics {
|
|||
|
||||
/** Class to wrap RocksDB's native histogram */
|
||||
case class RocksDBNativeHistogram(
|
||||
avg: Double, stddev: Double, median: Double, p95: Double, p99: Double) {
|
||||
sum: Long, avg: Double, stddev: Double, median: Double, p95: Double, p99: Double, count: Long) {
|
||||
def json: String = Serialization.write(this)(RocksDBMetrics.format)
|
||||
}
|
||||
|
||||
object RocksDBNativeHistogram {
|
||||
def apply(nativeHist: HistogramData): RocksDBNativeHistogram = {
|
||||
RocksDBNativeHistogram(
|
||||
nativeHist.getSum,
|
||||
nativeHist.getAverage,
|
||||
nativeHist.getStandardDeviation,
|
||||
nativeHist.getMedian,
|
||||
nativeHist.getPercentile95,
|
||||
nativeHist.getPercentile99)
|
||||
nativeHist.getPercentile99,
|
||||
nativeHist.getCount)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -106,22 +106,43 @@ private[sql] class RocksDBStateStoreProvider
|
|||
override def metrics: StateStoreMetrics = {
|
||||
val rocksDBMetrics = rocksDB.metrics
|
||||
def commitLatencyMs(typ: String): Long = rocksDBMetrics.lastCommitLatencyMs.getOrElse(typ, 0L)
|
||||
def avgNativeOpsLatencyMs(typ: String): Long = {
|
||||
rocksDBMetrics.nativeOpsLatencyMicros.get(typ).map(_.avg).getOrElse(0.0).toLong
|
||||
def nativeOpsLatencyMillis(typ: String): Long = {
|
||||
rocksDBMetrics.nativeOpsMetrics.get(typ).map(_ * 1000).getOrElse(0)
|
||||
}
|
||||
def sumNativeOpsLatencyMillis(typ: String): Long = {
|
||||
rocksDBMetrics.nativeOpsHistograms.get(typ).map(_.sum / 1000).getOrElse(0)
|
||||
}
|
||||
def nativeOpsCount(typ: String): Long = {
|
||||
rocksDBMetrics.nativeOpsHistograms.get(typ).map(_.count).getOrElse(0)
|
||||
}
|
||||
def nativeOpsMetrics(typ: String): Long = {
|
||||
rocksDBMetrics.nativeOpsMetrics.get(typ).getOrElse(0)
|
||||
}
|
||||
|
||||
val stateStoreCustomMetrics = Map[StateStoreCustomMetric, Long](
|
||||
CUSTOM_METRIC_SST_FILE_SIZE -> rocksDBMetrics.totalSSTFilesBytes,
|
||||
CUSTOM_METRIC_GET_TIME -> avgNativeOpsLatencyMs("get"),
|
||||
CUSTOM_METRIC_PUT_TIME -> avgNativeOpsLatencyMs("put"),
|
||||
CUSTOM_METRIC_GET_TIME -> sumNativeOpsLatencyMillis("get"),
|
||||
CUSTOM_METRIC_PUT_TIME -> sumNativeOpsLatencyMillis("put"),
|
||||
CUSTOM_METRIC_GET_COUNT -> nativeOpsCount("get"),
|
||||
CUSTOM_METRIC_PUT_COUNT -> nativeOpsCount("put"),
|
||||
CUSTOM_METRIC_WRITEBATCH_TIME -> commitLatencyMs("writeBatch"),
|
||||
CUSTOM_METRIC_FLUSH_TIME -> commitLatencyMs("flush"),
|
||||
CUSTOM_METRIC_PAUSE_TIME -> commitLatencyMs("pause"),
|
||||
CUSTOM_METRIC_COMMIT_COMPACT_TIME -> commitLatencyMs("compact"),
|
||||
CUSTOM_METRIC_PAUSE_TIME -> commitLatencyMs("pauseBg"),
|
||||
CUSTOM_METRIC_CHECKPOINT_TIME -> commitLatencyMs("checkpoint"),
|
||||
CUSTOM_METRIC_FILESYNC_TIME -> commitLatencyMs("fileSync"),
|
||||
CUSTOM_METRIC_BYTES_COPIED -> rocksDBMetrics.bytesCopied,
|
||||
CUSTOM_METRIC_FILES_COPIED -> rocksDBMetrics.filesCopied,
|
||||
CUSTOM_METRIC_FILES_REUSED -> rocksDBMetrics.filesReused
|
||||
CUSTOM_METRIC_FILES_REUSED -> rocksDBMetrics.filesReused,
|
||||
CUSTOM_METRIC_BLOCK_CACHE_MISS -> nativeOpsMetrics("readBlockCacheMissCount"),
|
||||
CUSTOM_METRIC_BLOCK_CACHE_HITS -> nativeOpsMetrics("readBlockCacheHitCount"),
|
||||
CUSTOM_METRIC_BYTES_READ -> nativeOpsMetrics("totalBytesRead"),
|
||||
CUSTOM_METRIC_BYTES_WRITTEN -> nativeOpsMetrics("totalBytesWritten"),
|
||||
CUSTOM_METRIC_ITERATOR_BYTES_READ -> nativeOpsMetrics("totalBytesReadThroughIterator"),
|
||||
CUSTOM_METRIC_STALL_TIME -> nativeOpsLatencyMillis("writerStallDuration"),
|
||||
CUSTOM_METRIC_TOTAL_COMPACT_TIME -> sumNativeOpsLatencyMillis("compaction"),
|
||||
CUSTOM_METRIC_COMPACT_READ_BYTES -> nativeOpsMetrics("totalBytesReadByCompaction"),
|
||||
CUSTOM_METRIC_COMPACT_WRITTEN_BYTES -> nativeOpsMetrics("totalBytesWrittenByCompaction")
|
||||
) ++ rocksDBMetrics.zipFileBytesUncompressed.map(bytes =>
|
||||
Map(CUSTOM_METRIC_ZIP_FILE_BYTES_UNCOMPRESSED -> bytes)).getOrElse(Map())
|
||||
|
||||
|
@ -213,42 +234,82 @@ object RocksDBStateStoreProvider {
|
|||
val STATE_ENCODING_NUM_VERSION_BYTES = 1
|
||||
val STATE_ENCODING_VERSION: Byte = 0
|
||||
|
||||
// Native operation latencies report as latency per 1000 calls
|
||||
// as SQLMetrics support ms latency whereas RocksDB reports it in microseconds.
|
||||
// Native operation latencies report as latency in microseconds
|
||||
// as SQLMetrics support millis. Convert the value to millis
|
||||
val CUSTOM_METRIC_GET_TIME = StateStoreCustomTimingMetric(
|
||||
"rocksdbGetLatency", "RocksDB: avg get latency (per 1000 calls)")
|
||||
"rocksdbGetLatency", "RocksDB: total get call latency")
|
||||
val CUSTOM_METRIC_PUT_TIME = StateStoreCustomTimingMetric(
|
||||
"rocksdbPutLatency", "RocksDB: avg put latency (per 1000 calls)")
|
||||
"rocksdbPutLatency", "RocksDB: total put call latency")
|
||||
|
||||
val CUSTOM_METRIC_GET_COUNT = StateStoreCustomSumMetric(
|
||||
"rocksdbGetCount", "RocksDB: number of get calls")
|
||||
val CUSTOM_METRIC_PUT_COUNT = StateStoreCustomSumMetric(
|
||||
"rocksdbPutCount", "RocksDB: number of put calls")
|
||||
|
||||
// Commit latency detailed breakdown
|
||||
val CUSTOM_METRIC_WRITEBATCH_TIME = StateStoreCustomTimingMetric(
|
||||
"rocksdbCommitWriteBatchLatency", "RocksDB: commit - write batch time")
|
||||
val CUSTOM_METRIC_FLUSH_TIME = StateStoreCustomTimingMetric(
|
||||
"rocksdbCommitFlushLatency", "RocksDB: commit - flush time")
|
||||
val CUSTOM_METRIC_COMMIT_COMPACT_TIME = StateStoreCustomTimingMetric(
|
||||
"rocksdbCommitCompactLatency", "RocksDB: commit - compact time")
|
||||
val CUSTOM_METRIC_PAUSE_TIME = StateStoreCustomTimingMetric(
|
||||
"rocksdbCommitPauseLatency", "RocksDB: commit - pause bg time")
|
||||
val CUSTOM_METRIC_CHECKPOINT_TIME = StateStoreCustomTimingMetric(
|
||||
"rocksdbCommitCheckpointLatency", "RocksDB: commit - checkpoint time")
|
||||
val CUSTOM_METRIC_FILESYNC_TIME = StateStoreCustomTimingMetric(
|
||||
"rocksdbFileSyncTime", "RocksDB: commit - file sync time")
|
||||
val CUSTOM_METRIC_FILES_COPIED = StateStoreCustomSizeMetric(
|
||||
"rocksdbCommitFileSyncLatencyMs", "RocksDB: commit - file sync to external storage time")
|
||||
val CUSTOM_METRIC_FILES_COPIED = StateStoreCustomSumMetric(
|
||||
"rocksdbFilesCopied", "RocksDB: file manager - files copied")
|
||||
val CUSTOM_METRIC_BYTES_COPIED = StateStoreCustomSizeMetric(
|
||||
"rocksdbBytesCopied", "RocksDB: file manager - bytes copied")
|
||||
val CUSTOM_METRIC_FILES_REUSED = StateStoreCustomSizeMetric(
|
||||
val CUSTOM_METRIC_FILES_REUSED = StateStoreCustomSumMetric(
|
||||
"rocksdbFilesReused", "RocksDB: file manager - files reused")
|
||||
val CUSTOM_METRIC_ZIP_FILE_BYTES_UNCOMPRESSED = StateStoreCustomSizeMetric(
|
||||
"rocksdbZipFileBytesUncompressed", "RocksDB: file manager - uncompressed zip file bytes")
|
||||
|
||||
val CUSTOM_METRIC_BLOCK_CACHE_MISS = StateStoreCustomSumMetric(
|
||||
"rocksdbReadBlockCacheMissCount",
|
||||
"RocksDB: read - count of cache misses that required reading from local disk")
|
||||
val CUSTOM_METRIC_BLOCK_CACHE_HITS = StateStoreCustomSumMetric(
|
||||
"rocksdbReadBlockCacheHitCount",
|
||||
"RocksDB: read - count of cache hits in RocksDB block cache avoiding disk read")
|
||||
val CUSTOM_METRIC_BYTES_READ = StateStoreCustomSizeMetric(
|
||||
"rocksdbTotalBytesRead",
|
||||
"RocksDB: read - total of uncompressed bytes read (from memtables/cache/sst) from DB::Get()")
|
||||
val CUSTOM_METRIC_BYTES_WRITTEN = StateStoreCustomSizeMetric(
|
||||
"rocksdbTotalBytesWritten",
|
||||
"RocksDB: write - total of uncompressed bytes written by " +
|
||||
"DB::{Put(), Delete(), Merge(), Write()}")
|
||||
val CUSTOM_METRIC_ITERATOR_BYTES_READ = StateStoreCustomSizeMetric(
|
||||
"rocksdbTotalBytesReadThroughIterator",
|
||||
"RocksDB: read - total of uncompressed bytes read using an iterator")
|
||||
val CUSTOM_METRIC_STALL_TIME = StateStoreCustomTimingMetric(
|
||||
"rocksdbWriterStallLatencyMs",
|
||||
"RocksDB: write - writer wait time for compaction or flush to finish")
|
||||
val CUSTOM_METRIC_TOTAL_COMPACT_TIME = StateStoreCustomTimingMetric(
|
||||
"rocksdbTotalCompactionLatencyMs",
|
||||
"RocksDB: compaction - total compaction time including background")
|
||||
val CUSTOM_METRIC_COMPACT_READ_BYTES = StateStoreCustomSizeMetric(
|
||||
"rocksdbTotalBytesReadByCompaction",
|
||||
"RocksDB: compaction - total bytes read by the compaction process")
|
||||
val CUSTOM_METRIC_COMPACT_WRITTEN_BYTES = StateStoreCustomSizeMetric(
|
||||
"rocksdbTotalBytesWrittenByCompaction",
|
||||
"RocksDB: compaction - total bytes written by the compaction process")
|
||||
|
||||
// Total SST file size
|
||||
val CUSTOM_METRIC_SST_FILE_SIZE = StateStoreCustomSizeMetric(
|
||||
"rocksdbSstFileSize", "RocksDB: size of all SST files")
|
||||
|
||||
val ALL_CUSTOM_METRICS = Seq(
|
||||
CUSTOM_METRIC_SST_FILE_SIZE, CUSTOM_METRIC_GET_TIME, CUSTOM_METRIC_PUT_TIME,
|
||||
CUSTOM_METRIC_WRITEBATCH_TIME, CUSTOM_METRIC_FLUSH_TIME, CUSTOM_METRIC_PAUSE_TIME,
|
||||
CUSTOM_METRIC_CHECKPOINT_TIME, CUSTOM_METRIC_FILESYNC_TIME,
|
||||
CUSTOM_METRIC_WRITEBATCH_TIME, CUSTOM_METRIC_FLUSH_TIME, CUSTOM_METRIC_COMMIT_COMPACT_TIME,
|
||||
CUSTOM_METRIC_PAUSE_TIME, CUSTOM_METRIC_CHECKPOINT_TIME, CUSTOM_METRIC_FILESYNC_TIME,
|
||||
CUSTOM_METRIC_BYTES_COPIED, CUSTOM_METRIC_FILES_COPIED, CUSTOM_METRIC_FILES_REUSED,
|
||||
CUSTOM_METRIC_ZIP_FILE_BYTES_UNCOMPRESSED
|
||||
CUSTOM_METRIC_ZIP_FILE_BYTES_UNCOMPRESSED, CUSTOM_METRIC_GET_COUNT, CUSTOM_METRIC_PUT_COUNT,
|
||||
CUSTOM_METRIC_BLOCK_CACHE_MISS, CUSTOM_METRIC_BLOCK_CACHE_HITS, CUSTOM_METRIC_BYTES_READ,
|
||||
CUSTOM_METRIC_BYTES_WRITTEN, CUSTOM_METRIC_ITERATOR_BYTES_READ, CUSTOM_METRIC_STALL_TIME,
|
||||
CUSTOM_METRIC_TOTAL_COMPACT_TIME, CUSTOM_METRIC_COMPACT_READ_BYTES,
|
||||
CUSTOM_METRIC_COMPACT_WRITTEN_BYTES
|
||||
)
|
||||
}
|
||||
|
|
|
@ -19,11 +19,15 @@ package org.apache.spark.sql.execution.streaming.state
|
|||
|
||||
import java.io.File
|
||||
|
||||
import scala.collection.JavaConverters
|
||||
|
||||
import org.scalatest.time.{Minute, Span}
|
||||
|
||||
import org.apache.spark.sql.execution.streaming.MemoryStream
|
||||
import org.apache.spark.sql.functions.count
|
||||
import org.apache.spark.sql.internal.SQLConf
|
||||
import org.apache.spark.sql.streaming._
|
||||
|
||||
|
||||
class RocksDBStateStoreIntegrationSuite extends StreamTest {
|
||||
import testImplicits._
|
||||
|
||||
|
@ -47,5 +51,57 @@ class RocksDBStateStoreIntegrationSuite extends StreamTest {
|
|||
)
|
||||
}
|
||||
}
|
||||
|
||||
test("SPARK-36236: query progress contains only the expected RocksDB store custom metrics") {
|
||||
// fails if any new custom metrics are added to remind the author of API changes
|
||||
import testImplicits._
|
||||
|
||||
withTempDir { dir =>
|
||||
withSQLConf(
|
||||
(SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "10"),
|
||||
(SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName),
|
||||
(SQLConf.CHECKPOINT_LOCATION.key -> dir.getCanonicalPath),
|
||||
(SQLConf.SHUFFLE_PARTITIONS.key, "1")) {
|
||||
val inputData = MemoryStream[Int]
|
||||
|
||||
val query = inputData.toDS().toDF("value")
|
||||
.select('value)
|
||||
.groupBy($"value")
|
||||
.agg(count("*"))
|
||||
.writeStream
|
||||
.format("console")
|
||||
.outputMode("complete")
|
||||
.start()
|
||||
try {
|
||||
inputData.addData(1, 2)
|
||||
inputData.addData(2, 3)
|
||||
query.processAllAvailable()
|
||||
|
||||
val progress = query.lastProgress
|
||||
assert(progress.stateOperators.length > 0)
|
||||
// Should emit new progresses every 10 ms, but we could be facing a slow Jenkins
|
||||
eventually(timeout(Span(1, Minute))) {
|
||||
val nextProgress = query.lastProgress
|
||||
assert(nextProgress != null, "progress is not yet available")
|
||||
assert(nextProgress.stateOperators.length > 0, "state operators are missing in metrics")
|
||||
val stateOperatorMetrics = nextProgress.stateOperators(0)
|
||||
assert(JavaConverters.asScalaSet(stateOperatorMetrics.customMetrics.keySet) === Set(
|
||||
"rocksdbGetLatency", "rocksdbCommitCompactLatency", "rocksdbBytesCopied",
|
||||
"rocksdbPutLatency", "rocksdbCommitPauseLatency", "rocksdbFilesReused",
|
||||
"rocksdbCommitWriteBatchLatency", "rocksdbFilesCopied", "rocksdbSstFileSize",
|
||||
"rocksdbCommitCheckpointLatency", "rocksdbZipFileBytesUncompressed",
|
||||
"rocksdbCommitFlushLatency", "rocksdbCommitFileSyncLatencyMs", "rocksdbGetCount",
|
||||
"rocksdbPutCount", "rocksdbTotalBytesRead", "rocksdbTotalBytesWritten",
|
||||
"rocksdbReadBlockCacheHitCount", "rocksdbReadBlockCacheMissCount",
|
||||
"rocksdbTotalBytesReadByCompaction", "rocksdbTotalBytesWrittenByCompaction",
|
||||
"rocksdbTotalCompactionLatencyMs", "rocksdbWriterStallLatencyMs",
|
||||
"rocksdbTotalBytesReadThroughIterator"))
|
||||
}
|
||||
} finally {
|
||||
query.stop()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -366,6 +366,93 @@ class RocksDBSuite extends SparkFunSuite {
|
|||
// scalastyle:on line.size.limit
|
||||
}
|
||||
|
||||
test("SPARK-36236: reset RocksDB metrics whenever a new version is loaded") {
|
||||
def verifyMetrics(putCount: Long, getCount: Long, iterCountPositive: Boolean = false,
|
||||
metrics: RocksDBMetrics): Unit = {
|
||||
assert(metrics.nativeOpsHistograms("put").count === putCount, "invalid put count")
|
||||
assert(metrics.nativeOpsHistograms("get").count === getCount, "invalid get count")
|
||||
if (iterCountPositive) {
|
||||
assert(metrics.nativeOpsMetrics("totalBytesReadThroughIterator") > 0)
|
||||
} else {
|
||||
assert(metrics.nativeOpsMetrics("totalBytesReadThroughIterator") === 0)
|
||||
}
|
||||
|
||||
// most of the time get reads from WriteBatch which is not counted in this metric
|
||||
assert(metrics.nativeOpsMetrics("totalBytesRead") >= 0)
|
||||
assert(metrics.nativeOpsMetrics("totalBytesWritten") >= putCount * 1)
|
||||
|
||||
assert(metrics.nativeOpsHistograms("compaction") != null)
|
||||
assert(metrics.nativeOpsMetrics("readBlockCacheMissCount") >= 0)
|
||||
assert(metrics.nativeOpsMetrics("readBlockCacheHitCount") >= 0)
|
||||
|
||||
assert(metrics.nativeOpsMetrics("writerStallDuration") >= 0)
|
||||
assert(metrics.nativeOpsMetrics("totalBytesReadByCompaction") >= 0)
|
||||
assert(metrics.nativeOpsMetrics("totalBytesWrittenByCompaction") >=0)
|
||||
}
|
||||
|
||||
withTempDir { dir =>
|
||||
val remoteDir = dir.getCanonicalPath
|
||||
withDB(remoteDir) { db =>
|
||||
verifyMetrics(putCount = 0, getCount = 0, metrics = db.metrics)
|
||||
db.load(0)
|
||||
db.put("a", "1") // put also triggers a db get
|
||||
db.get("a") // this is found in-memory writebatch - no get triggered in db
|
||||
db.get("b") // key doesn't exists - triggers db get
|
||||
db.commit()
|
||||
verifyMetrics(putCount = 1, getCount = 2, metrics = db.metrics)
|
||||
|
||||
db.load(1)
|
||||
db.put("b", "2") // put also triggers a db get
|
||||
db.get("a") // not found in-memory writebatch, so triggers a db get
|
||||
db.get("c") // key doesn't exists - triggers db get
|
||||
assert(iterator(db).toSet === Set(("a", "1"), ("b", "2")))
|
||||
db.commit()
|
||||
verifyMetrics(putCount = 1, getCount = 3, iterCountPositive = true, db.metrics)
|
||||
}
|
||||
}
|
||||
|
||||
// disable resetting stats
|
||||
withTempDir { dir =>
|
||||
val remoteDir = dir.getCanonicalPath
|
||||
withDB(remoteDir, conf = RocksDBConf().copy(resetStatsOnLoad = false)) { db =>
|
||||
verifyMetrics(putCount = 0, getCount = 0, metrics = db.metrics)
|
||||
db.load(0)
|
||||
db.put("a", "1") // put also triggers a db get
|
||||
db.commit()
|
||||
// put and get counts are cumulative
|
||||
verifyMetrics(putCount = 1, getCount = 1, metrics = db.metrics)
|
||||
|
||||
db.load(1)
|
||||
db.put("b", "2") // put also triggers a db get
|
||||
db.get("a")
|
||||
db.commit()
|
||||
// put and get counts are cumulative: existing get=1, put=1: new get=2, put=1
|
||||
verifyMetrics(putCount = 2, getCount = 3, metrics = db.metrics)
|
||||
}
|
||||
}
|
||||
|
||||
// force compaction and check the compaction metrics
|
||||
withTempDir { dir =>
|
||||
val remoteDir = dir.getCanonicalPath
|
||||
withDB(remoteDir, conf = RocksDBConf().copy(compactOnCommit = true)) { db =>
|
||||
db.load(0)
|
||||
db.put("a", "5")
|
||||
db.put("b", "5")
|
||||
db.commit()
|
||||
|
||||
db.load(1)
|
||||
db.put("a", "10")
|
||||
db.put("b", "25")
|
||||
db.commit()
|
||||
|
||||
val metrics = db.metrics
|
||||
assert(metrics.nativeOpsHistograms("compaction").count > 0)
|
||||
assert(metrics.nativeOpsMetrics("totalBytesReadByCompaction") > 0)
|
||||
assert(metrics.nativeOpsMetrics("totalBytesWrittenByCompaction") > 0)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def withDB[T](
|
||||
remoteDir: String,
|
||||
version: Int = 0,
|
||||
|
|
Loading…
Reference in a new issue