[SPARK-34320][SQL] Migrate ALTER TABLE DROP COLUMNS commands to use UnresolvedTable to resolve the identifier

### What changes were proposed in this pull request?

This PR proposes to migrate the `ALTER TABLE ... DROP COLUMNS` command to use `UnresolvedTable` as a `child` to resolve the table identifier. This allows consistent resolution rules (temp view first, etc.) to be applied to both v1 and v2 commands. More info about the consistent resolution rules can be found in [JIRA](https://issues.apache.org/jira/browse/SPARK-29900) or the [proposal doc](https://docs.google.com/document/d/1hvLjGA8y_W_hhilpngXVub1Ebv8RsMap986nENCFnrg/edit?usp=sharing).
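For illustration, a minimal sketch of the behavior this enables, assuming a local `SparkSession`; the view name is hypothetical and the exact error message may differ:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").getOrCreate()

// A temp view named `t` shadows any catalog table `t`. With the consistent
// rules, `t` resolves to the temp view first; since ALTER TABLE ... DROP
// COLUMNS expects a table, analysis fails with a clear error instead of the
// lookup behavior varying by command.
spark.sql("CREATE TEMPORARY VIEW t AS SELECT 1 AS col1")
spark.sql("ALTER TABLE t DROP COLUMN col1")
// => AnalysisException: t is a temp view. 'ALTER TABLE ... DROP COLUMNS' expects a table.
```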

### Why are the changes needed?

This is part of the effort to make relation lookup behavior consistent: [SPARK-29900](https://issues.apache.org/jira/browse/SPARK-29900).

### Does this PR introduce _any_ user-facing change?

After this PR, the `ALTER TABLE ... DROP COLUMNS` command has resolution behavior consistent with other commands (e.g., temp views are looked up first), as sketched below.
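For v1 (session catalog) tables the command remains unsupported; the rejection now happens on the resolved plan (see the `ResolveSessionCatalog` change below). A sketch, with a hypothetical table name and the error message paraphrased:

```scala
spark.sql("CREATE TABLE v1_tbl (a INT, b INT) USING parquet")
spark.sql("ALTER TABLE v1_tbl DROP COLUMN b")
// => AnalysisException: DROP COLUMN is only supported with v2 tables.
```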

### How was this patch tested?

Updated existing tests.

Closes #32854 from imback82/alter_alternative.

Authored-by: Terry Kim <yuminkim@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
commit 5b4816cfc8 (parent de35675c61)
Terry Kim, 2021-06-24 14:59:25 +00:00, committed by Wenchen Fan
11 changed files with 101 additions and 33 deletions


@@ -299,6 +299,7 @@ class Analyzer(override val catalogManager: CatalogManager)
     Batch("Post-Hoc Resolution", Once,
       Seq(ResolveCommandsWithIfExists) ++
         postHocResolutionRules: _*),
+    Batch("Normalize Alter Table Field Names", Once, ResolveFieldNames),
     Batch("Normalize Alter Table", Once, ResolveAlterTableChanges),
     Batch("Remove Unresolved Hints", Once,
       new ResolveHints.RemoveAllHints),
@@ -3520,6 +3521,33 @@ class Analyzer(override val catalogManager: CatalogManager)
       }
     }
 
+  /**
+   * Rule to mostly resolve, normalize and rewrite column names based on case sensitivity
+   * for alter table commands.
+   */
+  object ResolveFieldNames extends Rule[LogicalPlan] {
+    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
+      case a: AlterTableCommand if a.table.resolved =>
+        a.transformExpressions {
+          case u: UnresolvedFieldName =>
+            val table = a.table.asInstanceOf[ResolvedTable]
+            resolveFieldNames(table.schema, u.name).map(ResolvedFieldName(_)).getOrElse(u)
+        }
+    }
+
+    /**
+     * Returns the resolved field name if the field can be resolved, returns None if the column is
+     * not found. An error will be thrown in CheckAnalysis for columns that can't be resolved.
+     */
+    private def resolveFieldNames(
+        schema: StructType,
+        fieldNames: Seq[String]): Option[Seq[String]] = {
+      val fieldOpt = schema.findNestedField(
+        fieldNames, includeCollections = true, conf.resolver)
+      fieldOpt.map { case (path, field) => path :+ field.name }
+    }
+  }
+
   /** Rule to mostly resolve, normalize and rewrite column names based on case sensitivity. */
   object ResolveAlterTableChanges extends Rule[LogicalPlan] {
     def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
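For context, a standalone sketch (not the Analyzer code) of the normalization that `findNestedField` performs for struct fields; the real method also descends into maps and arrays (`includeCollections = true`) and uses the session's configured resolver rather than hard-coded case-insensitivity:

```scala
import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("point", StructType(Seq(
    StructField("X", IntegerType),
    StructField("Y", IntegerType))))))

// Resolve a user-supplied, possibly mis-cased path against the schema and
// return the normalized path, e.g. Seq("point", "x") -> Seq("point", "X").
def normalize(schema: StructType, path: Seq[String]): Option[Seq[String]] =
  path match {
    case head +: tail =>
      schema.fields.find(_.name.equalsIgnoreCase(head)).flatMap { field =>
        (field.dataType, tail) match {
          case (_, Seq()) => Some(Seq(field.name))
          case (s: StructType, rest) => normalize(s, rest).map(field.name +: _)
          case _ => None
        }
      }
    case _ => None
  }

assert(normalize(schema, Seq("point", "x")) == Some(Seq("point", "X")))
assert(normalize(schema, Seq("missing")).isEmpty)
```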


@@ -443,6 +443,15 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
       case write: V2WriteCommand if write.resolved =>
         write.query.schema.foreach(f => TypeUtils.failWithIntervalType(f.dataType))
 
+      case alter: AlterTableCommand if alter.table.resolved =>
+        alter.transformExpressions {
+          case u: UnresolvedFieldName =>
+            val table = alter.table.asInstanceOf[ResolvedTable]
+            alter.failAnalysis(
+              s"Cannot ${alter.operation} missing field ${u.name.quoted} in ${table.name} " +
+                s"schema: ${table.schema.treeString}")
+        }
+
       case alter: AlterTable if alter.table.resolved =>
         val table = alter.table
         def findField(operation: String, fieldName: Array[String]): StructField = {
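The user-visible effect is an analysis-time error for columns that cannot be resolved; an illustrative check in the style of the `JDBCTableCatalogSuite` changes further down (catalog/table names hypothetical, test scaffolding assumed):

```scala
val msg = intercept[AnalysisException] {
  sql("ALTER TABLE testcat.ns.tbl DROP COLUMN no_such_col")
}.getMessage
assert(msg.contains("Cannot delete missing field no_such_col"))
```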


@@ -93,11 +93,6 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
       val changes = Seq(TableChange.renameColumn(col.toArray, newName))
       createAlterTable(nameParts, catalog, tbl, changes)
 
-    case AlterTableDropColumnsStatement(
-        nameParts @ NonSessionCatalogAndTable(catalog, tbl), cols) =>
-      val changes = cols.map(col => TableChange.deleteColumn(col.toArray))
-      createAlterTable(nameParts, catalog, tbl, changes)
-
     case c @ CreateTableStatement(
         NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _) =>
       assertNoNullTypeInSchema(c.tableSchema)


@@ -89,6 +89,18 @@ case class UnresolvedPartitionSpec(
   override lazy val resolved = false
 }
 
+sealed trait FieldName extends LeafExpression with Unevaluable {
+  def name: Seq[String]
+  override def dataType: DataType = throw new IllegalStateException(
+    "UnresolvedFieldName.dataType should not be called.")
+  override def nullable: Boolean = throw new IllegalStateException(
+    "UnresolvedFieldName.nullable should not be called.")
+}
+
+case class UnresolvedFieldName(name: Seq[String]) extends FieldName {
+  override lazy val resolved = false
+}
+
 /**
  * Holds the name of a function that has yet to be looked up in a catalog. It will be resolved to
  * [[ResolvedFunc]] during analysis.
@@ -138,6 +150,8 @@ case class ResolvedPartitionSpec(
     ident: InternalRow,
     location: Option[String] = None) extends PartitionSpec
 
+case class ResolvedFieldName(name: Seq[String]) extends FieldName
+
 /**
  * A plan containing resolved (temp) views.
  */


@@ -3661,7 +3661,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
   }
 
   /**
-   * Parse a [[AlterTableDropColumnsStatement]] command.
+   * Parse a [[AlterTableDropColumns]] command.
    *
    * For example:
    * {{{
@@ -3672,9 +3672,11 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
   override def visitDropTableColumns(
       ctx: DropTableColumnsContext): LogicalPlan = withOrigin(ctx) {
     val columnsToDrop = ctx.columns.multipartIdentifier.asScala.map(typedVisit[Seq[String]])
-    AlterTableDropColumnsStatement(
-      visitMultipartIdentifier(ctx.multipartIdentifier),
-      columnsToDrop.toSeq)
+    AlterTableDropColumns(
+      createUnresolvedTable(
+        ctx.multipartIdentifier,
+        "ALTER TABLE ... DROP COLUMNS"),
+      columnsToDrop.map(UnresolvedFieldName(_)).toSeq)
   }
 
   /**


@@ -265,13 +265,6 @@ case class AlterTableRenameColumnStatement(
     column: Seq[String],
     newName: String) extends LeafParsedStatement
 
-/**
- * ALTER TABLE ... DROP COLUMNS command, as parsed from SQL.
- */
-case class AlterTableDropColumnsStatement(
-    tableName: Seq[String],
-    columnsToDrop: Seq[Seq[String]]) extends LeafParsedStatement
-
 /**
  * An INSERT INTO statement, as parsed from SQL.
  *


@@ -17,7 +17,7 @@
 package org.apache.spark.sql.catalyst.plans.logical
 
-import org.apache.spark.sql.catalyst.analysis.{NamedRelation, PartitionSpec, UnresolvedException}
+import org.apache.spark.sql.catalyst.analysis.{FieldName, NamedRelation, PartitionSpec, UnresolvedException}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, Expression, Unevaluable}
 import org.apache.spark.sql.catalyst.plans.DescribeCommandSchema
@@ -1098,3 +1098,29 @@ case class UnsetTableProperties(
   override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
     copy(table = newChild)
 }
+
+trait AlterTableCommand extends UnaryCommand {
+  def table: LogicalPlan
+  def operation: String
+  def changes: Seq[TableChange]
+  override def child: LogicalPlan = table
+}
+
+/**
+ * The logical plan of the ALTER TABLE ... DROP COLUMNS command.
+ */
+case class AlterTableDropColumns(
+    table: LogicalPlan,
+    columnsToDrop: Seq[FieldName]) extends AlterTableCommand {
+  override def operation: String = "delete"
+
+  override def changes: Seq[TableChange] = {
+    columnsToDrop.map { col =>
+      require(col.resolved, "FieldName should be resolved before it's converted to TableChange.")
+      TableChange.deleteColumn(col.name.toArray)
+    }
+  }
+
+  override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
+    copy(table = newChild)
+}
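`AlterTableCommand` leaves only `changes` (plus `operation`, used in error messages) to each concrete command, so later migrations stay small. A hypothetical sketch, not part of this PR, of how a rename command could reuse the trait, assuming the same imports as `v2Commands.scala`:

```scala
// Hypothetical follow-up command on the same trait (illustration only).
case class AlterTableRenameColumnSketch(
    table: LogicalPlan,
    column: FieldName,
    newName: String) extends AlterTableCommand {
  override def operation: String = "rename"

  override def changes: Seq[TableChange] = {
    require(column.resolved, "FieldName should be resolved before it's converted to TableChange.")
    Seq(TableChange.renameColumn(column.name.toArray, newName))
  }

  override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
    copy(table = newChild)
}
```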


@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.parser
 import java.util.Locale
 
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, GlobalTempView, LocalTempView, PersistedView, UnresolvedAttribute, UnresolvedFunc, UnresolvedInlineTable, UnresolvedNamespace, UnresolvedRelation, UnresolvedStar, UnresolvedTable, UnresolvedTableOrView, UnresolvedView}
+import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, GlobalTempView, LocalTempView, PersistedView, UnresolvedAttribute, UnresolvedFieldName, UnresolvedFunc, UnresolvedInlineTable, UnresolvedNamespace, UnresolvedRelation, UnresolvedStar, UnresolvedTable, UnresolvedTableOrView, UnresolvedView}
 import org.apache.spark.sql.catalyst.catalog.{ArchiveResource, BucketSpec, FileResource, FunctionResource, JarResource}
 import org.apache.spark.sql.catalyst.expressions.{EqualTo, Hex, Literal}
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -992,7 +992,9 @@ class DDLParserSuite extends AnalysisTest {
   test("alter table: drop column") {
     comparePlans(
       parsePlan("ALTER TABLE table_name DROP COLUMN a.b.c"),
-      AlterTableDropColumnsStatement(Seq("table_name"), Seq(Seq("a", "b", "c"))))
+      AlterTableDropColumns(
+        UnresolvedTable(Seq("table_name"), "ALTER TABLE ... DROP COLUMNS", None),
+        Seq(UnresolvedFieldName(Seq("a", "b", "c")))))
   }
 
   test("alter table: drop multiple columns") {
@@ -1000,9 +1002,11 @@ class DDLParserSuite extends AnalysisTest {
     Seq(sql, sql.replace("COLUMN", "COLUMNS")).foreach { drop =>
       comparePlans(
         parsePlan(drop),
-        AlterTableDropColumnsStatement(
-          Seq("table_name"),
-          Seq(Seq("x"), Seq("y"), Seq("a", "b", "c"))))
+        AlterTableDropColumns(
+          UnresolvedTable(Seq("table_name"), "ALTER TABLE ... DROP COLUMNS", None),
+          Seq(UnresolvedFieldName(Seq("x")),
+            UnresolvedFieldName(Seq("y")),
+            UnresolvedFieldName(Seq("a", "b", "c")))))
     }
   }


@@ -157,15 +157,8 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
         createAlterTable(nameParts, catalog, tbl, changes)
       }
 
-    case AlterTableDropColumnsStatement(
-        nameParts @ SessionCatalogAndTable(catalog, tbl), cols) =>
-      loadTable(catalog, tbl.asIdentifier).collect {
-        case v1Table: V1Table =>
-          throw QueryCompilationErrors.dropColumnOnlySupportedWithV2TableError
-      }.getOrElse {
-        val changes = cols.map(col => TableChange.deleteColumn(col.toArray))
-        createAlterTable(nameParts, catalog, tbl, changes)
-      }
+    case AlterTableDropColumns(ResolvedV1TableIdentifier(_), _) =>
+      throw QueryCompilationErrors.dropColumnOnlySupportedWithV2TableError
 
     case SetTableProperties(ResolvedV1TableIdentifier(ident), props) =>
       AlterTableSetPropertiesCommand(ident.asTableIdentifier, props, isView = false)


@@ -437,6 +437,10 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
       val changes = keys.map(key => TableChange.removeProperty(key))
       AlterTableExec(table.catalog, table.identifier, changes) :: Nil
 
+    case a: AlterTableCommand if a.table.resolved =>
+      val table = a.table.asInstanceOf[ResolvedTable]
+      AlterTableExec(table.catalog, table.identifier, a.changes) :: Nil
+
     case _ => Nil
   }
 }


@@ -240,7 +240,7 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
       val msg = intercept[AnalysisException] {
         sql(s"ALTER TABLE $tableName DROP COLUMN bad_column")
       }.getMessage
-      assert(msg.contains("Cannot delete missing field bad_column in test.alt_table schema"))
+      assert(msg.contains("Cannot delete missing field bad_column in h2.test.alt_table schema"))
     }
     // Drop a column to not existing table and namespace
     Seq("h2.test.not_existing_table", "h2.bad_test.not_existing_table").foreach { table =>
@@ -362,7 +362,7 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
       val msg = intercept[AnalysisException] {
         sql(s"ALTER TABLE $tableName DROP COLUMN C3")
       }.getMessage
-      assert(msg.contains("Cannot delete missing field C3 in test.alt_table schema"))
+      assert(msg.contains("Cannot delete missing field C3 in h2.test.alt_table schema"))
     }
     withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {