diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 6f799a542b..d872c3b5a9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -392,7 +392,7 @@ private[spark] class SparkHadoopUtil extends Logging {
 
 }
 
-private[spark] object SparkHadoopUtil {
+private[spark] object SparkHadoopUtil extends Logging {
 
   private lazy val instance = new SparkHadoopUtil
 
@@ -450,6 +450,7 @@ private[spark] object SparkHadoopUtil {
           hadoopConf.set("fs.s3a.session.token", sessionToken)
         }
       }
+      loadHiveConfFile(conf, hadoopConf)
       appendSparkHadoopConfigs(conf, hadoopConf)
       appendSparkHiveConfigs(conf, hadoopConf)
       val bufferSize = conf.get(BUFFER_SIZE).toString
@@ -457,6 +458,14 @@ private[spark] object SparkHadoopUtil {
     }
   }
 
+  private def loadHiveConfFile(conf: SparkConf, hadoopConf: Configuration): Unit = {
+    val configFile = Utils.getContextOrSparkClassLoader.getResource("hive-site.xml")
+    if (configFile != null) {
+      logInfo(s"Loading hive config file: $configFile")
+      hadoopConf.addResource(configFile)
+    }
+  }
+
   private def appendSparkHadoopConfigs(conf: SparkConf, hadoopConf: Configuration): Unit = {
     // Copy any "spark.hadoop.foo=bar" spark properties into conf as "foo=bar"
     for ((key, value) <- conf.getAll if key.startsWith("spark.hadoop.")) {
diff --git a/core/src/test/resources/core-site.xml b/core/src/test/resources/core-site.xml
new file mode 100644
index 0000000000..84eddf8050
--- /dev/null
+++ b/core/src/test/resources/core-site.xml
@@ -0,0 +1,24 @@
+<?xml version="1.0"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<configuration>
+  <property>
+    <name>hadoop.tmp.dir</name>
+    <value>/tmp/hive_zero</value>
+    <description>default is /tmp/hadoop-${user.name} and will be overridden</description>
+  </property>
+</configuration>
diff --git a/core/src/test/resources/hive-site.xml b/core/src/test/resources/hive-site.xml
new file mode 100644
index 0000000000..d7117c3f20
--- /dev/null
+++ b/core/src/test/resources/hive-site.xml
@@ -0,0 +1,34 @@
+<?xml version="1.0"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<configuration>
+  <property>
+    <name>hive.in.test</name>
+    <value>true</value>
+    <description>Internal marker for test.</description>
+  </property>
+  <property>
+    <name>hadoop.tmp.dir</name>
+    <value>/tmp/hive_one</value>
+    <description>default is /tmp/hadoop-${user.name} and will be overridden</description>
+  </property>
+  <property>
+    <name>io.file.buffer.size</name>
+    <value>201811</value>
+  </property>
+</configuration>
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index be51001660..4319761cdd 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -46,7 +46,6 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorMetricsUp
 import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.util.{ThreadUtils, Utils}
 
-
 class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventually {
 
   test("Only one SparkContext may be active at a time") {
@@ -1151,6 +1150,39 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
     assert(sc.listJars().exists(_.contains("org.apache.hive_hive-storage-api-2.7.0.jar")))
     assert(sc.listJars().exists(_.contains("commons-lang_commons-lang-2.6.jar")))
   }
+
+  test("SPARK-34346: hadoop configuration priority for spark/hive/hadoop configs") {
+    val testKey = "hadoop.tmp.dir"
+    val bufferKey = "io.file.buffer.size"
+    val hadoopConf0 = new Configuration()
+
+    val hiveConfFile = Utils.getContextOrSparkClassLoader.getResource("hive-site.xml")
+    assert(hiveConfFile != null)
+    hadoopConf0.addResource(hiveConfFile)
+    assert(hadoopConf0.get(testKey) === "/tmp/hive_one")
+    assert(hadoopConf0.get(bufferKey) === "201811")
+
+    val sparkConf = new SparkConf()
+      .setAppName("test")
+      .setMaster("local")
+      .set(BUFFER_SIZE, 65536)
+    sc = new SparkContext(sparkConf)
+    assert(sc.hadoopConfiguration.get(testKey) === "/tmp/hive_one",
+      "hive configs have higher priority than hadoop ones")
+    assert(sc.hadoopConfiguration.get(bufferKey).toInt === 65536,
+      "spark configs have higher priority than hive ones")
+
+    resetSparkContext()
+
+    sparkConf
+      .set("spark.hadoop.hadoop.tmp.dir", "/tmp/hive_two")
+      .set(s"spark.hadoop.$bufferKey", "20181117")
+    sc = new SparkContext(sparkConf)
+    assert(sc.hadoopConfiguration.get(testKey) === "/tmp/hive_two",
+      "spark.hadoop configs have higher priority than hive/hadoop ones")
+    assert(sc.hadoopConfiguration.get(bufferKey).toInt === 65536,
+      "spark configs have higher priority than spark.hadoop configs")
+  }
 }
 
 object SparkContextSuite {
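The SPARK-34346 test above pins down the layering order used when SparkHadoopUtil builds the Hadoop `Configuration` for a `SparkContext`: `hive-site.xml` is loaded first, `spark.hadoop.*` keys are applied next, and dedicated Spark options such as `spark.buffer.size` are applied last, so later layers win. The sketch below restates that ordering outside Spark's internals; it is an illustration only, and the `ConfigLayeringSketch`/`newHadoopConf` names are invented for this example rather than taken from the patch.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf

// Minimal sketch of the config layering exercised by the test above
// (illustrative only, not the actual SparkHadoopUtil code).
object ConfigLayeringSketch {
  def newHadoopConf(sparkConf: SparkConf): Configuration = {
    val hadoopConf = new Configuration()

    // Layer 1: hive-site.xml from the classpath, if present.
    val hiveSite = Thread.currentThread().getContextClassLoader.getResource("hive-site.xml")
    if (hiveSite != null) {
      hadoopConf.addResource(hiveSite)
    }

    // Layer 2: "spark.hadoop.foo=bar" entries become "foo=bar" and override layer 1.
    for ((key, value) <- sparkConf.getAll if key.startsWith("spark.hadoop.")) {
      hadoopConf.set(key.stripPrefix("spark.hadoop."), value)
    }

    // Layer 3: dedicated Spark options override everything set so far.
    hadoopConf.set("io.file.buffer.size", sparkConf.get("spark.buffer.size", "65536"))
    hadoopConf
  }

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf(false)
      .set("spark.hadoop.hadoop.tmp.dir", "/tmp/hive_two")
      .set("spark.buffer.size", "65536")
    val hadoopConf = newHadoopConf(sparkConf)
    // With the test's hive-site.xml on the classpath, spark.hadoop.* beats the hive
    // value for hadoop.tmp.dir, and spark.buffer.size beats both for io.file.buffer.size.
    println(hadoopConf.get("hadoop.tmp.dir"))      // /tmp/hive_two
    println(hadoopConf.get("io.file.buffer.size")) // 65536
  }
}
```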
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index cc21def3fb..ac1dd4ecb9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -22,7 +22,6 @@ import java.util.UUID
 import java.util.concurrent.ConcurrentHashMap
 import javax.annotation.concurrent.GuardedBy
 
-import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 import scala.util.control.NonFatal
 
@@ -56,7 +55,7 @@ private[sql] class SharedState(
   private[sql] val (conf, hadoopConf) = {
     // Load hive-site.xml into hadoopConf and determine the warehouse path which will be set into
     // both spark conf and hadoop conf avoiding be affected by any SparkSession level options
-    val initialConfigsWithoutWarehouse = SharedState.loadHiveConfFile(
+    val initialConfigsWithoutWarehouse = SharedState.resolveWarehousePath(
       sparkContext.conf, sparkContext.hadoopConfiguration, initialConfigs)
 
     val confClone = sparkContext.conf.clone()
@@ -220,31 +219,27 @@ object SharedState extends Logging {
   }
 
   /**
-   * Load hive-site.xml into hadoopConf and determine the warehouse path we want to use, based on
-   * the config from both hive and Spark SQL. Finally set the warehouse config value to sparkConf.
+   * Determine the warehouse path using the key `spark.sql.warehouse.dir` in the [[SparkConf]]
+   * or the initial options from the very first created SparkSession instance, and
+   * `hive.metastore.warehouse.dir` in the hadoop [[Configuration]].
+   * The priority order is:
+   *   s.s.w.d in initialConfigs
+   *     > s.s.w.d in spark conf (user specified)
+   *     > h.m.w.d in hadoop conf (user specified)
+   *     > s.s.w.d in spark conf (default)
+   *
+   * Once resolved, the final value is reachable application-wide through the sparkConf and
+   * hadoopConf of the [[SparkContext]].
+   *
+   * @return a map containing the rest of the initial options, with the warehouse keys cleared
    */
-  def loadHiveConfFile(
+  def resolveWarehousePath(
       sparkConf: SparkConf,
       hadoopConf: Configuration,
       initialConfigs: scala.collection.Map[String, String] = Map.empty)
     : scala.collection.Map[String, String] = {
 
-    def containsInSparkConf(key: String): Boolean = {
-      sparkConf.contains(key) || sparkConf.contains("spark.hadoop." + key) ||
-        (key.startsWith("hive") && sparkConf.contains("spark." + key))
-    }
-
     val hiveWarehouseKey = "hive.metastore.warehouse.dir"
-    val configFile = Utils.getContextOrSparkClassLoader.getResourceAsStream("hive-site.xml")
-    if (configFile != null) {
-      logInfo(s"loading hive config file: $configFile")
-      val hadoopConfTemp = new Configuration()
-      hadoopConfTemp.clear()
-      hadoopConfTemp.addResource(configFile)
-      for (entry <- hadoopConfTemp.asScala if !containsInSparkConf(entry.getKey)) {
-        hadoopConf.set(entry.getKey, entry.getValue)
-      }
-    }
     val sparkWarehouseOption =
       initialConfigs.get(WAREHOUSE_PATH.key).orElse(sparkConf.getOption(WAREHOUSE_PATH.key))
     if (initialConfigs.contains(hiveWarehouseKey)) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/SharedStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/SharedStateSuite.scala
index 60a899b89e..81bf153424 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/SharedStateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SharedStateSuite.scala
@@ -52,15 +52,4 @@ class SharedStateSuite extends SharedSparkSession {
     assert(conf.isInstanceOf[Configuration])
     assert(conf.asInstanceOf[Configuration].get("fs.defaultFS") == "file:///")
   }
-
-  test("SPARK-33740: hadoop configs in hive-site.xml can overrides pre-existing hadoop ones") {
-    val conf = new SparkConf()
-    val hadoopConf = new Configuration()
-    SharedState.loadHiveConfFile(conf, hadoopConf, Map.empty)
-    assert(hadoopConf.get("hadoop.tmp.dir") === "/tmp/hive_one")
-    hadoopConf.clear()
-    SharedState.loadHiveConfFile(
-      conf.set("spark.hadoop.hadoop.tmp.dir", "noop"), hadoopConf, Map.empty)
-    assert(hadoopConf.get("hadoop.tmp.dir") === null)
-  }
 }
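The renamed `resolveWarehousePath` no longer loads `hive-site.xml`; it only resolves the warehouse location, and its scaladoc above spells out a priority order that boils down to an `orElse` chain. Below is a hedged, standalone sketch of that chain: the object name, the literal `"spark-warehouse"` fallback, and the simplified signature are stand-ins for `WAREHOUSE_PATH` and its configured default, not the actual `SharedState` implementation.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf

// Illustrative sketch of the warehouse resolution order documented above
// (not Spark's real code; WAREHOUSE_PATH and its default are simplified).
object WarehouseResolutionSketch {
  val warehouseKey = "spark.sql.warehouse.dir"
  val hiveWarehouseKey = "hive.metastore.warehouse.dir"

  def resolve(
      initialConfigs: Map[String, String],
      sparkConf: SparkConf,
      hadoopConf: Configuration): String = {
    initialConfigs.get(warehouseKey)                    // 1. first SparkSession's options
      .orElse(sparkConf.getOption(warehouseKey))        // 2. user-specified spark conf
      .orElse(Option(hadoopConf.get(hiveWarehouseKey))) // 3. user-specified hadoop/hive conf
      .getOrElse("spark-warehouse")                     // 4. spark conf default
  }

  def main(args: Array[String]): Unit = {
    val hadoopConf = new Configuration(false)
    hadoopConf.set(hiveWarehouseKey, "/tmp/hive/warehouse")
    val sparkConf = new SparkConf(false).set(warehouseKey, "/tmp/spark/warehouse")
    // A user-specified spark.sql.warehouse.dir wins over hive.metastore.warehouse.dir.
    println(resolve(Map.empty, sparkConf, hadoopConf))  // /tmp/spark/warehouse
  }
}
```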
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
index 633d7fb9aa..53f085c33d 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
@@ -133,7 +133,7 @@ private[hive] object SparkSQLCLIDriver extends Logging {
       UserGroupInformation.getCurrentUser.addCredentials(credentials)
     }
 
-    SharedState.loadHiveConfFile(sparkConf, conf)
+    SharedState.resolveWarehousePath(sparkConf, conf)
     SessionState.start(sessionState)
 
     // Clean up after we exit