[SPARK-14721][SQL] Remove HiveContext (part 2)
## What changes were proposed in this pull request? This removes the class `HiveContext` itself along with all code usages associated with it. The bulk of the work was already done in #12485. This is mainly just code cleanup and actually removing the class. Note: A couple of things will break after this patch. These will be fixed separately. - the python HiveContext - all the documentation / comments referencing HiveContext - there will be no more HiveContext in the REPL (fixed by #12589) ## How was this patch tested? No change in functionality. Author: Andrew Or <andrew@databricks.com> Closes #12585 from andrewor14/delete-hive-context.
This commit is contained in:
parent
6bfe42a3be
commit
3c5e65c339
|
@ -20,10 +20,8 @@ package main.scala
|
|||
|
||||
import scala.collection.mutable.{ListBuffer, Queue}
|
||||
|
||||
import org.apache.spark.SparkConf
|
||||
import org.apache.spark.SparkContext
|
||||
import org.apache.spark.{SparkConf, SparkContext, SparkSession}
|
||||
import org.apache.spark.rdd.RDD
|
||||
import org.apache.spark.sql.hive.HiveContext
|
||||
|
||||
case class Person(name: String, age: Int)
|
||||
|
||||
|
@ -35,9 +33,9 @@ object SparkSqlExample {
|
|||
case None => new SparkConf().setAppName("Simple Sql App")
|
||||
}
|
||||
val sc = new SparkContext(conf)
|
||||
val hiveContext = new HiveContext(sc)
|
||||
val sparkSession = SparkSession.withHiveSupport(sc)
|
||||
|
||||
import hiveContext._
|
||||
import sparkSession._
|
||||
sql("DROP TABLE IF EXISTS src")
|
||||
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
|
||||
sql("LOAD DATA LOCAL INPATH 'data.txt' INTO TABLE src")
|
||||
|
|
|
@ -24,7 +24,6 @@ import com.google.common.io.{ByteStreams, Files}
|
|||
|
||||
import org.apache.spark.{SparkConf, SparkContext}
|
||||
import org.apache.spark.sql._
|
||||
import org.apache.spark.sql.hive.HiveContext
|
||||
|
||||
object HiveFromSpark {
|
||||
case class Record(key: Int, value: String)
|
||||
|
@ -43,9 +42,9 @@ object HiveFromSpark {
|
|||
// using HiveQL. Users who do not have an existing Hive deployment can still create a
|
||||
// HiveContext. When not configured by the hive-site.xml, the context automatically
|
||||
// creates metastore_db and warehouse in the current directory.
|
||||
val hiveContext = new HiveContext(sc)
|
||||
import hiveContext.implicits._
|
||||
import hiveContext.sql
|
||||
val sparkSession = SparkSession.withHiveSupport(sc)
|
||||
import sparkSession.implicits._
|
||||
import sparkSession.sql
|
||||
|
||||
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
|
||||
sql(s"LOAD DATA LOCAL INPATH '${kv1File.getAbsolutePath}' INTO TABLE src")
|
||||
|
|
|
@ -603,6 +603,7 @@ class SQLContext(object):
|
|||
return DataFrameReader(self)
|
||||
|
||||
|
||||
# TODO(andrew): remove this too
|
||||
class HiveContext(SQLContext):
|
||||
"""A variant of Spark SQL that integrates with data stored in Hive.
|
||||
|
||||
|
@ -632,7 +633,7 @@ class HiveContext(SQLContext):
|
|||
raise
|
||||
|
||||
def _get_hive_ctx(self):
|
||||
return self._jvm.HiveContext(self._jsc.sc())
|
||||
return self._jvm.SparkSession.withHiveSupport(self._jsc.sc()).wrapped()
|
||||
|
||||
def refreshTable(self, tableName):
|
||||
"""Invalidate and refresh all the cached the metadata of the given
|
||||
|
|
|
@ -905,7 +905,7 @@ class SparkSession private(
|
|||
}
|
||||
|
||||
|
||||
private object SparkSession {
|
||||
object SparkSession {
|
||||
|
||||
private def sharedStateClassName(conf: SparkConf): String = {
|
||||
conf.get(CATALOG_IMPLEMENTATION) match {
|
||||
|
@ -938,4 +938,10 @@ private object SparkSession {
|
|||
}
|
||||
}
|
||||
|
||||
// TODO: do we want to expose this?
|
||||
def withHiveSupport(sc: SparkContext): SparkSession = {
|
||||
sc.conf.set(CATALOG_IMPLEMENTATION.key, "hive")
|
||||
new SparkSession(sc)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -33,7 +33,8 @@ import org.apache.spark.SparkContext
|
|||
import org.apache.spark.annotation.DeveloperApi
|
||||
import org.apache.spark.internal.Logging
|
||||
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerJobStart}
|
||||
import org.apache.spark.sql.hive.HiveContext
|
||||
import org.apache.spark.sql.SQLContext
|
||||
import org.apache.spark.sql.hive.HiveSessionState
|
||||
import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
|
||||
import org.apache.spark.sql.hive.thriftserver.ui.ThriftServerTab
|
||||
import org.apache.spark.sql.internal.SQLConf
|
||||
|
@ -53,9 +54,9 @@ object HiveThriftServer2 extends Logging {
|
|||
* Starts a new thrift server with the given context.
|
||||
*/
|
||||
@DeveloperApi
|
||||
def startWithContext(sqlContext: HiveContext): Unit = {
|
||||
def startWithContext(sqlContext: SQLContext): Unit = {
|
||||
val server = new HiveThriftServer2(sqlContext)
|
||||
server.init(sqlContext.sessionState.hiveconf)
|
||||
server.init(sqlContext.sessionState.asInstanceOf[HiveSessionState].hiveconf)
|
||||
server.start()
|
||||
listener = new HiveThriftServer2Listener(server, sqlContext.conf)
|
||||
sqlContext.sparkContext.addSparkListener(listener)
|
||||
|
@ -82,11 +83,11 @@ object HiveThriftServer2 extends Logging {
|
|||
}
|
||||
|
||||
try {
|
||||
val server = new HiveThriftServer2(SparkSQLEnv.hiveContext)
|
||||
server.init(SparkSQLEnv.hiveContext.sessionState.hiveconf)
|
||||
val server = new HiveThriftServer2(SparkSQLEnv.sqlContext)
|
||||
server.init(SparkSQLEnv.sqlContext.sessionState.asInstanceOf[HiveSessionState].hiveconf)
|
||||
server.start()
|
||||
logInfo("HiveThriftServer2 started")
|
||||
listener = new HiveThriftServer2Listener(server, SparkSQLEnv.hiveContext.conf)
|
||||
listener = new HiveThriftServer2Listener(server, SparkSQLEnv.sqlContext.conf)
|
||||
SparkSQLEnv.sparkContext.addSparkListener(listener)
|
||||
uiTab = if (SparkSQLEnv.sparkContext.getConf.getBoolean("spark.ui.enabled", true)) {
|
||||
Some(new ThriftServerTab(SparkSQLEnv.sparkContext))
|
||||
|
@ -261,7 +262,7 @@ object HiveThriftServer2 extends Logging {
|
|||
}
|
||||
}
|
||||
|
||||
private[hive] class HiveThriftServer2(hiveContext: HiveContext)
|
||||
private[hive] class HiveThriftServer2(sqlContext: SQLContext)
|
||||
extends HiveServer2
|
||||
with ReflectedCompositeService {
|
||||
// state is tracked internally so that the server only attempts to shut down if it successfully
|
||||
|
@ -269,7 +270,7 @@ private[hive] class HiveThriftServer2(hiveContext: HiveContext)
|
|||
private val started = new AtomicBoolean(false)
|
||||
|
||||
override def init(hiveConf: HiveConf) {
|
||||
val sparkSqlCliService = new SparkSQLCLIService(this, hiveContext)
|
||||
val sparkSqlCliService = new SparkSQLCLIService(this, sqlContext)
|
||||
setSuperField(this, "cliService", sparkSqlCliService)
|
||||
addService(sparkSqlCliService)
|
||||
|
||||
|
|
|
@ -33,9 +33,9 @@ import org.apache.hive.service.cli.operation.ExecuteStatementOperation
|
|||
import org.apache.hive.service.cli.session.HiveSession
|
||||
|
||||
import org.apache.spark.internal.Logging
|
||||
import org.apache.spark.sql.{DataFrame, Row => SparkRow}
|
||||
import org.apache.spark.sql.{DataFrame, Row => SparkRow, SQLContext}
|
||||
import org.apache.spark.sql.execution.command.SetCommand
|
||||
import org.apache.spark.sql.hive.{HiveContext, HiveUtils}
|
||||
import org.apache.spark.sql.hive.{HiveSessionState, HiveUtils}
|
||||
import org.apache.spark.sql.internal.SQLConf
|
||||
import org.apache.spark.sql.types._
|
||||
import org.apache.spark.util.{Utils => SparkUtils}
|
||||
|
@ -45,7 +45,7 @@ private[hive] class SparkExecuteStatementOperation(
|
|||
statement: String,
|
||||
confOverlay: JMap[String, String],
|
||||
runInBackground: Boolean = true)
|
||||
(hiveContext: HiveContext, sessionToActivePool: SMap[SessionHandle, String])
|
||||
(sqlContext: SQLContext, sessionToActivePool: SMap[SessionHandle, String])
|
||||
extends ExecuteStatementOperation(parentSession, statement, confOverlay, runInBackground)
|
||||
with Logging {
|
||||
|
||||
|
@ -68,7 +68,7 @@ private[hive] class SparkExecuteStatementOperation(
|
|||
|
||||
def close(): Unit = {
|
||||
// RDDs will be cleaned automatically upon garbage collection.
|
||||
hiveContext.sparkContext.clearJobGroup()
|
||||
sqlContext.sparkContext.clearJobGroup()
|
||||
logDebug(s"CLOSING $statementId")
|
||||
cleanup(OperationState.CLOSED)
|
||||
}
|
||||
|
@ -193,9 +193,9 @@ private[hive] class SparkExecuteStatementOperation(
|
|||
statementId = UUID.randomUUID().toString
|
||||
logInfo(s"Running query '$statement' with $statementId")
|
||||
setState(OperationState.RUNNING)
|
||||
val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState]
|
||||
// Always use the latest class loader provided by executionHive's state.
|
||||
val executionHiveClassLoader =
|
||||
hiveContext.sessionState.executionHive.state.getConf.getClassLoader
|
||||
val executionHiveClassLoader = sessionState.executionHive.state.getConf.getClassLoader
|
||||
Thread.currentThread().setContextClassLoader(executionHiveClassLoader)
|
||||
|
||||
HiveThriftServer2.listener.onStatementStart(
|
||||
|
@ -204,12 +204,12 @@ private[hive] class SparkExecuteStatementOperation(
|
|||
statement,
|
||||
statementId,
|
||||
parentSession.getUsername)
|
||||
hiveContext.sparkContext.setJobGroup(statementId, statement)
|
||||
sqlContext.sparkContext.setJobGroup(statementId, statement)
|
||||
sessionToActivePool.get(parentSession.getSessionHandle).foreach { pool =>
|
||||
hiveContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
|
||||
sqlContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
|
||||
}
|
||||
try {
|
||||
result = hiveContext.sql(statement)
|
||||
result = sqlContext.sql(statement)
|
||||
logDebug(result.queryExecution.toString())
|
||||
result.queryExecution.logical match {
|
||||
case SetCommand(Some((SQLConf.THRIFTSERVER_POOL.key, Some(value)))) =>
|
||||
|
@ -220,7 +220,7 @@ private[hive] class SparkExecuteStatementOperation(
|
|||
HiveThriftServer2.listener.onStatementParsed(statementId, result.queryExecution.toString())
|
||||
iter = {
|
||||
val useIncrementalCollect =
|
||||
hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean
|
||||
sqlContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean
|
||||
if (useIncrementalCollect) {
|
||||
result.toLocalIterator.asScala
|
||||
} else {
|
||||
|
@ -253,7 +253,7 @@ private[hive] class SparkExecuteStatementOperation(
|
|||
override def cancel(): Unit = {
|
||||
logInfo(s"Cancel '$statement' with $statementId")
|
||||
if (statementId != null) {
|
||||
hiveContext.sparkContext.cancelJobGroup(statementId)
|
||||
sqlContext.sparkContext.cancelJobGroup(statementId)
|
||||
}
|
||||
cleanup(OperationState.CANCELED)
|
||||
}
|
||||
|
|
|
@ -150,7 +150,7 @@ private[hive] object SparkSQLCLIDriver extends Logging {
|
|||
}
|
||||
|
||||
if (sessionState.database != null) {
|
||||
SparkSQLEnv.hiveContext.sessionState.catalog.setCurrentDatabase(
|
||||
SparkSQLEnv.sqlContext.sessionState.catalog.setCurrentDatabase(
|
||||
s"${sessionState.database}")
|
||||
}
|
||||
|
||||
|
|
|
@ -33,17 +33,17 @@ import org.apache.hive.service.auth.HiveAuthFactory
|
|||
import org.apache.hive.service.cli._
|
||||
import org.apache.hive.service.server.HiveServer2
|
||||
|
||||
import org.apache.spark.sql.hive.HiveContext
|
||||
import org.apache.spark.sql.SQLContext
|
||||
import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
|
||||
|
||||
private[hive] class SparkSQLCLIService(hiveServer: HiveServer2, hiveContext: HiveContext)
|
||||
private[hive] class SparkSQLCLIService(hiveServer: HiveServer2, sqlContext: SQLContext)
|
||||
extends CLIService(hiveServer)
|
||||
with ReflectedCompositeService {
|
||||
|
||||
override def init(hiveConf: HiveConf) {
|
||||
setSuperField(this, "hiveConf", hiveConf)
|
||||
|
||||
val sparkSqlSessionManager = new SparkSQLSessionManager(hiveServer, hiveContext)
|
||||
val sparkSqlSessionManager = new SparkSQLSessionManager(hiveServer, sqlContext)
|
||||
setSuperField(this, "sessionManager", sparkSqlSessionManager)
|
||||
addService(sparkSqlSessionManager)
|
||||
var sparkServiceUGI: UserGroupInformation = null
|
||||
|
@ -66,7 +66,7 @@ private[hive] class SparkSQLCLIService(hiveServer: HiveServer2, hiveContext: Hiv
|
|||
getInfoType match {
|
||||
case GetInfoType.CLI_SERVER_NAME => new GetInfoValue("Spark SQL")
|
||||
case GetInfoType.CLI_DBMS_NAME => new GetInfoValue("Spark SQL")
|
||||
case GetInfoType.CLI_DBMS_VER => new GetInfoValue(hiveContext.sparkContext.version)
|
||||
case GetInfoType.CLI_DBMS_VER => new GetInfoValue(sqlContext.sparkContext.version)
|
||||
case _ => super.getInfo(sessionHandle, getInfoType)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -27,11 +27,11 @@ import org.apache.hadoop.hive.ql.Driver
|
|||
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse
|
||||
|
||||
import org.apache.spark.internal.Logging
|
||||
import org.apache.spark.sql.AnalysisException
|
||||
import org.apache.spark.sql.{AnalysisException, SQLContext}
|
||||
import org.apache.spark.sql.execution.QueryExecution
|
||||
import org.apache.spark.sql.hive.HiveContext
|
||||
|
||||
private[hive] class SparkSQLDriver(val context: HiveContext = SparkSQLEnv.hiveContext)
|
||||
|
||||
private[hive] class SparkSQLDriver(val context: SQLContext = SparkSQLEnv.sqlContext)
|
||||
extends Driver
|
||||
with Logging {
|
||||
|
||||
|
|
|
@ -23,18 +23,19 @@ import scala.collection.JavaConverters._
|
|||
|
||||
import org.apache.spark.{SparkConf, SparkContext}
|
||||
import org.apache.spark.internal.Logging
|
||||
import org.apache.spark.sql.hive.{HiveContext, HiveUtils}
|
||||
import org.apache.spark.sql.{SparkSession, SQLContext}
|
||||
import org.apache.spark.sql.hive.{HiveSessionState, HiveUtils}
|
||||
import org.apache.spark.util.Utils
|
||||
|
||||
/** A singleton object for the master program. The slaves should not access this. */
|
||||
private[hive] object SparkSQLEnv extends Logging {
|
||||
logDebug("Initializing SparkSQLEnv")
|
||||
|
||||
var hiveContext: HiveContext = _
|
||||
var sqlContext: SQLContext = _
|
||||
var sparkContext: SparkContext = _
|
||||
|
||||
def init() {
|
||||
if (hiveContext == null) {
|
||||
if (sqlContext == null) {
|
||||
val sparkConf = new SparkConf(loadDefaults = true)
|
||||
val maybeSerializer = sparkConf.getOption("spark.serializer")
|
||||
val maybeKryoReferenceTracking = sparkConf.getOption("spark.kryo.referenceTracking")
|
||||
|
@ -54,16 +55,16 @@ private[hive] object SparkSQLEnv extends Logging {
|
|||
maybeKryoReferenceTracking.getOrElse("false"))
|
||||
|
||||
sparkContext = new SparkContext(sparkConf)
|
||||
hiveContext = new HiveContext(sparkContext)
|
||||
sqlContext = SparkSession.withHiveSupport(sparkContext).wrapped
|
||||
val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState]
|
||||
sessionState.metadataHive.setOut(new PrintStream(System.out, true, "UTF-8"))
|
||||
sessionState.metadataHive.setInfo(new PrintStream(System.err, true, "UTF-8"))
|
||||
sessionState.metadataHive.setError(new PrintStream(System.err, true, "UTF-8"))
|
||||
|
||||
hiveContext.sessionState.metadataHive.setOut(new PrintStream(System.out, true, "UTF-8"))
|
||||
hiveContext.sessionState.metadataHive.setInfo(new PrintStream(System.err, true, "UTF-8"))
|
||||
hiveContext.sessionState.metadataHive.setError(new PrintStream(System.err, true, "UTF-8"))
|
||||
|
||||
hiveContext.setConf("spark.sql.hive.version", HiveUtils.hiveExecutionVersion)
|
||||
sqlContext.setConf("spark.sql.hive.version", HiveUtils.hiveExecutionVersion)
|
||||
|
||||
if (log.isDebugEnabled) {
|
||||
hiveContext.sessionState.hiveconf.getAllProperties.asScala.toSeq.sorted
|
||||
sessionState.hiveconf.getAllProperties.asScala.toSeq.sorted
|
||||
.foreach { case (k, v) => logDebug(s"HiveConf var: $k=$v") }
|
||||
}
|
||||
}
|
||||
|
@ -76,7 +77,7 @@ private[hive] object SparkSQLEnv extends Logging {
|
|||
if (SparkSQLEnv.sparkContext != null) {
|
||||
sparkContext.stop()
|
||||
sparkContext = null
|
||||
hiveContext = null
|
||||
sqlContext = null
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -27,12 +27,13 @@ import org.apache.hive.service.cli.session.SessionManager
|
|||
import org.apache.hive.service.cli.thrift.TProtocolVersion
|
||||
import org.apache.hive.service.server.HiveServer2
|
||||
|
||||
import org.apache.spark.sql.hive.{HiveContext, HiveUtils}
|
||||
import org.apache.spark.sql.SQLContext
|
||||
import org.apache.spark.sql.hive.{HiveSessionState, HiveUtils}
|
||||
import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
|
||||
import org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager
|
||||
|
||||
|
||||
private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, hiveContext: HiveContext)
|
||||
private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, sqlContext: SQLContext)
|
||||
extends SessionManager(hiveServer)
|
||||
with ReflectedCompositeService {
|
||||
|
||||
|
@ -71,10 +72,11 @@ private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, hiveContext:
|
|||
val session = super.getSession(sessionHandle)
|
||||
HiveThriftServer2.listener.onSessionCreated(
|
||||
session.getIpAddress, sessionHandle.getSessionId.toString, session.getUsername)
|
||||
val ctx = if (hiveContext.sessionState.hiveThriftServerSingleSession) {
|
||||
hiveContext
|
||||
val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState]
|
||||
val ctx = if (sessionState.hiveThriftServerSingleSession) {
|
||||
sqlContext
|
||||
} else {
|
||||
hiveContext.newSession()
|
||||
sqlContext.newSession()
|
||||
}
|
||||
ctx.setConf("spark.sql.hive.version", HiveUtils.hiveExecutionVersion)
|
||||
sparkSqlOperationManager.sessionToContexts += sessionHandle -> ctx
|
||||
|
|
|
@ -26,7 +26,8 @@ import org.apache.hive.service.cli.operation.{ExecuteStatementOperation, Operati
|
|||
import org.apache.hive.service.cli.session.HiveSession
|
||||
|
||||
import org.apache.spark.internal.Logging
|
||||
import org.apache.spark.sql.hive.HiveContext
|
||||
import org.apache.spark.sql.SQLContext
|
||||
import org.apache.spark.sql.hive.HiveSessionState
|
||||
import org.apache.spark.sql.hive.thriftserver.{ReflectionUtils, SparkExecuteStatementOperation}
|
||||
|
||||
/**
|
||||
|
@ -39,17 +40,18 @@ private[thriftserver] class SparkSQLOperationManager()
|
|||
.getSuperField[JMap[OperationHandle, Operation]](this, "handleToOperation")
|
||||
|
||||
val sessionToActivePool = Map[SessionHandle, String]()
|
||||
val sessionToContexts = Map[SessionHandle, HiveContext]()
|
||||
val sessionToContexts = Map[SessionHandle, SQLContext]()
|
||||
|
||||
override def newExecuteStatementOperation(
|
||||
parentSession: HiveSession,
|
||||
statement: String,
|
||||
confOverlay: JMap[String, String],
|
||||
async: Boolean): ExecuteStatementOperation = synchronized {
|
||||
val hiveContext = sessionToContexts(parentSession.getSessionHandle)
|
||||
val runInBackground = async && hiveContext.sessionState.hiveThriftServerAsync
|
||||
val sqlContext = sessionToContexts(parentSession.getSessionHandle)
|
||||
val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState]
|
||||
val runInBackground = async && sessionState.hiveThriftServerAsync
|
||||
val operation = new SparkExecuteStatementOperation(parentSession, statement, confOverlay,
|
||||
runInBackground)(hiveContext, sessionToActivePool)
|
||||
runInBackground)(sqlContext, sessionToActivePool)
|
||||
handleToOperation.put(operation.getHandle, operation)
|
||||
logDebug(s"Created Operation for $statement with session=$parentSession, " +
|
||||
s"runInBackground=$runInBackground")
|
||||
|
|
|
@ -31,7 +31,7 @@ import org.apache.spark.sql.internal.SessionState
|
|||
|
||||
|
||||
/**
|
||||
* A class that holds all session-specific state in a given [[HiveContext]].
|
||||
* A class that holds all session-specific state in a given [[SparkSession]] backed by Hive.
|
||||
*/
|
||||
private[hive] class HiveSessionState(ctx: SQLContext) extends SessionState(ctx) {
|
||||
|
||||
|
|
|
@ -23,7 +23,8 @@ import org.apache.spark.sql.internal.SharedState
|
|||
|
||||
|
||||
/**
|
||||
* A class that holds all state shared across sessions in a given [[HiveContext]].
|
||||
* A class that holds all state shared across sessions in a given
|
||||
* [[org.apache.spark.sql.SparkSession]] backed by Hive.
|
||||
*/
|
||||
private[hive] class HiveSharedState(override val sparkContext: SparkContext)
|
||||
extends SharedState(sparkContext) {
|
||||
|
|
|
@ -35,7 +35,6 @@ import org.apache.hadoop.hive.serde2.io.{DateWritable, TimestampWritable}
|
|||
import org.apache.hadoop.util.VersionInfo
|
||||
|
||||
import org.apache.spark.{SparkConf, SparkContext}
|
||||
import org.apache.spark.api.java.JavaSparkContext
|
||||
import org.apache.spark.internal.Logging
|
||||
import org.apache.spark.internal.config.CATALOG_IMPLEMENTATION
|
||||
import org.apache.spark.sql._
|
||||
|
@ -45,44 +44,6 @@ import org.apache.spark.sql.internal.SQLConf._
|
|||
import org.apache.spark.sql.types._
|
||||
import org.apache.spark.util.Utils
|
||||
|
||||
/**
|
||||
* An instance of the Spark SQL execution engine that integrates with data stored in Hive.
|
||||
* Configuration for Hive is read from hive-site.xml on the classpath.
|
||||
*
|
||||
* @since 1.0.0
|
||||
*/
|
||||
class HiveContext private[hive](
|
||||
@transient private val sparkSession: SparkSession,
|
||||
isRootContext: Boolean)
|
||||
extends SQLContext(sparkSession, isRootContext) with Logging {
|
||||
|
||||
self =>
|
||||
|
||||
def this(sc: SparkContext) = {
|
||||
this(new SparkSession(HiveUtils.withHiveExternalCatalog(sc)), true)
|
||||
}
|
||||
|
||||
def this(sc: JavaSparkContext) = this(sc.sc)
|
||||
|
||||
/**
|
||||
* Returns a new HiveContext as new session, which will have separated SQLConf, UDF/UDAF,
|
||||
* temporary tables and SessionState, but sharing the same CacheManager, IsolatedClientLoader
|
||||
* and Hive client (both of execution and metadata) with existing HiveContext.
|
||||
*/
|
||||
override def newSession(): HiveContext = {
|
||||
new HiveContext(sparkSession.newSession(), isRootContext = false)
|
||||
}
|
||||
|
||||
protected[sql] override def sessionState: HiveSessionState = {
|
||||
sparkSession.sessionState.asInstanceOf[HiveSessionState]
|
||||
}
|
||||
|
||||
protected[sql] override def sharedState: HiveSharedState = {
|
||||
sparkSession.sharedState.asInstanceOf[HiveSharedState]
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
private[spark] object HiveUtils extends Logging {
|
||||
|
||||
|
|
|
@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan
|
|||
import org.apache.spark.sql.execution.command.RunnableCommand
|
||||
import org.apache.spark.sql.hive.MetastoreRelation
|
||||
|
||||
|
||||
/**
|
||||
* Create table and insert the query result into it.
|
||||
* @param tableDesc the Table Describe, which may contains serde, storage handler etc.
|
||||
|
|
|
@ -72,7 +72,7 @@ object TestHive
|
|||
* test cases that rely on TestHive must be serialized.
|
||||
*/
|
||||
class TestHiveContext(@transient val sparkSession: TestHiveSparkSession, isRootContext: Boolean)
|
||||
extends HiveContext(sparkSession, isRootContext) {
|
||||
extends SQLContext(sparkSession, isRootContext) {
|
||||
|
||||
def this(sc: SparkContext) {
|
||||
this(new TestHiveSparkSession(HiveUtils.withHiveExternalCatalog(sc)), true)
|
||||
|
|
|
@ -36,7 +36,7 @@ import org.apache.spark.sql.hive.aggregate.MyDoubleSum;
|
|||
|
||||
public class JavaDataFrameSuite {
|
||||
private transient JavaSparkContext sc;
|
||||
private transient HiveContext hc;
|
||||
private transient SQLContext hc;
|
||||
|
||||
Dataset<Row> df;
|
||||
|
||||
|
|
|
@ -36,6 +36,7 @@ import org.apache.spark.api.java.JavaSparkContext;
|
|||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.QueryTest$;
|
||||
import org.apache.spark.sql.Row;
|
||||
import org.apache.spark.sql.SQLContext;
|
||||
import org.apache.spark.sql.hive.test.TestHive$;
|
||||
import org.apache.spark.sql.types.DataTypes;
|
||||
import org.apache.spark.sql.types.StructField;
|
||||
|
@ -46,7 +47,7 @@ import org.apache.spark.util.Utils;
|
|||
|
||||
public class JavaMetastoreDataSourcesSuite {
|
||||
private transient JavaSparkContext sc;
|
||||
private transient HiveContext sqlContext;
|
||||
private transient SQLContext sqlContext;
|
||||
|
||||
File path;
|
||||
Path hiveManagedPath;
|
||||
|
@ -70,9 +71,9 @@ public class JavaMetastoreDataSourcesSuite {
|
|||
if (path.exists()) {
|
||||
path.delete();
|
||||
}
|
||||
HiveSessionCatalog catalog = (HiveSessionCatalog) sqlContext.sessionState().catalog();
|
||||
hiveManagedPath = new Path(
|
||||
sqlContext.sessionState().catalog().hiveDefaultTableFilePath(
|
||||
new TableIdentifier("javaSavedTable")));
|
||||
catalog.hiveDefaultTableFilePath(new TableIdentifier("javaSavedTable")));
|
||||
fs = hiveManagedPath.getFileSystem(sc.hadoopConfiguration());
|
||||
if (fs.exists(hiveManagedPath)){
|
||||
fs.delete(hiveManagedPath, true);
|
||||
|
|
|
@ -15,8 +15,8 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import org.apache.spark.{SparkConf, SparkContext}
|
||||
import org.apache.spark.sql.hive.HiveContext
|
||||
import org.apache.spark.SparkContext
|
||||
import org.apache.spark.sql.SparkSession
|
||||
|
||||
/**
|
||||
* Entry point in test application for SPARK-8489.
|
||||
|
@ -28,15 +28,16 @@ import org.apache.spark.sql.hive.HiveContext
|
|||
*
|
||||
* This is used in org.apache.spark.sql.hive.HiveSparkSubmitSuite.
|
||||
*/
|
||||
// TODO: actually rebuild this jar with the new changes.
|
||||
object Main {
|
||||
def main(args: Array[String]) {
|
||||
// scalastyle:off println
|
||||
println("Running regression test for SPARK-8489.")
|
||||
val sc = new SparkContext("local", "testing")
|
||||
val hc = new HiveContext(sc)
|
||||
val sparkSession = SparkSession.withHiveSupport(sc)
|
||||
// This line should not throw scala.reflect.internal.MissingRequirementError.
|
||||
// See SPARK-8470 for more detail.
|
||||
val df = hc.createDataFrame(Seq(MyCoolClass("1", "2", "3")))
|
||||
val df = sparkSession.createDataFrame(Seq(MyCoolClass("1", "2", "3")))
|
||||
df.collect()
|
||||
println("Regression test for SPARK-8489 success!")
|
||||
// scalastyle:on println
|
||||
|
|
|
@ -142,7 +142,8 @@ class HiveSparkSubmitSuite
|
|||
runSparkSubmit(args)
|
||||
}
|
||||
|
||||
test("SPARK-8489: MissingRequirementError during reflection") {
|
||||
// TODO: re-enable this after rebuilding the jar (HiveContext was removed)
|
||||
ignore("SPARK-8489: MissingRequirementError during reflection") {
|
||||
// This test uses a pre-built jar to test SPARK-8489. In a nutshell, this test creates
|
||||
// a HiveContext and uses it to create a data frame from an RDD using reflection.
|
||||
// Before the fix in SPARK-8470, this results in a MissingRequirementError because
|
||||
|
|
Loading…
Reference in a new issue