[SPARK-17190][SQL] Removal of HiveSharedState

### What changes were proposed in this pull request?
Since `HiveClient` is used to interact with the Hive metastore, it should be hidden inside `HiveExternalCatalog`. Once `HiveClient` is moved into `HiveExternalCatalog`, `HiveSharedState` becomes a thin wrapper around `HiveExternalCatalog`, so removing `HiveSharedState` is straightforward. With `HiveSharedState` gone, the reflection logic is applied directly to the choice of `ExternalCatalog` implementation, based on the `CATALOG_IMPLEMENTATION` configuration.
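
The selection logic, condensed from the `SharedState` changes in the diff below, looks roughly like this. It is a sketch, not the verbatim patch: it uses the public `spark.sql.catalogImplementation` key (with its `in-memory` default) and `Class.forName` in place of Spark's internal `CATALOG_IMPLEMENTATION` config entry and `Utils.classForName`:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, InMemoryCatalog}

// Map the catalog-implementation setting to a concrete ExternalCatalog class name.
def externalCatalogClassName(conf: SparkConf): String =
  conf.get("spark.sql.catalogImplementation", "in-memory") match {
    case "hive"      => "org.apache.spark.sql.hive.HiveExternalCatalog"
    case "in-memory" => classOf[InMemoryCatalog].getCanonicalName
  }

// Instantiate it reflectively through the (SparkConf, Configuration) constructor
// that both InMemoryCatalog and HiveExternalCatalog expose after this patch.
def instantiateCatalog(conf: SparkConf, hadoopConf: Configuration): ExternalCatalog =
  Class.forName(externalCatalogClassName(conf))
    .getDeclaredConstructor(classOf[SparkConf], classOf[Configuration])
    .newInstance(conf, hadoopConf)
    .asInstanceOf[ExternalCatalog]
```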

~~Since `HiveClient` is also used/invoked by entities other than `HiveExternalCatalog`, we define the following two APIs: `getClient` and `getNewClient`.~~
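
Call sites that still need a `HiveClient` now reach it through the catalog instead, as the updated hunks below show. A minimal illustration (assuming Hive support is enabled so the cast succeeds, and noting that `sharedState` is `private[sql]`, so this pattern is only usable from code inside Spark's own `sql` packages):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hive.HiveExternalCatalog
import org.apache.spark.sql.hive.client.HiveClient

val spark: SparkSession = SparkSession.builder().enableHiveSupport().getOrCreate()

// The shared, metastore-backed client. Only valid when
// spark.sql.catalogImplementation is "hive"; otherwise externalCatalog
// is an InMemoryCatalog and this cast fails.
val client: HiveClient =
  spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client

// Per-session work goes through an isolated session of that client,
// mirroring what HiveSessionState.metadataHive does in this patch.
val sessionClient: HiveClient = client.newSession()
```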

### How was this patch tested?
The existing test cases

Author: gatorsmile <gatorsmile@gmail.com>

Closes #14757 from gatorsmile/removeHiveClient.
gatorsmile 2016-08-25 12:50:03 +08:00 committed by Wenchen Fan
parent ac27557eb6
commit 4d0706d616
15 changed files with 88 additions and 99 deletions


@@ -24,7 +24,7 @@ import scala.collection.mutable
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkException
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis._
@@ -39,7 +39,11 @@ import org.apache.spark.sql.catalyst.util.StringUtils
*
* All public methods should be synchronized for thread-safety.
*/
class InMemoryCatalog(hadoopConfig: Configuration = new Configuration) extends ExternalCatalog {
class InMemoryCatalog(
conf: SparkConf = new SparkConf,
hadoopConfig: Configuration = new Configuration)
extends ExternalCatalog {
import CatalogTypes.TablePartitionSpec
private class TableDesc(var table: CatalogTable) {


@@ -96,10 +96,7 @@ class SparkSession private(
*/
@transient
private[sql] lazy val sharedState: SharedState = {
existingSharedState.getOrElse(
SparkSession.reflect[SharedState, SparkContext](
SparkSession.sharedStateClassName(sparkContext.conf),
sparkContext))
existingSharedState.getOrElse(new SharedState(sparkContext))
}
/**
@@ -913,16 +910,8 @@ object SparkSession {
/** Reference to the root SparkSession. */
private val defaultSession = new AtomicReference[SparkSession]
private val HIVE_SHARED_STATE_CLASS_NAME = "org.apache.spark.sql.hive.HiveSharedState"
private val HIVE_SESSION_STATE_CLASS_NAME = "org.apache.spark.sql.hive.HiveSessionState"
private def sharedStateClassName(conf: SparkConf): String = {
conf.get(CATALOG_IMPLEMENTATION) match {
case "hive" => HIVE_SHARED_STATE_CLASS_NAME
case "in-memory" => classOf[SharedState].getCanonicalName
}
}
private def sessionStateClassName(conf: SparkConf): String = {
conf.get(CATALOG_IMPLEMENTATION) match {
case "hive" => HIVE_SESSION_STATE_CLASS_NAME
@@ -953,7 +942,6 @@ object SparkSession {
private[spark] def hiveClassesArePresent: Boolean = {
try {
Utils.classForName(HIVE_SESSION_STATE_CLASS_NAME)
Utils.classForName(HIVE_SHARED_STATE_CLASS_NAME)
Utils.classForName("org.apache.hadoop.hive.conf.HiveConf")
true
} catch {


@@ -17,7 +17,13 @@
package org.apache.spark.sql.internal
import org.apache.spark.SparkContext
import scala.reflect.ClassTag
import scala.util.control.NonFatal
import org.apache.hadoop.conf.Configuration
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.internal.config._
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{SparkSession, SQLContext}
import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, InMemoryCatalog}
@@ -51,7 +57,11 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
/**
* A catalog that interacts with external systems.
*/
lazy val externalCatalog: ExternalCatalog = new InMemoryCatalog(sparkContext.hadoopConfiguration)
lazy val externalCatalog: ExternalCatalog =
SharedState.reflect[ExternalCatalog, SparkConf, Configuration](
SharedState.externalCatalogClassName(sparkContext.conf),
sparkContext.conf,
sparkContext.hadoopConfiguration)
/**
* A classloader used to load all user-added jar.
@@ -98,6 +108,39 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
}
}
object SharedState {
private val HIVE_EXTERNAL_CATALOG_CLASS_NAME = "org.apache.spark.sql.hive.HiveExternalCatalog"
private def externalCatalogClassName(conf: SparkConf): String = {
conf.get(CATALOG_IMPLEMENTATION) match {
case "hive" => HIVE_EXTERNAL_CATALOG_CLASS_NAME
case "in-memory" => classOf[InMemoryCatalog].getCanonicalName
}
}
/**
* Helper method to create an instance of [[T]] using a single-arg constructor that
* accepts an [[Arg1]] and an [[Arg2]].
*/
private def reflect[T, Arg1 <: AnyRef, Arg2 <: AnyRef](
className: String,
ctorArg1: Arg1,
ctorArg2: Arg2)(
implicit ctorArgTag1: ClassTag[Arg1],
ctorArgTag2: ClassTag[Arg2]): T = {
try {
val clazz = Utils.classForName(className)
val ctor = clazz.getDeclaredConstructor(ctorArgTag1.runtimeClass, ctorArgTag2.runtimeClass)
val args = Array[AnyRef](ctorArg1, ctorArg2)
ctor.newInstance(args: _*).asInstanceOf[T]
} catch {
case NonFatal(e) =>
throw new IllegalArgumentException(s"Error while instantiating '$className':", e)
}
}
}
/**
* URL class loader that exposes the `addURL` and `getURLs` methods in URLClassLoader.


@@ -34,7 +34,7 @@ import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerJobStart}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.hive.{HiveSharedState, HiveUtils}
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
import org.apache.spark.sql.hive.thriftserver.ui.ThriftServerTab
import org.apache.spark.sql.internal.SQLConf


@@ -52,10 +52,6 @@ class HiveContext private[hive](_sparkSession: SparkSession)
sparkSession.sessionState.asInstanceOf[HiveSessionState]
}
protected[sql] override def sharedState: HiveSharedState = {
sparkSession.sharedState.asInstanceOf[HiveSharedState]
}
/**
* Invalidate and refresh all the cached the metadata of the given table. For performance reasons,
* Spark SQL or the external data source library it uses might cache certain metadata about a


@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.ql.metadata.HiveException
import org.apache.thrift.TException
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.TableIdentifier
@@ -41,13 +42,20 @@ import org.apache.spark.sql.types.{DataType, StructType}
* A persistent implementation of the system catalog using Hive.
* All public methods must be synchronized for thread-safety.
*/
private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configuration)
private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configuration)
extends ExternalCatalog with Logging {
import CatalogTypes.TablePartitionSpec
import HiveExternalCatalog._
import CatalogTableType._
/**
* A Hive client used to interact with the metastore.
*/
val client: HiveClient = {
HiveUtils.newClientForMetadata(conf, hadoopConf)
}
// Exceptions thrown by the hive client that we would like to wrap
private val clientExceptions = Set(
classOf[HiveException].getCanonicalName,


@@ -44,7 +44,8 @@ import org.apache.spark.sql.types._
*/
private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Logging {
private val sessionState = sparkSession.sessionState.asInstanceOf[HiveSessionState]
private val client = sparkSession.sharedState.asInstanceOf[HiveSharedState].metadataHive
private val client =
sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
/** A fully qualified identifier for a table (i.e., database.tableName) */
case class QualifiedTableName(database: String, name: String)


@@ -33,21 +33,18 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
self =>
private lazy val sharedState: HiveSharedState = {
sparkSession.sharedState.asInstanceOf[HiveSharedState]
}
/**
* A Hive client used for interacting with the metastore.
*/
lazy val metadataHive: HiveClient = sharedState.metadataHive.newSession()
lazy val metadataHive: HiveClient =
sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client.newSession()
/**
* Internal catalog for managing table and database states.
*/
override lazy val catalog = {
new HiveSessionCatalog(
sharedState.externalCatalog,
sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog],
metadataHive,
sparkSession,
functionResourceLoader,


@@ -1,47 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hive
import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.client.HiveClient
import org.apache.spark.sql.internal.SharedState
/**
* A class that holds all state shared across sessions in a given
* [[org.apache.spark.sql.SparkSession]] backed by Hive.
*/
private[hive] class HiveSharedState(override val sparkContext: SparkContext)
extends SharedState(sparkContext) {
// TODO: just share the IsolatedClientLoader instead of the client instance itself
/**
* A Hive client used to interact with the metastore.
*/
// This needs to be a lazy val at here because TestHiveSharedState is overriding it.
lazy val metadataHive: HiveClient = {
HiveUtils.newClientForMetadata(sparkContext.conf, sparkContext.hadoopConfiguration)
}
/**
* A catalog that interacts with the Hive metastore.
*/
override lazy val externalCatalog =
new HiveExternalCatalog(metadataHive, sparkContext.hadoopConfiguration)
}


@@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.command.CacheTableCommand
import org.apache.spark.sql.hive._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.{SharedState, SQLConf}
import org.apache.spark.util.{ShutdownHookManager, Utils}
// SPARK-3729: Test key required to check for initialization errors with config.
@@ -108,13 +108,13 @@ class TestHiveContext(
* A [[SparkSession]] used in [[TestHiveContext]].
*
* @param sc SparkContext
* @param existingSharedState optional [[HiveSharedState]]
* @param existingSharedState optional [[SharedState]]
* @param loadTestTables if true, load the test tables. They can only be loaded when running
* in the JVM, i.e when calling from Python this flag has to be false.
*/
private[hive] class TestHiveSparkSession(
@transient private val sc: SparkContext,
@transient private val existingSharedState: Option[HiveSharedState],
@transient private val existingSharedState: Option[SharedState],
private val loadTestTables: Boolean)
extends SparkSession(sc) with Logging { self =>
@@ -139,14 +139,13 @@ private[hive] class TestHiveSparkSession(
assume(sc.conf.get(CATALOG_IMPLEMENTATION) == "hive")
// TODO: Let's remove HiveSharedState and TestHiveSessionState. Otherwise,
// we are not really testing the reflection logic based on the setting of
// CATALOG_IMPLEMENTATION.
@transient
override lazy val sharedState: HiveSharedState = {
existingSharedState.getOrElse(new HiveSharedState(sc))
override lazy val sharedState: SharedState = {
existingSharedState.getOrElse(new SharedState(sc))
}
// TODO: Let's remove TestHiveSessionState. Otherwise, we are not really testing the reflection
// logic based on the setting of CATALOG_IMPLEMENTATION.
@transient
override lazy val sessionState: TestHiveSessionState =
new TestHiveSessionState(self)


@@ -31,7 +31,7 @@ class HiveDataFrameSuite extends QueryTest with TestHiveSingleton {
}
test("SPARK-15887: hive-site.xml should be loaded") {
val hiveClient = spark.sharedState.asInstanceOf[HiveSharedState].metadataHive
val hiveClient = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
assert(hiveClient.getConf("hive.in.test", "") == "true")
}
}


@@ -21,26 +21,26 @@ import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.hive.client.HiveClient
/**
* Test suite for the [[HiveExternalCatalog]].
*/
class HiveExternalCatalogSuite extends ExternalCatalogSuite {
private val client: HiveClient = {
// We create a metastore at a temp location to avoid any potential
// conflict of having multiple connections to a single derby instance.
HiveUtils.newClientForExecution(new SparkConf, new Configuration)
private val externalCatalog: HiveExternalCatalog = {
val catalog = new HiveExternalCatalog(new SparkConf, new Configuration)
catalog.client.reset()
catalog
}
protected override val utils: CatalogTestUtils = new CatalogTestUtils {
override val tableInputFormat: String = "org.apache.hadoop.mapred.SequenceFileInputFormat"
override val tableOutputFormat: String = "org.apache.hadoop.mapred.SequenceFileOutputFormat"
override def newEmptyCatalog(): ExternalCatalog =
new HiveExternalCatalog(client, new Configuration())
override def newEmptyCatalog(): ExternalCatalog = externalCatalog
}
protected override def resetState(): Unit = client.reset()
protected override def resetState(): Unit = {
externalCatalog.client.reset()
}
}


@@ -378,10 +378,9 @@ object SetMetastoreURLTest extends Logging {
s"spark.sql.test.expectedMetastoreURL should be set.")
}
// HiveSharedState is used when Hive support is enabled.
// HiveExternalCatalog is used when Hive support is enabled.
val actualMetastoreURL =
spark.sharedState.asInstanceOf[HiveSharedState]
.metadataHive
spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
.getConf("javax.jdo.option.ConnectionURL", "this_is_a_wrong_URL")
logInfo(s"javax.jdo.option.ConnectionURL is $actualMetastoreURL")


@@ -51,7 +51,8 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
// To test `HiveExternalCatalog`, we need to read the raw table metadata(schema, partition
// columns and bucket specification are still in table properties) from hive client.
private def hiveClient: HiveClient = sharedState.asInstanceOf[HiveSharedState].metadataHive
private def hiveClient: HiveClient =
sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
test("persistent JSON table") {
withTable("jsonTable") {


@@ -266,7 +266,7 @@ class ShowCreateTableSuite extends QueryTest with SQLTestUtils with TestHiveSing
}
private def createRawHiveTable(ddl: String): Unit = {
hiveContext.sharedState.asInstanceOf[HiveSharedState].metadataHive.runSqlHive(ddl)
hiveContext.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client.runSqlHive(ddl)
}
private def checkCreateTable(table: String): Unit = {