[SPARK-26219][CORE] Executor summary should get updated for failure jobs in the history server UI

The root cause of the problem is that, whenever a taskEnd event arrives after the stageCompleted event, the executor summary is updated only for the live UI. We need to update it for the history UI too.

For the previous discussion, refer to PR https://github.com/apache/spark/pull/23038 and https://issues.apache.org/jira/browse/SPARK-26100.

Added a unit test. Manually verified as well.

Test step to reproduce:

```
bin/spark-shell --master yarn --conf spark.executor.instances=3
sc.parallelize(1 to 10000, 10).map{ x => throw new RuntimeException("Bad executor")}.collect()
```

Open Executors page from the History UI

Before patch:
![screenshot from 2018-11-29 22-13-34](https://user-images.githubusercontent.com/23054875/49246338-a21ead00-f43a-11e8-8214-f1020420be52.png)

After patch:
![screenshot from 2018-11-30 00-54-49](https://user-images.githubusercontent.com/23054875/49246353-aa76e800-f43a-11e8-98ef-7faecaa7a50e.png)

Closes #23181 from shahidki31/executorUpdate.

Authored-by: Shahid <shahidki31@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
This commit is contained in:
Shahid 2018-11-30 15:20:05 -08:00 committed by Marcelo Vanzin
parent 36edbac1c8
commit 8856e9f6a3
2 changed files with 63 additions and 46 deletions

View file

@ -641,9 +641,14 @@ private[spark] class AppStatusListener(
} }
} }
// Force an update on live applications when the number of active tasks reaches 0. This is // Force an update on both live and history applications when the number of active tasks
// checked in some tests (e.g. SQLTestUtilsBase) so it needs to be reliably up to date. // reaches 0. This is checked in some tests (e.g. SQLTestUtilsBase) so it needs to be
conditionalLiveUpdate(exec, now, exec.activeTasks == 0) // reliably up to date.
if (exec.activeTasks == 0) {
update(exec, now)
} else {
maybeUpdate(exec, now)
}
} }
} }
@ -1024,14 +1029,6 @@ private[spark] class AppStatusListener(
} }
} }
private def conditionalLiveUpdate(entity: LiveEntity, now: Long, condition: Boolean): Unit = {
if (condition) {
liveUpdate(entity, now)
} else {
maybeUpdate(entity, now)
}
}
private def cleanupExecutors(count: Long): Unit = { private def cleanupExecutors(count: Long): Unit = {
// Because the limit is on the number of *dead* executors, we need to calculate whether // Because the limit is on the number of *dead* executors, we need to calculate whether
// there are actually enough dead executors to be deleted. // there are actually enough dead executors to be deleted.

View file

@ -1273,48 +1273,68 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
assert(allJobs.head.numFailedStages == 1) assert(allJobs.head.numFailedStages == 1)
} }
test("SPARK-25451: total tasks in the executor summary should match total stage tasks") { Seq(true, false).foreach { live =>
val testConf = conf.clone.set(LIVE_ENTITY_UPDATE_PERIOD, Long.MaxValue) test(s"Total tasks in the executor summary should match total stage tasks (live = $live)") {
val listener = new AppStatusListener(store, testConf, true) val testConf = if (live) {
conf.clone().set(LIVE_ENTITY_UPDATE_PERIOD, Long.MaxValue)
} else {
conf.clone().set(LIVE_ENTITY_UPDATE_PERIOD, -1L)
}
val stage = new StageInfo(1, 0, "stage", 4, Nil, Nil, "details") val listener = new AppStatusListener(store, testConf, live)
listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage), null))
listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new Properties()))
val tasks = createTasks(4, Array("1", "2")) listener.onExecutorAdded(createExecutorAddedEvent(1))
tasks.foreach { task => listener.onExecutorAdded(createExecutorAddedEvent(2))
listener.onTaskStart(SparkListenerTaskStart(stage.stageId, stage.attemptNumber, task)) val stage = new StageInfo(1, 0, "stage", 4, Nil, Nil, "details")
} listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage), null))
listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new Properties()))
time += 1 val tasks = createTasks(4, Array("1", "2"))
tasks(0).markFinished(TaskState.FINISHED, time) tasks.foreach { task =>
listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType", listener.onTaskStart(SparkListenerTaskStart(stage.stageId, stage.attemptNumber, task))
Success, tasks(0), null)) }
time += 1
tasks(1).markFinished(TaskState.FINISHED, time)
listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
Success, tasks(1), null))
stage.failureReason = Some("Failed") time += 1
listener.onStageCompleted(SparkListenerStageCompleted(stage)) tasks(0).markFinished(TaskState.FINISHED, time)
time += 1 listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
listener.onJobEnd(SparkListenerJobEnd(1, time, JobFailed(new RuntimeException("Bad Executor")))) Success, tasks(0), null))
time += 1
tasks(1).markFinished(TaskState.FINISHED, time)
listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
Success, tasks(1), null))
time += 1 stage.failureReason = Some("Failed")
tasks(2).markFinished(TaskState.FAILED, time) listener.onStageCompleted(SparkListenerStageCompleted(stage))
listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType", time += 1
ExecutorLostFailure("1", true, Some("Lost executor")), tasks(2), null)) listener.onJobEnd(SparkListenerJobEnd(1, time, JobFailed(
time += 1 new RuntimeException("Bad Executor"))))
tasks(3).markFinished(TaskState.FAILED, time)
listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
ExecutorLostFailure("2", true, Some("Lost executor")), tasks(3), null))
val esummary = store.view(classOf[ExecutorStageSummaryWrapper]).asScala.map(_.info) time += 1
esummary.foreach { execSummary => tasks(2).markFinished(TaskState.FAILED, time)
assert(execSummary.failedTasks === 1) listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
assert(execSummary.succeededTasks === 1) ExecutorLostFailure("1", true, Some("Lost executor")), tasks(2), null))
assert(execSummary.killedTasks === 0) time += 1
tasks(3).markFinished(TaskState.FAILED, time)
listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
ExecutorLostFailure("2", true, Some("Lost executor")), tasks(3), null))
val esummary = store.view(classOf[ExecutorStageSummaryWrapper]).asScala.map(_.info)
esummary.foreach { execSummary =>
assert(execSummary.failedTasks === 1)
assert(execSummary.succeededTasks === 1)
assert(execSummary.killedTasks === 0)
}
val allExecutorSummary = store.view(classOf[ExecutorSummaryWrapper]).asScala.map(_.info)
assert(allExecutorSummary.size === 2)
allExecutorSummary.foreach { allExecSummary =>
assert(allExecSummary.failedTasks === 1)
assert(allExecSummary.activeTasks === 0)
assert(allExecSummary.completedTasks === 1)
}
store.delete(classOf[ExecutorSummaryWrapper], "1")
store.delete(classOf[ExecutorSummaryWrapper], "2")
} }
} }