diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index f640018001..9952d5dd18 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -26,6 +26,8 @@ import scala.ref.WeakReference
 import scala.util.Try
 
 import org.apache.hadoop.conf.Configuration
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
 import org.rocksdb.{RocksDB => NativeRocksDB, _}
 
 import org.apache.spark.TaskContext
@@ -72,6 +74,7 @@ class RocksDB(
   dbOptions.setTableFormatConfig(tableFormatConfig)
   private val dbLogger = createLogger() // for forwarding RocksDB native logs to log4j
   dbOptions.setStatistics(new Statistics())
+  private val nativeStats = dbOptions.statistics()
 
   private val workingDir = createTempDir("workingDir")
   private val fileManager = new RocksDBFileManager(
@@ -84,6 +87,7 @@ class RocksDB(
   @volatile private var loadedVersion = -1L   // -1 = nothing valid is loaded
   @volatile private var numKeysOnLoadedVersion = 0L
   @volatile private var numKeysOnWritingVersion = 0L
+  @volatile private var fileManagerMetrics = RocksDBFileManagerMetrics.EMPTY_METRICS
 
   @GuardedBy("acquireLock")
   @volatile private var acquiredThreadInfo: AcquiredThreadInfo = _
@@ -105,6 +109,7 @@ class RocksDB(
         numKeysOnWritingVersion = metadata.numKeys
         numKeysOnLoadedVersion = metadata.numKeys
         loadedVersion = version
+        fileManagerMetrics = fileManager.latestLoadCheckpointMetrics
       }
       writeBatch.clear()
       logInfo(s"Loaded $version")
@@ -223,6 +228,7 @@ class RocksDB(
       }
       numKeysOnLoadedVersion = numKeysOnWritingVersion
       loadedVersion = newVersion
+      fileManagerMetrics = fileManager.latestSaveCheckpointMetrics
       commitLatencyMs ++= Map(
         "writeBatch" -> writeTimeMs,
         "flush" -> flushTimeMs,
@@ -231,6 +237,7 @@ class RocksDB(
         "compact" -> compactTimeMs,
         "checkpoint" -> checkpointTimeMs,
         "fileSync" -> fileSyncTimeMs
       )
+      logInfo(s"Committed $newVersion, stats = ${metrics.json}")
       loadedVersion
     } catch {
       case t: Throwable =>
@@ -283,6 +290,30 @@ class RocksDB(
   /** Get the latest version available in the DFS */
   def getLatestVersion(): Long = fileManager.getLatestVersion()
 
+  /** Get current instantaneous statistics */
+  def metrics: RocksDBMetrics = {
+    import HistogramType._
+    val totalSSTFilesBytes = getDBProperty("rocksdb.total-sst-files-size")
+    val readerMemUsage = getDBProperty("rocksdb.estimate-table-readers-mem")
+    val memTableMemUsage = getDBProperty("rocksdb.size-all-mem-tables")
+    val nativeOps = Seq("get" -> DB_GET, "put" -> DB_WRITE).toMap
+    val nativeOpsLatencyMicros = nativeOps.mapValues { typ =>
+      RocksDBNativeHistogram(nativeStats.getHistogramData(typ))
+    }
+
+    RocksDBMetrics(
+      numKeysOnLoadedVersion,
+      numKeysOnWritingVersion,
+      readerMemUsage + memTableMemUsage,
+      totalSSTFilesBytes,
+      nativeOpsLatencyMicros.toMap,
+      commitLatencyMs,
+      bytesCopied = fileManagerMetrics.bytesCopied,
+      filesCopied = fileManagerMetrics.filesCopied,
+      filesReused = fileManagerMetrics.filesReused,
+      zipFileBytesUncompressed = fileManagerMetrics.zipFileBytesUncompressed)
+  }
+
   private def acquire(): Unit = acquireLock.synchronized {
     val newAcquiredThreadInfo = AcquiredThreadInfo()
     val waitStartTime = System.currentTimeMillis
@@ -314,6 +345,10 @@ class RocksDB(
     acquireLock.notifyAll()
   }
 
+  private def getDBProperty(property: String): Long = {
+    db.getProperty(property).toLong
+  }
+
   private def openDB(): Unit = {
     assert(db == null)
     db = NativeRocksDB.open(dbOptions, workingDir.toString)
@@ -388,7 +423,6 @@ class ByteArrayPair(var key: Array[Byte] = null, var value: Array[Byte] = null)
 
 /**
  * Configurations for optimizing RocksDB
- *
  * @param compactOnCommit Whether to compact RocksDB data before commit / checkpointing
  */
 case class RocksDBConf(
@@ -442,6 +476,42 @@ object RocksDBConf {
   def apply(): RocksDBConf = apply(new StateStoreConf())
 }
 
+/** Class to represent stats from each commit. */
+case class RocksDBMetrics(
+    numCommittedKeys: Long,
+    numUncommittedKeys: Long,
+    memUsageBytes: Long,
+    totalSSTFilesBytes: Long,
+    nativeOpsLatencyMicros: Map[String, RocksDBNativeHistogram],
+    lastCommitLatencyMs: Map[String, Long],
+    filesCopied: Long,
+    bytesCopied: Long,
+    filesReused: Long,
+    zipFileBytesUncompressed: Option[Long]) {
+  def json: String = Serialization.write(this)(RocksDBMetrics.format)
+}
+
+object RocksDBMetrics {
+  val format = Serialization.formats(NoTypeHints)
+}
+
+/** Class to wrap RocksDB's native histogram */
+case class RocksDBNativeHistogram(
+    avg: Double, stddev: Double, median: Double, p95: Double, p99: Double) {
+  def json: String = Serialization.write(this)(RocksDBMetrics.format)
+}
+
+object RocksDBNativeHistogram {
+  def apply(nativeHist: HistogramData): RocksDBNativeHistogram = {
+    RocksDBNativeHistogram(
+      nativeHist.getAverage,
+      nativeHist.getStandardDeviation,
+      nativeHist.getMedian,
+      nativeHist.getPercentile95,
+      nativeHist.getPercentile99)
+  }
+}
+
 case class AcquiredThreadInfo() {
   val threadRef: WeakReference[Thread] = new WeakReference[Thread](Thread.currentThread())
   val tc: TaskContext = TaskContext.get()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
index bfdace3949..23cdbd01bc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -29,6 +29,7 @@ import scala.collection.mutable
 
 import com.fasterxml.jackson.annotation.JsonInclude.Include
 import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize
 import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
 import org.apache.commons.io.{FilenameUtils, IOUtils}
 import org.apache.hadoop.conf.Configuration
@@ -134,6 +135,22 @@ class RocksDBFileManager(
     override def accept(path: Path): Boolean = path.toString.endsWith(".zip")
   }
 
+  /**
+   * Metrics for loading a checkpoint from DFS. Every loadCheckpointFromDfs call updates these
+   * metrics, so this effectively records the latest metrics.
+   */
+  @volatile private var loadCheckpointMetrics = RocksDBFileManagerMetrics.EMPTY_METRICS
+
+  /**
+   * Metrics for saving a checkpoint to DFS. Every saveCheckpointToDfs call updates these
+   * metrics, so this effectively records the latest metrics.
+   */
+  @volatile private var saveCheckpointMetrics = RocksDBFileManagerMetrics.EMPTY_METRICS
+
+  def latestLoadCheckpointMetrics: RocksDBFileManagerMetrics = loadCheckpointMetrics
+
+  def latestSaveCheckpointMetrics: RocksDBFileManagerMetrics = saveCheckpointMetrics
+
   /** Save all the files in given local checkpoint directory as a committed version in DFS */
   def saveCheckpointToDfs(checkpointDir: File, version: Long, numKeys: Long): Unit = {
     logFilesInDir(checkpointDir, s"Saving checkpoint files for version $version")
@@ -336,6 +353,11 @@ class RocksDBFileManager(
       s" DFS for version $version. $filesReused files reused without copying.")
     versionToRocksDBFiles.put(version, immutableFiles)
 
+    saveCheckpointMetrics = RocksDBFileManagerMetrics(
+      bytesCopied = bytesCopied,
+      filesCopied = filesCopied,
+      filesReused = filesReused)
+
     immutableFiles
   }
 
@@ -387,6 +409,11 @@ class RocksDBFileManager(
     }
     logInfo(s"Copied $filesCopied files ($bytesCopied bytes) from DFS to local with " +
       s"$filesReused files reused.")
+
+    loadCheckpointMetrics = RocksDBFileManagerMetrics(
+      bytesCopied = bytesCopied,
+      filesCopied = filesCopied,
+      filesReused = filesReused)
   }
 
   /** Get the SST files required for a version from the version zip file in DFS */
@@ -420,6 +447,9 @@ class RocksDBFileManager(
       }
       zout.close()  // so that any error in closing also cancels the output stream
       logInfo(s"Zipped $totalBytes bytes (before compression) to $filesStr")
+      // The other fields of saveCheckpointMetrics should already have been filled
+      saveCheckpointMetrics =
+        saveCheckpointMetrics.copy(zipFileBytesUncompressed = Some(totalBytes))
     } catch {
       case e: Exception =>
         // Cancel the actual output stream first, so that zout.close() does not write the file
@@ -486,6 +516,23 @@ class RocksDBFileManager(
   }
 }
 
+/**
+ * Metrics regarding RocksDB file sync between local and DFS.
+ */
+case class RocksDBFileManagerMetrics(
+    filesCopied: Long,
+    bytesCopied: Long,
+    filesReused: Long,
+    @JsonDeserialize(contentAs = classOf[java.lang.Long])
+    zipFileBytesUncompressed: Option[Long] = None)
+
+/**
+ * Metrics to return when requested but no operation has been performed.
+ */
+object RocksDBFileManagerMetrics {
+  val EMPTY_METRICS = RocksDBFileManagerMetrics(0L, 0L, 0L, None)
+}
+
 /**
  * Classes to represent metadata of checkpoints saved to DFS. Since this is converted to JSON, any
  * changes to this MUST be backward-compatible.
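
The `metrics` method added to `RocksDB.scala` reads native histograms out of the `Statistics` object that was attached via `dbOptions.setStatistics(new Statistics())` before the database is opened. Below is a standalone sketch of that same RocksJava read path; it is not part of the patch, and the database path and demo key are invented for illustration:

```scala
import org.rocksdb.{HistogramType, Options, RocksDB => NativeRocksDB, Statistics}

object NativeStatsSketch {
  def main(args: Array[String]): Unit = {
    NativeRocksDB.loadLibrary()
    // Statistics must be attached before open(), mirroring
    // dbOptions.setStatistics(new Statistics()) in the patch.
    val options = new Options().setCreateIfMissing(true)
    options.setStatistics(new Statistics())
    val db = NativeRocksDB.open(options, "/tmp/rocksdb-stats-sketch") // invented path
    db.put("key".getBytes, "value".getBytes)
    db.get("key".getBytes)
    // Same read path as RocksDB.metrics: statistics().getHistogramData(DB_GET)
    val hist = options.statistics().getHistogramData(HistogramType.DB_GET)
    println(s"get: avg=${hist.getAverage} p95=${hist.getPercentile95} p99=${hist.getPercentile99}")
    db.close()
  }
}
```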
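The `json` methods on `RocksDBMetrics` and `RocksDBNativeHistogram` use json4s `Serialization` with `NoTypeHints`, which renders nested case classes as plain JSON objects, unwraps `Some`, and drops `None` fields from the output. A minimal sketch with simplified stand-ins (the `Histogram` and `Metrics` names here are invented, not the patch's classes):

```scala
import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization

object MetricsJsonSketch {
  // Simplified, invented stand-ins for RocksDBNativeHistogram and RocksDBMetrics.
  case class Histogram(avg: Double, p95: Double, p99: Double)
  case class Metrics(
      numCommittedKeys: Long,
      nativeOpsLatencyMicros: Map[String, Histogram],
      zipFileBytesUncompressed: Option[Long])

  implicit val format: org.json4s.Formats = Serialization.formats(NoTypeHints)

  def main(args: Array[String]): Unit = {
    val m = Metrics(100L, Map("get" -> Histogram(12.5, 20.0, 25.0)), Some(2048L))
    // Some(2048L) serializes as the bare number 2048; a None field is omitted.
    println(Serialization.write(m))
  }
}
```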
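The `@JsonDeserialize(contentAs = classOf[java.lang.Long])` annotation on `zipFileBytesUncompressed` works around a known jackson-module-scala limitation: type erasure hides `Option`'s `Long` parameter, so without the hint a deserialized `Some` would carry a boxed `Integer` and unboxing it as `Long` would fail at runtime. A sketch of the round trip under that assumption (`WithHint` is an invented stand-in for the annotated field on `RocksDBFileManagerMetrics`):

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import com.fasterxml.jackson.module.scala.DefaultScalaModule

// Invented stand-in mirroring RocksDBFileManagerMetrics.zipFileBytesUncompressed.
case class WithHint(
    @JsonDeserialize(contentAs = classOf[java.lang.Long])
    bytes: Option[Long])

object OptionLongSketch {
  def main(args: Array[String]): Unit = {
    val mapper = new ObjectMapper()
    mapper.registerModule(DefaultScalaModule)
    val parsed = mapper.readValue("""{"bytes": 123}""", classOf[WithHint])
    // With the contentAs hint, the parsed value is a real java.lang.Long,
    // so Long arithmetic on it is safe.
    println(parsed.bytes.map(_ + 1L)) // Some(124)
  }
}
```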