diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RebaseDateTime.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RebaseDateTime.scala index 8b23bed6d2..24fe9fa42c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RebaseDateTime.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RebaseDateTime.scala @@ -320,13 +320,14 @@ object RebaseDateTime { * Julian calendar: 1582-01-01 00:00:00.123456 -> -12243196799876544 * The code below converts -12244061221876544 to -12243196799876544. * - * @param zoneId The time zone ID at which the rebasing should be performed. + * @param tz The time zone at which the rebasing should be performed. * @param micros The number of microseconds since the epoch '1970-01-01T00:00:00Z' * in Proleptic Gregorian calendar. It can be negative. * @return The rebased microseconds since the epoch in Julian calendar. */ - private[sql] def rebaseGregorianToJulianMicros(zoneId: ZoneId, micros: Long): Long = { + private[sql] def rebaseGregorianToJulianMicros(tz: TimeZone, micros: Long): Long = { val instant = microsToInstant(micros) + val zoneId = tz.toZoneId val zonedDateTime = instant.atZone(zoneId) var ldt = zonedDateTime.toLocalDateTime if (ldt.isAfter(julianEndTs) && ldt.isBefore(gregorianStartTs)) { @@ -337,6 +338,7 @@ object RebaseDateTime { .setCalendarType("gregory") .setDate(ldt.getYear, ldt.getMonthValue - 1, ldt.getDayOfMonth) .setTimeOfDay(ldt.getHour, ldt.getMinute, ldt.getSecond) + .setTimeZone(tz) .build() // A local timestamp can have 2 instants in the cases of switching from: // 1. Summer to winter time. @@ -379,7 +381,7 @@ object RebaseDateTime { val tzId = timeZone.getID val rebaseRecord = gregJulianRebaseMap.getOrNull(tzId) if (rebaseRecord == null || micros < rebaseRecord.switches(0)) { - rebaseGregorianToJulianMicros(timeZone.toZoneId, micros) + rebaseGregorianToJulianMicros(timeZone, micros) } else { rebaseMicros(rebaseRecord, micros) } @@ -401,17 +403,17 @@ object RebaseDateTime { * Proleptic Gregorian calendar: 1582-01-01 00:00:00.123456 -> -12244061221876544 * The code below converts -12243196799876544 to -12244061221876544. * - * @param zoneId The time zone ID at which the rebasing should be performed. + * @param tz The time zone at which the rebasing should be performed. * @param micros The number of microseconds since the epoch '1970-01-01T00:00:00Z' * in the Julian calendar. It can be negative. * @return The rebased microseconds since the epoch in Proleptic Gregorian calendar. */ - private[sql] def rebaseJulianToGregorianMicros(zoneId: ZoneId, micros: Long): Long = { + private[sql] def rebaseJulianToGregorianMicros(tz: TimeZone, micros: Long): Long = { val cal = new Calendar.Builder() - // `gregory` is a hybrid calendar that supports both - // the Julian and Gregorian calendar systems + // `gregory` is a hybrid calendar that supports both the Julian and Gregorian calendar systems .setCalendarType("gregory") .setInstant(microsToMillis(micros)) + .setTimeZone(tz) .build() val localDateTime = LocalDateTime.of( cal.get(YEAR), @@ -427,6 +429,7 @@ object RebaseDateTime { (Math.floorMod(micros, MICROS_PER_SECOND) * NANOS_PER_MICROS).toInt) .`with`(ChronoField.ERA, cal.get(ERA)) .plusDays(cal.get(DAY_OF_MONTH) - 1) + val zoneId = tz.toZoneId val zonedDateTime = localDateTime.atZone(zoneId) // In the case of local timestamp overlapping, we need to choose the correct time instant // which is related to the original local timestamp. We look ahead of 1 day, and if the next @@ -479,7 +482,7 @@ object RebaseDateTime { val tzId = timeZone.getID val rebaseRecord = julianGregRebaseMap.getOrNull(tzId) if (rebaseRecord == null || micros < rebaseRecord.switches(0)) { - rebaseJulianToGregorianMicros(timeZone.toZoneId, micros) + rebaseJulianToGregorianMicros(timeZone, micros) } else { rebaseMicros(rebaseRecord, micros) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/RebaseDateTimeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/RebaseDateTimeSuite.scala index b3363169b3..254bf01c89 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/RebaseDateTimeSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/RebaseDateTimeSuite.scala @@ -201,21 +201,21 @@ class RebaseDateTimeSuite extends SparkFunSuite with Matchers with SQLHelper { test("optimization of micros rebasing - Gregorian to Julian") { outstandingZoneIds.foreach { zid => withClue(s"zone id = $zid") { - withDefaultTimeZone(zid) { - val start = instantToMicros(LocalDateTime.of(1, 1, 1, 0, 0, 0) - .atZone(zid) - .toInstant) - val end = instantToMicros(LocalDateTime.of(2100, 1, 1, 0, 0, 0) - .atZone(zid) - .toInstant) - var micros = start - do { - val rebased = rebaseGregorianToJulianMicros(zid, micros) - val rebasedAndOptimized = rebaseGregorianToJulianMicros(micros) - assert(rebasedAndOptimized === rebased) - micros += (MICROS_PER_DAY * 30 * (0.5 + Math.random())).toLong - } while (micros <= end) - } + val start = instantToMicros(LocalDateTime.of(1, 1, 1, 0, 0, 0) + .atZone(zid) + .toInstant) + val end = instantToMicros(LocalDateTime.of(2100, 1, 1, 0, 0, 0) + .atZone(zid) + .toInstant) + var micros = start + do { + val rebased = rebaseGregorianToJulianMicros(TimeZone.getTimeZone(zid), micros) + val rebasedAndOptimized = withDefaultTimeZone(zid) { + rebaseGregorianToJulianMicros(micros) + } + assert(rebasedAndOptimized === rebased) + micros += (MICROS_PER_DAY * 30 * (0.5 + Math.random())).toLong + } while (micros <= end) } } } @@ -223,26 +223,26 @@ class RebaseDateTimeSuite extends SparkFunSuite with Matchers with SQLHelper { test("optimization of micros rebasing - Julian to Gregorian") { outstandingZoneIds.foreach { zid => withClue(s"zone id = $zid") { - withDefaultTimeZone(zid) { - val start = rebaseGregorianToJulianMicros( - instantToMicros(LocalDateTime.of(1, 1, 1, 0, 0, 0).atZone(zid).toInstant)) - val end = rebaseGregorianToJulianMicros( - instantToMicros(LocalDateTime.of(2100, 1, 1, 0, 0, 0).atZone(zid).toInstant)) - var micros = start - do { - val rebased = rebaseJulianToGregorianMicros(zid, micros) - val rebasedAndOptimized = rebaseJulianToGregorianMicros(micros) - assert(rebasedAndOptimized === rebased) - micros += (MICROS_PER_DAY * 30 * (0.5 + Math.random())).toLong - } while (micros <= end) - } + val start = rebaseGregorianToJulianMicros( + instantToMicros(LocalDateTime.of(1, 1, 1, 0, 0, 0).atZone(zid).toInstant)) + val end = rebaseGregorianToJulianMicros( + instantToMicros(LocalDateTime.of(2100, 1, 1, 0, 0, 0).atZone(zid).toInstant)) + var micros = start + do { + val rebased = rebaseJulianToGregorianMicros(TimeZone.getTimeZone(zid), micros) + val rebasedAndOptimized = withDefaultTimeZone(zid) { + rebaseJulianToGregorianMicros(micros) + } + assert(rebasedAndOptimized === rebased) + micros += (MICROS_PER_DAY * 30 * (0.5 + Math.random())).toLong + } while (micros <= end) } } } private def generateRebaseJson( - adjustFunc: Long => Long, - rebaseFunc: (ZoneId, Long) => Long, + adjustFunc: (TimeZone, Long) => Long, + rebaseFunc: (TimeZone, Long) => Long, dir: String, fileName: String): Unit = { import java.nio.file.{Files, Paths} @@ -260,14 +260,15 @@ class RebaseDateTimeSuite extends SparkFunSuite with Matchers with SQLHelper { .sortBy(_.getId) .foreach { zid => withDefaultTimeZone(zid) { - val start = adjustFunc(instantToMicros(LocalDateTime.of(1, 1, 1, 0, 0, 0) - .atZone(zid) - .toInstant)) + val tz = TimeZone.getTimeZone(zid) + val start = adjustFunc( + tz, + instantToMicros(LocalDateTime.of(1, 1, 1, 0, 0, 0).atZone(zid).toInstant)) // sun.util.calendar.ZoneInfo resolves DST after 2037 year incorrectly. // See https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8073446 - val end = adjustFunc(instantToMicros(LocalDateTime.of(2037, 1, 1, 0, 0, 0) - .atZone(zid) - .toInstant)) + val end = adjustFunc( + tz, + instantToMicros(LocalDateTime.of(2037, 1, 1, 0, 0, 0).atZone(zid).toInstant)) var micros = start var diff = Long.MaxValue @@ -276,7 +277,7 @@ class RebaseDateTimeSuite extends SparkFunSuite with Matchers with SQLHelper { val switches = new ArrayBuffer[Long]() val diffs = new ArrayBuffer[Long]() while (micros < end) { - val rebased = rebaseFunc(zid, micros) + val rebased = rebaseFunc(tz, micros) val curDiff = rebased - micros if (curDiff != diff) { if (step > MICROS_PER_SECOND) { @@ -308,7 +309,7 @@ class RebaseDateTimeSuite extends SparkFunSuite with Matchers with SQLHelper { ignore("generate 'gregorian-julian-rebase-micros.json'") { generateRebaseJson( - adjustFunc = identity[Long], + adjustFunc = (_: TimeZone, micros: Long) => micros, rebaseFunc = rebaseGregorianToJulianMicros, dir = "/Users/maximgekk/tmp", fileName = "gregorian-julian-rebase-micros.json") @@ -383,26 +384,27 @@ class RebaseDateTimeSuite extends SparkFunSuite with Matchers with SQLHelper { test("rebase not-existed timestamps in the hybrid calendar") { outstandingZoneIds.foreach { zid => - withDefaultTimeZone(zid) { - Seq( - "1582-10-04T23:59:59.999999" -> "1582-10-04 23:59:59.999999", - "1582-10-05T00:00:00.000000" -> "1582-10-15 00:00:00.000000", - "1582-10-06T01:02:03.000001" -> "1582-10-15 01:02:03.000001", - "1582-10-07T00:00:00.000000" -> "1582-10-15 00:00:00.000000", - "1582-10-08T23:59:59.999999" -> "1582-10-15 23:59:59.999999", - "1582-10-09T23:59:59.001001" -> "1582-10-15 23:59:59.001001", - "1582-10-10T00:11:22.334455" -> "1582-10-15 00:11:22.334455", - "1582-10-11T11:12:13.111111" -> "1582-10-15 11:12:13.111111", - "1582-10-12T10:11:12.131415" -> "1582-10-15 10:11:12.131415", - "1582-10-13T00:00:00.123321" -> "1582-10-15 00:00:00.123321", - "1582-10-14T23:59:59.999999" -> "1582-10-15 23:59:59.999999", - "1582-10-15T00:00:00.000000" -> "1582-10-15 00:00:00.000000" - ).foreach { case (gregTs, hybridTs) => - withClue(s"tz = ${zid.getId} greg ts = $gregTs hybrid ts = $hybridTs") { - val hybridMicros = parseToJulianMicros(hybridTs) - val gregorianMicros = parseToGregMicros(gregTs, zid) + Seq( + "1582-10-04T23:59:59.999999" -> "1582-10-04 23:59:59.999999", + "1582-10-05T00:00:00.000000" -> "1582-10-15 00:00:00.000000", + "1582-10-06T01:02:03.000001" -> "1582-10-15 01:02:03.000001", + "1582-10-07T00:00:00.000000" -> "1582-10-15 00:00:00.000000", + "1582-10-08T23:59:59.999999" -> "1582-10-15 23:59:59.999999", + "1582-10-09T23:59:59.001001" -> "1582-10-15 23:59:59.001001", + "1582-10-10T00:11:22.334455" -> "1582-10-15 00:11:22.334455", + "1582-10-11T11:12:13.111111" -> "1582-10-15 11:12:13.111111", + "1582-10-12T10:11:12.131415" -> "1582-10-15 10:11:12.131415", + "1582-10-13T00:00:00.123321" -> "1582-10-15 00:00:00.123321", + "1582-10-14T23:59:59.999999" -> "1582-10-15 23:59:59.999999", + "1582-10-15T00:00:00.000000" -> "1582-10-15 00:00:00.000000" + ).foreach { case (gregTs, hybridTs) => + withClue(s"tz = ${zid.getId} greg ts = $gregTs hybrid ts = $hybridTs") { + val hybridMicros = withDefaultTimeZone(zid) { parseToJulianMicros(hybridTs) } + val gregorianMicros = parseToGregMicros(gregTs, zid) - assert(rebaseGregorianToJulianMicros(zid, gregorianMicros) === hybridMicros) + val tz = TimeZone.getTimeZone(zid) + assert(rebaseGregorianToJulianMicros(tz, gregorianMicros) === hybridMicros) + withDefaultTimeZone(zid) { assert(rebaseGregorianToJulianMicros(gregorianMicros) === hybridMicros) } } @@ -416,38 +418,39 @@ class RebaseDateTimeSuite extends SparkFunSuite with Matchers with SQLHelper { // clocks were moved backward to become Sunday, 18 November, 1945 01:00:00 AM. // In this way, the overlap happened w/o Daylight Saving Time. val hkZid = getZoneId("Asia/Hong_Kong") + var expected = "1945-11-18 01:30:00.0" + var ldt = LocalDateTime.of(1945, 11, 18, 1, 30, 0) + var earlierMicros = instantToMicros(ldt.atZone(hkZid).withEarlierOffsetAtOverlap().toInstant) + var laterMicros = instantToMicros(ldt.atZone(hkZid).withLaterOffsetAtOverlap().toInstant) + var overlapInterval = MICROS_PER_HOUR + if (earlierMicros + overlapInterval != laterMicros) { + // Old JDK might have an outdated time zone database. + // See https://bugs.openjdk.java.net/browse/JDK-8228469: "Hong Kong ... Its 1945 transition + // from JST to HKT was on 11-18 at 02:00, not 09-15 at 00:00" + expected = "1945-09-14 23:30:00.0" + ldt = LocalDateTime.of(1945, 9, 14, 23, 30, 0) + earlierMicros = instantToMicros(ldt.atZone(hkZid).withEarlierOffsetAtOverlap().toInstant) + laterMicros = instantToMicros(ldt.atZone(hkZid).withLaterOffsetAtOverlap().toInstant) + // If time zone db doesn't have overlapping at all, set the overlap interval to zero. + overlapInterval = laterMicros - earlierMicros + } + val hkTz = TimeZone.getTimeZone(hkZid) + val rebasedEarlierMicros = rebaseGregorianToJulianMicros(hkTz, earlierMicros) + val rebasedLaterMicros = rebaseGregorianToJulianMicros(hkTz, laterMicros) + assert(rebasedEarlierMicros + overlapInterval === rebasedLaterMicros) withDefaultTimeZone(hkZid) { - var expected = "1945-11-18 01:30:00.0" - var ldt = LocalDateTime.of(1945, 11, 18, 1, 30, 0) - var earlierMicros = instantToMicros(ldt.atZone(hkZid).withEarlierOffsetAtOverlap().toInstant) - var laterMicros = instantToMicros(ldt.atZone(hkZid).withLaterOffsetAtOverlap().toInstant) - var overlapInterval = MICROS_PER_HOUR - if (earlierMicros + overlapInterval != laterMicros) { - // Old JDK might have an outdated time zone database. - // See https://bugs.openjdk.java.net/browse/JDK-8228469: "Hong Kong ... Its 1945 transition - // from JST to HKT was on 11-18 at 02:00, not 09-15 at 00:00" - expected = "1945-09-14 23:30:00.0" - ldt = LocalDateTime.of(1945, 9, 14, 23, 30, 0) - earlierMicros = instantToMicros(ldt.atZone(hkZid).withEarlierOffsetAtOverlap().toInstant) - laterMicros = instantToMicros(ldt.atZone(hkZid).withLaterOffsetAtOverlap().toInstant) - // If time zone db doesn't have overlapping at all, set the overlap interval to zero. - overlapInterval = laterMicros - earlierMicros - } - val rebasedEarlierMicros = rebaseGregorianToJulianMicros(hkZid, earlierMicros) - val rebasedLaterMicros = rebaseGregorianToJulianMicros(hkZid, laterMicros) def toTsStr(micros: Long): String = toJavaTimestamp(micros).toString assert(toTsStr(rebasedEarlierMicros) === expected) assert(toTsStr(rebasedLaterMicros) === expected) - assert(rebasedEarlierMicros + overlapInterval === rebasedLaterMicros) // Check optimized rebasing assert(rebaseGregorianToJulianMicros(earlierMicros) === rebasedEarlierMicros) assert(rebaseGregorianToJulianMicros(laterMicros) === rebasedLaterMicros) // Check reverse rebasing assert(rebaseJulianToGregorianMicros(rebasedEarlierMicros) === earlierMicros) assert(rebaseJulianToGregorianMicros(rebasedLaterMicros) === laterMicros) - // Check reverse not-optimized rebasing - assert(rebaseJulianToGregorianMicros(hkZid, rebasedEarlierMicros) === earlierMicros) - assert(rebaseJulianToGregorianMicros(hkZid, rebasedLaterMicros) === laterMicros) } + // Check reverse not-optimized rebasing + assert(rebaseJulianToGregorianMicros(hkTz, rebasedEarlierMicros) === earlierMicros) + assert(rebaseJulianToGregorianMicros(hkTz, rebasedLaterMicros) === laterMicros) } }