[SPARK-36034][SQL] Rebase datetime in pushed down filters to parquet

### What changes were proposed in this pull request?
In the PR, I propose to propagate the SQL config `spark.sql.parquet.datetimeRebaseModeInRead` and/or the Parquet option `datetimeRebaseMode` to `ParquetFilters`. The `ParquetFilters` class uses these settings when converting date/timestamp instances from data source filters into values pushed via `FilterApi` to the `parquet-column` lib.
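
For reference, a minimal sketch (not part of this patch) of the two ways to set the read-side rebase mode; the path is a placeholder and the option is passed like any other Parquet read option:
```scala
// Session-wide SQL config:
spark.conf.set("spark.sql.parquet.datetimeRebaseModeInRead", "LEGACY")

// Per-read Parquet option, scoped to this read only (placeholder path):
val df = spark.read
  .option("datetimeRebaseMode", "LEGACY")
  .parquet("/path/to/legacy_parquet")
```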

Before the changes, date/timestamp values expressed as days/microseconds/milliseconds are interpreted as offsets in the Proleptic Gregorian calendar and pushed to the Parquet library as is. That works fine if the date/timestamp values in the Parquet files were saved in the `CORRECTED` mode, but in the `LEGACY` mode the filter values might not match the actual values.

After the changes, date/timestamp values in filters pushed down to the Parquet library, such as `FilterApi.eq(col1, -719162)`, are rebased according to the rebase settings. For example, if the rebase mode is `CORRECTED`, **-719162** is pushed down as is, but if the rebase mode is `LEGACY`, the number of days is rebased to **-719164**. For more context, the description of PR https://github.com/apache/spark/pull/28067 shows the diffs between the two calendars.
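
As an illustration (a sketch using Spark's internal `RebaseDateTime` helper, not code from this patch), the two day offsets for `0001-01-01` can be reproduced like this:
```scala
import java.time.LocalDate
import org.apache.spark.sql.catalyst.util.RebaseDateTime.rebaseGregorianToJulianDays

// 0001-01-01 as a day offset from the epoch in the Proleptic Gregorian calendar,
// i.e. the value Spark 3.x pushes down in CORRECTED mode.
val gregorianDays = LocalDate.of(1, 1, 1).toEpochDay.toInt  // -719162

// The same local date in the legacy hybrid (Julian) calendar falls two physical days
// earlier, so the filter value must be rebased for files written in LEGACY mode.
val julianDays = rebaseGregorianToJulianDays(gregorianDays)  // -719164
```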

### Why are the changes needed?
The changes fix the bug demonstrated by the following example from SPARK-36034:
```python
>>> spark.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "LEGACY")
>>> spark.sql("SELECT DATE '0001-01-01' AS date").write.mode("overwrite").parquet("date_written_by_spark3_legacy")
>>> spark.read.parquet("date_written_by_spark3_legacy").where("date = '0001-01-01'").show()
+----+
|date|
+----+
+----+
```
The expected result is a row with the date value `0001-01-01`.

### Does this PR introduce _any_ user-facing change?
In some sense, yes. Query results can differ in some cases. For the example above, the query now returns the expected row:
```scala
scala> spark.conf.set("spark.sql.parquet.datetimeRebaseModeInWrite", "LEGACY")
scala> spark.sql("SELECT DATE '0001-01-01' AS date").write.mode("overwrite").parquet("date_written_by_spark3_legacy")
scala> spark.read.parquet("date_written_by_spark3_legacy").where("date = '0001-01-01'").show(false)
+----------+
|date      |
+----------+
|0001-01-01|
+----------+
```

### How was this patch tested?
By running the modified test suite `ParquetFilterSuite`:
```
$ build/sbt "test:testOnly *ParquetV1FilterSuite"
$ build/sbt "test:testOnly *ParquetV2FilterSuite"
```

Closes #33347 from MaxGekk/fix-parquet-ts-filter-pushdown.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
(cherry picked from commit b09b7f7cc0)
Signed-off-by: Max Gekk <max.gekk@gmail.com>
Max Gekk, 2021-07-15 22:21:57 +03:00
commit 57849a54da (parent ebc830f14e)
5 changed files with 138 additions and 89 deletions

ParquetFileFormat.scala

@@ -266,11 +266,21 @@ class ParquetFileFormat
lazy val footerFileMetaData =
ParquetFooterReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData
val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
footerFileMetaData.getKeyValueMetaData.get,
datetimeRebaseModeInRead)
// Try to push down filters when filter push-down is enabled.
val pushed = if (enableParquetFilterPushDown) {
val parquetSchema = footerFileMetaData.getSchema
val parquetFilters = new ParquetFilters(parquetSchema, pushDownDate, pushDownTimestamp,
pushDownDecimal, pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive)
val parquetFilters = new ParquetFilters(
parquetSchema,
pushDownDate,
pushDownTimestamp,
pushDownDecimal,
pushDownStringStartWith,
pushDownInFilterThreshold,
isCaseSensitive,
datetimeRebaseMode)
filters
// Collects all converted Parquet filter predicates. Notice that not all predicates can be
// converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
@@ -296,9 +306,6 @@ class ParquetFileFormat
None
}
val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
footerFileMetaData.getKeyValueMetaData.get,
datetimeRebaseModeInRead)
val int96RebaseMode = DataSourceUtils.int96RebaseMode(
footerFileMetaData.getKeyValueMetaData.get,
int96RebaseModeInRead)

ParquetFilters.scala

@@ -35,6 +35,8 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
import org.apache.spark.sql.catalyst.util.RebaseDateTime.{rebaseGregorianToJulianDays, rebaseGregorianToJulianMicros}
import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
import org.apache.spark.sql.sources
import org.apache.spark.unsafe.types.UTF8String
@@ -48,7 +50,8 @@ class ParquetFilters(
pushDownDecimal: Boolean,
pushDownStartWith: Boolean,
pushDownInFilterThreshold: Int,
caseSensitive: Boolean) {
caseSensitive: Boolean,
datetimeRebaseMode: LegacyBehaviorPolicy.Value) {
// A map which contains parquet field name and data type, if predicate push down applies.
//
// Each key in `nameToParquetField` represents a column; `dots` are used as separators for
@@ -129,14 +132,26 @@ class ParquetFilters(
private val ParquetTimestampMillisType =
ParquetSchemaType(LogicalTypeAnnotation.timestampType(true, TimeUnit.MILLIS), INT64, 0)
private def dateToDays(date: Any): Int = date match {
case d: Date => DateTimeUtils.fromJavaDate(d)
case ld: LocalDate => DateTimeUtils.localDateToDays(ld)
private def dateToDays(date: Any): Int = {
val gregorianDays = date match {
case d: Date => DateTimeUtils.fromJavaDate(d)
case ld: LocalDate => DateTimeUtils.localDateToDays(ld)
}
datetimeRebaseMode match {
case LegacyBehaviorPolicy.LEGACY => rebaseGregorianToJulianDays(gregorianDays)
case _ => gregorianDays
}
}
private def timestampToMicros(v: Any): JLong = v match {
case i: Instant => DateTimeUtils.instantToMicros(i)
case t: Timestamp => DateTimeUtils.fromJavaTimestamp(t)
private def timestampToMicros(v: Any): JLong = {
val gregorianMicros = v match {
case i: Instant => DateTimeUtils.instantToMicros(i)
case t: Timestamp => DateTimeUtils.fromJavaTimestamp(t)
}
datetimeRebaseMode match {
case LegacyBehaviorPolicy.LEGACY => rebaseGregorianToJulianMicros(gregorianMicros)
case _ => gregorianMicros
}
}
private def decimalToInt32(decimal: JBigDecimal): Integer = decimal.unscaledValue().intValue()

ParquetPartitionReaderFactory.scala

@@ -133,11 +133,21 @@ case class ParquetPartitionReaderFactory(
lazy val footerFileMetaData =
ParquetFooterReader.readFooter(conf, filePath, SKIP_ROW_GROUPS).getFileMetaData
val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
footerFileMetaData.getKeyValueMetaData.get,
datetimeRebaseModeInRead)
// Try to push down filters when filter push-down is enabled.
val pushed = if (enableParquetFilterPushDown) {
val parquetSchema = footerFileMetaData.getSchema
val parquetFilters = new ParquetFilters(parquetSchema, pushDownDate, pushDownTimestamp,
pushDownDecimal, pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive)
val parquetFilters = new ParquetFilters(
parquetSchema,
pushDownDate,
pushDownTimestamp,
pushDownDecimal,
pushDownStringStartWith,
pushDownInFilterThreshold,
isCaseSensitive,
datetimeRebaseMode)
filters
// Collects all converted Parquet filter predicates. Notice that not all predicates can be
// converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
@@ -170,9 +180,6 @@ case class ParquetPartitionReaderFactory(
if (pushed.isDefined) {
ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
}
val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
footerFileMetaData.getKeyValueMetaData.get,
datetimeRebaseModeInRead)
val int96RebaseMode = DataSourceUtils.int96RebaseMode(
footerFileMetaData.getKeyValueMetaData.get,
int96RebaseModeInRead)

ParquetScanBuilder.scala

@@ -24,6 +24,7 @@ import org.apache.spark.sql.connector.read.{Scan, SupportsPushDownFilters}
import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFilters, SparkToParquetSchemaConverter}
import org.apache.spark.sql.execution.datasources.v2.FileScanBuilder
import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -51,8 +52,17 @@ case class ParquetScanBuilder(
val isCaseSensitive = sqlConf.caseSensitiveAnalysis
val parquetSchema =
new SparkToParquetSchemaConverter(sparkSession.sessionState.conf).convert(readDataSchema())
val parquetFilters = new ParquetFilters(parquetSchema, pushDownDate, pushDownTimestamp,
pushDownDecimal, pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive)
val parquetFilters = new ParquetFilters(
parquetSchema,
pushDownDate,
pushDownTimestamp,
pushDownDecimal,
pushDownStringStartWith,
pushDownInFilterThreshold,
isCaseSensitive,
// The rebase mode doesn't matter here because the filters are used to determine
// whether they are convertible.
LegacyBehaviorPolicy.CORRECTED)
parquetFilters.convertibleFilters(this.filters).toArray
}

ParquetFilterSuite.scala

@@ -45,7 +45,9 @@ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType
import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy.{CORRECTED, LEGACY}
import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType.{INT96, TIMESTAMP_MICROS, TIMESTAMP_MILLIS}
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
import org.apache.spark.tags.ExtendedSQLTest
@@ -73,11 +75,14 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
protected def createParquetFilters(
schema: MessageType,
caseSensitive: Option[Boolean] = None): ParquetFilters =
caseSensitive: Option[Boolean] = None,
datetimeRebaseMode: LegacyBehaviorPolicy.Value = LegacyBehaviorPolicy.CORRECTED
): ParquetFilters =
new ParquetFilters(schema, conf.parquetFilterPushDownDate, conf.parquetFilterPushDownTimestamp,
conf.parquetFilterPushDownDecimal, conf.parquetFilterPushDownStringStartWith,
conf.parquetFilterPushDownInFilterThreshold,
caseSensitive.getOrElse(conf.caseSensitiveAnalysis))
caseSensitive.getOrElse(conf.caseSensitiveAnalysis),
datetimeRebaseMode)
override def beforeEach(): Unit = {
super.beforeEach()
@@ -587,71 +592,75 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
def date: Date = Date.valueOf(s)
}
val data = Seq("2018-03-18", "2018-03-19", "2018-03-20", "2018-03-21")
val data = Seq("1000-01-01", "2018-03-19", "2018-03-20", "2018-03-21")
import testImplicits._
Seq(false, true).foreach { java8Api =>
withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString) {
val dates = data.map(i => Tuple1(Date.valueOf(i))).toDF()
withNestedParquetDataFrame(dates) { case (inputDF, colName, fun) =>
implicit val df: DataFrame = inputDF
Seq(CORRECTED, LEGACY).foreach { rebaseMode =>
withSQLConf(
SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString,
SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key -> rebaseMode.toString) {
val dates = data.map(i => Tuple1(Date.valueOf(i))).toDF()
withNestedParquetDataFrame(dates) { case (inputDF, colName, fun) =>
implicit val df: DataFrame = inputDF
def resultFun(dateStr: String): Any = {
val parsed = if (java8Api) LocalDate.parse(dateStr) else Date.valueOf(dateStr)
fun(parsed)
}
def resultFun(dateStr: String): Any = {
val parsed = if (java8Api) LocalDate.parse(dateStr) else Date.valueOf(dateStr)
fun(parsed)
}
val dateAttr: Expression = df(colName).expr
assert(df(colName).expr.dataType === DateType)
val dateAttr: Expression = df(colName).expr
assert(df(colName).expr.dataType === DateType)
checkFilterPredicate(dateAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
checkFilterPredicate(dateAttr.isNotNull, classOf[NotEq[_]],
data.map(i => Row.apply(resultFun(i))))
checkFilterPredicate(dateAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
checkFilterPredicate(dateAttr.isNotNull, classOf[NotEq[_]],
data.map(i => Row.apply(resultFun(i))))
checkFilterPredicate(dateAttr === "2018-03-18".date, classOf[Eq[_]],
resultFun("2018-03-18"))
checkFilterPredicate(dateAttr <=> "2018-03-18".date, classOf[Eq[_]],
resultFun("2018-03-18"))
checkFilterPredicate(dateAttr =!= "2018-03-18".date, classOf[NotEq[_]],
Seq("2018-03-19", "2018-03-20", "2018-03-21").map(i => Row.apply(resultFun(i))))
checkFilterPredicate(dateAttr === "1000-01-01".date, classOf[Eq[_]],
resultFun("1000-01-01"))
checkFilterPredicate(dateAttr <=> "1000-01-01".date, classOf[Eq[_]],
resultFun("1000-01-01"))
checkFilterPredicate(dateAttr =!= "1000-01-01".date, classOf[NotEq[_]],
Seq("2018-03-19", "2018-03-20", "2018-03-21").map(i => Row.apply(resultFun(i))))
checkFilterPredicate(dateAttr < "2018-03-19".date, classOf[Lt[_]],
resultFun("2018-03-18"))
checkFilterPredicate(dateAttr > "2018-03-20".date, classOf[Gt[_]],
resultFun("2018-03-21"))
checkFilterPredicate(dateAttr <= "2018-03-18".date, classOf[LtEq[_]],
resultFun("2018-03-18"))
checkFilterPredicate(dateAttr >= "2018-03-21".date, classOf[GtEq[_]],
resultFun("2018-03-21"))
checkFilterPredicate(dateAttr < "2018-03-19".date, classOf[Lt[_]],
resultFun("1000-01-01"))
checkFilterPredicate(dateAttr > "2018-03-20".date, classOf[Gt[_]],
resultFun("2018-03-21"))
checkFilterPredicate(dateAttr <= "1000-01-01".date, classOf[LtEq[_]],
resultFun("1000-01-01"))
checkFilterPredicate(dateAttr >= "2018-03-21".date, classOf[GtEq[_]],
resultFun("2018-03-21"))
checkFilterPredicate(Literal("2018-03-18".date) === dateAttr, classOf[Eq[_]],
resultFun("2018-03-18"))
checkFilterPredicate(Literal("2018-03-18".date) <=> dateAttr, classOf[Eq[_]],
resultFun("2018-03-18"))
checkFilterPredicate(Literal("2018-03-19".date) > dateAttr, classOf[Lt[_]],
resultFun("2018-03-18"))
checkFilterPredicate(Literal("2018-03-20".date) < dateAttr, classOf[Gt[_]],
resultFun("2018-03-21"))
checkFilterPredicate(Literal("2018-03-18".date) >= dateAttr, classOf[LtEq[_]],
resultFun("2018-03-18"))
checkFilterPredicate(Literal("2018-03-21".date) <= dateAttr, classOf[GtEq[_]],
resultFun("2018-03-21"))
checkFilterPredicate(Literal("1000-01-01".date) === dateAttr, classOf[Eq[_]],
resultFun("1000-01-01"))
checkFilterPredicate(Literal("1000-01-01".date) <=> dateAttr, classOf[Eq[_]],
resultFun("1000-01-01"))
checkFilterPredicate(Literal("2018-03-19".date) > dateAttr, classOf[Lt[_]],
resultFun("1000-01-01"))
checkFilterPredicate(Literal("2018-03-20".date) < dateAttr, classOf[Gt[_]],
resultFun("2018-03-21"))
checkFilterPredicate(Literal("1000-01-01".date) >= dateAttr, classOf[LtEq[_]],
resultFun("1000-01-01"))
checkFilterPredicate(Literal("2018-03-21".date) <= dateAttr, classOf[GtEq[_]],
resultFun("2018-03-21"))
checkFilterPredicate(!(dateAttr < "2018-03-21".date), classOf[GtEq[_]],
resultFun("2018-03-21"))
checkFilterPredicate(
dateAttr < "2018-03-19".date || dateAttr > "2018-03-20".date,
classOf[Operators.Or],
Seq(Row(resultFun("2018-03-18")), Row(resultFun("2018-03-21"))))
checkFilterPredicate(!(dateAttr < "2018-03-21".date), classOf[GtEq[_]],
resultFun("2018-03-21"))
checkFilterPredicate(
dateAttr < "2018-03-19".date || dateAttr > "2018-03-20".date,
classOf[Operators.Or],
Seq(Row(resultFun("1000-01-01")), Row(resultFun("2018-03-21"))))
Seq(3, 20).foreach { threshold =>
withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_INFILTERTHRESHOLD.key -> s"$threshold") {
checkFilterPredicate(
In(dateAttr, Array("2018-03-19".date, "2018-03-20".date, "2018-03-21".date,
"2018-03-22".date).map(Literal.apply)),
if (threshold == 3) classOf[Operators.And] else classOf[Operators.Or],
Seq(Row(resultFun("2018-03-19")), Row(resultFun("2018-03-20")),
Row(resultFun("2018-03-21"))))
Seq(3, 20).foreach { threshold =>
withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_INFILTERTHRESHOLD.key -> s"$threshold") {
checkFilterPredicate(
In(dateAttr, Array("2018-03-19".date, "2018-03-20".date, "2018-03-21".date,
"2018-03-22".date).map(Literal.apply)),
if (threshold == 3) classOf[Operators.And] else classOf[Operators.Or],
Seq(Row(resultFun("2018-03-19")), Row(resultFun("2018-03-20")),
Row(resultFun("2018-03-21"))))
}
}
}
}
@@ -661,35 +670,36 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
test("filter pushdown - timestamp") {
Seq(true, false).foreach { java8Api =>
withSQLConf(
SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString,
SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key -> "CORRECTED",
SQLConf.PARQUET_INT96_REBASE_MODE_IN_WRITE.key -> "CORRECTED") {
// spark.sql.parquet.outputTimestampType = TIMESTAMP_MILLIS
Seq(CORRECTED, LEGACY).foreach { rebaseMode =>
val millisData = Seq(
"1000-06-14 08:28:53.123",
"1582-06-15 08:28:53.001",
"1900-06-16 08:28:53.0",
"2018-06-17 08:28:53.999")
withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key ->
ParquetOutputTimestampType.TIMESTAMP_MILLIS.toString) {
withSQLConf(
SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString,
SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key -> rebaseMode.toString,
SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> TIMESTAMP_MILLIS.toString) {
testTimestampPushdown(millisData, java8Api)
}
// spark.sql.parquet.outputTimestampType = TIMESTAMP_MICROS
val microsData = Seq(
"1000-06-14 08:28:53.123456",
"1582-06-15 08:28:53.123456",
"1900-06-16 08:28:53.123456",
"2018-06-17 08:28:53.123456")
withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key ->
ParquetOutputTimestampType.TIMESTAMP_MICROS.toString) {
withSQLConf(
SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString,
SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key -> rebaseMode.toString,
SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> TIMESTAMP_MICROS.toString) {
testTimestampPushdown(microsData, java8Api)
}
// spark.sql.parquet.outputTimestampType = INT96 doesn't support pushdown
withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key ->
ParquetOutputTimestampType.INT96.toString) {
// INT96 doesn't support pushdown
withSQLConf(
SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString,
SQLConf.PARQUET_INT96_REBASE_MODE_IN_WRITE.key -> rebaseMode.toString,
SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> INT96.toString) {
import testImplicits._
withTempPath { file =>
millisData.map(i => Tuple1(Timestamp.valueOf(i))).toDF