[SPARK-31238][SQL] Rebase dates to/from Julian calendar in write/read for ORC datasource

### What changes were proposed in this pull request?

This PR (SPARK-31238) aims at the following:
1. Modified the ORC vectorized reader, in particular `OrcColumnVector` for v1.2 and v2.3. After the changes, it uses `DateTimeUtils.rebaseJulianToGregorianDays()` added by https://github.com/apache/spark/pull/27915 . The method rebases days from the hybrid calendar (Julian + Gregorian) to the Proleptic Gregorian calendar: it builds a local date in the original calendar, extracts the date fields `year`, `month` and `day` from it, and builds another local date in the target calendar. After that, it calculates the number of days since the epoch `1970-01-01` for the resulting local date (see the sketch after this list).
2. Introduced rebasing of dates while saving ORC files. In particular, I modified `OrcShimUtils.getDateWritable` for v1.2 and v2.3 to return `DaysWritable` instead of Hive's `DateWritable`. The `DaysWritable` class was added by the PR https://github.com/apache/spark/pull/27890 (and fixed by https://github.com/apache/spark/pull/27962). I moved `DaysWritable` from `sql/hive` to `sql/core` to re-use it in the ORC datasource.
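For illustration, here is a minimal sketch of the rebasing idea described in item 1, in plain Scala. The function name `rebaseJulianToGregorianDaysSketch` is hypothetical; the real implementation is `DateTimeUtils.rebaseJulianToGregorianDays()` from #27915 and differs in details (pre-common-era years, caching, optimizations):
```scala
import java.time.LocalDate
import java.util.{Calendar, GregorianCalendar, TimeZone}

// Sketch only: interpret `days` since 1970-01-01 in the hybrid
// Julian + Gregorian calendar (java.util.GregorianCalendar's default),
// then re-build the same year/month/day in Proleptic Gregorian (java.time).
def rebaseJulianToGregorianDaysSketch(days: Int): Int = {
  val cal = new GregorianCalendar(TimeZone.getTimeZone("UTC"))
  cal.clear()
  cal.set(1970, Calendar.JANUARY, 1)
  cal.add(Calendar.DAY_OF_MONTH, days)  // local date in the hybrid calendar
  val rebased = LocalDate.of(           // same date fields in the target calendar
    cal.get(Calendar.YEAR),
    cal.get(Calendar.MONTH) + 1,
    cal.get(Calendar.DAY_OF_MONTH))
  rebased.toEpochDay.toInt              // days since 1970-01-01, Proleptic Gregorian
}
```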

### Why are the changes needed?
For backward compatibility with Spark 2.4 and earlier versions. The changes allow users to read dates/timestamps saved by previous versions and get the same results.

### Does this PR introduce any user-facing change?
Yes. Before the changes, loading the date `1200-01-01` saved by Spark 2.4.5 returns the following:
```scala
scala> spark.read.orc("/Users/maxim/tmp/before_1582/2_4_5_date_orc").show(false)
+----------+
|dt        |
+----------+
|1200-01-08|
+----------+
```
After the changes:
```scala
scala> spark.read.orc("/Users/maxim/tmp/before_1582/2_4_5_date_orc").show(false)
+----------+
|dt        |
+----------+
|1200-01-01|
+----------+
```

### How was this patch tested?
- By running `OrcSourceSuite` and `HiveOrcSourceSuite`.
- Added the new test `SPARK-31238: compatibility with Spark 2.4 in reading dates` to `OrcSuite`, which reads an ORC file saved by Spark 2.4.5 via the commands:
```shell
$ export TZ="America/Los_Angeles"
```
```scala
scala> sql("select cast('1200-01-01' as date) dt").write.mode("overwrite").orc("/Users/maxim/tmp/before_1582/2_4_5_date_orc")
scala> spark.read.orc("/Users/maxim/tmp/before_1582/2_4_5_date_orc").show(false)
+----------+
|dt        |
+----------+
|1200-01-01|
+----------+
```
- Added the round-trip test `SPARK-31238: rebasing dates in write`. The test `SPARK-31238: compatibility with Spark 2.4 in reading dates` confirms rebasing in read, so we can use it to check rebasing in write.

Closes #28016 from MaxGekk/rebase-date-orc.

Authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
10 changed files with 153 additions and 9 deletions

sql/{hive/src/main/scala/org/apache/spark/sql/hive → core/src/main/scala/org/apache/spark/sql/execution/datasources}/DaysWritable.scala
```diff
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
-package org.apache.spark.sql.hive
+package org.apache.spark.sql.execution.datasources
 
 import java.io.{DataInput, DataOutput, IOException}
 import java.sql.Date
@@ -35,11 +35,12 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils.{rebaseGregorianToJulianDays, rebaseJulianToGregorianDays}
  * @param julianDays The number of days since the epoch 1970-01-01 in
  *                   Julian calendar.
  */
-private[hive] class DaysWritable(
+class DaysWritable(
     var gregorianDays: Int,
     var julianDays: Int)
   extends DateWritable {
+
   def this() = this(0, 0)
   def this(gregorianDays: Int) =
     this(gregorianDays, rebaseGregorianToJulianDays(gregorianDays))
   def this(dateWritable: DateWritable) = {
@@ -55,6 +56,11 @@ private[hive] class DaysWritable(
   override def getDays: Int = julianDays
   override def get(): Date = new Date(DateWritable.daysToMillis(julianDays))
 
+  override def set(d: Int): Unit = {
+    gregorianDays = d
+    julianDays = rebaseGregorianToJulianDays(d)
+  }
+
   @throws[IOException]
   override def write(out: DataOutput): Unit = {
     WritableUtils.writeVInt(out, julianDays)
```

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
```diff
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.orc
 
 import java.io.File
 import java.nio.charset.StandardCharsets.UTF_8
-import java.sql.Timestamp
+import java.sql.{Date, Timestamp}
 import java.util.Locale
 
 import org.apache.hadoop.conf.Configuration
@@ -482,6 +482,32 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll {
       }
     }
   }
+
+  test("SPARK-31238: compatibility with Spark 2.4 in reading dates") {
+    Seq(false, true).foreach { vectorized =>
+      withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> vectorized.toString) {
+        checkAnswer(
+          readResourceOrcFile("test-data/before_1582_date_v2_4.snappy.orc"),
+          Row(java.sql.Date.valueOf("1200-01-01")))
+      }
+    }
+  }
+
+  test("SPARK-31238: rebasing dates in write") {
+    withTempPath { dir =>
+      val path = dir.getAbsolutePath
+      Seq("1001-01-01").toDF("dateS")
+        .select($"dateS".cast("date").as("date"))
+        .write
+        .orc(path)
+
+      Seq(false, true).foreach { vectorized =>
+        withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> vectorized.toString) {
+          checkAnswer(spark.read.orc(path), Row(Date.valueOf("1001-01-01")))
+        }
+      }
+    }
+  }
 }
 
 class OrcSourceSuite extends OrcSuite with SharedSparkSession {
```

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala
```diff
@@ -133,4 +133,9 @@ abstract class OrcTest extends QueryTest with FileBasedDataSourceTest with BeforeAndAfterAll {
       throw new AnalysisException("Can not match OrcTable in the query.")
     }
   }
+
+  protected def readResourceOrcFile(name: String): DataFrame = {
+    val url = Thread.currentThread().getContextClassLoader.getResource(name)
+    spark.read.orc(url.toString)
+  }
 }
```

sql/core/v1.2/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
```diff
@@ -23,6 +23,7 @@ import org.apache.orc.storage.ql.exec.vector.*;
 import org.apache.spark.sql.catalyst.util.DateTimeUtils;
 import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DateType;
 import org.apache.spark.sql.types.Decimal;
 import org.apache.spark.sql.types.TimestampType;
 import org.apache.spark.sql.vectorized.ColumnarArray;
@@ -42,6 +43,7 @@ public class OrcColumnVector extends org.apache.spark.sql.vectorized.ColumnVector {
   private DecimalColumnVector decimalData;
   private TimestampColumnVector timestampData;
   private final boolean isTimestamp;
+  private final boolean isDate;
 
   private int batchSize;
@@ -54,6 +56,12 @@ public class OrcColumnVector extends org.apache.spark.sql.vectorized.ColumnVector {
       isTimestamp = false;
     }
 
+    if (type instanceof DateType) {
+      isDate = true;
+    } else {
+      isDate = false;
+    }
+
     baseData = vector;
     if (vector instanceof LongColumnVector) {
       longData = (LongColumnVector) vector;
@@ -130,7 +138,12 @@ public class OrcColumnVector extends org.apache.spark.sql.vectorized.ColumnVector {
   @Override
   public int getInt(int rowId) {
-    return (int) longData.vector[getRowIndex(rowId)];
+    int value = (int) longData.vector[getRowIndex(rowId)];
+    if (isDate) {
+      return DateTimeUtils.rebaseJulianToGregorianDays(value);
+    } else {
+      return value;
+    }
   }
 
   @Override
```

sql/core/v1.2/src/main/scala/org/apache/spark/sql/execution/datasources/orc/DaysWritable.scala (new file)
```diff
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc
+
+import java.io.{DataInput, DataOutput, IOException}
+import java.sql.Date
+
+import org.apache.hadoop.io.WritableUtils
+import org.apache.orc.storage.serde2.io.DateWritable
+
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.{rebaseGregorianToJulianDays, rebaseJulianToGregorianDays}
+
+/**
+ * The class accepts/returns days in Gregorian calendar and rebase them
+ * via conversion to local date in Julian calendar for dates before 1582-10-15
+ * in read/write for backward compatibility with Spark 2.4 and earlier versions.
+ *
+ * This is a clone of `org.apache.spark.sql.execution.datasources.DaysWritable`.
+ * The class is cloned because Hive ORC v1.2 uses different `DateWritable`:
+ *  - v1.2: `org.apache.orc.storage.serde2.io.DateWritable`
+ *  - v2.3 and `HiveInspectors`: `org.apache.hadoop.hive.serde2.io.DateWritable`
+ *
+ * @param gregorianDays The number of days since the epoch 1970-01-01 in
+ *                      Gregorian calendar.
+ * @param julianDays The number of days since the epoch 1970-01-01 in
+ *                   Julian calendar.
+ */
+class DaysWritable(
+    var gregorianDays: Int,
+    var julianDays: Int)
+  extends DateWritable {
+
+  def this() = this(0, 0)
+  def this(gregorianDays: Int) =
+    this(gregorianDays, rebaseGregorianToJulianDays(gregorianDays))
+  def this(dateWritable: DateWritable) = {
+    this(
+      gregorianDays = dateWritable match {
+        case daysWritable: DaysWritable => daysWritable.gregorianDays
+        case dateWritable: DateWritable =>
+          rebaseJulianToGregorianDays(dateWritable.getDays)
+      },
+      julianDays = dateWritable.getDays)
+  }
+
+  override def getDays: Int = julianDays
+  override def get(): Date = new Date(DateWritable.daysToMillis(julianDays))
+
+  override def set(d: Int): Unit = {
+    gregorianDays = d
+    julianDays = rebaseGregorianToJulianDays(d)
+  }
+
+  @throws[IOException]
+  override def write(out: DataOutput): Unit = {
+    WritableUtils.writeVInt(out, julianDays)
+  }
+
+  @throws[IOException]
+  override def readFields(in: DataInput): Unit = {
+    julianDays = WritableUtils.readVInt(in)
+    gregorianDays = rebaseJulianToGregorianDays(julianDays)
+  }
+}
```
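A short usage sketch of this class (the day count is computed rather than hard-coded; nothing here beyond the constructor and fields shown in the diff above is assumed):
```scala
// Sketch: Spark hands over Proleptic Gregorian days; Hive/ORC sees Julian days.
val gregorianDays = java.time.LocalDate.of(1200, 1, 1).toEpochDay.toInt
val writable = new DaysWritable(gregorianDays)
// getDays is what ORC serializes: the rebased Julian-calendar day count,
// so Spark 2.4 (and Hive) read back the same local date 1200-01-01.
assert(writable.gregorianDays == gregorianDays)
val julianDays = writable.getDays
```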

sql/core/v1.2/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcShimUtils.scala
```diff
@@ -47,13 +47,13 @@ private[sql] object OrcShimUtils {
   def getDateWritable(reuseObj: Boolean): (SpecializedGetters, Int) => DateWritable = {
     if (reuseObj) {
-      val result = new DateWritable()
+      val result = new DaysWritable()
       (getter, ordinal) =>
         result.set(getter.getInt(ordinal))
         result
     } else {
       (getter: SpecializedGetters, ordinal: Int) =>
-        new DateWritable(getter.getInt(ordinal))
+        new DaysWritable(getter.getInt(ordinal))
     }
   }
```

sql/core/v2.3/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
```diff
@@ -23,6 +23,7 @@ import org.apache.hadoop.hive.ql.exec.vector.*;
 import org.apache.spark.sql.catalyst.util.DateTimeUtils;
 import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DateType;
 import org.apache.spark.sql.types.Decimal;
 import org.apache.spark.sql.types.TimestampType;
 import org.apache.spark.sql.vectorized.ColumnarArray;
@@ -42,6 +43,7 @@ public class OrcColumnVector extends org.apache.spark.sql.vectorized.ColumnVector {
   private DecimalColumnVector decimalData;
   private TimestampColumnVector timestampData;
   private final boolean isTimestamp;
+  private final boolean isDate;
 
   private int batchSize;
@@ -54,6 +56,12 @@ public class OrcColumnVector extends org.apache.spark.sql.vectorized.ColumnVector {
       isTimestamp = false;
     }
 
+    if (type instanceof DateType) {
+      isDate = true;
+    } else {
+      isDate = false;
+    }
+
     baseData = vector;
     if (vector instanceof LongColumnVector) {
       longData = (LongColumnVector) vector;
@@ -130,7 +138,12 @@ public class OrcColumnVector extends org.apache.spark.sql.vectorized.ColumnVector {
   @Override
   public int getInt(int rowId) {
-    return (int) longData.vector[getRowIndex(rowId)];
+    int value = (int) longData.vector[getRowIndex(rowId)];
+    if (isDate) {
+      return DateTimeUtils.rebaseJulianToGregorianDays(value);
+    } else {
+      return value;
+    }
   }
 
   @Override
```

sql/core/v2.3/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcShimUtils.scala
```diff
@@ -26,6 +26,7 @@ import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf.{Operator => OrcOperator}
 import org.apache.hadoop.hive.serde2.io.{DateWritable, HiveDecimalWritable}
 
 import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
+import org.apache.spark.sql.execution.datasources.DaysWritable
 import org.apache.spark.sql.types.Decimal
 
 /**
@@ -47,13 +48,13 @@ private[sql] object OrcShimUtils {
   def getDateWritable(reuseObj: Boolean): (SpecializedGetters, Int) => DateWritable = {
     if (reuseObj) {
-      val result = new DateWritable()
+      val result = new DaysWritable()
       (getter, ordinal) =>
         result.set(getter.getInt(ordinal))
         result
     } else {
       (getter: SpecializedGetters, ordinal: Int) =>
-        new DateWritable(getter.getInt(ordinal))
+        new DaysWritable(getter.getInt(ordinal))
     }
   }
```

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
```diff
@@ -32,6 +32,7 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.execution.datasources.DaysWritable
 import org.apache.spark.sql.types
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
```