[SPARK-36380][SQL] Simplify the logical plan names for ALTER TABLE ... COLUMN
### What changes were proposed in this pull request? This a followup of the recent work such as https://github.com/apache/spark/pull/33200 For `ALTER TABLE` commands, the logical plans do not have the common `AlterTable` prefix in the name and just use names like `SetTableLocation`. This PR proposes to follow the same naming rule in `ALTER TABE ... COLUMN` commands. This PR also moves these AlterTable commands to a individual file and give them a base trait. ### Why are the changes needed? name simplification ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? existing test Closes #33609 from cloud-fan/dsv2. Authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Max Gekk <max.gekk@gmail.com>
This commit is contained in:
parent
8ca11fe39f
commit
7cb9c1c241
|
@ -269,7 +269,7 @@ class Analyzer(override val catalogManager: CatalogManager)
|
|||
ResolveRelations ::
|
||||
ResolveTables ::
|
||||
ResolvePartitionSpec ::
|
||||
ResolveAlterTableColumnCommands ::
|
||||
ResolveAlterTableCommands ::
|
||||
AddMetadataColumns ::
|
||||
DeduplicateRelations ::
|
||||
ResolveReferences ::
|
||||
|
@ -3607,15 +3607,15 @@ class Analyzer(override val catalogManager: CatalogManager)
|
|||
* Rule to mostly resolve, normalize and rewrite column names based on case sensitivity
|
||||
* for alter table column commands.
|
||||
*/
|
||||
object ResolveAlterTableColumnCommands extends Rule[LogicalPlan] {
|
||||
object ResolveAlterTableCommands extends Rule[LogicalPlan] {
|
||||
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
|
||||
case a: AlterTableColumnCommand if a.table.resolved && hasUnresolvedFieldName(a) =>
|
||||
case a: AlterTableCommand if a.table.resolved && hasUnresolvedFieldName(a) =>
|
||||
val table = a.table.asInstanceOf[ResolvedTable]
|
||||
a.transformExpressions {
|
||||
case u: UnresolvedFieldName => resolveFieldNames(table, u.name, u)
|
||||
}
|
||||
|
||||
case a @ AlterTableAddColumns(r: ResolvedTable, cols) if !a.resolved =>
|
||||
case a @ AddColumns(r: ResolvedTable, cols) if !a.resolved =>
|
||||
// 'colsToAdd' keeps track of new columns being added. It stores a mapping from a
|
||||
// normalized parent name of fields to field names that belong to the parent.
|
||||
// For example, if we add columns "a.b.c", "a.b.d", and "a.c", 'colsToAdd' will become
|
||||
|
@ -3668,7 +3668,7 @@ class Analyzer(override val catalogManager: CatalogManager)
|
|||
resolved.copyTagsFrom(a)
|
||||
resolved
|
||||
|
||||
case a @ AlterTableAlterColumn(
|
||||
case a @ AlterColumn(
|
||||
table: ResolvedTable, ResolvedFieldName(path, field), dataType, _, _, position) =>
|
||||
val newDataType = dataType.flatMap { dt =>
|
||||
// Hive style syntax provides the column type, even if it may not have changed.
|
||||
|
@ -3705,7 +3705,7 @@ class Analyzer(override val catalogManager: CatalogManager)
|
|||
}.getOrElse(throw QueryCompilationErrors.missingFieldError(fieldName, table, context.origin))
|
||||
}
|
||||
|
||||
private def hasUnresolvedFieldName(a: AlterTableColumnCommand): Boolean = {
|
||||
private def hasUnresolvedFieldName(a: AlterTableCommand): Boolean = {
|
||||
a.expressions.exists(_.find(_.isInstanceOf[UnresolvedFieldName]).isDefined)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -442,8 +442,8 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
|
|||
case write: V2WriteCommand if write.resolved =>
|
||||
write.query.schema.foreach(f => TypeUtils.failWithIntervalType(f.dataType))
|
||||
|
||||
case alter: AlterTableColumnCommand if alter.table.resolved =>
|
||||
checkAlterTableColumnCommand(alter)
|
||||
case alter: AlterTableCommand =>
|
||||
checkAlterTableCommand(alter)
|
||||
|
||||
case _ => // Falls back to the following checks
|
||||
}
|
||||
|
@ -939,7 +939,7 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
|
|||
/**
|
||||
* Validates the options used for alter table commands after table and columns are resolved.
|
||||
*/
|
||||
private def checkAlterTableColumnCommand(alter: AlterTableColumnCommand): Unit = {
|
||||
private def checkAlterTableCommand(alter: AlterTableCommand): Unit = {
|
||||
def checkColumnNotExists(op: String, fieldNames: Seq[String], struct: StructType): Unit = {
|
||||
if (struct.findNestedField(fieldNames, includeCollections = true).isDefined) {
|
||||
alter.failAnalysis(s"Cannot $op column, because ${fieldNames.quoted} " +
|
||||
|
@ -948,7 +948,7 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
|
|||
}
|
||||
|
||||
alter match {
|
||||
case AlterTableAddColumns(table: ResolvedTable, colsToAdd) =>
|
||||
case AddColumns(table: ResolvedTable, colsToAdd) =>
|
||||
colsToAdd.foreach { colToAdd =>
|
||||
checkColumnNotExists("add", colToAdd.name, table.schema)
|
||||
}
|
||||
|
@ -957,10 +957,10 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
|
|||
"in the user specified columns",
|
||||
alter.conf.resolver)
|
||||
|
||||
case AlterTableRenameColumn(table: ResolvedTable, col: ResolvedFieldName, newName) =>
|
||||
case RenameColumn(table: ResolvedTable, col: ResolvedFieldName, newName) =>
|
||||
checkColumnNotExists("rename", col.path :+ newName, table.schema)
|
||||
|
||||
case a @ AlterTableAlterColumn(table: ResolvedTable, col: ResolvedFieldName, _, _, _, _) =>
|
||||
case a @ AlterColumn(table: ResolvedTable, col: ResolvedFieldName, _, _, _, _) =>
|
||||
val fieldName = col.name.quoted
|
||||
if (a.dataType.isDefined) {
|
||||
val field = CharVarcharUtils.getRawType(col.field.metadata)
|
||||
|
|
|
@ -3611,7 +3611,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
|
|||
*/
|
||||
override def visitAddTableColumns(ctx: AddTableColumnsContext): LogicalPlan = withOrigin(ctx) {
|
||||
val colToken = if (ctx.COLUMN() != null) "COLUMN" else "COLUMNS"
|
||||
AlterTableAddColumns(
|
||||
AddColumns(
|
||||
createUnresolvedTable(ctx.multipartIdentifier, s"ALTER TABLE ... ADD $colToken"),
|
||||
ctx.columns.qualifiedColTypeWithPosition.asScala.map(typedVisit[QualifiedColType]).toSeq
|
||||
)
|
||||
|
@ -3627,7 +3627,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
|
|||
*/
|
||||
override def visitRenameTableColumn(
|
||||
ctx: RenameTableColumnContext): LogicalPlan = withOrigin(ctx) {
|
||||
AlterTableRenameColumn(
|
||||
RenameColumn(
|
||||
createUnresolvedTable(ctx.table, "ALTER TABLE ... RENAME COLUMN"),
|
||||
UnresolvedFieldName(typedVisit[Seq[String]](ctx.from)),
|
||||
ctx.to.getText)
|
||||
|
@ -3681,7 +3681,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
|
|||
|
||||
assert(Seq(dataType, nullable, comment, position).count(_.nonEmpty) == 1)
|
||||
|
||||
AlterTableAlterColumn(
|
||||
AlterColumn(
|
||||
createUnresolvedTable(ctx.table, s"ALTER TABLE ... $verb COLUMN"),
|
||||
UnresolvedFieldName(typedVisit[Seq[String]](ctx.column)),
|
||||
dataType = dataType,
|
||||
|
@ -3715,7 +3715,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
|
|||
Some("please run ALTER COLUMN ... SET/DROP NOT NULL instead"))
|
||||
}
|
||||
|
||||
AlterTableAlterColumn(
|
||||
AlterColumn(
|
||||
createUnresolvedTable(ctx.table, s"ALTER TABLE ... CHANGE COLUMN"),
|
||||
UnresolvedFieldName(columnNameParts),
|
||||
dataType = Option(ctx.colType().dataType()).map(typedVisit[DataType]),
|
||||
|
@ -3730,7 +3730,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
|
|||
if (ctx.partitionSpec != null) {
|
||||
operationNotAllowed("ALTER TABLE table PARTITION partition_spec REPLACE COLUMNS", ctx)
|
||||
}
|
||||
AlterTableReplaceColumns(
|
||||
ReplaceColumns(
|
||||
createUnresolvedTable(ctx.multipartIdentifier, "ALTER TABLE ... REPLACE COLUMNS"),
|
||||
ctx.columns.qualifiedColTypeWithPosition.asScala.map { colType =>
|
||||
if (colType.NULL != null) {
|
||||
|
@ -3763,7 +3763,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
|
|||
override def visitDropTableColumns(
|
||||
ctx: DropTableColumnsContext): LogicalPlan = withOrigin(ctx) {
|
||||
val columnsToDrop = ctx.columns.multipartIdentifier.asScala.map(typedVisit[Seq[String]])
|
||||
AlterTableDropColumns(
|
||||
DropColumns(
|
||||
createUnresolvedTable(
|
||||
ctx.multipartIdentifier,
|
||||
"ALTER TABLE ... DROP COLUMNS"),
|
||||
|
|
|
@ -0,0 +1,230 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.spark.sql.catalyst.plans.logical
|
||||
|
||||
import org.apache.spark.sql.catalyst.analysis.{FieldName, FieldPosition}
|
||||
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
|
||||
import org.apache.spark.sql.catalyst.util.TypeUtils
|
||||
import org.apache.spark.sql.connector.catalog.{TableCatalog, TableChange}
|
||||
import org.apache.spark.sql.errors.QueryCompilationErrors
|
||||
import org.apache.spark.sql.types.DataType
|
||||
|
||||
/**
|
||||
* The base trait for commands that need to alter a v2 table with [[TableChange]]s.
|
||||
*/
|
||||
trait AlterTableCommand extends UnaryCommand {
|
||||
def changes: Seq[TableChange]
|
||||
def table: LogicalPlan
|
||||
final override def child: LogicalPlan = table
|
||||
}
|
||||
|
||||
/**
|
||||
* The logical plan that defines or changes the comment of an TABLE for v2 catalogs.
|
||||
*
|
||||
* {{{
|
||||
* COMMENT ON TABLE tableIdentifier IS ('text' | NULL)
|
||||
* }}}
|
||||
*
|
||||
* where the `text` is the new comment written as a string literal; or `NULL` to drop the comment.
|
||||
*/
|
||||
case class CommentOnTable(table: LogicalPlan, comment: String) extends AlterTableCommand {
|
||||
override def changes: Seq[TableChange] = {
|
||||
Seq(TableChange.setProperty(TableCatalog.PROP_COMMENT, comment))
|
||||
}
|
||||
override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
|
||||
copy(table = newChild)
|
||||
}
|
||||
|
||||
/**
|
||||
* The logical plan of the ALTER TABLE ... SET LOCATION command.
|
||||
*/
|
||||
case class SetTableLocation(
|
||||
table: LogicalPlan,
|
||||
partitionSpec: Option[TablePartitionSpec],
|
||||
location: String) extends AlterTableCommand {
|
||||
override def changes: Seq[TableChange] = {
|
||||
if (partitionSpec.nonEmpty) {
|
||||
throw QueryCompilationErrors.alterV2TableSetLocationWithPartitionNotSupportedError()
|
||||
}
|
||||
Seq(TableChange.setProperty(TableCatalog.PROP_LOCATION, location))
|
||||
}
|
||||
override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
|
||||
copy(table = newChild)
|
||||
}
|
||||
|
||||
/**
|
||||
* The logical plan of the ALTER TABLE ... SET TBLPROPERTIES command.
|
||||
*/
|
||||
case class SetTableProperties(
|
||||
table: LogicalPlan,
|
||||
properties: Map[String, String]) extends AlterTableCommand {
|
||||
override def changes: Seq[TableChange] = {
|
||||
properties.map { case (key, value) =>
|
||||
TableChange.setProperty(key, value)
|
||||
}.toSeq
|
||||
}
|
||||
override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
|
||||
copy(table = newChild)
|
||||
}
|
||||
|
||||
/**
|
||||
* The logical plan of the ALTER TABLE ... UNSET TBLPROPERTIES command.
|
||||
*/
|
||||
case class UnsetTableProperties(
|
||||
table: LogicalPlan,
|
||||
propertyKeys: Seq[String],
|
||||
ifExists: Boolean) extends AlterTableCommand {
|
||||
override def changes: Seq[TableChange] = {
|
||||
propertyKeys.map(key => TableChange.removeProperty(key))
|
||||
}
|
||||
override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
|
||||
copy(table = newChild)
|
||||
}
|
||||
|
||||
/**
|
||||
* The logical plan of the ALTER TABLE ... ADD COLUMNS command.
|
||||
*/
|
||||
case class AddColumns(
|
||||
table: LogicalPlan,
|
||||
columnsToAdd: Seq[QualifiedColType]) extends AlterTableCommand {
|
||||
columnsToAdd.foreach { c =>
|
||||
TypeUtils.failWithIntervalType(c.dataType)
|
||||
}
|
||||
|
||||
override lazy val resolved: Boolean = table.resolved && columnsToAdd.forall(_.resolved)
|
||||
|
||||
override def changes: Seq[TableChange] = {
|
||||
columnsToAdd.map { col =>
|
||||
require(col.path.forall(_.resolved),
|
||||
"FieldName should be resolved before it's converted to TableChange.")
|
||||
require(col.position.forall(_.resolved),
|
||||
"FieldPosition should be resolved before it's converted to TableChange.")
|
||||
TableChange.addColumn(
|
||||
col.name.toArray,
|
||||
col.dataType,
|
||||
col.nullable,
|
||||
col.comment.orNull,
|
||||
col.position.map(_.position).orNull)
|
||||
}
|
||||
}
|
||||
|
||||
override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
|
||||
copy(table = newChild)
|
||||
}
|
||||
|
||||
/**
|
||||
* The logical plan of the ALTER TABLE ... REPLACE COLUMNS command.
|
||||
*/
|
||||
case class ReplaceColumns(
|
||||
table: LogicalPlan,
|
||||
columnsToAdd: Seq[QualifiedColType]) extends AlterTableCommand {
|
||||
columnsToAdd.foreach { c =>
|
||||
TypeUtils.failWithIntervalType(c.dataType)
|
||||
}
|
||||
|
||||
override lazy val resolved: Boolean = table.resolved && columnsToAdd.forall(_.resolved)
|
||||
|
||||
override def changes: Seq[TableChange] = {
|
||||
// REPLACE COLUMNS deletes all the existing columns and adds new columns specified.
|
||||
require(table.resolved)
|
||||
val deleteChanges = table.schema.fieldNames.map { name =>
|
||||
TableChange.deleteColumn(Array(name))
|
||||
}
|
||||
val addChanges = columnsToAdd.map { col =>
|
||||
assert(col.path.isEmpty)
|
||||
assert(col.position.isEmpty)
|
||||
TableChange.addColumn(
|
||||
col.name.toArray,
|
||||
col.dataType,
|
||||
col.nullable,
|
||||
col.comment.orNull,
|
||||
null)
|
||||
}
|
||||
deleteChanges ++ addChanges
|
||||
}
|
||||
|
||||
override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
|
||||
copy(table = newChild)
|
||||
}
|
||||
|
||||
/**
|
||||
* The logical plan of the ALTER TABLE ... DROP COLUMNS command.
|
||||
*/
|
||||
case class DropColumns(
|
||||
table: LogicalPlan,
|
||||
columnsToDrop: Seq[FieldName]) extends AlterTableCommand {
|
||||
override def changes: Seq[TableChange] = {
|
||||
columnsToDrop.map { col =>
|
||||
require(col.resolved, "FieldName should be resolved before it's converted to TableChange.")
|
||||
TableChange.deleteColumn(col.name.toArray)
|
||||
}
|
||||
}
|
||||
|
||||
override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
|
||||
copy(table = newChild)
|
||||
}
|
||||
|
||||
/**
|
||||
* The logical plan of the ALTER TABLE ... RENAME COLUMN command.
|
||||
*/
|
||||
case class RenameColumn(
|
||||
table: LogicalPlan,
|
||||
column: FieldName,
|
||||
newName: String) extends AlterTableCommand {
|
||||
override def changes: Seq[TableChange] = {
|
||||
require(column.resolved, "FieldName should be resolved before it's converted to TableChange.")
|
||||
Seq(TableChange.renameColumn(column.name.toArray, newName))
|
||||
}
|
||||
|
||||
override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
|
||||
copy(table = newChild)
|
||||
}
|
||||
|
||||
/**
|
||||
* The logical plan of the ALTER TABLE ... ALTER COLUMN command.
|
||||
*/
|
||||
case class AlterColumn(
|
||||
table: LogicalPlan,
|
||||
column: FieldName,
|
||||
dataType: Option[DataType],
|
||||
nullable: Option[Boolean],
|
||||
comment: Option[String],
|
||||
position: Option[FieldPosition]) extends AlterTableCommand {
|
||||
override def changes: Seq[TableChange] = {
|
||||
require(column.resolved, "FieldName should be resolved before it's converted to TableChange.")
|
||||
val colName = column.name.toArray
|
||||
val typeChange = dataType.map { newDataType =>
|
||||
TableChange.updateColumnType(colName, newDataType)
|
||||
}
|
||||
val nullabilityChange = nullable.map { nullable =>
|
||||
TableChange.updateColumnNullability(colName, nullable)
|
||||
}
|
||||
val commentChange = comment.map { newComment =>
|
||||
TableChange.updateColumnComment(colName, newComment)
|
||||
}
|
||||
val positionChange = position.map { newPosition =>
|
||||
require(newPosition.resolved,
|
||||
"FieldPosition should be resolved before it's converted to TableChange.")
|
||||
TableChange.updateColumnPosition(colName, newPosition.position)
|
||||
}
|
||||
typeChange.toSeq ++ nullabilityChange ++ commentChange ++ positionChange
|
||||
}
|
||||
|
||||
override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
|
||||
copy(table = newChild)
|
||||
}
|
|
@ -17,12 +17,12 @@
|
|||
|
||||
package org.apache.spark.sql.catalyst.plans.logical
|
||||
|
||||
import org.apache.spark.sql.catalyst.analysis.{FieldName, FieldPosition, NamedRelation, PartitionSpec, UnresolvedException}
|
||||
import org.apache.spark.sql.catalyst.analysis.{NamedRelation, PartitionSpec, UnresolvedException}
|
||||
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
|
||||
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, Expression, Unevaluable}
|
||||
import org.apache.spark.sql.catalyst.plans.DescribeCommandSchema
|
||||
import org.apache.spark.sql.catalyst.trees.BinaryLike
|
||||
import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, TypeUtils}
|
||||
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
|
||||
import org.apache.spark.sql.connector.catalog._
|
||||
import org.apache.spark.sql.connector.expressions.Transform
|
||||
import org.apache.spark.sql.connector.write.Write
|
||||
|
@ -675,21 +675,6 @@ case class CommentOnNamespace(child: LogicalPlan, comment: String) extends Unary
|
|||
copy(child = newChild)
|
||||
}
|
||||
|
||||
/**
|
||||
* The logical plan that defines or changes the comment of an TABLE for v2 catalogs.
|
||||
*
|
||||
* {{{
|
||||
* COMMENT ON TABLE tableIdentifier IS ('text' | NULL)
|
||||
* }}}
|
||||
*
|
||||
* where the `text` is the new comment written as a string literal; or `NULL` to drop the comment.
|
||||
*
|
||||
*/
|
||||
case class CommentOnTable(child: LogicalPlan, comment: String) extends UnaryCommand {
|
||||
override protected def withNewChildInternal(newChild: LogicalPlan): CommentOnTable =
|
||||
copy(child = newChild)
|
||||
}
|
||||
|
||||
/**
|
||||
* The logical plan of the REFRESH FUNCTION command.
|
||||
*/
|
||||
|
@ -1043,177 +1028,3 @@ case class UncacheTable(
|
|||
|
||||
override def markAsAnalyzed(): LogicalPlan = copy(isAnalyzed = true)
|
||||
}
|
||||
|
||||
/**
|
||||
* The logical plan of the ALTER TABLE ... SET LOCATION command.
|
||||
*/
|
||||
case class SetTableLocation(
|
||||
table: LogicalPlan,
|
||||
partitionSpec: Option[TablePartitionSpec],
|
||||
location: String) extends UnaryCommand {
|
||||
override def child: LogicalPlan = table
|
||||
override protected def withNewChildInternal(newChild: LogicalPlan): SetTableLocation =
|
||||
copy(table = newChild)
|
||||
}
|
||||
|
||||
/**
|
||||
* The logical plan of the ALTER TABLE ... SET TBLPROPERTIES command.
|
||||
*/
|
||||
case class SetTableProperties(
|
||||
table: LogicalPlan,
|
||||
properties: Map[String, String]) extends UnaryCommand {
|
||||
override def child: LogicalPlan = table
|
||||
override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
|
||||
copy(table = newChild)
|
||||
}
|
||||
|
||||
/**
|
||||
* The logical plan of the ALTER TABLE ... UNSET TBLPROPERTIES command.
|
||||
*/
|
||||
case class UnsetTableProperties(
|
||||
table: LogicalPlan,
|
||||
propertyKeys: Seq[String],
|
||||
ifExists: Boolean) extends UnaryCommand {
|
||||
override def child: LogicalPlan = table
|
||||
override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
|
||||
copy(table = newChild)
|
||||
}
|
||||
|
||||
trait AlterTableColumnCommand extends UnaryCommand {
|
||||
def table: LogicalPlan
|
||||
def changes: Seq[TableChange]
|
||||
override def child: LogicalPlan = table
|
||||
}
|
||||
|
||||
/**
|
||||
* The logical plan of the ALTER TABLE ... ADD COLUMNS command.
|
||||
*/
|
||||
case class AlterTableAddColumns(
|
||||
table: LogicalPlan,
|
||||
columnsToAdd: Seq[QualifiedColType]) extends AlterTableColumnCommand {
|
||||
columnsToAdd.foreach { c =>
|
||||
TypeUtils.failWithIntervalType(c.dataType)
|
||||
}
|
||||
|
||||
override lazy val resolved: Boolean = table.resolved && columnsToAdd.forall(_.resolved)
|
||||
|
||||
override def changes: Seq[TableChange] = {
|
||||
columnsToAdd.map { col =>
|
||||
require(col.path.forall(_.resolved),
|
||||
"FieldName should be resolved before it's converted to TableChange.")
|
||||
require(col.position.forall(_.resolved),
|
||||
"FieldPosition should be resolved before it's converted to TableChange.")
|
||||
TableChange.addColumn(
|
||||
col.name.toArray,
|
||||
col.dataType,
|
||||
col.nullable,
|
||||
col.comment.orNull,
|
||||
col.position.map(_.position).orNull)
|
||||
}
|
||||
}
|
||||
|
||||
override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
|
||||
copy(table = newChild)
|
||||
}
|
||||
|
||||
/**
|
||||
* The logical plan of the ALTER TABLE ... REPLACE COLUMNS command.
|
||||
*/
|
||||
case class AlterTableReplaceColumns(
|
||||
table: LogicalPlan,
|
||||
columnsToAdd: Seq[QualifiedColType]) extends AlterTableColumnCommand {
|
||||
columnsToAdd.foreach { c =>
|
||||
TypeUtils.failWithIntervalType(c.dataType)
|
||||
}
|
||||
|
||||
override lazy val resolved: Boolean = table.resolved && columnsToAdd.forall(_.resolved)
|
||||
|
||||
override def changes: Seq[TableChange] = {
|
||||
// REPLACE COLUMNS deletes all the existing columns and adds new columns specified.
|
||||
require(table.resolved)
|
||||
val deleteChanges = table.schema.fieldNames.map { name =>
|
||||
TableChange.deleteColumn(Array(name))
|
||||
}
|
||||
val addChanges = columnsToAdd.map { col =>
|
||||
assert(col.path.isEmpty)
|
||||
assert(col.position.isEmpty)
|
||||
TableChange.addColumn(
|
||||
col.name.toArray,
|
||||
col.dataType,
|
||||
col.nullable,
|
||||
col.comment.orNull,
|
||||
null)
|
||||
}
|
||||
deleteChanges ++ addChanges
|
||||
}
|
||||
|
||||
override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
|
||||
copy(table = newChild)
|
||||
}
|
||||
|
||||
/**
|
||||
* The logical plan of the ALTER TABLE ... DROP COLUMNS command.
|
||||
*/
|
||||
case class AlterTableDropColumns(
|
||||
table: LogicalPlan,
|
||||
columnsToDrop: Seq[FieldName]) extends AlterTableColumnCommand {
|
||||
override def changes: Seq[TableChange] = {
|
||||
columnsToDrop.map { col =>
|
||||
require(col.resolved, "FieldName should be resolved before it's converted to TableChange.")
|
||||
TableChange.deleteColumn(col.name.toArray)
|
||||
}
|
||||
}
|
||||
|
||||
override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
|
||||
copy(table = newChild)
|
||||
}
|
||||
|
||||
/**
|
||||
* The logical plan of the ALTER TABLE ... RENAME COLUMN command.
|
||||
*/
|
||||
case class AlterTableRenameColumn(
|
||||
table: LogicalPlan,
|
||||
column: FieldName,
|
||||
newName: String) extends AlterTableColumnCommand {
|
||||
override def changes: Seq[TableChange] = {
|
||||
require(column.resolved, "FieldName should be resolved before it's converted to TableChange.")
|
||||
Seq(TableChange.renameColumn(column.name.toArray, newName))
|
||||
}
|
||||
|
||||
override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
|
||||
copy(table = newChild)
|
||||
}
|
||||
|
||||
/**
|
||||
* The logical plan of the ALTER TABLE ... ALTER COLUMN command.
|
||||
*/
|
||||
case class AlterTableAlterColumn(
|
||||
table: LogicalPlan,
|
||||
column: FieldName,
|
||||
dataType: Option[DataType],
|
||||
nullable: Option[Boolean],
|
||||
comment: Option[String],
|
||||
position: Option[FieldPosition]) extends AlterTableColumnCommand {
|
||||
override def changes: Seq[TableChange] = {
|
||||
require(column.resolved, "FieldName should be resolved before it's converted to TableChange.")
|
||||
val colName = column.name.toArray
|
||||
val typeChange = dataType.map { newDataType =>
|
||||
TableChange.updateColumnType(colName, newDataType)
|
||||
}
|
||||
val nullabilityChange = nullable.map { nullable =>
|
||||
TableChange.updateColumnNullability(colName, nullable)
|
||||
}
|
||||
val commentChange = comment.map { newComment =>
|
||||
TableChange.updateColumnComment(colName, newComment)
|
||||
}
|
||||
val positionChange = position.map { newPosition =>
|
||||
require(newPosition.resolved,
|
||||
"FieldPosition should be resolved before it's converted to TableChange.")
|
||||
TableChange.updateColumnPosition(colName, newPosition.position)
|
||||
}
|
||||
typeChange.toSeq ++ nullabilityChange ++ commentChange ++ positionChange
|
||||
}
|
||||
|
||||
override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
|
||||
copy(table = newChild)
|
||||
}
|
||||
|
|
|
@ -788,7 +788,7 @@ class DDLParserSuite extends AnalysisTest {
|
|||
test("alter table: add column") {
|
||||
comparePlans(
|
||||
parsePlan("ALTER TABLE table_name ADD COLUMN x int"),
|
||||
AlterTableAddColumns(
|
||||
AddColumns(
|
||||
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... ADD COLUMN", None),
|
||||
Seq(QualifiedColType(None, "x", IntegerType, true, None, None)
|
||||
)))
|
||||
|
@ -797,7 +797,7 @@ class DDLParserSuite extends AnalysisTest {
|
|||
test("alter table: add multiple columns") {
|
||||
comparePlans(
|
||||
parsePlan("ALTER TABLE table_name ADD COLUMNS x int, y string"),
|
||||
AlterTableAddColumns(
|
||||
AddColumns(
|
||||
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... ADD COLUMNS", None),
|
||||
Seq(QualifiedColType(None, "x", IntegerType, true, None, None),
|
||||
QualifiedColType(None, "y", StringType, true, None, None)
|
||||
|
@ -807,7 +807,7 @@ class DDLParserSuite extends AnalysisTest {
|
|||
test("alter table: add column with COLUMNS") {
|
||||
comparePlans(
|
||||
parsePlan("ALTER TABLE table_name ADD COLUMNS x int"),
|
||||
AlterTableAddColumns(
|
||||
AddColumns(
|
||||
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... ADD COLUMNS", None),
|
||||
Seq(QualifiedColType(None, "x", IntegerType, true, None, None)
|
||||
)))
|
||||
|
@ -816,7 +816,7 @@ class DDLParserSuite extends AnalysisTest {
|
|||
test("alter table: add column with COLUMNS (...)") {
|
||||
comparePlans(
|
||||
parsePlan("ALTER TABLE table_name ADD COLUMNS (x int)"),
|
||||
AlterTableAddColumns(
|
||||
AddColumns(
|
||||
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... ADD COLUMNS", None),
|
||||
Seq(QualifiedColType(None, "x", IntegerType, true, None, None)
|
||||
)))
|
||||
|
@ -825,7 +825,7 @@ class DDLParserSuite extends AnalysisTest {
|
|||
test("alter table: add column with COLUMNS (...) and COMMENT") {
|
||||
comparePlans(
|
||||
parsePlan("ALTER TABLE table_name ADD COLUMNS (x int COMMENT 'doc')"),
|
||||
AlterTableAddColumns(
|
||||
AddColumns(
|
||||
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... ADD COLUMNS", None),
|
||||
Seq(QualifiedColType(None, "x", IntegerType, true, Some("doc"), None)
|
||||
)))
|
||||
|
@ -834,7 +834,7 @@ class DDLParserSuite extends AnalysisTest {
|
|||
test("alter table: add non-nullable column") {
|
||||
comparePlans(
|
||||
parsePlan("ALTER TABLE table_name ADD COLUMN x int NOT NULL"),
|
||||
AlterTableAddColumns(
|
||||
AddColumns(
|
||||
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... ADD COLUMN", None),
|
||||
Seq(QualifiedColType(None, "x", IntegerType, false, None, None)
|
||||
)))
|
||||
|
@ -843,7 +843,7 @@ class DDLParserSuite extends AnalysisTest {
|
|||
test("alter table: add column with COMMENT") {
|
||||
comparePlans(
|
||||
parsePlan("ALTER TABLE table_name ADD COLUMN x int COMMENT 'doc'"),
|
||||
AlterTableAddColumns(
|
||||
AddColumns(
|
||||
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... ADD COLUMN", None),
|
||||
Seq(QualifiedColType(None, "x", IntegerType, true, Some("doc"), None)
|
||||
)))
|
||||
|
@ -852,7 +852,7 @@ class DDLParserSuite extends AnalysisTest {
|
|||
test("alter table: add column with position") {
|
||||
comparePlans(
|
||||
parsePlan("ALTER TABLE table_name ADD COLUMN x int FIRST"),
|
||||
AlterTableAddColumns(
|
||||
AddColumns(
|
||||
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... ADD COLUMN", None),
|
||||
Seq(QualifiedColType(
|
||||
None,
|
||||
|
@ -865,7 +865,7 @@ class DDLParserSuite extends AnalysisTest {
|
|||
|
||||
comparePlans(
|
||||
parsePlan("ALTER TABLE table_name ADD COLUMN x int AFTER y"),
|
||||
AlterTableAddColumns(
|
||||
AddColumns(
|
||||
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... ADD COLUMN", None),
|
||||
Seq(QualifiedColType(
|
||||
None,
|
||||
|
@ -880,7 +880,7 @@ class DDLParserSuite extends AnalysisTest {
|
|||
test("alter table: add column with nested column name") {
|
||||
comparePlans(
|
||||
parsePlan("ALTER TABLE table_name ADD COLUMN x.y.z int COMMENT 'doc'"),
|
||||
AlterTableAddColumns(
|
||||
AddColumns(
|
||||
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... ADD COLUMN", None),
|
||||
Seq(QualifiedColType(
|
||||
Some(UnresolvedFieldName(Seq("x", "y"))), "z", IntegerType, true, Some("doc"), None)
|
||||
|
@ -890,7 +890,7 @@ class DDLParserSuite extends AnalysisTest {
|
|||
test("alter table: add multiple columns with nested column name") {
|
||||
comparePlans(
|
||||
parsePlan("ALTER TABLE table_name ADD COLUMN x.y.z int COMMENT 'doc', a.b string FIRST"),
|
||||
AlterTableAddColumns(
|
||||
AddColumns(
|
||||
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... ADD COLUMN", None),
|
||||
Seq(
|
||||
QualifiedColType(
|
||||
|
@ -930,7 +930,7 @@ class DDLParserSuite extends AnalysisTest {
|
|||
test("alter table: rename column") {
|
||||
comparePlans(
|
||||
parsePlan("ALTER TABLE table_name RENAME COLUMN a.b.c TO d"),
|
||||
AlterTableRenameColumn(
|
||||
RenameColumn(
|
||||
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... RENAME COLUMN", None),
|
||||
UnresolvedFieldName(Seq("a", "b", "c")),
|
||||
"d"))
|
||||
|
@ -939,7 +939,7 @@ class DDLParserSuite extends AnalysisTest {
|
|||
test("alter table: update column type using ALTER") {
|
||||
comparePlans(
|
||||
parsePlan("ALTER TABLE table_name ALTER COLUMN a.b.c TYPE bigint"),
|
||||
AlterTableAlterColumn(
|
||||
AlterColumn(
|
||||
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... ALTER COLUMN", None),
|
||||
UnresolvedFieldName(Seq("a", "b", "c")),
|
||||
Some(LongType),
|
||||
|
@ -958,7 +958,7 @@ class DDLParserSuite extends AnalysisTest {
|
|||
test("alter table: update column type") {
|
||||
comparePlans(
|
||||
parsePlan("ALTER TABLE table_name CHANGE COLUMN a.b.c TYPE bigint"),
|
||||
AlterTableAlterColumn(
|
||||
AlterColumn(
|
||||
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... CHANGE COLUMN", None),
|
||||
UnresolvedFieldName(Seq("a", "b", "c")),
|
||||
Some(LongType),
|
||||
|
@ -970,7 +970,7 @@ class DDLParserSuite extends AnalysisTest {
|
|||
test("alter table: update column comment") {
|
||||
comparePlans(
|
||||
parsePlan("ALTER TABLE table_name CHANGE COLUMN a.b.c COMMENT 'new comment'"),
|
||||
AlterTableAlterColumn(
|
||||
AlterColumn(
|
||||
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... CHANGE COLUMN", None),
|
||||
UnresolvedFieldName(Seq("a", "b", "c")),
|
||||
None,
|
||||
|
@ -982,7 +982,7 @@ class DDLParserSuite extends AnalysisTest {
|
|||
test("alter table: update column position") {
|
||||
comparePlans(
|
||||
parsePlan("ALTER TABLE table_name CHANGE COLUMN a.b.c FIRST"),
|
||||
AlterTableAlterColumn(
|
||||
AlterColumn(
|
||||
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... CHANGE COLUMN", None),
|
||||
UnresolvedFieldName(Seq("a", "b", "c")),
|
||||
None,
|
||||
|
@ -1008,7 +1008,7 @@ class DDLParserSuite extends AnalysisTest {
|
|||
test("alter table: SET/DROP NOT NULL") {
|
||||
comparePlans(
|
||||
parsePlan("ALTER TABLE table_name ALTER COLUMN a.b.c SET NOT NULL"),
|
||||
AlterTableAlterColumn(
|
||||
AlterColumn(
|
||||
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... ALTER COLUMN", None),
|
||||
UnresolvedFieldName(Seq("a", "b", "c")),
|
||||
None,
|
||||
|
@ -1018,7 +1018,7 @@ class DDLParserSuite extends AnalysisTest {
|
|||
|
||||
comparePlans(
|
||||
parsePlan("ALTER TABLE table_name ALTER COLUMN a.b.c DROP NOT NULL"),
|
||||
AlterTableAlterColumn(
|
||||
AlterColumn(
|
||||
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... ALTER COLUMN", None),
|
||||
UnresolvedFieldName(Seq("a", "b", "c")),
|
||||
None,
|
||||
|
@ -1030,7 +1030,7 @@ class DDLParserSuite extends AnalysisTest {
|
|||
test("alter table: drop column") {
|
||||
comparePlans(
|
||||
parsePlan("ALTER TABLE table_name DROP COLUMN a.b.c"),
|
||||
AlterTableDropColumns(
|
||||
DropColumns(
|
||||
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... DROP COLUMNS", None),
|
||||
Seq(UnresolvedFieldName(Seq("a", "b", "c")))))
|
||||
}
|
||||
|
@ -1040,7 +1040,7 @@ class DDLParserSuite extends AnalysisTest {
|
|||
Seq(sql, sql.replace("COLUMN", "COLUMNS")).foreach { drop =>
|
||||
comparePlans(
|
||||
parsePlan(drop),
|
||||
AlterTableDropColumns(
|
||||
DropColumns(
|
||||
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... DROP COLUMNS", None),
|
||||
Seq(UnresolvedFieldName(Seq("x")),
|
||||
UnresolvedFieldName(Seq("y")),
|
||||
|
@ -1055,7 +1055,7 @@ class DDLParserSuite extends AnalysisTest {
|
|||
|
||||
comparePlans(
|
||||
parsePlan(sql1),
|
||||
AlterTableAlterColumn(
|
||||
AlterColumn(
|
||||
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... CHANGE COLUMN", None),
|
||||
UnresolvedFieldName(Seq("a", "b", "c")),
|
||||
Some(IntegerType),
|
||||
|
@ -1065,7 +1065,7 @@ class DDLParserSuite extends AnalysisTest {
|
|||
|
||||
comparePlans(
|
||||
parsePlan(sql2),
|
||||
AlterTableAlterColumn(
|
||||
AlterColumn(
|
||||
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... CHANGE COLUMN", None),
|
||||
UnresolvedFieldName(Seq("a", "b", "c")),
|
||||
Some(IntegerType),
|
||||
|
@ -1075,7 +1075,7 @@ class DDLParserSuite extends AnalysisTest {
|
|||
|
||||
comparePlans(
|
||||
parsePlan(sql3),
|
||||
AlterTableAlterColumn(
|
||||
AlterColumn(
|
||||
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... CHANGE COLUMN", None),
|
||||
UnresolvedFieldName(Seq("a", "b", "c")),
|
||||
Some(IntegerType),
|
||||
|
@ -1099,19 +1099,19 @@ class DDLParserSuite extends AnalysisTest {
|
|||
|
||||
comparePlans(
|
||||
parsePlan(sql1),
|
||||
AlterTableReplaceColumns(
|
||||
ReplaceColumns(
|
||||
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... REPLACE COLUMNS", None),
|
||||
Seq(QualifiedColType(None, "x", StringType, true, None, None))))
|
||||
|
||||
comparePlans(
|
||||
parsePlan(sql2),
|
||||
AlterTableReplaceColumns(
|
||||
ReplaceColumns(
|
||||
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... REPLACE COLUMNS", None),
|
||||
Seq(QualifiedColType(None, "x", StringType, true, Some("x1"), None))))
|
||||
|
||||
comparePlans(
|
||||
parsePlan(sql3),
|
||||
AlterTableReplaceColumns(
|
||||
ReplaceColumns(
|
||||
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... REPLACE COLUMNS", None),
|
||||
Seq(
|
||||
QualifiedColType(None, "x", StringType, true, Some("x1"), None),
|
||||
|
@ -1120,7 +1120,7 @@ class DDLParserSuite extends AnalysisTest {
|
|||
|
||||
comparePlans(
|
||||
parsePlan(sql4),
|
||||
AlterTableReplaceColumns(
|
||||
ReplaceColumns(
|
||||
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... REPLACE COLUMNS", None),
|
||||
Seq(
|
||||
QualifiedColType(None, "x", StringType, true, Some("x1"), None),
|
||||
|
|
|
@ -46,7 +46,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
|
|||
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
|
||||
|
||||
override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
|
||||
case AlterTableAddColumns(ResolvedV1TableIdentifier(ident), cols) =>
|
||||
case AddColumns(ResolvedV1TableIdentifier(ident), cols) =>
|
||||
cols.foreach { c =>
|
||||
assertTopLevelColumn(c.name, "AlterTableAddColumnsCommand")
|
||||
if (!c.nullable) {
|
||||
|
@ -55,10 +55,10 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
|
|||
}
|
||||
AlterTableAddColumnsCommand(ident.asTableIdentifier, cols.map(convertToStructField))
|
||||
|
||||
case AlterTableReplaceColumns(ResolvedV1TableIdentifier(_), _) =>
|
||||
case ReplaceColumns(ResolvedV1TableIdentifier(_), _) =>
|
||||
throw QueryCompilationErrors.replaceColumnsOnlySupportedWithV2TableError
|
||||
|
||||
case a @ AlterTableAlterColumn(ResolvedV1TableAndIdentifier(table, ident), _, _, _, _, _) =>
|
||||
case a @ AlterColumn(ResolvedV1TableAndIdentifier(table, ident), _, _, _, _, _) =>
|
||||
if (a.column.name.length > 1) {
|
||||
throw QueryCompilationErrors.alterQualifiedColumnOnlySupportedWithV2TableError
|
||||
}
|
||||
|
@ -87,10 +87,10 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
|
|||
builder.build())
|
||||
AlterTableChangeColumnCommand(ident.asTableIdentifier, colName, newColumn)
|
||||
|
||||
case AlterTableRenameColumn(ResolvedV1TableIdentifier(_), _, _) =>
|
||||
case RenameColumn(ResolvedV1TableIdentifier(_), _, _) =>
|
||||
throw QueryCompilationErrors.renameColumnOnlySupportedWithV2TableError
|
||||
|
||||
case AlterTableDropColumns(ResolvedV1TableIdentifier(_), _) =>
|
||||
case DropColumns(ResolvedV1TableIdentifier(_), _) =>
|
||||
throw QueryCompilationErrors.dropColumnOnlySupportedWithV2TableError
|
||||
|
||||
case SetTableProperties(ResolvedV1TableIdentifier(ident), props) =>
|
||||
|
|
|
@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.{And, Attribute, DynamicPruning
|
|||
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
|
||||
import org.apache.spark.sql.catalyst.plans.logical._
|
||||
import org.apache.spark.sql.catalyst.util.toPrettySQL
|
||||
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, StagingTableCatalog, SupportsNamespaces, SupportsPartitionManagement, SupportsWrite, Table, TableCapability, TableCatalog, TableChange}
|
||||
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, StagingTableCatalog, SupportsNamespaces, SupportsPartitionManagement, SupportsWrite, Table, TableCapability, TableCatalog}
|
||||
import org.apache.spark.sql.connector.read.LocalScan
|
||||
import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream}
|
||||
import org.apache.spark.sql.connector.write.V1Write
|
||||
|
@ -314,10 +314,6 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
|
|||
ns,
|
||||
Map(SupportsNamespaces.PROP_COMMENT -> comment)) :: Nil
|
||||
|
||||
case CommentOnTable(ResolvedTable(catalog, identifier, _, _), comment) =>
|
||||
val changes = TableChange.setProperty(TableCatalog.PROP_COMMENT, comment)
|
||||
AlterTableExec(catalog, identifier, Seq(changes)) :: Nil
|
||||
|
||||
case CreateNamespace(catalog, namespace, ifNotExists, properties) =>
|
||||
CreateNamespaceExec(catalog, namespace, ifNotExists, properties) :: Nil
|
||||
|
||||
|
@ -424,25 +420,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
|
|||
}
|
||||
UncacheTableExec(r.table, cascade = !isTempView(r.table)) :: Nil
|
||||
|
||||
case SetTableLocation(table: ResolvedTable, partitionSpec, location) =>
|
||||
if (partitionSpec.nonEmpty) {
|
||||
throw QueryCompilationErrors.alterV2TableSetLocationWithPartitionNotSupportedError()
|
||||
}
|
||||
val changes = Seq(TableChange.setProperty(TableCatalog.PROP_LOCATION, location))
|
||||
AlterTableExec(table.catalog, table.identifier, changes) :: Nil
|
||||
|
||||
case SetTableProperties(table: ResolvedTable, props) =>
|
||||
val changes = props.map { case (key, value) =>
|
||||
TableChange.setProperty(key, value)
|
||||
}.toSeq
|
||||
AlterTableExec(table.catalog, table.identifier, changes) :: Nil
|
||||
|
||||
// TODO: v2 `UNSET TBLPROPERTIES` should respect the ifExists flag.
|
||||
case UnsetTableProperties(table: ResolvedTable, keys, _) =>
|
||||
val changes = keys.map(key => TableChange.removeProperty(key))
|
||||
AlterTableExec(table.catalog, table.identifier, changes) :: Nil
|
||||
|
||||
case a: AlterTableColumnCommand if a.table.resolved =>
|
||||
case a: AlterTableCommand if a.table.resolved =>
|
||||
val table = a.table.asInstanceOf[ResolvedTable]
|
||||
AlterTableExec(table.catalog, table.identifier, a.changes) :: Nil
|
||||
|
||||
|
|
|
@ -18,7 +18,7 @@
|
|||
package org.apache.spark.sql.connector
|
||||
|
||||
import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, CreateTablePartitioningValidationSuite, ResolvedTable, TestRelation2, TestTable2, UnresolvedFieldName, UnresolvedFieldPosition}
|
||||
import org.apache.spark.sql.catalyst.plans.logical.{AlterTableAddColumns, AlterTableAlterColumn, AlterTableColumnCommand, AlterTableDropColumns, AlterTableRenameColumn, CreateTableAsSelect, LogicalPlan, QualifiedColType, ReplaceTableAsSelect}
|
||||
import org.apache.spark.sql.catalyst.plans.logical.{AddColumns, AlterColumn, AlterTableCommand, CreateTableAsSelect, DropColumns, LogicalPlan, QualifiedColType, RenameColumn, ReplaceTableAsSelect}
|
||||
import org.apache.spark.sql.catalyst.rules.Rule
|
||||
import org.apache.spark.sql.connector.catalog.Identifier
|
||||
import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition
|
||||
|
@ -140,7 +140,7 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes
|
|||
Seq("POINT.Z", "poInt.z", "poInt.Z").foreach { ref =>
|
||||
val field = ref.split("\\.")
|
||||
alterTableTest(
|
||||
AlterTableAddColumns(
|
||||
AddColumns(
|
||||
table,
|
||||
Seq(QualifiedColType(
|
||||
Some(UnresolvedFieldName(field.init)), field.last, LongType, true, None, None))),
|
||||
|
@ -152,7 +152,7 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes
|
|||
test("AlterTable: add column resolution - positional") {
|
||||
Seq("ID", "iD").foreach { ref =>
|
||||
alterTableTest(
|
||||
AlterTableAddColumns(
|
||||
AddColumns(
|
||||
table,
|
||||
Seq(QualifiedColType(
|
||||
None,
|
||||
|
@ -168,7 +168,7 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes
|
|||
|
||||
test("AlterTable: add column resolution - column position referencing new column") {
|
||||
alterTableTest(
|
||||
AlterTableAddColumns(
|
||||
AddColumns(
|
||||
table,
|
||||
Seq(QualifiedColType(
|
||||
None,
|
||||
|
@ -191,7 +191,7 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes
|
|||
test("AlterTable: add column resolution - nested positional") {
|
||||
Seq("X", "Y").foreach { ref =>
|
||||
alterTableTest(
|
||||
AlterTableAddColumns(
|
||||
AddColumns(
|
||||
table,
|
||||
Seq(QualifiedColType(
|
||||
Some(UnresolvedFieldName(Seq("point"))),
|
||||
|
@ -207,7 +207,7 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes
|
|||
|
||||
test("AlterTable: add column resolution - column position referencing new nested column") {
|
||||
alterTableTest(
|
||||
AlterTableAddColumns(
|
||||
AddColumns(
|
||||
table,
|
||||
Seq(QualifiedColType(
|
||||
Some(UnresolvedFieldName(Seq("point"))),
|
||||
|
@ -229,7 +229,7 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes
|
|||
|
||||
test("SPARK-36372: Adding duplicate columns should not be allowed") {
|
||||
alterTableTest(
|
||||
AlterTableAddColumns(
|
||||
AddColumns(
|
||||
table,
|
||||
Seq(QualifiedColType(
|
||||
Some(UnresolvedFieldName(Seq("point"))),
|
||||
|
@ -252,7 +252,7 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes
|
|||
test("AlterTable: drop column resolution") {
|
||||
Seq(Array("ID"), Array("point", "X"), Array("POINT", "X"), Array("POINT", "x")).foreach { ref =>
|
||||
alterTableTest(
|
||||
AlterTableDropColumns(table, Seq(UnresolvedFieldName(ref))),
|
||||
DropColumns(table, Seq(UnresolvedFieldName(ref))),
|
||||
Seq("Missing field " + ref.quoted)
|
||||
)
|
||||
}
|
||||
|
@ -261,7 +261,7 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes
|
|||
test("AlterTable: rename column resolution") {
|
||||
Seq(Array("ID"), Array("point", "X"), Array("POINT", "X"), Array("POINT", "x")).foreach { ref =>
|
||||
alterTableTest(
|
||||
AlterTableRenameColumn(table, UnresolvedFieldName(ref), "newName"),
|
||||
RenameColumn(table, UnresolvedFieldName(ref), "newName"),
|
||||
Seq("Missing field " + ref.quoted)
|
||||
)
|
||||
}
|
||||
|
@ -270,7 +270,7 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes
|
|||
test("AlterTable: drop column nullability resolution") {
|
||||
Seq(Array("ID"), Array("point", "X"), Array("POINT", "X"), Array("POINT", "x")).foreach { ref =>
|
||||
alterTableTest(
|
||||
AlterTableAlterColumn(table, UnresolvedFieldName(ref), None, Some(true), None, None),
|
||||
AlterColumn(table, UnresolvedFieldName(ref), None, Some(true), None, None),
|
||||
Seq("Missing field " + ref.quoted)
|
||||
)
|
||||
}
|
||||
|
@ -279,7 +279,7 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes
|
|||
test("AlterTable: change column type resolution") {
|
||||
Seq(Array("ID"), Array("point", "X"), Array("POINT", "X"), Array("POINT", "x")).foreach { ref =>
|
||||
alterTableTest(
|
||||
AlterTableAlterColumn(table, UnresolvedFieldName(ref), Some(StringType), None, None, None),
|
||||
AlterColumn(table, UnresolvedFieldName(ref), Some(StringType), None, None, None),
|
||||
Seq("Missing field " + ref.quoted)
|
||||
)
|
||||
}
|
||||
|
@ -288,14 +288,14 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes
|
|||
test("AlterTable: change column comment resolution") {
|
||||
Seq(Array("ID"), Array("point", "X"), Array("POINT", "X"), Array("POINT", "x")).foreach { ref =>
|
||||
alterTableTest(
|
||||
AlterTableAlterColumn(table, UnresolvedFieldName(ref), None, None, Some("comment"), None),
|
||||
AlterColumn(table, UnresolvedFieldName(ref), None, None, Some("comment"), None),
|
||||
Seq("Missing field " + ref.quoted)
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
private def alterTableTest(
|
||||
alter: AlterTableColumnCommand,
|
||||
alter: AlterTableCommand,
|
||||
error: Seq[String],
|
||||
expectErrorOnCaseSensitive: Boolean = true): Unit = {
|
||||
Seq(true, false).foreach { caseSensitive =>
|
||||
|
|
|
@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat,
|
|||
import org.apache.spark.sql.catalyst.expressions.{AnsiCast, AttributeReference, EqualTo, Expression, InSubquery, IntegerLiteral, ListQuery, Literal, StringLiteral}
|
||||
import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
|
||||
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
|
||||
import org.apache.spark.sql.catalyst.plans.logical.{AlterTableAlterColumn, AnalysisOnlyCommand, AppendData, Assignment, CreateTableAsSelect, CreateTableStatement, CreateV2Table, DeleteAction, DeleteFromTable, DescribeRelation, DropTable, InsertAction, LocalRelation, LogicalPlan, MergeIntoTable, OneRowRelation, Project, SetTableLocation, SetTableProperties, ShowTableProperties, SubqueryAlias, UnsetTableProperties, UpdateAction, UpdateTable}
|
||||
import org.apache.spark.sql.catalyst.plans.logical.{AlterColumn, AnalysisOnlyCommand, AppendData, Assignment, CreateTableAsSelect, CreateTableStatement, CreateV2Table, DeleteAction, DeleteFromTable, DescribeRelation, DropTable, InsertAction, LocalRelation, LogicalPlan, MergeIntoTable, OneRowRelation, Project, SetTableLocation, SetTableProperties, ShowTableProperties, SubqueryAlias, UnsetTableProperties, UpdateAction, UpdateTable}
|
||||
import org.apache.spark.sql.catalyst.rules.Rule
|
||||
import org.apache.spark.sql.connector.FakeV2Provider
|
||||
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogNotFoundException, Identifier, Table, TableCapability, TableCatalog, V1Table}
|
||||
|
@ -1132,7 +1132,7 @@ class PlanResolutionSuite extends AnalysisTest {
|
|||
"ALTER COLUMN with qualified column is only supported with v2 tables"))
|
||||
} else {
|
||||
parsed1 match {
|
||||
case AlterTableAlterColumn(
|
||||
case AlterColumn(
|
||||
_: ResolvedTable,
|
||||
column: ResolvedFieldName,
|
||||
Some(LongType),
|
||||
|
@ -1144,7 +1144,7 @@ class PlanResolutionSuite extends AnalysisTest {
|
|||
}
|
||||
|
||||
parsed2 match {
|
||||
case AlterTableAlterColumn(
|
||||
case AlterColumn(
|
||||
_: ResolvedTable,
|
||||
column: ResolvedFieldName,
|
||||
None,
|
||||
|
@ -1198,14 +1198,14 @@ class PlanResolutionSuite extends AnalysisTest {
|
|||
test("alter table: hive style change column") {
|
||||
Seq("v2Table", "testcat.tab").foreach { tblName =>
|
||||
parseAndResolve(s"ALTER TABLE $tblName CHANGE COLUMN i i int COMMENT 'an index'") match {
|
||||
case AlterTableAlterColumn(
|
||||
case AlterColumn(
|
||||
_: ResolvedTable, _: ResolvedFieldName, None, None, Some(comment), None) =>
|
||||
assert(comment == "an index")
|
||||
case _ => fail("expect AlterTableAlterColumn with comment change only")
|
||||
}
|
||||
|
||||
parseAndResolve(s"ALTER TABLE $tblName CHANGE COLUMN i i long COMMENT 'an index'") match {
|
||||
case AlterTableAlterColumn(
|
||||
case AlterColumn(
|
||||
_: ResolvedTable, _: ResolvedFieldName, Some(dataType), None, Some(comment), None) =>
|
||||
assert(comment == "an index")
|
||||
assert(dataType == LongType)
|
||||
|
@ -1241,7 +1241,7 @@ class PlanResolutionSuite extends AnalysisTest {
|
|||
val catalog = if (isSessionCatalog) v2SessionCatalog else testCat
|
||||
val tableIdent = if (isSessionCatalog) "v2Table" else "tab"
|
||||
parsed match {
|
||||
case AlterTableAlterColumn(r: ResolvedTable, _, _, _, _, _) =>
|
||||
case AlterColumn(r: ResolvedTable, _, _, _, _, _) =>
|
||||
assert(r.catalog == catalog)
|
||||
assert(r.identifier.name() == tableIdent)
|
||||
case Project(_, AsDataSourceV2Relation(r)) =>
|
||||
|
|
Loading…
Reference in a new issue