Merge pull request #244 from leftnoteasy/master

Added SPARK-968 implementation for review

Reynold Xin 2013-12-23 10:38:20 -08:00
commit 11107c9de5
7 changed files with 257 additions and 5 deletions

core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala

@@ -56,7 +56,8 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize)).fold(0L)(_+_)
val execHead = Seq("Executor ID", "Address", "RDD blocks", "Memory used", "Disk used",
"Active tasks", "Failed tasks", "Complete tasks", "Total tasks")
"Active tasks", "Failed tasks", "Complete tasks", "Total tasks", "Task Time", "Shuffle Read",
"Shuffle Write")
def execRow(kv: Seq[String]) = {
<tr>
@@ -73,6 +74,9 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
<td>{kv(7)}</td>
<td>{kv(8)}</td>
<td>{kv(9)}</td>
<td>{Utils.msDurationToString(kv(10).toLong)}</td>
<td>{Utils.bytesToString(kv(11).toLong)}</td>
<td>{Utils.bytesToString(kv(12).toLong)}</td>
</tr>
}
@@ -111,6 +115,9 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
val failedTasks = listener.executorToTasksFailed.getOrElse(execId, 0)
val completedTasks = listener.executorToTasksComplete.getOrElse(execId, 0)
val totalTasks = activeTasks + failedTasks + completedTasks
val totalDuration = listener.executorToDuration.getOrElse(execId, 0L)
val totalShuffleRead = listener.executorToShuffleRead.getOrElse(execId, 0L)
val totalShuffleWrite = listener.executorToShuffleWrite.getOrElse(execId, 0L)
Seq(
execId,
@@ -122,7 +129,10 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
activeTasks.toString,
failedTasks.toString,
completedTasks.toString,
- totalTasks.toString
+ totalTasks.toString,
+ totalDuration.toString,
+ totalShuffleRead.toString,
+ totalShuffleWrite.toString
)
}
@@ -130,6 +140,9 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
val executorToTasksActive = HashMap[String, HashSet[TaskInfo]]()
val executorToTasksComplete = HashMap[String, Int]()
val executorToTasksFailed = HashMap[String, Int]()
val executorToDuration = HashMap[String, Long]()
val executorToShuffleRead = HashMap[String, Long]()
val executorToShuffleWrite = HashMap[String, Long]()
override def onTaskStart(taskStart: SparkListenerTaskStart) {
val eid = taskStart.taskInfo.executorId
@@ -140,6 +153,9 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
val eid = taskEnd.taskInfo.executorId
val activeTasks = executorToTasksActive.getOrElseUpdate(eid, new HashSet[TaskInfo]())
val newDuration = executorToDuration.getOrElse(eid, 0L) + taskEnd.taskInfo.duration
executorToDuration.put(eid, newDuration)
activeTasks -= taskEnd.taskInfo
val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
taskEnd.reason match {
@@ -150,6 +166,17 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
executorToTasksComplete(eid) = executorToTasksComplete.getOrElse(eid, 0) + 1
(None, Option(taskEnd.taskMetrics))
}
// update shuffle read/write totals (taskMetrics can be null for failed tasks)
if (taskEnd.taskMetrics != null) {
taskEnd.taskMetrics.shuffleReadMetrics.foreach(shuffleRead =>
executorToShuffleRead.put(eid, executorToShuffleRead.getOrElse(eid, 0L) +
shuffleRead.remoteBytesRead))
taskEnd.taskMetrics.shuffleWriteMetrics.foreach(shuffleWrite =>
executorToShuffleWrite.put(eid, executorToShuffleWrite.getOrElse(eid, 0L) +
shuffleWrite.shuffleBytesWritten))
}
}
}
}
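The listener changes above all follow one pattern: a read-modify-write of a per-executor running total on each task-end event. A minimal standalone sketch of that pattern (hypothetical object and method names, not part of this patch):

    import scala.collection.mutable.HashMap

    object ShuffleTotalsSketch {
      // Running totals keyed by executor ID, as in executorToShuffleRead above.
      val totals = HashMap[String, Long]()

      // Accumulate one finished task's bytes into its executor's total.
      def record(executorId: String, bytes: Long) {
        totals.put(executorId, totals.getOrElse(executorId, 0L) + bytes)
      }

      def main(args: Array[String]) {
        record("exe-1", 1000L)
        record("exe-1", 500L)
        record("exe-2", 2000L)
        println(totals) // Map(exe-1 -> 1500, exe-2 -> 2000), in some order
      }
    }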

core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala

@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.ui.jobs
/** Summary of aggregated metrics for each executor in the stage UI. */
private[spark] class ExecutorSummary {
var taskTime: Long = 0
var failedTasks: Int = 0
var succeededTasks: Int = 0
var shuffleRead: Long = 0
var shuffleWrite: Long = 0
}
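ExecutorSummary is intentionally a bare mutable accumulator: JobProgressListener (changed below) keeps one instance per (stage, executor) pair and updates it in place on every task-end event. A sketch of that keying, reusing the map name from the listener diff below:

    import scala.collection.mutable.HashMap

    // One summary per executor, grouped per stage.
    val stageIdToExecutorSummaries = HashMap[Int, HashMap[String, ExecutorSummary]]()

    def summaryFor(stageId: Int, executorId: String): ExecutorSummary =
      stageIdToExecutorSummaries
        .getOrElseUpdate(stageId, new HashMap[String, ExecutorSummary]())
        .getOrElseUpdate(executorId, new ExecutorSummary())

    // e.g. on task end:
    summaryFor(0, "exe-1").succeededTasks += 1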

core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala

@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.ui.jobs
import scala.xml.Node
import org.apache.spark.scheduler.SchedulingMode
import org.apache.spark.util.Utils
import scala.collection.mutable
/** Page showing executor summary */
private[spark] class ExecutorTable(val parent: JobProgressUI, val stageId: Int) {
val listener = parent.listener
val dateFmt = parent.dateFmt
val isFairScheduler = listener.sc.getSchedulingMode == SchedulingMode.FAIR
def toNodeSeq(): Seq[Node] = {
listener.synchronized {
executorTable()
}
}
/** Render the per-executor summary table for this stage. */
private def executorTable(): Seq[Node] = {
<table class="table table-bordered table-striped table-condensed sortable">
<thead>
<th>Executor ID</th>
<th>Address</th>
<th>Task Time</th>
<th>Total Tasks</th>
<th>Failed Tasks</th>
<th>Succeeded Tasks</th>
<th>Shuffle Read</th>
<th>Shuffle Write</th>
</thead>
<tbody>
{createExecutorTable()}
</tbody>
</table>
}
private def createExecutorTable(): Seq[Node] = {
// build an executor-id -> address map
val executorIdToAddress = mutable.HashMap[String, String]()
val storageStatusList = parent.sc.getExecutorStorageStatus
for (storageStatus <- storageStatusList) {
val blockManagerId = storageStatus.blockManagerId
executorIdToAddress.put(blockManagerId.executorId, blockManagerId.hostPort)
}
val executorIdToSummary = listener.stageIdToExecutorSummaries.get(stageId)
executorIdToSummary match {
case Some(x) => {
x.toSeq.sortBy(_._1).map {
case (k, v) => {
<tr>
<td>{k}</td>
<td>{executorIdToAddress.getOrElse(k, "CANNOT FIND ADDRESS")}</td>
<td>{parent.formatDuration(v.taskTime)}</td>
<td>{v.failedTasks + v.succeededTasks}</td>
<td>{v.failedTasks}</td>
<td>{v.succeededTasks}</td>
<td>{Utils.bytesToString(v.shuffleRead)}</td>
<td>{Utils.bytesToString(v.shuffleWrite)}</td>
</tr>
}
}
}
case _ => { Seq[Node]() }
}
}
}
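The {createExecutorTable()} expression works because Scala XML literals splice an embedded expression of type Seq[Node] directly into the enclosing element. A minimal standalone demonstration of that mechanism:

    import scala.xml.Node

    val rows: Seq[Node] = Seq(<tr><td>exe-1</td></tr>, <tr><td>exe-2</td></tr>)
    val table = <table><tbody>{rows}</tbody></table>
    println(table)
    // <table><tbody><tr><td>exe-1</td></tr><tr><td>exe-2</td></tr></tbody></table>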

core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala

@@ -57,6 +57,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
val stageIdToTasksFailed = HashMap[Int, Int]()
val stageIdToTaskInfos =
HashMap[Int, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]]()
val stageIdToExecutorSummaries = HashMap[Int, HashMap[String, ExecutorSummary]]()
override def onJobStart(jobStart: SparkListenerJobStart) {}
@@ -124,8 +125,41 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
val sid = taskEnd.task.stageId
// create executor summary map if necessary
val executorSummaryMap = stageIdToExecutorSummaries.getOrElseUpdate(key = sid,
op = new HashMap[String, ExecutorSummary]())
executorSummaryMap.getOrElseUpdate(key = taskEnd.taskInfo.executorId,
op = new ExecutorSummary())
val executorSummary = executorSummaryMap.get(taskEnd.taskInfo.executorId)
executorSummary match {
case Some(y) => {
// update the failed and succeeded task counts
taskEnd.reason match {
case Success =>
y.succeededTasks += 1
case _ =>
y.failedTasks += 1
}
// update duration
y.taskTime += taskEnd.taskInfo.duration
// taskMetrics can be null for tasks that failed before reporting metrics
if (taskEnd.taskMetrics != null) {
taskEnd.taskMetrics.shuffleReadMetrics.foreach { shuffleRead =>
y.shuffleRead += shuffleRead.remoteBytesRead
}
taskEnd.taskMetrics.shuffleWriteMetrics.foreach { shuffleWrite =>
y.shuffleWrite += shuffleWrite.shuffleBytesWritten
}
}
}
case _ => {}
}
val tasksActive = stageIdToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
tasksActive -= taskEnd.taskInfo
val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
taskEnd.reason match {
case e: ExceptionFailure =>
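The new stageIdToExecutorSummaries map is what ExecutorTable reads when rendering the stage page. A sketch of consuming it directly (assuming a listener registered via sc.addSparkListener, as the web UI does):

    val listener = new JobProgressListener(sc)
    sc.addSparkListener(listener)
    // ... run a job, then inspect the per-executor aggregates:
    for ((stageId, summaries) <- listener.stageIdToExecutorSummaries;
         (execId, s) <- summaries) {
      println(stageId + " / " + execId + ": " + s.succeededTasks + " succeeded, " +
        s.failedTasks + " failed, " + s.shuffleRead + " shuffle bytes read")
    }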

core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala

@@ -66,7 +66,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
<div>
<ul class="unstyled">
<li>
- <strong>Total duration across all tasks: </strong>
+ <strong>Total task time across all tasks: </strong>
{parent.formatDuration(listener.stageIdToTime.getOrElse(stageId, 0L) + activeTime)}
</li>
{if (hasShuffleRead)
@@ -166,11 +166,12 @@ private[spark] class StagePage(parent: JobProgressUI) {
def quantileRow(data: Seq[String]): Seq[Node] = <tr> {data.map(d => <td>{d}</td>)} </tr>
Some(listingTable(quantileHeaders, quantileRow, listings, fixedWidth = true))
}
val executorTable = new ExecutorTable(parent, stageId)
val content =
summary ++
<h4>Summary Metrics for {numCompleted} Completed Tasks</h4> ++
<div>{summaryTable.getOrElse("No tasks have reported metrics yet.")}</div> ++
<h4>Aggregated Metrics by Executors</h4> ++ executorTable.toNodeSeq() ++
<h4>Tasks</h4> ++ taskTable
headerSparkPage(content, parent.sc, "Details for Stage %d".format(stageId), Stages)

core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala

@@ -48,7 +48,7 @@ private[spark] class StageTable(val stages: Seq[StageInfo], val parent: JobProgr
{if (isFairScheduler) {<th>Pool Name</th>} else {}}
<th>Description</th>
<th>Submitted</th>
- <th>Duration</th>
+ <th>Task Time</th>
<th>Tasks: Succeeded/Total</th>
<th>Shuffle Read</th>
<th>Shuffle Write</th>

core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala

@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.ui.jobs
import org.scalatest.FunSuite
import org.apache.spark.scheduler._
import org.apache.spark.{LocalSparkContext, SparkContext, Success}
import org.apache.spark.executor.{ShuffleReadMetrics, TaskMetrics}
class JobProgressListenerSuite extends FunSuite with LocalSparkContext {
test("test executor id to summary") {
val sc = new SparkContext("local", "test")
val listener = new JobProgressListener(sc)
val taskMetrics = new TaskMetrics()
val shuffleReadMetrics = new ShuffleReadMetrics()
// nothing should be recorded yet
assert(listener.stageIdToExecutorSummaries.size == 0)
// finish this task, should get updated shuffleRead
shuffleReadMetrics.remoteBytesRead = 1000
taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics)
var taskInfo = new TaskInfo(1234L, 0, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL)
taskInfo.finishTime = 1
listener.onTaskEnd(new SparkListenerTaskEnd(
new ShuffleMapTask(0, null, null, 0, null), Success, taskInfo, taskMetrics))
assert(listener.stageIdToExecutorSummaries.getOrElse(0, fail()).getOrElse("exe-1", fail())
.shuffleRead == 1000)
// finish a task for a previously unseen executor id; it must not create a new stage entry
taskInfo = new TaskInfo(1234L, 0, 1000L, "exe-unknown", "host1", TaskLocality.NODE_LOCAL)
taskInfo.finishTime = 1
listener.onTaskEnd(new SparkListenerTaskEnd(
new ShuffleMapTask(0, null, null, 0, null), Success, taskInfo, taskMetrics))
assert(listener.stageIdToExecutorSummaries.size == 1)
// finish a second task on exe-1; its shuffleRead should accumulate to 2000
shuffleReadMetrics.remoteBytesRead = 1000
taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics)
taskInfo = new TaskInfo(1235L, 0, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL)
taskInfo.finishTime = 1
listener.onTaskEnd(new SparkListenerTaskEnd(
new ShuffleMapTask(0, null, null, 0, null), Success, taskInfo, taskMetrics))
assert(listener.stageIdToExecutorSummaries.getOrElse(0, fail()).getOrElse("exe-1", fail())
.shuffleRead == 2000)
// finish a task on a different executor; it should get its own summary
shuffleReadMetrics.remoteBytesRead = 1000
taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics)
taskInfo = new TaskInfo(1236L, 0, 0L, "exe-2", "host1", TaskLocality.NODE_LOCAL)
taskInfo.finishTime = 1
listener.onTaskEnd(new SparkListenerTaskEnd(
new ShuffleMapTask(0, null, null, 0, null), Success, taskInfo, taskMetrics))
assert(listener.stageIdToExecutorSummaries.getOrElse(0, fail()).getOrElse("exe-2", fail())
.shuffleRead == 1000)
}
}
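The suite drives the listener directly with synthetic task-end events, so it needs no cluster. To run only this suite with the sbt launcher of this era (exact command form assumed; it varies across sbt versions):

    sbt/sbt "test-only org.apache.spark.ui.jobs.JobProgressListenerSuite"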