[SPARK-14388][SQL] Implement CREATE TABLE
## What changes were proposed in this pull request? This patch implements the `CREATE TABLE` command using the `SessionCatalog`. Previously we handled only `CTAS` and `CREATE TABLE ... USING`. This requires us to refactor `CatalogTable` to accept various fields (e.g. bucket and skew columns) and pass them to Hive. WIP: Note that I haven't verified whether this actually works yet! But I believe it does. ## How was this patch tested? Tests will come in a future commit. Author: Andrew Or <andrew@databricks.com> Author: Yin Huai <yhuai@databricks.com> Closes #12271 from andrewor14/create-table-ddl.
This commit is contained in:
parent
1018a1c1eb
commit
7d2ed8cc03
|
@ -272,8 +272,7 @@ createFileFormat
|
|||
;
|
||||
|
||||
fileFormat
|
||||
: INPUTFORMAT inFmt=STRING OUTPUTFORMAT outFmt=STRING (SERDE serdeCls=STRING)?
|
||||
(INPUTDRIVER inDriver=STRING OUTPUTDRIVER outDriver=STRING)? #tableFileFormat
|
||||
: INPUTFORMAT inFmt=STRING OUTPUTFORMAT outFmt=STRING (SERDE serdeCls=STRING)? #tableFileFormat
|
||||
| identifier #genericFileFormat
|
||||
;
|
||||
|
||||
|
|
|
@ -220,14 +220,30 @@ case class CatalogTable(
|
|||
tableType: CatalogTableType,
|
||||
storage: CatalogStorageFormat,
|
||||
schema: Seq[CatalogColumn],
|
||||
partitionColumns: Seq[CatalogColumn] = Seq.empty,
|
||||
sortColumns: Seq[CatalogColumn] = Seq.empty,
|
||||
numBuckets: Int = 0,
|
||||
partitionColumnNames: Seq[String] = Seq.empty,
|
||||
sortColumnNames: Seq[String] = Seq.empty,
|
||||
bucketColumnNames: Seq[String] = Seq.empty,
|
||||
numBuckets: Int = -1,
|
||||
createTime: Long = System.currentTimeMillis,
|
||||
lastAccessTime: Long = System.currentTimeMillis,
|
||||
lastAccessTime: Long = -1,
|
||||
properties: Map[String, String] = Map.empty,
|
||||
viewOriginalText: Option[String] = None,
|
||||
viewText: Option[String] = None) {
|
||||
viewText: Option[String] = None,
|
||||
comment: Option[String] = None) {
|
||||
|
||||
// Verify that the provided columns are part of the schema
|
||||
private val colNames = schema.map(_.name).toSet
|
||||
private def requireSubsetOfSchema(cols: Seq[String], colType: String): Unit = {
|
||||
require(cols.toSet.subsetOf(colNames), s"$colType columns (${cols.mkString(", ")}) " +
|
||||
s"must be a subset of schema (${colNames.mkString(", ")}) in table '$identifier'")
|
||||
}
|
||||
requireSubsetOfSchema(partitionColumnNames, "partition")
|
||||
requireSubsetOfSchema(sortColumnNames, "sort")
|
||||
requireSubsetOfSchema(bucketColumnNames, "bucket")
|
||||
|
||||
/** Columns this table is partitioned by. */
|
||||
def partitionColumns: Seq[CatalogColumn] =
|
||||
schema.filter { c => partitionColumnNames.contains(c.name) }
|
||||
|
||||
/** Return the database this table was specified to belong to, assuming it exists. */
|
||||
def database: String = identifier.database.getOrElse {
|
||||
|
|
|
@ -553,8 +553,12 @@ abstract class CatalogTestUtils {
|
|||
identifier = TableIdentifier(name, database),
|
||||
tableType = CatalogTableType.EXTERNAL_TABLE,
|
||||
storage = storageFormat,
|
||||
schema = Seq(CatalogColumn("col1", "int"), CatalogColumn("col2", "string")),
|
||||
partitionColumns = Seq(CatalogColumn("a", "int"), CatalogColumn("b", "string")))
|
||||
schema = Seq(
|
||||
CatalogColumn("col1", "int"),
|
||||
CatalogColumn("col2", "string"),
|
||||
CatalogColumn("a", "int"),
|
||||
CatalogColumn("b", "string")),
|
||||
partitionColumnNames = Seq("a", "b"))
|
||||
}
|
||||
|
||||
def newFunc(name: String, database: Option[String] = None): CatalogFunction = {
|
||||
|
|
|
@ -179,7 +179,9 @@ class SparkSqlAstBuilder extends AstBuilder {
|
|||
}
|
||||
}
|
||||
|
||||
/** Type to keep track of a table header. */
|
||||
/**
|
||||
* Type to keep track of a table header: (identifier, isTemporary, ifNotExists, isExternal).
|
||||
*/
|
||||
type TableHeader = (TableIdentifier, Boolean, Boolean, Boolean)
|
||||
|
||||
/**
|
||||
|
@ -616,10 +618,7 @@ class SparkSqlAstBuilder extends AstBuilder {
|
|||
case s: GenericFileFormatContext =>
|
||||
(Seq.empty[String], Option(s.identifier.getText))
|
||||
case s: TableFileFormatContext =>
|
||||
val elements = Seq(s.inFmt, s.outFmt) ++
|
||||
Option(s.serdeCls).toSeq ++
|
||||
Option(s.inDriver).toSeq ++
|
||||
Option(s.outDriver).toSeq
|
||||
val elements = Seq(s.inFmt, s.outFmt) ++ Option(s.serdeCls).toSeq
|
||||
(elements.map(string), None)
|
||||
}
|
||||
AlterTableSetFileFormat(
|
||||
|
@ -773,22 +772,6 @@ class SparkSqlAstBuilder extends AstBuilder {
|
|||
.map(_.identifier.getText))
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a skew specification. This contains three components:
|
||||
* - The Skewed Columns
|
||||
* - Values for which are skewed. The size of each entry must match the number of skewed columns.
|
||||
* - A store in directory flag.
|
||||
*/
|
||||
override def visitSkewSpec(
|
||||
ctx: SkewSpecContext): (Seq[String], Seq[Seq[String]], Boolean) = withOrigin(ctx) {
|
||||
val skewedValues = if (ctx.constantList != null) {
|
||||
Seq(visitConstantList(ctx.constantList))
|
||||
} else {
|
||||
visitNestedConstantList(ctx.nestedConstantList)
|
||||
}
|
||||
(visitIdentifierList(ctx.identifierList), skewedValues, ctx.DIRECTORIES != null)
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert a nested constants list into a sequence of string sequences.
|
||||
*/
|
||||
|
|
|
@ -224,29 +224,6 @@ case class DropTable(
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A command that renames a table/view.
|
||||
*
|
||||
* The syntax of this command is:
|
||||
* {{{
|
||||
* ALTER TABLE table1 RENAME TO table2;
|
||||
* ALTER VIEW view1 RENAME TO view2;
|
||||
* }}}
|
||||
*/
|
||||
case class AlterTableRename(
|
||||
oldName: TableIdentifier,
|
||||
newName: TableIdentifier)
|
||||
extends RunnableCommand {
|
||||
|
||||
override def run(sqlContext: SQLContext): Seq[Row] = {
|
||||
val catalog = sqlContext.sessionState.catalog
|
||||
catalog.invalidateTable(oldName)
|
||||
catalog.renameTable(oldName, newName)
|
||||
Seq.empty[Row]
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* A command that sets table/view properties.
|
||||
*
|
||||
|
|
|
@ -0,0 +1,80 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.spark.sql.execution.command
|
||||
|
||||
import org.apache.spark.sql.{Row, SQLContext}
|
||||
import org.apache.spark.sql.catalyst.TableIdentifier
|
||||
import org.apache.spark.sql.catalyst.catalog.CatalogTable
|
||||
|
||||
|
||||
// TODO: move the rest of the table commands from ddl.scala to this file
|
||||
|
||||
/**
|
||||
* A command to create a table.
|
||||
*
|
||||
* Note: This is currently used only for creating Hive tables.
|
||||
* This is not intended for temporary tables.
|
||||
*
|
||||
* The syntax of using this command in SQL is:
|
||||
* {{{
|
||||
* CREATE [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name
|
||||
* [(col1 data_type [COMMENT col_comment], ...)]
|
||||
* [COMMENT table_comment]
|
||||
* [PARTITIONED BY (col3 data_type [COMMENT col_comment], ...)]
|
||||
* [CLUSTERED BY (col1, ...) [SORTED BY (col1 [ASC|DESC], ...)] INTO num_buckets BUCKETS]
|
||||
* [SKEWED BY (col1, col2, ...) ON ((col_value, col_value, ...), ...)
|
||||
* [STORED AS DIRECTORIES]
|
||||
* [ROW FORMAT row_format]
|
||||
* [STORED AS file_format | STORED BY storage_handler_class [WITH SERDEPROPERTIES (...)]]
|
||||
* [LOCATION path]
|
||||
* [TBLPROPERTIES (property_name=property_value, ...)]
|
||||
* [AS select_statement];
|
||||
* }}}
|
||||
*/
|
||||
case class CreateTable(table: CatalogTable, ifNotExists: Boolean) extends RunnableCommand {
|
||||
|
||||
override def run(sqlContext: SQLContext): Seq[Row] = {
|
||||
sqlContext.sessionState.catalog.createTable(table, ifNotExists)
|
||||
Seq.empty[Row]
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* A command that renames a table/view.
|
||||
*
|
||||
* The syntax of this command is:
|
||||
* {{{
|
||||
* ALTER TABLE table1 RENAME TO table2;
|
||||
* ALTER VIEW view1 RENAME TO view2;
|
||||
* }}}
|
||||
*/
|
||||
case class AlterTableRename(
|
||||
oldName: TableIdentifier,
|
||||
newName: TableIdentifier)
|
||||
extends RunnableCommand {
|
||||
|
||||
override def run(sqlContext: SQLContext): Seq[Row] = {
|
||||
val catalog = sqlContext.sessionState.catalog
|
||||
catalog.invalidateTable(oldName)
|
||||
catalog.renameTable(oldName, newName)
|
||||
Seq.empty[Row]
|
||||
}
|
||||
|
||||
}
|
|
@ -440,37 +440,25 @@ class DDLCommandSuite extends PlanTest {
|
|||
}
|
||||
|
||||
test("alter table: set file format") {
|
||||
val sql1 =
|
||||
"""
|
||||
|ALTER TABLE table_name SET FILEFORMAT INPUTFORMAT 'test'
|
||||
|OUTPUTFORMAT 'test' SERDE 'test' INPUTDRIVER 'test' OUTPUTDRIVER 'test'
|
||||
""".stripMargin
|
||||
val sql2 = "ALTER TABLE table_name SET FILEFORMAT INPUTFORMAT 'test' " +
|
||||
val sql1 = "ALTER TABLE table_name SET FILEFORMAT INPUTFORMAT 'test' " +
|
||||
"OUTPUTFORMAT 'test' SERDE 'test'"
|
||||
val sql3 = "ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us') " +
|
||||
val sql2 = "ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us') " +
|
||||
"SET FILEFORMAT PARQUET"
|
||||
val parsed1 = parser.parsePlan(sql1)
|
||||
val parsed2 = parser.parsePlan(sql2)
|
||||
val parsed3 = parser.parsePlan(sql3)
|
||||
val tableIdent = TableIdentifier("table_name", None)
|
||||
val expected1 = AlterTableSetFileFormat(
|
||||
tableIdent,
|
||||
None,
|
||||
List("test", "test", "test", "test", "test"),
|
||||
List("test", "test", "test"),
|
||||
None)(sql1)
|
||||
val expected2 = AlterTableSetFileFormat(
|
||||
tableIdent,
|
||||
None,
|
||||
List("test", "test", "test"),
|
||||
None)(sql2)
|
||||
val expected3 = AlterTableSetFileFormat(
|
||||
tableIdent,
|
||||
Some(Map("dt" -> "2008-08-08", "country" -> "us")),
|
||||
Seq(),
|
||||
Some("PARQUET"))(sql3)
|
||||
Some("PARQUET"))(sql2)
|
||||
comparePlans(parsed1, expected1)
|
||||
comparePlans(parsed2, expected2)
|
||||
comparePlans(parsed3, expected3)
|
||||
}
|
||||
|
||||
test("alter table: set location") {
|
||||
|
|
|
@ -380,8 +380,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
|
|||
}
|
||||
}
|
||||
|
||||
// TODO: ADD a testcase for Drop Database in Restric when we can create tables in SQLContext
|
||||
|
||||
test("show tables") {
|
||||
withTempTable("show1a", "show2b") {
|
||||
sql(
|
||||
|
|
|
@ -162,7 +162,7 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
|
|||
|
||||
runCliWithin(3.minute)(
|
||||
"CREATE TABLE hive_test(key INT, val STRING);"
|
||||
-> "OK",
|
||||
-> "",
|
||||
"SHOW TABLES;"
|
||||
-> "hive_test",
|
||||
s"LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE hive_test;"
|
||||
|
@ -187,7 +187,7 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
|
|||
"USE hive_test_db;"
|
||||
-> "",
|
||||
"CREATE TABLE hive_test(key INT, val STRING);"
|
||||
-> "OK",
|
||||
-> "",
|
||||
"SHOW TABLES;"
|
||||
-> "hive_test"
|
||||
)
|
||||
|
@ -210,9 +210,9 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
|
|||
"""CREATE TABLE t1(key string, val string)
|
||||
|ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe';
|
||||
""".stripMargin
|
||||
-> "OK",
|
||||
-> "",
|
||||
"CREATE TABLE sourceTable (key INT, val STRING);"
|
||||
-> "OK",
|
||||
-> "",
|
||||
s"LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE sourceTable;"
|
||||
-> "OK",
|
||||
"INSERT INTO TABLE t1 SELECT key, val FROM sourceTable;"
|
||||
|
|
|
@ -366,10 +366,76 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
|
|||
"sort_merge_join_desc_6",
|
||||
"sort_merge_join_desc_7",
|
||||
|
||||
// These tests try to create a table with bucketed columns, which we don't support
|
||||
"auto_join32",
|
||||
"auto_join_filters",
|
||||
"auto_smb_mapjoin_14",
|
||||
"ct_case_insensitive",
|
||||
"explain_rearrange",
|
||||
"groupby_sort_10",
|
||||
"groupby_sort_2",
|
||||
"groupby_sort_3",
|
||||
"groupby_sort_4",
|
||||
"groupby_sort_5",
|
||||
"groupby_sort_7",
|
||||
"groupby_sort_8",
|
||||
"groupby_sort_9",
|
||||
"groupby_sort_test_1",
|
||||
"inputddl4",
|
||||
"join_filters",
|
||||
"join_nulls",
|
||||
"join_nullsafe",
|
||||
"load_dyn_part2",
|
||||
"orc_empty_files",
|
||||
"reduce_deduplicate",
|
||||
"smb_mapjoin9",
|
||||
"smb_mapjoin_1",
|
||||
"smb_mapjoin_10",
|
||||
"smb_mapjoin_13",
|
||||
"smb_mapjoin_14",
|
||||
"smb_mapjoin_15",
|
||||
"smb_mapjoin_16",
|
||||
"smb_mapjoin_17",
|
||||
"smb_mapjoin_2",
|
||||
"smb_mapjoin_21",
|
||||
"smb_mapjoin_25",
|
||||
"smb_mapjoin_3",
|
||||
"smb_mapjoin_4",
|
||||
"smb_mapjoin_5",
|
||||
"smb_mapjoin_6",
|
||||
"smb_mapjoin_7",
|
||||
"smb_mapjoin_8",
|
||||
"sort_merge_join_desc_1",
|
||||
"sort_merge_join_desc_2",
|
||||
"sort_merge_join_desc_3",
|
||||
"sort_merge_join_desc_4",
|
||||
|
||||
// These tests try to create a table with skewed columns, which we don't support
|
||||
"create_skewed_table1",
|
||||
"skewjoinopt13",
|
||||
"skewjoinopt18",
|
||||
"skewjoinopt9",
|
||||
|
||||
// Index commands are not supported
|
||||
"drop_index",
|
||||
"drop_index_removes_partition_dirs",
|
||||
"alter_index",
|
||||
"auto_sortmerge_join_1",
|
||||
"auto_sortmerge_join_10",
|
||||
"auto_sortmerge_join_11",
|
||||
"auto_sortmerge_join_12",
|
||||
"auto_sortmerge_join_13",
|
||||
"auto_sortmerge_join_14",
|
||||
"auto_sortmerge_join_15",
|
||||
"auto_sortmerge_join_16",
|
||||
"auto_sortmerge_join_2",
|
||||
"auto_sortmerge_join_3",
|
||||
"auto_sortmerge_join_4",
|
||||
"auto_sortmerge_join_5",
|
||||
"auto_sortmerge_join_6",
|
||||
"auto_sortmerge_join_7",
|
||||
"auto_sortmerge_join_8",
|
||||
"auto_sortmerge_join_9",
|
||||
|
||||
// Macro commands are not supported
|
||||
"macro",
|
||||
|
@ -435,33 +501,14 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
|
|||
"auto_join3",
|
||||
"auto_join30",
|
||||
"auto_join31",
|
||||
"auto_join32",
|
||||
"auto_join4",
|
||||
"auto_join5",
|
||||
"auto_join6",
|
||||
"auto_join7",
|
||||
"auto_join8",
|
||||
"auto_join9",
|
||||
"auto_join_filters",
|
||||
"auto_join_nulls",
|
||||
"auto_join_reordering_values",
|
||||
"auto_smb_mapjoin_14",
|
||||
"auto_sortmerge_join_1",
|
||||
"auto_sortmerge_join_10",
|
||||
"auto_sortmerge_join_11",
|
||||
"auto_sortmerge_join_12",
|
||||
"auto_sortmerge_join_13",
|
||||
"auto_sortmerge_join_14",
|
||||
"auto_sortmerge_join_15",
|
||||
"auto_sortmerge_join_16",
|
||||
"auto_sortmerge_join_2",
|
||||
"auto_sortmerge_join_3",
|
||||
"auto_sortmerge_join_4",
|
||||
"auto_sortmerge_join_5",
|
||||
"auto_sortmerge_join_6",
|
||||
"auto_sortmerge_join_7",
|
||||
"auto_sortmerge_join_8",
|
||||
"auto_sortmerge_join_9",
|
||||
"binary_constant",
|
||||
"binarysortable_1",
|
||||
"cast1",
|
||||
|
@ -492,13 +539,11 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
|
|||
"create_insert_outputformat",
|
||||
"create_like_tbl_props",
|
||||
"create_nested_type",
|
||||
"create_skewed_table1",
|
||||
"create_struct_table",
|
||||
"create_view_translate",
|
||||
"cross_join",
|
||||
"cross_product_check_1",
|
||||
"cross_product_check_2",
|
||||
"ct_case_insensitive",
|
||||
"database_drop",
|
||||
"database_location",
|
||||
"database_properties",
|
||||
|
@ -534,7 +579,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
|
|||
"escape_distributeby1",
|
||||
"escape_orderby1",
|
||||
"escape_sortby1",
|
||||
"explain_rearrange",
|
||||
"fileformat_mix",
|
||||
"fileformat_sequencefile",
|
||||
"fileformat_text",
|
||||
|
@ -589,16 +633,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
|
|||
"groupby_neg_float",
|
||||
"groupby_ppd",
|
||||
"groupby_ppr",
|
||||
"groupby_sort_10",
|
||||
"groupby_sort_2",
|
||||
"groupby_sort_3",
|
||||
"groupby_sort_4",
|
||||
"groupby_sort_5",
|
||||
"groupby_sort_6",
|
||||
"groupby_sort_7",
|
||||
"groupby_sort_8",
|
||||
"groupby_sort_9",
|
||||
"groupby_sort_test_1",
|
||||
"having",
|
||||
"implicit_cast1",
|
||||
"index_serde",
|
||||
|
@ -653,7 +688,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
|
|||
"inputddl1",
|
||||
"inputddl2",
|
||||
"inputddl3",
|
||||
"inputddl4",
|
||||
"inputddl6",
|
||||
"inputddl7",
|
||||
"inputddl8",
|
||||
|
@ -709,11 +743,8 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
|
|||
"join_array",
|
||||
"join_casesensitive",
|
||||
"join_empty",
|
||||
"join_filters",
|
||||
"join_hive_626",
|
||||
"join_map_ppr",
|
||||
"join_nulls",
|
||||
"join_nullsafe",
|
||||
"join_rc",
|
||||
"join_reorder2",
|
||||
"join_reorder3",
|
||||
|
@ -737,7 +768,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
|
|||
"load_dyn_part13",
|
||||
"load_dyn_part14",
|
||||
"load_dyn_part14_win",
|
||||
"load_dyn_part2",
|
||||
"load_dyn_part3",
|
||||
"load_dyn_part4",
|
||||
"load_dyn_part5",
|
||||
|
@ -790,7 +820,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
|
|||
"nullscript",
|
||||
"optional_outer",
|
||||
"orc_dictionary_threshold",
|
||||
"orc_empty_files",
|
||||
"order",
|
||||
"order2",
|
||||
"outer_join_ppr",
|
||||
|
@ -846,7 +875,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
|
|||
"rcfile_null_value",
|
||||
"rcfile_toleratecorruptions",
|
||||
"rcfile_union",
|
||||
"reduce_deduplicate",
|
||||
"reduce_deduplicate_exclude_gby",
|
||||
"reduce_deduplicate_exclude_join",
|
||||
"reduce_deduplicate_extended",
|
||||
|
@ -867,31 +895,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
|
|||
"show_functions",
|
||||
"show_partitions",
|
||||
"show_tblproperties",
|
||||
"skewjoinopt13",
|
||||
"skewjoinopt18",
|
||||
"skewjoinopt9",
|
||||
"smb_mapjoin9",
|
||||
"smb_mapjoin_1",
|
||||
"smb_mapjoin_10",
|
||||
"smb_mapjoin_13",
|
||||
"smb_mapjoin_14",
|
||||
"smb_mapjoin_15",
|
||||
"smb_mapjoin_16",
|
||||
"smb_mapjoin_17",
|
||||
"smb_mapjoin_2",
|
||||
"smb_mapjoin_21",
|
||||
"smb_mapjoin_25",
|
||||
"smb_mapjoin_3",
|
||||
"smb_mapjoin_4",
|
||||
"smb_mapjoin_5",
|
||||
"smb_mapjoin_6",
|
||||
"smb_mapjoin_7",
|
||||
"smb_mapjoin_8",
|
||||
"sort",
|
||||
"sort_merge_join_desc_1",
|
||||
"sort_merge_join_desc_2",
|
||||
"sort_merge_join_desc_3",
|
||||
"sort_merge_join_desc_4",
|
||||
"stats0",
|
||||
"stats_aggregator_error_1",
|
||||
"stats_empty_partition",
|
||||
|
|
|
@ -91,7 +91,7 @@ private[hive] object HiveSerDe {
|
|||
"textfile" ->
|
||||
HiveSerDe(
|
||||
inputFormat = Option("org.apache.hadoop.mapred.TextInputFormat"),
|
||||
outputFormat = Option("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat")),
|
||||
outputFormat = Option("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")),
|
||||
|
||||
"avro" ->
|
||||
HiveSerDe(
|
||||
|
@ -905,8 +905,13 @@ private[hive] case class MetastoreRelation(
|
|||
|
||||
val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor()
|
||||
tTable.setSd(sd)
|
||||
sd.setCols(table.schema.map(toHiveColumn).asJava)
|
||||
tTable.setPartitionKeys(table.partitionColumns.map(toHiveColumn).asJava)
|
||||
|
||||
// Note: In Hive the schema and partition columns must be disjoint sets
|
||||
val (partCols, schema) = table.schema.map(toHiveColumn).partition { c =>
|
||||
table.partitionColumnNames.contains(c.getName)
|
||||
}
|
||||
sd.setCols(schema.asJava)
|
||||
tTable.setPartitionKeys(partCols.asJava)
|
||||
|
||||
table.storage.locationUri.foreach(sd.setLocation)
|
||||
table.storage.inputFormat.foreach(sd.setInputFormat)
|
||||
|
@ -1013,7 +1018,10 @@ private[hive] case class MetastoreRelation(
|
|||
val partitionKeys = table.partitionColumns.map(_.toAttribute)
|
||||
|
||||
/** Non-partitionKey attributes */
|
||||
val attributes = table.schema.map(_.toAttribute)
|
||||
// TODO: just make this hold the schema itself, not just non-partition columns
|
||||
val attributes = table.schema
|
||||
.filter { c => !table.partitionColumnNames.contains(c.name) }
|
||||
.map(_.toAttribute)
|
||||
|
||||
val output = attributes ++ partitionKeys
|
||||
|
||||
|
|
|
@ -299,6 +299,10 @@ private[hive] class HiveClientImpl(
|
|||
tableName: String): Option[CatalogTable] = withHiveState {
|
||||
logDebug(s"Looking up $dbName.$tableName")
|
||||
Option(client.getTable(dbName, tableName, false)).map { h =>
|
||||
// Note: Hive separates partition columns and the schema, but for us the
|
||||
// partition columns are part of the schema
|
||||
val partCols = h.getPartCols.asScala.map(fromHiveColumn)
|
||||
val schema = h.getCols.asScala.map(fromHiveColumn) ++ partCols
|
||||
CatalogTable(
|
||||
identifier = TableIdentifier(h.getTableName, Option(h.getDbName)),
|
||||
tableType = h.getTableType match {
|
||||
|
@ -307,9 +311,10 @@ private[hive] class HiveClientImpl(
|
|||
case HiveTableType.INDEX_TABLE => CatalogTableType.INDEX_TABLE
|
||||
case HiveTableType.VIRTUAL_VIEW => CatalogTableType.VIRTUAL_VIEW
|
||||
},
|
||||
schema = h.getCols.asScala.map(fromHiveColumn),
|
||||
partitionColumns = h.getPartCols.asScala.map(fromHiveColumn),
|
||||
sortColumns = Seq(),
|
||||
schema = schema,
|
||||
partitionColumnNames = partCols.map(_.name),
|
||||
sortColumnNames = Seq(), // TODO: populate this
|
||||
bucketColumnNames = h.getBucketCols.asScala,
|
||||
numBuckets = h.getNumBuckets,
|
||||
createTime = h.getTTable.getCreateTime.toLong * 1000,
|
||||
lastAccessTime = h.getLastAccessTime.toLong * 1000,
|
||||
|
@ -675,24 +680,37 @@ private[hive] class HiveClientImpl(
|
|||
|
||||
private def toHiveTable(table: CatalogTable): HiveTable = {
|
||||
val hiveTable = new HiveTable(table.database, table.identifier.table)
|
||||
// For EXTERNAL_TABLE/MANAGED_TABLE, we also need to set EXTERNAL field in
|
||||
// the table properties accodringly. Otherwise, if EXTERNAL_TABLE is the table type
|
||||
// but EXTERNAL field is not set, Hive metastore will change the type to
|
||||
// MANAGED_TABLE (see
|
||||
// metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java#L1095-L1105)
|
||||
// For EXTERNAL_TABLE, we also need to set EXTERNAL field in the table properties.
|
||||
// Otherwise, Hive metastore will change the table to a MANAGED_TABLE.
|
||||
// (metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java#L1095-L1105)
|
||||
hiveTable.setTableType(table.tableType match {
|
||||
case CatalogTableType.EXTERNAL_TABLE =>
|
||||
hiveTable.setProperty("EXTERNAL", "TRUE")
|
||||
HiveTableType.EXTERNAL_TABLE
|
||||
case CatalogTableType.MANAGED_TABLE =>
|
||||
hiveTable.setProperty("EXTERNAL", "FALSE")
|
||||
HiveTableType.MANAGED_TABLE
|
||||
case CatalogTableType.INDEX_TABLE => HiveTableType.INDEX_TABLE
|
||||
case CatalogTableType.VIRTUAL_VIEW => HiveTableType.VIRTUAL_VIEW
|
||||
})
|
||||
hiveTable.setFields(table.schema.map(toHiveColumn).asJava)
|
||||
hiveTable.setPartCols(table.partitionColumns.map(toHiveColumn).asJava)
|
||||
// Note: In Hive the schema and partition columns must be disjoint sets
|
||||
val (partCols, schema) = table.schema.map(toHiveColumn).partition { c =>
|
||||
table.partitionColumnNames.contains(c.getName)
|
||||
}
|
||||
if (table.schema.isEmpty) {
|
||||
// This is a hack to preserve existing behavior. Before Spark 2.0, we do not
|
||||
// set a default serde here (this was done in Hive), and so if the user provides
|
||||
// an empty schema Hive would automatically populate the schema with a single
|
||||
// field "col". However, after SPARK-14388, we set the default serde to
|
||||
// LazySimpleSerde so this implicit behavior no longer happens. Therefore,
|
||||
// we need to do it in Spark ourselves.
|
||||
hiveTable.setFields(
|
||||
Seq(new FieldSchema("col", "array<string>", "from deserializer")).asJava)
|
||||
} else {
|
||||
hiveTable.setFields(schema.asJava)
|
||||
}
|
||||
hiveTable.setPartCols(partCols.asJava)
|
||||
// TODO: set sort columns here too
|
||||
hiveTable.setBucketCols(table.bucketColumnNames.asJava)
|
||||
hiveTable.setOwner(conf.getUser)
|
||||
hiveTable.setNumBuckets(table.numBuckets)
|
||||
hiveTable.setCreateTime((table.createTime / 1000).toInt)
|
||||
|
@ -700,9 +718,11 @@ private[hive] class HiveClientImpl(
|
|||
table.storage.locationUri.foreach { loc => shim.setDataLocation(hiveTable, loc) }
|
||||
table.storage.inputFormat.map(toInputFormat).foreach(hiveTable.setInputFormatClass)
|
||||
table.storage.outputFormat.map(toOutputFormat).foreach(hiveTable.setOutputFormatClass)
|
||||
table.storage.serde.foreach(hiveTable.setSerializationLib)
|
||||
hiveTable.setSerializationLib(
|
||||
table.storage.serde.getOrElse("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
|
||||
table.storage.serdeProperties.foreach { case (k, v) => hiveTable.setSerdeParam(k, v) }
|
||||
table.properties.foreach { case (k, v) => hiveTable.setProperty(k, v) }
|
||||
table.comment.foreach { c => hiveTable.setProperty("comment", c) }
|
||||
table.viewOriginalText.foreach { t => hiveTable.setViewOriginalText(t) }
|
||||
table.viewText.foreach { t => hiveTable.setViewExpandedText(t) }
|
||||
hiveTable
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.apache.hadoop.hive.ql.session.SessionState
|
|||
import org.apache.hadoop.hive.serde.serdeConstants
|
||||
import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
|
||||
|
||||
import org.apache.spark.sql.AnalysisException
|
||||
import org.apache.spark.sql.catalyst.analysis.UnresolvedGenerator
|
||||
import org.apache.spark.sql.catalyst.catalog._
|
||||
import org.apache.spark.sql.catalyst.expressions._
|
||||
|
@ -33,8 +34,9 @@ import org.apache.spark.sql.catalyst.parser._
|
|||
import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
|
||||
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
|
||||
import org.apache.spark.sql.execution.SparkSqlAstBuilder
|
||||
import org.apache.spark.sql.execution.command.CreateTable
|
||||
import org.apache.spark.sql.hive.{CreateTableAsSelect => CTAS, CreateViewAsSelect => CreateView}
|
||||
import org.apache.spark.sql.hive.{HiveGenericUDTF, HiveSerDe}
|
||||
import org.apache.spark.sql.hive.{HiveGenericUDTF, HiveMetastoreTypes, HiveSerDe}
|
||||
import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper
|
||||
|
||||
/**
|
||||
|
@ -121,84 +123,116 @@ class HiveSqlAstBuilder extends SparkSqlAstBuilder {
|
|||
}
|
||||
|
||||
/**
|
||||
* Create a [[CatalogStorageFormat]]. This is part of the [[CreateTableAsSelect]] command.
|
||||
* Create a [[CatalogStorageFormat]] for creating tables.
|
||||
*/
|
||||
override def visitCreateFileFormat(
|
||||
ctx: CreateFileFormatContext): CatalogStorageFormat = withOrigin(ctx) {
|
||||
if (ctx.storageHandler == null) {
|
||||
typedVisit[CatalogStorageFormat](ctx.fileFormat)
|
||||
} else {
|
||||
visitStorageHandler(ctx.storageHandler)
|
||||
(ctx.fileFormat, ctx.storageHandler) match {
|
||||
// Expected format: INPUTFORMAT input_format OUTPUTFORMAT output_format
|
||||
case (c: TableFileFormatContext, null) =>
|
||||
visitTableFileFormat(c)
|
||||
// Expected format: SEQUENCEFILE | TEXTFILE | RCFILE | ORC | PARQUET | AVRO
|
||||
case (c: GenericFileFormatContext, null) =>
|
||||
visitGenericFileFormat(c)
|
||||
case (null, storageHandler) =>
|
||||
throw new ParseException("Operation not allowed: ... STORED BY storage_handler ...", ctx)
|
||||
case _ =>
|
||||
throw new ParseException("expected either STORED AS or STORED BY, not both", ctx)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a [[CreateTableAsSelect]] command.
|
||||
* Create a table, returning either a [[CreateTable]] or a [[CreateTableAsSelect]].
|
||||
*
|
||||
* This is not used to create datasource tables, which is handled through
|
||||
* "CREATE TABLE ... USING ...".
|
||||
*
|
||||
* Note: several features are currently not supported - temporary tables, bucketing,
|
||||
* skewed columns and storage handlers (STORED BY).
|
||||
*
|
||||
* Expected format:
|
||||
* {{{
|
||||
* CREATE [TEMPORARY] [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name
|
||||
* [(col1 data_type [COMMENT col_comment], ...)]
|
||||
* [COMMENT table_comment]
|
||||
* [PARTITIONED BY (col3 data_type [COMMENT col_comment], ...)]
|
||||
* [CLUSTERED BY (col1, ...) [SORTED BY (col1 [ASC|DESC], ...)] INTO num_buckets BUCKETS]
|
||||
* [SKEWED BY (col1, col2, ...) ON ((col_value, col_value, ...), ...) [STORED AS DIRECTORIES]]
|
||||
* [ROW FORMAT row_format]
|
||||
* [STORED AS file_format | STORED BY storage_handler_class [WITH SERDEPROPERTIES (...)]]
|
||||
* [LOCATION path]
|
||||
* [TBLPROPERTIES (property_name=property_value, ...)]
|
||||
* [AS select_statement];
|
||||
* }}}
|
||||
*/
|
||||
override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = {
|
||||
if (ctx.query == null) {
|
||||
HiveNativeCommand(command(ctx))
|
||||
override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = withOrigin(ctx) {
|
||||
val (name, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader)
|
||||
// TODO: implement temporary tables
|
||||
if (temp) {
|
||||
throw new ParseException(
|
||||
"CREATE TEMPORARY TABLE is not supported yet. " +
|
||||
"Please use registerTempTable as an alternative.", ctx)
|
||||
}
|
||||
if (ctx.skewSpec != null) {
|
||||
throw new ParseException("Operation not allowed: CREATE TABLE ... SKEWED BY ...", ctx)
|
||||
}
|
||||
if (ctx.bucketSpec != null) {
|
||||
throw new ParseException("Operation not allowed: CREATE TABLE ... CLUSTERED BY ...", ctx)
|
||||
}
|
||||
val tableType = if (external) {
|
||||
CatalogTableType.EXTERNAL_TABLE
|
||||
} else {
|
||||
// Get the table header.
|
||||
val (table, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader)
|
||||
val tableType = if (external) {
|
||||
CatalogTableType.EXTERNAL_TABLE
|
||||
} else {
|
||||
CatalogTableType.MANAGED_TABLE
|
||||
}
|
||||
CatalogTableType.MANAGED_TABLE
|
||||
}
|
||||
val comment = Option(ctx.STRING).map(string)
|
||||
val partitionCols = Option(ctx.partitionColumns).toSeq.flatMap(visitCatalogColumns)
|
||||
val cols = Option(ctx.columns).toSeq.flatMap(visitCatalogColumns)
|
||||
val properties = Option(ctx.tablePropertyList).map(visitTablePropertyList).getOrElse(Map.empty)
|
||||
val selectQuery = Option(ctx.query).map(plan)
|
||||
|
||||
// Unsupported clauses.
|
||||
if (temp) {
|
||||
throw new ParseException(s"Unsupported operation: TEMPORARY clause.", ctx)
|
||||
}
|
||||
if (ctx.bucketSpec != null) {
|
||||
// TODO add this - we need cluster columns in the CatalogTable for this to work.
|
||||
throw new ParseException("Unsupported operation: " +
|
||||
"CLUSTERED BY ... [ORDERED BY ...] INTO ... BUCKETS clause.", ctx)
|
||||
}
|
||||
if (ctx.skewSpec != null) {
|
||||
throw new ParseException("Operation not allowed: " +
|
||||
"SKEWED BY ... ON ... [STORED AS DIRECTORIES] clause.", ctx)
|
||||
}
|
||||
// Note: Hive requires partition columns to be distinct from the schema, so we need
|
||||
// to include the partition columns here explicitly
|
||||
val schema = cols ++ partitionCols
|
||||
|
||||
// Create the schema.
|
||||
val schema = Option(ctx.columns).toSeq.flatMap(visitCatalogColumns(_, _.toLowerCase))
|
||||
|
||||
// Get the column by which the table is partitioned.
|
||||
val partitionCols = Option(ctx.partitionColumns).toSeq.flatMap(visitCatalogColumns(_))
|
||||
|
||||
// Create the storage.
|
||||
def format(fmt: ParserRuleContext): CatalogStorageFormat = {
|
||||
Option(fmt).map(typedVisit[CatalogStorageFormat]).getOrElse(EmptyStorageFormat)
|
||||
}
|
||||
// Default storage.
|
||||
// Storage format
|
||||
val defaultStorage: CatalogStorageFormat = {
|
||||
val defaultStorageType = hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT)
|
||||
val hiveSerDe = HiveSerDe.sourceToSerDe(defaultStorageType, hiveConf).getOrElse {
|
||||
HiveSerDe(
|
||||
inputFormat = Option("org.apache.hadoop.mapred.TextInputFormat"),
|
||||
outputFormat = Option("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat"))
|
||||
}
|
||||
// Defined storage.
|
||||
val fileStorage = format(ctx.createFileFormat)
|
||||
val rowStorage = format(ctx.rowFormat)
|
||||
val storage = CatalogStorageFormat(
|
||||
Option(ctx.locationSpec).map(visitLocationSpec),
|
||||
fileStorage.inputFormat.orElse(hiveSerDe.inputFormat),
|
||||
fileStorage.outputFormat.orElse(hiveSerDe.outputFormat),
|
||||
rowStorage.serde.orElse(hiveSerDe.serde).orElse(fileStorage.serde),
|
||||
rowStorage.serdeProperties ++ fileStorage.serdeProperties
|
||||
)
|
||||
val defaultHiveSerde = HiveSerDe.sourceToSerDe(defaultStorageType, hiveConf)
|
||||
CatalogStorageFormat(
|
||||
locationUri = None,
|
||||
inputFormat = defaultHiveSerde.flatMap(_.inputFormat)
|
||||
.orElse(Some("org.apache.hadoop.mapred.TextInputFormat")),
|
||||
outputFormat = defaultHiveSerde.flatMap(_.outputFormat)
|
||||
.orElse(Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")),
|
||||
// Note: Keep this unspecified because we use the presence of the serde to decide
|
||||
// whether to convert a table created by CTAS to a datasource table.
|
||||
serde = None,
|
||||
serdeProperties = Map())
|
||||
}
|
||||
val fileStorage = Option(ctx.createFileFormat).map(visitCreateFileFormat)
|
||||
.getOrElse(EmptyStorageFormat)
|
||||
val rowStorage = Option(ctx.rowFormat).map(visitRowFormat).getOrElse(EmptyStorageFormat)
|
||||
val location = Option(ctx.locationSpec).map(visitLocationSpec)
|
||||
val storage = CatalogStorageFormat(
|
||||
locationUri = location,
|
||||
inputFormat = fileStorage.inputFormat.orElse(defaultStorage.inputFormat),
|
||||
outputFormat = fileStorage.outputFormat.orElse(defaultStorage.outputFormat),
|
||||
serde = rowStorage.serde.orElse(fileStorage.serde).orElse(defaultStorage.serde),
|
||||
serdeProperties = rowStorage.serdeProperties ++ fileStorage.serdeProperties)
|
||||
|
||||
val tableDesc = CatalogTable(
|
||||
identifier = table,
|
||||
tableType = tableType,
|
||||
schema = schema,
|
||||
partitionColumns = partitionCols,
|
||||
storage = storage,
|
||||
properties = Option(ctx.tablePropertyList).map(visitTablePropertyList).getOrElse(Map.empty),
|
||||
// TODO support the sql text - have a proper location for this!
|
||||
viewText = Option(ctx.STRING).map(string))
|
||||
CTAS(tableDesc, plan(ctx.query), ifNotExists)
|
||||
// TODO support the sql text - have a proper location for this!
|
||||
val tableDesc = CatalogTable(
|
||||
identifier = name,
|
||||
tableType = tableType,
|
||||
storage = storage,
|
||||
schema = schema,
|
||||
partitionColumnNames = partitionCols.map(_.name),
|
||||
properties = properties,
|
||||
comment = comment)
|
||||
|
||||
selectQuery match {
|
||||
case Some(q) => CTAS(tableDesc, q, ifNotExists)
|
||||
case None => CreateTable(tableDesc, ifNotExists)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -353,25 +387,19 @@ class HiveSqlAstBuilder extends SparkSqlAstBuilder {
|
|||
private val EmptyStorageFormat = CatalogStorageFormat(None, None, None, None, Map.empty)
|
||||
|
||||
/**
|
||||
* Create a [[CatalogStorageFormat]]. The INPUTDRIVER and OUTPUTDRIVER clauses are currently
|
||||
* ignored.
|
||||
* Create a [[CatalogStorageFormat]].
|
||||
*/
|
||||
override def visitTableFileFormat(
|
||||
ctx: TableFileFormatContext): CatalogStorageFormat = withOrigin(ctx) {
|
||||
import ctx._
|
||||
if (inDriver != null || outDriver != null) {
|
||||
throw new ParseException(
|
||||
s"Operation not allowed: INPUTDRIVER ... OUTPUTDRIVER ... clauses", ctx)
|
||||
}
|
||||
EmptyStorageFormat.copy(
|
||||
inputFormat = Option(string(inFmt)),
|
||||
outputFormat = Option(string(outFmt)),
|
||||
serde = Option(serdeCls).map(string)
|
||||
inputFormat = Option(string(ctx.inFmt)),
|
||||
outputFormat = Option(string(ctx.outFmt)),
|
||||
serde = Option(ctx.serdeCls).map(string)
|
||||
)
|
||||
}
|
||||
|
||||
/**
|
||||
* Resolve a [[HiveSerDe]] based on the format name given.
|
||||
* Resolve a [[HiveSerDe]] based on the name given and return it as a [[CatalogStorageFormat]].
|
||||
*/
|
||||
override def visitGenericFileFormat(
|
||||
ctx: GenericFileFormatContext): CatalogStorageFormat = withOrigin(ctx) {
|
||||
|
@ -388,11 +416,28 @@ class HiveSqlAstBuilder extends SparkSqlAstBuilder {
|
|||
}
|
||||
|
||||
/**
|
||||
* Storage Handlers are currently not supported in the statements we support (CTAS).
|
||||
* Create a [[RowFormat]] used for creating tables.
|
||||
*
|
||||
* Example format:
|
||||
* {{{
|
||||
* SERDE serde_name [WITH SERDEPROPERTIES (k1=v1, k2=v2, ...)]
|
||||
* }}}
|
||||
*
|
||||
* OR
|
||||
*
|
||||
* {{{
|
||||
* DELIMITED [FIELDS TERMINATED BY char [ESCAPED BY char]]
|
||||
* [COLLECTION ITEMS TERMINATED BY char]
|
||||
* [MAP KEYS TERMINATED BY char]
|
||||
* [LINES TERMINATED BY char]
|
||||
* [NULL DEFINED AS char]
|
||||
* }}}
|
||||
*/
|
||||
override def visitStorageHandler(
|
||||
ctx: StorageHandlerContext): CatalogStorageFormat = withOrigin(ctx) {
|
||||
throw new ParseException("Storage Handlers are currently unsupported.", ctx)
|
||||
private def visitRowFormat(ctx: RowFormatContext): CatalogStorageFormat = withOrigin(ctx) {
|
||||
ctx match {
|
||||
case serde: RowFormatSerdeContext => visitRowFormatSerde(serde)
|
||||
case delimited: RowFormatDelimitedContext => visitRowFormatDelimited(delimited)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -435,13 +480,15 @@ class HiveSqlAstBuilder extends SparkSqlAstBuilder {
|
|||
/**
|
||||
* Create a sequence of [[CatalogColumn]]s from a column list
|
||||
*/
|
||||
private def visitCatalogColumns(
|
||||
ctx: ColTypeListContext,
|
||||
formatter: String => String = identity): Seq[CatalogColumn] = withOrigin(ctx) {
|
||||
private def visitCatalogColumns(ctx: ColTypeListContext): Seq[CatalogColumn] = withOrigin(ctx) {
|
||||
ctx.colType.asScala.map { col =>
|
||||
CatalogColumn(
|
||||
formatter(col.identifier.getText),
|
||||
col.dataType.getText.toLowerCase, // TODO validate this?
|
||||
col.identifier.getText.toLowerCase,
|
||||
// Note: for types like "STRUCT<myFirstName: STRING, myLastName: STRING>" we can't
|
||||
// just convert the whole type string to lower case, otherwise the struct field names
|
||||
// will no longer be case sensitive. Instead, we rely on our parser to get the proper
|
||||
// case before passing it to Hive.
|
||||
CatalystSqlParser.parseDataType(col.dataType.getText).simpleString,
|
||||
nullable = true,
|
||||
Option(col.STRING).map(string))
|
||||
}
|
||||
|
|
|
@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.JsonTuple
|
|||
import org.apache.spark.sql.catalyst.parser.ParseException
|
||||
import org.apache.spark.sql.catalyst.plans.PlanTest
|
||||
import org.apache.spark.sql.catalyst.plans.logical.{Generate, ScriptTransformation}
|
||||
import org.apache.spark.sql.execution.command.CreateTable
|
||||
import org.apache.spark.sql.hive.execution.{HiveNativeCommand, HiveSqlParser}
|
||||
|
||||
class HiveDDLCommandSuite extends PlanTest {
|
||||
|
@ -36,6 +37,7 @@ class HiveDDLCommandSuite extends PlanTest {
|
|||
|
||||
private def extractTableDesc(sql: String): (CatalogTable, Boolean) = {
|
||||
parser.parsePlan(sql).collect {
|
||||
case CreateTable(desc, allowExisting) => (desc, allowExisting)
|
||||
case CreateTableAsSelect(desc, _, allowExisting) => (desc, allowExisting)
|
||||
case CreateViewAsSelect(desc, _, allowExisting, _, _) => (desc, allowExisting)
|
||||
}.head
|
||||
|
@ -76,9 +78,12 @@ class HiveDDLCommandSuite extends PlanTest {
|
|||
CatalogColumn("page_url", "string") ::
|
||||
CatalogColumn("referrer_url", "string") ::
|
||||
CatalogColumn("ip", "string", comment = Some("IP Address of the User")) ::
|
||||
CatalogColumn("country", "string", comment = Some("country of origination")) :: Nil)
|
||||
CatalogColumn("country", "string", comment = Some("country of origination")) ::
|
||||
CatalogColumn("dt", "string", comment = Some("date type")) ::
|
||||
CatalogColumn("hour", "string", comment = Some("hour of the day")) :: Nil)
|
||||
assert(desc.comment == Some("This is the staging page view table"))
|
||||
// TODO will be SQLText
|
||||
assert(desc.viewText == Option("This is the staging page view table"))
|
||||
assert(desc.viewText.isEmpty)
|
||||
assert(desc.viewOriginalText.isEmpty)
|
||||
assert(desc.partitionColumns ==
|
||||
CatalogColumn("dt", "string", comment = Some("date type")) ::
|
||||
|
@ -123,9 +128,12 @@ class HiveDDLCommandSuite extends PlanTest {
|
|||
CatalogColumn("page_url", "string") ::
|
||||
CatalogColumn("referrer_url", "string") ::
|
||||
CatalogColumn("ip", "string", comment = Some("IP Address of the User")) ::
|
||||
CatalogColumn("country", "string", comment = Some("country of origination")) :: Nil)
|
||||
CatalogColumn("country", "string", comment = Some("country of origination")) ::
|
||||
CatalogColumn("dt", "string", comment = Some("date type")) ::
|
||||
CatalogColumn("hour", "string", comment = Some("hour of the day")) :: Nil)
|
||||
// TODO will be SQLText
|
||||
assert(desc.viewText == Option("This is the staging page view table"))
|
||||
assert(desc.comment == Some("This is the staging page view table"))
|
||||
assert(desc.viewText.isEmpty)
|
||||
assert(desc.viewOriginalText.isEmpty)
|
||||
assert(desc.partitionColumns ==
|
||||
CatalogColumn("dt", "string", comment = Some("date type")) ::
|
||||
|
@ -151,7 +159,7 @@ class HiveDDLCommandSuite extends PlanTest {
|
|||
assert(desc.storage.serdeProperties == Map())
|
||||
assert(desc.storage.inputFormat == Some("org.apache.hadoop.mapred.TextInputFormat"))
|
||||
assert(desc.storage.outputFormat ==
|
||||
Some("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat"))
|
||||
Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"))
|
||||
assert(desc.storage.serde.isEmpty)
|
||||
assert(desc.properties == Map())
|
||||
}
|
||||
|
@ -203,17 +211,6 @@ class HiveDDLCommandSuite extends PlanTest {
|
|||
|AS SELECT key, value FROM src ORDER BY key, value
|
||||
""".stripMargin)
|
||||
}
|
||||
intercept[ParseException] {
|
||||
parser.parsePlan(
|
||||
"""CREATE TABLE ctas2
|
||||
|STORED AS
|
||||
|INPUTFORMAT "org.apache.hadoop.mapred.TextInputFormat"
|
||||
|OUTPUTFORMAT "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"
|
||||
|INPUTDRIVER "org.apache.hadoop.hive.howl.rcfile.RCFileInputDriver"
|
||||
|OUTPUTDRIVER "org.apache.hadoop.hive.howl.rcfile.RCFileOutputDriver"
|
||||
|AS SELECT key, value FROM src ORDER BY key, value
|
||||
""".stripMargin)
|
||||
}
|
||||
intercept[ParseException] {
|
||||
parser.parsePlan(
|
||||
"""
|
||||
|
@ -324,6 +321,194 @@ class HiveDDLCommandSuite extends PlanTest {
|
|||
""".stripMargin)
|
||||
}
|
||||
|
||||
test("create table - basic") {
|
||||
val query = "CREATE TABLE my_table (id int, name string)"
|
||||
val (desc, allowExisting) = extractTableDesc(query)
|
||||
assert(!allowExisting)
|
||||
assert(desc.identifier.database.isEmpty)
|
||||
assert(desc.identifier.table == "my_table")
|
||||
assert(desc.tableType == CatalogTableType.MANAGED_TABLE)
|
||||
assert(desc.schema == Seq(CatalogColumn("id", "int"), CatalogColumn("name", "string")))
|
||||
assert(desc.partitionColumnNames.isEmpty)
|
||||
assert(desc.sortColumnNames.isEmpty)
|
||||
assert(desc.bucketColumnNames.isEmpty)
|
||||
assert(desc.numBuckets == -1)
|
||||
assert(desc.viewText.isEmpty)
|
||||
assert(desc.viewOriginalText.isEmpty)
|
||||
assert(desc.storage.locationUri.isEmpty)
|
||||
assert(desc.storage.inputFormat ==
|
||||
Some("org.apache.hadoop.mapred.TextInputFormat"))
|
||||
assert(desc.storage.outputFormat ==
|
||||
Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"))
|
||||
assert(desc.storage.serde.isEmpty)
|
||||
assert(desc.storage.serdeProperties.isEmpty)
|
||||
assert(desc.properties.isEmpty)
|
||||
assert(desc.comment.isEmpty)
|
||||
}
|
||||
|
||||
test("create table - with database name") {
|
||||
val query = "CREATE TABLE dbx.my_table (id int, name string)"
|
||||
val (desc, _) = extractTableDesc(query)
|
||||
assert(desc.identifier.database == Some("dbx"))
|
||||
assert(desc.identifier.table == "my_table")
|
||||
}
|
||||
|
||||
test("create table - temporary") {
|
||||
val query = "CREATE TEMPORARY TABLE tab1 (id int, name string)"
|
||||
val e = intercept[ParseException] { parser.parsePlan(query) }
|
||||
assert(e.message.contains("registerTempTable"))
|
||||
}
|
||||
|
||||
test("create table - external") {
|
||||
val query = "CREATE EXTERNAL TABLE tab1 (id int, name string)"
|
||||
val (desc, _) = extractTableDesc(query)
|
||||
assert(desc.tableType == CatalogTableType.EXTERNAL_TABLE)
|
||||
}
|
||||
|
||||
test("create table - if not exists") {
|
||||
val query = "CREATE TABLE IF NOT EXISTS tab1 (id int, name string)"
|
||||
val (_, allowExisting) = extractTableDesc(query)
|
||||
assert(allowExisting)
|
||||
}
|
||||
|
||||
test("create table - comment") {
|
||||
val query = "CREATE TABLE my_table (id int, name string) COMMENT 'its hot as hell below'"
|
||||
val (desc, _) = extractTableDesc(query)
|
||||
assert(desc.comment == Some("its hot as hell below"))
|
||||
}
|
||||
|
||||
test("create table - partitioned columns") {
|
||||
val query = "CREATE TABLE my_table (id int, name string) PARTITIONED BY (month int)"
|
||||
val (desc, _) = extractTableDesc(query)
|
||||
assert(desc.schema == Seq(
|
||||
CatalogColumn("id", "int"),
|
||||
CatalogColumn("name", "string"),
|
||||
CatalogColumn("month", "int")))
|
||||
assert(desc.partitionColumnNames == Seq("month"))
|
||||
}
|
||||
|
||||
test("create table - clustered by") {
|
||||
val baseQuery = "CREATE TABLE my_table (id int, name string) CLUSTERED BY(id)"
|
||||
val query1 = s"$baseQuery INTO 10 BUCKETS"
|
||||
val query2 = s"$baseQuery SORTED BY(id) INTO 10 BUCKETS"
|
||||
val e1 = intercept[ParseException] { parser.parsePlan(query1) }
|
||||
val e2 = intercept[ParseException] { parser.parsePlan(query2) }
|
||||
assert(e1.getMessage.contains("Operation not allowed"))
|
||||
assert(e2.getMessage.contains("Operation not allowed"))
|
||||
}
|
||||
|
||||
test("create table - skewed by") {
|
||||
val baseQuery = "CREATE TABLE my_table (id int, name string) SKEWED BY"
|
||||
val query1 = s"$baseQuery(id) ON (1, 10, 100)"
|
||||
val query2 = s"$baseQuery(id, name) ON ((1, 'x'), (2, 'y'), (3, 'z'))"
|
||||
val query3 = s"$baseQuery(id, name) ON ((1, 'x'), (2, 'y'), (3, 'z')) STORED AS DIRECTORIES"
|
||||
val e1 = intercept[ParseException] { parser.parsePlan(query1) }
|
||||
val e2 = intercept[ParseException] { parser.parsePlan(query2) }
|
||||
val e3 = intercept[ParseException] { parser.parsePlan(query3) }
|
||||
assert(e1.getMessage.contains("Operation not allowed"))
|
||||
assert(e2.getMessage.contains("Operation not allowed"))
|
||||
assert(e3.getMessage.contains("Operation not allowed"))
|
||||
}
|
||||
|
||||
test("create table - row format") {
|
||||
val baseQuery = "CREATE TABLE my_table (id int, name string) ROW FORMAT"
|
||||
val query1 = s"$baseQuery SERDE 'org.apache.poof.serde.Baff'"
|
||||
val query2 = s"$baseQuery SERDE 'org.apache.poof.serde.Baff' WITH SERDEPROPERTIES ('k1'='v1')"
|
||||
val query3 =
|
||||
s"""
|
||||
|$baseQuery DELIMITED FIELDS TERMINATED BY 'x' ESCAPED BY 'y'
|
||||
|COLLECTION ITEMS TERMINATED BY 'a'
|
||||
|MAP KEYS TERMINATED BY 'b'
|
||||
|LINES TERMINATED BY '\n'
|
||||
|NULL DEFINED AS 'c'
|
||||
""".stripMargin
|
||||
val (desc1, _) = extractTableDesc(query1)
|
||||
val (desc2, _) = extractTableDesc(query2)
|
||||
val (desc3, _) = extractTableDesc(query3)
|
||||
assert(desc1.storage.serde == Some("org.apache.poof.serde.Baff"))
|
||||
assert(desc1.storage.serdeProperties.isEmpty)
|
||||
assert(desc2.storage.serde == Some("org.apache.poof.serde.Baff"))
|
||||
assert(desc2.storage.serdeProperties == Map("k1" -> "v1"))
|
||||
assert(desc3.storage.serdeProperties == Map(
|
||||
"field.delim" -> "x",
|
||||
"escape.delim" -> "y",
|
||||
"serialization.format" -> "x",
|
||||
"line.delim" -> "\n",
|
||||
"colelction.delim" -> "a", // yes, it's a typo from Hive :)
|
||||
"mapkey.delim" -> "b"))
|
||||
}
|
||||
|
||||
test("create table - file format") {
|
||||
val baseQuery = "CREATE TABLE my_table (id int, name string) STORED AS"
|
||||
val query1 = s"$baseQuery INPUTFORMAT 'winput' OUTPUTFORMAT 'wowput'"
|
||||
val query2 = s"$baseQuery ORC"
|
||||
val (desc1, _) = extractTableDesc(query1)
|
||||
val (desc2, _) = extractTableDesc(query2)
|
||||
assert(desc1.storage.inputFormat == Some("winput"))
|
||||
assert(desc1.storage.outputFormat == Some("wowput"))
|
||||
assert(desc1.storage.serde.isEmpty)
|
||||
assert(desc2.storage.inputFormat == Some("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"))
|
||||
assert(desc2.storage.outputFormat == Some("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"))
|
||||
assert(desc2.storage.serde == Some("org.apache.hadoop.hive.ql.io.orc.OrcSerde"))
|
||||
}
|
||||
|
||||
test("create table - storage handler") {
|
||||
val baseQuery = "CREATE TABLE my_table (id int, name string) STORED BY"
|
||||
val query1 = s"$baseQuery 'org.papachi.StorageHandler'"
|
||||
val query2 = s"$baseQuery 'org.mamachi.StorageHandler' WITH SERDEPROPERTIES ('k1'='v1')"
|
||||
val e1 = intercept[ParseException] { parser.parsePlan(query1) }
|
||||
val e2 = intercept[ParseException] { parser.parsePlan(query2) }
|
||||
assert(e1.getMessage.contains("Operation not allowed"))
|
||||
assert(e2.getMessage.contains("Operation not allowed"))
|
||||
}
|
||||
|
||||
test("create table - location") {
|
||||
val query = "CREATE TABLE my_table (id int, name string) LOCATION '/path/to/mars'"
|
||||
val (desc, _) = extractTableDesc(query)
|
||||
assert(desc.storage.locationUri == Some("/path/to/mars"))
|
||||
}
|
||||
|
||||
test("create table - properties") {
|
||||
val query = "CREATE TABLE my_table (id int, name string) TBLPROPERTIES ('k1'='v1', 'k2'='v2')"
|
||||
val (desc, _) = extractTableDesc(query)
|
||||
assert(desc.properties == Map("k1" -> "v1", "k2" -> "v2"))
|
||||
}
|
||||
|
||||
test("create table - everything!") {
|
||||
val query =
|
||||
"""
|
||||
|CREATE EXTERNAL TABLE IF NOT EXISTS dbx.my_table (id int, name string)
|
||||
|COMMENT 'no comment'
|
||||
|PARTITIONED BY (month int)
|
||||
|ROW FORMAT SERDE 'org.apache.poof.serde.Baff' WITH SERDEPROPERTIES ('k1'='v1')
|
||||
|STORED AS INPUTFORMAT 'winput' OUTPUTFORMAT 'wowput'
|
||||
|LOCATION '/path/to/mercury'
|
||||
|TBLPROPERTIES ('k1'='v1', 'k2'='v2')
|
||||
""".stripMargin
|
||||
val (desc, allowExisting) = extractTableDesc(query)
|
||||
assert(allowExisting)
|
||||
assert(desc.identifier.database == Some("dbx"))
|
||||
assert(desc.identifier.table == "my_table")
|
||||
assert(desc.tableType == CatalogTableType.EXTERNAL_TABLE)
|
||||
assert(desc.schema == Seq(
|
||||
CatalogColumn("id", "int"),
|
||||
CatalogColumn("name", "string"),
|
||||
CatalogColumn("month", "int")))
|
||||
assert(desc.partitionColumnNames == Seq("month"))
|
||||
assert(desc.sortColumnNames.isEmpty)
|
||||
assert(desc.bucketColumnNames.isEmpty)
|
||||
assert(desc.numBuckets == -1)
|
||||
assert(desc.viewText.isEmpty)
|
||||
assert(desc.viewOriginalText.isEmpty)
|
||||
assert(desc.storage.locationUri == Some("/path/to/mercury"))
|
||||
assert(desc.storage.inputFormat == Some("winput"))
|
||||
assert(desc.storage.outputFormat == Some("wowput"))
|
||||
assert(desc.storage.serde == Some("org.apache.poof.serde.Baff"))
|
||||
assert(desc.storage.serdeProperties == Map("k1" -> "v1"))
|
||||
assert(desc.properties == Map("k1" -> "v1", "k2" -> "v2"))
|
||||
assert(desc.comment == Some("no comment"))
|
||||
}
|
||||
|
||||
test("create view -- basic") {
|
||||
val v1 = "CREATE VIEW view1 AS SELECT * FROM tab1"
|
||||
val (desc, exists) = extractTableDesc(v1)
|
||||
|
|
|
@ -88,7 +88,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
|
|||
assert(hiveTable.storage.outputFormat === Some(outputFormat))
|
||||
assert(hiveTable.storage.serde === Some(serde))
|
||||
|
||||
assert(hiveTable.partitionColumns.isEmpty)
|
||||
assert(hiveTable.partitionColumnNames.isEmpty)
|
||||
assert(hiveTable.tableType === CatalogTableType.MANAGED_TABLE)
|
||||
|
||||
val columns = hiveTable.schema
|
||||
|
@ -151,7 +151,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
|
|||
assert(hiveTable.storage.outputFormat === Some(outputFormat))
|
||||
assert(hiveTable.storage.serde === Some(serde))
|
||||
|
||||
assert(hiveTable.partitionColumns.isEmpty)
|
||||
assert(hiveTable.partitionColumnNames.isEmpty)
|
||||
assert(hiveTable.tableType === CatalogTableType.EXTERNAL_TABLE)
|
||||
|
||||
val columns = hiveTable.schema
|
||||
|
|
|
@ -81,7 +81,7 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef
|
|||
test("Double create fails when allowExisting = false") {
|
||||
sql("CREATE TABLE doubleCreateAndInsertTest (key int, value string)")
|
||||
|
||||
intercept[QueryExecutionException] {
|
||||
intercept[AnalysisException] {
|
||||
sql("CREATE TABLE doubleCreateAndInsertTest (key int, value string)")
|
||||
}
|
||||
}
|
||||
|
|
|
@ -149,7 +149,7 @@ class PruningSuite extends HiveComparisonTest with BeforeAndAfter {
|
|||
val (actualScannedColumns, actualPartValues) = plan.collect {
|
||||
case p @ HiveTableScan(columns, relation, _) =>
|
||||
val columnNames = columns.map(_.name)
|
||||
val partValues = if (relation.table.partitionColumns.nonEmpty) {
|
||||
val partValues = if (relation.table.partitionColumnNames.nonEmpty) {
|
||||
p.prunePartitions(relation.getHiveQlPartitions()).map(_.getValues)
|
||||
} else {
|
||||
Seq.empty
|
||||
|
|
|
@ -360,7 +360,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
|
|||
var message = intercept[AnalysisException] {
|
||||
sql("CREATE TABLE ctas1 AS SELECT key k, value FROM src ORDER BY k, value")
|
||||
}.getMessage
|
||||
assert(message.contains("ctas1 already exists"))
|
||||
assert(message.contains("already exists"))
|
||||
checkRelation("ctas1", true)
|
||||
sql("DROP TABLE ctas1")
|
||||
|
||||
|
|
Loading…
Reference in a new issue