diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index 8745e76d12..ec130c1db8 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -382,8 +382,9 @@ See the [configuration page](configuration.html) for information on Spark config
(none) |
Set the Mesos labels to add to each task. Labels are free-form key-value pairs.
- Key-value pairs should be separated by a colon, and commas used to list more than one.
- Ex. key:value,key2:value2.
+ Key-value pairs should be separated by a colon, and commas used to
+ list more than one. If your label includes a colon or comma, you
+ can escape it with a backslash. Ex. key:value,key2:a\:b.
|
@@ -468,6 +469,15 @@ See the [configuration page](configuration.html) for information on Spark config
If unset it will point to Spark's internal web UI.
+
+ spark.mesos.driver.labels |
+ (none) |
+
+ Mesos labels to add to the driver. See spark.mesos.task.labels
+ for formatting information.
+ |
+
+
spark.mesos.driverEnv.[EnvironmentVariableName] |
(none) |
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
index 19e253394f..56d697f359 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
@@ -56,4 +56,11 @@ package object config {
.stringConf
.createOptional
+ private [spark] val DRIVER_LABELS =
+ ConfigBuilder("spark.mesos.driver.labels")
+ .doc("Mesos labels to add to the driver. Labels are free-form key-value pairs. Key-value" +
+ "pairs should be separated by a colon, and commas used to list more than one." +
+ "Ex. key:value,key2:value2")
+ .stringConf
+ .createOptional
}
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 1bc6f71860..577f9a876b 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -30,11 +30,13 @@ import org.apache.mesos.Protos.Environment.Variable
import org.apache.mesos.Protos.TaskStatus.Reason
import org.apache.spark.{SecurityManager, SparkConf, SparkException, TaskState}
+import org.apache.spark.deploy.mesos.config
import org.apache.spark.deploy.mesos.MesosDriverDescription
import org.apache.spark.deploy.rest.{CreateSubmissionResponse, KillSubmissionResponse, SubmissionStatusResponse}
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.util.Utils
+
/**
* Tracks the current state of a Mesos Task that runs a Spark driver.
* @param driverDescription Submitted driver description from
@@ -525,15 +527,17 @@ private[spark] class MesosClusterScheduler(
offer.remainingResources = finalResources.asJava
val appName = desc.conf.get("spark.app.name")
- val taskInfo = TaskInfo.newBuilder()
+
+ TaskInfo.newBuilder()
.setTaskId(taskId)
.setName(s"Driver for ${appName}")
.setSlaveId(offer.offer.getSlaveId)
.setCommand(buildDriverCommand(desc))
.addAllResources(cpuResourcesToUse.asJava)
.addAllResources(memResourcesToUse.asJava)
- taskInfo.setContainer(MesosSchedulerBackendUtil.containerInfo(desc.conf))
- taskInfo.build
+ .setLabels(MesosProtoUtils.mesosLabels(desc.conf.get(config.DRIVER_LABELS).getOrElse("")))
+ .setContainer(MesosSchedulerBackendUtil.containerInfo(desc.conf))
+ .build
}
/**
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index ac7aec7b0a..871685c6cc 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -419,16 +419,9 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
.setSlaveId(offer.getSlaveId)
.setCommand(createCommand(offer, taskCPUs + extraCoresPerExecutor, taskId))
.setName(s"${sc.appName} $taskId")
-
- taskBuilder.addAllResources(resourcesToUse.asJava)
- taskBuilder.setContainer(MesosSchedulerBackendUtil.containerInfo(sc.conf))
-
- val labelsBuilder = taskBuilder.getLabelsBuilder
- val labels = buildMesosLabels().asJava
-
- labelsBuilder.addAllLabels(labels)
-
- taskBuilder.setLabels(labelsBuilder)
+ .setLabels(MesosProtoUtils.mesosLabels(taskLabels))
+ .addAllResources(resourcesToUse.asJava)
+ .setContainer(MesosSchedulerBackendUtil.containerInfo(sc.conf))
tasks(offer.getId) ::= taskBuilder.build()
remainingResources(offerId) = resourcesLeft.asJava
@@ -444,21 +437,6 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
tasks.toMap
}
- private def buildMesosLabels(): List[Label] = {
- taskLabels.split(",").flatMap(label =>
- label.split(":") match {
- case Array(key, value) =>
- Some(Label.newBuilder()
- .setKey(key)
- .setValue(value)
- .build())
- case _ =>
- logWarning(s"Unable to parse $label into a key:value label for the task.")
- None
- }
- ).toList
- }
-
/** Extracts task needed resources from a list of available resources. */
private def partitionTaskResources(
resources: JList[Resource],
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosProtoUtils.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosProtoUtils.scala
new file mode 100644
index 0000000000..fea01c7068
--- /dev/null
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosProtoUtils.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import scala.collection.JavaConverters._
+
+import org.apache.mesos.Protos
+
+import org.apache.spark.SparkException
+import org.apache.spark.internal.Logging
+
+object MesosProtoUtils extends Logging {
+
+ /** Parses a label string of the format specified in spark.mesos.task.labels. */
+ def mesosLabels(labelsStr: String): Protos.Labels.Builder = {
+ val labels: Seq[Protos.Label] = if (labelsStr == "") {
+ Seq()
+ } else {
+ labelsStr.split("""(?
+ val parts = labelStr.split("""(? part.replaceAll("""\\,""", ","))
+ .map(part => part.replaceAll("""\\:""", ":"))
+
+ Protos.Label.newBuilder()
+ .setKey(cleanedParts(0))
+ .setValue(cleanedParts(1))
+ .build()
+ }
+ }
+
+ Protos.Labels.newBuilder().addAllLabels(labels.asJava)
+ }
+}
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
index 32967b04cd..0bb4790634 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
@@ -248,6 +248,33 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
assert(networkInfos.get(0).getName == "test-network-name")
}
+ test("supports spark.mesos.driver.labels") {
+ setScheduler()
+
+ val mem = 1000
+ val cpu = 1
+
+ val response = scheduler.submitDriver(
+ new MesosDriverDescription("d1", "jar", mem, cpu, true,
+ command,
+ Map("spark.mesos.executor.home" -> "test",
+ "spark.app.name" -> "test",
+ "spark.mesos.driver.labels" -> "key:value"),
+ "s1",
+ new Date()))
+
+ assert(response.success)
+
+ val offer = Utils.createOffer("o1", "s1", mem, cpu)
+ scheduler.resourceOffers(driver, List(offer).asJava)
+
+ val launchedTasks = Utils.verifyTaskLaunched(driver, "o1")
+ val labels = launchedTasks.head.getLabels
+ assert(labels.getLabelsCount == 1)
+ assert(labels.getLabels(0).getKey == "key")
+ assert(labels.getLabels(0).getValue == "value")
+ }
+
test("can kill supervised drivers") {
val conf = new SparkConf()
conf.setMaster("mesos://localhost:5050")
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index 0418bfbaa5..7cca5fedb3 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -532,29 +532,6 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
assert(launchedTasks.head.getLabels.equals(taskLabels))
}
- test("mesos ignored invalid labels and sets configurable labels on tasks") {
- val taskLabelsString = "mesos:test,label:test,incorrect:label:here"
- setBackend(Map(
- "spark.mesos.task.labels" -> taskLabelsString
- ))
-
- // Build up the labels
- val taskLabels = Protos.Labels.newBuilder()
- .addLabels(Protos.Label.newBuilder()
- .setKey("mesos").setValue("test").build())
- .addLabels(Protos.Label.newBuilder()
- .setKey("label").setValue("test").build())
- .build()
-
- val offers = List(Resources(backend.executorMemory(sc), 1))
- offerResources(offers)
- val launchedTasks = verifyTaskLaunched(driver, "o1")
-
- val labels = launchedTasks.head.getLabels
-
- assert(launchedTasks.head.getLabels.equals(taskLabels))
- }
-
test("mesos supports spark.mesos.network.name") {
setBackend(Map(
"spark.mesos.network.name" -> "test-network-name"
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosProtoUtilsSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosProtoUtilsSuite.scala
new file mode 100644
index 0000000000..36a4c1ab1a
--- /dev/null
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosProtoUtilsSuite.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import org.apache.spark.SparkFunSuite
+
+class MesosProtoUtilsSuite extends SparkFunSuite {
+ test("mesosLabels") {
+ val labels = MesosProtoUtils.mesosLabels("key:value")
+ assert(labels.getLabelsCount == 1)
+ val label = labels.getLabels(0)
+ assert(label.getKey == "key")
+ assert(label.getValue == "value")
+
+ val labels2 = MesosProtoUtils.mesosLabels("key:value\\:value")
+ assert(labels2.getLabelsCount == 1)
+ val label2 = labels2.getLabels(0)
+ assert(label2.getKey == "key")
+ assert(label2.getValue == "value:value")
+
+ val labels3 = MesosProtoUtils.mesosLabels("key:value,key2:value2")
+ assert(labels3.getLabelsCount == 2)
+ assert(labels3.getLabels(0).getKey == "key")
+ assert(labels3.getLabels(0).getValue == "value")
+ assert(labels3.getLabels(1).getKey == "key2")
+ assert(labels3.getLabels(1).getValue == "value2")
+
+ val labels4 = MesosProtoUtils.mesosLabels("key:value\\,value")
+ assert(labels4.getLabelsCount == 1)
+ assert(labels4.getLabels(0).getKey == "key")
+ assert(labels4.getLabels(0).getValue == "value,value")
+ }
+}