spark-instrumented-optimizer/sql/core
Takeshi Yamamuro 5197562afe [SPARK-21351][SQL] Update nullability based on children's output
## What changes were proposed in this pull request?
This pr added a new optimizer rule `UpdateNullabilityInAttributeReferences ` to update the nullability that `Filter` changes when having `IsNotNull`. In the master, optimized plans do not respect the nullability when `Filter` has `IsNotNull`. This wrongly generates unnecessary code. For example:

```
scala> val df = Seq((Some(1), Some(2))).toDF("a", "b")
scala> val bIsNotNull = df.where($"b" =!= 2).select($"b")
scala> val targetQuery = bIsNotNull.distinct
scala> val targetQuery.queryExecution.optimizedPlan.output(0).nullable
res5: Boolean = true

scala> targetQuery.debugCodegen
Found 2 WholeStageCodegen subtrees.
== Subtree 1 / 2 ==
*HashAggregate(keys=[b#19], functions=[], output=[b#19])
+- Exchange hashpartitioning(b#19, 200)
   +- *HashAggregate(keys=[b#19], functions=[], output=[b#19])
      +- *Project [_2#16 AS b#19]
         +- *Filter isnotnull(_2#16)
            +- LocalTableScan [_1#15, _2#16]

Generated code:
...
/* 124 */   protected void processNext() throws java.io.IOException {
...
/* 132 */     // output the result
/* 133 */
/* 134 */     while (agg_mapIter.next()) {
/* 135 */       wholestagecodegen_numOutputRows.add(1);
/* 136 */       UnsafeRow agg_aggKey = (UnsafeRow) agg_mapIter.getKey();
/* 137 */       UnsafeRow agg_aggBuffer = (UnsafeRow) agg_mapIter.getValue();
/* 138 */
/* 139 */       boolean agg_isNull4 = agg_aggKey.isNullAt(0);
/* 140 */       int agg_value4 = agg_isNull4 ? -1 : (agg_aggKey.getInt(0));
/* 141 */       agg_rowWriter1.zeroOutNullBytes();
/* 142 */
                // We don't need this NULL check because NULL is filtered out in `$"b" =!=2`
/* 143 */       if (agg_isNull4) {
/* 144 */         agg_rowWriter1.setNullAt(0);
/* 145 */       } else {
/* 146 */         agg_rowWriter1.write(0, agg_value4);
/* 147 */       }
/* 148 */       append(agg_result1);
/* 149 */
/* 150 */       if (shouldStop()) return;
/* 151 */     }
/* 152 */
/* 153 */     agg_mapIter.close();
/* 154 */     if (agg_sorter == null) {
/* 155 */       agg_hashMap.free();
/* 156 */     }
/* 157 */   }
/* 158 */
/* 159 */ }
```

In the line 143, we don't need this NULL check because NULL is filtered out in `$"b" =!=2`.
This pr could remove this NULL check;

```
scala> val targetQuery.queryExecution.optimizedPlan.output(0).nullable
res5: Boolean = false

scala> targetQuery.debugCodegen
...
Generated code:
...
/* 144 */   protected void processNext() throws java.io.IOException {
...
/* 152 */     // output the result
/* 153 */
/* 154 */     while (agg_mapIter.next()) {
/* 155 */       wholestagecodegen_numOutputRows.add(1);
/* 156 */       UnsafeRow agg_aggKey = (UnsafeRow) agg_mapIter.getKey();
/* 157 */       UnsafeRow agg_aggBuffer = (UnsafeRow) agg_mapIter.getValue();
/* 158 */
/* 159 */       int agg_value4 = agg_aggKey.getInt(0);
/* 160 */       agg_rowWriter1.write(0, agg_value4);
/* 161 */       append(agg_result1);
/* 162 */
/* 163 */       if (shouldStop()) return;
/* 164 */     }
/* 165 */
/* 166 */     agg_mapIter.close();
/* 167 */     if (agg_sorter == null) {
/* 168 */       agg_hashMap.free();
/* 169 */     }
/* 170 */   }
```

## How was this patch tested?
Added `UpdateNullabilityInAttributeReferencesSuite` for unit tests.

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #18576 from maropu/SPARK-21351.
2018-04-04 14:39:19 +08:00
..
benchmarks [SPARK-17335][SQL] Fix ArrayType and MapType CatalogString. 2016-09-03 19:02:20 +02:00
src [SPARK-21351][SQL] Update nullability based on children's output 2018-04-04 14:39:19 +08:00
pom.xml [SPARK-19550][BUILD][FOLLOW-UP] Remove MaxPermSize for sql module 2018-01-15 07:49:34 -06:00