[SPARK-35110][SQL] Handle ANSI intervals in WindowExecBase

### What changes were proposed in this pull request?
This PR makes the window frame support `YearMonthIntervalType` and `DayTimeIntervalType`.

### Why are the changes needed?
Extends the functionality of window frames to cover ANSI interval types.

### Does this PR introduce _any_ user-facing change?
Yes. Users can now use `YearMonthIntervalType` or `DayTimeIntervalType` values in range window frames whose sort expression is a date or timestamp.

### How was this patch tested?
New tests

Closes #32294 from beliefer/SPARK-35110.

Authored-by: beliefer <beliefer@163.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
This commit is contained in:
beliefer 2021-04-22 17:14:11 +03:00 committed by Max Gekk
parent 7f7a3d80a6
commit 6c587d2627
4 changed files with 87 additions and 4 deletions

View file

@ -101,7 +101,10 @@ case class WindowSpecDefinition(
/**
 * Returns true when the frame boundary type `ft` is compatible with the data type
 * of the (single) ordering expression. Date columns accept integer (days) and
 * year-month interval offsets; timestamp columns accept calendar, year-month and
 * day-time interval offsets; any other pairing is valid only when the types match.
 */
private def isValidFrameType(ft: DataType): Boolean = {
  val orderType = orderSpec.head.dataType
  (orderType, ft) match {
    case (DateType, IntegerType | YearMonthIntervalType) => true
    case (TimestampType, CalendarIntervalType | YearMonthIntervalType | DayTimeIntervalType) =>
      true
    case _ => orderType == ft
  }
}
}

View file

