diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index c67a61515e..1a54d8ddb4 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -20,7 +20,7 @@ package org.apache.spark.storage import java.io.{InputStream, OutputStream} import java.nio.{ByteBuffer, MappedByteBuffer} -import scala.collection.mutable.{HashMap, ArrayBuffer, HashSet} +import scala.collection.mutable.{HashMap, ArrayBuffer} import scala.util.Random import akka.actor.{ActorSystem, Cancellable, Props} @@ -321,89 +321,14 @@ private[spark] class BlockManager( */ def getLocal(blockId: BlockId): Option[Iterator[Any]] = { logDebug("Getting local block " + blockId) - val info = blockInfo.get(blockId).orNull - if (info != null) { - info.synchronized { - - // In the another thread is writing the block, wait for it to become ready. - if (!info.waitForReady()) { - // If we get here, the block write failed. - logWarning("Block " + blockId + " was marked as failure.") - return None - } - - val level = info.level - logDebug("Level for block " + blockId + " is " + level) - - // Look for the block in memory - if (level.useMemory) { - logDebug("Getting block " + blockId + " from memory") - memoryStore.getValues(blockId) match { - case Some(iterator) => - return Some(iterator) - case None => - logDebug("Block " + blockId + " not found in memory") - } - } - - // Look for block on disk, potentially loading it back into memory if required - if (level.useDisk) { - logDebug("Getting block " + blockId + " from disk") - if (level.useMemory && level.deserialized) { - diskStore.getValues(blockId) match { - case Some(iterator) => - // Put the block back in memory before returning it - // TODO: Consider creating a putValues that also takes in a iterator ? - val elements = new ArrayBuffer[Any] - elements ++= iterator - memoryStore.putValues(blockId, elements, level, true).data match { - case Left(iterator2) => - return Some(iterator2) - case _ => - throw new Exception("Memory store did not return back an iterator") - } - case None => - throw new Exception("Block " + blockId + " not found on disk, though it should be") - } - } else if (level.useMemory && !level.deserialized) { - // Read it as a byte buffer into memory first, then return it - diskStore.getBytes(blockId) match { - case Some(bytes) => - // Put a copy of the block back in memory before returning it. Note that we can't - // put the ByteBuffer returned by the disk store as that's a memory-mapped file. - // The use of rewind assumes this. - assert (0 == bytes.position()) - val copyForMemory = ByteBuffer.allocate(bytes.limit) - copyForMemory.put(bytes) - memoryStore.putBytes(blockId, copyForMemory, level) - bytes.rewind() - return Some(dataDeserialize(blockId, bytes)) - case None => - throw new Exception("Block " + blockId + " not found on disk, though it should be") - } - } else { - diskStore.getValues(blockId) match { - case Some(iterator) => - return Some(iterator) - case None => - throw new Exception("Block " + blockId + " not found on disk, though it should be") - } - } - } - } - } else { - logDebug("Block " + blockId + " not registered locally") - } - return None + doGetLocal(blockId, asValues = true).asInstanceOf[Option[Iterator[Any]]] } /** * Get block from the local block manager as serialized bytes. */ def getLocalBytes(blockId: BlockId): Option[ByteBuffer] = { - // TODO: This whole thing is very similar to getLocal; we need to refactor it somehow logDebug("Getting local block " + blockId + " as bytes") - // As an optimization for map output fetches, if the block is for a shuffle, return it // without acquiring a lock; the disk store never deletes (recent) items so this should work if (blockId.isShuffle) { @@ -414,12 +339,15 @@ private[spark] class BlockManager( throw new Exception("Block " + blockId + " not found on disk, though it should be") } } + doGetLocal(blockId, asValues = false).asInstanceOf[Option[ByteBuffer]] + } + private def doGetLocal(blockId: BlockId, asValues: Boolean): Option[Any] = { val info = blockInfo.get(blockId).orNull if (info != null) { info.synchronized { - // In the another thread is writing the block, wait for it to become ready. + // If another thread is writing the block, wait for it to become ready. if (!info.waitForReady()) { // If we get here, the block write failed. logWarning("Block " + blockId + " was marked as failure.") @@ -432,42 +360,73 @@ private[spark] class BlockManager( // Look for the block in memory if (level.useMemory) { logDebug("Getting block " + blockId + " from memory") - memoryStore.getBytes(blockId) match { - case Some(bytes) => - return Some(bytes) + val result = if (asValues) { + memoryStore.getValues(blockId) + } else { + memoryStore.getBytes(blockId) + } + result match { + case Some(values) => + return Some(values) case None => logDebug("Block " + blockId + " not found in memory") } } - // Look for block on disk + // Look for block on disk, potentially storing it back into memory if required: if (level.useDisk) { - // Read it as a byte buffer into memory first, then return it - diskStore.getBytes(blockId) match { - case Some(bytes) => - assert (0 == bytes.position()) - if (level.useMemory) { - if (level.deserialized) { - memoryStore.putBytes(blockId, bytes, level) - } else { - // The memory store will hang onto the ByteBuffer, so give it a copy instead of - // the memory-mapped file buffer we got from the disk store - val copyForMemory = ByteBuffer.allocate(bytes.limit) - copyForMemory.put(bytes) - memoryStore.putBytes(blockId, copyForMemory, level) - } - } - bytes.rewind() - return Some(bytes) + logDebug("Getting block " + blockId + " from disk") + val bytes: ByteBuffer = diskStore.getBytes(blockId) match { + case Some(bytes) => bytes case None => throw new Exception("Block " + blockId + " not found on disk, though it should be") } + assert (0 == bytes.position()) + + if (!level.useMemory) { + // If the block shouldn't be stored in memory, we can just return it: + if (asValues) { + return Some(dataDeserialize(blockId, bytes)) + } else { + return Some(bytes) + } + } else { + // Otherwise, we also have to store something in the memory store: + if (!level.deserialized || !asValues) { + // We'll store the bytes in memory if the block's storage level includes + // "memory serialized", or if it should cached as objects in memory + // but we only requested its serialized bytes: + val copyForMemory = ByteBuffer.allocate(bytes.limit) + copyForMemory.put(bytes) + memoryStore.putBytes(blockId, copyForMemory, level) + bytes.rewind() + } + if (!asValues) { + return Some(bytes) + } else { + val values = dataDeserialize(blockId, bytes) + if (level.deserialized) { + // Cache the values before returning them: + // TODO: Consider creating a putValues that also takes in a iterator? + val valuesBuffer = new ArrayBuffer[Any] + valuesBuffer ++= values + memoryStore.putValues(blockId, valuesBuffer, level, true).data match { + case Left(values2) => + return Some(values2) + case _ => + throw new Exception("Memory store did not return back an iterator") + } + } else { + return Some(values) + } + } + } } } } else { logDebug("Block " + blockId + " not registered locally") } - return None + None } /**