[SPARK-36783][SQL] ScanOperation should not push Filter through nondeterministic Project

### What changes were proposed in this pull request?

`ScanOperation` collects adjacent Projects and Filters. The caller side always assumes that the collected Filters should run before the collected Projects, which means `ScanOperation` effectively pushes Filters through Projects.

Following `PushPredicateThroughNonJoin`, we should not push a Filter through a nondeterministic Project. This PR fixes `ScanOperation` to follow this rule.
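The legality check the PR introduces can be sketched in plain Scala. This is a hypothetical model, not Spark's actual classes: `Expr` and `canCombineFilters` here only mimic the shape of the real `ScanOperation` logic, where a filter may be merged into the collected Project/Filter stack only if the Projects collected so far are all deterministic, and the collected filters plus the new one are either all deterministic or the new one is the first filter collected.

```scala
// Hypothetical sketch of the rule ScanOperation now enforces (not Spark code).
case class Expr(name: String, deterministic: Boolean)

def canCombineFilters(
    collectedProjects: Seq[Seq[Expr]],
    collectedFilters: Seq[Expr],
    newFilter: Expr): Boolean = {
  // 1) no Project collected so far, or the collected Projects are all deterministic
  collectedProjects.forall(_.forall(_.deterministic)) && {
    // 2) all filters (old and new) are deterministic, or this is the first filter
    collectedFilters.isEmpty ||
      (collectedFilters.forall(_.deterministic) && newFilter.deterministic)
  }
}

val colA = Expr("a", deterministic = true)
val randCol = Expr("rand()", deterministic = false)

// Pushing a filter through a fully deterministic Project is allowed:
assert(canCombineFilters(Seq(Seq(colA)), Nil, Expr("a = 1", deterministic = true)))
// ...but not through a Project that produces a nondeterministic column:
assert(!canCombineFilters(Seq(Seq(colA, randCol)), Nil, Expr("a = 1", deterministic = true)))
```

Before this fix, only condition 2 was checked, so `ScanOperation` would happily combine a filter across a Project containing `rand()`.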

### Why are the changes needed?

Fix a bug that violates the semantics of nondeterministic expressions.

### Does this PR introduce _any_ user-facing change?

Most likely no change, but in some cases this is a correctness bug fix that changes the query result.
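To see why pushing a filter below a nondeterministic projection can change results, consider a plan like `Filter(r > 0.5, Project(rand() AS r, t))`. If the filter is pushed down, `rand()` is evaluated once for the filter and again for the projected output, so surviving rows can carry values that fail the predicate. A plain-Scala simulation of the two evaluation orders (hypothetical, not Spark's execution):

```scala
// Hypothetical simulation of filter pushdown through a nondeterministic Project.
import scala.util.Random

val rng = new Random(42)
val rows = Seq.fill(1000)(())

// Correct plan: rand() is evaluated once per row; the filter sees the same value
// that is returned in the output.
val correct = rows.map(_ => rng.nextDouble()).filter(_ > 0.5)
assert(correct.forall(_ > 0.5))

// Broken plan: the pushed-down filter and the projection each draw rand()
// independently, so the output can contain values <= 0.5.
val broken = rows.filter(_ => rng.nextDouble() > 0.5).map(_ => rng.nextDouble())
assert(broken.exists(_ <= 0.5))
```

With the fix, `ScanOperation` refuses to collect the filter across the nondeterministic Project, so only the "correct plan" ordering is produced.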

### How was this patch tested?

Existing tests.

Closes #34023 from cloud-fan/scan.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit dfd5237c0c)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Author: Wenchen Fan, 2021-09-17 10:51:15 +08:00
Parent: 3502fda783 · Commit: c1bfe1a5c4
2 changed files with 13 additions and 23 deletions

```diff
@@ -144,13 +144,15 @@ object ScanOperation extends OperationHelper with PredicateHelper {
     case Filter(condition, child) =>
       collectProjectsAndFilters(child) match {
         case Some((fields, filters, other, aliases)) =>
-          // Follow CombineFilters and only keep going if 1) the collected Filters
-          // and this filter are all deterministic or 2) if this filter is the first
-          // collected filter and doesn't have common non-deterministic expressions
-          // with lower Project.
+          // When collecting projects and filters, we effectively push down filters through
+          // projects. We need to meet the following conditions to do so:
+          // 1) no Project collected so far or the collected Projects are all deterministic
+          // 2) the collected filters and this filter are all deterministic, or this is the
+          //    first collected filter.
+          val canCombineFilters = fields.forall(_.forall(_.deterministic)) && {
+            filters.isEmpty || (filters.forall(_.deterministic) && condition.deterministic)
+          }
           val substitutedCondition = substitute(aliases)(condition)
-          val canCombineFilters = (filters.nonEmpty && filters.forall(_.deterministic) &&
-            substitutedCondition.deterministic) || filters.isEmpty
           if (canCombineFilters && !hasCommonNonDeterministic(Seq(condition), aliases)) {
             Some((fields, filters ++ splitConjunctivePredicates(substitutedCondition),
               other, aliases))
```

```diff
@@ -71,31 +71,19 @@ class ScanOperationSuite extends SparkFunSuite {
     }
   }
-  test("Filter which has the same non-deterministic expression with its child Project") {
-    val filter1 = Filter(EqualTo(colR, Literal(1)), Project(Seq(colA, aliasR), relation))
+  test("Filter with non-deterministic Project") {
+    val filter1 = Filter(EqualTo(colA, Literal(1)), Project(Seq(colA, aliasR), relation))
     assert(ScanOperation.unapply(filter1).isEmpty)
   }
-  test("Deterministic filter with a child Project with a non-deterministic expression") {
-    val filter2 = Filter(EqualTo(colA, Literal(1)), Project(Seq(colA, aliasR), relation))
-    filter2 match {
-      case ScanOperation(projects, filters, _: LocalRelation) =>
-        assert(projects.size === 2)
-        assert(projects(0) === colA)
-        assert(projects(1) === aliasR)
-        assert(filters.size === 1)
-      case _ => assert(false)
-    }
-  }
-  test("Filter which has different non-deterministic expressions with its child Project") {
+  test("Non-deterministic Filter with deterministic Project") {
     val filter3 = Filter(EqualTo(MonotonicallyIncreasingID(), Literal(1)),
-      Project(Seq(colA, aliasR), relation))
+      Project(Seq(colA, colB), relation))
     filter3 match {
       case ScanOperation(projects, filters, _: LocalRelation) =>
         assert(projects.size === 2)
         assert(projects(0) === colA)
-        assert(projects(1) === aliasR)
+        assert(projects(1) === colB)
         assert(filters.size === 1)
       case _ => assert(false)
     }
```