Refactored BlockManagerMaster (not BlockManagerMasterActor) to simplify the code and fix live lock problem in unlimited attempts to contact the master. Also added testcases in the BlockManagerSuite to test BlockManagerMaster methods getPeers and getLocations.

This commit is contained in:
Tathagata Das 2012-11-11 08:54:21 -08:00
parent 62af376863
commit 04e9e9d93c
3 changed files with 146 additions and 217 deletions

View file

@ -120,8 +120,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
* BlockManagerWorker actor.
*/
private def initialize() {
master.mustRegisterBlockManager(
RegisterBlockManager(blockManagerId, maxMemory))
master.registerBlockManager(blockManagerId, maxMemory)
BlockManagerWorker.startBlockManagerWorker(this)
}
@ -158,7 +157,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
}
}
master.mustHeartBeat(HeartBeat(blockManagerId, blockId, curLevel, inMemSize, onDiskSize))
master.updateBlockInfo(blockManagerId, blockId, curLevel, inMemSize, onDiskSize)
logDebug("Told master about block " + blockId)
}
@ -167,7 +166,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
*/
def getLocations(blockId: String): Seq[String] = {
val startTimeMs = System.currentTimeMillis
var managers = master.mustGetLocations(GetLocations(blockId))
var managers = master.getLocations(blockId)
val locations = managers.map(_.ip)
logDebug("Get block locations in " + Utils.getUsedTimeMs(startTimeMs))
return locations
@ -178,8 +177,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
*/
def getLocations(blockIds: Array[String]): Array[Seq[String]] = {
val startTimeMs = System.currentTimeMillis
val locations = master.mustGetLocationsMultipleBlockIds(
GetLocationsMultipleBlockIds(blockIds)).map(_.map(_.ip).toSeq).toArray
val locations = master.getLocations(blockIds).map(_.map(_.ip).toSeq).toArray
logDebug("Get multiple block location in " + Utils.getUsedTimeMs(startTimeMs))
return locations
}
@ -343,7 +341,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
logDebug("Getting remote block " + blockId)
// Get locations of block
val locations = master.mustGetLocations(GetLocations(blockId))
val locations = master.getLocations(blockId)
// Get block from remote locations
for (loc <- locations) {
@ -721,7 +719,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
val tLevel: StorageLevel =
new StorageLevel(level.useDisk, level.useMemory, level.deserialized, 1)
if (cachedPeers == null) {
cachedPeers = master.mustGetPeers(GetPeers(blockManagerId, level.replication - 1))
cachedPeers = master.getPeers(blockManagerId, level.replication - 1)
}
for (peer: BlockManagerId <- cachedPeers) {
val start = System.nanoTime

View file

@ -26,7 +26,7 @@ case class RegisterBlockManager(
extends ToBlockManagerMaster
private[spark]
class HeartBeat(
class UpdateBlockInfo(
var blockManagerId: BlockManagerId,
var blockId: String,
var storageLevel: StorageLevel,
@ -57,17 +57,17 @@ class HeartBeat(
}
private[spark]
object HeartBeat {
object UpdateBlockInfo {
def apply(blockManagerId: BlockManagerId,
blockId: String,
storageLevel: StorageLevel,
memSize: Long,
diskSize: Long): HeartBeat = {
new HeartBeat(blockManagerId, blockId, storageLevel, memSize, diskSize)
diskSize: Long): UpdateBlockInfo = {
new UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize)
}
// For pattern-matching
def unapply(h: HeartBeat): Option[(BlockManagerId, String, StorageLevel, Long, Long)] = {
def unapply(h: UpdateBlockInfo): Option[(BlockManagerId, String, StorageLevel, Long, Long)] = {
Some((h.blockManagerId, h.blockId, h.storageLevel, h.memSize, h.diskSize))
}
}
@ -182,8 +182,8 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
case RegisterBlockManager(blockManagerId, maxMemSize) =>
register(blockManagerId, maxMemSize)
case HeartBeat(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
heartBeat(blockManagerId, blockId, storageLevel, deserializedSize, size)
case UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size)
case GetLocations(blockId) =>
getLocations(blockId)
@ -233,7 +233,7 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
sender ! true
}
private def heartBeat(
private def updateBlockInfo(
blockManagerId: BlockManagerId,
blockId: String,
storageLevel: StorageLevel,
@ -245,7 +245,7 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
if (blockId == null) {
blockManagerInfo(blockManagerId).updateLastSeenMs()
logDebug("Got in heartBeat 1" + tmp + " used " + Utils.getUsedTimeMs(startTimeMs))
logDebug("Got in updateBlockInfo 1" + tmp + " used " + Utils.getUsedTimeMs(startTimeMs))
sender ! true
}
@ -350,211 +350,124 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Boolean, isLocal: Boolean)
extends Logging {
val AKKA_ACTOR_NAME: String = "BlockMasterManager"
val REQUEST_RETRY_INTERVAL_MS = 100
val DEFAULT_MASTER_IP: String = System.getProperty("spark.master.host", "localhost")
val DEFAULT_MASTER_PORT: Int = System.getProperty("spark.master.port", "7077").toInt
val DEFAULT_MANAGER_IP: String = Utils.localHostName()
val DEFAULT_MANAGER_PORT: String = "10902"
val actorName = "BlockMasterManager"
val timeout = 10.seconds
var masterActor: ActorRef = null
val maxAttempts = 5
if (isMaster) {
masterActor = actorSystem.actorOf(
Props(new BlockManagerMasterActor(isLocal)), name = AKKA_ACTOR_NAME)
var masterActor = if (isMaster) {
val actor = actorSystem.actorOf(Props(new BlockManagerMasterActor(isLocal)), name = actorName)
logInfo("Registered BlockManagerMaster Actor")
actor
} else {
val url = "akka://spark@%s:%s/user/%s".format(
DEFAULT_MASTER_IP, DEFAULT_MASTER_PORT, AKKA_ACTOR_NAME)
val host = System.getProperty("spark.master.host", "localhost")
val port = System.getProperty("spark.master.port", "7077").toInt
val url = "akka://spark@%s:%s/user/%s".format(host, port, actorName)
val actor = actorSystem.actorFor(url)
logInfo("Connecting to BlockManagerMaster: " + url)
masterActor = actorSystem.actorFor(url)
actor
}
/**
* Send a message to the master actor and get its result within a default timeout, or
* throw a SparkException if this fails.
*/
private def ask[T](message: Any): T = {
// TODO: Consider removing multiple attempts
if (masterActor == null) {
throw new SparkException("Error sending message to BlockManager as masterActor is null " +
"[message = " + message + "]")
}
var attempts = 0
var lastException: Exception = null
while (attempts < maxAttempts) {
attempts += 1
try {
val future = masterActor.ask(message)(timeout)
val result = Await.result(future, timeout)
if (result == null) {
throw new Exception("BlockManagerMaster returned null")
}
return result.asInstanceOf[T]
} catch {
case ie: InterruptedException =>
throw ie
case e: Exception =>
lastException = e
logWarning(
"Error sending message to BlockManagerMaster in " + attempts + " attempts", e)
}
Thread.sleep(100)
}
throw new SparkException(
"Error sending message to BlockManagerMaster [message = " + message + "]", lastException)
}
/**
* Send a one-way message to the master actor, to which we expect it to reply with true
*/
private def tell(message: Any) {
if (!ask[Boolean](message)) {
throw new SparkException("Telling master a message returned false")
}
}
/**
* Register the BlockManager's id with the master
*/
def registerBlockManager(blockManagerId: BlockManagerId, maxMemSize: Long) {
logInfo("Trying to register BlockManager")
tell(RegisterBlockManager(blockManagerId, maxMemSize))
logInfo("Registered BlockManager")
}
def updateBlockInfo(
blockManagerId: BlockManagerId,
blockId: String,
storageLevel: StorageLevel,
memSize: Long,
diskSize: Long
) {
tell(UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize))
logInfo("Updated info of block " + blockId)
}
/** Get locations of the blockId from the master */
def getLocations(blockId: String): Seq[BlockManagerId] = {
ask[Seq[BlockManagerId]](GetLocations(blockId))
}
/** Get locations of multiple blockIds from the master */
def getLocations(blockIds: Array[String]): Seq[Seq[BlockManagerId]] = {
ask[Seq[Seq[BlockManagerId]]](GetLocationsMultipleBlockIds(blockIds))
}
/** Get ids of other nodes in the cluster from the master */
def getPeers(blockManagerId: BlockManagerId, numPeers: Int): Seq[BlockManagerId] = {
val result = ask[Seq[BlockManagerId]](GetPeers(blockManagerId, numPeers))
if (result.length != numPeers) {
throw new SparkException(
"Error getting peers, only got " + result.size + " instead of " + numPeers)
}
result
}
/** Notify the master of a dead node */
def notifyADeadHost(host: String) {
tell(RemoveHost(host + ":10902"))
logInfo("Told BlockManagerMaster to remove dead host " + host)
}
/** Get the memory status form the master */
def getMemoryStatus(): Map[BlockManagerId, (Long, Long)] = {
ask[Map[BlockManagerId, (Long, Long)]](GetMemoryStatus)
}
/** Stop the master actor, called only on the Spark master node */
def stop() {
if (masterActor != null) {
communicate(StopBlockManagerMaster)
tell(StopBlockManagerMaster)
masterActor = null
logInfo("BlockManagerMaster stopped")
}
}
// Send a message to the master actor and get its result within a default timeout, or
// throw a SparkException if this fails.
def askMaster(message: Any): Any = {
try {
val future = masterActor.ask(message)(timeout)
return Await.result(future, timeout)
} catch {
case e: Exception =>
throw new SparkException("Error communicating with BlockManagerMaster", e)
}
}
// Send a one-way message to the master actor, to which we expect it to reply with true.
def communicate(message: Any) {
if (askMaster(message) != true) {
throw new SparkException("Error reply received from BlockManagerMaster")
}
}
def notifyADeadHost(host: String) {
communicate(RemoveHost(host + ":" + DEFAULT_MANAGER_PORT))
logInfo("Removed " + host + " successfully in notifyADeadHost")
}
def mustRegisterBlockManager(msg: RegisterBlockManager) {
logInfo("Trying to register BlockManager")
while (! syncRegisterBlockManager(msg)) {
logWarning("Failed to register " + msg)
Thread.sleep(REQUEST_RETRY_INTERVAL_MS)
}
logInfo("Done registering BlockManager")
}
def syncRegisterBlockManager(msg: RegisterBlockManager): Boolean = {
//val masterActor = RemoteActor.select(node, name)
val startTimeMs = System.currentTimeMillis()
val tmp = " msg " + msg + " "
logDebug("Got in syncRegisterBlockManager 0 " + tmp + Utils.getUsedTimeMs(startTimeMs))
try {
communicate(msg)
logInfo("BlockManager registered successfully @ syncRegisterBlockManager")
logDebug("Got in syncRegisterBlockManager 1 " + tmp + Utils.getUsedTimeMs(startTimeMs))
return true
} catch {
case e: Exception =>
logError("Failed in syncRegisterBlockManager", e)
return false
}
}
def mustHeartBeat(msg: HeartBeat) {
while (! syncHeartBeat(msg)) {
logWarning("Failed to send heartbeat" + msg)
Thread.sleep(REQUEST_RETRY_INTERVAL_MS)
}
}
def syncHeartBeat(msg: HeartBeat): Boolean = {
val startTimeMs = System.currentTimeMillis()
val tmp = " msg " + msg + " "
logDebug("Got in syncHeartBeat " + tmp + " 0 " + Utils.getUsedTimeMs(startTimeMs))
try {
communicate(msg)
logDebug("Heartbeat sent successfully")
logDebug("Got in syncHeartBeat 1 " + tmp + " 1 " + Utils.getUsedTimeMs(startTimeMs))
return true
} catch {
case e: Exception =>
logError("Failed in syncHeartBeat", e)
return false
}
}
def mustGetLocations(msg: GetLocations): Seq[BlockManagerId] = {
var res = syncGetLocations(msg)
while (res == null) {
logInfo("Failed to get locations " + msg)
Thread.sleep(REQUEST_RETRY_INTERVAL_MS)
res = syncGetLocations(msg)
}
return res
}
def syncGetLocations(msg: GetLocations): Seq[BlockManagerId] = {
val startTimeMs = System.currentTimeMillis()
val tmp = " msg " + msg + " "
logDebug("Got in syncGetLocations 0 " + tmp + Utils.getUsedTimeMs(startTimeMs))
try {
val answer = askMaster(msg).asInstanceOf[ArrayBuffer[BlockManagerId]]
if (answer != null) {
logDebug("GetLocations successful")
logDebug("Got in syncGetLocations 1 " + tmp + Utils.getUsedTimeMs(startTimeMs))
return answer
} else {
logError("Master replied null in response to GetLocations")
return null
}
} catch {
case e: Exception =>
logError("GetLocations failed", e)
return null
}
}
def mustGetLocationsMultipleBlockIds(msg: GetLocationsMultipleBlockIds):
Seq[Seq[BlockManagerId]] = {
var res: Seq[Seq[BlockManagerId]] = syncGetLocationsMultipleBlockIds(msg)
while (res == null) {
logWarning("Failed to GetLocationsMultipleBlockIds " + msg)
Thread.sleep(REQUEST_RETRY_INTERVAL_MS)
res = syncGetLocationsMultipleBlockIds(msg)
}
return res
}
def syncGetLocationsMultipleBlockIds(msg: GetLocationsMultipleBlockIds):
Seq[Seq[BlockManagerId]] = {
val startTimeMs = System.currentTimeMillis
val tmp = " msg " + msg + " "
logDebug("Got in syncGetLocationsMultipleBlockIds 0 " + tmp + Utils.getUsedTimeMs(startTimeMs))
try {
val answer = askMaster(msg).asInstanceOf[Seq[Seq[BlockManagerId]]]
if (answer != null) {
logDebug("GetLocationsMultipleBlockIds successful")
logDebug("Got in syncGetLocationsMultipleBlockIds 1 " + tmp +
Utils.getUsedTimeMs(startTimeMs))
return answer
} else {
logError("Master replied null in response to GetLocationsMultipleBlockIds")
return null
}
} catch {
case e: Exception =>
logError("GetLocationsMultipleBlockIds failed", e)
return null
}
}
def mustGetPeers(msg: GetPeers): Seq[BlockManagerId] = {
var res = syncGetPeers(msg)
while ((res == null) || (res.length != msg.size)) {
logInfo("Failed to get peers " + msg)
Thread.sleep(REQUEST_RETRY_INTERVAL_MS)
res = syncGetPeers(msg)
}
return res
}
def syncGetPeers(msg: GetPeers): Seq[BlockManagerId] = {
val startTimeMs = System.currentTimeMillis
val tmp = " msg " + msg + " "
logDebug("Got in syncGetPeers 0 " + tmp + Utils.getUsedTimeMs(startTimeMs))
try {
val answer = askMaster(msg).asInstanceOf[Seq[BlockManagerId]]
if (answer != null) {
logDebug("GetPeers successful")
logDebug("Got in syncGetPeers 1 " + tmp + Utils.getUsedTimeMs(startTimeMs))
return answer
} else {
logError("Master replied null in response to GetPeers")
return null
}
} catch {
case e: Exception =>
logError("GetPeers failed", e)
return null
}
}
def getMemoryStatus: Map[BlockManagerId, (Long, Long)] = {
askMaster(GetMemoryStatus).asInstanceOf[Map[BlockManagerId, (Long, Long)]]
}
}

View file

@ -20,9 +20,11 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
var oldOops: String = null
// Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
System.setProperty("spark.kryoserializer.buffer.mb", "1")
val serializer = new KryoSerializer
before {
actorSystem = ActorSystem("test")
master = new BlockManagerMaster(actorSystem, true, true)
@ -55,7 +57,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
}
test("manager-master interaction") {
test("master + 1 manager interaction") {
store = new BlockManager(master, serializer, 2000)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
@ -72,17 +74,33 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
assert(store.getSingle("a3") != None, "a3 was not in store")
// Checking whether master knows about the blocks or not
assert(master.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1")
assert(master.mustGetLocations(GetLocations("a2")).size > 0, "master was not told about a2")
assert(master.mustGetLocations(GetLocations("a3")).size === 0, "master was told about a3")
assert(master.getLocations("a1").size === 1, "master was not told about a1")
assert(master.getLocations("a2").size === 1, "master was not told about a2")
assert(master.getLocations("a3").size === 0, "master was told about a3")
// Drop a1 and a2 from memory; this should be reported back to the master
store.dropFromMemory("a1", null)
store.dropFromMemory("a2", null)
assert(store.getSingle("a1") === None, "a1 not removed from store")
assert(store.getSingle("a2") === None, "a2 not removed from store")
assert(master.mustGetLocations(GetLocations("a1")).size === 0, "master did not remove a1")
assert(master.mustGetLocations(GetLocations("a2")).size === 0, "master did not remove a2")
assert(master.getLocations("a1").size === 0, "master did not remove a1")
assert(master.getLocations("a2").size === 0, "master did not remove a2")
}
test("master + 2 managers interaction") {
store = new BlockManager(master, serializer, 2000)
val otherStore = new BlockManager(master, new KryoSerializer, 2000)
val peers = master.getPeers(store.blockManagerId, 1)
assert(peers.size === 1, "master did not return the other manager as a peer")
assert(peers.head === otherStore.blockManagerId, "peer returned by master is not the other manager")
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_2)
otherStore.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_2)
assert(master.getLocations("a1").size === 2, "master did not report 2 locations for a1")
assert(master.getLocations("a2").size === 2, "master did not report 2 locations for a2")
}
test("in-memory LRU storage") {