From e7fe92f12991ce4ccc101c2cc01354201c9c5384 Mon Sep 17 00:00:00 2001
From: "xuewei.linxuewei"
Date: Mon, 14 Dec 2020 08:27:18 +0000
Subject: [PATCH] [SPARK-33546][SQL] Enable row format file format validation
 in CREATE TABLE LIKE

### What changes were proposed in this pull request?

[SPARK-33546] stated that there are three inconsistent behaviors for CREATE TABLE LIKE:

1. CREATE TABLE LIKE does not validate the user-specified Hive serde. For example, STORED AS PARQUET can't be used with ROW FORMAT SERDE.
2. CREATE TABLE LIKE requires STORED AS and ROW FORMAT SERDE to be specified together, which is not necessary.
3. CREATE TABLE LIKE does not respect the default Hive serde.

This PR fixes No.1 (see the sketch at the end of this description). After investigation, No.2 and No.3 turn out not to be issues.

For No.2: within Hive, CREATE TABLE abc ... ROW FORMAT SERDE 'xxx.xxx.SerdeClass' (without STORED AS) uses the user-specified serde class and takes the default input/output formats from the default textfile format:

```
SerDe Library: xxx.xxx.SerdeClass
InputFormat:   org.apache.hadoop.mapred.TextInputFormat
OutputFormat:  org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
```

But CREATE TABLE dst LIKE src ROW FORMAT SERDE 'xxx.xxx.SerdeClass' (without STORED AS) would simply ignore the user-specified serde class and take (inputFormat, outputFormat, serde) from the src table. It's better to throw an exception on such ambiguous behavior, so No.2 is not an issue; this PR only adds some comments to explain the restriction.

For No.3: CreateTableLikeCommand already uses the following logic to fall back to the src table's storage format when fileFormat.inputFormat is empty:

```
val newStorage = if (fileFormat.inputFormat.isDefined) {
  fileFormat
} else {
  sourceTableDesc.storage.copy(locationUri = fileFormat.locationUri)
}
```

If we instead filled the new target table from HiveSerDe.getDefaultStorage when neither the file format nor the row format is explicitly specified, it would break the CREATE TABLE LIKE semantics.

### Why are the changes needed?

Bug fix.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Added UT and existing UTs.
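For illustration, a minimal sketch of the new validation, mirroring the test added in HiveDDLSuite below; `src` and `dst` are placeholder table names, and `src` is assumed to already exist:

```
// Accepted: LazySimpleSerDe is compatible with TEXTFILE (also SEQUENCEFILE/RCFILE).
spark.sql(
  """
    |CREATE TABLE dst LIKE src
    |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
    |STORED AS TEXTFILE
  """.stripMargin)

// Rejected after this change: PARQUET carries its own serde, so parsing now fails
// with "ROW FORMAT SERDE is incompatible with format 'parquet'".
spark.sql(
  """
    |CREATE TABLE dst LIKE src
    |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
    |STORED AS PARQUET
  """.stripMargin)
```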
Closes #30705 from leanken/leanken-SPARK-33546.

Authored-by: xuewei.linxuewei
Signed-off-by: Wenchen Fan
---
 .../sql/catalyst/parser/AstBuilder.scala      |   5 +-
 .../spark/sql/execution/SparkSqlParser.scala  |   9 +-
 .../sql/hive/execution/HiveDDLSuite.scala     | 130 ++++++++++++++----
 3 files changed, 108 insertions(+), 36 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index a7bb217976..660d617a07 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -2956,9 +2956,8 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
   protected def getSerdeInfo(
       rowFormatCtx: Seq[RowFormatContext],
       createFileFormatCtx: Seq[CreateFileFormatContext],
-      ctx: ParserRuleContext,
-      skipCheck: Boolean = false): Option[SerdeInfo] = {
-    if (!skipCheck) validateRowFormatFileFormat(rowFormatCtx, createFileFormatCtx, ctx)
+      ctx: ParserRuleContext): Option[SerdeInfo] = {
+    validateRowFormatFileFormat(rowFormatCtx, createFileFormatCtx, ctx)
     val rowFormatSerdeInfo = rowFormatCtx.map(visitRowFormat)
     val fileFormatSerdeInfo = createFileFormatCtx.map(visitCreateFileFormat)
     (fileFormatSerdeInfo ++ rowFormatSerdeInfo).reduceLeftOption((l, r) => l.merge(r))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index ba5874c21f..3ca3461dfb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -447,14 +447,16 @@ class SparkSqlAstBuilder extends AstBuilder {
     checkDuplicateClauses(ctx.TBLPROPERTIES, "TBLPROPERTIES", ctx)
     val provider = ctx.tableProvider.asScala.headOption.map(_.multipartIdentifier.getText)
     val location = visitLocationSpecList(ctx.locationSpec())
-    // TODO: Do not skip serde check for CREATE TABLE LIKE.
     val serdeInfo = getSerdeInfo(
-      ctx.rowFormat.asScala.toSeq, ctx.createFileFormat.asScala.toSeq, ctx, skipCheck = true)
+      ctx.rowFormat.asScala.toSeq, ctx.createFileFormat.asScala.toSeq, ctx)
     if (provider.isDefined && serdeInfo.isDefined) {
       operationNotAllowed(s"CREATE TABLE LIKE ... USING ... ${serdeInfo.get.describe}", ctx)
     }
 
-    // TODO: remove this restriction as it seems unnecessary.
+    // For "CREATE TABLE dst LIKE src ROW FORMAT SERDE xxx" which doesn't specify the file format,
+    // it's a bit weird to use the default file format, but it's also weird to get file format
+    // from the source table while the serde class is user-specified.
+    // Here we require both serde and format to be specified, to avoid confusion.
     serdeInfo match {
       case Some(SerdeInfo(storedAs, formatClasses, serde, _)) =>
         if (storedAs.isEmpty && formatClasses.isEmpty && serde.isDefined) {
@@ -463,7 +465,6 @@
       case _ =>
     }
 
-    // TODO: also look at `HiveSerDe.getDefaultStorage`.
     val storage = toStorageFormat(location, serdeInfo, ctx)
     val properties = Option(ctx.tableProps).map(visitPropertyKeyValues).getOrElse(Map.empty)
     CreateTableLikeCommand(
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index a6c40851b1..b686d040b9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hive.execution
 
 import java.io.File
 import java.net.URI
+import java.util.Locale
 
 import org.apache.hadoop.fs.Path
 import org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER
@@ -2771,7 +2772,7 @@ class HiveDDLSuite
 
   test("Create Table LIKE with row format") {
     val catalog = spark.sessionState.catalog
-    withTable("sourceHiveTable", "sourceDsTable", "targetHiveTable1", "targetHiveTable2") {
+    withTable("sourceHiveTable", "sourceDsTable") {
       sql("CREATE TABLE sourceHiveTable(a INT, b INT) STORED AS PARQUET")
       sql("CREATE TABLE sourceDsTable(a INT, b INT) USING PARQUET")
 
@@ -2817,34 +2818,6 @@
         """.stripMargin)
       }.getMessage
       assert(e.contains("Operation not allowed: CREATE TABLE LIKE ... USING ... STORED AS"))
-
-      // row format works with STORED AS hive format (from hive table)
-      spark.sql(
-        """
-          |CREATE TABLE targetHiveTable1 LIKE sourceHiveTable STORED AS PARQUET
-          |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
-          |WITH SERDEPROPERTIES ('test' = 'test')
-        """.stripMargin)
-      var table = catalog.getTableMetadata(TableIdentifier("targetHiveTable1"))
-      assert(table.provider === Some("hive"))
-      assert(table.storage.inputFormat ===
-        Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"))
-      assert(table.storage.serde === Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
-      assert(table.storage.properties("test") == "test")
-
-      // row format works with STORED AS hive format (from datasource table)
-      spark.sql(
-        """
-          |CREATE TABLE targetHiveTable2 LIKE sourceDsTable STORED AS PARQUET
-          |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
-          |WITH SERDEPROPERTIES ('test' = 'test')
-        """.stripMargin)
-      table = catalog.getTableMetadata(TableIdentifier("targetHiveTable2"))
-      assert(table.provider === Some("hive"))
-      assert(table.storage.inputFormat ===
-        Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"))
-      assert(table.storage.serde === Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
-      assert(table.storage.properties("test") == "test")
     }
   }
 
@@ -2872,4 +2845,103 @@
       assert(sql("SELECT * FROM t2 WHERE c = 'A'").collect().isEmpty)
     }
   }
+
+  test("SPARK-33546: CREATE TABLE LIKE should validate row format & file format") {
+    val catalog = spark.sessionState.catalog
+    withTable("sourceHiveTable", "sourceDsTable") {
+      sql("CREATE TABLE sourceHiveTable(a INT, b INT) STORED AS PARQUET")
+      sql("CREATE TABLE sourceDsTable(a INT, b INT) USING PARQUET")
+
+      // ROW FORMAT SERDE ... STORED AS [SEQUENCEFILE | RCFILE | TEXTFILE]
+      val allowSerdeFileFormats = Seq("TEXTFILE", "SEQUENCEFILE", "RCFILE")
+      Seq("sourceHiveTable", "sourceDsTable").foreach { sourceTable =>
+        allowSerdeFileFormats.foreach { format =>
+          withTable("targetTable") {
+            spark.sql(
+              s"""
+                 |CREATE TABLE targetTable LIKE $sourceTable
+                 |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
+                 |STORED AS $format
+               """.stripMargin)
+
+            val expectedSerde = HiveSerDe.sourceToSerDe(format)
+            val table = catalog.getTableMetadata(TableIdentifier("targetTable", Some("default")))
+            assert(table.provider === Some("hive"))
+            assert(table.storage.inputFormat === Some(expectedSerde.get.inputFormat.get))
+            assert(table.storage.outputFormat === Some(expectedSerde.get.outputFormat.get))
+            assert(table.storage.serde ===
+              Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
+          }
+        }
+
+        // negative case
+        hiveFormats.filterNot(allowSerdeFileFormats.contains(_)).foreach { format =>
+          withTable("targetTable") {
+            val ex = intercept[AnalysisException] {
+              spark.sql(
+                s"""
+                   |CREATE TABLE targetTable LIKE $sourceTable
+                   |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
+                   |STORED AS $format
+                 """.stripMargin)
+            }.getMessage
+            assert(ex.contains(
+              s"ROW FORMAT SERDE is incompatible with format '${format.toLowerCase(Locale.ROOT)}'"))
+          }
+        }
+      }
+
+      // ROW FORMAT DELIMITED ... STORED AS TEXTFILE
+      Seq("sourceHiveTable", "sourceDsTable").foreach { sourceTable =>
+        withTable("targetTable") {
+          spark.sql(
+            s"""
+               |CREATE TABLE targetTable LIKE $sourceTable
+               |ROW FORMAT DELIMITED
+               |STORED AS TEXTFILE
+             """.stripMargin)
+
+          val expectedSerde = HiveSerDe.sourceToSerDe("TEXTFILE")
+          val table = catalog.getTableMetadata(TableIdentifier("targetTable", Some("default")))
+          assert(table.provider === Some("hive"))
+          assert(table.storage.inputFormat === Some(expectedSerde.get.inputFormat.get))
+          assert(table.storage.outputFormat === Some(expectedSerde.get.outputFormat.get))
+          assert(table.storage.serde === Some(expectedSerde.get.serde.get))
+
+          // negative case
+          val ex = intercept[AnalysisException] {
+            spark.sql(
+              s"""
+                 |CREATE TABLE targetTable LIKE $sourceTable
+                 |ROW FORMAT DELIMITED
+                 |STORED AS PARQUET
+               """.stripMargin)
+          }.getMessage
+          assert(ex.contains("ROW FORMAT DELIMITED is only compatible with 'textfile'"))
+        }
+      }
+
+      // ROW FORMAT ... STORED AS INPUTFORMAT ... OUTPUTFORMAT ...
+      hiveFormats.foreach { tableType =>
+        val expectedSerde = HiveSerDe.sourceToSerDe(tableType)
+        Seq("sourceHiveTable", "sourceDsTable").foreach { sourceTable =>
+          withTable("targetTable") {
+            spark.sql(
+              s"""
+                 |CREATE TABLE targetTable LIKE $sourceTable
+                 |ROW FORMAT SERDE '${expectedSerde.get.serde.get}'
+                 |STORED AS INPUTFORMAT '${expectedSerde.get.inputFormat.get}'
+                 |OUTPUTFORMAT '${expectedSerde.get.outputFormat.get}'
+               """.stripMargin)
+
+            val table = catalog.getTableMetadata(TableIdentifier("targetTable", Some("default")))
+            assert(table.provider === Some("hive"))
+            assert(table.storage.inputFormat === Some(expectedSerde.get.inputFormat.get))
+            assert(table.storage.outputFormat === Some(expectedSerde.get.outputFormat.get))
+            assert(table.storage.serde === Some(expectedSerde.get.serde.get))
+          }
+        }
+      }
+    }
+  }
 }