diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md index 8d5ad12cb8..ef01cfe4b9 100644 --- a/docs/running-on-mesos.md +++ b/docs/running-on-mesos.md @@ -367,6 +367,15 @@ See the [configuration page](configuration.html) for information on Spark config
[host_path:]container_path[:ro|:rw]
+ + spark.mesos.task.labels + (none) + + Set the Mesos labels to add to each task. Labels are free-form key-value pairs. + Key-value pairs should be separated by a colon, and commas used to list more than one. + Ex. key:value,key2:value2. + + spark.mesos.executor.home driver side SPARK_HOME diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala index 5bdc2a2b84..2a36ec4fa8 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala @@ -67,6 +67,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( private val maxGpus = conf.getInt("spark.mesos.gpus.max", 0) + private val taskLabels = conf.get("spark.mesos.task.labels", "") + private[this] val shutdownTimeoutMS = conf.getTimeAsMs("spark.mesos.coarse.shutdownTimeout", "10s") .ensuring(_ >= 0, "spark.mesos.coarse.shutdownTimeout must be >= 0") @@ -408,6 +410,13 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( taskBuilder.addAllResources(resourcesToUse.asJava) taskBuilder.setContainer(MesosSchedulerBackendUtil.containerInfo(sc.conf)) + val labelsBuilder = taskBuilder.getLabelsBuilder + val labels = buildMesosLabels().asJava + + labelsBuilder.addAllLabels(labels) + + taskBuilder.setLabels(labelsBuilder) + tasks(offer.getId) ::= taskBuilder.build() remainingResources(offerId) = resourcesLeft.asJava totalCoresAcquired += taskCPUs @@ -422,6 +431,21 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( tasks.toMap } + private def buildMesosLabels(): List[Label] = { + taskLabels.split(",").flatMap(label => + label.split(":") match { + case Array(key, value) => + Some(Label.newBuilder() + .setKey(key) + .setValue(value) + .build()) + case _ => + logWarning(s"Unable to parse $label into a key:value label for the task.") + None + } + ).toList + } + /** Extracts task needed resources from a list of available resources. */ private def partitionTaskResources( resources: JList[Resource], diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala index eb83926ae4..c040f05d93 100644 --- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala +++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala @@ -475,6 +475,52 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite assert(launchedTasks.head.getName == "test-mesos-dynamic-alloc 0") } + test("mesos sets configurable labels on tasks") { + val taskLabelsString = "mesos:test,label:test" + setBackend(Map( + "spark.mesos.task.labels" -> taskLabelsString + )) + + // Build up the labels + val taskLabels = Protos.Labels.newBuilder() + .addLabels(Protos.Label.newBuilder() + .setKey("mesos").setValue("test").build()) + .addLabels(Protos.Label.newBuilder() + .setKey("label").setValue("test").build()) + .build() + + val offers = List(Resources(backend.executorMemory(sc), 1)) + offerResources(offers) + val launchedTasks = verifyTaskLaunched(driver, "o1") + + val labels = launchedTasks.head.getLabels + + assert(launchedTasks.head.getLabels.equals(taskLabels)) + } + + test("mesos ignored invalid labels and sets configurable labels on tasks") { + val taskLabelsString = "mesos:test,label:test,incorrect:label:here" + setBackend(Map( + "spark.mesos.task.labels" -> taskLabelsString + )) + + // Build up the labels + val taskLabels = Protos.Labels.newBuilder() + .addLabels(Protos.Label.newBuilder() + .setKey("mesos").setValue("test").build()) + .addLabels(Protos.Label.newBuilder() + .setKey("label").setValue("test").build()) + .build() + + val offers = List(Resources(backend.executorMemory(sc), 1)) + offerResources(offers) + val launchedTasks = verifyTaskLaunched(driver, "o1") + + val labels = launchedTasks.head.getLabels + + assert(launchedTasks.head.getLabels.equals(taskLabels)) + } + test("mesos supports spark.mesos.network.name") { setBackend(Map( "spark.mesos.network.name" -> "test-network-name"