[SPARK-20812][MESOS] Add secrets support to the dispatcher

Mesos provides secrets primitives for environment- and file-based secrets; this PR adds that functionality to the Spark dispatcher, along with the appropriate configuration flags.
Unit tested and manually tested against a DC/OS cluster with Mesos 1.4.
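
For illustration, a minimal sketch of how a driver submitted through the dispatcher could carry the new settings via SparkConf; the secret name /mysecret and the environment key DB_PASSWORD are placeholders, not values taken from this patch:

import org.apache.spark.SparkConf

// Hypothetical submission-side configuration (placeholder values).
val conf = new SparkConf()
  .setAppName("secrets-demo")
  // reference secret resolved by the cluster's secret store
  .set("spark.mesos.driver.secret.names", "/mysecret")
  // the secret's contents are exposed to the driver as this environment variable
  .set("spark.mesos.driver.secret.envkeys", "DB_PASSWORD")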

Author: ArtRand <arand@soe.ucsc.edu>

Closes #18837 from ArtRand/spark-20812-dispatcher-secrets-and-labels.
ArtRand authored on 2017-08-31 10:58:13 -07:00; committed by Marcelo Vanzin
parent 9696580c33
commit fc45c2c88a
10 changed files with 386 additions and 20 deletions


@@ -138,7 +138,7 @@ lz4-java-1.4.0.jar
machinist_2.11-0.6.1.jar
macro-compat_2.11-1.1.1.jar
mail-1.4.7.jar
mesos-1.0.0-shaded-protobuf.jar
mesos-1.3.0-shaded-protobuf.jar
metrics-core-3.1.2.jar
metrics-graphite-3.1.2.jar
metrics-json-3.1.2.jar


@@ -139,7 +139,7 @@ lz4-java-1.4.0.jar
machinist_2.11-0.6.1.jar
macro-compat_2.11-1.1.1.jar
mail-1.4.7.jar
mesos-1.0.0-shaded-protobuf.jar
mesos-1.3.0-shaded-protobuf.jar
metrics-core-3.1.2.jar
metrics-graphite-3.1.2.jar
metrics-json-3.1.2.jar


@@ -33,7 +33,8 @@ To get started, follow the steps below to install Mesos and deploy Spark jobs vi
# Installing Mesos
Spark {{site.SPARK_VERSION}} is designed for use with Mesos {{site.MESOS_VERSION}} or newer and does not
require any special patches of Mesos.
require any special patches of Mesos. File and environment-based secrets support requires Mesos 1.3.0 or
newer.
If you already have a Mesos cluster running, you can skip this Mesos installation step.
@@ -430,7 +431,8 @@ See the [configuration page](configuration.html) for information on Spark config
<td><code>spark.mesos.secret</code></td>
<td>(none)</td>
<td>
Set the secret with which Spark framework will use to authenticate with Mesos.
Set the secret that the Spark framework will use to authenticate with Mesos. Used, for example, when
authenticating with the registry.
</td>
</tr>
<tr>
@@ -482,6 +484,43 @@ See the [configuration page](configuration.html) for information on Spark config
</td>
</tr>
<tr>
<td><code>spark.mesos.driver.secret.envkeys</code></td>
<td><code>(none)</code></td>
<td>
A comma-separated list of environment variable names. If set, the contents of the secrets
referenced by spark.mesos.driver.secret.names or spark.mesos.driver.secret.values are set
in the corresponding environment variables in the driver's process.
</td>
</tr>
<tr>
<td><code>spark.mesos.driver.secret.filenames</code></td>
<td><code>(none)</code></td>
<td>
A comma-separated list of file paths. If set, the contents of the secrets referenced by
spark.mesos.driver.secret.names or spark.mesos.driver.secret.values are written to the
corresponding files. Paths are relative to the container's work directory; absolute
paths must already exist. Consult the Mesos Secret protobuf for more information.
</td>
</tr>
<tr>
<td><code>spark.mesos.driver.secret.names</code></td>
<td><code>(none)</code></td>
<td>
A comma-separated list of secret references. Consult the Mesos Secret
protobuf for more information.
</td>
</tr>
<tr>
<td><code>spark.mesos.driver.secret.values</code></td>
<td><code>(none)</code></td>
<td>
A comma-separated list of secret values. Consult the Mesos Secret
protobuf for more information.
</td>
</tr>
<tr>
<td><code>spark.mesos.driverEnv.[EnvironmentVariableName]</code></td>
<td><code>(none)</code></td>


@@ -73,6 +73,9 @@ For long-running apps like Spark Streaming apps to be able to write to HDFS, it
### Standalone mode
The user needs to provide key-stores and configuration options for master and workers. They have to be set by attaching appropriate Java system properties in the `SPARK_MASTER_OPTS` and `SPARK_WORKER_OPTS` environment variables, or just in `SPARK_DAEMON_JAVA_OPTS`. In this mode, the user may allow the executors to use the SSL settings inherited from the worker that spawned them; this is accomplished by setting `spark.ssl.useNodeLocalConf` to `true`. If that parameter is set, the settings provided by the user on the client side are not used by the executors.
### Mesos mode
Mesos 1.3.0 and newer supports `Secrets` primitives as both file-based and environment-based secrets. Spark allows the specification of file-based and environment-variable-based secrets with the `spark.mesos.driver.secret.filenames` and `spark.mesos.driver.secret.envkeys` properties, respectively. Depending on the secret store backend, secrets can be passed by reference or by value with the `spark.mesos.driver.secret.names` and `spark.mesos.driver.secret.values` configuration properties, respectively. Reference-type secrets are served by the secret store and referred to by name, for example `/mysecret`. Value-type secrets are passed on the command line and translated into their appropriate files or environment variables.
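
As a concrete illustration, a minimal sketch of both styles (the secret name, file path, and literal value below are placeholders, not part of this patch):

import org.apache.spark.SparkConf

// Reference-type secret: /db/password is resolved by the cluster's secret store and
// its contents are written to the file db_password under the container's work directory.
val byReference = new SparkConf()
  .set("spark.mesos.driver.secret.names", "/db/password")
  .set("spark.mesos.driver.secret.filenames", "db_password")

// Value-type secret: the literal string is shipped with the submission and
// exported into the driver's environment as DB_PASSWORD.
val byValue = new SparkConf()
  .set("spark.mesos.driver.secret.values", "s3cr3t")
  .set("spark.mesos.driver.secret.envkeys", "DB_PASSWORD")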
### Preparing the key-stores
Key-stores can be generated by `keytool` program. The reference documentation for this tool is
[here](https://docs.oracle.com/javase/7/docs/technotes/tools/solaris/keytool.html). The most basic


@@ -29,7 +29,7 @@
<name>Spark Project Mesos</name>
<properties>
<sbt.project.name>mesos</sbt.project.name>
<mesos.version>1.0.0</mesos.version>
<mesos.version>1.3.0</mesos.version>
<mesos.classifier>shaded-protobuf</mesos.classifier>
</properties>


@@ -58,12 +58,43 @@ package object config {
private[spark] val DRIVER_LABELS =
ConfigBuilder("spark.mesos.driver.labels")
.doc("Mesos labels to add to the driver. Labels are free-form key-value pairs. Key-value " +
.doc("Mesos labels to add to the driver. Labels are free-form key-value pairs. Key-value " +
"pairs should be separated by a colon, and commas used to list more than one." +
"Ex. key:value,key2:value2")
.stringConf
.createOptional
private[spark] val SECRET_NAME =
ConfigBuilder("spark.mesos.driver.secret.names")
.doc("A comma-separated list of secret reference names. Consult the Mesos Secret protobuf " +
"for more information.")
.stringConf
.toSequence
.createOptional
private[spark] val SECRET_VALUE =
ConfigBuilder("spark.mesos.driver.secret.values")
.doc("A comma-separated list of secret values.")
.stringConf
.toSequence
.createOptional
private[spark] val SECRET_ENVKEY =
ConfigBuilder("spark.mesos.driver.secret.envkeys")
.doc("A comma-separated list of the environment variables to contain the secrets." +
"The environment variable will be set on the driver.")
.stringConf
.toSequence
.createOptional
private[spark] val SECRET_FILENAME =
ConfigBuilder("spark.mesos.driver.secret.filenames")
.doc("A comma-seperated list of file paths secret will be written to. Consult the Mesos " +
"Secret protobuf for more information.")
.stringConf
.toSequence
.createOptional
private[spark] val DRIVER_FAILOVER_TIMEOUT =
ConfigBuilder("spark.mesos.driver.failoverTimeout")
.doc("Amount of time in seconds that the master will wait to hear from the driver, " +


@@ -28,6 +28,7 @@ import org.apache.mesos.{Scheduler, SchedulerDriver}
import org.apache.mesos.Protos.{TaskState => MesosTaskState, _}
import org.apache.mesos.Protos.Environment.Variable
import org.apache.mesos.Protos.TaskStatus.Reason
import org.apache.mesos.protobuf.ByteString
import org.apache.spark.{SecurityManager, SparkConf, SparkException, TaskState}
import org.apache.spark.deploy.mesos.MesosDriverDescription
@@ -386,12 +387,46 @@ private[spark] class MesosClusterScheduler(
val env = desc.conf.getAllWithPrefix("spark.mesos.driverEnv.") ++ commandEnv
val envBuilder = Environment.newBuilder()
// add normal environment variables
env.foreach { case (k, v) =>
envBuilder.addVariables(Variable.newBuilder().setName(k).setValue(v))
}
// add secret environment variables
getSecretEnvVar(desc).foreach { variable =>
if (variable.getSecret.getReference.isInitialized) {
logInfo(s"Setting reference secret ${variable.getSecret.getReference.getName}" +
s"on file ${variable.getName}")
} else {
logInfo(s"Setting secret on environment variable name=${variable.getName}")
}
envBuilder.addVariables(variable)
}
envBuilder.build()
}
private def getSecretEnvVar(desc: MesosDriverDescription): List[Variable] = {
val secrets = getSecrets(desc)
val secretEnvKeys = desc.conf.get(config.SECRET_ENVKEY).getOrElse(Nil)
if (illegalSecretInput(secretEnvKeys, secrets)) {
throw new SparkException(
s"Need to give equal numbers of secrets and environment keys " +
s"for environment-based reference secrets got secrets $secrets, " +
s"and keys $secretEnvKeys")
}
secrets.zip(secretEnvKeys).map {
case (s, k) =>
Variable.newBuilder()
.setName(k)
.setType(Variable.Type.SECRET)
.setSecret(s)
.build
}.toList
}
private def getDriverUris(desc: MesosDriverDescription): List[CommandInfo.URI] = {
val confUris = List(conf.getOption("spark.mesos.uris"),
desc.conf.getOption("spark.mesos.uris"),
@@ -529,18 +564,104 @@ private[spark] class MesosClusterScheduler(
val appName = desc.conf.get("spark.app.name")
val driverLabels = MesosProtoUtils.mesosLabels(desc.conf.get(config.DRIVER_LABELS)
.getOrElse(""))
TaskInfo.newBuilder()
.setTaskId(taskId)
.setName(s"Driver for ${appName}")
.setSlaveId(offer.offer.getSlaveId)
.setCommand(buildDriverCommand(desc))
.setContainer(getContainerInfo(desc))
.addAllResources(cpuResourcesToUse.asJava)
.addAllResources(memResourcesToUse.asJava)
.setLabels(MesosProtoUtils.mesosLabels(desc.conf.get(config.DRIVER_LABELS).getOrElse("")))
.setContainer(MesosSchedulerBackendUtil.containerInfo(desc.conf))
.setLabels(driverLabels)
.build
}
private def getContainerInfo(desc: MesosDriverDescription): ContainerInfo.Builder = {
val containerInfo = MesosSchedulerBackendUtil.containerInfo(desc.conf)
getSecretVolume(desc).foreach { volume =>
if (volume.getSource.getSecret.getReference.isInitialized) {
logInfo(s"Setting reference secret ${volume.getSource.getSecret.getReference.getName}" +
s"on file ${volume.getContainerPath}")
} else {
logInfo(s"Setting secret on file name=${volume.getContainerPath}")
}
containerInfo.addVolumes(volume)
}
containerInfo
}
private def getSecrets(desc: MesosDriverDescription): Seq[Secret] = {
def createValueSecret(data: String): Secret = {
Secret.newBuilder()
.setType(Secret.Type.VALUE)
.setValue(Secret.Value.newBuilder().setData(ByteString.copyFrom(data.getBytes)))
.build()
}
def createReferenceSecret(name: String): Secret = {
Secret.newBuilder()
.setReference(Secret.Reference.newBuilder().setName(name))
.setType(Secret.Type.REFERENCE)
.build()
}
val referenceSecrets: Seq[Secret] =
desc.conf.get(config.SECRET_NAME).getOrElse(Nil).map(s => createReferenceSecret(s))
val valueSecrets: Seq[Secret] = {
desc.conf.get(config.SECRET_VALUE).getOrElse(Nil).map(s => createValueSecret(s))
}
if (valueSecrets.nonEmpty && referenceSecrets.nonEmpty) {
throw new SparkException("Cannot specify VALUE type secrets and REFERENCE types ones")
}
if (referenceSecrets.nonEmpty) referenceSecrets else valueSecrets
}
private def illegalSecretInput(dest: Seq[String], s: Seq[Secret]): Boolean = {
if (dest.isEmpty) { // no destination set (i.e., not using secrets of this type)
return false
}
if (dest.nonEmpty && s.nonEmpty) {
// make sure there is a destination for each secret of this type
if (dest.length != s.length) {
return true
}
}
false
}
private def getSecretVolume(desc: MesosDriverDescription): List[Volume] = {
val secrets = getSecrets(desc)
val secretPaths: Seq[String] =
desc.conf.get(config.SECRET_FILENAME).getOrElse(Nil)
if (illegalSecretInput(secretPaths, secrets)) {
throw new SparkException(
s"Need to give equal numbers of secrets and file paths for file-based " +
s"reference secrets got secrets $secrets, and paths $secretPaths")
}
secrets.zip(secretPaths).map {
case (s, p) =>
val source = Volume.Source.newBuilder()
.setType(Volume.Source.Type.SECRET)
.setSecret(s)
Volume.newBuilder()
.setContainerPath(p)
.setSource(source)
.setMode(Volume.Mode.RO)
.build
}.toList
}
/**
* This method takes all the possible candidates and attempts to schedule them with Mesos offers.
* Every time a new task is scheduled, the afterLaunchCallback is called to perform post scheduled
@@ -584,9 +705,14 @@ private[spark] class MesosClusterScheduler(
} catch {
case e: SparkException =>
afterLaunchCallback(submission.submissionId)
finishedDrivers += new MesosClusterSubmissionState(submission, TaskID.newBuilder().
setValue(submission.submissionId).build(), SlaveID.newBuilder().setValue("").
build(), None, null, None, getDriverFrameworkID(submission))
finishedDrivers += new MesosClusterSubmissionState(
submission,
TaskID.newBuilder().setValue(submission.submissionId).build(),
SlaveID.newBuilder().setValue("").build(),
None,
null,
None,
getDriverFrameworkID(submission))
logError(s"Failed to launch the driver with id: ${submission.submissionId}, " +
s"cpu: $driverCpu, mem: $driverMem, reason: ${e.getMessage}")
}


@@ -122,7 +122,7 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
.toList
}
def containerInfo(conf: SparkConf): ContainerInfo = {
def containerInfo(conf: SparkConf): ContainerInfo.Builder = {
val containerType = if (conf.contains("spark.mesos.executor.docker.image") &&
conf.get("spark.mesos.containerizer", "docker") == "docker") {
ContainerInfo.Type.DOCKER
@@ -149,8 +149,7 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
.getOrElse(List.empty)
if (containerType == ContainerInfo.Type.DOCKER) {
containerInfo
.setDocker(dockerInfo(image, forcePullImage, portMaps, params))
containerInfo.setDocker(dockerInfo(image, forcePullImage, portMaps, params))
} else {
containerInfo.setMesos(mesosInfo(image, forcePullImage))
}
@@ -171,7 +170,7 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
containerInfo.addNetworkInfos(info)
}
containerInfo.build()
containerInfo
}
private def dockerInfo(


@@ -510,12 +510,20 @@ trait MesosSchedulerUtils extends Logging {
}
def mesosToTaskState(state: MesosTaskState): TaskState.TaskState = state match {
case MesosTaskState.TASK_STAGING | MesosTaskState.TASK_STARTING => TaskState.LAUNCHING
case MesosTaskState.TASK_RUNNING | MesosTaskState.TASK_KILLING => TaskState.RUNNING
case MesosTaskState.TASK_STAGING |
MesosTaskState.TASK_STARTING => TaskState.LAUNCHING
case MesosTaskState.TASK_RUNNING |
MesosTaskState.TASK_KILLING => TaskState.RUNNING
case MesosTaskState.TASK_FINISHED => TaskState.FINISHED
case MesosTaskState.TASK_FAILED => TaskState.FAILED
case MesosTaskState.TASK_FAILED |
MesosTaskState.TASK_GONE |
MesosTaskState.TASK_GONE_BY_OPERATOR => TaskState.FAILED
case MesosTaskState.TASK_KILLED => TaskState.KILLED
case MesosTaskState.TASK_LOST | MesosTaskState.TASK_ERROR => TaskState.LOST
case MesosTaskState.TASK_LOST |
MesosTaskState.TASK_ERROR |
MesosTaskState.TASK_DROPPED |
MesosTaskState.TASK_UNKNOWN |
MesosTaskState.TASK_UNREACHABLE => TaskState.LOST
}
def taskStateToMesos(state: TaskState.TaskState): MesosTaskState = state match {


@@ -21,9 +21,10 @@ import java.util.{Collection, Collections, Date}
import scala.collection.JavaConverters._
import org.apache.mesos.Protos.{TaskState => MesosTaskState, _}
import org.apache.mesos.Protos.{Environment, Secret, TaskState => MesosTaskState, _}
import org.apache.mesos.Protos.Value.{Scalar, Type}
import org.apache.mesos.SchedulerDriver
import org.apache.mesos.protobuf.ByteString
import org.mockito.{ArgumentCaptor, Matchers}
import org.mockito.Mockito._
import org.scalatest.mock.MockitoSugar
@@ -338,4 +339,163 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
verify(driver, times(1)).declineOffer(offerId, filter)
}
test("Creates an env-based reference secrets.") {
setScheduler()
val mem = 1000
val cpu = 1
val secretName = "/path/to/secret,/anothersecret"
val envKey = "SECRET_ENV_KEY,PASSWORD"
val driverDesc = new MesosDriverDescription(
"d1",
"jar",
mem,
cpu,
true,
command,
Map("spark.mesos.executor.home" -> "test",
"spark.app.name" -> "test",
"spark.mesos.driver.secret.names" -> secretName,
"spark.mesos.driver.secret.envkeys" -> envKey),
"s1",
new Date())
val response = scheduler.submitDriver(driverDesc)
assert(response.success)
val offer = Utils.createOffer("o1", "s1", mem, cpu)
scheduler.resourceOffers(driver, Collections.singletonList(offer))
val launchedTasks = Utils.verifyTaskLaunched(driver, "o1")
assert(launchedTasks.head
.getCommand
.getEnvironment
.getVariablesCount == 3) // SPARK_SUBMIT_OPTS and the two secret variables
val variableOne = launchedTasks.head.getCommand.getEnvironment
.getVariablesList.asScala.filter(_.getName == "SECRET_ENV_KEY").head
assert(variableOne.getSecret.isInitialized)
assert(variableOne.getSecret.getType == Secret.Type.REFERENCE)
assert(variableOne.getSecret.getReference.getName == "/path/to/secret")
assert(variableOne.getType == Environment.Variable.Type.SECRET)
val variableTwo = launchedTasks.head.getCommand.getEnvironment
.getVariablesList.asScala.filter(_.getName == "PASSWORD").head
assert(variableTwo.getSecret.isInitialized)
assert(variableTwo.getSecret.getType == Secret.Type.REFERENCE)
assert(variableTwo.getSecret.getReference.getName == "/anothersecret")
assert(variableTwo.getType == Environment.Variable.Type.SECRET)
}
test("Creates an env-based value secrets.") {
setScheduler()
val mem = 1000
val cpu = 1
val secretValues = "user,password"
val envKeys = "USER,PASSWORD"
val driverDesc = new MesosDriverDescription(
"d1",
"jar",
mem,
cpu,
true,
command,
Map("spark.mesos.executor.home" -> "test",
"spark.app.name" -> "test",
"spark.mesos.driver.secret.values" -> secretValues,
"spark.mesos.driver.secret.envkeys" -> envKeys),
"s1",
new Date())
val response = scheduler.submitDriver(driverDesc)
assert(response.success)
val offer = Utils.createOffer("o1", "s1", mem, cpu)
scheduler.resourceOffers(driver, Collections.singletonList(offer))
val launchedTasks = Utils.verifyTaskLaunched(driver, "o1")
assert(launchedTasks.head
.getCommand
.getEnvironment
.getVariablesCount == 3) // SPARK_SUBMIT_OPTS and the two secret variables
val variableOne = launchedTasks.head.getCommand.getEnvironment
.getVariablesList.asScala.filter(_.getName == "USER").head
assert(variableOne.getSecret.isInitialized)
assert(variableOne.getSecret.getType == Secret.Type.VALUE)
assert(variableOne.getSecret.getValue.getData == ByteString.copyFrom("user".getBytes))
assert(variableOne.getType == Environment.Variable.Type.SECRET)
val variableTwo = launchedTasks.head.getCommand.getEnvironment
.getVariablesList.asScala.filter(_.getName == "PASSWORD").head
assert(variableTwo.getSecret.isInitialized)
assert(variableTwo.getSecret.getType == Secret.Type.VALUE)
assert(variableTwo.getSecret.getValue.getData == ByteString.copyFrom("password".getBytes))
assert(variableTwo.getType == Environment.Variable.Type.SECRET)
}
test("Creates file-based reference secrets.") {
setScheduler()
val mem = 1000
val cpu = 1
val secretName = "/path/to/secret,/anothersecret"
val secretPath = "/topsecret,/mypassword"
val driverDesc = new MesosDriverDescription(
"d1",
"jar",
mem,
cpu,
true,
command,
Map("spark.mesos.executor.home" -> "test",
"spark.app.name" -> "test",
"spark.mesos.driver.secret.names" -> secretName,
"spark.mesos.driver.secret.filenames" -> secretPath),
"s1",
new Date())
val response = scheduler.submitDriver(driverDesc)
assert(response.success)
val offer = Utils.createOffer("o1", "s1", mem, cpu)
scheduler.resourceOffers(driver, Collections.singletonList(offer))
val launchedTasks = Utils.verifyTaskLaunched(driver, "o1")
val volumes = launchedTasks.head.getContainer.getVolumesList
assert(volumes.size() == 2)
val secretVolOne = volumes.get(0)
assert(secretVolOne.getContainerPath == "/topsecret")
assert(secretVolOne.getSource.getSecret.getType == Secret.Type.REFERENCE)
assert(secretVolOne.getSource.getSecret.getReference.getName == "/path/to/secret")
val secretVolTwo = volumes.get(1)
assert(secretVolTwo.getContainerPath == "/mypassword")
assert(secretVolTwo.getSource.getSecret.getType == Secret.Type.REFERENCE)
assert(secretVolTwo.getSource.getSecret.getReference.getName == "/anothersecret")
}
test("Creates a file-based value secrets.") {
setScheduler()
val mem = 1000
val cpu = 1
val secretValues = "user,password"
val secretPath = "/whoami,/mypassword"
val driverDesc = new MesosDriverDescription(
"d1",
"jar",
mem,
cpu,
true,
command,
Map("spark.mesos.executor.home" -> "test",
"spark.app.name" -> "test",
"spark.mesos.driver.secret.values" -> secretValues,
"spark.mesos.driver.secret.filenames" -> secretPath),
"s1",
new Date())
val response = scheduler.submitDriver(driverDesc)
assert(response.success)
val offer = Utils.createOffer("o1", "s1", mem, cpu)
scheduler.resourceOffers(driver, Collections.singletonList(offer))
val launchedTasks = Utils.verifyTaskLaunched(driver, "o1")
val volumes = launchedTasks.head.getContainer.getVolumesList
assert(volumes.size() == 2)
val secretVolOne = volumes.get(0)
assert(secretVolOne.getContainerPath == "/whoami")
assert(secretVolOne.getSource.getSecret.getType == Secret.Type.VALUE)
assert(secretVolOne.getSource.getSecret.getValue.getData ==
ByteString.copyFrom("user".getBytes))
val secretVolTwo = volumes.get(1)
assert(secretVolTwo.getContainerPath == "/mypassword")
assert(secretVolTwo.getSource.getSecret.getType == Secret.Type.VALUE)
assert(secretVolTwo.getSource.getSecret.getValue.getData ==
ByteString.copyFrom("password".getBytes))
}
}