[SPARK-18885][SQL] unify CREATE TABLE syntax for data source and hive serde tables

## What changes were proposed in this pull request?

Today we have different syntaxes for creating data source tables and Hive serde tables. We should unify them so that users are not confused, and as a step toward making Hive just another data source.

Please read https://issues.apache.org/jira/secure/attachment/12843835/CREATE-TABLE.pdf for details.
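
As a rough sketch of the unified syntax (assuming a Hive-enabled `SparkSession`; the table names and path below are made up), both kinds of tables are now created with the same statement shape and differ only in the provider named after `USING`:

```scala
// Data source table and Hive serde table now share one CREATE TABLE shape.
spark.sql(
  """CREATE TABLE users_parquet (id INT, name STRING)
    |USING parquet
    |LOCATION '/tmp/users_parquet'""".stripMargin)

spark.sql(
  """CREATE TABLE users_hive (id INT, name STRING)
    |USING hive
    |OPTIONS (fileFormat 'parquet')""".stripMargin)
```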

TODO (for follow-up PRs):
1. TBLPROPERTIES is not added to the new syntax; we should decide whether we want to add it later.
2. `SHOW CREATE TABLE` should be updated to use the new syntax.
3. We should decide whether we want to change the behavior of `SET LOCATION`.

## How was this patch tested?

New and updated tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #16296 from cloud-fan/create-table.
Wenchen Fan 2017-01-05 17:40:27 -08:00 committed by Yin Huai
parent f5d18af6a8
commit cca945b6aa
24 changed files with 526 additions and 143 deletions

View file

@ -522,14 +522,11 @@ Hive metastore. Persistent tables will still exist even after your Spark program
long as you maintain your connection to the same metastore. A DataFrame for a persistent table can
be created by calling the `table` method on a `SparkSession` with the name of the table.
- By default `saveAsTable` will create a "managed table", meaning that the location of the data will
- be controlled by the metastore. Managed tables will also have their data deleted automatically
- when a table is dropped.
- Currently, `saveAsTable` does not expose an API supporting the creation of an "external table" from a `DataFrame`.
- However, this functionality can be achieved by providing a `path` option to the `DataFrameWriter` with `path` as the key
- and location of the external table as its value (a string) when saving the table with `saveAsTable`. When an External table
- is dropped only its metadata is removed.
+ For file-based data source, e.g. text, parquet, json, etc. you can specify a custom table path via the
+ `path` option, e.g. `df.write.option("path", "/some/path").saveAsTable("t")`. When the table is dropped,
+ the custom table path will not be removed and the table data is still there. If no custom table path is
+ specified, Spark will write data to a default table path under the warehouse directory. When the table is
+ dropped, the default table path will be removed too.
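
A small illustrative sketch of the custom-path behavior described above, assuming `spark` is an existing `SparkSession` and `df` an arbitrary `DataFrame` (paths and table names are made up):

```scala
// With an explicit path the saved table is external: dropping it keeps the files.
df.write.option("path", "/some/path").saveAsTable("t1")

// Without a path the table is managed: its data lives under the warehouse directory
// and is removed when the table is dropped.
df.write.saveAsTable("t2")
```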
Starting from Spark 2.1, persistent datasource tables have per-partition metadata stored in the Hive metastore. This brings several benefits:
@ -954,6 +951,53 @@ adds support for finding tables in the MetaStore and writing queries using HiveQ
</div>
</div>
### Specifying storage format for Hive tables
When you create a Hive table, you need to define how this table should read/write data from/to the file system,
i.e. the "input format" and "output format". You also need to define how this table should deserialize the data
to rows, or serialize rows to data, i.e. the "serde". The following options can be used to specify the storage
format ("serde", "input format", "output format"), e.g. `CREATE TABLE src(id int) USING hive OPTIONS(fileFormat 'parquet')`.
By default, we will read the table files as plain text. Note that Hive storage handlers are not supported yet when
creating a table; you can create a table using a storage handler on the Hive side, and use Spark SQL to read it.
<table class="table">
<tr><th>Property Name</th><th>Meaning</th></tr>
<tr>
<td><code>fileFormat</code></td>
<td>
A fileFormat is a package of storage format specifications, including "serde", "input format" and
"output format". Currently we support 6 fileFormats: 'sequencefile', 'rcfile', 'orc', 'parquet', 'textfile' and 'avro'.
</td>
</tr>
<tr>
<td><code>inputFormat, outputFormat</code></td>
<td>
These 2 options specify the name of a corresponding `InputFormat` and `OutputFormat` class as a string literal,
e.g. `org.apache.hadoop.hive.ql.io.orc.OrcInputFormat`. These 2 options must be specified as a pair, and you cannot
specify them if you have already specified the `fileFormat` option.
</td>
</tr>
<tr>
<td><code>serde</code></td>
<td>
This option specifies the name of a serde class. When the `fileFormat` option is specified, do not specify this option
if the given `fileFormat` already includes the serde information. Currently "sequencefile", "textfile" and "rcfile"
don't include the serde information, so you can use this option with these 3 fileFormats.
</td>
</tr>
<tr>
<td><code>fieldDelim, escapeDelim, collectionDelim, mapkeyDelim, lineDelim</code></td>
<td>
These options can only be used with the "textfile" fileFormat. They define how to read delimited files into rows.
</td>
</tr>
</table>
All other properties defined with `OPTIONS` will be regarded as Hive serde properties.
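Putting these options together, a hedged sketch of a statement this section permits (the table, columns, and the `mySerdeProp` key are illustrative):

```scala
// 'textfile' does not fix a serde, so the delimiter options are allowed here;
// the unrecognized key 'mySerdeProp' falls through as a Hive serde property.
spark.sql(
  """CREATE TABLE logs (ts STRING, msg STRING)
    |USING hive
    |OPTIONS (fileFormat 'textfile', fieldDelim ',', mySerdeProp 'someValue')""".stripMargin)
```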
### Interacting with Different Versions of Hive Metastore
One of the most important pieces of Spark SQL's Hive support is interaction with Hive metastore,

View file

@ -64,7 +64,7 @@ public class JavaSparkHiveExample {
  .enableHiveSupport()
  .getOrCreate();
- spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)");
+ spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive");
spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src");
// Queries are expressed in HiveQL

View file

@ -44,7 +44,7 @@ if __name__ == "__main__":
  .getOrCreate()
# spark is an existing SparkSession
- spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
+ spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
# Queries are expressed in HiveQL

View file

@ -195,7 +195,7 @@ head(teenagers)
# $example on:spark_hive$
# enableHiveSupport defaults to TRUE
sparkR.session(enableHiveSupport = TRUE)
- sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
+ sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
# Queries can be expressed in HiveQL.

View file

@ -50,7 +50,7 @@ object SparkHiveExample {
import spark.implicits._
import spark.sql
- sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
+ sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
// Queries are expressed in HiveQL

View file

@ -69,16 +69,18 @@ statement
| ALTER DATABASE identifier SET DBPROPERTIES tablePropertyList #setDatabaseProperties
| DROP DATABASE (IF EXISTS)? identifier (RESTRICT | CASCADE)? #dropDatabase
| createTableHeader ('(' colTypeList ')')? tableProvider
-   (OPTIONS tablePropertyList)?
+   (OPTIONS options=tablePropertyList)?
    (PARTITIONED BY partitionColumnNames=identifierList)?
-   bucketSpec? (AS? query)? #createTableUsing
+   bucketSpec? locationSpec?
+   (COMMENT comment=STRING)?
+   (AS? query)? #createTable
| createTableHeader ('(' columns=colTypeList ')')?
-   (COMMENT STRING)?
+   (COMMENT comment=STRING)?
    (PARTITIONED BY '(' partitionColumns=colTypeList ')')?
    bucketSpec? skewSpec?
    rowFormat? createFileFormat? locationSpec?
    (TBLPROPERTIES tablePropertyList)?
-   (AS? query)? #createTable
+   (AS? query)? #createHiveTable
| CREATE TABLE (IF NOT EXISTS)? target=tableIdentifier
    LIKE source=tableIdentifier #createTableLike
| ANALYZE TABLE tableIdentifier partitionSpec? COMPUTE STATISTICS
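
To make the revised `#createTable` alternative concrete, a hedged example of a statement it accepts, exercising the newly added `locationSpec` and `COMMENT` clauses (identifiers and the path are made up):

```scala
// Clause order follows the grammar rule above:
// OPTIONS, PARTITIONED BY, [bucketSpec], LOCATION, COMMENT, [AS query].
spark.sql(
  """CREATE TABLE events (id INT, ts STRING, day STRING)
    |USING parquet
    |OPTIONS (compression 'snappy')
    |PARTITIONED BY (day)
    |LOCATION '/tmp/events'
    |COMMENT 'event log'""".stripMargin)
```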

View file

@ -27,6 +27,8 @@ class CaseInsensitiveMap(map: Map[String, String]) extends Map[String, String]
  override def get(k: String): Option[String] = baseMap.get(k.toLowerCase)

+ override def contains(k: String): Boolean = baseMap.contains(k.toLowerCase)

  override def + [B1 >: String](kv: (String, B1)): Map[String, B1] =
    baseMap + kv.copy(_1 = kv._1.toLowerCase)

View file

@ -342,11 +342,11 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
  }

  /**
-  * Create a data source table, returning a [[CreateTable]] logical plan.
+  * Create a table, returning a [[CreateTable]] logical plan.
   *
   * Expected format:
   * {{{
-  *   CREATE [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name
+  *   CREATE [TEMPORARY] TABLE [IF NOT EXISTS] [db_name.]table_name
   *   USING table_provider
   *   [OPTIONS table_property_list]
   *   [PARTITIONED BY (col_name, col_name, ...)]
@ -354,19 +354,18 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
   *     [SORTED BY (col_name [ASC|DESC], ...)]
   *     INTO num_buckets BUCKETS
   *   ]
+  *   [LOCATION path]
+  *   [COMMENT table_comment]
   *   [AS select_statement];
   * }}}
   */
- override def visitCreateTableUsing(ctx: CreateTableUsingContext): LogicalPlan = withOrigin(ctx) {
+ override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = withOrigin(ctx) {
    val (table, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader)
    if (external) {
      operationNotAllowed("CREATE EXTERNAL TABLE ... USING", ctx)
    }
-   val options = Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty)
+   val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty)
    val provider = ctx.tableProvider.qualifiedName.getText
-   if (provider.toLowerCase == DDLUtils.HIVE_PROVIDER) {
-     throw new AnalysisException("Cannot create hive serde table with CREATE TABLE USING")
-   }
    val schema = Option(ctx.colTypeList()).map(createSchema)
    val partitionColumnNames =
      Option(ctx.partitionColumnNames)
@ -374,10 +373,17 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
        .getOrElse(Array.empty[String])
    val bucketSpec = Option(ctx.bucketSpec()).map(visitBucketSpec)
-   // TODO: this may be wrong for non file-based data source like JDBC, which should be external
-   // even there is no `path` in options. We should consider allow the EXTERNAL keyword.
+   val location = Option(ctx.locationSpec).map(visitLocationSpec)
    val storage = DataSource.buildStorageFormatFromOptions(options)
-   val tableType = if (storage.locationUri.isDefined) {
+   if (location.isDefined && storage.locationUri.isDefined) {
+     throw new ParseException(
+       "LOCATION and 'path' in OPTIONS are both used to indicate the custom table path, " +
+         "you can only specify one of them.", ctx)
+   }
+   val customLocation = storage.locationUri.orElse(location)
+   val tableType = if (customLocation.isDefined) {
      CatalogTableType.EXTERNAL
    } else {
      CatalogTableType.MANAGED
@ -386,12 +392,12 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
    val tableDesc = CatalogTable(
      identifier = table,
      tableType = tableType,
-     storage = storage,
+     storage = storage.copy(locationUri = customLocation),
      schema = schema.getOrElse(new StructType),
      provider = Some(provider),
      partitionColumnNames = partitionColumnNames,
-     bucketSpec = bucketSpec
-   )
+     bucketSpec = bucketSpec,
+     comment = Option(ctx.comment).map(string))

    // Determine the storage mode.
    val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
@ -1011,10 +1017,10 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
  }

  /**
-  * Create a table, returning a [[CreateTable]] logical plan.
+  * Create a Hive serde table, returning a [[CreateTable]] logical plan.
   *
-  * This is not used to create datasource tables, which is handled through
-  * "CREATE TABLE ... USING ...".
+  * This is a legacy syntax for Hive compatibility, we recommend users to use the Spark SQL
+  * CREATE TABLE syntax to create Hive serde table, e.g. "CREATE TABLE ... USING hive ..."
   *
   * Note: several features are currently not supported - temporary tables, bucketing,
   * skewed columns and storage handlers (STORED BY).
@ -1032,7 +1038,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
   *   [AS select_statement];
   * }}}
   */
- override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = withOrigin(ctx) {
+ override def visitCreateHiveTable(ctx: CreateHiveTableContext): LogicalPlan = withOrigin(ctx) {
    val (name, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader)
    // TODO: implement temporary tables
    if (temp) {
@ -1046,7 +1052,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
    if (ctx.bucketSpec != null) {
      operationNotAllowed("CREATE TABLE ... CLUSTERED BY", ctx)
    }
-   val comment = Option(ctx.STRING).map(string)
    val dataCols = Option(ctx.columns).map(visitColTypeList).getOrElse(Nil)
    val partitionCols = Option(ctx.partitionColumns).map(visitColTypeList).getOrElse(Nil)
    val properties = Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty)
@ -1057,19 +1062,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
    val schema = StructType(dataCols ++ partitionCols)

    // Storage format
-   val defaultStorage: CatalogStorageFormat = {
-     val defaultStorageType = conf.getConfString("hive.default.fileformat", "textfile")
-     val defaultHiveSerde = HiveSerDe.sourceToSerDe(defaultStorageType)
-     CatalogStorageFormat(
-       locationUri = None,
-       inputFormat = defaultHiveSerde.flatMap(_.inputFormat)
-         .orElse(Some("org.apache.hadoop.mapred.TextInputFormat")),
-       outputFormat = defaultHiveSerde.flatMap(_.outputFormat)
-         .orElse(Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")),
-       serde = defaultHiveSerde.flatMap(_.serde),
-       compressed = false,
-       properties = Map())
-   }
+   val defaultStorage = HiveSerDe.getDefaultStorage(conf)
    validateRowFormatFileFormat(ctx.rowFormat, ctx.createFileFormat, ctx)
    val fileStorage = Option(ctx.createFileFormat).map(visitCreateFileFormat)
      .getOrElse(CatalogStorageFormat.empty)
@ -1104,7 +1097,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
      provider = Some(DDLUtils.HIVE_PROVIDER),
      partitionColumnNames = partitionCols.map(_.name),
      properties = properties,
-     comment = comment)
+     comment = Option(ctx.comment).map(string))

    val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists

View file

@ -408,8 +408,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
  object DDLStrategy extends Strategy {
    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-     case CreateTable(tableDesc, mode, None)
-       if tableDesc.provider.get == DDLUtils.HIVE_PROVIDER =>
+     case CreateTable(tableDesc, mode, None) if DDLUtils.isHiveTable(tableDesc) =>
        val cmd = CreateTableCommand(tableDesc, ifNotExists = mode == SaveMode.Ignore)
        ExecutedCommandExec(cmd) :: Nil
@ -421,8 +420,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
      // CREATE TABLE ... AS SELECT ... for hive serde table is handled in hive module, by rule
      // `CreateTables`
-     case CreateTable(tableDesc, mode, Some(query))
-       if tableDesc.provider.get != DDLUtils.HIVE_PROVIDER =>
+     case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isDatasourceTable(tableDesc) =>
        val cmd =
          CreateDataSourceTableAsSelectCommand(
            tableDesc,

View file

@ -762,8 +762,12 @@ case class AlterTableSetLocationCommand(
object DDLUtils {
  val HIVE_PROVIDER = "hive"

+ def isHiveTable(table: CatalogTable): Boolean = {
+   table.provider.isDefined && table.provider.get.toLowerCase == HIVE_PROVIDER
+ }

  def isDatasourceTable(table: CatalogTable): Boolean = {
-   table.provider.isDefined && table.provider.get != HIVE_PROVIDER
+   table.provider.isDefined && table.provider.get.toLowerCase != HIVE_PROVIDER
  }

  /**

View file

@ -109,7 +109,7 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPl
      throw new AnalysisException("Saving data into a view is not allowed.")
    }

-   if (existingTable.provider.get == DDLUtils.HIVE_PROVIDER) {
+   if (DDLUtils.isHiveTable(existingTable)) {
      throw new AnalysisException(s"Saving data in the Hive serde table $tableName is " +
        "not supported yet. Please use the insertInto() API as an alternative.")
    }
@ -233,7 +233,7 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPl
    checkDuplication(normalizedPartitionCols, "partition")

    if (schema.nonEmpty && normalizedPartitionCols.length == schema.length) {
-     if (table.provider.get == DDLUtils.HIVE_PROVIDER) {
+     if (DDLUtils.isHiveTable(table)) {
        // When we hit this branch, it means users didn't specify schema for the table to be
        // created, as we always include partition columns in table schema for hive serde tables.
        // The real schema will be inferred at hive metastore by hive serde, plus the given
@ -380,8 +380,7 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] {
object HiveOnlyCheck extends (LogicalPlan => Unit) {
  def apply(plan: LogicalPlan): Unit = {
    plan.foreach {
-     case CreateTable(tableDesc, _, Some(_))
-       if tableDesc.provider.get == DDLUtils.HIVE_PROVIDER =>
+     case CreateTable(tableDesc, _, Some(_)) if DDLUtils.isHiveTable(tableDesc) =>
        throw new AnalysisException("Hive support is required to use CREATE Hive TABLE AS SELECT")
      case _ => // OK

View file

@ -17,12 +17,49 @@
package org.apache.spark.sql.internal package org.apache.spark.sql.internal
import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
case class HiveSerDe( case class HiveSerDe(
inputFormat: Option[String] = None, inputFormat: Option[String] = None,
outputFormat: Option[String] = None, outputFormat: Option[String] = None,
serde: Option[String] = None) serde: Option[String] = None)
object HiveSerDe { object HiveSerDe {
val serdeMap = Map(
"sequencefile" ->
HiveSerDe(
inputFormat = Option("org.apache.hadoop.mapred.SequenceFileInputFormat"),
outputFormat = Option("org.apache.hadoop.mapred.SequenceFileOutputFormat")),
"rcfile" ->
HiveSerDe(
inputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat"),
outputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"),
serde = Option("org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe")),
"orc" ->
HiveSerDe(
inputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"),
outputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"),
serde = Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde")),
"parquet" ->
HiveSerDe(
inputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"),
outputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"),
serde = Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")),
"textfile" ->
HiveSerDe(
inputFormat = Option("org.apache.hadoop.mapred.TextInputFormat"),
outputFormat = Option("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")),
"avro" ->
HiveSerDe(
inputFormat = Option("org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat"),
outputFormat = Option("org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat"),
serde = Option("org.apache.hadoop.hive.serde2.avro.AvroSerDe")))
/** /**
* Get the Hive SerDe information from the data source abbreviation string or classname. * Get the Hive SerDe information from the data source abbreviation string or classname.
* *
@ -31,41 +68,6 @@ object HiveSerDe {
* @return HiveSerDe associated with the specified source * @return HiveSerDe associated with the specified source
*/ */
def sourceToSerDe(source: String): Option[HiveSerDe] = { def sourceToSerDe(source: String): Option[HiveSerDe] = {
val serdeMap = Map(
"sequencefile" ->
HiveSerDe(
inputFormat = Option("org.apache.hadoop.mapred.SequenceFileInputFormat"),
outputFormat = Option("org.apache.hadoop.mapred.SequenceFileOutputFormat")),
"rcfile" ->
HiveSerDe(
inputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat"),
outputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"),
serde = Option("org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe")),
"orc" ->
HiveSerDe(
inputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"),
outputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"),
serde = Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde")),
"parquet" ->
HiveSerDe(
inputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"),
outputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"),
serde = Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")),
"textfile" ->
HiveSerDe(
inputFormat = Option("org.apache.hadoop.mapred.TextInputFormat"),
outputFormat = Option("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")),
"avro" ->
HiveSerDe(
inputFormat = Option("org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat"),
outputFormat = Option("org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat"),
serde = Option("org.apache.hadoop.hive.serde2.avro.AvroSerDe")))
val key = source.toLowerCase match { val key = source.toLowerCase match {
case s if s.startsWith("org.apache.spark.sql.parquet") => "parquet" case s if s.startsWith("org.apache.spark.sql.parquet") => "parquet"
case s if s.startsWith("org.apache.spark.sql.orc") => "orc" case s if s.startsWith("org.apache.spark.sql.orc") => "orc"
@ -77,4 +79,16 @@ object HiveSerDe {
serdeMap.get(key) serdeMap.get(key)
} }
def getDefaultStorage(conf: SQLConf): CatalogStorageFormat = {
val defaultStorageType = conf.getConfString("hive.default.fileformat", "textfile")
val defaultHiveSerde = sourceToSerDe(defaultStorageType)
CatalogStorageFormat.empty.copy(
inputFormat = defaultHiveSerde.flatMap(_.inputFormat)
.orElse(Some("org.apache.hadoop.mapred.TextInputFormat")),
outputFormat = defaultHiveSerde.flatMap(_.outputFormat)
.orElse(Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")),
serde = defaultHiveSerde.flatMap(_.serde)
.orElse(Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")))
}
} }

View file

@ -121,7 +121,8 @@ class SparkSqlParserSuite extends PlanTest {
      tableType: CatalogTableType = CatalogTableType.MANAGED,
      storage: CatalogStorageFormat = CatalogStorageFormat.empty.copy(
        inputFormat = HiveSerDe.sourceToSerDe("textfile").get.inputFormat,
-       outputFormat = HiveSerDe.sourceToSerDe("textfile").get.outputFormat),
+       outputFormat = HiveSerDe.sourceToSerDe("textfile").get.outputFormat,
+       serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")),
      schema: StructType = new StructType,
      provider: Option[String] = Some("hive"),
      partitionColumnNames: Seq[String] = Seq.empty,

View file

@ -236,7 +236,7 @@ class DDLCommandSuite extends PlanTest {
comparePlans(parsed4, expected4) comparePlans(parsed4, expected4)
} }
test("create table - table file format") { test("create hive table - table file format") {
val allSources = Seq("parquet", "parquetfile", "orc", "orcfile", "avro", "avrofile", val allSources = Seq("parquet", "parquetfile", "orc", "orcfile", "avro", "avrofile",
"sequencefile", "rcfile", "textfile") "sequencefile", "rcfile", "textfile")
@ -245,13 +245,14 @@ class DDLCommandSuite extends PlanTest {
val ct = parseAs[CreateTable](query) val ct = parseAs[CreateTable](query)
val hiveSerde = HiveSerDe.sourceToSerDe(s) val hiveSerde = HiveSerDe.sourceToSerDe(s)
assert(hiveSerde.isDefined) assert(hiveSerde.isDefined)
assert(ct.tableDesc.storage.serde == hiveSerde.get.serde) assert(ct.tableDesc.storage.serde ==
hiveSerde.get.serde.orElse(Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")))
assert(ct.tableDesc.storage.inputFormat == hiveSerde.get.inputFormat) assert(ct.tableDesc.storage.inputFormat == hiveSerde.get.inputFormat)
assert(ct.tableDesc.storage.outputFormat == hiveSerde.get.outputFormat) assert(ct.tableDesc.storage.outputFormat == hiveSerde.get.outputFormat)
} }
} }
test("create table - row format and table file format") { test("create hive table - row format and table file format") {
val createTableStart = "CREATE TABLE my_tab ROW FORMAT" val createTableStart = "CREATE TABLE my_tab ROW FORMAT"
val fileFormat = s"STORED AS INPUTFORMAT 'inputfmt' OUTPUTFORMAT 'outputfmt'" val fileFormat = s"STORED AS INPUTFORMAT 'inputfmt' OUTPUTFORMAT 'outputfmt'"
val query1 = s"$createTableStart SERDE 'anything' $fileFormat" val query1 = s"$createTableStart SERDE 'anything' $fileFormat"
@ -262,13 +263,15 @@ class DDLCommandSuite extends PlanTest {
assert(parsed1.tableDesc.storage.serde == Some("anything")) assert(parsed1.tableDesc.storage.serde == Some("anything"))
assert(parsed1.tableDesc.storage.inputFormat == Some("inputfmt")) assert(parsed1.tableDesc.storage.inputFormat == Some("inputfmt"))
assert(parsed1.tableDesc.storage.outputFormat == Some("outputfmt")) assert(parsed1.tableDesc.storage.outputFormat == Some("outputfmt"))
val parsed2 = parseAs[CreateTable](query2) val parsed2 = parseAs[CreateTable](query2)
assert(parsed2.tableDesc.storage.serde.isEmpty) assert(parsed2.tableDesc.storage.serde ==
Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
assert(parsed2.tableDesc.storage.inputFormat == Some("inputfmt")) assert(parsed2.tableDesc.storage.inputFormat == Some("inputfmt"))
assert(parsed2.tableDesc.storage.outputFormat == Some("outputfmt")) assert(parsed2.tableDesc.storage.outputFormat == Some("outputfmt"))
} }
test("create table - row format serde and generic file format") { test("create hive table - row format serde and generic file format") {
val allSources = Seq("parquet", "orc", "avro", "sequencefile", "rcfile", "textfile") val allSources = Seq("parquet", "orc", "avro", "sequencefile", "rcfile", "textfile")
val supportedSources = Set("sequencefile", "rcfile", "textfile") val supportedSources = Set("sequencefile", "rcfile", "textfile")
@ -287,7 +290,7 @@ class DDLCommandSuite extends PlanTest {
} }
} }
test("create table - row format delimited and generic file format") { test("create hive table - row format delimited and generic file format") {
val allSources = Seq("parquet", "orc", "avro", "sequencefile", "rcfile", "textfile") val allSources = Seq("parquet", "orc", "avro", "sequencefile", "rcfile", "textfile")
val supportedSources = Set("textfile") val supportedSources = Set("textfile")
@ -297,7 +300,8 @@ class DDLCommandSuite extends PlanTest {
val ct = parseAs[CreateTable](query) val ct = parseAs[CreateTable](query)
val hiveSerde = HiveSerDe.sourceToSerDe(s) val hiveSerde = HiveSerDe.sourceToSerDe(s)
assert(hiveSerde.isDefined) assert(hiveSerde.isDefined)
assert(ct.tableDesc.storage.serde == hiveSerde.get.serde) assert(ct.tableDesc.storage.serde ==
hiveSerde.get.serde.orElse(Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")))
assert(ct.tableDesc.storage.inputFormat == hiveSerde.get.inputFormat) assert(ct.tableDesc.storage.inputFormat == hiveSerde.get.inputFormat)
assert(ct.tableDesc.storage.outputFormat == hiveSerde.get.outputFormat) assert(ct.tableDesc.storage.outputFormat == hiveSerde.get.outputFormat)
} else { } else {
@ -306,7 +310,7 @@ class DDLCommandSuite extends PlanTest {
} }
} }
test("create external table - location must be specified") { test("create hive external table - location must be specified") {
assertUnsupported( assertUnsupported(
sql = "CREATE EXTERNAL TABLE my_tab", sql = "CREATE EXTERNAL TABLE my_tab",
containsThesePhrases = Seq("create external table", "location")) containsThesePhrases = Seq("create external table", "location"))
@ -316,7 +320,7 @@ class DDLCommandSuite extends PlanTest {
assert(ct.tableDesc.storage.locationUri == Some("/something/anything")) assert(ct.tableDesc.storage.locationUri == Some("/something/anything"))
} }
test("create table - property values must be set") { test("create hive table - property values must be set") {
assertUnsupported( assertUnsupported(
sql = "CREATE TABLE my_tab TBLPROPERTIES('key_without_value', 'key_with_value'='x')", sql = "CREATE TABLE my_tab TBLPROPERTIES('key_without_value', 'key_with_value'='x')",
containsThesePhrases = Seq("key_without_value")) containsThesePhrases = Seq("key_without_value"))
@ -326,14 +330,14 @@ class DDLCommandSuite extends PlanTest {
containsThesePhrases = Seq("key_without_value")) containsThesePhrases = Seq("key_without_value"))
} }
test("create table - location implies external") { test("create hive table - location implies external") {
val query = "CREATE TABLE my_tab LOCATION '/something/anything'" val query = "CREATE TABLE my_tab LOCATION '/something/anything'"
val ct = parseAs[CreateTable](query) val ct = parseAs[CreateTable](query)
assert(ct.tableDesc.tableType == CatalogTableType.EXTERNAL) assert(ct.tableDesc.tableType == CatalogTableType.EXTERNAL)
assert(ct.tableDesc.storage.locationUri == Some("/something/anything")) assert(ct.tableDesc.storage.locationUri == Some("/something/anything"))
} }
test("create table using - with partitioned by") { test("create table - with partitioned by") {
val query = "CREATE TABLE my_tab(a INT comment 'test', b STRING) " + val query = "CREATE TABLE my_tab(a INT comment 'test', b STRING) " +
"USING parquet PARTITIONED BY (a)" "USING parquet PARTITIONED BY (a)"
@ -357,7 +361,7 @@ class DDLCommandSuite extends PlanTest {
} }
} }
test("create table using - with bucket") { test("create table - with bucket") {
val query = "CREATE TABLE my_tab(a INT, b STRING) USING parquet " + val query = "CREATE TABLE my_tab(a INT, b STRING) USING parquet " +
"CLUSTERED BY (a) SORTED BY (b) INTO 5 BUCKETS" "CLUSTERED BY (a) SORTED BY (b) INTO 5 BUCKETS"
@ -379,6 +383,57 @@ class DDLCommandSuite extends PlanTest {
} }
} }
test("create table - with comment") {
val sql = "CREATE TABLE my_tab(a INT, b STRING) USING parquet COMMENT 'abc'"
val expectedTableDesc = CatalogTable(
identifier = TableIdentifier("my_tab"),
tableType = CatalogTableType.MANAGED,
storage = CatalogStorageFormat.empty,
schema = new StructType().add("a", IntegerType).add("b", StringType),
provider = Some("parquet"),
comment = Some("abc"))
parser.parsePlan(sql) match {
case CreateTable(tableDesc, _, None) =>
assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime))
case other =>
fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," +
s"got ${other.getClass.getName}: $sql")
}
}
test("create table - with location") {
val v1 = "CREATE TABLE my_tab(a INT, b STRING) USING parquet LOCATION '/tmp/file'"
val expectedTableDesc = CatalogTable(
identifier = TableIdentifier("my_tab"),
tableType = CatalogTableType.EXTERNAL,
storage = CatalogStorageFormat.empty.copy(locationUri = Some("/tmp/file")),
schema = new StructType().add("a", IntegerType).add("b", StringType),
provider = Some("parquet"))
parser.parsePlan(v1) match {
case CreateTable(tableDesc, _, None) =>
assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime))
case other =>
fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," +
s"got ${other.getClass.getName}: $v1")
}
val v2 =
"""
|CREATE TABLE my_tab(a INT, b STRING)
|USING parquet
|OPTIONS (path '/tmp/file')
|LOCATION '/tmp/file'
""".stripMargin
val e = intercept[ParseException] {
parser.parsePlan(v2)
}
assert(e.message.contains("you can only specify one of them."))
}
// ALTER TABLE table_name RENAME TO new_table_name; // ALTER TABLE table_name RENAME TO new_table_name;
// ALTER VIEW view_name RENAME TO new_view_name; // ALTER VIEW view_name RENAME TO new_view_name;
test("alter table/view: rename table/view") { test("alter table/view: rename table/view") {

View file

@ -215,7 +215,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
        tableDefinition.storage.locationUri
      }

-     if (tableDefinition.provider.get == DDLUtils.HIVE_PROVIDER) {
+     if (DDLUtils.isHiveTable(tableDefinition)) {
        val tableWithDataSourceProps = tableDefinition.copy(
          // We can't leave `locationUri` empty and count on Hive metastore to set a default table
          // location, because Hive metastore uses hive.metastore.warehouse.dir to generate default
@ -533,7 +533,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
    } else {
      val oldTableDef = getRawTable(db, withStatsProps.identifier.table)

-     val newStorage = if (tableDefinition.provider.get == DDLUtils.HIVE_PROVIDER) {
+     val newStorage = if (DDLUtils.isHiveTable(tableDefinition)) {
        tableDefinition.storage
      } else {
        // We can't alter the table storage of data source table directly for 2 reasons:

View file

@ -64,6 +64,7 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
        AnalyzeCreateTable(sparkSession) ::
        PreprocessTableInsertion(conf) ::
        DataSourceAnalysis(conf) ::
+       new DetermineHiveSerde(conf) ::
        (if (conf.runSQLonFile) new ResolveDataSource(sparkSession) :: Nil else Nil)

  override val extendedCheckRules = Seq(PreWriteCheck(conf, catalog))

View file

@ -18,14 +18,73 @@
package org.apache.spark.sql.hive package org.apache.spark.sql.hive
import org.apache.spark.sql._ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning._ import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.command.ExecutedCommandExec import org.apache.spark.sql.execution.command.{DDLUtils, ExecutedCommandExec}
import org.apache.spark.sql.execution.datasources.CreateTable import org.apache.spark.sql.execution.datasources.CreateTable
import org.apache.spark.sql.hive.execution._ import org.apache.spark.sql.hive.execution._
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
/**
* Determine the serde/format of the Hive serde table, according to the storage properties.
*/
class DetermineHiveSerde(conf: SQLConf) extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case c @ CreateTable(t, _, query) if DDLUtils.isHiveTable(t) && t.storage.serde.isEmpty =>
if (t.bucketSpec.isDefined) {
throw new AnalysisException("Creating bucketed Hive serde table is not supported yet.")
}
if (t.partitionColumnNames.nonEmpty && query.isDefined) {
val errorMessage = "A Create Table As Select (CTAS) statement is not allowed to " +
"create a partitioned table using Hive's file formats. " +
"Please use the syntax of \"CREATE TABLE tableName USING dataSource " +
"OPTIONS (...) PARTITIONED BY ...\" to create a partitioned table through a " +
"CTAS statement."
throw new AnalysisException(errorMessage)
}
val defaultStorage = HiveSerDe.getDefaultStorage(conf)
val options = new HiveOptions(t.storage.properties)
val fileStorage = if (options.fileFormat.isDefined) {
HiveSerDe.sourceToSerDe(options.fileFormat.get) match {
case Some(s) =>
CatalogStorageFormat.empty.copy(
inputFormat = s.inputFormat,
outputFormat = s.outputFormat,
serde = s.serde)
case None =>
throw new IllegalArgumentException(s"invalid fileFormat: '${options.fileFormat.get}'")
}
} else if (options.hasInputOutputFormat) {
CatalogStorageFormat.empty.copy(
inputFormat = options.inputFormat,
outputFormat = options.outputFormat)
} else {
CatalogStorageFormat.empty
}
val rowStorage = if (options.serde.isDefined) {
CatalogStorageFormat.empty.copy(serde = options.serde)
} else {
CatalogStorageFormat.empty
}
val storage = t.storage.copy(
inputFormat = fileStorage.inputFormat.orElse(defaultStorage.inputFormat),
outputFormat = fileStorage.outputFormat.orElse(defaultStorage.outputFormat),
serde = rowStorage.serde.orElse(fileStorage.serde).orElse(defaultStorage.serde),
properties = options.serdeProperties)
c.copy(tableDesc = t.copy(storage = storage))
}
}
private[hive] trait HiveStrategies { private[hive] trait HiveStrategies {
// Possibly being too clever with types here... or not clever enough. // Possibly being too clever with types here... or not clever enough.
@ -49,15 +108,7 @@ private[hive] trait HiveStrategies {
InsertIntoHiveTable( InsertIntoHiveTable(
table, partition, planLater(child), overwrite, ifNotExists) :: Nil table, partition, planLater(child), overwrite, ifNotExists) :: Nil
case CreateTable(tableDesc, mode, Some(query)) if tableDesc.provider.get == "hive" => case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isHiveTable(tableDesc) =>
val newTableDesc = if (tableDesc.storage.serde.isEmpty) {
// add default serde
tableDesc.withNewStorage(
serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
} else {
tableDesc
}
// Currently we will never hit this branch, as SQL string API can only use `Ignore` or // Currently we will never hit this branch, as SQL string API can only use `Ignore` or
// `ErrorIfExists` mode, and `DataFrameWriter.saveAsTable` doesn't support hive serde // `ErrorIfExists` mode, and `DataFrameWriter.saveAsTable` doesn't support hive serde
// tables yet. // tables yet.
@ -68,7 +119,7 @@ private[hive] trait HiveStrategies {
val dbName = tableDesc.identifier.database.getOrElse(sparkSession.catalog.currentDatabase) val dbName = tableDesc.identifier.database.getOrElse(sparkSession.catalog.currentDatabase)
val cmd = CreateHiveTableAsSelectCommand( val cmd = CreateHiveTableAsSelectCommand(
newTableDesc.copy(identifier = tableDesc.identifier.copy(database = Some(dbName))), tableDesc.copy(identifier = tableDesc.identifier.copy(database = Some(dbName))),
query, query,
mode == SaveMode.Ignore) mode == SaveMode.Ignore)
ExecutedCommandExec(cmd) :: Nil ExecutedCommandExec(cmd) :: Nil

View file

@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hive.execution
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
/**
* Options for the Hive data source. Note that rule `DetermineHiveSerde` will extract Hive
* serde/format information from these options.
*/
class HiveOptions(@transient private val parameters: CaseInsensitiveMap) extends Serializable {
import HiveOptions._
def this(parameters: Map[String, String]) = this(new CaseInsensitiveMap(parameters))
val fileFormat = parameters.get(FILE_FORMAT).map(_.toLowerCase)
val inputFormat = parameters.get(INPUT_FORMAT)
val outputFormat = parameters.get(OUTPUT_FORMAT)
if (inputFormat.isDefined != outputFormat.isDefined) {
throw new IllegalArgumentException("Cannot specify only inputFormat or outputFormat, you " +
"have to specify both of them.")
}
def hasInputOutputFormat: Boolean = inputFormat.isDefined
if (fileFormat.isDefined && inputFormat.isDefined) {
throw new IllegalArgumentException("Cannot specify fileFormat and inputFormat/outputFormat " +
"together for Hive data source.")
}
val serde = parameters.get(SERDE)
if (fileFormat.isDefined && serde.isDefined) {
if (!Set("sequencefile", "textfile", "rcfile").contains(fileFormat.get)) {
throw new IllegalArgumentException(
s"fileFormat '${fileFormat.get}' already specifies a serde.")
}
}
val containsDelimiters = delimiterOptions.keys.exists(parameters.contains)
if (containsDelimiters) {
if (serde.isDefined) {
throw new IllegalArgumentException("Cannot specify delimiters with a custom serde.")
}
if (fileFormat.isEmpty) {
throw new IllegalArgumentException("Cannot specify delimiters without fileFormat.")
}
if (fileFormat.get != "textfile") {
throw new IllegalArgumentException("Cannot specify delimiters as they are only compatible " +
s"with fileFormat 'textfile', not ${fileFormat.get}.")
}
}
for (lineDelim <- parameters.get("lineDelim") if lineDelim != "\n") {
throw new IllegalArgumentException("Hive data source only support newline '\\n' as " +
s"line delimiter, but given: $lineDelim.")
}
def serdeProperties: Map[String, String] = parameters.filterKeys {
k => !lowerCasedOptionNames.contains(k.toLowerCase)
}.map { case (k, v) => delimiterOptions.getOrElse(k, k) -> v }
}
object HiveOptions {
private val lowerCasedOptionNames = collection.mutable.Set[String]()
private def newOption(name: String): String = {
lowerCasedOptionNames += name.toLowerCase
name
}
val FILE_FORMAT = newOption("fileFormat")
val INPUT_FORMAT = newOption("inputFormat")
val OUTPUT_FORMAT = newOption("outputFormat")
val SERDE = newOption("serde")
// A map from the public delimiter option keys to the underlying Hive serde property keys.
val delimiterOptions = Map(
"fieldDelim" -> "field.delim",
"escapeDelim" -> "escape.delim",
// The following typo is inherited from Hive...
"collectionDelim" -> "colelction.delim",
"mapkeyDelim" -> "mapkey.delim",
"lineDelim" -> "line.delim").map { case (k, v) => k.toLowerCase -> v }
}

View file

@ -27,7 +27,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.types.StructType

- private[orc] object OrcFileOperator extends Logging {
+ private[hive] object OrcFileOperator extends Logging {
  /**
   * Retrieves an ORC file reader from a given path. The path can point to either a directory or a
   * single ORC file. If it points to a directory, it picks any non-empty ORC file within that

View file

@ -31,7 +31,6 @@ import org.apache.spark.sql.catalyst.plans.logical.{Generate, ScriptTransformati
import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources.CreateTable import org.apache.spark.sql.execution.datasources.CreateTable
import org.apache.spark.sql.hive.test.{TestHive, TestHiveSingleton} import org.apache.spark.sql.hive.test.{TestHive, TestHiveSingleton}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SQLTestUtils import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types.StructType import org.apache.spark.sql.types.StructType
@ -51,6 +50,12 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle
assert(e.getMessage.toLowerCase.contains("operation not allowed")) assert(e.getMessage.toLowerCase.contains("operation not allowed"))
} }
private def analyzeCreateTable(sql: String): CatalogTable = {
TestHive.sessionState.analyzer.execute(parser.parsePlan(sql)).collect {
case CreateTable(tableDesc, mode, _) => tableDesc
}.head
}
test("Test CTAS #1") { test("Test CTAS #1") {
val s1 = val s1 =
"""CREATE EXTERNAL TABLE IF NOT EXISTS mydb.page_view """CREATE EXTERNAL TABLE IF NOT EXISTS mydb.page_view
@ -76,7 +81,7 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle
assert(desc.storage.outputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileOutputFormat")) assert(desc.storage.outputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"))
assert(desc.storage.serde == assert(desc.storage.serde ==
Some("org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe")) Some("org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe"))
assert(desc.properties == Map(("p1", "v1"), ("p2", "v2"))) assert(desc.properties == Map("p1" -> "v1", "p2" -> "v2"))
} }
test("Test CTAS #2") { test("Test CTAS #2") {
@ -107,7 +112,7 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle
assert(desc.storage.inputFormat == Some("parquet.hive.DeprecatedParquetInputFormat")) assert(desc.storage.inputFormat == Some("parquet.hive.DeprecatedParquetInputFormat"))
assert(desc.storage.outputFormat == Some("parquet.hive.DeprecatedParquetOutputFormat")) assert(desc.storage.outputFormat == Some("parquet.hive.DeprecatedParquetOutputFormat"))
assert(desc.storage.serde == Some("parquet.hive.serde.ParquetHiveSerDe")) assert(desc.storage.serde == Some("parquet.hive.serde.ParquetHiveSerDe"))
assert(desc.properties == Map(("p1", "v1"), ("p2", "v2"))) assert(desc.properties == Map("p1" -> "v1", "p2" -> "v2"))
} }
test("Test CTAS #3") { test("Test CTAS #3") {
@ -125,7 +130,7 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle
assert(desc.storage.inputFormat == Some("org.apache.hadoop.mapred.TextInputFormat")) assert(desc.storage.inputFormat == Some("org.apache.hadoop.mapred.TextInputFormat"))
assert(desc.storage.outputFormat == assert(desc.storage.outputFormat ==
Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")) Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"))
assert(desc.storage.serde.isEmpty) assert(desc.storage.serde == Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
assert(desc.properties == Map()) assert(desc.properties == Map())
} }
@ -305,7 +310,7 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle
Some("org.apache.hadoop.mapred.TextInputFormat")) Some("org.apache.hadoop.mapred.TextInputFormat"))
assert(desc.storage.outputFormat == assert(desc.storage.outputFormat ==
Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")) Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"))
assert(desc.storage.serde.isEmpty) assert(desc.storage.serde == Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
assert(desc.storage.properties.isEmpty) assert(desc.storage.properties.isEmpty)
assert(desc.properties.isEmpty) assert(desc.properties.isEmpty)
assert(desc.comment.isEmpty) assert(desc.comment.isEmpty)
@ -412,7 +417,7 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle
val (desc2, _) = extractTableDesc(query2) val (desc2, _) = extractTableDesc(query2)
assert(desc1.storage.inputFormat == Some("winput")) assert(desc1.storage.inputFormat == Some("winput"))
assert(desc1.storage.outputFormat == Some("wowput")) assert(desc1.storage.outputFormat == Some("wowput"))
assert(desc1.storage.serde.isEmpty) assert(desc1.storage.serde == Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
assert(desc2.storage.inputFormat == Some("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat")) assert(desc2.storage.inputFormat == Some("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"))
assert(desc2.storage.outputFormat == Some("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat")) assert(desc2.storage.outputFormat == Some("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"))
assert(desc2.storage.serde == Some("org.apache.hadoop.hive.ql.io.orc.OrcSerde")) assert(desc2.storage.serde == Some("org.apache.hadoop.hive.ql.io.orc.OrcSerde"))
@ -592,4 +597,94 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle
val hiveClient = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client val hiveClient = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
assert(hiveClient.getConf("hive.in.test", "") == "true") assert(hiveClient.getConf("hive.in.test", "") == "true")
} }
test("create hive serde table with new syntax - basic") {
val sql =
"""
|CREATE TABLE t
|(id int, name string COMMENT 'blabla')
|USING hive
|OPTIONS (fileFormat 'parquet', my_prop 1)
|LOCATION '/tmp/file'
|COMMENT 'BLABLA'
""".stripMargin
val table = analyzeCreateTable(sql)
assert(table.schema == new StructType()
.add("id", "int")
.add("name", "string", nullable = true, comment = "blabla"))
assert(table.provider == Some(DDLUtils.HIVE_PROVIDER))
assert(table.storage.locationUri == Some("/tmp/file"))
assert(table.storage.properties == Map("my_prop" -> "1"))
assert(table.comment == Some("BLABLA"))
assert(table.storage.inputFormat ==
Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"))
assert(table.storage.outputFormat ==
Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"))
assert(table.storage.serde ==
Some("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"))
}
test("create hive serde table with new syntax - with partition and bucketing") {
val v1 = "CREATE TABLE t (c1 int, c2 int) USING hive PARTITIONED BY (c2)"
val table = analyzeCreateTable(v1)
assert(table.schema == new StructType().add("c1", "int").add("c2", "int"))
assert(table.partitionColumnNames == Seq("c2"))
// check the default formats
assert(table.storage.serde == Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
assert(table.storage.inputFormat == Some("org.apache.hadoop.mapred.TextInputFormat"))
assert(table.storage.outputFormat ==
Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"))
val v2 = "CREATE TABLE t (c1 int, c2 int) USING hive CLUSTERED BY (c2) INTO 4 BUCKETS"
val e2 = intercept[AnalysisException](analyzeCreateTable(v2))
assert(e2.message.contains("Creating bucketed Hive serde table is not supported yet"))
val v3 =
"""
|CREATE TABLE t (c1 int, c2 int) USING hive
|PARTITIONED BY (c2)
|CLUSTERED BY (c2) INTO 4 BUCKETS""".stripMargin
val e3 = intercept[AnalysisException](analyzeCreateTable(v3))
assert(e3.message.contains("Creating bucketed Hive serde table is not supported yet"))
}
test("create hive serde table with new syntax - Hive options error checking") {
val v1 = "CREATE TABLE t (c1 int) USING hive OPTIONS (inputFormat 'abc')"
val e1 = intercept[IllegalArgumentException](analyzeCreateTable(v1))
assert(e1.getMessage.contains("Cannot specify only inputFormat or outputFormat"))
val v2 = "CREATE TABLE t (c1 int) USING hive OPTIONS " +
"(fileFormat 'x', inputFormat 'a', outputFormat 'b')"
val e2 = intercept[IllegalArgumentException](analyzeCreateTable(v2))
assert(e2.getMessage.contains(
"Cannot specify fileFormat and inputFormat/outputFormat together"))
val v3 = "CREATE TABLE t (c1 int) USING hive OPTIONS (fileFormat 'parquet', serde 'a')"
val e3 = intercept[IllegalArgumentException](analyzeCreateTable(v3))
assert(e3.getMessage.contains("fileFormat 'parquet' already specifies a serde"))
val v4 = "CREATE TABLE t (c1 int) USING hive OPTIONS (serde 'a', fieldDelim ' ')"
val e4 = intercept[IllegalArgumentException](analyzeCreateTable(v4))
assert(e4.getMessage.contains("Cannot specify delimiters with a custom serde"))
val v5 = "CREATE TABLE t (c1 int) USING hive OPTIONS (fieldDelim ' ')"
val e5 = intercept[IllegalArgumentException](analyzeCreateTable(v5))
assert(e5.getMessage.contains("Cannot specify delimiters without fileFormat"))
val v6 = "CREATE TABLE t (c1 int) USING hive OPTIONS (fileFormat 'parquet', fieldDelim ' ')"
val e6 = intercept[IllegalArgumentException](analyzeCreateTable(v6))
assert(e6.getMessage.contains(
"Cannot specify delimiters as they are only compatible with fileFormat 'textfile'"))
// The value of 'fileFormat' option is case-insensitive.
val v7 = "CREATE TABLE t (c1 int) USING hive OPTIONS (fileFormat 'TEXTFILE', lineDelim ',')"
val e7 = intercept[IllegalArgumentException](analyzeCreateTable(v7))
assert(e7.getMessage.contains("Hive data source only support newline '\\n' as line delimiter"))
val v8 = "CREATE TABLE t (c1 int) USING hive OPTIONS (fileFormat 'wrong')"
val e8 = intercept[IllegalArgumentException](analyzeCreateTable(v8))
assert(e8.getMessage.contains("invalid fileFormat: 'wrong'"))
}
} }

View file

@ -68,6 +68,6 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite {
    val rawTable = externalCatalog.client.getTable("db1", "hive_tbl")
    assert(!rawTable.properties.contains(HiveExternalCatalog.DATASOURCE_PROVIDER))
-   assert(externalCatalog.getTable("db1", "hive_tbl").provider == Some(DDLUtils.HIVE_PROVIDER))
+   assert(DDLUtils.isHiveTable(externalCatalog.getTable("db1", "hive_tbl")))
  }
}
} }

View file

@ -1189,21 +1189,6 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
} }
} }
test("create a data source table using hive") {
val tableName = "tab1"
withTable (tableName) {
val e = intercept[AnalysisException] {
sql(
s"""
|CREATE TABLE $tableName
|(col1 int)
|USING hive
""".stripMargin)
}.getMessage
assert(e.contains("Cannot create hive serde table with CREATE TABLE USING"))
}
}
test("create a temp view using hive") { test("create a temp view using hive") {
val tableName = "tab1" val tableName = "tab1"
withTable (tableName) { withTable (tableName) {

View file

@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, Cat
import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.hive.HiveExternalCatalog import org.apache.spark.sql.hive.HiveExternalCatalog
import org.apache.spark.sql.hive.orc.OrcFileOperator
import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
@ -1250,4 +1251,42 @@ class HiveDDLSuite
assert(e.message.contains("unknown is not a valid partition column")) assert(e.message.contains("unknown is not a valid partition column"))
} }
} }
test("create hive serde table with new syntax") {
withTable("t", "t2", "t3") {
withTempPath { path =>
sql(
s"""
|CREATE TABLE t(id int) USING hive
|OPTIONS(fileFormat 'orc', compression 'Zlib')
|LOCATION '${path.getCanonicalPath}'
""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
assert(DDLUtils.isHiveTable(table))
assert(table.storage.serde == Some("org.apache.hadoop.hive.ql.io.orc.OrcSerde"))
assert(table.storage.properties.get("compression") == Some("Zlib"))
assert(spark.table("t").collect().isEmpty)
sql("INSERT INTO t SELECT 1")
checkAnswer(spark.table("t"), Row(1))
// Check if this is compressed as ZLIB.
val maybeOrcFile = path.listFiles().find(_.getName.endsWith("part-00000"))
assert(maybeOrcFile.isDefined)
val orcFilePath = maybeOrcFile.get.toPath.toString
val expectedCompressionKind =
OrcFileOperator.getFileReader(orcFilePath).get.getCompression
assert("ZLIB" === expectedCompressionKind.name())
sql("CREATE TABLE t2 USING HIVE AS SELECT 1 AS c1, 'a' AS c2")
val table2 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t2"))
assert(DDLUtils.isHiveTable(table2))
assert(table2.storage.serde == Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
checkAnswer(spark.table("t2"), Row(1, "a"))
sql("CREATE TABLE t3(a int, p int) USING hive PARTITIONED BY (p)")
sql("INSERT INTO t3 PARTITION(p=1) SELECT 0")
checkAnswer(spark.table("t3"), Row(0, 1))
}
}
}
} }

View file

@ -93,8 +93,6 @@ class OrcHadoopFsRelationSuite extends HadoopFsRelationTest {
        .orc(path)

      // Check if this is compressed as ZLIB.
-     val conf = spark.sessionState.newHadoopConf()
-     val fs = FileSystem.getLocal(conf)
      val maybeOrcFile = new File(path).listFiles().find(_.getName.endsWith(".zlib.orc"))
      assert(maybeOrcFile.isDefined)
      val orcFilePath = maybeOrcFile.get.toPath.toString