[SPARK-34302][SQL] Migrate ALTER TABLE ... CHANGE COLUMN command to use UnresolvedTable to resolve the identifier

### What changes were proposed in this pull request?

This PR proposes to migrate the `ALTER TABLE ... CHANGE COLUMN` command to use `UnresolvedTable` as a `child` to resolve the table identifier. This allows consistent resolution rules (temp view first, etc.) to be applied to both v1 and v2 commands. More info about the consistent resolution rule proposal can be found in [JIRA](https://issues.apache.org/jira/browse/SPARK-29900) or the [proposal doc](https://docs.google.com/document/d/1hvLjGA8y_W_hhilpngXVub1Ebv8RsMap986nENCFnrg/edit?usp=sharing).
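
As a sketch of the new shape of the parsed plan (mirroring the updated `DDLParserSuite` cases in the diff below), the Hive-style `CHANGE COLUMN` statement now parses into the new `AlterTableAlterColumn` logical plan with an `UnresolvedTable` child, instead of the removed `AlterTableAlterColumnStatement`:

```scala
import org.apache.spark.sql.catalyst.analysis.{UnresolvedFieldName, UnresolvedTable}
import org.apache.spark.sql.catalyst.plans.logical.AlterTableAlterColumn
import org.apache.spark.sql.types.LongType

// "ALTER TABLE table_name CHANGE COLUMN a.b.c TYPE bigint" now parses to a
// v2 logical plan whose table child is an UnresolvedTable, so the analyzer's
// common resolution rules (temp view first, etc.) apply to it:
AlterTableAlterColumn(
  UnresolvedTable(Seq("table_name"), "ALTER TABLE ... CHANGE COLUMN", None),
  UnresolvedFieldName(Seq("a", "b", "c")),
  dataType = Some(LongType),
  nullable = None,
  comment = None,
  position = None)
```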

### Why are the changes needed?

This is part of the effort to make relation lookup behavior consistent: [SPARK-29900](https://issues.apache.org/jira/browse/SPARK-29900).

### Does this PR introduce _any_ user-facing change?

After this PR, the `ALTER TABLE ... CHANGE COLUMN` command has consistent resolution behavior: for example, a temporary view is now resolved first and rejected with a clearer error message, as shown in the sketch below.
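
A minimal sketch of the behavior change, based on the updated `change-column.sql.out` golden file in this diff (the `spark` session and view name are illustrative):

```scala
// Hypothetical repro: register a temp view, then try to alter it as a table.
spark.sql("SELECT 1 AS a").createOrReplaceTempView("temp_view")

// Before this PR: "Invalid command: 'temp_view' is a view not a table."
// After this PR:  "temp_view is a temp view. 'ALTER TABLE ... CHANGE COLUMN'
//                  expects a table." (AnalysisException in both cases)
spark.sql("ALTER TABLE temp_view CHANGE a TYPE INT")
```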

### How was this patch tested?

Updated existing tests.

Closes #33113 from imback82/alter_change_column.

Authored-by: Terry Kim <yuminkim@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Committed 2021-06-29 02:53:05 +00:00 by Wenchen Fan
parent 622fc686e2
commit 620fde4767
13 changed files with 302 additions and 312 deletions

@@ -44,7 +44,7 @@ import org.apache.spark.sql.catalyst.trees.TreePattern._
import org.apache.spark.sql.catalyst.util.{toPrettySQL, CharVarcharUtils}
import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, After, ColumnChange, ColumnPosition, DeleteColumn, UpdateColumnComment, UpdateColumnNullability, UpdateColumnPosition, UpdateColumnType}
import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, After, ColumnChange, ColumnPosition, DeleteColumn}
import org.apache.spark.sql.connector.catalog.functions.{AggregateFunction => V2AggregateFunction, BoundFunction, ScalarFunction}
import org.apache.spark.sql.connector.catalog.functions.ScalarFunction.MAGIC_METHOD_NAME
import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTransform, Transform}
@@ -299,7 +299,7 @@ class Analyzer(override val catalogManager: CatalogManager)
Batch("Post-Hoc Resolution", Once,
Seq(ResolveCommandsWithIfExists) ++
postHocResolutionRules: _*),
Batch("Normalize Alter Table Field Names", Once, ResolveFieldNames),
Batch("Normalize Alter Table Commands", Once, ResolveAlterTableCommands),
Batch("Normalize Alter Table", Once, ResolveAlterTableChanges),
Batch("Remove Unresolved Hints", Once,
new ResolveHints.RemoveAllHints),
@@ -3527,13 +3527,35 @@ class Analyzer(override val catalogManager: CatalogManager)
* Rule to mostly resolve, normalize and rewrite column names based on case sensitivity
* for alter table commands.
*/
object ResolveFieldNames extends Rule[LogicalPlan] {
object ResolveAlterTableCommands extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
case a: AlterTableCommand if a.table.resolved =>
a.transformExpressions {
val table = a.table.asInstanceOf[ResolvedTable]
val transformed = a.transformExpressions {
case u: UnresolvedFieldName =>
val table = a.table.asInstanceOf[ResolvedTable]
resolveFieldNames(table.schema, u.name).map(ResolvedFieldName(_)).getOrElse(u)
resolveFieldNames(table.schema, u.name).getOrElse(u)
case u: UnresolvedFieldPosition => u.position match {
case after: After =>
resolveFieldNames(table.schema, u.fieldName.init :+ after.column())
.map { resolved =>
ResolvedFieldPosition(ColumnPosition.after(resolved.field.name))
}.getOrElse(u)
case _ => ResolvedFieldPosition(u.position)
}
}
transformed match {
case alter @ AlterTableAlterColumn(
_: ResolvedTable, ResolvedFieldName(_, field), Some(dataType), _, _, _) =>
// Hive style syntax provides the column type, even if it may not have changed.
val dt = CharVarcharUtils.getRawType(field.metadata).getOrElse(field.dataType)
if (dt == dataType) {
// The user didn't want the field to change, so remove this change.
alter.copy(dataType = None)
} else {
alter
}
case other => other
}
}
@@ -3543,10 +3565,10 @@ class Analyzer(override val catalogManager: CatalogManager)
*/
private def resolveFieldNames(
schema: StructType,
fieldNames: Seq[String]): Option[Seq[String]] = {
fieldNames: Seq[String]): Option[ResolvedFieldName] = {
val fieldOpt = schema.findNestedField(
fieldNames, includeCollections = true, conf.resolver)
fieldOpt.map { case (path, field) => path :+ field.name }
fieldOpt.map { case (path, field) => ResolvedFieldName(path, field) }
}
}
@@ -3598,68 +3620,6 @@ class Analyzer(override val catalogManager: CatalogManager)
Some(addColumn(schema, "root", Nil))
}
case typeChange: UpdateColumnType =>
// Hive style syntax provides the column type, even if it may not have changed
val fieldOpt = schema.findNestedField(
typeChange.fieldNames(), includeCollections = true, conf.resolver)
if (fieldOpt.isEmpty) {
// We couldn't resolve the field. Leave it to CheckAnalysis
Some(typeChange)
} else {
val (fieldNames, field) = fieldOpt.get
val dt = CharVarcharUtils.getRawType(field.metadata).getOrElse(field.dataType)
if (dt == typeChange.newDataType()) {
// The user didn't want the field to change, so remove this change
None
} else {
Some(TableChange.updateColumnType(
(fieldNames :+ field.name).toArray, typeChange.newDataType()))
}
}
case n: UpdateColumnNullability =>
// Need to resolve column
resolveFieldNames(
schema,
n.fieldNames(),
TableChange.updateColumnNullability(_, n.nullable())).orElse(Some(n))
case position: UpdateColumnPosition =>
position.position() match {
case after: After =>
// Need to resolve column as well as position reference
val fieldOpt = schema.findNestedField(
position.fieldNames(), includeCollections = true, conf.resolver)
if (fieldOpt.isEmpty) {
Some(position)
} else {
val (normalizedPath, field) = fieldOpt.get
val targetCol = schema.findNestedField(
normalizedPath :+ after.column(), includeCollections = true, conf.resolver)
if (targetCol.isEmpty) {
// Leave unchanged to CheckAnalysis
Some(position)
} else {
Some(TableChange.updateColumnPosition(
(normalizedPath :+ field.name).toArray,
ColumnPosition.after(targetCol.get._2.name)))
}
}
case _ =>
// Need to resolve column
resolveFieldNames(
schema,
position.fieldNames(),
TableChange.updateColumnPosition(_, position.position())).orElse(Some(position))
}
case comment: UpdateColumnComment =>
resolveFieldNames(
schema,
comment.fieldNames(),
TableChange.updateColumnComment(_, comment.newComment())).orElse(Some(comment))
case delete: DeleteColumn =>
resolveFieldNames(schema, delete.fieldNames(), TableChange.deleteColumn)
.orElse(Some(delete))

@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, TypeUtils}
import org.apache.spark.sql.connector.catalog.{LookupCatalog, SupportsPartitionManagement}
import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, After, ColumnPosition, DeleteColumn, UpdateColumnComment, UpdateColumnNullability, UpdateColumnPosition, UpdateColumnType}
import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, After, ColumnPosition, DeleteColumn}
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@@ -444,12 +444,42 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
write.query.schema.foreach(f => TypeUtils.failWithIntervalType(f.dataType))
case alter: AlterTableCommand if alter.table.resolved =>
val table = alter.table.asInstanceOf[ResolvedTable]
def findField(fieldName: Seq[String]): StructField = {
// Include collections because structs nested in maps and arrays may be altered.
val field = table.schema.findNestedField(fieldName, includeCollections = true)
if (field.isEmpty) {
alter.failAnalysis(s"Cannot ${alter.operation} missing field ${fieldName.quoted} " +
s"in ${table.name} schema: ${table.schema.treeString}")
}
field.get._2
}
def findParentStruct(fieldNames: Seq[String]): StructType = {
val parent = fieldNames.init
val field = if (parent.nonEmpty) {
findField(parent).dataType
} else {
table.schema
}
field match {
case s: StructType => s
case o => alter.failAnalysis(s"Cannot ${alter.operation} ${fieldNames.quoted}, " +
s"because its parent is not a StructType. Found $o")
}
}
alter.transformExpressions {
case u: UnresolvedFieldName =>
val table = alter.table.asInstanceOf[ResolvedTable]
alter.failAnalysis(
s"Cannot ${alter.operation} missing field ${u.name.quoted} in ${table.name} " +
s"schema: ${table.schema.treeString}")
case UnresolvedFieldName(name) =>
alter.failAnalysis(s"Cannot ${alter.operation} missing field ${name.quoted} in " +
s"${table.name} schema: ${table.schema.treeString}")
case UnresolvedFieldPosition(fieldName, position: After) =>
val parent = findParentStruct(fieldName)
val allFields = parent match {
case s: StructType => s.fieldNames
case o => alter.failAnalysis(s"Cannot ${alter.operation} ${fieldName.quoted}, " +
s"because its parent is not a StructType. Found $o")
}
alter.failAnalysis(s"Couldn't resolve positional argument $position amongst " +
s"${allFields.mkString("[", ", ", "]")}")
}
checkAlterTableCommand(alter)
@@ -522,66 +552,6 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
positionArgumentExists(add.position(), parent, fieldsAdded)
TypeUtils.failWithIntervalType(add.dataType())
colsToAdd(parentName) = fieldsAdded :+ add.fieldNames().last
case update: UpdateColumnType =>
val field = {
val f = findField("update", update.fieldNames)
CharVarcharUtils.getRawType(f.metadata)
.map(dt => f.copy(dataType = dt))
.getOrElse(f)
}
val fieldName = update.fieldNames.quoted
update.newDataType match {
case _: StructType =>
alter.failAnalysis(s"Cannot update ${table.name} field $fieldName type: " +
s"update a struct by updating its fields")
case _: MapType =>
alter.failAnalysis(s"Cannot update ${table.name} field $fieldName type: " +
s"update a map by updating $fieldName.key or $fieldName.value")
case _: ArrayType =>
alter.failAnalysis(s"Cannot update ${table.name} field $fieldName type: " +
s"update the element by updating $fieldName.element")
case u: UserDefinedType[_] =>
alter.failAnalysis(s"Cannot update ${table.name} field $fieldName type: " +
s"update a UserDefinedType[${u.sql}] by updating its fields")
case _: CalendarIntervalType | _: YearMonthIntervalType |
_: DayTimeIntervalType =>
alter.failAnalysis(s"Cannot update ${table.name} field $fieldName to " +
s"interval type")
case _ =>
// update is okay
}
// We don't need to handle nested types here which shall fail before
def canAlterColumnType(from: DataType, to: DataType): Boolean = (from, to) match {
case (CharType(l1), CharType(l2)) => l1 == l2
case (CharType(l1), VarcharType(l2)) => l1 <= l2
case (VarcharType(l1), VarcharType(l2)) => l1 <= l2
case _ => Cast.canUpCast(from, to)
}
if (!canAlterColumnType(field.dataType, update.newDataType)) {
alter.failAnalysis(
s"Cannot update ${table.name} field $fieldName: " +
s"${field.dataType.simpleString} cannot be cast to " +
s"${update.newDataType.simpleString}")
}
case update: UpdateColumnNullability =>
val field = findField("update", update.fieldNames)
val fieldName = update.fieldNames.quoted
if (!update.nullable && field.nullable) {
alter.failAnalysis(
s"Cannot change nullable column to non-nullable: $fieldName")
}
case updatePos: UpdateColumnPosition =>
findField("update", updatePos.fieldNames)
val parent = findParentStruct("update", updatePos.fieldNames())
val parentName = updatePos.fieldNames().init
positionArgumentExists(
updatePos.position(),
parent,
colsToAdd.getOrElse(parentName, Nil))
case update: UpdateColumnComment =>
findField("update", update.fieldNames)
case delete: DeleteColumn =>
findField("delete", delete.fieldNames)
// REPLACE COLUMNS has deletes followed by adds. Remember the deleted columns
@@ -1088,8 +1058,51 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
}
alter match {
case AlterTableRenameColumn(table: ResolvedTable, ResolvedFieldName(name), newName) =>
checkColumnNotExists(name.init :+ newName, table.schema)
case AlterTableRenameColumn(table: ResolvedTable, col: ResolvedFieldName, newName) =>
checkColumnNotExists(col.path :+ newName, table.schema)
case a @ AlterTableAlterColumn(table: ResolvedTable, col: ResolvedFieldName, _, _, _, _) =>
val fieldName = col.name.quoted
if (a.dataType.isDefined) {
val field = CharVarcharUtils.getRawType(col.field.metadata)
.map(dt => col.field.copy(dataType = dt))
.getOrElse(col.field)
val newDataType = a.dataType.get
newDataType match {
case _: StructType =>
alter.failAnalysis(s"Cannot update ${table.name} field $fieldName type: " +
"update a struct by updating its fields")
case _: MapType =>
alter.failAnalysis(s"Cannot update ${table.name} field $fieldName type: " +
s"update a map by updating $fieldName.key or $fieldName.value")
case _: ArrayType =>
alter.failAnalysis(s"Cannot update ${table.name} field $fieldName type: " +
s"update the element by updating $fieldName.element")
case u: UserDefinedType[_] =>
alter.failAnalysis(s"Cannot update ${table.name} field $fieldName type: " +
s"update a UserDefinedType[${u.sql}] by updating its fields")
case _: CalendarIntervalType | _: YearMonthIntervalType | _: DayTimeIntervalType =>
alter.failAnalysis(s"Cannot update ${table.name} field $fieldName to interval type")
case _ => // update is okay
}
// We don't need to handle nested types here which shall fail before.
def canAlterColumnType(from: DataType, to: DataType): Boolean = (from, to) match {
case (CharType(l1), CharType(l2)) => l1 == l2
case (CharType(l1), VarcharType(l2)) => l1 <= l2
case (VarcharType(l1), VarcharType(l2)) => l1 <= l2
case _ => Cast.canUpCast(from, to)
}
if (!canAlterColumnType(field.dataType, newDataType)) {
alter.failAnalysis(s"Cannot update ${table.name} field $fieldName: " +
s"${field.dataType.simpleString} cannot be cast to ${newDataType.simpleString}")
}
}
if (a.nullable.isDefined) {
if (!a.nullable.get && col.field.nullable) {
alter.failAnalysis(s"Cannot change nullable column to non-nullable: $fieldName")
}
}
case _ =>
}
}

@@ -66,28 +66,6 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
}
createAlterTable(nameParts, catalog, tbl, changes)
case a @ AlterTableAlterColumnStatement(
nameParts @ NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _) =>
a.dataType.foreach(failNullType)
val colName = a.column.toArray
val typeChange = a.dataType.map { newDataType =>
TableChange.updateColumnType(colName, newDataType)
}
val nullabilityChange = a.nullable.map { nullable =>
TableChange.updateColumnNullability(colName, nullable)
}
val commentChange = a.comment.map { newComment =>
TableChange.updateColumnComment(colName, newComment)
}
val positionChange = a.position.map { newPosition =>
TableChange.updateColumnPosition(colName, newPosition)
}
createAlterTable(
nameParts,
catalog,
tbl,
typeChange.toSeq ++ nullabilityChange ++ commentChange ++ positionChange)
case c @ CreateTableStatement(
NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _) =>
assertNoNullTypeInSchema(c.tableSchema)

@@ -25,7 +25,8 @@ import org.apache.spark.sql.catalyst.trees.TreePattern.{TreePattern, UNRESOLVED_
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier, Table, TableCatalog}
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition
import org.apache.spark.sql.types.{DataType, StructField}
/**
* Holds the name of a namespace that has yet to be looked up in a catalog. It will be resolved to
@@ -92,15 +93,29 @@ case class UnresolvedPartitionSpec(
sealed trait FieldName extends LeafExpression with Unevaluable {
def name: Seq[String]
override def dataType: DataType = throw new IllegalStateException(
"UnresolvedFieldName.dataType should not be called.")
"FieldName.dataType should not be called.")
override def nullable: Boolean = throw new IllegalStateException(
"UnresolvedFieldName.nullable should not be called.")
"FieldName.nullable should not be called.")
}
case class UnresolvedFieldName(name: Seq[String]) extends FieldName {
override lazy val resolved = false
}
sealed trait FieldPosition extends LeafExpression with Unevaluable {
def position: ColumnPosition
override def dataType: DataType = throw new IllegalStateException(
"FieldPosition.dataType should not be called.")
override def nullable: Boolean = throw new IllegalStateException(
"FieldPosition.nullable should not be called.")
}
case class UnresolvedFieldPosition(
fieldName: Seq[String],
position: ColumnPosition) extends FieldPosition {
override lazy val resolved = false
}
/**
* Holds the name of a function that has yet to be looked up in a catalog. It will be resolved to
* [[ResolvedFunc]] during analysis.
@@ -150,7 +165,12 @@ case class ResolvedPartitionSpec(
ident: InternalRow,
location: Option[String] = None) extends PartitionSpec
case class ResolvedFieldName(name: Seq[String]) extends FieldName
case class ResolvedFieldName(path: Seq[String], field: StructField) extends FieldName {
def name: Seq[String] = path :+ field.name
}
case class ResolvedFieldPosition(position: ColumnPosition) extends FieldPosition
/**
* A plan containing resolved (temp) views.

@@ -3558,7 +3558,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
}
/**
* Parse a [[AlterTableAlterColumnStatement]] command to alter a column's property.
* Parse a [[AlterTableAlterColumn]] command to alter a column's property.
*
* For example:
* {{{
@@ -3573,13 +3573,13 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
override def visitAlterTableAlterColumn(
ctx: AlterTableAlterColumnContext): LogicalPlan = withOrigin(ctx) {
val action = ctx.alterColumnAction
val verb = if (ctx.CHANGE != null) "CHANGE" else "ALTER"
if (action == null) {
val verb = if (ctx.CHANGE != null) "CHANGE" else "ALTER"
operationNotAllowed(
s"ALTER TABLE table $verb COLUMN requires a TYPE, a SET/DROP, a COMMENT, or a FIRST/AFTER",
ctx)
}
val columnNameParts = typedVisit[Seq[String]](ctx.column)
val dataType = if (action.dataType != null) {
Some(typedVisit[DataType](action.dataType))
} else {
@@ -3599,16 +3599,16 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
None
}
val position = if (action.colPosition != null) {
Some(typedVisit[ColumnPosition](action.colPosition))
Some(UnresolvedFieldPosition(columnNameParts, typedVisit[ColumnPosition](action.colPosition)))
} else {
None
}
assert(Seq(dataType, nullable, comment, position).count(_.nonEmpty) == 1)
AlterTableAlterColumnStatement(
visitMultipartIdentifier(ctx.table),
typedVisit[Seq[String]](ctx.column),
AlterTableAlterColumn(
createUnresolvedTable(ctx.table, s"ALTER TABLE ... $verb COLUMN"),
UnresolvedFieldName(columnNameParts),
dataType = dataType,
nullable = nullable,
comment = comment,
@@ -3616,7 +3616,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
}
/**
* Parse a [[AlterTableAlterColumnStatement]] command. This is Hive SQL syntax.
* Parse a [[AlterTableAlterColumn]] command. This is Hive SQL syntax.
*
* For example:
* {{{
@@ -3640,13 +3640,14 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
Some("please run ALTER COLUMN ... SET/DROP NOT NULL instead"))
}
AlterTableAlterColumnStatement(
typedVisit[Seq[String]](ctx.table),
columnNameParts,
AlterTableAlterColumn(
createUnresolvedTable(ctx.table, s"ALTER TABLE ... CHANGE COLUMN"),
UnresolvedFieldName(columnNameParts),
dataType = Option(ctx.colType().dataType()).map(typedVisit[DataType]),
nullable = None,
comment = Option(ctx.colType().commentSpec()).map(visitCommentSpec),
position = Option(ctx.colPosition).map(typedVisit[ColumnPosition]))
position = Option(ctx.colPosition).map(
pos => UnresolvedFieldPosition(columnNameParts, typedVisit[ColumnPosition](pos))))
}
override def visitHiveReplaceColumns(

@@ -246,17 +246,6 @@ case class AlterTableReplaceColumnsStatement(
tableName: Seq[String],
columnsToAdd: Seq[QualifiedColType]) extends LeafParsedStatement
/**
* ALTER TABLE ... CHANGE COLUMN command, as parsed from SQL.
*/
case class AlterTableAlterColumnStatement(
tableName: Seq[String],
column: Seq[String],
dataType: Option[DataType],
nullable: Option[Boolean],
comment: Option[String],
position: Option[ColumnPosition]) extends LeafParsedStatement
/**
* An INSERT INTO statement, as parsed from SQL.
*

@@ -17,7 +17,7 @@
package org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.analysis.{FieldName, NamedRelation, PartitionSpec, UnresolvedException}
import org.apache.spark.sql.catalyst.analysis.{FieldName, FieldPosition, NamedRelation, PartitionSpec, UnresolvedException}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, Expression, Unevaluable}
import org.apache.spark.sql.catalyst.plans.DescribeCommandSchema
@@ -1142,3 +1142,42 @@ case class AlterTableRenameColumn(
override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
copy(table = newChild)
}
/**
* The logical plan of the ALTER TABLE ... ALTER COLUMN command.
*/
case class AlterTableAlterColumn(
table: LogicalPlan,
column: FieldName,
dataType: Option[DataType],
nullable: Option[Boolean],
comment: Option[String],
position: Option[FieldPosition]) extends AlterTableCommand {
import org.apache.spark.sql.connector.catalog.CatalogV2Util._
dataType.foreach(failNullType)
override def operation: String = "update"
override def changes: Seq[TableChange] = {
require(column.resolved, "FieldName should be resolved before it's converted to TableChange.")
val colName = column.name.toArray
val typeChange = dataType.map { newDataType =>
TableChange.updateColumnType(colName, newDataType)
}
val nullabilityChange = nullable.map { nullable =>
TableChange.updateColumnNullability(colName, nullable)
}
val commentChange = comment.map { newComment =>
TableChange.updateColumnComment(colName, newComment)
}
val positionChange = position.map { newPosition =>
require(newPosition.resolved,
"FieldPosition should be resolved before it's converted to TableChange.")
TableChange.updateColumnPosition(colName, newPosition.position)
}
typeChange.toSeq ++ nullabilityChange ++ commentChange ++ positionChange
}
override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
copy(table = newChild)
}

@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.parser
import java.util.Locale
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, GlobalTempView, LocalTempView, PersistedView, UnresolvedAttribute, UnresolvedFieldName, UnresolvedFunc, UnresolvedInlineTable, UnresolvedNamespace, UnresolvedRelation, UnresolvedStar, UnresolvedTable, UnresolvedTableOrView, UnresolvedView}
import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, GlobalTempView, LocalTempView, PersistedView, UnresolvedAttribute, UnresolvedFieldName, UnresolvedFieldPosition, UnresolvedFunc, UnresolvedInlineTable, UnresolvedNamespace, UnresolvedRelation, UnresolvedStar, UnresolvedTable, UnresolvedTableOrView, UnresolvedView}
import org.apache.spark.sql.catalyst.catalog.{ArchiveResource, BucketSpec, FileResource, FunctionResource, JarResource}
import org.apache.spark.sql.catalyst.expressions.{EqualTo, Hex, Literal}
import org.apache.spark.sql.catalyst.plans.logical._
@@ -901,9 +901,9 @@ class DDLParserSuite extends AnalysisTest {
test("alter table: update column type using ALTER") {
comparePlans(
parsePlan("ALTER TABLE table_name ALTER COLUMN a.b.c TYPE bigint"),
AlterTableAlterColumnStatement(
Seq("table_name"),
Seq("a", "b", "c"),
AlterTableAlterColumn(
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... ALTER COLUMN", None),
UnresolvedFieldName(Seq("a", "b", "c")),
Some(LongType),
None,
None,
@@ -920,9 +920,9 @@ class DDLParserSuite extends AnalysisTest {
test("alter table: update column type") {
comparePlans(
parsePlan("ALTER TABLE table_name CHANGE COLUMN a.b.c TYPE bigint"),
AlterTableAlterColumnStatement(
Seq("table_name"),
Seq("a", "b", "c"),
AlterTableAlterColumn(
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... CHANGE COLUMN", None),
UnresolvedFieldName(Seq("a", "b", "c")),
Some(LongType),
None,
None,
@@ -932,9 +932,9 @@ class DDLParserSuite extends AnalysisTest {
test("alter table: update column comment") {
comparePlans(
parsePlan("ALTER TABLE table_name CHANGE COLUMN a.b.c COMMENT 'new comment'"),
AlterTableAlterColumnStatement(
Seq("table_name"),
Seq("a", "b", "c"),
AlterTableAlterColumn(
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... CHANGE COLUMN", None),
UnresolvedFieldName(Seq("a", "b", "c")),
None,
None,
Some("new comment"),
@@ -944,13 +944,13 @@ class DDLParserSuite extends AnalysisTest {
test("alter table: update column position") {
comparePlans(
parsePlan("ALTER TABLE table_name CHANGE COLUMN a.b.c FIRST"),
AlterTableAlterColumnStatement(
Seq("table_name"),
Seq("a", "b", "c"),
AlterTableAlterColumn(
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... CHANGE COLUMN", None),
UnresolvedFieldName(Seq("a", "b", "c")),
None,
None,
None,
Some(first())))
Some(UnresolvedFieldPosition(Seq("a", "b", "c"), first()))))
}
test("alter table: multiple property changes are not allowed") {
@@ -970,9 +970,9 @@ class DDLParserSuite extends AnalysisTest {
test("alter table: SET/DROP NOT NULL") {
comparePlans(
parsePlan("ALTER TABLE table_name ALTER COLUMN a.b.c SET NOT NULL"),
AlterTableAlterColumnStatement(
Seq("table_name"),
Seq("a", "b", "c"),
AlterTableAlterColumn(
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... ALTER COLUMN", None),
UnresolvedFieldName(Seq("a", "b", "c")),
None,
Some(false),
None,
@@ -980,9 +980,9 @@ class DDLParserSuite extends AnalysisTest {
comparePlans(
parsePlan("ALTER TABLE table_name ALTER COLUMN a.b.c DROP NOT NULL"),
AlterTableAlterColumnStatement(
Seq("table_name"),
Seq("a", "b", "c"),
AlterTableAlterColumn(
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... ALTER COLUMN", None),
UnresolvedFieldName(Seq("a", "b", "c")),
None,
Some(true),
None,
@@ -1017,9 +1017,9 @@ class DDLParserSuite extends AnalysisTest {
comparePlans(
parsePlan(sql1),
AlterTableAlterColumnStatement(
Seq("table_name"),
Seq("a", "b", "c"),
AlterTableAlterColumn(
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... CHANGE COLUMN", None),
UnresolvedFieldName(Seq("a", "b", "c")),
Some(IntegerType),
None,
None,
@@ -1027,9 +1027,9 @@ class DDLParserSuite extends AnalysisTest {
comparePlans(
parsePlan(sql2),
AlterTableAlterColumnStatement(
Seq("table_name"),
Seq("a", "b", "c"),
AlterTableAlterColumn(
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... CHANGE COLUMN", None),
UnresolvedFieldName(Seq("a", "b", "c")),
Some(IntegerType),
None,
Some("new_comment"),
@@ -1037,13 +1037,13 @@ class DDLParserSuite extends AnalysisTest {
comparePlans(
parsePlan(sql3),
AlterTableAlterColumnStatement(
Seq("table_name"),
Seq("a", "b", "c"),
AlterTableAlterColumn(
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... CHANGE COLUMN", None),
UnresolvedFieldName(Seq("a", "b", "c")),
Some(IntegerType),
None,
None,
Some(after("other_col"))))
Some(UnresolvedFieldPosition(Seq("a", "b", "c"), after("other_col")))))
// renaming column not supported in hive style ALTER COLUMN.
intercept("ALTER TABLE table_name CHANGE COLUMN a.b.c new_name INT",

@@ -94,58 +94,34 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
}
createAlterTable(nameParts, catalog, tbl, changes)
case a @ AlterTableAlterColumnStatement(
nameParts @ SessionCatalogAndTable(catalog, tbl), _, _, _, _, _) =>
a.dataType.foreach(failNullType)
loadTable(catalog, tbl.asIdentifier).collect {
case v1Table: V1Table =>
if (a.column.length > 1) {
throw QueryCompilationErrors.alterQualifiedColumnOnlySupportedWithV2TableError
}
if (a.nullable.isDefined) {
throw QueryCompilationErrors.alterColumnWithV1TableCannotSpecifyNotNullError
}
if (a.position.isDefined) {
throw QueryCompilationErrors.alterOnlySupportedWithV2TableError
}
val builder = new MetadataBuilder
// Add comment to metadata
a.comment.map(c => builder.putString("comment", c))
val colName = a.column(0)
val dataType = a.dataType.getOrElse {
v1Table.schema.findNestedField(Seq(colName), resolver = conf.resolver)
.map(_._2.dataType)
.getOrElse {
throw QueryCompilationErrors.alterColumnCannotFindColumnInV1TableError(
quoteIfNeeded(colName), v1Table)
}
}
val newColumn = StructField(
colName,
dataType,
nullable = true,
builder.build())
AlterTableChangeColumnCommand(tbl.asTableIdentifier, colName, newColumn)
}.getOrElse {
val colName = a.column.toArray
val typeChange = a.dataType.map { newDataType =>
TableChange.updateColumnType(colName, newDataType)
}
val nullabilityChange = a.nullable.map { nullable =>
TableChange.updateColumnNullability(colName, nullable)
}
val commentChange = a.comment.map { newComment =>
TableChange.updateColumnComment(colName, newComment)
}
val positionChange = a.position.map { newPosition =>
TableChange.updateColumnPosition(colName, newPosition)
}
createAlterTable(
nameParts,
catalog,
tbl,
typeChange.toSeq ++ nullabilityChange ++ commentChange ++ positionChange)
case a @ AlterTableAlterColumn(ResolvedV1TableAndIdentifier(table, ident), _, _, _, _, _) =>
if (a.column.name.length > 1) {
throw QueryCompilationErrors.alterQualifiedColumnOnlySupportedWithV2TableError
}
if (a.nullable.isDefined) {
throw QueryCompilationErrors.alterColumnWithV1TableCannotSpecifyNotNullError
}
if (a.position.isDefined) {
throw QueryCompilationErrors.alterOnlySupportedWithV2TableError
}
val builder = new MetadataBuilder
// Add comment to metadata
a.comment.map(c => builder.putString("comment", c))
val colName = a.column.name(0)
val dataType = a.dataType.getOrElse {
table.schema.findNestedField(Seq(colName), resolver = conf.resolver)
.map(_._2.dataType)
.getOrElse {
throw QueryCompilationErrors.alterColumnCannotFindColumnInV1TableError(
quoteIfNeeded(colName), table)
}
}
val newColumn = StructField(
colName,
dataType,
nullable = true,
builder.build())
AlterTableChangeColumnCommand(ident.asTableIdentifier, colName, newColumn)
case AlterTableRenameColumn(ResolvedV1TableIdentifier(_), _, _) =>
throw QueryCompilationErrors.renameColumnOnlySupportedWithV2TableError
@@ -662,6 +638,14 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
}
}
object ResolvedV1TableAndIdentifier {
def unapply(resolved: LogicalPlan): Option[(V1Table, Identifier)] = resolved match {
case ResolvedTable(catalog, ident, table: V1Table, _) if isSessionCatalog(catalog) =>
Some(table -> ident)
case _ => None
}
}
object ResolvedV1TableIdentifier {
def unapply(resolved: LogicalPlan): Option[Identifier] = resolved match {
case ResolvedTable(catalog, ident, _: V1Table, _) if isSessionCatalog(catalog) => Some(ident)

@@ -221,7 +221,7 @@ ALTER TABLE temp_view CHANGE a TYPE INT
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
Invalid command: 'temp_view' is a view not a table.; line 1 pos 0
temp_view is a temp view. 'ALTER TABLE ... CHANGE COLUMN' expects a table.; line 1 pos 12
-- !query
@@ -238,7 +238,7 @@ ALTER TABLE global_temp.global_temp_view CHANGE a TYPE INT
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
Invalid command: 'global_temp.global_temp_view' is a view not a table.; line 1 pos 0
global_temp.global_temp_view is a temp view. 'ALTER TABLE ... CHANGE COLUMN' expects a table.; line 1 pos 12
-- !query

@@ -18,7 +18,7 @@
package org.apache.spark.sql.connector
import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, CreateTablePartitioningValidationSuite, ResolvedTable, TestRelation2, UnresolvedFieldName}
import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AlterTableCommand, AlterTableDropColumns, AlterTableRenameColumn, CreateTableAsSelect, LogicalPlan, ReplaceTableAsSelect}
import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AlterTableAlterColumn, AlterTableCommand, AlterTableDropColumns, AlterTableRenameColumn, CreateTableAsSelect, LogicalPlan, ReplaceTableAsSelect}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.{Identifier, TableChange}
import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition
@@ -209,7 +209,7 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes
test("AlterTable: drop column nullability resolution") {
Seq(Array("ID"), Array("point", "X"), Array("POINT", "X"), Array("POINT", "x")).foreach { ref =>
alterTableTest(
TableChange.updateColumnNullability(ref, true),
AlterTableAlterColumn(table, UnresolvedFieldName(ref), None, Some(true), None, None),
Seq("Cannot update missing field", ref.quoted)
)
}
@@ -218,7 +218,7 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes
test("AlterTable: change column type resolution") {
Seq(Array("ID"), Array("point", "X"), Array("POINT", "X"), Array("POINT", "x")).foreach { ref =>
alterTableTest(
TableChange.updateColumnType(ref, StringType),
AlterTableAlterColumn(table, UnresolvedFieldName(ref), Some(StringType), None, None, None),
Seq("Cannot update missing field", ref.quoted)
)
}
@@ -227,7 +227,7 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes
test("AlterTable: change column comment resolution") {
Seq(Array("ID"), Array("point", "X"), Array("POINT", "X"), Array("POINT", "x")).foreach { ref =>
alterTableTest(
TableChange.updateColumnComment(ref, "Here's a comment for ya"),
AlterTableAlterColumn(table, UnresolvedFieldName(ref), None, None, Some("comment"), None),
Seq("Cannot update missing field", ref.quoted)
)
}

@@ -26,15 +26,14 @@ import org.mockito.invocation.InvocationOnMock
import org.apache.spark.sql.{AnalysisException, SaveMode}
import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, Analyzer, EmptyFunctionRegistry, NoSuchTableException, ResolvedTable, ResolveSessionCatalog, UnresolvedAttribute, UnresolvedRelation, UnresolvedSubqueryColumnAliases, UnresolvedTable}
import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, Analyzer, EmptyFunctionRegistry, NoSuchTableException, ResolvedFieldName, ResolvedTable, ResolveSessionCatalog, UnresolvedAttribute, UnresolvedRelation, UnresolvedSubqueryColumnAliases, UnresolvedTable}
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType, InMemoryCatalog, SessionCatalog}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Expression, InSubquery, IntegerLiteral, ListQuery, Literal, StringLiteral}
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AnalysisOnlyCommand, AppendData, Assignment, CreateTableAsSelect, CreateTableStatement, CreateV2Table, DeleteAction, DeleteFromTable, DescribeRelation, DropTable, InsertAction, LocalRelation, LogicalPlan, MergeIntoTable, OneRowRelation, Project, SetTableLocation, SetTableProperties, ShowTableProperties, SubqueryAlias, UnsetTableProperties, UpdateAction, UpdateTable}
import org.apache.spark.sql.catalyst.plans.logical.{AlterTableAlterColumn, AnalysisOnlyCommand, AppendData, Assignment, CreateTableAsSelect, CreateTableStatement, CreateV2Table, DeleteAction, DeleteFromTable, DescribeRelation, DropTable, InsertAction, LocalRelation, LogicalPlan, MergeIntoTable, OneRowRelation, Project, SetTableLocation, SetTableProperties, ShowTableProperties, SubqueryAlias, UnsetTableProperties, UpdateAction, UpdateTable}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.FakeV2Provider
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogNotFoundException, Identifier, Table, TableCapability, TableCatalog, TableChange, V1Table}
import org.apache.spark.sql.connector.catalog.TableChange.{UpdateColumnComment, UpdateColumnType}
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogNotFoundException, Identifier, Table, TableCapability, TableCatalog, V1Table}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.execution.datasources.CreateTable
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
@@ -1101,17 +1100,27 @@ class PlanResolutionSuite extends AnalysisTest {
"ALTER COLUMN with qualified column is only supported with v2 tables"))
} else {
parsed1 match {
case AlterTable(_, _, _: DataSourceV2Relation, changes) =>
assert(changes == Seq(
TableChange.updateColumnType(Array("i"), LongType)))
case _ => fail("expect AlterTable")
case AlterTableAlterColumn(
_: ResolvedTable,
column: ResolvedFieldName,
Some(LongType),
None,
None,
None) =>
assert(column.name == Seq("i"))
case _ => fail("expect AlterTableAlterColumn")
}
parsed2 match {
case AlterTable(_, _, _: DataSourceV2Relation, changes) =>
assert(changes == Seq(
TableChange.updateColumnComment(Array("i"), "new comment")))
case _ => fail("expect AlterTable")
case AlterTableAlterColumn(
_: ResolvedTable,
column: ResolvedFieldName,
None,
None,
Some("new comment"),
None) =>
assert(column.name == Seq("i"))
case _ => fail("expect AlterTableAlterColumn")
}
}
}
@@ -1158,21 +1167,18 @@ class PlanResolutionSuite extends AnalysisTest {
test("alter table: hive style change column") {
Seq("v2Table", "testcat.tab").foreach { tblName =>
parseAndResolve(s"ALTER TABLE $tblName CHANGE COLUMN i i int COMMENT 'an index'") match {
case AlterTable(_, _, _: DataSourceV2Relation, changes) =>
assert(changes.length == 1, "Should only have a comment change")
assert(changes.head.isInstanceOf[UpdateColumnComment],
s"Expected only a UpdateColumnComment change but got: ${changes.head}")
case _ => fail("expect AlterTable")
case AlterTableAlterColumn(
_: ResolvedTable, _: ResolvedFieldName, None, None, Some(comment), None) =>
assert(comment == "an index")
case _ => fail("expect AlterTableAlterColumn with comment change only")
}
parseAndResolve(s"ALTER TABLE $tblName CHANGE COLUMN i i long COMMENT 'an index'") match {
case AlterTable(_, _, _: DataSourceV2Relation, changes) =>
assert(changes.length == 2, "Should have a comment change and type change")
assert(changes.exists(_.isInstanceOf[UpdateColumnComment]),
s"Expected UpdateColumnComment change but got: ${changes}")
assert(changes.exists(_.isInstanceOf[UpdateColumnType]),
s"Expected UpdateColumnType change but got: ${changes}")
case _ => fail("expect AlterTable")
case AlterTableAlterColumn(
_: ResolvedTable, _: ResolvedFieldName, Some(dataType), None, Some(comment), None) =>
assert(comment == "an index")
assert(dataType == LongType)
case _ => fail("expect AlterTableAlterColumn with type and comment changes")
}
}
}
@@ -1201,23 +1207,23 @@ class PlanResolutionSuite extends AnalysisTest {
DSV2ResolutionTests.foreach { case (sql, isSessionCatalog) =>
test(s"Data source V2 relation resolution '$sql'") {
val parsed = parseAndResolve(sql, withDefault = true)
val catalogIdent = if (isSessionCatalog) v2SessionCatalog else testCat
val catalog = if (isSessionCatalog) v2SessionCatalog else testCat
val tableIdent = if (isSessionCatalog) "v2Table" else "tab"
parsed match {
case AlterTable(_, _, r: DataSourceV2Relation, _) =>
assert(r.catalog.exists(_ == catalogIdent))
assert(r.identifier.exists(_.name() == tableIdent))
case AlterTableAlterColumn(r: ResolvedTable, _, _, _, _, _) =>
assert(r.catalog == catalog)
assert(r.identifier.name() == tableIdent)
case Project(_, AsDataSourceV2Relation(r)) =>
assert(r.catalog.exists(_ == catalogIdent))
assert(r.catalog.exists(_ == catalog))
assert(r.identifier.exists(_.name() == tableIdent))
case AppendData(r: DataSourceV2Relation, _, _, _, _) =>
assert(r.catalog.exists(_ == catalogIdent))
assert(r.catalog.exists(_ == catalog))
assert(r.identifier.exists(_.name() == tableIdent))
case DescribeRelation(r: ResolvedTable, _, _, _) =>
assert(r.catalog == catalogIdent)
assert(r.catalog == catalog)
assert(r.identifier.name() == tableIdent)
case ShowTableProperties(r: ResolvedTable, _, _) =>
assert(r.catalog == catalogIdent)
assert(r.catalog == catalog)
assert(r.identifier.name() == tableIdent)
case ShowTablePropertiesCommand(t: TableIdentifier, _, _) =>
assert(t.identifier == tableIdent)

@@ -266,7 +266,7 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
val msg1 = intercept[AnalysisException] {
sql(s"ALTER TABLE $tableName ALTER COLUMN bad_column TYPE DOUBLE")
}.getMessage
assert(msg1.contains("Cannot update missing field bad_column in test.alt_table schema"))
assert(msg1.contains("Cannot update missing field bad_column in h2.test.alt_table schema"))
// Update column to wrong type
val msg2 = intercept[ParseException] {
sql(s"ALTER TABLE $tableName ALTER COLUMN id TYPE bad_type")
@@ -297,7 +297,7 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
val msg = intercept[AnalysisException] {
sql(s"ALTER TABLE $tableName ALTER COLUMN bad_column DROP NOT NULL")
}.getMessage
assert(msg.contains("Cannot update missing field bad_column in test.alt_table"))
assert(msg.contains("Cannot update missing field bad_column in h2.test.alt_table"))
}
// Update column nullability in not existing table and namespace
Seq("h2.test.not_existing_table", "h2.bad_test.not_existing_table").foreach { table =>
@@ -321,7 +321,7 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
val msg = intercept[AnalysisException] {
sql(s"ALTER TABLE $tableName ALTER COLUMN bad_column COMMENT 'test'")
}.getMessage
assert(msg.contains("Cannot update missing field bad_column in test.alt_table"))
assert(msg.contains("Cannot update missing field bad_column in h2.test.alt_table"))
}
// Update column comments in not existing table and namespace
Seq("h2.test.not_existing_table", "h2.bad_test.not_existing_table").foreach { table =>
@@ -376,7 +376,7 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
val msg = intercept[AnalysisException] {
sql(s"ALTER TABLE $tableName ALTER COLUMN C1 TYPE DOUBLE")
}.getMessage
assert(msg.contains("Cannot update missing field C1 in test.alt_table schema"))
assert(msg.contains("Cannot update missing field C1 in h2.test.alt_table schema"))
}
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
@@ -390,7 +390,7 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
val msg = intercept[AnalysisException] {
sql(s"ALTER TABLE $tableName ALTER COLUMN C1 DROP NOT NULL")
}.getMessage
assert(msg.contains("Cannot update missing field C1 in test.alt_table schema"))
assert(msg.contains("Cannot update missing field C1 in h2.test.alt_table schema"))
}
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {