From c9e54a6755961a5cc9eda45df6a2e5e2df1b01a6 Mon Sep 17 00:00:00 2001 From: Charles Reiss Date: Wed, 5 Dec 2012 23:11:06 -0800 Subject: [PATCH] Track block managers by hostname; handle manager removal. --- .../spark/storage/BlockManagerMaster.scala | 30 +++++++++++++++++-- 1 file changed, 27 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala index 531331b0e5..4959c05f94 100644 --- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala @@ -156,6 +156,8 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor def lastSeenMs: Long = _lastSeenMs + def blocks: JHashMap[String, StorageLevel] = _blocks + override def toString: String = "BlockManagerInfo " + timeMs + " " + _remainingMem def clear() { @@ -164,16 +166,30 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor } private val blockManagerInfo = new HashMap[BlockManagerId, BlockManagerInfo] + private val blockManagerIdByHost = new HashMap[String, BlockManagerId] private val blockInfo = new JHashMap[String, Pair[Int, HashSet[BlockManagerId]]] initLogging() + def removeBlockManager(blockManagerId: BlockManagerId) { + val info = blockManagerInfo(blockManagerId) + blockManagerIdByHost.remove(blockManagerId.ip) + blockManagerInfo.remove(blockManagerId) + var iterator = info.blocks.keySet.iterator + while (iterator.hasNext) { + val blockId = iterator.next + val locations = blockInfo.get(blockId)._2 + locations -= blockManagerId + if (locations.size == 0) { + blockInfo.remove(locations) + } + } + } + def removeHost(host: String) { logInfo("Trying to remove the host: " + host + " from BlockManagerMaster.") logInfo("Previous hosts: " + blockManagerInfo.keySet.toSeq) - val ip = host.split(":")(0) - val port = host.split(":")(1) - blockManagerInfo.remove(new BlockManagerId(ip, port.toInt)) + blockManagerIdByHost.get(host).map(removeBlockManager) logInfo("Current hosts: " + blockManagerInfo.keySet.toSeq) sender ! true } @@ -223,12 +239,20 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor val startTimeMs = System.currentTimeMillis() val tmp = " " + blockManagerId + " " logDebug("Got in register 0" + tmp + Utils.getUsedTimeMs(startTimeMs)) + if (blockManagerIdByHost.contains(blockManagerId.ip) && + blockManagerIdByHost(blockManagerId.ip) != blockManagerId) { + val oldId = blockManagerIdByHost(blockManagerId.ip) + logInfo("Got second registration for host " + blockManagerId + + "; removing old slave " + oldId) + removeBlockManager(oldId) + } if (blockManagerId.ip == Utils.localHostName() && !isLocal) { logInfo("Got Register Msg from master node, don't register it") } else { blockManagerInfo += (blockManagerId -> new BlockManagerInfo( blockManagerId, System.currentTimeMillis() / 1000, maxMemSize)) } + blockManagerIdByHost += (blockManagerId.ip -> blockManagerId) logDebug("Got in register 1" + tmp + Utils.getUsedTimeMs(startTimeMs)) sender ! true }