[SPARK-3170][CORE][BUG]:RDD info loss in "StorageTab" and "ExecutorTab"
compeleted stage only need to remove its own partitions that are no longer cached. However, "StorageTab" may lost some rdds which are cached actually. Not only in "StorageTab", "ExectutorTab" may also lose some rdd info which have been overwritten by last rdd in a same task. 1. "StorageTab": when multiple stages run simultaneously, completed stage will remove rdd info which belong to other stages that are still running. 2. "ExectutorTab": taskcontext may lose some "updatedBlocks" info of rdds in a dependency chain. Like the following example: val r1 = sc.paralize(..).cache() val r2 = r1.map(...).cache() val n = r2.count() When count the r2, r1 and r2 will be cached finally. So in CacheManager.getOrCompute, the taskcontext should contain "updatedBlocks" of r1 and r2. Currently, the "updatedBlocks" only contain the info of r2. Author: uncleGen <hustyugm@gmail.com> Closes #2131 from uncleGen/master_ui_fix and squashes the following commits: a6a8a0b [uncleGen] fix some coding style 3a1bc15 [uncleGen] fix some error in unit test 56ea488 [uncleGen] there's some line too long c82ba82 [uncleGen] Bug Fix: RDD info loss in "StorageTab" and "ExecutorTab"
This commit is contained in:
parent
b92d823ad1
commit
d8298c46b7
|
@ -68,7 +68,9 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
|
||||||
// Otherwise, cache the values and keep track of any updates in block statuses
|
// Otherwise, cache the values and keep track of any updates in block statuses
|
||||||
val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
|
val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
|
||||||
val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks)
|
val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks)
|
||||||
context.taskMetrics.updatedBlocks = Some(updatedBlocks)
|
val metrics = context.taskMetrics
|
||||||
|
val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
|
||||||
|
metrics.updatedBlocks = Some(lastUpdatedBlocks ++ updatedBlocks.toSeq)
|
||||||
new InterruptibleIterator(context, cachedValues)
|
new InterruptibleIterator(context, cachedValues)
|
||||||
|
|
||||||
} finally {
|
} finally {
|
||||||
|
|
|
@ -70,8 +70,11 @@ class StorageListener(storageStatusListener: StorageStatusListener) extends Spar
|
||||||
}
|
}
|
||||||
|
|
||||||
override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) = synchronized {
|
override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) = synchronized {
|
||||||
// Remove all partitions that are no longer cached
|
// Remove all partitions that are no longer cached in current completed stage
|
||||||
_rddInfoMap.retain { case (_, info) => info.numCachedPartitions > 0 }
|
val completedRddIds = stageCompleted.stageInfo.rddInfos.map(r => r.id).toSet
|
||||||
|
_rddInfoMap.retain { case (id, info) =>
|
||||||
|
!completedRddIds.contains(id) || info.numCachedPartitions > 0
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD) = synchronized {
|
override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD) = synchronized {
|
||||||
|
|
|
@ -32,6 +32,8 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar
|
||||||
var split: Partition = _
|
var split: Partition = _
|
||||||
/** An RDD which returns the values [1, 2, 3, 4]. */
|
/** An RDD which returns the values [1, 2, 3, 4]. */
|
||||||
var rdd: RDD[Int] = _
|
var rdd: RDD[Int] = _
|
||||||
|
var rdd2: RDD[Int] = _
|
||||||
|
var rdd3: RDD[Int] = _
|
||||||
|
|
||||||
before {
|
before {
|
||||||
sc = new SparkContext("local", "test")
|
sc = new SparkContext("local", "test")
|
||||||
|
@ -43,6 +45,16 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar
|
||||||
override val getDependencies = List[Dependency[_]]()
|
override val getDependencies = List[Dependency[_]]()
|
||||||
override def compute(split: Partition, context: TaskContext) = Array(1, 2, 3, 4).iterator
|
override def compute(split: Partition, context: TaskContext) = Array(1, 2, 3, 4).iterator
|
||||||
}
|
}
|
||||||
|
rdd2 = new RDD[Int](sc, List(new OneToOneDependency(rdd))) {
|
||||||
|
override def getPartitions: Array[Partition] = firstParent[Int].partitions
|
||||||
|
override def compute(split: Partition, context: TaskContext) =
|
||||||
|
firstParent[Int].iterator(split, context)
|
||||||
|
}.cache()
|
||||||
|
rdd3 = new RDD[Int](sc, List(new OneToOneDependency(rdd2))) {
|
||||||
|
override def getPartitions: Array[Partition] = firstParent[Int].partitions
|
||||||
|
override def compute(split: Partition, context: TaskContext) =
|
||||||
|
firstParent[Int].iterator(split, context)
|
||||||
|
}.cache()
|
||||||
}
|
}
|
||||||
|
|
||||||
after {
|
after {
|
||||||
|
@ -87,4 +99,11 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar
|
||||||
assert(value.toList === List(1, 2, 3, 4))
|
assert(value.toList === List(1, 2, 3, 4))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
test("verify task metrics updated correctly") {
|
||||||
|
cacheManager = sc.env.cacheManager
|
||||||
|
val context = new TaskContext(0, 0, 0)
|
||||||
|
cacheManager.getOrCompute(rdd3, split, context, StorageLevel.MEMORY_ONLY)
|
||||||
|
assert(context.taskMetrics.updatedBlocks.getOrElse(Seq()).size === 2)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,6 +34,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
|
||||||
private val memOnly = StorageLevel.MEMORY_ONLY
|
private val memOnly = StorageLevel.MEMORY_ONLY
|
||||||
private val none = StorageLevel.NONE
|
private val none = StorageLevel.NONE
|
||||||
private val taskInfo = new TaskInfo(0, 0, 0, 0, "big", "dog", TaskLocality.ANY, false)
|
private val taskInfo = new TaskInfo(0, 0, 0, 0, "big", "dog", TaskLocality.ANY, false)
|
||||||
|
private val taskInfo1 = new TaskInfo(1, 1, 1, 1, "big", "cat", TaskLocality.ANY, false)
|
||||||
private def rddInfo0 = new RDDInfo(0, "freedom", 100, memOnly)
|
private def rddInfo0 = new RDDInfo(0, "freedom", 100, memOnly)
|
||||||
private def rddInfo1 = new RDDInfo(1, "hostage", 200, memOnly)
|
private def rddInfo1 = new RDDInfo(1, "hostage", 200, memOnly)
|
||||||
private def rddInfo2 = new RDDInfo(2, "sanity", 300, memAndDisk)
|
private def rddInfo2 = new RDDInfo(2, "sanity", 300, memAndDisk)
|
||||||
|
@ -162,4 +163,30 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
|
||||||
assert(storageListener._rddInfoMap(2).numCachedPartitions === 0)
|
assert(storageListener._rddInfoMap(2).numCachedPartitions === 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
test("verify StorageTab contains all cached rdds") {
|
||||||
|
|
||||||
|
val rddInfo0 = new RDDInfo(0, "rdd0", 1, memOnly)
|
||||||
|
val rddInfo1 = new RDDInfo(1, "rdd1", 1 ,memOnly)
|
||||||
|
val stageInfo0 = new StageInfo(0, 0, "stage0", 1, Seq(rddInfo0), "details")
|
||||||
|
val stageInfo1 = new StageInfo(1, 0, "stage1", 1, Seq(rddInfo1), "details")
|
||||||
|
val taskMetrics0 = new TaskMetrics
|
||||||
|
val taskMetrics1 = new TaskMetrics
|
||||||
|
val block0 = (RDDBlockId(0, 1), BlockStatus(memOnly, 100L, 0L, 0L))
|
||||||
|
val block1 = (RDDBlockId(1, 1), BlockStatus(memOnly, 200L, 0L, 0L))
|
||||||
|
taskMetrics0.updatedBlocks = Some(Seq(block0))
|
||||||
|
taskMetrics1.updatedBlocks = Some(Seq(block1))
|
||||||
|
bus.postToAll(SparkListenerBlockManagerAdded(bm1, 1000L))
|
||||||
|
bus.postToAll(SparkListenerStageSubmitted(stageInfo0))
|
||||||
|
assert(storageListener.rddInfoList.size === 0)
|
||||||
|
bus.postToAll(SparkListenerTaskEnd(0, 0, "big", Success, taskInfo, taskMetrics0))
|
||||||
|
assert(storageListener.rddInfoList.size === 1)
|
||||||
|
bus.postToAll(SparkListenerStageSubmitted(stageInfo1))
|
||||||
|
assert(storageListener.rddInfoList.size === 1)
|
||||||
|
bus.postToAll(SparkListenerStageCompleted(stageInfo0))
|
||||||
|
assert(storageListener.rddInfoList.size === 1)
|
||||||
|
bus.postToAll(SparkListenerTaskEnd(1, 0, "small", Success, taskInfo1, taskMetrics1))
|
||||||
|
assert(storageListener.rddInfoList.size === 2)
|
||||||
|
bus.postToAll(SparkListenerStageCompleted(stageInfo1))
|
||||||
|
assert(storageListener.rddInfoList.size === 2)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue