From 5c1b4f64052e8fae0d942def4d6085a971faee4e Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Thu, 26 Dec 2013 14:14:49 -0800 Subject: [PATCH] Minor fixes --- .../spark/deploy/client/DriverClient.scala | 1 - .../apache/spark/deploy/master/Master.scala | 36 +++++++++++-------- .../spark/deploy/worker/ui/IndexPage.scala | 8 ++--- 3 files changed, 25 insertions(+), 20 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala index d2f3c092fb..8f19294849 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala @@ -90,7 +90,6 @@ object DriverClient extends Logging { case e: TimeoutException => (false, s"Master $master failed to respond in time") } if (success) logInfo(message) else logError(message) - actorSystem.stop(driver) actorSystem.shutdown() actorSystem.awaitTermination() } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 7f9ad8a7ef..a0db2a23be 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -37,6 +37,7 @@ import org.apache.spark.deploy.master.MasterMessages._ import org.apache.spark.deploy.master.ui.MasterWebUI import org.apache.spark.metrics.MetricsSystem import org.apache.spark.util.{AkkaUtils, Utils} +import org.apache.spark.deploy.master.DriverState.DriverState private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging { import context.dispatcher @@ -268,21 +269,11 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act } case DriverStateChanged(driverId, state, exception) => { - if (!(state == DriverState.FAILED || state == DriverState.FINISHED || - state == DriverState.KILLED)) { - throw new Exception(s"Received unexpected state update for driver $driverId: $state") - } - drivers.find(_.id == driverId) match { - case Some(driver) => { - drivers -= driver - completedDrivers += driver - persistenceEngine.removeDriver(driver) - driver.state = state - driver.exception = exception - driver.worker.foreach(w => w.removeDriver(driver)) - } - case None => - logWarning(s"Got driver update for unknown driver $driverId") + state match { + case DriverState.FAILED | DriverState.FINISHED | DriverState.KILLED => + removeDriver(driverId, state, exception) + case _ => + throw new Exception(s"Received unexpected state update for driver $driverId: $state") } } @@ -638,6 +629,21 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act worker.actor ! LaunchDriver(driver.id, driver.desc) driver.state = DriverState.RUNNING } + + def removeDriver(driverId: String, finalState: DriverState, exception: Option[Exception]) { + drivers.find(d => d.id == driverId) match { + case Some(driver) => + logInfo(s"Removing driver: $driverId") + drivers -= driver + completedDrivers += driver + persistenceEngine.removeDriver(driver) + driver.state = finalState + driver.exception = exception + driver.worker.foreach(w => w.removeDriver(driver)) + case None => + logWarning(s"Asked to remove unknown driver: $driverId") + } + } } private[spark] object Master { diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala index c8cafac3b6..35a15074db 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala @@ -53,10 +53,10 @@ private[spark] class IndexPage(parent: WorkerWebUI) { UIUtils.listingTable(executorHeaders, executorRow, workerState.finishedExecutors) val driverHeaders = Seq("DriverID", "Main Class", "Cores", "Memory", "Logs") - val runningDriverTable = - UIUtils.listingTable(driverHeaders, driverRow, workerState.drivers) - def finishedDriverTable = - UIUtils.listingTable(driverHeaders, driverRow, workerState.finishedDrivers) + val runningDrivers = workerState.drivers.sortBy(_.driverId).reverse + val runningDriverTable = UIUtils.listingTable(driverHeaders, driverRow, runningDrivers) + val finishedDrivers = workerState.finishedDrivers.sortBy(_.driverId).reverse + def finishedDriverTable = UIUtils.listingTable(driverHeaders, driverRow, finishedDrivers) val content =