[SPARK-26843][MESOS] Use ConfigEntry for hardcoded configs for "mesos" resource manager

## What changes were proposed in this pull request?

This patch migrates hardcoded configs in the "mesos" module to ConfigEntry, avoiding mistakes like SPARK-26082.

Please note that some config types change while migrating to ConfigEntry: specifically, a "comma-separated list in a string" becomes a "sequence of strings". SparkConf handles this change smoothly (a comma-separated string value is still supported, so it remains backward compatible), but a few methods in the `mesos` package-private utility classes depend on the new type, so this patch also adjusts their signatures slightly.
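
For illustration, below is a minimal sketch of the migration pattern described above. It assumes access to Spark's `private[spark]` `ConfigBuilder`/`SparkConf` APIs (so it is placed under the `org.apache.spark` package tree); the object name, `main` wrapper, and URIs are illustrative only. The entry mirrors the `spark.mesos.uris` definition added in this patch.

```scala
package org.apache.spark.deploy.mesos

import org.apache.spark.SparkConf
import org.apache.spark.internal.config.ConfigBuilder

// Sketch only: lives under org.apache.spark so the private[spark] APIs are visible.
object ConfigEntryMigrationExample {

  // Mirrors spark.mesos.uris from this patch: the value type becomes
  // Seq[String] instead of a comma-separated String.
  val URIS_TO_DOWNLOAD =
    ConfigBuilder("spark.mesos.uris")
      .stringConf
      .toSequence
      .createWithDefault(Nil)

  def main(args: Array[String]): Unit = {
    // Setting the old comma-separated string form still works...
    val conf = new SparkConf()
      .set("spark.mesos.uris", "http://host/a.jar, http://host/b.jar")

    // ...and is read back as a typed Seq[String], so callers no longer
    // need to split on "," themselves.
    val uris: Seq[String] = conf.get(URIS_TO_DOWNLOAD)
    assert(uris == Seq("http://host/a.jar", "http://host/b.jar"))
  }
}
```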

## How was this patch tested?

Existing tests.

Closes #23743 from HeartSaVioR/SPARK-26843.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
Jungtaek Lim (HeartSaVioR) 2019-02-10 16:34:33 -08:00 committed by Dongjoon Hyun
parent bb985586f2
commit af4c59c0fb
14 changed files with 403 additions and 169 deletions

@ -56,6 +56,31 @@ package object config {
.createOptional
}
private[spark] val CREDENTIAL_PRINCIPAL =
ConfigBuilder("spark.mesos.principal")
.doc("Name of the Kerberos principal to authenticate Spark to Mesos.")
.stringConf
.createOptional
private[spark] val CREDENTIAL_PRINCIPAL_FILE =
ConfigBuilder("spark.mesos.principal.file")
.doc("The path of file which contains the name of the Kerberos principal " +
"to authenticate Spark to Mesos.")
.stringConf
.createOptional
private[spark] val CREDENTIAL_SECRET =
ConfigBuilder("spark.mesos.secret")
.doc("The secret value to authenticate Spark to Mesos.")
.stringConf
.createOptional
private[spark] val CREDENTIAL_SECRET_FILE =
ConfigBuilder("spark.mesos.secret.file")
.doc("The path of file which contains the secret value to authenticate Spark to Mesos.")
.stringConf
.createOptional
/* Common app configuration. */
private[spark] val SHUFFLE_CLEANER_INTERVAL_S =
@ -85,6 +110,13 @@ package object config {
.stringConf
.createOptional
private[spark] val DRIVER_WEBUI_URL =
ConfigBuilder("spark.mesos.driver.webui.url")
.doc("Set the Spark Mesos driver webui_url for interacting with the framework. " +
"If unset it will point to Spark's internal web UI.")
.stringConf
.createOptional
private[spark] val driverSecretConfig = new MesosSecretConfig("driver")
private[spark] val executorSecretConfig = new MesosSecretConfig("executor")
@ -118,6 +150,211 @@ package object config {
.stringConf
.createWithDefault("")
private[spark] val DRIVER_FRAMEWORK_ID =
ConfigBuilder("spark.mesos.driver.frameworkId")
.stringConf
.createOptional
private[spark] val EXECUTOR_URI =
ConfigBuilder("spark.executor.uri").stringConf.createOptional
private[spark] val PROXY_BASE_URL =
ConfigBuilder("spark.mesos.proxy.baseURL").stringConf.createOptional
private[spark] val COARSE_MODE =
ConfigBuilder("spark.mesos.coarse")
.doc("If set to true, runs over Mesos clusters in \"coarse-grained\" sharing mode, where " +
"Spark acquires one long-lived Mesos task on each machine. If set to false, runs over " +
"Mesos cluster in \"fine-grained\" sharing mode, where one Mesos task is created per " +
"Spark task.")
.booleanConf.createWithDefault(true)
private[spark] val COARSE_SHUTDOWN_TIMEOUT =
ConfigBuilder("spark.mesos.coarse.shutdownTimeout")
.timeConf(TimeUnit.MILLISECONDS)
.checkValue(_ >= 0, s"spark.mesos.coarse.shutdownTimeout must be >= 0")
.createWithDefaultString("10s")
private[spark] val MAX_DRIVERS =
ConfigBuilder("spark.mesos.maxDrivers").intConf.createWithDefault(200)
private[spark] val RETAINED_DRIVERS =
ConfigBuilder("spark.mesos.retainedDrivers").intConf.createWithDefault(200)
private[spark] val CLUSTER_RETRY_WAIT_MAX_SECONDS =
ConfigBuilder("spark.mesos.cluster.retry.wait.max")
.intConf
.createWithDefault(60) // 1 minute
private[spark] val ENABLE_FETCHER_CACHE =
ConfigBuilder("spark.mesos.fetcherCache.enable")
.doc("If set to true, all URIs (example: `spark.executor.uri`, `spark.mesos.uris`) will be " +
"cached by the Mesos Fetcher Cache.")
.booleanConf
.createWithDefault(false)
private[spark] val APP_JAR_LOCAL_RESOLUTION_MODE =
ConfigBuilder("spark.mesos.appJar.local.resolution.mode")
.doc("Provides support for the `local:///` scheme to reference the app jar resource in " +
"cluster mode. If user uses a local resource (`local:///path/to/jar`) and the config " +
"option is not used it defaults to `host` eg. the mesos fetcher tries to get the " +
"resource from the host's file system. If the value is unknown it prints a warning msg " +
"in the dispatcher logs and defaults to `host`. If the value is `container` then spark " +
"submit in the container will use the jar in the container's path: `/path/to/jar`.")
.stringConf
.checkValues(Set("host", "container"))
.createWithDefault("host")
private[spark] val REJECT_OFFER_DURATION =
ConfigBuilder("spark.mesos.rejectOfferDuration")
.doc("Time to consider unused resources refused, serves as a fallback of " +
"`spark.mesos.rejectOfferDurationForUnmetConstraints`, " +
"`spark.mesos.rejectOfferDurationForReachedMaxCores`.")
.timeConf(TimeUnit.SECONDS)
.createWithDefaultString("120s")
private[spark] val REJECT_OFFER_DURATION_FOR_UNMET_CONSTRAINTS =
ConfigBuilder("spark.mesos.rejectOfferDurationForUnmetConstraints")
.doc("Time to consider unused resources refused with unmet constraints.")
.timeConf(TimeUnit.SECONDS)
.createOptional
private[spark] val REJECT_OFFER_DURATION_FOR_REACHED_MAX_CORES =
ConfigBuilder("spark.mesos.rejectOfferDurationForReachedMaxCores")
.doc("Time to consider unused resources refused when maximum number of cores " +
"`spark.cores.max` is reached.")
.timeConf(TimeUnit.SECONDS)
.createOptional
private[spark] val URIS_TO_DOWNLOAD =
ConfigBuilder("spark.mesos.uris")
.doc("A comma-separated list of URIs to be downloaded to the sandbox when driver or " +
"executor is launched by Mesos. This applies to both coarse-grained and fine-grained " +
"mode.")
.stringConf
.toSequence
.createWithDefault(Nil)
private[spark] val EXECUTOR_HOME =
ConfigBuilder("spark.mesos.executor.home")
.doc("Set the directory in which Spark is installed on the executors in Mesos. " +
"By default, the executors will simply use the driver's Spark home directory, which may " +
"not be visible to them. Note that this is only relevant if a Spark binary package is " +
"not specified through `spark.executor.uri`.")
.stringConf
.createOptional
private[spark] val EXECUTOR_CORES =
ConfigBuilder("spark.mesos.mesosExecutor.cores")
.doc("(Fine-grained mode only) Number of cores to give each Mesos executor. This does not " +
"include the cores used to run the Spark tasks. In other words, even if no Spark task " +
"is being run, each Mesos executor will occupy the number of cores configured here. " +
"The value can be a floating point number.")
.doubleConf
.createWithDefault(1.0)
private[spark] val EXTRA_CORES_PER_EXECUTOR =
ConfigBuilder("spark.mesos.extra.cores")
.doc("Set the extra number of cores for an executor to advertise. This does not result in " +
"more cores allocated. It instead means that an executor will \"pretend\" it has more " +
"cores, so that the driver will send it more tasks. Use this to increase parallelism. " +
"This setting is only used for Mesos coarse-grained mode.")
.intConf
.createWithDefault(0)
private[spark] val EXECUTOR_MEMORY_OVERHEAD =
ConfigBuilder("spark.mesos.executor.memoryOverhead")
.doc("The amount of additional memory, specified in MiB, to be allocated per executor. " +
"By default, the overhead will be larger of either 384 or 10% of " +
"`spark.executor.memory`. If set, the final overhead will be this value.")
.intConf
.createOptional
private[spark] val EXECUTOR_DOCKER_IMAGE =
ConfigBuilder("spark.mesos.executor.docker.image")
.doc("Set the name of the docker image that the Spark executors will run in. The selected " +
"image must have Spark installed, as well as a compatible version of the Mesos library. " +
"The installed path of Spark in the image can be specified with " +
"`spark.mesos.executor.home`; the installed path of the Mesos library can be specified " +
"with `spark.executorEnv.MESOS_NATIVE_JAVA_LIBRARY`.")
.stringConf
.createOptional
private[spark] val EXECUTOR_DOCKER_FORCE_PULL_IMAGE =
ConfigBuilder("spark.mesos.executor.docker.forcePullImage")
.doc("Force Mesos agents to pull the image specified in " +
"`spark.mesos.executor.docker.image`. By default Mesos agents will not pull images they " +
"already have cached.")
.booleanConf
.createOptional
private[spark] val EXECUTOR_DOCKER_PORT_MAPS =
ConfigBuilder("spark.mesos.executor.docker.portmaps")
.stringConf
.toSequence
.createOptional
private[spark] val EXECUTOR_DOCKER_PARAMETERS =
ConfigBuilder("spark.mesos.executor.docker.parameters")
.doc("Set the list of custom parameters which will be passed into the `docker run` " +
"command when launching the Spark executor on Mesos using the docker containerizer. " +
"The format of this property is a list of key/value pairs which pair looks key1=value1.")
.stringConf
.toSequence
.createOptional
private[spark] val EXECUTOR_DOCKER_VOLUMES =
ConfigBuilder("spark.mesos.executor.docker.volumes")
.doc("Set the list of volumes which will be mounted into the Docker image, which was set " +
"using `spark.mesos.executor.docker.image`. The format of this property is a list of " +
"mappings following the form passed to `docker run -v`. That is they take the form: " +
"`[host_path:]container_path[:ro|:rw]`")
.stringConf
.toSequence
.createOptional
private[spark] val MAX_GPUS =
ConfigBuilder("spark.mesos.gpus.max")
.doc("Set the maximum number GPU resources to acquire for this job. Note that executors " +
"will still launch when no GPU resources are found since this configuration is just an " +
"upper limit and not a guaranteed amount.")
.intConf
.createWithDefault(0)
private[spark] val TASK_LABELS =
ConfigBuilder("spark.mesos.task.labels")
.doc("Set the Mesos labels to add to each task. Labels are free-form key-value pairs. " +
"Key-value pairs should be separated by a colon, and commas used to list more than one. " +
"If your label includes a colon or comma, you can escape it with a backslash. " +
"Ex. key:value,key2:a\\:b.")
.stringConf
.createWithDefault("")
private[spark] val CONSTRAINTS =
ConfigBuilder("spark.mesos.constraints")
.doc("Attribute-based constraints on mesos resource offers. By default, all resource " +
"offers will be accepted. This setting applies only to executors. Refer to Mesos " +
"Attributes & Resources doc for more information on attributes.")
.stringConf
.createWithDefault("")
private[spark] val CONTAINERIZER =
ConfigBuilder("spark.mesos.containerizer")
.doc("This only affects docker containers, and must be one of \"docker\" or \"mesos\". " +
"Mesos supports two types of containerizers for docker: the \"docker\" containerizer, " +
"and the preferred \"mesos\" containerizer. " +
"Read more here: http://mesos.apache.org/documentation/latest/container-image/")
.stringConf
.checkValues(Set("docker", "mesos"))
.createWithDefault("docker")
private[spark] val ROLE =
ConfigBuilder("spark.mesos.role")
.doc("Set the role of this Spark framework for Mesos. Roles are used in Mesos for " +
"reservations and resource weight sharing.")
.stringConf
.createOptional
private[spark] val DRIVER_ENV_PREFIX = "spark.mesos.driverEnv."
private[spark] val DISPATCHER_DRIVER_DEFAULT_PREFIX = "spark.mesos.dispatcher.driverDefault."
}

@ -77,7 +77,7 @@ private[mesos] class MesosClusterPage(parent: MesosClusterUI) extends WebUIPage(
private def driverRow(state: MesosClusterSubmissionState): Seq[Node] = {
val id = state.driverDescription.submissionId
val proxy = parent.conf.getOption("spark.mesos.proxy.baseURL")
val proxy = parent.conf.get(PROXY_BASE_URL)
val sandboxCol = if (proxy.isDefined) {
val clusterSchedulerId = parent.scheduler.getSchedulerState().frameworkId

@ -18,6 +18,7 @@
package org.apache.spark.scheduler.cluster.mesos
import org.apache.spark.SparkContext
import org.apache.spark.deploy.mesos.config._
import org.apache.spark.internal.config._
import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
@ -42,7 +43,7 @@ private[spark] class MesosClusterManager extends ExternalClusterManager {
"I/O encryption is currently not supported in Mesos.")
val mesosUrl = MESOS_REGEX.findFirstMatchIn(masterURL).get.group(1)
val coarse = sc.conf.getBoolean("spark.mesos.coarse", defaultValue = true)
val coarse = sc.conf.get(COARSE_MODE)
if (coarse) {
new MesosCoarseGrainedSchedulerBackend(
scheduler.asInstanceOf[TaskSchedulerImpl],

@ -127,10 +127,10 @@ private[spark] class MesosClusterScheduler(
MetricsSystem.createMetricsSystem("mesos_cluster", conf, new SecurityManager(conf))
private val master = conf.get("spark.master")
private val appName = conf.get("spark.app.name")
private val queuedCapacity = conf.getInt("spark.mesos.maxDrivers", 200)
private val retainedDrivers = conf.getInt("spark.mesos.retainedDrivers", 200)
private val maxRetryWaitTime = conf.getInt("spark.mesos.cluster.retry.wait.max", 60) // 1 minute
private val useFetchCache = conf.getBoolean("spark.mesos.fetcherCache.enable", false)
private val queuedCapacity = conf.get(config.MAX_DRIVERS)
private val retainedDrivers = conf.get(config.RETAINED_DRIVERS)
private val maxRetryWaitTime = conf.get(config.CLUSTER_RETRY_WAIT_MAX_SECONDS)
private val useFetchCache = conf.get(config.ENABLE_FETCHER_CACHE)
private val schedulerState = engineFactory.createEngine("scheduler")
private val stateLock = new Object()
// Keyed by submission id
@ -390,10 +390,10 @@ private[spark] class MesosClusterScheduler(
private def getDriverEnvironment(desc: MesosDriverDescription): Environment = {
// TODO(mgummelt): Don't do this here. This should be passed as a --conf
val commandEnv = adjust(desc.command.environment, "SPARK_SUBMIT_OPTS", "")(
v => s"$v -Dspark.mesos.driver.frameworkId=${getDriverFrameworkID(desc)}"
v => s"$v -D${config.DRIVER_FRAMEWORK_ID.key}=${getDriverFrameworkID(desc)}"
)
val env = desc.conf.getAllWithPrefix("spark.mesos.driverEnv.") ++ commandEnv
val env = desc.conf.getAllWithPrefix(config.DRIVER_ENV_PREFIX) ++ commandEnv
val envBuilder = Environment.newBuilder()
@ -419,22 +419,17 @@ private[spark] class MesosClusterScheduler(
private def isContainerLocalAppJar(desc: MesosDriverDescription): Boolean = {
val isLocalJar = desc.jarUrl.startsWith("local://")
val isContainerLocal = desc.conf.getOption("spark.mesos.appJar.local.resolution.mode").exists {
val isContainerLocal = desc.conf.get(config.APP_JAR_LOCAL_RESOLUTION_MODE) match {
case "container" => true
case "host" => false
case other =>
logWarning(s"Unknown spark.mesos.appJar.local.resolution.mode $other, using host.")
false
}
isLocalJar && isContainerLocal
}
private def getDriverUris(desc: MesosDriverDescription): List[CommandInfo.URI] = {
val confUris = List(conf.getOption("spark.mesos.uris"),
desc.conf.getOption("spark.mesos.uris"),
Some(desc.conf.get(SUBMIT_PYTHON_FILES).mkString(","))).flatMap(
_.map(_.split(",").map(_.trim))
).flatten
val confUris = (conf.get(config.URIS_TO_DOWNLOAD) ++
desc.conf.get(config.URIS_TO_DOWNLOAD) ++
desc.conf.get(SUBMIT_PYTHON_FILES)).toList
if (isContainerLocalAppJar(desc)) {
(confUris ++ getDriverExecutorURI(desc).toList).map(uri =>
@ -464,7 +459,7 @@ private[spark] class MesosClusterScheduler(
}
private def getDriverCommandValue(desc: MesosDriverDescription): String = {
val dockerDefined = desc.conf.contains("spark.mesos.executor.docker.image")
val dockerDefined = desc.conf.contains(config.EXECUTOR_DOCKER_IMAGE)
val executorUri = getDriverExecutorURI(desc)
// Gets the path to run spark-submit, and the path to the Mesos sandbox.
val (executable, sandboxPath) = if (dockerDefined) {
@ -484,11 +479,11 @@ private[spark] class MesosClusterScheduler(
// Sandbox path points to the parent folder as we chdir into the folderBasename.
(cmdExecutable, "..")
} else {
val executorSparkHome = desc.conf.getOption("spark.mesos.executor.home")
val executorSparkHome = desc.conf.get(config.EXECUTOR_HOME)
.orElse(conf.getOption("spark.home"))
.orElse(Option(System.getenv("SPARK_HOME")))
.getOrElse {
throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!")
throw new SparkException(s"Executor Spark home `${config.EXECUTOR_HOME}` is not set!")
}
val cmdExecutable = new File(executorSparkHome, "./bin/spark-submit").getPath
// Sandbox points to the current directory by default with Mesos.
@ -547,7 +542,7 @@ private[spark] class MesosClusterScheduler(
"spark.submit.deployMode", // this would be set to `cluster`, but we need client
"spark.master" // this contains the address of the dispatcher, not master
)
val defaultConf = conf.getAllWithPrefix("spark.mesos.dispatcher.driverDefault.").toMap
val defaultConf = conf.getAllWithPrefix(config.DISPATCHER_DRIVER_DEFAULT_PREFIX).toMap
val driverConf = desc.conf.getAll
.filter { case (key, _) => !replicatedOptionsBlacklist.contains(key) }
.toMap

@ -76,15 +76,13 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
cores - (cores % minCoresPerExecutor)
}
private val useFetcherCache = conf.getBoolean("spark.mesos.fetcherCache.enable", false)
private val useFetcherCache = conf.get(ENABLE_FETCHER_CACHE)
private val maxGpus = conf.getInt("spark.mesos.gpus.max", 0)
private val maxGpus = conf.get(MAX_GPUS)
private val taskLabels = conf.get("spark.mesos.task.labels", "")
private val taskLabels = conf.get(TASK_LABELS)
private[this] val shutdownTimeoutMS =
conf.getTimeAsMs("spark.mesos.coarse.shutdownTimeout", "10s")
.ensuring(_ >= 0, "spark.mesos.coarse.shutdownTimeout must be >= 0")
private[this] val shutdownTimeoutMS = conf.get(COARSE_SHUTDOWN_TIMEOUT)
// Synchronization protected by stateLock
private[this] var stopCalled: Boolean = false
@ -144,11 +142,11 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
// may lead to deadlocks since the superclass might also try to lock
private val stateLock = new ReentrantLock
private val extraCoresPerExecutor = conf.getInt("spark.mesos.extra.cores", 0)
private val extraCoresPerExecutor = conf.get(EXTRA_CORES_PER_EXECUTOR)
// Offer constraints
private val slaveOfferConstraints =
parseConstraintString(sc.conf.get("spark.mesos.constraints", ""))
parseConstraintString(sc.conf.get(CONSTRAINTS))
// Reject offers with mismatched constraints in seconds
private val rejectOfferDurationForUnmetConstraints =
@ -208,10 +206,10 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
sc.sparkUser,
sc.appName,
sc.conf,
sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.webUrl)),
sc.conf.get(DRIVER_WEBUI_URL).orElse(sc.ui.map(_.webUrl)),
None,
Some(sc.conf.get(DRIVER_FAILOVER_TIMEOUT)),
sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix)
sc.conf.get(DRIVER_FRAMEWORK_ID).map(_ + suffix)
)
launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
@ -264,10 +262,10 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
val uri = conf.get(EXECUTOR_URI).orElse(Option(System.getenv("SPARK_EXECUTOR_URI")))
if (uri.isEmpty) {
val executorSparkHome = conf.getOption("spark.mesos.executor.home")
val executorSparkHome = conf.get(EXECUTOR_HOME)
.orElse(sc.getSparkHome())
.getOrElse {
throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!")
throw new SparkException(s"Executor Spark home `$EXECUTOR_HOME` is not set!")
}
val runScript = new File(executorSparkHome, "./bin/spark-class").getPath
command.setValue(
@ -293,7 +291,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
command.addUris(CommandInfo.URI.newBuilder().setValue(uri.get).setCache(useFetcherCache))
}
conf.getOption("spark.mesos.uris").foreach(setupUris(_, command, useFetcherCache))
setupUris(conf.get(URIS_TO_DOWNLOAD), command, useFetcherCache)
command.build()
}

@ -28,7 +28,7 @@ import org.apache.mesos.SchedulerDriver
import org.apache.mesos.protobuf.ByteString
import org.apache.spark.{SparkContext, SparkException, TaskState}
import org.apache.spark.deploy.mesos.config.EXECUTOR_URI
import org.apache.spark.deploy.mesos.{config => mesosConfig}
import org.apache.spark.executor.MesosExecutorBackend
import org.apache.spark.internal.config
import org.apache.spark.scheduler._
@ -60,11 +60,11 @@ private[spark] class MesosFineGrainedSchedulerBackend(
// The listener bus to publish executor added/removed events.
val listenerBus = sc.listenerBus
private[mesos] val mesosExecutorCores = sc.conf.getDouble("spark.mesos.mesosExecutor.cores", 1)
private[mesos] val mesosExecutorCores = sc.conf.get(mesosConfig.EXECUTOR_CORES)
// Offer constraints
private[this] val slaveOfferConstraints =
parseConstraintString(sc.conf.get("spark.mesos.constraints", ""))
parseConstraintString(sc.conf.get(mesosConfig.CONSTRAINTS))
// reject offers with mismatched constraints in seconds
private val rejectOfferDurationForUnmetConstraints =
@ -82,10 +82,10 @@ private[spark] class MesosFineGrainedSchedulerBackend(
sc.sparkUser,
sc.appName,
sc.conf,
sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.webUrl)),
sc.conf.get(mesosConfig.DRIVER_WEBUI_URL).orElse(sc.ui.map(_.webUrl)),
Option.empty,
Option.empty,
sc.conf.getOption("spark.mesos.driver.frameworkId")
sc.conf.get(mesosConfig.DRIVER_FRAMEWORK_ID)
)
unsetFrameworkID(sc)
@ -102,10 +102,10 @@ private[spark] class MesosFineGrainedSchedulerBackend(
def createExecutorInfo(
availableResources: JList[Resource],
execId: String): (MesosExecutorInfo, JList[Resource]) = {
val executorSparkHome = sc.conf.getOption("spark.mesos.executor.home")
val executorSparkHome = sc.conf.get(mesosConfig.EXECUTOR_HOME)
.orElse(sc.getSparkHome()) // Fall back to driver Spark home for backward compatibility
.getOrElse {
throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!")
throw new SparkException(s"Executor Spark home `${mesosConfig.EXECUTOR_HOME}` is not set!")
}
val environment = Environment.newBuilder()
sc.conf.get(config.EXECUTOR_CLASS_PATH).foreach { cp =>
@ -133,7 +133,8 @@ private[spark] class MesosFineGrainedSchedulerBackend(
}
val command = CommandInfo.newBuilder()
.setEnvironment(environment)
val uri = sc.conf.get(EXECUTOR_URI).orElse(Option(System.getenv("SPARK_EXECUTOR_URI")))
val uri = sc.conf.get(mesosConfig.EXECUTOR_URI)
.orElse(Option(System.getenv("SPARK_EXECUTOR_URI")))
val executorBackendName = classOf[MesosExecutorBackend].getName
if (uri.isEmpty) {
@ -155,7 +156,7 @@ private[spark] class MesosFineGrainedSchedulerBackend(
builder.addAllResources(usedCpuResources.asJava)
builder.addAllResources(usedMemResources.asJava)
sc.conf.getOption("spark.mesos.uris").foreach(setupUris(_, command))
setupUris(sc.conf.get(mesosConfig.URIS_TO_DOWNLOAD), command)
val executorInfo = builder
.setExecutorId(ExecutorID.newBuilder().setValue(execId).build())

@ -24,7 +24,7 @@ import org.apache.mesos.protobuf.ByteString
import org.apache.spark.SparkConf
import org.apache.spark.SparkException
import org.apache.spark.deploy.mesos.config.{NETWORK_LABELS, NETWORK_NAME}
import org.apache.spark.deploy.mesos.config._
import org.apache.spark.deploy.mesos.config.MesosSecretConfig
import org.apache.spark.internal.Logging
@ -34,11 +34,11 @@ import org.apache.spark.internal.Logging
*/
private[mesos] object MesosSchedulerBackendUtil extends Logging {
/**
* Parse a comma-delimited list of volume specs, each of which
* Parse a list of volume specs, each of which
* takes the form [host-dir:]container-dir[:rw|:ro].
*/
def parseVolumesSpec(volumes: String): List[Volume] = {
volumes.split(",").map(_.split(":")).flatMap { spec =>
def parseVolumesSpec(volumes: Seq[String]): List[Volume] = {
volumes.map(_.split(":")).flatMap { spec =>
val vol: Volume.Builder = Volume
.newBuilder()
.setMode(Volume.Mode.RW)
@ -71,7 +71,7 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
}
/**
* Parse a comma-delimited list of port mapping specs, each of which
* Parse a list of port mapping specs, each of which
* takes the form host_port:container_port[:udp|:tcp]
*
* Note:
@ -81,8 +81,8 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
* anticipates the expansion of the docker form to allow for a protocol
* and leaves open the chance for mesos to begin to accept an 'ip' field
*/
def parsePortMappingsSpec(portmaps: String): List[DockerInfo.PortMapping] = {
portmaps.split(",").map(_.split(":")).flatMap { spec: Array[String] =>
def parsePortMappingsSpec(portmaps: Seq[String]): List[DockerInfo.PortMapping] = {
portmaps.map(_.split(":")).flatMap { spec: Array[String] =>
val portmap: DockerInfo.PortMapping.Builder = DockerInfo.PortMapping
.newBuilder()
.setProtocol("tcp")
@ -108,10 +108,10 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
* Parse a list of docker parameters, each of which
* takes the form key=value
*/
private def parseParamsSpec(params: String): List[Parameter] = {
private def parseParamsSpec(params: Seq[String]): List[Parameter] = {
// split with limit of 2 to avoid parsing error when '='
// exists in the parameter value
params.split(",").map(_.split("=", 2)).flatMap { spec: Array[String] =>
params.map(_.split("=", 2)).flatMap { spec: Array[String] =>
val param: Parameter.Builder = Parameter.newBuilder()
spec match {
case Array(key, value) =>
@ -127,8 +127,8 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
}
def buildContainerInfo(conf: SparkConf): ContainerInfo.Builder = {
val containerType = if (conf.contains("spark.mesos.executor.docker.image") &&
conf.get("spark.mesos.containerizer", "docker") == "docker") {
val containerType = if (conf.contains(EXECUTOR_DOCKER_IMAGE) &&
conf.get(CONTAINERIZER) == "docker") {
ContainerInfo.Type.DOCKER
} else {
ContainerInfo.Type.MESOS
@ -137,18 +137,17 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
val containerInfo = ContainerInfo.newBuilder()
.setType(containerType)
conf.getOption("spark.mesos.executor.docker.image").map { image =>
conf.get(EXECUTOR_DOCKER_IMAGE).map { image =>
val forcePullImage = conf
.getOption("spark.mesos.executor.docker.forcePullImage")
.exists(_.equals("true"))
.get(EXECUTOR_DOCKER_FORCE_PULL_IMAGE).contains(true)
val portMaps = conf
.getOption("spark.mesos.executor.docker.portmaps")
.get(EXECUTOR_DOCKER_PORT_MAPS)
.map(parsePortMappingsSpec)
.getOrElse(List.empty)
val params = conf
.getOption("spark.mesos.executor.docker.parameters")
.get(EXECUTOR_DOCKER_PARAMETERS)
.map(parseParamsSpec)
.getOrElse(List.empty)
@ -159,7 +158,7 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
}
val volumes = conf
.getOption("spark.mesos.executor.docker.volumes")
.get(EXECUTOR_DOCKER_VOLUMES)
.map(parseVolumesSpec)
volumes.foreach(_.foreach(containerInfo.addVolumes(_)))

@ -36,6 +36,7 @@ import org.apache.mesos.protobuf.{ByteString, GeneratedMessageV3}
import org.apache.spark.{SparkConf, SparkContext, SparkException}
import org.apache.spark.TaskState
import org.apache.spark.deploy.mesos.{config => mesosConfig}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.{Status => _, _}
import org.apache.spark.util.Utils
@ -83,10 +84,10 @@ trait MesosSchedulerUtils extends Logging {
fwInfoBuilder.setId(FrameworkID.newBuilder().setValue(id).build())
}
conf.getOption("spark.mesos.role").foreach { role =>
conf.get(mesosConfig.ROLE).foreach { role =>
fwInfoBuilder.setRole(role)
}
val maxGpus = conf.getInt("spark.mesos.gpus.max", 0)
val maxGpus = conf.get(mesosConfig.MAX_GPUS)
if (maxGpus > 0) {
fwInfoBuilder.addCapabilities(Capability.newBuilder().setType(Capability.Type.GPU_RESOURCES))
}
@ -103,10 +104,10 @@ trait MesosSchedulerUtils extends Logging {
conf: SparkConf,
fwInfoBuilder: Protos.FrameworkInfo.Builder): Protos.Credential.Builder = {
val credBuilder = Credential.newBuilder()
conf.getOption("spark.mesos.principal")
conf.get(mesosConfig.CREDENTIAL_PRINCIPAL)
.orElse(Option(conf.getenv("SPARK_MESOS_PRINCIPAL")))
.orElse(
conf.getOption("spark.mesos.principal.file")
conf.get(mesosConfig.CREDENTIAL_PRINCIPAL_FILE)
.orElse(Option(conf.getenv("SPARK_MESOS_PRINCIPAL_FILE")))
.map { principalFile =>
Files.toString(new File(principalFile), StandardCharsets.UTF_8)
@ -115,10 +116,10 @@ trait MesosSchedulerUtils extends Logging {
fwInfoBuilder.setPrincipal(principal)
credBuilder.setPrincipal(principal)
}
conf.getOption("spark.mesos.secret")
conf.get(mesosConfig.CREDENTIAL_SECRET)
.orElse(Option(conf.getenv("SPARK_MESOS_SECRET")))
.orElse(
conf.getOption("spark.mesos.secret.file")
conf.get(mesosConfig.CREDENTIAL_SECRET_FILE)
.orElse(Option(conf.getenv("SPARK_MESOS_SECRET_FILE")))
.map { secretFile =>
Files.toString(new File(secretFile), StandardCharsets.UTF_8)
@ -128,7 +129,8 @@ trait MesosSchedulerUtils extends Logging {
}
if (credBuilder.hasSecret && !fwInfoBuilder.hasPrincipal) {
throw new SparkException(
"spark.mesos.principal must be configured when spark.mesos.secret is set")
s"${mesosConfig.CREDENTIAL_PRINCIPAL} must be configured when " +
s"${mesosConfig.CREDENTIAL_SECRET} is set")
}
credBuilder
}
@ -399,37 +401,31 @@ trait MesosSchedulerUtils extends Logging {
* (whichever is larger)
*/
def executorMemory(sc: SparkContext): Int = {
sc.conf.getInt("spark.mesos.executor.memoryOverhead",
sc.conf.get(mesosConfig.EXECUTOR_MEMORY_OVERHEAD).getOrElse(
math.max(MEMORY_OVERHEAD_FRACTION * sc.executorMemory, MEMORY_OVERHEAD_MINIMUM).toInt) +
sc.executorMemory
}
def setupUris(uris: String,
def setupUris(uris: Seq[String],
builder: CommandInfo.Builder,
useFetcherCache: Boolean = false): Unit = {
uris.split(",").foreach { uri =>
uris.foreach { uri =>
builder.addUris(CommandInfo.URI.newBuilder().setValue(uri.trim()).setCache(useFetcherCache))
}
}
private def getRejectOfferDurationStr(conf: SparkConf): String = {
conf.get("spark.mesos.rejectOfferDuration", "120s")
}
protected def getRejectOfferDuration(conf: SparkConf): Long = {
Utils.timeStringAsSeconds(getRejectOfferDurationStr(conf))
conf.get(mesosConfig.REJECT_OFFER_DURATION)
}
protected def getRejectOfferDurationForUnmetConstraints(conf: SparkConf): Long = {
conf.getTimeAsSeconds(
"spark.mesos.rejectOfferDurationForUnmetConstraints",
getRejectOfferDurationStr(conf))
conf.get(mesosConfig.REJECT_OFFER_DURATION_FOR_UNMET_CONSTRAINTS)
.getOrElse(getRejectOfferDuration(conf))
}
protected def getRejectOfferDurationForReachedMaxCores(conf: SparkConf): Long = {
conf.getTimeAsSeconds(
"spark.mesos.rejectOfferDurationForReachedMaxCores",
getRejectOfferDurationStr(conf))
conf.get(mesosConfig.REJECT_OFFER_DURATION_FOR_REACHED_MAX_CORES)
.getOrElse(getRejectOfferDuration(conf))
}
/**
@ -558,8 +554,8 @@ trait MesosSchedulerUtils extends Logging {
* framework ID, the driver calls this method after the first registration.
*/
def unsetFrameworkID(sc: SparkContext) {
sc.conf.remove("spark.mesos.driver.frameworkId")
System.clearProperty("spark.mesos.driver.frameworkId")
sc.conf.remove(mesosConfig.DRIVER_FRAMEWORK_ID)
System.clearProperty(mesosConfig.DRIVER_FRAMEWORK_ID.key)
}
def mesosToTaskState(state: MesosTaskState): TaskState.TaskState = state match {

@ -18,11 +18,12 @@
package org.apache.spark.scheduler.cluster.mesos
import org.apache.spark._
import org.apache.spark.deploy.mesos.{config => mesosConfig}
import org.apache.spark.internal.config._
class MesosClusterManagerSuite extends SparkFunSuite with LocalSparkContext {
def testURL(masterURL: String, expectedClass: Class[_], coarse: Boolean) {
val conf = new SparkConf().set("spark.mesos.coarse", coarse.toString)
val conf = new SparkConf().set(mesosConfig.COARSE_MODE, coarse)
sc = new SparkContext("local", "test", conf)
val clusterManager = new MesosClusterManager()

@ -105,7 +105,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
val response = scheduler.submitDriver(
new MesosDriverDescription("d1", "jar", 1200, 1.5, true,
command,
Map(("spark.mesos.executor.home", "test"), ("spark.app.name", "test")),
Map((config.EXECUTOR_HOME.key, "test"), ("spark.app.name", "test")),
"s1",
new Date()))
assert(response.success)
@ -209,9 +209,9 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
val response = scheduler.submitDriver(
new MesosDriverDescription("d1", "jar", mem, cpu, true,
command,
Map("spark.mesos.executor.home" -> "test",
Map(config.EXECUTOR_HOME.key -> "test",
"spark.app.name" -> "test",
"spark.mesos.driverEnv.TEST_ENV" -> "TEST_VAL"),
config.DRIVER_ENV_PREFIX + "TEST_ENV" -> "TEST_VAL"),
"s1",
new Date()))
assert(response.success)
@ -233,10 +233,10 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
val response = scheduler.submitDriver(
new MesosDriverDescription("d1", "jar", mem, cpu, true,
command,
Map("spark.mesos.executor.home" -> "test",
Map(config.EXECUTOR_HOME.key -> "test",
"spark.app.name" -> "test",
"spark.mesos.network.name" -> "test-network-name",
"spark.mesos.network.labels" -> "key1:val1,key2:val2"),
config.NETWORK_NAME.key -> "test-network-name",
config.NETWORK_LABELS.key -> "key1:val1,key2:val2"),
"s1",
new Date()))
@ -256,7 +256,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
}
test("supports setting fetcher cache") {
setScheduler(Map("spark.mesos.fetcherCache.enable" -> "true"))
setScheduler(Map(config.ENABLE_FETCHER_CACHE.key -> "true"))
val mem = 1000
val cpu = 1
@ -264,7 +264,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
val response = scheduler.submitDriver(
new MesosDriverDescription("d1", "jar", mem, cpu, true,
command,
Map("spark.mesos.executor.home" -> "test",
Map(config.EXECUTOR_HOME.key -> "test",
"spark.app.name" -> "test"),
"s1",
new Date()))
@ -280,7 +280,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
}
test("supports disabling fetcher cache") {
setScheduler(Map("spark.mesos.fetcherCache.enable" -> "false"))
setScheduler(Map(config.ENABLE_FETCHER_CACHE.key -> "false"))
val mem = 1000
val cpu = 1
@ -288,7 +288,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
val response = scheduler.submitDriver(
new MesosDriverDescription("d1", "jar", mem, cpu, true,
command,
Map("spark.mesos.executor.home" -> "test",
Map(config.EXECUTOR_HOME.key -> "test",
"spark.app.name" -> "test"),
"s1",
new Date()))
@ -321,7 +321,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
val response = scheduler.submitDriver(
new MesosDriverDescription("d1", "jar", mem, cpu, true,
command,
Map("spark.mesos.executor.home" -> "test",
Map(config.EXECUTOR_HOME.key -> "test",
"spark.app.name" -> "test",
config.DRIVER_CONSTRAINTS.key -> driverConstraints),
"s1",
@ -359,9 +359,9 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
val response = scheduler.submitDriver(
new MesosDriverDescription("d1", "jar", mem, cpu, true,
command,
Map("spark.mesos.executor.home" -> "test",
Map(config.EXECUTOR_HOME.key -> "test",
"spark.app.name" -> "test",
"spark.mesos.driver.labels" -> "key:value"),
config.DRIVER_LABELS.key -> "key:value"),
"s1",
new Date()))
@ -385,7 +385,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
val response = scheduler.submitDriver(
new MesosDriverDescription("d1", "jar", 100, 1, true, command,
Map(("spark.mesos.executor.home", "test"), ("spark.app.name", "test")), "s1", new Date()))
Map((config.EXECUTOR_HOME.key, "test"), ("spark.app.name", "test")), "s1", new Date()))
assert(response.success)
val slaveId = SlaveID.newBuilder().setValue("s1").build()
val offer = Offer.newBuilder()
@ -471,7 +471,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
cpu,
true,
command,
Map("spark.mesos.executor.home" -> "test",
Map(config.EXECUTOR_HOME.key -> "test",
"spark.app.name" -> "test") ++
addlSparkConfVars,
"s1",

@ -31,7 +31,7 @@ import org.scalatest.concurrent.ScalaFutures
import org.scalatest.mockito.MockitoSugar
import org.apache.spark.{LocalSparkContext, SecurityManager, SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.deploy.mesos.config._
import org.apache.spark.deploy.mesos.{config => mesosConfig}
import org.apache.spark.internal.config._
import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef}
@ -59,8 +59,8 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
test("mesos supports killing and limiting executors") {
setBackend()
sparkConf.set("spark.driver.host", "driverHost")
sparkConf.set("spark.driver.port", "1234")
sparkConf.set(DRIVER_HOST_ADDRESS, "driverHost")
sparkConf.set(DRIVER_PORT, 1234)
val minMem = backend.executorMemory(sc)
val minCpu = 4
@ -109,7 +109,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
test("mesos supports spark.executor.cores") {
val executorCores = 4
setBackend(Map("spark.executor.cores" -> executorCores.toString))
setBackend(Map(EXECUTOR_CORES.key -> executorCores.toString))
val executorMemory = backend.executorMemory(sc)
val offers = List(Resources(executorMemory * 2, executorCores + 1))
@ -138,7 +138,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
test("mesos does not acquire more than spark.cores.max") {
val maxCores = 10
setBackend(Map("spark.cores.max" -> maxCores.toString))
setBackend(Map(CORES_MAX.key -> maxCores.toString))
val executorMemory = backend.executorMemory(sc)
offerResources(List(Resources(executorMemory, maxCores + 1)))
@ -166,7 +166,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
test("mesos does not acquire more than spark.mesos.gpus.max") {
val maxGpus = 5
setBackend(Map("spark.mesos.gpus.max" -> maxGpus.toString))
setBackend(Map(mesosConfig.MAX_GPUS.key -> maxGpus.toString))
val executorMemory = backend.executorMemory(sc)
offerResources(List(Resources(executorMemory, 1, maxGpus + 1)))
@ -180,14 +180,14 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
test("mesos declines offers that violate attribute constraints") {
setBackend(Map("spark.mesos.constraints" -> "x:true"))
setBackend(Map(mesosConfig.CONSTRAINTS.key -> "x:true"))
offerResources(List(Resources(backend.executorMemory(sc), 4)))
verifyDeclinedOffer(driver, createOfferId("o1"), true)
}
test("mesos declines offers with a filter when reached spark.cores.max") {
val maxCores = 3
setBackend(Map("spark.cores.max" -> maxCores.toString))
setBackend(Map(CORES_MAX.key -> maxCores.toString))
val executorMemory = backend.executorMemory(sc)
offerResources(List(
@ -202,8 +202,8 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
val maxCores = 4
val executorCores = 3
setBackend(Map(
"spark.cores.max" -> maxCores.toString,
"spark.executor.cores" -> executorCores.toString
CORES_MAX.key -> maxCores.toString,
EXECUTOR_CORES.key -> executorCores.toString
))
val executorMemory = backend.executorMemory(sc)
offerResources(List(
@ -218,8 +218,8 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
val maxCores = 4
val executorCores = 2
setBackend(Map(
"spark.cores.max" -> maxCores.toString,
"spark.executor.cores" -> executorCores.toString
CORES_MAX.key -> maxCores.toString,
EXECUTOR_CORES.key -> executorCores.toString
))
val executorMemory = backend.executorMemory(sc)
offerResources(List(
@ -235,8 +235,8 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
test("mesos assigns tasks round-robin on offers") {
val executorCores = 4
val maxCores = executorCores * 2
setBackend(Map("spark.executor.cores" -> executorCores.toString,
"spark.cores.max" -> maxCores.toString))
setBackend(Map(EXECUTOR_CORES.key -> executorCores.toString,
CORES_MAX.key -> maxCores.toString))
val executorMemory = backend.executorMemory(sc)
offerResources(List(
@ -249,7 +249,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
test("mesos creates multiple executors on a single slave") {
val executorCores = 4
setBackend(Map("spark.executor.cores" -> executorCores.toString))
setBackend(Map(EXECUTOR_CORES.key -> executorCores.toString))
// offer with room for two executors
val executorMemory = backend.executorMemory(sc)
@ -370,7 +370,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
test("failover timeout is set in created scheduler driver") {
val failoverTimeoutIn = 3600.0
initializeSparkConf(Map(DRIVER_FAILOVER_TIMEOUT.key -> failoverTimeoutIn.toString))
initializeSparkConf(Map(mesosConfig.DRIVER_FAILOVER_TIMEOUT.key -> failoverTimeoutIn.toString))
sc = new SparkContext(sparkConf)
val taskScheduler = mock[TaskSchedulerImpl]
@ -404,7 +404,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
}
test("honors unset spark.mesos.containerizer") {
setBackend(Map("spark.mesos.executor.docker.image" -> "test"))
setBackend(Map(mesosConfig.EXECUTOR_DOCKER_IMAGE.key -> "test"))
val (mem, cpu) = (backend.executorMemory(sc), 4)
@ -417,8 +417,8 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
test("honors spark.mesos.containerizer=\"mesos\"") {
setBackend(Map(
"spark.mesos.executor.docker.image" -> "test",
"spark.mesos.containerizer" -> "mesos"))
mesosConfig.EXECUTOR_DOCKER_IMAGE.key -> "test",
mesosConfig.CONTAINERIZER.key -> "mesos"))
val (mem, cpu) = (backend.executorMemory(sc), 4)
@ -431,10 +431,10 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
test("docker settings are reflected in created tasks") {
setBackend(Map(
"spark.mesos.executor.docker.image" -> "some_image",
"spark.mesos.executor.docker.forcePullImage" -> "true",
"spark.mesos.executor.docker.volumes" -> "/host_vol:/container_vol:ro",
"spark.mesos.executor.docker.portmaps" -> "8080:80:tcp"
mesosConfig.EXECUTOR_DOCKER_IMAGE.key -> "some_image",
mesosConfig.EXECUTOR_DOCKER_FORCE_PULL_IMAGE.key -> "true",
mesosConfig.EXECUTOR_DOCKER_VOLUMES.key -> "/host_vol:/container_vol:ro",
mesosConfig.EXECUTOR_DOCKER_PORT_MAPS.key -> "8080:80:tcp"
))
val (mem, cpu) = (backend.executorMemory(sc), 4)
@ -469,7 +469,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
test("force-pull-image option is disabled by default") {
setBackend(Map(
"spark.mesos.executor.docker.image" -> "some_image"
mesosConfig.EXECUTOR_DOCKER_IMAGE.key -> "some_image"
))
val (mem, cpu) = (backend.executorMemory(sc), 4)
@ -492,7 +492,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
test("mesos supports spark.executor.uri") {
val url = "spark.spark.spark.com"
setBackend(Map(
"spark.executor.uri" -> url
mesosConfig.EXECUTOR_URI.key -> url
), null)
val (mem, cpu) = (backend.executorMemory(sc), 4)
@ -507,8 +507,8 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
test("mesos supports setting fetcher cache") {
val url = "spark.spark.spark.com"
setBackend(Map(
"spark.mesos.fetcherCache.enable" -> "true",
"spark.executor.uri" -> url
mesosConfig.ENABLE_FETCHER_CACHE.key -> "true",
mesosConfig.EXECUTOR_URI.key -> url
), null)
val offers = List(Resources(backend.executorMemory(sc), 1))
offerResources(offers)
@ -521,8 +521,8 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
test("mesos supports disabling fetcher cache") {
val url = "spark.spark.spark.com"
setBackend(Map(
"spark.mesos.fetcherCache.enable" -> "false",
"spark.executor.uri" -> url
mesosConfig.ENABLE_FETCHER_CACHE.key -> "false",
mesosConfig.EXECUTOR_URI.key -> url
), null)
val offers = List(Resources(backend.executorMemory(sc), 1))
offerResources(offers)
@ -546,7 +546,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
test("mesos sets configurable labels on tasks") {
val taskLabelsString = "mesos:test,label:test"
setBackend(Map(
"spark.mesos.task.labels" -> taskLabelsString
mesosConfig.TASK_LABELS.key -> taskLabelsString
))
// Build up the labels
@ -568,8 +568,8 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
test("mesos supports spark.mesos.network.name and spark.mesos.network.labels") {
setBackend(Map(
"spark.mesos.network.name" -> "test-network-name",
"spark.mesos.network.labels" -> "key1:val1,key2:val2"
mesosConfig.NETWORK_NAME.key -> "test-network-name",
mesosConfig.NETWORK_LABELS.key -> "key1:val1,key2:val2"
))
val (mem, cpu) = (backend.executorMemory(sc), 4)
@ -590,7 +590,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
test("supports spark.scheduler.minRegisteredResourcesRatio") {
val expectedCores = 1
setBackend(Map(
"spark.cores.max" -> expectedCores.toString,
CORES_MAX.key -> expectedCores.toString,
SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO.key -> "1.0"))
val offers = List(Resources(backend.executorMemory(sc), expectedCores))
@ -606,7 +606,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
setBackend(Map(
DYN_ALLOCATION_ENABLED.key -> "true",
DYN_ALLOCATION_TESTING.key -> "true",
"spark.locality.wait" -> "1s"))
LOCALITY_WAIT.key -> "1s"))
assert(backend.getExecutorIds().isEmpty)
@ -652,22 +652,26 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
}
test("Creates an env-based reference secrets.") {
val launchedTasks = launchExecutorTasks(configEnvBasedRefSecrets(executorSecretConfig))
val launchedTasks = launchExecutorTasks(
configEnvBasedRefSecrets(mesosConfig.executorSecretConfig))
verifyEnvBasedRefSecrets(launchedTasks)
}
test("Creates an env-based value secrets.") {
val launchedTasks = launchExecutorTasks(configEnvBasedValueSecrets(executorSecretConfig))
val launchedTasks = launchExecutorTasks(
configEnvBasedValueSecrets(mesosConfig.executorSecretConfig))
verifyEnvBasedValueSecrets(launchedTasks)
}
test("Creates file-based reference secrets.") {
val launchedTasks = launchExecutorTasks(configFileBasedRefSecrets(executorSecretConfig))
val launchedTasks = launchExecutorTasks(
configFileBasedRefSecrets(mesosConfig.executorSecretConfig))
verifyFileBasedRefSecrets(launchedTasks)
}
test("Creates a file-based value secrets.") {
val launchedTasks = launchExecutorTasks(configFileBasedValueSecrets(executorSecretConfig))
val launchedTasks = launchExecutorTasks(
configFileBasedValueSecrets(mesosConfig.executorSecretConfig))
verifyFileBasedValueSecrets(launchedTasks)
}
@ -770,7 +774,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
sparkConf = (new SparkConf)
.setMaster("local[*]")
.setAppName("test-mesos-dynamic-alloc")
.set("spark.mesos.driver.webui.url", "http://webui")
.set(mesosConfig.DRIVER_WEBUI_URL, "http://webui")
if (home != null) {
sparkConf.setSparkHome(home)

@ -36,6 +36,7 @@ import org.mockito.Mockito._
import org.scalatest.mockito.MockitoSugar
import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.deploy.mesos.config._
import org.apache.spark.executor.MesosExecutorBackend
import org.apache.spark.scheduler.{LiveListenerBus, SparkListenerExecutorAdded,
TaskDescription, TaskSchedulerImpl, WorkerOffer}
@ -46,7 +47,7 @@ class MesosFineGrainedSchedulerBackendSuite
test("weburi is set in created scheduler driver") {
val conf = new SparkConf
conf.set("spark.mesos.driver.webui.url", "http://webui")
conf.set(DRIVER_WEBUI_URL, "http://webui")
conf.set("spark.app.name", "name1")
val sc = mock[SparkContext]
@ -80,9 +81,9 @@ class MesosFineGrainedSchedulerBackendSuite
}
test("Use configured mesosExecutor.cores for ExecutorInfo") {
val mesosExecutorCores = 3
val mesosExecutorCores = 3.0
val conf = new SparkConf
conf.set("spark.mesos.mesosExecutor.cores", mesosExecutorCores.toString)
conf.set(EXECUTOR_CORES, mesosExecutorCores)
val listenerBus = mock[LiveListenerBus]
listenerBus.post(
@ -114,7 +115,7 @@ class MesosFineGrainedSchedulerBackendSuite
test("check spark-class location correctly") {
val conf = new SparkConf
conf.set("spark.mesos.executor.home", "/mesos-home")
conf.set(EXECUTOR_HOME, "/mesos-home")
val listenerBus = mock[LiveListenerBus]
listenerBus.post(
@ -142,7 +143,7 @@ class MesosFineGrainedSchedulerBackendSuite
s" /mesos-home/bin/spark-class ${classOf[MesosExecutorBackend].getName}")
// uri exists.
conf.set("spark.executor.uri", "hdfs:///test-app-1.0.0.tgz")
conf.set(EXECUTOR_URI, "hdfs:///test-app-1.0.0.tgz")
val (executorInfo1, _) = mesosSchedulerBackend.createExecutorInfo(resources, "test-id")
assert(executorInfo1.getCommand.getValue ===
s"cd test-app-1*; ./bin/spark-class ${classOf[MesosExecutorBackend].getName}")
@ -152,10 +153,10 @@ class MesosFineGrainedSchedulerBackendSuite
val taskScheduler = mock[TaskSchedulerImpl]
val conf = new SparkConf()
.set("spark.mesos.executor.docker.image", "spark/mock")
.set("spark.mesos.executor.docker.forcePullImage", "true")
.set("spark.mesos.executor.docker.volumes", "/a,/b:/b,/c:/c:rw,/d:ro,/e:/e:ro")
.set("spark.mesos.executor.docker.portmaps", "80:8080,53:53:tcp")
.set(EXECUTOR_DOCKER_IMAGE, "spark/mock")
.set(EXECUTOR_DOCKER_FORCE_PULL_IMAGE, true)
.set(EXECUTOR_DOCKER_VOLUMES, Seq("/a", "/b:/b", "/c:/c:rw", "/d:ro", "/e:/e:ro"))
.set(EXECUTOR_DOCKER_PORT_MAPS, Seq("80:8080", "53:53:tcp"))
val listenerBus = mock[LiveListenerBus]
listenerBus.post(

@ -24,8 +24,8 @@ class MesosSchedulerBackendUtilSuite extends SparkFunSuite {
test("ContainerInfo fails to parse invalid docker parameters") {
val conf = new SparkConf()
conf.set("spark.mesos.executor.docker.parameters", "a,b")
conf.set("spark.mesos.executor.docker.image", "test")
conf.set(config.EXECUTOR_DOCKER_PARAMETERS, Seq("a", "b"))
conf.set(config.EXECUTOR_DOCKER_IMAGE, "test")
val containerInfo = MesosSchedulerBackendUtil.buildContainerInfo(
conf)
@ -36,8 +36,8 @@ class MesosSchedulerBackendUtilSuite extends SparkFunSuite {
test("ContainerInfo parses docker parameters") {
val conf = new SparkConf()
conf.set("spark.mesos.executor.docker.parameters", "a=1,b=2,c=3")
conf.set("spark.mesos.executor.docker.image", "test")
conf.set(config.EXECUTOR_DOCKER_PARAMETERS, Seq("a=1", "b=2", "c=3"))
conf.set(config.EXECUTOR_DOCKER_IMAGE, "test")
val containerInfo = MesosSchedulerBackendUtil.buildContainerInfo(
conf)

@ -29,6 +29,7 @@ import org.scalatest._
import org.scalatest.mockito.MockitoSugar
import org.apache.spark.{SparkConf, SparkContext, SparkException, SparkFunSuite}
import org.apache.spark.deploy.mesos.{config => mesosConfig}
import org.apache.spark.internal.config._
import org.apache.spark.util.SparkConfWithEnv
@ -94,7 +95,7 @@ class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoS
test("use spark.mesos.executor.memoryOverhead (if set)") {
val f = fixture
when(f.sc.executorMemory).thenReturn(1024)
f.sparkConf.set("spark.mesos.executor.memoryOverhead", "512")
f.sparkConf.set(mesosConfig.EXECUTOR_MEMORY_OVERHEAD, 512)
utils.executorMemory(f.sc) shouldBe 1536
}
@ -215,7 +216,7 @@ class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoS
test("Port reservation is done correctly with user specified ports only - multiple ranges") {
val conf = new SparkConf()
conf.set("spark.blockManager.port", "4000")
conf.set(BLOCK_MANAGER_PORT, 4000)
val portResourceList = List(createTestPortResource((3000, 5000), Some("my_role")),
createTestPortResource((2000, 2500), Some("other_role")))
val (resourcesLeft, resourcesToBeUsed) = utils
@ -244,7 +245,7 @@ class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoS
test("Principal specified via spark.mesos.principal") {
val conf = new SparkConf()
conf.set("spark.mesos.principal", "test-principal")
conf.set(mesosConfig.CREDENTIAL_PRINCIPAL, "test-principal")
val credBuilder = utils.buildCredentials(conf, FrameworkInfo.newBuilder())
credBuilder.hasPrincipal shouldBe true
@ -256,7 +257,7 @@ class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoS
pFile.deleteOnExit()
Files.write("test-principal".getBytes("UTF-8"), pFile);
val conf = new SparkConf()
conf.set("spark.mesos.principal.file", pFile.getAbsolutePath())
conf.set(mesosConfig.CREDENTIAL_PRINCIPAL_FILE, pFile.getAbsolutePath())
val credBuilder = utils.buildCredentials(conf, FrameworkInfo.newBuilder())
credBuilder.hasPrincipal shouldBe true
@ -265,7 +266,7 @@ class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoS
test("Principal specified via spark.mesos.principal.file that does not exist") {
val conf = new SparkConf()
conf.set("spark.mesos.principal.file", "/tmp/does-not-exist")
conf.set(mesosConfig.CREDENTIAL_PRINCIPAL_FILE, "/tmp/does-not-exist")
intercept[FileNotFoundException] {
utils.buildCredentials(conf, FrameworkInfo.newBuilder())
@ -301,8 +302,8 @@ class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoS
test("Secret specified via spark.mesos.secret") {
val conf = new SparkConf()
conf.set("spark.mesos.principal", "test-principal")
conf.set("spark.mesos.secret", "my-secret")
conf.set(mesosConfig.CREDENTIAL_PRINCIPAL, "test-principal")
conf.set(mesosConfig.CREDENTIAL_SECRET, "my-secret")
val credBuilder = utils.buildCredentials(conf, FrameworkInfo.newBuilder())
credBuilder.hasPrincipal shouldBe true
@ -316,8 +317,8 @@ class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoS
sFile.deleteOnExit()
Files.write("my-secret".getBytes("UTF-8"), sFile);
val conf = new SparkConf()
conf.set("spark.mesos.principal", "test-principal")
conf.set("spark.mesos.secret.file", sFile.getAbsolutePath())
conf.set(mesosConfig.CREDENTIAL_PRINCIPAL, "test-principal")
conf.set(mesosConfig.CREDENTIAL_SECRET_FILE, sFile.getAbsolutePath())
val credBuilder = utils.buildCredentials(conf, FrameworkInfo.newBuilder())
credBuilder.hasPrincipal shouldBe true
@ -328,8 +329,8 @@ class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoS
test("Principal specified via spark.mesos.secret.file that does not exist") {
val conf = new SparkConf()
conf.set("spark.mesos.principal", "test-principal")
conf.set("spark.mesos.secret.file", "/tmp/does-not-exist")
conf.set(mesosConfig.CREDENTIAL_PRINCIPAL, "test-principal")
conf.set(mesosConfig.CREDENTIAL_SECRET_FILE, "/tmp/does-not-exist")
intercept[FileNotFoundException] {
utils.buildCredentials(conf, FrameworkInfo.newBuilder())
@ -339,7 +340,7 @@ class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoS
test("Principal specified via SPARK_MESOS_SECRET") {
val env = Map("SPARK_MESOS_SECRET" -> "my-secret")
val conf = new SparkConfWithEnv(env)
conf.set("spark.mesos.principal", "test-principal")
conf.set(mesosConfig.CREDENTIAL_PRINCIPAL, "test-principal")
val credBuilder = utils.buildCredentials(conf, FrameworkInfo.newBuilder())
credBuilder.hasPrincipal shouldBe true
@ -356,7 +357,7 @@ class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoS
val sFilePath = sFile.getAbsolutePath()
val env = Map("SPARK_MESOS_SECRET_FILE" -> sFilePath)
val conf = new SparkConfWithEnv(env)
conf.set("spark.mesos.principal", "test-principal")
conf.set(mesosConfig.CREDENTIAL_PRINCIPAL, "test-principal")
val credBuilder = utils.buildCredentials(conf, FrameworkInfo.newBuilder())
credBuilder.hasPrincipal shouldBe true
@ -367,7 +368,7 @@ class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoS
test("Secret specified with no principal") {
val conf = new SparkConf()
conf.set("spark.mesos.secret", "my-secret")
conf.set(mesosConfig.CREDENTIAL_SECRET, "my-secret")
intercept[SparkException] {
utils.buildCredentials(conf, FrameworkInfo.newBuilder())
@ -376,7 +377,7 @@ class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoS
test("Principal specification preference") {
val conf = new SparkConfWithEnv(Map("SPARK_MESOS_PRINCIPAL" -> "other-principal"))
conf.set("spark.mesos.principal", "test-principal")
conf.set(mesosConfig.CREDENTIAL_PRINCIPAL, "test-principal")
val credBuilder = utils.buildCredentials(conf, FrameworkInfo.newBuilder())
credBuilder.hasPrincipal shouldBe true
@ -385,8 +386,8 @@ class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoS
test("Secret specification preference") {
val conf = new SparkConfWithEnv(Map("SPARK_MESOS_SECRET" -> "other-secret"))
conf.set("spark.mesos.principal", "test-principal")
conf.set("spark.mesos.secret", "my-secret")
conf.set(mesosConfig.CREDENTIAL_PRINCIPAL, "test-principal")
conf.set(mesosConfig.CREDENTIAL_SECRET, "my-secret")
val credBuilder = utils.buildCredentials(conf, FrameworkInfo.newBuilder())
credBuilder.hasPrincipal shouldBe true