[SPARK-36054][SQL] Support group by TimestampNTZ type column
### What changes were proposed in this pull request? Support group by TimestampNTZ type column ### Why are the changes needed? It's a basic SQL operation. ### Does this PR introduce _any_ user-facing change? No, the new timestamp type is not released yet. ### How was this patch tested? Unit test Closes #33268 from gengliangwang/agg. Authored-by: Gengliang Wang <gengliang@apache.org> Signed-off-by: Max Gekk <max.gekk@gmail.com>
This commit is contained in:
parent
819c482498
commit
382b66e267
|
@ -490,7 +490,7 @@ abstract class HashExpression[E] extends Expression {
|
||||||
case BooleanType => genHashBoolean(input, result)
|
case BooleanType => genHashBoolean(input, result)
|
||||||
case ByteType | ShortType | IntegerType | DateType => genHashInt(input, result)
|
case ByteType | ShortType | IntegerType | DateType => genHashInt(input, result)
|
||||||
case LongType => genHashLong(input, result)
|
case LongType => genHashLong(input, result)
|
||||||
case TimestampType => genHashTimestamp(input, result)
|
case TimestampType | TimestampNTZType => genHashTimestamp(input, result)
|
||||||
case FloatType => genHashFloat(input, result)
|
case FloatType => genHashFloat(input, result)
|
||||||
case DoubleType => genHashDouble(input, result)
|
case DoubleType => genHashDouble(input, result)
|
||||||
case d: DecimalType => genHashDecimal(ctx, d, input, result)
|
case d: DecimalType => genHashDecimal(ctx, d, input, result)
|
||||||
|
|
|
@ -553,7 +553,8 @@ public final class OffHeapColumnVector extends WritableColumnVector {
|
||||||
type instanceof DateType || DecimalType.is32BitDecimalType(type)) {
|
type instanceof DateType || DecimalType.is32BitDecimalType(type)) {
|
||||||
this.data = Platform.reallocateMemory(data, oldCapacity * 4L, newCapacity * 4L);
|
this.data = Platform.reallocateMemory(data, oldCapacity * 4L, newCapacity * 4L);
|
||||||
} else if (type instanceof LongType || type instanceof DoubleType ||
|
} else if (type instanceof LongType || type instanceof DoubleType ||
|
||||||
DecimalType.is64BitDecimalType(type) || type instanceof TimestampType) {
|
DecimalType.is64BitDecimalType(type) || type instanceof TimestampType ||
|
||||||
|
type instanceof TimestampNTZType) {
|
||||||
this.data = Platform.reallocateMemory(data, oldCapacity * 8L, newCapacity * 8L);
|
this.data = Platform.reallocateMemory(data, oldCapacity * 8L, newCapacity * 8L);
|
||||||
} else if (childColumns != null) {
|
} else if (childColumns != null) {
|
||||||
// Nothing to store.
|
// Nothing to store.
|
||||||
|
|
|
@ -547,7 +547,8 @@ public final class OnHeapColumnVector extends WritableColumnVector {
|
||||||
if (intData != null) System.arraycopy(intData, 0, newData, 0, capacity);
|
if (intData != null) System.arraycopy(intData, 0, newData, 0, capacity);
|
||||||
intData = newData;
|
intData = newData;
|
||||||
}
|
}
|
||||||
} else if (type instanceof LongType || type instanceof TimestampType ||
|
} else if (type instanceof LongType ||
|
||||||
|
type instanceof TimestampType || type instanceof TimestampNTZType ||
|
||||||
DecimalType.is64BitDecimalType(type) || type instanceof DayTimeIntervalType) {
|
DecimalType.is64BitDecimalType(type) || type instanceof DayTimeIntervalType) {
|
||||||
if (longData == null || longData.length < newCapacity) {
|
if (longData == null || longData.length < newCapacity) {
|
||||||
long[] newData = new long[newCapacity];
|
long[] newData = new long[newCapacity];
|
||||||
|
|
|
@ -160,7 +160,7 @@ abstract class HashMapGenerator(
|
||||||
case BooleanType => hashInt(s"$input ? 1 : 0")
|
case BooleanType => hashInt(s"$input ? 1 : 0")
|
||||||
case ByteType | ShortType | IntegerType | DateType | _: YearMonthIntervalType =>
|
case ByteType | ShortType | IntegerType | DateType | _: YearMonthIntervalType =>
|
||||||
hashInt(input)
|
hashInt(input)
|
||||||
case LongType | TimestampType | _: DayTimeIntervalType => hashLong(input)
|
case LongType | TimestampType | TimestampNTZType | _: DayTimeIntervalType => hashLong(input)
|
||||||
case FloatType => hashInt(s"Float.floatToIntBits($input)")
|
case FloatType => hashInt(s"Float.floatToIntBits($input)")
|
||||||
case DoubleType => hashLong(s"Double.doubleToLongBits($input)")
|
case DoubleType => hashLong(s"Double.doubleToLongBits($input)")
|
||||||
case d: DecimalType =>
|
case d: DecimalType =>
|
||||||
|
|
|
@ -17,7 +17,7 @@
|
||||||
|
|
||||||
package org.apache.spark.sql
|
package org.apache.spark.sql
|
||||||
|
|
||||||
import java.time.{Duration, Period}
|
import java.time.{Duration, LocalDateTime, Period}
|
||||||
|
|
||||||
import scala.util.Random
|
import scala.util.Random
|
||||||
|
|
||||||
|
@ -1398,6 +1398,17 @@ class DataFrameAggregateSuite extends QueryTest
|
||||||
val df2 = Seq(Period.ofYears(1)).toDF("a").groupBy("a").count()
|
val df2 = Seq(Period.ofYears(1)).toDF("a").groupBy("a").count()
|
||||||
checkAnswer(df2, Row(Period.ofYears(1), 1))
|
checkAnswer(df2, Row(Period.ofYears(1), 1))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
test("SPARK-36054: Support group by TimestampNTZ column") {
|
||||||
|
val ts1 = "2021-01-01T00:00:00"
|
||||||
|
val ts2 = "2021-01-01T00:00:01"
|
||||||
|
val localDateTime = Seq(ts1, ts1, ts2).map(LocalDateTime.parse)
|
||||||
|
val df = localDateTime.toDF("ts").groupBy("ts").count().orderBy("ts")
|
||||||
|
val expectedSchema =
|
||||||
|
new StructType().add(StructField("ts", TimestampNTZType)).add("count", LongType, false)
|
||||||
|
assert (df.schema == expectedSchema)
|
||||||
|
checkAnswer(df, Seq(Row(LocalDateTime.parse(ts1), 2), Row(LocalDateTime.parse(ts2), 1)))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
case class B(c: Option[Double])
|
case class B(c: Option[Double])
|
||||||
|
|
Loading…
Reference in a new issue