Revert "[SPARK-35904][SQL] Collapse above RebalancePartitions"
This reverts commit def29e50
This commit is contained in:
parent
3255511d52
commit
108635af17
|
@ -911,11 +911,6 @@ object CollapseRepartition extends Rule[LogicalPlan] {
|
||||||
// we can remove the child.
|
// we can remove the child.
|
||||||
case r @ RepartitionByExpression(_, child: RepartitionOperation, _) =>
|
case r @ RepartitionByExpression(_, child: RepartitionOperation, _) =>
|
||||||
r.copy(child = child.child)
|
r.copy(child = child.child)
|
||||||
|
|
||||||
// Case 3: When a RebalancePartitions has a child of Repartition or RepartitionByExpression
|
|
||||||
// we can remove the child.
|
|
||||||
case r @ RebalancePartitions(_, child: RepartitionOperation) =>
|
|
||||||
r.copy(child = child.child)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1373,9 +1373,9 @@ object RepartitionByExpression {
|
||||||
*/
|
*/
|
||||||
case class RebalancePartitions(
|
case class RebalancePartitions(
|
||||||
partitionExpressions: Seq[Expression],
|
partitionExpressions: Seq[Expression],
|
||||||
child: LogicalPlan) extends RepartitionOperation {
|
child: LogicalPlan) extends UnaryNode {
|
||||||
|
override def maxRows: Option[Long] = child.maxRows
|
||||||
override def numPartitions: Int = conf.numShufflePartitions
|
override def output: Seq[Attribute] = child.output
|
||||||
|
|
||||||
def partitioning: Partitioning = if (partitionExpressions.isEmpty) {
|
def partitioning: Partitioning = if (partitionExpressions.isEmpty) {
|
||||||
RoundRobinPartitioning(conf.numShufflePartitions)
|
RoundRobinPartitioning(conf.numShufflePartitions)
|
||||||
|
@ -1383,8 +1383,6 @@ case class RebalancePartitions(
|
||||||
HashPartitioning(partitionExpressions, conf.numShufflePartitions)
|
HashPartitioning(partitionExpressions, conf.numShufflePartitions)
|
||||||
}
|
}
|
||||||
|
|
||||||
override def shuffle: Boolean = true
|
|
||||||
|
|
||||||
override protected def withNewChildInternal(newChild: LogicalPlan): RebalancePartitions =
|
override protected def withNewChildInternal(newChild: LogicalPlan): RebalancePartitions =
|
||||||
copy(child = newChild)
|
copy(child = newChild)
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.optimizer
|
||||||
import org.apache.spark.sql.catalyst.dsl.expressions._
|
import org.apache.spark.sql.catalyst.dsl.expressions._
|
||||||
import org.apache.spark.sql.catalyst.dsl.plans._
|
import org.apache.spark.sql.catalyst.dsl.plans._
|
||||||
import org.apache.spark.sql.catalyst.plans.PlanTest
|
import org.apache.spark.sql.catalyst.plans.PlanTest
|
||||||
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, RebalancePartitions}
|
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
|
||||||
import org.apache.spark.sql.catalyst.rules.RuleExecutor
|
import org.apache.spark.sql.catalyst.rules.RuleExecutor
|
||||||
|
|
||||||
class CollapseRepartitionSuite extends PlanTest {
|
class CollapseRepartitionSuite extends PlanTest {
|
||||||
|
@ -195,13 +195,4 @@ class CollapseRepartitionSuite extends PlanTest {
|
||||||
comparePlans(optimized1, correctAnswer)
|
comparePlans(optimized1, correctAnswer)
|
||||||
comparePlans(optimized2, correctAnswer)
|
comparePlans(optimized2, correctAnswer)
|
||||||
}
|
}
|
||||||
|
|
||||||
test("SPARK-35904: Collapse above RebalancePartitions") {
|
|
||||||
comparePlans(
|
|
||||||
Optimize.execute(RebalancePartitions(Seq('b), testRelation.distribute('b)(10)).analyze),
|
|
||||||
RebalancePartitions(Seq('b), testRelation).analyze)
|
|
||||||
comparePlans(
|
|
||||||
Optimize.execute(RebalancePartitions(Seq('b), testRelation).distribute('a)(10).analyze),
|
|
||||||
testRelation.distribute('a)(10).analyze)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue