diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index 852a9b18ec..3f676bec9f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -53,17 +53,21 @@ private[sql] class SharedState(
   SharedState.setFsUrlStreamHandlerFactory(sparkContext.conf, sparkContext.hadoopConfiguration)
 
   private[sql] val (conf, hadoopConf) = {
-    // Load hive-site.xml into hadoopConf and determine the warehouse path which will be set into
-    // both spark conf and hadoop conf avoiding be affected by any SparkSession level options
-    val initialConfigsWithoutWarehouse = SharedState.resolveWarehousePath(
+    val warehousePath = SharedState.resolveWarehousePath(
       sparkContext.conf, sparkContext.hadoopConfiguration, initialConfigs)
 
     val confClone = sparkContext.conf.clone()
     val hadoopConfClone = new Configuration(sparkContext.hadoopConfiguration)
+    // Extract entries from `SparkConf` and put them in the Hadoop conf.
+    confClone.getAll.foreach { case (k, v) =>
+      if (v ne null) hadoopConfClone.set(k, v)
+    }
     // If `SparkSession` is instantiated using an existing `SparkContext` instance and no existing
     // `SharedState`, all `SparkSession` level configurations have higher priority to generate a
     // `SharedState` instance. This will be done only once then shared across `SparkSession`s
-    initialConfigsWithoutWarehouse.foreach {
+    initialConfigs.foreach {
+      // We have resolved the warehouse path and should not set warehouse conf here.
+      case (k, _) if k == WAREHOUSE_PATH.key || k == SharedState.HIVE_WAREHOUSE_CONF_NAME =>
       case (k, v) if SQLConf.staticConfKeys.contains(k) =>
         logDebug(s"Applying static initial session options to SparkConf: $k -> $v")
         confClone.set(k, v)
@@ -71,6 +75,11 @@ private[sql] class SharedState(
         logDebug(s"Applying other initial session options to HadoopConf: $k -> $v")
         hadoopConfClone.set(k, v)
     }
+    val qualified = SharedState.qualifyWarehousePath(hadoopConfClone, warehousePath)
+    // Set the warehouse path in the SparkConf and Hadoop conf, so that it's reachable
+    // application-wide from `SparkContext`.
+    SharedState.setWarehousePathConf(sparkContext.conf, sparkContext.hadoopConfiguration, qualified)
+    SharedState.setWarehousePathConf(confClone, hadoopConfClone, qualified)
     (confClone, hadoopConfClone)
   }
 
@@ -218,6 +227,8 @@ object SharedState extends Logging {
     }
   }
 
+  private val HIVE_WAREHOUSE_CONF_NAME = "hive.metastore.warehouse.dir"
+
   /**
    * Determine the warehouse path using the key `spark.sql.warehouse.dir` in the [[SparkConf]]
    * or the initial options from the very first created SparkSession instance, and
@@ -228,33 +239,27 @@ object SharedState extends Logging {
    *   > h.m.w.d in hadoop conf (user specified)
    *   > s.s.w.d in spark conf (default)
    *
-   * After resolved, the final value will be application wide reachable in the sparkConf and
-   * hadoopConf from [[SparkContext]].
-   *
-   * @return a map contain the rest of initial options with the warehouses keys cleared
+   * @return the resolved warehouse path.
    */
   def resolveWarehousePath(
       sparkConf: SparkConf,
       hadoopConf: Configuration,
-      initialConfigs: scala.collection.Map[String, String] = Map.empty)
-    : scala.collection.Map[String, String] = {
-
-    val hiveWarehouseKey = "hive.metastore.warehouse.dir"
+      initialConfigs: scala.collection.Map[String, String] = Map.empty): String = {
     val sparkWarehouseOption =
       initialConfigs.get(WAREHOUSE_PATH.key).orElse(sparkConf.getOption(WAREHOUSE_PATH.key))
-    if (initialConfigs.contains(hiveWarehouseKey)) {
-      logWarning(s"Not allowing to set $hiveWarehouseKey in SparkSession's options, please use " +
-        s"${WAREHOUSE_PATH.key} to set statically for cross-session usages")
+    if (initialConfigs.contains(HIVE_WAREHOUSE_CONF_NAME)) {
+      logWarning(s"Not allowing to set $HIVE_WAREHOUSE_CONF_NAME in SparkSession's " +
+        s"options, please use ${WAREHOUSE_PATH.key} to set statically for cross-session usages")
     }
     // hive.metastore.warehouse.dir only stay in hadoopConf
-    sparkConf.remove(hiveWarehouseKey)
+    sparkConf.remove(HIVE_WAREHOUSE_CONF_NAME)
     // Set the Hive metastore warehouse path to the one we use
-    val hiveWarehouseDir = hadoopConf.get(hiveWarehouseKey)
-    val warehousePath = if (hiveWarehouseDir != null && sparkWarehouseOption.isEmpty) {
+    val hiveWarehouseDir = hadoopConf.get(HIVE_WAREHOUSE_CONF_NAME)
+    if (hiveWarehouseDir != null && sparkWarehouseOption.isEmpty) {
       // If hive.metastore.warehouse.dir is set and spark.sql.warehouse.dir is not set,
       // we will respect the value of hive.metastore.warehouse.dir.
-      logInfo(s"${WAREHOUSE_PATH.key} is not set, but $hiveWarehouseKey is set. Setting" +
-        s" ${WAREHOUSE_PATH.key} to the value of $hiveWarehouseKey.")
+      logInfo(s"${WAREHOUSE_PATH.key} is not set, but $HIVE_WAREHOUSE_CONF_NAME is set. " +
+        s"Setting ${WAREHOUSE_PATH.key} to the value of $HIVE_WAREHOUSE_CONF_NAME.")
       hiveWarehouseDir
     } else {
       // If spark.sql.warehouse.dir is set, we will override hive.metastore.warehouse.dir using
@@ -262,16 +267,24 @@ object SharedState extends Logging {
       // When neither spark.sql.warehouse.dir nor hive.metastore.warehouse.dir is set
       // we will set hive.metastore.warehouse.dir to the default value of spark.sql.warehouse.dir.
       val sparkWarehouseDir = sparkWarehouseOption.getOrElse(WAREHOUSE_PATH.defaultValueString)
-      logInfo(s"Setting $hiveWarehouseKey ('$hiveWarehouseDir') to the value of " +
+      logInfo(s"Setting $HIVE_WAREHOUSE_CONF_NAME ('$hiveWarehouseDir') to the value of " +
         s"${WAREHOUSE_PATH.key}.")
       sparkWarehouseDir
     }
+  }
 
+  def qualifyWarehousePath(hadoopConf: Configuration, warehousePath: String): String = {
     val tempPath = new Path(warehousePath)
-    val qualifiedWarehousePath = tempPath.getFileSystem(hadoopConf).makeQualified(tempPath).toString
-    sparkConf.set(WAREHOUSE_PATH.key, qualifiedWarehousePath)
-    hadoopConf.set(hiveWarehouseKey, qualifiedWarehousePath)
-    logInfo(s"Warehouse path is '$qualifiedWarehousePath'.")
-    initialConfigs -- Seq(WAREHOUSE_PATH.key, hiveWarehouseKey)
+    val qualified = tempPath.getFileSystem(hadoopConf).makeQualified(tempPath).toString
+    logInfo(s"Warehouse path is '$qualified'.")
+    qualified
+  }
+
+  def setWarehousePathConf(
+      sparkConf: SparkConf,
+      hadoopConf: Configuration,
+      warehousePath: String): Unit = {
+    sparkConf.set(WAREHOUSE_PATH.key, warehousePath)
+    hadoopConf.set(HIVE_WAREHOUSE_CONF_NAME, warehousePath)
   }
 }
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
index 53f085c33d..4b1e30af52 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
@@ -133,7 +133,9 @@ private[hive] object SparkSQLCLIDriver extends Logging {
       UserGroupInformation.getCurrentUser.addCredentials(credentials)
     }
 
-    SharedState.resolveWarehousePath(sparkConf, conf)
+    val warehousePath = SharedState.resolveWarehousePath(sparkConf, conf)
+    val qualified = SharedState.qualifyWarehousePath(conf, warehousePath)
+    SharedState.setWarehousePathConf(sparkConf, conf, qualified)
     SessionState.start(sessionState)
 
     // Clean up after we exit
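
Reviewer note: the patch splits the old all-in-one resolveWarehousePath into three steps (resolve, qualify, publish), and the SparkSQLCLIDriver hunk above is exactly that sequence. Below is a minimal sketch of how the helpers compose; it is illustrative only, not part of the patch. The object name WarehousePathExample, the freshly constructed conf pair, and the final println are placeholders, and the sketch assumes it compiles inside Spark's own modules, where org.apache.spark.sql.internal.SharedState is visible.

import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf
import org.apache.spark.sql.internal.SharedState

object WarehousePathExample {
  def main(args: Array[String]): Unit = {
    // Stand-ins for whatever SparkConf/Hadoop Configuration pair the caller owns.
    val sparkConf = new SparkConf()
    val hadoopConf = new Configuration()

    // 1) Resolve the winning raw path: spark.sql.warehouse.dir (from the session's
    //    initial options, then the SparkConf) wins over hive.metastore.warehouse.dir,
    //    which wins over the built-in default. Side effect: it removes
    //    hive.metastore.warehouse.dir from the SparkConf; unlike before the patch,
    //    it no longer writes the resolved path back into either conf.
    val warehousePath = SharedState.resolveWarehousePath(sparkConf, hadoopConf)

    // 2) Qualify the path against its FileSystem, e.g. a relative "spark-warehouse"
    //    becomes "file:/<working dir>/spark-warehouse" on a local filesystem.
    val qualified = SharedState.qualifyWarehousePath(hadoopConf, warehousePath)

    // 3) Publish the qualified value under both spark.sql.warehouse.dir (SparkConf)
    //    and hive.metastore.warehouse.dir (Hadoop conf), so every later reader
    //    sees the same application-wide path.
    SharedState.setWarehousePathConf(sparkConf, hadoopConf, qualified)

    println(s"warehouse = ${sparkConf.get("spark.sql.warehouse.dir")}")
  }
}

Keeping step 3 separate is what lets SharedState apply the resolved path to both the application-wide confs on SparkContext and its session-local clones, while SparkSQLCLIDriver applies it to its own pair.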