[SPARK-24203][CORE] Make executor's bindAddress configurable

### What changes were proposed in this pull request?
With this change, the executor's bindAddress is passed as an input parameter to RpcEnv.create.
A previous PR, https://github.com/apache/spark/pull/21261, addressed the same issue but read the bindAddress from a SparkConf property, which would not have worked for multiple executors.
This PR enables anyone who overrides CoarseGrainedExecutorBackend with a custom implementation to invoke CoarseGrainedExecutorBackend.main() with the option to configure the bindAddress.
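For illustration, a custom entry point can now forward to the stock `main()` with an explicit bind address. This is a minimal sketch, assuming a hypothetical launcher object and environment variable; only the `--bind-address` flag and `CoarseGrainedExecutorBackend.main` come from this PR:

```scala
// Hypothetical launcher. CoarseGrainedExecutorBackend is private[spark],
// so a real implementation must live under org.apache.spark.executor.
package org.apache.spark.executor

object CustomExecutorLauncher {
  def main(args: Array[String]): Unit = {
    // CONTAINER_BIND_ADDRESS is an assumed environment variable holding the
    // container-local address the executor's RpcEnv should bind to.
    val bindAddress = sys.env.getOrElse("CONTAINER_BIND_ADDRESS", "127.0.0.1")
    // Forward all original arguments plus the new --bind-address flag.
    CoarseGrainedExecutorBackend.main(args ++ Array("--bind-address", bindAddress))
  }
}
```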

### Why are the changes needed?
This is required when Kernel-based Virtual Machines (KVMs) are used inside a Linux container, where the KVM's hostname is not the same as the container's hostname.

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
Tested by running jobs with executors on KVMs inside a Linux container.

Closes #26331 from nishchalv/SPARK-29670.

Lead-authored-by: Nishchal Venkataramana <nishchal@apple.com>
Co-authored-by: nishchal <nishchal@apple.com>
Signed-off-by: DB Tsai <d_tsai@apple.com>
Authored by Nishchal Venkataramana on 2019-11-13 22:01:48 +00:00; committed by DB Tsai
parent 363af16c72
commit 833a9f12e2
4 changed files with 66 additions and 14 deletions

core/src/main/scala/org/apache/spark/SparkEnv.scala

@@ -196,6 +196,7 @@ object SparkEnv extends Logging {
private[spark] def createExecutorEnv(
conf: SparkConf,
executorId: String,
bindAddress: String,
hostname: String,
numCores: Int,
ioEncryptionKey: Option[Array[Byte]],
@@ -203,7 +204,7 @@ object SparkEnv extends Logging {
val env = create(
conf,
executorId,
hostname,
bindAddress,
hostname,
None,
isLocal,
@@ -214,6 +215,17 @@ object SparkEnv extends Logging {
env
}
private[spark] def createExecutorEnv(
conf: SparkConf,
executorId: String,
hostname: String,
numCores: Int,
ioEncryptionKey: Option[Array[Byte]],
isLocal: Boolean): SparkEnv = {
createExecutorEnv(conf, executorId, hostname,
hostname, numCores, ioEncryptionKey, isLocal)
}
/**
* Helper method to create a SparkEnv for a driver or an executor.
*/
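The six-argument overload above preserves the old behavior by reusing hostname as the bind address. A sketch of the two call shapes (illustrative only; both overloads are private[spark], so real call sites live inside Spark itself):

```scala
// Backwards-compatible form: binds to and advertises the same hostname.
val env1 = SparkEnv.createExecutorEnv(
  conf, "1", "host1", 4, ioEncryptionKey = None, isLocal = false)

// New form: bind to a container-local address, advertise the real hostname.
val env2 = SparkEnv.createExecutorEnv(
  conf, "1", bindAddress = "10.0.0.5", hostname = "host1",
  numCores = 4, ioEncryptionKey = None, isLocal = false)
```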

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

@@ -46,6 +46,7 @@ private[spark] class CoarseGrainedExecutorBackend(
override val rpcEnv: RpcEnv,
driverUrl: String,
executorId: String,
bindAddress: String,
hostname: String,
cores: Int,
userClassPath: Seq[URL],
@@ -227,6 +228,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
case class Arguments(
driverUrl: String,
executorId: String,
bindAddress: String,
hostname: String,
cores: Int,
appId: String,
@@ -238,7 +240,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
val createFn: (RpcEnv, Arguments, SparkEnv) =>
CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env) =>
new CoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,
arguments.hostname, arguments.cores, arguments.userClassPath, env,
arguments.bindAddress, arguments.hostname, arguments.cores, arguments.userClassPath, env,
arguments.resourcesFileOpt)
}
run(parseArguments(args, this.getClass.getCanonicalName.stripSuffix("$")), createFn)
@@ -259,10 +261,12 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
val executorConf = new SparkConf
val fetcher = RpcEnv.create(
"driverPropsFetcher",
arguments.bindAddress,
arguments.hostname,
-1,
executorConf,
new SecurityManager(executorConf),
numUsableCores = 0,
clientMode = true)
var driver: RpcEndpointRef = null
@@ -297,8 +301,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
}
driverConf.set(EXECUTOR_ID, arguments.executorId)
val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.hostname,
arguments.cores, cfg.ioEncryptionKey, isLocal = false)
val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.bindAddress,
arguments.hostname, arguments.cores, cfg.ioEncryptionKey, isLocal = false)
env.rpcEnv.setupEndpoint("Executor", backendCreateFn(env.rpcEnv, arguments, env))
arguments.workerUrl.foreach { url =>
@@ -311,6 +315,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
def parseArguments(args: Array[String], classNameForEntry: String): Arguments = {
var driverUrl: String = null
var executorId: String = null
var bindAddress: String = null
var hostname: String = null
var cores: Int = 0
var resourcesFileOpt: Option[String] = None
@@ -327,6 +332,9 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
case ("--executor-id") :: value :: tail =>
executorId = value
argv = tail
case ("--bind-address") :: value :: tail =>
bindAddress = value
argv = tail
case ("--hostname") :: value :: tail =>
hostname = value
argv = tail
@@ -364,7 +372,11 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
printUsageAndExit(classNameForEntry)
}
Arguments(driverUrl, executorId, hostname, cores, appId, workerUrl,
if (bindAddress == null) {
bindAddress = hostname
}
Arguments(driverUrl, executorId, bindAddress, hostname, cores, appId, workerUrl,
userClassPath, resourcesFileOpt)
}
@@ -377,6 +389,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
| Options are:
| --driver-url <driverUrl>
| --executor-id <executorId>
| --bind-address <bindAddress>
| --hostname <hostname>
| --cores <cores>
| --resourcesFile <fileWithJSONResourceInformation>

core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala

@@ -54,7 +54,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
val env = createMockEnv(conf, serializer)
// we don't really use this, just need it to get at the parser function
val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1",
val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1", "host1",
4, Seq.empty[URL], env, None)
withTempDir { tmpDir =>
val testResourceArgs: JObject = ("" -> "")
@@ -76,7 +76,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
val serializer = new JavaSerializer(conf)
val env = createMockEnv(conf, serializer)
// we don't really use this, just need it to get at the parser function
val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1",
val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1", "host1",
4, Seq.empty[URL], env, None)
withTempDir { tmpDir =>
val ra = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0", "1"))
@@ -101,7 +101,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
val serializer = new JavaSerializer(conf)
val env = createMockEnv(conf, serializer)
// we don't really use this, just need it to get at the parser function
val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1",
val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1", "host1",
4, Seq.empty[URL], env, None)
withTempDir { tmpDir =>
@@ -129,7 +129,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
val serializer = new JavaSerializer(conf)
val env = createMockEnv(conf, serializer)
// we don't really use this, just need it to get at the parser function
val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1",
val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1",
4, Seq.empty[URL], env, None)
// not enough gpu's on the executor
@@ -168,7 +168,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
val serializer = new JavaSerializer(conf)
val env = createMockEnv(conf, serializer)
// we don't really use this, just need it to get at the parser function
val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1",
val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1",
4, Seq.empty[URL], env, None)
// executor resources < required
@@ -200,7 +200,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
val env = createMockEnv(conf, serializer)
// we don't really use this, just need it to get at the parser function
val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1",
val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1",
4, Seq.empty[URL], env, None)
val parsedResources = backend.parseOrFindResources(None)
@@ -226,7 +226,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
val env = createMockEnv(conf, serializer)
// we don't really use this, just need it to get at the parser function
val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1",
val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1",
4, Seq.empty[URL], env, None)
val gpuArgs = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0", "1"))
val ja = Extraction.decompose(Seq(gpuArgs))
@@ -254,7 +254,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
val rpcEnv = RpcEnv.create("1", "localhost", 0, conf, securityMgr)
val env = createMockEnv(conf, serializer, Some(rpcEnv))
backend = new CoarseGrainedExecutorBackend(env.rpcEnv, rpcEnv.address.hostPort, "1",
"host1", 4, Seq.empty[URL], env, None)
"host1", "host1", 4, Seq.empty[URL], env, None)
assert(backend.taskResources.isEmpty)
val taskId = 1000000
@@ -289,6 +289,31 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
}
}
test("SPARK-24203 when bindAddress is not set, it defaults to hostname") {
val args1 = Array(
"--driver-url", "driverurl",
"--executor-id", "1",
"--hostname", "host1",
"--cores", "1",
"--app-id", "app1")
val arg = CoarseGrainedExecutorBackend.parseArguments(args1, "")
assert(arg.bindAddress == "host1")
}
test("SPARK-24203 when bindAddress is different, it does not default to hostname") {
val args1 = Array(
"--driver-url", "driverurl",
"--executor-id", "1",
"--hostname", "host1",
"--bind-address", "bindaddress1",
"--cores", "1",
"--app-id", "app1")
val arg = CoarseGrainedExecutorBackend.parseArguments(args1, "")
assert(arg.bindAddress == "bindaddress1")
}
private def createMockEnv(conf: SparkConf, serializer: JavaSerializer,
rpcEnv: Option[RpcEnv] = None): SparkEnv = {
val mockEnv = mock[SparkEnv]

resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala

@@ -34,6 +34,7 @@ private[spark] class YarnCoarseGrainedExecutorBackend(
rpcEnv: RpcEnv,
driverUrl: String,
executorId: String,
bindAddress: String,
hostname: String,
cores: Int,
userClassPath: Seq[URL],
@@ -43,6 +44,7 @@ private[spark] class YarnCoarseGrainedExecutorBackend(
rpcEnv,
driverUrl,
executorId,
bindAddress,
hostname,
cores,
userClassPath,
@@ -68,7 +70,7 @@ private[spark] object YarnCoarseGrainedExecutorBackend extends Logging {
val createFn: (RpcEnv, CoarseGrainedExecutorBackend.Arguments, SparkEnv) =>
CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env) =>
new YarnCoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,
arguments.hostname, arguments.cores, arguments.userClassPath, env,
arguments.bindAddress, arguments.hostname, arguments.cores, arguments.userClassPath, env,
arguments.resourcesFileOpt)
}
val backendArgs = CoarseGrainedExecutorBackend.parseArguments(args,
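Any other custom backend follows the same wiring as the YARN code above: thread arguments.bindAddress through the constructor inside its createFn. A sketch under the assumption of a hypothetical resource-manager-specific entry point; run and parseArguments are the existing helpers shown in this diff:

```scala
// Hypothetical main() for a custom resource manager, mirroring
// YarnCoarseGrainedExecutorBackend: createFn now passes
// arguments.bindAddress ahead of arguments.hostname.
def main(args: Array[String]): Unit = {
  val createFn: (RpcEnv, CoarseGrainedExecutorBackend.Arguments, SparkEnv) =>
      CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env) =>
    new CoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl,
      arguments.executorId, arguments.bindAddress, arguments.hostname,
      arguments.cores, arguments.userClassPath, env, arguments.resourcesFileOpt)
  }
  CoarseGrainedExecutorBackend.run(
    CoarseGrainedExecutorBackend.parseArguments(args, "CustomExecutorBackend"),
    createFn)
}
```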