[SPARK-7657] [YARN] Add driver logs links in application UI, in cluster mode.

This PR adds the URLs to the driver logs to `SparkListenerApplicationStarted` event, which is later used by the `ExecutorsListener` to populate the URLs to the driver logs in its own state. This info is then used when the UI is rendered to display links to the logs.

Author: Hari Shreedharan <hshreedharan@apache.org>

Closes #6166 from harishreedharan/am-log-link and squashes the following commits:

943fc4f [Hari Shreedharan] Merge remote-tracking branch 'asf/master' into am-log-link
9e5c04b [Hari Shreedharan] Merge remote-tracking branch 'asf/master' into am-log-link
b3f9b9d [Hari Shreedharan] Updated comment based on feedback.
0840a95 [Hari Shreedharan] Move the result and sc.stop back to original location, minor import changes.
537a2f7 [Hari Shreedharan] Add test to ensure the log urls are populated and valid.
4033725 [Hari Shreedharan] Adding comments explaining how node reports are used to get the log urls.
6c5c285 [Hari Shreedharan] Import order.
346f4ea [Hari Shreedharan] Review feedback fixes.
629c1dc [Hari Shreedharan] Cleanup.
99fb1a3 [Hari Shreedharan] Send the log urls in App start event, to ensure that other listeners are not affected.
c0de336 [Hari Shreedharan] Ensure new unit test cleans up after itself.
50cdae3 [Hari Shreedharan] Added unit test, made the approach generic.
402e8e4 [Hari Shreedharan] Use `NodeReport` to get the URL for the logs. Also, make the environment variables generic so other cluster managers can use them as well.
1cf338f [Hari Shreedharan] [SPARK-7657][YARN] Add driver link in application UI, in cluster mode.
This commit is contained in:
Hari Shreedharan 2015-05-21 20:24:28 -05:00 committed by Imran Rashid
parent 85b96372cf
commit 956c4c910c
9 changed files with 136 additions and 12 deletions

View file

@ -1991,7 +1991,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
// Note: this code assumes that the task scheduler has been initialized and has contacted // Note: this code assumes that the task scheduler has been initialized and has contacted
// the cluster manager to get an application ID (in case the cluster manager provides one). // the cluster manager to get an application ID (in case the cluster manager provides one).
listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId), listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId),
startTime, sparkUser, applicationAttemptId)) startTime, sparkUser, applicationAttemptId, schedulerBackend.getDriverLogUrls))
} }
/** Post the application end event */ /** Post the application end event */

View file

@ -49,4 +49,11 @@ private[spark] trait SchedulerBackend {
*/ */
def applicationAttemptId(): Option[String] = None def applicationAttemptId(): Option[String] = None
/**
* Get the URLs for the driver logs. These URLs are used to display the links in the UI
* Executors tab for the driver.
* @return Map containing the log names and their respective URLs
*/
def getDriverLogUrls: Option[Map[String, String]] = None
} }

View file

@ -110,8 +110,13 @@ case class SparkListenerExecutorMetricsUpdate(
extends SparkListenerEvent extends SparkListenerEvent
@DeveloperApi @DeveloperApi
case class SparkListenerApplicationStart(appName: String, appId: Option[String], case class SparkListenerApplicationStart(
time: Long, sparkUser: String, appAttemptId: Option[String]) extends SparkListenerEvent appName: String,
appId: Option[String],
time: Long,
sparkUser: String,
appAttemptId: Option[String],
driverLogs: Option[Map[String, String]] = None) extends SparkListenerEvent
@DeveloperApi @DeveloperApi
case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent

View file

@ -19,7 +19,7 @@ package org.apache.spark.ui.exec
import scala.collection.mutable.HashMap import scala.collection.mutable.HashMap
import org.apache.spark.ExceptionFailure import org.apache.spark.{ExceptionFailure, SparkContext}
import org.apache.spark.annotation.DeveloperApi import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.scheduler._ import org.apache.spark.scheduler._
import org.apache.spark.storage.{StorageStatus, StorageStatusListener} import org.apache.spark.storage.{StorageStatus, StorageStatusListener}
@ -73,6 +73,16 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp
uiData.finishReason = Some(executorRemoved.reason) uiData.finishReason = Some(executorRemoved.reason)
} }
override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {
applicationStart.driverLogs.foreach { logs =>
val storageStatus = storageStatusList.find { s =>
s.blockManagerId.executorId == SparkContext.LEGACY_DRIVER_IDENTIFIER ||
s.blockManagerId.executorId == SparkContext.DRIVER_IDENTIFIER
}
storageStatus.foreach { s => executorToLogUrls(s.blockManagerId.executorId) = logs.toMap }
}
}
override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = synchronized { override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = synchronized {
val eid = taskStart.taskInfo.executorId val eid = taskStart.taskInfo.executorId
executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 0) + 1 executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 0) + 1

View file

