[SPARK-25132][SQL] Case-insensitive field resolution when reading from Parquet

## What changes were proposed in this pull request?
Spark SQL returns NULL for a column whose Hive metastore schema and Parquet schema differ in letter case, regardless of whether `spark.sql.caseSensitive` is set to true or false. This PR adds case-insensitive field resolution to ParquetFileFormat; a usage sketch follows the list below.
* Do case-insensitive resolution only if Spark is in case-insensitive mode.
* Field resolution should fail if there is ambiguity, i.e. more than one Parquet field matches.
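
For example, the following spark-shell sketch mirrors the test added below. The path and table name are made up for illustration, and it assumes an active `SparkSession` named `spark`:

```scala
// Hypothetical path/table name; the behavior shown is the one this PR introduces.
val dir = "/tmp/spark_25132_parquet"
val data = spark.range(5).selectExpr("id AS A", "id * 2 AS b", "id * 3 AS B")

spark.conf.set("spark.sql.caseSensitive", "true")
data.write.mode("overwrite").parquet(dir)   // physical Parquet columns: A, b, B

spark.sql(s"CREATE TABLE t (a LONG, b LONG) USING parquet LOCATION '$dir'")

spark.conf.set("spark.sql.caseSensitive", "false")
spark.table("t").select("a").show()      // resolves to Parquet column A (single case-insensitive match)
spark.table("t").select("b").collect()   // fails: duplicate field(s) "b": [b, B] in case-insensitive mode

spark.conf.set("spark.sql.caseSensitive", "true")
spark.table("t").select("a").show()      // exact matching only: no Parquet column "a", so NULLs
spark.table("t").select("b").show()      // exact matching: Parquet column b
```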

## How was this patch tested?
Unit tests added.

Closes #22148 from seancxmao/SPARK-25132-Parquet.

Authored-by: seancxmao <seancxmao@gmail.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
seancxmao authored on 2018-08-21 10:34:23 +08:00, committed by hyukjinkwon
commit f984ec75ed (parent b461acb2d9): 4 changed files with 161 additions and 30 deletions

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala

@@ -310,6 +310,9 @@ class ParquetFileFormat
hadoopConf.set(
SQLConf.SESSION_LOCAL_TIMEZONE.key,
sparkSession.sessionState.conf.sessionLocalTimeZone)
+ hadoopConf.setBoolean(
+ SQLConf.CASE_SENSITIVE.key,
+ sparkSession.sessionState.conf.caseSensitiveAnalysis)
ParquetWriteSupport.setSchema(requiredSchema, hadoopConf)
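
For context, the flag set above is propagated to executors through the Hadoop `Configuration`, and `ParquetReadSupport.init` (next file) reads it back with `getBoolean`. A minimal, self-contained sketch of that round trip, using only `org.apache.hadoop.conf.Configuration` (the key string equals `SQLConf.CASE_SENSITIVE.key`):

```scala
import org.apache.hadoop.conf.Configuration

object CaseSensitiveFlagRoundTrip {
  // Same key as SQLConf.CASE_SENSITIVE.key; false is also that conf's default in Spark.
  private val CaseSensitiveKey = "spark.sql.caseSensitive"

  def main(args: Array[String]): Unit = {
    val hadoopConf = new Configuration()
    // Driver side: record the session's analysis mode in the per-job Hadoop conf.
    hadoopConf.setBoolean(CaseSensitiveKey, false)
    // Reader side: recover it, falling back to a default when the key is absent.
    val caseSensitive = hadoopConf.getBoolean(CaseSensitiveKey, false)
    println(s"caseSensitive = $caseSensitive")
  }
}
```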

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala

@@ -17,7 +17,7 @@
package org.apache.spark.sql.execution.datasources.parquet
- import java.util.{Map => JMap, TimeZone}
+ import java.util.{Locale, Map => JMap, TimeZone}
import scala.collection.JavaConverters._
@@ -30,6 +30,7 @@ import org.apache.parquet.schema.Type.Repetition
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
/**
@@ -71,8 +72,10 @@ private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone])
StructType.fromString(schemaString)
}
- val parquetRequestedSchema =
- ParquetReadSupport.clipParquetSchema(context.getFileSchema, catalystRequestedSchema)
+ val caseSensitive = context.getConfiguration.getBoolean(SQLConf.CASE_SENSITIVE.key,
+ SQLConf.CASE_SENSITIVE.defaultValue.get)
+ val parquetRequestedSchema = ParquetReadSupport.clipParquetSchema(
+ context.getFileSchema, catalystRequestedSchema, caseSensitive)
new ReadContext(parquetRequestedSchema, Map.empty[String, String].asJava)
}
@@ -117,8 +120,12 @@ private[parquet] object ParquetReadSupport {
* Tailors `parquetSchema` according to `catalystSchema` by removing column paths don't exist
* in `catalystSchema`, and adding those only exist in `catalystSchema`.
*/
- def clipParquetSchema(parquetSchema: MessageType, catalystSchema: StructType): MessageType = {
- val clippedParquetFields = clipParquetGroupFields(parquetSchema.asGroupType(), catalystSchema)
+ def clipParquetSchema(
+ parquetSchema: MessageType,
+ catalystSchema: StructType,
+ caseSensitive: Boolean = true): MessageType = {
+ val clippedParquetFields = clipParquetGroupFields(
+ parquetSchema.asGroupType(), catalystSchema, caseSensitive)
if (clippedParquetFields.isEmpty) {
ParquetSchemaConverter.EMPTY_MESSAGE
} else {
@@ -129,20 +136,21 @@
}
}
- private def clipParquetType(parquetType: Type, catalystType: DataType): Type = {
+ private def clipParquetType(
+ parquetType: Type, catalystType: DataType, caseSensitive: Boolean): Type = {
catalystType match {
case t: ArrayType if !isPrimitiveCatalystType(t.elementType) =>
// Only clips array types with nested type as element type.
- clipParquetListType(parquetType.asGroupType(), t.elementType)
+ clipParquetListType(parquetType.asGroupType(), t.elementType, caseSensitive)
case t: MapType
if !isPrimitiveCatalystType(t.keyType) ||
!isPrimitiveCatalystType(t.valueType) =>
// Only clips map types with nested key type or value type
- clipParquetMapType(parquetType.asGroupType(), t.keyType, t.valueType)
+ clipParquetMapType(parquetType.asGroupType(), t.keyType, t.valueType, caseSensitive)
case t: StructType =>
- clipParquetGroup(parquetType.asGroupType(), t)
+ clipParquetGroup(parquetType.asGroupType(), t, caseSensitive)
case _ =>
// UDTs and primitive types are not clipped. For UDTs, a clipped version might not be able
@@ -168,14 +176,15 @@
* of the [[ArrayType]] should also be a nested type, namely an [[ArrayType]], a [[MapType]], or a
* [[StructType]].
*/
- private def clipParquetListType(parquetList: GroupType, elementType: DataType): Type = {
+ private def clipParquetListType(
+ parquetList: GroupType, elementType: DataType, caseSensitive: Boolean): Type = {
// Precondition of this method, should only be called for lists with nested element types.
assert(!isPrimitiveCatalystType(elementType))
// Unannotated repeated group should be interpreted as required list of required element, so
// list element type is just the group itself. Clip it.
if (parquetList.getOriginalType == null && parquetList.isRepetition(Repetition.REPEATED)) {
- clipParquetType(parquetList, elementType)
+ clipParquetType(parquetList, elementType, caseSensitive)
} else {
assert(
parquetList.getOriginalType == OriginalType.LIST,
@@ -207,7 +216,7 @@
Types
.buildGroup(parquetList.getRepetition)
.as(OriginalType.LIST)
- .addField(clipParquetType(repeatedGroup, elementType))
+ .addField(clipParquetType(repeatedGroup, elementType, caseSensitive))
.named(parquetList.getName)
} else {
// Otherwise, the repeated field's type is the element type with the repeated field's
@@ -218,7 +227,7 @@
.addField(
Types
.repeatedGroup()
- .addField(clipParquetType(repeatedGroup.getType(0), elementType))
+ .addField(clipParquetType(repeatedGroup.getType(0), elementType, caseSensitive))
.named(repeatedGroup.getName))
.named(parquetList.getName)
}
@@ -231,7 +240,10 @@
* a [[StructType]].
*/
private def clipParquetMapType(
- parquetMap: GroupType, keyType: DataType, valueType: DataType): GroupType = {
+ parquetMap: GroupType,
+ keyType: DataType,
+ valueType: DataType,
+ caseSensitive: Boolean): GroupType = {
// Precondition of this method, only handles maps with nested key types or value types.
assert(!isPrimitiveCatalystType(keyType) || !isPrimitiveCatalystType(valueType))
@@ -243,8 +255,8 @@
Types
.repeatedGroup()
.as(repeatedGroup.getOriginalType)
- .addField(clipParquetType(parquetKeyType, keyType))
- .addField(clipParquetType(parquetValueType, valueType))
+ .addField(clipParquetType(parquetKeyType, keyType, caseSensitive))
+ .addField(clipParquetType(parquetValueType, valueType, caseSensitive))
.named(repeatedGroup.getName)
Types
@@ -262,8 +274,9 @@
* [[MessageType]]. Because it's legal to construct an empty requested schema for column
* pruning.
*/
- private def clipParquetGroup(parquetRecord: GroupType, structType: StructType): GroupType = {
- val clippedParquetFields = clipParquetGroupFields(parquetRecord, structType)
+ private def clipParquetGroup(
+ parquetRecord: GroupType, structType: StructType, caseSensitive: Boolean): GroupType = {
+ val clippedParquetFields = clipParquetGroupFields(parquetRecord, structType, caseSensitive)
Types
.buildGroup(parquetRecord.getRepetition)
.as(parquetRecord.getOriginalType)
@@ -277,14 +290,35 @@
* @return A list of clipped [[GroupType]] fields, which can be empty.
*/
private def clipParquetGroupFields(
- parquetRecord: GroupType, structType: StructType): Seq[Type] = {
- val parquetFieldMap = parquetRecord.getFields.asScala.map(f => f.getName -> f).toMap
+ parquetRecord: GroupType, structType: StructType, caseSensitive: Boolean): Seq[Type] = {
val toParquet = new SparkToParquetSchemaConverter(writeLegacyParquetFormat = false)
- structType.map { f =>
- parquetFieldMap
- .get(f.name)
- .map(clipParquetType(_, f.dataType))
- .getOrElse(toParquet.convertField(f))
+ if (caseSensitive) {
+ val caseSensitiveParquetFieldMap =
+ parquetRecord.getFields.asScala.map(f => f.getName -> f).toMap
+ structType.map { f =>
+ caseSensitiveParquetFieldMap
+ .get(f.name)
+ .map(clipParquetType(_, f.dataType, caseSensitive))
+ .getOrElse(toParquet.convertField(f))
+ }
+ } else {
+ // Do case-insensitive resolution only if in case-insensitive mode
+ val caseInsensitiveParquetFieldMap =
+ parquetRecord.getFields.asScala.groupBy(_.getName.toLowerCase(Locale.ROOT))
+ structType.map { f =>
+ caseInsensitiveParquetFieldMap
+ .get(f.name.toLowerCase(Locale.ROOT))
+ .map { parquetTypes =>
+ if (parquetTypes.size > 1) {
+ // Need to fail if there is ambiguity, i.e. more than one field is matched
+ val parquetTypesString = parquetTypes.map(_.getName).mkString("[", ", ", "]")
+ throw new RuntimeException(s"""Found duplicate field(s) "${f.name}": """ +
+ s"$parquetTypesString in case-insensitive mode")
+ } else {
+ clipParquetType(parquetTypes.head, f.dataType, caseSensitive)
+ }
+ }.getOrElse(toParquet.convertField(f))
+ }
+ }
}
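
To make the resolution rule above concrete, here is a self-contained sketch in plain Scala (field names are made up and Parquet `Type`s are replaced by plain strings, so this is an illustration rather than the actual `ParquetReadSupport` code): physical fields are grouped by lower-cased name, a unique match is used, and a requested name that matches more than one physical field is rejected.

```scala
import java.util.Locale

object CaseInsensitiveResolutionSketch {
  // Stand-ins for the Parquet file's physical column names.
  private val parquetFieldNames = Seq("A", "b", "B", "c")

  // Mirrors groupBy(_.getName.toLowerCase(Locale.ROOT)) in clipParquetGroupFields above.
  private val byLowerCase: Map[String, Seq[String]] =
    parquetFieldNames.groupBy(_.toLowerCase(Locale.ROOT))

  /** Returns the matching physical name, None if absent, or throws when ambiguous. */
  def resolve(requestedName: String): Option[String] =
    byLowerCase.get(requestedName.toLowerCase(Locale.ROOT)).map { matches =>
      if (matches.size > 1) {
        throw new RuntimeException(
          s"""Found duplicate field(s) "$requestedName": """ +
            matches.mkString("[", ", ", "]") + " in case-insensitive mode")
      } else {
        matches.head
      }
    }

  def main(args: Array[String]): Unit = {
    println(resolve("a")) // Some(A)
    println(resolve("d")) // None: caller falls back to converting the Catalyst field
    println(resolve("b")) // throws: Found duplicate field(s) "b": [b, B] in case-insensitive mode
  }
}
```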

sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala

@@ -430,6 +430,49 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo
}
}
}
test(s"SPARK-25132: case-insensitive field resolution when reading from Parquet") {
withTempDir { dir =>
val format = "parquet"
val tableDir = dir.getCanonicalPath + s"/$format"
val tableName = s"spark_25132_${format}"
withTable(tableName) {
val end = 5
val data = spark.range(end).selectExpr("id as A", "id * 2 as b", "id * 3 as B")
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
data.write.format(format).mode("overwrite").save(tableDir)
}
sql(s"CREATE TABLE $tableName (a LONG, b LONG) USING $format LOCATION '$tableDir'")
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
checkAnswer(sql(s"select a from $tableName"), data.select("A"))
checkAnswer(sql(s"select A from $tableName"), data.select("A"))
// RuntimeException is triggered at executor side, which is then wrapped as
// SparkException at driver side
val e1 = intercept[SparkException] {
sql(s"select b from $tableName").collect()
}
assert(
e1.getCause.isInstanceOf[RuntimeException] &&
e1.getCause.getMessage.contains(
"""Found duplicate field(s) "b": [b, B] in case-insensitive mode"""))
val e2 = intercept[SparkException] {
sql(s"select B from $tableName").collect()
}
assert(
e2.getCause.isInstanceOf[RuntimeException] &&
e2.getCause.getMessage.contains(
"""Found duplicate field(s) "b": [b, B] in case-insensitive mode"""))
}
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
checkAnswer(sql(s"select a from $tableName"), (0 until end).map(_ => Row(null)))
checkAnswer(sql(s"select b from $tableName"), data.select("b"))
}
}
}
}
}
object TestingUDT {

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala

@@ -1014,19 +1014,21 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
testName: String,
parquetSchema: String,
catalystSchema: StructType,
- expectedSchema: String): Unit = {
+ expectedSchema: String,
+ caseSensitive: Boolean = true): Unit = {
testSchemaClipping(testName, parquetSchema, catalystSchema,
- MessageTypeParser.parseMessageType(expectedSchema))
+ MessageTypeParser.parseMessageType(expectedSchema), caseSensitive)
}
private def testSchemaClipping(
testName: String,
parquetSchema: String,
catalystSchema: StructType,
- expectedSchema: MessageType): Unit = {
+ expectedSchema: MessageType,
+ caseSensitive: Boolean): Unit = {
test(s"Clipping - $testName") {
val actual = ParquetReadSupport.clipParquetSchema(
- MessageTypeParser.parseMessageType(parquetSchema), catalystSchema)
+ MessageTypeParser.parseMessageType(parquetSchema), catalystSchema, caseSensitive)
try {
expectedSchema.checkContains(actual)
@@ -1387,7 +1389,8 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
catalystSchema = new StructType(),
- expectedSchema = ParquetSchemaConverter.EMPTY_MESSAGE)
+ expectedSchema = ParquetSchemaConverter.EMPTY_MESSAGE,
+ caseSensitive = true)
testSchemaClipping(
"disjoint field sets",
@@ -1544,4 +1547,52 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
| }
|}
""".stripMargin)
+ testSchemaClipping(
+ "case-insensitive resolution: no ambiguity",
+ parquetSchema =
+ """message root {
+ | required group A {
+ |   optional int32 B;
+ | }
+ | optional int32 c;
+ |}
+ """.stripMargin,
+ catalystSchema = {
+ val nestedType = new StructType().add("b", IntegerType, nullable = true)
+ new StructType()
+ .add("a", nestedType, nullable = true)
+ .add("c", IntegerType, nullable = true)
+ },
+ expectedSchema =
+ """message root {
+ | required group A {
+ |   optional int32 B;
+ | }
+ | optional int32 c;
+ |}
+ """.stripMargin,
+ caseSensitive = false)
+ test("Clipping - case-insensitive resolution: more than one field is matched") {
+ val parquetSchema =
+ """message root {
+ | required group A {
+ |   optional int32 B;
+ | }
+ | optional int32 c;
+ | optional int32 a;
+ |}
+ """.stripMargin
+ val catalystSchema = {
+ val nestedType = new StructType().add("b", IntegerType, nullable = true)
+ new StructType()
+ .add("a", nestedType, nullable = true)
+ .add("c", IntegerType, nullable = true)
+ }
+ assertThrows[RuntimeException] {
+ ParquetReadSupport.clipParquetSchema(
+ MessageTypeParser.parseMessageType(parquetSchema), catalystSchema, caseSensitive = false)
+ }
+ }
}