From 4e6a7a0b3e55098374a22f3ae9500404f7e4e91a Mon Sep 17 00:00:00 2001 From: zsxwing Date: Sun, 2 Nov 2014 10:44:52 -0800 Subject: [PATCH] [SPARK-4166][Core][WebUI] Display the executor ID in the Web UI when ExecutorLostFailure happens Now when ExecutorLostFailure happens, it only displays `ExecutorLostFailure (executor lost)`. Adding the executor id will help locate the faulted executor. Author: zsxwing Closes #3033 from zsxwing/SPARK-4166 and squashes the following commits: ff4664c [zsxwing] Backward-compatible support c5c4cf2 [zsxwing] Display the executor ID in the Web UI when ExecutorLostFailure happens --- core/src/main/scala/org/apache/spark/TaskEndReason.scala | 4 ++-- .../scala/org/apache/spark/scheduler/TaskSetManager.scala | 2 +- .../main/scala/org/apache/spark/util/JsonProtocol.scala | 8 ++++++-- .../apache/spark/ui/jobs/JobProgressListenerSuite.scala | 2 +- .../scala/org/apache/spark/util/JsonProtocolSuite.scala | 5 +++-- 5 files changed, 13 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala index 8f0c5e7841..202fba699a 100644 --- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala +++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala @@ -117,8 +117,8 @@ case object TaskKilled extends TaskFailedReason { * the task crashed the JVM. */ @DeveloperApi -case object ExecutorLostFailure extends TaskFailedReason { - override def toErrorString: String = "ExecutorLostFailure (executor lost)" +case class ExecutorLostFailure(execId: String) extends TaskFailedReason { + override def toErrorString: String = s"ExecutorLostFailure (executor ${execId} lost)" } /** diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index a976734007..d8fb640350 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -732,7 +732,7 @@ private[spark] class TaskSetManager( } // Also re-enqueue any tasks that were running on the node for ((tid, info) <- taskInfos if info.running && info.executorId == execId) { - handleFailedTask(tid, TaskState.FAILED, ExecutorLostFailure) + handleFailedTask(tid, TaskState.FAILED, ExecutorLostFailure(execId)) } // recalculate valid locality levels and waits when executor is lost recomputeLocality() diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 5b2e7d3a7e..43c7fba066 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -272,7 +272,7 @@ private[spark] object JsonProtocol { def taskEndReasonToJson(taskEndReason: TaskEndReason): JValue = { val reason = Utils.getFormattedClassName(taskEndReason) - val json = taskEndReason match { + val json: JObject = taskEndReason match { case fetchFailed: FetchFailed => val blockManagerAddress = Option(fetchFailed.bmAddress). map(blockManagerIdToJson).getOrElse(JNothing) @@ -287,6 +287,8 @@ private[spark] object JsonProtocol { ("Description" -> exceptionFailure.description) ~ ("Stack Trace" -> stackTrace) ~ ("Metrics" -> metrics) + case ExecutorLostFailure(executorId) => + ("Executor ID" -> executorId) case _ => Utils.emptyJson } ("Reason" -> reason) ~ json @@ -636,7 +638,9 @@ private[spark] object JsonProtocol { new ExceptionFailure(className, description, stackTrace, metrics) case `taskResultLost` => TaskResultLost case `taskKilled` => TaskKilled - case `executorLostFailure` => ExecutorLostFailure + case `executorLostFailure` => + val executorId = Utils.jsonOption(json \ "Executor ID").map(_.extract[String]) + ExecutorLostFailure(executorId.getOrElse("Unknown")) case `unknownReason` => UnknownReason } } diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala index 3370dd4156..6567c5ab83 100644 --- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala @@ -119,7 +119,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc new ExceptionFailure("Exception", "description", null, None), TaskResultLost, TaskKilled, - ExecutorLostFailure, + ExecutorLostFailure("0"), UnknownReason) var failCount = 0 for (reason <- taskFailedReasons) { diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index f1f88c5fd3..d235d7a0ed 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -115,7 +115,7 @@ class JsonProtocolSuite extends FunSuite { testTaskEndReason(exceptionFailure) testTaskEndReason(TaskResultLost) testTaskEndReason(TaskKilled) - testTaskEndReason(ExecutorLostFailure) + testTaskEndReason(ExecutorLostFailure("100")) testTaskEndReason(UnknownReason) // BlockId @@ -403,7 +403,8 @@ class JsonProtocolSuite extends FunSuite { assertOptionEquals(r1.metrics, r2.metrics, assertTaskMetricsEquals) case (TaskResultLost, TaskResultLost) => case (TaskKilled, TaskKilled) => - case (ExecutorLostFailure, ExecutorLostFailure) => + case (ExecutorLostFailure(execId1), ExecutorLostFailure(execId2)) => + assert(execId1 === execId2) case (UnknownReason, UnknownReason) => case _ => fail("Task end reasons don't match in types!") }