@ -196,7 +196,8 @@ private[spark] object JsonProtocol {
("App ID" -> applicationStart.appId.map(JString(_)).getOrElse(JNothing)) ~ ("App ID" -> applicationStart.appId.map(JString(_)).getOrElse(JNothing)) ~
("Timestamp" -> applicationStart.time) ~ ("Timestamp" -> applicationStart.time) ~
("User" -> applicationStart.sparkUser) ~ ("User" -> applicationStart.sparkUser) ~
("App Attempt ID" -> applicationStart.appAttemptId.map(JString(_)).getOrElse(JNothing)) ("App Attempt ID" -> applicationStart.appAttemptId.map(JString(_)).getOrElse(JNothing)) ~
("Driver Logs" -> applicationStart.driverLogs.map(mapToJson).getOrElse(JNothing))
} }
def applicationEndToJson(applicationEnd: SparkListenerApplicationEnd): JValue = { def applicationEndToJson(applicationEnd: SparkListenerApplicationEnd): JValue = {
@ -570,7 +571,8 @@ private[spark] object JsonProtocol {
val time = (json \ "Timestamp").extract[Long] val time = (json \ "Timestamp").extract[Long]
val sparkUser = (json \ "User").extract[String] val sparkUser = (json \ "User").extract[String]
val appAttemptId = Utils.jsonOption(json \ "App Attempt ID").map(_.extract[String]) val appAttemptId = Utils.jsonOption(json \ "App Attempt ID").map(_.extract[String])
SparkListenerApplicationStart(appName, appId, time, sparkUser, appAttemptId) val driverLogs = Utils.jsonOption(json \ "Driver Logs").map(mapFromJson)
SparkListenerApplicationStart(appName, appId, time, sparkUser, appAttemptId, driverLogs)
} }
def applicationEndFromJson(json: JValue): SparkListenerApplicationEnd = { def applicationEndFromJson(json: JValue): SparkListenerApplicationEnd = {

View file

@ -89,9 +89,7 @@ private[spark] class YarnRMClient(args: ApplicationMasterArguments) extends Logg
/** Returns the attempt ID. */ /** Returns the attempt ID. */
def getAttemptId(): ApplicationAttemptId = { def getAttemptId(): ApplicationAttemptId = {
val containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name()) YarnSparkHadoopUtil.get.getContainerId.getApplicationAttemptId()
val containerId = ConverterUtils.toContainerId(containerIdString)
containerId.getApplicationAttemptId()
} }
/** Returns the configuration for the AmIpFilter to add to the Spark UI. */ /** Returns the configuration for the AmIpFilter to add to the Spark UI. */

View file

@ -33,7 +33,8 @@ import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.api.ApplicationConstants import org.apache.hadoop.yarn.api.ApplicationConstants
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import org.apache.hadoop.yarn.api.records.{Priority, ApplicationAccessType} import org.apache.hadoop.yarn.api.records.{ApplicationAccessType, ContainerId, Priority}
import org.apache.hadoop.yarn.util.ConverterUtils
import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.{SecurityManager, SparkConf, SparkException} import org.apache.spark.{SecurityManager, SparkConf, SparkException}
@ -136,6 +137,10 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
tokenRenewer.foreach(_.stop()) tokenRenewer.foreach(_.stop())
} }
private[spark] def getContainerId: ContainerId = {
val containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name())
ConverterUtils.toContainerId(containerIdString)
}
} }
object YarnSparkHadoopUtil { object YarnSparkHadoopUtil {

View file

@ -17,10 +17,19 @@
package org.apache.spark.scheduler.cluster package org.apache.spark.scheduler.cluster
import java.net.NetworkInterface
import scala.collection.JavaConverters._
import org.apache.hadoop.yarn.api.records.NodeState
import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.spark.SparkContext import org.apache.spark.SparkContext
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._ import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
import org.apache.spark.scheduler.TaskSchedulerImpl import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.util.IntParam import org.apache.spark.util.{IntParam, Utils}
private[spark] class YarnClusterSchedulerBackend( private[spark] class YarnClusterSchedulerBackend(
scheduler: TaskSchedulerImpl, scheduler: TaskSchedulerImpl,
@ -53,4 +62,70 @@ private[spark] class YarnClusterSchedulerBackend(
logError("Application attempt ID is not set.") logError("Application attempt ID is not set.")
super.applicationAttemptId super.applicationAttemptId
} }
override def getDriverLogUrls: Option[Map[String, String]] = {
var yarnClientOpt: Option[YarnClient] = None
var driverLogs: Option[Map[String, String]] = None
try {
val yarnConf = new YarnConfiguration(sc.hadoopConfiguration)
val containerId = YarnSparkHadoopUtil.get.getContainerId
yarnClientOpt = Some(YarnClient.createYarnClient())
yarnClientOpt.foreach { yarnClient =>
yarnClient.init(yarnConf)
yarnClient.start()
// For newer versions of YARN, we can find the HTTP address for a given node by getting a
// container report for a given container. But container reports came only in Hadoop 2.4,
// so we basically have to get the node reports for all nodes and find the one which runs
// this container. For that we have to compare the node's host against the current host.
// Since the host can have multiple addresses, we need to compare against all of them to
// find out if one matches.
// Get all the addresses of this node.
val addresses =
NetworkInterface.getNetworkInterfaces.asScala
.flatMap(_.getInetAddresses.asScala)
.toSeq
// Find a node report that matches one of the addresses
val nodeReport =
yarnClient.getNodeReports(NodeState.RUNNING).asScala.find { x =>
val host = x.getNodeId.getHost
addresses.exists { address =>
address.getHostAddress == host ||
address.getHostName == host ||
address.getCanonicalHostName == host
}
}
// Now that we have found the report for the Node Manager that the AM is running on, we
// can get the base HTTP address for the Node manager from the report.
// The format used for the logs for each container is well-known and can be constructed
// using the NM's HTTP address and the container ID.
// The NM may be running several containers, but we can build the URL for the AM using
// the AM's container ID, which we already know.
nodeReport.foreach { report =>
val httpAddress = report.getHttpAddress
// lookup appropriate http scheme for container log urls
val yarnHttpPolicy = yarnConf.get(
YarnConfiguration.YARN_HTTP_POLICY_KEY,
YarnConfiguration.YARN_HTTP_POLICY_DEFAULT
)
val user = Utils.getCurrentUserName()
val httpScheme = if (yarnHttpPolicy == "HTTPS_ONLY") "https://" else "http://"
val baseUrl = s"$httpScheme$httpAddress/node/containerlogs/$containerId/$user"
logDebug(s"Base URL for logs: $baseUrl")
driverLogs = Some(
Map("stderr" -> s"$baseUrl/stderr?start=0", "stdout" -> s"$baseUrl/stdout?start=0"))
}
}
} catch {
case e: Exception =>
logInfo("Node Report API is not available in the version of YARN being used, so AM" +
" logs link will not appear in application UI", e)
} finally {
yarnClientOpt.foreach(_.close())
}
driverLogs
}
} }

View file

@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit
import scala.collection.JavaConversions._ import scala.collection.JavaConversions._
import scala.collection.mutable import scala.collection.mutable
import scala.io.Source
import com.google.common.base.Charsets.UTF_8 import com.google.common.base.Charsets.UTF_8
import com.google.common.io.ByteStreams import com.google.common.io.ByteStreams
@ -33,7 +34,8 @@ import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException, TestUtils} import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException, TestUtils}
import org.apache.spark.scheduler.cluster.ExecutorInfo import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.scheduler.{SparkListenerJobStart, SparkListener, SparkListenerExecutorAdded} import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationStart,
SparkListenerExecutorAdded}
import org.apache.spark.util.Utils import org.apache.spark.util.Utils
/** /**
@ -290,10 +292,15 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
private[spark] class SaveExecutorInfo extends SparkListener { private[spark] class SaveExecutorInfo extends SparkListener {
val addedExecutorInfos = mutable.Map[String, ExecutorInfo]() val addedExecutorInfos = mutable.Map[String, ExecutorInfo]()
var driverLogs: Option[collection.Map[String, String]] = None
override def onExecutorAdded(executor: SparkListenerExecutorAdded) { override def onExecutorAdded(executor: SparkListenerExecutorAdded) {
addedExecutorInfos(executor.executorId) = executor.executorInfo addedExecutorInfos(executor.executorId) = executor.executorInfo
} }
override def onApplicationStart(appStart: SparkListenerApplicationStart): Unit = {
driverLogs = appStart.driverLogs
}
} }
private object YarnClusterDriver extends Logging with Matchers { private object YarnClusterDriver extends Logging with Matchers {
@ -314,6 +321,7 @@ private object YarnClusterDriver extends Logging with Matchers {
val sc = new SparkContext(new SparkConf() val sc = new SparkContext(new SparkConf()
.set("spark.extraListeners", classOf[SaveExecutorInfo].getName) .set("spark.extraListeners", classOf[SaveExecutorInfo].getName)
.setAppName("yarn \"test app\" 'with quotes' and \\back\\slashes and $dollarSigns")) .setAppName("yarn \"test app\" 'with quotes' and \\back\\slashes and $dollarSigns"))
val conf = sc.getConf
val status = new File(args(0)) val status = new File(args(0))
var result = "failure" var result = "failure"
try { try {
@ -335,6 +343,20 @@ private object YarnClusterDriver extends Logging with Matchers {
executorInfos.foreach { info => executorInfos.foreach { info =>
assert(info.logUrlMap.nonEmpty) assert(info.logUrlMap.nonEmpty)
} }
// If we are running in yarn-cluster mode, verify that driver logs are downloadable.
if (conf.get("spark.master") == "yarn-cluster") {
assert(listener.driverLogs.nonEmpty)
val driverLogs = listener.driverLogs.get
assert(driverLogs.size === 2)
assert(driverLogs.containsKey("stderr"))
assert(driverLogs.containsKey("stdout"))
val stderr = driverLogs("stderr") // YARN puts everything in stderr.
val lines = Source.fromURL(stderr).getLines()
// Look for a line that contains YarnClusterSchedulerBackend, since that is guaranteed in
// cluster mode.
assert(lines.exists(_.contains("YarnClusterSchedulerBackend")))
}
} }
} }