[SPARK-13076][SQL] Rename ClientInterface -> HiveClient
And ClientWrapper -> HiveClientImpl. I have some followup pull requests to introduce a new internal catalog, and I think this new naming reflects better the functionality of the two classes. Author: Reynold Xin <rxin@databricks.com> Closes #10981 from rxin/SPARK-13076.
This commit is contained in:
parent
e38b0baa38
commit
2cbc412821
|
@ -404,7 +404,7 @@ case class DescribeFunction(
|
||||||
result
|
result
|
||||||
}
|
}
|
||||||
|
|
||||||
case None => Seq(Row(s"Function: $functionName is not found."))
|
case None => Seq(Row(s"Function: $functionName not found."))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -84,7 +84,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
|
||||||
"Extended Usage")
|
"Extended Usage")
|
||||||
|
|
||||||
checkExistence(sql("describe functioN abcadf"), true,
|
checkExistence(sql("describe functioN abcadf"), true,
|
||||||
"Function: abcadf is not found.")
|
"Function: abcadf not found.")
|
||||||
}
|
}
|
||||||
|
|
||||||
test("SPARK-6743: no columns from cache") {
|
test("SPARK-6743: no columns from cache") {
|
||||||
|
|
|
@ -79,8 +79,8 @@ class HiveContext private[hive](
|
||||||
sc: SparkContext,
|
sc: SparkContext,
|
||||||
cacheManager: CacheManager,
|
cacheManager: CacheManager,
|
||||||
listener: SQLListener,
|
listener: SQLListener,
|
||||||
@transient private val execHive: ClientWrapper,
|
@transient private val execHive: HiveClientImpl,
|
||||||
@transient private val metaHive: ClientInterface,
|
@transient private val metaHive: HiveClient,
|
||||||
isRootContext: Boolean)
|
isRootContext: Boolean)
|
||||||
extends SQLContext(sc, cacheManager, listener, isRootContext) with Logging {
|
extends SQLContext(sc, cacheManager, listener, isRootContext) with Logging {
|
||||||
self =>
|
self =>
|
||||||
|
@ -193,7 +193,7 @@ class HiveContext private[hive](
|
||||||
* for storing persistent metadata, and only point to a dummy metastore in a temporary directory.
|
* for storing persistent metadata, and only point to a dummy metastore in a temporary directory.
|
||||||
*/
|
*/
|
||||||
@transient
|
@transient
|
||||||
protected[hive] lazy val executionHive: ClientWrapper = if (execHive != null) {
|
protected[hive] lazy val executionHive: HiveClientImpl = if (execHive != null) {
|
||||||
execHive
|
execHive
|
||||||
} else {
|
} else {
|
||||||
logInfo(s"Initializing execution hive, version $hiveExecutionVersion")
|
logInfo(s"Initializing execution hive, version $hiveExecutionVersion")
|
||||||
|
@ -203,7 +203,7 @@ class HiveContext private[hive](
|
||||||
config = newTemporaryConfiguration(useInMemoryDerby = true),
|
config = newTemporaryConfiguration(useInMemoryDerby = true),
|
||||||
isolationOn = false,
|
isolationOn = false,
|
||||||
baseClassLoader = Utils.getContextOrSparkClassLoader)
|
baseClassLoader = Utils.getContextOrSparkClassLoader)
|
||||||
loader.createClient().asInstanceOf[ClientWrapper]
|
loader.createClient().asInstanceOf[HiveClientImpl]
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -222,7 +222,7 @@ class HiveContext private[hive](
|
||||||
* in the hive-site.xml file.
|
* in the hive-site.xml file.
|
||||||
*/
|
*/
|
||||||
@transient
|
@transient
|
||||||
protected[hive] lazy val metadataHive: ClientInterface = if (metaHive != null) {
|
protected[hive] lazy val metadataHive: HiveClient = if (metaHive != null) {
|
||||||
metaHive
|
metaHive
|
||||||
} else {
|
} else {
|
||||||
val metaVersion = IsolatedClientLoader.hiveVersion(hiveMetastoreVersion)
|
val metaVersion = IsolatedClientLoader.hiveVersion(hiveMetastoreVersion)
|
||||||
|
|
|
@ -96,7 +96,7 @@ private[hive] object HiveSerDe {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: HiveContext)
|
private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveContext)
|
||||||
extends Catalog with Logging {
|
extends Catalog with Logging {
|
||||||
|
|
||||||
val conf = hive.conf
|
val conf = hive.conf
|
||||||
|
|
|
@ -60,9 +60,9 @@ private[hive] case class HiveTable(
|
||||||
viewText: Option[String] = None) {
|
viewText: Option[String] = None) {
|
||||||
|
|
||||||
@transient
|
@transient
|
||||||
private[client] var client: ClientInterface = _
|
private[client] var client: HiveClient = _
|
||||||
|
|
||||||
private[client] def withClient(ci: ClientInterface): this.type = {
|
private[client] def withClient(ci: HiveClient): this.type = {
|
||||||
client = ci
|
client = ci
|
||||||
this
|
this
|
||||||
}
|
}
|
||||||
|
@ -85,7 +85,7 @@ private[hive] case class HiveTable(
|
||||||
* internal and external classloaders for a given version of Hive and thus must expose only
|
* internal and external classloaders for a given version of Hive and thus must expose only
|
||||||
* shared classes.
|
* shared classes.
|
||||||
*/
|
*/
|
||||||
private[hive] trait ClientInterface {
|
private[hive] trait HiveClient {
|
||||||
|
|
||||||
/** Returns the Hive Version of this client. */
|
/** Returns the Hive Version of this client. */
|
||||||
def version: HiveVersion
|
def version: HiveVersion
|
||||||
|
@ -184,8 +184,8 @@ private[hive] trait ClientInterface {
|
||||||
/** Add a jar into class loader */
|
/** Add a jar into class loader */
|
||||||
def addJar(path: String): Unit
|
def addJar(path: String): Unit
|
||||||
|
|
||||||
/** Return a ClientInterface as new session, that will share the class loader and Hive client */
|
/** Return a [[HiveClient]] as new session, that will share the class loader and Hive client */
|
||||||
def newSession(): ClientInterface
|
def newSession(): HiveClient
|
||||||
|
|
||||||
/** Run a function within Hive state (SessionState, HiveConf, Hive client and class loader) */
|
/** Run a function within Hive state (SessionState, HiveConf, Hive client and class loader) */
|
||||||
def withHiveState[A](f: => A): A
|
def withHiveState[A](f: => A): A
|
|
@ -44,8 +44,8 @@ import org.apache.spark.util.{CircularBuffer, Utils}
|
||||||
* A class that wraps the HiveClient and converts its responses to externally visible classes.
|
* A class that wraps the HiveClient and converts its responses to externally visible classes.
|
||||||
* Note that this class is typically loaded with an internal classloader for each instantiation,
|
* Note that this class is typically loaded with an internal classloader for each instantiation,
|
||||||
* allowing it to interact directly with a specific isolated version of Hive. Loading this class
|
* allowing it to interact directly with a specific isolated version of Hive. Loading this class
|
||||||
* with the isolated classloader however will result in it only being visible as a ClientInterface,
|
* with the isolated classloader however will result in it only being visible as a [[HiveClient]],
|
||||||
* not a ClientWrapper.
|
* not a [[HiveClientImpl]].
|
||||||
*
|
*
|
||||||
* This class needs to interact with multiple versions of Hive, but will always be compiled with
|
* This class needs to interact with multiple versions of Hive, but will always be compiled with
|
||||||
* the 'native', execution version of Hive. Therefore, any places where hive breaks compatibility
|
* the 'native', execution version of Hive. Therefore, any places where hive breaks compatibility
|
||||||
|
@ -55,14 +55,14 @@ import org.apache.spark.util.{CircularBuffer, Utils}
|
||||||
* @param config a collection of configuration options that will be added to the hive conf before
|
* @param config a collection of configuration options that will be added to the hive conf before
|
||||||
* opening the hive client.
|
* opening the hive client.
|
||||||
* @param initClassLoader the classloader used when creating the `state` field of
|
* @param initClassLoader the classloader used when creating the `state` field of
|
||||||
* this ClientWrapper.
|
* this [[HiveClientImpl]].
|
||||||
*/
|
*/
|
||||||
private[hive] class ClientWrapper(
|
private[hive] class HiveClientImpl(
|
||||||
override val version: HiveVersion,
|
override val version: HiveVersion,
|
||||||
config: Map[String, String],
|
config: Map[String, String],
|
||||||
initClassLoader: ClassLoader,
|
initClassLoader: ClassLoader,
|
||||||
val clientLoader: IsolatedClientLoader)
|
val clientLoader: IsolatedClientLoader)
|
||||||
extends ClientInterface
|
extends HiveClient
|
||||||
with Logging {
|
with Logging {
|
||||||
|
|
||||||
// Circular buffer to hold what hive prints to STDOUT and ERR. Only printed when failures occur.
|
// Circular buffer to hold what hive prints to STDOUT and ERR. Only printed when failures occur.
|
||||||
|
@ -77,7 +77,7 @@ private[hive] class ClientWrapper(
|
||||||
case hive.v1_2 => new Shim_v1_2()
|
case hive.v1_2 => new Shim_v1_2()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create an internal session state for this ClientWrapper.
|
// Create an internal session state for this HiveClientImpl.
|
||||||
val state = {
|
val state = {
|
||||||
val original = Thread.currentThread().getContextClassLoader
|
val original = Thread.currentThread().getContextClassLoader
|
||||||
// Switch to the initClassLoader.
|
// Switch to the initClassLoader.
|
||||||
|
@ -160,7 +160,7 @@ private[hive] class ClientWrapper(
|
||||||
case e: Exception if causedByThrift(e) =>
|
case e: Exception if causedByThrift(e) =>
|
||||||
caughtException = e
|
caughtException = e
|
||||||
logWarning(
|
logWarning(
|
||||||
"HiveClientWrapper got thrift exception, destroying client and retrying " +
|
"HiveClient got thrift exception, destroying client and retrying " +
|
||||||
s"(${retryLimit - numTries} tries remaining)", e)
|
s"(${retryLimit - numTries} tries remaining)", e)
|
||||||
clientLoader.cachedHive = null
|
clientLoader.cachedHive = null
|
||||||
Thread.sleep(retryDelayMillis)
|
Thread.sleep(retryDelayMillis)
|
||||||
|
@ -199,7 +199,7 @@ private[hive] class ClientWrapper(
|
||||||
*/
|
*/
|
||||||
def withHiveState[A](f: => A): A = retryLocked {
|
def withHiveState[A](f: => A): A = retryLocked {
|
||||||
val original = Thread.currentThread().getContextClassLoader
|
val original = Thread.currentThread().getContextClassLoader
|
||||||
// Set the thread local metastore client to the client associated with this ClientWrapper.
|
// Set the thread local metastore client to the client associated with this HiveClientImpl.
|
||||||
Hive.set(client)
|
Hive.set(client)
|
||||||
// The classloader in clientLoader could be changed after addJar, always use the latest
|
// The classloader in clientLoader could be changed after addJar, always use the latest
|
||||||
// classloader
|
// classloader
|
||||||
|
@ -521,8 +521,8 @@ private[hive] class ClientWrapper(
|
||||||
runSqlHive(s"ADD JAR $path")
|
runSqlHive(s"ADD JAR $path")
|
||||||
}
|
}
|
||||||
|
|
||||||
def newSession(): ClientWrapper = {
|
def newSession(): HiveClientImpl = {
|
||||||
clientLoader.createClient().asInstanceOf[ClientWrapper]
|
clientLoader.createClient().asInstanceOf[HiveClientImpl]
|
||||||
}
|
}
|
||||||
|
|
||||||
def reset(): Unit = withHiveState {
|
def reset(): Unit = withHiveState {
|
|
@ -38,8 +38,8 @@ import org.apache.spark.sql.catalyst.expressions._
|
||||||
import org.apache.spark.sql.types.{IntegralType, StringType}
|
import org.apache.spark.sql.types.{IntegralType, StringType}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A shim that defines the interface between ClientWrapper and the underlying Hive library used to
|
* A shim that defines the interface between [[HiveClientImpl]] and the underlying Hive library used
|
||||||
* talk to the metastore. Each Hive version has its own implementation of this class, defining
|
* to talk to the metastore. Each Hive version has its own implementation of this class, defining
|
||||||
* version-specific version of needed functions.
|
* version-specific version of needed functions.
|
||||||
*
|
*
|
||||||
* The guideline for writing shims is:
|
* The guideline for writing shims is:
|
||||||
|
@ -52,7 +52,6 @@ private[client] sealed abstract class Shim {
|
||||||
/**
|
/**
|
||||||
* Set the current SessionState to the given SessionState. Also, set the context classloader of
|
* Set the current SessionState to the given SessionState. Also, set the context classloader of
|
||||||
* the current thread to the one set in the HiveConf of this given `state`.
|
* the current thread to the one set in the HiveConf of this given `state`.
|
||||||
* @param state
|
|
||||||
*/
|
*/
|
||||||
def setCurrentSessionState(state: SessionState): Unit
|
def setCurrentSessionState(state: SessionState): Unit
|
||||||
|
|
||||||
|
|
|
@ -124,15 +124,15 @@ private[hive] object IsolatedClientLoader extends Logging {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a Hive `ClientInterface` using a classloader that works according to the following rules:
|
* Creates a [[HiveClient]] using a classloader that works according to the following rules:
|
||||||
* - Shared classes: Java, Scala, logging, and Spark classes are delegated to `baseClassLoader`
|
* - Shared classes: Java, Scala, logging, and Spark classes are delegated to `baseClassLoader`
|
||||||
* allowing the results of calls to the `ClientInterface` to be visible externally.
|
* allowing the results of calls to the [[HiveClient]] to be visible externally.
|
||||||
* - Hive classes: new instances are loaded from `execJars`. These classes are not
|
* - Hive classes: new instances are loaded from `execJars`. These classes are not
|
||||||
* accessible externally due to their custom loading.
|
* accessible externally due to their custom loading.
|
||||||
* - ClientWrapper: a new copy is created for each instance of `IsolatedClassLoader`.
|
* - [[HiveClientImpl]]: a new copy is created for each instance of `IsolatedClassLoader`.
|
||||||
* This new instance is able to see a specific version of hive without using reflection. Since
|
* This new instance is able to see a specific version of hive without using reflection. Since
|
||||||
* this is a unique instance, it is not visible externally other than as a generic
|
* this is a unique instance, it is not visible externally other than as a generic
|
||||||
* `ClientInterface`, unless `isolationOn` is set to `false`.
|
* [[HiveClient]], unless `isolationOn` is set to `false`.
|
||||||
*
|
*
|
||||||
* @param version The version of hive on the classpath. used to pick specific function signatures
|
* @param version The version of hive on the classpath. used to pick specific function signatures
|
||||||
* that are not compatible across versions.
|
* that are not compatible across versions.
|
||||||
|
@ -179,7 +179,7 @@ private[hive] class IsolatedClientLoader(
|
||||||
|
|
||||||
/** True if `name` refers to a spark class that must see specific version of Hive. */
|
/** True if `name` refers to a spark class that must see specific version of Hive. */
|
||||||
protected def isBarrierClass(name: String): Boolean =
|
protected def isBarrierClass(name: String): Boolean =
|
||||||
name.startsWith(classOf[ClientWrapper].getName) ||
|
name.startsWith(classOf[HiveClientImpl].getName) ||
|
||||||
name.startsWith(classOf[Shim].getName) ||
|
name.startsWith(classOf[Shim].getName) ||
|
||||||
barrierPrefixes.exists(name.startsWith)
|
barrierPrefixes.exists(name.startsWith)
|
||||||
|
|
||||||
|
@ -233,9 +233,9 @@ private[hive] class IsolatedClientLoader(
|
||||||
}
|
}
|
||||||
|
|
||||||
/** The isolated client interface to Hive. */
|
/** The isolated client interface to Hive. */
|
||||||
private[hive] def createClient(): ClientInterface = {
|
private[hive] def createClient(): HiveClient = {
|
||||||
if (!isolationOn) {
|
if (!isolationOn) {
|
||||||
return new ClientWrapper(version, config, baseClassLoader, this)
|
return new HiveClientImpl(version, config, baseClassLoader, this)
|
||||||
}
|
}
|
||||||
// Pre-reflective instantiation setup.
|
// Pre-reflective instantiation setup.
|
||||||
logDebug("Initializing the logger to avoid disaster...")
|
logDebug("Initializing the logger to avoid disaster...")
|
||||||
|
@ -244,10 +244,10 @@ private[hive] class IsolatedClientLoader(
|
||||||
|
|
||||||
try {
|
try {
|
||||||
classLoader
|
classLoader
|
||||||
.loadClass(classOf[ClientWrapper].getName)
|
.loadClass(classOf[HiveClientImpl].getName)
|
||||||
.getConstructors.head
|
.getConstructors.head
|
||||||
.newInstance(version, config, classLoader, this)
|
.newInstance(version, config, classLoader, this)
|
||||||
.asInstanceOf[ClientInterface]
|
.asInstanceOf[HiveClient]
|
||||||
} catch {
|
} catch {
|
||||||
case e: InvocationTargetException =>
|
case e: InvocationTargetException =>
|
||||||
if (e.getCause().isInstanceOf[NoClassDefFoundError]) {
|
if (e.getCause().isInstanceOf[NoClassDefFoundError]) {
|
||||||
|
|
|
@ -38,13 +38,13 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._
|
||||||
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
|
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
|
||||||
import org.apache.spark.sql.catalyst.util.sequenceOption
|
import org.apache.spark.sql.catalyst.util.sequenceOption
|
||||||
import org.apache.spark.sql.hive.HiveShim._
|
import org.apache.spark.sql.hive.HiveShim._
|
||||||
import org.apache.spark.sql.hive.client.ClientWrapper
|
import org.apache.spark.sql.hive.client.HiveClientImpl
|
||||||
import org.apache.spark.sql.types._
|
import org.apache.spark.sql.types._
|
||||||
|
|
||||||
|
|
||||||
private[hive] class HiveFunctionRegistry(
|
private[hive] class HiveFunctionRegistry(
|
||||||
underlying: analysis.FunctionRegistry,
|
underlying: analysis.FunctionRegistry,
|
||||||
executionHive: ClientWrapper)
|
executionHive: HiveClientImpl)
|
||||||
extends analysis.FunctionRegistry with HiveInspectors {
|
extends analysis.FunctionRegistry with HiveInspectors {
|
||||||
|
|
||||||
def getFunctionInfo(name: String): FunctionInfo = {
|
def getFunctionInfo(name: String): FunctionInfo = {
|
||||||
|
|
|
@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.expressions.ExpressionInfo
|
||||||
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
|
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
|
||||||
import org.apache.spark.sql.execution.CacheTableCommand
|
import org.apache.spark.sql.execution.CacheTableCommand
|
||||||
import org.apache.spark.sql.hive._
|
import org.apache.spark.sql.hive._
|
||||||
import org.apache.spark.sql.hive.client.ClientWrapper
|
import org.apache.spark.sql.hive.client.HiveClientImpl
|
||||||
import org.apache.spark.sql.hive.execution.HiveNativeCommand
|
import org.apache.spark.sql.hive.execution.HiveNativeCommand
|
||||||
import org.apache.spark.util.{ShutdownHookManager, Utils}
|
import org.apache.spark.util.{ShutdownHookManager, Utils}
|
||||||
|
|
||||||
|
@ -458,7 +458,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
|
||||||
org.apache.spark.sql.catalyst.analysis.FunctionRegistry.builtin.copy(), this.executionHive)
|
org.apache.spark.sql.catalyst.analysis.FunctionRegistry.builtin.copy(), this.executionHive)
|
||||||
}
|
}
|
||||||
|
|
||||||
private[hive] class TestHiveFunctionRegistry(fr: SimpleFunctionRegistry, client: ClientWrapper)
|
private[hive] class TestHiveFunctionRegistry(fr: SimpleFunctionRegistry, client: HiveClientImpl)
|
||||||
extends HiveFunctionRegistry(fr, client) {
|
extends HiveFunctionRegistry(fr, client) {
|
||||||
|
|
||||||
private val removedFunctions =
|
private val removedFunctions =
|
||||||
|
|
|
@ -30,7 +30,7 @@ import org.apache.spark.tags.ExtendedHiveTest
|
||||||
import org.apache.spark.util.Utils
|
import org.apache.spark.util.Utils
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A simple set of tests that call the methods of a hive ClientInterface, loading different version
|
* A simple set of tests that call the methods of a [[HiveClient]], loading different version
|
||||||
* of hive from maven central. These tests are simple in that they are mostly just testing to make
|
* of hive from maven central. These tests are simple in that they are mostly just testing to make
|
||||||
* sure that reflective calls are not throwing NoSuchMethod error, but the actually functionality
|
* sure that reflective calls are not throwing NoSuchMethod error, but the actually functionality
|
||||||
* is not fully tested.
|
* is not fully tested.
|
||||||
|
@ -101,7 +101,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
|
||||||
|
|
||||||
private val versions = Seq("12", "13", "14", "1.0.0", "1.1.0", "1.2.0")
|
private val versions = Seq("12", "13", "14", "1.0.0", "1.1.0", "1.2.0")
|
||||||
|
|
||||||
private var client: ClientInterface = null
|
private var client: HiveClient = null
|
||||||
|
|
||||||
versions.foreach { version =>
|
versions.foreach { version =>
|
||||||
test(s"$version: create client") {
|
test(s"$version: create client") {
|
||||||
|
|
|
@ -199,7 +199,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
|
||||||
"Extended Usage")
|
"Extended Usage")
|
||||||
|
|
||||||
checkExistence(sql("describe functioN abcadf"), true,
|
checkExistence(sql("describe functioN abcadf"), true,
|
||||||
"Function: abcadf is not found.")
|
"Function: abcadf not found.")
|
||||||
|
|
||||||
checkExistence(sql("describe functioN `~`"), true,
|
checkExistence(sql("describe functioN `~`"), true,
|
||||||
"Function: ~",
|
"Function: ~",
|
||||||
|
|
Loading…
Reference in a new issue