From 4b3fe3a9ccc8a4a8eb0d037d19cb07a8a288e37a Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Tue, 18 Dec 2018 13:30:09 -0800 Subject: [PATCH] [SPARK-25815][K8S] Support kerberos in client mode, keytab-based token renewal. This change hooks up the k8s backed to the updated HadoopDelegationTokenManager, so that delegation tokens are also available in client mode, and keytab-based token renewal is enabled. The change re-works the k8s feature steps related to kerberos so that the driver does all the credential management and provides all the needed information to executors - so nothing needs to be added to executor pods. This also makes cluster mode behave a lot more similarly to client mode, since no driver-related config steps are run in the latter case. The main two things that don't need to happen in executors anymore are: - adding the Hadoop config to the executor pods: this is not needed since the Spark driver will serialize the Hadoop config and send it to executors when running tasks. - mounting the kerberos config file in the executor pods: this is not needed once you remove the above. The Hadoop conf sent by the driver with the tasks is already resolved (i.e. has all the kerberos names properly defined), so executors do not need access to the kerberos realm information anymore. The change also avoids creating delegation tokens unnecessarily. This means that they'll only be created if a secret with tokens was not provided, and if a keytab is not provided. In either of those cases, the driver code will handle delegation tokens: in cluster mode by creating a secret and stashing them, in client mode by using existing mechanisms to send DTs to executors. One last feature: the change also allows defining a keytab with a "local:" URI. This is supported in client mode (although that's the same as not saying "local:"), and in k8s cluster mode. This allows the keytab to be mounted onto the image from a pre-existing secret, for example. Finally, the new code always sets SPARK_USER in the driver and executor pods. This is in line with how other resource managers behave: the submitting user reflects which user will access Hadoop services in the app. (With kerberos, that's overridden by the logged in user.) That user is unrelated to the OS user the app is running as inside the containers. Tested: - client and cluster mode with kinit - cluster mode with keytab - cluster mode with local: keytab - YARN cluster with keytab (to make sure it isn't broken) Closes #22911 from vanzin/SPARK-25815. Authored-by: Marcelo Vanzin Signed-off-by: Marcelo Vanzin --- .../org/apache/spark/deploy/SparkSubmit.scala | 29 +- .../HadoopDelegationTokenManager.scala | 8 +- .../scala/org/apache/spark/util/Utils.scala | 8 + .../apache/spark/deploy/k8s/Constants.scala | 9 +- .../spark/deploy/k8s/KubernetesConf.scala | 4 - .../apache/spark/deploy/k8s/SparkPod.scala | 25 +- .../k8s/features/BasicDriverFeatureStep.scala | 4 + .../features/BasicExecutorFeatureStep.scala | 4 + .../HadoopConfDriverFeatureStep.scala | 124 +++++++ .../HadoopConfExecutorFeatureStep.scala | 40 --- .../HadoopSparkUserExecutorFeatureStep.scala | 35 -- .../KerberosConfDriverFeatureStep.scala | 321 ++++++++++-------- .../KerberosConfExecutorFeatureStep.scala | 46 --- .../hadooputils/HadoopBootstrapUtil.scala | 283 --------------- .../hadooputils/KerberosConfigSpec.scala | 33 -- .../k8s/submit/KubernetesDriverBuilder.scala | 1 + .../KubernetesClusterSchedulerBackend.scala | 7 +- .../k8s/KubernetesExecutorBuilder.scala | 5 +- .../BasicDriverFeatureStepSuite.scala | 3 +- .../BasicExecutorFeatureStepSuite.scala | 9 +- .../HadoopConfDriverFeatureStepSuite.scala | 71 ++++ .../KerberosConfDriverFeatureStepSuite.scala | 171 ++++++++++ .../KubernetesFeaturesTestUtils.scala | 6 + .../org/apache/spark/deploy/yarn/Client.scala | 24 +- .../spark/deploy/yarn/ClientSuite.scala | 6 +- 25 files changed, 652 insertions(+), 624 deletions(-) create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfDriverFeatureStep.scala delete mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStep.scala delete mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopSparkUserExecutorFeatureStep.scala delete mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfExecutorFeatureStep.scala delete mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/hadooputils/HadoopBootstrapUtil.scala delete mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/hadooputils/KerberosConfigSpec.scala create mode 100644 resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/HadoopConfDriverFeatureStepSuite.scala create mode 100644 resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStepSuite.scala diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index d4055cb6c5..763bd0a70a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -19,7 +19,7 @@ package org.apache.spark.deploy import java.io._ import java.lang.reflect.{InvocationTargetException, Modifier, UndeclaredThrowableException} -import java.net.URL +import java.net.{URI, URL} import java.security.PrivilegedExceptionAction import java.text.ParseException import java.util.UUID @@ -334,19 +334,20 @@ private[spark] class SparkSubmit extends Logging { val hadoopConf = conf.getOrElse(SparkHadoopUtil.newConfiguration(sparkConf)) val targetDir = Utils.createTempDir() - // assure a keytab is available from any place in a JVM - if (clusterManager == YARN || clusterManager == LOCAL || isMesosClient || isKubernetesCluster) { - if (args.principal != null) { - if (args.keytab != null) { - require(new File(args.keytab).exists(), s"Keytab file: ${args.keytab} does not exist") - // Add keytab and principal configurations in sysProps to make them available - // for later use; e.g. in spark sql, the isolated class loader used to talk - // to HiveMetastore will use these settings. They will be set as Java system - // properties and then loaded by SparkConf - sparkConf.set(KEYTAB, args.keytab) - sparkConf.set(PRINCIPAL, args.principal) - UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab) - } + // Kerberos is not supported in standalone mode, and keytab support is not yet available + // in Mesos cluster mode. + if (clusterManager != STANDALONE + && !isMesosCluster + && args.principal != null + && args.keytab != null) { + // If client mode, make sure the keytab is just a local path. + if (deployMode == CLIENT && Utils.isLocalUri(args.keytab)) { + args.keytab = new URI(args.keytab).getPath() + } + + if (!Utils.isLocalUri(args.keytab)) { + require(new File(args.keytab).exists(), s"Keytab file: ${args.keytab} does not exist") + UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab) } } diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala index 126a6ab801..f7e3ddecee 100644 --- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala @@ -18,6 +18,7 @@ package org.apache.spark.deploy.security import java.io.File +import java.net.URI import java.security.PrivilegedExceptionAction import java.util.concurrent.{ScheduledExecutorService, TimeUnit} import java.util.concurrent.atomic.AtomicReference @@ -71,11 +72,13 @@ private[spark] class HadoopDelegationTokenManager( private val providerEnabledConfig = "spark.security.credentials.%s.enabled" private val principal = sparkConf.get(PRINCIPAL).orNull - private val keytab = sparkConf.get(KEYTAB).orNull + + // The keytab can be a local: URI for cluster mode, so translate it to a regular path. If it is + // needed later on, the code will check that it exists. + private val keytab = sparkConf.get(KEYTAB).map { uri => new URI(uri).getPath() }.orNull require((principal == null) == (keytab == null), "Both principal and keytab must be defined, or neither.") - require(keytab == null || new File(keytab).isFile(), s"Cannot find keytab at $keytab.") private val delegationTokenProviders = loadProviders() logDebug("Using the following builtin delegation token providers: " + @@ -264,6 +267,7 @@ private[spark] class HadoopDelegationTokenManager( private def doLogin(): UserGroupInformation = { logInfo(s"Attempting to login to KDC using principal: $principal") + require(new File(keytab).isFile(), s"Cannot find keytab at $keytab.") val ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab) logInfo("Successfully logged into KDC.") ugi diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 143abd3bbe..f322e92c6c 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -92,6 +92,9 @@ private[spark] object Utils extends Logging { private val MAX_DIR_CREATION_ATTEMPTS: Int = 10 @volatile private var localRootDirs: Array[String] = null + /** Scheme used for files that are locally available on worker nodes in the cluster. */ + val LOCAL_SCHEME = "local" + /** Serialize an object using Java serialization */ def serialize[T](o: T): Array[Byte] = { val bos = new ByteArrayOutputStream() @@ -2829,6 +2832,11 @@ private[spark] object Utils extends Logging { def isClientMode(conf: SparkConf): Boolean = { "client".equals(conf.get(SparkLauncher.DEPLOY_MODE, "client")) } + + /** Returns whether the URI is a "local:" URI. */ + def isLocalUri(uri: String): Boolean = { + uri.startsWith(s"$LOCAL_SCHEME:") + } } private[util] object CallerContext extends Logging { diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala index 85917b88e9..76041e7de5 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala @@ -87,25 +87,22 @@ private[spark] object Constants { val NON_JVM_MEMORY_OVERHEAD_FACTOR = 0.4d // Hadoop Configuration - val HADOOP_FILE_VOLUME = "hadoop-properties" + val HADOOP_CONF_VOLUME = "hadoop-properties" val KRB_FILE_VOLUME = "krb5-file" val HADOOP_CONF_DIR_PATH = "/opt/hadoop/conf" val KRB_FILE_DIR_PATH = "/etc" val ENV_HADOOP_CONF_DIR = "HADOOP_CONF_DIR" val HADOOP_CONFIG_MAP_NAME = "spark.kubernetes.executor.hadoopConfigMapName" - val KRB5_CONFIG_MAP_NAME = - "spark.kubernetes.executor.krb5ConfigMapName" // Kerberos Configuration - val KERBEROS_DELEGEGATION_TOKEN_SECRET_NAME = "delegation-tokens" val KERBEROS_DT_SECRET_NAME = "spark.kubernetes.kerberos.dt-secret-name" val KERBEROS_DT_SECRET_KEY = "spark.kubernetes.kerberos.dt-secret-key" - val KERBEROS_SPARK_USER_NAME = - "spark.kubernetes.kerberos.spark-user-name" val KERBEROS_SECRET_KEY = "hadoop-tokens" + val KERBEROS_KEYTAB_VOLUME = "kerberos-keytab" + val KERBEROS_KEYTAB_MOUNT_POINT = "/mnt/secrets/kerberos-keytab" // Hadoop credentials secrets for the Spark app. val SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR = "/mnt/secrets/hadoop-credentials" diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala index a06c21b47f..6febad981a 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala @@ -42,10 +42,6 @@ private[spark] abstract class KubernetesConf(val sparkConf: SparkConf) { def appName: String = get("spark.app.name", "spark") - def hadoopConfigMapName: String = s"$resourceNamePrefix-hadoop-config" - - def krbConfigMapName: String = s"$resourceNamePrefix-krb5-file" - def namespace: String = get(KUBERNETES_NAMESPACE) def imagePullPolicy: String = get(CONTAINER_IMAGE_PULL_POLICY) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPod.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPod.scala index 345dd117fd..fd1196368a 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPod.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPod.scala @@ -18,7 +18,30 @@ package org.apache.spark.deploy.k8s import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, Pod, PodBuilder} -private[spark] case class SparkPod(pod: Pod, container: Container) +private[spark] case class SparkPod(pod: Pod, container: Container) { + + /** + * Convenience method to apply a series of chained transformations to a pod. + * + * Use it like: + * + * original.modify { case pod => + * // update pod and return new one + * }.modify { case pod => + * // more changes that create a new pod + * }.modify { + * case pod if someCondition => // new pod + * } + * + * This makes it cleaner to apply multiple transformations, avoiding having to create + * a bunch of awkwardly-named local variables. Since the argument is a partial function, + * it can do matching without needing to exhaust all the possibilities. If the function + * is not applied, then the original pod will be kept. + */ + def transform(fn: PartialFunction[SparkPod, SparkPod]): SparkPod = fn.lift(this).getOrElse(this) + +} + private[spark] object SparkPod { def initialPod(): SparkPod = { diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala index d8cf3653d3..8362c14fb2 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala @@ -110,6 +110,10 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf) .withContainerPort(driverUIPort) .withProtocol("TCP") .endPort() + .addNewEnv() + .withName(ENV_SPARK_USER) + .withValue(Utils.getCurrentUserName()) + .endEnv() .addAllToEnv(driverCustomEnvs.asJava) .addNewEnv() .withName(ENV_DRIVER_BIND_ADDRESS) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala index 4bcf4c9446..c8bf7cdb42 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala @@ -163,6 +163,10 @@ private[spark] class BasicExecutorFeatureStep( .addToLimits("memory", executorMemoryQuantity) .addToRequests("cpu", executorCpuQuantity) .endResources() + .addNewEnv() + .withName(ENV_SPARK_USER) + .withValue(Utils.getCurrentUserName()) + .endEnv() .addAllToEnv(executorEnv.asJava) .withPorts(requiredPorts.asJava) .addToArgs("executor") diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfDriverFeatureStep.scala new file mode 100644 index 0000000000..d602ed5481 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfDriverFeatureStep.scala @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.features + +import java.io.File +import java.nio.charset.StandardCharsets + +import scala.collection.JavaConverters._ + +import com.google.common.io.Files +import io.fabric8.kubernetes.api.model._ + +import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesUtils, SparkPod} +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ + +/** + * Mounts the Hadoop configuration - either a pre-defined config map, or a local configuration + * directory - on the driver pod. + */ +private[spark] class HadoopConfDriverFeatureStep(conf: KubernetesConf) + extends KubernetesFeatureConfigStep { + + private val confDir = Option(conf.sparkConf.getenv(ENV_HADOOP_CONF_DIR)) + private val existingConfMap = conf.get(KUBERNETES_HADOOP_CONF_CONFIG_MAP) + + KubernetesUtils.requireNandDefined( + confDir, + existingConfMap, + "Do not specify both the `HADOOP_CONF_DIR` in your ENV and the ConfigMap " + + "as the creation of an additional ConfigMap, when one is already specified is extraneous") + + private lazy val confFiles: Seq[File] = { + val dir = new File(confDir.get) + if (dir.isDirectory) { + dir.listFiles.filter(_.isFile).toSeq + } else { + Nil + } + } + + private def newConfigMapName: String = s"${conf.resourceNamePrefix}-hadoop-config" + + private def hasHadoopConf: Boolean = confDir.isDefined || existingConfMap.isDefined + + override def configurePod(original: SparkPod): SparkPod = { + original.transform { case pod if hasHadoopConf => + val confVolume = if (confDir.isDefined) { + val keyPaths = confFiles.map { file => + new KeyToPathBuilder() + .withKey(file.getName()) + .withPath(file.getName()) + .build() + } + new VolumeBuilder() + .withName(HADOOP_CONF_VOLUME) + .withNewConfigMap() + .withName(newConfigMapName) + .withItems(keyPaths.asJava) + .endConfigMap() + .build() + } else { + new VolumeBuilder() + .withName(HADOOP_CONF_VOLUME) + .withNewConfigMap() + .withName(existingConfMap.get) + .endConfigMap() + .build() + } + + val podWithConf = new PodBuilder(pod.pod) + .editSpec() + .addNewVolumeLike(confVolume) + .endVolume() + .endSpec() + .build() + + val containerWithMount = new ContainerBuilder(pod.container) + .addNewVolumeMount() + .withName(HADOOP_CONF_VOLUME) + .withMountPath(HADOOP_CONF_DIR_PATH) + .endVolumeMount() + .addNewEnv() + .withName(ENV_HADOOP_CONF_DIR) + .withValue(HADOOP_CONF_DIR_PATH) + .endEnv() + .build() + + SparkPod(podWithConf, containerWithMount) + } + } + + override def getAdditionalKubernetesResources(): Seq[HasMetadata] = { + if (confDir.isDefined) { + val fileMap = confFiles.map { file => + (file.getName(), Files.toString(file, StandardCharsets.UTF_8)) + }.toMap.asJava + + Seq(new ConfigMapBuilder() + .withNewMetadata() + .withName(newConfigMapName) + .endMetadata() + .addToData(fileMap) + .build()) + } else { + Nil + } + } + +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStep.scala deleted file mode 100644 index da332881ae..0000000000 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStep.scala +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.deploy.k8s.features - -import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, SparkPod} -import org.apache.spark.deploy.k8s.Constants._ -import org.apache.spark.deploy.k8s.features.hadooputils.HadoopBootstrapUtil -import org.apache.spark.internal.Logging - -/** - * This step is responsible for bootstraping the container with ConfigMaps - * containing Hadoop config files mounted as volumes and an ENV variable - * pointed to the mounted file directory. - */ -private[spark] class HadoopConfExecutorFeatureStep(conf: KubernetesExecutorConf) - extends KubernetesFeatureConfigStep with Logging { - - override def configurePod(pod: SparkPod): SparkPod = { - val hadoopConfDirCMapName = conf.getOption(HADOOP_CONFIG_MAP_NAME) - if (hadoopConfDirCMapName.isDefined) { - HadoopBootstrapUtil.bootstrapHadoopConfDir(None, None, hadoopConfDirCMapName, pod) - } else { - pod - } - } -} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopSparkUserExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopSparkUserExecutorFeatureStep.scala deleted file mode 100644 index c038e75491..0000000000 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopSparkUserExecutorFeatureStep.scala +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.deploy.k8s.features - -import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, SparkPod} -import org.apache.spark.deploy.k8s.Constants._ -import org.apache.spark.deploy.k8s.features.hadooputils.HadoopBootstrapUtil - -/** - * This step is responsible for setting ENV_SPARK_USER when HADOOP_FILES are detected - * however, this step would not be run if Kerberos is enabled, as Kerberos sets SPARK_USER - */ -private[spark] class HadoopSparkUserExecutorFeatureStep(conf: KubernetesExecutorConf) - extends KubernetesFeatureConfigStep { - - override def configurePod(pod: SparkPod): SparkPod = { - conf.getOption(KERBEROS_SPARK_USER_NAME).map { user => - HadoopBootstrapUtil.bootstrapSparkUserPod(user, pod) - }.getOrElse(pod) - } -} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala index c6d5a866fa..721d7e97b2 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala @@ -16,31 +16,40 @@ */ package org.apache.spark.deploy.k8s.features -import io.fabric8.kubernetes.api.model.{HasMetadata, Secret, SecretBuilder} +import java.io.File +import java.nio.charset.StandardCharsets + +import scala.collection.JavaConverters._ + +import com.google.common.io.Files +import io.fabric8.kubernetes.api.model._ import org.apache.commons.codec.binary.Base64 -import org.apache.hadoop.security.{Credentials, UserGroupInformation} +import org.apache.hadoop.security.UserGroupInformation import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.deploy.k8s.{KubernetesDriverConf, KubernetesUtils, SparkPod} import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ -import org.apache.spark.deploy.k8s.features.hadooputils._ import org.apache.spark.deploy.security.HadoopDelegationTokenManager +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.util.Utils /** - * Runs the necessary Hadoop-based logic based on Kerberos configs and the presence of the - * HADOOP_CONF_DIR. This runs various bootstrap methods defined in HadoopBootstrapUtil. + * Provide kerberos / service credentials to the Spark driver. + * + * There are three use cases, in order of precedence: + * + * - keytab: if a kerberos keytab is defined, it is provided to the driver, and the driver will + * manage the kerberos login and the creation of delegation tokens. + * - existing tokens: if a secret containing delegation tokens is provided, it will be mounted + * on the driver pod, and the driver will handle distribution of those tokens to executors. + * - tgt only: if Hadoop security is enabled, the local TGT will be used to create delegation + * tokens which will be provided to the driver. The driver will handle distribution of the + * tokens to executors. */ private[spark] class KerberosConfDriverFeatureStep(kubernetesConf: KubernetesDriverConf) - extends KubernetesFeatureConfigStep { - - private val hadoopConfDir = Option(kubernetesConf.sparkConf.getenv(ENV_HADOOP_CONF_DIR)) - private val hadoopConfigMapName = kubernetesConf.get(KUBERNETES_HADOOP_CONF_CONFIG_MAP) - KubernetesUtils.requireNandDefined( - hadoopConfDir, - hadoopConfigMapName, - "Do not specify both the `HADOOP_CONF_DIR` in your ENV and the ConfigMap " + - "as the creation of an additional ConfigMap, when one is already specified is extraneous") + extends KubernetesFeatureConfigStep with Logging { private val principal = kubernetesConf.get(org.apache.spark.internal.config.PRINCIPAL) private val keytab = kubernetesConf.get(org.apache.spark.internal.config.KEYTAB) @@ -49,15 +58,6 @@ private[spark] class KerberosConfDriverFeatureStep(kubernetesConf: KubernetesDri private val krb5File = kubernetesConf.get(KUBERNETES_KERBEROS_KRB5_FILE) private val krb5CMap = kubernetesConf.get(KUBERNETES_KERBEROS_KRB5_CONFIG_MAP) private val hadoopConf = SparkHadoopUtil.get.newConfiguration(kubernetesConf.sparkConf) - private val tokenManager = new HadoopDelegationTokenManager(kubernetesConf.sparkConf, hadoopConf) - private val isKerberosEnabled = - (hadoopConfDir.isDefined && UserGroupInformation.isSecurityEnabled) || - (hadoopConfigMapName.isDefined && (krb5File.isDefined || krb5CMap.isDefined)) - require(keytab.isEmpty || isKerberosEnabled, - "You must enable Kerberos support if you are specifying a Kerberos Keytab") - - require(existingSecretName.isEmpty || isKerberosEnabled, - "You must enable Kerberos support if you are specifying a Kerberos Secret") KubernetesUtils.requireNandDefined( krb5File, @@ -79,128 +79,183 @@ private[spark] class KerberosConfDriverFeatureStep(kubernetesConf: KubernetesDri "If a secret storing a Kerberos Delegation Token is specified you must also" + " specify the item-key where the data is stored") - private val hadoopConfigurationFiles = hadoopConfDir.map { hConfDir => - HadoopBootstrapUtil.getHadoopConfFiles(hConfDir) + if (!hasKerberosConf) { + logInfo("You have not specified a krb5.conf file locally or via a ConfigMap. " + + "Make sure that you have the krb5.conf locally on the driver image.") } - private val newHadoopConfigMapName = - if (hadoopConfigMapName.isEmpty) { - Some(kubernetesConf.hadoopConfigMapName) + + // Create delegation tokens if needed. This is a lazy val so that it's not populated + // unnecessarily. But it needs to be accessible to different methods in this class, + // since it's not clear based solely on available configuration options that delegation + // tokens are needed when other credentials are not available. + private lazy val delegationTokens: Array[Byte] = { + if (keytab.isEmpty && existingSecretName.isEmpty) { + val tokenManager = new HadoopDelegationTokenManager(kubernetesConf.sparkConf, + SparkHadoopUtil.get.newConfiguration(kubernetesConf.sparkConf)) + val creds = UserGroupInformation.getCurrentUser().getCredentials() + tokenManager.obtainDelegationTokens(creds) + // If no tokens and no secrets are stored in the credentials, make sure nothing is returned, + // to avoid creating an unnecessary secret. + if (creds.numberOfTokens() > 0 || creds.numberOfSecretKeys() > 0) { + SparkHadoopUtil.get.serialize(creds) + } else { + null + } } else { - None + null } + } - // Either use pre-existing secret or login to create new Secret with DT stored within - private val kerberosConfSpec: Option[KerberosConfigSpec] = (for { - secretName <- existingSecretName - secretItemKey <- existingSecretItemKey - } yield { - KerberosConfigSpec( - dtSecret = None, - dtSecretName = secretName, - dtSecretItemKey = secretItemKey, - jobUserName = UserGroupInformation.getCurrentUser.getShortUserName) - }).orElse( - if (isKerberosEnabled) { - Some(buildKerberosSpec()) - } else { - None + private def needKeytabUpload: Boolean = keytab.exists(!Utils.isLocalUri(_)) + + private def dtSecretName: String = s"${kubernetesConf.resourceNamePrefix}-delegation-tokens" + + private def ktSecretName: String = s"${kubernetesConf.resourceNamePrefix}-kerberos-keytab" + + private def hasKerberosConf: Boolean = krb5CMap.isDefined | krb5File.isDefined + + private def newConfigMapName: String = s"${kubernetesConf.resourceNamePrefix}-krb5-file" + + override def configurePod(original: SparkPod): SparkPod = { + original.transform { case pod if hasKerberosConf => + val configMapVolume = if (krb5CMap.isDefined) { + new VolumeBuilder() + .withName(KRB_FILE_VOLUME) + .withNewConfigMap() + .withName(krb5CMap.get) + .endConfigMap() + .build() + } else { + val krb5Conf = new File(krb5File.get) + new VolumeBuilder() + .withName(KRB_FILE_VOLUME) + .withNewConfigMap() + .withName(newConfigMapName) + .withItems(new KeyToPathBuilder() + .withKey(krb5Conf.getName()) + .withPath(krb5Conf.getName()) + .build()) + .endConfigMap() + .build() + } + + val podWithVolume = new PodBuilder(pod.pod) + .editSpec() + .addNewVolumeLike(configMapVolume) + .endVolume() + .endSpec() + .build() + + val containerWithMount = new ContainerBuilder(pod.container) + .addNewVolumeMount() + .withName(KRB_FILE_VOLUME) + .withMountPath(KRB_FILE_DIR_PATH + "/krb5.conf") + .withSubPath("krb5.conf") + .endVolumeMount() + .build() + + SparkPod(podWithVolume, containerWithMount) + }.transform { + case pod if needKeytabUpload => + // If keytab is defined and is a submission-local file (not local: URI), then create a + // secret for it. The keytab data will be stored in this secret below. + val podWitKeytab = new PodBuilder(pod.pod) + .editOrNewSpec() + .addNewVolume() + .withName(KERBEROS_KEYTAB_VOLUME) + .withNewSecret() + .withSecretName(ktSecretName) + .endSecret() + .endVolume() + .endSpec() + .build() + + val containerWithKeytab = new ContainerBuilder(pod.container) + .addNewVolumeMount() + .withName(KERBEROS_KEYTAB_VOLUME) + .withMountPath(KERBEROS_KEYTAB_MOUNT_POINT) + .endVolumeMount() + .build() + + SparkPod(podWitKeytab, containerWithKeytab) + + case pod if existingSecretName.isDefined | delegationTokens != null => + val secretName = existingSecretName.getOrElse(dtSecretName) + val itemKey = existingSecretItemKey.getOrElse(KERBEROS_SECRET_KEY) + + val podWithTokens = new PodBuilder(pod.pod) + .editOrNewSpec() + .addNewVolume() + .withName(SPARK_APP_HADOOP_SECRET_VOLUME_NAME) + .withNewSecret() + .withSecretName(secretName) + .endSecret() + .endVolume() + .endSpec() + .build() + + val containerWithTokens = new ContainerBuilder(pod.container) + .addNewVolumeMount() + .withName(SPARK_APP_HADOOP_SECRET_VOLUME_NAME) + .withMountPath(SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR) + .endVolumeMount() + .addNewEnv() + .withName(ENV_HADOOP_TOKEN_FILE_LOCATION) + .withValue(s"$SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR/$itemKey") + .endEnv() + .build() + + SparkPod(podWithTokens, containerWithTokens) } - ) - - override def configurePod(pod: SparkPod): SparkPod = { - if (!isKerberosEnabled) { - return pod - } - - val hadoopBasedSparkPod = HadoopBootstrapUtil.bootstrapHadoopConfDir( - hadoopConfDir, - newHadoopConfigMapName, - hadoopConfigMapName, - pod) - kerberosConfSpec.map { hSpec => - HadoopBootstrapUtil.bootstrapKerberosPod( - hSpec.dtSecretName, - hSpec.dtSecretItemKey, - hSpec.jobUserName, - krb5File, - Some(kubernetesConf.krbConfigMapName), - krb5CMap, - hadoopBasedSparkPod) - }.getOrElse( - HadoopBootstrapUtil.bootstrapSparkUserPod( - UserGroupInformation.getCurrentUser.getShortUserName, - hadoopBasedSparkPod)) } override def getAdditionalPodSystemProperties(): Map[String, String] = { - if (!isKerberosEnabled) { - return Map.empty + // If a submission-local keytab is provided, update the Spark config so that it knows the + // path of the keytab in the driver container. + if (needKeytabUpload) { + val ktName = new File(keytab.get).getName() + Map(KEYTAB.key -> s"$KERBEROS_KEYTAB_MOUNT_POINT/$ktName") + } else { + Map.empty } - - val resolvedConfValues = kerberosConfSpec.map { hSpec => - Map(KERBEROS_DT_SECRET_NAME -> hSpec.dtSecretName, - KERBEROS_DT_SECRET_KEY -> hSpec.dtSecretItemKey, - KERBEROS_SPARK_USER_NAME -> hSpec.jobUserName, - KRB5_CONFIG_MAP_NAME -> krb5CMap.getOrElse(kubernetesConf.krbConfigMapName)) - }.getOrElse( - Map(KERBEROS_SPARK_USER_NAME -> - UserGroupInformation.getCurrentUser.getShortUserName)) - Map(HADOOP_CONFIG_MAP_NAME -> - hadoopConfigMapName.getOrElse(kubernetesConf.hadoopConfigMapName)) ++ resolvedConfValues } override def getAdditionalKubernetesResources(): Seq[HasMetadata] = { - if (!isKerberosEnabled) { - return Seq.empty + Seq[HasMetadata]() ++ { + krb5File.map { path => + val file = new File(path) + new ConfigMapBuilder() + .withNewMetadata() + .withName(newConfigMapName) + .endMetadata() + .addToData( + Map(file.getName() -> Files.toString(file, StandardCharsets.UTF_8)).asJava) + .build() + } + } ++ { + // If a submission-local keytab is provided, stash it in a secret. + if (needKeytabUpload) { + val kt = new File(keytab.get) + Seq(new SecretBuilder() + .withNewMetadata() + .withName(ktSecretName) + .endMetadata() + .addToData(kt.getName(), Base64.encodeBase64String(Files.toByteArray(kt))) + .build()) + } else { + Nil + } + } ++ { + if (delegationTokens != null) { + Seq(new SecretBuilder() + .withNewMetadata() + .withName(dtSecretName) + .endMetadata() + .addToData(KERBEROS_SECRET_KEY, Base64.encodeBase64String(delegationTokens)) + .build()) + } else { + Nil + } } - - val hadoopConfConfigMap = for { - hName <- newHadoopConfigMapName - hFiles <- hadoopConfigurationFiles - } yield { - HadoopBootstrapUtil.buildHadoopConfigMap(hName, hFiles) - } - - val krb5ConfigMap = krb5File.map { fileLocation => - HadoopBootstrapUtil.buildkrb5ConfigMap( - kubernetesConf.krbConfigMapName, - fileLocation) - } - - val kerberosDTSecret = kerberosConfSpec.flatMap(_.dtSecret) - - hadoopConfConfigMap.toSeq ++ - krb5ConfigMap.toSeq ++ - kerberosDTSecret.toSeq } - - private def buildKerberosSpec(): KerberosConfigSpec = { - // The JobUserUGI will be taken fom the Local Ticket Cache or via keytab+principal - // The login happens in the SparkSubmit so login logic is not necessary to include - val jobUserUGI = UserGroupInformation.getCurrentUser - val creds = jobUserUGI.getCredentials - tokenManager.obtainDelegationTokens(creds) - val tokenData = SparkHadoopUtil.get.serialize(creds) - require(tokenData.nonEmpty, "Did not obtain any delegation tokens") - val newSecretName = - s"${kubernetesConf.resourceNamePrefix}-$KERBEROS_DELEGEGATION_TOKEN_SECRET_NAME" - val secretDT = - new SecretBuilder() - .withNewMetadata() - .withName(newSecretName) - .endMetadata() - .addToData(KERBEROS_SECRET_KEY, Base64.encodeBase64String(tokenData)) - .build() - KerberosConfigSpec( - dtSecret = Some(secretDT), - dtSecretName = newSecretName, - dtSecretItemKey = KERBEROS_SECRET_KEY, - jobUserName = jobUserUGI.getShortUserName) - } - - private case class KerberosConfigSpec( - dtSecret: Option[Secret], - dtSecretName: String, - dtSecretItemKey: String, - jobUserName: String) } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfExecutorFeatureStep.scala deleted file mode 100644 index 907271b1cb..0000000000 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfExecutorFeatureStep.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.deploy.k8s.features - -import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, SparkPod} -import org.apache.spark.deploy.k8s.Constants._ -import org.apache.spark.deploy.k8s.features.hadooputils.HadoopBootstrapUtil -import org.apache.spark.internal.Logging - -/** - * This step is responsible for mounting the DT secret for the executors - */ -private[spark] class KerberosConfExecutorFeatureStep(conf: KubernetesExecutorConf) - extends KubernetesFeatureConfigStep with Logging { - - override def configurePod(pod: SparkPod): SparkPod = { - val maybeKrb5CMap = conf.getOption(KRB5_CONFIG_MAP_NAME) - if (maybeKrb5CMap.isDefined) { - logInfo(s"Mounting Resources for Kerberos") - HadoopBootstrapUtil.bootstrapKerberosPod( - conf.get(KERBEROS_DT_SECRET_NAME), - conf.get(KERBEROS_DT_SECRET_KEY), - conf.get(KERBEROS_SPARK_USER_NAME), - None, - None, - maybeKrb5CMap, - pod) - } else { - pod - } - } -} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/hadooputils/HadoopBootstrapUtil.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/hadooputils/HadoopBootstrapUtil.scala deleted file mode 100644 index 5bee766caf..0000000000 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/hadooputils/HadoopBootstrapUtil.scala +++ /dev/null @@ -1,283 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.deploy.k8s.features.hadooputils - -import java.io.File -import java.nio.charset.StandardCharsets - -import scala.collection.JavaConverters._ - -import com.google.common.io.Files -import io.fabric8.kubernetes.api.model._ - -import org.apache.spark.deploy.k8s.Constants._ -import org.apache.spark.deploy.k8s.SparkPod -import org.apache.spark.internal.Logging - -private[spark] object HadoopBootstrapUtil extends Logging { - - /** - * Mounting the DT secret for both the Driver and the executors - * - * @param dtSecretName Name of the secret that stores the Delegation Token - * @param dtSecretItemKey Name of the Item Key storing the Delegation Token - * @param userName Name of the SparkUser to set SPARK_USER - * @param fileLocation Optional Location of the krb5 file - * @param newKrb5ConfName Optional location of the ConfigMap for Krb5 - * @param existingKrb5ConfName Optional name of ConfigMap for Krb5 - * @param pod Input pod to be appended to - * @return a modified SparkPod - */ - def bootstrapKerberosPod( - dtSecretName: String, - dtSecretItemKey: String, - userName: String, - fileLocation: Option[String], - newKrb5ConfName: Option[String], - existingKrb5ConfName: Option[String], - pod: SparkPod): SparkPod = { - - val preConfigMapVolume = existingKrb5ConfName.map { kconf => - new VolumeBuilder() - .withName(KRB_FILE_VOLUME) - .withNewConfigMap() - .withName(kconf) - .endConfigMap() - .build() - } - - val createConfigMapVolume = for { - fLocation <- fileLocation - krb5ConfName <- newKrb5ConfName - } yield { - val krb5File = new File(fLocation) - val fileStringPath = krb5File.toPath.getFileName.toString - new VolumeBuilder() - .withName(KRB_FILE_VOLUME) - .withNewConfigMap() - .withName(krb5ConfName) - .withItems(new KeyToPathBuilder() - .withKey(fileStringPath) - .withPath(fileStringPath) - .build()) - .endConfigMap() - .build() - } - - // Breaking up Volume creation for clarity - val configMapVolume = preConfigMapVolume.orElse(createConfigMapVolume) - if (configMapVolume.isEmpty) { - logInfo("You have not specified a krb5.conf file locally or via a ConfigMap. " + - "Make sure that you have the krb5.conf locally on the Driver and Executor images") - } - - val kerberizedPodWithDTSecret = new PodBuilder(pod.pod) - .editOrNewSpec() - .addNewVolume() - .withName(SPARK_APP_HADOOP_SECRET_VOLUME_NAME) - .withNewSecret() - .withSecretName(dtSecretName) - .endSecret() - .endVolume() - .endSpec() - .build() - - // Optionally add the krb5.conf ConfigMap - val kerberizedPod = configMapVolume.map { cmVolume => - new PodBuilder(kerberizedPodWithDTSecret) - .editSpec() - .addNewVolumeLike(cmVolume) - .endVolume() - .endSpec() - .build() - }.getOrElse(kerberizedPodWithDTSecret) - - val kerberizedContainerWithMounts = new ContainerBuilder(pod.container) - .addNewVolumeMount() - .withName(SPARK_APP_HADOOP_SECRET_VOLUME_NAME) - .withMountPath(SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR) - .endVolumeMount() - .addNewEnv() - .withName(ENV_HADOOP_TOKEN_FILE_LOCATION) - .withValue(s"$SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR/$dtSecretItemKey") - .endEnv() - .addNewEnv() - .withName(ENV_SPARK_USER) - .withValue(userName) - .endEnv() - .build() - - // Optionally add the krb5.conf Volume Mount - val kerberizedContainer = - if (configMapVolume.isDefined) { - new ContainerBuilder(kerberizedContainerWithMounts) - .addNewVolumeMount() - .withName(KRB_FILE_VOLUME) - .withMountPath(KRB_FILE_DIR_PATH + "/krb5.conf") - .withSubPath("krb5.conf") - .endVolumeMount() - .build() - } else { - kerberizedContainerWithMounts - } - - SparkPod(kerberizedPod, kerberizedContainer) - } - - /** - * setting ENV_SPARK_USER when HADOOP_FILES are detected - * - * @param sparkUserName Name of the SPARK_USER - * @param pod Input pod to be appended to - * @return a modified SparkPod - */ - def bootstrapSparkUserPod(sparkUserName: String, pod: SparkPod): SparkPod = { - val envModifiedContainer = new ContainerBuilder(pod.container) - .addNewEnv() - .withName(ENV_SPARK_USER) - .withValue(sparkUserName) - .endEnv() - .build() - SparkPod(pod.pod, envModifiedContainer) - } - - /** - * Grabbing files in the HADOOP_CONF_DIR - * - * @param path location of HADOOP_CONF_DIR - * @return a list of File object - */ - def getHadoopConfFiles(path: String): Seq[File] = { - val dir = new File(path) - if (dir.isDirectory) { - dir.listFiles.filter(_.isFile).toSeq - } else { - Seq.empty[File] - } - } - - /** - * Bootstraping the container with ConfigMaps that store - * Hadoop configuration files - * - * @param hadoopConfDir directory location of HADOOP_CONF_DIR env - * @param newHadoopConfigMapName name of the new configMap for HADOOP_CONF_DIR - * @param existingHadoopConfigMapName name of the pre-defined configMap for HADOOP_CONF_DIR - * @param pod Input pod to be appended to - * @return a modified SparkPod - */ - def bootstrapHadoopConfDir( - hadoopConfDir: Option[String], - newHadoopConfigMapName: Option[String], - existingHadoopConfigMapName: Option[String], - pod: SparkPod): SparkPod = { - val preConfigMapVolume = existingHadoopConfigMapName.map { hConf => - new VolumeBuilder() - .withName(HADOOP_FILE_VOLUME) - .withNewConfigMap() - .withName(hConf) - .endConfigMap() - .build() } - - val createConfigMapVolume = for { - dirLocation <- hadoopConfDir - hConfName <- newHadoopConfigMapName - } yield { - val hadoopConfigFiles = getHadoopConfFiles(dirLocation) - val keyPaths = hadoopConfigFiles.map { file => - val fileStringPath = file.toPath.getFileName.toString - new KeyToPathBuilder() - .withKey(fileStringPath) - .withPath(fileStringPath) - .build() - } - new VolumeBuilder() - .withName(HADOOP_FILE_VOLUME) - .withNewConfigMap() - .withName(hConfName) - .withItems(keyPaths.asJava) - .endConfigMap() - .build() - } - - // Breaking up Volume Creation for clarity - val configMapVolume = preConfigMapVolume.getOrElse(createConfigMapVolume.get) - - val hadoopSupportedPod = new PodBuilder(pod.pod) - .editSpec() - .addNewVolumeLike(configMapVolume) - .endVolume() - .endSpec() - .build() - - val hadoopSupportedContainer = new ContainerBuilder(pod.container) - .addNewVolumeMount() - .withName(HADOOP_FILE_VOLUME) - .withMountPath(HADOOP_CONF_DIR_PATH) - .endVolumeMount() - .addNewEnv() - .withName(ENV_HADOOP_CONF_DIR) - .withValue(HADOOP_CONF_DIR_PATH) - .endEnv() - .build() - SparkPod(hadoopSupportedPod, hadoopSupportedContainer) - } - - /** - * Builds ConfigMap given the file location of the - * krb5.conf file - * - * @param configMapName name of configMap for krb5 - * @param fileLocation location of krb5 file - * @return a ConfigMap - */ - def buildkrb5ConfigMap( - configMapName: String, - fileLocation: String): ConfigMap = { - val file = new File(fileLocation) - new ConfigMapBuilder() - .withNewMetadata() - .withName(configMapName) - .endMetadata() - .addToData(Map(file.toPath.getFileName.toString -> - Files.toString(file, StandardCharsets.UTF_8)).asJava) - .build() - } - - /** - * Builds ConfigMap given the ConfigMap name - * and a list of Hadoop Conf files - * - * @param hadoopConfigMapName name of hadoopConfigMap - * @param hadoopConfFiles list of hadoopFiles - * @return a ConfigMap - */ - def buildHadoopConfigMap( - hadoopConfigMapName: String, - hadoopConfFiles: Seq[File]): ConfigMap = { - new ConfigMapBuilder() - .withNewMetadata() - .withName(hadoopConfigMapName) - .endMetadata() - .addToData(hadoopConfFiles.map { file => - (file.toPath.getFileName.toString, - Files.toString(file, StandardCharsets.UTF_8)) - }.toMap.asJava) - .build() - } - -} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/hadooputils/KerberosConfigSpec.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/hadooputils/KerberosConfigSpec.scala deleted file mode 100644 index 7f7ef216cf..0000000000 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/hadooputils/KerberosConfigSpec.scala +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.deploy.k8s.features.hadooputils - -import io.fabric8.kubernetes.api.model.Secret - -/** - * Represents a given configuration of the Kerberos Configuration logic - *

- * - The secret containing a DT, either previously specified or built on the fly - * - The name of the secret where the DT will be stored - * - The data item-key on the secret which correlates with where the current DT data is stored - * - The Job User's username - */ -private[spark] case class KerberosConfigSpec( - dtSecret: Option[Secret], - dtSecretName: String, - dtSecretItemKey: String, - jobUserName: String) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala index d2c0ced9fa..57e4060bc8 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala @@ -46,6 +46,7 @@ private[spark] class KubernetesDriverBuilder { new LocalDirsFeatureStep(conf), new MountVolumesFeatureStep(conf), new DriverCommandFeatureStep(conf), + new HadoopConfDriverFeatureStep(conf), new KerberosConfDriverFeatureStep(conf), new PodTemplateConfigMapStep(conf)) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala index 03f5da2bb0..cd298971e0 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala @@ -25,6 +25,7 @@ import io.fabric8.kubernetes.client.KubernetesClient import org.apache.spark.SparkContext import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.security.HadoopDelegationTokenManager import org.apache.spark.rpc.{RpcAddress, RpcEnv} import org.apache.spark.scheduler.{ExecutorLossReason, TaskSchedulerImpl} import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SchedulerBackendUtils} @@ -143,7 +144,11 @@ private[spark] class KubernetesClusterSchedulerBackend( } override def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = { - new KubernetesDriverEndpoint(rpcEnv, properties) + new KubernetesDriverEndpoint(sc.env.rpcEnv, properties) + } + + override protected def createTokenManager(): Option[HadoopDelegationTokenManager] = { + Some(new HadoopDelegationTokenManager(conf, sc.hadoopConfiguration)) } private class KubernetesDriverEndpoint(rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)]) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala index 0b74966fe8..48aa2c56d4 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala @@ -44,10 +44,7 @@ private[spark] class KubernetesExecutorBuilder { new MountSecretsFeatureStep(conf), new EnvSecretsFeatureStep(conf), new LocalDirsFeatureStep(conf), - new MountVolumesFeatureStep(conf), - new HadoopConfExecutorFeatureStep(conf), - new KerberosConfExecutorFeatureStep(conf), - new HadoopSparkUserExecutorFeatureStep(conf)) + new MountVolumesFeatureStep(conf)) features.foldLeft(initialPod) { case (pod, feature) => feature.configurePod(pod) } } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala index e4951bc1e6..5ceb9d6d6f 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala @@ -27,6 +27,7 @@ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.k8s.submit._ import org.apache.spark.internal.config._ import org.apache.spark.ui.SparkUI +import org.apache.spark.util.Utils class BasicDriverFeatureStepSuite extends SparkFunSuite { @@ -73,7 +74,6 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { val foundPortNames = configuredPod.container.getPorts.asScala.toSet assert(expectedPortNames === foundPortNames) - assert(configuredPod.container.getEnv.size === 3) val envs = configuredPod.container .getEnv .asScala @@ -82,6 +82,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { DRIVER_ENVS.foreach { case (k, v) => assert(envs(v) === v) } + assert(envs(ENV_SPARK_USER) === Utils.getCurrentUserName()) assert(configuredPod.pod.getSpec().getImagePullSecrets.asScala === TEST_IMAGE_PULL_SECRET_OBJECTS) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala index 05989d9be7..c2efab01e4 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala @@ -200,7 +200,8 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { ENV_EXECUTOR_MEMORY -> "1g", ENV_APPLICATION_ID -> KubernetesTestConf.APP_ID, ENV_SPARK_CONF_DIR -> SPARK_CONF_DIR_INTERNAL, - ENV_EXECUTOR_POD_IP -> null) ++ additionalEnvVars + ENV_EXECUTOR_POD_IP -> null, + ENV_SPARK_USER -> Utils.getCurrentUserName()) val extraJavaOptsStart = additionalEnvVars.keys.count(_.startsWith(ENV_JAVA_OPT_PREFIX)) val extraJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf) @@ -208,9 +209,11 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { s"$ENV_JAVA_OPT_PREFIX${ind + extraJavaOptsStart}" -> opt }.toMap - val mapEnvs = executorPod.container.getEnv.asScala.map { + val containerEnvs = executorPod.container.getEnv.asScala.map { x => (x.getName, x.getValue) }.toMap - assert((defaultEnvs ++ extraJavaOptsEnvs) === mapEnvs) + + val expectedEnvs = defaultEnvs ++ additionalEnvVars ++ extraJavaOptsEnvs + assert(containerEnvs === expectedEnvs) } } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/HadoopConfDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/HadoopConfDriverFeatureStepSuite.scala new file mode 100644 index 0000000000..e1c01dbdc7 --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/HadoopConfDriverFeatureStepSuite.scala @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.features + +import java.io.File +import java.nio.charset.StandardCharsets.UTF_8 + +import scala.collection.JavaConverters._ + +import com.google.common.io.Files +import io.fabric8.kubernetes.api.model.ConfigMap + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.deploy.k8s._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.submit.JavaMainAppResource +import org.apache.spark.util.{SparkConfWithEnv, Utils} + +class HadoopConfDriverFeatureStepSuite extends SparkFunSuite { + + import KubernetesFeaturesTestUtils._ + import SecretVolumeUtils._ + + test("mount hadoop config map if defined") { + val sparkConf = new SparkConf(false) + .set(Config.KUBERNETES_HADOOP_CONF_CONFIG_MAP, "testConfigMap") + val conf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf) + val step = new HadoopConfDriverFeatureStep(conf) + checkPod(step.configurePod(SparkPod.initialPod())) + assert(step.getAdditionalKubernetesResources().isEmpty) + } + + test("create hadoop config map if config dir is defined") { + val confDir = Utils.createTempDir() + val confFiles = Set("core-site.xml", "hdfs-site.xml") + + confFiles.foreach { f => + Files.write("some data", new File(confDir, f), UTF_8) + } + + val sparkConf = new SparkConfWithEnv(Map(ENV_HADOOP_CONF_DIR -> confDir.getAbsolutePath())) + val conf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf) + + val step = new HadoopConfDriverFeatureStep(conf) + checkPod(step.configurePod(SparkPod.initialPod())) + + val hadoopConfMap = filter[ConfigMap](step.getAdditionalKubernetesResources()).head + assert(hadoopConfMap.getData().keySet().asScala === confFiles) + } + + private def checkPod(pod: SparkPod): Unit = { + assert(podHasVolume(pod.pod, HADOOP_CONF_VOLUME)) + assert(containerHasVolume(pod.container, HADOOP_CONF_VOLUME, HADOOP_CONF_DIR_PATH)) + assert(containerHasEnvVar(pod.container, ENV_HADOOP_CONF_DIR)) + } + +} diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStepSuite.scala new file mode 100644 index 0000000000..41ca3a94ce --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStepSuite.scala @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.features + +import java.io.File +import java.nio.charset.StandardCharsets.UTF_8 +import java.security.PrivilegedExceptionAction + +import scala.collection.JavaConverters._ + +import com.google.common.io.Files +import io.fabric8.kubernetes.api.model.{ConfigMap, Secret} +import org.apache.commons.codec.binary.Base64 +import org.apache.hadoop.io.Text +import org.apache.hadoop.security.{Credentials, UserGroupInformation} + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.k8s._ +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.submit.JavaMainAppResource +import org.apache.spark.internal.config._ +import org.apache.spark.util.Utils + +class KerberosConfDriverFeatureStepSuite extends SparkFunSuite { + + import KubernetesFeaturesTestUtils._ + import SecretVolumeUtils._ + + private val tmpDir = Utils.createTempDir() + + test("mount krb5 config map if defined") { + val configMap = "testConfigMap" + val step = createStep( + new SparkConf(false).set(KUBERNETES_KERBEROS_KRB5_CONFIG_MAP, configMap)) + + checkPodForKrbConf(step.configurePod(SparkPod.initialPod()), configMap) + assert(step.getAdditionalPodSystemProperties().isEmpty) + assert(filter[ConfigMap](step.getAdditionalKubernetesResources()).isEmpty) + } + + test("create krb5.conf config map if local config provided") { + val krbConf = File.createTempFile("krb5", ".conf", tmpDir) + Files.write("some data", krbConf, UTF_8) + + val sparkConf = new SparkConf(false) + .set(KUBERNETES_KERBEROS_KRB5_FILE, krbConf.getAbsolutePath()) + val step = createStep(sparkConf) + + val confMap = filter[ConfigMap](step.getAdditionalKubernetesResources()).head + assert(confMap.getData().keySet().asScala === Set(krbConf.getName())) + + checkPodForKrbConf(step.configurePod(SparkPod.initialPod()), confMap.getMetadata().getName()) + assert(step.getAdditionalPodSystemProperties().isEmpty) + } + + test("create keytab secret if client keytab file used") { + val keytab = File.createTempFile("keytab", ".bin", tmpDir) + Files.write("some data", keytab, UTF_8) + + val sparkConf = new SparkConf(false) + .set(KEYTAB, keytab.getAbsolutePath()) + .set(PRINCIPAL, "alice") + val step = createStep(sparkConf) + + val pod = step.configurePod(SparkPod.initialPod()) + assert(podHasVolume(pod.pod, KERBEROS_KEYTAB_VOLUME)) + assert(containerHasVolume(pod.container, KERBEROS_KEYTAB_VOLUME, KERBEROS_KEYTAB_MOUNT_POINT)) + + assert(step.getAdditionalPodSystemProperties().keys === Set(KEYTAB.key)) + + val secret = filter[Secret](step.getAdditionalKubernetesResources()).head + assert(secret.getData().keySet().asScala === Set(keytab.getName())) + } + + test("do nothing if container-local keytab used") { + val sparkConf = new SparkConf(false) + .set(KEYTAB, "local:/my.keytab") + .set(PRINCIPAL, "alice") + val step = createStep(sparkConf) + + val initial = SparkPod.initialPod() + assert(step.configurePod(initial) === initial) + assert(step.getAdditionalPodSystemProperties().isEmpty) + assert(step.getAdditionalKubernetesResources().isEmpty) + } + + test("mount delegation tokens if provided") { + val dtSecret = "tokenSecret" + val sparkConf = new SparkConf(false) + .set(KUBERNETES_KERBEROS_DT_SECRET_NAME, dtSecret) + .set(KUBERNETES_KERBEROS_DT_SECRET_ITEM_KEY, "dtokens") + val step = createStep(sparkConf) + + checkPodForTokens(step.configurePod(SparkPod.initialPod()), dtSecret) + assert(step.getAdditionalPodSystemProperties().isEmpty) + assert(step.getAdditionalKubernetesResources().isEmpty) + } + + test("create delegation tokens if needed") { + // Since HadoopDelegationTokenManager does not create any tokens without proper configs and + // services, start with a test user that already has some tokens that will just be piped + // through to the driver. + val testUser = UserGroupInformation.createUserForTesting("k8s", Array()) + testUser.doAs(new PrivilegedExceptionAction[Unit]() { + override def run(): Unit = { + val creds = testUser.getCredentials() + creds.addSecretKey(new Text("K8S_TEST_KEY"), Array[Byte](0x4, 0x2)) + testUser.addCredentials(creds) + + val tokens = SparkHadoopUtil.get.serialize(creds) + + val step = createStep(new SparkConf(false)) + + val dtSecret = filter[Secret](step.getAdditionalKubernetesResources()).head + assert(dtSecret.getData().get(KERBEROS_SECRET_KEY) === Base64.encodeBase64String(tokens)) + + checkPodForTokens(step.configurePod(SparkPod.initialPod()), + dtSecret.getMetadata().getName()) + + assert(step.getAdditionalPodSystemProperties().isEmpty) + } + }) + } + + test("do nothing if no config and no tokens") { + val step = createStep(new SparkConf(false)) + val initial = SparkPod.initialPod() + assert(step.configurePod(initial) === initial) + assert(step.getAdditionalPodSystemProperties().isEmpty) + assert(step.getAdditionalKubernetesResources().isEmpty) + } + + private def checkPodForKrbConf(pod: SparkPod, confMapName: String): Unit = { + val podVolume = pod.pod.getSpec().getVolumes().asScala.find(_.getName() == KRB_FILE_VOLUME) + assert(podVolume.isDefined) + assert(containerHasVolume(pod.container, KRB_FILE_VOLUME, KRB_FILE_DIR_PATH + "/krb5.conf")) + assert(podVolume.get.getConfigMap().getName() === confMapName) + } + + private def checkPodForTokens(pod: SparkPod, dtSecretName: String): Unit = { + val podVolume = pod.pod.getSpec().getVolumes().asScala + .find(_.getName() == SPARK_APP_HADOOP_SECRET_VOLUME_NAME) + assert(podVolume.isDefined) + assert(containerHasVolume(pod.container, SPARK_APP_HADOOP_SECRET_VOLUME_NAME, + SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR)) + assert(containerHasEnvVar(pod.container, ENV_HADOOP_TOKEN_FILE_LOCATION)) + assert(podVolume.get.getSecret().getSecretName() === dtSecretName) + } + + private def createStep(conf: SparkConf): KerberosConfDriverFeatureStep = { + val kconf = KubernetesTestConf.createDriverConf(sparkConf = conf) + new KerberosConfDriverFeatureStep(kconf) + } + +} diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KubernetesFeaturesTestUtils.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KubernetesFeaturesTestUtils.scala index f90380e30e..076b681be2 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KubernetesFeaturesTestUtils.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KubernetesFeaturesTestUtils.scala @@ -17,6 +17,7 @@ package org.apache.spark.deploy.k8s.features import scala.collection.JavaConverters._ +import scala.reflect.ClassTag import io.fabric8.kubernetes.api.model.{Container, HasMetadata, PodBuilder, SecretBuilder} import org.mockito.Matchers @@ -63,4 +64,9 @@ object KubernetesFeaturesTestUtils { def containerHasEnvVar(container: Container, envVarName: String): Boolean = { container.getEnv.asScala.exists(envVar => envVar.getName == envVarName) } + + def filter[T: ClassTag](list: Seq[HasMetadata]): Seq[T] = { + val desired = implicitly[ClassTag[T]].runtimeClass + list.filter(_.getClass() == desired).map(_.asInstanceOf[T]).toSeq + } } diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 6240f7b68d..184fb6a8ad 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -116,6 +116,8 @@ private[spark] class Client( } } + require(keytab == null || !Utils.isLocalUri(keytab), "Keytab should reference a local file.") + private val launcherBackend = new LauncherBackend() { override protected def conf: SparkConf = sparkConf @@ -472,7 +474,7 @@ private[spark] class Client( appMasterOnly: Boolean = false): (Boolean, String) = { val trimmedPath = path.trim() val localURI = Utils.resolveURI(trimmedPath) - if (localURI.getScheme != LOCAL_SCHEME) { + if (localURI.getScheme != Utils.LOCAL_SCHEME) { if (addDistributedUri(localURI)) { val localPath = getQualifiedLocalPath(localURI, hadoopConf) val linkname = targetDir.map(_ + "/").getOrElse("") + @@ -515,7 +517,7 @@ private[spark] class Client( val sparkArchive = sparkConf.get(SPARK_ARCHIVE) if (sparkArchive.isDefined) { val archive = sparkArchive.get - require(!isLocalUri(archive), s"${SPARK_ARCHIVE.key} cannot be a local URI.") + require(!Utils.isLocalUri(archive), s"${SPARK_ARCHIVE.key} cannot be a local URI.") distribute(Utils.resolveURI(archive).toString, resType = LocalResourceType.ARCHIVE, destName = Some(LOCALIZED_LIB_DIR)) @@ -525,7 +527,7 @@ private[spark] class Client( // Break the list of jars to upload, and resolve globs. val localJars = new ArrayBuffer[String]() jars.foreach { jar => - if (!isLocalUri(jar)) { + if (!Utils.isLocalUri(jar)) { val path = getQualifiedLocalPath(Utils.resolveURI(jar), hadoopConf) val pathFs = FileSystem.get(path.toUri(), hadoopConf) pathFs.globStatus(path).filter(_.isFile()).foreach { entry => @@ -814,7 +816,7 @@ private[spark] class Client( } (pySparkArchives ++ pyArchives).foreach { path => val uri = Utils.resolveURI(path) - if (uri.getScheme != LOCAL_SCHEME) { + if (uri.getScheme != Utils.LOCAL_SCHEME) { pythonPath += buildPath(Environment.PWD.$$(), new Path(uri).getName()) } else { pythonPath += uri.getPath() @@ -1183,9 +1185,6 @@ private object Client extends Logging { // Alias for the user jar val APP_JAR_NAME: String = "__app__.jar" - // URI scheme that identifies local resources - val LOCAL_SCHEME = "local" - // Staging directory for any temporary jars or files val SPARK_STAGING: String = ".sparkStaging" @@ -1307,7 +1306,7 @@ private object Client extends Logging { addClasspathEntry(buildPath(Environment.PWD.$$(), LOCALIZED_LIB_DIR, "*"), env) if (sparkConf.get(SPARK_ARCHIVE).isEmpty) { sparkConf.get(SPARK_JARS).foreach { jars => - jars.filter(isLocalUri).foreach { jar => + jars.filter(Utils.isLocalUri).foreach { jar => val uri = new URI(jar) addClasspathEntry(getClusterPath(sparkConf, uri.getPath()), env) } @@ -1340,7 +1339,7 @@ private object Client extends Logging { private def getMainJarUri(mainJar: Option[String]): Option[URI] = { mainJar.flatMap { path => val uri = Utils.resolveURI(path) - if (uri.getScheme == LOCAL_SCHEME) Some(uri) else None + if (uri.getScheme == Utils.LOCAL_SCHEME) Some(uri) else None }.orElse(Some(new URI(APP_JAR_NAME))) } @@ -1368,7 +1367,7 @@ private object Client extends Logging { uri: URI, fileName: String, env: HashMap[String, String]): Unit = { - if (uri != null && uri.getScheme == LOCAL_SCHEME) { + if (uri != null && uri.getScheme == Utils.LOCAL_SCHEME) { addClasspathEntry(getClusterPath(conf, uri.getPath), env) } else if (fileName != null) { addClasspathEntry(buildPath(Environment.PWD.$$(), fileName), env) @@ -1489,11 +1488,6 @@ private object Client extends Logging { components.mkString(Path.SEPARATOR) } - /** Returns whether the URI is a "local:" URI. */ - def isLocalUri(uri: String): Boolean = { - uri.startsWith(s"$LOCAL_SCHEME:") - } - def createAppReport(report: ApplicationReport): YarnAppReport = { val diags = report.getDiagnostics() val diagsOpt = if (diags != null && diags.nonEmpty) Some(diags) else None diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala index b3286e8fd8..a6f57fcdb2 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala @@ -100,7 +100,7 @@ class ClientSuite extends SparkFunSuite with Matchers { val cp = env("CLASSPATH").split(":|;|") s"$SPARK,$USER,$ADDED".split(",").foreach({ entry => val uri = new URI(entry) - if (LOCAL_SCHEME.equals(uri.getScheme())) { + if (Utils.LOCAL_SCHEME.equals(uri.getScheme())) { cp should contain (uri.getPath()) } else { cp should not contain (uri.getPath()) @@ -136,7 +136,7 @@ class ClientSuite extends SparkFunSuite with Matchers { val expected = ADDED.split(",") .map(p => { val uri = new URI(p) - if (LOCAL_SCHEME == uri.getScheme()) { + if (Utils.LOCAL_SCHEME == uri.getScheme()) { p } else { Option(uri.getFragment()).getOrElse(new File(p).getName()) @@ -249,7 +249,7 @@ class ClientSuite extends SparkFunSuite with Matchers { any(classOf[MutableHashMap[URI, Path]]), anyBoolean(), any()) classpath(client) should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*")) - sparkConf.set(SPARK_ARCHIVE, LOCAL_SCHEME + ":" + archive.getPath()) + sparkConf.set(SPARK_ARCHIVE, Utils.LOCAL_SCHEME + ":" + archive.getPath()) intercept[IllegalArgumentException] { client.prepareLocalResources(new Path(temp.getAbsolutePath()), Nil) }