De-duplication in getRemote() and getRemoteBytes().

This commit is contained in:
Josh Rosen 2013-10-18 13:57:48 -07:00 committed by Josh Rosen
parent babccb695e
commit 8279185651

View file

@ -433,52 +433,38 @@ private[spark] class BlockManager(
* Get block from remote block managers.
*/
def getRemote(blockId: BlockId): Option[Iterator[Any]] = {
if (blockId == null) {
throw new IllegalArgumentException("Block Id is null")
}
logDebug("Getting remote block " + blockId)
// Get locations of block
val locations = Random.shuffle(master.getLocations(blockId))
// Get block from remote locations
for (loc <- locations) {
logDebug("Getting remote block " + blockId + " from " + loc)
val data = BlockManagerWorker.syncGetBlock(
GetBlock(blockId), ConnectionManagerId(loc.host, loc.port))
if (data != null) {
return Some(dataDeserialize(blockId, data))
}
logDebug("The value of block " + blockId + " is null")
}
logDebug("Block " + blockId + " not found")
return None
doGetRemote(blockId, asValues = true).asInstanceOf[Option[Iterator[Any]]]
}
/**
* Get block from remote block managers as serialized bytes.
*/
def getRemoteBytes(blockId: BlockId): Option[ByteBuffer] = {
// TODO: As with getLocalBytes, this is very similar to getRemote and perhaps should be
// refactored.
if (blockId == null) {
throw new IllegalArgumentException("Block Id is null")
}
logDebug("Getting remote block " + blockId + " as bytes")
val locations = master.getLocations(blockId)
for (loc <- locations) {
logDebug("Getting remote block " + blockId + " from " + loc)
val data = BlockManagerWorker.syncGetBlock(
GetBlock(blockId), ConnectionManagerId(loc.host, loc.port))
if (data != null) {
return Some(data)
}
logDebug("The value of block " + blockId + " is null")
}
logDebug("Block " + blockId + " not found")
return None
logDebug("Getting remote block " + blockId + " as bytes")
doGetRemote(blockId, asValues = false).asInstanceOf[Option[ByteBuffer]]
}
private def doGetRemote(blockId: BlockId, asValues: Boolean): Option[Any] = {
require(blockId != null, "BlockId is null")
val locations = Random.shuffle(master.getLocations(blockId))
for (loc <- locations) {
logDebug("Getting remote block " + blockId + " from " + loc)
val data = BlockManagerWorker.syncGetBlock(
GetBlock(blockId), ConnectionManagerId(loc.host, loc.port))
if (data != null) {
if (asValues) {
return Some(dataDeserialize(blockId, data))
} else {
return Some(data)
}
}
logDebug("The value of block " + blockId + " is null")
}
logDebug("Block " + blockId + " not found")
return None
}
/**
* Get a block from the block manager (either local or remote).
*/