diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala index 964ab27a52..6b89812cc2 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala @@ -136,12 +136,6 @@ private[spark] class AppStatusStore( store.read(classOf[StageDataWrapper], Array(stageId, stageAttemptId)).locality } - // SPARK-26119: we only want to consider successful tasks when calculating the metrics summary, - // but currently this is very expensive when using a disk store. So we only trigger the slower - // code path when we know we have all data in memory. The following method checks whether all - // the data will be in memory. - private def isInMemoryStore: Boolean = store.isInstanceOf[InMemoryStore] || listener.isDefined - /** * Calculates a summary of the task metrics for the given stage attempt, returning the * requested quantiles for the recorded metrics. @@ -162,21 +156,11 @@ private[spark] class AppStatusStore( // cheaper for disk stores (avoids deserialization). val count = { Utils.tryWithResource( - if (isInMemoryStore) { - // For Live UI, we should count the tasks with status "SUCCESS" only. - store.view(classOf[TaskDataWrapper]) - .parent(stageKey) - .index(TaskIndexNames.STATUS) - .first("SUCCESS") - .last("SUCCESS") - .closeableIterator() - } else { - store.view(classOf[TaskDataWrapper]) - .parent(stageKey) - .index(TaskIndexNames.EXEC_RUN_TIME) - .first(0L) - .closeableIterator() - } + store.view(classOf[TaskDataWrapper]) + .parent(stageKey) + .index(TaskIndexNames.EXEC_RUN_TIME) + .first(0L) + .closeableIterator() ) { it => var _count = 0L while (it.hasNext()) { @@ -245,50 +229,30 @@ private[spark] class AppStatusStore( // stabilize once the stage finishes. It's also slow, especially with disk stores. val indices = quantiles.map { q => math.min((q * count).toLong, count - 1) } - // TODO: Summary metrics needs to display all the successful tasks' metrics (SPARK-26119). - // For InMemory case, it is efficient to find using the following code. But for diskStore case - // we need an efficient solution to avoid deserialization time overhead. For that, we need to - // rework on the way indexing works, so that we can index by specific metrics for successful - // and failed tasks differently (would be tricky). Also would require changing the disk store - // version (to invalidate old stores). def scanTasks(index: String)(fn: TaskDataWrapper => Long): IndexedSeq[Double] = { - if (isInMemoryStore) { - val quantileTasks = store.view(classOf[TaskDataWrapper]) + Utils.tryWithResource( + store.view(classOf[TaskDataWrapper]) .parent(stageKey) .index(index) .first(0L) - .asScala - .filter { _.status == "SUCCESS"} // Filter "SUCCESS" tasks - .toIndexedSeq - - indices.map { index => - fn(quantileTasks(index.toInt)).toDouble - }.toIndexedSeq - } else { - Utils.tryWithResource( - store.view(classOf[TaskDataWrapper]) - .parent(stageKey) - .index(index) - .first(0L) - .closeableIterator() - ) { it => - var last = Double.NaN - var currentIdx = -1L - indices.map { idx => - if (idx == currentIdx) { + .closeableIterator() + ) { it => + var last = Double.NaN + var currentIdx = -1L + indices.map { idx => + if (idx == currentIdx) { + last + } else { + val diff = idx - currentIdx + currentIdx = idx + if (it.skip(diff - 1)) { + last = fn(it.next()).toDouble last } else { - val diff = idx - currentIdx - currentIdx = idx - if (it.skip(diff - 1)) { - last = fn(it.next()).toDouble - last - } else { - Double.NaN - } + Double.NaN } - }.toIndexedSeq - } + } + }.toIndexedSeq } } @@ -582,7 +546,7 @@ private[spark] class AppStatusStore( private[spark] object AppStatusStore { - val CURRENT_VERSION = 1L + val CURRENT_VERSION = 2L /** * Create an in-memory store for a live application. diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala index a0ef8da0a4..5ac7a56b21 100644 --- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala +++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala @@ -184,6 +184,19 @@ private class LiveTask( info.timeRunning(lastUpdateTime.getOrElse(System.currentTimeMillis())) } + val hasMetrics = metrics.executorDeserializeTime >= 0 + + /** + * SPARK-26260: For non successful tasks, store the metrics as negative to avoid + * the calculation in the task summary. `toApi` method in the `TaskDataWrapper` will make + * it actual value. + */ + val taskMetrics: v1.TaskMetrics = if (hasMetrics && !info.successful) { + makeNegative(metrics) + } else { + metrics + } + new TaskDataWrapper( info.taskId, info.index, @@ -199,30 +212,31 @@ private class LiveTask( newAccumulatorInfos(info.accumulables), errorMessage, - metrics.executorDeserializeTime, - metrics.executorDeserializeCpuTime, - metrics.executorRunTime, - metrics.executorCpuTime, - metrics.resultSize, - metrics.jvmGcTime, - metrics.resultSerializationTime, - metrics.memoryBytesSpilled, - metrics.diskBytesSpilled, - metrics.peakExecutionMemory, - metrics.inputMetrics.bytesRead, - metrics.inputMetrics.recordsRead, - metrics.outputMetrics.bytesWritten, - metrics.outputMetrics.recordsWritten, - metrics.shuffleReadMetrics.remoteBlocksFetched, - metrics.shuffleReadMetrics.localBlocksFetched, - metrics.shuffleReadMetrics.fetchWaitTime, - metrics.shuffleReadMetrics.remoteBytesRead, - metrics.shuffleReadMetrics.remoteBytesReadToDisk, - metrics.shuffleReadMetrics.localBytesRead, - metrics.shuffleReadMetrics.recordsRead, - metrics.shuffleWriteMetrics.bytesWritten, - metrics.shuffleWriteMetrics.writeTime, - metrics.shuffleWriteMetrics.recordsWritten, + hasMetrics, + taskMetrics.executorDeserializeTime, + taskMetrics.executorDeserializeCpuTime, + taskMetrics.executorRunTime, + taskMetrics.executorCpuTime, + taskMetrics.resultSize, + taskMetrics.jvmGcTime, + taskMetrics.resultSerializationTime, + taskMetrics.memoryBytesSpilled, + taskMetrics.diskBytesSpilled, + taskMetrics.peakExecutionMemory, + taskMetrics.inputMetrics.bytesRead, + taskMetrics.inputMetrics.recordsRead, + taskMetrics.outputMetrics.bytesWritten, + taskMetrics.outputMetrics.recordsWritten, + taskMetrics.shuffleReadMetrics.remoteBlocksFetched, + taskMetrics.shuffleReadMetrics.localBlocksFetched, + taskMetrics.shuffleReadMetrics.fetchWaitTime, + taskMetrics.shuffleReadMetrics.remoteBytesRead, + taskMetrics.shuffleReadMetrics.remoteBytesReadToDisk, + taskMetrics.shuffleReadMetrics.localBytesRead, + taskMetrics.shuffleReadMetrics.recordsRead, + taskMetrics.shuffleWriteMetrics.bytesWritten, + taskMetrics.shuffleWriteMetrics.writeTime, + taskMetrics.shuffleWriteMetrics.recordsWritten, stageId, stageAttemptId) @@ -710,6 +724,46 @@ private object LiveEntityHelpers { addMetrics(m1, m2, -1) } + /** + * Convert all the metric values to negative as well as handle zero values. + * This method assumes that all the metric values are greater than or equal to zero + */ + def makeNegative(m: v1.TaskMetrics): v1.TaskMetrics = { + // To handle 0 metric value, add 1 and make the metric negative. + // To recover actual value do `math.abs(metric + 1)` + // Eg: if the metric values are (5, 3, 0, 1) => Updated metric values will be (-6, -4, -1, -2) + // To get actual metric value, do math.abs(metric + 1) => (5, 3, 0, 1) + def updateMetricValue(metric: Long): Long = { + metric * -1L - 1L + } + + createMetrics( + updateMetricValue(m.executorDeserializeTime), + updateMetricValue(m.executorDeserializeCpuTime), + updateMetricValue(m.executorRunTime), + updateMetricValue(m.executorCpuTime), + updateMetricValue(m.resultSize), + updateMetricValue(m.jvmGcTime), + updateMetricValue(m.resultSerializationTime), + updateMetricValue(m.memoryBytesSpilled), + updateMetricValue(m.diskBytesSpilled), + updateMetricValue(m.peakExecutionMemory), + updateMetricValue(m.inputMetrics.bytesRead), + updateMetricValue(m.inputMetrics.recordsRead), + updateMetricValue(m.outputMetrics.bytesWritten), + updateMetricValue(m.outputMetrics.recordsWritten), + updateMetricValue(m.shuffleReadMetrics.remoteBlocksFetched), + updateMetricValue(m.shuffleReadMetrics.localBlocksFetched), + updateMetricValue(m.shuffleReadMetrics.fetchWaitTime), + updateMetricValue(m.shuffleReadMetrics.remoteBytesRead), + updateMetricValue(m.shuffleReadMetrics.remoteBytesReadToDisk), + updateMetricValue(m.shuffleReadMetrics.localBytesRead), + updateMetricValue(m.shuffleReadMetrics.recordsRead), + updateMetricValue(m.shuffleWriteMetrics.bytesWritten), + updateMetricValue(m.shuffleWriteMetrics.writeTime), + updateMetricValue(m.shuffleWriteMetrics.recordsWritten)) + } + private def addMetrics(m1: v1.TaskMetrics, m2: v1.TaskMetrics, mult: Int): v1.TaskMetrics = { createMetrics( m1.executorDeserializeTime + m2.executorDeserializeTime * mult, diff --git a/core/src/main/scala/org/apache/spark/status/storeTypes.scala b/core/src/main/scala/org/apache/spark/status/storeTypes.scala index 9da5bea8bf..f0a94d84d8 100644 --- a/core/src/main/scala/org/apache/spark/status/storeTypes.scala +++ b/core/src/main/scala/org/apache/spark/status/storeTypes.scala @@ -177,10 +177,13 @@ private[spark] class TaskDataWrapper( val accumulatorUpdates: Seq[AccumulableInfo], val errorMessage: Option[String], + val hasMetrics: Boolean, // The following is an exploded view of a TaskMetrics API object. This saves 5 objects - // (= 80 bytes of Java object overhead) per instance of this wrapper. If the first value - // (executorDeserializeTime) is -1L, it means the metrics for this task have not been - // recorded. + // (= 80 bytes of Java object overhead) per instance of this wrapper. Non successful + // tasks' metrics will have negative values in `TaskDataWrapper`. `TaskData` will have + // actual metric values. To recover the actual metric value from `TaskDataWrapper`, + // need use `getMetricValue` method. If `hasMetrics` is false, it means the metrics + // for this task have not been recorded. @KVIndexParam(value = TaskIndexNames.DESER_TIME, parent = TaskIndexNames.STAGE) val executorDeserializeTime: Long, @KVIndexParam(value = TaskIndexNames.DESER_CPU_TIME, parent = TaskIndexNames.STAGE) @@ -233,39 +236,46 @@ private[spark] class TaskDataWrapper( val stageId: Int, val stageAttemptId: Int) { - def hasMetrics: Boolean = executorDeserializeTime >= 0 + // SPARK-26260: To handle non successful tasks metrics (Running, Failed, Killed). + private def getMetricValue(metric: Long): Long = { + if (status != "SUCCESS") { + math.abs(metric + 1) + } else { + metric + } + } def toApi: TaskData = { val metrics = if (hasMetrics) { Some(new TaskMetrics( - executorDeserializeTime, - executorDeserializeCpuTime, - executorRunTime, - executorCpuTime, - resultSize, - jvmGcTime, - resultSerializationTime, - memoryBytesSpilled, - diskBytesSpilled, - peakExecutionMemory, + getMetricValue(executorDeserializeTime), + getMetricValue(executorDeserializeCpuTime), + getMetricValue(executorRunTime), + getMetricValue(executorCpuTime), + getMetricValue(resultSize), + getMetricValue(jvmGcTime), + getMetricValue(resultSerializationTime), + getMetricValue(memoryBytesSpilled), + getMetricValue(diskBytesSpilled), + getMetricValue(peakExecutionMemory), new InputMetrics( - inputBytesRead, - inputRecordsRead), + getMetricValue(inputBytesRead), + getMetricValue(inputRecordsRead)), new OutputMetrics( - outputBytesWritten, - outputRecordsWritten), + getMetricValue(outputBytesWritten), + getMetricValue(outputRecordsWritten)), new ShuffleReadMetrics( - shuffleRemoteBlocksFetched, - shuffleLocalBlocksFetched, - shuffleFetchWaitTime, - shuffleRemoteBytesRead, - shuffleRemoteBytesReadToDisk, - shuffleLocalBytesRead, - shuffleRecordsRead), + getMetricValue(shuffleRemoteBlocksFetched), + getMetricValue(shuffleLocalBlocksFetched), + getMetricValue(shuffleFetchWaitTime), + getMetricValue(shuffleRemoteBytesRead), + getMetricValue(shuffleRemoteBytesReadToDisk), + getMetricValue(shuffleLocalBytesRead), + getMetricValue(shuffleRecordsRead)), new ShuffleWriteMetrics( - shuffleBytesWritten, - shuffleWriteTime, - shuffleRecordsWritten))) + getMetricValue(shuffleBytesWritten), + getMetricValue(shuffleWriteTime), + getMetricValue(shuffleRecordsWritten)))) } else { None } @@ -296,8 +306,10 @@ private[spark] class TaskDataWrapper( @JsonIgnore @KVIndex(value = TaskIndexNames.SCHEDULER_DELAY, parent = TaskIndexNames.STAGE) def schedulerDelay: Long = { if (hasMetrics) { - AppStatusUtils.schedulerDelay(launchTime, resultFetchStart, duration, executorDeserializeTime, - resultSerializationTime, executorRunTime) + AppStatusUtils.schedulerDelay(launchTime, resultFetchStart, duration, + getMetricValue(executorDeserializeTime), + getMetricValue(resultSerializationTime), + getMetricValue(executorRunTime)) } else { -1L } @@ -330,7 +342,7 @@ private[spark] class TaskDataWrapper( @JsonIgnore @KVIndex(value = TaskIndexNames.SHUFFLE_TOTAL_READS, parent = TaskIndexNames.STAGE) private def shuffleTotalReads: Long = { if (hasMetrics) { - shuffleLocalBytesRead + shuffleRemoteBytesRead + getMetricValue(shuffleLocalBytesRead) + getMetricValue(shuffleRemoteBytesRead) } else { -1L } @@ -339,7 +351,7 @@ private[spark] class TaskDataWrapper( @JsonIgnore @KVIndex(value = TaskIndexNames.SHUFFLE_TOTAL_BLOCKS, parent = TaskIndexNames.STAGE) private def shuffleTotalBlocks: Long = { if (hasMetrics) { - shuffleLocalBlocksFetched + shuffleRemoteBlocksFetched + getMetricValue(shuffleLocalBlocksFetched) + getMetricValue(shuffleRemoteBlocksFetched) } else { -1L } diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala index 165fdb71cc..735e519426 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala @@ -18,7 +18,9 @@ package org.apache.spark.status import org.apache.spark.{SparkConf, SparkFunSuite} -import org.apache.spark.util.Distribution +import org.apache.spark.executor.TaskMetrics +import org.apache.spark.scheduler.{TaskInfo, TaskLocality} +import org.apache.spark.util.{Distribution, Utils} import org.apache.spark.util.kvstore._ class AppStatusStoreSuite extends SparkFunSuite { @@ -76,42 +78,61 @@ class AppStatusStoreSuite extends SparkFunSuite { assert(store.count(classOf[CachedQuantile]) === 2) } - private def createLiveStore(inMemoryStore: InMemoryStore): AppStatusStore = { + private def createAppStore(disk: Boolean, live: Boolean): AppStatusStore = { val conf = new SparkConf() - val store = new ElementTrackingStore(inMemoryStore, conf) - val listener = new AppStatusListener(store, conf, true, None) - new AppStatusStore(store, listener = Some(listener)) - } - - test("SPARK-28638: only successful tasks have taskSummary when with in memory kvstore") { - val store = new InMemoryStore() - (0 until 5).foreach { i => store.write(newTaskData(i, status = "FAILED")) } - Seq(new AppStatusStore(store), createLiveStore(store)).foreach { appStore => - val summary = appStore.taskSummary(stageId, attemptId, uiQuantiles) - assert(summary.size === 0) + if (live) { + return AppStatusStore.createLiveStore(conf) } + + val store: KVStore = if (disk) { + val testDir = Utils.createTempDir() + val diskStore = KVUtils.open(testDir, getClass.getName) + new ElementTrackingStore(diskStore, conf) + } else { + new ElementTrackingStore(new InMemoryStore, conf) + } + new AppStatusStore(store) } - test("SPARK-28638: summary should contain successful tasks only when with in memory kvstore") { - val store = new InMemoryStore() + Seq( + "disk" -> createAppStore(disk = true, live = false), + "in memory" -> createAppStore(disk = false, live = false), + "in memory live" -> createAppStore(disk = false, live = true) + ).foreach { case (hint, appStore) => + test(s"SPARK-26260: summary should contain only successful tasks' metrics (store = $hint)") { + val store = appStore.store - for (i <- 0 to 5) { - if (i % 2 == 1) { - store.write(newTaskData(i, status = "FAILED")) - } else { - store.write(newTaskData(i)) + // Success and failed tasks metrics + for (i <- 0 to 5) { + if (i % 2 == 0) { + writeTaskDataToStore(i, store, "FAILED") + } else { + writeTaskDataToStore(i, store, "SUCCESS") + } } - } - Seq(new AppStatusStore(store), createLiveStore(store)).foreach { appStore => + // Running tasks metrics (-1 = no metrics reported, positive = metrics have been reported) + Seq(-1, 6).foreach { metric => + writeTaskDataToStore(metric, store, "RUNNING") + } + + /** + * Following are the tasks metrics, + * 1, 3, 5 => Success + * 0, 2, 4 => Failed + * -1, 6 => Running + * + * Task summary will consider (1, 3, 5) only + */ val summary = appStore.taskSummary(stageId, attemptId, uiQuantiles).get - val values = Array(0.0, 2.0, 4.0) + val values = Array(1.0, 3.0, 5.0) val dist = new Distribution(values, 0, values.length).getQuantiles(uiQuantiles.sorted) dist.zip(summary.executorRunTime).foreach { case (expected, actual) => assert(expected === actual) } + appStore.close() } } @@ -133,9 +154,54 @@ class AppStatusStoreSuite extends SparkFunSuite { private def newTaskData(i: Int, status: String = "SUCCESS"): TaskDataWrapper = { new TaskDataWrapper( - i, i, i, i, i, i, i.toString, i.toString, status, i.toString, false, Nil, None, + i.toLong, i, i, i, i, i, i.toString, i.toString, status, i.toString, false, Nil, None, true, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, stageId, attemptId) } + + private def writeTaskDataToStore(i: Int, store: KVStore, status: String): Unit = { + val liveTask = new LiveTask(new TaskInfo( i.toLong, i, i, i.toLong, i.toString, + i.toString, TaskLocality.ANY, false), stageId, attemptId, None) + + if (status == "SUCCESS") { + liveTask.info.finishTime = 1L + } else if (status == "FAILED") { + liveTask.info.failed = true + liveTask.info.finishTime = 1L + } + + val taskMetrics = getTaskMetrics(i) + liveTask.updateMetrics(taskMetrics) + liveTask.write(store.asInstanceOf[ElementTrackingStore], 1L) + } + + private def getTaskMetrics(i: Int): TaskMetrics = { + val taskMetrics = new TaskMetrics() + taskMetrics.setExecutorDeserializeTime(i) + taskMetrics.setExecutorDeserializeCpuTime(i) + taskMetrics.setExecutorRunTime(i) + taskMetrics.setExecutorCpuTime(i) + taskMetrics.setResultSize(i) + taskMetrics.setJvmGCTime(i) + taskMetrics.setResultSerializationTime(i) + taskMetrics.incMemoryBytesSpilled(i) + taskMetrics.incDiskBytesSpilled(i) + taskMetrics.incPeakExecutionMemory(i) + taskMetrics.inputMetrics.incBytesRead(i) + taskMetrics.inputMetrics.incRecordsRead(i) + taskMetrics.outputMetrics.setBytesWritten(i) + taskMetrics.outputMetrics.setRecordsWritten(i) + taskMetrics.shuffleReadMetrics.incRemoteBlocksFetched(i) + taskMetrics.shuffleReadMetrics.incLocalBlocksFetched(i) + taskMetrics.shuffleReadMetrics.incFetchWaitTime(i) + taskMetrics.shuffleReadMetrics.incRemoteBytesRead(i) + taskMetrics.shuffleReadMetrics.incRemoteBytesReadToDisk(i) + taskMetrics.shuffleReadMetrics.incLocalBytesRead(i) + taskMetrics.shuffleReadMetrics.incRecordsRead(i) + taskMetrics.shuffleWriteMetrics.incBytesWritten(i) + taskMetrics.shuffleWriteMetrics.incWriteTime(i) + taskMetrics.shuffleWriteMetrics.incRecordsWritten(i) + taskMetrics + } }