From f83000597f250868de9722d8285fed013abc5ecf Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Thu, 11 Jul 2019 09:37:26 -0700 Subject: [PATCH] [SPARK-23472][CORE] Add defaultJavaOptions for driver and executor. ## What changes were proposed in this pull request? This PR adds two new config properties: `spark.driver.defaultJavaOptions` and `spark.executor.defaultJavaOptions`. These are intended to be set by administrators in a defaults file, for options such as the JVM garbage collection algorithm. Users will still set the `extraJavaOptions` properties, and both sets of JVM options are passed when the JVM is started, with the default options prepended to the extra options. ## How was this patch tested? Existing + additional unit tests. ``` cd docs/ SKIP_API=1 jekyll build ``` Manual check of the generated documentation pages. Closes #24804 from gaborgsomogyi/SPARK-23472. Authored-by: Gabor Somogyi Signed-off-by: Marcelo Vanzin --- .../deploy/rest/StandaloneRestServer.scala | 6 +- .../spark/internal/config/ConfigBuilder.scala | 44 +++++++-- .../spark/internal/config/ConfigEntry.scala | 94 ++++++++++++++++--- .../spark/internal/config/package.scala | 10 +- .../internal/config/ConfigEntrySuite.scala | 90 +++++++++++++++++- docs/configuration.md | 52 +++++++++- docs/tuning.md | 2 +- .../apache/spark/launcher/SparkLauncher.java | 4 + .../launcher/SparkSubmitCommandBuilder.java | 19 ++-- .../SparkSubmitCommandBuilderSuite.java | 12 ++- .../src/test/resources/spark-defaults.conf | 3 +- .../deploy/rest/mesos/MesosRestServer.scala | 6 +- 12 files changed, 299 insertions(+), 43 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala index a70754c6e2..f912ed64c8 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala @@ -24,6 +24,7 @@ import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf} import org.apache.spark.deploy.{Command, DeployMessages, DriverDescription} import org.apache.spark.deploy.ClientArguments._ import org.apache.spark.internal.config +import org.apache.spark.launcher.SparkLauncher import org.apache.spark.rpc.RpcEndpointRef import org.apache.spark.util.Utils @@ -135,6 +136,7 @@ private[rest] class StandaloneSubmitRequestServlet( val sparkProperties = request.sparkProperties val driverMemory = sparkProperties.get(config.DRIVER_MEMORY.key) val driverCores = sparkProperties.get(config.DRIVER_CORES.key) + val driverDefaultJavaOptions = sparkProperties.get(SparkLauncher.DRIVER_DEFAULT_JAVA_OPTIONS) val driverExtraJavaOptions = sparkProperties.get(config.DRIVER_JAVA_OPTIONS.key) val driverExtraClassPath = sparkProperties.get(config.DRIVER_CLASS_PATH.key) val driverExtraLibraryPath = sparkProperties.get(config.DRIVER_LIBRARY_PATH.key) @@ -160,9 +162,11 @@ private[rest] class StandaloneSubmitRequestServlet( .set("spark.master", updatedMasters) val extraClassPath = driverExtraClassPath.toSeq.flatMap(_.split(File.pathSeparator)) val extraLibraryPath = driverExtraLibraryPath.toSeq.flatMap(_.split(File.pathSeparator)) + val defaultJavaOpts = driverDefaultJavaOptions.map(Utils.splitCommandString) + .getOrElse(Seq.empty) val extraJavaOpts = driverExtraJavaOptions.map(Utils.splitCommandString).getOrElse(Seq.empty) val sparkJavaOpts = Utils.sparkJavaOpts(conf) - val javaOpts = sparkJavaOpts ++ extraJavaOpts + val javaOpts = sparkJavaOpts ++ defaultJavaOpts ++ extraJavaOpts val command = new Command(
"org.apache.spark.deploy.worker.DriverWrapper", Seq("{{WORKER_URL}}", "{{USER_JAR}}", mainClass) ++ appArgs, // args to the DriverWrapper diff --git a/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala b/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala index f27aca0377..68e1994f0f 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala @@ -127,8 +127,9 @@ private[spark] class TypedConfigBuilder[T]( /** Creates a [[ConfigEntry]] that does not have a default value. */ def createOptional: OptionalConfigEntry[T] = { - val entry = new OptionalConfigEntry[T](parent.key, parent._alternatives, converter, - stringConverter, parent._doc, parent._public) + val entry = new OptionalConfigEntry[T](parent.key, parent._prependedKey, + parent._prependSeparator, parent._alternatives, converter, stringConverter, parent._doc, + parent._public) parent._onCreate.foreach(_(entry)) entry } @@ -141,8 +142,9 @@ private[spark] class TypedConfigBuilder[T]( createWithDefaultString(default.asInstanceOf[String]) } else { val transformedDefault = converter(stringConverter(default)) - val entry = new ConfigEntryWithDefault[T](parent.key, parent._alternatives, - transformedDefault, converter, stringConverter, parent._doc, parent._public) + val entry = new ConfigEntryWithDefault[T](parent.key, parent._prependedKey, + parent._prependSeparator, parent._alternatives, transformedDefault, converter, + stringConverter, parent._doc, parent._public) parent._onCreate.foreach(_(entry)) entry } @@ -150,8 +152,9 @@ private[spark] class TypedConfigBuilder[T]( /** Creates a [[ConfigEntry]] with a function to determine the default value */ def createWithDefaultFunction(defaultFunc: () => T): ConfigEntry[T] = { - val entry = new ConfigEntryWithDefaultFunction[T](parent.key, parent._alternatives, defaultFunc, - converter, stringConverter, parent._doc, parent._public) + val entry = new ConfigEntryWithDefaultFunction[T](parent.key, parent._prependedKey, + parent._prependSeparator, parent._alternatives, defaultFunc, converter, stringConverter, + parent._doc, parent._public) parent._onCreate.foreach(_ (entry)) entry } @@ -161,8 +164,9 @@ private[spark] class TypedConfigBuilder[T]( * [[String]] and must be a valid value for the entry. 
*/ def createWithDefaultString(default: String): ConfigEntry[T] = { - val entry = new ConfigEntryWithDefaultString[T](parent.key, parent._alternatives, default, - converter, stringConverter, parent._doc, parent._public) + val entry = new ConfigEntryWithDefaultString[T](parent.key, parent._prependedKey, + parent._prependSeparator, parent._alternatives, default, converter, stringConverter, + parent._doc, parent._public) parent._onCreate.foreach(_(entry)) entry } @@ -178,6 +182,8 @@ private[spark] case class ConfigBuilder(key: String) { import ConfigHelpers._ + private[config] var _prependedKey: Option[String] = None + private[config] var _prependSeparator: String = "" private[config] var _public = true private[config] var _doc = "" private[config] var _onCreate: Option[ConfigEntry[_] => Unit] = None @@ -202,24 +208,34 @@ private[spark] case class ConfigBuilder(key: String) { this } + def withPrepended(key: String, separator: String = " "): ConfigBuilder = { + _prependedKey = Option(key) + _prependSeparator = separator + this + } + def withAlternative(key: String): ConfigBuilder = { _alternatives = _alternatives :+ key this } def intConf: TypedConfigBuilder[Int] = { + checkPrependConfig new TypedConfigBuilder(this, toNumber(_, _.toInt, key, "int")) } def longConf: TypedConfigBuilder[Long] = { + checkPrependConfig new TypedConfigBuilder(this, toNumber(_, _.toLong, key, "long")) } def doubleConf: TypedConfigBuilder[Double] = { + checkPrependConfig new TypedConfigBuilder(this, toNumber(_, _.toDouble, key, "double")) } def booleanConf: TypedConfigBuilder[Boolean] = { + checkPrependConfig new TypedConfigBuilder(this, toBoolean(_, key)) } @@ -228,20 +244,30 @@ private[spark] case class ConfigBuilder(key: String) { } def timeConf(unit: TimeUnit): TypedConfigBuilder[Long] = { + checkPrependConfig new TypedConfigBuilder(this, timeFromString(_, unit), timeToString(_, unit)) } def bytesConf(unit: ByteUnit): TypedConfigBuilder[Long] = { + checkPrependConfig new TypedConfigBuilder(this, byteFromString(_, unit), byteToString(_, unit)) } def fallbackConf[T](fallback: ConfigEntry[T]): ConfigEntry[T] = { - val entry = new FallbackConfigEntry(key, _alternatives, _doc, _public, fallback) + val entry = new FallbackConfigEntry(key, _prependedKey, _prependSeparator, _alternatives, _doc, + _public, fallback) _onCreate.foreach(_(entry)) entry } def regexConf: TypedConfigBuilder[Regex] = { + checkPrependConfig new TypedConfigBuilder(this, regexFromString(_, this.key), _.toString) } + + private def checkPrependConfig = { + if (_prependedKey.isDefined) { + throw new IllegalArgumentException(s"$key type must be string if prepend used") + } + } } diff --git a/core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala b/core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala index ede3ace4f9..c5df4c8820 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala @@ -28,6 +28,8 @@ package org.apache.spark.internal.config * value declared as a string. * * @param key the key for the configuration + * @param prependedKey the key for the configuration which will be prepended + * @param prependSeparator the separator which is used for prepending * @param valueConverter how to convert a string to the value. It should throw an exception if the * string does not have the required format. 
* @param stringConverter how to convert a value to a string that the user can use it as a valid @@ -41,6 +43,8 @@ package org.apache.spark.internal.config */ private[spark] abstract class ConfigEntry[T] ( val key: String, + val prependedKey: Option[String], + val prependSeparator: String, val alternatives: List[String], val valueConverter: String => T, val stringConverter: T => String, @@ -54,7 +58,15 @@ private[spark] abstract class ConfigEntry[T] ( def defaultValueString: String protected def readString(reader: ConfigReader): Option[String] = { - alternatives.foldLeft(reader.get(key))((res, nextKey) => res.orElse(reader.get(nextKey))) + val values = Seq( + prependedKey.flatMap(reader.get(_)), + alternatives.foldLeft(reader.get(key))((res, nextKey) => res.orElse(reader.get(nextKey))) + ).flatten + if (values.nonEmpty) { + Some(values.mkString(prependSeparator)) + } else { + None + } } def readFrom(reader: ConfigReader): T @@ -68,13 +80,24 @@ private[spark] abstract class ConfigEntry[T] ( private class ConfigEntryWithDefault[T] ( key: String, + prependedKey: Option[String], + prependSeparator: String, alternatives: List[String], _defaultValue: T, valueConverter: String => T, stringConverter: T => String, doc: String, isPublic: Boolean) - extends ConfigEntry(key, alternatives, valueConverter, stringConverter, doc, isPublic) { + extends ConfigEntry( + key, + prependedKey, + prependSeparator, + alternatives, + valueConverter, + stringConverter, + doc, + isPublic + ) { override def defaultValue: Option[T] = Some(_defaultValue) @@ -86,14 +109,25 @@ private class ConfigEntryWithDefault[T] ( } private class ConfigEntryWithDefaultFunction[T] ( - key: String, - alternatives: List[String], - _defaultFunction: () => T, - valueConverter: String => T, - stringConverter: T => String, - doc: String, - isPublic: Boolean) - extends ConfigEntry(key, alternatives, valueConverter, stringConverter, doc, isPublic) { + key: String, + prependedKey: Option[String], + prependSeparator: String, + alternatives: List[String], + _defaultFunction: () => T, + valueConverter: String => T, + stringConverter: T => String, + doc: String, + isPublic: Boolean) + extends ConfigEntry( + key, + prependedKey, + prependSeparator, + alternatives, + valueConverter, + stringConverter, + doc, + isPublic + ) { override def defaultValue: Option[T] = Some(_defaultFunction()) @@ -106,13 +140,24 @@ private class ConfigEntryWithDefaultFunction[T] ( private class ConfigEntryWithDefaultString[T] ( key: String, + prependedKey: Option[String], + prependSeparator: String, alternatives: List[String], _defaultValue: String, valueConverter: String => T, stringConverter: T => String, doc: String, isPublic: Boolean) - extends ConfigEntry(key, alternatives, valueConverter, stringConverter, doc, isPublic) { + extends ConfigEntry( + key, + prependedKey, + prependSeparator, + alternatives, + valueConverter, + stringConverter, + doc, + isPublic + ) { override def defaultValue: Option[T] = Some(valueConverter(_defaultValue)) @@ -130,14 +175,23 @@ private class ConfigEntryWithDefaultString[T] ( */ private[spark] class OptionalConfigEntry[T]( key: String, + prependedKey: Option[String], + prependSeparator: String, alternatives: List[String], val rawValueConverter: String => T, val rawStringConverter: T => String, doc: String, isPublic: Boolean) - extends ConfigEntry[Option[T]](key, alternatives, + extends ConfigEntry[Option[T]]( + key, + prependedKey, + prependSeparator, + alternatives, s => Some(rawValueConverter(s)), - v => v.map(rawStringConverter).orNull, 
doc, isPublic) { + v => v.map(rawStringConverter).orNull, + doc, + isPublic + ) { override def defaultValueString: String = ConfigEntry.UNDEFINED @@ -151,12 +205,22 @@ private[spark] class OptionalConfigEntry[T]( */ private[spark] class FallbackConfigEntry[T] ( key: String, + prependedKey: Option[String], + prependSeparator: String, alternatives: List[String], doc: String, isPublic: Boolean, val fallback: ConfigEntry[T]) - extends ConfigEntry[T](key, alternatives, - fallback.valueConverter, fallback.stringConverter, doc, isPublic) { + extends ConfigEntry[T]( + key, + prependedKey, + prependSeparator, + alternatives, + fallback.valueConverter, + fallback.stringConverter, + doc, + isPublic + ) { override def defaultValueString: String = s"<value of ${fallback.key}>" diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 7c332fdb85..488886f162 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -48,7 +48,10 @@ package object config { ConfigBuilder(SparkLauncher.DRIVER_EXTRA_CLASSPATH).stringConf.createOptional private[spark] val DRIVER_JAVA_OPTIONS = - ConfigBuilder(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS).stringConf.createOptional + ConfigBuilder(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS) + .withPrepended(SparkLauncher.DRIVER_DEFAULT_JAVA_OPTIONS) + .stringConf + .createOptional private[spark] val DRIVER_LIBRARY_PATH = ConfigBuilder(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH).stringConf.createOptional @@ -174,7 +177,10 @@ package object config { ConfigBuilder("spark.executor.heartbeat.maxFailures").internal().intConf.createWithDefault(60) private[spark] val EXECUTOR_JAVA_OPTIONS = - ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS).stringConf.createOptional + ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS) + .withPrepended(SparkLauncher.EXECUTOR_DEFAULT_JAVA_OPTIONS) + .stringConf + .createOptional private[spark] val EXECUTOR_LIBRARY_PATH = ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_LIBRARY_PATH).stringConf.createOptional diff --git a/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala b/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala index 02514dc7da..c3bfa7ddee 100644 --- a/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala +++ b/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala @@ -70,8 +70,8 @@ class ConfigEntrySuite extends SparkFunSuite { test("conf entry: fallback") { val conf = new SparkConf() - val parentConf = ConfigBuilder(testKey("parent")).intConf.createWithDefault(1) - val confWithFallback = ConfigBuilder(testKey("fallback")).fallbackConf(parentConf) + val parentConf = ConfigBuilder(testKey("parent1")).intConf.createWithDefault(1) + val confWithFallback = ConfigBuilder(testKey("fallback1")).fallbackConf(parentConf) assert(conf.get(confWithFallback) === 1) conf.set(confWithFallback, 2) assert(conf.get(parentConf) === 1) @@ -289,6 +289,92 @@ class ConfigEntrySuite extends SparkFunSuite { assert(conf.get(iConf) === 3) } + test("conf entry: prepend with default separator") { + val conf = new SparkConf() + val prependedKey = testKey("prepended1") + val prependedConf = ConfigBuilder(prependedKey).stringConf.createOptional + val derivedConf = ConfigBuilder(testKey("prepend1")) + .withPrepended(prependedKey) + .stringConf + .createOptional + + conf.set(derivedConf, "1") +
assert(conf.get(derivedConf) === Some("1")) + + conf.set(prependedConf, "2") + assert(conf.get(derivedConf) === Some("2 1")) + } + + test("conf entry: prepend with custom separator") { + val conf = new SparkConf() + val prependedKey = testKey("prepended2") + val prependedConf = ConfigBuilder(prependedKey).stringConf.createOptional + val derivedConf = ConfigBuilder(testKey("prepend2")) + .withPrepended(prependedKey, ",") + .stringConf + .createOptional + + conf.set(derivedConf, "1") + assert(conf.get(derivedConf) === Some("1")) + + conf.set(prependedConf, "2") + assert(conf.get(derivedConf) === Some("2,1")) + } + + test("conf entry: prepend with fallback") { + val conf = new SparkConf() + val prependedKey = testKey("prepended3") + val prependedConf = ConfigBuilder(prependedKey).stringConf.createOptional + val derivedConf = ConfigBuilder(testKey("prepend3")) + .withPrepended(prependedKey) + .stringConf + .createOptional + val confWithFallback = ConfigBuilder(testKey("fallback2")).fallbackConf(derivedConf) + + assert(conf.get(confWithFallback) === None) + + conf.set(derivedConf, "1") + assert(conf.get(confWithFallback) === Some("1")) + + conf.set(prependedConf, "2") + assert(conf.get(confWithFallback) === Some("2 1")) + + conf.set(confWithFallback, Some("3")) + assert(conf.get(confWithFallback) === Some("3")) + } + + test("conf entry: prepend should work only with string type") { + var i = 0 + def testPrependFail(createConf: (String, String) => Unit): Unit = { + intercept[IllegalArgumentException] { + createConf(testKey(s"prependedFail$i"), testKey(s"prependFail$i")) + }.getMessage.contains("type must be string if prepend used") + i += 1 + } + + testPrependFail( (prependedKey, prependKey) => + ConfigBuilder(testKey(prependKey)).withPrepended(prependedKey).intConf + ) + testPrependFail( (prependedKey, prependKey) => + ConfigBuilder(testKey(prependKey)).withPrepended(prependedKey).longConf + ) + testPrependFail( (prependedKey, prependKey) => + ConfigBuilder(testKey(prependKey)).withPrepended(prependedKey).doubleConf + ) + testPrependFail( (prependedKey, prependKey) => + ConfigBuilder(testKey(prependKey)).withPrepended(prependedKey).booleanConf + ) + testPrependFail( (prependedKey, prependKey) => + ConfigBuilder(testKey(prependKey)).withPrepended(prependedKey).timeConf(TimeUnit.MILLISECONDS) + ) + testPrependFail( (prependedKey, prependKey) => + ConfigBuilder(testKey(prependKey)).withPrepended(prependedKey).bytesConf(ByteUnit.BYTE) + ) + testPrependFail( (prependedKey, prependKey) => + ConfigBuilder(testKey(prependKey)).withPrepended(prependedKey).regexConf + ) + } + test("onCreate") { var onCreateCalled = false ConfigBuilder(testKey("oc1")).onCreate(_ => onCreateCalled = true).intConf.createWithDefault(1) diff --git a/docs/configuration.md b/docs/configuration.md index 211dfbb3f4..06b040866f 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -411,10 +411,13 @@ Apart from these, the following properties are also available, and may be useful - spark.driver.extraJavaOptions + spark.driver.defaultJavaOptions (none) - A string of extra JVM options to pass to the driver. For instance, GC settings or other logging. + A string of default JVM options to prepend to spark.driver.extraJavaOptions. + This is intended to be set by administrators. + + For instance, GC settings or other logging. Note that it is illegal to set maximum heap size (-Xmx) settings with this option. 
Maximum heap size settings can be set with spark.driver.memory in the cluster mode and through the --driver-memory command line option in the client mode. @@ -425,6 +428,25 @@ Apart from these, the following properties are also available, and may be useful your default properties file. + + spark.driver.extraJavaOptions + (none) + + A string of extra JVM options to pass to the driver. This is intended to be set by users. + + For instance, GC settings or other logging. + Note that it is illegal to set maximum heap size (-Xmx) settings with this option. Maximum heap + size settings can be set with spark.driver.memory in the cluster mode and through + the --driver-memory command line option in the client mode. + +
Note: In client mode, this config must not be set through the SparkConf + directly in your application, because the driver JVM has already started at that point. + Instead, please set this through the --driver-java-options command line option or in + your default properties file. + + spark.driver.defaultJavaOptions will be prepended to this configuration. + + spark.driver.extraLibraryPath (none) @@ -458,10 +480,13 @@ Apart from these, the following properties are also available, and may be useful - spark.executor.extraJavaOptions + spark.executor.defaultJavaOptions (none) - A string of extra JVM options to pass to executors. For instance, GC settings or other logging. + A string of default JVM options to prepend to spark.executor.extraJavaOptions. + This is intended to be set by administrators. + + For instance, GC settings or other logging. Note that it is illegal to set Spark properties or maximum heap size (-Xmx) settings with this option. Spark properties should be set using a SparkConf object or the spark-defaults.conf file used with the spark-submit script. Maximum heap size settings can be set with spark.executor.memory. @@ -472,6 +497,25 @@ Apart from these, the following properties are also available, and may be useful -verbose:gc -Xloggc:/tmp/{{APP_ID}}-{{EXECUTOR_ID}}.gc + + spark.executor.extraJavaOptions + (none) + + A string of extra JVM options to pass to executors. This is intended to be set by users. + + For instance, GC settings or other logging. + Note that it is illegal to set Spark properties or maximum heap size (-Xmx) settings with this + option. Spark properties should be set using a SparkConf object or the spark-defaults.conf file + used with the spark-submit script. Maximum heap size settings can be set with spark.executor.memory. + + The following symbols, if present will be interpolated: {{APP_ID}} will be replaced by + application ID and {{EXECUTOR_ID}} will be replaced by executor ID. For example, to enable + verbose gc logging to a file named for the executor ID of the app in /tmp, pass a 'value' of: + -verbose:gc -Xloggc:/tmp/{{APP_ID}}-{{EXECUTOR_ID}}.gc + + spark.executor.defaultJavaOptions will be prepended to this configuration. + + spark.executor.extraLibraryPath (none) diff --git a/docs/tuning.md b/docs/tuning.md index 222f8720ce..1faf7cfe0d 100644 --- a/docs/tuning.md +++ b/docs/tuning.md @@ -248,7 +248,7 @@ Our experience suggests that the effect of GC tuning depends on your application There are [many more tuning options](https://docs.oracle.com/javase/8/docs/technotes/guides/vm/gctuning/index.html) described online, but at a high level, managing how frequently full GC takes place can help in reducing the overhead. -GC tuning flags for executors can be specified by setting `spark.executor.extraJavaOptions` in +GC tuning flags for executors can be specified by setting `spark.executor.defaultJavaOptions` or `spark.executor.extraJavaOptions` in a job's configuration. # Other Considerations diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java index f86d40015b..84940d96b5 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java +++ b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java @@ -48,6 +48,8 @@ public class SparkLauncher extends AbstractLauncher { public static final String DRIVER_MEMORY = "spark.driver.memory"; /** Configuration key for the driver class path. 
*/ public static final String DRIVER_EXTRA_CLASSPATH = "spark.driver.extraClassPath"; + /** Configuration key for the default driver VM options. */ + public static final String DRIVER_DEFAULT_JAVA_OPTIONS = "spark.driver.defaultJavaOptions"; /** Configuration key for the driver VM options. */ public static final String DRIVER_EXTRA_JAVA_OPTIONS = "spark.driver.extraJavaOptions"; /** Configuration key for the driver native library path. */ @@ -57,6 +59,8 @@ public class SparkLauncher extends AbstractLauncher { public static final String EXECUTOR_MEMORY = "spark.executor.memory"; /** Configuration key for the executor class path. */ public static final String EXECUTOR_EXTRA_CLASSPATH = "spark.executor.extraClassPath"; + /** Configuration key for the default executor VM options. */ + public static final String EXECUTOR_DEFAULT_JAVA_OPTIONS = "spark.executor.defaultJavaOptions"; /** Configuration key for the executor VM options. */ public static final String EXECUTOR_EXTRA_JAVA_OPTIONS = "spark.executor.extraJavaOptions"; /** Configuration key for the executor native library path. */ diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java index e3ee843f62..3479e0c342 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java +++ b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java @@ -267,13 +267,10 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder { // We don't want the client to specify Xmx. These have to be set by their corresponding // memory flag --driver-memory or configuration entry spark.driver.memory + String driverDefaultJavaOptions = config.get(SparkLauncher.DRIVER_DEFAULT_JAVA_OPTIONS); + checkJavaOptions(driverDefaultJavaOptions); String driverExtraJavaOptions = config.get(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS); - if (!isEmpty(driverExtraJavaOptions) && driverExtraJavaOptions.contains("Xmx")) { - String msg = String.format("Not allowed to specify max heap(Xmx) memory settings through " + - "java options (was %s). Use the corresponding --driver-memory or " + - "spark.driver.memory configuration instead.", driverExtraJavaOptions); - throw new IllegalArgumentException(msg); - } + checkJavaOptions(driverExtraJavaOptions); if (isClientMode) { // Figuring out where the memory value come from is a little tricky due to precedence. @@ -289,6 +286,7 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder { String memory = firstNonEmpty(tsMemory, config.get(SparkLauncher.DRIVER_MEMORY), System.getenv("SPARK_DRIVER_MEMORY"), System.getenv("SPARK_MEM"), DEFAULT_MEM); cmd.add("-Xmx" + memory); + addOptionString(cmd, driverDefaultJavaOptions); addOptionString(cmd, driverExtraJavaOptions); mergeEnvPathList(env, getLibPathEnvName(), config.get(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH)); @@ -299,6 +297,15 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder { return cmd; } + private void checkJavaOptions(String javaOptions) { + if (!isEmpty(javaOptions) && javaOptions.contains("Xmx")) { + String msg = String.format("Not allowed to specify max heap(Xmx) memory settings through " + + "java options (was %s). 
Use the corresponding --driver-memory or " + + "spark.driver.memory configuration instead.", javaOptions); + throw new IllegalArgumentException(msg); + } + } + private List buildPySparkShellCommand(Map env) throws IOException { // For backwards compatibility, if a script is specified in // the pyspark command line, then run it using spark-submit. diff --git a/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java b/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java index e694e9066f..32a91b1789 100644 --- a/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java +++ b/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java @@ -251,6 +251,8 @@ public class SparkSubmitCommandBuilderSuite extends BaseSuite { } private void testCmdBuilder(boolean isDriver, boolean useDefaultPropertyFile) throws Exception { + final String DRIVER_DEFAULT_PARAM = "-Ddriver-default"; + final String DRIVER_EXTRA_PARAM = "-Ddriver-extra"; String deployMode = isDriver ? "client" : "cluster"; SparkSubmitCommandBuilder launcher = @@ -270,7 +272,8 @@ public class SparkSubmitCommandBuilderSuite extends BaseSuite { launcher.setPropertiesFile(dummyPropsFile.getAbsolutePath()); launcher.conf.put(SparkLauncher.DRIVER_MEMORY, "1g"); launcher.conf.put(SparkLauncher.DRIVER_EXTRA_CLASSPATH, "/driver"); - launcher.conf.put(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, "-Ddriver"); + launcher.conf.put(SparkLauncher.DRIVER_DEFAULT_JAVA_OPTIONS, DRIVER_DEFAULT_PARAM); + launcher.conf.put(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, DRIVER_EXTRA_PARAM); launcher.conf.put(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH, "/native"); } else { launcher.childEnv.put("SPARK_CONF_DIR", System.getProperty("spark.test.home") @@ -284,6 +287,9 @@ public class SparkSubmitCommandBuilderSuite extends BaseSuite { if (isDriver) { assertTrue("Driver -Xmx should be configured.", cmd.contains("-Xmx1g")); + assertTrue("Driver default options should be configured.", + cmd.contains(DRIVER_DEFAULT_PARAM)); + assertTrue("Driver extra options should be configured.", cmd.contains(DRIVER_EXTRA_PARAM)); } else { boolean found = false; for (String arg : cmd) { @@ -293,6 +299,10 @@ public class SparkSubmitCommandBuilderSuite extends BaseSuite { } } assertFalse("Memory arguments should not be set.", found); + assertFalse("Driver default options should not be configured.", + cmd.contains(DRIVER_DEFAULT_PARAM)); + assertFalse("Driver extra options should not be configured.", + cmd.contains(DRIVER_EXTRA_PARAM)); } String[] cp = findArgValue(cmd, "-cp").split(Pattern.quote(File.pathSeparator)); diff --git a/launcher/src/test/resources/spark-defaults.conf b/launcher/src/test/resources/spark-defaults.conf index 3a51208c7c..22c253693d 100644 --- a/launcher/src/test/resources/spark-defaults.conf +++ b/launcher/src/test/resources/spark-defaults.conf @@ -17,5 +17,6 @@ spark.driver.memory=1g spark.driver.extraClassPath=/driver -spark.driver.extraJavaOptions=-Ddriver +spark.driver.defaultJavaOptions=-Ddriver-default +spark.driver.extraJavaOptions=-Ddriver-extra spark.driver.extraLibraryPath=/native \ No newline at end of file diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala index 3ff68348be..2fd13a5903 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala +++ 
b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala @@ -28,6 +28,7 @@ import org.apache.spark.deploy.Command import org.apache.spark.deploy.mesos.MesosDriverDescription import org.apache.spark.deploy.rest._ import org.apache.spark.internal.config +import org.apache.spark.launcher.SparkLauncher import org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler import org.apache.spark.util.Utils @@ -97,6 +98,7 @@ private[mesos] class MesosSubmitRequestServlet( // Optional fields val sparkProperties = request.sparkProperties + val driverDefaultJavaOptions = sparkProperties.get(SparkLauncher.DRIVER_DEFAULT_JAVA_OPTIONS) val driverExtraJavaOptions = sparkProperties.get(config.DRIVER_JAVA_OPTIONS.key) val driverExtraClassPath = sparkProperties.get(config.DRIVER_CLASS_PATH.key) val driverExtraLibraryPath = sparkProperties.get(config.DRIVER_LIBRARY_PATH.key) @@ -110,9 +112,11 @@ private[mesos] class MesosSubmitRequestServlet( val conf = new SparkConf(false).setAll(sparkProperties) val extraClassPath = driverExtraClassPath.toSeq.flatMap(_.split(File.pathSeparator)) val extraLibraryPath = driverExtraLibraryPath.toSeq.flatMap(_.split(File.pathSeparator)) + val defaultJavaOpts = driverDefaultJavaOptions.map(Utils.splitCommandString) + .getOrElse(Seq.empty) val extraJavaOpts = driverExtraJavaOptions.map(Utils.splitCommandString).getOrElse(Seq.empty) val sparkJavaOpts = Utils.sparkJavaOpts(conf) - val javaOpts = sparkJavaOpts ++ extraJavaOpts + val javaOpts = sparkJavaOpts ++ defaultJavaOpts ++ extraJavaOpts val command = new Command( mainClass, appArgs, environmentVariables, extraClassPath, extraLibraryPath, javaOpts) val actualSuperviseDriver = superviseDriver.map(_.toBoolean).getOrElse(DEFAULT_SUPERVISE)
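Illustrative example (not part of the patch): the prepend mechanism introduced above can be exercised the same way the new `ConfigEntrySuite` tests do. The sketch below is a hypothetical, self-contained Scala snippet; it assumes it is compiled inside the `org.apache.spark.internal.config` package of Spark core (as `ConfigEntrySuite` is), because `ConfigBuilder` and the typed `SparkConf.set`/`get` overloads are `private[spark]`, and the two keys it uses are made-up demo keys rather than the real `defaultJavaOptions`/`extraJavaOptions` entries.

```scala
package org.apache.spark.internal.config

import org.apache.spark.SparkConf

// Minimal sketch of withPrepended(): the entry built from the "extra" key also reads the
// "default" key and prepends its value, separated by a space (the default separator).
object PrependDemo {
  def main(args: Array[String]): Unit = {
    val defaultKey = "spark.demo.defaultJavaOptions"              // hypothetical key
    val defaultOpts = ConfigBuilder(defaultKey).stringConf.createOptional
    val extraOpts = ConfigBuilder("spark.demo.extraJavaOptions")  // hypothetical key
      .withPrepended(defaultKey)
      .stringConf
      .createOptional

    val conf = new SparkConf(false)
    conf.set(defaultOpts, "-XX:+UseG1GC -verbose:gc") // what an administrator would put in a defaults file
    conf.set(extraOpts, "-Dapp.name=myApp")           // what a user would set

    // Reading the "extra" entry yields the default options prepended to the extra options.
    assert(conf.get(extraOpts) == Some("-XX:+UseG1GC -verbose:gc -Dapp.name=myApp"))
  }
}
```

The real `DRIVER_JAVA_OPTIONS` and `EXECUTOR_JAVA_OPTIONS` entries in `package.scala` are built the same way, and `SparkSubmitCommandBuilder` mirrors this ordering on the launched command line by adding the driver's default Java options before its extra Java options.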