[SPARK-20364][SQL] Disable Parquet predicate pushdown for fields having dots in the names

## What changes were proposed in this pull request?

This is an alternative workaround that simply avoids predicate pushdown for columns whose names contain dots. It takes a different approach from https://github.com/apache/spark/pull/17680.

The downside of this PR is that it does not push down filters on columns with dots in their names at all (neither at the record level nor at the row-group level), whereas the downside of the approach in that PR is that it uses Parquet's API improperly, in a hacky way, to support this case.

I assume we prefer the safe route here, using the Parquet API properly, but this does close that PR, since here we simply avoid the pushdown.

This looks like a simple workaround, and it is probably fine given that the problem is arguably a corner case (although it may end up reading whole row groups under the hood; neither option is ideal).

Currently, if there are dots in the column name, predicate pushdown fails in Parquet and silently returns incorrect results:

**With dots (before this change)**

```scala
val path = "/tmp/abcde"
Seq(Some(1), None).toDF("col.dots").write.parquet(path)
spark.read.parquet(path).where("`col.dots` IS NOT NULL").show()
```

```
+--------+
|col.dots|
+--------+
+--------+
```

**Without dots**

```scala
val path = "/tmp/abcde"
Seq(Some(1), None).toDF("coldots").write.parquet(path)
spark.read.parquet(path).where("`coldots` IS NOT NULL").show()
```

```
+-------+
|coldots|
+-------+
|      1|
+-------+
```

**With dots (after this change)**

```scala
val path = "/tmp/abcde"
Seq(Some(1), None).toDF("col.dots").write.parquet(path)
spark.read.parquet(path).where("`col.dots` IS NOT NULL").show()
```

```
+--------+
|col.dots|
+--------+
|       1|
+--------+
```
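
Though not part of this patch, a quick way to confirm the new behavior is to inspect the physical plan; the `PushedFilters` line mentioned in the comment below is illustrative, and its exact rendering varies across Spark versions:

```scala
// Reusing `path` from the snippet above. Before this change, the Parquet scan
// reports the pushed filter, e.g. "PushedFilters: [IsNotNull(col.dots)]";
// after it, the filter on the dotted column no longer appears there and is
// evaluated by Spark's own Filter operator instead.
spark.read.parquet(path).where("`col.dots` IS NOT NULL").explain()
```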

## How was this patch tested?

Unit tests added in `ParquetFilterSuite`.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #18000 from HyukjinKwon/SPARK-20364-workaround.
Authored by hyukjinkwon on 2017-05-18 10:52:23 -07:00, committed by Xiao Li
commit 8fb3d5c6da (parent 99452df44f)
2 changed files with 43 additions and 21 deletions

**ParquetFilters.scala**

```diff
@@ -166,7 +166,14 @@ private[parquet] object ParquetFilters {
    * Converts data sources filters to Parquet filter predicates.
    */
   def createFilter(schema: StructType, predicate: sources.Filter): Option[FilterPredicate] = {
-    val dataTypeOf = getFieldMap(schema)
+    val nameToType = getFieldMap(schema)
+
+    // Parquet does not allow dots in the column name because dots are used as a column path
+    // delimiter. Since Parquet 1.8.2 (PARQUET-389), Parquet accepts the filter predicates
+    // with missing columns. The incorrect results could be got from Parquet when we push down
+    // filters for the column having dots in the names. Thus, we do not push down such filters.
+    // See SPARK-20364.
+    def canMakeFilterOn(name: String): Boolean = nameToType.contains(name) && !name.contains(".")

     // NOTE:
     //
@@ -184,30 +191,30 @@ private[parquet] object ParquetFilters {
     // Probably I missed something and obviously this should be changed.

     predicate match {
-      case sources.IsNull(name) if dataTypeOf.contains(name) =>
-        makeEq.lift(dataTypeOf(name)).map(_(name, null))
-      case sources.IsNotNull(name) if dataTypeOf.contains(name) =>
-        makeNotEq.lift(dataTypeOf(name)).map(_(name, null))
+      case sources.IsNull(name) if canMakeFilterOn(name) =>
+        makeEq.lift(nameToType(name)).map(_(name, null))
+      case sources.IsNotNull(name) if canMakeFilterOn(name) =>
+        makeNotEq.lift(nameToType(name)).map(_(name, null))

-      case sources.EqualTo(name, value) if dataTypeOf.contains(name) =>
-        makeEq.lift(dataTypeOf(name)).map(_(name, value))
-      case sources.Not(sources.EqualTo(name, value)) if dataTypeOf.contains(name) =>
-        makeNotEq.lift(dataTypeOf(name)).map(_(name, value))
+      case sources.EqualTo(name, value) if canMakeFilterOn(name) =>
+        makeEq.lift(nameToType(name)).map(_(name, value))
+      case sources.Not(sources.EqualTo(name, value)) if canMakeFilterOn(name) =>
+        makeNotEq.lift(nameToType(name)).map(_(name, value))

-      case sources.EqualNullSafe(name, value) if dataTypeOf.contains(name) =>
-        makeEq.lift(dataTypeOf(name)).map(_(name, value))
-      case sources.Not(sources.EqualNullSafe(name, value)) if dataTypeOf.contains(name) =>
-        makeNotEq.lift(dataTypeOf(name)).map(_(name, value))
+      case sources.EqualNullSafe(name, value) if canMakeFilterOn(name) =>
+        makeEq.lift(nameToType(name)).map(_(name, value))
+      case sources.Not(sources.EqualNullSafe(name, value)) if canMakeFilterOn(name) =>
+        makeNotEq.lift(nameToType(name)).map(_(name, value))

-      case sources.LessThan(name, value) if dataTypeOf.contains(name) =>
-        makeLt.lift(dataTypeOf(name)).map(_(name, value))
-      case sources.LessThanOrEqual(name, value) if dataTypeOf.contains(name) =>
-        makeLtEq.lift(dataTypeOf(name)).map(_(name, value))
+      case sources.LessThan(name, value) if canMakeFilterOn(name) =>
+        makeLt.lift(nameToType(name)).map(_(name, value))
+      case sources.LessThanOrEqual(name, value) if canMakeFilterOn(name) =>
+        makeLtEq.lift(nameToType(name)).map(_(name, value))

-      case sources.GreaterThan(name, value) if dataTypeOf.contains(name) =>
-        makeGt.lift(dataTypeOf(name)).map(_(name, value))
-      case sources.GreaterThanOrEqual(name, value) if dataTypeOf.contains(name) =>
-        makeGtEq.lift(dataTypeOf(name)).map(_(name, value))
+      case sources.GreaterThan(name, value) if canMakeFilterOn(name) =>
+        makeGt.lift(nameToType(name)).map(_(name, value))
+      case sources.GreaterThanOrEqual(name, value) if canMakeFilterOn(name) =>
+        makeGtEq.lift(nameToType(name)).map(_(name, value))

       case sources.And(lhs, rhs) =>
         // At here, it is not safe to just convert one side if we do not understand the
```
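
For context on why the guard is needed: parquet-mr treats dots in a filter's column name as nested-field path separators (`FilterApi` builds its columns via `ColumnPath.fromDotString`), so a predicate on a flat column named `col.dots` actually references a nonexistent field `dots` nested under `col`, and since PARQUET-389 a predicate on a missing column is accepted and simply filters out every row. A minimal sketch against parquet-mr's `ColumnPath` (parquet-mr is on the classpath of any Spark build; the printed value is illustrative):

```scala
import org.apache.parquet.hadoop.metadata.ColumnPath

// fromDotString splits on dots, producing the two-segment path ["col", "dots"]
// instead of a single flat column literally named "col.dots".
val columnPath = ColumnPath.fromDotString("col.dots")
println(columnPath.toArray.toSeq) // WrappedArray(col, dots)
```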

**ParquetFilterSuite.scala**

```diff
@@ -538,6 +538,21 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
       // scalastyle:on nonascii
     }
   }
+
+  test("SPARK-20364: Disable Parquet predicate pushdown for fields having dots in the names") {
+    import testImplicits._
+
+    Seq(true, false).foreach { vectorized =>
+      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized.toString,
+          SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> true.toString) {
+        withTempPath { path =>
+          Seq(Some(1), None).toDF("col.dots").write.parquet(path.getAbsolutePath)
+          val readBack = spark.read.parquet(path.getAbsolutePath).where("`col.dots` IS NOT NULL")
+          assert(readBack.count() == 1)
+        }
+      }
+    }
+  }
 }

 class NumRowGroupsAcc extends AccumulatorV2[Integer, Integer] {
```