[SPARK-34903][SQL] Return day-time interval from timestamps subtraction

### What changes were proposed in this pull request?
Modify the `SubtractTimestamps` expression to return values of `DayTimeIntervalType` when `spark.sql.legacy.interval.enabled` is set to `false` (which is the default).
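
For illustration, a rough spark-shell sketch of the intended default behavior on a build containing this change (the printed schema text is not copied from the PR and may differ):
```scala
// Sketch only: run in spark-shell on a build with this patch.
val df = spark.sql(
  "SELECT timestamp '2021-03-31 23:48:00' - timestamp '2021-01-01 00:00:00' AS diff")
// With spark.sql.legacy.interval.enabled=false (the default), `diff` is expected
// to resolve to DayTimeIntervalType instead of CalendarIntervalType.
df.printSchema()
```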

### Why are the changes needed?
To conform to the ANSI SQL standard, which requires ANSI intervals as the result of timestamp subtraction; see the excerpt from the standard below:
<img width="656" alt="Screenshot 2021-03-29 at 19 09 34" src="https://user-images.githubusercontent.com/1580697/112866455-7e2f0d00-90c2-11eb-96e6-3feb7eea7e09.png">

### Does this PR introduce _any_ user-facing change?
Yes. By default, subtracting one timestamp from another now returns a value of `DayTimeIntervalType` instead of `CalendarIntervalType`.

### How was this patch tested?
By running new tests:
```
$ build/sbt "test:testOnly *DateTimeUtilsSuite"
$ build/sbt "test:testOnly *DateExpressionsSuite"
$ build/sbt "test:testOnly *ColumnExpressionSuite"
```
and some tests from `SQLQueryTestSuite`:
```
$ build/sbt "sql/testOnly *SQLQueryTestSuite -- -z timestamp.sql"
$ build/sbt "sql/testOnly *SQLQueryTestSuite -- -z datetime.sql"
$ build/sbt "sql/testOnly *SQLQueryTestSuite -- -z interval.sql"
```

Closes #32016 from MaxGekk/subtract-timestamps-to-intervals.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>

Commit 5911faa0d4 (parent 89ae83d19b), authored 2021-04-01 10:27:58 +03:00
7 changed files with 201 additions and 36 deletions

```diff
@@ -71,6 +71,8 @@ license: |
 
 - In Spark 3.2, the dates subtraction expression such as `date1 - date2` returns values of `DayTimeIntervalType`. In Spark 3.1 and earlier, the returned type is `CalendarIntervalType`. To restore the behavior before Spark 3.2, you can set `spark.sql.legacy.interval.enabled` to `true`.
+
+- In Spark 3.2, the timestamps subtraction expression such as `timestamp '2021-03-31 23:48:00' - timestamp '2021-01-01 00:00:00'` returns values of `DayTimeIntervalType`. In Spark 3.1 and earlier, the type of the same expression is `CalendarIntervalType`. To restore the behavior before Spark 3.2, you can set `spark.sql.legacy.interval.enabled` to `true`.
 
 ## Upgrading from Spark SQL 3.0 to 3.1
 
 - In Spark 3.1, statistical aggregation function includes `std`, `stddev`, `stddev_samp`, `variance`, `var_samp`, `skewness`, `kurtosis`, `covar_samp`, `corr` will return `NULL` instead of `Double.NaN` when `DivideByZero` occurs during expression evaluation, for example, when `stddev_samp` applied on a single element set. In Spark version 3.0 and earlier, it will return `Double.NaN` in such case. To restore the behavior before Spark 3.1, you can set `spark.sql.legacy.statisticalAggregate` to `true`.
```
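
As a hedged illustration of the opt-out described in the migration note above (the config name is taken from the note; the exact behavior is assumed from this PR), the legacy flag can be flipped in a running session:
```scala
// Sketch: restore the pre-3.2 result type for interval-producing subtractions.
spark.conf.set("spark.sql.legacy.interval.enabled", "true")
spark.sql(
  "SELECT timestamp '2021-03-31 23:48:00' - timestamp '2021-01-01 00:00:00' AS diff")
  .printSchema()
// `diff` is expected to resolve to CalendarIntervalType again, as in Spark 3.1 and earlier.
```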

```diff
@@ -626,10 +626,10 @@ abstract class TypeCoercionBase {
       case d @ DateSub(TimestampType(), _) => d.copy(startDate = Cast(d.startDate, DateType))
       case d @ DateSub(StringType(), _) => d.copy(startDate = Cast(d.startDate, DateType))
-      case s @ SubtractTimestamps(DateType(), _) =>
-        s.copy(endTimestamp = Cast(s.endTimestamp, TimestampType))
-      case s @ SubtractTimestamps(_, DateType()) =>
-        s.copy(startTimestamp = Cast(s.startTimestamp, TimestampType))
+      case s @ SubtractTimestamps(DateType(), _, _, _) =>
+        s.copy(left = Cast(s.left, TimestampType))
+      case s @ SubtractTimestamps(_, DateType(), _, _) =>
+        s.copy(right = Cast(s.right, TimestampType))
       case t @ TimeAdd(StringType(), _, _) => t.copy(start = Cast(t.start, TimestampType))
     }
```

```diff
@@ -2352,25 +2352,60 @@ case class Extract(field: Expression, source: Expression, child: Expression)
 }
 
 /**
- * Returns the interval from startTimestamp to endTimestamp in which the `months` and `day` field
- * is set to 0 and the `microseconds` field is initialized to the microsecond difference
- * between the given timestamps.
+ * Returns the interval from `right` to `left` timestamps.
+ *   - When the SQL config `spark.sql.legacy.interval.enabled` is `true`,
+ *     it returns `CalendarIntervalType` in which the `months` and `day` field is set to 0 and
+ *     the `microseconds` field is initialized to the microsecond difference between
+ *     the given timestamps.
+ *   - Otherwise the expression returns `DayTimeIntervalType` with the difference in microseconds
+ *     between given timestamps.
  */
-case class SubtractTimestamps(endTimestamp: Expression, startTimestamp: Expression)
-  extends BinaryExpression with ExpectsInputTypes with NullIntolerant {
+case class SubtractTimestamps(
+    left: Expression,
+    right: Expression,
+    legacyInterval: Boolean,
+    timeZoneId: Option[String] = None)
+  extends BinaryExpression
+  with TimeZoneAwareExpression
+  with ExpectsInputTypes
+  with NullIntolerant {
+
+  def this(endTimestamp: Expression, startTimestamp: Expression) =
+    this(endTimestamp, startTimestamp, SQLConf.get.legacyIntervalEnabled)
 
-  override def left: Expression = endTimestamp
-  override def right: Expression = startTimestamp
   override def inputTypes: Seq[AbstractDataType] = Seq(TimestampType, TimestampType)
-  override def dataType: DataType = CalendarIntervalType
+  override def dataType: DataType =
+    if (legacyInterval) CalendarIntervalType else DayTimeIntervalType
 
-  override def nullSafeEval(end: Any, start: Any): Any = {
-    new CalendarInterval(0, 0, end.asInstanceOf[Long] - start.asInstanceOf[Long])
+  override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
+    copy(timeZoneId = Option(timeZoneId))
+
+  @transient
+  private lazy val evalFunc: (Long, Long) => Any = legacyInterval match {
+    case false => (leftMicros, rightMicros) =>
+      subtractTimestamps(leftMicros, rightMicros, zoneId)
+    case true => (leftMicros, rightMicros) =>
+      new CalendarInterval(0, 0, leftMicros - rightMicros)
   }
 
-  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
-    defineCodeGen(ctx, ev, (end, start) =>
-      s"new org.apache.spark.unsafe.types.CalendarInterval(0, 0, $end - $start)")
+  override def nullSafeEval(leftMicros: Any, rightMicros: Any): Any = {
+    evalFunc(leftMicros.asInstanceOf[Long], rightMicros.asInstanceOf[Long])
+  }
+
+  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = legacyInterval match {
+    case false =>
+      val zid = ctx.addReferenceObj("zoneId", zoneId, classOf[ZoneId].getName)
+      val dtu = DateTimeUtils.getClass.getName.stripSuffix("$")
+      defineCodeGen(ctx, ev, (l, r) => s"""$dtu.subtractTimestamps($l, $r, $zid)""")
+    case true =>
+      defineCodeGen(ctx, ev, (end, start) =>
+        s"new org.apache.spark.unsafe.types.CalendarInterval(0, 0, $end - $start)")
+  }
+}
+
+object SubtractTimestamps {
+  def apply(left: Expression, right: Expression): SubtractTimestamps = {
+    new SubtractTimestamps(left, right)
   }
 }
```
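
To make the two code paths above concrete, here is a minimal catalyst-level sketch, not part of the patch: it builds the expression directly with an explicit `legacyInterval` flag instead of going through the analyzer, and the literals and time zone are illustrative:
```scala
import java.time.Instant
import org.apache.spark.sql.catalyst.expressions.{Literal, SubtractTimestamps}

val end = Literal(Instant.parse("2021-03-31T23:48:00Z"))
val start = Literal(Instant.parse("2021-01-01T00:00:00Z"))

// Non-legacy mode: the expression reports DayTimeIntervalType.
SubtractTimestamps(end, start, legacyInterval = false, timeZoneId = Some("UTC")).dataType
// Legacy mode: CalendarIntervalType, matching Spark 3.1 and earlier.
SubtractTimestamps(end, start, legacyInterval = true, timeZoneId = Some("UTC")).dataType
```
In user queries the companion `apply` shown in the diff fills in `legacyInterval` from `SQLConf.get.legacyIntervalEnabled`, so the result type is driven purely by the SQL config.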

```diff
@@ -970,4 +970,20 @@
     val days = period.getDays
     new CalendarInterval(months, days, 0)
   }
+
+  /**
+   * Subtracts two timestamps expressed as microseconds since 1970-01-01 00:00:00Z, and returns
+   * the difference in microseconds between local timestamps at the given time zone.
+   *
+   * @param endMicros The end timestamp as microseconds since the epoch, exclusive
+   * @param startMicros The start timestamp as microseconds since the epoch, inclusive
+   * @param zoneId The time zone ID in which the subtraction is performed
+   * @return The difference in microseconds between local timestamps corresponding to the input
+   *         instants `endMicros` and `startMicros`.
+   */
+  def subtractTimestamps(endMicros: Long, startMicros: Long, zoneId: ZoneId): Long = {
+    val localEndTs = getLocalDateTime(endMicros, zoneId)
+    val localStartTs = getLocalDateTime(startMicros, zoneId)
+    ChronoUnit.MICROS.between(localStartTs, localEndTs)
+  }
 }
```
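
The helper subtracts *local* timestamps, so a daylight-saving jump does not show up in the result. A self-contained sketch of the same semantics using only `java.time` (no Spark classes; `subtractLocalMicros`, the zone, and the instants are illustrative, and the America/Los_Angeles case mirrors the DateTimeUtilsSuite test later in this diff):
```scala
import java.time.{Instant, ZoneId}
import java.time.temporal.ChronoUnit

// Difference between the local wall-clock timestamps of two instants, in microseconds.
def subtractLocalMicros(end: Instant, start: Instant, zone: ZoneId): Long = {
  val localEnd = end.atZone(zone).toLocalDateTime
  val localStart = start.atZone(zone).toLocalDateTime
  ChronoUnit.MICROS.between(localStart, localEnd)
}

val la = ZoneId.of("America/Los_Angeles")
// 2021-03-14 01:00 -> 03:00 local time spans the "spring forward" gap: only one
// physical hour elapses, but the local wall-clock difference is two hours.
val start = Instant.parse("2021-03-14T09:00:00Z") // 01:00 PST
val end = Instant.parse("2021-03-14T10:00:00Z")   // 03:00 PDT
assert(subtractLocalMicros(end, start, la) == 2L * 60 * 60 * 1000 * 1000)
```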

```diff
@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.catalyst.util.{DateTimeUtils, IntervalUtils, TimestampFormatter}
 import org.apache.spark.sql.catalyst.util.DateTimeConstants._
 import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._
-import org.apache.spark.sql.catalyst.util.DateTimeUtils.TimeZoneUTC
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getZoneId, TimeZoneUTC}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
@@ -1174,22 +1174,66 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
     }
   }
 
-  test("timestamps difference") {
+  test("SPARK-34903: timestamps difference") {
     val end = Instant.parse("2019-10-04T11:04:01.123456Z")
-    checkEvaluation(SubtractTimestamps(Literal(end), Literal(end)),
-      new CalendarInterval(0, 0, 0))
-    checkEvaluation(SubtractTimestamps(Literal(end), Literal(Instant.EPOCH)),
-      IntervalUtils.stringToInterval(UTF8String.fromString("interval " +
-        "436163 hours 4 minutes 1 seconds 123 milliseconds 456 microseconds")))
-    checkEvaluation(SubtractTimestamps(Literal(Instant.EPOCH), Literal(end)),
-      IntervalUtils.stringToInterval(UTF8String.fromString("interval " +
-        "-436163 hours -4 minutes -1 seconds -123 milliseconds -456 microseconds")))
-    checkEvaluation(
-      SubtractTimestamps(
-        Literal(Instant.parse("9999-12-31T23:59:59.999999Z")),
-        Literal(Instant.parse("0001-01-01T00:00:00Z"))),
-      IntervalUtils.stringToInterval(UTF8String.fromString("interval " +
-        "87649415 hours 59 minutes 59 seconds 999 milliseconds 999 microseconds")))
+    outstandingTimezonesIds.foreach { tz =>
+      def sub(left: Instant, right: Instant): Expression = {
+        SubtractTimestamps(
+          Literal(left),
+          Literal(right),
+          legacyInterval = true,
+          timeZoneId = Some(tz))
+      }
+      checkEvaluation(sub(end, end), new CalendarInterval(0, 0, 0))
+      checkEvaluation(sub(end, Instant.EPOCH),
+        IntervalUtils.stringToInterval(UTF8String.fromString("interval " +
+          "436163 hours 4 minutes 1 seconds 123 milliseconds 456 microseconds")))
+      checkEvaluation(sub(Instant.EPOCH, end),
+        IntervalUtils.stringToInterval(UTF8String.fromString("interval " +
+          "-436163 hours -4 minutes -1 seconds -123 milliseconds -456 microseconds")))
+      checkEvaluation(
+        sub(
+          Instant.parse("9999-12-31T23:59:59.999999Z"),
+          Instant.parse("0001-01-01T00:00:00Z")),
+        IntervalUtils.stringToInterval(UTF8String.fromString("interval " +
+          "87649415 hours 59 minutes 59 seconds 999 milliseconds 999 microseconds")))
+    }
+    outstandingTimezonesIds.foreach { tz =>
+      def check(left: Instant, right: Instant): Unit = {
+        checkEvaluation(
+          SubtractTimestamps(
+            Literal(left),
+            Literal(right),
+            legacyInterval = false,
+            timeZoneId = Some(tz)),
+          Duration.between(
+            right.atZone(getZoneId(tz)).toLocalDateTime,
+            left.atZone(getZoneId(tz)).toLocalDateTime))
+      }
+      check(end, end)
+      check(end, Instant.EPOCH)
+      check(Instant.EPOCH, end)
+      check(Instant.parse("9999-12-31T23:59:59.999999Z"), Instant.parse("0001-01-01T00:00:00Z"))
+      val errMsg = intercept[ArithmeticException] {
+        checkEvaluation(
+          SubtractTimestamps(
+            Literal(Instant.MIN),
+            Literal(Instant.MAX),
+            legacyInterval = false,
+            timeZoneId = Some(tz)),
+          Duration.ZERO)
+      }.getMessage
+      assert(errMsg.contains("overflow"))
+      Seq(false, true).foreach { legacy =>
+        checkConsistencyBetweenInterpretedAndCodegen(
+          (end: Expression, start: Expression) => SubtractTimestamps(end, start, legacy, Some(tz)),
+          TimestampType, TimestampType)
+      }
+    }
   }
 
   test("SPARK-34896: subtract dates") {
```

```diff
@@ -746,4 +746,45 @@ class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper {
       // 2019-11-3 is the start of Pacific Standard Time
       date(2019, 11, 3, 12, 0, 0, 123000, LA))
   }
+
+  test("SPARK-34903: subtract timestamps") {
+    DateTimeTestUtils.outstandingZoneIds.foreach { zid =>
+      Seq(
+        // 1000-02-29 exists in Julian calendar because 1000 is a leap year
+        (LocalDateTime.of(1000, 2, 28, 1, 2, 3, 456789000),
+          LocalDateTime.of(1000, 3, 1, 1, 2, 3, 456789000)) -> TimeUnit.DAYS.toMicros(1),
+        // The range 1582-10-04 .. 1582-10-15 doesn't exist in Julian calendar
+        (LocalDateTime.of(1582, 10, 4, 23, 59, 59, 999999000),
+          LocalDateTime.of(1582, 10, 15, 23, 59, 59, 999999000)) -> TimeUnit.DAYS.toMicros(11),
+        // America/Los_Angeles -08:00 zone offset
+        (LocalDateTime.of(1883, 11, 20, 0, 0, 0, 123456000),
+          // America/Los_Angeles -08:00 zone offset
+          LocalDateTime.of(1883, 11, 10, 0, 0, 0)) -> (TimeUnit.DAYS.toMicros(-10) - 123456),
+        // No difference between Proleptic Gregorian and Julian calendars after 1900-01-01
+        (LocalDateTime.of(1900, 1, 1, 0, 0, 0, 1000),
+          LocalDateTime.of(1899, 12, 31, 23, 59, 59, 999999000)) -> -2,
+        // The 'Asia/Hong_Kong' time zone switched from 'Japan Standard Time' (JST = UTC+9)
+        // to 'Hong Kong Time' (HKT = UTC+8). After Sunday, 18 November, 1945 01:59:59 AM,
+        // clocks were moved backward to become Sunday, 18 November, 1945 01:00:00 AM.
+        // In this way, the overlap happened w/o Daylight Saving Time.
+        (LocalDateTime.of(1945, 11, 18, 0, 30, 30),
+          LocalDateTime.of(1945, 11, 18, 1, 30, 30)) -> TimeUnit.HOURS.toMicros(1),
+        (LocalDateTime.of(1945, 11, 18, 2, 0, 0),
+          LocalDateTime.of(1945, 11, 18, 1, 0, 0)) -> TimeUnit.HOURS.toMicros(-1),
+        // The epoch has zero offset in microseconds
+        (LocalDateTime.of(1970, 1, 1, 0, 0, 0), LocalDateTime.of(1970, 1, 1, 0, 0, 0)) -> 0,
+        // 2020 is a leap year
+        (LocalDateTime.of(2020, 2, 29, 0, 0, 0),
+          LocalDateTime.of(2021, 3, 1, 0, 0, 0)) -> TimeUnit.DAYS.toMicros(366),
+        // Daylight saving in America/Los_Angeles: from winter to summer time
+        (LocalDateTime.of(2021, 3, 14, 1, 0, 0), LocalDateTime.of(2021, 3, 14, 3, 0, 0)) ->
+          TimeUnit.HOURS.toMicros(2)
+      ).foreach { case ((start, end), expected) =>
+        val startMicros = localDateTimeToMicros(start, zid)
+        val endMicros = localDateTimeToMicros(end, zid)
+        val result = subtractTimestamps(endMicros, startMicros, zid)
+        assert(result === expected)
+      }
+    }
+  }
 }
```

```diff
@@ -30,6 +30,7 @@ import org.apache.spark.SparkException
 import org.apache.spark.sql.UpdateFieldsBenchmark._
 import org.apache.spark.sql.catalyst.expressions.{InSet, Literal, NamedExpression}
 import org.apache.spark.sql.catalyst.util.DateTimeTestUtils.{outstandingTimezonesIds, outstandingZoneIds}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.ProjectExec
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
@@ -2528,7 +2529,7 @@ class ColumnExpressionSuite extends QueryTest with SharedSparkSession {
     }
   }
 
-  test("SPARK-34761: add/subtract a day-time interval to/from a timestamp") {
+  test("SPARK-34761, SPARK-34903: add/subtract a day-time interval to/from a timestamp") {
     withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> "true") {
       outstandingZoneIds.foreach { zid =>
         withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> zid.getId) {
@@ -2553,8 +2554,9 @@ class ColumnExpressionSuite extends QueryTest with SharedSparkSession {
             val result = expected.atZone(zid).toInstant
             val df = Seq((ts, duration, result)).toDF("ts", "interval", "result")
             checkAnswer(
-              df.select($"ts" + $"interval", $"interval" + $"ts", $"result" - $"interval"),
-              Row(result, result, ts))
+              df.select($"ts" + $"interval", $"interval" + $"ts", $"result" - $"interval",
+                $"result" - $"ts"),
+              Row(result, result, ts, duration))
           }
         }
       }
@@ -2748,4 +2750,29 @@ class ColumnExpressionSuite extends QueryTest with SharedSparkSession {
       assert(e.getMessage.contains("long overflow"))
     }
   }
+
+  test("SPARK-34903: Return day-time interval from timestamps subtraction") {
+    outstandingTimezonesIds.foreach { tz =>
+      withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> tz) {
+        checkAnswer(
+          sql("select timestamp '2021-03-31 19:11:10' - timestamp '2021-03-01 19:11:10'"),
+          Row(Duration.ofDays(30)))
+        checkAnswer(
+          Seq((Instant.parse("2021-03-31T00:01:02Z"), Instant.parse("2021-04-01T00:00:00Z")))
+            .toDF("start", "end").select($"end" - $"start" < Duration.ofDays(1)),
+          Row(true))
+        checkAnswer(
+          Seq((Instant.parse("2021-03-31T00:01:02.777Z"), Duration.ofMillis(333)))
+            .toDF("ts", "i")
+            .select(($"ts" + $"i") - $"ts"),
+          Row(Duration.ofMillis(333)))
+        checkAnswer(
+          Seq((LocalDateTime.of(2021, 3, 31, 10, 0, 0)
+            .atZone(DateTimeUtils.getZoneId(tz)).toInstant, LocalDate.of(2020, 3, 31)))
+            .toDF("ts", "d")
+            .select($"ts" - $"d"),
+          Row(Duration.ofDays(365).plusHours(10)))
+      }
+    }
+  }
 }
```