[SPARK-36372][SQL] v2 ALTER TABLE ADD COLUMNS should check duplicates for the user specified columns

### What changes were proposed in this pull request?

Currently, v2 ALTER TABLE ADD COLUMNS does not check duplicates for the user specified columns. For example,
```
spark.sql(s"CREATE TABLE $t (id int) USING $v2Format")
spark.sql(s"ALTER TABLE $t ADD COLUMNS (data string, data string)")
```
doesn't fail the analysis, and it's up to the catalog implementation to handle it. For v1 command, the duplication is checked before invoking the catalog.

### Why are the changes needed?

To check the duplicate columns during analysis and be consistent with v1 command.

### Does this PR introduce _any_ user-facing change?

Yes, now the above command will print out the following:
```
org.apache.spark.sql.AnalysisException: Found duplicate column(s) in the user specified columns: `data`
```

### How was this patch tested?

Added new unit tests

Closes #33600 from imback82/alter_add_duplicate_columns.

Authored-by: Terry Kim <yuminkim@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 3b713e7f61)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This commit is contained in:
Terry Kim 2021-08-02 17:54:50 +08:00 committed by Wenchen Fan
parent a9b32b390c
commit 87ae397897
3 changed files with 57 additions and 3 deletions

View file

@ -30,6 +30,7 @@ import org.apache.spark.sql.connector.catalog.{LookupCatalog, SupportsPartitionM
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.SchemaUtils
/**
* Throws user facing errors when passed invalid queries that fail to analyze.
@ -951,6 +952,10 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
colsToAdd.foreach { colToAdd =>
checkColumnNotExists("add", colToAdd.name, table.schema)
}
SchemaUtils.checkColumnNameDuplication(
colsToAdd.map(_.name.quoted),
"in the user specified columns",
alter.conf.resolver)
case AlterTableRenameColumn(table: ResolvedTable, col: ResolvedFieldName, newName) =>
checkColumnNotExists("rename", col.path :+ newName, table.schema)

View file

@ -384,6 +384,29 @@ trait AlterTableTests extends SharedSparkSession {
}
}
test("SPARK-36372: Adding duplicate columns should not be allowed") {
  val tableName = s"${catalogAndNamespace}table_name"
  withTable(tableName) {
    sql(s"CREATE TABLE $tableName (id int) USING $v2Format")
    // Listing `data` twice in ADD COLUMNS must be rejected at analysis time,
    // before the catalog implementation is ever invoked.
    val exception = intercept[AnalysisException] {
      sql(s"ALTER TABLE $tableName ADD COLUMNS (data string, data1 string, data string)")
    }
    assert(exception.message.contains(
      "Found duplicate column(s) in the user specified columns: `data`"))
  }
}
test("SPARK-36372: Adding duplicate nested columns should not be allowed") {
  val tableName = s"${catalogAndNamespace}table_name"
  withTable(tableName) {
    sql(s"CREATE TABLE $tableName (id int, point struct<x: double, y: double>) USING $v2Format")
    // Duplicate nested field paths (`point.z` twice) must also fail analysis,
    // using the fully-qualified path in the error message.
    val exception = intercept[AnalysisException] {
      sql(s"ALTER TABLE $tableName ADD COLUMNS (point.z double, point.z double, point.xx double)")
    }
    assert(exception.message.contains(
      "Found duplicate column(s) in the user specified columns: `point.z`"))
  }
}
test("AlterTable: update column type int -> long") {
val t = s"${catalogAndNamespace}table_name"
withTable(t) {

View file

@ -179,7 +179,7 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes
Some(UnresolvedFieldPosition(ColumnPosition.after("id")))),
QualifiedColType(
None,
"x",
"y",
LongType,
true,
None,
@ -227,6 +227,28 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes
)
}
test("SPARK-36372: Adding duplicate columns should not be allowed") {
  // Builds a nested column `point.<name>` of type long; only the leaf name varies.
  def pointField(name: String): QualifiedColType =
    QualifiedColType(
      Some(UnresolvedFieldName(Seq("point"))),
      name,
      LongType,
      true,
      None,
      None)
  // `z` vs `Z` collide only under case-insensitive resolution, so the error is
  // expected when CASE_SENSITIVE is false (hence expectErrorOnCaseSensitive = false).
  alterTableTest(
    AlterTableAddColumns(table, Seq(pointField("z"), pointField("Z"))),
    Seq("Found duplicate column(s) in the user specified columns: `point.z`"),
    expectErrorOnCaseSensitive = false)
}
test("AlterTable: drop column resolution") {
Seq(Array("ID"), Array("point", "X"), Array("POINT", "X"), Array("POINT", "x")).foreach { ref =>
alterTableTest(
@ -272,10 +294,14 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes
}
}
private def alterTableTest(alter: AlterTableColumnCommand, error: Seq[String]): Unit = {
private def alterTableTest(
alter: AlterTableColumnCommand,
error: Seq[String],
expectErrorOnCaseSensitive: Boolean = true): Unit = {
Seq(true, false).foreach { caseSensitive =>
withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
if (caseSensitive) {
val expectError = if (expectErrorOnCaseSensitive) caseSensitive else !caseSensitive
if (expectError) {
assertAnalysisError(alter, error, caseSensitive)
} else {
assertAnalysisSuccess(alter, caseSensitive)