[SPARK-19740][MESOS] Add support in Spark to pass arbitrary parameters into docker when running on mesos with docker containerizer
## What changes were proposed in this pull request? Allow passing arbitrary parameters into docker when launching Spark executors on Mesos with the docker containerizer. tnachen ## How was this patch tested? Manually built and tested with passed-in parameters. Author: Ji Yan <jiyan@Jis-MacBook-Air.local> Closes #17109 from yanji84/ji/allow_set_docker_user.
This commit is contained in:
parent
e090f3c0ce
commit
a888fed309
|
@ -356,6 +356,16 @@ See the [configuration page](configuration.html) for information on Spark config
|
|||
By default Mesos agents will not pull images they already have cached.
|
||||
</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td><code>spark.mesos.executor.docker.parameters</code></td>
|
||||
<td>(none)</td>
|
||||
<td>
|
||||
Set the list of custom parameters which will be passed into the <code>docker run</code> command when launching the Spark executor on Mesos using the docker containerizer. The format of this property is a comma-separated list of
|
||||
key/value pairs. Example:
|
||||
|
||||
<pre>key1=val1,key2=val2,key3=val3</pre>
|
||||
</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td><code>spark.mesos.executor.docker.volumes</code></td>
|
||||
<td>(none)</td>
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
|
||||
package org.apache.spark.scheduler.cluster.mesos
|
||||
|
||||
import org.apache.mesos.Protos.{ContainerInfo, Image, NetworkInfo, Volume}
|
||||
import org.apache.mesos.Protos.{ContainerInfo, Image, NetworkInfo, Parameter, Volume}
|
||||
import org.apache.mesos.Protos.ContainerInfo.{DockerInfo, MesosInfo}
|
||||
|
||||
import org.apache.spark.{SparkConf, SparkException}
|
||||
|
@ -99,6 +99,28 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
|
|||
.toList
|
||||
}
|
||||
|
||||
/**
 * Parse a comma-separated list of docker parameters, each of which
 * takes the form key=value. Malformed entries (no '=') are logged
 * and skipped rather than failing the whole launch.
 */
private def parseParamsSpec(params: String): List[Parameter] = {
  // split with limit of 2 to avoid parsing error when '='
  // exists in the parameter value
  params.split(",").map(_.split("=", 2)).flatMap { spec: Array[String] =>
    spec match {
      case Array(key, value) =>
        // Only allocate a builder for well-formed key=value pairs.
        Some(Parameter.newBuilder().setKey(key).setValue(value))
      case _ =>
        logWarning(s"Unable to parse arbitrary parameters: $params. "
          + "Expected form: \"key=value(, ...)\"")
        None
    }
  }
  .map { _.build() }
  .toList
}
|
||||
|
||||
def containerInfo(conf: SparkConf): ContainerInfo = {
|
||||
val containerType = if (conf.contains("spark.mesos.executor.docker.image") &&
|
||||
conf.get("spark.mesos.containerizer", "docker") == "docker") {
|
||||
|
@ -120,8 +142,14 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
|
|||
.map(parsePortMappingsSpec)
|
||||
.getOrElse(List.empty)
|
||||
|
||||
val params = conf
|
||||
.getOption("spark.mesos.executor.docker.parameters")
|
||||
.map(parseParamsSpec)
|
||||
.getOrElse(List.empty)
|
||||
|
||||
if (containerType == ContainerInfo.Type.DOCKER) {
|
||||
containerInfo.setDocker(dockerInfo(image, forcePullImage, portMaps))
|
||||
containerInfo
|
||||
.setDocker(dockerInfo(image, forcePullImage, portMaps, params))
|
||||
} else {
|
||||
containerInfo.setMesos(mesosInfo(image, forcePullImage))
|
||||
}
|
||||
|
@ -144,11 +172,13 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
|
|||
/**
 * Build the DockerInfo for an executor container, attaching the given
 * port mappings and arbitrary `docker run` parameters.
 *
 * @param image docker image name the executor runs in
 * @param forcePullImage whether the agent must re-pull the image even if cached
 * @param portMaps host/container port mappings to expose
 * @param params extra `docker run` parameters (from
 *               spark.mesos.executor.docker.parameters)
 */
private def dockerInfo(
    image: String,
    forcePullImage: Boolean,
    portMaps: List[ContainerInfo.DockerInfo.PortMapping],
    params: List[Parameter]): DockerInfo = {
  val dockerBuilder = ContainerInfo.DockerInfo.newBuilder()
    .setImage(image)
    .setForcePullImage(forcePullImage)
  portMaps.foreach(dockerBuilder.addPortMappings(_))
  params.foreach(dockerBuilder.addParameters(_))

  dockerBuilder.build
}
|
||||
|
|
|
@ -0,0 +1,53 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.spark.scheduler.cluster.mesos
|
||||
|
||||
import org.scalatest._
|
||||
import org.scalatest.mock.MockitoSugar
|
||||
|
||||
import org.apache.spark.{SparkConf, SparkFunSuite}
|
||||
|
||||
/**
 * Tests for docker-parameter parsing in [[MesosSchedulerBackendUtil]]:
 * well-formed key=value lists are forwarded to the DockerInfo, and
 * malformed entries are dropped.
 */
class MesosSchedulerBackendUtilSuite extends SparkFunSuite {

  test("ContainerInfo fails to parse invalid docker parameters") {
    // "a,b" has no '=' separators, so neither entry is a valid parameter.
    val conf = new SparkConf()
      .set("spark.mesos.executor.docker.parameters", "a,b")
      .set("spark.mesos.executor.docker.image", "test")

    val parameters = MesosSchedulerBackendUtil
      .containerInfo(conf)
      .getDocker
      .getParametersList

    assert(parameters.size() == 0)
  }

  test("ContainerInfo parses docker parameters") {
    val conf = new SparkConf()
      .set("spark.mesos.executor.docker.parameters", "a=1,b=2,c=3")
      .set("spark.mesos.executor.docker.image", "test")

    val parameters = MesosSchedulerBackendUtil
      .containerInfo(conf)
      .getDocker
      .getParametersList

    assert(parameters.size() == 3)
    // Order of parameters must match the order in the config string.
    val expected = Seq("a" -> "1", "b" -> "2", "c" -> "3")
    expected.zipWithIndex.foreach { case ((key, value), i) =>
      assert(parameters.get(i).getKey == key)
      assert(parameters.get(i).getValue == value)
    }
  }
}
|
Loading…
Reference in a new issue