[SPARK-22941][CORE] Do not exit JVM when submit fails with in-process launcher.

The current in-process launcher implementation just calls the SparkSubmit
object, which, in case of errors, will more often than not exit the JVM.
This is not desirable since this launcher is meant to be used inside other
applications; exiting the JVM would kill the hosting application.
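
To make the failure mode concrete, here is a rough Scala sketch of an application embedding the launcher; the resource path, main class, listener, and polling loop are illustrative assumptions rather than part of this change:

import org.apache.spark.launcher.{InProcessLauncher, SparkAppHandle}

object EmbeddingAppSketch {
  def main(args: Array[String]): Unit = {
    // Launch a Spark application inside this JVM. Before this change, a bad
    // argument could make SparkSubmit call System.exit() and take the whole
    // embedding application down with it.
    val handle = new InProcessLauncher()
      .setMaster("local[2]")
      .setAppResource("/path/to/app.jar")   // hypothetical resource
      .setMainClass("com.example.MyApp")    // hypothetical main class
      .startApplication(new SparkAppHandle.Listener {
        override def stateChanged(h: SparkAppHandle): Unit =
          println(s"Spark app state: ${h.getState}")
        override def infoChanged(h: SparkAppHandle): Unit = ()
      })

    // The embedding application should keep running regardless of how the
    // submission turns out.
    while (!handle.getState.isFinal) Thread.sleep(500)
  }
}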

The change turns SparkSubmit into a class, and abstracts away some of
the functionality used to print error messages and abort the submission
process. The default implementation uses the logging system for messages,
and throws exceptions for errors. As part of that I also moved some code
that doesn't really belong in SparkSubmit to a better location.
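
Concretely, errors in the default implementation now surface as exceptions such as SparkException that the caller can handle (the diff below simply makes error() throw). A rough sketch of what that buys an in-process caller, assuming code placed in Spark's own deploy package since the class is private[spark], with an argument list chosen only to trigger a validation error:

package org.apache.spark.deploy

import org.apache.spark.SparkException

object InProcessSubmitSketch {
  def main(cliArgs: Array[String]): Unit = {
    val submit = new SparkSubmit()
    try {
      // Validation failures such as an invalid deploy mode are now thrown
      // as SparkException instead of calling System.exit().
      submit.doSubmit(Array("--deploy-mode", "invalid", "some.jar"))
    } catch {
      case e: SparkException =>
        // The embedding code decides what to do with the failure.
        println(s"Submission failed: ${e.getMessage}")
    }
  }
}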

The command line invocation of spark-submit now uses a special implementation
of the SparkSubmit class that overrides those behaviors to do what is expected
from the command line version (print to the terminal, exit the JVM, etc.).

A lot of the changes are to replace calls to methods such as "printErrorAndExit"
with the new API.

As part of adding tests for this, I had to fix some small things in the
launcher option parser so that things like "--version" can work when
used in the launcher library.
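
The new launcher suite test below covers exactly this case; here is a condensed Scala version of that "--version" scenario, with the object name and polling loop as illustrative assumptions:

import org.apache.spark.launcher.{InProcessLauncher, SparkAppHandle}

object VersionThroughLauncherSketch {
  def main(args: Array[String]): Unit = {
    // "--version" does not report state back to the handle, so the final
    // state is LOST rather than FINISHED, but the JVM stays alive either way.
    val handle = new InProcessLauncher()
      .addSparkArg("--version")
      .startApplication()
    while (!handle.getState.isFinal) Thread.sleep(100)
    println(s"Final state: ${handle.getState}")   // expected: LOST
  }
}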

There is still code that prints directly to the terminal, like all the
Ivy-related code in SparkSubmitUtils, and other areas where some refactoring
would help, like the CommandLineUtils class, but I chose to leave those
alone to keep this change more focused.

Aside from existing and added unit tests, I ran the command line tools with
a bunch of different arguments to make sure messages and errors behave
as they did before.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #20925 from vanzin/SPARK-22941.
Marcelo Vanzin 2018-04-11 10:13:44 -05:00 committed by Imran Rashid
parent 653fe02415
commit 3cb82047f2
14 changed files with 402 additions and 293 deletions

View file

@ -25,9 +25,10 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.{SecurityManager, SparkConf, SparkException}
import org.apache.spark.internal.Logging
import org.apache.spark.util.{MutableURLClassLoader, Utils}
private[deploy] object DependencyUtils {
private[deploy] object DependencyUtils extends Logging {
def resolveMavenDependencies(
packagesExclusions: String,
@ -75,7 +76,7 @@ private[deploy] object DependencyUtils {
def addJarsToClassPath(jars: String, loader: MutableURLClassLoader): Unit = {
if (jars != null) {
for (jar <- jars.split(",")) {
SparkSubmit.addJarToClasspath(jar, loader)
addJarToClasspath(jar, loader)
}
}
}
@ -151,6 +152,31 @@ private[deploy] object DependencyUtils {
}.mkString(",")
}
def addJarToClasspath(localJar: String, loader: MutableURLClassLoader): Unit = {
val uri = Utils.resolveURI(localJar)
uri.getScheme match {
case "file" | "local" =>
val file = new File(uri.getPath)
if (file.exists()) {
loader.addURL(file.toURI.toURL)
} else {
logWarning(s"Local jar $file does not exist, skipping.")
}
case _ =>
logWarning(s"Skip remote jar $uri.")
}
}
/**
* Merge a sequence of comma-separated file lists, some of which may be null to indicate
* no files, into a single comma-separated string.
*/
def mergeFileLists(lists: String*): String = {
val merged = lists.filterNot(StringUtils.isBlank)
.flatMap(Utils.stringToSeq)
if (merged.nonEmpty) merged.mkString(",") else null
}
private def splitOnFragment(path: String): (URI, Option[String]) = {
val uri = Utils.resolveURI(path)
val withoutFragment = new URI(uri.getScheme, uri.getSchemeSpecificPart, null)

View file

@ -58,7 +58,7 @@ import org.apache.spark.util._
*/
private[deploy] object SparkSubmitAction extends Enumeration {
type SparkSubmitAction = Value
val SUBMIT, KILL, REQUEST_STATUS = Value
val SUBMIT, KILL, REQUEST_STATUS, PRINT_VERSION = Value
}
/**
@ -67,78 +67,32 @@ private[deploy] object SparkSubmitAction extends Enumeration {
* This program handles setting up the classpath with relevant Spark dependencies and provides
* a layer over the different cluster managers and deploy modes that Spark supports.
*/
object SparkSubmit extends CommandLineUtils with Logging {
private[spark] class SparkSubmit extends Logging {
import DependencyUtils._
import SparkSubmit._
// Cluster managers
private val YARN = 1
private val STANDALONE = 2
private val MESOS = 4
private val LOCAL = 8
private val KUBERNETES = 16
private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL | KUBERNETES
// Deploy modes
private val CLIENT = 1
private val CLUSTER = 2
private val ALL_DEPLOY_MODES = CLIENT | CLUSTER
// Special primary resource names that represent shells rather than application jars.
private val SPARK_SHELL = "spark-shell"
private val PYSPARK_SHELL = "pyspark-shell"
private val SPARKR_SHELL = "sparkr-shell"
private val SPARKR_PACKAGE_ARCHIVE = "sparkr.zip"
private val R_PACKAGE_ARCHIVE = "rpkg.zip"
private val CLASS_NOT_FOUND_EXIT_STATUS = 101
// Following constants are visible for testing.
private[deploy] val YARN_CLUSTER_SUBMIT_CLASS =
"org.apache.spark.deploy.yarn.YarnClusterApplication"
private[deploy] val REST_CLUSTER_SUBMIT_CLASS = classOf[RestSubmissionClientApp].getName()
private[deploy] val STANDALONE_CLUSTER_SUBMIT_CLASS = classOf[ClientApp].getName()
private[deploy] val KUBERNETES_CLUSTER_SUBMIT_CLASS =
"org.apache.spark.deploy.k8s.submit.KubernetesClientApplication"
// scalastyle:off println
private[spark] def printVersionAndExit(): Unit = {
printStream.println("""Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version %s
/_/
""".format(SPARK_VERSION))
printStream.println("Using Scala %s, %s, %s".format(
Properties.versionString, Properties.javaVmName, Properties.javaVersion))
printStream.println("Branch %s".format(SPARK_BRANCH))
printStream.println("Compiled by user %s on %s".format(SPARK_BUILD_USER, SPARK_BUILD_DATE))
printStream.println("Revision %s".format(SPARK_REVISION))
printStream.println("Url %s".format(SPARK_REPO_URL))
printStream.println("Type --help for more information.")
exitFn(0)
}
// scalastyle:on println
override def main(args: Array[String]): Unit = {
def doSubmit(args: Array[String]): Unit = {
// Initialize logging if it hasn't been done yet. Keep track of whether logging needs to
// be reset before the application starts.
val uninitLog = initializeLogIfNecessary(true, silent = true)
val appArgs = new SparkSubmitArguments(args)
val appArgs = parseArguments(args)
if (appArgs.verbose) {
// scalastyle:off println
printStream.println(appArgs)
// scalastyle:on println
logInfo(appArgs.toString)
}
appArgs.action match {
case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
case SparkSubmitAction.KILL => kill(appArgs)
case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
case SparkSubmitAction.PRINT_VERSION => printVersion()
}
}
protected def parseArguments(args: Array[String]): SparkSubmitArguments = {
new SparkSubmitArguments(args)
}
/**
* Kill an existing submission using the REST protocol. Standalone and Mesos cluster mode only.
*/
@ -156,6 +110,24 @@ object SparkSubmit extends CommandLineUtils with Logging {
.requestSubmissionStatus(args.submissionToRequestStatusFor)
}
/** Print version information to the log. */
private def printVersion(): Unit = {
logInfo("""Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version %s
/_/
""".format(SPARK_VERSION))
logInfo("Using Scala %s, %s, %s".format(
Properties.versionString, Properties.javaVmName, Properties.javaVersion))
logInfo(s"Branch $SPARK_BRANCH")
logInfo(s"Compiled by user $SPARK_BUILD_USER on $SPARK_BUILD_DATE")
logInfo(s"Revision $SPARK_REVISION")
logInfo(s"Url $SPARK_REPO_URL")
logInfo("Type --help for more information.")
}
/**
* Submit the application using the provided parameters.
*
@ -185,10 +157,7 @@ object SparkSubmit extends CommandLineUtils with Logging {
// makes the message printed to the output by the JVM not very helpful. Instead,
// detect exceptions with empty stack traces here, and treat them differently.
if (e.getStackTrace().length == 0) {
// scalastyle:off println
printStream.println(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
// scalastyle:on println
exitFn(1)
error(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
} else {
throw e
}
@ -210,14 +179,11 @@ object SparkSubmit extends CommandLineUtils with Logging {
// to use the legacy gateway if the master endpoint turns out to be not a REST server.
if (args.isStandaloneCluster && args.useRest) {
try {
// scalastyle:off println
printStream.println("Running Spark using the REST application submission protocol.")
// scalastyle:on println
doRunMain()
logInfo("Running Spark using the REST application submission protocol.")
} catch {
// Fail over to use the legacy submission gateway
case e: SubmitRestConnectionException =>
printWarning(s"Master endpoint ${args.master} was not a REST server. " +
logWarning(s"Master endpoint ${args.master} was not a REST server. " +
"Falling back to legacy submission gateway instead.")
args.useRest = false
submit(args, false)
@ -245,19 +211,6 @@ object SparkSubmit extends CommandLineUtils with Logging {
args: SparkSubmitArguments,
conf: Option[HadoopConfiguration] = None)
: (Seq[String], Seq[String], SparkConf, String) = {
try {
doPrepareSubmitEnvironment(args, conf)
} catch {
case e: SparkException =>
printErrorAndExit(e.getMessage)
throw e
}
}
private def doPrepareSubmitEnvironment(
args: SparkSubmitArguments,
conf: Option[HadoopConfiguration] = None)
: (Seq[String], Seq[String], SparkConf, String) = {
// Return values
val childArgs = new ArrayBuffer[String]()
val childClasspath = new ArrayBuffer[String]()
@ -268,7 +221,7 @@ object SparkSubmit extends CommandLineUtils with Logging {
val clusterManager: Int = args.master match {
case "yarn" => YARN
case "yarn-client" | "yarn-cluster" =>
printWarning(s"Master ${args.master} is deprecated since 2.0." +
logWarning(s"Master ${args.master} is deprecated since 2.0." +
" Please use master \"yarn\" with specified deploy mode instead.")
YARN
case m if m.startsWith("spark") => STANDALONE
@ -276,7 +229,7 @@ object SparkSubmit extends CommandLineUtils with Logging {
case m if m.startsWith("k8s") => KUBERNETES
case m if m.startsWith("local") => LOCAL
case _ =>
printErrorAndExit("Master must either be yarn or start with spark, mesos, k8s, or local")
error("Master must either be yarn or start with spark, mesos, k8s, or local")
-1
}
@ -284,7 +237,9 @@ object SparkSubmit extends CommandLineUtils with Logging {
var deployMode: Int = args.deployMode match {
case "client" | null => CLIENT
case "cluster" => CLUSTER
case _ => printErrorAndExit("Deploy mode must be either client or cluster"); -1
case _ =>
error("Deploy mode must be either client or cluster")
-1
}
// Because the deprecated way of specifying "yarn-cluster" and "yarn-client" encapsulate both
@ -296,16 +251,16 @@ object SparkSubmit extends CommandLineUtils with Logging {
deployMode = CLUSTER
args.master = "yarn"
case ("yarn-cluster", "client") =>
printErrorAndExit("Client deploy mode is not compatible with master \"yarn-cluster\"")
error("Client deploy mode is not compatible with master \"yarn-cluster\"")
case ("yarn-client", "cluster") =>
printErrorAndExit("Cluster deploy mode is not compatible with master \"yarn-client\"")
error("Cluster deploy mode is not compatible with master \"yarn-client\"")
case (_, mode) =>
args.master = "yarn"
}
// Make sure YARN is included in our build if we're trying to use it
if (!Utils.classIsLoadable(YARN_CLUSTER_SUBMIT_CLASS) && !Utils.isTesting) {
printErrorAndExit(
error(
"Could not load YARN classes. " +
"This copy of Spark may not have been compiled with YARN support.")
}
@ -315,7 +270,7 @@ object SparkSubmit extends CommandLineUtils with Logging {
args.master = Utils.checkAndGetK8sMasterUrl(args.master)
// Make sure KUBERNETES is included in our build if we're trying to use it
if (!Utils.classIsLoadable(KUBERNETES_CLUSTER_SUBMIT_CLASS) && !Utils.isTesting) {
printErrorAndExit(
error(
"Could not load KUBERNETES classes. " +
"This copy of Spark may not have been compiled with KUBERNETES support.")
}
@ -324,23 +279,23 @@ object SparkSubmit extends CommandLineUtils with Logging {
// Fail fast, the following modes are not supported or applicable
(clusterManager, deployMode) match {
case (STANDALONE, CLUSTER) if args.isPython =>
printErrorAndExit("Cluster deploy mode is currently not supported for python " +
error("Cluster deploy mode is currently not supported for python " +
"applications on standalone clusters.")
case (STANDALONE, CLUSTER) if args.isR =>
printErrorAndExit("Cluster deploy mode is currently not supported for R " +
error("Cluster deploy mode is currently not supported for R " +
"applications on standalone clusters.")
case (KUBERNETES, _) if args.isPython =>
printErrorAndExit("Python applications are currently not supported for Kubernetes.")
error("Python applications are currently not supported for Kubernetes.")
case (KUBERNETES, _) if args.isR =>
printErrorAndExit("R applications are currently not supported for Kubernetes.")
error("R applications are currently not supported for Kubernetes.")
case (LOCAL, CLUSTER) =>
printErrorAndExit("Cluster deploy mode is not compatible with master \"local\"")
error("Cluster deploy mode is not compatible with master \"local\"")
case (_, CLUSTER) if isShell(args.primaryResource) =>
printErrorAndExit("Cluster deploy mode is not applicable to Spark shells.")
error("Cluster deploy mode is not applicable to Spark shells.")
case (_, CLUSTER) if isSqlShell(args.mainClass) =>
printErrorAndExit("Cluster deploy mode is not applicable to Spark SQL shell.")
error("Cluster deploy mode is not applicable to Spark SQL shell.")
case (_, CLUSTER) if isThriftServer(args.mainClass) =>
printErrorAndExit("Cluster deploy mode is not applicable to Spark Thrift server.")
error("Cluster deploy mode is not applicable to Spark Thrift server.")
case _ =>
}
@ -493,11 +448,11 @@ object SparkSubmit extends CommandLineUtils with Logging {
if (args.isR && clusterManager == YARN) {
val sparkRPackagePath = RUtils.localSparkRPackagePath
if (sparkRPackagePath.isEmpty) {
printErrorAndExit("SPARK_HOME does not exist for R application in YARN mode.")
error("SPARK_HOME does not exist for R application in YARN mode.")
}
val sparkRPackageFile = new File(sparkRPackagePath.get, SPARKR_PACKAGE_ARCHIVE)
if (!sparkRPackageFile.exists()) {
printErrorAndExit(s"$SPARKR_PACKAGE_ARCHIVE does not exist for R application in YARN mode.")
error(s"$SPARKR_PACKAGE_ARCHIVE does not exist for R application in YARN mode.")
}
val sparkRPackageURI = Utils.resolveURI(sparkRPackageFile.getAbsolutePath).toString
@ -510,7 +465,7 @@ object SparkSubmit extends CommandLineUtils with Logging {
val rPackageFile =
RPackageUtils.zipRLibraries(new File(RUtils.rPackages.get), R_PACKAGE_ARCHIVE)
if (!rPackageFile.exists()) {
printErrorAndExit("Failed to zip all the built R packages.")
error("Failed to zip all the built R packages.")
}
val rPackageURI = Utils.resolveURI(rPackageFile.getAbsolutePath).toString
@ -521,12 +476,12 @@ object SparkSubmit extends CommandLineUtils with Logging {
// TODO: Support distributing R packages with standalone cluster
if (args.isR && clusterManager == STANDALONE && !RUtils.rPackages.isEmpty) {
printErrorAndExit("Distributing R packages with standalone cluster is not supported.")
error("Distributing R packages with standalone cluster is not supported.")
}
// TODO: Support distributing R packages with mesos cluster
if (args.isR && clusterManager == MESOS && !RUtils.rPackages.isEmpty) {
printErrorAndExit("Distributing R packages with mesos cluster is not supported.")
error("Distributing R packages with mesos cluster is not supported.")
}
// If we're running an R app, set the main class to our specific R runner
@ -799,9 +754,7 @@ object SparkSubmit extends CommandLineUtils with Logging {
private def setRMPrincipal(sparkConf: SparkConf): Unit = {
val shortUserName = UserGroupInformation.getCurrentUser.getShortUserName
val key = s"spark.hadoop.${YarnConfiguration.RM_PRINCIPAL}"
// scalastyle:off println
printStream.println(s"Setting ${key} to ${shortUserName}")
// scalastyle:off println
logInfo(s"Setting ${key} to ${shortUserName}")
sparkConf.set(key, shortUserName)
}
@ -817,16 +770,14 @@ object SparkSubmit extends CommandLineUtils with Logging {
sparkConf: SparkConf,
childMainClass: String,
verbose: Boolean): Unit = {
// scalastyle:off println
if (verbose) {
printStream.println(s"Main class:\n$childMainClass")
printStream.println(s"Arguments:\n${childArgs.mkString("\n")}")
logInfo(s"Main class:\n$childMainClass")
logInfo(s"Arguments:\n${childArgs.mkString("\n")}")
// sysProps may contain sensitive information, so redact before printing
printStream.println(s"Spark config:\n${Utils.redact(sparkConf.getAll.toMap).mkString("\n")}")
printStream.println(s"Classpath elements:\n${childClasspath.mkString("\n")}")
printStream.println("\n")
logInfo(s"Spark config:\n${Utils.redact(sparkConf.getAll.toMap).mkString("\n")}")
logInfo(s"Classpath elements:\n${childClasspath.mkString("\n")}")
logInfo("\n")
}
// scalastyle:on println
val loader =
if (sparkConf.get(DRIVER_USER_CLASS_PATH_FIRST)) {
@ -848,23 +799,19 @@ object SparkSubmit extends CommandLineUtils with Logging {
mainClass = Utils.classForName(childMainClass)
} catch {
case e: ClassNotFoundException =>
e.printStackTrace(printStream)
logWarning(s"Failed to load $childMainClass.", e)
if (childMainClass.contains("thriftserver")) {
// scalastyle:off println
printStream.println(s"Failed to load main class $childMainClass.")
printStream.println("You need to build Spark with -Phive and -Phive-thriftserver.")
// scalastyle:on println
logInfo(s"Failed to load main class $childMainClass.")
logInfo("You need to build Spark with -Phive and -Phive-thriftserver.")
}
System.exit(CLASS_NOT_FOUND_EXIT_STATUS)
throw new SparkUserAppException(CLASS_NOT_FOUND_EXIT_STATUS)
case e: NoClassDefFoundError =>
e.printStackTrace(printStream)
logWarning(s"Failed to load $childMainClass: ${e.getMessage()}")
if (e.getMessage.contains("org/apache/hadoop/hive")) {
// scalastyle:off println
printStream.println(s"Failed to load hive class.")
printStream.println("You need to build Spark with -Phive and -Phive-thriftserver.")
// scalastyle:on println
logInfo(s"Failed to load hive class.")
logInfo("You need to build Spark with -Phive and -Phive-thriftserver.")
}
System.exit(CLASS_NOT_FOUND_EXIT_STATUS)
throw new SparkUserAppException(CLASS_NOT_FOUND_EXIT_STATUS)
}
val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
@ -872,7 +819,7 @@ object SparkSubmit extends CommandLineUtils with Logging {
} else {
// SPARK-4170
if (classOf[scala.App].isAssignableFrom(mainClass)) {
printWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.")
logWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.")
}
new JavaMainApplication(mainClass)
}
@ -891,29 +838,90 @@ object SparkSubmit extends CommandLineUtils with Logging {
app.start(childArgs.toArray, sparkConf)
} catch {
case t: Throwable =>
findCause(t) match {
case SparkUserAppException(exitCode) =>
System.exit(exitCode)
case t: Throwable =>
throw t
}
throw findCause(t)
}
}
private[deploy] def addJarToClasspath(localJar: String, loader: MutableURLClassLoader) {
val uri = Utils.resolveURI(localJar)
uri.getScheme match {
case "file" | "local" =>
val file = new File(uri.getPath)
if (file.exists()) {
loader.addURL(file.toURI.toURL)
} else {
printWarning(s"Local jar $file does not exist, skipping.")
/** Throw a SparkException with the given error message. */
private def error(msg: String): Unit = throw new SparkException(msg)
}
/**
* This entry point is used by the launcher library to start in-process Spark applications.
*/
private[spark] object InProcessSparkSubmit {
def main(args: Array[String]): Unit = {
val submit = new SparkSubmit()
submit.doSubmit(args)
}
}
object SparkSubmit extends CommandLineUtils with Logging {
// Cluster managers
private val YARN = 1
private val STANDALONE = 2
private val MESOS = 4
private val LOCAL = 8
private val KUBERNETES = 16
private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL | KUBERNETES
// Deploy modes
private val CLIENT = 1
private val CLUSTER = 2
private val ALL_DEPLOY_MODES = CLIENT | CLUSTER
// Special primary resource names that represent shells rather than application jars.
private val SPARK_SHELL = "spark-shell"
private val PYSPARK_SHELL = "pyspark-shell"
private val SPARKR_SHELL = "sparkr-shell"
private val SPARKR_PACKAGE_ARCHIVE = "sparkr.zip"
private val R_PACKAGE_ARCHIVE = "rpkg.zip"
private val CLASS_NOT_FOUND_EXIT_STATUS = 101
// Following constants are visible for testing.
private[deploy] val YARN_CLUSTER_SUBMIT_CLASS =
"org.apache.spark.deploy.yarn.YarnClusterApplication"
private[deploy] val REST_CLUSTER_SUBMIT_CLASS = classOf[RestSubmissionClientApp].getName()
private[deploy] val STANDALONE_CLUSTER_SUBMIT_CLASS = classOf[ClientApp].getName()
private[deploy] val KUBERNETES_CLUSTER_SUBMIT_CLASS =
"org.apache.spark.deploy.k8s.submit.KubernetesClientApplication"
override def main(args: Array[String]): Unit = {
val submit = new SparkSubmit() {
self =>
override protected def parseArguments(args: Array[String]): SparkSubmitArguments = {
new SparkSubmitArguments(args) {
override protected def logInfo(msg: => String): Unit = self.logInfo(msg)
override protected def logWarning(msg: => String): Unit = self.logWarning(msg)
}
case _ =>
printWarning(s"Skip remote jar $uri.")
}
override protected def logInfo(msg: => String): Unit = printMessage(msg)
override protected def logWarning(msg: => String): Unit = printMessage(s"Warning: $msg")
override def doSubmit(args: Array[String]): Unit = {
try {
super.doSubmit(args)
} catch {
case e: SparkUserAppException =>
exitFn(e.exitCode)
case e: SparkException =>
printErrorAndExit(e.getMessage())
}
}
}
submit.doSubmit(args)
}
/**
@ -962,17 +970,6 @@ object SparkSubmit extends CommandLineUtils with Logging {
res == SparkLauncher.NO_RESOURCE
}
/**
* Merge a sequence of comma-separated file lists, some of which may be null to indicate
* no files, into a single comma-separated string.
*/
private[deploy] def mergeFileLists(lists: String*): String = {
val merged = lists.filterNot(StringUtils.isBlank)
.flatMap(_.split(","))
.mkString(",")
if (merged == "") null else merged
}
}
/** Provides utility functions to be used inside SparkSubmit. */
@ -1000,12 +997,12 @@ private[spark] object SparkSubmitUtils {
override def toString: String = s"$groupId:$artifactId:$version"
}
/**
* Extracts maven coordinates from a comma-delimited string. Coordinates should be provided
* in the format `groupId:artifactId:version` or `groupId/artifactId:version`.
* @param coordinates Comma-delimited string of maven coordinates
* @return Sequence of Maven coordinates
*/
/**
* Extracts maven coordinates from a comma-delimited string. Coordinates should be provided
* in the format `groupId:artifactId:version` or `groupId/artifactId:version`.
* @param coordinates Comma-delimited string of maven coordinates
* @return Sequence of Maven coordinates
*/
def extractMavenCoordinates(coordinates: String): Seq[MavenCoordinate] = {
coordinates.split(",").map { p =>
val splits = p.replace("/", ":").split(":")
@ -1304,6 +1301,13 @@ private[spark] object SparkSubmitUtils {
rule
}
def parseSparkConfProperty(pair: String): (String, String) = {
pair.split("=", 2).toSeq match {
case Seq(k, v) => (k, v)
case _ => throw new SparkException(s"Spark config without '=': $pair")
}
}
}
/**

View file

@ -29,7 +29,9 @@ import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.io.Source
import scala.util.Try
import org.apache.spark.{SparkException, SparkUserAppException}
import org.apache.spark.deploy.SparkSubmitAction._
import org.apache.spark.internal.Logging
import org.apache.spark.launcher.SparkSubmitArgumentsParser
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.util.Utils
@ -40,7 +42,7 @@ import org.apache.spark.util.Utils
* The env argument is used for testing.
*/
private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, String] = sys.env)
extends SparkSubmitArgumentsParser {
extends SparkSubmitArgumentsParser with Logging {
var master: String = null
var deployMode: String = null
var executorMemory: String = null
@ -85,8 +87,9 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
/** Default properties present in the currently defined defaults file. */
lazy val defaultSparkProperties: HashMap[String, String] = {
val defaultProperties = new HashMap[String, String]()
// scalastyle:off println
if (verbose) SparkSubmit.printStream.println(s"Using properties file: $propertiesFile")
if (verbose) {
logInfo(s"Using properties file: $propertiesFile")
}
Option(propertiesFile).foreach { filename =>
val properties = Utils.getPropertiesFromFile(filename)
properties.foreach { case (k, v) =>
@ -95,21 +98,16 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
// Property files may contain sensitive information, so redact before printing
if (verbose) {
Utils.redact(properties).foreach { case (k, v) =>
SparkSubmit.printStream.println(s"Adding default property: $k=$v")
logInfo(s"Adding default property: $k=$v")
}
}
}
// scalastyle:on println
defaultProperties
}
// Set parameters from command line arguments
try {
parse(args.asJava)
} catch {
case e: IllegalArgumentException =>
SparkSubmit.printErrorAndExit(e.getMessage())
}
parse(args.asJava)
// Populate `sparkProperties` map from properties file
mergeDefaultSparkProperties()
// Remove keys that don't start with "spark." from `sparkProperties`.
@ -141,7 +139,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
sparkProperties.foreach { case (k, v) =>
if (!k.startsWith("spark.")) {
sparkProperties -= k
SparkSubmit.printWarning(s"Ignoring non-spark config property: $k=$v")
logWarning(s"Ignoring non-spark config property: $k=$v")
}
}
}
@ -215,10 +213,10 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
}
} catch {
case _: Exception =>
SparkSubmit.printErrorAndExit(s"Cannot load main class from JAR $primaryResource")
error(s"Cannot load main class from JAR $primaryResource")
}
case _ =>
SparkSubmit.printErrorAndExit(
error(
s"Cannot load main class from JAR $primaryResource with URI $uriScheme. " +
"Please specify a class through --class.")
}
@ -248,6 +246,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
case SUBMIT => validateSubmitArguments()
case KILL => validateKillArguments()
case REQUEST_STATUS => validateStatusRequestArguments()
case PRINT_VERSION =>
}
}
@ -256,62 +255,61 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
printUsageAndExit(-1)
}
if (primaryResource == null) {
SparkSubmit.printErrorAndExit("Must specify a primary resource (JAR or Python or R file)")
error("Must specify a primary resource (JAR or Python or R file)")
}
if (mainClass == null && SparkSubmit.isUserJar(primaryResource)) {
SparkSubmit.printErrorAndExit("No main class set in JAR; please specify one with --class")
error("No main class set in JAR; please specify one with --class")
}
if (driverMemory != null
&& Try(JavaUtils.byteStringAsBytes(driverMemory)).getOrElse(-1L) <= 0) {
SparkSubmit.printErrorAndExit("Driver Memory must be a positive number")
error("Driver memory must be a positive number")
}
if (executorMemory != null
&& Try(JavaUtils.byteStringAsBytes(executorMemory)).getOrElse(-1L) <= 0) {
SparkSubmit.printErrorAndExit("Executor Memory cores must be a positive number")
error("Executor memory must be a positive number")
}
if (executorCores != null && Try(executorCores.toInt).getOrElse(-1) <= 0) {
SparkSubmit.printErrorAndExit("Executor cores must be a positive number")
error("Executor cores must be a positive number")
}
if (totalExecutorCores != null && Try(totalExecutorCores.toInt).getOrElse(-1) <= 0) {
SparkSubmit.printErrorAndExit("Total executor cores must be a positive number")
error("Total executor cores must be a positive number")
}
if (numExecutors != null && Try(numExecutors.toInt).getOrElse(-1) <= 0) {
SparkSubmit.printErrorAndExit("Number of executors must be a positive number")
error("Number of executors must be a positive number")
}
if (pyFiles != null && !isPython) {
SparkSubmit.printErrorAndExit("--py-files given but primary resource is not a Python script")
error("--py-files given but primary resource is not a Python script")
}
if (master.startsWith("yarn")) {
val hasHadoopEnv = env.contains("HADOOP_CONF_DIR") || env.contains("YARN_CONF_DIR")
if (!hasHadoopEnv && !Utils.isTesting) {
throw new Exception(s"When running with master '$master' " +
error(s"When running with master '$master' " +
"either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.")
}
}
if (proxyUser != null && principal != null) {
SparkSubmit.printErrorAndExit("Only one of --proxy-user or --principal can be provided.")
error("Only one of --proxy-user or --principal can be provided.")
}
}
private def validateKillArguments(): Unit = {
if (!master.startsWith("spark://") && !master.startsWith("mesos://")) {
SparkSubmit.printErrorAndExit(
"Killing submissions is only supported in standalone or Mesos mode!")
error("Killing submissions is only supported in standalone or Mesos mode!")
}
if (submissionToKill == null) {
SparkSubmit.printErrorAndExit("Please specify a submission to kill.")
error("Please specify a submission to kill.")
}
}
private def validateStatusRequestArguments(): Unit = {
if (!master.startsWith("spark://") && !master.startsWith("mesos://")) {
SparkSubmit.printErrorAndExit(
error(
"Requesting submission statuses is only supported in standalone or Mesos mode!")
}
if (submissionToRequestStatusFor == null) {
SparkSubmit.printErrorAndExit("Please specify a submission to request status for.")
error("Please specify a submission to request status for.")
}
}
@ -368,7 +366,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
case DEPLOY_MODE =>
if (value != "client" && value != "cluster") {
SparkSubmit.printErrorAndExit("--deploy-mode must be either \"client\" or \"cluster\"")
error("--deploy-mode must be either \"client\" or \"cluster\"")
}
deployMode = value
@ -405,14 +403,14 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
case KILL_SUBMISSION =>
submissionToKill = value
if (action != null) {
SparkSubmit.printErrorAndExit(s"Action cannot be both $action and $KILL.")
error(s"Action cannot be both $action and $KILL.")
}
action = KILL
case STATUS =>
submissionToRequestStatusFor = value
if (action != null) {
SparkSubmit.printErrorAndExit(s"Action cannot be both $action and $REQUEST_STATUS.")
error(s"Action cannot be both $action and $REQUEST_STATUS.")
}
action = REQUEST_STATUS
@ -444,7 +442,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
repositories = value
case CONF =>
val (confName, confValue) = SparkSubmit.parseSparkConfProperty(value)
val (confName, confValue) = SparkSubmitUtils.parseSparkConfProperty(value)
sparkProperties(confName) = confValue
case PROXY_USER =>
@ -463,15 +461,15 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
verbose = true
case VERSION =>
SparkSubmit.printVersionAndExit()
action = SparkSubmitAction.PRINT_VERSION
case USAGE_ERROR =>
printUsageAndExit(1)
case _ =>
throw new IllegalArgumentException(s"Unexpected argument '$opt'.")
error(s"Unexpected argument '$opt'.")
}
true
action != SparkSubmitAction.PRINT_VERSION
}
/**
@ -482,7 +480,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
*/
override protected def handleUnknown(opt: String): Boolean = {
if (opt.startsWith("-")) {
SparkSubmit.printErrorAndExit(s"Unrecognized option '$opt'.")
error(s"Unrecognized option '$opt'.")
}
primaryResource =
@ -501,20 +499,18 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
}
private def printUsageAndExit(exitCode: Int, unknownParam: Any = null): Unit = {
// scalastyle:off println
val outStream = SparkSubmit.printStream
if (unknownParam != null) {
outStream.println("Unknown/unsupported param " + unknownParam)
logInfo("Unknown/unsupported param " + unknownParam)
}
val command = sys.env.get("_SPARK_CMD_USAGE").getOrElse(
"""Usage: spark-submit [options] <app jar | python file | R file> [app arguments]
|Usage: spark-submit --kill [submission ID] --master [spark://...]
|Usage: spark-submit --status [submission ID] --master [spark://...]
|Usage: spark-submit run-example [options] example-class [example args]""".stripMargin)
outStream.println(command)
logInfo(command)
val mem_mb = Utils.DEFAULT_DRIVER_MEM_MB
outStream.println(
logInfo(
s"""
|Options:
| --master MASTER_URL spark://host:port, mesos://host:port, yarn,
@ -596,12 +592,11 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
)
if (SparkSubmit.isSqlShell(mainClass)) {
outStream.println("CLI options:")
outStream.println(getSqlShellOptions())
logInfo("CLI options:")
logInfo(getSqlShellOptions())
}
// scalastyle:on println
SparkSubmit.exitFn(exitCode)
throw new SparkUserAppException(exitCode)
}
/**
@ -655,4 +650,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
System.setErr(currentErr)
}
}
private def error(msg: String): Unit = throw new SparkException(msg)
}

View file

@ -25,7 +25,7 @@ import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.deploy.{DependencyUtils, SparkHadoopUtil, SparkSubmit}
import org.apache.spark.internal.Logging
import org.apache.spark.rpc.RpcEnv
import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils}
import org.apache.spark.util._
/**
* Utility object for launching driver programs such that they share fate with the Worker process.
@ -93,7 +93,7 @@ object DriverWrapper extends Logging {
val jars = {
val jarsProp = sys.props.get("spark.jars").orNull
if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
SparkSubmit.mergeFileLists(jarsProp, resolvedMavenCoordinates)
DependencyUtils.mergeFileLists(jarsProp, resolvedMavenCoordinates)
} else {
jarsProp
}

View file

@ -33,23 +33,13 @@ private[spark] trait CommandLineUtils {
private[spark] var printStream: PrintStream = System.err
// scalastyle:off println
private[spark] def printWarning(str: String): Unit = printStream.println("Warning: " + str)
private[spark] def printErrorAndExit(str: String): Unit = {
printStream.println("Error: " + str)
printStream.println("Run with --help for usage help or --verbose for debug output")
exitFn(1)
}
private[spark] def printMessage(str: String): Unit = printStream.println(str)
// scalastyle:on println
private[spark] def parseSparkConfProperty(pair: String): (String, String) = {
pair.split("=", 2).toSeq match {
case Seq(k, v) => (k, v)
case _ => printErrorAndExit(s"Spark config without '=': $pair")
throw new SparkException(s"Spark config without '=': $pair")
}
private[spark] def printErrorAndExit(str: String): Unit = {
printMessage("Error: " + str)
printMessage("Run with --help for usage help or --verbose for debug output")
exitFn(1)
}
def main(args: Array[String]): Unit

View file

@ -109,7 +109,7 @@ public class SparkLauncherSuite extends BaseSuite {
.addSparkArg(opts.CONF,
String.format("%s=-Dfoo=ShouldBeOverriddenBelow", SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS))
.setConf(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS,
"-Dfoo=bar -Dtest.appender=childproc")
"-Dfoo=bar -Dtest.appender=console")
.setConf(SparkLauncher.DRIVER_EXTRA_CLASSPATH, System.getProperty("java.class.path"))
.addSparkArg(opts.CLASS, "ShouldBeOverriddenBelow")
.setMainClass(SparkLauncherTestApp.class.getName())
@ -192,6 +192,41 @@ public class SparkLauncherSuite extends BaseSuite {
}
}
@Test
public void testInProcessLauncherDoesNotKillJvm() throws Exception {
SparkSubmitOptionParser opts = new SparkSubmitOptionParser();
List<String[]> wrongArgs = Arrays.asList(
new String[] { "--unknown" },
new String[] { opts.DEPLOY_MODE, "invalid" });
for (String[] args : wrongArgs) {
InProcessLauncher launcher = new InProcessLauncher()
.setAppResource(SparkLauncher.NO_RESOURCE);
switch (args.length) {
case 2:
launcher.addSparkArg(args[0], args[1]);
break;
case 1:
launcher.addSparkArg(args[0]);
break;
default:
fail("FIXME: invalid test.");
}
SparkAppHandle handle = launcher.startApplication();
waitFor(handle);
assertEquals(SparkAppHandle.State.FAILED, handle.getState());
}
// Run --version, which is useless as a use case, but should succeed and not exit the JVM.
// The expected state is "LOST" since "--version" doesn't report state back to the handle.
SparkAppHandle handle = new InProcessLauncher().addSparkArg(opts.VERSION).startApplication();
waitFor(handle);
assertEquals(SparkAppHandle.State.LOST, handle.getState());
}
public static class SparkLauncherTestApp {
public static void main(String[] args) throws Exception {

View file

@ -42,6 +42,7 @@ import org.apache.spark.deploy.SparkSubmit._
import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.scheduler.EventLoggingListener
import org.apache.spark.util.{CommandLineUtils, ResetSystemProperties, Utils}
@ -109,6 +110,8 @@ class SparkSubmitSuite
private val emptyIvySettings = File.createTempFile("ivy", ".xml")
FileUtils.write(emptyIvySettings, "<ivysettings />", StandardCharsets.UTF_8)
private val submit = new SparkSubmit()
override def beforeEach() {
super.beforeEach()
}
@ -128,13 +131,16 @@ class SparkSubmitSuite
}
test("handle binary specified but not class") {
testPrematureExit(Array("foo.jar"), "No main class")
val jar = TestUtils.createJarWithClasses(Seq("SparkSubmitClassA"))
testPrematureExit(Array(jar.toString()), "No main class")
}
test("handles arguments with --key=val") {
val clArgs = Seq(
"--jars=one.jar,two.jar,three.jar",
"--name=myApp")
"--name=myApp",
"--class=org.FooBar",
SparkLauncher.NO_RESOURCE)
val appArgs = new SparkSubmitArguments(clArgs)
appArgs.jars should include regex (".*one.jar,.*two.jar,.*three.jar")
appArgs.name should be ("myApp")
@ -182,7 +188,7 @@ class SparkSubmitSuite
"thejar.jar"
)
val appArgs = new SparkSubmitArguments(clArgs)
val (_, _, conf, _) = prepareSubmitEnvironment(appArgs)
val (_, _, conf, _) = submit.prepareSubmitEnvironment(appArgs)
appArgs.deployMode should be ("client")
conf.get("spark.submit.deployMode") should be ("client")
@ -192,11 +198,11 @@ class SparkSubmitSuite
"--master", "yarn",
"--deploy-mode", "cluster",
"--conf", "spark.submit.deployMode=client",
"-class", "org.SomeClass",
"--class", "org.SomeClass",
"thejar.jar"
)
val appArgs1 = new SparkSubmitArguments(clArgs1)
val (_, _, conf1, _) = prepareSubmitEnvironment(appArgs1)
val (_, _, conf1, _) = submit.prepareSubmitEnvironment(appArgs1)
appArgs1.deployMode should be ("cluster")
conf1.get("spark.submit.deployMode") should be ("cluster")
@ -210,7 +216,7 @@ class SparkSubmitSuite
val appArgs2 = new SparkSubmitArguments(clArgs2)
appArgs2.deployMode should be (null)
val (_, _, conf2, _) = prepareSubmitEnvironment(appArgs2)
val (_, _, conf2, _) = submit.prepareSubmitEnvironment(appArgs2)
appArgs2.deployMode should be ("client")
conf2.get("spark.submit.deployMode") should be ("client")
}
@ -233,7 +239,7 @@ class SparkSubmitSuite
"thejar.jar",
"arg1", "arg2")
val appArgs = new SparkSubmitArguments(clArgs)
val (childArgs, classpath, conf, mainClass) = prepareSubmitEnvironment(appArgs)
val (childArgs, classpath, conf, mainClass) = submit.prepareSubmitEnvironment(appArgs)
val childArgsStr = childArgs.mkString(" ")
childArgsStr should include ("--class org.SomeClass")
childArgsStr should include ("--arg arg1 --arg arg2")
@ -276,7 +282,7 @@ class SparkSubmitSuite
"thejar.jar",
"arg1", "arg2")
val appArgs = new SparkSubmitArguments(clArgs)
val (childArgs, classpath, conf, mainClass) = prepareSubmitEnvironment(appArgs)
val (childArgs, classpath, conf, mainClass) = submit.prepareSubmitEnvironment(appArgs)
childArgs.mkString(" ") should be ("arg1 arg2")
mainClass should be ("org.SomeClass")
classpath should have length (4)
@ -322,7 +328,7 @@ class SparkSubmitSuite
"arg1", "arg2")
val appArgs = new SparkSubmitArguments(clArgs)
appArgs.useRest = useRest
val (childArgs, classpath, conf, mainClass) = prepareSubmitEnvironment(appArgs)
val (childArgs, classpath, conf, mainClass) = submit.prepareSubmitEnvironment(appArgs)
val childArgsStr = childArgs.mkString(" ")
if (useRest) {
childArgsStr should endWith ("thejar.jar org.SomeClass arg1 arg2")
@ -359,7 +365,7 @@ class SparkSubmitSuite
"thejar.jar",
"arg1", "arg2")
val appArgs = new SparkSubmitArguments(clArgs)
val (childArgs, classpath, conf, mainClass) = prepareSubmitEnvironment(appArgs)
val (childArgs, classpath, conf, mainClass) = submit.prepareSubmitEnvironment(appArgs)
childArgs.mkString(" ") should be ("arg1 arg2")
mainClass should be ("org.SomeClass")
classpath should have length (1)
@ -381,7 +387,7 @@ class SparkSubmitSuite
"thejar.jar",
"arg1", "arg2")
val appArgs = new SparkSubmitArguments(clArgs)
val (childArgs, classpath, conf, mainClass) = prepareSubmitEnvironment(appArgs)
val (childArgs, classpath, conf, mainClass) = submit.prepareSubmitEnvironment(appArgs)
childArgs.mkString(" ") should be ("arg1 arg2")
mainClass should be ("org.SomeClass")
classpath should have length (1)
@ -403,7 +409,7 @@ class SparkSubmitSuite
"/home/thejar.jar",
"arg1")
val appArgs = new SparkSubmitArguments(clArgs)
val (childArgs, classpath, conf, mainClass) = prepareSubmitEnvironment(appArgs)
val (childArgs, classpath, conf, mainClass) = submit.prepareSubmitEnvironment(appArgs)
val childArgsMap = childArgs.grouped(2).map(a => a(0) -> a(1)).toMap
childArgsMap.get("--primary-java-resource") should be (Some("file:/home/thejar.jar"))
@ -428,7 +434,7 @@ class SparkSubmitSuite
"thejar.jar",
"arg1", "arg2")
val appArgs = new SparkSubmitArguments(clArgs)
val (_, _, conf, mainClass) = prepareSubmitEnvironment(appArgs)
val (_, _, conf, mainClass) = submit.prepareSubmitEnvironment(appArgs)
conf.get("spark.executor.memory") should be ("5g")
conf.get("spark.master") should be ("yarn")
conf.get("spark.submit.deployMode") should be ("cluster")
@ -441,12 +447,12 @@ class SparkSubmitSuite
val clArgs1 = Seq("--class", "org.apache.spark.repl.Main", "spark-shell")
val appArgs1 = new SparkSubmitArguments(clArgs1)
val (_, _, conf1, _) = prepareSubmitEnvironment(appArgs1)
val (_, _, conf1, _) = submit.prepareSubmitEnvironment(appArgs1)
conf1.get(UI_SHOW_CONSOLE_PROGRESS) should be (true)
val clArgs2 = Seq("--class", "org.SomeClass", "thejar.jar")
val appArgs2 = new SparkSubmitArguments(clArgs2)
val (_, _, conf2, _) = prepareSubmitEnvironment(appArgs2)
val (_, _, conf2, _) = submit.prepareSubmitEnvironment(appArgs2)
assert(!conf2.contains(UI_SHOW_CONSOLE_PROGRESS))
}
@ -625,7 +631,7 @@ class SparkSubmitSuite
"--files", files,
"thejar.jar")
val appArgs = new SparkSubmitArguments(clArgs)
val (_, _, conf, _) = SparkSubmit.prepareSubmitEnvironment(appArgs)
val (_, _, conf, _) = submit.prepareSubmitEnvironment(appArgs)
appArgs.jars should be (Utils.resolveURIs(jars))
appArgs.files should be (Utils.resolveURIs(files))
conf.get("spark.jars") should be (Utils.resolveURIs(jars + ",thejar.jar"))
@ -640,7 +646,7 @@ class SparkSubmitSuite
"thejar.jar"
)
val appArgs2 = new SparkSubmitArguments(clArgs2)
val (_, _, conf2, _) = SparkSubmit.prepareSubmitEnvironment(appArgs2)
val (_, _, conf2, _) = submit.prepareSubmitEnvironment(appArgs2)
appArgs2.files should be (Utils.resolveURIs(files))
appArgs2.archives should fullyMatch regex ("file:/archive1,file:.*#archive3")
conf2.get("spark.yarn.dist.files") should be (Utils.resolveURIs(files))
@ -656,7 +662,7 @@ class SparkSubmitSuite
"mister.py"
)
val appArgs3 = new SparkSubmitArguments(clArgs3)
val (_, _, conf3, _) = SparkSubmit.prepareSubmitEnvironment(appArgs3)
val (_, _, conf3, _) = submit.prepareSubmitEnvironment(appArgs3)
appArgs3.pyFiles should be (Utils.resolveURIs(pyFiles))
conf3.get("spark.submit.pyFiles") should be (
PythonRunner.formatPaths(Utils.resolveURIs(pyFiles)).mkString(","))
@ -708,7 +714,7 @@ class SparkSubmitSuite
"thejar.jar"
)
val appArgs = new SparkSubmitArguments(clArgs)
val (_, _, conf, _) = SparkSubmit.prepareSubmitEnvironment(appArgs)
val (_, _, conf, _) = submit.prepareSubmitEnvironment(appArgs)
conf.get("spark.jars") should be(Utils.resolveURIs(jars + ",thejar.jar"))
conf.get("spark.files") should be(Utils.resolveURIs(files))
@ -725,7 +731,7 @@ class SparkSubmitSuite
"thejar.jar"
)
val appArgs2 = new SparkSubmitArguments(clArgs2)
val (_, _, conf2, _) = SparkSubmit.prepareSubmitEnvironment(appArgs2)
val (_, _, conf2, _) = submit.prepareSubmitEnvironment(appArgs2)
conf2.get("spark.yarn.dist.files") should be(Utils.resolveURIs(files))
conf2.get("spark.yarn.dist.archives") should be(Utils.resolveURIs(archives))
@ -740,7 +746,7 @@ class SparkSubmitSuite
"mister.py"
)
val appArgs3 = new SparkSubmitArguments(clArgs3)
val (_, _, conf3, _) = SparkSubmit.prepareSubmitEnvironment(appArgs3)
val (_, _, conf3, _) = submit.prepareSubmitEnvironment(appArgs3)
conf3.get("spark.submit.pyFiles") should be(
PythonRunner.formatPaths(Utils.resolveURIs(pyFiles)).mkString(","))
@ -757,7 +763,7 @@ class SparkSubmitSuite
"hdfs:///tmp/mister.py"
)
val appArgs4 = new SparkSubmitArguments(clArgs4)
val (_, _, conf4, _) = SparkSubmit.prepareSubmitEnvironment(appArgs4)
val (_, _, conf4, _) = submit.prepareSubmitEnvironment(appArgs4)
// Should not format python path for yarn cluster mode
conf4.get("spark.submit.pyFiles") should be(Utils.resolveURIs(remotePyFiles))
}
@ -778,17 +784,17 @@ class SparkSubmitSuite
}
test("SPARK_CONF_DIR overrides spark-defaults.conf") {
forConfDir(Map("spark.executor.memory" -> "2.3g")) { path =>
forConfDir(Map("spark.executor.memory" -> "3g")) { path =>
val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
val args = Seq(
"--class", SimpleApplicationTest.getClass.getName.stripSuffix("$"),
"--name", "testApp",
"--master", "local",
unusedJar.toString)
val appArgs = new SparkSubmitArguments(args, Map("SPARK_CONF_DIR" -> path))
val appArgs = new SparkSubmitArguments(args, env = Map("SPARK_CONF_DIR" -> path))
assert(appArgs.propertiesFile != null)
assert(appArgs.propertiesFile.startsWith(path))
appArgs.executorMemory should be ("2.3g")
appArgs.executorMemory should be ("3g")
}
}
@ -809,6 +815,9 @@ class SparkSubmitSuite
val archive1 = File.createTempFile("archive1", ".zip", tmpArchiveDir)
val archive2 = File.createTempFile("archive2", ".zip", tmpArchiveDir)
val tempPyFile = File.createTempFile("tmpApp", ".py")
tempPyFile.deleteOnExit()
val args = Seq(
"--class", UserClasspathFirstTest.getClass.getName.stripPrefix("$"),
"--name", "testApp",
@ -818,10 +827,10 @@ class SparkSubmitSuite
"--files", s"${tmpFileDir.getAbsolutePath}/tmpFile*",
"--py-files", s"${tmpPyFileDir.getAbsolutePath}/tmpPy*",
"--archives", s"${tmpArchiveDir.getAbsolutePath}/*.zip",
jar2.toString)
tempPyFile.toURI().toString())
val appArgs = new SparkSubmitArguments(args)
val (_, _, conf, _) = SparkSubmit.prepareSubmitEnvironment(appArgs)
val (_, _, conf, _) = submit.prepareSubmitEnvironment(appArgs)
conf.get("spark.yarn.dist.jars").split(",").toSet should be
(Set(jar1.toURI.toString, jar2.toURI.toString))
conf.get("spark.yarn.dist.files").split(",").toSet should be
@ -947,7 +956,7 @@ class SparkSubmitSuite
)
val appArgs = new SparkSubmitArguments(args)
val (_, _, conf, _) = SparkSubmit.prepareSubmitEnvironment(appArgs, Some(hadoopConf))
val (_, _, conf, _) = submit.prepareSubmitEnvironment(appArgs, conf = Some(hadoopConf))
// All the resources should still be remote paths, so that YARN client will not upload again.
conf.get("spark.yarn.dist.jars") should be (tmpJarPath)
@ -1007,7 +1016,7 @@ class SparkSubmitSuite
) ++ forceDownloadArgs ++ Seq(s"s3a://$mainResource")
val appArgs = new SparkSubmitArguments(args)
val (_, _, conf, _) = SparkSubmit.prepareSubmitEnvironment(appArgs, Some(hadoopConf))
val (_, _, conf, _) = submit.prepareSubmitEnvironment(appArgs, conf = Some(hadoopConf))
val jars = conf.get("spark.yarn.dist.jars").split(",").toSet
@ -1058,7 +1067,7 @@ class SparkSubmitSuite
"hello")
val exception = intercept[SparkException] {
SparkSubmit.main(args)
submit.doSubmit(args)
}
assert(exception.getMessage() === "hello")

View file

@ -445,7 +445,7 @@ class StandaloneRestSubmitSuite extends SparkFunSuite with BeforeAndAfterEach {
"--class", mainClass,
mainJar) ++ appArgs
val args = new SparkSubmitArguments(commandLineArgs)
val (_, _, sparkConf, _) = SparkSubmit.prepareSubmitEnvironment(args)
val (_, _, sparkConf, _) = new SparkSubmit().prepareSubmitEnvironment(args)
new RestSubmissionClient("spark://host:port").constructSubmitRequest(
mainJar, mainClass, appArgs, sparkConf.getAll.toMap, Map.empty)
}

View file

@ -139,7 +139,7 @@ public abstract class AbstractLauncher<T extends AbstractLauncher> {
public T addSparkArg(String arg) {
SparkSubmitOptionParser validator = new ArgumentValidator(false);
validator.parse(Arrays.asList(arg));
builder.sparkArgs.add(arg);
builder.userArgs.add(arg);
return self();
}
@ -187,8 +187,8 @@ public abstract class AbstractLauncher<T extends AbstractLauncher> {
}
} else {
validator.parse(Arrays.asList(name, value));
builder.sparkArgs.add(name);
builder.sparkArgs.add(value);
builder.userArgs.add(name);
builder.userArgs.add(value);
}
return self();
}

View file

@ -89,10 +89,18 @@ public class InProcessLauncher extends AbstractLauncher<InProcessLauncher> {
}
Class<?> sparkSubmit;
// SPARK-22941: first try the new SparkSubmit interface that has better error handling,
// but fall back to the old interface in case someone is mixing & matching launcher and
// Spark versions.
try {
sparkSubmit = cl.loadClass("org.apache.spark.deploy.SparkSubmit");
} catch (Exception e) {
throw new IOException("Cannot find SparkSubmit; make sure necessary jars are available.", e);
sparkSubmit = cl.loadClass("org.apache.spark.deploy.InProcessSparkSubmit");
} catch (Exception e1) {
try {
sparkSubmit = cl.loadClass("org.apache.spark.deploy.SparkSubmit");
} catch (Exception e2) {
throw new IOException("Cannot find SparkSubmit; make sure necessary jars are available.",
e2);
}
}
Method main;

View file

@ -88,8 +88,9 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
SparkLauncher.NO_RESOURCE);
}
final List<String> sparkArgs;
private final boolean isAppResourceReq;
final List<String> userArgs;
private final List<String> parsedArgs;
private final boolean requiresAppResource;
private final boolean isExample;
/**
@ -99,17 +100,27 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
*/
private boolean allowsMixedArguments;
/**
* This constructor is used when creating a user-configurable launcher. It allows the
* spark-submit argument list to be modified after creation.
*/
SparkSubmitCommandBuilder() {
this.sparkArgs = new ArrayList<>();
this.isAppResourceReq = true;
this.requiresAppResource = true;
this.isExample = false;
this.parsedArgs = new ArrayList<>();
this.userArgs = new ArrayList<>();
}
/**
* This constructor is used when invoking spark-submit; it parses and validates arguments
* provided by the user on the command line.
*/
SparkSubmitCommandBuilder(List<String> args) {
this.allowsMixedArguments = false;
this.sparkArgs = new ArrayList<>();
this.parsedArgs = new ArrayList<>();
boolean isExample = false;
List<String> submitArgs = args;
this.userArgs = Collections.emptyList();
if (args.size() > 0) {
switch (args.get(0)) {
@ -131,21 +142,21 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
}
this.isExample = isExample;
OptionParser parser = new OptionParser();
OptionParser parser = new OptionParser(true);
parser.parse(submitArgs);
this.isAppResourceReq = parser.isAppResourceReq;
} else {
this.requiresAppResource = parser.requiresAppResource;
} else {
this.isExample = isExample;
this.isAppResourceReq = false;
this.requiresAppResource = false;
}
}
@Override
public List<String> buildCommand(Map<String, String> env)
throws IOException, IllegalArgumentException {
if (PYSPARK_SHELL.equals(appResource) && isAppResourceReq) {
if (PYSPARK_SHELL.equals(appResource) && requiresAppResource) {
return buildPySparkShellCommand(env);
} else if (SPARKR_SHELL.equals(appResource) && isAppResourceReq) {
} else if (SPARKR_SHELL.equals(appResource) && requiresAppResource) {
return buildSparkRCommand(env);
} else {
return buildSparkSubmitCommand(env);
@ -154,9 +165,19 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
List<String> buildSparkSubmitArgs() {
List<String> args = new ArrayList<>();
SparkSubmitOptionParser parser = new SparkSubmitOptionParser();
OptionParser parser = new OptionParser(false);
final boolean requiresAppResource;
if (!allowsMixedArguments && isAppResourceReq) {
// If the user args array is not empty, we need to parse it to detect exactly what
// the user is trying to run, so that checks below are correct.
if (!userArgs.isEmpty()) {
parser.parse(userArgs);
requiresAppResource = parser.requiresAppResource;
} else {
requiresAppResource = this.requiresAppResource;
}
if (!allowsMixedArguments && requiresAppResource) {
checkArgument(appResource != null, "Missing application resource.");
}
@ -208,15 +229,16 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
args.add(join(",", pyFiles));
}
if (isAppResourceReq) {
checkArgument(!isExample || mainClass != null, "Missing example class name.");
if (isExample) {
checkArgument(mainClass != null, "Missing example class name.");
}
if (mainClass != null) {
args.add(parser.CLASS);
args.add(mainClass);
}
args.addAll(sparkArgs);
args.addAll(parsedArgs);
if (appResource != null) {
args.add(appResource);
}
@ -399,7 +421,12 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
private class OptionParser extends SparkSubmitOptionParser {
boolean isAppResourceReq = true;
boolean requiresAppResource = true;
private final boolean errorOnUnknownArgs;
OptionParser(boolean errorOnUnknownArgs) {
this.errorOnUnknownArgs = errorOnUnknownArgs;
}
@Override
protected boolean handle(String opt, String value) {
@ -443,23 +470,23 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
break;
case KILL_SUBMISSION:
case STATUS:
isAppResourceReq = false;
sparkArgs.add(opt);
sparkArgs.add(value);
requiresAppResource = false;
parsedArgs.add(opt);
parsedArgs.add(value);
break;
case HELP:
case USAGE_ERROR:
isAppResourceReq = false;
sparkArgs.add(opt);
requiresAppResource = false;
parsedArgs.add(opt);
break;
case VERSION:
isAppResourceReq = false;
sparkArgs.add(opt);
requiresAppResource = false;
parsedArgs.add(opt);
break;
default:
sparkArgs.add(opt);
parsedArgs.add(opt);
if (value != null) {
sparkArgs.add(value);
parsedArgs.add(value);
}
break;
}
@ -483,12 +510,13 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
mainClass = className;
appResource = SparkLauncher.NO_RESOURCE;
return false;
} else {
} else if (errorOnUnknownArgs) {
checkArgument(!opt.startsWith("-"), "Unrecognized option: %s", opt);
checkState(appResource == null, "Found unrecognized argument but resource is already set.");
appResource = opt;
return false;
}
return true;
}
@Override

View file

@ -36,12 +36,17 @@ object MimaExcludes {
// Exclude rules for 2.4.x
lazy val v24excludes = v23excludes ++ Seq(
// [SPARK-22941][core] Do not exit JVM when submit fails with in-process launcher.
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.deploy.SparkSubmit.printWarning"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.deploy.SparkSubmit.parseSparkConfProperty"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.deploy.SparkSubmit.printVersionAndExit"),
// [SPARK-23412][ML] Add cosine distance measure to BisectingKmeans
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasDistanceMeasure.org$apache$spark$ml$param$shared$HasDistanceMeasure$_setter_$distanceMeasure_="),
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasDistanceMeasure.getDistanceMeasure"),
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasDistanceMeasure.distanceMeasure"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.clustering.BisectingKMeansModel#SaveLoadV1_0.load"),
// [SPARK-20659] Remove StorageStatus, or make it private
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.SparkExecutorInfo.totalOffHeapStorageMemory"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.SparkExecutorInfo.usedOffHeapStorageMemory"),

View file

@ -19,7 +19,7 @@ package org.apache.spark.deploy.mesos
import java.util.concurrent.CountDownLatch
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.{SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.mesos.config._
import org.apache.spark.deploy.mesos.ui.MesosClusterUI
import org.apache.spark.deploy.rest.mesos.MesosRestServer
@ -100,7 +100,13 @@ private[mesos] object MesosClusterDispatcher
Thread.setDefaultUncaughtExceptionHandler(new SparkUncaughtExceptionHandler)
Utils.initDaemon(log)
val conf = new SparkConf
val dispatcherArgs = new MesosClusterDispatcherArguments(args, conf)
val dispatcherArgs = try {
new MesosClusterDispatcherArguments(args, conf)
} catch {
case e: SparkException =>
printErrorAndExit(e.getMessage())
null
}
conf.setMaster(dispatcherArgs.masterUrl)
conf.setAppName(dispatcherArgs.name)
dispatcherArgs.zookeeperUrl.foreach { z =>

View file

@ -21,6 +21,7 @@ import scala.annotation.tailrec
import scala.collection.mutable
import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkSubmitUtils
import org.apache.spark.util.{IntParam, Utils}
private[mesos] class MesosClusterDispatcherArguments(args: Array[String], conf: SparkConf) {
@ -95,9 +96,8 @@ private[mesos] class MesosClusterDispatcherArguments(args: Array[String], conf:
parse(tail)
case ("--conf") :: value :: tail =>
val pair = MesosClusterDispatcher.
parseSparkConfProperty(value)
confProperties(pair._1) = pair._2
val (k, v) = SparkSubmitUtils.parseSparkConfProperty(value)
confProperties(k) = v
parse(tail)
case ("--help") :: tail =>