Initial cut at driver submission.

Patrick Wendell 2013-12-21 21:08:13 -08:00
parent 1070b566d4
commit 6a4acc4c2d
16 changed files with 781 additions and 53 deletions

@ -20,10 +20,11 @@ package org.apache.spark.deploy
import scala.collection.immutable.List
import org.apache.spark.deploy.ExecutorState.ExecutorState
import org.apache.spark.deploy.master.{WorkerInfo, ApplicationInfo}
import org.apache.spark.deploy.master.{DriverInfo, WorkerInfo, ApplicationInfo}
import org.apache.spark.deploy.master.RecoveryState.MasterState
import org.apache.spark.deploy.worker.ExecutorRunner
import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner}
import org.apache.spark.util.Utils
import org.apache.spark.deploy.master.DriverState.DriverState
private[deploy] sealed trait DeployMessage extends Serializable
@ -54,7 +55,14 @@ private[deploy] object DeployMessages {
exitStatus: Option[Int])
extends DeployMessage
case class WorkerSchedulerStateResponse(id: String, executors: List[ExecutorDescription])
case class DriverStateChanged(
driverId: String,
state: DriverState,
exception: Option[Exception])
extends DeployMessage
case class WorkerSchedulerStateResponse(id: String, executors: List[ExecutorDescription],
driverIds: Seq[String])
case class Heartbeat(workerId: String) extends DeployMessage
@ -76,14 +84,19 @@ private[deploy] object DeployMessages {
sparkHome: String)
extends DeployMessage
// Client to Master
case class LaunchDriver(driverId: String, jarUrl: String, mainClass: String, mem: Int)
extends DeployMessage
case class KillDriver(driverId: String) extends DeployMessage
// AppClient to Master
case class RegisterApplication(appDescription: ApplicationDescription)
extends DeployMessage
case class MasterChangeAcknowledged(appId: String)
// Master to Client
// Master to AppClient
case class RegisteredApplication(appId: String, masterUrl: String) extends DeployMessage
@ -97,11 +110,21 @@ private[deploy] object DeployMessages {
case class ApplicationRemoved(message: String)
// Internal message in Client
// DriverClient <-> Master
case object StopClient
case class RequestSubmitDriver(driverDescription: DriverDescription) extends DeployMessage
// Master to Worker & Client
case class SubmitDriverResponse(success: Boolean, message: String) extends DeployMessage
case class RequestKillDriver(driverId: String) extends DeployMessage
case class KillDriverResponse(success: Boolean, message: String) extends DeployMessage
// Internal message in AppClient
case object StopAppClient
// Master to Worker & AppClient
case class MasterChanged(masterUrl: String, masterWebUiUrl: String)
@ -112,6 +135,7 @@ private[deploy] object DeployMessages {
// Master to MasterWebUI
case class MasterStateResponse(host: String, port: Int, workers: Array[WorkerInfo],
activeDrivers: Array[DriverInfo], completedDrivers: Array[DriverInfo],
activeApps: Array[ApplicationInfo], completedApps: Array[ApplicationInfo],
status: MasterState) {
@ -128,7 +152,8 @@ private[deploy] object DeployMessages {
// Worker to WorkerWebUI
case class WorkerStateResponse(host: String, port: Int, workerId: String,
executors: List[ExecutorRunner], finishedExecutors: List[ExecutorRunner], masterUrl: String,
executors: List[ExecutorRunner], finishedExecutors: List[ExecutorRunner],
drivers: List[DriverRunner], finishedDrivers: List[DriverRunner], masterUrl: String,
cores: Int, memory: Int, coresUsed: Int, memoryUsed: Int, masterWebUiUrl: String) {
Utils.checkHost(host, "Required hostname")

@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy
private[spark] class DriverDescription(
val jarUrl: String,
val mainClass: String,
val mem: Integer) // TODO: Should this be Long?
extends Serializable {
override def toString: String = s"DriverDescription ($mainClass)"
}

@ -147,7 +147,7 @@ private[spark] class AppClient(
logWarning(s"Connection to $address failed; waiting for master to reconnect...")
markDisconnected()
case StopClient =>
case StopAppClient =>
markDead()
sender ! true
context.stop(self)

@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.client
import akka.actor._
import akka.remote.RemotingLifecycleEvent
import org.apache.spark.{SparkException, Logging}
import org.apache.spark.deploy.{DeployMessage, DriverDescription}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.{MasterArguments, Master}
import akka.pattern.ask
import org.apache.spark.util.{Utils, AkkaUtils}
import scala.concurrent.duration.{FiniteDuration, Duration}
import java.util.concurrent.TimeUnit
import akka.util.Timeout
import scala.concurrent.Await
import akka.actor.Actor.emptyBehavior
/**
* Parent class for actors that send a single message to the standalone master and then die.
*/
private[spark] abstract class SingleMessageClient(
actorSystem: ActorSystem, master: String, message: DeployMessage)
extends Logging {
// Concrete child classes must implement
def handleResponse(response: Any)
var actor: ActorRef = actorSystem.actorOf(Props(new DriverActor()))
class DriverActor extends Actor with Logging {
override def preStart() {
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
logInfo("Sending message to master " + master + "...")
val masterActor = context.actorSelection(Master.toAkkaUrl(master))
val timeoutDuration: FiniteDuration = Duration.create(
System.getProperty("spark.akka.askTimeout", "10").toLong, TimeUnit.SECONDS)
val submitFuture = masterActor.ask(message)(timeoutDuration)
handleResponse(Await.result(submitFuture, timeoutDuration))
actorSystem.stop(actor)
actorSystem.shutdown()
}
override def receive = emptyBehavior
}
}
/**
* Submits a driver to the master.
*/
private[spark] class SubmissionClient(actorSystem: ActorSystem, master: String,
driverDescription: DriverDescription)
extends SingleMessageClient(actorSystem, master, RequestSubmitDriver(driverDescription)) {
override def handleResponse(response: Any) {
val resp = response.asInstanceOf[SubmitDriverResponse]
if (!resp.success) {
logError(s"Error submitting driver to $master")
logError(resp.message)
}
}
}
/**
* Kills a driver at the master.
*/
private[spark] class TerminationClient(actorSystem: ActorSystem, master: String, driverId: String)
extends SingleMessageClient(actorSystem, master, RequestKillDriver(driverId)) {
override def handleResponse(response: Any) {
val resp = response.asInstanceOf[KillDriverResponse]
if (!resp.success) {
logError(s"Error terminating $driverId at $master")
logError(resp.message)
}
}
}
/**
* Command-line utility for launching and killing drivers inside the standalone scheduler.
*/
object DriverClient {
def main(args: Array[String]) {
if (args.size < 3) {
println("usage: DriverClient launch <active-master> <jar-url> <main-class>")
println("usage: DriverClient kill <active-master> <driver-id>")
System.exit(-1)
}
val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
"driverSubmission", Utils.localHostName(), 0)
// TODO Should be configurable
val mem = 512
args(0) match {
case "launch" =>
val master = args(1)
val jarUrl = args(2)
val mainClass = args(3)
val driverDescription = new DriverDescription(jarUrl, mainClass, mem)
val client = new SubmissionClient(actorSystem, master, driverDescription)
case "kill" =>
val master = args(1)
val driverId = args(2)
val client = new TerminationClient(actorSystem, master, driverId)
}
actorSystem.awaitTermination()
}
}
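
For illustration only, a minimal sketch of issuing a submission programmatically rather than via main(), mirroring the "launch" branch above. The master URL, jar path, and main class are placeholder values, and the 512 MB figure simply copies the hard-coded default from main().

// Hedged sketch (not part of this commit): programmatic driver submission.
val (actorSystem, _) = AkkaUtils.createActorSystem(
  "driverSubmission", Utils.localHostName(), 0)
val desc = new DriverDescription(
  "hdfs://namenode:9000/jars/my-app.jar", // jar must be reachable by the worker (full HDFS path)
  "com.example.MyDriver",
  512)
new SubmissionClient(actorSystem, "spark://master-host:7077", desc)
actorSystem.awaitTermination()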

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.master
import org.apache.spark.deploy.{DriverDescription, ApplicationDescription}
import java.util.Date
import akka.actor.ActorRef
import scala.collection.mutable
private[spark] class DriverInfo(
val startTime: Long,
val id: String,
val desc: DriverDescription,
val submitDate: Date)
extends Serializable {
@transient var state: DriverState.Value = DriverState.SUBMITTED
/* If we fail when launching the driver, the exception is stored here. */
@transient var exception: Option[Exception] = None
/* Most recent worker assigned to this driver */
@transient var worker: Option[WorkerInfo] = None
}

@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.master
private[spark] object DriverState extends Enumeration {
type DriverState = Value
// SUBMITTED: Submitted but not yet scheduled on a worker
// RUNNING: Has been allocated to a worker to run
// FINISHED: Previously ran and exited cleanly
// RELAUNCHING: Exited non-zero or due to worker failure, but has not yet started running again
// UNKNOWN: The state of the driver is temporarily not known due to master failure recovery
// KILLED: A user manually killed this driver
// FAILED: Unable to run due to an unrecoverable error (e.g. missing jar file)
val SUBMITTED, RUNNING, FINISHED, RELAUNCHING, UNKNOWN, KILLED, FAILED = Value
val MAX_NUM_RETRY = 10
}

@ -47,6 +47,15 @@ private[spark] class FileSystemPersistenceEngine(
new File(dir + File.separator + "app_" + app.id).delete()
}
override def addDriver(driver: DriverInfo) {
val driverFile = new File(dir + File.separator + "driver_" + driver.id)
serializeIntoFile(driverFile, driver)
}
override def removeDriver(driver: DriverInfo) {
new File(dir + File.separator + "driver_" + driver.id).delete()
}
override def addWorker(worker: WorkerInfo) {
val workerFile = new File(dir + File.separator + "worker_" + worker.id)
serializeIntoFile(workerFile, worker)
@ -56,13 +65,15 @@ private[spark] class FileSystemPersistenceEngine(
new File(dir + File.separator + "worker_" + worker.id).delete()
}
override def readPersistedData(): (Seq[ApplicationInfo], Seq[WorkerInfo]) = {
override def readPersistedData(): (Seq[ApplicationInfo], Seq[DriverInfo], Seq[WorkerInfo]) = {
val sortedFiles = new File(dir).listFiles().sortBy(_.getName)
val appFiles = sortedFiles.filter(_.getName.startsWith("app_"))
val apps = appFiles.map(deserializeFromFile[ApplicationInfo])
val driverFiles = sortedFiles.filter(_.getName.startsWith("driver_"))
val drivers = driverFiles.map(deserializeFromFile[DriverInfo])
val workerFiles = sortedFiles.filter(_.getName.startsWith("worker_"))
val workers = workerFiles.map(deserializeFromFile[WorkerInfo])
(apps, workers)
(apps, drivers, workers)
}
private def serializeIntoFile(file: File, value: AnyRef) {

@ -30,7 +30,7 @@ import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
import akka.serialization.SerializationExtension
import org.apache.spark.{Logging, SparkException}
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
import org.apache.spark.deploy.{DriverDescription, ApplicationDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.MasterMessages._
import org.apache.spark.deploy.master.ui.MasterWebUI
@ -47,7 +47,6 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
val RECOVERY_DIR = System.getProperty("spark.deploy.recoveryDirectory", "")
val RECOVERY_MODE = System.getProperty("spark.deploy.recoveryMode", "NONE")
var nextAppNumber = 0
val workers = new HashSet[WorkerInfo]
val idToWorker = new HashMap[String, WorkerInfo]
val actorToWorker = new HashMap[ActorRef, WorkerInfo]
@ -57,9 +56,14 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
val idToApp = new HashMap[String, ApplicationInfo]
val actorToApp = new HashMap[ActorRef, ApplicationInfo]
val addressToApp = new HashMap[Address, ApplicationInfo]
val waitingApps = new ArrayBuffer[ApplicationInfo]
val completedApps = new ArrayBuffer[ApplicationInfo]
var nextAppNumber = 0
val drivers = new HashSet[DriverInfo]
val completedDrivers = new ArrayBuffer[DriverInfo]
val waitingDrivers = new ArrayBuffer[DriverInfo] // Drivers currently spooled for scheduling
var nextDriverNumber = 0
Utils.checkHost(host, "Expected hostname")
@ -134,14 +138,14 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
override def receive = {
case ElectedLeader => {
val (storedApps, storedWorkers) = persistenceEngine.readPersistedData()
state = if (storedApps.isEmpty && storedWorkers.isEmpty)
val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData()
state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty)
RecoveryState.ALIVE
else
RecoveryState.RECOVERING
logInfo("I have been elected leader! New state: " + state)
if (state == RecoveryState.RECOVERING) {
beginRecovery(storedApps, storedWorkers)
beginRecovery(storedApps, storedDrivers, storedWorkers)
context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis) { completeRecovery() }
}
}
@ -168,6 +172,52 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
}
}
case RequestSubmitDriver(description) => {
if (state == RecoveryState.STANDBY) {
sender ! SubmitDriverResponse(false, "Standby master cannot accept driver submission")
} else {
logInfo("Driver submitted " + description.mainClass)
val driver = createDriver(description)
persistenceEngine.addDriver(driver)
waitingDrivers += driver
drivers.add(driver)
schedule()
// TODO: It might be good to instead have the submission client poll the master to determine
// the current status of the driver, since we may already want to expose this status anyway.
sender ! SubmitDriverResponse(true, "Driver successfully submitted")
}
}
case RequestKillDriver(driverId) => {
if (state == RecoveryState.STANDBY) {
sender ! KillDriverResponse(false, "Standby master cannot kill drivers")
} else {
logInfo("Asked to kill driver " + driverId)
val driver = drivers.find(_.id == driverId)
driver match {
case Some(d) =>
if (waitingDrivers.contains(d)) { waitingDrivers -= d }
else {
// We just notify the worker to kill the driver here. The final bookkeeping occurs
// on the return path when the worker submits a state change back to the master
// to notify it that the driver was successfully killed.
d.worker.foreach { w =>
w.actor ! KillDriver(driverId)
}
}
val msg = s"Kill request for $driverId submitted"
logInfo(msg)
sender ! KillDriverResponse(true, msg)
case None =>
val msg = s"Could not find running driver $driverId"
logWarning(msg)
sender ! KillDriverResponse(false, msg)
}
}
}
case RegisterApplication(description) => {
if (state == RecoveryState.STANDBY) {
// ignore, don't send response
@ -210,6 +260,25 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
}
}
case DriverStateChanged(driverId, state, exception) => {
if (!(state == DriverState.FAILED || state == DriverState.FINISHED ||
state == DriverState.KILLED)) {
throw new Exception(s"Received unexpected state update for driver $driverId: $state")
}
drivers.find(_.id == driverId) match {
case Some(driver) => {
drivers -= driver
completedDrivers += driver
persistenceEngine.removeDriver(driver)
driver.state = state
driver.exception = exception
driver.worker.foreach(w => w.removeDriver(driver))
}
case None =>
logWarning(s"Got driver update for unknown driver $driverId")
}
}
case Heartbeat(workerId) => {
idToWorker.get(workerId) match {
case Some(workerInfo) =>
@ -231,7 +300,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
if (canCompleteRecovery) { completeRecovery() }
}
case WorkerSchedulerStateResponse(workerId, executors) => {
case WorkerSchedulerStateResponse(workerId, executors, driverIds) => {
idToWorker.get(workerId) match {
case Some(worker) =>
logInfo("Worker has been re-registered: " + workerId)
@ -244,6 +313,14 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
worker.addExecutor(execInfo)
execInfo.copyState(exec)
}
for (driverId <- driverIds) {
drivers.find(_.id == driverId).foreach { driver =>
driver.worker = Some(worker)
driver.state = DriverState.RUNNING
worker.drivers(driverId) = driver
}
}
case None =>
logWarning("Scheduler state from unknown worker: " + workerId)
}
@ -260,8 +337,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
}
case RequestMasterState => {
sender ! MasterStateResponse(host, port, workers.toArray, apps.toArray, completedApps.toArray,
state)
sender ! MasterStateResponse(host, port, workers.toArray, drivers.toArray,
completedDrivers.toArray, apps.toArray, completedApps.toArray, state)
}
case CheckForWorkerTimeOut => {
@ -277,7 +354,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
workers.count(_.state == WorkerState.UNKNOWN) == 0 &&
apps.count(_.state == ApplicationState.UNKNOWN) == 0
def beginRecovery(storedApps: Seq[ApplicationInfo], storedWorkers: Seq[WorkerInfo]) {
def beginRecovery(storedApps: Seq[ApplicationInfo], storedDrivers: Seq[DriverInfo],
storedWorkers: Seq[WorkerInfo]) {
for (app <- storedApps) {
logInfo("Trying to recover app: " + app.id)
try {
@ -289,6 +367,12 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
}
}
for (driver <- storedDrivers) {
// Here we just read in the list of drivers. Any drivers associated with now-lost workers
// will be re-launched when we detect that the worker is missing.
drivers += driver
}
for (worker <- storedWorkers) {
logInfo("Trying to recover worker: " + worker.id)
try {
@ -312,6 +396,12 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker)
apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication)
// Reschedule drivers which were not claimed by any workers
drivers.filter(_.worker.isEmpty).foreach { d =>
logWarning(s"Driver ${d.id} was not found after master recovery, re-launching")
relaunchDriver(d)
}
state = RecoveryState.ALIVE
schedule()
logInfo("Recovery complete - resuming operations!")
@ -332,6 +422,16 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
*/
def schedule() {
if (state != RecoveryState.ALIVE) { return }
// Schedule drivers first; they take strict precedence over applications
for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
for (driver <- Seq(waitingDrivers: _*)) {
if (worker.memoryFree > driver.desc.mem) {
launchDriver(worker, driver)
waitingDrivers -= driver
}
}
}
// Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
// in the queue, then the second app, etc.
if (spreadOutApps) {
@ -418,9 +518,19 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
exec.id, ExecutorState.LOST, Some("worker lost"), None)
exec.application.removeExecutor(exec)
}
for (driver <- worker.drivers.values) {
relaunchDriver(driver)
}
persistenceEngine.removeWorker(worker)
}
def relaunchDriver(driver: DriverInfo) {
driver.worker = None
driver.state = DriverState.RELAUNCHING
waitingDrivers += driver
schedule()
}
def createApplication(desc: ApplicationDescription, driver: ActorRef): ApplicationInfo = {
val now = System.currentTimeMillis()
val date = new Date(now)
@ -499,6 +609,28 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
}
}
}
/** Generate a new driver ID given a driver's submission date */
def newDriverId(submitDate: Date): String = {
val appId = "driver-%s-%04d".format(DATE_FORMAT.format(submitDate), nextDriverNumber)
nextDriverNumber += 1
appId
}
def createDriver(desc: DriverDescription): DriverInfo = {
val now = System.currentTimeMillis()
val date = new Date(now)
new DriverInfo(now, newDriverId(date), desc, date)
}
def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
logInfo("Launching driver " + driver.id + " on worker " + worker.id)
worker.addDriver(driver)
driver.worker = Some(worker)
worker.actor ! LaunchDriver(driver.id, driver.desc.jarUrl, driver.desc.mainClass,
driver.desc.mem)
driver.state = DriverState.RUNNING
}
}
private[spark] object Master {

@ -35,11 +35,15 @@ private[spark] trait PersistenceEngine {
def removeWorker(worker: WorkerInfo)
def addDriver(driver: DriverInfo)
def removeDriver(driver: DriverInfo)
/**
* Returns the persisted data sorted by their respective ids (which implies that they're
* sorted by time of creation).
*/
def readPersistedData(): (Seq[ApplicationInfo], Seq[WorkerInfo])
def readPersistedData(): (Seq[ApplicationInfo], Seq[DriverInfo], Seq[WorkerInfo])
def close() {}
}
@ -49,5 +53,8 @@ private[spark] class BlackHolePersistenceEngine extends PersistenceEngine {
override def removeApplication(app: ApplicationInfo) {}
override def addWorker(worker: WorkerInfo) {}
override def removeWorker(worker: WorkerInfo) {}
override def readPersistedData() = (Nil, Nil)
override def addDriver(driver: DriverInfo) {}
override def removeDriver(driver: DriverInfo) {}
override def readPersistedData() = (Nil, Nil, Nil)
}

@ -36,6 +36,7 @@ private[spark] class WorkerInfo(
assert (port > 0)
@transient var executors: mutable.HashMap[String, ExecutorInfo] = _ // fullId => info
@transient var drivers: mutable.HashMap[String, DriverInfo] = _
@transient var state: WorkerState.Value = _
@transient var coresUsed: Int = _
@transient var memoryUsed: Int = _
@ -54,6 +55,7 @@ private[spark] class WorkerInfo(
private def init() {
executors = new mutable.HashMap
drivers = new mutable.HashMap
state = WorkerState.ALIVE
coresUsed = 0
memoryUsed = 0
@ -83,6 +85,18 @@ private[spark] class WorkerInfo(
executors.values.exists(_.application == app)
}
def addDriver(driver: DriverInfo) {
drivers(driver.id) = driver
memoryUsed += driver.desc.mem
coresUsed += 1
}
def removeDriver(driver: DriverInfo) {
drivers -= driver.id
memoryUsed -= driver.desc.mem
coresUsed -= 1
}
def webUiAddress : String = {
"http://" + this.publicAddress + ":" + this.webUiPort
}

@ -49,6 +49,14 @@ class ZooKeeperPersistenceEngine(serialization: Serialization)
zk.delete(WORKING_DIR + "/app_" + app.id)
}
override def addDriver(driver: DriverInfo) {
serializeIntoFile(WORKING_DIR + "/driver_" + driver.id, driver)
}
override def removeDriver(driver: DriverInfo) {
zk.delete(WORKING_DIR + "/driver_" + driver.id)
}
override def addWorker(worker: WorkerInfo) {
serializeIntoFile(WORKING_DIR + "/worker_" + worker.id, worker)
}
@ -61,13 +69,15 @@ class ZooKeeperPersistenceEngine(serialization: Serialization)
zk.close()
}
override def readPersistedData(): (Seq[ApplicationInfo], Seq[WorkerInfo]) = {
override def readPersistedData(): (Seq[ApplicationInfo], Seq[DriverInfo], Seq[WorkerInfo]) = {
val sortedFiles = zk.getChildren(WORKING_DIR).toList.sorted
val appFiles = sortedFiles.filter(_.startsWith("app_"))
val apps = appFiles.map(deserializeFromFile[ApplicationInfo])
val driverFiles = sortedFiles.filter(_.startsWith("driver_"))
val drivers = driverFiles.map(deserializeFromFile[DriverInfo])
val workerFiles = sortedFiles.filter(_.startsWith("worker_"))
val workers = workerFiles.map(deserializeFromFile[WorkerInfo])
(apps, workers)
(apps, drivers, workers)
}
private def serializeIntoFile(path: String, value: AnyRef) {

@ -26,7 +26,8 @@ import net.liftweb.json.JsonAST.JValue
import org.apache.spark.deploy.{DeployWebUI, JsonProtocol}
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
import org.apache.spark.deploy.master.{ApplicationInfo, WorkerInfo}
import org.apache.spark.deploy.JsonProtocol
import org.apache.spark.deploy.master.{DriverInfo, ApplicationInfo, WorkerInfo}
import org.apache.spark.ui.UIUtils
import org.apache.spark.util.Utils
@ -56,6 +57,12 @@ private[spark] class IndexPage(parent: MasterWebUI) {
val completedApps = state.completedApps.sortBy(_.endTime).reverse
val completedAppsTable = UIUtils.listingTable(appHeaders, appRow, completedApps)
val driverHeaders = Seq("ID", "Submitted Time", "Worker", "State", "Memory", "Main Class")
val activeDrivers = state.activeDrivers.sortBy(_.startTime).reverse
val activeDriversTable = UIUtils.listingTable(driverHeaders, driverRow, activeDrivers)
val completedDrivers = state.completedDrivers.sortBy(_.startTime).reverse
val completedDriversTable = UIUtils.listingTable(driverHeaders, driverRow, completedDrivers)
val content =
<div class="row-fluid">
<div class="span12">
@ -70,6 +77,9 @@ private[spark] class IndexPage(parent: MasterWebUI) {
<li><strong>Applications:</strong>
{state.activeApps.size} Running,
{state.completedApps.size} Completed </li>
<li><strong>Drivers:</strong>
{state.activeDrivers.size} Running,
{state.completedDrivers.size} Completed </li>
</ul>
</div>
</div>
@ -94,7 +104,22 @@ private[spark] class IndexPage(parent: MasterWebUI) {
<h4> Completed Applications </h4>
{completedAppsTable}
</div>
</div>;
</div>
<div class="row-fluid">
<div class="span12">
<h4> Active Drivers </h4>
{activeDriversTable}
</div>
</div>
<div class="row-fluid">
<div class="span12">
<h4> Completed Drivers </h4>
{completedDriversTable}
</div>
</div>;
UIUtils.basicSparkPage(content, "Spark Master at " + state.uri)
}
@ -134,4 +159,17 @@ private[spark] class IndexPage(parent: MasterWebUI) {
<td>{DeployWebUI.formatDuration(app.duration)}</td>
</tr>
}
def driverRow(driver: DriverInfo): Seq[Node] = {
<tr>
<td>{driver.id} </td>
<td>{driver.submitDate}</td>
<td>{driver.worker.map(w => w.id.toString).getOrElse("None")}</td>
<td>{driver.state}</td>
<td sorttable_customkey={driver.desc.mem.toString}>
{Utils.megabytesToString(driver.desc.mem.toLong)}
</td>
<td>{driver.desc.mainClass}</td>
</tr>
}
}

@ -0,0 +1,178 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.worker
import java.io._
import com.google.common.base.Charsets
import com.google.common.io.Files
import org.apache.spark.Logging
import org.apache.spark.util.Utils
import org.apache.hadoop.fs.{FileUtil, Path}
import org.apache.hadoop.conf.Configuration
import akka.actor.{ActorRef, ActorSelection}
import org.apache.spark.deploy.DeployMessages.DriverStateChanged
import org.apache.spark.deploy.master.DriverState
/**
* Manages the execution of one driver process.
*/
private[spark] class DriverRunner(
val driverId: String,
val jarUrl: String,
val mainClass: String,
val workDir: File,
val memory: Int,
val worker: ActorRef)
extends Logging {
var process: Option[Process] = None
@volatile var killed = false
/** Starts a thread to run and manage the driver. */
def start() = {
new Thread("DriverRunner for " + driverId) {
override def run() {
var exn: Option[Exception] = None
try {
val driverDir = createWorkingDirectory()
val localJarFilename = downloadUserJar(driverDir)
val command = Seq("java", "-cp", localJarFilename, mainClass)
runCommandWithRetry(command, driverDir)
}
catch {
case e: Exception => exn = Some(e)
}
val finalState =
if (killed) { DriverState.KILLED }
else if (exn.isDefined) { DriverState.FAILED }
else { DriverState.FINISHED }
worker ! DriverStateChanged(driverId, finalState, exn)
}
}.start()
}
/** Terminate this driver (or prevent it from ever starting if not yet started) */
def kill() {
killed = true
process.foreach(p => p.destroy())
}
/** Spawn a thread that will redirect a given stream to a file */
def redirectStream(in: InputStream, file: File) {
val out = new FileOutputStream(file, true)
new Thread("redirect output to " + file) {
override def run() {
try {
Utils.copyStream(in, out, true)
} catch {
case e: IOException =>
logInfo("Redirection to " + file + " closed: " + e.getMessage)
}
}
}.start()
}
/**
* Creates the working directory for this driver.
* Will throw an exception if there are errors preparing the directory.
*/
def createWorkingDirectory(): File = {
val driverDir = new File(workDir, driverId)
if (!driverDir.exists() && !driverDir.mkdirs()) {
throw new IOException("Failed to create directory " + driverDir)
}
driverDir
}
/**
* Download the user jar into the supplied directory and return its local path.
* Will throw an exception if there are errors downloading the jar.
*/
def downloadUserJar(driverDir: File): String = {
val jarPath = new Path(jarUrl)
val emptyConf = new Configuration() // TODO: In docs explain it needs to be full HDFS path
val jarFileSystem = jarPath.getFileSystem(emptyConf)
val destPath = new Path(driverDir.getAbsolutePath())
val destFileSystem = destPath.getFileSystem(emptyConf)
val jarFileName = jarPath.getName
val localJarFile = new File(driverDir, jarFileName)
val localJarFilename = localJarFile.getAbsolutePath
if (!localJarFile.exists()) { // May already exist if running multiple workers on one node
logInfo(s"Copying user jar $jarPath to $destPath")
FileUtil.copy(jarFileSystem, jarPath, destFileSystem, destPath, false, false, emptyConf)
}
if (!localJarFile.exists()) { // Verify copy succeeded
throw new Exception(s"Did not see expected jar $jarFileName in $driverDir")
}
localJarFilename
}
/** Continue launching the supplied command until it exits zero. */
def runCommandWithRetry(command: Seq[String], baseDir: File) = {
/* Time to wait between submission retries. */
var waitSeconds = 1
// TODO: We should distinguish between "immediate" exits and cases where it was running
// for a long time and then exits.
var cleanExit = false
while (!cleanExit && !killed) {
Thread.sleep(waitSeconds * 1000)
val builder = new ProcessBuilder(command: _*).directory(baseDir)
logInfo("Launch command: " + command.mkString("\"", "\" \"", "\""))
process = Some(builder.start())
// Redirect stdout and stderr to files
val stdout = new File(baseDir, "stdout")
redirectStream(process.get.getInputStream, stdout)
val stderr = new File(baseDir, "stderr")
val header = "Driver Command: %s\n%s\n\n".format(
command.mkString("\"", "\" \"", "\""), "=" * 40)
Files.write(header, stderr, Charsets.UTF_8)
redirectStream(process.get.getErrorStream, stderr)
val exitCode =
/* There is a race here I've elected to ignore for now because it's very unlikely and not
* simple to fix. This could see `killed=false` then the main thread gets a kill request
* and sets `killed=true` and destroys the not-yet-started process, then this thread
* launches the process. For now, in that case the user can just re-submit the kill
* request. */
if (killed) -1
else process.get.waitFor()
cleanExit = exitCode == 0
if (!cleanExit && !killed) {
waitSeconds = waitSeconds * 2 // exponential back-off
logInfo(s"Command exited with status $exitCode, re-launching after $waitSeconds s.")
}
}
}
}
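
As a small, hedged illustration of the retry policy above: waitSeconds starts at 1 and doubles after every non-zero exit, so (assuming each launch fails immediately) the sleeps preceding successive attempts form a simple geometric series.

// Sketch: the back-off delays (in seconds) slept before attempts 1 through 6,
// assuming every launch exits non-zero right away.
val backoffSeconds = Iterator.iterate(1)(_ * 2).take(6).toList
// backoffSeconds == List(1, 2, 4, 8, 16, 32)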

@ -30,18 +30,10 @@ import akka.remote.{ DisassociatedEvent, RemotingLifecycleEvent}
import org.apache.spark.{SparkException, Logging}
import org.apache.spark.deploy.{ExecutorDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.Master
import org.apache.spark.deploy.master.{DriverState, Master}
import org.apache.spark.deploy.worker.ui.WorkerWebUI
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.util.{Utils, AkkaUtils}
import org.apache.spark.deploy.DeployMessages.WorkerStateResponse
import org.apache.spark.deploy.DeployMessages.RegisterWorkerFailed
import org.apache.spark.deploy.DeployMessages.KillExecutor
import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
import org.apache.spark.deploy.DeployMessages.Heartbeat
import org.apache.spark.deploy.DeployMessages.RegisteredWorker
import org.apache.spark.deploy.DeployMessages.LaunchExecutor
import org.apache.spark.deploy.DeployMessages.RegisterWorker
/**
* @param masterUrls Each url should look like spark://host:port.
@ -83,6 +75,9 @@ private[spark] class Worker(
var workDir: File = null
val executors = new HashMap[String, ExecutorRunner]
val finishedExecutors = new HashMap[String, ExecutorRunner]
val drivers = new HashMap[String, DriverRunner]
val finishedDrivers = new HashMap[String, DriverRunner]
val publicAddress = {
val envVar = System.getenv("SPARK_PUBLIC_DNS")
if (envVar != null) envVar else host
@ -193,7 +188,7 @@ private[spark] class Worker(
val execs = executors.values.
map(e => new ExecutorDescription(e.appId, e.execId, e.cores, e.state))
sender ! WorkerSchedulerStateResponse(workerId, execs.toList)
sender ! WorkerSchedulerStateResponse(workerId, execs.toList, drivers.keys.toSeq)
case RegisterWorkerFailed(message) =>
if (!registered) {
@ -247,13 +242,56 @@ private[spark] class Worker(
}
}
case LaunchDriver(driverId, jarUrl, mainClass, memory) => {
logInfo(s"Asked to launch driver $driverId")
val driver = new DriverRunner(driverId, jarUrl, mainClass, workDir, memory, self)
drivers(driverId) = driver
driver.start()
coresUsed += 1
memoryUsed += memory
}
case KillDriver(driverId) => {
logInfo(s"Asked to kill driver $driverId")
drivers.get(driverId) match {
case Some(runner) =>
runner.kill()
case None =>
logError(s"Asked to kill unknown driver $driverId")
}
}
case DriverStateChanged(driverId, state, exception) => {
state match {
case DriverState.FAILED =>
logWarning(s"Driver $driverId failed with unrecoverable exception: ${exception.get}")
case DriverState.FINISHED =>
logInfo(s"Driver $driverId exited successfully")
case DriverState.KILLED =>
logInfo(s"Driver $driverId was killed")
}
masterLock.synchronized {
master ! DriverStateChanged(driverId, state, exception)
}
val driver = drivers(driverId)
memoryUsed -= driver.memory
coresUsed -= 1
drivers -= driverId
finishedDrivers(driverId) = driver
}
case x: DisassociatedEvent if x.remoteAddress == masterAddress =>
logInfo(s"$x Disassociated !")
masterDisconnected()
case RequestWorkerState => {
sender ! WorkerStateResponse(host, port, workerId, executors.values.toList,
finishedExecutors.values.toList, activeMasterUrl, cores, memory,
finishedExecutors.values.toList, drivers.values.toList,
finishedDrivers.values.toList, activeMasterUrl, cores, memory,
coresUsed, memoryUsed, activeMasterWebUiUrl)
}
}

@ -30,7 +30,7 @@ import net.liftweb.json.JsonAST.JValue
import org.apache.spark.deploy.JsonProtocol
import org.apache.spark.deploy.DeployMessages.{RequestWorkerState, WorkerStateResponse}
import org.apache.spark.deploy.worker.ExecutorRunner
import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner}
import org.apache.spark.ui.UIUtils
import org.apache.spark.util.Utils
@ -56,6 +56,12 @@ private[spark] class IndexPage(parent: WorkerWebUI) {
val finishedExecutorTable =
UIUtils.listingTable(executorHeaders, executorRow, workerState.finishedExecutors)
val driverHeaders = Seq("DriverID", "Main Class", "Memory", "Logs")
val runningDriverTable =
UIUtils.listingTable(driverHeaders, driverRow, workerState.drivers)
def finishedDriverTable =
UIUtils.listingTable(driverHeaders, driverRow, workerState.finishedDrivers)
val content =
<div class="row-fluid"> <!-- Worker Details -->
<div class="span12">
@ -84,6 +90,20 @@ private[spark] class IndexPage(parent: WorkerWebUI) {
<h4> Finished Executors </h4>
{finishedExecutorTable}
</div>
</div>
<div class="row-fluid"> <!-- Running Drivers -->
<div class="span12">
<h4> Running Drivers {workerState.drivers.size} </h4>
{runningDriverTable}
</div>
</div>
<div class="row-fluid"> <!-- Finished Drivers -->
<div class="span12">
<h4> Finished Drivers </h4>
{finishedDriverTable}
</div>
</div>;
UIUtils.basicSparkPage(content, "Spark Worker at %s:%s".format(
@ -111,6 +131,20 @@ private[spark] class IndexPage(parent: WorkerWebUI) {
.format(executor.appId, executor.execId)}>stderr</a>
</td>
</tr>
}
def driverRow(driver: DriverRunner): Seq[Node] = {
<tr>
<td>{driver.driverId}</td>
<td>{driver.mainClass}</td>
<td sorttable_customkey={driver.memory.toString}>
{Utils.megabytesToString(driver.memory)}
</td>
<td>
<a href={s"logPage?driverId=${driver.driverId}&logType=stdout"}>stdout</a>
<a href={s"logPage?driverId=${driver.driverId}&logType=stderr"}>stderr</a>
</td>
</tr>
}
}

@ -69,30 +69,44 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I
def log(request: HttpServletRequest): String = {
val defaultBytes = 100 * 1024
val appId = request.getParameter("appId")
val executorId = request.getParameter("executorId")
val appId = Option(request.getParameter("appId"))
val executorId = Option(request.getParameter("executorId"))
val driverId = Option(request.getParameter("driverId"))
val logType = request.getParameter("logType")
val offset = Option(request.getParameter("offset")).map(_.toLong)
val byteLength = Option(request.getParameter("byteLength")).map(_.toInt).getOrElse(defaultBytes)
val path = "%s/%s/%s/%s".format(workDir.getPath, appId, executorId, logType)
val path = (appId, executorId, driverId) match {
case (Some(a), Some(e), None) =>
s"${workDir.getPath}/$a/$e/$logType"
case (None, None, Some(d)) =>
s"${workDir.getPath}/$d/$logType"
}
val (startByte, endByte) = getByteRange(path, offset, byteLength)
val file = new File(path)
val logLength = file.length
val pre = "==== Bytes %s-%s of %s of %s/%s/%s ====\n"
.format(startByte, endByte, logLength, appId, executorId, logType)
val pre = s"==== Bytes $startByte-$endByte of $logLength of $path ====\n"
pre + Utils.offsetBytes(path, startByte, endByte)
}
def logPage(request: HttpServletRequest): Seq[scala.xml.Node] = {
val defaultBytes = 100 * 1024
val appId = request.getParameter("appId")
val executorId = request.getParameter("executorId")
val appId = Option(request.getParameter("appId"))
val executorId = Option(request.getParameter("executorId"))
val driverId = Option(request.getParameter("driverId"))
val logType = request.getParameter("logType")
val offset = Option(request.getParameter("offset")).map(_.toLong)
val byteLength = Option(request.getParameter("byteLength")).map(_.toInt).getOrElse(defaultBytes)
val path = "%s/%s/%s/%s".format(workDir.getPath, appId, executorId, logType)
val (path, params) = (appId, executorId, driverId) match {
case (Some(a), Some(e), None) =>
(s"${workDir.getPath}/$a/$e/$logType", s"appId=$a&executorId=$e")
case (None, None, Some(d)) =>
(s"${workDir.getPath}/$d/$logType", s"driverId=$d")
}
val (startByte, endByte) = getByteRange(path, offset, byteLength)
val file = new File(path)
@ -106,9 +120,8 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I
val backButton =
if (startByte > 0) {
<a href={"?appId=%s&executorId=%s&logType=%s&offset=%s&byteLength=%s"
.format(appId, executorId, logType, math.max(startByte-byteLength, 0),
byteLength)}>
<a href={"?%s&logType=%s&offset=%s&byteLength=%s"
.format(params, logType, math.max(startByte-byteLength, 0), byteLength)}>
<button type="button" class="btn btn-default">
Previous {Utils.bytesToString(math.min(byteLength, startByte))}
</button>
@ -122,8 +135,8 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I
val nextButton =
if (endByte < logLength) {
<a href={"?appId=%s&executorId=%s&logType=%s&offset=%s&byteLength=%s".
format(appId, executorId, logType, endByte, byteLength)}>
<a href={"?%s&logType=%s&offset=%s&byteLength=%s".
format(params, logType, endByte, byteLength)}>
<button type="button" class="btn btn-default">
Next {Utils.bytesToString(math.min(byteLength, logLength-endByte))}
</button>