[SPARK-23781][CORE] Merge token renewer functionality into HadoopDelegationTokenManager.

This avoids having two classes to deal with tokens; HadoopDelegationTokenManager is now a
one-stop shop for dealing with delegation tokens. The YARN backend extends that class instead
of using composition as before, resulting in a bit less code there too.
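
As a rough sketch of the new shape (condensed from the YARN changes further down in this diff;
only the file-system hook is shown, and nothing here goes beyond what the diff itself contains),
a backend-specific manager now extends the base class and overrides just what it needs:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.FileSystem
    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.security.HadoopDelegationTokenManager
    import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil

    private[yarn] class YARNHadoopDelegationTokenManager(
        _sparkConf: SparkConf,
        _hadoopConf: Configuration)
      extends HadoopDelegationTokenManager(_sparkConf, _hadoopConf) {

      // YARN decides which file systems need delegation tokens.
      override protected def fileSystemsToAccess(): Set[FileSystem] =
        YarnSparkHadoopUtil.hadoopFSsToAccess(sparkConf, hadoopConf)
    }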

The renewer functionality is basically the same code that used to be in YARN's
AMCredentialRenewer. That is also why the public API of HadoopDelegationTokenManager is a
little odd; the YARN AM has some unusual requirements for how all of this is initialized,
and that weirdness is currently needed to support it.
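
For reference, here is a minimal sketch of how a scheduler backend drives the merged class,
mirroring the CoarseGrainedSchedulerBackend changes below (sparkConf, hadoopConf and
driverEndpoint are assumed to be in scope; error handling omitted):

    import org.apache.hadoop.security.UserGroupInformation
    import org.apache.spark.deploy.SparkHadoopUtil
    import org.apache.spark.deploy.security.HadoopDelegationTokenManager

    val dtm = new HadoopDelegationTokenManager(sparkConf, hadoopConf)
    dtm.setDriverRef(driverEndpoint)

    val creds = if (dtm.renewalEnabled) {
      // Keytab + principal configured: log in and keep renewing tokens in the background.
      dtm.start().getCredentials()
    } else {
      // No keytab: fetch a single initial set of tokens for the current user.
      val c = UserGroupInformation.getCurrentUser().getCredentials()
      dtm.obtainDelegationTokens(c)
      c
    }
    // The serialized credentials are what the driver hands out to executors.
    val tokenBytes = SparkHadoopUtil.get.serialize(creds)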

Tested:
- YARN with a stress app for DT renewal
- Mesos and K8S with basic Kerberos tests (both TGT and keytab)

Closes #22624 from vanzin/SPARK-23781.

Authored-by: Marcelo Vanzin <vanzin@cloudera.com>
Signed-off-by: Imran Rashid <irashid@cloudera.com>
Marcelo Vanzin 2018-10-31 13:00:10 -05:00 committed by Imran Rashid
parent af3b816070
commit 68dde3481e
19 changed files with 358 additions and 627 deletions


@ -731,7 +731,9 @@ private[spark] object SparkConf extends Logging {
KEYTAB.key -> Seq(
AlternateConfig("spark.yarn.keytab", "3.0")),
PRINCIPAL.key -> Seq(
AlternateConfig("spark.yarn.principal", "3.0"))
AlternateConfig("spark.yarn.principal", "3.0")),
KERBEROS_RELOGIN_PERIOD.key -> Seq(
AlternateConfig("spark.yarn.kerberos.relogin.period", "3.0"))
)
/**


@ -413,20 +413,6 @@ object SparkHadoopUtil {
def get: SparkHadoopUtil = instance
/**
* Given an expiration date for the current set of credentials, calculate the time when new
* credentials should be created.
*
* @param expirationDate Drop-dead expiration date
* @param conf Spark configuration
* @return Timestamp when new credentials should be created.
*/
private[spark] def nextCredentialRenewalTime(expirationDate: Long, conf: SparkConf): Long = {
val ct = System.currentTimeMillis
val ratio = conf.get(CREDENTIALS_RENEWAL_INTERVAL_RATIO)
(ct + (ratio * (expirationDate - ct))).toLong
}
/**
* Returns a Configuration object with Spark configuration applied on top. Unlike
* the instance method, this will always return a Configuration instance, and not a


@ -17,76 +17,158 @@
package org.apache.spark.deploy.security
import java.io.File
import java.security.PrivilegedExceptionAction
import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
import java.util.concurrent.atomic.AtomicReference
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
import org.apache.spark.ui.UIUtils
import org.apache.spark.util.ThreadUtils
/**
* Manages all the registered HadoopDelegationTokenProviders and offer APIs for other modules to
* obtain delegation tokens and their renewal time. By default [[HadoopFSDelegationTokenProvider]],
* [[HiveDelegationTokenProvider]] and [[HBaseDelegationTokenProvider]] will be loaded in if not
* explicitly disabled.
* Manager for delegation tokens in a Spark application.
*
* Also, each HadoopDelegationTokenProvider is controlled by
* spark.security.credentials.{service}.enabled, and will not be loaded if this config is set to
* false. For example, Hive's delegation token provider [[HiveDelegationTokenProvider]] can be
* enabled/disabled by the configuration spark.security.credentials.hive.enabled.
* This manager has two modes of operation:
*
* @param sparkConf Spark configuration
* @param hadoopConf Hadoop configuration
* @param fileSystems Delegation tokens will be fetched for these Hadoop filesystems.
* 1. When configured with a principal and a keytab, it will make sure long-running apps can run
* without interruption while accessing secured services. It periodically logs in to the KDC with
* user-provided credentials, and contacts all the configured secure services to obtain delegation
* tokens to be distributed to the rest of the application.
*
* Because the Hadoop UGI API does not expose the TTL of the TGT, a configuration controls how often
* to check that a relogin is necessary. This is done reasonably often since the check is a no-op
* when the relogin is not yet needed. The check period can be overridden in the configuration.
*
* New delegation tokens are created once 75% of the renewal interval of the original tokens has
* elapsed. The new tokens are sent to the Spark driver endpoint once it's registered with the AM.
* The driver is tasked with distributing the tokens to other processes that might need them.
*
* 2. When operating without an explicit principal and keytab, token renewal will not be available.
* Starting the manager will distribute an initial set of delegation tokens to the provided Spark
* driver, but the app will not get new tokens when those expire.
*
* It can also be used just to create delegation tokens, by calling the `obtainDelegationTokens`
* method. This option does not require calling the `start` method, but leaves it up to the
* caller to distribute the tokens that were generated.
*/
private[spark] class HadoopDelegationTokenManager(
sparkConf: SparkConf,
hadoopConf: Configuration,
fileSystems: Configuration => Set[FileSystem])
extends Logging {
protected val sparkConf: SparkConf,
protected val hadoopConf: Configuration) extends Logging {
private val deprecatedProviderEnabledConfigs = List(
"spark.yarn.security.tokens.%s.enabled",
"spark.yarn.security.credentials.%s.enabled")
private val providerEnabledConfig = "spark.security.credentials.%s.enabled"
// Maintain all the registered delegation token providers
private val delegationTokenProviders = getDelegationTokenProviders
private val principal = sparkConf.get(PRINCIPAL).orNull
private val keytab = sparkConf.get(KEYTAB).orNull
require((principal == null) == (keytab == null),
"Both principal and keytab must be defined, or neither.")
require(keytab == null || new File(keytab).isFile(), s"Cannot find keytab at $keytab.")
private val delegationTokenProviders = loadProviders()
logDebug("Using the following builtin delegation token providers: " +
s"${delegationTokenProviders.keys.mkString(", ")}.")
/** Construct a [[HadoopDelegationTokenManager]] for the default Hadoop filesystem */
def this(sparkConf: SparkConf, hadoopConf: Configuration) = {
this(
sparkConf,
hadoopConf,
hadoopConf => Set(FileSystem.get(hadoopConf).getHomeDirectory.getFileSystem(hadoopConf)))
private var renewalExecutor: ScheduledExecutorService = _
private val driverRef = new AtomicReference[RpcEndpointRef]()
/** Set the endpoint used to send tokens to the driver. */
def setDriverRef(ref: RpcEndpointRef): Unit = {
driverRef.set(ref)
}
private def getDelegationTokenProviders: Map[String, HadoopDelegationTokenProvider] = {
val providers = Seq(new HadoopFSDelegationTokenProvider(fileSystems)) ++
safeCreateProvider(new HiveDelegationTokenProvider) ++
safeCreateProvider(new HBaseDelegationTokenProvider)
/** @return Whether delegation token renewal is enabled. */
def renewalEnabled: Boolean = principal != null
// Filter out providers for which spark.security.credentials.{service}.enabled is false.
providers
.filter { p => isServiceEnabled(p.serviceName) }
.map { p => (p.serviceName, p) }
.toMap
/**
* Start the token renewer. Requires a principal and keytab. Upon start, the renewer will:
*
* - log in the configured principal, and set up a task to keep that user's ticket renewed
* - obtain delegation tokens from all available providers
* - send the tokens to the driver, if it's already registered
* - schedule a periodic task to update the tokens when needed.
*
* @return The newly logged in user.
*/
def start(): UserGroupInformation = {
require(renewalEnabled, "Token renewal must be enabled to start the renewer.")
renewalExecutor =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread")
val originalCreds = UserGroupInformation.getCurrentUser().getCredentials()
val ugi = doLogin()
val tgtRenewalTask = new Runnable() {
override def run(): Unit = {
ugi.checkTGTAndReloginFromKeytab()
}
}
val tgtRenewalPeriod = sparkConf.get(KERBEROS_RELOGIN_PERIOD)
renewalExecutor.scheduleAtFixedRate(tgtRenewalTask, tgtRenewalPeriod, tgtRenewalPeriod,
TimeUnit.SECONDS)
val creds = obtainTokensAndScheduleRenewal(ugi)
ugi.addCredentials(creds)
val driver = driverRef.get()
if (driver != null) {
val tokens = SparkHadoopUtil.get.serialize(creds)
driver.send(UpdateDelegationTokens(tokens))
}
private def safeCreateProvider(
createFn: => HadoopDelegationTokenProvider): Option[HadoopDelegationTokenProvider] = {
try {
Some(createFn)
} catch {
case t: Throwable =>
logDebug(s"Failed to load built in provider.", t)
// Transfer the original user's tokens to the new user, since it may contain needed tokens
// (such as those used to connect to YARN). Explicitly avoid overwriting tokens that already
// exist in the current user's credentials, since those were freshly obtained above
// (see SPARK-23361).
val existing = ugi.getCredentials()
existing.mergeAll(originalCreds)
ugi.addCredentials(existing)
ugi
}
def stop(): Unit = {
if (renewalExecutor != null) {
renewalExecutor.shutdown()
}
}
/**
* Fetch new delegation tokens for configured services, storing them in the given credentials.
* Tokens are fetched for the current logged in user.
*
* @param creds Credentials object in which to store the delegation tokens.
* @return The time by which the tokens must be renewed.
*/
def obtainDelegationTokens(creds: Credentials): Long = {
delegationTokenProviders.values.flatMap { provider =>
if (provider.delegationTokensRequired(sparkConf, hadoopConf)) {
provider.obtainDelegationTokens(hadoopConf, sparkConf, creds)
} else {
logDebug(s"Service ${provider.serviceName} does not require a token." +
s" Check your configuration to see if security is disabled or not.")
None
}
}.foldLeft(Long.MaxValue)(math.min)
}
def isServiceEnabled(serviceName: String): Boolean = {
// Visible for testing.
def isProviderLoaded(serviceName: String): Boolean = {
delegationTokenProviders.contains(serviceName)
}
protected def isServiceEnabled(serviceName: String): Boolean = {
val key = providerEnabledConfig.format(serviceName)
deprecatedProviderEnabledConfigs.foreach { pattern =>
@ -110,32 +192,104 @@ private[spark] class HadoopDelegationTokenManager(
}
/**
* Get delegation token provider for the specified service.
* List of file systems for which to obtain delegation tokens. The base implementation
* returns just the default file system in the given Hadoop configuration.
*/
def getServiceDelegationTokenProvider(service: String): Option[HadoopDelegationTokenProvider] = {
delegationTokenProviders.get(service)
protected def fileSystemsToAccess(): Set[FileSystem] = {
Set(FileSystem.get(hadoopConf))
}
private def scheduleRenewal(delay: Long): Unit = {
val _delay = math.max(0, delay)
logInfo(s"Scheduling login from keytab in ${UIUtils.formatDuration(delay)}.")
val renewalTask = new Runnable() {
override def run(): Unit = {
updateTokensTask()
}
}
renewalExecutor.schedule(renewalTask, _delay, TimeUnit.MILLISECONDS)
}
/**
* Writes delegation tokens to creds. Delegation tokens are fetched from all registered
* providers.
*
* @param hadoopConf hadoop Configuration
* @param creds Credentials that will be updated in place (overwritten)
* @return Time after which the fetched delegation tokens should be renewed.
* Periodic task to login to the KDC and create new delegation tokens. Re-schedules itself
* to fetch the next set of tokens when needed.
*/
def obtainDelegationTokens(
hadoopConf: Configuration,
creds: Credentials): Long = {
delegationTokenProviders.values.flatMap { provider =>
if (provider.delegationTokensRequired(sparkConf, hadoopConf)) {
provider.obtainDelegationTokens(hadoopConf, sparkConf, creds)
private def updateTokensTask(): Unit = {
try {
val freshUGI = doLogin()
val creds = obtainTokensAndScheduleRenewal(freshUGI)
val tokens = SparkHadoopUtil.get.serialize(creds)
val driver = driverRef.get()
if (driver != null) {
logInfo("Updating delegation tokens.")
driver.send(UpdateDelegationTokens(tokens))
} else {
logDebug(s"Service ${provider.serviceName} does not require a token." +
s" Check your configuration to see if security is disabled or not.")
None
// This shouldn't really happen, since the driver should register way before tokens expire.
logWarning("Delegation tokens close to expiration but no driver has registered yet.")
SparkHadoopUtil.get.addDelegationTokens(tokens, sparkConf)
}
}.foldLeft(Long.MaxValue)(math.min)
} catch {
case e: Exception =>
val delay = TimeUnit.SECONDS.toMillis(sparkConf.get(CREDENTIALS_RENEWAL_RETRY_WAIT))
logWarning(s"Failed to update tokens, will try again in ${UIUtils.formatDuration(delay)}!" +
" If this happens too often tasks will fail.", e)
scheduleRenewal(delay)
}
}
/**
* Obtain new delegation tokens from the available providers. Schedules a new task to fetch
* new tokens before the new set expires.
*
* @return Credentials containing the new tokens.
*/
private def obtainTokensAndScheduleRenewal(ugi: UserGroupInformation): Credentials = {
ugi.doAs(new PrivilegedExceptionAction[Credentials]() {
override def run(): Credentials = {
val creds = new Credentials()
val nextRenewal = obtainDelegationTokens(creds)
// Calculate the time when new credentials should be created, based on the configured
// ratio.
val now = System.currentTimeMillis
val ratio = sparkConf.get(CREDENTIALS_RENEWAL_INTERVAL_RATIO)
val delay = (ratio * (nextRenewal - now)).toLong
scheduleRenewal(delay)
creds
}
})
}
private def doLogin(): UserGroupInformation = {
logInfo(s"Attempting to login to KDC using principal: $principal")
val ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
logInfo("Successfully logged into KDC.")
ugi
}
private def loadProviders(): Map[String, HadoopDelegationTokenProvider] = {
val providers = Seq(new HadoopFSDelegationTokenProvider(fileSystemsToAccess)) ++
safeCreateProvider(new HiveDelegationTokenProvider) ++
safeCreateProvider(new HBaseDelegationTokenProvider)
// Filter out providers for which spark.security.credentials.{service}.enabled is false.
providers
.filter { p => isServiceEnabled(p.serviceName) }
.map { p => (p.serviceName, p) }
.toMap
}
private def safeCreateProvider(
createFn: => HadoopDelegationTokenProvider): Option[HadoopDelegationTokenProvider] = {
try {
Some(createFn)
} catch {
case t: Throwable =>
logDebug(s"Failed to load built in provider.", t)
None
}
}
}
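
To make the renewal timing above concrete: new tokens are fetched once a configurable fraction
of the renewal interval has elapsed (75% per the class comment, controlled by
CREDENTIALS_RENEWAL_INTERVAL_RATIO). A tiny sketch with assumed numbers:

    // Hypothetical values, only to illustrate the delay computed in obtainTokensAndScheduleRenewal.
    val now = System.currentTimeMillis()
    val nextRenewal = now + 8 * 60 * 60 * 1000L       // earliest token renewal: 8 hours from now
    val ratio = 0.75                                   // assumed default renewal ratio
    val delay = (ratio * (nextRenewal - now)).toLong   // ~6 hours until the next login + token fetch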


@ -30,7 +30,7 @@ import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Configuration => Set[FileSystem])
private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: () => Set[FileSystem])
extends HadoopDelegationTokenProvider with Logging {
// This tokenRenewalInterval will be set in the first call to obtainDelegationTokens.
@ -44,8 +44,7 @@ private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Configuration
hadoopConf: Configuration,
sparkConf: SparkConf,
creds: Credentials): Option[Long] = {
val fsToGetTokens = fileSystems(hadoopConf)
val fsToGetTokens = fileSystems()
val fetchCreds = fetchDelegationTokens(getTokenRenewer(hadoopConf), fsToGetTokens, creds)
// Get the token renewal interval if it is not set. It will only be called once.


@ -179,6 +179,10 @@ package object config {
.doc("Name of the Kerberos principal.")
.stringConf.createOptional
private[spark] val KERBEROS_RELOGIN_PERIOD = ConfigBuilder("spark.kerberos.relogin.period")
.timeConf(TimeUnit.SECONDS)
.createWithDefaultString("1m")
private[spark] val EXECUTOR_INSTANCES = ConfigBuilder("spark.executor.instances")
.intConf
.createOptional
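
A short usage note for the relocated setting above: the relogin check period is now configured
with spark.kerberos.relogin.period (default 1m), and the old spark.yarn.kerberos.relogin.period
key remains as a 3.0 alternate (see the SparkConf change near the top of this diff). A sketch,
with an arbitrarily chosen value:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      // Check whether a TGT relogin is needed every 5 minutes instead of the default 1m.
      .set("spark.kerberos.relogin.period", "5m")
    // The deprecated YARN-specific key is still accepted and mapped to the new one:
    // conf.set("spark.yarn.kerberos.relogin.period", "5m")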


@ -18,13 +18,17 @@
package org.apache.spark.scheduler.cluster
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
import javax.annotation.concurrent.GuardedBy
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.concurrent.Future
import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.{ExecutorAllocationClient, SparkEnv, SparkException, TaskState}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.internal.Logging
import org.apache.spark.rpc._
import org.apache.spark.scheduler._
@ -95,6 +99,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// The num of current max ExecutorId used to re-register appMaster
@volatile protected var currentExecutorIdCounter = 0
// Current set of delegation tokens to send to executors.
private val delegationTokens = new AtomicReference[Array[Byte]]()
// The token manager used to create security tokens.
private var delegationTokenManager: Option[HadoopDelegationTokenManager] = None
private val reviveThread =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-revive-thread")
@ -152,6 +162,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
}
case UpdateDelegationTokens(newDelegationTokens) =>
SparkHadoopUtil.get.addDelegationTokens(newDelegationTokens, conf)
delegationTokens.set(newDelegationTokens)
executorDataMap.values.foreach { ed =>
ed.executorEndpoint.send(UpdateDelegationTokens(newDelegationTokens))
}
@ -230,7 +242,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
val reply = SparkAppConfig(
sparkProperties,
SparkEnv.get.securityManager.getIOEncryptionKey(),
fetchHadoopDelegationTokens())
Option(delegationTokens.get()))
context.reply(reply)
}
@ -390,6 +402,21 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// TODO (prashant) send conf instead of properties
driverEndpoint = createDriverEndpointRef(properties)
if (UserGroupInformation.isSecurityEnabled()) {
delegationTokenManager = createTokenManager()
delegationTokenManager.foreach { dtm =>
dtm.setDriverRef(driverEndpoint)
val creds = if (dtm.renewalEnabled) {
dtm.start().getCredentials()
} else {
val creds = UserGroupInformation.getCurrentUser().getCredentials()
dtm.obtainDelegationTokens(creds)
creds
}
delegationTokens.set(SparkHadoopUtil.get.serialize(creds))
}
}
}
protected def createDriverEndpointRef(
@ -416,6 +443,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
override def stop() {
reviveThread.shutdownNow()
stopExecutors()
delegationTokenManager.foreach(_.stop())
try {
if (driverEndpoint != null) {
driverEndpoint.askSync[Boolean](StopDriver)
@ -684,7 +712,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
true
}
protected def fetchHadoopDelegationTokens(): Option[Array[Byte]] = { None }
/**
* Create the delegation token manager to be used for the application. This method is called
* once during the start of the scheduler backend (so after the object has already been
* fully constructed), only if security is enabled in the Hadoop configuration.
*/
protected def createTokenManager(): Option[HadoopDelegationTokenManager] = None
}
private[spark] object CoarseGrainedSchedulerBackend {
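
Concretely, a cluster backend opts in to token support by overriding the createTokenManager hook
described above, as the Mesos backend does later in this diff. A minimal sketch (the subclass
name is hypothetical; conf comes from the parent class, as in the Mesos change below):

    import org.apache.spark.SparkContext
    import org.apache.spark.deploy.security.HadoopDelegationTokenManager
    import org.apache.spark.scheduler.TaskSchedulerImpl
    import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend

    private[spark] class MyClusterBackend(scheduler: TaskSchedulerImpl, sc: SparkContext)
      extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) {

      override protected def createTokenManager(): Option[HadoopDelegationTokenManager] =
        Some(new HadoopDelegationTokenManager(conf, sc.hadoopConfiguration))
    }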


@ -21,94 +21,36 @@ import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.Credentials
import org.scalatest.Matchers
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.util.Utils
class HadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers {
private var delegationTokenManager: HadoopDelegationTokenManager = null
private var sparkConf: SparkConf = null
private var hadoopConf: Configuration = null
class HadoopDelegationTokenManagerSuite extends SparkFunSuite {
private val hadoopConf = new Configuration()
override def beforeAll(): Unit = {
super.beforeAll()
sparkConf = new SparkConf()
hadoopConf = new Configuration()
}
test("Correctly load default credential providers") {
delegationTokenManager = new HadoopDelegationTokenManager(
sparkConf,
hadoopConf,
hadoopFSsToAccess)
delegationTokenManager.getServiceDelegationTokenProvider("hadoopfs") should not be (None)
delegationTokenManager.getServiceDelegationTokenProvider("hbase") should not be (None)
delegationTokenManager.getServiceDelegationTokenProvider("hive") should not be (None)
delegationTokenManager.getServiceDelegationTokenProvider("bogus") should be (None)
test("default configuration") {
val manager = new HadoopDelegationTokenManager(new SparkConf(false), hadoopConf)
assert(manager.isProviderLoaded("hadoopfs"))
assert(manager.isProviderLoaded("hbase"))
assert(manager.isProviderLoaded("hive"))
}
test("disable hive credential provider") {
sparkConf.set("spark.security.credentials.hive.enabled", "false")
delegationTokenManager = new HadoopDelegationTokenManager(
sparkConf,
hadoopConf,
hadoopFSsToAccess)
delegationTokenManager.getServiceDelegationTokenProvider("hadoopfs") should not be (None)
delegationTokenManager.getServiceDelegationTokenProvider("hbase") should not be (None)
delegationTokenManager.getServiceDelegationTokenProvider("hive") should be (None)
val sparkConf = new SparkConf(false).set("spark.security.credentials.hive.enabled", "false")
val manager = new HadoopDelegationTokenManager(sparkConf, hadoopConf)
assert(manager.isProviderLoaded("hadoopfs"))
assert(manager.isProviderLoaded("hbase"))
assert(!manager.isProviderLoaded("hive"))
}
test("using deprecated configurations") {
sparkConf.set("spark.yarn.security.tokens.hadoopfs.enabled", "false")
sparkConf.set("spark.yarn.security.credentials.hive.enabled", "false")
delegationTokenManager = new HadoopDelegationTokenManager(
sparkConf,
hadoopConf,
hadoopFSsToAccess)
delegationTokenManager.getServiceDelegationTokenProvider("hadoopfs") should be (None)
delegationTokenManager.getServiceDelegationTokenProvider("hive") should be (None)
delegationTokenManager.getServiceDelegationTokenProvider("hbase") should not be (None)
}
test("verify no credentials are obtained") {
delegationTokenManager = new HadoopDelegationTokenManager(
sparkConf,
hadoopConf,
hadoopFSsToAccess)
val creds = new Credentials()
// Tokens cannot be obtained from HDFS, Hive, HBase in unit tests.
delegationTokenManager.obtainDelegationTokens(hadoopConf, creds)
val tokens = creds.getAllTokens
tokens.size() should be (0)
}
test("obtain tokens For HiveMetastore") {
val hadoopConf = new Configuration()
hadoopConf.set("hive.metastore.kerberos.principal", "bob")
// thrift picks up on port 0 and bails out, without trying to talk to endpoint
hadoopConf.set("hive.metastore.uris", "http://localhost:0")
val hiveCredentialProvider = new HiveDelegationTokenProvider()
val credentials = new Credentials()
hiveCredentialProvider.obtainDelegationTokens(hadoopConf, sparkConf, credentials)
credentials.getAllTokens.size() should be (0)
}
test("Obtain tokens For HBase") {
val hadoopConf = new Configuration()
hadoopConf.set("hbase.security.authentication", "kerberos")
val hbaseTokenProvider = new HBaseDelegationTokenProvider()
val creds = new Credentials()
hbaseTokenProvider.obtainDelegationTokens(hadoopConf, sparkConf, creds)
creds.getAllTokens.size should be (0)
val sparkConf = new SparkConf(false)
.set("spark.yarn.security.tokens.hadoopfs.enabled", "false")
.set("spark.yarn.security.credentials.hive.enabled", "false")
val manager = new HadoopDelegationTokenManager(sparkConf, hadoopConf)
assert(!manager.isProviderLoaded("hadoopfs"))
assert(manager.isProviderLoaded("hbase"))
assert(!manager.isProviderLoaded("hive"))
}
test("SPARK-23209: obtain tokens when Hive classes are not available") {
@ -123,43 +65,41 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers {
throw new ClassNotFoundException(name)
}
if (name.startsWith("java") || name.startsWith("scala")) {
currentLoader.loadClass(name)
} else {
val prefixBlacklist = Seq("java", "scala", "com.sun.", "sun.")
if (prefixBlacklist.exists(name.startsWith(_))) {
return currentLoader.loadClass(name)
}
val found = findLoadedClass(name)
if (found != null) {
return found
}
val classFileName = name.replaceAll("\\.", "/") + ".class"
val in = currentLoader.getResourceAsStream(classFileName)
if (in != null) {
val bytes = IOUtils.toByteArray(in)
defineClass(name, bytes, 0, bytes.length)
} else {
return defineClass(name, bytes, 0, bytes.length)
}
throw new ClassNotFoundException(name)
}
}
}
}
try {
Thread.currentThread().setContextClassLoader(noHive)
Utils.withContextClassLoader(noHive) {
val test = noHive.loadClass(NoHiveTest.getClass.getName().stripSuffix("$"))
test.getMethod("runTest").invoke(null)
} finally {
Thread.currentThread().setContextClassLoader(currentLoader)
}
}
private[spark] def hadoopFSsToAccess(hadoopConf: Configuration): Set[FileSystem] = {
Set(FileSystem.get(hadoopConf))
}
}
/** Test code for SPARK-23209 to avoid using too much reflection above. */
private object NoHiveTest extends Matchers {
private object NoHiveTest {
def runTest(): Unit = {
try {
val manager = new HadoopDelegationTokenManager(new SparkConf(), new Configuration(),
_ => Set())
manager.getServiceDelegationTokenProvider("hive") should be (None)
val manager = new HadoopDelegationTokenManager(new SparkConf(), new Configuration())
require(!manager.isProviderLoaded("hive"))
} catch {
case e: Throwable =>
// Throw a better exception in case the test fails, since there may be a lot of nesting.


@ -27,7 +27,6 @@ import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.security.KubernetesHadoopDelegationTokenManager
import org.apache.spark.deploy.k8s.submit._
import org.apache.spark.deploy.k8s.submit.KubernetesClientApplication._
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.internal.config.ConfigEntry
@ -79,7 +78,7 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf](
def krbConfigMapName: String = s"$appResourceNamePrefix-krb5-file"
def tokenManager(conf: SparkConf, hConf: Configuration): KubernetesHadoopDelegationTokenManager =
new KubernetesHadoopDelegationTokenManager(new HadoopDelegationTokenManager(conf, hConf))
new KubernetesHadoopDelegationTokenManager(conf, hConf)
def namespace(): String = sparkConf.get(KUBERNETES_NAMESPACE)


@ -38,16 +38,14 @@ private[spark] object HadoopKerberosLogin {
submissionSparkConf: SparkConf,
kubernetesResourceNamePrefix: String,
tokenManager: KubernetesHadoopDelegationTokenManager): KerberosConfigSpec = {
val hadoopConf = SparkHadoopUtil.get.newConfiguration(submissionSparkConf)
// The JobUserUGI will be taken from the Local Ticket Cache or via keytab+principal
// The login happens in the SparkSubmit so login logic is not necessary to include
val jobUserUGI = tokenManager.getCurrentUser
val originalCredentials = jobUserUGI.getCredentials
val (tokenData, renewalInterval) = tokenManager.getDelegationTokens(
originalCredentials,
submissionSparkConf,
hadoopConf)
require(tokenData.nonEmpty, "Did not obtain any delegation tokens")
tokenManager.obtainDelegationTokens(originalCredentials)
val tokenData = SparkHadoopUtil.get.serialize(originalCredentials)
val initialTokenDataKeyName = KERBEROS_SECRET_KEY
val newSecretName = s"$kubernetesResourceNamePrefix-$KERBEROS_DELEGEGATION_TOKEN_SECRET_NAME"
val secretDT =


@ -18,45 +18,20 @@
package org.apache.spark.deploy.k8s.security
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.internal.Logging
/**
* The KubernetesHadoopDelegationTokenManager fetches Hadoop delegation tokens
* on the behalf of the Kubernetes submission client. The new credentials
* (called Tokens when they are serialized) are stored in Secrets accessible
* to the driver and executors, when new Tokens are received they overwrite the current Secrets.
* Adds Kubernetes-specific functionality to HadoopDelegationTokenManager.
*/
private[spark] class KubernetesHadoopDelegationTokenManager(
tokenManager: HadoopDelegationTokenManager) extends Logging {
_sparkConf: SparkConf,
_hadoopConf: Configuration)
extends HadoopDelegationTokenManager(_sparkConf, _hadoopConf) {
// HadoopUGI Util methods
def getCurrentUser: UserGroupInformation = UserGroupInformation.getCurrentUser
def getShortUserName: String = getCurrentUser.getShortUserName
def getFileSystem(hadoopConf: Configuration): FileSystem = FileSystem.get(hadoopConf)
def isSecurityEnabled: Boolean = UserGroupInformation.isSecurityEnabled
def loginUserFromKeytabAndReturnUGI(principal: String, keytab: String): UserGroupInformation =
UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
def serializeCreds(creds: Credentials): Array[Byte] = SparkHadoopUtil.get.serialize(creds)
def nextRT(rt: Long, conf: SparkConf): Long = SparkHadoopUtil.nextCredentialRenewalTime(rt, conf)
def getDelegationTokens(
creds: Credentials,
conf: SparkConf,
hadoopConf: Configuration): (Array[Byte], Long) = {
try {
val rt = tokenManager.obtainDelegationTokens(hadoopConf, creds)
logDebug(s"Initialized tokens")
(serializeCreds(creds), nextRT(rt, conf))
} catch {
case e: Exception =>
logError(s"Failed to fetch Hadoop delegation tokens $e")
throw e
}
}
}


@ -26,12 +26,12 @@ import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.concurrent.Future
import org.apache.hadoop.security.UserGroupInformation
import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
import org.apache.mesos.SchedulerDriver
import org.apache.spark.{SecurityManager, SparkConf, SparkContext, SparkException, TaskState}
import org.apache.spark.deploy.mesos.config._
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.internal.config
import org.apache.spark.internal.config.EXECUTOR_HEARTBEAT_INTERVAL
import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
@ -60,9 +60,6 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)
with org.apache.mesos.Scheduler with MesosSchedulerUtils {
private lazy val hadoopDelegationTokenManager: MesosHadoopDelegationTokenManager =
new MesosHadoopDelegationTokenManager(conf, sc.hadoopConfiguration, driverEndpoint)
// Blacklist a slave after this many failures
private val MAX_SLAVE_FAILURES = 2
@ -678,7 +675,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
launcherBackend.close()
}
private def stopSchedulerBackend() {
private def stopSchedulerBackend(): Unit = {
// Make sure we're not launching tasks during shutdown
stateLock.synchronized {
if (stopCalled) {
@ -777,6 +774,10 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
}
}
override protected def createTokenManager(): Option[HadoopDelegationTokenManager] = {
Some(new HadoopDelegationTokenManager(conf, sc.hadoopConfiguration))
}
private def numExecutors(): Int = {
slaves.values.map(_.taskIDs.size).sum
}
@ -789,14 +790,6 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
offer.getHostname
}
}
override def fetchHadoopDelegationTokens(): Option[Array[Byte]] = {
if (UserGroupInformation.isSecurityEnabled) {
Some(hadoopDelegationTokenManager.getTokens())
} else {
None
}
}
}
private class Slave(val hostname: String) {


@ -1,160 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler.cluster.mesos
import java.security.PrivilegedExceptionAction
import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
import org.apache.spark.ui.UIUtils
import org.apache.spark.util.ThreadUtils
/**
* The MesosHadoopDelegationTokenManager fetches and updates Hadoop delegation tokens on the behalf
* of the MesosCoarseGrainedSchedulerBackend. It is modeled after the YARN AMCredentialRenewer,
* and similarly will renew the Credentials when 75% of the renewal interval has passed.
* The principal difference is that instead of writing the new credentials to HDFS and
* incrementing the timestamp of the file, the new credentials (called Tokens when they are
* serialized) are broadcast to all running executors. On the executor side, when new Tokens are
* received they overwrite the current credentials.
*/
private[spark] class MesosHadoopDelegationTokenManager(
conf: SparkConf,
hadoopConfig: Configuration,
driverEndpoint: RpcEndpointRef)
extends Logging {
require(driverEndpoint != null, "DriverEndpoint is not initialized")
private val credentialRenewerThread: ScheduledExecutorService =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread")
private val tokenManager: HadoopDelegationTokenManager =
new HadoopDelegationTokenManager(conf, hadoopConfig)
private val principal: String = conf.get(config.PRINCIPAL).orNull
private var (tokens: Array[Byte], timeOfNextRenewal: Long) = {
try {
val creds = UserGroupInformation.getCurrentUser.getCredentials
val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
val rt = tokenManager.obtainDelegationTokens(hadoopConf, creds)
logInfo(s"Initialized tokens: ${SparkHadoopUtil.get.dumpTokens(creds)}")
(SparkHadoopUtil.get.serialize(creds), SparkHadoopUtil.nextCredentialRenewalTime(rt, conf))
} catch {
case e: Exception =>
logError(s"Failed to fetch Hadoop delegation tokens $e")
throw e
}
}
private val keytabFile: Option[String] = conf.get(config.KEYTAB)
scheduleTokenRenewal()
private def scheduleTokenRenewal(): Unit = {
if (keytabFile.isDefined) {
require(principal != null, "Principal is required for Keytab-based authentication")
logInfo(s"Using keytab: ${keytabFile.get} and principal $principal")
} else {
logInfo("Using ticket cache for Kerberos authentication, no token renewal.")
return
}
def scheduleRenewal(runnable: Runnable): Unit = {
val remainingTime = timeOfNextRenewal - System.currentTimeMillis()
if (remainingTime <= 0) {
logInfo("Credentials have expired, creating new ones now.")
runnable.run()
} else {
logInfo(s"Scheduling login from keytab in $remainingTime millis.")
credentialRenewerThread.schedule(runnable, remainingTime, TimeUnit.MILLISECONDS)
}
}
val credentialRenewerRunnable =
new Runnable {
override def run(): Unit = {
try {
getNewDelegationTokens()
broadcastDelegationTokens(tokens)
} catch {
case e: Exception =>
// Log the error and try to write new tokens back in an hour
val delay = TimeUnit.SECONDS.toMillis(conf.get(config.CREDENTIALS_RENEWAL_RETRY_WAIT))
logWarning(
s"Couldn't broadcast tokens, trying again in ${UIUtils.formatDuration(delay)}", e)
credentialRenewerThread.schedule(this, delay, TimeUnit.MILLISECONDS)
return
}
scheduleRenewal(this)
}
}
scheduleRenewal(credentialRenewerRunnable)
}
private def getNewDelegationTokens(): Unit = {
logInfo(s"Attempting to login to KDC with principal ${principal}")
// Get new delegation tokens by logging in with a new UGI inspired by AMCredentialRenewer.scala
// Don't protect against keytabFile being empty because it's guarded above.
val ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytabFile.get)
logInfo("Successfully logged into KDC")
val tempCreds = ugi.getCredentials
val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
val nextRenewalTime = ugi.doAs(new PrivilegedExceptionAction[Long] {
override def run(): Long = {
tokenManager.obtainDelegationTokens(hadoopConf, tempCreds)
}
})
val currTime = System.currentTimeMillis()
timeOfNextRenewal = if (nextRenewalTime <= currTime) {
logWarning(s"Next credential renewal time ($nextRenewalTime) is earlier than " +
s"current time ($currTime), which is unexpected, please check your credential renewal " +
"related configurations in the target services.")
currTime
} else {
SparkHadoopUtil.nextCredentialRenewalTime(nextRenewalTime, conf)
}
logInfo(s"Time of next renewal is in ${timeOfNextRenewal - System.currentTimeMillis()} ms")
// Add the temp credentials back to the original ones.
UserGroupInformation.getCurrentUser.addCredentials(tempCreds)
// update tokens for late or dynamically added executors
tokens = SparkHadoopUtil.get.serialize(tempCreds)
}
private def broadcastDelegationTokens(tokens: Array[Byte]) = {
logInfo("Sending new tokens to all executors")
driverEndpoint.send(UpdateDelegationTokens(tokens))
}
def getTokens(): Array[Byte] = {
tokens
}
}


@ -41,7 +41,7 @@ import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.history.HistoryServer
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.deploy.yarn.security.AMCredentialRenewer
import org.apache.spark.deploy.yarn.security.YARNHadoopDelegationTokenManager
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.metrics.MetricsSystem
@ -99,20 +99,18 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
}
}
private val credentialRenewer: Option[AMCredentialRenewer] = sparkConf.get(KEYTAB).map { _ =>
new AMCredentialRenewer(sparkConf, yarnConf)
private val tokenManager: Option[YARNHadoopDelegationTokenManager] = {
sparkConf.get(KEYTAB).map { _ =>
new YARNHadoopDelegationTokenManager(sparkConf, yarnConf)
}
}
private val ugi = credentialRenewer match {
case Some(cr) =>
private val ugi = tokenManager match {
case Some(tm) =>
// Set the context class loader so that the token renewer has access to jars distributed
// by the user.
val currentLoader = Thread.currentThread().getContextClassLoader()
Thread.currentThread().setContextClassLoader(userClassLoader)
try {
cr.start()
} finally {
Thread.currentThread().setContextClassLoader(currentLoader)
Utils.withContextClassLoader(userClassLoader) {
tm.start()
}
case _ =>
@ -380,7 +378,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
userClassThread.interrupt()
}
if (!inShutdown) {
credentialRenewer.foreach(_.stop())
tokenManager.foreach(_.stop())
}
}
}
@ -440,7 +438,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
securityMgr,
localResources)
credentialRenewer.foreach(_.setDriverRef(driverRef))
tokenManager.foreach(_.setDriverRef(driverRef))
// Initialize the AM endpoint *after* the allocator has been initialized. This ensures
// that when the driver sends an initial executor request (e.g. after an AM restart),


@ -310,7 +310,7 @@ private[spark] class Client(
private def setupSecurityToken(amContainer: ContainerLaunchContext): Unit = {
val credentials = UserGroupInformation.getCurrentUser().getCredentials()
val credentialManager = new YARNHadoopDelegationTokenManager(sparkConf, hadoopConf)
credentialManager.obtainDelegationTokens(hadoopConf, credentials)
credentialManager.obtainDelegationTokens(credentials)
// When using a proxy user, copy the delegation tokens to the user's credentials. Avoid
// that for regular users, since in those cases the user already has access to the TGT,


@ -325,10 +325,6 @@ package object config {
.stringConf
.createOptional
private[spark] val KERBEROS_RELOGIN_PERIOD = ConfigBuilder("spark.yarn.kerberos.relogin.period")
.timeConf(TimeUnit.SECONDS)
.createWithDefaultString("1m")
// The list of cache-related config entries. This is used by Client and the AM to clean
// up the environment so that these settings do not appear on the web UI.
private[yarn] val CACHE_CONFIGS = Seq(


@ -1,177 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.yarn.security
import java.security.PrivilegedExceptionAction
import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
import java.util.concurrent.atomic.AtomicReference
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
import org.apache.spark.ui.UIUtils
import org.apache.spark.util.ThreadUtils
/**
* A manager tasked with periodically updating delegation tokens needed by the application.
*
* This manager is meant to make sure long-running apps (such as Spark Streaming apps) can run
* without interruption while accessing secured services. It periodically logs in to the KDC with
* user-provided credentials, and contacts all the configured secure services to obtain delegation
* tokens to be distributed to the rest of the application.
*
* This class will manage the kerberos login, by renewing the TGT when needed. Because the UGI API
* does not expose the TTL of the TGT, a configuration controls how often to check that a relogin is
* necessary. This is done reasonably often since the check is a no-op when the relogin is not yet
* needed. The check period can be overridden in the configuration.
*
* New delegation tokens are created once 75% of the renewal interval of the original tokens has
* elapsed. The new tokens are sent to the Spark driver endpoint once it's registered with the AM.
* The driver is tasked with distributing the tokens to other processes that might need them.
*/
private[yarn] class AMCredentialRenewer(
sparkConf: SparkConf,
hadoopConf: Configuration) extends Logging {
private val principal = sparkConf.get(PRINCIPAL).get
private val keytab = sparkConf.get(KEYTAB).get
private val credentialManager = new YARNHadoopDelegationTokenManager(sparkConf, hadoopConf)
private val renewalExecutor: ScheduledExecutorService =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Refresh Thread")
private val driverRef = new AtomicReference[RpcEndpointRef]()
private val renewalTask = new Runnable() {
override def run(): Unit = {
updateTokensTask()
}
}
def setDriverRef(ref: RpcEndpointRef): Unit = {
driverRef.set(ref)
}
/**
* Start the token renewer. Upon start, the renewer will:
*
* - log in the configured user, and set up a task to keep that user's ticket renewed
* - obtain delegation tokens from all available providers
* - schedule a periodic task to update the tokens when needed.
*
* @return The newly logged in user.
*/
def start(): UserGroupInformation = {
val originalCreds = UserGroupInformation.getCurrentUser().getCredentials()
val ugi = doLogin()
val tgtRenewalTask = new Runnable() {
override def run(): Unit = {
ugi.checkTGTAndReloginFromKeytab()
}
}
val tgtRenewalPeriod = sparkConf.get(KERBEROS_RELOGIN_PERIOD)
renewalExecutor.scheduleAtFixedRate(tgtRenewalTask, tgtRenewalPeriod, tgtRenewalPeriod,
TimeUnit.SECONDS)
val creds = obtainTokensAndScheduleRenewal(ugi)
ugi.addCredentials(creds)
// Transfer the original user's tokens to the new user, since that's needed to connect to
// YARN. Explicitly avoid overwriting tokens that already exist in the current user's
// credentials, since those were freshly obtained above (see SPARK-23361).
val existing = ugi.getCredentials()
existing.mergeAll(originalCreds)
ugi.addCredentials(existing)
ugi
}
def stop(): Unit = {
renewalExecutor.shutdown()
}
private def scheduleRenewal(delay: Long): Unit = {
val _delay = math.max(0, delay)
logInfo(s"Scheduling login from keytab in ${UIUtils.formatDuration(delay)}.")
renewalExecutor.schedule(renewalTask, _delay, TimeUnit.MILLISECONDS)
}
/**
* Periodic task to login to the KDC and create new delegation tokens. Re-schedules itself
* to fetch the next set of tokens when needed.
*/
private def updateTokensTask(): Unit = {
try {
val freshUGI = doLogin()
val creds = obtainTokensAndScheduleRenewal(freshUGI)
val tokens = SparkHadoopUtil.get.serialize(creds)
val driver = driverRef.get()
if (driver != null) {
logInfo("Updating delegation tokens.")
driver.send(UpdateDelegationTokens(tokens))
} else {
// This shouldn't really happen, since the driver should register way before tokens expire
// (or the AM should time out the application).
logWarning("Delegation tokens close to expiration but no driver has registered yet.")
SparkHadoopUtil.get.addDelegationTokens(tokens, sparkConf)
}
} catch {
case e: Exception =>
val delay = TimeUnit.SECONDS.toMillis(sparkConf.get(CREDENTIALS_RENEWAL_RETRY_WAIT))
logWarning(s"Failed to update tokens, will try again in ${UIUtils.formatDuration(delay)}!" +
" If this happens too often tasks will fail.", e)
scheduleRenewal(delay)
}
}
/**
* Obtain new delegation tokens from the available providers. Schedules a new task to fetch
* new tokens before the new set expires.
*
* @return Credentials containing the new tokens.
*/
private def obtainTokensAndScheduleRenewal(ugi: UserGroupInformation): Credentials = {
ugi.doAs(new PrivilegedExceptionAction[Credentials]() {
override def run(): Credentials = {
val creds = new Credentials()
val nextRenewal = credentialManager.obtainDelegationTokens(hadoopConf, creds)
val timeToWait = SparkHadoopUtil.nextCredentialRenewalTime(nextRenewal, sparkConf) -
System.currentTimeMillis()
scheduleRenewal(timeToWait)
creds
}
})
}
private def doLogin(): UserGroupInformation = {
logInfo(s"Attempting to login to KDC using principal: $principal")
val ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
logInfo("Successfully logged into KDC.")
ugi
}
}


@ -22,12 +22,13 @@ import java.util.ServiceLoader
import scala.collection.JavaConverters._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.Credentials
import org.apache.spark.SparkConf
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.util.Utils
/**
@ -36,27 +37,25 @@ import org.apache.spark.util.Utils
* in [[HadoopDelegationTokenManager]].
*/
private[yarn] class YARNHadoopDelegationTokenManager(
sparkConf: SparkConf,
hadoopConf: Configuration) extends Logging {
_sparkConf: SparkConf,
_hadoopConf: Configuration)
extends HadoopDelegationTokenManager(_sparkConf, _hadoopConf) {
private val delegationTokenManager = new HadoopDelegationTokenManager(sparkConf, hadoopConf,
conf => YarnSparkHadoopUtil.hadoopFSsToAccess(sparkConf, conf))
// public for testing
val credentialProviders = getCredentialProviders
private val credentialProviders = {
ServiceLoader.load(classOf[ServiceCredentialProvider], Utils.getContextOrSparkClassLoader)
.asScala
.toList
.filter { p => isServiceEnabled(p.serviceName) }
.map { p => (p.serviceName, p) }
.toMap
}
if (credentialProviders.nonEmpty) {
logDebug("Using the following YARN-specific credential providers: " +
s"${credentialProviders.keys.mkString(", ")}.")
}
/**
* Writes delegation tokens to creds. Delegation tokens are fetched from all registered
* providers.
*
* @return Time after which the fetched delegation tokens should be renewed.
*/
def obtainDelegationTokens(hadoopConf: Configuration, creds: Credentials): Long = {
val superInterval = delegationTokenManager.obtainDelegationTokens(hadoopConf, creds)
override def obtainDelegationTokens(creds: Credentials): Long = {
val superInterval = super.obtainDelegationTokens(creds)
credentialProviders.values.flatMap { provider =>
if (provider.credentialsRequired(hadoopConf)) {
@ -69,18 +68,13 @@ private[yarn] class YARNHadoopDelegationTokenManager(
}.foldLeft(superInterval)(math.min)
}
private def getCredentialProviders: Map[String, ServiceCredentialProvider] = {
val providers = loadCredentialProviders
providers.
filter { p => delegationTokenManager.isServiceEnabled(p.serviceName) }
.map { p => (p.serviceName, p) }
.toMap
// For testing.
override def isProviderLoaded(serviceName: String): Boolean = {
credentialProviders.contains(serviceName) || super.isProviderLoaded(serviceName)
}
private def loadCredentialProviders: List[ServiceCredentialProvider] = {
ServiceLoader.load(classOf[ServiceCredentialProvider], Utils.getContextOrSparkClassLoader)
.asScala
.toList
override protected def fileSystemsToAccess(): Set[FileSystem] = {
YarnSparkHadoopUtil.hadoopFSsToAccess(sparkConf, hadoopConf)
}
}


@ -19,16 +19,14 @@ package org.apache.spark.scheduler.cluster
import java.util.concurrent.atomic.{AtomicBoolean}
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.{Failure, Success}
import scala.util.control.NonFatal
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
import org.apache.spark.SparkContext
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.rpc._
import org.apache.spark.scheduler._
@ -270,7 +268,6 @@ private[spark] abstract class YarnSchedulerBackend(
case u @ UpdateDelegationTokens(tokens) =>
// Add the tokens to the current user and send a message to the scheduler so that it
// notifies all registered executors of the new tokens.
SparkHadoopUtil.get.addDelegationTokens(tokens, sc.conf)
driverEndpoint.send(u)
}


@ -19,11 +19,10 @@ package org.apache.spark.deploy.yarn.security
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials
import org.scalatest.Matchers
import org.apache.spark.{SparkConf, SparkFunSuite}
class YARNHadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers {
class YARNHadoopDelegationTokenManagerSuite extends SparkFunSuite {
private var credentialManager: YARNHadoopDelegationTokenManager = null
private var sparkConf: SparkConf = null
private var hadoopConf: Configuration = null
@ -36,7 +35,7 @@ class YARNHadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers
test("Correctly loads credential providers") {
credentialManager = new YARNHadoopDelegationTokenManager(sparkConf, hadoopConf)
credentialManager.credentialProviders.get("yarn-test") should not be (None)
assert(credentialManager.isProviderLoaded("yarn-test"))
}
}