[SPARK-24575][SQL] Prohibit window expressions inside WHERE and HAVING clauses
## What changes were proposed in this pull request? As discussed [before](https://github.com/apache/spark/pull/19193#issuecomment-393726964), this PR prohibits window expressions inside WHERE and HAVING clauses. ## How was this patch tested? This PR comes with a dedicated unit test. Author: aokolnychyi <anton.okolnychyi@sap.com> Closes #21580 from aokolnychyi/spark-24575.
This commit is contained in:
parent
c8ef9232cf
commit
c5a0d1132a
|
@ -1923,6 +1923,9 @@ class Analyzer(
|
||||||
// "Aggregate with Having clause" will be triggered.
|
// "Aggregate with Having clause" will be triggered.
|
||||||
def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
|
def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
|
||||||
|
|
||||||
|
case Filter(condition, _) if hasWindowFunction(condition) =>
|
||||||
|
failAnalysis("It is not allowed to use window functions inside WHERE and HAVING clauses")
|
||||||
|
|
||||||
// Aggregate with Having clause. This rule works with an unresolved Aggregate because
|
// Aggregate with Having clause. This rule works with an unresolved Aggregate because
|
||||||
// a resolved Aggregate will not have Window Functions.
|
// a resolved Aggregate will not have Window Functions.
|
||||||
case f @ Filter(condition, a @ Aggregate(groupingExprs, aggregateExprs, child))
|
case f @ Filter(condition, a @ Aggregate(groupingExprs, aggregateExprs, child))
|
||||||
|
|
|
@ -17,9 +17,7 @@
|
||||||
|
|
||||||
package org.apache.spark.sql
|
package org.apache.spark.sql
|
||||||
|
|
||||||
import java.sql.{Date, Timestamp}
|
import org.scalatest.Matchers.the
|
||||||
|
|
||||||
import scala.collection.mutable
|
|
||||||
|
|
||||||
import org.apache.spark.TestUtils.{assertNotSpilled, assertSpilled}
|
import org.apache.spark.TestUtils.{assertNotSpilled, assertSpilled}
|
||||||
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction, Window}
|
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction, Window}
|
||||||
|
@ -27,7 +25,6 @@ import org.apache.spark.sql.functions._
|
||||||
import org.apache.spark.sql.internal.SQLConf
|
import org.apache.spark.sql.internal.SQLConf
|
||||||
import org.apache.spark.sql.test.SharedSQLContext
|
import org.apache.spark.sql.test.SharedSQLContext
|
||||||
import org.apache.spark.sql.types._
|
import org.apache.spark.sql.types._
|
||||||
import org.apache.spark.unsafe.types.CalendarInterval
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Window function testing for DataFrame API.
|
* Window function testing for DataFrame API.
|
||||||
|
@ -624,4 +621,41 @@ class DataFrameWindowFunctionsSuite extends QueryTest with SharedSQLContext {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
test("SPARK-24575: Window functions inside WHERE and HAVING clauses") {
|
||||||
|
def checkAnalysisError(df: => DataFrame): Unit = {
|
||||||
|
val thrownException = the [AnalysisException] thrownBy {
|
||||||
|
df.queryExecution.analyzed
|
||||||
|
}
|
||||||
|
assert(thrownException.message.contains("window functions inside WHERE and HAVING clauses"))
|
||||||
|
}
|
||||||
|
|
||||||
|
checkAnalysisError(testData2.select('a).where(rank().over(Window.orderBy('b)) === 1))
|
||||||
|
checkAnalysisError(testData2.where('b === 2 && rank().over(Window.orderBy('b)) === 1))
|
||||||
|
checkAnalysisError(
|
||||||
|
testData2.groupBy('a)
|
||||||
|
.agg(avg('b).as("avgb"))
|
||||||
|
.where('a > 'avgb && rank().over(Window.orderBy('a)) === 1))
|
||||||
|
checkAnalysisError(
|
||||||
|
testData2.groupBy('a)
|
||||||
|
.agg(max('b).as("maxb"), sum('b).as("sumb"))
|
||||||
|
.where(rank().over(Window.orderBy('a)) === 1))
|
||||||
|
checkAnalysisError(
|
||||||
|
testData2.groupBy('a)
|
||||||
|
.agg(max('b).as("maxb"), sum('b).as("sumb"))
|
||||||
|
.where('sumb === 5 && rank().over(Window.orderBy('a)) === 1))
|
||||||
|
|
||||||
|
checkAnalysisError(sql("SELECT a FROM testData2 WHERE RANK() OVER(ORDER BY b) = 1"))
|
||||||
|
checkAnalysisError(sql("SELECT * FROM testData2 WHERE b = 2 AND RANK() OVER(ORDER BY b) = 1"))
|
||||||
|
checkAnalysisError(
|
||||||
|
sql("SELECT * FROM testData2 GROUP BY a HAVING a > AVG(b) AND RANK() OVER(ORDER BY a) = 1"))
|
||||||
|
checkAnalysisError(
|
||||||
|
sql("SELECT a, MAX(b), SUM(b) FROM testData2 GROUP BY a HAVING RANK() OVER(ORDER BY a) = 1"))
|
||||||
|
checkAnalysisError(
|
||||||
|
sql(
|
||||||
|
s"""SELECT a, MAX(b)
|
||||||
|
|FROM testData2
|
||||||
|
|GROUP BY a
|
||||||
|
|HAVING SUM(b) = 5 AND RANK() OVER(ORDER BY a) = 1""".stripMargin))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue