diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 3f0b71bbe1..d966582295 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -127,7 +127,7 @@ private[spark] class ExecutorAllocationManager( // allocation is only supported for YARN and the default number of cores per executor in YARN is // 1, but it might need to be attained differently for different cluster managers private val tasksPerExecutorForFullParallelism = - conf.getInt("spark.executor.cores", 1) / conf.getInt("spark.task.cpus", 1) + conf.get(EXECUTOR_CORES) / conf.getInt("spark.task.cpus", 1) private val executorAllocationRatio = conf.get(DYN_ALLOCATION_EXECUTOR_ALLOCATION_RATIO) @@ -223,7 +223,7 @@ private[spark] class ExecutorAllocationManager( "shuffle service. You may enable this through spark.shuffle.service.enabled.") } if (tasksPerExecutorForFullParallelism == 0) { - throw new SparkException("spark.executor.cores must not be < spark.task.cpus.") + throw new SparkException(s"${EXECUTOR_CORES.key} must not be < spark.task.cpus.") } if (executorAllocationRatio > 1.0 || executorAllocationRatio <= 0.0) { diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index 0b47da12b5..681e4378a4 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -503,12 +503,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria logWarning(msg) } - val executorOptsKey = "spark.executor.extraJavaOptions" - val executorClasspathKey = "spark.executor.extraClassPath" - val driverOptsKey = "spark.driver.extraJavaOptions" - val driverClassPathKey = "spark.driver.extraClassPath" - val driverLibraryPathKey = "spark.driver.extraLibraryPath" - val sparkExecutorInstances = "spark.executor.instances" + val executorOptsKey = EXECUTOR_JAVA_OPTIONS.key // Used by Yarn in 1.1 and before sys.props.get("spark.driver.libraryPath").foreach { value => @@ -517,7 +512,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria |spark.driver.libraryPath was detected (set to '$value'). |This is deprecated in Spark 1.2+. 
| - |Please instead use: $driverLibraryPathKey + |Please instead use: ${DRIVER_LIBRARY_PATH.key} """.stripMargin logWarning(warning) } @@ -594,9 +589,9 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria } } - if (contains("spark.cores.max") && contains("spark.executor.cores")) { - val totalCores = getInt("spark.cores.max", 1) - val executorCores = getInt("spark.executor.cores", 1) + if (contains(CORES_MAX) && contains(EXECUTOR_CORES)) { + val totalCores = getInt(CORES_MAX.key, 1) + val executorCores = get(EXECUTOR_CORES) val leftCores = totalCores % executorCores if (leftCores != 0) { logWarning(s"Total executor cores: ${totalCores} is not " + @@ -605,12 +600,12 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria } } - if (contains("spark.executor.cores") && contains("spark.task.cpus")) { - val executorCores = getInt("spark.executor.cores", 1) + if (contains(EXECUTOR_CORES) && contains("spark.task.cpus")) { + val executorCores = get(EXECUTOR_CORES) val taskCpus = getInt("spark.task.cpus", 1) if (executorCores < taskCpus) { - throw new SparkException("spark.executor.cores must not be less than spark.task.cpus.") + throw new SparkException(s"${EXECUTOR_CORES.key} must not be less than spark.task.cpus.") } } @@ -680,7 +675,7 @@ private[spark] object SparkConf extends Logging { * TODO: consolidate it with `ConfigBuilder.withAlternative`. */ private val configsWithAlternatives = Map[String, Seq[AlternateConfig]]( - "spark.executor.userClassPathFirst" -> Seq( + EXECUTOR_USER_CLASS_PATH_FIRST.key -> Seq( AlternateConfig("spark.files.userClassPathFirst", "1.3")), UPDATE_INTERVAL_S.key -> Seq( AlternateConfig("spark.history.fs.update.interval.seconds", "1.4"), @@ -703,7 +698,7 @@ private[spark] object SparkConf extends Logging { AlternateConfig("spark.kryoserializer.buffer.max.mb", "1.4")), "spark.shuffle.file.buffer" -> Seq( AlternateConfig("spark.shuffle.file.buffer.kb", "1.4")), - "spark.executor.logs.rolling.maxSize" -> Seq( + EXECUTOR_LOGS_ROLLING_MAX_SIZE.key -> Seq( AlternateConfig("spark.executor.logs.rolling.size.maxBytes", "1.4")), "spark.io.compression.snappy.blockSize" -> Seq( AlternateConfig("spark.io.compression.snappy.block.size", "1.4")), diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 3475859c3e..89be9de083 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -386,9 +386,9 @@ class SparkContext(config: SparkConf) extends Logging { // Set Spark driver host and port system properties. This explicitly sets the configuration // instead of relying on the default value of the config constant. 
_conf.set(DRIVER_HOST_ADDRESS, _conf.get(DRIVER_HOST_ADDRESS)) - _conf.setIfMissing("spark.driver.port", "0") + _conf.setIfMissing(DRIVER_PORT, 0) - _conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER) + _conf.set(EXECUTOR_ID, SparkContext.DRIVER_IDENTIFIER) _jars = Utils.getUserJars(_conf) _files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.nonEmpty)) @@ -461,7 +461,7 @@ class SparkContext(config: SparkConf) extends Logging { files.foreach(addFile) } - _executorMemory = _conf.getOption("spark.executor.memory") + _executorMemory = _conf.getOption(EXECUTOR_MEMORY.key) .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY"))) .orElse(Option(System.getenv("SPARK_MEM")) .map(warnSparkMem)) @@ -2639,7 +2639,7 @@ object SparkContext extends Logging { case SparkMasterRegex.LOCAL_N_FAILURES_REGEX(threads, _) => convertToInt(threads) case "yarn" => if (conf != null && conf.getOption("spark.submit.deployMode").contains("cluster")) { - conf.getInt("spark.driver.cores", 0) + conf.getInt(DRIVER_CORES.key, 0) } else { 0 } diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index de0c8579d9..9222781fa0 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -163,10 +163,10 @@ object SparkEnv extends Logging { mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = { assert(conf.contains(DRIVER_HOST_ADDRESS), s"${DRIVER_HOST_ADDRESS.key} is not set on the driver!") - assert(conf.contains("spark.driver.port"), "spark.driver.port is not set on the driver!") + assert(conf.contains(DRIVER_PORT), s"${DRIVER_PORT.key} is not set on the driver!") val bindAddress = conf.get(DRIVER_BIND_ADDRESS) val advertiseAddress = conf.get(DRIVER_HOST_ADDRESS) - val port = conf.get("spark.driver.port").toInt + val port = conf.get(DRIVER_PORT) val ioEncryptionKey = if (conf.get(IO_ENCRYPTION_ENABLED)) { Some(CryptoStreamUtils.createKey(conf)) } else { @@ -251,7 +251,7 @@ object SparkEnv extends Logging { // Figure out which port RpcEnv actually bound to in case the original port is 0 or occupied. if (isDriver) { - conf.set("spark.driver.port", rpcEnv.address.port.toString) + conf.set(DRIVER_PORT, rpcEnv.address.port) } // Create an instance of the class with the given name, possibly initializing it with our conf @@ -359,7 +359,7 @@ object SparkEnv extends Logging { // We need to set the executor ID before the MetricsSystem is created because sources and // sinks specified in the metrics configuration file will want to incorporate this executor's // ID into the metrics they report. 
- conf.set("spark.executor.id", executorId) + conf.set(EXECUTOR_ID, executorId) val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager) ms.start() ms diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala index 6b748c825d..5168e93309 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala @@ -27,6 +27,7 @@ import scala.collection.JavaConverters._ import org.apache.spark._ import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.EXECUTOR_CORES import org.apache.spark.internal.config.Python._ import org.apache.spark.security.SocketAuthHelper import org.apache.spark.util._ @@ -74,8 +75,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( private val reuseWorker = conf.get(PYTHON_WORKER_REUSE) // each python worker gets an equal part of the allocation. the worker pool will grow to the // number of concurrent tasks, which is determined by the number of cores in this executor. - private val memoryMb = conf.get(PYSPARK_EXECUTOR_MEMORY) - .map(_ / conf.getInt("spark.executor.cores", 1)) + private val memoryMb = conf.get(PYSPARK_EXECUTOR_MEMORY).map(_ / conf.get(EXECUTOR_CORES)) // All the Python functions should have the same exec, version and envvars. protected val envVars = funcs.head.funcs.head.envVars diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index d5145094ec..d94b174d8d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -27,7 +27,7 @@ import org.apache.log4j.Logger import org.apache.spark.{SecurityManager, SparkConf} import org.apache.spark.deploy.DeployMessages._ import org.apache.spark.deploy.master.{DriverState, Master} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{config, Logging} import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint} import org.apache.spark.util.{SparkExitCode, ThreadUtils, Utils} @@ -68,17 +68,17 @@ private class ClientEndpoint( // people call `addJar` assuming the jar is in the same directory. 
val mainClass = "org.apache.spark.deploy.worker.DriverWrapper" - val classPathConf = "spark.driver.extraClassPath" + val classPathConf = config.DRIVER_CLASS_PATH.key val classPathEntries = sys.props.get(classPathConf).toSeq.flatMap { cp => cp.split(java.io.File.pathSeparator) } - val libraryPathConf = "spark.driver.extraLibraryPath" + val libraryPathConf = config.DRIVER_LIBRARY_PATH.key val libraryPathEntries = sys.props.get(libraryPathConf).toSeq.flatMap { cp => cp.split(java.io.File.pathSeparator) } - val extraJavaOptsConf = "spark.driver.extraJavaOptions" + val extraJavaOptsConf = config.DRIVER_JAVA_OPTIONS.key val extraJavaOpts = sys.props.get(extraJavaOptsConf) .map(Utils.splitCommandString).getOrElse(Seq.empty) val sparkJavaOpts = Utils.sparkJavaOpts(conf) diff --git a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala index c6307da61c..0679bdf7c7 100644 --- a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala +++ b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala @@ -34,7 +34,7 @@ import org.json4s.jackson.JsonMethods import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.deploy.master.RecoveryState -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{config, Logging} import org.apache.spark.util.{ThreadUtils, Utils} /** @@ -77,7 +77,7 @@ private object FaultToleranceTest extends App with Logging { private val containerSparkHome = "/opt/spark" private val dockerMountDir = "%s:%s".format(sparkHome, containerSparkHome) - System.setProperty("spark.driver.host", "172.17.42.1") // default docker host ip + System.setProperty(config.DRIVER_HOST_ADDRESS.key, "172.17.42.1") // default docker host ip private def afterEach() { if (sc != null) { @@ -216,7 +216,7 @@ private object FaultToleranceTest extends App with Logging { if (sc != null) { sc.stop() } // Counter-hack: Because of a hack in SparkEnv#create() that changes this // property, we need to reset it. 
- System.setProperty("spark.driver.port", "0") + System.setProperty(config.DRIVER_PORT.key, "0") sc = new SparkContext(getMasterUrls(masters), "fault-tolerance", containerSparkHome) } diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 763bd0a70a..a4c65aeaae 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -514,13 +514,13 @@ private[spark] class SparkSubmit extends Logging { OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, confKey = "spark.app.name"), OptionAssigner(args.ivyRepoPath, ALL_CLUSTER_MGRS, CLIENT, confKey = "spark.jars.ivy"), OptionAssigner(args.driverMemory, ALL_CLUSTER_MGRS, CLIENT, - confKey = "spark.driver.memory"), + confKey = DRIVER_MEMORY.key), OptionAssigner(args.driverExtraClassPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, - confKey = "spark.driver.extraClassPath"), + confKey = DRIVER_CLASS_PATH.key), OptionAssigner(args.driverExtraJavaOptions, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, - confKey = "spark.driver.extraJavaOptions"), + confKey = DRIVER_JAVA_OPTIONS.key), OptionAssigner(args.driverExtraLibraryPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, - confKey = "spark.driver.extraLibraryPath"), + confKey = DRIVER_LIBRARY_PATH.key), OptionAssigner(args.principal, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, confKey = PRINCIPAL.key), OptionAssigner(args.keytab, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, @@ -537,7 +537,7 @@ private[spark] class SparkSubmit extends Logging { // Yarn only OptionAssigner(args.queue, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.queue"), OptionAssigner(args.numExecutors, YARN, ALL_DEPLOY_MODES, - confKey = "spark.executor.instances"), + confKey = EXECUTOR_INSTANCES.key), OptionAssigner(args.pyFiles, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.pyFiles"), OptionAssigner(args.jars, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.jars"), OptionAssigner(args.files, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.files"), @@ -545,22 +545,22 @@ private[spark] class SparkSubmit extends Logging { // Other options OptionAssigner(args.executorCores, STANDALONE | YARN | KUBERNETES, ALL_DEPLOY_MODES, - confKey = "spark.executor.cores"), + confKey = EXECUTOR_CORES.key), OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN | KUBERNETES, ALL_DEPLOY_MODES, - confKey = "spark.executor.memory"), + confKey = EXECUTOR_MEMORY.key), OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES, - confKey = "spark.cores.max"), + confKey = CORES_MAX.key), OptionAssigner(args.files, LOCAL | STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES, confKey = "spark.files"), OptionAssigner(args.jars, LOCAL, CLIENT, confKey = "spark.jars"), OptionAssigner(args.jars, STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES, confKey = "spark.jars"), OptionAssigner(args.driverMemory, STANDALONE | MESOS | YARN | KUBERNETES, CLUSTER, - confKey = "spark.driver.memory"), + confKey = DRIVER_MEMORY.key), OptionAssigner(args.driverCores, STANDALONE | MESOS | YARN | KUBERNETES, CLUSTER, - confKey = "spark.driver.cores"), + confKey = DRIVER_CORES.key), OptionAssigner(args.supervise.toString, STANDALONE | MESOS, CLUSTER, - confKey = "spark.driver.supervise"), + confKey = DRIVER_SUPERVISE.key), OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, confKey = "spark.jars.ivy"), // An internal option used only for spark-shell to add user jars to repl's classloader, @@ -727,7 +727,7 
@@ private[spark] class SparkSubmit extends Logging { // Ignore invalid spark.driver.host in cluster modes. if (deployMode == CLUSTER) { - sparkConf.remove("spark.driver.host") + sparkConf.remove(DRIVER_HOST_ADDRESS) } // Resolve paths in certain spark properties diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala index 4cf08a7980..34facd5a58 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala @@ -31,7 +31,7 @@ import scala.util.Try import org.apache.spark.{SparkException, SparkUserAppException} import org.apache.spark.deploy.SparkSubmitAction._ -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{config, Logging} import org.apache.spark.launcher.SparkSubmitArgumentsParser import org.apache.spark.network.util.JavaUtils import org.apache.spark.util.Utils @@ -155,31 +155,31 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S .orElse(env.get("MASTER")) .orNull driverExtraClassPath = Option(driverExtraClassPath) - .orElse(sparkProperties.get("spark.driver.extraClassPath")) + .orElse(sparkProperties.get(config.DRIVER_CLASS_PATH.key)) .orNull driverExtraJavaOptions = Option(driverExtraJavaOptions) - .orElse(sparkProperties.get("spark.driver.extraJavaOptions")) + .orElse(sparkProperties.get(config.DRIVER_JAVA_OPTIONS.key)) .orNull driverExtraLibraryPath = Option(driverExtraLibraryPath) - .orElse(sparkProperties.get("spark.driver.extraLibraryPath")) + .orElse(sparkProperties.get(config.DRIVER_LIBRARY_PATH.key)) .orNull driverMemory = Option(driverMemory) - .orElse(sparkProperties.get("spark.driver.memory")) + .orElse(sparkProperties.get(config.DRIVER_MEMORY.key)) .orElse(env.get("SPARK_DRIVER_MEMORY")) .orNull driverCores = Option(driverCores) - .orElse(sparkProperties.get("spark.driver.cores")) + .orElse(sparkProperties.get(config.DRIVER_CORES.key)) .orNull executorMemory = Option(executorMemory) - .orElse(sparkProperties.get("spark.executor.memory")) + .orElse(sparkProperties.get(config.EXECUTOR_MEMORY.key)) .orElse(env.get("SPARK_EXECUTOR_MEMORY")) .orNull executorCores = Option(executorCores) - .orElse(sparkProperties.get("spark.executor.cores")) + .orElse(sparkProperties.get(config.EXECUTOR_CORES.key)) .orElse(env.get("SPARK_EXECUTOR_CORES")) .orNull totalExecutorCores = Option(totalExecutorCores) - .orElse(sparkProperties.get("spark.cores.max")) + .orElse(sparkProperties.get(config.CORES_MAX.key)) .orNull name = Option(name).orElse(sparkProperties.get("spark.app.name")).orNull jars = Option(jars).orElse(sparkProperties.get("spark.jars")).orNull @@ -197,7 +197,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S .orElse(env.get("DEPLOY_MODE")) .orNull numExecutors = Option(numExecutors) - .getOrElse(sparkProperties.get("spark.executor.instances").orNull) + .getOrElse(sparkProperties.get(config.EXECUTOR_INSTANCES.key).orNull) queue = Option(queue).orElse(sparkProperties.get("spark.yarn.queue")).orNull keytab = Option(keytab) .orElse(sparkProperties.get("spark.kerberos.keytab")) diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala index afa1a5fbba..c75e684df2 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala +++ 
b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala @@ -23,6 +23,7 @@ import javax.servlet.http.HttpServletResponse import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf} import org.apache.spark.deploy.{Command, DeployMessages, DriverDescription} import org.apache.spark.deploy.ClientArguments._ +import org.apache.spark.internal.config import org.apache.spark.rpc.RpcEndpointRef import org.apache.spark.util.Utils @@ -132,12 +133,12 @@ private[rest] class StandaloneSubmitRequestServlet( // Optional fields val sparkProperties = request.sparkProperties - val driverMemory = sparkProperties.get("spark.driver.memory") - val driverCores = sparkProperties.get("spark.driver.cores") - val driverExtraJavaOptions = sparkProperties.get("spark.driver.extraJavaOptions") - val driverExtraClassPath = sparkProperties.get("spark.driver.extraClassPath") - val driverExtraLibraryPath = sparkProperties.get("spark.driver.extraLibraryPath") - val superviseDriver = sparkProperties.get("spark.driver.supervise") + val driverMemory = sparkProperties.get(config.DRIVER_MEMORY.key) + val driverCores = sparkProperties.get(config.DRIVER_CORES.key) + val driverExtraJavaOptions = sparkProperties.get(config.DRIVER_JAVA_OPTIONS.key) + val driverExtraClassPath = sparkProperties.get(config.DRIVER_CLASS_PATH.key) + val driverExtraLibraryPath = sparkProperties.get(config.DRIVER_LIBRARY_PATH.key) + val superviseDriver = sparkProperties.get(config.DRIVER_SUPERVISE.key) // The semantics of "spark.master" and the masterUrl are different. While the // property "spark.master" could contain all registered masters, masterUrl // contains only the active master. To make sure a Spark driver can recover diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolRequest.scala b/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolRequest.scala index 86ddf954ca..7f462148c7 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolRequest.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolRequest.scala @@ -19,6 +19,7 @@ package org.apache.spark.deploy.rest import scala.util.Try +import org.apache.spark.internal.config import org.apache.spark.util.Utils /** @@ -49,11 +50,11 @@ private[rest] class CreateSubmissionRequest extends SubmitRestProtocolRequest { assertFieldIsSet(appArgs, "appArgs") assertFieldIsSet(environmentVariables, "environmentVariables") assertPropertyIsSet("spark.app.name") - assertPropertyIsBoolean("spark.driver.supervise") - assertPropertyIsNumeric("spark.driver.cores") - assertPropertyIsNumeric("spark.cores.max") - assertPropertyIsMemory("spark.driver.memory") - assertPropertyIsMemory("spark.executor.memory") + assertPropertyIsBoolean(config.DRIVER_SUPERVISE.key) + assertPropertyIsNumeric(config.DRIVER_CORES.key) + assertPropertyIsNumeric(config.CORES_MAX.key) + assertPropertyIsMemory(config.DRIVER_MEMORY.key) + assertPropertyIsMemory(config.EXECUTOR_MEMORY.key) } private def assertPropertyIsSet(key: String): Unit = diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala index 8d6a2b80ef..1e8ad0b6af 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala @@ -23,7 +23,7 @@ import org.apache.commons.lang3.StringUtils import org.apache.spark.{SecurityManager, SparkConf} import 
org.apache.spark.deploy.{DependencyUtils, SparkHadoopUtil, SparkSubmit} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{config, Logging} import org.apache.spark.rpc.RpcEnv import org.apache.spark.util._ @@ -43,7 +43,7 @@ object DriverWrapper extends Logging { case workerUrl :: userJar :: mainClass :: extraArgs => val conf = new SparkConf() val host: String = Utils.localHostName() - val port: Int = sys.props.getOrElse("spark.driver.port", "0").toInt + val port: Int = sys.props.getOrElse(config.DRIVER_PORT.key, "0").toInt val rpcEnv = RpcEnv.create("Driver", host, port, conf, new SecurityManager(conf)) logInfo(s"Driver address: ${rpcEnv.address}") rpcEnv.setupEndpoint("workerWatcher", new WorkerWatcher(rpcEnv, workerUrl)) @@ -51,7 +51,7 @@ object DriverWrapper extends Logging { val currentLoader = Thread.currentThread.getContextClassLoader val userJarUrl = new File(userJar).toURI().toURL() val loader = - if (sys.props.getOrElse("spark.driver.userClassPathFirst", "false").toBoolean) { + if (sys.props.getOrElse(config.DRIVER_USER_CLASS_PATH_FIRST.key, "false").toBoolean) { new ChildFirstURLClassLoader(Array(userJarUrl), currentLoader) } else { new MutableURLClassLoader(Array(userJarUrl), currentLoader) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index da80604594..8caaa73b02 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -39,7 +39,12 @@ package object config { private[spark] val DRIVER_USER_CLASS_PATH_FIRST = ConfigBuilder("spark.driver.userClassPathFirst").booleanConf.createWithDefault(false) - private[spark] val DRIVER_MEMORY = ConfigBuilder("spark.driver.memory") + private[spark] val DRIVER_CORES = ConfigBuilder("spark.driver.cores") + .doc("Number of cores to use for the driver process, only in cluster mode.") + .intConf + .createWithDefault(1) + + private[spark] val DRIVER_MEMORY = ConfigBuilder(SparkLauncher.DRIVER_MEMORY) .doc("Amount of memory to use for the driver process, in MiB unless otherwise specified.") .bytesConf(ByteUnit.MiB) .createWithDefaultString("1g") @@ -113,6 +118,9 @@ package object config { private[spark] val EVENT_LOG_CALLSITE_LONG_FORM = ConfigBuilder("spark.eventLog.longForm.enabled").booleanConf.createWithDefault(false) + private[spark] val EXECUTOR_ID = + ConfigBuilder("spark.executor.id").stringConf.createOptional + private[spark] val EXECUTOR_CLASS_PATH = ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH).stringConf.createOptional @@ -139,7 +147,11 @@ package object config { private[spark] val EXECUTOR_USER_CLASS_PATH_FIRST = ConfigBuilder("spark.executor.userClassPathFirst").booleanConf.createWithDefault(false) - private[spark] val EXECUTOR_MEMORY = ConfigBuilder("spark.executor.memory") + private[spark] val EXECUTOR_CORES = ConfigBuilder(SparkLauncher.EXECUTOR_CORES) + .intConf + .createWithDefault(1) + + private[spark] val EXECUTOR_MEMORY = ConfigBuilder(SparkLauncher.EXECUTOR_MEMORY) .doc("Amount of memory to use per executor process, in MiB unless otherwise specified.") .bytesConf(ByteUnit.MiB) .createWithDefaultString("1g") @@ -150,6 +162,15 @@ package object config { .bytesConf(ByteUnit.MiB) .createOptional + private[spark] val CORES_MAX = ConfigBuilder("spark.cores.max") + .doc("When running on a standalone deploy cluster or a Mesos cluster in coarse-grained " + + "sharing mode, the maximum amount of CPU cores to 
request for the application from across " + + "the cluster (not from each machine). If not set, the default will be " + + "`spark.deploy.defaultCores` on Spark's standalone cluster manager, or infinite " + + "(all available cores) on Mesos.") + .intConf + .createOptional + private[spark] val MEMORY_OFFHEAP_ENABLED = ConfigBuilder("spark.memory.offHeap.enabled") .doc("If true, Spark will attempt to use off-heap memory for certain operations. " + "If off-heap memory use is enabled, then spark.memory.offHeap.size must be positive.") @@ -347,6 +368,17 @@ package object config { .stringConf .createWithDefault(Utils.localCanonicalHostName()) + private[spark] val DRIVER_PORT = ConfigBuilder("spark.driver.port") + .doc("Port of driver endpoints.") + .intConf + .createWithDefault(0) + + private[spark] val DRIVER_SUPERVISE = ConfigBuilder("spark.driver.supervise") + .doc("If true, restarts the driver automatically if it fails with a non-zero exit status. " + + "Only has effect in Spark standalone mode or Mesos cluster deploy mode.") + .booleanConf + .createWithDefault(false) + private[spark] val DRIVER_BIND_ADDRESS = ConfigBuilder("spark.driver.bindAddress") .doc("Address where to bind network listen sockets on the driver.") .fallbackConf(DRIVER_HOST_ADDRESS) @@ -729,4 +761,23 @@ package object config { .stringConf .toSequence .createWithDefault(Nil) + + private[spark] val EXECUTOR_LOGS_ROLLING_STRATEGY = + ConfigBuilder("spark.executor.logs.rolling.strategy").stringConf.createWithDefault("") + + private[spark] val EXECUTOR_LOGS_ROLLING_TIME_INTERVAL = + ConfigBuilder("spark.executor.logs.rolling.time.interval").stringConf.createWithDefault("daily") + + private[spark] val EXECUTOR_LOGS_ROLLING_MAX_SIZE = + ConfigBuilder("spark.executor.logs.rolling.maxSize") + .stringConf + .createWithDefault((1024 * 1024).toString) + + private[spark] val EXECUTOR_LOGS_ROLLING_MAX_RETAINED_FILES = + ConfigBuilder("spark.executor.logs.rolling.maxRetainedFiles").intConf.createWithDefault(-1) + + private[spark] val EXECUTOR_LOGS_ROLLING_ENABLE_COMPRESSION = + ConfigBuilder("spark.executor.logs.rolling.enableCompression") + .booleanConf + .createWithDefault(false) } diff --git a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala index a6f7db0600..8286087042 100644 --- a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala @@ -18,6 +18,7 @@ package org.apache.spark.memory import org.apache.spark.SparkConf +import org.apache.spark.internal.config import org.apache.spark.storage.BlockId /** @@ -127,14 +128,14 @@ private[spark] object StaticMemoryManager { if (systemMaxMemory < MIN_MEMORY_BYTES) { throw new IllegalArgumentException(s"System memory $systemMaxMemory must " + s"be at least $MIN_MEMORY_BYTES. Please increase heap size using the --driver-memory " + - s"option or spark.driver.memory in Spark configuration.") + s"option or ${config.DRIVER_MEMORY.key} in Spark configuration.") } - if (conf.contains("spark.executor.memory")) { - val executorMemory = conf.getSizeAsBytes("spark.executor.memory") + if (conf.contains(config.EXECUTOR_MEMORY)) { + val executorMemory = conf.getSizeAsBytes(config.EXECUTOR_MEMORY.key) if (executorMemory < MIN_MEMORY_BYTES) { throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " + s"$MIN_MEMORY_BYTES. 
Please increase executor memory using the " + - s"--executor-memory option or spark.executor.memory in Spark configuration.") + s"--executor-memory option or ${config.EXECUTOR_MEMORY.key} in Spark configuration.") } } val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.2) diff --git a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala index 78edd2c4d7..9260fd3a6f 100644 --- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala @@ -18,6 +18,7 @@ package org.apache.spark.memory import org.apache.spark.SparkConf +import org.apache.spark.internal.config import org.apache.spark.storage.BlockId /** @@ -216,15 +217,15 @@ object UnifiedMemoryManager { if (systemMemory < minSystemMemory) { throw new IllegalArgumentException(s"System memory $systemMemory must " + s"be at least $minSystemMemory. Please increase heap size using the --driver-memory " + - s"option or spark.driver.memory in Spark configuration.") + s"option or ${config.DRIVER_MEMORY.key} in Spark configuration.") } // SPARK-12759 Check executor memory to fail fast if memory is insufficient - if (conf.contains("spark.executor.memory")) { - val executorMemory = conf.getSizeAsBytes("spark.executor.memory") + if (conf.contains(config.EXECUTOR_MEMORY)) { + val executorMemory = conf.getSizeAsBytes(config.EXECUTOR_MEMORY.key) if (executorMemory < minSystemMemory) { throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " + s"$minSystemMemory. Please increase executor memory using the " + - s"--executor-memory option or spark.executor.memory in Spark configuration.") + s"--executor-memory option or ${config.EXECUTOR_MEMORY.key} in Spark configuration.") } } val usableMemory = systemMemory - reservedMemory diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala index 301317a79d..b1e311ada4 100644 --- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala +++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala @@ -130,7 +130,7 @@ private[spark] class MetricsSystem private ( private[spark] def buildRegistryName(source: Source): String = { val metricsNamespace = conf.get(METRICS_NAMESPACE).orElse(conf.getOption("spark.app.id")) - val executorId = conf.getOption("spark.executor.id") + val executorId = conf.get(EXECUTOR_ID) val defaultName = MetricRegistry.name(source.sourceName) if (instance == "driver" || instance == "executor") { diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 6bf60dd8e9..41f032ccf8 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -717,7 +717,7 @@ private[spark] class TaskSetManager( calculatedTasks += 1 if (maxResultSize > 0 && totalResultSize > maxResultSize) { val msg = s"Total size of serialized results of ${calculatedTasks} tasks " + - s"(${Utils.bytesToString(totalResultSize)}) is bigger than spark.driver.maxResultSize " + + s"(${Utils.bytesToString(totalResultSize)}) is bigger than ${config.MAX_RESULT_SIZE.key} " + s"(${Utils.bytesToString(maxResultSize)})" logError(msg) abort(msg) diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala index f73a58ff5d..adef20d307 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala @@ -25,7 +25,7 @@ import scala.concurrent.Future import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.deploy.{ApplicationDescription, Command} import org.apache.spark.deploy.client.{StandaloneAppClient, StandaloneAppClientListener} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{config, Logging} import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle} import org.apache.spark.rpc.RpcEndpointAddress import org.apache.spark.scheduler._ @@ -54,7 +54,7 @@ private[spark] class StandaloneSchedulerBackend( private val registrationBarrier = new Semaphore(0) - private val maxCores = conf.getOption("spark.cores.max").map(_.toInt) + private val maxCores = conf.get(config.CORES_MAX) private val totalExpectedCores = maxCores.getOrElse(0) override def start() { @@ -69,8 +69,8 @@ private[spark] class StandaloneSchedulerBackend( // The endpoint for executors to talk to us val driverUrl = RpcEndpointAddress( - sc.conf.get("spark.driver.host"), - sc.conf.get("spark.driver.port").toInt, + sc.conf.get(config.DRIVER_HOST_ADDRESS), + sc.conf.get(config.DRIVER_PORT), CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString val args = Seq( "--driver-url", driverUrl, @@ -79,11 +79,11 @@ private[spark] class StandaloneSchedulerBackend( "--cores", "{{CORES}}", "--app-id", "{{APP_ID}}", "--worker-url", "{{WORKER_URL}}") - val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions") + val extraJavaOpts = sc.conf.get(config.EXECUTOR_JAVA_OPTIONS) .map(Utils.splitCommandString).getOrElse(Seq.empty) - val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath") + val classPathEntries = sc.conf.get(config.EXECUTOR_CLASS_PATH) .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil) - val libraryPathEntries = sc.conf.getOption("spark.executor.extraLibraryPath") + val libraryPathEntries = sc.conf.get(config.EXECUTOR_LIBRARY_PATH) .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil) // When testing, expose the parent class path to the child. This is processed by @@ -102,7 +102,7 @@ private[spark] class StandaloneSchedulerBackend( val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts) val webUrl = sc.ui.map(_.webUrl).getOrElse("") - val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt) + val coresPerExecutor = conf.getOption(config.EXECUTOR_CORES.key).map(_.toInt) // If we're using dynamic allocation, set our initial executor limit to 0 for now. // ExecutorAllocationManager will send the real initial limit to the Master later. 
val initialExecutorLimit = diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala index 0de57fbd56..6ff8bf29b0 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala @@ -24,7 +24,7 @@ import java.nio.ByteBuffer import org.apache.spark.{SparkConf, SparkContext, SparkEnv, TaskState} import org.apache.spark.TaskState.TaskState import org.apache.spark.executor.{Executor, ExecutorBackend} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{config, Logging} import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle} import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint} import org.apache.spark.scheduler._ @@ -116,7 +116,7 @@ private[spark] class LocalSchedulerBackend( * @param conf Spark configuration. */ def getUserClasspath(conf: SparkConf): Seq[URL] = { - val userClassPathStr = conf.getOption("spark.executor.extraClassPath") + val userClassPathStr = conf.get(config.EXECUTOR_CLASS_PATH) userClassPathStr.map(_.split(File.pathSeparator)).toSeq.flatten.map(new File(_).toURI.toURL) } diff --git a/core/src/main/scala/org/apache/spark/util/RpcUtils.scala b/core/src/main/scala/org/apache/spark/util/RpcUtils.scala index e5cccf39f9..902e48fed3 100644 --- a/core/src/main/scala/org/apache/spark/util/RpcUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/RpcUtils.scala @@ -18,6 +18,7 @@ package org.apache.spark.util import org.apache.spark.SparkConf +import org.apache.spark.internal.config import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv, RpcTimeout} private[spark] object RpcUtils { @@ -26,8 +27,8 @@ private[spark] object RpcUtils { * Retrieve a `RpcEndpointRef` which is located in the driver via its name. */ def makeDriverRef(name: String, conf: SparkConf, rpcEnv: RpcEnv): RpcEndpointRef = { - val driverHost: String = conf.get("spark.driver.host", "localhost") - val driverPort: Int = conf.getInt("spark.driver.port", 7077) + val driverHost: String = conf.get(config.DRIVER_HOST_ADDRESS.key, "localhost") + val driverPort: Int = conf.getInt(config.DRIVER_PORT.key, 7077) Utils.checkHost(driverHost) rpcEnv.setupEndpointRef(RpcAddress(driverHost, driverPort), name) } diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 22f074cf98..3527fee689 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -2231,7 +2231,7 @@ private[spark] object Utils extends Logging { s"${e.getMessage}: Service$serviceString failed after " + s"$maxRetries retries (on a random free port)! " + s"Consider explicitly setting the appropriate binding address for " + - s"the service$serviceString (for example spark.driver.bindAddress " + + s"the service$serviceString (for example ${DRIVER_BIND_ADDRESS.key} " + s"for SparkDriver) to the correct binding address." 
} else { s"${e.getMessage}: Service$serviceString failed after " + diff --git a/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala b/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala index 2f9ad4c8cc..3188e0bd2b 100644 --- a/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala +++ b/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala @@ -20,7 +20,7 @@ package org.apache.spark.util.logging import java.io.{File, FileOutputStream, InputStream, IOException} import org.apache.spark.SparkConf -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{config, Logging} import org.apache.spark.util.{IntParam, Utils} /** @@ -115,11 +115,9 @@ private[spark] object FileAppender extends Logging { /** Create the right appender based on Spark configuration */ def apply(inputStream: InputStream, file: File, conf: SparkConf): FileAppender = { - import RollingFileAppender._ - - val rollingStrategy = conf.get(STRATEGY_PROPERTY, STRATEGY_DEFAULT) - val rollingSizeBytes = conf.get(SIZE_PROPERTY, STRATEGY_DEFAULT) - val rollingInterval = conf.get(INTERVAL_PROPERTY, INTERVAL_DEFAULT) + val rollingStrategy = conf.get(config.EXECUTOR_LOGS_ROLLING_STRATEGY) + val rollingSizeBytes = conf.get(config.EXECUTOR_LOGS_ROLLING_MAX_SIZE) + val rollingInterval = conf.get(config.EXECUTOR_LOGS_ROLLING_TIME_INTERVAL) def createTimeBasedAppender(): FileAppender = { val validatedParams: Option[(Long, String)] = rollingInterval match { diff --git a/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala b/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala index 5d8cec8447..59439b6879 100644 --- a/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala +++ b/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala @@ -24,6 +24,7 @@ import com.google.common.io.Files import org.apache.commons.io.IOUtils import org.apache.spark.SparkConf +import org.apache.spark.internal.config /** * Continuously appends data from input stream into the given file, and rolls @@ -44,10 +45,8 @@ private[spark] class RollingFileAppender( bufferSize: Int = RollingFileAppender.DEFAULT_BUFFER_SIZE ) extends FileAppender(inputStream, activeFile, bufferSize) { - import RollingFileAppender._ - - private val maxRetainedFiles = conf.getInt(RETAINED_FILES_PROPERTY, -1) - private val enableCompression = conf.getBoolean(ENABLE_COMPRESSION, false) + private val maxRetainedFiles = conf.get(config.EXECUTOR_LOGS_ROLLING_MAX_RETAINED_FILES) + private val enableCompression = conf.get(config.EXECUTOR_LOGS_ROLLING_ENABLE_COMPRESSION) /** Stop the appender */ override def stop() { @@ -82,7 +81,7 @@ private[spark] class RollingFileAppender( // Roll the log file and compress if enableCompression is true. private def rotateFile(activeFile: File, rolloverFile: File): Unit = { if (enableCompression) { - val gzFile = new File(rolloverFile.getAbsolutePath + GZIP_LOG_SUFFIX) + val gzFile = new File(rolloverFile.getAbsolutePath + RollingFileAppender.GZIP_LOG_SUFFIX) var gzOutputStream: GZIPOutputStream = null var inputStream: InputStream = null try { @@ -103,7 +102,7 @@ private[spark] class RollingFileAppender( // Check if the rollover file already exists. 
private def rolloverFileExist(file: File): Boolean = { - file.exists || new File(file.getAbsolutePath + GZIP_LOG_SUFFIX).exists + file.exists || new File(file.getAbsolutePath + RollingFileAppender.GZIP_LOG_SUFFIX).exists } /** Move the active log file to a new rollover file */ @@ -164,15 +163,7 @@ private[spark] class RollingFileAppender( * names of configurations that configure rolling file appenders. */ private[spark] object RollingFileAppender { - val STRATEGY_PROPERTY = "spark.executor.logs.rolling.strategy" - val STRATEGY_DEFAULT = "" - val INTERVAL_PROPERTY = "spark.executor.logs.rolling.time.interval" - val INTERVAL_DEFAULT = "daily" - val SIZE_PROPERTY = "spark.executor.logs.rolling.maxSize" - val SIZE_DEFAULT = (1024 * 1024).toString - val RETAINED_FILES_PROPERTY = "spark.executor.logs.rolling.maxRetainedFiles" val DEFAULT_BUFFER_SIZE = 8192 - val ENABLE_COMPRESSION = "spark.executor.logs.rolling.enableCompression" val GZIP_LOG_SUFFIX = ".gz" diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index 5c718cb654..d0389235cb 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -155,7 +155,7 @@ class ExecutorAllocationManagerSuite .set("spark.dynamicAllocation.maxExecutors", "15") .set("spark.dynamicAllocation.minExecutors", "3") .set("spark.dynamicAllocation.executorAllocationRatio", divisor.toString) - .set("spark.executor.cores", cores.toString) + .set(config.EXECUTOR_CORES, cores) val sc = new SparkContext(conf) contexts += sc var manager = sc.executorAllocationManager.get diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala index e14a5dcb5e..9a6abbdb0a 100644 --- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala @@ -140,7 +140,7 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst test("creating SparkContext with cpus per tasks bigger than cores per executors") { val conf = new SparkConf(false) - .set("spark.executor.cores", "1") + .set(EXECUTOR_CORES, 1) .set("spark.task.cpus", "2") intercept[SparkException] { sc = new SparkContext(conf) } } diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala index a1d2a1283d..8567dd1f08 100644 --- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala @@ -243,7 +243,7 @@ class StandaloneDynamicAllocationSuite } test("dynamic allocation with cores per executor") { - sc = new SparkContext(appConf.set("spark.executor.cores", "2")) + sc = new SparkContext(appConf.set(config.EXECUTOR_CORES, 2)) val appId = sc.applicationId eventually(timeout(10.seconds), interval(10.millis)) { val apps = getApplications() @@ -296,7 +296,7 @@ class StandaloneDynamicAllocationSuite test("dynamic allocation with cores per executor AND max cores") { sc = new SparkContext(appConf - .set("spark.executor.cores", "2") + .set(config.EXECUTOR_CORES, 2) .set("spark.cores.max", "8")) val appId = sc.applicationId eventually(timeout(10.seconds), interval(10.millis)) { @@ -526,7 +526,7 @@ class StandaloneDynamicAllocationSuite new 
SparkConf() .setMaster(masterRpcEnv.address.toSparkURL) .setAppName("test") - .set("spark.executor.memory", "256m") + .set(config.EXECUTOR_MEMORY.key, "256m") } /** Make a master to which our application will send executor requests. */ diff --git a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala index d56cfc183d..5ce3453b68 100644 --- a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala @@ -248,7 +248,7 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes val mm = UnifiedMemoryManager(conf, numCores = 1) // Try using an executor memory that's too small - val conf2 = conf.clone().set("spark.executor.memory", (reservedMemory / 2).toString) + val conf2 = conf.clone().set(EXECUTOR_MEMORY.key, (reservedMemory / 2).toString) val exception = intercept[IllegalArgumentException] { UnifiedMemoryManager(conf2, numCores = 1) } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index d264adaef9..f73ff67837 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -655,7 +655,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg } test("abort the job if total size of results is too large") { - val conf = new SparkConf().set("spark.driver.maxResultSize", "2m") + val conf = new SparkConf().set(config.MAX_RESULT_SIZE.key, "2m") sc = new SparkContext("local", "test", conf) def genBytes(size: Int): (Int) => Array[Byte] = { (x: Int) => diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala index 3962bdc27d..19116cf22d 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala @@ -31,7 +31,7 @@ import org.scalatest.concurrent.Eventually._ import org.apache.spark._ import org.apache.spark.broadcast.BroadcastManager import org.apache.spark.internal.Logging -import org.apache.spark.internal.config.MEMORY_OFFHEAP_SIZE +import org.apache.spark.internal.config.{DRIVER_PORT, MEMORY_OFFHEAP_SIZE} import org.apache.spark.memory.UnifiedMemoryManager import org.apache.spark.network.BlockTransferService import org.apache.spark.network.netty.NettyBlockTransferService @@ -86,7 +86,7 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr) conf.set("spark.authenticate", "false") - conf.set("spark.driver.port", rpcEnv.address.port.toString) + conf.set(DRIVER_PORT, rpcEnv.address.port) conf.set("spark.testing", "true") conf.set("spark.memory.fraction", "1") conf.set("spark.memory.storageFraction", "1") diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index cf00c1c3aa..e866342e44 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -124,7 +124,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE 
.set("spark.storage.unrollMemoryThreshold", "512") rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr) - conf.set("spark.driver.port", rpcEnv.address.port.toString) + conf.set(DRIVER_PORT, rpcEnv.address.port) // Mock SparkContext to reduce the memory usage of tests. It's fine since the only reason we // need to create a SparkContext is to initialize LiveListenerBus. diff --git a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala index 52cd5378bc..242163931f 100644 --- a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala @@ -34,7 +34,7 @@ import org.mockito.Mockito.{atLeast, mock, verify} import org.scalatest.BeforeAndAfter import org.apache.spark.{SparkConf, SparkFunSuite} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{config, Logging} import org.apache.spark.util.logging.{FileAppender, RollingFileAppender, SizeBasedRollingPolicy, TimeBasedRollingPolicy} class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging { @@ -136,7 +136,7 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging { // setup input stream and appender val testOutputStream = new PipedOutputStream() val testInputStream = new PipedInputStream(testOutputStream, 100 * 1000) - val conf = new SparkConf().set(RollingFileAppender.RETAINED_FILES_PROPERTY, "10") + val conf = new SparkConf().set(config.EXECUTOR_LOGS_ROLLING_MAX_RETAINED_FILES, 10) val appender = new RollingFileAppender(testInputStream, testFile, new SizeBasedRollingPolicy(1000, false), conf, 10) @@ -200,13 +200,12 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging { appender.awaitTermination() } - import RollingFileAppender._ - def rollingStrategy(strategy: String): Seq[(String, String)] = - Seq(STRATEGY_PROPERTY -> strategy) - def rollingSize(size: String): Seq[(String, String)] = Seq(SIZE_PROPERTY -> size) + Seq(config.EXECUTOR_LOGS_ROLLING_STRATEGY.key -> strategy) + def rollingSize(size: String): Seq[(String, String)] = + Seq(config.EXECUTOR_LOGS_ROLLING_MAX_SIZE.key -> size) def rollingInterval(interval: String): Seq[(String, String)] = - Seq(INTERVAL_PROPERTY -> interval) + Seq(config.EXECUTOR_LOGS_ROLLING_TIME_INTERVAL.key -> interval) val msInDay = 24 * 60 * 60 * 1000L val msInHour = 60 * 60 * 1000L diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala index 8362c14fb2..d52988df58 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala @@ -42,7 +42,7 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf) .getOrElse(throw new SparkException("Must specify the driver container image")) // CPU settings - private val driverCpuCores = conf.get("spark.driver.cores", "1") + private val driverCpuCores = conf.get(DRIVER_CORES.key, "1") private val driverLimitCores = conf.get(KUBERNETES_DRIVER_LIMIT_CORES) // Memory settings @@ -85,7 +85,7 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf) ("cpu", new QuantityBuilder(false).withAmount(limitCores).build()) } - 
val driverPort = conf.sparkConf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT) + val driverPort = conf.sparkConf.getInt(DRIVER_PORT.key, DEFAULT_DRIVER_PORT) val driverBlockManagerPort = conf.sparkConf.getInt( DRIVER_BLOCK_MANAGER_PORT.key, DEFAULT_BLOCKMANAGER_PORT diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala index dd73a5e522..6c3a6b39fa 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala @@ -46,8 +46,8 @@ private[spark] class BasicExecutorFeatureStep( private val executorPodNamePrefix = kubernetesConf.resourceNamePrefix private val driverUrl = RpcEndpointAddress( - kubernetesConf.get("spark.driver.host"), - kubernetesConf.sparkConf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT), + kubernetesConf.get(DRIVER_HOST_ADDRESS), + kubernetesConf.sparkConf.getInt(DRIVER_PORT.key, DEFAULT_DRIVER_PORT), CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString private val executorMemoryMiB = kubernetesConf.get(EXECUTOR_MEMORY) private val executorMemoryString = kubernetesConf.get( @@ -67,7 +67,7 @@ private[spark] class BasicExecutorFeatureStep( executorMemoryWithOverhead } - private val executorCores = kubernetesConf.sparkConf.getInt("spark.executor.cores", 1) + private val executorCores = kubernetesConf.sparkConf.get(EXECUTOR_CORES) private val executorCoresRequest = if (kubernetesConf.sparkConf.contains(KUBERNETES_EXECUTOR_REQUEST_CORES)) { kubernetesConf.get(KUBERNETES_EXECUTOR_REQUEST_CORES).get diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala index 42305457f4..15671179b1 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala @@ -22,7 +22,7 @@ import io.fabric8.kubernetes.api.model.{HasMetadata, ServiceBuilder} import org.apache.spark.deploy.k8s.{KubernetesDriverConf, SparkPod} import org.apache.spark.deploy.k8s.Constants._ -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{config, Logging} import org.apache.spark.util.{Clock, SystemClock} private[spark] class DriverServiceFeatureStep( @@ -51,18 +51,17 @@ private[spark] class DriverServiceFeatureStep( } private val driverPort = kubernetesConf.sparkConf.getInt( - "spark.driver.port", DEFAULT_DRIVER_PORT) + config.DRIVER_PORT.key, DEFAULT_DRIVER_PORT) private val driverBlockManagerPort = kubernetesConf.sparkConf.getInt( - org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT.key, DEFAULT_BLOCKMANAGER_PORT) + config.DRIVER_BLOCK_MANAGER_PORT.key, DEFAULT_BLOCKMANAGER_PORT) override def configurePod(pod: SparkPod): SparkPod = pod override def getAdditionalPodSystemProperties(): Map[String, String] = { val driverHostname = s"$resolvedServiceName.${kubernetesConf.namespace}.svc" Map(DRIVER_HOST_KEY -> driverHostname, - "spark.driver.port" -> driverPort.toString, - org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT.key -> - 
driverBlockManagerPort.toString) + config.DRIVER_PORT.key -> driverPort.toString, + config.DRIVER_BLOCK_MANAGER_PORT.key -> driverBlockManagerPort.toString) } override def getAdditionalKubernetesResources(): Seq[HasMetadata] = { @@ -90,8 +89,8 @@ private[spark] class DriverServiceFeatureStep( } private[spark] object DriverServiceFeatureStep { - val DRIVER_BIND_ADDRESS_KEY = org.apache.spark.internal.config.DRIVER_BIND_ADDRESS.key - val DRIVER_HOST_KEY = org.apache.spark.internal.config.DRIVER_HOST_ADDRESS.key + val DRIVER_BIND_ADDRESS_KEY = config.DRIVER_BIND_ADDRESS.key + val DRIVER_HOST_KEY = config.DRIVER_HOST_ADDRESS.key val DRIVER_SVC_POSTFIX = "-driver-svc" val MAX_SERVICE_NAME_LENGTH = 63 } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala index 5ceb9d6d6f..27d59dd7f3 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala @@ -46,7 +46,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { test("Check the pod respects all configurations from the user.") { val sparkConf = new SparkConf() .set(KUBERNETES_DRIVER_POD_NAME, "spark-driver-pod") - .set("spark.driver.cores", "2") + .set(DRIVER_CORES, 2) .set(KUBERNETES_DRIVER_LIMIT_CORES, "4") .set(DRIVER_MEMORY.key, "256M") .set(DRIVER_MEMORY_OVERHEAD, 200L) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala index e28c650a57..36bfb7d41e 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala @@ -29,7 +29,7 @@ import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite} import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, KubernetesTestConf, SparkPod} import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ -import org.apache.spark.internal.config._ +import org.apache.spark.internal.config import org.apache.spark.internal.config.Python._ import org.apache.spark.rpc.RpcEndpointAddress import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend @@ -74,8 +74,8 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { .set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, RESOURCE_NAME_PREFIX) .set(CONTAINER_IMAGE, EXECUTOR_IMAGE) .set(KUBERNETES_DRIVER_SUBMIT_CHECK, true) - .set(DRIVER_HOST_ADDRESS, DRIVER_HOSTNAME) - .set("spark.driver.port", DRIVER_PORT.toString) + .set(config.DRIVER_HOST_ADDRESS, DRIVER_HOSTNAME) + .set(config.DRIVER_PORT, DRIVER_PORT) .set(IMAGE_PULL_SECRETS, TEST_IMAGE_PULL_SECRETS) .set("spark.kubernetes.resource.type", "java") } @@ -125,8 +125,8 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { } test("classpath and extra java options get translated into environment variables") { - baseConf.set(EXECUTOR_JAVA_OPTIONS, "foo=bar") - baseConf.set(EXECUTOR_CLASS_PATH, "bar=baz") + 
baseConf.set(config.EXECUTOR_JAVA_OPTIONS, "foo=bar") + baseConf.set(config.EXECUTOR_CLASS_PATH, "bar=baz") val kconf = newExecutorConf(environment = Map("qux" -> "quux")) val step = new BasicExecutorFeatureStep(kconf, new SecurityManager(baseConf)) val executor = step.configurePod(SparkPod.initialPod()) @@ -150,7 +150,7 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { test("auth secret propagation") { val conf = baseConf.clone() - .set(NETWORK_AUTH_ENABLED, true) + .set(config.NETWORK_AUTH_ENABLED, true) .set("spark.master", "k8s://127.0.0.1") val secMgr = new SecurityManager(conf) @@ -168,8 +168,8 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { val secretFile = new File(secretDir, "secret-file.txt") Files.write(secretFile.toPath, "some-secret".getBytes(StandardCharsets.UTF_8)) val conf = baseConf.clone() - .set(NETWORK_AUTH_ENABLED, true) - .set(AUTH_SECRET_FILE, secretFile.getAbsolutePath) + .set(config.NETWORK_AUTH_ENABLED, true) + .set(config.AUTH_SECRET_FILE, secretFile.getAbsolutePath) .set("spark.master", "k8s://127.0.0.1") val secMgr = new SecurityManager(conf) secMgr.initializeAuth() diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala index 045278939d..822f1e3296 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala @@ -39,7 +39,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite { test("Headless service has a port for the driver RPC and the block manager.") { val sparkConf = new SparkConf(false) - .set("spark.driver.port", "9000") + .set(DRIVER_PORT, 9000) .set(DRIVER_BLOCK_MANAGER_PORT, 8080) val kconf = KubernetesTestConf.createDriverConf( sparkConf = sparkConf, @@ -61,7 +61,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite { test("Hostname and ports are set according to the service name.") { val sparkConf = new SparkConf(false) - .set("spark.driver.port", "9000") + .set(DRIVER_PORT, 9000) .set(DRIVER_BLOCK_MANAGER_PORT, 8080) .set(KUBERNETES_NAMESPACE, "my-namespace") val kconf = KubernetesTestConf.createDriverConf( @@ -87,7 +87,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite { s"${kconf.resourceNamePrefix}${DriverServiceFeatureStep.DRIVER_SVC_POSTFIX}", resolvedService) val additionalProps = configurationStep.getAdditionalPodSystemProperties() - assert(additionalProps("spark.driver.port") === DEFAULT_DRIVER_PORT.toString) + assert(additionalProps(DRIVER_PORT.key) === DEFAULT_DRIVER_PORT.toString) assert(additionalProps(DRIVER_BLOCK_MANAGER_PORT.key) === DEFAULT_BLOCKMANAGER_PORT.toString) } diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala index d134847dc7..dd0b2bad1e 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala @@ -129,4 +129,7 @@ package object config { "when launching drivers. 
Default is to accept all offers with sufficient resources.") .stringConf .createWithDefault("") + + private[spark] val EXECUTOR_URI = + ConfigBuilder("spark.executor.uri").stringConf.createOptional } diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala index 68f6921153..a4aba3e9c0 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala @@ -27,6 +27,7 @@ import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf} import org.apache.spark.deploy.Command import org.apache.spark.deploy.mesos.MesosDriverDescription import org.apache.spark.deploy.rest._ +import org.apache.spark.internal.config import org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler import org.apache.spark.util.Utils @@ -92,12 +93,12 @@ private[mesos] class MesosSubmitRequestServlet( // Optional fields val sparkProperties = request.sparkProperties - val driverExtraJavaOptions = sparkProperties.get("spark.driver.extraJavaOptions") - val driverExtraClassPath = sparkProperties.get("spark.driver.extraClassPath") - val driverExtraLibraryPath = sparkProperties.get("spark.driver.extraLibraryPath") - val superviseDriver = sparkProperties.get("spark.driver.supervise") - val driverMemory = sparkProperties.get("spark.driver.memory") - val driverCores = sparkProperties.get("spark.driver.cores") + val driverExtraJavaOptions = sparkProperties.get(config.DRIVER_JAVA_OPTIONS.key) + val driverExtraClassPath = sparkProperties.get(config.DRIVER_CLASS_PATH.key) + val driverExtraLibraryPath = sparkProperties.get(config.DRIVER_LIBRARY_PATH.key) + val superviseDriver = sparkProperties.get(config.DRIVER_SUPERVISE.key) + val driverMemory = sparkProperties.get(config.DRIVER_MEMORY.key) + val driverCores = sparkProperties.get(config.DRIVER_CORES.key) val name = request.sparkProperties.getOrElse("spark.app.name", mainClass) // Construct driver description diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala index cb1bcba651..021b1ac848 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala @@ -32,6 +32,7 @@ import org.apache.mesos.Protos.TaskStatus.Reason import org.apache.spark.{SecurityManager, SparkConf, SparkException, TaskState} import org.apache.spark.deploy.mesos.{config, MesosDriverDescription} import org.apache.spark.deploy.rest.{CreateSubmissionResponse, KillSubmissionResponse, SubmissionStatusResponse} +import org.apache.spark.internal.config.{CORES_MAX, EXECUTOR_LIBRARY_PATH, EXECUTOR_MEMORY} import org.apache.spark.metrics.MetricsSystem import org.apache.spark.util.Utils @@ -365,8 +366,7 @@ private[spark] class MesosClusterScheduler( } private def getDriverExecutorURI(desc: MesosDriverDescription): Option[String] = { - desc.conf.getOption("spark.executor.uri") - .orElse(desc.command.environment.get("SPARK_EXECUTOR_URI")) + desc.conf.get(config.EXECUTOR_URI).orElse(desc.command.environment.get("SPARK_EXECUTOR_URI")) } private def getDriverFrameworkID(desc: 
MesosDriverDescription): String = { @@ -474,7 +474,7 @@ private[spark] class MesosClusterScheduler( } else if (executorUri.isDefined) { val folderBasename = executorUri.get.split('/').last.split('.').head - val entries = conf.getOption("spark.executor.extraLibraryPath") + val entries = conf.get(EXECUTOR_LIBRARY_PATH) .map(path => Seq(path) ++ desc.command.libraryPathEntries) .getOrElse(desc.command.libraryPathEntries) @@ -528,10 +528,10 @@ private[spark] class MesosClusterScheduler( options ++= Seq("--class", desc.command.mainClass) } - desc.conf.getOption("spark.executor.memory").foreach { v => + desc.conf.getOption(EXECUTOR_MEMORY.key).foreach { v => options ++= Seq("--executor-memory", v) } - desc.conf.getOption("spark.cores.max").foreach { v => + desc.conf.getOption(CORES_MAX.key).foreach { v => options ++= Seq("--total-executor-cores", v) } desc.conf.getOption("spark.submit.pyFiles").foreach { pyFiles => diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala index f5866651dc..d0174516c2 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala @@ -33,7 +33,6 @@ import org.apache.spark.{SecurityManager, SparkConf, SparkContext, SparkExceptio import org.apache.spark.deploy.mesos.config._ import org.apache.spark.deploy.security.HadoopDelegationTokenManager import org.apache.spark.internal.config -import org.apache.spark.internal.config.EXECUTOR_HEARTBEAT_INTERVAL import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle} import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient @@ -63,9 +62,9 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( // Blacklist a slave after this many failures private val MAX_SLAVE_FAILURES = 2 - private val maxCoresOption = conf.getOption("spark.cores.max").map(_.toInt) + private val maxCoresOption = conf.get(config.CORES_MAX) - private val executorCoresOption = conf.getOption("spark.executor.cores").map(_.toInt) + private val executorCoresOption = conf.getOption(config.EXECUTOR_CORES.key).map(_.toInt) private val minCoresPerExecutor = executorCoresOption.getOrElse(1) @@ -220,18 +219,18 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( def createCommand(offer: Offer, numCores: Int, taskId: String): CommandInfo = { val environment = Environment.newBuilder() - val extraClassPath = conf.getOption("spark.executor.extraClassPath") + val extraClassPath = conf.get(config.EXECUTOR_CLASS_PATH) extraClassPath.foreach { cp => environment.addVariables( Environment.Variable.newBuilder().setName("SPARK_EXECUTOR_CLASSPATH").setValue(cp).build()) } - val extraJavaOpts = conf.getOption("spark.executor.extraJavaOptions").map { + val extraJavaOpts = conf.get(config.EXECUTOR_JAVA_OPTIONS).map { Utils.substituteAppNExecIds(_, appId, taskId) }.getOrElse("") // Set the environment variable through a command prefix // to append to the existing value of the variable - val prefixEnv = conf.getOption("spark.executor.extraLibraryPath").map { p => + val prefixEnv = conf.get(config.EXECUTOR_LIBRARY_PATH).map { p => Utils.libraryPathEnvPrefix(Seq(p)) }.getOrElse("") @@ -261,8 +260,7 @@ 
private[spark] class MesosCoarseGrainedSchedulerBackend( val command = CommandInfo.newBuilder() .setEnvironment(environment) - val uri = conf.getOption("spark.executor.uri") - .orElse(Option(System.getenv("SPARK_EXECUTOR_URI"))) + val uri = conf.get(EXECUTOR_URI).orElse(Option(System.getenv("SPARK_EXECUTOR_URI"))) if (uri.isEmpty) { val executorSparkHome = conf.getOption("spark.mesos.executor.home") @@ -304,8 +302,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( "driverURL" } else { RpcEndpointAddress( - conf.get("spark.driver.host"), - conf.get("spark.driver.port").toInt, + conf.get(config.DRIVER_HOST_ADDRESS), + conf.get(config.DRIVER_PORT), CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString } } @@ -633,7 +631,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( externalShufflePort, sc.conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs", s"${sc.conf.getTimeAsSeconds("spark.network.timeout", "120s")}s"), - sc.conf.get(EXECUTOR_HEARTBEAT_INTERVAL)) + sc.conf.get(config.EXECUTOR_HEARTBEAT_INTERVAL)) slave.shuffleRegistered = true } diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala index 0bb6fe0fa4..192f9407a1 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala @@ -28,8 +28,9 @@ import org.apache.mesos.SchedulerDriver import org.apache.mesos.protobuf.ByteString import org.apache.spark.{SparkContext, SparkException, TaskState} -import org.apache.spark.deploy.mesos.config +import org.apache.spark.deploy.mesos.config.EXECUTOR_URI import org.apache.spark.executor.MesosExecutorBackend +import org.apache.spark.internal.config import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster.ExecutorInfo import org.apache.spark.util.Utils @@ -107,15 +108,15 @@ private[spark] class MesosFineGrainedSchedulerBackend( throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!") } val environment = Environment.newBuilder() - sc.conf.getOption("spark.executor.extraClassPath").foreach { cp => + sc.conf.get(config.EXECUTOR_CLASS_PATH).foreach { cp => environment.addVariables( Environment.Variable.newBuilder().setName("SPARK_EXECUTOR_CLASSPATH").setValue(cp).build()) } - val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions").map { + val extraJavaOpts = sc.conf.get(config.EXECUTOR_JAVA_OPTIONS).map { Utils.substituteAppNExecIds(_, appId, execId) }.getOrElse("") - val prefixEnv = sc.conf.getOption("spark.executor.extraLibraryPath").map { p => + val prefixEnv = sc.conf.get(config.EXECUTOR_LIBRARY_PATH).map { p => Utils.libraryPathEnvPrefix(Seq(p)) }.getOrElse("") @@ -132,8 +133,7 @@ private[spark] class MesosFineGrainedSchedulerBackend( } val command = CommandInfo.newBuilder() .setEnvironment(environment) - val uri = sc.conf.getOption("spark.executor.uri") - .orElse(Option(System.getenv("SPARK_EXECUTOR_URI"))) + val uri = sc.conf.get(EXECUTOR_URI).orElse(Option(System.getenv("SPARK_EXECUTOR_URI"))) val executorBackendName = classOf[MesosExecutorBackend].getName if (uri.isEmpty) { diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index e46c4f970c..8dbdac168f 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -470,8 +470,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends rpcEnv = sc.env.rpcEnv val userConf = sc.getConf - val host = userConf.get("spark.driver.host") - val port = userConf.get("spark.driver.port").toInt + val host = userConf.get(DRIVER_HOST_ADDRESS) + val port = userConf.get(DRIVER_PORT) registerAM(host, port, userConf, sc.ui.map(_.webUrl)) val driverRef = rpcEnv.setupEndpointRef( @@ -505,7 +505,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends amCores, true) // The client-mode AM doesn't listen for incoming connections, so report an invalid port. - registerAM(hostname, -1, sparkConf, sparkConf.getOption("spark.driver.appUIAddress")) + registerAM(hostname, -1, sparkConf, sparkConf.get(DRIVER_APP_UI_ADDRESS)) // The driver should be up and listening, so unlike cluster mode, just try to connect to it // with no waiting or retrying. diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala index b257d8fdd3..7e9cd409da 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala @@ -224,16 +224,12 @@ package object config { /* Driver configuration. */ - private[spark] val DRIVER_CORES = ConfigBuilder("spark.driver.cores") - .intConf - .createWithDefault(1) + private[spark] val DRIVER_APP_UI_ADDRESS = ConfigBuilder("spark.driver.appUIAddress") + .stringConf + .createOptional /* Executor configuration. */ - private[spark] val EXECUTOR_CORES = ConfigBuilder("spark.executor.cores") - .intConf - .createWithDefault(1) - private[spark] val EXECUTOR_NODE_LABEL_EXPRESSION = ConfigBuilder("spark.yarn.executor.nodeLabelExpression") .doc("Node label expression for executors.") diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index 9397a1e3de..167eef19ed 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -24,7 +24,7 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState import org.apache.spark.{SparkContext, SparkException} import org.apache.spark.deploy.yarn.{Client, ClientArguments, YarnAppReport} import org.apache.spark.deploy.yarn.config._ -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{config, Logging} import org.apache.spark.launcher.SparkAppHandle import org.apache.spark.scheduler.TaskSchedulerImpl @@ -42,10 +42,10 @@ private[spark] class YarnClientSchedulerBackend( * This waits until the application is running. 
*/ override def start() { - val driverHost = conf.get("spark.driver.host") - val driverPort = conf.get("spark.driver.port") + val driverHost = conf.get(config.DRIVER_HOST_ADDRESS) + val driverPort = conf.get(config.DRIVER_PORT) val hostport = driverHost + ":" + driverPort - sc.ui.foreach { ui => conf.set("spark.driver.appUIAddress", ui.webUrl) } + sc.ui.foreach { ui => conf.set(DRIVER_APP_UI_ADDRESS, ui.webUrl) } val argsArrayBuf = new ArrayBuffer[String]() argsArrayBuf += ("--arg", hostport) diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala index 8032213602..9e3cc6ec01 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala @@ -24,7 +24,7 @@ import org.scalatest.Matchers import org.apache.spark.{SparkConf, SparkException, SparkFunSuite} import org.apache.spark.deploy.yarn.ResourceRequestTestHelper.ResourceInformation import org.apache.spark.deploy.yarn.config._ -import org.apache.spark.internal.config.{DRIVER_MEMORY, EXECUTOR_MEMORY} +import org.apache.spark.internal.config.{DRIVER_CORES, DRIVER_MEMORY, EXECUTOR_CORES, EXECUTOR_MEMORY} class ResourceRequestHelperSuite extends SparkFunSuite with Matchers {
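
Every hunk above follows the same migration: a hard-coded string key such as "spark.executor.cores" is replaced by a typed ConfigEntry constant that is read with conf.get(ENTRY) and referenced in messages via ENTRY.key. A minimal sketch of that pattern is below, assuming the code sits inside Spark's own source tree (ConfigBuilder, ConfigEntry, and the typed SparkConf.set/get overloads are private[spark]); the package, object, entry name, and "spark.sketch.executor.cores" key are hypothetical and used only for illustration.

package org.apache.spark.sketch

import org.apache.spark.SparkConf
import org.apache.spark.internal.config.{ConfigBuilder, ConfigEntry}

object ConfigEntrySketch {
  // Typed entry with a default, analogous to EXECUTOR_CORES; the key is hypothetical.
  private[spark] val SKETCH_EXECUTOR_CORES: ConfigEntry[Int] =
    ConfigBuilder("spark.sketch.executor.cores")
      .intConf
      .createWithDefault(1)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(loadDefaults = false).set(SKETCH_EXECUTOR_CORES, 4)
    // get(entry) returns an Int directly, so callers no longer parse strings or
    // repeat ad-hoc defaults such as getInt("spark.executor.cores", 1), and the
    // key string lives in exactly one place (SKETCH_EXECUTOR_CORES.key).
    val cores: Int = conf.get(SKETCH_EXECUTOR_CORES)
    println(s"${SKETCH_EXECUTOR_CORES.key} = $cores")
  }
}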