diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
index 37c7229a2c..fc12f48ec2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -144,13 +144,15 @@ object ScanOperation extends OperationHelper with PredicateHelper {
       case Filter(condition, child) =>
         collectProjectsAndFilters(child) match {
           case Some((fields, filters, other, aliases)) =>
-            // Follow CombineFilters and only keep going if 1) the collected Filters
-            // and this filter are all deterministic or 2) if this filter is the first
-            // collected filter and doesn't have common non-deterministic expressions
-            // with lower Project.
+            // When collecting projects and filters, we effectively push down filters through
+            // projects. We need to meet the following conditions to do so:
+            //   1) no Project collected so far or the collected Projects are all deterministic
+            //   2) the collected filters and this filter are all deterministic, or this is the
+            //      first collected filter.
+            val canCombineFilters = fields.forall(_.forall(_.deterministic)) && {
+              filters.isEmpty || (filters.forall(_.deterministic) && condition.deterministic)
+            }
             val substitutedCondition = substitute(aliases)(condition)
-            val canCombineFilters = (filters.nonEmpty && filters.forall(_.deterministic) &&
-              substitutedCondition.deterministic) || filters.isEmpty
             if (canCombineFilters && !hasCommonNonDeterministic(Seq(condition), aliases)) {
               Some((fields, filters ++ splitConjunctivePredicates(substitutedCondition),
                 other, aliases))
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/planning/ScanOperationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/planning/ScanOperationSuite.scala
index 1290f77034..b1baeccbe9 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/planning/ScanOperationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/planning/ScanOperationSuite.scala
@@ -71,31 +71,19 @@ class ScanOperationSuite extends SparkFunSuite {
     }
   }
 
-  test("Filter which has the same non-deterministic expression with its child Project") {
-    val filter1 = Filter(EqualTo(colR, Literal(1)), Project(Seq(colA, aliasR), relation))
+  test("Filter with non-deterministic Project") {
+    val filter1 = Filter(EqualTo(colA, Literal(1)), Project(Seq(colA, aliasR), relation))
     assert(ScanOperation.unapply(filter1).isEmpty)
   }
 
-  test("Deterministic filter with a child Project with a non-deterministic expression") {
-    val filter2 = Filter(EqualTo(colA, Literal(1)), Project(Seq(colA, aliasR), relation))
-    filter2 match {
-      case ScanOperation(projects, filters, _: LocalRelation) =>
-        assert(projects.size === 2)
-        assert(projects(0) === colA)
-        assert(projects(1) === aliasR)
-        assert(filters.size === 1)
-      case _ => assert(false)
-    }
-  }
-
-  test("Filter which has different non-deterministic expressions with its child Project") {
+  test("Non-deterministic Filter with deterministic Project") {
     val filter3 = Filter(EqualTo(MonotonicallyIncreasingID(), Literal(1)),
-      Project(Seq(colA, aliasR), relation))
+      Project(Seq(colA, colB), relation))
     filter3 match {
       case ScanOperation(projects, filters, _: LocalRelation) =>
         assert(projects.size === 2)
         assert(projects(0) === colA)
-        assert(projects(1) === aliasR)
+        assert(projects(1) === colB)
         assert(filters.size === 1)
       case _ => assert(false)
     }
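
The core of the change is the new `canCombineFilters` condition: a filter is only collected (i.e. pushed through the Projects gathered so far) when any collected Project is fully deterministic, and when either no filter has been collected yet or all filters involved are deterministic. Below is a minimal standalone sketch of just that condition, using a hypothetical `Expr` stand-in for Catalyst's `Expression` (simplified types for illustration, not the real API):

```scala
// A sketch of the canCombineFilters condition from the patch above.
// Expr is a hypothetical stand-in for Catalyst's Expression.
object CanCombineFiltersSketch {
  final case class Expr(sql: String, deterministic: Boolean)

  // fields: the Project list collected so far, if any; filters: predicates
  // collected so far; condition: the Filter currently being considered.
  def canCombineFilters(
      fields: Option[Seq[Expr]],
      filters: Seq[Expr],
      condition: Expr): Boolean = {
    // 1) no Project collected so far, or all collected Project expressions
    //    are deterministic (an Option's forall is true for None)
    fields.forall(_.forall(_.deterministic)) && {
      // 2) this is the first collected filter, or every filter involved
      //    (collected and current) is deterministic
      filters.isEmpty || (filters.forall(_.deterministic) && condition.deterministic)
    }
  }

  def main(args: Array[String]): Unit = {
    val aEq1 = Expr("a = 1", deterministic = true)
    val rand = Expr("rand()", deterministic = false)

    // First filter below a fully deterministic Project: safe to push down.
    assert(canCombineFilters(Some(Seq(Expr("a", deterministic = true))), Nil, aEq1))
    // A non-deterministic Project blocks pushdown even of a deterministic filter,
    // because pushdown would change which rows the Project is evaluated on.
    assert(!canCombineFilters(Some(Seq(rand)), Nil, aEq1))
    // A second filter is only merged when all filters are deterministic.
    assert(!canCombineFilters(None, Seq(rand), aEq1))
  }
}
```

Note that a non-deterministic condition is still accepted as the first collected filter (the `filters.isEmpty` branch), which is exactly what the updated "Non-deterministic Filter with deterministic Project" test exercises.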