diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala index 713f7941be..9f35107e5b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.catalyst.util.usePrettyExpression import org.apache.spark.sql.execution.aggregate.TypedAggregateExpression +import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.functions.lit import org.apache.spark.sql.types._ @@ -1094,6 +1095,22 @@ class Column(protected[sql] val expr: Expression) extends Logging { */ def over(window: expressions.WindowSpec): Column = window.withAggregate(this) + /** + * Define a empty analytic clause. In this case the analytic function is applied + * and presented for all rows in the result set. + * + * {{{ + * df.select( + * sum("price").over(), + * avg("price").over() + * ) + * }}} + * + * @group expr_ops + * @since 2.0.0 + */ + def over(): Column = over(Window.spec) + } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala b/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala index 350c283646..c29ec6f426 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala @@ -74,7 +74,7 @@ object Window { spec.orderBy(cols : _*) } - private def spec: WindowSpec = { + private[sql] def spec: WindowSpec = { new WindowSpec(Seq.empty, Seq.empty, UnspecifiedFrame) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowSuite.scala index 9a1aa46947..c6f8c3ad3f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowSuite.scala @@ -245,6 +245,18 @@ class DataFrameWindowSuite extends QueryTest with SharedSQLContext { Seq(Row("a", 6, 9), Row("b", 9, 6))) } + test("SPARK-16195 empty over spec") { + val df = Seq(("a", 1), ("a", 1), ("a", 2), ("b", 2)). + toDF("key", "value") + df.createOrReplaceTempView("window_table") + checkAnswer( + df.select($"key", $"value", sum($"value").over(), avg($"value").over()), + Seq(Row("a", 1, 6, 1.5), Row("a", 1, 6, 1.5), Row("a", 2, 6, 1.5), Row("b", 2, 6, 1.5))) + checkAnswer( + sql("select key, value, sum(value) over(), avg(value) over() from window_table"), + Seq(Row("a", 1, 6, 1.5), Row("a", 1, 6, 1.5), Row("a", 2, 6, 1.5), Row("b", 2, 6, 1.5))) + } + test("window function with udaf") { val udaf = new UserDefinedAggregateFunction { def inputSchema: StructType = new StructType()