[SPARK-19261][SQL] Alter add columns for Hive serde and some datasource tables

## What changes were proposed in this pull request?
Support `ALTER TABLE ADD COLUMNS (...)` syntax for Hive serde and some datasource tables.
In this PR, we consider a few aspects:

1. Views are not supported by `ALTER ADD COLUMNS`.

2. Since tables created in Spark SQL with Hive DDL syntax populate the table properties with schema information, we need to keep the schema consistent before and after the ALTER operation for future use.

3. For embedded-schema formats such as `parquet`, we need to make sure that predicates on the newly-added columns can be evaluated and pushed down properly. If a data file does not contain the newly-added columns, such predicates should evaluate as if the column values were NULL (see the usage sketch after this list).

4. For datasource tables, this feature does not support the following:
4.1 The TEXT format, since only one default column `value` is inferred for text data.
4.2 The ORC format, since the Spark SQL native ORC reader cannot handle a mismatch between the user-specified schema and the schema inferred from ORC files.
4.3 Third-party datasource types that implement `RelationProvider`, including the built-in JDBC format, since different vendor implementations may handle schemas differently.
4.4 Other datasource types, such as `parquet`, `json`, `csv`, and `hive`, are supported.

5. The column names being added cannot duplicate any existing data column or partition column name. Case sensitivity is taken into account according to the SQL configuration.

6. This feature also supports the in-memory catalog when Hive support is turned off.
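
As a quick illustration of the new behavior (a minimal sketch mirroring the added `DDLSuite` tests; table and column names are illustrative only):

```scala
// Rows written before the ALTER read back as NULL for the newly-added column,
// and predicates on that column can still be evaluated (point 3 above).
spark.sql("CREATE TABLE t1 (c1 INT) USING parquet")
spark.sql("INSERT INTO t1 VALUES (1)")
spark.sql("ALTER TABLE t1 ADD COLUMNS (c2 INT)")
spark.sql("SELECT * FROM t1 WHERE c2 IS NULL").show()  // [1, null]
spark.sql("INSERT INTO t1 VALUES (3, 2)")
spark.sql("SELECT * FROM t1 WHERE c2 = 2").show()      // [3, 2]
```
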
## How was this patch tested?
Added new test cases.
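
For instance, the duplicate-column check is exercised roughly as follows (a sketch in the spirit of the new tests; `intercept` comes from ScalaTest, and the table name is hypothetical):

```scala
// With the default case-insensitive analysis, adding an existing column name
// (even with different casing) fails with "Found duplicate column(s)".
spark.sql("CREATE TABLE t2 (c1 INT) USING parquet")
val e = intercept[org.apache.spark.sql.AnalysisException] {
  spark.sql("ALTER TABLE t2 ADD COLUMNS (C1 STRING)")
}
assert(e.getMessage.contains("Found duplicate column(s)"))
```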

Author: Xin Wu <xinwu@us.ibm.com>

Closes #16626 from xwu0226/alter_add_columns.
Xin Wu 2017-03-21 08:49:54 -07:00 committed by Xiao Li
parent 63f077fbe5
commit 4c0ff5f585
8 changed files with 400 additions and 10 deletions


@@ -85,6 +85,8 @@ statement
LIKE source=tableIdentifier locationSpec? #createTableLike
| ANALYZE TABLE tableIdentifier partitionSpec? COMPUTE STATISTICS
(identifier | FOR COLUMNS identifierSeq)? #analyze
| ALTER TABLE tableIdentifier
ADD COLUMNS '(' columns=colTypeList ')' #addTableColumns
| ALTER (TABLE | VIEW) from=tableIdentifier
RENAME TO to=tableIdentifier #renameTable
| ALTER (TABLE | VIEW) tableIdentifier
@@ -198,7 +200,6 @@ unsupportedHiveNativeCommands
| kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=COMPACT
| kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=CONCATENATE
| kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=SET kw4=FILEFORMAT
| kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=ADD kw4=COLUMNS
| kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=REPLACE kw4=COLUMNS
| kw1=START kw2=TRANSACTION
| kw1=COMMIT


@@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParserInterface}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias, View}
import org.apache.spark.sql.catalyst.util.StringUtils
import org.apache.spark.sql.types.{StructField, StructType}
object SessionCatalog {
val DEFAULT_DATABASE = "default"
@@ -161,6 +162,20 @@ class SessionCatalog(
throw new TableAlreadyExistsException(db = db, table = name.table)
}
}
private def checkDuplication(fields: Seq[StructField]): Unit = {
val columnNames = if (conf.caseSensitiveAnalysis) {
fields.map(_.name)
} else {
fields.map(_.name.toLowerCase)
}
if (columnNames.distinct.length != columnNames.length) {
val duplicateColumns = columnNames.groupBy(identity).collect {
case (x, ys) if ys.length > 1 => x
}
throw new AnalysisException(s"Found duplicate column(s): ${duplicateColumns.mkString(", ")}")
}
}
// ----------------------------------------------------------------------------
// Databases
// ----------------------------------------------------------------------------
@@ -295,6 +310,47 @@ class SessionCatalog(
externalCatalog.alterTable(newTableDefinition)
}
/**
* Alter the schema of a table identified by the provided table identifier. The new schema
* should still contain the existing bucket columns and partition columns used by the table. This
* method will also update any Spark SQL-related parameters stored as Hive table properties (such
* as the schema itself).
*
* @param identifier TableIdentifier
* @param newSchema Updated schema to be used for the table (must contain existing partition and
* bucket columns, and partition columns need to be at the end)
*/
def alterTableSchema(
identifier: TableIdentifier,
newSchema: StructType): Unit = {
val db = formatDatabaseName(identifier.database.getOrElse(getCurrentDatabase))
val table = formatTableName(identifier.table)
val tableIdentifier = TableIdentifier(table, Some(db))
requireDbExists(db)
requireTableExists(tableIdentifier)
checkDuplication(newSchema)
val catalogTable = externalCatalog.getTable(db, table)
val oldSchema = catalogTable.schema
// not supporting dropping columns yet
val nonExistentColumnNames = oldSchema.map(_.name).filterNot(columnNameResolved(newSchema, _))
if (nonExistentColumnNames.nonEmpty) {
throw new AnalysisException(
s"""
|Some existing schema fields (${nonExistentColumnNames.mkString("[", ",", "]")}) are
|not present in the new schema. We don't support dropping columns yet.
""".stripMargin)
}
// assuming the newSchema has all partition columns at the end as required
externalCatalog.alterTableSchema(db, table, newSchema)
}
private def columnNameResolved(schema: StructType, colName: String): Boolean = {
schema.fields.map(_.name).exists(conf.resolver(_, colName))
}
/**
* Return whether a table/view with the specified name exists. If no database is specified, check
* with current database.


@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.{Range, SubqueryAlias, View}
import org.apache.spark.sql.types._
class InMemorySessionCatalogSuite extends SessionCatalogSuite {
protected val utils = new CatalogTestUtils {
@@ -448,6 +449,34 @@ abstract class SessionCatalogSuite extends PlanTest {
}
}
test("alter table add columns") {
withBasicCatalog { sessionCatalog =>
sessionCatalog.createTable(newTable("t1", "default"), ignoreIfExists = false)
val oldTab = sessionCatalog.externalCatalog.getTable("default", "t1")
sessionCatalog.alterTableSchema(
TableIdentifier("t1", Some("default")),
StructType(oldTab.dataSchema.add("c3", IntegerType) ++ oldTab.partitionSchema))
val newTab = sessionCatalog.externalCatalog.getTable("default", "t1")
// construct the expected table schema
val expectedTableSchema = StructType(oldTab.dataSchema.fields ++
Seq(StructField("c3", IntegerType)) ++ oldTab.partitionSchema)
assert(newTab.schema == expectedTableSchema)
}
}
test("alter table drop columns") {
withBasicCatalog { sessionCatalog =>
sessionCatalog.createTable(newTable("t1", "default"), ignoreIfExists = false)
val oldTab = sessionCatalog.externalCatalog.getTable("default", "t1")
val e = intercept[AnalysisException] {
sessionCatalog.alterTableSchema(
TableIdentifier("t1", Some("default")), StructType(oldTab.schema.drop(1)))
}.getMessage
assert(e.contains("We don't support dropping columns yet."))
}
}
test("get table") {
withBasicCatalog { catalog =>
assert(catalog.getTableMetadata(TableIdentifier("tbl1", Some("db2")))


@@ -741,6 +741,22 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
ctx.VIEW != null)
}
/**
* Create a [[AlterTableAddColumnsCommand]] command.
*
* For example:
* {{{
* ALTER TABLE table1
* ADD COLUMNS (col_name data_type [COMMENT col_comment], ...);
* }}}
*/
override def visitAddTableColumns(ctx: AddTableColumnsContext): LogicalPlan = withOrigin(ctx) {
AlterTableAddColumnsCommand(
visitTableIdentifier(ctx.tableIdentifier),
visitColTypeList(ctx.columns)
)
}
/**
* Create an [[AlterTableSetPropertiesCommand]] command.
*


@@ -37,7 +37,10 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTableType._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.util.quoteIdentifier
import org.apache.spark.sql.execution.datasources.PartitioningUtils
import org.apache.spark.sql.execution.datasources.{DataSource, FileFormat, PartitioningUtils}
import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -174,6 +177,77 @@ case class AlterTableRenameCommand(
}
/**
* A command that add columns to a table
* The syntax of using this command in SQL is:
* {{{
* ALTER TABLE table_identifier
* ADD COLUMNS (col_name data_type [COMMENT col_comment], ...);
* }}}
*/
case class AlterTableAddColumnsCommand(
table: TableIdentifier,
columns: Seq[StructField]) extends RunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
val catalogTable = verifyAlterTableAddColumn(catalog, table)
try {
sparkSession.catalog.uncacheTable(table.quotedString)
} catch {
case NonFatal(e) =>
log.warn(s"Exception when attempting to uncache table ${table.quotedString}", e)
}
catalog.refreshTable(table)
// make sure any partition columns are at the end of the fields
val reorderedSchema = catalogTable.dataSchema ++ columns ++ catalogTable.partitionSchema
catalog.alterTableSchema(
table, catalogTable.schema.copy(fields = reorderedSchema.toArray))
Seq.empty[Row]
}
/**
* ALTER TABLE ADD COLUMNS command does not support temporary view/table,
* view, or datasource table with text, orc formats or external provider.
* For datasource table, it currently only supports parquet, json, csv.
*/
private def verifyAlterTableAddColumn(
catalog: SessionCatalog,
table: TableIdentifier): CatalogTable = {
val catalogTable = catalog.getTempViewOrPermanentTableMetadata(table)
if (catalogTable.tableType == CatalogTableType.VIEW) {
throw new AnalysisException(
s"""
|ALTER ADD COLUMNS does not support views.
|You must drop and re-create the views for adding the new columns. Views: $table
""".stripMargin)
}
if (DDLUtils.isDatasourceTable(catalogTable)) {
DataSource.lookupDataSource(catalogTable.provider.get).newInstance() match {
// For a datasource table, this command only supports the following file formats.
// TextFileFormat defaults to a single column "value".
// OrcFileFormat cannot handle a mismatch between the user-specified schema and the
// inferred schema yet. TODO: once this issue is resolved, we can add ORC back.
// The hive provider is already handled as a Hive serde table, so that logic does not
// reach here.
case _: JsonFileFormat | _: CSVFileFormat | _: ParquetFileFormat =>
case s =>
throw new AnalysisException(
s"""
|ALTER ADD COLUMNS does not support datasource table with type $s.
|You must drop and re-create the table for adding the new columns. Tables: $table
""".stripMargin)
}
}
catalogTable
}
}
/**
* A command that loads data into a Hive table.
*


@@ -780,13 +780,7 @@ class DDLCommandSuite extends PlanTest {
assertUnsupported("ALTER TABLE table_name SKEWED BY (key) ON (1,5,6) STORED AS DIRECTORIES")
}
test("alter table: add/replace columns (not allowed)") {
assertUnsupported(
"""
|ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us')
|ADD COLUMNS (new_col1 INT COMMENT 'test_comment', new_col2 LONG
|COMMENT 'test_comment2') CASCADE
""".stripMargin)
test("alter table: replace columns (not allowed)") {
assertUnsupported(
"""
|ALTER TABLE table_name REPLACE COLUMNS (new_col1 INT


@@ -2185,4 +2185,126 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
}
}
}
val supportedNativeFileFormatsForAlterTableAddColumns = Seq("parquet", "json", "csv")
supportedNativeFileFormatsForAlterTableAddColumns.foreach { provider =>
test(s"alter datasource table add columns - $provider") {
withTable("t1") {
sql(s"CREATE TABLE t1 (c1 int) USING $provider")
sql("INSERT INTO t1 VALUES (1)")
sql("ALTER TABLE t1 ADD COLUMNS (c2 int)")
checkAnswer(
spark.table("t1"),
Seq(Row(1, null))
)
checkAnswer(
sql("SELECT * FROM t1 WHERE c2 is null"),
Seq(Row(1, null))
)
sql("INSERT INTO t1 VALUES (3, 2)")
checkAnswer(
sql("SELECT * FROM t1 WHERE c2 = 2"),
Seq(Row(3, 2))
)
}
}
}
supportedNativeFileFormatsForAlterTableAddColumns.foreach { provider =>
test(s"alter datasource table add columns - partitioned - $provider") {
withTable("t1") {
sql(s"CREATE TABLE t1 (c1 int, c2 int) USING $provider PARTITIONED BY (c2)")
sql("INSERT INTO t1 PARTITION(c2 = 2) VALUES (1)")
sql("ALTER TABLE t1 ADD COLUMNS (c3 int)")
checkAnswer(
spark.table("t1"),
Seq(Row(1, null, 2))
)
checkAnswer(
sql("SELECT * FROM t1 WHERE c3 is null"),
Seq(Row(1, null, 2))
)
sql("INSERT INTO t1 PARTITION(c2 =1) VALUES (2, 3)")
checkAnswer(
sql("SELECT * FROM t1 WHERE c3 = 3"),
Seq(Row(2, 3, 1))
)
checkAnswer(
sql("SELECT * FROM t1 WHERE c2 = 1"),
Seq(Row(2, 3, 1))
)
}
}
}
test("alter datasource table add columns - text format not supported") {
withTable("t1") {
sql("CREATE TABLE t1 (c1 int) USING text")
val e = intercept[AnalysisException] {
sql("ALTER TABLE t1 ADD COLUMNS (c2 int)")
}.getMessage
assert(e.contains("ALTER ADD COLUMNS does not support datasource table with type"))
}
}
test("alter table add columns -- not support temp view") {
withTempView("tmp_v") {
sql("CREATE TEMPORARY VIEW tmp_v AS SELECT 1 AS c1, 2 AS c2")
val e = intercept[AnalysisException] {
sql("ALTER TABLE tmp_v ADD COLUMNS (c3 INT)")
}
assert(e.message.contains("ALTER ADD COLUMNS does not support views"))
}
}
test("alter table add columns -- not support view") {
withView("v1") {
sql("CREATE VIEW v1 AS SELECT 1 AS c1, 2 AS c2")
val e = intercept[AnalysisException] {
sql("ALTER TABLE v1 ADD COLUMNS (c3 INT)")
}
assert(e.message.contains("ALTER ADD COLUMNS does not support views"))
}
}
test("alter table add columns with existing column name") {
withTable("t1") {
sql("CREATE TABLE t1 (c1 int) USING PARQUET")
val e = intercept[AnalysisException] {
sql("ALTER TABLE t1 ADD COLUMNS (c1 string)")
}.getMessage
assert(e.contains("Found duplicate column(s)"))
}
}
Seq(true, false).foreach { caseSensitive =>
test(s"alter table add columns with existing column name - caseSensitive $caseSensitive") {
withSQLConf(SQLConf.CASE_SENSITIVE.key -> s"$caseSensitive") {
withTable("t1") {
sql("CREATE TABLE t1 (c1 int) USING PARQUET")
if (!caseSensitive) {
val e = intercept[AnalysisException] {
sql("ALTER TABLE t1 ADD COLUMNS (C1 string)")
}.getMessage
assert(e.contains("Found duplicate column(s)"))
} else {
if (isUsingHiveMetastore) {
// The Hive catalog will still complain that c1 is a duplicate column name because Hive
// identifiers are case insensitive.
val e = intercept[AnalysisException] {
sql("ALTER TABLE t1 ADD COLUMNS (C1 string)")
}.getMessage
assert(e.contains("HiveException"))
} else {
sql("ALTER TABLE t1 ADD COLUMNS (C1 string)")
assert(spark.table("t1").schema
.equals(new StructType().add("c1", IntegerType).add("C1", StringType)))
}
}
}
}
}
}
}


@@ -35,7 +35,7 @@ import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types.{MetadataBuilder, StructType}
import org.apache.spark.sql.types._
// TODO(gatorsmile): combine HiveCatalogedDDLSuite and HiveDDLSuite
class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeAndAfterEach {
@@ -112,6 +112,7 @@ class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeA
class HiveDDLSuite
extends QueryTest with SQLTestUtils with TestHiveSingleton with BeforeAndAfterEach {
import testImplicits._
val hiveFormats = Seq("PARQUET", "ORC", "TEXTFILE", "SEQUENCEFILE", "RCFILE", "AVRO")
override def afterEach(): Unit = {
try {
@@ -1860,4 +1861,101 @@ class HiveDDLSuite
}
}
}
hiveFormats.foreach { tableType =>
test(s"alter hive serde table add columns -- partitioned - $tableType") {
withTable("tab") {
sql(
s"""
|CREATE TABLE tab (c1 int, c2 int)
|PARTITIONED BY (c3 int) STORED AS $tableType
""".stripMargin)
sql("INSERT INTO tab PARTITION (c3=1) VALUES (1, 2)")
sql("ALTER TABLE tab ADD COLUMNS (c4 int)")
checkAnswer(
sql("SELECT * FROM tab WHERE c3 = 1"),
Seq(Row(1, 2, null, 1))
)
assert(spark.table("tab").schema
.contains(StructField("c4", IntegerType)))
sql("INSERT INTO tab PARTITION (c3=2) VALUES (2, 3, 4)")
checkAnswer(
spark.table("tab"),
Seq(Row(1, 2, null, 1), Row(2, 3, 4, 2))
)
checkAnswer(
sql("SELECT * FROM tab WHERE c3 = 2 AND c4 IS NOT NULL"),
Seq(Row(2, 3, 4, 2))
)
sql("ALTER TABLE tab ADD COLUMNS (c5 char(10))")
assert(spark.table("tab").schema.find(_.name == "c5")
.get.metadata.getString("HIVE_TYPE_STRING") == "char(10)")
}
}
}
hiveFormats.foreach { tableType =>
test(s"alter hive serde table add columns -- with predicate - $tableType ") {
withTable("tab") {
sql(s"CREATE TABLE tab (c1 int, c2 int) STORED AS $tableType")
sql("INSERT INTO tab VALUES (1, 2)")
sql("ALTER TABLE tab ADD COLUMNS (c4 int)")
checkAnswer(
sql("SELECT * FROM tab WHERE c4 IS NULL"),
Seq(Row(1, 2, null))
)
assert(spark.table("tab").schema
.contains(StructField("c4", IntegerType)))
sql("INSERT INTO tab VALUES (2, 3, 4)")
checkAnswer(
sql("SELECT * FROM tab WHERE c4 = 4 "),
Seq(Row(2, 3, 4))
)
checkAnswer(
spark.table("tab"),
Seq(Row(1, 2, null), Row(2, 3, 4))
)
}
}
}
Seq(true, false).foreach { caseSensitive =>
test(s"alter add columns with existing column name - caseSensitive $caseSensitive") {
withSQLConf(SQLConf.CASE_SENSITIVE.key -> s"$caseSensitive") {
withTable("tab") {
sql("CREATE TABLE tab (c1 int) PARTITIONED BY (c2 int) STORED AS PARQUET")
if (!caseSensitive) {
// duplicating partitioning column name
val e1 = intercept[AnalysisException] {
sql("ALTER TABLE tab ADD COLUMNS (C2 string)")
}.getMessage
assert(e1.contains("Found duplicate column(s)"))
// duplicating data column name
val e2 = intercept[AnalysisException] {
sql("ALTER TABLE tab ADD COLUMNS (C1 string)")
}.getMessage
assert(e2.contains("Found duplicate column(s)"))
} else {
// The Hive catalog will still complain about the duplicate column name because Hive
// identifiers are case insensitive.
val e1 = intercept[AnalysisException] {
sql("ALTER TABLE tab ADD COLUMNS (C2 string)")
}.getMessage
assert(e1.contains("HiveException"))
// The Hive catalog will still complain that c1 is a duplicate column name because Hive
// identifiers are case insensitive.
val e2 = intercept[AnalysisException] {
sql("ALTER TABLE tab ADD COLUMNS (C1 string)")
}.getMessage
assert(e2.contains("HiveException"))
}
}
}
}
}
}