Minor formatting fixes

This commit is contained in:
Matei Zaharia 2012-09-03 16:24:00 -07:00
parent 2d6a629f8c
commit a842c63044

View file

@ -83,6 +83,8 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
// TODO: This will be removed after cacheTracker is removed from the code base. // TODO: This will be removed after cacheTracker is removed from the code base.
var cacheTracker: CacheTracker = null var cacheTracker: CacheTracker = null
val numParallelFetches = BlockManager.getNumParallelFetchesFromSystemProperties()
initLogging() initLogging()
initialize() initialize()
@ -279,7 +281,6 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
val results = new LinkedBlockingQueue[(String, Option[() => Iterator[Any]])] val results = new LinkedBlockingQueue[(String, Option[() => Iterator[Any]])]
// Bound the number and memory usage of fetched remote blocks. // Bound the number and memory usage of fetched remote blocks.
val parallelFetches = BlockManager.getNumParallelFetchesFromSystemProperties
val blocksToRequest = new Queue[(BlockManagerId, BlockMessage)] val blocksToRequest = new Queue[(BlockManagerId, BlockMessage)]
def sendRequest(bmId: BlockManagerId, blockMessages: Seq[BlockMessage]) { def sendRequest(bmId: BlockManagerId, blockMessages: Seq[BlockMessage]) {
@ -290,7 +291,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
case Some(message) => { case Some(message) => {
val bufferMessage = message.asInstanceOf[BufferMessage] val bufferMessage = message.asInstanceOf[BufferMessage]
val blockMessageArray = BlockMessageArray.fromBufferMessage(bufferMessage) val blockMessageArray = BlockMessageArray.fromBufferMessage(bufferMessage)
blockMessageArray.foreach(blockMessage => { for (blockMessage <- blockMessageArray) {
if (blockMessage.getType != BlockMessage.TYPE_GOT_BLOCK) { if (blockMessage.getType != BlockMessage.TYPE_GOT_BLOCK) {
throw new SparkException( throw new SparkException(
"Unexpected message " + blockMessage.getType + " received from " + cmId) "Unexpected message " + blockMessage.getType + " received from " + cmId)
@ -298,7 +299,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
val blockId = blockMessage.getId val blockId = blockMessage.getId
results.put((blockId, Some(() => dataDeserialize(blockMessage.getData)))) results.put((blockId, Some(() => dataDeserialize(blockMessage.getData))))
logDebug("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime)) logDebug("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime))
}) }
} }
case None => { case None => {
logError("Could not get block(s) from " + cmId) logError("Could not get block(s) from " + cmId)
@ -318,9 +319,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
localBlockIds ++= blockIds localBlockIds ++= blockIds
} else { } else {
remoteBlockIds ++= blockIds remoteBlockIds ++= blockIds
blockIds.foreach{blockId => for (blockId <- blockIds) {
val blockMessage = BlockMessage.fromGetBlock(GetBlock(blockId)) val blockMessage = BlockMessage.fromGetBlock(GetBlock(blockId))
if (initialRequests < parallelFetches) { if (initialRequests < numParallelFetches) {
initialRequestBlocks.getOrElseUpdate(address, new ArrayBuffer[BlockMessage]) initialRequestBlocks.getOrElseUpdate(address, new ArrayBuffer[BlockMessage])
.append(blockMessage) .append(blockMessage)
initialRequests += 1 initialRequests += 1
@ -331,15 +332,17 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
} }
} }
// Send out initial request(s) for 'parallelFetches' blocks. // Send out initial request(s) for 'numParallelFetches' blocks.
for ((bmId, blockMessages) <- initialRequestBlocks) { sendRequest(bmId, blockMessages) } for ((bmId, blockMessages) <- initialRequestBlocks) {
sendRequest(bmId, blockMessages)
}
logDebug("Started remote gets for " + parallelFetches + " blocks in " + logDebug("Started remote gets for " + numParallelFetches + " blocks in " +
Utils.getUsedTimeMs(startTime) + " ms") Utils.getUsedTimeMs(startTime) + " ms")
// Get the local blocks while remote blocks are being fetched. // Get the local blocks while remote blocks are being fetched.
startTime = System.currentTimeMillis startTime = System.currentTimeMillis
localBlockIds.foreach(id => { for (id <- localBlockIds) {
getLocal(id) match { getLocal(id) match {
case Some(block) => { case Some(block) => {
results.put((id, Some(() => block))) results.put((id, Some(() => block)))
@ -349,7 +352,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
throw new BlockException(id, "Could not get block " + id + " from local machine") throw new BlockException(id, "Could not get block " + id + " from local machine")
} }
} }
}) }
logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime) + " ms") logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime) + " ms")
// Return an iterator that will read fetched blocks off the queue as they arrive. // Return an iterator that will read fetched blocks off the queue as they arrive.
@ -362,8 +365,8 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
resultsGotten += 1 resultsGotten += 1
val (blockId, functionOption) = results.take() val (blockId, functionOption) = results.take()
if (remoteBlockIds.contains(blockId) && !blocksToRequest.isEmpty) { if (remoteBlockIds.contains(blockId) && !blocksToRequest.isEmpty) {
val (bmId, blockMessage) = blocksToRequest.dequeue val (bmId, blockMessage) = blocksToRequest.dequeue()
sendRequest(bmId, Seq(blockMessage)) sendRequest(bmId, Seq(blockMessage))
} }
(blockId, functionOption.map(_.apply())) (blockId, functionOption.map(_.apply()))
} }