[SPARK-23274][SQL] Fix ReplaceExceptWithFilter when the right's Filter contains the references that are not in the left output

## What changes were proposed in this pull request?
This PR is to fix the `ReplaceExceptWithFilter` rule when the right's Filter contains the references that are not in the left output.

Before this PR, we got the error like
```
java.util.NoSuchElementException: key not found: a
  at scala.collection.MapLike$class.default(MapLike.scala:228)
  at scala.collection.AbstractMap.default(Map.scala:59)
  at scala.collection.MapLike$class.apply(MapLike.scala:141)
  at scala.collection.AbstractMap.apply(Map.scala:59)
```

After this PR, `ReplaceExceptWithFilter ` will not take an effect in this case.

## How was this patch tested?
Added tests

Author: gatorsmile <gatorsmile@gmail.com>

Closes #20444 from gatorsmile/fixReplaceExceptWithFilter.
This commit is contained in:
gatorsmile 2018-01-30 20:05:57 -08:00
parent 7786616733
commit ca04c3ff23
3 changed files with 36 additions and 4 deletions

View file

@ -46,18 +46,27 @@ object ReplaceExceptWithFilter extends Rule[LogicalPlan] {
}
plan.transform {
case Except(left, right) if isEligible(left, right) =>
Distinct(Filter(Not(transformCondition(left, skipProject(right))), left))
case e @ Except(left, right) if isEligible(left, right) =>
val newCondition = transformCondition(left, skipProject(right))
newCondition.map { c =>
Distinct(Filter(Not(c), left))
}.getOrElse {
e
}
}
}
private def transformCondition(left: LogicalPlan, right: LogicalPlan): Expression = {
private def transformCondition(left: LogicalPlan, right: LogicalPlan): Option[Expression] = {
val filterCondition =
InferFiltersFromConstraints(combineFilters(right)).asInstanceOf[Filter].condition
val attributeNameMap: Map[String, Attribute] = left.output.map(x => (x.name, x)).toMap
filterCondition.transform { case a : AttributeReference => attributeNameMap(a.name) }
if (filterCondition.references.forall(r => attributeNameMap.contains(r.name))) {
Some(filterCondition.transform { case a: AttributeReference => attributeNameMap(a.name) })
} else {
None
}
}
// TODO: This can be further extended in the future.

View file

@ -168,6 +168,21 @@ class ReplaceOperatorSuite extends PlanTest {
comparePlans(optimized, correctAnswer)
}
test("replace Except with Filter when only right filter can be applied to the left") {
val table = LocalRelation(Seq('a.int, 'b.int))
val left = table.where('b < 1).select('a).as("left")
val right = table.where('b < 3).select('a).as("right")
val query = Except(left, right)
val optimized = Optimize.execute(query.analyze)
val correctAnswer =
Aggregate(left.output, right.output,
Join(left, right, LeftAnti, Option($"left.a" <=> $"right.a"))).analyze
comparePlans(optimized, correctAnswer)
}
test("replace Distinct with Aggregate") {
val input = LocalRelation('a.int, 'b.int)

View file

@ -589,6 +589,14 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
Nil)
}
test("SPARK-23274: except between two projects without references used in filter") {
val df = Seq((1, 2, 4), (1, 3, 5), (2, 2, 3), (2, 4, 5)).toDF("a", "b", "c")
val df1 = df.filter($"a" === 1)
val df2 = df.filter($"a" === 2)
checkAnswer(df1.select("b").except(df2.select("b")), Row(3) :: Nil)
checkAnswer(df1.select("b").except(df2.select("c")), Row(2) :: Nil)
}
test("except distinct - SQL compliance") {
val df_left = Seq(1, 2, 2, 3, 3, 4).toDF("id")
val df_right = Seq(1, 3).toDF("id")