[SPARK-33163][SQL][TESTS] Check the metadata key 'org.apache.spark.legacyDateTime' in Avro/Parquet files

### What changes were proposed in this pull request?
Added a couple tests to `AvroSuite` and to `ParquetIOSuite` to check that the metadata key 'org.apache.spark.legacyDateTime' is written correctly depending on the SQL configs:
- spark.sql.legacy.avro.datetimeRebaseModeInWrite
- spark.sql.legacy.parquet.datetimeRebaseModeInWrite

This is a follow up https://github.com/apache/spark/pull/28137.

### Why are the changes needed?
1. To improve test coverage
2. To make sure that the metadata key is actually saved to Avro/Parquet files

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
By running the added tests:
```
$ build/sbt "testOnly org.apache.spark.sql.execution.datasources.parquet.ParquetIOSuite"
$ build/sbt "avro/test:testOnly org.apache.spark.sql.avro.AvroV1Suite"
$ build/sbt "avro/test:testOnly org.apache.spark.sql.avro.AvroV2Suite"
```

Closes #30061 from MaxGekk/parquet-test-metakey.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
This commit is contained in:
Max Gekk 2020-10-16 10:28:15 +09:00 committed by HyukjinKwon
parent 9f5eff0ae1
commit 38c05af1d5
2 changed files with 73 additions and 18 deletions

View file

@ -1791,15 +1791,19 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession with NestedDa
}
}
private def checkMetaData(path: java.io.File, key: String, expectedValue: String): Unit = {
val avroFiles = path.listFiles()
.filter(f => f.isFile && !f.getName.startsWith(".") && !f.getName.startsWith("_"))
assert(avroFiles.length === 1)
val reader = DataFileReader.openReader(avroFiles(0), new GenericDatumReader[GenericRecord]())
val value = reader.asInstanceOf[DataFileReader[_]].getMetaString(key)
assert(value === expectedValue)
}
test("SPARK-31327: Write Spark version into Avro file metadata") {
withTempPath { path =>
spark.range(1).repartition(1).write.format("avro").save(path.getCanonicalPath)
val avroFiles = path.listFiles()
.filter(f => f.isFile && !f.getName.startsWith(".") && !f.getName.startsWith("_"))
assert(avroFiles.length === 1)
val reader = DataFileReader.openReader(avroFiles(0), new GenericDatumReader[GenericRecord]())
val version = reader.asInstanceOf[DataFileReader[_]].getMetaString(SPARK_VERSION_METADATA_KEY)
assert(version === SPARK_VERSION_SHORT)
checkMetaData(path, SPARK_VERSION_METADATA_KEY, SPARK_VERSION_SHORT)
}
}
@ -1812,6 +1816,30 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession with NestedDa
spark.read.format("avro").options(conf).load(path)
}
}
test("SPARK-33163: write the metadata key 'org.apache.spark.legacyDateTime'") {
def saveTs(dir: java.io.File): Unit = {
Seq(Timestamp.valueOf("2020-10-15 01:02:03")).toDF()
.repartition(1)
.write
.format("avro")
.save(dir.getAbsolutePath)
}
withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE.key -> LEGACY.toString) {
withTempPath { dir =>
saveTs(dir)
checkMetaData(dir, SPARK_LEGACY_DATETIME, "")
}
}
Seq(CORRECTED, EXCEPTION).foreach { mode =>
withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE.key -> mode.toString) {
withTempPath { dir =>
saveTs(dir)
checkMetaData(dir, SPARK_LEGACY_DATETIME, null)
}
}
}
}
}
class AvroV1Suite extends AvroSuite {

View file

@ -859,20 +859,24 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
}
}
private def getMetaData(dir: java.io.File): Map[String, String] = {
val file = SpecificParquetRecordReaderBase.listDirectory(dir).get(0)
val conf = new Configuration()
val hadoopInputFile = HadoopInputFile.fromPath(new Path(file), conf)
val parquetReadOptions = HadoopReadOptions.builder(conf).build()
val m = ParquetFileReader.open(hadoopInputFile, parquetReadOptions)
val metadata = try {
m.getFileMetaData.getKeyValueMetaData
} finally {
m.close()
}
metadata.asScala.toMap
}
test("Write Spark version into Parquet metadata") {
withTempPath { dir =>
val path = dir.getAbsolutePath
spark.range(1).repartition(1).write.parquet(path)
val file = SpecificParquetRecordReaderBase.listDirectory(dir).get(0)
val conf = new Configuration()
val hadoopInputFile = HadoopInputFile.fromPath(new Path(file), conf)
val parquetReadOptions = HadoopReadOptions.builder(conf).build()
val m = ParquetFileReader.open(hadoopInputFile, parquetReadOptions)
val metaData = m.getFileMetaData.getKeyValueMetaData
m.close()
assert(metaData.get(SPARK_VERSION_METADATA_KEY) === SPARK_VERSION_SHORT)
spark.range(1).repartition(1).write.parquet(dir.getAbsolutePath)
assert(getMetaData(dir)(SPARK_VERSION_METADATA_KEY) === SPARK_VERSION_SHORT)
}
}
@ -1109,6 +1113,29 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
}
}
}
test("SPARK-33163: write the metadata key 'org.apache.spark.legacyDateTime'") {
def saveTs(dir: java.io.File): Unit = {
Seq(Timestamp.valueOf("2020-10-15 01:02:03")).toDF()
.repartition(1)
.write
.parquet(dir.getAbsolutePath)
}
withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> LEGACY.toString) {
withTempPath { dir =>
saveTs(dir)
assert(getMetaData(dir)(SPARK_LEGACY_DATETIME) === "")
}
}
Seq(CORRECTED, EXCEPTION).foreach { mode =>
withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> mode.toString) {
withTempPath { dir =>
saveTs(dir)
assert(getMetaData(dir).get(SPARK_LEGACY_DATETIME).isEmpty)
}
}
}
}
}
class JobCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)