[SPARK-14243][CORE] update task metrics when removing blocks
## What changes were proposed in this pull request? This PR try to use `incUpdatedBlockStatuses ` to update the `updatedBlockStatuses ` when removing blocks, making sure `BlockManager` correctly updates `updatedBlockStatuses` ## How was this patch tested? test("updated block statuses") in BlockManagerSuite.scala Author: jeanlyn <jeanlyn92@gmail.com> Closes #12091 from jeanlyn/updateBlock.
This commit is contained in:
parent
446c45bd87
commit
8a333d2da8
|
@ -1264,9 +1264,12 @@ private[spark] class BlockManager(
|
|||
"the disk, memory, or external block store")
|
||||
}
|
||||
blockInfoManager.removeBlock(blockId)
|
||||
val removeBlockStatus = getCurrentBlockStatus(blockId, info)
|
||||
if (tellMaster && info.tellMaster) {
|
||||
val status = getCurrentBlockStatus(blockId, info)
|
||||
reportBlockStatus(blockId, info, status)
|
||||
reportBlockStatus(blockId, info, removeBlockStatus)
|
||||
}
|
||||
Option(TaskContext.get()).foreach { c =>
|
||||
c.taskMetrics().incUpdatedBlockStatuses(Seq((blockId, removeBlockStatus)))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -928,6 +928,16 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
|
|||
assert(!store.diskStore.contains("list3"), "list3 was in disk store")
|
||||
assert(!store.diskStore.contains("list4"), "list4 was in disk store")
|
||||
assert(!store.diskStore.contains("list5"), "list5 was in disk store")
|
||||
|
||||
// remove block - list2 should be removed from disk
|
||||
val updatedBlocks6 = getUpdatedBlocks {
|
||||
store.removeBlock(
|
||||
"list2", tellMaster = true)
|
||||
}
|
||||
assert(updatedBlocks6.size === 1)
|
||||
assert(updatedBlocks6.head._1 === TestBlockId("list2"))
|
||||
assert(updatedBlocks6.head._2.storageLevel == StorageLevel.NONE)
|
||||
assert(!store.diskStore.contains("list2"), "list2 was in disk store")
|
||||
}
|
||||
|
||||
test("query block statuses") {
|
||||
|
|
Loading…
Reference in a new issue