[SPARK-31076][SQL] Convert Catalyst's DATE/TIMESTAMP to Java Date/Timestamp via local date-time

### What changes were proposed in this pull request?
In this PR, I propose to change the conversion of java.sql.Timestamp/Date values to/from Catalyst's internal TimestampType/DateType values for dates and timestamps before the Gregorian cutover day `1582-10-15`. The idea is to construct a local date-time from the microseconds/days since the epoch, take each date-time component (`year`, `month`, `day`, `hour`, `minute`, `second` and `second fraction`), and build java.sql.Timestamp/Date from the extracted components.
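
For illustration, here is a minimal sketch of the timestamp side of that conversion. It mirrors the change to `toJavaTimestamp` in the diff below; the helper name and the use of the JVM default time zone (instead of Spark's session time zone) are assumptions made for the example:

```scala
import java.sql.Timestamp
import java.time.{Instant, ZoneId}

// Hypothetical helper sketching the proposed approach: interpret the internal
// microseconds as an instant, take its local date-time components in the
// default time zone, and rebuild the java.sql.Timestamp from those components
// so the wall-clock fields survive the switch between the Proleptic Gregorian
// and the hybrid Julian/Gregorian calendars.
def toJavaTimestampViaLocalDateTime(micros: Long): Timestamp = {
  val instant = Instant.ofEpochSecond(
    Math.floorDiv(micros, 1000000L),
    Math.floorMod(micros, 1000000L) * 1000L)
  val ldt = instant.atZone(ZoneId.systemDefault()).toLocalDateTime
  // Timestamp.valueOf() reassembles the value from the extracted components.
  Timestamp.valueOf(ldt)
}
```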

### Why are the changes needed?
This rebases the underlying time/date offset so that collected java.sql.Timestamp/Date values have the same local date-time components as the original values in the Gregorian calendar.

Here is an example that demonstrates the issue:
```scala
scala> sql("select date '1100-10-10'").collect()
res1: Array[org.apache.spark.sql.Row] = Array([1100-10-03])
```
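
The 7-day shift comes from the gap between the two calendars around the year 1100: Catalyst stores the date as days since the epoch in the Proleptic Gregorian calendar, while java.sql.Date renders the resulting instant in the hybrid Julian/Gregorian calendar. A rough sketch of the root cause (time-zone handling omitted, so this approximates rather than reproduces the exact code path):

```scala
import java.sql.Date
import java.time.LocalDate
import java.util.concurrent.TimeUnit

// Days since the epoch for 1100-10-10 in the Proleptic Gregorian calendar,
// i.e. what Catalyst stores internally for DateType.
val gregorianDays = LocalDate.of(1100, 10, 10).toEpochDay
// The old conversion built java.sql.Date directly from the corresponding
// milliseconds; java.sql.Date then formats that instant using the hybrid
// Julian/Gregorian calendar, which lags about 7 days behind around 1100.
val javaDate = new Date(TimeUnit.DAYS.toMillis(gregorianDays))
println(javaDate) // roughly 1100-10-03; the exact day depends on the default time zone
```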

### Does this PR introduce any user-facing change?
Yes, after the changes:
```scala
scala> sql("select date '1100-10-10'").collect()
res0: Array[org.apache.spark.sql.Row] = Array([1100-10-10])
```

### How was this patch tested?
By running `DateTimeUtilsSuite`, `DateFunctionsSuite` and `DateExpressionsSuite`.

Closes #27807 from MaxGekk/rebase-timestamp-before-1582.

Authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Commit 3d3e366aa8 (parent 2b46662bd0), 2020-03-11 20:53:56 +08:00
5 changed files with 102 additions and 15 deletions


@@ -47,6 +47,15 @@ object DateTimeUtils {
// it's 2440587.5, rounding up to compatible with Hive
final val JULIAN_DAY_OF_EPOCH = 2440588
final val GREGORIAN_CUTOVER_DAY = LocalDate.of(1582, 10, 15).toEpochDay
final val GREGORIAN_CUTOVER_MICROS = instantToMicros(
LocalDateTime.of(1582, 10, 15, 0, 0, 0)
.atOffset(ZoneOffset.UTC)
.toInstant)
final val GREGORIAN_CUTOVER_MILLIS = microsToMillis(GREGORIAN_CUTOVER_MICROS)
final val julianCommonEraStart = Timestamp.valueOf("0001-01-01 00:00:00")
final val TimeZoneGMT = TimeZone.getTimeZone("GMT")
final val TimeZoneUTC = TimeZone.getTimeZone("UTC")
@@ -86,28 +95,50 @@ object DateTimeUtils {
* Returns the number of days since epoch from java.sql.Date.
*/
def fromJavaDate(date: Date): SQLDate = {
microsToDays(millisToMicros(date.getTime))
if (date.getTime < GREGORIAN_CUTOVER_MILLIS) {
val era = if (date.before(julianCommonEraStart)) 0 else 1
val localDate = date.toLocalDate.`with`(ChronoField.ERA, era)
localDateToDays(localDate)
} else {
microsToDays(millisToMicros(date.getTime))
}
}
/**
* Returns a java.sql.Date from number of days since epoch.
*/
def toJavaDate(daysSinceEpoch: SQLDate): Date = {
new Date(microsToMillis(daysToMicros(daysSinceEpoch)))
if (daysSinceEpoch < GREGORIAN_CUTOVER_DAY) {
Date.valueOf(LocalDate.ofEpochDay(daysSinceEpoch))
} else {
new Date(microsToMillis(daysToMicros(daysSinceEpoch)))
}
}
/**
* Returns a java.sql.Timestamp from number of micros since epoch.
*/
def toJavaTimestamp(us: SQLTimestamp): Timestamp = {
Timestamp.from(microsToInstant(us))
if (us < GREGORIAN_CUTOVER_MICROS) {
val ldt = microsToInstant(us).atZone(ZoneId.systemDefault()).toLocalDateTime
Timestamp.valueOf(ldt)
} else {
Timestamp.from(microsToInstant(us))
}
}
/**
* Returns the number of micros since epoch from java.sql.Timestamp.
*/
def fromJavaTimestamp(t: Timestamp): SQLTimestamp = {
instantToMicros(t.toInstant)
if (t.getTime < GREGORIAN_CUTOVER_MILLIS) {
val era = if (t.before(julianCommonEraStart)) 0 else 1
val localDateTime = t.toLocalDateTime.`with`(ChronoField.ERA, era)
val instant = ZonedDateTime.of(localDateTime, ZoneId.systemDefault()).toInstant
instantToMicros(instant)
} else {
instantToMicros(t.toInstant)
}
}
/**


@@ -23,7 +23,7 @@ class HiveResultSuite extends SharedSparkSession {
import testImplicits._
test("date formatting in hive result") {
val dates = Seq("2018-12-28", "1582-10-13", "1582-10-14", "1582-10-15")
val dates = Seq("2018-12-28", "1582-10-03", "1582-10-04", "1582-10-15")
val df = dates.toDF("a").selectExpr("cast(a as date) as b")
val executedPlan1 = df.queryExecution.executedPlan
val result = HiveResult.hiveResultString(executedPlan1)
@@ -36,8 +36,8 @@ class HiveResultSuite extends SharedSparkSession {
test("timestamp formatting in hive result") {
val timestamps = Seq(
"2018-12-28 01:02:03",
"1582-10-13 01:02:03",
"1582-10-14 01:02:03",
"1582-10-03 01:02:03",
"1582-10-04 01:02:03",
"1582-10-15 01:02:03")
val df = timestamps.toDF("a").selectExpr("cast(a as timestamp) as b")
val executedPlan1 = df.queryExecution.executedPlan


@@ -21,6 +21,7 @@ import java.math.BigDecimal;
import org.apache.orc.storage.ql.exec.vector.*;
import org.apache.spark.sql.catalyst.util.DateTimeUtils;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.sql.types.TimestampType;
@@ -136,7 +137,7 @@ public class OrcColumnVector extends org.apache.spark.sql.vectorized.ColumnVecto
public long getLong(int rowId) {
int index = getRowIndex(rowId);
if (isTimestamp) {
return timestampData.time[index] * 1000 + timestampData.nanos[index] / 1000 % 1000;
return DateTimeUtils.fromJavaTimestamp(timestampData.asScratchTimestamp(index));
} else {
return longData.vector[index];
}


@@ -21,6 +21,7 @@ import java.math.BigDecimal;
import org.apache.hadoop.hive.ql.exec.vector.*;
import org.apache.spark.sql.catalyst.util.DateTimeUtils;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.sql.types.TimestampType;
@@ -136,7 +137,7 @@ public class OrcColumnVector extends org.apache.spark.sql.vectorized.ColumnVecto
public long getLong(int rowId) {
int index = getRowIndex(rowId);
if (isTimestamp) {
return timestampData.time[index] * 1000 + timestampData.nanos[index] / 1000 % 1000;
return DateTimeUtils.fromJavaTimestamp(timestampData.asScratchTimestamp(index));
} else {
return longData.vector[index];
}


@@ -18,7 +18,8 @@
package org.apache.spark.sql.hive
import java.lang.reflect.{ParameterizedType, Type, WildcardType}
import java.util.concurrent.TimeUnit._
import java.time.LocalDate
import java.util.Calendar
import scala.collection.JavaConverters._
@@ -181,6 +182,33 @@ import org.apache.spark.unsafe.types.UTF8String
*/
private[hive] trait HiveInspectors {
private final val JULIAN_CUTOVER_DAY =
rebaseGregorianToJulianDays(DateTimeUtils.GREGORIAN_CUTOVER_DAY.toInt)
private def rebaseJulianToGregorianDays(daysSinceEpoch: Int): Int = {
val localDate = LocalDate.ofEpochDay(daysSinceEpoch)
val utcCal = new Calendar.Builder()
.setCalendarType("gregory")
.setTimeZone(DateTimeUtils.TimeZoneUTC)
.setDate(localDate.getYear, localDate.getMonthValue - 1, localDate.getDayOfMonth)
.build()
Math.toIntExact(Math.floorDiv(utcCal.getTimeInMillis, DateTimeConstants.MILLIS_PER_DAY))
}
private def rebaseGregorianToJulianDays(daysSinceEpoch: Int): Int = {
val millis = Math.multiplyExact(daysSinceEpoch, DateTimeConstants.MILLIS_PER_DAY)
val utcCal = new Calendar.Builder()
.setCalendarType("gregory")
.setTimeZone(DateTimeUtils.TimeZoneUTC)
.setInstant(millis)
.build()
val localDate = LocalDate.of(
utcCal.get(Calendar.YEAR),
utcCal.get(Calendar.MONTH) + 1,
utcCal.get(Calendar.DAY_OF_MONTH))
Math.toIntExact(localDate.toEpochDay)
}
def javaTypeToDataType(clz: Type): DataType = clz match {
// writable
case c: Class[_] if c == classOf[hadoopIo.DoubleWritable] => DoubleType
@@ -466,7 +494,7 @@ private[hive] trait HiveInspectors {
_ => constant
case poi: WritableConstantTimestampObjectInspector =>
val t = poi.getWritableConstantValue
val constant = SECONDS.toMicros(t.getSeconds) + NANOSECONDS.toMicros(t.getNanos)
val constant = DateTimeUtils.fromJavaTimestamp(t.getTimestamp)
_ => constant
case poi: WritableConstantIntObjectInspector =>
val constant = poi.getWritableConstantValue.get()
@@ -618,7 +646,7 @@ private[hive] trait HiveInspectors {
case x: DateObjectInspector if x.preferWritable() =>
data: Any => {
if (data != null) {
DateTimeUtils.fromJavaDate(x.getPrimitiveWritableObject(data).get())
// Rebasing written days via conversion to local dates.
// See the comment for `getDateWritable()`.
val daysSinceEpoch = x.getPrimitiveWritableObject(data).getDays
if (daysSinceEpoch < JULIAN_CUTOVER_DAY) {
rebaseJulianToGregorianDays(daysSinceEpoch)
} else {
daysSinceEpoch
}
} else {
null
}
@@ -634,8 +669,7 @@ private[hive] trait HiveInspectors {
case x: TimestampObjectInspector if x.preferWritable() =>
data: Any => {
if (data != null) {
val t = x.getPrimitiveWritableObject(data)
SECONDS.toMicros(t.getSeconds) + NANOSECONDS.toMicros(t.getNanos)
DateTimeUtils.fromJavaTimestamp(x.getPrimitiveWritableObject(data).getTimestamp)
} else {
null
}
@@ -1012,7 +1046,27 @@ private[hive] trait HiveInspectors {
}
private def getDateWritable(value: Any): hiveIo.DateWritable =
if (value == null) null else new hiveIo.DateWritable(value.asInstanceOf[Int])
if (value == null) {
null
} else {
// Rebasing days since the epoch to store the same number of days
// as by Spark 2.4 and earlier versions. Spark 3.0 switched to
// Proleptic Gregorian calendar (see SPARK-26651), and as a consequence of that,
// this affects dates before 1582-10-15. Spark 2.4 and earlier versions use
// Julian calendar for dates before 1582-10-15. So, the same local date may
// be mapped to different number of days since the epoch in different calendars.
// For example:
// Proleptic Gregorian calendar: 1582-01-01 -> -141714
// Julian calendar: 1582-01-01 -> -141704
// The code below converts -141714 to -141704.
val daysSinceEpoch = value.asInstanceOf[Int]
val rebasedDays = if (daysSinceEpoch < DateTimeUtils.GREGORIAN_CUTOVER_DAY) {
rebaseGregorianToJulianDays(daysSinceEpoch)
} else {
daysSinceEpoch
}
new hiveIo.DateWritable(rebasedDays)
}
private def getTimestampWritable(value: Any): hiveIo.TimestampWritable =
if (value == null) {