From a888fed3099e84c2cf45e9419f684a3658ada19d Mon Sep 17 00:00:00 2001 From: Ji Yan Date: Sun, 16 Apr 2017 14:34:12 +0100 Subject: [PATCH] [SPARK-19740][MESOS] Add support in Spark to pass arbitrary parameters into docker when running on mesos with docker containerizer ## What changes were proposed in this pull request? Allow passing in arbitrary parameters into docker when launching spark executors on mesos with docker containerizer tnachen ## How was this patch tested? Manually built and tested with passed in parameter Author: Ji Yan Closes #17109 from yanji84/ji/allow_set_docker_user. --- docs/running-on-mesos.md | 10 ++++ .../mesos/MesosSchedulerBackendUtil.scala | 36 +++++++++++-- .../MesosSchedulerBackendUtilSuite.scala | 53 +++++++++++++++++++ 3 files changed, 96 insertions(+), 3 deletions(-) create mode 100644 resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtilSuite.scala diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md index ef01cfe4b9..314a806edf 100644 --- a/docs/running-on-mesos.md +++ b/docs/running-on-mesos.md @@ -356,6 +356,16 @@ See the [configuration page](configuration.html) for information on Spark config By default Mesos agents will not pull images they already have cached. + + spark.mesos.executor.docker.parameters + (none) + + Set the list of custom parameters which will be passed into the docker run command when launching the Spark executor on Mesos using the docker containerizer. The format of this property is a comma-separated list of + key/value pairs. Example: + +
key1=val1,key2=val2,key3=val3
+ + spark.mesos.executor.docker.volumes (none) diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala index a2adb228dc..fbcbc55099 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala @@ -17,7 +17,7 @@ package org.apache.spark.scheduler.cluster.mesos -import org.apache.mesos.Protos.{ContainerInfo, Image, NetworkInfo, Volume} +import org.apache.mesos.Protos.{ContainerInfo, Image, NetworkInfo, Parameter, Volume} import org.apache.mesos.Protos.ContainerInfo.{DockerInfo, MesosInfo} import org.apache.spark.{SparkConf, SparkException} @@ -99,6 +99,28 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging { .toList } + /** + * Parse a list of docker parameters, each of which + * takes the form key=value + */ + private def parseParamsSpec(params: String): List[Parameter] = { + // split with limit of 2 to avoid parsing error when '=' + // exists in the parameter value + params.split(",").map(_.split("=", 2)).flatMap { spec: Array[String] => + val param: Parameter.Builder = Parameter.newBuilder() + spec match { + case Array(key, value) => + Some(param.setKey(key).setValue(value)) + case spec => + logWarning(s"Unable to parse arbitary parameters: $params. " + + "Expected form: \"key=value(, ...)\"") + None + } + } + .map { _.build() } + .toList + } + def containerInfo(conf: SparkConf): ContainerInfo = { val containerType = if (conf.contains("spark.mesos.executor.docker.image") && conf.get("spark.mesos.containerizer", "docker") == "docker") { @@ -120,8 +142,14 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging { .map(parsePortMappingsSpec) .getOrElse(List.empty) + val params = conf + .getOption("spark.mesos.executor.docker.parameters") + .map(parseParamsSpec) + .getOrElse(List.empty) + if (containerType == ContainerInfo.Type.DOCKER) { - containerInfo.setDocker(dockerInfo(image, forcePullImage, portMaps)) + containerInfo + .setDocker(dockerInfo(image, forcePullImage, portMaps, params)) } else { containerInfo.setMesos(mesosInfo(image, forcePullImage)) } @@ -144,11 +172,13 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging { private def dockerInfo( image: String, forcePullImage: Boolean, - portMaps: List[ContainerInfo.DockerInfo.PortMapping]): DockerInfo = { + portMaps: List[ContainerInfo.DockerInfo.PortMapping], + params: List[Parameter]): DockerInfo = { val dockerBuilder = ContainerInfo.DockerInfo.newBuilder() .setImage(image) .setForcePullImage(forcePullImage) portMaps.foreach(dockerBuilder.addPortMappings(_)) + params.foreach(dockerBuilder.addParameters(_)) dockerBuilder.build } diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtilSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtilSuite.scala new file mode 100644 index 0000000000..caf9d89fdd --- /dev/null +++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtilSuite.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster.mesos + +import org.scalatest._ +import org.scalatest.mock.MockitoSugar + +import org.apache.spark.{SparkConf, SparkFunSuite} + +class MesosSchedulerBackendUtilSuite extends SparkFunSuite { + + test("ContainerInfo fails to parse invalid docker parameters") { + val conf = new SparkConf() + conf.set("spark.mesos.executor.docker.parameters", "a,b") + conf.set("spark.mesos.executor.docker.image", "test") + + val containerInfo = MesosSchedulerBackendUtil.containerInfo(conf) + val params = containerInfo.getDocker.getParametersList + + assert(params.size() == 0) + } + + test("ContainerInfo parses docker parameters") { + val conf = new SparkConf() + conf.set("spark.mesos.executor.docker.parameters", "a=1,b=2,c=3") + conf.set("spark.mesos.executor.docker.image", "test") + + val containerInfo = MesosSchedulerBackendUtil.containerInfo(conf) + val params = containerInfo.getDocker.getParametersList + assert(params.size() == 3) + assert(params.get(0).getKey == "a") + assert(params.get(0).getValue == "1") + assert(params.get(1).getKey == "b") + assert(params.get(1).getValue == "2") + assert(params.get(2).getKey == "c") + assert(params.get(2).getValue == "3") + } +}