[SPARK-35904][SQL] Collapse above RebalancePartitions

### What changes were proposed in this pull request?

1. Make `RebalancePartitions` extend `RepartitionOperation`.
2. Make `CollapseRepartition` support `RebalancePartitions`.

### Why are the changes needed?

`CollapseRepartition` can optimize `RebalancePartitions` if possible.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

Closes #33099 from wangyum/SPARK-35904.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
This commit is contained in:
Yuming Wang 2021-06-26 21:19:58 -07:00 committed by Dongjoon Hyun
parent 74637a6ca7
commit def29e5075
3 changed files with 20 additions and 4 deletions

View file

@ -911,6 +911,11 @@ object CollapseRepartition extends Rule[LogicalPlan] {
// we can remove the child.
case r @ RepartitionByExpression(_, child: RepartitionOperation, _) =>
r.copy(child = child.child)
// Case 3: When a RebalancePartitions has a child of Repartition or RepartitionByExpression
// we can remove the child.
case r @ RebalancePartitions(_, child: RepartitionOperation) =>
r.copy(child = child.child)
}
}

View file

@ -1373,9 +1373,9 @@ object RepartitionByExpression {
*/
case class RebalancePartitions(
partitionExpressions: Seq[Expression],
child: LogicalPlan) extends UnaryNode {
override def maxRows: Option[Long] = child.maxRows
override def output: Seq[Attribute] = child.output
child: LogicalPlan) extends RepartitionOperation {
override def numPartitions: Int = conf.numShufflePartitions
def partitioning: Partitioning = if (partitionExpressions.isEmpty) {
RoundRobinPartitioning(conf.numShufflePartitions)
@ -1383,6 +1383,8 @@ case class RebalancePartitions(
HashPartitioning(partitionExpressions, conf.numShufflePartitions)
}
override def shuffle: Boolean = true
override protected def withNewChildInternal(newChild: LogicalPlan): RebalancePartitions =
copy(child = newChild)
}

View file

@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.optimizer
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, RebalancePartitions}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
class CollapseRepartitionSuite extends PlanTest {
@ -195,4 +195,13 @@ class CollapseRepartitionSuite extends PlanTest {
comparePlans(optimized1, correctAnswer)
comparePlans(optimized2, correctAnswer)
}
test("SPARK-35904: Collapse above RebalancePartitions") {
comparePlans(
Optimize.execute(RebalancePartitions(Seq('b), testRelation.distribute('b)(10)).analyze),
RebalancePartitions(Seq('b), testRelation).analyze)
comparePlans(
Optimize.execute(RebalancePartitions(Seq('b), testRelation).distribute('a)(10).analyze),
testRelation.distribute('a)(10).analyze)
}
}