[SPARK-11805] free the array in UnsafeExternalSorter during spilling

After spill() is called on SortedIterator, the array inside InMemorySorter is no longer needed, so it should be freed during spilling. This could help when joining multiple tables with limited memory.

Author: Davies Liu <davies@databricks.com>

Closes #9793 from davies/free_array.
This commit is contained in:
Davies Liu 2015-11-24 14:33:28 -08:00 committed by Josh Rosen
parent e6dd237463
commit 58d9b26055
2 changed files with 19 additions and 22 deletions

View file

@ -468,6 +468,12 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
} }
allocatedPages.clear(); allocatedPages.clear();
} }
// in-memory sorter will not be used after spilling
assert(inMemSorter != null);
released += inMemSorter.getMemoryUsage();
inMemSorter.free();
inMemSorter = null;
return released; return released;
} }
} }
@ -489,10 +495,6 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
} }
upstream = nextUpstream; upstream = nextUpstream;
nextUpstream = null; nextUpstream = null;
assert(inMemSorter != null);
inMemSorter.free();
inMemSorter = null;
} }
numRecords--; numRecords--;
upstream.loadNext(); upstream.loadNext();

View file

@ -108,6 +108,7 @@ public final class UnsafeInMemorySorter {
*/ */
public void free() { public void free() {
consumer.freeArray(array); consumer.freeArray(array);
array = null;
} }
public void reset() { public void reset() {
@ -160,28 +161,22 @@ public final class UnsafeInMemorySorter {
pos++; pos++;
} }
public static final class SortedIterator extends UnsafeSorterIterator { public final class SortedIterator extends UnsafeSorterIterator {
private final TaskMemoryManager memoryManager; private final int numRecords;
private final int sortBufferInsertPosition; private int position;
private final LongArray sortBuffer;
private int position = 0;
private Object baseObject; private Object baseObject;
private long baseOffset; private long baseOffset;
private long keyPrefix; private long keyPrefix;
private int recordLength; private int recordLength;
private SortedIterator( private SortedIterator(int numRecords) {
TaskMemoryManager memoryManager, this.numRecords = numRecords;
int sortBufferInsertPosition, this.position = 0;
LongArray sortBuffer) {
this.memoryManager = memoryManager;
this.sortBufferInsertPosition = sortBufferInsertPosition;
this.sortBuffer = sortBuffer;
} }
public SortedIterator clone () { public SortedIterator clone () {
SortedIterator iter = new SortedIterator(memoryManager, sortBufferInsertPosition, sortBuffer); SortedIterator iter = new SortedIterator(numRecords);
iter.position = position; iter.position = position;
iter.baseObject = baseObject; iter.baseObject = baseObject;
iter.baseOffset = baseOffset; iter.baseOffset = baseOffset;
@ -192,21 +187,21 @@ public final class UnsafeInMemorySorter {
@Override @Override
public boolean hasNext() { public boolean hasNext() {
return position < sortBufferInsertPosition; return position / 2 < numRecords;
} }
public int numRecordsLeft() { public int numRecordsLeft() {
return (sortBufferInsertPosition - position) / 2; return numRecords - position / 2;
} }
@Override @Override
public void loadNext() { public void loadNext() {
// This pointer points to a 4-byte record length, followed by the record's bytes // This pointer points to a 4-byte record length, followed by the record's bytes
final long recordPointer = sortBuffer.get(position); final long recordPointer = array.get(position);
baseObject = memoryManager.getPage(recordPointer); baseObject = memoryManager.getPage(recordPointer);
baseOffset = memoryManager.getOffsetInPage(recordPointer) + 4; // Skip over record length baseOffset = memoryManager.getOffsetInPage(recordPointer) + 4; // Skip over record length
recordLength = Platform.getInt(baseObject, baseOffset - 4); recordLength = Platform.getInt(baseObject, baseOffset - 4);
keyPrefix = sortBuffer.get(position + 1); keyPrefix = array.get(position + 1);
position += 2; position += 2;
} }
@ -229,6 +224,6 @@ public final class UnsafeInMemorySorter {
*/ */
public SortedIterator getSortedIterator() { public SortedIterator getSortedIterator() {
sorter.sort(array, 0, pos / 2, sortComparator); sorter.sort(array, 0, pos / 2, sortComparator);
return new SortedIterator(memoryManager, pos, array); return new SortedIterator(pos / 2);
} }
} }