[SPARK-36201][SQL][FOLLOWUP] Schema check should check inner field too

### What changes were proposed in this pull request?
When an inner field of a struct has an invalid name, the schema check should validate inner field names too, not just the top-level column names.
![image](https://user-images.githubusercontent.com/46485123/126101009-c192d87f-1e18-4355-ad53-1419dacdeb76.png)
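
For illustration, a minimal sketch of the kind of schema this patch now rejects (the field names mirror the new test below; `nested` is a hypothetical variable):

```scala
import org.apache.spark.sql.types._

// The inner name "IF(ID=1,ID,0)" contains characters that Parquet/ORC forbid
// (" ,;{}()\n\t="). Before this patch only the top-level name "col1" was checked.
val nested = StructType(Seq(
  StructField("col1", StructType(Seq(
    StructField("ID", LongType),
    StructField("IF(ID=1,ID,0)", LongType)))))) // invalid inner field name
```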

### Why are the changes needed?
Check early, fail early: invalid nested field names are now rejected during schema validation instead of surfacing later at write time.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added UT

Closes #33409 from AngersZhuuuu/SPARK-36201.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 251885772d)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>


@@ -924,23 +924,23 @@ object DDLUtils {
   }
 
   private[sql] def checkDataColNames(table: CatalogTable): Unit = {
-    checkDataColNames(table, table.dataSchema.fieldNames)
+    checkDataColNames(table, table.dataSchema)
   }
 
-  private[sql] def checkDataColNames(table: CatalogTable, colNames: Seq[String]): Unit = {
+  private[sql] def checkDataColNames(table: CatalogTable, schema: StructType): Unit = {
     table.provider.foreach {
       _.toLowerCase(Locale.ROOT) match {
         case HIVE_PROVIDER =>
           val serde = table.storage.serde
           if (serde == HiveSerDe.sourceToSerDe("orc").get.serde) {
-            OrcFileFormat.checkFieldNames(colNames)
+            OrcFileFormat.checkFieldNames(schema)
           } else if (serde == HiveSerDe.sourceToSerDe("parquet").get.serde ||
             serde == Some("parquet.hive.serde.ParquetHiveSerDe") ||
             serde == Some("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")) {
-            ParquetSchemaConverter.checkFieldNames(colNames)
+            ParquetSchemaConverter.checkFieldNames(schema)
           }
-        case "parquet" => ParquetSchemaConverter.checkFieldNames(colNames)
-        case "orc" => OrcFileFormat.checkFieldNames(colNames)
+        case "parquet" => ParquetSchemaConverter.checkFieldNames(schema)
+        case "orc" => OrcFileFormat.checkFieldNames(schema)
         case _ =>
       }
     }
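
With the dispatch above now receiving the full `StructType`, creating a table whose nested field name is invalid should fail at DDL time rather than at write time. An illustrative sketch (table name `bad` and the ScalaTest `intercept` harness are assumptions, following the suite below):

```scala
import org.apache.spark.sql.AnalysisException

// Illustrative: the nested name "a,b" contains ',' from the forbidden set,
// so the Parquet checker should reject it while the table is being defined.
intercept[AnalysisException] {
  spark.sql("CREATE TABLE bad (s STRUCT<`a,b`: INT>) STORED AS PARQUET")
}
```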


@@ -236,7 +236,7 @@ case class AlterTableAddColumnsCommand(
       (colsToAdd ++ catalogTable.schema).map(_.name),
       "in the table definition of " + table.identifier,
       conf.caseSensitiveAnalysis)
-    DDLUtils.checkDataColNames(catalogTable, colsToAdd.map(_.name))
+    DDLUtils.checkDataColNames(catalogTable, StructType(colsToAdd))
 
     val existingSchema = CharVarcharUtils.getRawSchema(catalogTable.dataSchema)
     catalog.alterTableDataSchema(table, StructType(existingSchema ++ colsToAdd))
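
The same early check now covers `ALTER TABLE ... ADD COLUMNS`, since the added columns are wrapped in a `StructType` and routed through the checker. A test-style sketch under the same assumptions (table `t` is illustrative):

```scala
import org.apache.spark.sql.AnalysisException

// The inner name "a=b" contains '=', so the command should now fail in
// DDLUtils.checkDataColNames instead of surfacing later on write.
spark.sql("CREATE TABLE t (i INT) USING parquet")
intercept[AnalysisException] {
  spark.sql("ALTER TABLE t ADD COLUMNS (s STRUCT<`a=b`: INT>)")
}
```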


@@ -52,8 +52,14 @@ private[sql] object OrcFileFormat {
     }
   }
 
-  def checkFieldNames(names: Seq[String]): Unit = {
-    names.foreach(checkFieldName)
+  def checkFieldNames(schema: StructType): Unit = {
+    schema.foreach { field =>
+      checkFieldName(field.name)
+      field.dataType match {
+        case s: StructType => checkFieldNames(s)
+        case _ =>
+      }
+    }
   }
 
   def getQuotedSchemaString(dataType: DataType): String = dataType match {
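
The recursion descends only through `StructType`. A self-contained sketch of the same walk, with a simplified stand-in for `checkFieldName` (assumption: the real one rejects the characters listed in the test's error message):

```scala
import org.apache.spark.sql.types._

// Simplified stand-in for the real checkFieldName.
def checkFieldName(name: String): Unit =
  require(!name.exists(c => " ,;{}()\n\t=".indexOf(c.toInt) >= 0),
    s"""Attribute name "$name" contains invalid character(s)""")

def checkFieldNames(schema: StructType): Unit =
  schema.foreach { field =>
    checkFieldName(field.name)
    field.dataType match {
      case s: StructType => checkFieldNames(s) // recurse into nested structs
      case _ => // leaf types: the name itself was already checked
    }
  }

// A valid top-level name with an invalid inner name now fails:
checkFieldNames(StructType(Seq(
  StructField("col1", StructType(Seq(StructField("a b", IntegerType)))))))
```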


@@ -593,8 +593,14 @@ private[sql] object ParquetSchemaConverter {
       """.stripMargin.split("\n").mkString(" ").trim)
   }
 
-  def checkFieldNames(names: Seq[String]): Unit = {
-    names.foreach(checkFieldName)
+  def checkFieldNames(schema: StructType): Unit = {
+    schema.foreach { field =>
+      checkFieldName(field.name)
+      field.dataType match {
+        case s: StructType => checkFieldNames(s)
+        case _ =>
+      }
+    }
   }
 
   def checkConversionRequirement(f: => Boolean, message: String): Unit = {


@@ -3008,6 +3008,26 @@ class HiveDDLSuite
     }
   }
 
+  test("SPARK-36201: Add check for inner field of parquet/orc schema") {
+    withView("v") {
+      spark.range(1).createTempView("v")
+      withTempPath { path =>
+        val e = intercept[AnalysisException] {
+          spark.sql(
+            s"""
+               |INSERT OVERWRITE LOCAL DIRECTORY '${path.getCanonicalPath}'
+               |STORED AS PARQUET
+               |SELECT
+               |NAMED_STRUCT('ID', ID, 'IF(ID=1,ID,0)', IF(ID=1,ID,0), 'B', ABS(ID)) AS col1
+               |FROM v
+             """.stripMargin)
+        }.getMessage
+        assert(e.contains("Attribute name \"IF(ID=1,ID,0)\" contains" +
+          " invalid character(s) among \" ,;{}()\\n\\t=\". Please use alias to rename it."))
+      }
+    }
+  }
+
   test("SPARK-34261: Avoid side effect if create exists temporary function") {
     withUserDefinedFunction("f1" -> true) {
       sql("CREATE TEMPORARY FUNCTION f1 AS 'org.apache.hadoop.hive.ql.udf.UDFUUID'")