diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
index 36081069b0..9d05f03613 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
@@ -144,7 +144,7 @@ public class UnsafeShuffleWriter extends ShuffleWriter {
     this.sparkConf = sparkConf;
     this.transferToEnabled = sparkConf.getBoolean("spark.file.transferTo", true);
     this.initialSortBufferSize =
-      (int) sparkConf.get(package$.MODULE$.SHUFFLE_SORT_INIT_BUFFER_SIZE());
+      (int) (long) sparkConf.get(package$.MODULE$.SHUFFLE_SORT_INIT_BUFFER_SIZE());
     this.inputBufferSizeInBytes =
       (int) (long) sparkConf.get(package$.MODULE$.SHUFFLE_FILE_BUFFER_SIZE()) * 1024;
     this.outputBufferSizeInBytes =
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 758f605d1d..0bd46bef35 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -606,7 +606,7 @@ package object config {
 
   private[spark] val FILES_MAX_PARTITION_BYTES = ConfigBuilder("spark.files.maxPartitionBytes")
     .doc("The maximum number of bytes to pack into a single partition when reading files.")
-    .longConf
+    .bytesConf(ByteUnit.BYTE)
     .createWithDefault(128 * 1024 * 1024)
 
   private[spark] val FILES_OPEN_COST_IN_BYTES = ConfigBuilder("spark.files.openCostInBytes")
@@ -614,7 +614,7 @@ package object config {
       " the same time. This is used when putting multiple files into a partition. It's better to" +
       " over estimate, then the partitions with small files will be faster than partitions with" +
       " bigger files.")
-    .longConf
+    .bytesConf(ByteUnit.BYTE)
     .createWithDefault(4 * 1024 * 1024)
 
   private[spark] val HADOOP_RDD_IGNORE_EMPTY_SPLITS =
@@ -868,8 +868,9 @@ package object config {
   private[spark] val SHUFFLE_SORT_INIT_BUFFER_SIZE =
     ConfigBuilder("spark.shuffle.sort.initialBufferSize")
       .internal()
-      .intConf
-      .checkValue(v => v > 0, "The value should be a positive integer.")
+      .bytesConf(ByteUnit.BYTE)
+      .checkValue(v => v > 0 && v <= Int.MaxValue,
+        s"The buffer size must be greater than 0 and less than or equal to ${Int.MaxValue}.")
       .createWithDefault(4096)
 
   private[spark] val SHUFFLE_COMPRESS =
@@ -891,7 +892,7 @@ package object config {
       .internal()
       .doc("Initial threshold for the size of a collection before we start tracking its " +
         "memory usage.")
-      .longConf
+      .bytesConf(ByteUnit.BYTE)
       .createWithDefault(5 * 1024 * 1024)
 
   private[spark] val SHUFFLE_SPILL_BATCH_SIZE =
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
index d6edddfc1a..863d80b5cb 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
@@ -111,7 +111,7 @@ public final class UnsafeExternalRowSorter {
       taskContext,
       recordComparatorSupplier,
       prefixComparator,
-      (int) sparkEnv.conf().get(package$.MODULE$.SHUFFLE_SORT_INIT_BUFFER_SIZE()),
+      (int) (long) sparkEnv.conf().get(package$.MODULE$.SHUFFLE_SORT_INIT_BUFFER_SIZE()),
       pageSizeBytes,
       (int) SparkEnv.get().conf().get(
         package$.MODULE$.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD()),
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 39bbf0b1bd..3ecc34005c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -254,7 +254,7 @@ object SQLConf {
       "command ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan has been " +
      "run, and file-based data source tables where the statistics are computed directly on " +
       "the files of data.")
-    .longConf
+    .bytesConf(ByteUnit.BYTE)
     .createWithDefault(10L * 1024 * 1024)
 
   val LIMIT_SCALE_UP_FACTOR = buildConf("spark.sql.limit.scaleUpFactor")
@@ -853,7 +853,7 @@ object SQLConf {
 
   val FILES_MAX_PARTITION_BYTES = buildConf("spark.sql.files.maxPartitionBytes")
     .doc("The maximum number of bytes to pack into a single partition when reading files.")
-    .longConf
+    .bytesConf(ByteUnit.BYTE)
     .createWithDefault(128 * 1024 * 1024) // parquet.block.size
 
   val FILES_OPEN_COST_IN_BYTES = buildConf("spark.sql.files.openCostInBytes")
@@ -1156,7 +1156,7 @@ object SQLConf {
       s"which is larger than `${AUTO_BROADCASTJOIN_THRESHOLD.key}` to be more conservative. " +
       "That is to say by default the optimizer will not choose to broadcast a table unless it " +
       "knows for sure its size is small enough.")
-    .longConf
+    .bytesConf(ByteUnit.BYTE)
     .createWithDefault(Long.MaxValue)
 
   val NDV_MAX_ERROR =
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
index 5b126338f3..09426117a2 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
@@ -94,7 +94,7 @@ public final class UnsafeKVExternalSorter {
         taskContext,
         comparatorSupplier,
         prefixComparator,
-        (int) SparkEnv.get().conf().get(package$.MODULE$.SHUFFLE_SORT_INIT_BUFFER_SIZE()),
+        (int) (long) SparkEnv.get().conf().get(package$.MODULE$.SHUFFLE_SORT_INIT_BUFFER_SIZE()),
         pageSizeBytes,
         numElementsForSpillThreshold,
         canUseRadixSort);
@@ -160,7 +160,7 @@ public final class UnsafeKVExternalSorter {
         taskContext,
         comparatorSupplier,
         prefixComparator,
-        (int) SparkEnv.get().conf().get(package$.MODULE$.SHUFFLE_SORT_INIT_BUFFER_SIZE()),
+        (int) (long) SparkEnv.get().conf().get(package$.MODULE$.SHUFFLE_SORT_INIT_BUFFER_SIZE()),
         pageSizeBytes,
         numElementsForSpillThreshold,
         inMemSorter);
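Note on the change above (not part of the diff): entries declared with bytesConf(ByteUnit.BYTE) are read back as a byte count of type Long, which is why the Java call sites now cast (int) (long) — unbox the java.lang.Long first, then narrow to int — and why the new checkValue on spark.shuffle.sort.initialBufferSize bounds the value by Int.MaxValue. A minimal sketch of the user-visible effect, using keys from this diff and example values chosen only for illustration:

  import org.apache.spark.SparkConf

  val conf = new SparkConf()
    // With bytesConf, a unit suffix is accepted; "64k" parses to 64 * 1024 bytes.
    .set("spark.shuffle.sort.initialBufferSize", "64k")
    // A bare number is still interpreted as bytes, so existing numeric settings keep their meaning.
    .set("spark.sql.files.maxPartitionBytes", "134217728")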