[SPARK-9853][CORE][FOLLOW-UP] Regularize all the shuffle configurations related to adaptive execution

### What changes were proposed in this pull request?
1. Regularize all the shuffle configurations related to adaptive execution.
2. Add default value for `BlockStoreShuffleReader.shouldBatchFetch`.

### Why are the changes needed?
This is a follow-up to #26040. It regularizes the shuffle configurations related to adaptive execution under the existing `spark.sql.adaptive.shuffle` namespace in SQLConf.

### Does this PR introduce any user-facing change?
Yes. The released user config `spark.sql.adaptive.minNumPostShufflePartitions` is renamed to `spark.sql.adaptive.shuffle.minNumPostShufflePartitions`; the other changed configs have not been released yet.
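
For users migrating, only the key name changes. A minimal sketch of updating a session to the regularized key (the builder usage and the value `4` are illustrative examples, not part of this PR):

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical migration example: the same setting moves to the
// regularized spark.sql.adaptive.shuffle namespace.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("aqe-config-migration")
  .config("spark.sql.adaptive.enabled", "true")
  // Before this PR: .config("spark.sql.adaptive.minNumPostShufflePartitions", "4")
  .config("spark.sql.adaptive.shuffle.minNumPostShufflePartitions", "4")
  .getOrCreate()
```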

### How was this patch tested?
Existing UT.

Closes #26147 from xuanyuanking/SPARK-9853.

Authored-by: Yuanjian Li <xyliyuanjian@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>

@@ -36,7 +36,7 @@ private[spark] class BlockStoreShuffleReader[K, C](
     serializerManager: SerializerManager = SparkEnv.get.serializerManager,
     blockManager: BlockManager = SparkEnv.get.blockManager,
     mapOutputTracker: MapOutputTracker = SparkEnv.get.mapOutputTracker,
-    shouldBatchFetch: Boolean)
+    shouldBatchFetch: Boolean = false)
   extends ShuffleReader[K, C] with Logging {

   private val dep = handle.dependency
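
With the default in place, call sites that do not care about batch fetching can omit the argument entirely, which is exactly what the test change below does. A standalone sketch of the same Scala default-parameter pattern, with hypothetical names:

```scala
// Standalone illustration of a defaulted constructor parameter,
// mirroring `shouldBatchFetch: Boolean = false` above.
class Reader(val name: String, val shouldBatchFetch: Boolean = false)

object ReaderExample extends App {
  val plain   = new Reader("r1")        // omits the flag, gets the default: false
  val batched = new Reader("r2", true)  // explicitly opts in to batch fetching
  println(s"${plain.shouldBatchFetch} ${batched.shouldBatchFetch}") // false true
}
```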

@@ -138,8 +138,7 @@ class BlockStoreShuffleReaderSuite extends SparkFunSuite with LocalSparkContext
       taskContext,
       metrics,
       serializerManager,
-      blockManager,
-      shouldBatchFetch = false)
+      blockManager)

     assert(shuffleReader.read().length === keyValuePairsPerMap * numMaps)

@@ -349,15 +349,19 @@ object SQLConf {
     .checkValue(_ > 0, "The value of spark.sql.shuffle.partitions must be positive")
     .createWithDefault(200)

+  val ADAPTIVE_EXECUTION_ENABLED = buildConf("spark.sql.adaptive.enabled")
+    .doc("When true, enable adaptive query execution.")
+    .booleanConf
+    .createWithDefault(false)
+
   val SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE =
     buildConf("spark.sql.adaptive.shuffle.targetPostShuffleInputSize")
       .doc("The target post-shuffle input size in bytes of a task.")
       .bytesConf(ByteUnit.BYTE)
       .createWithDefault(64 * 1024 * 1024)

   val FETCH_SHUFFLE_BLOCKS_IN_BATCH_ENABLED =
-    buildConf("spark.sql.adaptive.fetchShuffleBlocksInBatch.enabled")
+    buildConf("spark.sql.adaptive.shuffle.fetchShuffleBlocksInBatch.enabled")
       .doc("Whether to fetch the continuous shuffle blocks in batch. Instead of fetching blocks " +
         "one by one, fetching continuous shuffle blocks for the same map task in batch can " +
         "reduce IO and improve performance. Note, this feature also depends on a relocatable " +
@@ -365,29 +369,15 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)

-  val ADAPTIVE_EXECUTION_ENABLED = buildConf("spark.sql.adaptive.enabled")
-    .doc("When true, enable adaptive query execution.")
-    .booleanConf
-    .createWithDefault(false)
-
-  val NON_EMPTY_PARTITION_RATIO_FOR_BROADCAST_JOIN =
-    buildConf("spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin")
-      .doc("The relation with a non-empty partition ratio lower than this config will not be " +
-        "considered as the build side of a broadcast-hash join in adaptive execution regardless " +
-        "of its size.")
-      .doubleConf
-      .checkValue(_ >= 0, "The non-empty partition ratio must be positive number.")
-      .createWithDefault(0.2)
-
   val REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED =
-    buildConf("spark.sql.adaptive.reducePostShufflePartitions.enabled")
+    buildConf("spark.sql.adaptive.shuffle.reducePostShufflePartitions.enabled")
       .doc("When true and adaptive execution is enabled, this enables reducing the number of " +
         "post-shuffle partitions based on map output statistics.")
       .booleanConf
       .createWithDefault(true)

   val SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS =
-    buildConf("spark.sql.adaptive.minNumPostShufflePartitions")
+    buildConf("spark.sql.adaptive.shuffle.minNumPostShufflePartitions")
       .doc("The advisory minimum number of post-shuffle partitions used in adaptive execution.")
       .intConf
       .checkValue(_ > 0, "The minimum shuffle partition number " +
@@ -395,7 +385,7 @@ object SQLConf {
       .createWithDefault(1)

   val SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS =
-    buildConf("spark.sql.adaptive.maxNumPostShufflePartitions")
+    buildConf("spark.sql.adaptive.shuffle.maxNumPostShufflePartitions")
       .doc("The advisory maximum number of post-shuffle partitions used in adaptive execution. " +
         "This is used as the initial number of pre-shuffle partitions. By default it equals to " +
         "spark.sql.shuffle.partitions")
@@ -405,13 +395,22 @@ object SQLConf {
       .createOptional

   val OPTIMIZE_LOCAL_SHUFFLE_READER_ENABLED =
-    buildConf("spark.sql.adaptive.optimizedLocalShuffleReader.enabled")
+    buildConf("spark.sql.adaptive.shuffle.optimizedLocalShuffleReader.enabled")
       .doc("When true and adaptive execution is enabled, this enables the optimization of" +
         " converting the shuffle reader to local shuffle reader for the shuffle exchange" +
         " of the broadcast hash join in probe side.")
       .booleanConf
       .createWithDefault(true)

+  val NON_EMPTY_PARTITION_RATIO_FOR_BROADCAST_JOIN =
+    buildConf("spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin")
+      .doc("The relation with a non-empty partition ratio lower than this config will not be " +
+        "considered as the build side of a broadcast-hash join in adaptive execution regardless " +
+        "of its size.")
+      .doubleConf
+      .checkValue(_ >= 0, "The non-empty partition ratio must be positive number.")
+      .createWithDefault(0.2)
+
   val SUBEXPRESSION_ELIMINATION_ENABLED =
     buildConf("spark.sql.subexpressionElimination.enabled")
       .internal()
@@ -2148,21 +2147,18 @@ class SQLConf extends Serializable with Logging {

   def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS)

-  def targetPostShuffleInputSize: Long =
-    getConf(SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE)
-
-  def fetchShuffleBlocksInBatchEnabled: Boolean =
-    getConf(FETCH_SHUFFLE_BLOCKS_IN_BATCH_ENABLED)
-
   def adaptiveExecutionEnabled: Boolean = getConf(ADAPTIVE_EXECUTION_ENABLED)

+  def targetPostShuffleInputSize: Long = getConf(SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE)
+
+  def fetchShuffleBlocksInBatchEnabled: Boolean = getConf(FETCH_SHUFFLE_BLOCKS_IN_BATCH_ENABLED)
+
   def nonEmptyPartitionRatioForBroadcastJoin: Double =
     getConf(NON_EMPTY_PARTITION_RATIO_FOR_BROADCAST_JOIN)

   def reducePostShufflePartitionsEnabled: Boolean = getConf(REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED)

-  def minNumPostShufflePartitions: Int =
-    getConf(SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS)
+  def minNumPostShufflePartitions: Int = getConf(SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS)

   def maxNumPostShufflePartitions: Int =
     getConf(SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS).getOrElse(numShufflePartitions)
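
As a usage sketch of the regularized keys (the object name, values, and session setup are hypothetical examples, not from this PR): the keys are set like any other SQL conf, and leaving `spark.sql.adaptive.shuffle.maxNumPostShufflePartitions` unset falls back to `spark.sql.shuffle.partitions` via the `getOrElse` above.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical runtime usage of the regularized keys; the values are
// arbitrary examples, not defaults changed by this PR.
object AqeConfExample extends App {
  val spark = SparkSession.builder()
    .master("local[*]")
    .appName("aqe-conf-example")
    .getOrCreate()

  spark.conf.set("spark.sql.adaptive.enabled", "true")
  spark.conf.set("spark.sql.adaptive.shuffle.reducePostShufflePartitions.enabled", "true")
  spark.conf.set("spark.sql.adaptive.shuffle.minNumPostShufflePartitions", "1")
  // maxNumPostShufflePartitions is left unset, so the accessor above falls
  // back to spark.sql.shuffle.partitions via getOrElse.
  spark.stop()
}
```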