From c1bfe1a5c4c41283198f0f066060ffbd5d5bbd3e Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Fri, 17 Sep 2021 10:51:15 +0800
Subject: [PATCH] [SPARK-36783][SQL] ScanOperation should not push Filter
 through nondeterministic Project

### What changes were proposed in this pull request?

`ScanOperation` collects adjacent Projects and Filters. The caller side always assumes that the collected Filters run before the collected Projects, which means `ScanOperation` effectively pushes Filter through Project.

Following `PushPredicateThroughNonJoin`, we should not push Filter through a nondeterministic Project. This PR fixes `ScanOperation` to follow this rule.

### Why are the changes needed?

Fix a bug that violates the semantics of nondeterministic expressions.
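As an illustration only (not part of the original description; the DataFrame, column names, and seed are made up), the sketch below shows the kind of plan this is about: a deterministic Filter on top of a Project that adds a seeded nondeterministic column. If the Filter were evaluated below the Project, `rand(42)` would be computed over a different set of input rows, so the surviving rows would receive different values and the query result would change.

```scala
// Minimal sketch, assuming a local SparkSession; the data, column names and
// seed are hypothetical and only illustrate why the pushdown is unsafe.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.rand

object NondeterministicPushdownSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("SPARK-36783-sketch")
      .getOrCreate()
    import spark.implicits._

    val df = Seq(1, 2, 3, 4, 5).toDF("a")

    // Project a seeded nondeterministic column first, then filter on a
    // deterministic column. Evaluating the filter before the projection would
    // feed rand(42) a different row stream and change the values of "r".
    val q = df.select($"a", rand(42).as("r")).where($"a" > 2)
    q.show()

    spark.stop()
  }
}
```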
### Does this PR introduce _any_ user-facing change?

Most likely no change, but in some cases, this is a correctness bug fix which changes the query result.

### How was this patch tested?

existing tests

Closes #34023 from cloud-fan/scan.

Authored-by: Wenchen Fan
Signed-off-by: Wenchen Fan
(cherry picked from commit dfd5237c0cd6e3024032b371f0182d2af691af7d)
Signed-off-by: Wenchen Fan
---
 .../sql/catalyst/planning/patterns.scala      | 14 ++++++++------
 .../planning/ScanOperationSuite.scala         | 22 +++++-----------------
 2 files changed, 13 insertions(+), 23 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
index 37c7229a2c..fc12f48ec2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -144,13 +144,15 @@ object ScanOperation extends OperationHelper with PredicateHelper {
       case Filter(condition, child) =>
         collectProjectsAndFilters(child) match {
           case Some((fields, filters, other, aliases)) =>
-            // Follow CombineFilters and only keep going if 1) the collected Filters
-            // and this filter are all deterministic or 2) if this filter is the first
-            // collected filter and doesn't have common non-deterministic expressions
-            // with lower Project.
+            // When collecting projects and filters, we effectively push down filters through
+            // projects. We need to meet the following conditions to do so:
+            //   1) no Project collected so far or the collected Projects are all deterministic
+            //   2) the collected filters and this filter are all deterministic, or this is the
+            //      first collected filter.
+            val canCombineFilters = fields.forall(_.forall(_.deterministic)) && {
+              filters.isEmpty || (filters.forall(_.deterministic) && condition.deterministic)
+            }
             val substitutedCondition = substitute(aliases)(condition)
-            val canCombineFilters = (filters.nonEmpty && filters.forall(_.deterministic) &&
-              substitutedCondition.deterministic) || filters.isEmpty
             if (canCombineFilters && !hasCommonNonDeterministic(Seq(condition), aliases)) {
               Some((fields, filters ++ splitConjunctivePredicates(substitutedCondition),
                 other, aliases))
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/planning/ScanOperationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/planning/ScanOperationSuite.scala
index 1290f77034..b1baeccbe9 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/planning/ScanOperationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/planning/ScanOperationSuite.scala
@@ -71,31 +71,19 @@ class ScanOperationSuite extends SparkFunSuite {
     }
   }
 
-  test("Filter which has the same non-deterministic expression with its child Project") {
-    val filter1 = Filter(EqualTo(colR, Literal(1)), Project(Seq(colA, aliasR), relation))
+  test("Filter with non-deterministic Project") {
+    val filter1 = Filter(EqualTo(colA, Literal(1)), Project(Seq(colA, aliasR), relation))
     assert(ScanOperation.unapply(filter1).isEmpty)
   }
 
-  test("Deterministic filter with a child Project with a non-deterministic expression") {
-    val filter2 = Filter(EqualTo(colA, Literal(1)), Project(Seq(colA, aliasR), relation))
-    filter2 match {
-      case ScanOperation(projects, filters, _: LocalRelation) =>
-        assert(projects.size === 2)
-        assert(projects(0) === colA)
-        assert(projects(1) === aliasR)
-        assert(filters.size === 1)
-      case _ => assert(false)
-    }
-  }
-
-  test("Filter which has different non-deterministic expressions with its child Project") {
+  test("Non-deterministic Filter with deterministic Project") {
     val filter3 = Filter(EqualTo(MonotonicallyIncreasingID(), Literal(1)),
-      Project(Seq(colA, aliasR), relation))
+      Project(Seq(colA, colB), relation))
     filter3 match {
       case ScanOperation(projects, filters, _: LocalRelation) =>
         assert(projects.size === 2)
         assert(projects(0) === colA)
-        assert(projects(1) === aliasR)
+        assert(projects(1) === colB)
         assert(filters.size === 1)
       case _ => assert(false)
     }