From 0da71548a5dc24d705c025d72cbcd80fe2c18632 Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Wed, 14 Jul 2021 22:04:50 +0800 Subject: [PATCH] [SPARK-35639][SQL][FOLLOWUP] Make hasCoalescedPartition return true if something was actually coalesced ### What changes were proposed in this pull request? Add `CoalescedPartitionSpec(0, 0, _)` check if a `CoalescedPartitionSpec` is coalesced. ### Why are the changes needed? Fix corner case. ### Does this PR introduce _any_ user-facing change? yes, UI may be changed ### How was this patch tested? Add test Closes #33342 from ulysses-you/SPARK-35639-FOLLOW. Authored-by: ulysses-you Signed-off-by: Wenchen Fan (cherry picked from commit 3819641201fedcbf5d6dedd93a066784aca960e6) Signed-off-by: Wenchen Fan --- .../sql/execution/adaptive/CustomShuffleReaderExec.scala | 2 ++ .../spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala | 4 +++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala index 61318a71a0..cea3016695 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala @@ -92,6 +92,8 @@ case class CustomShuffleReaderExec private( */ def hasCoalescedPartition: Boolean = { partitionSpecs.exists { + // shuffle from empty RDD + case CoalescedPartitionSpec(0, 0, _) => true case s: CoalescedPartitionSpec => s.endReducerIndex - s.startReducerIndex > 1 case _ => false } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index 5000abc998..13bba68ff6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -1692,7 +1692,9 @@ class AdaptiveQueryExecSuite val (_, adaptive) = runAdaptiveAndVerifyResult("SELECT c1, count(*) FROM t GROUP BY c1") assert( collect(adaptive) { - case c @ CustomShuffleReaderExec(_, partitionSpecs) if partitionSpecs.length == 1 => c + case c @ CustomShuffleReaderExec(_, partitionSpecs) if partitionSpecs.length == 1 => + assert(c.hasCoalescedPartition) + c }.length == 1 ) }