[SPARK-33546][SQL] Enable row format file format validation in CREATE TABLE LIKE

### What changes were proposed in this pull request?

[SPARK-33546] states that there are three inconsistent behaviors in CREATE TABLE LIKE:

1. CREATE TABLE LIKE does not validate the user-specified Hive serde; e.g., STORED AS PARQUET can't be used with ROW FORMAT SERDE, but this is not checked.
2. CREATE TABLE LIKE requires STORED AS and ROW FORMAT SERDE to be specified together, which is not necessary.
3. CREATE TABLE LIKE does not respect the default Hive serde.

This PR fixes No.1; after investigation, No.2 and No.3 turn out not to be issues.
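
For No.1, here is a minimal ScalaTest-style sketch of the newly enforced check, mirroring the `HiveDDLSuite` cases added below (it assumes the suite's `withTable`/`sql`/`intercept` helpers; table names are illustrative):

```
// After this PR, CREATE TABLE LIKE validates the user-specified serde against
// the file format: PARQUET is not serde-compatible, so this now fails.
withTable("src", "dst") {
  sql("CREATE TABLE src(a INT, b INT) STORED AS PARQUET")
  val e = intercept[AnalysisException] {
    sql(
      """
        |CREATE TABLE dst LIKE src
        |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
        |STORED AS PARQUET
      """.stripMargin)
  }.getMessage
  assert(e.contains("ROW FORMAT SERDE is incompatible with format 'parquet'"))
}
```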

Within Hive, `CREATE TABLE abc ... ROW FORMAT SERDE 'xxx.xxx.SerdeClass'` (without STORED AS) produces the following result: the user-specified SerdeClass is used, and the default input/output formats are fetched from the default TEXTFILE format.

```
SerDe Library:          xxx.xxx.SerdeClass
InputFormat:            org.apache.hadoop.mapred.TextInputFormat
OutputFormat:           org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
```

But `CREATE TABLE dst LIKE src ROW FORMAT SERDE 'xxx.xxx.SerdeClass'` (without STORED AS) just ignores the user-specified SerdeClass and takes (inputFormat, outputFormat, serdeClass) from the src table.

It's better to just throw an exception on such ambiguous behavior, so No.2 is not an issue; this PR adds some comments in the code to explain the restriction.
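
A sketch of that ambiguous case under the same assumptions (the exact error type and message are not shown in this diff, so treat them as illustrative):

```
// Specifying only ROW FORMAT SERDE, with neither STORED AS nor a file format,
// is rejected up front (a pre-existing restriction this PR keeps and documents)
// instead of being resolved silently.
withTable("src", "dst") {
  sql("CREATE TABLE src(a INT, b INT) STORED AS PARQUET")
  intercept[AnalysisException] {
    sql(
      """
        |CREATE TABLE dst LIKE src
        |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
      """.stripMargin)
  }
}
```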

For No.3, CreateTableLikeCommand in fact already uses the following logic to fall back to the src table's storageFormat when the current fileFormat.inputFormat is empty:

```
val newStorage = if (fileFormat.inputFormat.isDefined) {
  fileFormat
} else {
  sourceTableDesc.storage.copy(locationUri = fileFormat.locationUri)
}
```

If we were to fill the new target table with HiveSerDe.getDefaultStorage when neither the file format nor the row format is explicitly specified, it would break the CREATE TABLE LIKE semantics of inheriting the source table's storage.
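
For example (a sketch assuming the same test helpers; the ORC class names follow the standard Hive serde mappings):

```
// With no row format or file format given, CREATE TABLE LIKE inherits the
// source table's storage format rather than the default (textfile) serde.
withTable("src", "dst") {
  sql("CREATE TABLE src(a INT, b INT) STORED AS ORC")
  sql("CREATE TABLE dst LIKE src")
  val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("dst"))
  assert(table.storage.inputFormat === Some("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"))
  assert(table.storage.serde === Some("org.apache.hadoop.hive.ql.io.orc.OrcSerde"))
}
```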

### Why are the changes needed?

Bug Fix.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Added new UTs; existing UTs still pass.

Closes #30705 from leanken/leanken-SPARK-33546.

Authored-by: xuewei.linxuewei <xuewei.linxuewei@alibaba-inc.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>

`AstBuilder`:

```
@@ -2956,9 +2956,8 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
   protected def getSerdeInfo(
       rowFormatCtx: Seq[RowFormatContext],
       createFileFormatCtx: Seq[CreateFileFormatContext],
-      ctx: ParserRuleContext,
-      skipCheck: Boolean = false): Option[SerdeInfo] = {
-    if (!skipCheck) validateRowFormatFileFormat(rowFormatCtx, createFileFormatCtx, ctx)
+      ctx: ParserRuleContext): Option[SerdeInfo] = {
+    validateRowFormatFileFormat(rowFormatCtx, createFileFormatCtx, ctx)
     val rowFormatSerdeInfo = rowFormatCtx.map(visitRowFormat)
     val fileFormatSerdeInfo = createFileFormatCtx.map(visitCreateFileFormat)
     (fileFormatSerdeInfo ++ rowFormatSerdeInfo).reduceLeftOption((l, r) => l.merge(r))
```

`SparkSqlAstBuilder`:

```
@@ -447,14 +447,16 @@ class SparkSqlAstBuilder extends AstBuilder {
     checkDuplicateClauses(ctx.TBLPROPERTIES, "TBLPROPERTIES", ctx)
     val provider = ctx.tableProvider.asScala.headOption.map(_.multipartIdentifier.getText)
     val location = visitLocationSpecList(ctx.locationSpec())
-    // TODO: Do not skip serde check for CREATE TABLE LIKE.
     val serdeInfo = getSerdeInfo(
-      ctx.rowFormat.asScala.toSeq, ctx.createFileFormat.asScala.toSeq, ctx, skipCheck = true)
+      ctx.rowFormat.asScala.toSeq, ctx.createFileFormat.asScala.toSeq, ctx)
     if (provider.isDefined && serdeInfo.isDefined) {
       operationNotAllowed(s"CREATE TABLE LIKE ... USING ... ${serdeInfo.get.describe}", ctx)
     }
-    // TODO: remove this restriction as it seems unnecessary.
+    // For "CREATE TABLE dst LIKE src ROW FORMAT SERDE xxx" which doesn't specify the file format,
+    // it's a bit weird to use the default file format, but it's also weird to get file format
+    // from the source table while the serde class is user-specified.
+    // Here we require both serde and format to be specified, to avoid confusion.
     serdeInfo match {
       case Some(SerdeInfo(storedAs, formatClasses, serde, _)) =>
         if (storedAs.isEmpty && formatClasses.isEmpty && serde.isDefined) {
@@ -463,7 +465,6 @@
       case _ =>
     }
-    // TODO: also look at `HiveSerDe.getDefaultStorage`.
     val storage = toStorageFormat(location, serdeInfo, ctx)
     val properties = Option(ctx.tableProps).map(visitPropertyKeyValues).getOrElse(Map.empty)
     CreateTableLikeCommand(
```

`HiveDDLSuite`:

```
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hive.execution
 import java.io.File
 import java.net.URI
+import java.util.Locale

 import org.apache.hadoop.fs.Path
 import org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER
@@ -2771,7 +2772,7 @@ class HiveDDLSuite
   test("Create Table LIKE with row format") {
     val catalog = spark.sessionState.catalog
-    withTable("sourceHiveTable", "sourceDsTable", "targetHiveTable1", "targetHiveTable2") {
+    withTable("sourceHiveTable", "sourceDsTable") {
       sql("CREATE TABLE sourceHiveTable(a INT, b INT) STORED AS PARQUET")
       sql("CREATE TABLE sourceDsTable(a INT, b INT) USING PARQUET")
@@ -2817,34 +2818,6 @@ class HiveDDLSuite
         """.stripMargin)
       }.getMessage
       assert(e.contains("Operation not allowed: CREATE TABLE LIKE ... USING ... STORED AS"))
-
-      // row format works with STORED AS hive format (from hive table)
-      spark.sql(
-        """
-          |CREATE TABLE targetHiveTable1 LIKE sourceHiveTable STORED AS PARQUET
-          |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
-          |WITH SERDEPROPERTIES ('test' = 'test')
-        """.stripMargin)
-      var table = catalog.getTableMetadata(TableIdentifier("targetHiveTable1"))
-      assert(table.provider === Some("hive"))
-      assert(table.storage.inputFormat ===
-        Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"))
-      assert(table.storage.serde === Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
-      assert(table.storage.properties("test") == "test")
-
-      // row format works with STORED AS hive format (from datasource table)
-      spark.sql(
-        """
-          |CREATE TABLE targetHiveTable2 LIKE sourceDsTable STORED AS PARQUET
-          |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
-          |WITH SERDEPROPERTIES ('test' = 'test')
-        """.stripMargin)
-      table = catalog.getTableMetadata(TableIdentifier("targetHiveTable2"))
-      assert(table.provider === Some("hive"))
-      assert(table.storage.inputFormat ===
-        Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"))
-      assert(table.storage.serde === Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
-      assert(table.storage.properties("test") == "test")
     }
   }
@@ -2872,4 +2845,103 @@ class HiveDDLSuite
       assert(sql("SELECT * FROM t2 WHERE c = 'A'").collect().isEmpty)
     }
   }
+
+  test("SPARK-33546: CREATE TABLE LIKE should validate row format & file format") {
+    val catalog = spark.sessionState.catalog
+    withTable("sourceHiveTable", "sourceDsTable") {
+      sql("CREATE TABLE sourceHiveTable(a INT, b INT) STORED AS PARQUET")
+      sql("CREATE TABLE sourceDsTable(a INT, b INT) USING PARQUET")
+
+      // ROW FORMAT SERDE ... STORED AS [SEQUENCEFILE | RCFILE | TEXTFILE]
+      val allowSerdeFileFormats = Seq("TEXTFILE", "SEQUENCEFILE", "RCFILE")
+      Seq("sourceHiveTable", "sourceDsTable").foreach { sourceTable =>
+        allowSerdeFileFormats.foreach { format =>
+          withTable("targetTable") {
+            spark.sql(
+              s"""
+                 |CREATE TABLE targetTable LIKE $sourceTable
+                 |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
+                 |STORED AS $format
+               """.stripMargin)
+            val expectedSerde = HiveSerDe.sourceToSerDe(format)
+            val table = catalog.getTableMetadata(TableIdentifier("targetTable", Some("default")))
+            assert(table.provider === Some("hive"))
+            assert(table.storage.inputFormat === Some(expectedSerde.get.inputFormat.get))
+            assert(table.storage.outputFormat === Some(expectedSerde.get.outputFormat.get))
+            assert(table.storage.serde ===
+              Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
+          }
+        }
+
+        // negative case
+        hiveFormats.filterNot(allowSerdeFileFormats.contains(_)).foreach { format =>
+          withTable("targetTable") {
+            val ex = intercept[AnalysisException] {
+              spark.sql(
+                s"""
+                   |CREATE TABLE targetTable LIKE $sourceTable
+                   |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
+                   |STORED AS $format
+                 """.stripMargin)
+            }.getMessage
+            assert(ex.contains(
+              s"ROW FORMAT SERDE is incompatible with format '${format.toLowerCase(Locale.ROOT)}'"))
+          }
+        }
+      }
+
+      // ROW FORMAT DELIMITED ... STORED AS TEXTFILE
+      Seq("sourceHiveTable", "sourceDsTable").foreach { sourceTable =>
+        withTable("targetTable") {
+          spark.sql(
+            s"""
+               |CREATE TABLE targetTable LIKE $sourceTable
+               |ROW FORMAT DELIMITED
+               |STORED AS TEXTFILE
+             """.stripMargin)
+          val expectedSerde = HiveSerDe.sourceToSerDe("TEXTFILE")
+          val table = catalog.getTableMetadata(TableIdentifier("targetTable", Some("default")))
+          assert(table.provider === Some("hive"))
+          assert(table.storage.inputFormat === Some(expectedSerde.get.inputFormat.get))
+          assert(table.storage.outputFormat === Some(expectedSerde.get.outputFormat.get))
+          assert(table.storage.serde === Some(expectedSerde.get.serde.get))
+
+          // negative case
+          val ex = intercept[AnalysisException] {
+            spark.sql(
+              s"""
+                 |CREATE TABLE targetTable LIKE $sourceTable
+                 |ROW FORMAT DELIMITED
+                 |STORED AS PARQUET
+               """.stripMargin)
+          }.getMessage
+          assert(ex.contains("ROW FORMAT DELIMITED is only compatible with 'textfile'"))
+        }
+      }
+
+      // ROW FORMAT ... STORED AS INPUTFORMAT ... OUTPUTFORMAT ...
+      hiveFormats.foreach { tableType =>
+        val expectedSerde = HiveSerDe.sourceToSerDe(tableType)
+        Seq("sourceHiveTable", "sourceDsTable").foreach { sourceTable =>
+          withTable("targetTable") {
+            spark.sql(
+              s"""
+                 |CREATE TABLE targetTable LIKE $sourceTable
+                 |ROW FORMAT SERDE '${expectedSerde.get.serde.get}'
+                 |STORED AS INPUTFORMAT '${expectedSerde.get.inputFormat.get}'
+                 |OUTPUTFORMAT '${expectedSerde.get.outputFormat.get}'
+               """.stripMargin)
+            val table = catalog.getTableMetadata(TableIdentifier("targetTable", Some("default")))
+            assert(table.provider === Some("hive"))
+            assert(table.storage.inputFormat === Some(expectedSerde.get.inputFormat.get))
+            assert(table.storage.outputFormat === Some(expectedSerde.get.outputFormat.get))
+            assert(table.storage.serde === Some(expectedSerde.get.serde.get))
+          }
+        }
+      }
+    }
+  }
 }
```