[SPARK-34558][SQL][FOLLOWUP] Use final Hadoop conf to instantiate FileSystem in SharedState

### What changes were proposed in this pull request?

This is a follow-up of https://github.com/apache/spark/pull/31671

https://github.com/apache/spark/pull/31671 introduced an unexpected behavior change: it uses a different Hadoop conf (`sparkContext.hadoopConfiguration`) to instantiate the `FileSystem` that is used to qualify the warehouse path. Before https://github.com/apache/spark/pull/31671, the Hadoop conf used to instantiate the `FileSystem` was `session.sessionState.newHadoopConf()`.

More specifically, `session.sessionState.newHadoopConf()` has more conf entries:
1. it includes configs from `SharedState.initialConfigs`
2. it includes configs from `sparkContext.conf`

This PR updates `SharedState` to use the final Hadoop conf to instantiate `FileSystem`.
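
For illustration only, here is a minimal sketch (plain Hadoop API, not the exact Spark internals; `finalHadoopConf` and `qualifyWarehouse` are made-up names) of what "using the final Hadoop conf" means: the conf handed to `Path.getFileSystem` decides which settings take effect when the warehouse path is qualified, so it should already contain the entries layered on from `sparkContext.conf` and the session-level initial configs.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Hypothetical helper: layer SparkConf entries and SparkSession-level initial
// configs on top of sparkContext.hadoopConfiguration, roughly what the "final"
// Hadoop conf (a la `session.sessionState.newHadoopConf()`) contains.
def finalHadoopConf(
    base: Configuration,
    sparkConfEntries: Map[String, String],
    initialConfigs: Map[String, String]): Configuration = {
  val merged = new Configuration(base)
  (sparkConfEntries ++ initialConfigs).foreach { case (k, v) =>
    if (v != null) merged.set(k, v)
  }
  merged
}

// The warehouse path is then qualified against the FileSystem created from that
// conf, mirroring the `qualifyWarehousePath` helper in the diff below.
def qualifyWarehouse(warehousePath: String, hadoopConf: Configuration): String = {
  val path = new Path(warehousePath)
  path.getFileSystem(hadoopConf).makeQualified(path).toString
}
```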

### Why are the changes needed?

Fix the unintended behavior change introduced by https://github.com/apache/spark/pull/31671.

### Does this PR introduce _any_ user-facing change?

Yes, the behavior will be the same as it was before https://github.com/apache/spark/pull/31671.

### How was this patch tested?

Manually checked the logs of `FileSystem` and verified the passed-in configs.
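
A hypothetical way to run such a manual check (not part of the patch; `sessionState` is an `@Unstable` API and the config key is only an example):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  // example SparkConf-level Hadoop entry that should now also reach the
  // FileSystem used to qualify the warehouse path
  .config("spark.hadoop.dfs.client.use.datanode.hostname", "true")
  .getOrCreate()

// The session-level Hadoop conf should contain the entry above...
println(spark.sessionState.newHadoopConf().get("dfs.client.use.datanode.hostname"))
// ...and the warehouse path should come back already qualified.
println(spark.conf.get("spark.sql.warehouse.dir"))
```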

Closes #31868 from cloud-fan/followup.

Lead-authored-by: Wenchen Fan <wenchen@databricks.com>
Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Wenchen Fan 2021-03-19 22:02:15 +08:00
parent 58509565f8
commit 4b4f8e2a25
2 changed files with 42 additions and 27 deletions


@@ -53,17 +53,21 @@ private[sql] class SharedState(
SharedState.setFsUrlStreamHandlerFactory(sparkContext.conf, sparkContext.hadoopConfiguration)
private[sql] val (conf, hadoopConf) = {
// Load hive-site.xml into hadoopConf and determine the warehouse path which will be set into
// both spark conf and hadoop conf avoiding be affected by any SparkSession level options
val initialConfigsWithoutWarehouse = SharedState.resolveWarehousePath(
val warehousePath = SharedState.resolveWarehousePath(
sparkContext.conf, sparkContext.hadoopConfiguration, initialConfigs)
val confClone = sparkContext.conf.clone()
val hadoopConfClone = new Configuration(sparkContext.hadoopConfiguration)
// Extract entries from `SparkConf` and put them in the Hadoop conf.
confClone.getAll.foreach { case (k, v) =>
if (v ne null) hadoopConfClone.set(k, v)
}
// If `SparkSession` is instantiated using an existing `SparkContext` instance and no existing
// `SharedState`, all `SparkSession` level configurations have higher priority to generate a
// `SharedState` instance. This will be done only once then shared across `SparkSession`s
initialConfigsWithoutWarehouse.foreach {
initialConfigs.foreach {
// We have resolved the warehouse path and should not set warehouse conf here.
case (k, _) if k == WAREHOUSE_PATH.key || k == SharedState.HIVE_WAREHOUSE_CONF_NAME =>
case (k, v) if SQLConf.staticConfKeys.contains(k) =>
logDebug(s"Applying static initial session options to SparkConf: $k -> $v")
confClone.set(k, v)
@@ -71,6 +75,11 @@
logDebug(s"Applying other initial session options to HadoopConf: $k -> $v")
hadoopConfClone.set(k, v)
}
val qualified = SharedState.qualifyWarehousePath(hadoopConfClone, warehousePath)
// Set warehouse path in the SparkConf and Hadoop conf, so that it's application wide reachable
// from `SparkContext`.
SharedState.setWarehousePathConf(sparkContext.conf, sparkContext.hadoopConfiguration, qualified)
SharedState.setWarehousePathConf(confClone, hadoopConfClone, qualified)
(confClone, hadoopConfClone)
}
@@ -218,6 +227,8 @@ object SharedState extends Logging {
}
}
private val HIVE_WAREHOUSE_CONF_NAME = "hive.metastore.warehouse.dir"
/**
* Determine the warehouse path using the key `spark.sql.warehouse.dir` in the [[SparkConf]]
* or the initial options from the very first created SparkSession instance, and
@@ -228,33 +239,27 @@ object SharedState extends Logging {
* > h.m.w.d in hadoop conf (user specified)
* > s.s.w.d in spark conf (default)
*
* After resolved, the final value will be application wide reachable in the sparkConf and
* hadoopConf from [[SparkContext]].
*
* @return a map contain the rest of initial options with the warehouses keys cleared
* @return the resolved warehouse path.
*/
def resolveWarehousePath(
sparkConf: SparkConf,
hadoopConf: Configuration,
initialConfigs: scala.collection.Map[String, String] = Map.empty)
: scala.collection.Map[String, String] = {
val hiveWarehouseKey = "hive.metastore.warehouse.dir"
initialConfigs: scala.collection.Map[String, String] = Map.empty): String = {
val sparkWarehouseOption =
initialConfigs.get(WAREHOUSE_PATH.key).orElse(sparkConf.getOption(WAREHOUSE_PATH.key))
if (initialConfigs.contains(hiveWarehouseKey)) {
logWarning(s"Not allowing to set $hiveWarehouseKey in SparkSession's options, please use " +
s"${WAREHOUSE_PATH.key} to set statically for cross-session usages")
if (initialConfigs.contains(HIVE_WAREHOUSE_CONF_NAME)) {
logWarning(s"Not allowing to set $HIVE_WAREHOUSE_CONF_NAME in SparkSession's " +
s"options, please use ${WAREHOUSE_PATH.key} to set statically for cross-session usages")
}
// hive.metastore.warehouse.dir only stay in hadoopConf
sparkConf.remove(hiveWarehouseKey)
sparkConf.remove(HIVE_WAREHOUSE_CONF_NAME)
// Set the Hive metastore warehouse path to the one we use
val hiveWarehouseDir = hadoopConf.get(hiveWarehouseKey)
val warehousePath = if (hiveWarehouseDir != null && sparkWarehouseOption.isEmpty) {
val hiveWarehouseDir = hadoopConf.get(HIVE_WAREHOUSE_CONF_NAME)
if (hiveWarehouseDir != null && sparkWarehouseOption.isEmpty) {
// If hive.metastore.warehouse.dir is set and spark.sql.warehouse.dir is not set,
// we will respect the value of hive.metastore.warehouse.dir.
logInfo(s"${WAREHOUSE_PATH.key} is not set, but $hiveWarehouseKey is set. Setting" +
s" ${WAREHOUSE_PATH.key} to the value of $hiveWarehouseKey.")
logInfo(s"${WAREHOUSE_PATH.key} is not set, but $HIVE_WAREHOUSE_CONF_NAME is set. " +
s"Setting ${WAREHOUSE_PATH.key} to the value of $HIVE_WAREHOUSE_CONF_NAME.")
hiveWarehouseDir
} else {
// If spark.sql.warehouse.dir is set, we will override hive.metastore.warehouse.dir using
@@ -262,16 +267,24 @@ object SharedState extends Logging {
// When neither spark.sql.warehouse.dir nor hive.metastore.warehouse.dir is set
// we will set hive.metastore.warehouse.dir to the default value of spark.sql.warehouse.dir.
val sparkWarehouseDir = sparkWarehouseOption.getOrElse(WAREHOUSE_PATH.defaultValueString)
logInfo(s"Setting $hiveWarehouseKey ('$hiveWarehouseDir') to the value of " +
logInfo(s"Setting $HIVE_WAREHOUSE_CONF_NAME ('$hiveWarehouseDir') to the value of " +
s"${WAREHOUSE_PATH.key}.")
sparkWarehouseDir
}
}
def qualifyWarehousePath(hadoopConf: Configuration, warehousePath: String): String = {
val tempPath = new Path(warehousePath)
val qualifiedWarehousePath = tempPath.getFileSystem(hadoopConf).makeQualified(tempPath).toString
sparkConf.set(WAREHOUSE_PATH.key, qualifiedWarehousePath)
hadoopConf.set(hiveWarehouseKey, qualifiedWarehousePath)
logInfo(s"Warehouse path is '$qualifiedWarehousePath'.")
initialConfigs -- Seq(WAREHOUSE_PATH.key, hiveWarehouseKey)
val qualified = tempPath.getFileSystem(hadoopConf).makeQualified(tempPath).toString
logInfo(s"Warehouse path is '$qualified'.")
qualified
}
def setWarehousePathConf(
sparkConf: SparkConf,
hadoopConf: Configuration,
warehousePath: String): Unit = {
sparkConf.set(WAREHOUSE_PATH.key, warehousePath)
hadoopConf.set(HIVE_WAREHOUSE_CONF_NAME, warehousePath)
}
}
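
Read together, the hunks above split the old logic into three helpers. Below is a sketch of the intended call order; the wrapper name `installWarehousePath` is made up, and such code would have to live inside Spark's `org.apache.spark.sql` internals since these helpers are not public API. The `SparkSQLCLIDriver` change in the next file does essentially the same thing.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf
import org.apache.spark.sql.internal.SharedState

// Hypothetical wrapper showing how the three helpers are meant to compose.
def installWarehousePath(
    sparkConf: SparkConf,
    hadoopConf: Configuration,
    initialConfigs: Map[String, String] = Map.empty): String = {
  // 1. Resolve the raw warehouse path from the various config sources.
  val warehousePath = SharedState.resolveWarehousePath(sparkConf, hadoopConf, initialConfigs)
  // 2. Qualify it against a FileSystem built from the final Hadoop conf.
  val qualified = SharedState.qualifyWarehousePath(hadoopConf, warehousePath)
  // 3. Publish the qualified path to both confs so it is reachable application-wide.
  SharedState.setWarehousePathConf(sparkConf, hadoopConf, qualified)
  qualified
}
```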


@@ -133,7 +133,9 @@ private[hive] object SparkSQLCLIDriver extends Logging {
UserGroupInformation.getCurrentUser.addCredentials(credentials)
}
SharedState.resolveWarehousePath(sparkConf, conf)
val warehousePath = SharedState.resolveWarehousePath(sparkConf, conf)
val qualified = SharedState.qualifyWarehousePath(conf, warehousePath)
SharedState.setWarehousePathConf(sparkConf, conf, qualified)
SessionState.start(sessionState)
// Clean up after we exit