From 737f08949adecbae37bb92dfad71ae5f3a82cbee Mon Sep 17 00:00:00 2001 From: SongYadong Date: Sun, 6 Jan 2019 08:46:20 -0600 Subject: [PATCH] [SPARK-26527][CORE] Let acquireUnrollMemory fail fast if required space exceeds memory limit ## What changes were proposed in this pull request? When acquiring unroll memory from `StaticMemoryManager`, let it fail fast if the required space exceeds the memory limit, just like when acquiring storage memory. I think this may reduce some computation and memory eviction costs, especially when the required space (`numBytes`) is very large. ## How was this patch tested? Existing unit tests. Closes #23426 from SongYadong/acquireUnrollMemory_fail_fast. Authored-by: SongYadong Signed-off-by: Sean Owen --- .../spark/memory/StaticMemoryManager.scala | 27 ++++++++++++------- .../spark/storage/MemoryStoreSuite.scala | 4 +-- 2 files changed, 19 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala index 8286087042..0fd349dc51 100644 --- a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala @@ -80,16 +80,23 @@ private[spark] class StaticMemoryManager( memoryMode: MemoryMode): Boolean = synchronized { require(memoryMode != MemoryMode.OFF_HEAP, "StaticMemoryManager does not support off-heap unroll memory") - val currentUnrollMemory = onHeapStorageMemoryPool.memoryStore.currentUnrollMemory - val freeMemory = onHeapStorageMemoryPool.memoryFree - // When unrolling, we will use all of the existing free memory, and, if necessary, - // some extra space freed from evicting cached blocks. We must place a cap on the - // amount of memory to be evicted by unrolling, however, otherwise unrolling one - // big block can blow away the entire cache. 
- val maxNumBytesToFree = math.max(0, maxUnrollMemory - currentUnrollMemory - freeMemory) - // Keep it within the range 0 <= X <= maxNumBytesToFree - val numBytesToFree = math.max(0, math.min(maxNumBytesToFree, numBytes - freeMemory)) - onHeapStorageMemoryPool.acquireMemory(blockId, numBytes, numBytesToFree) + if (numBytes > maxOnHeapStorageMemory) { + // Fail fast if the block simply won't fit + logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " + + s"memory limit ($maxOnHeapStorageMemory bytes)") + false + } else { + val currentUnrollMemory = onHeapStorageMemoryPool.memoryStore.currentUnrollMemory + val freeMemory = onHeapStorageMemoryPool.memoryFree + // When unrolling, we will use all of the existing free memory, and, if necessary, + // some extra space freed from evicting cached blocks. We must place a cap on the + // amount of memory to be evicted by unrolling, however, otherwise unrolling one + // big block can blow away the entire cache. + val maxNumBytesToFree = math.max(0, maxUnrollMemory - currentUnrollMemory - freeMemory) + // Keep it within the range 0 <= X <= maxNumBytesToFree + val numBytesToFree = math.max(0, math.min(maxNumBytesToFree, numBytes - freeMemory)) + onHeapStorageMemoryPool.acquireMemory(blockId, numBytes, numBytesToFree) + } } private[memory] diff --git a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala index 7274072e50..baff672f5f 100644 --- a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala @@ -291,11 +291,11 @@ class MemoryStoreSuite blockInfoManager.removeBlock("b3") putIteratorAsBytes("b3", smallIterator, ClassTag.Any) - // Unroll huge block with not enough space. This should fail and kick out b2 in the process. + // Unroll huge block with not enough space. This should fail. 
val result4 = putIteratorAsBytes("b4", bigIterator, ClassTag.Any) assert(result4.isLeft) // unroll was unsuccessful assert(!memoryStore.contains("b1")) - assert(!memoryStore.contains("b2")) + assert(memoryStore.contains("b2")) assert(memoryStore.contains("b3")) assert(!memoryStore.contains("b4")) assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an iterator