[SPARK-34377][SQL] Add new parquet datasource options to control datetime rebasing in read

### What changes were proposed in this pull request?
In the PR, I propose new options for the Parquet datasource:
1. `datetimeRebaseMode`
2. `int96RebaseMode`

Both options affect the loading of ancient dates and timestamps from parquet files. The `datetimeRebaseMode` option applies to values of the `DATE`, `TIMESTAMP_MICROS` and `TIMESTAMP_MILLIS` types; `int96RebaseMode` applies to `INT96` timestamps.

The options support the same values as the SQL configs `spark.sql.legacy.parquet.datetimeRebaseModeInRead` and `spark.sql.legacy.parquet.int96RebaseModeInRead`, namely (see the sketch after this list):
- `"LEGACY"`: Spark rebases dates/timestamps from the legacy hybrid calendar (Julian + Gregorian) to the Proleptic Gregorian calendar.
- `"CORRECTED"`: dates/timestamps are read as is from parquet files, without rebasing.
- `"EXCEPTION"`: Spark fails the read if it sees ancient dates/timestamps that are ambiguous between the two calendars.

### Why are the changes needed?
1. The new options allow loading parquet files from two or more sources with different rebasing modes in the same query. For instance:
```scala
val df1 = spark.read.option("datetimeRebaseMode", "legacy").parquet(folder1)
val df2 = spark.read.option("datetimeRebaseMode", "corrected").parquet(folder2)
df1.join(df2, ...)
```
Before the changes, this was impossible because the single SQL config `spark.sql.legacy.parquet.datetimeRebaseModeInRead` influences both reads.

2. Mixing the Dataset/DataFrame and RDD APIs becomes possible. Since SQL configs are not propagated through RDD operations, the following code fails on ancient timestamps (the new option fixes this, as sketched below):
```scala
spark.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInRead", "legacy")
spark.read.parquet(folder).distinct.rdd.collect()
```
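With the new option, the rebase mode is attached to the read itself rather than to the session, so it survives the switch to the RDD API. A sketch of the fixed version (reusing `folder` from the snippet above):
```scala
// The per-read option is resolved when the scan is built on the driver, so no
// session SQL config has to reach the executors through the RDD operation.
spark.read
  .option("datetimeRebaseMode", "legacy")
  .parquet(folder)
  .distinct
  .rdd
  .collect()
```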

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
By running the modified test suites:
```
$ build/sbt "sql/test:testOnly *ParquetRebaseDatetimeV1Suite"
$ build/sbt "sql/test:testOnly *ParquetRebaseDatetimeV2Suite"
```

Closes #31489 from MaxGekk/parquet-rebase-options.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
commit a85490659f (parent 70ef196d59), 2021-02-08 13:28:40 +00:00
9 changed files with 185 additions and 26 deletions

python/pyspark/sql/readwriter.py

@@ -439,6 +439,30 @@ class DataFrameReader(OptionUtils):
modifiedAfter (batch only) : an optional timestamp to only include files with
modification times occurring after the specified time. The provided timestamp
must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
datetimeRebaseMode : str, optional
the rebasing mode for the values of the ``DATE``, ``TIMESTAMP_MICROS``,
``TIMESTAMP_MILLIS`` logical types from the Julian to Proleptic Gregorian calendar.
* ``EXCEPTION``: Spark fails in reads of ancient dates/timestamps
that are ambiguous between the two calendars.
* ``CORRECTED``: loading of dates/timestamps without rebasing.
* ``LEGACY``: perform rebasing of ancient dates/timestamps from the Julian
to Proleptic Gregorian calendar.
If None is set, the value of the SQL config
``spark.sql.legacy.parquet.datetimeRebaseModeInRead`` is used by default.
int96RebaseMode : str, optional
the rebasing mode for ``INT96`` timestamps from the Julian to
Proleptic Gregorian calendar.
* ``EXCEPTION``: Spark fails in reads of ancient ``INT96`` timestamps
that are ambiguous between the two calendars.
* ``CORRECTED``: loading of ``INT96`` timestamps without rebasing.
* ``LEGACY``: perform rebasing of ancient ``INT96`` timestamps from the Julian
to Proleptic Gregorian calendar.
If None is set, the value of the SQL config
``spark.sql.legacy.parquet.int96RebaseModeInRead`` is used by default.
Examples
--------
@@ -451,9 +475,12 @@ class DataFrameReader(OptionUtils):
modifiedBefore = options.get('modifiedBefore', None)
modifiedAfter = options.get('modifiedAfter', None)
recursiveFileLookup = options.get('recursiveFileLookup', None)
datetimeRebaseMode = options.get('datetimeRebaseMode', None)
int96RebaseMode = options.get('int96RebaseMode', None)
self._set_opts(mergeSchema=mergeSchema, pathGlobFilter=pathGlobFilter,
recursiveFileLookup=recursiveFileLookup, modifiedBefore=modifiedBefore,
modifiedAfter=modifiedAfter)
modifiedAfter=modifiedAfter, datetimeRebaseMode=datetimeRebaseMode,
int96RebaseMode=int96RebaseMode)
return self._df(self._jreader.parquet(_to_seq(self._spark._sc, paths)))

python/pyspark/sql/streaming.py

@@ -667,7 +667,8 @@ class DataStreamReader(OptionUtils):
else:
raise TypeError("path can be only a single string")
def parquet(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLookup=None):
def parquet(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLookup=None,
datetimeRebaseMode=None, int96RebaseMode=None):
"""
Loads a Parquet file stream, returning the result as a :class:`DataFrame`.
@@ -688,6 +689,30 @@ class DataStreamReader(OptionUtils):
recursively scan a directory for files. Using this option
disables
`partition discovery <https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#partition-discovery>`_. # noqa
datetimeRebaseMode : str, optional
the rebasing mode for the values of the ``DATE``, ``TIMESTAMP_MICROS``,
``TIMESTAMP_MILLIS`` logical types from the Julian to Proleptic Gregorian calendar.
* ``EXCEPTION``: Spark fails in reads of ancient dates/timestamps
that are ambiguous between the two calendars.
* ``CORRECTED``: loading of dates/timestamps without rebasing.
* ``LEGACY``: perform rebasing of ancient dates/timestamps from the Julian
to Proleptic Gregorian calendar.
If None is set, the value of the SQL config
``spark.sql.legacy.parquet.datetimeRebaseModeInRead`` is used by default.
int96RebaseMode : str, optional
the rebasing mode for ``INT96`` timestamps from the Julian to
Proleptic Gregorian calendar.
* ``EXCEPTION``: Spark fails in reads of ancient ``INT96`` timestamps
that are ambiguous between the two calendars.
* ``CORRECTED``: loading of ``INT96`` timestamps without rebasing.
* ``LEGACY``: perform rebasing of ancient ``INT96`` timestamps from the Julian
to Proleptic Gregorian calendar.
If None is set, the value of the SQL config
``spark.sql.legacy.parquet.int96RebaseModeInRead`` is used by default.
Examples
--------
@@ -698,7 +723,8 @@ class DataStreamReader(OptionUtils):
True
"""
self._set_opts(mergeSchema=mergeSchema, pathGlobFilter=pathGlobFilter,
recursiveFileLookup=recursiveFileLookup)
recursiveFileLookup=recursiveFileLookup,
datetimeRebaseMode=datetimeRebaseMode, int96RebaseMode=int96RebaseMode)
if isinstance(path, str):
return self._df(self._jreader.parquet(path))
else:

sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala

@@ -825,6 +825,29 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* must be in the following form: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)</li>
* <li>`recursiveFileLookup`: recursively scan a directory for files. Using this option
* disables partition discovery</li>
* <li>`datetimeRebaseMode` (default is the value specified in the SQL config
* `spark.sql.legacy.parquet.datetimeRebaseModeInRead`): the rebasing mode for the values
* of the `DATE`, `TIMESTAMP_MICROS`, `TIMESTAMP_MILLIS` logical types from the Julian to
* Proleptic Gregorian calendar:
* <ul>
* <li>`EXCEPTION` : Spark fails in reads of ancient dates/timestamps that are ambiguous
* between the two calendars</li>
* <li>`CORRECTED` : loading of dates/timestamps without rebasing</li>
* <li>`LEGACY` : perform rebasing of ancient dates/timestamps from the Julian to Proleptic
* Gregorian calendar</li>
* </ul>
* </li>
* <li>`int96RebaseMode` (default is the value specified in the SQL config
* `spark.sql.legacy.parquet.int96RebaseModeInRead`): the rebasing mode for `INT96` timestamps
* from the Julian to Proleptic Gregorian calendar:
* <ul>
* <li>`EXCEPTION` : Spark fails in reads of ancient `INT96` timestamps that are ambiguous
* between the two calendars</li>
* <li>`CORRECTED` : loading of timestamps without rebasing</li>
* <li>`LEGACY` : perform rebasing of ancient `INT96` timestamps from the Julian to Proleptic
* Gregorian calendar</li>
* </ul>
* </li>
* </ul>
*
* @since 1.4.0
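A usage sketch of these two reader options from Scala (the warehouse paths are hypothetical):
```scala
// Each reader carries its own per-read rebase mode, independent of the
// session-wide SQL configs.
val dates = spark.read
  .option("datetimeRebaseMode", "CORRECTED")
  .parquet("/warehouse/dates")

// INT96 timestamps (e.g. written by Hive or older Spark versions) are
// controlled separately.
val timestamps = spark.read
  .option("int96RebaseMode", "LEGACY")
  .parquet("/warehouse/int96-timestamps")
```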

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala

@@ -251,6 +251,9 @@ class ParquetFileFormat
val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
val isCaseSensitive = sqlConf.caseSensitiveAnalysis
val parquetOptions = new ParquetOptions(options, sparkSession.sessionState.conf)
val datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead
val int96RebaseModeInRead = parquetOptions.int96RebaseModeInRead
(file: PartitionedFile) => {
assert(file.partitionValues.numFields == partitionSchema.size)
@@ -301,10 +304,10 @@ class ParquetFileFormat
val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
footerFileMetaData.getKeyValueMetaData.get,
SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ))
datetimeRebaseModeInRead)
val int96RebaseMode = DataSourceUtils.int96RebaseMode(
footerFileMetaData.getKeyValueMetaData.get,
SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ))
int96RebaseModeInRead)
val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
val hadoopAttemptContext =

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala

@@ -69,6 +69,19 @@ class ParquetOptions(
.get(MERGE_SCHEMA)
.map(_.toBoolean)
.getOrElse(sqlConf.isParquetSchemaMergingEnabled)
/**
* The rebasing mode for the DATE and TIMESTAMP_MICROS, TIMESTAMP_MILLIS values in reads.
*/
def datetimeRebaseModeInRead: String = parameters
.get(DATETIME_REBASE_MODE)
.getOrElse(sqlConf.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ))
/**
* The rebasing mode for INT96 timestamp values in reads.
*/
def int96RebaseModeInRead: String = parameters
.get(INT96_REBASE_MODE)
.getOrElse(sqlConf.getConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ))
}
@@ -89,4 +102,16 @@ object ParquetOptions {
def getParquetCompressionCodecName(name: String): String = {
shortParquetCompressionCodecNames(name).name()
}
// The option controls rebasing of the DATE and TIMESTAMP values between
// Julian and Proleptic Gregorian calendars. It impacts on the behaviour of the Parquet
// datasource similarly to the SQL config `spark.sql.legacy.parquet.datetimeRebaseModeInRead`,
// and can be set to the same values: `EXCEPTION`, `LEGACY` or `CORRECTED`.
val DATETIME_REBASE_MODE = "datetimeRebaseMode"
// The option controls rebasing of the INT96 timestamp values between Julian and Proleptic
// Gregorian calendars. It impacts on the behaviour of the Parquet datasource similarly to
// the SQL config `spark.sql.legacy.parquet.int96RebaseModeInRead`.
// The valid option values are: `EXCEPTION`, `LEGACY` or `CORRECTED`.
val INT96_REBASE_MODE = "int96RebaseMode"
}
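As the `getOrElse` fallbacks above show, a per-read option takes precedence over the corresponding SQL config; the config is consulted only when the option is absent. A minimal sketch of that precedence (the folder path is hypothetical):
```scala
// Session default: fail on ambiguous ancient values.
spark.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInRead", "EXCEPTION")

val folder = "/warehouse/ancient-dates"

// This particular read overrides the session default and rebases.
val rebased = spark.read.option("datetimeRebaseMode", "LEGACY").parquet(folder)

// A read without the option still falls back to the EXCEPTION session default.
val strict = spark.read.parquet(folder)
```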

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala

@@ -52,6 +52,7 @@ import org.apache.spark.util.SerializableConfiguration
* @param readDataSchema Required schema of Parquet files.
* @param partitionSchema Schema of partitions.
* @param filters Filters to be pushed down in the batch scan.
* @param parquetOptions The options of Parquet datasource that are set for the read.
*/
case class ParquetPartitionReaderFactory(
sqlConf: SQLConf,
@@ -59,7 +60,8 @@ case class ParquetPartitionReaderFactory(
dataSchema: StructType,
readDataSchema: StructType,
partitionSchema: StructType,
filters: Array[Filter]) extends FilePartitionReaderFactory with Logging {
filters: Array[Filter],
parquetOptions: ParquetOptions) extends FilePartitionReaderFactory with Logging {
private val isCaseSensitive = sqlConf.caseSensitiveAnalysis
private val resultSchema = StructType(partitionSchema.fields ++ readDataSchema.fields)
private val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled
@@ -74,6 +76,8 @@
private val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
private val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
private val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
private val datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead
private val int96RebaseModeInRead = parquetOptions.int96RebaseModeInRead
override def supportColumnarReads(partition: InputPartition): Boolean = {
sqlConf.parquetVectorizedReaderEnabled && sqlConf.wholeStageEnabled &&
@@ -174,10 +178,10 @@
}
val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
footerFileMetaData.getKeyValueMetaData.get,
SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ))
datetimeRebaseModeInRead)
val int96RebaseMode = DataSourceUtils.int96RebaseMode(
footerFileMetaData.getKeyValueMetaData.get,
SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ))
int96RebaseModeInRead)
val reader = buildReaderFunc(
split,
file.partitionValues,

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala

@@ -16,6 +16,8 @@
*/
package org.apache.spark.sql.execution.datasources.v2.parquet
import scala.collection.JavaConverters._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetInputFormat
@@ -24,7 +26,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.connector.read.PartitionReaderFactory
import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
import org.apache.spark.sql.execution.datasources.parquet.{ParquetReadSupport, ParquetWriteSupport}
import org.apache.spark.sql.execution.datasources.parquet.{ParquetOptions, ParquetReadSupport, ParquetWriteSupport}
import org.apache.spark.sql.execution.datasources.v2.FileScan
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.Filter
@@ -76,8 +78,15 @@ case class ParquetScan(
val broadcastedConf = sparkSession.sparkContext.broadcast(
new SerializableConfiguration(hadoopConf))
ParquetPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf,
dataSchema, readDataSchema, readPartitionSchema, pushedFilters)
val sqlConf = sparkSession.sessionState.conf
ParquetPartitionReaderFactory(
sqlConf,
broadcastedConf,
dataSchema,
readDataSchema,
readPartitionSchema,
pushedFilters,
new ParquetOptions(options.asCaseSensitiveMap.asScala.toMap, sqlConf))
}
override def equals(obj: Any): Boolean = obj match {

sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala

@@ -493,6 +493,29 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
* It does not change the behavior of partition discovery.</li>
* <li>`recursiveFileLookup`: recursively scan a directory for files. Using this option
* disables partition discovery</li>
* <li>`datetimeRebaseMode` (default is the value specified in the SQL config
* `spark.sql.legacy.parquet.datetimeRebaseModeInRead`): the rebasing mode for the values
* of the `DATE`, `TIMESTAMP_MICROS`, `TIMESTAMP_MILLIS` logical types from the Julian to
* Proleptic Gregorian calendar:
* <ul>
* <li>`EXCEPTION` : Spark fails in reads of ancient dates/timestamps that are ambiguous
* between the two calendars</li>
* <li>`CORRECTED` : loading of dates/timestamps without rebasing</li>
* <li>`LEGACY` : perform rebasing of ancient dates/timestamps from the Julian to Proleptic
* Gregorian calendar</li>
* </ul>
* </li>
* <li>`int96RebaseMode` (default is the value specified in the SQL config
* `spark.sql.legacy.parquet.int96RebaseModeInRead`): the rebasing mode for `INT96` timestamps
* from the Julian to Proleptic Gregorian calendar:
* <ul>
* <li>`EXCEPTION` : Spark fails in reads of ancient `INT96` timestamps that are ambiguous
* between the two calendars</li>
* <li>`CORRECTED` : loading of timestamps without rebasing</li>
* <li>`LEGACY` : perform rebasing of ancient `INT96` timestamps from the Julian to Proleptic
* Gregorian calendar</li>
* </ul>
* </li>
* </ul>
*
* @since 2.0.0
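A sketch of the same options on the streaming reader (the path and schema are hypothetical; file stream sources typically require an explicit schema):
```scala
import org.apache.spark.sql.types.{DateType, StructType}

val schema = new StructType().add("event_date", DateType)

val stream = spark.readStream
  .schema(schema)
  .option("datetimeRebaseMode", "CORRECTED")
  .option("int96RebaseMode", "CORRECTED")
  .parquet("/incoming/parquet")
```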

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala

@@ -24,8 +24,8 @@ import org.apache.spark.{SparkConf, SparkException, SparkUpgradeException}
import org.apache.spark.sql.{QueryTest, Row, SPARK_LEGACY_DATETIME, SPARK_LEGACY_INT96}
import org.apache.spark.sql.catalyst.util.DateTimeTestUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.{LegacyBehaviorPolicy, ParquetOutputTimestampType}
import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy.{CORRECTED, EXCEPTION, LEGACY}
import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType
import org.apache.spark.sql.test.SharedSparkSession
abstract class ParquetRebaseDatetimeSuite
@@ -97,6 +97,27 @@ abstract class ParquetRebaseDatetimeSuite
}
}
private def inReadConfToOptions(
conf: String,
mode: LegacyBehaviorPolicy.Value): Map[String, String] = conf match {
case SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ.key =>
Map(ParquetOptions.INT96_REBASE_MODE -> mode.toString)
case _ => Map(ParquetOptions.DATETIME_REBASE_MODE -> mode.toString)
}
private def runInMode(
conf: String,
modes: Seq[LegacyBehaviorPolicy.Value])(f: Map[String, String] => Unit): Unit = {
modes.foreach { mode =>
withSQLConf(conf -> mode.toString) { f(Map.empty) }
}
withSQLConf(conf -> EXCEPTION.toString) {
modes.foreach { mode =>
f(inReadConfToOptions(conf, mode))
}
}
}
test("SPARK-31159: compatibility with Spark 2.4 in reading dates/timestamps") {
val N = 8
// test reading the existing 2.4 files and new 3.0 files (with rebase on/off) together.
@@ -132,9 +153,9 @@ abstract class ParquetRebaseDatetimeSuite
}
// For Parquet files written by Spark 3.0, we know the writer info and don't need the
// config to guide the rebase behavior.
withSQLConf(inReadConf -> LEGACY.toString) {
runInMode(inReadConf, Seq(LEGACY)) { options =>
checkAnswer(
spark.read.format("parquet").load(path2_4, path3_0, path3_0_rebase),
spark.read.format("parquet").options(options).load(path2_4, path3_0, path3_0_rebase),
(0 until N).flatMap { i =>
val (dictS, plainS) = rowFunc(i)
Seq.tabulate(3) { _ =>
@@ -235,12 +256,10 @@ abstract class ParquetRebaseDatetimeSuite
withAllParquetReaders {
// The file metadata indicates if it needs rebase or not, so we can always get the
// correct result regardless of the "rebase mode" config.
Seq(LEGACY, CORRECTED, EXCEPTION).foreach { mode =>
withSQLConf(inReadConf -> mode.toString) {
checkAnswer(
spark.read.parquet(path),
Seq.tabulate(N)(_ => Row(Timestamp.valueOf(tsStr))))
}
runInMode(inReadConf, Seq(LEGACY, CORRECTED, EXCEPTION)) { options =>
checkAnswer(
spark.read.options(options).parquet(path),
Seq.tabulate(N)(_ => Row(Timestamp.valueOf(tsStr))))
}
// Force to not rebase to prove the written datetime values are rebased
@@ -275,12 +294,12 @@ abstract class ParquetRebaseDatetimeSuite
withAllParquetReaders {
// The file metadata indicates if it needs rebase or not, so we can always get the
// correct result regardless of the "rebase mode" config.
Seq(LEGACY, CORRECTED, EXCEPTION).foreach { mode =>
withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key -> mode.toString) {
checkAnswer(
spark.read.parquet(path),
Seq.tabulate(N)(_ => Row(Date.valueOf("1001-01-01"))))
}
runInMode(
SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key,
Seq(LEGACY, CORRECTED, EXCEPTION)) { options =>
checkAnswer(
spark.read.options(options).parquet(path),
Seq.tabulate(N)(_ => Row(Date.valueOf("1001-01-01"))))
}
// Force to not rebase to prove the written datetime values are rebased and we will get