[SPARK-36637][SQL] Provide proper error message when using an undefined window frame

### What changes were proposed in this pull request?
The two cases below of using an undefined window frame should produce a proper error message.

1. Using an undefined window frame with a window function
```
SELECT nth_value(employee_name, 2) OVER w second_highest_salary
FROM basic_pays;
```
The original error message is
```
Window function nth_value(employee_name#x, 2, false) requires an OVER clause.
```
This is confusing: the query references a window frame `w` that is simply not defined, yet the error claims an OVER clause is required.
Now the error message is
```
Window specification w is not defined in the WINDOW clause.
```
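
For reference, the query resolves once `w` is defined in a `WINDOW` clause. A minimal sketch (the `ORDER BY salary DESC` definition is illustrative only, not part of this PR):
```sql
SELECT nth_value(employee_name, 2) OVER w second_highest_salary
FROM basic_pays
WINDOW w AS (ORDER BY salary DESC);
```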

2. Using an undefined window frame with an aggregate function
```
SELECT SUM(salary) OVER w sum_salary
FROM basic_pays;
```
The original error message is
```
Error in query: unresolved operator 'Aggregate [unresolvedwindowexpression(sum(salary#2), WindowSpecReference(w)) AS sum_salary#34]
+- SubqueryAlias spark_catalog.default.basic_pays
+- HiveTableRelation [`default`.`employees`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [name#0, dept#1, salary#2, age#3], Partition Cols: []]
```
In this case, the GlobalAggregates rule should skip aggregate expressions wrapped in UnresolvedWindowExpression when deciding whether to convert the plan into a global Aggregate.
Now the error message is
```
Window specification w is not defined in the WINDOW clause.
```
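
As with case 1, the error goes away once `w` is actually defined. A minimal sketch (the frame definition is illustrative only):
```sql
SELECT SUM(salary) OVER w sum_salary
FROM basic_pays
WINDOW w AS (ORDER BY salary ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW);
```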

### Why are the changes needed?
Provide proper error messages.

### Does this PR introduce _any_ user-facing change?
Yes, the error messages are improved as described above.

### How was this patch tested?
Added UT

Closes #33892 from AngersZhuuuu/SPARK-36637.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 568ad6aa44)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
5 changed files with 53 additions and 5 deletions


```diff
@@ -437,8 +437,8 @@ class Analyzer(override val catalogManager: CatalogManager)
    * Substitute child plan with WindowSpecDefinitions.
    */
   object WindowsSubstitution extends Rule[LogicalPlan] {
-    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUpWithPruning(
-      _.containsPattern(WITH_WINDOW_DEFINITION), ruleId) {
+    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsDownWithPruning(
+      _.containsAnyPattern(WITH_WINDOW_DEFINITION, UNRESOLVED_WINDOW_EXPRESSION), ruleId) {
       // Lookup WindowSpecDefinitions. This rule works with unresolved children.
       case WithWindowDefinition(windowDefinitions, child) => child.resolveExpressions {
         case UnresolvedWindowExpression(c, WindowSpecReference(windowName)) =>
@@ -446,6 +446,14 @@ class Analyzer(override val catalogManager: CatalogManager)
             throw QueryCompilationErrors.windowSpecificationNotDefinedError(windowName))
           WindowExpression(c, windowSpecDefinition)
       }
+
+      case p @ Project(projectList, _) =>
+        projectList.foreach(_.transformDownWithPruning(
+          _.containsPattern(UNRESOLVED_WINDOW_EXPRESSION), ruleId) {
+          case UnresolvedWindowExpression(_, windowSpec) =>
+            throw QueryCompilationErrors.windowSpecificationNotDefinedError(windowSpec.name)
+        })
+        p
     }
   }
@@ -2492,6 +2500,9 @@ class Analyzer(override val catalogManager: CatalogManager)
       expr.collect {
         case WindowExpression(ae: AggregateExpression, _) => ae
         case WindowExpression(e: PythonUDF, _) if PythonUDF.isGroupedAggPandasUDF(e) => e
+        case UnresolvedWindowExpression(ae: AggregateExpression, _) => ae
+        case UnresolvedWindowExpression(e: PythonUDF, _)
+          if PythonUDF.isGroupedAggPandasUDF(e) => e
       }
     }.toSet
```


```diff
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure,
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateFunction, DeclarativeAggregate, NoOp}
 import org.apache.spark.sql.catalyst.trees.{BinaryLike, LeafLike, TernaryLike, UnaryLike}
-import org.apache.spark.sql.catalyst.trees.TreePattern.{TreePattern, WINDOW_EXPRESSION}
+import org.apache.spark.sql.catalyst.trees.TreePattern.{TreePattern, UNRESOLVED_WINDOW_EXPRESSION, WINDOW_EXPRESSION}
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.types._
@@ -293,6 +293,8 @@ case class UnresolvedWindowExpression(
   override protected def withNewChildInternal(newChild: Expression): UnresolvedWindowExpression =
     copy(child = newChild)
+
+  override val nodePatterns: Seq[TreePattern] = Seq(UNRESOLVED_WINDOW_EXPRESSION)
 }

 case class WindowExpression(
```


```diff
@@ -120,6 +120,7 @@ object TreePattern extends Enumeration {
   val UNRESOLVED_ORDINAL: Value = Value
   val UNRESOLVED_FUNCTION: Value = Value
   val UNRESOLVED_HINT: Value = Value
+  val UNRESOLVED_WINDOW_EXPRESSION: Value = Value

   // Unresolved Plan patterns (Alphabetically ordered)
   val UNRESOLVED_SUBQUERY_COLUMN_ALIAS: Value = Value
```


```diff
@@ -430,3 +430,13 @@ FROM
   test_ignore_null
 WINDOW w AS (ORDER BY id ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING)
 ORDER BY id;
+
+SELECT
+  nth_value(employee_name, 2) OVER w second_highest_salary
+FROM
+  basic_pays;
+
+SELECT
+  SUM(salary) OVER w sum_salary
+FROM
+  basic_pays;
```


```diff
@@ -1,5 +1,5 @@
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 53
+-- Number of queries: 55


 -- !query
@@ -1173,3 +1173,27 @@ b 5 NULL x y z x z
 a 6 z x y z x v
 a 7 v x y z x v
 a 8 NULL x y z x v
+
+
+-- !query
+SELECT
+  nth_value(employee_name, 2) OVER w second_highest_salary
+FROM
+  basic_pays
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+Window specification w is not defined in the WINDOW clause.
+
+
+-- !query
+SELECT
+  SUM(salary) OVER w sum_salary
+FROM
+  basic_pays
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+Window specification w is not defined in the WINDOW clause.
```