From 4f10e54ba385daa37598efa49dbfb536a7726dbc Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Fri, 15 Nov 2019 15:49:24 +0800
Subject: [PATCH] [SPARK-29655][SQL] Read bucketed tables obeys spark.sql.shuffle.partitions

### What changes were proposed in this pull request?

To avoid frequently changing the value of `spark.sql.adaptive.shuffle.maxNumPostShufflePartitions`, we usually set it much larger than `spark.sql.shuffle.partitions` after enabling adaptive execution. This makes some bucket map joins lose their effectiveness and adds more `ShuffleExchange` nodes.

How to reproduce:
```scala
val bucketedTableName = "bucketed_table"
spark.range(10000).write.bucketBy(500, "id").sortBy("id").mode(org.apache.spark.sql.SaveMode.Overwrite).saveAsTable(bucketedTableName)
val bucketedTable = spark.table(bucketedTableName)
val df = spark.range(8)

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
// Spark 2.4: spark.sql.adaptive.enabled=false
// We set spark.sql.shuffle.partitions <= 500 every time based on our data in this case.
spark.conf.set("spark.sql.shuffle.partitions", 500)
bucketedTable.join(df, "id").explain()

// Since 3.0: we enable adaptive execution and set
// spark.sql.adaptive.shuffle.maxNumPostShufflePartitions to a larger value to fit more cases.
spark.conf.set("spark.sql.adaptive.enabled", true)
spark.conf.set("spark.sql.adaptive.shuffle.maxNumPostShufflePartitions", 1000)
bucketedTable.join(df, "id").explain()
```

```
scala> bucketedTable.join(df, "id").explain()
== Physical Plan ==
*(4) Project [id#5L]
+- *(4) SortMergeJoin [id#5L], [id#7L], Inner
   :- *(1) Sort [id#5L ASC NULLS FIRST], false, 0
   :  +- *(1) Project [id#5L]
   :     +- *(1) Filter isnotnull(id#5L)
   :        +- *(1) ColumnarToRow
   :           +- FileScan parquet default.bucketed_table[id#5L] Batched: true, DataFilters: [isnotnull(id#5L)], Format: Parquet, Location: InMemoryFileIndex[file:/root/opensource/apache-spark/spark-3.0.0-SNAPSHOT-bin-3.2.0/spark-warehou..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 500 out of 500
   +- *(3) Sort [id#7L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(id#7L, 500), true, [id=#49]
         +- *(2) Range (0, 8, step=1, splits=16)
```

vs

```
scala> bucketedTable.join(df, "id").explain()
== Physical Plan ==
AdaptiveSparkPlan(isFinalPlan=false)
+- Project [id#5L]
   +- SortMergeJoin [id#5L], [id#7L], Inner
      :- Sort [id#5L ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(id#5L, 1000), true, [id=#93]
      :     +- Project [id#5L]
      :        +- Filter isnotnull(id#5L)
      :           +- FileScan parquet default.bucketed_table[id#5L] Batched: true, DataFilters: [isnotnull(id#5L)], Format: Parquet, Location: InMemoryFileIndex[file:/root/opensource/apache-spark/spark-3.0.0-SNAPSHOT-bin-3.2.0/spark-warehou..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 500 out of 500
      +- Sort [id#7L ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(id#7L, 1000), true, [id=#92]
            +- Range (0, 8, step=1, splits=16)
```

This PR makes reading bucketed tables always obey `spark.sql.shuffle.partitions`, even when adaptive execution is enabled and `spark.sql.adaptive.shuffle.maxNumPostShufflePartitions` is set, so that no extra `ShuffleExchange` is added. A minimal sketch of the new selection logic follows.
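To make the tradeoff concrete, here is a small standalone sketch of the partition-count selection this patch adds to `EnsureRequirements`. The object and function names are hypothetical and the signature is simplified to plain partition counts; the real rule operates on `SparkPlan` children:

```scala
// Hypothetical standalone model of the new selection logic (illustrative only;
// the actual rule inspects SparkPlan children, not bare Ints).
object ShufflePartitionPicker {
  // nonShufflePartitionCounts: output partition counts of children that are NOT
  //   ShuffleExchangeExec (e.g. a bucketed table scan).
  // allPartitionCounts: output partition counts of all children.
  // numShufflePartitions: the value of spark.sql.shuffle.partitions.
  def pick(
      nonShufflePartitionCounts: Seq[Int],
      allPartitionCounts: Seq[Int],
      numShufflePartitions: Int): Int = {
    if (nonShufflePartitionCounts.nonEmpty) {
      // Avoid re-shuffling the non-shuffle children, but keep parallelism at
      // least spark.sql.shuffle.partitions.
      math.max(nonShufflePartitionCounts.max, numShufflePartitions)
    } else {
      // No non-shuffle children: fall back to the old behavior.
      allPartitionCounts.max
    }
  }

  def main(args: Array[String]): Unit = {
    // The reproduce case above: a 500-bucket scan joined with a shuffled Range.
    // With spark.sql.shuffle.partitions = 500, the target is 500, so only the
    // small side is shuffled and the bucketed side is reused as-is.
    assert(pick(Seq(500), Seq(500, 1000), 500) == 500)
    // Without any non-shuffle child, the maximum over all children still wins.
    assert(pick(Seq.empty, Seq(1000, 1000), 500) == 1000)
  }
}
```

Taking `math.max` keeps the bucketed side unshuffled while still guaranteeing at least `spark.sql.shuffle.partitions` parallelism, which is exactly the tradeoff described in the code comments in the diff below.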
### Why are the changes needed?

To avoid degrading performance after enabling adaptive execution.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Unit test (`BucketedReadSuite` in the diff below). The fix can also be checked interactively, as sketched after this section.
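For an interactive sanity check, the reproduce session above can be rerun against a build that includes this patch. This is a hypothetical session sketch, not captured output; the expected partition count follows from the selection logic above:

```scala
// Hypothetical interactive check, reusing bucketedTable and df from the
// reproduce snippet above.
spark.conf.set("spark.sql.adaptive.enabled", true)
spark.conf.set("spark.sql.adaptive.shuffle.maxNumPostShufflePartitions", 1000)
spark.conf.set("spark.sql.shuffle.partitions", 500)
bucketedTable.join(df, "id").explain()
// With this patch, the Range side should be shuffled into 500 partitions
// (matching the 500 buckets), and no Exchange should appear above the
// bucketed FileScan.
```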
Closes #26409 from wangyum/SPARK-29655.

Authored-by: Yuming Wang
Signed-off-by: Wenchen Fan
---
 .../exchange/EnsureRequirements.scala         | 19 +++++++++++-
 .../ReduceNumShufflePartitionsSuite.scala     |  3 +-
 .../spark/sql/sources/BucketedReadSuite.scala | 31 +++++++++++++++++--
 3 files changed, 49 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
index c56a5c015f..866b382a1d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
@@ -83,7 +83,24 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
       numPartitionsSet.headOption
     }
 
-    val targetNumPartitions = requiredNumPartitions.getOrElse(childrenNumPartitions.max)
+    // If there are non-shuffle children that satisfy the required distribution, we have
+    // some tradeoffs when picking the expected number of shuffle partitions:
+    // 1. We should avoid shuffling these children.
+    // 2. We should have a reasonable parallelism.
+    val nonShuffleChildrenNumPartitions =
+      childrenIndexes.map(children).filterNot(_.isInstanceOf[ShuffleExchangeExec])
+        .map(_.outputPartitioning.numPartitions)
+    val expectedChildrenNumPartitions = if (nonShuffleChildrenNumPartitions.nonEmpty) {
+      // Here we pick the max number of partitions among these non-shuffle children as the
+      // expected number of shuffle partitions. However, if it's smaller than
+      // `conf.numShufflePartitions`, we pick `conf.numShufflePartitions` as the
+      // expected number of shuffle partitions.
+      math.max(nonShuffleChildrenNumPartitions.max, conf.numShufflePartitions)
+    } else {
+      childrenNumPartitions.max
+    }
+
+    val targetNumPartitions = requiredNumPartitions.getOrElse(expectedChildrenNumPartitions)
 
     children = children.zip(requiredChildDistributions).zipWithIndex.map {
       case ((child, distribution), index) if childrenIndexes.contains(index) =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ReduceNumShufflePartitionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ReduceNumShufflePartitionsSuite.scala
index 4d408cd8eb..21ec1ac9bd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ReduceNumShufflePartitionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ReduceNumShufflePartitionsSuite.scala
@@ -274,6 +274,7 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterA
       .setMaster("local[*]")
       .setAppName("test")
       .set(UI_ENABLED, false)
+      .set(SQLConf.SHUFFLE_PARTITIONS.key, "5")
       .set(SQLConf.SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS.key, "5")
       .set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true")
       .set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "-1")
@@ -507,7 +508,7 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterA
         join,
         expectedAnswer.collect())
 
-      // Then, let's make sure we do not reduce number of ppst shuffle partitions.
+      // Then, let's make sure we do not reduce number of post shuffle partitions.
       val finalPlan = join.queryExecution.executedPlan
         .asInstanceOf[AdaptiveSparkPlanExec].executedPlan
       val shuffleReaders = finalPlan.collect {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index 7043b6d396..a585f215ad 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.execution.{DataSourceScanExec, SortExec}
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
 import org.apache.spark.sql.execution.datasources.BucketingUtils
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 import org.apache.spark.sql.execution.joins.SortMergeJoinExec
@@ -382,8 +383,16 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
       joined.sort("bucketed_table1.k", "bucketed_table2.k"),
       df1.join(df2, joinCondition(df1, df2), joinType).sort("df1.k", "df2.k"))
 
-    assert(joined.queryExecution.executedPlan.isInstanceOf[SortMergeJoinExec])
-    val joinOperator = joined.queryExecution.executedPlan.asInstanceOf[SortMergeJoinExec]
+    val joinOperator = if (joined.sqlContext.conf.adaptiveExecutionEnabled) {
+      val executedPlan =
+        joined.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
+      assert(executedPlan.isInstanceOf[SortMergeJoinExec])
+      executedPlan.asInstanceOf[SortMergeJoinExec]
+    } else {
+      val executedPlan = joined.queryExecution.executedPlan
+      assert(executedPlan.isInstanceOf[SortMergeJoinExec])
+      executedPlan.asInstanceOf[SortMergeJoinExec]
+    }
 
     // check existence of shuffle
     assert(
@@ -795,4 +804,22 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
     }
   }
 
+  test("SPARK-29655 Read bucketed tables obeys spark.sql.shuffle.partitions") {
+    withSQLConf(
+      SQLConf.SHUFFLE_PARTITIONS.key -> "5",
+      SQLConf.SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS.key -> "7") {
+      val bucketSpec = Some(BucketSpec(6, Seq("i", "j"), Nil))
+      Seq(false, true).foreach { enableAdaptive =>
+        withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> s"$enableAdaptive") {
+          val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpec, expectedShuffle = false)
+          val bucketedTableTestSpecRight = BucketedTableTestSpec(None, expectedShuffle = true)
+          testBucketing(
+            bucketedTableTestSpecLeft = bucketedTableTestSpecLeft,
+            bucketedTableTestSpecRight = bucketedTableTestSpecRight,
+            joinCondition = joinCondition(Seq("i", "j"))
+          )
+        }
+      }
+    }
+  }
 }