[SPARK-16637] Unified containerizer

## What changes were proposed in this pull request?

New config var: `spark.mesos.containerizer`, which takes either `"docker"` (the default) or `"mesos"`.

This adds support for running docker containers via the Mesos unified containerizer: http://mesos.apache.org/documentation/latest/container-image/

The benefit is removing the dependency on `dockerd`, along with all the costs it incurs.
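For illustration only (not part of this patch), a minimal sketch of selecting the containerizer when building a `SparkConf` for a Mesos cluster; the master URL and image name below are placeholders:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical master URL and executor image, used only for illustration.
val conf = new SparkConf()
  .setMaster("mesos://zk://zk1.example.com:2181/mesos")
  .setAppName("unified-containerizer-example")
  .set("spark.mesos.executor.docker.image", "example/spark-executor:latest")
  // "docker" (the default) shells out to dockerd; "mesos" uses the unified containerizer.
  .set("spark.mesos.containerizer", "mesos")

val sc = new SparkContext(conf)
```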

I've also updated the supported Mesos version to 1.0.0, which provides the required protobufs.

This is blocked on: https://github.com/apache/spark/pull/14167

## How was this patch tested?

- Manually tested jobs submitted with both `"mesos"` and `"docker"` settings for the new config var.
- Ran the Spark/Mesos integration test suite.

Author: Michael Gummelt <mgummelt@mesosphere.io>

Closes #14275 from mgummelt/unified-containerizer.
Michael Gummelt 2016-07-29 05:50:47 -07:00 committed by Sean Owen
parent 04a2c072d9
commit 266b92faff
18 changed files with 149 additions and 79 deletions


@ -47,7 +47,7 @@ import org.apache.spark.util.Utils
*
* @param loadDefaults whether to also load values from Java system properties
*/
class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Serializable {
import SparkConf._
@ -370,6 +370,13 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
settings.entrySet().asScala.map(x => (x.getKey, x.getValue)).toArray
}
/** Get all parameters that start with `prefix` */
def getAllWithPrefix(prefix: String): Array[(String, String)] = {
getAll.filter { case (k, v) => k.startsWith(prefix) }
.map { case (k, v) => (k.substring(prefix.length), v) }
}
/** Get a parameter as an integer, falling back to a default if not set */
def getInt(key: String, defaultValue: Int): Int = {
getOption(key).map(_.toInt).getOrElse(defaultValue)
@ -392,9 +399,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
/** Get all executor environment variables set on this SparkConf */
def getExecutorEnv: Seq[(String, String)] = {
val prefix = "spark.executorEnv."
getAll.filter{case (k, v) => k.startsWith(prefix)}
.map{case (k, v) => (k.substring(prefix.length), v)}
getAllWithPrefix("spark.executorEnv.")
}
/**


@ -41,13 +41,11 @@ private[spark] object TaskState extends Enumeration {
}
def fromMesos(mesosState: MesosTaskState): TaskState = mesosState match {
case MesosTaskState.TASK_STAGING => LAUNCHING
case MesosTaskState.TASK_STARTING => LAUNCHING
case MesosTaskState.TASK_RUNNING => RUNNING
case MesosTaskState.TASK_STAGING | MesosTaskState.TASK_STARTING => LAUNCHING
case MesosTaskState.TASK_RUNNING | MesosTaskState.TASK_KILLING => RUNNING
case MesosTaskState.TASK_FINISHED => FINISHED
case MesosTaskState.TASK_FAILED => FAILED
case MesosTaskState.TASK_KILLED => KILLED
case MesosTaskState.TASK_LOST => LOST
case MesosTaskState.TASK_ERROR => LOST
case MesosTaskState.TASK_LOST | MesosTaskState.TASK_ERROR => LOST
}
}


@ -19,6 +19,7 @@ package org.apache.spark.deploy.mesos
import java.util.Date
import org.apache.spark.SparkConf
import org.apache.spark.deploy.Command
import org.apache.spark.scheduler.cluster.mesos.MesosClusterRetryState
@ -40,12 +41,15 @@ private[spark] class MesosDriverDescription(
val cores: Double,
val supervise: Boolean,
val command: Command,
val schedulerProperties: Map[String, String],
schedulerProperties: Map[String, String],
val submissionId: String,
val submissionDate: Date,
val retryState: Option[MesosClusterRetryState] = None)
extends Serializable {
val conf = new SparkConf(false)
schedulerProperties.foreach {case (k, v) => conf.set(k, v)}
def copy(
name: String = name,
jarUrl: String = jarUrl,
@ -53,11 +57,12 @@ private[spark] class MesosDriverDescription(
cores: Double = cores,
supervise: Boolean = supervise,
command: Command = command,
schedulerProperties: Map[String, String] = schedulerProperties,
schedulerProperties: SparkConf = conf,
submissionId: String = submissionId,
submissionDate: Date = submissionDate,
retryState: Option[MesosClusterRetryState] = retryState): MesosDriverDescription = {
new MesosDriverDescription(name, jarUrl, mem, cores, supervise, command, schedulerProperties,
new MesosDriverDescription(name, jarUrl, mem, cores, supervise, command, conf.getAll.toMap,
submissionId, submissionDate, retryState)
}


@ -50,7 +50,7 @@ private[ui] class DriverPage(parent: MesosClusterUI) extends WebUIPage("driver")
val driverDescription = Iterable.apply(driverState.description)
val submissionState = Iterable.apply(driverState.submissionState)
val command = Iterable.apply(driverState.description.command)
val schedulerProperties = Iterable.apply(driverState.description.schedulerProperties)
val schedulerProperties = Iterable.apply(driverState.description.conf.getAll.toMap)
val commandEnv = Iterable.apply(driverState.description.command.environment)
val driverTable =
UIUtils.listingTable(driverHeaders, driverRow, driverDescription)


@ -353,19 +353,16 @@ private[spark] class MesosClusterScheduler(
}
}
private def getDriverExecutorURI(desc: MesosDriverDescription) = {
desc.schedulerProperties.get("spark.executor.uri")
private def getDriverExecutorURI(desc: MesosDriverDescription): Option[String] = {
desc.conf.getOption("spark.executor.uri")
.orElse(desc.command.environment.get("SPARK_EXECUTOR_URI"))
}
private def getDriverEnvironment(desc: MesosDriverDescription): Environment = {
val env = {
val executorOpts = desc.schedulerProperties.map { case (k, v) => s"-D$k=$v" }.mkString(" ")
val executorOpts = desc.conf.getAll.map { case (k, v) => s"-D$k=$v" }.mkString(" ")
val executorEnv = Map("SPARK_EXECUTOR_OPTS" -> executorOpts)
val prefix = "spark.mesos.driverEnv."
val driverEnv = desc.schedulerProperties.filterKeys(_.startsWith(prefix))
.map { case (k, v) => (k.substring(prefix.length), v) }
val driverEnv = desc.conf.getAllWithPrefix("spark.mesos.driverEnv.")
driverEnv ++ executorEnv ++ desc.command.environment
}
@ -379,8 +376,8 @@ private[spark] class MesosClusterScheduler(
private def getDriverUris(desc: MesosDriverDescription): List[CommandInfo.URI] = {
val confUris = List(conf.getOption("spark.mesos.uris"),
desc.schedulerProperties.get("spark.mesos.uris"),
desc.schedulerProperties.get("spark.submit.pyFiles")).flatMap(
desc.conf.getOption("spark.mesos.uris"),
desc.conf.getOption("spark.submit.pyFiles")).flatMap(
_.map(_.split(",").map(_.trim))
).flatten
@ -391,7 +388,7 @@ private[spark] class MesosClusterScheduler(
}
private def getDriverCommandValue(desc: MesosDriverDescription): String = {
val dockerDefined = desc.schedulerProperties.contains("spark.mesos.executor.docker.image")
val dockerDefined = desc.conf.contains("spark.mesos.executor.docker.image")
val executorUri = getDriverExecutorURI(desc)
// Gets the path to run spark-submit, and the path to the Mesos sandbox.
val (executable, sandboxPath) = if (dockerDefined) {
@ -411,7 +408,7 @@ private[spark] class MesosClusterScheduler(
// Sandbox path points to the parent folder as we chdir into the folderBasename.
(cmdExecutable, "..")
} else {
val executorSparkHome = desc.schedulerProperties.get("spark.mesos.executor.home")
val executorSparkHome = desc.conf.getOption("spark.mesos.executor.home")
.orElse(conf.getOption("spark.home"))
.orElse(Option(System.getenv("SPARK_HOME")))
.getOrElse {
@ -438,7 +435,7 @@ private[spark] class MesosClusterScheduler(
private def generateCmdOption(desc: MesosDriverDescription, sandboxPath: String): Seq[String] = {
var options = Seq(
"--name", desc.schedulerProperties("spark.app.name"),
"--name", desc.conf.get("spark.app.name"),
"--master", s"mesos://${conf.get("spark.master")}",
"--driver-cores", desc.cores.toString,
"--driver-memory", s"${desc.mem}M")
@ -454,19 +451,19 @@ private[spark] class MesosClusterScheduler(
options ++= Seq("--class", desc.command.mainClass)
}
desc.schedulerProperties.get("spark.executor.memory").map { v =>
desc.conf.getOption("spark.executor.memory").foreach { v =>
options ++= Seq("--executor-memory", v)
}
desc.schedulerProperties.get("spark.cores.max").map { v =>
desc.conf.getOption("spark.cores.max").foreach { v =>
options ++= Seq("--total-executor-cores", v)
}
desc.schedulerProperties.get("spark.submit.pyFiles").map { pyFiles =>
desc.conf.getOption("spark.submit.pyFiles").foreach { pyFiles =>
val formattedFiles = pyFiles.split(",")
.map { path => new File(sandboxPath, path.split("/").last).toString() }
.mkString(",")
options ++= Seq("--py-files", formattedFiles)
}
desc.schedulerProperties
desc.conf.getAll
.filter { case (key, _) => !replicatedOptionsBlacklist.contains(key) }
.foreach { case (key, value) => options ++= Seq("--conf", s"$key=${shellEscape(value)}") }
options
@ -476,6 +473,7 @@ private[spark] class MesosClusterScheduler(
* Escape args for Unix-like shells, unless already quoted by the user.
* Based on: http://www.gnu.org/software/bash/manual/html_node/Double-Quotes.html
* and http://www.grymoire.com/Unix/Quote.html
*
* @param value argument
* @return escaped argument
*/
@ -498,6 +496,33 @@ private[spark] class MesosClusterScheduler(
}
}
private def createTaskInfo(desc: MesosDriverDescription, offer: ResourceOffer): TaskInfo = {
val taskId = TaskID.newBuilder().setValue(desc.submissionId).build()
val (remainingResources, cpuResourcesToUse) =
partitionResources(offer.resources, "cpus", desc.cores)
val (finalResources, memResourcesToUse) =
partitionResources(remainingResources.asJava, "mem", desc.mem)
offer.resources = finalResources.asJava
val appName = desc.conf.get("spark.app.name")
val taskInfo = TaskInfo.newBuilder()
.setTaskId(taskId)
.setName(s"Driver for ${appName}")
.setSlaveId(offer.slaveId)
.setCommand(buildDriverCommand(desc))
.addAllResources(cpuResourcesToUse.asJava)
.addAllResources(memResourcesToUse.asJava)
desc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(image,
desc.conf,
taskInfo.getContainerBuilder)
}
taskInfo.build
}
/**
* This method takes all the possible candidates and attempt to schedule them with Mesos offers.
* Every time a new task is scheduled, the afterLaunchCallback is called to perform post scheduled
@ -521,32 +546,12 @@ private[spark] class MesosClusterScheduler(
s"cpu: $driverCpu, mem: $driverMem")
} else {
val offer = offerOption.get
val taskId = TaskID.newBuilder().setValue(submission.submissionId).build()
val (remainingResources, cpuResourcesToUse) =
partitionResources(offer.resources, "cpus", driverCpu)
val (finalResources, memResourcesToUse) =
partitionResources(remainingResources.asJava, "mem", driverMem)
val commandInfo = buildDriverCommand(submission)
val appName = submission.schedulerProperties("spark.app.name")
val taskInfo = TaskInfo.newBuilder()
.setTaskId(taskId)
.setName(s"Driver for $appName")
.setSlaveId(offer.slaveId)
.setCommand(commandInfo)
.addAllResources(cpuResourcesToUse.asJava)
.addAllResources(memResourcesToUse.asJava)
offer.resources = finalResources.asJava
submission.schedulerProperties.get("spark.mesos.executor.docker.image").foreach { image =>
MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
image,
submission.schedulerProperties.get,
taskInfo.getContainerBuilder())
}
val queuedTasks = tasks.getOrElseUpdate(offer.offerId, new ArrayBuffer[TaskInfo])
queuedTasks += taskInfo.build()
val task = createTaskInfo(submission, offer)
queuedTasks += task
logTrace(s"Using offer ${offer.offerId.getValue} to launch driver " +
submission.submissionId)
val newState = new MesosClusterSubmissionState(submission, taskId, offer.slaveId,
val newState = new MesosClusterSubmissionState(submission, task.getTaskId, offer.slaveId,
None, new Date(), None)
launchedDrivers(submission.submissionId) = newState
launchedDriversState.persist(submission.submissionId, newState)


@ -410,7 +410,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
image,
sc.conf.getOption,
sc.conf,
taskBuilder.getContainerBuilder
)
}


@ -153,7 +153,7 @@ private[spark] class MesosFineGrainedSchedulerBackend(
sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
image,
sc.conf.getOption,
sc.conf,
executorInfo.getContainerBuilder()
)
}


@ -17,9 +17,10 @@
package org.apache.spark.scheduler.cluster.mesos
import org.apache.mesos.Protos.{ContainerInfo, Volume}
import org.apache.mesos.Protos.{ContainerInfo, Image, Volume}
import org.apache.mesos.Protos.ContainerInfo.DockerInfo
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.internal.Logging
/**
@ -104,19 +105,33 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
def addDockerInfo(
container: ContainerInfo.Builder,
image: String,
containerizer: String,
forcePullImage: Boolean = false,
volumes: Option[List[Volume]] = None,
network: Option[ContainerInfo.DockerInfo.Network] = None,
portmaps: Option[List[ContainerInfo.DockerInfo.PortMapping]] = None): Unit = {
containerizer match {
case "docker" =>
container.setType(ContainerInfo.Type.DOCKER)
val docker = ContainerInfo.DockerInfo.newBuilder()
.setImage(image)
.setForcePullImage(forcePullImage)
network.foreach(docker.setNetwork)
// TODO (mgummelt): Remove this. Portmaps have no effect,
// as we don't support bridge networking.
portmaps.foreach(_.foreach(docker.addPortMappings))
container.setType(ContainerInfo.Type.DOCKER)
container.setDocker(docker.build())
container.setDocker(docker)
case "mesos" =>
container.setType(ContainerInfo.Type.MESOS)
val imageProto = Image.newBuilder()
.setType(Image.Type.DOCKER)
.setDocker(Image.Docker.newBuilder().setName(image))
.setCached(!forcePullImage)
container.setMesos(ContainerInfo.MesosInfo.newBuilder().setImage(imageProto))
case _ =>
throw new SparkException(
"spark.mesos.containerizer must be one of {\"docker\", \"mesos\"}")
}
volumes.foreach(_.foreach(container.addVolumes))
}
@ -125,18 +140,23 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
*/
def setupContainerBuilderDockerInfo(
imageName: String,
conf: String => Option[String],
conf: SparkConf,
builder: ContainerInfo.Builder): Unit = {
val forcePullImage = conf("spark.mesos.executor.docker.forcePullImage")
val forcePullImage = conf
.getOption("spark.mesos.executor.docker.forcePullImage")
.exists(_.equals("true"))
val volumes = conf("spark.mesos.executor.docker.volumes")
val volumes = conf
.getOption("spark.mesos.executor.docker.volumes")
.map(parseVolumesSpec)
val portmaps = conf("spark.mesos.executor.docker.portmaps")
val portmaps = conf
.getOption("spark.mesos.executor.docker.portmaps")
.map(parsePortMappingsSpec)
val containerizer = conf.get("spark.mesos.containerizer", "docker")
addDockerInfo(
builder,
imageName,
containerizer,
forcePullImage = forcePullImage,
volumes = volumes,
portmaps = portmaps)


@ -33,6 +33,7 @@ import org.apache.spark.{SparkConf, SparkContext, SparkException}
import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils
/**
* Shared trait for implementing a Mesos Scheduler. This holds common state and helper
* methods and Mesos scheduler will use.
@ -79,7 +80,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
credBuilder.setPrincipal(principal)
}
conf.getOption("spark.mesos.secret").foreach { secret =>
credBuilder.setSecret(ByteString.copyFromUtf8(secret))
credBuilder.setSecret(secret)
}
if (credBuilder.hasSecret && !fwInfoBuilder.hasPrincipal) {
throw new SparkException(


@ -109,7 +109,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
val taskInfos = verifyTaskLaunched(driver, "o1")
assert(taskInfos.length == 1)
val cpus = backend.getResource(taskInfos(0).getResourcesList, "cpus")
val cpus = backend.getResource(taskInfos.head.getResourcesList, "cpus")
assert(cpus == executorCores)
}
@ -123,7 +123,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
val taskInfos = verifyTaskLaunched(driver, "o1")
assert(taskInfos.length == 1)
val cpus = backend.getResource(taskInfos(0).getResourcesList, "cpus")
val cpus = backend.getResource(taskInfos.head.getResourcesList, "cpus")
assert(cpus == offerCores)
}
@ -137,7 +137,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
val taskInfos = verifyTaskLaunched(driver, "o1")
assert(taskInfos.length == 1)
val cpus = backend.getResource(taskInfos(0).getResourcesList, "cpus")
val cpus = backend.getResource(taskInfos.head.getResourcesList, "cpus")
assert(cpus == maxCores)
}
@ -252,6 +252,32 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
backend.start()
}
test("honors unset spark.mesos.containerizer") {
setBackend(Map("spark.mesos.executor.docker.image" -> "test"))
val (mem, cpu) = (backend.executorMemory(sc), 4)
val offer1 = createOffer("o1", "s1", mem, cpu)
backend.resourceOffers(driver, List(offer1).asJava)
val taskInfos = verifyTaskLaunched(driver, "o1")
assert(taskInfos.head.getContainer.getType == ContainerInfo.Type.DOCKER)
}
test("honors spark.mesos.containerizer=\"mesos\"") {
setBackend(Map(
"spark.mesos.executor.docker.image" -> "test",
"spark.mesos.containerizer" -> "mesos"))
val (mem, cpu) = (backend.executorMemory(sc), 4)
val offer1 = createOffer("o1", "s1", mem, cpu)
backend.resourceOffers(driver, List(offer1).asJava)
val taskInfos = verifyTaskLaunched(driver, "o1")
assert(taskInfos.head.getContainer.getType == ContainerInfo.Type.MESOS)
}
test("docker settings are reflected in created tasks") {
setBackend(Map(
"spark.mesos.executor.docker.image" -> "some_image",


@ -116,7 +116,7 @@ libfb303-0.9.2.jar
libthrift-0.9.2.jar
log4j-1.2.17.jar
lz4-1.3.0.jar
mesos-0.22.2-shaded-protobuf.jar
mesos-1.0.0-shaded-protobuf.jar
metrics-core-3.1.2.jar
metrics-graphite-3.1.2.jar
metrics-json-3.1.2.jar


@ -122,7 +122,7 @@ libthrift-0.9.2.jar
log4j-1.2.17.jar
lz4-1.3.0.jar
mail-1.4.7.jar
mesos-0.22.2-shaded-protobuf.jar
mesos-1.0.0-shaded-protobuf.jar
metrics-core-3.1.2.jar
metrics-graphite-3.1.2.jar
metrics-json-3.1.2.jar


@ -122,7 +122,7 @@ libthrift-0.9.2.jar
log4j-1.2.17.jar
lz4-1.3.0.jar
mail-1.4.7.jar
mesos-0.22.2-shaded-protobuf.jar
mesos-1.0.0-shaded-protobuf.jar
metrics-core-3.1.2.jar
metrics-graphite-3.1.2.jar
metrics-json-3.1.2.jar


@ -130,7 +130,7 @@ libthrift-0.9.2.jar
log4j-1.2.17.jar
lz4-1.3.0.jar
mail-1.4.7.jar
mesos-0.22.2-shaded-protobuf.jar
mesos-1.0.0-shaded-protobuf.jar
metrics-core-3.1.2.jar
metrics-graphite-3.1.2.jar
metrics-json-3.1.2.jar


@ -131,7 +131,7 @@ libthrift-0.9.2.jar
log4j-1.2.17.jar
lz4-1.3.0.jar
mail-1.4.7.jar
mesos-0.22.2-shaded-protobuf.jar
mesos-1.0.0-shaded-protobuf.jar
metrics-core-3.1.2.jar
metrics-graphite-3.1.2.jar
metrics-json-3.1.2.jar


@ -18,6 +18,6 @@ SPARK_VERSION: 2.1.0-SNAPSHOT
SPARK_VERSION_SHORT: 2.1.0
SCALA_BINARY_VERSION: "2.11"
SCALA_VERSION: "2.11.7"
MESOS_VERSION: 0.22.0
MESOS_VERSION: 1.0.0
SPARK_ISSUE_TRACKER_URL: https://issues.apache.org/jira/browse/SPARK
SPARK_GITHUB_URL: https://github.com/apache/spark


@ -432,6 +432,16 @@ See the [configuration page](configuration.html) for information on Spark config
</ul>
</td>
</tr>
<tr>
<td><code>spark.mesos.containerizer</code></td>
<td><code>docker</code></td>
<td>
This only affects docker containers, and must be one of "docker"
or "mesos". Mesos supports two types of
containerizers for docker: the "docker" containerizer, and the preferred
"mesos" containerizer. Read more here: http://mesos.apache.org/documentation/latest/container-image/
</td>
</tr>
<tr>
<td><code>spark.mesos.driver.webui.url</code></td>
<td><code>(none)</code></td>


@ -119,7 +119,7 @@
<java.version>1.7</java.version>
<maven.version>3.3.9</maven.version>
<sbt.project.name>spark</sbt.project.name>
<mesos.version>0.22.2</mesos.version>
<mesos.version>1.0.0</mesos.version>
<mesos.classifier>shaded-protobuf</mesos.classifier>
<slf4j.version>1.7.16</slf4j.version>
<log4j.version>1.2.17</log4j.version>