Changed multi-get method in BlockManager to return an iterator

This commit is contained in:
Matei Zaharia 2012-08-12 19:18:01 +02:00
parent 3c94e5c188
commit ad8a7612a4
2 changed files with 13 additions and 9 deletions

View file

@ -34,8 +34,6 @@ class BlockStoreShuffleFetcher extends ShuffleFetcher with Logging {
try {
val blockOptions = blockManager.get(blocksByAddress)
logDebug("Fetching map output blocks for shuffle %d, reduce %d took %d ms".format(
shuffleId, reduceId, System.currentTimeMillis - startTime))
blockOptions.foreach(x => {
val (blockId, blockOption) = x
blockOption match {
@ -65,5 +63,7 @@ class BlockStoreShuffleFetcher extends ShuffleFetcher with Logging {
}
}
}
logDebug("Fetching and merging outputs of shuffle %d, reduce %d took %d ms".format(
shuffleId, reduceId, System.currentTimeMillis - startTime))
}
}

View file

@ -6,6 +6,7 @@ import java.nio.channels.FileChannel.MapMode
import java.util.{HashMap => JHashMap}
import java.util.LinkedHashMap
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.LinkedBlockingQueue
import java.util.Collections
import scala.actors._
@ -261,15 +262,18 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
/**
* Get many blocks from local and remote block manager using their BlockManagerIds.
* Get multiple blocks from local and remote block manager using their BlockManagerIds. Returns
* an Iterator of (block ID, value) pairs so that clients may handle blocks in a pipelined
* fashion as they're received.
*/
def get(blocksByAddress: Seq[(BlockManagerId, Seq[String])]): HashMap[String, Option[Iterator[Any]]] = {
def get(blocksByAddress: Seq[(BlockManagerId, Seq[String])]): Iterator[(String, Option[Iterator[Any]])] = {
if (blocksByAddress == null) {
throw new IllegalArgumentException("BlocksByAddress is null")
}
logDebug("Getting " + blocksByAddress.map(_._2.size).sum + " blocks")
val totalBlocks = blocksByAddress.map(_._2.size).sum
logDebug("Getting " + totalBlocks + " blocks")
var startTime = System.currentTimeMillis
val blocks = new HashMap[String,Option[Iterator[Any]]]()
val blocks = new ArrayBuffer[(String, Option[Iterator[Any]])](totalBlocks)
val localBlockIds = new ArrayBuffer[String]()
val remoteBlockIds = new ArrayBuffer[String]()
val remoteBlockIdsPerLocation = new HashMap[BlockManagerId, Seq[String]]()
@ -300,7 +304,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
localBlockIds.foreach(id => {
get(id) match {
case Some(block) => {
blocks.update(id, Some(block))
blocks += ((id, Some(block)))
logDebug("Got local block " + id)
}
case None => {
@ -325,7 +329,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
val buffer = blockMessage.getData
val blockId = blockMessage.getId
val block = dataDeserialize(buffer)
blocks.update(blockId, Some(block))
blocks += ((blockId, Some(block)))
logDebug("Got remote block " + blockId + " in " + Utils.getUsedTimeMs(startTime))
count += 1
})
@ -339,7 +343,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
logDebug("Got all blocks in " + Utils.getUsedTimeMs(startTime) + " ms")
return blocks
return blocks.iterator
}
/**