[SPARK-20246][SQL] should not push predicate down through aggregate with non-deterministic expressions
## What changes were proposed in this pull request?

Similar to `Project`, when `Aggregate` contains non-deterministic expressions, we should not push a predicate down through it: pushing the filter below the aggregate changes the number of input rows, and therefore changes the evaluation result of the non-deterministic expressions in `Aggregate`.

## How was this patch tested?

New regression tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #17562 from cloud-fan/filter.
This commit is contained in:
parent
589f3edb82
commit
7577e9c356
|
@ -755,7 +755,8 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
|
|||
// implies that, for a given input row, the output are determined by the expression's initial
|
||||
// state and all the input rows processed before. In other words, the order of input rows
|
||||
// matters for non-deterministic expressions, while pushing down predicates changes the order.
|
||||
case filter @ Filter(condition, project @ Project(fields, grandChild))
|
||||
// This also applies to Aggregate.
|
||||
case Filter(condition, project @ Project(fields, grandChild))
|
||||
if fields.forall(_.deterministic) && canPushThroughCondition(grandChild, condition) =>
|
||||
|
||||
// Create a map of Aliases to their values from the child projection.
|
||||
|
@ -766,33 +767,8 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
|
|||
|
||||
project.copy(child = Filter(replaceAlias(condition, aliasMap), grandChild))
|
||||
|
||||
// Push [[Filter]] operators through [[Window]] operators. Parts of the predicate that can be
|
||||
// pushed beneath must satisfy the following conditions:
|
||||
// 1. All the expressions are part of window partitioning key. The expressions can be compound.
|
||||
// 2. Deterministic.
|
||||
// 3. Placed before any non-deterministic predicates.
|
||||
case filter @ Filter(condition, w: Window)
|
||||
if w.partitionSpec.forall(_.isInstanceOf[AttributeReference]) =>
|
||||
val partitionAttrs = AttributeSet(w.partitionSpec.flatMap(_.references))
|
||||
|
||||
val (candidates, containingNonDeterministic) =
|
||||
splitConjunctivePredicates(condition).span(_.deterministic)
|
||||
|
||||
val (pushDown, rest) = candidates.partition { cond =>
|
||||
cond.references.subsetOf(partitionAttrs)
|
||||
}
|
||||
|
||||
val stayUp = rest ++ containingNonDeterministic
|
||||
|
||||
if (pushDown.nonEmpty) {
|
||||
val pushDownPredicate = pushDown.reduce(And)
|
||||
val newWindow = w.copy(child = Filter(pushDownPredicate, w.child))
|
||||
if (stayUp.isEmpty) newWindow else Filter(stayUp.reduce(And), newWindow)
|
||||
} else {
|
||||
filter
|
||||
}
|
||||
|
||||
case filter @ Filter(condition, aggregate: Aggregate) =>
|
||||
case filter @ Filter(condition, aggregate: Aggregate)
|
||||
if aggregate.aggregateExpressions.forall(_.deterministic) =>
|
||||
// Find all the aliased expressions in the aggregate list that don't include any actual
|
||||
// AggregateExpression, and create a map from the alias to the expression
|
||||
val aliasMap = AttributeMap(aggregate.aggregateExpressions.collect {
|
||||
|
@ -823,6 +799,32 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
|
|||
filter
|
||||
}
|
||||
|
||||
// Push [[Filter]] operators through [[Window]] operators. Parts of the predicate that can be
|
||||
// pushed beneath must satisfy the following conditions:
|
||||
// 1. All the expressions are part of window partitioning key. The expressions can be compound.
|
||||
// 2. Deterministic.
|
||||
// 3. Placed before any non-deterministic predicates.
|
||||
case filter @ Filter(condition, w: Window)
|
||||
if w.partitionSpec.forall(_.isInstanceOf[AttributeReference]) =>
|
||||
val partitionAttrs = AttributeSet(w.partitionSpec.flatMap(_.references))
|
||||
|
||||
val (candidates, containingNonDeterministic) =
|
||||
splitConjunctivePredicates(condition).span(_.deterministic)
|
||||
|
||||
val (pushDown, rest) = candidates.partition { cond =>
|
||||
cond.references.subsetOf(partitionAttrs)
|
||||
}
|
||||
|
||||
val stayUp = rest ++ containingNonDeterministic
|
||||
|
||||
if (pushDown.nonEmpty) {
|
||||
val pushDownPredicate = pushDown.reduce(And)
|
||||
val newWindow = w.copy(child = Filter(pushDownPredicate, w.child))
|
||||
if (stayUp.isEmpty) newWindow else Filter(stayUp.reduce(And), newWindow)
|
||||
} else {
|
||||
filter
|
||||
}
|
||||
|
||||
case filter @ Filter(condition, union: Union) =>
|
||||
// Union could change the rows, so non-deterministic predicate can't be pushed down
|
||||
val (pushDown, stayUp) = splitConjunctivePredicates(condition).span(_.deterministic)
|
||||
|
@ -848,7 +850,7 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
|
|||
filter
|
||||
}
|
||||
|
||||
case filter @ Filter(condition, u: UnaryNode)
|
||||
case filter @ Filter(_, u: UnaryNode)
|
||||
if canPushThrough(u) && u.expressions.forall(_.deterministic) =>
|
||||
pushDownPredicate(filter, u.child) { predicate =>
|
||||
u.withNewChildren(Seq(Filter(predicate, u.child)))
|
||||
|
|
|
@ -134,15 +134,20 @@ class FilterPushdownSuite extends PlanTest {
|
|||
comparePlans(optimized, correctAnswer)
|
||||
}
|
||||
|
||||
// A filter whose condition is non-deterministic can still be pushed through a
// Project, as long as every field of the Project itself is deterministic: the
// Project does not change the number or order of rows, so evaluating the
// predicate below it is observationally equivalent.
// (This span of the scraped diff contained both the pre-change and post-change
// versions of the test; this is the post-change version, reconstructed.)
test("nondeterministic: can always push down filter through project with deterministic field") {
  val originalQuery = testRelation
    .select('a)
    .where(Rand(10) > 5 || 'a > 5)
    .analyze

  val optimized = Optimize.execute(originalQuery)

  // The whole predicate — including the non-deterministic Rand — moves below
  // the deterministic projection.
  val correctAnswer = testRelation
    .where(Rand(10) > 5 || 'a > 5)
    .select('a)
    .analyze

  comparePlans(optimized, correctAnswer)
}
|
||||
|
||||
test("nondeterministic: can't push down filter through project with nondeterministic field") {
|
||||
|
@ -156,6 +161,34 @@ class FilterPushdownSuite extends PlanTest {
|
|||
comparePlans(optimized, originalQuery)
|
||||
}
|
||||
|
||||
// Regression test for SPARK-20246: when the aggregate list contains a
// non-deterministic expression (Rand), pushing the filter below the Aggregate
// would change how many rows Rand sees, so the optimizer must leave the plan
// untouched.
test("nondeterministic: can't push down filter through aggregate with nondeterministic field") {
  val query = testRelation
    .groupBy('a)('a, Rand(10).as('rand))
    .where('a > 5)
    .analyze

  val optimizedPlan = Optimize.execute(query)

  // The plan must come back unchanged — no predicate push-down happened.
  comparePlans(optimizedPlan, query)
}
|
||||
|
||||
// With a fully deterministic aggregate list, the deterministic conjunct of the
// filter ('a > 5) can be pushed below the Aggregate, while the
// non-deterministic conjunct (Rand(10) > 5) must stay above it.
test("nondeterministic: push down part of filter through aggregate with deterministic field") {
  val originalQuery = testRelation
    .groupBy('a)('a)
    .where('a > 5 && Rand(10) > 5)
    .analyze

  // Fix: originalQuery is already analyzed; the redundant `.analyze` in
  // `Optimize.execute(originalQuery.analyze)` has been dropped.
  val optimized = Optimize.execute(originalQuery)

  val correctAnswer = testRelation
    .where('a > 5)
    .groupBy('a)('a)
    .where(Rand(10) > 5)
    .analyze

  comparePlans(optimized, correctAnswer)
}
|
||||
|
||||
test("filters: combines filters") {
|
||||
val originalQuery = testRelation
|
||||
.select('a)
|
||||
|
|
Loading…
Reference in a new issue