[SPARK-36854][SQL] Handle ANSI intervals by the off-heap column vector

### What changes were proposed in this pull request?
Modify `OffHeapColumnVector.reserveInternal` to handle ANSI intervals - `DayTimeIntervalType` and `YearMonthIntervalType`.
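For context, Spark stores a year-month interval as a 4-byte `Int` (a number of months) and a day-time interval as an 8-byte `Long` (a number of microseconds), so `reserveInternal` can size the off-heap buffer exactly like the existing `IntegerType` and `LongType` branches. A quick sketch that checks those physical sizes via the public `defaultSize` of the Catalyst types:
```scala
import org.apache.spark.sql.types.{DayTimeIntervalType, YearMonthIntervalType}

// Year-month intervals are backed by an Int (number of months): 4 bytes per value.
assert(YearMonthIntervalType().defaultSize == 4)
// Day-time intervals are backed by a Long (number of microseconds): 8 bytes per value.
assert(DayTimeIntervalType().defaultSize == 8)
```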

### Why are the changes needed?
The changes fix the issue demonstrated by the example below:
```scala
scala> spark.conf.set("spark.sql.columnVector.offheap.enabled", true)
scala> spark.read.parquet("/Users/maximgekk/tmp/parquet_offheap").show()
21/09/25 22:09:03 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 3)
java.lang.RuntimeException: Unhandled YearMonthIntervalType(0,1)
	at org.apache.spark.sql.execution.vectorized.OffHeapColumnVector.reserveInternal(OffHeapColumnVector.java:562)
```
SPARK-36854 shows how the parquet files in `/Users/maximgekk/tmp/parquet_offheap` were prepared.
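For completeness, a minimal sketch of how such a file could be produced (the exact preparation steps are described in SPARK-36854; this snippet is illustrative):
```scala
// Write a single ANSI year-month interval value. The literal below yields a
// column of type YearMonthIntervalType(0,1), matching the error above; reading
// it back with "spark.sql.columnVector.offheap.enabled" used to fail.
spark.sql("SELECT INTERVAL '1-1' YEAR TO MONTH AS i")
  .write.parquet("/Users/maximgekk/tmp/parquet_offheap")
```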

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
By running the modified test suite:
```
$ build/sbt "sql/test:testOnly *ParquetIOSuite"
```

Closes #34106 from MaxGekk/ansi-interval-OffHeapColumnVector.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>

`sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java`:
```diff
@@ -550,11 +550,12 @@ public final class OffHeapColumnVector extends WritableColumnVector {
     } else if (type instanceof ShortType) {
       this.data = Platform.reallocateMemory(data, oldCapacity * 2L, newCapacity * 2L);
     } else if (type instanceof IntegerType || type instanceof FloatType ||
-        type instanceof DateType || DecimalType.is32BitDecimalType(type)) {
+        type instanceof DateType || DecimalType.is32BitDecimalType(type) ||
+        type instanceof YearMonthIntervalType) {
       this.data = Platform.reallocateMemory(data, oldCapacity * 4L, newCapacity * 4L);
     } else if (type instanceof LongType || type instanceof DoubleType ||
         DecimalType.is64BitDecimalType(type) || type instanceof TimestampType ||
-        type instanceof TimestampNTZType) {
+        type instanceof TimestampNTZType || type instanceof DayTimeIntervalType) {
       this.data = Platform.reallocateMemory(data, oldCapacity * 8L, newCapacity * 8L);
     } else if (childColumns != null) {
       // Nothing to store.
```
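The branch taken in `reserveInternal` determines the per-value width of the off-heap buffer. An illustrative summary of the mapping after this change (the helper below is not Spark's API; the 1-byte and decimal branches are omitted):
```scala
import org.apache.spark.sql.types._

// Illustrative only: per-value byte widths implied by reserveInternal's branches.
def valueWidth(dt: DataType): Int = dt match {
  case ShortType => 2
  case IntegerType | FloatType | DateType | (_: YearMonthIntervalType) => 4
  case LongType | DoubleType | TimestampType | TimestampNTZType |
       (_: DayTimeIntervalType) => 8
  case other => throw new RuntimeException(s"Unhandled $other")
}
```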

`sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala`:
```diff
@@ -1062,20 +1062,24 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
     }
   }
-  test("SPARK-36825: year-month/day-time intervals written and read as INT32/INT64") {
-    Seq(
-      YearMonthIntervalType() -> ((i: Int) => Period.of(i, i, 0)),
-      DayTimeIntervalType() -> ((i: Int) => Duration.ofDays(i).plusSeconds(i))
-    ).foreach { case (it, f) =>
-      val data = (1 to 10).map(i => Row(i, f(i)))
-      val schema = StructType(Array(StructField("d", IntegerType, false),
-        StructField("i", it, false)))
-      withTempPath { file =>
-        val df = spark.createDataFrame(sparkContext.parallelize(data), schema)
-        df.write.parquet(file.getCanonicalPath)
-        withAllParquetReaders {
-          val df2 = spark.read.parquet(file.getCanonicalPath)
-          checkAnswer(df2, df.collect().toSeq)
+  test("SPARK-36825, SPARK-36854: year-month/day-time intervals written and read as INT32/INT64") {
+    Seq(false, true).foreach { offHeapEnabled =>
+      withSQLConf(SQLConf.COLUMN_VECTOR_OFFHEAP_ENABLED.key -> offHeapEnabled.toString) {
+        Seq(
+          YearMonthIntervalType() -> ((i: Int) => Period.of(i, i, 0)),
+          DayTimeIntervalType() -> ((i: Int) => Duration.ofDays(i).plusSeconds(i))
+        ).foreach { case (it, f) =>
+          val data = (1 to 10).map(i => Row(i, f(i)))
+          val schema = StructType(Array(StructField("d", IntegerType, false),
+            StructField("i", it, false)))
+          withTempPath { file =>
+            val df = spark.createDataFrame(sparkContext.parallelize(data), schema)
+            df.write.parquet(file.getCanonicalPath)
+            withAllParquetReaders {
+              val df2 = spark.read.parquet(file.getCanonicalPath)
+              checkAnswer(df2, df.collect().toSeq)
+            }
+          }
         }
       }
     }
   }
```