From 3c441135bbf26cbffa8fe1310b01ef6afc3c21a7 Mon Sep 17 00:00:00 2001
From: Angerszhuuuu
Date: Wed, 28 Jul 2021 13:52:27 +0800
Subject: [PATCH] [SPARK-36312][SQL] ParquetWriteSupport.setSchema should check
 inner field

### What changes were proposed in this pull request?
The previous PR only added the inner-field name check for Hive DDL; this PR adds the same check to the Parquet data source write API (`ParquetWriteSupport.setSchema`).

### Why are the changes needed?
So that an invalid nested field name fails early, at analysis time, with a clear `AnalysisException`, instead of aborting the job at write time with a `SparkException`.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added UT. Without this fix it failed as:
```
[info] - SPARK-36312: ParquetWriteSupport should check inner field *** FAILED *** (8 seconds, 29 milliseconds)
[info] Expected exception org.apache.spark.sql.AnalysisException to be thrown, but org.apache.spark.SparkException was thrown (HiveDDLSuite.scala:3035)
[info] org.scalatest.exceptions.TestFailedException:
[info] at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
[info] at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
[info] at org.scalatest.funsuite.AnyFunSuite.newAssertionFailedException(AnyFunSuite.scala:1563)
[info] at org.scalatest.Assertions.intercept(Assertions.scala:756)
[info] at org.scalatest.Assertions.intercept$(Assertions.scala:746)
[info] at org.scalatest.funsuite.AnyFunSuite.intercept(AnyFunSuite.scala:1563)
[info] at org.apache.spark.sql.hive.execution.HiveDDLSuite.$anonfun$new$396(HiveDDLSuite.scala:3035)
[info] at org.apache.spark.sql.hive.execution.HiveDDLSuite.$anonfun$new$396$adapted(HiveDDLSuite.scala:3034)
[info] at org.apache.spark.sql.catalyst.plans.SQLHelper.withTempPath(SQLHelper.scala:69)
[info] at org.apache.spark.sql.catalyst.plans.SQLHelper.withTempPath$(SQLHelper.scala:66)
[info] at org.apache.spark.sql.QueryTest.withTempPath(QueryTest.scala:34)
[info] at org.apache.spark.sql.hive.execution.HiveDDLSuite.$anonfun$new$395(HiveDDLSuite.scala:3034)
[info] at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
[info] at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1468)
[info] at org.apache.spark.sql.test.SQLTestUtilsBase.withView(SQLTestUtils.scala:316)
[info] at org.apache.spark.sql.test.SQLTestUtilsBase.withView$(SQLTestUtils.scala:314)
[info] at org.apache.spark.sql.hive.execution.HiveDDLSuite.withView(HiveDDLSuite.scala:396)
[info] at org.apache.spark.sql.hive.execution.HiveDDLSuite.$anonfun$new$394(HiveDDLSuite.scala:3032)
[info] at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
[info] at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info] at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[info] at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info] at org.scalatest.Transformer.apply(Transformer.scala:22)
[info] at org.scalatest.Transformer.apply(Transformer.scala:20)
[info] at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
[info] at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:190)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
[info] at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
[info] at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:62)
[info] at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
[info] at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
[info] at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:62)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
[info] at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
[info] at scala.collection.immutable.List.foreach(List.scala:431)
[info] at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
[info] at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
[info] at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
[info] at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1563)
[info] at org.scalatest.Suite.run(Suite.scala:1112)
[info] at org.scalatest.Suite.run$(Suite.scala:1094)
[info] at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1563)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273)
[info] at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272)
[info] at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:62)
[info] at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
[info] at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
[info] at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
[info] at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:62)
[info] at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:318)
[info] at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:513)
[info] at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:413)
[info] at java.util.concurrent.FutureTask.run(FutureTask.java:266)
[info] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[info] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[info] at java.lang.Thread.run(Thread.java:748)
[info] Cause: org.apache.spark.SparkException: Job aborted.
[info] at org.apache.spark.sql.errors.QueryExecutionErrors$.jobAbortedError(QueryExecutionErrors.scala:496)
[info] at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:251)
[info] at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:186)
[info] at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
[info] at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
[info] at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
[info] at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:97)
[info] at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
[info] at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
[info] at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
[info] at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
[info] at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
[info] at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:97)
[info] at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:93)
[info] at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
[info] at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
[info] at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
[info] at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
[info] at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
[info] at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
[info] at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
[info] at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
[info] at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
[info] at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:93)
[info] at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:80)
[info] at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:78)
[info] at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:115)
[info] at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848)
[info] at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382)
[info] at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:355)
[info] at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
[info] at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:781)
[in
```

Closes #33531 from AngersZhuuuu/SPARK-36312.
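For illustration, the recursive walk that a nested field-name check has to perform can be pictured as below. This is a minimal standalone sketch under assumptions, not Spark's actual implementation: the `FieldNameCheck` object, the exact invalid-character set, the recursion into arrays and maps, and the `AnalysisException` construction are all illustrative. The point of the sketch is the contrast with the removed line `schema.map(_.name).foreach(ParquetSchemaConverter.checkFieldName)`, which validated only top-level column names, so a bad inner field name slipped through to the Parquet writer.

```scala
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}

// Hypothetical sketch of a recursive field-name check; names and the
// character set below are assumptions, not Spark's real code.
object FieldNameCheck {
  // Characters Parquet rejects in field names (space, comma, braces, etc.).
  private val invalid: Set[Char] = " ,;{}()\n\t=".toSet

  // Reject a single field name containing any Parquet-special character.
  def checkFieldName(name: String): Unit = {
    if (name.exists(invalid)) {
      throw new AnalysisException(
        s"""Column name "$name" contains invalid character(s). Please use alias to rename it.""")
    }
  }

  // Walk the schema recursively so field names nested inside structs
  // (including structs under arrays and maps) are validated too,
  // not just the top-level column names.
  def checkFieldNames(schema: StructType): Unit = schema.foreach { field =>
    checkFieldName(field.name)
    checkNested(field.dataType)
  }

  private def checkNested(dt: DataType): Unit = dt match {
    case s: StructType => checkFieldNames(s)
    case a: ArrayType  => checkNested(a.elementType)
    case m: MapType    => checkNested(m.keyType); checkNested(m.valueType)
    case _             => ()
  }
}
```

Running such a check at schema-set time keeps the failure at analysis, before any task starts, which matches the `AnalysisException` the new test expects instead of the `Job aborted` seen in the log above.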
Authored-by: Angerszhuuuu
Signed-off-by: Wenchen Fan
(cherry picked from commit 59e0c25376e1b3d227a1dc9ed93a7593314eddb3)
Signed-off-by: Wenchen Fan
---
 .../parquet/ParquetWriteSupport.scala         |  2 +-
 .../sql/hive/execution/HiveDDLSuite.scala     | 18 ++++++++++++++++++
 2 files changed, 19 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
index cd0e639386..20f69e8b1c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
@@ -482,7 +482,7 @@ object ParquetWriteSupport {
   val SPARK_ROW_SCHEMA: String = "org.apache.spark.sql.parquet.row.attributes"
 
   def setSchema(schema: StructType, configuration: Configuration): Unit = {
-    schema.map(_.name).foreach(ParquetSchemaConverter.checkFieldName)
+    ParquetSchemaConverter.checkFieldNames(schema)
     configuration.set(SPARK_ROW_SCHEMA, schema.json)
     configuration.setIfUnset(
       ParquetOutputFormat.WRITER_VERSION,
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 33e83878ce..7f42b3c0f5 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -2960,6 +2960,24 @@ class HiveDDLSuite
     }
   }
 
+  test("SPARK-36312: ParquetWriteSupport should check inner field") {
+    withView("v") {
+      spark.range(1).createTempView("v")
+      withTempPath { path =>
+        val e = intercept[AnalysisException] {
+          spark.sql(
+            """
+              |SELECT
+              |NAMED_STRUCT('ID', ID, 'IF(ID=1,ID,0)', IF(ID=1,ID,0), 'B', ABS(ID)) AS col1
+              |FROM v
+              |""".stripMargin).write.mode(SaveMode.Overwrite).parquet(path.toString)
+        }.getMessage
+        assert(e.contains("Column name \"IF(ID=1,ID,0)\" contains" +
+          " invalid character(s). Please use alias to rename it."))
+      }
+    }
+  }
+
   test("SPARK-34261: Avoid side effect if create exists temporary function") {
     withUserDefinedFunction("f1" -> true) {
       sql("CREATE TEMPORARY FUNCTION f1 AS 'org.apache.hadoop.hive.ql.udf.UDFUUID'")
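To see the behavior change from the user side, here is a hypothetical repro mirroring the test above; the view name `v` and the output path `/tmp/spark-36312` are made up for illustration:

```scala
spark.range(1).createOrReplaceTempView("v")

// The inner struct field is literally named "IF(ID=1,ID,0)", which Parquet
// forbids. Before this patch the write failed late with
// "org.apache.spark.SparkException: Job aborted."; with the check in
// setSchema it fails fast with an AnalysisException at analysis time.
spark.sql(
  "SELECT NAMED_STRUCT('ID', id, 'IF(ID=1,ID,0)', IF(id = 1, id, 0)) AS col1 FROM v")
  .write.mode("overwrite").parquet("/tmp/spark-36312")

// Workaround either way: alias the expression so the inner field gets a
// legal name.
spark.sql(
  "SELECT NAMED_STRUCT('ID', id, 'if_id', IF(id = 1, id, 0)) AS col1 FROM v")
  .write.mode("overwrite").parquet("/tmp/spark-36312")
```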