[SPARK-34421][SQL] Resolve temporary functions and views in views with CTEs

### What changes were proposed in this pull request?
This PR:
- Fixes a bug that prevents analysis of:
  ```
  CREATE TEMPORARY VIEW temp_view AS WITH cte AS (SELECT temp_func(0)) SELECT * FROM cte;
  SELECT * FROM temp_view
  ```
  by throwing:
  ```
  Undefined function: 'temp_func'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.
  ```
- and doesn't report analysis error when it should:
  ```
  CREATE TEMPORARY VIEW temp_view AS SELECT 0;
  CREATE VIEW view_on_temp_view AS WITH cte AS (SELECT * FROM temp_view) SELECT * FROM cte
  ```
  by properly collecting temporary objects from VIEW definitions with CTEs.

- Minor refactor to make the affected code more readable.

### Why are the changes needed?
To fix a bug introduced with https://github.com/apache/spark/pull/30567

### Does this PR introduce _any_ user-facing change?
Yes, the query works again.

### How was this patch tested?
Added new UT + existing ones.

Closes #31550 from peter-toth/SPARK-34421-temp-functions-in-views-with-cte.

Authored-by: Peter Toth <peter.toth@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This commit is contained in:
Peter Toth 2021-02-19 18:14:49 +08:00 committed by Wenchen Fan
parent b26e7b510b
commit 27abb6ab56
2 changed files with 97 additions and 12 deletions

View file

@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.analysis.{GlobalTempView, LocalTempView, Pe
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, SessionCatalog, TemporaryViewRelation}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, SubqueryExpression}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, View}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, View, With}
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.NamespaceHelper
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
@ -552,23 +552,30 @@ object ViewHelper {
private def collectTemporaryObjects(
catalog: SessionCatalog, child: LogicalPlan): (Seq[Seq[String]], Seq[String]) = {
def collectTempViews(child: LogicalPlan): Seq[Seq[String]] = {
child.collect {
child.flatMap {
case UnresolvedRelation(nameParts, _, _) if catalog.isTempView(nameParts) =>
Seq(nameParts)
case plan if !plan.resolved => plan.expressions.flatMap(_.collect {
case w: With if !w.resolved => w.innerChildren.flatMap(collectTempViews)
case plan if !plan.resolved => plan.expressions.flatMap(_.flatMap {
case e: SubqueryExpression => collectTempViews(e.plan)
}).flatten
}.flatten.distinct
case _ => Seq.empty
})
case _ => Seq.empty
}.distinct
}
def collectTempFunctions(child: LogicalPlan): Seq[String] = {
child.collect {
case plan if !plan.resolved => plan.expressions.flatMap(_.collect {
case e: SubqueryExpression => collectTempFunctions(e.plan)
case e: UnresolvedFunction if catalog.isTemporaryFunction(e.name) =>
Seq(e.name.funcName)
}).flatten
}.flatten.distinct
child.flatMap {
case w: With if !w.resolved => w.innerChildren.flatMap(collectTempFunctions)
case plan if !plan.resolved =>
plan.expressions.flatMap(_.flatMap {
case e: SubqueryExpression => collectTempFunctions(e.plan)
case e: UnresolvedFunction if catalog.isTemporaryFunction(e.name) =>
Seq(e.name.funcName)
case _ => Seq.empty
})
case _ => Seq.empty
}.distinct
}
(collectTempViews(child), collectTempFunctions(child))
}

View file

@ -3943,6 +3943,84 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
}
}
}
test("SPARK-34421: Resolve temporary objects in temporary views with CTEs") {
val tempFuncName = "temp_func"
withUserDefinedFunction(tempFuncName -> true) {
spark.udf.register(tempFuncName, identity[Int](_))
val tempViewName = "temp_view"
withTempView(tempViewName) {
sql(s"CREATE TEMPORARY VIEW $tempViewName AS SELECT 1")
val testViewName = "test_view"
withTempView(testViewName) {
sql(
s"""
|CREATE TEMPORARY VIEW $testViewName AS
|WITH cte AS (
| SELECT $tempFuncName(0)
|)
|SELECT * FROM cte
|""".stripMargin)
checkAnswer(sql(s"SELECT * FROM $testViewName"), Row(0))
}
withTempView(testViewName) {
sql(
s"""
|CREATE TEMPORARY VIEW $testViewName AS
|WITH cte AS (
| SELECT * FROM $tempViewName
|)
|SELECT * FROM cte
|""".stripMargin)
checkAnswer(sql(s"SELECT * FROM $testViewName"), Row(1))
}
}
}
}
test("SPARK-34421: Resolve temporary objects in permanent views with CTEs") {
val tempFuncName = "temp_func"
withUserDefinedFunction((tempFuncName, true)) {
spark.udf.register(tempFuncName, identity[Int](_))
val tempViewName = "temp_view"
withTempView(tempViewName) {
sql(s"CREATE TEMPORARY VIEW $tempViewName AS SELECT 1")
val testViewName = "test_view"
val e = intercept[AnalysisException] {
sql(
s"""
|CREATE VIEW $testViewName AS
|WITH cte AS (
| SELECT * FROM $tempViewName
|)
|SELECT * FROM cte
|""".stripMargin)
}
assert(e.message.contains("Not allowed to create a permanent view " +
s"`default`.`$testViewName` by referencing a temporary view $tempViewName"))
val e2 = intercept[AnalysisException] {
sql(
s"""
|CREATE VIEW $testViewName AS
|WITH cte AS (
| SELECT $tempFuncName(0)
|)
|SELECT * FROM cte
|""".stripMargin)
}
assert(e2.message.contains("Not allowed to create a permanent view " +
s"`default`.`$testViewName` by referencing a temporary function `$tempFuncName`"))
}
}
}
}
case class Foo(bar: Option[String])