[SPARK-31984][SQL] Make micros rebasing functions via local timestamps pure
### What changes were proposed in this pull request? 1. Set the given time zone as the first parameter of `RebaseDateTime`.`rebaseJulianToGregorianMicros()` and `rebaseGregorianToJulianMicros()` to Java 7 `GregorianCalendar`. ```scala val cal = new Calendar.Builder() // `gregory` is a hybrid calendar that supports both the Julian and Gregorian calendar systems .setCalendarType("gregory") ... .setTimeZone(tz) .build() ``` This makes the instance of the calendar independent from the default JVM time zone. 2. Change type of the first parameter from `ZoneId` to `TimeZone`. This allows to avoid unnecessary conversion from `TimeZone` to `ZoneId`, for example in ```scala def rebaseJulianToGregorianMicros(micros: Long): Long = { ... if (rebaseRecord == null || micros < rebaseRecord.switches(0)) { rebaseJulianToGregorianMicros(timeZone.toZoneId, micros) ``` and back to `TimeZone` inside of `rebaseJulianToGregorianMicros(zoneId: ZoneId, ...)` 3. Modify tests in `RebaseDateTimeSuite`, and set the default JVM time zone only for functions that depend on it. ### Why are the changes needed? 1. Ignoring passed parameter and using a global variable is bad practice. 2. Dependency from the global state doesn't allow to run the functions in parallel otherwise there is non-zero probability that the functions may return wrong result if the default JVM is changed during their execution. 3. This open opportunity for parallelisation of JSON files generation `gregorian-julian-rebase-micros.json` and `julian-gregorian-rebase-micros.json`. Currently, the tests `generate 'gregorian-julian-rebase-micros.json'` and `generate 'julian-gregorian-rebase-micros.json'` generate the JSON files by iterating over all time zones sequentially w/ step of 1 week. Due to the large step, we can miss some spikes in diffs between 2 calendars (Java 8 Gregorian and Java 7 hybrid calendars) as the PR https://github.com/apache/spark/pull/28787 fixed and https://github.com/apache/spark/pull/28816 should fix. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? By running existing tests from `RebaseDateTimeSuite`. Closes #28824 from MaxGekk/pure-micros-rebasing. Authored-by: Max Gekk <max.gekk@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This commit is contained in:
parent
d24d27f1bc
commit
6e9ff72195
|
@ -320,13 +320,14 @@ object RebaseDateTime {
|
|||
* Julian calendar: 1582-01-01 00:00:00.123456 -> -12243196799876544
|
||||
* The code below converts -12244061221876544 to -12243196799876544.
|
||||
*
|
||||
* @param zoneId The time zone ID at which the rebasing should be performed.
|
||||
* @param tz The time zone at which the rebasing should be performed.
|
||||
* @param micros The number of microseconds since the epoch '1970-01-01T00:00:00Z'
|
||||
* in Proleptic Gregorian calendar. It can be negative.
|
||||
* @return The rebased microseconds since the epoch in Julian calendar.
|
||||
*/
|
||||
private[sql] def rebaseGregorianToJulianMicros(zoneId: ZoneId, micros: Long): Long = {
|
||||
private[sql] def rebaseGregorianToJulianMicros(tz: TimeZone, micros: Long): Long = {
|
||||
val instant = microsToInstant(micros)
|
||||
val zoneId = tz.toZoneId
|
||||
val zonedDateTime = instant.atZone(zoneId)
|
||||
var ldt = zonedDateTime.toLocalDateTime
|
||||
if (ldt.isAfter(julianEndTs) && ldt.isBefore(gregorianStartTs)) {
|
||||
|
@ -337,6 +338,7 @@ object RebaseDateTime {
|
|||
.setCalendarType("gregory")
|
||||
.setDate(ldt.getYear, ldt.getMonthValue - 1, ldt.getDayOfMonth)
|
||||
.setTimeOfDay(ldt.getHour, ldt.getMinute, ldt.getSecond)
|
||||
.setTimeZone(tz)
|
||||
.build()
|
||||
// A local timestamp can have 2 instants in the cases of switching from:
|
||||
// 1. Summer to winter time.
|
||||
|
@ -379,7 +381,7 @@ object RebaseDateTime {
|
|||
val tzId = timeZone.getID
|
||||
val rebaseRecord = gregJulianRebaseMap.getOrNull(tzId)
|
||||
if (rebaseRecord == null || micros < rebaseRecord.switches(0)) {
|
||||
rebaseGregorianToJulianMicros(timeZone.toZoneId, micros)
|
||||
rebaseGregorianToJulianMicros(timeZone, micros)
|
||||
} else {
|
||||
rebaseMicros(rebaseRecord, micros)
|
||||
}
|
||||
|
@ -401,17 +403,17 @@ object RebaseDateTime {
|
|||
* Proleptic Gregorian calendar: 1582-01-01 00:00:00.123456 -> -12244061221876544
|
||||
* The code below converts -12243196799876544 to -12244061221876544.
|
||||
*
|
||||
* @param zoneId The time zone ID at which the rebasing should be performed.
|
||||
* @param tz The time zone at which the rebasing should be performed.
|
||||
* @param micros The number of microseconds since the epoch '1970-01-01T00:00:00Z'
|
||||
* in the Julian calendar. It can be negative.
|
||||
* @return The rebased microseconds since the epoch in Proleptic Gregorian calendar.
|
||||
*/
|
||||
private[sql] def rebaseJulianToGregorianMicros(zoneId: ZoneId, micros: Long): Long = {
|
||||
private[sql] def rebaseJulianToGregorianMicros(tz: TimeZone, micros: Long): Long = {
|
||||
val cal = new Calendar.Builder()
|
||||
// `gregory` is a hybrid calendar that supports both
|
||||
// the Julian and Gregorian calendar systems
|
||||
// `gregory` is a hybrid calendar that supports both the Julian and Gregorian calendar systems
|
||||
.setCalendarType("gregory")
|
||||
.setInstant(microsToMillis(micros))
|
||||
.setTimeZone(tz)
|
||||
.build()
|
||||
val localDateTime = LocalDateTime.of(
|
||||
cal.get(YEAR),
|
||||
|
@ -427,6 +429,7 @@ object RebaseDateTime {
|
|||
(Math.floorMod(micros, MICROS_PER_SECOND) * NANOS_PER_MICROS).toInt)
|
||||
.`with`(ChronoField.ERA, cal.get(ERA))
|
||||
.plusDays(cal.get(DAY_OF_MONTH) - 1)
|
||||
val zoneId = tz.toZoneId
|
||||
val zonedDateTime = localDateTime.atZone(zoneId)
|
||||
// In the case of local timestamp overlapping, we need to choose the correct time instant
|
||||
// which is related to the original local timestamp. We look ahead of 1 day, and if the next
|
||||
|
@ -479,7 +482,7 @@ object RebaseDateTime {
|
|||
val tzId = timeZone.getID
|
||||
val rebaseRecord = julianGregRebaseMap.getOrNull(tzId)
|
||||
if (rebaseRecord == null || micros < rebaseRecord.switches(0)) {
|
||||
rebaseJulianToGregorianMicros(timeZone.toZoneId, micros)
|
||||
rebaseJulianToGregorianMicros(timeZone, micros)
|
||||
} else {
|
||||
rebaseMicros(rebaseRecord, micros)
|
||||
}
|
||||
|
|
|
@ -201,21 +201,21 @@ class RebaseDateTimeSuite extends SparkFunSuite with Matchers with SQLHelper {
|
|||
test("optimization of micros rebasing - Gregorian to Julian") {
|
||||
outstandingZoneIds.foreach { zid =>
|
||||
withClue(s"zone id = $zid") {
|
||||
withDefaultTimeZone(zid) {
|
||||
val start = instantToMicros(LocalDateTime.of(1, 1, 1, 0, 0, 0)
|
||||
.atZone(zid)
|
||||
.toInstant)
|
||||
val end = instantToMicros(LocalDateTime.of(2100, 1, 1, 0, 0, 0)
|
||||
.atZone(zid)
|
||||
.toInstant)
|
||||
var micros = start
|
||||
do {
|
||||
val rebased = rebaseGregorianToJulianMicros(zid, micros)
|
||||
val rebasedAndOptimized = rebaseGregorianToJulianMicros(micros)
|
||||
assert(rebasedAndOptimized === rebased)
|
||||
micros += (MICROS_PER_DAY * 30 * (0.5 + Math.random())).toLong
|
||||
} while (micros <= end)
|
||||
}
|
||||
val start = instantToMicros(LocalDateTime.of(1, 1, 1, 0, 0, 0)
|
||||
.atZone(zid)
|
||||
.toInstant)
|
||||
val end = instantToMicros(LocalDateTime.of(2100, 1, 1, 0, 0, 0)
|
||||
.atZone(zid)
|
||||
.toInstant)
|
||||
var micros = start
|
||||
do {
|
||||
val rebased = rebaseGregorianToJulianMicros(TimeZone.getTimeZone(zid), micros)
|
||||
val rebasedAndOptimized = withDefaultTimeZone(zid) {
|
||||
rebaseGregorianToJulianMicros(micros)
|
||||
}
|
||||
assert(rebasedAndOptimized === rebased)
|
||||
micros += (MICROS_PER_DAY * 30 * (0.5 + Math.random())).toLong
|
||||
} while (micros <= end)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -223,26 +223,26 @@ class RebaseDateTimeSuite extends SparkFunSuite with Matchers with SQLHelper {
|
|||
test("optimization of micros rebasing - Julian to Gregorian") {
|
||||
outstandingZoneIds.foreach { zid =>
|
||||
withClue(s"zone id = $zid") {
|
||||
withDefaultTimeZone(zid) {
|
||||
val start = rebaseGregorianToJulianMicros(
|
||||
instantToMicros(LocalDateTime.of(1, 1, 1, 0, 0, 0).atZone(zid).toInstant))
|
||||
val end = rebaseGregorianToJulianMicros(
|
||||
instantToMicros(LocalDateTime.of(2100, 1, 1, 0, 0, 0).atZone(zid).toInstant))
|
||||
var micros = start
|
||||
do {
|
||||
val rebased = rebaseJulianToGregorianMicros(zid, micros)
|
||||
val rebasedAndOptimized = rebaseJulianToGregorianMicros(micros)
|
||||
assert(rebasedAndOptimized === rebased)
|
||||
micros += (MICROS_PER_DAY * 30 * (0.5 + Math.random())).toLong
|
||||
} while (micros <= end)
|
||||
}
|
||||
val start = rebaseGregorianToJulianMicros(
|
||||
instantToMicros(LocalDateTime.of(1, 1, 1, 0, 0, 0).atZone(zid).toInstant))
|
||||
val end = rebaseGregorianToJulianMicros(
|
||||
instantToMicros(LocalDateTime.of(2100, 1, 1, 0, 0, 0).atZone(zid).toInstant))
|
||||
var micros = start
|
||||
do {
|
||||
val rebased = rebaseJulianToGregorianMicros(TimeZone.getTimeZone(zid), micros)
|
||||
val rebasedAndOptimized = withDefaultTimeZone(zid) {
|
||||
rebaseJulianToGregorianMicros(micros)
|
||||
}
|
||||
assert(rebasedAndOptimized === rebased)
|
||||
micros += (MICROS_PER_DAY * 30 * (0.5 + Math.random())).toLong
|
||||
} while (micros <= end)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private def generateRebaseJson(
|
||||
adjustFunc: Long => Long,
|
||||
rebaseFunc: (ZoneId, Long) => Long,
|
||||
adjustFunc: (TimeZone, Long) => Long,
|
||||
rebaseFunc: (TimeZone, Long) => Long,
|
||||
dir: String,
|
||||
fileName: String): Unit = {
|
||||
import java.nio.file.{Files, Paths}
|
||||
|
@ -260,14 +260,15 @@ class RebaseDateTimeSuite extends SparkFunSuite with Matchers with SQLHelper {
|
|||
.sortBy(_.getId)
|
||||
.foreach { zid =>
|
||||
withDefaultTimeZone(zid) {
|
||||
val start = adjustFunc(instantToMicros(LocalDateTime.of(1, 1, 1, 0, 0, 0)
|
||||
.atZone(zid)
|
||||
.toInstant))
|
||||
val tz = TimeZone.getTimeZone(zid)
|
||||
val start = adjustFunc(
|
||||
tz,
|
||||
instantToMicros(LocalDateTime.of(1, 1, 1, 0, 0, 0).atZone(zid).toInstant))
|
||||
// sun.util.calendar.ZoneInfo resolves DST after 2037 year incorrectly.
|
||||
// See https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8073446
|
||||
val end = adjustFunc(instantToMicros(LocalDateTime.of(2037, 1, 1, 0, 0, 0)
|
||||
.atZone(zid)
|
||||
.toInstant))
|
||||
val end = adjustFunc(
|
||||
tz,
|
||||
instantToMicros(LocalDateTime.of(2037, 1, 1, 0, 0, 0).atZone(zid).toInstant))
|
||||
|
||||
var micros = start
|
||||
var diff = Long.MaxValue
|
||||
|
@ -276,7 +277,7 @@ class RebaseDateTimeSuite extends SparkFunSuite with Matchers with SQLHelper {
|
|||
val switches = new ArrayBuffer[Long]()
|
||||
val diffs = new ArrayBuffer[Long]()
|
||||
while (micros < end) {
|
||||
val rebased = rebaseFunc(zid, micros)
|
||||
val rebased = rebaseFunc(tz, micros)
|
||||
val curDiff = rebased - micros
|
||||
if (curDiff != diff) {
|
||||
if (step > MICROS_PER_SECOND) {
|
||||
|
@ -308,7 +309,7 @@ class RebaseDateTimeSuite extends SparkFunSuite with Matchers with SQLHelper {
|
|||
|
||||
ignore("generate 'gregorian-julian-rebase-micros.json'") {
|
||||
generateRebaseJson(
|
||||
adjustFunc = identity[Long],
|
||||
adjustFunc = (_: TimeZone, micros: Long) => micros,
|
||||
rebaseFunc = rebaseGregorianToJulianMicros,
|
||||
dir = "/Users/maximgekk/tmp",
|
||||
fileName = "gregorian-julian-rebase-micros.json")
|
||||
|
@ -383,26 +384,27 @@ class RebaseDateTimeSuite extends SparkFunSuite with Matchers with SQLHelper {
|
|||
|
||||
test("rebase not-existed timestamps in the hybrid calendar") {
|
||||
outstandingZoneIds.foreach { zid =>
|
||||
withDefaultTimeZone(zid) {
|
||||
Seq(
|
||||
"1582-10-04T23:59:59.999999" -> "1582-10-04 23:59:59.999999",
|
||||
"1582-10-05T00:00:00.000000" -> "1582-10-15 00:00:00.000000",
|
||||
"1582-10-06T01:02:03.000001" -> "1582-10-15 01:02:03.000001",
|
||||
"1582-10-07T00:00:00.000000" -> "1582-10-15 00:00:00.000000",
|
||||
"1582-10-08T23:59:59.999999" -> "1582-10-15 23:59:59.999999",
|
||||
"1582-10-09T23:59:59.001001" -> "1582-10-15 23:59:59.001001",
|
||||
"1582-10-10T00:11:22.334455" -> "1582-10-15 00:11:22.334455",
|
||||
"1582-10-11T11:12:13.111111" -> "1582-10-15 11:12:13.111111",
|
||||
"1582-10-12T10:11:12.131415" -> "1582-10-15 10:11:12.131415",
|
||||
"1582-10-13T00:00:00.123321" -> "1582-10-15 00:00:00.123321",
|
||||
"1582-10-14T23:59:59.999999" -> "1582-10-15 23:59:59.999999",
|
||||
"1582-10-15T00:00:00.000000" -> "1582-10-15 00:00:00.000000"
|
||||
).foreach { case (gregTs, hybridTs) =>
|
||||
withClue(s"tz = ${zid.getId} greg ts = $gregTs hybrid ts = $hybridTs") {
|
||||
val hybridMicros = parseToJulianMicros(hybridTs)
|
||||
val gregorianMicros = parseToGregMicros(gregTs, zid)
|
||||
Seq(
|
||||
"1582-10-04T23:59:59.999999" -> "1582-10-04 23:59:59.999999",
|
||||
"1582-10-05T00:00:00.000000" -> "1582-10-15 00:00:00.000000",
|
||||
"1582-10-06T01:02:03.000001" -> "1582-10-15 01:02:03.000001",
|
||||
"1582-10-07T00:00:00.000000" -> "1582-10-15 00:00:00.000000",
|
||||
"1582-10-08T23:59:59.999999" -> "1582-10-15 23:59:59.999999",
|
||||
"1582-10-09T23:59:59.001001" -> "1582-10-15 23:59:59.001001",
|
||||
"1582-10-10T00:11:22.334455" -> "1582-10-15 00:11:22.334455",
|
||||
"1582-10-11T11:12:13.111111" -> "1582-10-15 11:12:13.111111",
|
||||
"1582-10-12T10:11:12.131415" -> "1582-10-15 10:11:12.131415",
|
||||
"1582-10-13T00:00:00.123321" -> "1582-10-15 00:00:00.123321",
|
||||
"1582-10-14T23:59:59.999999" -> "1582-10-15 23:59:59.999999",
|
||||
"1582-10-15T00:00:00.000000" -> "1582-10-15 00:00:00.000000"
|
||||
).foreach { case (gregTs, hybridTs) =>
|
||||
withClue(s"tz = ${zid.getId} greg ts = $gregTs hybrid ts = $hybridTs") {
|
||||
val hybridMicros = withDefaultTimeZone(zid) { parseToJulianMicros(hybridTs) }
|
||||
val gregorianMicros = parseToGregMicros(gregTs, zid)
|
||||
|
||||
assert(rebaseGregorianToJulianMicros(zid, gregorianMicros) === hybridMicros)
|
||||
val tz = TimeZone.getTimeZone(zid)
|
||||
assert(rebaseGregorianToJulianMicros(tz, gregorianMicros) === hybridMicros)
|
||||
withDefaultTimeZone(zid) {
|
||||
assert(rebaseGregorianToJulianMicros(gregorianMicros) === hybridMicros)
|
||||
}
|
||||
}
|
||||
|
@ -416,38 +418,39 @@ class RebaseDateTimeSuite extends SparkFunSuite with Matchers with SQLHelper {
|
|||
// clocks were moved backward to become Sunday, 18 November, 1945 01:00:00 AM.
|
||||
// In this way, the overlap happened w/o Daylight Saving Time.
|
||||
val hkZid = getZoneId("Asia/Hong_Kong")
|
||||
var expected = "1945-11-18 01:30:00.0"
|
||||
var ldt = LocalDateTime.of(1945, 11, 18, 1, 30, 0)
|
||||
var earlierMicros = instantToMicros(ldt.atZone(hkZid).withEarlierOffsetAtOverlap().toInstant)
|
||||
var laterMicros = instantToMicros(ldt.atZone(hkZid).withLaterOffsetAtOverlap().toInstant)
|
||||
var overlapInterval = MICROS_PER_HOUR
|
||||
if (earlierMicros + overlapInterval != laterMicros) {
|
||||
// Old JDK might have an outdated time zone database.
|
||||
// See https://bugs.openjdk.java.net/browse/JDK-8228469: "Hong Kong ... Its 1945 transition
|
||||
// from JST to HKT was on 11-18 at 02:00, not 09-15 at 00:00"
|
||||
expected = "1945-09-14 23:30:00.0"
|
||||
ldt = LocalDateTime.of(1945, 9, 14, 23, 30, 0)
|
||||
earlierMicros = instantToMicros(ldt.atZone(hkZid).withEarlierOffsetAtOverlap().toInstant)
|
||||
laterMicros = instantToMicros(ldt.atZone(hkZid).withLaterOffsetAtOverlap().toInstant)
|
||||
// If time zone db doesn't have overlapping at all, set the overlap interval to zero.
|
||||
overlapInterval = laterMicros - earlierMicros
|
||||
}
|
||||
val hkTz = TimeZone.getTimeZone(hkZid)
|
||||
val rebasedEarlierMicros = rebaseGregorianToJulianMicros(hkTz, earlierMicros)
|
||||
val rebasedLaterMicros = rebaseGregorianToJulianMicros(hkTz, laterMicros)
|
||||
assert(rebasedEarlierMicros + overlapInterval === rebasedLaterMicros)
|
||||
withDefaultTimeZone(hkZid) {
|
||||
var expected = "1945-11-18 01:30:00.0"
|
||||
var ldt = LocalDateTime.of(1945, 11, 18, 1, 30, 0)
|
||||
var earlierMicros = instantToMicros(ldt.atZone(hkZid).withEarlierOffsetAtOverlap().toInstant)
|
||||
var laterMicros = instantToMicros(ldt.atZone(hkZid).withLaterOffsetAtOverlap().toInstant)
|
||||
var overlapInterval = MICROS_PER_HOUR
|
||||
if (earlierMicros + overlapInterval != laterMicros) {
|
||||
// Old JDK might have an outdated time zone database.
|
||||
// See https://bugs.openjdk.java.net/browse/JDK-8228469: "Hong Kong ... Its 1945 transition
|
||||
// from JST to HKT was on 11-18 at 02:00, not 09-15 at 00:00"
|
||||
expected = "1945-09-14 23:30:00.0"
|
||||
ldt = LocalDateTime.of(1945, 9, 14, 23, 30, 0)
|
||||
earlierMicros = instantToMicros(ldt.atZone(hkZid).withEarlierOffsetAtOverlap().toInstant)
|
||||
laterMicros = instantToMicros(ldt.atZone(hkZid).withLaterOffsetAtOverlap().toInstant)
|
||||
// If time zone db doesn't have overlapping at all, set the overlap interval to zero.
|
||||
overlapInterval = laterMicros - earlierMicros
|
||||
}
|
||||
val rebasedEarlierMicros = rebaseGregorianToJulianMicros(hkZid, earlierMicros)
|
||||
val rebasedLaterMicros = rebaseGregorianToJulianMicros(hkZid, laterMicros)
|
||||
def toTsStr(micros: Long): String = toJavaTimestamp(micros).toString
|
||||
assert(toTsStr(rebasedEarlierMicros) === expected)
|
||||
assert(toTsStr(rebasedLaterMicros) === expected)
|
||||
assert(rebasedEarlierMicros + overlapInterval === rebasedLaterMicros)
|
||||
// Check optimized rebasing
|
||||
assert(rebaseGregorianToJulianMicros(earlierMicros) === rebasedEarlierMicros)
|
||||
assert(rebaseGregorianToJulianMicros(laterMicros) === rebasedLaterMicros)
|
||||
// Check reverse rebasing
|
||||
assert(rebaseJulianToGregorianMicros(rebasedEarlierMicros) === earlierMicros)
|
||||
assert(rebaseJulianToGregorianMicros(rebasedLaterMicros) === laterMicros)
|
||||
// Check reverse not-optimized rebasing
|
||||
assert(rebaseJulianToGregorianMicros(hkZid, rebasedEarlierMicros) === earlierMicros)
|
||||
assert(rebaseJulianToGregorianMicros(hkZid, rebasedLaterMicros) === laterMicros)
|
||||
}
|
||||
// Check reverse not-optimized rebasing
|
||||
assert(rebaseJulianToGregorianMicros(hkTz, rebasedEarlierMicros) === earlierMicros)
|
||||
assert(rebaseJulianToGregorianMicros(hkTz, rebasedLaterMicros) === laterMicros)
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue