[SPARK-36227][SQL][3.2] Remove TimestampNTZ type support in Spark 3.2

### What changes were proposed in this pull request?

Remove TimestampNTZ type support in the production code of Spark 3.2.
To achieve this goal, this PR adds `Utils.isTesting` checks in the following code branches (a minimal sketch of the pattern follows the list):
- the keywords "timestamp_ntz" and "timestamp_ltz" in the parser
- the new expressions from https://issues.apache.org/jira/browse/SPARK-35662
- using `java.time.LocalDateTime` as the external type for TimestampNTZType
- `SQLConf.timestampType`, which determines the default timestamp type of Spark SQL
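
As a rough illustration, the guard pattern applied throughout the diff below looks like this. It is a minimal, self-contained sketch: the `Utils` object here is a stand-in for `org.apache.spark.util.Utils.isTesting`, and `externalTypeName` is a hypothetical helper, not Spark code.
```
import java.time.LocalDateTime

// Stand-in for org.apache.spark.util.Utils.isTesting, which is true only when
// the JVM runs under Spark's test harness (spark.testing / SPARK_TESTING).
object Utils {
  def isTesting: Boolean =
    sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing")
}

object GuardPatternSketch {
  // Mirrors the shape of the guarded branches in the diff: the TimestampNTZ
  // case stays in the source, but is only reachable in testing mode.
  def externalTypeName(value: Any): String = value match {
    case _: LocalDateTime if Utils.isTesting => "TimestampNTZType"
    case _: java.sql.Timestamp => "TimestampType"
    case other => s"unsupported external type: ${other.getClass.getName}"
  }

  def main(args: Array[String]): Unit = {
    // On a production (non-testing) build this prints the "unsupported" branch.
    println(externalTypeName(LocalDateTime.now()))
  }
}
```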

This minimizes the code difference from the master branch, so that future users won't think TimestampNTZ is already available in Spark 3.2.
The downside is that users can still find TimestampNTZType under the package `org.apache.spark.sql.types`. There should be nothing left other than this.

### Why are the changes needed?

As of now, there are some blockers for delivering the TimestampNTZ project in Spark 3.2:

- In the Hive Thrift server, both TimestampType and TimestampNTZType are mapped to the same timestamp type, which can cause confusion for users.
- For the Parquet data source, newly written TimestampNTZType Parquet columns will be read back as TimestampType by old Spark releases. We also need to decide the merged schema for files that mix TimestampType and TimestampNTZType columns.
- The type coercion rules for TimestampNTZType are incomplete. For example, what should the data type of the IN clause `IN(Timestamp'2020-01-01 00:00:00', TimestampNTZ'2020-01-01 00:00:00')` be?
- It is tricky to support TimestampNTZType in the JSON/CSV data readers. We need to avoid regressions as much as we can.

There are 10 days left until the expected 3.2 RC date. So, I propose to **release the TimestampNTZ type in Spark 3.3 instead of Spark 3.2**, which gives us enough time to produce well-considered designs for these issues.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing unit tests, plus manual tests in spark-shell to validate that the TimestampNTZ functionality is gone.
The new functions should no longer be resolvable:
```
spark.sql("select to_timestamp_ntz'2021-01-01 00:00:00'").show()
spark.sql("select to_timestamp_ltz'2021-01-01 00:00:00'").show()
spark.sql("select make_timestamp_ntz(1,1,1,1,1,1)").show()
spark.sql("select make_timestamp_ltz(1,1,1,1,1,1)").show()
spark.sql("select localtimestamp()").show()
```
The SQL configuration `spark.sql.timestampType` should have no effect in 3.2:
```
spark.conf.set("spark.sql.timestampType", "TIMESTAMP_NTZ")
spark.sql("select make_timestamp(1,1,1,1,1,1)").schema
spark.sql("select to_timestamp('2021-01-01 00:00:00')").schema
spark.sql("select timestamp'2021-01-01 00:00:00'").schema
Seq((1, java.sql.Timestamp.valueOf("2021-01-01 00:00:00"))).toDF("i", "ts").write.partitionBy("ts").parquet("/tmp/test")
spark.read.parquet("/tmp/test").schema
```
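With this change, `SQLConf.timestampType` only honors `TIMESTAMP_NTZ` under the test harness, so all of the schemas above should still report TimestampType. One illustrative way to check this from spark-shell (the assertion is a sketch, not part of the patch):
```
// Expected on a production 3.2 build: the TIMESTAMP_NTZ setting is ignored.
spark.conf.set("spark.sql.timestampType", "TIMESTAMP_NTZ")
val schema = spark.sql("select timestamp'2021-01-01 00:00:00'").schema
// Every field should still be TimestampType, never TimestampNTZType.
assert(schema.fields.forall(_.dataType == org.apache.spark.sql.types.TimestampType))
```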
LocalDateTime is not supported as a built-in external type:
```
Seq(LocalDateTime.now()).toDF()
org.apache.spark.sql.catalyst.expressions.Literal(java.time.LocalDateTime.now())
org.apache.spark.sql.catalyst.expressions.Literal(0L, TimestampNTZType)
```

Closes #33444 from gengliangwang/banNTZ.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
Gengliang Wang authored on 2021-07-21 09:55:09 -07:00, committed by Dongjoon Hyun
parent 7d363733ac
commit 99eb3ff226
15 changed files with 70 additions and 98 deletions


@@ -114,14 +114,6 @@ object Encoders {
*/
def LOCALDATE: Encoder[java.time.LocalDate] = ExpressionEncoder()
/**
* Creates an encoder that serializes instances of the `java.time.LocalDateTime` class
* to the internal representation of nullable Catalyst's TimestampNTZType.
*
* @since 3.2.0
*/
def LOCALDATETIME: Encoder[java.time.LocalDateTime] = ExpressionEncoder()
/**
* An encoder for nullable timestamp type.
*


@@ -35,6 +35,7 @@ import org.apache.spark.sql.types._
import org.apache.spark.sql.types.DayTimeIntervalType._
import org.apache.spark.sql.types.YearMonthIntervalType._
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils
/**
* Functions to convert Scala types to Catalyst types and vice versa.
@@ -511,7 +512,8 @@ object CatalystTypeConverters {
case ld: LocalDate => LocalDateConverter.toCatalyst(ld)
case t: Timestamp => TimestampConverter.toCatalyst(t)
case i: Instant => InstantConverter.toCatalyst(i)
case l: LocalDateTime => TimestampNTZConverter.toCatalyst(l)
// SPARK-36227: Remove TimestampNTZ type support in Spark 3.2 with minimal code changes.
case l: LocalDateTime if Utils.isTesting => TimestampNTZConverter.toCatalyst(l)
case d: BigDecimal => new DecimalConverter(DecimalType(d.precision, d.scale)).toCatalyst(d)
case d: JavaBigDecimal => new DecimalConverter(DecimalType(d.precision, d.scale)).toCatalyst(d)
case seq: Seq[Any] => new GenericArrayData(seq.map(convertToCatalyst).toArray)


@@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.expressions.objects._
import org.apache.spark.sql.catalyst.util.ArrayBasedMapData
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
/**
* Type-inference utilities for POJOs and Java collections.
@@ -119,7 +120,9 @@ object JavaTypeInference {
case c: Class[_] if c == classOf[java.sql.Date] => (DateType, true)
case c: Class[_] if c == classOf[java.time.Instant] => (TimestampType, true)
case c: Class[_] if c == classOf[java.sql.Timestamp] => (TimestampType, true)
case c: Class[_] if c == classOf[java.time.LocalDateTime] => (TimestampNTZType, true)
// SPARK-36227: Remove TimestampNTZ type support in Spark 3.2 with minimal code changes.
case c: Class[_] if c == classOf[java.time.LocalDateTime] && Utils.isTesting =>
(TimestampNTZType, true)
case c: Class[_] if c == classOf[java.time.Duration] => (DayTimeIntervalType(), true)
case c: Class[_] if c == classOf[java.time.Period] => (YearMonthIntervalType(), true)
@@ -251,7 +254,8 @@ object JavaTypeInference {
case c if c == classOf[java.sql.Timestamp] =>
createDeserializerForSqlTimestamp(path)
case c if c == classOf[java.time.LocalDateTime] =>
// SPARK-36227: Remove TimestampNTZ type support in Spark 3.2 with minimal code changes.
case c if c == classOf[java.time.LocalDateTime] && Utils.isTesting =>
createDeserializerForLocalDateTime(path)
case c if c == classOf[java.time.Duration] =>
@@ -413,7 +417,8 @@ object JavaTypeInference {
case c if c == classOf[java.sql.Timestamp] => createSerializerForSqlTimestamp(inputObject)
case c if c == classOf[java.time.LocalDateTime] =>
// SPARK-36227: Remove TimestampNTZ type support in Spark 3.2 with minimal code changes.
case c if c == classOf[java.time.LocalDateTime] && Utils.isTesting =>
createSerializerForLocalDateTime(inputObject)
case c if c == classOf[java.time.LocalDate] => createSerializerForJavaLocalDate(inputObject)


@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
import org.apache.spark.util.Utils
/**
@@ -752,7 +753,8 @@ object ScalaReflection extends ScalaReflection {
Schema(TimestampType, nullable = true)
case t if isSubtype(t, localTypeOf[java.sql.Timestamp]) =>
Schema(TimestampType, nullable = true)
case t if isSubtype(t, localTypeOf[java.time.LocalDateTime]) =>
// SPARK-36227: Remove TimestampNTZ type support in Spark 3.2 with minimal code changes.
case t if isSubtype(t, localTypeOf[java.time.LocalDateTime]) && Utils.isTesting =>
Schema(TimestampNTZType, nullable = true)
case t if isSubtype(t, localTypeOf[java.time.LocalDate]) => Schema(DateType, nullable = true)
case t if isSubtype(t, localTypeOf[java.sql.Date]) => Schema(DateType, nullable = true)
@@ -858,7 +860,9 @@
StringType -> classOf[UTF8String],
DateType -> classOf[DateType.InternalType],
TimestampType -> classOf[TimestampType.InternalType],
TimestampNTZType -> classOf[TimestampNTZType.InternalType],
// SPARK-36227: Remove TimestampNTZ type support in Spark 3.2 with minimal code changes.
TimestampNTZType ->
(if (Utils.isTesting) classOf[TimestampNTZType.InternalType] else classOf[java.lang.Object]),
BinaryType -> classOf[BinaryType.InternalType],
CalendarIntervalType -> classOf[CalendarInterval]
)
@@ -873,7 +877,9 @@
DoubleType -> classOf[java.lang.Double],
DateType -> classOf[java.lang.Integer],
TimestampType -> classOf[java.lang.Long],
TimestampNTZType -> classOf[java.lang.Long]
// SPARK-36227: Remove TimestampNTZ type support in Spark 3.2 with minimal code changes.
TimestampNTZType ->
(if (Utils.isTesting) classOf[java.lang.Long] else classOf[java.lang.Object])
)
def dataTypeJavaClass(dt: DataType): Class[_] = {


@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Range}
import org.apache.spark.sql.catalyst.trees.TreeNodeTag
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
/**
@@ -317,6 +318,20 @@ object FunctionRegistry {
val FUNC_ALIAS = TreeNodeTag[String]("functionAliasName")
val expressionsForTimestampNTZSupport: Map[String, (ExpressionInfo, FunctionBuilder)] =
// SPARK-36227: Remove TimestampNTZ type support in Spark 3.2 with minimal code changes.
if (Utils.isTesting) {
Map(
expression[ParseToTimestampNTZ]("to_timestamp_ntz"),
expression[ParseToTimestampLTZ]("to_timestamp_ltz"),
expression[MakeTimestampNTZ]("make_timestamp_ntz"),
expression[MakeTimestampLTZ]("make_timestamp_ltz"),
expression[LocalTimestamp]("localtimestamp")
)
} else {
Map.empty
}
// Note: Whenever we add a new entry here, make sure we also update ExpressionToSQLSuite
val expressions: Map[String, (ExpressionInfo, FunctionBuilder)] = Map(
// misc non-aggregate functions
@@ -519,7 +534,6 @@ object FunctionRegistry {
expression[CurrentDate]("current_date"),
expression[CurrentTimestamp]("current_timestamp"),
expression[CurrentTimeZone]("current_timezone"),
expression[LocalTimestamp]("localtimestamp"),
expression[DateDiff]("datediff"),
expression[DateAdd]("date_add"),
expression[DateFormatClass]("date_format"),
@@ -542,8 +556,6 @@
expression[ParseToDate]("to_date"),
expression[ToUnixTimestamp]("to_unix_timestamp"),
expression[ToUTCTimestamp]("to_utc_timestamp"),
expression[ParseToTimestampNTZ]("to_timestamp_ntz"),
expression[ParseToTimestampLTZ]("to_timestamp_ltz"),
expression[TruncDate]("trunc"),
expression[TruncTimestamp]("date_trunc"),
expression[UnixTimestamp]("unix_timestamp"),
@@ -555,8 +567,6 @@
expression[SessionWindow]("session_window"),
expression[MakeDate]("make_date"),
expression[MakeTimestamp]("make_timestamp"),
expression[MakeTimestampNTZ]("make_timestamp_ntz"),
expression[MakeTimestampLTZ]("make_timestamp_ltz"),
expression[MakeInterval]("make_interval"),
expression[MakeDTInterval]("make_dt_interval"),
expression[MakeYMInterval]("make_ym_interval"),
@@ -712,7 +722,7 @@ object FunctionRegistry {
expression[CsvToStructs]("from_csv"),
expression[SchemaOfCsv]("schema_of_csv"),
expression[StructsToCsv]("to_csv")
)
) ++ expressionsForTimestampNTZSupport
val builtin: SimpleFunctionRegistry = {
val fr = new SimpleFunctionRegistry


@@ -297,10 +297,6 @@ package object dsl {
/** Creates a new AttributeReference of type timestamp */
def timestamp: AttributeReference = AttributeReference(s, TimestampType, nullable = true)()
/** Creates a new AttributeReference of type timestamp without time zone */
def timestampNTZ: AttributeReference =
AttributeReference(s, TimestampNTZType, nullable = true)()
/** Creates a new AttributeReference of the day-time interval type */
def dayTimeInterval(startField: Byte, endField: Byte): AttributeReference = {
AttributeReference(s, DayTimeIntervalType(startField, endField), nullable = true)()


@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData}
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
/**
* A factory for constructing encoders that convert external row to/from the Spark SQL
@@ -105,7 +106,8 @@ object RowEncoder {
createSerializerForSqlTimestamp(inputObject)
}
case TimestampNTZType => createSerializerForLocalDateTime(inputObject)
// SPARK-36227: Remove TimestampNTZ type support in Spark 3.2 with minimal code changes.
case TimestampNTZType if Utils.isTesting => createSerializerForLocalDateTime(inputObject)
case DateType =>
if (SQLConf.get.datetimeJava8ApiEnabled) {
@@ -230,7 +232,8 @@
} else {
ObjectType(classOf[java.sql.Timestamp])
}
case TimestampNTZType =>
// SPARK-36227: Remove TimestampNTZ type support in Spark 3.2 with minimal code changes.
case TimestampNTZType if Utils.isTesting =>
ObjectType(classOf[java.time.LocalDateTime])
case DateType =>
if (SQLConf.get.datetimeJava8ApiEnabled) {
@@ -287,7 +290,8 @@
createDeserializerForSqlTimestamp(input)
}
case TimestampNTZType =>
// SPARK-36227: Remove TimestampNTZ type support in Spark 3.2 with minimal code changes.
case TimestampNTZType if Utils.isTesting =>
createDeserializerForLocalDateTime(input)
case DateType =>


@@ -80,7 +80,9 @@ object Literal {
case d: Decimal => Literal(d, DecimalType(Math.max(d.precision, d.scale), d.scale))
case i: Instant => Literal(instantToMicros(i), TimestampType)
case t: Timestamp => Literal(DateTimeUtils.fromJavaTimestamp(t), TimestampType)
case l: LocalDateTime => Literal(DateTimeUtils.localDateTimeToMicros(l), TimestampNTZType)
// SPARK-36227: Remove TimestampNTZ type support in Spark 3.2 with minimal code changes.
case l: LocalDateTime if Utils.isTesting =>
Literal(DateTimeUtils.localDateTimeToMicros(l), TimestampNTZType)
case ld: LocalDate => Literal(ld.toEpochDay.toInt, DateType)
case d: Date => Literal(DateTimeUtils.fromJavaDate(d), DateType)
case d: Duration => Literal(durationToMicros(d), DayTimeIntervalType())
@@ -120,7 +122,8 @@ object Literal {
case _ if clz == classOf[Date] => DateType
case _ if clz == classOf[Instant] => TimestampType
case _ if clz == classOf[Timestamp] => TimestampType
case _ if clz == classOf[LocalDateTime] => TimestampNTZType
// SPARK-36227: Remove TimestampNTZ type support in Spark 3.2 with minimal code changes.
case _ if clz == classOf[LocalDateTime] && Utils.isTesting => TimestampNTZType
case _ if clz == classOf[Duration] => DayTimeIntervalType()
case _ if clz == classOf[Period] => YearMonthIntervalType()
case _ if clz == classOf[JavaBigDecimal] => DecimalType.SYSTEM_DEFAULT
@@ -185,7 +188,8 @@ object Literal {
case dt: DecimalType => Literal(Decimal(0, dt.precision, dt.scale))
case DateType => create(0, DateType)
case TimestampType => create(0L, TimestampType)
case TimestampNTZType => create(0L, TimestampNTZType)
// SPARK-36227: Remove TimestampNTZ type support in Spark 3.2 with minimal code changes.
case TimestampNTZType if Utils.isTesting => create(0L, TimestampNTZType)
case it: DayTimeIntervalType => create(0L, it)
case it: YearMonthIntervalType => create(0, it)
case StringType => Literal("")
@@ -207,7 +211,9 @@ object Literal {
case ByteType => v.isInstanceOf[Byte]
case ShortType => v.isInstanceOf[Short]
case IntegerType | DateType | _: YearMonthIntervalType => v.isInstanceOf[Int]
case LongType | TimestampType | TimestampNTZType | _: DayTimeIntervalType =>
// SPARK-36227: Remove TimestampNTZ type support in Spark 3.2 with minimal code changes.
case TimestampNTZType if Utils.isTesting => v.isInstanceOf[Long]
case LongType | TimestampType | _: DayTimeIntervalType =>
v.isInstanceOf[Long]
case FloatType => v.isInstanceOf[Float]
case DoubleType => v.isInstanceOf[Double]


@@ -47,6 +47,7 @@ import org.apache.spark.sql.errors.QueryParsingErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
import org.apache.spark.util.Utils.isTesting
import org.apache.spark.util.random.RandomSampler
/**
@@ -2131,10 +2132,11 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
val zoneId = getZoneId(conf.sessionLocalTimeZone)
val specialDate = convertSpecialDate(value, zoneId).map(Literal(_, DateType))
specialDate.getOrElse(toLiteral(stringToDate, DateType))
case "TIMESTAMP_NTZ" =>
// SPARK-36227: Remove TimestampNTZ type support in Spark 3.2 with minimal code changes.
case "TIMESTAMP_NTZ" if isTesting =>
val specialTs = convertSpecialTimestampNTZ(value).map(Literal(_, TimestampNTZType))
specialTs.getOrElse(toLiteral(stringToTimestampWithoutTimeZone, TimestampNTZType))
case "TIMESTAMP_LTZ" =>
case "TIMESTAMP_LTZ" if isTesting =>
constructTimestampLTZLiteral(value)
case "TIMESTAMP" =>
SQLConf.get.timestampType match {
@@ -2573,8 +2575,9 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
case ("double", Nil) => DoubleType
case ("date", Nil) => DateType
case ("timestamp", Nil) => SQLConf.get.timestampType
case ("timestamp_ntz", Nil) => TimestampNTZType
case ("timestamp_ltz", Nil) => TimestampType
// SPARK-36227: Remove TimestampNTZ type support in Spark 3.2 with minimal code changes.
case ("timestamp_ntz", Nil) if isTesting => TimestampNTZType
case ("timestamp_ltz", Nil) if isTesting => TimestampType
case ("string", Nil) => StringType
case ("character" | "char", length :: Nil) => CharType(length.getText.toInt)
case ("varchar", length :: Nil) => VarcharType(length.getText.toInt)


@@ -2884,9 +2884,10 @@ object SQLConf {
s"and type literal. Setting the configuration as ${TimestampTypes.TIMESTAMP_NTZ} will " +
"use TIMESTAMP WITHOUT TIME ZONE as the default type while putting it as " +
s"${TimestampTypes.TIMESTAMP_LTZ} will use TIMESTAMP WITH LOCAL TIME ZONE. " +
"Before the 3.2.0 release, Spark only supports the TIMESTAMP WITH " +
"Before the 3.3.0 release, Spark only supports the TIMESTAMP WITH " +
"LOCAL TIME ZONE type.")
.version("3.2.0")
.version("3.3.0")
.internal()
.stringConf
.transform(_.toUpperCase(Locale.ROOT))
.checkValues(TimestampTypes.values.map(_.toString))
@@ -3975,12 +3976,14 @@ class SQLConf extends Serializable with Logging {
def ansiEnabled: Boolean = getConf(ANSI_ENABLED)
def timestampType: AtomicType = getConf(TIMESTAMP_TYPE) match {
case "TIMESTAMP_LTZ" =>
// SPARK-36227: Remove TimestampNTZ type support in Spark 3.2 with minimal code changes.
// The configuration `TIMESTAMP_TYPE` is only effective for testing in Spark 3.2.
case "TIMESTAMP_NTZ" if Utils.isTesting =>
TimestampNTZType
case _ =>
// For historical reason, the TimestampType maps to TIMESTAMP WITH LOCAL TIME ZONE
TimestampType
case "TIMESTAMP_NTZ" =>
TimestampNTZType
}
def nestedSchemaPruningEnabled: Boolean = getConf(NESTED_SCHEMA_PRUNING_ENABLED)


@@ -82,9 +82,6 @@ abstract class SQLImplicits extends LowPrioritySQLImplicits {
/** @since 3.0.0 */
implicit def newLocalDateEncoder: Encoder[java.time.LocalDate] = Encoders.LOCALDATE
/** @since 3.2.0 */
implicit def newLocalDateTimeEncoder: Encoder[java.time.LocalDateTime] = Encoders.LOCALDATETIME
/** @since 2.2.0 */
implicit def newTimeStampEncoder: Encoder[java.sql.Timestamp] = Encoders.TIMESTAMP


@@ -554,14 +554,6 @@ public class JavaDatasetSuite implements Serializable {
Assert.assertEquals(data, ds.collectAsList());
}
@Test
public void testLocalDateTimeEncoder() {
Encoder<LocalDateTime> encoder = Encoders.LOCALDATETIME();
List<LocalDateTime> data = Arrays.asList(LocalDateTime.of(1, 1, 1, 1, 1));
Dataset<LocalDateTime> ds = spark.createDataset(data, encoder);
Assert.assertEquals(data, ds.collectAsList());
}
@Test
public void testDurationEncoder() {
Encoder<Duration> encoder = Encoders.DURATION();


@@ -17,7 +17,7 @@
package org.apache.spark.sql
import java.time.{Duration, LocalDateTime, Period}
import java.time.{Duration, Period}
import scala.util.Random
@@ -1416,17 +1416,6 @@ class DataFrameAggregateSuite extends QueryTest
val df2 = Seq(Period.ofYears(1)).toDF("a").groupBy("a").count()
checkAnswer(df2, Row(Period.ofYears(1), 1))
}
test("SPARK-36054: Support group by TimestampNTZ column") {
val ts1 = "2021-01-01T00:00:00"
val ts2 = "2021-01-01T00:00:01"
val localDateTime = Seq(ts1, ts1, ts2).map(LocalDateTime.parse)
val df = localDateTime.toDF("ts").groupBy("ts").count().orderBy("ts")
val expectedSchema =
new StructType().add(StructField("ts", TimestampNTZType)).add("count", LongType, false)
assert (df.schema == expectedSchema)
checkAnswer(df, Seq(Row(LocalDateTime.parse(ts1), 2), Row(LocalDateTime.parse(ts2), 1)))
}
}
case class B(c: Option[Double])


@@ -2008,11 +2008,6 @@ class DatasetSuite extends QueryTest
checkAnswer(withUDF, Row(Row(1), null, null) :: Row(Row(1), null, null) :: Nil)
}
test("SPARK-35664: implicit encoder for java.time.LocalDateTime") {
val localDateTime = java.time.LocalDateTime.parse("2021-06-08T12:31:58.999999")
assert(Seq(localDateTime).toDS().head() === localDateTime)
}
test("SPARK-34605: implicit encoder for java.time.Duration") {
val duration = java.time.Duration.ofMinutes(10)
assert(spark.range(1).map { _ => duration }.head === duration)


@@ -847,34 +847,6 @@ class UDFSuite extends QueryTest with SharedSparkSession {
}
}
test("SPARK-35674: using java.time.LocalDateTime in UDF") {
// Regular case
val input = Seq(java.time.LocalDateTime.parse("2021-01-01T00:00:00")).toDF("dateTime")
val plusYear = udf((l: java.time.LocalDateTime) => l.plusYears(1))
val result = input.select(plusYear($"dateTime").as("newDateTime"))
checkAnswer(result, Row(java.time.LocalDateTime.parse("2022-01-01T00:00:00")) :: Nil)
assert(result.schema === new StructType().add("newDateTime", TimestampNTZType))
// UDF produces `null`
val nullFunc = udf((_: java.time.LocalDateTime) => null.asInstanceOf[java.time.LocalDateTime])
val nullResult = input.select(nullFunc($"dateTime").as("nullDateTime"))
checkAnswer(nullResult, Row(null) :: Nil)
assert(nullResult.schema === new StructType().add("nullDateTime", TimestampNTZType))
// Input parameter of UDF is null
val nullInput = Seq(null.asInstanceOf[java.time.LocalDateTime]).toDF("nullDateTime")
val constDuration = udf((_: java.time.LocalDateTime) =>
java.time.LocalDateTime.parse("2021-01-01T00:00:00"))
val constResult = nullInput.select(constDuration($"nullDateTime").as("firstDayOf2021"))
checkAnswer(constResult, Row(java.time.LocalDateTime.parse("2021-01-01T00:00:00")) :: Nil)
assert(constResult.schema === new StructType().add("firstDayOf2021", TimestampNTZType))
// Error in the conversion of UDF result to the internal representation of timestamp without
// time zone
val overflowFunc = udf((l: java.time.LocalDateTime) => l.plusDays(Long.MaxValue))
val e = intercept[SparkException] {
input.select(overflowFunc($"dateTime")).collect()
}.getCause.getCause
assert(e.isInstanceOf[java.lang.ArithmeticException])
}
test("SPARK-34663, SPARK-35730: using java.time.Duration in UDF") {
// Regular case
val input = Seq(java.time.Duration.ofHours(23)).toDF("d")