From 1b416a0c77706ba352b72841d8b6ca3f459593fa Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Thu, 15 Aug 2019 17:21:13 +0800
Subject: [PATCH] [SPARK-27592][SQL] Set the bucketed data source table SerDe
 correctly

## What changes were proposed in this pull request?

Hive uses the wrong **InputFormat** (`org.apache.hadoop.mapred.SequenceFileInputFormat`) to read Spark's **Parquet** bucketed data source tables, because Spark persists them with a placeholder SerDe instead of the provider's Hive-compatible one.

Spark side:

```sql
spark-sql> CREATE TABLE t (c1 INT, c2 INT) USING parquet CLUSTERED BY (c1) SORTED BY (c1) INTO 2 BUCKETS;
2019-04-29 17:52:05 WARN HiveExternalCatalog:66 - Persisting bucketed data source table `default`.`t` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive.
spark-sql> DESC FORMATTED t;
c1	int	NULL
c2	int	NULL

# Detailed Table Information
Database	default
Table	t
Owner	yumwang
Created Time	Mon Apr 29 17:52:05 CST 2019
Last Access	Thu Jan 01 08:00:00 CST 1970
Created By	Spark 2.4.0
Type	MANAGED
Provider	parquet
Num Buckets	2
Bucket Columns	[`c1`]
Sort Columns	[`c1`]
Table Properties	[transient_lastDdlTime=1556531525]
Location	file:/user/hive/warehouse/t
Serde Library	org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat	org.apache.hadoop.mapred.SequenceFileInputFormat
OutputFormat	org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
Storage Properties	[serialization.format=1]
```

Hive side:

```sql
hive> DESC FORMATTED t;
OK
# col_name	data_type	comment
c1	int
c2	int

# Detailed Table Information
Database:	default
Owner:	root
CreateTime:	Wed May 08 03:38:46 GMT-07:00 2019
LastAccessTime:	UNKNOWN
Retention:	0
Location:	file:/user/hive/warehouse/t
Table Type:	MANAGED_TABLE
Table Parameters:
	bucketing_version	spark
	spark.sql.create.version	3.0.0-SNAPSHOT
	spark.sql.sources.provider	parquet
	spark.sql.sources.schema.bucketCol.0	c1
	spark.sql.sources.schema.numBucketCols	1
	spark.sql.sources.schema.numBuckets	2
	spark.sql.sources.schema.numParts	1
	spark.sql.sources.schema.numSortCols	1
	spark.sql.sources.schema.part.0	{\"type\":\"struct\",\"fields\":[{\"name\":\"c1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c2\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}
	spark.sql.sources.schema.sortCol.0	c1
	transient_lastDdlTime	1557311926

# Storage Information
SerDe Library:	org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
InputFormat:	org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
OutputFormat:	org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat
Compressed:	No
Num Buckets:	-1
Bucket Columns:	[]
Sort Columns:	[]
Storage Desc Params:
	path	file:/user/hive/warehouse/t
	serialization.format	1
```

So it is a non-bucketed table on the Hive side (`Num Buckets: -1`, `Bucket Columns: []`); the bucketing information survives only in the Spark-specific table properties. This PR sets the `SerDe` correctly so that Hive can read these tables.

Related code:
https://github.com/apache/spark/blob/33f3c48cac087e079b9c7e342c2e58b16eaaa681/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala#L976-L990
https://github.com/apache/spark/blob/f9776e389215255dc61efaa2eddd92a1fa754b48/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala#L444-L459
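For reference, the Hive-compatible `SerDe`/`InputFormat`/`OutputFormat` for a data source provider come from `HiveSerDe.sourceToSerDe`, which the new test below also uses. Here is a minimal sketch of that mapping for the `parquet` provider (illustrative only, not part of the patch); the expected values match the Hive-side `DESC FORMATTED` output above:

```scala
import org.apache.spark.sql.internal.HiveSerDe

// Look up the Hive-compatible storage classes for a data source provider.
// With this patch, bucketed tables also get these written to the metastore
// instead of the SequenceFile placeholder shown on the Spark side above.
val hiveSerDe = HiveSerDe.sourceToSerDe("parquet").get

assert(hiveSerDe.serde ==
  Some("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"))
assert(hiveSerDe.inputFormat ==
  Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"))
assert(hiveSerDe.outputFormat ==
  Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"))
```

The bucketing spec itself still lives only in the `spark.sql.sources.schema.*` table properties (Spark's bucketing hash function differs from Hive's), which is why Hive keeps reporting `Num Buckets: -1` and reads the table as non-bucketed.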
## How was this patch tested?

Unit tests.

Closes #24486 from wangyum/SPARK-27592.

Authored-by: Yuming Wang
Signed-off-by: Wenchen Fan
---
 .../spark/sql/hive/HiveExternalCatalog.scala  |  7 ++--
 .../sql/hive/HiveMetastoreCatalogSuite.scala  | 38 ++++++++++++++++++-
 2 files changed, 41 insertions(+), 4 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index d4df35c8ec..03874d005a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -363,11 +363,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       (None, message)
 
     // our bucketing is un-compatible with hive(different hash function)
-    case _ if table.bucketSpec.nonEmpty =>
+    case Some(serde) if table.bucketSpec.nonEmpty =>
       val message =
         s"Persisting bucketed data source table $qualifiedTableName into " +
-          "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. "
-      (None, message)
+          "Hive metastore in Spark SQL specific format, which is NOT compatible with " +
+          "Hive bucketed table. But Hive can read this table as a non-bucketed table."
+      (Some(newHiveCompatibleMetastoreTable(serde)), message)
 
     case Some(serde) =>
       val message =
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
index deb0a10857..007694543d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTableType
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
 import org.apache.spark.sql.hive.test.TestHiveSingleton
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
 import org.apache.spark.sql.test.{ExamplePointUDT, SQLTestUtils}
 import org.apache.spark.sql.types._
@@ -284,4 +284,40 @@ class DataSourceWithHiveMetastoreCatalogSuite
       }
     }
   }
+
+  test("SPARK-27592 set the bucketed data source table SerDe correctly") {
+    val provider = "parquet"
+    withTable("t") {
+      spark.sql(
+        s"""
+           |CREATE TABLE t
+           |USING $provider
+           |CLUSTERED BY (c1)
+           |SORTED BY (c1)
+           |INTO 2 BUCKETS
+           |AS SELECT 1 AS c1, 2 AS c2
+         """.stripMargin)
+
+      val metadata = sessionState.catalog.getTableMetadata(TableIdentifier("t", Some("default")))
+
+      val hiveSerDe = HiveSerDe.sourceToSerDe(provider).get
+      assert(metadata.storage.serde === hiveSerDe.serde)
+      assert(metadata.storage.inputFormat === hiveSerDe.inputFormat)
+      assert(metadata.storage.outputFormat === hiveSerDe.outputFormat)
+
+      // It's a bucketed table at Spark side
+      assert(sql("DESC FORMATTED t").collect().containsSlice(
+        Seq(Row("Num Buckets", "2", ""), Row("Bucket Columns", "[`c1`]", ""))
+      ))
+      checkAnswer(table("t"), Row(1, 2))
+
+      // It's not a bucketed table at Hive side
+      val hiveSide = sparkSession.metadataHive.runSqlHive("DESC FORMATTED t")
+      assert(hiveSide.contains("Num Buckets: \t-1 \t "))
+      assert(hiveSide.contains("Bucket Columns: \t[] \t "))
+      assert(hiveSide.contains("\tspark.sql.sources.schema.numBuckets\t2 "))
+      assert(hiveSide.contains("\tspark.sql.sources.schema.bucketCol.0\tc1 "))
+      assert(sparkSession.metadataHive.runSqlHive("SELECT * FROM t") === Seq("1\t2"))
+    }
+  }
 }