diff --git a/core/src/main/scala/spark/deploy/DeployMessage.scala b/core/src/main/scala/spark/deploy/DeployMessage.scala
index 3cbf4fdd98..8a3e64e4c2 100644
--- a/core/src/main/scala/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/spark/deploy/DeployMessage.scala
@@ -65,7 +65,7 @@ case class ExecutorUpdated(id: Int, state: ExecutorState, message: Option[String
     exitStatus: Option[Int])
 
 private[spark]
-case class appKilled(message: String)
+case class ApplicationRemoved(message: String)
 
 // Internal message in Client
 
diff --git a/core/src/main/scala/spark/deploy/client/Client.scala b/core/src/main/scala/spark/deploy/client/Client.scala
index 1a95524cf9..2fc5e657f9 100644
--- a/core/src/main/scala/spark/deploy/client/Client.scala
+++ b/core/src/main/scala/spark/deploy/client/Client.scala
@@ -54,6 +54,11 @@ private[spark] class Client(
         appId = appId_
         listener.connected(appId)
 
+      case ApplicationRemoved(message) =>
+        logError("Master removed our application: %s; stopping client".format(message))
+        markDisconnected()
+        context.stop(self)
+
       case ExecutorAdded(id: Int, workerId: String, host: String, cores: Int, memory: Int) =>
         val fullId = appId + "/" + id
         logInfo("Executor added: %s on %s (%s) with %d cores".format(fullId, workerId, host, cores))
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index 4af22cf9b6..71b9d0801d 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -107,7 +107,7 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
             } else {
               logError("Application %s with ID %s failed %d times, removing it".format(
                 appInfo.desc.name, appInfo.id, appInfo.retryCount))
-              removeApplication(appInfo)
+              removeApplication(appInfo, ApplicationState.FAILED)
             }
           }
         }
@@ -129,19 +129,19 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
       // The disconnected actor could've been either a worker or an app; remove whichever of
       // those we have an entry for in the corresponding actor hashmap
       actorToWorker.get(actor).foreach(removeWorker)
-      actorToApp.get(actor).foreach(removeApplication)
+      actorToApp.get(actor).foreach(finishApplication)
     }
 
     case RemoteClientDisconnected(transport, address) => {
       // The disconnected client could've been either a worker or an app; remove whichever it was
      addressToWorker.get(address).foreach(removeWorker)
-      addressToApp.get(address).foreach(removeApplication)
+      addressToApp.get(address).foreach(finishApplication)
    }
 
    case RemoteClientShutdown(transport, address) => {
      // The disconnected client could've been either a worker or an app; remove whichever it was
      addressToWorker.get(address).foreach(removeWorker)
-      addressToApp.get(address).foreach(removeApplication)
+      addressToApp.get(address).foreach(finishApplication)
    }
 
    case RequestMasterState => {
@@ -257,7 +257,11 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
     return app
   }
 
-  def removeApplication(app: ApplicationInfo) {
+  def finishApplication(app: ApplicationInfo) {
+    removeApplication(app, ApplicationState.FINISHED)
+  }
+
+  def removeApplication(app: ApplicationInfo, state: ApplicationState.Value) {
     if (apps.contains(app)) {
       logInfo("Removing app " + app.id)
       apps -= app
@@ -270,7 +274,8 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
         exec.worker.removeExecutor(exec)
         exec.worker.actor ! KillExecutor(exec.application.id, exec.id)
       }
-      app.markFinished(ApplicationState.FINISHED)  // TODO: Mark it as FAILED if it failed
+      app.markFinished(state)
+      app.driver ! ApplicationRemoved(state.toString)
       schedule()
     }
   }
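Below is a minimal, self-contained sketch (not Spark code) of the notification pattern this patch introduces: the master tells the driver-side client that its application was removed, and the client logs the reason and stops itself. It uses plain Akka classic actors; the DemoClient and Demo names, and the println standing in for logError, are illustrative assumptions, and it assumes a recent akka-actor artifact on the classpath.

import akka.actor.{Actor, ActorSystem, Props}

// Mirrors the renamed deploy message: the string carries the final state
// (e.g. "FAILED" or "FINISHED"), just as the Master sends state.toString.
case class ApplicationRemoved(message: String)

// Hypothetical stand-in for the driver-side Client actor.
class DemoClient extends Actor {
  def receive = {
    case ApplicationRemoved(message) =>
      // The real Client also calls markDisconnected() so its listener can react.
      println("Master removed our application: %s; stopping client".format(message))
      context.stop(self)
  }
}

object Demo extends App {
  val system = ActorSystem("demo")
  val client = system.actorOf(Props[DemoClient](), "client")
  // Stands in for `app.driver ! ApplicationRemoved(state.toString)` in the Master.
  client ! ApplicationRemoved("FAILED")
  Thread.sleep(500)  // give the message time to be processed before shutdown
  system.terminate()
}

With the patch applied, both removal paths in the Master funnel through removeApplication(app, state): an app that exhausts its executor retries is marked FAILED, while an app whose client disconnects is marked FINISHED, and in either case the driver is told which state it ended in.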