[SPARK-20085][MESOS] Configurable mesos labels for executors

## What changes were proposed in this pull request?

Add spark.mesos.task.labels configuration option to add mesos key:value labels to the executor.

 "k1:v1,k2:v2" as the format, colons separating key-value and commas to list out more than one.

Discussion of labels with mgummelt at #17404

## How was this patch tested?

Added unit tests to verify labels were added correctly, with incorrect labels being ignored and added a test to test the name of the executor.

Tested with: `./build/sbt -Pmesos mesos/test`

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Kalvin Chau <kalvin.chau@viasat.com>

Closes #17413 from kalvinnchau/mesos-labels.
This commit is contained in:
Kalvin Chau 2017-04-06 09:14:31 +01:00 committed by Sean Owen
parent e156b5dd39
commit c8fc1f3bad
3 changed files with 79 additions and 0 deletions

View file

@ -367,6 +367,15 @@ See the [configuration page](configuration.html) for information on Spark config
<pre>[host_path:]container_path[:ro|:rw]</pre>
</td>
</tr>
<tr>
<td><code>spark.mesos.task.labels</code></td>
<td>(none)</td>
<td>
Set the Mesos labels to add to each task. Labels are free-form key-value pairs.
Key-value pairs should be separated by a colon, and commas used to list more than one.
Ex. key:value,key2:value2.
</td>
</tr>
<tr>
<td><code>spark.mesos.executor.home</code></td>
<td>driver side <code>SPARK_HOME</code></td>

View file

@ -67,6 +67,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
private val maxGpus = conf.getInt("spark.mesos.gpus.max", 0)
private val taskLabels = conf.get("spark.mesos.task.labels", "")
private[this] val shutdownTimeoutMS =
conf.getTimeAsMs("spark.mesos.coarse.shutdownTimeout", "10s")
.ensuring(_ >= 0, "spark.mesos.coarse.shutdownTimeout must be >= 0")
@ -408,6 +410,13 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
taskBuilder.addAllResources(resourcesToUse.asJava)
taskBuilder.setContainer(MesosSchedulerBackendUtil.containerInfo(sc.conf))
val labelsBuilder = taskBuilder.getLabelsBuilder
val labels = buildMesosLabels().asJava
labelsBuilder.addAllLabels(labels)
taskBuilder.setLabels(labelsBuilder)
tasks(offer.getId) ::= taskBuilder.build()
remainingResources(offerId) = resourcesLeft.asJava
totalCoresAcquired += taskCPUs
@ -422,6 +431,21 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
tasks.toMap
}
private def buildMesosLabels(): List[Label] = {
taskLabels.split(",").flatMap(label =>
label.split(":") match {
case Array(key, value) =>
Some(Label.newBuilder()
.setKey(key)
.setValue(value)
.build())
case _ =>
logWarning(s"Unable to parse $label into a key:value label for the task.")
None
}
).toList
}
/** Extracts task needed resources from a list of available resources. */
private def partitionTaskResources(
resources: JList[Resource],

View file

@ -475,6 +475,52 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
assert(launchedTasks.head.getName == "test-mesos-dynamic-alloc 0")
}
test("mesos sets configurable labels on tasks") {
val taskLabelsString = "mesos:test,label:test"
setBackend(Map(
"spark.mesos.task.labels" -> taskLabelsString
))
// Build up the labels
val taskLabels = Protos.Labels.newBuilder()
.addLabels(Protos.Label.newBuilder()
.setKey("mesos").setValue("test").build())
.addLabels(Protos.Label.newBuilder()
.setKey("label").setValue("test").build())
.build()
val offers = List(Resources(backend.executorMemory(sc), 1))
offerResources(offers)
val launchedTasks = verifyTaskLaunched(driver, "o1")
val labels = launchedTasks.head.getLabels
assert(launchedTasks.head.getLabels.equals(taskLabels))
}
test("mesos ignored invalid labels and sets configurable labels on tasks") {
val taskLabelsString = "mesos:test,label:test,incorrect:label:here"
setBackend(Map(
"spark.mesos.task.labels" -> taskLabelsString
))
// Build up the labels
val taskLabels = Protos.Labels.newBuilder()
.addLabels(Protos.Label.newBuilder()
.setKey("mesos").setValue("test").build())
.addLabels(Protos.Label.newBuilder()
.setKey("label").setValue("test").build())
.build()
val offers = List(Resources(backend.executorMemory(sc), 1))
offerResources(offers)
val launchedTasks = verifyTaskLaunched(driver, "o1")
val labels = launchedTasks.head.getLabels
assert(launchedTasks.head.getLabels.equals(taskLabels))
}
test("mesos supports spark.mesos.network.name") {
setBackend(Map(
"spark.mesos.network.name" -> "test-network-name"