[SPARK-30535][SQL] Revert "[] Migrate ALTER TABLE commands to the new framework

### What changes were proposed in this pull request?

This reverts commit b5cb9abdd5.

### Why are the changes needed?

The merged commit (#27243) was too risky for several reasons:
 1. It doesn't fix a bug
 2. It makes the resolution of the table that's going to be altered a child. We had avoided this on purpose as having an arbitrary rule change the child of AlterTable seemed risky. This change alone is a big -1 for me for this change.
 3. While the code may look cleaner, I think this approach makes certain things harder, e.g. differentiating between the Hive based Alter table CHANGE COLUMN and ALTER COLUMN syntax. Resolving and normalizing columns for ALTER COLUMN also becomes a bit harder, as we now have to check every single AlterTable command instead of just a single ALTER TABLE ALTER COLUMN statement

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

Existing unit tests

This closes #27315

Closes #27327 from brkyvz/revAlter.

Authored-by: Burak Yavuz <brkyvz@gmail.com>
Signed-off-by: Xiao Li <gatorsmile@gmail.com>
This commit is contained in:
Burak Yavuz 2020-01-22 22:43:46 -08:00 committed by Xiao Li
parent d2bca8ff70
commit db528e4fe1
19 changed files with 461 additions and 323 deletions

View file

@ -755,14 +755,12 @@ class Analyzer(
.map(view => i.copy(table = view))
.getOrElse(i)
case u @ UnresolvedTable(ident) =>
lookupTempView(ident)
.map(_ => UnresolvedTableWithViewExists(
ResolvedView(ident.asIdentifier, isTempView = true)))
.getOrElse(u)
lookupTempView(ident).foreach { _ =>
u.failAnalysis(s"${ident.quoted} is a temp view not table.")
}
u
case u @ UnresolvedTableOrView(ident) =>
lookupTempView(ident)
.map(_ => ResolvedView(ident.asIdentifier, isTempView = true))
.getOrElse(u)
lookupTempView(ident).map(_ => ResolvedView(ident.asIdentifier)).getOrElse(u)
}
def lookupTempView(identifier: Seq[String]): Option[LogicalPlan] = {
@ -816,6 +814,14 @@ class Analyzer(
lookupV2Relation(u.multipartIdentifier)
.map(v2Relation => i.copy(table = v2Relation))
.getOrElse(i)
case alter @ AlterTable(_, _, u: UnresolvedV2Relation, _) =>
CatalogV2Util.loadRelation(u.catalog, u.tableName)
.map(rel => alter.copy(table = rel))
.getOrElse(alter)
case u: UnresolvedV2Relation =>
CatalogV2Util.loadRelation(u.catalog, u.tableName).getOrElse(u)
}
/**
@ -882,7 +888,8 @@ class Analyzer(
case u @ UnresolvedTable(identifier) =>
lookupTableOrView(identifier).map {
case v: ResolvedView => UnresolvedTableWithViewExists(v)
case v: ResolvedView =>
u.failAnalysis(s"${v.identifier.quoted} is a view not table.")
case table => table
}.getOrElse(u)
@ -895,7 +902,7 @@ class Analyzer(
case SessionCatalogAndIdentifier(catalog, ident) =>
CatalogV2Util.loadTable(catalog, ident).map {
case v1Table: V1Table if v1Table.v1Table.tableType == CatalogTableType.VIEW =>
ResolvedView(ident, isTempView = false)
ResolvedView(ident)
case table =>
ResolvedTable(catalog.asTableCatalog, ident, table)
}

View file

@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.optimizer.BooleanSimplification
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.TypeUtils
import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, DeleteColumn, RenameColumn, UpdateColumnComment, UpdateColumnNullability, UpdateColumnPosition, UpdateColumnType}
import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, DeleteColumn, RenameColumn, UpdateColumnComment, UpdateColumnNullability, UpdateColumnType}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@ -87,20 +87,6 @@ trait CheckAnalysis extends PredicateHelper {
}
def checkAnalysis(plan: LogicalPlan): Unit = {
// Analysis that needs to be performed top down can be added here.
plan.foreach {
case p if p.analyzed => // Skip already analyzed sub-plans
case alter: AlterTable =>
alter.table match {
case u @ UnresolvedTableWithViewExists(view) if !view.isTempView =>
u.failAnalysis("Cannot alter a view with ALTER TABLE. Please use ALTER VIEW instead")
case _ =>
}
case _ => // Analysis successful!
}
// We transform up and order the rules so as to catch the first possible failure instead
// of the result of cascading resolution failures.
plan.foreachUp {
@ -119,13 +105,23 @@ trait CheckAnalysis extends PredicateHelper {
case u: UnresolvedRelation =>
u.failAnalysis(s"Table or view not found: ${u.multipartIdentifier.quoted}")
case u: UnresolvedTableWithViewExists =>
val viewKind = if (u.view.isTempView) { "temp view" } else { "view" }
u.failAnalysis(s"${u.view.identifier.quoted} is a $viewKind not a table.")
case InsertIntoStatement(u: UnresolvedRelation, _, _, _, _) =>
failAnalysis(s"Table not found: ${u.multipartIdentifier.quoted}")
case u: UnresolvedV2Relation if isView(u.originalNameParts) =>
u.failAnalysis(
s"Invalid command: '${u.originalNameParts.quoted}' is a view not a table.")
case u: UnresolvedV2Relation =>
u.failAnalysis(s"Table not found: ${u.originalNameParts.quoted}")
case AlterTable(_, _, u: UnresolvedV2Relation, _) if isView(u.originalNameParts) =>
u.failAnalysis(
s"Invalid command: '${u.originalNameParts.quoted}' is a view not a table.")
case AlterTable(_, _, u: UnresolvedV2Relation, _) =>
failAnalysis(s"Table not found: ${u.originalNameParts.quoted}")
case operator: LogicalPlan =>
// Check argument data types of higher-order functions downwards first.
// If the arguments of the higher-order functions are resolved but the type check fails,
@ -429,9 +425,8 @@ trait CheckAnalysis extends PredicateHelper {
case _ =>
}
case alter: AlterTable
if alter.childrenResolved && alter.table.isInstanceOf[ResolvedTable] =>
val table = alter.table.asInstanceOf[ResolvedTable].table
case alter: AlterTable if alter.childrenResolved =>
val table = alter.table
def findField(operation: String, fieldName: Array[String]): StructField = {
// include collections because structs nested in maps and arrays may be altered
val field = table.schema.findNestedField(fieldName, includeCollections = true)
@ -484,8 +479,6 @@ trait CheckAnalysis extends PredicateHelper {
throw new AnalysisException(
s"Cannot change nullable column to non-nullable: $fieldName")
}
case update: UpdateColumnPosition =>
findField("update", update.fieldNames)
case rename: RenameColumn =>
findField("rename", rename.fieldNames)
case update: UpdateColumnComment =>

View file

@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, LookupCatalog}
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, LookupCatalog, SupportsNamespaces, TableCatalog, TableChange}
/**
* Resolves catalogs from the multi-part identifiers in SQL statements, and convert the statements
@ -32,6 +32,71 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
import org.apache.spark.sql.connector.catalog.CatalogV2Util._
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case AlterTableAddColumnsStatement(
nameParts @ NonSessionCatalogAndTable(catalog, tbl), cols) =>
val changes = cols.map { col =>
TableChange.addColumn(
col.name.toArray,
col.dataType,
col.nullable,
col.comment.orNull,
col.position.orNull)
}
createAlterTable(nameParts, catalog, tbl, changes)
case a @ AlterTableAlterColumnStatement(
nameParts @ NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _) =>
val colName = a.column.toArray
val typeChange = a.dataType.map { newDataType =>
TableChange.updateColumnType(colName, newDataType)
}
val nullabilityChange = a.nullable.map { nullable =>
TableChange.updateColumnNullability(colName, nullable)
}
val commentChange = a.comment.map { newComment =>
TableChange.updateColumnComment(colName, newComment)
}
val positionChange = a.position.map { newPosition =>
TableChange.updateColumnPosition(colName, newPosition)
}
createAlterTable(
nameParts,
catalog,
tbl,
typeChange.toSeq ++ nullabilityChange ++ commentChange ++ positionChange)
case AlterTableRenameColumnStatement(
nameParts @ NonSessionCatalogAndTable(catalog, tbl), col, newName) =>
val changes = Seq(TableChange.renameColumn(col.toArray, newName))
createAlterTable(nameParts, catalog, tbl, changes)
case AlterTableDropColumnsStatement(
nameParts @ NonSessionCatalogAndTable(catalog, tbl), cols) =>
val changes = cols.map(col => TableChange.deleteColumn(col.toArray))
createAlterTable(nameParts, catalog, tbl, changes)
case AlterTableSetPropertiesStatement(
nameParts @ NonSessionCatalogAndTable(catalog, tbl), props) =>
val changes = props.map { case (key, value) =>
TableChange.setProperty(key, value)
}.toSeq
createAlterTable(nameParts, catalog, tbl, changes)
// TODO: v2 `UNSET TBLPROPERTIES` should respect the ifExists flag.
case AlterTableUnsetPropertiesStatement(
nameParts @ NonSessionCatalogAndTable(catalog, tbl), keys, _) =>
val changes = keys.map(key => TableChange.removeProperty(key))
createAlterTable(nameParts, catalog, tbl, changes)
case AlterTableSetLocationStatement(
nameParts @ NonSessionCatalogAndTable(catalog, tbl), partitionSpec, newLoc) =>
if (partitionSpec.nonEmpty) {
throw new AnalysisException(
"ALTER TABLE SET LOCATION does not support partition for v2 tables.")
}
val changes = Seq(TableChange.setProperty(TableCatalog.PROP_LOCATION, newLoc))
createAlterTable(nameParts, catalog, tbl, changes)
case AlterViewSetPropertiesStatement(
NonSessionCatalogAndTable(catalog, tbl), props) =>
throw new AnalysisException(

View file

@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.parser.ParserUtils
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, UnaryNode}
import org.apache.spark.sql.catalyst.trees.TreeNode
import org.apache.spark.sql.catalyst.util.quoteIdentifier
import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
import org.apache.spark.sql.types.{DataType, Metadata, StructType}
/**
@ -59,6 +60,28 @@ object UnresolvedRelation {
UnresolvedRelation(tableIdentifier.database.toSeq :+ tableIdentifier.table)
}
/**
* A variant of [[UnresolvedRelation]] which can only be resolved to a v2 relation
* (`DataSourceV2Relation`), not v1 relation or temp view.
*
* @param originalNameParts the original table identifier name parts before catalog is resolved.
* @param catalog The catalog which the table should be looked up from.
* @param tableName The name of the table to look up.
*/
case class UnresolvedV2Relation(
originalNameParts: Seq[String],
catalog: TableCatalog,
tableName: Identifier)
extends LeafNode with NamedRelation {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
override def name: String = originalNameParts.quoted
override def output: Seq[Attribute] = Nil
override lazy val resolved = false
}
/**
* An inline table that has not been resolved yet. Once resolved, it is turned by the analyzer into
* a [[org.apache.spark.sql.catalyst.plans.logical.LocalRelation]].

View file

@ -18,7 +18,7 @@
package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LeafNode
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
import org.apache.spark.sql.connector.catalog.{Identifier, SupportsNamespaces, Table, TableCatalog}
/**
@ -41,16 +41,6 @@ case class UnresolvedTable(multipartIdentifier: Seq[String]) extends LeafNode {
override def output: Seq[Attribute] = Nil
}
/**
* Holds the resolved view. It is used in a scenario where table is expected but the identifier
* is resolved to a (temp) view.
*/
case class UnresolvedTableWithViewExists(view: ResolvedView) extends LeafNode {
override lazy val resolved: Boolean = false
override def output: Seq[Attribute] = Nil
}
/**
* Holds the name of a table or view that has yet to be looked up in a catalog. It will
* be resolved to [[ResolvedTable]] or [[ResolvedView]] during analysis.
@ -81,6 +71,6 @@ case class ResolvedTable(catalog: TableCatalog, identifier: Identifier, table: T
*/
// TODO: create a generic representation for temp view, v1 view and v2 view, after we add view
// support to v2 catalog. For now we only need the identifier to fallback to v1 command.
case class ResolvedView(identifier: Identifier, isTempView: Boolean) extends LeafNode {
case class ResolvedView(identifier: Identifier) extends LeafNode {
override def output: Seq[Attribute] = Nil
}

View file

@ -2908,7 +2908,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
}
/**
* Parse a [[AlterTableAddColumns]] command.
* Parse a [[AlterTableAddColumnsStatement]] command.
*
* For example:
* {{{
@ -2917,14 +2917,14 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
* }}}
*/
override def visitAddTableColumns(ctx: AddTableColumnsContext): LogicalPlan = withOrigin(ctx) {
AlterTableAddColumns(
UnresolvedTable(visitMultipartIdentifier(ctx.multipartIdentifier)),
AlterTableAddColumnsStatement(
visitMultipartIdentifier(ctx.multipartIdentifier),
ctx.columns.qualifiedColTypeWithPosition.asScala.map(typedVisit[QualifiedColType])
)
}
/**
* Parse a [[AlterTableRenameColumn]] command.
* Parse a [[AlterTableRenameColumnStatement]] command.
*
* For example:
* {{{
@ -2933,14 +2933,14 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
*/
override def visitRenameTableColumn(
ctx: RenameTableColumnContext): LogicalPlan = withOrigin(ctx) {
AlterTableRenameColumn(
UnresolvedTable(visitMultipartIdentifier(ctx.table)),
AlterTableRenameColumnStatement(
visitMultipartIdentifier(ctx.table),
ctx.from.parts.asScala.map(_.getText),
ctx.to.getText)
}
/**
* Parse a [[AlterTableAlterColumn]] command.
* Parse a [[AlterTableAlterColumnStatement]] command.
*
* For example:
* {{{
@ -2957,8 +2957,8 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
s"ALTER TABLE table $verb COLUMN requires a TYPE or a COMMENT or a FIRST/AFTER", ctx)
}
AlterTableAlterColumn(
UnresolvedTable(visitMultipartIdentifier(ctx.table)),
AlterTableAlterColumnStatement(
visitMultipartIdentifier(ctx.table),
typedVisit[Seq[String]](ctx.column),
dataType = Option(ctx.dataType).map(typedVisit[DataType]),
nullable = None,
@ -2967,7 +2967,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
}
/**
* Parse a [[AlterTableAlterColumn]] command to change column nullability.
* Parse a [[AlterTableAlterColumnStatement]] command to change column nullability.
*
* For example:
* {{{
@ -2981,8 +2981,8 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
case SqlBaseParser.SET => false
case SqlBaseParser.DROP => true
}
AlterTableAlterColumn(
UnresolvedTable(visitMultipartIdentifier(ctx.table)),
AlterTableAlterColumnStatement(
visitMultipartIdentifier(ctx.table),
typedVisit[Seq[String]](ctx.column),
dataType = None,
nullable = Some(nullable),
@ -2992,7 +2992,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
}
/**
* Parse a [[AlterTableAlterColumn]] command. This is Hive SQL syntax.
* Parse a [[AlterTableAlterColumnStatement]] command. This is Hive SQL syntax.
*
* For example:
* {{{
@ -3015,8 +3015,8 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
"please run ALTER COLUMN ... SET/DROP NOT NULL instead.")
}
AlterTableAlterColumn(
UnresolvedTable(typedVisit[Seq[String]](ctx.table)),
AlterTableAlterColumnStatement(
typedVisit[Seq[String]](ctx.table),
columnNameParts,
dataType = Option(ctx.colType().dataType()).map(typedVisit[DataType]),
nullable = None,
@ -3025,7 +3025,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
}
/**
* Parse a [[AlterTableDropColumns]] command.
* Parse a [[AlterTableDropColumnsStatement]] command.
*
* For example:
* {{{
@ -3036,13 +3036,13 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
override def visitDropTableColumns(
ctx: DropTableColumnsContext): LogicalPlan = withOrigin(ctx) {
val columnsToDrop = ctx.columns.multipartIdentifier.asScala.map(typedVisit[Seq[String]])
AlterTableDropColumns(
UnresolvedTable(visitMultipartIdentifier(ctx.multipartIdentifier)),
AlterTableDropColumnsStatement(
visitMultipartIdentifier(ctx.multipartIdentifier),
columnsToDrop)
}
/**
* Parse [[AlterViewSetPropertiesStatement]] or [[AlterTableSetProperties]] commands.
* Parse [[AlterViewSetPropertiesStatement]] or [[AlterTableSetPropertiesStatement]] commands.
*
* For example:
* {{{
@ -3058,12 +3058,12 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
if (ctx.VIEW != null) {
AlterViewSetPropertiesStatement(identifier, cleanedTableProperties)
} else {
AlterTableSetProperties(UnresolvedTable(identifier), cleanedTableProperties)
AlterTableSetPropertiesStatement(identifier, cleanedTableProperties)
}
}
/**
* Parse [[AlterViewUnsetPropertiesStatement]] or [[AlterTableUnsetProperties]] commands.
* Parse [[AlterViewUnsetPropertiesStatement]] or [[AlterTableUnsetPropertiesStatement]] commands.
*
* For example:
* {{{
@ -3081,12 +3081,12 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
if (ctx.VIEW != null) {
AlterViewUnsetPropertiesStatement(identifier, cleanedProperties, ifExists)
} else {
AlterTableUnsetProperties(UnresolvedTable(identifier), cleanedProperties, ifExists)
AlterTableUnsetPropertiesStatement(identifier, cleanedProperties, ifExists)
}
}
/**
* Create an [[AlterTableSetLocation]] command.
* Create an [[AlterTableSetLocationStatement]] command.
*
* For example:
* {{{
@ -3094,8 +3094,8 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
* }}}
*/
override def visitSetTableLocation(ctx: SetTableLocationContext): LogicalPlan = withOrigin(ctx) {
AlterTableSetLocation(
UnresolvedTable(visitMultipartIdentifier(ctx.multipartIdentifier)),
AlterTableSetLocationStatement(
visitMultipartIdentifier(ctx.multipartIdentifier),
Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec),
visitLocationSpec(ctx.locationSpec))
}

View file

@ -149,6 +149,62 @@ case class QualifiedColType(
comment: Option[String],
position: Option[ColumnPosition])
/**
* ALTER TABLE ... ADD COLUMNS command, as parsed from SQL.
*/
case class AlterTableAddColumnsStatement(
tableName: Seq[String],
columnsToAdd: Seq[QualifiedColType]) extends ParsedStatement
/**
* ALTER TABLE ... CHANGE COLUMN command, as parsed from SQL.
*/
case class AlterTableAlterColumnStatement(
tableName: Seq[String],
column: Seq[String],
dataType: Option[DataType],
nullable: Option[Boolean],
comment: Option[String],
position: Option[ColumnPosition]) extends ParsedStatement
/**
* ALTER TABLE ... RENAME COLUMN command, as parsed from SQL.
*/
case class AlterTableRenameColumnStatement(
tableName: Seq[String],
column: Seq[String],
newName: String) extends ParsedStatement
/**
* ALTER TABLE ... DROP COLUMNS command, as parsed from SQL.
*/
case class AlterTableDropColumnsStatement(
tableName: Seq[String],
columnsToDrop: Seq[Seq[String]]) extends ParsedStatement
/**
* ALTER TABLE ... SET TBLPROPERTIES command, as parsed from SQL.
*/
case class AlterTableSetPropertiesStatement(
tableName: Seq[String],
properties: Map[String, String]) extends ParsedStatement
/**
* ALTER TABLE ... UNSET TBLPROPERTIES command, as parsed from SQL.
*/
case class AlterTableUnsetPropertiesStatement(
tableName: Seq[String],
propertyKeys: Seq[String],
ifExists: Boolean) extends ParsedStatement
/**
* ALTER TABLE ... SET LOCATION command, as parsed from SQL.
*/
case class AlterTableSetLocationStatement(
tableName: Seq[String],
partitionSpec: Option[TablePartitionSpec],
location: String) extends ParsedStatement
/**
* ALTER TABLE ... RECOVER PARTITIONS command, as parsed from SQL.
*/

View file

@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, Unevaluable}
import org.apache.spark.sql.catalyst.plans.DescribeTableSchema
import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition
import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, ColumnChange}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.{DataType, MetadataBuilder, StringType, StructType}
@ -384,125 +384,37 @@ case class DropTable(
ifExists: Boolean) extends Command
/**
* The base class for ALTER TABLE commands that work for v2 tables.
* The logical plan of the ALTER TABLE command that works for v2 tables.
*/
abstract class AlterTable extends Command {
def table: LogicalPlan
case class AlterTable(
catalog: TableCatalog,
ident: Identifier,
table: NamedRelation,
changes: Seq[TableChange]) extends Command {
def changes: Seq[TableChange]
override lazy val resolved: Boolean = table.resolved && {
changes.forall {
case add: AddColumn =>
add.fieldNames match {
case Array(_) =>
// a top-level field can always be added
true
case _ =>
// the parent field must exist
table.schema.findNestedField(add.fieldNames.init, includeCollections = true).isDefined
}
override def children: Seq[LogicalPlan] = Seq(table)
case colChange: ColumnChange =>
// the column that will be changed must exist
table.schema.findNestedField(colChange.fieldNames, includeCollections = true).isDefined
override lazy val resolved: Boolean = table.resolved
}
/**
* The logical plan of the ALTER TABLE ... ADD COLUMNS command that works for v2 tables.
*/
case class AlterTableAddColumns(
table: LogicalPlan,
columnsToAdd: Seq[QualifiedColType]) extends AlterTable {
override lazy val changes: Seq[TableChange] = {
columnsToAdd.map { col =>
TableChange.addColumn(
col.name.toArray,
col.dataType,
col.nullable,
col.comment.orNull,
col.position.orNull)
case _ =>
// property changes require no resolution checks
true
}
}
}
/**
* The logical plan of the ALTER TABLE ... CHANGE COLUMN command that works for v2 tables.
*/
case class AlterTableAlterColumn(
table: LogicalPlan,
column: Seq[String],
dataType: Option[DataType],
nullable: Option[Boolean],
comment: Option[String],
position: Option[ColumnPosition]) extends AlterTable {
override lazy val changes: Seq[TableChange] = {
val colName = column.toArray
val typeChange = dataType.map { newDataType =>
TableChange.updateColumnType(colName, newDataType)
}
val nullabilityChange = nullable.map { nullable =>
TableChange.updateColumnNullability(colName, nullable)
}
val commentChange = comment.map { newComment =>
TableChange.updateColumnComment(colName, newComment)
}
val positionChange = position.map { newPosition =>
TableChange.updateColumnPosition(colName, newPosition)
}
typeChange.toSeq ++ nullabilityChange ++ commentChange ++ positionChange
}
}
/**
* The logical plan of the ALTER TABLE ... RENAME COLUMN command that works for v2 tables.
*/
case class AlterTableRenameColumn(
table: LogicalPlan,
column: Seq[String],
newName: String) extends AlterTable {
override lazy val changes: Seq[TableChange] = {
Seq(TableChange.renameColumn(column.toArray, newName))
}
}
/**
* The logical plan of the ALTER TABLE ... DROP COLUMNS command that works for v2 tables.
*/
case class AlterTableDropColumns(
table: LogicalPlan,
columnsToDrop: Seq[Seq[String]]) extends AlterTable {
override lazy val changes: Seq[TableChange] = {
columnsToDrop.map(col => TableChange.deleteColumn(col.toArray))
}
}
/**
* The logical plan of the ALTER TABLE ... SET TBLPROPERTIES command that works for v2 tables.
*/
case class AlterTableSetProperties(
table: LogicalPlan,
properties: Map[String, String]) extends AlterTable {
override lazy val changes: Seq[TableChange] = {
properties.map { case (key, value) =>
TableChange.setProperty(key, value)
}.toSeq
}
}
/**
* The logical plan of the ALTER TABLE ... UNSET TBLPROPERTIES command that works for v2 tables.
*/
// TODO: v2 `UNSET TBLPROPERTIES` should respect the ifExists flag.
case class AlterTableUnsetProperties(
table: LogicalPlan,
propertyKeys: Seq[String],
ifExists: Boolean) extends AlterTable {
override lazy val changes: Seq[TableChange] = {
propertyKeys.map(key => TableChange.removeProperty(key))
}
}
/**
* The logical plan of the ALTER TABLE ... SET LOCATION command that works for v2 tables.
*/
case class AlterTableSetLocation(
table: LogicalPlan,
partitionSpec: Option[TablePartitionSpec],
location: String) extends AlterTable {
override lazy val changes: Seq[TableChange] = {
Seq(TableChange.setProperty(TableCatalog.PROP_LOCATION, location))
}
}
/**
* The logical plan of the ALTER TABLE RENAME command that works for v2 tables.
*/

View file

@ -22,7 +22,8 @@ import java.util.Collections
import scala.collection.JavaConverters._
import org.apache.spark.sql.catalyst.analysis.{NamedRelation, NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException}
import org.apache.spark.sql.catalyst.analysis.{NamedRelation, NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException, UnresolvedV2Relation}
import org.apache.spark.sql.catalyst.plans.logical.AlterTable
import org.apache.spark.sql.connector.catalog.TableChange._
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}
@ -280,6 +281,17 @@ private[sql] object CatalogV2Util {
properties ++ Map(TableCatalog.PROP_OWNER -> Utils.getCurrentUserName())
}
def createAlterTable(
originalNameParts: Seq[String],
catalog: CatalogPlugin,
tableName: Seq[String],
changes: Seq[TableChange]): AlterTable = {
val tableCatalog = catalog.asTableCatalog
val ident = tableName.asIdentifier
val unresolved = UnresolvedV2Relation(originalNameParts, tableCatalog, ident)
AlterTable(tableCatalog, ident, unresolved, changes)
}
def getTableProviderCatalog(
provider: SupportsCatalogOptions,
catalogManager: CatalogManager,

View file

@ -476,22 +476,22 @@ class DDLParserSuite extends AnalysisTest {
comparePlans(
parsePlan(sql1_table),
AlterTableSetProperties(
UnresolvedTable(Seq("table_name")), Map("test" -> "test", "comment" -> "new_comment")))
AlterTableSetPropertiesStatement(
Seq("table_name"), Map("test" -> "test", "comment" -> "new_comment")))
comparePlans(
parsePlan(sql2_table),
AlterTableUnsetProperties(
UnresolvedTable(Seq("table_name")), Seq("comment", "test"), ifExists = false))
AlterTableUnsetPropertiesStatement(
Seq("table_name"), Seq("comment", "test"), ifExists = false))
comparePlans(
parsePlan(sql3_table),
AlterTableUnsetProperties(
UnresolvedTable(Seq("table_name")), Seq("comment", "test"), ifExists = true))
AlterTableUnsetPropertiesStatement(
Seq("table_name"), Seq("comment", "test"), ifExists = true))
}
test("alter table: add column") {
comparePlans(
parsePlan("ALTER TABLE table_name ADD COLUMN x int"),
AlterTableAddColumns(UnresolvedTable(Seq("table_name")), Seq(
AlterTableAddColumnsStatement(Seq("table_name"), Seq(
QualifiedColType(Seq("x"), IntegerType, true, None, None)
)))
}
@ -499,7 +499,7 @@ class DDLParserSuite extends AnalysisTest {
test("alter table: add multiple columns") {
comparePlans(
parsePlan("ALTER TABLE table_name ADD COLUMNS x int, y string"),
AlterTableAddColumns(UnresolvedTable(Seq("table_name")), Seq(
AlterTableAddColumnsStatement(Seq("table_name"), Seq(
QualifiedColType(Seq("x"), IntegerType, true, None, None),
QualifiedColType(Seq("y"), StringType, true, None, None)
)))
@ -508,7 +508,7 @@ class DDLParserSuite extends AnalysisTest {
test("alter table: add column with COLUMNS") {
comparePlans(
parsePlan("ALTER TABLE table_name ADD COLUMNS x int"),
AlterTableAddColumns(UnresolvedTable(Seq("table_name")), Seq(
AlterTableAddColumnsStatement(Seq("table_name"), Seq(
QualifiedColType(Seq("x"), IntegerType, true, None, None)
)))
}
@ -516,7 +516,7 @@ class DDLParserSuite extends AnalysisTest {
test("alter table: add column with COLUMNS (...)") {
comparePlans(
parsePlan("ALTER TABLE table_name ADD COLUMNS (x int)"),
AlterTableAddColumns(UnresolvedTable(Seq("table_name")), Seq(
AlterTableAddColumnsStatement(Seq("table_name"), Seq(
QualifiedColType(Seq("x"), IntegerType, true, None, None)
)))
}
@ -524,7 +524,7 @@ class DDLParserSuite extends AnalysisTest {
test("alter table: add column with COLUMNS (...) and COMMENT") {
comparePlans(
parsePlan("ALTER TABLE table_name ADD COLUMNS (x int COMMENT 'doc')"),
AlterTableAddColumns(UnresolvedTable(Seq("table_name")), Seq(
AlterTableAddColumnsStatement(Seq("table_name"), Seq(
QualifiedColType(Seq("x"), IntegerType, true, Some("doc"), None)
)))
}
@ -532,7 +532,7 @@ class DDLParserSuite extends AnalysisTest {
test("alter table: add non-nullable column") {
comparePlans(
parsePlan("ALTER TABLE table_name ADD COLUMN x int NOT NULL"),
AlterTableAddColumns(UnresolvedTable(Seq("table_name")), Seq(
AlterTableAddColumnsStatement(Seq("table_name"), Seq(
QualifiedColType(Seq("x"), IntegerType, false, None, None)
)))
}
@ -540,7 +540,7 @@ class DDLParserSuite extends AnalysisTest {
test("alter table: add column with COMMENT") {
comparePlans(
parsePlan("ALTER TABLE table_name ADD COLUMN x int COMMENT 'doc'"),
AlterTableAddColumns(UnresolvedTable(Seq("table_name")), Seq(
AlterTableAddColumnsStatement(Seq("table_name"), Seq(
QualifiedColType(Seq("x"), IntegerType, true, Some("doc"), None)
)))
}
@ -548,13 +548,13 @@ class DDLParserSuite extends AnalysisTest {
test("alter table: add column with position") {
comparePlans(
parsePlan("ALTER TABLE table_name ADD COLUMN x int FIRST"),
AlterTableAddColumns(UnresolvedTable(Seq("table_name")), Seq(
AlterTableAddColumnsStatement(Seq("table_name"), Seq(
QualifiedColType(Seq("x"), IntegerType, true, None, Some(first()))
)))
comparePlans(
parsePlan("ALTER TABLE table_name ADD COLUMN x int AFTER y"),
AlterTableAddColumns(UnresolvedTable(Seq("table_name")), Seq(
AlterTableAddColumnsStatement(Seq("table_name"), Seq(
QualifiedColType(Seq("x"), IntegerType, true, None, Some(after("y")))
)))
}
@ -562,7 +562,7 @@ class DDLParserSuite extends AnalysisTest {
test("alter table: add column with nested column name") {
comparePlans(
parsePlan("ALTER TABLE table_name ADD COLUMN x.y.z int COMMENT 'doc'"),
AlterTableAddColumns(UnresolvedTable(Seq("table_name")), Seq(
AlterTableAddColumnsStatement(Seq("table_name"), Seq(
QualifiedColType(Seq("x", "y", "z"), IntegerType, true, Some("doc"), None)
)))
}
@ -570,7 +570,7 @@ class DDLParserSuite extends AnalysisTest {
test("alter table: add multiple columns with nested column name") {
comparePlans(
parsePlan("ALTER TABLE table_name ADD COLUMN x.y.z int COMMENT 'doc', a.b string FIRST"),
AlterTableAddColumns(UnresolvedTable(Seq("table_name")), Seq(
AlterTableAddColumnsStatement(Seq("table_name"), Seq(
QualifiedColType(Seq("x", "y", "z"), IntegerType, true, Some("doc"), None),
QualifiedColType(Seq("a", "b"), StringType, true, None, Some(first()))
)))
@ -579,12 +579,12 @@ class DDLParserSuite extends AnalysisTest {
test("alter table: set location") {
comparePlans(
parsePlan("ALTER TABLE a.b.c SET LOCATION 'new location'"),
AlterTableSetLocation(UnresolvedTable(Seq("a", "b", "c")), None, "new location"))
AlterTableSetLocationStatement(Seq("a", "b", "c"), None, "new location"))
comparePlans(
parsePlan("ALTER TABLE a.b.c PARTITION(ds='2017-06-10') SET LOCATION 'new location'"),
AlterTableSetLocation(
UnresolvedTable(Seq("a", "b", "c")),
AlterTableSetLocationStatement(
Seq("a", "b", "c"),
Some(Map("ds" -> "2017-06-10")),
"new location"))
}
@ -592,8 +592,8 @@ class DDLParserSuite extends AnalysisTest {
test("alter table: rename column") {
comparePlans(
parsePlan("ALTER TABLE table_name RENAME COLUMN a.b.c TO d"),
AlterTableRenameColumn(
UnresolvedTable(Seq("table_name")),
AlterTableRenameColumnStatement(
Seq("table_name"),
Seq("a", "b", "c"),
"d"))
}
@ -601,8 +601,8 @@ class DDLParserSuite extends AnalysisTest {
test("alter table: update column type using ALTER") {
comparePlans(
parsePlan("ALTER TABLE table_name ALTER COLUMN a.b.c TYPE bigint"),
AlterTableAlterColumn(
UnresolvedTable(Seq("table_name")),
AlterTableAlterColumnStatement(
Seq("table_name"),
Seq("a", "b", "c"),
Some(LongType),
None,
@ -613,8 +613,8 @@ class DDLParserSuite extends AnalysisTest {
test("alter table: update column type") {
comparePlans(
parsePlan("ALTER TABLE table_name CHANGE COLUMN a.b.c TYPE bigint"),
AlterTableAlterColumn(
UnresolvedTable(Seq("table_name")),
AlterTableAlterColumnStatement(
Seq("table_name"),
Seq("a", "b", "c"),
Some(LongType),
None,
@ -625,8 +625,8 @@ class DDLParserSuite extends AnalysisTest {
test("alter table: update column comment") {
comparePlans(
parsePlan("ALTER TABLE table_name CHANGE COLUMN a.b.c COMMENT 'new comment'"),
AlterTableAlterColumn(
UnresolvedTable(Seq("table_name")),
AlterTableAlterColumnStatement(
Seq("table_name"),
Seq("a", "b", "c"),
None,
None,
@ -637,8 +637,8 @@ class DDLParserSuite extends AnalysisTest {
test("alter table: update column position") {
comparePlans(
parsePlan("ALTER TABLE table_name CHANGE COLUMN a.b.c FIRST"),
AlterTableAlterColumn(
UnresolvedTable(Seq("table_name")),
AlterTableAlterColumnStatement(
Seq("table_name"),
Seq("a", "b", "c"),
None,
None,
@ -650,8 +650,8 @@ class DDLParserSuite extends AnalysisTest {
comparePlans(
parsePlan("ALTER TABLE table_name CHANGE COLUMN a.b.c " +
"TYPE bigint COMMENT 'new comment' AFTER d"),
AlterTableAlterColumn(
UnresolvedTable(Seq("table_name")),
AlterTableAlterColumnStatement(
Seq("table_name"),
Seq("a", "b", "c"),
Some(LongType),
None,
@ -662,8 +662,8 @@ class DDLParserSuite extends AnalysisTest {
test("alter table: SET/DROP NOT NULL") {
comparePlans(
parsePlan("ALTER TABLE table_name ALTER COLUMN a.b.c SET NOT NULL"),
AlterTableAlterColumn(
UnresolvedTable(Seq("table_name")),
AlterTableAlterColumnStatement(
Seq("table_name"),
Seq("a", "b", "c"),
None,
Some(false),
@ -672,8 +672,8 @@ class DDLParserSuite extends AnalysisTest {
comparePlans(
parsePlan("ALTER TABLE table_name ALTER COLUMN a.b.c DROP NOT NULL"),
AlterTableAlterColumn(
UnresolvedTable(Seq("table_name")),
AlterTableAlterColumnStatement(
Seq("table_name"),
Seq("a", "b", "c"),
None,
Some(true),
@ -684,7 +684,7 @@ class DDLParserSuite extends AnalysisTest {
test("alter table: drop column") {
comparePlans(
parsePlan("ALTER TABLE table_name DROP COLUMN a.b.c"),
AlterTableDropColumns(UnresolvedTable(Seq("table_name")), Seq(Seq("a", "b", "c"))))
AlterTableDropColumnsStatement(Seq("table_name"), Seq(Seq("a", "b", "c"))))
}
test("alter table: drop multiple columns") {
@ -692,8 +692,8 @@ class DDLParserSuite extends AnalysisTest {
Seq(sql, sql.replace("COLUMN", "COLUMNS")).foreach { drop =>
comparePlans(
parsePlan(drop),
AlterTableDropColumns(
UnresolvedTable(Seq("table_name")),
AlterTableDropColumnsStatement(
Seq("table_name"),
Seq(Seq("x"), Seq("y"), Seq("a", "b", "c"))))
}
}
@ -705,8 +705,8 @@ class DDLParserSuite extends AnalysisTest {
comparePlans(
parsePlan(sql1),
AlterTableAlterColumn(
UnresolvedTable(Seq("table_name")),
AlterTableAlterColumnStatement(
Seq("table_name"),
Seq("a", "b", "c"),
Some(IntegerType),
None,
@ -715,8 +715,8 @@ class DDLParserSuite extends AnalysisTest {
comparePlans(
parsePlan(sql2),
AlterTableAlterColumn(
UnresolvedTable(Seq("table_name")),
AlterTableAlterColumnStatement(
Seq("table_name"),
Seq("a", "b", "c"),
Some(IntegerType),
None,
@ -725,8 +725,8 @@ class DDLParserSuite extends AnalysisTest {
comparePlans(
parsePlan(sql3),
AlterTableAlterColumn(
UnresolvedTable(Seq("table_name")),
AlterTableAlterColumnStatement(
Seq("table_name"),
Seq("a", "b", "c"),
Some(IntegerType),
None,

View file

@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType, CatalogUtils}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, Identifier, LookupCatalog, SupportsNamespaces, V1Table}
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, Identifier, LookupCatalog, SupportsNamespaces, TableCatalog, TableChange, V1Table}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, RefreshTable}
@ -47,63 +47,141 @@ class ResolveSessionCatalog(
import org.apache.spark.sql.connector.catalog.CatalogV2Util._
override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
case AlterTableAddColumns(ResolvedTable(_, ident, _: V1Table), cols) =>
cols.foreach { c =>
assertTopLevelColumn(c.name, "AlterTableAddColumnsCommand")
if (!c.nullable) {
throw new AnalysisException(
"ADD COLUMN with v1 tables cannot specify NOT NULL.")
case AlterTableAddColumnsStatement(
nameParts @ SessionCatalogAndTable(catalog, tbl), cols) =>
loadTable(catalog, tbl.asIdentifier).collect {
case v1Table: V1Table =>
cols.foreach { c =>
assertTopLevelColumn(c.name, "AlterTableAddColumnsCommand")
if (!c.nullable) {
throw new AnalysisException(
"ADD COLUMN with v1 tables cannot specify NOT NULL.")
}
}
AlterTableAddColumnsCommand(tbl.asTableIdentifier, cols.map(convertToStructField))
}.getOrElse {
val changes = cols.map { col =>
TableChange.addColumn(
col.name.toArray,
col.dataType,
col.nullable,
col.comment.orNull,
col.position.orNull)
}
createAlterTable(nameParts, catalog, tbl, changes)
}
AlterTableAddColumnsCommand(ident.asTableIdentifier, cols.map(convertToStructField))
case a @ AlterTableAlterColumn(ResolvedTable(_, ident, _: V1Table), _, _, _, _, _) =>
if (a.column.length > 1) {
throw new AnalysisException(
"ALTER COLUMN with qualified column is only supported with v2 tables.")
case a @ AlterTableAlterColumnStatement(
nameParts @ SessionCatalogAndTable(catalog, tbl), _, _, _, _, _) =>
loadTable(catalog, tbl.asIdentifier).collect {
case v1Table: V1Table =>
if (a.column.length > 1) {
throw new AnalysisException(
"ALTER COLUMN with qualified column is only supported with v2 tables.")
}
if (a.dataType.isEmpty) {
throw new AnalysisException(
"ALTER COLUMN with v1 tables must specify new data type.")
}
if (a.nullable.isDefined) {
throw new AnalysisException(
"ALTER COLUMN with v1 tables cannot specify NOT NULL.")
}
if (a.position.isDefined) {
throw new AnalysisException("" +
"ALTER COLUMN ... FIRST | ALTER is only supported with v2 tables.")
}
val builder = new MetadataBuilder
// Add comment to metadata
a.comment.map(c => builder.putString("comment", c))
// Add Hive type string to metadata.
val cleanedDataType = HiveStringType.replaceCharType(a.dataType.get)
if (a.dataType.get != cleanedDataType) {
builder.putString(HIVE_TYPE_STRING, a.dataType.get.catalogString)
}
val newColumn = StructField(
a.column(0),
cleanedDataType,
nullable = true,
builder.build())
AlterTableChangeColumnCommand(tbl.asTableIdentifier, a.column(0), newColumn)
}.getOrElse {
val colName = a.column.toArray
val typeChange = a.dataType.map { newDataType =>
TableChange.updateColumnType(colName, newDataType)
}
val nullabilityChange = a.nullable.map { nullable =>
TableChange.updateColumnNullability(colName, nullable)
}
val commentChange = a.comment.map { newComment =>
TableChange.updateColumnComment(colName, newComment)
}
val positionChange = a.position.map { newPosition =>
TableChange.updateColumnPosition(colName, newPosition)
}
createAlterTable(
nameParts,
catalog,
tbl,
typeChange.toSeq ++ nullabilityChange ++ commentChange ++ positionChange)
}
if (a.dataType.isEmpty) {
throw new AnalysisException(
"ALTER COLUMN with v1 tables must specify new data type.")
case AlterTableRenameColumnStatement(
nameParts @ SessionCatalogAndTable(catalog, tbl), col, newName) =>
loadTable(catalog, tbl.asIdentifier).collect {
case v1Table: V1Table =>
throw new AnalysisException("RENAME COLUMN is only supported with v2 tables.")
}.getOrElse {
val changes = Seq(TableChange.renameColumn(col.toArray, newName))
createAlterTable(nameParts, catalog, tbl, changes)
}
if (a.nullable.isDefined) {
throw new AnalysisException(
"ALTER COLUMN with v1 tables cannot specify NOT NULL.")
case AlterTableDropColumnsStatement(
nameParts @ SessionCatalogAndTable(catalog, tbl), cols) =>
loadTable(catalog, tbl.asIdentifier).collect {
case v1Table: V1Table =>
throw new AnalysisException("DROP COLUMN is only supported with v2 tables.")
}.getOrElse {
val changes = cols.map(col => TableChange.deleteColumn(col.toArray))
createAlterTable(nameParts, catalog, tbl, changes)
}
if (a.position.isDefined) {
throw new AnalysisException("" +
"ALTER COLUMN ... FIRST | ALTER is only supported with v2 tables.")
case AlterTableSetPropertiesStatement(
nameParts @ SessionCatalogAndTable(catalog, tbl), props) =>
loadTable(catalog, tbl.asIdentifier).collect {
case v1Table: V1Table =>
AlterTableSetPropertiesCommand(tbl.asTableIdentifier, props, isView = false)
}.getOrElse {
val changes = props.map { case (key, value) =>
TableChange.setProperty(key, value)
}.toSeq
createAlterTable(nameParts, catalog, tbl, changes)
}
val builder = new MetadataBuilder
// Add comment to metadata
a.comment.map(c => builder.putString("comment", c))
// Add Hive type string to metadata.
val cleanedDataType = HiveStringType.replaceCharType(a.dataType.get)
if (a.dataType.get != cleanedDataType) {
builder.putString(HIVE_TYPE_STRING, a.dataType.get.catalogString)
case AlterTableUnsetPropertiesStatement(
nameParts @ SessionCatalogAndTable(catalog, tbl), keys, ifExists) =>
loadTable(catalog, tbl.asIdentifier).collect {
case v1Table: V1Table =>
AlterTableUnsetPropertiesCommand(
tbl.asTableIdentifier, keys, ifExists, isView = false)
}.getOrElse {
val changes = keys.map(key => TableChange.removeProperty(key))
createAlterTable(nameParts, catalog, tbl, changes)
}
val newColumn = StructField(
a.column(0),
cleanedDataType,
nullable = true,
builder.build())
AlterTableChangeColumnCommand(ident.asTableIdentifier, a.column(0), newColumn)
case AlterTableRenameColumn(ResolvedTable(_, _, _: V1Table), _, _) =>
throw new AnalysisException("RENAME COLUMN is only supported with v2 tables.")
case AlterTableDropColumns(ResolvedTable(_, _, _: V1Table), _) =>
throw new AnalysisException("DROP COLUMN is only supported with v2 tables.")
case AlterTableSetProperties(ResolvedTable(_, ident, _: V1Table), props) =>
AlterTableSetPropertiesCommand(ident.asTableIdentifier, props, isView = false)
case AlterTableUnsetProperties(ResolvedTable(_, ident, _: V1Table), keys, ifExists) =>
AlterTableUnsetPropertiesCommand(ident.asTableIdentifier, keys, ifExists, isView = false)
case AlterTableSetLocation(
ResolvedTable(_, ident, _: V1Table), partitionSpec, newLoc) =>
AlterTableSetLocationCommand(ident.asTableIdentifier, partitionSpec, newLoc)
case AlterTableSetLocationStatement(
nameParts @ SessionCatalogAndTable(catalog, tbl), partitionSpec, newLoc) =>
loadTable(catalog, tbl.asIdentifier).collect {
case v1Table: V1Table =>
AlterTableSetLocationCommand(tbl.asTableIdentifier, partitionSpec, newLoc)
}.getOrElse {
if (partitionSpec.nonEmpty) {
throw new AnalysisException(
"ALTER TABLE SET LOCATION does not support partition for v2 tables.")
}
val changes = Seq(TableChange.setProperty(TableCatalog.PROP_LOCATION, newLoc))
createAlterTable(nameParts, catalog, tbl, changes)
}
// ALTER VIEW should always use v1 command if the resolved catalog is session catalog.
case AlterViewSetPropertiesStatement(SessionCatalogAndTable(_, tbl), props) =>
@ -140,7 +218,7 @@ class ResolveSessionCatalog(
DescribeTableCommand(ident.asTableIdentifier, partitionSpec, isExtended)
// Use v1 command to describe (temp) view, as v2 catalog doesn't support view yet.
case DescribeRelation(ResolvedView(ident, _), partitionSpec, isExtended) =>
case DescribeRelation(ResolvedView(ident), partitionSpec, isExtended) =>
DescribeTableCommand(ident.asTableIdentifier, partitionSpec, isExtended)
case DescribeColumnStatement(

View file

@ -257,6 +257,14 @@ case class AlterTableAddColumnsCommand(
table: TableIdentifier): CatalogTable = {
val catalogTable = catalog.getTempViewOrPermanentTableMetadata(table)
if (catalogTable.tableType == CatalogTableType.VIEW) {
throw new AnalysisException(
s"""
|ALTER ADD COLUMNS does not support views.
|You must drop and re-create the views for adding the new columns. Views: $table
""".stripMargin)
}
if (DDLUtils.isDatasourceTable(catalogTable)) {
DataSource.lookupDataSource(catalogTable.provider.get, conf).
getConstructor().newInstance() match {

View file

@ -241,18 +241,8 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
case DropTable(catalog, ident, ifExists) =>
DropTableExec(catalog, ident, ifExists) :: Nil
case a @ AlterTableSetLocation(r: ResolvedTable, partitionSpec, _) =>
if (partitionSpec.nonEmpty) {
throw new AnalysisException(
"ALTER TABLE SET LOCATION does not support partition for v2 tables.")
}
AlterTableExec(r.catalog, r.identifier, a.changes) :: Nil
case a: AlterTable =>
a.table match {
case r: ResolvedTable => AlterTableExec(r.catalog, r.identifier, a.changes) :: Nil
case _ => Nil
}
case AlterTable(catalog, ident, _, changes) =>
AlterTableExec(catalog, ident, changes) :: Nil
case RenameTable(catalog, oldIdent, newIdent) =>
RenameTableExec(catalog, oldIdent, newIdent) :: Nil

View file

@ -195,7 +195,7 @@ ALTER TABLE temp_view CHANGE a TYPE INT COMMENT 'this is column a'
struct<>
-- !query 20 output
org.apache.spark.sql.AnalysisException
temp_view is a temp view not a table.; line 1 pos 0
Invalid command: 'temp_view' is a view not a table.; line 1 pos 0
-- !query 21
@ -212,7 +212,7 @@ ALTER TABLE global_temp.global_temp_view CHANGE a TYPE INT COMMENT 'this is colu
struct<>
-- !query 22 output
org.apache.spark.sql.AnalysisException
global_temp.global_temp_view is a temp view not a table.; line 1 pos 0
Invalid command: 'global_temp.global_temp_view' is a view not a table.; line 1 pos 0
-- !query 23

View file

@ -2201,7 +2201,7 @@ class DataSourceV2SQLSuite
withTempView("v") {
sql("create global temp view v as select 1")
val e = intercept[AnalysisException](sql("COMMENT ON TABLE global_temp.v IS NULL"))
assert(e.getMessage.contains("global_temp.v is a temp view not a table."))
assert(e.getMessage.contains("global_temp.v is a temp view not table."))
}
}

View file

@ -145,13 +145,13 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils {
// For v2 ALTER TABLE statements, we have better error message saying view is not supported.
assertAnalysisError(
s"ALTER TABLE $viewName SET LOCATION '/path/to/your/lovely/heart'",
s"$viewName is a temp view not a table")
s"'$viewName' is a view not a table")
// For the following v2 ALERT TABLE statements, relations are first resolved before
// unsupported operations are checked.
// For the following v2 ALERT TABLE statements, unsupported operations are checked first
// before resolving the relations.
assertAnalysisError(
s"ALTER TABLE $viewName PARTITION (a='4') SET LOCATION '/path/to/home'",
s"$viewName is a temp view not a table")
"ALTER TABLE SET LOCATION does not support partition for v2 tables")
}
}

View file

@ -2779,7 +2779,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
val e = intercept[AnalysisException] {
sql("ALTER TABLE tmp_v ADD COLUMNS (c3 INT)")
}
assert(e.message.contains("tmp_v is a temp view not a table"))
assert(e.message.contains("'tmp_v' is a view not a table"))
}
}
@ -2789,8 +2789,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
val e = intercept[AnalysisException] {
sql("ALTER TABLE v1 ADD COLUMNS (c3 INT)")
}
assert(e.message.contains(
"Cannot alter a view with ALTER TABLE. Please use ALTER VIEW instead"))
assert(e.message.contains("ALTER ADD COLUMNS does not support views"))
}
}

View file

@ -26,7 +26,7 @@ import org.mockito.invocation.InvocationOnMock
import org.apache.spark.sql.{AnalysisException, SaveMode}
import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, Analyzer, CTESubstitution, EmptyFunctionRegistry, NoSuchTableException, ResolveCatalogs, ResolvedTable, ResolveSessionCatalog, UnresolvedAttribute, UnresolvedRelation, UnresolvedStar, UnresolvedSubqueryColumnAliases, UnresolvedV2Relation}
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType, InMemoryCatalog, SessionCatalog}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Expression, InSubquery, IntegerLiteral, ListQuery, StringLiteral}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
@ -140,7 +140,6 @@ class PlanResolutionSuite extends AnalysisTest {
val rules = Seq(
CTESubstitution,
analyzer.ResolveRelations,
analyzer.ResolveTables,
new ResolveCatalogs(catalogManager),
new ResolveSessionCatalog(catalogManager, conf, _ == Seq("v")),
analyzer.ResolveTables,
@ -724,24 +723,24 @@ class PlanResolutionSuite extends AnalysisTest {
comparePlans(parsed3, expected3)
} else {
parsed1 match {
case a: AlterTable =>
assert(a.changes == Seq(
case AlterTable(_, _, _: DataSourceV2Relation, changes) =>
assert(changes == Seq(
TableChange.setProperty("test", "test"),
TableChange.setProperty("comment", "new_comment")))
case _ => fail("expect AlterTable")
}
parsed2 match {
case a: AlterTable =>
assert(a.changes == Seq(
case AlterTable(_, _, _: DataSourceV2Relation, changes) =>
assert(changes == Seq(
TableChange.removeProperty("comment"),
TableChange.removeProperty("test")))
case _ => fail("expect AlterTable")
}
parsed3 match {
case a: AlterTable =>
assert(a.changes == Seq(
case AlterTable(_, _, _: DataSourceV2Relation, changes) =>
assert(changes == Seq(
TableChange.removeProperty("comment"),
TableChange.removeProperty("test")))
case _ => fail("expect AlterTable")
@ -754,9 +753,15 @@ class PlanResolutionSuite extends AnalysisTest {
val parsed4 = parseAndResolve(sql4)
val parsed5 = parseAndResolve(sql5)
// For non-existing tables, we expect `UnresolvedTable` in the resolved plan.
assert(parsed4.collect{ case u: UnresolvedTable => u }.length == 1)
assert(parsed5.collect{ case u: UnresolvedTable => u }.length == 1)
// For non-existing tables, we convert it to v2 command with `UnresolvedV2Table`
parsed4 match {
case AlterTable(_, _, _: UnresolvedV2Relation, _) => // OK
case _ => fail("Expect AlterTable, but got:\n" + parsed4.treeString)
}
parsed5 match {
case AlterTable(_, _, _: UnresolvedV2Relation, _) => // OK
case _ => fail("Expect AlterTable, but got:\n" + parsed5.treeString)
}
}
test("support for other types in TBLPROPERTIES") {
@ -777,8 +782,8 @@ class PlanResolutionSuite extends AnalysisTest {
comparePlans(parsed, expected)
} else {
parsed match {
case a: AlterTable =>
assert(a.changes == Seq(
case AlterTable(_, _, _: DataSourceV2Relation, changes) =>
assert(changes == Seq(
TableChange.setProperty("a", "1"),
TableChange.setProperty("b", "0.1"),
TableChange.setProperty("c", "true")))
@ -801,8 +806,8 @@ class PlanResolutionSuite extends AnalysisTest {
comparePlans(parsed, expected)
} else {
parsed match {
case a: AlterTable =>
assert(a.changes == Seq(TableChange.setProperty("location", "new location")))
case AlterTable(_, _, _: DataSourceV2Relation, changes) =>
assert(changes == Seq(TableChange.setProperty("location", "new location")))
case _ => fail("Expect AlterTable, but got:\n" + parsed.treeString)
}
}
@ -1043,23 +1048,23 @@ class PlanResolutionSuite extends AnalysisTest {
val parsed3 = parseAndResolve(sql3)
parsed1 match {
case a: AlterTable =>
assert(a.changes == Seq(
case AlterTable(_, _, _: DataSourceV2Relation, changes) =>
assert(changes == Seq(
TableChange.updateColumnType(Array("i"), LongType)))
case _ => fail("expect AlterTable")
}
parsed2 match {
case a: AlterTable =>
assert(a.changes == Seq(
case AlterTable(_, _, _: DataSourceV2Relation, changes) =>
assert(changes == Seq(
TableChange.updateColumnType(Array("i"), LongType),
TableChange.updateColumnComment(Array("i"), "new comment")))
case _ => fail("expect AlterTable")
}
parsed3 match {
case a: AlterTable =>
assert(a.changes == Seq(
case AlterTable(_, _, _: DataSourceV2Relation, changes) =>
assert(changes == Seq(
TableChange.updateColumnComment(Array("i"), "new comment")))
case _ => fail("expect AlterTable")
}

View file

@ -158,7 +158,7 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
val message = intercept[AnalysisException] {
sql("SHOW TBLPROPERTIES parquet_temp")
}.getMessage
assert(message.contains("parquet_temp is a temp view not a table"))
assert(message.contains("parquet_temp is a temp view not table"))
}
}