[SPARK-23973][SQL] Remove consecutive Sorts

## What changes were proposed in this pull request?

In SPARK-23375 we introduced the ability of removing `Sort` operation during query optimization if the data is already sorted. In this follow-up we remove also a `Sort` which is followed by another `Sort`: in this case the first sort is not needed and can be safely removed.

The PR starts from henryr's comment: https://github.com/apache/spark/pull/20560#discussion_r180601594. So credit should be given to him.

## How was this patch tested?

added UT

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #21072 from mgaido91/SPARK-23973.
This commit is contained in:
Marco Gaido 2018-04-24 10:11:09 +08:00 committed by Wenchen Fan
parent 428b903859
commit 281c1ca0dc
2 changed files with 63 additions and 9 deletions

View file

@ -767,12 +767,29 @@ object EliminateSorts extends Rule[LogicalPlan] {
}
/**
* Removes Sort operation if the child is already sorted
* Removes redundant Sort operation. This can happen:
* 1) if the child is already sorted
* 2) if there is another Sort operator separated by 0...n Project/Filter operators
*/
object RemoveRedundantSorts extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
case Sort(orders, true, child) if SortOrder.orderingSatisfies(child.outputOrdering, orders) =>
child
case s @ Sort(_, _, child) => s.copy(child = recursiveRemoveSort(child))
}
def recursiveRemoveSort(plan: LogicalPlan): LogicalPlan = plan match {
case Sort(_, _, child) => recursiveRemoveSort(child)
case other if canEliminateSort(other) =>
other.withNewChildren(other.children.map(recursiveRemoveSort))
case _ => plan
}
def canEliminateSort(plan: LogicalPlan): Boolean = plan match {
case p: Project => p.projectList.forall(_.deterministic)
case f: Filter => f.condition.deterministic
case _: ResolvedHint => true
case _ => false
}
}

View file

@ -17,16 +17,12 @@
package org.apache.spark.sql.catalyst.optimizer
import org.apache.spark.sql.catalyst.analysis.{Analyzer, EmptyFunctionRegistry}
import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.{CASE_SENSITIVE, ORDER_BY_ORDINAL}
class RemoveRedundantSortsSuite extends PlanTest {
@ -42,15 +38,15 @@ class RemoveRedundantSortsSuite extends PlanTest {
test("remove redundant order by") {
val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc, 'b.desc_nullsFirst)
val unnecessaryReordered = orderedPlan.select('a).orderBy('a.asc, 'b.desc_nullsFirst)
val unnecessaryReordered = orderedPlan.limit(2).select('a).orderBy('a.asc, 'b.desc_nullsFirst)
val optimized = Optimize.execute(unnecessaryReordered.analyze)
val correctAnswer = orderedPlan.select('a).analyze
val correctAnswer = orderedPlan.limit(2).select('a).analyze
comparePlans(Optimize.execute(optimized), correctAnswer)
}
test("do not remove sort if the order is different") {
val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc, 'b.desc_nullsFirst)
val reorderedDifferently = orderedPlan.select('a).orderBy('a.asc, 'b.desc)
val reorderedDifferently = orderedPlan.limit(2).select('a).orderBy('a.asc, 'b.desc)
val optimized = Optimize.execute(reorderedDifferently.analyze)
val correctAnswer = reorderedDifferently.analyze
comparePlans(optimized, correctAnswer)
@ -72,6 +68,14 @@ class RemoveRedundantSortsSuite extends PlanTest {
comparePlans(optimized, correctAnswer)
}
test("different sorts are not simplified if limit is in between") {
val orderedPlan = testRelation.select('a, 'b).orderBy('b.desc).limit(Literal(10))
.orderBy('a.asc)
val optimized = Optimize.execute(orderedPlan.analyze)
val correctAnswer = orderedPlan.analyze
comparePlans(optimized, correctAnswer)
}
test("range is already sorted") {
val inputPlan = Range(1L, 1000L, 1, 10)
val orderedPlan = inputPlan.orderBy('id.asc)
@ -98,4 +102,37 @@ class RemoveRedundantSortsSuite extends PlanTest {
val correctAnswer = groupedAndResorted.analyze
comparePlans(optimized, correctAnswer)
}
test("remove two consecutive sorts") {
val orderedTwice = testRelation.orderBy('a.asc).orderBy('b.desc)
val optimized = Optimize.execute(orderedTwice.analyze)
val correctAnswer = testRelation.orderBy('b.desc).analyze
comparePlans(optimized, correctAnswer)
}
test("remove sorts separated by Filter/Project operators") {
val orderedTwiceWithProject = testRelation.orderBy('a.asc).select('b).orderBy('b.desc)
val optimizedWithProject = Optimize.execute(orderedTwiceWithProject.analyze)
val correctAnswerWithProject = testRelation.select('b).orderBy('b.desc).analyze
comparePlans(optimizedWithProject, correctAnswerWithProject)
val orderedTwiceWithFilter =
testRelation.orderBy('a.asc).where('b > Literal(0)).orderBy('b.desc)
val optimizedWithFilter = Optimize.execute(orderedTwiceWithFilter.analyze)
val correctAnswerWithFilter = testRelation.where('b > Literal(0)).orderBy('b.desc).analyze
comparePlans(optimizedWithFilter, correctAnswerWithFilter)
val orderedTwiceWithBoth =
testRelation.orderBy('a.asc).select('b).where('b > Literal(0)).orderBy('b.desc)
val optimizedWithBoth = Optimize.execute(orderedTwiceWithBoth.analyze)
val correctAnswerWithBoth =
testRelation.select('b).where('b > Literal(0)).orderBy('b.desc).analyze
comparePlans(optimizedWithBoth, correctAnswerWithBoth)
val orderedThrice = orderedTwiceWithBoth.select(('b + 1).as('c)).orderBy('c.asc)
val optimizedThrice = Optimize.execute(orderedThrice.analyze)
val correctAnswerThrice = testRelation.select('b).where('b > Literal(0))
.select(('b + 1).as('c)).orderBy('c.asc).analyze
comparePlans(optimizedThrice, correctAnswerThrice)
}
}