Adding --verbose option to DriverClient
This commit is contained in:
parent
e21a707a13
commit
3d939e5fe8
|
@ -22,12 +22,15 @@ import scala.collection.mutable.Map
|
|||
import scala.concurrent._
|
||||
|
||||
import akka.actor._
|
||||
import akka.actor.Actor
|
||||
|
||||
import org.apache.spark.{SparkConf, Logging}
|
||||
import org.apache.spark.{Logging, SparkConf}
|
||||
import org.apache.spark.deploy.{Command, DriverDescription}
|
||||
import org.apache.spark.deploy.DeployMessages._
|
||||
import org.apache.spark.deploy.master.Master
|
||||
import org.apache.spark.util.{AkkaUtils, Utils}
|
||||
import org.apache.log4j.{Logger, Level}
|
||||
import akka.remote.RemotingLifecycleEvent
|
||||
|
||||
/**
|
||||
* Actor that sends a single message to the standalone master and returns the response in the
|
||||
|
@ -55,12 +58,18 @@ class DriverActor(master: String, response: Promise[(Boolean, String)]) extends
|
|||
/**
|
||||
* Executable utility for starting and terminating drivers inside of a standalone cluster.
|
||||
*/
|
||||
object DriverClient extends Logging {
|
||||
object DriverClient {
|
||||
|
||||
def main(args: Array[String]) {
|
||||
val driverArgs = new DriverClientArguments(args)
|
||||
val conf = new SparkConf()
|
||||
|
||||
if (!driverArgs.logLevel.isGreaterOrEqual(Level.WARN)) {
|
||||
conf.set("spark.akka.logLifecycleEvents", "true")
|
||||
}
|
||||
conf.set("spark.akka.askTimeout", "5")
|
||||
Logger.getRootLogger.setLevel(driverArgs.logLevel)
|
||||
|
||||
// TODO: See if we can initialize akka so return messages are sent back using the same TCP
|
||||
// flow. Else, this (sadly) requires the DriverClient be routable from the Master.
|
||||
val (actorSystem, _) = AkkaUtils.createActorSystem(
|
||||
|
@ -69,6 +78,7 @@ object DriverClient extends Logging {
|
|||
val response = promise[(Boolean, String)]
|
||||
val driver: ActorRef = actorSystem.actorOf(Props(new DriverActor(driverArgs.master, response)))
|
||||
|
||||
println(s"Sending ${driverArgs.cmd} command to ${driverArgs.master}")
|
||||
driverArgs.cmd match {
|
||||
case "launch" =>
|
||||
// TODO: We could add an env variable here and intercept it in `sc.addJar` that would
|
||||
|
@ -98,9 +108,9 @@ object DriverClient extends Logging {
|
|||
try {
|
||||
Await.result(response.future, AkkaUtils.askTimeout(conf))
|
||||
} catch {
|
||||
case e: TimeoutException => (false, s"Master $master failed to respond in time")
|
||||
case e: TimeoutException => (false, s"Error: Timed out sending message to $master")
|
||||
}
|
||||
if (success) logInfo(message) else logError(message)
|
||||
println(message)
|
||||
actorSystem.shutdown()
|
||||
actorSystem.awaitTermination()
|
||||
}
|
||||
|
|
|
@ -19,6 +19,8 @@ package org.apache.spark.deploy.client
|
|||
|
||||
import scala.collection.mutable.ListBuffer
|
||||
|
||||
import org.apache.log4j.Level
|
||||
|
||||
/**
|
||||
* Command-line parser for the driver client.
|
||||
*/
|
||||
|
@ -27,6 +29,7 @@ private[spark] class DriverClientArguments(args: Array[String]) {
|
|||
val defaultMemory = 512
|
||||
|
||||
var cmd: String = "" // 'launch' or 'kill'
|
||||
var logLevel = Level.WARN
|
||||
|
||||
// launch parameters
|
||||
var master: String = ""
|
||||
|
@ -59,6 +62,10 @@ private[spark] class DriverClientArguments(args: Array[String]) {
|
|||
case ("--help" | "-h") :: tail =>
|
||||
printUsageAndExit(0)
|
||||
|
||||
case ("--verbose" | "-v") :: tail =>
|
||||
logLevel = Level.INFO
|
||||
parse(tail)
|
||||
|
||||
case "launch" :: _master :: _jarUrl :: _mainClass :: tail =>
|
||||
cmd = "launch"
|
||||
master = _master
|
||||
|
@ -90,6 +97,7 @@ private[spark] class DriverClientArguments(args: Array[String]) {
|
|||
| -c CORES, --cores CORES Number of cores to request (default: $defaultCores)
|
||||
| -m MEMORY, --memory MEMORY Megabytes of memory to request (default: $defaultMemory)
|
||||
| -s, --supervise Whether to restart the driver on failure
|
||||
| -v, --verbose Print more debugging output
|
||||
""".stripMargin
|
||||
System.err.println(usage)
|
||||
System.exit(exitCode)
|
||||
|
|
Loading…
Reference in a new issue