[SPARK-25862][SQL] Remove rangeBetween APIs introduced in SPARK-21608
## What changes were proposed in this pull request? This patch removes the rangeBetween functions introduced in SPARK-21608. As explained in SPARK-25841, these functions are confusing and don't quite work. We will redesign them and introduce better ones in SPARK-25843. ## How was this patch tested? Removed relevant test cases as well. These test cases will need to be added back in SPARK-25843. Closes #22870 from rxin/SPARK-25862. Lead-authored-by: Reynold Xin <rxin@databricks.com> Co-authored-by: hyukjinkwon <gurwls223@apache.org> Signed-off-by: gatorsmile <gatorsmile@gmail.com>
This commit is contained in:
parent
243ce319a0
commit
9cf9a83afa
|
@@ -206,7 +206,7 @@ case class SpecifiedWindowFrame(
|
|||
// Check combination (of expressions).
|
||||
(lower, upper) match {
|
||||
case (l: Expression, u: Expression) if !isValidFrameBoundary(l, u) =>
|
||||
TypeCheckFailure(s"Window frame upper bound '$upper' does not followes the lower bound " +
|
||||
TypeCheckFailure(s"Window frame upper bound '$upper' does not follow the lower bound " +
|
||||
s"'$lower'.")
|
||||
case (l: SpecialFrameBoundary, _) => TypeCheckSuccess
|
||||
case (_, u: SpecialFrameBoundary) => TypeCheckSuccess
|
||||
|
|
|
@@ -214,15 +214,6 @@ object Window {
|
|||
spec.rangeBetween(start, end)
|
||||
}
|
||||
|
||||
/**
|
||||
* This function has been deprecated in Spark 2.4. See SPARK-25842 for more information.
|
||||
* @since 2.3.0
|
||||
*/
|
||||
@deprecated("Use the version with Long parameter types", "2.4.0")
|
||||
def rangeBetween(start: Column, end: Column): WindowSpec = {
|
||||
spec.rangeBetween(start, end)
|
||||
}
|
||||
|
||||
private[sql] def spec: WindowSpec = {
|
||||
new WindowSpec(Seq.empty, Seq.empty, UnspecifiedFrame)
|
||||
}
|
||||
|
|
|
@@ -209,18 +209,6 @@ class WindowSpec private[sql](
|
|||
SpecifiedWindowFrame(RangeFrame, boundaryStart, boundaryEnd))
|
||||
}
|
||||
|
||||
/**
|
||||
* This function has been deprecated in Spark 2.4. See SPARK-25842 for more information.
|
||||
* @since 2.3.0
|
||||
*/
|
||||
@deprecated("Use the version with Long parameter types", "2.4.0")
|
||||
def rangeBetween(start: Column, end: Column): WindowSpec = {
|
||||
new WindowSpec(
|
||||
partitionSpec,
|
||||
orderSpec,
|
||||
SpecifiedWindowFrame(RangeFrame, start.expr, end.expr))
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts this [[WindowSpec]] into a [[Column]] with an aggregate expression.
|
||||
*/
|
||||
|
|
|
@@ -829,32 +829,6 @@ object functions {
|
|||
//////////////////////////////////////////////////////////////////////////////////////////////
|
||||
// Window functions
|
||||
//////////////////////////////////////////////////////////////////////////////////////////////
|
||||
/**
|
||||
* This function has been deprecated in Spark 2.4. See SPARK-25842 for more information.
|
||||
*
|
||||
* @group window_funcs
|
||||
* @since 2.3.0
|
||||
*/
|
||||
@deprecated("Use Window.unboundedPreceding", "2.4.0")
|
||||
def unboundedPreceding(): Column = Column(UnboundedPreceding)
|
||||
|
||||
/**
|
||||
* This function has been deprecated in Spark 2.4. See SPARK-25842 for more information.
|
||||
*
|
||||
* @group window_funcs
|
||||
* @since 2.3.0
|
||||
*/
|
||||
@deprecated("Use Window.unboundedFollowing", "2.4.0")
|
||||
def unboundedFollowing(): Column = Column(UnboundedFollowing)
|
||||
|
||||
/**
|
||||
* This function has been deprecated in Spark 2.4. See SPARK-25842 for more information.
|
||||
*
|
||||
* @group window_funcs
|
||||
* @since 2.3.0
|
||||
*/
|
||||
@deprecated("Use Window.currentRow", "2.4.0")
|
||||
def currentRow(): Column = Column(CurrentRow)
|
||||
|
||||
/**
|
||||
* Window function: returns the cumulative distribution of values within a window partition,
|
||||
|
|
|
@@ -191,7 +191,7 @@ ROWS BETWEEN UNBOUNDED FOLLOWING AND 1 FOLLOWING) FROM testData ORDER BY cate, v
|
|||
struct<>
|
||||
-- !query 11 output
|
||||
org.apache.spark.sql.AnalysisException
|
||||
cannot resolve 'ROWS BETWEEN UNBOUNDED FOLLOWING AND 1 FOLLOWING' due to data type mismatch: Window frame upper bound '1' does not followes the lower bound 'unboundedfollowing$()'.; line 1 pos 33
|
||||
cannot resolve 'ROWS BETWEEN UNBOUNDED FOLLOWING AND 1 FOLLOWING' due to data type mismatch: Window frame upper bound '1' does not follow the lower bound 'unboundedfollowing$()'.; line 1 pos 33
|
||||
|
||||
|
||||
-- !query 12
|
||||
|
|
|
@@ -17,12 +17,11 @@
|
|||
|
||||
package org.apache.spark.sql
|
||||
|
||||
import java.sql.{Date, Timestamp}
|
||||
import java.sql.Date
|
||||
|
||||
import org.apache.spark.sql.expressions.Window
|
||||
import org.apache.spark.sql.functions._
|
||||
import org.apache.spark.sql.test.SharedSQLContext
|
||||
import org.apache.spark.unsafe.types.CalendarInterval
|
||||
|
||||
/**
|
||||
* Window frame testing for DataFrame API.
|
||||
|
@@ -219,71 +218,6 @@ class DataFrameWindowFramesSuite extends QueryTest with SharedSQLContext {
|
|||
Window.partitionBy($"value").orderBy($"key").rangeBetween(-2147483649L, 0))),
|
||||
Seq(Row(1, 2), Row(1, 2), Row(2, 3), Row(2147483650L, 2), Row(2147483650L, 4), Row(3, 1))
|
||||
)
|
||||
|
||||
def dt(date: String): Date = Date.valueOf(date)
|
||||
|
||||
val df2 = Seq((dt("2017-08-01"), "1"), (dt("2017-08-01"), "1"), (dt("2020-12-31"), "1"),
|
||||
(dt("2017-08-03"), "2"), (dt("2017-08-02"), "1"), (dt("2020-12-31"), "2"))
|
||||
.toDF("key", "value")
|
||||
val window = Window.partitionBy($"value").orderBy($"key").rangeBetween(lit(0), lit(2))
|
||||
|
||||
checkAnswer(
|
||||
df2.select(
|
||||
$"key",
|
||||
count("key").over(window)),
|
||||
Seq(Row(dt("2017-08-01"), 3), Row(dt("2017-08-01"), 3), Row(dt("2020-12-31"), 1),
|
||||
Row(dt("2017-08-03"), 1), Row(dt("2017-08-02"), 1), Row(dt("2020-12-31"), 1))
|
||||
)
|
||||
}
|
||||
|
||||
test("range between should accept double values as boundary") {
|
||||
val df = Seq((1.0D, "1"), (1.0D, "1"), (100.001D, "1"), (3.3D, "2"), (2.02D, "1"),
|
||||
(100.001D, "2")).toDF("key", "value")
|
||||
val window = Window.partitionBy($"value").orderBy($"key").rangeBetween(currentRow, lit(2.5D))
|
||||
|
||||
checkAnswer(
|
||||
df.select(
|
||||
$"key",
|
||||
count("key").over(window)),
|
||||
Seq(Row(1.0, 3), Row(1.0, 3), Row(100.001, 1), Row(3.3, 1), Row(2.02, 1), Row(100.001, 1))
|
||||
)
|
||||
}
|
||||
|
||||
test("range between should accept interval values as boundary") {
|
||||
def ts(timestamp: Long): Timestamp = new Timestamp(timestamp * 1000)
|
||||
|
||||
val df = Seq((ts(1501545600), "1"), (ts(1501545600), "1"), (ts(1609372800), "1"),
|
||||
(ts(1503000000), "2"), (ts(1502000000), "1"), (ts(1609372800), "2"))
|
||||
.toDF("key", "value")
|
||||
val window = Window.partitionBy($"value").orderBy($"key")
|
||||
.rangeBetween(currentRow, lit(CalendarInterval.fromString("interval 23 days 4 hours")))
|
||||
|
||||
checkAnswer(
|
||||
df.select(
|
||||
$"key",
|
||||
count("key").over(window)),
|
||||
Seq(Row(ts(1501545600), 3), Row(ts(1501545600), 3), Row(ts(1609372800), 1),
|
||||
Row(ts(1503000000), 1), Row(ts(1502000000), 1), Row(ts(1609372800), 1))
|
||||
)
|
||||
}
|
||||
|
||||
test("range between should accept interval values as both boundaries") {
|
||||
def ts(timestamp: Long): Timestamp = new Timestamp(timestamp * 1000)
|
||||
|
||||
val df = Seq((ts(1501545600), "1"), (ts(1501545600), "1"), (ts(1609372800), "1"),
|
||||
(ts(1503000000), "2"), (ts(1502000000), "1"), (ts(1609372800), "2"))
|
||||
.toDF("key", "value")
|
||||
val window = Window.partitionBy($"value").orderBy($"key")
|
||||
.rangeBetween(lit(CalendarInterval.fromString("interval 3 hours")),
|
||||
lit(CalendarInterval.fromString("interval 23 days 4 hours")))
|
||||
|
||||
checkAnswer(
|
||||
df.select(
|
||||
$"key",
|
||||
count("key").over(window)),
|
||||
Seq(Row(ts(1501545600), 1), Row(ts(1501545600), 1), Row(ts(1609372800), 0),
|
||||
Row(ts(1503000000), 0), Row(ts(1502000000), 0), Row(ts(1609372800), 0))
|
||||
)
|
||||
}
|
||||
|
||||
test("unbounded rows/range between with aggregation") {
|
||||
|
|
Loading…
Reference in a new issue