[SPARK-30826][SQL] Respect reference case in StringStartsWith pushed down to parquet

### What changes were proposed in this pull request?
In this PR, I propose to convert the attribute name of `StringStartsWith`, pushed down to the Parquet data source, to a column reference via the `nameToParquetField` map. Similar conversions are already performed for the other source filters pushed down to Parquet.
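
Below is a minimal sketch of the idea, not the actual `ParquetFilters` code — the `ParquetFieldInfo` type and the `buildNameMap`/`lookup` helpers are hypothetical names used only for illustration:

```scala
// Illustrative sketch only: with case-insensitive analysis, the attribute name
// carried by a pushed-down filter ("col") has to be mapped to the physical
// field name stored in the Parquet footer ("COL") before the predicate is built.
import java.util.Locale

final case class ParquetFieldInfo(fieldName: String) // hypothetical helper type

def buildNameMap(
    physicalNames: Seq[String],
    caseSensitive: Boolean): Map[String, ParquetFieldInfo] = {
  val fields = physicalNames.map(n => ParquetFieldInfo(n))
  if (caseSensitive) {
    fields.map(f => f.fieldName -> f).toMap
  } else {
    // Keep only names that are unambiguous under case-insensitive matching.
    fields
      .groupBy(_.fieldName.toLowerCase(Locale.ROOT))
      .collect { case (lower, Seq(single)) => lower -> single }
  }
}

def lookup(
    nameMap: Map[String, ParquetFieldInfo],
    attr: String,
    caseSensitive: Boolean): Option[ParquetFieldInfo] =
  nameMap.get(if (caseSensitive) attr else attr.toLowerCase(Locale.ROOT))

// The attribute "col" from StringStartsWith("col", "4") resolves to the
// physical field "COL", so the predicate is built on the column that actually
// exists in the files.
val nameMap = buildNameMap(Seq("COL"), caseSensitive = false)
assert(lookup(nameMap, "col", caseSensitive = false).exists(_.fieldName == "COL"))
```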

### Why are the changes needed?
This fixes the bug described in [SPARK-30826](https://issues.apache.org/jira/browse/SPARK-30826). A query over an external table:
```sql
CREATE TABLE t1 (col STRING)
USING parquet
OPTIONS (path '$path')
```
created on top of Parquet files written by `Seq("42").toDF("COL").write.parquet(path)` returns an incorrect empty result:
```scala
spark.sql("SELECT * FROM t1 WHERE col LIKE '4%'").show
+---+
|col|
+---+
+---+
```
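
For intuition, here is a hedged sketch of the mismatch (illustrative only, not code from this PR; it assumes `spark-sql` on the classpath): the filter pushed down for `col LIKE '4%'` carries the logical attribute name `col`, while the files written above store the field as `COL`, so a case-sensitive match against the physical schema cannot find the column.

```scala
// Illustrative only: the source filter produced for `col LIKE '4%'` vs. the
// physical field name written into the Parquet files.
import org.apache.spark.sql.sources.StringStartsWith

val pushed = StringStartsWith("col", "4") // attribute name from the table schema
val parquetFieldName = "COL"              // field name in the written files

// A case-sensitive comparison (effectively what happened before the fix) fails,
// while a case-insensitive resolution succeeds.
assert(pushed.attribute != parquetFieldName)
assert(pushed.attribute.equalsIgnoreCase(parquetFieldName))
```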

### Does this PR introduce any user-facing change?
Yes. After the changes, the query above returns the correct result:
```scala
spark.sql("SELECT * FROM t1 WHERE col LIKE '4%'").show
+---+
|col|
+---+
| 42|
+---+
```

### How was this patch tested?
Added a test to `ParquetFilterSuite`.

Closes #27574 from MaxGekk/parquet-StringStartsWith-case-sens.

Authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
commit 8b73b92aad (parent d0f9614760)
Maxim Gekk authored on 2020-02-15 19:49:58 +08:00; committed by Wenchen Fan
2 changed files with 22 additions and 1 deletion

@@ -591,7 +591,7 @@ class ParquetFilters(
       case sources.StringStartsWith(name, prefix)
           if pushDownStartWith && canMakeFilterOn(name, prefix) =>
         Option(prefix).map { v =>
-          FilterApi.userDefined(binaryColumn(name),
+          FilterApi.userDefined(binaryColumn(nameToParquetField(name).fieldName),
             new UserDefinedPredicate[Binary] with Serializable {
               private val strToBinary = Binary.fromReusedByteArray(v.getBytes)
               private val size = strToBinary.length

@@ -1390,6 +1390,27 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
       }
     }
   }
+
+  test("SPARK-30826: case insensitivity of StringStartsWith attribute") {
+    import testImplicits._
+    withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+      withTable("t1") {
+        withTempPath { dir =>
+          val path = dir.toURI.toString
+          Seq("42").toDF("COL").write.parquet(path)
+          spark.sql(
+            s"""
+               |CREATE TABLE t1 (col STRING)
+               |USING parquet
+               |OPTIONS (path '$path')
+             """.stripMargin)
+          checkAnswer(
+            spark.sql("SELECT * FROM t1 WHERE col LIKE '4%'"),
+            Row("42"))
+        }
+      }
+    }
+  }
 }
 
 class ParquetV1FilterSuite extends ParquetFilterSuite {