[SPARK-20426] Lazy initialization of FileSegmentManagedBuffer for shuffle service.

## What changes were proposed in this pull request?
When an application contains a large number of shuffle blocks, the NodeManager requires a lot of memory to keep the metadata (`FileSegmentManagedBuffer`) in `StreamManager`. When the number of shuffle blocks is big enough, the NodeManager can run OOM. This PR proposes to do lazy initialization of `FileSegmentManagedBuffer` in the shuffle service.

## How was this patch tested?

Manually tested.

Author: jinxing <jinxing6042@126.com>

Closes #17744 from jinxing64/SPARK-20426.
This commit is contained in:
jinxing 2017-04-27 14:06:07 -05:00 committed by Tom Graves
parent 561e9cc390
commit 85c6ce6193
4 changed files with 29 additions and 20 deletions

View file

@ -21,7 +21,7 @@ import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Iterator;
import java.util.Map;
import com.codahale.metrics.Gauge;
@ -30,7 +30,6 @@ import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricSet;
import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -93,14 +92,25 @@ public class ExternalShuffleBlockHandler extends RpcHandler {
OpenBlocks msg = (OpenBlocks) msgObj;
checkAuth(client, msg.appId);
List<ManagedBuffer> blocks = Lists.newArrayList();
long totalBlockSize = 0;
for (String blockId : msg.blockIds) {
final ManagedBuffer block = blockManager.getBlockData(msg.appId, msg.execId, blockId);
totalBlockSize += block != null ? block.size() : 0;
blocks.add(block);
}
long streamId = streamManager.registerStream(client.getClientId(), blocks.iterator());
Iterator<ManagedBuffer> iter = new Iterator<ManagedBuffer>() {
private int index = 0;
@Override
public boolean hasNext() {
return index < msg.blockIds.length;
}
@Override
public ManagedBuffer next() {
final ManagedBuffer block = blockManager.getBlockData(msg.appId, msg.execId,
msg.blockIds[index]);
index++;
metrics.blockTransferRateBytes.mark(block != null ? block.size() : 0);
return block;
}
};
long streamId = streamManager.registerStream(client.getClientId(), iter);
if (logger.isTraceEnabled()) {
logger.trace("Registered streamId {} with {} buffers for client {} from host {}",
streamId,
@ -109,7 +119,6 @@ public class ExternalShuffleBlockHandler extends RpcHandler {
getRemoteAddress(client.getChannel()));
}
callback.onSuccess(new StreamHandle(streamId, msg.blockIds.length).toByteBuffer());
metrics.blockTransferRateBytes.mark(totalBlockSize);
} finally {
responseDelayContext.stop();
}

View file

@ -88,8 +88,6 @@ public class ExternalShuffleBlockHandlerSuite {
ByteBuffer openBlocks = new OpenBlocks("app0", "exec1", new String[] { "b0", "b1" })
.toByteBuffer();
handler.receive(client, openBlocks, callback);
verify(blockResolver, times(1)).getBlockData("app0", "exec1", "b0");
verify(blockResolver, times(1)).getBlockData("app0", "exec1", "b1");
ArgumentCaptor<ByteBuffer> response = ArgumentCaptor.forClass(ByteBuffer.class);
verify(callback, times(1)).onSuccess(response.capture());
@ -107,6 +105,8 @@ public class ExternalShuffleBlockHandlerSuite {
assertEquals(block0Marker, buffers.next());
assertEquals(block1Marker, buffers.next());
assertFalse(buffers.hasNext());
verify(blockResolver, times(1)).getBlockData("app0", "exec1", "b0");
verify(blockResolver, times(1)).getBlockData("app0", "exec1", "b1");
// Verify open block request latency metrics
Timer openBlockRequestLatencyMillis = (Timer) ((ExternalShuffleBlockHandler) handler)

View file

@ -216,9 +216,8 @@ public class ExternalShuffleIntegrationSuite {
registerExecutor("exec-0", dataContext0.createExecutorInfo(SORT_MANAGER));
FetchResult execFetch = fetchBlocks("exec-0",
new String[] { "shuffle_0_0_0" /* right */, "shuffle_1_0_0" /* wrong */ });
// Both still fail, as we start by checking for all block.
assertTrue(execFetch.successBlocks.isEmpty());
assertEquals(Sets.newHashSet("shuffle_0_0_0", "shuffle_1_0_0"), execFetch.failedBlocks);
assertEquals(Sets.newHashSet("shuffle_0_0_0"), execFetch.successBlocks);
assertEquals(Sets.newHashSet("shuffle_1_0_0"), execFetch.failedBlocks);
}
@Test

View file

@ -56,11 +56,12 @@ class NettyBlockRpcServer(
message match {
case openBlocks: OpenBlocks =>
val blocks: Seq[ManagedBuffer] =
openBlocks.blockIds.map(BlockId.apply).map(blockManager.getBlockData)
val blocksNum = openBlocks.blockIds.length
val blocks = for (i <- (0 until blocksNum).view)
yield blockManager.getBlockData(BlockId.apply(openBlocks.blockIds(i)))
val streamId = streamManager.registerStream(appId, blocks.iterator.asJava)
logTrace(s"Registered streamId $streamId with ${blocks.size} buffers")
responseContext.onSuccess(new StreamHandle(streamId, blocks.size).toByteBuffer)
logTrace(s"Registered streamId $streamId with $blocksNum buffers")
responseContext.onSuccess(new StreamHandle(streamId, blocksNum).toByteBuffer)
case uploadBlock: UploadBlock =>
// StorageLevel and ClassTag are serialized as bytes using our JavaSerializer.