diff --git a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala index af3acfecb6..f5ff267d44 100644 --- a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala @@ -65,7 +65,7 @@ private[spark] class ExecutorRunner( } } - /** Replace variables such as {{SLAVEID}} and {{CORES}} in a command argument passed to us */ + /** Replace variables such as {{EXECUTOR_ID}} and {{CORES}} in a command argument passed to us */ def substituteVariables(argument: String): String = argument match { case "{{EXECUTOR_ID}}" => execId.toString case "{{HOSTNAME}}" => hostname diff --git a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala index 435ee5743e..50871802ea 100644 --- a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala +++ b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala @@ -8,10 +8,10 @@ import akka.actor.{ActorRef, Actor, Props} import java.util.concurrent.{TimeUnit, ThreadPoolExecutor, SynchronousQueue} import akka.remote.RemoteClientLifeCycleEvent import spark.scheduler.cluster._ -import spark.scheduler.cluster.RegisteredSlave +import spark.scheduler.cluster.RegisteredExecutor import spark.scheduler.cluster.LaunchTask -import spark.scheduler.cluster.RegisterSlaveFailed -import spark.scheduler.cluster.RegisterSlave +import spark.scheduler.cluster.RegisterExecutorFailed +import spark.scheduler.cluster.RegisterExecutor private[spark] class StandaloneExecutorBackend( @@ -30,7 +30,7 @@ private[spark] class StandaloneExecutorBackend( try { logInfo("Connecting to master: " + masterUrl) master = context.actorFor(masterUrl) - master ! RegisterSlave(executorId, hostname, cores) + master ! RegisterExecutor(executorId, hostname, cores) context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent]) context.watch(master) // Doesn't work with remote actors, but useful for testing } catch { @@ -41,11 +41,11 @@ private[spark] class StandaloneExecutorBackend( } override def receive = { - case RegisteredSlave(sparkProperties) => + case RegisteredExecutor(sparkProperties) => logInfo("Successfully registered with master") executor.initialize(executorId, hostname, sparkProperties) - case RegisterSlaveFailed(message) => + case RegisterExecutorFailed(message) => logError("Slave registration failed: " + message) System.exit(1) diff --git a/core/src/main/scala/spark/scheduler/cluster/SlaveResources.scala b/core/src/main/scala/spark/scheduler/cluster/SlaveResources.scala deleted file mode 100644 index 96ebaa4601..0000000000 --- a/core/src/main/scala/spark/scheduler/cluster/SlaveResources.scala +++ /dev/null @@ -1,4 +0,0 @@ -package spark.scheduler.cluster - -private[spark] -class SlaveResources(val slaveId: String, val hostname: String, val coresFree: Int) {} diff --git a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index f0792c1b76..6dd3ae003d 100644 --- a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -19,7 +19,6 @@ private[spark] class SparkDeploySchedulerBackend( var shutdownCallback : (SparkDeploySchedulerBackend) => Unit = _ val maxCores = System.getProperty("spark.cores.max", Int.MaxValue.toString).toInt - val executorIdToSlaveId = new HashMap[String, String] // Memory used by each executor (in megabytes) val executorMemory = { @@ -47,7 +46,7 @@ private[spark] class SparkDeploySchedulerBackend( } override def stop() { - stopping = true; + stopping = true super.stop() client.stop() if (shutdownCallback != null) { @@ -67,23 +66,16 @@ private[spark] class SparkDeploySchedulerBackend( } def executorAdded(id: String, workerId: String, host: String, cores: Int, memory: Int) { - executorIdToSlaveId += id -> workerId logInfo("Granted executor ID %s on host %s with %d cores, %s RAM".format( id, host, cores, Utils.memoryMegabytesToString(memory))) } - def executorRemoved(id: String, message: String, exitStatus: Option[Int]) { + def executorRemoved(executorId: String, message: String, exitStatus: Option[Int]) { val reason: ExecutorLossReason = exitStatus match { case Some(code) => ExecutorExited(code) case None => SlaveLost(message) } - logInfo("Executor %s removed: %s".format(id, message)) - executorIdToSlaveId.get(id) match { - case Some(slaveId) => - executorIdToSlaveId.remove(id) - scheduler.executorLost(slaveId, reason) - case None => - logInfo("No slave ID known for executor %s".format(id)) - } + logInfo("Executor %s removed: %s".format(executorId, message)) + scheduler.executorLost(executorId, reason) } } diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala index 1386cd9d44..c68f15bdfa 100644 --- a/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala +++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala @@ -11,24 +11,26 @@ private[spark] case class LaunchTask(task: TaskDescription) extends StandaloneClusterMessage private[spark] -case class RegisteredSlave(sparkProperties: Seq[(String, String)]) extends StandaloneClusterMessage +case class RegisteredExecutor(sparkProperties: Seq[(String, String)]) + extends StandaloneClusterMessage private[spark] -case class RegisterSlaveFailed(message: String) extends StandaloneClusterMessage +case class RegisterExecutorFailed(message: String) extends StandaloneClusterMessage -// Slaves to master +// Executors to master private[spark] -case class RegisterSlave(slaveId: String, host: String, cores: Int) extends StandaloneClusterMessage +case class RegisterExecutor(executorId: String, host: String, cores: Int) + extends StandaloneClusterMessage private[spark] -case class StatusUpdate(slaveId: String, taskId: Long, state: TaskState, data: SerializableBuffer) +case class StatusUpdate(executorId: String, taskId: Long, state: TaskState, data: SerializableBuffer) extends StandaloneClusterMessage private[spark] object StatusUpdate { /** Alternate factory method that takes a ByteBuffer directly for the data field */ - def apply(slaveId: String, taskId: Long, state: TaskState, data: ByteBuffer): StatusUpdate = { - StatusUpdate(slaveId, taskId, state, new SerializableBuffer(data)) + def apply(executorId: String, taskId: Long, state: TaskState, data: ByteBuffer): StatusUpdate = { + StatusUpdate(executorId, taskId, state, new SerializableBuffer(data)) } } diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala index 32be1e7a26..69822f568c 100644 --- a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala +++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala @@ -24,9 +24,9 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor var totalCoreCount = new AtomicInteger(0) class MasterActor(sparkProperties: Seq[(String, String)]) extends Actor { - val slaveActor = new HashMap[String, ActorRef] - val slaveAddress = new HashMap[String, Address] - val slaveHost = new HashMap[String, String] + val executorActor = new HashMap[String, ActorRef] + val executorAddress = new HashMap[String, Address] + val executorHost = new HashMap[String, String] val freeCores = new HashMap[String, Int] val actorToExecutorId = new HashMap[ActorRef, String] val addressToExecutorId = new HashMap[Address, String] @@ -37,17 +37,17 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor } def receive = { - case RegisterSlave(executorId, host, cores) => - if (slaveActor.contains(executorId)) { - sender ! RegisterSlaveFailed("Duplicate executor ID: " + executorId) + case RegisterExecutor(executorId, host, cores) => + if (executorActor.contains(executorId)) { + sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId) } else { logInfo("Registered executor: " + sender + " with ID " + executorId) - sender ! RegisteredSlave(sparkProperties) + sender ! RegisteredExecutor(sparkProperties) context.watch(sender) - slaveActor(executorId) = sender - slaveHost(executorId) = host + executorActor(executorId) = sender + executorHost(executorId) = host freeCores(executorId) = cores - slaveAddress(executorId) = sender.path.address + executorAddress(executorId) = sender.path.address actorToExecutorId(sender) = executorId addressToExecutorId(sender.path.address) = executorId totalCoreCount.addAndGet(cores) @@ -69,45 +69,45 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor context.stop(self) case Terminated(actor) => - actorToExecutorId.get(actor).foreach(removeSlave(_, "Akka actor terminated")) + actorToExecutorId.get(actor).foreach(removeExecutor(_, "Akka actor terminated")) case RemoteClientDisconnected(transport, address) => - addressToExecutorId.get(address).foreach(removeSlave(_, "remote Akka client disconnected")) + addressToExecutorId.get(address).foreach(removeExecutor(_, "remote Akka client disconnected")) case RemoteClientShutdown(transport, address) => - addressToExecutorId.get(address).foreach(removeSlave(_, "remote Akka client shutdown")) + addressToExecutorId.get(address).foreach(removeExecutor(_, "remote Akka client shutdown")) } - // Make fake resource offers on all slaves + // Make fake resource offers on all executors def makeOffers() { launchTasks(scheduler.resourceOffers( - slaveHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))})) + executorHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))})) } - // Make fake resource offers on just one slave + // Make fake resource offers on just one executor def makeOffers(executorId: String) { launchTasks(scheduler.resourceOffers( - Seq(new WorkerOffer(executorId, slaveHost(executorId), freeCores(executorId))))) + Seq(new WorkerOffer(executorId, executorHost(executorId), freeCores(executorId))))) } // Launch tasks returned by a set of resource offers def launchTasks(tasks: Seq[Seq[TaskDescription]]) { for (task <- tasks.flatten) { freeCores(task.executorId) -= 1 - slaveActor(task.executorId) ! LaunchTask(task) + executorActor(task.executorId) ! LaunchTask(task) } } // Remove a disconnected slave from the cluster - def removeSlave(executorId: String, reason: String) { + def removeExecutor(executorId: String, reason: String) { logInfo("Slave " + executorId + " disconnected, so removing it") val numCores = freeCores(executorId) - actorToExecutorId -= slaveActor(executorId) - addressToExecutorId -= slaveAddress(executorId) - slaveActor -= executorId - slaveHost -= executorId + actorToExecutorId -= executorActor(executorId) + addressToExecutorId -= executorAddress(executorId) + executorActor -= executorId + executorHost -= executorId freeCores -= executorId - slaveHost -= executorId + executorHost -= executorId totalCoreCount.addAndGet(-numCores) scheduler.executorLost(executorId, SlaveLost(reason)) } diff --git a/core/src/main/scala/spark/storage/BlockManagerUI.scala b/core/src/main/scala/spark/storage/BlockManagerUI.scala index b7423c7234..956ede201e 100644 --- a/core/src/main/scala/spark/storage/BlockManagerUI.scala +++ b/core/src/main/scala/spark/storage/BlockManagerUI.scala @@ -21,6 +21,8 @@ object BlockManagerUI extends Logging { def start(actorSystem : ActorSystem, masterActor: ActorRef, sc: SparkContext) { val webUIDirectives = new BlockManagerUIDirectives(actorSystem, masterActor, sc) try { + // TODO: This needs to find a random free port to bind to. Unfortunately, there's no way + // in spray to do that, so we'll have to rely on something like new ServerSocket() val boundPort = AkkaUtils.startSprayServer(actorSystem, "0.0.0.0", Option(System.getenv("BLOCKMANAGER_UI_PORT")).getOrElse("9080").toInt, webUIDirectives.handler, "BlockManagerHTTPServer") diff --git a/core/src/main/scala/spark/util/MetadataCleaner.scala b/core/src/main/scala/spark/util/MetadataCleaner.scala index 139e21d09e..721c4c6029 100644 --- a/core/src/main/scala/spark/util/MetadataCleaner.scala +++ b/core/src/main/scala/spark/util/MetadataCleaner.scala @@ -14,18 +14,16 @@ class MetadataCleaner(name: String, cleanupFunc: (Long) => Unit) extends Logging val task = new TimerTask { def run() { try { - if (delaySeconds > 0) { - cleanupFunc(System.currentTimeMillis() - (delaySeconds * 1000)) - logInfo("Ran metadata cleaner for " + name) - } + cleanupFunc(System.currentTimeMillis() - (delaySeconds * 1000)) + logInfo("Ran metadata cleaner for " + name) } catch { case e: Exception => logError("Error running cleanup task for " + name, e) } } } - if (periodSeconds > 0) { - logInfo( + if (delaySeconds > 0) { + logDebug( "Starting metadata cleaner for " + name + " with delay of " + delaySeconds + " seconds and " + "period of " + periodSeconds + " secs") timer.schedule(task, periodSeconds * 1000, periodSeconds * 1000)