[SPARK-35968][SQL] Make sure partitions are not too small in AQE partition coalescing

### What changes were proposed in this pull request?

By default, AQE will set `COALESCE_PARTITIONS_MIN_PARTITION_NUM` to the spark default parallelism, which is usually quite big. This is to keep the parallelism on par with non-AQE, to avoid perf regressions.

However, this usually leads to many small/empty partitions, and hurts performance (although not worse than non-AQE). Users usually blindly set `COALESCE_PARTITIONS_MIN_PARTITION_NUM` to 1, which makes this config quite useless.

This PR adds a new config to set the min partition size, to avoid too small partitions after coalescing. By default, Spark will not respect the target size, and only respect this min partition size, to maximize the parallelism and avoid perf regression in AQE. This PR also adds a bool config to respect the target size when coalescing partitions, and it's recommended to set it to get better overall performance. This PR also deprecates the `COALESCE_PARTITIONS_MIN_PARTITION_NUM` config.

### Why are the changes needed?

AQE is default on now, we should make the perf better in the default case.

### Does this PR introduce _any_ user-facing change?

yes, a new config.

### How was this patch tested?

new tests

Closes #33172 from cloud-fan/aqe2.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 0c9c8ff569)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This commit is contained in:
Wenchen Fan 2021-07-02 16:07:31 +08:00
parent 79a6e00b76
commit c1d8178817
13 changed files with 249 additions and 76 deletions

View file

