[SPARK-21929][SQL] Support ALTER TABLE table_name ADD COLUMNS(..) for ORC data source
## What changes were proposed in this pull request?
When [SPARK-19261](https://issues.apache.org/jira/browse/SPARK-19261) implemented `ALTER TABLE ADD COLUMNS`, the ORC data source was omitted because of SPARK-14387, SPARK-16628, and SPARK-18355. Those issues have since been fixed, and Spark 2.3 [uses the Spark schema to read ORC tables instead of the ORC file schema](e6e36004af). This PR enables `ALTER TABLE ADD COLUMNS` for the ORC data source.
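For illustration, a minimal sketch of the behavior this change enables; the session setup and the names `t_orc`, `c1`, and `c2` are assumptions for the example, not part of the patch:

```scala
import org.apache.spark.sql.SparkSession

// Assumes Spark 2.3+, where the native ORC reader resolves columns against the
// catalog (Spark) schema rather than the per-file ORC schema.
val spark = SparkSession.builder().appName("orc-add-columns-sketch").getOrCreate()

spark.sql("CREATE TABLE t_orc (c1 INT) USING orc")
spark.sql("INSERT INTO t_orc VALUES (1)")

// Before this patch, ADD COLUMNS on a `USING orc` table was rejected with an AnalysisException.
spark.sql("ALTER TABLE t_orc ADD COLUMNS (c2 INT)")

// Rows written before the schema change read back with the new column as null,
// matching the expectations in the updated DDLSuite tests below.
spark.sql("SELECT * FROM t_orc WHERE c2 IS NULL").show()

spark.stop()
```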
## How was this patch tested?
Pass the updated and added test cases.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes #19545 from dongjoon-hyun/SPARK-21929.
parent ff8de99a1c
commit ca2a780e7c
```diff
@@ -235,11 +235,10 @@ case class AlterTableAddColumnsCommand(
       DataSource.lookupDataSource(catalogTable.provider.get).newInstance() match {
         // For datasource table, this command can only support the following File format.
         // TextFileFormat only default to one column "value"
-        // OrcFileFormat can not handle difference between user-specified schema and
-        // inferred schema yet. TODO, once this issue is resolved , we can add Orc back.
         // Hive type is already considered as hive serde table, so the logic will not
         // come in here.
         case _: JsonFileFormat | _: CSVFileFormat | _: ParquetFileFormat =>
+        case s if s.getClass.getCanonicalName.endsWith("OrcFileFormat") =>
         case s =>
           throw new AnalysisException(
             s"""
```
```diff
@@ -2202,56 +2202,64 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
     }
   }
 
+  protected def testAddColumn(provider: String): Unit = {
+    withTable("t1") {
+      sql(s"CREATE TABLE t1 (c1 int) USING $provider")
+      sql("INSERT INTO t1 VALUES (1)")
+      sql("ALTER TABLE t1 ADD COLUMNS (c2 int)")
+      checkAnswer(
+        spark.table("t1"),
+        Seq(Row(1, null))
+      )
+      checkAnswer(
+        sql("SELECT * FROM t1 WHERE c2 is null"),
+        Seq(Row(1, null))
+      )
+
+      sql("INSERT INTO t1 VALUES (3, 2)")
+      checkAnswer(
+        sql("SELECT * FROM t1 WHERE c2 = 2"),
+        Seq(Row(3, 2))
+      )
+    }
+  }
+
+  protected def testAddColumnPartitioned(provider: String): Unit = {
+    withTable("t1") {
+      sql(s"CREATE TABLE t1 (c1 int, c2 int) USING $provider PARTITIONED BY (c2)")
+      sql("INSERT INTO t1 PARTITION(c2 = 2) VALUES (1)")
+      sql("ALTER TABLE t1 ADD COLUMNS (c3 int)")
+      checkAnswer(
+        spark.table("t1"),
+        Seq(Row(1, null, 2))
+      )
+      checkAnswer(
+        sql("SELECT * FROM t1 WHERE c3 is null"),
+        Seq(Row(1, null, 2))
+      )
+      sql("INSERT INTO t1 PARTITION(c2 =1) VALUES (2, 3)")
+      checkAnswer(
+        sql("SELECT * FROM t1 WHERE c3 = 3"),
+        Seq(Row(2, 3, 1))
+      )
+      checkAnswer(
+        sql("SELECT * FROM t1 WHERE c2 = 1"),
+        Seq(Row(2, 3, 1))
+      )
+    }
+  }
+
   val supportedNativeFileFormatsForAlterTableAddColumns = Seq("parquet", "json", "csv")
 
   supportedNativeFileFormatsForAlterTableAddColumns.foreach { provider =>
     test(s"alter datasource table add columns - $provider") {
-      withTable("t1") {
-        sql(s"CREATE TABLE t1 (c1 int) USING $provider")
-        sql("INSERT INTO t1 VALUES (1)")
-        sql("ALTER TABLE t1 ADD COLUMNS (c2 int)")
-        checkAnswer(
-          spark.table("t1"),
-          Seq(Row(1, null))
-        )
-        checkAnswer(
-          sql("SELECT * FROM t1 WHERE c2 is null"),
-          Seq(Row(1, null))
-        )
-
-        sql("INSERT INTO t1 VALUES (3, 2)")
-        checkAnswer(
-          sql("SELECT * FROM t1 WHERE c2 = 2"),
-          Seq(Row(3, 2))
-        )
-      }
+      testAddColumn(provider)
     }
   }
 
   supportedNativeFileFormatsForAlterTableAddColumns.foreach { provider =>
     test(s"alter datasource table add columns - partitioned - $provider") {
-      withTable("t1") {
-        sql(s"CREATE TABLE t1 (c1 int, c2 int) USING $provider PARTITIONED BY (c2)")
-        sql("INSERT INTO t1 PARTITION(c2 = 2) VALUES (1)")
-        sql("ALTER TABLE t1 ADD COLUMNS (c3 int)")
-        checkAnswer(
-          spark.table("t1"),
-          Seq(Row(1, null, 2))
-        )
-        checkAnswer(
-          sql("SELECT * FROM t1 WHERE c3 is null"),
-          Seq(Row(1, null, 2))
-        )
-        sql("INSERT INTO t1 PARTITION(c2 =1) VALUES (2, 3)")
-        checkAnswer(
-          sql("SELECT * FROM t1 WHERE c3 = 3"),
-          Seq(Row(2, 3, 1))
-        )
-        checkAnswer(
-          sql("SELECT * FROM t1 WHERE c2 = 1"),
-          Seq(Row(2, 3, 1))
-        )
-      }
+      testAddColumnPartitioned(provider)
     }
   }
 
```
```diff
@@ -166,6 +166,14 @@ class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeA
   test("drop table") {
     testDropTable(isDatasourceTable = false)
   }
+
+  test("alter datasource table add columns - orc") {
+    testAddColumn("orc")
+  }
+
+  test("alter datasource table add columns - partitioned - orc") {
+    testAddColumnPartitioned("orc")
+  }
 }
 
 class HiveDDLSuite
```