change CleanupIterator to CompletionIterator

This commit is contained in:
Imran Rashid 2013-03-03 16:39:05 -08:00
parent 8fef5b9c5f
commit f1006b99ff
3 changed files with 27 additions and 27 deletions

View file

@ -5,7 +5,7 @@ import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
import spark.storage.{DelegateBlockFetchTracker, BlockManagerId}
import util.{CleanupIterator, TimedIterator}
import util.{CompletionIterator, TimedIterator}
private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Logging {
override def fetch[K, V](shuffleId: Int, reduceId: Int, metrics: TaskMetrics) = {
@ -51,7 +51,7 @@ private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Loggin
val blockFetcherItr = blockManager.getMultiple(blocksByAddress)
val itr = new TimedIterator(blockFetcherItr.flatMap(unpackBlock)) with DelegateBlockFetchTracker
itr.setDelegate(blockFetcherItr)
CleanupIterator[(K,V), Iterator[(K,V)]](itr, {
CompletionIterator[(K,V), Iterator[(K,V)]](itr, {
val shuffleMetrics = new ShuffleReadMetrics
shuffleMetrics.shuffleReadMillis = itr.getNetMillis
shuffleMetrics.remoteFetchTime = itr.remoteFetchTime

View file

@ -1,25 +0,0 @@
package spark.util
/**
* Wrapper around an iterator which calls a cleanup method when its finished iterating through its elements
*/
abstract class CleanupIterator[+A, +I <: Iterator[A]](sub: I) extends Iterator[A]{
def next = sub.next
def hasNext = {
val r = sub.hasNext
if (!r) {
cleanup
}
r
}
def cleanup
}
object CleanupIterator {
def apply[A, I <: Iterator[A]](sub: I, cleanupFunction: => Unit) : CleanupIterator[A,I] = {
new CleanupIterator[A,I](sub) {
def cleanup = cleanupFunction
}
}
}

View file

@ -0,0 +1,25 @@
package spark.util
/**
* Wrapper around an iterator which calls a completion method after it successfully iterates through all the elements
*/
abstract class CompletionIterator[+A, +I <: Iterator[A]](sub: I) extends Iterator[A]{
def next = sub.next
def hasNext = {
val r = sub.hasNext
if (!r) {
completion
}
r
}
def completion()
}
object CompletionIterator {
def apply[A, I <: Iterator[A]](sub: I, completionFunction: => Unit) : CompletionIterator[A,I] = {
new CompletionIterator[A,I](sub) {
def completion() = completionFunction
}
}
}