diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index 5d59e00636..21fe643b8d 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -99,7 +99,6 @@ class TaskMetrics extends Serializable { existingMetrics.fetchWaitTime += newMetrics.fetchWaitTime existingMetrics.localBlocksFetched += newMetrics.localBlocksFetched existingMetrics.remoteBlocksFetched += newMetrics.remoteBlocksFetched - existingMetrics.totalBlocksFetched += newMetrics.totalBlocksFetched existingMetrics.remoteBytesRead += newMetrics.remoteBytesRead case None => _shuffleReadMetrics = Some(newMetrics) @@ -149,7 +148,7 @@ class ShuffleReadMetrics extends Serializable { /** * Number of blocks fetched in this shuffle by this task (remote or local) */ - var totalBlocksFetched: Int = _ + def totalBlocksFetched: Int = remoteBlocksFetched + localBlocksFetched /** * Number of remote blocks fetched in this shuffle by this task diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala index 3795994cd9..9978882898 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala @@ -81,7 +81,6 @@ private[hash] object BlockStoreShuffleFetcher extends Logging { shuffleMetrics.shuffleFinishTime = System.currentTimeMillis shuffleMetrics.fetchWaitTime = blockFetcherItr.fetchWaitTime shuffleMetrics.remoteBytesRead = blockFetcherItr.remoteBytesRead - shuffleMetrics.totalBlocksFetched = blockFetcherItr.totalBlocks shuffleMetrics.localBlocksFetched = blockFetcherItr.numLocalBlocks shuffleMetrics.remoteBlocksFetched = blockFetcherItr.numRemoteBlocks context.taskMetrics.updateShuffleReadMetrics(shuffleMetrics) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala index 2f0296c20f..69905a960a 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala @@ -46,7 +46,6 @@ import org.apache.spark.util.Utils private[storage] trait BlockFetcherIterator extends Iterator[(BlockId, Option[Iterator[Any]])] with Logging { def initialize() - def totalBlocks: Int def numLocalBlocks: Int def numRemoteBlocks: Int def fetchWaitTime: Long @@ -192,7 +191,7 @@ object BlockFetcherIterator { } } logInfo("Getting " + _numBlocksToFetch + " non-empty blocks out of " + - totalBlocks + " blocks") + (numLocal + numRemote) + " blocks") remoteRequests } @@ -235,7 +234,6 @@ object BlockFetcherIterator { logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime) + " ms") } - override def totalBlocks: Int = numLocal + numRemote override def numLocalBlocks: Int = numLocal override def numRemoteBlocks: Int = numRemote override def fetchWaitTime: Long = _fetchWaitTime diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 2ff8b25a56..3448aaaf57 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -237,7 +237,6 @@ private[spark] object JsonProtocol { def shuffleReadMetricsToJson(shuffleReadMetrics: ShuffleReadMetrics): JValue = { ("Shuffle Finish Time" -> shuffleReadMetrics.shuffleFinishTime) ~ - ("Total Blocks Fetched" -> shuffleReadMetrics.totalBlocksFetched) ~ ("Remote Blocks Fetched" -> shuffleReadMetrics.remoteBlocksFetched) ~ ("Local Blocks Fetched" -> shuffleReadMetrics.localBlocksFetched) ~ ("Fetch Wait Time" -> shuffleReadMetrics.fetchWaitTime) ~ @@ -548,7 +547,6 @@ private[spark] object JsonProtocol { def shuffleReadMetricsFromJson(json: JValue): ShuffleReadMetrics = { val metrics = new ShuffleReadMetrics metrics.shuffleFinishTime = (json \ "Shuffle Finish Time").extract[Long] - metrics.totalBlocksFetched = (json \ "Total Blocks Fetched").extract[Int] metrics.remoteBlocksFetched = (json \ "Remote Blocks Fetched").extract[Int] metrics.localBlocksFetched = (json \ "Local Blocks Fetched").extract[Int] metrics.fetchWaitTime = (json \ "Fetch Wait Time").extract[Long] diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 11f70a6090..9305b6d973 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -314,7 +314,6 @@ class JsonProtocolSuite extends FunSuite { private def assertEquals(metrics1: ShuffleReadMetrics, metrics2: ShuffleReadMetrics) { assert(metrics1.shuffleFinishTime === metrics2.shuffleFinishTime) - assert(metrics1.totalBlocksFetched === metrics2.totalBlocksFetched) assert(metrics1.remoteBlocksFetched === metrics2.remoteBlocksFetched) assert(metrics1.localBlocksFetched === metrics2.localBlocksFetched) assert(metrics1.fetchWaitTime === metrics2.fetchWaitTime) @@ -513,7 +512,6 @@ class JsonProtocolSuite extends FunSuite { } else { val sr = new ShuffleReadMetrics sr.shuffleFinishTime = b + c - sr.totalBlocksFetched = e + f sr.remoteBytesRead = b + d sr.localBlocksFetched = e sr.fetchWaitTime = a + d @@ -584,7 +582,6 @@ class JsonProtocolSuite extends FunSuite { | "Memory Bytes Spilled":800,"Disk Bytes Spilled":0, | "Shuffle Read Metrics":{ | "Shuffle Finish Time":900, - | "Total Blocks Fetched":1500, | "Remote Blocks Fetched":800, | "Local Blocks Fetched":700, | "Fetch Wait Time":900,