[SPARK-23630][YARN] Allow user's hadoop conf customizations to take effect.

This change restores functionality that was inadvertently removed as part
of the fix for SPARK-22372.

Also modified an existing unit test to make sure the feature works as intended.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #20776 from vanzin/SPARK-23630.
Marcelo Vanzin 2018-03-09 10:36:38 -08:00
parent d90e77bd0e
commit 2c3673680e
3 changed files with 44 additions and 15 deletions

core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala

@@ -111,7 +111,9 @@ class SparkHadoopUtil extends Logging {
    * subsystems.
    */
   def newConfiguration(conf: SparkConf): Configuration = {
-    SparkHadoopUtil.newConfiguration(conf)
+    val hadoopConf = SparkHadoopUtil.newConfiguration(conf)
+    hadoopConf.addResource(SparkHadoopUtil.SPARK_HADOOP_CONF_FILE)
+    hadoopConf
   }
 
   /**
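
For context on why this one-line overlay works: Hadoop's `Configuration` applies resources in insertion order, and values from a later resource override those from earlier ones, so adding the gateway file after the base resources lets the user's customizations win. A minimal, self-contained sketch of that behavior (the key name and values here are illustrative, not Spark's):

```scala
import java.io.ByteArrayInputStream

import org.apache.hadoop.conf.Configuration

// Two tiny config documents; the one added last overrides the first.
val clusterXml =
  """<configuration>
    |  <property><name>demo.key</name><value>cluster</value></property>
    |</configuration>""".stripMargin
val gatewayXml =
  """<configuration>
    |  <property><name>demo.key</name><value>gateway</value></property>
    |</configuration>""".stripMargin

val conf = new Configuration(false)
conf.addResource(new ByteArrayInputStream(clusterXml.getBytes("UTF-8")))
conf.addResource(new ByteArrayInputStream(gatewayXml.getBytes("UTF-8")))
assert(conf.get("demo.key") == "gateway")  // the later resource wins
```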
@@ -435,6 +437,13 @@ object SparkHadoopUtil {
    */
   private[spark] val UPDATE_INPUT_METRICS_INTERVAL_RECORDS = 1000
 
+  /**
+   * Name of the file containing the gateway's Hadoop configuration, to be overlayed on top of the
+   * cluster's Hadoop config. It is up to the Spark code launching the application to create
+   * this file if it's desired. If the file doesn't exist, it will just be ignored.
+   */
+  private[spark] val SPARK_HADOOP_CONF_FILE = "__spark_hadoop_conf__.xml"
+
   def get: SparkHadoopUtil = instance
 
   /**
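
The constant is referenced by name from `newConfiguration` above, so the overlay is resolved from the classpath when the configuration is loaded; as the new doc comment notes, a missing file is simply ignored. A small sketch of that behavior, assuming default `Configuration` settings:

```scala
import org.apache.hadoop.conf.Configuration

val conf = new Configuration(false)
// Adding a named resource that is not on the classpath is harmless:
// by default, Configuration quietly skips names it cannot resolve.
conf.addResource("__spark_hadoop_conf__.xml")
assert(conf.get("spark.test.key") == null)
```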

resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala

@@ -696,7 +696,13 @@ private[spark] class Client(
       }
     }
 
-    Seq("HADOOP_CONF_DIR", "YARN_CONF_DIR").foreach { envKey =>
+    // SPARK-23630: during testing, Spark scripts filter out hadoop conf dirs so that user's
+    // environments do not interfere with tests. This allows a special env variable during
+    // tests so that custom conf dirs can be used by unit tests.
+    val confDirs = Seq("HADOOP_CONF_DIR", "YARN_CONF_DIR") ++
+      (if (Utils.isTesting) Seq("SPARK_TEST_HADOOP_CONF_DIR") else Nil)
+
+    confDirs.foreach { envKey =>
       sys.env.get(envKey).foreach { path =>
         val dir = new File(path)
         if (dir.isDirectory()) {
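
For illustration, the loop above (truncated here) resolves each candidate env var to a directory and picks up the config files it contains for inclusion in the conf archive. A simplified sketch of the same lookup pattern; `isTesting` below is only a stand-in for Spark's `Utils.isTesting`, which is assumed to key off Spark's testing flags:

```scala
import java.io.File

// Stand-in for Utils.isTesting (assumption: it checks Spark's testing
// flags in system properties and the environment).
def isTesting: Boolean =
  sys.props.contains("spark.testing") || sys.env.contains("SPARK_TESTING")

val confDirs = Seq("HADOOP_CONF_DIR", "YARN_CONF_DIR") ++
  (if (isTesting) Seq("SPARK_TEST_HADOOP_CONF_DIR") else Nil)

// Resolve each env var to a directory and collect its regular files.
val confFiles: Seq[File] = confDirs.flatMap { envKey =>
  sys.env.get(envKey).toSeq.flatMap { path =>
    val dir = new File(path)
    if (dir.isDirectory()) dir.listFiles().filter(_.isFile).toSeq else Nil
  }
}
```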
@@ -753,7 +759,7 @@ private[spark] class Client(
       // Save the YARN configuration into a separate file that will be overlayed on top of the
       // cluster's Hadoop conf.
-      confStream.putNextEntry(new ZipEntry(SPARK_HADOOP_CONF_FILE))
+      confStream.putNextEntry(new ZipEntry(SparkHadoopUtil.SPARK_HADOOP_CONF_FILE))
       hadoopConf.writeXml(confStream)
       confStream.closeEntry()
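
The `putNextEntry`/`writeXml`/`closeEntry` sequence above serializes the effective Hadoop configuration as a single XML entry in the conf archive. A minimal standalone sketch of the same sequence (file and key names are illustrative):

```scala
import java.io.FileOutputStream
import java.util.zip.{ZipEntry, ZipOutputStream}

import org.apache.hadoop.conf.Configuration

val hadoopConf = new Configuration(false)
hadoopConf.set("demo.key", "demo-value")

val confStream = new ZipOutputStream(new FileOutputStream("conf-archive.zip"))
try {
  confStream.putNextEntry(new ZipEntry("__spark_hadoop_conf__.xml"))
  hadoopConf.writeXml(confStream)  // serializes all set properties as XML
  confStream.closeEntry()
} finally {
  confStream.close()
}
```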
@@ -1220,10 +1226,6 @@ private object Client extends Logging {
   // Name of the file in the conf archive containing Spark configuration.
   val SPARK_CONF_FILE = "__spark_conf__.properties"
 
-  // Name of the file containing the gateway's Hadoop configuration, to be overlayed on top of the
-  // cluster's Hadoop config.
-  val SPARK_HADOOP_CONF_FILE = "__spark_hadoop_conf__.xml"
-
   // Subdirectory where the user's python files (not archives) will be placed.
   val LOCALIZED_PYTHON_DIR = "__pyfiles__"

resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala

@@ -114,12 +114,25 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
     ))
   }
 
-  test("yarn-cluster should respect conf overrides in SparkHadoopUtil (SPARK-16414)") {
+  test("yarn-cluster should respect conf overrides in SparkHadoopUtil (SPARK-16414, SPARK-23630)") {
+    // Create a custom hadoop config file, to make sure its contents are propagated to the driver.
+    val customConf = Utils.createTempDir()
+    val coreSite = """<?xml version="1.0" encoding="UTF-8"?>
+      |<configuration>
+      |  <property>
+      |    <name>spark.test.key</name>
+      |    <value>testvalue</value>
+      |  </property>
+      |</configuration>
+      |""".stripMargin
+    Files.write(coreSite, new File(customConf, "core-site.xml"), StandardCharsets.UTF_8)
+
     val result = File.createTempFile("result", null, tempDir)
     val finalState = runSpark(false,
       mainClassName(YarnClusterDriverUseSparkHadoopUtilConf.getClass),
-      appArgs = Seq("key=value", result.getAbsolutePath()),
-      extraConf = Map("spark.hadoop.key" -> "value"))
+      appArgs = Seq("key=value", "spark.test.key=testvalue", result.getAbsolutePath()),
+      extraConf = Map("spark.hadoop.key" -> "value"),
+      extraEnv = Map("SPARK_TEST_HADOOP_CONF_DIR" -> customConf.getAbsolutePath()))
     checkResult(finalState, result)
   }
 
@@ -319,13 +332,13 @@ private object YarnClusterDriverWithFailure extends Logging with Matchers {
 private object YarnClusterDriverUseSparkHadoopUtilConf extends Logging with Matchers {
   def main(args: Array[String]): Unit = {
-    if (args.length != 2) {
+    if (args.length < 2) {
       // scalastyle:off println
       System.err.println(
         s"""
         |Invalid command line: ${args.mkString(" ")}
         |
-        |Usage: YarnClusterDriverUseSparkHadoopUtilConf [hadoopConfKey=value] [result file]
+        |Usage: YarnClusterDriverUseSparkHadoopUtilConf [hadoopConfKey=value]+ [result file]
         """.stripMargin)
       // scalastyle:on println
       System.exit(1)
@@ -335,11 +348,16 @@ private object YarnClusterDriverUseSparkHadoopUtilConf extends Logging with Matchers {
       .set("spark.extraListeners", classOf[SaveExecutorInfo].getName)
       .setAppName("yarn test using SparkHadoopUtil's conf"))
 
-    val kv = args(0).split("=")
-    val status = new File(args(1))
+    val kvs = args.take(args.length - 1).map { kv =>
+      val parsed = kv.split("=")
+      (parsed(0), parsed(1))
+    }
+    val status = new File(args.last)
     var result = "failure"
     try {
-      SparkHadoopUtil.get.conf.get(kv(0)) should be (kv(1))
+      kvs.foreach { case (k, v) =>
+        SparkHadoopUtil.get.conf.get(k) should be (v)
+      }
       result = "success"
     } finally {
       Files.write(result, status, StandardCharsets.UTF_8)