[SPARK-30162][SQL] Add PushedFilters to metadata in Parquet DSv2 implementation

### What changes were proposed in this pull request?

This PR proposes to add `PushedFilters` to the scan metadata so that the pushed filters are shown in the Parquet DSv2 implementation. In the case of ORC, this was already added at https://github.com/apache/spark/pull/24719/files#diff-0fc82694b20da3cd2cbb07206920eef7R62-R64

### Why are the changes needed?

So that users can see which filters were pushed down when debugging query plans, and to match the ORC implementation.

### Does this PR introduce any user-facing change?

Yes; `PushedFilters` now appears in the `explain()` output for Parquet DSv2 scans:

```scala
spark.range(10).write.mode("overwrite").parquet("/tmp/foo")
spark.read.parquet("/tmp/foo").filter("5 > id").explain()
```

**Before:**

```
== Physical Plan ==
*(1) Project [id#20L]
+- *(1) Filter (isnotnull(id#20L) AND (5 > id#20L))
   +- *(1) ColumnarToRow
      +- BatchScan[id#20L] ParquetScan Location: InMemoryFileIndex[file:/tmp/foo], ReadSchema: struct<id:bigint>
```

**After:**

```
== Physical Plan ==
*(1) Project [id#13L]
+- *(1) Filter (isnotnull(id#13L) AND (5 > id#13L))
   +- *(1) ColumnarToRow
      +- BatchScan[id#13L] ParquetScan Location: InMemoryFileIndex[file:/tmp/foo], ReadSchema: struct<id:bigint>, PushedFilters: [IsNotNull(id), LessThan(id,5)]
```
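
As a rough programmatic check (a sketch, not from this patch; it assumes an active `spark` session and the `/tmp/foo` data written above):

```scala
// Sketch: confirm the pushed filters now surface in the physical plan text.
val df = spark.read.parquet("/tmp/foo").filter("5 > id")
val planText = df.queryExecution.executedPlan.toString
assert(planText.contains("PushedFilters"))
```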

### How was this patch tested?

Unit tests were added, and the change was also manually tested.

Closes #26857 from HyukjinKwon/SPARK-30162.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
commit cc087a3ac5 (parent fd39b6db34), committed 2019-12-12 08:33:33 -08:00
2 changed files with 18 additions and 8 deletions

**`ParquetScan.scala`**

```diff
@@ -87,4 +87,8 @@ case class ParquetScan(
   }
 
   override def hashCode(): Int = getClass.hashCode()
+
+  override def description(): String = {
+    super.description() + ", PushedFilters: " + pushedFilters.mkString("[", ", ", "]")
+  }
 }
```
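
As a side note on the formatting, `mkString("[", ", ", "]")` is what renders the bracketed list seen in the plan output. A standalone illustration (the filter strings are taken from the example above, not from the patch):

```scala
// mkString(start, sep, end) joins the string forms of the pushed filters.
val rendered = Seq("IsNotNull(id)", "LessThan(id,5)").mkString("[", ", ", "]")
// rendered == "[IsNotNull(id), LessThan(id,5)]"
```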

**`DataSourceV2ScanExecRedactionSuite.scala`**

```diff
@@ -150,15 +150,21 @@ class DataSourceV2ScanExecRedactionSuite extends DataSourceScanRedactionTest {
   }
 
   test("FileScan description") {
-    withTempPath { path =>
-      val dir = path.getCanonicalPath
-      spark.range(0, 10).write.orc(dir)
-      val df = spark.read.orc(dir)
+    Seq("json", "orc", "parquet").foreach { format =>
+      withTempPath { path =>
+        val dir = path.getCanonicalPath
+        spark.range(0, 10).write.format(format).save(dir)
+        val df = spark.read.format(format).load(dir)
 
-      assert(isIncluded(df.queryExecution, "ReadSchema"))
-      assert(isIncluded(df.queryExecution, "BatchScan"))
-      assert(isIncluded(df.queryExecution, "PushedFilters"))
-      assert(isIncluded(df.queryExecution, "Location"))
+        withClue(s"Source '$format':") {
+          assert(isIncluded(df.queryExecution, "ReadSchema"))
+          assert(isIncluded(df.queryExecution, "BatchScan"))
+          if (Seq("orc", "parquet").contains(format)) {
+            assert(isIncluded(df.queryExecution, "PushedFilters"))
+          }
+          assert(isIncluded(df.queryExecution, "Location"))
+        }
+      }
     }
   }
 }
```
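
Note that the `PushedFilters` assertion is skipped for `json`, which does not report pushed filters in its scan description. For context, `isIncluded` is a pre-existing helper in this suite rather than part of the diff; a minimal sketch of such a helper, assuming it simply searches the string renderings of the query execution:

```scala
import org.apache.spark.sql.execution.QueryExecution

// Sketch only: does `msg` appear in any textual rendering of the plan?
def isIncluded(queryExecution: QueryExecution, msg: String): Boolean = {
  queryExecution.toString.contains(msg) ||
    queryExecution.simpleString.contains(msg) ||
    queryExecution.stringWithStats.contains(msg)
}
```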