Minor fixes

This commit is contained in:
Patrick Wendell 2013-12-26 14:14:49 -08:00
parent c23d640516
commit 5c1b4f6405
3 changed files with 25 additions and 20 deletions

View file

@ -90,7 +90,6 @@ object DriverClient extends Logging {
case e: TimeoutException => (false, s"Master $master failed to respond in time")
}
if (success) logInfo(message) else logError(message)
actorSystem.stop(driver)
actorSystem.shutdown()
actorSystem.awaitTermination()
}

View file

@ -37,6 +37,7 @@ import org.apache.spark.deploy.master.MasterMessages._
import org.apache.spark.deploy.master.ui.MasterWebUI
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.util.{AkkaUtils, Utils}
import org.apache.spark.deploy.master.DriverState.DriverState
private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
import context.dispatcher
@ -268,21 +269,11 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
}
case DriverStateChanged(driverId, state, exception) => {
if (!(state == DriverState.FAILED || state == DriverState.FINISHED ||
state == DriverState.KILLED)) {
throw new Exception(s"Received unexpected state update for driver $driverId: $state")
}
drivers.find(_.id == driverId) match {
case Some(driver) => {
drivers -= driver
completedDrivers += driver
persistenceEngine.removeDriver(driver)
driver.state = state
driver.exception = exception
driver.worker.foreach(w => w.removeDriver(driver))
}
case None =>
logWarning(s"Got driver update for unknown driver $driverId")
state match {
case DriverState.FAILED | DriverState.FINISHED | DriverState.KILLED =>
removeDriver(driverId, state, exception)
case _ =>
throw new Exception(s"Received unexpected state update for driver $driverId: $state")
}
}
@ -638,6 +629,21 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
worker.actor ! LaunchDriver(driver.id, driver.desc)
driver.state = DriverState.RUNNING
}
def removeDriver(driverId: String, finalState: DriverState, exception: Option[Exception]) {
drivers.find(d => d.id == driverId) match {
case Some(driver) =>
logInfo(s"Removing driver: $driverId")
drivers -= driver
completedDrivers += driver
persistenceEngine.removeDriver(driver)
driver.state = finalState
driver.exception = exception
driver.worker.foreach(w => w.removeDriver(driver))
case None =>
logWarning(s"Asked to remove unknown driver: $driverId")
}
}
}
private[spark] object Master {

View file

@ -53,10 +53,10 @@ private[spark] class IndexPage(parent: WorkerWebUI) {
UIUtils.listingTable(executorHeaders, executorRow, workerState.finishedExecutors)
val driverHeaders = Seq("DriverID", "Main Class", "Cores", "Memory", "Logs")
val runningDriverTable =
UIUtils.listingTable(driverHeaders, driverRow, workerState.drivers)
def finishedDriverTable =
UIUtils.listingTable(driverHeaders, driverRow, workerState.finishedDrivers)
val runningDrivers = workerState.drivers.sortBy(_.driverId).reverse
val runningDriverTable = UIUtils.listingTable(driverHeaders, driverRow, runningDrivers)
val finishedDrivers = workerState.finishedDrivers.sortBy(_.driverId).reverse
def finishedDriverTable = UIUtils.listingTable(driverHeaders, driverRow, finishedDrivers)
val content =
<div class="row-fluid"> <!-- Worker Details -->