[SPARK-32344][SQL] Unevaluable expr is set to FIRST/LAST ignoreNullsExpr in distinct aggregates

### What changes were proposed in this pull request?

This PR intends to fix a bug in distinct FIRST/LAST aggregates in v2.4.6/v3.0.0/master:
```
scala> sql("SELECT FIRST(DISTINCT v) FROM VALUES 1, 2, 3 t(v)").show()
...
Caused by: java.lang.UnsupportedOperationException: Cannot evaluate expression: false#37
  at org.apache.spark.sql.catalyst.expressions.Unevaluable$class.eval(Expression.scala:258)
  at org.apache.spark.sql.catalyst.expressions.AttributeReference.eval(namedExpressions.scala:226)
  at org.apache.spark.sql.catalyst.expressions.aggregate.First.ignoreNulls(First.scala:68)
  at org.apache.spark.sql.catalyst.expressions.aggregate.First.updateExpressions$lzycompute(First.scala:82)
  at org.apache.spark.sql.catalyst.expressions.aggregate.First.updateExpressions(First.scala:81)
  at org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun$15.apply(HashAggregateExec.scala:268)
```
The root cause is that, for distinct FIRST/LAST aggregate functions, the `Aggregation` strategy replaces the foldable boolean `ignoreNullsExpr` expression with an `Unevaluable` expression (an `AttributeReference`). This replacement must not happen, because the `Analyzer` has already checked that the expression is foldable;
ffdbbae1d4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/First.scala (L74-L76)
So, this PR proposes to change the variable for `IGNORE NULLS` from `Expression` to `Boolean` to avoid this case. Since the flag is then no longer one of the aggregate's child expressions, rewrites that substitute attributes for children can no longer corrupt it.
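
As a minimal illustration of the failure mode and the fix, here is a self-contained sketch with hypothetical stand-in types (not the actual Catalyst classes):
```scala
// Hedged sketch: simplified stand-ins for Catalyst concepts, not Spark source.
sealed trait Expr { def eval(): Any }

// A literal is foldable: it can be evaluated without any input row.
case class BoolLit(value: Boolean) extends Expr {
  override def eval(): Any = value
}

// An attribute reference names a column; it has no value at planning time,
// so evaluating it throws. This is what "Unevaluable" means in the trace.
case class AttrRef(name: String) extends Expr {
  override def eval(): Any =
    throw new UnsupportedOperationException(s"Cannot evaluate expression: $name")
}

// Before the fix: the flag was an Expr and listed among the children, so a
// rewrite that maps children to attributes could turn it into an AttrRef,
// and reading the flag then threw at planning time.
case class FirstBefore(child: Expr, ignoreNullsExpr: Expr) {
  def children: Seq[Expr] = child :: ignoreNullsExpr :: Nil
  def ignoreNulls: Boolean = ignoreNullsExpr.eval().asInstanceOf[Boolean]
}

// After the fix: the flag is a plain Boolean and not a child, so child
// rewrites cannot touch it and no eval() is ever needed.
case class FirstAfter(child: Expr, ignoreNulls: Boolean) {
  def children: Seq[Expr] = child :: Nil
}
```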

### Why are the changes needed?

Bugfix.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added a test in `DataFrameAggregateSuite`.

Closes #29143 from maropu/SPARK-32344.

Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
parent 40ef01283d
commit c7a68a920d
12 changed files with 89 additions and 63 deletions

```diff
@@ -791,9 +791,9 @@ class Analyzer(
       // AggregateFunction's with the exception of First and Last in their default mode
       // (which we handle) and possibly some Hive UDAF's.
       case First(expr, _) =>
-        First(ifExpr(expr), Literal(true))
+        First(ifExpr(expr), true)
       case Last(expr, _) =>
-        Last(ifExpr(expr), Literal(true))
+        Last(ifExpr(expr), true)
       case a: AggregateFunction =>
         a.withNewChildren(a.children.map(ifExpr))
     }.transform {
```

```diff
@@ -17,8 +17,9 @@
 package org.apache.spark.sql.catalyst.expressions.aggregate

-import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, TypeCheckResult}
-import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess}
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.TypeCheckSuccess
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.types._
@@ -49,12 +50,16 @@ import org.apache.spark.sql.types._
   """,
   group = "agg_funcs",
   since = "2.0.0")
-case class First(child: Expression, ignoreNullsExpr: Expression)
+case class First(child: Expression, ignoreNulls: Boolean)
   extends DeclarativeAggregate with ExpectsInputTypes {

-  def this(child: Expression) = this(child, Literal.create(false, BooleanType))
+  def this(child: Expression) = this(child, false)

-  override def children: Seq[Expression] = child :: ignoreNullsExpr :: Nil
+  def this(child: Expression, ignoreNullsExpr: Expression) = {
+    this(child, FirstLast.validateIgnoreNullExpr(ignoreNullsExpr, "first"))
+  }
+
+  override def children: Seq[Expression] = child :: Nil

   override def nullable: Boolean = true
@@ -71,16 +76,11 @@ case class First(child: Expression, ignoreNullsExpr: Expression)
     val defaultCheck = super.checkInputDataTypes()
     if (defaultCheck.isFailure) {
       defaultCheck
-    } else if (!ignoreNullsExpr.foldable) {
-      TypeCheckFailure(
-        s"The second argument of First must be a boolean literal, but got: ${ignoreNullsExpr.sql}")
     } else {
       TypeCheckSuccess
     }
   }

-  private def ignoreNulls: Boolean = ignoreNullsExpr.eval().asInstanceOf[Boolean]
-
   private lazy val first = AttributeReference("first", child.dataType)()

   private lazy val valueSet = AttributeReference("valueSet", BooleanType)()
@@ -120,3 +120,11 @@ case class First(child: Expression, ignoreNullsExpr: Expression)
   override def toString: String = s"$prettyName($child)${if (ignoreNulls) " ignore nulls" else ""}"
 }
+
+object FirstLast {
+  def validateIgnoreNullExpr(exp: Expression, funcName: String): Boolean = exp match {
+    case Literal(b: Boolean, BooleanType) => b
+    case _ => throw new AnalysisException(
+      s"The second argument in $funcName should be a boolean literal.")
+  }
+}
```
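
With the new `FirstLast.validateIgnoreNullExpr` helper, `Literal(true)`/`Literal(false)` arguments passed through the old `Expression`-typed constructor are converted to a plain `Boolean` at construction time, and any other argument fails fast with an `AnalysisException`. The removed foldability check in `checkInputDataTypes` is therefore no longer needed.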

```diff
@@ -17,8 +17,8 @@
 package org.apache.spark.sql.catalyst.expressions.aggregate

-import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, TypeCheckResult}
-import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess}
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.TypeCheckSuccess
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.types._
@@ -49,12 +49,16 @@ import org.apache.spark.sql.types._
   """,
   group = "agg_funcs",
   since = "2.0.0")
-case class Last(child: Expression, ignoreNullsExpr: Expression)
+case class Last(child: Expression, ignoreNulls: Boolean)
   extends DeclarativeAggregate with ExpectsInputTypes {

-  def this(child: Expression) = this(child, Literal.create(false, BooleanType))
+  def this(child: Expression) = this(child, false)

-  override def children: Seq[Expression] = child :: ignoreNullsExpr :: Nil
+  def this(child: Expression, ignoreNullsExpr: Expression) = {
+    this(child, FirstLast.validateIgnoreNullExpr(ignoreNullsExpr, "last"))
+  }
+
+  override def children: Seq[Expression] = child :: Nil

   override def nullable: Boolean = true
@@ -71,16 +75,11 @@ case class Last(child: Expression, ignoreNullsExpr: Expression)
     val defaultCheck = super.checkInputDataTypes()
     if (defaultCheck.isFailure) {
       defaultCheck
-    } else if (!ignoreNullsExpr.foldable) {
-      TypeCheckFailure(
-        s"The second argument of Last must be a boolean literal, but got: ${ignoreNullsExpr.sql}")
     } else {
       TypeCheckSuccess
     }
   }

-  private def ignoreNulls: Boolean = ignoreNullsExpr.eval().asInstanceOf[Boolean]
-
   private lazy val last = AttributeReference("last", child.dataType)()

   private lazy val valueSet = AttributeReference("valueSet", BooleanType)()
```

```diff
@@ -257,7 +257,7 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] {
       // Select the result of the first aggregate in the last aggregate.
       val result = AggregateExpression(
-        aggregate.First(evalWithinGroup(regularGroupId, operator.toAttribute), Literal(true)),
+        aggregate.First(evalWithinGroup(regularGroupId, operator.toAttribute), true),
         mode = Complete,
         isDistinct = false)
```
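
Here the distinct-aggregate rewrite builds a `First` over the regular aggregate group. Passing `true` directly, rather than `Literal(true)` as a child expression, means the child-patching step in the `Aggregation` strategy described above can no longer swap the flag for an attribute.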

```diff
@@ -1535,7 +1535,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
    */
   override def visitFirst(ctx: FirstContext): Expression = withOrigin(ctx) {
     val ignoreNullsExpr = ctx.IGNORE != null
-    First(expression(ctx.expression), Literal(ignoreNullsExpr)).toAggregateExpression()
+    First(expression(ctx.expression), ignoreNullsExpr).toAggregateExpression()
   }

   /**
@@ -1543,7 +1543,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
    */
   override def visitLast(ctx: LastContext): Expression = withOrigin(ctx) {
     val ignoreNullsExpr = ctx.IGNORE != null
-    Last(expression(ctx.expression), Literal(ignoreNullsExpr)).toAggregateExpression()
+    Last(expression(ctx.expression), ignoreNullsExpr).toAggregateExpression()
   }

   /**
```

```diff
@@ -17,14 +17,15 @@
 package org.apache.spark.sql.catalyst.expressions.aggregate

 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Literal}
 import org.apache.spark.sql.types.IntegerType

-class LastTestSuite extends SparkFunSuite {
+class FirstLastTestSuite extends SparkFunSuite {
   val input = AttributeReference("input", IntegerType, nullable = true)()
-  val evaluator = DeclarativeAggregateEvaluator(Last(input, Literal(false)), Seq(input))
-  val evaluatorIgnoreNulls = DeclarativeAggregateEvaluator(Last(input, Literal(true)), Seq(input))
+  val evaluator = DeclarativeAggregateEvaluator(Last(input, false), Seq(input))
+  val evaluatorIgnoreNulls = DeclarativeAggregateEvaluator(Last(input, true), Seq(input))

   test("empty buffer") {
     assert(evaluator.initialize() === InternalRow(null, false))
@@ -106,4 +107,15 @@ class LastTestSuite extends SparkFunSuite {
     val m1 = evaluatorIgnoreNulls.merge(p1, p2)
     assert(evaluatorIgnoreNulls.eval(m1) === InternalRow(1))
   }
+
+  test("SPARK-32344: correct error handling for a type mismatch") {
+    val msg1 = intercept[AnalysisException] {
+      new First(input, Literal(1, IntegerType))
+    }.getMessage
+    assert(msg1.contains("The second argument in first should be a boolean literal"))
+    val msg2 = intercept[AnalysisException] {
+      new Last(input, Literal(1, IntegerType))
+    }.getMessage
+    assert(msg2.contains("The second argument in last should be a boolean literal"))
+  }
 }
```

```diff
@@ -785,10 +785,10 @@ class ExpressionParserSuite extends AnalysisTest {
   }

   test("SPARK-19526 Support ignore nulls keywords for first and last") {
-    assertEqual("first(a ignore nulls)", First('a, Literal(true)).toAggregateExpression())
-    assertEqual("first(a)", First('a, Literal(false)).toAggregateExpression())
-    assertEqual("last(a ignore nulls)", Last('a, Literal(true)).toAggregateExpression())
-    assertEqual("last(a)", Last('a, Literal(false)).toAggregateExpression())
+    assertEqual("first(a ignore nulls)", First('a, true).toAggregateExpression())
+    assertEqual("first(a)", First('a, false).toAggregateExpression())
+    assertEqual("last(a ignore nulls)", Last('a, true).toAggregateExpression())
+    assertEqual("last(a)", Last('a, false).toAggregateExpression())
   }

   test("timestamp literals") {
```

```diff
@@ -461,7 +461,7 @@ object functions {
    * @since 2.0.0
    */
   def first(e: Column, ignoreNulls: Boolean): Column = withAggregateFunction {
-    new First(e.expr, Literal(ignoreNulls))
+    First(e.expr, ignoreNulls)
   }

   /**
@@ -586,7 +586,7 @@ object functions {
    * @since 2.0.0
    */
   def last(e: Column, ignoreNulls: Boolean): Column = withAggregateFunction {
-    new Last(e.expr, Literal(ignoreNulls))
+    new Last(e.expr, ignoreNulls)
   }

   /**
```

```diff
@@ -315,12 +315,12 @@
 | org.apache.spark.sql.catalyst.expressions.aggregate.CountMinSketchAgg | count_min_sketch | N/A | N/A |
 | org.apache.spark.sql.catalyst.expressions.aggregate.CovPopulation | covar_pop | SELECT covar_pop(c1, c2) FROM VALUES (1,1), (2,2), (3,3) AS tab(c1, c2) | struct<covar_pop(CAST(c1 AS DOUBLE), CAST(c2 AS DOUBLE)):double> |
 | org.apache.spark.sql.catalyst.expressions.aggregate.CovSample | covar_samp | SELECT covar_samp(c1, c2) FROM VALUES (1,1), (2,2), (3,3) AS tab(c1, c2) | struct<covar_samp(CAST(c1 AS DOUBLE), CAST(c2 AS DOUBLE)):double> |
-| org.apache.spark.sql.catalyst.expressions.aggregate.First | first_value | SELECT first_value(col) FROM VALUES (10), (5), (20) AS tab(col) | struct<first_value(col, false):int> |
-| org.apache.spark.sql.catalyst.expressions.aggregate.First | first | SELECT first(col) FROM VALUES (10), (5), (20) AS tab(col) | struct<first(col, false):int> |
+| org.apache.spark.sql.catalyst.expressions.aggregate.First | first_value | SELECT first_value(col) FROM VALUES (10), (5), (20) AS tab(col) | struct<first_value(col):int> |
+| org.apache.spark.sql.catalyst.expressions.aggregate.First | first | SELECT first(col) FROM VALUES (10), (5), (20) AS tab(col) | struct<first(col):int> |
 | org.apache.spark.sql.catalyst.expressions.aggregate.HyperLogLogPlusPlus | approx_count_distinct | SELECT approx_count_distinct(col1) FROM VALUES (1), (1), (2), (2), (3) tab(col1) | struct<approx_count_distinct(col1):bigint> |
 | org.apache.spark.sql.catalyst.expressions.aggregate.Kurtosis | kurtosis | SELECT kurtosis(col) FROM VALUES (-10), (-20), (100), (1000) AS tab(col) | struct<kurtosis(CAST(col AS DOUBLE)):double> |
-| org.apache.spark.sql.catalyst.expressions.aggregate.Last | last_value | SELECT last_value(col) FROM VALUES (10), (5), (20) AS tab(col) | struct<last_value(col, false):int> |
-| org.apache.spark.sql.catalyst.expressions.aggregate.Last | last | SELECT last(col) FROM VALUES (10), (5), (20) AS tab(col) | struct<last(col, false):int> |
+| org.apache.spark.sql.catalyst.expressions.aggregate.Last | last_value | SELECT last_value(col) FROM VALUES (10), (5), (20) AS tab(col) | struct<last_value(col):int> |
+| org.apache.spark.sql.catalyst.expressions.aggregate.Last | last | SELECT last(col) FROM VALUES (10), (5), (20) AS tab(col) | struct<last(col):int> |
 | org.apache.spark.sql.catalyst.expressions.aggregate.Max | max | SELECT max(col) FROM VALUES (10), (50), (20) AS tab(col) | struct<max(col):int> |
 | org.apache.spark.sql.catalyst.expressions.aggregate.MaxBy | max_by | SELECT max_by(x, y) FROM VALUES (('a', 10)), (('b', 50)), (('c', 20)) AS tab(x, y) | struct<max_by(x, y):string> |
 | org.apache.spark.sql.catalyst.expressions.aggregate.Min | min | SELECT min(col) FROM VALUES (10), (-1), (20) AS tab(col) | struct<min(col):int> |
```
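
The regenerated golden files below reflect the same display change: because `ignoreNulls` is no longer a child expression, the flag no longer shows up in the printed schema, so `first(col, false)` becomes `first(col)` (and likewise for `last`).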

```diff
@@ -270,7 +270,7 @@ struct<lead((ten * 2), 1, -1) OVER (PARTITION BY four ORDER BY ten ASC NULLS FIR
 -- !query
 SELECT first(ten) OVER (PARTITION BY four ORDER BY ten), ten, four FROM tenk1 WHERE unique2 < 10
 -- !query schema
-struct<first(ten, false) OVER (PARTITION BY four ORDER BY ten ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW):int,ten:int,four:int>
+struct<first(ten) OVER (PARTITION BY four ORDER BY ten ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW):int,ten:int,four:int>
 -- !query output
 0 0 0
 0 0 0
@@ -287,7 +287,7 @@ struct<first(ten, false) OVER (PARTITION BY four ORDER BY ten ASC NULLS FIRST RA
 -- !query
 SELECT last(four) OVER (ORDER BY ten), ten, four FROM tenk1 WHERE unique2 < 10
 -- !query schema
-struct<last(four, false) OVER (ORDER BY ten ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW):int,ten:int,four:int>
+struct<last(four) OVER (ORDER BY ten ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW):int,ten:int,four:int>
 -- !query output
 0 4 0
 1 1 1
@@ -306,7 +306,7 @@ SELECT last(ten) OVER (PARTITION BY four), ten, four FROM
 (SELECT * FROM tenk1 WHERE unique2 < 10 ORDER BY four, ten)s
 ORDER BY four, ten
 -- !query schema
-struct<last(ten, false) OVER (PARTITION BY four ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING):int,ten:int,four:int>
+struct<last(ten) OVER (PARTITION BY four ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING):int,ten:int,four:int>
 -- !query output
 4 0 0
 4 0 0
@@ -476,7 +476,7 @@ sum(ten) over (partition by four order by ten),
 last(ten) over (partition by four order by ten)
 FROM (select distinct ten, four from tenk1) ss
 -- !query schema
-struct<four:int,ten:int,sum(CAST(ten AS BIGINT)) OVER (PARTITION BY four ORDER BY ten ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW):bigint,last(ten, false) OVER (PARTITION BY four ORDER BY ten ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW):int>
+struct<four:int,ten:int,sum(CAST(ten AS BIGINT)) OVER (PARTITION BY four ORDER BY ten ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW):bigint,last(ten) OVER (PARTITION BY four ORDER BY ten ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW):int>
 -- !query output
 0 0 0 0
 0 2 2 2
@@ -506,7 +506,7 @@ sum(ten) over (partition by four order by ten range between unbounded preceding
 last(ten) over (partition by four order by ten range between unbounded preceding and current row)
 FROM (select distinct ten, four from tenk1) ss
 -- !query schema
-struct<four:int,ten:int,sum(ten) OVER (PARTITION BY four ORDER BY ten ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW):bigint,last(ten, false) OVER (PARTITION BY four ORDER BY ten ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW):int>
+struct<four:int,ten:int,sum(ten) OVER (PARTITION BY four ORDER BY ten ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW):bigint,last(ten) OVER (PARTITION BY four ORDER BY ten ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW):int>
 -- !query output
 0 0 0 0
 0 2 2 2
@@ -536,7 +536,7 @@ sum(ten) over (partition by four order by ten range between unbounded preceding
 last(ten) over (partition by four order by ten range between unbounded preceding and unbounded following)
 FROM (select distinct ten, four from tenk1) ss
 -- !query schema
-struct<four:int,ten:int,sum(ten) OVER (PARTITION BY four ORDER BY ten ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING):bigint,last(ten, false) OVER (PARTITION BY four ORDER BY ten ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING):int>
+struct<four:int,ten:int,sum(ten) OVER (PARTITION BY four ORDER BY ten ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING):bigint,last(ten) OVER (PARTITION BY four ORDER BY ten ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING):int>
 -- !query output
 0 0 20 8
 0 2 20 8
```

```diff
@@ -101,7 +101,7 @@ from
 window w as
 (order by ss.id asc nulls first range between 2 preceding and 2 following)
 -- !query schema
-struct<id:bigint,y:bigint,first(y, false) OVER (ORDER BY id ASC NULLS FIRST RANGE BETWEEN CAST((- 2) AS BIGINT) FOLLOWING AND CAST(2 AS BIGINT) FOLLOWING):bigint,last(y, false) OVER (ORDER BY id ASC NULLS FIRST RANGE BETWEEN CAST((- 2) AS BIGINT) FOLLOWING AND CAST(2 AS BIGINT) FOLLOWING):bigint>
+struct<id:bigint,y:bigint,first(y) OVER (ORDER BY id ASC NULLS FIRST RANGE BETWEEN CAST((- 2) AS BIGINT) FOLLOWING AND CAST(2 AS BIGINT) FOLLOWING):bigint,last(y) OVER (ORDER BY id ASC NULLS FIRST RANGE BETWEEN CAST((- 2) AS BIGINT) FOLLOWING AND CAST(2 AS BIGINT) FOLLOWING):bigint>
 -- !query output
 1 1 1 3
 2 2 1 4
@@ -123,7 +123,7 @@ from
 window w as
 (order by ss.id asc nulls last range between 2 preceding and 2 following)
 -- !query schema
-struct<id:bigint,y:bigint,first(y, false) OVER (ORDER BY id ASC NULLS LAST RANGE BETWEEN CAST((- 2) AS BIGINT) FOLLOWING AND CAST(2 AS BIGINT) FOLLOWING):bigint,last(y, false) OVER (ORDER BY id ASC NULLS LAST RANGE BETWEEN CAST((- 2) AS BIGINT) FOLLOWING AND CAST(2 AS BIGINT) FOLLOWING):bigint>
+struct<id:bigint,y:bigint,first(y) OVER (ORDER BY id ASC NULLS LAST RANGE BETWEEN CAST((- 2) AS BIGINT) FOLLOWING AND CAST(2 AS BIGINT) FOLLOWING):bigint,last(y) OVER (ORDER BY id ASC NULLS LAST RANGE BETWEEN CAST((- 2) AS BIGINT) FOLLOWING AND CAST(2 AS BIGINT) FOLLOWING):bigint>
 -- !query output
 1 1 1 3
 2 2 1 4
@@ -145,7 +145,7 @@ from
 window w as
 (order by ss.id desc nulls first range between 2 preceding and 2 following)
 -- !query schema
-struct<id:bigint,y:bigint,first(y, false) OVER (ORDER BY id DESC NULLS FIRST RANGE BETWEEN CAST((- 2) AS BIGINT) FOLLOWING AND CAST(2 AS BIGINT) FOLLOWING):bigint,last(y, false) OVER (ORDER BY id DESC NULLS FIRST RANGE BETWEEN CAST((- 2) AS BIGINT) FOLLOWING AND CAST(2 AS BIGINT) FOLLOWING):bigint>
+struct<id:bigint,y:bigint,first(y) OVER (ORDER BY id DESC NULLS FIRST RANGE BETWEEN CAST((- 2) AS BIGINT) FOLLOWING AND CAST(2 AS BIGINT) FOLLOWING):bigint,last(y) OVER (ORDER BY id DESC NULLS FIRST RANGE BETWEEN CAST((- 2) AS BIGINT) FOLLOWING AND CAST(2 AS BIGINT) FOLLOWING):bigint>
 -- !query output
 1 1 3 1
 2 2 4 1
@@ -167,7 +167,7 @@ from
 window w as
 (order by ss.id desc nulls last range between 2 preceding and 2 following)
 -- !query schema
-struct<id:bigint,y:bigint,first(y, false) OVER (ORDER BY id DESC NULLS LAST RANGE BETWEEN CAST((- 2) AS BIGINT) FOLLOWING AND CAST(2 AS BIGINT) FOLLOWING):bigint,last(y, false) OVER (ORDER BY id DESC NULLS LAST RANGE BETWEEN CAST((- 2) AS BIGINT) FOLLOWING AND CAST(2 AS BIGINT) FOLLOWING):bigint>
+struct<id:bigint,y:bigint,first(y) OVER (ORDER BY id DESC NULLS LAST RANGE BETWEEN CAST((- 2) AS BIGINT) FOLLOWING AND CAST(2 AS BIGINT) FOLLOWING):bigint,last(y) OVER (ORDER BY id DESC NULLS LAST RANGE BETWEEN CAST((- 2) AS BIGINT) FOLLOWING AND CAST(2 AS BIGINT) FOLLOWING):bigint>
 -- !query output
 1 1 3 1
 2 2 4 1
@@ -182,7 +182,7 @@ NULL 43 42 43
 select x.id, last(x.id) over (order by x.id range between current row and 2147450884 following)
 from range(32764, 32767) x
 -- !query schema
-struct<id:bigint,last(id, false) OVER (ORDER BY id ASC NULLS FIRST RANGE BETWEEN CURRENT ROW AND CAST(2147450884 AS BIGINT) FOLLOWING):bigint>
+struct<id:bigint,last(id) OVER (ORDER BY id ASC NULLS FIRST RANGE BETWEEN CURRENT ROW AND CAST(2147450884 AS BIGINT) FOLLOWING):bigint>
 -- !query output
 32764 32766
 32765 32766
@@ -193,7 +193,7 @@ struct<id:bigint,last(id, false) OVER (ORDER BY id ASC NULLS FIRST RANGE BETWEEN
 select x.id, last(x.id) over (order by x.id desc range between current row and 2147450885 following)
 from range(-32766, -32765) x
 -- !query schema
-struct<id:bigint,last(id, false) OVER (ORDER BY id DESC NULLS LAST RANGE BETWEEN CURRENT ROW AND CAST(2147450885 AS BIGINT) FOLLOWING):bigint>
+struct<id:bigint,last(id) OVER (ORDER BY id DESC NULLS LAST RANGE BETWEEN CURRENT ROW AND CAST(2147450885 AS BIGINT) FOLLOWING):bigint>
 -- !query output
 -32766 -32766
@@ -202,7 +202,7 @@ struct<id:bigint,last(id, false) OVER (ORDER BY id DESC NULLS LAST RANGE BETWEEN
 select x.id, last(x.id) over (order by x.id range between current row and 4 following)
 from range(2147483644, 2147483647) x
 -- !query schema
-struct<id:bigint,last(id, false) OVER (ORDER BY id ASC NULLS FIRST RANGE BETWEEN CURRENT ROW AND CAST(4 AS BIGINT) FOLLOWING):bigint>
+struct<id:bigint,last(id) OVER (ORDER BY id ASC NULLS FIRST RANGE BETWEEN CURRENT ROW AND CAST(4 AS BIGINT) FOLLOWING):bigint>
 -- !query output
 2147483644 2147483646
 2147483645 2147483646
@@ -213,7 +213,7 @@ struct<id:bigint,last(id, false) OVER (ORDER BY id ASC NULLS FIRST RANGE BETWEEN
 select x.id, last(x.id) over (order by x.id desc range between current row and 5 following)
 from range(-2147483646, -2147483645) x
 -- !query schema
-struct<id:bigint,last(id, false) OVER (ORDER BY id DESC NULLS LAST RANGE BETWEEN CURRENT ROW AND CAST(5 AS BIGINT) FOLLOWING):bigint>
+struct<id:bigint,last(id) OVER (ORDER BY id DESC NULLS LAST RANGE BETWEEN CURRENT ROW AND CAST(5 AS BIGINT) FOLLOWING):bigint>
 -- !query output
 -2147483646 -2147483646
@@ -272,7 +272,7 @@ from numerics
 window w as (order by f_float4 range between
 1 preceding and 1 following)
 -- !query schema
-struct<id:int,f_float4:float,first(id, false) OVER (ORDER BY f_float4 ASC NULLS FIRST RANGE BETWEEN CAST((- 1) AS FLOAT) FOLLOWING AND CAST(1 AS FLOAT) FOLLOWING):int,last(id, false) OVER (ORDER BY f_float4 ASC NULLS FIRST RANGE BETWEEN CAST((- 1) AS FLOAT) FOLLOWING AND CAST(1 AS FLOAT) FOLLOWING):int>
+struct<id:int,f_float4:float,first(id) OVER (ORDER BY f_float4 ASC NULLS FIRST RANGE BETWEEN CAST((- 1) AS FLOAT) FOLLOWING AND CAST(1 AS FLOAT) FOLLOWING):int,last(id) OVER (ORDER BY f_float4 ASC NULLS FIRST RANGE BETWEEN CAST((- 1) AS FLOAT) FOLLOWING AND CAST(1 AS FLOAT) FOLLOWING):int>
 -- !query output
 1 -3.0 1 1
 2 -1.0 2 3
@@ -289,7 +289,7 @@ from numerics
 window w as (order by f_float4 range between
 1 preceding and 1.1 following)
 -- !query schema
-struct<id:int,f_float4:float,first(id, false) OVER (ORDER BY f_float4 ASC NULLS FIRST RANGE BETWEEN CAST((- 1) AS FLOAT) FOLLOWING AND CAST(1.1 AS FLOAT) FOLLOWING):int,last(id, false) OVER (ORDER BY f_float4 ASC NULLS FIRST RANGE BETWEEN CAST((- 1) AS FLOAT) FOLLOWING AND CAST(1.1 AS FLOAT) FOLLOWING):int>
+struct<id:int,f_float4:float,first(id) OVER (ORDER BY f_float4 ASC NULLS FIRST RANGE BETWEEN CAST((- 1) AS FLOAT) FOLLOWING AND CAST(1.1 AS FLOAT) FOLLOWING):int,last(id) OVER (ORDER BY f_float4 ASC NULLS FIRST RANGE BETWEEN CAST((- 1) AS FLOAT) FOLLOWING AND CAST(1.1 AS FLOAT) FOLLOWING):int>
 -- !query output
 1 -3.0 1 1
 2 -1.0 2 3
@@ -306,7 +306,7 @@ from numerics
 window w as (order by f_float4 range between
 'inf' preceding and 'inf' following)
 -- !query schema
-struct<id:int,f_float4:float,first(id, false) OVER (ORDER BY f_float4 ASC NULLS FIRST RANGE BETWEEN CAST((- CAST(inf AS DOUBLE)) AS FLOAT) FOLLOWING AND CAST(inf AS FLOAT) FOLLOWING):int,last(id, false) OVER (ORDER BY f_float4 ASC NULLS FIRST RANGE BETWEEN CAST((- CAST(inf AS DOUBLE)) AS FLOAT) FOLLOWING AND CAST(inf AS FLOAT) FOLLOWING):int>
+struct<id:int,f_float4:float,first(id) OVER (ORDER BY f_float4 ASC NULLS FIRST RANGE BETWEEN CAST((- CAST(inf AS DOUBLE)) AS FLOAT) FOLLOWING AND CAST(inf AS FLOAT) FOLLOWING):int,last(id) OVER (ORDER BY f_float4 ASC NULLS FIRST RANGE BETWEEN CAST((- CAST(inf AS DOUBLE)) AS FLOAT) FOLLOWING AND CAST(inf AS FLOAT) FOLLOWING):int>
 -- !query output
 1 -3.0 1 7
 2 -1.0 1 7
@@ -323,7 +323,7 @@ from numerics
 window w as (order by f_float4 range between
 1.1 preceding and 'NaN' following)
 -- !query schema
-struct<id:int,f_float4:float,first(id, false) OVER (ORDER BY f_float4 ASC NULLS FIRST RANGE BETWEEN CAST((- 1.1) AS FLOAT) FOLLOWING AND CAST(NaN AS FLOAT) FOLLOWING):int,last(id, false) OVER (ORDER BY f_float4 ASC NULLS FIRST RANGE BETWEEN CAST((- 1.1) AS FLOAT) FOLLOWING AND CAST(NaN AS FLOAT) FOLLOWING):int>
+struct<id:int,f_float4:float,first(id) OVER (ORDER BY f_float4 ASC NULLS FIRST RANGE BETWEEN CAST((- 1.1) AS FLOAT) FOLLOWING AND CAST(NaN AS FLOAT) FOLLOWING):int,last(id) OVER (ORDER BY f_float4 ASC NULLS FIRST RANGE BETWEEN CAST((- 1.1) AS FLOAT) FOLLOWING AND CAST(NaN AS FLOAT) FOLLOWING):int>
 -- !query output
 1 -3.0 1 7
 2 -1.0 2 7
@@ -340,7 +340,7 @@ from numerics
 window w as (order by f_float8 range between
 1 preceding and 1 following)
 -- !query schema
-struct<id:int,f_float8:float,first(id, false) OVER (ORDER BY f_float8 ASC NULLS FIRST RANGE BETWEEN CAST((- 1) AS FLOAT) FOLLOWING AND CAST(1 AS FLOAT) FOLLOWING):int,last(id, false) OVER (ORDER BY f_float8 ASC NULLS FIRST RANGE BETWEEN CAST((- 1) AS FLOAT) FOLLOWING AND CAST(1 AS FLOAT) FOLLOWING):int>
+struct<id:int,f_float8:float,first(id) OVER (ORDER BY f_float8 ASC NULLS FIRST RANGE BETWEEN CAST((- 1) AS FLOAT) FOLLOWING AND CAST(1 AS FLOAT) FOLLOWING):int,last(id) OVER (ORDER BY f_float8 ASC NULLS FIRST RANGE BETWEEN CAST((- 1) AS FLOAT) FOLLOWING AND CAST(1 AS FLOAT) FOLLOWING):int>
 -- !query output
 1 -3.0 1 1
 2 -1.0 2 3
@@ -357,7 +357,7 @@ from numerics
 window w as (order by f_float8 range between
 1 preceding and 1.1 following)
 -- !query schema
-struct<id:int,f_float8:float,first(id, false) OVER (ORDER BY f_float8 ASC NULLS FIRST RANGE BETWEEN CAST((- 1) AS FLOAT) FOLLOWING AND CAST(1.1 AS FLOAT) FOLLOWING):int,last(id, false) OVER (ORDER BY f_float8 ASC NULLS FIRST RANGE BETWEEN CAST((- 1) AS FLOAT) FOLLOWING AND CAST(1.1 AS FLOAT) FOLLOWING):int>
+struct<id:int,f_float8:float,first(id) OVER (ORDER BY f_float8 ASC NULLS FIRST RANGE BETWEEN CAST((- 1) AS FLOAT) FOLLOWING AND CAST(1.1 AS FLOAT) FOLLOWING):int,last(id) OVER (ORDER BY f_float8 ASC NULLS FIRST RANGE BETWEEN CAST((- 1) AS FLOAT) FOLLOWING AND CAST(1.1 AS FLOAT) FOLLOWING):int>
 -- !query output
 1 -3.0 1 1
 2 -1.0 2 3
@@ -374,7 +374,7 @@ from numerics
 window w as (order by f_float8 range between
 'inf' preceding and 'inf' following)
 -- !query schema
-struct<id:int,f_float8:float,first(id, false) OVER (ORDER BY f_float8 ASC NULLS FIRST RANGE BETWEEN CAST((- CAST(inf AS DOUBLE)) AS FLOAT) FOLLOWING AND CAST(inf AS FLOAT) FOLLOWING):int,last(id, false) OVER (ORDER BY f_float8 ASC NULLS FIRST RANGE BETWEEN CAST((- CAST(inf AS DOUBLE)) AS FLOAT) FOLLOWING AND CAST(inf AS FLOAT) FOLLOWING):int>
+struct<id:int,f_float8:float,first(id) OVER (ORDER BY f_float8 ASC NULLS FIRST RANGE BETWEEN CAST((- CAST(inf AS DOUBLE)) AS FLOAT) FOLLOWING AND CAST(inf AS FLOAT) FOLLOWING):int,last(id) OVER (ORDER BY f_float8 ASC NULLS FIRST RANGE BETWEEN CAST((- CAST(inf AS DOUBLE)) AS FLOAT) FOLLOWING AND CAST(inf AS FLOAT) FOLLOWING):int>
 -- !query output
 1 -3.0 1 7
 2 -1.0 1 7
@@ -391,7 +391,7 @@ from numerics
 window w as (order by f_float8 range between
 1.1 preceding and 'NaN' following)
 -- !query schema
-struct<id:int,f_float8:float,first(id, false) OVER (ORDER BY f_float8 ASC NULLS FIRST RANGE BETWEEN CAST((- 1.1) AS FLOAT) FOLLOWING AND CAST(NaN AS FLOAT) FOLLOWING):int,last(id, false) OVER (ORDER BY f_float8 ASC NULLS FIRST RANGE BETWEEN CAST((- 1.1) AS FLOAT) FOLLOWING AND CAST(NaN AS FLOAT) FOLLOWING):int>
+struct<id:int,f_float8:float,first(id) OVER (ORDER BY f_float8 ASC NULLS FIRST RANGE BETWEEN CAST((- 1.1) AS FLOAT) FOLLOWING AND CAST(NaN AS FLOAT) FOLLOWING):int,last(id) OVER (ORDER BY f_float8 ASC NULLS FIRST RANGE BETWEEN CAST((- 1.1) AS FLOAT) FOLLOWING AND CAST(NaN AS FLOAT) FOLLOWING):int>
 -- !query output
 1 -3.0 1 7
 2 -1.0 2 7
@@ -408,7 +408,7 @@ from numerics
 window w as (order by f_numeric range between
 1 preceding and 1 following)
 -- !query schema
-struct<id:int,f_numeric:int,first(id, false) OVER (ORDER BY f_numeric ASC NULLS FIRST RANGE BETWEEN 1 PRECEDING AND 1 FOLLOWING):int,last(id, false) OVER (ORDER BY f_numeric ASC NULLS FIRST RANGE BETWEEN 1 PRECEDING AND 1 FOLLOWING):int>
+struct<id:int,f_numeric:int,first(id) OVER (ORDER BY f_numeric ASC NULLS FIRST RANGE BETWEEN 1 PRECEDING AND 1 FOLLOWING):int,last(id) OVER (ORDER BY f_numeric ASC NULLS FIRST RANGE BETWEEN 1 PRECEDING AND 1 FOLLOWING):int>
 -- !query output
 1 -3 1 1
 2 -1 2 3
@@ -425,7 +425,7 @@ from numerics
 window w as (order by f_numeric range between
 1 preceding and 1.1 following)
 -- !query schema
-struct<id:int,f_numeric:int,first(id, false) OVER (ORDER BY f_numeric ASC NULLS FIRST RANGE BETWEEN 1 PRECEDING AND CAST(1.1 AS INT) FOLLOWING):int,last(id, false) OVER (ORDER BY f_numeric ASC NULLS FIRST RANGE BETWEEN 1 PRECEDING AND CAST(1.1 AS INT) FOLLOWING):int>
+struct<id:int,f_numeric:int,first(id) OVER (ORDER BY f_numeric ASC NULLS FIRST RANGE BETWEEN 1 PRECEDING AND CAST(1.1 AS INT) FOLLOWING):int,last(id) OVER (ORDER BY f_numeric ASC NULLS FIRST RANGE BETWEEN 1 PRECEDING AND CAST(1.1 AS INT) FOLLOWING):int>
 -- !query output
 1 -3 1 1
 2 -1 2 3
@@ -442,7 +442,7 @@ from numerics
 window w as (order by f_numeric range between
 1 preceding and 1.1 following)
 -- !query schema
-struct<id:int,f_numeric:int,first(id, false) OVER (ORDER BY f_numeric ASC NULLS FIRST RANGE BETWEEN 1 PRECEDING AND CAST(1.1 AS INT) FOLLOWING):int,last(id, false) OVER (ORDER BY f_numeric ASC NULLS FIRST RANGE BETWEEN 1 PRECEDING AND CAST(1.1 AS INT) FOLLOWING):int>
+struct<id:int,f_numeric:int,first(id) OVER (ORDER BY f_numeric ASC NULLS FIRST RANGE BETWEEN 1 PRECEDING AND CAST(1.1 AS INT) FOLLOWING):int,last(id) OVER (ORDER BY f_numeric ASC NULLS FIRST RANGE BETWEEN 1 PRECEDING AND CAST(1.1 AS INT) FOLLOWING):int>
 -- !query output
 1 -3 1 1
 2 -1 2 3
```

```diff
@@ -1037,6 +1037,13 @@ class DataFrameAggregateSuite extends QueryTest
     val groupBy = df.groupBy("b").agg(count("*"))
     checkAnswer(groupBy, Row(null, 1) :: Row(Row(null), 1) :: Row(Row(1.0), 1) :: Nil)
   }
+
+  test("SPARK-32344: Unevaluable's set to FIRST/LAST ignoreNullsExpr in distinct aggregates") {
+    val queryTemplate = (agg: String) =>
+      s"SELECT $agg(DISTINCT v) FROM (SELECT v FROM VALUES 1, 2, 3 t(v) ORDER BY v)"
+    checkAnswer(sql(queryTemplate("FIRST")), Row(1))
+    checkAnswer(sql(queryTemplate("LAST")), Row(3))
+  }
 }

 case class B(c: Option[Double])
```
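
For manual verification, the originally failing query from the description now runs. A hedged sketch of a spark-shell session (the concrete value returned by `FIRST(DISTINCT v)` is non-deterministic without an explicit ordering, which is why the test above sorts the input in a subquery):
```scala
// Hedged sketch of an interactive check after the fix; previously this threw
// java.lang.UnsupportedOperationException: Cannot evaluate expression: false#37.
spark.sql("SELECT FIRST(DISTINCT v) FROM VALUES 1, 2, 3 t(v)").collect()
// e.g. Array([1]) -- exact value is non-deterministic without an ORDER BY
```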