diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index f85f745249..a930f638ca 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -480,6 +480,7 @@ object SQLConf {
       .doc("(Deprecated since Spark 3.0)")
       .version("1.6.0")
       .bytesConf(ByteUnit.BYTE)
+      .checkValue(_ > 0, "advisoryPartitionSizeInBytes must be positive")
       .createWithDefaultString("64MB")

   val ADAPTIVE_EXECUTION_ENABLED = buildConf("spark.sql.adaptive.enabled")
@@ -526,28 +527,26 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)

-  private val MIN_PARTITION_SIZE_KEY = "spark.sql.adaptive.coalescePartitions.minPartitionSize"
-
   val COALESCE_PARTITIONS_PARALLELISM_FIRST =
     buildConf("spark.sql.adaptive.coalescePartitions.parallelismFirst")
-      .doc("When true, Spark ignores the target size specified by " +
+      .doc("When true, Spark does not respect the target size specified by " +
         s"'${ADVISORY_PARTITION_SIZE_IN_BYTES.key}' (default 64MB) when coalescing contiguous " +
-        "shuffle partitions, and only respect the minimum partition size specified by " +
-        s"'$MIN_PARTITION_SIZE_KEY' (default 1MB), to maximize the parallelism. " +
-        "This is to avoid performance regression when enabling adaptive query execution. " +
-        "It's recommended to set this config to false and respect the target size specified by " +
-        s"'${ADVISORY_PARTITION_SIZE_IN_BYTES.key}'.")
+        "shuffle partitions, but adaptively calculates the target size according to the default " +
+        "parallelism of the Spark cluster. The calculated size is usually smaller than the " +
+        "configured target size. This is to maximize the parallelism and avoid performance " +
+        "regression when enabling adaptive query execution. It's recommended to set this config " +
+        "to false and respect the configured target size.")
       .version("3.2.0")
       .booleanConf
       .createWithDefault(true)

   val COALESCE_PARTITIONS_MIN_PARTITION_SIZE =
     buildConf("spark.sql.adaptive.coalescePartitions.minPartitionSize")
-      .doc("The minimum size of shuffle partitions after coalescing. Its value can be at most " +
-        s"20% of '${ADVISORY_PARTITION_SIZE_IN_BYTES.key}'. This is useful when the target size " +
-        "is ignored during partition coalescing, which is the default case.")
+      .doc("The minimum size of shuffle partitions after coalescing. This is useful when the " +
+        "adaptively calculated target size is too small during partition coalescing.")
       .version("3.2.0")
       .bytesConf(ByteUnit.BYTE)
+      .checkValue(_ > 0, "minPartitionSize must be positive")
       .createWithDefaultString("1MB")

   val COALESCE_PARTITIONS_MIN_PARTITION_NUM =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
index 75c53b4f76..5c14caaa46 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
@@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
 import org.apache.spark.sql.execution.{ShufflePartitionSpec, SparkPlan}
 import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, REBALANCE_PARTITIONS_BY_COL, REBALANCE_PARTITIONS_BY_NONE, REPARTITION_BY_COL, ShuffleExchangeLike, ShuffleOrigin}
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.Utils

 /**
  * A rule to coalesce the shuffle partitions based on the map output statistics, which can
@@ -59,33 +60,40 @@ case class CoalesceShufflePartitions(session: SparkSession) extends AQEShuffleReadRule
     if (!shuffleStageInfos.forall(s => isSupported(s.shuffleStage.shuffle))) {
       plan
     } else {
-      // Ideally, this rule should simply coalesce partition w.r.t. the target size specified by
+      // Ideally, this rule should simply coalesce partitions w.r.t. the target size specified by
       // ADVISORY_PARTITION_SIZE_IN_BYTES (default 64MB). To avoid perf regression in AQE, this
-      // rule by default ignores the target size (set it to 0), and only respect the minimum
-      // partition size specified by COALESCE_PARTITIONS_MIN_PARTITION_SIZE (default 1MB).
+      // rule by default tries to maximize the parallelism and sets the target size to
+      // `total shuffle size / Spark default parallelism`. When the `Spark default parallelism`
+      // is too large, this rule also respects the minimum partition size specified by
+      // COALESCE_PARTITIONS_MIN_PARTITION_SIZE (default 1MB).
       // For history reason, this rule also need to support the config
-      // COALESCE_PARTITIONS_MIN_PARTITION_NUM: if it's set, we will respect both the target
-      // size and minimum partition number, no matter COALESCE_PARTITIONS_PARALLELISM_FIRST is true
-      // or false.
-      // TODO: remove the `minNumPartitions` parameter from
-      // `ShufflePartitionsUtil.coalescePartitions` after we remove the config
-      // COALESCE_PARTITIONS_MIN_PARTITION_NUM
-      val minPartitionNum = conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM)
-      val advisorySize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES)
-      // `minPartitionSize` can be at most 20% of `advisorySize`.
-      val minPartitionSize = math.min(
-        advisorySize / 5, conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_SIZE))
-      val parallelismFirst = conf.getConf(SQLConf.COALESCE_PARTITIONS_PARALLELISM_FIRST)
-      val advisoryTargetSize = if (minPartitionNum.isEmpty && parallelismFirst) {
-        0
+      // COALESCE_PARTITIONS_MIN_PARTITION_NUM. We should remove this config in the future.
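+      // For intuition (hypothetical numbers, for illustration only): with parallelismFirst
+      // enabled and COALESCE_PARTITIONS_MIN_PARTITION_NUM unset, a cluster whose default
+      // parallelism is 200 resolves minNumPartitions to 200; with parallelismFirst disabled
+      // it resolves to 1, so coalescing is driven purely by the advisory target size.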
+      val minNumPartitions = conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM).getOrElse {
+        if (conf.getConf(SQLConf.COALESCE_PARTITIONS_PARALLELISM_FIRST)) {
+          // We fall back to the Spark default parallelism if the minimum number of coalesced
+          // partitions is not set, so as to avoid perf regressions compared to no coalescing.
+          session.sparkContext.defaultParallelism
+        } else {
+          // If we don't need to maximize the parallelism, we set `minPartitionNum` to 1, so that
+          // the specified advisory partition size will be respected.
+          1
+        }
+      }
+      val advisoryTargetSize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES)
+      val minPartitionSize = if (Utils.isTesting) {
+        // In the tests, we usually set the target size to a very small value that is even smaller
+        // than the default value of the min partition size. Here we also adjust the min partition
+        // size to be at most 20% of the target size, so that the tests don't need to set both
+        // configs every time they check the coalescing behavior.
+        conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_SIZE).min(advisoryTargetSize / 5)
       } else {
-        advisorySize
+        conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_SIZE)
       }
       val newPartitionSpecs = ShufflePartitionsUtil.coalescePartitions(
         shuffleStageInfos.map(_.shuffleStage.mapStats),
         shuffleStageInfos.map(_.partitionSpecs),
         advisoryTargetSize = advisoryTargetSize,
-        minNumPartitions = conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM).getOrElse(1),
+        minNumPartitions = minNumPartitions,
         minPartitionSize = minPartitionSize)
       if (newPartitionSpecs.nonEmpty) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
index 64f89b920e..3609548f37 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
@@ -56,13 +56,9 @@ object ShufflePartitionsUtil extends Logging {
     // If `minNumPartitions` is very large, it is possible that we need to use a value less than
     // `advisoryTargetSize` as the target size of a coalesced task.
     val totalPostShuffleInputSize = mapOutputStatistics.flatMap(_.map(_.bytesByPartitionId.sum)).sum
-    // The max at here is to make sure that when we have an empty table, we only have a single
-    // coalesced partition.
-    // There is no particular reason that we pick 16. We just need a number to prevent
-    // `maxTargetSize` from being set to 0.
-    val maxTargetSize = math.max(
-      math.ceil(totalPostShuffleInputSize / minNumPartitions.toDouble).toLong, 16)
-    val targetSize = math.min(maxTargetSize, advisoryTargetSize)
+    val maxTargetSize = math.ceil(totalPostShuffleInputSize / minNumPartitions.toDouble).toLong
+    // It's meaningless to make the target size smaller than minPartitionSize.
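+    // For intuition (hypothetical numbers, for illustration only): with 1GiB of total shuffle
+    // data, minNumPartitions = 200, and advisoryTargetSize = 64MB, maxTargetSize is
+    // ceil(1073741824 / 200) = 5368710 bytes (~5MB), so the next line yields
+    // targetSize = max(min(~5MB, 64MB), 1MB) = ~5MB.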
+    val targetSize = maxTargetSize.min(advisoryTargetSize).max(minPartitionSize)

     val shuffleIds = mapOutputStatistics.flatMap(_.map(_.shuffleId)).mkString(", ")
     logInfo(s"For shuffle($shuffleIds), advisory target size: $advisoryTargetSize, " +
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index d38a641943..4471fda654 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -1691,7 +1691,8 @@ class AdaptiveQueryExecSuite
     withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
       SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true",
-      SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "2258",
+      // Pick a small value so that no coalescing can happen.
+      SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "100",
       SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
       SQLConf.SHUFFLE_PARTITIONS.key -> "2") {
       val df = spark.sparkContext.parallelize(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
index 6f2452acbe..5e661a00da 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
@@ -259,8 +259,10 @@ class SQLConfSuite extends QueryTest with SharedSparkSession {
     spark.conf.set(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key, "1g")
     assert(spark.conf.get(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES) === 1073741824)

-    spark.conf.set(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key, "-1")
-    assert(spark.conf.get(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES) === -1)
+    // Test negative value
+    intercept[IllegalArgumentException] {
+      spark.conf.set(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key, "-1")
+    }

     // Test overflow exception
     intercept[IllegalArgumentException] {