[SPARK-35639][SQL] Make hasCoalescedPartition return true if something was actually coalesced

### What changes were proposed in this pull request?
Fix `CustomShuffleReaderExec.hasCoalescedPartition` so that it returns `true` only if some original partitions were actually combined.

### Why are the changes needed?
Without this change, the `CustomShuffleReaderExec` node description can report `coalesced` even though the partitions are unchanged.
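As a minimal, self-contained sketch of the bug (using simplified stand-ins for Spark's partition-spec classes, not the real ones), the old `isInstanceOf` check reports coalescing even when every spec covers exactly one reducer:

```scala
// Simplified stand-ins for Spark's shuffle partition specs (illustrative only).
sealed trait ShufflePartitionSpec
case class CoalescedPartitionSpec(startReducerIndex: Int, endReducerIndex: Int)
  extends ShufflePartitionSpec

object HasCoalescedDemo extends App {
  // One spec per original reducer partition: nothing was actually combined.
  val partitionSpecs: Seq[ShufflePartitionSpec] =
    Seq(CoalescedPartitionSpec(0, 1), CoalescedPartitionSpec(1, 2))

  // Old check: any CoalescedPartitionSpec counts as "coalesced".
  val oldCheck = partitionSpecs.exists(_.isInstanceOf[CoalescedPartitionSpec])

  // New check: a spec must span more than one reducer to count.
  val newCheck = partitionSpecs.exists {
    case s: CoalescedPartitionSpec => s.endReducerIndex - s.startReducerIndex > 1
    case _ => false
  }

  println(s"old: $oldCheck, new: $newCheck") // prints: old: true, new: false
}
```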

### Does this PR introduce _any_ user-facing change?
Yes, the `Arguments` line in the node description is now accurate:
```
(16) CustomShuffleReader
Input [3]: [registration#4, sum#85, count#86L]
Arguments: coalesced
```
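For context, a hedged repro sketch (not from the PR; the query, names, and plan shape are illustrative) that surfaces this node description under AQE:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical example: a small local-mode aggregate that AQE can coalesce.
val spark = SparkSession.builder()
  .master("local[4]")
  .config("spark.sql.adaptive.enabled", "true")
  .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
  .getOrCreate()
import spark.implicits._

val df = spark.range(0, 1000).groupBy(($"id" % 10).as("k")).count()
df.collect() // trigger execution so the adaptive plan is finalized

// With this fix, "coalesced" appears in the CustomShuffleReader arguments
// only when shuffle partitions were actually combined.
println(df.queryExecution.executedPlan)
```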

### How was this patch tested?
Existing tests

Closes #32872 from ekoifman/PRISM-77023-fix-hasCoalescedPartition.

Authored-by: Eugene Koifman <eugene.koifman@workday.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 4033b2a3f4)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Commit 78796349d9 (parent 1686cff9a1), committed 2021-07-14 15:48:02 +08:00.
3 changed files with 30 additions and 10 deletions.

`CustomShuffleReaderExec.scala`:

```diff
@@ -87,8 +87,15 @@ case class CustomShuffleReaderExec private(
     Iterator(desc)
   }
 
-  def hasCoalescedPartition: Boolean =
-    partitionSpecs.exists(_.isInstanceOf[CoalescedPartitionSpec])
+  /**
+   * Returns true iff some non-empty partitions were combined
+   */
+  def hasCoalescedPartition: Boolean = {
+    partitionSpecs.exists {
+      case s: CoalescedPartitionSpec => s.endReducerIndex - s.startReducerIndex > 1
+      case _ => false
+    }
+  }
 
   def hasSkewedPartition: Boolean =
     partitionSpecs.exists(_.isInstanceOf[PartialReducerPartitionSpec])
```
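Note the design choice: a `CoalescedPartitionSpec` spanning a single reducer (`endReducerIndex - startReducerIndex == 1`) maps one-to-one onto an original shuffle partition, so it no longer counts as coalesced.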

`CoalesceShufflePartitionsSuite.scala`:

```diff
@@ -315,7 +315,7 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAll
   test("SPARK-24705 adaptive query execution works correctly when exchange reuse enabled") {
     val test: SparkSession => Unit = { spark: SparkSession =>
       spark.sql("SET spark.sql.exchange.reuse=true")
-      val df = spark.range(1).selectExpr("id AS key", "id AS value")
+      val df = spark.range(0, 6, 1).selectExpr("id AS key", "id AS value")
 
       // test case 1: a query stage has 3 child stages but they are the same stage.
       // Final Stage 1
@@ -323,7 +323,7 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAll
       //   ReusedQueryStage 0
       //   ReusedQueryStage 0
       val resultDf = df.join(df, "key").join(df, "key")
-      QueryTest.checkAnswer(resultDf, Row(0, 0, 0, 0) :: Nil)
+      QueryTest.checkAnswer(resultDf, (0 to 5).map(i => Row(i, i, i, i)))
 
       val finalPlan = resultDf.queryExecution.executedPlan
         .asInstanceOf[AdaptiveSparkPlanExec].executedPlan
       assert(finalPlan.collect {
@@ -344,7 +344,9 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAll
       val grouped = df.groupBy("key").agg(max("value").as("value"))
       val resultDf2 = grouped.groupBy(col("key") + 1).max("value")
         .union(grouped.groupBy(col("key") + 2).max("value"))
-      QueryTest.checkAnswer(resultDf2, Row(1, 0) :: Row(2, 0) :: Nil)
+      QueryTest.checkAnswer(resultDf2, Row(1, 0) :: Row(2, 0) :: Row(2, 1) :: Row(3, 1) ::
+        Row(3, 2) :: Row(4, 2) :: Row(4, 3) :: Row(5, 3) :: Row(5, 4) :: Row(6, 4) :: Row(6, 5) ::
+        Row(7, 5) :: Nil)
 
       val finalPlan2 = resultDf2.queryExecution.executedPlan
         .asInstanceOf[AdaptiveSparkPlanExec].executedPlan
@@ -353,6 +355,17 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAll
       val level1Stages = finalPlan2.collect { case q: QueryStageExec => q }
       assert(level1Stages.length == 2)
 
+      assert(
+        finalPlan2.collect {
+          case r @ CoalescedShuffleReader() => r
+        }.length == 2, "finalPlan2")
+
+      level1Stages.foreach(qs =>
+        assert(qs.plan.collect {
+          case r @ CoalescedShuffleReader() => r
+        }.length == 1,
+          "Wrong CoalescedShuffleReader below " + qs.simpleString(3)))
+
       val leafStages = level1Stages.flatMap { stage =>
         // All of the child stages of result stage have only one child stage.
         val children = stage.plan.collect { case q: QueryStageExec => q }
@@ -368,7 +381,7 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAll
       }
       assert(reusedStages.length == 1)
     }
-    withSparkSession(test, 4, None)
+    withSparkSession(test, 400, None)
   }
 
   test("Do not reduce the number of shuffle partition for repartition") {
```

`AdaptiveQueryExecSuite.scala`:

```diff
@@ -953,7 +953,7 @@ class AdaptiveQueryExecSuite
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
       SQLConf.SHUFFLE_PARTITIONS.key -> "100",
       SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "800",
-      SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "800") {
+      SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "1000") {
       withTempView("skewData1", "skewData2") {
         spark
           .range(0, 1000, 1, 10)
@@ -982,9 +982,9 @@ class AdaptiveQueryExecSuite
           assert(reader.metrics.contains("numSkewedPartitions"))
         }
         assert(readers(0).metrics("numSkewedPartitions").value == 2)
-        assert(readers(0).metrics("numSkewedSplits").value == 15)
+        assert(readers(0).metrics("numSkewedSplits").value == 11)
         assert(readers(1).metrics("numSkewedPartitions").value == 1)
-        assert(readers(1).metrics("numSkewedSplits").value == 12)
+        assert(readers(1).metrics("numSkewedSplits").value == 9)
       }
     }
   }
@@ -1582,7 +1582,7 @@ class AdaptiveQueryExecSuite
     // Skew join can apply as the repartition is not optimized out.
     assert(smjWithNum.head.isSkewJoin)
     val customReadersWithNum = collect(smjWithNum.head) {
-      case c: CustomShuffleReaderExec if c.hasCoalescedPartition => c
+      case c: CustomShuffleReaderExec => c
     }
     assert(customReadersWithNum.nonEmpty)
```