Standalone Scheduler fault recovery

Implements a basic form of Standalone Scheduler fault recovery. In particular,
this allows faults to be recovered from manually, by restarting the Master
process on the same machine. This constitutes the majority of the code needed
for general fault tolerance, which will additionally need to elect a leader
before recovering the Master state.

To enable fault recovery, the Master persists a small amount of state about
registered Workers and Applications to disk. If the Master starts up and finds
this state still present, it enters Recovery mode, during which it schedules no
new Executors on Workers (though it still accepts registrations from new
Clients and Workers). The sketch below shows how this is switched on.
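
A condensed sketch (not verbatim from the patch) of the persistence-engine
choice made in Master.preStart() in the diff below. Recovery is enabled by
setting the spark.deploy.recoveryDirectory system property on the Master's
JVM, e.g. -Dspark.deploy.recoveryDirectory=/var/spark/recovery (example path):

    // Condensed from Master.preStart() below; all identifiers are from this
    // commit. Runs inside the Master actor, hence context.system.
    val recoveryDir = System.getProperty("spark.deploy.recoveryDirectory", "")
    val persistenceEngine: PersistenceEngine =
      if (recoveryDir.isEmpty) {
        new BlackHolePersistenceEngine()  // recovery disabled: nothing is persisted
      } else {
        new FileSystemPersistenceEngine(recoveryDir, SerializationExtension(context.system))
      }
    // Any stored apps/workers mean a previous Master died on this machine:
    // start in RECOVERING rather than ALIVE and kick off the recovery protocol.
    val (storedApps, storedWorkers) = persistenceEngine.readPersistedData()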

Upon entering Recovery mode, the Master attempts to reconnect to all Workers
and Client applications that were registered at the time of the failure. After
confirming either the existence or nonexistence of every such node (or after a
timeout), the Master exits Recovery mode and resumes normal scheduling; the
handshake is outlined below.
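
A rough outline of the recovery handshake (message names come from the
DeployMessages changes below; the flow is condensed, not verbatim):

    // Master (RECOVERING)                       Worker / Client
    // -------------------                       ---------------
    // MasterChanged(masterUrl, webUiUrl)  --->  update master ref, re-watch it
    // <--- WorkerSchedulerStateResponse(workerId, executors)   (each Worker)
    // <--- MasterChangeAcknowledged(appId)                     (each Client)
    //
    // Nodes that answer move from UNKNOWN back to ALIVE/WAITING. Recovery ends
    // once nothing is left UNKNOWN, or when the WORKER_TIMEOUT-scheduled
    // EndRecoveryProcess message fires; completeRecovery() then drops the
    // unresponsive nodes and resumes scheduling. The check, as in Master below:
    def canCompleteRecovery =
      workers.count(_.state == WorkerState.UNKNOWN) == 0 &&
      apps.count(_.state == ApplicationState.UNKNOWN) == 0
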
commit d5a96feccb
parent 13eced723f
Author: Aaron Davidson
Date:   2013-09-17 09:40:06 -07:00
16 changed files with 457 additions and 74 deletions

@@ -17,7 +17,7 @@
# limitations under the License.
#
# Starts the master on the machine this script is executed on.
# Starts workers on the machine this script is executed on.
bin=`dirname "$0"`
bin=`cd "$bin"; pwd`

@@ -52,6 +52,8 @@ private[deploy] object DeployMessages {
exitStatus: Option[Int])
extends DeployMessage
case class WorkerSchedulerStateResponse(id: String, executors: List[ExecutorDescription])
case class Heartbeat(workerId: String) extends DeployMessage
// Master to Worker
@@ -76,6 +78,8 @@ private[deploy] object DeployMessages {
case class RegisterApplication(appDescription: ApplicationDescription)
extends DeployMessage
case class MasterChangeAcknowledged(appId: String)
// Master to Client
case class RegisteredApplication(appId: String) extends DeployMessage
@@ -94,6 +98,10 @@ private[deploy] object DeployMessages {
case object StopClient
// Master to Worker & Client
case class MasterChanged(masterUrl: String, masterWebUiUrl: String)
// MasterWebUI To Master
case object RequestMasterState
@@ -127,6 +135,10 @@ private[deploy] object DeployMessages {
case object CheckForWorkerTimeOut
case class BeginRecovery(storedApps: Seq[ApplicationInfo], storedWorkers: Seq[WorkerInfo])
case object EndRecoveryProcess
case object RequestWebUIPort
case class WebUIPortResponse(webUIBoundPort: Int)

@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy
private[spark] class ExecutorDescription(
val appId: String,
val execId: Int,
val cores: Int,
val state: ExecutorState.Value)
extends Serializable {
override def toString: String =
"ExecutorState(appId=%s, execId=%d, cores=%d, state=%s)".format(appId, execId, cores, state)
}
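
For context (condensed excerpt, not new code): a Worker reports one such
description per running executor when a recovering Master announces itself,
which lets the Master rebuild its ExecutorInfo entries via copyState(). From
the Worker change further below:

    // Worker's MasterChanged handler (condensed from the diff further below):
    val execs = executors.values
      .map(e => new ExecutorDescription(e.appId, e.execId, e.cores, e.state))
    sender ! WorkerSchedulerStateResponse(workerId, execs.toList)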

@@ -92,20 +92,25 @@ private[spark] class Client(
listener.executorRemoved(fullId, message.getOrElse(""), exitStatus)
}
case MasterChanged(masterUrl, masterWebUiUrl) =>
logInfo("Master has changed, new master is at " + masterUrl)
context.unwatch(master)
master = context.actorFor(Master.toAkkaUrl(masterUrl))
masterAddress = master.path.address
sender ! MasterChangeAcknowledged(appId)
context.watch(master)
case Terminated(actor_) if actor_ == master =>
logError("Connection to master failed; stopping client")
logError("Connection to master failed; waiting for master to reconnect...")
markDisconnected()
context.stop(self)
case RemoteClientDisconnected(transport, address) if address == masterAddress =>
logError("Connection to master failed; stopping client")
logError("Connection to master failed; waiting for master to reconnect...")
markDisconnected()
context.stop(self)
case RemoteClientShutdown(transport, address) if address == masterAddress =>
logError("Connection to master failed; stopping client")
logError("Connection to master failed; waiting for master to reconnect...")
markDisconnected()
context.stop(self)
case StopClient =>
markDisconnected()

@@ -23,29 +23,52 @@ import akka.actor.ActorRef
import scala.collection.mutable
private[spark] class ApplicationInfo(
val startTime: Long,
val id: String,
val desc: ApplicationDescription,
val submitDate: Date,
val driver: ActorRef,
val appUiUrl: String)
{
var state = ApplicationState.WAITING
var executors = new mutable.HashMap[Int, ExecutorInfo]
var coresGranted = 0
var endTime = -1L
val appSource = new ApplicationSource(this)
val startTime: Long,
val id: String,
val desc: ApplicationDescription,
val submitDate: Date,
val driver: ActorRef,
val appUiUrl: String)
extends Serializable {
private var nextExecutorId = 0
@transient var state: ApplicationState.Value = _
@transient var executors: mutable.HashMap[Int, ExecutorInfo] = _
@transient var coresGranted: Int = _
@transient var endTime: Long = _
@transient var appSource: ApplicationSource = _
def newExecutorId(): Int = {
val id = nextExecutorId
nextExecutorId += 1
id
@transient private var nextExecutorId: Int = _
init()
private def readObject(in: java.io.ObjectInputStream) : Unit = {
in.defaultReadObject()
init()
}
def addExecutor(worker: WorkerInfo, cores: Int): ExecutorInfo = {
val exec = new ExecutorInfo(newExecutorId(), this, worker, cores, desc.memoryPerSlave)
private def init() {
state = ApplicationState.WAITING
executors = new mutable.HashMap[Int, ExecutorInfo]
coresGranted = 0
endTime = -1L
appSource = new ApplicationSource(this)
nextExecutorId = 0
}
private def newExecutorId(useID: Option[Int] = None): Int = {
useID match {
case Some(id) =>
nextExecutorId = math.max(nextExecutorId, id + 1)
id
case None =>
val id = nextExecutorId
nextExecutorId += 1
id
}
}
def addExecutor(worker: WorkerInfo, cores: Int, useID: Option[Int] = None): ExecutorInfo = {
val exec = new ExecutorInfo(newExecutorId(useID), this, worker, cores, desc.memoryPerSlave)
executors(exec.id) = exec
coresGranted += cores
exec

@@ -18,11 +18,11 @@
package org.apache.spark.deploy.master
private[spark] object ApplicationState
extends Enumeration("WAITING", "RUNNING", "FINISHED", "FAILED") {
extends Enumeration("WAITING", "RUNNING", "FINISHED", "FAILED", "UNKNOWN") {
type ApplicationState = Value
val WAITING, RUNNING, FINISHED, FAILED = Value
val WAITING, RUNNING, FINISHED, FAILED, UNKNOWN = Value
val MAX_NUM_RETRY = 10
}

@@ -17,7 +17,7 @@
package org.apache.spark.deploy.master
import org.apache.spark.deploy.ExecutorState
import org.apache.spark.deploy.{ExecutorDescription, ExecutorState}
private[spark] class ExecutorInfo(
val id: Int,
@@ -28,5 +28,10 @@ private[spark] class ExecutorInfo(
var state = ExecutorState.LAUNCHING
/** Copy all state variables from the given on-the-wire ExecutorDescription. */
def copyState(execDesc: ExecutorDescription) {
state = execDesc.state
}
def fullId: String = application.id + "/" + id
}

@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.master
import java.io._
import scala.Serializable
import akka.serialization.Serialization
import org.apache.spark.Logging
/**
* Stores data in a single on-disk directory with one file per application and worker.
* Files are deleted when applications and workers are removed.
*
* @param dir Directory to store files. Created if non-existent (but not recursively).
* @param serialization Used to serialize our objects.
*/
private[spark] class FileSystemPersistenceEngine(
val dir: String,
val serialization: Serialization)
extends PersistenceEngine with Logging {
new File(dir).mkdir()
override def addApplication(app: ApplicationInfo) {
val appFile = new File(dir + File.separator + "app_" + app.id)
serializeIntoFile(appFile, app)
}
override def removeApplication(app: ApplicationInfo) {
new File(dir + File.separator + "app_" + app.id).delete()
}
override def addWorker(worker: WorkerInfo) {
val workerFile = new File(dir + File.separator + "worker_" + worker.id)
serializeIntoFile(workerFile, worker)
}
override def removeWorker(worker: WorkerInfo) {
new File(dir + File.separator + "worker_" + worker.id).delete()
}
override def readPersistedData(): (Seq[ApplicationInfo], Seq[WorkerInfo]) = {
val sortedFiles = new File(dir).listFiles().sortBy(_.getName())
val appFiles = sortedFiles.filter(_.getName().startsWith("app_"))
val apps = appFiles.map(deserializeFromFile[ApplicationInfo](_))
val workerFiles = sortedFiles.filter(_.getName().startsWith("worker_"))
val workers = workerFiles.map(deserializeFromFile[WorkerInfo](_))
(apps, workers)
}
private def serializeIntoFile(file: File, value: Serializable) {
val created = file.createNewFile()
if (!created) { throw new IllegalStateException("Could not create file: " + file) }
val serializer = serialization.findSerializerFor(value)
val serialized = serializer.toBinary(value)
val out = new FileOutputStream(file)
out.write(serialized)
out.close()
}
def deserializeFromFile[T <: Serializable](file: File)(implicit m: Manifest[T]): T = {
val fileData = new Array[Byte](file.length().asInstanceOf[Int])
val dis = new DataInputStream(new FileInputStream(file))
dis.readFully(fileData)
dis.close()
val clazz = m.erasure.asInstanceOf[Class[T]]
val serializer = serialization.serializerFor(clazz)
serializer.fromBinary(fileData).asInstanceOf[T]
}
}
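
A hypothetical usage sketch (not part of the commit) of the engine above;
exampleWorker is a stand-in for a real WorkerInfo, and an ActorSystem is
assumed to supply the Akka serialization:

    import akka.actor.ActorSystem
    import akka.serialization.SerializationExtension

    val system = ActorSystem("persistence-sketch")
    val engine = new FileSystemPersistenceEngine("/tmp/spark-recovery",
      SerializationExtension(system))
    engine.addWorker(exampleWorker)   // hypothetical WorkerInfo; writes worker_<id>
    val (apps, workers) = engine.readPersistedData()  // replayed on Master restart
    engine.removeWorker(exampleWorker)  // deletes the file once the worker is gone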

@@ -27,16 +27,17 @@ import akka.actor.Terminated
import akka.dispatch.Await
import akka.pattern.ask
import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientDisconnected, RemoteClientShutdown}
import akka.serialization.SerializationExtension
import akka.util.duration._
import akka.util.Timeout
import akka.util.{Duration, Timeout}
import org.apache.spark.{Logging, SparkException}
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.MasterState.MasterState
import org.apache.spark.deploy.master.ui.MasterWebUI
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.util.{Utils, AkkaUtils}
import org.apache.spark.util.{AkkaUtils, Utils}
private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
@@ -44,7 +45,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
val WORKER_TIMEOUT = System.getProperty("spark.worker.timeout", "60").toLong * 1000
val RETAINED_APPLICATIONS = System.getProperty("spark.deploy.retainedApplications", "200").toInt
val REAPER_ITERATIONS = System.getProperty("spark.dead.worker.persistence", "15").toInt
val RECOVERY_DIR = System.getProperty("spark.deploy.recoveryDirectory", "")
var nextAppNumber = 0
val workers = new HashSet[WorkerInfo]
val idToWorker = new HashMap[String, WorkerInfo]
@@ -74,6 +76,10 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
if (envVar != null) envVar else host
}
var state: MasterState = _
var persistenceEngine: PersistenceEngine = _
// As a temporary workaround before better ways of configuring memory, we allow users to set
// a flag that will perform round-robin scheduling across the nodes (spreading out each app
// among all the nodes) instead of trying to consolidate each app onto a small # of nodes.
@@ -89,6 +95,23 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
masterMetricsSystem.registerSource(masterSource)
masterMetricsSystem.start()
applicationMetricsSystem.start()
persistenceEngine =
if (RECOVERY_DIR.isEmpty()) {
new BlackHolePersistenceEngine()
} else {
logInfo("Persisting recovery state to directory: " + RECOVERY_DIR)
new FileSystemPersistenceEngine(RECOVERY_DIR, SerializationExtension(context.system))
}
val (storedApps, storedWorkers) = persistenceEngine.readPersistedData()
state =
if (storedApps.isEmpty && storedWorkers.isEmpty) {
MasterState.ALIVE
} else {
self ! BeginRecovery(storedApps, storedWorkers)
MasterState.RECOVERING
}
}
override def postStop() {
@@ -98,14 +121,16 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
}
override def receive = {
case RegisterWorker(id, host, workerPort, cores, memory, worker_webUiPort, publicAddress) => {
case RegisterWorker(id, host, workerPort, cores, memory, webUiPort, publicAddress) => {
logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
host, workerPort, cores, Utils.megabytesToString(memory)))
if (idToWorker.contains(id)) {
sender ! RegisterWorkerFailed("Duplicate worker ID")
} else {
addWorker(id, host, workerPort, cores, memory, worker_webUiPort, publicAddress)
val worker = new WorkerInfo(id, host, workerPort, cores, memory, sender, webUiPort, publicAddress)
registerWorker(worker)
context.watch(sender) // This doesn't work with remote actors but helps for testing
persistenceEngine.addWorker(worker)
sender ! RegisteredWorker("http://" + masterPublicAddress + ":" + webUi.boundPort.get)
schedule()
}
@@ -113,10 +138,11 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
case RegisterApplication(description) => {
logInfo("Registering app " + description.name)
val app = addApplication(description, sender)
val app = createApplication(description, sender)
registerApplication(app)
logInfo("Registered app " + description.name + " with ID " + app.id)
waitingApps += app
context.watch(sender) // This doesn't work with remote actors but helps for testing
persistenceEngine.addApplication(app)
sender ! RegisteredApplication(app.id)
schedule()
}
@@ -158,23 +184,78 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
}
}
case BeginRecovery(storedApps, storedWorkers) => {
context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis, self, EndRecoveryProcess)
val masterUrl = "spark://" + host + ":" + port
val masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort.get
for (app <- storedApps) {
registerApplication(app)
app.state = ApplicationState.UNKNOWN
app.driver ! MasterChanged(masterUrl, masterWebUiUrl)
}
for (worker <- storedWorkers) {
registerWorker(worker)
worker.state = WorkerState.UNKNOWN
worker.actor ! MasterChanged(masterUrl, masterWebUiUrl)
}
}
case MasterChangeAcknowledged(appId) => {
val appOption = idToApp.get(appId)
appOption match {
case Some(app) =>
app.state = ApplicationState.WAITING
case None =>
logWarning("Master change ack from unknown app: " + appId)
}
if (canCompleteRecovery) { completeRecovery() }
}
case WorkerSchedulerStateResponse(workerId, executors) => {
idToWorker.get(workerId) match {
case Some(worker) =>
worker.state = WorkerState.ALIVE
val validExecutors = executors.filter(exec => idToApp.get(exec.appId) != None)
for (exec <- validExecutors) {
val app = idToApp.get(exec.appId).get
val execInfo = app.addExecutor(worker, exec.cores, Some(exec.execId))
worker.addExecutor(execInfo)
execInfo.copyState(exec)
}
case None =>
logWarning("Scheduler state from unknown worker: " + workerId)
}
if (canCompleteRecovery) { completeRecovery() }
}
case EndRecoveryProcess => {
completeRecovery()
}
case Terminated(actor) => {
// The disconnected actor could've been either a worker or an app; remove whichever of
// those we have an entry for in the corresponding actor hashmap
actorToWorker.get(actor).foreach(removeWorker)
actorToApp.get(actor).foreach(finishApplication)
if (state == MasterState.RECOVERING && canCompleteRecovery) { completeRecovery() }
}
case RemoteClientDisconnected(transport, address) => {
// The disconnected client could've been either a worker or an app; remove whichever it was
addressToWorker.get(address).foreach(removeWorker)
addressToApp.get(address).foreach(finishApplication)
if (state == MasterState.RECOVERING && canCompleteRecovery) { completeRecovery() }
}
case RemoteClientShutdown(transport, address) => {
// The disconnected client could've been either a worker or an app; remove whichever it was
addressToWorker.get(address).foreach(removeWorker)
addressToApp.get(address).foreach(finishApplication)
if (state == MasterState.RECOVERING && canCompleteRecovery) { completeRecovery() }
}
case RequestMasterState => {
@@ -190,6 +271,25 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
}
}
def canCompleteRecovery =
workers.count(_.state == WorkerState.UNKNOWN) == 0 &&
apps.count(_.state == ApplicationState.UNKNOWN) == 0
def completeRecovery() {
synchronized {
if (state != MasterState.RECOVERING) { return }
state = MasterState.COMPLETING_RECOVERY
}
// Kill off any workers and apps that didn't respond to us.
workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker(_))
apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication(_))
state = MasterState.ALIVE
schedule()
logInfo("Recovery complete - resuming operations!")
}
/**
* Can an app use the given worker? True if the worker has enough memory and we haven't already
* launched an executor for the app on it (right now the standalone backend doesn't like having
@@ -204,6 +304,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
* every time a new app joins or resource availability changes.
*/
def schedule() {
if (state != MasterState.ALIVE) { return }
// Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
// in the queue, then the second app, etc.
if (spreadOutApps) {
@@ -257,8 +358,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
}
def addWorker(id: String, host: String, port: Int, cores: Int, memory: Int, webUiPort: Int,
publicAddress: String): WorkerInfo = {
def registerWorker(worker: WorkerInfo): Unit = {
// There may be one or more refs to dead workers on this same node (w/ different ID's),
// remove them.
workers.filter { w =>
@@ -266,12 +366,17 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
}.foreach { w =>
workers -= w
}
val worker = new WorkerInfo(id, host, port, cores, memory, sender, webUiPort, publicAddress)
val workerAddress = worker.actor.path.address
if (addressToWorker.contains(workerAddress)) {
logInfo("Attempted to re-register worker at same address: " + workerAddress)
return
}
workers += worker
idToWorker(worker.id) = worker
actorToWorker(sender) = worker
addressToWorker(sender.path.address) = worker
worker
addressToWorker(workerAddress) = worker
}
def removeWorker(worker: WorkerInfo) {
@@ -286,25 +391,35 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
exec.id, ExecutorState.LOST, Some("worker lost"), None)
exec.application.removeExecutor(exec)
}
persistenceEngine.removeWorker(worker)
}
def addApplication(desc: ApplicationDescription, driver: ActorRef): ApplicationInfo = {
def createApplication(desc: ApplicationDescription, driver: ActorRef): ApplicationInfo = {
val now = System.currentTimeMillis()
val date = new Date(now)
val app = new ApplicationInfo(now, newApplicationId(date), desc, date, driver, desc.appUiUrl)
new ApplicationInfo(now, newApplicationId(date), desc, date, driver, desc.appUiUrl)
}
def registerApplication(app: ApplicationInfo): Unit = {
val appAddress = app.driver.path.address
if (addressToApp.contains(appAddress)) {
logInfo("Attempted to re-register application at same address: " + appAddress)
return
}
applicationMetricsSystem.registerSource(app.appSource)
apps += app
idToApp(app.id) = app
actorToApp(driver) = app
addressToApp(driver.path.address) = app
actorToApp(app.driver) = app
addressToApp(appAddress) = app
if (firstApp == None) {
firstApp = Some(app)
}
val workersAlive = workers.filter(_.state == WorkerState.ALIVE).toArray
if (workersAlive.size > 0 && !workersAlive.exists(_.memoryFree >= desc.memoryPerSlave)) {
if (workersAlive.size > 0 && !workersAlive.exists(_.memoryFree >= app.desc.memoryPerSlave)) {
logWarning("Could not find any workers with enough memory for " + firstApp.get.id)
}
app
waitingApps += app
}
def finishApplication(app: ApplicationInfo) {
@@ -336,6 +451,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
if (state != ApplicationState.FINISHED) {
app.driver ! ApplicationRemoved(state.toString)
}
persistenceEngine.removeApplication(app)
schedule()
}
}

@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.master
private[spark] object MasterState
extends Enumeration("ALIVE", "RECOVERING", "COMPLETING_RECOVERY") {
type MasterState = Value
val ALIVE, RECOVERING, COMPLETING_RECOVERY = Value
}

@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.master
/**
* Allows Master to persist any state that is necessary in order to recover from a failure.
* The following semantics are required:
* - addApplication and addWorker are called before completing registration of a new app/worker.
* - removeApplication and removeWorker are called at any time.
* Given these two requirements, we will have all apps and workers persisted, but
* we might not have yet deleted apps or workers that finished.
*/
trait PersistenceEngine {
def addApplication(app: ApplicationInfo)
def removeApplication(app: ApplicationInfo)
def addWorker(worker: WorkerInfo)
def removeWorker(worker: WorkerInfo)
/**
* Returns the persisted data sorted by their respective ids (which implies that they're
* sorted by time order of creation).
*/
def readPersistedData(): (Seq[ApplicationInfo], Seq[WorkerInfo])
}
class BlackHolePersistenceEngine extends PersistenceEngine {
override def addApplication(app: ApplicationInfo) {}
override def removeApplication(app: ApplicationInfo) {}
override def addWorker(worker: WorkerInfo) {}
override def removeWorker(worker: WorkerInfo) {}
override def readPersistedData() = (Nil, Nil)
}
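
Since the trait keeps the storage backend pluggable, other engines can be
substituted; purely as an illustration (not in this commit), an in-memory
engine suitable for tests might look like:

    import scala.collection.mutable

    // Illustrative only: insertion order loosely preserves the "sorted by id,
    // hence by creation time" contract of readPersistedData().
    class InMemoryPersistenceEngine extends PersistenceEngine {
      private val apps = new mutable.LinkedHashMap[String, ApplicationInfo]
      private val workers = new mutable.LinkedHashMap[String, WorkerInfo]
      override def addApplication(app: ApplicationInfo) { apps(app.id) = app }
      override def removeApplication(app: ApplicationInfo) { apps -= app.id }
      override def addWorker(worker: WorkerInfo) { workers(worker.id) = worker }
      override def removeWorker(worker: WorkerInfo) { workers -= worker.id }
      override def readPersistedData() = (apps.values.toSeq, workers.values.toSeq)
    }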

@@ -29,21 +29,37 @@ private[spark] class WorkerInfo(
val memory: Int,
val actor: ActorRef,
val webUiPort: Int,
val publicAddress: String) {
val publicAddress: String)
extends Serializable {
Utils.checkHost(host, "Expected hostname")
assert (port > 0)
var executors = new mutable.HashMap[String, ExecutorInfo] // fullId => info
var state: WorkerState.Value = WorkerState.ALIVE
var coresUsed = 0
var memoryUsed = 0
@transient var executors: mutable.HashMap[String, ExecutorInfo] = _ // fullId => info
@transient var state: WorkerState.Value = _
@transient var coresUsed: Int = _
@transient var memoryUsed: Int = _
var lastHeartbeat = System.currentTimeMillis()
@transient var lastHeartbeat: Long = _
init()
def coresFree: Int = cores - coresUsed
def memoryFree: Int = memory - memoryUsed
private def readObject(in: java.io.ObjectInputStream) : Unit = {
in.defaultReadObject()
init()
}
private def init() {
executors = new mutable.HashMap[String, ExecutorInfo]
state = WorkerState.ALIVE
coresUsed = 0
memoryUsed = 0
lastHeartbeat = System.currentTimeMillis()
}
def hostPort: String = {
assert (port > 0)
host + ":" + port

@@ -17,8 +17,10 @@
package org.apache.spark.deploy.master
private[spark] object WorkerState extends Enumeration("ALIVE", "DEAD", "DECOMMISSIONED") {
private[spark] object WorkerState
extends Enumeration("ALIVE", "DEAD", "DECOMMISSIONED", "UNKNOWN") {
type WorkerState = Value
val ALIVE, DEAD, DECOMMISSIONED = Value
val ALIVE, DEAD, DECOMMISSIONED, UNKNOWN = Value
}

@@ -43,7 +43,8 @@ private[spark] class ExecutorRunner(
val workerId: String,
val host: String,
val sparkHome: File,
val workDir: File)
val workDir: File,
var state: ExecutorState.Value)
extends Logging {
val fullId = appId + "/" + execId
@@ -83,7 +84,8 @@ private[spark] class ExecutorRunner(
process.destroy()
process.waitFor()
}
worker ! ExecutorStateChanged(appId, execId, ExecutorState.KILLED, None, None)
state = ExecutorState.KILLED
worker ! ExecutorStateChanged(appId, execId, state, None, None)
Runtime.getRuntime.removeShutdownHook(shutdownHook)
}
}
@@ -180,9 +182,9 @@ private[spark] class ExecutorRunner(
// long-lived processes only. However, in the future, we might restart the executor a few
// times on the same machine.
val exitCode = process.waitFor()
state = ExecutorState.FAILED
val message = "Command exited with code " + exitCode
worker ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, Some(message),
Some(exitCode))
worker ! ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode))
} catch {
case interrupted: InterruptedException =>
logInfo("Runner thread for executor " + fullId + " interrupted")
@@ -192,8 +194,9 @@
if (process != null) {
process.destroy()
}
state = ExecutorState.FAILED
val message = e.getClass + ": " + e.getMessage
worker ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, Some(message), None)
worker ! ExecutorStateChanged(appId, execId, state, Some(message), None)
}
}
}

@@ -27,8 +27,8 @@ import akka.actor.{ActorRef, Props, Actor, ActorSystem, Terminated}
import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected}
import akka.util.duration._
import org.apache.spark.{Logging}
import org.apache.spark.deploy.ExecutorState
import org.apache.spark.{SparkEnv, Logging}
import org.apache.spark.deploy.{ExecutorDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.Master
import org.apache.spark.deploy.worker.ui.WorkerWebUI
@@ -42,7 +42,7 @@ private[spark] class Worker(
webUiPort: Int,
cores: Int,
memory: Int,
masterUrl: String,
var masterUrl: String,
workDirPath: String = null)
extends Actor with Logging {
@@ -125,19 +125,30 @@ private[spark] class Worker(
master ! Heartbeat(workerId)
}
case MasterChanged(url, uiUrl) =>
logInfo("Master has changed, new master is at " + url)
masterUrl = url
masterWebUiUrl = uiUrl
context.unwatch(master)
master = context.actorFor(Master.toAkkaUrl(masterUrl))
context.watch(master) // Doesn't work with remote actors, but useful for testing
val execs = executors.values.
map(e => new ExecutorDescription(e.appId, e.execId, e.cores, e.state))
sender ! WorkerSchedulerStateResponse(workerId, execs.toList)
case RegisterWorkerFailed(message) =>
logError("Worker registration failed: " + message)
System.exit(1)
case LaunchExecutor(appId, execId, appDesc, cores_, memory_, execSparkHome_) =>
logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
val manager = new ExecutorRunner(
appId, execId, appDesc, cores_, memory_, self, workerId, host, new File(execSparkHome_), workDir)
val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
self, workerId, host, new File(execSparkHome_), workDir, ExecutorState.RUNNING)
executors(appId + "/" + execId) = manager
manager.start()
coresUsed += cores_
memoryUsed += memory_
master ! ExecutorStateChanged(appId, execId, ExecutorState.RUNNING, None, None)
master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
master ! ExecutorStateChanged(appId, execId, state, message, exitStatus)
@@ -174,11 +185,7 @@
}
def masterDisconnected() {
// TODO: It would be nice to try to reconnect to the master, but just shut down for now.
// (Note that if reconnecting we would also need to assign IDs differently.)
logError("Connection to master failed! Shutting down.")
executors.values.foreach(_.kill())
System.exit(1)
logError("Connection to master failed! Waiting for master to reconnect...")
}
def generateWorkerId(): String = {

@@ -71,8 +71,7 @@ private[spark] class SparkDeploySchedulerBackend(
override def disconnected() {
if (!stopping) {
logError("Disconnected from Spark cluster!")
scheduler.error("Disconnected from Spark cluster")
logError("Disconnected from Spark cluster! Waiting for reconnection...")
}
}