Rename more things from slave to executor

This commit is contained in:
Matei Zaharia 2013-01-27 23:17:20 -08:00
parent 44b4a0f88f
commit 909850729e
8 changed files with 50 additions and 60 deletions

View file

@ -65,7 +65,7 @@ private[spark] class ExecutorRunner(
}
}
/** Replace variables such as {{SLAVEID}} and {{CORES}} in a command argument passed to us */
/** Replace variables such as {{EXECUTOR_ID}} and {{CORES}} in a command argument passed to us */
def substituteVariables(argument: String): String = argument match {
case "{{EXECUTOR_ID}}" => execId.toString
case "{{HOSTNAME}}" => hostname

View file

@ -8,10 +8,10 @@ import akka.actor.{ActorRef, Actor, Props}
import java.util.concurrent.{TimeUnit, ThreadPoolExecutor, SynchronousQueue}
import akka.remote.RemoteClientLifeCycleEvent
import spark.scheduler.cluster._
import spark.scheduler.cluster.RegisteredSlave
import spark.scheduler.cluster.RegisteredExecutor
import spark.scheduler.cluster.LaunchTask
import spark.scheduler.cluster.RegisterSlaveFailed
import spark.scheduler.cluster.RegisterSlave
import spark.scheduler.cluster.RegisterExecutorFailed
import spark.scheduler.cluster.RegisterExecutor
private[spark] class StandaloneExecutorBackend(
@ -30,7 +30,7 @@ private[spark] class StandaloneExecutorBackend(
try {
logInfo("Connecting to master: " + masterUrl)
master = context.actorFor(masterUrl)
master ! RegisterSlave(executorId, hostname, cores)
master ! RegisterExecutor(executorId, hostname, cores)
context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
context.watch(master) // Doesn't work with remote actors, but useful for testing
} catch {
@ -41,11 +41,11 @@ private[spark] class StandaloneExecutorBackend(
}
override def receive = {
case RegisteredSlave(sparkProperties) =>
case RegisteredExecutor(sparkProperties) =>
logInfo("Successfully registered with master")
executor.initialize(executorId, hostname, sparkProperties)
case RegisterSlaveFailed(message) =>
case RegisterExecutorFailed(message) =>
logError("Slave registration failed: " + message)
System.exit(1)

View file

@ -1,4 +0,0 @@
package spark.scheduler.cluster
private[spark]
class SlaveResources(val slaveId: String, val hostname: String, val coresFree: Int) {}

View file

@ -19,7 +19,6 @@ private[spark] class SparkDeploySchedulerBackend(
var shutdownCallback : (SparkDeploySchedulerBackend) => Unit = _
val maxCores = System.getProperty("spark.cores.max", Int.MaxValue.toString).toInt
val executorIdToSlaveId = new HashMap[String, String]
// Memory used by each executor (in megabytes)
val executorMemory = {
@ -47,7 +46,7 @@ private[spark] class SparkDeploySchedulerBackend(
}
override def stop() {
stopping = true;
stopping = true
super.stop()
client.stop()
if (shutdownCallback != null) {
@ -67,23 +66,16 @@ private[spark] class SparkDeploySchedulerBackend(
}
def executorAdded(id: String, workerId: String, host: String, cores: Int, memory: Int) {
executorIdToSlaveId += id -> workerId
logInfo("Granted executor ID %s on host %s with %d cores, %s RAM".format(
id, host, cores, Utils.memoryMegabytesToString(memory)))
}
def executorRemoved(id: String, message: String, exitStatus: Option[Int]) {
def executorRemoved(executorId: String, message: String, exitStatus: Option[Int]) {
val reason: ExecutorLossReason = exitStatus match {
case Some(code) => ExecutorExited(code)
case None => SlaveLost(message)
}
logInfo("Executor %s removed: %s".format(id, message))
executorIdToSlaveId.get(id) match {
case Some(slaveId) =>
executorIdToSlaveId.remove(id)
scheduler.executorLost(slaveId, reason)
case None =>
logInfo("No slave ID known for executor %s".format(id))
}
logInfo("Executor %s removed: %s".format(executorId, message))
scheduler.executorLost(executorId, reason)
}
}

View file

@ -11,24 +11,26 @@ private[spark]
case class LaunchTask(task: TaskDescription) extends StandaloneClusterMessage
private[spark]
case class RegisteredSlave(sparkProperties: Seq[(String, String)]) extends StandaloneClusterMessage
case class RegisteredExecutor(sparkProperties: Seq[(String, String)])
extends StandaloneClusterMessage
private[spark]
case class RegisterSlaveFailed(message: String) extends StandaloneClusterMessage
case class RegisterExecutorFailed(message: String) extends StandaloneClusterMessage
// Slaves to master
// Executors to master
private[spark]
case class RegisterSlave(slaveId: String, host: String, cores: Int) extends StandaloneClusterMessage
case class RegisterExecutor(executorId: String, host: String, cores: Int)
extends StandaloneClusterMessage
private[spark]
case class StatusUpdate(slaveId: String, taskId: Long, state: TaskState, data: SerializableBuffer)
case class StatusUpdate(executorId: String, taskId: Long, state: TaskState, data: SerializableBuffer)
extends StandaloneClusterMessage
private[spark]
object StatusUpdate {
/** Alternate factory method that takes a ByteBuffer directly for the data field */
def apply(slaveId: String, taskId: Long, state: TaskState, data: ByteBuffer): StatusUpdate = {
StatusUpdate(slaveId, taskId, state, new SerializableBuffer(data))
def apply(executorId: String, taskId: Long, state: TaskState, data: ByteBuffer): StatusUpdate = {
StatusUpdate(executorId, taskId, state, new SerializableBuffer(data))
}
}

View file

@ -24,9 +24,9 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
var totalCoreCount = new AtomicInteger(0)
class MasterActor(sparkProperties: Seq[(String, String)]) extends Actor {
val slaveActor = new HashMap[String, ActorRef]
val slaveAddress = new HashMap[String, Address]
val slaveHost = new HashMap[String, String]
val executorActor = new HashMap[String, ActorRef]
val executorAddress = new HashMap[String, Address]
val executorHost = new HashMap[String, String]
val freeCores = new HashMap[String, Int]
val actorToExecutorId = new HashMap[ActorRef, String]
val addressToExecutorId = new HashMap[Address, String]
@ -37,17 +37,17 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
}
def receive = {
case RegisterSlave(executorId, host, cores) =>
if (slaveActor.contains(executorId)) {
sender ! RegisterSlaveFailed("Duplicate executor ID: " + executorId)
case RegisterExecutor(executorId, host, cores) =>
if (executorActor.contains(executorId)) {
sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId)
} else {
logInfo("Registered executor: " + sender + " with ID " + executorId)
sender ! RegisteredSlave(sparkProperties)
sender ! RegisteredExecutor(sparkProperties)
context.watch(sender)
slaveActor(executorId) = sender
slaveHost(executorId) = host
executorActor(executorId) = sender
executorHost(executorId) = host
freeCores(executorId) = cores
slaveAddress(executorId) = sender.path.address
executorAddress(executorId) = sender.path.address
actorToExecutorId(sender) = executorId
addressToExecutorId(sender.path.address) = executorId
totalCoreCount.addAndGet(cores)
@ -69,45 +69,45 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
context.stop(self)
case Terminated(actor) =>
actorToExecutorId.get(actor).foreach(removeSlave(_, "Akka actor terminated"))
actorToExecutorId.get(actor).foreach(removeExecutor(_, "Akka actor terminated"))
case RemoteClientDisconnected(transport, address) =>
addressToExecutorId.get(address).foreach(removeSlave(_, "remote Akka client disconnected"))
addressToExecutorId.get(address).foreach(removeExecutor(_, "remote Akka client disconnected"))
case RemoteClientShutdown(transport, address) =>
addressToExecutorId.get(address).foreach(removeSlave(_, "remote Akka client shutdown"))
addressToExecutorId.get(address).foreach(removeExecutor(_, "remote Akka client shutdown"))
}
// Make fake resource offers on all slaves
// Make fake resource offers on all executors
def makeOffers() {
launchTasks(scheduler.resourceOffers(
slaveHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))}))
executorHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))}))
}
// Make fake resource offers on just one slave
// Make fake resource offers on just one executor
def makeOffers(executorId: String) {
launchTasks(scheduler.resourceOffers(
Seq(new WorkerOffer(executorId, slaveHost(executorId), freeCores(executorId)))))
Seq(new WorkerOffer(executorId, executorHost(executorId), freeCores(executorId)))))
}
// Launch tasks returned by a set of resource offers
def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
for (task <- tasks.flatten) {
freeCores(task.executorId) -= 1
slaveActor(task.executorId) ! LaunchTask(task)
executorActor(task.executorId) ! LaunchTask(task)
}
}
// Remove a disconnected slave from the cluster
def removeSlave(executorId: String, reason: String) {
def removeExecutor(executorId: String, reason: String) {
logInfo("Slave " + executorId + " disconnected, so removing it")
val numCores = freeCores(executorId)
actorToExecutorId -= slaveActor(executorId)
addressToExecutorId -= slaveAddress(executorId)
slaveActor -= executorId
slaveHost -= executorId
actorToExecutorId -= executorActor(executorId)
addressToExecutorId -= executorAddress(executorId)
executorActor -= executorId
executorHost -= executorId
freeCores -= executorId
slaveHost -= executorId
executorHost -= executorId
totalCoreCount.addAndGet(-numCores)
scheduler.executorLost(executorId, SlaveLost(reason))
}

View file

@ -21,6 +21,8 @@ object BlockManagerUI extends Logging {
def start(actorSystem : ActorSystem, masterActor: ActorRef, sc: SparkContext) {
val webUIDirectives = new BlockManagerUIDirectives(actorSystem, masterActor, sc)
try {
// TODO: This needs to find a random free port to bind to. Unfortunately, there's no way
// in spray to do that, so we'll have to rely on something like new ServerSocket()
val boundPort = AkkaUtils.startSprayServer(actorSystem, "0.0.0.0",
Option(System.getenv("BLOCKMANAGER_UI_PORT")).getOrElse("9080").toInt,
webUIDirectives.handler, "BlockManagerHTTPServer")

View file

@ -14,18 +14,16 @@ class MetadataCleaner(name: String, cleanupFunc: (Long) => Unit) extends Logging
val task = new TimerTask {
def run() {
try {
if (delaySeconds > 0) {
cleanupFunc(System.currentTimeMillis() - (delaySeconds * 1000))
logInfo("Ran metadata cleaner for " + name)
}
} catch {
case e: Exception => logError("Error running cleanup task for " + name, e)
}
}
}
if (periodSeconds > 0) {
logInfo(
if (delaySeconds > 0) {
logDebug(
"Starting metadata cleaner for " + name + " with delay of " + delaySeconds + " seconds and "
+ "period of " + periodSeconds + " secs")
timer.schedule(task, periodSeconds * 1000, periodSeconds * 1000)