From 3b713e7f6189dfe1c5bbb1a527bf1266bde69f69 Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Mon, 2 Aug 2021 17:54:50 +0800 Subject: [PATCH] [SPARK-36372][SQL] v2 ALTER TABLE ADD COLUMNS should check duplicates for the user specified columns ### What changes were proposed in this pull request? Currently, v2 ALTER TABLE ADD COLUMNS does not check duplicates for the user specified columns. For example, ``` spark.sql(s"CREATE TABLE $t (id int) USING $v2Format") spark.sql(s"ALTER TABLE $t ADD COLUMNS (data string, data string)") ``` doesn't fail the analysis, and it's up to the catalog implementation to handle it. For v1 command, the duplication is checked before invoking the catalog. ### Why are the changes needed? To check the duplicate columns during analysis and be consistent with v1 command. ### Does this PR introduce _any_ user-facing change? Yes, now the above command will print out the following: ``` org.apache.spark.sql.AnalysisException: Found duplicate column(s) in the user specified columns: `data` ``` ### How was this patch tested? Added new unit tests Closes #33600 from imback82/alter_add_duplicate_columns. 
Authored-by: Terry Kim Signed-off-by: Wenchen Fan --- .../sql/catalyst/analysis/CheckAnalysis.scala | 5 +++ .../spark/sql/connector/AlterTableTests.scala | 23 +++++++++++++ .../V2CommandsCaseSensitivitySuite.scala | 32 +++++++++++++++++-- 3 files changed, 57 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 2d8ac6446e..77f721cddc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -30,6 +30,7 @@ import org.apache.spark.sql.connector.catalog.{LookupCatalog, SupportsPartitionM import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ +import org.apache.spark.sql.util.SchemaUtils /** * Throws user facing errors when passed invalid queries that fail to analyze. 
@@ -951,6 +952,10 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog { colsToAdd.foreach { colToAdd => checkColumnNotExists("add", colToAdd.name, table.schema) } + SchemaUtils.checkColumnNameDuplication( + colsToAdd.map(_.name.quoted), + "in the user specified columns", + alter.conf.resolver) case AlterTableRenameColumn(table: ResolvedTable, col: ResolvedFieldName, newName) => checkColumnNotExists("rename", col.path :+ newName, table.schema) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala index 004a64ac69..1bd45f50f5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala @@ -384,6 +384,29 @@ trait AlterTableTests extends SharedSparkSession { } } + test("SPARK-36372: Adding duplicate columns should not be allowed") { + val t = s"${catalogAndNamespace}table_name" + withTable(t) { + sql(s"CREATE TABLE $t (id int) USING $v2Format") + val e = intercept[AnalysisException] { + sql(s"ALTER TABLE $t ADD COLUMNS (data string, data1 string, data string)") + } + assert(e.message.contains("Found duplicate column(s) in the user specified columns: `data`")) + } + } + + test("SPARK-36372: Adding duplicate nested columns should not be allowed") { + val t = s"${catalogAndNamespace}table_name" + withTable(t) { + sql(s"CREATE TABLE $t (id int, point struct) USING $v2Format") + val e = intercept[AnalysisException] { + sql(s"ALTER TABLE $t ADD COLUMNS (point.z double, point.z double, point.xx double)") + } + assert(e.message.contains( + "Found duplicate column(s) in the user specified columns: `point.z`")) + } + } + test("AlterTable: update column type int -> long") { val t = s"${catalogAndNamespace}table_name" withTable(t) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala index 6651576150..763cd6a82d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala @@ -179,7 +179,7 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes Some(UnresolvedFieldPosition(ColumnPosition.after("id")))), QualifiedColType( None, - "x", + "y", LongType, true, None, @@ -227,6 +227,28 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes ) } + test("SPARK-36372: Adding duplicate columns should not be allowed") { + alterTableTest( + AlterTableAddColumns( + table, + Seq(QualifiedColType( + Some(UnresolvedFieldName(Seq("point"))), + "z", + LongType, + true, + None, + None), + QualifiedColType( + Some(UnresolvedFieldName(Seq("point"))), + "Z", + LongType, + true, + None, + None))), + Seq("Found duplicate column(s) in the user specified columns: `point.z`"), + expectErrorOnCaseSensitive = false) + } + test("AlterTable: drop column resolution") { Seq(Array("ID"), Array("point", "X"), Array("POINT", "X"), Array("POINT", "x")).foreach { ref => alterTableTest( @@ -272,10 +294,14 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes } } - private def alterTableTest(alter: AlterTableColumnCommand, error: Seq[String]): Unit = { + private def alterTableTest( + alter: AlterTableColumnCommand, + error: Seq[String], + expectErrorOnCaseSensitive: Boolean = true): Unit = { Seq(true, false).foreach { caseSensitive => withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) { - if (caseSensitive) { + val expectError = if (expectErrorOnCaseSensitive) caseSensitive else !caseSensitive + if (expectError) { assertAnalysisError(alter, error, caseSensitive) } else { assertAnalysisSuccess(alter, caseSensitive)