[SPARK-35961][SQL] Only use local shuffle reader when REBALANCE_PARTITIONS_BY_NONE without CustomShuffleReaderExec

### What changes were proposed in this pull request?

Remove dead code in `OptimizeLocalShuffleReader`.

### Why are the changes needed?

After [SPARK-35725](https://issues.apache.org/jira/browse/SPARK-35725), we might expand partition if that partition is skewed. So the partition number check `bytesByPartitionId.length == partitionSpecs.size` would be wrong if some partitions are coalesced and some partitions are splitted into smaller.
Note that, it's unlikely happened in real world since it used RoundRobin.

Otherhand, after [SPARK-34899](https://issues.apache.org/jira/browse/SPARK-34899), we use origin plan if can not coalesce partitions. So the assuming of that shuffle stage has `CustomShuffleReaderExec` with no effect is always false in `REBALANCE_PARTITIONS_BY_NONE` shuffle origin. That said, if no rule was efficient, there would be no `CustomShuffleReaderExec`.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Pass CI

Closes #33165 from ulysses-you/SPARK-35961.

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This commit is contained in:
ulysses-you 2021-07-01 05:43:11 +00:00 committed by Wenchen Fan
parent 34286ae5bf
commit ba0a479bda

View file

@ -141,15 +141,8 @@ object OptimizeLocalShuffleReader extends CustomShuffleReaderRule {
case s: ShuffleQueryStageExec =>
s.mapStats.isDefined && supportLocalReader(s.shuffle)
case CustomShuffleReaderExec(s: ShuffleQueryStageExec, partitionSpecs) =>
s.shuffle.shuffleOrigin match {
case ENSURE_REQUIREMENTS =>
s.mapStats.isDefined && partitionSpecs.nonEmpty && supportLocalReader(s.shuffle)
case REBALANCE_PARTITIONS_BY_NONE =>
// Use LocalShuffleReader only when we can't CoalesceShufflePartitions
s.mapStats.exists(_.bytesByPartitionId.length == partitionSpecs.size) &&
partitionSpecs.nonEmpty && supportLocalReader(s.shuffle)
case _ => false
}
s.mapStats.isDefined && partitionSpecs.nonEmpty && supportLocalReader(s.shuffle) &&
s.shuffle.shuffleOrigin == ENSURE_REQUIREMENTS
case _ => false
}