[SPARK-24578][CORE] Cap sub-region's size of returned nio buffer
## What changes were proposed in this pull request? This PR tries to fix the performance regression introduced by SPARK-21517. In our production job, we performed many parallel computations, with high possibility, some task could be scheduled to a host-2 where it needs to read the cache block data from host-1. Often, this big transfer makes the cluster suffer time out issue (it will retry 3 times, each with 120s timeout, and then do recompute to put the cache block into the local MemoryStore). The root cause is that we don't do `consolidateIfNeeded` anymore as we are using ``` Unpooled.wrappedBuffer(chunks.length, getChunks(): _*) ``` in ChunkedByteBuffer. If we have many small chunks, it could cause the `buf.notBuffer(...)` have very bad performance in the case that we have to call `copyByteBuf(...)` many times. ## How was this patch tested? Existing unit tests and also test in production Author: Wenbo Zhao <wzhao@twosigma.com> Closes #21593 from WenboZhao/spark-24578.
This commit is contained in:
parent
c5a0d1132a
commit
3f4bda7289
|
@ -137,30 +137,15 @@ class MessageWithHeader extends AbstractFileRegion {
|
||||||
}
|
}
|
||||||
|
|
||||||
private int copyByteBuf(ByteBuf buf, WritableByteChannel target) throws IOException {
|
private int copyByteBuf(ByteBuf buf, WritableByteChannel target) throws IOException {
|
||||||
ByteBuffer buffer = buf.nioBuffer();
|
// SPARK-24578: cap the sub-region's size of returned nio buffer to improve the performance
|
||||||
int written = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
|
// for the case that the passed-in buffer has too many components.
|
||||||
target.write(buffer) : writeNioBuffer(target, buffer);
|
int length = Math.min(buf.readableBytes(), NIO_BUFFER_LIMIT);
|
||||||
|
ByteBuffer buffer = buf.nioBuffer(buf.readerIndex(), length);
|
||||||
|
int written = target.write(buffer);
|
||||||
buf.skipBytes(written);
|
buf.skipBytes(written);
|
||||||
return written;
|
return written;
|
||||||
}
|
}
|
||||||
|
|
||||||
private int writeNioBuffer(
|
|
||||||
WritableByteChannel writeCh,
|
|
||||||
ByteBuffer buf) throws IOException {
|
|
||||||
int originalLimit = buf.limit();
|
|
||||||
int ret = 0;
|
|
||||||
|
|
||||||
try {
|
|
||||||
int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
|
|
||||||
buf.limit(buf.position() + ioSize);
|
|
||||||
ret = writeCh.write(buf);
|
|
||||||
} finally {
|
|
||||||
buf.limit(originalLimit);
|
|
||||||
}
|
|
||||||
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public MessageWithHeader touch(Object o) {
|
public MessageWithHeader touch(Object o) {
|
||||||
super.touch(o);
|
super.touch(o);
|
||||||
|
|
Loading…
Reference in a new issue