[SPARK-25877][K8S] Move all feature logic to feature classes.

This change makes the driver and executor builders a lot simpler
by encapsulating almost all feature logic into the respective
feature classes. The only logic that remains is the creation of
the initial pod, which needs to happen before anything else so
is better to be left in the builder class.

Most feature classes already behave fine when the config has nothing
they should handle, but a few minor tweaks had to be added. Unit
tests were also updated or added to account for these.

The builder suites were simplified a lot and just test the remaining
pod-related code in the builders themselves.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #23220 from vanzin/SPARK-25877.
This commit is contained in:
Marcelo Vanzin 2018-12-12 12:01:21 -08:00 committed by mcheah
parent 570b8f3d45
commit a63e7b2a21
16 changed files with 342 additions and 675 deletions

View file

@ -31,10 +31,10 @@ private[spark] class HadoopConfExecutorFeatureStep(conf: KubernetesExecutorConf)
override def configurePod(pod: SparkPod): SparkPod = {
val hadoopConfDirCMapName = conf.getOption(HADOOP_CONFIG_MAP_NAME)
require(hadoopConfDirCMapName.isDefined,
"Ensure that the env `HADOOP_CONF_DIR` is defined either in the client or " +
" using pre-existing ConfigMaps")
logInfo("HADOOP_CONF_DIR defined")
HadoopBootstrapUtil.bootstrapHadoopConfDir(None, None, hadoopConfDirCMapName, pod)
if (hadoopConfDirCMapName.isDefined) {
HadoopBootstrapUtil.bootstrapHadoopConfDir(None, None, hadoopConfDirCMapName, pod)
} else {
pod
}
}
}

View file

@ -28,7 +28,8 @@ private[spark] class HadoopSparkUserExecutorFeatureStep(conf: KubernetesExecutor
extends KubernetesFeatureConfigStep {
override def configurePod(pod: SparkPod): SparkPod = {
val sparkUserName = conf.get(KERBEROS_SPARK_USER_NAME)
HadoopBootstrapUtil.bootstrapSparkUserPod(sparkUserName, pod)
conf.getOption(KERBEROS_SPARK_USER_NAME).map { user =>
HadoopBootstrapUtil.bootstrapSparkUserPod(user, pod)
}.getOrElse(pod)
}
}

View file

@ -27,18 +27,20 @@ import org.apache.spark.internal.Logging
private[spark] class KerberosConfExecutorFeatureStep(conf: KubernetesExecutorConf)
extends KubernetesFeatureConfigStep with Logging {
private val maybeKrb5CMap = conf.getOption(KRB5_CONFIG_MAP_NAME)
require(maybeKrb5CMap.isDefined, "HADOOP_CONF_DIR ConfigMap not found")
override def configurePod(pod: SparkPod): SparkPod = {
logInfo(s"Mounting Resources for Kerberos")
HadoopBootstrapUtil.bootstrapKerberosPod(
conf.get(KERBEROS_DT_SECRET_NAME),
conf.get(KERBEROS_DT_SECRET_KEY),
conf.get(KERBEROS_SPARK_USER_NAME),
None,
None,
maybeKrb5CMap,
pod)
val maybeKrb5CMap = conf.getOption(KRB5_CONFIG_MAP_NAME)
if (maybeKrb5CMap.isDefined) {
logInfo(s"Mounting Resources for Kerberos")
HadoopBootstrapUtil.bootstrapKerberosPod(
conf.get(KERBEROS_DT_SECRET_NAME),
conf.get(KERBEROS_DT_SECRET_KEY),
conf.get(KERBEROS_SPARK_USER_NAME),
None,
None,
maybeKrb5CMap,
pod)
} else {
pod
}
}
}

View file

@ -28,44 +28,60 @@ import org.apache.spark.deploy.k8s.Constants._
private[spark] class PodTemplateConfigMapStep(conf: KubernetesConf)
extends KubernetesFeatureConfigStep {
def configurePod(pod: SparkPod): SparkPod = {
val podWithVolume = new PodBuilder(pod.pod)
.editSpec()
.addNewVolume()
.withName(POD_TEMPLATE_VOLUME)
.withNewConfigMap()
.withName(POD_TEMPLATE_CONFIGMAP)
.addNewItem()
.withKey(POD_TEMPLATE_KEY)
.withPath(EXECUTOR_POD_SPEC_TEMPLATE_FILE_NAME)
.endItem()
.endConfigMap()
.endVolume()
.endSpec()
.build()
val containerWithVolume = new ContainerBuilder(pod.container)
.addNewVolumeMount()
.withName(POD_TEMPLATE_VOLUME)
.withMountPath(EXECUTOR_POD_SPEC_TEMPLATE_MOUNTPATH)
.endVolumeMount()
.build()
SparkPod(podWithVolume, containerWithVolume)
private val hasTemplate = conf.contains(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE)
def configurePod(pod: SparkPod): SparkPod = {
if (hasTemplate) {
val podWithVolume = new PodBuilder(pod.pod)
.editSpec()
.addNewVolume()
.withName(POD_TEMPLATE_VOLUME)
.withNewConfigMap()
.withName(POD_TEMPLATE_CONFIGMAP)
.addNewItem()
.withKey(POD_TEMPLATE_KEY)
.withPath(EXECUTOR_POD_SPEC_TEMPLATE_FILE_NAME)
.endItem()
.endConfigMap()
.endVolume()
.endSpec()
.build()
val containerWithVolume = new ContainerBuilder(pod.container)
.addNewVolumeMount()
.withName(POD_TEMPLATE_VOLUME)
.withMountPath(EXECUTOR_POD_SPEC_TEMPLATE_MOUNTPATH)
.endVolumeMount()
.build()
SparkPod(podWithVolume, containerWithVolume)
} else {
pod
}
}
override def getAdditionalPodSystemProperties(): Map[String, String] = Map[String, String](
KUBERNETES_EXECUTOR_PODTEMPLATE_FILE.key ->
(EXECUTOR_POD_SPEC_TEMPLATE_MOUNTPATH + "/" + EXECUTOR_POD_SPEC_TEMPLATE_FILE_NAME))
override def getAdditionalPodSystemProperties(): Map[String, String] = {
if (hasTemplate) {
Map[String, String](
KUBERNETES_EXECUTOR_PODTEMPLATE_FILE.key ->
(EXECUTOR_POD_SPEC_TEMPLATE_MOUNTPATH + "/" + EXECUTOR_POD_SPEC_TEMPLATE_FILE_NAME))
} else {
Map.empty
}
}
override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
require(conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).isDefined)
val podTemplateFile = conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).get
val podTemplateString = Files.toString(new File(podTemplateFile), StandardCharsets.UTF_8)
Seq(new ConfigMapBuilder()
.withNewMetadata()
.withName(POD_TEMPLATE_CONFIGMAP)
.endMetadata()
.addToData(POD_TEMPLATE_KEY, podTemplateString)
.build())
if (hasTemplate) {
val podTemplateFile = conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).get
val podTemplateString = Files.toString(new File(podTemplateFile), StandardCharsets.UTF_8)
Seq(new ConfigMapBuilder()
.withNewMetadata()
.withName(POD_TEMPLATE_CONFIGMAP)
.endMetadata()
.addToData(POD_TEMPLATE_KEY, podTemplateString)
.build())
} else {
Nil
}
}
}

