From 27abb6ab5674b8663440dc738a0ba79c185fb063 Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Fri, 19 Feb 2021 18:14:49 +0800 Subject: [PATCH] [SPARK-34421][SQL] Resolve temporary functions and views in views with CTEs ### What changes were proposed in this pull request? This PR: - Fixes a bug that prevents analysis of: ``` CREATE TEMPORARY VIEW temp_view AS WITH cte AS (SELECT temp_func(0)) SELECT * FROM cte; SELECT * FROM temp_view ``` by throwing: ``` Undefined function: 'temp_func'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'. ``` - and doesn't report analysis error when it should: ``` CREATE TEMPORARY VIEW temp_view AS SELECT 0; CREATE VIEW view_on_temp_view AS WITH cte AS (SELECT * FROM temp_view) SELECT * FROM cte ``` by properly collecting temporary objects from VIEW definitions with CTEs. - Minor refactor to make the affected code more readable. ### Why are the changes needed? To fix a bug introduced with https://github.com/apache/spark/pull/30567 ### Does this PR introduce _any_ user-facing change? Yes, the query works again. ### How was this patch tested? Added new UT + existing ones. Closes #31550 from peter-toth/SPARK-34421-temp-functions-in-views-with-cte. Authored-by: Peter Toth Signed-off-by: Wenchen Fan --- .../spark/sql/execution/command/views.scala | 31 +++++--- .../org/apache/spark/sql/SQLQuerySuite.scala | 78 +++++++++++++++++++ 2 files changed, 97 insertions(+), 12 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala index f50719341c..a14f247515 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.analysis.{GlobalTempView, LocalTempView, Pe import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, SessionCatalog, TemporaryViewRelation} import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, SubqueryExpression} import org.apache.spark.sql.catalyst.plans.QueryPlan -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, View} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, View, With} import org.apache.spark.sql.catalyst.util.CharVarcharUtils import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.NamespaceHelper import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} @@ -552,23 +552,30 @@ object ViewHelper { private def collectTemporaryObjects( catalog: SessionCatalog, child: LogicalPlan): (Seq[Seq[String]], Seq[String]) = { def collectTempViews(child: LogicalPlan): Seq[Seq[String]] = { - child.collect { + child.flatMap { case UnresolvedRelation(nameParts, _, _) if catalog.isTempView(nameParts) => Seq(nameParts) - case plan if !plan.resolved => plan.expressions.flatMap(_.collect { + case w: With if !w.resolved => w.innerChildren.flatMap(collectTempViews) + case plan if !plan.resolved => plan.expressions.flatMap(_.flatMap { case e: SubqueryExpression => collectTempViews(e.plan) - }).flatten - }.flatten.distinct + case _ => Seq.empty + }) + case _ => Seq.empty + }.distinct } def collectTempFunctions(child: LogicalPlan): Seq[String] = { - child.collect { - case plan if !plan.resolved => plan.expressions.flatMap(_.collect { - case e: SubqueryExpression => collectTempFunctions(e.plan) - case e: UnresolvedFunction if catalog.isTemporaryFunction(e.name) => - Seq(e.name.funcName) - }).flatten - }.flatten.distinct + child.flatMap { + case w: With if !w.resolved => w.innerChildren.flatMap(collectTempFunctions) + case plan if !plan.resolved => + plan.expressions.flatMap(_.flatMap { + case e: SubqueryExpression => collectTempFunctions(e.plan) + case e: UnresolvedFunction if catalog.isTemporaryFunction(e.name) => + Seq(e.name.funcName) + case _ => Seq.empty + }) + case _ => Seq.empty + }.distinct } (collectTempViews(child), collectTempFunctions(child)) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 31b9427d1b..e6a18c3894 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -3943,6 +3943,84 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark } } } + + test("SPARK-34421: Resolve temporary objects in temporary views with CTEs") { + val tempFuncName = "temp_func" + withUserDefinedFunction(tempFuncName -> true) { + spark.udf.register(tempFuncName, identity[Int](_)) + + val tempViewName = "temp_view" + withTempView(tempViewName) { + sql(s"CREATE TEMPORARY VIEW $tempViewName AS SELECT 1") + + val testViewName = "test_view" + + withTempView(testViewName) { + sql( + s""" + |CREATE TEMPORARY VIEW $testViewName AS + |WITH cte AS ( + | SELECT $tempFuncName(0) + |) + |SELECT * FROM cte + |""".stripMargin) + checkAnswer(sql(s"SELECT * FROM $testViewName"), Row(0)) + } + + withTempView(testViewName) { + sql( + s""" + |CREATE TEMPORARY VIEW $testViewName AS + |WITH cte AS ( + | SELECT * FROM $tempViewName + |) + |SELECT * FROM cte + |""".stripMargin) + checkAnswer(sql(s"SELECT * FROM $testViewName"), Row(1)) + } + } + } + } + + test("SPARK-34421: Resolve temporary objects in permanent views with CTEs") { + val tempFuncName = "temp_func" + withUserDefinedFunction((tempFuncName, true)) { + spark.udf.register(tempFuncName, identity[Int](_)) + + val tempViewName = "temp_view" + withTempView(tempViewName) { + sql(s"CREATE TEMPORARY VIEW $tempViewName AS SELECT 1") + + val testViewName = "test_view" + + val e = intercept[AnalysisException] { + sql( + s""" + |CREATE VIEW $testViewName AS + |WITH cte AS ( + | SELECT * FROM $tempViewName + |) + |SELECT * FROM cte + |""".stripMargin) + } + assert(e.message.contains("Not allowed to create a permanent view " + + s"`default`.`$testViewName` by referencing a temporary view $tempViewName")) + + val e2 = intercept[AnalysisException] { + sql( + s""" + |CREATE VIEW $testViewName AS + |WITH cte AS ( + | SELECT $tempFuncName(0) + |) + |SELECT * FROM cte + |""".stripMargin) + } + assert(e2.message.contains("Not allowed to create a permanent view " + + s"`default`.`$testViewName` by referencing a temporary function `$tempFuncName`")) + } + } + } } case class Foo(bar: Option[String])