[SPARK-18966][SQL] NOT IN subquery with correlated expressions may return incorrect result

## What changes were proposed in this pull request?

This PR fixes the following problem:
````
Seq((1, 2)).toDF("a1", "a2").createOrReplaceTempView("a")
Seq[(java.lang.Integer, java.lang.Integer)]((1, null)).toDF("b1", "b2").createOrReplaceTempView("b")

// The expected result is 1 row of (1,2) as shown in the next statement.
sql("select * from a where a1 not in (select b1 from b where b2 = a2)").show
+---+---+
| a1| a2|
+---+---+
+---+---+

sql("select * from a where a1 not in (select b1 from b where b2 = 2)").show
+---+---+
| a1| a2|
+---+---+
|  1|  2|
+---+---+
````
There are a number of scenarios to consider:

1. When the correlated predicate yields a match (i.e., B.B2 = A.A2)
1.1. When the NOT IN expression yields a match (i.e., A.A1 = B.B1)
1.2. When the NOT IN expression yields no match (i.e., A.A1 = B.B1 returns false)
1.3. When A.A1 is null
1.4. When B.B1 is null
1.4.1. When A.A1 is not null
1.4.2. When A.A1 is null

2. When the correlated predicate yields no match (i.e., B.B2 = A.A2 is false or unknown)
2.1. When B.B2 is null and A.A2 is null
2.2. When B.B2 is null and A.A2 is not null
2.3. When the value of A.A2 does not match any of B.B2

````
 A.A1   A.A2      B.B1   B.B2
-----  -----     -----  -----
    1      1         1      1    (1.1)
    2      1                     (1.2)
 null      1                     (1.3)

    1      3      null      3    (1.4.1)
 null      3                     (1.4.2)

    1   null         1   null    (2.1)
 null      2                     (2.2 & 2.3)
````

We can divide the evaluation of the above correlated NOT IN subquery into two groups:

Group 1: The rows in A when there is a match from the correlated predicate (A.A2 = B.B2)

In this case, the result of the subquery is not empty, and the semantics of NOT IN depend solely on the evaluation of the equality comparison between the NOT IN columns, i.e., A1 = B1:

- If A.A1 is null, the row is filtered (1.3 and 1.4.2)
- If A.A1 = B.B1, the row is filtered (1.1)
- If B.B1 is null, every row of A in the same group (A.A2 = B.B2) is filtered (1.4.1 & 1.4.2)
- Otherwise, the row is qualified.

Hence, in this group, the result is the row from (1.2).

Group 2: The rows in A when there is no match from the correlated predicate (A.A2 = B.B2)

In this case, all the rows in A, including the rows where A.A1 is null, qualify: the subquery returns an empty set, and by the semantics of NOT IN every row from the parent side is part of the result set. These are the rows from (2.1, 2.2 and 2.3).
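
As a rough cross-check of the two groups, the sketch below models the query in plain Scala without Spark. It assumes `Option[Int]` with `None` standing in for SQL NULL and `Option[Boolean]` with `None` standing in for UNKNOWN. The names `NotInReference`, `Row2`, `sqlEq`, and `notIn` are made up for this illustration and are not part of Spark.

```scala
// Plain-Scala model of SQL three-valued logic; no Spark dependency.
// ASSUMPTION: None stands for SQL NULL (in a value) or UNKNOWN (in a truth value).
// All names here are hypothetical and exist only for this illustration.
object NotInReference {
  type Row2 = (Option[Int], Option[Int])

  // SQL equality: UNKNOWN as soon as either operand is NULL.
  def sqlEq(x: Option[Int], y: Option[Int]): Option[Boolean] =
    for (a <- x; b <- y) yield a == b

  // x NOT IN (values): TRUE for an empty set, FALSE on any match,
  // UNKNOWN if nothing matched but a NULL took part in a comparison.
  def notIn(x: Option[Int], values: Seq[Option[Int]]): Option[Boolean] = {
    val cmps = values.map(sqlEq(x, _))
    if (cmps.contains(Some(true))) Some(false)
    else if (cmps.contains(None)) None
    else Some(true)
  }

  // Rows of A and B taken from the scenario table.
  val a: Seq[Row2] = Seq(
    (Some(1), Some(1)), (Some(2), Some(1)), (None, Some(1)),
    (Some(1), Some(3)), (None, Some(3)),
    (Some(1), None), (None, Some(2)))
  val b: Seq[Row2] = Seq((Some(1), Some(1)), (None, Some(3)), (Some(1), None))

  // WHERE a1 NOT IN (SELECT b1 FROM b WHERE b2 = a2):
  // a row of A survives only when the predicate evaluates to TRUE.
  val result: Seq[Row2] = a.filter { case (a1, a2) =>
    // Group 1 vs. Group 2 is decided here: the subquery is empty
    // whenever b2 = a2 is FALSE or UNKNOWN for every row of B.
    val subquery = b.collect { case (b1, b2) if sqlEq(b2, a2).contains(true) => b1 }
    notIn(a1, subquery).contains(true)
  }
}
```

Under these assumptions, `NotInReference.result` holds (2, 1), (1, null), and (null, 2), which are the rows labelled (1.2), (2.1), and (2.2 & 2.3).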

In conclusion, the correct result set of the above query is
````
 A.A1   A.A2
-----  -----
    2      1    (1.2)
    1   null    (2.1)
 null      2    (2.2 & 2.3)
````
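
The rows from the scenario table also show why the correlated predicate has to stay outside the null-aware expansion. The sketch below is plain Scala with hypothetical names (`AntiJoinModel`, `oldCond`, `newCond`, `leftAnti`), not Spark's own classes. It evaluates a LEFT ANTI join under two conditions: the one that, as the removed lines of the diff suggest, was generated before this patch, where `B.B2 = A.A2` was also wrapped in `OR ISNULL(...)`, and the one generated after it.

```scala
// Plain-Scala model of the LEFT ANTI join condition; no Spark dependency.
// ASSUMPTION: None stands for SQL NULL / UNKNOWN; all names are hypothetical.
object AntiJoinModel {
  type Tri = Option[Boolean]
  type Row2 = (Option[Int], Option[Int])

  def sqlEq(x: Option[Int], y: Option[Int]): Tri =
    for (a <- x; b <- y) yield a == b

  // Three-valued AND / OR.
  def and(l: Tri, r: Tri): Tri = (l, r) match {
    case (Some(false), _) | (_, Some(false)) => Some(false)
    case (Some(true), Some(true))            => Some(true)
    case _                                   => None
  }
  def or(l: Tri, r: Tri): Tri = (l, r) match {
    case (Some(true), _) | (_, Some(true)) => Some(true)
    case (Some(false), Some(false))        => Some(false)
    case _                                 => None
  }

  // c OR ISNULL(c): the null-aware expansion applied to a conjunct.
  def nullAware(c: Tri): Tri = or(c, Some(c.isEmpty))

  // Before the patch: the correlated predicate is expanded as well.
  def oldCond(l: Row2, r: Row2): Tri =
    and(nullAware(sqlEq(l._1, r._1)), nullAware(sqlEq(r._2, l._2)))

  // After the patch: only the NOT IN comparison is expanded.
  def newCond(l: Row2, r: Row2): Tri =
    and(nullAware(sqlEq(l._1, r._1)), sqlEq(r._2, l._2))

  // LEFT ANTI: keep a left row when no right row makes the condition TRUE.
  def leftAnti(left: Seq[Row2], right: Seq[Row2], cond: (Row2, Row2) => Tri): Seq[Row2] =
    left.filterNot(l => right.exists(r => cond(l, r).contains(true)))

  val a: Seq[Row2] = Seq(
    (Some(1), Some(1)), (Some(2), Some(1)), (None, Some(1)),
    (Some(1), Some(3)), (None, Some(3)),
    (Some(1), None), (None, Some(2)))
  val b: Seq[Row2] = Seq((Some(1), Some(1)), (None, Some(3)), (Some(1), None))

  val before: Seq[Row2] = leftAnti(a, b, oldCond)
  val after: Seq[Row2]  = leftAnti(a, b, newCond)
}
```

In this model `before` keeps only (2, 1), because an UNKNOWN correlated predicate is turned into TRUE and wrongly removes (1, null) and (null, 2). `after` keeps (2, 1), (1, null), and (null, 2), matching the result set above.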
## How was this patch tested?
Unit tests, regression tests, and new test cases that focus on the problem being fixed.

Author: Nattavut Sutyanyong <nsy.can@gmail.com>

Closes #17294 from nsyca/18966.
Nattavut Sutyanyong 2017-03-14 20:34:59 +01:00 committed by Herman van Hovell
parent e04c05cf41
commit 6eac96823c
3 changed files with 82 additions and 5 deletions


@@ -80,14 +80,19 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
 // Note that will almost certainly be planned as a Broadcast Nested Loop join.
 // Use EXISTS if performance matters to you.
 val inConditions = getValueExpression(value).zip(sub.output).map(EqualTo.tupled)
-val (joinCond, outerPlan) = rewriteExistentialExpr(inConditions ++ conditions, p)
+val (joinCond, outerPlan) = rewriteExistentialExpr(inConditions, p)
 // Expand the NOT IN expression with the NULL-aware semantic
 // to its full form. That is from:
-// (a1,b1,...) = (a2,b2,...)
+// (a1,a2,...) = (b1,b2,...)
 // to
-// (a1=a2 OR isnull(a1=a2)) AND (b1=b2 OR isnull(b1=b2)) AND ...
+// (a1=b1 OR isnull(a1=b1)) AND (a2=b2 OR isnull(a2=b2)) AND ...
 val joinConds = splitConjunctivePredicates(joinCond.get)
-val pairs = joinConds.map(c => Or(c, IsNull(c))).reduceLeft(And)
+// After that, add back the correlated join predicate(s) in the subquery
+// Example:
+// SELECT ... FROM A WHERE A.A1 NOT IN (SELECT B.B1 FROM B WHERE B.B2 = A.A2 AND B.B3 > 1)
+// will have the final conditions in the LEFT ANTI as
+// (A.A1 = B.B1 OR ISNULL(A.A1 = B.B1)) AND (B.B2 = A.A2)
+val pairs = (joinConds.map(c => Or(c, IsNull(c))) ++ conditions).reduceLeft(And)
 Join(outerPlan, sub, LeftAnti, Option(pairs))
 case (p, predicate) =>
 val (newCond, inputPlan) = rewriteExistentialExpr(Seq(predicate), p)


@@ -109,4 +109,28 @@ FROM t1
 WHERE t1a NOT IN (SELECT t2a
 FROM t2);
+-- DDLs
+create temporary view a as select * from values
+(1, 1), (2, 1), (null, 1), (1, 3), (null, 3), (1, null), (null, 2)
+as a(a1, a2);
+create temporary view b as select * from values
+(1, 1, 2), (null, 3, 2), (1, null, 2), (1, 2, null)
+as b(b1, b2, b3);
+-- TC 02.01
+SELECT a1, a2
+FROM a
+WHERE a1 NOT IN (SELECT b.b1
+FROM b
+WHERE a.a2 = b.b2)
+;
+-- TC 02.02
+SELECT a1, a2
+FROM a
+WHERE a1 NOT IN (SELECT b.b1
+FROM b
+WHERE a.a2 = b.b2
+AND b.b3 > 1)
+;


@@ -1,5 +1,5 @@
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 10
+-- Number of queries: 14
 -- !query 0
@@ -174,3 +174,51 @@ t1a 6 2014-04-04 01:02:00.001
 t1d 10 2015-05-04 01:01:00
 t1d NULL 2014-06-04 01:01:00
 t1d NULL 2014-07-04 01:02:00.001
+-- !query 10
+create temporary view a as select * from values
+(1, 1), (2, 1), (null, 1), (1, 3), (null, 3), (1, null), (null, 2)
+as a(a1, a2)
+-- !query 10 schema
+struct<>
+-- !query 10 output
+-- !query 11
+create temporary view b as select * from values
+(1, 1, 2), (null, 3, 2), (1, null, 2), (1, 2, null)
+as b(b1, b2, b3)
+-- !query 11 schema
+struct<>
+-- !query 11 output
+-- !query 12
+SELECT a1, a2
+FROM a
+WHERE a1 NOT IN (SELECT b.b1
+FROM b
+WHERE a.a2 = b.b2)
+-- !query 12 schema
+struct<a1:int,a2:int>
+-- !query 12 output
+1 NULL
+2 1
+-- !query 13
+SELECT a1, a2
+FROM a
+WHERE a1 NOT IN (SELECT b.b1
+FROM b
+WHERE a.a2 = b.b2
+AND b.b3 > 1)
+-- !query 13 schema
+struct<a1:int,a2:int>
+-- !query 13 output
+1 NULL
+2 1
+NULL 2