[SPARK-31159][SQL] Rebase date/timestamp from/to Julian calendar in parquet

### What changes were proposed in this pull request?
The PR addresses the issue of compatibility with Spark 2.4 and earlier versions in reading/writing dates and timestamps via the Parquet datasource. Previous releases are based on a hybrid calendar - Julian + Gregorian. Since Spark 3.0, the Proleptic Gregorian calendar is used by default, see SPARK-26651. In particular, the issue pops up for dates/timestamps before 1582-10-15, when the hybrid calendar switches from the Julian to the Gregorian calendar. The same local date in different calendars is converted to a different number of days since the epoch 1970-01-01. For example, the date 1001-01-01 is converted to:
- -353914 in the Julian calendar. Spark 2.4 saves the number as a value of DATE type into parquet.
- -353920 in the Proleptic Gregorian calendar. Spark 3.0 saves the number as a date value.
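For illustration only (not part of the patch), a minimal JVM-only Scala sketch that reproduces both day counts; it uses the same `java.util.Calendar` hybrid calendar that the rebase functions described below rely on:

```scala
import java.time.LocalDate
import java.util.{Calendar, TimeZone}

// Proleptic Gregorian calendar (java.time): days since 1970-01-01.
val gregorianDays = LocalDate.of(1001, 1, 1).toEpochDay                  // -353920

// Hybrid Julian + Gregorian calendar (java.util.Calendar) evaluated in UTC,
// i.e. what Spark 2.4 and earlier effectively store for DATE values.
val hybridCal = new Calendar.Builder()
  .setCalendarType("gregory")                 // hybrid calendar with the 1582-10-15 cutover
  .setTimeZone(TimeZone.getTimeZone("UTC"))
  .setDate(1001, 0, 1)                        // java.util.Calendar months are 0-based
  .build()
val julianDays = Math.floorDiv(hybridCal.getTimeInMillis, 24L * 60 * 60 * 1000) // -353914
```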

According to the parquet spec, parquet timestamps of the `TIMESTAMP_MILLIS` and `TIMESTAMP_MICROS` output types and parquet dates should be based on the Proleptic Gregorian calendar, but `INT96` timestamps should be stored as Julian days. Since version 3.0, Spark conforms to the spec, but for backward compatibility with previous versions, the PR proposes rebasing between the Proleptic Gregorian calendar and the hybrid one under the SQL config:
```
spark.sql.legacy.parquet.rebaseDateTime.enabled
```
It is set to `false` by default, which means the rebasing is not performed by default.
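As background for the `INT96` case mentioned above (an illustrative sketch with a hypothetical helper, not code from this PR): an `INT96` timestamp is physically 12 bytes, 8 bytes of nanoseconds within the day followed by a 4-byte Julian Day Number, written little-endian, which is why it always carries Julian-calendar semantics regardless of the new config:

```scala
import java.nio.{ByteBuffer, ByteOrder}

// Hypothetical helper showing the on-disk INT96 layout; ParquetWriteSupport
// builds the same 12-byte little-endian buffer before handing it to the record consumer.
def toInt96(julianDay: Int, timeOfDayNanos: Long): Array[Byte] = {
  val buf = ByteBuffer.allocate(12).order(ByteOrder.LITTLE_ENDIAN)
  buf.putLong(timeOfDayNanos).putInt(julianDay)
  buf.array()
}
```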

The details of the implementation:
1. Added 2 methods to `DateTimeUtils` for rebasing microseconds. `rebaseGregorianToJulianMicros()` builds a local timestamp in the Proleptic Gregorian calendar, extracts the date-time fields `year`, `month`, ..., `second fraction` from the local timestamp, and uses them to build another local timestamp based on the hybrid calendar (using the `java.util.Calendar` API). After that, it calculates the number of microseconds since the epoch using the resulting local timestamp. The function performs the conversion via the system JVM time zone for compatibility with Spark 2.4 and earlier versions. The `rebaseJulianToGregorianMicros()` function does the reverse conversion.
2. Added 2 methods to `DateTimeUtils` for rebasing days. `rebaseGregorianToJulianDays()` builds a local date from the passed number of days since the epoch in the Proleptic Gregorian calendar, interprets the resulting date as a local date in the hybrid calendar, and gets the number of days since the epoch from the resulting local date. The conversion is performed via the `UTC` time zone because the conversion is independent of time zones, and `UTC` is selected to avoid rounding issues when casting days to milliseconds and back. The `rebaseJulianToGregorianDays()` function does the reverse conversion (a usage sketch follows this list).
3. Use `rebaseGregorianToJulianMicros()` and `rebaseGregorianToJulianDays()` while saving timestamps/dates to parquet files if the SQL config is on.
4. Use `rebaseJulianToGregorianMicros()` and `rebaseJulianToGregorianDays()` while loading timestamps/dates from parquet files if the SQL config is on.
5. The SQL config `spark.sql.legacy.parquet.rebaseDateTime.enabled` controls the conversion of dates and of timestamps of the `TIMESTAMP_MILLIS` and `TIMESTAMP_MICROS` types, see the SQL config `spark.sql.parquet.outputTimestampType`.
6. The rebasing is always performed for `INT96` timestamps, independently of `spark.sql.legacy.parquet.rebaseDateTime.enabled`.
7. Supported the vectorized parquet reader, see the SQL config `spark.sql.parquet.enableVectorizedReader`.
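A hypothetical spark-shell style check of the new day-rebasing functions (the values match the 1001-01-01 example above; the microsecond variants are omitted here because their results depend on the JVM time zone):

```scala
import java.time.LocalDate
import org.apache.spark.sql.catalyst.util.DateTimeUtils._

val gregorianDays = LocalDate.of(1001, 1, 1).toEpochDay.toInt    // -353920
val julianDays = rebaseGregorianToJulianDays(gregorianDays)      // -353914, as Spark 2.4 would store it
assert(rebaseJulianToGregorianDays(julianDays) == gregorianDays) // round trip back to Proleptic Gregorian
```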

### Why are the changes needed?
- For backward compatibility with Spark 2.4 and earlier versions. The changes allow users to read dates/timestamps saved by previous versions and get the same result. Also, after the changes, users can enable the rebasing in write and save dates/timestamps that can be loaded correctly by Spark 2.4 and earlier versions.
- It fixes a bug in saving/loading timestamps of the `INT96` type.

### Does this PR introduce any user-facing change?
Yes, the timestamp `1001-01-01 01:02:03.123456` saved by Spark 2.4.5 as `TIMESTAMP_MICROS` is interpreted by Spark 3.0.0-preview2 differently:
```scala
scala> spark.read.parquet("/Users/maxim/tmp/before_1582/2_4_5_ts_micros").show(false)
+--------------------------+
|ts                        |
+--------------------------+
|1001-01-07 11:32:20.123456|
+--------------------------+
```
After the changes:
```scala
scala> spark.conf.set("spark.sql.legacy.parquet.rebaseDateTime.enabled", true)

scala> spark.read.parquet("/Users/maxim/tmp/before_1582/2_4_5_ts_micros").show(false)
+--------------------------+
|ts                        |
+--------------------------+
|1001-01-01 01:02:03.123456|
+--------------------------+
```

### How was this patch tested?
1. Added tests to `ParquetIOSuite` to check rebasing in read for the regular and the vectorized parquet readers. The tests read back parquet files saved by Spark 2.4.5 via:
```shell
$ export TZ="America/Los_Angeles"
```
```scala
scala> spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
scala> val df = Seq("1001-01-01").toDF("dateS").select($"dateS".cast("date").as("date"))
df: org.apache.spark.sql.DataFrame = [date: date]
scala> df.write.parquet("/Users/maxim/tmp/before_1582/2_4_5_date")

scala> val df = Seq("1001-01-01 01:02:03.123456").toDF("tsS").select($"tsS".cast("timestamp").as("ts"))
df: org.apache.spark.sql.DataFrame = [ts: timestamp]

scala> spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
scala> df.write.parquet("/Users/maxim/tmp/before_1582/2_4_5_ts_micros")

scala> spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MILLIS")
scala> df.write.parquet("/Users/maxim/tmp/before_1582/2_4_5_ts_millis")

scala> spark.conf.set("spark.sql.parquet.outputTimestampType", "INT96")
scala> df.write.parquet("/Users/maxim/tmp/before_1582/2_4_5_ts_int96")
```
2. Manually checked the write code path. Dates/timestamps (`TIMESTAMP_MICROS`, `TIMESTAMP_MILLIS`, `INT96`) were saved by Spark 3.1.0-SNAPSHOT (after the changes):
```bash
$ export TZ="America/Los_Angeles"
```
```scala
scala> spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
scala> spark.conf.set("spark.sql.legacy.parquet.rebaseDateTime.enabled", true)
scala> spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
scala> val df = Seq(("1001-01-01", "1001-01-01 01:02:03.123456")).toDF("dateS", "tsS").select($"dateS".cast("date").as("d"), $"tsS".cast("timestamp").as("ts"))
df: org.apache.spark.sql.DataFrame = [d: date, ts: timestamp]
scala> df.write.parquet("/Users/maxim/tmp/before_1582/3_0_0_micros")
scala> spark.read.parquet("/Users/maxim/tmp/before_1582/3_0_0_micros").show(false)
+----------+--------------------------+
|d         |ts                        |
+----------+--------------------------+
|1001-01-01|1001-01-01 01:02:03.123456|
+----------+--------------------------+
```
Read the saved date/timestamp back with Spark 2.4.5:
```scala
scala> spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
scala> spark.read.parquet("/Users/maxim/tmp/before_1582/3_0_0_micros").show(false)
+----------+--------------------------+
|d         |ts                        |
+----------+--------------------------+
|1001-01-01|1001-01-01 01:02:03.123456|
+----------+--------------------------+
```

Closes #27915 from MaxGekk/rebase-parquet-datetime.

Authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Commit bb295d80e3 (parent 6fd3138e9c), committed 2020-03-19 12:49:51 +08:00 by Wenchen Fan.
11 changed files with 343 additions and 17 deletions.

#### DateTimeUtils.scala

```diff

@@ -21,7 +21,7 @@ import java.nio.charset.StandardCharsets
import java.sql.{Date, Timestamp}
import java.time._
import java.time.temporal.{ChronoField, ChronoUnit, IsoFields}
-import java.util.{Locale, TimeZone}
+import java.util.{Calendar, Locale, TimeZone}
import java.util.concurrent.TimeUnit._

import scala.util.control.NonFatal
@@ -148,7 +148,9 @@ object DateTimeUtils {
  def fromJulianDay(day: Int, nanoseconds: Long): SQLTimestamp = {
    // use Long to avoid rounding errors
    val seconds = (day - JULIAN_DAY_OF_EPOCH).toLong * SECONDS_PER_DAY
-    SECONDS.toMicros(seconds) + NANOSECONDS.toMicros(nanoseconds)
+    val micros = SECONDS.toMicros(seconds) + NANOSECONDS.toMicros(nanoseconds)
+    val rebased = rebaseJulianToGregorianMicros(micros)
+    rebased
  }

  /**
@@ -157,7 +159,7 @@ object DateTimeUtils {
   * Note: support timestamp since 4717 BC (without negative nanoseconds, compatible with Hive).
   */
  def toJulianDay(us: SQLTimestamp): (Int, Long) = {
-    val julian_us = us + JULIAN_DAY_OF_EPOCH * MICROS_PER_DAY
+    val julian_us = rebaseGregorianToJulianMicros(us) + JULIAN_DAY_OF_EPOCH * MICROS_PER_DAY
    val day = julian_us / MICROS_PER_DAY
    val micros = julian_us % MICROS_PER_DAY
    (day.toInt, MICROSECONDS.toNanos(micros))
@@ -936,4 +938,102 @@ object DateTimeUtils {
    val days = period.getDays
    new CalendarInterval(months, days, 0)
  }

/**
* Converts the given microseconds to a local date-time in UTC time zone in Proleptic Gregorian
* calendar, interprets the result as a local date-time in Julian calendar in UTC time zone.
* And takes microseconds since the epoch from the Julian timestamp.
*
* @param micros The number of microseconds since the epoch '1970-01-01T00:00:00Z'.
* @return The rebased microseconds since the epoch in Julian calendar.
*/
def rebaseGregorianToJulianMicros(micros: Long): Long = {
val ldt = microsToInstant(micros).atZone(ZoneId.systemDefault).toLocalDateTime
val cal = new Calendar.Builder()
// `gregory` is a hybrid calendar that supports both
// the Julian and Gregorian calendar systems
.setCalendarType("gregory")
.setDate(ldt.getYear, ldt.getMonthValue - 1, ldt.getDayOfMonth)
.setTimeOfDay(ldt.getHour, ldt.getMinute, ldt.getSecond)
.build()
millisToMicros(cal.getTimeInMillis) + ldt.get(ChronoField.MICRO_OF_SECOND)
}
/**
* Converts the given microseconds to a local date-time in UTC time zone in Julian calendar,
* interprets the result as a local date-time in Proleptic Gregorian calendar in UTC time zone.
* And takes microseconds since the epoch from the Gregorian timestamp.
*
* @param micros The number of microseconds since the epoch '1970-01-01T00:00:00Z'.
* @return The rebased microseconds since the epoch in Proleptic Gregorian calendar.
*/
def rebaseJulianToGregorianMicros(micros: Long): Long = {
val cal = new Calendar.Builder()
// `gregory` is a hybrid calendar that supports both
// the Julian and Gregorian calendar systems
.setCalendarType("gregory")
.setInstant(microsToMillis(micros))
.build()
val localDateTime = LocalDateTime.of(
cal.get(Calendar.YEAR),
cal.get(Calendar.MONTH) + 1,
cal.get(Calendar.DAY_OF_MONTH),
cal.get(Calendar.HOUR_OF_DAY),
cal.get(Calendar.MINUTE),
cal.get(Calendar.SECOND),
(Math.floorMod(micros, MICROS_PER_SECOND) * NANOS_PER_MICROS).toInt)
instantToMicros(localDateTime.atZone(ZoneId.systemDefault).toInstant)
}
/**
* Converts the given number of days since the epoch day 1970-01-01 to
* a local date in Julian calendar, interprets the result as a local
* date in Proleptic Gregorian calendar, and take the number of days
* since the epoch from the Gregorian date.
*
* @param days The number of days since the epoch in Julian calendar.
* @return The rebased number of days in Gregorian calendar.
*/
def rebaseJulianToGregorianDays(days: Int): Int = {
val utcCal = new Calendar.Builder()
// `gregory` is a hybrid calendar that supports both
// the Julian and Gregorian calendar systems
.setCalendarType("gregory")
.setTimeZone(TimeZoneUTC)
.setInstant(Math.multiplyExact(days, MILLIS_PER_DAY))
.build()
val localDate = LocalDate.of(
utcCal.get(Calendar.YEAR),
utcCal.get(Calendar.MONTH) + 1,
utcCal.get(Calendar.DAY_OF_MONTH))
Math.toIntExact(localDate.toEpochDay)
}
/**
* Rebasing days since the epoch to store the same number of days
* as by Spark 2.4 and earlier versions. Spark 3.0 switched to
* Proleptic Gregorian calendar (see SPARK-26651), and as a consequence of that,
* this affects dates before 1582-10-15. Spark 2.4 and earlier versions use
* Julian calendar for dates before 1582-10-15. So, the same local date may
* be mapped to different number of days since the epoch in different calendars.
*
* For example:
* Proleptic Gregorian calendar: 1582-01-01 -> -141714
* Julian calendar: 1582-01-01 -> -141704
* The code below converts -141714 to -141704.
*
* @param days The number of days since the epoch 1970-01-01. It can be negative.
* @return The rebased number of days since the epoch in Julian calendar.
*/
def rebaseGregorianToJulianDays(days: Int): Int = {
val localDate = LocalDate.ofEpochDay(days)
val utcCal = new Calendar.Builder()
// `gregory` is a hybrid calendar that supports both
// the Julian and Gregorian calendar systems
.setCalendarType("gregory")
.setTimeZone(TimeZoneUTC)
.setDate(localDate.getYear, localDate.getMonthValue - 1, localDate.getDayOfMonth)
.build()
Math.toIntExact(Math.floorDiv(utcCal.getTimeInMillis, MILLIS_PER_DAY))
}
}
```

#### SQLConf.scala

```diff

@@ -2487,6 +2487,20 @@ object SQLConf {
    .booleanConf
    .createWithDefault(false)

val LEGACY_PARQUET_REBASE_DATETIME =
buildConf("spark.sql.legacy.parquet.rebaseDateTime.enabled")
.internal()
.doc("When true, rebase dates/timestamps from Proleptic Gregorian calendar " +
"to the hybrid calendar (Julian + Gregorian) in write and " +
"from the hybrid calendar to Proleptic Gregorian calendar in read. " +
"The rebasing is performed by converting micros/millis/days to " +
"a local date/timestamp in the source calendar, interpreting the resulted date/" +
"timestamp in the target calendar, and getting the number of micros/millis/days " +
"since the epoch 1970-01-01 00:00:00Z.")
.version("3.0.0")
.booleanConf
.createWithDefault(false)
  /**
   * Holds information about keys that have been deprecated.
   *
@@ -3064,6 +3078,8 @@ class SQLConf extends Serializable with Logging {
  def integerGroupingIdEnabled: Boolean = getConf(SQLConf.LEGACY_INTEGER_GROUPING_ID)

+  def parquetRebaseDateTimeEnabled: Boolean = getConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME)
+
  /** ********************** SQLConf functionality methods ************ */

  /** Set Spark SQL configuration properties. */
```

#### DateTimeUtilsSuite.scala

```diff

@@ -670,4 +670,64 @@ class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper {
      assert(toDate("tomorrow CET ", zoneId).get === today + 1)
    }
  }

test("rebase julian to/from gregorian micros") {
outstandingTimezones.foreach { timeZone =>
withDefaultTimeZone(timeZone) {
Seq(
"0001-01-01 01:02:03.654321",
"1000-01-01 03:02:01.123456",
"1582-10-04 00:00:00.000000",
"1582-10-15 00:00:00.999999", // Gregorian cutover day
"1883-11-10 00:00:00.000000", // America/Los_Angeles -7:52:58 zone offset
"1883-11-20 00:00:00.000000", // America/Los_Angeles -08:00 zone offset
"1969-12-31 11:22:33.000100",
"1970-01-01 00:00:00.000001", // The epoch day
"2020-03-14 09:33:01.500000").foreach { ts =>
withClue(s"time zone = ${timeZone.getID} ts = $ts") {
val julianTs = Timestamp.valueOf(ts)
val julianMicros = millisToMicros(julianTs.getTime) +
((julianTs.getNanos / NANOS_PER_MICROS) % MICROS_PER_MILLIS)
val gregorianMicros = instantToMicros(LocalDateTime.parse(ts.replace(' ', 'T'))
.atZone(timeZone.toZoneId)
.toInstant)
assert(rebaseJulianToGregorianMicros(julianMicros) === gregorianMicros)
assert(rebaseGregorianToJulianMicros(gregorianMicros) === julianMicros)
}
}
}
}
}
test("rebase gregorian to/from julian days") {
// millisToDays() and fromJavaDate() are taken from Spark 2.4
def millisToDays(millisUtc: Long, timeZone: TimeZone): Int = {
val millisLocal = millisUtc + timeZone.getOffset(millisUtc)
Math.floor(millisLocal.toDouble / MILLIS_PER_DAY).toInt
}
def fromJavaDate(date: Date): Int = {
millisToDays(date.getTime, defaultTimeZone())
}
outstandingTimezones.foreach { timeZone =>
withDefaultTimeZone(timeZone) {
Seq(
"0001-01-01",
"1000-01-01",
"1582-10-04",
"1582-10-15", // Gregorian cutover day
"1883-11-10", // America/Los_Angeles -7:52:58 zone offset
"1883-11-20", // America/Los_Angeles -08:00 zone offset
"1969-12-31",
"1970-01-01", // The epoch day
"2020-03-14").foreach { date =>
val julianDays = fromJavaDate(Date.valueOf(date))
val gregorianDays = localDateToDays(LocalDate.parse(date))
assert(rebaseGregorianToJulianDays(gregorianDays) === julianDays)
assert(rebaseJulianToGregorianDays(julianDays) === gregorianDays)
}
}
}
}
}
```

#### VectorizedColumnReader.java

```diff

@@ -37,6 +37,7 @@ import org.apache.parquet.schema.PrimitiveType;
import org.apache.spark.sql.catalyst.util.DateTimeUtils;
import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
+import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.DecimalType;
@@ -101,6 +102,7 @@ public class VectorizedColumnReader {
  // The timezone conversion to apply to int96 timestamps. Null if no conversion.
  private final ZoneId convertTz;
  private static final ZoneId UTC = ZoneOffset.UTC;
+  private final boolean rebaseDateTime;

  public VectorizedColumnReader(
      ColumnDescriptor descriptor,
@@ -129,6 +131,7 @@ public class VectorizedColumnReader {
    if (totalValueCount == 0) {
      throw new IOException("totalValueCount == 0");
    }
+    this.rebaseDateTime = SQLConf.get().parquetRebaseDateTimeEnabled();
  }

  /**
@@ -407,7 +410,7 @@ public class VectorizedColumnReader {
  private void readIntBatch(int rowId, int num, WritableColumnVector column) throws IOException {
    // This is where we implement support for the valid type conversions.
    // TODO: implement remaining type conversions
-    if (column.dataType() == DataTypes.IntegerType || column.dataType() == DataTypes.DateType ||
+    if (column.dataType() == DataTypes.IntegerType ||
        DecimalType.is32BitDecimalType(column.dataType())) {
      defColumn.readIntegers(
          num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
@@ -417,6 +420,21 @@ public class VectorizedColumnReader {
    } else if (column.dataType() == DataTypes.ShortType) {
      defColumn.readShorts(
          num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
} else if (column.dataType() == DataTypes.DateType ) {
if (rebaseDateTime) {
for (int i = 0; i < num; i++) {
if (defColumn.readInteger() == maxDefLevel) {
column.putInt(
rowId + i,
DateTimeUtils.rebaseJulianToGregorianDays(dataColumn.readInteger()));
} else {
column.putNull(rowId + i);
}
}
} else {
defColumn.readIntegers(
num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
}
    } else {
      throw constructConvertNotSupportedException(descriptor, column);
    }
@@ -425,14 +443,32 @@ public class VectorizedColumnReader {
  private void readLongBatch(int rowId, int num, WritableColumnVector column) throws IOException {
    // This is where we implement support for the valid type conversions.
    if (column.dataType() == DataTypes.LongType ||
-        DecimalType.is64BitDecimalType(column.dataType()) ||
-        originalType == OriginalType.TIMESTAMP_MICROS) {
+        DecimalType.is64BitDecimalType(column.dataType())) {
      defColumn.readLongs(
          num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
} else if (originalType == OriginalType.TIMESTAMP_MICROS) {
if (rebaseDateTime) {
for (int i = 0; i < num; i++) {
if (defColumn.readInteger() == maxDefLevel) {
column.putLong(
rowId + i,
DateTimeUtils.rebaseJulianToGregorianMicros(dataColumn.readLong()));
} else {
column.putNull(rowId + i);
}
}
} else {
defColumn.readLongs(
num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
}
    } else if (originalType == OriginalType.TIMESTAMP_MILLIS) {
      for (int i = 0; i < num; i++) {
        if (defColumn.readInteger() == maxDefLevel) {
-          column.putLong(rowId + i, DateTimeUtils.millisToMicros(dataColumn.readLong()));
+          long micros = DateTimeUtils.millisToMicros(dataColumn.readLong());
+          if (rebaseDateTime) {
+            micros = DateTimeUtils.rebaseJulianToGregorianMicros(micros);
+          }
+          column.putLong(rowId + i, micros);
        } else {
          column.putNull(rowId + i);
        }
```

#### ParquetRowConverter.scala

```diff

@@ -130,6 +130,9 @@ private[parquet] class ParquetRowConverter(
    updater: ParentContainerUpdater)
  extends ParquetGroupConverter(updater) with Logging {

+  // Enable rebasing date/timestamp from Julian to Proleptic Gregorian calendar
+  private val rebaseDateTime = SQLConf.get.parquetRebaseDateTimeEnabled
+
  assert(
    parquetType.getFieldCount <= catalystType.length,
    s"""Field count of the Parquet schema is greater than the field count of the Catalyst schema:
@@ -271,16 +274,35 @@ private[parquet] class ParquetRowConverter(
        new ParquetStringConverter(updater)

      case TimestampType if parquetType.getOriginalType == OriginalType.TIMESTAMP_MICROS =>
-        new ParquetPrimitiveConverter(updater) {
-          override def addLong(value: Long): Unit = {
-            updater.setLong(value)
-          }
-        }
+        if (rebaseDateTime) {
+          new ParquetPrimitiveConverter(updater) {
+            override def addLong(value: Long): Unit = {
+              val rebased = DateTimeUtils.rebaseJulianToGregorianMicros(value)
+              updater.setLong(rebased)
+            }
+          }
+        } else {
+          new ParquetPrimitiveConverter(updater) {
+            override def addLong(value: Long): Unit = {
+              updater.setLong(value)
+            }
+          }
+        }

      case TimestampType if parquetType.getOriginalType == OriginalType.TIMESTAMP_MILLIS =>
-        new ParquetPrimitiveConverter(updater) {
-          override def addLong(value: Long): Unit = {
-            updater.setLong(DateTimeUtils.millisToMicros(value))
-          }
-        }
+        if (rebaseDateTime) {
+          new ParquetPrimitiveConverter(updater) {
+            override def addLong(value: Long): Unit = {
+              val micros = DateTimeUtils.millisToMicros(value)
+              val rebased = DateTimeUtils.rebaseJulianToGregorianMicros(micros)
+              updater.setLong(rebased)
+            }
+          }
+        } else {
+          new ParquetPrimitiveConverter(updater) {
+            override def addLong(value: Long): Unit = {
+              updater.setLong(DateTimeUtils.millisToMicros(value))
+            }
+          }
+        }
@@ -305,10 +327,17 @@ private[parquet] class ParquetRowConverter(
        }

      case DateType =>
-        new ParquetPrimitiveConverter(updater) {
-          override def addInt(value: Int): Unit = {
-            // DateType is not specialized in `SpecificMutableRow`, have to box it here.
-            updater.set(value.asInstanceOf[DateType#InternalType])
-          }
-        }
+        if (rebaseDateTime) {
+          new ParquetPrimitiveConverter(updater) {
+            override def addInt(value: Int): Unit = {
+              updater.set(DateTimeUtils.rebaseJulianToGregorianDays(value))
+            }
+          }
+        } else {
+          new ParquetPrimitiveConverter(updater) {
+            override def addInt(value: Int): Unit = {
+              updater.set(value)
+            }
+          }
+        }
```

#### ParquetWriteSupport.scala

```diff

@@ -77,6 +77,9 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
  private val decimalBuffer =
    new Array[Byte](Decimal.minBytesForPrecision(DecimalType.MAX_PRECISION))

+  // Whether to rebase datetimes from Gregorian to Julian calendar in write
+  private val rebaseDateTime: Boolean = SQLConf.get.parquetRebaseDateTimeEnabled
+
  override def init(configuration: Configuration): WriteContext = {
    val schemaString = configuration.get(ParquetWriteSupport.SPARK_ROW_SCHEMA)
    this.schema = StructType.fromString(schemaString)
@@ -147,6 +150,11 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
        (row: SpecializedGetters, ordinal: Int) =>
          recordConsumer.addInteger(row.getShort(ordinal))

+      case DateType if rebaseDateTime =>
+        (row: SpecializedGetters, ordinal: Int) =>
+          val rebasedDays = DateTimeUtils.rebaseGregorianToJulianDays(row.getInt(ordinal))
+          recordConsumer.addInteger(rebasedDays)
+
      case IntegerType | DateType =>
        (row: SpecializedGetters, ordinal: Int) =>
          recordConsumer.addInteger(row.getInt(ordinal))
@@ -177,10 +185,21 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
          buf.order(ByteOrder.LITTLE_ENDIAN).putLong(timeOfDayNanos).putInt(julianDay)
          recordConsumer.addBinary(Binary.fromReusedByteArray(timestampBuffer))

+      case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS if rebaseDateTime =>
+        (row: SpecializedGetters, ordinal: Int) =>
+          val rebasedMicros = DateTimeUtils.rebaseGregorianToJulianMicros(row.getLong(ordinal))
+          recordConsumer.addLong(rebasedMicros)
+
      case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS =>
        (row: SpecializedGetters, ordinal: Int) =>
          recordConsumer.addLong(row.getLong(ordinal))

+      case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MILLIS if rebaseDateTime =>
+        (row: SpecializedGetters, ordinal: Int) =>
+          val rebasedMicros = DateTimeUtils.rebaseGregorianToJulianMicros(row.getLong(ordinal))
+          val millis = DateTimeUtils.microsToMillis(rebasedMicros)
+          recordConsumer.addLong(millis)
+
      case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MILLIS =>
        (row: SpecializedGetters, ordinal: Int) =>
          val millis = DateTimeUtils.microsToMillis(row.getLong(ordinal))
```

#### ParquetIOSuite.scala

```diff

@@ -17,6 +17,7 @@
package org.apache.spark.sql.execution.datasources.parquet

+import java.sql.{Date, Timestamp}
import java.util.Locale

import scala.collection.JavaConverters._
@@ -879,6 +880,71 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
      assert(metaData.get(SPARK_VERSION_METADATA_KEY) === SPARK_VERSION_SHORT)
    }
  }

test("SPARK-31159: compatibility with Spark 2.4 in reading dates/timestamps") {
Seq(false, true).foreach { vectorized =>
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized.toString) {
withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME.key -> "true") {
checkAnswer(
readResourceParquetFile("test-data/before_1582_date_v2_4.snappy.parquet"),
Row(java.sql.Date.valueOf("1001-01-01")))
checkAnswer(readResourceParquetFile(
"test-data/before_1582_timestamp_micros_v2_4.snappy.parquet"),
Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123456")))
checkAnswer(readResourceParquetFile(
"test-data/before_1582_timestamp_millis_v2_4.snappy.parquet"),
Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123")))
}
checkAnswer(readResourceParquetFile(
"test-data/before_1582_timestamp_int96_v2_4.snappy.parquet"),
Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123456")))
}
}
}
test("SPARK-31159: rebasing timestamps in write") {
Seq(
("TIMESTAMP_MILLIS", "1001-01-01 01:02:03.123", "1001-01-07 01:09:05.123"),
("TIMESTAMP_MICROS", "1001-01-01 01:02:03.123456", "1001-01-07 01:09:05.123456"),
("INT96", "1001-01-01 01:02:03.123456", "1001-01-01 01:02:03.123456")
).foreach { case (outType, tsStr, nonRebased) =>
withClue(s"output type $outType") {
withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> outType) {
withTempPath { dir =>
val path = dir.getAbsolutePath
withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME.key -> "true") {
Seq(tsStr).toDF("tsS")
.select($"tsS".cast("timestamp").as("ts"))
.write
.parquet(path)
checkAnswer(spark.read.parquet(path), Row(Timestamp.valueOf(tsStr)))
}
withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME.key -> "false") {
checkAnswer(spark.read.parquet(path), Row(Timestamp.valueOf(nonRebased)))
}
}
}
}
}
}
test("SPARK-31159: rebasing dates in write") {
withTempPath { dir =>
val path = dir.getAbsolutePath
withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME.key -> "true") {
Seq("1001-01-01").toDF("dateS")
.select($"dateS".cast("date").as("date"))
.write
.parquet(path)
checkAnswer(spark.read.parquet(path), Row(Date.valueOf("1001-01-01")))
}
withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME.key -> "false") {
checkAnswer(spark.read.parquet(path), Row(Date.valueOf("1001-01-07")))
}
}
}
}

class JobCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
```