[SPARK-25368][SQL] Incorrect predicate pushdown returns wrong result
## What changes were proposed in this pull request?

How to reproduce:
```scala
val df1 = spark.createDataFrame(Seq(
  (1, 1)
)).toDF("a", "b").withColumn("c", lit(null).cast("int"))
val df2 = df1.union(df1).withColumn("d", spark_partition_id).filter($"c".isNotNull)
df2.show
+---+---+----+---+
|  a|  b|   c|  d|
+---+---+----+---+
|  1|  1|null|  0|
|  1|  1|null|  1|
+---+---+----+---+
```

`filter($"c".isNotNull)` was transformed to `(null <=> c#10)` before https://github.com/apache/spark/pull/19201, but it is transformed to `(c#10 = null)` since https://github.com/apache/spark/pull/20155. This PR reverts it to `(null <=> c#10)` to fix this issue.

## How was this patch tested?

unit tests

Closes #22368 from wangyum/SPARK-25368.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
This commit is contained in:
parent
88a930dfab
commit
77c996403d
|
@@ -159,7 +159,7 @@ abstract class UnaryNode extends LogicalPlan {
     var allConstraints = child.constraints.asInstanceOf[Set[Expression]]
     projectList.foreach {
       case a @ Alias(l: Literal, _) =>
-        allConstraints += EqualTo(a.toAttribute, l)
+        allConstraints += EqualNullSafe(a.toAttribute, l)
       case a @ Alias(e, _) =>
         // For every alias in `projectList`, replace the reference in constraints by its attribute.
         allConstraints ++= allConstraints.map(_ transform {
@@ -196,7 +196,7 @@ class InferFiltersFromConstraintsSuite extends PlanTest {

   test("constraints should be inferred from aliased literals") {
     val originalLeft = testRelation.subquery('left).as("left")
-    val optimizedLeft = testRelation.subquery('left).where(IsNotNull('a) && 'a === 2).as("left")
+    val optimizedLeft = testRelation.subquery('left).where(IsNotNull('a) && 'a <=> 2).as("left")

     val right = Project(Seq(Literal(2).as("two")), testRelation.subquery('right)).as("right")
     val condition = Some("left.a".attr === "right.two".attr)
@@ -2552,4 +2552,21 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
    }
  }

test("SPARK-25368 Incorrect predicate pushdown returns wrong result") {
  // Regression test: a filter on an aliased literal column must not be
  // rewritten into a null-unsafe equality (`c = null`) during predicate
  // pushdown/constraint inference, or rows are wrongly kept or dropped.
  //
  // Builds a two-partition union of a single row, adds column `c` from the
  // given expression and `d` from the partition id, applies `predicate`,
  // and checks the exact surviving rows.
  def verify(colExpr: Column, predicate: Column, expected: Seq[Row]): Unit = {
    val base = spark.createDataFrame(Seq(
      (1, 1)
    )).toDF("a", "b").withColumn("c", colExpr)
    val filtered = base.union(base).withColumn("d", spark_partition_id).filter(predicate)
    checkAnswer(filtered, expected)
  }

  // Null-valued `c`: isNull keeps both rows, isNotNull keeps none.
  verify(lit(null).cast("int"), $"c".isNull, Seq(Row(1, 1, null, 0), Row(1, 1, null, 1)))
  verify(lit(null).cast("int"), $"c".isNotNull, Seq.empty)
  // Non-null literal `c = 2`: complementary checks on null-ness and equality.
  verify(lit(2).cast("int"), $"c".isNull, Seq.empty)
  verify(lit(2).cast("int"), $"c".isNotNull, Seq(Row(1, 1, 2, 0), Row(1, 1, 2, 1)))
  verify(lit(2).cast("int"), $"c" === 2, Seq(Row(1, 1, 2, 0), Row(1, 1, 2, 1)))
  verify(lit(2).cast("int"), $"c" =!= 2, Seq.empty)
}
}
Loading…
Reference in a new issue