diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
index b4b6903cda..f7c9dc55f7 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
@@ -550,11 +550,12 @@ public final class OffHeapColumnVector extends WritableColumnVector {
     } else if (type instanceof ShortType) {
       this.data = Platform.reallocateMemory(data, oldCapacity * 2L, newCapacity * 2L);
     } else if (type instanceof IntegerType || type instanceof FloatType ||
-      type instanceof DateType || DecimalType.is32BitDecimalType(type)) {
+      type instanceof DateType || DecimalType.is32BitDecimalType(type) ||
+      type instanceof YearMonthIntervalType) {
       this.data = Platform.reallocateMemory(data, oldCapacity * 4L, newCapacity * 4L);
     } else if (type instanceof LongType || type instanceof DoubleType ||
       DecimalType.is64BitDecimalType(type) || type instanceof TimestampType ||
-      type instanceof TimestampNTZType) {
+      type instanceof TimestampNTZType || type instanceof DayTimeIntervalType) {
       this.data = Platform.reallocateMemory(data, oldCapacity * 8L, newCapacity * 8L);
     } else if (childColumns != null) {
       // Nothing to store.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index e59b49978f..593fa400ea 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -1062,20 +1062,24 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
     }
   }
 
-  test("SPARK-36825: year-month/day-time intervals written and read as INT32/INT64") {
-    Seq(
-      YearMonthIntervalType() -> ((i: Int) => Period.of(i, i, 0)),
-      DayTimeIntervalType() -> ((i: Int) => Duration.ofDays(i).plusSeconds(i))
-    ).foreach { case (it, f) =>
-      val data = (1 to 10).map(i => Row(i, f(i)))
-      val schema = StructType(Array(StructField("d", IntegerType, false),
-        StructField("i", it, false)))
-      withTempPath { file =>
-        val df = spark.createDataFrame(sparkContext.parallelize(data), schema)
-        df.write.parquet(file.getCanonicalPath)
-        withAllParquetReaders {
-          val df2 = spark.read.parquet(file.getCanonicalPath)
-          checkAnswer(df2, df.collect().toSeq)
+  test("SPARK-36825, SPARK-36854: year-month/day-time intervals written and read as INT32/INT64") {
+    Seq(false, true).foreach { offHeapEnabled =>
+      withSQLConf(SQLConf.COLUMN_VECTOR_OFFHEAP_ENABLED.key -> offHeapEnabled.toString) {
+        Seq(
+          YearMonthIntervalType() -> ((i: Int) => Period.of(i, i, 0)),
+          DayTimeIntervalType() -> ((i: Int) => Duration.ofDays(i).plusSeconds(i))
+        ).foreach { case (it, f) =>
+          val data = (1 to 10).map(i => Row(i, f(i)))
+          val schema = StructType(Array(StructField("d", IntegerType, false),
+            StructField("i", it, false)))
+          withTempPath { file =>
+            val df = spark.createDataFrame(sparkContext.parallelize(data), schema)
+            df.write.parquet(file.getCanonicalPath)
+            withAllParquetReaders {
+              val df2 = spark.read.parquet(file.getCanonicalPath)
+              checkAnswer(df2, df.collect().toSeq)
+            }
+          }
+        }
+      }
+    }
         }
       }
     }
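
Note on the change (not part of the patch): the first hunk teaches OffHeapColumnVector.reserveInternal to reserve off-heap slots for the ANSI interval types, year-month intervals as 4-byte values and day-time intervals as 8-byte values, matching the INT32/INT64 physical types the test name cites; without a branch for these types, the off-heap code path could not allocate buffers for interval columns. The second hunk reruns the existing round-trip test with spark.sql.columnVector.offheap.enabled both off and on. Below is a minimal spark-shell sketch of the scenario the test covers, assuming Spark 3.2+ (which maps java.time.Period/Duration to the ANSI interval types); the /tmp/ansi-intervals path is illustrative only.

import java.time.{Duration, Period}
import spark.implicits._

// Route reads through the off-heap vectorized reader, the code path patched above.
spark.conf.set("spark.sql.columnVector.offheap.enabled", "true")

// Period columns become YearMonthIntervalType, Duration columns DayTimeIntervalType.
val df = (1 to 10)
  .map(i => (Period.of(i, i, 0), Duration.ofDays(i).plusSeconds(i)))
  .toDF("ym", "dt")

df.write.mode("overwrite").parquet("/tmp/ansi-intervals")
// Before the fix, reserveInternal had no case for the interval types, so the
// off-heap reader could not reserve their buffers; with it, this read succeeds.
spark.read.parquet("/tmp/ansi-intervals").show(truncate = false)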