[SPARK-33971][SQL] Eliminate distinct from more aggregates

### What changes were proposed in this pull request?

Add more aggregate expressions (`BitAndAgg`, `BitOrAgg` and `CollectSet`) to the `EliminateDistinct` rule, which previously handled only `Max` and `Min`.

### Why are the changes needed?

Distinct aggregation can add significant overhead, so it is better to remove `DISTINCT` whenever the aggregate's result does not depend on duplicate input values.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unit tests (UT): `EliminateDistinctSuite` now runs one test per supported aggregate function.

Closes #30999 from tanelk/SPARK-33971_eliminate_distinct.

Authored-by: tanel.kiis@gmail.com <tanel.kiis@gmail.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
This commit is contained in:
tanel.kiis@gmail.com 2021-02-26 21:59:02 +09:00 committed by Takeshi Yamamuro
parent c1beb16cc8
commit 67ec4f7f67
2 changed files with 32 additions and 25 deletions

View file

@ -352,11 +352,17 @@ abstract class Optimizer(catalogManager: CatalogManager)
*/
object EliminateDistinct extends Rule[LogicalPlan] {
  // Rewrites each DISTINCT aggregate expression whose function is duplicate-agnostic into its
  // non-DISTINCT form. Every other expression is left untouched.
  //
  // This must be the only case for distinct aggregates: an earlier unguarded
  // `case ae: AggregateExpression if ae.isDistinct` would match first and shadow it, so
  // DISTINCT would never be dropped for anything but Max/Min.
  override def apply(plan: LogicalPlan): LogicalPlan = plan transformExpressions {
    case ae: AggregateExpression if ae.isDistinct && isDuplicateAgnostic(ae.aggregateFunction) =>
      ae.copy(isDistinct = false)
  }

  // Returns true for the aggregate functions this rule treats as insensitive to duplicate
  // input values (feeding the same value more than once does not change the result), which
  // is what makes dropping DISTINCT safe. Any function not listed here keeps its DISTINCT.
  private def isDuplicateAgnostic(af: AggregateFunction): Boolean = af match {
    case _: Max | _: Min | _: BitAndAgg | _: BitOrAgg | _: CollectSet => true
    case _ => false
  }
}

View file

@ -18,6 +18,8 @@ package org.apache.spark.sql.catalyst.optimizer
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
@ -32,25 +34,24 @@ class EliminateDistinctSuite extends PlanTest {
val testRelation = LocalRelation('a.int)
// Table-driven coverage: registers one test per aggregate function that EliminateDistinct is
// expected to rewrite. Max and Min are included here, so they need no hand-written tests.
// The registration has to happen at class-body level, not inside another test body.
Seq(
  Max(_),
  Min(_),
  BitAndAgg(_),
  BitOrAgg(_),
  CollectSet(_: Expression)
).foreach { aggBuilder =>
  // Build the aggregate over column 'a once; it supplies both the test name and the plans.
  val agg = aggBuilder('a)
  test(s"Eliminate Distinct in ${agg.prettyName}") {
    val query = testRelation
      .select(agg.toAggregateExpression(isDistinct = true).as('result))
      .analyze
    val answer = testRelation
      .select(agg.toAggregateExpression(isDistinct = false).as('result))
      .analyze
    // Guard against a vacuous pass: the two plans must differ before optimization.
    assert(query != answer)
    comparePlans(Optimize.execute(query), answer)
  }
}
}