[SPARK-21860][CORE] Improve memory reuse for heap memory in HeapMemoryAllocator

## What changes were proposed in this pull request?
In `HeapMemoryAllocator`, memory is allocated from a pool, and the key of the pool is the requested memory size.
However, some sizes, such as 1025 bytes, 1026 bytes, ..., 1032 bytes, can be treated as the same size, because memory is actually allocated in multiples of 8 bytes.
In this case, we can improve memory reuse.

## How was this patch tested?
Existing tests and added unit tests

Author: liuxian <liu.xian3@zte.com.cn>

Closes #19077 from 10110346/headmemoptimize.
This commit is contained in:
liuxian 2018-02-08 23:41:30 +08:00 committed by Wenchen Fan
parent a75f927173
commit 76e019d9bd
2 changed files with 33 additions and 7 deletions

View file

@ -46,9 +46,12 @@ public class HeapMemoryAllocator implements MemoryAllocator {
@Override
public MemoryBlock allocate(long size) throws OutOfMemoryError {
if (shouldPool(size)) {
int numWords = (int) ((size + 7) / 8);
long alignedSize = numWords * 8L;
assert (alignedSize >= size);
if (shouldPool(alignedSize)) {
synchronized (this) {
final LinkedList<WeakReference<long[]>> pool = bufferPoolsBySize.get(size);
final LinkedList<WeakReference<long[]>> pool = bufferPoolsBySize.get(alignedSize);
if (pool != null) {
while (!pool.isEmpty()) {
final WeakReference<long[]> arrayReference = pool.pop();
@ -62,11 +65,11 @@ public class HeapMemoryAllocator implements MemoryAllocator {
return memory;
}
}
bufferPoolsBySize.remove(size);
bufferPoolsBySize.remove(alignedSize);
}
}
}
long[] array = new long[(int) ((size + 7) / 8)];
long[] array = new long[numWords];
MemoryBlock memory = new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size);
if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
@ -98,12 +101,13 @@ public class HeapMemoryAllocator implements MemoryAllocator {
long[] array = (long[]) memory.obj;
memory.setObjAndOffset(null, 0);
if (shouldPool(size)) {
long alignedSize = ((size + 7) / 8) * 8;
if (shouldPool(alignedSize)) {
synchronized (this) {
LinkedList<WeakReference<long[]>> pool = bufferPoolsBySize.get(size);
LinkedList<WeakReference<long[]>> pool = bufferPoolsBySize.get(alignedSize);
if (pool == null) {
pool = new LinkedList<>();
bufferPoolsBySize.put(size, pool);
bufferPoolsBySize.put(alignedSize, pool);
}
pool.add(new WeakReference<>(array));
}

View file

@ -17,6 +17,7 @@
package org.apache.spark.unsafe;
import org.apache.spark.unsafe.memory.HeapMemoryAllocator;
import org.apache.spark.unsafe.memory.MemoryAllocator;
import org.apache.spark.unsafe.memory.MemoryBlock;
@ -134,4 +135,25 @@ public class PlatformUtilSuite {
MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
MemoryAllocator.UNSAFE.free(offheap);
}
@Test
public void heapMemoryReuse() {
  MemoryAllocator heapMem = new HeapMemoryAllocator();

  // Sizes below `HeapMemoryAllocator.POOLING_THRESHOLD_BYTES` are expected not to be pooled,
  // so a fresh backing array should be handed out on every allocation.
  MemoryBlock onheap1 = heapMem.allocate(513);
  Object obj1 = onheap1.getBaseObject();
  heapMem.free(onheap1);
  MemoryBlock onheap2 = heapMem.allocate(514);
  // Identity (not equality) is what matters here: the two blocks must not share an array.
  Assert.assertNotSame(obj1, onheap2.getBaseObject());

  // Sizes above `HeapMemoryAllocator.POOLING_THRESHOLD_BYTES` are expected to be pooled.
  // 1024 * 1024 + 1 and 1024 * 1024 + 7 round up to the same multiple of 8 bytes, so the
  // second allocation should reuse the array released by the first one.
  MemoryBlock onheap3 = heapMem.allocate(1024 * 1024 + 1);
  // JUnit convention: expected value first, actual value second.
  Assert.assertEquals(1024 * 1024 + 1, onheap3.size());
  Object obj3 = onheap3.getBaseObject();
  heapMem.free(onheap3);
  MemoryBlock onheap4 = heapMem.allocate(1024 * 1024 + 7);
  // The block must report the requested size, not the 8-byte-aligned size.
  Assert.assertEquals(1024 * 1024 + 7, onheap4.size());
  Assert.assertSame(obj3, onheap4.getBaseObject());
}
}