[SPARK-34346][CORE][SQL] io.file.buffer.size set by spark.buffer.size will override by loading hive-site.xml accidentally may cause perf regression
### What changes were proposed in this pull request? In many real-world cases, when interacting with hive catalog through Spark SQL, users may just share the `hive-site.xml` for their hive jobs and make a copy to `SPARK_HOME`/conf w/o modification. In Spark, when we generate Hadoop configurations, we will use `spark.buffer.size(65536)` to reset `io.file.buffer.size(4096)`. But when we load the hive-site.xml, we may ignore this behavior and reset `io.file.buffer.size` again according to `hive-site.xml`. 1. The configuration priority for setting Hadoop and Hive config here is not right, while literally, the order should be `spark > spark.hive > spark.hadoop > hive > hadoop` 2. This breaks `spark.buffer.size` config's behavior for tuning the IO performance w/ HDFS if there is an existing `io.file.buffer.size` in hive-site.xml ### Why are the changes needed? bugfix for configuration behavior and fix performance regression by that behavior change ### Does this PR introduce _any_ user-facing change? this PR restores a silent user-facing change ### How was this patch tested? new tests Closes #31460 from yaooqinn/SPARK-34346. Authored-by: Kent Yao <yao@apache.org> Signed-off-by: HyukjinKwon <gurwls223@apache.org>
This commit is contained in:
parent
fbe726f5b1
commit
961c85166a
|
@ -392,7 +392,7 @@ private[spark] class SparkHadoopUtil extends Logging {
|
|||
|
||||
}
|
||||
|
||||
private[spark] object SparkHadoopUtil {
|
||||
private[spark] object SparkHadoopUtil extends Logging {
|
||||
|
||||
private lazy val instance = new SparkHadoopUtil
|
||||
|
||||
|
@ -450,6 +450,7 @@ private[spark] object SparkHadoopUtil {
|
|||
hadoopConf.set("fs.s3a.session.token", sessionToken)
|
||||
}
|
||||
}
|
||||
loadHiveConfFile(conf, hadoopConf)
|
||||
appendSparkHadoopConfigs(conf, hadoopConf)
|
||||
appendSparkHiveConfigs(conf, hadoopConf)
|
||||
val bufferSize = conf.get(BUFFER_SIZE).toString
|
||||
|
@ -457,6 +458,14 @@ private[spark] object SparkHadoopUtil {
|
|||
}
|
||||
}
|
||||
|
||||
private def loadHiveConfFile(conf: SparkConf, hadoopConf: Configuration): Unit = {
|
||||
val configFile = Utils.getContextOrSparkClassLoader.getResource("hive-site.xml")
|
||||
if (configFile != null) {
|
||||
logInfo(s"Loading hive config file: $configFile")
|
||||
hadoopConf.addResource(configFile)
|
||||
}
|
||||
}
|
||||
|
||||
private def appendSparkHadoopConfigs(conf: SparkConf, hadoopConf: Configuration): Unit = {
|
||||
// Copy any "spark.hadoop.foo=bar" spark properties into conf as "foo=bar"
|
||||
for ((key, value) <- conf.getAll if key.startsWith("spark.hadoop.")) {
|
||||
|
|
24
core/src/test/resources/core-site.xml
Normal file
24
core/src/test/resources/core-site.xml
Normal file
|
@ -0,0 +1,24 @@
|
|||
<!--
|
||||
~ Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
~ contributor license agreements. See the NOTICE file distributed with
|
||||
~ this work for additional information regarding copyright ownership.
|
||||
~ The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
~ (the "License"); you may not use this file except in compliance with
|
||||
~ the License. You may obtain a copy of the License at
|
||||
~
|
||||
~ http://www.apache.org/licenses/LICENSE-2.0
|
||||
~
|
||||
~ Unless required by applicable law or agreed to in writing, software
|
||||
~ distributed under the License is distributed on an "AS IS" BASIS,
|
||||
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
~ See the License for the specific language governing permissions and
|
||||
~ limitations under the License.
|
||||
-->
|
||||
|
||||
<configuration>
|
||||
<property>
|
||||
<name>hadoop.tmp.dir</name>
|
||||
<value>/tmp/hive_zero</value>
|
||||
<description>default is /tmp/hadoop-${user.name} and will be overridden</description>
|
||||
</property>
|
||||
</configuration>
|
34
core/src/test/resources/hive-site.xml
Normal file
34
core/src/test/resources/hive-site.xml
Normal file
|
@ -0,0 +1,34 @@
|
|||
<!--
|
||||
~ Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
~ contributor license agreements. See the NOTICE file distributed with
|
||||
~ this work for additional information regarding copyright ownership.
|
||||
~ The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
~ (the "License"); you may not use this file except in compliance with
|
||||
~ the License. You may obtain a copy of the License at
|
||||
~
|
||||
~ http://www.apache.org/licenses/LICENSE-2.0
|
||||
~
|
||||
~ Unless required by applicable law or agreed to in writing, software
|
||||
~ distributed under the License is distributed on an "AS IS" BASIS,
|
||||
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
~ See the License for the specific language governing permissions and
|
||||
~ limitations under the License.
|
||||
-->
|
||||
|
||||
<configuration>
|
||||
<property>
|
||||
<name>hive.in.test</name>
|
||||
<value>true</value>
|
||||
<description>Internal marker for test.</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>hadoop.tmp.dir</name>
|
||||
<value>/tmp/hive_one</value>
|
||||
<description>default is /tmp/hadoop-${user.name} and will be overridden</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>io.file.buffer.size</name>
|
||||
<value>201811</value>
|
||||
</property>
|
||||
</configuration>
|
|
@ -46,7 +46,6 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorMetricsUp
|
|||
import org.apache.spark.shuffle.FetchFailedException
|
||||
import org.apache.spark.util.{ThreadUtils, Utils}
|
||||
|
||||
|
||||
class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventually {
|
||||
|
||||
test("Only one SparkContext may be active at a time") {
|
||||
|
@ -1151,6 +1150,39 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
|
|||
assert(sc.listJars().exists(_.contains("org.apache.hive_hive-storage-api-2.7.0.jar")))
|
||||
assert(sc.listJars().exists(_.contains("commons-lang_commons-lang-2.6.jar")))
|
||||
}
|
||||
|
||||
test("SPARK-34346: hadoop configuration priority for spark/hive/hadoop configs") {
|
||||
val testKey = "hadoop.tmp.dir"
|
||||
val bufferKey = "io.file.buffer.size"
|
||||
val hadoopConf0 = new Configuration()
|
||||
|
||||
val hiveConfFile = Utils.getContextOrSparkClassLoader.getResource("hive-site.xml")
|
||||
assert(hiveConfFile != null)
|
||||
hadoopConf0.addResource(hiveConfFile)
|
||||
assert(hadoopConf0.get(testKey) === "/tmp/hive_one")
|
||||
assert(hadoopConf0.get(bufferKey) === "201811")
|
||||
|
||||
val sparkConf = new SparkConf()
|
||||
.setAppName("test")
|
||||
.setMaster("local")
|
||||
.set(BUFFER_SIZE, 65536)
|
||||
sc = new SparkContext(sparkConf)
|
||||
assert(sc.hadoopConfiguration.get(testKey) === "/tmp/hive_one",
|
||||
"hive configs have higher priority than hadoop ones ")
|
||||
assert(sc.hadoopConfiguration.get(bufferKey).toInt === 65536,
|
||||
"spark configs have higher priority than hive ones")
|
||||
|
||||
resetSparkContext()
|
||||
|
||||
sparkConf
|
||||
.set("spark.hadoop.hadoop.tmp.dir", "/tmp/hive_two")
|
||||
.set(s"spark.hadoop.$bufferKey", "20181117")
|
||||
sc = new SparkContext(sparkConf)
|
||||
assert(sc.hadoopConfiguration.get(testKey) === "/tmp/hive_two",
|
||||
"spark.hadoop configs have higher priority than hive/hadoop ones")
|
||||
assert(sc.hadoopConfiguration.get(bufferKey).toInt === 65536,
|
||||
"spark configs have higher priority than spark.hadoop configs")
|
||||
}
|
||||
}
|
||||
|
||||
object SparkContextSuite {
|
||||
|
|
|
@ -22,7 +22,6 @@ import java.util.UUID
|
|||
import java.util.concurrent.ConcurrentHashMap
|
||||
import javax.annotation.concurrent.GuardedBy
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.reflect.ClassTag
|
||||
import scala.util.control.NonFatal
|
||||
|
||||
|
@ -56,7 +55,7 @@ private[sql] class SharedState(
|
|||
private[sql] val (conf, hadoopConf) = {
|
||||
// Load hive-site.xml into hadoopConf and determine the warehouse path which will be set into
|
||||
// both spark conf and hadoop conf avoiding be affected by any SparkSession level options
|
||||
val initialConfigsWithoutWarehouse = SharedState.loadHiveConfFile(
|
||||
val initialConfigsWithoutWarehouse = SharedState.resolveWarehousePath(
|
||||
sparkContext.conf, sparkContext.hadoopConfiguration, initialConfigs)
|
||||
|
||||
val confClone = sparkContext.conf.clone()
|
||||
|
@ -220,31 +219,27 @@ object SharedState extends Logging {
|
|||
}
|
||||
|
||||
/**
|
||||
* Load hive-site.xml into hadoopConf and determine the warehouse path we want to use, based on
|
||||
* the config from both hive and Spark SQL. Finally set the warehouse config value to sparkConf.
|
||||
* Determine the warehouse path using the key `spark.sql.warehouse.dir` in the [[SparkConf]]
|
||||
* or the initial options from the very first created SparkSession instance, and
|
||||
* `hive.metastore.warehouse.dir` in hadoop [[Configuration]].
|
||||
* The priority order is:
|
||||
* s.s.w.d in initialConfigs
|
||||
* > s.s.w.d in spark conf (user specified)
|
||||
* > h.m.w.d in hadoop conf (user specified)
|
||||
* > s.s.w.d in spark conf (default)
|
||||
*
|
||||
* After resolved, the final value will be application wide reachable in the sparkConf and
|
||||
* hadoopConf from [[SparkContext]].
|
||||
*
|
||||
* @return a map contain the rest of initial options with the warehouses keys cleared
|
||||
*/
|
||||
def loadHiveConfFile(
|
||||
def resolveWarehousePath(
|
||||
sparkConf: SparkConf,
|
||||
hadoopConf: Configuration,
|
||||
initialConfigs: scala.collection.Map[String, String] = Map.empty)
|
||||
: scala.collection.Map[String, String] = {
|
||||
|
||||
def containsInSparkConf(key: String): Boolean = {
|
||||
sparkConf.contains(key) || sparkConf.contains("spark.hadoop." + key) ||
|
||||
(key.startsWith("hive") && sparkConf.contains("spark." + key))
|
||||
}
|
||||
|
||||
val hiveWarehouseKey = "hive.metastore.warehouse.dir"
|
||||
val configFile = Utils.getContextOrSparkClassLoader.getResourceAsStream("hive-site.xml")
|
||||
if (configFile != null) {
|
||||
logInfo(s"loading hive config file: $configFile")
|
||||
val hadoopConfTemp = new Configuration()
|
||||
hadoopConfTemp.clear()
|
||||
hadoopConfTemp.addResource(configFile)
|
||||
for (entry <- hadoopConfTemp.asScala if !containsInSparkConf(entry.getKey)) {
|
||||
hadoopConf.set(entry.getKey, entry.getValue)
|
||||
}
|
||||
}
|
||||
val sparkWarehouseOption =
|
||||
initialConfigs.get(WAREHOUSE_PATH.key).orElse(sparkConf.getOption(WAREHOUSE_PATH.key))
|
||||
if (initialConfigs.contains(hiveWarehouseKey)) {
|
||||
|
|
|
@ -52,15 +52,4 @@ class SharedStateSuite extends SharedSparkSession {
|
|||
assert(conf.isInstanceOf[Configuration])
|
||||
assert(conf.asInstanceOf[Configuration].get("fs.defaultFS") == "file:///")
|
||||
}
|
||||
|
||||
test("SPARK-33740: hadoop configs in hive-site.xml can overrides pre-existing hadoop ones") {
|
||||
val conf = new SparkConf()
|
||||
val hadoopConf = new Configuration()
|
||||
SharedState.loadHiveConfFile(conf, hadoopConf, Map.empty)
|
||||
assert(hadoopConf.get("hadoop.tmp.dir") === "/tmp/hive_one")
|
||||
hadoopConf.clear()
|
||||
SharedState.loadHiveConfFile(
|
||||
conf.set("spark.hadoop.hadoop.tmp.dir", "noop"), hadoopConf, Map.empty)
|
||||
assert(hadoopConf.get("hadoop.tmp.dir") === null)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -133,7 +133,7 @@ private[hive] object SparkSQLCLIDriver extends Logging {
|
|||
UserGroupInformation.getCurrentUser.addCredentials(credentials)
|
||||
}
|
||||
|
||||
SharedState.loadHiveConfFile(sparkConf, conf)
|
||||
SharedState.resolveWarehousePath(sparkConf, conf)
|
||||
SessionState.start(sessionState)
|
||||
|
||||
// Clean up after we exit
|
||||
|
|
Loading…
Reference in a new issue