diff --git a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
index 14080f8822..d5a66db233 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
@@ -36,7 +36,7 @@ private[spark] class BlockStoreShuffleReader[K, C](
     serializerManager: SerializerManager = SparkEnv.get.serializerManager,
     blockManager: BlockManager = SparkEnv.get.blockManager,
     mapOutputTracker: MapOutputTracker = SparkEnv.get.mapOutputTracker,
-    shouldBatchFetch: Boolean)
+    shouldBatchFetch: Boolean = false)
   extends ShuffleReader[K, C] with Logging {
 
   private val dep = handle.dependency
diff --git a/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala
index 67adf5fa5e..3f9536e224 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala
@@ -138,8 +138,7 @@ class BlockStoreShuffleReaderSuite extends SparkFunSuite with LocalSparkContext {
       taskContext,
       metrics,
       serializerManager,
-      blockManager,
-      shouldBatchFetch = false)
+      blockManager)
 
     assert(shuffleReader.read().length === keyValuePairsPerMap * numMaps)
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index e2c1308cdc..eb2f13baf6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -349,15 +349,19 @@ object SQLConf {
     .checkValue(_ > 0, "The value of spark.sql.shuffle.partitions must be positive")
     .createWithDefault(200)
 
+  val ADAPTIVE_EXECUTION_ENABLED = buildConf("spark.sql.adaptive.enabled")
+    .doc("When true, enable adaptive query execution.")
+    .booleanConf
+    .createWithDefault(false)
+
   val SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE =
     buildConf("spark.sql.adaptive.shuffle.targetPostShuffleInputSize")
       .doc("The target post-shuffle input size in bytes of a task.")
       .bytesConf(ByteUnit.BYTE)
       .createWithDefault(64 * 1024 * 1024)
 
   val FETCH_SHUFFLE_BLOCKS_IN_BATCH_ENABLED =
-    buildConf("spark.sql.adaptive.fetchShuffleBlocksInBatch.enabled")
+    buildConf("spark.sql.adaptive.shuffle.fetchShuffleBlocksInBatch.enabled")
       .doc("Whether to fetch the continuous shuffle blocks in batch. Instead of fetching blocks " +
         "one by one, fetching continuous shuffle blocks for the same map task in batch can " +
         "reduce IO and improve performance. Note, this feature also depends on a relocatable " +
@@ -365,29 +369,15 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
-  val ADAPTIVE_EXECUTION_ENABLED = buildConf("spark.sql.adaptive.enabled")
-    .doc("When true, enable adaptive query execution.")
-    .booleanConf
-    .createWithDefault(false)
-
-  val NON_EMPTY_PARTITION_RATIO_FOR_BROADCAST_JOIN =
-    buildConf("spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin")
-      .doc("The relation with a non-empty partition ratio lower than this config will not be " +
-        "considered as the build side of a broadcast-hash join in adaptive execution regardless " +
-        "of its size.")
-      .doubleConf
-      .checkValue(_ >= 0, "The non-empty partition ratio must be positive number.")
-      .createWithDefault(0.2)
-
   val REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED =
-    buildConf("spark.sql.adaptive.reducePostShufflePartitions.enabled")
+    buildConf("spark.sql.adaptive.shuffle.reducePostShufflePartitions.enabled")
       .doc("When true and adaptive execution is enabled, this enables reducing the number of " +
         "post-shuffle partitions based on map output statistics.")
       .booleanConf
       .createWithDefault(true)
 
   val SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS =
-    buildConf("spark.sql.adaptive.minNumPostShufflePartitions")
+    buildConf("spark.sql.adaptive.shuffle.minNumPostShufflePartitions")
       .doc("The advisory minimum number of post-shuffle partitions used in adaptive execution.")
       .intConf
       .checkValue(_ > 0, "The minimum shuffle partition number " +
@@ -395,7 +385,7 @@ object SQLConf {
       .createWithDefault(1)
 
   val SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS =
-    buildConf("spark.sql.adaptive.maxNumPostShufflePartitions")
+    buildConf("spark.sql.adaptive.shuffle.maxNumPostShufflePartitions")
       .doc("The advisory maximum number of post-shuffle partitions used in adaptive execution. " +
         "This is used as the initial number of pre-shuffle partitions. By default it equals to " +
         "spark.sql.shuffle.partitions")
@@ -405,13 +395,22 @@ object SQLConf {
       .createOptional
 
   val OPTIMIZE_LOCAL_SHUFFLE_READER_ENABLED =
-    buildConf("spark.sql.adaptive.optimizedLocalShuffleReader.enabled")
+    buildConf("spark.sql.adaptive.shuffle.optimizedLocalShuffleReader.enabled")
       .doc("When true and adaptive execution is enabled, this enables the optimization of" +
         " converting the shuffle reader to local shuffle reader for the shuffle exchange" +
         " of the broadcast hash join in probe side.")
       .booleanConf
       .createWithDefault(true)
 
+  val NON_EMPTY_PARTITION_RATIO_FOR_BROADCAST_JOIN =
+    buildConf("spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin")
+      .doc("The relation with a non-empty partition ratio lower than this config will not be " +
+        "considered as the build side of a broadcast-hash join in adaptive execution regardless " +
+        "of its size.")
+      .doubleConf
+      .checkValue(_ >= 0, "The non-empty partition ratio must be positive number.")
+      .createWithDefault(0.2)
+
   val SUBEXPRESSION_ELIMINATION_ENABLED =
     buildConf("spark.sql.subexpressionElimination.enabled")
       .internal()
@@ -2148,21 +2147,18 @@ class SQLConf extends Serializable with Logging {
   def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS)
 
-  def targetPostShuffleInputSize: Long =
-    getConf(SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE)
-
-  def fetchShuffleBlocksInBatchEnabled: Boolean =
-    getConf(FETCH_SHUFFLE_BLOCKS_IN_BATCH_ENABLED)
-
   def adaptiveExecutionEnabled: Boolean = getConf(ADAPTIVE_EXECUTION_ENABLED)
 
+  def targetPostShuffleInputSize: Long = getConf(SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE)
+
+  def fetchShuffleBlocksInBatchEnabled: Boolean = getConf(FETCH_SHUFFLE_BLOCKS_IN_BATCH_ENABLED)
+
   def nonEmptyPartitionRatioForBroadcastJoin: Double =
     getConf(NON_EMPTY_PARTITION_RATIO_FOR_BROADCAST_JOIN)
 
   def reducePostShufflePartitionsEnabled: Boolean =
     getConf(REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED)
 
-  def minNumPostShufflePartitions: Int =
-    getConf(SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS)
+  def minNumPostShufflePartitions: Int = getConf(SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS)
 
   def maxNumPostShufflePartitions: Int =
     getConf(SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS).getOrElse(numShufflePartitions)
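As a quick reference for the rename, here is a minimal usage sketch (not part of the patch) showing how a job would opt in to adaptive execution with the new `spark.sql.adaptive.shuffle.*` keys. The app name, master, and the particular values are illustrative; only the config keys themselves come from the change above. Note that `spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin` keeps the plain `spark.sql.adaptive` prefix, presumably because it tunes broadcast-join planning rather than shuffle behavior; this diff only reorders its definition.

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch, assuming the renamed keys above: values are illustrative.
val spark = SparkSession.builder()
  .appName("aqe-shuffle-config-example") // hypothetical app name
  .master("local[*]")
  // Master switch; the spark.sql.adaptive.shuffle.* settings depend on it.
  .config("spark.sql.adaptive.enabled", "true")
  // Coalesce post-shuffle partitions based on map output statistics.
  .config("spark.sql.adaptive.shuffle.reducePostShufflePartitions.enabled", "true")
  .config("spark.sql.adaptive.shuffle.minNumPostShufflePartitions", "1")
  .config("spark.sql.adaptive.shuffle.maxNumPostShufflePartitions", "500")
  // Target post-shuffle input size per task in bytes (67108864 is the default).
  .config("spark.sql.adaptive.shuffle.targetPostShuffleInputSize", "67108864")
  // Fetch continuous shuffle blocks of the same map task in one batch.
  .config("spark.sql.adaptive.shuffle.fetchShuffleBlocksInBatch.enabled", "true")
  // Convert shuffle readers to local readers on the probe side of a BHJ.
  .config("spark.sql.adaptive.shuffle.optimizedLocalShuffleReader.enabled", "true")
  .getOrCreate()
```

The new `shouldBatchFetch: Boolean = false` default on `BlockStoreShuffleReader` is what lets the test suite drop the explicit argument: batch fetching stays opt-in for core callers, while the SQL side can enable it through the `fetchShuffleBlocksInBatchEnabled` accessor shown in the last hunk.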