[SPARK-32431][SQL] Check duplicate nested columns in read from in-built datasources

### What changes were proposed in this pull request?
When `spark.sql.caseSensitive` is `false` (the default), check that there are no duplicate column names at the same level (top level or any nested level) when reading from the built-in datasources Parquet, ORC, Avro and JSON. If such duplicate columns exist, throw the exception:
```
org.apache.spark.sql.AnalysisException: Found duplicate column(s) in the data schema:
```
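
Under the hood, `SchemaUtils.checkSchemaColumnNameDuplication` now accepts any `DataType` and recurses into structs, arrays and maps (see the diff below). A minimal sketch of the new check with illustrative column names; note that `SchemaUtils` is a `private[spark]` helper, so this is only callable from Spark-internal code:

```scala
import org.apache.spark.sql.types.{LongType, StructType}
import org.apache.spark.sql.util.SchemaUtils

// A struct whose nested field names collide when compared case-insensitively.
val nested = new StructType().add("StructColumn",
  new StructType()
    .add("camelcase", LongType)
    .add("CamelCase", LongType))

// Throws org.apache.spark.sql.AnalysisException:
//   Found duplicate column(s) in the data schema: `camelcase`
SchemaUtils.checkSchemaColumnNameDuplication(
  nested, "in the data schema", caseSensitiveAnalysis = false)
```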

### Why are the changes needed?
To make the handling of duplicate nested columns consistent with the handling of duplicate top-level columns, i.e. output the same error when `spark.sql.caseSensitive` is `false`:
```
org.apache.spark.sql.AnalysisException: Found duplicate column(s) in the data schema: `camelcase`
```

Checking of top-level duplicates was introduced by https://github.com/apache/spark/pull/17758.

### Does this PR introduce _any_ user-facing change?
Yes. For the example from SPARK-32431, the built-in datasources behaved as follows before the changes.
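
The outputs below can be reproduced with a setup along these lines (a sketch based on the test suite added in this PR; `path` stands for a temporary directory, and the format is swapped per datasource):

```scala
import org.apache.spark.sql.types.{LongType, StructType}

// Write a struct column with two distinct nested fields ...
spark.range(1L)
  .selectExpr("NAMED_STRUCT('lowercase', id, 'camelCase', id + 1) AS StructColumn")
  .write.mode("overwrite").format("orc").save(path)

// ... and read it back with a user-specified schema whose nested names
// collide case-insensitively while spark.sql.caseSensitive is false.
val caseInsensitiveSchema = new StructType().add("StructColumn",
  new StructType()
    .add("LowerCase", LongType)
    .add("camelcase", LongType)
    .add("CamelCase", LongType))

spark.read.schema(caseInsensitiveSchema).format("orc").load(path).show()
```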

ORC:
```
java.io.IOException: Error reading file: file:/private/var/folders/p3/dfs6mf655d7fnjrsjvldh0tc0000gn/T/spark-c02c2f9a-0cdc-4859-94fc-b9c809ca58b1/part-00001-63e8c3f0-7131-4ec9-be02-30b3fdd276f4-c000.snappy.orc
	at org.apache.orc.impl.RecordReaderImpl.nextBatch(RecordReaderImpl.java:1329)
	at org.apache.orc.mapreduce.OrcMapreduceRecordReader.ensureBatch(OrcMapreduceRecordReader.java:78)
...
Caused by: java.io.EOFException: Read past end of RLE integer from compressed stream Stream for column 3 kind DATA position: 6 length: 6 range: 0 offset: 12 limit: 12 range 0 = 0 to 6 uncompressed: 3 to 3
	at org.apache.orc.impl.RunLengthIntegerReaderV2.readValues(RunLengthIntegerReaderV2.java:61)
	at org.apache.orc.impl.RunLengthIntegerReaderV2.next(RunLengthIntegerReaderV2.java:323)
```

JSON:
```
+------------+
|StructColumn|
+------------+
|        [,,]|
+------------+
```

Parquet:
```
+------------+
|StructColumn|
+------------+
|     [0,, 1]|
+------------+
```

Avro:
```
+------------+
|StructColumn|
+------------+
|        [,,]|
+------------+
```

After the changes, Parquet, ORC, JSON and Avro output the same error:
```
Found duplicate column(s) in the data schema: `camelcase`;
org.apache.spark.sql.AnalysisException: Found duplicate column(s) in the data schema: `camelcase`;
	at org.apache.spark.sql.util.SchemaUtils$.checkColumnNameDuplication(SchemaUtils.scala:112)
	at org.apache.spark.sql.util.SchemaUtils$.checkSchemaColumnNameDuplication(SchemaUtils.scala:51)
	at org.apache.spark.sql.util.SchemaUtils$.checkSchemaColumnNameDuplication(SchemaUtils.scala:67)
```

### How was this patch tested?
Run modified test suites:
```
$ build/sbt "sql/test:testOnly org.apache.spark.sql.FileBasedDataSourceSuite"
$ build/sbt "avro/test:testOnly org.apache.spark.sql.avro.*"
```
and added a new unit test to `SchemaUtilsSuite`.

Closes #29234 from MaxGekk/nested-case-insensitive-column.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
7 changed files with 152 additions and 12 deletions

```diff
@@ -29,6 +29,8 @@ license: |
 - In Spark 3.1, SQL UI data adopts the `formatted` mode for the query plan explain results. To restore the behavior before Spark 3.0, you can set `spark.sql.ui.explainMode` to `extended`.
 - In Spark 3.1, `from_unixtime`, `unix_timestamp`,`to_unix_timestamp`, `to_timestamp` and `to_date` will fail if the specified datetime pattern is invalid. In Spark 3.0 or earlier, they result `NULL`.
+- In Spark 3.1, the Parquet, ORC, Avro and JSON datasources throw the exception `org.apache.spark.sql.AnalysisException: Found duplicate column(s) in the data schema` in read if they detect duplicate names in top-level columns as well in nested structures. The datasources take into account the SQL config `spark.sql.caseSensitive` while detecting column name duplicates.
 
 ## Upgrading from Spark SQL 3.0 to 3.0.1
```

```diff
@@ -50,9 +50,10 @@ import org.apache.spark.sql.types._
 import org.apache.spark.sql.v2.avro.AvroScan
 import org.apache.spark.util.Utils
 
-abstract class AvroSuite extends QueryTest with SharedSparkSession {
+abstract class AvroSuite extends QueryTest with SharedSparkSession with NestedDataSourceSuiteBase {
   import testImplicits._
 
+  override val nestedDataSources = Seq("avro")
+
   val episodesAvro = testFile("episodes.avro")
   val testAvro = testFile("test.avro")
```

```diff
@@ -41,8 +41,38 @@ private[spark] object SchemaUtils {
    * @param caseSensitiveAnalysis whether duplication checks should be case sensitive or not
    */
   def checkSchemaColumnNameDuplication(
-      schema: StructType, colType: String, caseSensitiveAnalysis: Boolean = false): Unit = {
-    checkColumnNameDuplication(schema.map(_.name), colType, caseSensitiveAnalysis)
+      schema: DataType,
+      colType: String,
+      caseSensitiveAnalysis: Boolean = false): Unit = {
+    schema match {
+      case ArrayType(elementType, _) =>
+        checkSchemaColumnNameDuplication(elementType, colType, caseSensitiveAnalysis)
+      case MapType(keyType, valueType, _) =>
+        checkSchemaColumnNameDuplication(keyType, colType, caseSensitiveAnalysis)
+        checkSchemaColumnNameDuplication(valueType, colType, caseSensitiveAnalysis)
+      case structType: StructType =>
+        val fields = structType.fields
+        checkColumnNameDuplication(fields.map(_.name), colType, caseSensitiveAnalysis)
+        fields.foreach { field =>
+          checkSchemaColumnNameDuplication(field.dataType, colType, caseSensitiveAnalysis)
+        }
+      case _ =>
+    }
+  }
+
+  /**
+   * Checks if an input schema has duplicate column names. This throws an exception if the
+   * duplication exists.
+   *
+   * @param schema schema to check
+   * @param colType column type name, used in an exception message
+   * @param resolver resolver used to determine if two identifiers are equal
+   */
+  def checkSchemaColumnNameDuplication(
+      schema: StructType,
+      colType: String,
+      resolver: Resolver): Unit = {
+    checkSchemaColumnNameDuplication(schema, colType, isCaseSensitiveAnalysis(resolver))
   }
 
   // Returns true if a given resolver is case-sensitive
```

```diff
@@ -22,7 +22,7 @@ import java.util.Locale
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{ArrayType, LongType, MapType, StructType}
 
 class SchemaUtilsSuite extends SparkFunSuite {
@@ -82,4 +82,28 @@ class SchemaUtilsSuite extends SparkFunSuite {
     checkNoExceptionCases("a INT, b INT, c INT", caseSensitive = false)
   }
 
+  test("SPARK-32431: duplicated fields in nested schemas") {
+    val schemaA = new StructType()
+      .add("LowerCase", LongType)
+      .add("camelcase", LongType)
+      .add("CamelCase", LongType)
+    val schemaB = new StructType()
+      .add("f1", LongType)
+      .add("StructColumn1", schemaA)
+    val schemaC = new StructType()
+      .add("f2", LongType)
+      .add("StructColumn2", schemaB)
+    val schemaD = new StructType()
+      .add("f3", ArrayType(schemaC))
+    val schemaE = MapType(LongType, schemaD)
+    val schemaF = MapType(schemaD, LongType)
+    Seq(schemaA, schemaB, schemaC, schemaD, schemaE, schemaF).foreach { schema =>
+      val msg = intercept[AnalysisException] {
+        SchemaUtils.checkSchemaColumnNameDuplication(
+          schema, "in SchemaUtilsSuite", caseSensitiveAnalysis = false)
+      }.getMessage
+      assert(msg.contains("Found duplicate column(s) in SchemaUtilsSuite: `camelcase`"))
+    }
+  }
 }
```

```diff
@@ -421,18 +421,18 @@ case class DataSource(
     relation match {
       case hs: HadoopFsRelation =>
-        SchemaUtils.checkColumnNameDuplication(
-          hs.dataSchema.map(_.name),
+        SchemaUtils.checkSchemaColumnNameDuplication(
+          hs.dataSchema,
           "in the data schema",
           equality)
-        SchemaUtils.checkColumnNameDuplication(
-          hs.partitionSchema.map(_.name),
+        SchemaUtils.checkSchemaColumnNameDuplication(
+          hs.partitionSchema,
           "in the partition schema",
           equality)
         DataSourceUtils.verifySchema(hs.fileFormat, hs.dataSchema)
       case _ =>
-        SchemaUtils.checkColumnNameDuplication(
-          relation.schema.map(_.name),
+        SchemaUtils.checkSchemaColumnNameDuplication(
+          relation.schema,
           "in the data schema",
           equality)
     }
```

```diff
@@ -79,7 +79,7 @@ abstract class FileTable(
   override lazy val schema: StructType = {
     val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
-    SchemaUtils.checkColumnNameDuplication(dataSchema.fieldNames,
+    SchemaUtils.checkSchemaColumnNameDuplication(dataSchema,
       "in the data schema", caseSensitive)
     dataSchema.foreach { field =>
       if (!supportsDataType(field.dataType)) {
@@ -88,7 +88,7 @@
       }
     }
     val partitionSchema = fileIndex.partitionSchema
-    SchemaUtils.checkColumnNameDuplication(partitionSchema.fieldNames,
+    SchemaUtils.checkSchemaColumnNameDuplication(partitionSchema,
       "in the partition schema", caseSensitive)
     val partitionNameSet: Set[String] =
       partitionSchema.fields.map(PartitioningUtils.getColName(_, caseSensitive)).toSet
```

```diff
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{LongType, StructType}
+
+// Datasource tests for nested schemas
+trait NestedDataSourceSuiteBase extends QueryTest with SharedSparkSession {
+  protected val nestedDataSources: Seq[String] = Seq("orc", "parquet", "json")
+
+  test("SPARK-32431: consistent error for nested and top-level duplicate columns") {
+    Seq(
+      Seq("id AS lowercase", "id + 1 AS camelCase") ->
+        new StructType()
+          .add("LowerCase", LongType)
+          .add("camelcase", LongType)
+          .add("CamelCase", LongType),
+      Seq("NAMED_STRUCT('lowercase', id, 'camelCase', id + 1) AS StructColumn") ->
+        new StructType().add("StructColumn",
+          new StructType()
+            .add("LowerCase", LongType)
+            .add("camelcase", LongType)
+            .add("CamelCase", LongType))
+    ).foreach { case (selectExpr: Seq[String], caseInsensitiveSchema: StructType) =>
+      withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+        nestedDataSources.map { format =>
+          withClue(s"format = $format select = ${selectExpr.mkString(",")}") {
+            withTempPath { dir =>
+              val path = dir.getCanonicalPath
+              spark
+                .range(1L)
+                .selectExpr(selectExpr: _*)
+                .write.mode("overwrite")
+                .format(format)
+                .save(path)
+              val e = intercept[AnalysisException] {
+                spark
+                  .read
+                  .schema(caseInsensitiveSchema)
+                  .format(format)
+                  .load(path)
+                  .show
+              }
+              assert(e.getMessage.contains(
+                "Found duplicate column(s) in the data schema: `camelcase`"))
+            }
+          }
+        }
+      }
+    }
+  }
+}
+
+class NestedDataSourceV1Suite extends NestedDataSourceSuiteBase {
+  override protected def sparkConf: SparkConf =
+    super
+      .sparkConf
+      .set(SQLConf.USE_V1_SOURCE_LIST, nestedDataSources.mkString(","))
+}
+
+class NestedDataSourceV2Suite extends NestedDataSourceSuiteBase {
+  override protected def sparkConf: SparkConf =
+    super
+      .sparkConf
+      .set(SQLConf.USE_V1_SOURCE_LIST, "")
+}
```