[SPARK-34302][SQL][FOLLOWUP] More code cleanup
### What changes were proposed in this pull request? This is a followup of https://github.com/apache/spark/pull/33113, to do some code cleanup: 1. `UnresolvedFieldPosition` doesn't need to include the field name. We can get it through "context" (`AlterTableAlterColumn.column.name`). 2. Run `ResolveAlterTableCommands` in the main resolution batch, so that the column/field resolution is also unified between v1 and v2 commands (same error message). 3. Fail immediately in `ResolveAlterTableCommands` if we can't resolve the field, instead of waiting until `CheckAnalysis`. We don't expect other rules to resolve fields in ALTER TABLE commands, so failing immediately is simpler and we can remove duplicated code in `CheckAnalysis`. ### Why are the changes needed? code simplification. ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? existing tests Closes #33213 from cloud-fan/follow. Authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This commit is contained in:
parent
5f44acff3d
commit
8b46e26fc6
|
@ -50,7 +50,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
|
||||||
val msg = intercept[AnalysisException] {
|
val msg = intercept[AnalysisException] {
|
||||||
sql(s"ALTER TABLE $catalogName.alt_table ALTER COLUMN bad_column DROP NOT NULL")
|
sql(s"ALTER TABLE $catalogName.alt_table ALTER COLUMN bad_column DROP NOT NULL")
|
||||||
}.getMessage
|
}.getMessage
|
||||||
assert(msg.contains("Cannot update missing field bad_column"))
|
assert(msg.contains("Missing field bad_column"))
|
||||||
}
|
}
|
||||||
|
|
||||||
def testRenameColumn(tbl: String): Unit = {
|
def testRenameColumn(tbl: String): Unit = {
|
||||||
|
@ -103,8 +103,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
|
||||||
val msg = intercept[AnalysisException] {
|
val msg = intercept[AnalysisException] {
|
||||||
sql(s"ALTER TABLE $catalogName.alt_table DROP COLUMN bad_column")
|
sql(s"ALTER TABLE $catalogName.alt_table DROP COLUMN bad_column")
|
||||||
}.getMessage
|
}.getMessage
|
||||||
assert(
|
assert(msg.contains(s"Missing field bad_column in table $catalogName.alt_table"))
|
||||||
msg.contains(s"Cannot delete missing field bad_column in $catalogName.alt_table schema"))
|
|
||||||
}
|
}
|
||||||
// Drop a column from a not existing table
|
// Drop a column from a not existing table
|
||||||
val msg = intercept[AnalysisException] {
|
val msg = intercept[AnalysisException] {
|
||||||
|
@ -120,7 +119,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
|
||||||
val msg2 = intercept[AnalysisException] {
|
val msg2 = intercept[AnalysisException] {
|
||||||
sql(s"ALTER TABLE $catalogName.alt_table ALTER COLUMN bad_column TYPE DOUBLE")
|
sql(s"ALTER TABLE $catalogName.alt_table ALTER COLUMN bad_column TYPE DOUBLE")
|
||||||
}.getMessage
|
}.getMessage
|
||||||
assert(msg2.contains("Cannot update missing field bad_column"))
|
assert(msg2.contains("Missing field bad_column"))
|
||||||
}
|
}
|
||||||
// Update column type in not existing table
|
// Update column type in not existing table
|
||||||
val msg = intercept[AnalysisException] {
|
val msg = intercept[AnalysisException] {
|
||||||
|
|
|
@ -269,6 +269,7 @@ class Analyzer(override val catalogManager: CatalogManager)
|
||||||
ResolveRelations ::
|
ResolveRelations ::
|
||||||
ResolveTables ::
|
ResolveTables ::
|
||||||
ResolvePartitionSpec ::
|
ResolvePartitionSpec ::
|
||||||
|
ResolveAlterTableCommands ::
|
||||||
AddMetadataColumns ::
|
AddMetadataColumns ::
|
||||||
DeduplicateRelations ::
|
DeduplicateRelations ::
|
||||||
ResolveReferences ::
|
ResolveReferences ::
|
||||||
|
@ -310,7 +311,6 @@ class Analyzer(override val catalogManager: CatalogManager)
|
||||||
Batch("Post-Hoc Resolution", Once,
|
Batch("Post-Hoc Resolution", Once,
|
||||||
Seq(ResolveCommandsWithIfExists) ++
|
Seq(ResolveCommandsWithIfExists) ++
|
||||||
postHocResolutionRules: _*),
|
postHocResolutionRules: _*),
|
||||||
Batch("Normalize Alter Table Commands", Once, ResolveAlterTableCommands),
|
|
||||||
Batch("Normalize Alter Table", Once, ResolveAlterTableChanges),
|
Batch("Normalize Alter Table", Once, ResolveAlterTableChanges),
|
||||||
Batch("Remove Unresolved Hints", Once,
|
Batch("Remove Unresolved Hints", Once,
|
||||||
new ResolveHints.RemoveAllHints),
|
new ResolveHints.RemoveAllHints),
|
||||||
|
@ -3577,34 +3577,32 @@ class Analyzer(override val catalogManager: CatalogManager)
|
||||||
*/
|
*/
|
||||||
object ResolveAlterTableCommands extends Rule[LogicalPlan] {
|
object ResolveAlterTableCommands extends Rule[LogicalPlan] {
|
||||||
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
|
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
|
||||||
case a: AlterTableCommand if a.table.resolved =>
|
case a: AlterTableCommand if a.table.resolved && hasUnresolvedFieldName(a) =>
|
||||||
val table = a.table.asInstanceOf[ResolvedTable]
|
val table = a.table.asInstanceOf[ResolvedTable]
|
||||||
val transformed = a.transformExpressions {
|
a.transformExpressions {
|
||||||
case u: UnresolvedFieldName =>
|
case u: UnresolvedFieldName => resolveFieldNames(table, u.name, u)
|
||||||
resolveFieldNames(table.schema, u.name).getOrElse(u)
|
|
||||||
case u: UnresolvedFieldPosition => u.position match {
|
|
||||||
case after: After =>
|
|
||||||
resolveFieldNames(table.schema, u.fieldName.init :+ after.column())
|
|
||||||
.map { resolved =>
|
|
||||||
ResolvedFieldPosition(ColumnPosition.after(resolved.field.name))
|
|
||||||
}.getOrElse(u)
|
|
||||||
case _ => ResolvedFieldPosition(u.position)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
transformed match {
|
case a @ AlterTableAlterColumn(
|
||||||
case alter @ AlterTableAlterColumn(
|
table: ResolvedTable, ResolvedFieldName(path, field), dataType, _, _, position) =>
|
||||||
_: ResolvedTable, ResolvedFieldName(_, field), Some(dataType), _, _, _) =>
|
val newDataType = dataType.flatMap { dt =>
|
||||||
// Hive style syntax provides the column type, even if it may not have changed.
|
// Hive style syntax provides the column type, even if it may not have changed.
|
||||||
val dt = CharVarcharUtils.getRawType(field.metadata).getOrElse(field.dataType)
|
val existing = CharVarcharUtils.getRawType(field.metadata).getOrElse(field.dataType)
|
||||||
if (dt == dataType) {
|
if (existing == dt) None else Some(dt)
|
||||||
// The user didn't want the field to change, so remove this change.
|
}
|
||||||
alter.copy(dataType = None)
|
val newPosition = position map {
|
||||||
} else {
|
case u @ UnresolvedFieldPosition(after: After) =>
|
||||||
alter
|
// TODO: since the field name is already resolved, it's more efficient if
|
||||||
}
|
// `ResolvedFieldName` carries the parent struct and we resolve column position
|
||||||
|
// based on the parent struct, instead of re-resolving the entire column path.
|
||||||
|
val resolved = resolveFieldNames(table, path :+ after.column(), u)
|
||||||
|
ResolvedFieldPosition(ColumnPosition.after(resolved.field.name))
|
||||||
|
case u: UnresolvedFieldPosition => ResolvedFieldPosition(u.position)
|
||||||
case other => other
|
case other => other
|
||||||
}
|
}
|
||||||
|
val resolved = a.copy(dataType = newDataType, position = newPosition)
|
||||||
|
resolved.copyTagsFrom(a)
|
||||||
|
resolved
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -3612,11 +3610,16 @@ class Analyzer(override val catalogManager: CatalogManager)
|
||||||
* not found. An error will be thrown in CheckAnalysis for columns that can't be resolved.
|
* not found. An error will be thrown in CheckAnalysis for columns that can't be resolved.
|
||||||
*/
|
*/
|
||||||
private def resolveFieldNames(
|
private def resolveFieldNames(
|
||||||
schema: StructType,
|
table: ResolvedTable,
|
||||||
fieldNames: Seq[String]): Option[ResolvedFieldName] = {
|
fieldName: Seq[String],
|
||||||
val fieldOpt = schema.findNestedField(
|
context: Expression): ResolvedFieldName = {
|
||||||
fieldNames, includeCollections = true, conf.resolver)
|
table.schema.findNestedField(fieldName, includeCollections = true, conf.resolver).map {
|
||||||
fieldOpt.map { case (path, field) => ResolvedFieldName(path, field) }
|
case (path, field) => ResolvedFieldName(path, field)
|
||||||
|
}.getOrElse(throw QueryCompilationErrors.missingFieldError(fieldName, table, context))
|
||||||
|
}
|
||||||
|
|
||||||
|
private def hasUnresolvedFieldName(a: AlterTableCommand): Boolean = {
|
||||||
|
a.expressions.exists(_.find(_.isInstanceOf[UnresolvedFieldName]).isDefined)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -450,43 +450,6 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
|
||||||
write.query.schema.foreach(f => TypeUtils.failWithIntervalType(f.dataType))
|
write.query.schema.foreach(f => TypeUtils.failWithIntervalType(f.dataType))
|
||||||
|
|
||||||
case alter: AlterTableCommand if alter.table.resolved =>
|
case alter: AlterTableCommand if alter.table.resolved =>
|
||||||
val table = alter.table.asInstanceOf[ResolvedTable]
|
|
||||||
def findField(fieldName: Seq[String]): StructField = {
|
|
||||||
// Include collections because structs nested in maps and arrays may be altered.
|
|
||||||
val field = table.schema.findNestedField(fieldName, includeCollections = true)
|
|
||||||
if (field.isEmpty) {
|
|
||||||
alter.failAnalysis(s"Cannot ${alter.operation} missing field ${fieldName.quoted} " +
|
|
||||||
s"in ${table.name} schema: ${table.schema.treeString}")
|
|
||||||
}
|
|
||||||
field.get._2
|
|
||||||
}
|
|
||||||
def findParentStruct(fieldNames: Seq[String]): StructType = {
|
|
||||||
val parent = fieldNames.init
|
|
||||||
val field = if (parent.nonEmpty) {
|
|
||||||
findField(parent).dataType
|
|
||||||
} else {
|
|
||||||
table.schema
|
|
||||||
}
|
|
||||||
field match {
|
|
||||||
case s: StructType => s
|
|
||||||
case o => alter.failAnalysis(s"Cannot ${alter.operation} ${fieldNames.quoted}, " +
|
|
||||||
s"because its parent is not a StructType. Found $o")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
alter.transformExpressions {
|
|
||||||
case UnresolvedFieldName(name) =>
|
|
||||||
alter.failAnalysis(s"Cannot ${alter.operation} missing field ${name.quoted} in " +
|
|
||||||
s"${table.name} schema: ${table.schema.treeString}")
|
|
||||||
case UnresolvedFieldPosition(fieldName, position: After) =>
|
|
||||||
val parent = findParentStruct(fieldName)
|
|
||||||
val allFields = parent match {
|
|
||||||
case s: StructType => s.fieldNames
|
|
||||||
case o => alter.failAnalysis(s"Cannot ${alter.operation} ${fieldName.quoted}, " +
|
|
||||||
s"because its parent is not a StructType. Found $o")
|
|
||||||
}
|
|
||||||
alter.failAnalysis(s"Couldn't resolve positional argument $position amongst " +
|
|
||||||
s"${allFields.mkString("[", ", ", "]")}")
|
|
||||||
}
|
|
||||||
checkAlterTableCommand(alter)
|
checkAlterTableCommand(alter)
|
||||||
|
|
||||||
case alter: AlterTable if alter.table.resolved =>
|
case alter: AlterTable if alter.table.resolved =>
|
||||||
|
|
|
@ -110,9 +110,7 @@ sealed trait FieldPosition extends LeafExpression with Unevaluable {
|
||||||
"FieldPosition.nullable should not be called.")
|
"FieldPosition.nullable should not be called.")
|
||||||
}
|
}
|
||||||
|
|
||||||
case class UnresolvedFieldPosition(
|
case class UnresolvedFieldPosition(position: ColumnPosition) extends FieldPosition {
|
||||||
fieldName: Seq[String],
|
|
||||||
position: ColumnPosition) extends FieldPosition {
|
|
||||||
override lazy val resolved = false
|
override lazy val resolved = false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -3579,7 +3579,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
|
||||||
ctx: RenameTableColumnContext): LogicalPlan = withOrigin(ctx) {
|
ctx: RenameTableColumnContext): LogicalPlan = withOrigin(ctx) {
|
||||||
AlterTableRenameColumn(
|
AlterTableRenameColumn(
|
||||||
createUnresolvedTable(ctx.table, "ALTER TABLE ... RENAME COLUMN"),
|
createUnresolvedTable(ctx.table, "ALTER TABLE ... RENAME COLUMN"),
|
||||||
UnresolvedFieldName(ctx.from.parts.asScala.map(_.getText).toSeq),
|
UnresolvedFieldName(typedVisit[Seq[String]](ctx.from)),
|
||||||
ctx.to.getText)
|
ctx.to.getText)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3605,7 +3605,6 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
|
||||||
s"ALTER TABLE table $verb COLUMN requires a TYPE, a SET/DROP, a COMMENT, or a FIRST/AFTER",
|
s"ALTER TABLE table $verb COLUMN requires a TYPE, a SET/DROP, a COMMENT, or a FIRST/AFTER",
|
||||||
ctx)
|
ctx)
|
||||||
}
|
}
|
||||||
val columnNameParts = typedVisit[Seq[String]](ctx.column)
|
|
||||||
val dataType = if (action.dataType != null) {
|
val dataType = if (action.dataType != null) {
|
||||||
Some(typedVisit[DataType](action.dataType))
|
Some(typedVisit[DataType](action.dataType))
|
||||||
} else {
|
} else {
|
||||||
|
@ -3625,7 +3624,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
val position = if (action.colPosition != null) {
|
val position = if (action.colPosition != null) {
|
||||||
Some(UnresolvedFieldPosition(columnNameParts, typedVisit[ColumnPosition](action.colPosition)))
|
Some(UnresolvedFieldPosition(typedVisit[ColumnPosition](action.colPosition)))
|
||||||
} else {
|
} else {
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
|
@ -3634,7 +3633,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
|
||||||
|
|
||||||
AlterTableAlterColumn(
|
AlterTableAlterColumn(
|
||||||
createUnresolvedTable(ctx.table, s"ALTER TABLE ... $verb COLUMN"),
|
createUnresolvedTable(ctx.table, s"ALTER TABLE ... $verb COLUMN"),
|
||||||
UnresolvedFieldName(columnNameParts),
|
UnresolvedFieldName(typedVisit[Seq[String]](ctx.column)),
|
||||||
dataType = dataType,
|
dataType = dataType,
|
||||||
nullable = nullable,
|
nullable = nullable,
|
||||||
comment = comment,
|
comment = comment,
|
||||||
|
@ -3673,7 +3672,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
|
||||||
nullable = None,
|
nullable = None,
|
||||||
comment = Option(ctx.colType().commentSpec()).map(visitCommentSpec),
|
comment = Option(ctx.colType().commentSpec()).map(visitCommentSpec),
|
||||||
position = Option(ctx.colPosition).map(
|
position = Option(ctx.colPosition).map(
|
||||||
pos => UnresolvedFieldPosition(columnNameParts, typedVisit[ColumnPosition](pos))))
|
pos => UnresolvedFieldPosition(typedVisit[ColumnPosition](pos))))
|
||||||
}
|
}
|
||||||
|
|
||||||
override def visitHiveReplaceColumns(
|
override def visitHiveReplaceColumns(
|
||||||
|
|
|
@ -1983,4 +1983,18 @@ private[spark] object QueryCompilationErrors {
|
||||||
new AnalysisException(
|
new AnalysisException(
|
||||||
s"class $className doesn't implement interface UserDefinedAggregateFunction")
|
s"class $className doesn't implement interface UserDefinedAggregateFunction")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def missingFieldError(
|
||||||
|
fieldName: Seq[String], table: ResolvedTable, context: Expression): Throwable = {
|
||||||
|
throw new AnalysisException(
|
||||||
|
s"Missing field ${fieldName.quoted} in table ${table.name} with schema:\n" +
|
||||||
|
table.schema.treeString,
|
||||||
|
context.origin.line,
|
||||||
|
context.origin.startPosition)
|
||||||
|
}
|
||||||
|
|
||||||
|
def invalidFieldName(fieldName: Seq[String], path: Seq[String]): Throwable = {
|
||||||
|
new AnalysisException(
|
||||||
|
s"Field name ${fieldName.quoted} is invalid, ${path.quoted} is not a struct.")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -372,7 +372,7 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru
|
||||||
findField(struct, names, normalizedPath ++ Seq(field.name, "element"))
|
findField(struct, names, normalizedPath ++ Seq(field.name, "element"))
|
||||||
|
|
||||||
case _ =>
|
case _ =>
|
||||||
None
|
throw QueryCompilationErrors.invalidFieldName(fieldNames, normalizedPath)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -950,7 +950,7 @@ class DDLParserSuite extends AnalysisTest {
|
||||||
None,
|
None,
|
||||||
None,
|
None,
|
||||||
None,
|
None,
|
||||||
Some(UnresolvedFieldPosition(Seq("a", "b", "c"), first()))))
|
Some(UnresolvedFieldPosition(first()))))
|
||||||
}
|
}
|
||||||
|
|
||||||
test("alter table: multiple property changes are not allowed") {
|
test("alter table: multiple property changes are not allowed") {
|
||||||
|
@ -1043,7 +1043,7 @@ class DDLParserSuite extends AnalysisTest {
|
||||||
Some(IntegerType),
|
Some(IntegerType),
|
||||||
None,
|
None,
|
||||||
None,
|
None,
|
||||||
Some(UnresolvedFieldPosition(Seq("a", "b", "c"), after("other_col")))))
|
Some(UnresolvedFieldPosition(after("other_col")))))
|
||||||
|
|
||||||
// renaming column not supported in hive style ALTER COLUMN.
|
// renaming column not supported in hive style ALTER COLUMN.
|
||||||
intercept("ALTER TABLE table_name CHANGE COLUMN a.b.c new_name INT",
|
intercept("ALTER TABLE table_name CHANGE COLUMN a.b.c new_name INT",
|
||||||
|
|
|
@ -176,7 +176,12 @@ ALTER TABLE test_change CHANGE invalid_col TYPE INT
|
||||||
struct<>
|
struct<>
|
||||||
-- !query output
|
-- !query output
|
||||||
org.apache.spark.sql.AnalysisException
|
org.apache.spark.sql.AnalysisException
|
||||||
Can't find column `invalid_col` given table data columns [`a`, `b`, `c`]
|
Missing field invalid_col in table spark_catalog.default.test_change with schema:
|
||||||
|
root
|
||||||
|
|-- a: integer (nullable = true)
|
||||||
|
|-- b: string (nullable = true)
|
||||||
|
|-- c: integer (nullable = true)
|
||||||
|
; line 1 pos 0
|
||||||
|
|
||||||
|
|
||||||
-- !query
|
-- !query
|
||||||
|
|
|
@ -616,8 +616,7 @@ trait AlterTableTests extends SharedSparkSession {
|
||||||
sql(s"ALTER TABLE $t ALTER COLUMN data TYPE string")
|
sql(s"ALTER TABLE $t ALTER COLUMN data TYPE string")
|
||||||
}
|
}
|
||||||
|
|
||||||
assert(exc.getMessage.contains("data"))
|
assert(exc.getMessage.contains("Missing field data"))
|
||||||
assert(exc.getMessage.contains("missing field"))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -630,8 +629,7 @@ trait AlterTableTests extends SharedSparkSession {
|
||||||
sql(s"ALTER TABLE $t ALTER COLUMN point.x TYPE double")
|
sql(s"ALTER TABLE $t ALTER COLUMN point.x TYPE double")
|
||||||
}
|
}
|
||||||
|
|
||||||
assert(exc.getMessage.contains("point.x"))
|
assert(exc.getMessage.contains("Missing field point.x"))
|
||||||
assert(exc.getMessage.contains("missing field"))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -689,7 +687,7 @@ trait AlterTableTests extends SharedSparkSession {
|
||||||
|
|
||||||
val e1 = intercept[AnalysisException](
|
val e1 = intercept[AnalysisException](
|
||||||
sql(s"ALTER TABLE $t ALTER COLUMN b AFTER non_exist"))
|
sql(s"ALTER TABLE $t ALTER COLUMN b AFTER non_exist"))
|
||||||
assert(e1.getMessage.contains("Couldn't resolve positional argument"))
|
assert(e1.getMessage.contains("Missing field non_exist"))
|
||||||
|
|
||||||
sql(s"ALTER TABLE $t ALTER COLUMN point.y FIRST")
|
sql(s"ALTER TABLE $t ALTER COLUMN point.y FIRST")
|
||||||
assert(getTableMetadata(tableName).schema == new StructType()
|
assert(getTableMetadata(tableName).schema == new StructType()
|
||||||
|
@ -711,7 +709,7 @@ trait AlterTableTests extends SharedSparkSession {
|
||||||
|
|
||||||
val e2 = intercept[AnalysisException](
|
val e2 = intercept[AnalysisException](
|
||||||
sql(s"ALTER TABLE $t ALTER COLUMN point.y AFTER non_exist"))
|
sql(s"ALTER TABLE $t ALTER COLUMN point.y AFTER non_exist"))
|
||||||
assert(e2.getMessage.contains("Couldn't resolve positional argument"))
|
assert(e2.getMessage.contains("Missing field point.non_exist"))
|
||||||
|
|
||||||
// `AlterTable.resolved` checks column existence.
|
// `AlterTable.resolved` checks column existence.
|
||||||
intercept[AnalysisException](
|
intercept[AnalysisException](
|
||||||
|
@ -802,8 +800,7 @@ trait AlterTableTests extends SharedSparkSession {
|
||||||
sql(s"ALTER TABLE $t ALTER COLUMN data COMMENT 'doc'")
|
sql(s"ALTER TABLE $t ALTER COLUMN data COMMENT 'doc'")
|
||||||
}
|
}
|
||||||
|
|
||||||
assert(exc.getMessage.contains("data"))
|
assert(exc.getMessage.contains("Missing field data"))
|
||||||
assert(exc.getMessage.contains("missing field"))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -816,8 +813,7 @@ trait AlterTableTests extends SharedSparkSession {
|
||||||
sql(s"ALTER TABLE $t ALTER COLUMN point.x COMMENT 'doc'")
|
sql(s"ALTER TABLE $t ALTER COLUMN point.x COMMENT 'doc'")
|
||||||
}
|
}
|
||||||
|
|
||||||
assert(exc.getMessage.contains("point.x"))
|
assert(exc.getMessage.contains("Missing field point.x"))
|
||||||
assert(exc.getMessage.contains("missing field"))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -918,8 +914,7 @@ trait AlterTableTests extends SharedSparkSession {
|
||||||
sql(s"ALTER TABLE $t RENAME COLUMN data TO some_string")
|
sql(s"ALTER TABLE $t RENAME COLUMN data TO some_string")
|
||||||
}
|
}
|
||||||
|
|
||||||
assert(exc.getMessage.contains("data"))
|
assert(exc.getMessage.contains("Missing field data"))
|
||||||
assert(exc.getMessage.contains("missing field"))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -932,8 +927,7 @@ trait AlterTableTests extends SharedSparkSession {
|
||||||
sql(s"ALTER TABLE $t RENAME COLUMN point.x TO z")
|
sql(s"ALTER TABLE $t RENAME COLUMN point.x TO z")
|
||||||
}
|
}
|
||||||
|
|
||||||
assert(exc.getMessage.contains("point.x"))
|
assert(exc.getMessage.contains("Missing field point.x"))
|
||||||
assert(exc.getMessage.contains("missing field"))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1063,8 +1057,7 @@ trait AlterTableTests extends SharedSparkSession {
|
||||||
sql(s"ALTER TABLE $t DROP COLUMN data")
|
sql(s"ALTER TABLE $t DROP COLUMN data")
|
||||||
}
|
}
|
||||||
|
|
||||||
assert(exc.getMessage.contains("data"))
|
assert(exc.getMessage.contains("Missing field data"))
|
||||||
assert(exc.getMessage.contains("missing field"))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1077,8 +1070,7 @@ trait AlterTableTests extends SharedSparkSession {
|
||||||
sql(s"ALTER TABLE $t DROP COLUMN point.x")
|
sql(s"ALTER TABLE $t DROP COLUMN point.x")
|
||||||
}
|
}
|
||||||
|
|
||||||
assert(exc.getMessage.contains("point.x"))
|
assert(exc.getMessage.contains("Missing field point.x"))
|
||||||
assert(exc.getMessage.contains("missing field"))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -192,7 +192,7 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes
|
||||||
Seq(Array("ID"), Array("point", "X"), Array("POINT", "X"), Array("POINT", "x")).foreach { ref =>
|
Seq(Array("ID"), Array("point", "X"), Array("POINT", "X"), Array("POINT", "x")).foreach { ref =>
|
||||||
alterTableTest(
|
alterTableTest(
|
||||||
AlterTableDropColumns(table, Seq(UnresolvedFieldName(ref))),
|
AlterTableDropColumns(table, Seq(UnresolvedFieldName(ref))),
|
||||||
Seq("Cannot delete missing field", ref.quoted)
|
Seq("Missing field " + ref.quoted)
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -201,7 +201,7 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes
|
||||||
Seq(Array("ID"), Array("point", "X"), Array("POINT", "X"), Array("POINT", "x")).foreach { ref =>
|
Seq(Array("ID"), Array("point", "X"), Array("POINT", "X"), Array("POINT", "x")).foreach { ref =>
|
||||||
alterTableTest(
|
alterTableTest(
|
||||||
AlterTableRenameColumn(table, UnresolvedFieldName(ref), "newName"),
|
AlterTableRenameColumn(table, UnresolvedFieldName(ref), "newName"),
|
||||||
Seq("Cannot rename missing field", ref.quoted)
|
Seq("Missing field " + ref.quoted)
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -210,7 +210,7 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes
|
||||||
Seq(Array("ID"), Array("point", "X"), Array("POINT", "X"), Array("POINT", "x")).foreach { ref =>
|
Seq(Array("ID"), Array("point", "X"), Array("POINT", "X"), Array("POINT", "x")).foreach { ref =>
|
||||||
alterTableTest(
|
alterTableTest(
|
||||||
AlterTableAlterColumn(table, UnresolvedFieldName(ref), None, Some(true), None, None),
|
AlterTableAlterColumn(table, UnresolvedFieldName(ref), None, Some(true), None, None),
|
||||||
Seq("Cannot update missing field", ref.quoted)
|
Seq("Missing field " + ref.quoted)
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -219,7 +219,7 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes
|
||||||
Seq(Array("ID"), Array("point", "X"), Array("POINT", "X"), Array("POINT", "x")).foreach { ref =>
|
Seq(Array("ID"), Array("point", "X"), Array("POINT", "X"), Array("POINT", "x")).foreach { ref =>
|
||||||
alterTableTest(
|
alterTableTest(
|
||||||
AlterTableAlterColumn(table, UnresolvedFieldName(ref), Some(StringType), None, None, None),
|
AlterTableAlterColumn(table, UnresolvedFieldName(ref), Some(StringType), None, None, None),
|
||||||
Seq("Cannot update missing field", ref.quoted)
|
Seq("Missing field " + ref.quoted)
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -228,7 +228,7 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes
|
||||||
Seq(Array("ID"), Array("point", "X"), Array("POINT", "X"), Array("POINT", "x")).foreach { ref =>
|
Seq(Array("ID"), Array("point", "X"), Array("POINT", "X"), Array("POINT", "x")).foreach { ref =>
|
||||||
alterTableTest(
|
alterTableTest(
|
||||||
AlterTableAlterColumn(table, UnresolvedFieldName(ref), None, None, Some("comment"), None),
|
AlterTableAlterColumn(table, UnresolvedFieldName(ref), None, None, Some("comment"), None),
|
||||||
Seq("Cannot update missing field", ref.quoted)
|
Seq("Missing field " + ref.quoted)
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -77,7 +77,10 @@ class PlanResolutionSuite extends AnalysisTest {
|
||||||
|
|
||||||
private val v1Table: V1Table = {
|
private val v1Table: V1Table = {
|
||||||
val t = mock(classOf[CatalogTable])
|
val t = mock(classOf[CatalogTable])
|
||||||
when(t.schema).thenReturn(new StructType().add("i", "int").add("s", "string"))
|
when(t.schema).thenReturn(new StructType()
|
||||||
|
.add("i", "int")
|
||||||
|
.add("s", "string")
|
||||||
|
.add("point", new StructType().add("x", "int").add("y", "int")))
|
||||||
when(t.tableType).thenReturn(CatalogTableType.MANAGED)
|
when(t.tableType).thenReturn(CatalogTableType.MANAGED)
|
||||||
when(t.provider).thenReturn(Some(v1Format))
|
when(t.provider).thenReturn(Some(v1Format))
|
||||||
V1Table(t)
|
V1Table(t)
|
||||||
|
@ -1089,10 +1092,9 @@ class PlanResolutionSuite extends AnalysisTest {
|
||||||
val e1 = intercept[AnalysisException] {
|
val e1 = intercept[AnalysisException] {
|
||||||
parseAndResolve(sql3)
|
parseAndResolve(sql3)
|
||||||
}
|
}
|
||||||
assert(e1.getMessage.contains(
|
assert(e1.getMessage.contains("Missing field j in table spark_catalog.default.v1Table"))
|
||||||
"ALTER COLUMN cannot find column j in v1 table. Available: i, s"))
|
|
||||||
|
|
||||||
val sql4 = s"ALTER TABLE $tblName ALTER COLUMN a.b.c TYPE bigint"
|
val sql4 = s"ALTER TABLE $tblName ALTER COLUMN point.x TYPE bigint"
|
||||||
val e2 = intercept[AnalysisException] {
|
val e2 = intercept[AnalysisException] {
|
||||||
parseAndResolve(sql4)
|
parseAndResolve(sql4)
|
||||||
}
|
}
|
||||||
|
@ -1150,14 +1152,13 @@ class PlanResolutionSuite extends AnalysisTest {
|
||||||
val e = intercept[AnalysisException] {
|
val e = intercept[AnalysisException] {
|
||||||
parseAndResolve(sql)
|
parseAndResolve(sql)
|
||||||
}
|
}
|
||||||
assert(e.getMessage.contains(
|
assert(e.getMessage.contains("Missing field I in table spark_catalog.default.v1Table"))
|
||||||
"ALTER COLUMN cannot find column I in v1 table. Available: i, s"))
|
|
||||||
} else {
|
} else {
|
||||||
val actual = parseAndResolve(sql)
|
val actual = parseAndResolve(sql)
|
||||||
val expected = AlterTableChangeColumnCommand(
|
val expected = AlterTableChangeColumnCommand(
|
||||||
TableIdentifier(tblName, Some("default")),
|
TableIdentifier(tblName, Some("default")),
|
||||||
"I",
|
"i",
|
||||||
StructField("I", IntegerType).withComment("new comment"))
|
StructField("i", IntegerType).withComment("new comment"))
|
||||||
comparePlans(actual, expected)
|
comparePlans(actual, expected)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -240,7 +240,7 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
|
||||||
val msg = intercept[AnalysisException] {
|
val msg = intercept[AnalysisException] {
|
||||||
sql(s"ALTER TABLE $tableName DROP COLUMN bad_column")
|
sql(s"ALTER TABLE $tableName DROP COLUMN bad_column")
|
||||||
}.getMessage
|
}.getMessage
|
||||||
assert(msg.contains("Cannot delete missing field bad_column in h2.test.alt_table schema"))
|
assert(msg.contains("Missing field bad_column in table h2.test.alt_table"))
|
||||||
}
|
}
|
||||||
// Drop a column to not existing table and namespace
|
// Drop a column to not existing table and namespace
|
||||||
Seq("h2.test.not_existing_table", "h2.bad_test.not_existing_table").foreach { table =>
|
Seq("h2.test.not_existing_table", "h2.bad_test.not_existing_table").foreach { table =>
|
||||||
|
@ -266,7 +266,7 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
|
||||||
val msg1 = intercept[AnalysisException] {
|
val msg1 = intercept[AnalysisException] {
|
||||||
sql(s"ALTER TABLE $tableName ALTER COLUMN bad_column TYPE DOUBLE")
|
sql(s"ALTER TABLE $tableName ALTER COLUMN bad_column TYPE DOUBLE")
|
||||||
}.getMessage
|
}.getMessage
|
||||||
assert(msg1.contains("Cannot update missing field bad_column in h2.test.alt_table schema"))
|
assert(msg1.contains("Missing field bad_column in table h2.test.alt_table"))
|
||||||
// Update column to wrong type
|
// Update column to wrong type
|
||||||
val msg2 = intercept[ParseException] {
|
val msg2 = intercept[ParseException] {
|
||||||
sql(s"ALTER TABLE $tableName ALTER COLUMN id TYPE bad_type")
|
sql(s"ALTER TABLE $tableName ALTER COLUMN id TYPE bad_type")
|
||||||
|
@ -297,7 +297,7 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
|
||||||
val msg = intercept[AnalysisException] {
|
val msg = intercept[AnalysisException] {
|
||||||
sql(s"ALTER TABLE $tableName ALTER COLUMN bad_column DROP NOT NULL")
|
sql(s"ALTER TABLE $tableName ALTER COLUMN bad_column DROP NOT NULL")
|
||||||
}.getMessage
|
}.getMessage
|
||||||
assert(msg.contains("Cannot update missing field bad_column in h2.test.alt_table"))
|
assert(msg.contains("Missing field bad_column in table h2.test.alt_table"))
|
||||||
}
|
}
|
||||||
// Update column nullability in not existing table and namespace
|
// Update column nullability in not existing table and namespace
|
||||||
Seq("h2.test.not_existing_table", "h2.bad_test.not_existing_table").foreach { table =>
|
Seq("h2.test.not_existing_table", "h2.bad_test.not_existing_table").foreach { table =>
|
||||||
|
@ -321,7 +321,7 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
|
||||||
val msg = intercept[AnalysisException] {
|
val msg = intercept[AnalysisException] {
|
||||||
sql(s"ALTER TABLE $tableName ALTER COLUMN bad_column COMMENT 'test'")
|
sql(s"ALTER TABLE $tableName ALTER COLUMN bad_column COMMENT 'test'")
|
||||||
}.getMessage
|
}.getMessage
|
||||||
assert(msg.contains("Cannot update missing field bad_column in h2.test.alt_table"))
|
assert(msg.contains("Missing field bad_column in table h2.test.alt_table"))
|
||||||
}
|
}
|
||||||
// Update column comments in not existing table and namespace
|
// Update column comments in not existing table and namespace
|
||||||
Seq("h2.test.not_existing_table", "h2.bad_test.not_existing_table").foreach { table =>
|
Seq("h2.test.not_existing_table", "h2.bad_test.not_existing_table").foreach { table =>
|
||||||
|
@ -346,7 +346,7 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
|
||||||
val msg = intercept[AnalysisException] {
|
val msg = intercept[AnalysisException] {
|
||||||
sql(s"ALTER TABLE $tableName RENAME COLUMN C2 TO c3")
|
sql(s"ALTER TABLE $tableName RENAME COLUMN C2 TO c3")
|
||||||
}.getMessage
|
}.getMessage
|
||||||
assert(msg.contains("Cannot rename missing field C2 in h2.test.alt_table schema"))
|
assert(msg.contains("Missing field C2 in table h2.test.alt_table"))
|
||||||
}
|
}
|
||||||
|
|
||||||
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
|
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
|
||||||
|
@ -362,7 +362,7 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
|
||||||
val msg = intercept[AnalysisException] {
|
val msg = intercept[AnalysisException] {
|
||||||
sql(s"ALTER TABLE $tableName DROP COLUMN C3")
|
sql(s"ALTER TABLE $tableName DROP COLUMN C3")
|
||||||
}.getMessage
|
}.getMessage
|
||||||
assert(msg.contains("Cannot delete missing field C3 in h2.test.alt_table schema"))
|
assert(msg.contains("Missing field C3 in table h2.test.alt_table"))
|
||||||
}
|
}
|
||||||
|
|
||||||
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
|
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
|
||||||
|
@ -376,7 +376,7 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
|
||||||
val msg = intercept[AnalysisException] {
|
val msg = intercept[AnalysisException] {
|
||||||
sql(s"ALTER TABLE $tableName ALTER COLUMN C1 TYPE DOUBLE")
|
sql(s"ALTER TABLE $tableName ALTER COLUMN C1 TYPE DOUBLE")
|
||||||
}.getMessage
|
}.getMessage
|
||||||
assert(msg.contains("Cannot update missing field C1 in h2.test.alt_table schema"))
|
assert(msg.contains("Missing field C1 in table h2.test.alt_table"))
|
||||||
}
|
}
|
||||||
|
|
||||||
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
|
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
|
||||||
|
@ -390,7 +390,7 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
|
||||||
val msg = intercept[AnalysisException] {
|
val msg = intercept[AnalysisException] {
|
||||||
sql(s"ALTER TABLE $tableName ALTER COLUMN C1 DROP NOT NULL")
|
sql(s"ALTER TABLE $tableName ALTER COLUMN C1 DROP NOT NULL")
|
||||||
}.getMessage
|
}.getMessage
|
||||||
assert(msg.contains("Cannot update missing field C1 in h2.test.alt_table schema"))
|
assert(msg.contains("Missing field C1 in table h2.test.alt_table"))
|
||||||
}
|
}
|
||||||
|
|
||||||
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
|
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
|
||||||
|
|
Loading…
Reference in a new issue