From 7d496fb361cfa885515a91d90669af2b9cc3a45e Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Sun, 26 Sep 2021 12:44:19 +0900
Subject: [PATCH] [SPARK-36854][SQL] Handle ANSI intervals by the off-heap
 column vector

### What changes were proposed in this pull request?
Modify `OffHeapColumnVector.reserveInternal` to handle the ANSI interval types `DayTimeIntervalType` and `YearMonthIntervalType`.

### Why are the changes needed?
The changes fix the issue demonstrated by the example below:
```scala
scala> spark.conf.set("spark.sql.columnVector.offheap.enabled", true)
scala> spark.read.parquet("/Users/maximgekk/tmp/parquet_offheap").show()
21/09/25 22:09:03 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 3)
java.lang.RuntimeException: Unhandled YearMonthIntervalType(0,1)
        at org.apache.spark.sql.execution.vectorized.OffHeapColumnVector.reserveInternal(OffHeapColumnVector.java:562)
```
SPARK-36854 shows how the parquet files in `/Users/maximgekk/tmp/parquet_offheap` were prepared.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
By running the modified test suite:
```
$ build/sbt "sql/test:testOnly *ParquetIOSuite"
```

Closes #34106 from MaxGekk/ansi-interval-OffHeapColumnVector.

Authored-by: Max Gekk
Signed-off-by: Hyukjin Kwon
---
 .../vectorized/OffHeapColumnVector.java       |  5 +--
 .../datasources/parquet/ParquetIOSuite.scala  | 32 +++++++++++--------
 2 files changed, 21 insertions(+), 16 deletions(-)

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
index b4b6903cda..f7c9dc55f7 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
@@ -550,11 +550,12 @@ public final class OffHeapColumnVector extends WritableColumnVector {
     } else if (type instanceof ShortType) {
       this.data = Platform.reallocateMemory(data, oldCapacity * 2L, newCapacity * 2L);
     } else if (type instanceof IntegerType || type instanceof FloatType ||
-      type instanceof DateType || DecimalType.is32BitDecimalType(type)) {
+      type instanceof DateType || DecimalType.is32BitDecimalType(type) ||
+      type instanceof YearMonthIntervalType) {
       this.data = Platform.reallocateMemory(data, oldCapacity * 4L, newCapacity * 4L);
     } else if (type instanceof LongType || type instanceof DoubleType ||
       DecimalType.is64BitDecimalType(type) || type instanceof TimestampType ||
-      type instanceof TimestampNTZType) {
+      type instanceof TimestampNTZType || type instanceof DayTimeIntervalType) {
       this.data = Platform.reallocateMemory(data, oldCapacity * 8L, newCapacity * 8L);
     } else if (childColumns != null) {
       // Nothing to store.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index e59b49978f..593fa400ea 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -1062,20 +1062,24 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
     }
   }
 
-  test("SPARK-36825: year-month/day-time intervals written and read as INT32/INT64") {
-    Seq(
-      YearMonthIntervalType() -> ((i: Int) => Period.of(i, i, 0)),
-      DayTimeIntervalType() -> ((i: Int) => Duration.ofDays(i).plusSeconds(i))
-    ).foreach { case (it, f) =>
-      val data = (1 to 10).map(i => Row(i, f(i)))
-      val schema = StructType(Array(StructField("d", IntegerType, false),
-        StructField("i", it, false)))
-      withTempPath { file =>
-        val df = spark.createDataFrame(sparkContext.parallelize(data), schema)
-        df.write.parquet(file.getCanonicalPath)
-        withAllParquetReaders {
-          val df2 = spark.read.parquet(file.getCanonicalPath)
-          checkAnswer(df2, df.collect().toSeq)
+  test("SPARK-36825, SPARK-36854: year-month/day-time intervals written and read as INT32/INT64") {
+    Seq(false, true).foreach { offHeapEnabled =>
+      withSQLConf(SQLConf.COLUMN_VECTOR_OFFHEAP_ENABLED.key -> offHeapEnabled.toString) {
+        Seq(
+          YearMonthIntervalType() -> ((i: Int) => Period.of(i, i, 0)),
+          DayTimeIntervalType() -> ((i: Int) => Duration.ofDays(i).plusSeconds(i))
+        ).foreach { case (it, f) =>
+          val data = (1 to 10).map(i => Row(i, f(i)))
+          val schema = StructType(Array(StructField("d", IntegerType, false),
+            StructField("i", it, false)))
+          withTempPath { file =>
+            val df = spark.createDataFrame(sparkContext.parallelize(data), schema)
+            df.write.parquet(file.getCanonicalPath)
+            withAllParquetReaders {
+              val df2 = spark.read.parquet(file.getCanonicalPath)
+              checkAnswer(df2, df.collect().toSeq)
+            }
+          }
         }
       }
     }
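For context beyond the patch itself: the fix works because Spark stores a `YearMonthIntervalType` value internally as a 4-byte int (a number of months) and a `DayTimeIntervalType` value as an 8-byte long (a number of microseconds), so the two types slot naturally into the existing 4-byte and 8-byte reallocation branches of `reserveInternal`. The sketch below is a minimal standalone reproducer of the failure scenario from the PR description, not part of the patch; the object name and output path are illustrative assumptions, and only the `spark.sql.columnVector.offheap.enabled` config comes from the source.

```scala
// Standalone sketch (not part of the patch): write ANSI interval columns to
// Parquet, then read them back with off-heap column vectors enabled. Before
// this fix, the read failed with
// "java.lang.RuntimeException: Unhandled YearMonthIntervalType(0,1)".
import java.time.{Duration, Period}
import org.apache.spark.sql.SparkSession

object AnsiIntervalOffHeapCheck {  // hypothetical object name
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      // Route the vectorized Parquet reader through OffHeapColumnVector.
      .config("spark.sql.columnVector.offheap.enabled", "true")
      .getOrCreate()
    import spark.implicits._

    val path = "/tmp/parquet_offheap_check"  // assumed scratch location
    // java.time.Period maps to YearMonthIntervalType and java.time.Duration
    // to DayTimeIntervalType (encoders available since Spark 3.2).
    val df = (1 to 10)
      .map(i => (Period.ofMonths(i), Duration.ofDays(i).plusSeconds(i)))
      .toDF("ym", "dt")
    df.write.mode("overwrite").parquet(path)

    spark.read.parquet(path).show()
    spark.stop()
  }
}
```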