[SPARK-12241][YARN] Improve failure reporting in Yarn client obtainTokenForHBase()

This lines up the HBase token logic with that done for Hive in SPARK-11265: reflection with only CFNE being swallowed.

There is a test, one which doesn't try to put HBase on the yarn/test class and really do the reflection (the way the hive introspection does). If people do want that then it could be added with careful POM work

+also: cut an incorrect comment from the Hive test case before copying it, and a couple of imports that may have been related to the hive test in the past.

Author: Steve Loughran <stevel@hortonworks.com>

Closes #10227 from steveloughran/stevel/patches/SPARK-12241-obtainTokenForHBase.
This commit is contained in:
Steve Loughran 2015-12-09 10:25:38 -08:00 committed by Marcelo Vanzin
parent 6900f01737
commit 442a7715a5
3 changed files with 64 additions and 31 deletions

View file

@ -1369,41 +1369,17 @@ object Client extends Logging {
}
/**
* Obtain security token for HBase.
* Obtain a security token for HBase.
*/
def obtainTokenForHBase(
sparkConf: SparkConf,
conf: Configuration,
credentials: Credentials): Unit = {
if (shouldGetTokens(sparkConf, "hbase") && UserGroupInformation.isSecurityEnabled) {
val mirror = universe.runtimeMirror(getClass.getClassLoader)
try {
val confCreate = mirror.classLoader.
loadClass("org.apache.hadoop.hbase.HBaseConfiguration").
getMethod("create", classOf[Configuration])
val obtainToken = mirror.classLoader.
loadClass("org.apache.hadoop.hbase.security.token.TokenUtil").
getMethod("obtainToken", classOf[Configuration])
logDebug("Attempting to fetch HBase security token.")
val hbaseConf = confCreate.invoke(null, conf).asInstanceOf[Configuration]
if ("kerberos" == hbaseConf.get("hbase.security.authentication")) {
val token = obtainToken.invoke(null, hbaseConf).asInstanceOf[Token[TokenIdentifier]]
YarnSparkHadoopUtil.get.obtainTokenForHBase(conf).foreach { token =>
credentials.addToken(token.getService, token)
logInfo("Added HBase security token to credentials.")
}
} catch {
case e: java.lang.NoSuchMethodException =>
logInfo("HBase Method not found: " + e)
case e: java.lang.ClassNotFoundException =>
logDebug("HBase Class not found: " + e)
case e: java.lang.NoClassDefFoundError =>
logDebug("HBase Class not found: " + e)
case e: Exception =>
logError("Exception when obtaining HBase security token: " + e)
}
}
}

View file

@ -33,7 +33,7 @@ import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.{Master, JobConf}
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.security.token.Token
import org.apache.hadoop.security.token.{Token, TokenIdentifier}
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.api.ApplicationConstants
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
@ -216,6 +216,55 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
None
}
}
/**
* Obtain a security token for HBase.
*
* Requirements
*
* 1. `"hbase.security.authentication" == "kerberos"`
* 2. The HBase classes `HBaseConfiguration` and `TokenUtil` could be loaded
* and invoked.
*
* @param conf Hadoop configuration; an HBase configuration is created
* from this.
* @return a token if the requirements were met, `None` if not.
*/
def obtainTokenForHBase(conf: Configuration): Option[Token[TokenIdentifier]] = {
try {
obtainTokenForHBaseInner(conf)
} catch {
case e: ClassNotFoundException =>
logInfo(s"HBase class not found $e")
logDebug("HBase class not found", e)
None
}
}
/**
* Obtain a security token for HBase if `"hbase.security.authentication" == "kerberos"`
*
* @param conf Hadoop configuration; an HBase configuration is created
* from this.
* @return a token if one was needed
*/
def obtainTokenForHBaseInner(conf: Configuration): Option[Token[TokenIdentifier]] = {
val mirror = universe.runtimeMirror(getClass.getClassLoader)
val confCreate = mirror.classLoader.
loadClass("org.apache.hadoop.hbase.HBaseConfiguration").
getMethod("create", classOf[Configuration])
val obtainToken = mirror.classLoader.
loadClass("org.apache.hadoop.hbase.security.token.TokenUtil").
getMethod("obtainToken", classOf[Configuration])
val hbaseConf = confCreate.invoke(null, conf).asInstanceOf[Configuration]
if ("kerberos" == hbaseConf.get("hbase.security.authentication")) {
logDebug("Attempting to fetch HBase security token.")
Some(obtainToken.invoke(null, hbaseConf).asInstanceOf[Token[TokenIdentifier]])
} else {
None
}
}
}
object YarnSparkHadoopUtil {

View file

@ -27,7 +27,6 @@ import org.apache.hadoop.hive.ql.metadata.HiveException
import org.apache.hadoop.io.Text
import org.apache.hadoop.yarn.api.ApplicationConstants
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.scalatest.Matchers
@ -259,7 +258,6 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging
assertNestedHiveException(intercept[InvocationTargetException] {
util.obtainTokenForHiveMetastoreInner(hadoopConf, "alice")
})
// expect exception trapping code to unwind this hive-side exception
assertNestedHiveException(intercept[InvocationTargetException] {
util.obtainTokenForHiveMetastore(hadoopConf)
})
@ -276,6 +274,16 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging
inner
}
test("Obtain tokens For HBase") {
val hadoopConf = new Configuration()
hadoopConf.set("hbase.security.authentication", "kerberos")
val util = new YarnSparkHadoopUtil
intercept[ClassNotFoundException] {
util.obtainTokenForHBaseInner(hadoopConf)
}
util.obtainTokenForHBase(hadoopConf) should be (None)
}
// This test needs to live here because it depends on isYarnMode returning true, which can only
// happen in the YARN module.
test("security manager token generation") {