[SPARK-32103][CORE] Support IPv6 host/port in core module

### What changes were proposed in this pull request?
In IPv6 scenarios, the current logic for splitting a `host:port` string is incorrect: splitting on `:` mangles bracketed IPv6 literals (see the sketch below). This PR routes host/port parsing through `Utils.parseHostPort`, which recognizes the `]:` separator, and teaches `checkHost`/`checkHostPort` to accept IPv6 IPs enclosed in `[]`.
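
For illustration only (not part of the patch), a minimal sketch of the pre-patch failure mode; the literal values are hypothetical:

```scala
// Pre-patch parsing patterns applied to a bracketed IPv6 literal.
val hostPort = "[::1]:8080"

hostPort.split(":", 2)(0)          // "[" -- host is mangled
hostPort.split(":")(0)             // "[" -- likewise wrong

// IPv4 host:port strings were never affected:
"127.0.0.1:8080".split(":", 2)(0)  // "127.0.0.1"
```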

### Why are the changes needed?
To support IPv6 deployment scenarios.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Unit tests, plus an IPv6 Spark deployment on YARN.

Closes #28931 from PavithraRamachandran/ipv6_issue.

Authored-by: Pavithraramachandran <pavi.rams@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
Pavithraramachandran 2020-07-10 13:55:20 -07:00 committed by Dongjoon Hyun
parent 500877e785
commit d7d5bdfd79
6 changed files with 142 additions and 19 deletions

core/src/main/scala/org/apache/spark/SparkStatusTracker.scala

@@ -21,6 +21,7 @@ import java.util.Arrays
 import org.apache.spark.status.AppStatusStore
 import org.apache.spark.status.api.v1.StageStatus
+import org.apache.spark.util.Utils
 
 /**
  * Low-level status reporting APIs for monitoring job and stage progress.
@@ -103,10 +104,7 @@ class SparkStatusTracker private[spark] (sc: SparkContext, store: AppStatusStore
    */
   def getExecutorInfos: Array[SparkExecutorInfo] = {
     store.executorList(true).map { exec =>
-      val (host, port) = exec.hostPort.split(":", 2) match {
-        case Array(h, p) => (h, p.toInt)
-        case Array(h) => (h, -1)
-      }
+      val (host, port) = Utils.parseHostPort(exec.hostPort)
       val cachedMem = exec.memoryMetrics.map { mem =>
         mem.usedOnHeapStorageMemory + mem.usedOffHeapStorageMemory
       }.getOrElse(0L)

core/src/main/scala/org/apache/spark/status/LiveEntity.scala

@@ -33,7 +33,7 @@ import org.apache.spark.scheduler.{AccumulableInfo, StageInfo, TaskInfo}
 import org.apache.spark.status.api.v1
 import org.apache.spark.storage.{RDDInfo, StorageLevel}
 import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.AccumulatorContext
+import org.apache.spark.util.{AccumulatorContext, Utils}
 import org.apache.spark.util.collection.OpenHashSet
 
 /**
@@ -307,7 +307,7 @@ private[spark] class LiveExecutor(val executorId: String, _addTime: Long) extend
   // peak values for executor level metrics
   val peakExecutorMetrics = new ExecutorMetrics()
 
-  def hostname: String = if (host != null) host else hostPort.split(":")(0)
+  def hostname: String = if (host != null) host else Utils.parseHostPort(hostPort)._1
 
   override protected def doUpdate(): Any = {
     val memoryMetrics = if (totalOnHeap >= 0) {

core/src/main/scala/org/apache/spark/status/storeTypes.scala

@@ -26,6 +26,7 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize
 import org.apache.spark.status.KVUtils._
 import org.apache.spark.status.api.v1._
 import org.apache.spark.ui.scope._
+import org.apache.spark.util.Utils
 import org.apache.spark.util.kvstore.KVIndex
 
 private[spark] case class AppStatusStoreMetadata(version: Long)
@@ -57,7 +58,7 @@ private[spark] class ExecutorSummaryWrapper(val info: ExecutorSummary) {
   private def active: Boolean = info.isActive
 
   @JsonIgnore @KVIndex("host")
-  val host: String = info.hostPort.split(":")(0)
+  val host: String = Utils.parseHostPort(info.hostPort)._1
 }

core/src/main/scala/org/apache/spark/util/Utils.scala

@@ -1026,14 +1026,28 @@ private[spark] object Utils extends Logging {
     customHostname.getOrElse(InetAddresses.toUriString(localIpAddress))
   }
 
+  /**
+   * Checks if the host contains only valid hostname/ip without port
+   * NOTE: Incase of IPV6 ip it should be enclosed inside []
+   */
   def checkHost(host: String): Unit = {
-    assert(host != null && host.indexOf(':') == -1, s"Expected hostname (not IP) but got $host")
+    if (host != null && host.split(":").length > 2) {
+      assert(host.startsWith("[") && host.endsWith("]"),
+        s"Expected hostname or IPv6 IP enclosed in [] but got $host")
+    } else {
+      assert(host != null && host.indexOf(':') == -1, s"Expected hostname or IP but got $host")
+    }
   }
 
   def checkHostPort(hostPort: String): Unit = {
-    assert(hostPort != null && hostPort.indexOf(':') != -1,
-      s"Expected host and port but got $hostPort")
+    if (hostPort != null && hostPort.split(":").length > 2) {
+      assert(hostPort != null && hostPort.indexOf("]:") != -1,
+        s"Expected host and port but got $hostPort")
+    } else {
+      assert(hostPort != null && hostPort.indexOf(':') != -1,
+        s"Expected host and port but got $hostPort")
+    }
   }
 
   // Typically, this will be of order of number of nodes in cluster
   // If not, we should change it to LRUCache or something.
@@ -1046,18 +1060,30 @@
       return cached
     }
 
-    val indx: Int = hostPort.lastIndexOf(':')
-    // This is potentially broken - when dealing with ipv6 addresses for example, sigh ...
-    // but then hadoop does not support ipv6 right now.
-    // For now, we assume that if port exists, then it is valid - not check if it is an int > 0
-    if (-1 == indx) {
+    def setDefaultPortValue: (String, Int) = {
       val retval = (hostPort, 0)
       hostPortParseResults.put(hostPort, retval)
-      return retval
+      retval
     }
-
-    val retval = (hostPort.substring(0, indx).trim(), hostPort.substring(indx + 1).trim().toInt)
-    hostPortParseResults.putIfAbsent(hostPort, retval)
+    // checks if the hostport contains IPV6 ip and parses the host, port
+    if (hostPort != null && hostPort.split(":").length > 2) {
+      val indx: Int = hostPort.lastIndexOf("]:")
+      if (-1 == indx) {
+        return setDefaultPortValue
+      }
+      val port = hostPort.substring(indx + 2).trim()
+      val retval = (hostPort.substring(0, indx + 1).trim(), if (port.isEmpty) 0 else port.toInt)
+      hostPortParseResults.putIfAbsent(hostPort, retval)
+    } else {
+      val indx: Int = hostPort.lastIndexOf(':')
+      if (-1 == indx) {
+        return setDefaultPortValue
+      }
+      val port = hostPort.substring(indx + 1).trim()
+      val retval = (hostPort.substring(0, indx).trim(), if (port.isEmpty) 0 else port.toInt)
+      hostPortParseResults.putIfAbsent(hostPort, retval)
+    }
+
     hostPortParseResults.get(hostPort)
   }
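
A quick, hedged sanity sketch of the new semantics (it mirrors the unit tests in the next file; a missing or empty port defaults to 0):

```scala
import org.apache.spark.util.Utils

// Bracketed IPv6: the "]:" separator is located with lastIndexOf.
Utils.parseHostPort("[::1]:80")   // ("[::1]", 80)
Utils.parseHostPort("[::1]")      // ("[::1]", 0) -- no "]:", default port

// IPv4 and hostnames still split on the last ':'.
Utils.parseHostPort("0.0.0.0:80") // ("0.0.0.0", 80)
Utils.parseHostPort("localhost")  // ("localhost", 0)

// Validation: checkHost rejects any port suffix; checkHostPort requires one.
Utils.checkHost("[::1]")          // passes -- IPv6 IP enclosed in []
Utils.checkHostPort("[::1]:0")    // passes -- "]:" marks the port
```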

core/src/test/scala/org/apache/spark/util/UtilsSuite.scala

@@ -1309,6 +1309,103 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
     assert(Utils.buildLocationMetadata(paths, 15) == "[path0, path1, path2]")
     assert(Utils.buildLocationMetadata(paths, 25) == "[path0, path1, path2, path3]")
   }
+
+  test("checkHost supports both IPV4 and IPV6") {
+    // IPV4 ips
+    Utils.checkHost("0.0.0.0")
+    var e: AssertionError = intercept[AssertionError] {
+      Utils.checkHost("0.0.0.0:0")
+    }
+    assert(e.getMessage.contains("Expected hostname or IP but got 0.0.0.0:0"))
+    e = intercept[AssertionError] {
+      Utils.checkHost("0.0.0.0:")
+    }
+    assert(e.getMessage.contains("Expected hostname or IP but got 0.0.0.0:"))
+
+    // IPV6 ips
+    Utils.checkHost("[::1]")
+    e = intercept[AssertionError] {
+      Utils.checkHost("[::1]:0")
+    }
+    assert(e.getMessage.contains("Expected hostname or IPv6 IP enclosed in [] but got [::1]:0"))
+    e = intercept[AssertionError] {
+      Utils.checkHost("[::1]:")
+    }
+    assert(e.getMessage.contains("Expected hostname or IPv6 IP enclosed in [] but got [::1]:"))
+
+    // hostname
+    Utils.checkHost("localhost")
+    e = intercept[AssertionError] {
+      Utils.checkHost("localhost:0")
+    }
+    assert(e.getMessage.contains("Expected hostname or IP but got localhost:0"))
+    e = intercept[AssertionError] {
+      Utils.checkHost("localhost:")
+    }
+    assert(e.getMessage.contains("Expected hostname or IP but got localhost:"))
+  }
+
+  test("checkHostPort support IPV6 and IPV4") {
+    // IPV4 ips
+    Utils.checkHostPort("0.0.0.0:0")
+    var e: AssertionError = intercept[AssertionError] {
+      Utils.checkHostPort("0.0.0.0")
+    }
+    assert(e.getMessage.contains("Expected host and port but got 0.0.0.0"))
+
+    // IPV6 ips
+    Utils.checkHostPort("[::1]:0")
+    e = intercept[AssertionError] {
+      Utils.checkHostPort("[::1]")
+    }
+    assert(e.getMessage.contains("Expected host and port but got [::1]"))
+
+    // hostname
+    Utils.checkHostPort("localhost:0")
+    e = intercept[AssertionError] {
+      Utils.checkHostPort("localhost")
+    }
+    assert(e.getMessage.contains("Expected host and port but got localhost"))
+  }
+
+  test("parseHostPort support IPV6 and IPV4") {
+    // IPV4 ips
+    var hostnamePort = Utils.parseHostPort("0.0.0.0:80")
+    assert(hostnamePort._1.equals("0.0.0.0"))
+    assert(hostnamePort._2 === 80)
+
+    hostnamePort = Utils.parseHostPort("0.0.0.0")
+    assert(hostnamePort._1.equals("0.0.0.0"))
+    assert(hostnamePort._2 === 0)
+
+    hostnamePort = Utils.parseHostPort("0.0.0.0:")
+    assert(hostnamePort._1.equals("0.0.0.0"))
+    assert(hostnamePort._2 === 0)
+
+    // IPV6 ips
+    hostnamePort = Utils.parseHostPort("[::1]:80")
+    assert(hostnamePort._1.equals("[::1]"))
+    assert(hostnamePort._2 === 80)
+
+    hostnamePort = Utils.parseHostPort("[::1]")
+    assert(hostnamePort._1.equals("[::1]"))
+    assert(hostnamePort._2 === 0)
+
+    hostnamePort = Utils.parseHostPort("[::1]:")
+    assert(hostnamePort._1.equals("[::1]"))
+    assert(hostnamePort._2 === 0)
+
+    // hostname
+    hostnamePort = Utils.parseHostPort("localhost:80")
+    assert(hostnamePort._1.equals("localhost"))
+    assert(hostnamePort._2 === 80)
+
+    hostnamePort = Utils.parseHostPort("localhost")
+    assert(hostnamePort._1.equals("localhost"))
+    assert(hostnamePort._2 === 0)
+
+    hostnamePort = Utils.parseHostPort("localhost:")
+    assert(hostnamePort._1.equals("localhost"))
+    assert(hostnamePort._2 === 0)
+  }
 }
 
 private class SimpleExtension

resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala

@@ -30,6 +30,7 @@ import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.Logging
 import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.util.Utils
 
 /**
  * Handles registering and unregistering the application with the YARN ResourceManager.
@@ -107,7 +108,7 @@ private[spark] class YarnRMClient extends Logging {
     // so not all stable releases have it.
     val prefix = WebAppUtils.getHttpSchemePrefix(conf)
     val proxies = WebAppUtils.getProxyHostsAndPortsForAmFilter(conf)
-    val hosts = proxies.asScala.map(_.split(":").head)
+    val hosts = proxies.asScala.map(proxy => Utils.parseHostPort(proxy)._1)
     val uriBases = proxies.asScala.map { proxy => prefix + proxy + proxyBase }
     val params =
       Map("PROXY_HOSTS" -> hosts.mkString(","), "PROXY_URI_BASES" -> uriBases.mkString(","))