[SPARK-25175][SQL] Field resolution should fail if there is ambiguity for ORC native data source table persisted in metastore

## What changes were proposed in this pull request?
Apache Spark doesn't create Hive tables with duplicated fields in either case-sensitive or case-insensitive mode. However, if Spark first creates ORC files in case-sensitive mode and then creates a Hive table on top of that location, the ORC files may contain fields that differ only in case. In this situation, field resolution should fail in case-insensitive mode. Otherwise, we don't know which columns will be returned or filtered. Previously, SPARK-25132 fixed the same issue for Parquet.

Here is a simple example:

```
val data = spark.range(5).selectExpr("id as a", "id * 2 as A")
spark.conf.set("spark.sql.caseSensitive", true)
data.write.format("orc").mode("overwrite").save("/user/hive/warehouse/orc_data")

sql("CREATE TABLE orc_data_source (A LONG) USING orc LOCATION '/user/hive/warehouse/orc_data'")
spark.conf.set("spark.sql.caseSensitive", false)
sql("select A from orc_data_source").show
+---+
|  A|
+---+
|  3|
|  2|
|  4|
|  1|
|  0|
+---+
```

See #22148 for more details about parquet data source reader.

## How was this patch tested?
Unit tests added.

Closes #22262 from seancxmao/SPARK-25175.

Authored-by: seancxmao <seancxmao@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
This commit is contained in:
seancxmao 2018-09-09 19:22:47 -07:00 committed by Dongjoon Hyun
parent 77c996403d
commit a0aed475c5
No known key found for this signature in database
GPG key ID: EDA00CE834F0FC5C
2 changed files with 63 additions and 39 deletions

View file

@@ -17,6 +17,8 @@
package org.apache.spark.sql.execution.datasources.orc
import java.util.Locale
import scala.collection.JavaConverters._
import org.apache.hadoop.conf.Configuration
@@ -27,7 +29,7 @@ import org.apache.spark.SparkException
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.analysis.{caseInsensitiveResolution, caseSensitiveResolution}
import org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.types._
@@ -116,8 +118,29 @@ object OrcUtils extends Logging {
}
})
} else {
val resolver = if (isCaseSensitive) caseSensitiveResolution else caseInsensitiveResolution
Some(requiredSchema.fieldNames.map { name => orcFieldNames.indexWhere(resolver(_, name)) })
if (isCaseSensitive) {
Some(requiredSchema.fieldNames.map { name =>
orcFieldNames.indexWhere(caseSensitiveResolution(_, name))
})
} else {
// Do case-insensitive resolution only if in case-insensitive mode
val caseInsensitiveOrcFieldMap =
orcFieldNames.zipWithIndex.groupBy(_._1.toLowerCase(Locale.ROOT))
Some(requiredSchema.fieldNames.map { requiredFieldName =>
caseInsensitiveOrcFieldMap
.get(requiredFieldName.toLowerCase(Locale.ROOT))
.map { matchedOrcFields =>
if (matchedOrcFields.size > 1) {
// Need to fail if there is ambiguity, i.e. more than one field is matched.
val matchedOrcFieldsString = matchedOrcFields.map(_._1).mkString("[", ", ", "]")
throw new RuntimeException(s"""Found duplicate field(s) "$requiredFieldName": """
+ s"$matchedOrcFieldsString in case-insensitive mode")
} else {
matchedOrcFields.head._2
}
}.getOrElse(-1)
})
}
}
}
}

View file

@@ -434,44 +434,45 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo
}
}
test(s"SPARK-25132: case-insensitive field resolution when reading from Parquet") {
withTempDir { dir =>
val format = "parquet"
val tableDir = dir.getCanonicalPath + s"/$format"
val tableName = s"spark_25132_${format}"
withTable(tableName) {
val end = 5
val data = spark.range(end).selectExpr("id as A", "id * 2 as b", "id * 3 as B")
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
data.write.format(format).mode("overwrite").save(tableDir)
}
sql(s"CREATE TABLE $tableName (a LONG, b LONG) USING $format LOCATION '$tableDir'")
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
checkAnswer(sql(s"select a from $tableName"), data.select("A"))
checkAnswer(sql(s"select A from $tableName"), data.select("A"))
// RuntimeException is triggered at executor side, which is then wrapped as
// SparkException at driver side
val e1 = intercept[SparkException] {
sql(s"select b from $tableName").collect()
Seq("parquet", "orc").foreach { format =>
test(s"Spark native readers should respect spark.sql.caseSensitive - ${format}") {
withTempDir { dir =>
val tableName = s"spark_25132_${format}_native"
val tableDir = dir.getCanonicalPath + s"/$tableName"
withTable(tableName) {
val end = 5
val data = spark.range(end).selectExpr("id as A", "id * 2 as b", "id * 3 as B")
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
data.write.format(format).mode("overwrite").save(tableDir)
}
assert(
e1.getCause.isInstanceOf[RuntimeException] &&
e1.getCause.getMessage.contains(
"""Found duplicate field(s) "b": [b, B] in case-insensitive mode"""))
val e2 = intercept[SparkException] {
sql(s"select B from $tableName").collect()
}
assert(
e2.getCause.isInstanceOf[RuntimeException] &&
e2.getCause.getMessage.contains(
"""Found duplicate field(s) "b": [b, B] in case-insensitive mode"""))
}
sql(s"CREATE TABLE $tableName (a LONG, b LONG) USING $format LOCATION '$tableDir'")
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
checkAnswer(sql(s"select a from $tableName"), (0 until end).map(_ => Row(null)))
checkAnswer(sql(s"select b from $tableName"), data.select("b"))
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
checkAnswer(sql(s"select a from $tableName"), data.select("A"))
checkAnswer(sql(s"select A from $tableName"), data.select("A"))
// RuntimeException is triggered at executor side, which is then wrapped as
// SparkException at driver side
val e1 = intercept[SparkException] {
sql(s"select b from $tableName").collect()
}
assert(
e1.getCause.isInstanceOf[RuntimeException] &&
e1.getCause.getMessage.contains(
"""Found duplicate field(s) "b": [b, B] in case-insensitive mode"""))
val e2 = intercept[SparkException] {
sql(s"select B from $tableName").collect()
}
assert(
e2.getCause.isInstanceOf[RuntimeException] &&
e2.getCause.getMessage.contains(
"""Found duplicate field(s) "b": [b, B] in case-insensitive mode"""))
}
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
checkAnswer(sql(s"select a from $tableName"), (0 until end).map(_ => Row(null)))
checkAnswer(sql(s"select b from $tableName"), data.select("b"))
}
}
}
}