Merge branch 'master' of github.com:mesos/spark

This commit is contained in:
Reynold Xin 2013-05-01 14:03:24 -07:00
commit 1deee67615
5 changed files with 9 additions and 61 deletions

View file

@@ -4,8 +4,8 @@ import executor.{ShuffleReadMetrics, TaskMetrics}
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
import spark.storage.{DelegateBlockFetchTracker, BlockManagerId}
import util.{CompletionIterator, TimedIterator}
import spark.storage.BlockManagerId
import spark.util.CompletionIterator
private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Logging {
override def fetch[K, V](shuffleId: Int, reduceId: Int, metrics: TaskMetrics) = {
@@ -49,17 +49,15 @@ private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Loggin
}
val blockFetcherItr = blockManager.getMultiple(blocksByAddress)
val itr = new TimedIterator(blockFetcherItr.flatMap(unpackBlock)) with DelegateBlockFetchTracker
itr.setDelegate(blockFetcherItr)
val itr = blockFetcherItr.flatMap(unpackBlock)
CompletionIterator[(K,V), Iterator[(K,V)]](itr, {
val shuffleMetrics = new ShuffleReadMetrics
shuffleMetrics.shuffleReadMillis = itr.getNetMillis
shuffleMetrics.remoteFetchTime = itr.remoteFetchTime
shuffleMetrics.fetchWaitTime = itr.fetchWaitTime
shuffleMetrics.remoteBytesRead = itr.remoteBytesRead
shuffleMetrics.totalBlocksFetched = itr.totalBlocks
shuffleMetrics.localBlocksFetched = itr.numLocalBlocks
shuffleMetrics.remoteBlocksFetched = itr.numRemoteBlocks
shuffleMetrics.remoteFetchTime = blockFetcherItr.remoteFetchTime
shuffleMetrics.fetchWaitTime = blockFetcherItr.fetchWaitTime
shuffleMetrics.remoteBytesRead = blockFetcherItr.remoteBytesRead
shuffleMetrics.totalBlocksFetched = blockFetcherItr.totalBlocks
shuffleMetrics.localBlocksFetched = blockFetcherItr.numLocalBlocks
shuffleMetrics.remoteBlocksFetched = blockFetcherItr.numRemoteBlocks
metrics.shuffleReadMetrics = Some(shuffleMetrics)
})
}

View file

@@ -48,11 +48,6 @@ class ShuffleReadMetrics extends Serializable {
*/
var localBlocksFetched: Int = _
/**
* Total time to read shuffle data
*/
var shuffleReadMillis: Long = _
/**
* Total time that is spent blocked waiting for shuffle to fetch data
*/

View file

@@ -1,12 +0,0 @@
package spark.storage
/**
 * A BlockFetchTracker that forwards every tracking query to a delegate tracker
 * installed via `setDelegate`.
 *
 * NOTE(review): `delegate` is null until `setDelegate` is called — callers must
 * install a delegate before querying any of the forwarded metrics below, or each
 * accessor will throw a NullPointerException.
 */
private[spark] trait DelegateBlockFetchTracker extends BlockFetchTracker {
  var delegate: BlockFetchTracker = _
  // Explicit `: Unit =` instead of the deprecated procedure syntax `def f(...) { ... }`.
  def setDelegate(d: BlockFetchTracker): Unit = { delegate = d }
  def totalBlocks = delegate.totalBlocks
  def numLocalBlocks = delegate.numLocalBlocks
  def numRemoteBlocks = delegate.numRemoteBlocks
  def remoteFetchTime = delegate.remoteFetchTime
  def fetchWaitTime = delegate.fetchWaitTime
  def remoteBytesRead = delegate.remoteBytesRead
}

View file

@@ -1,32 +0,0 @@
package spark.util
/**
 * A utility for tracking the total time an iterator takes to iterate through its
 * elements.
 *
 * In general, this should only be used if you expect it to take a considerable
 * amount of time (eg. milliseconds) to get each element -- otherwise, the timing
 * won't be very accurate, and you are probably just adding more overhead.
 *
 * @param sub the underlying iterator whose hasNext/next calls are timed
 * @tparam A the element type of the underlying iterator
 */
class TimedIterator[+A](val sub: Iterator[A]) extends Iterator[A] {
  // Use `0L` -- a lowercase `l` suffix is easily misread as the digit 1.
  private var netMillis = 0L
  private var nElems = 0

  override def hasNext: Boolean = {
    val start = System.currentTimeMillis()
    val r = sub.hasNext
    val end = System.currentTimeMillis()
    netMillis += (end - start)
    r
  }

  // `next()` keeps its parentheses: it mutates internal timing state, and it
  // overrides Iterator.next(), which is declared with an empty parameter list.
  override def next(): A = {
    val start = System.currentTimeMillis()
    val r = sub.next()
    val end = System.currentTimeMillis()
    netMillis += (end - start)
    nElems += 1
    r
  }

  /** Total wall-clock milliseconds spent inside hasNext/next so far. */
  def getNetMillis: Long = netMillis

  /**
   * Average milliseconds per element consumed so far.
   * Returns Double.NaN when no elements have been consumed yet (0 / 0.0).
   */
  def getAverageTimePerItem: Double = netMillis / nElems.toDouble
}

View file

@@ -57,7 +57,6 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
taskMetrics.shuffleReadMetrics should be ('defined)
val sm = taskMetrics.shuffleReadMetrics.get
sm.totalBlocksFetched should be > (0)
sm.shuffleReadMillis should be > (0l)
sm.localBlocksFetched should be > (0)
sm.remoteBlocksFetched should be (0)
sm.remoteBytesRead should be (0l)