diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala index 3aac8e50b4..1211f0f2c2 100644 --- a/core/src/main/scala/spark/storage/BlockManager.scala +++ b/core/src/main/scala/spark/storage/BlockManager.scala @@ -83,6 +83,8 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m // TODO: This will be removed after cacheTracker is removed from the code base. var cacheTracker: CacheTracker = null + val numParallelFetches = BlockManager.getNumParallelFetchesFromSystemProperties() + initLogging() initialize() @@ -279,7 +281,6 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m val results = new LinkedBlockingQueue[(String, Option[() => Iterator[Any]])] // Bound the number and memory usage of fetched remote blocks. - val parallelFetches = BlockManager.getNumParallelFetchesFromSystemProperties val blocksToRequest = new Queue[(BlockManagerId, BlockMessage)] def sendRequest(bmId: BlockManagerId, blockMessages: Seq[BlockMessage]) { @@ -290,7 +291,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m case Some(message) => { val bufferMessage = message.asInstanceOf[BufferMessage] val blockMessageArray = BlockMessageArray.fromBufferMessage(bufferMessage) - blockMessageArray.foreach(blockMessage => { + for (blockMessage <- blockMessageArray) { if (blockMessage.getType != BlockMessage.TYPE_GOT_BLOCK) { throw new SparkException( "Unexpected message " + blockMessage.getType + " received from " + cmId) @@ -298,7 +299,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m val blockId = blockMessage.getId results.put((blockId, Some(() => dataDeserialize(blockMessage.getData)))) logDebug("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime)) - }) + } } case None => { logError("Could not get block(s) from " + cmId) @@ -318,9 +319,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m localBlockIds ++= blockIds } else { remoteBlockIds ++= blockIds - blockIds.foreach{blockId => + for (blockId <- blockIds) { val blockMessage = BlockMessage.fromGetBlock(GetBlock(blockId)) - if (initialRequests < parallelFetches) { + if (initialRequests < numParallelFetches) { initialRequestBlocks.getOrElseUpdate(address, new ArrayBuffer[BlockMessage]) .append(blockMessage) initialRequests += 1 @@ -331,15 +332,17 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m } } - // Send out initial request(s) for 'parallelFetches' blocks. - for ((bmId, blockMessages) <- initialRequestBlocks) { sendRequest(bmId, blockMessages) } + // Send out initial request(s) for 'numParallelFetches' blocks. + for ((bmId, blockMessages) <- initialRequestBlocks) { + sendRequest(bmId, blockMessages) + } - logDebug("Started remote gets for " + parallelFetches + " blocks in " + + logDebug("Started remote gets for " + numParallelFetches + " blocks in " + Utils.getUsedTimeMs(startTime) + " ms") // Get the local blocks while remote blocks are being fetched. startTime = System.currentTimeMillis - localBlockIds.foreach(id => { + for (id <- localBlockIds) { getLocal(id) match { case Some(block) => { results.put((id, Some(() => block))) @@ -349,7 +352,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m throw new BlockException(id, "Could not get block " + id + " from local machine") } } - }) + } logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime) + " ms") // Return an iterator that will read fetched blocks off the queue as they arrive. @@ -362,8 +365,8 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m resultsGotten += 1 val (blockId, functionOption) = results.take() if (remoteBlockIds.contains(blockId) && !blocksToRequest.isEmpty) { - val (bmId, blockMessage) = blocksToRequest.dequeue - sendRequest(bmId, Seq(blockMessage)) + val (bmId, blockMessage) = blocksToRequest.dequeue() + sendRequest(bmId, Seq(blockMessage)) } (blockId, functionOption.map(_.apply())) }