[SPARK-35725][SQL] Support optimize skewed partitions in RebalancePartitions

### What changes were proposed in this pull request?

* Add a new rule `OptimizeSkewInRebalancePartitions` to AQE's `queryStageOptimizerRules`
* Add a new config `spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled` to decide whether the new rule should be enabled

The new rule `OptimizeSkewInRebalancePartitions` only handles the two shuffle origins `REBALANCE_PARTITIONS_BY_NONE` and `REBALANCE_PARTITIONS_BY_COL`, which are the ones affected by the data skew issue, and it reuses the existing config `ADVISORY_PARTITION_SIZE_IN_BYTES` to decide the target partition size.
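
As a rough illustration, here is a minimal sketch of how the feature would typically be exercised (local mode, synthetic skewed data; the session setup, table name and column names are made up for the example, while the config keys are the ones referenced above):

```
import org.apache.spark.sql.SparkSession

// Minimal sketch, assuming a local session and synthetic skewed data; only the
// config keys come from this PR, everything else is illustrative.
val spark = SparkSession.builder().master("local[*]").appName("rebalance-skew").getOrCreate()
import spark.implicits._

spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled", "true")
// Target size used to decide whether (and how finely) a shuffle partition is split.
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB")

// Skewed data: most rows share the same key, so one rebalance shuffle partition
// would otherwise receive most of the data.
(1 to 1000000).map(i => (if (i > 10) 0 else i, i.toString)).toDF("c1", "c2")
  .createOrReplaceTempView("t")

// With the rule enabled, the oversized partition produced by REBALANCE(c1) is
// split into smaller pieces at runtime instead of being handled by a single task.
spark.sql("SELECT /*+ REBALANCE(c1) */ * FROM t").write.format("noop").mode("overwrite").save()
```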

### Why are the changes needed?

Currently, AQE does not support expanding (splitting) partitions dynamically, which is unfriendly to jobs with data skew.

Let's say we have a simple query:
```
SELECT /*+ REBALANCE(col) */ * FROM table
```

If the column `col` is skewed, some shuffle partitions will handle much more data than others.

Since the rebalance already introduces a shuffle, we can optimize this case in AQE by expanding (splitting) the skewed partitions, without adding an extra shuffle.
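
To make the splitting idea concrete, below is a simplified, standalone sketch of the size-splitting heuristic. It is illustrative only: the real logic used by this PR lives in `ShufflePartitionsUtil.splitSizeListByTargetSize` / `createSkewPartitionSpecs`, which additionally merges small adjacent groups.

```
// Simplified sketch (not Spark's exact algorithm): given the per-map block sizes
// that one skewed reducer would read, greedily group consecutive maps so that each
// group's total size stays close to the target size. Each returned index is the
// first map of a new split (a "partial reducer" partition).
def splitByTargetSize(mapSizes: Seq[Long], targetSize: Long): Seq[Int] = {
  val startIndices = scala.collection.mutable.ArrayBuffer(0)
  var currentSize = 0L
  mapSizes.zipWithIndex.foreach { case (size, i) =>
    if (currentSize > 0 && currentSize + size > targetSize) {
      startIndices += i // start a new split before map i
      currentSize = 0L
    }
    currentSize += size
  }
  startIndices.toList
}

// Example: a single reducer reads these map-side block sizes (in bytes).
val sizes = Seq(100L, 800L, 700L, 50L, 600L)
// With a 1000-byte target, the reducer is split into three partial reducers,
// covering map ranges [0, 2), [2, 4) and [4, 5).
println(splitByTargetSize(sizes, 1000L)) // List(0, 2, 4)
```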

### Does this PR introduce _any_ user-facing change?

Yes, it adds a new config: `spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled`.

### How was this patch tested?

Added a unit test.

Closes #32883 from ulysses-you/expand-partition.

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
ulysses-you 2021-06-30 18:04:50 +00:00 committed by Wenchen Fan
parent 2c94fbc71e
commit d46c1e38ec
6 changed files with 193 additions and 41 deletions

@@ -642,6 +642,16 @@ object SQLConf {
.bytesConf(ByteUnit.BYTE)
.createWithDefault(0L)
val ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED =
buildConf("spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled")
.doc(s"When true and '${ADAPTIVE_EXECUTION_ENABLED.key}' is true, Spark will optimize the " +
"skewed shuffle partitions in RebalancePartitions and split them to smaller ones " +
s"according to the target size (specified by '${ADVISORY_PARTITION_SIZE_IN_BYTES.key}'), " +
"to avoid data skew.")
.version("3.2.0")
.booleanConf
.createWithDefault(true)
val SUBEXPRESSION_ELIMINATION_ENABLED =
buildConf("spark.sql.subexpressionElimination.enabled")
.internal()

@@ -98,6 +98,7 @@ case class AdaptiveSparkPlanExec(
ReuseAdaptiveSubquery(context.subqueryCache),
// Skew join does not handle `CustomShuffleReader` so needs to be applied first.
OptimizeSkewedJoin,
OptimizeSkewInRebalancePartitions,
CoalesceShufflePartitions(context.session),
// `OptimizeLocalShuffleReader` needs to make use of 'CustomShuffleReaderExec.partitionSpecs'
// added by `CoalesceShufflePartitions`, and must be executed after it.

@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.adaptive

import org.apache.spark.sql.execution.{CoalescedPartitionSpec, ShufflePartitionSpec, SparkPlan}
import org.apache.spark.sql.execution.exchange.{REBALANCE_PARTITIONS_BY_COL, REBALANCE_PARTITIONS_BY_NONE, ShuffleOrigin}
import org.apache.spark.sql.internal.SQLConf

/**
 * A rule to optimize the skewed shuffle partitions in [[RebalancePartitions]] based on the map
 * output statistics, which can avoid data skew that hurts performance.
 *
 * We use ADVISORY_PARTITION_SIZE_IN_BYTES to decide if a partition should be optimized.
 * Let's say we have 3 maps with 3 shuffle partitions, and assume r1 has a data skew issue.
 * The map side looks like:
 *   m0:[b0, b1, b2], m1:[b0, b1, b2], m2:[b0, b1, b2]
 * and the reduce side looks like:
 *                            (without this rule) r1[m0-b1, m1-b1, m2-b1]
 *                              /                                     \
 *   r0:[m0-b0, m1-b0, m2-b0], r1-0:[m0-b1], r1-1:[m1-b1], r1-2:[m2-b1], r2[m0-b2, m1-b2, m2-b2]
 *
 * Note that this rule is only applied to a SparkPlan whose top-level node is
 * ShuffleQueryStageExec.
 */
object OptimizeSkewInRebalancePartitions extends CustomShuffleReaderRule {

  override def supportedShuffleOrigins: Seq[ShuffleOrigin] =
    Seq(REBALANCE_PARTITIONS_BY_NONE, REBALANCE_PARTITIONS_BY_COL)

  /**
   * Splits the skewed partition based on the map size and the target partition size
   * after split. Create a list of `PartialMapperPartitionSpec` for skewed partition and
   * create `CoalescedPartition` for normal partition.
   */
  private def optimizeSkewedPartitions(
      shuffleId: Int,
      bytesByPartitionId: Array[Long],
      targetSize: Long): Seq[ShufflePartitionSpec] = {
    bytesByPartitionId.indices.flatMap { reduceIndex =>
      val bytes = bytesByPartitionId(reduceIndex)
      if (bytes > targetSize) {
        val newPartitionSpec =
          ShufflePartitionsUtil.createSkewPartitionSpecs(shuffleId, reduceIndex, targetSize)
        if (newPartitionSpec.isEmpty) {
          CoalescedPartitionSpec(reduceIndex, reduceIndex + 1, bytes) :: Nil
        } else {
          logDebug(s"For shuffle $shuffleId, partition $reduceIndex is skew, " +
            s"split it into ${newPartitionSpec.get.size} parts.")
          newPartitionSpec.get
        }
      } else {
        CoalescedPartitionSpec(reduceIndex, reduceIndex + 1, bytes) :: Nil
      }
    }
  }

  private def tryOptimizeSkewedPartitions(shuffle: ShuffleQueryStageExec): SparkPlan = {
    val advisorySize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES)
    val mapStats = shuffle.mapStats
    if (mapStats.isEmpty ||
        mapStats.get.bytesByPartitionId.forall(_ <= advisorySize)) {
      return shuffle
    }

    val newPartitionsSpec = optimizeSkewedPartitions(
      mapStats.get.shuffleId, mapStats.get.bytesByPartitionId, advisorySize)
    // return origin plan if we can not optimize partitions
    if (newPartitionsSpec.length == mapStats.get.bytesByPartitionId.length) {
      shuffle
    } else {
      CustomShuffleReaderExec(shuffle, newPartitionsSpec)
    }
  }

  override def apply(plan: SparkPlan): SparkPlan = {
    if (!conf.getConf(SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED)) {
      return plan
    }

    plan match {
      case shuffle: ShuffleQueryStageExec
          if supportedShuffleOrigins.contains(shuffle.shuffle.shuffleOrigin) =>
        tryOptimizeSkewedPartitions(shuffle)
      case _ => plan
    }
  }
}

@@ -21,7 +21,6 @@ import scala.collection.mutable
import org.apache.commons.io.FileUtils
import org.apache.spark.{MapOutputTrackerMaster, SparkEnv}
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, EnsureRequirements, ShuffleExchangeExec, ShuffleOrigin}
@@ -90,41 +89,6 @@ object OptimizeSkewedJoin extends CustomShuffleReaderRule {
}
}
/**
* Get the map size of the specific reduce shuffle Id.
*/
private def getMapSizesForReduceId(shuffleId: Int, partitionId: Int): Array[Long] = {
val mapOutputTracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
mapOutputTracker.shuffleStatuses(shuffleId).mapStatuses.map{_.getSizeForBlock(partitionId)}
}
/**
* Splits the skewed partition based on the map size and the target partition size
* after split, and create a list of `PartialMapperPartitionSpec`. Returns None if can't split.
*/
private def createSkewPartitionSpecs(
shuffleId: Int,
reducerId: Int,
targetSize: Long): Option[Seq[PartialReducerPartitionSpec]] = {
val mapPartitionSizes = getMapSizesForReduceId(shuffleId, reducerId)
val mapStartIndices = ShufflePartitionsUtil.splitSizeListByTargetSize(
mapPartitionSizes, targetSize)
if (mapStartIndices.length > 1) {
Some(mapStartIndices.indices.map { i =>
val startMapIndex = mapStartIndices(i)
val endMapIndex = if (i == mapStartIndices.length - 1) {
mapPartitionSizes.length
} else {
mapStartIndices(i + 1)
}
val dataSize = startMapIndex.until(endMapIndex).map(mapPartitionSizes(_)).sum
PartialReducerPartitionSpec(reducerId, startMapIndex, endMapIndex, dataSize)
})
} else {
None
}
}
private def canSplitLeftSide(joinType: JoinType) = {
joinType == Inner || joinType == Cross || joinType == LeftSemi ||
joinType == LeftAnti || joinType == LeftOuter
@@ -195,7 +159,7 @@ object OptimizeSkewedJoin extends CustomShuffleReaderRule {
Seq(CoalescedPartitionSpec(partitionIndex, partitionIndex + 1, rightSize))
val leftParts = if (isLeftSkew) {
val skewSpecs = createSkewPartitionSpecs(
val skewSpecs = ShufflePartitionsUtil.createSkewPartitionSpecs(
left.mapStats.get.shuffleId, partitionIndex, leftTargetSize)
if (skewSpecs.isDefined) {
logDebug(s"Left side partition $partitionIndex " +
@@ -209,7 +173,7 @@ object OptimizeSkewedJoin extends CustomShuffleReaderRule {
}
val rightParts = if (isRightSkew) {
val skewSpecs = createSkewPartitionSpecs(
val skewSpecs = ShufflePartitionsUtil.createSkewPartitionSpecs(
right.mapStats.get.shuffleId, partitionIndex, rightTargetSize)
if (skewSpecs.isDefined) {
logDebug(s"Right side partition $partitionIndex " +

@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.adaptive
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.MapOutputStatistics
import org.apache.spark.{MapOutputStatistics, MapOutputTrackerMaster, SparkEnv}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.{CoalescedPartitionSpec, PartialReducerPartitionSpec, ShufflePartitionSpec}
@@ -268,7 +268,8 @@ object ShufflePartitionsUtil extends Logging {
* so that the size sum of each partition is close to the target size. Each index indicates the
* start of a partition.
*/
def splitSizeListByTargetSize(sizes: Seq[Long], targetSize: Long): Array[Int] = {
// Visible for testing
private[sql] def splitSizeListByTargetSize(sizes: Seq[Long], targetSize: Long): Array[Int] = {
val partitionStartIndices = ArrayBuffer[Int]()
partitionStartIndices += 0
var i = 0
@@ -308,4 +309,38 @@ object ShufflePartitionsUtil extends Logging {
tryMergePartitions()
partitionStartIndices.toArray
}
/**
* Get the map size of the specific reduce shuffle Id.
*/
private def getMapSizesForReduceId(shuffleId: Int, partitionId: Int): Array[Long] = {
val mapOutputTracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
mapOutputTracker.shuffleStatuses(shuffleId).mapStatuses.map{_.getSizeForBlock(partitionId)}
}
/**
* Splits the skewed partition based on the map size and the target partition size
* after split, and create a list of `PartialMapperPartitionSpec`. Returns None if can't split.
*/
def createSkewPartitionSpecs(
shuffleId: Int,
reducerId: Int,
targetSize: Long): Option[Seq[PartialReducerPartitionSpec]] = {
val mapPartitionSizes = getMapSizesForReduceId(shuffleId, reducerId)
val mapStartIndices = splitSizeListByTargetSize(mapPartitionSizes, targetSize)
if (mapStartIndices.length > 1) {
Some(mapStartIndices.indices.map { i =>
val startMapIndex = mapStartIndices(i)
val endMapIndex = if (i == mapStartIndices.length - 1) {
mapPartitionSizes.length
} else {
mapStartIndices(i + 1)
}
val dataSize = startMapIndex.until(endMapIndex).map(mapPartitionSizes(_)).sum
PartialReducerPartitionSpec(reducerId, startMapIndex, endMapIndex, dataSize)
})
} else {
None
}
}
}

@@ -1806,7 +1806,7 @@ class AdaptiveQueryExecSuite
}
test("SPARK-35650: Use local shuffle reader if can not coalesce number of partitions") {
withSQLConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "2") {
withSQLConf(SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "false") {
val query = "SELECT /*+ REPARTITION */ * FROM testData"
val (_, adaptivePlan) = runAdaptiveAndVerifyResult(query)
collect(adaptivePlan) {
@@ -1820,4 +1820,45 @@ class AdaptiveQueryExecSuite
}
}
}
test("SPARK-35725: Support optimize skewed partitions in RebalancePartitions") {
withTempView("v") {
withSQLConf(
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true",
SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED.key -> "true",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
SQLConf.SHUFFLE_PARTITIONS.key -> "5",
SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1") {
spark.sparkContext.parallelize(
(1 to 10).map(i => TestData(if (i > 4) 5 else i, i.toString)), 3)
.toDF("c1", "c2").createOrReplaceTempView("v")
def checkPartitionNumber(
query: String, skewedPartitionNumber: Int, totalNumber: Int): Unit = {
val (_, adaptive) = runAdaptiveAndVerifyResult(query)
val reader = collect(adaptive) {
case reader: CustomShuffleReaderExec => reader
}
assert(reader.size == 1)
assert(reader.head.partitionSpecs.count(_.isInstanceOf[PartialReducerPartitionSpec]) ==
skewedPartitionNumber)
assert(reader.head.partitionSpecs.size == totalNumber)
}
withSQLConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "150") {
// partition size [0,258,72,72,72]
checkPartitionNumber("SELECT /*+ REBALANCE(c1) */ * FROM v", 2, 4)
// partition size [72,216,216,144,72]
checkPartitionNumber("SELECT /*+ REBALANCE */ * FROM v", 4, 7)
}
// no skewed partition should be optimized
withSQLConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "10000") {
checkPartitionNumber("SELECT /*+ REBALANCE(c1) */ * FROM v", 0, 1)
}
}
}
}
}