From 97434f49b8c029e9b78c91ec5f58557cd1b5c943 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Thu, 13 Dec 2012 22:32:19 -0800 Subject: [PATCH] Merged TD's block manager refactoring. --- .../scala/spark/storage/BlockManager.scala | 66 +- .../scala/spark/storage/BlockManagerId.scala | 23 +- .../spark/storage/BlockManagerMaster.scala | 716 +++--------------- .../storage/BlockManagerMasterActor.scala | 406 ++++++++++ .../spark/storage/BlockManagerMessages.scala | 10 +- .../scala/spark/storage/StorageLevel.scala | 32 +- .../scala/spark/util/MetadataCleaner.scala | 35 + .../scala/spark/util/TimeStampedHashMap.scala | 87 +++ .../spark/storage/BlockManagerSuite.scala | 91 ++- 9 files changed, 812 insertions(+), 654 deletions(-) create mode 100644 core/src/main/scala/spark/storage/BlockManagerMasterActor.scala create mode 100644 core/src/main/scala/spark/util/MetadataCleaner.scala create mode 100644 core/src/main/scala/spark/util/TimeStampedHashMap.scala diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala index b2c9e2cc40..2f41633440 100644 --- a/core/src/main/scala/spark/storage/BlockManager.scala +++ b/core/src/main/scala/spark/storage/BlockManager.scala @@ -19,7 +19,7 @@ import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream import spark.{CacheTracker, Logging, SizeEstimator, SparkEnv, SparkException, Utils} import spark.network._ import spark.serializer.Serializer -import spark.util.{ByteBufferInputStream, GenerationIdUtil} +import spark.util.{ByteBufferInputStream, GenerationIdUtil, MetadataCleaner, TimeStampedHashMap} import sun.nio.ch.DirectBuffer @@ -59,7 +59,7 @@ class BlockManager( } } - private val blockInfo = new ConcurrentHashMap[String, BlockInfo](1000) + private val blockInfo = new TimeStampedHashMap[String, BlockInfo]() private[storage] val memoryStore: BlockStore = new MemoryStore(this, maxMemory) private[storage] val diskStore: BlockStore = @@ -96,13 +96,14 @@ class BlockManager( @volatile private var shuttingDown = false private def heartBeat() { - if (!master.mustHeartBeat(HeartBeat(blockManagerId))) { + if (!master.sendHeartBeat(blockManagerId)) { reregister() } } var heartBeatTask: Cancellable = null + val metadataCleaner = new MetadataCleaner("BlockManager", this.dropOldBlocks) initialize() /** @@ -117,7 +118,7 @@ class BlockManager( * BlockManagerWorker actor. */ private def initialize() { - master.mustRegisterBlockManager(blockManagerId, maxMemory, slaveActor) + master.registerBlockManager(blockManagerId, maxMemory, slaveActor) BlockManagerWorker.startBlockManagerWorker(this) if (!BlockManager.getDisableHeartBeatsForTesting) { heartBeatTask = actorSystem.scheduler.schedule(0.seconds, heartBeatFrequency.milliseconds) { @@ -153,17 +154,14 @@ class BlockManager( def reregister() { // TODO: We might need to rate limit reregistering. logInfo("BlockManager reregistering with master") - master.mustRegisterBlockManager(blockManagerId, maxMemory, slaveActor) + master.registerBlockManager(blockManagerId, maxMemory, slaveActor) reportAllBlocks() } /** * Get storage level of local block. If no info exists for the block, then returns null. */ - def getLevel(blockId: String): StorageLevel = { - val info = blockInfo.get(blockId) - if (info != null) info.level else null - } + def getLevel(blockId: String): StorageLevel = blockInfo.get(blockId).map(_.level).orNull /** * Tell the master about the current storage status of a block. This will send a block update @@ -186,9 +184,9 @@ class BlockManager( */ private def tryToReportBlockStatus(blockId: String): Boolean = { val (curLevel, inMemSize, onDiskSize, tellMaster) = blockInfo.get(blockId) match { - case null => + case None => (StorageLevel.NONE, 0L, 0L, false) - case info => + case Some(info) => info.synchronized { info.level match { case null => @@ -207,7 +205,7 @@ class BlockManager( } if (tellMaster) { - master.mustBlockUpdate(BlockUpdate(blockManagerId, blockId, curLevel, inMemSize, onDiskSize)) + master.updateBlockInfo(blockManagerId, blockId, curLevel, inMemSize, onDiskSize) } else { true } @@ -219,7 +217,7 @@ class BlockManager( */ def getLocations(blockId: String): Seq[String] = { val startTimeMs = System.currentTimeMillis - var managers = master.mustGetLocations(GetLocations(blockId)) + var managers = master.getLocations(blockId) val locations = managers.map(_.ip) logDebug("Get block locations in " + Utils.getUsedTimeMs(startTimeMs)) return locations @@ -230,8 +228,7 @@ class BlockManager( */ def getLocations(blockIds: Array[String]): Array[Seq[String]] = { val startTimeMs = System.currentTimeMillis - val locations = master.mustGetLocationsMultipleBlockIds( - GetLocationsMultipleBlockIds(blockIds)).map(_.map(_.ip).toSeq).toArray + val locations = master.getLocations(blockIds).map(_.map(_.ip).toSeq).toArray logDebug("Get multiple block location in " + Utils.getUsedTimeMs(startTimeMs)) return locations } @@ -253,7 +250,7 @@ class BlockManager( } } - val info = blockInfo.get(blockId) + val info = blockInfo.get(blockId).orNull if (info != null) { info.synchronized { info.waitForReady() // In case the block is still being put() by another thread @@ -338,7 +335,7 @@ class BlockManager( } } - val info = blockInfo.get(blockId) + val info = blockInfo.get(blockId).orNull if (info != null) { info.synchronized { info.waitForReady() // In case the block is still being put() by another thread @@ -394,7 +391,7 @@ class BlockManager( } logDebug("Getting remote block " + blockId) // Get locations of block - val locations = master.mustGetLocations(GetLocations(blockId)) + val locations = master.getLocations(blockId) // Get block from remote locations for (loc <- locations) { @@ -596,7 +593,7 @@ class BlockManager( throw new IllegalArgumentException("Storage level is null or invalid") } - val oldBlock = blockInfo.get(blockId) + val oldBlock = blockInfo.get(blockId).orNull if (oldBlock != null) { logWarning("Block " + blockId + " already exists on this machine; not re-adding it") oldBlock.waitForReady() @@ -697,7 +694,7 @@ class BlockManager( throw new IllegalArgumentException("Storage level is null or invalid") } - if (blockInfo.containsKey(blockId)) { + if (blockInfo.contains(blockId)) { logWarning("Block " + blockId + " already exists on this machine; not re-adding it") return } @@ -772,7 +769,7 @@ class BlockManager( val tLevel: StorageLevel = new StorageLevel(level.useDisk, level.useMemory, level.deserialized, 1) if (cachedPeers == null) { - cachedPeers = master.mustGetPeers(GetPeers(blockManagerId, level.replication - 1)) + cachedPeers = master.getPeers(blockManagerId, level.replication - 1) } for (peer: BlockManagerId <- cachedPeers) { val start = System.nanoTime @@ -819,7 +816,7 @@ class BlockManager( */ def dropFromMemory(blockId: String, data: Either[ArrayBuffer[Any], ByteBuffer]) { logInfo("Dropping block " + blockId + " from memory") - val info = blockInfo.get(blockId) + val info = blockInfo.get(blockId).orNull if (info != null) { info.synchronized { val level = info.level @@ -853,7 +850,7 @@ class BlockManager( */ def removeBlock(blockId: String) { logInfo("Removing block " + blockId) - val info = blockInfo.get(blockId) + val info = blockInfo.get(blockId).orNull if (info != null) info.synchronized { // Removals are idempotent in disk store and memory store. At worst, we get a warning. memoryStore.remove(blockId) @@ -865,6 +862,29 @@ class BlockManager( } } + def dropOldBlocks(cleanupTime: Long) { + logInfo("Dropping blocks older than " + cleanupTime) + val iterator = blockInfo.internalMap.entrySet().iterator() + while(iterator.hasNext) { + val entry = iterator.next() + val (id, info, time) = (entry.getKey, entry.getValue._1, entry.getValue._2) + if (time < cleanupTime) { + info.synchronized { + val level = info.level + if (level.useMemory) { + memoryStore.remove(id) + } + if (level.useDisk) { + diskStore.remove(id) + } + iterator.remove() + logInfo("Dropped block " + id) + } + reportBlockStatus(id) + } + } + } + def shouldCompress(blockId: String): Boolean = { if (blockId.startsWith("shuffle_")) { compressShuffle diff --git a/core/src/main/scala/spark/storage/BlockManagerId.scala b/core/src/main/scala/spark/storage/BlockManagerId.scala index 03cd141805..488679f049 100644 --- a/core/src/main/scala/spark/storage/BlockManagerId.scala +++ b/core/src/main/scala/spark/storage/BlockManagerId.scala @@ -1,6 +1,7 @@ package spark.storage -import java.io.{Externalizable, ObjectInput, ObjectOutput} +import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput} +import java.util.concurrent.ConcurrentHashMap private[spark] class BlockManagerId(var ip: String, var port: Int) extends Externalizable { @@ -18,6 +19,9 @@ private[spark] class BlockManagerId(var ip: String, var port: Int) extends Exter port = in.readInt() } + @throws(classOf[IOException]) + private def readResolve(): Object = BlockManagerId.getCachedBlockManagerId(this) + override def toString = "BlockManagerId(" + ip + ", " + port + ")" override def hashCode = ip.hashCode * 41 + port @@ -26,4 +30,19 @@ private[spark] class BlockManagerId(var ip: String, var port: Int) extends Exter case id: BlockManagerId => port == id.port && ip == id.ip case _ => false } -} \ No newline at end of file +} + + +private[spark] object BlockManagerId { + + val blockManagerIdCache = new ConcurrentHashMap[BlockManagerId, BlockManagerId]() + + def getCachedBlockManagerId(id: BlockManagerId): BlockManagerId = { + if (blockManagerIdCache.containsKey(id)) { + blockManagerIdCache.get(id) + } else { + blockManagerIdCache.put(id, id) + id + } + } +} diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala index 64cdb86f8d..cf11393a03 100644 --- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala @@ -1,406 +1,17 @@ package spark.storage -import java.io._ -import java.util.{HashMap => JHashMap} - -import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} +import scala.collection.mutable.ArrayBuffer import scala.util.Random -import akka.actor._ -import akka.dispatch._ +import akka.actor.{Actor, ActorRef, ActorSystem, Props} +import akka.dispatch.Await import akka.pattern.ask -import akka.remote._ import akka.util.{Duration, Timeout} import akka.util.duration._ import spark.{Logging, SparkException, Utils} -private[spark] -case class BlockStatus(storageLevel: StorageLevel, memSize: Long, diskSize: Long) - - -// TODO(rxin): Move BlockManagerMasterActor to its own file. -private[spark] -class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging { - - class BlockManagerInfo( - val blockManagerId: BlockManagerId, - timeMs: Long, - val maxMem: Long, - val slaveActor: ActorRef) { - - private var _lastSeenMs: Long = timeMs - private var _remainingMem: Long = maxMem - - // Mapping from block id to its status. - private val _blocks = new JHashMap[String, BlockStatus] - - logInfo("Registering block manager %s:%d with %s RAM".format( - blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(maxMem))) - - def updateLastSeenMs() { - _lastSeenMs = System.currentTimeMillis() - } - - def updateBlockInfo(blockId: String, storageLevel: StorageLevel, memSize: Long, diskSize: Long) - : Unit = synchronized { - - updateLastSeenMs() - - if (_blocks.containsKey(blockId)) { - // The block exists on the slave already. - val originalLevel: StorageLevel = _blocks.get(blockId).storageLevel - - if (originalLevel.useMemory) { - _remainingMem += memSize - } - } - - if (storageLevel.isValid) { - // isValid means it is either stored in-memory or on-disk. - _blocks.put(blockId, BlockStatus(storageLevel, memSize, diskSize)) - if (storageLevel.useMemory) { - _remainingMem -= memSize - logInfo("Added %s in memory on %s:%d (size: %s, free: %s)".format( - blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(memSize), - Utils.memoryBytesToString(_remainingMem))) - } - if (storageLevel.useDisk) { - logInfo("Added %s on disk on %s:%d (size: %s)".format( - blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(diskSize))) - } - } else if (_blocks.containsKey(blockId)) { - // If isValid is not true, drop the block. - val blockStatus: BlockStatus = _blocks.get(blockId) - _blocks.remove(blockId) - if (blockStatus.storageLevel.useMemory) { - _remainingMem += blockStatus.memSize - logInfo("Removed %s on %s:%d in memory (size: %s, free: %s)".format( - blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(memSize), - Utils.memoryBytesToString(_remainingMem))) - } - if (blockStatus.storageLevel.useDisk) { - logInfo("Removed %s on %s:%d on disk (size: %s)".format( - blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(diskSize))) - } - } - } - - def remainingMem: Long = _remainingMem - - def lastSeenMs: Long = _lastSeenMs - - def blocks: JHashMap[String, BlockStatus] = _blocks - - override def toString: String = "BlockManagerInfo " + timeMs + " " + _remainingMem - - def clear() { - _blocks.clear() - } - } - - // Mapping from block manager id to the block manager's information. - private val blockManagerInfo = new HashMap[BlockManagerId, BlockManagerInfo] - - // Mapping from host name to block manager id. - private val blockManagerIdByHost = new HashMap[String, BlockManagerId] - - // Mapping from block id to the set of block managers that have the block. - private val blockInfo = new JHashMap[String, Pair[Int, HashSet[BlockManagerId]]] - - initLogging() - - val slaveTimeout = System.getProperty("spark.storage.blockManagerSlaveTimeoutMs", - "" + (BlockManager.getHeartBeatFrequencyFromSystemProperties * 3)).toLong - - val checkTimeoutInterval = System.getProperty("spark.storage.blockManagerTimeoutIntervalMs", - "5000").toLong - - var timeoutCheckingTask: Cancellable = null - - override def preStart() { - if (!BlockManager.getDisableHeartBeatsForTesting) { - timeoutCheckingTask = context.system.scheduler.schedule( - 0.seconds, checkTimeoutInterval.milliseconds, self, ExpireDeadHosts) - } - super.preStart() - } - - def removeBlockManager(blockManagerId: BlockManagerId) { - val info = blockManagerInfo(blockManagerId) - blockManagerIdByHost.remove(blockManagerId.ip) - blockManagerInfo.remove(blockManagerId) - var iterator = info.blocks.keySet.iterator - while (iterator.hasNext) { - val blockId = iterator.next - val locations = blockInfo.get(blockId)._2 - locations -= blockManagerId - if (locations.size == 0) { - blockInfo.remove(locations) - } - } - } - - def expireDeadHosts() { - logDebug("Checking for hosts with no recent heart beats in BlockManagerMaster.") - val now = System.currentTimeMillis() - val minSeenTime = now - slaveTimeout - val toRemove = new HashSet[BlockManagerId] - for (info <- blockManagerInfo.values) { - if (info.lastSeenMs < minSeenTime) { - logWarning("Removing BlockManager " + info.blockManagerId + " with no recent heart beats") - toRemove += info.blockManagerId - } - } - // TODO: Remove corresponding block infos - toRemove.foreach(removeBlockManager) - } - - def removeHost(host: String) { - logInfo("Trying to remove the host: " + host + " from BlockManagerMaster.") - logInfo("Previous hosts: " + blockManagerInfo.keySet.toSeq) - blockManagerIdByHost.get(host).foreach(removeBlockManager) - logInfo("Current hosts: " + blockManagerInfo.keySet.toSeq) - sender ! true - } - - def heartBeat(blockManagerId: BlockManagerId) { - if (!blockManagerInfo.contains(blockManagerId)) { - if (blockManagerId.ip == Utils.localHostName() && !isLocal) { - sender ! true - } else { - sender ! false - } - } else { - blockManagerInfo(blockManagerId).updateLastSeenMs() - sender ! true - } - } - - def receive = { - case RegisterBlockManager(blockManagerId, maxMemSize, slaveActor) => - register(blockManagerId, maxMemSize, slaveActor) - - case BlockUpdate(blockManagerId, blockId, storageLevel, deserializedSize, size) => - blockUpdate(blockManagerId, blockId, storageLevel, deserializedSize, size) - - case GetLocations(blockId) => - getLocations(blockId) - - case GetLocationsMultipleBlockIds(blockIds) => - getLocationsMultipleBlockIds(blockIds) - - case GetPeers(blockManagerId, size) => - getPeersDeterministic(blockManagerId, size) - /*getPeers(blockManagerId, size)*/ - - case GetMemoryStatus => - getMemoryStatus - - case RemoveBlock(blockId) => - removeBlock(blockId) - - case RemoveHost(host) => - removeHost(host) - sender ! true - - case StopBlockManagerMaster => - logInfo("Stopping BlockManagerMaster") - sender ! true - if (timeoutCheckingTask != null) { - timeoutCheckingTask.cancel - } - context.stop(self) - - case ExpireDeadHosts => - expireDeadHosts() - - case HeartBeat(blockManagerId) => - heartBeat(blockManagerId) - - case other => - logInfo("Got unknown message: " + other) - } - - // Remove a block from the slaves that have it. This can only be used to remove - // blocks that the master knows about. - private def removeBlock(blockId: String) { - val block = blockInfo.get(blockId) - if (block != null) { - block._2.foreach { blockManagerId: BlockManagerId => - val blockManager = blockManagerInfo.get(blockManagerId) - if (blockManager.isDefined) { - // Remove the block from the slave's BlockManager. - // Doesn't actually wait for a confirmation and the message might get lost. - // If message loss becomes frequent, we should add retry logic here. - blockManager.get.slaveActor ! RemoveBlock(blockId) - // Remove the block from the master's BlockManagerInfo. - blockManager.get.updateBlockInfo(blockId, StorageLevel.NONE, 0, 0) - } - } - blockInfo.remove(blockId) - } - sender ! true - } - - // Return a map from the block manager id to max memory and remaining memory. - private def getMemoryStatus() { - val res = blockManagerInfo.map { case(blockManagerId, info) => - (blockManagerId, (info.maxMem, info.remainingMem)) - }.toMap - sender ! res - } - - private def register(blockManagerId: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) { - val startTimeMs = System.currentTimeMillis() - val tmp = " " + blockManagerId + " " - logDebug("Got in register 0" + tmp + Utils.getUsedTimeMs(startTimeMs)) - if (blockManagerIdByHost.contains(blockManagerId.ip) && - blockManagerIdByHost(blockManagerId.ip) != blockManagerId) { - val oldId = blockManagerIdByHost(blockManagerId.ip) - logInfo("Got second registration for host " + blockManagerId + - "; removing old slave " + oldId) - removeBlockManager(oldId) - } - if (blockManagerId.ip == Utils.localHostName() && !isLocal) { - logInfo("Got Register Msg from master node, don't register it") - } else { - blockManagerInfo += (blockManagerId -> new BlockManagerInfo( - blockManagerId, System.currentTimeMillis(), maxMemSize, slaveActor)) - } - blockManagerIdByHost += (blockManagerId.ip -> blockManagerId) - logDebug("Got in register 1" + tmp + Utils.getUsedTimeMs(startTimeMs)) - sender ! true - } - - private def blockUpdate( - blockManagerId: BlockManagerId, - blockId: String, - storageLevel: StorageLevel, - memSize: Long, - diskSize: Long) { - - val startTimeMs = System.currentTimeMillis() - val tmp = " " + blockManagerId + " " + blockId + " " - - if (!blockManagerInfo.contains(blockManagerId)) { - if (blockManagerId.ip == Utils.localHostName() && !isLocal) { - // We intentionally do not register the master (except in local mode), - // so we should not indicate failure. - sender ! true - } else { - sender ! false - } - return - } - - if (blockId == null) { - blockManagerInfo(blockManagerId).updateLastSeenMs() - logDebug("Got in block update 1" + tmp + " used " + Utils.getUsedTimeMs(startTimeMs)) - sender ! true - return - } - - blockManagerInfo(blockManagerId).updateBlockInfo(blockId, storageLevel, memSize, diskSize) - - var locations: HashSet[BlockManagerId] = null - if (blockInfo.containsKey(blockId)) { - locations = blockInfo.get(blockId)._2 - } else { - locations = new HashSet[BlockManagerId] - blockInfo.put(blockId, (storageLevel.replication, locations)) - } - - if (storageLevel.isValid) { - locations += blockManagerId - } else { - locations.remove(blockManagerId) - } - - if (locations.size == 0) { - blockInfo.remove(blockId) - } - sender ! true - } - - private def getLocations(blockId: String) { - val startTimeMs = System.currentTimeMillis() - val tmp = " " + blockId + " " - logDebug("Got in getLocations 0" + tmp + Utils.getUsedTimeMs(startTimeMs)) - if (blockInfo.containsKey(blockId)) { - var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId] - res.appendAll(blockInfo.get(blockId)._2) - logDebug("Got in getLocations 1" + tmp + " as "+ res.toSeq + " at " - + Utils.getUsedTimeMs(startTimeMs)) - sender ! res.toSeq - } else { - logDebug("Got in getLocations 2" + tmp + Utils.getUsedTimeMs(startTimeMs)) - var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId] - sender ! res - } - } - - private def getLocationsMultipleBlockIds(blockIds: Array[String]) { - def getLocations(blockId: String): Seq[BlockManagerId] = { - val tmp = blockId - logDebug("Got in getLocationsMultipleBlockIds Sub 0 " + tmp) - if (blockInfo.containsKey(blockId)) { - var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId] - res.appendAll(blockInfo.get(blockId)._2) - logDebug("Got in getLocationsMultipleBlockIds Sub 1 " + tmp + " " + res.toSeq) - return res.toSeq - } else { - logDebug("Got in getLocationsMultipleBlockIds Sub 2 " + tmp) - var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId] - return res.toSeq - } - } - - logDebug("Got in getLocationsMultipleBlockIds " + blockIds.toSeq) - var res: ArrayBuffer[Seq[BlockManagerId]] = new ArrayBuffer[Seq[BlockManagerId]] - for (blockId <- blockIds) { - res.append(getLocations(blockId)) - } - logDebug("Got in getLocationsMultipleBlockIds " + blockIds.toSeq + " : " + res.toSeq) - sender ! res.toSeq - } - - private def getPeers(blockManagerId: BlockManagerId, size: Int) { - var peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray - var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId] - res.appendAll(peers) - res -= blockManagerId - val rand = new Random(System.currentTimeMillis()) - while (res.length > size) { - res.remove(rand.nextInt(res.length)) - } - sender ! res.toSeq - } - - private def getPeersDeterministic(blockManagerId: BlockManagerId, size: Int) { - var peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray - var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId] - - val peersWithIndices = peers.zipWithIndex - val selfIndex = peersWithIndices.find(_._1 == blockManagerId).map(_._2).getOrElse(-1) - if (selfIndex == -1) { - throw new Exception("Self index for " + blockManagerId + " not found") - } - - var index = selfIndex - while (res.size < size) { - index += 1 - if (index == selfIndex) { - throw new Exception("More peer expected than available") - } - res += peers(index % peers.size) - } - sender ! res.toSeq - } -} - - private[spark] class BlockManagerMaster( val actorSystem: ActorSystem, isMaster: Boolean, @@ -409,228 +20,81 @@ private[spark] class BlockManagerMaster( masterPort: Int) extends Logging { + val AKKA_RETRY_ATTEMPS: Int = System.getProperty("spark.akka.num.retries", "5").toInt + val AKKA_RETRY_INTERVAL_MS: Int = System.getProperty("spark.akka.retry.wait", "100").toInt + val MASTER_AKKA_ACTOR_NAME = "BlockMasterManager" val SLAVE_AKKA_ACTOR_NAME = "BlockSlaveManager" - val REQUEST_RETRY_INTERVAL_MS = 100 val DEFAULT_MANAGER_IP: String = Utils.localHostName() val timeout = 10.seconds - var masterActor: ActorRef = null - - if (isMaster) { - masterActor = actorSystem.actorOf(Props(new BlockManagerMasterActor(isLocal)), - name = MASTER_AKKA_ACTOR_NAME) - logInfo("Registered BlockManagerMaster Actor") - } else { - val url = "akka://spark@%s:%s/user/%s".format(masterIp, masterPort, MASTER_AKKA_ACTOR_NAME) - logInfo("Connecting to BlockManagerMaster: " + url) - masterActor = actorSystem.actorFor(url) - } - - def stop() { - if (masterActor != null) { - communicate(StopBlockManagerMaster) - masterActor = null - logInfo("BlockManagerMaster stopped") + var masterActor: ActorRef = { + if (isMaster) { + val masterActor = actorSystem.actorOf(Props(new BlockManagerMasterActor(isLocal)), + name = MASTER_AKKA_ACTOR_NAME) + logInfo("Registered BlockManagerMaster Actor") + masterActor + } else { + val url = "akka://spark@%s:%s/user/%s".format(masterIp, masterPort, MASTER_AKKA_ACTOR_NAME) + logInfo("Connecting to BlockManagerMaster: " + url) + actorSystem.actorFor(url) } } - // Send a message to the master actor and get its result within a default timeout, or - // throw a SparkException if this fails. - def askMaster(message: Any): Any = { - try { - val future = masterActor.ask(message)(timeout) - return Await.result(future, timeout) - } catch { - case e: Exception => - throw new SparkException("Error communicating with BlockManagerMaster", e) - } - } - - // Send a one-way message to the master actor, to which we expect it to reply with true. - def communicate(message: Any) { - if (askMaster(message) != true) { - throw new SparkException("Error reply received from BlockManagerMaster") - } - } + /** Remove a dead host from the master actor. This is only called on the master side. */ def notifyADeadHost(host: String) { - communicate(RemoveHost(host)) + tell(RemoveHost(host)) logInfo("Removed " + host + " successfully in notifyADeadHost") } - def mustRegisterBlockManager( + /** + * Send the master actor a heart beat from the slave. Returns true if everything works out, + * false if the master does not know about the given block manager, which means the block + * manager should re-register. + */ + def sendHeartBeat(blockManagerId: BlockManagerId): Boolean = { + askMasterWithRetry[Boolean](HeartBeat(blockManagerId)) + } + + /** Register the BlockManager's id with the master. */ + def registerBlockManager( blockManagerId: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) { - val msg = RegisterBlockManager(blockManagerId, maxMemSize, slaveActor) logInfo("Trying to register BlockManager") - while (! syncRegisterBlockManager(msg)) { - logWarning("Failed to register " + msg) - Thread.sleep(REQUEST_RETRY_INTERVAL_MS) - } - logInfo("Done registering BlockManager") + tell(RegisterBlockManager(blockManagerId, maxMemSize, slaveActor)) + logInfo("Registered BlockManager") } - private def syncRegisterBlockManager(msg: RegisterBlockManager): Boolean = { - //val masterActor = RemoteActor.select(node, name) - val startTimeMs = System.currentTimeMillis() - val tmp = " msg " + msg + " " - logDebug("Got in syncRegisterBlockManager 0 " + tmp + Utils.getUsedTimeMs(startTimeMs)) - - try { - communicate(msg) - logInfo("BlockManager registered successfully @ syncRegisterBlockManager") - logDebug("Got in syncRegisterBlockManager 1 " + tmp + Utils.getUsedTimeMs(startTimeMs)) - return true - } catch { - case e: Exception => - logError("Failed in syncRegisterBlockManager", e) - return false - } - } - - def mustHeartBeat(msg: HeartBeat): Boolean = { - var res = syncHeartBeat(msg) - while (!res.isDefined) { - logWarning("Failed to send heart beat " + msg) - Thread.sleep(REQUEST_RETRY_INTERVAL_MS) - } - return res.get - } - - private def syncHeartBeat(msg: HeartBeat): Option[Boolean] = { - try { - val answer = askMaster(msg).asInstanceOf[Boolean] - return Some(answer) - } catch { - case e: Exception => - logError("Failed in syncHeartBeat", e) - return None - } - } - - def mustBlockUpdate(msg: BlockUpdate): Boolean = { - var res = syncBlockUpdate(msg) - while (!res.isDefined) { - logWarning("Failed to send block update " + msg) - Thread.sleep(REQUEST_RETRY_INTERVAL_MS) - } - return res.get - } - - private def syncBlockUpdate(msg: BlockUpdate): Option[Boolean] = { - val startTimeMs = System.currentTimeMillis() - val tmp = " msg " + msg + " " - logDebug("Got in syncBlockUpdate " + tmp + " 0 " + Utils.getUsedTimeMs(startTimeMs)) - - try { - val answer = askMaster(msg).asInstanceOf[Boolean] - logDebug("Block update sent successfully") - logDebug("Got in synbBlockUpdate " + tmp + " 1 " + Utils.getUsedTimeMs(startTimeMs)) - return Some(answer) - } catch { - case e: Exception => - logError("Failed in syncBlockUpdate", e) - return None - } - } - - def mustGetLocations(msg: GetLocations): Seq[BlockManagerId] = { - var res = syncGetLocations(msg) - while (res == null) { - logInfo("Failed to get locations " + msg) - Thread.sleep(REQUEST_RETRY_INTERVAL_MS) - res = syncGetLocations(msg) - } - return res - } - - private def syncGetLocations(msg: GetLocations): Seq[BlockManagerId] = { - val startTimeMs = System.currentTimeMillis() - val tmp = " msg " + msg + " " - logDebug("Got in syncGetLocations 0 " + tmp + Utils.getUsedTimeMs(startTimeMs)) - - try { - val answer = askMaster(msg).asInstanceOf[ArrayBuffer[BlockManagerId]] - if (answer != null) { - logDebug("GetLocations successful") - logDebug("Got in syncGetLocations 1 " + tmp + Utils.getUsedTimeMs(startTimeMs)) - return answer - } else { - logError("Master replied null in response to GetLocations") - return null - } - } catch { - case e: Exception => - logError("GetLocations failed", e) - return null - } - } - - def mustGetLocationsMultipleBlockIds(msg: GetLocationsMultipleBlockIds): - Seq[Seq[BlockManagerId]] = { - var res: Seq[Seq[BlockManagerId]] = syncGetLocationsMultipleBlockIds(msg) - while (res == null) { - logWarning("Failed to GetLocationsMultipleBlockIds " + msg) - Thread.sleep(REQUEST_RETRY_INTERVAL_MS) - res = syncGetLocationsMultipleBlockIds(msg) - } - return res - } - - private def syncGetLocationsMultipleBlockIds(msg: GetLocationsMultipleBlockIds): - Seq[Seq[BlockManagerId]] = { - val startTimeMs = System.currentTimeMillis - val tmp = " msg " + msg + " " - logDebug("Got in syncGetLocationsMultipleBlockIds 0 " + tmp + Utils.getUsedTimeMs(startTimeMs)) - - try { - val answer = askMaster(msg).asInstanceOf[Seq[Seq[BlockManagerId]]] - if (answer != null) { - logDebug("GetLocationsMultipleBlockIds successful") - logDebug("Got in syncGetLocationsMultipleBlockIds 1 " + tmp + - Utils.getUsedTimeMs(startTimeMs)) - return answer - } else { - logError("Master replied null in response to GetLocationsMultipleBlockIds") - return null - } - } catch { - case e: Exception => - logError("GetLocationsMultipleBlockIds failed", e) - return null - } - } - - def mustGetPeers(msg: GetPeers): Seq[BlockManagerId] = { - var res = syncGetPeers(msg) - while ((res == null) || (res.length != msg.size)) { - logInfo("Failed to get peers " + msg) - Thread.sleep(REQUEST_RETRY_INTERVAL_MS) - res = syncGetPeers(msg) - } + def updateBlockInfo( + blockManagerId: BlockManagerId, + blockId: String, + storageLevel: StorageLevel, + memSize: Long, + diskSize: Long): Boolean = { + val res = askMasterWithRetry[Boolean]( + UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize)) + logInfo("Updated info of block " + blockId) res } - private def syncGetPeers(msg: GetPeers): Seq[BlockManagerId] = { - val startTimeMs = System.currentTimeMillis - val tmp = " msg " + msg + " " - logDebug("Got in syncGetPeers 0 " + tmp + Utils.getUsedTimeMs(startTimeMs)) + /** Get locations of the blockId from the master */ + def getLocations(blockId: String): Seq[BlockManagerId] = { + askMasterWithRetry[Seq[BlockManagerId]](GetLocations(blockId)) + } - try { - val answer = askMaster(msg).asInstanceOf[Seq[BlockManagerId]] - if (answer != null) { - logDebug("GetPeers successful") - logDebug("Got in syncGetPeers 1 " + tmp + Utils.getUsedTimeMs(startTimeMs)) - return answer - } else { - logError("Master replied null in response to GetPeers") - return null - } - } catch { - case e: Exception => - logError("GetPeers failed", e) - return null + /** Get locations of multiple blockIds from the master */ + def getLocations(blockIds: Array[String]): Seq[Seq[BlockManagerId]] = { + askMasterWithRetry[Seq[Seq[BlockManagerId]]](GetLocationsMultipleBlockIds(blockIds)) + } + + /** Get ids of other nodes in the cluster from the master */ + def getPeers(blockManagerId: BlockManagerId, numPeers: Int): Seq[BlockManagerId] = { + val result = askMasterWithRetry[Seq[BlockManagerId]](GetPeers(blockManagerId, numPeers)) + if (result.length != numPeers) { + throw new SparkException( + "Error getting peers, only got " + result.size + " instead of " + numPeers) } + result } /** @@ -648,6 +112,72 @@ private[spark] class BlockManagerMaster( * amount of remaining memory. */ def getMemoryStatus: Map[BlockManagerId, (Long, Long)] = { - askMaster(GetMemoryStatus).asInstanceOf[Map[BlockManagerId, (Long, Long)]] + askMasterWithRetry[Map[BlockManagerId, (Long, Long)]](GetMemoryStatus) } + + /** Stop the master actor, called only on the Spark master node */ + def stop() { + if (masterActor != null) { + tell(StopBlockManagerMaster) + masterActor = null + logInfo("BlockManagerMaster stopped") + } + } + + /** Send a one-way message to the master actor, to which we expect it to reply with true. */ + private def tell(message: Any) { + if (!askMasterWithRetry[Boolean](message)) { + throw new SparkException("BlockManagerMasterActor returned false, expected true.") + } + } + + /** + * Send a message to the master actor and get its result within a default timeout, or + * throw a SparkException if this fails. There is no retry logic here so if the Akka + * message is lost, the master actor won't get the command. + */ + private def askMaster[T](message: Any): Any = { + try { + val future = masterActor.ask(message)(timeout) + return Await.result(future, timeout).asInstanceOf[T] + } catch { + case e: Exception => + throw new SparkException("Error communicating with BlockManagerMaster", e) + } + } + + /** + * Send a message to the master actor and get its result within a default timeout, or + * throw a SparkException if this fails. + */ + private def askMasterWithRetry[T](message: Any): T = { + // TODO: Consider removing multiple attempts + if (masterActor == null) { + throw new SparkException("Error sending message to BlockManager as masterActor is null " + + "[message = " + message + "]") + } + var attempts = 0 + var lastException: Exception = null + while (attempts < AKKA_RETRY_ATTEMPS) { + attempts += 1 + try { + val future = masterActor.ask(message)(timeout) + val result = Await.result(future, timeout) + if (result == null) { + throw new Exception("BlockManagerMaster returned null") + } + return result.asInstanceOf[T] + } catch { + case ie: InterruptedException => throw ie + case e: Exception => + lastException = e + logWarning("Error sending message to BlockManagerMaster in " + attempts + " attempts", e) + } + Thread.sleep(AKKA_RETRY_INTERVAL_MS) + } + + throw new SparkException( + "Error sending message to BlockManagerMaster [message = " + message + "]", lastException) + } + } diff --git a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala new file mode 100644 index 0000000000..0d84e559cb --- /dev/null +++ b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala @@ -0,0 +1,406 @@ +package spark.storage + +import java.util.{HashMap => JHashMap} + +import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} +import scala.collection.JavaConversions._ +import scala.util.Random + +import akka.actor.{Actor, ActorRef, Cancellable} +import akka.util.{Duration, Timeout} +import akka.util.duration._ + +import spark.{Logging, Utils} + +/** + * BlockManagerMasterActor is an actor on the master node to track statuses of + * all slaves' block managers. + */ + +private[spark] +object BlockManagerMasterActor { + + case class BlockStatus(storageLevel: StorageLevel, memSize: Long, diskSize: Long) + + class BlockManagerInfo( + val blockManagerId: BlockManagerId, + timeMs: Long, + val maxMem: Long, + val slaveActor: ActorRef) + extends Logging { + + private var _lastSeenMs: Long = timeMs + private var _remainingMem: Long = maxMem + + // Mapping from block id to its status. + private val _blocks = new JHashMap[String, BlockStatus] + + logInfo("Registering block manager %s:%d with %s RAM".format( + blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(maxMem))) + + def updateLastSeenMs() { + _lastSeenMs = System.currentTimeMillis() + } + + def updateBlockInfo(blockId: String, storageLevel: StorageLevel, memSize: Long, diskSize: Long) + : Unit = synchronized { + + updateLastSeenMs() + + if (_blocks.containsKey(blockId)) { + // The block exists on the slave already. + val originalLevel: StorageLevel = _blocks.get(blockId).storageLevel + + if (originalLevel.useMemory) { + _remainingMem += memSize + } + } + + if (storageLevel.isValid) { + // isValid means it is either stored in-memory or on-disk. + _blocks.put(blockId, BlockStatus(storageLevel, memSize, diskSize)) + if (storageLevel.useMemory) { + _remainingMem -= memSize + logInfo("Added %s in memory on %s:%d (size: %s, free: %s)".format( + blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(memSize), + Utils.memoryBytesToString(_remainingMem))) + } + if (storageLevel.useDisk) { + logInfo("Added %s on disk on %s:%d (size: %s)".format( + blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(diskSize))) + } + } else if (_blocks.containsKey(blockId)) { + // If isValid is not true, drop the block. + val blockStatus: BlockStatus = _blocks.get(blockId) + _blocks.remove(blockId) + if (blockStatus.storageLevel.useMemory) { + _remainingMem += blockStatus.memSize + logInfo("Removed %s on %s:%d in memory (size: %s, free: %s)".format( + blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(memSize), + Utils.memoryBytesToString(_remainingMem))) + } + if (blockStatus.storageLevel.useDisk) { + logInfo("Removed %s on %s:%d on disk (size: %s)".format( + blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(diskSize))) + } + } + } + + def remainingMem: Long = _remainingMem + + def lastSeenMs: Long = _lastSeenMs + + def blocks: JHashMap[String, BlockStatus] = _blocks + + override def toString: String = "BlockManagerInfo " + timeMs + " " + _remainingMem + + def clear() { + _blocks.clear() + } + } +} + + +private[spark] +class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging { + + // Mapping from block manager id to the block manager's information. + private val blockManagerInfo = + new HashMap[BlockManagerId, BlockManagerMasterActor.BlockManagerInfo] + + // Mapping from host name to block manager id. + private val blockManagerIdByHost = new HashMap[String, BlockManagerId] + + // Mapping from block id to the set of block managers that have the block. + private val blockInfo = new JHashMap[String, Pair[Int, HashSet[BlockManagerId]]] + + initLogging() + + val slaveTimeout = System.getProperty("spark.storage.blockManagerSlaveTimeoutMs", + "" + (BlockManager.getHeartBeatFrequencyFromSystemProperties * 3)).toLong + + val checkTimeoutInterval = System.getProperty("spark.storage.blockManagerTimeoutIntervalMs", + "5000").toLong + + var timeoutCheckingTask: Cancellable = null + + override def preStart() { + if (!BlockManager.getDisableHeartBeatsForTesting) { + timeoutCheckingTask = context.system.scheduler.schedule( + 0.seconds, checkTimeoutInterval.milliseconds, self, ExpireDeadHosts) + } + super.preStart() + } + + def removeBlockManager(blockManagerId: BlockManagerId) { + val info = blockManagerInfo(blockManagerId) + blockManagerIdByHost.remove(blockManagerId.ip) + blockManagerInfo.remove(blockManagerId) + var iterator = info.blocks.keySet.iterator + while (iterator.hasNext) { + val blockId = iterator.next + val locations = blockInfo.get(blockId)._2 + locations -= blockManagerId + if (locations.size == 0) { + blockInfo.remove(locations) + } + } + } + + def expireDeadHosts() { + logDebug("Checking for hosts with no recent heart beats in BlockManagerMaster.") + val now = System.currentTimeMillis() + val minSeenTime = now - slaveTimeout + val toRemove = new HashSet[BlockManagerId] + for (info <- blockManagerInfo.values) { + if (info.lastSeenMs < minSeenTime) { + logWarning("Removing BlockManager " + info.blockManagerId + " with no recent heart beats") + toRemove += info.blockManagerId + } + } + // TODO: Remove corresponding block infos + toRemove.foreach(removeBlockManager) + } + + def removeHost(host: String) { + logInfo("Trying to remove the host: " + host + " from BlockManagerMaster.") + logInfo("Previous hosts: " + blockManagerInfo.keySet.toSeq) + blockManagerIdByHost.get(host).foreach(removeBlockManager) + logInfo("Current hosts: " + blockManagerInfo.keySet.toSeq) + sender ! true + } + + def heartBeat(blockManagerId: BlockManagerId) { + if (!blockManagerInfo.contains(blockManagerId)) { + if (blockManagerId.ip == Utils.localHostName() && !isLocal) { + sender ! true + } else { + sender ! false + } + } else { + blockManagerInfo(blockManagerId).updateLastSeenMs() + sender ! true + } + } + + def receive = { + case RegisterBlockManager(blockManagerId, maxMemSize, slaveActor) => + register(blockManagerId, maxMemSize, slaveActor) + + case UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) => + blockUpdate(blockManagerId, blockId, storageLevel, deserializedSize, size) + + case GetLocations(blockId) => + getLocations(blockId) + + case GetLocationsMultipleBlockIds(blockIds) => + getLocationsMultipleBlockIds(blockIds) + + case GetPeers(blockManagerId, size) => + getPeersDeterministic(blockManagerId, size) + /*getPeers(blockManagerId, size)*/ + + case GetMemoryStatus => + getMemoryStatus + + case RemoveBlock(blockId) => + removeBlock(blockId) + + case RemoveHost(host) => + removeHost(host) + sender ! true + + case StopBlockManagerMaster => + logInfo("Stopping BlockManagerMaster") + sender ! true + if (timeoutCheckingTask != null) { + timeoutCheckingTask.cancel + } + context.stop(self) + + case ExpireDeadHosts => + expireDeadHosts() + + case HeartBeat(blockManagerId) => + heartBeat(blockManagerId) + + case other => + logInfo("Got unknown message: " + other) + } + + // Remove a block from the slaves that have it. This can only be used to remove + // blocks that the master knows about. + private def removeBlock(blockId: String) { + val block = blockInfo.get(blockId) + if (block != null) { + block._2.foreach { blockManagerId: BlockManagerId => + val blockManager = blockManagerInfo.get(blockManagerId) + if (blockManager.isDefined) { + // Remove the block from the slave's BlockManager. + // Doesn't actually wait for a confirmation and the message might get lost. + // If message loss becomes frequent, we should add retry logic here. + blockManager.get.slaveActor ! RemoveBlock(blockId) + // Remove the block from the master's BlockManagerInfo. + blockManager.get.updateBlockInfo(blockId, StorageLevel.NONE, 0, 0) + } + } + blockInfo.remove(blockId) + } + sender ! true + } + + // Return a map from the block manager id to max memory and remaining memory. + private def getMemoryStatus() { + val res = blockManagerInfo.map { case(blockManagerId, info) => + (blockManagerId, (info.maxMem, info.remainingMem)) + }.toMap + sender ! res + } + + private def register(blockManagerId: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) { + val startTimeMs = System.currentTimeMillis() + val tmp = " " + blockManagerId + " " + logDebug("Got in register 0" + tmp + Utils.getUsedTimeMs(startTimeMs)) + if (blockManagerIdByHost.contains(blockManagerId.ip) && + blockManagerIdByHost(blockManagerId.ip) != blockManagerId) { + val oldId = blockManagerIdByHost(blockManagerId.ip) + logInfo("Got second registration for host " + blockManagerId + + "; removing old slave " + oldId) + removeBlockManager(oldId) + } + if (blockManagerId.ip == Utils.localHostName() && !isLocal) { + logInfo("Got Register Msg from master node, don't register it") + } else { + blockManagerInfo += (blockManagerId -> new BlockManagerMasterActor.BlockManagerInfo( + blockManagerId, System.currentTimeMillis(), maxMemSize, slaveActor)) + } + blockManagerIdByHost += (blockManagerId.ip -> blockManagerId) + logDebug("Got in register 1" + tmp + Utils.getUsedTimeMs(startTimeMs)) + sender ! true + } + + private def blockUpdate( + blockManagerId: BlockManagerId, + blockId: String, + storageLevel: StorageLevel, + memSize: Long, + diskSize: Long) { + + val startTimeMs = System.currentTimeMillis() + val tmp = " " + blockManagerId + " " + blockId + " " + + if (!blockManagerInfo.contains(blockManagerId)) { + if (blockManagerId.ip == Utils.localHostName() && !isLocal) { + // We intentionally do not register the master (except in local mode), + // so we should not indicate failure. + sender ! true + } else { + sender ! false + } + return + } + + if (blockId == null) { + blockManagerInfo(blockManagerId).updateLastSeenMs() + logDebug("Got in block update 1" + tmp + " used " + Utils.getUsedTimeMs(startTimeMs)) + sender ! true + return + } + + blockManagerInfo(blockManagerId).updateBlockInfo(blockId, storageLevel, memSize, diskSize) + + var locations: HashSet[BlockManagerId] = null + if (blockInfo.containsKey(blockId)) { + locations = blockInfo.get(blockId)._2 + } else { + locations = new HashSet[BlockManagerId] + blockInfo.put(blockId, (storageLevel.replication, locations)) + } + + if (storageLevel.isValid) { + locations += blockManagerId + } else { + locations.remove(blockManagerId) + } + + if (locations.size == 0) { + blockInfo.remove(blockId) + } + sender ! true + } + + private def getLocations(blockId: String) { + val startTimeMs = System.currentTimeMillis() + val tmp = " " + blockId + " " + logDebug("Got in getLocations 0" + tmp + Utils.getUsedTimeMs(startTimeMs)) + if (blockInfo.containsKey(blockId)) { + var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId] + res.appendAll(blockInfo.get(blockId)._2) + logDebug("Got in getLocations 1" + tmp + " as "+ res.toSeq + " at " + + Utils.getUsedTimeMs(startTimeMs)) + sender ! res.toSeq + } else { + logDebug("Got in getLocations 2" + tmp + Utils.getUsedTimeMs(startTimeMs)) + var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId] + sender ! res + } + } + + private def getLocationsMultipleBlockIds(blockIds: Array[String]) { + def getLocations(blockId: String): Seq[BlockManagerId] = { + val tmp = blockId + logDebug("Got in getLocationsMultipleBlockIds Sub 0 " + tmp) + if (blockInfo.containsKey(blockId)) { + var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId] + res.appendAll(blockInfo.get(blockId)._2) + logDebug("Got in getLocationsMultipleBlockIds Sub 1 " + tmp + " " + res.toSeq) + return res.toSeq + } else { + logDebug("Got in getLocationsMultipleBlockIds Sub 2 " + tmp) + var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId] + return res.toSeq + } + } + + logDebug("Got in getLocationsMultipleBlockIds " + blockIds.toSeq) + var res: ArrayBuffer[Seq[BlockManagerId]] = new ArrayBuffer[Seq[BlockManagerId]] + for (blockId <- blockIds) { + res.append(getLocations(blockId)) + } + logDebug("Got in getLocationsMultipleBlockIds " + blockIds.toSeq + " : " + res.toSeq) + sender ! res.toSeq + } + + private def getPeers(blockManagerId: BlockManagerId, size: Int) { + var peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray + var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId] + res.appendAll(peers) + res -= blockManagerId + val rand = new Random(System.currentTimeMillis()) + while (res.length > size) { + res.remove(rand.nextInt(res.length)) + } + sender ! res.toSeq + } + + private def getPeersDeterministic(blockManagerId: BlockManagerId, size: Int) { + var peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray + var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId] + + val peersWithIndices = peers.zipWithIndex + val selfIndex = peersWithIndices.find(_._1 == blockManagerId).map(_._2).getOrElse(-1) + if (selfIndex == -1) { + throw new Exception("Self index for " + blockManagerId + " not found") + } + + var index = selfIndex + while (res.size < size) { + index += 1 + if (index == selfIndex) { + throw new Exception("More peer expected than available") + } + res += peers(index % peers.size) + } + sender ! res.toSeq + } +} diff --git a/core/src/main/scala/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/spark/storage/BlockManagerMessages.scala index 5bca170f95..d73a9b790f 100644 --- a/core/src/main/scala/spark/storage/BlockManagerMessages.scala +++ b/core/src/main/scala/spark/storage/BlockManagerMessages.scala @@ -34,7 +34,7 @@ private[spark] case class HeartBeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster private[spark] -class BlockUpdate( +class UpdateBlockInfo( var blockManagerId: BlockManagerId, var blockId: String, var storageLevel: StorageLevel, @@ -65,17 +65,17 @@ class BlockUpdate( } private[spark] -object BlockUpdate { +object UpdateBlockInfo { def apply(blockManagerId: BlockManagerId, blockId: String, storageLevel: StorageLevel, memSize: Long, - diskSize: Long): BlockUpdate = { - new BlockUpdate(blockManagerId, blockId, storageLevel, memSize, diskSize) + diskSize: Long): UpdateBlockInfo = { + new UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize) } // For pattern-matching - def unapply(h: BlockUpdate): Option[(BlockManagerId, String, StorageLevel, Long, Long)] = { + def unapply(h: UpdateBlockInfo): Option[(BlockManagerId, String, StorageLevel, Long, Long)] = { Some((h.blockManagerId, h.blockId, h.storageLevel, h.memSize, h.diskSize)) } } diff --git a/core/src/main/scala/spark/storage/StorageLevel.scala b/core/src/main/scala/spark/storage/StorageLevel.scala index c497f03e0c..e3544e5aae 100644 --- a/core/src/main/scala/spark/storage/StorageLevel.scala +++ b/core/src/main/scala/spark/storage/StorageLevel.scala @@ -1,6 +1,6 @@ package spark.storage -import java.io.{Externalizable, ObjectInput, ObjectOutput} +import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput} /** * Flags for controlling the storage of an RDD. Each StorageLevel records whether to use memory, @@ -10,14 +10,16 @@ import java.io.{Externalizable, ObjectInput, ObjectOutput} * commonly useful storage levels. */ class StorageLevel( - var useDisk: Boolean, + var useDisk: Boolean, var useMemory: Boolean, var deserialized: Boolean, var replication: Int = 1) extends Externalizable { // TODO: Also add fields for caching priority, dataset ID, and flushing. - + + assert(replication < 40, "Replication restricted to be less than 40 for calculating hashcodes") + def this(flags: Int, replication: Int) { this((flags & 4) != 0, (flags & 2) != 0, (flags & 1) != 0, replication) } @@ -29,14 +31,14 @@ class StorageLevel( override def equals(other: Any): Boolean = other match { case s: StorageLevel => - s.useDisk == useDisk && + s.useDisk == useDisk && s.useMemory == useMemory && s.deserialized == deserialized && - s.replication == replication + s.replication == replication case _ => false } - + def isValid = ((useMemory || useDisk) && (replication > 0)) def toInt: Int = { @@ -66,10 +68,16 @@ class StorageLevel( replication = in.readByte() } + @throws(classOf[IOException]) + private def readResolve(): Object = StorageLevel.getCachedStorageLevel(this) + override def toString: String = "StorageLevel(%b, %b, %b, %d)".format(useDisk, useMemory, deserialized, replication) + + override def hashCode(): Int = toInt * 41 + replication } + object StorageLevel { val NONE = new StorageLevel(false, false, false) val DISK_ONLY = new StorageLevel(true, false, false) @@ -82,4 +90,16 @@ object StorageLevel { val MEMORY_AND_DISK_2 = new StorageLevel(true, true, true, 2) val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false) val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, 2) + + private[spark] + val storageLevelCache = new java.util.concurrent.ConcurrentHashMap[StorageLevel, StorageLevel]() + + private[spark] def getCachedStorageLevel(level: StorageLevel): StorageLevel = { + if (storageLevelCache.containsKey(level)) { + storageLevelCache.get(level) + } else { + storageLevelCache.put(level, level) + level + } + } } diff --git a/core/src/main/scala/spark/util/MetadataCleaner.scala b/core/src/main/scala/spark/util/MetadataCleaner.scala new file mode 100644 index 0000000000..19e67acd0c --- /dev/null +++ b/core/src/main/scala/spark/util/MetadataCleaner.scala @@ -0,0 +1,35 @@ +package spark.util + +import java.util.concurrent.{TimeUnit, ScheduledFuture, Executors} +import java.util.{TimerTask, Timer} +import spark.Logging + +class MetadataCleaner(name: String, cleanupFunc: (Long) => Unit) extends Logging { + + val delaySeconds = (System.getProperty("spark.cleanup.delay", "-100").toDouble * 60).toInt + val periodSeconds = math.max(10, delaySeconds / 10) + val timer = new Timer(name + " cleanup timer", true) + + val task = new TimerTask { + def run() { + try { + if (delaySeconds > 0) { + cleanupFunc(System.currentTimeMillis() - (delaySeconds * 1000)) + logInfo("Ran metadata cleaner for " + name) + } + } catch { + case e: Exception => logError("Error running cleanup task for " + name, e) + } + } + } + if (periodSeconds > 0) { + logInfo( + "Starting metadata cleaner for " + name + " with delay of " + delaySeconds + " seconds and " + + "period of " + periodSeconds + " secs") + timer.schedule(task, periodSeconds * 1000, periodSeconds * 1000) + } + + def cancel() { + timer.cancel() + } +} diff --git a/core/src/main/scala/spark/util/TimeStampedHashMap.scala b/core/src/main/scala/spark/util/TimeStampedHashMap.scala new file mode 100644 index 0000000000..070ee19ac0 --- /dev/null +++ b/core/src/main/scala/spark/util/TimeStampedHashMap.scala @@ -0,0 +1,87 @@ +package spark.util + +import java.util.concurrent.ConcurrentHashMap +import scala.collection.JavaConversions._ +import scala.collection.mutable.{HashMap, Map} + +/** + * This is a custom implementation of scala.collection.mutable.Map which stores the insertion + * time stamp along with each key-value pair. Key-value pairs that are older than a particular + * threshold time can them be removed using the cleanup method. This is intended to be a drop-in + * replacement of scala.collection.mutable.HashMap. + */ +class TimeStampedHashMap[A, B] extends Map[A, B]() { + val internalMap = new ConcurrentHashMap[A, (B, Long)]() + + def get(key: A): Option[B] = { + val value = internalMap.get(key) + if (value != null) Some(value._1) else None + } + + def iterator: Iterator[(A, B)] = { + val jIterator = internalMap.entrySet().iterator() + jIterator.map(kv => (kv.getKey, kv.getValue._1)) + } + + override def + [B1 >: B](kv: (A, B1)): Map[A, B1] = { + val newMap = new TimeStampedHashMap[A, B1] + newMap.internalMap.putAll(this.internalMap) + newMap.internalMap.put(kv._1, (kv._2, currentTime)) + newMap + } + + override def - (key: A): Map[A, B] = { + internalMap.remove(key) + this + } + + override def += (kv: (A, B)): this.type = { + internalMap.put(kv._1, (kv._2, currentTime)) + this + } + + override def -= (key: A): this.type = { + internalMap.remove(key) + this + } + + override def update(key: A, value: B) { + this += ((key, value)) + } + + override def apply(key: A): B = { + val value = internalMap.get(key) + if (value == null) throw new NoSuchElementException() + value._1 + } + + override def filter(p: ((A, B)) => Boolean): Map[A, B] = { + internalMap.map(kv => (kv._1, kv._2._1)).filter(p) + } + + override def empty: Map[A, B] = new TimeStampedHashMap[A, B]() + + override def size(): Int = internalMap.size() + + override def foreach[U](f: ((A, B)) => U): Unit = { + val iterator = internalMap.entrySet().iterator() + while(iterator.hasNext) { + val entry = iterator.next() + val kv = (entry.getKey, entry.getValue._1) + f(kv) + } + } + + def cleanup(threshTime: Long) { + val iterator = internalMap.entrySet().iterator() + while(iterator.hasNext) { + val entry = iterator.next() + if (entry.getValue._2 < threshTime) { + iterator.remove() + } + } + } + + private def currentTime: Long = System.currentTimeMillis() + +} diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala index 4dc3b7ec05..e50ce1430f 100644 --- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala @@ -22,6 +22,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT var oldHeartBeat: String = null // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test + System.setProperty("spark.kryoserializer.buffer.mb", "1") val serializer = new KryoSerializer before { @@ -63,7 +64,33 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } } - test("manager-master interaction") { + test("StorageLevel object caching") { + val level1 = new StorageLevel(false, false, false, 3) + val level2 = new StorageLevel(false, false, false, 3) + val bytes1 = spark.Utils.serialize(level1) + val level1_ = spark.Utils.deserialize[StorageLevel](bytes1) + val bytes2 = spark.Utils.serialize(level2) + val level2_ = spark.Utils.deserialize[StorageLevel](bytes2) + assert(level1_ === level1, "Deserialized level1 not same as original level1") + assert(level2_ === level2, "Deserialized level2 not same as original level1") + assert(level1_ === level2_, "Deserialized level1 not same as deserialized level2") + assert(level2_.eq(level1_), "Deserialized level2 not the same object as deserialized level1") + } + + test("BlockManagerId object caching") { + val id1 = new StorageLevel(false, false, false, 3) + val id2 = new StorageLevel(false, false, false, 3) + val bytes1 = spark.Utils.serialize(id1) + val id1_ = spark.Utils.deserialize[StorageLevel](bytes1) + val bytes2 = spark.Utils.serialize(id2) + val id2_ = spark.Utils.deserialize[StorageLevel](bytes2) + assert(id1_ === id1, "Deserialized id1 not same as original id1") + assert(id2_ === id2, "Deserialized id2 not same as original id1") + assert(id1_ === id2_, "Deserialized id1 not same as deserialized id2") + assert(id2_.eq(id1_), "Deserialized id2 not the same object as deserialized level1") + } + + test("master + 1 manager interaction") { store = new BlockManager(actorSystem, master, serializer, 2000) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) @@ -80,17 +107,33 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT assert(store.getSingle("a3") != None, "a3 was not in store") // Checking whether master knows about the blocks or not - assert(master.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1") - assert(master.mustGetLocations(GetLocations("a2")).size > 0, "master was not told about a2") - assert(master.mustGetLocations(GetLocations("a3")).size === 0, "master was told about a3") + assert(master.getLocations("a1").size > 0, "master was not told about a1") + assert(master.getLocations("a2").size > 0, "master was not told about a2") + assert(master.getLocations("a3").size === 0, "master was told about a3") // Drop a1 and a2 from memory; this should be reported back to the master store.dropFromMemory("a1", null) store.dropFromMemory("a2", null) assert(store.getSingle("a1") === None, "a1 not removed from store") assert(store.getSingle("a2") === None, "a2 not removed from store") - assert(master.mustGetLocations(GetLocations("a1")).size === 0, "master did not remove a1") - assert(master.mustGetLocations(GetLocations("a2")).size === 0, "master did not remove a2") + assert(master.getLocations("a1").size === 0, "master did not remove a1") + assert(master.getLocations("a2").size === 0, "master did not remove a2") + } + + test("master + 2 managers interaction") { + store = new BlockManager(actorSystem, master, serializer, 2000) + val otherStore = new BlockManager(actorSystem, master, new KryoSerializer, 2000) + + val peers = master.getPeers(store.blockManagerId, 1) + assert(peers.size === 1, "master did not return the other manager as a peer") + assert(peers.head === otherStore.blockManagerId, "peer returned by master is not the other manager") + + val a1 = new Array[Byte](400) + val a2 = new Array[Byte](400) + store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_2) + otherStore.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_2) + assert(master.getLocations("a1").size === 2, "master did not report 2 locations for a1") + assert(master.getLocations("a2").size === 2, "master did not report 2 locations for a2") } test("removing block") { @@ -113,9 +156,9 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT assert(store.getSingle("a3") != None, "a3 was not in store") // Checking whether master knows about the blocks or not - assert(master.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1") - assert(master.mustGetLocations(GetLocations("a2")).size > 0, "master was not told about a2") - assert(master.mustGetLocations(GetLocations("a3")).size === 0, "master was told about a3") + assert(master.getLocations("a1").size > 0, "master was not told about a1") + assert(master.getLocations("a2").size > 0, "master was not told about a2") + assert(master.getLocations("a3").size === 0, "master was told about a3") // Remove a1 and a2 and a3. Should be no-op for a3. master.removeBlock("a1") @@ -123,10 +166,10 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT master.removeBlock("a3") assert(store.getSingle("a1") === None, "a1 not removed from store") assert(store.getSingle("a2") === None, "a2 not removed from store") - assert(master.mustGetLocations(GetLocations("a1")).size === 0, "master did not remove a1") - assert(master.mustGetLocations(GetLocations("a2")).size === 0, "master did not remove a2") + assert(master.getLocations("a1").size === 0, "master did not remove a1") + assert(master.getLocations("a2").size === 0, "master did not remove a2") assert(store.getSingle("a3") != None, "a3 was not in store") - assert(master.mustGetLocations(GetLocations("a3")).size === 0, "master was told about a3") + assert(master.getLocations("a3").size === 0, "master was told about a3") memStatus = master.getMemoryStatus.head._2 assert(memStatus._1 == 2000L, "total memory " + memStatus._1 + " should equal 2000") assert(memStatus._2 == 2000L, "remaining memory " + memStatus._1 + " should equal 2000") @@ -140,13 +183,13 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY) assert(store.getSingle("a1") != None, "a1 was not in store") - assert(master.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1") + assert(master.getLocations("a1").size > 0, "master was not told about a1") master.notifyADeadHost(store.blockManagerId.ip) - assert(master.mustGetLocations(GetLocations("a1")).size == 0, "a1 was not removed from master") + assert(master.getLocations("a1").size == 0, "a1 was not removed from master") store invokePrivate heartBeat() - assert(master.mustGetLocations(GetLocations("a1")).size > 0, + assert(master.getLocations("a1").size > 0, "a1 was not reregistered with master") } @@ -157,17 +200,15 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY) - assert(master.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1") + assert(master.getLocations("a1").size > 0, "master was not told about a1") master.notifyADeadHost(store.blockManagerId.ip) - assert(master.mustGetLocations(GetLocations("a1")).size == 0, "a1 was not removed from master") + assert(master.getLocations("a1").size == 0, "a1 was not removed from master") store.putSingle("a2", a1, StorageLevel.MEMORY_ONLY) - assert(master.mustGetLocations(GetLocations("a1")).size > 0, - "a1 was not reregistered with master") - assert(master.mustGetLocations(GetLocations("a2")).size > 0, - "master was not told about a2") + assert(master.getLocations("a1").size > 0, "a1 was not reregistered with master") + assert(master.getLocations("a2").size > 0, "master was not told about a2") } test("deregistration on duplicate") { @@ -177,19 +218,19 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY) - assert(master.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1") + assert(master.getLocations("a1").size > 0, "master was not told about a1") store2 = new BlockManager(actorSystem, master, serializer, 2000) - assert(master.mustGetLocations(GetLocations("a1")).size == 0, "a1 was not removed from master") + assert(master.getLocations("a1").size == 0, "a1 was not removed from master") store invokePrivate heartBeat() - assert(master.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1") + assert(master.getLocations("a1").size > 0, "master was not told about a1") store2 invokePrivate heartBeat() - assert(master.mustGetLocations(GetLocations("a1")).size == 0, "a2 was not removed from master") + assert(master.getLocations("a1").size == 0, "a2 was not removed from master") } test("in-memory LRU storage") {