@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.execution.UnaryExecNode
import org.apache.spark.sql.types.{CalendarIntervalType, DateType, IntegerType, TimestampType}
import org.apache.spark.sql.types.{CalendarIntervalType, DateType, DayTimeIntervalType, IntegerType, TimestampType, YearMonthIntervalType}
trait WindowExecBase extends UnaryExecNode {
def windowExpression: Seq[NamedExpression]
@ -95,8 +95,11 @@ trait WindowExecBase extends UnaryExecNode {
// Create the projection which returns the current 'value' modified by adding the offset.
val boundExpr = (expr.dataType, boundOffset.dataType) match {
case (DateType, IntegerType) => DateAdd(expr, boundOffset)
case (TimestampType, CalendarIntervalType) =>
TimeAdd(expr, boundOffset, Some(timeZone))
case (DateType, YearMonthIntervalType) => DateAddYMInterval(expr, boundOffset)
case (TimestampType, CalendarIntervalType) => TimeAdd(expr, boundOffset, Some(timeZone))
case (TimestampType, YearMonthIntervalType) =>
TimestampAddYMInterval(expr, boundOffset, Some(timeZone))
case (TimestampType, DayTimeIntervalType) => TimeAdd(expr, boundOffset, Some(timeZone))
case (a, b) if a == b => Add(expr, boundOffset)
}
val bound = MutableProjection.create(boundExpr :: Nil, child.output)

View file

@ -70,6 +70,18 @@ RANGE BETWEEN CURRENT ROW AND 2 FOLLOWING) FROM testData ORDER BY cate, val_date
SELECT val_timestamp, cate, avg(val_timestamp) OVER(PARTITION BY cate ORDER BY val_timestamp
RANGE BETWEEN CURRENT ROW AND interval 23 days 4 hours FOLLOWING) FROM testData
ORDER BY cate, val_timestamp;
SELECT val_timestamp, cate, avg(val_timestamp) OVER(PARTITION BY cate ORDER BY val_timestamp
RANGE BETWEEN CURRENT ROW AND interval '1-1' year to month FOLLOWING) FROM testData
ORDER BY cate, val_timestamp;
SELECT val_timestamp, cate, avg(val_timestamp) OVER(PARTITION BY cate ORDER BY val_timestamp
RANGE BETWEEN CURRENT ROW AND interval '1 2:3:4.001' day to second FOLLOWING) FROM testData
ORDER BY cate, val_timestamp;
SELECT val_date, cate, avg(val_timestamp) OVER(PARTITION BY cate ORDER BY val_date
RANGE BETWEEN CURRENT ROW AND interval '1-1' year to month FOLLOWING) FROM testData
ORDER BY cate, val_date;
SELECT val_date, cate, avg(val_timestamp) OVER(PARTITION BY cate ORDER BY val_date
RANGE BETWEEN CURRENT ROW AND interval '1 2:3:4.001' day to second FOLLOWING) FROM testData
ORDER BY cate, val_date;
-- RangeBetween with reverse OrderBy
SELECT val, cate, sum(val) OVER(PARTITION BY cate ORDER BY val DESC

View file

@ -1,5 +1,5 @@
-- Automatically generated by SQLQueryTestSuite
-- Number of queries: 46
-- Number of queries: 50
-- !query
@ -211,6 +211,71 @@ NULL NULL NULL
2020-12-30 16:00:00 b 1.6093728E9
-- !query
SELECT val_timestamp, cate, avg(val_timestamp) OVER(PARTITION BY cate ORDER BY val_timestamp
RANGE BETWEEN CURRENT ROW AND interval '1-1' year to month FOLLOWING) FROM testData
ORDER BY cate, val_timestamp
-- !query schema
struct<val_timestamp:timestamp,cate:string,avg(val_timestamp) OVER (PARTITION BY cate ORDER BY val_timestamp ASC NULLS FIRST RANGE BETWEEN CURRENT ROW AND INTERVAL '1-1' YEAR TO MONTH FOLLOWING):double>
-- !query output
NULL NULL NULL
2017-07-31 17:00:00 NULL 1.5015456E9
2017-07-31 17:00:00 a 1.5016970666666667E9
2017-07-31 17:00:00 a 1.5016970666666667E9
2017-08-05 23:13:20 a 1.502E9
2020-12-30 16:00:00 a 1.6093728E9
2017-07-31 17:00:00 b 1.5022728E9
2017-08-17 13:00:00 b 1.503E9
2020-12-30 16:00:00 b 1.6093728E9
-- !query
SELECT val_timestamp, cate, avg(val_timestamp) OVER(PARTITION BY cate ORDER BY val_timestamp
RANGE BETWEEN CURRENT ROW AND interval '1 2:3:4.001' day to second FOLLOWING) FROM testData
ORDER BY cate, val_timestamp
-- !query schema
struct<val_timestamp:timestamp,cate:string,avg(val_timestamp) OVER (PARTITION BY cate ORDER BY val_timestamp ASC NULLS FIRST RANGE BETWEEN CURRENT ROW AND INTERVAL '1 02:03:04.001' DAY TO SECOND FOLLOWING):double>
-- !query output
NULL NULL NULL
2017-07-31 17:00:00 NULL 1.5015456E9
2017-07-31 17:00:00 a 1.5015456E9
2017-07-31 17:00:00 a 1.5015456E9
2017-08-05 23:13:20 a 1.502E9
2020-12-30 16:00:00 a 1.6093728E9
2017-07-31 17:00:00 b 1.5015456E9
2017-08-17 13:00:00 b 1.503E9
2020-12-30 16:00:00 b 1.6093728E9
-- !query
SELECT val_date, cate, avg(val_timestamp) OVER(PARTITION BY cate ORDER BY val_date
RANGE BETWEEN CURRENT ROW AND interval '1-1' year to month FOLLOWING) FROM testData
ORDER BY cate, val_date
-- !query schema
struct<val_date:date,cate:string,avg(val_timestamp) OVER (PARTITION BY cate ORDER BY val_date ASC NULLS FIRST RANGE BETWEEN CURRENT ROW AND INTERVAL '1-1' YEAR TO MONTH FOLLOWING):double>
-- !query output
NULL NULL NULL
2017-08-01 NULL 1.5015456E9
2017-08-01 a 1.5016970666666667E9
2017-08-01 a 1.5016970666666667E9
2017-08-02 a 1.502E9
2020-12-31 a 1.6093728E9
2017-08-01 b 1.5022728E9
2017-08-03 b 1.503E9
2020-12-31 b 1.6093728E9
-- !query
SELECT val_date, cate, avg(val_timestamp) OVER(PARTITION BY cate ORDER BY val_date
RANGE BETWEEN CURRENT ROW AND interval '1 2:3:4.001' day to second FOLLOWING) FROM testData
ORDER BY cate, val_date
-- !query schema
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
cannot resolve '(PARTITION BY testdata.cate ORDER BY testdata.val_date ASC NULLS FIRST RANGE BETWEEN CURRENT ROW AND INTERVAL '1 02:03:04.001' DAY TO SECOND FOLLOWING)' due to data type mismatch: The data type 'date' used in the order specification does not match the data type 'day-time interval' which is used in the range frame.; line 1 pos 46
-- !query
SELECT val, cate, sum(val) OVER(PARTITION BY cate ORDER BY val DESC
RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, val