View file

@ -104,7 +104,7 @@ private[spark] class Client(
watcher: LoggingPodStatusWatcher) extends Logging {
def run(): Unit = {
val resolvedDriverSpec = builder.buildFromFeatures(conf)
val resolvedDriverSpec = builder.buildFromFeatures(conf, kubernetesClient)
val configMapName = s"${conf.resourceNamePrefix}-driver-conf-map"
val configMap = buildConfigMap(configMapName, resolvedDriverSpec.systemProperties)
// The include of the ENV_VAR for "SPARK_CONF_DIR" is to allow for the
@ -232,7 +232,7 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
None)) { kubernetesClient =>
val client = new Client(
kubernetesConf,
KubernetesDriverBuilder(kubernetesClient, kubernetesConf.sparkConf),
new KubernetesDriverBuilder(),
kubernetesClient,
waitForAppCompletion,
watcher)

View file

@ -20,90 +20,49 @@ import java.io.File
import io.fabric8.kubernetes.client.KubernetesClient
import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s._
import org.apache.spark.deploy.k8s.features._
private[spark] class KubernetesDriverBuilder(
provideBasicStep: (KubernetesDriverConf => BasicDriverFeatureStep) =
new BasicDriverFeatureStep(_),
provideCredentialsStep: (KubernetesDriverConf => DriverKubernetesCredentialsFeatureStep) =
new DriverKubernetesCredentialsFeatureStep(_),
provideServiceStep: (KubernetesDriverConf => DriverServiceFeatureStep) =
new DriverServiceFeatureStep(_),
provideSecretsStep: (KubernetesConf => MountSecretsFeatureStep) =
new MountSecretsFeatureStep(_),
provideEnvSecretsStep: (KubernetesConf => EnvSecretsFeatureStep) =
new EnvSecretsFeatureStep(_),
provideLocalDirsStep: (KubernetesConf => LocalDirsFeatureStep) =
new LocalDirsFeatureStep(_),
provideVolumesStep: (KubernetesConf => MountVolumesFeatureStep) =
new MountVolumesFeatureStep(_),
provideDriverCommandStep: (KubernetesDriverConf => DriverCommandFeatureStep) =
new DriverCommandFeatureStep(_),
provideHadoopGlobalStep: (KubernetesDriverConf => KerberosConfDriverFeatureStep) =
new KerberosConfDriverFeatureStep(_),
providePodTemplateConfigMapStep: (KubernetesConf => PodTemplateConfigMapStep) =
new PodTemplateConfigMapStep(_),
provideInitialPod: () => SparkPod = () => SparkPod.initialPod) {
private[spark] class KubernetesDriverBuilder {
def buildFromFeatures(kubernetesConf: KubernetesDriverConf): KubernetesDriverSpec = {
val baseFeatures = Seq(
provideBasicStep(kubernetesConf),
provideCredentialsStep(kubernetesConf),
provideServiceStep(kubernetesConf),
provideLocalDirsStep(kubernetesConf))
def buildFromFeatures(
conf: KubernetesDriverConf,
client: KubernetesClient): KubernetesDriverSpec = {
val initialPod = conf.get(Config.KUBERNETES_DRIVER_PODTEMPLATE_FILE)
.map { file =>
KubernetesUtils.loadPodFromTemplate(
client,
new File(file),
conf.get(Config.KUBERNETES_DRIVER_PODTEMPLATE_CONTAINER_NAME))
}
.getOrElse(SparkPod.initialPod())
val secretFeature = if (kubernetesConf.secretNamesToMountPaths.nonEmpty) {
Seq(provideSecretsStep(kubernetesConf))
} else Nil
val envSecretFeature = if (kubernetesConf.secretEnvNamesToKeyRefs.nonEmpty) {
Seq(provideEnvSecretsStep(kubernetesConf))
} else Nil
val volumesFeature = if (kubernetesConf.volumes.nonEmpty) {
Seq(provideVolumesStep(kubernetesConf))
} else Nil
val podTemplateFeature = if (
kubernetesConf.get(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).isDefined) {
Seq(providePodTemplateConfigMapStep(kubernetesConf))
} else Nil
val features = Seq(
new BasicDriverFeatureStep(conf),
new DriverKubernetesCredentialsFeatureStep(conf),
new DriverServiceFeatureStep(conf),
new MountSecretsFeatureStep(conf),
new EnvSecretsFeatureStep(conf),
new LocalDirsFeatureStep(conf),
new MountVolumesFeatureStep(conf),
new DriverCommandFeatureStep(conf),
new KerberosConfDriverFeatureStep(conf),
new PodTemplateConfigMapStep(conf))
val driverCommandStep = provideDriverCommandStep(kubernetesConf)
val hadoopConfigStep = Some(provideHadoopGlobalStep(kubernetesConf))
val allFeatures: Seq[KubernetesFeatureConfigStep] =
baseFeatures ++ Seq(driverCommandStep) ++
secretFeature ++ envSecretFeature ++ volumesFeature ++
hadoopConfigStep ++ podTemplateFeature
var spec = KubernetesDriverSpec(
provideInitialPod(),
val spec = KubernetesDriverSpec(
initialPod,
driverKubernetesResources = Seq.empty,
kubernetesConf.sparkConf.getAll.toMap)
for (feature <- allFeatures) {
conf.sparkConf.getAll.toMap)
features.foldLeft(spec) { case (spec, feature) =>
val configuredPod = feature.configurePod(spec.pod)
val addedSystemProperties = feature.getAdditionalPodSystemProperties()
val addedResources = feature.getAdditionalKubernetesResources()
spec = KubernetesDriverSpec(
KubernetesDriverSpec(
configuredPod,
spec.driverKubernetesResources ++ addedResources,
spec.systemProperties ++ addedSystemProperties)
}
spec
}
}
private[spark] object KubernetesDriverBuilder {
def apply(kubernetesClient: KubernetesClient, conf: SparkConf): KubernetesDriverBuilder = {
conf.get(Config.KUBERNETES_DRIVER_PODTEMPLATE_FILE)
.map(new File(_))
.map(file => new KubernetesDriverBuilder(provideInitialPod = () =>
KubernetesUtils.loadPodFromTemplate(
kubernetesClient,
file,
conf.get(Config.KUBERNETES_DRIVER_PODTEMPLATE_CONTAINER_NAME))
))
.getOrElse(new KubernetesDriverBuilder())
}
}

View file

@ -136,7 +136,8 @@ private[spark] class ExecutorPodsAllocator(
newExecutorId.toString,
applicationId,
driverPod)
val executorPod = executorBuilder.buildFromFeatures(executorConf, secMgr)
val executorPod = executorBuilder.buildFromFeatures(executorConf, secMgr,
kubernetesClient)
val podWithAttachedContainer = new PodBuilder(executorPod.pod)
.editOrNewSpec()
.addToContainers(executorPod.container)

View file

@ -95,7 +95,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
val executorPodsAllocator = new ExecutorPodsAllocator(
sc.conf,
sc.env.securityManager,
KubernetesExecutorBuilder(kubernetesClient, sc.conf),
new KubernetesExecutorBuilder(),
kubernetesClient,
snapshotsStore,
new SystemClock())

View file

@ -20,86 +20,36 @@ import java.io.File
import io.fabric8.kubernetes.client.KubernetesClient
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.SecurityManager
import org.apache.spark.deploy.k8s._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.features._
private[spark] class KubernetesExecutorBuilder(
provideBasicStep: (KubernetesExecutorConf, SecurityManager) => BasicExecutorFeatureStep =
new BasicExecutorFeatureStep(_, _),
provideSecretsStep: (KubernetesConf => MountSecretsFeatureStep) =
new MountSecretsFeatureStep(_),
provideEnvSecretsStep: (KubernetesConf => EnvSecretsFeatureStep) =
new EnvSecretsFeatureStep(_),
provideLocalDirsStep: (KubernetesConf => LocalDirsFeatureStep) =
new LocalDirsFeatureStep(_),
provideVolumesStep: (KubernetesConf => MountVolumesFeatureStep) =
new MountVolumesFeatureStep(_),
provideHadoopConfStep: (KubernetesExecutorConf => HadoopConfExecutorFeatureStep) =
new HadoopConfExecutorFeatureStep(_),
provideKerberosConfStep: (KubernetesExecutorConf => KerberosConfExecutorFeatureStep) =
new KerberosConfExecutorFeatureStep(_),
provideHadoopSparkUserStep: (KubernetesExecutorConf => HadoopSparkUserExecutorFeatureStep) =
new HadoopSparkUserExecutorFeatureStep(_),
provideInitialPod: () => SparkPod = () => SparkPod.initialPod()) {
private[spark] class KubernetesExecutorBuilder {
def buildFromFeatures(
kubernetesConf: KubernetesExecutorConf,
secMgr: SecurityManager): SparkPod = {
val sparkConf = kubernetesConf.sparkConf
val maybeHadoopConfigMap = sparkConf.getOption(HADOOP_CONFIG_MAP_NAME)
val maybeDTSecretName = sparkConf.getOption(KERBEROS_DT_SECRET_NAME)
val maybeDTDataItem = sparkConf.getOption(KERBEROS_DT_SECRET_KEY)
conf: KubernetesExecutorConf,
secMgr: SecurityManager,
client: KubernetesClient): SparkPod = {
val initialPod = conf.get(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE)
.map { file =>
KubernetesUtils.loadPodFromTemplate(
client,
new File(file),
conf.get(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_CONTAINER_NAME))
}
.getOrElse(SparkPod.initialPod())
val baseFeatures = Seq(provideBasicStep(kubernetesConf, secMgr),
provideLocalDirsStep(kubernetesConf))
val secretFeature = if (kubernetesConf.secretNamesToMountPaths.nonEmpty) {
Seq(provideSecretsStep(kubernetesConf))
} else Nil
val secretEnvFeature = if (kubernetesConf.secretEnvNamesToKeyRefs.nonEmpty) {
Seq(provideEnvSecretsStep(kubernetesConf))
} else Nil
val volumesFeature = if (kubernetesConf.volumes.nonEmpty) {
Seq(provideVolumesStep(kubernetesConf))
} else Nil
val features = Seq(
new BasicExecutorFeatureStep(conf, secMgr),
new MountSecretsFeatureStep(conf),
new EnvSecretsFeatureStep(conf),
new LocalDirsFeatureStep(conf),
new MountVolumesFeatureStep(conf),
new HadoopConfExecutorFeatureStep(conf),
new KerberosConfExecutorFeatureStep(conf),
new HadoopSparkUserExecutorFeatureStep(conf))
val maybeHadoopConfFeatureSteps = maybeHadoopConfigMap.map { _ =>
val maybeKerberosStep =
if (maybeDTSecretName.isDefined && maybeDTDataItem.isDefined) {
provideKerberosConfStep(kubernetesConf)
} else {
provideHadoopSparkUserStep(kubernetesConf)
}
Seq(provideHadoopConfStep(kubernetesConf)) :+
maybeKerberosStep
}.getOrElse(Seq.empty[KubernetesFeatureConfigStep])
val allFeatures: Seq[KubernetesFeatureConfigStep] =
baseFeatures ++
secretFeature ++
secretEnvFeature ++
volumesFeature ++
maybeHadoopConfFeatureSteps
var executorPod = provideInitialPod()
for (feature <- allFeatures) {
executorPod = feature.configurePod(executorPod)
}
executorPod
features.foldLeft(initialPod) { case (pod, feature) => feature.configurePod(pod) }
}
}
private[spark] object KubernetesExecutorBuilder {
def apply(kubernetesClient: KubernetesClient, conf: SparkConf): KubernetesExecutorBuilder = {
conf.get(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE)
.map(new File(_))
.map(file => new KubernetesExecutorBuilder(provideInitialPod = () =>
KubernetesUtils.loadPodFromTemplate(
kubernetesClient,
file,
conf.get(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_CONTAINER_NAME))
))
.getOrElse(new KubernetesExecutorBuilder())
}
}

View file

@ -0,0 +1,177 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s
import java.io.File
import io.fabric8.kubernetes.api.model.{Config => _, _}
import io.fabric8.kubernetes.client.KubernetesClient
import io.fabric8.kubernetes.client.dsl.{MixedOperation, PodResource}
import org.mockito.Matchers.any
import org.mockito.Mockito.{mock, never, verify, when}
import scala.collection.JavaConverters._
import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
import org.apache.spark.deploy.k8s._
import org.apache.spark.internal.config.ConfigEntry
abstract class PodBuilderSuite extends SparkFunSuite {
protected def templateFileConf: ConfigEntry[_]
protected def buildPod(sparkConf: SparkConf, client: KubernetesClient): SparkPod
private val baseConf = new SparkConf(false)
.set(Config.CONTAINER_IMAGE, "spark-executor:latest")
test("use empty initial pod if template is not specified") {
val client = mock(classOf[KubernetesClient])
buildPod(baseConf.clone(), client)
verify(client, never()).pods()
}
test("load pod template if specified") {
val client = mockKubernetesClient()
val sparkConf = baseConf.clone().set(templateFileConf.key, "template-file.yaml")
val pod = buildPod(sparkConf, client)
verifyPod(pod)
}
test("complain about misconfigured pod template") {
val client = mockKubernetesClient(
new PodBuilder()
.withNewMetadata()
.addToLabels("test-label-key", "test-label-value")
.endMetadata()
.build())
val sparkConf = baseConf.clone().set(templateFileConf.key, "template-file.yaml")
val exception = intercept[SparkException] {
buildPod(sparkConf, client)
}
assert(exception.getMessage.contains("Could not load pod from template file."))
}
private def mockKubernetesClient(pod: Pod = podWithSupportedFeatures()): KubernetesClient = {
val kubernetesClient = mock(classOf[KubernetesClient])
val pods =
mock(classOf[MixedOperation[Pod, PodList, DoneablePod, PodResource[Pod, DoneablePod]]])
val podResource = mock(classOf[PodResource[Pod, DoneablePod]])
when(kubernetesClient.pods()).thenReturn(pods)
when(pods.load(any(classOf[File]))).thenReturn(podResource)
when(podResource.get()).thenReturn(pod)
kubernetesClient
}
private def verifyPod(pod: SparkPod): Unit = {
val metadata = pod.pod.getMetadata
assert(metadata.getLabels.containsKey("test-label-key"))
assert(metadata.getAnnotations.containsKey("test-annotation-key"))
assert(metadata.getNamespace === "namespace")
assert(metadata.getOwnerReferences.asScala.exists(_.getName == "owner-reference"))
val spec = pod.pod.getSpec
assert(!spec.getContainers.asScala.exists(_.getName == "executor-container"))
assert(spec.getDnsPolicy === "dns-policy")
assert(spec.getHostAliases.asScala.exists(_.getHostnames.asScala.exists(_ == "hostname")))
assert(spec.getImagePullSecrets.asScala.exists(_.getName == "local-reference"))
assert(spec.getInitContainers.asScala.exists(_.getName == "init-container"))
assert(spec.getNodeName == "node-name")
assert(spec.getNodeSelector.get("node-selector-key") === "node-selector-value")
assert(spec.getSchedulerName === "scheduler")
assert(spec.getSecurityContext.getRunAsUser === 1000L)
assert(spec.getServiceAccount === "service-account")
assert(spec.getSubdomain === "subdomain")
assert(spec.getTolerations.asScala.exists(_.getKey == "toleration-key"))
assert(spec.getVolumes.asScala.exists(_.getName == "test-volume"))
val container = pod.container
assert(container.getName === "executor-container")
assert(container.getArgs.contains("arg"))
assert(container.getCommand.equals(List("command").asJava))
assert(container.getEnv.asScala.exists(_.getName == "env-key"))
assert(container.getResources.getLimits.get("gpu") ===
new QuantityBuilder().withAmount("1").build())
assert(container.getSecurityContext.getRunAsNonRoot)
assert(container.getStdin)
assert(container.getTerminationMessagePath === "termination-message-path")
assert(container.getTerminationMessagePolicy === "termination-message-policy")
assert(pod.container.getVolumeMounts.asScala.exists(_.getName == "test-volume"))
}
private def podWithSupportedFeatures(): Pod = {
new PodBuilder()
.withNewMetadata()
.addToLabels("test-label-key", "test-label-value")
.addToAnnotations("test-annotation-key", "test-annotation-value")
.withNamespace("namespace")
.addNewOwnerReference()
.withController(true)
.withName("owner-reference")
.endOwnerReference()
.endMetadata()
.withNewSpec()
.withDnsPolicy("dns-policy")
.withHostAliases(new HostAliasBuilder().withHostnames("hostname").build())
.withImagePullSecrets(
new LocalObjectReferenceBuilder().withName("local-reference").build())
.withInitContainers(new ContainerBuilder().withName("init-container").build())
.withNodeName("node-name")
.withNodeSelector(Map("node-selector-key" -> "node-selector-value").asJava)
.withSchedulerName("scheduler")
.withNewSecurityContext()
.withRunAsUser(1000L)
.endSecurityContext()
.withServiceAccount("service-account")
.withSubdomain("subdomain")
.withTolerations(new TolerationBuilder()
.withKey("toleration-key")
.withOperator("Equal")
.withEffect("NoSchedule")
.build())
.addNewVolume()
.withNewHostPath()
.withPath("/test")
.endHostPath()
.withName("test-volume")
.endVolume()
.addNewContainer()
.withArgs("arg")
.withCommand("command")
.addNewEnv()
.withName("env-key")
.withValue("env-value")
.endEnv()
.withImagePullPolicy("Always")
.withName("executor-container")
.withNewResources()
.withLimits(Map("gpu" -> new QuantityBuilder().withAmount("1").build()).asJava)
.endResources()
.withNewSecurityContext()
.withRunAsNonRoot(true)
.endSecurityContext()
.withStdin(true)
.withTerminationMessagePath("termination-message-path")
.withTerminationMessagePolicy("termination-message-policy")
.addToVolumeMounts(
new VolumeMountBuilder()
.withName("test-volume")
.withMountPath("/test")
.build())
.endContainer()
.endSpec()
.build()
}
}

View file

@ -20,25 +20,32 @@ import java.io.{File, PrintWriter}
import java.nio.file.Files
import io.fabric8.kubernetes.api.model.ConfigMap
import org.scalatest.BeforeAndAfter
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s._
class PodTemplateConfigMapStepSuite extends SparkFunSuite with BeforeAndAfter {
private var kubernetesConf : KubernetesConf = _
private var templateFile: File = _
class PodTemplateConfigMapStepSuite extends SparkFunSuite {
before {
templateFile = Files.createTempFile("pod-template", "yml").toFile
test("Do nothing when executor template is not specified") {
val conf = KubernetesTestConf.createDriverConf()
val step = new PodTemplateConfigMapStep(conf)
val initialPod = SparkPod.initialPod()
val configuredPod = step.configurePod(initialPod)
assert(configuredPod === initialPod)
assert(step.getAdditionalKubernetesResources().isEmpty)
assert(step.getAdditionalPodSystemProperties().isEmpty)
}
test("Mounts executor template volume if config specified") {
val templateFile = Files.createTempFile("pod-template", "yml").toFile
templateFile.deleteOnExit()
val sparkConf = new SparkConf(false)
.set(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE, templateFile.getAbsolutePath)
kubernetesConf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf)
}
val kubernetesConf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf)
test("Mounts executor template volume if config specified") {
val writer = new PrintWriter(templateFile)
writer.write("pod-template-contents")
writer.close()

View file

@ -126,7 +126,7 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
MockitoAnnotations.initMocks(this)
kconf = KubernetesTestConf.createDriverConf(
resourceNamePrefix = Some(KUBERNETES_RESOURCE_PREFIX))
when(driverBuilder.buildFromFeatures(kconf)).thenReturn(BUILT_KUBERNETES_SPEC)
when(driverBuilder.buildFromFeatures(kconf, kubernetesClient)).thenReturn(BUILT_KUBERNETES_SPEC)
when(kubernetesClient.pods()).thenReturn(podOperations)
when(podOperations.withName(POD_NAME)).thenReturn(namedPods)

View file

@ -16,201 +16,21 @@
*/
package org.apache.spark.deploy.k8s.submit
import io.fabric8.kubernetes.api.model.PodBuilder
import io.fabric8.kubernetes.client.KubernetesClient
import org.mockito.Mockito._
import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s._
import org.apache.spark.deploy.k8s.Config.{CONTAINER_IMAGE, KUBERNETES_DRIVER_PODTEMPLATE_FILE, KUBERNETES_EXECUTOR_PODTEMPLATE_FILE}
import org.apache.spark.deploy.k8s.features._
import org.apache.spark.internal.config.ConfigEntry
class KubernetesDriverBuilderSuite extends SparkFunSuite {
class KubernetesDriverBuilderSuite extends PodBuilderSuite {
private val BASIC_STEP_TYPE = "basic"
private val CREDENTIALS_STEP_TYPE = "credentials"
private val SERVICE_STEP_TYPE = "service"
private val LOCAL_DIRS_STEP_TYPE = "local-dirs"
private val SECRETS_STEP_TYPE = "mount-secrets"
private val DRIVER_CMD_STEP_TYPE = "driver-command"
private val ENV_SECRETS_STEP_TYPE = "env-secrets"
private val HADOOP_GLOBAL_STEP_TYPE = "hadoop-global"
private val MOUNT_VOLUMES_STEP_TYPE = "mount-volumes"
private val TEMPLATE_VOLUME_STEP_TYPE = "template-volume"
private val basicFeatureStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
BASIC_STEP_TYPE, classOf[BasicDriverFeatureStep])
private val credentialsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
CREDENTIALS_STEP_TYPE, classOf[DriverKubernetesCredentialsFeatureStep])
private val serviceStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
SERVICE_STEP_TYPE, classOf[DriverServiceFeatureStep])
private val localDirsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
LOCAL_DIRS_STEP_TYPE, classOf[LocalDirsFeatureStep])
private val secretsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
SECRETS_STEP_TYPE, classOf[MountSecretsFeatureStep])
private val driverCommandStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
DRIVER_CMD_STEP_TYPE, classOf[DriverCommandFeatureStep])
private val envSecretsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
ENV_SECRETS_STEP_TYPE, classOf[EnvSecretsFeatureStep])
private val hadoopGlobalStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
HADOOP_GLOBAL_STEP_TYPE, classOf[KerberosConfDriverFeatureStep])
private val mountVolumesStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
MOUNT_VOLUMES_STEP_TYPE, classOf[MountVolumesFeatureStep])
private val templateVolumeStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
TEMPLATE_VOLUME_STEP_TYPE, classOf[PodTemplateConfigMapStep]
)
private val builderUnderTest: KubernetesDriverBuilder =
new KubernetesDriverBuilder(
_ => basicFeatureStep,
_ => credentialsStep,
_ => serviceStep,
_ => secretsStep,
_ => envSecretsStep,
_ => localDirsStep,
_ => mountVolumesStep,
_ => driverCommandStep,
_ => hadoopGlobalStep,
_ => templateVolumeStep)
test("Apply fundamental steps all the time.") {
val conf = KubernetesTestConf.createDriverConf()
validateStepTypesApplied(
builderUnderTest.buildFromFeatures(conf),
BASIC_STEP_TYPE,
CREDENTIALS_STEP_TYPE,
SERVICE_STEP_TYPE,
LOCAL_DIRS_STEP_TYPE,
DRIVER_CMD_STEP_TYPE,
HADOOP_GLOBAL_STEP_TYPE)
override protected def templateFileConf: ConfigEntry[_] = {
Config.KUBERNETES_DRIVER_PODTEMPLATE_FILE
}
test("Apply secrets step if secrets are present.") {
val conf = KubernetesTestConf.createDriverConf(
secretEnvNamesToKeyRefs = Map("EnvName" -> "SecretName:secretKey"),
secretNamesToMountPaths = Map("secret" -> "secretMountPath"))
validateStepTypesApplied(
builderUnderTest.buildFromFeatures(conf),
BASIC_STEP_TYPE,
CREDENTIALS_STEP_TYPE,
SERVICE_STEP_TYPE,
LOCAL_DIRS_STEP_TYPE,
SECRETS_STEP_TYPE,
ENV_SECRETS_STEP_TYPE,
DRIVER_CMD_STEP_TYPE,
HADOOP_GLOBAL_STEP_TYPE)
}
test("Apply volumes step if mounts are present.") {
val volumeSpec = KubernetesVolumeSpec(
"volume",
"/tmp",
"",
false,
KubernetesHostPathVolumeConf("/path"))
val conf = KubernetesTestConf.createDriverConf(volumes = Seq(volumeSpec))
validateStepTypesApplied(
builderUnderTest.buildFromFeatures(conf),
BASIC_STEP_TYPE,
CREDENTIALS_STEP_TYPE,
SERVICE_STEP_TYPE,
LOCAL_DIRS_STEP_TYPE,
MOUNT_VOLUMES_STEP_TYPE,
DRIVER_CMD_STEP_TYPE,
HADOOP_GLOBAL_STEP_TYPE)
}
test("Apply volumes step if a mount subpath is present.") {
val volumeSpec = KubernetesVolumeSpec(
"volume",
"/tmp",
"foo",
false,
KubernetesHostPathVolumeConf("/path"))
val conf = KubernetesTestConf.createDriverConf(volumes = Seq(volumeSpec))
validateStepTypesApplied(
builderUnderTest.buildFromFeatures(conf),
BASIC_STEP_TYPE,
CREDENTIALS_STEP_TYPE,
SERVICE_STEP_TYPE,
LOCAL_DIRS_STEP_TYPE,
MOUNT_VOLUMES_STEP_TYPE,
DRIVER_CMD_STEP_TYPE,
HADOOP_GLOBAL_STEP_TYPE)
}
test("Apply template volume step if executor template is present.") {
val sparkConf = new SparkConf(false)
.set(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE, "filename")
override protected def buildPod(sparkConf: SparkConf, client: KubernetesClient): SparkPod = {
val conf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf)
validateStepTypesApplied(
builderUnderTest.buildFromFeatures(conf),
BASIC_STEP_TYPE,
CREDENTIALS_STEP_TYPE,
SERVICE_STEP_TYPE,
LOCAL_DIRS_STEP_TYPE,
DRIVER_CMD_STEP_TYPE,
HADOOP_GLOBAL_STEP_TYPE,
TEMPLATE_VOLUME_STEP_TYPE)
new KubernetesDriverBuilder().buildFromFeatures(conf, client).pod
}
private def validateStepTypesApplied(resolvedSpec: KubernetesDriverSpec, stepTypes: String*)
: Unit = {
val addedProperties = resolvedSpec.systemProperties
.filter { case (k, _) => !k.startsWith("spark.") }
.toMap
assert(addedProperties.keys.toSet === stepTypes.toSet)
stepTypes.foreach { stepType =>
assert(resolvedSpec.pod.pod.getMetadata.getLabels.get(stepType) === stepType)
assert(resolvedSpec.driverKubernetesResources.containsSlice(
KubernetesFeaturesTestUtils.getSecretsForStepType(stepType)))
assert(resolvedSpec.systemProperties(stepType) === stepType)
}
}
test("Start with empty pod if template is not specified") {
val kubernetesClient = mock(classOf[KubernetesClient])
val driverBuilder = KubernetesDriverBuilder.apply(kubernetesClient, new SparkConf())
verify(kubernetesClient, never()).pods()
}
test("Starts with template if specified") {
val kubernetesClient = PodBuilderSuiteUtils.loadingMockKubernetesClient()
val sparkConf = new SparkConf(false)
.set(CONTAINER_IMAGE, "spark-driver:latest")
.set(KUBERNETES_DRIVER_PODTEMPLATE_FILE, "template-file.yaml")
val kubernetesConf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf)
val driverSpec = KubernetesDriverBuilder
.apply(kubernetesClient, sparkConf)
.buildFromFeatures(kubernetesConf)
PodBuilderSuiteUtils.verifyPodWithSupportedFeatures(driverSpec.pod)
}
test("Throws on misconfigured pod template") {
val kubernetesClient = PodBuilderSuiteUtils.loadingMockKubernetesClient(
new PodBuilder()
.withNewMetadata()
.addToLabels("test-label-key", "test-label-value")
.endMetadata()
.build())
val sparkConf = new SparkConf(false)
.set(CONTAINER_IMAGE, "spark-driver:latest")
.set(KUBERNETES_DRIVER_PODTEMPLATE_FILE, "template-file.yaml")
val kubernetesConf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf)
val exception = intercept[SparkException] {
KubernetesDriverBuilder
.apply(kubernetesClient, sparkConf)
.buildFromFeatures(kubernetesConf)
}
assert(exception.getMessage.contains("Could not load pod from template file."))
}
}

