SPARK-2564. ShuffleReadMetrics.totalBlocksRead is redundant

Author: Sandy Ryza <sandy@cloudera.com>

Closes #1474 from sryza/sandy-spark-2564 and squashes the following commits:

35b8388 [Sandy Ryza] Fix compile error on upmerge
7b985fb [Sandy Ryza] Fix test compile error
43f79e6 [Sandy Ryza] SPARK-2564. ShuffleReadMetrics.totalBlocksRead is redundant
This commit is contained in:
Sandy Ryza 2014-07-20 14:45:34 -07:00 committed by Aaron Davidson
parent 1b10b8114a
commit 9564f85489
5 changed files with 2 additions and 11 deletions

View file

@ -99,7 +99,6 @@ class TaskMetrics extends Serializable {
existingMetrics.fetchWaitTime += newMetrics.fetchWaitTime existingMetrics.fetchWaitTime += newMetrics.fetchWaitTime
existingMetrics.localBlocksFetched += newMetrics.localBlocksFetched existingMetrics.localBlocksFetched += newMetrics.localBlocksFetched
existingMetrics.remoteBlocksFetched += newMetrics.remoteBlocksFetched existingMetrics.remoteBlocksFetched += newMetrics.remoteBlocksFetched
existingMetrics.totalBlocksFetched += newMetrics.totalBlocksFetched
existingMetrics.remoteBytesRead += newMetrics.remoteBytesRead existingMetrics.remoteBytesRead += newMetrics.remoteBytesRead
case None => case None =>
_shuffleReadMetrics = Some(newMetrics) _shuffleReadMetrics = Some(newMetrics)
@ -149,7 +148,7 @@ class ShuffleReadMetrics extends Serializable {
/** /**
* Number of blocks fetched in this shuffle by this task (remote or local) * Number of blocks fetched in this shuffle by this task (remote or local)
*/ */
var totalBlocksFetched: Int = _ def totalBlocksFetched: Int = remoteBlocksFetched + localBlocksFetched
/** /**
* Number of remote blocks fetched in this shuffle by this task * Number of remote blocks fetched in this shuffle by this task

View file

@ -81,7 +81,6 @@ private[hash] object BlockStoreShuffleFetcher extends Logging {
shuffleMetrics.shuffleFinishTime = System.currentTimeMillis shuffleMetrics.shuffleFinishTime = System.currentTimeMillis
shuffleMetrics.fetchWaitTime = blockFetcherItr.fetchWaitTime shuffleMetrics.fetchWaitTime = blockFetcherItr.fetchWaitTime
shuffleMetrics.remoteBytesRead = blockFetcherItr.remoteBytesRead shuffleMetrics.remoteBytesRead = blockFetcherItr.remoteBytesRead
shuffleMetrics.totalBlocksFetched = blockFetcherItr.totalBlocks
shuffleMetrics.localBlocksFetched = blockFetcherItr.numLocalBlocks shuffleMetrics.localBlocksFetched = blockFetcherItr.numLocalBlocks
shuffleMetrics.remoteBlocksFetched = blockFetcherItr.numRemoteBlocks shuffleMetrics.remoteBlocksFetched = blockFetcherItr.numRemoteBlocks
context.taskMetrics.updateShuffleReadMetrics(shuffleMetrics) context.taskMetrics.updateShuffleReadMetrics(shuffleMetrics)

View file

@ -46,7 +46,6 @@ import org.apache.spark.util.Utils
private[storage] private[storage]
trait BlockFetcherIterator extends Iterator[(BlockId, Option[Iterator[Any]])] with Logging { trait BlockFetcherIterator extends Iterator[(BlockId, Option[Iterator[Any]])] with Logging {
def initialize() def initialize()
def totalBlocks: Int
def numLocalBlocks: Int def numLocalBlocks: Int
def numRemoteBlocks: Int def numRemoteBlocks: Int
def fetchWaitTime: Long def fetchWaitTime: Long
@ -192,7 +191,7 @@ object BlockFetcherIterator {
} }
} }
logInfo("Getting " + _numBlocksToFetch + " non-empty blocks out of " + logInfo("Getting " + _numBlocksToFetch + " non-empty blocks out of " +
totalBlocks + " blocks") (numLocal + numRemote) + " blocks")
remoteRequests remoteRequests
} }
@ -235,7 +234,6 @@ object BlockFetcherIterator {
logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime) + " ms") logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime) + " ms")
} }
override def totalBlocks: Int = numLocal + numRemote
override def numLocalBlocks: Int = numLocal override def numLocalBlocks: Int = numLocal
override def numRemoteBlocks: Int = numRemote override def numRemoteBlocks: Int = numRemote
override def fetchWaitTime: Long = _fetchWaitTime override def fetchWaitTime: Long = _fetchWaitTime

View file

@ -237,7 +237,6 @@ private[spark] object JsonProtocol {
def shuffleReadMetricsToJson(shuffleReadMetrics: ShuffleReadMetrics): JValue = { def shuffleReadMetricsToJson(shuffleReadMetrics: ShuffleReadMetrics): JValue = {
("Shuffle Finish Time" -> shuffleReadMetrics.shuffleFinishTime) ~ ("Shuffle Finish Time" -> shuffleReadMetrics.shuffleFinishTime) ~
("Total Blocks Fetched" -> shuffleReadMetrics.totalBlocksFetched) ~
("Remote Blocks Fetched" -> shuffleReadMetrics.remoteBlocksFetched) ~ ("Remote Blocks Fetched" -> shuffleReadMetrics.remoteBlocksFetched) ~
("Local Blocks Fetched" -> shuffleReadMetrics.localBlocksFetched) ~ ("Local Blocks Fetched" -> shuffleReadMetrics.localBlocksFetched) ~
("Fetch Wait Time" -> shuffleReadMetrics.fetchWaitTime) ~ ("Fetch Wait Time" -> shuffleReadMetrics.fetchWaitTime) ~
@ -548,7 +547,6 @@ private[spark] object JsonProtocol {
def shuffleReadMetricsFromJson(json: JValue): ShuffleReadMetrics = { def shuffleReadMetricsFromJson(json: JValue): ShuffleReadMetrics = {
val metrics = new ShuffleReadMetrics val metrics = new ShuffleReadMetrics
metrics.shuffleFinishTime = (json \ "Shuffle Finish Time").extract[Long] metrics.shuffleFinishTime = (json \ "Shuffle Finish Time").extract[Long]
metrics.totalBlocksFetched = (json \ "Total Blocks Fetched").extract[Int]
metrics.remoteBlocksFetched = (json \ "Remote Blocks Fetched").extract[Int] metrics.remoteBlocksFetched = (json \ "Remote Blocks Fetched").extract[Int]
metrics.localBlocksFetched = (json \ "Local Blocks Fetched").extract[Int] metrics.localBlocksFetched = (json \ "Local Blocks Fetched").extract[Int]
metrics.fetchWaitTime = (json \ "Fetch Wait Time").extract[Long] metrics.fetchWaitTime = (json \ "Fetch Wait Time").extract[Long]

View file

@ -314,7 +314,6 @@ class JsonProtocolSuite extends FunSuite {
private def assertEquals(metrics1: ShuffleReadMetrics, metrics2: ShuffleReadMetrics) { private def assertEquals(metrics1: ShuffleReadMetrics, metrics2: ShuffleReadMetrics) {
assert(metrics1.shuffleFinishTime === metrics2.shuffleFinishTime) assert(metrics1.shuffleFinishTime === metrics2.shuffleFinishTime)
assert(metrics1.totalBlocksFetched === metrics2.totalBlocksFetched)
assert(metrics1.remoteBlocksFetched === metrics2.remoteBlocksFetched) assert(metrics1.remoteBlocksFetched === metrics2.remoteBlocksFetched)
assert(metrics1.localBlocksFetched === metrics2.localBlocksFetched) assert(metrics1.localBlocksFetched === metrics2.localBlocksFetched)
assert(metrics1.fetchWaitTime === metrics2.fetchWaitTime) assert(metrics1.fetchWaitTime === metrics2.fetchWaitTime)
@ -513,7 +512,6 @@ class JsonProtocolSuite extends FunSuite {
} else { } else {
val sr = new ShuffleReadMetrics val sr = new ShuffleReadMetrics
sr.shuffleFinishTime = b + c sr.shuffleFinishTime = b + c
sr.totalBlocksFetched = e + f
sr.remoteBytesRead = b + d sr.remoteBytesRead = b + d
sr.localBlocksFetched = e sr.localBlocksFetched = e
sr.fetchWaitTime = a + d sr.fetchWaitTime = a + d
@ -584,7 +582,6 @@ class JsonProtocolSuite extends FunSuite {
| "Memory Bytes Spilled":800,"Disk Bytes Spilled":0, | "Memory Bytes Spilled":800,"Disk Bytes Spilled":0,
| "Shuffle Read Metrics":{ | "Shuffle Read Metrics":{
| "Shuffle Finish Time":900, | "Shuffle Finish Time":900,
| "Total Blocks Fetched":1500,
| "Remote Blocks Fetched":800, | "Remote Blocks Fetched":800,
| "Local Blocks Fetched":700, | "Local Blocks Fetched":700,
| "Fetch Wait Time":900, | "Fetch Wait Time":900,