Track block managers by hostname; handle manager removal.

This commit is contained in:
Charles Reiss 2012-12-05 23:11:06 -08:00
parent 5afa2ee9e9
commit c9e54a6755

View file

@ -156,6 +156,8 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
def lastSeenMs: Long = _lastSeenMs
def blocks: JHashMap[String, StorageLevel] = _blocks
override def toString: String = "BlockManagerInfo " + timeMs + " " + _remainingMem
def clear() {
@ -164,16 +166,30 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
}
private val blockManagerInfo = new HashMap[BlockManagerId, BlockManagerInfo]
private val blockManagerIdByHost = new HashMap[String, BlockManagerId]
private val blockInfo = new JHashMap[String, Pair[Int, HashSet[BlockManagerId]]]
initLogging()
def removeBlockManager(blockManagerId: BlockManagerId) {
val info = blockManagerInfo(blockManagerId)
blockManagerIdByHost.remove(blockManagerId.ip)
blockManagerInfo.remove(blockManagerId)
var iterator = info.blocks.keySet.iterator
while (iterator.hasNext) {
val blockId = iterator.next
val locations = blockInfo.get(blockId)._2
locations -= blockManagerId
if (locations.size == 0) {
blockInfo.remove(locations)
}
}
}
def removeHost(host: String) {
logInfo("Trying to remove the host: " + host + " from BlockManagerMaster.")
logInfo("Previous hosts: " + blockManagerInfo.keySet.toSeq)
val ip = host.split(":")(0)
val port = host.split(":")(1)
blockManagerInfo.remove(new BlockManagerId(ip, port.toInt))
blockManagerIdByHost.get(host).map(removeBlockManager)
logInfo("Current hosts: " + blockManagerInfo.keySet.toSeq)
sender ! true
}
@ -223,12 +239,20 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
val startTimeMs = System.currentTimeMillis()
val tmp = " " + blockManagerId + " "
logDebug("Got in register 0" + tmp + Utils.getUsedTimeMs(startTimeMs))
if (blockManagerIdByHost.contains(blockManagerId.ip) &&
blockManagerIdByHost(blockManagerId.ip) != blockManagerId) {
val oldId = blockManagerIdByHost(blockManagerId.ip)
logInfo("Got second registration for host " + blockManagerId +
"; removing old slave " + oldId)
removeBlockManager(oldId)
}
if (blockManagerId.ip == Utils.localHostName() && !isLocal) {
logInfo("Got Register Msg from master node, don't register it")
} else {
blockManagerInfo += (blockManagerId -> new BlockManagerInfo(
blockManagerId, System.currentTimeMillis() / 1000, maxMemSize))
}
blockManagerIdByHost += (blockManagerId.ip -> blockManagerId)
logDebug("Got in register 1" + tmp + Utils.getUsedTimeMs(startTimeMs))
sender ! true
}