diff --git a/common/network-common/src/main/java/org/apache/spark/network/protocol/Encoders.java b/common/network-common/src/main/java/org/apache/spark/network/protocol/Encoders.java
index 4fa191b391..8bab808ad6 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/protocol/Encoders.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/protocol/Encoders.java
@@ -18,6 +18,7 @@
package org.apache.spark.network.protocol;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import io.netty.buffer.ByteBuf;
@@ -46,7 +47,11 @@ public class Encoders {
}
}
- /** Bitmaps are encoded with their serialization length followed by the serialization bytes. */
+ /**
+ * Bitmaps are encoded with their serialization length followed by the serialization bytes.
+ *
+ * @since 3.1.0
+ */
public static class Bitmaps {
public static int encodedLength(RoaringBitmap b) {
// Compress the bitmap before serializing it. Note that since BlockTransferMessage
@@ -57,13 +62,20 @@ public class Encoders {
return b.serializedSizeInBytes();
}
+ /**
+ * The input ByteBuf for this encoder must have enough write capacity to fit the serialized
+ * bitmap. Encoders which write via {@link io.netty.buffer.AbstractByteBuf#writeBytes(byte[])}
+ * can rely on the buffer expanding as needed, since writeBytes calls
+ * {@link ByteBuf#ensureWritable} internally. This encoder, however, bypasses writeBytes and
+ * fails with a {@link java.nio.BufferOverflowException} if the input buf doesn't have
+ * enough write capacity.
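+ * <p>
+ * A minimal round-trip sketch (illustrative only; it mirrors the EncodersSuite test added in
+ * this patch). Sizing the buffer up front with {@code encodedLength} guarantees the write
+ * cannot overflow:
+ * <pre>{@code
+ * RoaringBitmap bitmap = new RoaringBitmap();
+ * bitmap.add(1, 2, 3);
+ * ByteBuf buf = Unpooled.buffer(Encoders.Bitmaps.encodedLength(bitmap));
+ * Encoders.Bitmaps.encode(buf, bitmap);
+ * RoaringBitmap decoded = Encoders.Bitmaps.decode(buf);
+ * }</pre>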
+ */
public static void encode(ByteBuf buf, RoaringBitmap b) {
- int encodedLength = b.serializedSizeInBytes();
// RoaringBitmap requires nio ByteBuffer for serde. We expose the netty ByteBuf as a nio
// ByteBuffer. Here, we need to explicitly manage the index so we can write into the
// ByteBuffer, and the write is reflected in the underlying ByteBuf.
- b.serialize(buf.nioBuffer(buf.writerIndex(), encodedLength));
- buf.writerIndex(buf.writerIndex() + encodedLength);
+ ByteBuffer byteBuffer = buf.nioBuffer(buf.writerIndex(), buf.writableBytes());
+ b.serialize(byteBuffer);
+ buf.writerIndex(buf.writerIndex() + byteBuffer.position());
}
public static RoaringBitmap decode(ByteBuf buf) {
@@ -172,7 +184,11 @@ public class Encoders {
}
}
- /** Bitmap arrays are encoded with the number of bitmaps followed by per-Bitmap encoding. */
+ /**
+ * Bitmap arrays are encoded with the number of bitmaps followed by per-Bitmap encoding.
+ *
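+ * A round-trip usage sketch (illustrative only; mirrors the EncodersSuite test added in
+ * this patch):
+ * <pre>{@code
+ * RoaringBitmap[] bitmaps = { new RoaringBitmap(), new RoaringBitmap() };
+ * bitmaps[0].add(1, 2, 3);
+ * ByteBuf buf = Unpooled.buffer(Encoders.BitmapArrays.encodedLength(bitmaps));
+ * Encoders.BitmapArrays.encode(buf, bitmaps);
+ * RoaringBitmap[] decoded = Encoders.BitmapArrays.decode(buf);
+ * }</pre>
+ *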
+ * @since 3.1.0
+ */
public static class BitmapArrays {
public static int encodedLength(RoaringBitmap[] bitmaps) {
int totalLength = 4;
diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
index 646e427881..fd287b0226 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
@@ -363,4 +363,39 @@ public class TransportConf {
return conf.getBoolean("spark.shuffle.useOldFetchProtocol", false);
}
+ /**
+ * Class name of the MergedShuffleFileManager implementation that merges the blocks
+ * pushed to it when push-based shuffle is enabled. By default, push-based shuffle is
+ * disabled at the cluster level because this configuration is set to
+ * 'org.apache.spark.network.shuffle.ExternalBlockHandler$NoOpMergedShuffleFileManager'.
+ * To turn on push-based shuffle at the cluster level, set the configuration to
+ * 'org.apache.spark.network.shuffle.RemoteBlockPushResolver'.
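+ * <p>
+ * For example (illustrative only), a shuffle service configuration that enables server-side
+ * block merging would contain:
+ * <pre>
+ * spark.shuffle.server.mergedShuffleFileManagerImpl=org.apache.spark.network.shuffle.RemoteBlockPushResolver
+ * </pre>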
+ */
+ public String mergedShuffleFileManagerImpl() {
+ return conf.get("spark.shuffle.server.mergedShuffleFileManagerImpl",
+ "org.apache.spark.network.shuffle.ExternalBlockHandler$NoOpMergedShuffleFileManager");
+ }
+
+ /**
+ * The minimum size of a chunk when dividing a merged shuffle file into multiple chunks during
+ * push-based shuffle.
+ * A merged shuffle file consists of multiple small shuffle blocks. Fetching the complete
+ * merged shuffle file in a single response increases the memory requirements of the clients.
+ * Instead of serving the entire merged file, the shuffle service serves it in `chunks`,
+ * where each chunk consists of a few shuffle blocks in their entirety, and this
+ * configuration controls how big a chunk can get. A corresponding index file, indicating
+ * chunk boundaries, is generated for each merged shuffle file.
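+ * <p>
+ * A simplified sketch of how chunk boundaries could be derived from block sizes (illustrative
+ * only; the actual logic lives in the MergedShuffleFileManager implementation, and
+ * recordChunkBoundary is a hypothetical helper):
+ * <pre>{@code
+ * long currentChunkSize = 0;
+ * for (long blockLength : mergedBlockLengths) {
+ *   currentChunkSize += blockLength;
+ *   if (currentChunkSize >= minChunkSizeInMergedShuffleFile()) {
+ *     recordChunkBoundary();  // hypothetical: append the current offset to the index file
+ *     currentChunkSize = 0;
+ *   }
+ * }
+ * }</pre>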
+ */
+ public int minChunkSizeInMergedShuffleFile() {
+ return Ints.checkedCast(JavaUtils.byteStringAsBytes(
+ conf.get("spark.shuffle.server.minChunkSizeInMergedShuffleFile", "2m")));
+ }
+
+ /**
+ * The size of the in-memory cache used in push-based shuffle for storing merged index files.
+ */
+ public long mergedIndexCacheSize() {
+ return JavaUtils.byteStringAsBytes(
+ conf.get("spark.shuffle.server.mergedIndexCacheSize", "100m"));
+ }
}
diff --git a/common/network-common/src/test/java/org/apache/spark/network/protocol/EncodersSuite.java b/common/network-common/src/test/java/org/apache/spark/network/protocol/EncodersSuite.java
new file mode 100644
index 0000000000..6e89702c04
--- /dev/null
+++ b/common/network-common/src/test/java/org/apache/spark/network/protocol/EncodersSuite.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.protocol;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import org.junit.Test;
+import org.roaringbitmap.RoaringBitmap;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests for {@link Encoders}.
+ */
+public class EncodersSuite {
+
+ @Test
+ public void testRoaringBitmapEncodeDecode() {
+ RoaringBitmap bitmap = new RoaringBitmap();
+ bitmap.add(1, 2, 3);
+ ByteBuf buf = Unpooled.buffer(Encoders.Bitmaps.encodedLength(bitmap));
+ Encoders.Bitmaps.encode(buf, bitmap);
+ RoaringBitmap decodedBitmap = Encoders.Bitmaps.decode(buf);
+ assertEquals(bitmap, decodedBitmap);
+ }
+
+ @Test(expected = java.nio.BufferOverflowException.class)
+ public void testRoaringBitmapEncodeShouldFailWhenBufferIsSmall() {
+ RoaringBitmap bitmap = new RoaringBitmap();
+ bitmap.add(1, 2, 3);
+ ByteBuf buf = Unpooled.buffer(4);
+ Encoders.Bitmaps.encode(buf, bitmap);
+ }
+
+ @Test
+ public void testBitmapArraysEncodeDecode() {
+ RoaringBitmap[] bitmaps = new RoaringBitmap[] {
+ new RoaringBitmap(),
+ new RoaringBitmap(),
+ new RoaringBitmap(), // empty
+ new RoaringBitmap(),
+ new RoaringBitmap()
+ };
+ bitmaps[0].add(1, 2, 3);
+ bitmaps[1].add(1, 2, 4);
+ bitmaps[3].add(7L, 9L);
+ bitmaps[4].add(1L, 100L);
+ ByteBuf buf = Unpooled.buffer(Encoders.BitmapArrays.encodedLength(bitmaps));
+ Encoders.BitmapArrays.encode(buf, bitmaps);
+ RoaringBitmap[] decodedBitmaps = Encoders.BitmapArrays.decode(buf);
+ assertArrayEquals(bitmaps, decodedBitmaps);
+ }
+}
diff --git a/common/network-shuffle/pom.xml b/common/network-shuffle/pom.xml
index a4a1ff92ef..562a1d495c 100644
--- a/common/network-shuffle/pom.xml
+++ b/common/network-shuffle/pom.xml
@@ -47,6 +47,11 @@
      <artifactId>metrics-core</artifactId>
    </dependency>

+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-tags_${scala.binary.version}</artifactId>
+    </dependency>
+
    <dependency>
      <groupId>org.slf4j</groupId>
@@ -70,11 +75,6 @@
      <type>test-jar</type>
      <scope>test</scope>
    </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-tags_${scala.binary.version}</artifactId>
-      <scope>test</scope>
-    </dependency>