[SPARK-24402][SQL] Optimize In expression when only one element in the collection or collection is empty

## What changes were proposed in this pull request?

Two new rules in the logical plan optimizers are added.

1. When there is only one element in the **`Collection`**, the
physical plan will be optimized to **`EqualTo`**, so predicate
pushdown can be used.

```scala
    profileDF.filter( $"profileID".isInCollection(Set(6))).explain(true)
    """
      |== Physical Plan ==
      |*(1) Project [profileID#0]
      |+- *(1) Filter (isnotnull(profileID#0) && (profileID#0 = 6))
      |   +- *(1) FileScan parquet [profileID#0] Batched: true, Format: Parquet,
      |     PartitionFilters: [],
      |     PushedFilters: [IsNotNull(profileID), EqualTo(profileID,6)],
      |     ReadSchema: struct<profileID:int>
    """.stripMargin
```

2. When the **`Collection`** is empty, and the input is nullable, the
logical plan will be simplified to

```scala
    profileDF.filter( $"profileID".isInCollection(Set())).explain(true)
    """
      |== Optimized Logical Plan ==
      |Filter if (isnull(profileID#0)) null else false
      |+- Relation[profileID#0] parquet
    """.stripMargin
```

TODO:

1. For multiple conditions with numbers less than certain thresholds,
we should still allow predicate pushdown.
2. Optimize the **`In`** using **`tableswitch`** or **`lookupswitch`**
when the numbers of the categories are low, and they are **`Int`**,
**`Long`**.
3. The default immutable hash trees set is slow for query, and we
should do benchmark for using different set implementation for faster
query.
4. **`filter(if (condition) null else false)`** can be optimized to false.

## How was this patch tested?

Couple new tests are added.

Author: DB Tsai <d_tsai@apple.com>

Closes #21442 from dbtsai/optimize-in.
This commit is contained in:
DB Tsai 2018-07-16 15:33:39 -07:00 committed by Xiao Li
parent ba437fc5c7
commit 0f0d1865f5
2 changed files with 41 additions and 4 deletions

View file

@ -218,15 +218,20 @@ object ReorderAssociativeOperator extends Rule[LogicalPlan] {
object OptimizeIn extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case q: LogicalPlan => q transformExpressionsDown {
case In(v, list) if list.isEmpty && !v.nullable => FalseLiteral
case In(v, list) if list.isEmpty =>
// When v is not nullable, the following expression will be optimized
// to FalseLiteral which is tested in OptimizeInSuite.scala
If(IsNotNull(v), FalseLiteral, Literal(null, BooleanType))
case expr @ In(v, list) if expr.inSetConvertible =>
val newList = ExpressionSet(list).toSeq
if (newList.size > SQLConf.get.optimizerInSetConversionThreshold) {
if (newList.length == 1 && !newList.isInstanceOf[ListQuery]) {
EqualTo(v, newList.head)
} else if (newList.length > SQLConf.get.optimizerInSetConversionThreshold) {
val hSet = newList.map(e => e.eval(EmptyRow))
InSet(v, HashSet() ++ hSet)
} else if (newList.size < list.size) {
} else if (newList.length < list.length) {
expr.copy(list = newList)
} else { // newList.length == list.length
} else { // newList.length == list.length && newList.length > 1
expr
}
}

View file

@ -176,6 +176,21 @@ class OptimizeInSuite extends PlanTest {
}
}
test("OptimizedIn test: one element in list gets transformed to EqualTo.") {
val originalQuery =
testRelation
.where(In(UnresolvedAttribute("a"), Seq(Literal(1))))
.analyze
val optimized = Optimize.execute(originalQuery)
val correctAnswer =
testRelation
.where(EqualTo(UnresolvedAttribute("a"), Literal(1)))
.analyze
comparePlans(optimized, correctAnswer)
}
test("OptimizedIn test: In empty list gets transformed to FalseLiteral " +
"when value is not nullable") {
val originalQuery =
@ -191,4 +206,21 @@ class OptimizeInSuite extends PlanTest {
comparePlans(optimized, correctAnswer)
}
test("OptimizedIn test: In empty list gets transformed to `If` expression " +
"when value is nullable") {
val originalQuery =
testRelation
.where(In(UnresolvedAttribute("a"), Nil))
.analyze
val optimized = Optimize.execute(originalQuery)
val correctAnswer =
testRelation
.where(If(IsNotNull(UnresolvedAttribute("a")),
Literal(false), Literal.create(null, BooleanType)))
.analyze
comparePlans(optimized, correctAnswer)
}
}