Merged TD's block manager refactoring.

This commit is contained in:
Reynold Xin 2012-12-13 22:32:19 -08:00
parent 41e58a519a
commit 97434f49b8
9 changed files with 812 additions and 654 deletions

View file

@ -19,7 +19,7 @@ import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
import spark.{CacheTracker, Logging, SizeEstimator, SparkEnv, SparkException, Utils}
import spark.network._
import spark.serializer.Serializer
import spark.util.{ByteBufferInputStream, GenerationIdUtil}
import spark.util.{ByteBufferInputStream, GenerationIdUtil, MetadataCleaner, TimeStampedHashMap}
import sun.nio.ch.DirectBuffer
@ -59,7 +59,7 @@ class BlockManager(
}
}
private val blockInfo = new ConcurrentHashMap[String, BlockInfo](1000)
private val blockInfo = new TimeStampedHashMap[String, BlockInfo]()
private[storage] val memoryStore: BlockStore = new MemoryStore(this, maxMemory)
private[storage] val diskStore: BlockStore =
@ -96,13 +96,14 @@ class BlockManager(
@volatile private var shuttingDown = false
private def heartBeat() {
if (!master.mustHeartBeat(HeartBeat(blockManagerId))) {
if (!master.sendHeartBeat(blockManagerId)) {
reregister()
}
}
var heartBeatTask: Cancellable = null
val metadataCleaner = new MetadataCleaner("BlockManager", this.dropOldBlocks)
initialize()
/**
@ -117,7 +118,7 @@ class BlockManager(
* BlockManagerWorker actor.
*/
private def initialize() {
master.mustRegisterBlockManager(blockManagerId, maxMemory, slaveActor)
master.registerBlockManager(blockManagerId, maxMemory, slaveActor)
BlockManagerWorker.startBlockManagerWorker(this)
if (!BlockManager.getDisableHeartBeatsForTesting) {
heartBeatTask = actorSystem.scheduler.schedule(0.seconds, heartBeatFrequency.milliseconds) {
@ -153,17 +154,14 @@ class BlockManager(
def reregister() {
// TODO: We might need to rate limit reregistering.
logInfo("BlockManager reregistering with master")
master.mustRegisterBlockManager(blockManagerId, maxMemory, slaveActor)
master.registerBlockManager(blockManagerId, maxMemory, slaveActor)
reportAllBlocks()
}
/**
* Get storage level of local block. If no info exists for the block, then returns null.
*/
def getLevel(blockId: String): StorageLevel = {
val info = blockInfo.get(blockId)
if (info != null) info.level else null
}
def getLevel(blockId: String): StorageLevel = blockInfo.get(blockId).map(_.level).orNull
/**
* Tell the master about the current storage status of a block. This will send a block update
@ -186,9 +184,9 @@ class BlockManager(
*/
private def tryToReportBlockStatus(blockId: String): Boolean = {
val (curLevel, inMemSize, onDiskSize, tellMaster) = blockInfo.get(blockId) match {
case null =>
case None =>
(StorageLevel.NONE, 0L, 0L, false)
case info =>
case Some(info) =>
info.synchronized {
info.level match {
case null =>
@ -207,7 +205,7 @@ class BlockManager(
}
if (tellMaster) {
master.mustBlockUpdate(BlockUpdate(blockManagerId, blockId, curLevel, inMemSize, onDiskSize))
master.updateBlockInfo(blockManagerId, blockId, curLevel, inMemSize, onDiskSize)
} else {
true
}
@ -219,7 +217,7 @@ class BlockManager(
*/
def getLocations(blockId: String): Seq[String] = {
val startTimeMs = System.currentTimeMillis
var managers = master.mustGetLocations(GetLocations(blockId))
var managers = master.getLocations(blockId)
val locations = managers.map(_.ip)
logDebug("Get block locations in " + Utils.getUsedTimeMs(startTimeMs))
return locations
@ -230,8 +228,7 @@ class BlockManager(
*/
def getLocations(blockIds: Array[String]): Array[Seq[String]] = {
val startTimeMs = System.currentTimeMillis
val locations = master.mustGetLocationsMultipleBlockIds(
GetLocationsMultipleBlockIds(blockIds)).map(_.map(_.ip).toSeq).toArray
val locations = master.getLocations(blockIds).map(_.map(_.ip).toSeq).toArray
logDebug("Get multiple block location in " + Utils.getUsedTimeMs(startTimeMs))
return locations
}
@ -253,7 +250,7 @@ class BlockManager(
}
}
val info = blockInfo.get(blockId)
val info = blockInfo.get(blockId).orNull
if (info != null) {
info.synchronized {
info.waitForReady() // In case the block is still being put() by another thread
@ -338,7 +335,7 @@ class BlockManager(
}
}
val info = blockInfo.get(blockId)
val info = blockInfo.get(blockId).orNull
if (info != null) {
info.synchronized {
info.waitForReady() // In case the block is still being put() by another thread
@ -394,7 +391,7 @@ class BlockManager(
}
logDebug("Getting remote block " + blockId)
// Get locations of block
val locations = master.mustGetLocations(GetLocations(blockId))
val locations = master.getLocations(blockId)
// Get block from remote locations
for (loc <- locations) {
@ -596,7 +593,7 @@ class BlockManager(
throw new IllegalArgumentException("Storage level is null or invalid")
}
val oldBlock = blockInfo.get(blockId)
val oldBlock = blockInfo.get(blockId).orNull
if (oldBlock != null) {
logWarning("Block " + blockId + " already exists on this machine; not re-adding it")
oldBlock.waitForReady()
@ -697,7 +694,7 @@ class BlockManager(
throw new IllegalArgumentException("Storage level is null or invalid")
}
if (blockInfo.containsKey(blockId)) {
if (blockInfo.contains(blockId)) {
logWarning("Block " + blockId + " already exists on this machine; not re-adding it")
return
}
@ -772,7 +769,7 @@ class BlockManager(
val tLevel: StorageLevel =
new StorageLevel(level.useDisk, level.useMemory, level.deserialized, 1)
if (cachedPeers == null) {
cachedPeers = master.mustGetPeers(GetPeers(blockManagerId, level.replication - 1))
cachedPeers = master.getPeers(blockManagerId, level.replication - 1)
}
for (peer: BlockManagerId <- cachedPeers) {
val start = System.nanoTime
@ -819,7 +816,7 @@ class BlockManager(
*/
def dropFromMemory(blockId: String, data: Either[ArrayBuffer[Any], ByteBuffer]) {
logInfo("Dropping block " + blockId + " from memory")
val info = blockInfo.get(blockId)
val info = blockInfo.get(blockId).orNull
if (info != null) {
info.synchronized {
val level = info.level
@ -853,7 +850,7 @@ class BlockManager(
*/
def removeBlock(blockId: String) {
logInfo("Removing block " + blockId)
val info = blockInfo.get(blockId)
val info = blockInfo.get(blockId).orNull
if (info != null) info.synchronized {
// Removals are idempotent in disk store and memory store. At worst, we get a warning.
memoryStore.remove(blockId)
@ -865,6 +862,29 @@ class BlockManager(
}
}
def dropOldBlocks(cleanupTime: Long) {
logInfo("Dropping blocks older than " + cleanupTime)
val iterator = blockInfo.internalMap.entrySet().iterator()
while(iterator.hasNext) {
val entry = iterator.next()
val (id, info, time) = (entry.getKey, entry.getValue._1, entry.getValue._2)
if (time < cleanupTime) {
info.synchronized {
val level = info.level
if (level.useMemory) {
memoryStore.remove(id)
}
if (level.useDisk) {
diskStore.remove(id)
}
iterator.remove()
logInfo("Dropped block " + id)
}
reportBlockStatus(id)
}
}
}
def shouldCompress(blockId: String): Boolean = {
if (blockId.startsWith("shuffle_")) {
compressShuffle

View file

@ -1,6 +1,7 @@
package spark.storage
import java.io.{Externalizable, ObjectInput, ObjectOutput}
import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput}
import java.util.concurrent.ConcurrentHashMap
private[spark] class BlockManagerId(var ip: String, var port: Int) extends Externalizable {
@ -18,6 +19,9 @@ private[spark] class BlockManagerId(var ip: String, var port: Int) extends Exter
port = in.readInt()
}
@throws(classOf[IOException])
private def readResolve(): Object = BlockManagerId.getCachedBlockManagerId(this)
override def toString = "BlockManagerId(" + ip + ", " + port + ")"
override def hashCode = ip.hashCode * 41 + port
@ -27,3 +31,18 @@ private[spark] class BlockManagerId(var ip: String, var port: Int) extends Exter
case _ => false
}
}
private[spark] object BlockManagerId {
val blockManagerIdCache = new ConcurrentHashMap[BlockManagerId, BlockManagerId]()
def getCachedBlockManagerId(id: BlockManagerId): BlockManagerId = {
if (blockManagerIdCache.containsKey(id)) {
blockManagerIdCache.get(id)
} else {
blockManagerIdCache.put(id, id)
id
}
}
}

View file

@ -1,406 +1,17 @@
package spark.storage
import java.io._
import java.util.{HashMap => JHashMap}
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.collection.mutable.ArrayBuffer
import scala.util.Random
import akka.actor._
import akka.dispatch._
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.dispatch.Await
import akka.pattern.ask
import akka.remote._
import akka.util.{Duration, Timeout}
import akka.util.duration._
import spark.{Logging, SparkException, Utils}
private[spark]
case class BlockStatus(storageLevel: StorageLevel, memSize: Long, diskSize: Long)
// TODO(rxin): Move BlockManagerMasterActor to its own file.
private[spark]
class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
class BlockManagerInfo(
val blockManagerId: BlockManagerId,
timeMs: Long,
val maxMem: Long,
val slaveActor: ActorRef) {
private var _lastSeenMs: Long = timeMs
private var _remainingMem: Long = maxMem
// Mapping from block id to its status.
private val _blocks = new JHashMap[String, BlockStatus]
logInfo("Registering block manager %s:%d with %s RAM".format(
blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(maxMem)))
def updateLastSeenMs() {
_lastSeenMs = System.currentTimeMillis()
}
def updateBlockInfo(blockId: String, storageLevel: StorageLevel, memSize: Long, diskSize: Long)
: Unit = synchronized {
updateLastSeenMs()
if (_blocks.containsKey(blockId)) {
// The block exists on the slave already.
val originalLevel: StorageLevel = _blocks.get(blockId).storageLevel
if (originalLevel.useMemory) {
_remainingMem += memSize
}
}
if (storageLevel.isValid) {
// isValid means it is either stored in-memory or on-disk.
_blocks.put(blockId, BlockStatus(storageLevel, memSize, diskSize))
if (storageLevel.useMemory) {
_remainingMem -= memSize
logInfo("Added %s in memory on %s:%d (size: %s, free: %s)".format(
blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(memSize),
Utils.memoryBytesToString(_remainingMem)))
}
if (storageLevel.useDisk) {
logInfo("Added %s on disk on %s:%d (size: %s)".format(
blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(diskSize)))
}
} else if (_blocks.containsKey(blockId)) {
// If isValid is not true, drop the block.
val blockStatus: BlockStatus = _blocks.get(blockId)
_blocks.remove(blockId)
if (blockStatus.storageLevel.useMemory) {
_remainingMem += blockStatus.memSize
logInfo("Removed %s on %s:%d in memory (size: %s, free: %s)".format(
blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(memSize),
Utils.memoryBytesToString(_remainingMem)))
}
if (blockStatus.storageLevel.useDisk) {
logInfo("Removed %s on %s:%d on disk (size: %s)".format(
blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(diskSize)))
}
}
}
def remainingMem: Long = _remainingMem
def lastSeenMs: Long = _lastSeenMs
def blocks: JHashMap[String, BlockStatus] = _blocks
override def toString: String = "BlockManagerInfo " + timeMs + " " + _remainingMem
def clear() {
_blocks.clear()
}
}
// Mapping from block manager id to the block manager's information.
private val blockManagerInfo = new HashMap[BlockManagerId, BlockManagerInfo]
// Mapping from host name to block manager id.
private val blockManagerIdByHost = new HashMap[String, BlockManagerId]
// Mapping from block id to the set of block managers that have the block.
private val blockInfo = new JHashMap[String, Pair[Int, HashSet[BlockManagerId]]]
initLogging()
val slaveTimeout = System.getProperty("spark.storage.blockManagerSlaveTimeoutMs",
"" + (BlockManager.getHeartBeatFrequencyFromSystemProperties * 3)).toLong
val checkTimeoutInterval = System.getProperty("spark.storage.blockManagerTimeoutIntervalMs",
"5000").toLong
var timeoutCheckingTask: Cancellable = null
override def preStart() {
if (!BlockManager.getDisableHeartBeatsForTesting) {
timeoutCheckingTask = context.system.scheduler.schedule(
0.seconds, checkTimeoutInterval.milliseconds, self, ExpireDeadHosts)
}
super.preStart()
}
def removeBlockManager(blockManagerId: BlockManagerId) {
val info = blockManagerInfo(blockManagerId)
blockManagerIdByHost.remove(blockManagerId.ip)
blockManagerInfo.remove(blockManagerId)
var iterator = info.blocks.keySet.iterator
while (iterator.hasNext) {
val blockId = iterator.next
val locations = blockInfo.get(blockId)._2
locations -= blockManagerId
if (locations.size == 0) {
blockInfo.remove(locations)
}
}
}
def expireDeadHosts() {
logDebug("Checking for hosts with no recent heart beats in BlockManagerMaster.")
val now = System.currentTimeMillis()
val minSeenTime = now - slaveTimeout
val toRemove = new HashSet[BlockManagerId]
for (info <- blockManagerInfo.values) {
if (info.lastSeenMs < minSeenTime) {
logWarning("Removing BlockManager " + info.blockManagerId + " with no recent heart beats")
toRemove += info.blockManagerId
}
}
// TODO: Remove corresponding block infos
toRemove.foreach(removeBlockManager)
}
def removeHost(host: String) {
logInfo("Trying to remove the host: " + host + " from BlockManagerMaster.")
logInfo("Previous hosts: " + blockManagerInfo.keySet.toSeq)
blockManagerIdByHost.get(host).foreach(removeBlockManager)
logInfo("Current hosts: " + blockManagerInfo.keySet.toSeq)
sender ! true
}
def heartBeat(blockManagerId: BlockManagerId) {
if (!blockManagerInfo.contains(blockManagerId)) {
if (blockManagerId.ip == Utils.localHostName() && !isLocal) {
sender ! true
} else {
sender ! false
}
} else {
blockManagerInfo(blockManagerId).updateLastSeenMs()
sender ! true
}
}
def receive = {
case RegisterBlockManager(blockManagerId, maxMemSize, slaveActor) =>
register(blockManagerId, maxMemSize, slaveActor)
case BlockUpdate(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
blockUpdate(blockManagerId, blockId, storageLevel, deserializedSize, size)
case GetLocations(blockId) =>
getLocations(blockId)
case GetLocationsMultipleBlockIds(blockIds) =>
getLocationsMultipleBlockIds(blockIds)
case GetPeers(blockManagerId, size) =>
getPeersDeterministic(blockManagerId, size)
/*getPeers(blockManagerId, size)*/
case GetMemoryStatus =>
getMemoryStatus
case RemoveBlock(blockId) =>
removeBlock(blockId)
case RemoveHost(host) =>
removeHost(host)
sender ! true
case StopBlockManagerMaster =>
logInfo("Stopping BlockManagerMaster")
sender ! true
if (timeoutCheckingTask != null) {
timeoutCheckingTask.cancel
}
context.stop(self)
case ExpireDeadHosts =>
expireDeadHosts()
case HeartBeat(blockManagerId) =>
heartBeat(blockManagerId)
case other =>
logInfo("Got unknown message: " + other)
}
// Remove a block from the slaves that have it. This can only be used to remove
// blocks that the master knows about.
private def removeBlock(blockId: String) {
val block = blockInfo.get(blockId)
if (block != null) {
block._2.foreach { blockManagerId: BlockManagerId =>
val blockManager = blockManagerInfo.get(blockManagerId)
if (blockManager.isDefined) {
// Remove the block from the slave's BlockManager.
// Doesn't actually wait for a confirmation and the message might get lost.
// If message loss becomes frequent, we should add retry logic here.
blockManager.get.slaveActor ! RemoveBlock(blockId)
// Remove the block from the master's BlockManagerInfo.
blockManager.get.updateBlockInfo(blockId, StorageLevel.NONE, 0, 0)
}
}
blockInfo.remove(blockId)
}
sender ! true
}
// Return a map from the block manager id to max memory and remaining memory.
private def getMemoryStatus() {
val res = blockManagerInfo.map { case(blockManagerId, info) =>
(blockManagerId, (info.maxMem, info.remainingMem))
}.toMap
sender ! res
}
private def register(blockManagerId: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
val startTimeMs = System.currentTimeMillis()
val tmp = " " + blockManagerId + " "
logDebug("Got in register 0" + tmp + Utils.getUsedTimeMs(startTimeMs))
if (blockManagerIdByHost.contains(blockManagerId.ip) &&
blockManagerIdByHost(blockManagerId.ip) != blockManagerId) {
val oldId = blockManagerIdByHost(blockManagerId.ip)
logInfo("Got second registration for host " + blockManagerId +
"; removing old slave " + oldId)
removeBlockManager(oldId)
}
if (blockManagerId.ip == Utils.localHostName() && !isLocal) {
logInfo("Got Register Msg from master node, don't register it")
} else {
blockManagerInfo += (blockManagerId -> new BlockManagerInfo(
blockManagerId, System.currentTimeMillis(), maxMemSize, slaveActor))
}
blockManagerIdByHost += (blockManagerId.ip -> blockManagerId)
logDebug("Got in register 1" + tmp + Utils.getUsedTimeMs(startTimeMs))
sender ! true
}
private def blockUpdate(
blockManagerId: BlockManagerId,
blockId: String,
storageLevel: StorageLevel,
memSize: Long,
diskSize: Long) {
val startTimeMs = System.currentTimeMillis()
val tmp = " " + blockManagerId + " " + blockId + " "
if (!blockManagerInfo.contains(blockManagerId)) {
if (blockManagerId.ip == Utils.localHostName() && !isLocal) {
// We intentionally do not register the master (except in local mode),
// so we should not indicate failure.
sender ! true
} else {
sender ! false
}
return
}
if (blockId == null) {
blockManagerInfo(blockManagerId).updateLastSeenMs()
logDebug("Got in block update 1" + tmp + " used " + Utils.getUsedTimeMs(startTimeMs))
sender ! true
return
}
blockManagerInfo(blockManagerId).updateBlockInfo(blockId, storageLevel, memSize, diskSize)
var locations: HashSet[BlockManagerId] = null
if (blockInfo.containsKey(blockId)) {
locations = blockInfo.get(blockId)._2
} else {
locations = new HashSet[BlockManagerId]
blockInfo.put(blockId, (storageLevel.replication, locations))
}
if (storageLevel.isValid) {
locations += blockManagerId
} else {
locations.remove(blockManagerId)
}
if (locations.size == 0) {
blockInfo.remove(blockId)
}
sender ! true
}
private def getLocations(blockId: String) {
val startTimeMs = System.currentTimeMillis()
val tmp = " " + blockId + " "
logDebug("Got in getLocations 0" + tmp + Utils.getUsedTimeMs(startTimeMs))
if (blockInfo.containsKey(blockId)) {
var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
res.appendAll(blockInfo.get(blockId)._2)
logDebug("Got in getLocations 1" + tmp + " as "+ res.toSeq + " at "
+ Utils.getUsedTimeMs(startTimeMs))
sender ! res.toSeq
} else {
logDebug("Got in getLocations 2" + tmp + Utils.getUsedTimeMs(startTimeMs))
var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
sender ! res
}
}
private def getLocationsMultipleBlockIds(blockIds: Array[String]) {
def getLocations(blockId: String): Seq[BlockManagerId] = {
val tmp = blockId
logDebug("Got in getLocationsMultipleBlockIds Sub 0 " + tmp)
if (blockInfo.containsKey(blockId)) {
var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
res.appendAll(blockInfo.get(blockId)._2)
logDebug("Got in getLocationsMultipleBlockIds Sub 1 " + tmp + " " + res.toSeq)
return res.toSeq
} else {
logDebug("Got in getLocationsMultipleBlockIds Sub 2 " + tmp)
var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
return res.toSeq
}
}
logDebug("Got in getLocationsMultipleBlockIds " + blockIds.toSeq)
var res: ArrayBuffer[Seq[BlockManagerId]] = new ArrayBuffer[Seq[BlockManagerId]]
for (blockId <- blockIds) {
res.append(getLocations(blockId))
}
logDebug("Got in getLocationsMultipleBlockIds " + blockIds.toSeq + " : " + res.toSeq)
sender ! res.toSeq
}
private def getPeers(blockManagerId: BlockManagerId, size: Int) {
var peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray
var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
res.appendAll(peers)
res -= blockManagerId
val rand = new Random(System.currentTimeMillis())
while (res.length > size) {
res.remove(rand.nextInt(res.length))
}
sender ! res.toSeq
}
private def getPeersDeterministic(blockManagerId: BlockManagerId, size: Int) {
var peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray
var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
val peersWithIndices = peers.zipWithIndex
val selfIndex = peersWithIndices.find(_._1 == blockManagerId).map(_._2).getOrElse(-1)
if (selfIndex == -1) {
throw new Exception("Self index for " + blockManagerId + " not found")
}
var index = selfIndex
while (res.size < size) {
index += 1
if (index == selfIndex) {
throw new Exception("More peer expected than available")
}
res += peers(index % peers.size)
}
sender ! res.toSeq
}
}
private[spark] class BlockManagerMaster(
val actorSystem: ActorSystem,
isMaster: Boolean,
@ -409,228 +20,81 @@ private[spark] class BlockManagerMaster(
masterPort: Int)
extends Logging {
val AKKA_RETRY_ATTEMPS: Int = System.getProperty("spark.akka.num.retries", "5").toInt
val AKKA_RETRY_INTERVAL_MS: Int = System.getProperty("spark.akka.retry.wait", "100").toInt
val MASTER_AKKA_ACTOR_NAME = "BlockMasterManager"
val SLAVE_AKKA_ACTOR_NAME = "BlockSlaveManager"
val REQUEST_RETRY_INTERVAL_MS = 100
val DEFAULT_MANAGER_IP: String = Utils.localHostName()
val timeout = 10.seconds
var masterActor: ActorRef = null
if (isMaster) {
masterActor = actorSystem.actorOf(Props(new BlockManagerMasterActor(isLocal)),
name = MASTER_AKKA_ACTOR_NAME)
logInfo("Registered BlockManagerMaster Actor")
} else {
val url = "akka://spark@%s:%s/user/%s".format(masterIp, masterPort, MASTER_AKKA_ACTOR_NAME)
logInfo("Connecting to BlockManagerMaster: " + url)
masterActor = actorSystem.actorFor(url)
}
def stop() {
if (masterActor != null) {
communicate(StopBlockManagerMaster)
masterActor = null
logInfo("BlockManagerMaster stopped")
var masterActor: ActorRef = {
if (isMaster) {
val masterActor = actorSystem.actorOf(Props(new BlockManagerMasterActor(isLocal)),
name = MASTER_AKKA_ACTOR_NAME)
logInfo("Registered BlockManagerMaster Actor")
masterActor
} else {
val url = "akka://spark@%s:%s/user/%s".format(masterIp, masterPort, MASTER_AKKA_ACTOR_NAME)
logInfo("Connecting to BlockManagerMaster: " + url)
actorSystem.actorFor(url)
}
}
// Send a message to the master actor and get its result within a default timeout, or
// throw a SparkException if this fails.
def askMaster(message: Any): Any = {
try {
val future = masterActor.ask(message)(timeout)
return Await.result(future, timeout)
} catch {
case e: Exception =>
throw new SparkException("Error communicating with BlockManagerMaster", e)
}
}
// Send a one-way message to the master actor, to which we expect it to reply with true.
def communicate(message: Any) {
if (askMaster(message) != true) {
throw new SparkException("Error reply received from BlockManagerMaster")
}
}
/** Remove a dead host from the master actor. This is only called on the master side. */
def notifyADeadHost(host: String) {
communicate(RemoveHost(host))
tell(RemoveHost(host))
logInfo("Removed " + host + " successfully in notifyADeadHost")
}
def mustRegisterBlockManager(
/**
* Send the master actor a heart beat from the slave. Returns true if everything works out,
* false if the master does not know about the given block manager, which means the block
* manager should re-register.
*/
def sendHeartBeat(blockManagerId: BlockManagerId): Boolean = {
askMasterWithRetry[Boolean](HeartBeat(blockManagerId))
}
/** Register the BlockManager's id with the master. */
def registerBlockManager(
blockManagerId: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
val msg = RegisterBlockManager(blockManagerId, maxMemSize, slaveActor)
logInfo("Trying to register BlockManager")
while (! syncRegisterBlockManager(msg)) {
logWarning("Failed to register " + msg)
Thread.sleep(REQUEST_RETRY_INTERVAL_MS)
}
logInfo("Done registering BlockManager")
tell(RegisterBlockManager(blockManagerId, maxMemSize, slaveActor))
logInfo("Registered BlockManager")
}
private def syncRegisterBlockManager(msg: RegisterBlockManager): Boolean = {
//val masterActor = RemoteActor.select(node, name)
val startTimeMs = System.currentTimeMillis()
val tmp = " msg " + msg + " "
logDebug("Got in syncRegisterBlockManager 0 " + tmp + Utils.getUsedTimeMs(startTimeMs))
try {
communicate(msg)
logInfo("BlockManager registered successfully @ syncRegisterBlockManager")
logDebug("Got in syncRegisterBlockManager 1 " + tmp + Utils.getUsedTimeMs(startTimeMs))
return true
} catch {
case e: Exception =>
logError("Failed in syncRegisterBlockManager", e)
return false
}
}
def mustHeartBeat(msg: HeartBeat): Boolean = {
var res = syncHeartBeat(msg)
while (!res.isDefined) {
logWarning("Failed to send heart beat " + msg)
Thread.sleep(REQUEST_RETRY_INTERVAL_MS)
}
return res.get
}
private def syncHeartBeat(msg: HeartBeat): Option[Boolean] = {
try {
val answer = askMaster(msg).asInstanceOf[Boolean]
return Some(answer)
} catch {
case e: Exception =>
logError("Failed in syncHeartBeat", e)
return None
}
}
def mustBlockUpdate(msg: BlockUpdate): Boolean = {
var res = syncBlockUpdate(msg)
while (!res.isDefined) {
logWarning("Failed to send block update " + msg)
Thread.sleep(REQUEST_RETRY_INTERVAL_MS)
}
return res.get
}
private def syncBlockUpdate(msg: BlockUpdate): Option[Boolean] = {
val startTimeMs = System.currentTimeMillis()
val tmp = " msg " + msg + " "
logDebug("Got in syncBlockUpdate " + tmp + " 0 " + Utils.getUsedTimeMs(startTimeMs))
try {
val answer = askMaster(msg).asInstanceOf[Boolean]
logDebug("Block update sent successfully")
logDebug("Got in synbBlockUpdate " + tmp + " 1 " + Utils.getUsedTimeMs(startTimeMs))
return Some(answer)
} catch {
case e: Exception =>
logError("Failed in syncBlockUpdate", e)
return None
}
}
def mustGetLocations(msg: GetLocations): Seq[BlockManagerId] = {
var res = syncGetLocations(msg)
while (res == null) {
logInfo("Failed to get locations " + msg)
Thread.sleep(REQUEST_RETRY_INTERVAL_MS)
res = syncGetLocations(msg)
}
return res
}
private def syncGetLocations(msg: GetLocations): Seq[BlockManagerId] = {
val startTimeMs = System.currentTimeMillis()
val tmp = " msg " + msg + " "
logDebug("Got in syncGetLocations 0 " + tmp + Utils.getUsedTimeMs(startTimeMs))
try {
val answer = askMaster(msg).asInstanceOf[ArrayBuffer[BlockManagerId]]
if (answer != null) {
logDebug("GetLocations successful")
logDebug("Got in syncGetLocations 1 " + tmp + Utils.getUsedTimeMs(startTimeMs))
return answer
} else {
logError("Master replied null in response to GetLocations")
return null
}
} catch {
case e: Exception =>
logError("GetLocations failed", e)
return null
}
}
def mustGetLocationsMultipleBlockIds(msg: GetLocationsMultipleBlockIds):
Seq[Seq[BlockManagerId]] = {
var res: Seq[Seq[BlockManagerId]] = syncGetLocationsMultipleBlockIds(msg)
while (res == null) {
logWarning("Failed to GetLocationsMultipleBlockIds " + msg)
Thread.sleep(REQUEST_RETRY_INTERVAL_MS)
res = syncGetLocationsMultipleBlockIds(msg)
}
return res
}
private def syncGetLocationsMultipleBlockIds(msg: GetLocationsMultipleBlockIds):
Seq[Seq[BlockManagerId]] = {
val startTimeMs = System.currentTimeMillis
val tmp = " msg " + msg + " "
logDebug("Got in syncGetLocationsMultipleBlockIds 0 " + tmp + Utils.getUsedTimeMs(startTimeMs))
try {
val answer = askMaster(msg).asInstanceOf[Seq[Seq[BlockManagerId]]]
if (answer != null) {
logDebug("GetLocationsMultipleBlockIds successful")
logDebug("Got in syncGetLocationsMultipleBlockIds 1 " + tmp +
Utils.getUsedTimeMs(startTimeMs))
return answer
} else {
logError("Master replied null in response to GetLocationsMultipleBlockIds")
return null
}
} catch {
case e: Exception =>
logError("GetLocationsMultipleBlockIds failed", e)
return null
}
}
def mustGetPeers(msg: GetPeers): Seq[BlockManagerId] = {
var res = syncGetPeers(msg)
while ((res == null) || (res.length != msg.size)) {
logInfo("Failed to get peers " + msg)
Thread.sleep(REQUEST_RETRY_INTERVAL_MS)
res = syncGetPeers(msg)
}
def updateBlockInfo(
blockManagerId: BlockManagerId,
blockId: String,
storageLevel: StorageLevel,
memSize: Long,
diskSize: Long): Boolean = {
val res = askMasterWithRetry[Boolean](
UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize))
logInfo("Updated info of block " + blockId)
res
}
private def syncGetPeers(msg: GetPeers): Seq[BlockManagerId] = {
val startTimeMs = System.currentTimeMillis
val tmp = " msg " + msg + " "
logDebug("Got in syncGetPeers 0 " + tmp + Utils.getUsedTimeMs(startTimeMs))
/** Get locations of the blockId from the master */
def getLocations(blockId: String): Seq[BlockManagerId] = {
askMasterWithRetry[Seq[BlockManagerId]](GetLocations(blockId))
}
try {
val answer = askMaster(msg).asInstanceOf[Seq[BlockManagerId]]
if (answer != null) {
logDebug("GetPeers successful")
logDebug("Got in syncGetPeers 1 " + tmp + Utils.getUsedTimeMs(startTimeMs))
return answer
} else {
logError("Master replied null in response to GetPeers")
return null
}
} catch {
case e: Exception =>
logError("GetPeers failed", e)
return null
/** Get locations of multiple blockIds from the master */
def getLocations(blockIds: Array[String]): Seq[Seq[BlockManagerId]] = {
askMasterWithRetry[Seq[Seq[BlockManagerId]]](GetLocationsMultipleBlockIds(blockIds))
}
/** Get ids of other nodes in the cluster from the master */
def getPeers(blockManagerId: BlockManagerId, numPeers: Int): Seq[BlockManagerId] = {
val result = askMasterWithRetry[Seq[BlockManagerId]](GetPeers(blockManagerId, numPeers))
if (result.length != numPeers) {
throw new SparkException(
"Error getting peers, only got " + result.size + " instead of " + numPeers)
}
result
}
/**
@ -648,6 +112,72 @@ private[spark] class BlockManagerMaster(
* amount of remaining memory.
*/
def getMemoryStatus: Map[BlockManagerId, (Long, Long)] = {
askMaster(GetMemoryStatus).asInstanceOf[Map[BlockManagerId, (Long, Long)]]
askMasterWithRetry[Map[BlockManagerId, (Long, Long)]](GetMemoryStatus)
}
/** Stop the master actor, called only on the Spark master node */
def stop() {
if (masterActor != null) {
tell(StopBlockManagerMaster)
masterActor = null
logInfo("BlockManagerMaster stopped")
}
}
/** Send a one-way message to the master actor, to which we expect it to reply with true. */
private def tell(message: Any) {
if (!askMasterWithRetry[Boolean](message)) {
throw new SparkException("BlockManagerMasterActor returned false, expected true.")
}
}
/**
* Send a message to the master actor and get its result within a default timeout, or
* throw a SparkException if this fails. There is no retry logic here so if the Akka
* message is lost, the master actor won't get the command.
*/
private def askMaster[T](message: Any): Any = {
try {
val future = masterActor.ask(message)(timeout)
return Await.result(future, timeout).asInstanceOf[T]
} catch {
case e: Exception =>
throw new SparkException("Error communicating with BlockManagerMaster", e)
}
}
/**
* Send a message to the master actor and get its result within a default timeout, or
* throw a SparkException if this fails.
*/
private def askMasterWithRetry[T](message: Any): T = {
// TODO: Consider removing multiple attempts
if (masterActor == null) {
throw new SparkException("Error sending message to BlockManager as masterActor is null " +
"[message = " + message + "]")
}
var attempts = 0
var lastException: Exception = null
while (attempts < AKKA_RETRY_ATTEMPS) {
attempts += 1
try {
val future = masterActor.ask(message)(timeout)
val result = Await.result(future, timeout)
if (result == null) {
throw new Exception("BlockManagerMaster returned null")
}
return result.asInstanceOf[T]
} catch {
case ie: InterruptedException => throw ie
case e: Exception =>
lastException = e
logWarning("Error sending message to BlockManagerMaster in " + attempts + " attempts", e)
}
Thread.sleep(AKKA_RETRY_INTERVAL_MS)
}
throw new SparkException(
"Error sending message to BlockManagerMaster [message = " + message + "]", lastException)
}
}

View file

@ -0,0 +1,406 @@
package spark.storage
import java.util.{HashMap => JHashMap}
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.collection.JavaConversions._
import scala.util.Random
import akka.actor.{Actor, ActorRef, Cancellable}
import akka.util.{Duration, Timeout}
import akka.util.duration._
import spark.{Logging, Utils}
/**
* BlockManagerMasterActor is an actor on the master node to track statuses of
* all slaves' block managers.
*/
private[spark]
object BlockManagerMasterActor {
case class BlockStatus(storageLevel: StorageLevel, memSize: Long, diskSize: Long)
class BlockManagerInfo(
val blockManagerId: BlockManagerId,
timeMs: Long,
val maxMem: Long,
val slaveActor: ActorRef)
extends Logging {
private var _lastSeenMs: Long = timeMs
private var _remainingMem: Long = maxMem
// Mapping from block id to its status.
private val _blocks = new JHashMap[String, BlockStatus]
logInfo("Registering block manager %s:%d with %s RAM".format(
blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(maxMem)))
def updateLastSeenMs() {
_lastSeenMs = System.currentTimeMillis()
}
def updateBlockInfo(blockId: String, storageLevel: StorageLevel, memSize: Long, diskSize: Long)
: Unit = synchronized {
updateLastSeenMs()
if (_blocks.containsKey(blockId)) {
// The block exists on the slave already.
val originalLevel: StorageLevel = _blocks.get(blockId).storageLevel
if (originalLevel.useMemory) {
_remainingMem += memSize
}
}
if (storageLevel.isValid) {
// isValid means it is either stored in-memory or on-disk.
_blocks.put(blockId, BlockStatus(storageLevel, memSize, diskSize))
if (storageLevel.useMemory) {
_remainingMem -= memSize
logInfo("Added %s in memory on %s:%d (size: %s, free: %s)".format(
blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(memSize),
Utils.memoryBytesToString(_remainingMem)))
}
if (storageLevel.useDisk) {
logInfo("Added %s on disk on %s:%d (size: %s)".format(
blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(diskSize)))
}
} else if (_blocks.containsKey(blockId)) {
// If isValid is not true, drop the block.
val blockStatus: BlockStatus = _blocks.get(blockId)
_blocks.remove(blockId)
if (blockStatus.storageLevel.useMemory) {
_remainingMem += blockStatus.memSize
logInfo("Removed %s on %s:%d in memory (size: %s, free: %s)".format(
blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(memSize),
Utils.memoryBytesToString(_remainingMem)))
}
if (blockStatus.storageLevel.useDisk) {
logInfo("Removed %s on %s:%d on disk (size: %s)".format(
blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(diskSize)))
}
}
}
def remainingMem: Long = _remainingMem
def lastSeenMs: Long = _lastSeenMs
def blocks: JHashMap[String, BlockStatus] = _blocks
override def toString: String = "BlockManagerInfo " + timeMs + " " + _remainingMem
def clear() {
_blocks.clear()
}
}
}
private[spark]
class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
// Mapping from block manager id to the block manager's information.
private val blockManagerInfo =
new HashMap[BlockManagerId, BlockManagerMasterActor.BlockManagerInfo]
// Mapping from host name to block manager id.
private val blockManagerIdByHost = new HashMap[String, BlockManagerId]
// Mapping from block id to the set of block managers that have the block.
private val blockInfo = new JHashMap[String, Pair[Int, HashSet[BlockManagerId]]]
initLogging()
val slaveTimeout = System.getProperty("spark.storage.blockManagerSlaveTimeoutMs",
"" + (BlockManager.getHeartBeatFrequencyFromSystemProperties * 3)).toLong
val checkTimeoutInterval = System.getProperty("spark.storage.blockManagerTimeoutIntervalMs",
"5000").toLong
var timeoutCheckingTask: Cancellable = null
override def preStart() {
if (!BlockManager.getDisableHeartBeatsForTesting) {
timeoutCheckingTask = context.system.scheduler.schedule(
0.seconds, checkTimeoutInterval.milliseconds, self, ExpireDeadHosts)
}
super.preStart()
}
def removeBlockManager(blockManagerId: BlockManagerId) {
val info = blockManagerInfo(blockManagerId)
blockManagerIdByHost.remove(blockManagerId.ip)
blockManagerInfo.remove(blockManagerId)
var iterator = info.blocks.keySet.iterator
while (iterator.hasNext) {
val blockId = iterator.next
val locations = blockInfo.get(blockId)._2
locations -= blockManagerId
if (locations.size == 0) {
blockInfo.remove(locations)
}
}
}
def expireDeadHosts() {
logDebug("Checking for hosts with no recent heart beats in BlockManagerMaster.")
val now = System.currentTimeMillis()
val minSeenTime = now - slaveTimeout
val toRemove = new HashSet[BlockManagerId]
for (info <- blockManagerInfo.values) {
if (info.lastSeenMs < minSeenTime) {
logWarning("Removing BlockManager " + info.blockManagerId + " with no recent heart beats")
toRemove += info.blockManagerId
}
}
// TODO: Remove corresponding block infos
toRemove.foreach(removeBlockManager)
}
def removeHost(host: String) {
logInfo("Trying to remove the host: " + host + " from BlockManagerMaster.")
logInfo("Previous hosts: " + blockManagerInfo.keySet.toSeq)
blockManagerIdByHost.get(host).foreach(removeBlockManager)
logInfo("Current hosts: " + blockManagerInfo.keySet.toSeq)
sender ! true
}
def heartBeat(blockManagerId: BlockManagerId) {
if (!blockManagerInfo.contains(blockManagerId)) {
if (blockManagerId.ip == Utils.localHostName() && !isLocal) {
sender ! true
} else {
sender ! false
}
} else {
blockManagerInfo(blockManagerId).updateLastSeenMs()
sender ! true
}
}
def receive = {
case RegisterBlockManager(blockManagerId, maxMemSize, slaveActor) =>
register(blockManagerId, maxMemSize, slaveActor)
case UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
blockUpdate(blockManagerId, blockId, storageLevel, deserializedSize, size)
case GetLocations(blockId) =>
getLocations(blockId)
case GetLocationsMultipleBlockIds(blockIds) =>
getLocationsMultipleBlockIds(blockIds)
case GetPeers(blockManagerId, size) =>
getPeersDeterministic(blockManagerId, size)
/*getPeers(blockManagerId, size)*/
case GetMemoryStatus =>
getMemoryStatus
case RemoveBlock(blockId) =>
removeBlock(blockId)
case RemoveHost(host) =>
removeHost(host)
sender ! true
case StopBlockManagerMaster =>
logInfo("Stopping BlockManagerMaster")
sender ! true
if (timeoutCheckingTask != null) {
timeoutCheckingTask.cancel
}
context.stop(self)
case ExpireDeadHosts =>
expireDeadHosts()
case HeartBeat(blockManagerId) =>
heartBeat(blockManagerId)
case other =>
logInfo("Got unknown message: " + other)
}
// Remove a block from the slaves that have it. This can only be used to remove
// blocks that the master knows about.
private def removeBlock(blockId: String) {
val block = blockInfo.get(blockId)
if (block != null) {
block._2.foreach { blockManagerId: BlockManagerId =>
val blockManager = blockManagerInfo.get(blockManagerId)
if (blockManager.isDefined) {
// Remove the block from the slave's BlockManager.
// Doesn't actually wait for a confirmation and the message might get lost.
// If message loss becomes frequent, we should add retry logic here.
blockManager.get.slaveActor ! RemoveBlock(blockId)
// Remove the block from the master's BlockManagerInfo.
blockManager.get.updateBlockInfo(blockId, StorageLevel.NONE, 0, 0)
}
}
blockInfo.remove(blockId)
}
sender ! true
}
// Return a map from the block manager id to max memory and remaining memory.
private def getMemoryStatus() {
val res = blockManagerInfo.map { case(blockManagerId, info) =>
(blockManagerId, (info.maxMem, info.remainingMem))
}.toMap
sender ! res
}
private def register(blockManagerId: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
val startTimeMs = System.currentTimeMillis()
val tmp = " " + blockManagerId + " "
logDebug("Got in register 0" + tmp + Utils.getUsedTimeMs(startTimeMs))
if (blockManagerIdByHost.contains(blockManagerId.ip) &&
blockManagerIdByHost(blockManagerId.ip) != blockManagerId) {
val oldId = blockManagerIdByHost(blockManagerId.ip)
logInfo("Got second registration for host " + blockManagerId +
"; removing old slave " + oldId)
removeBlockManager(oldId)
}
if (blockManagerId.ip == Utils.localHostName() && !isLocal) {
logInfo("Got Register Msg from master node, don't register it")
} else {
blockManagerInfo += (blockManagerId -> new BlockManagerMasterActor.BlockManagerInfo(
blockManagerId, System.currentTimeMillis(), maxMemSize, slaveActor))
}
blockManagerIdByHost += (blockManagerId.ip -> blockManagerId)
logDebug("Got in register 1" + tmp + Utils.getUsedTimeMs(startTimeMs))
sender ! true
}
private def blockUpdate(
blockManagerId: BlockManagerId,
blockId: String,
storageLevel: StorageLevel,
memSize: Long,
diskSize: Long) {
val startTimeMs = System.currentTimeMillis()
val tmp = " " + blockManagerId + " " + blockId + " "
if (!blockManagerInfo.contains(blockManagerId)) {
if (blockManagerId.ip == Utils.localHostName() && !isLocal) {
// We intentionally do not register the master (except in local mode),
// so we should not indicate failure.
sender ! true
} else {
sender ! false
}
return
}
if (blockId == null) {
blockManagerInfo(blockManagerId).updateLastSeenMs()
logDebug("Got in block update 1" + tmp + " used " + Utils.getUsedTimeMs(startTimeMs))
sender ! true
return
}
blockManagerInfo(blockManagerId).updateBlockInfo(blockId, storageLevel, memSize, diskSize)
var locations: HashSet[BlockManagerId] = null
if (blockInfo.containsKey(blockId)) {
locations = blockInfo.get(blockId)._2
} else {
locations = new HashSet[BlockManagerId]
blockInfo.put(blockId, (storageLevel.replication, locations))
}
if (storageLevel.isValid) {
locations += blockManagerId
} else {
locations.remove(blockManagerId)
}
if (locations.size == 0) {
blockInfo.remove(blockId)
}
sender ! true
}
private def getLocations(blockId: String) {
val startTimeMs = System.currentTimeMillis()
val tmp = " " + blockId + " "
logDebug("Got in getLocations 0" + tmp + Utils.getUsedTimeMs(startTimeMs))
if (blockInfo.containsKey(blockId)) {
var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
res.appendAll(blockInfo.get(blockId)._2)
logDebug("Got in getLocations 1" + tmp + " as "+ res.toSeq + " at "
+ Utils.getUsedTimeMs(startTimeMs))
sender ! res.toSeq
} else {
logDebug("Got in getLocations 2" + tmp + Utils.getUsedTimeMs(startTimeMs))
var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
sender ! res
}
}
private def getLocationsMultipleBlockIds(blockIds: Array[String]) {
def getLocations(blockId: String): Seq[BlockManagerId] = {
val tmp = blockId
logDebug("Got in getLocationsMultipleBlockIds Sub 0 " + tmp)
if (blockInfo.containsKey(blockId)) {
var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
res.appendAll(blockInfo.get(blockId)._2)
logDebug("Got in getLocationsMultipleBlockIds Sub 1 " + tmp + " " + res.toSeq)
return res.toSeq
} else {
logDebug("Got in getLocationsMultipleBlockIds Sub 2 " + tmp)
var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
return res.toSeq
}
}
logDebug("Got in getLocationsMultipleBlockIds " + blockIds.toSeq)
var res: ArrayBuffer[Seq[BlockManagerId]] = new ArrayBuffer[Seq[BlockManagerId]]
for (blockId <- blockIds) {
res.append(getLocations(blockId))
}
logDebug("Got in getLocationsMultipleBlockIds " + blockIds.toSeq + " : " + res.toSeq)
sender ! res.toSeq
}
private def getPeers(blockManagerId: BlockManagerId, size: Int) {
var peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray
var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
res.appendAll(peers)
res -= blockManagerId
val rand = new Random(System.currentTimeMillis())
while (res.length > size) {
res.remove(rand.nextInt(res.length))
}
sender ! res.toSeq
}
private def getPeersDeterministic(blockManagerId: BlockManagerId, size: Int) {
var peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray
var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
val peersWithIndices = peers.zipWithIndex
val selfIndex = peersWithIndices.find(_._1 == blockManagerId).map(_._2).getOrElse(-1)
if (selfIndex == -1) {
throw new Exception("Self index for " + blockManagerId + " not found")
}
var index = selfIndex
while (res.size < size) {
index += 1
if (index == selfIndex) {
throw new Exception("More peer expected than available")
}
res += peers(index % peers.size)
}
sender ! res.toSeq
}
}

View file

@ -34,7 +34,7 @@ private[spark]
case class HeartBeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster
private[spark]
class BlockUpdate(
class UpdateBlockInfo(
var blockManagerId: BlockManagerId,
var blockId: String,
var storageLevel: StorageLevel,
@ -65,17 +65,17 @@ class BlockUpdate(
}
private[spark]
object BlockUpdate {
object UpdateBlockInfo {
def apply(blockManagerId: BlockManagerId,
blockId: String,
storageLevel: StorageLevel,
memSize: Long,
diskSize: Long): BlockUpdate = {
new BlockUpdate(blockManagerId, blockId, storageLevel, memSize, diskSize)
diskSize: Long): UpdateBlockInfo = {
new UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize)
}
// For pattern-matching
def unapply(h: BlockUpdate): Option[(BlockManagerId, String, StorageLevel, Long, Long)] = {
def unapply(h: UpdateBlockInfo): Option[(BlockManagerId, String, StorageLevel, Long, Long)] = {
Some((h.blockManagerId, h.blockId, h.storageLevel, h.memSize, h.diskSize))
}
}

View file

@ -1,6 +1,6 @@
package spark.storage
import java.io.{Externalizable, ObjectInput, ObjectOutput}
import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput}
/**
* Flags for controlling the storage of an RDD. Each StorageLevel records whether to use memory,
@ -18,6 +18,8 @@ class StorageLevel(
// TODO: Also add fields for caching priority, dataset ID, and flushing.
assert(replication < 40, "Replication restricted to be less than 40 for calculating hashcodes")
def this(flags: Int, replication: Int) {
this((flags & 4) != 0, (flags & 2) != 0, (flags & 1) != 0, replication)
}
@ -66,10 +68,16 @@ class StorageLevel(
replication = in.readByte()
}
@throws(classOf[IOException])
private def readResolve(): Object = StorageLevel.getCachedStorageLevel(this)
override def toString: String =
"StorageLevel(%b, %b, %b, %d)".format(useDisk, useMemory, deserialized, replication)
override def hashCode(): Int = toInt * 41 + replication
}
object StorageLevel {
val NONE = new StorageLevel(false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false)
@ -82,4 +90,16 @@ object StorageLevel {
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, 2)
private[spark]
val storageLevelCache = new java.util.concurrent.ConcurrentHashMap[StorageLevel, StorageLevel]()
private[spark] def getCachedStorageLevel(level: StorageLevel): StorageLevel = {
if (storageLevelCache.containsKey(level)) {
storageLevelCache.get(level)
} else {
storageLevelCache.put(level, level)
level
}
}
}

View file

@ -0,0 +1,35 @@
package spark.util
import java.util.concurrent.{TimeUnit, ScheduledFuture, Executors}
import java.util.{TimerTask, Timer}
import spark.Logging
class MetadataCleaner(name: String, cleanupFunc: (Long) => Unit) extends Logging {
val delaySeconds = (System.getProperty("spark.cleanup.delay", "-100").toDouble * 60).toInt
val periodSeconds = math.max(10, delaySeconds / 10)
val timer = new Timer(name + " cleanup timer", true)
val task = new TimerTask {
def run() {
try {
if (delaySeconds > 0) {
cleanupFunc(System.currentTimeMillis() - (delaySeconds * 1000))
logInfo("Ran metadata cleaner for " + name)
}
} catch {
case e: Exception => logError("Error running cleanup task for " + name, e)
}
}
}
if (periodSeconds > 0) {
logInfo(
"Starting metadata cleaner for " + name + " with delay of " + delaySeconds + " seconds and "
+ "period of " + periodSeconds + " secs")
timer.schedule(task, periodSeconds * 1000, periodSeconds * 1000)
}
def cancel() {
timer.cancel()
}
}

View file

@ -0,0 +1,87 @@
package spark.util
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConversions._
import scala.collection.mutable.{HashMap, Map}
/**
* This is a custom implementation of scala.collection.mutable.Map which stores the insertion
* time stamp along with each key-value pair. Key-value pairs that are older than a particular
* threshold time can them be removed using the cleanup method. This is intended to be a drop-in
* replacement of scala.collection.mutable.HashMap.
*/
class TimeStampedHashMap[A, B] extends Map[A, B]() {
val internalMap = new ConcurrentHashMap[A, (B, Long)]()
def get(key: A): Option[B] = {
val value = internalMap.get(key)
if (value != null) Some(value._1) else None
}
def iterator: Iterator[(A, B)] = {
val jIterator = internalMap.entrySet().iterator()
jIterator.map(kv => (kv.getKey, kv.getValue._1))
}
override def + [B1 >: B](kv: (A, B1)): Map[A, B1] = {
val newMap = new TimeStampedHashMap[A, B1]
newMap.internalMap.putAll(this.internalMap)
newMap.internalMap.put(kv._1, (kv._2, currentTime))
newMap
}
override def - (key: A): Map[A, B] = {
internalMap.remove(key)
this
}
override def += (kv: (A, B)): this.type = {
internalMap.put(kv._1, (kv._2, currentTime))
this
}
override def -= (key: A): this.type = {
internalMap.remove(key)
this
}
override def update(key: A, value: B) {
this += ((key, value))
}
override def apply(key: A): B = {
val value = internalMap.get(key)
if (value == null) throw new NoSuchElementException()
value._1
}
override def filter(p: ((A, B)) => Boolean): Map[A, B] = {
internalMap.map(kv => (kv._1, kv._2._1)).filter(p)
}
override def empty: Map[A, B] = new TimeStampedHashMap[A, B]()
override def size(): Int = internalMap.size()
override def foreach[U](f: ((A, B)) => U): Unit = {
val iterator = internalMap.entrySet().iterator()
while(iterator.hasNext) {
val entry = iterator.next()
val kv = (entry.getKey, entry.getValue._1)
f(kv)
}
}
def cleanup(threshTime: Long) {
val iterator = internalMap.entrySet().iterator()
while(iterator.hasNext) {
val entry = iterator.next()
if (entry.getValue._2 < threshTime) {
iterator.remove()
}
}
}
private def currentTime: Long = System.currentTimeMillis()
}

View file

@ -22,6 +22,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
var oldHeartBeat: String = null
// Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
System.setProperty("spark.kryoserializer.buffer.mb", "1")
val serializer = new KryoSerializer
before {
@ -63,7 +64,33 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
}
test("manager-master interaction") {
test("StorageLevel object caching") {
val level1 = new StorageLevel(false, false, false, 3)
val level2 = new StorageLevel(false, false, false, 3)
val bytes1 = spark.Utils.serialize(level1)
val level1_ = spark.Utils.deserialize[StorageLevel](bytes1)
val bytes2 = spark.Utils.serialize(level2)
val level2_ = spark.Utils.deserialize[StorageLevel](bytes2)
assert(level1_ === level1, "Deserialized level1 not same as original level1")
assert(level2_ === level2, "Deserialized level2 not same as original level1")
assert(level1_ === level2_, "Deserialized level1 not same as deserialized level2")
assert(level2_.eq(level1_), "Deserialized level2 not the same object as deserialized level1")
}
test("BlockManagerId object caching") {
val id1 = new StorageLevel(false, false, false, 3)
val id2 = new StorageLevel(false, false, false, 3)
val bytes1 = spark.Utils.serialize(id1)
val id1_ = spark.Utils.deserialize[StorageLevel](bytes1)
val bytes2 = spark.Utils.serialize(id2)
val id2_ = spark.Utils.deserialize[StorageLevel](bytes2)
assert(id1_ === id1, "Deserialized id1 not same as original id1")
assert(id2_ === id2, "Deserialized id2 not same as original id1")
assert(id1_ === id2_, "Deserialized id1 not same as deserialized id2")
assert(id2_.eq(id1_), "Deserialized id2 not the same object as deserialized level1")
}
test("master + 1 manager interaction") {
store = new BlockManager(actorSystem, master, serializer, 2000)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
@ -80,17 +107,33 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
assert(store.getSingle("a3") != None, "a3 was not in store")
// Checking whether master knows about the blocks or not
assert(master.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1")
assert(master.mustGetLocations(GetLocations("a2")).size > 0, "master was not told about a2")
assert(master.mustGetLocations(GetLocations("a3")).size === 0, "master was told about a3")
assert(master.getLocations("a1").size > 0, "master was not told about a1")
assert(master.getLocations("a2").size > 0, "master was not told about a2")
assert(master.getLocations("a3").size === 0, "master was told about a3")
// Drop a1 and a2 from memory; this should be reported back to the master
store.dropFromMemory("a1", null)
store.dropFromMemory("a2", null)
assert(store.getSingle("a1") === None, "a1 not removed from store")
assert(store.getSingle("a2") === None, "a2 not removed from store")
assert(master.mustGetLocations(GetLocations("a1")).size === 0, "master did not remove a1")
assert(master.mustGetLocations(GetLocations("a2")).size === 0, "master did not remove a2")
assert(master.getLocations("a1").size === 0, "master did not remove a1")
assert(master.getLocations("a2").size === 0, "master did not remove a2")
}
test("master + 2 managers interaction") {
store = new BlockManager(actorSystem, master, serializer, 2000)
val otherStore = new BlockManager(actorSystem, master, new KryoSerializer, 2000)
val peers = master.getPeers(store.blockManagerId, 1)
assert(peers.size === 1, "master did not return the other manager as a peer")
assert(peers.head === otherStore.blockManagerId, "peer returned by master is not the other manager")
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_2)
otherStore.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_2)
assert(master.getLocations("a1").size === 2, "master did not report 2 locations for a1")
assert(master.getLocations("a2").size === 2, "master did not report 2 locations for a2")
}
test("removing block") {
@ -113,9 +156,9 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
assert(store.getSingle("a3") != None, "a3 was not in store")
// Checking whether master knows about the blocks or not
assert(master.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1")
assert(master.mustGetLocations(GetLocations("a2")).size > 0, "master was not told about a2")
assert(master.mustGetLocations(GetLocations("a3")).size === 0, "master was told about a3")
assert(master.getLocations("a1").size > 0, "master was not told about a1")
assert(master.getLocations("a2").size > 0, "master was not told about a2")
assert(master.getLocations("a3").size === 0, "master was told about a3")
// Remove a1 and a2 and a3. Should be no-op for a3.
master.removeBlock("a1")
@ -123,10 +166,10 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
master.removeBlock("a3")
assert(store.getSingle("a1") === None, "a1 not removed from store")
assert(store.getSingle("a2") === None, "a2 not removed from store")
assert(master.mustGetLocations(GetLocations("a1")).size === 0, "master did not remove a1")
assert(master.mustGetLocations(GetLocations("a2")).size === 0, "master did not remove a2")
assert(master.getLocations("a1").size === 0, "master did not remove a1")
assert(master.getLocations("a2").size === 0, "master did not remove a2")
assert(store.getSingle("a3") != None, "a3 was not in store")
assert(master.mustGetLocations(GetLocations("a3")).size === 0, "master was told about a3")
assert(master.getLocations("a3").size === 0, "master was told about a3")
memStatus = master.getMemoryStatus.head._2
assert(memStatus._1 == 2000L, "total memory " + memStatus._1 + " should equal 2000")
assert(memStatus._2 == 2000L, "remaining memory " + memStatus._1 + " should equal 2000")
@ -140,13 +183,13 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
assert(store.getSingle("a1") != None, "a1 was not in store")
assert(master.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1")
assert(master.getLocations("a1").size > 0, "master was not told about a1")
master.notifyADeadHost(store.blockManagerId.ip)
assert(master.mustGetLocations(GetLocations("a1")).size == 0, "a1 was not removed from master")
assert(master.getLocations("a1").size == 0, "a1 was not removed from master")
store invokePrivate heartBeat()
assert(master.mustGetLocations(GetLocations("a1")).size > 0,
assert(master.getLocations("a1").size > 0,
"a1 was not reregistered with master")
}
@ -157,17 +200,15 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
assert(master.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1")
assert(master.getLocations("a1").size > 0, "master was not told about a1")
master.notifyADeadHost(store.blockManagerId.ip)
assert(master.mustGetLocations(GetLocations("a1")).size == 0, "a1 was not removed from master")
assert(master.getLocations("a1").size == 0, "a1 was not removed from master")
store.putSingle("a2", a1, StorageLevel.MEMORY_ONLY)
assert(master.mustGetLocations(GetLocations("a1")).size > 0,
"a1 was not reregistered with master")
assert(master.mustGetLocations(GetLocations("a2")).size > 0,
"master was not told about a2")
assert(master.getLocations("a1").size > 0, "a1 was not reregistered with master")
assert(master.getLocations("a2").size > 0, "master was not told about a2")
}
test("deregistration on duplicate") {
@ -177,19 +218,19 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
assert(master.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1")
assert(master.getLocations("a1").size > 0, "master was not told about a1")
store2 = new BlockManager(actorSystem, master, serializer, 2000)
assert(master.mustGetLocations(GetLocations("a1")).size == 0, "a1 was not removed from master")
assert(master.getLocations("a1").size == 0, "a1 was not removed from master")
store invokePrivate heartBeat()
assert(master.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1")
assert(master.getLocations("a1").size > 0, "master was not told about a1")
store2 invokePrivate heartBeat()
assert(master.mustGetLocations(GetLocations("a1")).size == 0, "a2 was not removed from master")
assert(master.getLocations("a1").size == 0, "a2 was not removed from master")
}
test("in-memory LRU storage") {