[SPARK-2571] Correctly report shuffle read metrics.
Currently, shuffle read metrics are reported incorrectly when a stage has multiple shuffle dependencies: they are set to the metrics from just one of the shuffle dependencies, rather than the accumulated metrics from all of them. This commit fixes that problem, and should probably be back-ported to the 0.9 branch. Thanks ryanra for discovering this problem! cc rxin andrewor14 Author: Kay Ousterhout <kayousterhout@gmail.com> Closes #1476 from kayousterhout/join_bug and squashes the following commits: 0203a16 [Kay Ousterhout] Fix broken unit tests. f463c2e [Kay Ousterhout] [SPARK-2571] Correctly report shuffle read metrics.
This commit is contained in:
parent
7f17208137
commit
7b971b91ca
|
@ -75,7 +75,9 @@ class TaskMetrics extends Serializable {
|
|||
/**
|
||||
* If this task reads from shuffle output, metrics on getting shuffle data will be collected here
|
||||
*/
|
||||
var shuffleReadMetrics: Option[ShuffleReadMetrics] = None
|
||||
private var _shuffleReadMetrics: Option[ShuffleReadMetrics] = None
|
||||
|
||||
/** Read-only accessor for this task's shuffle read metrics; None until some have been set. */
def shuffleReadMetrics: Option[ShuffleReadMetrics] = _shuffleReadMetrics
|
||||
|
||||
/**
|
||||
* If this task writes to shuffle output, metrics on the written shuffle data will be collected
|
||||
|
@ -87,6 +89,22 @@ class TaskMetrics extends Serializable {
|
|||
* Storage statuses of any blocks that have been updated as a result of this task.
|
||||
*/
|
||||
var updatedBlocks: Option[Seq[(BlockId, BlockStatus)]] = None
|
||||
|
||||
/**
 * Adds the given ShuffleReadMetrics to any existing shuffle read metrics for this task.
 *
 * If no metrics have been recorded yet, `newMetrics` itself is stored (by reference, not as a
 * copy), and later calls accumulate into that same object in place. Otherwise the values are
 * merged into the existing metrics: shuffleFinishTime becomes the larger of the two values,
 * and every other field is summed.
 *
 * The whole update runs inside `synchronized`.
 *
 * @param newMetrics metrics to merge into this task's shuffle read metrics
 */
def updateShuffleReadMetrics(newMetrics: ShuffleReadMetrics): Unit = synchronized {
  _shuffleReadMetrics match {
    case Some(existingMetrics) =>
      // The finish time is a point in time, not a quantity: keep the latest one.
      existingMetrics.shuffleFinishTime = math.max(
        existingMetrics.shuffleFinishTime, newMetrics.shuffleFinishTime)
      // Everything else is a count or a duration, so it accumulates.
      existingMetrics.fetchWaitTime += newMetrics.fetchWaitTime
      existingMetrics.localBlocksFetched += newMetrics.localBlocksFetched
      existingMetrics.remoteBlocksFetched += newMetrics.remoteBlocksFetched
      existingMetrics.totalBlocksFetched += newMetrics.totalBlocksFetched
      existingMetrics.remoteBytesRead += newMetrics.remoteBytesRead
    case None =>
      // First metrics seen for this task: adopt the caller's object as the accumulator.
      _shuffleReadMetrics = Some(newMetrics)
  }
}
|
||||
}
|
||||
|
||||
private[spark] object TaskMetrics {
|
||||
|
|
|
@ -84,7 +84,7 @@ private[hash] object BlockStoreShuffleFetcher extends Logging {
|
|||
shuffleMetrics.totalBlocksFetched = blockFetcherItr.totalBlocks
|
||||
shuffleMetrics.localBlocksFetched = blockFetcherItr.numLocalBlocks
|
||||
shuffleMetrics.remoteBlocksFetched = blockFetcherItr.numRemoteBlocks
|
||||
context.taskMetrics.shuffleReadMetrics = Some(shuffleMetrics)
|
||||
context.taskMetrics.updateShuffleReadMetrics(shuffleMetrics)
|
||||
})
|
||||
|
||||
new InterruptibleIterator[T](context, completionIter)
|
||||
|
|
|
@ -527,8 +527,9 @@ private[spark] object JsonProtocol {
|
|||
metrics.resultSerializationTime = (json \ "Result Serialization Time").extract[Long]
|
||||
metrics.memoryBytesSpilled = (json \ "Memory Bytes Spilled").extract[Long]
|
||||
metrics.diskBytesSpilled = (json \ "Disk Bytes Spilled").extract[Long]
|
||||
metrics.shuffleReadMetrics =
|
||||
Utils.jsonOption(json \ "Shuffle Read Metrics").map(shuffleReadMetricsFromJson)
|
||||
Utils.jsonOption(json \ "Shuffle Read Metrics").map { shuffleReadMetrics =>
|
||||
metrics.updateShuffleReadMetrics(shuffleReadMetricsFromJson(shuffleReadMetrics))
|
||||
}
|
||||
metrics.shuffleWriteMetrics =
|
||||
Utils.jsonOption(json \ "Shuffle Write Metrics").map(shuffleWriteMetricsFromJson)
|
||||
metrics.inputMetrics =
|
||||
|
|
|
@ -258,8 +258,8 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
|
|||
if (stageInfo.rddInfos.exists(_.name == d4.name)) {
|
||||
taskMetrics.shuffleReadMetrics should be ('defined)
|
||||
val sm = taskMetrics.shuffleReadMetrics.get
|
||||
sm.totalBlocksFetched should be > (0)
|
||||
sm.localBlocksFetched should be > (0)
|
||||
sm.totalBlocksFetched should be (128)
|
||||
sm.localBlocksFetched should be (128)
|
||||
sm.remoteBlocksFetched should be (0)
|
||||
sm.remoteBytesRead should be (0l)
|
||||
}
|
||||
|
|
|
@ -63,7 +63,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
|
|||
|
||||
// finish this task, should get updated shuffleRead
|
||||
shuffleReadMetrics.remoteBytesRead = 1000
|
||||
taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics)
|
||||
taskMetrics.updateShuffleReadMetrics(shuffleReadMetrics)
|
||||
var taskInfo = new TaskInfo(1234L, 0, 1, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false)
|
||||
taskInfo.finishTime = 1
|
||||
var task = new ShuffleMapTask(0, null, null, 0, null)
|
||||
|
@ -81,8 +81,6 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
|
|||
assert(listener.stageIdToData.size === 1)
|
||||
|
||||
// finish this task, should get updated duration
|
||||
shuffleReadMetrics.remoteBytesRead = 1000
|
||||
taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics)
|
||||
taskInfo = new TaskInfo(1235L, 0, 1, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false)
|
||||
taskInfo.finishTime = 1
|
||||
task = new ShuffleMapTask(0, null, null, 0, null)
|
||||
|
@ -91,8 +89,6 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
|
|||
.shuffleRead === 2000)
|
||||
|
||||
// finish this task, should get updated duration
|
||||
shuffleReadMetrics.remoteBytesRead = 1000
|
||||
taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics)
|
||||
taskInfo = new TaskInfo(1236L, 0, 2, 0L, "exe-2", "host1", TaskLocality.NODE_LOCAL, false)
|
||||
taskInfo.finishTime = 1
|
||||
task = new ShuffleMapTask(0, null, null, 0, null)
|
||||
|
|
|
@ -518,7 +518,7 @@ class JsonProtocolSuite extends FunSuite {
|
|||
sr.localBlocksFetched = e
|
||||
sr.fetchWaitTime = a + d
|
||||
sr.remoteBlocksFetched = f
|
||||
t.shuffleReadMetrics = Some(sr)
|
||||
t.updateShuffleReadMetrics(sr)
|
||||
}
|
||||
sw.shuffleBytesWritten = a + b + c
|
||||
sw.shuffleWriteTime = b + c + d
|
||||
|
|
Loading…
Reference in a new issue