[SPARK-33163][SQL][TESTS][FOLLOWUP] Fix the test for the parquet metadata key 'org.apache.spark.legacyDateTime'

### What changes were proposed in this pull request?
1. Test both date and timestamp column types
2. Write the timestamp as the `TIMESTAMP_MICROS` logical type
3. Change the timestamp value to `'1000-01-01 01:02:03'`, a pre-Gregorian value that requires rebasing on write, so that the `EXCEPTION` mode actually throws (see the sketch below).
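
A minimal sketch of the behavior these changes exercise, assuming a local `SparkSession` and a hypothetical scratch path; the string config keys are the ones behind `SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE` and `SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE` at the time of this PR:
```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").getOrCreate()

// Write timestamps as TIMESTAMP_MICROS so the datetime rebase config,
// not the separate INT96 one from SPARK-33160, governs the write.
spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")

// A pre-Gregorian value forces a rebase decision. In EXCEPTION mode the
// write fails (the upgrade error surfaces wrapped in a SparkException):
spark.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "EXCEPTION")
// spark.sql("SELECT timestamp '1000-01-01 01:02:03' AS dt")
//   .write.parquet("/tmp/spark-33163")  // throws

// In LEGACY mode the value is rebased to the hybrid calendar and the file
// footer is tagged with the key 'org.apache.spark.legacyDateTime':
spark.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "LEGACY")
spark.sql("SELECT timestamp '1000-01-01 01:02:03' AS dt")
  .write.mode("overwrite").parquet("/tmp/spark-33163")
```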

### Why are the changes needed?
To improve test coverage.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
By running the modified test suite:
```
$ build/sbt "testOnly org.apache.spark.sql.execution.datasources.parquet.ParquetIOSuite"
```
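To narrow the run to just this test, ScalaTest's `-z` substring filter should also work:
```
$ build/sbt "testOnly org.apache.spark.sql.execution.datasources.parquet.ParquetIOSuite -- -z SPARK-33163"
```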

Closes #31396 from MaxGekk/parquet-test-metakey-followup.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
```diff
@@ -50,6 +50,7 @@ import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy._
+import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
@@ -1145,26 +1146,34 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
   }
 
   test("SPARK-33163: write the metadata key 'org.apache.spark.legacyDateTime'") {
-    def saveTs(dir: java.io.File): Unit = {
-      Seq(Timestamp.valueOf("2020-10-15 01:02:03")).toDF()
-        .repartition(1)
-        .write
-        .parquet(dir.getAbsolutePath)
+    def checkMetadataKey(dir: java.io.File, exists: Boolean): Unit = {
+      Seq("timestamp '1000-01-01 01:02:03'", "date '1000-01-01'").foreach { dt =>
+        withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key ->
+          ParquetOutputTimestampType.TIMESTAMP_MICROS.toString) {
+          sql(s"SELECT $dt AS dt")
+            .repartition(1)
+            .write
+            .mode("overwrite")
+            .parquet(dir.getAbsolutePath)
+          val metaData = getMetaData(dir)
+          val expected = if (exists) Some("") else None
+          assert(metaData.get(SPARK_LEGACY_DATETIME) === expected)
+        }
+      }
     }
     withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> LEGACY.toString) {
       withTempPath { dir =>
-        saveTs(dir)
-        assert(getMetaData(dir)(SPARK_LEGACY_DATETIME) === "")
+        checkMetadataKey(dir, exists = true)
       }
     }
-    Seq(CORRECTED, EXCEPTION).foreach { mode =>
-      withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> mode.toString) {
-        withTempPath { dir =>
-          saveTs(dir)
-          assert(getMetaData(dir).get(SPARK_LEGACY_DATETIME).isEmpty)
-        }
+    withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> CORRECTED.toString) {
+      withTempPath { dir =>
+        checkMetadataKey(dir, exists = false)
       }
     }
+    withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> EXCEPTION.toString) {
+      withTempPath { dir => intercept[SparkException] { checkMetadataKey(dir, exists = false) } }
+    }
   }
 
   test("SPARK-33160: write the metadata key 'org.apache.spark.legacyINT96'") {
```