From d7d5bdfd79bcca405ea10d218b4a2e7b1efb9083 Mon Sep 17 00:00:00 2001
From: Pavithraramachandran
Date: Fri, 10 Jul 2020 13:55:20 -0700
Subject: [PATCH] [SPARK-32103][CORE] Support IPv6 host/port in core module

### What changes were proposed in this pull request?
In IPv6 scenarios, the current logic to split hostname and port is not correct.

### Why are the changes needed?
To support IPv6 deployment scenarios.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
UT and an IPv6 Spark deployment with YARN.

Closes #28931 from PavithraRamachandran/ipv6_issue.

Authored-by: Pavithraramachandran
Signed-off-by: Dongjoon Hyun
---
 .../org/apache/spark/SparkStatusTracker.scala |  6 +-
 .../org/apache/spark/status/LiveEntity.scala  |  4 +-
 .../org/apache/spark/status/storeTypes.scala  |  3 +-
 .../scala/org/apache/spark/util/Utils.scala   | 48 ++++++---
 .../org/apache/spark/util/UtilsSuite.scala    | 97 +++++++++++++++++++
 .../spark/deploy/yarn/YarnRMClient.scala      |  3 +-
 6 files changed, 142 insertions(+), 19 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
index 555c085d85..37e673cd8c 100644
--- a/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
+++ b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
@@ -21,6 +21,7 @@ import java.util.Arrays
 
 import org.apache.spark.status.AppStatusStore
 import org.apache.spark.status.api.v1.StageStatus
+import org.apache.spark.util.Utils
 
 /**
  * Low-level status reporting APIs for monitoring job and stage progress.
@@ -103,10 +104,7 @@ class SparkStatusTracker private[spark] (sc: SparkContext, store: AppStatusStore
    */
   def getExecutorInfos: Array[SparkExecutorInfo] = {
     store.executorList(true).map { exec =>
-      val (host, port) = exec.hostPort.split(":", 2) match {
-        case Array(h, p) => (h, p.toInt)
-        case Array(h) => (h, -1)
-      }
+      val (host, port) = Utils.parseHostPort(exec.hostPort)
       val cachedMem = exec.memoryMetrics.map { mem =>
         mem.usedOnHeapStorageMemory + mem.usedOffHeapStorageMemory
       }.getOrElse(0L)
diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
index 86cb4fe138..8147821499 100644
--- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
+++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
@@ -33,7 +33,7 @@ import org.apache.spark.scheduler.{AccumulableInfo, StageInfo, TaskInfo}
 import org.apache.spark.status.api.v1
 import org.apache.spark.storage.{RDDInfo, StorageLevel}
 import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.AccumulatorContext
+import org.apache.spark.util.{AccumulatorContext, Utils}
 import org.apache.spark.util.collection.OpenHashSet
 
 /**
@@ -307,7 +307,7 @@ private[spark] class LiveExecutor(val executorId: String, _addTime: Long) extend
   // peak values for executor level metrics
   val peakExecutorMetrics = new ExecutorMetrics()
 
-  def hostname: String = if (host != null) host else hostPort.split(":")(0)
+  def hostname: String = if (host != null) host else Utils.parseHostPort(hostPort)._1
 
   override protected def doUpdate(): Any = {
     val memoryMetrics = if (totalOnHeap >= 0) {
diff --git a/core/src/main/scala/org/apache/spark/status/storeTypes.scala b/core/src/main/scala/org/apache/spark/status/storeTypes.scala
index b40f7304b7..5a16482329 100644
--- a/core/src/main/scala/org/apache/spark/status/storeTypes.scala
+++ b/core/src/main/scala/org/apache/spark/status/storeTypes.scala
@@ -26,6 +26,7 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize
 import org.apache.spark.status.KVUtils._
 import org.apache.spark.status.api.v1._
 import org.apache.spark.ui.scope._
+import org.apache.spark.util.Utils
 import org.apache.spark.util.kvstore.KVIndex
 
 private[spark] case class AppStatusStoreMetadata(version: Long)
@@ -57,7 +58,7 @@ private[spark] class ExecutorSummaryWrapper(val info: ExecutorSummary) {
   private def active: Boolean = info.isActive
 
   @JsonIgnore @KVIndex("host")
-  val host: String = info.hostPort.split(":")(0)
+  val host: String = Utils.parseHostPort(info.hostPort)._1
 
 }
 
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 9636fe88c7..2b4b2508f9 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1026,13 +1026,27 @@ private[spark] object Utils extends Logging {
     customHostname.getOrElse(InetAddresses.toUriString(localIpAddress))
   }
 
+  /**
+   * Checks if the host contains only a valid hostname/IP without a port.
+   * NOTE: In case of an IPv6 address, it should be enclosed in [].
+   */
   def checkHost(host: String): Unit = {
-    assert(host != null && host.indexOf(':') == -1, s"Expected hostname (not IP) but got $host")
+    if (host != null && host.split(":").length > 2) {
+      assert(host.startsWith("[") && host.endsWith("]"),
+        s"Expected hostname or IPv6 IP enclosed in [] but got $host")
+    } else {
+      assert(host != null && host.indexOf(':') == -1, s"Expected hostname or IP but got $host")
+    }
   }
 
   def checkHostPort(hostPort: String): Unit = {
-    assert(hostPort != null && hostPort.indexOf(':') != -1,
-      s"Expected host and port but got $hostPort")
+    if (hostPort != null && hostPort.split(":").length > 2) {
+      assert(hostPort != null && hostPort.indexOf("]:") != -1,
+        s"Expected host and port but got $hostPort")
+    } else {
+      assert(hostPort != null && hostPort.indexOf(':') != -1,
+        s"Expected host and port but got $hostPort")
+    }
   }
 
   // Typically, this will be of order of number of nodes in cluster
@@ -1046,18 +1060,30 @@ private[spark] object Utils extends Logging {
       return cached
     }
 
-    val indx: Int = hostPort.lastIndexOf(':')
-    // This is potentially broken - when dealing with ipv6 addresses for example, sigh ...
-    // but then hadoop does not support ipv6 right now.
-    // For now, we assume that if port exists, then it is valid - not check if it is an int > 0
-    if (-1 == indx) {
+    def setDefaultPortValue: (String, Int) = {
       val retval = (hostPort, 0)
       hostPortParseResults.put(hostPort, retval)
-      return retval
+      retval
+    }
+    // Checks if the hostPort contains an IPv6 address and parses the host and port accordingly
+    if (hostPort != null && hostPort.split(":").length > 2) {
+      val indx: Int = hostPort.lastIndexOf("]:")
+      if (-1 == indx) {
+        return setDefaultPortValue
+      }
+      val port = hostPort.substring(indx + 2).trim()
+      val retval = (hostPort.substring(0, indx + 1).trim(), if (port.isEmpty) 0 else port.toInt)
+      hostPortParseResults.putIfAbsent(hostPort, retval)
+    } else {
+      val indx: Int = hostPort.lastIndexOf(':')
+      if (-1 == indx) {
+        return setDefaultPortValue
+      }
+      val port = hostPort.substring(indx + 1).trim()
+      val retval = (hostPort.substring(0, indx).trim(), if (port.isEmpty) 0 else port.toInt)
+      hostPortParseResults.putIfAbsent(hostPort, retval)
     }
 
-    val retval = (hostPort.substring(0, indx).trim(), hostPort.substring(indx + 1).trim().toInt)
-    hostPortParseResults.putIfAbsent(hostPort, retval)
     hostPortParseResults.get(hostPort)
   }
 
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index c9c8ae6023..7ec7c5afca 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -1309,6 +1309,103 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
     assert(Utils.buildLocationMetadata(paths, 15) == "[path0, path1, path2]")
     assert(Utils.buildLocationMetadata(paths, 25) == "[path0, path1, path2, path3]")
   }
+
+  test("checkHost supports both IPV4 and IPV6") {
+    // IPV4 ips
+    Utils.checkHost("0.0.0.0")
+    var e: AssertionError = intercept[AssertionError] {
+      Utils.checkHost("0.0.0.0:0")
+    }
+    assert(e.getMessage.contains("Expected hostname or IP but got 0.0.0.0:0"))
+    e = intercept[AssertionError] {
+      Utils.checkHost("0.0.0.0:")
+    }
+    assert(e.getMessage.contains("Expected hostname or IP but got 0.0.0.0:"))
+    // IPV6 ips
+    Utils.checkHost("[::1]")
+    e = intercept[AssertionError] {
+      Utils.checkHost("[::1]:0")
+    }
+    assert(e.getMessage.contains("Expected hostname or IPv6 IP enclosed in [] but got [::1]:0"))
+    e = intercept[AssertionError] {
+      Utils.checkHost("[::1]:")
+    }
+    assert(e.getMessage.contains("Expected hostname or IPv6 IP enclosed in [] but got [::1]:"))
+    // hostname
+    Utils.checkHost("localhost")
+    e = intercept[AssertionError] {
+      Utils.checkHost("localhost:0")
+    }
+    assert(e.getMessage.contains("Expected hostname or IP but got localhost:0"))
+    e = intercept[AssertionError] {
+      Utils.checkHost("localhost:")
+    }
+    assert(e.getMessage.contains("Expected hostname or IP but got localhost:"))
+  }
+
+  test("checkHostPort supports IPV6 and IPV4") {
+    // IPV4 ips
+    Utils.checkHostPort("0.0.0.0:0")
+    var e: AssertionError = intercept[AssertionError] {
+      Utils.checkHostPort("0.0.0.0")
+    }
+    assert(e.getMessage.contains("Expected host and port but got 0.0.0.0"))
+
+    // IPV6 ips
+    Utils.checkHostPort("[::1]:0")
+    e = intercept[AssertionError] {
+      Utils.checkHostPort("[::1]")
+    }
+    assert(e.getMessage.contains("Expected host and port but got [::1]"))
+
+    // hostname
+    Utils.checkHostPort("localhost:0")
+    e = intercept[AssertionError] {
+      Utils.checkHostPort("localhost")
+    }
+    assert(e.getMessage.contains("Expected host and port but got localhost"))
+  }
+
+  test("parseHostPort supports IPV6 and IPV4") {
+    // IPV4 ips
+    var hostnamePort = Utils.parseHostPort("0.0.0.0:80")
+    assert(hostnamePort._1.equals("0.0.0.0"))
+    assert(hostnamePort._2 === 80)
+
+    hostnamePort = Utils.parseHostPort("0.0.0.0")
+    assert(hostnamePort._1.equals("0.0.0.0"))
+    assert(hostnamePort._2 === 0)
+
+    hostnamePort = Utils.parseHostPort("0.0.0.0:")
+    assert(hostnamePort._1.equals("0.0.0.0"))
+    assert(hostnamePort._2 === 0)
+
+    // IPV6 ips
+    hostnamePort = Utils.parseHostPort("[::1]:80")
+    assert(hostnamePort._1.equals("[::1]"))
+    assert(hostnamePort._2 === 80)
+
+    hostnamePort = Utils.parseHostPort("[::1]")
+    assert(hostnamePort._1.equals("[::1]"))
+    assert(hostnamePort._2 === 0)
+
+    hostnamePort = Utils.parseHostPort("[::1]:")
+    assert(hostnamePort._1.equals("[::1]"))
+    assert(hostnamePort._2 === 0)
+
+    // hostname
+    hostnamePort = Utils.parseHostPort("localhost:80")
+    assert(hostnamePort._1.equals("localhost"))
+    assert(hostnamePort._2 === 80)
+
+    hostnamePort = Utils.parseHostPort("localhost")
+    assert(hostnamePort._1.equals("localhost"))
+    assert(hostnamePort._2 === 0)
+
+    hostnamePort = Utils.parseHostPort("localhost:")
+    assert(hostnamePort._1.equals("localhost"))
+    assert(hostnamePort._2 === 0)
+  }
 }
 
 private class SimpleExtension
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
index 7c67493c33..2f272be60b 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
@@ -30,6 +30,7 @@ import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.Logging
 import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.util.Utils
 
 /**
  * Handles registering and unregistering the application with the YARN ResourceManager.
@@ -107,7 +108,7 @@ private[spark] class YarnRMClient extends Logging {
     // so not all stable releases have it.
     val prefix = WebAppUtils.getHttpSchemePrefix(conf)
     val proxies = WebAppUtils.getProxyHostsAndPortsForAmFilter(conf)
-    val hosts = proxies.asScala.map(_.split(":").head)
+    val hosts = proxies.asScala.map(proxy => Utils.parseHostPort(proxy)._1)
     val uriBases = proxies.asScala.map { proxy => prefix + proxy + proxyBase }
     val params = Map("PROXY_HOSTS" -> hosts.mkString(","),
       "PROXY_URI_BASES" -> uriBases.mkString(","))
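
For quick reference, a minimal usage sketch of the new behavior follows, mirroring the unit tests added above. It is illustrative only and not part of the patch: the object name is made up, and it sits in `org.apache.spark.util` solely because `Utils` is `private[spark]`.

```scala
package org.apache.spark.util

// Illustrative sketch (not part of the patch): expected behavior of the updated
// Utils helpers, based on the unit tests added in UtilsSuite above.
object ParseHostPortExample {
  def main(args: Array[String]): Unit = {
    // IPv4 addresses and plain hostnames parse exactly as before.
    val (v4Host, v4Port) = Utils.parseHostPort("0.0.0.0:80")
    assert(v4Host == "0.0.0.0" && v4Port == 80)
    assert(Utils.parseHostPort("localhost") == ("localhost" -> 0))

    // IPv6 addresses must be enclosed in []; the brackets stay in the host part,
    // and a missing or empty port falls back to 0.
    val (v6Host, v6Port) = Utils.parseHostPort("[::1]:80")
    assert(v6Host == "[::1]" && v6Port == 80)
    assert(Utils.parseHostPort("[::1]")._2 == 0)

    // checkHost accepts a bare host (IPv6 in brackets, no port);
    // checkHostPort requires a host:port pair.
    Utils.checkHost("[::1]")
    Utils.checkHostPort("[::1]:0")
  }
}
```

Note that the brackets are kept in the returned host, consistent with the updated `checkHost`, which expects an IPv6 address to stay enclosed in [].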