View file

@ -1,142 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.submit
import java.io.File
import io.fabric8.kubernetes.api.model._
import io.fabric8.kubernetes.client.KubernetesClient
import io.fabric8.kubernetes.client.dsl.{MixedOperation, PodResource}
import org.mockito.Matchers.any
import org.mockito.Mockito.{mock, when}
import org.scalatest.FlatSpec
import scala.collection.JavaConverters._
import org.apache.spark.deploy.k8s.SparkPod
object PodBuilderSuiteUtils extends FlatSpec {
def loadingMockKubernetesClient(pod: Pod = podWithSupportedFeatures()): KubernetesClient = {
val kubernetesClient = mock(classOf[KubernetesClient])
val pods =
mock(classOf[MixedOperation[Pod, PodList, DoneablePod, PodResource[Pod, DoneablePod]]])
val podResource = mock(classOf[PodResource[Pod, DoneablePod]])
when(kubernetesClient.pods()).thenReturn(pods)
when(pods.load(any(classOf[File]))).thenReturn(podResource)
when(podResource.get()).thenReturn(pod)
kubernetesClient
}
def verifyPodWithSupportedFeatures(pod: SparkPod): Unit = {
val metadata = pod.pod.getMetadata
assert(metadata.getLabels.containsKey("test-label-key"))
assert(metadata.getAnnotations.containsKey("test-annotation-key"))
assert(metadata.getNamespace === "namespace")
assert(metadata.getOwnerReferences.asScala.exists(_.getName == "owner-reference"))
val spec = pod.pod.getSpec
assert(!spec.getContainers.asScala.exists(_.getName == "executor-container"))
assert(spec.getDnsPolicy === "dns-policy")
assert(spec.getHostAliases.asScala.exists(_.getHostnames.asScala.exists(_ == "hostname")))
assert(spec.getImagePullSecrets.asScala.exists(_.getName == "local-reference"))
assert(spec.getInitContainers.asScala.exists(_.getName == "init-container"))
assert(spec.getNodeName == "node-name")
assert(spec.getNodeSelector.get("node-selector-key") === "node-selector-value")
assert(spec.getSchedulerName === "scheduler")
assert(spec.getSecurityContext.getRunAsUser === 1000L)
assert(spec.getServiceAccount === "service-account")
assert(spec.getSubdomain === "subdomain")
assert(spec.getTolerations.asScala.exists(_.getKey == "toleration-key"))
assert(spec.getVolumes.asScala.exists(_.getName == "test-volume"))
val container = pod.container
assert(container.getName === "executor-container")
assert(container.getArgs.contains("arg"))
assert(container.getCommand.equals(List("command").asJava))
assert(container.getEnv.asScala.exists(_.getName == "env-key"))
assert(container.getResources.getLimits.get("gpu") ===
new QuantityBuilder().withAmount("1").build())
assert(container.getSecurityContext.getRunAsNonRoot)
assert(container.getStdin)
assert(container.getTerminationMessagePath === "termination-message-path")
assert(container.getTerminationMessagePolicy === "termination-message-policy")
assert(pod.container.getVolumeMounts.asScala.exists(_.getName == "test-volume"))
}
def podWithSupportedFeatures(): Pod = new PodBuilder()
.withNewMetadata()
.addToLabels("test-label-key", "test-label-value")
.addToAnnotations("test-annotation-key", "test-annotation-value")
.withNamespace("namespace")
.addNewOwnerReference()
.withController(true)
.withName("owner-reference")
.endOwnerReference()
.endMetadata()
.withNewSpec()
.withDnsPolicy("dns-policy")
.withHostAliases(new HostAliasBuilder().withHostnames("hostname").build())
.withImagePullSecrets(
new LocalObjectReferenceBuilder().withName("local-reference").build())
.withInitContainers(new ContainerBuilder().withName("init-container").build())
.withNodeName("node-name")
.withNodeSelector(Map("node-selector-key" -> "node-selector-value").asJava)
.withSchedulerName("scheduler")
.withNewSecurityContext()
.withRunAsUser(1000L)
.endSecurityContext()
.withServiceAccount("service-account")
.withSubdomain("subdomain")
.withTolerations(new TolerationBuilder()
.withKey("toleration-key")
.withOperator("Equal")
.withEffect("NoSchedule")
.build())
.addNewVolume()
.withNewHostPath()
.withPath("/test")
.endHostPath()
.withName("test-volume")
.endVolume()
.addNewContainer()
.withArgs("arg")
.withCommand("command")
.addNewEnv()
.withName("env-key")
.withValue("env-value")
.endEnv()
.withImagePullPolicy("Always")
.withName("executor-container")
.withNewResources()
.withLimits(Map("gpu" -> new QuantityBuilder().withAmount("1").build()).asJava)
.endResources()
.withNewSecurityContext()
.withRunAsNonRoot(true)
.endSecurityContext()
.withStdin(true)
.withTerminationMessagePath("termination-message-path")
.withTerminationMessagePolicy("termination-message-policy")
.addToVolumeMounts(
new VolumeMountBuilder()
.withName("test-volume")
.withMountPath("/test")
.build())
.endContainer()
.endSpec()
.build()
}

View file

@ -80,8 +80,8 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
when(kubernetesClient.pods()).thenReturn(podOperations)
when(podOperations.withName(driverPodName)).thenReturn(driverPodOperations)
when(driverPodOperations.get).thenReturn(driverPod)
when(executorBuilder.buildFromFeatures(any(classOf[KubernetesExecutorConf]), meq(secMgr)))
.thenAnswer(executorPodAnswer())
when(executorBuilder.buildFromFeatures(any(classOf[KubernetesExecutorConf]), meq(secMgr),
meq(kubernetesClient))).thenAnswer(executorPodAnswer())
snapshotsStore = new DeterministicExecutorPodsSnapshotsStore()
waitForExecutorPodsClock = new ManualClock(0L)
podsAllocatorUnderTest = new ExecutorPodsAllocator(

View file

@ -16,147 +16,23 @@
*/
package org.apache.spark.scheduler.cluster.k8s
import scala.collection.JavaConverters._
import io.fabric8.kubernetes.api.model.{Config => _, _}
import io.fabric8.kubernetes.client.KubernetesClient
import org.mockito.Mockito.{mock, never, verify}
import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.deploy.k8s._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.features._
import org.apache.spark.deploy.k8s.submit.PodBuilderSuiteUtils
import org.apache.spark.util.SparkConfWithEnv
import org.apache.spark.internal.config.ConfigEntry
class KubernetesExecutorBuilderSuite extends SparkFunSuite {
private val BASIC_STEP_TYPE = "basic"
private val SECRETS_STEP_TYPE = "mount-secrets"
private val ENV_SECRETS_STEP_TYPE = "env-secrets"
private val LOCAL_DIRS_STEP_TYPE = "local-dirs"
private val HADOOP_CONF_STEP_TYPE = "hadoop-conf-step"
private val HADOOP_SPARK_USER_STEP_TYPE = "hadoop-spark-user"
private val KERBEROS_CONF_STEP_TYPE = "kerberos-step"
private val MOUNT_VOLUMES_STEP_TYPE = "mount-volumes"
class KubernetesExecutorBuilderSuite extends PodBuilderSuite {
private val secMgr = new SecurityManager(new SparkConf(false))
private val basicFeatureStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
BASIC_STEP_TYPE, classOf[BasicExecutorFeatureStep])
private val mountSecretsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
SECRETS_STEP_TYPE, classOf[MountSecretsFeatureStep])
private val envSecretsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
ENV_SECRETS_STEP_TYPE, classOf[EnvSecretsFeatureStep])
private val localDirsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
LOCAL_DIRS_STEP_TYPE, classOf[LocalDirsFeatureStep])
private val hadoopConfStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
HADOOP_CONF_STEP_TYPE, classOf[HadoopConfExecutorFeatureStep])
private val hadoopSparkUser = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
HADOOP_SPARK_USER_STEP_TYPE, classOf[HadoopSparkUserExecutorFeatureStep])
private val kerberosConf = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
KERBEROS_CONF_STEP_TYPE, classOf[KerberosConfExecutorFeatureStep])
private val mountVolumesStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
MOUNT_VOLUMES_STEP_TYPE, classOf[MountVolumesFeatureStep])
private val builderUnderTest = new KubernetesExecutorBuilder(
(_, _) => basicFeatureStep,
_ => mountSecretsStep,
_ => envSecretsStep,
_ => localDirsStep,
_ => mountVolumesStep,
_ => hadoopConfStep,
_ => kerberosConf,
_ => hadoopSparkUser)
test("Basic steps are consistently applied.") {
val conf = KubernetesTestConf.createExecutorConf()
validateStepTypesApplied(
builderUnderTest.buildFromFeatures(conf, secMgr), BASIC_STEP_TYPE, LOCAL_DIRS_STEP_TYPE)
override protected def templateFileConf: ConfigEntry[_] = {
Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE
}
test("Apply secrets step if secrets are present.") {
val conf = KubernetesTestConf.createExecutorConf(
secretEnvNamesToKeyRefs = Map("secret-name" -> "secret-key"),
secretNamesToMountPaths = Map("secret" -> "secretMountPath"))
validateStepTypesApplied(
builderUnderTest.buildFromFeatures(conf, secMgr),
BASIC_STEP_TYPE,
LOCAL_DIRS_STEP_TYPE,
SECRETS_STEP_TYPE,
ENV_SECRETS_STEP_TYPE)
override protected def buildPod(sparkConf: SparkConf, client: KubernetesClient): SparkPod = {
sparkConf.set("spark.driver.host", "https://driver.host.com")
val conf = KubernetesTestConf.createExecutorConf(sparkConf = sparkConf)
val secMgr = new SecurityManager(sparkConf)
new KubernetesExecutorBuilder().buildFromFeatures(conf, secMgr, client)
}
test("Apply volumes step if mounts are present.") {
val volumeSpec = KubernetesVolumeSpec(
"volume",
"/tmp",
"",
false,
KubernetesHostPathVolumeConf("/checkpoint"))
val conf = KubernetesTestConf.createExecutorConf(
volumes = Seq(volumeSpec))
validateStepTypesApplied(
builderUnderTest.buildFromFeatures(conf, secMgr),
BASIC_STEP_TYPE,
LOCAL_DIRS_STEP_TYPE,
MOUNT_VOLUMES_STEP_TYPE)
}
test("Apply basicHadoop step if HADOOP_CONF_DIR is defined") {
// HADOOP_DELEGATION_TOKEN
val conf = KubernetesTestConf.createExecutorConf(
sparkConf = new SparkConfWithEnv(Map("HADOOP_CONF_DIR" -> "/var/hadoop-conf"))
.set(HADOOP_CONFIG_MAP_NAME, "hadoop-conf-map-name")
.set(KRB5_CONFIG_MAP_NAME, "krb5-conf-map-name"))
validateStepTypesApplied(
builderUnderTest.buildFromFeatures(conf, secMgr),
BASIC_STEP_TYPE,
LOCAL_DIRS_STEP_TYPE,
HADOOP_CONF_STEP_TYPE,
HADOOP_SPARK_USER_STEP_TYPE)
}
test("Apply kerberos step if DT secrets created") {
val conf = KubernetesTestConf.createExecutorConf(
sparkConf = new SparkConf(false)
.set(HADOOP_CONFIG_MAP_NAME, "hadoop-conf-map-name")
.set(KRB5_CONFIG_MAP_NAME, "krb5-conf-map-name")
.set(KERBEROS_SPARK_USER_NAME, "spark-user")
.set(KERBEROS_DT_SECRET_NAME, "dt-secret")
.set(KERBEROS_DT_SECRET_KEY, "dt-key" ))
validateStepTypesApplied(
builderUnderTest.buildFromFeatures(conf, secMgr),
BASIC_STEP_TYPE,
LOCAL_DIRS_STEP_TYPE,
HADOOP_CONF_STEP_TYPE,
KERBEROS_CONF_STEP_TYPE)
}
private def validateStepTypesApplied(resolvedPod: SparkPod, stepTypes: String*): Unit = {
assert(resolvedPod.pod.getMetadata.getLabels.asScala.keys.toSet === stepTypes.toSet)
}
test("Starts with empty executor pod if template is not specified") {
val kubernetesClient = mock(classOf[KubernetesClient])
val executorBuilder = KubernetesExecutorBuilder.apply(kubernetesClient, new SparkConf())
verify(kubernetesClient, never()).pods()
}
test("Starts with executor template if specified") {
val kubernetesClient = PodBuilderSuiteUtils.loadingMockKubernetesClient()
val sparkConf = new SparkConf(false)
.set("spark.driver.host", "https://driver.host.com")
.set(Config.CONTAINER_IMAGE, "spark-executor:latest")
.set(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE, "template-file.yaml")
val kubernetesConf = KubernetesTestConf.createExecutorConf(
sparkConf = sparkConf,
driverPod = Some(new PodBuilder()
.withNewMetadata()
.withName("driver")
.endMetadata()
.build()))
val sparkPod = KubernetesExecutorBuilder(kubernetesClient, sparkConf)
.buildFromFeatures(kubernetesConf, secMgr)
PodBuilderSuiteUtils.verifyPodWithSupportedFeatures(sparkPod)
}
}