From b0319c2ecb51bb97c3228afa4a384572b9ffbce6 Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Tue, 10 Jan 2017 19:26:51 +0800
Subject: [PATCH] [SPARK-19107][SQL] support creating hive table with DataFrameWriter and Catalog

## What changes were proposed in this pull request?

After unifying the CREATE TABLE syntax in https://github.com/apache/spark/pull/16296,
it is now straightforward to support creating Hive tables with `DataFrameWriter` and
`Catalog`. This PR basically just removes the Hive provider check in
`DataFrameWriter.saveAsTable` and `Catalog.createExternalTable`, and adds tests.
A short usage sketch of the newly supported calls follows the patch.

## How was this patch tested?

New tests in `HiveDDLSuite`.

Author: Wenchen Fan

Closes #16487 from cloud-fan/hive-table.
---
 .../apache/spark/sql/DataFrameReader.scala    | 14 ++--
 .../apache/spark/sql/DataFrameWriter.scala    | 11 ++-
 .../spark/sql/internal/CatalogImpl.scala      |  4 -
 .../spark/sql/internal/CatalogSuite.scala     |  7 --
 .../sql/hive/MetastoreDataSourcesSuite.scala  | 20 -----
 .../sql/hive/execution/HiveDDLSuite.scala     | 77 +++++++++++++++++++
 6 files changed, 93 insertions(+), 40 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 365b50dee9..cd83836178 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -28,6 +28,7 @@ import org.apache.spark.annotation.InterfaceStability
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions}
 import org.apache.spark.sql.execution.LogicalRDD
+import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.datasources.jdbc._
 import org.apache.spark.sql.execution.datasources.json.InferSchema
@@ -143,6 +144,11 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    */
   @scala.annotation.varargs
   def load(paths: String*): DataFrame = {
+    if (source.toLowerCase == DDLUtils.HIVE_PROVIDER) {
+      throw new AnalysisException("Hive data source can only be used with tables, you can not " +
+        "read files of Hive data source directly.")
+    }
+
     sparkSession.baseRelationToDataFrame(
       DataSource.apply(
         sparkSession,
@@ -160,7 +166,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    */
   def jdbc(url: String, table: String, properties: Properties): DataFrame = {
     // properties should override settings in extraOptions.
-    this.extraOptions = this.extraOptions ++ properties.asScala
+    this.extraOptions ++= properties.asScala
     // explicit url and dbtable should override all
     this.extraOptions += (JDBCOptions.JDBC_URL -> url, JDBCOptions.JDBC_TABLE_NAME -> table)
     format("jdbc").load()
@@ -469,9 +475,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * @since 1.4.0
    */
   def table(tableName: String): DataFrame = {
-    Dataset.ofRows(sparkSession,
-      sparkSession.sessionState.catalog.lookupRelation(
-        sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)))
+    sparkSession.table(tableName)
   }
 
   /**
@@ -550,6 +554,6 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
 
   private var userSpecifiedSchema: Option[StructType] = None
 
-  private var extraOptions = new scala.collection.mutable.HashMap[String, String]
+  private val extraOptions = new scala.collection.mutable.HashMap[String, String]
 
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 3127ebf679..82331fdb9b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -205,6 +205,11 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    * @since 1.4.0
    */
   def save(): Unit = {
+    if (source.toLowerCase == DDLUtils.HIVE_PROVIDER) {
+      throw new AnalysisException("Hive data source can only be used with tables, you can not " +
+        "write files of Hive data source directly.")
+    }
+
     assertNotBucketed("save")
     val dataSource = DataSource(
       df.sparkSession,
@@ -361,10 +366,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   }
 
   private def saveAsTable(tableIdent: TableIdentifier): Unit = {
-    if (source.toLowerCase == DDLUtils.HIVE_PROVIDER) {
-      throw new AnalysisException("Cannot create hive serde table with saveAsTable API")
-    }
-
     val catalog = df.sparkSession.sessionState.catalog
     val tableExists = catalog.tableExists(tableIdent)
     val db = tableIdent.database.getOrElse(catalog.getCurrentDatabase)
@@ -385,6 +386,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         }
         EliminateSubqueryAliases(catalog.lookupRelation(tableIdentWithDB)) match {
           // Only do the check if the table is a data source table (the relation is a BaseRelation).
+          // TODO(cloud-fan): also check hive table relation here when we support overwrite mode
+          // for creating hive tables.
           case LogicalRelation(dest: BaseRelation, _, _) if srcRelations.contains(dest) =>
             throw new AnalysisException(
               s"Cannot overwrite table $tableName that is also being read from")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 41ed9d7180..8244b2152c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -347,10 +347,6 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
       source: String,
       schema: StructType,
       options: Map[String, String]): DataFrame = {
-    if (source.toLowerCase == "hive") {
-      throw new AnalysisException("Cannot create hive serde table with createExternalTable API.")
-    }
-
     val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
     val tableDesc = CatalogTable(
       identifier = tableIdent,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
index 89ec162c8e..5dd04543ed 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
@@ -322,13 +322,6 @@ class CatalogSuite
     assert(e2.message == "Cannot create a file-based external data source table without path")
   }
 
-  test("createExternalTable should fail if provider is hive") {
-    val e = intercept[AnalysisException] {
-      spark.catalog.createExternalTable("tbl", "HiVe", Map.empty[String, String])
-    }
-    assert(e.message.contains("Cannot create hive serde table with createExternalTable API"))
-  }
-
   test("dropTempView should not un-cache and drop metastore table if a same-name table exists") {
     withTable("same_name") {
       spark.range(10).write.saveAsTable("same_name")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index aed825e2f3..13ef79e3b7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -1169,26 +1169,6 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
     }
   }
 
-  test("save API - format hive") {
-    withTempDir { dir =>
-      val path = dir.getCanonicalPath
-      val e = intercept[ClassNotFoundException] {
-        spark.range(10).write.format("hive").mode(SaveMode.Ignore).save(path)
-      }.getMessage
-      assert(e.contains("Failed to find data source: hive"))
-    }
-  }
-
-  test("saveAsTable API - format hive") {
-    val tableName = "tab1"
-    withTable(tableName) {
-      val e = intercept[AnalysisException] {
-        spark.range(10).write.format("hive").mode(SaveMode.Overwrite).saveAsTable(tableName)
-      }.getMessage
-      assert(e.contains("Cannot create hive serde table with saveAsTable API"))
-    }
-  }
-
   test("create a temp view using hive") {
     val tableName = "tab1"
     withTable (tableName) {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 3ac07d0933..77285282a6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
 import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.types.StructType
 
 class HiveDDLSuite
   extends QueryTest with SQLTestUtils with TestHiveSingleton with BeforeAndAfterEach {
@@ -1289,4 +1290,80 @@ class HiveDDLSuite
       }
     }
   }
+
+  test("create hive serde table with Catalog") {
+    withTable("t") {
+      withTempDir { dir =>
+        val df = spark.catalog.createExternalTable(
+          "t",
+          "hive",
+          new StructType().add("i", "int"),
+          Map("path" -> dir.getCanonicalPath, "fileFormat" -> "parquet"))
+        assert(df.collect().isEmpty)
+
+        val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+        assert(DDLUtils.isHiveTable(table))
+        assert(table.storage.inputFormat ==
+          Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"))
+        assert(table.storage.outputFormat ==
+          Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"))
+        assert(table.storage.serde ==
+          Some("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"))
+
+        sql("INSERT INTO t SELECT 1")
+        checkAnswer(spark.table("t"), Row(1))
+      }
+    }
+  }
+
+  test("create hive serde table with DataFrameWriter.saveAsTable") {
+    withTable("t", "t2") {
+      Seq(1 -> "a").toDF("i", "j")
+        .write.format("hive").option("fileFormat", "avro").saveAsTable("t")
+      checkAnswer(spark.table("t"), Row(1, "a"))
+
+      val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+      assert(DDLUtils.isHiveTable(table))
+      assert(table.storage.inputFormat ==
+        Some("org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat"))
+      assert(table.storage.outputFormat ==
+        Some("org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat"))
+      assert(table.storage.serde ==
+        Some("org.apache.hadoop.hive.serde2.avro.AvroSerDe"))
+
+      sql("INSERT INTO t SELECT 2, 'b'")
+      checkAnswer(spark.table("t"), Row(1, "a") :: Row(2, "b") :: Nil)
+
+      val e = intercept[AnalysisException] {
+        Seq(1 -> "a").toDF("i", "j").write.format("hive").partitionBy("i").saveAsTable("t2")
+      }
+      assert(e.message.contains("A Create Table As Select (CTAS) statement is not allowed " +
+        "to create a partitioned table using Hive"))
+
+      val e2 = intercept[AnalysisException] {
+        Seq(1 -> "a").toDF("i", "j").write.format("hive").bucketBy(4, "i").saveAsTable("t2")
+      }
+      assert(e2.message.contains("Creating bucketed Hive serde table is not supported yet"))
+
+      val e3 = intercept[AnalysisException] {
+        spark.table("t").write.format("hive").mode("overwrite").saveAsTable("t")
+      }
+      assert(e3.message.contains(
+        "CTAS for hive serde tables does not support append or overwrite semantics"))
+    }
+  }
+
+  test("read/write files with hive data source is not allowed") {
+    withTempDir { dir =>
+      val e = intercept[AnalysisException] {
+        spark.read.format("hive").load(dir.getAbsolutePath)
+      }
+      assert(e.message.contains("Hive data source can only be used with tables"))
+
+      val e2 = intercept[AnalysisException] {
+        Seq(1 -> "a").toDF("i", "j").write.format("hive").save(dir.getAbsolutePath)
+      }
+      assert(e2.message.contains("Hive data source can only be used with tables"))
+    }
+  }
 }
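Usage sketch (not part of the patch): the snippet below is adapted from the new `HiveDDLSuite` tests and shows the two code paths this change enables. The session setup, table names, and the `/tmp/hive_ext` path are illustrative assumptions, not taken from the patch.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType

// Assumes a Hive-enabled session; table names and the path below are illustrative.
val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
import spark.implicits._

// 1. DataFrameWriter.saveAsTable with the "hive" provider (previously rejected).
Seq(1 -> "a").toDF("i", "j")
  .write.format("hive").option("fileFormat", "avro").saveAsTable("hive_t")

// 2. Catalog.createExternalTable with the "hive" provider (previously rejected).
spark.catalog.createExternalTable(
  "hive_ext",
  "hive",
  new StructType().add("i", "int"),
  Map("path" -> "/tmp/hive_ext", "fileFormat" -> "parquet"))

// Reading or writing files directly with the hive source remains disallowed:
// spark.read.format("hive").load(path) and df.write.format("hive").save(path)
// now fail with "Hive data source can only be used with tables".
```

The `fileFormat` option is resolved to the corresponding Hive input/output formats and serde, as the new tests assert for the parquet and avro cases.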