[SPARK-35243][SQL] Support columnar execution on ANSI interval types

### What changes were proposed in this pull request?
Columnar execution support for ANSI interval types include YearMonthIntervalType and DayTimeIntervalType

### Why are the changes needed?
support cache tables with ANSI interval types.

### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
run ./dev/lint-java
run ./dev/scalastyle
run test: CachedTableSuite
run test: ColumnTypeSuite

Closes #32452 from Peng-Lei/SPARK-35243.

Lead-authored-by: PengLei <18066542445@189.cn>
Co-authored-by: Lei Peng <peng.8lei@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
This commit is contained in:
PengLei 2021-05-12 20:11:34 +09:00 committed by Hyukjin Kwon
parent ecb48ccb7d
commit 82c520a3e2
5 changed files with 23 additions and 8 deletions

View file

@ -139,8 +139,8 @@ private[sql] object ColumnAccessor {
case BooleanType => new BooleanColumnAccessor(buf)
case ByteType => new ByteColumnAccessor(buf)
case ShortType => new ShortColumnAccessor(buf)
case IntegerType | DateType => new IntColumnAccessor(buf)
case LongType | TimestampType => new LongColumnAccessor(buf)
case IntegerType | DateType | YearMonthIntervalType => new IntColumnAccessor(buf)
case LongType | TimestampType | DayTimeIntervalType => new LongColumnAccessor(buf)
case FloatType => new FloatColumnAccessor(buf)
case DoubleType => new DoubleColumnAccessor(buf)
case StringType => new StringColumnAccessor(buf)

View file

@ -174,8 +174,8 @@ private[columnar] object ColumnBuilder {
case BooleanType => new BooleanColumnBuilder
case ByteType => new ByteColumnBuilder
case ShortType => new ShortColumnBuilder
case IntegerType | DateType => new IntColumnBuilder
case LongType | TimestampType => new LongColumnBuilder
case IntegerType | DateType | YearMonthIntervalType => new IntColumnBuilder
case LongType | TimestampType | DayTimeIntervalType => new LongColumnBuilder
case FloatType => new FloatColumnBuilder
case DoubleType => new DoubleColumnBuilder
case StringType => new StringColumnBuilder

View file

@ -817,8 +817,8 @@ private[columnar] object ColumnType {
case BooleanType => BOOLEAN
case ByteType => BYTE
case ShortType => SHORT
case IntegerType | DateType => INT
case LongType | TimestampType => LONG
case IntegerType | DateType | YearMonthIntervalType => INT
case LongType | TimestampType | DayTimeIntervalType => LONG
case FloatType => FLOAT
case DoubleType => DOUBLE
case StringType => STRING

View file

@ -80,8 +80,8 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
case BooleanType => classOf[BooleanColumnAccessor].getName
case ByteType => classOf[ByteColumnAccessor].getName
case ShortType => classOf[ShortColumnAccessor].getName
case IntegerType | DateType => classOf[IntColumnAccessor].getName
case LongType | TimestampType => classOf[LongColumnAccessor].getName
case IntegerType | DateType | YearMonthIntervalType => classOf[IntColumnAccessor].getName
case LongType | TimestampType | DayTimeIntervalType => classOf[LongColumnAccessor].getName
case FloatType => classOf[FloatColumnAccessor].getName
case DoubleType => classOf[DoubleColumnAccessor].getName
case StringType => classOf[StringColumnAccessor].getName

View file

@ -19,6 +19,7 @@ package org.apache.spark.sql
import java.io.{File, FilenameFilter}
import java.nio.file.{Files, Paths}
import java.time.{Duration, Period}
import scala.collection.mutable.HashSet
import scala.concurrent.duration._
@ -1187,6 +1188,20 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
}
}
test("SPARK-35243: cache supports for YearMonthIntervalType and DayTimeIntervalType") {
withTempView("ymi_dti_interval_cache") {
Seq((1, Period.ofYears(1), Duration.ofDays(1)),
(2, Period.ofYears(2), Duration.ofDays(2)))
.toDF("k", "v1", "v2").createTempView("ymi_dti_interval_cache")
sql("CACHE TABLE tmp AS SELECT k, v1, v2 FROM ymi_dti_interval_cache")
assert(spark.catalog.isCached("tmp"))
checkAnswer(sql("SELECT * FROM tmp WHERE k = 1"),
Row(1, Period.ofYears(1), Duration.ofDays(1)))
sql("UNCACHE TABLE tmp")
assert(!spark.catalog.isCached("tmp"))
}
}
test("SPARK-30494 Fix the leak of cached data when replace an existing view") {
withTempView("tempView") {
spark.catalog.clearCache()