From 031910b0ec24526d044fd31c05430dcda42b5be3 Mon Sep 17 00:00:00 2001
From: Xingbo Jiang
Date: Wed, 9 Aug 2017 13:23:49 +0800
Subject: [PATCH] [SPARK-21608][SPARK-9221][SQL] Window rangeBetween() API should allow literal boundary

## What changes were proposed in this pull request?

The Window rangeBetween() API should allow a literal boundary, which means the window range frame can be computed over double/date/timestamp ORDER BY columns. An example of the use case:

```
SELECT
    val_timestamp,
    cate,
    avg(val_timestamp) OVER(PARTITION BY cate ORDER BY val_timestamp
    RANGE BETWEEN CURRENT ROW AND interval 23 days 4 hours FOLLOWING)
FROM testData
```

This PR refactors the Window `rangeBetween` and `rowsBetween` APIs; legacy user code remains valid.

## How was this patch tested?

Added new test cases in both `DataFrameWindowFunctionsSuite` and `window.sql`.

Author: Xingbo Jiang

Closes #18814 from jiangxb1987/literal-boundary.
---
 .../sql/catalyst/analysis/TypeCoercion.scala  |   9 +-
 .../expressions/windowExpressions.scala       |   8 +-
 .../sql/execution/window/WindowExec.scala     |   9 +-
 .../apache/spark/sql/expressions/Window.scala |  63 ++++++-
 .../spark/sql/expressions/WindowSpec.scala    |  69 ++++++-
 .../org/apache/spark/sql/functions.scala      |  27 +++
 .../resources/sql-tests/inputs/window.sql     |  22 ++-
 .../sql-tests/results/window.sql.out          | 175 ++++++++++++------
 .../sql/DataFrameWindowFunctionsSuite.scala   |  53 +++++-
 9 files changed, 352 insertions(+), 83 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
index 25af014f67..06d8350db9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
@@ -821,9 +821,12 @@ object TypeCoercion {
     }
 
     private def createBoundaryCast(boundary: Expression, dt: DataType): Expression = {
-      boundary match {
-        case e: SpecialFrameBoundary => e
-        case e: Expression if e.dataType != dt && Cast.canCast(e.dataType, dt) => Cast(e, dt)
+      (boundary, dt) match {
+        case (e: SpecialFrameBoundary, _) => e
+        case (e, _: DateType) => e
+        case (e, _: TimestampType) => e
+        case (e: Expression, t) if e.dataType != t && Cast.canCast(e.dataType, t) =>
+          Cast(e, t)
         case _ => boundary
       }
     }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
index a829dccfd3..e11e3a105f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
@@ -89,7 +89,11 @@ case class WindowSpecDefinition(
     elements.mkString("(", " ", ")")
   }
 
-  private def isValidFrameType(ft: DataType): Boolean = orderSpec.head.dataType == ft
+  private def isValidFrameType(ft: DataType): Boolean = (orderSpec.head.dataType, ft) match {
+    case (DateType, IntegerType) => true
+    case (TimestampType, CalendarIntervalType) => true
+    case (a, b) => a == b
+  }
 }
 
 /**
@@ -129,7 +133,7 @@ case object RowFrame extends FrameType {
  * of the current row.
 */
 case object RangeFrame extends FrameType {
-  override def inputType: AbstractDataType = NumericType
+  override def inputType: AbstractDataType = TypeCollection.NumericAndInterval
 
   override def sql: String = "RANGE"
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
index 0766e37826..f8bb667e30 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
@@ -25,7 +25,9 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.{ExternalAppendOnlyUnsafeRowArray, SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.types.{CalendarIntervalType, DateType, IntegerType, TimestampType}
 
 /**
  * This class calculates and outputs (windowed) aggregates over the rows in a single (sorted)
@@ -139,7 +141,12 @@ case class WindowExec(
       }
 
       // Create the projection which returns the current 'value' modified by adding the offset.
-      val boundExpr = Add(expr, Cast(boundOffset, expr.dataType))
+      val boundExpr = (expr.dataType, boundOffset.dataType) match {
+        case (DateType, IntegerType) => DateAdd(expr, boundOffset)
+        case (TimestampType, CalendarIntervalType) =>
+          TimeAdd(expr, boundOffset, Some(conf.sessionLocalTimeZone))
+        case (a, b) if a == b => Add(expr, boundOffset)
+      }
       val bound = newMutableProjection(boundExpr :: Nil, child.output)
 
       // Construct the ordering. This is used to compare the result of current value projection
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala b/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala
index cd79128d8f..1caa243f8d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala
@@ -75,7 +75,7 @@ object Window {
   }
 
   /**
-   * Value representing the last row in the partition, equivalent to "UNBOUNDED PRECEDING" in SQL.
+   * Value representing the first row in the partition, equivalent to "UNBOUNDED PRECEDING" in SQL.
    * This can be used to specify the frame boundaries:
    *
    * {{{
@@ -167,17 +167,17 @@
    * current row.
    *
    * We recommend users use `Window.unboundedPreceding`, `Window.unboundedFollowing`,
-   * and `Window.currentRow` to specify special boundary values, rather than using integral
-   * values directly.
+   * and `Window.currentRow` to specify special boundary values, rather than using long values
+   * directly.
    *
    * A range-based boundary is based on the actual value of the ORDER BY
    * expression(s). An offset is used to alter the value of the ORDER BY expression, for
    * instance if the current order by expression has a value of 10 and the lower bound offset
    * is -3, the resulting lower bound for the current row will be 10 - 3 = 7. This however puts a
    * number of constraints on the ORDER BY expressions: there can be only one expression and this
-   * expression must have a numerical data type. An exception can be made when the offset is 0,
-   * because no value modification is needed, in this case multiple and non-numeric ORDER BY
-   * expression are allowed.
+   * expression must have a numerical data type. An exception can be made when the offset is
+   * unbounded, because no value modification is needed; in this case multiple and non-numeric
+   * ORDER BY expressions are allowed.
    *
    * {{{
    *   import org.apache.spark.sql.expressions.Window
@@ -210,6 +210,57 @@ object Window {
     spec.rangeBetween(start, end)
   }
 
+  /**
+   * Creates a [[WindowSpec]] with the frame boundaries defined,
+   * from `start` (inclusive) to `end` (inclusive).
+   *
+   * Both `start` and `end` are relative to the current row. For example, "lit(0)" means
+   * "current row", while "lit(-1)" means one off before the current row, and "lit(5)" means
+   * five off after the current row.
+   *
+   * Users should use `unboundedPreceding()`, `unboundedFollowing()`, and `currentRow()` from
+   * [[org.apache.spark.sql.functions]] to specify special boundary values; literals are not
+   * transformed to [[org.apache.spark.sql.catalyst.expressions.SpecialFrameBoundary]]s.
+   *
+   * A range-based boundary is based on the actual value of the ORDER BY
+   * expression(s). An offset is used to alter the value of the ORDER BY expression, for
+   * instance if the current order by expression has a value of 10 and the lower bound offset
+   * is -3, the resulting lower bound for the current row will be 10 - 3 = 7. This however puts a
+   * number of constraints on the ORDER BY expressions: there can be only one expression and this
+   * expression must have a numerical/date/timestamp data type. An exception can be made when the
+   * offset is unbounded, because no value modification is needed; in this case multiple and
+   * non-numerical/date/timestamp data type ORDER BY expressions are allowed.
+   *
+   * {{{
+   *   import org.apache.spark.sql.expressions.Window
+   *   val df = Seq((1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b"))
+   *     .toDF("id", "category")
+   *   val byCategoryOrderedById =
+   *     Window.partitionBy('category).orderBy('id).rangeBetween(currentRow(), lit(1))
+   *   df.withColumn("sum", sum('id) over byCategoryOrderedById).show()
+   *
+   *   +---+--------+---+
+   *   | id|category|sum|
+   *   +---+--------+---+
+   *   |  1|       b|  3|
+   *   |  2|       b|  5|
+   *   |  3|       b|  3|
+   *   |  1|       a|  4|
+   *   |  1|       a|  4|
+   *   |  2|       a|  2|
+   *   +---+--------+---+
+   * }}}
+   *
+   * @param start boundary start, inclusive. The frame is unbounded if the expression is
+   *              [[org.apache.spark.sql.catalyst.expressions.UnboundedPreceding]].
+   * @param end boundary end, inclusive. The frame is unbounded if the expression is
+   *            [[org.apache.spark.sql.catalyst.expressions.UnboundedFollowing]].
+   * @since 2.3.0
+   */
+  def rangeBetween(start: Column, end: Column): WindowSpec = {
+    spec.rangeBetween(start, end)
+  }
+
   private[sql] def spec: WindowSpec = {
     new WindowSpec(Seq.empty, Seq.empty, UnspecifiedFrame)
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala b/sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala
index f8b404de77..4c41aa3c5f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala
@@ -146,22 +146,22 @@ class WindowSpec private[sql](
   /**
    * Defines the frame boundaries, from `start` (inclusive) to `end` (inclusive).
    *
-   * Both `start` and `end` are relative from the current row. For example, "0" means "current row",
-   * while "-1" means one off before the current row, and "5" means the five off after the
-   * current row.
+   * Both `start` and `end` are relative to the current row. For example, "0" means
For example, "0" means + * "current row", while "-1" means one off before the current row, and "5" means the five off + * after the current row. * * We recommend users use `Window.unboundedPreceding`, `Window.unboundedFollowing`, - * and `Window.currentRow` to specify special boundary values, rather than using integral - * values directly. + * and `Window.currentRow` to specify special boundary values, rather than using long values + * directly. * * A range-based boundary is based on the actual value of the ORDER BY * expression(s). An offset is used to alter the value of the ORDER BY expression, for * instance if the current order by expression has a value of 10 and the lower bound offset * is -3, the resulting lower bound for the current row will be 10 - 3 = 7. This however puts a * number of constraints on the ORDER BY expressions: there can be only one expression and this - * expression must have a numerical data type. An exception can be made when the offset is 0, - * because no value modification is needed, in this case multiple and non-numeric ORDER BY - * expression are allowed. + * expression must have a numerical data type. An exception can be made when the offset is + * unbounded, because no value modification is needed, in this case multiple and non-numeric + * ORDER BY expression are allowed. * * {{{ * import org.apache.spark.sql.expressions.Window @@ -209,6 +209,59 @@ class WindowSpec private[sql]( SpecifiedWindowFrame(RangeFrame, boundaryStart, boundaryEnd)) } + /** + * Defines the frame boundaries, from `start` (inclusive) to `end` (inclusive). + * + * Both `start` and `end` are relative to the current row. For example, "lit(0)" means + * "current row", while "lit(-1)" means one off before the current row, and "lit(5)" means the + * five off after the current row. + * + * Users should use `unboundedPreceding()`, `unboundedFollowing()`, and `currentRow()` from + * [[org.apache.spark.sql.functions]] to specify special boundary values, literals are not + * transformed to [[org.apache.spark.sql.catalyst.expressions.SpecialFrameBoundary]]s. + * + * A range-based boundary is based on the actual value of the ORDER BY + * expression(s). An offset is used to alter the value of the ORDER BY expression, for + * instance if the current order by expression has a value of 10 and the lower bound offset + * is -3, the resulting lower bound for the current row will be 10 - 3 = 7. This however puts a + * number of constraints on the ORDER BY expressions: there can be only one expression and this + * expression must have a numerical/date/timestamp data type. An exception can be made when the + * offset is unbounded, because no value modification is needed, in this case multiple and + * non-numerical/date/timestamp data type ORDER BY expression are allowed. + * + * {{{ + * import org.apache.spark.sql.expressions.Window + * val df = Seq((1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")) + * .toDF("id", "category") + * val byCategoryOrderedById = + * Window.partitionBy('category).orderBy('id).rangeBetween(currentRow(), lit(1)) + * df.withColumn("sum", sum('id) over byCategoryOrderedById).show() + * + * +---+--------+---+ + * | id|category|sum| + * +---+--------+---+ + * | 1| b| 3| + * | 2| b| 5| + * | 3| b| 3| + * | 1| a| 4| + * | 1| a| 4| + * | 2| a| 2| + * +---+--------+---+ + * }}} + * + * @param start boundary start, inclusive. The frame is unbounded if the expression is + * [[org.apache.spark.sql.catalyst.expressions.UnboundedPreceding]]. + * @param end boundary end, inclusive. 
The frame is unbounded if the expression is + * [[org.apache.spark.sql.catalyst.expressions.UnboundedFollowing]]. + * @since 2.3.0 + */ + def rangeBetween(start: Column, end: Column): WindowSpec = { + new WindowSpec( + partitionSpec, + orderSpec, + SpecifiedWindowFrame(RangeFrame, start.expr, end.expr)) + } + /** * Converts this [[WindowSpec]] into a [[Column]] with an aggregate expression. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index 496619a43a..14ab8a2665 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.plans.logical.{HintInfo, ResolvedHint} import org.apache.spark.sql.execution.SparkSqlParser import org.apache.spark.sql.expressions.UserDefinedFunction +import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.util.Utils @@ -777,6 +778,32 @@ object functions { ////////////////////////////////////////////////////////////////////////////////////////////// // Window functions ////////////////////////////////////////////////////////////////////////////////////////////// + /** + * Window function: returns the special frame boundary that represents the first row in the + * window partition. + * + * @group window_funcs + * @since 2.3.0 + */ + def unboundedPreceding(): Column = Column(UnboundedPreceding) + + /** + * Window function: returns the special frame boundary that represents the last row in the + * window partition. + * + * @group window_funcs + * @since 2.3.0 + */ + def unboundedFollowing(): Column = Column(UnboundedFollowing) + + /** + * Window function: returns the special frame boundary that represents the current row in the + * window partition. + * + * @group window_funcs + * @since 2.3.0 + */ + def currentRow(): Column = Column(CurrentRow) /** * Window function: returns the cumulative distribution of values within a window partition, diff --git a/sql/core/src/test/resources/sql-tests/inputs/window.sql b/sql/core/src/test/resources/sql-tests/inputs/window.sql index 342e5719e9..c4bea34ec4 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/window.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/window.sql @@ -1,8 +1,15 @@ -- Test data. 
CREATE OR REPLACE TEMPORARY VIEW testData AS SELECT * FROM VALUES -(null, 1L, "a"), (1, 1L, "a"), (1, 2L, "a"), (2, 2147483650L, "a"), (1, null, "b"), (2, 3L, "b"), -(3, 2147483650L, "b"), (null, null, null), (3, 1L, null) -AS testData(val, val_long, cate); +(null, 1L, 1.0D, date("2017-08-01"), timestamp(1501545600), "a"), +(1, 1L, 1.0D, date("2017-08-01"), timestamp(1501545600), "a"), +(1, 2L, 2.5D, date("2017-08-02"), timestamp(1502000000), "a"), +(2, 2147483650L, 100.001D, date("2020-12-31"), timestamp(1609372800), "a"), +(1, null, 1.0D, date("2017-08-01"), timestamp(1501545600), "b"), +(2, 3L, 3.3D, date("2017-08-03"), timestamp(1503000000), "b"), +(3, 2147483650L, 100.001D, date("2020-12-31"), timestamp(1609372800), "b"), +(null, null, null, null, null, null), +(3, 1L, 1.0D, date("2017-08-01"), timestamp(1501545600), null) +AS testData(val, val_long, val_double, val_date, val_timestamp, cate); -- RowsBetween SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY val ROWS CURRENT ROW) FROM testData @@ -19,6 +26,13 @@ SELECT val, cate, sum(val) OVER(PARTITION BY cate ORDER BY val RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, val; SELECT val_long, cate, sum(val_long) OVER(PARTITION BY cate ORDER BY val_long RANGE BETWEEN CURRENT ROW AND 2147483648 FOLLOWING) FROM testData ORDER BY cate, val_long; +SELECT val_double, cate, sum(val_double) OVER(PARTITION BY cate ORDER BY val_double +RANGE BETWEEN CURRENT ROW AND 2.5 FOLLOWING) FROM testData ORDER BY cate, val_double; +SELECT val_date, cate, max(val_date) OVER(PARTITION BY cate ORDER BY val_date +RANGE BETWEEN CURRENT ROW AND 2 FOLLOWING) FROM testData ORDER BY cate, val_date; +SELECT val_timestamp, cate, avg(val_timestamp) OVER(PARTITION BY cate ORDER BY val_timestamp +RANGE BETWEEN CURRENT ROW AND interval 23 days 4 hours FOLLOWING) FROM testData +ORDER BY cate, val_timestamp; -- RangeBetween with reverse OrderBy SELECT val, cate, sum(val) OVER(PARTITION BY cate ORDER BY val DESC @@ -31,7 +45,7 @@ SELECT val, cate, count(val) OVER(PARTITION BY cate RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, val; SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY val, cate RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, val; -SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY current_date +SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY current_timestamp RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, val; SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY val RANGE BETWEEN 1 FOLLOWING AND 1 PRECEDING) FROM testData ORDER BY cate, val; diff --git a/sql/core/src/test/resources/sql-tests/results/window.sql.out b/sql/core/src/test/resources/sql-tests/results/window.sql.out index 97511068b3..73ad27e5bf 100644 --- a/sql/core/src/test/resources/sql-tests/results/window.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/window.sql.out @@ -1,12 +1,19 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 19 +-- Number of queries: 22 -- !query 0 CREATE OR REPLACE TEMPORARY VIEW testData AS SELECT * FROM VALUES -(null, 1L, "a"), (1, 1L, "a"), (1, 2L, "a"), (2, 2147483650L, "a"), (1, null, "b"), (2, 3L, "b"), -(3, 2147483650L, "b"), (null, null, null), (3, 1L, null) -AS testData(val, val_long, cate) +(null, 1L, 1.0D, date("2017-08-01"), timestamp(1501545600), "a"), +(1, 1L, 1.0D, date("2017-08-01"), timestamp(1501545600), "a"), +(1, 2L, 2.5D, date("2017-08-02"), timestamp(1502000000), "a"), 
+(2, 2147483650L, 100.001D, date("2020-12-31"), timestamp(1609372800), "a"), +(1, null, 1.0D, date("2017-08-01"), timestamp(1501545600), "b"), +(2, 3L, 3.3D, date("2017-08-03"), timestamp(1503000000), "b"), +(3, 2147483650L, 100.001D, date("2020-12-31"), timestamp(1609372800), "b"), +(null, null, null, null, null, null), +(3, 1L, 1.0D, date("2017-08-01"), timestamp(1501545600), null) +AS testData(val, val_long, val_double, val_date, val_timestamp, cate) -- !query 0 schema struct<> -- !query 0 output @@ -109,11 +116,63 @@ NULL b NULL -- !query 7 +SELECT val_double, cate, sum(val_double) OVER(PARTITION BY cate ORDER BY val_double +RANGE BETWEEN CURRENT ROW AND 2.5 FOLLOWING) FROM testData ORDER BY cate, val_double +-- !query 7 schema +struct +-- !query 7 output +NULL NULL NULL +1.0 NULL 1.0 +1.0 a 4.5 +1.0 a 4.5 +2.5 a 2.5 +100.001 a 100.001 +1.0 b 4.3 +3.3 b 3.3 +100.001 b 100.001 + + +-- !query 8 +SELECT val_date, cate, max(val_date) OVER(PARTITION BY cate ORDER BY val_date +RANGE BETWEEN CURRENT ROW AND 2 FOLLOWING) FROM testData ORDER BY cate, val_date +-- !query 8 schema +struct +-- !query 8 output +NULL NULL NULL +2017-08-01 NULL 2017-08-01 +2017-08-01 a 2017-08-02 +2017-08-01 a 2017-08-02 +2017-08-02 a 2017-08-02 +2020-12-31 a 2020-12-31 +2017-08-01 b 2017-08-03 +2017-08-03 b 2017-08-03 +2020-12-31 b 2020-12-31 + + +-- !query 9 +SELECT val_timestamp, cate, avg(val_timestamp) OVER(PARTITION BY cate ORDER BY val_timestamp +RANGE BETWEEN CURRENT ROW AND interval 23 days 4 hours FOLLOWING) FROM testData +ORDER BY cate, val_timestamp +-- !query 9 schema +struct +-- !query 9 output +NULL NULL NULL +2017-07-31 17:00:00 NULL 1.5015456E9 +2017-07-31 17:00:00 a 1.5016970666666667E9 +2017-07-31 17:00:00 a 1.5016970666666667E9 +2017-08-05 23:13:20 a 1.502E9 +2020-12-30 16:00:00 a 1.6093728E9 +2017-07-31 17:00:00 b 1.5022728E9 +2017-08-17 13:00:00 b 1.503E9 +2020-12-30 16:00:00 b 1.6093728E9 + + +-- !query 10 SELECT val, cate, sum(val) OVER(PARTITION BY cate ORDER BY val DESC RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, val --- !query 7 schema +-- !query 10 schema struct --- !query 7 output +-- !query 10 output NULL NULL NULL 3 NULL 3 NULL a NULL @@ -125,62 +184,62 @@ NULL a NULL 3 b 5 --- !query 8 +-- !query 11 SELECT val, cate, count(val) OVER(PARTITION BY cate ROWS BETWEEN UNBOUNDED FOLLOWING AND 1 FOLLOWING) FROM testData ORDER BY cate, val --- !query 8 schema -struct<> --- !query 8 output -org.apache.spark.sql.AnalysisException -cannot resolve 'ROWS BETWEEN UNBOUNDED FOLLOWING AND 1 FOLLOWING' due to data type mismatch: Window frame upper bound '1' does not followes the lower bound 'unboundedfollowing$()'.; line 1 pos 33 - - --- !query 9 -SELECT val, cate, count(val) OVER(PARTITION BY cate -RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, val --- !query 9 schema -struct<> --- !query 9 output -org.apache.spark.sql.AnalysisException -cannot resolve '(PARTITION BY testdata.`cate` RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING)' due to data type mismatch: A range window frame cannot be used in an unordered window specification.; line 1 pos 33 - - --- !query 10 -SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY val, cate -RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, val --- !query 10 schema -struct<> --- !query 10 output -org.apache.spark.sql.AnalysisException -cannot resolve '(PARTITION BY testdata.`cate` ORDER BY testdata.`val` ASC NULLS FIRST, testdata.`cate` ASC NULLS FIRST RANGE BETWEEN CURRENT ROW AND 1 
FOLLOWING)' due to data type mismatch: A range window frame with value boundaries cannot be used in a window specification with multiple order by expressions: val#x ASC NULLS FIRST,cate#x ASC NULLS FIRST; line 1 pos 33 - - --- !query 11 -SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY current_date -RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, val -- !query 11 schema struct<> -- !query 11 output org.apache.spark.sql.AnalysisException -cannot resolve '(PARTITION BY testdata.`cate` ORDER BY current_date() ASC NULLS FIRST RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING)' due to data type mismatch: The data type 'DateType' used in the order specification does not match the data type 'IntegerType' which is used in the range frame.; line 1 pos 33 +cannot resolve 'ROWS BETWEEN UNBOUNDED FOLLOWING AND 1 FOLLOWING' due to data type mismatch: Window frame upper bound '1' does not followes the lower bound 'unboundedfollowing$()'.; line 1 pos 33 -- !query 12 -SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY val -RANGE BETWEEN 1 FOLLOWING AND 1 PRECEDING) FROM testData ORDER BY cate, val +SELECT val, cate, count(val) OVER(PARTITION BY cate +RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, val -- !query 12 schema struct<> -- !query 12 output org.apache.spark.sql.AnalysisException -cannot resolve 'RANGE BETWEEN 1 FOLLOWING AND 1 PRECEDING' due to data type mismatch: The lower bound of a window frame must be less than or equal to the upper bound; line 1 pos 33 +cannot resolve '(PARTITION BY testdata.`cate` RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING)' due to data type mismatch: A range window frame cannot be used in an unordered window specification.; line 1 pos 33 -- !query 13 -SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY val -RANGE BETWEEN CURRENT ROW AND current_date PRECEDING) FROM testData ORDER BY cate, val +SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY val, cate +RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, val -- !query 13 schema struct<> -- !query 13 output +org.apache.spark.sql.AnalysisException +cannot resolve '(PARTITION BY testdata.`cate` ORDER BY testdata.`val` ASC NULLS FIRST, testdata.`cate` ASC NULLS FIRST RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING)' due to data type mismatch: A range window frame with value boundaries cannot be used in a window specification with multiple order by expressions: val#x ASC NULLS FIRST,cate#x ASC NULLS FIRST; line 1 pos 33 + + +-- !query 14 +SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY current_timestamp +RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, val +-- !query 14 schema +struct<> +-- !query 14 output +org.apache.spark.sql.AnalysisException +cannot resolve '(PARTITION BY testdata.`cate` ORDER BY current_timestamp() ASC NULLS FIRST RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING)' due to data type mismatch: The data type 'TimestampType' used in the order specification does not match the data type 'IntegerType' which is used in the range frame.; line 1 pos 33 + + +-- !query 15 +SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY val +RANGE BETWEEN 1 FOLLOWING AND 1 PRECEDING) FROM testData ORDER BY cate, val +-- !query 15 schema +struct<> +-- !query 15 output +org.apache.spark.sql.AnalysisException +cannot resolve 'RANGE BETWEEN 1 FOLLOWING AND 1 PRECEDING' due to data type mismatch: The lower bound of a window frame must be less than or equal to the upper bound; line 1 pos 33 + + +-- !query 
16 +SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY val +RANGE BETWEEN CURRENT ROW AND current_date PRECEDING) FROM testData ORDER BY cate, val +-- !query 16 schema +struct<> +-- !query 16 output org.apache.spark.sql.catalyst.parser.ParseException Frame bound value must be a literal.(line 2, pos 30) @@ -191,7 +250,7 @@ RANGE BETWEEN CURRENT ROW AND current_date PRECEDING) FROM testData ORDER BY cat ------------------------------^^^ --- !query 14 +-- !query 17 SELECT val, cate, max(val) OVER w AS max, min(val) OVER w AS min, @@ -218,9 +277,9 @@ approx_count_distinct(val) OVER w AS approx_count_distinct FROM testData WINDOW w AS (PARTITION BY cate ORDER BY val) ORDER BY cate, val --- !query 14 schema +-- !query 17 schema struct --- !query 14 output +-- !query 17 output NULL NULL NULL NULL NULL 0 NULL NULL NULL NULL NULL NULL NULL NULL NULL 1 1 0.5 0.0 1 1 NULL NULL 0 3 NULL 3 3 3 1 3 3.0 NaN NULL 3 NULL 3 3 3 2 2 1.0 1.0 2 2 0.0 NaN 1 NULL a NULL NULL NULL 0 NULL NULL NULL NULL NULL NULL NULL NULL NULL 1 1 0.25 0.0 1 1 NULL NULL 0 @@ -232,11 +291,11 @@ NULL a NULL NULL NULL 0 NULL NULL NULL NULL NULL NULL NULL NULL NULL 1 1 0.25 0. 3 b 3 1 1 3 6 2.0 1.0 1 1 1 3 3 3 3 3 1.0 1.0 2 3 0.6666666666666666 1.0 3 --- !query 15 +-- !query 18 SELECT val, cate, avg(null) OVER(PARTITION BY cate ORDER BY val) FROM testData ORDER BY cate, val --- !query 15 schema +-- !query 18 schema struct --- !query 15 output +-- !query 18 output NULL NULL NULL 3 NULL NULL NULL a NULL @@ -248,20 +307,20 @@ NULL a NULL 3 b NULL --- !query 16 +-- !query 19 SELECT val, cate, row_number() OVER(PARTITION BY cate) FROM testData ORDER BY cate, val --- !query 16 schema +-- !query 19 schema struct<> --- !query 16 output +-- !query 19 output org.apache.spark.sql.AnalysisException Window function row_number() requires window to be ordered, please add ORDER BY clause. 
For example SELECT row_number()(value_expr) OVER (PARTITION BY window_partition ORDER BY window_ordering) from table; --- !query 17 +-- !query 20 SELECT val, cate, sum(val) OVER(), avg(val) OVER() FROM testData ORDER BY cate, val --- !query 17 schema +-- !query 20 schema struct --- !query 17 output +-- !query 20 output NULL NULL 13 1.8571428571428572 3 NULL 13 1.8571428571428572 NULL a 13 1.8571428571428572 @@ -273,7 +332,7 @@ NULL a 13 1.8571428571428572 3 b 13 1.8571428571428572 --- !query 18 +-- !query 21 SELECT val, cate, first_value(false) OVER w AS first_value, first_value(true, true) OVER w AS first_value_ignore_null, @@ -284,9 +343,9 @@ last_value(false, false) OVER w AS last_value_contain_null FROM testData WINDOW w AS () ORDER BY cate, val --- !query 18 schema +-- !query 21 schema struct --- !query 18 output +-- !query 21 output NULL NULL false true false false true false 3 NULL false true false false true false NULL a false true false false true false diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala index 9806e57f08..ea725af8d1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala @@ -17,11 +17,14 @@ package org.apache.spark.sql +import java.sql.{Date, Timestamp} + import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction, Window} import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.CalendarInterval /** * Window function testing for DataFrame API. 
@@ -172,7 +175,7 @@ class DataFrameWindowFunctionsSuite extends QueryTest with SharedSQLContext { assert(e.message.contains("Boundary end is not a valid integer: 2147483648")) } - test("range between should accept integer/long values as boundary") { + test("range between should accept int/long values as boundary") { val df = Seq((1L, "1"), (1L, "1"), (2147483650L, "1"), (3L, "2"), (2L, "1"), (2147483650L, "2")) .toDF("key", "value") @@ -191,6 +194,54 @@ class DataFrameWindowFunctionsSuite extends QueryTest with SharedSQLContext { Window.partitionBy($"value").orderBy($"key").rangeBetween(-2147483649L, 0))), Seq(Row(1, 2), Row(1, 2), Row(2, 3), Row(2147483650L, 2), Row(2147483650L, 4), Row(3, 1)) ) + + def dt(date: String): Date = Date.valueOf(date) + + val df2 = Seq((dt("2017-08-01"), "1"), (dt("2017-08-01"), "1"), (dt("2020-12-31"), "1"), + (dt("2017-08-03"), "2"), (dt("2017-08-02"), "1"), (dt("2020-12-31"), "2")) + .toDF("key", "value") + checkAnswer( + df2.select( + $"key", + count("key").over( + Window.partitionBy($"value").orderBy($"key").rangeBetween(lit(0), lit(2)))), + Seq(Row(dt("2017-08-01"), 3), Row(dt("2017-08-01"), 3), Row(dt("2020-12-31"), 1), + Row(dt("2017-08-03"), 1), Row(dt("2017-08-02"), 1), Row(dt("2020-12-31"), 1)) + ) + } + + test("range between should accept double values as boundary") { + val df = Seq((1.0D, "1"), (1.0D, "1"), (100.001D, "1"), + (3.3D, "2"), (2.02D, "1"), (100.001D, "2")) + .toDF("key", "value") + df.createOrReplaceTempView("window_table") + checkAnswer( + df.select( + $"key", + count("key").over( + Window.partitionBy($"value").orderBy($"key") + .rangeBetween(currentRow, lit(2.5D)))), + Seq(Row(1.0, 3), Row(1.0, 3), Row(100.001, 1), Row(3.3, 1), Row(2.02, 1), Row(100.001, 1)) + ) + } + + test("range between should accept interval values as boundary") { + def ts(timestamp: Long): Timestamp = new Timestamp(timestamp * 1000) + + val df = Seq((ts(1501545600), "1"), (ts(1501545600), "1"), (ts(1609372800), "1"), + (ts(1503000000), "2"), (ts(1502000000), "1"), (ts(1609372800), "2")) + .toDF("key", "value") + df.createOrReplaceTempView("window_table") + checkAnswer( + df.select( + $"key", + count("key").over( + Window.partitionBy($"value").orderBy($"key") + .rangeBetween(currentRow, + lit(CalendarInterval.fromString("interval 23 days 4 hours"))))), + Seq(Row(ts(1501545600), 3), Row(ts(1501545600), 3), Row(ts(1609372800), 1), + Row(ts(1503000000), 1), Row(ts(1502000000), 1), Row(ts(1609372800), 1)) + ) } test("aggregation and rows between with unbounded") {
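
---

Reviewer note, not part of the patch: below is a minimal, self-contained sketch of how the new literal-boundary API introduced here fits together end to end, using only calls this PR adds or exercises (`WindowSpec.rangeBetween(Column, Column)`, `functions.currentRow()`, and a `CalendarInterval` literal via `lit`). The object name, sample data, and local-mode session are illustrative assumptions.

```scala
import java.sql.Timestamp

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{count, currentRow, lit}
import org.apache.spark.unsafe.types.CalendarInterval

object RangeBetweenLiteralDemo {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("demo").getOrCreate()
    import spark.implicits._

    // Unix seconds -> java.sql.Timestamp, mirroring the ts() helper in the suite above.
    def ts(seconds: Long): Timestamp = new Timestamp(seconds * 1000)

    val df = Seq(
      (ts(1501545600L), "1"), (ts(1502000000L), "1"), (ts(1609372800L), "1"),
      (ts(1503000000L), "2")
    ).toDF("key", "value")

    // With a timestamp ORDER BY column, the upper frame boundary may now be a
    // CalendarInterval literal: each frame covers [key, key + 23 days 4 hours].
    val w = Window
      .partitionBy($"value")
      .orderBy($"key")
      .rangeBetween(currentRow(), lit(CalendarInterval.fromString("interval 23 days 4 hours")))

    df.withColumn("cnt", count("key").over(w)).show(truncate = false)

    spark.stop()
  }
}
```

The same frame can be declared through the new static `Window.rangeBetween(start: Column, end: Column)` as well, and the legacy `rangeBetween(start: Long, end: Long)` overload continues to work for numeric ORDER BY columns.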