SPARK-1352: Improve robustness of spark-submit script
1. Better error messages when required arguments are missing. 2. Support for unit testing cases where presented arguments are invalid. 3. Bug fix: Only use environment varaibles when they are set (otherwise will cause NPE). 4. A verbose mode to aid debugging. 5. Visibility of several variables is set to private. 6. Deprecation warning for existing scripts. Author: Patrick Wendell <pwendell@gmail.com> Closes #271 from pwendell/spark-submit and squashes the following commits: 9146def [Patrick Wendell] SPARK-1352: Improve robustness of spark-submit script
This commit is contained in:
parent
d666053679
commit
841721e03c
|
@ -128,6 +128,9 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends
|
|||
*/
|
||||
object Client {
|
||||
def main(args: Array[String]) {
|
||||
println("WARNING: This client is deprecated and will be removed in a future version of Spark.")
|
||||
println("Use ./bin/spark-submit with \"--master spark://host:port\"")
|
||||
|
||||
val conf = new SparkConf()
|
||||
val driverArgs = new ClientArguments(args)
|
||||
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
|
||||
package org.apache.spark.deploy
|
||||
|
||||
import java.io.File
|
||||
import java.io.{PrintStream, File}
|
||||
import java.net.URL
|
||||
|
||||
import org.apache.spark.executor.ExecutorURLClassLoader
|
||||
|
@ -32,38 +32,51 @@ import scala.collection.mutable.Map
|
|||
* modes that Spark supports.
|
||||
*/
|
||||
object SparkSubmit {
|
||||
val YARN = 1
|
||||
val STANDALONE = 2
|
||||
val MESOS = 4
|
||||
val LOCAL = 8
|
||||
val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL
|
||||
private val YARN = 1
|
||||
private val STANDALONE = 2
|
||||
private val MESOS = 4
|
||||
private val LOCAL = 8
|
||||
private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL
|
||||
|
||||
var clusterManager: Int = LOCAL
|
||||
private var clusterManager: Int = LOCAL
|
||||
|
||||
def main(args: Array[String]) {
|
||||
val appArgs = new SparkSubmitArguments(args)
|
||||
if (appArgs.verbose) {
|
||||
printStream.println(appArgs)
|
||||
}
|
||||
val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
|
||||
launch(childArgs, classpath, sysProps, mainClass)
|
||||
launch(childArgs, classpath, sysProps, mainClass, appArgs.verbose)
|
||||
}
|
||||
|
||||
// Exposed for testing
|
||||
private[spark] var printStream: PrintStream = System.err
|
||||
private[spark] var exitFn: () => Unit = () => System.exit(-1)
|
||||
|
||||
private[spark] def printErrorAndExit(str: String) = {
|
||||
printStream.println("error: " + str)
|
||||
printStream.println("run with --help for more information or --verbose for debugging output")
|
||||
exitFn()
|
||||
}
|
||||
private[spark] def printWarning(str: String) = printStream.println("warning: " + str)
|
||||
|
||||
/**
|
||||
* @return
|
||||
* a tuple containing the arguments for the child, a list of classpath
|
||||
* entries for the child, and the main class for the child
|
||||
*/
|
||||
def createLaunchEnv(appArgs: SparkSubmitArguments): (ArrayBuffer[String],
|
||||
private[spark] def createLaunchEnv(appArgs: SparkSubmitArguments): (ArrayBuffer[String],
|
||||
ArrayBuffer[String], Map[String, String], String) = {
|
||||
if (appArgs.master.startsWith("yarn")) {
|
||||
if (appArgs.master.startsWith("local")) {
|
||||
clusterManager = LOCAL
|
||||
} else if (appArgs.master.startsWith("yarn")) {
|
||||
clusterManager = YARN
|
||||
} else if (appArgs.master.startsWith("spark")) {
|
||||
clusterManager = STANDALONE
|
||||
} else if (appArgs.master.startsWith("mesos")) {
|
||||
clusterManager = MESOS
|
||||
} else if (appArgs.master.startsWith("local")) {
|
||||
clusterManager = LOCAL
|
||||
} else {
|
||||
System.err.println("master must start with yarn, mesos, spark, or local")
|
||||
System.exit(1)
|
||||
printErrorAndExit("master must start with yarn, mesos, spark, or local")
|
||||
}
|
||||
|
||||
// Because "yarn-standalone" and "yarn-client" encapsulate both the master
|
||||
|
@ -73,12 +86,10 @@ object SparkSubmit {
|
|||
appArgs.deployMode = "cluster"
|
||||
}
|
||||
if (appArgs.deployMode == "cluster" && appArgs.master == "yarn-client") {
|
||||
System.err.println("Deploy mode \"cluster\" and master \"yarn-client\" are at odds")
|
||||
System.exit(1)
|
||||
printErrorAndExit("Deploy mode \"cluster\" and master \"yarn-client\" are not compatible")
|
||||
}
|
||||
if (appArgs.deployMode == "client" && appArgs.master == "yarn-standalone") {
|
||||
System.err.println("Deploy mode \"client\" and master \"yarn-standalone\" are at odds")
|
||||
System.exit(1)
|
||||
printErrorAndExit("Deploy mode \"client\" and master \"yarn-standalone\" are not compatible")
|
||||
}
|
||||
if (appArgs.deployMode == "cluster" && appArgs.master.startsWith("yarn")) {
|
||||
appArgs.master = "yarn-standalone"
|
||||
|
@ -95,8 +106,7 @@ object SparkSubmit {
|
|||
var childMainClass = ""
|
||||
|
||||
if (clusterManager == MESOS && deployOnCluster) {
|
||||
System.err.println("Mesos does not support running the driver on the cluster")
|
||||
System.exit(1)
|
||||
printErrorAndExit("Mesos does not support running the driver on the cluster")
|
||||
}
|
||||
|
||||
if (!deployOnCluster) {
|
||||
|
@ -174,8 +184,17 @@ object SparkSubmit {
|
|||
(childArgs, childClasspath, sysProps, childMainClass)
|
||||
}
|
||||
|
||||
def launch(childArgs: ArrayBuffer[String], childClasspath: ArrayBuffer[String],
|
||||
sysProps: Map[String, String], childMainClass: String) {
|
||||
private def launch(childArgs: ArrayBuffer[String], childClasspath: ArrayBuffer[String],
|
||||
sysProps: Map[String, String], childMainClass: String, verbose: Boolean = false) {
|
||||
|
||||
if (verbose) {
|
||||
System.err.println(s"Main class:\n$childMainClass")
|
||||
System.err.println(s"Arguments:\n${childArgs.mkString("\n")}")
|
||||
System.err.println(s"System properties:\n${sysProps.mkString("\n")}")
|
||||
System.err.println(s"Classpath elements:\n${childClasspath.mkString("\n")}")
|
||||
System.err.println("\n")
|
||||
}
|
||||
|
||||
val loader = new ExecutorURLClassLoader(new Array[URL](0),
|
||||
Thread.currentThread.getContextClassLoader)
|
||||
Thread.currentThread.setContextClassLoader(loader)
|
||||
|
@ -193,10 +212,10 @@ object SparkSubmit {
|
|||
mainMethod.invoke(null, childArgs.toArray)
|
||||
}
|
||||
|
||||
def addJarToClasspath(localJar: String, loader: ExecutorURLClassLoader) {
|
||||
private def addJarToClasspath(localJar: String, loader: ExecutorURLClassLoader) {
|
||||
val localJarFile = new File(localJar)
|
||||
if (!localJarFile.exists()) {
|
||||
System.err.println("Jar does not exist: " + localJar + ". Skipping.")
|
||||
printWarning(s"Jar $localJar does not exist, skipping.")
|
||||
}
|
||||
|
||||
val url = localJarFile.getAbsoluteFile.toURI.toURL
|
||||
|
|
|
@ -40,25 +40,45 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
|
|||
var name: String = null
|
||||
var childArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
|
||||
var jars: String = null
|
||||
var verbose: Boolean = false
|
||||
|
||||
loadEnvVars()
|
||||
parseArgs(args.toList)
|
||||
parseOpts(args.toList)
|
||||
|
||||
def loadEnvVars() {
|
||||
master = System.getenv("MASTER")
|
||||
deployMode = System.getenv("DEPLOY_MODE")
|
||||
// Sanity checks
|
||||
if (args.length == 0) printUsageAndExit(-1)
|
||||
if (primaryResource == null) SparkSubmit.printErrorAndExit("Must specify a primary resource")
|
||||
if (mainClass == null) SparkSubmit.printErrorAndExit("Must specify a main class with --class")
|
||||
|
||||
override def toString = {
|
||||
s"""Parsed arguments:
|
||||
| master $master
|
||||
| deployMode $deployMode
|
||||
| executorMemory $executorMemory
|
||||
| executorCores $executorCores
|
||||
| totalExecutorCores $totalExecutorCores
|
||||
| driverMemory $driverMemory
|
||||
| drivercores $driverCores
|
||||
| supervise $supervise
|
||||
| queue $queue
|
||||
| numExecutors $numExecutors
|
||||
| files $files
|
||||
| archives $archives
|
||||
| mainClass $mainClass
|
||||
| primaryResource $primaryResource
|
||||
| name $name
|
||||
| childArgs [${childArgs.mkString(" ")}]
|
||||
| jars $jars
|
||||
| verbose $verbose
|
||||
""".stripMargin
|
||||
}
|
||||
|
||||
def parseArgs(args: List[String]) {
|
||||
if (args.size == 0) {
|
||||
printUsageAndExit(1)
|
||||
System.exit(1)
|
||||
}
|
||||
primaryResource = args(0)
|
||||
parseOpts(args.tail)
|
||||
private def loadEnvVars() {
|
||||
Option(System.getenv("MASTER")).map(master = _)
|
||||
Option(System.getenv("DEPLOY_MODE")).map(deployMode = _)
|
||||
}
|
||||
|
||||
def parseOpts(opts: List[String]): Unit = opts match {
|
||||
private def parseOpts(opts: List[String]): Unit = opts match {
|
||||
case ("--name") :: value :: tail =>
|
||||
name = value
|
||||
parseOpts(tail)
|
||||
|
@ -73,8 +93,7 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
|
|||
|
||||
case ("--deploy-mode") :: value :: tail =>
|
||||
if (value != "client" && value != "cluster") {
|
||||
System.err.println("--deploy-mode must be either \"client\" or \"cluster\"")
|
||||
System.exit(1)
|
||||
SparkSubmit.printErrorAndExit("--deploy-mode must be either \"client\" or \"cluster\"")
|
||||
}
|
||||
deployMode = value
|
||||
parseOpts(tail)
|
||||
|
@ -130,17 +149,28 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
|
|||
case ("--help" | "-h") :: tail =>
|
||||
printUsageAndExit(0)
|
||||
|
||||
case Nil =>
|
||||
case ("--verbose" | "-v") :: tail =>
|
||||
verbose = true
|
||||
parseOpts(tail)
|
||||
|
||||
case _ =>
|
||||
printUsageAndExit(1, opts)
|
||||
case value :: tail =>
|
||||
if (primaryResource != null) {
|
||||
val error = s"Found two conflicting resources, $value and $primaryResource." +
|
||||
" Expecting only one resource."
|
||||
SparkSubmit.printErrorAndExit(error)
|
||||
}
|
||||
primaryResource = value
|
||||
parseOpts(tail)
|
||||
|
||||
case Nil =>
|
||||
}
|
||||
|
||||
def printUsageAndExit(exitCode: Int, unknownParam: Any = null) {
|
||||
private def printUsageAndExit(exitCode: Int, unknownParam: Any = null) {
|
||||
val outStream = SparkSubmit.printStream
|
||||
if (unknownParam != null) {
|
||||
System.err.println("Unknown/unsupported param " + unknownParam)
|
||||
outStream.println("Unknown/unsupported param " + unknownParam)
|
||||
}
|
||||
System.err.println(
|
||||
outStream.println(
|
||||
"""Usage: spark-submit <primary binary> [options]
|
||||
|Options:
|
||||
| --master MASTER_URL spark://host:port, mesos://host:port, yarn, or local.
|
||||
|
@ -171,6 +201,6 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
|
|||
| --archives ARCHIVES Comma separated list of archives to be extracted into the
|
||||
| working dir of each executor.""".stripMargin
|
||||
)
|
||||
System.exit(exitCode)
|
||||
SparkSubmit.exitFn()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,14 +17,71 @@
|
|||
|
||||
package org.apache.spark.deploy
|
||||
|
||||
import java.io.{OutputStream, PrintStream}
|
||||
|
||||
import scala.collection.mutable.ArrayBuffer
|
||||
|
||||
import org.scalatest.FunSuite
|
||||
import org.scalatest.matchers.ShouldMatchers
|
||||
|
||||
import org.apache.spark.deploy.SparkSubmit._
|
||||
|
||||
|
||||
class SparkSubmitSuite extends FunSuite with ShouldMatchers {
|
||||
|
||||
val noOpOutputStream = new OutputStream {
|
||||
def write(b: Int) = {}
|
||||
}
|
||||
|
||||
/** Simple PrintStream that reads data into a buffer */
|
||||
class BufferPrintStream extends PrintStream(noOpOutputStream) {
|
||||
var lineBuffer = ArrayBuffer[String]()
|
||||
override def println(line: String) {
|
||||
lineBuffer += line
|
||||
}
|
||||
}
|
||||
|
||||
/** Returns true if the script exits and the given search string is printed. */
|
||||
def testPrematureExit(input: Array[String], searchString: String): Boolean = {
|
||||
val printStream = new BufferPrintStream()
|
||||
SparkSubmit.printStream = printStream
|
||||
|
||||
@volatile var exitedCleanly = false
|
||||
SparkSubmit.exitFn = () => exitedCleanly = true
|
||||
|
||||
val thread = new Thread {
|
||||
override def run() = try {
|
||||
SparkSubmit.main(input)
|
||||
} catch {
|
||||
// If exceptions occur after the "exit" has happened, fine to ignore them.
|
||||
// These represent code paths not reachable during normal execution.
|
||||
case e: Exception => if (!exitedCleanly) throw e
|
||||
}
|
||||
}
|
||||
thread.start()
|
||||
thread.join()
|
||||
printStream.lineBuffer.find(s => s.contains(searchString)).size > 0
|
||||
}
|
||||
|
||||
test("prints usage on empty input") {
|
||||
val clArgs = Array[String]()
|
||||
// val appArgs = new SparkSubmitArguments(clArgs)
|
||||
testPrematureExit(Array[String](), "Usage: spark-submit") should be (true)
|
||||
}
|
||||
|
||||
test("prints usage with only --help") {
|
||||
testPrematureExit(Array("--help"), "Usage: spark-submit") should be (true)
|
||||
}
|
||||
|
||||
test("handles multiple binary definitions") {
|
||||
val adjacentJars = Array("foo.jar", "bar.jar")
|
||||
testPrematureExit(adjacentJars, "error: Found two conflicting resources") should be (true)
|
||||
|
||||
val nonAdjacentJars =
|
||||
Array("foo.jar", "--master", "123", "--class", "abc", "bar.jar")
|
||||
testPrematureExit(nonAdjacentJars, "error: Found two conflicting resources") should be (true)
|
||||
}
|
||||
|
||||
test("handle binary specified but not class") {
|
||||
testPrematureExit(Array("foo.jar"), "must specify a main class")
|
||||
}
|
||||
|
||||
test("handles YARN cluster mode") {
|
||||
|
|
|
@ -167,6 +167,9 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
|
|||
object Client {
|
||||
|
||||
def main(argStrings: Array[String]) {
|
||||
println("WARNING: This client is deprecated and will be removed in a future version of Spark.")
|
||||
println("Use ./bin/spark-submit with \"--master yarn\"")
|
||||
|
||||
// Set an env variable indicating we are running in YARN mode.
|
||||
// Note that anything with SPARK prefix gets propagated to all (remote) processes
|
||||
System.setProperty("SPARK_YARN_MODE", "true")
|
||||
|
|
|
@ -173,6 +173,9 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
|
|||
object Client {
|
||||
|
||||
def main(argStrings: Array[String]) {
|
||||
println("WARNING: This client is deprecated and will be removed in a future version of Spark.")
|
||||
println("Use ./bin/spark-submit with \"--master yarn\"")
|
||||
|
||||
// Set an env variable indicating we are running in YARN mode.
|
||||
// Note: anything env variable with SPARK_ prefix gets propagated to all (remote) processes -
|
||||
// see Client#setupLaunchEnv().
|
||||
|
|
Loading…
Reference in a new issue