[SPARK-32351][SQL] Show partially pushed down partition filters in explain()
### What changes were proposed in this pull request?

Currently, actual non-dynamic partition pruning is executed in the optimizer phase (PruneFileSourcePartitions) if an input relation has a catalog file index. The current code assumes that the same partition filters are generated again in FileSourceStrategy and passed into FileSourceScanExec. FileSourceScanExec uses the partition filters when listing files, but these non-dynamic partition filters have no effect because the unnecessary partitions have already been pruned in advance, so in this case the filters are mainly used for explain output. If a WHERE clause contains predicates in disjunctive normal form (DNF), FileSourceStrategy cannot extract the same filters that PruneFileSourcePartitions does, and PartitionFilters is then not shown in the explain output.

This patch proposes to extract the partition filters again in FileSourceStrategy and HiveStrategy with `extractPredicatesWithinOutputSet` (added in https://github.com/apache/spark/pull/29101/files#diff-6be42cfa3c62a7536b1eb1d6447c073c), so that the partially pushed down partition filters are shown in explain().
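
As a rough illustration of the extraction logic behind `extractPredicatesWithinOutputSet`: for an AND, either side alone is still a sound (weaker) filter, while for an OR both sides must yield a partition-only predicate. The following self-contained Scala sketch demonstrates this over a toy expression tree (the `Expr`/`Leaf`/`extract` names are made up for illustration and are not Spark's Catalyst types):

```scala
// A simplified, self-contained sketch (not Spark code) of the extraction
// logic behind `extractPredicatesWithinOutputSet`: the toy Expr ADT below
// stands in for Catalyst expressions.
object PartitionFilterSketch extends App {
  sealed trait Expr { def refs: Set[String] }
  case class Leaf(refs: Set[String], text: String) extends Expr {
    override def toString: String = text
  }
  case class And(l: Expr, r: Expr) extends Expr {
    val refs: Set[String] = l.refs ++ r.refs
    override def toString: String = s"($l AND $r)"
  }
  case class Or(l: Expr, r: Expr) extends Expr {
    val refs: Set[String] = l.refs ++ r.refs
    override def toString: String = s"($l OR $r)"
  }

  // Returns the weakest predicate implied by `expr` that references only
  // columns in `outputSet` (e.g. the partition columns), if one exists.
  def extract(expr: Expr, outputSet: Set[String]): Option[Expr] = expr match {
    case And(l, r) =>
      // Either conjunct alone is still a sound (weaker) filter.
      (extract(l, outputSet), extract(r, outputSet)) match {
        case (Some(a), Some(b)) => Some(And(a, b))
        case (a, b)             => a.orElse(b)
      }
    case Or(l, r) =>
      // Both disjuncts must contribute, otherwise no pruning is sound.
      for (a <- extract(l, outputSet); b <- extract(r, outputSet)) yield Or(a, b)
    case leaf if leaf.refs.subsetOf(outputSet) => Some(leaf)
    case _ => None
  }

  // WHERE p = '1' OR (p = '2' AND i = 1), where `p` is the partition column:
  val cond = Or(Leaf(Set("p"), "p = '1'"),
                And(Leaf(Set("p"), "p = '2'"), Leaf(Set("i"), "i = 1")))
  println(extract(cond, Set("p")))  // Some((p = '1' OR p = '2'))
}
```

For `p = '1' OR (p = '2' AND i = 1)` with partition column `p`, this yields `(p = '1' OR p = '2')`, which matches the partition filters in the plans below.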

### Why are the changes needed?

Without the patch, the explained plan is inconsistent with what is actually executed.

**Without the change**, the explained plans of `SELECT * FROM t WHERE p = '1' OR (p = '2' AND i = 1)` for a datasource table and a Hive table, respectively, look like the following (the pushed-down partition filters are missing):

```
== Physical Plan ==
*(1) Filter ((p#21 = 1) OR ((p#21 = 2) AND (i#20 = 1)))
+- *(1) ColumnarToRow
   +- FileScan parquet default.t[i#20,p#21] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/nanzhu/code/spark/sql/hive/target/tmp/hive_execution_test_group/war..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<i:int>
```

```
== Physical Plan ==
*(1) Filter ((p#33 = 1) OR ((p#33 = 2) AND (i#32 = 1)))
+- Scan hive default.t [i#32, p#33], HiveTableRelation [`default`.`t`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [i#32], Partition Cols: [p#33], Pruned Partitions: [(p=1), (p=2)]]
```

**With the change**, the plans look like the following (the actually executed partition filters are shown):

```
== Physical Plan ==
*(1) Filter ((p#21 = 1) OR ((p#21 = 2) AND (i#20 = 1)))
+- *(1) ColumnarToRow
   +- FileScan parquet default.t[i#20,p#21] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/nanzhu/code/spark/sql/hive/target/tmp/hive_execution_test_group/war..., PartitionFilters: [((p#21 = 1) OR (p#21 = 2))], PushedFilters: [], ReadSchema: struct<i:int>
```

```
== Physical Plan ==
*(1) Filter ((p#37 = 1) OR ((p#37 = 2) AND (i#36 = 1)))
+- Scan hive default.t [i#36, p#37], HiveTableRelation [`default`.`t`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [i#36], Partition Cols: [p#37], Pruned Partitions: [(p=1), (p=2)]], [((p#37 = 1) OR (p#37 = 2))]
```
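
For reference, the datasource-table plans above can be reproduced in spark-shell with something like the following minimal sketch (the layout of table `t` is inferred from the example plans):

```scala
// Hypothetical reproduction of the datasource-table plans above;
// the table name `t` and columns `i`/`p` mirror the example query.
spark.sql("CREATE TABLE t (i INT, p STRING) USING parquet PARTITIONED BY (p)")
spark.sql("INSERT INTO t PARTITION (p = '1') VALUES (1)")
spark.sql("INSERT INTO t PARTITION (p = '2') VALUES (2)")
spark.sql("SELECT * FROM t WHERE p = '1' OR (p = '2' AND i = 1)").explain()
```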

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

Closes #29831 from CodingCat/SPARK-32351.

Lead-authored-by: Nan Zhu <nanzhu@uber.com>
Co-authored-by: Nan Zhu <CodingCat@users.noreply.github.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>