Revert "[SPARK-15271][MESOS] Allow force pulling executor docker images"
This reverts commit 978cd5f125
.
This commit is contained in:
parent
3b6e1d094e
commit
fc17121d59
|
@ -537,10 +537,16 @@ private[spark] class MesosClusterScheduler(
|
|||
.addAllResources(memResourcesToUse.asJava)
|
||||
offer.resources = finalResources.asJava
|
||||
submission.schedulerProperties.get("spark.mesos.executor.docker.image").foreach { image =>
|
||||
MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
|
||||
image,
|
||||
submission.schedulerProperties.get,
|
||||
taskInfo.getContainerBuilder())
|
||||
val container = taskInfo.getContainerBuilder()
|
||||
val volumes = submission.schedulerProperties
|
||||
.get("spark.mesos.executor.docker.volumes")
|
||||
.map(MesosSchedulerBackendUtil.parseVolumesSpec)
|
||||
val portmaps = submission.schedulerProperties
|
||||
.get("spark.mesos.executor.docker.portmaps")
|
||||
.map(MesosSchedulerBackendUtil.parsePortMappingsSpec)
|
||||
MesosSchedulerBackendUtil.addDockerInfo(
|
||||
container, image, volumes = volumes, portmaps = portmaps)
|
||||
taskInfo.setContainer(container.build())
|
||||
}
|
||||
val queuedTasks = tasks.getOrElseUpdate(offer.offerId, new ArrayBuffer[TaskInfo])
|
||||
queuedTasks += taskInfo.build()
|
||||
|
|
|
@ -408,11 +408,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
|
|||
.addAllResources(memResourcesToUse.asJava)
|
||||
|
||||
sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
|
||||
MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
|
||||
image,
|
||||
sc.conf.getOption,
|
||||
taskBuilder.getContainerBuilder
|
||||
)
|
||||
MesosSchedulerBackendUtil
|
||||
.setupContainerBuilderDockerInfo(image, sc.conf, taskBuilder.getContainerBuilder)
|
||||
}
|
||||
|
||||
tasks(offer.getId) ::= taskBuilder.build()
|
||||
|
|
|
@ -151,11 +151,8 @@ private[spark] class MesosFineGrainedSchedulerBackend(
|
|||
.setData(ByteString.copyFrom(createExecArg()))
|
||||
|
||||
sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
|
||||
MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
|
||||
image,
|
||||
sc.conf.getOption,
|
||||
executorInfo.getContainerBuilder()
|
||||
)
|
||||
MesosSchedulerBackendUtil
|
||||
.setupContainerBuilderDockerInfo(image, sc.conf, executorInfo.getContainerBuilder())
|
||||
}
|
||||
|
||||
(executorInfo.build(), resourcesAfterMem.asJava)
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.spark.scheduler.cluster.mesos
|
|||
import org.apache.mesos.Protos.{ContainerInfo, Volume}
|
||||
import org.apache.mesos.Protos.ContainerInfo.DockerInfo
|
||||
|
||||
import org.apache.spark.SparkConf
|
||||
import org.apache.spark.internal.Logging
|
||||
|
||||
/**
|
||||
|
@ -104,14 +105,11 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
|
|||
def addDockerInfo(
|
||||
container: ContainerInfo.Builder,
|
||||
image: String,
|
||||
forcePullImage: Boolean = false,
|
||||
volumes: Option[List[Volume]] = None,
|
||||
network: Option[ContainerInfo.DockerInfo.Network] = None,
|
||||
portmaps: Option[List[ContainerInfo.DockerInfo.PortMapping]] = None): Unit = {
|
||||
|
||||
val docker = ContainerInfo.DockerInfo.newBuilder()
|
||||
.setImage(image)
|
||||
.setForcePullImage(forcePullImage)
|
||||
val docker = ContainerInfo.DockerInfo.newBuilder().setImage(image)
|
||||
|
||||
network.foreach(docker.setNetwork)
|
||||
portmaps.foreach(_.foreach(docker.addPortMappings))
|
||||
|
@ -121,23 +119,21 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
|
|||
}
|
||||
|
||||
/**
|
||||
* Setup a docker containerizer from MesosDriverDescription scheduler properties
|
||||
* Setup a docker containerizer
|
||||
*/
|
||||
def setupContainerBuilderDockerInfo(
|
||||
imageName: String,
|
||||
conf: String => Option[String],
|
||||
conf: SparkConf,
|
||||
builder: ContainerInfo.Builder): Unit = {
|
||||
val forcePullImage = conf("spark.mesos.executor.docker.forcePullImage")
|
||||
.exists(_.equals("true"))
|
||||
val volumes = conf("spark.mesos.executor.docker.volumes")
|
||||
val volumes = conf
|
||||
.getOption("spark.mesos.executor.docker.volumes")
|
||||
.map(parseVolumesSpec)
|
||||
val portmaps = conf("spark.mesos.executor.docker.portmaps")
|
||||
val portmaps = conf
|
||||
.getOption("spark.mesos.executor.docker.portmaps")
|
||||
.map(parsePortMappingsSpec)
|
||||
|
||||
addDockerInfo(
|
||||
builder,
|
||||
imageName,
|
||||
forcePullImage = forcePullImage,
|
||||
volumes = volumes,
|
||||
portmaps = portmaps)
|
||||
logDebug("setupContainerDockerInfo: using docker image: " + imageName)
|
||||
|
|
|
@ -252,69 +252,6 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
|
|||
backend.start()
|
||||
}
|
||||
|
||||
test("docker settings are reflected in created tasks") {
|
||||
setBackend(Map(
|
||||
"spark.mesos.executor.docker.image" -> "some_image",
|
||||
"spark.mesos.executor.docker.forcePullImage" -> "true",
|
||||
"spark.mesos.executor.docker.volumes" -> "/host_vol:/container_vol:ro",
|
||||
"spark.mesos.executor.docker.portmaps" -> "8080:80:tcp"
|
||||
))
|
||||
|
||||
val (mem, cpu) = (backend.executorMemory(sc), 4)
|
||||
|
||||
val offer1 = createOffer("o1", "s1", mem, cpu)
|
||||
backend.resourceOffers(driver, List(offer1).asJava)
|
||||
|
||||
val launchedTasks = verifyTaskLaunched("o1").asScala
|
||||
assert(launchedTasks.size == 1)
|
||||
|
||||
val containerInfo = launchedTasks.head.getContainer
|
||||
assert(containerInfo.getType == ContainerInfo.Type.DOCKER)
|
||||
|
||||
val volumes = containerInfo.getVolumesList.asScala
|
||||
assert(volumes.size == 1)
|
||||
|
||||
val volume = volumes.head
|
||||
assert(volume.getHostPath == "/host_vol")
|
||||
assert(volume.getContainerPath == "/container_vol")
|
||||
assert(volume.getMode == Volume.Mode.RO)
|
||||
|
||||
val dockerInfo = containerInfo.getDocker
|
||||
|
||||
assert(dockerInfo.getImage == "some_image")
|
||||
assert(dockerInfo.getForcePullImage)
|
||||
|
||||
val portMappings = dockerInfo.getPortMappingsList.asScala
|
||||
assert(portMappings.size == 1)
|
||||
|
||||
val portMapping = portMappings.head
|
||||
assert(portMapping.getHostPort == 8080)
|
||||
assert(portMapping.getContainerPort == 80)
|
||||
assert(portMapping.getProtocol == "tcp")
|
||||
}
|
||||
|
||||
test("force-pull-image option is disabled by default") {
|
||||
setBackend(Map(
|
||||
"spark.mesos.executor.docker.image" -> "some_image"
|
||||
))
|
||||
|
||||
val (mem, cpu) = (backend.executorMemory(sc), 4)
|
||||
|
||||
val offer1 = createOffer("o1", "s1", mem, cpu)
|
||||
backend.resourceOffers(driver, List(offer1).asJava)
|
||||
|
||||
val launchedTasks = verifyTaskLaunched("o1").asScala
|
||||
assert(launchedTasks.size == 1)
|
||||
|
||||
val containerInfo = launchedTasks.head.getContainer
|
||||
assert(containerInfo.getType == ContainerInfo.Type.DOCKER)
|
||||
|
||||
val dockerInfo = containerInfo.getDocker
|
||||
|
||||
assert(dockerInfo.getImage == "some_image")
|
||||
assert(!dockerInfo.getForcePullImage)
|
||||
}
|
||||
|
||||
private def verifyDeclinedOffer(driver: SchedulerDriver,
|
||||
offerId: OfferID,
|
||||
filter: Boolean = false): Unit = {
|
||||
|
|
|
@ -150,7 +150,6 @@ class MesosFineGrainedSchedulerBackendSuite
|
|||
|
||||
val conf = new SparkConf()
|
||||
.set("spark.mesos.executor.docker.image", "spark/mock")
|
||||
.set("spark.mesos.executor.docker.forcePullImage", "true")
|
||||
.set("spark.mesos.executor.docker.volumes", "/a,/b:/b,/c:/c:rw,/d:ro,/e:/e:ro")
|
||||
.set("spark.mesos.executor.docker.portmaps", "80:8080,53:53:tcp")
|
||||
|
||||
|
@ -170,7 +169,6 @@ class MesosFineGrainedSchedulerBackendSuite
|
|||
val (execInfo, _) = backend.createExecutorInfo(
|
||||
Arrays.asList(backend.createResource("cpus", 4)), "mockExecutor")
|
||||
assert(execInfo.getContainer.getDocker.getImage.equals("spark/mock"))
|
||||
assert(execInfo.getContainer.getDocker.getForcePullImage.equals(true))
|
||||
val portmaps = execInfo.getContainer.getDocker.getPortMappingsList
|
||||
assert(portmaps.get(0).getHostPort.equals(80))
|
||||
assert(portmaps.get(0).getContainerPort.equals(8080))
|
||||
|
|
|
@ -116,7 +116,7 @@ libfb303-0.9.2.jar
|
|||
libthrift-0.9.2.jar
|
||||
log4j-1.2.17.jar
|
||||
lz4-1.3.0.jar
|
||||
mesos-0.22.2-shaded-protobuf.jar
|
||||
mesos-0.21.1-shaded-protobuf.jar
|
||||
metrics-core-3.1.2.jar
|
||||
metrics-graphite-3.1.2.jar
|
||||
metrics-json-3.1.2.jar
|
||||
|
|
|
@ -122,7 +122,7 @@ libthrift-0.9.2.jar
|
|||
log4j-1.2.17.jar
|
||||
lz4-1.3.0.jar
|
||||
mail-1.4.7.jar
|
||||
mesos-0.22.2-shaded-protobuf.jar
|
||||
mesos-0.21.1-shaded-protobuf.jar
|
||||
metrics-core-3.1.2.jar
|
||||
metrics-graphite-3.1.2.jar
|
||||
metrics-json-3.1.2.jar
|
||||
|
|
|
@ -122,7 +122,7 @@ libthrift-0.9.2.jar
|
|||
log4j-1.2.17.jar
|
||||
lz4-1.3.0.jar
|
||||
mail-1.4.7.jar
|
||||
mesos-0.22.2-shaded-protobuf.jar
|
||||
mesos-0.21.1-shaded-protobuf.jar
|
||||
metrics-core-3.1.2.jar
|
||||
metrics-graphite-3.1.2.jar
|
||||
metrics-json-3.1.2.jar
|
||||
|
|
|
@ -130,7 +130,7 @@ libthrift-0.9.2.jar
|
|||
log4j-1.2.17.jar
|
||||
lz4-1.3.0.jar
|
||||
mail-1.4.7.jar
|
||||
mesos-0.22.2-shaded-protobuf.jar
|
||||
mesos-0.21.1-shaded-protobuf.jar
|
||||
metrics-core-3.1.2.jar
|
||||
metrics-graphite-3.1.2.jar
|
||||
metrics-json-3.1.2.jar
|
||||
|
|
|
@ -131,7 +131,7 @@ libthrift-0.9.2.jar
|
|||
log4j-1.2.17.jar
|
||||
lz4-1.3.0.jar
|
||||
mail-1.4.7.jar
|
||||
mesos-0.22.2-shaded-protobuf.jar
|
||||
mesos-0.21.1-shaded-protobuf.jar
|
||||
metrics-core-3.1.2.jar
|
||||
metrics-graphite-3.1.2.jar
|
||||
metrics-json-3.1.2.jar
|
||||
|
|
|
@ -18,6 +18,6 @@ SPARK_VERSION: 2.1.0-SNAPSHOT
|
|||
SPARK_VERSION_SHORT: 2.1.0
|
||||
SCALA_BINARY_VERSION: "2.11"
|
||||
SCALA_VERSION: "2.11.7"
|
||||
MESOS_VERSION: 0.22.0
|
||||
MESOS_VERSION: 0.21.0
|
||||
SPARK_ISSUE_TRACKER_URL: https://issues.apache.org/jira/browse/SPARK
|
||||
SPARK_GITHUB_URL: https://github.com/apache/spark
|
||||
|
|
|
@ -260,10 +260,6 @@ have Mesos download Spark via the usual methods.
|
|||
|
||||
Requires Mesos version 0.20.1 or later.
|
||||
|
||||
Note that by default Mesos agents will not pull the image if it already exists on the agent. If you use mutable image
|
||||
tags you can set `spark.mesos.executor.docker.forcePullImage` to `true` in order to force the agent to always pull the
|
||||
image before running the executor. Force pulling images is only available in Mesos version 0.22 and above.
|
||||
|
||||
# Running Alongside Hadoop
|
||||
|
||||
You can run Spark and Mesos alongside your existing Hadoop cluster by just launching them as a
|
||||
|
@ -338,14 +334,6 @@ See the [configuration page](configuration.html) for information on Spark config
|
|||
the installed path of the Mesos library can be specified with <code>spark.executorEnv.MESOS_NATIVE_JAVA_LIBRARY</code>.
|
||||
</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td><code>spark.mesos.executor.docker.forcePullImage</code></td>
|
||||
<td>false</td>
|
||||
<td>
|
||||
Force Mesos agents to pull the image specified in <code>spark.mesos.executor.docker.image</code>.
|
||||
By default Mesos agents will not pull images they already have cached.
|
||||
</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td><code>spark.mesos.executor.docker.volumes</code></td>
|
||||
<td>(none)</td>
|
||||
|
|
2
pom.xml
2
pom.xml
|
@ -119,7 +119,7 @@
|
|||
<java.version>1.7</java.version>
|
||||
<maven.version>3.3.9</maven.version>
|
||||
<sbt.project.name>spark</sbt.project.name>
|
||||
<mesos.version>0.22.2</mesos.version>
|
||||
<mesos.version>0.21.1</mesos.version>
|
||||
<mesos.classifier>shaded-protobuf</mesos.classifier>
|
||||
<slf4j.version>1.7.16</slf4j.version>
|
||||
<log4j.version>1.2.17</log4j.version>
|
||||
|
|
Loading…
Reference in a new issue