From 6b1ca886c0066f4e10534336f3fce64cdebc79a5 Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Fri, 3 Apr 2020 12:43:33 +0000
Subject: [PATCH] [SPARK-31327][SQL] Write Spark version into Avro file metadata

### What changes were proposed in this pull request?

Write the Spark version into the Avro file metadata.

### Why are the changes needed?

The version info is very useful for backward compatibility. The same is already done for Parquet and ORC.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

A new test.

Closes #28102 from cloud-fan/avro.

Authored-by: Wenchen Fan
Signed-off-by: Wenchen Fan
---
 .../sql/avro/SparkAvroKeyOutputFormat.java    | 94 +++++++++++++++++++
 .../spark/sql/avro/AvroOutputWriter.scala     | 12 ++-
 .../org/apache/spark/sql/avro/AvroSuite.scala | 14 ++-
 .../scala/org/apache/spark/sql/package.scala  |  1 +
 4 files changed, 116 insertions(+), 5 deletions(-)
 create mode 100644 external/avro/src/main/java/org/apache/spark/sql/avro/SparkAvroKeyOutputFormat.java

diff --git a/external/avro/src/main/java/org/apache/spark/sql/avro/SparkAvroKeyOutputFormat.java b/external/avro/src/main/java/org/apache/spark/sql/avro/SparkAvroKeyOutputFormat.java
new file mode 100644
index 0000000000..55696a6ac2
--- /dev/null
+++ b/external/avro/src/main/java/org/apache/spark/sql/avro/SparkAvroKeyOutputFormat.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.avro;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapreduce.AvroKeyOutputFormat;
+import org.apache.avro.mapreduce.Syncable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+// A variant of `AvroKeyOutputFormat`, which is used to inject the custom `RecordWriterFactory` so
+// that we can set avro file metadata.
+public class SparkAvroKeyOutputFormat extends AvroKeyOutputFormat<GenericRecord> {
+  public SparkAvroKeyOutputFormat(Map<String, String> metadata) {
+    super(new SparkRecordWriterFactory(metadata));
+  }
+
+  static class SparkRecordWriterFactory extends RecordWriterFactory<GenericRecord> {
+    private final Map<String, String> metadata;
+    SparkRecordWriterFactory(Map<String, String> metadata) {
+      this.metadata = metadata;
+    }
+
+    protected RecordWriter<AvroKey<GenericRecord>, NullWritable> create(
+        Schema writerSchema,
+        GenericData dataModel,
+        CodecFactory compressionCodec,
+        OutputStream outputStream,
+        int syncInterval) throws IOException {
+      return new SparkAvroKeyRecordWriter<>(
+        writerSchema, dataModel, compressionCodec, outputStream, syncInterval, metadata);
+    }
+  }
+}
+
+// This is a fork of org.apache.avro.mapreduce.AvroKeyRecordWriter, in order to set file metadata.
+class SparkAvroKeyRecordWriter<T> extends RecordWriter<AvroKey<T>, NullWritable>
+    implements Syncable {
+
+  private final DataFileWriter<T> mAvroFileWriter;
+
+  SparkAvroKeyRecordWriter(
+      Schema writerSchema,
+      GenericData dataModel,
+      CodecFactory compressionCodec,
+      OutputStream outputStream,
+      int syncInterval,
+      Map<String, String> metadata) throws IOException {
+    this.mAvroFileWriter = new DataFileWriter<>(dataModel.createDatumWriter(writerSchema));
+    for (Map.Entry<String, String> entry : metadata.entrySet()) {
+      this.mAvroFileWriter.setMeta(entry.getKey(), entry.getValue());
+    }
+    this.mAvroFileWriter.setCodec(compressionCodec);
+    this.mAvroFileWriter.setSyncInterval(syncInterval);
+    this.mAvroFileWriter.create(writerSchema, outputStream);
+  }
+
+  public void write(AvroKey<T> record, NullWritable ignore) throws IOException {
+    this.mAvroFileWriter.append(record.datum());
+  }
+
+  public void close(TaskAttemptContext context) throws IOException {
+    this.mAvroFileWriter.close();
+  }
+
+  public long sync() throws IOException {
+    return this.mAvroFileWriter.sync();
+  }
+}
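The fork above ultimately amounts to calling `DataFileWriter.setMeta` for each metadata entry before `create`. As a point of reference, here is a minimal standalone Scala sketch of the same idea against plain Avro APIs; it is not part of the patch, and the object name, output path, record schema, and version value are all illustrative:

import java.io.File

import org.apache.avro.{Schema, SchemaBuilder}
import org.apache.avro.file.DataFileWriter
import org.apache.avro.generic.{GenericDatumWriter, GenericRecord, GenericRecordBuilder}

object WriteAvroMetaSketch {
  def main(args: Array[String]): Unit = {
    val schema: Schema = SchemaBuilder.record("r").fields().requiredLong("id").endRecord()
    val writer = new DataFileWriter[GenericRecord](new GenericDatumWriter[GenericRecord](schema))
    // Metadata must be set before create(); DataFileWriter rejects setMeta afterwards.
    writer.setMeta("org.apache.spark.version", "3.1.0") // illustrative key/value
    writer.create(schema, new File("/tmp/with-meta.avro")) // illustrative path
    writer.append(new GenericRecordBuilder(schema).set("id", 1L).build())
    writer.close()
  }
}

Avro stores these key/value pairs in the container file header, ahead of the data blocks, so a reader can inspect them without scanning any records.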
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala
index 06507115f5..2cfa3a4826 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala
@@ -19,14 +19,17 @@ package org.apache.spark.sql.avro
 
 import java.io.{IOException, OutputStream}
 
+import scala.collection.JavaConverters._
+
 import org.apache.avro.Schema
 import org.apache.avro.generic.GenericRecord
 import org.apache.avro.mapred.AvroKey
-import org.apache.avro.mapreduce.AvroKeyOutputFormat
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.NullWritable
 import org.apache.hadoop.mapreduce.{RecordWriter, TaskAttemptContext}
 
+import org.apache.spark.SPARK_VERSION_SHORT
+import org.apache.spark.sql.SPARK_VERSION_METADATA_KEY
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.datasources.OutputWriter
 import org.apache.spark.sql.types._
@@ -45,8 +48,9 @@ private[avro] class AvroOutputWriter(
   * Overrides the couple of methods responsible for generating the output streams / files so
   * that the data can be correctly partitioned
   */
-  private val recordWriter: RecordWriter[AvroKey[GenericRecord], NullWritable] =
-    new AvroKeyOutputFormat[GenericRecord]() {
+  private val recordWriter: RecordWriter[AvroKey[GenericRecord], NullWritable] = {
+    val sparkVersion = Map(SPARK_VERSION_METADATA_KEY -> SPARK_VERSION_SHORT).asJava
+    new SparkAvroKeyOutputFormat(sparkVersion) {
 
     override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
       new Path(path)
@@ -57,8 +61,8 @@ private[avro] class AvroOutputWriter(
       val path = getDefaultWorkFile(context, ".avro")
       path.getFileSystem(context.getConfiguration).create(path)
     }
-
-  }.getRecordWriter(context)
+    }.getRecordWriter(context)
+  }
 
   override def write(row: InternalRow): Unit = {
     val key = new AvroKey(serializer.serialize(row).asInstanceOf[GenericRecord])
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index 9336d8e225..a5224fd104 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -33,7 +33,7 @@ import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWri
 import org.apache.avro.generic.GenericData.{EnumSymbol, Fixed}
 import org.apache.commons.io.FileUtils
 
-import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.{SPARK_VERSION_SHORT, SparkConf, SparkException}
 import org.apache.spark.sql._
 import org.apache.spark.sql.TestingUDT.IntervalData
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
@@ -1620,6 +1620,18 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession {
       }
     }
   }
+
+  test("SPARK-31327: Write Spark version into Avro file metadata") {
+    withTempPath { path =>
+      spark.range(1).repartition(1).write.format("avro").save(path.getCanonicalPath)
+      val avroFiles = path.listFiles()
+        .filter(f => f.isFile && !f.getName.startsWith(".") && !f.getName.startsWith("_"))
+      assert(avroFiles.length === 1)
+      val reader = DataFileReader.openReader(avroFiles(0), new GenericDatumReader[GenericRecord]())
+      val version = reader.asInstanceOf[DataFileReader[_]].getMetaString(SPARK_VERSION_METADATA_KEY)
+      assert(version === SPARK_VERSION_SHORT)
+    }
+  }
 }
 
 class AvroV1Suite extends AvroSuite {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/package.scala
index 61875931d2..58de6758f4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/package.scala
@@ -49,6 +49,7 @@ package object sql {
    * Metadata key which is used to write Spark version in the followings:
    * - Parquet file metadata
    * - ORC file metadata
+   * - Avro file metadata
    *
    * Note that Hive table property `spark.sql.create.version` also has Spark version.
    */
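For reference, the metadata written by this patch can be read back without Spark, exactly as the new test does. Below is a minimal sketch of a downstream backward-compatibility check; the object name and file path are hypothetical, and `getMetaString` returns null when the key is absent (e.g. for files written by Spark versions that predate this change, or not written by Spark at all):

import java.io.File

import org.apache.avro.file.DataFileReader
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}

object ReadSparkVersionSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical path: any .avro file produced by a Spark write.
    val file = new File("/tmp/out/part-00000.avro")
    val reader = DataFileReader.openReader(file, new GenericDatumReader[GenericRecord]())
    // SPARK_VERSION_METADATA_KEY resolves to "org.apache.spark.version".
    val version = reader.asInstanceOf[DataFileReader[_]].getMetaString("org.apache.spark.version")
    if (version == null) {
      println("no Spark version recorded; fall back to legacy handling")
    } else {
      println(s"written by Spark $version")
    }
    reader.close()
  }
}

The cast to `DataFileReader[_]` mirrors the test above: `DataFileReader.openReader` is typed to return the `FileReader` interface, which does not expose `getMetaString`.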