[SPARK-26790][CORE] Change approach for retrieving executor logs and attributes: self-retrieve

## What changes were proposed in this pull request?

This patch proposes to change the approach for extracting log URLs and attributes for YARN executors:

- AS-IS: the information is extracted from the `Container` API on the driver side and included in the container launch context.
- TO-BE: the YARN executor extracts the information itself from its own container environment.

The new approach lets us populate more attributes, such as the NodeManager's IPC port, which makes it possible to point a custom executor log URL directly at the JHS (Job History Server) log URL.
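For illustration only (this sketch is not part of the patch, and the object name is hypothetical), the "self-extract" idea amounts to the executor process reading the environment variables YARN sets for every container:

```scala
// Minimal sketch of the self-extract idea: a process running inside a YARN
// container reads its own NodeManager coordinates from the environment
// variables the NodeManager sets for each container
// (see YARN's ApplicationConstants.Environment).
object SelfExtractSketch {
  def main(args: Array[String]): Unit = {
    val nmHost      = sys.env.getOrElse("NM_HOST", "<unset>")
    val nmPort      = sys.env.getOrElse("NM_PORT", "<unset>")     // NodeManager IPC port
    val nmHttpPort  = sys.env.getOrElse("NM_HTTP_PORT", "<unset>")
    val containerId = sys.env.getOrElse("CONTAINER_ID", "<unset>")
    println(s"NM_HOST=$nmHost NM_PORT=$nmPort NM_HTTP_PORT=$nmHttpPort CONTAINER_ID=$containerId")
  }
}
```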

## How was this patch tested?

Existing unit tests.

Closes #23706 from HeartSaVioR/SPARK-26790.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
Commit b6c6875571 (parent 71170e74df), authored by Jungtaek Lim (HeartSaVioR) on 2019-02-15 12:44:14 -08:00 and committed by Marcelo Vanzin.
6 changed files with 151 additions and 38 deletions


@@ -181,33 +181,47 @@ private[spark] class CoarseGrainedExecutorBackend(
private[spark] object CoarseGrainedExecutorBackend extends Logging {
private def run(
case class Arguments(
driverUrl: String,
executorId: String,
hostname: String,
cores: Int,
appId: String,
workerUrl: Option[String],
userClassPath: Seq[URL]) {
userClassPath: mutable.ListBuffer[URL])
def main(args: Array[String]): Unit = {
val createFn: (RpcEnv, Arguments, SparkEnv) =>
CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env) =>
new CoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,
arguments.hostname, arguments.cores, arguments.userClassPath, env)
}
run(parseArguments(args, this.getClass.getCanonicalName.stripSuffix("$")), createFn)
System.exit(0)
}
def run(
arguments: Arguments,
backendCreateFn: (RpcEnv, Arguments, SparkEnv) => CoarseGrainedExecutorBackend): Unit = {
Utils.initDaemon(log)
SparkHadoopUtil.get.runAsSparkUser { () =>
// Debug code
Utils.checkHost(hostname)
Utils.checkHost(arguments.hostname)
// Bootstrap to fetch the driver's Spark properties.
val executorConf = new SparkConf
val fetcher = RpcEnv.create(
"driverPropsFetcher",
hostname,
arguments.hostname,
-1,
executorConf,
new SecurityManager(executorConf),
clientMode = true)
val driver = fetcher.setupEndpointRefByURI(driverUrl)
val driver = fetcher.setupEndpointRefByURI(arguments.driverUrl)
val cfg = driver.askSync[SparkAppConfig](RetrieveSparkAppConfig)
val props = cfg.sparkProperties ++ Seq[(String, String)](("spark.app.id", appId))
val props = cfg.sparkProperties ++ Seq[(String, String)](("spark.app.id", arguments.appId))
fetcher.shutdown()
// Create SparkEnv using properties we fetched from the driver.
@@ -225,19 +239,18 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
SparkHadoopUtil.get.addDelegationTokens(tokens, driverConf)
}
val env = SparkEnv.createExecutorEnv(
driverConf, executorId, hostname, cores, cfg.ioEncryptionKey, isLocal = false)
val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.hostname,
arguments.cores, cfg.ioEncryptionKey, isLocal = false)
env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))
workerUrl.foreach { url =>
env.rpcEnv.setupEndpoint("Executor", backendCreateFn(env.rpcEnv, arguments, env))
arguments.workerUrl.foreach { url =>
env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
}
env.rpcEnv.awaitTermination()
}
}
def main(args: Array[String]) {
def parseArguments(args: Array[String], classNameForEntry: String): Arguments = {
var driverUrl: String = null
var executorId: String = null
var hostname: String = null
@@ -276,24 +289,24 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
// scalastyle:off println
System.err.println(s"Unrecognized options: ${tail.mkString(" ")}")
// scalastyle:on println
printUsageAndExit()
printUsageAndExit(classNameForEntry)
}
}
if (driverUrl == null || executorId == null || hostname == null || cores <= 0 ||
appId == null) {
printUsageAndExit()
printUsageAndExit(classNameForEntry)
}
run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
System.exit(0)
Arguments(driverUrl, executorId, hostname, cores, appId, workerUrl,
userClassPath)
}
private def printUsageAndExit() = {
private def printUsageAndExit(classNameForEntry: String): Unit = {
// scalastyle:off println
System.err.println(
"""
|Usage: CoarseGrainedExecutorBackend [options]
s"""
|Usage: $classNameForEntry [options]
|
| Options are:
| --driver-url <driverUrl>
@@ -307,5 +320,4 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
// scalastyle:on println
System.exit(1)
}
}


@@ -480,6 +480,18 @@ To use a custom metrics.properties for the application master and executors, upd
<td>{{HTTP_SCHEME}}</td>
<td>`http://` or `https://` according to YARN HTTP policy. (Configured via `yarn.http.policy`)</td>
</tr>
<tr>
<td>{{NM_HOST}}</td>
<td>The "host" of node where container was run.</td>
</tr>
<tr>
<td>{{NM_PORT}}</td>
<td>The "port" of node manager where container was run.</td>
</tr>
<tr>
<td>{{NM_HTTP_PORT}}</td>
<td>The "port" of node manager's http server where container was run.</td>
</tr>
<tr>
<td>{{NM_HTTP_ADDRESS}}</td>
<td>HTTP URI of the node on which the container is allocated.</td>
@@ -502,6 +514,12 @@ To use a custom metrics.properties for the application master and executors, upd
</tr>
</table>
For example, suppose you would like to point the log URL link directly to the Job History Server, instead of letting the NodeManager HTTP server redirect it; you can then configure `spark.history.custom.executor.log.url` as below:
`{{HTTP_SCHEME}}<JHS_HOST>:<JHS_PORT>/jobhistory/logs/{{NM_HOST}}:{{NM_PORT}}/{{CONTAINER_ID}}/{{CONTAINER_ID}}/{{USER}}/{{FILE_NAME}}?start=-4096`
NOTE: you need to replace `<JHS_HOST>` and `<JHS_PORT>` with the actual values.
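As a sketch (the `<JHS_HOST>`/`<JHS_PORT>` placeholders are kept here and must be substituted with your actual Job History Server host and port), the corresponding entry in the history server's `spark-defaults.conf` would look like:

```
spark.history.custom.executor.log.url={{HTTP_SCHEME}}<JHS_HOST>:<JHS_PORT>/jobhistory/logs/{{NM_HOST}}:{{NM_PORT}}/{{CONTAINER_ID}}/{{CONTAINER_ID}}/{{USER}}/{{FILE_NAME}}?start=-4096
```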
# Important notes
- Whether core requests are honored in scheduling decisions depends on which scheduler is in use and how it is configured.


@@ -202,7 +202,7 @@ private[yarn] class ExecutorRunnable(
val commands = prefixEnv ++
Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
javaOpts ++
Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend",
Seq("org.apache.spark.executor.YarnCoarseGrainedExecutorBackend",
"--driver-url", masterAddress,
"--executor-id", executorId,
"--hostname", hostname,
@@ -235,21 +235,6 @@ private[yarn] class ExecutorRunnable(
}
}
// Add log urls, as well as executor attributes
container.foreach { c =>
YarnContainerInfoHelper.getLogUrls(conf, Some(c)).foreach { m =>
m.foreach { case (fileName, url) =>
env("SPARK_LOG_URL_" + fileName.toUpperCase(Locale.ROOT)) = url
}
}
YarnContainerInfoHelper.getAttributes(conf, Some(c)).foreach { m =>
m.foreach { case (attr, value) =>
env("SPARK_EXECUTOR_ATTRIBUTE_" + attr.toUpperCase(Locale.ROOT)) = value
}
}
}
env
}
}


@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.executor
import java.net.URL
import org.apache.spark.SparkEnv
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.rpc.RpcEnv
import org.apache.spark.util.YarnContainerInfoHelper
/**
* Custom implementation of CoarseGrainedExecutorBackend for YARN resource manager.
* This class extracts executor log URLs and executor attributes from the system environment;
* these properties are made available to the container by YARN.
*/
private[spark] class YarnCoarseGrainedExecutorBackend(
rpcEnv: RpcEnv,
driverUrl: String,
executorId: String,
hostname: String,
cores: Int,
userClassPath: Seq[URL],
env: SparkEnv)
extends CoarseGrainedExecutorBackend(
rpcEnv,
driverUrl,
executorId,
hostname,
cores,
userClassPath,
env) with Logging {
private lazy val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(env.conf)
override def extractLogUrls: Map[String, String] = {
YarnContainerInfoHelper.getLogUrls(hadoopConfiguration, container = None)
.getOrElse(Map())
}
override def extractAttributes: Map[String, String] = {
YarnContainerInfoHelper.getAttributes(hadoopConfiguration, container = None)
.getOrElse(Map())
}
}
private[spark] object YarnCoarseGrainedExecutorBackend extends Logging {
def main(args: Array[String]): Unit = {
val createFn: (RpcEnv, CoarseGrainedExecutorBackend.Arguments, SparkEnv) =>
CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env) =>
new YarnCoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,
arguments.hostname, arguments.cores, arguments.userClassPath, env)
}
val backendArgs = CoarseGrainedExecutorBackend.parseArguments(args,
this.getClass.getCanonicalName.stripSuffix("$"))
CoarseGrainedExecutorBackend.run(backendArgs, createFn)
System.exit(0)
}
}


@@ -59,6 +59,9 @@ private[spark] object YarnContainerInfoHelper extends Logging {
val yarnConf = new YarnConfiguration(conf)
Some(Map(
"HTTP_SCHEME" -> getYarnHttpScheme(yarnConf),
"NM_HOST" -> getNodeManagerHost(container),
"NM_PORT" -> getNodeManagerPort(container),
"NM_HTTP_PORT" -> getNodeManagerHttpPort(container),
"NM_HTTP_ADDRESS" -> getNodeManagerHttpAddress(container),
"CLUSTER_ID" -> getClusterId(yarnConf).getOrElse(""),
"CONTAINER_ID" -> ConverterUtils.toString(getContainerId(container)),
@@ -97,7 +100,22 @@ private[spark] object YarnContainerInfoHelper extends Logging {
def getNodeManagerHttpAddress(container: Option[Container]): String = container match {
case Some(c) => c.getNodeHttpAddress
case None => System.getenv(Environment.NM_HOST.name()) + ":" +
System.getenv(Environment.NM_HTTP_PORT.name())
case None => getNodeManagerHost(None) + ":" + getNodeManagerHttpPort(None)
}
def getNodeManagerHost(container: Option[Container]): String = container match {
case Some(c) => c.getNodeHttpAddress.split(":")(0)
case None => System.getenv(Environment.NM_HOST.name())
}
def getNodeManagerHttpPort(container: Option[Container]): String = container match {
case Some(c) => c.getNodeHttpAddress.split(":")(1)
case None => System.getenv(Environment.NM_HTTP_PORT.name())
}
def getNodeManagerPort(container: Option[Container]): String = container match {
case Some(_) => "-1" // Just return invalid port given we cannot retrieve the value
case None => System.getenv(Environment.NM_PORT.name())
}
}


@@ -477,6 +477,9 @@ private object YarnClusterDriver extends Logging with Matchers {
val driverAttributes = listener.driverAttributes.get
val expectationAttributes = Map(
"HTTP_SCHEME" -> YarnContainerInfoHelper.getYarnHttpScheme(yarnConf),
"NM_HOST" -> YarnContainerInfoHelper.getNodeManagerHost(container = None),
"NM_PORT" -> YarnContainerInfoHelper.getNodeManagerPort(container = None),
"NM_HTTP_PORT" -> YarnContainerInfoHelper.getNodeManagerHttpPort(container = None),
"NM_HTTP_ADDRESS" -> YarnContainerInfoHelper.getNodeManagerHttpAddress(container = None),
"CLUSTER_ID" -> YarnContainerInfoHelper.getClusterId(yarnConf).getOrElse(""),
"CONTAINER_ID" -> ConverterUtils.toString(containerId),