@ -3542,6 +3542,8 @@ test_that("repartition by columns on DataFrame", {
conf <- callJMethod(sparkSession, "conf")
shufflepartitionsvalue <- callJMethod(conf, "get", "spark.sql.shuffle.partitions")
callJMethod(conf, "set", "spark.sql.shuffle.partitions", "5")
coalesceEnabled <- callJMethod(conf, "get", "spark.sql.adaptive.coalescePartitions.enabled")
callJMethod(conf, "set", "spark.sql.adaptive.coalescePartitions.enabled", "false")
tryCatch({
df <- createDataFrame(
list(list(1L, 1, "1", 0.1), list(1L, 2, "2", 0.2), list(3L, 3, "3", 0.3)),
@ -3583,6 +3585,7 @@ test_that("repartition by columns on DataFrame", {
finally = {
# Resetting the conf back to default value
callJMethod(conf, "set", "spark.sql.shuffle.partitions", shufflepartitionsvalue)
callJMethod(conf, "set", "spark.sql.adaptive.coalescePartitions.enabled", coalesceEnabled)
})
})

View file

@ -918,10 +918,10 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
+---+-----+
|age| name|
+---+-----+
| 5| Bob|
| 2|Alice|
| 5| Bob|
| 2|Alice|
| 2|Alice|
| 5| Bob|
+---+-----+
>>> data = data.repartition(7, "age")
>>> data.show()
@ -935,7 +935,7 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
+---+-----+
>>> data.rdd.getNumPartitions()
7
>>> data = data.repartition("name", "age")
>>> data = data.repartition(3, "name", "age")
>>> data.show()
+---+-----+
|age| name|

View file

@ -209,7 +209,7 @@ class DataFrameTests(ReusedSQLTestCase):
self.assertEqual(df4.rdd.take(3), df2.rdd.take(3))
# test repartitionByRange(*cols)
df5 = df1.repartitionByRange("name", "age")
df5 = df1.repartitionByRange(5, "name", "age")
self.assertEqual(df5.rdd.first(), df2.rdd.first())
self.assertEqual(df5.rdd.take(3), df2.rdd.take(3))

View file

@ -526,10 +526,35 @@ object SQLConf {
.booleanConf
.createWithDefault(true)
private val MIN_PARTITION_SIZE_KEY = "spark.sql.adaptive.coalescePartitions.minPartitionSize"
val COALESCE_PARTITIONS_PARALLELISM_FIRST =
buildConf("spark.sql.adaptive.coalescePartitions.parallelismFirst")
.doc("When true, Spark ignores the target size specified by " +
s"'${ADVISORY_PARTITION_SIZE_IN_BYTES.key}' (default 64MB) when coalescing contiguous " +
"shuffle partitions, and only respect the minimum partition size specified by " +
s"'$MIN_PARTITION_SIZE_KEY' (default 1MB), to maximize the parallelism. " +
"This is to avoid performance regression when enabling adaptive query execution. " +
"It's recommended to set this config to false and respect the target size specified by " +
s"'${ADVISORY_PARTITION_SIZE_IN_BYTES.key}'.")
.version("3.2.0")
.booleanConf
.createWithDefault(true)
val COALESCE_PARTITIONS_MIN_PARTITION_SIZE =
buildConf("spark.sql.adaptive.coalescePartitions.minPartitionSize")
.doc("The minimum size of shuffle partitions after coalescing. Its value can be at most " +
s"20% of '${ADVISORY_PARTITION_SIZE_IN_BYTES.key}'. This is useful when the target size " +
"is ignored during partition coalescing, which is the default case.")
.version("3.2.0")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("1MB")
val COALESCE_PARTITIONS_MIN_PARTITION_NUM =
buildConf("spark.sql.adaptive.coalescePartitions.minPartitionNum")
.doc("The suggested (not guaranteed) minimum number of shuffle partitions after " +
"coalescing. If not set, the default value is the default parallelism of the " +
.internal()
.doc("(deprecated) The suggested (not guaranteed) minimum number of shuffle partitions " +
"after coalescing. If not set, the default value is the default parallelism of the " +
"Spark cluster. This configuration only has an effect when " +
s"'${ADAPTIVE_EXECUTION_ENABLED.key}' and " +
s"'${COALESCE_PARTITIONS_ENABLED.key}' are both true.")
@ -3352,7 +3377,9 @@ object SQLConf {
DeprecatedConfig(AVRO_REBASE_MODE_IN_READ.alternatives.head, "3.2",
s"Use '${AVRO_REBASE_MODE_IN_READ.key}' instead."),
DeprecatedConfig(LEGACY_REPLACE_DATABRICKS_SPARK_AVRO_ENABLED.key, "3.2",
"""Use `.format("avro")` in `DataFrameWriter` or `DataFrameReader` instead.""")
"""Use `.format("avro")` in `DataFrameWriter` or `DataFrameReader` instead."""),
DeprecatedConfig(COALESCE_PARTITIONS_MIN_PARTITION_NUM.key, "3.2",
s"Use '${COALESCE_PARTITIONS_MIN_PARTITION_SIZE.key}' instead.")
)
Map(configs.map { cfg => cfg.key -> cfg } : _*)

View file

@ -55,15 +55,34 @@ case class CoalesceShufflePartitions(session: SparkSession) extends CustomShuffl
if (!shuffleStageInfos.forall(s => supportCoalesce(s.shuffleStage.shuffle))) {
plan
} else {
// We fall back to Spark default parallelism if the minimum number of coalesced partitions
// is not set, so to avoid perf regressions compared to no coalescing.
// Ideally, this rule should simply coalesce partition w.r.t. the target size specified by
// ADVISORY_PARTITION_SIZE_IN_BYTES (default 64MB). To avoid perf regression in AQE, this
// rule by default ignores the target size (set it to 0), and only respect the minimum
// partition size specified by COALESCE_PARTITIONS_MIN_PARTITION_SIZE (default 1MB).
// For history reason, this rule also need to support the config
// COALESCE_PARTITIONS_MIN_PARTITION_NUM: if it's set, we will respect both the target
// size and minimum partition number, no matter COALESCE_PARTITIONS_PARALLELISM_FIRST is true
// or false.
// TODO: remove the `minNumPartitions` parameter from
// `ShufflePartitionsUtil.coalescePartitions` after we remove the config
// COALESCE_PARTITIONS_MIN_PARTITION_NUM
val minPartitionNum = conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM)
.getOrElse(session.sparkContext.defaultParallelism)
val advisorySize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES)
// `minPartitionSize` can be at most 20% of `advisorySize`.
val minPartitionSize = math.min(
advisorySize / 5, conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_SIZE))
val parallelismFirst = conf.getConf(SQLConf.COALESCE_PARTITIONS_PARALLELISM_FIRST)
val advisoryTargetSize = if (minPartitionNum.isEmpty && parallelismFirst) {
0
} else {
advisorySize
}
val newPartitionSpecs = ShufflePartitionsUtil.coalescePartitions(
shuffleStageInfos.map(_.shuffleStage.mapStats),
shuffleStageInfos.map(_.partitionSpecs),
advisoryTargetSize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES),
minNumPartitions = minPartitionNum)
advisoryTargetSize = advisoryTargetSize,
minNumPartitions = conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM).getOrElse(1),
minPartitionSize = minPartitionSize)
if (newPartitionSpecs.nonEmpty) {
val specsMap = shuffleStageInfos.zip(newPartitionSpecs).map { case (stageInfo, partSpecs) =>

View file

@ -45,7 +45,8 @@ object ShufflePartitionsUtil extends Logging {
mapOutputStatistics: Seq[Option[MapOutputStatistics]],
inputPartitionSpecs: Seq[Option[Seq[ShufflePartitionSpec]]],
advisoryTargetSize: Long,
minNumPartitions: Int): Seq[Seq[ShufflePartitionSpec]] = {
minNumPartitions: Int,
minPartitionSize: Long): Seq[Seq[ShufflePartitionSpec]] = {
assert(mapOutputStatistics.length == inputPartitionSpecs.length)
if (mapOutputStatistics.isEmpty) {
@ -65,19 +66,22 @@ object ShufflePartitionsUtil extends Logging {
val shuffleIds = mapOutputStatistics.flatMap(_.map(_.shuffleId)).mkString(", ")
logInfo(s"For shuffle($shuffleIds), advisory target size: $advisoryTargetSize, " +
s"actual target size $targetSize.")
s"actual target size $targetSize, minimum partition size: $minPartitionSize")
// If `inputPartitionSpecs` are all empty, it means skew join optimization is not applied.
if (inputPartitionSpecs.forall(_.isEmpty)) {
coalescePartitionsWithoutSkew(mapOutputStatistics, targetSize)
coalescePartitionsWithoutSkew(
mapOutputStatistics, targetSize, minPartitionSize)
} else {
coalescePartitionsWithSkew(mapOutputStatistics, inputPartitionSpecs, targetSize)
coalescePartitionsWithSkew(
mapOutputStatistics, inputPartitionSpecs, targetSize, minPartitionSize)
}
}
private def coalescePartitionsWithoutSkew(
mapOutputStatistics: Seq[Option[MapOutputStatistics]],
targetSize: Long): Seq[Seq[ShufflePartitionSpec]] = {
targetSize: Long,
minPartitionSize: Long): Seq[Seq[ShufflePartitionSpec]] = {
// `ShuffleQueryStageExec#mapStats` returns None when the input RDD has 0 partitions,
// we should skip it when calculating the `partitionStartIndices`.
val validMetrics = mapOutputStatistics.flatten
@ -95,7 +99,8 @@ object ShufflePartitionsUtil extends Logging {
}
val numPartitions = validMetrics.head.bytesByPartitionId.length
val newPartitionSpecs = coalescePartitions(0, numPartitions, validMetrics, targetSize)
val newPartitionSpecs = coalescePartitions(
0, numPartitions, validMetrics, targetSize, minPartitionSize)
if (newPartitionSpecs.length < numPartitions) {
attachDataSize(mapOutputStatistics, newPartitionSpecs)
} else {
@ -106,7 +111,8 @@ object ShufflePartitionsUtil extends Logging {
private def coalescePartitionsWithSkew(
mapOutputStatistics: Seq[Option[MapOutputStatistics]],
inputPartitionSpecs: Seq[Option[Seq[ShufflePartitionSpec]]],
targetSize: Long): Seq[Seq[ShufflePartitionSpec]] = {
targetSize: Long,
minPartitionSize: Long): Seq[Seq[ShufflePartitionSpec]] = {
// Do not coalesce if any of the map output stats are missing or if not all shuffles have
// partition specs, which should not happen in practice.
if (!mapOutputStatistics.forall(_.isDefined) || !inputPartitionSpecs.forall(_.isDefined)) {
@ -146,7 +152,12 @@ object ShufflePartitionsUtil extends Logging {
// coalesce any partitions before partition(i - 1) and after the end of latest skew section.
if (i - 1 > start) {
val partitionSpecs = coalescePartitions(
partitionIndices(start), repeatValue, validMetrics, targetSize, true)
partitionIndices(start),
repeatValue,
validMetrics,
targetSize,
minPartitionSize,
allowReturnEmpty = true)
newPartitionSpecsSeq.zip(attachDataSize(mapOutputStatistics, partitionSpecs))
.foreach(spec => spec._1 ++= spec._2)
}
@ -172,7 +183,12 @@ object ShufflePartitionsUtil extends Logging {
// coalesce any partitions after the end of last skew section.
if (numPartitions > start) {
val partitionSpecs = coalescePartitions(
partitionIndices(start), partitionIndices.last + 1, validMetrics, targetSize, true)
partitionIndices(start),
partitionIndices.last + 1,
validMetrics,
targetSize,
minPartitionSize,
allowReturnEmpty = true)
newPartitionSpecsSeq.zip(attachDataSize(mapOutputStatistics, partitionSpecs))
.foreach(spec => spec._1 ++= spec._2)
}
@ -215,11 +231,16 @@ object ShufflePartitionsUtil extends Logging {
end: Int,
mapOutputStatistics: Seq[MapOutputStatistics],
targetSize: Long,
minPartitionSize: Long,
allowReturnEmpty: Boolean = false): Seq[CoalescedPartitionSpec] = {
// `minPartitionSize` is useful for cases like [64MB, 0.5MB, 64MB]: we can't do coalesce,
// because merging 0.5MB to either the left or right partition will exceed the target size.
// If 0.5MB is smaller than `minPartitionSize`, we will force-merge it to the left/right side.
val partitionSpecs = ArrayBuffer.empty[CoalescedPartitionSpec]
var coalescedSize = 0L
var i = start
var latestSplitPoint = i
var latestPartitionSize = 0L
def createPartitionSpec(forceCreate: Boolean = false): Unit = {
// Skip empty inputs, as it is a waste to launch an empty task.
@ -237,20 +258,45 @@ object ShufflePartitionsUtil extends Logging {
j += 1
}
// If including the `totalSizeOfCurrentPartition` would exceed the target size, then start a
// new coalesced partition.
// If including the `totalSizeOfCurrentPartition` would exceed the target size and the
// current size has reached the `minPartitionSize`, then start a new coalesced partition.
if (i > latestSplitPoint && coalescedSize + totalSizeOfCurrentPartition > targetSize) {
createPartitionSpec()
latestSplitPoint = i
// reset postShuffleInputSize.
coalescedSize = totalSizeOfCurrentPartition
if (coalescedSize < minPartitionSize) {
// the current partition size is below `minPartitionSize`.
// pack it with the smaller one between the two adjacent partitions (before and after).
if (latestPartitionSize > 0 && latestPartitionSize < totalSizeOfCurrentPartition) {
// pack with the before partition.
partitionSpecs(partitionSpecs.length - 1) =
CoalescedPartitionSpec(partitionSpecs.last.startReducerIndex, i)
latestSplitPoint = i
latestPartitionSize += coalescedSize
// reset postShuffleInputSize.
coalescedSize = totalSizeOfCurrentPartition
} else {
// pack with the after partition.
coalescedSize += totalSizeOfCurrentPartition
}
} else {
createPartitionSpec()
latestSplitPoint = i
latestPartitionSize = coalescedSize
// reset postShuffleInputSize.
coalescedSize = totalSizeOfCurrentPartition
}
} else {
coalescedSize += totalSizeOfCurrentPartition
}
i += 1
}
// If do not allowReturnEmpty, create at least one partition if all partitions are empty.
createPartitionSpec(!allowReturnEmpty && partitionSpecs.isEmpty)
if (coalescedSize < minPartitionSize && latestPartitionSize > 0) {
// pack with the last partition.
partitionSpecs(partitionSpecs.length - 1) =
CoalescedPartitionSpec(partitionSpecs.last.startReducerIndex, end)
} else {
// If do not allowReturnEmpty, create at least one partition if all partitions are empty.
createPartitionSpec(!allowReturnEmpty && partitionSpecs.isEmpty)
}
partitionSpecs.toSeq
}

View file

@ -102,7 +102,7 @@ select a, b, grouping(a), grouping(b), sum(v), count(*), max(v)
from gstest1 group by rollup (a,b) order by b desc, a;
-- select a, b, grouping(a,b), sum(v), count(*), max(v)
select a, b, grouping(a), grouping(b), sum(v), count(*), max(v)
from gstest1 group by rollup (a,b) order by coalesce(a,0)+coalesce(b,0);
from gstest1 group by rollup (a,b) order by coalesce(a,0)+coalesce(b,0), a;
-- [SPARK-28664] ORDER BY in aggregate function
-- various types of ordered aggs

View file

@ -105,7 +105,7 @@ WHERE t1b IN (SELECT Min(t2b)
FROM t2
WHERE t1b = t2b
ORDER BY Min(t2b))
ORDER BY t1c DESC nulls first;
ORDER BY t1c DESC nulls first, t1a DESC, t1d DESC, t1h;
-- TC 01.07
SELECT t1a,

View file

@ -149,7 +149,7 @@ NULL NULL 1 1 145 10 19
-- !query
select a, b, grouping(a), grouping(b), sum(v), count(*), max(v)
from gstest1 group by rollup (a,b) order by coalesce(a,0)+coalesce(b,0)
from gstest1 group by rollup (a,b) order by coalesce(a,0)+coalesce(b,0), a
-- !query schema
struct<a:int,b:int,grouping(a):tinyint,grouping(b):tinyint,sum(v):bigint,count(1):bigint,max(v):int>
-- !query output
@ -157,12 +157,12 @@ NULL NULL 1 1 145 10 19
1 NULL 0 1 60 5 14
1 1 0 0 21 2 11
2 NULL 0 1 15 1 15
3 NULL 0 1 33 2 17
1 2 0 0 25 2 13
3 NULL 0 1 33 2 17
1 3 0 0 14 1 14
4 NULL 0 1 37 2 19
4 1 0 0 37 2 19
2 3 0 0 15 1 15
4 1 0 0 37 2 19
3 3 0 0 16 1 16
3 4 0 0 17 1 17

View file

@ -143,16 +143,16 @@ WHERE t1b IN (SELECT Min(t2b)
FROM t2
WHERE t1b = t2b
ORDER BY Min(t2b))
ORDER BY t1c DESC nulls first
ORDER BY t1c DESC nulls first, t1a DESC, t1d DESC, t1h
-- !query schema
struct<t1a:string,t1b:smallint,t1c:int,t1d:bigint,t1e:float,t1f:double,t1g:decimal(4,0),t1h:timestamp,t1i:date>
-- !query output
val1e 10 NULL 25 17.0 25.0 2600 2014-08-04 01:01:00 2014-08-04
val1e 10 NULL 19 17.0 25.0 2600 2014-05-04 01:01:00 2014-05-04
val1e 10 NULL 19 17.0 25.0 2600 2014-09-04 01:02:00.001 2014-09-04
val1d 10 NULL 12 17.0 25.0 2600 2015-05-04 01:01:00 2015-05-04
val1e 10 NULL 19 17.0 25.0 2600 2014-05-04 01:01:00 2014-05-04
val1b 8 16 19 17.0 25.0 2600 2014-05-04 01:01:00 2014-05-04
val1c 8 16 19 17.0 25.0 2600 2014-05-04 01:02:00.001 2014-05-05
val1b 8 16 19 17.0 25.0 2600 2014-05-04 01:01:00 2014-05-04
val1a 6 8 10 15.0 20.0 2000 2014-04-04 01:00:00 2014-04-04
val1a 6 8 10 15.0 20.0 2000 2014-04-04 01:02:00.001 2014-04-04

View file

@ -1821,7 +1821,7 @@ class DataFrameSuite extends QueryTest
(1 to 100).map(i => TestData2(i % 10, i))).toDF()
// Distribute and order by.
val df4 = data.repartition($"a").sortWithinPartitions($"b".desc)
val df4 = data.repartition(5, $"a").sortWithinPartitions($"b".desc)
// Walk each partition and verify that it is sorted descending and does not contain all
// the values.
df4.rdd.foreachPartition { p =>

View file

@ -26,7 +26,8 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite {
bytesByPartitionIdArray: Array[Array[Long]],
expectedPartitionStartIndices: Seq[Seq[CoalescedPartitionSpec]],
targetSize: Long,
minNumPartitions: Int = 1): Unit = {
minNumPartitions: Int = 1,
minPartitionSize: Long = 0): Unit = {
val mapOutputStatistics = bytesByPartitionIdArray.zipWithIndex.map {
case (bytesByPartitionId, index) =>
Some(new MapOutputStatistics(index, bytesByPartitionId))
@ -35,7 +36,8 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite {
mapOutputStatistics,
Seq.fill(mapOutputStatistics.length)(None),
targetSize,
minNumPartitions)
minNumPartitions,
minPartitionSize)
assert(estimatedPartitionStartIndices.length === expectedPartitionStartIndices.length)
estimatedPartitionStartIndices.zip(expectedPartitionStartIndices).foreach {
case (actual, expect) => assert(actual === expect)
@ -100,7 +102,7 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite {
Array(
Some(new MapOutputStatistics(0, bytesByPartitionId1)),
Some(new MapOutputStatistics(1, bytesByPartitionId2))),
Seq.fill(2)(None), targetSize, 1)
Seq.fill(2)(None), targetSize, 1, 0)
assert(coalesced.isEmpty)
}
@ -300,22 +302,27 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite {
test("coalesce after skew splitting") {
val targetSize = 100
def createPartitionSpecsBeforeCoalesce(
range: Range, bytes: Array[Long]): Seq[CoalescedPartitionSpec] = {
range.map(i => CoalescedPartitionSpec(i, i + 1, bytes(i)))
}
{
// Skew sections in the middle.
val bytesByPartitionId1 = Array[Long](10, 5, 300, 10, 8, 10, 10, 20)
val bytesByPartitionId2 = Array[Long](10, 10, 10, 8, 10, 200, 7, 20)
val specs1 =
Seq(CoalescedPartitionSpec(0, 1, 10), CoalescedPartitionSpec(1, 2, 15)) ++ // 0, 1
createPartitionSpecsBeforeCoalesce(0 to 1, bytesByPartitionId1) ++ // 0, 1
Seq.tabulate(3)(i => PartialReducerPartitionSpec(2, i, i + 1, 100L)) ++ // 2 - skew
Seq(CoalescedPartitionSpec(3, 4, 10), CoalescedPartitionSpec(4, 5, 8)) ++ // 3, 4
createPartitionSpecsBeforeCoalesce(3 to 4, bytesByPartitionId1) ++ // 3, 4
Seq.fill(2)(CoalescedPartitionSpec(5, 6, 10)) ++ // 5 - other side skew
Seq(CoalescedPartitionSpec(6, 7, 10), CoalescedPartitionSpec(7, 8, 20)) // 6, 7
createPartitionSpecsBeforeCoalesce(6 to 7, bytesByPartitionId1) // 6, 7
val specs2 =
Seq(CoalescedPartitionSpec(0, 1, 10), CoalescedPartitionSpec(1, 2, 10)) ++ // 0, 1
createPartitionSpecsBeforeCoalesce(0 to 1, bytesByPartitionId2) ++ // 0, 1
Seq.fill(3)(CoalescedPartitionSpec(2, 3, 10)) ++ // 2 - other side skew
Seq(CoalescedPartitionSpec(3, 4, 8), CoalescedPartitionSpec(4, 5, 10)) ++ // 3, 4
createPartitionSpecsBeforeCoalesce(3 to 4, bytesByPartitionId2) ++ // 3, 4
Seq.tabulate(2)(i => PartialReducerPartitionSpec(5, i, i + 1, 100L)) ++ // 5 - skew
Seq(CoalescedPartitionSpec(6, 7, 7), CoalescedPartitionSpec(7, 8, 20)) // 6, 7
createPartitionSpecsBeforeCoalesce(6 to 7, bytesByPartitionId2) // 6, 7
val expected1 =
Seq(CoalescedPartitionSpec(0, 2, 15)) ++ // 0, 1 - coalesced
Seq.tabulate(3)(i => PartialReducerPartitionSpec(2, i, i + 1, 100L)) ++ // 2 - skew
@ -335,7 +342,7 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite {
Seq(
Some(specs1),
Some(specs2)),
targetSize, 1)
targetSize, 1, 0)
assert(coalesced == Seq(expected1, expected2))
}
@ -345,16 +352,11 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite {
val bytesByPartitionId2 = Array[Long](10, 10, 10, 8, 10, 20, 7, 200)
val specs1 =
Seq.tabulate(3)(i => PartialReducerPartitionSpec(0, i, i + 1, 100L)) ++ // 0 - skew
Seq(CoalescedPartitionSpec(1, 2, 5), CoalescedPartitionSpec(2, 3, 10),
CoalescedPartitionSpec(3, 4, 10), CoalescedPartitionSpec(4, 5, 8),
CoalescedPartitionSpec(5, 6, 10), CoalescedPartitionSpec(6, 7, 10)
) ++ // 1, 2, 3, 4, 5, 6
createPartitionSpecsBeforeCoalesce(1 to 6, bytesByPartitionId1) ++ // 1, 2, 3, 4, 5, 6
Seq.fill(2)(CoalescedPartitionSpec(7, 8, 20)) // 7 - other side skew
val specs2 =
Seq.fill(3)(CoalescedPartitionSpec(0, 1, 10)) ++ // 0 - other side skew
Seq(CoalescedPartitionSpec(1, 2, 10), CoalescedPartitionSpec(2, 3, 10),
CoalescedPartitionSpec(3, 4, 8), CoalescedPartitionSpec(4, 5, 10),
CoalescedPartitionSpec(5, 6, 20), CoalescedPartitionSpec(6, 7, 7)) ++ // 1, 2, 3, 4, 5, 6
createPartitionSpecsBeforeCoalesce(1 to 6, bytesByPartitionId2) ++ // 1, 2, 3, 4, 5, 6
Seq.tabulate(2)(i => PartialReducerPartitionSpec(7, i, i + 1, 100L)) // 7 - skew
val expected1 =
Seq.tabulate(3)(i => PartialReducerPartitionSpec(0, i, i + 1, 100L)) ++ // 0 - skew
@ -373,7 +375,7 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite {
Seq(
Some(specs1),
Some(specs2)),
targetSize, 1)
targetSize, 1, 0)
assert(coalesced == Seq(expected1, expected2))
}
@ -384,15 +386,13 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite {
val specs1 =
Seq.tabulate(3)(i => PartialReducerPartitionSpec(0, i, i + 1, 100L)) ++ // 0 - skew
Seq.fill(5)(CoalescedPartitionSpec(1, 2, 50)) ++ // 1 - other side skew
Seq(CoalescedPartitionSpec(2, 3, 10), CoalescedPartitionSpec(3, 4, 10),
CoalescedPartitionSpec(4, 5, 8), CoalescedPartitionSpec(5, 6, 10)) ++ // 2, 3, 4, 5
createPartitionSpecsBeforeCoalesce(2 to 5, bytesByPartitionId1) ++ // 2, 3, 4, 5
Seq.tabulate(4)(i => PartialReducerPartitionSpec(6, i, i + 1, 100L)) ++ // 6 - skew
Seq.tabulate(2)(i => PartialReducerPartitionSpec(7, i, i + 1, 100L)) // 7 - skew
val specs2 =
Seq.fill(3)(CoalescedPartitionSpec(0, 1, 10)) ++ // 0 - other side skew
Seq.tabulate(5)(i => PartialReducerPartitionSpec(1, i, i + 1, 100L)) ++ // 1 - skew
Seq(CoalescedPartitionSpec(2, 3, 10), CoalescedPartitionSpec(3, 4, 8),
CoalescedPartitionSpec(4, 5, 10), CoalescedPartitionSpec(5, 6, 20)) ++ // 2, 3, 4, 5
createPartitionSpecsBeforeCoalesce(2 to 5, bytesByPartitionId2) ++ // 2, 3, 4, 5
Seq.fill(4)(CoalescedPartitionSpec(6, 7, 7)) ++ // 6 - other side skew
Seq.fill(2)(CoalescedPartitionSpec(7, 8, 20)) // 7 - other side skew
val expected1 =
@ -414,7 +414,7 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite {
Seq(
Some(specs1),
Some(specs2)),
targetSize, 1)
targetSize, 1, 0)
assert(coalesced == Seq(expected1, expected2))
}
@ -426,16 +426,14 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite {
Seq.tabulate(3)(i => PartialReducerPartitionSpec(0, i, i + 1, 100L)) ++ // 0 - skew
Seq(CoalescedPartitionSpec(1, 2, 16)) ++ // 1
Seq.fill(5)(CoalescedPartitionSpec(2, 3, 30)) ++ // 2 - other side skew
Seq(CoalescedPartitionSpec(3, 4, 10), CoalescedPartitionSpec(4, 5, 8),
CoalescedPartitionSpec(5, 6, 10)) ++ // 3, 4, 5
createPartitionSpecsBeforeCoalesce(3 to 5, bytesByPartitionId1) ++ // 3, 4, 5
Seq.fill(2)(CoalescedPartitionSpec(6, 7, 10)) ++ // 6 - other side skew
Seq(CoalescedPartitionSpec(7, 8, 20)) // 7
val specs2 =
Seq.tabulate(3)(i => CoalescedPartitionSpec(0, 1, 10)) ++ // 0 - other side skew
Seq(CoalescedPartitionSpec(1, 2, 10)) ++ // 1
Seq.tabulate(5)(i => PartialReducerPartitionSpec(2, i, i + 1, 100L)) ++ // 2- skew
Seq(CoalescedPartitionSpec(3, 4, 8), CoalescedPartitionSpec(4, 5, 10),
CoalescedPartitionSpec(5, 6, 7)) ++ // 3, 4, 5
createPartitionSpecsBeforeCoalesce(3 to 5, bytesByPartitionId2) ++ // 3, 4, 5
Seq.tabulate(2)(i => PartialReducerPartitionSpec(6, i, i + 1, 100L)) ++ // 6 - skew
Seq(CoalescedPartitionSpec(7, 8, 20)) // 7
val expected1 =
@ -459,9 +457,76 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite {
Seq(
Some(specs1),
Some(specs2)),
targetSize, 1)
targetSize, 1, 0)
assert(coalesced == Seq(expected1, expected2))
}
{
// Partitions below `minPartitionSize` packed with smaller adjacent partition.
val minPartitionSize = 10L
val bytesByPartitionId1 = Array[Long](300, 90, 4, 16, 3, 24, 10, 60, 5)
val bytesByPartitionId2 = Array[Long](10, 10, 5, 102, 4, 80, 200, 36, 3)
val specs1 =
Seq.tabulate(3)(i => PartialReducerPartitionSpec(0, i, i + 1, 100L)) ++ // 0 - skew
createPartitionSpecsBeforeCoalesce(1 to 5, bytesByPartitionId1) ++ // 1, 2, 3, 4, 5
Seq.fill(2)(CoalescedPartitionSpec(6, 7, 10)) ++ // 6 - other side skew
createPartitionSpecsBeforeCoalesce(7 to 8, bytesByPartitionId1) // 7, 8
val specs2 =
Seq.tabulate(3)(i => CoalescedPartitionSpec(0, 1, 10)) ++ // 0 - other side skew
createPartitionSpecsBeforeCoalesce(1 to 5, bytesByPartitionId2) ++ // 1, 2, 3, 4, 5
Seq.tabulate(2)(i => PartialReducerPartitionSpec(6, i, i + 1, 100L)) ++ // 6 - skew
createPartitionSpecsBeforeCoalesce(7 to 8, bytesByPartitionId2) // 7, 8
val expected1 =
Seq.tabulate(3)(i => PartialReducerPartitionSpec(0, i, i + 1, 100L)) ++ // 0 - skew
Seq(CoalescedPartitionSpec(1, 3, 94)) ++ // 1, 2 - coalesced
Seq(CoalescedPartitionSpec(3, 4, 16)) ++ // 3
Seq(CoalescedPartitionSpec(4, 6, 27)) ++ // 4, 5 - coalesced
Seq.fill(2)(CoalescedPartitionSpec(6, 7, 10)) ++ // 6 - other side skew
Seq(CoalescedPartitionSpec(7, 9, 65)) // 7, 8 - coalesced
val expected2 =
Seq.tabulate(3)(i => CoalescedPartitionSpec(0, 1, 10)) ++ // 0 - other side skew
Seq(CoalescedPartitionSpec(1, 3, 15)) ++ // 1, 2 - coalesced
Seq(CoalescedPartitionSpec(3, 4, 102)) ++ // 3
Seq(CoalescedPartitionSpec(4, 6, 84)) ++ // 4, 5 - coalesced
Seq.tabulate(2)(i => PartialReducerPartitionSpec(6, i, i + 1, 100L)) ++ // 6 - skew
Seq(CoalescedPartitionSpec(7, 9, 39)) // 7, 8 - coalesced
val coalesced = ShufflePartitionsUtil.coalescePartitions(
Array(
Some(new MapOutputStatistics(0, bytesByPartitionId1)),
Some(new MapOutputStatistics(1, bytesByPartitionId2))),
Seq(
Some(specs1),
Some(specs2)),
targetSize, 1, minPartitionSize)
assert(coalesced == Seq(expected1, expected2))
}
{
// No actual coalescing happened, return empty result.
val bytesByPartitionId1 = Array[Long](5, 300, 40, 8, 20, 10, 3)
val bytesByPartitionId2 = Array[Long](4, 10, 58, 10, 67, 200, 5)
val specs1 =
Seq(CoalescedPartitionSpec(0, 1, 5)) ++ // 0
Seq.tabulate(3)(i => PartialReducerPartitionSpec(1, i, i + 1, 100L)) ++ // 1 - skew
createPartitionSpecsBeforeCoalesce(2 to 4, bytesByPartitionId1) ++ // 2, 3, 4
Seq.fill(2)(CoalescedPartitionSpec(5, 6)) ++ // 5 - other side skew
Seq(CoalescedPartitionSpec(6, 7)) // 6
val specs2 =
Seq(CoalescedPartitionSpec(0, 1)) ++ // 0
Seq.fill(3)(CoalescedPartitionSpec(1, 2)) ++ // 1 - other side skew
createPartitionSpecsBeforeCoalesce(2 to 4, bytesByPartitionId2) ++ // 2, 3, 4
Seq.tabulate(2)(i => PartialReducerPartitionSpec(5, i, i + 1, 100L)) ++ // 5 - skew
Seq(CoalescedPartitionSpec(6, 7)) // 6
val coalesced = ShufflePartitionsUtil.coalescePartitions(
Array(
Some(new MapOutputStatistics(0, bytesByPartitionId1)),
Some(new MapOutputStatistics(1, bytesByPartitionId2))),
Seq(
Some(specs1),
Some(specs2)),
targetSize, 1, 0)
assert(coalesced.isEmpty)
}
}
test("coalesce after skew splitting - counter cases") {
@ -479,7 +544,7 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite {
Seq(
Some(specs1),
None),
targetSize, 1)
targetSize, 1, 0)
assert(coalesced.isEmpty)
}
@ -495,7 +560,7 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite {
Seq(
Some(specs1),
Some(specs2)),
targetSize, 1)
targetSize, 1, 0)
assert(coalesced.isEmpty)
}
@ -514,7 +579,7 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite {
Seq(
Some(specs1),
Some(specs2)),
targetSize, 1)
targetSize, 1, 0)
}
}
@ -532,7 +597,7 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite {
Seq(
Some(specs1),
Some(specs2)),
targetSize, 1)
targetSize, 1, 0)
}
}
@ -551,7 +616,7 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite {
Seq(
Some(specs1),
Some(specs2)),
targetSize, 1)
targetSize, 1, 0)
}
}
@ -572,7 +637,7 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite {
Seq(
Some(specs1),
Some(specs2)),
targetSize, 1)
targetSize, 1, 0)
}
}
@ -590,7 +655,7 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite {
Seq(
Some(specs1),
Some(specs2)),
targetSize, 1)
targetSize, 1, 0)
}
}
@ -609,7 +674,7 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite {
Seq(
Some(specs1),
Some(specs2)),
targetSize, 1)
targetSize, 1, 0)
}
}
@ -628,7 +693,7 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite {
Seq(
Some(specs1),
Some(specs2)),
targetSize, 1)
targetSize, 1, 0)
}
}
}
@ -697,7 +762,7 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite {
Seq(
Some(specs1),
Some(specs2)),
targetSize, 1)
targetSize, 1, 0)
assert(coalesced == Seq(expected1, expected2))
}
}

View file

@ -1885,4 +1885,17 @@ class AdaptiveQueryExecSuite
}
}
}
test("SPARK-35968: AQE coalescing should not produce too small partitions by default") {
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
val (_, adaptive) =
runAdaptiveAndVerifyResult("SELECT sum(id) FROM RANGE(10) GROUP BY id % 3")
val coalesceReader = collect(adaptive) {
case r: CustomShuffleReaderExec if r.hasCoalescedPartition => r
}
assert(coalesceReader.length == 1)
// RANGE(10) is a very small dataset and AQE coalescing should produce one partition.
assert(coalesceReader.head.partitionSpecs.length == 1)
}
}
}