[SPARK-36372][SQL] v2 ALTER TABLE ADD COLUMNS should check duplicates for the user specified columns
### What changes were proposed in this pull request? Currently, v2 ALTER TABLE ADD COLUMNS does not check duplicates for the user specified columns. For example, ``` spark.sql(s"CREATE TABLE $t (id int) USING $v2Format") spark.sql(s"ALTER TABLE $t ADD COLUMNS (data string, data string)") ``` doesn't fail the analysis, and it's up to the catalog implementation to handle it. For the v1 command, the duplication is checked before invoking the catalog. ### Why are the changes needed? To check the duplicate columns during analysis and be consistent with the v1 command. ### Does this PR introduce _any_ user-facing change? Yes, now the above command will print out the following: ``` org.apache.spark.sql.AnalysisException: Found duplicate column(s) in the user specified columns: `data` ``` ### How was this patch tested? Added new unit tests Closes #33600 from imback82/alter_add_duplicate_columns. Authored-by: Terry Kim <yuminkim@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This commit is contained in:
parent
c039d99812
commit
3b713e7f61
|
@ -30,6 +30,7 @@ import org.apache.spark.sql.connector.catalog.{LookupCatalog, SupportsPartitionM
|
|||
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
|
||||
import org.apache.spark.sql.internal.SQLConf
|
||||
import org.apache.spark.sql.types._
|
||||
import org.apache.spark.sql.util.SchemaUtils
|
||||
|
||||
/**
|
||||
* Throws user facing errors when passed invalid queries that fail to analyze.
|
||||
|
@ -951,6 +952,10 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
|
|||
colsToAdd.foreach { colToAdd =>
|
||||
checkColumnNotExists("add", colToAdd.name, table.schema)
|
||||
}
|
||||
SchemaUtils.checkColumnNameDuplication(
|
||||
colsToAdd.map(_.name.quoted),
|
||||
"in the user specified columns",
|
||||
alter.conf.resolver)
|
||||
|
||||
case AlterTableRenameColumn(table: ResolvedTable, col: ResolvedFieldName, newName) =>
|
||||
checkColumnNotExists("rename", col.path :+ newName, table.schema)
|
||||
|
|
|
@ -384,6 +384,29 @@ trait AlterTableTests extends SharedSparkSession {
|
|||
}
|
||||
}
|
||||
|
||||
test("SPARK-36372: Adding duplicate columns should not be allowed") {
|
||||
val t = s"${catalogAndNamespace}table_name"
|
||||
withTable(t) {
|
||||
sql(s"CREATE TABLE $t (id int) USING $v2Format")
|
||||
val e = intercept[AnalysisException] {
|
||||
sql(s"ALTER TABLE $t ADD COLUMNS (data string, data1 string, data string)")
|
||||
}
|
||||
assert(e.message.contains("Found duplicate column(s) in the user specified columns: `data`"))
|
||||
}
|
||||
}
|
||||
|
||||
test("SPARK-36372: Adding duplicate nested columns should not be allowed") {
|
||||
val t = s"${catalogAndNamespace}table_name"
|
||||
withTable(t) {
|
||||
sql(s"CREATE TABLE $t (id int, point struct<x: double, y: double>) USING $v2Format")
|
||||
val e = intercept[AnalysisException] {
|
||||
sql(s"ALTER TABLE $t ADD COLUMNS (point.z double, point.z double, point.xx double)")
|
||||
}
|
||||
assert(e.message.contains(
|
||||
"Found duplicate column(s) in the user specified columns: `point.z`"))
|
||||
}
|
||||
}
|
||||
|
||||
test("AlterTable: update column type int -> long") {
|
||||
val t = s"${catalogAndNamespace}table_name"
|
||||
withTable(t) {
|
||||
|
|
|
@ -179,7 +179,7 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes
|
|||
Some(UnresolvedFieldPosition(ColumnPosition.after("id")))),
|
||||
QualifiedColType(
|
||||
None,
|
||||
"x",
|
||||
"y",
|
||||
LongType,
|
||||
true,
|
||||
None,
|
||||
|
@ -227,6 +227,28 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes
|
|||
)
|
||||
}
|
||||
|
||||
test("SPARK-36372: Adding duplicate columns should not be allowed") {
|
||||
alterTableTest(
|
||||
AlterTableAddColumns(
|
||||
table,
|
||||
Seq(QualifiedColType(
|
||||
Some(UnresolvedFieldName(Seq("point"))),
|
||||
"z",
|
||||
LongType,
|
||||
true,
|
||||
None,
|
||||
None),
|
||||
QualifiedColType(
|
||||
Some(UnresolvedFieldName(Seq("point"))),
|
||||
"Z",
|
||||
LongType,
|
||||
true,
|
||||
None,
|
||||
None))),
|
||||
Seq("Found duplicate column(s) in the user specified columns: `point.z`"),
|
||||
expectErrorOnCaseSensitive = false)
|
||||
}
|
||||
|
||||
test("AlterTable: drop column resolution") {
|
||||
Seq(Array("ID"), Array("point", "X"), Array("POINT", "X"), Array("POINT", "x")).foreach { ref =>
|
||||
alterTableTest(
|
||||
|
@ -272,10 +294,14 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes
|
|||
}
|
||||
}
|
||||
|
||||
private def alterTableTest(alter: AlterTableColumnCommand, error: Seq[String]): Unit = {
|
||||
private def alterTableTest(
|
||||
alter: AlterTableColumnCommand,
|
||||
error: Seq[String],
|
||||
expectErrorOnCaseSensitive: Boolean = true): Unit = {
|
||||
Seq(true, false).foreach { caseSensitive =>
|
||||
withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
|
||||
if (caseSensitive) {
|
||||
val expectError = if (expectErrorOnCaseSensitive) caseSensitive else !caseSensitive
|
||||
if (expectError) {
|
||||
assertAnalysisError(alter, error, caseSensitive)
|
||||
} else {
|
||||
assertAnalysisSuccess(alter, caseSensitive)
|
||||
|
|
Loading…
Reference in a new issue