[SPARK-23635][YARN] AM env variable should not overwrite same name env variable set through spark.executorEnv.

## What changes were proposed in this pull request?

In the current Spark on YARN code, the AM always copies its own environment variables to the executors and overwrites any values already set there, so we cannot give executors different values for the same variable.

To reproduce the issue, start spark-shell like this:

```
./bin/spark-shell --master yarn-client --conf spark.executorEnv.SPARK_ABC=executor_val --conf spark.yarn.appMasterEnv.SPARK_ABC=am_val
```

Then check the executor env variables with:

```
sc.parallelize(1 to 1).flatMap { i => sys.env.toSeq }.collect.foreach(println)
```

This always prints `am_val` instead of `executor_val`, so the AM should not overwrite env variables that were explicitly set for executors.
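The root cause is the merge order in `ExecutorRunnable` (see the diff below): the old code applied `spark.executorEnv.*` first and then copied the AM's own `SPARK_*` system env on top, so the AM value always won; the fix reverses that order. A minimal sketch of the two orderings, using hypothetical stand-in maps rather than the actual code:

```scala
import scala.collection.mutable.HashMap

// Hypothetical stand-ins for the two env sources, for illustration only.
val amEnv = Map("SPARK_ABC" -> "am_val")             // AM's SPARK_* system env
val executorEnv = Map("SPARK_ABC" -> "executor_val") // spark.executorEnv.*

// Old order: executor env applied first, AM env copied last -> AM wins.
val oldEnv = new HashMap[String, String]()
executorEnv.foreach { case (k, v) => oldEnv(k) = v }
amEnv.foreach { case (k, v) => oldEnv(k) = v }
assert(oldEnv("SPARK_ABC") == "am_val")

// New order: AM env copied first, executor env applied last -> executor wins.
val newEnv = new HashMap[String, String]()
amEnv.foreach { case (k, v) => newEnv(k) = v }
executorEnv.foreach { case (k, v) => newEnv(k) = v }
assert(newEnv("SPARK_ABC") == "executor_val")
```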

## How was this patch tested?

Added a unit test, and verified manually on a local cluster.

Author: jerryshao <sshao@hortonworks.com>

Closes #20799 from jerryshao/SPARK-23635.
jerryshao committed on 2018-03-16 16:22:03 +08:00
parent ca83526de5
commit c952000487
2 changed files with 50 additions and 8 deletions

`ExecutorRunnable.scala`

```diff
@@ -220,12 +220,6 @@ private[yarn] class ExecutorRunnable(
     val env = new HashMap[String, String]()
     Client.populateClasspath(null, conf, sparkConf, env, sparkConf.get(EXECUTOR_CLASS_PATH))
 
-    sparkConf.getExecutorEnv.foreach { case (key, value) =>
-      // This assumes each executor environment variable set here is a path
-      // This is kept for backward compatibility and consistency with hadoop
-      YarnSparkHadoopUtil.addPathToEnvironment(env, key, value)
-    }
-
     // lookup appropriate http scheme for container log urls
     val yarnHttpPolicy = conf.get(
       YarnConfiguration.YARN_HTTP_POLICY_KEY,
@@ -233,6 +227,20 @@ private[yarn] class ExecutorRunnable(
     )
     val httpScheme = if (yarnHttpPolicy == "HTTPS_ONLY") "https://" else "http://"
 
+    System.getenv().asScala.filterKeys(_.startsWith("SPARK"))
+      .foreach { case (k, v) => env(k) = v }
+
+    sparkConf.getExecutorEnv.foreach { case (key, value) =>
+      if (key == Environment.CLASSPATH.name()) {
+        // If the key of env variable is CLASSPATH, we assume it is a path and append it.
+        // This is kept for backward compatibility and consistency with hadoop
+        YarnSparkHadoopUtil.addPathToEnvironment(env, key, value)
+      } else {
+        // For other env variables, simply overwrite the value.
+        env(key) = value
+      }
+    }
+
     // Add log urls
     container.foreach { c =>
       sys.env.get("SPARK_USER").foreach { user =>
@@ -245,8 +253,6 @@ private[yarn] class ExecutorRunnable(
       }
     }
 
-    System.getenv().asScala.filterKeys(_.startsWith("SPARK"))
-      .foreach { case (k, v) => env(k) = v }
-
     env
   }
 }
```
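The one piece of old behavior that is kept is the `CLASSPATH` special case, which is still merged as a path rather than overwritten. A rough sketch of the path-append semantics assumed for `YarnSparkHadoopUtil.addPathToEnvironment` (not a verbatim copy of that helper):

```scala
import scala.collection.mutable.HashMap
import org.apache.hadoop.yarn.api.ApplicationConstants

// Append `value` to an existing path-style variable instead of replacing it.
// YARN expands the cross-platform separator on the node manager at launch time.
def addPathToEnvironment(env: HashMap[String, String], key: String, value: String): Unit = {
  val newValue =
    if (env.contains(key)) {
      env(key) + ApplicationConstants.CLASS_PATH_SEPARATOR + value
    } else {
      value
    }
  env.put(key, newValue)
}
```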

`YarnClusterSuite.scala`

```diff
@@ -225,6 +225,14 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
     finalState should be (SparkAppHandle.State.FAILED)
   }
 
+  test("executor env overwrite AM env in client mode") {
+    testExecutorEnv(true)
+  }
+
+  test("executor env overwrite AM env in cluster mode") {
+    testExecutorEnv(false)
+  }
+
   private def testBasicYarnApp(clientMode: Boolean, conf: Map[String, String] = Map()): Unit = {
     val result = File.createTempFile("result", null, tempDir)
     val finalState = runSpark(clientMode, mainClassName(YarnClusterDriver.getClass),
@@ -305,6 +313,17 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
     checkResult(finalState, executorResult, "OVERRIDDEN")
   }
 
+  private def testExecutorEnv(clientMode: Boolean): Unit = {
+    val result = File.createTempFile("result", null, tempDir)
+    val finalState = runSpark(clientMode, mainClassName(ExecutorEnvTestApp.getClass),
+      appArgs = Seq(result.getAbsolutePath),
+      extraConf = Map(
+        "spark.yarn.appMasterEnv.TEST_ENV" -> "am_val",
+        "spark.executorEnv.TEST_ENV" -> "executor_val"
+      )
+    )
+    checkResult(finalState, result, "true")
+  }
 }
 
 private[spark] class SaveExecutorInfo extends SparkListener {
@@ -526,3 +545,20 @@ private object SparkContextTimeoutApp {
     }
   }
 }
+
+private object ExecutorEnvTestApp {
+
+  def main(args: Array[String]): Unit = {
+    val status = args(0)
+    val sparkConf = new SparkConf()
+    val sc = new SparkContext(sparkConf)
+    val executorEnvs = sc.parallelize(Seq(1)).flatMap { _ => sys.env }.collect().toMap
+    val result = sparkConf.getExecutorEnv.forall { case (k, v) =>
+      executorEnvs.get(k).contains(v)
+    }
+
+    Files.write(result.toString, new File(status), StandardCharsets.UTF_8)
+    sc.stop()
+  }
+}
```
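To run just the new tests locally, something like the following should work with the sbt build (the `yarn` module name and the ScalaTest `-z` substring filter are assumptions based on the usual Spark layout, not taken from this commit):

```
build/sbt -Pyarn "yarn/testOnly org.apache.spark.deploy.yarn.YarnClusterSuite -- -z \"executor env overwrite\""
```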