[SPARK-36217][SQL] Rename CustomShuffleReader and OptimizeLocalShuffleReader in AQE

### What changes were proposed in this pull request?

This PR proposes to rename:

- Rename `*Reader`/`*reader` to `*Read`/`*read` for rules and execution plan (user-facing doc/config name remain untouched)
  - `*ShuffleReaderExec` ->`*ShuffleReadExec`
  - `isLocalReader` -> `isLocalRead`
  - ...
- Rename `CustomShuffle*` prefix to `AQEShuffle*`
- Rename `OptimizeLocalShuffleReader` rule to `OptimizeShuffleWithLocalRead`

### Why are the changes needed?

There are multiple problems in the current naming:

- `CustomShuffle*` -> `AQEShuffle*`
    it sounds like it is a pluggable API. However, this is actually only used by AQE.
- `OptimizeLocalShuffleReader` -> `OptimizeShuffleWithLocalRead`
    it is the name of a rule but it can be misread as a reader, which is counterintuative
- `*ReaderExec` -> `*ReadExec`
    Reader execution reads a bit odd. It should better be read execution (like `ScanExec`, `ProjectExec` and `FilterExec`). I can't find the reason to name it with something that performs an action. See also the generated plans:

    Before:

    ```
    ...
    * HashAggregate (12)
       +- CustomShuffleReader (11)
          +- ShuffleQueryStage (10)
             +- Exchange (9)
    ...
    ```

    After:

    ```
    ...
    * HashAggregate (12)
       +- AQEShuffleRead (11)
          +- ShuffleQueryStage (10)
             +- Exchange (9)
    ..
    ```

### Does this PR introduce _any_ user-facing change?

No, internal refactoring.

### How was this patch tested?

Existing unittests should cover the changes.

Closes #33429 from HyukjinKwon/SPARK-36217.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 6e3d404cec)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This commit is contained in:
Hyukjin Kwon 2021-07-26 22:41:54 +08:00 committed by Wenchen Fan
parent 39d6e87bd9
commit a77c9d6d17
14 changed files with 254 additions and 254 deletions

View file

