[SPARK-25102][SQL] Write Spark version to ORC/Parquet file metadata

## What changes were proposed in this pull request?

Currently, Spark writes its version number into Hive table properties as `spark.sql.create.version`.
```
parameters:{
  spark.sql.sources.schema.part.0={
    "type":"struct",
    "fields":[{"name":"a","type":"integer","nullable":true,"metadata":{}}]
  },
  transient_lastDdlTime=1541142761,
  spark.sql.sources.schema.numParts=1,
  spark.sql.create.version=2.4.0
}
```
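
As a quick illustration (a minimal sketch, assuming a Spark session `spark` and a hypothetical table `t` created by Spark), that property can be inspected with SQL:

```scala
// Show the table properties recorded by Spark, including
// spark.sql.create.version. The table name `t` is a placeholder.
spark.sql("SHOW TBLPROPERTIES t").show(truncate = false)
```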

This PR aims to write the Spark version into ORC/Parquet file metadata under the key `org.apache.spark.sql.create.version`, since we already use the `org.apache.` prefix in Parquet metadata. This key differs from the Hive table property key `spark.sql.create.version`, but it seems we cannot change the Hive table property key for backward-compatibility reasons.

After this PR, ORC and Parquet files generated by Spark will contain the following metadata.

**ORC (`native` and `hive` implementations)**
```
$ orc-tools meta /tmp/o
File Version: 0.12 with ...
...
User Metadata:
  org.apache.spark.sql.create.version=3.0.0
```
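
The same value can be read back programmatically with the ORC reader API, mirroring the test added in this PR (a minimal sketch; the part-file path is a placeholder):

```scala
import java.nio.charset.StandardCharsets.UTF_8

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.orc.OrcFile

// Open an ORC part file written by Spark and decode the user metadata entry.
val reader = OrcFile.createReader(
  new Path("/tmp/o/part-00000.orc"),  // placeholder path
  OrcFile.readerOptions(new Configuration()))
// Key as shown in the orc-tools output above.
val version = UTF_8.decode(reader.getMetadataValue("org.apache.spark.sql.create.version")).toString
println(version)
```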

**PARQUET**
```
$ parquet-tools meta /tmp/p
...
creator:     parquet-mr version 1.10.0 (build 031a6654009e3b82020012a18434c582bd74c73a)
extra:       org.apache.spark.sql.create.version = 3.0.0
extra:       org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields":[{"name":"id","type":"long","nullable":false,"metadata":{}}]}
```
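
Likewise for Parquet, the extra key-value metadata can be read back with parquet-mr, much like the test added in this PR (a minimal sketch; the part-file path is a placeholder):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.HadoopReadOptions
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile

// Open a Parquet part file written by Spark and look up the extra metadata entry.
val conf = new Configuration()
val inputFile = HadoopInputFile.fromPath(new Path("/tmp/p/part-00000.parquet"), conf)  // placeholder path
val reader = ParquetFileReader.open(inputFile, HadoopReadOptions.builder(conf).build())
try {
  // Key as shown in the parquet-tools output above.
  println(reader.getFileMetaData.getKeyValueMetaData.get("org.apache.spark.sql.create.version"))
} finally {
  reader.close()
}
```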

## How was this patch tested?

Pass the Jenkins tests, including the newly added test cases.

This closes #22255.

Closes #22932 from dongjoon-hyun/SPARK-25102.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>

Commit d66a4e82ec (parent 8e5f3c6ba6): 12 changed files with 144 additions and 16 deletions.

```diff
@@ -19,6 +19,8 @@ package org.apache
 
 import java.util.Properties
 
+import org.apache.spark.util.VersionUtils
+
 /**
  * Core Spark functionality. [[org.apache.spark.SparkContext]] serves as the main entry point to
  * Spark, while [[org.apache.spark.rdd.RDD]] is the data type representing a distributed collection,
@@ -89,6 +91,7 @@ package object spark {
   }
 
   val SPARK_VERSION = SparkBuildInfo.spark_version
+  val SPARK_VERSION_SHORT = VersionUtils.shortVersion(SparkBuildInfo.spark_version)
   val SPARK_BRANCH = SparkBuildInfo.spark_branch
   val SPARK_REVISION = SparkBuildInfo.spark_revision
   val SPARK_BUILD_USER = SparkBuildInfo.spark_build_user
```

```diff
@@ -23,6 +23,7 @@ package org.apache.spark.util
 private[spark] object VersionUtils {
 
   private val majorMinorRegex = """^(\d+)\.(\d+)(\..*)?$""".r
+  private val shortVersionRegex = """^(\d+\.\d+\.\d+)(.*)?$""".r
 
   /**
    * Given a Spark version string, return the major version number.
@@ -36,6 +37,19 @@ private[spark] object VersionUtils {
    */
   def minorVersion(sparkVersion: String): Int = majorMinorVersion(sparkVersion)._2
 
+  /**
+   * Given a Spark version string, return the short version string.
+   * E.g., for 3.0.0-SNAPSHOT, return '3.0.0'.
+   */
+  def shortVersion(sparkVersion: String): String = {
+    shortVersionRegex.findFirstMatchIn(sparkVersion) match {
+      case Some(m) => m.group(1)
+      case None =>
+        throw new IllegalArgumentException(s"Spark tried to parse '$sparkVersion' as a Spark" +
+          s" version string, but it could not find the major/minor/maintenance version numbers.")
+    }
+  }
+
   /**
    * Given a Spark version string, return the (major version number, minor version number).
    * E.g., for 2.0.1-SNAPSHOT, return (2, 0).
```

```diff
@@ -73,4 +73,29 @@ class VersionUtilsSuite extends SparkFunSuite {
       }
     }
   }
+
+  test("Return short version number") {
+    assert(shortVersion("3.0.0") === "3.0.0")
+    assert(shortVersion("3.0.0-SNAPSHOT") === "3.0.0")
+    withClue("shortVersion parsing should fail for missing maintenance version number") {
+      intercept[IllegalArgumentException] {
+        shortVersion("3.0")
+      }
+    }
+    withClue("shortVersion parsing should fail for invalid major version number") {
+      intercept[IllegalArgumentException] {
+        shortVersion("x.0.0")
+      }
+    }
+    withClue("shortVersion parsing should fail for invalid minor version number") {
+      intercept[IllegalArgumentException] {
+        shortVersion("3.x.0")
+      }
+    }
+    withClue("shortVersion parsing should fail for invalid maintenance version number") {
+      intercept[IllegalArgumentException] {
+        shortVersion("3.0.x")
+      }
+    }
+  }
 }
```

```diff
@@ -20,8 +20,9 @@ package org.apache.spark.sql.execution.datasources.orc
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.NullWritable
 import org.apache.hadoop.mapreduce.TaskAttemptContext
-import org.apache.orc.mapred.OrcStruct
-import org.apache.orc.mapreduce.OrcOutputFormat
+import org.apache.orc.OrcFile
+import org.apache.orc.mapred.{OrcOutputFormat => OrcMapRedOutputFormat, OrcStruct}
+import org.apache.orc.mapreduce.{OrcMapreduceRecordWriter, OrcOutputFormat}
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.datasources.OutputWriter
@@ -36,11 +37,17 @@ private[orc] class OrcOutputWriter(
   private[this] val serializer = new OrcSerializer(dataSchema)
 
   private val recordWriter = {
-    new OrcOutputFormat[OrcStruct]() {
+    val orcOutputFormat = new OrcOutputFormat[OrcStruct]() {
       override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
         new Path(path)
       }
-    }.getRecordWriter(context)
+    }
+    val filename = orcOutputFormat.getDefaultWorkFile(context, ".orc")
+    val options = OrcMapRedOutputFormat.buildOptions(context.getConfiguration)
+    val writer = OrcFile.createWriter(filename, options)
+    val recordWriter = new OrcMapreduceRecordWriter[OrcStruct](writer)
+    OrcUtils.addSparkVersionMetadata(writer)
+    recordWriter
   }
 
   override def write(row: InternalRow): Unit = {
```

```diff
@@ -17,18 +17,19 @@
 
 package org.apache.spark.sql.execution.datasources.orc
 
+import java.nio.charset.StandardCharsets.UTF_8
 import java.util.Locale
 
 import scala.collection.JavaConverters._
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.orc.{OrcFile, Reader, TypeDescription}
+import org.apache.orc.{OrcFile, Reader, TypeDescription, Writer}
 
-import org.apache.spark.SparkException
+import org.apache.spark.{SPARK_VERSION_SHORT, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{SPARK_VERSION_METADATA_KEY, SparkSession}
 import org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.types._
@@ -144,4 +145,11 @@ object OrcUtils extends Logging {
       }
     }
   }
+
+  /**
+   * Add a metadata specifying Spark version.
+   */
+  def addSparkVersionMetadata(writer: Writer): Unit = {
+    writer.addUserMetadata(SPARK_VERSION_METADATA_KEY, UTF_8.encode(SPARK_VERSION_SHORT))
+  }
 }
```

```diff
@@ -29,7 +29,9 @@ import org.apache.parquet.hadoop.api.WriteSupport
 import org.apache.parquet.hadoop.api.WriteSupport.WriteContext
 import org.apache.parquet.io.api.{Binary, RecordConsumer}
 
+import org.apache.spark.SPARK_VERSION_SHORT
 import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SPARK_VERSION_METADATA_KEY
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
@@ -93,7 +95,10 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] wit
     this.rootFieldWriters = schema.map(_.dataType).map(makeWriter).toArray[ValueWriter]
 
     val messageType = new SparkToParquetSchemaConverter(configuration).convert(schema)
-    val metadata = Map(ParquetReadSupport.SPARK_METADATA_KEY -> schemaString).asJava
+    val metadata = Map(
+      SPARK_VERSION_METADATA_KEY -> SPARK_VERSION_SHORT,
+      ParquetReadSupport.SPARK_METADATA_KEY -> schemaString
+    ).asJava
 
     logInfo(
       s"""Initialized Parquet WriteSupport with Catalyst schema:
```

```diff
@@ -44,4 +44,13 @@ package object sql {
   type Strategy = SparkStrategy
 
   type DataFrame = Dataset[Row]
+
+  /**
+   * Metadata key which is used to write Spark version in the followings:
+   * - Parquet file metadata
+   * - ORC file metadata
+   *
+   * Note that Hive table property `spark.sql.create.version` also has Spark version.
+   */
+  private[sql] val SPARK_VERSION_METADATA_KEY = "org.apache.spark.version"
 }
```

```diff
@@ -506,7 +506,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
       case plan: InMemoryRelation => plan
     }.head
     // InMemoryRelation's stats is file size before the underlying RDD is materialized
-    assert(inMemoryRelation.computeStats().sizeInBytes === 800)
+    assert(inMemoryRelation.computeStats().sizeInBytes === 868)
 
     // InMemoryRelation's stats is updated after materializing RDD
     dfFromFile.collect()
@@ -519,7 +519,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
 
       // Even CBO enabled, InMemoryRelation's stats keeps as the file size before table's stats
       // is calculated
-      assert(inMemoryRelation2.computeStats().sizeInBytes === 800)
+      assert(inMemoryRelation2.computeStats().sizeInBytes === 868)
 
       // InMemoryRelation's stats should be updated after calculating stats of the table
       // clear cache to simulate a fresh environment
```

```diff
@@ -45,7 +45,7 @@ class HadoopFsRelationSuite extends QueryTest with SharedSQLContext {
     import testImplicits._
     Seq(1.0, 0.5).foreach { compressionFactor =>
       withSQLConf("spark.sql.sources.fileCompressionFactor" -> compressionFactor.toString,
-        "spark.sql.autoBroadcastJoinThreshold" -> "400") {
+        "spark.sql.autoBroadcastJoinThreshold" -> "434") {
         withTempPath { workDir =>
           // the file size is 740 bytes
           val workDirPath = workDir.getAbsolutePath
```

```diff
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.datasources.orc
 
 import java.io.File
+import java.nio.charset.StandardCharsets.UTF_8
 import java.sql.Timestamp
 import java.util.Locale
 
@@ -30,7 +31,8 @@ import org.apache.orc.OrcProto.Stream.Kind
 import org.apache.orc.impl.RecordReaderImpl
 import org.scalatest.BeforeAndAfterAll
 
-import org.apache.spark.sql.Row
+import org.apache.spark.SPARK_VERSION_SHORT
+import org.apache.spark.sql.{Row, SPARK_VERSION_METADATA_KEY}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.util.Utils
@@ -314,6 +316,22 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll {
       checkAnswer(spark.read.orc(path.getCanonicalPath), Row(ts))
     }
   }
+
+  test("Write Spark version into ORC file metadata") {
+    withTempPath { path =>
+      spark.range(1).repartition(1).write.orc(path.getCanonicalPath)
+
+      val partFiles = path.listFiles()
+        .filter(f => f.isFile && !f.getName.startsWith(".") && !f.getName.startsWith("_"))
+      assert(partFiles.length === 1)
+
+      val orcFilePath = new Path(partFiles.head.getAbsolutePath)
+      val readerOptions = OrcFile.readerOptions(new Configuration())
+      val reader = OrcFile.createReader(orcFilePath, readerOptions)
+      val version = UTF_8.decode(reader.getMetadataValue(SPARK_VERSION_METADATA_KEY)).toString
+      assert(version === SPARK_VERSION_SHORT)
+    }
+  }
 }
 
 class OrcSourceSuite extends OrcSuite with SharedSQLContext {
```

```diff
@@ -27,6 +27,7 @@ import scala.reflect.runtime.universe.TypeTag
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
+import org.apache.parquet.HadoopReadOptions
 import org.apache.parquet.column.{Encoding, ParquetProperties}
 import org.apache.parquet.example.data.{Group, GroupWriter}
 import org.apache.parquet.example.data.simple.SimpleGroup
@@ -34,10 +35,11 @@ import org.apache.parquet.hadoop._
 import org.apache.parquet.hadoop.api.WriteSupport
 import org.apache.parquet.hadoop.api.WriteSupport.WriteContext
 import org.apache.parquet.hadoop.metadata.CompressionCodecName
+import org.apache.parquet.hadoop.util.HadoopInputFile
 import org.apache.parquet.io.api.RecordConsumer
 import org.apache.parquet.schema.{MessageType, MessageTypeParser}
 
-import org.apache.spark.SparkException
+import org.apache.spark.{SPARK_VERSION_SHORT, SparkException}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{InternalRow, ScalaReflection}
 import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow}
@@ -799,6 +801,23 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
       checkAnswer(spark.read.parquet(file.getAbsolutePath), Seq(Row(Row(1, null, "foo"))))
     }
   }
+
+  test("Write Spark version into Parquet metadata") {
+    withTempPath { dir =>
+      val path = dir.getAbsolutePath
+      spark.range(1).repartition(1).write.parquet(path)
+
+      val file = SpecificParquetRecordReaderBase.listDirectory(dir).get(0)
+      val conf = new Configuration()
+      val hadoopInputFile = HadoopInputFile.fromPath(new Path(file), conf)
+      val parquetReadOptions = HadoopReadOptions.builder(conf).build()
+      val m = ParquetFileReader.open(hadoopInputFile, parquetReadOptions)
+      val metaData = m.getFileMetaData.getKeyValueMetaData
+      m.close()
+
+      assert(metaData.get(SPARK_VERSION_METADATA_KEY) === SPARK_VERSION_SHORT)
+    }
+  }
 }
 
 class JobCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
```

```diff
@@ -18,9 +18,11 @@
 package org.apache.spark.sql.hive.orc
 
 import java.net.URI
+import java.nio.charset.StandardCharsets.UTF_8
 import java.util.Properties
 
 import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
@@ -31,10 +33,12 @@ import org.apache.hadoop.hive.serde2.typeinfo.{StructTypeInfo, TypeInfoUtils}
 import org.apache.hadoop.io.{NullWritable, Writable}
 import org.apache.hadoop.mapred.{JobConf, OutputFormat => MapRedOutputFormat, RecordWriter, Reporter}
 import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.orc.OrcConf.COMPRESS
 
-import org.apache.spark.TaskContext
+import org.apache.spark.{SPARK_VERSION_SHORT, TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SPARK_VERSION_METADATA_KEY
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
@@ -274,12 +278,14 @@ private[orc] class OrcOutputWriter(
 
   override def close(): Unit = {
     if (recordWriterInstantiated) {
+      // Hive 1.2.1 ORC initializes its private `writer` field at the first write.
+      OrcFileFormat.addSparkVersionMetadata(recordWriter)
       recordWriter.close(Reporter.NULL)
     }
   }
 }
 
-private[orc] object OrcFileFormat extends HiveInspectors {
+private[orc] object OrcFileFormat extends HiveInspectors with Logging {
   // This constant duplicates `OrcInputFormat.SARG_PUSHDOWN`, which is unfortunately not public.
   private[orc] val SARG_PUSHDOWN = "sarg.pushdown"
 
@@ -339,4 +345,18 @@ private[orc] object OrcFileFormat extends HiveInspectors {
     val (sortedIDs, sortedNames) = ids.zip(requestedSchema.fieldNames).sorted.unzip
     HiveShim.appendReadColumns(conf, sortedIDs, sortedNames)
   }
+
+  /**
+   * Add a metadata specifying Spark version.
+   */
+  def addSparkVersionMetadata(recordWriter: RecordWriter[NullWritable, Writable]): Unit = {
+    try {
+      val writerField = recordWriter.getClass.getDeclaredField("writer")
+      writerField.setAccessible(true)
+      val writer = writerField.get(recordWriter).asInstanceOf[Writer]
+      writer.addUserMetadata(SPARK_VERSION_METADATA_KEY, UTF_8.encode(SPARK_VERSION_SHORT))
+    } catch {
+      case NonFatal(e) => log.warn(e.toString, e)
+    }
+  }
 }
```