[SPARK-9082] [SQL] [FOLLOW-UP] use partition in PushPredicateThroughProject

A follow-up to https://github.com/apache/spark/pull/7446

Author: Wenchen Fan <cloud0fan@outlook.com>

Closes #7607 from cloud-fan/tmp and squashes the following commits:

7106989 [Wenchen Fan] use `partition` in `PushPredicateThroughProject`
This commit is contained in:
Wenchen Fan 2015-07-23 09:37:53 -07:00 committed by Yin Huai
parent 26ed22aec8
commit 52ef76de21

View file

@ -553,33 +553,27 @@ object PushPredicateThroughProject extends Rule[LogicalPlan] with PredicateHelpe
// Split the condition into small conditions by `And`, so that we can push down part of this
// condition without nondeterministic expressions.
val andConditions = splitConjunctivePredicates(condition)
val nondeterministicConditions = andConditions.filter(hasNondeterministic(_, aliasMap))
val (deterministic, nondeterministic) = andConditions.partition(_.collect {
case a: Attribute if aliasMap.contains(a) => aliasMap(a)
}.forall(_.deterministic))
// If there are no nondeterministic conditions, push down the whole condition.
if (nondeterministicConditions.isEmpty) {
if (nondeterministic.isEmpty) {
project.copy(child = Filter(replaceAlias(condition, aliasMap), grandChild))
} else {
// If they are all nondeterministic conditions, leave it unchanged.
if (nondeterministicConditions.length == andConditions.length) {
if (deterministic.isEmpty) {
filter
} else {
val deterministicConditions = andConditions.filterNot(hasNondeterministic(_, aliasMap))
// Push down the small conditions without nondeterministic expressions.
val pushedCondition = deterministicConditions.map(replaceAlias(_, aliasMap)).reduce(And)
Filter(nondeterministicConditions.reduce(And),
val pushedCondition = deterministic.map(replaceAlias(_, aliasMap)).reduce(And)
Filter(nondeterministic.reduce(And),
project.copy(child = Filter(pushedCondition, grandChild)))
}
}
}
/**
 * Reports whether `condition` refers to at least one attribute that has an entry in
 * `sourceAliases` whose mapped expression is not deterministic.
 *
 * @param condition the predicate whose attributes are inspected
 * @param sourceAliases mapping from attributes to the expressions they are looked up as
 * @return true if any mapped expression has `deterministic` set to false
 */
private def hasNondeterministic(
    condition: Expression,
    sourceAliases: AttributeMap[Expression]) = {
  // Look up the mapped expression for every attribute of the condition present in the map.
  val aliasedExpressions = condition.collect {
    case attr: Attribute if sourceAliases.contains(attr) => sourceAliases(attr)
  }
  // A single non-deterministic mapped expression is enough to flag the whole condition.
  !aliasedExpressions.forall(_.deterministic)
}
// Substitute any attributes that are produced by the child projection, so that we can safely
// eliminate it.
private def replaceAlias(condition: Expression, sourceAliases: AttributeMap[Expression]) = {