From 581200af717bcefd11c9930ac063fe53c6fd2fde Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Tue, 19 Sep 2017 19:35:36 +0800 Subject: [PATCH] [SPARK-21428][SQL][FOLLOWUP] CliSessionState should point to the actual metastore not a dummy one ## What changes were proposed in this pull request? While running bin/spark-sql, we will reuse cliSessionState, but the Hive configurations generated here just points to a dummy meta store which actually should be the real one. And the warehouse is determined later in SharedState, HiveClient should respect this config changing in this case too. ## How was this patch tested? existing ut cc cloud-fan jiangxb1987 Author: Kent Yao Closes #19068 from yaooqinn/SPARK-21428-FOLLOWUP. --- .../sql/hive/thriftserver/SparkSQLCLIDriver.scala | 14 +++++++++++--- .../org/apache/spark/sql/hive/HiveUtils.scala | 6 +++--- .../spark/sql/hive/client/HiveClientImpl.scala | 15 +++++++++++++-- .../spark/sql/hive/client/HiveVersionSuite.scala | 2 +- .../spark/sql/hive/client/VersionsSuite.scala | 2 +- 5 files changed, 29 insertions(+), 10 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala index 761e832ed1..832a15d095 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala @@ -37,6 +37,8 @@ import org.apache.hadoop.hive.ql.session.SessionState import org.apache.log4j.{Level, Logger} import org.apache.thrift.transport.TSocket +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.hive.HiveUtils @@ -81,11 +83,17 @@ private[hive] object SparkSQLCLIDriver extends Logging { System.exit(1) } + val sparkConf = new SparkConf(loadDefaults = true) + val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf) + val extraConfigs = HiveUtils.formatTimeVarsForHiveClient(hadoopConf) + val cliConf = new HiveConf(classOf[SessionState]) - // Override the location of the metastore since this is only used for local execution. - HiveUtils.newTemporaryConfiguration(useInMemoryDerby = false).foreach { - case (key, value) => cliConf.set(key, value) + (hadoopConf.iterator().asScala.map(kv => kv.getKey -> kv.getValue) + ++ sparkConf.getAll.toMap ++ extraConfigs).foreach { + case (k, v) => + cliConf.set(k, v) } + val sessionState = new CliSessionState(cliConf) sessionState.in = System.in diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala index 561c127a40..80b9a3dc96 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala @@ -176,9 +176,9 @@ private[spark] object HiveUtils extends Logging { } /** - * Configurations needed to create a [[HiveClient]]. + * Change time configurations needed to create a [[HiveClient]] into unified [[Long]] format. */ - private[hive] def hiveClientConfigurations(hadoopConf: Configuration): Map[String, String] = { + private[hive] def formatTimeVarsForHiveClient(hadoopConf: Configuration): Map[String, String] = { // Hive 0.14.0 introduces timeout operations in HiveConf, and changes default values of a bunch // of time `ConfVar`s by adding time suffixes (`s`, `ms`, and `d` etc.). This breaks backwards- // compatibility when users are trying to connecting to a Hive metastore of lower version, @@ -280,7 +280,7 @@ private[spark] object HiveUtils extends Logging { protected[hive] def newClientForMetadata( conf: SparkConf, hadoopConf: Configuration): HiveClient = { - val configurations = hiveClientConfigurations(hadoopConf) + val configurations = formatTimeVarsForHiveClient(hadoopConf) newClientForMetadata(conf, hadoopConf, configurations) } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 426db6a4e1..c4e48c9360 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.hive.common.StatsSetupConst import org.apache.hadoop.hive.conf.HiveConf +import org.apache.hadoop.hive.conf.HiveConf.ConfVars import org.apache.hadoop.hive.metastore.{TableType => HiveTableType} import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, FieldSchema, Order} import org.apache.hadoop.hive.metastore.api.{SerDeInfo, StorageDescriptor} @@ -132,14 +133,24 @@ private[hive] class HiveClientImpl( // in hive jars, which will turn off isolation, if SessionSate.detachSession is // called to remove the current state after that, hive client created later will initialize // its own state by newState() - Option(SessionState.get).getOrElse(newState()) + val ret = SessionState.get + if (ret != null) { + // hive.metastore.warehouse.dir is determined in SharedState after the CliSessionState + // instance constructed, we need to follow that change here. + Option(hadoopConf.get(ConfVars.METASTOREWAREHOUSE.varname)).foreach { dir => + ret.getConf.setVar(ConfVars.METASTOREWAREHOUSE, dir) + } + ret + } else { + newState() + } } } // Log the default warehouse location. logInfo( s"Warehouse location for Hive client " + - s"(version ${version.fullVersion}) is ${conf.get("hive.metastore.warehouse.dir")}") + s"(version ${version.fullVersion}) is ${conf.getVar(ConfVars.METASTOREWAREHOUSE)}") private def newState(): SessionState = { val hiveConf = new HiveConf(classOf[SessionState]) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala index ed475a0261..951ebfad45 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala @@ -36,7 +36,7 @@ private[client] abstract class HiveVersionSuite(version: String) extends SparkFu hadoopConf.set("hive.metastore.schema.verification", "false") } HiveClientBuilder - .buildClient(version, hadoopConf, HiveUtils.hiveClientConfigurations(hadoopConf)) + .buildClient(version, hadoopConf, HiveUtils.formatTimeVarsForHiveClient(hadoopConf)) } override def suiteName: String = s"${super.suiteName}($version)" diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala index 1d9c8da996..edb9a9ffba 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala @@ -127,7 +127,7 @@ class VersionsSuite extends SparkFunSuite with Logging { hadoopConf.set("datanucleus.schema.autoCreateAll", "true") hadoopConf.set("hive.metastore.schema.verification", "false") } - client = buildClient(version, hadoopConf, HiveUtils.hiveClientConfigurations(hadoopConf)) + client = buildClient(version, hadoopConf, HiveUtils.formatTimeVarsForHiveClient(hadoopConf)) if (versionSpark != null) versionSpark.reset() versionSpark = TestHiveVersion(client) assert(versionSpark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client