[SPARK-17483] Refactoring in BlockManager status reporting and block removal

This patch makes three minor refactorings to the BlockManager:

- Move the `if (info.tellMaster)` check out of `reportBlockStatus`; this fixes an issue where a debug logging message would incorrectly claim to have reported a block status to the master even though no message had been sent (in case `info.tellMaster == false`). This also makes it easier to write code which unconditionally sends block statuses to the master (which is necessary in another patch of mine).
- Split  `removeBlock()` into two methods, the existing method and an internal `removeBlockInternal()` method which is designed to be called by internal code that already holds a write lock on the block. This is also needed by a followup patch.
- Instead of calling `getCurrentBlockStatus()` in `removeBlock()`, just pass `BlockStatus.empty`; the block status should always be empty following complete removal of a block.

These changes were originally authored as part of a bug fix patch which is targeted at branch-2.0 and master; I've split them out here into their own separate PR in order to make them easier to review and so that the behavior-changing parts of my other patch can be isolated to their own PR.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #15036 from JoshRosen/cache-failure-race-conditions-refactorings-only.
This commit is contained in:
Josh Rosen 2016-09-12 13:09:33 -07:00
parent 1742c3ab86
commit 3d40896f41

View file

@ -217,7 +217,7 @@ private[spark] class BlockManager(
logInfo(s"Reporting ${blockInfoManager.size} blocks to the master.")
for ((blockId, info) <- blockInfoManager.entries) {
val status = getCurrentBlockStatus(blockId, info)
if (!tryToReportBlockStatus(blockId, info, status)) {
if (info.tellMaster && !tryToReportBlockStatus(blockId, status)) {
logError(s"Failed to report $blockId to master; giving up.")
return
}
@ -298,7 +298,7 @@ private[spark] class BlockManager(
/**
* Get the BlockStatus for the block identified by the given ID, if it exists.
* NOTE: This is mainly for testing, and it doesn't fetch information from external block store.
* NOTE: This is mainly for testing.
*/
def getStatus(blockId: BlockId): Option[BlockStatus] = {
blockInfoManager.get(blockId).map { info =>
@ -333,10 +333,9 @@ private[spark] class BlockManager(
*/
private def reportBlockStatus(
blockId: BlockId,
info: BlockInfo,
status: BlockStatus,
droppedMemorySize: Long = 0L): Unit = {
val needReregister = !tryToReportBlockStatus(blockId, info, status, droppedMemorySize)
val needReregister = !tryToReportBlockStatus(blockId, status, droppedMemorySize)
if (needReregister) {
logInfo(s"Got told to re-register updating block $blockId")
// Re-registering will report our new block for free.
@ -352,17 +351,12 @@ private[spark] class BlockManager(
*/
private def tryToReportBlockStatus(
blockId: BlockId,
info: BlockInfo,
status: BlockStatus,
droppedMemorySize: Long = 0L): Boolean = {
if (info.tellMaster) {
val storageLevel = status.storageLevel
val inMemSize = Math.max(status.memSize, droppedMemorySize)
val onDiskSize = status.diskSize
master.updateBlockInfo(blockManagerId, blockId, storageLevel, inMemSize, onDiskSize)
} else {
true
}
val storageLevel = status.storageLevel
val inMemSize = Math.max(status.memSize, droppedMemorySize)
val onDiskSize = status.diskSize
master.updateBlockInfo(blockManagerId, blockId, storageLevel, inMemSize, onDiskSize)
}
/**
@ -374,7 +368,7 @@ private[spark] class BlockManager(
info.synchronized {
info.level match {
case null =>
BlockStatus(StorageLevel.NONE, memSize = 0L, diskSize = 0L)
BlockStatus.empty
case level =>
val inMem = level.useMemory && memoryStore.contains(blockId)
val onDisk = level.useDisk && diskStore.contains(blockId)
@ -807,12 +801,10 @@ private[spark] class BlockManager(
// Now that the block is in either the memory or disk store,
// tell the master about it.
info.size = size
if (tellMaster) {
reportBlockStatus(blockId, info, putBlockStatus)
}
Option(TaskContext.get()).foreach { c =>
c.taskMetrics().incUpdatedBlockStatuses(blockId -> putBlockStatus)
if (tellMaster && info.tellMaster) {
reportBlockStatus(blockId, putBlockStatus)
}
addUpdatedBlockStatusToTaskMetrics(blockId, putBlockStatus)
}
logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
if (level.replication > 1) {
@ -961,15 +953,12 @@ private[spark] class BlockManager(
val putBlockStatus = getCurrentBlockStatus(blockId, info)
val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
if (blockWasSuccessfullyStored) {
// Now that the block is in either the memory, externalBlockStore, or disk store,
// tell the master about it.
// Now that the block is in either the memory or disk store, tell the master about it.
info.size = size
if (tellMaster) {
reportBlockStatus(blockId, info, putBlockStatus)
}
Option(TaskContext.get()).foreach { c =>
c.taskMetrics().incUpdatedBlockStatuses(blockId -> putBlockStatus)
if (tellMaster && info.tellMaster) {
reportBlockStatus(blockId, putBlockStatus)
}
addUpdatedBlockStatusToTaskMetrics(blockId, putBlockStatus)
logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
if (level.replication > 1) {
val remoteStartTime = System.currentTimeMillis
@ -1271,12 +1260,10 @@ private[spark] class BlockManager(
val status = getCurrentBlockStatus(blockId, info)
if (info.tellMaster) {
reportBlockStatus(blockId, info, status, droppedMemorySize)
reportBlockStatus(blockId, status, droppedMemorySize)
}
if (blockIsUpdated) {
Option(TaskContext.get()).foreach { c =>
c.taskMetrics().incUpdatedBlockStatuses(blockId -> status)
}
addUpdatedBlockStatusToTaskMetrics(blockId, status)
}
status.storageLevel
}
@ -1316,21 +1303,31 @@ private[spark] class BlockManager(
// The block has already been removed; do nothing.
logWarning(s"Asked to remove block $blockId, which does not exist")
case Some(info) =>
// Removals are idempotent in disk store and memory store. At worst, we get a warning.
val removedFromMemory = memoryStore.remove(blockId)
val removedFromDisk = diskStore.remove(blockId)
if (!removedFromMemory && !removedFromDisk) {
logWarning(s"Block $blockId could not be removed as it was not found in either " +
"the disk, memory, or external block store")
}
blockInfoManager.removeBlock(blockId)
val removeBlockStatus = getCurrentBlockStatus(blockId, info)
if (tellMaster && info.tellMaster) {
reportBlockStatus(blockId, info, removeBlockStatus)
}
Option(TaskContext.get()).foreach { c =>
c.taskMetrics().incUpdatedBlockStatuses(blockId -> removeBlockStatus)
}
removeBlockInternal(blockId, tellMaster = tellMaster && info.tellMaster)
addUpdatedBlockStatusToTaskMetrics(blockId, BlockStatus.empty)
}
}
/**
* Internal version of [[removeBlock()]] which assumes that the caller already holds a write
* lock on the block.
*/
private def removeBlockInternal(blockId: BlockId, tellMaster: Boolean): Unit = {
// Removals are idempotent in disk store and memory store. At worst, we get a warning.
val removedFromMemory = memoryStore.remove(blockId)
val removedFromDisk = diskStore.remove(blockId)
if (!removedFromMemory && !removedFromDisk) {
logWarning(s"Block $blockId could not be removed as it was not found on disk or in memory")
}
blockInfoManager.removeBlock(blockId)
if (tellMaster) {
reportBlockStatus(blockId, BlockStatus.empty)
}
}
private def addUpdatedBlockStatusToTaskMetrics(blockId: BlockId, status: BlockStatus): Unit = {
Option(TaskContext.get()).foreach { c =>
c.taskMetrics().incUpdatedBlockStatuses(blockId -> status)
}
}