+ }
+
+ // TODO: We should consider increasing the value of this parameter over time
+ // if we find that doing so is acceptable.
+ private val MAX_TIMELINE_TASKS = parent.conf.getInt("spark.ui.timeline.tasks.maximum", 1000)
+
+
def render(request: HttpServletRequest): Seq[Node] = {
progressListener.synchronized {
val parameterId = request.getParameter("id")
@@ -196,7 +226,9 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
val accumulableHeaders: Seq[String] = Seq("Accumulable", "Value")
def accumulableRow(acc: AccumulableInfo): Elem =
{acc.name}
{acc.value}
- val accumulableTable = UIUtils.listingTable(accumulableHeaders, accumulableRow,
+ val accumulableTable = UIUtils.listingTable(
+ accumulableHeaders,
+ accumulableRow,
accumulables.values.toSeq)
val taskHeadersAndCssClasses: Seq[(String, String)] =
@@ -232,10 +264,17 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
val unzipped = taskHeadersAndCssClasses.unzip
+ val currentTime = System.currentTimeMillis()
val taskTable = UIUtils.listingTable(
unzipped._1,
- taskRow(hasAccumulators, stageData.hasInput, stageData.hasOutput,
- stageData.hasShuffleRead, stageData.hasShuffleWrite, stageData.hasBytesSpilled),
+ taskRow(
+ hasAccumulators,
+ stageData.hasInput,
+ stageData.hasOutput,
+ stageData.hasShuffleRead,
+ stageData.hasShuffleWrite,
+ stageData.hasBytesSpilled,
+ currentTime),
tasks,
headerClasses = unzipped._2)
// Excludes tasks which failed and have incomplete metrics
@@ -460,25 +499,192 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
dagViz ++
maybeExpandDagViz ++
showAdditionalMetrics ++
+ makeTimeline(stageData.taskData.values.toSeq, currentTime) ++
Summary Metrics for {numCompleted} Completed Tasks
++
{summaryTable.getOrElse("No tasks have reported metrics yet.")}
+ {
+ if (MAX_TIMELINE_TASKS < tasks.size) {
+
+ This stage has more than the maximum number of tasks that can be shown in the
+ visualization! Only the most recent {MAX_TIMELINE_TASKS} tasks
+ (of {tasks.size} total) are shown.
+
+ } else {
+ Seq.empty
+ }
+ }
+
+
+
+ Enable zooming
+
+
+ {TIMELINE_LEGEND}
+
++
+
+ }
+
def taskRow(
hasAccumulators: Boolean,
hasInput: Boolean,
hasOutput: Boolean,
hasShuffleRead: Boolean,
hasShuffleWrite: Boolean,
- hasBytesSpilled: Boolean)(taskData: TaskUIData): Seq[Node] = {
+ hasBytesSpilled: Boolean,
+ currentTime: Long)(taskData: TaskUIData): Seq[Node] = {
taskData match { case TaskUIData(info, metrics, errorMessage) =>
- val duration = if (info.status == "RUNNING") info.timeRunning(System.currentTimeMillis())
+ val duration = if (info.status == "RUNNING") info.timeRunning(currentTime)
else metrics.map(_.executorRunTime).getOrElse(1L)
val formatDuration = if (info.status == "RUNNING") UIUtils.formatDuration(duration)
else metrics.map(m => UIUtils.formatDuration(m.executorRunTime)).getOrElse("")
@@ -542,7 +748,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
val diskBytesSpilledSortable = maybeDiskBytesSpilled.map(_.toString).getOrElse("")
val diskBytesSpilledReadable = maybeDiskBytesSpilled.map(Utils.bytesToString).getOrElse("")
-
+
{info.index}
{info.taskId}
{
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala
index 55169956d8..5989f0035b 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala
@@ -25,6 +25,7 @@ import org.apache.spark.ui.{SparkUI, SparkUITab}
/** Web UI showing progress status of all stages in the given SparkContext. */
private[ui] class StagesTab(parent: SparkUI) extends SparkUITab(parent, "stages") {
val sc = parent.sc
+ val conf = parent.conf
val killEnabled = parent.killEnabled
val progressListener = parent.jobProgressListener
val operationGraphListener = parent.operationGraphListener