[SPARK-18753][SQL] Keep pushed-down null literal as a filter in Spark-side post-filter for FileFormat datasources
## What changes were proposed in this pull request?

Currently, `FileSourceStrategy` does not handle the case where the pushed-down filter is `Literal(null)`, and drops it from the Spark-side post-filter. For example, the code below:

```scala
val df = Seq(Tuple1(Some(true)), Tuple1(None), Tuple1(Some(false))).toDF()
df.filter($"_1" === "true").explain(true)
```

keeps `null` properly:

```
== Parsed Logical Plan ==
'Filter ('_1 = true)
+- LocalRelation [_1#17]

== Analyzed Logical Plan ==
_1: boolean
Filter (cast(_1#17 as double) = cast(true as double))
+- LocalRelation [_1#17]

== Optimized Logical Plan ==
Filter (isnotnull(_1#17) && null)
+- LocalRelation [_1#17]

== Physical Plan ==
*Filter (isnotnull(_1#17) && null)    << Here `null` is present
+- LocalTableScan [_1#17]
```

However, when the data is read back from Parquet,

```scala
val path = "/tmp/testfile"
df.write.parquet(path)
spark.read.parquet(path).filter($"_1" === "true").explain(true)
```

`null` is removed from the post-filter:

```
== Parsed Logical Plan ==
'Filter ('_1 = true)
+- Relation[_1#11] parquet

== Analyzed Logical Plan ==
_1: boolean
Filter (cast(_1#11 as double) = cast(true as double))
+- Relation[_1#11] parquet

== Optimized Logical Plan ==
Filter (isnotnull(_1#11) && null)
+- Relation[_1#11] parquet

== Physical Plan ==
*Project [_1#11]
+- *Filter isnotnull(_1#11)    << Here `null` is missing
   +- *FileScan parquet [_1#11] Batched: true, Format: ParquetFormat, Location: InMemoryFileIndex[file:/tmp/testfile], PartitionFilters: [null], PushedFilters: [IsNotNull(_1)], ReadSchema: struct<_1:boolean>
```

This PR fixes it so the `null` filter is kept. In more detail, in

```scala
val partitionKeyFilters =
  ExpressionSet(normalizedFilters.filter(_.references.subsetOf(partitionSet)))
```

the `null` ends up in `partitionKeyFilters` because a `Literal` has no `children`, so its `references` set is empty, and the empty set is a subset of any `partitionSet`.
Then, in

```scala
val afterScanFilters = filterSet -- partitionKeyFilters
```

the `null` is always removed from the post-filter. So, when a filter references no fields, it should also be applied to the data columns. After this PR, the plan becomes:

```
== Parsed Logical Plan ==
'Filter ('_1 = true)
+- Relation[_1#276] parquet

== Analyzed Logical Plan ==
_1: boolean
Filter (cast(_1#276 as double) = cast(true as double))
+- Relation[_1#276] parquet

== Optimized Logical Plan ==
Filter (isnotnull(_1#276) && null)
+- Relation[_1#276] parquet

== Physical Plan ==
*Project [_1#276]
+- *Filter (isnotnull(_1#276) && null)
   +- *FileScan parquet [_1#276] Batched: true, Format: ParquetFormat, Location: InMemoryFileIndex[file:/private/var/folders/9j/gf_c342d7d150mwrxvkqnc180000gn/T/spark-a5d59bdb-5b..., PartitionFilters: [null], PushedFilters: [IsNotNull(_1)], ReadSchema: struct<_1:boolean>
```

## How was this patch tested?

Unit test in `FileSourceStrategySuite`.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #16184 from HyukjinKwon/SPARK-18753.
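The interaction between the two snippets above can be illustrated with a minimal standalone model (hypothetical `Filter` case class and column names; this is not Spark's actual `Expression`/`ExpressionSet` API, just the same set arithmetic):

```scala
// Each "filter" carries the set of column names it references.
// Literal(null) references no columns, so its set is empty.
case class Filter(name: String, references: Set[String])

object NullFilterDemo {
  val partitionSet = Set("part_col")
  val filterSet = Set(
    Filter("isnotnull(_1)", Set("_1")),
    Filter("null", Set.empty[String]) // models Literal(null)
  )

  // The empty set is trivially a subset of partitionSet, so the null
  // literal is (wrongly) classified as a partition-key filter...
  val partitionKeyFilters = filterSet.filter(_.references.subsetOf(partitionSet))

  // ...and before the fix it was subtracted from the Spark-side post-filter:
  val beforeFix = filterSet -- partitionKeyFilters

  // The fix subtracts only filters that actually reference a partition column,
  // so the null literal survives into the post-filter:
  val afterFix = filterSet -- partitionKeyFilters.filter(_.references.nonEmpty)

  def main(args: Array[String]): Unit = {
    println(beforeFix.map(_.name)) // the null filter is lost
    println(afterFix.map(_.name))  // the null filter is kept
  }
}
```

Running the demo shows `beforeFix` contains only `isnotnull(_1)`, while `afterFix` retains both filters, matching the physical plans before and after the patch.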
This commit is contained in:
parent 169b9d73ee
commit 89ae26dcdb
```diff
@@ -86,7 +86,7 @@ object FileSourceStrategy extends Strategy with Logging {
     val dataFilters = normalizedFilters.filter(_.references.intersect(partitionSet).isEmpty)

     // Predicates with both partition keys and attributes need to be evaluated after the scan.
-    val afterScanFilters = filterSet -- partitionKeyFilters
+    val afterScanFilters = filterSet -- partitionKeyFilters.filter(_.references.nonEmpty)
     logInfo(s"Post-Scan Filters: ${afterScanFilters.mkString(",")}")

     val filterAttributes = AttributeSet(afterScanFilters)
```
```diff
@@ -476,6 +476,17 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
     }
   }

+  test("[SPARK-18753] keep pushed-down null literal as a filter in Spark-side post-filter") {
+    val ds = Seq(Tuple1(Some(true)), Tuple1(None), Tuple1(Some(false))).toDS()
+    withTempPath { p =>
+      val path = p.getAbsolutePath
+      ds.write.parquet(path)
+      val readBack = spark.read.parquet(path).filter($"_1" === "true")
+      val filtered = ds.filter($"_1" === "true").toDF()
+      checkAnswer(readBack, filtered)
+    }
+  }
+
   // Helpers for checking the arguments passed to the FileFormat.

   protected val checkPartitionSchema =
```