[SPARK-11265][YARN] YarnClient can't get tokens to talk to Hive 1.2.1 in a secure cluster
This is a fix for SPARK-11265: the introspection code used to get Hive delegation tokens was failing on Spark 1.5.1+ due to changes in the Hive codebase.

Author: Steve Loughran <stevel@hortonworks.com>

Closes #9232 from steveloughran/stevel/patches/SPARK-11265-hive-tokens.
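In outline, the patch moves the Hive token logic out of Client.scala into a reusable, testable helper on YarnSparkHadoopUtil. Below is a minimal sketch of the calling pattern, mirroring the Client.scala hunk further down; the wrapper name addHiveToken is illustrative only and not part of the patch.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.Text
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil

// Sketch: how a client running against a secure cluster can pick up a Hive
// metastore delegation token with the helper added in this patch.
def addHiveToken(conf: Configuration, credentials: Credentials): Unit = {
  if (UserGroupInformation.isSecurityEnabled) {
    // Returns None when no metastore URI is configured or the Hive classes are absent.
    YarnSparkHadoopUtil.get.obtainTokenForHiveMetastore(conf).foreach { token =>
      credentials.addToken(new Text("hive.server2.delegation.token"), token)
    }
  }
}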
commit 40d3c6797a (parent fc27dfbf0f)

yarn/pom.xml (25 lines changed)
@@ -162,6 +162,31 @@
       <artifactId>jersey-server</artifactId>
       <scope>test</scope>
     </dependency>
+
+    <!--
+      Testing Hive reflection needs hive on the test classpath only.
+      It doesn't need the spark hive modules, so the -Phive flag is not checked.
+    -->
+    <dependency>
+      <groupId>${hive.group}</groupId>
+      <artifactId>hive-exec</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>${hive.group}</groupId>
+      <artifactId>hive-metastore</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.thrift</groupId>
+      <artifactId>libthrift</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.thrift</groupId>
+      <artifactId>libfb303</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>

   <build>
@@ -1337,55 +1337,8 @@ object Client extends Logging {
       conf: Configuration,
       credentials: Credentials) {
     if (shouldGetTokens(sparkConf, "hive") && UserGroupInformation.isSecurityEnabled) {
-      val mirror = universe.runtimeMirror(getClass.getClassLoader)
-
-      try {
-        val hiveConfClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.conf.HiveConf")
-        val hiveConf = hiveConfClass.newInstance()
-
-        val hiveConfGet = (param: String) => Option(hiveConfClass
-          .getMethod("get", classOf[java.lang.String])
-          .invoke(hiveConf, param))
-
-        val metastore_uri = hiveConfGet("hive.metastore.uris")
-
-        // Check for local metastore
-        if (metastore_uri != None && metastore_uri.get.toString.size > 0) {
-          val hiveClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.ql.metadata.Hive")
-          val hive = hiveClass.getMethod("get").invoke(null, hiveConf.asInstanceOf[Object])
-
-          val metastore_kerberos_principal_conf_var = mirror.classLoader
-            .loadClass("org.apache.hadoop.hive.conf.HiveConf$ConfVars")
-            .getField("METASTORE_KERBEROS_PRINCIPAL").get("varname").toString
-
-          val principal = hiveConfGet(metastore_kerberos_principal_conf_var)
-
-          val username = Option(UserGroupInformation.getCurrentUser().getUserName)
-          if (principal != None && username != None) {
-            val tokenStr = hiveClass.getMethod("getDelegationToken",
-              classOf[java.lang.String], classOf[java.lang.String])
-              .invoke(hive, username.get, principal.get).asInstanceOf[java.lang.String]
-
-            val hive2Token = new Token[DelegationTokenIdentifier]()
-            hive2Token.decodeFromUrlString(tokenStr)
-            credentials.addToken(new Text("hive.server2.delegation.token"), hive2Token)
-            logDebug("Added hive.Server2.delegation.token to conf.")
-            hiveClass.getMethod("closeCurrent").invoke(null)
-          } else {
-            logError("Username or principal == NULL")
-            logError(s"""username=${username.getOrElse("(NULL)")}""")
-            logError(s"""principal=${principal.getOrElse("(NULL)")}""")
-            throw new IllegalArgumentException("username and/or principal is equal to null!")
-          }
-        } else {
-          logDebug("HiveMetaStore configured in localmode")
-        }
-      } catch {
-        case e: java.lang.NoSuchMethodException => { logInfo("Hive Method not found " + e); return }
-        case e: java.lang.ClassNotFoundException => { logInfo("Hive Class not found " + e); return }
-        case e: Exception => { logError("Unexpected Exception " + e)
-          throw new RuntimeException("Unexpected exception", e)
-        }
-      }
+      YarnSparkHadoopUtil.get.obtainTokenForHiveMetastore(conf).foreach {
+        credentials.addToken(new Text("hive.server2.delegation.token"), _)
+      }
     }
   }
@@ -22,14 +22,17 @@ import java.util.regex.Matcher
 import java.util.regex.Pattern

 import scala.collection.mutable.HashMap
+import scala.reflect.runtime._
 import scala.util.Try

 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
 import org.apache.hadoop.io.Text
 import org.apache.hadoop.mapred.{Master, JobConf}
 import org.apache.hadoop.security.Credentials
 import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.security.token.Token
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.api.ApplicationConstants
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
@@ -142,6 +145,76 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
     val containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name())
     ConverterUtils.toContainerId(containerIdString)
   }
+
+  /**
+   * Obtains token for the Hive metastore, using the current user as the principal.
+   * Some exceptions are caught and downgraded to a log message.
+   * @param conf hadoop configuration; the Hive configuration will be based on this
+   * @return a token, or `None` if there's no need for a token (no metastore URI or principal
+   *         in the config), or if a binding exception was caught and downgraded.
+   */
+  def obtainTokenForHiveMetastore(conf: Configuration): Option[Token[DelegationTokenIdentifier]] = {
+    try {
+      obtainTokenForHiveMetastoreInner(conf, UserGroupInformation.getCurrentUser().getUserName)
+    } catch {
+      case e: ClassNotFoundException =>
+        logInfo(s"Hive class not found $e")
+        logDebug("Hive class not found", e)
+        None
+    }
+  }
+
+  /**
+   * Inner routine to obtain a token for the Hive metastore; exceptions are raised on any problem.
+   * @param conf hadoop configuration; the Hive configuration will be based on this.
+   * @param username the username of the principal requesting the delegation token.
+   * @return a delegation token
+   */
+  private[yarn] def obtainTokenForHiveMetastoreInner(conf: Configuration,
+      username: String): Option[Token[DelegationTokenIdentifier]] = {
+    val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
+
+    // the Hive configuration class is a subclass of Hadoop Configuration, so it can be cast
+    // down to a Configuration and used without reflection
+    val hiveConfClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.conf.HiveConf")
+    // using the (Configuration, Class) constructor allows the current configuration to be
+    // included in the hive config.
+    val ctor = hiveConfClass.getDeclaredConstructor(classOf[Configuration],
+      classOf[Object].getClass)
+    val hiveConf = ctor.newInstance(conf, hiveConfClass).asInstanceOf[Configuration]
+    val metastoreUri = hiveConf.getTrimmed("hive.metastore.uris", "")
+
+    // Check for local metastore
+    if (metastoreUri.nonEmpty) {
+      require(username.nonEmpty, "Username undefined")
+      val principalKey = "hive.metastore.kerberos.principal"
+      val principal = hiveConf.getTrimmed(principalKey, "")
+      require(principal.nonEmpty, s"Hive principal $principalKey undefined")
+      logDebug(s"Getting Hive delegation token for $username against $principal at $metastoreUri")
+      val hiveClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.ql.metadata.Hive")
+      val closeCurrent = hiveClass.getMethod("closeCurrent")
+      try {
+        // get all the instance methods before invoking any
+        val getDelegationToken = hiveClass.getMethod("getDelegationToken",
+          classOf[String], classOf[String])
+        val getHive = hiveClass.getMethod("get", hiveConfClass)
+
+        // invoke
+        val hive = getHive.invoke(null, hiveConf)
+        val tokenStr = getDelegationToken.invoke(hive, username, principal).asInstanceOf[String]
+        val hive2Token = new Token[DelegationTokenIdentifier]()
+        hive2Token.decodeFromUrlString(tokenStr)
+        Some(hive2Token)
+      } finally {
+        Utils.tryLogNonFatalError {
+          closeCurrent.invoke(null)
+        }
+      }
+    } else {
+      logDebug("HiveMetaStore configured in localmode")
+      None
+    }
+  }
 }

 object YarnSparkHadoopUtil {
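For readers without the Hive sources at hand, here is a hedged, non-reflective sketch of what the inner routine above resolves and invokes when Hive is available at compile time. The helper name hiveTokenDirect is hypothetical; the patch itself deliberately keeps Hive off the compile-time classpath and uses reflection instead.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.metadata.Hive
import org.apache.hadoop.security.token.Token

// Hypothetical direct-dependency equivalent of obtainTokenForHiveMetastoreInner,
// shown only to clarify which Hive calls the reflection resolves and invokes.
def hiveTokenDirect(conf: Configuration, username: String, principal: String)
    : Token[DelegationTokenIdentifier] = {
  // the (Configuration, Class) constructor folds the Hadoop config into the Hive config
  val hiveConf = new HiveConf(conf, classOf[HiveConf])
  try {
    val tokenStr = Hive.get(hiveConf).getDelegationToken(username, principal)
    val token = new Token[DelegationTokenIdentifier]()
    token.decodeFromUrlString(tokenStr)
    token
  } finally {
    Hive.closeCurrent()
  }
}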
@@ -18,10 +18,12 @@
 package org.apache.spark.deploy.yarn

 import java.io.{File, IOException}
+import java.lang.reflect.InvocationTargetException

 import com.google.common.io.{ByteStreams, Files}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hive.ql.metadata.HiveException
 import org.apache.hadoop.yarn.api.ApplicationConstants
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
 import org.apache.hadoop.yarn.conf.YarnConfiguration
@@ -245,4 +247,31 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging
       System.clearProperty("SPARK_YARN_MODE")
     }
   }
+
+  test("Obtain tokens For HiveMetastore") {
+    val hadoopConf = new Configuration()
+    hadoopConf.set("hive.metastore.kerberos.principal", "bob")
+    // thrift picks up on port 0 and bails out, without trying to talk to the endpoint
+    hadoopConf.set("hive.metastore.uris", "http://localhost:0")
+    val util = new YarnSparkHadoopUtil
+    assertNestedHiveException(intercept[InvocationTargetException] {
+      util.obtainTokenForHiveMetastoreInner(hadoopConf, "alice")
+    })
+    // expect exception trapping code to unwind this hive-side exception
+    assertNestedHiveException(intercept[InvocationTargetException] {
+      util.obtainTokenForHiveMetastore(hadoopConf)
+    })
+  }
+
+  def assertNestedHiveException(e: InvocationTargetException): Throwable = {
+    val inner = e.getCause
+    if (inner == null) {
+      fail("No inner cause", e)
+    }
+    if (!inner.isInstanceOf[HiveException]) {
+      fail("Not a hive exception", inner)
+    }
+    inner
+  }
+
 }
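A further check one might add, not part of this patch: with no metastore URI configured at all, the inner routine should return None rather than attempt a connection. This sketch assumes the hypothetical suite name below and that an unset hive.metastore.uris falls back to an empty (local-metastore) value in HiveConf.

package org.apache.spark.deploy.yarn

import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkFunSuite

// Hypothetical follow-on test, not included in this commit.
class HiveTokenLocalMetastoreSuite extends SparkFunSuite {
  test("No Hive metastore means no token") {
    // hive.metastore.uris is left unset, i.e. a local metastore
    val hadoopConf = new Configuration()
    val util = new YarnSparkHadoopUtil
    assert(util.obtainTokenForHiveMetastoreInner(hadoopConf, "alice").isEmpty)
  }
}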