diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md index 46d330e095..7c9c0a726c 100644 --- a/docs/sql-migration-guide.md +++ b/docs/sql-migration-guide.md @@ -29,6 +29,8 @@ license: | - In Spark 3.1, SQL UI data adopts the `formatted` mode for the query plan explain results. To restore the behavior before Spark 3.0, you can set `spark.sql.ui.explainMode` to `extended`. - In Spark 3.1, `from_unixtime`, `unix_timestamp`,`to_unix_timestamp`, `to_timestamp` and `to_date` will fail if the specified datetime pattern is invalid. In Spark 3.0 or earlier, they result `NULL`. + + - In Spark 3.1, the Parquet, ORC, Avro and JSON datasources throw the exception `org.apache.spark.sql.AnalysisException: Found duplicate column(s) in the data schema` on read if they detect duplicate names in top-level columns as well as in nested structures. The datasources take into account the SQL config `spark.sql.caseSensitive` while detecting column name duplicates. ## Upgrading from Spark SQL 3.0 to 3.0.1 diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala index 46fe9b2c44..8a8a7681ab 100644 --- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala +++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala @@ -50,9 +50,10 @@ import org.apache.spark.sql.types._ import org.apache.spark.sql.v2.avro.AvroScan import org.apache.spark.util.Utils -abstract class AvroSuite extends QueryTest with SharedSparkSession { +abstract class AvroSuite extends QueryTest with SharedSparkSession with NestedDataSourceSuiteBase { import testImplicits._ + override val nestedDataSources = Seq("avro") val episodesAvro = testFile("episodes.avro") val testAvro = testFile("test.avro") diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala index 
27b5eec272..c83cd52250 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala @@ -41,8 +41,38 @@ private[spark] object SchemaUtils { * @param caseSensitiveAnalysis whether duplication checks should be case sensitive or not */ def checkSchemaColumnNameDuplication( - schema: StructType, colType: String, caseSensitiveAnalysis: Boolean = false): Unit = { - checkColumnNameDuplication(schema.map(_.name), colType, caseSensitiveAnalysis) + schema: DataType, + colType: String, + caseSensitiveAnalysis: Boolean = false): Unit = { + schema match { + case ArrayType(elementType, _) => + checkSchemaColumnNameDuplication(elementType, colType, caseSensitiveAnalysis) + case MapType(keyType, valueType, _) => + checkSchemaColumnNameDuplication(keyType, colType, caseSensitiveAnalysis) + checkSchemaColumnNameDuplication(valueType, colType, caseSensitiveAnalysis) + case structType: StructType => + val fields = structType.fields + checkColumnNameDuplication(fields.map(_.name), colType, caseSensitiveAnalysis) + fields.foreach { field => + checkSchemaColumnNameDuplication(field.dataType, colType, caseSensitiveAnalysis) + } + case _ => + } + } + + /** + * Checks if an input schema has duplicate column names. This throws an exception if the + * duplication exists. 
+ * + * @param schema schema to check + * @param colType column type name, used in an exception message + * @param resolver resolver used to determine if two identifiers are equal + */ + def checkSchemaColumnNameDuplication( + schema: StructType, + colType: String, + resolver: Resolver): Unit = { + checkSchemaColumnNameDuplication(schema, colType, isCaseSensitiveAnalysis(resolver)) } // Returns true if a given resolver is case-sensitive diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/SchemaUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/SchemaUtilsSuite.scala index 2f576a4031..02ee634dba 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/SchemaUtilsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/SchemaUtilsSuite.scala @@ -22,7 +22,7 @@ import java.util.Locale import org.apache.spark.SparkFunSuite import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis._ -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types.{ArrayType, LongType, MapType, StructType} class SchemaUtilsSuite extends SparkFunSuite { @@ -82,4 +82,28 @@ class SchemaUtilsSuite extends SparkFunSuite { checkNoExceptionCases("a INT, b INT, c INT", caseSensitive = false) } + + test("SPARK-32431: duplicated fields in nested schemas") { + val schemaA = new StructType() + .add("LowerCase", LongType) + .add("camelcase", LongType) + .add("CamelCase", LongType) + val schemaB = new StructType() + .add("f1", LongType) + .add("StructColumn1", schemaA) + val schemaC = new StructType() + .add("f2", LongType) + .add("StructColumn2", schemaB) + val schemaD = new StructType() + .add("f3", ArrayType(schemaC)) + val schemaE = MapType(LongType, schemaD) + val schemaF = MapType(schemaD, LongType) + Seq(schemaA, schemaB, schemaC, schemaD, schemaE, schemaF).foreach { schema => + val msg = intercept[AnalysisException] { + SchemaUtils.checkSchemaColumnNameDuplication( + schema, "in 
SchemaUtilsSuite", caseSensitiveAnalysis = false) + }.getMessage + assert(msg.contains("Found duplicate column(s) in SchemaUtilsSuite: `camelcase`")) + } + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index db564485be..36e5eb33e1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -421,18 +421,18 @@ case class DataSource( relation match { case hs: HadoopFsRelation => - SchemaUtils.checkColumnNameDuplication( - hs.dataSchema.map(_.name), + SchemaUtils.checkSchemaColumnNameDuplication( + hs.dataSchema, "in the data schema", equality) - SchemaUtils.checkColumnNameDuplication( - hs.partitionSchema.map(_.name), + SchemaUtils.checkSchemaColumnNameDuplication( + hs.partitionSchema, "in the partition schema", equality) DataSourceUtils.verifySchema(hs.fileFormat, hs.dataSchema) case _ => - SchemaUtils.checkColumnNameDuplication( - relation.schema.map(_.name), + SchemaUtils.checkSchemaColumnNameDuplication( + relation.schema, "in the data schema", equality) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala index 59dc3ae56b..7bd05f1287 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala @@ -79,7 +79,7 @@ abstract class FileTable( override lazy val schema: StructType = { val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis - SchemaUtils.checkColumnNameDuplication(dataSchema.fieldNames, + SchemaUtils.checkSchemaColumnNameDuplication(dataSchema, "in the data schema", caseSensitive) dataSchema.foreach { field => 
if (!supportsDataType(field.dataType)) { @@ -88,7 +88,7 @@ abstract class FileTable( } } val partitionSchema = fileIndex.partitionSchema - SchemaUtils.checkColumnNameDuplication(partitionSchema.fieldNames, + SchemaUtils.checkSchemaColumnNameDuplication(partitionSchema, "in the partition schema", caseSensitive) val partitionNameSet: Set[String] = partitionSchema.fields.map(PartitioningUtils.getColName(_, caseSensitive)).toSet diff --git a/sql/core/src/test/scala/org/apache/spark/sql/NestedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/NestedDataSourceSuite.scala new file mode 100644 index 0000000000..152d59b7b1 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/NestedDataSourceSuite.scala @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql + +import org.apache.spark.SparkConf +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types.{LongType, StructType} + +// Datasource tests for nested schemas +trait NestedDataSourceSuiteBase extends QueryTest with SharedSparkSession { + protected val nestedDataSources: Seq[String] = Seq("orc", "parquet", "json") + + test("SPARK-32431: consistent error for nested and top-level duplicate columns") { + Seq( + Seq("id AS lowercase", "id + 1 AS camelCase") -> + new StructType() + .add("LowerCase", LongType) + .add("camelcase", LongType) + .add("CamelCase", LongType), + Seq("NAMED_STRUCT('lowercase', id, 'camelCase', id + 1) AS StructColumn") -> + new StructType().add("StructColumn", + new StructType() + .add("LowerCase", LongType) + .add("camelcase", LongType) + .add("CamelCase", LongType)) + ).foreach { case (selectExpr: Seq[String], caseInsensitiveSchema: StructType) => + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { + nestedDataSources.map { format => + withClue(s"format = $format select = ${selectExpr.mkString(",")}") { + withTempPath { dir => + val path = dir.getCanonicalPath + spark + .range(1L) + .selectExpr(selectExpr: _*) + .write.mode("overwrite") + .format(format) + .save(path) + val e = intercept[AnalysisException] { + spark + .read + .schema(caseInsensitiveSchema) + .format(format) + .load(path) + .show + } + assert(e.getMessage.contains( + "Found duplicate column(s) in the data schema: `camelcase`")) + } + } + } + } + } + } +} + +class NestedDataSourceV1Suite extends NestedDataSourceSuiteBase { + override protected def sparkConf: SparkConf = + super + .sparkConf + .set(SQLConf.USE_V1_SOURCE_LIST, nestedDataSources.mkString(",")) +} + +class NestedDataSourceV2Suite extends NestedDataSourceSuiteBase { + override protected def sparkConf: SparkConf = + super + .sparkConf + .set(SQLConf.USE_V1_SOURCE_LIST, "") +}