diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 15d1c081bc..5fe1c663af 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -196,6 +196,7 @@ object SparkEnv extends Logging {
   private[spark] def createExecutorEnv(
       conf: SparkConf,
       executorId: String,
+      bindAddress: String,
       hostname: String,
       numCores: Int,
       ioEncryptionKey: Option[Array[Byte]],
@@ -203,7 +204,7 @@
     val env = create(
       conf,
       executorId,
-      hostname,
+      bindAddress,
       hostname,
       None,
       isLocal,
@@ -214,6 +215,17 @@
     env
   }
 
+  private[spark] def createExecutorEnv(
+      conf: SparkConf,
+      executorId: String,
+      hostname: String,
+      numCores: Int,
+      ioEncryptionKey: Option[Array[Byte]],
+      isLocal: Boolean): SparkEnv = {
+    createExecutorEnv(conf, executorId, hostname,
+      hostname, numCores, ioEncryptionKey, isLocal)
+  }
+
   /**
    * Helper method to create a SparkEnv for a driver or an executor.
    */
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index b4bca1e940..beb9d98754 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -46,6 +46,7 @@ private[spark] class CoarseGrainedExecutorBackend(
     override val rpcEnv: RpcEnv,
     driverUrl: String,
     executorId: String,
+    bindAddress: String,
     hostname: String,
     cores: Int,
     userClassPath: Seq[URL],
@@ -227,6 +228,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
   case class Arguments(
       driverUrl: String,
       executorId: String,
+      bindAddress: String,
       hostname: String,
       cores: Int,
       appId: String,
@@ -238,7 +240,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
     val createFn: (RpcEnv, Arguments, SparkEnv) => CoarseGrainedExecutorBackend = {
       case (rpcEnv, arguments, env) =>
         new CoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,
-          arguments.hostname, arguments.cores, arguments.userClassPath, env,
+          arguments.bindAddress, arguments.hostname, arguments.cores, arguments.userClassPath, env,
           arguments.resourcesFileOpt)
     }
     run(parseArguments(args, this.getClass.getCanonicalName.stripSuffix("$")), createFn)
@@ -259,10 +261,12 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
     val executorConf = new SparkConf
     val fetcher = RpcEnv.create(
       "driverPropsFetcher",
+      arguments.bindAddress,
       arguments.hostname,
       -1,
       executorConf,
       new SecurityManager(executorConf),
+      numUsableCores = 0,
       clientMode = true)
 
     var driver: RpcEndpointRef = null
@@ -297,8 +301,8 @@
       }
 
       driverConf.set(EXECUTOR_ID, arguments.executorId)
-      val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.hostname,
-        arguments.cores, cfg.ioEncryptionKey, isLocal = false)
+      val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.bindAddress,
+        arguments.hostname, arguments.cores, cfg.ioEncryptionKey, isLocal = false)
 
       env.rpcEnv.setupEndpoint("Executor", backendCreateFn(env.rpcEnv, arguments, env))
       arguments.workerUrl.foreach { url =>
@@ -311,6 +315,7 @@
   def parseArguments(args: Array[String], classNameForEntry: String): Arguments = {
     var driverUrl: String = null
     var executorId: String = null
+    var bindAddress: String = null
     var hostname: String = null
     var cores: Int = 0
     var resourcesFileOpt: Option[String] = None
@@ -327,6 +332,9 @@
         case ("--executor-id") :: value :: tail =>
           executorId = value
           argv = tail
+        case ("--bind-address") :: value :: tail =>
+          bindAddress = value
+          argv = tail
         case ("--hostname") :: value :: tail =>
           hostname = value
           argv = tail
@@ -364,7 +372,11 @@
       printUsageAndExit(classNameForEntry)
     }
 
-    Arguments(driverUrl, executorId, hostname, cores, appId, workerUrl,
+    if (bindAddress == null) {
+      bindAddress = hostname
+    }
+
+    Arguments(driverUrl, executorId, bindAddress, hostname, cores, appId, workerUrl,
       userClassPath, resourcesFileOpt)
   }
 
@@ -377,6 +389,7 @@
       | Options are:
       |   --driver-url
       |   --executor-id
+      |   --bind-address
       |   --hostname
       |   --cores
       |   --resourcesFile
diff --git a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
index e40cf0d66c..7e96039ca9 100644
--- a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
@@ -54,7 +54,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
     val env = createMockEnv(conf, serializer)
 
     // we don't really use this, just need it to get at the parser function
-    val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1",
+    val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1", "host1",
       4, Seq.empty[URL], env, None)
     withTempDir { tmpDir =>
       val testResourceArgs: JObject = ("" -> "")
@@ -76,7 +76,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
     val serializer = new JavaSerializer(conf)
     val env = createMockEnv(conf, serializer)
     // we don't really use this, just need it to get at the parser function
-    val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1",
+    val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1", "host1",
       4, Seq.empty[URL], env, None)
     withTempDir { tmpDir =>
       val ra = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0", "1"))
@@ -101,7 +101,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
     val serializer = new JavaSerializer(conf)
     val env = createMockEnv(conf, serializer)
     // we don't really use this, just need it to get at the parser function
-    val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1",
+    val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1", "host1",
       4, Seq.empty[URL], env, None)
 
     withTempDir { tmpDir =>
@@ -129,7 +129,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
     val serializer = new JavaSerializer(conf)
     val env = createMockEnv(conf, serializer)
     // we don't really use this, just need it to get at the parser function
-    val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1",
+    val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1",
       4, Seq.empty[URL], env, None)
 
     // not enough gpu's on the executor
@@ -168,7 +168,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
     val serializer = new JavaSerializer(conf)
     val env = createMockEnv(conf, serializer)
     // we don't really use this, just need it to get at the parser function
-    val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1",
+    val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1",
       4, Seq.empty[URL], env, None)
 
     // executor resources < required
@@ -200,7 +200,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
     val env = createMockEnv(conf, serializer)
 
     // we don't really use this, just need it to get at the parser function
-    val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1",
+    val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1",
       4, Seq.empty[URL], env, None)
 
     val parsedResources = backend.parseOrFindResources(None)
@@ -226,7 +226,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
     val env = createMockEnv(conf, serializer)
 
     // we don't really use this, just need it to get at the parser function
-    val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1",
+    val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1",
       4, Seq.empty[URL], env, None)
     val gpuArgs = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0", "1"))
     val ja = Extraction.decompose(Seq(gpuArgs))
@@ -254,7 +254,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
     val rpcEnv = RpcEnv.create("1", "localhost", 0, conf, securityMgr)
     val env = createMockEnv(conf, serializer, Some(rpcEnv))
     backend = new CoarseGrainedExecutorBackend(env.rpcEnv, rpcEnv.address.hostPort, "1",
-      "host1", 4, Seq.empty[URL], env, None)
+      "host1", "host1", 4, Seq.empty[URL], env, None)
     assert(backend.taskResources.isEmpty)
 
     val taskId = 1000000
@@ -289,6 +289,31 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
     }
   }
 
+  test("SPARK-24203 when bindAddress is not set, it defaults to hostname") {
+    val args1 = Array(
+      "--driver-url", "driverurl",
+      "--executor-id", "1",
+      "--hostname", "host1",
+      "--cores", "1",
+      "--app-id", "app1")
+
+    val arg = CoarseGrainedExecutorBackend.parseArguments(args1, "")
+    assert(arg.bindAddress == "host1")
+  }
+
+  test("SPARK-24203 when bindAddress is different, it does not default to hostname") {
+    val args1 = Array(
+      "--driver-url", "driverurl",
+      "--executor-id", "1",
+      "--hostname", "host1",
+      "--bind-address", "bindaddress1",
+      "--cores", "1",
+      "--app-id", "app1")
+
+    val arg = CoarseGrainedExecutorBackend.parseArguments(args1, "")
+    assert(arg.bindAddress == "bindaddress1")
+  }
+
   private def createMockEnv(conf: SparkConf, serializer: JavaSerializer,
       rpcEnv: Option[RpcEnv] = None): SparkEnv = {
     val mockEnv = mock[SparkEnv]
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala
index 2e5748b614..d46424e5e9 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala
@@ -34,6 +34,7 @@ private[spark] class YarnCoarseGrainedExecutorBackend(
     rpcEnv: RpcEnv,
     driverUrl: String,
     executorId: String,
+    bindAddress: String,
     hostname: String,
     cores: Int,
     userClassPath: Seq[URL],
@@ -43,6 +44,7 @@ private[spark] class YarnCoarseGrainedExecutorBackend(
     rpcEnv,
     driverUrl,
     executorId,
+    bindAddress,
     hostname,
     cores,
     userClassPath,
@@ -68,7 +70,7 @@ private[spark] object YarnCoarseGrainedExecutorBackend extends Logging {
     val createFn: (RpcEnv, CoarseGrainedExecutorBackend.Arguments, SparkEnv) =>
       CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env) =>
       new YarnCoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,
-        arguments.hostname, arguments.cores, arguments.userClassPath, env,
+        arguments.bindAddress, arguments.hostname, arguments.cores, arguments.userClassPath, env,
        arguments.resourcesFileOpt)
     }
     val backendArgs = CoarseGrainedExecutorBackend.parseArguments(args,
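
Usage sketch (not part of the patch): the two new tests above pin down the defaulting contract — when `--bind-address` is omitted, `parseArguments` falls back to the value of `--hostname`, so existing launch scripts keep working unchanged. Below is a minimal, self-contained illustration of that behavior. It assumes the file is compiled inside the `org.apache.spark.executor` package, since the `CoarseGrainedExecutorBackend` object is `private[spark]`; the driver URL, app id, and `0.0.0.0` bind address are placeholder values, not values taken from the patch.

package org.apache.spark.executor

// Sketch only: exercises the bind-address defaulting added to parseArguments.
// Placed in org.apache.spark.executor because CoarseGrainedExecutorBackend
// is private[spark]; all literal values below are placeholders.
object BindAddressSketch {
  def main(args: Array[String]): Unit = {
    // Without --bind-address, the parser falls back to --hostname.
    val defaulted = CoarseGrainedExecutorBackend.parseArguments(Array(
      "--driver-url", "spark://CoarseGrainedScheduler@driver-host:7077", // placeholder
      "--executor-id", "1",
      "--hostname", "host1",
      "--cores", "4",
      "--app-id", "app-placeholder"), "")
    assert(defaulted.bindAddress == "host1")

    // With an explicit --bind-address, hostname is still what the executor
    // advertises to the driver, while bindAddress is what the RPC env and
    // SparkEnv bind to locally.
    val explicit = CoarseGrainedExecutorBackend.parseArguments(Array(
      "--driver-url", "spark://CoarseGrainedScheduler@driver-host:7077", // placeholder
      "--executor-id", "1",
      "--hostname", "host1",
      "--bind-address", "0.0.0.0", // placeholder bind address
      "--cores", "4",
      "--app-id", "app-placeholder"), "")
    assert(explicit.bindAddress == "0.0.0.0")
  }
}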