[SPARK-21428][SQL][FOLLOWUP] CliSessionState should point to the actual metastore not a dummy one
## What changes were proposed in this pull request? While running bin/spark-sql, we will reuse cliSessionState, but the Hive configurations generated here just points to a dummy meta store which actually should be the real one. And the warehouse is determined later in SharedState, HiveClient should respect this config changing in this case too. ## How was this patch tested? existing ut cc cloud-fan jiangxb1987 Author: Kent Yao <yaooqinn@hotmail.com> Closes #19068 from yaooqinn/SPARK-21428-FOLLOWUP.
This commit is contained in:
parent
1bc17a6b8a
commit
581200af71
|
@ -37,6 +37,8 @@ import org.apache.hadoop.hive.ql.session.SessionState
|
|||
import org.apache.log4j.{Level, Logger}
|
||||
import org.apache.thrift.transport.TSocket
|
||||
|
||||
import org.apache.spark.SparkConf
|
||||
import org.apache.spark.deploy.SparkHadoopUtil
|
||||
import org.apache.spark.internal.Logging
|
||||
import org.apache.spark.sql.AnalysisException
|
||||
import org.apache.spark.sql.hive.HiveUtils
|
||||
|
@ -81,11 +83,17 @@ private[hive] object SparkSQLCLIDriver extends Logging {
|
|||
System.exit(1)
|
||||
}
|
||||
|
||||
val sparkConf = new SparkConf(loadDefaults = true)
|
||||
val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf)
|
||||
val extraConfigs = HiveUtils.formatTimeVarsForHiveClient(hadoopConf)
|
||||
|
||||
val cliConf = new HiveConf(classOf[SessionState])
|
||||
// Override the location of the metastore since this is only used for local execution.
|
||||
HiveUtils.newTemporaryConfiguration(useInMemoryDerby = false).foreach {
|
||||
case (key, value) => cliConf.set(key, value)
|
||||
(hadoopConf.iterator().asScala.map(kv => kv.getKey -> kv.getValue)
|
||||
++ sparkConf.getAll.toMap ++ extraConfigs).foreach {
|
||||
case (k, v) =>
|
||||
cliConf.set(k, v)
|
||||
}
|
||||
|
||||
val sessionState = new CliSessionState(cliConf)
|
||||
|
||||
sessionState.in = System.in
|
||||
|
|
|
@ -176,9 +176,9 @@ private[spark] object HiveUtils extends Logging {
|
|||
}
|
||||
|
||||
/**
|
||||
* Configurations needed to create a [[HiveClient]].
|
||||
* Change time configurations needed to create a [[HiveClient]] into unified [[Long]] format.
|
||||
*/
|
||||
private[hive] def hiveClientConfigurations(hadoopConf: Configuration): Map[String, String] = {
|
||||
private[hive] def formatTimeVarsForHiveClient(hadoopConf: Configuration): Map[String, String] = {
|
||||
// Hive 0.14.0 introduces timeout operations in HiveConf, and changes default values of a bunch
|
||||
// of time `ConfVar`s by adding time suffixes (`s`, `ms`, and `d` etc.). This breaks backwards-
|
||||
// compatibility when users are trying to connecting to a Hive metastore of lower version,
|
||||
|
@ -280,7 +280,7 @@ private[spark] object HiveUtils extends Logging {
|
|||
protected[hive] def newClientForMetadata(
|
||||
conf: SparkConf,
|
||||
hadoopConf: Configuration): HiveClient = {
|
||||
val configurations = hiveClientConfigurations(hadoopConf)
|
||||
val configurations = formatTimeVarsForHiveClient(hadoopConf)
|
||||
newClientForMetadata(conf, hadoopConf, configurations)
|
||||
}
|
||||
|
||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configuration
|
|||
import org.apache.hadoop.fs.Path
|
||||
import org.apache.hadoop.hive.common.StatsSetupConst
|
||||
import org.apache.hadoop.hive.conf.HiveConf
|
||||
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
|
||||
import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
|
||||
import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, FieldSchema, Order}
|
||||
import org.apache.hadoop.hive.metastore.api.{SerDeInfo, StorageDescriptor}
|
||||
|
@ -132,14 +133,24 @@ private[hive] class HiveClientImpl(
|
|||
// in hive jars, which will turn off isolation, if SessionSate.detachSession is
|
||||
// called to remove the current state after that, hive client created later will initialize
|
||||
// its own state by newState()
|
||||
Option(SessionState.get).getOrElse(newState())
|
||||
val ret = SessionState.get
|
||||
if (ret != null) {
|
||||
// hive.metastore.warehouse.dir is determined in SharedState after the CliSessionState
|
||||
// instance constructed, we need to follow that change here.
|
||||
Option(hadoopConf.get(ConfVars.METASTOREWAREHOUSE.varname)).foreach { dir =>
|
||||
ret.getConf.setVar(ConfVars.METASTOREWAREHOUSE, dir)
|
||||
}
|
||||
ret
|
||||
} else {
|
||||
newState()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Log the default warehouse location.
|
||||
logInfo(
|
||||
s"Warehouse location for Hive client " +
|
||||
s"(version ${version.fullVersion}) is ${conf.get("hive.metastore.warehouse.dir")}")
|
||||
s"(version ${version.fullVersion}) is ${conf.getVar(ConfVars.METASTOREWAREHOUSE)}")
|
||||
|
||||
private def newState(): SessionState = {
|
||||
val hiveConf = new HiveConf(classOf[SessionState])
|
||||
|
|
|
@ -36,7 +36,7 @@ private[client] abstract class HiveVersionSuite(version: String) extends SparkFu
|
|||
hadoopConf.set("hive.metastore.schema.verification", "false")
|
||||
}
|
||||
HiveClientBuilder
|
||||
.buildClient(version, hadoopConf, HiveUtils.hiveClientConfigurations(hadoopConf))
|
||||
.buildClient(version, hadoopConf, HiveUtils.formatTimeVarsForHiveClient(hadoopConf))
|
||||
}
|
||||
|
||||
override def suiteName: String = s"${super.suiteName}($version)"
|
||||
|
|
|
@ -127,7 +127,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
|
|||
hadoopConf.set("datanucleus.schema.autoCreateAll", "true")
|
||||
hadoopConf.set("hive.metastore.schema.verification", "false")
|
||||
}
|
||||
client = buildClient(version, hadoopConf, HiveUtils.hiveClientConfigurations(hadoopConf))
|
||||
client = buildClient(version, hadoopConf, HiveUtils.formatTimeVarsForHiveClient(hadoopConf))
|
||||
if (versionSpark != null) versionSpark.reset()
|
||||
versionSpark = TestHiveVersion(client)
|
||||
assert(versionSpark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
|
||||
|
|
Loading…
Reference in a new issue