[SPARK-23481][WEBUI] lastStageAttempt should fail when a stage doesn't exist

## What changes were proposed in this pull request?

The issue here is `AppStatusStore.lastStageAttempt` will return the next available stage in the store when a stage doesn't exist.

This PR adds `last(stageId)` to ensure it returns a correct `StageData`

## How was this patch tested?

The new unit test.

Author: Shixiong Zhu <zsxwing@gmail.com>

Closes #20654 from zsxwing/SPARK-23481.
This commit is contained in:
Shixiong Zhu 2018-02-21 15:37:28 -08:00
parent 3fd0ccb13f
commit 744d5af652
2 changed files with 38 additions and 1 deletions

View file

@ -95,7 +95,11 @@ private[spark] class AppStatusStore(
}
def lastStageAttempt(stageId: Int): v1.StageData = {
val it = store.view(classOf[StageDataWrapper]).index("stageId").reverse().first(stageId)
val it = store.view(classOf[StageDataWrapper])
.index("stageId")
.reverse()
.first(stageId)
.last(stageId)
.closeableIterator()
try {
if (it.hasNext()) {

View file

@ -1121,6 +1121,39 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
}
}
test("lastStageAttempt should fail when the stage doesn't exist") {
val testConf = conf.clone().set(MAX_RETAINED_STAGES, 1)
val listener = new AppStatusListener(store, testConf, true)
val appStore = new AppStatusStore(store)
val stage1 = new StageInfo(1, 0, "stage1", 4, Nil, Nil, "details1")
val stage2 = new StageInfo(2, 0, "stage2", 4, Nil, Nil, "details2")
val stage3 = new StageInfo(3, 0, "stage3", 4, Nil, Nil, "details3")
time += 1
stage1.submissionTime = Some(time)
listener.onStageSubmitted(SparkListenerStageSubmitted(stage1, new Properties()))
stage1.completionTime = Some(time)
listener.onStageCompleted(SparkListenerStageCompleted(stage1))
// Make stage 3 complete before stage 2 so that stage 3 will be evicted
time += 1
stage3.submissionTime = Some(time)
listener.onStageSubmitted(SparkListenerStageSubmitted(stage3, new Properties()))
stage3.completionTime = Some(time)
listener.onStageCompleted(SparkListenerStageCompleted(stage3))
time += 1
stage2.submissionTime = Some(time)
listener.onStageSubmitted(SparkListenerStageSubmitted(stage2, new Properties()))
stage2.completionTime = Some(time)
listener.onStageCompleted(SparkListenerStageCompleted(stage2))
assert(appStore.asOption(appStore.lastStageAttempt(1)) === None)
assert(appStore.asOption(appStore.lastStageAttempt(2)).map(_.stageId) === Some(2))
assert(appStore.asOption(appStore.lastStageAttempt(3)) === None)
}
test("driver logs") {
val listener = new AppStatusListener(store, conf, true)