[SPARK-25776][CORE]The disk write buffer size must be greater than 12

## What changes were proposed in this pull request?

 In `UnsafeSorterSpillWriter.java`, when we write a record to a spill file wtih ` void write(Object baseObject, long baseOffset,  int recordLength, long keyPrefix)`, `recordLength` and `keyPrefix`  will be  written  the disk write buffer  first, and these will take 12 bytes, so the disk write buffer size must be greater than 12.

 If `diskWriteBufferSize` is  10, it will print this exception info:

_java.lang.ArrayIndexOutOfBoundsException: 10
   at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillWriter.writeLongToBuffer (UnsafeSorterSpillWriter.java:91)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillWriter.write(UnsafeSorterSpillWriter.java:123)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spillIterator(UnsafeExternalSorter.java:498)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:222)
	at org.apache.spark.memory.MemoryConsumer.spill(MemoryConsumer.java:65)_

## How was this patch tested?
Existing UT in `UnsafeExternalSorterSuite`

Closes #22754 from 10110346/diskWriteBufferSize.

Authored-by: liuxian <liu.xian3@zte.com.cn>
Signed-off-by: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
This commit is contained in:
liuxian 2018-11-05 01:55:13 +09:00 committed by Kazuaki Ishizaki
parent 463a676687
commit 6c9e5ac9de
2 changed files with 8 additions and 3 deletions

View file

@ -42,7 +42,10 @@ public final class UnsafeSorterSpillWriter {
private final SparkConf conf = new SparkConf();
/** The buffer size to use when writing the sorted records to an on-disk file */
/**
* The buffer size to use when writing the sorted records to an on-disk file, and
* this space used by prefix + len + recordLength must be greater than 4 + 8 bytes.
*/
private final int diskWriteBufferSize =
(int) (long) conf.get(package$.MODULE$.SHUFFLE_DISK_WRITE_BUFFER_SIZE());

View file

@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.unsafe.array.ByteArrayMethods
import org.apache.spark.util.Utils
package object config {
@ -504,8 +505,9 @@ package object config {
ConfigBuilder("spark.shuffle.spill.diskWriteBufferSize")
.doc("The buffer size, in bytes, to use when writing the sorted records to an on-disk file.")
.bytesConf(ByteUnit.BYTE)
.checkValue(v => v > 0 && v <= Int.MaxValue,
s"The buffer size must be greater than 0 and less than ${Int.MaxValue}.")
.checkValue(v => v > 12 && v <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH,
s"The buffer size must be greater than 12 and less than or equal to " +
s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.")
.createWithDefault(1024 * 1024)
private[spark] val UNROLL_MEMORY_CHECK_PERIOD =