[SPARK-36430][SQL] Adaptively calculate the target size when coalescing shuffle partitions in AQE

### What changes were proposed in this pull request?

This PR fixes a performance regression introduced in https://github.com/apache/spark/pull/33172

Before #33172 , the target size is adaptively calculated based on the default parallelism of the spark cluster. Sometimes it's very small and #33172 sets a min partition size to fix perf issues. Sometimes the calculated size is reasonable, such as dozens of MBs.

After #33172 , we no longer calculate the target size adaptively, and by default always coalesce the partitions into 1 MB. This can cause perf regression if the adaptively calculated size is reasonable.

This PR brings back the code that adaptively calculate the target size based on the default parallelism of the spark cluster.

### Why are the changes needed?

fix perf regression

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

existing tests

Closes #33655 from cloud-fan/minor.

Lead-authored-by: Wenchen Fan <wenchen@databricks.com>
Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This commit is contained in:
Wenchen Fan 2021-08-09 17:25:55 +08:00
parent f3e079b09b
commit 9a539d5846
5 changed files with 46 additions and 40 deletions

View file

@ -480,6 +480,7 @@ object SQLConf {
.doc("(Deprecated since Spark 3.0)") .doc("(Deprecated since Spark 3.0)")
.version("1.6.0") .version("1.6.0")
.bytesConf(ByteUnit.BYTE) .bytesConf(ByteUnit.BYTE)
.checkValue(_ > 0, "advisoryPartitionSizeInBytes must be positive")
.createWithDefaultString("64MB") .createWithDefaultString("64MB")
val ADAPTIVE_EXECUTION_ENABLED = buildConf("spark.sql.adaptive.enabled") val ADAPTIVE_EXECUTION_ENABLED = buildConf("spark.sql.adaptive.enabled")
@ -526,28 +527,26 @@ object SQLConf {
.booleanConf .booleanConf
.createWithDefault(true) .createWithDefault(true)
private val MIN_PARTITION_SIZE_KEY = "spark.sql.adaptive.coalescePartitions.minPartitionSize"
val COALESCE_PARTITIONS_PARALLELISM_FIRST = val COALESCE_PARTITIONS_PARALLELISM_FIRST =
buildConf("spark.sql.adaptive.coalescePartitions.parallelismFirst") buildConf("spark.sql.adaptive.coalescePartitions.parallelismFirst")
.doc("When true, Spark ignores the target size specified by " + .doc("When true, Spark does not respect the target size specified by " +
s"'${ADVISORY_PARTITION_SIZE_IN_BYTES.key}' (default 64MB) when coalescing contiguous " + s"'${ADVISORY_PARTITION_SIZE_IN_BYTES.key}' (default 64MB) when coalescing contiguous " +
"shuffle partitions, and only respect the minimum partition size specified by " + "shuffle partitions, but adaptively calculate the target size according to the default " +
s"'$MIN_PARTITION_SIZE_KEY' (default 1MB), to maximize the parallelism. " + "parallelism of the Spark cluster. The calculated size is usually smaller than the " +
"This is to avoid performance regression when enabling adaptive query execution. " + "configured target size. This is to maximize the parallelism and avoid performance " +
"It's recommended to set this config to false and respect the target size specified by " + "regression when enabling adaptive query execution. It's recommended to set this config " +
s"'${ADVISORY_PARTITION_SIZE_IN_BYTES.key}'.") "to false and respect the configured target size.")
.version("3.2.0") .version("3.2.0")
.booleanConf .booleanConf
.createWithDefault(true) .createWithDefault(true)
val COALESCE_PARTITIONS_MIN_PARTITION_SIZE = val COALESCE_PARTITIONS_MIN_PARTITION_SIZE =
buildConf("spark.sql.adaptive.coalescePartitions.minPartitionSize") buildConf("spark.sql.adaptive.coalescePartitions.minPartitionSize")
.doc("The minimum size of shuffle partitions after coalescing. Its value can be at most " + .doc("The minimum size of shuffle partitions after coalescing. This is useful when the " +
s"20% of '${ADVISORY_PARTITION_SIZE_IN_BYTES.key}'. This is useful when the target size " + "adaptively calculated target size is too small during partition coalescing.")
"is ignored during partition coalescing, which is the default case.")
.version("3.2.0") .version("3.2.0")
.bytesConf(ByteUnit.BYTE) .bytesConf(ByteUnit.BYTE)
.checkValue(_ > 0, "minPartitionSize must be positive")
.createWithDefaultString("1MB") .createWithDefaultString("1MB")
val COALESCE_PARTITIONS_MIN_PARTITION_NUM = val COALESCE_PARTITIONS_MIN_PARTITION_NUM =

View file

@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
import org.apache.spark.sql.execution.{ShufflePartitionSpec, SparkPlan} import org.apache.spark.sql.execution.{ShufflePartitionSpec, SparkPlan}
import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, REBALANCE_PARTITIONS_BY_COL, REBALANCE_PARTITIONS_BY_NONE, REPARTITION_BY_COL, ShuffleExchangeLike, ShuffleOrigin} import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, REBALANCE_PARTITIONS_BY_COL, REBALANCE_PARTITIONS_BY_NONE, REPARTITION_BY_COL, ShuffleExchangeLike, ShuffleOrigin}
import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.Utils
/** /**
* A rule to coalesce the shuffle partitions based on the map output statistics, which can * A rule to coalesce the shuffle partitions based on the map output statistics, which can
@ -59,33 +60,40 @@ case class CoalesceShufflePartitions(session: SparkSession) extends AQEShuffleRe
if (!shuffleStageInfos.forall(s => isSupported(s.shuffleStage.shuffle))) { if (!shuffleStageInfos.forall(s => isSupported(s.shuffleStage.shuffle))) {
plan plan
} else { } else {
// Ideally, this rule should simply coalesce partition w.r.t. the target size specified by // Ideally, this rule should simply coalesce partitions w.r.t. the target size specified by
// ADVISORY_PARTITION_SIZE_IN_BYTES (default 64MB). To avoid perf regression in AQE, this // ADVISORY_PARTITION_SIZE_IN_BYTES (default 64MB). To avoid perf regression in AQE, this
// rule by default ignores the target size (set it to 0), and only respect the minimum // rule by default tries to maximize the parallelism and set the target size to
// partition size specified by COALESCE_PARTITIONS_MIN_PARTITION_SIZE (default 1MB). // `total shuffle size / Spark default parallelism`. In case the `Spark default parallelism`
// is too big, this rule also respect the minimum partition size specified by
// COALESCE_PARTITIONS_MIN_PARTITION_SIZE (default 1MB).
// For history reason, this rule also need to support the config // For history reason, this rule also need to support the config
// COALESCE_PARTITIONS_MIN_PARTITION_NUM: if it's set, we will respect both the target // COALESCE_PARTITIONS_MIN_PARTITION_NUM. We should remove this config in the future.
// size and minimum partition number, no matter COALESCE_PARTITIONS_PARALLELISM_FIRST is true val minNumPartitions = conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM).getOrElse {
// or false. if (conf.getConf(SQLConf.COALESCE_PARTITIONS_PARALLELISM_FIRST)) {
// TODO: remove the `minNumPartitions` parameter from // We fall back to Spark default parallelism if the minimum number of coalesced partitions
// `ShufflePartitionsUtil.coalescePartitions` after we remove the config // is not set, so to avoid perf regressions compared to no coalescing.
// COALESCE_PARTITIONS_MIN_PARTITION_NUM session.sparkContext.defaultParallelism
val minPartitionNum = conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM) } else {
val advisorySize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES) // If we don't need to maximize the parallelism, we set `minPartitionNum` to 1, so that
// `minPartitionSize` can be at most 20% of `advisorySize`. // the specified advisory partition size will be respected.
val minPartitionSize = math.min( 1
advisorySize / 5, conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_SIZE)) }
val parallelismFirst = conf.getConf(SQLConf.COALESCE_PARTITIONS_PARALLELISM_FIRST) }
val advisoryTargetSize = if (minPartitionNum.isEmpty && parallelismFirst) { val advisoryTargetSize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES)
0 val minPartitionSize = if (Utils.isTesting) {
// In the tests, we usually set the target size to a very small value that is even smaller
// than the default value of the min partition size. Here we also adjust the min partition
// size to be not larger than 20% of the target size, so that the tests don't need to set
// both configs all the time to check the coalescing behavior.
conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_SIZE).min(advisoryTargetSize / 5)
} else { } else {
advisorySize conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_SIZE)
} }
val newPartitionSpecs = ShufflePartitionsUtil.coalescePartitions( val newPartitionSpecs = ShufflePartitionsUtil.coalescePartitions(
shuffleStageInfos.map(_.shuffleStage.mapStats), shuffleStageInfos.map(_.shuffleStage.mapStats),
shuffleStageInfos.map(_.partitionSpecs), shuffleStageInfos.map(_.partitionSpecs),
advisoryTargetSize = advisoryTargetSize, advisoryTargetSize = advisoryTargetSize,
minNumPartitions = conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM).getOrElse(1), minNumPartitions = minNumPartitions,
minPartitionSize = minPartitionSize) minPartitionSize = minPartitionSize)
if (newPartitionSpecs.nonEmpty) { if (newPartitionSpecs.nonEmpty) {

View file

@ -56,13 +56,9 @@ object ShufflePartitionsUtil extends Logging {
// If `minNumPartitions` is very large, it is possible that we need to use a value less than // If `minNumPartitions` is very large, it is possible that we need to use a value less than
// `advisoryTargetSize` as the target size of a coalesced task. // `advisoryTargetSize` as the target size of a coalesced task.
val totalPostShuffleInputSize = mapOutputStatistics.flatMap(_.map(_.bytesByPartitionId.sum)).sum val totalPostShuffleInputSize = mapOutputStatistics.flatMap(_.map(_.bytesByPartitionId.sum)).sum
// The max at here is to make sure that when we have an empty table, we only have a single val maxTargetSize = math.ceil(totalPostShuffleInputSize / minNumPartitions.toDouble).toLong
// coalesced partition. // It's meaningless to make target size smaller than minPartitionSize.
// There is no particular reason that we pick 16. We just need a number to prevent val targetSize = maxTargetSize.min(advisoryTargetSize).max(minPartitionSize)
// `maxTargetSize` from being set to 0.
val maxTargetSize = math.max(
math.ceil(totalPostShuffleInputSize / minNumPartitions.toDouble).toLong, 16)
val targetSize = math.min(maxTargetSize, advisoryTargetSize)
val shuffleIds = mapOutputStatistics.flatMap(_.map(_.shuffleId)).mkString(", ") val shuffleIds = mapOutputStatistics.flatMap(_.map(_.shuffleId)).mkString(", ")
logInfo(s"For shuffle($shuffleIds), advisory target size: $advisoryTargetSize, " + logInfo(s"For shuffle($shuffleIds), advisory target size: $advisoryTargetSize, " +

View file

@ -1691,7 +1691,8 @@ class AdaptiveQueryExecSuite
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true", SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true",
SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "2258", // Pick a small value so that no coalesce can happen.
SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "100",
SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1", SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
SQLConf.SHUFFLE_PARTITIONS.key -> "2") { SQLConf.SHUFFLE_PARTITIONS.key -> "2") {
val df = spark.sparkContext.parallelize( val df = spark.sparkContext.parallelize(

View file

@ -259,8 +259,10 @@ class SQLConfSuite extends QueryTest with SharedSparkSession {
spark.conf.set(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key, "1g") spark.conf.set(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key, "1g")
assert(spark.conf.get(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES) === 1073741824) assert(spark.conf.get(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES) === 1073741824)
spark.conf.set(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key, "-1") // test negative value
assert(spark.conf.get(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES) === -1) intercept[IllegalArgumentException] {
spark.conf.set(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key, "-1")
}
// Test overflow exception // Test overflow exception
intercept[IllegalArgumentException] { intercept[IllegalArgumentException] {