[SPARK-24415][CORE] Fixed the aggregated stage metrics by retaining stage objects in liveStages until all tasks are complete
The problem occurs because the stage object is removed from liveStages in AppStatusListener.onStageCompleted. Because of this, any onTaskEnd event received after the onStageCompleted event does not update the stage metrics. The fix is to retain stage objects in liveStages until all of their tasks are complete.

Tested by:
1. Verifying the fix against the reproducible example posted in the JIRA
2. Adding a unit test

Closes #22209 from ankuriitg/ankurgupta/SPARK-24415.

Authored-by: ankurgupta <ankur.gupta@cloudera.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
commit 39a02d8f75 (parent 8440e30728)
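Before reading the diff, the behavioural change can be summarised with a minimal, self-contained sketch of the retention logic. It is illustrative only: the object RetainLiveStagesSketch, its LiveStage case class, and the simplified onTaskStart / onStageCompleted / onTaskEnd handlers below are hypothetical stand-ins for AppStatusListener and its live entities, not the actual Spark implementation.

import scala.collection.mutable

object RetainLiveStagesSketch {
  sealed trait Status
  case object Active extends Status
  case object Complete extends Status
  case object Failed extends Status

  // Stripped-down stand-in for the listener's live stage entity (hypothetical).
  final case class LiveStage(
      stageId: Int,
      var status: Status = Active,
      var activeTasks: Int = 0,
      var completedTasks: Int = 0,
      var killedTasks: Int = 0)

  private val liveStages = mutable.Map[Int, LiveStage]()

  def onTaskStart(stageId: Int): Unit = {
    val stage = liveStages.getOrElseUpdate(stageId, LiveStage(stageId))
    stage.activeTasks += 1
  }

  def onStageCompleted(stageId: Int, failed: Boolean): Unit = {
    liveStages.get(stageId).foreach { stage =>
      stage.status = if (failed) Failed else Complete
      // Old behaviour: the stage was removed here unconditionally, so metrics from
      // task-end events arriving after this point were silently dropped.
      // New behaviour: only drop the stage once no tasks are still running.
      if (stage.activeTasks == 0) liveStages.remove(stageId)
    }
  }

  def onTaskEnd(stageId: Int, killed: Boolean): Unit = {
    liveStages.get(stageId).foreach { stage =>
      stage.activeTasks -= 1
      if (killed) stage.killedTasks += 1 else stage.completedTasks += 1
      // Drop the stage only once it is terminal AND its last task has ended.
      val removeStage = stage.activeTasks == 0 &&
        (stage.status == Complete || stage.status == Failed)
      if (removeStage) liveStages.remove(stageId)
    }
  }

  def main(args: Array[String]): Unit = {
    onTaskStart(1)
    onTaskStart(1)
    onTaskEnd(1, killed = false)        // first task finishes normally
    onStageCompleted(1, failed = true)  // stage completes while one task is still active
    println(liveStages(1))              // stage is retained; counters can still be updated
    onTaskEnd(1, killed = true)         // late task-end event is still counted
    println(liveStages.get(1))          // None: entry removed once the last task ended
  }
}

Running main shows the stage entry surviving onStageCompleted while a task is still active, so the later killed-task event is still reflected in the counters before the entry is dropped; with the old remove-on-completion behaviour that update would have been lost.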
@@ -350,11 +350,20 @@ private[spark] class AppStatusListener(
         val e = it.next()
         if (job.stageIds.contains(e.getKey()._1)) {
           val stage = e.getValue()
-          stage.status = v1.StageStatus.SKIPPED
-          job.skippedStages += stage.info.stageId
-          job.skippedTasks += stage.info.numTasks
-          it.remove()
-          update(stage, now)
+          if (v1.StageStatus.PENDING.equals(stage.status)) {
+            stage.status = v1.StageStatus.SKIPPED
+            job.skippedStages += stage.info.stageId
+            job.skippedTasks += stage.info.numTasks
+            job.activeStages -= 1
+
+            pools.get(stage.schedulingPool).foreach { pool =>
+              pool.stageIds = pool.stageIds - stage.info.stageId
+              update(pool, now)
+            }
+
+            it.remove()
+            update(stage, now, last = true)
+          }
         }
       }
 
@@ -506,7 +515,16 @@ private[spark] class AppStatusListener(
       if (killedDelta > 0) {
         stage.killedSummary = killedTasksSummary(event.reason, stage.killedSummary)
       }
-      maybeUpdate(stage, now)
+      // [SPARK-24415] Wait for all tasks to finish before removing stage from live list
+      val removeStage =
+        stage.activeTasks == 0 &&
+          (v1.StageStatus.COMPLETE.equals(stage.status) ||
+            v1.StageStatus.FAILED.equals(stage.status))
+      if (removeStage) {
+        update(stage, now, last = true)
+      } else {
+        maybeUpdate(stage, now)
+      }
 
       // Store both stage ID and task index in a single long variable for tracking at job level.
       val taskIndex = (event.stageId.toLong << Integer.SIZE) | event.taskInfo.index
@@ -521,7 +539,7 @@ private[spark] class AppStatusListener(
       if (killedDelta > 0) {
         job.killedSummary = killedTasksSummary(event.reason, job.killedSummary)
       }
-      maybeUpdate(job, now)
+      conditionalLiveUpdate(job, now, removeStage)
     }
 
     val esummary = stage.executorSummary(event.taskInfo.executorId)
@@ -532,7 +550,7 @@ private[spark] class AppStatusListener(
       if (metricsDelta != null) {
         esummary.metrics = LiveEntityHelpers.addMetrics(esummary.metrics, metricsDelta)
       }
-      maybeUpdate(esummary, now)
+      conditionalLiveUpdate(esummary, now, removeStage)
 
       if (!stage.cleaning && stage.savedTasks.get() > maxTasksPerStage) {
         stage.cleaning = true
@@ -540,6 +558,9 @@ private[spark] class AppStatusListener(
           cleanupTasks(stage)
         }
       }
+      if (removeStage) {
+        liveStages.remove((event.stageId, event.stageAttemptId))
+      }
     }
 
     liveExecutors.get(event.taskInfo.executorId).foreach { exec =>
@@ -564,17 +585,13 @@ private[spark] class AppStatusListener(
 
       // Force an update on live applications when the number of active tasks reaches 0. This is
       // checked in some tests (e.g. SQLTestUtilsBase) so it needs to be reliably up to date.
-      if (exec.activeTasks == 0) {
-        liveUpdate(exec, now)
-      } else {
-        maybeUpdate(exec, now)
-      }
+      conditionalLiveUpdate(exec, now, exec.activeTasks == 0)
     }
   }
 
   override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
     val maybeStage =
-      Option(liveStages.remove((event.stageInfo.stageId, event.stageInfo.attemptNumber)))
+      Option(liveStages.get((event.stageInfo.stageId, event.stageInfo.attemptNumber)))
     maybeStage.foreach { stage =>
       val now = System.nanoTime()
       stage.info = event.stageInfo
@@ -608,7 +625,6 @@ private[spark] class AppStatusListener(
       }
 
       stage.executorSummaries.values.foreach(update(_, now))
-      update(stage, now, last = true)
 
       val executorIdsForStage = stage.blackListedExecutors
       executorIdsForStage.foreach { executorId =>
@@ -616,6 +632,13 @@ private[spark] class AppStatusListener(
           removeBlackListedStageFrom(exec, event.stageInfo.stageId, now)
         }
       }
+
+      // Remove stage only if there are no active tasks remaining
+      val removeStage = stage.activeTasks == 0
+      update(stage, now, last = removeStage)
+      if (removeStage) {
+        liveStages.remove((event.stageInfo.stageId, event.stageInfo.attemptNumber))
+      }
     }
 
     appSummary = new AppSummary(appSummary.numCompletedJobs, appSummary.numCompletedStages + 1)
@@ -882,6 +905,14 @@ private[spark] class AppStatusListener(
     }
   }
 
+  private def conditionalLiveUpdate(entity: LiveEntity, now: Long, condition: Boolean): Unit = {
+    if (condition) {
+      liveUpdate(entity, now)
+    } else {
+      maybeUpdate(entity, now)
+    }
+  }
+
   private def cleanupExecutors(count: Long): Unit = {
     // Because the limit is on the number of *dead* executors, we need to calculate whether
     // there are actually enough dead executors to be deleted.
@@ -1190,6 +1190,61 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     assert(appStore.asOption(appStore.lastStageAttempt(3)) === None)
   }
 
+  test("SPARK-24415: update metrics for tasks that finish late") {
+    val listener = new AppStatusListener(store, conf, true)
+
+    val stage1 = new StageInfo(1, 0, "stage1", 4, Nil, Nil, "details1")
+    val stage2 = new StageInfo(2, 0, "stage2", 4, Nil, Nil, "details2")
+
+    // Start job
+    listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage1, stage2), null))
+
+    // Start 2 stages
+    listener.onStageSubmitted(SparkListenerStageSubmitted(stage1, new Properties()))
+    listener.onStageSubmitted(SparkListenerStageSubmitted(stage2, new Properties()))
+
+    // Start 2 Tasks
+    val tasks = createTasks(2, Array("1"))
+    tasks.foreach { task =>
+      listener.onTaskStart(SparkListenerTaskStart(stage1.stageId, stage1.attemptNumber, task))
+    }
+
+    // Task 1 Finished
+    time += 1
+    tasks(0).markFinished(TaskState.FINISHED, time)
+    listener.onTaskEnd(
+      SparkListenerTaskEnd(stage1.stageId, stage1.attemptId, "taskType", Success, tasks(0), null))
+
+    // Stage 1 Completed
+    stage1.failureReason = Some("Failed")
+    listener.onStageCompleted(SparkListenerStageCompleted(stage1))
+
+    // Stop job 1
+    time += 1
+    listener.onJobEnd(SparkListenerJobEnd(1, time, JobSucceeded))
+
+    // Task 2 Killed
+    time += 1
+    tasks(1).markFinished(TaskState.FINISHED, time)
+    listener.onTaskEnd(
+      SparkListenerTaskEnd(stage1.stageId, stage1.attemptId, "taskType",
+        TaskKilled(reason = "Killed"), tasks(1), null))
+
+    // Ensure killed task metrics are updated
+    val allStages = store.view(classOf[StageDataWrapper]).reverse().asScala.map(_.info)
+    val failedStages = allStages.filter(_.status == v1.StageStatus.FAILED)
+    assert(failedStages.size == 1)
+    assert(failedStages.head.numKilledTasks == 1)
+    assert(failedStages.head.numCompleteTasks == 1)
+
+    val allJobs = store.view(classOf[JobDataWrapper]).reverse().asScala.map(_.info)
+    assert(allJobs.size == 1)
+    assert(allJobs.head.numKilledTasks == 1)
+    assert(allJobs.head.numCompletedTasks == 1)
+    assert(allJobs.head.numActiveStages == 1)
+    assert(allJobs.head.numFailedStages == 1)
+  }
+
   test("driver logs") {
     val listener = new AppStatusListener(store, conf, true)
 
@@ -77,7 +77,12 @@ class UISeleniumSuite
       inputStream.foreachRDD { rdd =>
         rdd.foreach(_ => {})
         try {
-          rdd.foreach(_ => throw new RuntimeException("Oops"))
+          rdd.foreach { _ =>
+            // Failing the task with id 15 to ensure only one task fails
+            if (TaskContext.get.taskAttemptId() % 15 == 0) {
+              throw new RuntimeException("Oops")
+            }
+          }
         } catch {
           case e: SparkException if e.getMessage.contains("Oops") =>
         }
@@ -166,7 +171,7 @@ class UISeleniumSuite
 
       // Check job progress
       findAll(cssSelector(""".progress-cell""")).map(_.text).toList should be (
-        List("4/4", "4/4", "4/4", "0/4 (1 failed)"))
+        List("4/4", "4/4", "4/4", "3/4 (1 failed)"))
 
       // Check stacktrace
       val errorCells = findAll(cssSelector(""".stacktrace-details""")).map(_.underlying).toSeq