[SPARK-10474] [SQL] Aggregation fails to allocate memory for pointer array (round 2)

This patch reverts most of the changes in a previous fix #8827.

The real cause of the issue is that in `TungstenAggregate`'s prepare method we only reserve 1 page, but later when we switch to sort-based aggregation we try to acquire 1 page AND a pointer array. The longer-term fix should be to reserve also the pointer array, but for now ***we will simply not track the pointer array***. (Note that elsewhere we already don't track the pointer array, e.g. [here](a18208047f/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java (L88)))

Note: This patch reuses the unit test added in #8827 so it doesn't show up in the diff.

Author: Andrew Or <andrew@databricks.com>

Closes #8888 from andrewor14/dont-track-pointer-array.
This commit is contained in:
Andrew Or 2015-09-23 19:34:31 -07:00
parent 084e4e1262
commit 83f6f54d12
3 changed files with 16 additions and 52 deletions

View file

@ -159,16 +159,15 @@ public final class UnsafeExternalSorter {
/**
* Allocates new sort data structures. Called when creating the sorter and after each spill.
*/
public void initializeForWriting() throws IOException {
private void initializeForWriting() throws IOException {
// Note: Do not track memory for the pointer array for now because of SPARK-10474.
// In more detail, in TungstenAggregate we only reserve a page, but when we fall back to
// sort-based aggregation we try to acquire a page AND a pointer array, which inevitably
// fails if all other memory is already occupied. It should be safe to not track the array
// because its memory footprint is frequently much smaller than that of a page. This is a
// temporary hack that we should address in 1.6.0.
// TODO: track the pointer array memory!
this.writeMetrics = new ShuffleWriteMetrics();
final long pointerArrayMemory =
UnsafeInMemorySorter.getMemoryRequirementsForPointerArray(initialSize);
final long memoryAcquired = shuffleMemoryManager.tryToAcquire(pointerArrayMemory);
if (memoryAcquired != pointerArrayMemory) {
shuffleMemoryManager.release(memoryAcquired);
throw new IOException("Could not acquire " + pointerArrayMemory + " bytes of memory");
}
this.inMemSorter =
new UnsafeInMemorySorter(taskMemoryManager, recordComparator, prefixComparator, initialSize);
this.isInMemSorterExternal = false;
@ -187,14 +186,6 @@ public final class UnsafeExternalSorter {
* Sort and spill the current records in response to memory pressure.
*/
public void spill() throws IOException {
spill(true);
}
/**
* Sort and spill the current records in response to memory pressure.
* @param shouldInitializeForWriting whether to allocate memory for writing after the spill
*/
public void spill(boolean shouldInitializeForWriting) throws IOException {
assert(inMemSorter != null);
logger.info("Thread {} spilling sort data of {} to disk ({} {} so far)",
Thread.currentThread().getId(),
@ -225,10 +216,8 @@ public final class UnsafeExternalSorter {
// written to disk. This also counts the space needed to store the sorter's pointer array.
taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
if (shouldInitializeForWriting) {
initializeForWriting();
}
}
/**
* Return the total memory usage of this sorter, including the data pages and the sorter's pointer
@ -275,14 +264,7 @@ public final class UnsafeExternalSorter {
shuffleMemoryManager.release(block.size());
memoryFreed += block.size();
}
if (inMemSorter != null) {
if (!isInMemSorterExternal) {
long sorterMemoryUsage = inMemSorter.getMemoryUsage();
memoryFreed += sorterMemoryUsage;
shuffleMemoryManager.release(sorterMemoryUsage);
}
inMemSorter = null;
}
// TODO: track in-memory sorter memory usage (SPARK-10474)
allocatedPages.clear();
currentPage = null;
currentPagePosition = -1;
@ -320,17 +302,8 @@ public final class UnsafeExternalSorter {
private void growPointerArrayIfNecessary() throws IOException {
assert(inMemSorter != null);
if (!inMemSorter.hasSpaceForAnotherRecord()) {
logger.debug("Attempting to expand sort pointer array");
final long oldPointerArrayMemoryUsage = inMemSorter.getMemoryUsage();
final long memoryToGrowPointerArray = oldPointerArrayMemoryUsage * 2;
final long memoryAcquired = shuffleMemoryManager.tryToAcquire(memoryToGrowPointerArray);
if (memoryAcquired < memoryToGrowPointerArray) {
shuffleMemoryManager.release(memoryAcquired);
spill();
} else {
// TODO: track the pointer array memory! (SPARK-10474)
inMemSorter.expandPointerArray();
shuffleMemoryManager.release(oldPointerArrayMemoryUsage);
}
}
}

View file

@ -85,7 +85,7 @@ public final class UnsafeKVExternalSorter {
// We will use the number of elements in the map as the initialSize of the
// UnsafeInMemorySorter. Because UnsafeInMemorySorter does not accept 0 as the initialSize,
// we will use 1 as its initial size if the map is empty.
// TODO: track pointer array memory used by this in-memory sorter!
// TODO: track pointer array memory used by this in-memory sorter! (SPARK-10474)
final UnsafeInMemorySorter inMemSorter = new UnsafeInMemorySorter(
taskMemoryManager, recordComparator, prefixComparator, Math.max(1, map.numElements()));
@ -124,13 +124,8 @@ public final class UnsafeKVExternalSorter {
pageSizeBytes,
inMemSorter);
// Note: This spill doesn't actually release any memory, so if we try to allocate a new
// pointer array immediately after the spill then we may fail to acquire sufficient space
// for it (SPARK-10474). For this reason, we must initialize for writing explicitly *after*
// we have actually freed memory from our map.
sorter.spill(false /* initialize for writing */);
sorter.spill();
map.free();
sorter.initializeForWriting();
}
}

View file

@ -200,9 +200,7 @@ class UnsafeFixedWidthAggregationMapSuite
val sorter = map.destructAndCreateExternalSorter()
withClue(s"destructAndCreateExternalSorter should release memory used by the map") {
// 4096 * 16 is the initial size allocated for the pointer/prefix array in the in-mem sorter.
assert(shuffleMemoryManager.getMemoryConsumptionForThisTask() ===
initialMemoryConsumption + 4096 * 16)
assert(shuffleMemoryManager.getMemoryConsumptionForThisTask() === initialMemoryConsumption)
}
// Add more keys to the sorter and make sure the results come out sorted.
@ -305,9 +303,7 @@ class UnsafeFixedWidthAggregationMapSuite
val sorter = map.destructAndCreateExternalSorter()
withClue(s"destructAndCreateExternalSorter should release memory used by the map") {
// 4096 * 16 is the initial size allocated for the pointer/prefix array in the in-mem sorter.
assert(shuffleMemoryManager.getMemoryConsumptionForThisTask() ===
initialMemoryConsumption + 4096 * 16)
assert(shuffleMemoryManager.getMemoryConsumptionForThisTask() === initialMemoryConsumption)
}
// Add more keys to the sorter and make sure the results come out sorted.