[SPARK-26606][CORE] Handle driver options properly when submitting to standalone cluster mode via legacy Client

## What changes were proposed in this pull request?

This patch fixes an issue where `ClientEndpoint` in standalone cluster mode does not recognize driver options that are passed via `SparkConf` instead of system properties. When `Client` is executed directly via the CLI, these options must be provided as system properties, but with `spark-submit` they can be provided via `SparkConf` (SparkSubmit calls `ClientApp.start` with a `SparkConf` that contains these options).
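In other words, resolving a driver option should consult the JVM system properties first and then fall back to the `SparkConf` handed to `ClientApp.start`. A minimal Scala sketch of that fallback, mirroring the `getProperty` helper added in the diff below:

```scala
import org.apache.spark.SparkConf

// Prefer a value set as a JVM system property (legacy CLI usage of Client);
// otherwise fall back to the SparkConf that spark-submit passes to ClientApp.start.
def getProperty(key: String, conf: SparkConf): Option[String] = {
  sys.props.get(key).orElse(conf.getOption(key))
}
```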

## How was this patch tested?

Manually tested via the following steps:

1) Set up a standalone cluster (launch the master and worker via `./sbin/start-all.sh`)

2) Submit one of the example apps in standalone cluster mode:

```
./bin/spark-submit --class org.apache.spark.examples.SparkPi --master "spark://localhost:7077" --conf "spark.driver.extraJavaOptions=-Dfoo=BAR" --deploy-mode "cluster" --num-executors 1 --driver-memory 512m --executor-memory 512m --executor-cores 1 examples/jars/spark-examples*.jar 10
```

3) Check whether `foo=BAR` appears in the system properties shown in the Spark UI.

<img width="877" alt="Screen Shot 2019-03-21 at 8 18 04 AM" src="https://user-images.githubusercontent.com/1317309/54728501-97db1700-4bc1-11e9-89da-078445c71e9b.png">

Closes #24163 from HeartSaVioR/SPARK-26606.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
parent 34e3cc7060
commit 8a9eb05137


```diff
@@ -61,6 +61,10 @@ private class ClientEndpoint(
   private val lostMasters = new HashSet[RpcAddress]
   private var activeMasterEndpoint: RpcEndpointRef = null
 
+  private def getProperty(key: String, conf: SparkConf): Option[String] = {
+    sys.props.get(key).orElse(conf.getOption(key))
+  }
+
   override def onStart(): Unit = {
     driverArgs.cmd match {
       case "launch" =>
@@ -70,18 +74,19 @@ private class ClientEndpoint(
         val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"
 
         val classPathConf = config.DRIVER_CLASS_PATH.key
-        val classPathEntries = sys.props.get(classPathConf).toSeq.flatMap { cp =>
+        val classPathEntries = getProperty(classPathConf, conf).toSeq.flatMap { cp =>
           cp.split(java.io.File.pathSeparator)
         }
 
         val libraryPathConf = config.DRIVER_LIBRARY_PATH.key
-        val libraryPathEntries = sys.props.get(libraryPathConf).toSeq.flatMap { cp =>
+        val libraryPathEntries = getProperty(libraryPathConf, conf).toSeq.flatMap { cp =>
           cp.split(java.io.File.pathSeparator)
         }
 
         val extraJavaOptsConf = config.DRIVER_JAVA_OPTIONS.key
-        val extraJavaOpts = sys.props.get(extraJavaOptsConf)
+        val extraJavaOpts = getProperty(extraJavaOptsConf, conf)
           .map(Utils.splitCommandString).getOrElse(Seq.empty)
+
         val sparkJavaOpts = Utils.sparkJavaOpts(conf)
         val javaOpts = sparkJavaOpts ++ extraJavaOpts
         val command = new Command(mainClass,
```
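
For illustration only (not part of the patch), a small sketch of the fallback behavior: when the key is absent from the system properties, the value carried by `SparkConf` is used; a system property, when present, still takes precedence. The object name `FallbackDemo` is hypothetical.

```scala
import org.apache.spark.SparkConf

object FallbackDemo {
  // Same lookup as the patch's getProperty helper.
  def getProperty(key: String, conf: SparkConf): Option[String] =
    sys.props.get(key).orElse(conf.getOption(key))

  def main(args: Array[String]): Unit = {
    // spark-submit carries the driver option inside SparkConf rather than as a -D flag.
    val conf = new SparkConf(false).set("spark.driver.extraJavaOptions", "-Dfoo=BAR")

    // No system property is set for the key, so the SparkConf value is returned.
    assert(getProperty("spark.driver.extraJavaOptions", conf).contains("-Dfoo=BAR"))

    // A system property, when present, takes precedence over SparkConf.
    sys.props("spark.driver.extraJavaOptions") = "-Dfoo=OVERRIDE"
    assert(getProperty("spark.driver.extraJavaOptions", conf).contains("-Dfoo=OVERRIDE"))
  }
}
```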