[SPARK-33834][SQL] Verify ALTER TABLE CHANGE COLUMN with Char and Varchar

### What changes were proposed in this pull request?

Verify `ALTER TABLE CHANGE COLUMN` with char and varchar columns, and avoid unexpected changes.

For v1 tables, changing a column's type is not allowed. This PR also fixes a regression where the replaced string type, rather than the original char/varchar type, was used when altering char/varchar columns.
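
As an illustration, here is a minimal sketch of the v1 behavior covered by the new tests (the table name and the `parquet` format are just examples; `spark` is an active `SparkSession`):

```scala
// Re-declaring the same char type is a no-op and must keep the original
// CHAR(4) raw type rather than the replaced STRING type.
spark.sql("CREATE TABLE t(c CHAR(4)) USING parquet")
spark.sql("ALTER TABLE t CHANGE COLUMN c TYPE CHAR(4)")  // allowed: type unchanged

// Changing to a different type is rejected for v1 tables, e.g.:
// spark.sql("ALTER TABLE t CHANGE COLUMN c TYPE CHAR(5)")  // fails analysis
```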

For v2 tables, the valid changes are:
- char/varchar to string,
- char(x) to char(x),
- char(x)/varchar(x) to varchar(y) if x <= y.

All other changes are invalid; see the sketch after this list.
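
A minimal sketch of this matrix, assuming a v2 catalog whose `foo` format matches the new DSV2 test suite (table names are hypothetical; `spark` is an active `SparkSession`):

```scala
spark.sql("CREATE TABLE t1(c CHAR(4)) USING foo")
spark.sql("ALTER TABLE t1 CHANGE COLUMN c TYPE STRING")      // valid: char -> string

spark.sql("CREATE TABLE t2(c CHAR(4)) USING foo")
spark.sql("ALTER TABLE t2 CHANGE COLUMN c TYPE VARCHAR(5)")  // valid: char(4) -> varchar(5), 4 <= 5

spark.sql("CREATE TABLE t3(c VARCHAR(4)) USING foo")
// Invalid, fails analysis with "varchar(4) cannot be cast to varchar(3)":
// spark.sql("ALTER TABLE t3 CHANGE COLUMN c TYPE VARCHAR(3)")
```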

### Why are the changes needed?

To verify `ALTER TABLE CHANGE COLUMN` with char and varchar columns and avoid unexpected changes to table schemas.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New tests.

Closes #30833 from yaooqinn/SPARK-33834.

Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
6 changed files with 216 additions and 8 deletions


@@ -3481,7 +3481,8 @@ class Analyzer(override val catalogManager: CatalogManager)
           Some(typeChange)
         } else {
           val (fieldNames, field) = fieldOpt.get
-          if (field.dataType == typeChange.newDataType()) {
+          val dt = CharVarcharUtils.getRawType(field.metadata).getOrElse(field.dataType)
+          if (dt == typeChange.newDataType()) {
             // The user didn't want the field to change, so remove this change
             None
           } else {
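
For context, `CharVarcharUtils.getRawType` recovers the original char/varchar type that was stashed in the field's metadata when the schema was string-ified. A minimal sketch (the schema construction is illustrative; `CharVarcharUtils` is internal to Spark's sql packages):

```scala
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.types._

// A CHAR(4) column is stored as StringType plus metadata carrying the raw type.
val schema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(
  StructType(Seq(StructField("c", CharType(4)))))
val field = schema("c")

assert(field.dataType == StringType)
assert(CharVarcharUtils.getRawType(field.metadata).contains(CharType(4)))
```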


@@ -523,7 +523,12 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
             TypeUtils.failWithIntervalType(add.dataType())
             colsToAdd(parentName) = fieldsAdded :+ add.fieldNames().last
           case update: UpdateColumnType =>
-            val field = findField("update", update.fieldNames)
+            val field = {
+              val f = findField("update", update.fieldNames)
+              CharVarcharUtils.getRawType(f.metadata)
+                .map(dt => f.copy(dataType = dt))
+                .getOrElse(f)
+            }
             val fieldName = update.fieldNames.quoted
             update.newDataType match {
               case _: StructType =>
@@ -544,7 +549,16 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
               case _ =>
                 // update is okay
             }
 
-            if (!Cast.canUpCast(field.dataType, update.newDataType)) {
+            // We don't need to handle nested types here which shall fail before
+            def canAlterColumnType(from: DataType, to: DataType): Boolean = (from, to) match {
+              case (CharType(l1), CharType(l2)) => l1 == l2
+              case (CharType(l1), VarcharType(l2)) => l1 <= l2
+              case (VarcharType(l1), VarcharType(l2)) => l1 <= l2
+              case _ => Cast.canUpCast(from, to)
+            }
+
+            if (!canAlterColumnType(field.dataType, update.newDataType)) {
               alter.failAnalysis(
                 s"Cannot update ${table.name} field $fieldName: " +
                   s"${field.dataType.simpleString} cannot be cast to " +


@@ -470,18 +470,28 @@ class SessionCatalog(
   /**
    * Retrieve the metadata of an existing permanent table/view. If no database is specified,
    * assume the table/view is in the current database.
+   * We replace char/varchar with "annotated" string type in the table schema, as the query
+   * engine doesn't support char/varchar yet.
    */
   @throws[NoSuchDatabaseException]
   @throws[NoSuchTableException]
   def getTableMetadata(name: TableIdentifier): CatalogTable = {
+    val t = getTableRawMetadata(name)
+    t.copy(schema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(t.schema))
+  }
+
+  /**
+   * Retrieve the metadata of an existing permanent table/view. If no database is specified,
+   * assume the table/view is in the current database.
+   */
+  @throws[NoSuchDatabaseException]
+  @throws[NoSuchTableException]
+  def getTableRawMetadata(name: TableIdentifier): CatalogTable = {
     val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(name.table)
     requireDbExists(db)
     requireTableExists(TableIdentifier(table, Some(db)))
-    val t = externalCatalog.getTable(db, table)
-    // We replace char/varchar with "annotated" string type in the table schema, as the query
-    // engine doesn't support char/varchar yet.
-    t.copy(schema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(t.schema))
+    externalCatalog.getTable(db, table)
   }
 
   /**
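
A hedged usage sketch of the split above (the catalog instance and table names are hypothetical): query planning keeps using `getTableMetadata`, while DDL commands that must see the original char/varchar types use the new raw variant.

```scala
import org.apache.spark.sql.catalyst.TableIdentifier

// Assumes `catalog` is a SessionCatalog and table `db.t` exists (hypothetical).
val ident = TableIdentifier("t", Some("db"))

// Query-facing metadata: char/varchar replaced with annotated string types.
val cooked = catalog.getTableMetadata(ident)

// Raw metadata: original char/varchar types preserved, as needed by
// AlterTableChangeColumnCommand in the next hunk.
val raw = catalog.getTableRawMetadata(ident)
```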


@@ -342,7 +342,7 @@ case class AlterTableChangeColumnCommand(
   // TODO: support change column name/dataType/metadata/position.
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
-    val table = catalog.getTableMetadata(tableName)
+    val table = catalog.getTableRawMetadata(tableName)
     val resolver = sparkSession.sessionState.conf.resolver
     DDLUtils.verifyAlterTableType(catalog, table, isView = false)


@@ -0,0 +1,159 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.command

import org.apache.spark.SparkConf
import org.apache.spark.sql.{AnalysisException, QueryTest}
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.connector.InMemoryPartitionTableCatalog
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
import org.apache.spark.sql.types._

trait CharVarcharDDLTestBase extends QueryTest with SQLTestUtils {

  def format: String

  def checkColType(f: StructField, dt: DataType): Unit = {
    assert(f.dataType == CharVarcharUtils.replaceCharVarcharWithString(dt))
    assert(CharVarcharUtils.getRawType(f.metadata).contains(dt))
  }

  test("allow to change column for char(x) to char(y), x == y") {
    withTable("t") {
      sql(s"CREATE TABLE t(i STRING, c CHAR(4)) USING $format")
      sql("ALTER TABLE t CHANGE COLUMN c TYPE CHAR(4)")
      checkColType(spark.table("t").schema(1), CharType(4))
    }
  }

  test("not allow to change column for char(x) to char(y), x != y") {
    withTable("t") {
      sql(s"CREATE TABLE t(i STRING, c CHAR(4)) USING $format")
      val e = intercept[AnalysisException] {
        sql("ALTER TABLE t CHANGE COLUMN c TYPE CHAR(5)")
      }
      val v1 = e.getMessage contains "'CharType(4)' to 'c' with type 'CharType(5)'"
      val v2 = e.getMessage contains "char(4) cannot be cast to char(5)"
      assert(v1 || v2)
    }
  }

  test("not allow to change column from string to char type") {
    withTable("t") {
      sql(s"CREATE TABLE t(i STRING, c STRING) USING $format")
      val e = intercept[AnalysisException] {
        sql("ALTER TABLE t CHANGE COLUMN c TYPE CHAR(5)")
      }
      val v1 = e.getMessage contains "'StringType' to 'c' with type 'CharType(5)'"
      val v2 = e.getMessage contains "string cannot be cast to char(5)"
      assert(v1 || v2)
    }
  }

  test("not allow to change column from int to char type") {
    withTable("t") {
      sql(s"CREATE TABLE t(i int, c CHAR(4)) USING $format")
      val e = intercept[AnalysisException] {
        sql("ALTER TABLE t CHANGE COLUMN i TYPE CHAR(5)")
      }
      val v1 = e.getMessage contains "'IntegerType' to 'i' with type 'CharType(5)'"
      val v2 = e.getMessage contains "int cannot be cast to char(5)"
      assert(v1 || v2)
    }
  }

  test("allow to change column for varchar(x) to varchar(y), x == y") {
    withTable("t") {
      sql(s"CREATE TABLE t(i STRING, c VARCHAR(4)) USING $format")
      sql("ALTER TABLE t CHANGE COLUMN c TYPE VARCHAR(4)")
      checkColType(spark.table("t").schema(1), VarcharType(4))
    }
  }

  test("not allow to change column for varchar(x) to varchar(y), x > y") {
    withTable("t") {
      sql(s"CREATE TABLE t(i STRING, c VARCHAR(4)) USING $format")
      val e = intercept[AnalysisException] {
        sql("ALTER TABLE t CHANGE COLUMN c TYPE VARCHAR(3)")
      }
      val v1 = e.getMessage contains "'VarcharType(4)' to 'c' with type 'VarcharType(3)'"
      val v2 = e.getMessage contains "varchar(4) cannot be cast to varchar(3)"
      assert(v1 || v2)
    }
  }
}

class FileSourceCharVarcharDDLTestSuite extends CharVarcharDDLTestBase with SharedSparkSession {
  override def format: String = "parquet"

  override protected def sparkConf: SparkConf = {
    super.sparkConf.set(SQLConf.USE_V1_SOURCE_LIST, "parquet")
  }
}

class DSV2CharVarcharDDLTestSuite extends CharVarcharDDLTestBase
  with SharedSparkSession {
  override def format: String = "foo"

  protected override def sparkConf = {
    super.sparkConf
      .set("spark.sql.catalog.testcat", classOf[InMemoryPartitionTableCatalog].getName)
      .set(SQLConf.DEFAULT_CATALOG.key, "testcat")
  }

  test("allow to change column from char to string type") {
    withTable("t") {
      sql(s"CREATE TABLE t(i STRING, c CHAR(4)) USING $format")
      sql("ALTER TABLE t CHANGE COLUMN c TYPE STRING")
      assert(spark.table("t").schema(1).dataType === StringType)
    }
  }

  test("allow to change column from char(x) to varchar(y) type x <= y") {
    withTable("t") {
      sql(s"CREATE TABLE t(i STRING, c CHAR(4)) USING $format")
      sql("ALTER TABLE t CHANGE COLUMN c TYPE VARCHAR(4)")
      checkColType(spark.table("t").schema(1), VarcharType(4))
    }
    withTable("t") {
      sql(s"CREATE TABLE t(i STRING, c CHAR(4)) USING $format")
      sql("ALTER TABLE t CHANGE COLUMN c TYPE VARCHAR(5)")
      checkColType(spark.table("t").schema(1), VarcharType(5))
    }
  }

  test("allow to change column from varchar(x) to varchar(y) type x <= y") {
    withTable("t") {
      sql(s"CREATE TABLE t(i STRING, c VARCHAR(4)) USING $format")
      sql("ALTER TABLE t CHANGE COLUMN c TYPE VARCHAR(4)")
      checkColType(spark.table("t").schema(1), VarcharType(4))
      sql("ALTER TABLE t CHANGE COLUMN c TYPE VARCHAR(5)")
      checkColType(spark.table("t").schema(1), VarcharType(5))
    }
  }

  test("not allow to change column from char(x) to varchar(y) type x > y") {
    withTable("t") {
      sql(s"CREATE TABLE t(i STRING, c CHAR(4)) USING $format")
      val e = intercept[AnalysisException] {
        sql("ALTER TABLE t CHANGE COLUMN c TYPE VARCHAR(3)")
      }
      assert(e.getMessage contains "char(4) cannot be cast to varchar(3)")
    }
  }
}


@@ -17,6 +17,7 @@
 package org.apache.spark.sql
 
+import org.apache.spark.sql.execution.command.CharVarcharDDLTestBase
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 
 class HiveCharVarcharTestSuite extends CharVarcharTestSuite with TestHiveSingleton {
@@ -41,3 +42,26 @@ class HiveCharVarcharTestSuite extends CharVarcharTestSuite with TestHiveSingleton
     super.afterAll()
   }
 }
+
+class HiveCharVarcharDDLTestSuite extends CharVarcharDDLTestBase with TestHiveSingleton {
+  // The default Hive serde doesn't support nested null values.
+  override def format: String = "hive OPTIONS(fileFormat='parquet')"
+
+  private var originalPartitionMode = ""
+
+  override protected def beforeAll(): Unit = {
+    super.beforeAll()
+    originalPartitionMode = spark.conf.get("hive.exec.dynamic.partition.mode", "")
+    spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
+  }
+
+  override protected def afterAll(): Unit = {
+    if (originalPartitionMode == "") {
+      spark.conf.unset("hive.exec.dynamic.partition.mode")
+    } else {
+      spark.conf.set("hive.exec.dynamic.partition.mode", originalPartitionMode)
+    }
+    super.afterAll()
+  }
+}