[SPARK-27256][CORE][SQL] If the configuration is used to set the number of bytes, we'd better use `bytesConf`.

## What changes were proposed in this pull request?
Currently, if we want to configure `spark.sql.files.maxPartitionBytes` to 256 megabytes, we must set `spark.sql.files.maxPartitionBytes=268435456`, which is unfriendly to users.

And if we set it like this: `spark.sql.files.maxPartitionBytes=256M`, we encounter this exception:
```
Exception in thread "main" java.lang.IllegalArgumentException:
 spark.sql.files.maxPartitionBytes should be long, but was 256M
        at org.apache.spark.internal.config.ConfigHelpers$.toNumber(ConfigBuilder.scala)
```
This PR uses `bytesConf` in place of `longConf` or `intConf` wherever the configuration sets a number of bytes, so that values with unit suffixes such as `256m` are accepted (see the sketch after the list below).
Changed configurations:
- `spark.files.maxPartitionBytes`
- `spark.files.openCostInBytes`
- `spark.shuffle.sort.initialBufferSize`
- `spark.shuffle.spill.initialMemoryThreshold`
- `spark.sql.autoBroadcastJoinThreshold`
- `spark.sql.files.maxPartitionBytes`
- `spark.sql.files.openCostInBytes`
- `spark.sql.defaultSizeInBytes`
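
With this change, byte-size configurations accept values with unit suffixes. A minimal sketch of the user-facing effect (illustrative only, not part of the patch; suffix spellings follow Spark's byte-string parsing):
```scala
import org.apache.spark.SparkConf

// Both forms resolve to the same number of bytes; before this patch only the
// raw long value was accepted for these entries.
val conf = new SparkConf()
  .set("spark.files.maxPartitionBytes", "128m")             // previously 134217728
  .set("spark.shuffle.spill.initialMemoryThreshold", "5m")  // previously 5242880
  .set("spark.sql.autoBroadcastJoinThreshold", "10m")       // previously 10485760
```
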
## How was this patch tested?
1. Existing unit tests
2. Manual testing
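
A hypothetical manual check (not taken from the PR; the path and application name are illustrative) that exercises the new parsing through the file-read path:
```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("bytesConf-manual-check")
  .config("spark.sql.files.maxPartitionBytes", "256m")
  .getOrCreate()

// Reading a file-based source forces spark.sql.files.maxPartitionBytes to be
// parsed; before this patch the suffixed value raised IllegalArgumentException.
spark.range(0, 1000).write.mode("overwrite").parquet("/tmp/bytesconf-check")
println(spark.read.parquet("/tmp/bytesconf-check").count())
```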

Closes #24187 from 10110346/bytesConf.

Authored-by: liuxian <liu.xian3@zte.com.cn>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Commit e4b36df2c0 (parent 4b2b3da766), committed by Wenchen Fan on 2019-03-25: 5 changed files with 13 additions and 12 deletions.

```diff
@@ -144,7 +144,7 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
     this.sparkConf = sparkConf;
     this.transferToEnabled = sparkConf.getBoolean("spark.file.transferTo", true);
     this.initialSortBufferSize =
-      (int) sparkConf.get(package$.MODULE$.SHUFFLE_SORT_INIT_BUFFER_SIZE());
+      (int) (long) sparkConf.get(package$.MODULE$.SHUFFLE_SORT_INIT_BUFFER_SIZE());
     this.inputBufferSizeInBytes =
       (int) (long) sparkConf.get(package$.MODULE$.SHUFFLE_FILE_BUFFER_SIZE()) * 1024;
     this.outputBufferSizeInBytes =
```

```diff
@@ -606,7 +606,7 @@ package object config {
   private[spark] val FILES_MAX_PARTITION_BYTES = ConfigBuilder("spark.files.maxPartitionBytes")
     .doc("The maximum number of bytes to pack into a single partition when reading files.")
-    .longConf
+    .bytesConf(ByteUnit.BYTE)
     .createWithDefault(128 * 1024 * 1024)

   private[spark] val FILES_OPEN_COST_IN_BYTES = ConfigBuilder("spark.files.openCostInBytes")
@@ -614,7 +614,7 @@ package object config {
       " the same time. This is used when putting multiple files into a partition. It's better to" +
       " over estimate, then the partitions with small files will be faster than partitions with" +
       " bigger files.")
-    .longConf
+    .bytesConf(ByteUnit.BYTE)
     .createWithDefault(4 * 1024 * 1024)

   private[spark] val HADOOP_RDD_IGNORE_EMPTY_SPLITS =
@@ -868,8 +868,9 @@ package object config {
   private[spark] val SHUFFLE_SORT_INIT_BUFFER_SIZE =
     ConfigBuilder("spark.shuffle.sort.initialBufferSize")
       .internal()
-      .intConf
-      .checkValue(v => v > 0, "The value should be a positive integer.")
+      .bytesConf(ByteUnit.BYTE)
+      .checkValue(v => v > 0 && v <= Int.MaxValue,
+        s"The buffer size must be greater than 0 and less than or equal to ${Int.MaxValue}.")
       .createWithDefault(4096)

   private[spark] val SHUFFLE_COMPRESS =
@@ -891,7 +892,7 @@ package object config {
       .internal()
       .doc("Initial threshold for the size of a collection before we start tracking its " +
         "memory usage.")
-      .longConf
+      .bytesConf(ByteUnit.BYTE)
       .createWithDefault(5 * 1024 * 1024)

   private[spark] val SHUFFLE_SPILL_BATCH_SIZE =
```

```diff
@@ -111,7 +111,7 @@ public final class UnsafeExternalRowSorter {
       taskContext,
       recordComparatorSupplier,
       prefixComparator,
-      (int) sparkEnv.conf().get(package$.MODULE$.SHUFFLE_SORT_INIT_BUFFER_SIZE()),
+      (int) (long) sparkEnv.conf().get(package$.MODULE$.SHUFFLE_SORT_INIT_BUFFER_SIZE()),
       pageSizeBytes,
       (int) SparkEnv.get().conf().get(
         package$.MODULE$.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD()),
```

```diff
@@ -254,7 +254,7 @@ object SQLConf {
       "command <code>ANALYZE TABLE &lt;tableName&gt; COMPUTE STATISTICS noscan</code> has been " +
       "run, and file-based data source tables where the statistics are computed directly on " +
       "the files of data.")
-    .longConf
+    .bytesConf(ByteUnit.BYTE)
     .createWithDefault(10L * 1024 * 1024)

   val LIMIT_SCALE_UP_FACTOR = buildConf("spark.sql.limit.scaleUpFactor")
@@ -853,7 +853,7 @@ object SQLConf {
   val FILES_MAX_PARTITION_BYTES = buildConf("spark.sql.files.maxPartitionBytes")
     .doc("The maximum number of bytes to pack into a single partition when reading files.")
-    .longConf
+    .bytesConf(ByteUnit.BYTE)
     .createWithDefault(128 * 1024 * 1024) // parquet.block.size

   val FILES_OPEN_COST_IN_BYTES = buildConf("spark.sql.files.openCostInBytes")
@@ -1156,7 +1156,7 @@ object SQLConf {
       s"which is larger than `${AUTO_BROADCASTJOIN_THRESHOLD.key}` to be more conservative. " +
       "That is to say by default the optimizer will not choose to broadcast a table unless it " +
       "knows for sure its size is small enough.")
-    .longConf
+    .bytesConf(ByteUnit.BYTE)
     .createWithDefault(Long.MaxValue)

   val NDV_MAX_ERROR =
```

```diff
@@ -94,7 +94,7 @@ public final class UnsafeKVExternalSorter {
           taskContext,
           comparatorSupplier,
           prefixComparator,
-          (int) SparkEnv.get().conf().get(package$.MODULE$.SHUFFLE_SORT_INIT_BUFFER_SIZE()),
+          (int) (long) SparkEnv.get().conf().get(package$.MODULE$.SHUFFLE_SORT_INIT_BUFFER_SIZE()),
           pageSizeBytes,
           numElementsForSpillThreshold,
           canUseRadixSort);
@@ -160,7 +160,7 @@ public final class UnsafeKVExternalSorter {
           taskContext,
           comparatorSupplier,
           prefixComparator,
-          (int) SparkEnv.get().conf().get(package$.MODULE$.SHUFFLE_SORT_INIT_BUFFER_SIZE()),
+          (int) (long) SparkEnv.get().conf().get(package$.MODULE$.SHUFFLE_SORT_INIT_BUFFER_SIZE()),
           pageSizeBytes,
           numElementsForSpillThreshold,
           inMemSorter);
```