[SPARK-19151][SQL] DataFrameWriter.saveAsTable support hive overwrite

## What changes were proposed in this pull request?

After [SPARK-19107](https://issues.apache.org/jira/browse/SPARK-19107), we can now treat Hive as a data source and create Hive tables with `DataFrameWriter` and `Catalog`. However, the support is not complete; there are still some cases we do not handle.

This PR implements:
`DataFrameWriter.saveAsTable` working with the Hive format in `Overwrite` mode.
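
A minimal usage sketch of what this enables (the `SparkSession` value `spark` and the table name `t` are assumed for illustration only):

```scala
import org.apache.spark.sql.SaveMode

// Needed for the toDF implicit conversion; assumes `spark` is a SparkSession.
import spark.implicits._

// Create a Hive serde table from a DataFrame.
Seq(1 -> "a").toDF("i", "j").write.format("hive").saveAsTable("t")

// Before this change, Overwrite mode on an existing Hive serde table threw an
// AnalysisException from the CTAS planning rule; now it replaces the table.
Seq(2 -> "b").toDF("i", "j")
  .write.format("hive").mode(SaveMode.Overwrite).saveAsTable("t")
```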

## How was this patch tested?
A unit test was added in `HiveDDLSuite`.

Author: windpiger <songjun@outlook.com>

Closes #16549 from windpiger/saveAsTableWithHiveOverwrite.
Committed by gatorsmile on 2017-01-14 10:53:33 -08:00 (authored by windpiger)
Parent: b6a7aa4f77
Commit: 8942353905
3 changed files with 34 additions and 14 deletions

DataFrameWriter.scala

@@ -24,7 +24,7 @@ import scala.collection.JavaConverters._
 import org.apache.spark.annotation.InterfaceStability
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedRelation}
-import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogRelation, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation}
@@ -380,17 +380,22 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         throw new AnalysisException(s"Table $tableIdent already exists.")

       case (true, SaveMode.Overwrite) =>
-        // Get all input data source relations of the query.
+        // Get all input data source or hive relations of the query.
         val srcRelations = df.logicalPlan.collect {
           case LogicalRelation(src: BaseRelation, _, _) => src
+          case relation: CatalogRelation if DDLUtils.isHiveTable(relation.catalogTable) =>
+            relation.catalogTable.identifier
         }

         EliminateSubqueryAliases(catalog.lookupRelation(tableIdentWithDB)) match {
-          // Only do the check if the table is a data source table (the relation is a BaseRelation).
-          // TODO(cloud-fan): also check hive table relation here when we support overwrite mode
-          // for creating hive tables.
+          // check if the table is a data source table (the relation is a BaseRelation).
           case LogicalRelation(dest: BaseRelation, _, _) if srcRelations.contains(dest) =>
             throw new AnalysisException(
               s"Cannot overwrite table $tableName that is also being read from")
+          // check hive table relation when overwrite mode
+          case relation: CatalogRelation if DDLUtils.isHiveTable(relation.catalogTable)
+              && srcRelations.contains(relation.catalogTable.identifier) =>
+            throw new AnalysisException(
+              s"Cannot overwrite table $tableName that is also being read from")
           case _ => // OK
         }
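
The check added here is exercised by the new assertion in `HiveDDLSuite` below; as a sketch (session and table names assumed), the rejected pattern looks like this:

```scala
// Assumes a SparkSession `spark` and an existing Hive serde table "t".
// Overwriting a table with a query that also reads from it is now rejected
// for Hive relations too, e.g.:
//   AnalysisException: Cannot overwrite table default.t that is also being read from
spark.table("t").write.format("hive").mode("overwrite").saveAsTable("t")
```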

HiveStrategies.scala

@@ -109,12 +109,11 @@ private[hive] trait HiveStrategies {
           table, partition, planLater(child), overwrite, ifNotExists) :: Nil

       case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isHiveTable(tableDesc) =>
-        // Currently we will never hit this branch, as SQL string API can only use `Ignore` or
-        // `ErrorIfExists` mode, and `DataFrameWriter.saveAsTable` doesn't support hive serde
-        // tables yet.
-        if (mode == SaveMode.Append || mode == SaveMode.Overwrite) {
+        // Currently `DataFrameWriter.saveAsTable` doesn't support
+        // the Append mode of hive serde tables yet.
+        if (mode == SaveMode.Append) {
           throw new AnalysisException(
-            "CTAS for hive serde tables does not support append or overwrite semantics.")
+            "CTAS for hive serde tables does not support append semantics.")
         }

         val dbName = tableDesc.identifier.database.getOrElse(sparkSession.catalog.currentDatabase)
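
As a hedged sketch of the resulting behavior (session and table names assumed): Overwrite now flows through this CTAS branch, while Append is still rejected with the narrower message above.

```scala
// Assumes a SparkSession `spark`; `import spark.implicits._` is in scope,
// and a Hive serde table "t" already exists.

// Now supported: overwrite an existing Hive serde table via saveAsTable.
Seq(3 -> "c").toDF("i", "j")
  .write.format("hive").mode("overwrite").saveAsTable("t")

// Still unsupported: Append mode, expected to fail with
// "CTAS for hive serde tables does not support append semantics."
Seq(4 -> "d").toDF("i", "j")
  .write.format("hive").mode("append").saveAsTable("t")
```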

HiveDDLSuite.scala

@@ -1314,7 +1314,24 @@ class HiveDDLSuite
         .write.format("hive").option("fileFormat", "avro").saveAsTable("t")
       checkAnswer(spark.table("t"), Row(1, "a"))

-      val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+      Seq("c" -> 1).toDF("i", "j").write.format("hive")
+        .mode(SaveMode.Overwrite).option("fileFormat", "parquet").saveAsTable("t")
+      checkAnswer(spark.table("t"), Row("c", 1))
+
+      var table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+      assert(DDLUtils.isHiveTable(table))
+      assert(table.storage.inputFormat ==
+        Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"))
+      assert(table.storage.outputFormat ==
+        Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"))
+      assert(table.storage.serde ==
+        Some("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"))
+
+      Seq(9 -> "x").toDF("i", "j")
+        .write.format("hive").mode(SaveMode.Overwrite).option("fileFormat", "avro").saveAsTable("t")
+      checkAnswer(spark.table("t"), Row(9, "x"))
+
+      table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
       assert(DDLUtils.isHiveTable(table))
       assert(table.storage.inputFormat ==
         Some("org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat"))
@@ -1324,7 +1341,7 @@ class HiveDDLSuite
         Some("org.apache.hadoop.hive.serde2.avro.AvroSerDe"))

       sql("INSERT INTO t SELECT 2, 'b'")
-      checkAnswer(spark.table("t"), Row(1, "a") :: Row(2, "b") :: Nil)
+      checkAnswer(spark.table("t"), Row(9, "x") :: Row(2, "b") :: Nil)

       val e = intercept[AnalysisException] {
         Seq(1 -> "a").toDF("i", "j").write.format("hive").partitionBy("i").saveAsTable("t2")
@@ -1340,8 +1357,7 @@ class HiveDDLSuite
       val e3 = intercept[AnalysisException] {
         spark.table("t").write.format("hive").mode("overwrite").saveAsTable("t")
       }
-      assert(e3.message.contains(
-        "CTAS for hive serde tables does not support append or overwrite semantics"))
+      assert(e3.message.contains("Cannot overwrite table default.t that is also being read from"))
     }
   }