[SPARK-26527][CORE] Let acquireUnrollMemory fail fast if required space exceeds memory limit

## What changes were proposed in this pull request?

When acquiring unroll memory from `StaticMemoryManager`, let it fail fast if required space exceeds memory limit, just like acquiring storage memory.
I think this may reduce some computation and memory evicting costs especially when required space(`numBytes`) is very big.

## How was this patch tested?

Existing unit tests.

Closes #23426 from SongYadong/acquireUnrollMemory_fail_fast.

Authored-by: SongYadong <song.yadong1@zte.com.cn>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
This commit is contained in:
SongYadong 2019-01-06 08:46:20 -06:00 committed by Sean Owen
parent a17851cb95
commit 737f08949a
2 changed files with 19 additions and 12 deletions

View file

@ -80,16 +80,23 @@ private[spark] class StaticMemoryManager(
memoryMode: MemoryMode): Boolean = synchronized {
require(memoryMode != MemoryMode.OFF_HEAP,
"StaticMemoryManager does not support off-heap unroll memory")
val currentUnrollMemory = onHeapStorageMemoryPool.memoryStore.currentUnrollMemory
val freeMemory = onHeapStorageMemoryPool.memoryFree
// When unrolling, we will use all of the existing free memory, and, if necessary,
// some extra space freed from evicting cached blocks. We must place a cap on the
// amount of memory to be evicted by unrolling, however, otherwise unrolling one
// big block can blow away the entire cache.
val maxNumBytesToFree = math.max(0, maxUnrollMemory - currentUnrollMemory - freeMemory)
// Keep it within the range 0 <= X <= maxNumBytesToFree
val numBytesToFree = math.max(0, math.min(maxNumBytesToFree, numBytes - freeMemory))
onHeapStorageMemoryPool.acquireMemory(blockId, numBytes, numBytesToFree)
if (numBytes > maxOnHeapStorageMemory) {
// Fail fast if the block simply won't fit
logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
s"memory limit ($maxOnHeapStorageMemory bytes)")
false
} else {
val currentUnrollMemory = onHeapStorageMemoryPool.memoryStore.currentUnrollMemory
val freeMemory = onHeapStorageMemoryPool.memoryFree
// When unrolling, we will use all of the existing free memory, and, if necessary,
// some extra space freed from evicting cached blocks. We must place a cap on the
// amount of memory to be evicted by unrolling, however, otherwise unrolling one
// big block can blow away the entire cache.
val maxNumBytesToFree = math.max(0, maxUnrollMemory - currentUnrollMemory - freeMemory)
// Keep it within the range 0 <= X <= maxNumBytesToFree
val numBytesToFree = math.max(0, math.min(maxNumBytesToFree, numBytes - freeMemory))
onHeapStorageMemoryPool.acquireMemory(blockId, numBytes, numBytesToFree)
}
}
private[memory]

View file

@ -291,11 +291,11 @@ class MemoryStoreSuite
blockInfoManager.removeBlock("b3")
putIteratorAsBytes("b3", smallIterator, ClassTag.Any)
// Unroll huge block with not enough space. This should fail and kick out b2 in the process.
// Unroll huge block with not enough space. This should fail.
val result4 = putIteratorAsBytes("b4", bigIterator, ClassTag.Any)
assert(result4.isLeft) // unroll was unsuccessful
assert(!memoryStore.contains("b1"))
assert(!memoryStore.contains("b2"))
assert(memoryStore.contains("b2"))
assert(memoryStore.contains("b3"))
assert(!memoryStore.contains("b4"))
assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an iterator