[SPARK-7373] [MESOS] Add docker support for launching drivers in mesos cluster mode.

Building on the existing Docker support for Mesos, this enables the Mesos cluster mode scheduler to launch Spark drivers inside Docker images as well.

Passing the Docker settings through also allows the executors launched by the driver to run in the same Docker image.
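
The feature is driven entirely by the existing spark.mesos.executor.docker.* properties, which the cluster mode scheduler now reads when building the driver task as well. A hedged configuration sketch follows; the image name, paths, and ports are hypothetical examples of the documented spec formats.

    // Hypothetical values; the keys are the existing executor Docker settings,
    // now also honored for drivers submitted in Mesos cluster mode.
    val dockerProps = Map(
      "spark.mesos.executor.docker.image" -> "example/spark:1.4.0",
      // comma-separated [host_path:]container_path[:ro|:rw] entries
      "spark.mesos.executor.docker.volumes" -> "/host/logs:/logs:rw",
      // comma-separated host_port:container_port[:tcp|:udp] entries
      "spark.mesos.executor.docker.portmaps" -> "4040:4040:tcp"
    )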

Author: Timothy Chen <tnachen@gmail.com>

Closes #5917 from tnachen/spark_cluster_docker and squashes the following commits:

1e842f5 [Timothy Chen] Add docker support for launching drivers in mesos cluster mode.
Commit 4eecf550aa (parent 0c33bf817c)
Authored by Timothy Chen on 2015-05-07 12:23:16 -07:00; committed by Andrew Or

@@ -370,16 +370,21 @@ private[spark] class MesosClusterScheduler(
     val executorOpts = desc.schedulerProperties.map { case (k, v) => s"-D$k=$v" }.mkString(" ")
     envBuilder.addVariables(
       Variable.newBuilder().setName("SPARK_EXECUTOR_OPTS").setValue(executorOpts))
-    val cmdOptions = generateCmdOption(desc)
+    val cmdOptions = generateCmdOption(desc).mkString(" ")
+    val dockerDefined = desc.schedulerProperties.contains("spark.mesos.executor.docker.image")
     val executorUri = desc.schedulerProperties.get("spark.executor.uri")
       .orElse(desc.command.environment.get("SPARK_EXECUTOR_URI"))
     val appArguments = desc.command.arguments.mkString(" ")
-    val cmd = if (executorUri.isDefined) {
+    val (executable, jar) = if (dockerDefined) {
+      // Application jar is automatically downloaded in the mounted sandbox by Mesos,
+      // and the path to the mounted volume is stored in $MESOS_SANDBOX env variable.
+      ("./bin/spark-submit", s"$$MESOS_SANDBOX/${desc.jarUrl.split("/").last}")
+    } else if (executorUri.isDefined) {
       builder.addUris(CommandInfo.URI.newBuilder().setValue(executorUri.get).build())
       val folderBasename = executorUri.get.split('/').last.split('.').head
       val cmdExecutable = s"cd $folderBasename*; $prefixEnv bin/spark-submit"
       val cmdJar = s"../${desc.jarUrl.split("/").last}"
-      s"$cmdExecutable ${cmdOptions.mkString(" ")} $cmdJar $appArguments"
+      (cmdExecutable, cmdJar)
     } else {
       val executorSparkHome = desc.schedulerProperties.get("spark.mesos.executor.home")
         .orElse(conf.getOption("spark.home"))
@@ -389,9 +394,9 @@
       }
       val cmdExecutable = new File(executorSparkHome, "./bin/spark-submit").getCanonicalPath
       val cmdJar = desc.jarUrl.split("/").last
-      s"$cmdExecutable ${cmdOptions.mkString(" ")} $cmdJar $appArguments"
+      (cmdExecutable, cmdJar)
     }
-    builder.setValue(cmd)
+    builder.setValue(s"$executable $cmdOptions $jar $appArguments")
     builder.setEnvironment(envBuilder.build())
     builder.build()
   }
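
For illustration, a minimal standalone sketch of the three-way (executable, jar) choice the hunks above introduce. The names are stand-ins rather than the scheduler's actual fields, and the fallback assumes a Spark installation already present on the slave.

    // Simplified stand-in for the (executable, jar) selection in the diff.
    def driverCommand(
        jarUrl: String,
        dockerImage: Option[String],
        executorUri: Option[String],
        sparkHome: String): (String, String) = {
      val jarName = jarUrl.split("/").last
      if (dockerImage.isDefined) {
        // Mesos downloads the jar into the sandbox, which is mounted into the
        // container at the path held by the $MESOS_SANDBOX environment variable.
        ("./bin/spark-submit", s"$$MESOS_SANDBOX/$jarName")
      } else if (executorUri.isDefined) {
        // The Spark tarball is fetched and unpacked in the sandbox; the jar
        // then sits one level above the unpacked distribution directory.
        val folder = executorUri.get.split('/').last.split('.').head
        (s"cd $folder*; bin/spark-submit", s"../$jarName")
      } else {
        // Fall back to a Spark home configured on the slave itself.
        (new java.io.File(sparkHome, "./bin/spark-submit").getCanonicalPath, jarName)
      }
    }
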
@@ -458,9 +463,20 @@
         .setCommand(commandInfo)
         .addResources(cpuResource)
         .addResources(memResource)
-        .build()
+      submission.schedulerProperties.get("spark.mesos.executor.docker.image").foreach { image =>
+        val container = taskInfo.getContainerBuilder()
+        val volumes = submission.schedulerProperties
+          .get("spark.mesos.executor.docker.volumes")
+          .map(MesosSchedulerBackendUtil.parseVolumesSpec)
+        val portmaps = submission.schedulerProperties
+          .get("spark.mesos.executor.docker.portmaps")
+          .map(MesosSchedulerBackendUtil.parsePortMappingsSpec)
+        MesosSchedulerBackendUtil.addDockerInfo(
+          container, image, volumes = volumes, portmaps = portmaps)
+        taskInfo.setContainer(container.build())
+      }
       val queuedTasks = tasks.getOrElseUpdate(offer.offer.getId, new ArrayBuffer[TaskInfo])
-      queuedTasks += taskInfo
+      queuedTasks += taskInfo.build()
       logTrace(s"Using offer ${offer.offer.getId.getValue} to launch driver " +
         submission.submissionId)
       val newState = new MesosClusterSubmissionState(submission, taskId, offer.offer.getSlaveId,
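
For context, a rough sketch of the ContainerInfo that MesosSchedulerBackendUtil.addDockerInfo plausibly assembles from these settings. The helper itself is not part of this diff, so the shape below is an assumption based on the Mesos protobuf API; parsed port mappings would attach to the DockerInfo builder in the same style.

    import org.apache.mesos.Protos.{ContainerInfo, Volume}

    // Assumed shape: a DOCKER-type ContainerInfo carrying the image plus any
    // volumes parsed from spark.mesos.executor.docker.volumes.
    def dockerContainer(image: String, volumes: Seq[Volume]): ContainerInfo = {
      val docker = ContainerInfo.DockerInfo.newBuilder().setImage(image)
      val container = ContainerInfo.newBuilder()
        .setType(ContainerInfo.Type.DOCKER)
        .setDocker(docker.build())
      volumes.foreach(v => container.addVolumes(v))
      container.build()
    }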