From d46c1e38eccae67bc7ccaa920ceb3abdd8866d10 Mon Sep 17 00:00:00 2001
From: ulysses-you
Date: Wed, 30 Jun 2021 18:04:50 +0000
Subject: [PATCH] [SPARK-35725][SQL] Support optimize skewed partitions in
 RebalancePartitions

### What changes were proposed in this pull request?

* Add a new rule `OptimizeSkewInRebalancePartitions` to the AQE `queryStageOptimizerRules`
* Add a new config `spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled` to decide whether the new rule should be enabled

The new rule `OptimizeSkewInRebalancePartitions` only handles the two shuffle origins `REBALANCE_PARTITIONS_BY_NONE` and `REBALANCE_PARTITIONS_BY_COL`, to address data skew. It reuses the existing config `ADVISORY_PARTITION_SIZE_IN_BYTES` to decide the target partition size.

### Why are the changes needed?

Currently, we don't support expanding partitions dynamically in AQE, which is not friendly to jobs with data skew. Let's say we have a simple query:
```
SELECT /*+ REBALANCE(col) */ * FROM table
```
If the column `col` is skewed, some shuffle partitions will handle much more data than others. Since no extra shuffle is introduced, we can optimize this case by expanding the skewed partitions in AQE.

### Does this PR introduce _any_ user-facing change?

Yes, a new config.

### How was this patch tested?

Add test.

Closes #32883 from ulysses-you/expand-partition.

Authored-by: ulysses-you
Signed-off-by: Wenchen Fan
---
 .../apache/spark/sql/internal/SQLConf.scala   |  10 ++
 .../adaptive/AdaptiveSparkPlanExec.scala      |   1 +
 .../OptimizeSkewInRebalancePartitions.scala   | 101 ++++++++++++++++++
 .../adaptive/OptimizeSkewedJoin.scala         |  40 +------
 .../adaptive/ShufflePartitionsUtil.scala      |  39 ++++++-
 .../adaptive/AdaptiveQueryExecSuite.scala     |  43 +++++++-
 6 files changed, 193 insertions(+), 41 deletions(-)
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewInRebalancePartitions.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 7926120abc..286fd16dce 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -642,6 +642,16 @@ object SQLConf {
       .bytesConf(ByteUnit.BYTE)
       .createWithDefault(0L)
 
+  val ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED =
+    buildConf("spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled")
+      .doc(s"When true and '${ADAPTIVE_EXECUTION_ENABLED.key}' is true, Spark will optimize the " +
+        "skewed shuffle partitions in RebalancePartitions and split them to smaller ones " +
+        s"according to the target size (specified by '${ADVISORY_PARTITION_SIZE_IN_BYTES.key}'), " +
+        "to avoid data skew.")
+      .version("3.2.0")
+      .booleanConf
+      .createWithDefault(true)
+
   val SUBEXPRESSION_ELIMINATION_ENABLED =
     buildConf("spark.sql.subexpressionElimination.enabled")
       .internal()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index 5840141f4f..8b1295c333 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -98,6 +98,7 @@ case class AdaptiveSparkPlanExec(
     ReuseAdaptiveSubquery(context.subqueryCache),
     // Skew join does not handle `CustomShuffleReader` so needs to be applied first.
     OptimizeSkewedJoin,
+    OptimizeSkewInRebalancePartitions,
     CoalesceShufflePartitions(context.session),
     // `OptimizeLocalShuffleReader` needs to make use of 'CustomShuffleReaderExec.partitionSpecs'
     // added by `CoalesceShufflePartitions`, and must be executed after it.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewInRebalancePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewInRebalancePartitions.scala
new file mode 100644
index 0000000000..d8a198f2c3
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewInRebalancePartitions.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import org.apache.spark.sql.execution.{CoalescedPartitionSpec, ShufflePartitionSpec, SparkPlan}
+import org.apache.spark.sql.execution.exchange.{REBALANCE_PARTITIONS_BY_COL, REBALANCE_PARTITIONS_BY_NONE, ShuffleOrigin}
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * A rule to optimize the skewed shuffle partitions in [[RebalancePartitions]] based on the map
+ * output statistics, which can avoid the data skew that hurts performance.
+ *
+ * We use ADVISORY_PARTITION_SIZE_IN_BYTES to decide whether a partition should be optimized.
+ * Let's say we have 3 maps with 3 shuffle partitions, and assume r1 has a data skew issue.
+ * The map side looks like:
+ *   m0:[b0, b1, b2], m1:[b0, b1, b2], m2:[b0, b1, b2]
+ * and the reduce side looks like:
+ *                            (without this rule) r1[m0-b1, m1-b1, m2-b1]
+ *                              /                                     \
+ *   r0:[m0-b0, m1-b0, m2-b0], r1-0:[m0-b1], r1-1:[m1-b1], r1-2:[m2-b1], r2[m0-b2, m1-b2, m2-b2]
+ *
+ * Note that this rule is only applied to a SparkPlan whose top-level node is a
+ * ShuffleQueryStageExec.
+ */
+object OptimizeSkewInRebalancePartitions extends CustomShuffleReaderRule {
+  override def supportedShuffleOrigins: Seq[ShuffleOrigin] =
+    Seq(REBALANCE_PARTITIONS_BY_NONE, REBALANCE_PARTITIONS_BY_COL)
+
+  /**
+   * Splits the skewed partition based on the map size and the target partition size
+   * after split. Creates a list of `PartialReducerPartitionSpec` for each skewed partition
+   * and a `CoalescedPartitionSpec` for each normal partition.
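+   *
+   * For illustration only (these figures are hypothetical, not from this patch): with a target
+   * size of 64MB and reduce-side partition sizes [10MB, 200MB, 20MB], partition 1 would be
+   * split into several `PartialReducerPartitionSpec`s, while partitions 0 and 2 each become a
+   * single `CoalescedPartitionSpec`.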
+   */
+  private def optimizeSkewedPartitions(
+      shuffleId: Int,
+      bytesByPartitionId: Array[Long],
+      targetSize: Long): Seq[ShufflePartitionSpec] = {
+    bytesByPartitionId.indices.flatMap { reduceIndex =>
+      val bytes = bytesByPartitionId(reduceIndex)
+      if (bytes > targetSize) {
+        val newPartitionSpec =
+          ShufflePartitionsUtil.createSkewPartitionSpecs(shuffleId, reduceIndex, targetSize)
+        if (newPartitionSpec.isEmpty) {
+          CoalescedPartitionSpec(reduceIndex, reduceIndex + 1, bytes) :: Nil
+        } else {
+          logDebug(s"For shuffle $shuffleId, partition $reduceIndex is skewed, " +
+            s"split it into ${newPartitionSpec.get.size} parts.")
+          newPartitionSpec.get
+        }
+      } else {
+        CoalescedPartitionSpec(reduceIndex, reduceIndex + 1, bytes) :: Nil
+      }
+    }
+  }
+
+  private def tryOptimizeSkewedPartitions(shuffle: ShuffleQueryStageExec): SparkPlan = {
+    val advisorySize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES)
+    val mapStats = shuffle.mapStats
+    if (mapStats.isEmpty ||
+      mapStats.get.bytesByPartitionId.forall(_ <= advisorySize)) {
+      return shuffle
+    }
+
+    val newPartitionsSpec = optimizeSkewedPartitions(
+      mapStats.get.shuffleId, mapStats.get.bytesByPartitionId, advisorySize)
+    // return the original plan if no partition could actually be split
+    if (newPartitionsSpec.length == mapStats.get.bytesByPartitionId.length) {
+      shuffle
+    } else {
+      CustomShuffleReaderExec(shuffle, newPartitionsSpec)
+    }
+  }
+
+  override def apply(plan: SparkPlan): SparkPlan = {
+    if (!conf.getConf(SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED)) {
+      return plan
+    }
+
+    plan match {
+      case shuffle: ShuffleQueryStageExec
+          if supportedShuffleOrigins.contains(shuffle.shuffle.shuffleOrigin) =>
+        tryOptimizeSkewedPartitions(shuffle)
+      case _ => plan
+    }
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
index 0a1a677843..a284016bfb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
@@ -21,7 +21,6 @@ import scala.collection.mutable
 
 import org.apache.commons.io.FileUtils
 
-import org.apache.spark.{MapOutputTrackerMaster, SparkEnv}
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, EnsureRequirements, ShuffleExchangeExec, ShuffleOrigin}
@@ -90,41 +89,6 @@ object OptimizeSkewedJoin extends CustomShuffleReaderRule {
     }
   }
 
-  /**
-   * Get the map size of the specific reduce shuffle Id.
-   */
-  private def getMapSizesForReduceId(shuffleId: Int, partitionId: Int): Array[Long] = {
-    val mapOutputTracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
-    mapOutputTracker.shuffleStatuses(shuffleId).mapStatuses.map{_.getSizeForBlock(partitionId)}
-  }
-
-  /**
-   * Splits the skewed partition based on the map size and the target partition size
-   * after split, and create a list of `PartialMapperPartitionSpec`. Returns None if can't split.
-   */
-  private def createSkewPartitionSpecs(
-      shuffleId: Int,
-      reducerId: Int,
-      targetSize: Long): Option[Seq[PartialReducerPartitionSpec]] = {
-    val mapPartitionSizes = getMapSizesForReduceId(shuffleId, reducerId)
-    val mapStartIndices = ShufflePartitionsUtil.splitSizeListByTargetSize(
-      mapPartitionSizes, targetSize)
-    if (mapStartIndices.length > 1) {
-      Some(mapStartIndices.indices.map { i =>
-        val startMapIndex = mapStartIndices(i)
-        val endMapIndex = if (i == mapStartIndices.length - 1) {
-          mapPartitionSizes.length
-        } else {
-          mapStartIndices(i + 1)
-        }
-        val dataSize = startMapIndex.until(endMapIndex).map(mapPartitionSizes(_)).sum
-        PartialReducerPartitionSpec(reducerId, startMapIndex, endMapIndex, dataSize)
-      })
-    } else {
-      None
-    }
-  }
-
   private def canSplitLeftSide(joinType: JoinType) = {
     joinType == Inner || joinType == Cross || joinType == LeftSemi ||
       joinType == LeftAnti || joinType == LeftOuter
@@ -195,7 +159,7 @@ object OptimizeSkewedJoin extends CustomShuffleReaderRule {
           Seq(CoalescedPartitionSpec(partitionIndex, partitionIndex + 1, rightSize))
 
         val leftParts = if (isLeftSkew) {
-          val skewSpecs = createSkewPartitionSpecs(
+          val skewSpecs = ShufflePartitionsUtil.createSkewPartitionSpecs(
             left.mapStats.get.shuffleId, partitionIndex, leftTargetSize)
           if (skewSpecs.isDefined) {
             logDebug(s"Left side partition $partitionIndex " +
@@ -209,7 +173,7 @@ object OptimizeSkewedJoin extends CustomShuffleReaderRule {
         }
 
         val rightParts = if (isRightSkew) {
-          val skewSpecs = createSkewPartitionSpecs(
+          val skewSpecs = ShufflePartitionsUtil.createSkewPartitionSpecs(
             right.mapStats.get.shuffleId, partitionIndex, rightTargetSize)
           if (skewSpecs.isDefined) {
             logDebug(s"Right side partition $partitionIndex " +
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
index 6106a562e0..a1f2d91801 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.adaptive
 
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.spark.MapOutputStatistics
+import org.apache.spark.{MapOutputStatistics, MapOutputTrackerMaster, SparkEnv}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.execution.{CoalescedPartitionSpec, PartialReducerPartitionSpec, ShufflePartitionSpec}
 
@@ -268,7 +268,8 @@ object ShufflePartitionsUtil extends Logging {
    * so that the size sum of each partition is close to the target size. Each index indicates the
    * start of a partition.
    */
-  def splitSizeListByTargetSize(sizes: Seq[Long], targetSize: Long): Array[Int] = {
+  // Visible for testing
+  private[sql] def splitSizeListByTargetSize(sizes: Seq[Long], targetSize: Long): Array[Int] = {
     val partitionStartIndices = ArrayBuffer[Int]()
     partitionStartIndices += 0
     var i = 0
@@ -308,4 +309,38 @@ object ShufflePartitionsUtil extends Logging {
     tryMergePartitions()
     partitionStartIndices.toArray
   }
+
+  /**
+   * Get the map size of the specific reduce shuffle Id.
+   */
+  private def getMapSizesForReduceId(shuffleId: Int, partitionId: Int): Array[Long] = {
+    val mapOutputTracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
+    mapOutputTracker.shuffleStatuses(shuffleId).mapStatuses.map{_.getSizeForBlock(partitionId)}
+  }
+
+  /**
+   * Splits the skewed partition based on the map size and the target partition size after
+   * split, and creates a list of `PartialReducerPartitionSpec`. Returns None if it can't split.
+   */
+  def createSkewPartitionSpecs(
+      shuffleId: Int,
+      reducerId: Int,
+      targetSize: Long): Option[Seq[PartialReducerPartitionSpec]] = {
+    val mapPartitionSizes = getMapSizesForReduceId(shuffleId, reducerId)
+    val mapStartIndices = splitSizeListByTargetSize(mapPartitionSizes, targetSize)
+    if (mapStartIndices.length > 1) {
+      Some(mapStartIndices.indices.map { i =>
+        val startMapIndex = mapStartIndices(i)
+        val endMapIndex = if (i == mapStartIndices.length - 1) {
+          mapPartitionSizes.length
+        } else {
+          mapStartIndices(i + 1)
+        }
+        val dataSize = startMapIndex.until(endMapIndex).map(mapPartitionSizes(_)).sum
+        PartialReducerPartitionSpec(reducerId, startMapIndex, endMapIndex, dataSize)
+      })
+    } else {
+      None
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 16cc4a79e6..e1fa855a7e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -1806,7 +1806,7 @@ class AdaptiveQueryExecSuite
   }
 
   test("SPARK-35650: Use local shuffle reader if can not coalesce number of partitions") {
-    withSQLConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "2") {
+    withSQLConf(SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "false") {
       val query = "SELECT /*+ REPARTITION */ * FROM testData"
       val (_, adaptivePlan) = runAdaptiveAndVerifyResult(query)
       collect(adaptivePlan) {
@@ -1820,4 +1820,45 @@ class AdaptiveQueryExecSuite
       }
     }
   }
+
+  test("SPARK-35725: Support optimize skewed partitions in RebalancePartitions") {
+    withTempView("v") {
+      withSQLConf(
+        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+        SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true",
+        SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED.key -> "true",
+        SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+        SQLConf.SHUFFLE_PARTITIONS.key -> "5",
+        SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1") {
+
+        spark.sparkContext.parallelize(
+          (1 to 10).map(i => TestData(if (i > 4) 5 else i, i.toString)), 3)
+          .toDF("c1", "c2").createOrReplaceTempView("v")
+
+        def checkPartitionNumber(
+            query: String, skewedPartitionNumber: Int, totalNumber: Int): Unit = {
+          val (_, adaptive) = runAdaptiveAndVerifyResult(query)
+          val reader = collect(adaptive) {
+            case reader: CustomShuffleReaderExec => reader
+          }
+          assert(reader.size == 1)
+          assert(reader.head.partitionSpecs.count(_.isInstanceOf[PartialReducerPartitionSpec]) ==
+            skewedPartitionNumber)
+          assert(reader.head.partitionSpecs.size == totalNumber)
+        }
+
+        withSQLConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "150") {
+          // partition size [0,258,72,72,72]
+          checkPartitionNumber("SELECT /*+ REBALANCE(c1) */ * FROM v", 2, 4)
+          // partition size [72,216,216,144,72]
+          checkPartitionNumber("SELECT /*+ REBALANCE */ * FROM v", 4, 7)
+        }
+
+        // no skewed partition should be optimized
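+        // (the 10000-byte advisory size is well above every partition size in this test data, so
+        // the rule keeps the shuffle as-is and coalescing merges everything into one partition)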
+        withSQLConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "10000") {
+          checkPartitionNumber("SELECT /*+ REBALANCE(c1) */ * FROM v", 0, 1)
+        }
+      }
+    }
+  }
 }
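
A minimal usage sketch of the behavior this patch enables (illustrative only, not part of the patch; it assumes an existing `SparkSession` named `spark`, and the table name `skewed_table` and output path are placeholders):

```scala
// Enable AQE and the new rule (the rule defaults to true), and set the target partition size
// that OptimizeSkewInRebalancePartitions uses when splitting skewed shuffle partitions.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB")

// REBALANCE(col) adds a shuffle with the REBALANCE_PARTITIONS_BY_COL origin. If any of its
// output partitions exceeds the advisory size, AQE now splits it into smaller ones according
// to that target size.
spark.sql("SELECT /*+ REBALANCE(col) */ * FROM skewed_table")
  .write.mode("overwrite").parquet("/tmp/rebalanced_output")
```

In the adaptive plan, the split shows up as a `CustomShuffleReaderExec` whose `partitionSpecs` contain `PartialReducerPartitionSpec` entries for the skewed partitions, which is what the new test in `AdaptiveQueryExecSuite` asserts.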