[SPARK-36666][SQL] Fix regression in AQEShuffleReadExec

Fix regression in AQEShuffleReadExec when used in conjunction with Spark plugins with custom partitioning.

Signed-off-by: Andy Grove <andygrove73gmail.com>

### What changes were proposed in this pull request?

Return `UnknownPartitioning` rather than throw an exception in `AQEShuffleReadExec`.

### Why are the changes needed?

The [RAPIDS Accelerator for Apache Spark](https://github.com/NVIDIA/spark-rapids) replaces `AQEShuffleReadExec` with a custom operator that runs on the GPU. Due to changes in [SPARK-36315](dd80457ffb), Spark now throws an exception if the shuffle exchange does not have recognized partitioning, and this happens before the postStageOptimizer rules so there is no opportunity to replace this operator now.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

I am still in the process of testing this change. I will update the PR in the next few days with status.

Closes #33910 from andygrove/SPARK-36666.

Authored-by: Andy Grove <andygrove73@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
This commit is contained in:
Andy Grove 2021-09-07 13:49:45 -07:00 committed by Dongjoon Hyun
parent 6745d77818
commit f78d8394dc

View file

@ -22,7 +22,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning, RoundRobinPartitioning, UnknownPartitioning}
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning, RoundRobinPartitioning, SinglePartition, UnknownPartitioning}
import org.apache.spark.sql.catalyst.trees.CurrentOrigin
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.exchange.{ReusedExchangeExec, ShuffleExchangeLike}
@ -82,8 +82,14 @@ case class AQEShuffleReadExec private(
// `RoundRobinPartitioning` but we don't need to retain the number of partitions.
case r: RoundRobinPartitioning =>
r.copy(numPartitions = partitionSpecs.length)
case other => throw new IllegalStateException(
"Unexpected partitioning for coalesced shuffle read: " + other)
case other @ SinglePartition =>
throw new IllegalStateException(
"Unexpected partitioning for coalesced shuffle read: " + other)
case _ =>
// Spark plugins may have custom partitioning and may replace this operator
// during the postStageOptimization phase, so return UnknownPartitioning here
// rather than throw an exception
UnknownPartitioning(partitionSpecs.length)
}
} else {
UnknownPartitioning(partitionSpecs.length)