[SPARK-23472][CORE] Add defaultJavaOptions for driver and executor.

## What changes were proposed in this pull request?

This PR adds two new config properties: `spark.driver.defaultJavaOptions` and `spark.executor.defaultJavaOptions`. They are intended to be set by administrators in a defaults file, for options such as the JVM garbage collection algorithm. Users still set the corresponding `extraJavaOptions` properties, and both sets of JVM options are passed when the JVM is started, with the default options prepended to the extra options.
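
For illustration, a minimal sketch of how the two sets of options combine (the option values below are hypothetical; only the prepend-on-read behavior comes from this PR):

```
import org.apache.spark.SparkConf

// Administrator-provided defaults (typically from spark-defaults.conf) and
// user-provided extras: both keys are read together, defaults first.
val conf = new SparkConf(loadDefaults = false)
  .set("spark.driver.defaultJavaOptions", "-XX:+UseG1GC")      // set by the admin
  .set("spark.driver.extraJavaOptions", "-Dlog4j.debug=true")  // set by the user

// The driver JVM is then launched with both sets of options, e.g.
//   ... -XX:+UseG1GC -Dlog4j.debug=true ...
```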

## How was this patch tested?

Existing + additional unit tests.
```
cd docs/
SKIP_API=1 jekyll build
```
Manual webpage check.

Closes #24804 from gaborgsomogyi/SPARK-23472.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>


@ -24,6 +24,7 @@ import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf}
import org.apache.spark.deploy.{Command, DeployMessages, DriverDescription}
import org.apache.spark.deploy.ClientArguments._
import org.apache.spark.internal.config
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.util.Utils
@ -135,6 +136,7 @@ private[rest] class StandaloneSubmitRequestServlet(
val sparkProperties = request.sparkProperties
val driverMemory = sparkProperties.get(config.DRIVER_MEMORY.key)
val driverCores = sparkProperties.get(config.DRIVER_CORES.key)
val driverDefaultJavaOptions = sparkProperties.get(SparkLauncher.DRIVER_DEFAULT_JAVA_OPTIONS)
val driverExtraJavaOptions = sparkProperties.get(config.DRIVER_JAVA_OPTIONS.key)
val driverExtraClassPath = sparkProperties.get(config.DRIVER_CLASS_PATH.key)
val driverExtraLibraryPath = sparkProperties.get(config.DRIVER_LIBRARY_PATH.key)
@ -160,9 +162,11 @@ private[rest] class StandaloneSubmitRequestServlet(
.set("spark.master", updatedMasters)
val extraClassPath = driverExtraClassPath.toSeq.flatMap(_.split(File.pathSeparator))
val extraLibraryPath = driverExtraLibraryPath.toSeq.flatMap(_.split(File.pathSeparator))
val defaultJavaOpts = driverDefaultJavaOptions.map(Utils.splitCommandString)
.getOrElse(Seq.empty)
val extraJavaOpts = driverExtraJavaOptions.map(Utils.splitCommandString).getOrElse(Seq.empty)
val sparkJavaOpts = Utils.sparkJavaOpts(conf)
val javaOpts = sparkJavaOpts ++ extraJavaOpts
val javaOpts = sparkJavaOpts ++ defaultJavaOpts ++ extraJavaOpts
val command = new Command(
"org.apache.spark.deploy.worker.DriverWrapper",
Seq("{{WORKER_URL}}", "{{USER_JAR}}", mainClass) ++ appArgs, // args to the DriverWrapper


@ -127,8 +127,9 @@ private[spark] class TypedConfigBuilder[T](
/** Creates a [[ConfigEntry]] that does not have a default value. */
def createOptional: OptionalConfigEntry[T] = {
val entry = new OptionalConfigEntry[T](parent.key, parent._alternatives, converter,
stringConverter, parent._doc, parent._public)
val entry = new OptionalConfigEntry[T](parent.key, parent._prependedKey,
parent._prependSeparator, parent._alternatives, converter, stringConverter, parent._doc,
parent._public)
parent._onCreate.foreach(_(entry))
entry
}
@ -141,8 +142,9 @@ private[spark] class TypedConfigBuilder[T](
createWithDefaultString(default.asInstanceOf[String])
} else {
val transformedDefault = converter(stringConverter(default))
val entry = new ConfigEntryWithDefault[T](parent.key, parent._alternatives,
transformedDefault, converter, stringConverter, parent._doc, parent._public)
val entry = new ConfigEntryWithDefault[T](parent.key, parent._prependedKey,
parent._prependSeparator, parent._alternatives, transformedDefault, converter,
stringConverter, parent._doc, parent._public)
parent._onCreate.foreach(_(entry))
entry
}
@ -150,8 +152,9 @@ private[spark] class TypedConfigBuilder[T](
/** Creates a [[ConfigEntry]] with a function to determine the default value */
def createWithDefaultFunction(defaultFunc: () => T): ConfigEntry[T] = {
val entry = new ConfigEntryWithDefaultFunction[T](parent.key, parent._alternatives, defaultFunc,
converter, stringConverter, parent._doc, parent._public)
val entry = new ConfigEntryWithDefaultFunction[T](parent.key, parent._prependedKey,
parent._prependSeparator, parent._alternatives, defaultFunc, converter, stringConverter,
parent._doc, parent._public)
parent._onCreate.foreach(_ (entry))
entry
}
@ -161,8 +164,9 @@ private[spark] class TypedConfigBuilder[T](
* [[String]] and must be a valid value for the entry.
*/
def createWithDefaultString(default: String): ConfigEntry[T] = {
val entry = new ConfigEntryWithDefaultString[T](parent.key, parent._alternatives, default,
converter, stringConverter, parent._doc, parent._public)
val entry = new ConfigEntryWithDefaultString[T](parent.key, parent._prependedKey,
parent._prependSeparator, parent._alternatives, default, converter, stringConverter,
parent._doc, parent._public)
parent._onCreate.foreach(_(entry))
entry
}
@ -178,6 +182,8 @@ private[spark] case class ConfigBuilder(key: String) {
import ConfigHelpers._
private[config] var _prependedKey: Option[String] = None
private[config] var _prependSeparator: String = ""
private[config] var _public = true
private[config] var _doc = ""
private[config] var _onCreate: Option[ConfigEntry[_] => Unit] = None
@ -202,24 +208,34 @@ private[spark] case class ConfigBuilder(key: String) {
this
}
def withPrepended(key: String, separator: String = " "): ConfigBuilder = {
_prependedKey = Option(key)
_prependSeparator = separator
this
}
def withAlternative(key: String): ConfigBuilder = {
_alternatives = _alternatives :+ key
this
}
def intConf: TypedConfigBuilder[Int] = {
checkPrependConfig
new TypedConfigBuilder(this, toNumber(_, _.toInt, key, "int"))
}
def longConf: TypedConfigBuilder[Long] = {
checkPrependConfig
new TypedConfigBuilder(this, toNumber(_, _.toLong, key, "long"))
}
def doubleConf: TypedConfigBuilder[Double] = {
checkPrependConfig
new TypedConfigBuilder(this, toNumber(_, _.toDouble, key, "double"))
}
def booleanConf: TypedConfigBuilder[Boolean] = {
checkPrependConfig
new TypedConfigBuilder(this, toBoolean(_, key))
}
@ -228,20 +244,30 @@ private[spark] case class ConfigBuilder(key: String) {
}
def timeConf(unit: TimeUnit): TypedConfigBuilder[Long] = {
checkPrependConfig
new TypedConfigBuilder(this, timeFromString(_, unit), timeToString(_, unit))
}
def bytesConf(unit: ByteUnit): TypedConfigBuilder[Long] = {
checkPrependConfig
new TypedConfigBuilder(this, byteFromString(_, unit), byteToString(_, unit))
}
def fallbackConf[T](fallback: ConfigEntry[T]): ConfigEntry[T] = {
val entry = new FallbackConfigEntry(key, _alternatives, _doc, _public, fallback)
val entry = new FallbackConfigEntry(key, _prependedKey, _prependSeparator, _alternatives, _doc,
_public, fallback)
_onCreate.foreach(_(entry))
entry
}
def regexConf: TypedConfigBuilder[Regex] = {
checkPrependConfig
new TypedConfigBuilder(this, regexFromString(_, this.key), _.toString)
}
private def checkPrependConfig = {
if (_prependedKey.isDefined) {
throw new IllegalArgumentException(s"$key type must be string if prepend used")
}
}
}
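
A usage sketch of the builder changes above (key names are made up for illustration; like the new `ConfigEntrySuite` tests, it relies on Spark-internal APIs such as `ConfigBuilder`, which is `private[spark]`):

```
import org.apache.spark.SparkConf
import org.apache.spark.internal.config.ConfigBuilder

// A string config that declares a companion "default" key: on read, the
// companion's value is prepended, joined with the separator (" " by default).
val extraOpts = ConfigBuilder("spark.example.extraOptions")
  .withPrepended("spark.example.defaultOptions")
  .stringConf
  .createOptional

val conf = new SparkConf(loadDefaults = false)
  .set("spark.example.defaultOptions", "-XX:+UseG1GC")
  .set("spark.example.extraOptions", "-Dfoo=bar")

conf.get(extraOpts)  // Some("-XX:+UseG1GC -Dfoo=bar")
```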


@ -28,6 +28,8 @@ package org.apache.spark.internal.config
* value declared as a string.
*
* @param key the key for the configuration
* @param prependedKey the key for the configuration which will be prepended
* @param prependSeparator the separator which is used for prepending
* @param valueConverter how to convert a string to the value. It should throw an exception if the
* string does not have the required format.
* @param stringConverter how to convert a value to a string that the user can use it as a valid
@ -41,6 +43,8 @@ package org.apache.spark.internal.config
*/
private[spark] abstract class ConfigEntry[T] (
val key: String,
val prependedKey: Option[String],
val prependSeparator: String,
val alternatives: List[String],
val valueConverter: String => T,
val stringConverter: T => String,
@ -54,7 +58,15 @@ private[spark] abstract class ConfigEntry[T] (
def defaultValueString: String
protected def readString(reader: ConfigReader): Option[String] = {
alternatives.foldLeft(reader.get(key))((res, nextKey) => res.orElse(reader.get(nextKey)))
val values = Seq(
prependedKey.flatMap(reader.get(_)),
alternatives.foldLeft(reader.get(key))((res, nextKey) => res.orElse(reader.get(nextKey)))
).flatten
if (values.nonEmpty) {
Some(values.mkString(prependSeparator))
} else {
None
}
}
def readFrom(reader: ConfigReader): T
@ -68,13 +80,24 @@ private[spark] abstract class ConfigEntry[T] (
private class ConfigEntryWithDefault[T] (
key: String,
prependedKey: Option[String],
prependSeparator: String,
alternatives: List[String],
_defaultValue: T,
valueConverter: String => T,
stringConverter: T => String,
doc: String,
isPublic: Boolean)
extends ConfigEntry(key, alternatives, valueConverter, stringConverter, doc, isPublic) {
extends ConfigEntry(
key,
prependedKey,
prependSeparator,
alternatives,
valueConverter,
stringConverter,
doc,
isPublic
) {
override def defaultValue: Option[T] = Some(_defaultValue)
@ -86,14 +109,25 @@ private class ConfigEntryWithDefault[T] (
}
private class ConfigEntryWithDefaultFunction[T] (
key: String,
alternatives: List[String],
_defaultFunction: () => T,
valueConverter: String => T,
stringConverter: T => String,
doc: String,
isPublic: Boolean)
extends ConfigEntry(key, alternatives, valueConverter, stringConverter, doc, isPublic) {
key: String,
prependedKey: Option[String],
prependSeparator: String,
alternatives: List[String],
_defaultFunction: () => T,
valueConverter: String => T,
stringConverter: T => String,
doc: String,
isPublic: Boolean)
extends ConfigEntry(
key,
prependedKey,
prependSeparator,
alternatives,
valueConverter,
stringConverter,
doc,
isPublic
) {
override def defaultValue: Option[T] = Some(_defaultFunction())
@ -106,13 +140,24 @@ private class ConfigEntryWithDefaultFunction[T] (
private class ConfigEntryWithDefaultString[T] (
key: String,
prependedKey: Option[String],
prependSeparator: String,
alternatives: List[String],
_defaultValue: String,
valueConverter: String => T,
stringConverter: T => String,
doc: String,
isPublic: Boolean)
extends ConfigEntry(key, alternatives, valueConverter, stringConverter, doc, isPublic) {
extends ConfigEntry(
key,
prependedKey,
prependSeparator,
alternatives,
valueConverter,
stringConverter,
doc,
isPublic
) {
override def defaultValue: Option[T] = Some(valueConverter(_defaultValue))
@ -130,14 +175,23 @@ private class ConfigEntryWithDefaultString[T] (
*/
private[spark] class OptionalConfigEntry[T](
key: String,
prependedKey: Option[String],
prependSeparator: String,
alternatives: List[String],
val rawValueConverter: String => T,
val rawStringConverter: T => String,
doc: String,
isPublic: Boolean)
extends ConfigEntry[Option[T]](key, alternatives,
extends ConfigEntry[Option[T]](
key,
prependedKey,
prependSeparator,
alternatives,
s => Some(rawValueConverter(s)),
v => v.map(rawStringConverter).orNull, doc, isPublic) {
v => v.map(rawStringConverter).orNull,
doc,
isPublic
) {
override def defaultValueString: String = ConfigEntry.UNDEFINED
@ -151,12 +205,22 @@ private[spark] class OptionalConfigEntry[T](
*/
private[spark] class FallbackConfigEntry[T] (
key: String,
prependedKey: Option[String],
prependSeparator: String,
alternatives: List[String],
doc: String,
isPublic: Boolean,
val fallback: ConfigEntry[T])
extends ConfigEntry[T](key, alternatives,
fallback.valueConverter, fallback.stringConverter, doc, isPublic) {
extends ConfigEntry[T](
key,
prependedKey,
prependSeparator,
alternatives,
fallback.valueConverter,
fallback.stringConverter,
doc,
isPublic
) {
override def defaultValueString: String = s"<value of ${fallback.key}>"


@ -48,7 +48,10 @@ package object config {
ConfigBuilder(SparkLauncher.DRIVER_EXTRA_CLASSPATH).stringConf.createOptional
private[spark] val DRIVER_JAVA_OPTIONS =
ConfigBuilder(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS).stringConf.createOptional
ConfigBuilder(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS)
.withPrepended(SparkLauncher.DRIVER_DEFAULT_JAVA_OPTIONS)
.stringConf
.createOptional
private[spark] val DRIVER_LIBRARY_PATH =
ConfigBuilder(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH).stringConf.createOptional
@ -174,7 +177,10 @@ package object config {
ConfigBuilder("spark.executor.heartbeat.maxFailures").internal().intConf.createWithDefault(60)
private[spark] val EXECUTOR_JAVA_OPTIONS =
ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS).stringConf.createOptional
ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS)
.withPrepended(SparkLauncher.EXECUTOR_DEFAULT_JAVA_OPTIONS)
.stringConf
.createOptional
private[spark] val EXECUTOR_LIBRARY_PATH =
ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_LIBRARY_PATH).stringConf.createOptional


@ -70,8 +70,8 @@ class ConfigEntrySuite extends SparkFunSuite {
test("conf entry: fallback") {
val conf = new SparkConf()
val parentConf = ConfigBuilder(testKey("parent")).intConf.createWithDefault(1)
val confWithFallback = ConfigBuilder(testKey("fallback")).fallbackConf(parentConf)
val parentConf = ConfigBuilder(testKey("parent1")).intConf.createWithDefault(1)
val confWithFallback = ConfigBuilder(testKey("fallback1")).fallbackConf(parentConf)
assert(conf.get(confWithFallback) === 1)
conf.set(confWithFallback, 2)
assert(conf.get(parentConf) === 1)
@ -289,6 +289,92 @@ class ConfigEntrySuite extends SparkFunSuite {
assert(conf.get(iConf) === 3)
}
test("conf entry: prepend with default separator") {
val conf = new SparkConf()
val prependedKey = testKey("prepended1")
val prependedConf = ConfigBuilder(prependedKey).stringConf.createOptional
val derivedConf = ConfigBuilder(testKey("prepend1"))
.withPrepended(prependedKey)
.stringConf
.createOptional
conf.set(derivedConf, "1")
assert(conf.get(derivedConf) === Some("1"))
conf.set(prependedConf, "2")
assert(conf.get(derivedConf) === Some("2 1"))
}
test("conf entry: prepend with custom separator") {
val conf = new SparkConf()
val prependedKey = testKey("prepended2")
val prependedConf = ConfigBuilder(prependedKey).stringConf.createOptional
val derivedConf = ConfigBuilder(testKey("prepend2"))
.withPrepended(prependedKey, ",")
.stringConf
.createOptional
conf.set(derivedConf, "1")
assert(conf.get(derivedConf) === Some("1"))
conf.set(prependedConf, "2")
assert(conf.get(derivedConf) === Some("2,1"))
}
test("conf entry: prepend with fallback") {
val conf = new SparkConf()
val prependedKey = testKey("prepended3")
val prependedConf = ConfigBuilder(prependedKey).stringConf.createOptional
val derivedConf = ConfigBuilder(testKey("prepend3"))
.withPrepended(prependedKey)
.stringConf
.createOptional
val confWithFallback = ConfigBuilder(testKey("fallback2")).fallbackConf(derivedConf)
assert(conf.get(confWithFallback) === None)
conf.set(derivedConf, "1")
assert(conf.get(confWithFallback) === Some("1"))
conf.set(prependedConf, "2")
assert(conf.get(confWithFallback) === Some("2 1"))
conf.set(confWithFallback, Some("3"))
assert(conf.get(confWithFallback) === Some("3"))
}
test("conf entry: prepend should work only with string type") {
var i = 0
def testPrependFail(createConf: (String, String) => Unit): Unit = {
intercept[IllegalArgumentException] {
createConf(testKey(s"prependedFail$i"), testKey(s"prependFail$i"))
}.getMessage.contains("type must be string if prepend used")
i += 1
}
testPrependFail( (prependedKey, prependKey) =>
ConfigBuilder(testKey(prependKey)).withPrepended(prependedKey).intConf
)
testPrependFail( (prependedKey, prependKey) =>
ConfigBuilder(testKey(prependKey)).withPrepended(prependedKey).longConf
)
testPrependFail( (prependedKey, prependKey) =>
ConfigBuilder(testKey(prependKey)).withPrepended(prependedKey).doubleConf
)
testPrependFail( (prependedKey, prependKey) =>
ConfigBuilder(testKey(prependKey)).withPrepended(prependedKey).booleanConf
)
testPrependFail( (prependedKey, prependKey) =>
ConfigBuilder(testKey(prependKey)).withPrepended(prependedKey).timeConf(TimeUnit.MILLISECONDS)
)
testPrependFail( (prependedKey, prependKey) =>
ConfigBuilder(testKey(prependKey)).withPrepended(prependedKey).bytesConf(ByteUnit.BYTE)
)
testPrependFail( (prependedKey, prependKey) =>
ConfigBuilder(testKey(prependKey)).withPrepended(prependedKey).regexConf
)
}
test("onCreate") {
var onCreateCalled = false
ConfigBuilder(testKey("oc1")).onCreate(_ => onCreateCalled = true).intConf.createWithDefault(1)


@ -411,10 +411,13 @@ Apart from these, the following properties are also available, and may be useful
</td>
</tr>
<tr>
<td><code>spark.driver.extraJavaOptions</code></td>
<td><code>spark.driver.defaultJavaOptions</code></td>
<td>(none)</td>
<td>
A string of extra JVM options to pass to the driver. For instance, GC settings or other logging.
A string of default JVM options to prepend to <code>spark.driver.extraJavaOptions</code>.
This is intended to be set by administrators.
For instance, GC settings or other logging.
Note that it is illegal to set maximum heap size (-Xmx) settings with this option. Maximum heap
size settings can be set with <code>spark.driver.memory</code> in the cluster mode and through
the <code>--driver-memory</code> command line option in the client mode.
@ -425,6 +428,25 @@ Apart from these, the following properties are also available, and may be useful
your default properties file.
</td>
</tr>
<tr>
<td><code>spark.driver.extraJavaOptions</code></td>
<td>(none)</td>
<td>
A string of extra JVM options to pass to the driver. This is intended to be set by users.
For instance, GC settings or other logging.
Note that it is illegal to set maximum heap size (-Xmx) settings with this option. Maximum heap
size settings can be set with <code>spark.driver.memory</code> in the cluster mode and through
the <code>--driver-memory</code> command line option in the client mode.
<br /><em>Note:</em> In client mode, this config must not be set through the <code>SparkConf</code>
directly in your application, because the driver JVM has already started at that point.
Instead, please set this through the <code>--driver-java-options</code> command line option or in
your default properties file.
<code>spark.driver.defaultJavaOptions</code> will be prepended to this configuration.
</td>
</tr>
<tr>
<td><code>spark.driver.extraLibraryPath</code></td>
<td>(none)</td>
@ -458,10 +480,13 @@ Apart from these, the following properties are also available, and may be useful
</td>
</tr>
<tr>
<td><code>spark.executor.extraJavaOptions</code></td>
<td><code>spark.executor.defaultJavaOptions</code></td>
<td>(none)</td>
<td>
A string of extra JVM options to pass to executors. For instance, GC settings or other logging.
A string of default JVM options to prepend to <code>spark.executor.extraJavaOptions</code>.
This is intended to be set by administrators.
For instance, GC settings or other logging.
Note that it is illegal to set Spark properties or maximum heap size (-Xmx) settings with this
option. Spark properties should be set using a SparkConf object or the spark-defaults.conf file
used with the spark-submit script. Maximum heap size settings can be set with spark.executor.memory.
@ -472,6 +497,25 @@ Apart from these, the following properties are also available, and may be useful
<code>-verbose:gc -Xloggc:/tmp/{{APP_ID}}-{{EXECUTOR_ID}}.gc</code>
</td>
</tr>
<tr>
<td><code>spark.executor.extraJavaOptions</code></td>
<td>(none)</td>
<td>
A string of extra JVM options to pass to executors. This is intended to be set by users.
For instance, GC settings or other logging.
Note that it is illegal to set Spark properties or maximum heap size (-Xmx) settings with this
option. Spark properties should be set using a SparkConf object or the spark-defaults.conf file
used with the spark-submit script. Maximum heap size settings can be set with spark.executor.memory.
The following symbols, if present will be interpolated: {{APP_ID}} will be replaced by
application ID and {{EXECUTOR_ID}} will be replaced by executor ID. For example, to enable
verbose gc logging to a file named for the executor ID of the app in /tmp, pass a 'value' of:
<code>-verbose:gc -Xloggc:/tmp/{{APP_ID}}-{{EXECUTOR_ID}}.gc</code>
<code>spark.executor.defaultJavaOptions</code> will be prepended to this configuration.
</td>
</tr>
<tr>
<td><code>spark.executor.extraLibraryPath</code></td>
<td>(none)</td>


@ -248,7 +248,7 @@ Our experience suggests that the effect of GC tuning depends on your application
There are [many more tuning options](https://docs.oracle.com/javase/8/docs/technotes/guides/vm/gctuning/index.html) described online,
but at a high level, managing how frequently full GC takes place can help in reducing the overhead.
GC tuning flags for executors can be specified by setting `spark.executor.extraJavaOptions` in
GC tuning flags for executors can be specified by setting `spark.executor.defaultJavaOptions` or `spark.executor.extraJavaOptions` in
a job's configuration.
# Other Considerations


@ -48,6 +48,8 @@ public class SparkLauncher extends AbstractLauncher<SparkLauncher> {
public static final String DRIVER_MEMORY = "spark.driver.memory";
/** Configuration key for the driver class path. */
public static final String DRIVER_EXTRA_CLASSPATH = "spark.driver.extraClassPath";
/** Configuration key for the default driver VM options. */
public static final String DRIVER_DEFAULT_JAVA_OPTIONS = "spark.driver.defaultJavaOptions";
/** Configuration key for the driver VM options. */
public static final String DRIVER_EXTRA_JAVA_OPTIONS = "spark.driver.extraJavaOptions";
/** Configuration key for the driver native library path. */
@ -57,6 +59,8 @@ public class SparkLauncher extends AbstractLauncher<SparkLauncher> {
public static final String EXECUTOR_MEMORY = "spark.executor.memory";
/** Configuration key for the executor class path. */
public static final String EXECUTOR_EXTRA_CLASSPATH = "spark.executor.extraClassPath";
/** Configuration key for the default executor VM options. */
public static final String EXECUTOR_DEFAULT_JAVA_OPTIONS = "spark.executor.defaultJavaOptions";
/** Configuration key for the executor VM options. */
public static final String EXECUTOR_EXTRA_JAVA_OPTIONS = "spark.executor.extraJavaOptions";
/** Configuration key for the executor native library path. */


@ -267,13 +267,10 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
// We don't want the client to specify Xmx. These have to be set by their corresponding
// memory flag --driver-memory or configuration entry spark.driver.memory
String driverDefaultJavaOptions = config.get(SparkLauncher.DRIVER_DEFAULT_JAVA_OPTIONS);
checkJavaOptions(driverDefaultJavaOptions);
String driverExtraJavaOptions = config.get(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS);
if (!isEmpty(driverExtraJavaOptions) && driverExtraJavaOptions.contains("Xmx")) {
String msg = String.format("Not allowed to specify max heap(Xmx) memory settings through " +
"java options (was %s). Use the corresponding --driver-memory or " +
"spark.driver.memory configuration instead.", driverExtraJavaOptions);
throw new IllegalArgumentException(msg);
}
checkJavaOptions(driverExtraJavaOptions);
if (isClientMode) {
// Figuring out where the memory value come from is a little tricky due to precedence.
@ -289,6 +286,7 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
String memory = firstNonEmpty(tsMemory, config.get(SparkLauncher.DRIVER_MEMORY),
System.getenv("SPARK_DRIVER_MEMORY"), System.getenv("SPARK_MEM"), DEFAULT_MEM);
cmd.add("-Xmx" + memory);
addOptionString(cmd, driverDefaultJavaOptions);
addOptionString(cmd, driverExtraJavaOptions);
mergeEnvPathList(env, getLibPathEnvName(),
config.get(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH));
@ -299,6 +297,15 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
return cmd;
}
private void checkJavaOptions(String javaOptions) {
if (!isEmpty(javaOptions) && javaOptions.contains("Xmx")) {
String msg = String.format("Not allowed to specify max heap(Xmx) memory settings through " +
"java options (was %s). Use the corresponding --driver-memory or " +
"spark.driver.memory configuration instead.", javaOptions);
throw new IllegalArgumentException(msg);
}
}
private List<String> buildPySparkShellCommand(Map<String, String> env) throws IOException {
// For backwards compatibility, if a script is specified in
// the pyspark command line, then run it using spark-submit.


@ -251,6 +251,8 @@ public class SparkSubmitCommandBuilderSuite extends BaseSuite {
}
private void testCmdBuilder(boolean isDriver, boolean useDefaultPropertyFile) throws Exception {
final String DRIVER_DEFAULT_PARAM = "-Ddriver-default";
final String DRIVER_EXTRA_PARAM = "-Ddriver-extra";
String deployMode = isDriver ? "client" : "cluster";
SparkSubmitCommandBuilder launcher =
@ -270,7 +272,8 @@ public class SparkSubmitCommandBuilderSuite extends BaseSuite {
launcher.setPropertiesFile(dummyPropsFile.getAbsolutePath());
launcher.conf.put(SparkLauncher.DRIVER_MEMORY, "1g");
launcher.conf.put(SparkLauncher.DRIVER_EXTRA_CLASSPATH, "/driver");
launcher.conf.put(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, "-Ddriver");
launcher.conf.put(SparkLauncher.DRIVER_DEFAULT_JAVA_OPTIONS, DRIVER_DEFAULT_PARAM);
launcher.conf.put(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, DRIVER_EXTRA_PARAM);
launcher.conf.put(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH, "/native");
} else {
launcher.childEnv.put("SPARK_CONF_DIR", System.getProperty("spark.test.home")
@ -284,6 +287,9 @@ public class SparkSubmitCommandBuilderSuite extends BaseSuite {
if (isDriver) {
assertTrue("Driver -Xmx should be configured.", cmd.contains("-Xmx1g"));
assertTrue("Driver default options should be configured.",
cmd.contains(DRIVER_DEFAULT_PARAM));
assertTrue("Driver extra options should be configured.", cmd.contains(DRIVER_EXTRA_PARAM));
} else {
boolean found = false;
for (String arg : cmd) {
@ -293,6 +299,10 @@ public class SparkSubmitCommandBuilderSuite extends BaseSuite {
}
}
assertFalse("Memory arguments should not be set.", found);
assertFalse("Driver default options should not be configured.",
cmd.contains(DRIVER_DEFAULT_PARAM));
assertFalse("Driver extra options should not be configured.",
cmd.contains(DRIVER_EXTRA_PARAM));
}
String[] cp = findArgValue(cmd, "-cp").split(Pattern.quote(File.pathSeparator));


@ -17,5 +17,6 @@
spark.driver.memory=1g
spark.driver.extraClassPath=/driver
spark.driver.extraJavaOptions=-Ddriver
spark.driver.defaultJavaOptions=-Ddriver-default
spark.driver.extraJavaOptions=-Ddriver-extra
spark.driver.extraLibraryPath=/native


@ -28,6 +28,7 @@ import org.apache.spark.deploy.Command
import org.apache.spark.deploy.mesos.MesosDriverDescription
import org.apache.spark.deploy.rest._
import org.apache.spark.internal.config
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler
import org.apache.spark.util.Utils
@ -97,6 +98,7 @@ private[mesos] class MesosSubmitRequestServlet(
// Optional fields
val sparkProperties = request.sparkProperties
val driverDefaultJavaOptions = sparkProperties.get(SparkLauncher.DRIVER_DEFAULT_JAVA_OPTIONS)
val driverExtraJavaOptions = sparkProperties.get(config.DRIVER_JAVA_OPTIONS.key)
val driverExtraClassPath = sparkProperties.get(config.DRIVER_CLASS_PATH.key)
val driverExtraLibraryPath = sparkProperties.get(config.DRIVER_LIBRARY_PATH.key)
@ -110,9 +112,11 @@ private[mesos] class MesosSubmitRequestServlet(
val conf = new SparkConf(false).setAll(sparkProperties)
val extraClassPath = driverExtraClassPath.toSeq.flatMap(_.split(File.pathSeparator))
val extraLibraryPath = driverExtraLibraryPath.toSeq.flatMap(_.split(File.pathSeparator))
val defaultJavaOpts = driverDefaultJavaOptions.map(Utils.splitCommandString)
.getOrElse(Seq.empty)
val extraJavaOpts = driverExtraJavaOptions.map(Utils.splitCommandString).getOrElse(Seq.empty)
val sparkJavaOpts = Utils.sparkJavaOpts(conf)
val javaOpts = sparkJavaOpts ++ extraJavaOpts
val javaOpts = sparkJavaOpts ++ defaultJavaOpts ++ extraJavaOpts
val command = new Command(
mainClass, appArgs, environmentVariables, extraClassPath, extraLibraryPath, javaOpts)
val actualSuperviseDriver = superviseDriver.map(_.toBoolean).getOrElse(DEFAULT_SUPERVISE)