Changes based on review feedback.
This commit is contained in:
parent
7a99702ce2
commit
f236ddd1a2
|
@ -62,7 +62,7 @@ object DriverClient extends Logging {
|
|||
|
||||
// TODO: See if we can initialize akka so return messages are sent back using the same TCP
|
||||
// flow. Else, this (sadly) requires the DriverClient be routable from the Master.
|
||||
val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
|
||||
val (actorSystem, _) = AkkaUtils.createActorSystem(
|
||||
"driverClient", Utils.localHostName(), 0)
|
||||
val master = driverArgs.master
|
||||
val response = promise[(Boolean, String)]
|
||||
|
|
|
@ -23,6 +23,9 @@ import scala.collection.mutable.ListBuffer
|
|||
* Command-line parser for the driver client.
|
||||
*/
|
||||
private[spark] class DriverClientArguments(args: Array[String]) {
|
||||
val defaultCores = 1
|
||||
val defaultMemory = 512
|
||||
|
||||
var cmd: String = "" // 'launch' or 'kill'
|
||||
|
||||
// launch parameters
|
||||
|
@ -30,8 +33,8 @@ private[spark] class DriverClientArguments(args: Array[String]) {
|
|||
var jarUrl: String = ""
|
||||
var mainClass: String = ""
|
||||
var supervise: Boolean = false
|
||||
var memory: Int = 512
|
||||
var cores: Int = 1
|
||||
var memory: Int = defaultMemory
|
||||
var cores: Int = defaultCores
|
||||
private var _driverOptions = ListBuffer[String]()
|
||||
def driverOptions = _driverOptions.toSeq
|
||||
|
||||
|
@ -78,14 +81,17 @@ private[spark] class DriverClientArguments(args: Array[String]) {
|
|||
def printUsageAndExit(exitCode: Int) {
|
||||
// TODO: It wouldn't be too hard to allow users to submit their app and dependency jars
|
||||
// separately similar to in the YARN client.
|
||||
System.err.println(
|
||||
"usage: DriverClient [options] launch <active-master> <jar-url> <main-class> " +
|
||||
"[driver options]\n" +
|
||||
"usage: DriverClient kill <active-master> <driver-id>\n\n" +
|
||||
"Options:\n" +
|
||||
" -c CORES, --cores CORES Number of cores to request \n" +
|
||||
" -m MEMORY, --memory MEMORY Megabytes of memory to request\n" +
|
||||
" -s, --supervise Whether to restart the driver on failure\n")
|
||||
val usage =
|
||||
s"""
|
||||
|Usage: DriverClient [options] launch <active-master> <jar-url> <main-class> [driver options]
|
||||
|Usage: DriverClient kill <active-master> <driver-id>
|
||||
|
|
||||
|Options:
|
||||
| -c CORES, --cores CORES Number of cores to request (default: $defaultCores)
|
||||
| -m MEMORY, --memory MEMORY Megabytes of memory to request (default: $defaultMemory)
|
||||
| -s, --supervise Whether to restart the driver on failure
|
||||
""".stripMargin
|
||||
System.err.println(usage)
|
||||
System.exit(exitCode)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,7 +21,7 @@ object CommandUtils extends Logging {
|
|||
}
|
||||
|
||||
private def getEnv(key: String, command: Command): Option[String] =
|
||||
command.environment.get(key).orElse(Option(getenv(key)))
|
||||
command.environment.get(key).orElse(Option(System.getenv(key)))
|
||||
|
||||
/**
|
||||
* Attention: this must always be aligned with the environment variables in the run scripts and
|
||||
|
|
|
@ -119,15 +119,14 @@ private[spark] class DriverRunner(
|
|||
val emptyConf = new Configuration() // TODO: In docs explain it needs to be full HDFS path
|
||||
val jarFileSystem = jarPath.getFileSystem(emptyConf)
|
||||
|
||||
val destPath = new Path(driverDir.getAbsolutePath())
|
||||
val destFileSystem = destPath.getFileSystem(emptyConf)
|
||||
val destPath = new File(driverDir.getAbsolutePath, jarPath.getName)
|
||||
val jarFileName = jarPath.getName
|
||||
val localJarFile = new File(driverDir, jarFileName)
|
||||
val localJarFilename = localJarFile.getAbsolutePath
|
||||
|
||||
if (!localJarFile.exists()) { // May already exist if running multiple workers on one node
|
||||
logInfo(s"Copying user jar $jarPath to $destPath")
|
||||
FileUtil.copy(jarFileSystem, jarPath, destFileSystem, destPath, false, false, emptyConf)
|
||||
FileUtil.copy(jarFileSystem, jarPath, destPath, false, emptyConf)
|
||||
}
|
||||
|
||||
if (!localJarFile.exists()) { // Verify copy succeeded
|
||||
|
@ -140,8 +139,12 @@ private[spark] class DriverRunner(
|
|||
/** Launch the supplied command. */
|
||||
private def runCommand(command: Seq[String], envVars: Map[String, String], baseDir: File,
|
||||
supervise: Boolean) {
|
||||
|
||||
// Time to wait between submission retries.
|
||||
var waitSeconds = 1
|
||||
// A run of this many seconds resets the exponential back-off.
|
||||
val successfulRunDuration = 1
|
||||
|
||||
var keepTrying = !killed
|
||||
|
||||
while (keepTrying) {
|
||||
|
@ -161,11 +164,15 @@ private[spark] class DriverRunner(
|
|||
val stderr = new File(baseDir, "stderr")
|
||||
val header = "Launch Command: %s\n%s\n\n".format(
|
||||
command.mkString("\"", "\" \"", "\""), "=" * 40)
|
||||
Files.write(header, stderr, Charsets.UTF_8)
|
||||
Files.append(header, stderr, Charsets.UTF_8)
|
||||
CommandUtils.redirectStream(process.get.getErrorStream, stderr)
|
||||
}
|
||||
|
||||
val processStart = System.currentTimeMillis()
|
||||
val exitCode = process.get.waitFor()
|
||||
if (System.currentTimeMillis() - processStart > successfulRunDuration * 1000) {
|
||||
waitSeconds = 1
|
||||
}
|
||||
|
||||
if (supervise && exitCode != 0 && !killed) {
|
||||
waitSeconds = waitSeconds * 2 // exponential back-off
|
||||
|
|
|
@ -11,7 +11,7 @@ object DriverWrapper {
|
|||
def main(args: Array[String]) {
|
||||
args.toList match {
|
||||
case workerUrl :: mainClass :: extraArgs =>
|
||||
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("Driver",
|
||||
val (actorSystem, _) = AkkaUtils.createActorSystem("Driver",
|
||||
Utils.localHostName(), 0)
|
||||
actorSystem.actorOf(Props(classOf[WorkerWatcher], workerUrl), name = "workerWatcher")
|
||||
|
||||
|
|
|
@ -260,8 +260,8 @@ private[spark] class Worker(
|
|||
|
||||
case KillDriver(driverId) => {
|
||||
logInfo(s"Asked to kill driver $driverId")
|
||||
drivers.find(_._1 == driverId) match {
|
||||
case Some((id, runner)) =>
|
||||
drivers.get(driverId) match {
|
||||
case Some(runner) =>
|
||||
runner.kill()
|
||||
case None =>
|
||||
logError(s"Asked to kill unknown driver $driverId")
|
||||
|
@ -280,8 +280,7 @@ private[spark] class Worker(
|
|||
masterLock.synchronized {
|
||||
master ! DriverStateChanged(driverId, state, exception)
|
||||
}
|
||||
val driver = drivers(driverId)
|
||||
drivers -= driverId
|
||||
val driver = drivers.remove(driverId).get
|
||||
finishedDrivers(driverId) = driver
|
||||
memoryUsed -= driver.driverDesc.mem
|
||||
coresUsed -= driver.driverDesc.cores
|
||||
|
|
|
@ -47,7 +47,7 @@ private[spark] class SparkDeploySchedulerBackend(
|
|||
val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
|
||||
System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
|
||||
CoarseGrainedSchedulerBackend.ACTOR_NAME)
|
||||
val args = Seq(driverUrl, "{{WORKER_URL}}", "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}")
|
||||
val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{WORKER_URL}}")
|
||||
val command = Command(
|
||||
"org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs)
|
||||
val sparkHome = sc.getSparkHome().getOrElse(null)
|
||||
|
|
Loading…
Reference in a new issue