De-duplication in getLocal() and getLocalBytes().

This commit is contained in:
Josh Rosen 2013-10-18 13:44:41 -07:00 committed by Josh Rosen
parent 6511bbe2ad
commit babccb695e

View file

@@ -20,7 +20,7 @@ package org.apache.spark.storage
import java.io.{InputStream, OutputStream}
import java.nio.{ByteBuffer, MappedByteBuffer}
import scala.collection.mutable.{HashMap, ArrayBuffer, HashSet}
import scala.collection.mutable.{HashMap, ArrayBuffer}
import scala.util.Random
import akka.actor.{ActorSystem, Cancellable, Props}
@@ -321,89 +321,14 @@ private[spark] class BlockManager(
*/
def getLocal(blockId: BlockId): Option[Iterator[Any]] = {
logDebug("Getting local block " + blockId)
val info = blockInfo.get(blockId).orNull
if (info != null) {
info.synchronized {
// In the another thread is writing the block, wait for it to become ready.
if (!info.waitForReady()) {
// If we get here, the block write failed.
logWarning("Block " + blockId + " was marked as failure.")
return None
}
val level = info.level
logDebug("Level for block " + blockId + " is " + level)
// Look for the block in memory
if (level.useMemory) {
logDebug("Getting block " + blockId + " from memory")
memoryStore.getValues(blockId) match {
case Some(iterator) =>
return Some(iterator)
case None =>
logDebug("Block " + blockId + " not found in memory")
}
}
// Look for block on disk, potentially loading it back into memory if required
if (level.useDisk) {
logDebug("Getting block " + blockId + " from disk")
if (level.useMemory && level.deserialized) {
diskStore.getValues(blockId) match {
case Some(iterator) =>
// Put the block back in memory before returning it
// TODO: Consider creating a putValues that also takes in a iterator ?
val elements = new ArrayBuffer[Any]
elements ++= iterator
memoryStore.putValues(blockId, elements, level, true).data match {
case Left(iterator2) =>
return Some(iterator2)
case _ =>
throw new Exception("Memory store did not return back an iterator")
}
case None =>
throw new Exception("Block " + blockId + " not found on disk, though it should be")
}
} else if (level.useMemory && !level.deserialized) {
// Read it as a byte buffer into memory first, then return it
diskStore.getBytes(blockId) match {
case Some(bytes) =>
// Put a copy of the block back in memory before returning it. Note that we can't
// put the ByteBuffer returned by the disk store as that's a memory-mapped file.
// The use of rewind assumes this.
assert (0 == bytes.position())
val copyForMemory = ByteBuffer.allocate(bytes.limit)
copyForMemory.put(bytes)
memoryStore.putBytes(blockId, copyForMemory, level)
bytes.rewind()
return Some(dataDeserialize(blockId, bytes))
case None =>
throw new Exception("Block " + blockId + " not found on disk, though it should be")
}
} else {
diskStore.getValues(blockId) match {
case Some(iterator) =>
return Some(iterator)
case None =>
throw new Exception("Block " + blockId + " not found on disk, though it should be")
}
}
}
}
} else {
logDebug("Block " + blockId + " not registered locally")
}
return None
doGetLocal(blockId, asValues = true).asInstanceOf[Option[Iterator[Any]]]
}
/**
* Get block from the local block manager as serialized bytes.
*/
def getLocalBytes(blockId: BlockId): Option[ByteBuffer] = {
// TODO: This whole thing is very similar to getLocal; we need to refactor it somehow
logDebug("Getting local block " + blockId + " as bytes")
// As an optimization for map output fetches, if the block is for a shuffle, return it
// without acquiring a lock; the disk store never deletes (recent) items so this should work
if (blockId.isShuffle) {
@@ -414,12 +339,15 @@ private[spark] class BlockManager(
throw new Exception("Block " + blockId + " not found on disk, though it should be")
}
}
doGetLocal(blockId, asValues = false).asInstanceOf[Option[ByteBuffer]]
}
private def doGetLocal(blockId: BlockId, asValues: Boolean): Option[Any] = {
val info = blockInfo.get(blockId).orNull
if (info != null) {
info.synchronized {
// In the another thread is writing the block, wait for it to become ready.
// If another thread is writing the block, wait for it to become ready.
if (!info.waitForReady()) {
// If we get here, the block write failed.
logWarning("Block " + blockId + " was marked as failure.")
@@ -432,42 +360,73 @@ private[spark] class BlockManager(
// Look for the block in memory
if (level.useMemory) {
logDebug("Getting block " + blockId + " from memory")
memoryStore.getBytes(blockId) match {
case Some(bytes) =>
return Some(bytes)
val result = if (asValues) {
memoryStore.getValues(blockId)
} else {
memoryStore.getBytes(blockId)
}
result match {
case Some(values) =>
return Some(values)
case None =>
logDebug("Block " + blockId + " not found in memory")
}
}
// Look for block on disk
// Look for block on disk, potentially storing it back into memory if required:
if (level.useDisk) {
// Read it as a byte buffer into memory first, then return it
diskStore.getBytes(blockId) match {
case Some(bytes) =>
assert (0 == bytes.position())
if (level.useMemory) {
if (level.deserialized) {
memoryStore.putBytes(blockId, bytes, level)
} else {
// The memory store will hang onto the ByteBuffer, so give it a copy instead of
// the memory-mapped file buffer we got from the disk store
val copyForMemory = ByteBuffer.allocate(bytes.limit)
copyForMemory.put(bytes)
memoryStore.putBytes(blockId, copyForMemory, level)
}
}
bytes.rewind()
return Some(bytes)
logDebug("Getting block " + blockId + " from disk")
val bytes: ByteBuffer = diskStore.getBytes(blockId) match {
case Some(bytes) => bytes
case None =>
throw new Exception("Block " + blockId + " not found on disk, though it should be")
}
assert (0 == bytes.position())
if (!level.useMemory) {
// If the block shouldn't be stored in memory, we can just return it:
if (asValues) {
return Some(dataDeserialize(blockId, bytes))
} else {
return Some(bytes)
}
} else {
// Otherwise, we also have to store something in the memory store:
if (!level.deserialized || !asValues) {
// We'll store the bytes in memory if the block's storage level includes
// "memory serialized", or if it should be cached as objects in memory
// but we only requested its serialized bytes:
val copyForMemory = ByteBuffer.allocate(bytes.limit)
copyForMemory.put(bytes)
memoryStore.putBytes(blockId, copyForMemory, level)
bytes.rewind()
}
if (!asValues) {
return Some(bytes)
} else {
val values = dataDeserialize(blockId, bytes)
if (level.deserialized) {
// Cache the values before returning them:
// TODO: Consider creating a putValues that also takes in an iterator?
val valuesBuffer = new ArrayBuffer[Any]
valuesBuffer ++= values
memoryStore.putValues(blockId, valuesBuffer, level, true).data match {
case Left(values2) =>
return Some(values2)
case _ =>
throw new Exception("Memory store did not return back an iterator")
}
} else {
return Some(values)
}
}
}
}
}
} else {
logDebug("Block " + blockId + " not registered locally")
}
return None
None
}
/**