[SPARK-21608][SPARK-9221][SQL] Window rangeBetween() API should allow literal boundary

## What changes were proposed in this pull request?

The Window rangeBetween() API should accept literal boundaries, so that a range frame can be computed over double/date/timestamp ORDER BY columns.

An example use case:
```
SELECT
	val_timestamp,
	cate,
	avg(val_timestamp) OVER(PARTITION BY cate ORDER BY val_timestamp RANGE BETWEEN CURRENT ROW AND interval 23 days 4 hours FOLLOWING)
FROM testData
```

This PR refactors the Window `rangeBetween` and `rowsBetween` APIs; legacy user code remains valid.
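For illustration, a minimal sketch of the equivalent DataFrame API call using the new Column-based `rangeBetween` (assuming a `testData` DataFrame with the columns from the SQL example, a SparkSession `spark`, and `import spark.implicits._`):

```
// Sketch only: mirrors the SQL example above with the new Column-based API.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.unsafe.types.CalendarInterval

val byCateOrderedByTimestamp = Window
  .partitionBy($"cate")
  .orderBy($"val_timestamp")
  .rangeBetween(
    currentRow(),
    lit(CalendarInterval.fromString("interval 23 days 4 hours")))

testData.select(
  $"val_timestamp",
  $"cate",
  avg($"val_timestamp").over(byCateOrderedByTimestamp))
```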

## How was this patch tested?

Add new test cases both in `DataFrameWindowFunctionsSuite` and in `window.sql`.

Author: Xingbo Jiang <xingbo.jiang@databricks.com>

Closes #18814 from jiangxb1987/literal-boundary.
Xingbo Jiang 2017-08-09 13:23:49 +08:00 committed by Wenchen Fan
parent 6edfff055c
commit 031910b0ec
9 changed files with 352 additions and 83 deletions


@@ -821,9 +821,12 @@ object TypeCoercion {
}
private def createBoundaryCast(boundary: Expression, dt: DataType): Expression = {
boundary match {
case e: SpecialFrameBoundary => e
case e: Expression if e.dataType != dt && Cast.canCast(e.dataType, dt) => Cast(e, dt)
(boundary, dt) match {
case (e: SpecialFrameBoundary, _) => e
case (e, _: DateType) => e
case (e, _: TimestampType) => e
case (e: Expression, t) if e.dataType != t && Cast.canCast(e.dataType, t) =>
Cast(e, t)
case _ => boundary
}
}
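As a hedged usage sketch (not part of the diff), the rule above leaves an integer boundary over a `DateType` ORDER BY column uncast, so it acts as a day offset; this mirrors the new test in `DataFrameWindowFunctionsSuite` further down. Column names are illustrative and `import spark.implicits._` is assumed:

```
// Sketch only: integer range boundaries over a DATE order column are kept as
// integers by createBoundaryCast and interpreted as day offsets (via DateAdd).
import java.sql.Date
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val df = Seq(
  (Date.valueOf("2017-08-01"), "1"),
  (Date.valueOf("2017-08-02"), "1"),
  (Date.valueOf("2020-12-31"), "2")).toDF("key", "value")

val frame = Window.partitionBy($"value").orderBy($"key").rangeBetween(lit(0), lit(2))

df.select($"key", count("key").over(frame)).show()
```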


@@ -89,7 +89,11 @@ case class WindowSpecDefinition(
elements.mkString("(", " ", ")")
}
private def isValidFrameType(ft: DataType): Boolean = orderSpec.head.dataType == ft
private def isValidFrameType(ft: DataType): Boolean = (orderSpec.head.dataType, ft) match {
case (DateType, IntegerType) => true
case (TimestampType, CalendarIntervalType) => true
case (a, b) => a == b
}
}
/**
@@ -129,7 +133,7 @@ case object RowFrame extends FrameType {
* of the current row.
*/
case object RangeFrame extends FrameType {
override def inputType: AbstractDataType = NumericType
override def inputType: AbstractDataType = TypeCollection.NumericAndInterval
override def sql: String = "RANGE"
}


@@ -25,7 +25,9 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.{ExternalAppendOnlyUnsafeRowArray, SparkPlan, UnaryExecNode}
import org.apache.spark.sql.types.{CalendarIntervalType, DateType, IntegerType, TimestampType}
/**
* This class calculates and outputs (windowed) aggregates over the rows in a single (sorted)
@@ -139,7 +141,12 @@ case class WindowExec(
}
// Create the projection which returns the current 'value' modified by adding the offset.
val boundExpr = Add(expr, Cast(boundOffset, expr.dataType))
val boundExpr = (expr.dataType, boundOffset.dataType) match {
case (DateType, IntegerType) => DateAdd(expr, boundOffset)
case (TimestampType, CalendarIntervalType) =>
TimeAdd(expr, boundOffset, Some(conf.sessionLocalTimeZone))
case (a, b) if a == b => Add(expr, boundOffset)
}
val bound = newMutableProjection(boundExpr :: Nil, child.output)
// Construct the ordering. This is used to compare the result of current value projection


@@ -75,7 +75,7 @@ object Window {
}
/**
* Value representing the last row in the partition, equivalent to "UNBOUNDED PRECEDING" in SQL.
* Value representing the first row in the partition, equivalent to "UNBOUNDED PRECEDING" in SQL.
* This can be used to specify the frame boundaries:
*
* {{{
@@ -167,17 +167,17 @@ object Window {
* current row.
*
* We recommend users use `Window.unboundedPreceding`, `Window.unboundedFollowing`,
* and `Window.currentRow` to specify special boundary values, rather than using integral
* values directly.
* and `Window.currentRow` to specify special boundary values, rather than using long values
* directly.
*
* A range-based boundary is based on the actual value of the ORDER BY
* expression(s). An offset is used to alter the value of the ORDER BY expression, for
* instance if the current order by expression has a value of 10 and the lower bound offset
* is -3, the resulting lower bound for the current row will be 10 - 3 = 7. This however puts a
* number of constraints on the ORDER BY expressions: there can be only one expression and this
* expression must have a numerical data type. An exception can be made when the offset is 0,
* because no value modification is needed, in this case multiple and non-numeric ORDER BY
* expression are allowed.
* expression must have a numerical data type. An exception can be made when the offset is
* unbounded, because no value modification is needed, in this case multiple and non-numeric
* ORDER BY expression are allowed.
*
* {{{
* import org.apache.spark.sql.expressions.Window
@@ -210,6 +210,57 @@ object Window {
spec.rangeBetween(start, end)
}
/**
* Creates a [[WindowSpec]] with the frame boundaries defined,
* from `start` (inclusive) to `end` (inclusive).
*
* Both `start` and `end` are relative to the current row. For example, "lit(0)" means
* "current row", while "lit(-1)" means one off before the current row, and "lit(5)" means the
* five off after the current row.
*
* Users should use `unboundedPreceding()`, `unboundedFollowing()`, and `currentRow()` from
* [[org.apache.spark.sql.functions]] to specify special boundary values, literals are not
* transformed to [[org.apache.spark.sql.catalyst.expressions.SpecialFrameBoundary]]s.
*
* A range-based boundary is based on the actual value of the ORDER BY
* expression(s). An offset is used to alter the value of the ORDER BY expression, for
* instance if the current order by expression has a value of 10 and the lower bound offset
* is -3, the resulting lower bound for the current row will be 10 - 3 = 7. This however puts a
* number of constraints on the ORDER BY expressions: there can be only one expression and this
* expression must have a numerical/date/timestamp data type. An exception can be made when the
* offset is unbounded, because no value modification is needed, in this case multiple and
* non-numerical/date/timestamp data type ORDER BY expression are allowed.
*
* {{{
* import org.apache.spark.sql.expressions.Window
* val df = Seq((1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b"))
* .toDF("id", "category")
* val byCategoryOrderedById =
* Window.partitionBy('category).orderBy('id).rangeBetween(currentRow(), lit(1))
* df.withColumn("sum", sum('id) over byCategoryOrderedById).show()
*
* +---+--------+---+
* | id|category|sum|
* +---+--------+---+
* | 1| b| 3|
* | 2| b| 5|
* | 3| b| 3|
* | 1| a| 4|
* | 1| a| 4|
* | 2| a| 2|
* +---+--------+---+
* }}}
*
* @param start boundary start, inclusive. The frame is unbounded if the expression is
* [[org.apache.spark.sql.catalyst.expressions.UnboundedPreceding]].
* @param end boundary end, inclusive. The frame is unbounded if the expression is
* [[org.apache.spark.sql.catalyst.expressions.UnboundedFollowing]].
* @since 2.3.0
*/
def rangeBetween(start: Column, end: Column): WindowSpec = {
spec.rangeBetween(start, end)
}
private[sql] def spec: WindowSpec = {
new WindowSpec(Seq.empty, Seq.empty, UnspecifiedFrame)
}


@@ -146,22 +146,22 @@ class WindowSpec private[sql](
/**
* Defines the frame boundaries, from `start` (inclusive) to `end` (inclusive).
*
* Both `start` and `end` are relative from the current row. For example, "0" means "current row",
* while "-1" means one off before the current row, and "5" means the five off after the
* current row.
* Both `start` and `end` are relative from the current row. For example, "0" means
* "current row", while "-1" means one off before the current row, and "5" means the five off
* after the current row.
*
* We recommend users use `Window.unboundedPreceding`, `Window.unboundedFollowing`,
* and `Window.currentRow` to specify special boundary values, rather than using integral
* values directly.
* and `Window.currentRow` to specify special boundary values, rather than using long values
* directly.
*
* A range-based boundary is based on the actual value of the ORDER BY
* expression(s). An offset is used to alter the value of the ORDER BY expression, for
* instance if the current order by expression has a value of 10 and the lower bound offset
* is -3, the resulting lower bound for the current row will be 10 - 3 = 7. This however puts a
* number of constraints on the ORDER BY expressions: there can be only one expression and this
* expression must have a numerical data type. An exception can be made when the offset is 0,
* because no value modification is needed, in this case multiple and non-numeric ORDER BY
* expression are allowed.
* expression must have a numerical data type. An exception can be made when the offset is
* unbounded, because no value modification is needed, in this case multiple and non-numeric
* ORDER BY expression are allowed.
*
* {{{
* import org.apache.spark.sql.expressions.Window
@@ -209,6 +209,59 @@ class WindowSpec private[sql](
SpecifiedWindowFrame(RangeFrame, boundaryStart, boundaryEnd))
}
/**
* Defines the frame boundaries, from `start` (inclusive) to `end` (inclusive).
*
* Both `start` and `end` are relative to the current row. For example, "lit(0)" means
* "current row", while "lit(-1)" means one off before the current row, and "lit(5)" means the
* five off after the current row.
*
* Users should use `unboundedPreceding()`, `unboundedFollowing()`, and `currentRow()` from
* [[org.apache.spark.sql.functions]] to specify special boundary values, literals are not
* transformed to [[org.apache.spark.sql.catalyst.expressions.SpecialFrameBoundary]]s.
*
* A range-based boundary is based on the actual value of the ORDER BY
* expression(s). An offset is used to alter the value of the ORDER BY expression, for
* instance if the current order by expression has a value of 10 and the lower bound offset
* is -3, the resulting lower bound for the current row will be 10 - 3 = 7. This however puts a
* number of constraints on the ORDER BY expressions: there can be only one expression and this
* expression must have a numerical/date/timestamp data type. An exception can be made when the
* offset is unbounded, because no value modification is needed, in this case multiple and
* non-numerical/date/timestamp data type ORDER BY expression are allowed.
*
* {{{
* import org.apache.spark.sql.expressions.Window
* val df = Seq((1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b"))
* .toDF("id", "category")
* val byCategoryOrderedById =
* Window.partitionBy('category).orderBy('id).rangeBetween(currentRow(), lit(1))
* df.withColumn("sum", sum('id) over byCategoryOrderedById).show()
*
* +---+--------+---+
* | id|category|sum|
* +---+--------+---+
* | 1| b| 3|
* | 2| b| 5|
* | 3| b| 3|
* | 1| a| 4|
* | 1| a| 4|
* | 2| a| 2|
* +---+--------+---+
* }}}
*
* @param start boundary start, inclusive. The frame is unbounded if the expression is
* [[org.apache.spark.sql.catalyst.expressions.UnboundedPreceding]].
* @param end boundary end, inclusive. The frame is unbounded if the expression is
* [[org.apache.spark.sql.catalyst.expressions.UnboundedFollowing]].
* @since 2.3.0
*/
def rangeBetween(start: Column, end: Column): WindowSpec = {
new WindowSpec(
partitionSpec,
orderSpec,
SpecifiedWindowFrame(RangeFrame, start.expr, end.expr))
}
/**
* Converts this [[WindowSpec]] into a [[Column]] with an aggregate expression.
*/


@@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.plans.logical.{HintInfo, ResolvedHint}
import org.apache.spark.sql.execution.SparkSqlParser
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -777,6 +778,32 @@ object functions {
//////////////////////////////////////////////////////////////////////////////////////////////
// Window functions
//////////////////////////////////////////////////////////////////////////////////////////////
/**
* Window function: returns the special frame boundary that represents the first row in the
* window partition.
*
* @group window_funcs
* @since 2.3.0
*/
def unboundedPreceding(): Column = Column(UnboundedPreceding)
/**
* Window function: returns the special frame boundary that represents the last row in the
* window partition.
*
* @group window_funcs
* @since 2.3.0
*/
def unboundedFollowing(): Column = Column(UnboundedFollowing)
/**
* Window function: returns the special frame boundary that represents the current row in the
* window partition.
*
* @group window_funcs
* @since 2.3.0
*/
def currentRow(): Column = Column(CurrentRow)
/**
* Window function: returns the cumulative distribution of values within a window partition,


@@ -1,8 +1,15 @@
-- Test data.
CREATE OR REPLACE TEMPORARY VIEW testData AS SELECT * FROM VALUES
(null, 1L, "a"), (1, 1L, "a"), (1, 2L, "a"), (2, 2147483650L, "a"), (1, null, "b"), (2, 3L, "b"),
(3, 2147483650L, "b"), (null, null, null), (3, 1L, null)
AS testData(val, val_long, cate);
(null, 1L, 1.0D, date("2017-08-01"), timestamp(1501545600), "a"),
(1, 1L, 1.0D, date("2017-08-01"), timestamp(1501545600), "a"),
(1, 2L, 2.5D, date("2017-08-02"), timestamp(1502000000), "a"),
(2, 2147483650L, 100.001D, date("2020-12-31"), timestamp(1609372800), "a"),
(1, null, 1.0D, date("2017-08-01"), timestamp(1501545600), "b"),
(2, 3L, 3.3D, date("2017-08-03"), timestamp(1503000000), "b"),
(3, 2147483650L, 100.001D, date("2020-12-31"), timestamp(1609372800), "b"),
(null, null, null, null, null, null),
(3, 1L, 1.0D, date("2017-08-01"), timestamp(1501545600), null)
AS testData(val, val_long, val_double, val_date, val_timestamp, cate);
-- RowsBetween
SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY val ROWS CURRENT ROW) FROM testData
@@ -19,6 +26,13 @@ SELECT val, cate, sum(val) OVER(PARTITION BY cate ORDER BY val
RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, val;
SELECT val_long, cate, sum(val_long) OVER(PARTITION BY cate ORDER BY val_long
RANGE BETWEEN CURRENT ROW AND 2147483648 FOLLOWING) FROM testData ORDER BY cate, val_long;
SELECT val_double, cate, sum(val_double) OVER(PARTITION BY cate ORDER BY val_double
RANGE BETWEEN CURRENT ROW AND 2.5 FOLLOWING) FROM testData ORDER BY cate, val_double;
SELECT val_date, cate, max(val_date) OVER(PARTITION BY cate ORDER BY val_date
RANGE BETWEEN CURRENT ROW AND 2 FOLLOWING) FROM testData ORDER BY cate, val_date;
SELECT val_timestamp, cate, avg(val_timestamp) OVER(PARTITION BY cate ORDER BY val_timestamp
RANGE BETWEEN CURRENT ROW AND interval 23 days 4 hours FOLLOWING) FROM testData
ORDER BY cate, val_timestamp;
-- RangeBetween with reverse OrderBy
SELECT val, cate, sum(val) OVER(PARTITION BY cate ORDER BY val DESC
@@ -31,7 +45,7 @@ SELECT val, cate, count(val) OVER(PARTITION BY cate
RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, val;
SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY val, cate
RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, val;
SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY current_date
SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY current_timestamp
RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, val;
SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY val
RANGE BETWEEN 1 FOLLOWING AND 1 PRECEDING) FROM testData ORDER BY cate, val;


@@ -1,12 +1,19 @@
-- Automatically generated by SQLQueryTestSuite
-- Number of queries: 19
-- Number of queries: 22
-- !query 0
CREATE OR REPLACE TEMPORARY VIEW testData AS SELECT * FROM VALUES
(null, 1L, "a"), (1, 1L, "a"), (1, 2L, "a"), (2, 2147483650L, "a"), (1, null, "b"), (2, 3L, "b"),
(3, 2147483650L, "b"), (null, null, null), (3, 1L, null)
AS testData(val, val_long, cate)
(null, 1L, 1.0D, date("2017-08-01"), timestamp(1501545600), "a"),
(1, 1L, 1.0D, date("2017-08-01"), timestamp(1501545600), "a"),
(1, 2L, 2.5D, date("2017-08-02"), timestamp(1502000000), "a"),
(2, 2147483650L, 100.001D, date("2020-12-31"), timestamp(1609372800), "a"),
(1, null, 1.0D, date("2017-08-01"), timestamp(1501545600), "b"),
(2, 3L, 3.3D, date("2017-08-03"), timestamp(1503000000), "b"),
(3, 2147483650L, 100.001D, date("2020-12-31"), timestamp(1609372800), "b"),
(null, null, null, null, null, null),
(3, 1L, 1.0D, date("2017-08-01"), timestamp(1501545600), null)
AS testData(val, val_long, val_double, val_date, val_timestamp, cate)
-- !query 0 schema
struct<>
-- !query 0 output
@@ -109,11 +116,63 @@ NULL b NULL
-- !query 7
SELECT val_double, cate, sum(val_double) OVER(PARTITION BY cate ORDER BY val_double
RANGE BETWEEN CURRENT ROW AND 2.5 FOLLOWING) FROM testData ORDER BY cate, val_double
-- !query 7 schema
struct<val_double:double,cate:string,sum(val_double) OVER (PARTITION BY cate ORDER BY val_double ASC NULLS FIRST RANGE BETWEEN CURRENT ROW AND CAST(2.5 AS DOUBLE) FOLLOWING):double>
-- !query 7 output
NULL NULL NULL
1.0 NULL 1.0
1.0 a 4.5
1.0 a 4.5
2.5 a 2.5
100.001 a 100.001
1.0 b 4.3
3.3 b 3.3
100.001 b 100.001
-- !query 8
SELECT val_date, cate, max(val_date) OVER(PARTITION BY cate ORDER BY val_date
RANGE BETWEEN CURRENT ROW AND 2 FOLLOWING) FROM testData ORDER BY cate, val_date
-- !query 8 schema
struct<val_date:date,cate:string,max(val_date) OVER (PARTITION BY cate ORDER BY val_date ASC NULLS FIRST RANGE BETWEEN CURRENT ROW AND 2 FOLLOWING):date>
-- !query 8 output
NULL NULL NULL
2017-08-01 NULL 2017-08-01
2017-08-01 a 2017-08-02
2017-08-01 a 2017-08-02
2017-08-02 a 2017-08-02
2020-12-31 a 2020-12-31
2017-08-01 b 2017-08-03
2017-08-03 b 2017-08-03
2020-12-31 b 2020-12-31
-- !query 9
SELECT val_timestamp, cate, avg(val_timestamp) OVER(PARTITION BY cate ORDER BY val_timestamp
RANGE BETWEEN CURRENT ROW AND interval 23 days 4 hours FOLLOWING) FROM testData
ORDER BY cate, val_timestamp
-- !query 9 schema
struct<val_timestamp:timestamp,cate:string,avg(CAST(val_timestamp AS DOUBLE)) OVER (PARTITION BY cate ORDER BY val_timestamp ASC NULLS FIRST RANGE BETWEEN CURRENT ROW AND interval 3 weeks 2 days 4 hours FOLLOWING):double>
-- !query 9 output
NULL NULL NULL
2017-07-31 17:00:00 NULL 1.5015456E9
2017-07-31 17:00:00 a 1.5016970666666667E9
2017-07-31 17:00:00 a 1.5016970666666667E9
2017-08-05 23:13:20 a 1.502E9
2020-12-30 16:00:00 a 1.6093728E9
2017-07-31 17:00:00 b 1.5022728E9
2017-08-17 13:00:00 b 1.503E9
2020-12-30 16:00:00 b 1.6093728E9
-- !query 10
SELECT val, cate, sum(val) OVER(PARTITION BY cate ORDER BY val DESC
RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, val
-- !query 7 schema
-- !query 10 schema
struct<val:int,cate:string,sum(val) OVER (PARTITION BY cate ORDER BY val DESC NULLS LAST RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING):bigint>
-- !query 7 output
-- !query 10 output
NULL NULL NULL
3 NULL 3
NULL a NULL
@@ -125,62 +184,62 @@ NULL a NULL
3 b 5
-- !query 8
-- !query 11
SELECT val, cate, count(val) OVER(PARTITION BY cate
ROWS BETWEEN UNBOUNDED FOLLOWING AND 1 FOLLOWING) FROM testData ORDER BY cate, val
-- !query 8 schema
struct<>
-- !query 8 output
org.apache.spark.sql.AnalysisException
cannot resolve 'ROWS BETWEEN UNBOUNDED FOLLOWING AND 1 FOLLOWING' due to data type mismatch: Window frame upper bound '1' does not followes the lower bound 'unboundedfollowing$()'.; line 1 pos 33
-- !query 9
SELECT val, cate, count(val) OVER(PARTITION BY cate
RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, val
-- !query 9 schema
struct<>
-- !query 9 output
org.apache.spark.sql.AnalysisException
cannot resolve '(PARTITION BY testdata.`cate` RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING)' due to data type mismatch: A range window frame cannot be used in an unordered window specification.; line 1 pos 33
-- !query 10
SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY val, cate
RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, val
-- !query 10 schema
struct<>
-- !query 10 output
org.apache.spark.sql.AnalysisException
cannot resolve '(PARTITION BY testdata.`cate` ORDER BY testdata.`val` ASC NULLS FIRST, testdata.`cate` ASC NULLS FIRST RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING)' due to data type mismatch: A range window frame with value boundaries cannot be used in a window specification with multiple order by expressions: val#x ASC NULLS FIRST,cate#x ASC NULLS FIRST; line 1 pos 33
-- !query 11
SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY current_date
RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, val
-- !query 11 schema
struct<>
-- !query 11 output
org.apache.spark.sql.AnalysisException
cannot resolve '(PARTITION BY testdata.`cate` ORDER BY current_date() ASC NULLS FIRST RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING)' due to data type mismatch: The data type 'DateType' used in the order specification does not match the data type 'IntegerType' which is used in the range frame.; line 1 pos 33
cannot resolve 'ROWS BETWEEN UNBOUNDED FOLLOWING AND 1 FOLLOWING' due to data type mismatch: Window frame upper bound '1' does not followes the lower bound 'unboundedfollowing$()'.; line 1 pos 33
-- !query 12
SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY val
RANGE BETWEEN 1 FOLLOWING AND 1 PRECEDING) FROM testData ORDER BY cate, val
SELECT val, cate, count(val) OVER(PARTITION BY cate
RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, val
-- !query 12 schema
struct<>
-- !query 12 output
org.apache.spark.sql.AnalysisException
cannot resolve 'RANGE BETWEEN 1 FOLLOWING AND 1 PRECEDING' due to data type mismatch: The lower bound of a window frame must be less than or equal to the upper bound; line 1 pos 33
cannot resolve '(PARTITION BY testdata.`cate` RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING)' due to data type mismatch: A range window frame cannot be used in an unordered window specification.; line 1 pos 33
-- !query 13
SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY val
RANGE BETWEEN CURRENT ROW AND current_date PRECEDING) FROM testData ORDER BY cate, val
SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY val, cate
RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, val
-- !query 13 schema
struct<>
-- !query 13 output
org.apache.spark.sql.AnalysisException
cannot resolve '(PARTITION BY testdata.`cate` ORDER BY testdata.`val` ASC NULLS FIRST, testdata.`cate` ASC NULLS FIRST RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING)' due to data type mismatch: A range window frame with value boundaries cannot be used in a window specification with multiple order by expressions: val#x ASC NULLS FIRST,cate#x ASC NULLS FIRST; line 1 pos 33
-- !query 14
SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY current_timestamp
RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, val
-- !query 14 schema
struct<>
-- !query 14 output
org.apache.spark.sql.AnalysisException
cannot resolve '(PARTITION BY testdata.`cate` ORDER BY current_timestamp() ASC NULLS FIRST RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING)' due to data type mismatch: The data type 'TimestampType' used in the order specification does not match the data type 'IntegerType' which is used in the range frame.; line 1 pos 33
-- !query 15
SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY val
RANGE BETWEEN 1 FOLLOWING AND 1 PRECEDING) FROM testData ORDER BY cate, val
-- !query 15 schema
struct<>
-- !query 15 output
org.apache.spark.sql.AnalysisException
cannot resolve 'RANGE BETWEEN 1 FOLLOWING AND 1 PRECEDING' due to data type mismatch: The lower bound of a window frame must be less than or equal to the upper bound; line 1 pos 33
-- !query 16
SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY val
RANGE BETWEEN CURRENT ROW AND current_date PRECEDING) FROM testData ORDER BY cate, val
-- !query 16 schema
struct<>
-- !query 16 output
org.apache.spark.sql.catalyst.parser.ParseException
Frame bound value must be a literal.(line 2, pos 30)
@@ -191,7 +250,7 @@ RANGE BETWEEN CURRENT ROW AND current_date PRECEDING) FROM testData ORDER BY cat
------------------------------^^^
-- !query 14
-- !query 17
SELECT val, cate,
max(val) OVER w AS max,
min(val) OVER w AS min,
@@ -218,9 +277,9 @@ approx_count_distinct(val) OVER w AS approx_count_distinct
FROM testData
WINDOW w AS (PARTITION BY cate ORDER BY val)
ORDER BY cate, val
-- !query 14 schema
-- !query 17 schema
struct<val:int,cate:string,max:int,min:int,min:int,count:bigint,sum:bigint,avg:double,stddev:double,first_value:int,first_value_ignore_null:int,first_value_contain_null:int,last_value:int,last_value_ignore_null:int,last_value_contain_null:int,rank:int,dense_rank:int,cume_dist:double,percent_rank:double,ntile:int,row_number:int,var_pop:double,var_samp:double,approx_count_distinct:bigint>
-- !query 14 output
-- !query 17 output
NULL NULL NULL NULL NULL 0 NULL NULL NULL NULL NULL NULL NULL NULL NULL 1 1 0.5 0.0 1 1 NULL NULL 0
3 NULL 3 3 3 1 3 3.0 NaN NULL 3 NULL 3 3 3 2 2 1.0 1.0 2 2 0.0 NaN 1
NULL a NULL NULL NULL 0 NULL NULL NULL NULL NULL NULL NULL NULL NULL 1 1 0.25 0.0 1 1 NULL NULL 0
@@ -232,11 +291,11 @@ NULL a NULL NULL NULL 0 NULL NULL NULL NULL NULL NULL NULL NULL NULL 1 1 0.25 0.
3 b 3 1 1 3 6 2.0 1.0 1 1 1 3 3 3 3 3 1.0 1.0 2 3 0.6666666666666666 1.0 3
-- !query 15
-- !query 18
SELECT val, cate, avg(null) OVER(PARTITION BY cate ORDER BY val) FROM testData ORDER BY cate, val
-- !query 15 schema
-- !query 18 schema
struct<val:int,cate:string,avg(CAST(NULL AS DOUBLE)) OVER (PARTITION BY cate ORDER BY val ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW):double>
-- !query 15 output
-- !query 18 output
NULL NULL NULL
3 NULL NULL
NULL a NULL
@@ -248,20 +307,20 @@ NULL a NULL
3 b NULL
-- !query 16
-- !query 19
SELECT val, cate, row_number() OVER(PARTITION BY cate) FROM testData ORDER BY cate, val
-- !query 16 schema
-- !query 19 schema
struct<>
-- !query 16 output
-- !query 19 output
org.apache.spark.sql.AnalysisException
Window function row_number() requires window to be ordered, please add ORDER BY clause. For example SELECT row_number()(value_expr) OVER (PARTITION BY window_partition ORDER BY window_ordering) from table;
-- !query 17
-- !query 20
SELECT val, cate, sum(val) OVER(), avg(val) OVER() FROM testData ORDER BY cate, val
-- !query 17 schema
-- !query 20 schema
struct<val:int,cate:string,sum(CAST(val AS BIGINT)) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING):bigint,avg(CAST(val AS BIGINT)) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING):double>
-- !query 17 output
-- !query 20 output
NULL NULL 13 1.8571428571428572
3 NULL 13 1.8571428571428572
NULL a 13 1.8571428571428572
@@ -273,7 +332,7 @@ NULL a 13 1.8571428571428572
3 b 13 1.8571428571428572
-- !query 18
-- !query 21
SELECT val, cate,
first_value(false) OVER w AS first_value,
first_value(true, true) OVER w AS first_value_ignore_null,
@@ -284,9 +343,9 @@ last_value(false, false) OVER w AS last_value_contain_null
FROM testData
WINDOW w AS ()
ORDER BY cate, val
-- !query 18 schema
-- !query 21 schema
struct<val:int,cate:string,first_value:boolean,first_value_ignore_null:boolean,first_value_contain_null:boolean,last_value:boolean,last_value_ignore_null:boolean,last_value_contain_null:boolean>
-- !query 18 output
-- !query 21 output
NULL NULL false true false false true false
3 NULL false true false false true false
NULL a false true false false true false


@@ -17,11 +17,14 @@
package org.apache.spark.sql
import java.sql.{Date, Timestamp}
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction, Window}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.CalendarInterval
/**
* Window function testing for DataFrame API.
@@ -172,7 +175,7 @@ class DataFrameWindowFunctionsSuite extends QueryTest with SharedSQLContext {
assert(e.message.contains("Boundary end is not a valid integer: 2147483648"))
}
test("range between should accept integer/long values as boundary") {
test("range between should accept int/long values as boundary") {
val df = Seq((1L, "1"), (1L, "1"), (2147483650L, "1"),
(3L, "2"), (2L, "1"), (2147483650L, "2"))
.toDF("key", "value")
@@ -191,6 +194,54 @@ class DataFrameWindowFunctionsSuite extends QueryTest with SharedSQLContext {
Window.partitionBy($"value").orderBy($"key").rangeBetween(-2147483649L, 0))),
Seq(Row(1, 2), Row(1, 2), Row(2, 3), Row(2147483650L, 2), Row(2147483650L, 4), Row(3, 1))
)
def dt(date: String): Date = Date.valueOf(date)
val df2 = Seq((dt("2017-08-01"), "1"), (dt("2017-08-01"), "1"), (dt("2020-12-31"), "1"),
(dt("2017-08-03"), "2"), (dt("2017-08-02"), "1"), (dt("2020-12-31"), "2"))
.toDF("key", "value")
checkAnswer(
df2.select(
$"key",
count("key").over(
Window.partitionBy($"value").orderBy($"key").rangeBetween(lit(0), lit(2)))),
Seq(Row(dt("2017-08-01"), 3), Row(dt("2017-08-01"), 3), Row(dt("2020-12-31"), 1),
Row(dt("2017-08-03"), 1), Row(dt("2017-08-02"), 1), Row(dt("2020-12-31"), 1))
)
}
test("range between should accept double values as boundary") {
val df = Seq((1.0D, "1"), (1.0D, "1"), (100.001D, "1"),
(3.3D, "2"), (2.02D, "1"), (100.001D, "2"))
.toDF("key", "value")
df.createOrReplaceTempView("window_table")
checkAnswer(
df.select(
$"key",
count("key").over(
Window.partitionBy($"value").orderBy($"key")
.rangeBetween(currentRow, lit(2.5D)))),
Seq(Row(1.0, 3), Row(1.0, 3), Row(100.001, 1), Row(3.3, 1), Row(2.02, 1), Row(100.001, 1))
)
}
test("range between should accept interval values as boundary") {
def ts(timestamp: Long): Timestamp = new Timestamp(timestamp * 1000)
val df = Seq((ts(1501545600), "1"), (ts(1501545600), "1"), (ts(1609372800), "1"),
(ts(1503000000), "2"), (ts(1502000000), "1"), (ts(1609372800), "2"))
.toDF("key", "value")
df.createOrReplaceTempView("window_table")
checkAnswer(
df.select(
$"key",
count("key").over(
Window.partitionBy($"value").orderBy($"key")
.rangeBetween(currentRow,
lit(CalendarInterval.fromString("interval 23 days 4 hours"))))),
Seq(Row(ts(1501545600), 3), Row(ts(1501545600), 3), Row(ts(1609372800), 1),
Row(ts(1503000000), 1), Row(ts(1502000000), 1), Row(ts(1609372800), 1))
)
}
test("aggregation and rows between with unbounded") {