[SPARK-16195][SQL] Allow users to specify empty over clause in window expressions through dataset API

## What changes were proposed in this pull request?
Allow users to specify an empty OVER clause in window expressions through the Dataset API.

In SQL, it is allowed to specify an empty OVER clause in a window expression.

```SQL
select area, sum(product) over () as c from windowData
where product > 3 group by area, product
having avg(month) > 0 order by avg(month), product
```
In this case the analytic function `sum` is evaluated over all the rows of the result set.

Currently this is not possible through the Dataset API; this PR adds support for it.

## How was this patch tested?

Added a new test in DataFrameWindowSuite.

Author: Dilip Biswal <dbiswal@us.ibm.com>

Closes #13897 from dilipbiswal/spark-empty-over.
This commit is contained in:
Dilip Biswal 2016-06-24 17:27:33 -07:00 committed by Herman van Hovell
parent e5d0928e24
commit 9053054c7f
3 changed files with 30 additions and 1 deletions

View file

@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.util.usePrettyExpression import org.apache.spark.sql.catalyst.util.usePrettyExpression
import org.apache.spark.sql.execution.aggregate.TypedAggregateExpression import org.apache.spark.sql.execution.aggregate.TypedAggregateExpression
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.lit import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types._ import org.apache.spark.sql.types._
@ -1094,6 +1095,22 @@ class Column(protected[sql] val expr: Expression) extends Logging {
*/ */
def over(window: expressions.WindowSpec): Column = window.withAggregate(this) def over(window: expressions.WindowSpec): Column = window.withAggregate(this)
/**
 * Defines an empty analytic clause. In this case the analytic function is applied
 * and presented for all rows in the result set, i.e. the window covers the entire
 * result set with no partitioning, ordering, or frame specification.
 *
 * {{{
 *   df.select(
 *     sum("price").over(),
 *     avg("price").over()
 *   )
 * }}}
 *
 * @group expr_ops
 * @since 2.0.0
 */
// Window.spec builds a WindowSpec with empty partition/order lists and an
// unspecified frame, which is exactly the semantics of a bare OVER () in SQL.
def over(): Column = over(Window.spec)
} }

View file

@ -74,7 +74,7 @@ object Window {
spec.orderBy(cols : _*) spec.orderBy(cols : _*)
} }
private def spec: WindowSpec = { private[sql] def spec: WindowSpec = {
new WindowSpec(Seq.empty, Seq.empty, UnspecifiedFrame) new WindowSpec(Seq.empty, Seq.empty, UnspecifiedFrame)
} }

View file

@ -245,6 +245,18 @@ class DataFrameWindowSuite extends QueryTest with SharedSQLContext {
Seq(Row("a", 6, 9), Row("b", 9, 6))) Seq(Row("a", 6, 9), Row("b", 9, 6)))
} }
test("SPARK-16195 empty over spec") {
  // With an empty OVER clause every row sees the same global aggregate:
  // sum(value) = 6 and avg(value) = 1.5 across all four input rows.
  val input = Seq(("a", 1), ("a", 1), ("a", 2), ("b", 2)).toDF("key", "value")
  input.createOrReplaceTempView("window_table")
  val expected =
    Seq(Row("a", 1, 6, 1.5), Row("a", 1, 6, 1.5), Row("a", 2, 6, 1.5), Row("b", 2, 6, 1.5))
  // Dataset API path: Column.over() with no WindowSpec.
  checkAnswer(
    input.select($"key", $"value", sum($"value").over(), avg($"value").over()),
    expected)
  // SQL path: bare OVER () must produce the same result.
  checkAnswer(
    sql("select key, value, sum(value) over(), avg(value) over() from window_table"),
    expected)
}
test("window function with udaf") { test("window function with udaf") {
val udaf = new UserDefinedAggregateFunction { val udaf = new UserDefinedAggregateFunction {
def inputSchema: StructType = new StructType() def inputSchema: StructType = new StructType()