[SPARK-25815][K8S] Support kerberos in client mode, keytab-based token renewal.

This change hooks up the k8s backend to the updated HadoopDelegationTokenManager,
so that delegation tokens are also available in client mode, and keytab-based token
renewal is enabled.
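
For reference, the client-mode hookup on the k8s side boils down to the scheduler
backend handing a HadoopDelegationTokenManager to core Spark, which then takes care
of login, delegation token creation and keytab-based renewal in both deploy modes.
A condensed view (this lives inside KubernetesClusterSchedulerBackend, as shown in
the diff below, so it is not a standalone snippet):

    // Condensed from the KubernetesClusterSchedulerBackend hunk below; `conf` and
    // `sc` come from the backend itself.
    import org.apache.spark.deploy.security.HadoopDelegationTokenManager

    override protected def createTokenManager(): Option[HadoopDelegationTokenManager] = {
      Some(new HadoopDelegationTokenManager(conf, sc.hadoopConfiguration))
    }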

The change reworks the Kerberos-related k8s feature steps so that the driver
does all the credential management and provides executors with everything they
need, so nothing has to be added to executor pods. This also makes cluster mode
behave much more like client mode, since no driver-related config steps run in
client mode.
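
Concretely, the driver pod builder now runs the Hadoop conf and Kerberos steps
itself. A condensed outline of the driver feature list after this change (taken
from the KubernetesDriverBuilder hunk below; the variable name and the elision
comment are illustrative):

    // Condensed from the KubernetesDriverBuilder hunk below.
    val driverFeatures = Seq(
      // ... basic driver, credentials, service, secrets, local dirs, volumes steps ...
      new DriverCommandFeatureStep(conf),
      new HadoopConfDriverFeatureStep(conf),   // mounts HADOOP_CONF_DIR or an existing config map
      new KerberosConfDriverFeatureStep(conf), // krb5.conf, keytab secret, delegation tokens
      new PodTemplateConfigMapStep(conf))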

The two main things that no longer need to happen in executors (see the sketch
after this list) are:

- adding the Hadoop config to the executor pods: this is not needed
  since the Spark driver will serialize the Hadoop config and send
  it to executors when running tasks.

- mounting the kerberos config file in the executor pods: this is
  not needed once you remove the above. The Hadoop conf sent by
  the driver with the tasks is already resolved (i.e. has all the
  kerberos names properly defined), so executors do not need access
  to the kerberos realm information anymore.
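
As a result, the executor builder drops the Hadoop-related steps entirely; the
remaining executor feature list looks like this (condensed from the
KubernetesExecutorBuilder hunk below; the variable name is illustrative):

    // Condensed from the KubernetesExecutorBuilder hunk below.
    val executorFeatures = Seq(
      // ... basic executor step ...
      new MountSecretsFeatureStep(conf),
      new EnvSecretsFeatureStep(conf),
      new LocalDirsFeatureStep(conf),
      new MountVolumesFeatureStep(conf))
      // previously also run here: HadoopConfExecutorFeatureStep,
      // KerberosConfExecutorFeatureStep, HadoopSparkUserExecutorFeatureStep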

The change also avoids creating delegation tokens unnecessarily: they are only
created when neither a secret with tokens nor a keytab is provided. When tokens
are created, the driver code handles them: in cluster mode by stashing them in
a secret, and in client mode by using the existing mechanisms to send DTs to
executors.
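
In code, the driver feature step only materializes tokens lazily, and only when no
other credential source is configured. Condensed from the
KerberosConfDriverFeatureStep hunk below (the fields referenced are members of that
step):

    private lazy val delegationTokens: Array[Byte] = {
      if (keytab.isEmpty && existingSecretName.isEmpty) {
        val creds = UserGroupInformation.getCurrentUser().getCredentials()
        new HadoopDelegationTokenManager(kubernetesConf.sparkConf,
          SparkHadoopUtil.get.newConfiguration(kubernetesConf.sparkConf))
          .obtainDelegationTokens(creds)
        // Only return something if tokens were actually obtained, so that an empty
        // secret is not created in cluster mode.
        if (creds.numberOfTokens() > 0 || creds.numberOfSecretKeys() > 0) {
          SparkHadoopUtil.get.serialize(creds)
        } else {
          null
        }
      } else {
        null
      }
    }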

One last feature: the change also allows the keytab to be defined with a
"local:" URI. This is supported in client mode (where it is equivalent to a
plain path) and in k8s cluster mode. It allows the keytab to be mounted into
the driver container from a pre-existing secret, for example.
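
The handling is small. The relevant fragments, condensed from the SparkSubmit and
KerberosConfDriverFeatureStep hunks below (not standalone code; Utils.isLocalUri()
is added by this change):

    // SparkSubmit: in client mode a "local:" keytab is just a path on the
    // submitting machine, and existence is only checked for non-"local:" keytabs.
    if (deployMode == CLIENT && Utils.isLocalUri(args.keytab)) {
      args.keytab = new URI(args.keytab).getPath()
    }
    if (!Utils.isLocalUri(args.keytab)) {
      require(new File(args.keytab).exists(), s"Keytab file: ${args.keytab} does not exist")
      UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
    }

    // KerberosConfDriverFeatureStep: a "local:" keytab is assumed to already be in
    // the driver image, so it is never uploaded into a Kubernetes secret.
    private def needKeytabUpload: Boolean = keytab.exists(!Utils.isLocalUri(_))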

Finally, the new code always sets SPARK_USER in the driver and executor pods.
This is in line with how other resource managers behave: the submitting user
reflects which user will access Hadoop services in the app. (With Kerberos,
that's overridden by the logged-in user.) That user is unrelated to the OS user
the app runs as inside the containers.
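
Both feature steps use the same pattern to set the variable. A minimal sketch
(withSparkUser is a hypothetical helper; the real change adds the env var inline in
BasicDriverFeatureStep and BasicExecutorFeatureStep, as shown below):

    import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder}
    import org.apache.spark.util.Utils

    // Hypothetical helper illustrating the pattern used by both steps.
    def withSparkUser(container: Container): Container = {
      new ContainerBuilder(container)
        .addNewEnv()
          .withName("SPARK_USER")  // ENV_SPARK_USER
          .withValue(Utils.getCurrentUserName())
          .endEnv()
        .build()
    }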

Tested:
- client and cluster mode with kinit
- cluster mode with keytab
- cluster mode with local: keytab
- YARN cluster with keytab (to make sure it isn't broken)

Closes #22911 from vanzin/SPARK-25815.

Authored-by: Marcelo Vanzin <vanzin@cloudera.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
Marcelo Vanzin 2018-12-18 13:30:09 -08:00
parent 428eb2ad0a
commit 4b3fe3a9cc
25 changed files with 652 additions and 624 deletions


@ -19,7 +19,7 @@ package org.apache.spark.deploy
import java.io._
import java.lang.reflect.{InvocationTargetException, Modifier, UndeclaredThrowableException}
import java.net.URL
import java.net.{URI, URL}
import java.security.PrivilegedExceptionAction
import java.text.ParseException
import java.util.UUID
@ -334,19 +334,20 @@ private[spark] class SparkSubmit extends Logging {
val hadoopConf = conf.getOrElse(SparkHadoopUtil.newConfiguration(sparkConf))
val targetDir = Utils.createTempDir()
// assure a keytab is available from any place in a JVM
if (clusterManager == YARN || clusterManager == LOCAL || isMesosClient || isKubernetesCluster) {
if (args.principal != null) {
if (args.keytab != null) {
require(new File(args.keytab).exists(), s"Keytab file: ${args.keytab} does not exist")
// Add keytab and principal configurations in sysProps to make them available
// for later use; e.g. in spark sql, the isolated class loader used to talk
// to HiveMetastore will use these settings. They will be set as Java system
// properties and then loaded by SparkConf
sparkConf.set(KEYTAB, args.keytab)
sparkConf.set(PRINCIPAL, args.principal)
UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
}
// Kerberos is not supported in standalone mode, and keytab support is not yet available
// in Mesos cluster mode.
if (clusterManager != STANDALONE
&& !isMesosCluster
&& args.principal != null
&& args.keytab != null) {
// If client mode, make sure the keytab is just a local path.
if (deployMode == CLIENT && Utils.isLocalUri(args.keytab)) {
args.keytab = new URI(args.keytab).getPath()
}
if (!Utils.isLocalUri(args.keytab)) {
require(new File(args.keytab).exists(), s"Keytab file: ${args.keytab} does not exist")
UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
}
}


@ -18,6 +18,7 @@
package org.apache.spark.deploy.security
import java.io.File
import java.net.URI
import java.security.PrivilegedExceptionAction
import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
import java.util.concurrent.atomic.AtomicReference
@ -71,11 +72,13 @@ private[spark] class HadoopDelegationTokenManager(
private val providerEnabledConfig = "spark.security.credentials.%s.enabled"
private val principal = sparkConf.get(PRINCIPAL).orNull
private val keytab = sparkConf.get(KEYTAB).orNull
// The keytab can be a local: URI for cluster mode, so translate it to a regular path. If it is
// needed later on, the code will check that it exists.
private val keytab = sparkConf.get(KEYTAB).map { uri => new URI(uri).getPath() }.orNull
require((principal == null) == (keytab == null),
"Both principal and keytab must be defined, or neither.")
require(keytab == null || new File(keytab).isFile(), s"Cannot find keytab at $keytab.")
private val delegationTokenProviders = loadProviders()
logDebug("Using the following builtin delegation token providers: " +
@ -264,6 +267,7 @@ private[spark] class HadoopDelegationTokenManager(
private def doLogin(): UserGroupInformation = {
logInfo(s"Attempting to login to KDC using principal: $principal")
require(new File(keytab).isFile(), s"Cannot find keytab at $keytab.")
val ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
logInfo("Successfully logged into KDC.")
ugi


@ -92,6 +92,9 @@ private[spark] object Utils extends Logging {
private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
@volatile private var localRootDirs: Array[String] = null
/** Scheme used for files that are locally available on worker nodes in the cluster. */
val LOCAL_SCHEME = "local"
/** Serialize an object using Java serialization */
def serialize[T](o: T): Array[Byte] = {
val bos = new ByteArrayOutputStream()
@ -2829,6 +2832,11 @@ private[spark] object Utils extends Logging {
def isClientMode(conf: SparkConf): Boolean = {
"client".equals(conf.get(SparkLauncher.DEPLOY_MODE, "client"))
}
/** Returns whether the URI is a "local:" URI. */
def isLocalUri(uri: String): Boolean = {
uri.startsWith(s"$LOCAL_SCHEME:")
}
}
private[util] object CallerContext extends Logging {


@ -87,25 +87,22 @@ private[spark] object Constants {
val NON_JVM_MEMORY_OVERHEAD_FACTOR = 0.4d
// Hadoop Configuration
val HADOOP_FILE_VOLUME = "hadoop-properties"
val HADOOP_CONF_VOLUME = "hadoop-properties"
val KRB_FILE_VOLUME = "krb5-file"
val HADOOP_CONF_DIR_PATH = "/opt/hadoop/conf"
val KRB_FILE_DIR_PATH = "/etc"
val ENV_HADOOP_CONF_DIR = "HADOOP_CONF_DIR"
val HADOOP_CONFIG_MAP_NAME =
"spark.kubernetes.executor.hadoopConfigMapName"
val KRB5_CONFIG_MAP_NAME =
"spark.kubernetes.executor.krb5ConfigMapName"
// Kerberos Configuration
val KERBEROS_DELEGEGATION_TOKEN_SECRET_NAME = "delegation-tokens"
val KERBEROS_DT_SECRET_NAME =
"spark.kubernetes.kerberos.dt-secret-name"
val KERBEROS_DT_SECRET_KEY =
"spark.kubernetes.kerberos.dt-secret-key"
val KERBEROS_SPARK_USER_NAME =
"spark.kubernetes.kerberos.spark-user-name"
val KERBEROS_SECRET_KEY = "hadoop-tokens"
val KERBEROS_KEYTAB_VOLUME = "kerberos-keytab"
val KERBEROS_KEYTAB_MOUNT_POINT = "/mnt/secrets/kerberos-keytab"
// Hadoop credentials secrets for the Spark app.
val SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR = "/mnt/secrets/hadoop-credentials"


@ -42,10 +42,6 @@ private[spark] abstract class KubernetesConf(val sparkConf: SparkConf) {
def appName: String = get("spark.app.name", "spark")
def hadoopConfigMapName: String = s"$resourceNamePrefix-hadoop-config"
def krbConfigMapName: String = s"$resourceNamePrefix-krb5-file"
def namespace: String = get(KUBERNETES_NAMESPACE)
def imagePullPolicy: String = get(CONTAINER_IMAGE_PULL_POLICY)


@ -18,7 +18,30 @@ package org.apache.spark.deploy.k8s
import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, Pod, PodBuilder}
private[spark] case class SparkPod(pod: Pod, container: Container)
private[spark] case class SparkPod(pod: Pod, container: Container) {
/**
* Convenience method to apply a series of chained transformations to a pod.
*
* Use it like:
*
original.transform { case pod =>
// update pod and return new one
}.transform { case pod =>
// more changes that create a new pod
}.transform {
* case pod if someCondition => // new pod
* }
*
* This makes it cleaner to apply multiple transformations, avoiding having to create
* a bunch of awkwardly-named local variables. Since the argument is a partial function,
* it can do matching without needing to exhaust all the possibilities. If the function
* is not applied, then the original pod will be kept.
*/
def transform(fn: PartialFunction[SparkPod, SparkPod]): SparkPod = fn.lift(this).getOrElse(this)
}
private[spark] object SparkPod {
def initialPod(): SparkPod = {


@ -110,6 +110,10 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
.withContainerPort(driverUIPort)
.withProtocol("TCP")
.endPort()
.addNewEnv()
.withName(ENV_SPARK_USER)
.withValue(Utils.getCurrentUserName())
.endEnv()
.addAllToEnv(driverCustomEnvs.asJava)
.addNewEnv()
.withName(ENV_DRIVER_BIND_ADDRESS)


@ -163,6 +163,10 @@ private[spark] class BasicExecutorFeatureStep(
.addToLimits("memory", executorMemoryQuantity)
.addToRequests("cpu", executorCpuQuantity)
.endResources()
.addNewEnv()
.withName(ENV_SPARK_USER)
.withValue(Utils.getCurrentUserName())
.endEnv()
.addAllToEnv(executorEnv.asJava)
.withPorts(requiredPorts.asJava)
.addToArgs("executor")


@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.features
import java.io.File
import java.nio.charset.StandardCharsets
import scala.collection.JavaConverters._
import com.google.common.io.Files
import io.fabric8.kubernetes.api.model._
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesUtils, SparkPod}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
/**
* Mounts the Hadoop configuration - either a pre-defined config map, or a local configuration
* directory - on the driver pod.
*/
private[spark] class HadoopConfDriverFeatureStep(conf: KubernetesConf)
extends KubernetesFeatureConfigStep {
private val confDir = Option(conf.sparkConf.getenv(ENV_HADOOP_CONF_DIR))
private val existingConfMap = conf.get(KUBERNETES_HADOOP_CONF_CONFIG_MAP)
KubernetesUtils.requireNandDefined(
confDir,
existingConfMap,
"Do not specify both the `HADOOP_CONF_DIR` in your ENV and the ConfigMap " +
"as the creation of an additional ConfigMap, when one is already specified is extraneous")
private lazy val confFiles: Seq[File] = {
val dir = new File(confDir.get)
if (dir.isDirectory) {
dir.listFiles.filter(_.isFile).toSeq
} else {
Nil
}
}
private def newConfigMapName: String = s"${conf.resourceNamePrefix}-hadoop-config"
private def hasHadoopConf: Boolean = confDir.isDefined || existingConfMap.isDefined
override def configurePod(original: SparkPod): SparkPod = {
original.transform { case pod if hasHadoopConf =>
val confVolume = if (confDir.isDefined) {
val keyPaths = confFiles.map { file =>
new KeyToPathBuilder()
.withKey(file.getName())
.withPath(file.getName())
.build()
}
new VolumeBuilder()
.withName(HADOOP_CONF_VOLUME)
.withNewConfigMap()
.withName(newConfigMapName)
.withItems(keyPaths.asJava)
.endConfigMap()
.build()
} else {
new VolumeBuilder()
.withName(HADOOP_CONF_VOLUME)
.withNewConfigMap()
.withName(existingConfMap.get)
.endConfigMap()
.build()
}
val podWithConf = new PodBuilder(pod.pod)
.editSpec()
.addNewVolumeLike(confVolume)
.endVolume()
.endSpec()
.build()
val containerWithMount = new ContainerBuilder(pod.container)
.addNewVolumeMount()
.withName(HADOOP_CONF_VOLUME)
.withMountPath(HADOOP_CONF_DIR_PATH)
.endVolumeMount()
.addNewEnv()
.withName(ENV_HADOOP_CONF_DIR)
.withValue(HADOOP_CONF_DIR_PATH)
.endEnv()
.build()
SparkPod(podWithConf, containerWithMount)
}
}
override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
if (confDir.isDefined) {
val fileMap = confFiles.map { file =>
(file.getName(), Files.toString(file, StandardCharsets.UTF_8))
}.toMap.asJava
Seq(new ConfigMapBuilder()
.withNewMetadata()
.withName(newConfigMapName)
.endMetadata()
.addToData(fileMap)
.build())
} else {
Nil
}
}
}


@ -1,40 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.features
import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, SparkPod}
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.features.hadooputils.HadoopBootstrapUtil
import org.apache.spark.internal.Logging
/**
* This step is responsible for bootstraping the container with ConfigMaps
* containing Hadoop config files mounted as volumes and an ENV variable
* pointed to the mounted file directory.
*/
private[spark] class HadoopConfExecutorFeatureStep(conf: KubernetesExecutorConf)
extends KubernetesFeatureConfigStep with Logging {
override def configurePod(pod: SparkPod): SparkPod = {
val hadoopConfDirCMapName = conf.getOption(HADOOP_CONFIG_MAP_NAME)
if (hadoopConfDirCMapName.isDefined) {
HadoopBootstrapUtil.bootstrapHadoopConfDir(None, None, hadoopConfDirCMapName, pod)
} else {
pod
}
}
}


@ -1,35 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.features
import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, SparkPod}
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.features.hadooputils.HadoopBootstrapUtil
/**
* This step is responsible for setting ENV_SPARK_USER when HADOOP_FILES are detected
* however, this step would not be run if Kerberos is enabled, as Kerberos sets SPARK_USER
*/
private[spark] class HadoopSparkUserExecutorFeatureStep(conf: KubernetesExecutorConf)
extends KubernetesFeatureConfigStep {
override def configurePod(pod: SparkPod): SparkPod = {
conf.getOption(KERBEROS_SPARK_USER_NAME).map { user =>
HadoopBootstrapUtil.bootstrapSparkUserPod(user, pod)
}.getOrElse(pod)
}
}


@ -16,31 +16,40 @@
*/
package org.apache.spark.deploy.k8s.features
import io.fabric8.kubernetes.api.model.{HasMetadata, Secret, SecretBuilder}
import java.io.File
import java.nio.charset.StandardCharsets
import scala.collection.JavaConverters._
import com.google.common.io.Files
import io.fabric8.kubernetes.api.model._
import org.apache.commons.codec.binary.Base64
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.k8s.{KubernetesDriverConf, KubernetesUtils, SparkPod}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.features.hadooputils._
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.util.Utils
/**
* Runs the necessary Hadoop-based logic based on Kerberos configs and the presence of the
* HADOOP_CONF_DIR. This runs various bootstrap methods defined in HadoopBootstrapUtil.
* Provide kerberos / service credentials to the Spark driver.
*
* There are three use cases, in order of precedence:
*
* - keytab: if a kerberos keytab is defined, it is provided to the driver, and the driver will
* manage the kerberos login and the creation of delegation tokens.
* - existing tokens: if a secret containing delegation tokens is provided, it will be mounted
* on the driver pod, and the driver will handle distribution of those tokens to executors.
* - tgt only: if Hadoop security is enabled, the local TGT will be used to create delegation
* tokens which will be provided to the driver. The driver will handle distribution of the
* tokens to executors.
*/
private[spark] class KerberosConfDriverFeatureStep(kubernetesConf: KubernetesDriverConf)
extends KubernetesFeatureConfigStep {
private val hadoopConfDir = Option(kubernetesConf.sparkConf.getenv(ENV_HADOOP_CONF_DIR))
private val hadoopConfigMapName = kubernetesConf.get(KUBERNETES_HADOOP_CONF_CONFIG_MAP)
KubernetesUtils.requireNandDefined(
hadoopConfDir,
hadoopConfigMapName,
"Do not specify both the `HADOOP_CONF_DIR` in your ENV and the ConfigMap " +
"as the creation of an additional ConfigMap, when one is already specified is extraneous")
extends KubernetesFeatureConfigStep with Logging {
private val principal = kubernetesConf.get(org.apache.spark.internal.config.PRINCIPAL)
private val keytab = kubernetesConf.get(org.apache.spark.internal.config.KEYTAB)
@ -49,15 +58,6 @@ private[spark] class KerberosConfDriverFeatureStep(kubernetesConf: KubernetesDri
private val krb5File = kubernetesConf.get(KUBERNETES_KERBEROS_KRB5_FILE)
private val krb5CMap = kubernetesConf.get(KUBERNETES_KERBEROS_KRB5_CONFIG_MAP)
private val hadoopConf = SparkHadoopUtil.get.newConfiguration(kubernetesConf.sparkConf)
private val tokenManager = new HadoopDelegationTokenManager(kubernetesConf.sparkConf, hadoopConf)
private val isKerberosEnabled =
(hadoopConfDir.isDefined && UserGroupInformation.isSecurityEnabled) ||
(hadoopConfigMapName.isDefined && (krb5File.isDefined || krb5CMap.isDefined))
require(keytab.isEmpty || isKerberosEnabled,
"You must enable Kerberos support if you are specifying a Kerberos Keytab")
require(existingSecretName.isEmpty || isKerberosEnabled,
"You must enable Kerberos support if you are specifying a Kerberos Secret")
KubernetesUtils.requireNandDefined(
krb5File,
@ -79,128 +79,183 @@ private[spark] class KerberosConfDriverFeatureStep(kubernetesConf: KubernetesDri
"If a secret storing a Kerberos Delegation Token is specified you must also" +
" specify the item-key where the data is stored")
private val hadoopConfigurationFiles = hadoopConfDir.map { hConfDir =>
HadoopBootstrapUtil.getHadoopConfFiles(hConfDir)
if (!hasKerberosConf) {
logInfo("You have not specified a krb5.conf file locally or via a ConfigMap. " +
"Make sure that you have the krb5.conf locally on the driver image.")
}
private val newHadoopConfigMapName =
if (hadoopConfigMapName.isEmpty) {
Some(kubernetesConf.hadoopConfigMapName)
// Create delegation tokens if needed. This is a lazy val so that it's not populated
// unnecessarily. But it needs to be accessible to different methods in this class,
// since it's not clear based solely on available configuration options that delegation
// tokens are needed when other credentials are not available.
private lazy val delegationTokens: Array[Byte] = {
if (keytab.isEmpty && existingSecretName.isEmpty) {
val tokenManager = new HadoopDelegationTokenManager(kubernetesConf.sparkConf,
SparkHadoopUtil.get.newConfiguration(kubernetesConf.sparkConf))
val creds = UserGroupInformation.getCurrentUser().getCredentials()
tokenManager.obtainDelegationTokens(creds)
// If no tokens and no secrets are stored in the credentials, make sure nothing is returned,
// to avoid creating an unnecessary secret.
if (creds.numberOfTokens() > 0 || creds.numberOfSecretKeys() > 0) {
SparkHadoopUtil.get.serialize(creds)
} else {
null
}
} else {
None
null
}
}
// Either use pre-existing secret or login to create new Secret with DT stored within
private val kerberosConfSpec: Option[KerberosConfigSpec] = (for {
secretName <- existingSecretName
secretItemKey <- existingSecretItemKey
} yield {
KerberosConfigSpec(
dtSecret = None,
dtSecretName = secretName,
dtSecretItemKey = secretItemKey,
jobUserName = UserGroupInformation.getCurrentUser.getShortUserName)
}).orElse(
if (isKerberosEnabled) {
Some(buildKerberosSpec())
} else {
None
private def needKeytabUpload: Boolean = keytab.exists(!Utils.isLocalUri(_))
private def dtSecretName: String = s"${kubernetesConf.resourceNamePrefix}-delegation-tokens"
private def ktSecretName: String = s"${kubernetesConf.resourceNamePrefix}-kerberos-keytab"
private def hasKerberosConf: Boolean = krb5CMap.isDefined | krb5File.isDefined
private def newConfigMapName: String = s"${kubernetesConf.resourceNamePrefix}-krb5-file"
override def configurePod(original: SparkPod): SparkPod = {
original.transform { case pod if hasKerberosConf =>
val configMapVolume = if (krb5CMap.isDefined) {
new VolumeBuilder()
.withName(KRB_FILE_VOLUME)
.withNewConfigMap()
.withName(krb5CMap.get)
.endConfigMap()
.build()
} else {
val krb5Conf = new File(krb5File.get)
new VolumeBuilder()
.withName(KRB_FILE_VOLUME)
.withNewConfigMap()
.withName(newConfigMapName)
.withItems(new KeyToPathBuilder()
.withKey(krb5Conf.getName())
.withPath(krb5Conf.getName())
.build())
.endConfigMap()
.build()
}
val podWithVolume = new PodBuilder(pod.pod)
.editSpec()
.addNewVolumeLike(configMapVolume)
.endVolume()
.endSpec()
.build()
val containerWithMount = new ContainerBuilder(pod.container)
.addNewVolumeMount()
.withName(KRB_FILE_VOLUME)
.withMountPath(KRB_FILE_DIR_PATH + "/krb5.conf")
.withSubPath("krb5.conf")
.endVolumeMount()
.build()
SparkPod(podWithVolume, containerWithMount)
}.transform {
case pod if needKeytabUpload =>
// If keytab is defined and is a submission-local file (not local: URI), then create a
// secret for it. The keytab data will be stored in this secret below.
val podWitKeytab = new PodBuilder(pod.pod)
.editOrNewSpec()
.addNewVolume()
.withName(KERBEROS_KEYTAB_VOLUME)
.withNewSecret()
.withSecretName(ktSecretName)
.endSecret()
.endVolume()
.endSpec()
.build()
val containerWithKeytab = new ContainerBuilder(pod.container)
.addNewVolumeMount()
.withName(KERBEROS_KEYTAB_VOLUME)
.withMountPath(KERBEROS_KEYTAB_MOUNT_POINT)
.endVolumeMount()
.build()
SparkPod(podWitKeytab, containerWithKeytab)
case pod if existingSecretName.isDefined | delegationTokens != null =>
val secretName = existingSecretName.getOrElse(dtSecretName)
val itemKey = existingSecretItemKey.getOrElse(KERBEROS_SECRET_KEY)
val podWithTokens = new PodBuilder(pod.pod)
.editOrNewSpec()
.addNewVolume()
.withName(SPARK_APP_HADOOP_SECRET_VOLUME_NAME)
.withNewSecret()
.withSecretName(secretName)
.endSecret()
.endVolume()
.endSpec()
.build()
val containerWithTokens = new ContainerBuilder(pod.container)
.addNewVolumeMount()
.withName(SPARK_APP_HADOOP_SECRET_VOLUME_NAME)
.withMountPath(SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR)
.endVolumeMount()
.addNewEnv()
.withName(ENV_HADOOP_TOKEN_FILE_LOCATION)
.withValue(s"$SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR/$itemKey")
.endEnv()
.build()
SparkPod(podWithTokens, containerWithTokens)
}
)
override def configurePod(pod: SparkPod): SparkPod = {
if (!isKerberosEnabled) {
return pod
}
val hadoopBasedSparkPod = HadoopBootstrapUtil.bootstrapHadoopConfDir(
hadoopConfDir,
newHadoopConfigMapName,
hadoopConfigMapName,
pod)
kerberosConfSpec.map { hSpec =>
HadoopBootstrapUtil.bootstrapKerberosPod(
hSpec.dtSecretName,
hSpec.dtSecretItemKey,
hSpec.jobUserName,
krb5File,
Some(kubernetesConf.krbConfigMapName),
krb5CMap,
hadoopBasedSparkPod)
}.getOrElse(
HadoopBootstrapUtil.bootstrapSparkUserPod(
UserGroupInformation.getCurrentUser.getShortUserName,
hadoopBasedSparkPod))
}
override def getAdditionalPodSystemProperties(): Map[String, String] = {
if (!isKerberosEnabled) {
return Map.empty
// If a submission-local keytab is provided, update the Spark config so that it knows the
// path of the keytab in the driver container.
if (needKeytabUpload) {
val ktName = new File(keytab.get).getName()
Map(KEYTAB.key -> s"$KERBEROS_KEYTAB_MOUNT_POINT/$ktName")
} else {
Map.empty
}
val resolvedConfValues = kerberosConfSpec.map { hSpec =>
Map(KERBEROS_DT_SECRET_NAME -> hSpec.dtSecretName,
KERBEROS_DT_SECRET_KEY -> hSpec.dtSecretItemKey,
KERBEROS_SPARK_USER_NAME -> hSpec.jobUserName,
KRB5_CONFIG_MAP_NAME -> krb5CMap.getOrElse(kubernetesConf.krbConfigMapName))
}.getOrElse(
Map(KERBEROS_SPARK_USER_NAME ->
UserGroupInformation.getCurrentUser.getShortUserName))
Map(HADOOP_CONFIG_MAP_NAME ->
hadoopConfigMapName.getOrElse(kubernetesConf.hadoopConfigMapName)) ++ resolvedConfValues
}
override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
if (!isKerberosEnabled) {
return Seq.empty
Seq[HasMetadata]() ++ {
krb5File.map { path =>
val file = new File(path)
new ConfigMapBuilder()
.withNewMetadata()
.withName(newConfigMapName)
.endMetadata()
.addToData(
Map(file.getName() -> Files.toString(file, StandardCharsets.UTF_8)).asJava)
.build()
}
} ++ {
// If a submission-local keytab is provided, stash it in a secret.
if (needKeytabUpload) {
val kt = new File(keytab.get)
Seq(new SecretBuilder()
.withNewMetadata()
.withName(ktSecretName)
.endMetadata()
.addToData(kt.getName(), Base64.encodeBase64String(Files.toByteArray(kt)))
.build())
} else {
Nil
}
} ++ {
if (delegationTokens != null) {
Seq(new SecretBuilder()
.withNewMetadata()
.withName(dtSecretName)
.endMetadata()
.addToData(KERBEROS_SECRET_KEY, Base64.encodeBase64String(delegationTokens))
.build())
} else {
Nil
}
}
val hadoopConfConfigMap = for {
hName <- newHadoopConfigMapName
hFiles <- hadoopConfigurationFiles
} yield {
HadoopBootstrapUtil.buildHadoopConfigMap(hName, hFiles)
}
val krb5ConfigMap = krb5File.map { fileLocation =>
HadoopBootstrapUtil.buildkrb5ConfigMap(
kubernetesConf.krbConfigMapName,
fileLocation)
}
val kerberosDTSecret = kerberosConfSpec.flatMap(_.dtSecret)
hadoopConfConfigMap.toSeq ++
krb5ConfigMap.toSeq ++
kerberosDTSecret.toSeq
}
private def buildKerberosSpec(): KerberosConfigSpec = {
// The JobUserUGI will be taken fom the Local Ticket Cache or via keytab+principal
// The login happens in the SparkSubmit so login logic is not necessary to include
val jobUserUGI = UserGroupInformation.getCurrentUser
val creds = jobUserUGI.getCredentials
tokenManager.obtainDelegationTokens(creds)
val tokenData = SparkHadoopUtil.get.serialize(creds)
require(tokenData.nonEmpty, "Did not obtain any delegation tokens")
val newSecretName =
s"${kubernetesConf.resourceNamePrefix}-$KERBEROS_DELEGEGATION_TOKEN_SECRET_NAME"
val secretDT =
new SecretBuilder()
.withNewMetadata()
.withName(newSecretName)
.endMetadata()
.addToData(KERBEROS_SECRET_KEY, Base64.encodeBase64String(tokenData))
.build()
KerberosConfigSpec(
dtSecret = Some(secretDT),
dtSecretName = newSecretName,
dtSecretItemKey = KERBEROS_SECRET_KEY,
jobUserName = jobUserUGI.getShortUserName)
}
private case class KerberosConfigSpec(
dtSecret: Option[Secret],
dtSecretName: String,
dtSecretItemKey: String,
jobUserName: String)
}


@ -1,46 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.features
import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, SparkPod}
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.features.hadooputils.HadoopBootstrapUtil
import org.apache.spark.internal.Logging
/**
* This step is responsible for mounting the DT secret for the executors
*/
private[spark] class KerberosConfExecutorFeatureStep(conf: KubernetesExecutorConf)
extends KubernetesFeatureConfigStep with Logging {
override def configurePod(pod: SparkPod): SparkPod = {
val maybeKrb5CMap = conf.getOption(KRB5_CONFIG_MAP_NAME)
if (maybeKrb5CMap.isDefined) {
logInfo(s"Mounting Resources for Kerberos")
HadoopBootstrapUtil.bootstrapKerberosPod(
conf.get(KERBEROS_DT_SECRET_NAME),
conf.get(KERBEROS_DT_SECRET_KEY),
conf.get(KERBEROS_SPARK_USER_NAME),
None,
None,
maybeKrb5CMap,
pod)
} else {
pod
}
}
}


@ -1,283 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.features.hadooputils
import java.io.File
import java.nio.charset.StandardCharsets
import scala.collection.JavaConverters._
import com.google.common.io.Files
import io.fabric8.kubernetes.api.model._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.SparkPod
import org.apache.spark.internal.Logging
private[spark] object HadoopBootstrapUtil extends Logging {
/**
* Mounting the DT secret for both the Driver and the executors
*
* @param dtSecretName Name of the secret that stores the Delegation Token
* @param dtSecretItemKey Name of the Item Key storing the Delegation Token
* @param userName Name of the SparkUser to set SPARK_USER
* @param fileLocation Optional Location of the krb5 file
* @param newKrb5ConfName Optional location of the ConfigMap for Krb5
* @param existingKrb5ConfName Optional name of ConfigMap for Krb5
* @param pod Input pod to be appended to
* @return a modified SparkPod
*/
def bootstrapKerberosPod(
dtSecretName: String,
dtSecretItemKey: String,
userName: String,
fileLocation: Option[String],
newKrb5ConfName: Option[String],
existingKrb5ConfName: Option[String],
pod: SparkPod): SparkPod = {
val preConfigMapVolume = existingKrb5ConfName.map { kconf =>
new VolumeBuilder()
.withName(KRB_FILE_VOLUME)
.withNewConfigMap()
.withName(kconf)
.endConfigMap()
.build()
}
val createConfigMapVolume = for {
fLocation <- fileLocation
krb5ConfName <- newKrb5ConfName
} yield {
val krb5File = new File(fLocation)
val fileStringPath = krb5File.toPath.getFileName.toString
new VolumeBuilder()
.withName(KRB_FILE_VOLUME)
.withNewConfigMap()
.withName(krb5ConfName)
.withItems(new KeyToPathBuilder()
.withKey(fileStringPath)
.withPath(fileStringPath)
.build())
.endConfigMap()
.build()
}
// Breaking up Volume creation for clarity
val configMapVolume = preConfigMapVolume.orElse(createConfigMapVolume)
if (configMapVolume.isEmpty) {
logInfo("You have not specified a krb5.conf file locally or via a ConfigMap. " +
"Make sure that you have the krb5.conf locally on the Driver and Executor images")
}
val kerberizedPodWithDTSecret = new PodBuilder(pod.pod)
.editOrNewSpec()
.addNewVolume()
.withName(SPARK_APP_HADOOP_SECRET_VOLUME_NAME)
.withNewSecret()
.withSecretName(dtSecretName)
.endSecret()
.endVolume()
.endSpec()
.build()
// Optionally add the krb5.conf ConfigMap
val kerberizedPod = configMapVolume.map { cmVolume =>
new PodBuilder(kerberizedPodWithDTSecret)
.editSpec()
.addNewVolumeLike(cmVolume)
.endVolume()
.endSpec()
.build()
}.getOrElse(kerberizedPodWithDTSecret)
val kerberizedContainerWithMounts = new ContainerBuilder(pod.container)
.addNewVolumeMount()
.withName(SPARK_APP_HADOOP_SECRET_VOLUME_NAME)
.withMountPath(SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR)
.endVolumeMount()
.addNewEnv()
.withName(ENV_HADOOP_TOKEN_FILE_LOCATION)
.withValue(s"$SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR/$dtSecretItemKey")
.endEnv()
.addNewEnv()
.withName(ENV_SPARK_USER)
.withValue(userName)
.endEnv()
.build()
// Optionally add the krb5.conf Volume Mount
val kerberizedContainer =
if (configMapVolume.isDefined) {
new ContainerBuilder(kerberizedContainerWithMounts)
.addNewVolumeMount()
.withName(KRB_FILE_VOLUME)
.withMountPath(KRB_FILE_DIR_PATH + "/krb5.conf")
.withSubPath("krb5.conf")
.endVolumeMount()
.build()
} else {
kerberizedContainerWithMounts
}
SparkPod(kerberizedPod, kerberizedContainer)
}
/**
* setting ENV_SPARK_USER when HADOOP_FILES are detected
*
* @param sparkUserName Name of the SPARK_USER
* @param pod Input pod to be appended to
* @return a modified SparkPod
*/
def bootstrapSparkUserPod(sparkUserName: String, pod: SparkPod): SparkPod = {
val envModifiedContainer = new ContainerBuilder(pod.container)
.addNewEnv()
.withName(ENV_SPARK_USER)
.withValue(sparkUserName)
.endEnv()
.build()
SparkPod(pod.pod, envModifiedContainer)
}
/**
* Grabbing files in the HADOOP_CONF_DIR
*
* @param path location of HADOOP_CONF_DIR
* @return a list of File object
*/
def getHadoopConfFiles(path: String): Seq[File] = {
val dir = new File(path)
if (dir.isDirectory) {
dir.listFiles.filter(_.isFile).toSeq
} else {
Seq.empty[File]
}
}
/**
* Bootstraping the container with ConfigMaps that store
* Hadoop configuration files
*
* @param hadoopConfDir directory location of HADOOP_CONF_DIR env
* @param newHadoopConfigMapName name of the new configMap for HADOOP_CONF_DIR
* @param existingHadoopConfigMapName name of the pre-defined configMap for HADOOP_CONF_DIR
* @param pod Input pod to be appended to
* @return a modified SparkPod
*/
def bootstrapHadoopConfDir(
hadoopConfDir: Option[String],
newHadoopConfigMapName: Option[String],
existingHadoopConfigMapName: Option[String],
pod: SparkPod): SparkPod = {
val preConfigMapVolume = existingHadoopConfigMapName.map { hConf =>
new VolumeBuilder()
.withName(HADOOP_FILE_VOLUME)
.withNewConfigMap()
.withName(hConf)
.endConfigMap()
.build() }
val createConfigMapVolume = for {
dirLocation <- hadoopConfDir
hConfName <- newHadoopConfigMapName
} yield {
val hadoopConfigFiles = getHadoopConfFiles(dirLocation)
val keyPaths = hadoopConfigFiles.map { file =>
val fileStringPath = file.toPath.getFileName.toString
new KeyToPathBuilder()
.withKey(fileStringPath)
.withPath(fileStringPath)
.build()
}
new VolumeBuilder()
.withName(HADOOP_FILE_VOLUME)
.withNewConfigMap()
.withName(hConfName)
.withItems(keyPaths.asJava)
.endConfigMap()
.build()
}
// Breaking up Volume Creation for clarity
val configMapVolume = preConfigMapVolume.getOrElse(createConfigMapVolume.get)
val hadoopSupportedPod = new PodBuilder(pod.pod)
.editSpec()
.addNewVolumeLike(configMapVolume)
.endVolume()
.endSpec()
.build()
val hadoopSupportedContainer = new ContainerBuilder(pod.container)
.addNewVolumeMount()
.withName(HADOOP_FILE_VOLUME)
.withMountPath(HADOOP_CONF_DIR_PATH)
.endVolumeMount()
.addNewEnv()
.withName(ENV_HADOOP_CONF_DIR)
.withValue(HADOOP_CONF_DIR_PATH)
.endEnv()
.build()
SparkPod(hadoopSupportedPod, hadoopSupportedContainer)
}
/**
* Builds ConfigMap given the file location of the
* krb5.conf file
*
* @param configMapName name of configMap for krb5
* @param fileLocation location of krb5 file
* @return a ConfigMap
*/
def buildkrb5ConfigMap(
configMapName: String,
fileLocation: String): ConfigMap = {
val file = new File(fileLocation)
new ConfigMapBuilder()
.withNewMetadata()
.withName(configMapName)
.endMetadata()
.addToData(Map(file.toPath.getFileName.toString ->
Files.toString(file, StandardCharsets.UTF_8)).asJava)
.build()
}
/**
* Builds ConfigMap given the ConfigMap name
* and a list of Hadoop Conf files
*
* @param hadoopConfigMapName name of hadoopConfigMap
* @param hadoopConfFiles list of hadoopFiles
* @return a ConfigMap
*/
def buildHadoopConfigMap(
hadoopConfigMapName: String,
hadoopConfFiles: Seq[File]): ConfigMap = {
new ConfigMapBuilder()
.withNewMetadata()
.withName(hadoopConfigMapName)
.endMetadata()
.addToData(hadoopConfFiles.map { file =>
(file.toPath.getFileName.toString,
Files.toString(file, StandardCharsets.UTF_8))
}.toMap.asJava)
.build()
}
}


@ -1,33 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.features.hadooputils
import io.fabric8.kubernetes.api.model.Secret
/**
* Represents a given configuration of the Kerberos Configuration logic
* <p>
* - The secret containing a DT, either previously specified or built on the fly
* - The name of the secret where the DT will be stored
* - The data item-key on the secret which correlates with where the current DT data is stored
* - The Job User's username
*/
private[spark] case class KerberosConfigSpec(
dtSecret: Option[Secret],
dtSecretName: String,
dtSecretItemKey: String,
jobUserName: String)


@ -46,6 +46,7 @@ private[spark] class KubernetesDriverBuilder {
new LocalDirsFeatureStep(conf),
new MountVolumesFeatureStep(conf),
new DriverCommandFeatureStep(conf),
new HadoopConfDriverFeatureStep(conf),
new KerberosConfDriverFeatureStep(conf),
new PodTemplateConfigMapStep(conf))


@ -25,6 +25,7 @@ import io.fabric8.kubernetes.client.KubernetesClient
import org.apache.spark.SparkContext
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.rpc.{RpcAddress, RpcEnv}
import org.apache.spark.scheduler.{ExecutorLossReason, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SchedulerBackendUtils}
@ -143,7 +144,11 @@ private[spark] class KubernetesClusterSchedulerBackend(
}
override def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
new KubernetesDriverEndpoint(rpcEnv, properties)
new KubernetesDriverEndpoint(sc.env.rpcEnv, properties)
}
override protected def createTokenManager(): Option[HadoopDelegationTokenManager] = {
Some(new HadoopDelegationTokenManager(conf, sc.hadoopConfiguration))
}
private class KubernetesDriverEndpoint(rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])


@ -44,10 +44,7 @@ private[spark] class KubernetesExecutorBuilder {
new MountSecretsFeatureStep(conf),
new EnvSecretsFeatureStep(conf),
new LocalDirsFeatureStep(conf),
new MountVolumesFeatureStep(conf),
new HadoopConfExecutorFeatureStep(conf),
new KerberosConfExecutorFeatureStep(conf),
new HadoopSparkUserExecutorFeatureStep(conf))
new MountVolumesFeatureStep(conf))
features.foldLeft(initialPod) { case (pod, feature) => feature.configurePod(pod) }
}


@ -27,6 +27,7 @@ import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.submit._
import org.apache.spark.internal.config._
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.Utils
class BasicDriverFeatureStepSuite extends SparkFunSuite {
@ -73,7 +74,6 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
val foundPortNames = configuredPod.container.getPorts.asScala.toSet
assert(expectedPortNames === foundPortNames)
assert(configuredPod.container.getEnv.size === 3)
val envs = configuredPod.container
.getEnv
.asScala
@ -82,6 +82,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
DRIVER_ENVS.foreach { case (k, v) =>
assert(envs(v) === v)
}
assert(envs(ENV_SPARK_USER) === Utils.getCurrentUserName())
assert(configuredPod.pod.getSpec().getImagePullSecrets.asScala ===
TEST_IMAGE_PULL_SECRET_OBJECTS)


@ -200,7 +200,8 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
ENV_EXECUTOR_MEMORY -> "1g",
ENV_APPLICATION_ID -> KubernetesTestConf.APP_ID,
ENV_SPARK_CONF_DIR -> SPARK_CONF_DIR_INTERNAL,
ENV_EXECUTOR_POD_IP -> null) ++ additionalEnvVars
ENV_EXECUTOR_POD_IP -> null,
ENV_SPARK_USER -> Utils.getCurrentUserName())
val extraJavaOptsStart = additionalEnvVars.keys.count(_.startsWith(ENV_JAVA_OPT_PREFIX))
val extraJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
@ -208,9 +209,11 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
s"$ENV_JAVA_OPT_PREFIX${ind + extraJavaOptsStart}" -> opt
}.toMap
val mapEnvs = executorPod.container.getEnv.asScala.map {
val containerEnvs = executorPod.container.getEnv.asScala.map {
x => (x.getName, x.getValue)
}.toMap
assert((defaultEnvs ++ extraJavaOptsEnvs) === mapEnvs)
val expectedEnvs = defaultEnvs ++ additionalEnvVars ++ extraJavaOptsEnvs
assert(containerEnvs === expectedEnvs)
}
}


@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.features
import java.io.File
import java.nio.charset.StandardCharsets.UTF_8
import scala.collection.JavaConverters._
import com.google.common.io.Files
import io.fabric8.kubernetes.api.model.ConfigMap
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.submit.JavaMainAppResource
import org.apache.spark.util.{SparkConfWithEnv, Utils}
class HadoopConfDriverFeatureStepSuite extends SparkFunSuite {
import KubernetesFeaturesTestUtils._
import SecretVolumeUtils._
test("mount hadoop config map if defined") {
val sparkConf = new SparkConf(false)
.set(Config.KUBERNETES_HADOOP_CONF_CONFIG_MAP, "testConfigMap")
val conf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf)
val step = new HadoopConfDriverFeatureStep(conf)
checkPod(step.configurePod(SparkPod.initialPod()))
assert(step.getAdditionalKubernetesResources().isEmpty)
}
test("create hadoop config map if config dir is defined") {
val confDir = Utils.createTempDir()
val confFiles = Set("core-site.xml", "hdfs-site.xml")
confFiles.foreach { f =>
Files.write("some data", new File(confDir, f), UTF_8)
}
val sparkConf = new SparkConfWithEnv(Map(ENV_HADOOP_CONF_DIR -> confDir.getAbsolutePath()))
val conf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf)
val step = new HadoopConfDriverFeatureStep(conf)
checkPod(step.configurePod(SparkPod.initialPod()))
val hadoopConfMap = filter[ConfigMap](step.getAdditionalKubernetesResources()).head
assert(hadoopConfMap.getData().keySet().asScala === confFiles)
}
private def checkPod(pod: SparkPod): Unit = {
assert(podHasVolume(pod.pod, HADOOP_CONF_VOLUME))
assert(containerHasVolume(pod.container, HADOOP_CONF_VOLUME, HADOOP_CONF_DIR_PATH))
assert(containerHasEnvVar(pod.container, ENV_HADOOP_CONF_DIR))
}
}


@ -0,0 +1,171 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.features
import java.io.File
import java.nio.charset.StandardCharsets.UTF_8
import java.security.PrivilegedExceptionAction
import scala.collection.JavaConverters._
import com.google.common.io.Files
import io.fabric8.kubernetes.api.model.{ConfigMap, Secret}
import org.apache.commons.codec.binary.Base64
import org.apache.hadoop.io.Text
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.k8s._
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.submit.JavaMainAppResource
import org.apache.spark.internal.config._
import org.apache.spark.util.Utils
class KerberosConfDriverFeatureStepSuite extends SparkFunSuite {
import KubernetesFeaturesTestUtils._
import SecretVolumeUtils._
private val tmpDir = Utils.createTempDir()
test("mount krb5 config map if defined") {
val configMap = "testConfigMap"
val step = createStep(
new SparkConf(false).set(KUBERNETES_KERBEROS_KRB5_CONFIG_MAP, configMap))
checkPodForKrbConf(step.configurePod(SparkPod.initialPod()), configMap)
assert(step.getAdditionalPodSystemProperties().isEmpty)
assert(filter[ConfigMap](step.getAdditionalKubernetesResources()).isEmpty)
}
test("create krb5.conf config map if local config provided") {
val krbConf = File.createTempFile("krb5", ".conf", tmpDir)
Files.write("some data", krbConf, UTF_8)
val sparkConf = new SparkConf(false)
.set(KUBERNETES_KERBEROS_KRB5_FILE, krbConf.getAbsolutePath())
val step = createStep(sparkConf)
val confMap = filter[ConfigMap](step.getAdditionalKubernetesResources()).head
assert(confMap.getData().keySet().asScala === Set(krbConf.getName()))
checkPodForKrbConf(step.configurePod(SparkPod.initialPod()), confMap.getMetadata().getName())
assert(step.getAdditionalPodSystemProperties().isEmpty)
}
test("create keytab secret if client keytab file used") {
val keytab = File.createTempFile("keytab", ".bin", tmpDir)
Files.write("some data", keytab, UTF_8)
val sparkConf = new SparkConf(false)
.set(KEYTAB, keytab.getAbsolutePath())
.set(PRINCIPAL, "alice")
val step = createStep(sparkConf)
val pod = step.configurePod(SparkPod.initialPod())
assert(podHasVolume(pod.pod, KERBEROS_KEYTAB_VOLUME))
assert(containerHasVolume(pod.container, KERBEROS_KEYTAB_VOLUME, KERBEROS_KEYTAB_MOUNT_POINT))
assert(step.getAdditionalPodSystemProperties().keys === Set(KEYTAB.key))
val secret = filter[Secret](step.getAdditionalKubernetesResources()).head
assert(secret.getData().keySet().asScala === Set(keytab.getName()))
}
test("do nothing if container-local keytab used") {
val sparkConf = new SparkConf(false)
.set(KEYTAB, "local:/my.keytab")
.set(PRINCIPAL, "alice")
val step = createStep(sparkConf)
val initial = SparkPod.initialPod()
assert(step.configurePod(initial) === initial)
assert(step.getAdditionalPodSystemProperties().isEmpty)
assert(step.getAdditionalKubernetesResources().isEmpty)
}
test("mount delegation tokens if provided") {
val dtSecret = "tokenSecret"
val sparkConf = new SparkConf(false)
.set(KUBERNETES_KERBEROS_DT_SECRET_NAME, dtSecret)
.set(KUBERNETES_KERBEROS_DT_SECRET_ITEM_KEY, "dtokens")
val step = createStep(sparkConf)
checkPodForTokens(step.configurePod(SparkPod.initialPod()), dtSecret)
assert(step.getAdditionalPodSystemProperties().isEmpty)
assert(step.getAdditionalKubernetesResources().isEmpty)
}
test("create delegation tokens if needed") {
// Since HadoopDelegationTokenManager does not create any tokens without proper configs and
// services, start with a test user that already has some tokens that will just be piped
// through to the driver.
val testUser = UserGroupInformation.createUserForTesting("k8s", Array())
testUser.doAs(new PrivilegedExceptionAction[Unit]() {
override def run(): Unit = {
val creds = testUser.getCredentials()
creds.addSecretKey(new Text("K8S_TEST_KEY"), Array[Byte](0x4, 0x2))
testUser.addCredentials(creds)
val tokens = SparkHadoopUtil.get.serialize(creds)
val step = createStep(new SparkConf(false))
val dtSecret = filter[Secret](step.getAdditionalKubernetesResources()).head
assert(dtSecret.getData().get(KERBEROS_SECRET_KEY) === Base64.encodeBase64String(tokens))
checkPodForTokens(step.configurePod(SparkPod.initialPod()),
dtSecret.getMetadata().getName())
assert(step.getAdditionalPodSystemProperties().isEmpty)
}
})
}
test("do nothing if no config and no tokens") {
val step = createStep(new SparkConf(false))
val initial = SparkPod.initialPod()
assert(step.configurePod(initial) === initial)
assert(step.getAdditionalPodSystemProperties().isEmpty)
assert(step.getAdditionalKubernetesResources().isEmpty)
}
private def checkPodForKrbConf(pod: SparkPod, confMapName: String): Unit = {
val podVolume = pod.pod.getSpec().getVolumes().asScala.find(_.getName() == KRB_FILE_VOLUME)
assert(podVolume.isDefined)
assert(containerHasVolume(pod.container, KRB_FILE_VOLUME, KRB_FILE_DIR_PATH + "/krb5.conf"))
assert(podVolume.get.getConfigMap().getName() === confMapName)
}
private def checkPodForTokens(pod: SparkPod, dtSecretName: String): Unit = {
val podVolume = pod.pod.getSpec().getVolumes().asScala
.find(_.getName() == SPARK_APP_HADOOP_SECRET_VOLUME_NAME)
assert(podVolume.isDefined)
assert(containerHasVolume(pod.container, SPARK_APP_HADOOP_SECRET_VOLUME_NAME,
SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR))
assert(containerHasEnvVar(pod.container, ENV_HADOOP_TOKEN_FILE_LOCATION))
assert(podVolume.get.getSecret().getSecretName() === dtSecretName)
}
private def createStep(conf: SparkConf): KerberosConfDriverFeatureStep = {
val kconf = KubernetesTestConf.createDriverConf(sparkConf = conf)
new KerberosConfDriverFeatureStep(kconf)
}
}


@ -17,6 +17,7 @@
package org.apache.spark.deploy.k8s.features
import scala.collection.JavaConverters._
import scala.reflect.ClassTag
import io.fabric8.kubernetes.api.model.{Container, HasMetadata, PodBuilder, SecretBuilder}
import org.mockito.Matchers
@ -63,4 +64,9 @@ object KubernetesFeaturesTestUtils {
def containerHasEnvVar(container: Container, envVarName: String): Boolean = {
container.getEnv.asScala.exists(envVar => envVar.getName == envVarName)
}
def filter[T: ClassTag](list: Seq[HasMetadata]): Seq[T] = {
val desired = implicitly[ClassTag[T]].runtimeClass
list.filter(_.getClass() == desired).map(_.asInstanceOf[T]).toSeq
}
}


@ -116,6 +116,8 @@ private[spark] class Client(
}
}
require(keytab == null || !Utils.isLocalUri(keytab), "Keytab should reference a local file.")
private val launcherBackend = new LauncherBackend() {
override protected def conf: SparkConf = sparkConf
@ -472,7 +474,7 @@ private[spark] class Client(
appMasterOnly: Boolean = false): (Boolean, String) = {
val trimmedPath = path.trim()
val localURI = Utils.resolveURI(trimmedPath)
if (localURI.getScheme != LOCAL_SCHEME) {
if (localURI.getScheme != Utils.LOCAL_SCHEME) {
if (addDistributedUri(localURI)) {
val localPath = getQualifiedLocalPath(localURI, hadoopConf)
val linkname = targetDir.map(_ + "/").getOrElse("") +
@ -515,7 +517,7 @@ private[spark] class Client(
val sparkArchive = sparkConf.get(SPARK_ARCHIVE)
if (sparkArchive.isDefined) {
val archive = sparkArchive.get
require(!isLocalUri(archive), s"${SPARK_ARCHIVE.key} cannot be a local URI.")
require(!Utils.isLocalUri(archive), s"${SPARK_ARCHIVE.key} cannot be a local URI.")
distribute(Utils.resolveURI(archive).toString,
resType = LocalResourceType.ARCHIVE,
destName = Some(LOCALIZED_LIB_DIR))
@ -525,7 +527,7 @@ private[spark] class Client(
// Break the list of jars to upload, and resolve globs.
val localJars = new ArrayBuffer[String]()
jars.foreach { jar =>
if (!isLocalUri(jar)) {
if (!Utils.isLocalUri(jar)) {
val path = getQualifiedLocalPath(Utils.resolveURI(jar), hadoopConf)
val pathFs = FileSystem.get(path.toUri(), hadoopConf)
pathFs.globStatus(path).filter(_.isFile()).foreach { entry =>
@ -814,7 +816,7 @@ private[spark] class Client(
}
(pySparkArchives ++ pyArchives).foreach { path =>
val uri = Utils.resolveURI(path)
if (uri.getScheme != LOCAL_SCHEME) {
if (uri.getScheme != Utils.LOCAL_SCHEME) {
pythonPath += buildPath(Environment.PWD.$$(), new Path(uri).getName())
} else {
pythonPath += uri.getPath()
@ -1183,9 +1185,6 @@ private object Client extends Logging {
// Alias for the user jar
val APP_JAR_NAME: String = "__app__.jar"
// URI scheme that identifies local resources
val LOCAL_SCHEME = "local"
// Staging directory for any temporary jars or files
val SPARK_STAGING: String = ".sparkStaging"
@ -1307,7 +1306,7 @@ private object Client extends Logging {
addClasspathEntry(buildPath(Environment.PWD.$$(), LOCALIZED_LIB_DIR, "*"), env)
if (sparkConf.get(SPARK_ARCHIVE).isEmpty) {
sparkConf.get(SPARK_JARS).foreach { jars =>
jars.filter(isLocalUri).foreach { jar =>
jars.filter(Utils.isLocalUri).foreach { jar =>
val uri = new URI(jar)
addClasspathEntry(getClusterPath(sparkConf, uri.getPath()), env)
}
@ -1340,7 +1339,7 @@ private object Client extends Logging {
private def getMainJarUri(mainJar: Option[String]): Option[URI] = {
mainJar.flatMap { path =>
val uri = Utils.resolveURI(path)
if (uri.getScheme == LOCAL_SCHEME) Some(uri) else None
if (uri.getScheme == Utils.LOCAL_SCHEME) Some(uri) else None
}.orElse(Some(new URI(APP_JAR_NAME)))
}
@ -1368,7 +1367,7 @@ private object Client extends Logging {
uri: URI,
fileName: String,
env: HashMap[String, String]): Unit = {
if (uri != null && uri.getScheme == LOCAL_SCHEME) {
if (uri != null && uri.getScheme == Utils.LOCAL_SCHEME) {
addClasspathEntry(getClusterPath(conf, uri.getPath), env)
} else if (fileName != null) {
addClasspathEntry(buildPath(Environment.PWD.$$(), fileName), env)
@ -1489,11 +1488,6 @@ private object Client extends Logging {
components.mkString(Path.SEPARATOR)
}
/** Returns whether the URI is a "local:" URI. */
def isLocalUri(uri: String): Boolean = {
uri.startsWith(s"$LOCAL_SCHEME:")
}
def createAppReport(report: ApplicationReport): YarnAppReport = {
val diags = report.getDiagnostics()
val diagsOpt = if (diags != null && diags.nonEmpty) Some(diags) else None


@ -100,7 +100,7 @@ class ClientSuite extends SparkFunSuite with Matchers {
val cp = env("CLASSPATH").split(":|;|<CPS>")
s"$SPARK,$USER,$ADDED".split(",").foreach({ entry =>
val uri = new URI(entry)
if (LOCAL_SCHEME.equals(uri.getScheme())) {
if (Utils.LOCAL_SCHEME.equals(uri.getScheme())) {
cp should contain (uri.getPath())
} else {
cp should not contain (uri.getPath())
@ -136,7 +136,7 @@ class ClientSuite extends SparkFunSuite with Matchers {
val expected = ADDED.split(",")
.map(p => {
val uri = new URI(p)
if (LOCAL_SCHEME == uri.getScheme()) {
if (Utils.LOCAL_SCHEME == uri.getScheme()) {
p
} else {
Option(uri.getFragment()).getOrElse(new File(p).getName())
@ -249,7 +249,7 @@ class ClientSuite extends SparkFunSuite with Matchers {
any(classOf[MutableHashMap[URI, Path]]), anyBoolean(), any())
classpath(client) should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*"))
sparkConf.set(SPARK_ARCHIVE, LOCAL_SCHEME + ":" + archive.getPath())
sparkConf.set(SPARK_ARCHIVE, Utils.LOCAL_SCHEME + ":" + archive.getPath())
intercept[IllegalArgumentException] {
client.prepareLocalResources(new Path(temp.getAbsolutePath()), Nil)
}