From 620fde476777cdce4e55c3398d5aec44ba035de3 Mon Sep 17 00:00:00 2001
From: Terry Kim
Date: Tue, 29 Jun 2021 02:53:05 +0000
Subject: [PATCH] [SPARK-34302][SQL] Migrate ALTER TABLE ... CHANGE COLUMN
 command to use UnresolvedTable to resolve the identifier

### What changes were proposed in this pull request?

This PR proposes to migrate the `ALTER TABLE ... CHANGE COLUMN` command to use `UnresolvedTable` as a `child` to resolve the table identifier. This allows consistent resolution rules (temp view first, etc.) to be applied to both v1 and v2 commands. More info about the consistent resolution rule proposal can be found in [JIRA](https://issues.apache.org/jira/browse/SPARK-29900) or the [proposal doc](https://docs.google.com/document/d/1hvLjGA8y_W_hhilpngXVub1Ebv8RsMap986nENCFnrg/edit?usp=sharing).

### Why are the changes needed?

This is part of the effort to make the relation lookup behavior consistent: [SPARK-29900](https://issues.apache.org/jira/browse/SPARK-29900).

### Does this PR introduce _any_ user-facing change?

After this PR, the `ALTER TABLE ... CHANGE COLUMN` command will have consistent resolution behavior.

### How was this patch tested?

Updated existing tests.

Closes #33113 from imback82/alter_change_column.

Authored-by: Terry Kim
Signed-off-by: Wenchen Fan
---
 .../sql/catalyst/analysis/Analyzer.scala      | 100 ++++-------
 .../sql/catalyst/analysis/CheckAnalysis.scala | 149 ++++++++++--------
 .../catalyst/analysis/ResolveCatalogs.scala   |  22 ---
 .../catalyst/analysis/v2ResolutionPlans.scala |  28 +++-
 .../sql/catalyst/parser/AstBuilder.scala      |  25 +--
 .../catalyst/plans/logical/statements.scala   |  11 --
 .../catalyst/plans/logical/v2Commands.scala   |  41 ++++-
 .../sql/catalyst/parser/DDLParserSuite.scala  |  60 +++----
 .../analysis/ResolveSessionCatalog.scala      |  86 ++++------
 .../sql-tests/results/change-column.sql.out   |   4 +-
 .../V2CommandsCaseSensitivitySuite.scala      |   8 +-
 .../command/PlanResolutionSuite.scala         |  70 ++++----
 .../v2/jdbc/JDBCTableCatalogSuite.scala       |  10 +-
 13 files changed, 302 insertions(+), 312 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 5de228bfd0..e47596f49a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -44,7 +44,7 @@ import org.apache.spark.sql.catalyst.trees.TreePattern._
 import org.apache.spark.sql.catalyst.util.{toPrettySQL, CharVarcharUtils}
 import org.apache.spark.sql.connector.catalog._
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
-import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, After, ColumnChange, ColumnPosition, DeleteColumn, UpdateColumnComment, UpdateColumnNullability, UpdateColumnPosition, UpdateColumnType}
+import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, After, ColumnChange, ColumnPosition, DeleteColumn}
 import org.apache.spark.sql.connector.catalog.functions.{AggregateFunction => V2AggregateFunction, BoundFunction, ScalarFunction}
 import org.apache.spark.sql.connector.catalog.functions.ScalarFunction.MAGIC_METHOD_NAME
 import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTransform, Transform}
@@ -299,7 +299,7 @@ class Analyzer(override val catalogManager: CatalogManager)
     Batch("Post-Hoc Resolution", Once,
       Seq(ResolveCommandsWithIfExists) ++
       postHocResolutionRules: _*),
-
Batch("Normalize Alter Table Field Names", Once, ResolveFieldNames), + Batch("Normalize Alter Table Commands", Once, ResolveAlterTableCommands), Batch("Normalize Alter Table", Once, ResolveAlterTableChanges), Batch("Remove Unresolved Hints", Once, new ResolveHints.RemoveAllHints), @@ -3527,13 +3527,35 @@ class Analyzer(override val catalogManager: CatalogManager) * Rule to mostly resolve, normalize and rewrite column names based on case sensitivity * for alter table commands. */ - object ResolveFieldNames extends Rule[LogicalPlan] { + object ResolveAlterTableCommands extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp { case a: AlterTableCommand if a.table.resolved => - a.transformExpressions { + val table = a.table.asInstanceOf[ResolvedTable] + val transformed = a.transformExpressions { case u: UnresolvedFieldName => - val table = a.table.asInstanceOf[ResolvedTable] - resolveFieldNames(table.schema, u.name).map(ResolvedFieldName(_)).getOrElse(u) + resolveFieldNames(table.schema, u.name).getOrElse(u) + case u: UnresolvedFieldPosition => u.position match { + case after: After => + resolveFieldNames(table.schema, u.fieldName.init :+ after.column()) + .map { resolved => + ResolvedFieldPosition(ColumnPosition.after(resolved.field.name)) + }.getOrElse(u) + case _ => ResolvedFieldPosition(u.position) + } + } + + transformed match { + case alter @ AlterTableAlterColumn( + _: ResolvedTable, ResolvedFieldName(_, field), Some(dataType), _, _, _) => + // Hive style syntax provides the column type, even if it may not have changed. + val dt = CharVarcharUtils.getRawType(field.metadata).getOrElse(field.dataType) + if (dt == dataType) { + // The user didn't want the field to change, so remove this change. + alter.copy(dataType = None) + } else { + alter + } + case other => other } } @@ -3543,10 +3565,10 @@ class Analyzer(override val catalogManager: CatalogManager) */ private def resolveFieldNames( schema: StructType, - fieldNames: Seq[String]): Option[Seq[String]] = { + fieldNames: Seq[String]): Option[ResolvedFieldName] = { val fieldOpt = schema.findNestedField( fieldNames, includeCollections = true, conf.resolver) - fieldOpt.map { case (path, field) => path :+ field.name } + fieldOpt.map { case (path, field) => ResolvedFieldName(path, field) } } } @@ -3598,68 +3620,6 @@ class Analyzer(override val catalogManager: CatalogManager) Some(addColumn(schema, "root", Nil)) } - case typeChange: UpdateColumnType => - // Hive style syntax provides the column type, even if it may not have changed - val fieldOpt = schema.findNestedField( - typeChange.fieldNames(), includeCollections = true, conf.resolver) - - if (fieldOpt.isEmpty) { - // We couldn't resolve the field. 
Leave it to CheckAnalysis - Some(typeChange) - } else { - val (fieldNames, field) = fieldOpt.get - val dt = CharVarcharUtils.getRawType(field.metadata).getOrElse(field.dataType) - if (dt == typeChange.newDataType()) { - // The user didn't want the field to change, so remove this change - None - } else { - Some(TableChange.updateColumnType( - (fieldNames :+ field.name).toArray, typeChange.newDataType())) - } - } - case n: UpdateColumnNullability => - // Need to resolve column - resolveFieldNames( - schema, - n.fieldNames(), - TableChange.updateColumnNullability(_, n.nullable())).orElse(Some(n)) - - case position: UpdateColumnPosition => - position.position() match { - case after: After => - // Need to resolve column as well as position reference - val fieldOpt = schema.findNestedField( - position.fieldNames(), includeCollections = true, conf.resolver) - - if (fieldOpt.isEmpty) { - Some(position) - } else { - val (normalizedPath, field) = fieldOpt.get - val targetCol = schema.findNestedField( - normalizedPath :+ after.column(), includeCollections = true, conf.resolver) - if (targetCol.isEmpty) { - // Leave unchanged to CheckAnalysis - Some(position) - } else { - Some(TableChange.updateColumnPosition( - (normalizedPath :+ field.name).toArray, - ColumnPosition.after(targetCol.get._2.name))) - } - } - case _ => - // Need to resolve column - resolveFieldNames( - schema, - position.fieldNames(), - TableChange.updateColumnPosition(_, position.position())).orElse(Some(position)) - } - - case comment: UpdateColumnComment => - resolveFieldNames( - schema, - comment.fieldNames(), - TableChange.updateColumnComment(_, comment.newComment())).orElse(Some(comment)) - case delete: DeleteColumn => resolveFieldNames(schema, delete.fieldNames(), TableChange.deleteColumn) .orElse(Some(delete)) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 95854966a1..1f0d7d4061 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, TypeUtils} import org.apache.spark.sql.connector.catalog.{LookupCatalog, SupportsPartitionManagement} -import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, After, ColumnPosition, DeleteColumn, UpdateColumnComment, UpdateColumnNullability, UpdateColumnPosition, UpdateColumnType} +import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, After, ColumnPosition, DeleteColumn} import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -444,12 +444,42 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog { write.query.schema.foreach(f => TypeUtils.failWithIntervalType(f.dataType)) case alter: AlterTableCommand if alter.table.resolved => + val table = alter.table.asInstanceOf[ResolvedTable] + def findField(fieldName: Seq[String]): StructField = { + // Include collections because structs nested in maps and arrays may be altered. 
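For readers outside the Spark codebase, a minimal, self-contained sketch of the lookup that `findNestedField` (used just below) performs. Assumptions: this simplified version descends only into structs (the real method also descends into maps and arrays when `includeCollections` is set), the resolver is reduced to a case-insensitivity toggle, and `FieldLookupSketch` and its members are illustrative names, not Spark's:

```scala
import org.apache.spark.sql.types._

object FieldLookupSketch {
  type Resolver = (String, String) => Boolean
  val caseInsensitive: Resolver = _.equalsIgnoreCase(_)

  // Walks a possibly nested path such as Seq("point", "x") through the schema,
  // returning the normalized parent path plus the final StructField.
  def find(
      schema: StructType,
      path: Seq[String],
      res: Resolver): Option[(Seq[String], StructField)] = path match {
    case Seq() => None
    case head +: rest =>
      schema.fields.find(f => res(f.name, head)).flatMap { field =>
        if (rest.isEmpty) {
          Some(Seq.empty[String] -> field)
        } else {
          field.dataType match {
            case s: StructType =>
              find(s, rest, res).map { case (p, f) => (field.name +: p, f) }
            case _ => None // non-struct parent; CheckAnalysis reports this case
          }
        }
      }
  }

  def main(args: Array[String]): Unit = {
    val schema = new StructType()
      .add("id", LongType)
      .add("point", new StructType().add("x", DoubleType).add("y", DoubleType))
    // The mis-cased path resolves and is normalized to the declared field names.
    val normalized = find(schema, Seq("POINT", "X"), caseInsensitive)
      .map { case (p, f) => p :+ f.name }
    assert(normalized.contains(Seq("point", "x")))
  }
}
```

This normalization step is why the case-sensitivity tests in V2CommandsCaseSensitivitySuite below can assert on the declared field names rather than the user-typed ones.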
+ val field = table.schema.findNestedField(fieldName, includeCollections = true) + if (field.isEmpty) { + alter.failAnalysis(s"Cannot ${alter.operation} missing field ${fieldName.quoted} " + + s"in ${table.name} schema: ${table.schema.treeString}") + } + field.get._2 + } + def findParentStruct(fieldNames: Seq[String]): StructType = { + val parent = fieldNames.init + val field = if (parent.nonEmpty) { + findField(parent).dataType + } else { + table.schema + } + field match { + case s: StructType => s + case o => alter.failAnalysis(s"Cannot ${alter.operation} ${fieldNames.quoted}, " + + s"because its parent is not a StructType. Found $o") + } + } alter.transformExpressions { - case u: UnresolvedFieldName => - val table = alter.table.asInstanceOf[ResolvedTable] - alter.failAnalysis( - s"Cannot ${alter.operation} missing field ${u.name.quoted} in ${table.name} " + - s"schema: ${table.schema.treeString}") + case UnresolvedFieldName(name) => + alter.failAnalysis(s"Cannot ${alter.operation} missing field ${name.quoted} in " + + s"${table.name} schema: ${table.schema.treeString}") + case UnresolvedFieldPosition(fieldName, position: After) => + val parent = findParentStruct(fieldName) + val allFields = parent match { + case s: StructType => s.fieldNames + case o => alter.failAnalysis(s"Cannot ${alter.operation} ${fieldName.quoted}, " + + s"because its parent is not a StructType. Found $o") + } + alter.failAnalysis(s"Couldn't resolve positional argument $position amongst " + + s"${allFields.mkString("[", ", ", "]")}") } checkAlterTableCommand(alter) @@ -522,66 +552,6 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog { positionArgumentExists(add.position(), parent, fieldsAdded) TypeUtils.failWithIntervalType(add.dataType()) colsToAdd(parentName) = fieldsAdded :+ add.fieldNames().last - case update: UpdateColumnType => - val field = { - val f = findField("update", update.fieldNames) - CharVarcharUtils.getRawType(f.metadata) - .map(dt => f.copy(dataType = dt)) - .getOrElse(f) - } - val fieldName = update.fieldNames.quoted - update.newDataType match { - case _: StructType => - alter.failAnalysis(s"Cannot update ${table.name} field $fieldName type: " + - s"update a struct by updating its fields") - case _: MapType => - alter.failAnalysis(s"Cannot update ${table.name} field $fieldName type: " + - s"update a map by updating $fieldName.key or $fieldName.value") - case _: ArrayType => - alter.failAnalysis(s"Cannot update ${table.name} field $fieldName type: " + - s"update the element by updating $fieldName.element") - case u: UserDefinedType[_] => - alter.failAnalysis(s"Cannot update ${table.name} field $fieldName type: " + - s"update a UserDefinedType[${u.sql}] by updating its fields") - case _: CalendarIntervalType | _: YearMonthIntervalType | - _: DayTimeIntervalType => - alter.failAnalysis(s"Cannot update ${table.name} field $fieldName to " + - s"interval type") - case _ => - // update is okay - } - - // We don't need to handle nested types here which shall fail before - def canAlterColumnType(from: DataType, to: DataType): Boolean = (from, to) match { - case (CharType(l1), CharType(l2)) => l1 == l2 - case (CharType(l1), VarcharType(l2)) => l1 <= l2 - case (VarcharType(l1), VarcharType(l2)) => l1 <= l2 - case _ => Cast.canUpCast(from, to) - } - - if (!canAlterColumnType(field.dataType, update.newDataType)) { - alter.failAnalysis( - s"Cannot update ${table.name} field $fieldName: " + - s"${field.dataType.simpleString} cannot be cast to " + - s"${update.newDataType.simpleString}") - } - 
case update: UpdateColumnNullability => - val field = findField("update", update.fieldNames) - val fieldName = update.fieldNames.quoted - if (!update.nullable && field.nullable) { - alter.failAnalysis( - s"Cannot change nullable column to non-nullable: $fieldName") - } - case updatePos: UpdateColumnPosition => - findField("update", updatePos.fieldNames) - val parent = findParentStruct("update", updatePos.fieldNames()) - val parentName = updatePos.fieldNames().init - positionArgumentExists( - updatePos.position(), - parent, - colsToAdd.getOrElse(parentName, Nil)) - case update: UpdateColumnComment => - findField("update", update.fieldNames) case delete: DeleteColumn => findField("delete", delete.fieldNames) // REPLACE COLUMNS has deletes followed by adds. Remember the deleted columns @@ -1088,8 +1058,51 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog { } alter match { - case AlterTableRenameColumn(table: ResolvedTable, ResolvedFieldName(name), newName) => - checkColumnNotExists(name.init :+ newName, table.schema) + case AlterTableRenameColumn(table: ResolvedTable, col: ResolvedFieldName, newName) => + checkColumnNotExists(col.path :+ newName, table.schema) + case a @ AlterTableAlterColumn(table: ResolvedTable, col: ResolvedFieldName, _, _, _, _) => + val fieldName = col.name.quoted + if (a.dataType.isDefined) { + val field = CharVarcharUtils.getRawType(col.field.metadata) + .map(dt => col.field.copy(dataType = dt)) + .getOrElse(col.field) + val newDataType = a.dataType.get + newDataType match { + case _: StructType => + alter.failAnalysis(s"Cannot update ${table.name} field $fieldName type: " + + "update a struct by updating its fields") + case _: MapType => + alter.failAnalysis(s"Cannot update ${table.name} field $fieldName type: " + + s"update a map by updating $fieldName.key or $fieldName.value") + case _: ArrayType => + alter.failAnalysis(s"Cannot update ${table.name} field $fieldName type: " + + s"update the element by updating $fieldName.element") + case u: UserDefinedType[_] => + alter.failAnalysis(s"Cannot update ${table.name} field $fieldName type: " + + s"update a UserDefinedType[${u.sql}] by updating its fields") + case _: CalendarIntervalType | _: YearMonthIntervalType | _: DayTimeIntervalType => + alter.failAnalysis(s"Cannot update ${table.name} field $fieldName to interval type") + case _ => // update is okay + } + + // We don't need to handle nested types here which shall fail before. 
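To make the type-change rule concrete, here is a small runnable sketch that mirrors the `canAlterColumnType` helper added just below; the wrapper object and the assertions are illustrative, while `Cast.canUpCast` is the existing catalyst helper that permits only loss-free casts:

```scala
import org.apache.spark.sql.catalyst.expressions.Cast
import org.apache.spark.sql.types._

object AlterColumnTypeRules {
  // Mirrors the patch's helper: char/varchar lengths may only stay equal or
  // widen; every other change must be a loss-free up-cast.
  def canAlterColumnType(from: DataType, to: DataType): Boolean = (from, to) match {
    case (CharType(l1), CharType(l2)) => l1 == l2
    case (CharType(l1), VarcharType(l2)) => l1 <= l2
    case (VarcharType(l1), VarcharType(l2)) => l1 <= l2
    case _ => Cast.canUpCast(from, to)
  }

  def main(args: Array[String]): Unit = {
    assert(canAlterColumnType(IntegerType, LongType))           // safe widening
    assert(canAlterColumnType(VarcharType(5), VarcharType(10))) // varchar may grow
    assert(!canAlterColumnType(LongType, IntegerType))          // lossy: rejected
    assert(!canAlterColumnType(CharType(5), CharType(10)))      // char length is fixed
  }
}
```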
+ def canAlterColumnType(from: DataType, to: DataType): Boolean = (from, to) match { + case (CharType(l1), CharType(l2)) => l1 == l2 + case (CharType(l1), VarcharType(l2)) => l1 <= l2 + case (VarcharType(l1), VarcharType(l2)) => l1 <= l2 + case _ => Cast.canUpCast(from, to) + } + + if (!canAlterColumnType(field.dataType, newDataType)) { + alter.failAnalysis(s"Cannot update ${table.name} field $fieldName: " + + s"${field.dataType.simpleString} cannot be cast to ${newDataType.simpleString}") + } + } + if (a.nullable.isDefined) { + if (!a.nullable.get && col.field.nullable) { + alter.failAnalysis(s"Cannot change nullable column to non-nullable: $fieldName") + } + } case _ => } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala index 66a2b96ce7..d7603fbcff 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala @@ -66,28 +66,6 @@ class ResolveCatalogs(val catalogManager: CatalogManager) } createAlterTable(nameParts, catalog, tbl, changes) - case a @ AlterTableAlterColumnStatement( - nameParts @ NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _) => - a.dataType.foreach(failNullType) - val colName = a.column.toArray - val typeChange = a.dataType.map { newDataType => - TableChange.updateColumnType(colName, newDataType) - } - val nullabilityChange = a.nullable.map { nullable => - TableChange.updateColumnNullability(colName, nullable) - } - val commentChange = a.comment.map { newComment => - TableChange.updateColumnComment(colName, newComment) - } - val positionChange = a.position.map { newPosition => - TableChange.updateColumnPosition(colName, newPosition) - } - createAlterTable( - nameParts, - catalog, - tbl, - typeChange.toSeq ++ nullabilityChange ++ commentChange ++ positionChange) - case c @ CreateTableStatement( NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _) => assertNoNullTypeInSchema(c.tableSchema) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala index 7b85563b8a..f79dd6a2fe 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala @@ -25,7 +25,8 @@ import org.apache.spark.sql.catalyst.trees.TreePattern.{TreePattern, UNRESOLVED_ import org.apache.spark.sql.catalyst.util.CharVarcharUtils import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier, Table, TableCatalog} import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ -import org.apache.spark.sql.types.DataType +import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition +import org.apache.spark.sql.types.{DataType, StructField} /** * Holds the name of a namespace that has yet to be looked up in a catalog. 
It will be resolved to @@ -92,15 +93,29 @@ case class UnresolvedPartitionSpec( sealed trait FieldName extends LeafExpression with Unevaluable { def name: Seq[String] override def dataType: DataType = throw new IllegalStateException( - "UnresolvedFieldName.dataType should not be called.") + "FieldName.dataType should not be called.") override def nullable: Boolean = throw new IllegalStateException( - "UnresolvedFieldName.nullable should not be called.") + "FieldName.nullable should not be called.") } case class UnresolvedFieldName(name: Seq[String]) extends FieldName { override lazy val resolved = false } +sealed trait FieldPosition extends LeafExpression with Unevaluable { + def position: ColumnPosition + override def dataType: DataType = throw new IllegalStateException( + "FieldPosition.dataType should not be called.") + override def nullable: Boolean = throw new IllegalStateException( + "FieldPosition.nullable should not be called.") +} + +case class UnresolvedFieldPosition( + fieldName: Seq[String], + position: ColumnPosition) extends FieldPosition { + override lazy val resolved = false +} + /** * Holds the name of a function that has yet to be looked up in a catalog. It will be resolved to * [[ResolvedFunc]] during analysis. @@ -150,7 +165,12 @@ case class ResolvedPartitionSpec( ident: InternalRow, location: Option[String] = None) extends PartitionSpec -case class ResolvedFieldName(name: Seq[String]) extends FieldName +case class ResolvedFieldName(path: Seq[String], field: StructField) extends FieldName { + def name: Seq[String] = path :+ field.name +} + +case class ResolvedFieldPosition(position: ColumnPosition) extends FieldPosition + /** * A plan containing resolved (temp) views. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index dd48381ab2..224c2d02ed 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -3558,7 +3558,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg } /** - * Parse a [[AlterTableAlterColumnStatement]] command to alter a column's property. + * Parse a [[AlterTableAlterColumn]] command to alter a column's property. 
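A short illustration of the placeholder-to-resolved lifecycle of the classes added above (assumes the classes from this patch are on the classpath; `ColumnPosition.first()`/`after()` are the existing connector factories, and `FieldPositionLifecycle` is an illustrative name):

```scala
import org.apache.spark.sql.catalyst.analysis.{ResolvedFieldPosition, UnresolvedFieldPosition}
import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition

object FieldPositionLifecycle {
  // What the parser emits for: ALTER TABLE t CHANGE COLUMN a.b.c AFTER other_col
  // The full column path is carried along so ResolveAlterTableCommands can
  // normalize "other_col" against the resolved table's schema.
  val unresolved =
    UnresolvedFieldPosition(Seq("a", "b", "c"), ColumnPosition.after("other_col"))

  // FIRST references no other column, so it resolves without a schema lookup.
  val resolvedFirst = ResolvedFieldPosition(ColumnPosition.first())
}
```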
* * For example: * {{{ @@ -3573,13 +3573,13 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg override def visitAlterTableAlterColumn( ctx: AlterTableAlterColumnContext): LogicalPlan = withOrigin(ctx) { val action = ctx.alterColumnAction + val verb = if (ctx.CHANGE != null) "CHANGE" else "ALTER" if (action == null) { - val verb = if (ctx.CHANGE != null) "CHANGE" else "ALTER" operationNotAllowed( s"ALTER TABLE table $verb COLUMN requires a TYPE, a SET/DROP, a COMMENT, or a FIRST/AFTER", ctx) } - + val columnNameParts = typedVisit[Seq[String]](ctx.column) val dataType = if (action.dataType != null) { Some(typedVisit[DataType](action.dataType)) } else { @@ -3599,16 +3599,16 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg None } val position = if (action.colPosition != null) { - Some(typedVisit[ColumnPosition](action.colPosition)) + Some(UnresolvedFieldPosition(columnNameParts, typedVisit[ColumnPosition](action.colPosition))) } else { None } assert(Seq(dataType, nullable, comment, position).count(_.nonEmpty) == 1) - AlterTableAlterColumnStatement( - visitMultipartIdentifier(ctx.table), - typedVisit[Seq[String]](ctx.column), + AlterTableAlterColumn( + createUnresolvedTable(ctx.table, s"ALTER TABLE ... $verb COLUMN"), + UnresolvedFieldName(columnNameParts), dataType = dataType, nullable = nullable, comment = comment, @@ -3616,7 +3616,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg } /** - * Parse a [[AlterTableAlterColumnStatement]] command. This is Hive SQL syntax. + * Parse a [[AlterTableAlterColumn]] command. This is Hive SQL syntax. * * For example: * {{{ @@ -3640,13 +3640,14 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg Some("please run ALTER COLUMN ... SET/DROP NOT NULL instead")) } - AlterTableAlterColumnStatement( - typedVisit[Seq[String]](ctx.table), - columnNameParts, + AlterTableAlterColumn( + createUnresolvedTable(ctx.table, s"ALTER TABLE ... CHANGE COLUMN"), + UnresolvedFieldName(columnNameParts), dataType = Option(ctx.colType().dataType()).map(typedVisit[DataType]), nullable = None, comment = Option(ctx.colType().commentSpec()).map(visitCommentSpec), - position = Option(ctx.colPosition).map(typedVisit[ColumnPosition])) + position = Option(ctx.colPosition).map( + pos => UnresolvedFieldPosition(columnNameParts, typedVisit[ColumnPosition](pos)))) } override def visitHiveReplaceColumns( diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala index c14138e04f..8777b3caf8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala @@ -246,17 +246,6 @@ case class AlterTableReplaceColumnsStatement( tableName: Seq[String], columnsToAdd: Seq[QualifiedColType]) extends LeafParsedStatement -/** - * ALTER TABLE ... CHANGE COLUMN command, as parsed from SQL. - */ -case class AlterTableAlterColumnStatement( - tableName: Seq[String], - column: Seq[String], - dataType: Option[DataType], - nullable: Option[Boolean], - comment: Option[String], - position: Option[ColumnPosition]) extends LeafParsedStatement - /** * An INSERT INTO statement, as parsed from SQL. 
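For reference, a hedged sketch of the `TableChange` objects that a fully resolved `AlterTableAlterColumn` (added below in v2Commands.scala) lowers to. All of the factory methods here are existing public connector APIs; the wrapper object and the example column names are illustrative:

```scala
import org.apache.spark.sql.connector.catalog.TableChange
import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition
import org.apache.spark.sql.types.LongType

object LoweredChanges {
  // ALTER TABLE t CHANGE COLUMN a.b.c c BIGINT COMMENT 'an index' AFTER other_col
  // lowers to three independent TableChange objects once the field is resolved:
  val changes: Seq[TableChange] = Seq(
    TableChange.updateColumnType(Array("a", "b", "c"), LongType),
    TableChange.updateColumnComment(Array("a", "b", "c"), "an index"),
    TableChange.updateColumnPosition(Array("a", "b", "c"),
      ColumnPosition.after("other_col")))
}
```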
* diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index 2384d28d86..3d88d6232c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.catalyst.plans.logical -import org.apache.spark.sql.catalyst.analysis.{FieldName, NamedRelation, PartitionSpec, UnresolvedException} +import org.apache.spark.sql.catalyst.analysis.{FieldName, FieldPosition, NamedRelation, PartitionSpec, UnresolvedException} import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, Expression, Unevaluable} import org.apache.spark.sql.catalyst.plans.DescribeCommandSchema @@ -1142,3 +1142,42 @@ case class AlterTableRenameColumn( override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = copy(table = newChild) } + +/** + * The logical plan of the ALTER TABLE ... ALTER COLUMN command. + */ +case class AlterTableAlterColumn( + table: LogicalPlan, + column: FieldName, + dataType: Option[DataType], + nullable: Option[Boolean], + comment: Option[String], + position: Option[FieldPosition]) extends AlterTableCommand { + import org.apache.spark.sql.connector.catalog.CatalogV2Util._ + dataType.foreach(failNullType) + + override def operation: String = "update" + + override def changes: Seq[TableChange] = { + require(column.resolved, "FieldName should be resolved before it's converted to TableChange.") + val colName = column.name.toArray + val typeChange = dataType.map { newDataType => + TableChange.updateColumnType(colName, newDataType) + } + val nullabilityChange = nullable.map { nullable => + TableChange.updateColumnNullability(colName, nullable) + } + val commentChange = comment.map { newComment => + TableChange.updateColumnComment(colName, newComment) + } + val positionChange = position.map { newPosition => + require(newPosition.resolved, + "FieldPosition should be resolved before it's converted to TableChange.") + TableChange.updateColumnPosition(colName, newPosition.position) + } + typeChange.toSeq ++ nullabilityChange ++ commentChange ++ positionChange + } + + override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = + copy(table = newChild) +} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala index 5518409f61..571338aa11 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.parser import java.util.Locale import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, GlobalTempView, LocalTempView, PersistedView, UnresolvedAttribute, UnresolvedFieldName, UnresolvedFunc, UnresolvedInlineTable, UnresolvedNamespace, UnresolvedRelation, UnresolvedStar, UnresolvedTable, UnresolvedTableOrView, UnresolvedView} +import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, GlobalTempView, LocalTempView, PersistedView, UnresolvedAttribute, UnresolvedFieldName, UnresolvedFieldPosition, 
UnresolvedFunc, UnresolvedInlineTable, UnresolvedNamespace, UnresolvedRelation, UnresolvedStar, UnresolvedTable, UnresolvedTableOrView, UnresolvedView} import org.apache.spark.sql.catalyst.catalog.{ArchiveResource, BucketSpec, FileResource, FunctionResource, JarResource} import org.apache.spark.sql.catalyst.expressions.{EqualTo, Hex, Literal} import org.apache.spark.sql.catalyst.plans.logical._ @@ -901,9 +901,9 @@ class DDLParserSuite extends AnalysisTest { test("alter table: update column type using ALTER") { comparePlans( parsePlan("ALTER TABLE table_name ALTER COLUMN a.b.c TYPE bigint"), - AlterTableAlterColumnStatement( - Seq("table_name"), - Seq("a", "b", "c"), + AlterTableAlterColumn( + UnresolvedTable(Seq("table_name"), "ALTER TABLE ... ALTER COLUMN", None), + UnresolvedFieldName(Seq("a", "b", "c")), Some(LongType), None, None, @@ -920,9 +920,9 @@ class DDLParserSuite extends AnalysisTest { test("alter table: update column type") { comparePlans( parsePlan("ALTER TABLE table_name CHANGE COLUMN a.b.c TYPE bigint"), - AlterTableAlterColumnStatement( - Seq("table_name"), - Seq("a", "b", "c"), + AlterTableAlterColumn( + UnresolvedTable(Seq("table_name"), "ALTER TABLE ... CHANGE COLUMN", None), + UnresolvedFieldName(Seq("a", "b", "c")), Some(LongType), None, None, @@ -932,9 +932,9 @@ class DDLParserSuite extends AnalysisTest { test("alter table: update column comment") { comparePlans( parsePlan("ALTER TABLE table_name CHANGE COLUMN a.b.c COMMENT 'new comment'"), - AlterTableAlterColumnStatement( - Seq("table_name"), - Seq("a", "b", "c"), + AlterTableAlterColumn( + UnresolvedTable(Seq("table_name"), "ALTER TABLE ... CHANGE COLUMN", None), + UnresolvedFieldName(Seq("a", "b", "c")), None, None, Some("new comment"), @@ -944,13 +944,13 @@ class DDLParserSuite extends AnalysisTest { test("alter table: update column position") { comparePlans( parsePlan("ALTER TABLE table_name CHANGE COLUMN a.b.c FIRST"), - AlterTableAlterColumnStatement( - Seq("table_name"), - Seq("a", "b", "c"), + AlterTableAlterColumn( + UnresolvedTable(Seq("table_name"), "ALTER TABLE ... CHANGE COLUMN", None), + UnresolvedFieldName(Seq("a", "b", "c")), None, None, None, - Some(first()))) + Some(UnresolvedFieldPosition(Seq("a", "b", "c"), first())))) } test("alter table: multiple property changes are not allowed") { @@ -970,9 +970,9 @@ class DDLParserSuite extends AnalysisTest { test("alter table: SET/DROP NOT NULL") { comparePlans( parsePlan("ALTER TABLE table_name ALTER COLUMN a.b.c SET NOT NULL"), - AlterTableAlterColumnStatement( - Seq("table_name"), - Seq("a", "b", "c"), + AlterTableAlterColumn( + UnresolvedTable(Seq("table_name"), "ALTER TABLE ... ALTER COLUMN", None), + UnresolvedFieldName(Seq("a", "b", "c")), None, Some(false), None, @@ -980,9 +980,9 @@ class DDLParserSuite extends AnalysisTest { comparePlans( parsePlan("ALTER TABLE table_name ALTER COLUMN a.b.c DROP NOT NULL"), - AlterTableAlterColumnStatement( - Seq("table_name"), - Seq("a", "b", "c"), + AlterTableAlterColumn( + UnresolvedTable(Seq("table_name"), "ALTER TABLE ... ALTER COLUMN", None), + UnresolvedFieldName(Seq("a", "b", "c")), None, Some(true), None, @@ -1017,9 +1017,9 @@ class DDLParserSuite extends AnalysisTest { comparePlans( parsePlan(sql1), - AlterTableAlterColumnStatement( - Seq("table_name"), - Seq("a", "b", "c"), + AlterTableAlterColumn( + UnresolvedTable(Seq("table_name"), "ALTER TABLE ... 
CHANGE COLUMN", None), + UnresolvedFieldName(Seq("a", "b", "c")), Some(IntegerType), None, None, @@ -1027,9 +1027,9 @@ class DDLParserSuite extends AnalysisTest { comparePlans( parsePlan(sql2), - AlterTableAlterColumnStatement( - Seq("table_name"), - Seq("a", "b", "c"), + AlterTableAlterColumn( + UnresolvedTable(Seq("table_name"), "ALTER TABLE ... CHANGE COLUMN", None), + UnresolvedFieldName(Seq("a", "b", "c")), Some(IntegerType), None, Some("new_comment"), @@ -1037,13 +1037,13 @@ class DDLParserSuite extends AnalysisTest { comparePlans( parsePlan(sql3), - AlterTableAlterColumnStatement( - Seq("table_name"), - Seq("a", "b", "c"), + AlterTableAlterColumn( + UnresolvedTable(Seq("table_name"), "ALTER TABLE ... CHANGE COLUMN", None), + UnresolvedFieldName(Seq("a", "b", "c")), Some(IntegerType), None, None, - Some(after("other_col")))) + Some(UnresolvedFieldPosition(Seq("a", "b", "c"), after("other_col"))))) // renaming column not supported in hive style ALTER COLUMN. intercept("ALTER TABLE table_name CHANGE COLUMN a.b.c new_name INT", diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index 1775d5837b..5b3e269fc7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -94,58 +94,34 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) } createAlterTable(nameParts, catalog, tbl, changes) - case a @ AlterTableAlterColumnStatement( - nameParts @ SessionCatalogAndTable(catalog, tbl), _, _, _, _, _) => - a.dataType.foreach(failNullType) - loadTable(catalog, tbl.asIdentifier).collect { - case v1Table: V1Table => - if (a.column.length > 1) { - throw QueryCompilationErrors.alterQualifiedColumnOnlySupportedWithV2TableError - } - if (a.nullable.isDefined) { - throw QueryCompilationErrors.alterColumnWithV1TableCannotSpecifyNotNullError - } - if (a.position.isDefined) { - throw QueryCompilationErrors.alterOnlySupportedWithV2TableError - } - val builder = new MetadataBuilder - // Add comment to metadata - a.comment.map(c => builder.putString("comment", c)) - val colName = a.column(0) - val dataType = a.dataType.getOrElse { - v1Table.schema.findNestedField(Seq(colName), resolver = conf.resolver) - .map(_._2.dataType) - .getOrElse { - throw QueryCompilationErrors.alterColumnCannotFindColumnInV1TableError( - quoteIfNeeded(colName), v1Table) - } - } - val newColumn = StructField( - colName, - dataType, - nullable = true, - builder.build()) - AlterTableChangeColumnCommand(tbl.asTableIdentifier, colName, newColumn) - }.getOrElse { - val colName = a.column.toArray - val typeChange = a.dataType.map { newDataType => - TableChange.updateColumnType(colName, newDataType) - } - val nullabilityChange = a.nullable.map { nullable => - TableChange.updateColumnNullability(colName, nullable) - } - val commentChange = a.comment.map { newComment => - TableChange.updateColumnComment(colName, newComment) - } - val positionChange = a.position.map { newPosition => - TableChange.updateColumnPosition(colName, newPosition) - } - createAlterTable( - nameParts, - catalog, - tbl, - typeChange.toSeq ++ nullabilityChange ++ commentChange ++ positionChange) + case a @ AlterTableAlterColumn(ResolvedV1TableAndIdentifier(table, ident), _, _, _, _, _) => + if (a.column.name.length > 1) { + throw 
QueryCompilationErrors.alterQualifiedColumnOnlySupportedWithV2TableError } + if (a.nullable.isDefined) { + throw QueryCompilationErrors.alterColumnWithV1TableCannotSpecifyNotNullError + } + if (a.position.isDefined) { + throw QueryCompilationErrors.alterOnlySupportedWithV2TableError + } + val builder = new MetadataBuilder + // Add comment to metadata + a.comment.map(c => builder.putString("comment", c)) + val colName = a.column.name(0) + val dataType = a.dataType.getOrElse { + table.schema.findNestedField(Seq(colName), resolver = conf.resolver) + .map(_._2.dataType) + .getOrElse { + throw QueryCompilationErrors.alterColumnCannotFindColumnInV1TableError( + quoteIfNeeded(colName), table) + } + } + val newColumn = StructField( + colName, + dataType, + nullable = true, + builder.build()) + AlterTableChangeColumnCommand(ident.asTableIdentifier, colName, newColumn) case AlterTableRenameColumn(ResolvedV1TableIdentifier(_), _, _) => throw QueryCompilationErrors.renameColumnOnlySupportedWithV2TableError @@ -662,6 +638,14 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) } } + object ResolvedV1TableAndIdentifier { + def unapply(resolved: LogicalPlan): Option[(V1Table, Identifier)] = resolved match { + case ResolvedTable(catalog, ident, table: V1Table, _) if isSessionCatalog(catalog) => + Some(table -> ident) + case _ => None + } + } + object ResolvedV1TableIdentifier { def unapply(resolved: LogicalPlan): Option[Identifier] = resolved match { case ResolvedTable(catalog, ident, _: V1Table, _) if isSessionCatalog(catalog) => Some(ident) diff --git a/sql/core/src/test/resources/sql-tests/results/change-column.sql.out b/sql/core/src/test/resources/sql-tests/results/change-column.sql.out index 96b28d734f..6a16f8901b 100644 --- a/sql/core/src/test/resources/sql-tests/results/change-column.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/change-column.sql.out @@ -221,7 +221,7 @@ ALTER TABLE temp_view CHANGE a TYPE INT struct<> -- !query output org.apache.spark.sql.AnalysisException -Invalid command: 'temp_view' is a view not a table.; line 1 pos 0 +temp_view is a temp view. 'ALTER TABLE ... CHANGE COLUMN' expects a table.; line 1 pos 12 -- !query @@ -238,7 +238,7 @@ ALTER TABLE global_temp.global_temp_view CHANGE a TYPE INT struct<> -- !query output org.apache.spark.sql.AnalysisException -Invalid command: 'global_temp.global_temp_view' is a view not a table.; line 1 pos 0 +global_temp.global_temp_view is a temp view. 'ALTER TABLE ... 
CHANGE COLUMN' expects a table.; line 1 pos 12 -- !query diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala index 24d33e2045..7984aff99b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.connector import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, CreateTablePartitioningValidationSuite, ResolvedTable, TestRelation2, UnresolvedFieldName} -import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AlterTableCommand, AlterTableDropColumns, AlterTableRenameColumn, CreateTableAsSelect, LogicalPlan, ReplaceTableAsSelect} +import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AlterTableAlterColumn, AlterTableCommand, AlterTableDropColumns, AlterTableRenameColumn, CreateTableAsSelect, LogicalPlan, ReplaceTableAsSelect} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.connector.catalog.{Identifier, TableChange} import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition @@ -209,7 +209,7 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes test("AlterTable: drop column nullability resolution") { Seq(Array("ID"), Array("point", "X"), Array("POINT", "X"), Array("POINT", "x")).foreach { ref => alterTableTest( - TableChange.updateColumnNullability(ref, true), + AlterTableAlterColumn(table, UnresolvedFieldName(ref), None, Some(true), None, None), Seq("Cannot update missing field", ref.quoted) ) } @@ -218,7 +218,7 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes test("AlterTable: change column type resolution") { Seq(Array("ID"), Array("point", "X"), Array("POINT", "X"), Array("POINT", "x")).foreach { ref => alterTableTest( - TableChange.updateColumnType(ref, StringType), + AlterTableAlterColumn(table, UnresolvedFieldName(ref), Some(StringType), None, None, None), Seq("Cannot update missing field", ref.quoted) ) } @@ -227,7 +227,7 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes test("AlterTable: change column comment resolution") { Seq(Array("ID"), Array("point", "X"), Array("POINT", "X"), Array("POINT", "x")).foreach { ref => alterTableTest( - TableChange.updateColumnComment(ref, "Here's a comment for ya"), + AlterTableAlterColumn(table, UnresolvedFieldName(ref), None, None, Some("comment"), None), Seq("Cannot update missing field", ref.quoted) ) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala index 7b31fc1322..fbb72bcc88 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala @@ -26,15 +26,14 @@ import org.mockito.invocation.InvocationOnMock import org.apache.spark.sql.{AnalysisException, SaveMode} import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier} -import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, Analyzer, EmptyFunctionRegistry, NoSuchTableException, ResolvedTable, ResolveSessionCatalog, UnresolvedAttribute, UnresolvedRelation, UnresolvedSubqueryColumnAliases, UnresolvedTable} 
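The golden-file change above boils down to the following user-visible behavior; a hypothetical spark-shell session (an active `SparkSession` bound to `spark` is assumed) illustrating the new lookup order:

```scala
spark.sql("CREATE TABLE t (a INT) USING parquet")
spark.sql("CREATE TEMPORARY VIEW temp_view AS SELECT 1 AS a")

// Before this patch: "Invalid command: 'temp_view' is a view not a table."
// After this patch the temp view is looked up first, as for other commands,
// and rejected with the uniform message:
spark.sql("ALTER TABLE temp_view CHANGE a TYPE INT")
// org.apache.spark.sql.AnalysisException:
//   temp_view is a temp view. 'ALTER TABLE ... CHANGE COLUMN' expects a table.
```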
+import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, Analyzer, EmptyFunctionRegistry, NoSuchTableException, ResolvedFieldName, ResolvedTable, ResolveSessionCatalog, UnresolvedAttribute, UnresolvedRelation, UnresolvedSubqueryColumnAliases, UnresolvedTable} import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType, InMemoryCatalog, SessionCatalog} import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Expression, InSubquery, IntegerLiteral, ListQuery, Literal, StringLiteral} import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException} -import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AnalysisOnlyCommand, AppendData, Assignment, CreateTableAsSelect, CreateTableStatement, CreateV2Table, DeleteAction, DeleteFromTable, DescribeRelation, DropTable, InsertAction, LocalRelation, LogicalPlan, MergeIntoTable, OneRowRelation, Project, SetTableLocation, SetTableProperties, ShowTableProperties, SubqueryAlias, UnsetTableProperties, UpdateAction, UpdateTable} +import org.apache.spark.sql.catalyst.plans.logical.{AlterTableAlterColumn, AnalysisOnlyCommand, AppendData, Assignment, CreateTableAsSelect, CreateTableStatement, CreateV2Table, DeleteAction, DeleteFromTable, DescribeRelation, DropTable, InsertAction, LocalRelation, LogicalPlan, MergeIntoTable, OneRowRelation, Project, SetTableLocation, SetTableProperties, ShowTableProperties, SubqueryAlias, UnsetTableProperties, UpdateAction, UpdateTable} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.connector.FakeV2Provider -import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogNotFoundException, Identifier, Table, TableCapability, TableCatalog, TableChange, V1Table} -import org.apache.spark.sql.connector.catalog.TableChange.{UpdateColumnComment, UpdateColumnType} +import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogNotFoundException, Identifier, Table, TableCapability, TableCatalog, V1Table} import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.execution.datasources.CreateTable import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation @@ -1101,17 +1100,27 @@ class PlanResolutionSuite extends AnalysisTest { "ALTER COLUMN with qualified column is only supported with v2 tables")) } else { parsed1 match { - case AlterTable(_, _, _: DataSourceV2Relation, changes) => - assert(changes == Seq( - TableChange.updateColumnType(Array("i"), LongType))) - case _ => fail("expect AlterTable") + case AlterTableAlterColumn( + _: ResolvedTable, + column: ResolvedFieldName, + Some(LongType), + None, + None, + None) => + assert(column.name == Seq("i")) + case _ => fail("expect AlterTableAlterColumn") } parsed2 match { - case AlterTable(_, _, _: DataSourceV2Relation, changes) => - assert(changes == Seq( - TableChange.updateColumnComment(Array("i"), "new comment"))) - case _ => fail("expect AlterTable") + case AlterTableAlterColumn( + _: ResolvedTable, + column: ResolvedFieldName, + None, + None, + Some("new comment"), + None) => + assert(column.name == Seq("i")) + case _ => fail("expect AlterTableAlterColumn") } } } @@ -1158,21 +1167,18 @@ class PlanResolutionSuite extends AnalysisTest { test("alter table: hive style change column") { Seq("v2Table", "testcat.tab").foreach { tblName => parseAndResolve(s"ALTER TABLE $tblName CHANGE COLUMN i i int COMMENT 'an index'") match { - case AlterTable(_, _, _: DataSourceV2Relation, changes) => - 
assert(changes.length == 1, "Should only have a comment change") - assert(changes.head.isInstanceOf[UpdateColumnComment], - s"Expected only a UpdateColumnComment change but got: ${changes.head}") - case _ => fail("expect AlterTable") + case AlterTableAlterColumn( + _: ResolvedTable, _: ResolvedFieldName, None, None, Some(comment), None) => + assert(comment == "an index") + case _ => fail("expect AlterTableAlterColumn with comment change only") } parseAndResolve(s"ALTER TABLE $tblName CHANGE COLUMN i i long COMMENT 'an index'") match { - case AlterTable(_, _, _: DataSourceV2Relation, changes) => - assert(changes.length == 2, "Should have a comment change and type change") - assert(changes.exists(_.isInstanceOf[UpdateColumnComment]), - s"Expected UpdateColumnComment change but got: ${changes}") - assert(changes.exists(_.isInstanceOf[UpdateColumnType]), - s"Expected UpdateColumnType change but got: ${changes}") - case _ => fail("expect AlterTable") + case AlterTableAlterColumn( + _: ResolvedTable, _: ResolvedFieldName, Some(dataType), None, Some(comment), None) => + assert(comment == "an index") + assert(dataType == LongType) + case _ => fail("expect AlterTableAlterColumn with type and comment changes") } } } @@ -1201,23 +1207,23 @@ class PlanResolutionSuite extends AnalysisTest { DSV2ResolutionTests.foreach { case (sql, isSessionCatalog) => test(s"Data source V2 relation resolution '$sql'") { val parsed = parseAndResolve(sql, withDefault = true) - val catalogIdent = if (isSessionCatalog) v2SessionCatalog else testCat + val catalog = if (isSessionCatalog) v2SessionCatalog else testCat val tableIdent = if (isSessionCatalog) "v2Table" else "tab" parsed match { - case AlterTable(_, _, r: DataSourceV2Relation, _) => - assert(r.catalog.exists(_ == catalogIdent)) - assert(r.identifier.exists(_.name() == tableIdent)) + case AlterTableAlterColumn(r: ResolvedTable, _, _, _, _, _) => + assert(r.catalog == catalog) + assert(r.identifier.name() == tableIdent) case Project(_, AsDataSourceV2Relation(r)) => - assert(r.catalog.exists(_ == catalogIdent)) + assert(r.catalog.exists(_ == catalog)) assert(r.identifier.exists(_.name() == tableIdent)) case AppendData(r: DataSourceV2Relation, _, _, _, _) => - assert(r.catalog.exists(_ == catalogIdent)) + assert(r.catalog.exists(_ == catalog)) assert(r.identifier.exists(_.name() == tableIdent)) case DescribeRelation(r: ResolvedTable, _, _, _) => - assert(r.catalog == catalogIdent) + assert(r.catalog == catalog) assert(r.identifier.name() == tableIdent) case ShowTableProperties(r: ResolvedTable, _, _) => - assert(r.catalog == catalogIdent) + assert(r.catalog == catalog) assert(r.identifier.name() == tableIdent) case ShowTablePropertiesCommand(t: TableIdentifier, _, _) => assert(t.identifier == tableIdent) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala index 34ff874b1e..8dc8ad621c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala @@ -266,7 +266,7 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { val msg1 = intercept[AnalysisException] { sql(s"ALTER TABLE $tableName ALTER COLUMN bad_column TYPE DOUBLE") }.getMessage - assert(msg1.contains("Cannot update missing field bad_column in test.alt_table 
schema")) + assert(msg1.contains("Cannot update missing field bad_column in h2.test.alt_table schema")) // Update column to wrong type val msg2 = intercept[ParseException] { sql(s"ALTER TABLE $tableName ALTER COLUMN id TYPE bad_type") @@ -297,7 +297,7 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { val msg = intercept[AnalysisException] { sql(s"ALTER TABLE $tableName ALTER COLUMN bad_column DROP NOT NULL") }.getMessage - assert(msg.contains("Cannot update missing field bad_column in test.alt_table")) + assert(msg.contains("Cannot update missing field bad_column in h2.test.alt_table")) } // Update column nullability in not existing table and namespace Seq("h2.test.not_existing_table", "h2.bad_test.not_existing_table").foreach { table => @@ -321,7 +321,7 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { val msg = intercept[AnalysisException] { sql(s"ALTER TABLE $tableName ALTER COLUMN bad_column COMMENT 'test'") }.getMessage - assert(msg.contains("Cannot update missing field bad_column in test.alt_table")) + assert(msg.contains("Cannot update missing field bad_column in h2.test.alt_table")) } // Update column comments in not existing table and namespace Seq("h2.test.not_existing_table", "h2.bad_test.not_existing_table").foreach { table => @@ -376,7 +376,7 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { val msg = intercept[AnalysisException] { sql(s"ALTER TABLE $tableName ALTER COLUMN C1 TYPE DOUBLE") }.getMessage - assert(msg.contains("Cannot update missing field C1 in test.alt_table schema")) + assert(msg.contains("Cannot update missing field C1 in h2.test.alt_table schema")) } withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { @@ -390,7 +390,7 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { val msg = intercept[AnalysisException] { sql(s"ALTER TABLE $tableName ALTER COLUMN C1 DROP NOT NULL") }.getMessage - assert(msg.contains("Cannot update missing field C1 in test.alt_table schema")) + assert(msg.contains("Cannot update missing field C1 in h2.test.alt_table schema")) } withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {