[SPARK-29655][SQL] Read bucketed tables obeys spark.sql.shuffle.partitions
### What changes were proposed in this pull request?

To avoid changing the value of `spark.sql.adaptive.shuffle.maxNumPostShufflePartitions` frequently, we usually set it much larger than `spark.sql.shuffle.partitions` after enabling adaptive execution. This makes some bucket map joins lose their effect and adds extra `ShuffleExchange` nodes.

How to reproduce:

```scala
val bucketedTableName = "bucketed_table"
spark.range(10000).write.bucketBy(500, "id").sortBy("id").mode(org.apache.spark.sql.SaveMode.Overwrite).saveAsTable(bucketedTableName)
val bucketedTable = spark.table(bucketedTableName)
val df = spark.range(8)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

// Spark 2.4: spark.sql.adaptive.enabled=false
// We set spark.sql.shuffle.partitions <= 500 every time, based on our data in this case.
spark.conf.set("spark.sql.shuffle.partitions", 500)
bucketedTable.join(df, "id").explain()

// Since 3.0: we enable adaptive execution and set
// spark.sql.adaptive.shuffle.maxNumPostShufflePartitions to a larger value to fit more cases.
spark.conf.set("spark.sql.adaptive.enabled", true)
spark.conf.set("spark.sql.adaptive.shuffle.maxNumPostShufflePartitions", 1000)
bucketedTable.join(df, "id").explain()
```

```
scala> bucketedTable.join(df, "id").explain()
== Physical Plan ==
*(4) Project [id#5L]
+- *(4) SortMergeJoin [id#5L], [id#7L], Inner
   :- *(1) Sort [id#5L ASC NULLS FIRST], false, 0
   :  +- *(1) Project [id#5L]
   :     +- *(1) Filter isnotnull(id#5L)
   :        +- *(1) ColumnarToRow
   :           +- FileScan parquet default.bucketed_table[id#5L] Batched: true, DataFilters: [isnotnull(id#5L)], Format: Parquet, Location: InMemoryFileIndex[file:/root/opensource/apache-spark/spark-3.0.0-SNAPSHOT-bin-3.2.0/spark-warehou..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 500 out of 500
   +- *(3) Sort [id#7L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(id#7L, 500), true, [id=#49]
         +- *(2) Range (0, 8, step=1, splits=16)
```

vs

```
scala> bucketedTable.join(df, "id").explain()
== Physical Plan ==
AdaptiveSparkPlan(isFinalPlan=false)
+- Project [id#5L]
   +- SortMergeJoin [id#5L], [id#7L], Inner
      :- Sort [id#5L ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(id#5L, 1000), true, [id=#93]
      :     +- Project [id#5L]
      :        +- Filter isnotnull(id#5L)
      :           +- FileScan parquet default.bucketed_table[id#5L] Batched: true, DataFilters: [isnotnull(id#5L)], Format: Parquet, Location: InMemoryFileIndex[file:/root/opensource/apache-spark/spark-3.0.0-SNAPSHOT-bin-3.2.0/spark-warehou..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 500 out of 500
      +- Sort [id#7L ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(id#7L, 1000), true, [id=#92]
            +- Range (0, 8, step=1, splits=16)
```

This PR makes reading bucketed tables always obey `spark.sql.shuffle.partitions`, even when adaptive execution is enabled and `spark.sql.adaptive.shuffle.maxNumPostShufflePartitions` is set, to avoid adding extra `ShuffleExchange` nodes.

### Why are the changes needed?

Avoid degrading performance after enabling adaptive execution.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Unit test.

Closes #26409 from wangyum/SPARK-29655.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
parent 0c68578fa9
commit 4f10e54ba3
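A quick interactive sanity check of the fix (a sketch, not part of the commit; it reuses `bucketedTable` and `df` from the reproduction above, and exact plan strings vary by build):

```scala
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

// With adaptive execution still enabled, the patched planner should exchange
// only the small Range side, into 500 partitions (spark.sql.shuffle.partitions),
// matching the table's 500 buckets.
val joined = bucketedTable.join(df, "id")
joined.explain()

// Rough programmatic check: under adaptive execution the plan root is an
// AdaptiveSparkPlanExec, so unwrap it first (the same trick the test changes
// below use).
val plan = joined.queryExecution.executedPlan match {
  case a: AdaptiveSparkPlanExec => a.executedPlan
  case p => p
}
val exchanges = plan.collect { case e: ShuffleExchangeExec => e }
assert(exchanges.size == 1, s"expected one exchange, found ${exchanges.size}")
```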
In `EnsureRequirements`, the target partition count now prefers the partitioning of non-shuffle children, bounded below by `spark.sql.shuffle.partitions`:

```diff
@@ -83,7 +83,24 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
        numPartitionsSet.headOption
      }

-     val targetNumPartitions = requiredNumPartitions.getOrElse(childrenNumPartitions.max)
+     // If there are non-shuffle children that satisfy the required distribution, we have
+     // some tradeoffs when picking the expected number of shuffle partitions:
+     // 1. We should avoid shuffling these children.
+     // 2. We should have a reasonable parallelism.
+     val nonShuffleChildrenNumPartitions =
+       childrenIndexes.map(children).filterNot(_.isInstanceOf[ShuffleExchangeExec])
+         .map(_.outputPartitioning.numPartitions)
+     val expectedChildrenNumPartitions = if (nonShuffleChildrenNumPartitions.nonEmpty) {
+       // Here we pick the max number of partitions among these non-shuffle children as the
+       // expected number of shuffle partitions. However, if it's smaller than
+       // `conf.numShufflePartitions`, we pick `conf.numShufflePartitions` as the
+       // expected number of shuffle partitions.
+       math.max(nonShuffleChildrenNumPartitions.max, conf.numShufflePartitions)
+     } else {
+       childrenNumPartitions.max
+     }
+
+     val targetNumPartitions = requiredNumPartitions.getOrElse(expectedChildrenNumPartitions)

      children = children.zip(requiredChildDistributions).zipWithIndex.map {
        case ((child, distribution), index) if childrenIndexes.contains(index) =>
```
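For intuition, a minimal standalone sketch of the selection logic above (the `expectedNumPartitions` helper is hypothetical; the real code operates on `SparkPlan` children inside the rule):

```scala
// Simplified model of the partition-count choice in the rule above.
// `nonShuffleCounts` stands in for outputPartitioning.numPartitions of the
// children that are not ShuffleExchangeExec (for example, bucketed file scans).
def expectedNumPartitions(
    nonShuffleCounts: Seq[Int],
    allChildrenCounts: Seq[Int],
    numShufflePartitions: Int): Int = {
  if (nonShuffleCounts.nonEmpty) {
    // Reuse the existing (e.g. bucketed) partitioning when possible, but never
    // fall below the configured parallelism.
    math.max(nonShuffleCounts.max, numShufflePartitions)
  } else {
    // Every child is a shuffle anyway, so just align on the largest count.
    allChildrenCounts.max
  }
}

// The reproduction above: a 500-bucket scan joined to a plain Range with
// spark.sql.shuffle.partitions = 500 targets max(500, 500) = 500 partitions,
// so the bucketed side is left alone and only the Range side is exchanged.
assert(expectedNumPartitions(Seq(500), Seq(500), 500) == 500)
```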
In `ReduceNumShufflePartitionsSuite`, the suite's SparkConf now also pins `spark.sql.shuffle.partitions`, and a comment typo (`ppst` to `post`) is fixed:

```diff
@@ -274,6 +274,7 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAll {
      .setMaster("local[*]")
      .setAppName("test")
      .set(UI_ENABLED, false)
+     .set(SQLConf.SHUFFLE_PARTITIONS.key, "5")
      .set(SQLConf.SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS.key, "5")
      .set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true")
      .set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "-1")
@@ -507,7 +508,7 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAll {
        join,
        expectedAnswer.collect())

-     // Then, let's make sure we do not reduce number of ppst shuffle partitions.
+     // Then, let's make sure we do not reduce number of post shuffle partitions.
      val finalPlan = join.queryExecution.executedPlan
        .asInstanceOf[AdaptiveSparkPlanExec].executedPlan
      val shuffleReaders = finalPlan.collect {
```
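Our reading (not spelled out in the commit message): `SHUFFLE_PARTITIONS` now has to be pinned because the updated `EnsureRequirements` can use `conf.numShufflePartitions` as a lower bound on the target partition count, so leaving it at the default of 200 could change the plan shapes this suite asserts on. Roughly:

```scala
// Hedged illustration of why the suite pins spark.sql.shuffle.partitions to 5.
// Given a 6-partition non-shuffle child, the new rule would target:
val pinned  = math.max(6, 5)   // = 6: the existing partitioning is reused
val default = math.max(6, 200) // = 200: the non-shuffle child would be re-shuffled
```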
In `BucketedReadSuite`, the join-operator check now unwraps `AdaptiveSparkPlanExec` when adaptive execution is enabled, and a regression test for this change is added:

```diff
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.execution.{DataSourceScanExec, SortExec}
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
 import org.apache.spark.sql.execution.datasources.BucketingUtils
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 import org.apache.spark.sql.execution.joins.SortMergeJoinExec
@@ -382,8 +383,16 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
        joined.sort("bucketed_table1.k", "bucketed_table2.k"),
        df1.join(df2, joinCondition(df1, df2), joinType).sort("df1.k", "df2.k"))

-     assert(joined.queryExecution.executedPlan.isInstanceOf[SortMergeJoinExec])
-     val joinOperator = joined.queryExecution.executedPlan.asInstanceOf[SortMergeJoinExec]
+     val joinOperator = if (joined.sqlContext.conf.adaptiveExecutionEnabled) {
+       val executedPlan =
+         joined.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
+       assert(executedPlan.isInstanceOf[SortMergeJoinExec])
+       executedPlan.asInstanceOf[SortMergeJoinExec]
+     } else {
+       val executedPlan = joined.queryExecution.executedPlan
+       assert(executedPlan.isInstanceOf[SortMergeJoinExec])
+       executedPlan.asInstanceOf[SortMergeJoinExec]
+     }

      // check existence of shuffle
      assert(
@@ -795,4 +804,22 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
    }
  }

+  test("SPARK-29655 Read bucketed tables obeys spark.sql.shuffle.partitions") {
+    withSQLConf(
+      SQLConf.SHUFFLE_PARTITIONS.key -> "5",
+      SQLConf.SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS.key -> "7") {
+      val bucketSpec = Some(BucketSpec(6, Seq("i", "j"), Nil))
+      Seq(false, true).foreach { enableAdaptive =>
+        withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> s"$enableAdaptive") {
+          val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpec, expectedShuffle = false)
+          val bucketedTableTestSpecRight = BucketedTableTestSpec(None, expectedShuffle = true)
+          testBucketing(
+            bucketedTableTestSpecLeft = bucketedTableTestSpecLeft,
+            bucketedTableTestSpecRight = bucketedTableTestSpecRight,
+            joinCondition = joinCondition(Seq("i", "j"))
+          )
+        }
+      }
+    }
+  }
 }
```
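The numbers in the new test line up with the rule change (a worked check under our reading): the left table has 6 buckets, `spark.sql.shuffle.partitions` is 5, and `maxNumPostShufflePartitions` is 7, so the target partition count is `max(6, 5) = 6` whether or not adaptive execution is enabled. Hence `expectedShuffle = false` for the bucketed left side and `expectedShuffle = true` for the non-bucketed right side, which must be exchanged into 6 partitions.

```scala
// Worked example for the test above (values from the test's configuration).
val bucketCount = 6        // BucketSpec(6, Seq("i", "j"), Nil)
val shufflePartitions = 5  // SQLConf.SHUFFLE_PARTITIONS
val target = math.max(bucketCount, shufflePartitions)
assert(target == 6) // bucketed side keeps its 6 buckets; right side shuffles to 6
```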