SPARK-2450 Adds executor log links to Web UI

Adds links to stderr/stdout in the Executors tab of the web UI for:
1) Standalone
2) YARN client
3) YARN cluster

This adds log URL support in a general way so that it is easy to extend to all cluster managers. The log URLs are passed to the executor through environment variables using the SPARK_LOG_URL_ prefix, so additional logs besides stderr/stdout can also be added.
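
For illustration, a minimal sketch of what the standalone launch path does (the object and parameter names here are placeholders for this example; the real code is in ExecutorRunner and, for YARN, ExecutorRunnable in the diff below):

// Simplified sketch: export log URLs into the executor's environment.
// Any variable named SPARK_LOG_URL_<NAME> is picked up by the executor and
// reported back to the driver as a log named <name> (lowercased).
object LogUrlEnv {
  def addLogUrls(builder: ProcessBuilder, host: String, webUiPort: Int,
                 appId: String, execId: Int): Unit = {
    val baseUrl = s"http://$host:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
    builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
    builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")
    // An extra log only needs one more variable, e.g. SPARK_LOG_URL_GC.
  }
}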

To propagate this information to the UI, we use the onExecutorAdded Spark listener event.
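
A minimal sketch of consuming that event on the driver side (the LogUrlListener name is invented for this example; the pattern mirrors the SaveExecutorInfo test listener added in this commit, and logUrlMap is the field this patch adds to ExecutorInfo):

import scala.collection.mutable
import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded}

class LogUrlListener extends SparkListener {
  // executor id -> (log name -> log URL), e.g. "stderr" -> "http://host:8081/logPage/..."
  val executorLogUrls = mutable.Map[String, Map[String, String]]()

  override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = {
    executorLogUrls(executorAdded.executorId) = executorAdded.executorInfo.logUrlMap
  }
}

Registering it with sc.addSparkListener(new LogUrlListener) before jobs run is enough; the web UI's ExecutorsListener does the same thing to populate its executorToLogUrls map.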

Although this commit doesn't add log URLs when running on a Mesos cluster, it should be possible to add them using the same mechanism.

Author: Kostas Sakellis <kostas@cloudera.com>
Author: Josh Rosen <joshrosen@databricks.com>

Closes #3486 from ksakellis/kostas-spark-2450 and squashes the following commits:

d190936 [Josh Rosen] Fix a few minor style / formatting nits. Reset listener after each test. Don't null listener out at end of main().
8673fe1 [Kostas Sakellis] CR feedback. Hide the log column if there are no logs available
5bf6952 [Kostas Sakellis] [SPARK-2450] [CORE] Adds executor log links to Web UI
Commit 32e964c410 (parent 4cdb26c174)
Authored by Kostas Sakellis on 2015-02-06 11:13:00 -08:00; committed by Josh Rosen
18 changed files with 178 additions and 30 deletions


@ -43,6 +43,7 @@ private[spark] class ExecutorRunner(
val worker: ActorRef,
val workerId: String,
val host: String,
val webUiPort: Int,
val sparkHome: File,
val executorDir: File,
val workerUrl: String,
@ -134,6 +135,12 @@ private[spark] class ExecutorRunner(
// In case we are running this from within the Spark Shell, avoid creating a "scala"
// parent process for the executor command
builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")
// Add webUI log urls
val baseUrl = s"http://$host:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")
process = builder.start()
val header = "Spark Executor Command: %s\n%s\n\n".format(
command.mkString("\"", "\" \"", "\""), "=" * 40)


@ -362,6 +362,7 @@ private[spark] class Worker(
self,
workerId,
host,
webUiPort,
sparkHome,
executorDir,
akkaUrl,


@ -49,10 +49,16 @@ private[spark] class CoarseGrainedExecutorBackend(
override def preStart() {
logInfo("Connecting to driver: " + driverUrl)
driver = context.actorSelection(driverUrl)
driver ! RegisterExecutor(executorId, hostPort, cores)
driver ! RegisterExecutor(executorId, hostPort, cores, extractLogUrls)
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
}
def extractLogUrls: Map[String, String] = {
val prefix = "SPARK_LOG_URL_"
sys.env.filterKeys(_.startsWith(prefix))
.map(e => (e._1.substring(prefix.length).toLowerCase, e._2))
}
override def receiveWithLogging = {
case RegisteredExecutor =>
logInfo("Successfully registered with driver")


@ -39,7 +39,11 @@ private[spark] object CoarseGrainedClusterMessages {
case class RegisterExecutorFailed(message: String) extends CoarseGrainedClusterMessage
// Executors to driver
case class RegisterExecutor(executorId: String, hostPort: String, cores: Int)
case class RegisterExecutor(
executorId: String,
hostPort: String,
cores: Int,
logUrls: Map[String, String])
extends CoarseGrainedClusterMessage {
Utils.checkHostPort(hostPort, "Expected host port")
}


@ -86,7 +86,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
}
def receiveWithLogging = {
case RegisterExecutor(executorId, hostPort, cores) =>
case RegisterExecutor(executorId, hostPort, cores, logUrls) =>
Utils.checkHostPort(hostPort, "Host port expected " + hostPort)
if (executorDataMap.contains(executorId)) {
sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId)
@ -98,7 +98,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
totalCoreCount.addAndGet(cores)
totalRegisteredExecutors.addAndGet(1)
val (host, _) = Utils.parseHostPort(hostPort)
val data = new ExecutorData(sender, sender.path.address, host, cores, cores)
val data = new ExecutorData(sender, sender.path.address, host, cores, cores, logUrls)
// This must be synchronized because variables mutated
// in this block are read when requesting executors
CoarseGrainedSchedulerBackend.this.synchronized {


@ -33,5 +33,6 @@ private[cluster] class ExecutorData(
val executorAddress: Address,
override val executorHost: String,
var freeCores: Int,
override val totalCores: Int
) extends ExecutorInfo(executorHost, totalCores)
override val totalCores: Int,
override val logUrlMap: Map[String, String]
) extends ExecutorInfo(executorHost, totalCores, logUrlMap)


@ -25,8 +25,8 @@ import org.apache.spark.annotation.DeveloperApi
@DeveloperApi
class ExecutorInfo(
val executorHost: String,
val totalCores: Int
) {
val totalCores: Int,
val logUrlMap: Map[String, String]) {
def canEqual(other: Any): Boolean = other.isInstanceOf[ExecutorInfo]
@ -34,12 +34,13 @@ class ExecutorInfo(
case that: ExecutorInfo =>
(that canEqual this) &&
executorHost == that.executorHost &&
totalCores == that.totalCores
totalCores == that.totalCores &&
logUrlMap == that.logUrlMap
case _ => false
}
override def hashCode(): Int = {
val state = Seq(executorHost, totalCores)
val state = Seq(executorHost, totalCores, logUrlMap)
state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
}
}


@ -270,7 +270,8 @@ private[spark] class MesosSchedulerBackend(
mesosTasks.foreach { case (slaveId, tasks) =>
slaveIdToWorkerOffer.get(slaveId).foreach(o =>
listenerBus.post(SparkListenerExecutorAdded(System.currentTimeMillis(), slaveId,
new ExecutorInfo(o.host, o.cores)))
// TODO: Add support for log urls for Mesos
new ExecutorInfo(o.host, o.cores, Map.empty)))
)
d.launchTasks(Collections.singleton(slaveIdToOffer(slaveId).getId), tasks, filters)
}


@ -26,7 +26,8 @@ import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage}
import org.apache.spark.util.Utils
/** Summary information about an executor to display in the UI. */
private case class ExecutorSummaryInfo(
// Needs to be private[ui] because of a false positive MiMa failure.
private[ui] case class ExecutorSummaryInfo(
id: String,
hostPort: String,
rddBlocks: Int,
@ -40,7 +41,8 @@ private case class ExecutorSummaryInfo(
totalInputBytes: Long,
totalShuffleRead: Long,
totalShuffleWrite: Long,
maxMemory: Long)
maxMemory: Long,
executorLogs: Map[String, String])
private[ui] class ExecutorsPage(
parent: ExecutorsTab,
@ -55,6 +57,7 @@ private[ui] class ExecutorsPage(
val diskUsed = storageStatusList.map(_.diskUsed).sum
val execInfo = for (statusId <- 0 until storageStatusList.size) yield getExecInfo(statusId)
val execInfoSorted = execInfo.sortBy(_.id)
val logsExist = execInfo.filter(_.executorLogs.nonEmpty).nonEmpty
val execTable =
<table class={UIUtils.TABLE_CLASS_STRIPED}>
@ -79,10 +82,11 @@ private[ui] class ExecutorsPage(
Shuffle Write
</span>
</th>
{if (logsExist) <th class="sorttable_nosort">Logs</th> else Seq.empty}
{if (threadDumpEnabled) <th class="sorttable_nosort">Thread Dump</th> else Seq.empty}
</thead>
<tbody>
{execInfoSorted.map(execRow)}
{execInfoSorted.map(execRow(_, logsExist))}
</tbody>
</table>
@ -107,7 +111,7 @@ private[ui] class ExecutorsPage(
}
/** Render an HTML row representing an executor */
private def execRow(info: ExecutorSummaryInfo): Seq[Node] = {
private def execRow(info: ExecutorSummaryInfo, logsExist: Boolean): Seq[Node] = {
val maximumMemory = info.maxMemory
val memoryUsed = info.memoryUsed
val diskUsed = info.diskUsed
@ -138,6 +142,21 @@ private[ui] class ExecutorsPage(
<td sorttable_customkey={info.totalShuffleWrite.toString}>
{Utils.bytesToString(info.totalShuffleWrite)}
</td>
{
if (logsExist) {
<td>
{
info.executorLogs.map { case (logName, logUrl) =>
<div>
<a href={logUrl}>
{logName}
</a>
</div>
}
}
</td>
}
}
{
if (threadDumpEnabled) {
val encodedId = URLEncoder.encode(info.id, "UTF-8")
@ -168,6 +187,7 @@ private[ui] class ExecutorsPage(
val totalInputBytes = listener.executorToInputBytes.getOrElse(execId, 0L)
val totalShuffleRead = listener.executorToShuffleRead.getOrElse(execId, 0L)
val totalShuffleWrite = listener.executorToShuffleWrite.getOrElse(execId, 0L)
val executorLogs = listener.executorToLogUrls.getOrElse(execId, Map.empty)
new ExecutorSummaryInfo(
execId,
@ -183,7 +203,8 @@ private[ui] class ExecutorsPage(
totalInputBytes,
totalShuffleRead,
totalShuffleWrite,
maxMem
maxMem,
executorLogs
)
}
}


@ -51,9 +51,15 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp
val executorToOutputBytes = HashMap[String, Long]()
val executorToShuffleRead = HashMap[String, Long]()
val executorToShuffleWrite = HashMap[String, Long]()
val executorToLogUrls = HashMap[String, Map[String, String]]()
def storageStatusList = storageStatusListener.storageStatusList
override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded) = synchronized {
val eid = executorAdded.executorId
executorToLogUrls(eid) = executorAdded.executorInfo.logUrlMap
}
override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized {
val eid = taskStart.taskInfo.executorId
executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 0) + 1


@ -383,7 +383,8 @@ private[spark] object JsonProtocol {
def executorInfoToJson(executorInfo: ExecutorInfo): JValue = {
("Host" -> executorInfo.executorHost) ~
("Total Cores" -> executorInfo.totalCores)
("Total Cores" -> executorInfo.totalCores) ~
("Log Urls" -> mapToJson(executorInfo.logUrlMap))
}
/** ------------------------------ *
@ -792,7 +793,8 @@ private[spark] object JsonProtocol {
def executorInfoFromJson(json: JValue): ExecutorInfo = {
val executorHost = (json \ "Host").extract[String]
val totalCores = (json \ "Total Cores").extract[Int]
new ExecutorInfo(executorHost, totalCores)
val logUrls = mapFromJson(json \ "Log Urls").toMap
new ExecutorInfo(executorHost, totalCores, logUrls)
}
/** -------------------------------- *


@ -117,7 +117,7 @@ class JsonProtocolSuite extends FunSuite {
}
def createExecutorRunner(): ExecutorRunner = {
new ExecutorRunner("appId", 123, createAppDesc(), 4, 1234, null, "workerId", "host",
new ExecutorRunner("appId", 123, createAppDesc(), 4, 1234, null, "workerId", "host", 123,
new File("sparkHome"), new File("workDir"), "akka://worker",
new SparkConf, Seq("localDir"), ExecutorState.RUNNING)
}


@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy
import scala.collection.mutable
import org.scalatest.{BeforeAndAfter, FunSuite}
import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.scheduler.{SparkListenerExecutorAdded, SparkListener}
import org.apache.spark.{SparkContext, LocalSparkContext}
class LogUrlsStandaloneSuite extends FunSuite with LocalSparkContext with BeforeAndAfter {
/** Length of time to wait while draining listener events. */
val WAIT_TIMEOUT_MILLIS = 10000
before {
sc = new SparkContext("local-cluster[2,1,512]", "test")
}
test("verify log urls get propagated from workers") {
val listener = new SaveExecutorInfo
sc.addSparkListener(listener)
val rdd1 = sc.parallelize(1 to 100, 4)
val rdd2 = rdd1.map(_.toString)
rdd2.setName("Target RDD")
rdd2.count()
assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
listener.addedExecutorInfos.values.foreach { info =>
assert(info.logUrlMap.nonEmpty)
}
}
private class SaveExecutorInfo extends SparkListener {
val addedExecutorInfos = mutable.Map[String, ExecutorInfo]()
override def onExecutorAdded(executor: SparkListenerExecutorAdded) {
addedExecutorInfos(executor.executorId) = executor.executorInfo
}
}
}


@ -32,7 +32,7 @@ class ExecutorRunnerTest extends FunSuite {
val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
val appDesc = new ApplicationDescription("app name", Some(8), 500,
Command("foo", Seq(appId), Map(), Seq(), Seq(), Seq()), "appUiUrl")
val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321",
val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", 123,
new File(sparkHome), new File("ooga"), "blah", new SparkConf, Seq("localDir"),
ExecutorState.RUNNING)
val builder = CommandUtils.buildProcessBuilder(appDesc.command, 512, sparkHome, er.substituteVariables)


@ -43,7 +43,7 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea
conf.set("spark.mesos.executor.home" , "/mesos-home")
val listenerBus = EasyMock.createMock(classOf[LiveListenerBus])
listenerBus.post(SparkListenerExecutorAdded(EasyMock.anyLong, "s1", new ExecutorInfo("host1", 2)))
listenerBus.post(SparkListenerExecutorAdded(EasyMock.anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty)))
EasyMock.replay(listenerBus)
val sc = EasyMock.createMock(classOf[SparkContext])
@ -88,7 +88,7 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea
val taskScheduler = EasyMock.createMock(classOf[TaskSchedulerImpl])
val listenerBus = EasyMock.createMock(classOf[LiveListenerBus])
listenerBus.post(SparkListenerExecutorAdded(EasyMock.anyLong, "s1", new ExecutorInfo("host1", 2)))
listenerBus.post(SparkListenerExecutorAdded(EasyMock.anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty)))
EasyMock.replay(listenerBus)
val sc = EasyMock.createMock(classOf[SparkContext])


@ -76,8 +76,9 @@ class JsonProtocolSuite extends FunSuite {
val unpersistRdd = SparkListenerUnpersistRDD(12345)
val applicationStart = SparkListenerApplicationStart("The winner of all", None, 42L, "Garfield")
val applicationEnd = SparkListenerApplicationEnd(42L)
val logUrlMap = Map("stderr" -> "mystderr", "stdout" -> "mystdout").toMap
val executorAdded = SparkListenerExecutorAdded(executorAddedTime, "exec1",
new ExecutorInfo("Hostee.awesome.com", 11))
new ExecutorInfo("Hostee.awesome.com", 11, logUrlMap))
val executorRemoved = SparkListenerExecutorRemoved(executorRemovedTime, "exec2", "test reason")
testEvent(stageSubmitted, stageSubmittedJsonString)
@ -100,13 +101,14 @@ class JsonProtocolSuite extends FunSuite {
}
test("Dependent Classes") {
val logUrlMap = Map("stderr" -> "mystderr", "stdout" -> "mystdout").toMap
testRDDInfo(makeRddInfo(2, 3, 4, 5L, 6L))
testStageInfo(makeStageInfo(10, 20, 30, 40L, 50L))
testTaskInfo(makeTaskInfo(999L, 888, 55, 777L, false))
testTaskMetrics(makeTaskMetrics(
33333L, 44444L, 55555L, 66666L, 7, 8, hasHadoopInput = false, hasOutput = false))
testBlockManagerId(BlockManagerId("Hong", "Kong", 500))
testExecutorInfo(new ExecutorInfo("host", 43))
testExecutorInfo(new ExecutorInfo("host", 43, logUrlMap))
// StorageLevel
testStorageLevel(StorageLevel.NONE)
@ -1463,7 +1465,11 @@ class JsonProtocolSuite extends FunSuite {
| "Executor ID": "exec1",
| "Executor Info": {
| "Host": "Hostee.awesome.com",
| "Total Cores": 11
| "Total Cores": 11,
| "Log Urls" : {
| "stderr" : "mystderr",
| "stdout" : "mystdout"
| }
| }
|}
"""


@ -56,7 +56,7 @@ class ExecutorRunnable(
var rpc: YarnRPC = YarnRPC.create(conf)
var nmClient: NMClient = _
val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
lazy val env = prepareEnvironment
lazy val env = prepareEnvironment(container)
def run = {
logInfo("Starting Executor Container")
@ -254,7 +254,7 @@ class ExecutorRunnable(
localResources
}
private def prepareEnvironment: HashMap[String, String] = {
private def prepareEnvironment(container: Container): HashMap[String, String] = {
val env = new HashMap[String, String]()
val extraCp = sparkConf.getOption("spark.executor.extraClassPath")
Client.populateClasspath(null, yarnConf, sparkConf, env, extraCp)
@ -270,6 +270,14 @@ class ExecutorRunnable(
YarnSparkHadoopUtil.setEnvFromInputString(env, userEnvs)
}
// Add log urls
sys.env.get("SPARK_USER").foreach { user =>
val baseUrl = "http://%s/node/containerlogs/%s/%s"
.format(container.getNodeHttpAddress, ConverterUtils.toString(container.getId), user)
env("SPARK_LOG_URL_STDERR") = s"$baseUrl/stderr?start=0"
env("SPARK_LOG_URL_STDOUT") = s"$baseUrl/stdout?start=0"
}
System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k, v) => env(k) = v }
env
}


@ -21,16 +21,17 @@ import java.io.File
import java.util.concurrent.TimeUnit
import scala.collection.JavaConversions._
import scala.collection.mutable
import com.google.common.base.Charsets
import com.google.common.io.Files
import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.server.MiniYARNCluster
import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded}
import org.apache.spark.util.Utils
class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers with Logging {
@ -143,6 +144,11 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
var result = File.createTempFile("result", null, tempDir)
YarnClusterDriver.main(Array("yarn-client", result.getAbsolutePath()))
checkResult(result)
// verify log urls are present
YarnClusterDriver.listener.addedExecutorInfos.values.foreach { info =>
assert(info.logUrlMap.nonEmpty)
}
}
test("run Spark in yarn-cluster mode") {
@ -156,6 +162,11 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
"--num-executors", "1")
Client.main(args)
checkResult(result)
// verify log urls are present.
YarnClusterDriver.listener.addedExecutorInfos.values.foreach { info =>
assert(info.logUrlMap.nonEmpty)
}
}
test("run Spark in yarn-cluster mode unsuccessfully") {
@ -203,8 +214,19 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
}
private class SaveExecutorInfo extends SparkListener {
val addedExecutorInfos = mutable.Map[String, ExecutorInfo]()
override def onExecutorAdded(executor : SparkListenerExecutorAdded) {
addedExecutorInfos(executor.executorId) = executor.executorInfo
}
}
private object YarnClusterDriver extends Logging with Matchers {
val WAIT_TIMEOUT_MILLIS = 10000
var listener: SaveExecutorInfo = null
def main(args: Array[String]) = {
if (args.length != 2) {
System.err.println(
@ -216,12 +238,15 @@ private object YarnClusterDriver extends Logging with Matchers {
System.exit(1)
}
listener = new SaveExecutorInfo
val sc = new SparkContext(new SparkConf().setMaster(args(0))
.setAppName("yarn \"test app\" 'with quotes' and \\back\\slashes and $dollarSigns"))
sc.addSparkListener(listener)
val status = new File(args(1))
var result = "failure"
try {
val data = sc.parallelize(1 to 4, 4).collect().toSet
assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
data should be (Set(1, 2, 3, 4))
result = "success"
} finally {