diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index 5166543933..8537c53688 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -731,7 +731,9 @@ private[spark] object SparkConf extends Logging { KEYTAB.key -> Seq( AlternateConfig("spark.yarn.keytab", "3.0")), PRINCIPAL.key -> Seq( - AlternateConfig("spark.yarn.principal", "3.0")) + AlternateConfig("spark.yarn.principal", "3.0")), + KERBEROS_RELOGIN_PERIOD.key -> Seq( + AlternateConfig("spark.yarn.kerberos.relogin.period", "3.0")) ) /** diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index 78a7cf648e..5979151345 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -413,20 +413,6 @@ object SparkHadoopUtil { def get: SparkHadoopUtil = instance - /** - * Given an expiration date for the current set of credentials, calculate the time when new - * credentials should be created. - * - * @param expirationDate Drop-dead expiration date - * @param conf Spark configuration - * @return Timestamp when new credentials should be created. - */ - private[spark] def nextCredentialRenewalTime(expirationDate: Long, conf: SparkConf): Long = { - val ct = System.currentTimeMillis - val ratio = conf.get(CREDENTIALS_RENEWAL_INTERVAL_RATIO) - (ct + (ratio * (expirationDate - ct))).toLong - } - /** * Returns a Configuration object with Spark configuration applied on top. Unlike * the instance method, this will always return a Configuration instance, and not a diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala index ab8d8d96a9..10cd8742f2 100644 --- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala @@ -17,76 +17,158 @@ package org.apache.spark.deploy.security +import java.io.File +import java.security.PrivilegedExceptionAction +import java.util.concurrent.{ScheduledExecutorService, TimeUnit} +import java.util.concurrent.atomic.AtomicReference + import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.FileSystem -import org.apache.hadoop.security.Credentials +import org.apache.hadoop.security.{Credentials, UserGroupInformation} import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens +import org.apache.spark.ui.UIUtils +import org.apache.spark.util.ThreadUtils /** - * Manages all the registered HadoopDelegationTokenProviders and offer APIs for other modules to - * obtain delegation tokens and their renewal time. By default [[HadoopFSDelegationTokenProvider]], - * [[HiveDelegationTokenProvider]] and [[HBaseDelegationTokenProvider]] will be loaded in if not - * explicitly disabled. + * Manager for delegation tokens in a Spark application. * - * Also, each HadoopDelegationTokenProvider is controlled by - * spark.security.credentials.{service}.enabled, and will not be loaded if this config is set to - * false. For example, Hive's delegation token provider [[HiveDelegationTokenProvider]] can be - * enabled/disabled by the configuration spark.security.credentials.hive.enabled. + * This manager has two modes of operation: * - * @param sparkConf Spark configuration - * @param hadoopConf Hadoop configuration - * @param fileSystems Delegation tokens will be fetched for these Hadoop filesystems. + * 1. When configured with a principal and a keytab, it will make sure long-running apps can run + * without interruption while accessing secured services. It periodically logs in to the KDC with + * user-provided credentials, and contacts all the configured secure services to obtain delegation + * tokens to be distributed to the rest of the application. + * + * Because the Hadoop UGI API does not expose the TTL of the TGT, a configuration controls how often + * to check that a relogin is necessary. This is done reasonably often since the check is a no-op + * when the relogin is not yet needed. The check period can be overridden in the configuration. + * + * New delegation tokens are created once 75% of the renewal interval of the original tokens has + * elapsed. The new tokens are sent to the Spark driver endpoint once it's registered with the AM. + * The driver is tasked with distributing the tokens to other processes that might need them. + * + * 2. When operating without an explicit principal and keytab, token renewal will not be available. + * Starting the manager will distribute an initial set of delegation tokens to the provided Spark + * driver, but the app will not get new tokens when those expire. + * + * It can also be used just to create delegation tokens, by calling the `obtainDelegationTokens` + * method. This option does not require calling the `start` method, but leaves it up to the + * caller to distribute the tokens that were generated. */ private[spark] class HadoopDelegationTokenManager( - sparkConf: SparkConf, - hadoopConf: Configuration, - fileSystems: Configuration => Set[FileSystem]) - extends Logging { + protected val sparkConf: SparkConf, + protected val hadoopConf: Configuration) extends Logging { private val deprecatedProviderEnabledConfigs = List( "spark.yarn.security.tokens.%s.enabled", "spark.yarn.security.credentials.%s.enabled") private val providerEnabledConfig = "spark.security.credentials.%s.enabled" - // Maintain all the registered delegation token providers - private val delegationTokenProviders = getDelegationTokenProviders + private val principal = sparkConf.get(PRINCIPAL).orNull + private val keytab = sparkConf.get(KEYTAB).orNull + + require((principal == null) == (keytab == null), + "Both principal and keytab must be defined, or neither.") + require(keytab == null || new File(keytab).isFile(), s"Cannot find keytab at $keytab.") + + private val delegationTokenProviders = loadProviders() logDebug("Using the following builtin delegation token providers: " + s"${delegationTokenProviders.keys.mkString(", ")}.") - /** Construct a [[HadoopDelegationTokenManager]] for the default Hadoop filesystem */ - def this(sparkConf: SparkConf, hadoopConf: Configuration) = { - this( - sparkConf, - hadoopConf, - hadoopConf => Set(FileSystem.get(hadoopConf).getHomeDirectory.getFileSystem(hadoopConf))) + private var renewalExecutor: ScheduledExecutorService = _ + private val driverRef = new AtomicReference[RpcEndpointRef]() + + /** Set the endpoint used to send tokens to the driver. */ + def setDriverRef(ref: RpcEndpointRef): Unit = { + driverRef.set(ref) } - private def getDelegationTokenProviders: Map[String, HadoopDelegationTokenProvider] = { - val providers = Seq(new HadoopFSDelegationTokenProvider(fileSystems)) ++ - safeCreateProvider(new HiveDelegationTokenProvider) ++ - safeCreateProvider(new HBaseDelegationTokenProvider) + /** @return Whether delegation token renewal is enabled. */ + def renewalEnabled: Boolean = principal != null - // Filter out providers for which spark.security.credentials.{service}.enabled is false. - providers - .filter { p => isServiceEnabled(p.serviceName) } - .map { p => (p.serviceName, p) } - .toMap + /** + * Start the token renewer. Requires a principal and keytab. Upon start, the renewer will: + * + * - log in the configured principal, and set up a task to keep that user's ticket renewed + * - obtain delegation tokens from all available providers + * - send the tokens to the driver, if it's already registered + * - schedule a periodic task to update the tokens when needed. + * + * @return The newly logged in user. + */ + def start(): UserGroupInformation = { + require(renewalEnabled, "Token renewal must be enabled to start the renewer.") + renewalExecutor = + ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread") + + val originalCreds = UserGroupInformation.getCurrentUser().getCredentials() + val ugi = doLogin() + + val tgtRenewalTask = new Runnable() { + override def run(): Unit = { + ugi.checkTGTAndReloginFromKeytab() + } + } + val tgtRenewalPeriod = sparkConf.get(KERBEROS_RELOGIN_PERIOD) + renewalExecutor.scheduleAtFixedRate(tgtRenewalTask, tgtRenewalPeriod, tgtRenewalPeriod, + TimeUnit.SECONDS) + + val creds = obtainTokensAndScheduleRenewal(ugi) + ugi.addCredentials(creds) + + val driver = driverRef.get() + if (driver != null) { + val tokens = SparkHadoopUtil.get.serialize(creds) + driver.send(UpdateDelegationTokens(tokens)) + } + + // Transfer the original user's tokens to the new user, since it may contain needed tokens + // (such as those user to connect to YARN). Explicitly avoid overwriting tokens that already + // exist in the current user's credentials, since those were freshly obtained above + // (see SPARK-23361). + val existing = ugi.getCredentials() + existing.mergeAll(originalCreds) + ugi.addCredentials(existing) + ugi } - private def safeCreateProvider( - createFn: => HadoopDelegationTokenProvider): Option[HadoopDelegationTokenProvider] = { - try { - Some(createFn) - } catch { - case t: Throwable => - logDebug(s"Failed to load built in provider.", t) - None + def stop(): Unit = { + if (renewalExecutor != null) { + renewalExecutor.shutdown() } } - def isServiceEnabled(serviceName: String): Boolean = { + /** + * Fetch new delegation tokens for configured services, storing them in the given credentials. + * Tokens are fetched for the current logged in user. + * + * @param creds Credentials object where to store the delegation tokens. + * @return The time by which the tokens must be renewed. + */ + def obtainDelegationTokens(creds: Credentials): Long = { + delegationTokenProviders.values.flatMap { provider => + if (provider.delegationTokensRequired(sparkConf, hadoopConf)) { + provider.obtainDelegationTokens(hadoopConf, sparkConf, creds) + } else { + logDebug(s"Service ${provider.serviceName} does not require a token." + + s" Check your configuration to see if security is disabled or not.") + None + } + }.foldLeft(Long.MaxValue)(math.min) + } + + // Visible for testing. + def isProviderLoaded(serviceName: String): Boolean = { + delegationTokenProviders.contains(serviceName) + } + + protected def isServiceEnabled(serviceName: String): Boolean = { val key = providerEnabledConfig.format(serviceName) deprecatedProviderEnabledConfigs.foreach { pattern => @@ -110,32 +192,104 @@ private[spark] class HadoopDelegationTokenManager( } /** - * Get delegation token provider for the specified service. + * List of file systems for which to obtain delegation tokens. The base implementation + * returns just the default file system in the given Hadoop configuration. */ - def getServiceDelegationTokenProvider(service: String): Option[HadoopDelegationTokenProvider] = { - delegationTokenProviders.get(service) + protected def fileSystemsToAccess(): Set[FileSystem] = { + Set(FileSystem.get(hadoopConf)) + } + + private def scheduleRenewal(delay: Long): Unit = { + val _delay = math.max(0, delay) + logInfo(s"Scheduling login from keytab in ${UIUtils.formatDuration(delay)}.") + + val renewalTask = new Runnable() { + override def run(): Unit = { + updateTokensTask() + } + } + renewalExecutor.schedule(renewalTask, _delay, TimeUnit.MILLISECONDS) } /** - * Writes delegation tokens to creds. Delegation tokens are fetched from all registered - * providers. - * - * @param hadoopConf hadoop Configuration - * @param creds Credentials that will be updated in place (overwritten) - * @return Time after which the fetched delegation tokens should be renewed. + * Periodic task to login to the KDC and create new delegation tokens. Re-schedules itself + * to fetch the next set of tokens when needed. */ - def obtainDelegationTokens( - hadoopConf: Configuration, - creds: Credentials): Long = { - delegationTokenProviders.values.flatMap { provider => - if (provider.delegationTokensRequired(sparkConf, hadoopConf)) { - provider.obtainDelegationTokens(hadoopConf, sparkConf, creds) - } else { - logDebug(s"Service ${provider.serviceName} does not require a token." + - s" Check your configuration to see if security is disabled or not.") - None - } - }.foldLeft(Long.MaxValue)(math.min) - } -} + private def updateTokensTask(): Unit = { + try { + val freshUGI = doLogin() + val creds = obtainTokensAndScheduleRenewal(freshUGI) + val tokens = SparkHadoopUtil.get.serialize(creds) + val driver = driverRef.get() + if (driver != null) { + logInfo("Updating delegation tokens.") + driver.send(UpdateDelegationTokens(tokens)) + } else { + // This shouldn't really happen, since the driver should register way before tokens expire. + logWarning("Delegation tokens close to expiration but no driver has registered yet.") + SparkHadoopUtil.get.addDelegationTokens(tokens, sparkConf) + } + } catch { + case e: Exception => + val delay = TimeUnit.SECONDS.toMillis(sparkConf.get(CREDENTIALS_RENEWAL_RETRY_WAIT)) + logWarning(s"Failed to update tokens, will try again in ${UIUtils.formatDuration(delay)}!" + + " If this happens too often tasks will fail.", e) + scheduleRenewal(delay) + } + } + + /** + * Obtain new delegation tokens from the available providers. Schedules a new task to fetch + * new tokens before the new set expires. + * + * @return Credentials containing the new tokens. + */ + private def obtainTokensAndScheduleRenewal(ugi: UserGroupInformation): Credentials = { + ugi.doAs(new PrivilegedExceptionAction[Credentials]() { + override def run(): Credentials = { + val creds = new Credentials() + val nextRenewal = obtainDelegationTokens(creds) + + // Calculate the time when new credentials should be created, based on the configured + // ratio. + val now = System.currentTimeMillis + val ratio = sparkConf.get(CREDENTIALS_RENEWAL_INTERVAL_RATIO) + val delay = (ratio * (nextRenewal - now)).toLong + scheduleRenewal(delay) + creds + } + }) + } + + private def doLogin(): UserGroupInformation = { + logInfo(s"Attempting to login to KDC using principal: $principal") + val ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab) + logInfo("Successfully logged into KDC.") + ugi + } + + private def loadProviders(): Map[String, HadoopDelegationTokenProvider] = { + val providers = Seq(new HadoopFSDelegationTokenProvider(fileSystemsToAccess)) ++ + safeCreateProvider(new HiveDelegationTokenProvider) ++ + safeCreateProvider(new HBaseDelegationTokenProvider) + + // Filter out providers for which spark.security.credentials.{service}.enabled is false. + providers + .filter { p => isServiceEnabled(p.serviceName) } + .map { p => (p.serviceName, p) } + .toMap + } + + private def safeCreateProvider( + createFn: => HadoopDelegationTokenProvider): Option[HadoopDelegationTokenProvider] = { + try { + Some(createFn) + } catch { + case t: Throwable => + logDebug(s"Failed to load built in provider.", t) + None + } + } + +} diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala index 21ca669ea9..767b5521e8 100644 --- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala @@ -30,7 +30,7 @@ import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ -private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Configuration => Set[FileSystem]) +private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: () => Set[FileSystem]) extends HadoopDelegationTokenProvider with Logging { // This tokenRenewalInterval will be set in the first call to obtainDelegationTokens. @@ -44,8 +44,7 @@ private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Configuration hadoopConf: Configuration, sparkConf: SparkConf, creds: Credentials): Option[Long] = { - - val fsToGetTokens = fileSystems(hadoopConf) + val fsToGetTokens = fileSystems() val fetchCreds = fetchDelegationTokens(getTokenRenewer(hadoopConf), fsToGetTokens, creds) // Get the token renewal interval if it is not set. It will only be called once. diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 356cf9e76c..034e5ebbd2 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -179,6 +179,10 @@ package object config { .doc("Name of the Kerberos principal.") .stringConf.createOptional + private[spark] val KERBEROS_RELOGIN_PERIOD = ConfigBuilder("spark.kerberos.relogin.period") + .timeConf(TimeUnit.SECONDS) + .createWithDefaultString("1m") + private[spark] val EXECUTOR_INSTANCES = ConfigBuilder("spark.executor.instances") .intConf .createOptional diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index de7c0d813a..329158a44d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -18,13 +18,17 @@ package org.apache.spark.scheduler.cluster import java.util.concurrent.TimeUnit -import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.atomic.{AtomicInteger, AtomicReference} import javax.annotation.concurrent.GuardedBy import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} import scala.concurrent.Future +import org.apache.hadoop.security.UserGroupInformation + import org.apache.spark.{ExecutorAllocationClient, SparkEnv, SparkException, TaskState} +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.security.HadoopDelegationTokenManager import org.apache.spark.internal.Logging import org.apache.spark.rpc._ import org.apache.spark.scheduler._ @@ -95,6 +99,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // The num of current max ExecutorId used to re-register appMaster @volatile protected var currentExecutorIdCounter = 0 + // Current set of delegation tokens to send to executors. + private val delegationTokens = new AtomicReference[Array[Byte]]() + + // The token manager used to create security tokens. + private var delegationTokenManager: Option[HadoopDelegationTokenManager] = None + private val reviveThread = ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-revive-thread") @@ -152,6 +162,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp } case UpdateDelegationTokens(newDelegationTokens) => + SparkHadoopUtil.get.addDelegationTokens(newDelegationTokens, conf) + delegationTokens.set(newDelegationTokens) executorDataMap.values.foreach { ed => ed.executorEndpoint.send(UpdateDelegationTokens(newDelegationTokens)) } @@ -230,7 +242,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp val reply = SparkAppConfig( sparkProperties, SparkEnv.get.securityManager.getIOEncryptionKey(), - fetchHadoopDelegationTokens()) + Option(delegationTokens.get())) context.reply(reply) } @@ -390,6 +402,21 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // TODO (prashant) send conf instead of properties driverEndpoint = createDriverEndpointRef(properties) + + if (UserGroupInformation.isSecurityEnabled()) { + delegationTokenManager = createTokenManager() + delegationTokenManager.foreach { dtm => + dtm.setDriverRef(driverEndpoint) + val creds = if (dtm.renewalEnabled) { + dtm.start().getCredentials() + } else { + val creds = UserGroupInformation.getCurrentUser().getCredentials() + dtm.obtainDelegationTokens(creds) + creds + } + delegationTokens.set(SparkHadoopUtil.get.serialize(creds)) + } + } } protected def createDriverEndpointRef( @@ -416,6 +443,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp override def stop() { reviveThread.shutdownNow() stopExecutors() + delegationTokenManager.foreach(_.stop()) try { if (driverEndpoint != null) { driverEndpoint.askSync[Boolean](StopDriver) @@ -684,7 +712,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp true } - protected def fetchHadoopDelegationTokens(): Option[Array[Byte]] = { None } + /** + * Create the delegation token manager to be used for the application. This method is called + * once during the start of the scheduler backend (so after the object has already been + * fully constructed), only if security is enabled in the Hadoop configuration. + */ + protected def createTokenManager(): Option[HadoopDelegationTokenManager] = None + } private[spark] object CoarseGrainedSchedulerBackend { diff --git a/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala index 2849a10a2c..e0e630e3be 100644 --- a/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala @@ -21,94 +21,36 @@ import org.apache.commons.io.IOUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.security.Credentials -import org.scalatest.Matchers import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.util.Utils -class HadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers { - private var delegationTokenManager: HadoopDelegationTokenManager = null - private var sparkConf: SparkConf = null - private var hadoopConf: Configuration = null +class HadoopDelegationTokenManagerSuite extends SparkFunSuite { + private val hadoopConf = new Configuration() - override def beforeAll(): Unit = { - super.beforeAll() - - sparkConf = new SparkConf() - hadoopConf = new Configuration() - } - - test("Correctly load default credential providers") { - delegationTokenManager = new HadoopDelegationTokenManager( - sparkConf, - hadoopConf, - hadoopFSsToAccess) - - delegationTokenManager.getServiceDelegationTokenProvider("hadoopfs") should not be (None) - delegationTokenManager.getServiceDelegationTokenProvider("hbase") should not be (None) - delegationTokenManager.getServiceDelegationTokenProvider("hive") should not be (None) - delegationTokenManager.getServiceDelegationTokenProvider("bogus") should be (None) + test("default configuration") { + val manager = new HadoopDelegationTokenManager(new SparkConf(false), hadoopConf) + assert(manager.isProviderLoaded("hadoopfs")) + assert(manager.isProviderLoaded("hbase")) + assert(manager.isProviderLoaded("hive")) } test("disable hive credential provider") { - sparkConf.set("spark.security.credentials.hive.enabled", "false") - delegationTokenManager = new HadoopDelegationTokenManager( - sparkConf, - hadoopConf, - hadoopFSsToAccess) - - delegationTokenManager.getServiceDelegationTokenProvider("hadoopfs") should not be (None) - delegationTokenManager.getServiceDelegationTokenProvider("hbase") should not be (None) - delegationTokenManager.getServiceDelegationTokenProvider("hive") should be (None) + val sparkConf = new SparkConf(false).set("spark.security.credentials.hive.enabled", "false") + val manager = new HadoopDelegationTokenManager(sparkConf, hadoopConf) + assert(manager.isProviderLoaded("hadoopfs")) + assert(manager.isProviderLoaded("hbase")) + assert(!manager.isProviderLoaded("hive")) } test("using deprecated configurations") { - sparkConf.set("spark.yarn.security.tokens.hadoopfs.enabled", "false") - sparkConf.set("spark.yarn.security.credentials.hive.enabled", "false") - delegationTokenManager = new HadoopDelegationTokenManager( - sparkConf, - hadoopConf, - hadoopFSsToAccess) - - delegationTokenManager.getServiceDelegationTokenProvider("hadoopfs") should be (None) - delegationTokenManager.getServiceDelegationTokenProvider("hive") should be (None) - delegationTokenManager.getServiceDelegationTokenProvider("hbase") should not be (None) - } - - test("verify no credentials are obtained") { - delegationTokenManager = new HadoopDelegationTokenManager( - sparkConf, - hadoopConf, - hadoopFSsToAccess) - val creds = new Credentials() - - // Tokens cannot be obtained from HDFS, Hive, HBase in unit tests. - delegationTokenManager.obtainDelegationTokens(hadoopConf, creds) - val tokens = creds.getAllTokens - tokens.size() should be (0) - } - - test("obtain tokens For HiveMetastore") { - val hadoopConf = new Configuration() - hadoopConf.set("hive.metastore.kerberos.principal", "bob") - // thrift picks up on port 0 and bails out, without trying to talk to endpoint - hadoopConf.set("hive.metastore.uris", "http://localhost:0") - - val hiveCredentialProvider = new HiveDelegationTokenProvider() - val credentials = new Credentials() - hiveCredentialProvider.obtainDelegationTokens(hadoopConf, sparkConf, credentials) - - credentials.getAllTokens.size() should be (0) - } - - test("Obtain tokens For HBase") { - val hadoopConf = new Configuration() - hadoopConf.set("hbase.security.authentication", "kerberos") - - val hbaseTokenProvider = new HBaseDelegationTokenProvider() - val creds = new Credentials() - hbaseTokenProvider.obtainDelegationTokens(hadoopConf, sparkConf, creds) - - creds.getAllTokens.size should be (0) + val sparkConf = new SparkConf(false) + .set("spark.yarn.security.tokens.hadoopfs.enabled", "false") + .set("spark.yarn.security.credentials.hive.enabled", "false") + val manager = new HadoopDelegationTokenManager(sparkConf, hadoopConf) + assert(!manager.isProviderLoaded("hadoopfs")) + assert(manager.isProviderLoaded("hbase")) + assert(!manager.isProviderLoaded("hive")) } test("SPARK-23209: obtain tokens when Hive classes are not available") { @@ -123,43 +65,41 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers { throw new ClassNotFoundException(name) } - if (name.startsWith("java") || name.startsWith("scala")) { - currentLoader.loadClass(name) - } else { - val classFileName = name.replaceAll("\\.", "/") + ".class" - val in = currentLoader.getResourceAsStream(classFileName) - if (in != null) { - val bytes = IOUtils.toByteArray(in) - defineClass(name, bytes, 0, bytes.length) - } else { - throw new ClassNotFoundException(name) - } + val prefixBlacklist = Seq("java", "scala", "com.sun.", "sun.") + if (prefixBlacklist.exists(name.startsWith(_))) { + return currentLoader.loadClass(name) } + + val found = findLoadedClass(name) + if (found != null) { + return found + } + + val classFileName = name.replaceAll("\\.", "/") + ".class" + val in = currentLoader.getResourceAsStream(classFileName) + if (in != null) { + val bytes = IOUtils.toByteArray(in) + return defineClass(name, bytes, 0, bytes.length) + } + + throw new ClassNotFoundException(name) } } - try { - Thread.currentThread().setContextClassLoader(noHive) + Utils.withContextClassLoader(noHive) { val test = noHive.loadClass(NoHiveTest.getClass.getName().stripSuffix("$")) test.getMethod("runTest").invoke(null) - } finally { - Thread.currentThread().setContextClassLoader(currentLoader) } } - - private[spark] def hadoopFSsToAccess(hadoopConf: Configuration): Set[FileSystem] = { - Set(FileSystem.get(hadoopConf)) - } } /** Test code for SPARK-23209 to avoid using too much reflection above. */ -private object NoHiveTest extends Matchers { +private object NoHiveTest { def runTest(): Unit = { try { - val manager = new HadoopDelegationTokenManager(new SparkConf(), new Configuration(), - _ => Set()) - manager.getServiceDelegationTokenProvider("hive") should be (None) + val manager = new HadoopDelegationTokenManager(new SparkConf(), new Configuration()) + require(!manager.isProviderLoaded("hive")) } catch { case e: Throwable => // Throw a better exception in case the test fails, since there may be a lot of nesting. diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala index 3e30ab2c83..066547dcbb 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala @@ -27,7 +27,6 @@ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.k8s.security.KubernetesHadoopDelegationTokenManager import org.apache.spark.deploy.k8s.submit._ import org.apache.spark.deploy.k8s.submit.KubernetesClientApplication._ -import org.apache.spark.deploy.security.HadoopDelegationTokenManager import org.apache.spark.internal.config.ConfigEntry @@ -79,7 +78,7 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf]( def krbConfigMapName: String = s"$appResourceNamePrefix-krb5-file" def tokenManager(conf: SparkConf, hConf: Configuration): KubernetesHadoopDelegationTokenManager = - new KubernetesHadoopDelegationTokenManager(new HadoopDelegationTokenManager(conf, hConf)) + new KubernetesHadoopDelegationTokenManager(conf, hConf) def namespace(): String = sparkConf.get(KUBERNETES_NAMESPACE) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/hadooputils/HadoopKerberosLogin.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/hadooputils/HadoopKerberosLogin.scala index 67a58491e4..0022d8f242 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/hadooputils/HadoopKerberosLogin.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/hadooputils/HadoopKerberosLogin.scala @@ -38,16 +38,14 @@ private[spark] object HadoopKerberosLogin { submissionSparkConf: SparkConf, kubernetesResourceNamePrefix: String, tokenManager: KubernetesHadoopDelegationTokenManager): KerberosConfigSpec = { - val hadoopConf = SparkHadoopUtil.get.newConfiguration(submissionSparkConf) // The JobUserUGI will be taken fom the Local Ticket Cache or via keytab+principal // The login happens in the SparkSubmit so login logic is not necessary to include val jobUserUGI = tokenManager.getCurrentUser val originalCredentials = jobUserUGI.getCredentials - val (tokenData, renewalInterval) = tokenManager.getDelegationTokens( - originalCredentials, - submissionSparkConf, - hadoopConf) - require(tokenData.nonEmpty, "Did not obtain any delegation tokens") + tokenManager.obtainDelegationTokens(originalCredentials) + + val tokenData = SparkHadoopUtil.get.serialize(originalCredentials) + val initialTokenDataKeyName = KERBEROS_SECRET_KEY val newSecretName = s"$kubernetesResourceNamePrefix-$KERBEROS_DELEGEGATION_TOKEN_SECRET_NAME" val secretDT = diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/security/KubernetesHadoopDelegationTokenManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/security/KubernetesHadoopDelegationTokenManager.scala index 135e2c482b..3e98d5811d 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/security/KubernetesHadoopDelegationTokenManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/security/KubernetesHadoopDelegationTokenManager.scala @@ -18,45 +18,20 @@ package org.apache.spark.deploy.k8s.security import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.FileSystem -import org.apache.hadoop.security.{Credentials, UserGroupInformation} +import org.apache.hadoop.security.UserGroupInformation import org.apache.spark.SparkConf -import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.deploy.security.HadoopDelegationTokenManager -import org.apache.spark.internal.Logging /** - * The KubernetesHadoopDelegationTokenManager fetches Hadoop delegation tokens - * on the behalf of the Kubernetes submission client. The new credentials - * (called Tokens when they are serialized) are stored in Secrets accessible - * to the driver and executors, when new Tokens are received they overwrite the current Secrets. + * Adds Kubernetes-specific functionality to HadoopDelegationTokenManager. */ private[spark] class KubernetesHadoopDelegationTokenManager( - tokenManager: HadoopDelegationTokenManager) extends Logging { + _sparkConf: SparkConf, + _hadoopConf: Configuration) + extends HadoopDelegationTokenManager(_sparkConf, _hadoopConf) { - // HadoopUGI Util methods def getCurrentUser: UserGroupInformation = UserGroupInformation.getCurrentUser - def getShortUserName: String = getCurrentUser.getShortUserName - def getFileSystem(hadoopConf: Configuration): FileSystem = FileSystem.get(hadoopConf) def isSecurityEnabled: Boolean = UserGroupInformation.isSecurityEnabled - def loginUserFromKeytabAndReturnUGI(principal: String, keytab: String): UserGroupInformation = - UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab) - def serializeCreds(creds: Credentials): Array[Byte] = SparkHadoopUtil.get.serialize(creds) - def nextRT(rt: Long, conf: SparkConf): Long = SparkHadoopUtil.nextCredentialRenewalTime(rt, conf) - def getDelegationTokens( - creds: Credentials, - conf: SparkConf, - hadoopConf: Configuration): (Array[Byte], Long) = { - try { - val rt = tokenManager.obtainDelegationTokens(hadoopConf, creds) - logDebug(s"Initialized tokens") - (serializeCreds(creds), nextRT(rt, conf)) - } catch { - case e: Exception => - logError(s"Failed to fetch Hadoop delegation tokens $e") - throw e - } - } } diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala index bac0246b7d..f5866651dc 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala @@ -26,12 +26,12 @@ import scala.collection.JavaConverters._ import scala.collection.mutable import scala.concurrent.Future -import org.apache.hadoop.security.UserGroupInformation import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _} import org.apache.mesos.SchedulerDriver import org.apache.spark.{SecurityManager, SparkConf, SparkContext, SparkException, TaskState} import org.apache.spark.deploy.mesos.config._ +import org.apache.spark.deploy.security.HadoopDelegationTokenManager import org.apache.spark.internal.config import org.apache.spark.internal.config.EXECUTOR_HEARTBEAT_INTERVAL import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle} @@ -60,9 +60,6 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) with org.apache.mesos.Scheduler with MesosSchedulerUtils { - private lazy val hadoopDelegationTokenManager: MesosHadoopDelegationTokenManager = - new MesosHadoopDelegationTokenManager(conf, sc.hadoopConfiguration, driverEndpoint) - // Blacklist a slave after this many failures private val MAX_SLAVE_FAILURES = 2 @@ -678,7 +675,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( launcherBackend.close() } - private def stopSchedulerBackend() { + private def stopSchedulerBackend(): Unit = { // Make sure we're not launching tasks during shutdown stateLock.synchronized { if (stopCalled) { @@ -777,6 +774,10 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( } } + override protected def createTokenManager(): Option[HadoopDelegationTokenManager] = { + Some(new HadoopDelegationTokenManager(conf, sc.hadoopConfiguration)) + } + private def numExecutors(): Int = { slaves.values.map(_.taskIDs.size).sum } @@ -789,14 +790,6 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( offer.getHostname } } - - override def fetchHadoopDelegationTokens(): Option[Array[Byte]] = { - if (UserGroupInformation.isSecurityEnabled) { - Some(hadoopDelegationTokenManager.getTokens()) - } else { - None - } - } } private class Slave(val hostname: String) { diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala deleted file mode 100644 index a1bf4f0c04..0000000000 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.scheduler.cluster.mesos - -import java.security.PrivilegedExceptionAction -import java.util.concurrent.{ScheduledExecutorService, TimeUnit} - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.security.UserGroupInformation - -import org.apache.spark.SparkConf -import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.deploy.security.HadoopDelegationTokenManager -import org.apache.spark.internal.{config, Logging} -import org.apache.spark.rpc.RpcEndpointRef -import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens -import org.apache.spark.ui.UIUtils -import org.apache.spark.util.ThreadUtils - - -/** - * The MesosHadoopDelegationTokenManager fetches and updates Hadoop delegation tokens on the behalf - * of the MesosCoarseGrainedSchedulerBackend. It is modeled after the YARN AMCredentialRenewer, - * and similarly will renew the Credentials when 75% of the renewal interval has passed. - * The principal difference is that instead of writing the new credentials to HDFS and - * incrementing the timestamp of the file, the new credentials (called Tokens when they are - * serialized) are broadcast to all running executors. On the executor side, when new Tokens are - * received they overwrite the current credentials. - */ -private[spark] class MesosHadoopDelegationTokenManager( - conf: SparkConf, - hadoopConfig: Configuration, - driverEndpoint: RpcEndpointRef) - extends Logging { - - require(driverEndpoint != null, "DriverEndpoint is not initialized") - - private val credentialRenewerThread: ScheduledExecutorService = - ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread") - - private val tokenManager: HadoopDelegationTokenManager = - new HadoopDelegationTokenManager(conf, hadoopConfig) - - private val principal: String = conf.get(config.PRINCIPAL).orNull - - private var (tokens: Array[Byte], timeOfNextRenewal: Long) = { - try { - val creds = UserGroupInformation.getCurrentUser.getCredentials - val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) - val rt = tokenManager.obtainDelegationTokens(hadoopConf, creds) - logInfo(s"Initialized tokens: ${SparkHadoopUtil.get.dumpTokens(creds)}") - (SparkHadoopUtil.get.serialize(creds), SparkHadoopUtil.nextCredentialRenewalTime(rt, conf)) - } catch { - case e: Exception => - logError(s"Failed to fetch Hadoop delegation tokens $e") - throw e - } - } - - private val keytabFile: Option[String] = conf.get(config.KEYTAB) - - scheduleTokenRenewal() - - private def scheduleTokenRenewal(): Unit = { - if (keytabFile.isDefined) { - require(principal != null, "Principal is required for Keytab-based authentication") - logInfo(s"Using keytab: ${keytabFile.get} and principal $principal") - } else { - logInfo("Using ticket cache for Kerberos authentication, no token renewal.") - return - } - - def scheduleRenewal(runnable: Runnable): Unit = { - val remainingTime = timeOfNextRenewal - System.currentTimeMillis() - if (remainingTime <= 0) { - logInfo("Credentials have expired, creating new ones now.") - runnable.run() - } else { - logInfo(s"Scheduling login from keytab in $remainingTime millis.") - credentialRenewerThread.schedule(runnable, remainingTime, TimeUnit.MILLISECONDS) - } - } - - val credentialRenewerRunnable = - new Runnable { - override def run(): Unit = { - try { - getNewDelegationTokens() - broadcastDelegationTokens(tokens) - } catch { - case e: Exception => - // Log the error and try to write new tokens back in an hour - val delay = TimeUnit.SECONDS.toMillis(conf.get(config.CREDENTIALS_RENEWAL_RETRY_WAIT)) - logWarning( - s"Couldn't broadcast tokens, trying again in ${UIUtils.formatDuration(delay)}", e) - credentialRenewerThread.schedule(this, delay, TimeUnit.MILLISECONDS) - return - } - scheduleRenewal(this) - } - } - scheduleRenewal(credentialRenewerRunnable) - } - - private def getNewDelegationTokens(): Unit = { - logInfo(s"Attempting to login to KDC with principal ${principal}") - // Get new delegation tokens by logging in with a new UGI inspired by AMCredentialRenewer.scala - // Don't protect against keytabFile being empty because it's guarded above. - val ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytabFile.get) - logInfo("Successfully logged into KDC") - val tempCreds = ugi.getCredentials - val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) - val nextRenewalTime = ugi.doAs(new PrivilegedExceptionAction[Long] { - override def run(): Long = { - tokenManager.obtainDelegationTokens(hadoopConf, tempCreds) - } - }) - - val currTime = System.currentTimeMillis() - timeOfNextRenewal = if (nextRenewalTime <= currTime) { - logWarning(s"Next credential renewal time ($nextRenewalTime) is earlier than " + - s"current time ($currTime), which is unexpected, please check your credential renewal " + - "related configurations in the target services.") - currTime - } else { - SparkHadoopUtil.nextCredentialRenewalTime(nextRenewalTime, conf) - } - logInfo(s"Time of next renewal is in ${timeOfNextRenewal - System.currentTimeMillis()} ms") - - // Add the temp credentials back to the original ones. - UserGroupInformation.getCurrentUser.addCredentials(tempCreds) - // update tokens for late or dynamically added executors - tokens = SparkHadoopUtil.get.serialize(tempCreds) - } - - private def broadcastDelegationTokens(tokens: Array[Byte]) = { - logInfo("Sending new tokens to all executors") - driverEndpoint.send(UpdateDelegationTokens(tokens)) - } - - def getTokens(): Array[Byte] = { - tokens - } -} - diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 8f94e3f731..c1f3211bca 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -41,7 +41,7 @@ import org.apache.spark._ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.deploy.history.HistoryServer import org.apache.spark.deploy.yarn.config._ -import org.apache.spark.deploy.yarn.security.AMCredentialRenewer +import org.apache.spark.deploy.yarn.security.YARNHadoopDelegationTokenManager import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.metrics.MetricsSystem @@ -99,20 +99,18 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends } } - private val credentialRenewer: Option[AMCredentialRenewer] = sparkConf.get(KEYTAB).map { _ => - new AMCredentialRenewer(sparkConf, yarnConf) + private val tokenManager: Option[YARNHadoopDelegationTokenManager] = { + sparkConf.get(KEYTAB).map { _ => + new YARNHadoopDelegationTokenManager(sparkConf, yarnConf) + } } - private val ugi = credentialRenewer match { - case Some(cr) => + private val ugi = tokenManager match { + case Some(tm) => // Set the context class loader so that the token renewer has access to jars distributed // by the user. - val currentLoader = Thread.currentThread().getContextClassLoader() - Thread.currentThread().setContextClassLoader(userClassLoader) - try { - cr.start() - } finally { - Thread.currentThread().setContextClassLoader(currentLoader) + Utils.withContextClassLoader(userClassLoader) { + tm.start() } case _ => @@ -380,7 +378,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends userClassThread.interrupt() } if (!inShutdown) { - credentialRenewer.foreach(_.stop()) + tokenManager.foreach(_.stop()) } } } @@ -440,7 +438,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends securityMgr, localResources) - credentialRenewer.foreach(_.setDriverRef(driverRef)) + tokenManager.foreach(_.setDriverRef(driverRef)) // Initialize the AM endpoint *after* the allocator has been initialized. This ensures // that when the driver sends an initial executor request (e.g. after an AM restart), diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 49b7f6261e..6240f7b68d 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -310,7 +310,7 @@ private[spark] class Client( private def setupSecurityToken(amContainer: ContainerLaunchContext): Unit = { val credentials = UserGroupInformation.getCurrentUser().getCredentials() val credentialManager = new YARNHadoopDelegationTokenManager(sparkConf, hadoopConf) - credentialManager.obtainDelegationTokens(hadoopConf, credentials) + credentialManager.obtainDelegationTokens(credentials) // When using a proxy user, copy the delegation tokens to the user's credentials. Avoid // that for regular users, since in those case the user already has access to the TGT, diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala index f2ed555edc..b257d8fdd3 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala @@ -325,10 +325,6 @@ package object config { .stringConf .createOptional - private[spark] val KERBEROS_RELOGIN_PERIOD = ConfigBuilder("spark.yarn.kerberos.relogin.period") - .timeConf(TimeUnit.SECONDS) - .createWithDefaultString("1m") - // The list of cache-related config entries. This is used by Client and the AM to clean // up the environment so that these settings do not appear on the web UI. private[yarn] val CACHE_CONFIGS = Seq( diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala deleted file mode 100644 index bc8d47dbd5..0000000000 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala +++ /dev/null @@ -1,177 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.deploy.yarn.security - -import java.security.PrivilegedExceptionAction -import java.util.concurrent.{ScheduledExecutorService, TimeUnit} -import java.util.concurrent.atomic.AtomicReference - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.security.{Credentials, UserGroupInformation} - -import org.apache.spark.SparkConf -import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.deploy.yarn.config._ -import org.apache.spark.internal.Logging -import org.apache.spark.internal.config._ -import org.apache.spark.rpc.RpcEndpointRef -import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens -import org.apache.spark.ui.UIUtils -import org.apache.spark.util.ThreadUtils - -/** - * A manager tasked with periodically updating delegation tokens needed by the application. - * - * This manager is meant to make sure long-running apps (such as Spark Streaming apps) can run - * without interruption while accessing secured services. It periodically logs in to the KDC with - * user-provided credentials, and contacts all the configured secure services to obtain delegation - * tokens to be distributed to the rest of the application. - * - * This class will manage the kerberos login, by renewing the TGT when needed. Because the UGI API - * does not expose the TTL of the TGT, a configuration controls how often to check that a relogin is - * necessary. This is done reasonably often since the check is a no-op when the relogin is not yet - * needed. The check period can be overridden in the configuration. - * - * New delegation tokens are created once 75% of the renewal interval of the original tokens has - * elapsed. The new tokens are sent to the Spark driver endpoint once it's registered with the AM. - * The driver is tasked with distributing the tokens to other processes that might need them. - */ -private[yarn] class AMCredentialRenewer( - sparkConf: SparkConf, - hadoopConf: Configuration) extends Logging { - - private val principal = sparkConf.get(PRINCIPAL).get - private val keytab = sparkConf.get(KEYTAB).get - private val credentialManager = new YARNHadoopDelegationTokenManager(sparkConf, hadoopConf) - - private val renewalExecutor: ScheduledExecutorService = - ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Refresh Thread") - - private val driverRef = new AtomicReference[RpcEndpointRef]() - - private val renewalTask = new Runnable() { - override def run(): Unit = { - updateTokensTask() - } - } - - def setDriverRef(ref: RpcEndpointRef): Unit = { - driverRef.set(ref) - } - - /** - * Start the token renewer. Upon start, the renewer will: - * - * - log in the configured user, and set up a task to keep that user's ticket renewed - * - obtain delegation tokens from all available providers - * - schedule a periodic task to update the tokens when needed. - * - * @return The newly logged in user. - */ - def start(): UserGroupInformation = { - val originalCreds = UserGroupInformation.getCurrentUser().getCredentials() - val ugi = doLogin() - - val tgtRenewalTask = new Runnable() { - override def run(): Unit = { - ugi.checkTGTAndReloginFromKeytab() - } - } - val tgtRenewalPeriod = sparkConf.get(KERBEROS_RELOGIN_PERIOD) - renewalExecutor.scheduleAtFixedRate(tgtRenewalTask, tgtRenewalPeriod, tgtRenewalPeriod, - TimeUnit.SECONDS) - - val creds = obtainTokensAndScheduleRenewal(ugi) - ugi.addCredentials(creds) - - // Transfer the original user's tokens to the new user, since that's needed to connect to - // YARN. Explicitly avoid overwriting tokens that already exist in the current user's - // credentials, since those were freshly obtained above (see SPARK-23361). - val existing = ugi.getCredentials() - existing.mergeAll(originalCreds) - ugi.addCredentials(existing) - - ugi - } - - def stop(): Unit = { - renewalExecutor.shutdown() - } - - private def scheduleRenewal(delay: Long): Unit = { - val _delay = math.max(0, delay) - logInfo(s"Scheduling login from keytab in ${UIUtils.formatDuration(delay)}.") - renewalExecutor.schedule(renewalTask, _delay, TimeUnit.MILLISECONDS) - } - - /** - * Periodic task to login to the KDC and create new delegation tokens. Re-schedules itself - * to fetch the next set of tokens when needed. - */ - private def updateTokensTask(): Unit = { - try { - val freshUGI = doLogin() - val creds = obtainTokensAndScheduleRenewal(freshUGI) - val tokens = SparkHadoopUtil.get.serialize(creds) - - val driver = driverRef.get() - if (driver != null) { - logInfo("Updating delegation tokens.") - driver.send(UpdateDelegationTokens(tokens)) - } else { - // This shouldn't really happen, since the driver should register way before tokens expire - // (or the AM should time out the application). - logWarning("Delegation tokens close to expiration but no driver has registered yet.") - SparkHadoopUtil.get.addDelegationTokens(tokens, sparkConf) - } - } catch { - case e: Exception => - val delay = TimeUnit.SECONDS.toMillis(sparkConf.get(CREDENTIALS_RENEWAL_RETRY_WAIT)) - logWarning(s"Failed to update tokens, will try again in ${UIUtils.formatDuration(delay)}!" + - " If this happens too often tasks will fail.", e) - scheduleRenewal(delay) - } - } - - /** - * Obtain new delegation tokens from the available providers. Schedules a new task to fetch - * new tokens before the new set expires. - * - * @return Credentials containing the new tokens. - */ - private def obtainTokensAndScheduleRenewal(ugi: UserGroupInformation): Credentials = { - ugi.doAs(new PrivilegedExceptionAction[Credentials]() { - override def run(): Credentials = { - val creds = new Credentials() - val nextRenewal = credentialManager.obtainDelegationTokens(hadoopConf, creds) - - val timeToWait = SparkHadoopUtil.nextCredentialRenewalTime(nextRenewal, sparkConf) - - System.currentTimeMillis() - scheduleRenewal(timeToWait) - creds - } - }) - } - - private def doLogin(): UserGroupInformation = { - logInfo(s"Attempting to login to KDC using principal: $principal") - val ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab) - logInfo("Successfully logged into KDC.") - ugi - } - -} diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala index 26a2e5d730..2d9a3f0c83 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala @@ -22,12 +22,13 @@ import java.util.ServiceLoader import scala.collection.JavaConverters._ import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.security.Credentials import org.apache.spark.SparkConf import org.apache.spark.deploy.security.HadoopDelegationTokenManager import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil -import org.apache.spark.internal.Logging +import org.apache.spark.rpc.RpcEndpointRef import org.apache.spark.util.Utils /** @@ -36,27 +37,25 @@ import org.apache.spark.util.Utils * in [[HadoopDelegationTokenManager]]. */ private[yarn] class YARNHadoopDelegationTokenManager( - sparkConf: SparkConf, - hadoopConf: Configuration) extends Logging { + _sparkConf: SparkConf, + _hadoopConf: Configuration) + extends HadoopDelegationTokenManager(_sparkConf, _hadoopConf) { - private val delegationTokenManager = new HadoopDelegationTokenManager(sparkConf, hadoopConf, - conf => YarnSparkHadoopUtil.hadoopFSsToAccess(sparkConf, conf)) - - // public for testing - val credentialProviders = getCredentialProviders + private val credentialProviders = { + ServiceLoader.load(classOf[ServiceCredentialProvider], Utils.getContextOrSparkClassLoader) + .asScala + .toList + .filter { p => isServiceEnabled(p.serviceName) } + .map { p => (p.serviceName, p) } + .toMap + } if (credentialProviders.nonEmpty) { logDebug("Using the following YARN-specific credential providers: " + s"${credentialProviders.keys.mkString(", ")}.") } - /** - * Writes delegation tokens to creds. Delegation tokens are fetched from all registered - * providers. - * - * @return Time after which the fetched delegation tokens should be renewed. - */ - def obtainDelegationTokens(hadoopConf: Configuration, creds: Credentials): Long = { - val superInterval = delegationTokenManager.obtainDelegationTokens(hadoopConf, creds) + override def obtainDelegationTokens(creds: Credentials): Long = { + val superInterval = super.obtainDelegationTokens(creds) credentialProviders.values.flatMap { provider => if (provider.credentialsRequired(hadoopConf)) { @@ -69,18 +68,13 @@ private[yarn] class YARNHadoopDelegationTokenManager( }.foldLeft(superInterval)(math.min) } - private def getCredentialProviders: Map[String, ServiceCredentialProvider] = { - val providers = loadCredentialProviders - - providers. - filter { p => delegationTokenManager.isServiceEnabled(p.serviceName) } - .map { p => (p.serviceName, p) } - .toMap + // For testing. + override def isProviderLoaded(serviceName: String): Boolean = { + credentialProviders.contains(serviceName) || super.isProviderLoaded(serviceName) } - private def loadCredentialProviders: List[ServiceCredentialProvider] = { - ServiceLoader.load(classOf[ServiceCredentialProvider], Utils.getContextOrSparkClassLoader) - .asScala - .toList + override protected def fileSystemsToAccess(): Set[FileSystem] = { + YarnSparkHadoopUtil.hadoopFSsToAccess(sparkConf, hadoopConf) } + } diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala index 63bea3e7a5..67c36aac49 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala @@ -19,16 +19,14 @@ package org.apache.spark.scheduler.cluster import java.util.concurrent.atomic.{AtomicBoolean} -import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.Future import scala.util.{Failure, Success} import scala.util.control.NonFatal -import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId} import org.apache.spark.SparkContext -import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging import org.apache.spark.rpc._ import org.apache.spark.scheduler._ @@ -270,7 +268,6 @@ private[spark] abstract class YarnSchedulerBackend( case u @ UpdateDelegationTokens(tokens) => // Add the tokens to the current user and send a message to the scheduler so that it // notifies all registered executors of the new tokens. - SparkHadoopUtil.get.addDelegationTokens(tokens, sc.conf) driverEndpoint.send(u) } diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala index 9fa749b14c..98315e4235 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala @@ -19,11 +19,10 @@ package org.apache.spark.deploy.yarn.security import org.apache.hadoop.conf.Configuration import org.apache.hadoop.security.Credentials -import org.scalatest.Matchers import org.apache.spark.{SparkConf, SparkFunSuite} -class YARNHadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers { +class YARNHadoopDelegationTokenManagerSuite extends SparkFunSuite { private var credentialManager: YARNHadoopDelegationTokenManager = null private var sparkConf: SparkConf = null private var hadoopConf: Configuration = null @@ -36,7 +35,7 @@ class YARNHadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers test("Correctly loads credential providers") { credentialManager = new YARNHadoopDelegationTokenManager(sparkConf, hadoopConf) - credentialManager.credentialProviders.get("yarn-test") should not be (None) + assert(credentialManager.isProviderLoaded("yarn-test")) } }