[SPARK-26445][CORE] Use ConfigEntry for hardcoded configs for driver/executor categories.

## What changes were proposed in this pull request?

This PR converts the hardcoded `spark.driver.*`, `spark.executor.*`, and `spark.cores.max` configs to use `ConfigEntry`.

Note that some config entries take their key strings from `SparkLauncher` rather than redefining them in the config package object, because those strings are already defined in `SparkLauncher` and the launcher module does not depend on the core module.
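
As an illustration of the pattern (not part of the patch itself), the sketch below shows how a call site changes once an entry such as `EXECUTOR_CORES` is defined via `ConfigBuilder(SparkLauncher.EXECUTOR_CORES)`. It assumes code living under the `org.apache.spark` package, because `SparkConf.get(ConfigEntry)` and `SparkConf.set(ConfigEntry, value)` are `private[spark]`; the `ConfigEntryUsageSketch` object and its package are hypothetical.

```scala
package org.apache.spark.example // hypothetical; must sit under org.apache.spark for private[spark] access

import org.apache.spark.SparkConf
import org.apache.spark.internal.config.{DRIVER_PORT, EXECUTOR_CORES, EXECUTOR_MEMORY}

object ConfigEntryUsageSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()

    // Before: every call site repeated the raw key string and its default, e.g.
    //   conf.getInt("spark.executor.cores", 1)
    //   conf.getInt("spark.driver.port", 0)

    // After: the key, type, and default are declared once as a ConfigEntry,
    // and reads/writes are typed.
    conf.set(EXECUTOR_CORES, 4)                  // typed setter, no hardcoded key string
    val cores: Int = conf.get(EXECUTOR_CORES)    // Int, defaults to 1 when unset
    val port: Int = conf.get(DRIVER_PORT)        // Int, defaults to 0
    val memMiB: Long = conf.get(EXECUTOR_MEMORY) // bytesConf(MiB), defaults to "1g" (1024 MiB)

    println(s"executor cores = $cores, driver port = $port, executor memory = $memMiB MiB")
  }
}
```

The benefit is that the key string and its default live in exactly one place, so call sites can no longer drift apart (e.g. different defaults for the same key), and a mistyped entry name fails at compile time instead of silently reading nothing.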

## How was this patch tested?

Existing tests.

Closes #23415 from ueshin/issues/SPARK-26445/hardcoded_driver_executor_configs.

Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
Takuya UESHIN authored on 2019-01-04 22:12:35 +08:00; committed by Hyukjin Kwon
parent 27e42c1de5
commit 4419e1daca
46 changed files with 236 additions and 200 deletions


@ -127,7 +127,7 @@ private[spark] class ExecutorAllocationManager(
// allocation is only supported for YARN and the default number of cores per executor in YARN is
// 1, but it might need to be attained differently for different cluster managers
private val tasksPerExecutorForFullParallelism =
conf.getInt("spark.executor.cores", 1) / conf.getInt("spark.task.cpus", 1)
conf.get(EXECUTOR_CORES) / conf.getInt("spark.task.cpus", 1)
private val executorAllocationRatio =
conf.get(DYN_ALLOCATION_EXECUTOR_ALLOCATION_RATIO)
@ -223,7 +223,7 @@ private[spark] class ExecutorAllocationManager(
"shuffle service. You may enable this through spark.shuffle.service.enabled.")
}
if (tasksPerExecutorForFullParallelism == 0) {
throw new SparkException("spark.executor.cores must not be < spark.task.cpus.")
throw new SparkException(s"${EXECUTOR_CORES.key} must not be < spark.task.cpus.")
}
if (executorAllocationRatio > 1.0 || executorAllocationRatio <= 0.0) {


@ -503,12 +503,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
logWarning(msg)
}
val executorOptsKey = "spark.executor.extraJavaOptions"
val executorClasspathKey = "spark.executor.extraClassPath"
val driverOptsKey = "spark.driver.extraJavaOptions"
val driverClassPathKey = "spark.driver.extraClassPath"
val driverLibraryPathKey = "spark.driver.extraLibraryPath"
val sparkExecutorInstances = "spark.executor.instances"
val executorOptsKey = EXECUTOR_JAVA_OPTIONS.key
// Used by Yarn in 1.1 and before
sys.props.get("spark.driver.libraryPath").foreach { value =>
@ -517,7 +512,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
|spark.driver.libraryPath was detected (set to '$value').
|This is deprecated in Spark 1.2+.
|
|Please instead use: $driverLibraryPathKey
|Please instead use: ${DRIVER_LIBRARY_PATH.key}
""".stripMargin
logWarning(warning)
}
@ -594,9 +589,9 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
}
}
if (contains("spark.cores.max") && contains("spark.executor.cores")) {
val totalCores = getInt("spark.cores.max", 1)
val executorCores = getInt("spark.executor.cores", 1)
if (contains(CORES_MAX) && contains(EXECUTOR_CORES)) {
val totalCores = getInt(CORES_MAX.key, 1)
val executorCores = get(EXECUTOR_CORES)
val leftCores = totalCores % executorCores
if (leftCores != 0) {
logWarning(s"Total executor cores: ${totalCores} is not " +
@ -605,12 +600,12 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
}
}
if (contains("spark.executor.cores") && contains("spark.task.cpus")) {
val executorCores = getInt("spark.executor.cores", 1)
if (contains(EXECUTOR_CORES) && contains("spark.task.cpus")) {
val executorCores = get(EXECUTOR_CORES)
val taskCpus = getInt("spark.task.cpus", 1)
if (executorCores < taskCpus) {
throw new SparkException("spark.executor.cores must not be less than spark.task.cpus.")
throw new SparkException(s"${EXECUTOR_CORES.key} must not be less than spark.task.cpus.")
}
}
@ -680,7 +675,7 @@ private[spark] object SparkConf extends Logging {
* TODO: consolidate it with `ConfigBuilder.withAlternative`.
*/
private val configsWithAlternatives = Map[String, Seq[AlternateConfig]](
"spark.executor.userClassPathFirst" -> Seq(
EXECUTOR_USER_CLASS_PATH_FIRST.key -> Seq(
AlternateConfig("spark.files.userClassPathFirst", "1.3")),
UPDATE_INTERVAL_S.key -> Seq(
AlternateConfig("spark.history.fs.update.interval.seconds", "1.4"),
@ -703,7 +698,7 @@ private[spark] object SparkConf extends Logging {
AlternateConfig("spark.kryoserializer.buffer.max.mb", "1.4")),
"spark.shuffle.file.buffer" -> Seq(
AlternateConfig("spark.shuffle.file.buffer.kb", "1.4")),
"spark.executor.logs.rolling.maxSize" -> Seq(
EXECUTOR_LOGS_ROLLING_MAX_SIZE.key -> Seq(
AlternateConfig("spark.executor.logs.rolling.size.maxBytes", "1.4")),
"spark.io.compression.snappy.blockSize" -> Seq(
AlternateConfig("spark.io.compression.snappy.block.size", "1.4")),


@ -386,9 +386,9 @@ class SparkContext(config: SparkConf) extends Logging {
// Set Spark driver host and port system properties. This explicitly sets the configuration
// instead of relying on the default value of the config constant.
_conf.set(DRIVER_HOST_ADDRESS, _conf.get(DRIVER_HOST_ADDRESS))
_conf.setIfMissing("spark.driver.port", "0")
_conf.setIfMissing(DRIVER_PORT, 0)
_conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)
_conf.set(EXECUTOR_ID, SparkContext.DRIVER_IDENTIFIER)
_jars = Utils.getUserJars(_conf)
_files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.nonEmpty))
@ -461,7 +461,7 @@ class SparkContext(config: SparkConf) extends Logging {
files.foreach(addFile)
}
_executorMemory = _conf.getOption("spark.executor.memory")
_executorMemory = _conf.getOption(EXECUTOR_MEMORY.key)
.orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
.orElse(Option(System.getenv("SPARK_MEM"))
.map(warnSparkMem))
@ -2639,7 +2639,7 @@ object SparkContext extends Logging {
case SparkMasterRegex.LOCAL_N_FAILURES_REGEX(threads, _) => convertToInt(threads)
case "yarn" =>
if (conf != null && conf.getOption("spark.submit.deployMode").contains("cluster")) {
conf.getInt("spark.driver.cores", 0)
conf.getInt(DRIVER_CORES.key, 0)
} else {
0
}


@ -163,10 +163,10 @@ object SparkEnv extends Logging {
mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
assert(conf.contains(DRIVER_HOST_ADDRESS),
s"${DRIVER_HOST_ADDRESS.key} is not set on the driver!")
assert(conf.contains("spark.driver.port"), "spark.driver.port is not set on the driver!")
assert(conf.contains(DRIVER_PORT), s"${DRIVER_PORT.key} is not set on the driver!")
val bindAddress = conf.get(DRIVER_BIND_ADDRESS)
val advertiseAddress = conf.get(DRIVER_HOST_ADDRESS)
val port = conf.get("spark.driver.port").toInt
val port = conf.get(DRIVER_PORT)
val ioEncryptionKey = if (conf.get(IO_ENCRYPTION_ENABLED)) {
Some(CryptoStreamUtils.createKey(conf))
} else {
@ -251,7 +251,7 @@ object SparkEnv extends Logging {
// Figure out which port RpcEnv actually bound to in case the original port is 0 or occupied.
if (isDriver) {
conf.set("spark.driver.port", rpcEnv.address.port.toString)
conf.set(DRIVER_PORT, rpcEnv.address.port)
}
// Create an instance of the class with the given name, possibly initializing it with our conf
@ -359,7 +359,7 @@ object SparkEnv extends Logging {
// We need to set the executor ID before the MetricsSystem is created because sources and
// sinks specified in the metrics configuration file will want to incorporate this executor's
// ID into the metrics they report.
conf.set("spark.executor.id", executorId)
conf.set(EXECUTOR_ID, executorId)
val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
ms.start()
ms


@ -27,6 +27,7 @@ import scala.collection.JavaConverters._
import org.apache.spark._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.EXECUTOR_CORES
import org.apache.spark.internal.config.Python._
import org.apache.spark.security.SocketAuthHelper
import org.apache.spark.util._
@ -74,8 +75,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
private val reuseWorker = conf.get(PYTHON_WORKER_REUSE)
// each python worker gets an equal part of the allocation. the worker pool will grow to the
// number of concurrent tasks, which is determined by the number of cores in this executor.
private val memoryMb = conf.get(PYSPARK_EXECUTOR_MEMORY)
.map(_ / conf.getInt("spark.executor.cores", 1))
private val memoryMb = conf.get(PYSPARK_EXECUTOR_MEMORY).map(_ / conf.get(EXECUTOR_CORES))
// All the Python functions should have the same exec, version and envvars.
protected val envVars = funcs.head.funcs.head.envVars


@ -27,7 +27,7 @@ import org.apache.log4j.Logger
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.{DriverState, Master}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint}
import org.apache.spark.util.{SparkExitCode, ThreadUtils, Utils}
@ -68,17 +68,17 @@ private class ClientEndpoint(
// people call `addJar` assuming the jar is in the same directory.
val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"
val classPathConf = "spark.driver.extraClassPath"
val classPathConf = config.DRIVER_CLASS_PATH.key
val classPathEntries = sys.props.get(classPathConf).toSeq.flatMap { cp =>
cp.split(java.io.File.pathSeparator)
}
val libraryPathConf = "spark.driver.extraLibraryPath"
val libraryPathConf = config.DRIVER_LIBRARY_PATH.key
val libraryPathEntries = sys.props.get(libraryPathConf).toSeq.flatMap { cp =>
cp.split(java.io.File.pathSeparator)
}
val extraJavaOptsConf = "spark.driver.extraJavaOptions"
val extraJavaOptsConf = config.DRIVER_JAVA_OPTIONS.key
val extraJavaOpts = sys.props.get(extraJavaOptsConf)
.map(Utils.splitCommandString).getOrElse(Seq.empty)
val sparkJavaOpts = Utils.sparkJavaOpts(conf)


@ -34,7 +34,7 @@ import org.json4s.jackson.JsonMethods
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.deploy.master.RecoveryState
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.util.{ThreadUtils, Utils}
/**
@ -77,7 +77,7 @@ private object FaultToleranceTest extends App with Logging {
private val containerSparkHome = "/opt/spark"
private val dockerMountDir = "%s:%s".format(sparkHome, containerSparkHome)
System.setProperty("spark.driver.host", "172.17.42.1") // default docker host ip
System.setProperty(config.DRIVER_HOST_ADDRESS.key, "172.17.42.1") // default docker host ip
private def afterEach() {
if (sc != null) {
@ -216,7 +216,7 @@ private object FaultToleranceTest extends App with Logging {
if (sc != null) { sc.stop() }
// Counter-hack: Because of a hack in SparkEnv#create() that changes this
// property, we need to reset it.
System.setProperty("spark.driver.port", "0")
System.setProperty(config.DRIVER_PORT.key, "0")
sc = new SparkContext(getMasterUrls(masters), "fault-tolerance", containerSparkHome)
}


@ -514,13 +514,13 @@ private[spark] class SparkSubmit extends Logging {
OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, confKey = "spark.app.name"),
OptionAssigner(args.ivyRepoPath, ALL_CLUSTER_MGRS, CLIENT, confKey = "spark.jars.ivy"),
OptionAssigner(args.driverMemory, ALL_CLUSTER_MGRS, CLIENT,
confKey = "spark.driver.memory"),
confKey = DRIVER_MEMORY.key),
OptionAssigner(args.driverExtraClassPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
confKey = "spark.driver.extraClassPath"),
confKey = DRIVER_CLASS_PATH.key),
OptionAssigner(args.driverExtraJavaOptions, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
confKey = "spark.driver.extraJavaOptions"),
confKey = DRIVER_JAVA_OPTIONS.key),
OptionAssigner(args.driverExtraLibraryPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
confKey = "spark.driver.extraLibraryPath"),
confKey = DRIVER_LIBRARY_PATH.key),
OptionAssigner(args.principal, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
confKey = PRINCIPAL.key),
OptionAssigner(args.keytab, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
@ -537,7 +537,7 @@ private[spark] class SparkSubmit extends Logging {
// Yarn only
OptionAssigner(args.queue, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.queue"),
OptionAssigner(args.numExecutors, YARN, ALL_DEPLOY_MODES,
confKey = "spark.executor.instances"),
confKey = EXECUTOR_INSTANCES.key),
OptionAssigner(args.pyFiles, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.pyFiles"),
OptionAssigner(args.jars, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.jars"),
OptionAssigner(args.files, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.files"),
@ -545,22 +545,22 @@ private[spark] class SparkSubmit extends Logging {
// Other options
OptionAssigner(args.executorCores, STANDALONE | YARN | KUBERNETES, ALL_DEPLOY_MODES,
confKey = "spark.executor.cores"),
confKey = EXECUTOR_CORES.key),
OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN | KUBERNETES, ALL_DEPLOY_MODES,
confKey = "spark.executor.memory"),
confKey = EXECUTOR_MEMORY.key),
OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES,
confKey = "spark.cores.max"),
confKey = CORES_MAX.key),
OptionAssigner(args.files, LOCAL | STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES,
confKey = "spark.files"),
OptionAssigner(args.jars, LOCAL, CLIENT, confKey = "spark.jars"),
OptionAssigner(args.jars, STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES,
confKey = "spark.jars"),
OptionAssigner(args.driverMemory, STANDALONE | MESOS | YARN | KUBERNETES, CLUSTER,
confKey = "spark.driver.memory"),
confKey = DRIVER_MEMORY.key),
OptionAssigner(args.driverCores, STANDALONE | MESOS | YARN | KUBERNETES, CLUSTER,
confKey = "spark.driver.cores"),
confKey = DRIVER_CORES.key),
OptionAssigner(args.supervise.toString, STANDALONE | MESOS, CLUSTER,
confKey = "spark.driver.supervise"),
confKey = DRIVER_SUPERVISE.key),
OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, confKey = "spark.jars.ivy"),
// An internal option used only for spark-shell to add user jars to repl's classloader,
@ -727,7 +727,7 @@ private[spark] class SparkSubmit extends Logging {
// Ignore invalid spark.driver.host in cluster modes.
if (deployMode == CLUSTER) {
sparkConf.remove("spark.driver.host")
sparkConf.remove(DRIVER_HOST_ADDRESS)
}
// Resolve paths in certain spark properties


@ -31,7 +31,7 @@ import scala.util.Try
import org.apache.spark.{SparkException, SparkUserAppException}
import org.apache.spark.deploy.SparkSubmitAction._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.launcher.SparkSubmitArgumentsParser
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.util.Utils
@ -155,31 +155,31 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
.orElse(env.get("MASTER"))
.orNull
driverExtraClassPath = Option(driverExtraClassPath)
.orElse(sparkProperties.get("spark.driver.extraClassPath"))
.orElse(sparkProperties.get(config.DRIVER_CLASS_PATH.key))
.orNull
driverExtraJavaOptions = Option(driverExtraJavaOptions)
.orElse(sparkProperties.get("spark.driver.extraJavaOptions"))
.orElse(sparkProperties.get(config.DRIVER_JAVA_OPTIONS.key))
.orNull
driverExtraLibraryPath = Option(driverExtraLibraryPath)
.orElse(sparkProperties.get("spark.driver.extraLibraryPath"))
.orElse(sparkProperties.get(config.DRIVER_LIBRARY_PATH.key))
.orNull
driverMemory = Option(driverMemory)
.orElse(sparkProperties.get("spark.driver.memory"))
.orElse(sparkProperties.get(config.DRIVER_MEMORY.key))
.orElse(env.get("SPARK_DRIVER_MEMORY"))
.orNull
driverCores = Option(driverCores)
.orElse(sparkProperties.get("spark.driver.cores"))
.orElse(sparkProperties.get(config.DRIVER_CORES.key))
.orNull
executorMemory = Option(executorMemory)
.orElse(sparkProperties.get("spark.executor.memory"))
.orElse(sparkProperties.get(config.EXECUTOR_MEMORY.key))
.orElse(env.get("SPARK_EXECUTOR_MEMORY"))
.orNull
executorCores = Option(executorCores)
.orElse(sparkProperties.get("spark.executor.cores"))
.orElse(sparkProperties.get(config.EXECUTOR_CORES.key))
.orElse(env.get("SPARK_EXECUTOR_CORES"))
.orNull
totalExecutorCores = Option(totalExecutorCores)
.orElse(sparkProperties.get("spark.cores.max"))
.orElse(sparkProperties.get(config.CORES_MAX.key))
.orNull
name = Option(name).orElse(sparkProperties.get("spark.app.name")).orNull
jars = Option(jars).orElse(sparkProperties.get("spark.jars")).orNull
@ -197,7 +197,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
.orElse(env.get("DEPLOY_MODE"))
.orNull
numExecutors = Option(numExecutors)
.getOrElse(sparkProperties.get("spark.executor.instances").orNull)
.getOrElse(sparkProperties.get(config.EXECUTOR_INSTANCES.key).orNull)
queue = Option(queue).orElse(sparkProperties.get("spark.yarn.queue")).orNull
keytab = Option(keytab)
.orElse(sparkProperties.get("spark.kerberos.keytab"))


@ -23,6 +23,7 @@ import javax.servlet.http.HttpServletResponse
import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf}
import org.apache.spark.deploy.{Command, DeployMessages, DriverDescription}
import org.apache.spark.deploy.ClientArguments._
import org.apache.spark.internal.config
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.util.Utils
@ -132,12 +133,12 @@ private[rest] class StandaloneSubmitRequestServlet(
// Optional fields
val sparkProperties = request.sparkProperties
val driverMemory = sparkProperties.get("spark.driver.memory")
val driverCores = sparkProperties.get("spark.driver.cores")
val driverExtraJavaOptions = sparkProperties.get("spark.driver.extraJavaOptions")
val driverExtraClassPath = sparkProperties.get("spark.driver.extraClassPath")
val driverExtraLibraryPath = sparkProperties.get("spark.driver.extraLibraryPath")
val superviseDriver = sparkProperties.get("spark.driver.supervise")
val driverMemory = sparkProperties.get(config.DRIVER_MEMORY.key)
val driverCores = sparkProperties.get(config.DRIVER_CORES.key)
val driverExtraJavaOptions = sparkProperties.get(config.DRIVER_JAVA_OPTIONS.key)
val driverExtraClassPath = sparkProperties.get(config.DRIVER_CLASS_PATH.key)
val driverExtraLibraryPath = sparkProperties.get(config.DRIVER_LIBRARY_PATH.key)
val superviseDriver = sparkProperties.get(config.DRIVER_SUPERVISE.key)
// The semantics of "spark.master" and the masterUrl are different. While the
// property "spark.master" could contain all registered masters, masterUrl
// contains only the active master. To make sure a Spark driver can recover


@ -19,6 +19,7 @@ package org.apache.spark.deploy.rest
import scala.util.Try
import org.apache.spark.internal.config
import org.apache.spark.util.Utils
/**
@ -49,11 +50,11 @@ private[rest] class CreateSubmissionRequest extends SubmitRestProtocolRequest {
assertFieldIsSet(appArgs, "appArgs")
assertFieldIsSet(environmentVariables, "environmentVariables")
assertPropertyIsSet("spark.app.name")
assertPropertyIsBoolean("spark.driver.supervise")
assertPropertyIsNumeric("spark.driver.cores")
assertPropertyIsNumeric("spark.cores.max")
assertPropertyIsMemory("spark.driver.memory")
assertPropertyIsMemory("spark.executor.memory")
assertPropertyIsBoolean(config.DRIVER_SUPERVISE.key)
assertPropertyIsNumeric(config.DRIVER_CORES.key)
assertPropertyIsNumeric(config.CORES_MAX.key)
assertPropertyIsMemory(config.DRIVER_MEMORY.key)
assertPropertyIsMemory(config.EXECUTOR_MEMORY.key)
}
private def assertPropertyIsSet(key: String): Unit =


@ -23,7 +23,7 @@ import org.apache.commons.lang3.StringUtils
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.deploy.{DependencyUtils, SparkHadoopUtil, SparkSubmit}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.rpc.RpcEnv
import org.apache.spark.util._
@ -43,7 +43,7 @@ object DriverWrapper extends Logging {
case workerUrl :: userJar :: mainClass :: extraArgs =>
val conf = new SparkConf()
val host: String = Utils.localHostName()
val port: Int = sys.props.getOrElse("spark.driver.port", "0").toInt
val port: Int = sys.props.getOrElse(config.DRIVER_PORT.key, "0").toInt
val rpcEnv = RpcEnv.create("Driver", host, port, conf, new SecurityManager(conf))
logInfo(s"Driver address: ${rpcEnv.address}")
rpcEnv.setupEndpoint("workerWatcher", new WorkerWatcher(rpcEnv, workerUrl))
@ -51,7 +51,7 @@ object DriverWrapper extends Logging {
val currentLoader = Thread.currentThread.getContextClassLoader
val userJarUrl = new File(userJar).toURI().toURL()
val loader =
if (sys.props.getOrElse("spark.driver.userClassPathFirst", "false").toBoolean) {
if (sys.props.getOrElse(config.DRIVER_USER_CLASS_PATH_FIRST.key, "false").toBoolean) {
new ChildFirstURLClassLoader(Array(userJarUrl), currentLoader)
} else {
new MutableURLClassLoader(Array(userJarUrl), currentLoader)


@ -39,7 +39,12 @@ package object config {
private[spark] val DRIVER_USER_CLASS_PATH_FIRST =
ConfigBuilder("spark.driver.userClassPathFirst").booleanConf.createWithDefault(false)
private[spark] val DRIVER_MEMORY = ConfigBuilder("spark.driver.memory")
private[spark] val DRIVER_CORES = ConfigBuilder("spark.driver.cores")
.doc("Number of cores to use for the driver process, only in cluster mode.")
.intConf
.createWithDefault(1)
private[spark] val DRIVER_MEMORY = ConfigBuilder(SparkLauncher.DRIVER_MEMORY)
.doc("Amount of memory to use for the driver process, in MiB unless otherwise specified.")
.bytesConf(ByteUnit.MiB)
.createWithDefaultString("1g")
@ -113,6 +118,9 @@ package object config {
private[spark] val EVENT_LOG_CALLSITE_LONG_FORM =
ConfigBuilder("spark.eventLog.longForm.enabled").booleanConf.createWithDefault(false)
private[spark] val EXECUTOR_ID =
ConfigBuilder("spark.executor.id").stringConf.createOptional
private[spark] val EXECUTOR_CLASS_PATH =
ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH).stringConf.createOptional
@ -139,7 +147,11 @@ package object config {
private[spark] val EXECUTOR_USER_CLASS_PATH_FIRST =
ConfigBuilder("spark.executor.userClassPathFirst").booleanConf.createWithDefault(false)
private[spark] val EXECUTOR_MEMORY = ConfigBuilder("spark.executor.memory")
private[spark] val EXECUTOR_CORES = ConfigBuilder(SparkLauncher.EXECUTOR_CORES)
.intConf
.createWithDefault(1)
private[spark] val EXECUTOR_MEMORY = ConfigBuilder(SparkLauncher.EXECUTOR_MEMORY)
.doc("Amount of memory to use per executor process, in MiB unless otherwise specified.")
.bytesConf(ByteUnit.MiB)
.createWithDefaultString("1g")
@ -150,6 +162,15 @@ package object config {
.bytesConf(ByteUnit.MiB)
.createOptional
private[spark] val CORES_MAX = ConfigBuilder("spark.cores.max")
.doc("When running on a standalone deploy cluster or a Mesos cluster in coarse-grained " +
"sharing mode, the maximum amount of CPU cores to request for the application from across " +
"the cluster (not from each machine). If not set, the default will be " +
"`spark.deploy.defaultCores` on Spark's standalone cluster manager, or infinite " +
"(all available cores) on Mesos.")
.intConf
.createOptional
private[spark] val MEMORY_OFFHEAP_ENABLED = ConfigBuilder("spark.memory.offHeap.enabled")
.doc("If true, Spark will attempt to use off-heap memory for certain operations. " +
"If off-heap memory use is enabled, then spark.memory.offHeap.size must be positive.")
@ -347,6 +368,17 @@ package object config {
.stringConf
.createWithDefault(Utils.localCanonicalHostName())
private[spark] val DRIVER_PORT = ConfigBuilder("spark.driver.port")
.doc("Port of driver endpoints.")
.intConf
.createWithDefault(0)
private[spark] val DRIVER_SUPERVISE = ConfigBuilder("spark.driver.supervise")
.doc("If true, restarts the driver automatically if it fails with a non-zero exit status. " +
"Only has effect in Spark standalone mode or Mesos cluster deploy mode.")
.booleanConf
.createWithDefault(false)
private[spark] val DRIVER_BIND_ADDRESS = ConfigBuilder("spark.driver.bindAddress")
.doc("Address where to bind network listen sockets on the driver.")
.fallbackConf(DRIVER_HOST_ADDRESS)
@ -729,4 +761,23 @@ package object config {
.stringConf
.toSequence
.createWithDefault(Nil)
private[spark] val EXECUTOR_LOGS_ROLLING_STRATEGY =
ConfigBuilder("spark.executor.logs.rolling.strategy").stringConf.createWithDefault("")
private[spark] val EXECUTOR_LOGS_ROLLING_TIME_INTERVAL =
ConfigBuilder("spark.executor.logs.rolling.time.interval").stringConf.createWithDefault("daily")
private[spark] val EXECUTOR_LOGS_ROLLING_MAX_SIZE =
ConfigBuilder("spark.executor.logs.rolling.maxSize")
.stringConf
.createWithDefault((1024 * 1024).toString)
private[spark] val EXECUTOR_LOGS_ROLLING_MAX_RETAINED_FILES =
ConfigBuilder("spark.executor.logs.rolling.maxRetainedFiles").intConf.createWithDefault(-1)
private[spark] val EXECUTOR_LOGS_ROLLING_ENABLE_COMPRESSION =
ConfigBuilder("spark.executor.logs.rolling.enableCompression")
.booleanConf
.createWithDefault(false)
}


@ -18,6 +18,7 @@
package org.apache.spark.memory
import org.apache.spark.SparkConf
import org.apache.spark.internal.config
import org.apache.spark.storage.BlockId
/**
@ -127,14 +128,14 @@ private[spark] object StaticMemoryManager {
if (systemMaxMemory < MIN_MEMORY_BYTES) {
throw new IllegalArgumentException(s"System memory $systemMaxMemory must " +
s"be at least $MIN_MEMORY_BYTES. Please increase heap size using the --driver-memory " +
s"option or spark.driver.memory in Spark configuration.")
s"option or ${config.DRIVER_MEMORY.key} in Spark configuration.")
}
if (conf.contains("spark.executor.memory")) {
val executorMemory = conf.getSizeAsBytes("spark.executor.memory")
if (conf.contains(config.EXECUTOR_MEMORY)) {
val executorMemory = conf.getSizeAsBytes(config.EXECUTOR_MEMORY.key)
if (executorMemory < MIN_MEMORY_BYTES) {
throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " +
s"$MIN_MEMORY_BYTES. Please increase executor memory using the " +
s"--executor-memory option or spark.executor.memory in Spark configuration.")
s"--executor-memory option or ${config.EXECUTOR_MEMORY.key} in Spark configuration.")
}
}
val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.2)


@ -18,6 +18,7 @@
package org.apache.spark.memory
import org.apache.spark.SparkConf
import org.apache.spark.internal.config
import org.apache.spark.storage.BlockId
/**
@ -216,15 +217,15 @@ object UnifiedMemoryManager {
if (systemMemory < minSystemMemory) {
throw new IllegalArgumentException(s"System memory $systemMemory must " +
s"be at least $minSystemMemory. Please increase heap size using the --driver-memory " +
s"option or spark.driver.memory in Spark configuration.")
s"option or ${config.DRIVER_MEMORY.key} in Spark configuration.")
}
// SPARK-12759 Check executor memory to fail fast if memory is insufficient
if (conf.contains("spark.executor.memory")) {
val executorMemory = conf.getSizeAsBytes("spark.executor.memory")
if (conf.contains(config.EXECUTOR_MEMORY)) {
val executorMemory = conf.getSizeAsBytes(config.EXECUTOR_MEMORY.key)
if (executorMemory < minSystemMemory) {
throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " +
s"$minSystemMemory. Please increase executor memory using the " +
s"--executor-memory option or spark.executor.memory in Spark configuration.")
s"--executor-memory option or ${config.EXECUTOR_MEMORY.key} in Spark configuration.")
}
}
val usableMemory = systemMemory - reservedMemory


@ -130,7 +130,7 @@ private[spark] class MetricsSystem private (
private[spark] def buildRegistryName(source: Source): String = {
val metricsNamespace = conf.get(METRICS_NAMESPACE).orElse(conf.getOption("spark.app.id"))
val executorId = conf.getOption("spark.executor.id")
val executorId = conf.get(EXECUTOR_ID)
val defaultName = MetricRegistry.name(source.sourceName)
if (instance == "driver" || instance == "executor") {


@ -717,7 +717,7 @@ private[spark] class TaskSetManager(
calculatedTasks += 1
if (maxResultSize > 0 && totalResultSize > maxResultSize) {
val msg = s"Total size of serialized results of ${calculatedTasks} tasks " +
s"(${Utils.bytesToString(totalResultSize)}) is bigger than spark.driver.maxResultSize " +
s"(${Utils.bytesToString(totalResultSize)}) is bigger than ${config.MAX_RESULT_SIZE.key} " +
s"(${Utils.bytesToString(maxResultSize)})"
logError(msg)
abort(msg)


@ -25,7 +25,7 @@ import scala.concurrent.Future
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.deploy.{ApplicationDescription, Command}
import org.apache.spark.deploy.client.{StandaloneAppClient, StandaloneAppClientListener}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
import org.apache.spark.rpc.RpcEndpointAddress
import org.apache.spark.scheduler._
@ -54,7 +54,7 @@ private[spark] class StandaloneSchedulerBackend(
private val registrationBarrier = new Semaphore(0)
private val maxCores = conf.getOption("spark.cores.max").map(_.toInt)
private val maxCores = conf.get(config.CORES_MAX)
private val totalExpectedCores = maxCores.getOrElse(0)
override def start() {
@ -69,8 +69,8 @@ private[spark] class StandaloneSchedulerBackend(
// The endpoint for executors to talk to us
val driverUrl = RpcEndpointAddress(
sc.conf.get("spark.driver.host"),
sc.conf.get("spark.driver.port").toInt,
sc.conf.get(config.DRIVER_HOST_ADDRESS),
sc.conf.get(config.DRIVER_PORT),
CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
val args = Seq(
"--driver-url", driverUrl,
@ -79,11 +79,11 @@ private[spark] class StandaloneSchedulerBackend(
"--cores", "{{CORES}}",
"--app-id", "{{APP_ID}}",
"--worker-url", "{{WORKER_URL}}")
val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
val extraJavaOpts = sc.conf.get(config.EXECUTOR_JAVA_OPTIONS)
.map(Utils.splitCommandString).getOrElse(Seq.empty)
val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath")
val classPathEntries = sc.conf.get(config.EXECUTOR_CLASS_PATH)
.map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
val libraryPathEntries = sc.conf.getOption("spark.executor.extraLibraryPath")
val libraryPathEntries = sc.conf.get(config.EXECUTOR_LIBRARY_PATH)
.map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
// When testing, expose the parent class path to the child. This is processed by
@ -102,7 +102,7 @@ private[spark] class StandaloneSchedulerBackend(
val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
val webUrl = sc.ui.map(_.webUrl).getOrElse("")
val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt)
val coresPerExecutor = conf.getOption(config.EXECUTOR_CORES.key).map(_.toInt)
// If we're using dynamic allocation, set our initial executor limit to 0 for now.
// ExecutorAllocationManager will send the real initial limit to the Master later.
val initialExecutorLimit =


@ -24,7 +24,7 @@ import java.nio.ByteBuffer
import org.apache.spark.{SparkConf, SparkContext, SparkEnv, TaskState}
import org.apache.spark.TaskState.TaskState
import org.apache.spark.executor.{Executor, ExecutorBackend}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint}
import org.apache.spark.scheduler._
@ -116,7 +116,7 @@ private[spark] class LocalSchedulerBackend(
* @param conf Spark configuration.
*/
def getUserClasspath(conf: SparkConf): Seq[URL] = {
val userClassPathStr = conf.getOption("spark.executor.extraClassPath")
val userClassPathStr = conf.get(config.EXECUTOR_CLASS_PATH)
userClassPathStr.map(_.split(File.pathSeparator)).toSeq.flatten.map(new File(_).toURI.toURL)
}


@ -18,6 +18,7 @@
package org.apache.spark.util
import org.apache.spark.SparkConf
import org.apache.spark.internal.config
import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv, RpcTimeout}
private[spark] object RpcUtils {
@ -26,8 +27,8 @@ private[spark] object RpcUtils {
* Retrieve a `RpcEndpointRef` which is located in the driver via its name.
*/
def makeDriverRef(name: String, conf: SparkConf, rpcEnv: RpcEnv): RpcEndpointRef = {
val driverHost: String = conf.get("spark.driver.host", "localhost")
val driverPort: Int = conf.getInt("spark.driver.port", 7077)
val driverHost: String = conf.get(config.DRIVER_HOST_ADDRESS.key, "localhost")
val driverPort: Int = conf.getInt(config.DRIVER_PORT.key, 7077)
Utils.checkHost(driverHost)
rpcEnv.setupEndpointRef(RpcAddress(driverHost, driverPort), name)
}


@ -2231,7 +2231,7 @@ private[spark] object Utils extends Logging {
s"${e.getMessage}: Service$serviceString failed after " +
s"$maxRetries retries (on a random free port)! " +
s"Consider explicitly setting the appropriate binding address for " +
s"the service$serviceString (for example spark.driver.bindAddress " +
s"the service$serviceString (for example ${DRIVER_BIND_ADDRESS.key} " +
s"for SparkDriver) to the correct binding address."
} else {
s"${e.getMessage}: Service$serviceString failed after " +


@ -20,7 +20,7 @@ package org.apache.spark.util.logging
import java.io.{File, FileOutputStream, InputStream, IOException}
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.util.{IntParam, Utils}
/**
@ -115,11 +115,9 @@ private[spark] object FileAppender extends Logging {
/** Create the right appender based on Spark configuration */
def apply(inputStream: InputStream, file: File, conf: SparkConf): FileAppender = {
import RollingFileAppender._
val rollingStrategy = conf.get(STRATEGY_PROPERTY, STRATEGY_DEFAULT)
val rollingSizeBytes = conf.get(SIZE_PROPERTY, STRATEGY_DEFAULT)
val rollingInterval = conf.get(INTERVAL_PROPERTY, INTERVAL_DEFAULT)
val rollingStrategy = conf.get(config.EXECUTOR_LOGS_ROLLING_STRATEGY)
val rollingSizeBytes = conf.get(config.EXECUTOR_LOGS_ROLLING_MAX_SIZE)
val rollingInterval = conf.get(config.EXECUTOR_LOGS_ROLLING_TIME_INTERVAL)
def createTimeBasedAppender(): FileAppender = {
val validatedParams: Option[(Long, String)] = rollingInterval match {


@ -24,6 +24,7 @@ import com.google.common.io.Files
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
import org.apache.spark.internal.config
/**
* Continuously appends data from input stream into the given file, and rolls
@ -44,10 +45,8 @@ private[spark] class RollingFileAppender(
bufferSize: Int = RollingFileAppender.DEFAULT_BUFFER_SIZE
) extends FileAppender(inputStream, activeFile, bufferSize) {
import RollingFileAppender._
private val maxRetainedFiles = conf.getInt(RETAINED_FILES_PROPERTY, -1)
private val enableCompression = conf.getBoolean(ENABLE_COMPRESSION, false)
private val maxRetainedFiles = conf.get(config.EXECUTOR_LOGS_ROLLING_MAX_RETAINED_FILES)
private val enableCompression = conf.get(config.EXECUTOR_LOGS_ROLLING_ENABLE_COMPRESSION)
/** Stop the appender */
override def stop() {
@ -82,7 +81,7 @@ private[spark] class RollingFileAppender(
// Roll the log file and compress if enableCompression is true.
private def rotateFile(activeFile: File, rolloverFile: File): Unit = {
if (enableCompression) {
val gzFile = new File(rolloverFile.getAbsolutePath + GZIP_LOG_SUFFIX)
val gzFile = new File(rolloverFile.getAbsolutePath + RollingFileAppender.GZIP_LOG_SUFFIX)
var gzOutputStream: GZIPOutputStream = null
var inputStream: InputStream = null
try {
@ -103,7 +102,7 @@ private[spark] class RollingFileAppender(
// Check if the rollover file already exists.
private def rolloverFileExist(file: File): Boolean = {
file.exists || new File(file.getAbsolutePath + GZIP_LOG_SUFFIX).exists
file.exists || new File(file.getAbsolutePath + RollingFileAppender.GZIP_LOG_SUFFIX).exists
}
/** Move the active log file to a new rollover file */
@ -164,15 +163,7 @@ private[spark] class RollingFileAppender(
* names of configurations that configure rolling file appenders.
*/
private[spark] object RollingFileAppender {
val STRATEGY_PROPERTY = "spark.executor.logs.rolling.strategy"
val STRATEGY_DEFAULT = ""
val INTERVAL_PROPERTY = "spark.executor.logs.rolling.time.interval"
val INTERVAL_DEFAULT = "daily"
val SIZE_PROPERTY = "spark.executor.logs.rolling.maxSize"
val SIZE_DEFAULT = (1024 * 1024).toString
val RETAINED_FILES_PROPERTY = "spark.executor.logs.rolling.maxRetainedFiles"
val DEFAULT_BUFFER_SIZE = 8192
val ENABLE_COMPRESSION = "spark.executor.logs.rolling.enableCompression"
val GZIP_LOG_SUFFIX = ".gz"


@ -155,7 +155,7 @@ class ExecutorAllocationManagerSuite
.set("spark.dynamicAllocation.maxExecutors", "15")
.set("spark.dynamicAllocation.minExecutors", "3")
.set("spark.dynamicAllocation.executorAllocationRatio", divisor.toString)
.set("spark.executor.cores", cores.toString)
.set(config.EXECUTOR_CORES, cores)
val sc = new SparkContext(conf)
contexts += sc
var manager = sc.executorAllocationManager.get


@ -140,7 +140,7 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
test("creating SparkContext with cpus per tasks bigger than cores per executors") {
val conf = new SparkConf(false)
.set("spark.executor.cores", "1")
.set(EXECUTOR_CORES, 1)
.set("spark.task.cpus", "2")
intercept[SparkException] { sc = new SparkContext(conf) }
}


@ -243,7 +243,7 @@ class StandaloneDynamicAllocationSuite
}
test("dynamic allocation with cores per executor") {
sc = new SparkContext(appConf.set("spark.executor.cores", "2"))
sc = new SparkContext(appConf.set(config.EXECUTOR_CORES, 2))
val appId = sc.applicationId
eventually(timeout(10.seconds), interval(10.millis)) {
val apps = getApplications()
@ -296,7 +296,7 @@ class StandaloneDynamicAllocationSuite
test("dynamic allocation with cores per executor AND max cores") {
sc = new SparkContext(appConf
.set("spark.executor.cores", "2")
.set(config.EXECUTOR_CORES, 2)
.set("spark.cores.max", "8"))
val appId = sc.applicationId
eventually(timeout(10.seconds), interval(10.millis)) {
@ -526,7 +526,7 @@ class StandaloneDynamicAllocationSuite
new SparkConf()
.setMaster(masterRpcEnv.address.toSparkURL)
.setAppName("test")
.set("spark.executor.memory", "256m")
.set(config.EXECUTOR_MEMORY.key, "256m")
}
/** Make a master to which our application will send executor requests. */


@ -248,7 +248,7 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
val mm = UnifiedMemoryManager(conf, numCores = 1)
// Try using an executor memory that's too small
val conf2 = conf.clone().set("spark.executor.memory", (reservedMemory / 2).toString)
val conf2 = conf.clone().set(EXECUTOR_MEMORY.key, (reservedMemory / 2).toString)
val exception = intercept[IllegalArgumentException] {
UnifiedMemoryManager(conf2, numCores = 1)
}


@ -655,7 +655,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
}
test("abort the job if total size of results is too large") {
val conf = new SparkConf().set("spark.driver.maxResultSize", "2m")
val conf = new SparkConf().set(config.MAX_RESULT_SIZE.key, "2m")
sc = new SparkContext("local", "test", conf)
def genBytes(size: Int): (Int) => Array[Byte] = { (x: Int) =>


@ -31,7 +31,7 @@ import org.scalatest.concurrent.Eventually._
import org.apache.spark._
import org.apache.spark.broadcast.BroadcastManager
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.MEMORY_OFFHEAP_SIZE
import org.apache.spark.internal.config.{DRIVER_PORT, MEMORY_OFFHEAP_SIZE}
import org.apache.spark.memory.UnifiedMemoryManager
import org.apache.spark.network.BlockTransferService
import org.apache.spark.network.netty.NettyBlockTransferService
@ -86,7 +86,7 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr)
conf.set("spark.authenticate", "false")
conf.set("spark.driver.port", rpcEnv.address.port.toString)
conf.set(DRIVER_PORT, rpcEnv.address.port)
conf.set("spark.testing", "true")
conf.set("spark.memory.fraction", "1")
conf.set("spark.memory.storageFraction", "1")


@ -124,7 +124,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
.set("spark.storage.unrollMemoryThreshold", "512")
rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr)
conf.set("spark.driver.port", rpcEnv.address.port.toString)
conf.set(DRIVER_PORT, rpcEnv.address.port)
// Mock SparkContext to reduce the memory usage of tests. It's fine since the only reason we
// need to create a SparkContext is to initialize LiveListenerBus.


@ -34,7 +34,7 @@ import org.mockito.Mockito.{atLeast, mock, verify}
import org.scalatest.BeforeAndAfter
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.util.logging.{FileAppender, RollingFileAppender, SizeBasedRollingPolicy, TimeBasedRollingPolicy}
class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
@ -136,7 +136,7 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
// setup input stream and appender
val testOutputStream = new PipedOutputStream()
val testInputStream = new PipedInputStream(testOutputStream, 100 * 1000)
val conf = new SparkConf().set(RollingFileAppender.RETAINED_FILES_PROPERTY, "10")
val conf = new SparkConf().set(config.EXECUTOR_LOGS_ROLLING_MAX_RETAINED_FILES, 10)
val appender = new RollingFileAppender(testInputStream, testFile,
new SizeBasedRollingPolicy(1000, false), conf, 10)
@ -200,13 +200,12 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
appender.awaitTermination()
}
import RollingFileAppender._
def rollingStrategy(strategy: String): Seq[(String, String)] =
Seq(STRATEGY_PROPERTY -> strategy)
def rollingSize(size: String): Seq[(String, String)] = Seq(SIZE_PROPERTY -> size)
Seq(config.EXECUTOR_LOGS_ROLLING_STRATEGY.key -> strategy)
def rollingSize(size: String): Seq[(String, String)] =
Seq(config.EXECUTOR_LOGS_ROLLING_MAX_SIZE.key -> size)
def rollingInterval(interval: String): Seq[(String, String)] =
Seq(INTERVAL_PROPERTY -> interval)
Seq(config.EXECUTOR_LOGS_ROLLING_TIME_INTERVAL.key -> interval)
val msInDay = 24 * 60 * 60 * 1000L
val msInHour = 60 * 60 * 1000L


@ -42,7 +42,7 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
.getOrElse(throw new SparkException("Must specify the driver container image"))
// CPU settings
private val driverCpuCores = conf.get("spark.driver.cores", "1")
private val driverCpuCores = conf.get(DRIVER_CORES.key, "1")
private val driverLimitCores = conf.get(KUBERNETES_DRIVER_LIMIT_CORES)
// Memory settings
@ -85,7 +85,7 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
("cpu", new QuantityBuilder(false).withAmount(limitCores).build())
}
val driverPort = conf.sparkConf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT)
val driverPort = conf.sparkConf.getInt(DRIVER_PORT.key, DEFAULT_DRIVER_PORT)
val driverBlockManagerPort = conf.sparkConf.getInt(
DRIVER_BLOCK_MANAGER_PORT.key,
DEFAULT_BLOCKMANAGER_PORT


@ -46,8 +46,8 @@ private[spark] class BasicExecutorFeatureStep(
private val executorPodNamePrefix = kubernetesConf.resourceNamePrefix
private val driverUrl = RpcEndpointAddress(
kubernetesConf.get("spark.driver.host"),
kubernetesConf.sparkConf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT),
kubernetesConf.get(DRIVER_HOST_ADDRESS),
kubernetesConf.sparkConf.getInt(DRIVER_PORT.key, DEFAULT_DRIVER_PORT),
CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
private val executorMemoryMiB = kubernetesConf.get(EXECUTOR_MEMORY)
private val executorMemoryString = kubernetesConf.get(
@ -67,7 +67,7 @@ private[spark] class BasicExecutorFeatureStep(
executorMemoryWithOverhead
}
private val executorCores = kubernetesConf.sparkConf.getInt("spark.executor.cores", 1)
private val executorCores = kubernetesConf.sparkConf.get(EXECUTOR_CORES)
private val executorCoresRequest =
if (kubernetesConf.sparkConf.contains(KUBERNETES_EXECUTOR_REQUEST_CORES)) {
kubernetesConf.get(KUBERNETES_EXECUTOR_REQUEST_CORES).get


@ -22,7 +22,7 @@ import io.fabric8.kubernetes.api.model.{HasMetadata, ServiceBuilder}
import org.apache.spark.deploy.k8s.{KubernetesDriverConf, SparkPod}
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.util.{Clock, SystemClock}
private[spark] class DriverServiceFeatureStep(
@ -51,18 +51,17 @@ private[spark] class DriverServiceFeatureStep(
}
private val driverPort = kubernetesConf.sparkConf.getInt(
"spark.driver.port", DEFAULT_DRIVER_PORT)
config.DRIVER_PORT.key, DEFAULT_DRIVER_PORT)
private val driverBlockManagerPort = kubernetesConf.sparkConf.getInt(
org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT.key, DEFAULT_BLOCKMANAGER_PORT)
config.DRIVER_BLOCK_MANAGER_PORT.key, DEFAULT_BLOCKMANAGER_PORT)
override def configurePod(pod: SparkPod): SparkPod = pod
override def getAdditionalPodSystemProperties(): Map[String, String] = {
val driverHostname = s"$resolvedServiceName.${kubernetesConf.namespace}.svc"
Map(DRIVER_HOST_KEY -> driverHostname,
"spark.driver.port" -> driverPort.toString,
org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT.key ->
driverBlockManagerPort.toString)
config.DRIVER_PORT.key -> driverPort.toString,
config.DRIVER_BLOCK_MANAGER_PORT.key -> driverBlockManagerPort.toString)
}
override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
@ -90,8 +89,8 @@ private[spark] class DriverServiceFeatureStep(
}
private[spark] object DriverServiceFeatureStep {
val DRIVER_BIND_ADDRESS_KEY = org.apache.spark.internal.config.DRIVER_BIND_ADDRESS.key
val DRIVER_HOST_KEY = org.apache.spark.internal.config.DRIVER_HOST_ADDRESS.key
val DRIVER_BIND_ADDRESS_KEY = config.DRIVER_BIND_ADDRESS.key
val DRIVER_HOST_KEY = config.DRIVER_HOST_ADDRESS.key
val DRIVER_SVC_POSTFIX = "-driver-svc"
val MAX_SERVICE_NAME_LENGTH = 63
}


@ -46,7 +46,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
test("Check the pod respects all configurations from the user.") {
val sparkConf = new SparkConf()
.set(KUBERNETES_DRIVER_POD_NAME, "spark-driver-pod")
.set("spark.driver.cores", "2")
.set(DRIVER_CORES, 2)
.set(KUBERNETES_DRIVER_LIMIT_CORES, "4")
.set(DRIVER_MEMORY.key, "256M")
.set(DRIVER_MEMORY_OVERHEAD, 200L)


@ -29,7 +29,7 @@ import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, KubernetesTestConf, SparkPod}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.internal.config._
import org.apache.spark.internal.config
import org.apache.spark.internal.config.Python._
import org.apache.spark.rpc.RpcEndpointAddress
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
@ -74,8 +74,8 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
.set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, RESOURCE_NAME_PREFIX)
.set(CONTAINER_IMAGE, EXECUTOR_IMAGE)
.set(KUBERNETES_DRIVER_SUBMIT_CHECK, true)
.set(DRIVER_HOST_ADDRESS, DRIVER_HOSTNAME)
.set("spark.driver.port", DRIVER_PORT.toString)
.set(config.DRIVER_HOST_ADDRESS, DRIVER_HOSTNAME)
.set(config.DRIVER_PORT, DRIVER_PORT)
.set(IMAGE_PULL_SECRETS, TEST_IMAGE_PULL_SECRETS)
.set("spark.kubernetes.resource.type", "java")
}
@ -125,8 +125,8 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
}
test("classpath and extra java options get translated into environment variables") {
baseConf.set(EXECUTOR_JAVA_OPTIONS, "foo=bar")
baseConf.set(EXECUTOR_CLASS_PATH, "bar=baz")
baseConf.set(config.EXECUTOR_JAVA_OPTIONS, "foo=bar")
baseConf.set(config.EXECUTOR_CLASS_PATH, "bar=baz")
val kconf = newExecutorConf(environment = Map("qux" -> "quux"))
val step = new BasicExecutorFeatureStep(kconf, new SecurityManager(baseConf))
val executor = step.configurePod(SparkPod.initialPod())
@ -150,7 +150,7 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
test("auth secret propagation") {
val conf = baseConf.clone()
.set(NETWORK_AUTH_ENABLED, true)
.set(config.NETWORK_AUTH_ENABLED, true)
.set("spark.master", "k8s://127.0.0.1")
val secMgr = new SecurityManager(conf)
@ -168,8 +168,8 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
val secretFile = new File(secretDir, "secret-file.txt")
Files.write(secretFile.toPath, "some-secret".getBytes(StandardCharsets.UTF_8))
val conf = baseConf.clone()
.set(NETWORK_AUTH_ENABLED, true)
.set(AUTH_SECRET_FILE, secretFile.getAbsolutePath)
.set(config.NETWORK_AUTH_ENABLED, true)
.set(config.AUTH_SECRET_FILE, secretFile.getAbsolutePath)
.set("spark.master", "k8s://127.0.0.1")
val secMgr = new SecurityManager(conf)
secMgr.initializeAuth()


@ -39,7 +39,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite {
test("Headless service has a port for the driver RPC and the block manager.") {
val sparkConf = new SparkConf(false)
.set("spark.driver.port", "9000")
.set(DRIVER_PORT, 9000)
.set(DRIVER_BLOCK_MANAGER_PORT, 8080)
val kconf = KubernetesTestConf.createDriverConf(
sparkConf = sparkConf,
@ -61,7 +61,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite {
test("Hostname and ports are set according to the service name.") {
val sparkConf = new SparkConf(false)
.set("spark.driver.port", "9000")
.set(DRIVER_PORT, 9000)
.set(DRIVER_BLOCK_MANAGER_PORT, 8080)
.set(KUBERNETES_NAMESPACE, "my-namespace")
val kconf = KubernetesTestConf.createDriverConf(
@ -87,7 +87,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite {
s"${kconf.resourceNamePrefix}${DriverServiceFeatureStep.DRIVER_SVC_POSTFIX}",
resolvedService)
val additionalProps = configurationStep.getAdditionalPodSystemProperties()
assert(additionalProps("spark.driver.port") === DEFAULT_DRIVER_PORT.toString)
assert(additionalProps(DRIVER_PORT.key) === DEFAULT_DRIVER_PORT.toString)
assert(additionalProps(DRIVER_BLOCK_MANAGER_PORT.key) === DEFAULT_BLOCKMANAGER_PORT.toString)
}


@ -129,4 +129,7 @@ package object config {
"when launching drivers. Default is to accept all offers with sufficient resources.")
.stringConf
.createWithDefault("")
private[spark] val EXECUTOR_URI =
ConfigBuilder("spark.executor.uri").stringConf.createOptional
}


@ -27,6 +27,7 @@ import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf}
import org.apache.spark.deploy.Command
import org.apache.spark.deploy.mesos.MesosDriverDescription
import org.apache.spark.deploy.rest._
import org.apache.spark.internal.config
import org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler
import org.apache.spark.util.Utils
@ -92,12 +93,12 @@ private[mesos] class MesosSubmitRequestServlet(
// Optional fields
val sparkProperties = request.sparkProperties
val driverExtraJavaOptions = sparkProperties.get("spark.driver.extraJavaOptions")
val driverExtraClassPath = sparkProperties.get("spark.driver.extraClassPath")
val driverExtraLibraryPath = sparkProperties.get("spark.driver.extraLibraryPath")
val superviseDriver = sparkProperties.get("spark.driver.supervise")
val driverMemory = sparkProperties.get("spark.driver.memory")
val driverCores = sparkProperties.get("spark.driver.cores")
val driverExtraJavaOptions = sparkProperties.get(config.DRIVER_JAVA_OPTIONS.key)
val driverExtraClassPath = sparkProperties.get(config.DRIVER_CLASS_PATH.key)
val driverExtraLibraryPath = sparkProperties.get(config.DRIVER_LIBRARY_PATH.key)
val superviseDriver = sparkProperties.get(config.DRIVER_SUPERVISE.key)
val driverMemory = sparkProperties.get(config.DRIVER_MEMORY.key)
val driverCores = sparkProperties.get(config.DRIVER_CORES.key)
val name = request.sparkProperties.getOrElse("spark.app.name", mainClass)
// Construct driver description


@ -32,6 +32,7 @@ import org.apache.mesos.Protos.TaskStatus.Reason
import org.apache.spark.{SecurityManager, SparkConf, SparkException, TaskState}
import org.apache.spark.deploy.mesos.{config, MesosDriverDescription}
import org.apache.spark.deploy.rest.{CreateSubmissionResponse, KillSubmissionResponse, SubmissionStatusResponse}
import org.apache.spark.internal.config.{CORES_MAX, EXECUTOR_LIBRARY_PATH, EXECUTOR_MEMORY}
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.util.Utils
@ -365,8 +366,7 @@ private[spark] class MesosClusterScheduler(
}
private def getDriverExecutorURI(desc: MesosDriverDescription): Option[String] = {
desc.conf.getOption("spark.executor.uri")
.orElse(desc.command.environment.get("SPARK_EXECUTOR_URI"))
desc.conf.get(config.EXECUTOR_URI).orElse(desc.command.environment.get("SPARK_EXECUTOR_URI"))
}
private def getDriverFrameworkID(desc: MesosDriverDescription): String = {
@ -474,7 +474,7 @@ private[spark] class MesosClusterScheduler(
} else if (executorUri.isDefined) {
val folderBasename = executorUri.get.split('/').last.split('.').head
val entries = conf.getOption("spark.executor.extraLibraryPath")
val entries = conf.get(EXECUTOR_LIBRARY_PATH)
.map(path => Seq(path) ++ desc.command.libraryPathEntries)
.getOrElse(desc.command.libraryPathEntries)
@ -528,10 +528,10 @@ private[spark] class MesosClusterScheduler(
options ++= Seq("--class", desc.command.mainClass)
}
desc.conf.getOption("spark.executor.memory").foreach { v =>
desc.conf.getOption(EXECUTOR_MEMORY.key).foreach { v =>
options ++= Seq("--executor-memory", v)
}
desc.conf.getOption("spark.cores.max").foreach { v =>
desc.conf.getOption(CORES_MAX.key).foreach { v =>
options ++= Seq("--total-executor-cores", v)
}
desc.conf.getOption("spark.submit.pyFiles").foreach { pyFiles =>


@ -33,7 +33,6 @@ import org.apache.spark.{SecurityManager, SparkConf, SparkContext, SparkExceptio
import org.apache.spark.deploy.mesos.config._
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.internal.config
import org.apache.spark.internal.config.EXECUTOR_HEARTBEAT_INTERVAL
import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
@ -63,9 +62,9 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
// Blacklist a slave after this many failures
private val MAX_SLAVE_FAILURES = 2
private val maxCoresOption = conf.getOption("spark.cores.max").map(_.toInt)
private val maxCoresOption = conf.get(config.CORES_MAX)
private val executorCoresOption = conf.getOption("spark.executor.cores").map(_.toInt)
private val executorCoresOption = conf.getOption(config.EXECUTOR_CORES.key).map(_.toInt)
private val minCoresPerExecutor = executorCoresOption.getOrElse(1)
@ -220,18 +219,18 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
def createCommand(offer: Offer, numCores: Int, taskId: String): CommandInfo = {
val environment = Environment.newBuilder()
val extraClassPath = conf.getOption("spark.executor.extraClassPath")
val extraClassPath = conf.get(config.EXECUTOR_CLASS_PATH)
extraClassPath.foreach { cp =>
environment.addVariables(
Environment.Variable.newBuilder().setName("SPARK_EXECUTOR_CLASSPATH").setValue(cp).build())
}
val extraJavaOpts = conf.getOption("spark.executor.extraJavaOptions").map {
val extraJavaOpts = conf.get(config.EXECUTOR_JAVA_OPTIONS).map {
Utils.substituteAppNExecIds(_, appId, taskId)
}.getOrElse("")
// Set the environment variable through a command prefix
// to append to the existing value of the variable
val prefixEnv = conf.getOption("spark.executor.extraLibraryPath").map { p =>
val prefixEnv = conf.get(config.EXECUTOR_LIBRARY_PATH).map { p =>
Utils.libraryPathEnvPrefix(Seq(p))
}.getOrElse("")
@ -261,8 +260,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
val command = CommandInfo.newBuilder()
.setEnvironment(environment)
val uri = conf.getOption("spark.executor.uri")
.orElse(Option(System.getenv("SPARK_EXECUTOR_URI")))
val uri = conf.get(EXECUTOR_URI).orElse(Option(System.getenv("SPARK_EXECUTOR_URI")))
if (uri.isEmpty) {
val executorSparkHome = conf.getOption("spark.mesos.executor.home")
@ -304,8 +302,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
"driverURL"
} else {
RpcEndpointAddress(
conf.get("spark.driver.host"),
conf.get("spark.driver.port").toInt,
conf.get(config.DRIVER_HOST_ADDRESS),
conf.get(config.DRIVER_PORT),
CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
}
}
@ -633,7 +631,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
externalShufflePort,
sc.conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs",
s"${sc.conf.getTimeAsSeconds("spark.network.timeout", "120s")}s"),
sc.conf.get(EXECUTOR_HEARTBEAT_INTERVAL))
sc.conf.get(config.EXECUTOR_HEARTBEAT_INTERVAL))
slave.shuffleRegistered = true
}


@ -28,8 +28,9 @@ import org.apache.mesos.SchedulerDriver
import org.apache.mesos.protobuf.ByteString
import org.apache.spark.{SparkContext, SparkException, TaskState}
import org.apache.spark.deploy.mesos.config
import org.apache.spark.deploy.mesos.config.EXECUTOR_URI
import org.apache.spark.executor.MesosExecutorBackend
import org.apache.spark.internal.config
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.util.Utils
@ -107,15 +108,15 @@ private[spark] class MesosFineGrainedSchedulerBackend(
throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!")
}
val environment = Environment.newBuilder()
sc.conf.getOption("spark.executor.extraClassPath").foreach { cp =>
sc.conf.get(config.EXECUTOR_CLASS_PATH).foreach { cp =>
environment.addVariables(
Environment.Variable.newBuilder().setName("SPARK_EXECUTOR_CLASSPATH").setValue(cp).build())
}
val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions").map {
val extraJavaOpts = sc.conf.get(config.EXECUTOR_JAVA_OPTIONS).map {
Utils.substituteAppNExecIds(_, appId, execId)
}.getOrElse("")
val prefixEnv = sc.conf.getOption("spark.executor.extraLibraryPath").map { p =>
val prefixEnv = sc.conf.get(config.EXECUTOR_LIBRARY_PATH).map { p =>
Utils.libraryPathEnvPrefix(Seq(p))
}.getOrElse("")
@ -132,8 +133,7 @@ private[spark] class MesosFineGrainedSchedulerBackend(
}
val command = CommandInfo.newBuilder()
.setEnvironment(environment)
val uri = sc.conf.getOption("spark.executor.uri")
.orElse(Option(System.getenv("SPARK_EXECUTOR_URI")))
val uri = sc.conf.get(EXECUTOR_URI).orElse(Option(System.getenv("SPARK_EXECUTOR_URI")))
val executorBackendName = classOf[MesosExecutorBackend].getName
if (uri.isEmpty) {


@ -470,8 +470,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
rpcEnv = sc.env.rpcEnv
val userConf = sc.getConf
val host = userConf.get("spark.driver.host")
val port = userConf.get("spark.driver.port").toInt
val host = userConf.get(DRIVER_HOST_ADDRESS)
val port = userConf.get(DRIVER_PORT)
registerAM(host, port, userConf, sc.ui.map(_.webUrl))
val driverRef = rpcEnv.setupEndpointRef(
@ -505,7 +505,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
amCores, true)
// The client-mode AM doesn't listen for incoming connections, so report an invalid port.
registerAM(hostname, -1, sparkConf, sparkConf.getOption("spark.driver.appUIAddress"))
registerAM(hostname, -1, sparkConf, sparkConf.get(DRIVER_APP_UI_ADDRESS))
// The driver should be up and listening, so unlike cluster mode, just try to connect to it
// with no waiting or retrying.


@ -224,16 +224,12 @@ package object config {
/* Driver configuration. */
private[spark] val DRIVER_CORES = ConfigBuilder("spark.driver.cores")
.intConf
.createWithDefault(1)
private[spark] val DRIVER_APP_UI_ADDRESS = ConfigBuilder("spark.driver.appUIAddress")
.stringConf
.createOptional
/* Executor configuration. */
private[spark] val EXECUTOR_CORES = ConfigBuilder("spark.executor.cores")
.intConf
.createWithDefault(1)
private[spark] val EXECUTOR_NODE_LABEL_EXPRESSION =
ConfigBuilder("spark.yarn.executor.nodeLabelExpression")
.doc("Node label expression for executors.")


@ -24,7 +24,7 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState
import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.deploy.yarn.{Client, ClientArguments, YarnAppReport}
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.launcher.SparkAppHandle
import org.apache.spark.scheduler.TaskSchedulerImpl
@ -42,10 +42,10 @@ private[spark] class YarnClientSchedulerBackend(
* This waits until the application is running.
*/
override def start() {
val driverHost = conf.get("spark.driver.host")
val driverPort = conf.get("spark.driver.port")
val driverHost = conf.get(config.DRIVER_HOST_ADDRESS)
val driverPort = conf.get(config.DRIVER_PORT)
val hostport = driverHost + ":" + driverPort
sc.ui.foreach { ui => conf.set("spark.driver.appUIAddress", ui.webUrl) }
sc.ui.foreach { ui => conf.set(DRIVER_APP_UI_ADDRESS, ui.webUrl) }
val argsArrayBuf = new ArrayBuffer[String]()
argsArrayBuf += ("--arg", hostport)


@ -24,7 +24,7 @@ import org.scalatest.Matchers
import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
import org.apache.spark.deploy.yarn.ResourceRequestTestHelper.ResourceInformation
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.internal.config.{DRIVER_MEMORY, EXECUTOR_MEMORY}
import org.apache.spark.internal.config.{DRIVER_CORES, DRIVER_MEMORY, EXECUTOR_CORES, EXECUTOR_MEMORY}
class ResourceRequestHelperSuite extends SparkFunSuite with Matchers {