[SPARK-31183][SQL] Rebase date/timestamp from/to Julian calendar in Avro

### What changes were proposed in this pull request?
The PR addresses compatibility with Spark 2.4 and earlier versions in reading/writing dates and timestamps via the **Avro** datasource. Previous releases are based on a hybrid calendar (Julian + Gregorian). Since Spark 3.0, the Proleptic Gregorian calendar is used by default, see SPARK-26651. In particular, the issue pops up for dates/timestamps before 1582-10-15, the day on which the hybrid calendar switches from the Julian to the Gregorian calendar. The same local date in the two calendars is converted to a different number of days since the epoch 1970-01-01. For example, the local date 1001-01-01 is converted to:
- -353914 in the Julian calendar. Spark 2.4 saves this number as a value of DATE type into **Avro** files.
- -353920 in the Proleptic Gregorian calendar. Spark 3.0 saves this number as a date value.
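
Both numbers can be reproduced with plain JDK classes, without Spark's internal helpers. In this sketch the pure Julian calendar is emulated by pushing `GregorianCalendar`'s Gregorian cutover into the far future:
```scala
import java.time.LocalDate
import java.util.{Calendar, Date, GregorianCalendar, TimeZone}

// Days since 1970-01-01 for the local date 1001-01-01 in the
// Proleptic Gregorian calendar (what Spark 3.0 writes).
val gregorianDays = LocalDate.of(1001, 1, 1).toEpochDay // -353920

// The same local date in a pure Julian calendar, which is how Spark 2.4's
// hybrid calendar treats any date before 1582-10-15.
val cal = new GregorianCalendar(TimeZone.getTimeZone("UTC"))
cal.setGregorianChange(new Date(Long.MaxValue)) // never switch to Gregorian
cal.clear()
cal.set(1001, Calendar.JANUARY, 1)
val julianDays = Math.floorDiv(cal.getTimeInMillis, 86400000L) // -353914

println(julianDays - gregorianDays) // 6 -- hence 1001-01-01 reads back as 1001-01-07
```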

The PR proposes rebasing from/to the Proleptic Gregorian calendar to/from the hybrid one under the SQL config:
```
spark.sql.legacy.avro.rebaseDateTime.enabled
```
The config is set to `false` by default, which means no rebasing is performed unless it is explicitly enabled.

The details of the implementation:
1. Re-use the 2 methods of `DateTimeUtils` added by the PR https://github.com/apache/spark/pull/27915 for rebasing microseconds.
2. Re-use the 2 methods of `DateTimeUtils` added by the same PR for rebasing days.
3. Use `rebaseGregorianToJulianMicros()` and `rebaseGregorianToJulianDays()` while saving timestamps/dates to **Avro** files if the SQL config is on.
4. Use `rebaseJulianToGregorianMicros()` and `rebaseJulianToGregorianDays()` while loading timestamps/dates from **Avro** files if the SQL config is on.
5. The SQL config `spark.sql.legacy.avro.rebaseDateTime.enabled` controls conversions of dates, and of timestamps with the `timestamp-millis` and `timestamp-micros` logical types (a conceptual sketch of the rebasing follows this list).
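
The following is only a conceptual sketch of the read-side day rebasing, not the actual `DateTimeUtils` implementation re-used from PR #27915. The hypothetical helper decodes the stored day count in a pure Julian calendar and re-encodes the same local date in the Proleptic Gregorian calendar (AD dates only; era handling is elided):
```scala
import java.time.LocalDate
import java.util.{Calendar, Date, GregorianCalendar, TimeZone}

// Conceptual sketch of rebaseJulianToGregorianDays: interpret `days` as a
// local date in a pure Julian calendar, then return the Proleptic Gregorian
// day count of that same local date.
def rebaseJulianToGregorianDaysSketch(days: Int): Int = {
  val cal = new GregorianCalendar(TimeZone.getTimeZone("UTC"))
  cal.setGregorianChange(new Date(Long.MaxValue)) // pure Julian calendar
  cal.clear()
  cal.setTimeInMillis(days * 86400000L)
  LocalDate
    .of(cal.get(Calendar.YEAR), cal.get(Calendar.MONTH) + 1, cal.get(Calendar.DAY_OF_MONTH))
    .toEpochDay
    .toInt
}

// Spark 2.4 stored -353914 for 1001-01-01; rebasing recovers the
// Proleptic Gregorian encoding of the same local date.
assert(rebaseJulianToGregorianDaysSketch(-353914) == -353920)
```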

### Why are the changes needed?
For backward compatibility with Spark 2.4 and earlier versions. The changes allow users to read dates/timestamps saved by previous versions and get the same results. Also, after the changes, users can enable the rebasing in write and save dates/timestamps that can be loaded correctly by Spark 2.4 and earlier versions.

### Does this PR introduce any user-facing change?
Yes. For example, the date `1001-01-01` saved by Spark 2.4.5 is interpreted by Spark 3.0.0-preview2 differently (timestamps saved as `timestamp-micros`, such as `1001-01-01 01:02:03.123456`, are shifted in the same way):
```scala
scala> spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
scala> spark.read.format("avro").load("/Users/maxim/tmp/before_1582/2_4_5_date_avro").show(false)
+----------+
|date      |
+----------+
|1001-01-07|
+----------+
```
After the changes:
```scala
scala> spark.conf.set("spark.sql.legacy.avro.rebaseDateTime.enabled", true)
scala> spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")

scala> spark.read.format("avro").load("/Users/maxim/tmp/before_1582/2_4_5_date_avro").show(false)
+----------+
|date      |
+----------+
|1001-01-01|
+----------+
```

### How was this patch tested?
1. Added tests to `AvroLogicalTypeSuite` to check rebasing in read. The tests read back Avro files saved by Spark 2.4.5, which were generated via:
```shell
$ export TZ="America/Los_Angeles"
```
```scala
scala> spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
scala> val df = Seq("1001-01-01").toDF("dateS").select($"dateS".cast("date").as("date"))
df: org.apache.spark.sql.DataFrame = [date: date]
scala> df.write.format("avro").save("/Users/maxim/tmp/before_1582/2_4_5_date_avro")

scala> val df2 = Seq("1001-01-01 01:02:03.123456").toDF("tsS").select($"tsS".cast("timestamp").as("ts"))
df2: org.apache.spark.sql.DataFrame = [ts: timestamp]
scala> df2.write.format("avro").save("/Users/maxim/tmp/before_1582/2_4_5_ts_avro")

scala> :paste
// Entering paste mode (ctrl-D to finish)

  val timestampSchema = s"""
    |  {
    |    "namespace": "logical",
    |    "type": "record",
    |    "name": "test",
    |    "fields": [
    |      {"name": "ts", "type": ["null", {"type": "long","logicalType": "timestamp-millis"}], "default": null}
    |    ]
    |  }
    |""".stripMargin

// Exiting paste mode, now interpreting.
scala> df3.write.format("avro").option("avroSchema", timestampSchema).save("/Users/maxim/tmp/before_1582/2_4_5_ts_millis_avro")

```

2. Added the following tests to `AvroLogicalTypeSuite` to check rebasing of dates/timestamps (in microsecond and millisecond precision). The tests write rebased dates/timestamps, read them back with rebasing enabled and disabled, and compare the results:
  - `rebasing microseconds timestamps in write`
  - `rebasing milliseconds timestamps in write`
  - `rebasing dates in write`

Closes #27953 from MaxGekk/rebase-avro-datetime.

Authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Commit 4766a36647 (parent f1cc86792f), authored by Maxim Gekk on 2020-03-20 13:57:49 +08:00 and committed by Wenchen Fan. 7 changed files with 159 additions and 15 deletions.

`AvroDeserializer.scala`:

```diff
@@ -32,8 +32,9 @@ import org.apache.avro.util.Utf8
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeArrayData}
-import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData}
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, DateTimeUtils, GenericArrayData}
 import org.apache.spark.sql.catalyst.util.DateTimeConstants.MILLIS_PER_DAY
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
 /**
@@ -42,6 +43,9 @@ import org.apache.spark.unsafe.types.UTF8String
 class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {
   private lazy val decimalConversions = new DecimalConversion()
 
+  // Enable rebasing date/timestamp from Julian to Proleptic Gregorian calendar
+  private val rebaseDateTime = SQLConf.get.avroRebaseDateTimeEnabled
+
   private val converter: Any => Any = rootCatalystType match {
     // A shortcut for empty schema.
     case st: StructType if st.isEmpty =>
@@ -88,6 +92,11 @@ class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {
       case (INT, IntegerType) => (updater, ordinal, value) =>
         updater.setInt(ordinal, value.asInstanceOf[Int])
 
+      case (INT, DateType) if rebaseDateTime => (updater, ordinal, value) =>
+        val days = value.asInstanceOf[Int]
+        val rebasedDays = DateTimeUtils.rebaseJulianToGregorianDays(days)
+        updater.setInt(ordinal, rebasedDays)
+
       case (INT, DateType) => (updater, ordinal, value) =>
         updater.setInt(ordinal, value.asInstanceOf[Int])
@@ -95,14 +104,23 @@
         updater.setLong(ordinal, value.asInstanceOf[Long])
 
       case (LONG, TimestampType) => avroType.getLogicalType match {
-        case _: TimestampMillis => (updater, ordinal, value) =>
-          updater.setLong(ordinal, value.asInstanceOf[Long] * 1000)
+        // For backward compatibility, if the Avro type is Long and it is not logical type
+        // (the `null` case), the value is processed as timestamp type with millisecond precision.
+        case null | _: TimestampMillis => (updater, ordinal, value) =>
+          val millis = value.asInstanceOf[Long]
+          val micros = DateTimeUtils.millisToMicros(millis)
+          if (rebaseDateTime) {
+            updater.setLong(ordinal, DateTimeUtils.rebaseJulianToGregorianMicros(micros))
+          } else {
+            updater.setLong(ordinal, micros)
+          }
         case _: TimestampMicros => (updater, ordinal, value) =>
-          updater.setLong(ordinal, value.asInstanceOf[Long])
-        case null => (updater, ordinal, value) =>
-          // For backward compatibility, if the Avro type is Long and it is not logical type,
-          // the value is processed as timestamp type with millisecond precision.
-          updater.setLong(ordinal, value.asInstanceOf[Long] * 1000)
+          val micros = value.asInstanceOf[Long]
+          if (rebaseDateTime) {
+            updater.setLong(ordinal, DateTimeUtils.rebaseJulianToGregorianMicros(micros))
+          } else {
+            updater.setLong(ordinal, micros)
+          }
         case other => throw new IncompatibleSchemaException(
           s"Cannot convert Avro logical type ${other} to Catalyst Timestamp type.")
       }
```

`AvroSerializer.scala`:

```diff
@@ -34,6 +34,8 @@ import org.apache.avro.util.Utf8
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{SpecializedGetters, SpecificInternalRow}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
 /**
@@ -42,6 +44,9 @@ import org.apache.spark.sql.types._
 class AvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean)
   extends Logging {
 
+  // Whether to rebase datetimes from Gregorian to Julian calendar in write
+  private val rebaseDateTime: Boolean = SQLConf.get.avroRebaseDateTimeEnabled
+
   def serialize(catalystData: Any): Any = {
     converter.apply(catalystData)
   }
@@ -135,15 +140,26 @@ class AvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable:
     case (BinaryType, BYTES) =>
       (getter, ordinal) => ByteBuffer.wrap(getter.getBinary(ordinal))
 
+    case (DateType, INT) if rebaseDateTime =>
+      (getter, ordinal) => DateTimeUtils.rebaseGregorianToJulianDays(getter.getInt(ordinal))
+
     case (DateType, INT) =>
       (getter, ordinal) => getter.getInt(ordinal)
 
     case (TimestampType, LONG) => avroType.getLogicalType match {
-      case _: TimestampMillis => (getter, ordinal) => getter.getLong(ordinal) / 1000
-      case _: TimestampMicros => (getter, ordinal) => getter.getLong(ordinal)
-      // For backward compatibility, if the Avro type is Long and it is not logical type,
-      // output the timestamp value as with millisecond precision.
-      case null => (getter, ordinal) => getter.getLong(ordinal) / 1000
+      // For backward compatibility, if the Avro type is Long and it is not logical type
+      // (the `null` case), output the timestamp value as with millisecond precision.
+      case null | _: TimestampMillis => (getter, ordinal) =>
+        val micros = getter.getLong(ordinal)
+        val rebasedMicros = if (rebaseDateTime) {
+          DateTimeUtils.rebaseGregorianToJulianMicros(micros)
+        } else micros
+        DateTimeUtils.microsToMillis(rebasedMicros)
+      case _: TimestampMicros => (getter, ordinal) =>
+        val micros = getter.getLong(ordinal)
+        if (rebaseDateTime) {
+          DateTimeUtils.rebaseGregorianToJulianMicros(micros)
+        } else micros
       case other => throw new IncompatibleSchemaException(
         s"Cannot convert Catalyst Timestamp type to Avro logical type ${other}")
     }
```

Three binary Avro test resources were added (`before_1582_date_v2_4.avro`, `before_1582_ts_micros_v2_4.avro`, `before_1582_ts_millis_v2_4.avro`); binary files are not shown.

`AvroLogicalTypeSuite.scala`:

```diff
@@ -17,7 +17,7 @@
 package org.apache.spark.sql.avro
 
 import java.io.File
-import java.sql.Timestamp
+import java.sql.{Date, Timestamp}
 
 import org.apache.avro.{LogicalTypes, Schema}
 import org.apache.avro.Conversions.DecimalConversion
@@ -25,7 +25,7 @@ import org.apache.avro.file.DataFileWriter
 import org.apache.avro.generic.{GenericData, GenericDatumWriter, GenericRecord}
 
 import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.{DataFrame, QueryTest, Row}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
@@ -348,6 +348,100 @@ abstract class AvroLogicalTypeSuite extends QueryTest with SharedSparkSession {
       assert(msg.contains("Unscaled value too large for precision"))
     }
   }
+
+  private def readResourceAvroFile(name: String): DataFrame = {
+    val url = Thread.currentThread().getContextClassLoader.getResource(name)
+    spark.read.format("avro").load(url.toString)
+  }
+
+  test("SPARK-31183: compatibility with Spark 2.4 in reading dates/timestamps") {
+    withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
+      checkAnswer(
+        readResourceAvroFile("before_1582_date_v2_4.avro"),
+        Row(java.sql.Date.valueOf("1001-01-01")))
+      checkAnswer(
+        readResourceAvroFile("before_1582_ts_micros_v2_4.avro"),
+        Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123456")))
+      checkAnswer(
+        readResourceAvroFile("before_1582_ts_millis_v2_4.avro"),
+        Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.124")))
+    }
+  }
+
+  test("SPARK-31183: rebasing microseconds timestamps in write") {
+    val tsStr = "1001-01-01 01:02:03.123456"
+    val nonRebased = "1001-01-07 01:09:05.123456"
+    withTempPath { dir =>
+      val path = dir.getAbsolutePath
+      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
+        Seq(tsStr).toDF("tsS")
+          .select($"tsS".cast("timestamp").as("ts"))
+          .write.format("avro")
+          .save(path)
+        checkAnswer(spark.read.format("avro").load(path), Row(Timestamp.valueOf(tsStr)))
+      }
+      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") {
+        checkAnswer(spark.read.format("avro").load(path), Row(Timestamp.valueOf(nonRebased)))
+      }
+    }
+  }
+
+  test("SPARK-31183: rebasing milliseconds timestamps in write") {
+    val tsStr = "1001-01-01 01:02:03.123456"
+    val rebased = "1001-01-01 01:02:03.123"
+    val nonRebased = "1001-01-07 01:09:05.123"
+    Seq(
+      """{"type": "long","logicalType": "timestamp-millis"}""",
+      """"long"""").foreach { tsType =>
+      val timestampSchema = s"""
+        |{
+        |  "namespace": "logical",
+        |  "type": "record",
+        |  "name": "test",
+        |  "fields": [
+        |    {"name": "ts", "type": $tsType}
+        |  ]
+        |}""".stripMargin
+      withTempPath { dir =>
+        val path = dir.getAbsolutePath
+        withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
+          Seq(tsStr).toDF("tsS")
+            .select($"tsS".cast("timestamp").as("ts"))
+            .write
+            .option("avroSchema", timestampSchema)
+            .format("avro")
+            .save(path)
+          checkAnswer(
+            spark.read.schema("ts timestamp").format("avro").load(path),
+            Row(Timestamp.valueOf(rebased)))
+        }
+        withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") {
+          checkAnswer(
+            spark.read.schema("ts timestamp").format("avro").load(path),
+            Row(Timestamp.valueOf(nonRebased)))
+        }
+      }
+    }
+  }
+
+  test("SPARK-31183: rebasing dates in write") {
+    withTempPath { dir =>
+      val path = dir.getAbsolutePath
+      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
+        Seq("1001-01-01").toDF("dateS")
+          .select($"dateS".cast("date").as("date"))
+          .write.format("avro")
+          .save(path)
+        checkAnswer(spark.read.format("avro").load(path), Row(Date.valueOf("1001-01-01")))
+      }
+      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") {
+        checkAnswer(spark.read.format("avro").load(path), Row(Date.valueOf("1001-01-07")))
+      }
+    }
+  }
 }
 
 class AvroV1LogicalTypeSuite extends AvroLogicalTypeSuite {
```

`SQLConf.scala`:

```diff
@@ -2501,6 +2501,20 @@
     .booleanConf
     .createWithDefault(false)
 
+  val LEGACY_AVRO_REBASE_DATETIME =
+    buildConf("spark.sql.legacy.avro.rebaseDateTime.enabled")
+      .internal()
+      .doc("When true, rebase dates/timestamps from Proleptic Gregorian calendar " +
+        "to the hybrid calendar (Julian + Gregorian) in write and " +
+        "from the hybrid calendar to Proleptic Gregorian calendar in read. " +
+        "The rebasing is performed by converting micros/millis/days to " +
+        "a local date/timestamp in the source calendar, interpreting the resulted date/" +
+        "timestamp in the target calendar, and getting the number of micros/millis/days " +
+        "since the epoch 1970-01-01 00:00:00Z.")
+      .version("3.0.0")
+      .booleanConf
+      .createWithDefault(false)
+
   /**
    * Holds information about keys that have been deprecated.
    *
@@ -3080,6 +3094,8 @@ class SQLConf extends Serializable with Logging {
   def parquetRebaseDateTimeEnabled: Boolean = getConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME)
 
+  def avroRebaseDateTimeEnabled: Boolean = getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME)
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */
```