diff --git a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala index 06d2d09fce..283825391e 100644 --- a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala +++ b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala @@ -34,8 +34,6 @@ class BlockStoreShuffleFetcher extends ShuffleFetcher with Logging { try { val blockOptions = blockManager.get(blocksByAddress) - logDebug("Fetching map output blocks for shuffle %d, reduce %d took %d ms".format( - shuffleId, reduceId, System.currentTimeMillis - startTime)) blockOptions.foreach(x => { val (blockId, blockOption) = x blockOption match { @@ -65,5 +63,7 @@ class BlockStoreShuffleFetcher extends ShuffleFetcher with Logging { } } } + logDebug("Fetching and merging outputs of shuffle %d, reduce %d took %d ms".format( + shuffleId, reduceId, System.currentTimeMillis - startTime)) } } diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala index cde74e5805..b79addb6c8 100644 --- a/core/src/main/scala/spark/storage/BlockManager.scala +++ b/core/src/main/scala/spark/storage/BlockManager.scala @@ -6,6 +6,7 @@ import java.nio.channels.FileChannel.MapMode import java.util.{HashMap => JHashMap} import java.util.LinkedHashMap import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.LinkedBlockingQueue import java.util.Collections import scala.actors._ @@ -261,15 +262,18 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m } /** - * Get many blocks from local and remote block manager using their BlockManagerIds. + * Get multiple blocks from local and remote block manager using their BlockManagerIds. Returns + * an Iterator of (block ID, value) pairs so that clients may handle blocks in a pipelined + * fashion as they're received. */ - def get(blocksByAddress: Seq[(BlockManagerId, Seq[String])]): HashMap[String, Option[Iterator[Any]]] = { + def get(blocksByAddress: Seq[(BlockManagerId, Seq[String])]): Iterator[(String, Option[Iterator[Any]])] = { if (blocksByAddress == null) { throw new IllegalArgumentException("BlocksByAddress is null") } - logDebug("Getting " + blocksByAddress.map(_._2.size).sum + " blocks") + val totalBlocks = blocksByAddress.map(_._2.size).sum + logDebug("Getting " + totalBlocks + " blocks") var startTime = System.currentTimeMillis - val blocks = new HashMap[String,Option[Iterator[Any]]]() + val blocks = new ArrayBuffer[(String, Option[Iterator[Any]])](totalBlocks) val localBlockIds = new ArrayBuffer[String]() val remoteBlockIds = new ArrayBuffer[String]() val remoteBlockIdsPerLocation = new HashMap[BlockManagerId, Seq[String]]() @@ -300,7 +304,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m localBlockIds.foreach(id => { get(id) match { case Some(block) => { - blocks.update(id, Some(block)) + blocks += ((id, Some(block))) logDebug("Got local block " + id) } case None => { @@ -325,7 +329,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m val buffer = blockMessage.getData val blockId = blockMessage.getId val block = dataDeserialize(buffer) - blocks.update(blockId, Some(block)) + blocks += ((blockId, Some(block))) logDebug("Got remote block " + blockId + " in " + Utils.getUsedTimeMs(startTime)) count += 1 }) @@ -339,7 +343,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m } logDebug("Got all blocks in " + Utils.getUsedTimeMs(startTime) + " ms") - return blocks + return blocks.iterator } /**