[SPARK-29082][CORE] Skip delegation token generation if no credentials are available
This situation can happen when an external system (e.g. Oozie) generates delegation tokens for a Spark application. The Spark driver will then run against secured services, have proper credentials (the tokens), but no kerberos credentials. So trying to do things that requires a kerberos credential fails. Instead, if no kerberos credentials are detected, just skip the whole delegation token code. Tested with an application that simulates Oozie; fails before the fix, passes with the fix. Also with other DT-related tests to make sure other functionality keeps working. Closes #25805 from vanzin/SPARK-29082. Authored-by: Marcelo Vanzin <vanzin@cloudera.com> Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
This commit is contained in:
parent
db9e0fda6b
commit
f32f16fd68
|
@ -140,6 +140,13 @@ private[spark] class HadoopDelegationTokenManager(
|
|||
* @param creds Credentials object where to store the delegation tokens.
|
||||
*/
|
||||
def obtainDelegationTokens(creds: Credentials): Unit = {
|
||||
val currentUser = UserGroupInformation.getCurrentUser()
|
||||
val hasKerberosCreds = principal != null ||
|
||||
Option(currentUser.getRealUser()).getOrElse(currentUser).hasKerberosCredentials()
|
||||
|
||||
// Delegation tokens can only be obtained if the real user has Kerberos credentials, so
|
||||
// skip creation when those are not available.
|
||||
if (hasKerberosCreds) {
|
||||
val freshUGI = doLogin()
|
||||
freshUGI.doAs(new PrivilegedExceptionAction[Unit]() {
|
||||
override def run(): Unit = {
|
||||
|
@ -148,6 +155,7 @@ private[spark] class HadoopDelegationTokenManager(
|
|||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetch new delegation tokens for configured services.
|
||||
|
|
|
@ -427,13 +427,15 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
|
|||
val ugi = UserGroupInformation.getCurrentUser()
|
||||
val tokens = if (dtm.renewalEnabled) {
|
||||
dtm.start()
|
||||
} else if (ugi.hasKerberosCredentials() || SparkHadoopUtil.get.isProxyUser(ugi)) {
|
||||
} else {
|
||||
val creds = ugi.getCredentials()
|
||||
dtm.obtainDelegationTokens(creds)
|
||||
if (creds.numberOfTokens() > 0 || creds.numberOfSecretKeys() > 0) {
|
||||
SparkHadoopUtil.get.serialize(creds)
|
||||
} else {
|
||||
null
|
||||
}
|
||||
}
|
||||
if (tokens != null) {
|
||||
updateDelegationTokens(tokens)
|
||||
}
|
||||
|
|
|
@ -17,10 +17,14 @@
|
|||
|
||||
package org.apache.spark.deploy.security
|
||||
|
||||
import java.security.PrivilegedExceptionAction
|
||||
|
||||
import org.apache.hadoop.conf.Configuration
|
||||
import org.apache.hadoop.security.Credentials
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION
|
||||
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
|
||||
|
||||
import org.apache.spark.{SparkConf, SparkFunSuite}
|
||||
import org.apache.spark.deploy.SparkHadoopUtil
|
||||
import org.apache.spark.security.HadoopDelegationTokenProvider
|
||||
|
||||
private class ExceptionThrowingDelegationTokenProvider extends HadoopDelegationTokenProvider {
|
||||
|
@ -69,4 +73,37 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite {
|
|||
assert(!manager.isProviderLoaded("hadoopfs"))
|
||||
assert(manager.isProviderLoaded("hbase"))
|
||||
}
|
||||
|
||||
test("SPARK-29082: do not fail if current user does not have credentials") {
|
||||
// SparkHadoopUtil overrides the UGI configuration during initialization. That normally
|
||||
// happens early in the Spark application, but here it may affect the test depending on
|
||||
// how it's run, so force its initialization.
|
||||
SparkHadoopUtil.get
|
||||
|
||||
val krbConf = new Configuration()
|
||||
krbConf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos")
|
||||
|
||||
UserGroupInformation.setConfiguration(krbConf)
|
||||
try {
|
||||
val manager = new HadoopDelegationTokenManager(new SparkConf(false), krbConf, null)
|
||||
val testImpl = new PrivilegedExceptionAction[Unit] {
|
||||
override def run(): Unit = {
|
||||
assert(UserGroupInformation.isSecurityEnabled())
|
||||
val creds = new Credentials()
|
||||
manager.obtainDelegationTokens(creds)
|
||||
assert(creds.numberOfTokens() === 0)
|
||||
assert(creds.numberOfSecretKeys() === 0)
|
||||
}
|
||||
}
|
||||
|
||||
val realUser = UserGroupInformation.createUserForTesting("realUser", Array.empty)
|
||||
realUser.doAs(testImpl)
|
||||
|
||||
val proxyUser = UserGroupInformation.createProxyUserForTesting("proxyUser", realUser,
|
||||
Array.empty)
|
||||
proxyUser.doAs(testImpl)
|
||||
} finally {
|
||||
UserGroupInformation.reset()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue