diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala index 2b8655ceee..79a113792d 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala @@ -56,6 +56,31 @@ package object config { .createOptional } + private[spark] val CREDENTIAL_PRINCIPAL = + ConfigBuilder("spark.mesos.principal") + .doc("Name of the Kerberos principal to authenticate Spark to Mesos.") + .stringConf + .createOptional + + private[spark] val CREDENTIAL_PRINCIPAL_FILE = + ConfigBuilder("spark.mesos.principal.file") + .doc("The path of file which contains the name of the Kerberos principal " + + "to authenticate Spark to Mesos.") + .stringConf + .createOptional + + private[spark] val CREDENTIAL_SECRET = + ConfigBuilder("spark.mesos.secret") + .doc("The secret value to authenticate Spark to Mesos.") + .stringConf + .createOptional + + private[spark] val CREDENTIAL_SECRET_FILE = + ConfigBuilder("spark.mesos.secret.file") + .doc("The path of file which contains the secret value to authenticate Spark to Mesos.") + .stringConf + .createOptional + /* Common app configuration. */ private[spark] val SHUFFLE_CLEANER_INTERVAL_S = @@ -85,6 +110,13 @@ package object config { .stringConf .createOptional + private[spark] val DRIVER_WEBUI_URL = + ConfigBuilder("spark.mesos.driver.webui.url") + .doc("Set the Spark Mesos driver webui_url for interacting with the framework. " + + "If unset it will point to Spark's internal web UI.") + .stringConf + .createOptional + private[spark] val driverSecretConfig = new MesosSecretConfig("driver") private[spark] val executorSecretConfig = new MesosSecretConfig("executor") @@ -118,6 +150,211 @@ package object config { .stringConf .createWithDefault("") + private[spark] val DRIVER_FRAMEWORK_ID = + ConfigBuilder("spark.mesos.driver.frameworkId") + .stringConf + .createOptional + private[spark] val EXECUTOR_URI = ConfigBuilder("spark.executor.uri").stringConf.createOptional + + private[spark] val PROXY_BASE_URL = + ConfigBuilder("spark.mesos.proxy.baseURL").stringConf.createOptional + + private[spark] val COARSE_MODE = + ConfigBuilder("spark.mesos.coarse") + .doc("If set to true, runs over Mesos clusters in \"coarse-grained\" sharing mode, where " + + "Spark acquires one long-lived Mesos task on each machine. If set to false, runs over " + + "Mesos cluster in \"fine-grained\" sharing mode, where one Mesos task is created per " + + "Spark task.") + .booleanConf.createWithDefault(true) + + private[spark] val COARSE_SHUTDOWN_TIMEOUT = + ConfigBuilder("spark.mesos.coarse.shutdownTimeout") + .timeConf(TimeUnit.MILLISECONDS) + .checkValue(_ >= 0, s"spark.mesos.coarse.shutdownTimeout must be >= 0") + .createWithDefaultString("10s") + + private[spark] val MAX_DRIVERS = + ConfigBuilder("spark.mesos.maxDrivers").intConf.createWithDefault(200) + + private[spark] val RETAINED_DRIVERS = + ConfigBuilder("spark.mesos.retainedDrivers").intConf.createWithDefault(200) + + private[spark] val CLUSTER_RETRY_WAIT_MAX_SECONDS = + ConfigBuilder("spark.mesos.cluster.retry.wait.max") + .intConf + .createWithDefault(60) // 1 minute + + private[spark] val ENABLE_FETCHER_CACHE = + ConfigBuilder("spark.mesos.fetcherCache.enable") + .doc("If set to true, all URIs (example: `spark.executor.uri`, `spark.mesos.uris`) will be " + + "cached by the Mesos Fetcher Cache.") + .booleanConf + .createWithDefault(false) + + private[spark] val APP_JAR_LOCAL_RESOLUTION_MODE = + ConfigBuilder("spark.mesos.appJar.local.resolution.mode") + .doc("Provides support for the `local:///` scheme to reference the app jar resource in " + + "cluster mode. If user uses a local resource (`local:///path/to/jar`) and the config " + + "option is not used it defaults to `host` eg. the mesos fetcher tries to get the " + + "resource from the host's file system. If the value is unknown it prints a warning msg " + + "in the dispatcher logs and defaults to `host`. If the value is `container` then spark " + + "submit in the container will use the jar in the container's path: `/path/to/jar`.") + .stringConf + .checkValues(Set("host", "container")) + .createWithDefault("host") + + private[spark] val REJECT_OFFER_DURATION = + ConfigBuilder("spark.mesos.rejectOfferDuration") + .doc("Time to consider unused resources refused, serves as a fallback of " + + "`spark.mesos.rejectOfferDurationForUnmetConstraints`, " + + "`spark.mesos.rejectOfferDurationForReachedMaxCores`.") + .timeConf(TimeUnit.SECONDS) + .createWithDefaultString("120s") + + private[spark] val REJECT_OFFER_DURATION_FOR_UNMET_CONSTRAINTS = + ConfigBuilder("spark.mesos.rejectOfferDurationForUnmetConstraints") + .doc("Time to consider unused resources refused with unmet constraints.") + .timeConf(TimeUnit.SECONDS) + .createOptional + + private[spark] val REJECT_OFFER_DURATION_FOR_REACHED_MAX_CORES = + ConfigBuilder("spark.mesos.rejectOfferDurationForReachedMaxCores") + .doc("Time to consider unused resources refused when maximum number of cores " + + "`spark.cores.max` is reached.") + .timeConf(TimeUnit.SECONDS) + .createOptional + + private[spark] val URIS_TO_DOWNLOAD = + ConfigBuilder("spark.mesos.uris") + .doc("A comma-separated list of URIs to be downloaded to the sandbox when driver or " + + "executor is launched by Mesos. This applies to both coarse-grained and fine-grained " + + "mode.") + .stringConf + .toSequence + .createWithDefault(Nil) + + private[spark] val EXECUTOR_HOME = + ConfigBuilder("spark.mesos.executor.home") + .doc("Set the directory in which Spark is installed on the executors in Mesos. " + + "By default, the executors will simply use the driver's Spark home directory, which may " + + "not be visible to them. Note that this is only relevant if a Spark binary package is " + + "not specified through `spark.executor.uri`.") + .stringConf + .createOptional + + private[spark] val EXECUTOR_CORES = + ConfigBuilder("spark.mesos.mesosExecutor.cores") + .doc("(Fine-grained mode only) Number of cores to give each Mesos executor. This does not " + + "include the cores used to run the Spark tasks. In other words, even if no Spark task " + + "is being run, each Mesos executor will occupy the number of cores configured here. " + + "The value can be a floating point number.") + .doubleConf + .createWithDefault(1.0) + + private[spark] val EXTRA_CORES_PER_EXECUTOR = + ConfigBuilder("spark.mesos.extra.cores") + .doc("Set the extra number of cores for an executor to advertise. This does not result in " + + "more cores allocated. It instead means that an executor will \"pretend\" it has more " + + "cores, so that the driver will send it more tasks. Use this to increase parallelism. " + + "This setting is only used for Mesos coarse-grained mode.") + .intConf + .createWithDefault(0) + + private[spark] val EXECUTOR_MEMORY_OVERHEAD = + ConfigBuilder("spark.mesos.executor.memoryOverhead") + .doc("The amount of additional memory, specified in MiB, to be allocated per executor. " + + "By default, the overhead will be larger of either 384 or 10% of " + + "`spark.executor.memory`. If set, the final overhead will be this value.") + .intConf + .createOptional + + private[spark] val EXECUTOR_DOCKER_IMAGE = + ConfigBuilder("spark.mesos.executor.docker.image") + .doc("Set the name of the docker image that the Spark executors will run in. The selected " + + "image must have Spark installed, as well as a compatible version of the Mesos library. " + + "The installed path of Spark in the image can be specified with " + + "`spark.mesos.executor.home`; the installed path of the Mesos library can be specified " + + "with `spark.executorEnv.MESOS_NATIVE_JAVA_LIBRARY`.") + .stringConf + .createOptional + + private[spark] val EXECUTOR_DOCKER_FORCE_PULL_IMAGE = + ConfigBuilder("spark.mesos.executor.docker.forcePullImage") + .doc("Force Mesos agents to pull the image specified in " + + "`spark.mesos.executor.docker.image`. By default Mesos agents will not pull images they " + + "already have cached.") + .booleanConf + .createOptional + + private[spark] val EXECUTOR_DOCKER_PORT_MAPS = + ConfigBuilder("spark.mesos.executor.docker.portmaps") + .stringConf + .toSequence + .createOptional + + private[spark] val EXECUTOR_DOCKER_PARAMETERS = + ConfigBuilder("spark.mesos.executor.docker.parameters") + .doc("Set the list of custom parameters which will be passed into the `docker run` " + + "command when launching the Spark executor on Mesos using the docker containerizer. " + + "The format of this property is a list of key/value pairs which pair looks key1=value1.") + .stringConf + .toSequence + .createOptional + + private[spark] val EXECUTOR_DOCKER_VOLUMES = + ConfigBuilder("spark.mesos.executor.docker.volumes") + .doc("Set the list of volumes which will be mounted into the Docker image, which was set " + + "using `spark.mesos.executor.docker.image`. The format of this property is a list of " + + "mappings following the form passed to `docker run -v`. That is they take the form: " + + "`[host_path:]container_path[:ro|:rw]`") + .stringConf + .toSequence + .createOptional + + private[spark] val MAX_GPUS = + ConfigBuilder("spark.mesos.gpus.max") + .doc("Set the maximum number GPU resources to acquire for this job. Note that executors " + + "will still launch when no GPU resources are found since this configuration is just an " + + "upper limit and not a guaranteed amount.") + .intConf + .createWithDefault(0) + + private[spark] val TASK_LABELS = + ConfigBuilder("spark.mesos.task.labels") + .doc("Set the Mesos labels to add to each task. Labels are free-form key-value pairs. " + + "Key-value pairs should be separated by a colon, and commas used to list more than one. " + + "If your label includes a colon or comma, you can escape it with a backslash. " + + "Ex. key:value,key2:a\\:b.") + .stringConf + .createWithDefault("") + + private[spark] val CONSTRAINTS = + ConfigBuilder("spark.mesos.constraints") + .doc("Attribute-based constraints on mesos resource offers. By default, all resource " + + "offers will be accepted. This setting applies only to executors. Refer to Mesos " + + "Attributes & Resources doc for more information on attributes.") + .stringConf + .createWithDefault("") + + private[spark] val CONTAINERIZER = + ConfigBuilder("spark.mesos.containerizer") + .doc("This only affects docker containers, and must be one of \"docker\" or \"mesos\". " + + "Mesos supports two types of containerizers for docker: the \"docker\" containerizer, " + + "and the preferred \"mesos\" containerizer. " + + "Read more here: http://mesos.apache.org/documentation/latest/container-image/") + .stringConf + .checkValues(Set("docker", "mesos")) + .createWithDefault("docker") + + private[spark] val ROLE = + ConfigBuilder("spark.mesos.role") + .doc("Set the role of this Spark framework for Mesos. Roles are used in Mesos for " + + "reservations and resource weight sharing.") + .stringConf + .createOptional + + private[spark] val DRIVER_ENV_PREFIX = "spark.mesos.driverEnv." + private[spark] val DISPATCHER_DRIVER_DEFAULT_PREFIX = "spark.mesos.dispatcher.driverDefault." } diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala index c53285331e..1755cb7f66 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala @@ -77,7 +77,7 @@ private[mesos] class MesosClusterPage(parent: MesosClusterUI) extends WebUIPage( private def driverRow(state: MesosClusterSubmissionState): Seq[Node] = { val id = state.driverDescription.submissionId - val proxy = parent.conf.getOption("spark.mesos.proxy.baseURL") + val proxy = parent.conf.get(PROXY_BASE_URL) val sandboxCol = if (proxy.isDefined) { val clusterSchedulerId = parent.scheduler.getSchedulerState().frameworkId diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManager.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManager.scala index da71f8f9e4..ed1b3d7a16 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManager.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManager.scala @@ -18,6 +18,7 @@ package org.apache.spark.scheduler.cluster.mesos import org.apache.spark.SparkContext +import org.apache.spark.deploy.mesos.config._ import org.apache.spark.internal.config._ import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl} @@ -42,7 +43,7 @@ private[spark] class MesosClusterManager extends ExternalClusterManager { "I/O encryption is currently not supported in Mesos.") val mesosUrl = MESOS_REGEX.findFirstMatchIn(masterURL).get.group(1) - val coarse = sc.conf.getBoolean("spark.mesos.coarse", defaultValue = true) + val coarse = sc.conf.get(COARSE_MODE) if (coarse) { new MesosCoarseGrainedSchedulerBackend( scheduler.asInstanceOf[TaskSchedulerImpl], diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala index 01a294478a..9df91699b2 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala @@ -127,10 +127,10 @@ private[spark] class MesosClusterScheduler( MetricsSystem.createMetricsSystem("mesos_cluster", conf, new SecurityManager(conf)) private val master = conf.get("spark.master") private val appName = conf.get("spark.app.name") - private val queuedCapacity = conf.getInt("spark.mesos.maxDrivers", 200) - private val retainedDrivers = conf.getInt("spark.mesos.retainedDrivers", 200) - private val maxRetryWaitTime = conf.getInt("spark.mesos.cluster.retry.wait.max", 60) // 1 minute - private val useFetchCache = conf.getBoolean("spark.mesos.fetcherCache.enable", false) + private val queuedCapacity = conf.get(config.MAX_DRIVERS) + private val retainedDrivers = conf.get(config.RETAINED_DRIVERS) + private val maxRetryWaitTime = conf.get(config.CLUSTER_RETRY_WAIT_MAX_SECONDS) + private val useFetchCache = conf.get(config.ENABLE_FETCHER_CACHE) private val schedulerState = engineFactory.createEngine("scheduler") private val stateLock = new Object() // Keyed by submission id @@ -390,10 +390,10 @@ private[spark] class MesosClusterScheduler( private def getDriverEnvironment(desc: MesosDriverDescription): Environment = { // TODO(mgummelt): Don't do this here. This should be passed as a --conf val commandEnv = adjust(desc.command.environment, "SPARK_SUBMIT_OPTS", "")( - v => s"$v -Dspark.mesos.driver.frameworkId=${getDriverFrameworkID(desc)}" + v => s"$v -D${config.DRIVER_FRAMEWORK_ID.key}=${getDriverFrameworkID(desc)}" ) - val env = desc.conf.getAllWithPrefix("spark.mesos.driverEnv.") ++ commandEnv + val env = desc.conf.getAllWithPrefix(config.DRIVER_ENV_PREFIX) ++ commandEnv val envBuilder = Environment.newBuilder() @@ -419,22 +419,17 @@ private[spark] class MesosClusterScheduler( private def isContainerLocalAppJar(desc: MesosDriverDescription): Boolean = { val isLocalJar = desc.jarUrl.startsWith("local://") - val isContainerLocal = desc.conf.getOption("spark.mesos.appJar.local.resolution.mode").exists { + val isContainerLocal = desc.conf.get(config.APP_JAR_LOCAL_RESOLUTION_MODE) match { case "container" => true case "host" => false - case other => - logWarning(s"Unknown spark.mesos.appJar.local.resolution.mode $other, using host.") - false - } + } isLocalJar && isContainerLocal } private def getDriverUris(desc: MesosDriverDescription): List[CommandInfo.URI] = { - val confUris = List(conf.getOption("spark.mesos.uris"), - desc.conf.getOption("spark.mesos.uris"), - Some(desc.conf.get(SUBMIT_PYTHON_FILES).mkString(","))).flatMap( - _.map(_.split(",").map(_.trim)) - ).flatten + val confUris = (conf.get(config.URIS_TO_DOWNLOAD) ++ + desc.conf.get(config.URIS_TO_DOWNLOAD) ++ + desc.conf.get(SUBMIT_PYTHON_FILES)).toList if (isContainerLocalAppJar(desc)) { (confUris ++ getDriverExecutorURI(desc).toList).map(uri => @@ -464,7 +459,7 @@ private[spark] class MesosClusterScheduler( } private def getDriverCommandValue(desc: MesosDriverDescription): String = { - val dockerDefined = desc.conf.contains("spark.mesos.executor.docker.image") + val dockerDefined = desc.conf.contains(config.EXECUTOR_DOCKER_IMAGE) val executorUri = getDriverExecutorURI(desc) // Gets the path to run spark-submit, and the path to the Mesos sandbox. val (executable, sandboxPath) = if (dockerDefined) { @@ -484,11 +479,11 @@ private[spark] class MesosClusterScheduler( // Sandbox path points to the parent folder as we chdir into the folderBasename. (cmdExecutable, "..") } else { - val executorSparkHome = desc.conf.getOption("spark.mesos.executor.home") + val executorSparkHome = desc.conf.get(config.EXECUTOR_HOME) .orElse(conf.getOption("spark.home")) .orElse(Option(System.getenv("SPARK_HOME"))) .getOrElse { - throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!") + throw new SparkException(s"Executor Spark home `${config.EXECUTOR_HOME}` is not set!") } val cmdExecutable = new File(executorSparkHome, "./bin/spark-submit").getPath // Sandbox points to the current directory by default with Mesos. @@ -547,7 +542,7 @@ private[spark] class MesosClusterScheduler( "spark.submit.deployMode", // this would be set to `cluster`, but we need client "spark.master" // this contains the address of the dispatcher, not master ) - val defaultConf = conf.getAllWithPrefix("spark.mesos.dispatcher.driverDefault.").toMap + val defaultConf = conf.getAllWithPrefix(config.DISPATCHER_DRIVER_DEFAULT_PREFIX).toMap val driverConf = desc.conf.getAll .filter { case (key, _) => !replicatedOptionsBlacklist.contains(key) } .toMap diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala index 7e2a8ba4b0..8bd61c230d 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala @@ -76,15 +76,13 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( cores - (cores % minCoresPerExecutor) } - private val useFetcherCache = conf.getBoolean("spark.mesos.fetcherCache.enable", false) + private val useFetcherCache = conf.get(ENABLE_FETCHER_CACHE) - private val maxGpus = conf.getInt("spark.mesos.gpus.max", 0) + private val maxGpus = conf.get(MAX_GPUS) - private val taskLabels = conf.get("spark.mesos.task.labels", "") + private val taskLabels = conf.get(TASK_LABELS) - private[this] val shutdownTimeoutMS = - conf.getTimeAsMs("spark.mesos.coarse.shutdownTimeout", "10s") - .ensuring(_ >= 0, "spark.mesos.coarse.shutdownTimeout must be >= 0") + private[this] val shutdownTimeoutMS = conf.get(COARSE_SHUTDOWN_TIMEOUT) // Synchronization protected by stateLock private[this] var stopCalled: Boolean = false @@ -144,11 +142,11 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( // may lead to deadlocks since the superclass might also try to lock private val stateLock = new ReentrantLock - private val extraCoresPerExecutor = conf.getInt("spark.mesos.extra.cores", 0) + private val extraCoresPerExecutor = conf.get(EXTRA_CORES_PER_EXECUTOR) // Offer constraints private val slaveOfferConstraints = - parseConstraintString(sc.conf.get("spark.mesos.constraints", "")) + parseConstraintString(sc.conf.get(CONSTRAINTS)) // Reject offers with mismatched constraints in seconds private val rejectOfferDurationForUnmetConstraints = @@ -208,10 +206,10 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( sc.sparkUser, sc.appName, sc.conf, - sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.webUrl)), + sc.conf.get(DRIVER_WEBUI_URL).orElse(sc.ui.map(_.webUrl)), None, Some(sc.conf.get(DRIVER_FAILOVER_TIMEOUT)), - sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix) + sc.conf.get(DRIVER_FRAMEWORK_ID).map(_ + suffix) ) launcherBackend.setState(SparkAppHandle.State.SUBMITTED) @@ -264,10 +262,10 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( val uri = conf.get(EXECUTOR_URI).orElse(Option(System.getenv("SPARK_EXECUTOR_URI"))) if (uri.isEmpty) { - val executorSparkHome = conf.getOption("spark.mesos.executor.home") + val executorSparkHome = conf.get(EXECUTOR_HOME) .orElse(sc.getSparkHome()) .getOrElse { - throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!") + throw new SparkException(s"Executor Spark home `$EXECUTOR_HOME` is not set!") } val runScript = new File(executorSparkHome, "./bin/spark-class").getPath command.setValue( @@ -293,7 +291,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( command.addUris(CommandInfo.URI.newBuilder().setValue(uri.get).setCache(useFetcherCache)) } - conf.getOption("spark.mesos.uris").foreach(setupUris(_, command, useFetcherCache)) + setupUris(conf.get(URIS_TO_DOWNLOAD), command, useFetcherCache) command.build() } diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala index c5c7842202..a03fecdb2a 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala @@ -28,7 +28,7 @@ import org.apache.mesos.SchedulerDriver import org.apache.mesos.protobuf.ByteString import org.apache.spark.{SparkContext, SparkException, TaskState} -import org.apache.spark.deploy.mesos.config.EXECUTOR_URI +import org.apache.spark.deploy.mesos.{config => mesosConfig} import org.apache.spark.executor.MesosExecutorBackend import org.apache.spark.internal.config import org.apache.spark.scheduler._ @@ -60,11 +60,11 @@ private[spark] class MesosFineGrainedSchedulerBackend( // The listener bus to publish executor added/removed events. val listenerBus = sc.listenerBus - private[mesos] val mesosExecutorCores = sc.conf.getDouble("spark.mesos.mesosExecutor.cores", 1) + private[mesos] val mesosExecutorCores = sc.conf.get(mesosConfig.EXECUTOR_CORES) // Offer constraints private[this] val slaveOfferConstraints = - parseConstraintString(sc.conf.get("spark.mesos.constraints", "")) + parseConstraintString(sc.conf.get(mesosConfig.CONSTRAINTS)) // reject offers with mismatched constraints in seconds private val rejectOfferDurationForUnmetConstraints = @@ -82,10 +82,10 @@ private[spark] class MesosFineGrainedSchedulerBackend( sc.sparkUser, sc.appName, sc.conf, - sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.webUrl)), + sc.conf.get(mesosConfig.DRIVER_WEBUI_URL).orElse(sc.ui.map(_.webUrl)), Option.empty, Option.empty, - sc.conf.getOption("spark.mesos.driver.frameworkId") + sc.conf.get(mesosConfig.DRIVER_FRAMEWORK_ID) ) unsetFrameworkID(sc) @@ -102,10 +102,10 @@ private[spark] class MesosFineGrainedSchedulerBackend( def createExecutorInfo( availableResources: JList[Resource], execId: String): (MesosExecutorInfo, JList[Resource]) = { - val executorSparkHome = sc.conf.getOption("spark.mesos.executor.home") + val executorSparkHome = sc.conf.get(mesosConfig.EXECUTOR_HOME) .orElse(sc.getSparkHome()) // Fall back to driver Spark home for backward compatibility .getOrElse { - throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!") + throw new SparkException(s"Executor Spark home `${mesosConfig.EXECUTOR_HOME}` is not set!") } val environment = Environment.newBuilder() sc.conf.get(config.EXECUTOR_CLASS_PATH).foreach { cp => @@ -133,7 +133,8 @@ private[spark] class MesosFineGrainedSchedulerBackend( } val command = CommandInfo.newBuilder() .setEnvironment(environment) - val uri = sc.conf.get(EXECUTOR_URI).orElse(Option(System.getenv("SPARK_EXECUTOR_URI"))) + val uri = sc.conf.get(mesosConfig.EXECUTOR_URI) + .orElse(Option(System.getenv("SPARK_EXECUTOR_URI"))) val executorBackendName = classOf[MesosExecutorBackend].getName if (uri.isEmpty) { @@ -155,7 +156,7 @@ private[spark] class MesosFineGrainedSchedulerBackend( builder.addAllResources(usedCpuResources.asJava) builder.addAllResources(usedMemResources.asJava) - sc.conf.getOption("spark.mesos.uris").foreach(setupUris(_, command)) + setupUris(sc.conf.get(mesosConfig.URIS_TO_DOWNLOAD), command) val executorInfo = builder .setExecutorId(ExecutorID.newBuilder().setValue(execId).build()) diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala index b4364a5e2e..a217deb8b4 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala @@ -24,7 +24,7 @@ import org.apache.mesos.protobuf.ByteString import org.apache.spark.SparkConf import org.apache.spark.SparkException -import org.apache.spark.deploy.mesos.config.{NETWORK_LABELS, NETWORK_NAME} +import org.apache.spark.deploy.mesos.config._ import org.apache.spark.deploy.mesos.config.MesosSecretConfig import org.apache.spark.internal.Logging @@ -34,11 +34,11 @@ import org.apache.spark.internal.Logging */ private[mesos] object MesosSchedulerBackendUtil extends Logging { /** - * Parse a comma-delimited list of volume specs, each of which + * Parse a list of volume specs, each of which * takes the form [host-dir:]container-dir[:rw|:ro]. */ - def parseVolumesSpec(volumes: String): List[Volume] = { - volumes.split(",").map(_.split(":")).flatMap { spec => + def parseVolumesSpec(volumes: Seq[String]): List[Volume] = { + volumes.map(_.split(":")).flatMap { spec => val vol: Volume.Builder = Volume .newBuilder() .setMode(Volume.Mode.RW) @@ -71,7 +71,7 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging { } /** - * Parse a comma-delimited list of port mapping specs, each of which + * Parse a list of port mapping specs, each of which * takes the form host_port:container_port[:udp|:tcp] * * Note: @@ -81,8 +81,8 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging { * anticipates the expansion of the docker form to allow for a protocol * and leaves open the chance for mesos to begin to accept an 'ip' field */ - def parsePortMappingsSpec(portmaps: String): List[DockerInfo.PortMapping] = { - portmaps.split(",").map(_.split(":")).flatMap { spec: Array[String] => + def parsePortMappingsSpec(portmaps: Seq[String]): List[DockerInfo.PortMapping] = { + portmaps.map(_.split(":")).flatMap { spec: Array[String] => val portmap: DockerInfo.PortMapping.Builder = DockerInfo.PortMapping .newBuilder() .setProtocol("tcp") @@ -108,10 +108,10 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging { * Parse a list of docker parameters, each of which * takes the form key=value */ - private def parseParamsSpec(params: String): List[Parameter] = { + private def parseParamsSpec(params: Seq[String]): List[Parameter] = { // split with limit of 2 to avoid parsing error when '=' // exists in the parameter value - params.split(",").map(_.split("=", 2)).flatMap { spec: Array[String] => + params.map(_.split("=", 2)).flatMap { spec: Array[String] => val param: Parameter.Builder = Parameter.newBuilder() spec match { case Array(key, value) => @@ -127,8 +127,8 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging { } def buildContainerInfo(conf: SparkConf): ContainerInfo.Builder = { - val containerType = if (conf.contains("spark.mesos.executor.docker.image") && - conf.get("spark.mesos.containerizer", "docker") == "docker") { + val containerType = if (conf.contains(EXECUTOR_DOCKER_IMAGE) && + conf.get(CONTAINERIZER) == "docker") { ContainerInfo.Type.DOCKER } else { ContainerInfo.Type.MESOS @@ -137,18 +137,17 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging { val containerInfo = ContainerInfo.newBuilder() .setType(containerType) - conf.getOption("spark.mesos.executor.docker.image").map { image => + conf.get(EXECUTOR_DOCKER_IMAGE).map { image => val forcePullImage = conf - .getOption("spark.mesos.executor.docker.forcePullImage") - .exists(_.equals("true")) + .get(EXECUTOR_DOCKER_FORCE_PULL_IMAGE).contains(true) val portMaps = conf - .getOption("spark.mesos.executor.docker.portmaps") + .get(EXECUTOR_DOCKER_PORT_MAPS) .map(parsePortMappingsSpec) .getOrElse(List.empty) val params = conf - .getOption("spark.mesos.executor.docker.parameters") + .get(EXECUTOR_DOCKER_PARAMETERS) .map(parseParamsSpec) .getOrElse(List.empty) @@ -159,7 +158,7 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging { } val volumes = conf - .getOption("spark.mesos.executor.docker.volumes") + .get(EXECUTOR_DOCKER_VOLUMES) .map(parseVolumesSpec) volumes.foreach(_.foreach(containerInfo.addVolumes(_))) diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala index 634460686b..0699371203 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala @@ -36,6 +36,7 @@ import org.apache.mesos.protobuf.{ByteString, GeneratedMessageV3} import org.apache.spark.{SparkConf, SparkContext, SparkException} import org.apache.spark.TaskState +import org.apache.spark.deploy.mesos.{config => mesosConfig} import org.apache.spark.internal.Logging import org.apache.spark.internal.config.{Status => _, _} import org.apache.spark.util.Utils @@ -83,10 +84,10 @@ trait MesosSchedulerUtils extends Logging { fwInfoBuilder.setId(FrameworkID.newBuilder().setValue(id).build()) } - conf.getOption("spark.mesos.role").foreach { role => + conf.get(mesosConfig.ROLE).foreach { role => fwInfoBuilder.setRole(role) } - val maxGpus = conf.getInt("spark.mesos.gpus.max", 0) + val maxGpus = conf.get(mesosConfig.MAX_GPUS) if (maxGpus > 0) { fwInfoBuilder.addCapabilities(Capability.newBuilder().setType(Capability.Type.GPU_RESOURCES)) } @@ -103,10 +104,10 @@ trait MesosSchedulerUtils extends Logging { conf: SparkConf, fwInfoBuilder: Protos.FrameworkInfo.Builder): Protos.Credential.Builder = { val credBuilder = Credential.newBuilder() - conf.getOption("spark.mesos.principal") + conf.get(mesosConfig.CREDENTIAL_PRINCIPAL) .orElse(Option(conf.getenv("SPARK_MESOS_PRINCIPAL"))) .orElse( - conf.getOption("spark.mesos.principal.file") + conf.get(mesosConfig.CREDENTIAL_PRINCIPAL_FILE) .orElse(Option(conf.getenv("SPARK_MESOS_PRINCIPAL_FILE"))) .map { principalFile => Files.toString(new File(principalFile), StandardCharsets.UTF_8) @@ -115,10 +116,10 @@ trait MesosSchedulerUtils extends Logging { fwInfoBuilder.setPrincipal(principal) credBuilder.setPrincipal(principal) } - conf.getOption("spark.mesos.secret") + conf.get(mesosConfig.CREDENTIAL_SECRET) .orElse(Option(conf.getenv("SPARK_MESOS_SECRET"))) .orElse( - conf.getOption("spark.mesos.secret.file") + conf.get(mesosConfig.CREDENTIAL_SECRET_FILE) .orElse(Option(conf.getenv("SPARK_MESOS_SECRET_FILE"))) .map { secretFile => Files.toString(new File(secretFile), StandardCharsets.UTF_8) @@ -128,7 +129,8 @@ trait MesosSchedulerUtils extends Logging { } if (credBuilder.hasSecret && !fwInfoBuilder.hasPrincipal) { throw new SparkException( - "spark.mesos.principal must be configured when spark.mesos.secret is set") + s"${mesosConfig.CREDENTIAL_PRINCIPAL} must be configured when " + + s"${mesosConfig.CREDENTIAL_SECRET} is set") } credBuilder } @@ -399,37 +401,31 @@ trait MesosSchedulerUtils extends Logging { * (whichever is larger) */ def executorMemory(sc: SparkContext): Int = { - sc.conf.getInt("spark.mesos.executor.memoryOverhead", + sc.conf.get(mesosConfig.EXECUTOR_MEMORY_OVERHEAD).getOrElse( math.max(MEMORY_OVERHEAD_FRACTION * sc.executorMemory, MEMORY_OVERHEAD_MINIMUM).toInt) + sc.executorMemory } - def setupUris(uris: String, + def setupUris(uris: Seq[String], builder: CommandInfo.Builder, useFetcherCache: Boolean = false): Unit = { - uris.split(",").foreach { uri => + uris.foreach { uri => builder.addUris(CommandInfo.URI.newBuilder().setValue(uri.trim()).setCache(useFetcherCache)) } } - private def getRejectOfferDurationStr(conf: SparkConf): String = { - conf.get("spark.mesos.rejectOfferDuration", "120s") - } - protected def getRejectOfferDuration(conf: SparkConf): Long = { - Utils.timeStringAsSeconds(getRejectOfferDurationStr(conf)) + conf.get(mesosConfig.REJECT_OFFER_DURATION) } protected def getRejectOfferDurationForUnmetConstraints(conf: SparkConf): Long = { - conf.getTimeAsSeconds( - "spark.mesos.rejectOfferDurationForUnmetConstraints", - getRejectOfferDurationStr(conf)) + conf.get(mesosConfig.REJECT_OFFER_DURATION_FOR_UNMET_CONSTRAINTS) + .getOrElse(getRejectOfferDuration(conf)) } protected def getRejectOfferDurationForReachedMaxCores(conf: SparkConf): Long = { - conf.getTimeAsSeconds( - "spark.mesos.rejectOfferDurationForReachedMaxCores", - getRejectOfferDurationStr(conf)) + conf.get(mesosConfig.REJECT_OFFER_DURATION_FOR_REACHED_MAX_CORES) + .getOrElse(getRejectOfferDuration(conf)) } /** @@ -558,8 +554,8 @@ trait MesosSchedulerUtils extends Logging { * framework ID, the driver calls this method after the first registration. */ def unsetFrameworkID(sc: SparkContext) { - sc.conf.remove("spark.mesos.driver.frameworkId") - System.clearProperty("spark.mesos.driver.frameworkId") + sc.conf.remove(mesosConfig.DRIVER_FRAMEWORK_ID) + System.clearProperty(mesosConfig.DRIVER_FRAMEWORK_ID.key) } def mesosToTaskState(state: MesosTaskState): TaskState.TaskState = state match { diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManagerSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManagerSuite.scala index a55855428b..e8520061ac 100644 --- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManagerSuite.scala +++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManagerSuite.scala @@ -18,11 +18,12 @@ package org.apache.spark.scheduler.cluster.mesos import org.apache.spark._ +import org.apache.spark.deploy.mesos.{config => mesosConfig} import org.apache.spark.internal.config._ class MesosClusterManagerSuite extends SparkFunSuite with LocalSparkContext { def testURL(masterURL: String, expectedClass: Class[_], coarse: Boolean) { - val conf = new SparkConf().set("spark.mesos.coarse", coarse.toString) + val conf = new SparkConf().set(mesosConfig.COARSE_MODE, coarse) sc = new SparkContext("local", "test", conf) val clusterManager = new MesosClusterManager() diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala index 858146b35f..f26ff04a9a 100644 --- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala +++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala @@ -105,7 +105,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi val response = scheduler.submitDriver( new MesosDriverDescription("d1", "jar", 1200, 1.5, true, command, - Map(("spark.mesos.executor.home", "test"), ("spark.app.name", "test")), + Map((config.EXECUTOR_HOME.key, "test"), ("spark.app.name", "test")), "s1", new Date())) assert(response.success) @@ -209,9 +209,9 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi val response = scheduler.submitDriver( new MesosDriverDescription("d1", "jar", mem, cpu, true, command, - Map("spark.mesos.executor.home" -> "test", + Map(config.EXECUTOR_HOME.key -> "test", "spark.app.name" -> "test", - "spark.mesos.driverEnv.TEST_ENV" -> "TEST_VAL"), + config.DRIVER_ENV_PREFIX + "TEST_ENV" -> "TEST_VAL"), "s1", new Date())) assert(response.success) @@ -233,10 +233,10 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi val response = scheduler.submitDriver( new MesosDriverDescription("d1", "jar", mem, cpu, true, command, - Map("spark.mesos.executor.home" -> "test", + Map(config.EXECUTOR_HOME.key -> "test", "spark.app.name" -> "test", - "spark.mesos.network.name" -> "test-network-name", - "spark.mesos.network.labels" -> "key1:val1,key2:val2"), + config.NETWORK_NAME.key -> "test-network-name", + config.NETWORK_LABELS.key -> "key1:val1,key2:val2"), "s1", new Date())) @@ -256,7 +256,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi } test("supports setting fetcher cache") { - setScheduler(Map("spark.mesos.fetcherCache.enable" -> "true")) + setScheduler(Map(config.ENABLE_FETCHER_CACHE.key -> "true")) val mem = 1000 val cpu = 1 @@ -264,7 +264,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi val response = scheduler.submitDriver( new MesosDriverDescription("d1", "jar", mem, cpu, true, command, - Map("spark.mesos.executor.home" -> "test", + Map(config.EXECUTOR_HOME.key -> "test", "spark.app.name" -> "test"), "s1", new Date())) @@ -280,7 +280,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi } test("supports disabling fetcher cache") { - setScheduler(Map("spark.mesos.fetcherCache.enable" -> "false")) + setScheduler(Map(config.ENABLE_FETCHER_CACHE.key -> "false")) val mem = 1000 val cpu = 1 @@ -288,7 +288,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi val response = scheduler.submitDriver( new MesosDriverDescription("d1", "jar", mem, cpu, true, command, - Map("spark.mesos.executor.home" -> "test", + Map(config.EXECUTOR_HOME.key -> "test", "spark.app.name" -> "test"), "s1", new Date())) @@ -321,7 +321,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi val response = scheduler.submitDriver( new MesosDriverDescription("d1", "jar", mem, cpu, true, command, - Map("spark.mesos.executor.home" -> "test", + Map(config.EXECUTOR_HOME.key -> "test", "spark.app.name" -> "test", config.DRIVER_CONSTRAINTS.key -> driverConstraints), "s1", @@ -359,9 +359,9 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi val response = scheduler.submitDriver( new MesosDriverDescription("d1", "jar", mem, cpu, true, command, - Map("spark.mesos.executor.home" -> "test", + Map(config.EXECUTOR_HOME.key -> "test", "spark.app.name" -> "test", - "spark.mesos.driver.labels" -> "key:value"), + config.DRIVER_LABELS.key -> "key:value"), "s1", new Date())) @@ -385,7 +385,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi val response = scheduler.submitDriver( new MesosDriverDescription("d1", "jar", 100, 1, true, command, - Map(("spark.mesos.executor.home", "test"), ("spark.app.name", "test")), "s1", new Date())) + Map((config.EXECUTOR_HOME.key, "test"), ("spark.app.name", "test")), "s1", new Date())) assert(response.success) val slaveId = SlaveID.newBuilder().setValue("s1").build() val offer = Offer.newBuilder() @@ -471,7 +471,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi cpu, true, command, - Map("spark.mesos.executor.home" -> "test", + Map(config.EXECUTOR_HOME.key -> "test", "spark.app.name" -> "test") ++ addlSparkConfVars, "s1", diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala index a4e9cc3f5d..37c0f5f450 100644 --- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala +++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala @@ -31,7 +31,7 @@ import org.scalatest.concurrent.ScalaFutures import org.scalatest.mockito.MockitoSugar import org.apache.spark.{LocalSparkContext, SecurityManager, SparkConf, SparkContext, SparkFunSuite} -import org.apache.spark.deploy.mesos.config._ +import org.apache.spark.deploy.mesos.{config => mesosConfig} import org.apache.spark.internal.config._ import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef} @@ -59,8 +59,8 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite test("mesos supports killing and limiting executors") { setBackend() - sparkConf.set("spark.driver.host", "driverHost") - sparkConf.set("spark.driver.port", "1234") + sparkConf.set(DRIVER_HOST_ADDRESS, "driverHost") + sparkConf.set(DRIVER_PORT, 1234) val minMem = backend.executorMemory(sc) val minCpu = 4 @@ -109,7 +109,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite test("mesos supports spark.executor.cores") { val executorCores = 4 - setBackend(Map("spark.executor.cores" -> executorCores.toString)) + setBackend(Map(EXECUTOR_CORES.key -> executorCores.toString)) val executorMemory = backend.executorMemory(sc) val offers = List(Resources(executorMemory * 2, executorCores + 1)) @@ -138,7 +138,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite test("mesos does not acquire more than spark.cores.max") { val maxCores = 10 - setBackend(Map("spark.cores.max" -> maxCores.toString)) + setBackend(Map(CORES_MAX.key -> maxCores.toString)) val executorMemory = backend.executorMemory(sc) offerResources(List(Resources(executorMemory, maxCores + 1))) @@ -166,7 +166,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite test("mesos does not acquire more than spark.mesos.gpus.max") { val maxGpus = 5 - setBackend(Map("spark.mesos.gpus.max" -> maxGpus.toString)) + setBackend(Map(mesosConfig.MAX_GPUS.key -> maxGpus.toString)) val executorMemory = backend.executorMemory(sc) offerResources(List(Resources(executorMemory, 1, maxGpus + 1))) @@ -180,14 +180,14 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite test("mesos declines offers that violate attribute constraints") { - setBackend(Map("spark.mesos.constraints" -> "x:true")) + setBackend(Map(mesosConfig.CONSTRAINTS.key -> "x:true")) offerResources(List(Resources(backend.executorMemory(sc), 4))) verifyDeclinedOffer(driver, createOfferId("o1"), true) } test("mesos declines offers with a filter when reached spark.cores.max") { val maxCores = 3 - setBackend(Map("spark.cores.max" -> maxCores.toString)) + setBackend(Map(CORES_MAX.key -> maxCores.toString)) val executorMemory = backend.executorMemory(sc) offerResources(List( @@ -202,8 +202,8 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite val maxCores = 4 val executorCores = 3 setBackend(Map( - "spark.cores.max" -> maxCores.toString, - "spark.executor.cores" -> executorCores.toString + CORES_MAX.key -> maxCores.toString, + EXECUTOR_CORES.key -> executorCores.toString )) val executorMemory = backend.executorMemory(sc) offerResources(List( @@ -218,8 +218,8 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite val maxCores = 4 val executorCores = 2 setBackend(Map( - "spark.cores.max" -> maxCores.toString, - "spark.executor.cores" -> executorCores.toString + CORES_MAX.key -> maxCores.toString, + EXECUTOR_CORES.key -> executorCores.toString )) val executorMemory = backend.executorMemory(sc) offerResources(List( @@ -235,8 +235,8 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite test("mesos assigns tasks round-robin on offers") { val executorCores = 4 val maxCores = executorCores * 2 - setBackend(Map("spark.executor.cores" -> executorCores.toString, - "spark.cores.max" -> maxCores.toString)) + setBackend(Map(EXECUTOR_CORES.key -> executorCores.toString, + CORES_MAX.key -> maxCores.toString)) val executorMemory = backend.executorMemory(sc) offerResources(List( @@ -249,7 +249,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite test("mesos creates multiple executors on a single slave") { val executorCores = 4 - setBackend(Map("spark.executor.cores" -> executorCores.toString)) + setBackend(Map(EXECUTOR_CORES.key -> executorCores.toString)) // offer with room for two executors val executorMemory = backend.executorMemory(sc) @@ -370,7 +370,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite test("failover timeout is set in created scheduler driver") { val failoverTimeoutIn = 3600.0 - initializeSparkConf(Map(DRIVER_FAILOVER_TIMEOUT.key -> failoverTimeoutIn.toString)) + initializeSparkConf(Map(mesosConfig.DRIVER_FAILOVER_TIMEOUT.key -> failoverTimeoutIn.toString)) sc = new SparkContext(sparkConf) val taskScheduler = mock[TaskSchedulerImpl] @@ -404,7 +404,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite } test("honors unset spark.mesos.containerizer") { - setBackend(Map("spark.mesos.executor.docker.image" -> "test")) + setBackend(Map(mesosConfig.EXECUTOR_DOCKER_IMAGE.key -> "test")) val (mem, cpu) = (backend.executorMemory(sc), 4) @@ -417,8 +417,8 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite test("honors spark.mesos.containerizer=\"mesos\"") { setBackend(Map( - "spark.mesos.executor.docker.image" -> "test", - "spark.mesos.containerizer" -> "mesos")) + mesosConfig.EXECUTOR_DOCKER_IMAGE.key -> "test", + mesosConfig.CONTAINERIZER.key -> "mesos")) val (mem, cpu) = (backend.executorMemory(sc), 4) @@ -431,10 +431,10 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite test("docker settings are reflected in created tasks") { setBackend(Map( - "spark.mesos.executor.docker.image" -> "some_image", - "spark.mesos.executor.docker.forcePullImage" -> "true", - "spark.mesos.executor.docker.volumes" -> "/host_vol:/container_vol:ro", - "spark.mesos.executor.docker.portmaps" -> "8080:80:tcp" + mesosConfig.EXECUTOR_DOCKER_IMAGE.key -> "some_image", + mesosConfig.EXECUTOR_DOCKER_FORCE_PULL_IMAGE.key -> "true", + mesosConfig.EXECUTOR_DOCKER_VOLUMES.key -> "/host_vol:/container_vol:ro", + mesosConfig.EXECUTOR_DOCKER_PORT_MAPS.key -> "8080:80:tcp" )) val (mem, cpu) = (backend.executorMemory(sc), 4) @@ -469,7 +469,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite test("force-pull-image option is disabled by default") { setBackend(Map( - "spark.mesos.executor.docker.image" -> "some_image" + mesosConfig.EXECUTOR_DOCKER_IMAGE.key -> "some_image" )) val (mem, cpu) = (backend.executorMemory(sc), 4) @@ -492,7 +492,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite test("mesos supports spark.executor.uri") { val url = "spark.spark.spark.com" setBackend(Map( - "spark.executor.uri" -> url + mesosConfig.EXECUTOR_URI.key -> url ), null) val (mem, cpu) = (backend.executorMemory(sc), 4) @@ -507,8 +507,8 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite test("mesos supports setting fetcher cache") { val url = "spark.spark.spark.com" setBackend(Map( - "spark.mesos.fetcherCache.enable" -> "true", - "spark.executor.uri" -> url + mesosConfig.ENABLE_FETCHER_CACHE.key -> "true", + mesosConfig.EXECUTOR_URI.key -> url ), null) val offers = List(Resources(backend.executorMemory(sc), 1)) offerResources(offers) @@ -521,8 +521,8 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite test("mesos supports disabling fetcher cache") { val url = "spark.spark.spark.com" setBackend(Map( - "spark.mesos.fetcherCache.enable" -> "false", - "spark.executor.uri" -> url + mesosConfig.ENABLE_FETCHER_CACHE.key -> "false", + mesosConfig.EXECUTOR_URI.key -> url ), null) val offers = List(Resources(backend.executorMemory(sc), 1)) offerResources(offers) @@ -546,7 +546,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite test("mesos sets configurable labels on tasks") { val taskLabelsString = "mesos:test,label:test" setBackend(Map( - "spark.mesos.task.labels" -> taskLabelsString + mesosConfig.TASK_LABELS.key -> taskLabelsString )) // Build up the labels @@ -568,8 +568,8 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite test("mesos supports spark.mesos.network.name and spark.mesos.network.labels") { setBackend(Map( - "spark.mesos.network.name" -> "test-network-name", - "spark.mesos.network.labels" -> "key1:val1,key2:val2" + mesosConfig.NETWORK_NAME.key -> "test-network-name", + mesosConfig.NETWORK_LABELS.key -> "key1:val1,key2:val2" )) val (mem, cpu) = (backend.executorMemory(sc), 4) @@ -590,7 +590,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite test("supports spark.scheduler.minRegisteredResourcesRatio") { val expectedCores = 1 setBackend(Map( - "spark.cores.max" -> expectedCores.toString, + CORES_MAX.key -> expectedCores.toString, SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO.key -> "1.0")) val offers = List(Resources(backend.executorMemory(sc), expectedCores)) @@ -606,7 +606,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite setBackend(Map( DYN_ALLOCATION_ENABLED.key -> "true", DYN_ALLOCATION_TESTING.key -> "true", - "spark.locality.wait" -> "1s")) + LOCALITY_WAIT.key -> "1s")) assert(backend.getExecutorIds().isEmpty) @@ -652,22 +652,26 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite } test("Creates an env-based reference secrets.") { - val launchedTasks = launchExecutorTasks(configEnvBasedRefSecrets(executorSecretConfig)) + val launchedTasks = launchExecutorTasks( + configEnvBasedRefSecrets(mesosConfig.executorSecretConfig)) verifyEnvBasedRefSecrets(launchedTasks) } test("Creates an env-based value secrets.") { - val launchedTasks = launchExecutorTasks(configEnvBasedValueSecrets(executorSecretConfig)) + val launchedTasks = launchExecutorTasks( + configEnvBasedValueSecrets(mesosConfig.executorSecretConfig)) verifyEnvBasedValueSecrets(launchedTasks) } test("Creates file-based reference secrets.") { - val launchedTasks = launchExecutorTasks(configFileBasedRefSecrets(executorSecretConfig)) + val launchedTasks = launchExecutorTasks( + configFileBasedRefSecrets(mesosConfig.executorSecretConfig)) verifyFileBasedRefSecrets(launchedTasks) } test("Creates a file-based value secrets.") { - val launchedTasks = launchExecutorTasks(configFileBasedValueSecrets(executorSecretConfig)) + val launchedTasks = launchExecutorTasks( + configFileBasedValueSecrets(mesosConfig.executorSecretConfig)) verifyFileBasedValueSecrets(launchedTasks) } @@ -770,7 +774,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite sparkConf = (new SparkConf) .setMaster("local[*]") .setAppName("test-mesos-dynamic-alloc") - .set("spark.mesos.driver.webui.url", "http://webui") + .set(mesosConfig.DRIVER_WEBUI_URL, "http://webui") if (home != null) { sparkConf.setSparkHome(home) diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala index 0b6d93f62d..3e63c35361 100644 --- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala +++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala @@ -36,6 +36,7 @@ import org.mockito.Mockito._ import org.scalatest.mockito.MockitoSugar import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite} +import org.apache.spark.deploy.mesos.config._ import org.apache.spark.executor.MesosExecutorBackend import org.apache.spark.scheduler.{LiveListenerBus, SparkListenerExecutorAdded, TaskDescription, TaskSchedulerImpl, WorkerOffer} @@ -46,7 +47,7 @@ class MesosFineGrainedSchedulerBackendSuite test("weburi is set in created scheduler driver") { val conf = new SparkConf - conf.set("spark.mesos.driver.webui.url", "http://webui") + conf.set(DRIVER_WEBUI_URL, "http://webui") conf.set("spark.app.name", "name1") val sc = mock[SparkContext] @@ -80,9 +81,9 @@ class MesosFineGrainedSchedulerBackendSuite } test("Use configured mesosExecutor.cores for ExecutorInfo") { - val mesosExecutorCores = 3 + val mesosExecutorCores = 3.0 val conf = new SparkConf - conf.set("spark.mesos.mesosExecutor.cores", mesosExecutorCores.toString) + conf.set(EXECUTOR_CORES, mesosExecutorCores) val listenerBus = mock[LiveListenerBus] listenerBus.post( @@ -114,7 +115,7 @@ class MesosFineGrainedSchedulerBackendSuite test("check spark-class location correctly") { val conf = new SparkConf - conf.set("spark.mesos.executor.home", "/mesos-home") + conf.set(EXECUTOR_HOME, "/mesos-home") val listenerBus = mock[LiveListenerBus] listenerBus.post( @@ -142,7 +143,7 @@ class MesosFineGrainedSchedulerBackendSuite s" /mesos-home/bin/spark-class ${classOf[MesosExecutorBackend].getName}") // uri exists. - conf.set("spark.executor.uri", "hdfs:///test-app-1.0.0.tgz") + conf.set(EXECUTOR_URI, "hdfs:///test-app-1.0.0.tgz") val (executorInfo1, _) = mesosSchedulerBackend.createExecutorInfo(resources, "test-id") assert(executorInfo1.getCommand.getValue === s"cd test-app-1*; ./bin/spark-class ${classOf[MesosExecutorBackend].getName}") @@ -152,10 +153,10 @@ class MesosFineGrainedSchedulerBackendSuite val taskScheduler = mock[TaskSchedulerImpl] val conf = new SparkConf() - .set("spark.mesos.executor.docker.image", "spark/mock") - .set("spark.mesos.executor.docker.forcePullImage", "true") - .set("spark.mesos.executor.docker.volumes", "/a,/b:/b,/c:/c:rw,/d:ro,/e:/e:ro") - .set("spark.mesos.executor.docker.portmaps", "80:8080,53:53:tcp") + .set(EXECUTOR_DOCKER_IMAGE, "spark/mock") + .set(EXECUTOR_DOCKER_FORCE_PULL_IMAGE, true) + .set(EXECUTOR_DOCKER_VOLUMES, Seq("/a", "/b:/b", "/c:/c:rw", "/d:ro", "/e:/e:ro")) + .set(EXECUTOR_DOCKER_PORT_MAPS, Seq("80:8080", "53:53:tcp")) val listenerBus = mock[LiveListenerBus] listenerBus.post( diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtilSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtilSuite.scala index 442c43960e..6b7ae90076 100644 --- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtilSuite.scala +++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtilSuite.scala @@ -24,8 +24,8 @@ class MesosSchedulerBackendUtilSuite extends SparkFunSuite { test("ContainerInfo fails to parse invalid docker parameters") { val conf = new SparkConf() - conf.set("spark.mesos.executor.docker.parameters", "a,b") - conf.set("spark.mesos.executor.docker.image", "test") + conf.set(config.EXECUTOR_DOCKER_PARAMETERS, Seq("a", "b")) + conf.set(config.EXECUTOR_DOCKER_IMAGE, "test") val containerInfo = MesosSchedulerBackendUtil.buildContainerInfo( conf) @@ -36,8 +36,8 @@ class MesosSchedulerBackendUtilSuite extends SparkFunSuite { test("ContainerInfo parses docker parameters") { val conf = new SparkConf() - conf.set("spark.mesos.executor.docker.parameters", "a=1,b=2,c=3") - conf.set("spark.mesos.executor.docker.image", "test") + conf.set(config.EXECUTOR_DOCKER_PARAMETERS, Seq("a=1", "b=2", "c=3")) + conf.set(config.EXECUTOR_DOCKER_IMAGE, "test") val containerInfo = MesosSchedulerBackendUtil.buildContainerInfo( conf) diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala index 8d90e1a859..dfbdae18f2 100644 --- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala +++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala @@ -29,6 +29,7 @@ import org.scalatest._ import org.scalatest.mockito.MockitoSugar import org.apache.spark.{SparkConf, SparkContext, SparkException, SparkFunSuite} +import org.apache.spark.deploy.mesos.{config => mesosConfig} import org.apache.spark.internal.config._ import org.apache.spark.util.SparkConfWithEnv @@ -94,7 +95,7 @@ class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoS test("use spark.mesos.executor.memoryOverhead (if set)") { val f = fixture when(f.sc.executorMemory).thenReturn(1024) - f.sparkConf.set("spark.mesos.executor.memoryOverhead", "512") + f.sparkConf.set(mesosConfig.EXECUTOR_MEMORY_OVERHEAD, 512) utils.executorMemory(f.sc) shouldBe 1536 } @@ -215,7 +216,7 @@ class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoS test("Port reservation is done correctly with user specified ports only - multiple ranges") { val conf = new SparkConf() - conf.set("spark.blockManager.port", "4000") + conf.set(BLOCK_MANAGER_PORT, 4000) val portResourceList = List(createTestPortResource((3000, 5000), Some("my_role")), createTestPortResource((2000, 2500), Some("other_role"))) val (resourcesLeft, resourcesToBeUsed) = utils @@ -244,7 +245,7 @@ class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoS test("Principal specified via spark.mesos.principal") { val conf = new SparkConf() - conf.set("spark.mesos.principal", "test-principal") + conf.set(mesosConfig.CREDENTIAL_PRINCIPAL, "test-principal") val credBuilder = utils.buildCredentials(conf, FrameworkInfo.newBuilder()) credBuilder.hasPrincipal shouldBe true @@ -256,7 +257,7 @@ class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoS pFile.deleteOnExit() Files.write("test-principal".getBytes("UTF-8"), pFile); val conf = new SparkConf() - conf.set("spark.mesos.principal.file", pFile.getAbsolutePath()) + conf.set(mesosConfig.CREDENTIAL_PRINCIPAL_FILE, pFile.getAbsolutePath()) val credBuilder = utils.buildCredentials(conf, FrameworkInfo.newBuilder()) credBuilder.hasPrincipal shouldBe true @@ -265,7 +266,7 @@ class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoS test("Principal specified via spark.mesos.principal.file that does not exist") { val conf = new SparkConf() - conf.set("spark.mesos.principal.file", "/tmp/does-not-exist") + conf.set(mesosConfig.CREDENTIAL_PRINCIPAL_FILE, "/tmp/does-not-exist") intercept[FileNotFoundException] { utils.buildCredentials(conf, FrameworkInfo.newBuilder()) @@ -301,8 +302,8 @@ class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoS test("Secret specified via spark.mesos.secret") { val conf = new SparkConf() - conf.set("spark.mesos.principal", "test-principal") - conf.set("spark.mesos.secret", "my-secret") + conf.set(mesosConfig.CREDENTIAL_PRINCIPAL, "test-principal") + conf.set(mesosConfig.CREDENTIAL_SECRET, "my-secret") val credBuilder = utils.buildCredentials(conf, FrameworkInfo.newBuilder()) credBuilder.hasPrincipal shouldBe true @@ -316,8 +317,8 @@ class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoS sFile.deleteOnExit() Files.write("my-secret".getBytes("UTF-8"), sFile); val conf = new SparkConf() - conf.set("spark.mesos.principal", "test-principal") - conf.set("spark.mesos.secret.file", sFile.getAbsolutePath()) + conf.set(mesosConfig.CREDENTIAL_PRINCIPAL, "test-principal") + conf.set(mesosConfig.CREDENTIAL_SECRET_FILE, sFile.getAbsolutePath()) val credBuilder = utils.buildCredentials(conf, FrameworkInfo.newBuilder()) credBuilder.hasPrincipal shouldBe true @@ -328,8 +329,8 @@ class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoS test("Principal specified via spark.mesos.secret.file that does not exist") { val conf = new SparkConf() - conf.set("spark.mesos.principal", "test-principal") - conf.set("spark.mesos.secret.file", "/tmp/does-not-exist") + conf.set(mesosConfig.CREDENTIAL_PRINCIPAL, "test-principal") + conf.set(mesosConfig.CREDENTIAL_SECRET_FILE, "/tmp/does-not-exist") intercept[FileNotFoundException] { utils.buildCredentials(conf, FrameworkInfo.newBuilder()) @@ -339,7 +340,7 @@ class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoS test("Principal specified via SPARK_MESOS_SECRET") { val env = Map("SPARK_MESOS_SECRET" -> "my-secret") val conf = new SparkConfWithEnv(env) - conf.set("spark.mesos.principal", "test-principal") + conf.set(mesosConfig.CREDENTIAL_PRINCIPAL, "test-principal") val credBuilder = utils.buildCredentials(conf, FrameworkInfo.newBuilder()) credBuilder.hasPrincipal shouldBe true @@ -356,7 +357,7 @@ class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoS val sFilePath = sFile.getAbsolutePath() val env = Map("SPARK_MESOS_SECRET_FILE" -> sFilePath) val conf = new SparkConfWithEnv(env) - conf.set("spark.mesos.principal", "test-principal") + conf.set(mesosConfig.CREDENTIAL_PRINCIPAL, "test-principal") val credBuilder = utils.buildCredentials(conf, FrameworkInfo.newBuilder()) credBuilder.hasPrincipal shouldBe true @@ -367,7 +368,7 @@ class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoS test("Secret specified with no principal") { val conf = new SparkConf() - conf.set("spark.mesos.secret", "my-secret") + conf.set(mesosConfig.CREDENTIAL_SECRET, "my-secret") intercept[SparkException] { utils.buildCredentials(conf, FrameworkInfo.newBuilder()) @@ -376,7 +377,7 @@ class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoS test("Principal specification preference") { val conf = new SparkConfWithEnv(Map("SPARK_MESOS_PRINCIPAL" -> "other-principal")) - conf.set("spark.mesos.principal", "test-principal") + conf.set(mesosConfig.CREDENTIAL_PRINCIPAL, "test-principal") val credBuilder = utils.buildCredentials(conf, FrameworkInfo.newBuilder()) credBuilder.hasPrincipal shouldBe true @@ -385,8 +386,8 @@ class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoS test("Secret specification preference") { val conf = new SparkConfWithEnv(Map("SPARK_MESOS_SECRET" -> "other-secret")) - conf.set("spark.mesos.principal", "test-principal") - conf.set("spark.mesos.secret", "my-secret") + conf.set(mesosConfig.CREDENTIAL_PRINCIPAL, "test-principal") + conf.set(mesosConfig.CREDENTIAL_SECRET, "my-secret") val credBuilder = utils.buildCredentials(conf, FrameworkInfo.newBuilder()) credBuilder.hasPrincipal shouldBe true