[SPARK-21000][MESOS] Add Mesos labels support to the Spark Dispatcher

## What changes were proposed in this pull request?

Add support for attaching Mesos labels to drivers launched through the Spark Dispatcher, via a new `spark.mesos.driver.labels` configuration. Label parsing is factored out into a shared `MesosProtoUtils.mesosLabels` helper, which also adds backslash escaping for colons and commas in label values.

## How was this patch tested?

Unit tests, including the new `MesosProtoUtilsSuite` and a `MesosClusterSchedulerSuite` case covering `spark.mesos.driver.labels`.

Author: Michael Gummelt <mgummelt@mesosphere.io>

Closes #18220 from mgummelt/SPARK-21000-dispatcher-labels.
Michael Gummelt authored on 2017-06-11 09:49:39 +01:00; committed by Sean Owen
parent dc4c351837 · commit 8da3f7041a
8 changed files with 157 additions and 53 deletions

docs/running-on-mesos.md

@@ -382,8 +382,9 @@ See the [configuration page](configuration.html) for information on Spark config
   <td>(none)</td>
   <td>
     Set the Mesos labels to add to each task. Labels are free-form key-value pairs.
-    Key-value pairs should be separated by a colon, and commas used to list more than one.
-    Ex. key:value,key2:value2.
+    Key-value pairs should be separated by a colon, and commas used to
+    list more than one. If your label includes a colon or comma, you
+    can escape it with a backslash. Ex. key:value,key2:a\:b.
   </td>
 </tr>
 <tr>
@@ -468,6 +469,15 @@ See the [configuration page](configuration.html) for information on Spark config
     If unset it will point to Spark's internal web UI.
   </td>
 </tr>
+<tr>
+  <td><code>spark.mesos.driver.labels</code></td>
+  <td><code>(none)</code></td>
+  <td>
+    Mesos labels to add to the driver. See <code>spark.mesos.task.labels</code>
+    for formatting information.
+  </td>
+</tr>
 <tr>
   <td><code>spark.mesos.driverEnv.[EnvironmentVariableName]</code></td>
   <td><code>(none)</code></td>
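To make the escaping rule concrete, here is a quick sketch (with hypothetical label values) of how such a string parses via the `MesosProtoUtils.mesosLabels` helper introduced later in this commit:

```scala
import org.apache.spark.scheduler.cluster.mesos.MesosProtoUtils

object LabelEscapingExample {
  def main(args: Array[String]): Unit = {
    // The backslash escapes the colon inside the second value.
    val labels = MesosProtoUtils.mesosLabels("""env:prod,path:a\:b""").build()
    assert(labels.getLabelsCount == 2)
    assert(labels.getLabels(1).getKey == "path")
    assert(labels.getLabels(1).getValue == "a:b") // escape is stripped after parsing
  }
}
```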

resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala

@@ -56,4 +56,11 @@ package object config {
     .stringConf
     .createOptional
 
+  private[spark] val DRIVER_LABELS =
+    ConfigBuilder("spark.mesos.driver.labels")
+      .doc("Mesos labels to add to the driver. Labels are free-form key-value pairs. " +
+        "Key-value pairs should be separated by a colon, and commas used to list " +
+        "more than one. Ex. key:value,key2:value2")
+      .stringConf
+      .createOptional
 }
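As an illustration, a minimal sketch of how a user would set the new option (the app settings here are hypothetical):

```scala
import org.apache.spark.SparkConf

// In cluster mode the conf entry travels with the driver description to the
// dispatcher, where the scheduler reads it back through DRIVER_LABELS.
val conf = new SparkConf()
  .set("spark.mesos.driver.labels", "team:data,owner:alice")

// Equivalent on the command line:
//   spark-submit --deploy-mode cluster \
//     --conf spark.mesos.driver.labels=team:data,owner:alice ...
```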

resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala

@@ -30,11 +30,13 @@ import org.apache.mesos.Protos.Environment.Variable
 import org.apache.mesos.Protos.TaskStatus.Reason
 import org.apache.spark.{SecurityManager, SparkConf, SparkException, TaskState}
+import org.apache.spark.deploy.mesos.config
 import org.apache.spark.deploy.mesos.MesosDriverDescription
 import org.apache.spark.deploy.rest.{CreateSubmissionResponse, KillSubmissionResponse, SubmissionStatusResponse}
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.util.Utils
 
 /**
  * Tracks the current state of a Mesos Task that runs a Spark driver.
  * @param driverDescription Submitted driver description from
@@ -525,15 +527,17 @@ private[spark] class MesosClusterScheduler(
     offer.remainingResources = finalResources.asJava
 
     val appName = desc.conf.get("spark.app.name")
-    val taskInfo = TaskInfo.newBuilder()
+
+    TaskInfo.newBuilder()
       .setTaskId(taskId)
       .setName(s"Driver for ${appName}")
       .setSlaveId(offer.offer.getSlaveId)
       .setCommand(buildDriverCommand(desc))
       .addAllResources(cpuResourcesToUse.asJava)
       .addAllResources(memResourcesToUse.asJava)
-    taskInfo.setContainer(MesosSchedulerBackendUtil.containerInfo(desc.conf))
-    taskInfo.build
+      .setLabels(MesosProtoUtils.mesosLabels(desc.conf.get(config.DRIVER_LABELS).getOrElse("")))
+      .setContainer(MesosSchedulerBackendUtil.containerInfo(desc.conf))
+      .build
   }
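One detail worth noting in the chain above: when `spark.mesos.driver.labels` is unset, `getOrElse("")` hands an empty string to the parser, which produces an empty label set rather than an error. A worksheet-style sketch:

```scala
import org.apache.spark.scheduler.cluster.mesos.MesosProtoUtils

// Empty input is the "config unset" path: the driver task gets no labels.
val empty = MesosProtoUtils.mesosLabels("").build()
assert(empty.getLabelsCount == 0)
```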

resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala

@@ -419,16 +419,9 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
           .setSlaveId(offer.getSlaveId)
           .setCommand(createCommand(offer, taskCPUs + extraCoresPerExecutor, taskId))
           .setName(s"${sc.appName} $taskId")
-
-        taskBuilder.addAllResources(resourcesToUse.asJava)
-        taskBuilder.setContainer(MesosSchedulerBackendUtil.containerInfo(sc.conf))
-
-        val labelsBuilder = taskBuilder.getLabelsBuilder
-        val labels = buildMesosLabels().asJava
-
-        labelsBuilder.addAllLabels(labels)
-
-        taskBuilder.setLabels(labelsBuilder)
+          .setLabels(MesosProtoUtils.mesosLabels(taskLabels))
+          .addAllResources(resourcesToUse.asJava)
+          .setContainer(MesosSchedulerBackendUtil.containerInfo(sc.conf))
 
         tasks(offer.getId) ::= taskBuilder.build()
         remainingResources(offerId) = resourcesLeft.asJava
@@ -444,21 +437,6 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
     tasks.toMap
   }
 
-  private def buildMesosLabels(): List[Label] = {
-    taskLabels.split(",").flatMap(label =>
-      label.split(":") match {
-        case Array(key, value) =>
-          Some(Label.newBuilder()
-            .setKey(key)
-            .setValue(value)
-            .build())
-        case _ =>
-          logWarning(s"Unable to parse $label into a key:value label for the task.")
-          None
-      }
-    ).toList
-  }
-
   /** Extracts task needed resources from a list of available resources. */
   private def partitionTaskResources(
       resources: JList[Resource],

resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosProtoUtils.scala (new file)

@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler.cluster.mesos

import scala.collection.JavaConverters._

import org.apache.mesos.Protos

import org.apache.spark.SparkException
import org.apache.spark.internal.Logging

object MesosProtoUtils extends Logging {

  /** Parses a label string of the format specified in spark.mesos.task.labels. */
  def mesosLabels(labelsStr: String): Protos.Labels.Builder = {
    val labels: Seq[Protos.Label] = if (labelsStr == "") {
      Seq()
    } else {
      labelsStr.split("""(?<!\\),""").toSeq.map { labelStr =>
        val parts = labelStr.split("""(?<!\\):""")
        if (parts.length != 2) {
          throw new SparkException(s"Malformed label: ${labelStr}")
        }

        val cleanedParts = parts
          .map(part => part.replaceAll("""\\,""", ","))
          .map(part => part.replaceAll("""\\:""", ":"))

        Protos.Label.newBuilder()
          .setKey(cleanedParts(0))
          .setValue(cleanedParts(1))
          .build()
      }
    }

    Protos.Labels.newBuilder().addAllLabels(labels.asJava)
  }
}
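The two regexes do the heavy lifting here: `(?<!\\),` and `(?<!\\):` use a negative lookbehind to match a comma or colon only when it is not preceded by a backslash. A worksheet-style sketch of both split stages (the example strings are hypothetical):

```scala
// Stage 1: split the label list on unescaped commas.
"k1:v1,k2:a\\:b".split("""(?<!\\),""")
// => Array("k1:v1", "k2:a\:b")

// Stage 2: split a single label on unescaped colons; the escaped colon
// survives the split and is unescaped afterwards by the replaceAll calls.
"k2:a\\:b".split("""(?<!\\):""")
// => Array("k2", "a\:b")
```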

resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala

@@ -248,6 +248,33 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
     assert(networkInfos.get(0).getName == "test-network-name")
   }
 
+  test("supports spark.mesos.driver.labels") {
+    setScheduler()
+
+    val mem = 1000
+    val cpu = 1
+
+    val response = scheduler.submitDriver(
+      new MesosDriverDescription("d1", "jar", mem, cpu, true,
+        command,
+        Map("spark.mesos.executor.home" -> "test",
+          "spark.app.name" -> "test",
+          "spark.mesos.driver.labels" -> "key:value"),
+        "s1",
+        new Date()))
+
+    assert(response.success)
+
+    val offer = Utils.createOffer("o1", "s1", mem, cpu)
+    scheduler.resourceOffers(driver, List(offer).asJava)
+
+    val launchedTasks = Utils.verifyTaskLaunched(driver, "o1")
+    val labels = launchedTasks.head.getLabels
+
+    assert(labels.getLabelsCount == 1)
+    assert(labels.getLabels(0).getKey == "key")
+    assert(labels.getLabels(0).getValue == "value")
+  }
+
   test("can kill supervised drivers") {
     val conf = new SparkConf()
     conf.setMaster("mesos://localhost:5050")

resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala

@@ -532,29 +532,6 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     assert(launchedTasks.head.getLabels.equals(taskLabels))
   }
 
-  test("mesos ignored invalid labels and sets configurable labels on tasks") {
-    val taskLabelsString = "mesos:test,label:test,incorrect:label:here"
-    setBackend(Map(
-      "spark.mesos.task.labels" -> taskLabelsString
-    ))
-
-    // Build up the labels
-    val taskLabels = Protos.Labels.newBuilder()
-      .addLabels(Protos.Label.newBuilder()
-        .setKey("mesos").setValue("test").build())
-      .addLabels(Protos.Label.newBuilder()
-        .setKey("label").setValue("test").build())
-      .build()
-
-    val offers = List(Resources(backend.executorMemory(sc), 1))
-    offerResources(offers)
-
-    val launchedTasks = verifyTaskLaunched(driver, "o1")
-    val labels = launchedTasks.head.getLabels
-
-    assert(launchedTasks.head.getLabels.equals(taskLabels))
-  }
-
   test("mesos supports spark.mesos.network.name") {
     setBackend(Map(
       "spark.mesos.network.name" -> "test-network-name"
resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosProtoUtilsSuite.scala (new file)

@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler.cluster.mesos

import org.apache.spark.SparkFunSuite

class MesosProtoUtilsSuite extends SparkFunSuite {
  test("mesosLabels") {
    val labels = MesosProtoUtils.mesosLabels("key:value")
    assert(labels.getLabelsCount == 1)

    val label = labels.getLabels(0)
    assert(label.getKey == "key")
    assert(label.getValue == "value")

    val labels2 = MesosProtoUtils.mesosLabels("key:value\\:value")
    assert(labels2.getLabelsCount == 1)

    val label2 = labels2.getLabels(0)
    assert(label2.getKey == "key")
    assert(label2.getValue == "value:value")

    val labels3 = MesosProtoUtils.mesosLabels("key:value,key2:value2")
    assert(labels3.getLabelsCount == 2)
    assert(labels3.getLabels(0).getKey == "key")
    assert(labels3.getLabels(0).getValue == "value")
    assert(labels3.getLabels(1).getKey == "key2")
    assert(labels3.getLabels(1).getValue == "value2")

    val labels4 = MesosProtoUtils.mesosLabels("key:value\\,value")
    assert(labels4.getLabelsCount == 1)
    assert(labels4.getLabels(0).getKey == "key")
    assert(labels4.getLabels(0).getValue == "value,value")
  }
}
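Taken together with the removed test above, these cases capture a behavioral shift: the old `buildMesosLabels` helper logged a warning and dropped an unparseable label, whereas `MesosProtoUtils.mesosLabels` now fails fast. A worksheet-style sketch of the error path:

```scala
import org.apache.spark.SparkException
import org.apache.spark.scheduler.cluster.mesos.MesosProtoUtils

// Two unescaped colons in one label no longer parse leniently.
try {
  MesosProtoUtils.mesosLabels("incorrect:label:here")
} catch {
  case e: SparkException =>
    println(e.getMessage) // "Malformed label: incorrect:label:here"
}
```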