[SPARK-36016][SQL] Support TimestampNTZType in expression ApproxCountDistinctForIntervals

### What changes were proposed in this pull request?
The current `ApproxCountDistinctForIntervals` supports `TimestampType`, but does not yet support timestamp without time zone (`TimestampNTZType`).
This PR adds that support.
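
As a minimal illustration (the column name, endpoint values, and assertion below are hypothetical, not taken from this patch), the expression can now be constructed over a `TimestampNTZType` column:

```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, CreateArray, Literal}
import org.apache.spark.sql.catalyst.expressions.aggregate.ApproxCountDistinctForIntervals
import org.apache.spark.sql.types.TimestampNTZType

// Hypothetical column and endpoints; TIMESTAMP_NTZ values are represented
// internally as microseconds since the epoch, stored as Long.
val col = AttributeReference("ts", TimestampNTZType)()
val endpoints = CreateArray(Seq(0L, 5000000L, 10000000L).map(Literal(_, TimestampNTZType)))

val agg = ApproxCountDistinctForIntervals(col, endpoints)
// Before this patch the type check rejected TIMESTAMP_NTZ input; now it passes.
assert(agg.checkInputDataTypes().isSuccess)
```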

### Why are the changes needed?
`ApproxCountDistinctForIntervals` needs to support `TimestampNTZType`, for parity with the existing `TimestampType` support.

### Does this PR introduce _any_ user-facing change?
Yes. `ApproxCountDistinctForIntervals` now accepts `TimestampNTZType` input.

### How was this patch tested?
New tests.

Closes #33243 from beliefer/SPARK-36016.

Authored-by: gengjiaan <gengjiaan@360.cn>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
gengjiaan 2021-07-07 20:22:46 +03:00 committed by Max Gekk
parent 55373b118f
commit be382a6285
2 changed files with 9 additions and 5 deletions

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproxCountDistinctForIntervals.scala

@@ -61,7 +61,7 @@ case class ApproxCountDistinctForIntervals(
   }

   override def inputTypes: Seq[AbstractDataType] = {
-    Seq(TypeCollection(NumericType, TimestampType, DateType), ArrayType)
+    Seq(TypeCollection(NumericType, TimestampType, DateType, TimestampNTZType), ArrayType)
   }

   // Mark as lazy so that endpointsExpression is not evaluated during tree transformation.
@@ -79,7 +79,7 @@ case class ApproxCountDistinctForIntervals(
       TypeCheckFailure("The endpoints provided must be constant literals")
     } else {
       endpointsExpression.dataType match {
-        case ArrayType(_: NumericType | DateType | TimestampType, _) =>
+        case ArrayType(_: NumericType | DateType | TimestampType | TimestampNTZType, _) =>
           if (endpoints.length < 2) {
             TypeCheckFailure("The number of endpoints must be >= 2 to construct intervals")
           } else {
@@ -122,7 +122,7 @@ case class ApproxCountDistinctForIntervals(
         n.numeric.toDouble(value.asInstanceOf[n.InternalType])
       case _: DateType =>
         value.asInstanceOf[Int].toDouble
-      case _: TimestampType =>
+      case TimestampType | TimestampNTZType =>
         value.asInstanceOf[Long].toDouble
     }
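
Both timestamp types share a `Long` microseconds internal representation, which is why the merged `case TimestampType | TimestampNTZType` arm can reuse the existing `Long` conversion. A small sketch of that, with an illustrative wall-clock value:

```scala
import java.time.LocalDateTime

import org.apache.spark.sql.catalyst.util.DateTimeUtils

// Illustrative value: TIMESTAMP_NTZ carries microseconds since 1970-01-01 00:00:00
// with no time-zone adjustment, so the Long => Double widening above applies as-is.
val micros: Long = DateTimeUtils.localDateTimeToMicros(LocalDateTime.of(2021, 7, 7, 20, 22, 46))
val bucketValue: Double = micros.toDouble
```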

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproxCountDistinctForIntervalsSuite.scala

@@ -18,6 +18,7 @@
 package org.apache.spark.sql.catalyst.expressions.aggregate

 import java.sql.{Date, Timestamp}
+import java.time.LocalDateTime

 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.InternalRow
@@ -38,7 +39,7 @@ class ApproxCountDistinctForIntervalsSuite extends SparkFunSuite {
     assert(
       wrongColumn.checkInputDataTypes() match {
         case TypeCheckFailure(msg)
-          if msg.contains("requires (numeric or timestamp or date) type") => true
+          if msg.contains("requires (numeric or timestamp or date or timestamp_ntz) type") => true
         case _ => false
       })
   }
@@ -199,7 +200,9 @@ class ApproxCountDistinctForIntervalsSuite extends SparkFunSuite {
       (intRecords.map(DateTimeUtils.toJavaDate),
         intEndpoints.map(DateTimeUtils.toJavaDate), DateType),
       (intRecords.map(DateTimeUtils.toJavaTimestamp(_)),
-        intEndpoints.map(DateTimeUtils.toJavaTimestamp(_)), TimestampType)
+        intEndpoints.map(DateTimeUtils.toJavaTimestamp(_)), TimestampType),
+      (intRecords.map(DateTimeUtils.microsToLocalDateTime(_)),
+        intEndpoints.map(DateTimeUtils.microsToLocalDateTime(_)), TimestampNTZType)
     )

     inputs.foreach { case (records, endpoints, dataType) =>
@@ -209,6 +212,7 @@ class ApproxCountDistinctForIntervalsSuite extends SparkFunSuite {
         val value = r match {
           case d: Date => DateTimeUtils.fromJavaDate(d)
           case t: Timestamp => DateTimeUtils.fromJavaTimestamp(t)
+          case ldt: LocalDateTime => DateTimeUtils.localDateTimeToMicros(ldt)
           case _ => r
         }
         input.update(0, value)
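
For context on the new test data: `DateTimeUtils.microsToLocalDateTime` builds the `java.time.LocalDateTime` inputs, and `DateTimeUtils.localDateTimeToMicros` converts them back before updating the aggregation buffer. A minimal sketch of that round trip, with an illustrative value:

```scala
import org.apache.spark.sql.catalyst.util.DateTimeUtils

// Illustrative round trip: 1,000,000 microseconds is 1970-01-01T00:00:01.
val ldt = DateTimeUtils.microsToLocalDateTime(1000000L)
assert(DateTimeUtils.localDateTimeToMicros(ldt) == 1000000L)
```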