[SPARK-39542][YARN] Improve YARN client mode to support IPv6

### What changes were proposed in this pull request?

This PR aims to improve YARN client mode to support IPv6.

### Why are the changes needed?

On `IPv6`-only environment, YARN module passes the UTs if we exclude `ExtendedYarnTest`. Among the failures, this PR focus on `YARN client mode` first. Please note that YARN module passes if IPv4 and IPv6 coexist.

**BEFORE**
```
% build/sbt "yarn/testOnly *.YarnClusterSuite -- -z yarn-client" -Pyarn
...
Using SPARK_LOCAL_HOSTNAME=[::1]
Using SPARK_LOCAL_IP=[::1]
...
[info] YarnClusterSuite:
[info] - run Spark in yarn-client mode *** FAILED *** (2 seconds, 144 milliseconds)
[info]   FAILED did not equal FINISHED (stdout/stderr was not captured) (BaseYarnClusterSuite.scala:233)
...
```

**AFTER**
```
% build/sbt "yarn/testOnly *.YarnClusterSuite -- -z yarn-client" -Pyarn
...
Using SPARK_LOCAL_HOSTNAME=[::1]
Using SPARK_LOCAL_IP=[::1]
...
[info] YarnClusterSuite:
[info] - run Spark in yarn-client mode (10 seconds, 172 milliseconds)
[info] - run Spark in yarn-client mode with unmanaged am (7 seconds, 108 milliseconds)
[info] - run Spark in yarn-client mode with different configurations, ensuring redaction (8 seconds, 112 milliseconds)
[info] - SPARK-35672: run Spark in yarn-client mode with additional jar using URI scheme 'local' (8 seconds, 118 milliseconds)
[info] - SPARK-35672: run Spark in yarn-client mode with additional jar using URI scheme 'local' and gateway-replacement path (7 seconds, 115 milliseconds)
[info] - SPARK-35672: run Spark in yarn-client mode with additional jar using URI scheme 'file' (9 seconds, 104 milliseconds)
[info] - run Python application in yarn-client mode (11 seconds, 95 milliseconds)
[info] Run completed in 1 minute, 21 seconds.
[info] Total number of tests run: 7
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 7, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs.

Closes #36939 from dongjoon-hyun/SPARK-39542.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
master
Dongjoon Hyun 2022-06-21 09:21:45 -07:00
parent 29cf0c4121
commit 678826e482
No known key found for this signature in database
GPG Key ID: EDA00CE834F0FC5C
3 changed files with 13 additions and 4 deletions

View File

@ -313,7 +313,7 @@ private[spark] class ApplicationMaster(
sparkConf.get(DRIVER_PORT)),
YarnSchedulerBackend.ENDPOINT_NAME)
// The client-mode AM doesn't listen for incoming connections, so report an invalid port.
registerAM(Utils.localHostName, -1, sparkConf,
registerAM(Utils.localHostNameForURI(), -1, sparkConf,
sparkConf.getOption("spark.driver.appUIAddress"), appAttemptId)
val encodedAppId = URLEncoder.encode(appAttemptId.getApplicationId.toString, "UTF-8")
addAmIpFilter(Some(driverRef), s"/proxy/$encodedAppId")
@ -542,7 +542,7 @@ private[spark] class ApplicationMaster(
}
private def runExecutorLauncher(): Unit = {
val hostname = Utils.localHostName
val hostname = Utils.localHostNameForURI()
val amCores = sparkConf.get(AM_CORES)
val rpcEnv = RpcEnv.create("sparkYarnAM", hostname, hostname, -1, sparkConf, securityMgr,
amCores, true)

View File

@ -987,6 +987,8 @@ private[spark] class Client(
val javaOpts = ListBuffer[String]()
javaOpts += s"-Djava.net.preferIPv6Addresses=${Utils.preferIPv6}"
// SPARK-37106: To start AM with Java 17, `JavaModuleOptions.defaultModuleOptions`
// is added by default. It will not affect Java 8 and Java 11 due to existence of
// `-XX:+IgnoreUnrecognizedVMOptions`.

View File

@ -106,6 +106,9 @@ abstract class BaseYarnClusterSuite
yarnConf.set("yarn.scheduler.capacity.root.default.acl_administer_queue", "*")
yarnConf.setInt("yarn.scheduler.capacity.node-locality-delay", -1)
// Support both IPv4 and IPv6
yarnConf.set("yarn.resourcemanager.hostname", Utils.localHostNameForURI())
try {
yarnCluster = new MiniYARNCluster(getClass().getName(), 1, 1, 1)
yarnCluster.init(yarnConf)
@ -133,7 +136,7 @@ abstract class BaseYarnClusterSuite
// done so in a timely manner (defined to be 10 seconds).
val config = yarnCluster.getConfig()
val startTimeNs = System.nanoTime()
while (config.get(YarnConfiguration.RM_ADDRESS).split(":")(1) == "0") {
while (config.get(YarnConfiguration.RM_ADDRESS).split(":").last == "0") {
if (System.nanoTime() - startTimeNs > TimeUnit.SECONDS.toNanos(10)) {
throw new IllegalStateException("Timed out waiting for RM to come up.")
}
@ -169,7 +172,9 @@ abstract class BaseYarnClusterSuite
outFile: Option[File] = None): SparkAppHandle.State = {
val deployMode = if (clientMode) "client" else "cluster"
val propsFile = createConfFile(extraClassPath = extraClassPath, extraConf = extraConf)
val env = Map("YARN_CONF_DIR" -> hadoopConfDir.getAbsolutePath()) ++ extraEnv
val env = Map(
"YARN_CONF_DIR" -> hadoopConfDir.getAbsolutePath(),
"SPARK_PREFER_IPV6" -> Utils.preferIPv6.toString) ++ extraEnv
val launcher = new SparkLauncher(env.asJava)
if (klass.endsWith(".py")) {
@ -182,6 +187,8 @@ abstract class BaseYarnClusterSuite
.setMaster("yarn")
.setDeployMode(deployMode)
.setConf(EXECUTOR_INSTANCES.key, "1")
.setConf(SparkLauncher.DRIVER_DEFAULT_JAVA_OPTIONS,
s"-Djava.net.preferIPv6Addresses=${Utils.preferIPv6}")
.setPropertiesFile(propsFile)
.addAppArgs(appArgs.toArray: _*)