From d66a4e82eceb89a274edeb22c2fb4384bed5078b Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Fri, 9 Nov 2018 22:42:48 -0800
Subject: [PATCH] [SPARK-25102][SQL] Write Spark version to ORC/Parquet file metadata

## What changes were proposed in this pull request?

Currently, Spark writes the Spark version number into Hive table properties with `spark.sql.create.version`.
```
parameters:{
  spark.sql.sources.schema.part.0={
    "type":"struct",
    "fields":[{"name":"a","type":"integer","nullable":true,"metadata":{}}]
  },
  transient_lastDdlTime=1541142761,
  spark.sql.sources.schema.numParts=1,
  spark.sql.create.version=2.4.0
}
```

This PR aims to write the Spark version into ORC/Parquet file metadata under the key `org.apache.spark.version`, because the `org.apache.` prefix is already used in Parquet metadata. The key differs from the Hive table property key `spark.sql.create.version`, but that property cannot be changed for backward-compatibility reasons.

After this PR, ORC and Parquet files generated by Spark will have the following metadata.

**ORC (`native` and `hive` implementations)**
```
$ orc-tools meta /tmp/o
File Version: 0.12 with ...
...
User Metadata:
  org.apache.spark.version=3.0.0
```

**PARQUET**
```
$ parquet-tools meta /tmp/p
...
creator:     parquet-mr version 1.10.0 (build 031a6654009e3b82020012a18434c582bd74c73a)
extra:       org.apache.spark.version = 3.0.0
extra:       org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields":[{"name":"id","type":"long","nullable":false,"metadata":{}}]}
```

## How was this patch tested?

Passes Jenkins with the newly added test cases.

This closes #22255.

Closes #22932 from dongjoon-hyun/SPARK-25102.

Authored-by: Dongjoon Hyun
Signed-off-by: gatorsmile
---
 .../main/scala/org/apache/spark/package.scala |  3 +++
 .../org/apache/spark/util/VersionUtils.scala  | 14 ++++++++++
 .../apache/spark/util/VersionUtilsSuite.scala | 25 ++++++++++++++++++
 .../datasources/orc/OrcOutputWriter.scala     | 15 ++++++++---
 .../execution/datasources/orc/OrcUtils.scala  | 14 +++++++---
 .../parquet/ParquetWriteSupport.scala         |  7 ++++-
 .../scala/org/apache/spark/sql/package.scala  |  9 +++++++
 .../columnar/InMemoryColumnarQuerySuite.scala |  4 +--
 .../datasources/HadoopFsRelationSuite.scala   |  2 +-
 .../datasources/orc/OrcSourceSuite.scala      | 20 +++++++++++++-
 .../datasources/parquet/ParquetIOSuite.scala  | 21 ++++++++++++++-
 .../spark/sql/hive/orc/OrcFileFormat.scala    | 26 ++++++++++++++++---
 12 files changed, 144 insertions(+), 16 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/package.scala b/core/src/main/scala/org/apache/spark/package.scala
index 8058a4d5db..5d0639e92c 100644
--- a/core/src/main/scala/org/apache/spark/package.scala
+++ b/core/src/main/scala/org/apache/spark/package.scala
@@ -19,6 +19,8 @@ package org.apache
 
 import java.util.Properties
 
+import org.apache.spark.util.VersionUtils
+
 /**
  * Core Spark functionality. [[org.apache.spark.SparkContext]] serves as the main entry point to
 * Spark, while [[org.apache.spark.rdd.RDD]] is the data type representing a distributed collection,
@@ -89,6 +91,7 @@ package object spark {
   }
 
   val SPARK_VERSION = SparkBuildInfo.spark_version
+  val SPARK_VERSION_SHORT = VersionUtils.shortVersion(SparkBuildInfo.spark_version)
   val SPARK_BRANCH = SparkBuildInfo.spark_branch
   val SPARK_REVISION = SparkBuildInfo.spark_revision
   val SPARK_BUILD_USER = SparkBuildInfo.spark_build_user
diff --git a/core/src/main/scala/org/apache/spark/util/VersionUtils.scala b/core/src/main/scala/org/apache/spark/util/VersionUtils.scala
index 828153b868..c0f8866dd5 100644
--- a/core/src/main/scala/org/apache/spark/util/VersionUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/VersionUtils.scala
@@ -23,6 +23,7 @@ package org.apache.spark.util
 private[spark] object VersionUtils {
 
   private val majorMinorRegex = """^(\d+)\.(\d+)(\..*)?$""".r
+  private val shortVersionRegex = """^(\d+\.\d+\.\d+)(.*)?$""".r
 
   /**
    * Given a Spark version string, return the major version number.
@@ -36,6 +37,19 @@ private[spark] object VersionUtils {
    */
  def minorVersion(sparkVersion: String): Int = majorMinorVersion(sparkVersion)._2
 
+  /**
+   * Given a Spark version string, return the short version string.
+   * E.g., for 3.0.0-SNAPSHOT, return '3.0.0'.
+   */
+  def shortVersion(sparkVersion: String): String = {
+    shortVersionRegex.findFirstMatchIn(sparkVersion) match {
+      case Some(m) => m.group(1)
+      case None =>
+        throw new IllegalArgumentException(s"Spark tried to parse '$sparkVersion' as a Spark" +
+          s" version string, but it could not find the major/minor/maintenance version numbers.")
+    }
+  }
+
   /**
    * Given a Spark version string, return the (major version number, minor version number).
    * E.g., for 2.0.1-SNAPSHOT, return (2, 0).
diff --git a/core/src/test/scala/org/apache/spark/util/VersionUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/VersionUtilsSuite.scala
index b36d6be231..56623ebea1 100644
--- a/core/src/test/scala/org/apache/spark/util/VersionUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/VersionUtilsSuite.scala
@@ -73,4 +73,29 @@ class VersionUtilsSuite extends SparkFunSuite {
       }
     }
   }
+
+  test("Return short version number") {
+    assert(shortVersion("3.0.0") === "3.0.0")
+    assert(shortVersion("3.0.0-SNAPSHOT") === "3.0.0")
+    withClue("shortVersion parsing should fail for missing maintenance version number") {
+      intercept[IllegalArgumentException] {
+        shortVersion("3.0")
+      }
+    }
+    withClue("shortVersion parsing should fail for invalid major version number") {
+      intercept[IllegalArgumentException] {
+        shortVersion("x.0.0")
+      }
+    }
+    withClue("shortVersion parsing should fail for invalid minor version number") {
+      intercept[IllegalArgumentException] {
+        shortVersion("3.x.0")
+      }
+    }
+    withClue("shortVersion parsing should fail for invalid maintenance version number") {
+      intercept[IllegalArgumentException] {
+        shortVersion("3.0.x")
+      }
+    }
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOutputWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOutputWriter.scala
index 84755bfa30..7e38fc651a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOutputWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOutputWriter.scala
@@ -20,8 +20,9 @@ package org.apache.spark.sql.execution.datasources.orc
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.NullWritable
 import org.apache.hadoop.mapreduce.TaskAttemptContext
-import org.apache.orc.mapred.OrcStruct
-import org.apache.orc.mapreduce.OrcOutputFormat
+import org.apache.orc.OrcFile
+import org.apache.orc.mapred.{OrcOutputFormat => OrcMapRedOutputFormat, OrcStruct}
+import org.apache.orc.mapreduce.{OrcMapreduceRecordWriter, OrcOutputFormat}
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.datasources.OutputWriter
@@ -36,11 +37,17 @@ private[orc] class OrcOutputWriter(
   private[this] val serializer = new OrcSerializer(dataSchema)
 
   private val recordWriter = {
-    new OrcOutputFormat[OrcStruct]() {
+    val orcOutputFormat = new OrcOutputFormat[OrcStruct]() {
       override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
         new Path(path)
       }
-    }.getRecordWriter(context)
+    }
+    val filename = orcOutputFormat.getDefaultWorkFile(context, ".orc")
+    val options = OrcMapRedOutputFormat.buildOptions(context.getConfiguration)
+    val writer = OrcFile.createWriter(filename, options)
+    val recordWriter = new OrcMapreduceRecordWriter[OrcStruct](writer)
+    OrcUtils.addSparkVersionMetadata(writer)
+    recordWriter
   }
 
   override def write(row: InternalRow): Unit = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
index 95fb25bf5a..57d2c56e87 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
@@ -17,18 +17,19 @@
 
 package org.apache.spark.sql.execution.datasources.orc
 
+import java.nio.charset.StandardCharsets.UTF_8
 import java.util.Locale
 
 import scala.collection.JavaConverters._
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.orc.{OrcFile, Reader, TypeDescription}
+import org.apache.orc.{OrcFile, Reader, TypeDescription, Writer}
 
-import org.apache.spark.SparkException
+import org.apache.spark.{SPARK_VERSION_SHORT, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{SPARK_VERSION_METADATA_KEY, SparkSession}
 import org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.types._
@@ -144,4 +145,11 @@ object OrcUtils extends Logging {
       }
     }
   }
+
+  /**
+   * Add metadata specifying the Spark version.
+   */
+  def addSparkVersionMetadata(writer: Writer): Unit = {
+    writer.addUserMetadata(SPARK_VERSION_METADATA_KEY, UTF_8.encode(SPARK_VERSION_SHORT))
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
index b40b8c2e61..8814e3c6cc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
@@ -29,7 +29,9 @@ import org.apache.parquet.hadoop.api.WriteSupport
 import org.apache.parquet.hadoop.api.WriteSupport.WriteContext
 import org.apache.parquet.io.api.{Binary, RecordConsumer}
 
+import org.apache.spark.SPARK_VERSION_SHORT
 import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SPARK_VERSION_METADATA_KEY
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
@@ -93,7 +95,10 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] wit
     this.rootFieldWriters = schema.map(_.dataType).map(makeWriter).toArray[ValueWriter]
 
     val messageType = new SparkToParquetSchemaConverter(configuration).convert(schema)
-    val metadata = Map(ParquetReadSupport.SPARK_METADATA_KEY -> schemaString).asJava
+    val metadata = Map(
+      SPARK_VERSION_METADATA_KEY -> SPARK_VERSION_SHORT,
+      ParquetReadSupport.SPARK_METADATA_KEY -> schemaString
+    ).asJava
 
     logInfo(
       s"""Initialized Parquet WriteSupport with Catalyst schema:
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/package.scala
index 161e0102f0..354660e9d5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/package.scala
@@ -44,4 +44,13 @@ package object sql {
   type Strategy = SparkStrategy
 
   type DataFrame = Dataset[Row]
+
+  /**
+   * Metadata key which is used to write the Spark version into the following:
+   * - Parquet file metadata
+   * - ORC file metadata
+   *
+   * Note that Hive table property `spark.sql.create.version` also has Spark version.
+   */
+  private[sql] val SPARK_VERSION_METADATA_KEY = "org.apache.spark.version"
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
index e1567d06e2..861aa179a4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
@@ -506,7 +506,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
       case plan: InMemoryRelation => plan
     }.head
     // InMemoryRelation's stats is file size before the underlying RDD is materialized
-    assert(inMemoryRelation.computeStats().sizeInBytes === 800)
+    assert(inMemoryRelation.computeStats().sizeInBytes === 868)
 
     // InMemoryRelation's stats is updated after materializing RDD
     dfFromFile.collect()
@@ -519,7 +519,7 @@
 
     // Even CBO enabled, InMemoryRelation's stats keeps as the file size before table's stats
     // is calculated
-    assert(inMemoryRelation2.computeStats().sizeInBytes === 800)
+    assert(inMemoryRelation2.computeStats().sizeInBytes === 868)
 
     // InMemoryRelation's stats should be updated after calculating stats of the table
     // clear cache to simulate a fresh environment
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala
index c1f2c18d14..6e08ee3c4b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala
@@ -45,7 +45,7 @@ class HadoopFsRelationSuite extends QueryTest with SharedSQLContext {
     import testImplicits._
     Seq(1.0, 0.5).foreach { compressionFactor =>
       withSQLConf("spark.sql.sources.fileCompressionFactor" -> compressionFactor.toString,
-        "spark.sql.autoBroadcastJoinThreshold" -> "400") {
+        "spark.sql.autoBroadcastJoinThreshold" -> "434") {
         withTempPath { workDir =>
           // the file size is 740 bytes
           val workDirPath = workDir.getAbsolutePath
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
index dc81c0585b..48910103e7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.datasources.orc
 
 import java.io.File
+import java.nio.charset.StandardCharsets.UTF_8
 import java.sql.Timestamp
 import java.util.Locale
 
@@ -30,7 +31,8 @@ import org.apache.orc.OrcProto.Stream.Kind
 import org.apache.orc.impl.RecordReaderImpl
 import org.scalatest.BeforeAndAfterAll
 
-import org.apache.spark.sql.Row
+import org.apache.spark.SPARK_VERSION_SHORT
+import org.apache.spark.sql.{Row, SPARK_VERSION_METADATA_KEY}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.util.Utils
@@ -314,6 +316,22 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll {
       checkAnswer(spark.read.orc(path.getCanonicalPath), Row(ts))
     }
   }
+
+  test("Write Spark version into ORC file metadata") {
+    withTempPath { path =>
+      spark.range(1).repartition(1).write.orc(path.getCanonicalPath)
+
+      val partFiles = path.listFiles()
+        .filter(f => f.isFile && !f.getName.startsWith(".") && !f.getName.startsWith("_"))
+      assert(partFiles.length === 1)
+
+      val orcFilePath = new Path(partFiles.head.getAbsolutePath)
+      val readerOptions = OrcFile.readerOptions(new Configuration())
+      val reader = OrcFile.createReader(orcFilePath, readerOptions)
+      val version = UTF_8.decode(reader.getMetadataValue(SPARK_VERSION_METADATA_KEY)).toString
+      assert(version === SPARK_VERSION_SHORT)
+    }
+  }
 }
 
 class OrcSourceSuite extends OrcSuite with SharedSQLContext {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 002c42f23b..6b05b9c0f7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -27,6 +27,7 @@ import scala.reflect.runtime.universe.TypeTag
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
+import org.apache.parquet.HadoopReadOptions
 import org.apache.parquet.column.{Encoding, ParquetProperties}
 import org.apache.parquet.example.data.{Group, GroupWriter}
 import org.apache.parquet.example.data.simple.SimpleGroup
@@ -34,10 +35,11 @@ import org.apache.parquet.hadoop._
 import org.apache.parquet.hadoop.api.WriteSupport
 import org.apache.parquet.hadoop.api.WriteSupport.WriteContext
 import org.apache.parquet.hadoop.metadata.CompressionCodecName
+import org.apache.parquet.hadoop.util.HadoopInputFile
 import org.apache.parquet.io.api.RecordConsumer
 import org.apache.parquet.schema.{MessageType, MessageTypeParser}
 
-import org.apache.spark.SparkException
+import org.apache.spark.{SPARK_VERSION_SHORT, SparkException}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{InternalRow, ScalaReflection}
 import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow}
@@ -799,6 +801,23 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
       checkAnswer(spark.read.parquet(file.getAbsolutePath), Seq(Row(Row(1, null, "foo"))))
     }
   }
+
+  test("Write Spark version into Parquet metadata") {
+    withTempPath { dir =>
+      val path = dir.getAbsolutePath
+      spark.range(1).repartition(1).write.parquet(path)
+      val file = SpecificParquetRecordReaderBase.listDirectory(dir).get(0)
+
+      val conf = new Configuration()
+      val hadoopInputFile = HadoopInputFile.fromPath(new Path(file), conf)
+      val parquetReadOptions = HadoopReadOptions.builder(conf).build()
+      val m = ParquetFileReader.open(hadoopInputFile, parquetReadOptions)
+      val metaData = m.getFileMetaData.getKeyValueMetaData
+      m.close()
+
+      assert(metaData.get(SPARK_VERSION_METADATA_KEY) === SPARK_VERSION_SHORT)
+    }
+  }
 }
 
 class JobCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
index 89e6ea8604..4e641e34c1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
@@ -18,9 +18,11 @@
 package org.apache.spark.sql.hive.orc
 
 import java.net.URI
+import java.nio.charset.StandardCharsets.UTF_8
 import java.util.Properties
 
 import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
@@ -31,10 +33,12 @@ import org.apache.hadoop.hive.serde2.typeinfo.{StructTypeInfo, TypeInfoUtils}
 import org.apache.hadoop.io.{NullWritable, Writable}
 import org.apache.hadoop.mapred.{JobConf, OutputFormat => MapRedOutputFormat, RecordWriter, Reporter}
 import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.orc.OrcConf.COMPRESS
 
-import org.apache.spark.TaskContext
+import org.apache.spark.{SPARK_VERSION_SHORT, TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SPARK_VERSION_METADATA_KEY
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
@@ -274,12 +278,14 @@ private[orc] class OrcOutputWriter(
 
   override def close(): Unit = {
     if (recordWriterInstantiated) {
+      // Hive 1.2.1 ORC initializes its private `writer` field at the first write.
+      OrcFileFormat.addSparkVersionMetadata(recordWriter)
       recordWriter.close(Reporter.NULL)
     }
   }
 }
 
-private[orc] object OrcFileFormat extends HiveInspectors {
+private[orc] object OrcFileFormat extends HiveInspectors with Logging {
   // This constant duplicates `OrcInputFormat.SARG_PUSHDOWN`, which is unfortunately not public.
   private[orc] val SARG_PUSHDOWN = "sarg.pushdown"
 
@@ -339,4 +345,18 @@ private[orc] object OrcFileFormat extends HiveInspectors {
     val (sortedIDs, sortedNames) = ids.zip(requestedSchema.fieldNames).sorted.unzip
     HiveShim.appendReadColumns(conf, sortedIDs, sortedNames)
   }
+
+  /**
+   * Add metadata specifying the Spark version.
+   */
+  def addSparkVersionMetadata(recordWriter: RecordWriter[NullWritable, Writable]): Unit = {
+    try {
+      val writerField = recordWriter.getClass.getDeclaredField("writer")
+      writerField.setAccessible(true)
+      val writer = writerField.get(recordWriter).asInstanceOf[Writer]
+      writer.addUserMetadata(SPARK_VERSION_METADATA_KEY, UTF_8.encode(SPARK_VERSION_SHORT))
+    } catch {
+      case NonFatal(e) => log.warn(e.toString, e)
+    }
+  }
 }
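
For reviewers who want to check the new ORC metadata outside the test suite, the same reader calls used in the `OrcSourceSuite` test above can be run standalone. This is a minimal sketch, not part of the patch; the file path is a placeholder for any ORC part file written by Spark with this change applied.

```scala
import java.nio.charset.StandardCharsets.UTF_8

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.orc.OrcFile

object ReadOrcSparkVersion {
  def main(args: Array[String]): Unit = {
    // Placeholder path; point this at a part file produced by e.g. spark.range(1).write.orc(...).
    val orcFilePath = new Path("/tmp/o/part-00000.orc")
    val reader = OrcFile.createReader(orcFilePath, OrcFile.readerOptions(new Configuration()))
    // SPARK_VERSION_METADATA_KEY resolves to "org.apache.spark.version".
    val key = "org.apache.spark.version"
    if (reader.hasMetadataValue(key)) {
      // The value is stored as UTF-8 bytes, so decode it back into a string.
      val version = UTF_8.decode(reader.getMetadataValue(key)).toString
      println(s"Written by Spark $version")
    } else {
      println("No Spark version metadata found")
    }
  }
}
```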
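The Parquet footer can be inspected the same way as in the `ParquetIOSuite` test, through the key-value metadata map in the file footer. Again a sketch with a placeholder path rather than part of the patch:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.HadoopReadOptions
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile

object ReadParquetSparkVersion {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // Placeholder path; point this at a part file produced by e.g. spark.range(1).write.parquet(...).
    val inputFile = HadoopInputFile.fromPath(new Path("/tmp/p/part-00000.parquet"), conf)
    val reader = ParquetFileReader.open(inputFile, HadoopReadOptions.builder(conf).build())
    try {
      // The footer's key-value metadata holds "org.apache.spark.version" next to the schema key.
      val keyValueMetaData = reader.getFileMetaData.getKeyValueMetaData
      println(s"Written by Spark ${keyValueMetaData.get("org.apache.spark.version")}")
    } finally {
      reader.close()
    }
  }
}
```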