[SPARK-21694][MESOS] Support Mesos CNI network labels

JIRA ticket: https://issues.apache.org/jira/browse/SPARK-21694

## What changes were proposed in this pull request?

Spark already supports launching containers attached to a given CNI network by specifying it via the config `spark.mesos.network.name`.

This PR adds support for passing network labels to CNI plugins via a new config option, `spark.mesos.network.labels`. These network labels are key-value pairs that are set in the `NetworkInfo` of both the driver and executor tasks. More details are in the related Mesos documentation: http://mesos.apache.org/documentation/latest/cni/#mesos-meta-data-to-cni-plugins
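For example, a job that should run on a CNI network and tag its containers for the plugin could be configured as follows (a minimal sketch; `my-cni-network` is a placeholder network name, not something from this PR):

```scala
import org.apache.spark.SparkConf

// Attach driver and executor containers to a named CNI network and
// pass two labels through to the CNI plugin.
// "my-cni-network" is a placeholder; use your cluster's network name.
val conf = new SparkConf()
  .set("spark.mesos.network.name", "my-cni-network")
  .set("spark.mesos.network.labels", "key1:val1,key2:val2")
```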

## How was this patch tested?

Unit tests for both driver and executor tasks.
Manual integration test: submitted a job with the `spark.mesos.network.labels` option, queried the Mesos `state.json` endpoint, and checked that the labels were set on the driver and executor tasks.

ArtRand skonto

Author: Susan X. Huynh <xhuynh@mesosphere.com>

Closes #18910 from susanxhuynh/sh-mesos-cni-labels.
Susan X. Huynh authored on 2017-08-24 10:05:38 +01:00; committed by Sean Owen
parent 43cbfad999, commit ce0d3bb377
6 changed files with 53 additions and 9 deletions

**docs/running-on-mesos.md**

```diff
@@ -537,6 +537,20 @@ See the [configuration page](configuration.html) for information on Spark config
     for more details.
   </td>
 </tr>
+<tr>
+  <td><code>spark.mesos.network.labels</code></td>
+  <td><code>(none)</code></td>
+  <td>
+    Pass network labels to CNI plugins. This is a comma-separated list
+    of key-value pairs, where each key-value pair has the format key:value.
+    Example:
+    <pre>key1:val1,key2:val2</pre>
+    See
+    <a href="http://mesos.apache.org/documentation/latest/cni/#mesos-meta-data-to-cni-plugins">the Mesos CNI docs</a>
+    for more details.
+  </td>
+</tr>
 <tr>
   <td><code>spark.mesos.fetcherCache.enable</code></td>
   <td><code>false</code></td>
```

**resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala**

```diff
@@ -56,7 +56,7 @@ package object config {
     .stringConf
     .createOptional

-  private [spark] val DRIVER_LABELS =
+  private[spark] val DRIVER_LABELS =
     ConfigBuilder("spark.mesos.driver.labels")
       .doc("Mesos labels to add to the driver. Labels are free-form key-value pairs. Key-value " +
         "pairs should be separated by a colon, and commas used to list more than one." +
@@ -64,10 +64,25 @@ package object config {
     .stringConf
     .createOptional

-  private [spark] val DRIVER_FAILOVER_TIMEOUT =
+  private[spark] val DRIVER_FAILOVER_TIMEOUT =
     ConfigBuilder("spark.mesos.driver.failoverTimeout")
       .doc("Amount of time in seconds that the master will wait to hear from the driver, " +
         "during a temporary disconnection, before tearing down all the executors.")
       .doubleConf
       .createWithDefault(0.0)
+
+  private[spark] val NETWORK_NAME =
+    ConfigBuilder("spark.mesos.network.name")
+      .doc("Attach containers to the given named network. If this job is launched " +
+        "in cluster mode, also launch the driver in the given named network.")
+      .stringConf
+      .createOptional
+
+  private[spark] val NETWORK_LABELS =
+    ConfigBuilder("spark.mesos.network.labels")
+      .doc("Network labels to pass to CNI plugins. This is a comma-separated list " +
+        "of key-value pairs, where each key-value pair has the format key:value. " +
+        "Example: key1:val1,key2:val2")
+      .stringConf
+      .createOptional
 }
```
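The label string is converted into a Mesos `Labels` protobuf by the existing `MesosProtoUtils.mesosLabels` helper, which the scheduler backend calls below. A minimal sketch of that conversion (not the actual Spark helper, just an illustration of the `key:value,key:value` format):

```scala
import org.apache.mesos.Protos

// Sketch: parse "key1:val1,key2:val2" into a Protos.Labels message.
def parseNetworkLabels(labelsStr: String): Protos.Labels = {
  val builder = Protos.Labels.newBuilder()
  labelsStr.split(',').filter(_.nonEmpty).foreach { kv =>
    kv.split(":") match {
      case Array(key, value) =>
        builder.addLabels(Protos.Label.newBuilder().setKey(key).setValue(value))
      case _ =>
        throw new IllegalArgumentException(s"Malformed label '$kv', expected key:value")
    }
  }
  builder.build()
}
```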

**resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala**

```diff
@@ -670,7 +670,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   }

   private def executorHostname(offer: Offer): String = {
-    if (sc.conf.getOption("spark.mesos.network.name").isDefined) {
+    if (sc.conf.get(NETWORK_NAME).isDefined) {
       // The agent's IP is not visible in a CNI container, so we bind to 0.0.0.0
       "0.0.0.0"
     } else {
```

**resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala**

```diff
@@ -21,6 +21,7 @@ import org.apache.mesos.Protos.{ContainerInfo, Image, NetworkInfo, Parameter, Vo
 import org.apache.mesos.Protos.ContainerInfo.{DockerInfo, MesosInfo}

 import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.mesos.config.{NETWORK_LABELS, NETWORK_NAME}
 import org.apache.spark.internal.Logging

 /**
@@ -161,8 +162,12 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
       volumes.foreach(_.foreach(containerInfo.addVolumes(_)))
     }

-    conf.getOption("spark.mesos.network.name").map { name =>
-      val info = NetworkInfo.newBuilder().setName(name).build()
+    conf.get(NETWORK_NAME).map { name =>
+      val networkLabels = MesosProtoUtils.mesosLabels(conf.get(NETWORK_LABELS).getOrElse(""))
+      val info = NetworkInfo.newBuilder()
+        .setName(name)
+        .setLabels(networkLabels)
+        .build()
       containerInfo.addNetworkInfos(info)
     }
```
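With both settings from the earlier example, each launched task would carry a `NetworkInfo` equivalent to the following (illustrative values, matching the placeholder network name above):

```scala
import org.apache.mesos.Protos.{Label, Labels, NetworkInfo}

// NetworkInfo equivalent to what the backend builds for the example
// configuration; Mesos forwards the labels to the CNI plugin.
val expected = NetworkInfo.newBuilder()
  .setName("my-cni-network")
  .setLabels(Labels.newBuilder()
    .addLabels(Label.newBuilder().setKey("key1").setValue("val1"))
    .addLabels(Label.newBuilder().setKey("key2").setValue("val2")))
  .build()
```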

**resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala**

```diff
@@ -222,7 +222,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
     assert(env.getOrElse("TEST_ENV", null) == "TEST_VAL")
   }

-  test("supports spark.mesos.network.name") {
+  test("supports spark.mesos.network.name and spark.mesos.network.labels") {
     setScheduler()

     val mem = 1000
@@ -233,7 +233,8 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
       command,
       Map("spark.mesos.executor.home" -> "test",
         "spark.app.name" -> "test",
-        "spark.mesos.network.name" -> "test-network-name"),
+        "spark.mesos.network.name" -> "test-network-name",
+        "spark.mesos.network.labels" -> "key1:val1,key2:val2"),
       "s1",
       new Date()))
@@ -246,6 +247,10 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
     val networkInfos = launchedTasks.head.getContainer.getNetworkInfosList
     assert(networkInfos.size == 1)
     assert(networkInfos.get(0).getName == "test-network-name")
+    assert(networkInfos.get(0).getLabels.getLabels(0).getKey == "key1")
+    assert(networkInfos.get(0).getLabels.getLabels(0).getValue == "val1")
+    assert(networkInfos.get(0).getLabels.getLabels(1).getKey == "key2")
+    assert(networkInfos.get(0).getLabels.getLabels(1).getValue == "val2")
   }

   test("supports spark.mesos.driver.labels") {
```

**resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala**

```diff
@@ -568,9 +568,10 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     assert(launchedTasks.head.getLabels.equals(taskLabels))
   }

-  test("mesos supports spark.mesos.network.name") {
+  test("mesos supports spark.mesos.network.name and spark.mesos.network.labels") {
     setBackend(Map(
-      "spark.mesos.network.name" -> "test-network-name"
+      "spark.mesos.network.name" -> "test-network-name",
+      "spark.mesos.network.labels" -> "key1:val1,key2:val2"
     ))

     val (mem, cpu) = (backend.executorMemory(sc), 4)
@@ -582,6 +583,10 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     val networkInfos = launchedTasks.head.getContainer.getNetworkInfosList
     assert(networkInfos.size == 1)
     assert(networkInfos.get(0).getName == "test-network-name")
+    assert(networkInfos.get(0).getLabels.getLabels(0).getKey == "key1")
+    assert(networkInfos.get(0).getLabels.getLabels(0).getValue == "val1")
+    assert(networkInfos.get(0).getLabels.getLabels(1).getKey == "key2")
+    assert(networkInfos.get(0).getLabels.getLabels(1).getValue == "val2")
   }

   test("supports spark.scheduler.minRegisteredResourcesRatio") {
```