diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 2865c3bc86..645f58716d 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -181,33 +181,47 @@ private[spark] class CoarseGrainedExecutorBackend(
 
 private[spark] object CoarseGrainedExecutorBackend extends Logging {
 
-  private def run(
+  case class Arguments(
       driverUrl: String,
       executorId: String,
       hostname: String,
       cores: Int,
       appId: String,
       workerUrl: Option[String],
-      userClassPath: Seq[URL]) {
+      userClassPath: mutable.ListBuffer[URL])
+
+  def main(args: Array[String]): Unit = {
+    val createFn: (RpcEnv, Arguments, SparkEnv) =>
+      CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env) =>
+      new CoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,
+        arguments.hostname, arguments.cores, arguments.userClassPath, env)
+    }
+    run(parseArguments(args, this.getClass.getCanonicalName.stripSuffix("$")), createFn)
+    System.exit(0)
+  }
+
+  def run(
+      arguments: Arguments,
+      backendCreateFn: (RpcEnv, Arguments, SparkEnv) => CoarseGrainedExecutorBackend): Unit = {
 
     Utils.initDaemon(log)
 
     SparkHadoopUtil.get.runAsSparkUser { () =>
       // Debug code
-      Utils.checkHost(hostname)
+      Utils.checkHost(arguments.hostname)
 
       // Bootstrap to fetch the driver's Spark properties.
       val executorConf = new SparkConf
       val fetcher = RpcEnv.create(
         "driverPropsFetcher",
-        hostname,
+        arguments.hostname,
         -1,
         executorConf,
         new SecurityManager(executorConf),
         clientMode = true)
-      val driver = fetcher.setupEndpointRefByURI(driverUrl)
+      val driver = fetcher.setupEndpointRefByURI(arguments.driverUrl)
       val cfg = driver.askSync[SparkAppConfig](RetrieveSparkAppConfig)
-      val props = cfg.sparkProperties ++ Seq[(String, String)](("spark.app.id", appId))
+      val props = cfg.sparkProperties ++ Seq[(String, String)](("spark.app.id", arguments.appId))
       fetcher.shutdown()
 
       // Create SparkEnv using properties we fetched from the driver.
@@ -225,19 +239,18 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
         SparkHadoopUtil.get.addDelegationTokens(tokens, driverConf)
       }
 
-      val env = SparkEnv.createExecutorEnv(
-        driverConf, executorId, hostname, cores, cfg.ioEncryptionKey, isLocal = false)
+      val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.hostname,
+        arguments.cores, cfg.ioEncryptionKey, isLocal = false)
 
-      env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
-        env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))
-      workerUrl.foreach { url =>
+      env.rpcEnv.setupEndpoint("Executor", backendCreateFn(env.rpcEnv, arguments, env))
+      arguments.workerUrl.foreach { url =>
         env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
       }
       env.rpcEnv.awaitTermination()
     }
   }
 
-  def main(args: Array[String]) {
+  def parseArguments(args: Array[String], classNameForEntry: String): Arguments = {
     var driverUrl: String = null
     var executorId: String = null
     var hostname: String = null
@@ -276,24 +289,24 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
           // scalastyle:off println
           System.err.println(s"Unrecognized options: ${tail.mkString(" ")}")
           // scalastyle:on println
-          printUsageAndExit()
+          printUsageAndExit(classNameForEntry)
       }
     }
 
     if (driverUrl == null || executorId == null || hostname == null || cores <= 0 ||
       appId == null) {
-      printUsageAndExit()
+      printUsageAndExit(classNameForEntry)
     }
 
-    run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
-    System.exit(0)
+    Arguments(driverUrl, executorId, hostname, cores, appId, workerUrl,
+      userClassPath)
   }
 
-  private def printUsageAndExit() = {
+  private def printUsageAndExit(classNameForEntry: String): Unit = {
     // scalastyle:off println
     System.err.println(
-      """
-      |Usage: CoarseGrainedExecutorBackend [options]
+      s"""
+      |Usage: $classNameForEntry [options]
       |
       | Options are:
       |   --driver-url <driverUrl>
@@ -307,5 +320,4 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
     // scalastyle:on println
     System.exit(1)
   }
-
 }
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 51a13d342a..8f1a12726b 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -480,6 +480,18 @@ To use a custom metrics.properties for the application master and executors, upd
     <td><code>{{HTTP_SCHEME}}</code></td>
     <td>`http://` or `https://` according to YARN HTTP policy. (Configured via `yarn.http.policy`)</td>
   </tr>
+  <tr>
+    <td><code>{{NM_HOST}}</code></td>
+    <td>The host of the node where the container ran.</td>
+  </tr>
+  <tr>
+    <td><code>{{NM_PORT}}</code></td>
+    <td>The port of the NodeManager where the container ran.</td>
+  </tr>
+  <tr>
+    <td><code>{{NM_HTTP_PORT}}</code></td>
+    <td>The port of the NodeManager's HTTP server where the container ran.</td>
+  </tr>
   <tr>
     <td><code>{{NM_HTTP_ADDRESS}}</code></td>
     <td>Http URI of the node on which the container is allocated.</td>
@@ -502,6 +514,12 @@ To use a custom metrics.properties for the application master and executors, upd
   </tr>
 </table>
 
+For example, if you would like the log URL links to point directly to the Job History Server, instead of being redirected by the NodeManager HTTP server, you can configure `spark.history.custom.executor.log.url` as below:
+
+`{{HTTP_SCHEME}}<JHS_HOST>:<JHS_PORT>/jobhistory/logs/{{NM_HOST}}:{{NM_PORT}}/{{CONTAINER_ID}}/{{CONTAINER_ID}}/{{USER}}/{{FILE_NAME}}?start=-4096`
+
+NOTE: you need to replace `<JHS_HOST>` and `<JHS_PORT>` with the actual values.
+
 # Important notes
 
 - Whether core requests are honored in scheduling decisions depends on which scheduler is in use and how it is configured.
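As a reading aid for the docs change above, here is a minimal sketch of how a `{{...}}` log URL pattern resolves against the executor attributes (`NM_HOST`, `NM_PORT`, `CONTAINER_ID`, and so on) that this patch exports. The attribute values, the object name, and the `resolve` helper are made-up illustrations, not the History Server's actual substitution code.

// Illustrative sketch only (not part of this patch): resolving a custom executor log URL
// pattern against executor attributes, to show what the placeholders expand to.
object LogUrlPatternSketch {

  // Hypothetical attributes, in the shape a YARN executor would report them.
  val attributes: Map[String, String] = Map(
    "HTTP_SCHEME" -> "http://",
    "NM_HOST" -> "node1.example.com",
    "NM_PORT" -> "8041",
    "CONTAINER_ID" -> "container_1553914137147_0018_01_000002",
    "USER" -> "alice",
    "FILE_NAME" -> "stdout")

  // Replace every {{KEY}} token in the pattern with the matching attribute value.
  def resolve(pattern: String, attrs: Map[String, String]): String =
    attrs.foldLeft(pattern) { case (url, (key, value)) =>
      url.replace(s"{{$key}}", value)
    }

  def main(args: Array[String]): Unit = {
    val pattern = "{{HTTP_SCHEME}}historyserver.example.com:19888/jobhistory/logs/" +
      "{{NM_HOST}}:{{NM_PORT}}/{{CONTAINER_ID}}/{{CONTAINER_ID}}/{{USER}}/{{FILE_NAME}}?start=-4096"
    // Prints the fully substituted log URL for the stdout file of this container.
    println(resolve(pattern, attributes))
  }
}

With a pattern like the one in the docs example, the resolved URL points straight at the Job History Server rather than going through a NodeManager redirect.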
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 0b909d15c2..2f8f2a0a11 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -202,7 +202,7 @@ private[yarn] class ExecutorRunnable(
     val commands = prefixEnv ++
       Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
       javaOpts ++
-      Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend",
+      Seq("org.apache.spark.executor.YarnCoarseGrainedExecutorBackend",
        "--driver-url", masterAddress,
        "--executor-id", executorId,
        "--hostname", hostname,
@@ -235,21 +235,6 @@ private[yarn] class ExecutorRunnable(
       }
     }
 
-    // Add log urls, as well as executor attributes
-    container.foreach { c =>
-      YarnContainerInfoHelper.getLogUrls(conf, Some(c)).foreach { m =>
-        m.foreach { case (fileName, url) =>
-          env("SPARK_LOG_URL_" + fileName.toUpperCase(Locale.ROOT)) = url
-        }
-      }
-
-      YarnContainerInfoHelper.getAttributes(conf, Some(c)).foreach { m =>
-        m.foreach { case (attr, value) =>
-          env("SPARK_EXECUTOR_ATTRIBUTE_" + attr.toUpperCase(Locale.ROOT)) = value
-        }
-      }
-    }
-
     env
   }
 }
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala
new file mode 100644
index 0000000000..53e99d992d
--- /dev/null
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.net.URL
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.RpcEnv
+import org.apache.spark.util.YarnContainerInfoHelper
+
+/**
+ * Custom implementation of CoarseGrainedExecutorBackend for the YARN resource manager.
+ * This class extracts executor log URLs and executor attributes from the system environment
+ * variables that YARN sets for the container.
+ */
+private[spark] class YarnCoarseGrainedExecutorBackend(
+    rpcEnv: RpcEnv,
+    driverUrl: String,
+    executorId: String,
+    hostname: String,
+    cores: Int,
+    userClassPath: Seq[URL],
+    env: SparkEnv)
+  extends CoarseGrainedExecutorBackend(
+    rpcEnv,
+    driverUrl,
+    executorId,
+    hostname,
+    cores,
+    userClassPath,
+    env) with Logging {
+
+  private lazy val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(env.conf)
+
+  override def extractLogUrls: Map[String, String] = {
+    YarnContainerInfoHelper.getLogUrls(hadoopConfiguration, container = None)
+      .getOrElse(Map())
+  }
+
+  override def extractAttributes: Map[String, String] = {
+    YarnContainerInfoHelper.getAttributes(hadoopConfiguration, container = None)
+      .getOrElse(Map())
+  }
+}
+
+private[spark] object YarnCoarseGrainedExecutorBackend extends Logging {
+
+  def main(args: Array[String]): Unit = {
+    val createFn: (RpcEnv, CoarseGrainedExecutorBackend.Arguments, SparkEnv) =>
+      CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env) =>
+      new YarnCoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,
+        arguments.hostname, arguments.cores, arguments.userClassPath, env)
+    }
+    val backendArgs = CoarseGrainedExecutorBackend.parseArguments(args,
+      this.getClass.getCanonicalName.stripSuffix("$"))
+    CoarseGrainedExecutorBackend.run(backendArgs, createFn)
+    System.exit(0)
+  }
+
+}
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/util/YarnContainerInfoHelper.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/util/YarnContainerInfoHelper.scala
index 96350cdece..5e39422e86 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/util/YarnContainerInfoHelper.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/util/YarnContainerInfoHelper.scala
@@ -59,6 +59,9 @@ private[spark] object YarnContainerInfoHelper extends Logging {
       val yarnConf = new YarnConfiguration(conf)
       Some(Map(
         "HTTP_SCHEME" -> getYarnHttpScheme(yarnConf),
+        "NM_HOST" -> getNodeManagerHost(container),
+        "NM_PORT" -> getNodeManagerPort(container),
+        "NM_HTTP_PORT" -> getNodeManagerHttpPort(container),
         "NM_HTTP_ADDRESS" -> getNodeManagerHttpAddress(container),
         "CLUSTER_ID" -> getClusterId(yarnConf).getOrElse(""),
         "CONTAINER_ID" -> ConverterUtils.toString(getContainerId(container)),
@@ -97,7 +100,22 @@ private[spark] object YarnContainerInfoHelper extends Logging {
 
   def getNodeManagerHttpAddress(container: Option[Container]): String = container match {
     case Some(c) => c.getNodeHttpAddress
-    case None => System.getenv(Environment.NM_HOST.name()) + ":" +
-      System.getenv(Environment.NM_HTTP_PORT.name())
+    case None => getNodeManagerHost(None) + ":" + getNodeManagerHttpPort(None)
   }
+
+  def getNodeManagerHost(container: Option[Container]): String = container match {
+    case Some(c) => c.getNodeHttpAddress.split(":")(0)
+    case None => System.getenv(Environment.NM_HOST.name())
+  }
+
+  def getNodeManagerHttpPort(container: Option[Container]): String = container match {
+    case Some(c) => c.getNodeHttpAddress.split(":")(1)
+    case None => System.getenv(Environment.NM_HTTP_PORT.name())
+  }
+
+  def getNodeManagerPort(container: Option[Container]): String = container match {
+    case Some(_) => "-1" // Just return invalid port given we cannot retrieve the value
+    case None => System.getenv(Environment.NM_PORT.name())
+  }
+
 }
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index b3c5bbd263..56b7dfc136 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -477,6 +477,9 @@ private object YarnClusterDriver extends Logging with Matchers {
       val driverAttributes = listener.driverAttributes.get
       val expectationAttributes = Map(
         "HTTP_SCHEME" -> YarnContainerInfoHelper.getYarnHttpScheme(yarnConf),
+        "NM_HOST" -> YarnContainerInfoHelper.getNodeManagerHost(container = None),
+        "NM_PORT" -> YarnContainerInfoHelper.getNodeManagerPort(container = None),
+        "NM_HTTP_PORT" -> YarnContainerInfoHelper.getNodeManagerHttpPort(container = None),
         "NM_HTTP_ADDRESS" -> YarnContainerInfoHelper.getNodeManagerHttpAddress(container = None),
         "CLUSTER_ID" -> YarnContainerInfoHelper.getClusterId(yarnConf).getOrElse(""),
         "CONTAINER_ID" -> ConverterUtils.toString(containerId),
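Taken together, the refactoring above turns CoarseGrainedExecutorBackend's companion object into a small extension point: parseArguments produces an Arguments value, and run accepts a backend factory. The sketch below shows how another cluster manager integration could hypothetically reuse it; the class name, package placement, and the SPARK_LOG_URL_* environment-variable convention in extractLogUrls are assumptions for illustration, mirroring YarnCoarseGrainedExecutorBackend rather than code that exists in Spark.

package org.apache.spark.executor

import java.net.URL
import java.util.Locale

import org.apache.spark.SparkEnv
import org.apache.spark.rpc.RpcEnv

// Hypothetical backend for some other cluster manager, reusing the new entry points.
private[spark] class MyCoarseGrainedExecutorBackend(
    rpcEnv: RpcEnv,
    driverUrl: String,
    executorId: String,
    hostname: String,
    cores: Int,
    userClassPath: Seq[URL],
    env: SparkEnv)
  extends CoarseGrainedExecutorBackend(
    rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env) {

  // Report log URLs from wherever this cluster manager exposes them; here, assumed
  // SPARK_LOG_URL_* environment variables set by the executor launcher.
  override def extractLogUrls: Map[String, String] = {
    val prefix = "SPARK_LOG_URL_"
    sys.env.collect { case (k, v) if k.startsWith(prefix) =>
      k.stripPrefix(prefix).toLowerCase(Locale.ROOT) -> v
    }
  }
}

private[spark] object MyCoarseGrainedExecutorBackend {

  def main(args: Array[String]): Unit = {
    // Build the backend through the factory that CoarseGrainedExecutorBackend.run now accepts.
    val createFn: (RpcEnv, CoarseGrainedExecutorBackend.Arguments, SparkEnv) =>
      CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env) =>
      new MyCoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,
        arguments.hostname, arguments.cores, arguments.userClassPath, env)
    }
    val backendArgs = CoarseGrainedExecutorBackend.parseArguments(args,
      this.getClass.getCanonicalName.stripSuffix("$"))
    CoarseGrainedExecutorBackend.run(backendArgs, createFn)
    System.exit(0)
  }
}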