[SPARK-19839][CORE] release longArray in BytesToBytesMap

## What changes were proposed in this pull request?
When BytesToBytesMap spills, its longArray should be released. Otherwise, it may not released until the task complete. This array may take a significant amount of memory, which cannot be used by later operator, such as UnsafeShuffleExternalSorter, resulting in more frequent spill in sorter. This patch release the array as destructive iterator will not use this array anymore.

## How was this patch tested?
Manual test in production

Author: Zhan Zhang <zhanzhang@fb.com>

Closes #17180 from zhzhan/memory.
This commit is contained in:
Zhan Zhang 2017-07-30 18:50:19 -07:00 committed by Xiao Li
parent f1a798b576
commit 44e501ace3
2 changed files with 8 additions and 2 deletions

View file

@ -258,6 +258,11 @@ public final class BytesToBytesMap extends MemoryConsumer {
this.destructive = destructive;
if (destructive) {
destructiveIterator = this;
// longArray will not be used anymore if destructive is true, release it now.
if (longArray != null) {
freeArray(longArray);
longArray = null;
}
}
}

View file

@ -127,9 +127,10 @@ class UnsafeFixedWidthAggregationMapSuite
PAGE_SIZE_BYTES
)
val groupKey = InternalRow(UTF8String.fromString("cats"))
val row = map.getAggregationBuffer(groupKey)
// Looking up a key stores a zero-entry in the map (like Python Counters or DefaultDicts)
assert(map.getAggregationBuffer(groupKey) != null)
assert(row != null)
val iter = map.iterator()
assert(iter.next())
iter.getKey.getString(0) should be ("cats")
@ -138,7 +139,7 @@ class UnsafeFixedWidthAggregationMapSuite
// Modifications to rows retrieved from the map should update the values in the map
iter.getValue.setInt(0, 42)
map.getAggregationBuffer(groupKey).getInt(0) should be (42)
row.getInt(0) should be (42)
map.free()
}