From ba0a479bdafc2b01a8ed966739b0307dee4871e5 Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Thu, 1 Jul 2021 05:43:11 +0000 Subject: [PATCH] [SPARK-35961][SQL] Only use local shuffle reader when REBALANCE_PARTITIONS_BY_NONE without CustomShuffleReaderExec ### What changes were proposed in this pull request? Remove dead code in `OptimizeLocalShuffleReader`. ### Why are the changes needed? After [SPARK-35725](https://issues.apache.org/jira/browse/SPARK-35725), we might expand a partition if that partition is skewed. So the partition number check `bytesByPartitionId.length == partitionSpecs.size` would be wrong if some partitions are coalesced and some partitions are split into smaller ones. Note that this is unlikely to happen in the real world, since it uses RoundRobin. On the other hand, after [SPARK-34899](https://issues.apache.org/jira/browse/SPARK-34899), we use the original plan if we cannot coalesce partitions. So the assumption that a shuffle stage has a `CustomShuffleReaderExec` with no effect is always false for the `REBALANCE_PARTITIONS_BY_NONE` shuffle origin. That is to say, if no rule took effect, there would be no `CustomShuffleReaderExec`. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Pass CI Closes #33165 from ulysses-you/SPARK-35961. 
Authored-by: ulysses-you Signed-off-by: Wenchen Fan --- .../adaptive/OptimizeLocalShuffleReader.scala | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala index 44fd9fe21f..b17af00053 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala @@ -141,15 +141,8 @@ object OptimizeLocalShuffleReader extends CustomShuffleReaderRule { case s: ShuffleQueryStageExec => s.mapStats.isDefined && supportLocalReader(s.shuffle) case CustomShuffleReaderExec(s: ShuffleQueryStageExec, partitionSpecs) => - s.shuffle.shuffleOrigin match { - case ENSURE_REQUIREMENTS => - s.mapStats.isDefined && partitionSpecs.nonEmpty && supportLocalReader(s.shuffle) - case REBALANCE_PARTITIONS_BY_NONE => - // Use LocalShuffleReader only when we can't CoalesceShufflePartitions - s.mapStats.exists(_.bytesByPartitionId.length == partitionSpecs.size) && - partitionSpecs.nonEmpty && supportLocalReader(s.shuffle) - case _ => false - } + s.mapStats.isDefined && partitionSpecs.nonEmpty && supportLocalReader(s.shuffle) && + s.shuffle.shuffleOrigin == ENSURE_REQUIREMENTS case _ => false }