diff --git a/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java b/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java index 66227f96a1..4f8781b42a 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java +++ b/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java @@ -18,6 +18,7 @@ package org.apache.spark.network.protocol; import java.io.IOException; +import java.nio.ByteBuffer; import java.nio.channels.WritableByteChannel; import javax.annotation.Nullable; @@ -43,6 +44,14 @@ class MessageWithHeader extends AbstractReferenceCounted implements FileRegion { private final long bodyLength; private long totalBytesTransferred; + /** + * When the write buffer size is larger than this limit, I/O will be done in chunks of this size. + * The size should not be too large as it will waste underlying memory copy. e.g. If network + * avaliable buffer is smaller than this limit, the data cannot be sent within one single write + * operation while it still will make memory copy with this size. + */ + private static final int NIO_BUFFER_LIMIT = 256 * 1024; + /** * Construct a new MessageWithHeader. * @@ -128,8 +137,27 @@ class MessageWithHeader extends AbstractReferenceCounted implements FileRegion { } private int copyByteBuf(ByteBuf buf, WritableByteChannel target) throws IOException { - int written = target.write(buf.nioBuffer()); + ByteBuffer buffer = buf.nioBuffer(); + int written = (buffer.remaining() <= NIO_BUFFER_LIMIT) ? + target.write(buffer) : writeNioBuffer(target, buffer); buf.skipBytes(written); return written; } + + private int writeNioBuffer( + WritableByteChannel writeCh, + ByteBuffer buf) throws IOException { + int originalLimit = buf.limit(); + int ret = 0; + + try { + int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT); + buf.limit(buf.position() + ioSize); + ret = writeCh.write(buf); + } finally { + buf.limit(originalLimit); + } + + return ret; + } }