[SPARK-27012][CORE] Storage tab shows rdd details even after executor ended

## What changes were proposed in this pull request?

After we cache a table, its details appear in the Storage tab of the Spark UI. If an executor has shut down (graceful shutdown or dynamic executor allocation scenario), the UI still shows the RDD as cached, and clicking its link throws an error. This is because, on the executor-removed event, we fail to adjust the RDD partition details in `org.apache.spark.status.AppStatusListener#onExecutorRemoved`.
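For context, a minimal reproduction sketch (the application name, dataset, and timeout values below are illustrative, not part of this patch): cache a dataset under dynamic allocation, then let the idle executors holding its blocks be reclaimed.

```scala
// Hypothetical reproduction sketch, not part of this patch. Assumes a cluster
// where dynamic allocation can actually release executors (e.g. an external
// shuffle service is available); all config values here are placeholders.
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

object StorageTabRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("storage-tab-repro")
      .config("spark.dynamicAllocation.enabled", "true")
      .config("spark.dynamicAllocation.executorIdleTimeout", "30s")
      // Executors holding cached blocks are kept forever by default, so cap that too.
      .config("spark.dynamicAllocation.cachedExecutorIdleTimeout", "60s")
      .getOrCreate()

    val df = spark.range(0, 1000000).toDF("id")
    df.persist(StorageLevel.MEMORY_AND_DISK)
    df.count()  // materializes the cache; the RDD now appears in the Storage tab

    // Stay idle past the timeouts so the executors holding the blocks are removed.
    // Before this fix the Storage tab still listed the RDD as cached and clicking
    // its link failed; with the fix the stale entry is cleaned up.
    Thread.sleep(2 * 60 * 1000)
    spark.stop()
  }
}
```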

## How was this patch tested?

Tested the fix manually in the UI.
Edit: added a unit test (see `AppStatusListenerSuite` below).

Closes #23920 from ajithme/cachestorage.

Authored-by: Ajith <ajith2489@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>

core/src/main/scala/org/apache/spark/status/AppStatusListener.scala

@@ -211,6 +211,30 @@ private[spark] class AppStatusListener(
          update(rdd, now)
        }
      }
      // Remove all RDD partitions that reference the removed executor
      liveRDDs.values.foreach { rdd =>
        rdd.getPartitions.values
          .filter(_.executors.contains(event.executorId))
          .foreach { partition =>
            if (partition.executors.length == 1) {
              rdd.removePartition(partition.blockName)
              rdd.memoryUsed = addDeltaToValue(rdd.memoryUsed, partition.memoryUsed * -1)
              rdd.diskUsed = addDeltaToValue(rdd.diskUsed, partition.diskUsed * -1)
            } else {
              rdd.memoryUsed = addDeltaToValue(rdd.memoryUsed,
                (partition.memoryUsed / partition.executors.length) * -1)
              rdd.diskUsed = addDeltaToValue(rdd.diskUsed,
                (partition.diskUsed / partition.executors.length) * -1)
              partition.update(partition.executors
                .filter(!_.equals(event.executorId)), rdd.storageLevel,
                addDeltaToValue(partition.memoryUsed,
                  (partition.memoryUsed / partition.executors.length) * -1),
                addDeltaToValue(partition.diskUsed,
                  (partition.diskUsed / partition.executors.length) * -1))
            }
          }
        update(rdd, now)
      }
      if (isExecutorActiveForLiveStages(exec)) {
        // the executor was running for a currently active stage, so save it for now in
        // deadExecutors, and remove when there are no active stages overlapping with the

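The new block splits a replicated partition's footprint evenly across the executors holding it and subtracts the removed executor's 1/N share; when the last replica is gone, the partition is dropped outright. A standalone sketch of that accounting (the case class below is a stand-in for the listener's internal partition state, not its actual types):

```scala
// Standalone illustration of the proportional accounting above.
object PartitionShareSketch {
  final case class CachedPartition(executors: Seq[String], memoryUsed: Long, diskUsed: Long)

  def removeExecutor(p: CachedPartition, executorId: String): Option[CachedPartition] = {
    if (!p.executors.contains(executorId)) {
      Some(p)  // this executor held no replica of the partition; nothing changes
    } else if (p.executors.length == 1) {
      None  // last replica gone: drop the partition entirely
    } else {
      // Subtract the removed executor's 1/N share, mirroring the listener's math.
      val n = p.executors.length
      Some(CachedPartition(
        p.executors.filterNot(_ == executorId),
        p.memoryUsed - p.memoryUsed / n,
        p.diskUsed - p.diskUsed / n))
    }
  }

  def main(args: Array[String]): Unit = {
    val part = CachedPartition(Seq("1", "2"), memoryUsed = 10L, diskUsed = 4L)
    println(removeExecutor(part, "1"))  // Some(CachedPartition(List(2),5,2))
  }
}
```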
core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala

@@ -1520,6 +1520,106 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
    }
  }

  test("storage information on executor lost/down") {
    val listener = new AppStatusListener(store, conf, true)
    val maxMemory = 42L

    // Register a couple of block managers.
    val bm1 = BlockManagerId("1", "1.example.com", 42)
    val bm2 = BlockManagerId("2", "2.example.com", 84)

    Seq(bm1, bm2).foreach { bm =>
      listener.onExecutorAdded(SparkListenerExecutorAdded(1L, bm.executorId,
        new ExecutorInfo(bm.host, 1, Map.empty, Map.empty)))
      listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm, maxMemory))
    }

    val rdd1b1 = RddBlock(1, 1, 1L, 2L)
    val rdd1b2 = RddBlock(1, 2, 3L, 4L)
    val level = StorageLevel.MEMORY_AND_DISK

    // Submit a stage and make sure the RDDs are recorded.
    val rdd1Info = new RDDInfo(rdd1b1.rddId, "rdd1", 2, level, Nil)
    val stage = new StageInfo(1, 0, "stage1", 4, Seq(rdd1Info), Nil, "details1")
    listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new Properties()))

    // Add partition 1 replicated on two block managers.
    listener.onBlockUpdated(SparkListenerBlockUpdated(
      BlockUpdatedInfo(bm1, rdd1b1.blockId, level, rdd1b1.memSize, rdd1b1.diskSize)))
    listener.onBlockUpdated(SparkListenerBlockUpdated(
      BlockUpdatedInfo(bm2, rdd1b1.blockId, level, rdd1b1.memSize, rdd1b1.diskSize)))

    // Add a second partition only to bm 1.
    listener.onBlockUpdated(SparkListenerBlockUpdated(
      BlockUpdatedInfo(bm1, rdd1b2.blockId, level, rdd1b2.memSize, rdd1b2.diskSize)))

    check[RDDStorageInfoWrapper](rdd1b1.rddId) { wrapper =>
      assert(wrapper.info.numCachedPartitions === 2L)
      assert(wrapper.info.memoryUsed === 2 * rdd1b1.memSize + rdd1b2.memSize)
      assert(wrapper.info.diskUsed === 2 * rdd1b1.diskSize + rdd1b2.diskSize)
      assert(wrapper.info.dataDistribution.get.size === 2L)
      assert(wrapper.info.partitions.get.size === 2L)

      val dist = wrapper.info.dataDistribution.get.find(_.address == bm1.hostPort).get
      assert(dist.memoryUsed === rdd1b1.memSize + rdd1b2.memSize)
      assert(dist.diskUsed === rdd1b1.diskSize + rdd1b2.diskSize)
      assert(dist.memoryRemaining === maxMemory - dist.memoryUsed)

      val part1 = wrapper.info.partitions.get.find(_.blockName === rdd1b1.blockId.name).get
      assert(part1.storageLevel === level.description)
      assert(part1.memoryUsed === 2 * rdd1b1.memSize)
      assert(part1.diskUsed === 2 * rdd1b1.diskSize)
      assert(part1.executors === Seq(bm1.executorId, bm2.executorId))

      val part2 = wrapper.info.partitions.get.find(_.blockName === rdd1b2.blockId.name).get
      assert(part2.storageLevel === level.description)
      assert(part2.memoryUsed === rdd1b2.memSize)
      assert(part2.diskUsed === rdd1b2.diskSize)
      assert(part2.executors === Seq(bm1.executorId))
    }

    check[ExecutorSummaryWrapper](bm1.executorId) { exec =>
      assert(exec.info.rddBlocks === 2L)
      assert(exec.info.memoryUsed === rdd1b1.memSize + rdd1b2.memSize)
      assert(exec.info.diskUsed === rdd1b1.diskSize + rdd1b2.diskSize)
    }

    // Remove executor 1.
    listener.onExecutorRemoved(createExecutorRemovedEvent(1))

    // Check that the partition info now contains only what remains on bm2.
    check[RDDStorageInfoWrapper](rdd1b1.rddId) { wrapper =>
      assert(wrapper.info.numCachedPartitions === 1L)
      assert(wrapper.info.memoryUsed === rdd1b1.memSize)
      assert(wrapper.info.diskUsed === rdd1b1.diskSize)
      assert(wrapper.info.dataDistribution.get.size === 1L)
      assert(wrapper.info.partitions.get.size === 1L)

      val dist = wrapper.info.dataDistribution.get.find(_.address == bm2.hostPort).get
      assert(dist.memoryUsed === rdd1b1.memSize)
      assert(dist.diskUsed === rdd1b1.diskSize)
      assert(dist.memoryRemaining === maxMemory - dist.memoryUsed)

      val part = wrapper.info.partitions.get.find(_.blockName === rdd1b1.blockId.name).get
      assert(part.storageLevel === level.description)
      assert(part.memoryUsed === rdd1b1.memSize)
      assert(part.diskUsed === rdd1b1.diskSize)
      assert(part.executors === Seq(bm2.executorId))
    }

    // Remove executor 2.
    listener.onExecutorRemoved(createExecutorRemovedEvent(2))

    // Check that the storage cost is zero, as both executors are down.
    check[RDDStorageInfoWrapper](rdd1b1.rddId) { wrapper =>
      assert(wrapper.info.numCachedPartitions === 0)
      assert(wrapper.info.memoryUsed === 0)
      assert(wrapper.info.diskUsed === 0)
      assert(wrapper.info.dataDistribution.isEmpty)
      assert(wrapper.info.partitions.get.isEmpty)
    }
  }

  private def key(stage: StageInfo): Array[Int] = Array(stage.stageId, stage.attemptNumber)

  private def check[T: ClassTag](key: Any)(fn: T => Unit): Unit = {
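The test relies on the suite's existing `createExecutorRemovedEvent` helper, which sits outside this diff; presumably it builds a `SparkListenerExecutorRemoved` along these lines (a hedged sketch: the timestamp and reason string are placeholder guesses, not necessarily the suite's actual values):

```scala
import org.apache.spark.scheduler.SparkListenerExecutorRemoved

// Sketch of the helper used by the test above; wrapped in an object so it
// runs standalone instead of as a private method of the suite class.
object ExecutorRemovedEventSketch {
  def createExecutorRemovedEvent(executorId: Int): SparkListenerExecutorRemoved =
    SparkListenerExecutorRemoved(10L, executorId.toString, "test")

  def main(args: Array[String]): Unit =
    println(createExecutorRemovedEvent(1))  // SparkListenerExecutorRemoved(10,1,test)
}
```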