diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index 1005a274d0..b0f2f8ed09 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -1791,15 +1791,19 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession with NestedDa
     }
   }
 
+  private def checkMetaData(path: java.io.File, key: String, expectedValue: String): Unit = {
+    val avroFiles = path.listFiles()
+      .filter(f => f.isFile && !f.getName.startsWith(".") && !f.getName.startsWith("_"))
+    assert(avroFiles.length === 1)
+    val reader = DataFileReader.openReader(avroFiles(0), new GenericDatumReader[GenericRecord]())
+    val value = reader.asInstanceOf[DataFileReader[_]].getMetaString(key)
+    assert(value === expectedValue)
+  }
+
   test("SPARK-31327: Write Spark version into Avro file metadata") {
     withTempPath { path =>
       spark.range(1).repartition(1).write.format("avro").save(path.getCanonicalPath)
-      val avroFiles = path.listFiles()
-        .filter(f => f.isFile && !f.getName.startsWith(".") && !f.getName.startsWith("_"))
-      assert(avroFiles.length === 1)
-      val reader = DataFileReader.openReader(avroFiles(0), new GenericDatumReader[GenericRecord]())
-      val version = reader.asInstanceOf[DataFileReader[_]].getMetaString(SPARK_VERSION_METADATA_KEY)
-      assert(version === SPARK_VERSION_SHORT)
+      checkMetaData(path, SPARK_VERSION_METADATA_KEY, SPARK_VERSION_SHORT)
     }
   }
 
@@ -1812,6 +1816,30 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession with NestedDa
       spark.read.format("avro").options(conf).load(path)
     }
   }
+
+  test("SPARK-33163: write the metadata key 'org.apache.spark.legacyDateTime'") {
+    def saveTs(dir: java.io.File): Unit = {
+      Seq(Timestamp.valueOf("2020-10-15 01:02:03")).toDF()
+        .repartition(1)
+        .write
+        .format("avro")
+        .save(dir.getAbsolutePath)
+    }
+    withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE.key -> LEGACY.toString) {
+      withTempPath { dir =>
+        saveTs(dir)
+        checkMetaData(dir, SPARK_LEGACY_DATETIME, "")
+      }
+    }
+    Seq(CORRECTED, EXCEPTION).foreach { mode =>
+      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE.key -> mode.toString) {
+        withTempPath { dir =>
+          saveTs(dir)
+          checkMetaData(dir, SPARK_LEGACY_DATETIME, null)
+        }
+      }
+    }
+  }
 }
 
 class AvroV1Suite extends AvroSuite {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 2dc8a062bb..ff406f7bc6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -859,20 +859,24 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
     }
   }
 
+  private def getMetaData(dir: java.io.File): Map[String, String] = {
+    val file = SpecificParquetRecordReaderBase.listDirectory(dir).get(0)
+    val conf = new Configuration()
+    val hadoopInputFile = HadoopInputFile.fromPath(new Path(file), conf)
+    val parquetReadOptions = HadoopReadOptions.builder(conf).build()
+    val m = ParquetFileReader.open(hadoopInputFile, parquetReadOptions)
+    val metadata = try {
+      m.getFileMetaData.getKeyValueMetaData
+    } finally {
+      m.close()
+    }
+    metadata.asScala.toMap
+  }
+
   test("Write Spark version into Parquet metadata") {
     withTempPath { dir =>
-      val path = dir.getAbsolutePath
-      spark.range(1).repartition(1).write.parquet(path)
-      val file = SpecificParquetRecordReaderBase.listDirectory(dir).get(0)
-
-      val conf = new Configuration()
-      val hadoopInputFile = HadoopInputFile.fromPath(new Path(file), conf)
-      val parquetReadOptions = HadoopReadOptions.builder(conf).build()
-      val m = ParquetFileReader.open(hadoopInputFile, parquetReadOptions)
-      val metaData = m.getFileMetaData.getKeyValueMetaData
-      m.close()
-
-      assert(metaData.get(SPARK_VERSION_METADATA_KEY) === SPARK_VERSION_SHORT)
+      spark.range(1).repartition(1).write.parquet(dir.getAbsolutePath)
+      assert(getMetaData(dir)(SPARK_VERSION_METADATA_KEY) === SPARK_VERSION_SHORT)
     }
   }
 
@@ -1109,6 +1113,29 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
       }
     }
   }
+
+  test("SPARK-33163: write the metadata key 'org.apache.spark.legacyDateTime'") {
+    def saveTs(dir: java.io.File): Unit = {
+      Seq(Timestamp.valueOf("2020-10-15 01:02:03")).toDF()
+        .repartition(1)
+        .write
+        .parquet(dir.getAbsolutePath)
+    }
+    withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> LEGACY.toString) {
+      withTempPath { dir =>
+        saveTs(dir)
+        assert(getMetaData(dir)(SPARK_LEGACY_DATETIME) === "")
+      }
+    }
+    Seq(CORRECTED, EXCEPTION).foreach { mode =>
+      withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> mode.toString) {
+        withTempPath { dir =>
+          saveTs(dir)
+          assert(getMetaData(dir).get(SPARK_LEGACY_DATETIME).isEmpty)
+        }
+      }
+    }
+  }
 }
 
 class JobCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)