@ -65,7 +65,7 @@ class CoalescedPartitioner(val parent: Partitioner, val partitionStartIndices: A
}
}
private[spark] class CustomShuffledRDDPartition(
private[spark] class AQEShuffledRDDPartition(
val index: Int, val startIndexInParent: Int, val endIndexInParent: Int)
extends Partition {
@ -78,7 +78,7 @@ private[spark] class CustomShuffledRDDPartition(
* A special ShuffledRDD that supports a ShuffleDependency object from outside and launching reduce
* tasks that read multiple map output partitions.
*/
class CustomShuffledRDD[K, V, C](
class AQEShuffledRDD[K, V, C](
var dependency: ShuffleDependency[K, V, C],
partitionStartIndices: Array[Int])
extends RDD[(K, C)](dependency.rdd.context, Seq(dependency)) {
@ -98,12 +98,12 @@ class CustomShuffledRDD[K, V, C](
Array.tabulate[Partition](partitionStartIndices.length) { i =>
val startIndex = partitionStartIndices(i)
val endIndex = if (i < partitionStartIndices.length - 1) partitionStartIndices(i + 1) else n
new CustomShuffledRDDPartition(i, startIndex, endIndex)
new AQEShuffledRDDPartition(i, startIndex, endIndex)
}
}
override def compute(p: Partition, context: TaskContext): Iterator[(K, C)] = {
val part = p.asInstanceOf[CustomShuffledRDDPartition]
val part = p.asInstanceOf[AQEShuffledRDDPartition]
val metrics = context.taskMetrics().createTempShuffleReadMetrics()
SparkEnv.get.shuffleManager.getReader(
dependency.shuffleHandle, part.startIndexInParent, part.endIndexInParent, context, metrics)

View file

@ -36,7 +36,7 @@ class AdaptiveSchedulingSuite extends SparkFunSuite with LocalSparkContext {
(x, x)
}
val dep = new ShuffleDependency[Int, Int, Int](rdd, new HashPartitioner(2))
val shuffled = new CustomShuffledRDD[Int, Int, Int](dep)
val shuffled = new AQEShuffledRDD[Int, Int, Int](dep)
sc.submitMapStage(dep).get()
assert(AdaptiveSchedulingSuiteState.tasksRun == 3)
assert(shuffled.collect().toSet == Set((1, 1), (2, 2), (3, 3)))
@ -50,7 +50,7 @@ class AdaptiveSchedulingSuite extends SparkFunSuite with LocalSparkContext {
sc = new SparkContext("local", "test")
val rdd = sc.parallelize(0 to 2, 3).map(x => (x, x))
val dep = new ShuffleDependency[Int, Int, Int](rdd, new HashPartitioner(3))
val shuffled = new CustomShuffledRDD[Int, Int, Int](dep, Array(0, 2))
val shuffled = new AQEShuffledRDD[Int, Int, Int](dep, Array(0, 2))
assert(shuffled.partitions.length === 2)
assert(shuffled.glom().map(_.toSet).collect().toSet == Set(Set((0, 0), (1, 1)), Set((2, 2))))
}
@ -60,7 +60,7 @@ class AdaptiveSchedulingSuite extends SparkFunSuite with LocalSparkContext {
val rdd = sc.parallelize(0 to 2, 3).map(x => (x, x))
// Also create lots of hash partitions so that some of them are empty
val dep = new ShuffleDependency[Int, Int, Int](rdd, new HashPartitioner(5))
val shuffled = new CustomShuffledRDD[Int, Int, Int](dep, Array(0))
val shuffled = new AQEShuffledRDD[Int, Int, Int](dep, Array(0))
assert(shuffled.partitions.length === 1)
assert(shuffled.collect().toSet == Set((0, 0), (1, 1), (2, 2)))
}
@ -69,7 +69,7 @@ class AdaptiveSchedulingSuite extends SparkFunSuite with LocalSparkContext {
sc = new SparkContext("local", "test")
val rdd = sc.parallelize(0 to 2, 3).map(x => (x, x))
val dep = new ShuffleDependency[Int, Int, Int](rdd, new HashPartitioner(3))
val shuffled = new CustomShuffledRDD[Int, Int, Int](dep, Array(0, 0, 0, 1, 1, 1, 2))
val shuffled = new AQEShuffledRDD[Int, Int, Int](dep, Array(0, 0, 0, 1, 1, 1, 2))
assert(shuffled.partitions.length === 7)
assert(shuffled.collect().toSet == Set((0, 0), (1, 1), (2, 2)))
}

View file

@ -37,12 +37,12 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
* @param partitionSpecs The partition specs that defines the arrangement, requires at least one
* partition.
*/
case class CustomShuffleReaderExec private(
case class AQEShuffleReadExec private(
child: SparkPlan,
partitionSpecs: Seq[ShufflePartitionSpec]) extends UnaryExecNode {
assert(partitionSpecs.nonEmpty, "CustomShuffleReaderExec requires at least one partition")
assert(partitionSpecs.nonEmpty, s"${getClass.getSimpleName} requires at least one partition")
// If this reader is to read shuffle files locally, then all partition specs should be
// If this is to read shuffle files locally, then all partition specs should be
// `PartialMapperPartitionSpec`.
if (partitionSpecs.exists(_.isInstanceOf[PartialMapperPartitionSpec])) {
assert(partitionSpecs.forall(_.isInstanceOf[PartialMapperPartitionSpec]))
@ -52,7 +52,7 @@ case class CustomShuffleReaderExec private(
override def output: Seq[Attribute] = child.output
override lazy val outputPartitioning: Partitioning = {
// If it is a local shuffle reader with one mapper per task, then the output partitioning is
// If it is a local shuffle read with one mapper per task, then the output partitioning is
// the same as the plan before shuffle.
// TODO this check is based on assumptions of callers' behavior but is sufficient for now.
if (partitionSpecs.forall(_.isInstanceOf[PartialMapperPartitionSpec]) &&
@ -75,7 +75,7 @@ case class CustomShuffleReaderExec private(
}
override def stringArgs: Iterator[Any] = {
val desc = if (isLocalReader) {
val desc = if (isLocalRead) {
"local"
} else if (hasCoalescedPartition && hasSkewedPartition) {
"coalesced and skewed"
@ -104,7 +104,7 @@ case class CustomShuffleReaderExec private(
def hasSkewedPartition: Boolean =
partitionSpecs.exists(_.isInstanceOf[PartialReducerPartitionSpec])
def isLocalReader: Boolean =
def isLocalRead: Boolean =
partitionSpecs.exists(_.isInstanceOf[PartialMapperPartitionSpec]) ||
partitionSpecs.exists(_.isInstanceOf[CoalescedMapperPartitionSpec])
@ -114,7 +114,7 @@ case class CustomShuffleReaderExec private(
}
@transient private lazy val partitionDataSizes: Option[Seq[Long]] = {
if (!isLocalReader && shuffleStage.get.mapStats.isDefined) {
if (!isLocalRead && shuffleStage.get.mapStats.isDefined) {
Some(partitionSpecs.map {
case p: CoalescedPartitionSpec =>
assert(p.dataSize.isDefined)
@ -166,8 +166,8 @@ case class CustomShuffleReaderExec private(
@transient override lazy val metrics: Map[String, SQLMetric] = {
if (shuffleStage.isDefined) {
Map("numPartitions" -> SQLMetrics.createMetric(sparkContext, "number of partitions")) ++ {
if (isLocalReader) {
// We split the mapper partition evenly when creating local shuffle reader, so no
if (isLocalRead) {
// We split the mapper partition evenly when creating local shuffle read, so no
// data size info is available.
Map.empty
} else {
@ -208,6 +208,6 @@ case class CustomShuffleReaderExec private(
shuffleRDD.asInstanceOf[RDD[ColumnarBatch]]
}
override protected def withNewChildInternal(newChild: SparkPlan): CustomShuffleReaderExec =
override protected def withNewChildInternal(newChild: SparkPlan): AQEShuffleReadExec =
copy(child = newChild)
}

View file

@ -22,9 +22,9 @@ import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.exchange.ShuffleOrigin
/**
* Adaptive Query Execution rule that may create [[CustomShuffleReaderExec]] on top of query stages.
* Adaptive Query Execution rule that may create [[AQEShuffleReadExec]] on top of query stages.
*/
trait CustomShuffleReaderRule extends Rule[SparkPlan] {
trait AQEShuffleReadRule extends Rule[SparkPlan] {
/**
* Returns the list of [[ShuffleOrigin]]s supported by this rule.

View file

@ -97,13 +97,13 @@ case class AdaptiveSparkPlanExec(
@transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
PlanAdaptiveDynamicPruningFilters(this),
ReuseAdaptiveSubquery(context.subqueryCache),
// Skew join does not handle `CustomShuffleReader` so needs to be applied first.
// Skew join does not handle `AQEShuffleRead` so needs to be applied first.
OptimizeSkewedJoin,
OptimizeSkewInRebalancePartitions,
CoalesceShufflePartitions(context.session),
// `OptimizeLocalShuffleReader` needs to make use of 'CustomShuffleReaderExec.partitionSpecs'
// `OptimizeShuffleWithLocalRead` needs to make use of 'AQEShuffleReadExec.partitionSpecs'
// added by `CoalesceShufflePartitions`, and must be executed after it.
OptimizeLocalShuffleReader
OptimizeShuffleWithLocalRead
)
// A list of physical optimizer rules to be applied right after a new stage is created. The input
@ -116,7 +116,7 @@ case class AdaptiveSparkPlanExec(
// The partitioning of the query output depends on the shuffle(s) in the final stage. If the
// original plan contains a repartition operator, we need to preserve the specified partitioning,
// whether or not the repartition-introduced shuffle is optimized out because of an underlying
// shuffle of the same partitioning. Thus, we need to exclude some `CustomShuffleReaderRule`s
// shuffle of the same partitioning. Thus, we need to exclude some `AQEShuffleReadRule`s
// from the final stage, depending on the presence and properties of repartition operators.
private def finalStageOptimizerRules: Seq[Rule[SparkPlan]] = {
val origins = inputPlan.collect {
@ -124,7 +124,7 @@ case class AdaptiveSparkPlanExec(
}
val allRules = queryStageOptimizerRules ++ postStageCreationRules
allRules.filter {
case c: CustomShuffleReaderRule =>
case c: AQEShuffleReadRule =>
origins.forall(c.supportedShuffleOrigins.contains)
case _ => true
}
@ -134,7 +134,7 @@ case class AdaptiveSparkPlanExec(
val optimized = rules.foldLeft(plan) { case (latestPlan, rule) =>
val applied = rule.apply(latestPlan)
val result = rule match {
case c: CustomShuffleReaderRule if c.mayAddExtraShuffles =>
case c: AQEShuffleReadRule if c.mayAddExtraShuffles =>
if (ValidateRequirements.validate(applied)) {
applied
} else {

View file

@ -27,7 +27,7 @@ import org.apache.spark.sql.internal.SQLConf
* A rule to coalesce the shuffle partitions based on the map output statistics, which can
* avoid many small reduce tasks that hurt performance.
*/
case class CoalesceShufflePartitions(session: SparkSession) extends CustomShuffleReaderRule {
case class CoalesceShufflePartitions(session: SparkSession) extends AQEShuffleReadRule {
override val supportedShuffleOrigins: Seq[ShuffleOrigin] =
Seq(ENSURE_REQUIREMENTS, REPARTITION_BY_COL, REBALANCE_PARTITIONS_BY_NONE,
@ -88,23 +88,23 @@ case class CoalesceShufflePartitions(session: SparkSession) extends CustomShuffl
val specsMap = shuffleStageInfos.zip(newPartitionSpecs).map { case (stageInfo, partSpecs) =>
(stageInfo.shuffleStage.id, partSpecs)
}.toMap
updateShuffleReaders(plan, specsMap)
updateShuffleReads(plan, specsMap)
} else {
plan
}
}
}
private def updateShuffleReaders(
private def updateShuffleReads(
plan: SparkPlan, specsMap: Map[Int, Seq[ShufflePartitionSpec]]): SparkPlan = plan match {
// Even for shuffle exchange whose input RDD has 0 partition, we should still update its
// `partitionStartIndices`, so that all the leaf shuffles in a stage have the same
// number of output partitions.
case ShuffleStageInfo(stage, _) =>
specsMap.get(stage.id).map { specs =>
CustomShuffleReaderExec(stage, specs)
AQEShuffleReadExec(stage, specs)
}.getOrElse(plan)
case other => other.mapChildren(updateShuffleReaders(_, specsMap))
case other => other.mapChildren(updateShuffleReads(_, specsMap))
}
private def supportCoalesce(s: ShuffleExchangeLike): Boolean = {
@ -121,7 +121,7 @@ private object ShuffleStageInfo {
: Option[(ShuffleQueryStageExec, Option[Seq[ShufflePartitionSpec]])] = plan match {
case stage: ShuffleQueryStageExec =>
Some((stage, None))
case CustomShuffleReaderExec(s: ShuffleQueryStageExec, partitionSpecs) =>
case AQEShuffleReadExec(s: ShuffleQueryStageExec, partitionSpecs) =>
Some((s, Some(partitionSpecs)))
case _ => None
}

View file

@ -25,40 +25,40 @@ import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
import org.apache.spark.sql.internal.SQLConf
/**
* A rule to optimize the shuffle reader to local reader iff no additional shuffles
* A rule to optimize the shuffle read to local read iff no additional shuffles
* will be introduced:
* 1. if the input plan is a shuffle, add local reader directly as we can never introduce
* 1. if the input plan is a shuffle, add local read directly as we can never introduce
* extra shuffles in this case.
* 2. otherwise, add local reader to the probe side of broadcast hash join and
* 2. otherwise, add local read to the probe side of broadcast hash join and
* then run `EnsureRequirements` to check whether additional shuffle introduced.
* If introduced, we will revert all the local readers.
* If introduced, we will revert all the local reads.
*/
object OptimizeLocalShuffleReader extends CustomShuffleReaderRule {
object OptimizeShuffleWithLocalRead extends AQEShuffleReadRule {
override val supportedShuffleOrigins: Seq[ShuffleOrigin] =
Seq(ENSURE_REQUIREMENTS, REBALANCE_PARTITIONS_BY_NONE)
override def mayAddExtraShuffles: Boolean = true
// The build side is a broadcast query stage which should have been optimized using local reader
// The build side is a broadcast query stage which should have been optimized using local read
// already. So we only need to deal with probe side here.
private def createProbeSideLocalReader(plan: SparkPlan): SparkPlan = {
private def createProbeSideLocalRead(plan: SparkPlan): SparkPlan = {
plan.transformDown {
case join @ BroadcastJoinWithShuffleLeft(shuffleStage, BuildRight) =>
val localReader = createLocalReader(shuffleStage)
join.asInstanceOf[BroadcastHashJoinExec].copy(left = localReader)
val localRead = createLocalRead(shuffleStage)
join.asInstanceOf[BroadcastHashJoinExec].copy(left = localRead)
case join @ BroadcastJoinWithShuffleRight(shuffleStage, BuildLeft) =>
val localReader = createLocalReader(shuffleStage)
join.asInstanceOf[BroadcastHashJoinExec].copy(right = localReader)
val localRead = createLocalRead(shuffleStage)
join.asInstanceOf[BroadcastHashJoinExec].copy(right = localRead)
}
}
private def createLocalReader(plan: SparkPlan): CustomShuffleReaderExec = {
private def createLocalRead(plan: SparkPlan): AQEShuffleReadExec = {
plan match {
case c @ CustomShuffleReaderExec(s: ShuffleQueryStageExec, _) =>
CustomShuffleReaderExec(s, getPartitionSpecs(s, Some(c.partitionSpecs.length)))
case c @ AQEShuffleReadExec(s: ShuffleQueryStageExec, _) =>
AQEShuffleReadExec(s, getPartitionSpecs(s, Some(c.partitionSpecs.length)))
case s: ShuffleQueryStageExec =>
CustomShuffleReaderExec(s, getPartitionSpecs(s, None))
AQEShuffleReadExec(s, getPartitionSpecs(s, None))
}
}
@ -111,16 +111,16 @@ object OptimizeLocalShuffleReader extends CustomShuffleReaderRule {
}
plan match {
case s: SparkPlan if canUseLocalShuffleReader(s) =>
createLocalReader(s)
case s: SparkPlan if canUseLocalShuffleRead(s) =>
createLocalRead(s)
case s: SparkPlan =>
createProbeSideLocalReader(s)
createProbeSideLocalRead(s)
}
}
object BroadcastJoinWithShuffleLeft {
def unapply(plan: SparkPlan): Option[(SparkPlan, BuildSide)] = plan match {
case join: BroadcastHashJoinExec if canUseLocalShuffleReader(join.left) =>
case join: BroadcastHashJoinExec if canUseLocalShuffleRead(join.left) =>
Some((join.left, join.buildSide))
case _ => None
}
@ -128,22 +128,22 @@ object OptimizeLocalShuffleReader extends CustomShuffleReaderRule {
object BroadcastJoinWithShuffleRight {
def unapply(plan: SparkPlan): Option[(SparkPlan, BuildSide)] = plan match {
case join: BroadcastHashJoinExec if canUseLocalShuffleReader(join.right) =>
case join: BroadcastHashJoinExec if canUseLocalShuffleRead(join.right) =>
Some((join.right, join.buildSide))
case _ => None
}
}
def canUseLocalShuffleReader(plan: SparkPlan): Boolean = plan match {
def canUseLocalShuffleRead(plan: SparkPlan): Boolean = plan match {
case s: ShuffleQueryStageExec =>
s.mapStats.isDefined && supportLocalReader(s.shuffle)
case CustomShuffleReaderExec(s: ShuffleQueryStageExec, _) =>
s.mapStats.isDefined && supportLocalReader(s.shuffle) &&
s.mapStats.isDefined && supportLocalRead(s.shuffle)
case AQEShuffleReadExec(s: ShuffleQueryStageExec, _) =>
s.mapStats.isDefined && supportLocalRead(s.shuffle) &&
s.shuffle.shuffleOrigin == ENSURE_REQUIREMENTS
case _ => false
}
private def supportLocalReader(s: ShuffleExchangeLike): Boolean = {
private def supportLocalRead(s: ShuffleExchangeLike): Boolean = {
s.outputPartitioning != SinglePartition && supportedShuffleOrigins.contains(s.shuffleOrigin)
}
}

View file

@ -37,7 +37,7 @@ import org.apache.spark.sql.internal.SQLConf
* Note that, this rule is only applied with the SparkPlan whose top-level node is
* ShuffleQueryStageExec.
*/
object OptimizeSkewInRebalancePartitions extends CustomShuffleReaderRule {
object OptimizeSkewInRebalancePartitions extends AQEShuffleReadRule {
override def supportedShuffleOrigins: Seq[ShuffleOrigin] =
Seq(REBALANCE_PARTITIONS_BY_NONE, REBALANCE_PARTITIONS_BY_COL)
@ -82,7 +82,7 @@ object OptimizeSkewInRebalancePartitions extends CustomShuffleReaderRule {
if (newPartitionsSpec.length == mapStats.get.bytesByPartitionId.length) {
shuffle
} else {
CustomShuffleReaderExec(shuffle, newPartitionsSpec)
AQEShuffleReadExec(shuffle, newPartitionsSpec)
}
}

View file

@ -48,7 +48,7 @@ import org.apache.spark.sql.internal.SQLConf
* (L3, R3-1), (L3, R3-2),
* (L4-1, R4-1), (L4-2, R4-1), (L4-1, R4-2), (L4-2, R4-2)
*/
object OptimizeSkewedJoin extends CustomShuffleReaderRule {
object OptimizeSkewedJoin extends AQEShuffleReadRule {
override val supportedShuffleOrigins: Seq[ShuffleOrigin] = Seq(ENSURE_REQUIREMENTS)
@ -110,9 +110,9 @@ object OptimizeSkewedJoin extends CustomShuffleReaderRule {
* 2. Assuming partition0 is skewed in left side, and it has 5 mappers (Map0, Map1...Map4).
* And we may split the 5 Mappers into 3 mapper ranges [(Map0, Map1), (Map2, Map3), (Map4)]
* based on the map size and the max split number.
* 3. Wrap the join left child with a special shuffle reader that reads each mapper range with one
* 3. Wrap the join left child with a special shuffle read that loads each mapper range with one
* task, so total 3 tasks.
* 4. Wrap the join right child with a special shuffle reader that reads partition0 3 times by
* 4. Wrap the join right child with a special shuffle read that loads partition0 3 times by
* 3 tasks separately.
*/
private def tryOptimizeJoinChildren(
@ -196,8 +196,8 @@ object OptimizeSkewedJoin extends CustomShuffleReaderRule {
}
logDebug(s"number of skewed partitions: left $numSkewedLeft, right $numSkewedRight")
if (numSkewedLeft > 0 || numSkewedRight > 0) {
Some((CustomShuffleReaderExec(left, leftSidePartitions.toSeq),
CustomShuffleReaderExec(right, rightSidePartitions.toSeq)))
Some((AQEShuffleReadExec(left, leftSidePartitions.toSeq),
AQEShuffleReadExec(right, rightSidePartitions.toSeq)))
} else {
None
}

View file

@ -86,7 +86,7 @@ object ShufflePartitionsUtil extends Logging {
// we should skip it when calculating the `partitionStartIndices`.
val validMetrics = mapOutputStatistics.flatten
val numShuffles = mapOutputStatistics.length
// If all input RDDs have 0 partition, we create an empty partition for every shuffle reader.
// If all input RDDs have 0 partition, we create an empty partition for every shuffle read.
if (validMetrics.isEmpty) {
return Seq.fill(numShuffles)(Seq(CoalescedPartitionSpec(0, 0, 0)))
}

View file

@ -99,13 +99,13 @@ case object REPARTITION_BY_NUM extends ShuffleOrigin
// Indicates that the shuffle operator was added by the user-specified rebalance operator.
// Spark will try to rebalance partitions that make per-partition size not too small and not
// too big. Local shuffle reader will be used if possible to reduce network traffic.
// too big. Local shuffle read will be used if possible to reduce network traffic.
case object REBALANCE_PARTITIONS_BY_NONE extends ShuffleOrigin
// Indicates that the shuffle operator was added by the user-specified rebalance operator with
// columns. Spark will try to rebalance partitions that make per-partition size not too small and
// not too big.
// Different from `REBALANCE_PARTITIONS_BY_NONE`, local shuffle reader cannot be used for it as
// Different from `REBALANCE_PARTITIONS_BY_NONE`, local shuffle read cannot be used for it as
// the output needs to be partitioned by the given columns.
case object REBALANCE_PARTITIONS_BY_COL extends ShuffleOrigin

View file

@ -536,7 +536,7 @@ class ExplainSuiteAE extends ExplainSuiteHelper with EnableAdaptiveExecutionSuit
// AdaptiveSparkPlan (21)
// +- == Final Plan ==
// * HashAggregate (12)
// +- CustomShuffleReader (11)
// +- AQEShuffleRead (11)
// +- ShuffleQueryStage (10)
// +- Exchange (9)
// +- * HashAggregate (8)
@ -570,7 +570,7 @@ class ExplainSuiteAE extends ExplainSuiteHelper with EnableAdaptiveExecutionSuit
|Output [5]: [k#x, count#xL, sum#xL, sum#x, count#xL]
|Arguments: 1""".stripMargin,
"""
|(11) CustomShuffleReader
|(11) AQEShuffleRead
|Input [5]: [k#x, count#xL, sum#xL, sum#x, count#xL]
|""".stripMargin,
"""

View file

@ -24,7 +24,7 @@ import org.apache.spark.internal.config.IO_ENCRYPTION_ENABLED
import org.apache.spark.internal.config.UI.UI_ENABLED
import org.apache.spark.sql._
import org.apache.spark.sql.execution.adaptive._
import org.apache.spark.sql.execution.adaptive.CustomShuffleReaderExec
import org.apache.spark.sql.execution.adaptive.AQEShuffleReadExec
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
@ -110,18 +110,18 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAl
// by the ExchangeCoordinator.
val finalPlan = agg.queryExecution.executedPlan
.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
val shuffleReaders = finalPlan.collect {
case r @ CoalescedShuffleReader() => r
val shuffleReads = finalPlan.collect {
case r @ CoalescedShuffleRead() => r
}
minNumPostShufflePartitions match {
case Some(numPartitions) =>
assert(shuffleReaders.isEmpty)
assert(shuffleReads.isEmpty)
case None =>
assert(shuffleReaders.length === 1)
shuffleReaders.foreach { reader =>
assert(reader.outputPartitioning.numPartitions === 3)
assert(shuffleReads.length === 1)
shuffleReads.foreach { read =>
assert(read.outputPartitioning.numPartitions === 3)
}
}
}
@ -156,18 +156,18 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAl
// by the ExchangeCoordinator.
val finalPlan = join.queryExecution.executedPlan
.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
val shuffleReaders = finalPlan.collect {
case r @ CoalescedShuffleReader() => r
val shuffleReads = finalPlan.collect {
case r @ CoalescedShuffleRead() => r
}
minNumPostShufflePartitions match {
case Some(numPartitions) =>
assert(shuffleReaders.isEmpty)
assert(shuffleReads.isEmpty)
case None =>
assert(shuffleReaders.length === 2)
shuffleReaders.foreach { reader =>
assert(reader.outputPartitioning.numPartitions === 2)
assert(shuffleReads.length === 2)
shuffleReads.foreach { read =>
assert(read.outputPartitioning.numPartitions === 2)
}
}
}
@ -207,18 +207,18 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAl
// by the ExchangeCoordinator.
val finalPlan = join.queryExecution.executedPlan
.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
val shuffleReaders = finalPlan.collect {
case r @ CoalescedShuffleReader() => r
val shuffleReads = finalPlan.collect {
case r @ CoalescedShuffleRead() => r
}
minNumPostShufflePartitions match {
case Some(numPartitions) =>
assert(shuffleReaders.isEmpty)
assert(shuffleReads.isEmpty)
case None =>
assert(shuffleReaders.length === 2)
shuffleReaders.foreach { reader =>
assert(reader.outputPartitioning.numPartitions === 2)
assert(shuffleReads.length === 2)
shuffleReads.foreach { read =>
assert(read.outputPartitioning.numPartitions === 2)
}
}
}
@ -258,18 +258,18 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAl
// by the ExchangeCoordinator.
val finalPlan = join.queryExecution.executedPlan
.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
val shuffleReaders = finalPlan.collect {
case r @ CoalescedShuffleReader() => r
val shuffleReads = finalPlan.collect {
case r @ CoalescedShuffleRead() => r
}
minNumPostShufflePartitions match {
case Some(numPartitions) =>
assert(shuffleReaders.isEmpty)
assert(shuffleReads.isEmpty)
case None =>
assert(shuffleReaders.length === 2)
shuffleReaders.foreach { reader =>
assert(reader.outputPartitioning.numPartitions === 3)
assert(shuffleReads.length === 2)
shuffleReads.foreach { read =>
assert(read.outputPartitioning.numPartitions === 3)
}
}
}
@ -300,10 +300,10 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAl
// Then, let's make sure we do not reduce number of post shuffle partitions.
val finalPlan = join.queryExecution.executedPlan
.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
val shuffleReaders = finalPlan.collect {
case r @ CoalescedShuffleReader() => r
val shuffleReads = finalPlan.collect {
case r @ CoalescedShuffleRead() => r
}
assert(shuffleReaders.length === 0)
assert(shuffleReads.length === 0)
} finally {
spark.sql("drop table t")
}
@ -331,7 +331,7 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAl
}.length == 2)
assert(
finalPlan.collect {
case r @ CoalescedShuffleReader() => r
case r @ CoalescedShuffleRead() => r
}.length == 3)
@ -357,14 +357,14 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAl
assert(
finalPlan2.collect {
case r @ CoalescedShuffleReader() => r
case r @ CoalescedShuffleRead() => r
}.length == 2, "finalPlan2")
level1Stages.foreach(qs =>
assert(qs.plan.collect {
case r @ CoalescedShuffleReader() => r
case r @ CoalescedShuffleRead() => r
}.length == 1,
"Wrong CoalescedShuffleReader below " + qs.simpleString(3)))
"Wrong CoalescedShuffleRead below " + qs.simpleString(3)))
val leafStages = level1Stages.flatMap { stage =>
// All of the child stages of result stage have only one child stage.
@ -395,7 +395,7 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAl
.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
assert(
finalPlan.collect {
case r @ CoalescedShuffleReader() => r
case r @ CoalescedShuffleRead() => r
}.isEmpty)
}
withSparkSession(test, 200, None)
@ -416,7 +416,7 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAl
// the shuffle partition numbers.
assert(
finalPlan.collect {
case r @ CoalescedShuffleReader() => r
case r @ CoalescedShuffleRead() => r
}.isEmpty)
}
withSparkSession(test, 100, None)
@ -432,7 +432,7 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAl
.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
assert(
finalPlan.collect {
case r @ CoalescedShuffleReader() => r
case r @ CoalescedShuffleRead() => r
}.isDefinedAt(0))
}
Seq(true, false).foreach { enableIOEncryption =>
@ -442,8 +442,8 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAl
}
}
object CoalescedShuffleReader {
def unapply(reader: CustomShuffleReaderExec): Boolean = {
!reader.isLocalReader && !reader.hasSkewedPartition && reader.hasCoalescedPartition
object CoalescedShuffleRead {
def unapply(read: AQEShuffleReadExec): Boolean = {
!read.isLocalRead && !read.hasSkewedPartition && read.hasCoalescedPartition
}
}

View file

@ -139,21 +139,21 @@ class AdaptiveQueryExecSuite
}
}
private def checkNumLocalShuffleReaders(
plan: SparkPlan, numShufflesWithoutLocalReader: Int = 0): Unit = {
private def checkNumLocalShuffleReads(
plan: SparkPlan, numShufflesWithoutLocalRead: Int = 0): Unit = {
val numShuffles = collect(plan) {
case s: ShuffleQueryStageExec => s
}.length
val numLocalReaders = collect(plan) {
case reader: CustomShuffleReaderExec if reader.isLocalReader => reader
val numLocalReads = collect(plan) {
case read: AQEShuffleReadExec if read.isLocalRead => read
}
numLocalReaders.foreach { r =>
numLocalReads.foreach { r =>
val rdd = r.execute()
val parts = rdd.partitions
assert(parts.forall(rdd.preferredLocations(_).nonEmpty))
}
assert(numShuffles === (numLocalReaders.length + numShufflesWithoutLocalReader))
assert(numShuffles === (numLocalReads.length + numShufflesWithoutLocalRead))
}
private def checkInitialPartitionNum(df: Dataset[_], numPartition: Int): Unit = {
@ -177,11 +177,11 @@ class AdaptiveQueryExecSuite
assert(smj.size == 1)
val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)
assert(bhj.size == 1)
checkNumLocalShuffleReaders(adaptivePlan)
checkNumLocalShuffleReads(adaptivePlan)
}
}
test("Reuse the parallelism of CoalescedShuffleReaderExec in LocalShuffleReaderExec") {
test("Reuse the parallelism of coalesced shuffle in local shuffle read") {
withSQLConf(
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80",
@ -192,12 +192,12 @@ class AdaptiveQueryExecSuite
assert(smj.size == 1)
val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)
assert(bhj.size == 1)
val localReaders = collect(adaptivePlan) {
case reader: CustomShuffleReaderExec if reader.isLocalReader => reader
val localReads = collect(adaptivePlan) {
case read: AQEShuffleReadExec if read.isLocalRead => read
}
assert(localReaders.length == 2)
val localShuffleRDD0 = localReaders(0).execute().asInstanceOf[ShuffledRowRDD]
val localShuffleRDD1 = localReaders(1).execute().asInstanceOf[ShuffledRowRDD]
assert(localReads.length == 2)
val localShuffleRDD0 = localReads(0).execute().asInstanceOf[ShuffledRowRDD]
val localShuffleRDD1 = localReads(1).execute().asInstanceOf[ShuffledRowRDD]
// The pre-shuffle partition size is [0, 0, 0, 72, 0]
// We exclude the 0-size partitions, so only one partition, advisoryParallelism = 1
// the final parallelism is
@ -213,7 +213,7 @@ class AdaptiveQueryExecSuite
}
}
test("Reuse the default parallelism in LocalShuffleReaderExec") {
test("Reuse the default parallelism in local shuffle read") {
withSQLConf(
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80",
@ -224,12 +224,12 @@ class AdaptiveQueryExecSuite
assert(smj.size == 1)
val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)
assert(bhj.size == 1)
val localReaders = collect(adaptivePlan) {
case reader: CustomShuffleReaderExec if reader.isLocalReader => reader
val localReads = collect(adaptivePlan) {
case read: AQEShuffleReadExec if read.isLocalRead => read
}
assert(localReaders.length == 2)
val localShuffleRDD0 = localReaders(0).execute().asInstanceOf[ShuffledRowRDD]
val localShuffleRDD1 = localReaders(1).execute().asInstanceOf[ShuffledRowRDD]
assert(localReads.length == 2)
val localShuffleRDD0 = localReads(0).execute().asInstanceOf[ShuffledRowRDD]
val localShuffleRDD1 = localReads(1).execute().asInstanceOf[ShuffledRowRDD]
// the final parallelism is math.max(1, numReduces / numMappers): math.max(1, 5/2) = 2
// and the partitions length is 2 * numMappers = 4
assert(localShuffleRDD0.getPartitions.length == 4)
@ -252,11 +252,11 @@ class AdaptiveQueryExecSuite
checkAnswer(testDf, Seq())
val plan = testDf.queryExecution.executedPlan
assert(find(plan)(_.isInstanceOf[SortMergeJoinExec]).isDefined)
val coalescedReaders = collect(plan) {
case r: CustomShuffleReaderExec => r
val coalescedReads = collect(plan) {
case r: AQEShuffleReadExec => r
}
assert(coalescedReaders.length == 3)
coalescedReaders.foreach(r => assert(r.partitionSpecs.length == 1))
assert(coalescedReads.length == 3)
coalescedReads.foreach(r => assert(r.partitionSpecs.length == 1))
}
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1") {
@ -265,11 +265,11 @@ class AdaptiveQueryExecSuite
checkAnswer(testDf, Seq())
val plan = testDf.queryExecution.executedPlan
assert(find(plan)(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
val coalescedReaders = collect(plan) {
case r: CustomShuffleReaderExec => r
val coalescedReads = collect(plan) {
case r: AQEShuffleReadExec => r
}
assert(coalescedReaders.length == 3, s"$plan")
coalescedReaders.foreach(r => assert(r.isLocalReader || r.partitionSpecs.length == 1))
assert(coalescedReads.length == 3, s"$plan")
coalescedReads.foreach(r => assert(r.isLocalRead || r.partitionSpecs.length == 1))
}
}
}
@ -285,7 +285,7 @@ class AdaptiveQueryExecSuite
assert(smj.size == 1)
val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)
assert(bhj.size == 1)
checkNumLocalShuffleReaders(adaptivePlan)
checkNumLocalShuffleReads(adaptivePlan)
}
}
@ -301,7 +301,7 @@ class AdaptiveQueryExecSuite
val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)
assert(bhj.size == 1)
checkNumLocalShuffleReaders(adaptivePlan)
checkNumLocalShuffleReads(adaptivePlan)
}
}
@ -342,11 +342,11 @@ class AdaptiveQueryExecSuite
// +-LocalShuffleReader*
// +- ShuffleExchange
// After applied the 'OptimizeLocalShuffleReader' rule, we can convert all the four
// shuffle reader to local shuffle reader in the bottom two 'BroadcastHashJoin'.
// After applied the 'OptimizeShuffleWithLocalRead' rule, we can convert all the four
// shuffle read to local shuffle read in the bottom two 'BroadcastHashJoin'.
// For the top level 'BroadcastHashJoin', the probe side is not shuffle query stage
// and the build side shuffle query stage is also converted to local shuffle reader.
checkNumLocalShuffleReaders(adaptivePlan)
// and the build side shuffle query stage is also converted to local shuffle read.
checkNumLocalShuffleReads(adaptivePlan)
}
}
@ -390,8 +390,8 @@ class AdaptiveQueryExecSuite
// +- CoalescedShuffleReader
// +- ShuffleExchange
// The shuffle added by Aggregate can't apply local reader.
checkNumLocalShuffleReaders(adaptivePlan, 1)
// The shuffle added by Aggregate can't apply local read.
checkNumLocalShuffleReads(adaptivePlan, 1)
}
}
@ -436,8 +436,8 @@ class AdaptiveQueryExecSuite
// +-LocalShuffleReader*
// +- ShuffleExchange
// The shuffle added by Aggregate can't apply local reader.
checkNumLocalShuffleReaders(adaptivePlan, 1)
// The shuffle added by Aggregate can't apply local read.
checkNumLocalShuffleReads(adaptivePlan, 1)
}
}
@ -452,9 +452,9 @@ class AdaptiveQueryExecSuite
assert(smj.size == 3)
val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)
assert(bhj.size == 2)
// There is still a SMJ, and its two shuffles can't apply local reader.
checkNumLocalShuffleReaders(adaptivePlan, 2)
// Even with local shuffle reader, the query stage reuse can also work.
// There is still a SMJ, and its two shuffles can't apply local read.
checkNumLocalShuffleReads(adaptivePlan, 2)
// Even with local shuffle read, the query stage reuse can also work.
val ex = findReusedExchange(adaptivePlan)
assert(ex.size == 1)
}
@ -471,8 +471,8 @@ class AdaptiveQueryExecSuite
assert(smj.size == 1)
val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)
assert(bhj.size == 1)
checkNumLocalShuffleReaders(adaptivePlan)
// Even with local shuffle reader, the query stage reuse can also work.
checkNumLocalShuffleReads(adaptivePlan)
// Even with local shuffle read, the query stage reuse can also work.
val ex = findReusedExchange(adaptivePlan)
assert(ex.size == 1)
}
@ -491,8 +491,8 @@ class AdaptiveQueryExecSuite
assert(smj.size == 1)
val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)
assert(bhj.size == 1)
checkNumLocalShuffleReaders(adaptivePlan)
// Even with local shuffle reader, the query stage reuse can also work.
checkNumLocalShuffleReads(adaptivePlan)
// Even with local shuffle read, the query stage reuse can also work.
val ex = findReusedExchange(adaptivePlan)
assert(ex.nonEmpty)
val sub = findReusedSubquery(adaptivePlan)
@ -512,8 +512,8 @@ class AdaptiveQueryExecSuite
assert(smj.size == 1)
val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)
assert(bhj.size == 1)
checkNumLocalShuffleReaders(adaptivePlan)
// Even with local shuffle reader, the query stage reuse can also work.
checkNumLocalShuffleReads(adaptivePlan)
// Even with local shuffle read, the query stage reuse can also work.
val ex = findReusedExchange(adaptivePlan)
assert(ex.isEmpty)
val sub = findReusedSubquery(adaptivePlan)
@ -536,8 +536,8 @@ class AdaptiveQueryExecSuite
assert(smj.size == 1)
val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)
assert(bhj.size == 1)
checkNumLocalShuffleReaders(adaptivePlan)
// Even with local shuffle reader, the query stage reuse can also work.
checkNumLocalShuffleReads(adaptivePlan)
// Even with local shuffle read, the query stage reuse can also work.
val ex = findReusedExchange(adaptivePlan)
assert(ex.nonEmpty)
assert(ex.head.child.isInstanceOf[BroadcastExchangeExec])
@ -599,7 +599,7 @@ class AdaptiveQueryExecSuite
}
}
test("Change merge join to broadcast join without local shuffle reader") {
test("Change merge join to broadcast join without local shuffle read") {
withSQLConf(
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
SQLConf.LOCAL_SHUFFLE_READER_ENABLED.key -> "true",
@ -615,8 +615,8 @@ class AdaptiveQueryExecSuite
assert(smj.size == 2)
val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)
assert(bhj.size == 1)
// There is still a SMJ, and its two shuffles can't apply local reader.
checkNumLocalShuffleReaders(adaptivePlan, 2)
// There is still a SMJ, and its two shuffles can't apply local read.
checkNumLocalShuffleReads(adaptivePlan, 2)
}
}
@ -734,12 +734,12 @@ class AdaptiveQueryExecSuite
rightSkewNum: Int): Unit = {
assert(joins.size == 1 && joins.head.isSkewJoin)
assert(joins.head.left.collect {
case r: CustomShuffleReaderExec => r
case r: AQEShuffleReadExec => r
}.head.partitionSpecs.collect {
case p: PartialReducerPartitionSpec => p.reducerIndex
}.distinct.length == leftSkewNum)
assert(joins.head.right.collect {
case r: CustomShuffleReaderExec => r
case r: AQEShuffleReadExec => r
}.head.partitionSpecs.collect {
case p: PartialReducerPartitionSpec => p.reducerIndex
}.distinct.length == rightSkewNum)
@ -895,16 +895,16 @@ class AdaptiveQueryExecSuite
}
}
test("SPARK-34682: CustomShuffleReaderExec operating on canonicalized plan") {
test("SPARK-34682: AQEShuffleReadExec operating on canonicalized plan") {
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
val (_, adaptivePlan) = runAdaptiveAndVerifyResult(
"SELECT key FROM testData GROUP BY key")
val readers = collect(adaptivePlan) {
case r: CustomShuffleReaderExec => r
val reads = collect(adaptivePlan) {
case r: AQEShuffleReadExec => r
}
assert(readers.length == 1)
val reader = readers.head
val c = reader.canonicalized.asInstanceOf[CustomShuffleReaderExec]
assert(reads.length == 1)
val read = reads.head
val c = read.canonicalized.asInstanceOf[AQEShuffleReadExec]
// we can't just call execute() because that has separate checks for canonicalized plans
val ex = intercept[IllegalStateException] {
val doExecute = PrivateMethod[Unit](Symbol("doExecute"))
@ -914,22 +914,22 @@ class AdaptiveQueryExecSuite
}
}
test("metrics of the shuffle reader") {
test("metrics of the shuffle read") {
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
val (_, adaptivePlan) = runAdaptiveAndVerifyResult(
"SELECT key FROM testData GROUP BY key")
val readers = collect(adaptivePlan) {
case r: CustomShuffleReaderExec => r
val reads = collect(adaptivePlan) {
case r: AQEShuffleReadExec => r
}
assert(readers.length == 1)
val reader = readers.head
assert(!reader.isLocalReader)
assert(!reader.hasSkewedPartition)
assert(reader.hasCoalescedPartition)
assert(reader.metrics.keys.toSeq.sorted == Seq(
assert(reads.length == 1)
val read = reads.head
assert(!read.isLocalRead)
assert(!read.hasSkewedPartition)
assert(read.hasCoalescedPartition)
assert(read.metrics.keys.toSeq.sorted == Seq(
"numPartitions", "partitionDataSize"))
assert(reader.metrics("numPartitions").value == reader.partitionSpecs.length)
assert(reader.metrics("partitionDataSize").value > 0)
assert(read.metrics("numPartitions").value == read.partitionSpecs.length)
assert(read.metrics("partitionDataSize").value > 0)
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") {
val (_, adaptivePlan) = runAdaptiveAndVerifyResult(
@ -939,14 +939,14 @@ class AdaptiveQueryExecSuite
}.head
assert(join.buildSide == BuildLeft)
val readers = collect(join.right) {
case r: CustomShuffleReaderExec => r
val reads = collect(join.right) {
case r: AQEShuffleReadExec => r
}
assert(readers.length == 1)
val reader = readers.head
assert(reader.isLocalReader)
assert(reader.metrics.keys.toSeq == Seq("numPartitions"))
assert(reader.metrics("numPartitions").value == reader.partitionSpecs.length)
assert(reads.length == 1)
val read = reads.head
assert(read.isLocalRead)
assert(read.metrics.keys.toSeq == Seq("numPartitions"))
assert(read.metrics("numPartitions").value == read.partitionSpecs.length)
}
withSQLConf(
@ -972,19 +972,19 @@ class AdaptiveQueryExecSuite
.createOrReplaceTempView("skewData2")
val (_, adaptivePlan) = runAdaptiveAndVerifyResult(
"SELECT * FROM skewData1 join skewData2 ON key1 = key2")
val readers = collect(adaptivePlan) {
case r: CustomShuffleReaderExec => r
val reads = collect(adaptivePlan) {
case r: AQEShuffleReadExec => r
}
readers.foreach { reader =>
assert(!reader.isLocalReader)
assert(reader.hasCoalescedPartition)
assert(reader.hasSkewedPartition)
assert(reader.metrics.contains("numSkewedPartitions"))
reads.foreach { read =>
assert(!read.isLocalRead)
assert(read.hasCoalescedPartition)
assert(read.hasSkewedPartition)
assert(read.metrics.contains("numSkewedPartitions"))
}
assert(readers(0).metrics("numSkewedPartitions").value == 2)
assert(readers(0).metrics("numSkewedSplits").value == 11)
assert(readers(1).metrics("numSkewedPartitions").value == 1)
assert(readers(1).metrics("numSkewedSplits").value == 9)
assert(reads(0).metrics("numSkewedPartitions").value == 2)
assert(reads(0).metrics("numSkewedSplits").value == 11)
assert(reads(1).metrics("numSkewedPartitions").value == 1)
assert(reads(1).metrics("numSkewedSplits").value == 9)
}
}
}
@ -1233,7 +1233,7 @@ class AdaptiveQueryExecSuite
assert(bhj.size == 1)
val join = findTopLevelBaseJoin(adaptivePlan)
assert(join.isEmpty)
checkNumLocalShuffleReaders(adaptivePlan)
checkNumLocalShuffleReads(adaptivePlan)
}
}
@ -1252,7 +1252,7 @@ class AdaptiveQueryExecSuite
// this is different compares to test(SPARK-32573) due to the rule
// `EliminateUnnecessaryJoin` has been excluded.
assert(join.nonEmpty)
checkNumLocalShuffleReaders(adaptivePlan)
checkNumLocalShuffleReads(adaptivePlan)
}
}
@ -1273,7 +1273,7 @@ class AdaptiveQueryExecSuite
assert(smj.size == 1)
val join = findTopLevelBaseJoin(adaptivePlan)
assert(join.isEmpty)
checkNumLocalShuffleReaders(adaptivePlan)
checkNumLocalShuffleReads(adaptivePlan)
})
}
}
@ -1431,7 +1431,7 @@ class AdaptiveQueryExecSuite
}
}
test("SPARK-32932: Do not use local shuffle reader at final stage on write command") {
test("SPARK-32932: Do not use local shuffle read at final stage on write command") {
withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString,
SQLConf.SHUFFLE_PARTITIONS.key -> "5",
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
@ -1441,14 +1441,14 @@ class AdaptiveQueryExecSuite
) yield (i, j)
val df = data.toDF("i", "j").repartition($"j")
var noLocalReader: Boolean = false
var noLocalread: Boolean = false
val listener = new QueryExecutionListener {
override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
qe.executedPlan match {
case plan@(_: DataWritingCommandExec | _: V2TableWriteExec) =>
assert(plan.asInstanceOf[UnaryExecNode].child.isInstanceOf[AdaptiveSparkPlanExec])
noLocalReader = collect(plan) {
case exec: CustomShuffleReaderExec if exec.isLocalReader => exec
noLocalread = collect(plan) {
case exec: AQEShuffleReadExec if exec.isLocalRead => exec
}.isEmpty
case _ => // ignore other events
}
@ -1461,32 +1461,32 @@ class AdaptiveQueryExecSuite
withTable("t") {
df.write.partitionBy("j").saveAsTable("t")
sparkContext.listenerBus.waitUntilEmpty()
assert(noLocalReader)
noLocalReader = false
assert(noLocalread)
noLocalread = false
}
// Test DataSource v2
val format = classOf[NoopDataSource].getName
df.write.format(format).mode("overwrite").save()
sparkContext.listenerBus.waitUntilEmpty()
assert(noLocalReader)
noLocalReader = false
assert(noLocalread)
noLocalread = false
spark.listenerManager.unregister(listener)
}
}
test("SPARK-33494: Do not use local shuffle reader for repartition") {
test("SPARK-33494: Do not use local shuffle read for repartition") {
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
val df = spark.table("testData").repartition('key)
df.collect()
// local shuffle reader breaks partitioning and shouldn't be used for repartition operation
// local shuffle read breaks partitioning and shouldn't be used for repartition operation
// which is specified by users.
checkNumLocalShuffleReaders(df.queryExecution.executedPlan, numShufflesWithoutLocalReader = 1)
checkNumLocalShuffleReads(df.queryExecution.executedPlan, numShufflesWithoutLocalRead = 1)
}
}
test("SPARK-33551: Do not use custom shuffle reader for repartition") {
test("SPARK-33551: Do not use AQE shuffle read for repartition") {
def hasRepartitionShuffle(plan: SparkPlan): Boolean = {
find(plan) {
case s: ShuffleExchangeLike =>
@ -1515,11 +1515,11 @@ class AdaptiveQueryExecSuite
assert(!hasRepartitionShuffle(plan))
val bhj = findTopLevelBroadcastHashJoin(plan)
assert(bhj.length == 1)
checkNumLocalShuffleReaders(plan, 1)
checkNumLocalShuffleReads(plan, 1)
// Probe side is coalesced.
val customReader = bhj.head.right.find(_.isInstanceOf[CustomShuffleReaderExec])
assert(customReader.isDefined)
assert(customReader.get.asInstanceOf[CustomShuffleReaderExec].hasCoalescedPartition)
val aqeRead = bhj.head.right.find(_.isInstanceOf[AQEShuffleReadExec])
assert(aqeRead.isDefined)
assert(aqeRead.get.asInstanceOf[AQEShuffleReadExec].hasCoalescedPartition)
// Repartition with partition default num specified.
val dfRepartitionWithNum = df.repartition(5, 'b)
@ -1529,23 +1529,23 @@ class AdaptiveQueryExecSuite
assert(hasRepartitionShuffle(planWithNum))
val bhjWithNum = findTopLevelBroadcastHashJoin(planWithNum)
assert(bhjWithNum.length == 1)
checkNumLocalShuffleReaders(planWithNum, 1)
checkNumLocalShuffleReads(planWithNum, 1)
// Probe side is coalesced.
assert(bhjWithNum.head.right.find(_.isInstanceOf[CustomShuffleReaderExec]).nonEmpty)
assert(bhjWithNum.head.right.find(_.isInstanceOf[AQEShuffleReadExec]).nonEmpty)
// Repartition with partition non-default num specified.
val dfRepartitionWithNum2 = df.repartition(3, 'b)
dfRepartitionWithNum2.collect()
val planWithNum2 = dfRepartitionWithNum2.queryExecution.executedPlan
// The top shuffle from repartition is not optimized out, and this is the only shuffle that
// does not have local shuffle reader.
// does not have local shuffle read.
assert(hasRepartitionShuffle(planWithNum2))
val bhjWithNum2 = findTopLevelBroadcastHashJoin(planWithNum2)
assert(bhjWithNum2.length == 1)
checkNumLocalShuffleReaders(planWithNum2, 1)
val customReader2 = bhjWithNum2.head.right.find(_.isInstanceOf[CustomShuffleReaderExec])
assert(customReader2.isDefined)
assert(customReader2.get.asInstanceOf[CustomShuffleReaderExec].isLocalReader)
checkNumLocalShuffleReads(planWithNum2, 1)
val aqeRead2 = bhjWithNum2.head.right.find(_.isInstanceOf[AQEShuffleReadExec])
assert(aqeRead2.isDefined)
assert(aqeRead2.get.asInstanceOf[AQEShuffleReadExec].isLocalRead)
}
// Force skew join
@ -1565,10 +1565,10 @@ class AdaptiveQueryExecSuite
// No skew join due to the repartition.
assert(!smj.head.isSkewJoin)
// Both sides are coalesced.
val customReaders = collect(smj.head) {
case c: CustomShuffleReaderExec if c.hasCoalescedPartition => c
val aqeReads = collect(smj.head) {
case c: AQEShuffleReadExec if c.hasCoalescedPartition => c
}
assert(customReaders.length == 2)
assert(aqeReads.length == 2)
// Repartition with default partition num specified.
val dfRepartitionWithNum = df.repartition(5, 'b)
@ -1580,10 +1580,10 @@ class AdaptiveQueryExecSuite
assert(smjWithNum.length == 1)
// Skew join can apply as the repartition is not optimized out.
assert(smjWithNum.head.isSkewJoin)
val customReadersWithNum = collect(smjWithNum.head) {
case c: CustomShuffleReaderExec => c
val aqeReadsWithNum = collect(smjWithNum.head) {
case c: AQEShuffleReadExec => c
}
assert(customReadersWithNum.nonEmpty)
assert(aqeReadsWithNum.nonEmpty)
// Repartition with default non-partition num specified.
val dfRepartitionWithNum2 = df.repartition(3, 'b)
@ -1660,7 +1660,7 @@ class AdaptiveQueryExecSuite
ds.collect()
val plan = ds.queryExecution.executedPlan
assert(collect(plan) {
case c: CustomShuffleReaderExec => c
case c: AQEShuffleReadExec => c
}.isEmpty)
assert(collect(plan) {
case s: ShuffleExchangeExec if s.shuffleOrigin == origin && s.numPartitions == 2 => s
@ -1691,7 +1691,7 @@ class AdaptiveQueryExecSuite
val (_, adaptive) = runAdaptiveAndVerifyResult("SELECT c1, count(*) FROM t GROUP BY c1")
assert(
collect(adaptive) {
case c @ CustomShuffleReaderExec(_, partitionSpecs) if partitionSpecs.length == 1 =>
case c @ AQEShuffleReadExec(_, partitionSpecs) if partitionSpecs.length == 1 =>
assert(c.hasCoalescedPartition)
c
}.length == 1
@ -1793,30 +1793,30 @@ class AdaptiveQueryExecSuite
val query = s"SELECT /*+ $repartition */ * FROM testData"
val (_, adaptivePlan) = runAdaptiveAndVerifyResult(query)
collect(adaptivePlan) {
case r: CustomShuffleReaderExec => r
case r: AQEShuffleReadExec => r
} match {
case Seq(customShuffleReader) =>
assert(customShuffleReader.partitionSpecs.size === 1)
assert(!customShuffleReader.isLocalReader)
case Seq(aqeShuffleRead) =>
assert(aqeShuffleRead.partitionSpecs.size === 1)
assert(!aqeShuffleRead.isLocalRead)
case _ =>
fail("There should be a CustomShuffleReaderExec")
fail("There should be a AQEShuffleReadExec")
}
}
}
}
test("SPARK-35650: Use local shuffle reader if can not coalesce number of partitions") {
test("SPARK-35650: Use local shuffle read if can not coalesce number of partitions") {
withSQLConf(SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "false") {
val query = "SELECT /*+ REPARTITION */ * FROM testData"
val (_, adaptivePlan) = runAdaptiveAndVerifyResult(query)
collect(adaptivePlan) {
case r: CustomShuffleReaderExec => r
case r: AQEShuffleReadExec => r
} match {
case Seq(customShuffleReader) =>
assert(customShuffleReader.partitionSpecs.size === 4)
assert(customShuffleReader.isLocalReader)
case Seq(aqeShuffleRead) =>
assert(aqeShuffleRead.partitionSpecs.size === 4)
assert(aqeShuffleRead.isLocalRead)
case _ =>
fail("There should be a CustomShuffleReaderExec")
fail("There should be a AQEShuffleReadExec")
}
}
}
@ -1838,13 +1838,13 @@ class AdaptiveQueryExecSuite
def checkPartitionNumber(
query: String, skewedPartitionNumber: Int, totalNumber: Int): Unit = {
val (_, adaptive) = runAdaptiveAndVerifyResult(query)
val reader = collect(adaptive) {
case reader: CustomShuffleReaderExec => reader
val read = collect(adaptive) {
case read: AQEShuffleReadExec => read
}
assert(reader.size == 1)
assert(reader.head.partitionSpecs.count(_.isInstanceOf[PartialReducerPartitionSpec]) ==
assert(read.size == 1)
assert(read.head.partitionSpecs.count(_.isInstanceOf[PartialReducerPartitionSpec]) ==
skewedPartitionNumber)
assert(reader.head.partitionSpecs.size == totalNumber)
assert(read.head.partitionSpecs.size == totalNumber)
}
withSQLConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "150") {
@ -1873,11 +1873,11 @@ class AdaptiveQueryExecSuite
.createOrReplaceTempView("t2")
val (_, adaptive) =
runAdaptiveAndVerifyResult("SELECT * FROM testData2 t1 left semi join t2 ON t1.a=t2.b")
val customReaders = collect(adaptive) {
case c: CustomShuffleReaderExec => c
val aqeReads = collect(adaptive) {
case c: AQEShuffleReadExec => c
}
assert(customReaders.length == 2)
customReaders.foreach { c =>
assert(aqeReads.length == 2)
aqeReads.foreach { c =>
val stats = c.child.asInstanceOf[QueryStageExec].getRuntimeStatistics
assert(stats.sizeInBytes >= 0)
assert(stats.rowCount.get >= 0)
@ -1890,12 +1890,12 @@ class AdaptiveQueryExecSuite
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
val (_, adaptive) =
runAdaptiveAndVerifyResult("SELECT sum(id) FROM RANGE(10) GROUP BY id % 3")
val coalesceReader = collect(adaptive) {
case r: CustomShuffleReaderExec if r.hasCoalescedPartition => r
val coalesceRead = collect(adaptive) {
case r: AQEShuffleReadExec if r.hasCoalescedPartition => r
}
assert(coalesceReader.length == 1)
assert(coalesceRead.length == 1)
// RANGE(10) is a very small dataset and AQE coalescing should produce one partition.
assert(coalesceReader.head.partitionSpecs.length == 1)
assert(coalesceRead.head.partitionSpecs.length == 1)
}
}
@ -1919,7 +1919,7 @@ class AdaptiveQueryExecSuite
assert(smj.size == 1)
val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)
assert(bhj.size == 1)
checkNumLocalShuffleReaders(adaptivePlan)
checkNumLocalShuffleReads(adaptivePlan)
}
withSQLConf(SQLConf.ADAPTIVE_CUSTOM_COST_EVALUATOR_CLASS.key ->