diff --git a/core/src/main/resources/META-INF/services/org.apache.spark.deploy.security.HadoopDelegationTokenProvider b/core/src/main/resources/META-INF/services/org.apache.spark.security.HadoopDelegationTokenProvider similarity index 100% rename from core/src/main/resources/META-INF/services/org.apache.spark.deploy.security.HadoopDelegationTokenProvider rename to core/src/main/resources/META-INF/services/org.apache.spark.security.HadoopDelegationTokenProvider diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala index e56d03401d..2e21adac86 100644 --- a/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala @@ -23,12 +23,12 @@ import scala.reflect.runtime.universe import scala.util.control.NonFatal import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.security.Credentials import org.apache.hadoop.security.token.{Token, TokenIdentifier} import org.apache.spark.SparkConf import org.apache.spark.internal.Logging +import org.apache.spark.security.HadoopDelegationTokenProvider import org.apache.spark.util.Utils private[security] class HBaseDelegationTokenProvider diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala index 6a18a8dd33..4db86ba8f1 100644 --- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala @@ -26,7 +26,6 @@ import java.util.concurrent.{ScheduledExecutorService, TimeUnit} import scala.collection.mutable import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.security.{Credentials, UserGroupInformation} import org.apache.spark.SparkConf @@ -35,6 +34,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.rpc.RpcEndpointRef import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens +import org.apache.spark.security.HadoopDelegationTokenProvider import org.apache.spark.ui.UIUtils import org.apache.spark.util.{ThreadUtils, Utils} diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala index 725eefbda8..ac432e7581 100644 --- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala @@ -30,6 +30,7 @@ import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdenti import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ +import org.apache.spark.security.HadoopDelegationTokenProvider private[deploy] class HadoopFSDelegationTokenProvider extends HadoopDelegationTokenProvider with Logging { diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/security/HadoopDelegationTokenProvider.scala similarity index 92% rename from core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenProvider.scala rename to core/src/main/scala/org/apache/spark/security/HadoopDelegationTokenProvider.scala index 3dc952d54e..cff8d81443 100644 --- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenProvider.scala +++ b/core/src/main/scala/org/apache/spark/security/HadoopDelegationTokenProvider.scala @@ -15,18 +15,20 @@ * limitations under the License. */ -package org.apache.spark.deploy.security +package org.apache.spark.security import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.security.Credentials import org.apache.spark.SparkConf +import org.apache.spark.annotation.DeveloperApi /** + * ::DeveloperApi:: * Hadoop delegation token provider. */ -private[spark] trait HadoopDelegationTokenProvider { +@DeveloperApi +trait HadoopDelegationTokenProvider { /** * Name of the service to provide delegation tokens. This name should be unique. Spark will diff --git a/core/src/test/resources/META-INF/services/org.apache.spark.deploy.security.HadoopDelegationTokenProvider b/core/src/test/resources/META-INF/services/org.apache.spark.security.HadoopDelegationTokenProvider similarity index 100% rename from core/src/test/resources/META-INF/services/org.apache.spark.deploy.security.HadoopDelegationTokenProvider rename to core/src/test/resources/META-INF/services/org.apache.spark.security.HadoopDelegationTokenProvider diff --git a/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala index 2f36dba05c..70174f7ff9 100644 --- a/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala @@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.security.Credentials import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.security.HadoopDelegationTokenProvider private class ExceptionThrowingDelegationTokenProvider extends HadoopDelegationTokenProvider { ExceptionThrowingDelegationTokenProvider.constructed = true diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index 8f1a12726b..6ee4b3d410 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -538,13 +538,6 @@ for: filesystem if `spark.yarn.stagingDir` is not set); - if Hadoop federation is enabled, all the federated filesystems in the configuration. -The YARN integration also supports custom delegation token providers using the Java Services -mechanism (see `java.util.ServiceLoader`). Implementations of -`org.apache.spark.deploy.yarn.security.ServiceCredentialProvider` can be made available to Spark -by listing their names in the corresponding file in the jar's `META-INF/services` directory. These -providers can be disabled individually by setting `spark.security.credentials.{service}.enabled` to -`false`, where `{service}` is the name of the credential provider. - ## YARN-specific Kerberos Configuration diff --git a/docs/security.md b/docs/security.md index d2cff41eb0..20492d871b 100644 --- a/docs/security.md +++ b/docs/security.md @@ -756,6 +756,11 @@ If an application needs to interact with other secure Hadoop filesystems, their explicitly provided to Spark at launch time. This is done by listing them in the `spark.kerberos.access.hadoopFileSystems` property, described in the configuration section below. +Spark also supports custom delegation token providers using the Java Services +mechanism (see `java.util.ServiceLoader`). Implementations of +`org.apache.spark.security.HadoopDelegationTokenProvider` can be made available to Spark +by listing their names in the corresponding file in the jar's `META-INF/services` directory. + Delegation token support is currently only supported in YARN and Mesos modes. Consult the deployment-specific page for more information. diff --git a/external/kafka-0-10-token-provider/src/main/resources/META-INF/services/org.apache.spark.deploy.security.HadoopDelegationTokenProvider b/external/kafka-0-10-token-provider/src/main/resources/META-INF/services/org.apache.spark.security.HadoopDelegationTokenProvider similarity index 100% rename from external/kafka-0-10-token-provider/src/main/resources/META-INF/services/org.apache.spark.deploy.security.HadoopDelegationTokenProvider rename to external/kafka-0-10-token-provider/src/main/resources/META-INF/services/org.apache.spark.security.HadoopDelegationTokenProvider diff --git a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaDelegationTokenProvider.scala b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaDelegationTokenProvider.scala index c69e8a3200..cba4b40ca7 100644 --- a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaDelegationTokenProvider.scala +++ b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaDelegationTokenProvider.scala @@ -25,9 +25,9 @@ import org.apache.hadoop.security.Credentials import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, SASL_SSL, SSL} import org.apache.spark.SparkConf -import org.apache.spark.deploy.security.HadoopDelegationTokenProvider import org.apache.spark.internal.Logging import org.apache.spark.internal.config.Kafka +import org.apache.spark.security.HadoopDelegationTokenProvider private[spark] class KafkaDelegationTokenProvider extends HadoopDelegationTokenProvider with Logging { diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 7523e3c42c..6ca81fb97c 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -21,7 +21,6 @@ import java.io.{FileSystem => _, _} import java.net.{InetAddress, UnknownHostException, URI} import java.nio.ByteBuffer import java.nio.charset.StandardCharsets -import java.security.PrivilegedExceptionAction import java.util.{Locale, Properties, UUID} import java.util.zip.{ZipEntry, ZipOutputStream} @@ -34,9 +33,9 @@ import com.google.common.io.Files import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs._ import org.apache.hadoop.fs.permission.FsPermission -import org.apache.hadoop.io.{DataOutputBuffer, Text} +import org.apache.hadoop.io.Text import org.apache.hadoop.mapreduce.MRJobConfig -import org.apache.hadoop.security.{Credentials, UserGroupInformation} +import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.util.StringUtils import org.apache.hadoop.yarn.api._ import org.apache.hadoop.yarn.api.ApplicationConstants.Environment @@ -50,8 +49,8 @@ import org.apache.hadoop.yarn.util.Records import org.apache.spark.{SecurityManager, SparkConf, SparkException} import org.apache.spark.deploy.{SparkApplication, SparkHadoopUtil} +import org.apache.spark.deploy.security.HadoopDelegationTokenManager import org.apache.spark.deploy.yarn.config._ -import org.apache.spark.deploy.yarn.security.YARNHadoopDelegationTokenManager import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.internal.config.Python._ @@ -315,7 +314,7 @@ private[spark] class Client( val credentials = currentUser.getCredentials() if (isClusterMode) { - val credentialManager = new YARNHadoopDelegationTokenManager(sparkConf, hadoopConf, null) + val credentialManager = new HadoopDelegationTokenManager(sparkConf, hadoopConf, null) credentialManager.obtainDelegationTokens(credentials) } diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ServiceCredentialProvider.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ServiceCredentialProvider.scala deleted file mode 100644 index cc24ac4d9b..0000000000 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ServiceCredentialProvider.scala +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.yarn.security - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.security.{Credentials, UserGroupInformation} - -import org.apache.spark.SparkConf - -/** - * A credential provider for a service. User must implement this if they need to access a - * secure service from Spark. - */ -trait ServiceCredentialProvider { - - /** - * Name of the service to provide credentials. This name should unique, Spark internally will - * use this name to differentiate credential provider. - */ - def serviceName: String - - /** - * Returns true if credentials are required by this service. By default, it is based on whether - * Hadoop security is enabled. - */ - def credentialsRequired(hadoopConf: Configuration): Boolean = { - UserGroupInformation.isSecurityEnabled - } - - /** - * Obtain credentials for this service and get the time of the next renewal. - * - * @param hadoopConf Configuration of current Hadoop Compatible system. - * @param sparkConf Spark configuration. - * @param creds Credentials to add tokens and security keys to. - * @return If this Credential is renewable and can be renewed, return the time of the next - * renewal, otherwise None should be returned. - */ - def obtainCredentials( - hadoopConf: Configuration, - sparkConf: SparkConf, - creds: Credentials): Option[Long] -} diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala deleted file mode 100644 index fc1f75254c..0000000000 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.yarn.security - -import java.util.ServiceLoader - -import scala.collection.JavaConverters._ - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.security.Credentials - -import org.apache.spark.SparkConf -import org.apache.spark.deploy.security.HadoopDelegationTokenManager -import org.apache.spark.rpc.RpcEndpointRef -import org.apache.spark.util.Utils - -/** - * This class loads delegation token providers registered under the YARN-specific - * [[ServiceCredentialProvider]] interface, as well as the builtin providers defined - * in [[HadoopDelegationTokenManager]]. - */ -private[spark] class YARNHadoopDelegationTokenManager( - _sparkConf: SparkConf, - _hadoopConf: Configuration, - _schedulerRef: RpcEndpointRef) - extends HadoopDelegationTokenManager(_sparkConf, _hadoopConf, _schedulerRef) { - - private val credentialProviders = { - ServiceLoader.load(classOf[ServiceCredentialProvider], Utils.getContextOrSparkClassLoader) - .asScala - .toList - .filter { p => isServiceEnabled(p.serviceName) } - .map { p => (p.serviceName, p) } - .toMap - } - if (credentialProviders.nonEmpty) { - logDebug("Using the following YARN-specific credential providers: " + - s"${credentialProviders.keys.mkString(", ")}.") - } - - override def obtainDelegationTokens(creds: Credentials): Long = { - val superInterval = super.obtainDelegationTokens(creds) - - credentialProviders.values.flatMap { provider => - if (provider.credentialsRequired(hadoopConf)) { - provider.obtainCredentials(hadoopConf, sparkConf, creds) - } else { - logDebug(s"Service ${provider.serviceName} does not require a token." + - s" Check your configuration to see if security is disabled or not.") - None - } - }.foldLeft(superInterval)(math.min) - } - - // For testing. - override def isProviderLoaded(serviceName: String): Boolean = { - credentialProviders.contains(serviceName) || super.isProviderLoaded(serviceName) - } - -} diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala index 821fbcd956..78cd6a200a 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala @@ -31,14 +31,12 @@ import org.eclipse.jetty.servlet.{FilterHolder, FilterMapping} import org.apache.spark.SparkContext import org.apache.spark.deploy.security.HadoopDelegationTokenManager -import org.apache.spark.deploy.yarn.security.YARNHadoopDelegationTokenManager import org.apache.spark.internal.Logging import org.apache.spark.internal.config import org.apache.spark.internal.config.UI._ import org.apache.spark.rpc._ import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ -import org.apache.spark.ui.JettyUtils import org.apache.spark.util.{RpcUtils, ThreadUtils} /** @@ -223,7 +221,7 @@ private[spark] abstract class YarnSchedulerBackend( } override protected def createTokenManager(): Option[HadoopDelegationTokenManager] = { - Some(new YARNHadoopDelegationTokenManager(sc.conf, sc.hadoopConfiguration, driverEndpoint)) + Some(new HadoopDelegationTokenManager(sc.conf, sc.hadoopConfiguration, driverEndpoint)) } /** diff --git a/resource-managers/yarn/src/test/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider b/resource-managers/yarn/src/test/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider deleted file mode 100644 index f31c232693..0000000000 --- a/resource-managers/yarn/src/test/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider +++ /dev/null @@ -1 +0,0 @@ -org.apache.spark.deploy.yarn.security.YARNTestCredentialProvider diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala deleted file mode 100644 index f00453cb9c..0000000000 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.yarn.security - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.security.Credentials - -import org.apache.spark.{SparkConf, SparkFunSuite} - -class YARNHadoopDelegationTokenManagerSuite extends SparkFunSuite { - private var credentialManager: YARNHadoopDelegationTokenManager = null - private var sparkConf: SparkConf = null - private var hadoopConf: Configuration = null - - override def beforeAll(): Unit = { - super.beforeAll() - sparkConf = new SparkConf() - hadoopConf = new Configuration() - } - - test("Correctly loads credential providers") { - credentialManager = new YARNHadoopDelegationTokenManager(sparkConf, hadoopConf, null) - assert(credentialManager.isProviderLoaded("yarn-test")) - } -} - -class YARNTestCredentialProvider extends ServiceCredentialProvider { - override def serviceName: String = "yarn-test" - - override def credentialsRequired(conf: Configuration): Boolean = true - - override def obtainCredentials( - hadoopConf: Configuration, - sparkConf: SparkConf, - creds: Credentials): Option[Long] = None -} diff --git a/sql/hive/src/main/resources/META-INF/services/org.apache.spark.deploy.security.HadoopDelegationTokenProvider b/sql/hive/src/main/resources/META-INF/services/org.apache.spark.security.HadoopDelegationTokenProvider similarity index 100% rename from sql/hive/src/main/resources/META-INF/services/org.apache.spark.deploy.security.HadoopDelegationTokenProvider rename to sql/hive/src/main/resources/META-INF/services/org.apache.spark.security.HadoopDelegationTokenProvider diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/security/HiveDelegationTokenProvider.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/security/HiveDelegationTokenProvider.scala index c0c46187b1..faee405d70 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/security/HiveDelegationTokenProvider.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/security/HiveDelegationTokenProvider.scala @@ -23,7 +23,6 @@ import java.security.PrivilegedExceptionAction import scala.util.control.NonFatal import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.ql.metadata.Hive @@ -33,9 +32,9 @@ import org.apache.hadoop.security.token.Token import org.apache.spark.SparkConf import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.deploy.security.HadoopDelegationTokenProvider import org.apache.spark.internal.Logging import org.apache.spark.internal.config.KEYTAB +import org.apache.spark.security.HadoopDelegationTokenProvider import org.apache.spark.util.Utils private[spark] class HiveDelegationTokenProvider