[SPARK-27592][SQL] Set the bucketed data source table SerDe correctly
## What changes were proposed in this pull request? Hive using incorrect **InputFormat**(`org.apache.hadoop.mapred.SequenceFileInputFormat`) to read Spark's **Parquet** bucketed data source table. Spark side: ```sql spark-sql> CREATE TABLE t (c1 INT, c2 INT) USING parquet CLUSTERED BY (c1) SORTED BY (c1) INTO 2 BUCKETS; 2019-04-29 17:52:05 WARN HiveExternalCatalog:66 - Persisting bucketed data source table `default`.`t` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. spark-sql> DESC FORMATTED t; c1 int NULL c2 int NULL # Detailed Table Information Database default Table t Owner yumwang Created Time Mon Apr 29 17:52:05 CST 2019 Last Access Thu Jan 01 08:00:00 CST 1970 Created By Spark 2.4.0 Type MANAGED Provider parquet Num Buckets 2 Bucket Columns [`c1`] Sort Columns [`c1`] Table Properties [transient_lastDdlTime=1556531525] Location file:/user/hive/warehouse/t Serde Library org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe InputFormat org.apache.hadoop.mapred.SequenceFileInputFormat OutputFormat org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat Storage Properties [serialization.format=1] ``` Hive side: ```sql hive> DESC FORMATTED t; OK # col_name data_type comment c1 int c2 int # Detailed Table Information Database: default Owner: root CreateTime: Wed May 08 03:38:46 GMT-07:00 2019 LastAccessTime: UNKNOWN Retention: 0 Location: file:/user/hive/warehouse/t Table Type: MANAGED_TABLE Table Parameters: bucketing_version spark spark.sql.create.version 3.0.0-SNAPSHOT spark.sql.sources.provider parquet spark.sql.sources.schema.bucketCol.0 c1 spark.sql.sources.schema.numBucketCols 1 spark.sql.sources.schema.numBuckets 2 spark.sql.sources.schema.numParts 1 spark.sql.sources.schema.numSortCols 1 spark.sql.sources.schema.part.0 {\"type\":\"struct\",\"fields\":[{\"name\":\"c1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c2\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]} 
spark.sql.sources.schema.sortCol.0 c1 transient_lastDdlTime 1557311926 # Storage Information SerDe Library: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe InputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat OutputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat Compressed: No Num Buckets: -1 Bucket Columns: [] Sort Columns: [] Storage Desc Params: path file:/user/hive/warehouse/t serialization.format 1 ``` So it's a non-bucketed table on the Hive side. This PR sets the `SerDe` correctly so that Hive can read these tables. Related code: 33f3c48cac/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala (L976-L990)
f9776e3892/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala (L444-L459)
## How was this patch tested? Unit tests. Closes #24486 from wangyum/SPARK-27592. Authored-by: Yuming Wang <yumwang@ebay.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This commit is contained in:
parent
a493031e2e
commit
1b416a0c77
|
@ -363,11 +363,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
|
||||||
(None, message)
|
(None, message)
|
||||||
|
|
||||||
// our bucketing is un-compatible with hive(different hash function)
|
// our bucketing is un-compatible with hive(different hash function)
|
||||||
case _ if table.bucketSpec.nonEmpty =>
|
case Some(serde) if table.bucketSpec.nonEmpty =>
|
||||||
val message =
|
val message =
|
||||||
s"Persisting bucketed data source table $qualifiedTableName into " +
|
s"Persisting bucketed data source table $qualifiedTableName into " +
|
||||||
"Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. "
|
"Hive metastore in Spark SQL specific format, which is NOT compatible with " +
|
||||||
(None, message)
|
"Hive bucketed table. But Hive can read this table as a non-bucketed table."
|
||||||
|
(Some(newHiveCompatibleMetastoreTable(serde)), message)
|
||||||
|
|
||||||
case Some(serde) =>
|
case Some(serde) =>
|
||||||
val message =
|
val message =
|
||||||
|
|
|
@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTableType
|
||||||
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
|
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
|
||||||
import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
|
import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
|
||||||
import org.apache.spark.sql.hive.test.TestHiveSingleton
|
import org.apache.spark.sql.hive.test.TestHiveSingleton
|
||||||
import org.apache.spark.sql.internal.SQLConf
|
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
|
||||||
import org.apache.spark.sql.test.{ExamplePointUDT, SQLTestUtils}
|
import org.apache.spark.sql.test.{ExamplePointUDT, SQLTestUtils}
|
||||||
import org.apache.spark.sql.types._
|
import org.apache.spark.sql.types._
|
||||||
|
|
||||||
|
@ -284,4 +284,40 @@ class DataSourceWithHiveMetastoreCatalogSuite
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
test("SPARK-27592 set the bucketed data source table SerDe correctly") {
|
||||||
|
val provider = "parquet"
|
||||||
|
withTable("t") {
|
||||||
|
spark.sql(
|
||||||
|
s"""
|
||||||
|
|CREATE TABLE t
|
||||||
|
|USING $provider
|
||||||
|
|CLUSTERED BY (c1)
|
||||||
|
|SORTED BY (c1)
|
||||||
|
|INTO 2 BUCKETS
|
||||||
|
|AS SELECT 1 AS c1, 2 AS c2
|
||||||
|
""".stripMargin)
|
||||||
|
|
||||||
|
val metadata = sessionState.catalog.getTableMetadata(TableIdentifier("t", Some("default")))
|
||||||
|
|
||||||
|
val hiveSerDe = HiveSerDe.sourceToSerDe(provider).get
|
||||||
|
assert(metadata.storage.serde === hiveSerDe.serde)
|
||||||
|
assert(metadata.storage.inputFormat === hiveSerDe.inputFormat)
|
||||||
|
assert(metadata.storage.outputFormat === hiveSerDe.outputFormat)
|
||||||
|
|
||||||
|
// It's a bucketed table at Spark side
|
||||||
|
assert(sql("DESC FORMATTED t").collect().containsSlice(
|
||||||
|
Seq(Row("Num Buckets", "2", ""), Row("Bucket Columns", "[`c1`]", ""))
|
||||||
|
))
|
||||||
|
checkAnswer(table("t"), Row(1, 2))
|
||||||
|
|
||||||
|
// It's not a bucketed table at Hive side
|
||||||
|
val hiveSide = sparkSession.metadataHive.runSqlHive("DESC FORMATTED t")
|
||||||
|
assert(hiveSide.contains("Num Buckets: \t-1 \t "))
|
||||||
|
assert(hiveSide.contains("Bucket Columns: \t[] \t "))
|
||||||
|
assert(hiveSide.contains("\tspark.sql.sources.schema.numBuckets\t2 "))
|
||||||
|
assert(hiveSide.contains("\tspark.sql.sources.schema.bucketCol.0\tc1 "))
|
||||||
|
assert(sparkSession.metadataHive.runSqlHive("SELECT * FROM t") === Seq("1\t2"))
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue