Detect disconnected slaves in StandaloneScheduler

This commit is contained in:
Matei Zaharia 2012-08-26 18:56:56 -07:00
parent 29e83f39e9
commit 26dfd20c9a

View file

@@ -2,13 +2,14 @@ package spark.scheduler.cluster
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-import akka.actor.{Props, Actor, ActorRef, ActorSystem}
+import akka.actor._
 import akka.util.duration._
 import akka.pattern.ask
 import spark.{SparkException, Logging, TaskState}
 import akka.dispatch.Await
 import java.util.concurrent.atomic.AtomicInteger
+import akka.remote.{RemoteClientShutdown, RemoteClientDisconnected, RemoteClientLifeCycleEvent}
 
 /**
  * A standalone scheduler backend, which waits for standalone executors to connect to it through
@@ -23,8 +24,16 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
   class MasterActor(sparkProperties: Seq[(String, String)]) extends Actor {
     val slaveActor = new HashMap[String, ActorRef]
+    val slaveAddress = new HashMap[String, Address]
     val slaveHost = new HashMap[String, String]
     val freeCores = new HashMap[String, Int]
+    val actorToSlaveId = new HashMap[ActorRef, String]
+    val addressToSlaveId = new HashMap[Address, String]
+
+    override def preStart() {
+      // Listen for remote client disconnection events, since they don't go through Akka's watch()
+      context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
+    }
 
     def receive = {
       case RegisterSlave(slaveId, host, cores) =>
@@ -33,9 +42,13 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
         } else {
           logInfo("Registered slave: " + sender + " with ID " + slaveId)
           sender ! RegisteredSlave(sparkProperties)
+          context.watch(sender)
           slaveActor(slaveId) = sender
           slaveHost(slaveId) = host
           freeCores(slaveId) = cores
+          slaveAddress(slaveId) = sender.path.address
+          actorToSlaveId(sender) = slaveId
+          addressToSlaveId(sender.path.address) = slaveId
           totalCoreCount.addAndGet(cores)
           makeOffers()
         }
@@ -54,7 +67,14 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
         sender ! true
         context.stop(self)
 
-      // TODO: Deal with nodes disconnecting too! (Including decreasing totalCoreCount)
+      case Terminated(actor) =>
+        actorToSlaveId.get(actor).foreach(removeSlave)
+
+      case RemoteClientDisconnected(transport, address) =>
+        addressToSlaveId.get(address).foreach(removeSlave)
+
+      case RemoteClientShutdown(transport, address) =>
+        addressToSlaveId.get(address).foreach(removeSlave)
     }
 
     // Make fake resource offers on all slaves
@@ -76,6 +96,20 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
         slaveActor(task.slaveId) ! LaunchTask(task)
       }
     }
+
+    // Remove a disconnected slave from the cluster
+    def removeSlave(slaveId: String) {
+      logInfo("Slave " + slaveId + " disconnected, so removing it")
+      val numCores = freeCores(slaveId)
+      actorToSlaveId -= slaveActor(slaveId)
+      addressToSlaveId -= slaveAddress(slaveId)
+      slaveActor -= slaveId
+      slaveHost -= slaveId
+      freeCores -= slaveId
+      slaveHost -= slaveId
+      totalCoreCount.addAndGet(-numCores)
+      scheduler.slaveLost(slaveId)
+    }
   }
 
   var masterActor: ActorRef = null